mirror of
https://github.com/gsi-upm/senpy
synced 2025-10-20 10:18:26 +00:00
Last batch of big changes
* Add Box plugin (i.e. black box) * Add SentimentBox, EmotionBox and MappingMixin * Refactored CustomDict
This commit is contained in:
185
senpy/meta.py
185
senpy/meta.py
@@ -8,6 +8,7 @@ import inspect
|
||||
import copy
|
||||
|
||||
from abc import ABCMeta
|
||||
from collections import MutableMapping, namedtuple
|
||||
|
||||
|
||||
class BaseMeta(ABCMeta):
|
||||
@@ -31,24 +32,31 @@ class BaseMeta(ABCMeta):
|
||||
_subtypes = {}
|
||||
|
||||
def __new__(mcs, name, bases, attrs, **kwargs):
|
||||
defaults = {}
|
||||
register_afterwards = False
|
||||
defaults = {}
|
||||
|
||||
attrs = mcs.expand_with_schema(name, attrs)
|
||||
if 'schema' in attrs:
|
||||
register_afterwards = True
|
||||
defaults = mcs.get_defaults(attrs['schema'])
|
||||
for b in bases:
|
||||
if hasattr(b, '_defaults'):
|
||||
defaults.update(b._defaults)
|
||||
info, attrs = mcs.split_attrs(attrs)
|
||||
defaults.update(info)
|
||||
attrs['_defaults'] = defaults
|
||||
for base in bases:
|
||||
if hasattr(base, '_defaults'):
|
||||
defaults.update(getattr(base, '_defaults'))
|
||||
|
||||
cls = super(BaseMeta, mcs).__new__(mcs, name, tuple(bases), attrs)
|
||||
info, rest = mcs.split_attrs(attrs)
|
||||
|
||||
for i in list(info.keys()):
|
||||
if isinstance(info[i], _Alias):
|
||||
fget, fset, fdel = make_property(info[i].indict)
|
||||
rest[i] = property(fget=fget, fset=fset, fdel=fdel)
|
||||
else:
|
||||
defaults[i] = info[i]
|
||||
|
||||
rest['_defaults'] = defaults
|
||||
|
||||
cls = super(BaseMeta, mcs).__new__(mcs, name, tuple(bases), rest)
|
||||
|
||||
if register_afterwards:
|
||||
mcs.register(cls, cls._defaults['@type'])
|
||||
mcs.register(cls, defaults['@type'])
|
||||
return cls
|
||||
|
||||
@classmethod
|
||||
@@ -81,17 +89,26 @@ class BaseMeta(ABCMeta):
|
||||
attrs['_schema_file'] = schema_file
|
||||
attrs['schema'] = schema
|
||||
attrs['_validator'] = jsonschema.Draft4Validator(schema, resolver=resolver)
|
||||
|
||||
schema_defaults = BaseMeta.get_defaults(attrs['schema'])
|
||||
attrs.update(schema_defaults)
|
||||
|
||||
return attrs
|
||||
|
||||
@staticmethod
|
||||
def is_attr(k, v):
|
||||
return (not(inspect.isroutine(v) or
|
||||
inspect.ismethod(v) or
|
||||
inspect.ismodule(v) or
|
||||
isinstance(v, property)) and
|
||||
k[0] != '_' and
|
||||
k != 'schema' and
|
||||
k != 'data')
|
||||
def is_func(v):
|
||||
return inspect.isroutine(v) or inspect.ismethod(v) or \
|
||||
inspect.ismodule(v) or isinstance(v, property)
|
||||
|
||||
@staticmethod
|
||||
def is_internal(k):
|
||||
return k[0] == '_' or k == 'schema' or k == 'data'
|
||||
|
||||
@staticmethod
|
||||
def get_key(key):
|
||||
if key[0] != '_':
|
||||
key = key.replace("__", ":", 1)
|
||||
return key
|
||||
|
||||
@staticmethod
|
||||
def split_attrs(attrs):
|
||||
@@ -102,15 +119,13 @@ class BaseMeta(ABCMeta):
|
||||
e.g.:
|
||||
'''
|
||||
isattr = {}
|
||||
notattr = {}
|
||||
rest = {}
|
||||
for key, value in attrs.items():
|
||||
if BaseMeta.is_attr(key, value):
|
||||
if key[0] != '_':
|
||||
key = key.replace("__", ":", 1)
|
||||
isattr[key] = copy.deepcopy(value)
|
||||
if not (BaseMeta.is_internal(key)) and (not BaseMeta.is_func(value)):
|
||||
isattr[key] = value
|
||||
else:
|
||||
notattr[key] = value
|
||||
return isattr, notattr
|
||||
rest[key] = value
|
||||
return isattr, rest
|
||||
|
||||
@staticmethod
|
||||
def get_defaults(schema):
|
||||
@@ -120,5 +135,123 @@ class BaseMeta(ABCMeta):
|
||||
] + schema.get('allOf', []):
|
||||
for k, v in obj.get('properties', {}).items():
|
||||
if 'default' in v and k not in temp:
|
||||
temp[k] = copy.deepcopy(v['default'])
|
||||
temp[k] = v['default']
|
||||
return temp
|
||||
|
||||
|
||||
def make_property(key):
|
||||
|
||||
def fget(self):
|
||||
return self[key]
|
||||
|
||||
def fdel(self):
|
||||
del self[key]
|
||||
|
||||
def fset(self, value):
|
||||
self[key] = value
|
||||
|
||||
return fget, fset, fdel
|
||||
|
||||
|
||||
class CustomDict(MutableMapping, object):
|
||||
'''
|
||||
A dictionary whose elements can also be accessed as attributes. Since some
|
||||
characters are not valid in the dot-notation, the attribute names also
|
||||
converted. e.g.:
|
||||
|
||||
> d = CustomDict()
|
||||
> d.key = d['ns:name'] = 1
|
||||
> d.key == d['key']
|
||||
True
|
||||
> d.ns__name == d['ns:name']
|
||||
'''
|
||||
|
||||
_defaults = {}
|
||||
_map_attr_key = {'id': '@id'}
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(CustomDict, self).__init__()
|
||||
for k, v in self._defaults.items():
|
||||
self[k] = copy.copy(v)
|
||||
for arg in args:
|
||||
self.update(arg)
|
||||
for k, v in kwargs.items():
|
||||
self[self._attr_to_key(k)] = v
|
||||
return self
|
||||
|
||||
def serializable(self):
|
||||
def ser_or_down(item):
|
||||
if hasattr(item, 'serializable'):
|
||||
return item.serializable()
|
||||
elif isinstance(item, dict):
|
||||
temp = dict()
|
||||
for kp in item:
|
||||
vp = item[kp]
|
||||
temp[kp] = ser_or_down(vp)
|
||||
return temp
|
||||
elif isinstance(item, list) or isinstance(item, set):
|
||||
return list(ser_or_down(i) for i in item)
|
||||
else:
|
||||
return item
|
||||
|
||||
return ser_or_down(self.as_dict())
|
||||
|
||||
def __getitem__(self, key):
|
||||
key = self._key_to_attr(key)
|
||||
return self.__dict__[key]
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
'''Do not insert data directly, there might be a property in that key. '''
|
||||
key = self._key_to_attr(key)
|
||||
return setattr(self, key, value)
|
||||
|
||||
def as_dict(self):
|
||||
return {self._attr_to_key(k): v for k, v in self.__dict__.items()
|
||||
if not self._internal_key(k)}
|
||||
|
||||
def __iter__(self):
|
||||
return (k for k in self.__dict__ if not self._internal_key(k))
|
||||
|
||||
def __len__(self):
|
||||
return len(self.__dict__)
|
||||
|
||||
def __delitem__(self, key):
|
||||
del self.__dict__[key]
|
||||
|
||||
def update(self, other):
|
||||
for k, v in other.items():
|
||||
self[k] = v
|
||||
|
||||
def _attr_to_key(self, key):
|
||||
key = key.replace("__", ":", 1)
|
||||
key = self._map_attr_key.get(key, key)
|
||||
return key
|
||||
|
||||
def _key_to_attr(self, key):
|
||||
if self._internal_key(key):
|
||||
return key
|
||||
key = key.replace(":", "__", 1)
|
||||
return key
|
||||
|
||||
def __getattr__(self, key):
|
||||
try:
|
||||
return self.__dict__[self._attr_to_key(key)]
|
||||
except KeyError:
|
||||
raise AttributeError
|
||||
|
||||
@staticmethod
|
||||
def _internal_key(key):
|
||||
return key[0] == '_'
|
||||
|
||||
def __str__(self):
|
||||
return str(self.serializable())
|
||||
|
||||
def __repr__(self):
|
||||
return str(self.serializable())
|
||||
|
||||
|
||||
_Alias = namedtuple('Alias', 'indict')
|
||||
|
||||
|
||||
def alias(key):
|
||||
return _Alias(key)
|
||||
|
Reference in New Issue
Block a user