@ -6,7 +6,11 @@ For compatibility with Py3 and for easier debugging, this new version drops
introspection and adds all arguments to the models .
'''
from __future__ import print_function
from six import string_types
from future import standard_library
standard_library . install_aliases ( )
from future . utils import with_metaclass
from past . builtins import basestring
import time
import copy
@ -15,6 +19,8 @@ import os
import jsonref
import jsonschema
import inspect
from collections import UserDict
from abc import ABCMeta
from flask import Response as FlaskResponse
from pyld import jsonld
@ -62,7 +68,7 @@ class Context(dict):
return contexts
elif isinstance ( context , dict ) :
return Context ( context )
elif isinstance ( context , string_types ) :
elif isinstance ( context , base string) :
try :
with open ( context ) as f :
return Context ( json . loads ( f . read ( ) ) )
@ -75,9 +81,154 @@ class Context(dict):
base_context = Context . load ( CONTEXT_PATH )
class SenpyMixin ( object ) :
class BaseMeta ( ABCMeta ) :
'''
Metaclass for models . It extracts the default values for the fields in
the model .
For instance , instances of the following class wouldn ' t need to mark
their version or description on initialization :
. . code - block : : python
class MyPlugin ( Plugin ) :
version = 0.3
description = ' A dull plugin '
Note that these operations could be included in the __init__ of the
class , but it would be very inefficient .
'''
def __new__ ( mcs , name , bases , attrs , * * kwargs ) :
defaults = { }
if ' schema ' in attrs :
defaults = mcs . get_defaults ( attrs [ ' schema ' ] )
for b in bases :
if hasattr ( b , ' defaults ' ) :
defaults . update ( b . defaults )
info = mcs . attrs_to_dict ( attrs )
defaults . update ( info )
attrs [ ' defaults ' ] = defaults
return super ( BaseMeta , mcs ) . __new__ ( mcs , name , bases , attrs )
@staticmethod
def attrs_to_dict ( attrs ) :
'''
Extract the attributes of the class .
This allows adding default values in the class definition .
e . g . :
'''
def is_attr ( k , v ) :
return ( not ( inspect . isroutine ( v ) or
inspect . ismethod ( v ) or
inspect . ismodule ( v ) or
isinstance ( v , property ) ) and
k [ 0 ] != ' _ ' and
k != ' schema ' and
k != ' data ' )
return { key : copy . deepcopy ( value ) for key , value in attrs . items ( ) if is_attr ( key , value ) }
@staticmethod
def get_defaults ( schema ) :
temp = { }
for obj in [
schema ,
] + schema . get ( ' allOf ' , [ ] ) :
for k , v in obj . get ( ' properties ' , { } ) . items ( ) :
if ' default ' in v and k not in temp :
temp [ k ] = copy . deepcopy ( v [ ' default ' ] )
return temp
class CustomDict ( UserDict , object ) :
'''
A dictionary whose elements can also be accessed as attributes . Since some
characters are not valid in the dot - notation , the attribute names also
converted . e . g . :
> d = CustomDict ( )
> d . key = d [ ' ns:name ' ] = 1
> d . key == d [ ' key ' ]
True
> d . ns__name == d [ ' ns:name ' ]
'''
defaults = [ ]
def __init__ ( self , * args , * * kwargs ) :
temp = copy . deepcopy ( self . defaults )
for arg in args :
temp . update ( copy . deepcopy ( arg ) )
for k , v in kwargs . items ( ) :
temp [ self . _get_key ( k ) ] = v
super ( CustomDict , self ) . __init__ ( temp )
@staticmethod
def _get_key ( key ) :
if key is ' id ' :
key = ' @id '
key = key . replace ( " __ " , " : " , 1 )
return key
@staticmethod
def _internal_key ( key ) :
return key [ 0 ] == ' _ ' or key == ' data '
def __getattr__ ( self , key ) :
'''
__getattr__ only gets called when the attribute could not be found
in the __dict__ . So we only need to look for the the element in the
dictionary , or raise an Exception .
'''
mkey = self . _get_key ( key )
if not self . _internal_key ( key ) and mkey in self :
return self [ mkey ]
raise AttributeError ( key )
def __setattr__ ( self , key , value ) :
# Work as usual for internal properties or already existing
# properties
if self . _internal_key ( key ) or key in self . __dict__ :
return super ( CustomDict , self ) . __setattr__ ( key , value )
key = self . _get_key ( key )
return self . __setitem__ ( self . _get_key ( key ) , value )
def __delattr__ ( self , key ) :
if self . _internal_key ( key ) :
return object . __delattr__ ( self , key )
key = self . _get_key ( key )
self . __delitem__ ( self . _get_key ( key ) )
class BaseModel ( with_metaclass ( BaseMeta , CustomDict ) ) :
'''
Entities of the base model are a special kind of dictionary that emulates
a JSON - LD object . The structure of the dictionary is checked via JSON - schema .
For convenience , the values can also be accessed as attributes
( a la Javascript ) . e . g . :
> myobject . key == myobject [ ' key ' ]
True
> myobject . ns__name == myobject [ ' ns:name ' ]
True
'''
schema = base_schema
_context = base_context [ " @context " ]
def __init__ ( self , * args , * * kwargs ) :
auto_id = kwargs . pop ( ' _auto_id ' , True )
super ( BaseModel , self ) . __init__ ( * args , * * kwargs )
if ' @id ' not in self and auto_id :
self . id = ' : {} _ {} ' . format ( type ( self ) . __name__ , time . time ( ) )
if ' @type ' not in self :
logger . warn ( ' Created an instance of an unknown model ' )
def flask ( self ,
in_headers = True ,
headers = None ,
@ -146,7 +297,7 @@ class SenpyMixin(object):
else :
return item
return ser_or_down ( self . _plain_dict( ) )
return ser_or_down ( self . data )
def jsonld ( self ,
with_context = True ,
@ -188,108 +339,41 @@ class SenpyMixin(object):
return str ( self . serialize ( ) )
class BaseModel ( SenpyMixin , dict ) :
'''
Entities of the base model are a special kind of dictionary that emulates
a JSON - LD object . For convenience , the values can also be accessed as attributes
( a la Javascript ) . e . g . :
> myobject . key == myobject [ ' key ' ]
True
> myobject . ns__name == myobject [ ' ns:name ' ]
True
'''
schema = base_schema
def __init__ ( self , * args , * * kwargs ) :
self . attrs_to_dict ( )
if ' id ' in kwargs :
self . id = kwargs . pop ( ' id ' )
elif kwargs . pop ( ' _auto_id ' , True ) :
self . id = ' _: {} _ {} ' . format ( type ( self ) . __name__ , time . time ( ) )
temp = self . get_defaults ( )
temp . update ( dict ( * args ) )
for k , v in kwargs . items ( ) :
temp [ self . _get_key ( k ) ] = v
super ( BaseModel , self ) . __init__ ( temp )
if ' @type ' not in self :
logger . warn ( ' Created an instance of an unknown model ' )
def get_defaults ( self ) :
temp = { }
for obj in [
self . schema ,
] + self . schema . get ( ' allOf ' , [ ] ) :
for k , v in obj . get ( ' properties ' , { } ) . items ( ) :
if ' default ' in v and k not in temp :
temp [ k ] = copy . deepcopy ( v [ ' default ' ] )
return temp
def attrs_to_dict ( self ) :
'''
Copy the attributes of the class to the instance .
This allows adding default values in the class definition .
e . g . :
class MyPlugin ( Plugin ) :
version = 0.3
description = ' A dull plugin '
'''
def is_attr ( x ) :
return not ( inspect . isroutine ( x ) or inspect . ismethod ( x ) or isinstance ( x , property ) )
for key , value in inspect . getmembers ( self . __class__ , is_attr ) :
if key [ 0 ] != ' _ ' and key != ' schema ' :
self [ key ] = value
def _get_key ( self , key ) :
if key is ' id ' :
key = ' @id '
key = key . replace ( " __ " , " : " , 1 )
return key
_subtypes = { }
def __delitem__ ( self , key ) :
key = self . _get_key ( key )
dict . __delitem__ ( self , key )
def _internal_key ( self , key ) :
return key [ 0 ] == ' _ ' or key in self . __dict__
def register ( rsubclass , rtype = None ) :
_subtypes [ rtype or rsubclass . __name__ ] = rsubclass
def _plain_dict ( self ) :
d = { k : v for ( k , v ) in self . items ( ) if k [ 0 ] != " _ " }
return d
def __getattr__ ( self , key ) :
'''
__getattr__ only gets called when the attribute could not
be found in the __dict__ . So we only need to look for the
the element in the dictionary , or raise an Exception .
'''
if self . _internal_key ( key ) :
raise AttributeError ( key )
return self . __getitem__ ( self . _get_key ( key ) )
def from_schema ( name , schema = None , schema_file = None , base_classes = None ) :
base_classes = base_classes or [ ]
base_classes . append ( BaseModel )
schema_file = schema_file or ' {} .json ' . format ( name )
class_name = ' {} {} ' . format ( name [ 0 ] . upper ( ) , name [ 1 : ] )
if ' / ' not in ' schema_file ' :
thisdir = os . path . dirname ( os . path . realpath ( __file__ ) )
schema_file = os . path . join ( thisdir ,
' schemas ' ,
schema_file )
def __setattr__ ( self , key , value ) :
if self . _internal_key ( key ) :
return super ( BaseModel , self ) . __setattr__ ( key , value )
key = self . _get_key ( key )
return self . __setitem__ ( self . _get_key ( key ) , value )
schema_path = ' file:// ' + schema_file
def __delattr__ ( self , key ) :
if self . _internal_key ( key ) :
return object . __delattr__ ( self , key )
key = self . _get_key ( key )
self . __delitem__ ( self . _get_key ( key ) )
with open ( schema_file ) as f :
schema = json . load ( f )
dct = { }
def register ( rsubclass , rtype = None ) :
_subtypes [ rtype or rsubclass . __name__ ] = rsubclass
resolver = jsonschema . RefResolver ( schema_path , schema )
dct [ ' @type ' ] = name
dct [ ' _schema_file ' ] = schema_file
dct [ ' schema ' ] = schema
dct [ ' _validator ' ] = jsonschema . Draft4Validator ( schema , resolver = resolver )
newclass = type ( class_name , tuple ( base_classes ) , dct )
_subtypes = { }
register ( newclass , name )
return newclass
def from_dict ( indict , cls = None ) :
@ -309,10 +393,11 @@ def from_dict(indict, cls=None):
elif isinstance ( v , dict ) :
v = from_dict ( indict [ k ] )
elif isinstance ( v , list ) :
v = v [ : ]
for ix , v2 in enumerate ( v ) :
if isinstance ( v2 , dict ) :
v [ ix ] = from_dict ( v2 )
outdict [ k ] = v
outdict [ k ] = copy. deepcopy ( v)
return cls ( * * outdict )
@ -325,35 +410,6 @@ def from_json(injson):
return from_dict ( indict )
def from_schema ( name , schema = None , schema_file = None , base_classes = None ) :
base_classes = base_classes or [ ]
base_classes . append ( BaseModel )
schema_file = schema_file or ' {} .json ' . format ( name )
class_name = ' {} {} ' . format ( name [ 0 ] . upper ( ) , name [ 1 : ] )
if ' / ' not in ' schema_file ' :
schema_file = os . path . join ( os . path . dirname ( os . path . realpath ( __file__ ) ) ,
' schemas ' ,
schema_file )
schema_path = ' file:// ' + schema_file
with open ( schema_file ) as f :
schema = json . load ( f )
dct = { }
resolver = jsonschema . RefResolver ( schema_path , schema )
dct [ ' @type ' ] = name
dct [ ' _schema_file ' ] = schema_file
dct [ ' schema ' ] = schema
dct [ ' _validator ' ] = jsonschema . Draft4Validator ( schema , resolver = resolver )
newclass = type ( class_name , tuple ( base_classes ) , dct )
register ( newclass , name )
return newclass
def _add_from_schema ( * args , * * kwargs ) :
generatedClass = from_schema ( * args , * * kwargs )
globals ( ) [ generatedClass . __name__ ] = generatedClass
@ -384,40 +440,14 @@ for i in [
_ErrorModel = from_schema ( ' error ' )
class Error ( SenpyMixin , Exception ) :
class Error ( _ErrorModel , Exception ) :
def __init__ ( self , message , * args , * * kwargs ) :
super ( Error , self ) . __init__ ( self , message , message )
self . _error = _ErrorModel ( message = message , * args , * * kwargs )
Exception . __init__ ( self , message )
super ( Error , self ) . __init__ ( * args , * * kwargs )
self . message = message
def validate ( self , obj = None ) :
self . _error . validate ( )
def __getitem__ ( self , key ) :
return self . _error [ key ]
def __setitem__ ( self , key , value ) :
self . _error [ key ] = value
def __delitem__ ( self , key ) :
del self . _error [ key ]
def __getattr__ ( self , key ) :
if key != ' _error ' and hasattr ( self . _error , key ) :
return getattr ( self . _error , key )
raise AttributeError ( key )
def __setattr__ ( self , key , value ) :
if key != ' _error ' :
return setattr ( self . _error , key , value )
else :
super ( Error , self ) . __setattr__ ( key , value )
def __delattr__ ( self , key ) :
delattr ( self . _error , key )
def __str__ ( self ) :
return str ( self . to_JSON ( with_context = False ) )
def __hash__ ( self ) :
return Exception . __hash__ ( self )
register ( Error , ' error ' )