You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

259 lines
7.0 KiB

Meta-programming for the models.
import os
import json
import jsonschema
import inspect
import copy
from abc import ABCMeta
from collections import MutableMapping, namedtuple
class BaseMeta(ABCMeta):
Metaclass for models. It extracts the default values for the fields in
the model.
For instance, instances of the following class wouldn't need to mark
their version or description on initialization:
.. code-block:: python
class MyPlugin(Plugin):
description='A dull plugin'
Note that these operations could be included in the __init__ of the
class, but it would be very inefficient.
_subtypes = {}
def __new__(mcs, name, bases, attrs, **kwargs):
register_afterwards = False
defaults = {}
attrs = mcs.expand_with_schema(name, attrs)
if 'schema' in attrs:
register_afterwards = True
for base in bases:
if hasattr(base, '_defaults'):
defaults.update(getattr(base, '_defaults'))
info, rest = mcs.split_attrs(attrs)
for i in list(info.keys()):
if isinstance(info[i], _Alias):
fget, fset, fdel = make_property(info[i].indict)
rest[i] = property(fget=fget, fset=fset, fdel=fdel)
defaults[i] = info[i]
rest['_defaults'] = defaults
cls = super(BaseMeta, mcs).__new__(mcs, name, tuple(bases), rest)
if register_afterwards:
mcs.register(cls, defaults['@type'])
return cls
def register(mcs, rsubclass, rtype=None):
mcs._subtypes[rtype or rsubclass.__name__] = rsubclass
def expand_with_schema(name, attrs):
if 'schema' in attrs: # Schema specified by name
schema_file = '{}.json'.format(attrs['schema'])
elif 'schema_file' in attrs:
schema_file = attrs['schema_file']
del attrs['schema_file']
return attrs
if '/' not in 'schema_file':
thisdir = os.path.dirname(os.path.realpath(__file__))
schema_file = os.path.join(thisdir,
schema_path = 'file://' + schema_file
with open(schema_file) as f:
schema = json.load(f)
resolver = jsonschema.RefResolver(schema_path, schema)
if '@type' not in attrs:
attrs['@type'] = "".join((name[0].lower(), name[1:]))
attrs['_schema_file'] = schema_file
attrs['schema'] = schema
attrs['_validator'] = jsonschema.Draft4Validator(schema, resolver=resolver)
schema_defaults = BaseMeta.get_defaults(attrs['schema'])
return attrs
def is_func(v):
return inspect.isroutine(v) or inspect.ismethod(v) or \
inspect.ismodule(v) or isinstance(v, property)
def is_internal(k):
return k[0] == '_' or k == 'schema' or k == 'data'
def get_key(key):
if key[0] != '_':
key = key.replace("__", ":", 1)
return key
def split_attrs(attrs):
Extract the attributes of the class.
This allows adding default values in the class definition.
isattr = {}
rest = {}
for key, value in attrs.items():
if not (BaseMeta.is_internal(key)) and (not BaseMeta.is_func(value)):
isattr[key] = value
rest[key] = value
return isattr, rest
def get_defaults(schema):
temp = {}
for obj in [
] + schema.get('allOf', []):
for k, v in obj.get('properties', {}).items():
if 'default' in v and k not in temp:
temp[k] = v['default']
return temp
def make_property(key):
def fget(self):
return self[key]
def fdel(self):
del self[key]
def fset(self, value):
self[key] = value
return fget, fset, fdel
class CustomDict(MutableMapping, object):
A dictionary whose elements can also be accessed as attributes. Since some
characters are not valid in the dot-notation, the attribute names also
converted. e.g.:
> d = CustomDict()
> d.key = d['ns:name'] = 1
> d.key == d['key']
> d.ns__name == d['ns:name']
_defaults = {}
_map_attr_key = {'id': '@id'}
def __init__(self, *args, **kwargs):
super(CustomDict, self).__init__()
for k, v in self._defaults.items():
self[k] = copy.copy(v)
for arg in args:
for k, v in kwargs.items():
self[self._attr_to_key(k)] = v
return self
def serializable(self):
def ser_or_down(item):
if hasattr(item, 'serializable'):
return item.serializable()
elif isinstance(item, dict):
temp = dict()
for kp in item:
vp = item[kp]
temp[kp] = ser_or_down(vp)
return temp
elif isinstance(item, list) or isinstance(item, set):
return list(ser_or_down(i) for i in item)
return item
return ser_or_down(self.as_dict())
def __getitem__(self, key):
key = self._key_to_attr(key)
return self.__dict__[key]
def __setitem__(self, key, value):
'''Do not insert data directly, there might be a property in that key. '''
key = self._key_to_attr(key)
return setattr(self, key, value)
def as_dict(self):
return {self._attr_to_key(k): v for k, v in self.__dict__.items()
if not self._internal_key(k)}
def __iter__(self):
return (k for k in self.__dict__ if not self._internal_key(k))
def __len__(self):
return len(self.__dict__)
def __delitem__(self, key):
del self.__dict__[key]
def update(self, other):
for k, v in other.items():
self[k] = v
def _attr_to_key(self, key):
key = key.replace("__", ":", 1)
key = self._map_attr_key.get(key, key)
return key
def _key_to_attr(self, key):
if self._internal_key(key):
return key
key = key.replace(":", "__", 1)
return key
def __getattr__(self, key):
return self.__dict__[self._attr_to_key(key)]
except KeyError:
raise AttributeError
def _internal_key(key):
return key[0] == '_'
def __str__(self):
return json.dumps(self.serializable(), sort_keys=True, indent=4)
def __repr__(self):
return json.dumps(self.serializable(), sort_keys=True, indent=4)
_Alias = namedtuple('Alias', 'indict')
def alias(key):
return _Alias(key)