Source code for cubes.wsme.types

# -*- coding: utf-8 -*-
# copyright 2014 UNLISH, all rights reserved.
# contact http://www.unlish.com/ -- mailto:christophe@unlish.com
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""
Types
"""

import base64
import inspect
import itertools
import json
import re

import six
import wsme
import wsme.types
import wsme.rest.args
import wsme.rest.json

from cubicweb import Unauthorized, Binary


[docs]class PassThroughType(object): """Special webservice type that transmit a value without doing anything It is used in wsme signature for the 'entity' argument which is passed by the controller itself and should not be transtyped in any way.""" @classmethod def validate(self, value): return value
@wsme.rest.args.from_param.when_object(PassThroughType) def any_from_param(datatype, value): return value class JsonDataType(wsme.types.UserType): name = 'json' basetype = wsme.types.text def frombasetype(self, value): if value is None: return value return json.loads(value) def tobasetype(self, value): return json.dumps(value) @classmethod def validate(self, value): return value #: User type that carry json encoded arbitrary data. JsonData = JsonDataType() @wsme.rest.json.fromjson.when_object(JsonData) def jsondata_fromjson(datatype, value): return value
[docs]class wsattr(wsme.types.wsattr): """Cubicweb-specific version of :class:`wsme.types.wsattr` To be used on a :class:`Base` class. All the attributes are optionnal and can be deduced from the :attr:`rtype`. .. attribute:: rtype The corresponding relation in the model .. attribute:: role The role of the parent class in the relation .. attribute:: etype The entity type on the other side of the relation .. attribute:: datatype The webservice type """ def __init__( self, rtype=None, role='subject', etype=None, datatype=None, writeonly=False, **kw): super(wsattr, self).__init__(datatype, **kw) self.rtype = rtype self.role = role self.etype = etype self.writeonly = writeonly def reginit(self, vreg, class_): if self.rtype is None: self.rtype = self.name rschema = vreg.schema.rschema(self.rtype) self.isfinal = rschema.final self.inlined = rschema.inlined etypes = rschema.targets( None if class_.__etype__ == 'Any' else class_.__etype__, self.role) if self.role == 'subject': self.rname = self.rtype elif self.role == 'object': self.rname = 'reverse_' + self.rtype if self.etype is None: # Autodetect the etype if len(etypes) != 1: self.etype = 'Any' else: self.etype = etypes[0].type else: # check that the etype is valid if self.etype == 'Any': if len(etypes) == 1: raise ValueError( 'Any should not be used on non-polymorphic relations') else: if self.etype not in etypes: raise ValueError( 'Wrong etype %s for relation %s %s' % (self.etype, self.rtype, self.role)) def finalize_init(self, class_): if self.datatype is not None and self.datatype != [None]: return rschema = class_.__registry__.vreg.schema.rschema(self.rtype) datatype = class_.__registry__.guess_datatype(self.etype) if class_.__etype__ == 'Any' or self.etype == 'Any': needarray = False for rdef in rschema.rdefs.values(): if rdef.role_cardinality(self.role) in '+*': needarray = True break else: rdef = rschema.role_rdef(class_.__etype__, self.etype, self.role) needarray = rdef.role_cardinality(self.role) in '+*' if needarray: datatype = wsme.types.ArrayType(datatype) self.datatype = datatype
[docs]def iswsattr(obj): """returns True if an object is a :class:`wsattr`""" return isinstance(obj, wsattr)
class BaseMeta(wsme.types.BaseMeta): """ Metaclass for :class:`Base` Bypass the wsme.types.BaseMeta __new__ and __init__ because they set a default __registry__ """ def __new__(cls, name, bases, dct): return type.__new__(cls, name, bases, dct) def __init__(cls, name, bases, dct): pass
[docs]class Base(six.with_metaclass(BaseMeta, wsme.types.Base)): """Base class for a complex type that map an entity type """ #: Entity eid eid = long __autoexclude__ = ( 'is', 'identity', 'cw_source', 'is_instance_of', 'has_text') def __init__(self, entity=None, keyonly=False, fetch=()): if entity: self.from_entity(entity, keyonly, fetch) @classmethod
[docs] def reginit(cls, vreg): """ Register the class Use the informations in the registry, and most notably the schema, to initialize the attributes. """ if getattr(cls, '__autoattr__', False): eschema = vreg.schema.eschema(cls.__etype__) existing_rels = set( (attr.rtype, attr.role) for name, attr in inspect.getmembers(cls, iswsattr)) if cls.__autoattr__ is True: rels = itertools.chain( ((rel, 'subject', rel.type) for rel in eschema.subject_relations()), ((rel, 'object', '<' + rel.type) for rel in eschema.object_relations())) else: rels = itertools.chain( ((eschema.subjrels[name], 'subject', name) for name in cls.__autoattr__ if not name.startswith('<')), ((eschema.objrels[name[1:]], 'object', name) for name in cls.__autoattr__ if name.startswith('<'))) rels = ( (rel, role, name) for rel, role, name in rels if name not in cls.__autoexclude__ and (rel.type, role) not in existing_rels ) for rel, role, name in rels: key = rel.type if role == 'object': key += '_object' setattr( cls, key, wsattr(rel.type, role=role, name=name)) vreg.wsme_registry.register(cls) for attr in cls._wsme_attributes: if isinstance(attr, wsattr): attr.reginit(vreg, cls)
@classmethod
[docs] def finalize_init(cls): """ Finalize the class initialization. This last step resolve types in the underlying attributes. """ for attr in cls._wsme_attributes: if isinstance(attr, wsattr): attr.finalize_init(cls)
_rtype_re = re.compile(u'(?P<rtype>[^.[]+)\[(?P<etypes>[^\]]+)\]')
[docs] def from_entity(self, entity, keyonly=False, fetch=()): """ Load values from an entity :param entity: the entity :param keyonly: if `True`, only the .eid and .modfication_date will be loaded. The result can be used as "timestamped reference". :param fetch: a list of relations to eager load. Unless specified, all the '1' or '?' relation targets will be loaded as 'keyonly', and the '*' or '+' relations will not be loaded at all. """ self.eid = entity.eid self.modification_date = entity.modification_date if keyonly: # Append the modification_date return typed_fetch = { m.group('rtype'): m.group('etypes').split(',') for m in ( self._rtype_re.match(f) for f in fetch) if m is not None} for attr in self._wsme_attributes: if not isinstance(attr, wsattr): continue if attr.writeonly: continue if attr.isfinal: subfetch = None else: prefix = attr.name + '.' prefixlen = len(prefix) subfetch = [ x[prefixlen:] for x in fetch if x.startswith(prefix) ] if wsme.types.isarray(attr.datatype): if not attr.name in fetch \ and not attr.name in typed_fetch \ and not subfetch: continue if attr.name in typed_fetch: value = entity.related( attr.rtype, role=attr.role, entities=True, targettypes=typed_fetch[attr.name]) else: try: value = getattr(entity, attr.rname) except Unauthorized: # XXX set value = Forbidden ? continue if not attr.isfinal: attr_keyonly = not subfetch if wsme.types.isarray(attr.datatype): if value: value = [ attr.datatype.item_type( o, keyonly=attr_keyonly, fetch=subfetch) for o in value ] else: value = [] else: if value: value = attr.datatype( value[0], keyonly=attr_keyonly, fetch=subfetch ) else: value = None attr.__set__(self, value)
[docs] def final_values(self): """ Returns a dict with all the attribute values. This dict can be used to feed :meth:`cubicweb.entity.Entity.cw_set`. """ values = {} for attr in self._wsme_attributes: if isinstance(attr, wsattr): if attr.isfinal and not attr.readonly: value = attr.__get__(self, self.__class__) if value is not wsme.types.Unset: values[attr.rtype] = value return values
[docs] def to_entity(self, entity): """ Update the entity attributes (not the non-final relations). """ values = self.final_values() if values: entity.cw_set(**values)
[docs]class Any(Base): """Complex type to carry any type of entity. Automatically used for polymorphic relations targets """ __etype__ = 'Any' cw_etype = wsme.types.text created_by = wsattr() creation_date = wsattr() modification_date = wsattr() owned_by = wsattr() def from_entity(self, entity, keyonly=False, fetch=()): self.cw_etype = entity.cw_etype super(Any, self).from_entity( entity, keyonly=keyonly, fetch=())
class BinaryType(wsme.types.UserType): basetype = wsme.types.bytes name = 'binary' def tobasetype(self, value): if value is not None: return base64.encodestring(value.getvalue()) def frombasetype(self, value): if value is not None: return Binary(base64.decodestring(value)) #: webservice type that map the :class:`cubicweb.Binary` values. binary = BinaryType()
[docs]def scan(vreg, modname): """Scan a module for any class inheriting :class:`Base` and register them. """ mod = __import__(modname) if '.' in modname: for token in modname.split('.')[1:]: mod = getattr(mod, token) for name, value in inspect.getmembers(mod): if inspect.isclass(value) and issubclass(value, Base): value.reginit(vreg)