import importlib as I
import traceback
import logging
import rdflib as R
from yarom import DataUser
from .rdfTypeResolver import RDFTypeResolver
import yarom as Y
# Public API of this module: the names exported by a star-import.
__all__ = [
"MappedClass",
"MappedPropertyClass",
"MappedClasses",
"DataObjectsParents",
"RDFTypeTable",
"get_most_specific_rdf_type",
"oid",
"Resolver",
"reload_module",
"load_module"]
# Module-wide logger.
L = logging.getLogger(__name__)

# Registries filled in as mapped classes are defined and mapped.
MappedClasses = {}  # class names to classes
DataObjectsParents = {}  # class names to parents of the related class
DataObjectProperties = {}  # property class names to property classes
RDFTypeTable = {}  # class rdf types to classes

# Name of a property-list class attribute -> kind of property created for
# each of its entries.
ProplistToProptype = {
    "datatypeProperties": "DatatypeProperty",
    "objectProperties": "ObjectProperty",
    "_": "UnionProperty",
}
class MappedClass(type):

    """A type for MappedClasses

    Sets up the graph with things needed for MappedClasses: every class
    created with this metaclass receives an ``rdf_type``, an
    ``rdf_namespace`` and a ``dataObjectProperties`` list, and is registered
    in the module-level tables as soon as it is defined.
    """

    def __init__(cls, name, bases, dct):
        """Initialize a newly defined mapped class and register it.

        ``rdf_type`` and ``rdf_namespace`` are taken from the class body
        (``dct``) when present there; otherwise both are derived from
        ``cls.conf['rdf.namespace']`` and the class name.
        """
        type.__init__(cls, name, bases, dct)
        if 'rdf_type' in dct:
            cls.rdf_type = dct['rdf_type']
        else:
            cls.rdf_type = cls.conf['rdf.namespace'][cls.__name__]

        if 'rdf_namespace' in dct:
            cls.rdf_namespace = dct['rdf_namespace']
        else:
            cls.rdf_namespace = R.Namespace(
                cls.conf['rdf.namespace'][cls.__name__] + "/")

        # Start from a fresh list so the properties gathered from the bases
        # are copied rather than shared with them.
        cls.dataObjectProperties = []
        for x in bases:
            try:
                cls.dataObjectProperties += x.dataObjectProperties
            except AttributeError:
                # This base does not carry mapped properties.
                pass
        cls.register()

    @classmethod
    def make_class(
            cls,
            name,
            bases,
            objectProperties=False,
            datatypeProperties=False):
        """ Intended to be used for setting up a class from the RDF graph, for instance.

        Parameters
        ----------
        name : str
            Name of the class to create
        bases : tuple
            Base classes of the new class
        objectProperties, datatypeProperties
            Property specifications handed to the new class; any false value
            is treated as an empty list

        Returns
        -------
        The newly created (and registered) class
        """
        # Need to distinguish datatype and object properties...
        return cls(
            name,
            bases,
            dict(
                objectProperties=objectProperties or [],
                datatypeProperties=datatypeProperties or []))

    @property
    def du(self):
        """The ``DataUser`` stored on this class as ``_du``.

        Raises ``Exception`` when no ``_du`` attribute has been set yet.
        """
        # This is just a little indirection to make sure we initialize the DataUser property before using it.
        # Initialization isn't done in __init__ because I wanted to make sure you could call `connect` before
        # or after declaring your classes
        if hasattr(self, '_du'):
            return self._du
        else:
            raise Exception("You should have called `map` to get here")

    @du.setter
    def du(self, value):
        assert(isinstance(value, DataUser))
        self._du = value

    def __lt__(cls, other):
        """Ordering used when sorting registered classes.

        A mapped class is never less than a property class. Otherwise it is
        less than ``other`` when it is a subclass of ``other``, or when the
        two are unrelated and its name sorts first.
        """
        if isinstance(other, MappedPropertyClass):
            return False
        return issubclass(cls, other) or (
            (not issubclass(other, cls))
            and (cls.__name__ < other.__name__))

    def register(cls):
        """Sets up the object graph related to this class

        :meth:`register` never touches the RDF graph itself.

        Also registers the properties of this DataObject
        """
        cls._du = DataUser()
        cls.children = []
        MappedClasses[cls.__name__] = cls
        DataObjectsParents[cls.__name__] = \
            [x for x in cls.__bases__ if isinstance(x, MappedClass)]
        cls.parents = DataObjectsParents[cls.__name__]
        for c in cls.parents:
            c.add_child(cls)
        cls.addProperties('objectProperties')
        cls.addProperties('datatypeProperties')
        cls.addProperties('_')
        if getattr(Y, cls.__name__, False):
            # The yarom module already has a (truthy) attribute with this
            # name.
            # NOTE(review): warnMismapping, as defined in this module, raises
            # after logging, so the "_"-prefixed fallback below does not
            # appear to be reachable -- confirm whether that is intended.
            new_name = "_" + cls.__name__
            warnMismapping(
                'yarom module',
                cls.__name__,
                "nothing",
                getattr(
                    Y,
                    cls.__name__))
            if getattr(Y, new_name, False):
                L.warning(
                    "Still unable to add {0} to {1}. {0} will not be accessible through {1}".format(
                        new_name,
                        'yarom module'))
            else:
                setattr(Y, new_name, cls)
        else:
            setattr(Y, cls.__name__, cls)

        return cls

    def deregister(cls):
        """Removes the class from the object graph.

        Should make it possible to garbage collect

        :meth:`deregister` never touches the RDF graph itself.
        """
        name = cls.__name__
        hidden_name = "_" + name
        # Look the attributes up with a default: the class may never have
        # been attached to the yarom module, and that must not abort the
        # rest of the clean-up.
        if getattr(Y, name, None) is cls:
            delattr(Y, name)
        elif getattr(Y, hidden_name, None) is cls:
            delattr(Y, hidden_name)

        MappedClasses.pop(name, None)
        DataObjectsParents.pop(name, None)

        for c in cls.parents:
            c.remove_child(cls)

    def remove_child(cls, child):
        """Remove ``child`` from this class's ``children`` list.

        Raises ``Exception`` when this class has no ``children`` attribute.
        """
        if hasattr(cls, 'children'):
            cls.children.remove(child)
        else:
            raise Exception(
                "Cannot remove child {0} from {1} as {1} has not yet been registered".format(
                    child,
                    cls))

    def add_child(cls, child):
        """Append ``child`` to this class's ``children`` list.

        Raises ``Exception`` when this class has no ``children`` attribute.
        """
        if hasattr(cls, 'children'):
            cls.children.append(child)
        else:
            raise Exception(
                "Cannot add child {0} to {1} as {1} has not yet been registered".format(
                    child,
                    cls))

    def map(cls):
        """
        Maps the class to the configured rdf graph.

        Creates the class's ``rdf_type_object``, records the class in
        ``RDFTypeTable`` under its ``rdf_type``, relates it to its ancestors
        and binds its namespace. Returns the class.
        """
        # NOTE: Map should be quick: it runs for every DataObject sub-class created and possibly
        #       several times in testing
        from .dataObject import TypeDataObject
        cls.rdf_type_object = TypeDataObject(ident=cls.rdf_type)

        RDFTypeTable[cls.rdf_type] = cls

        cls._add_parents_to_graph()
        cls._add_namespace_to_manager()

        return cls

    def unmap(cls):
        """
        Unmaps the class

        Currently a placeholder: the method has no body beyond this
        docstring and returns ``None``.
        """

    def _remove_namespace_from_manager(cls):
        """Placeholder; currently does nothing."""
        pass
        #cls.du['rdf.namespace_manager'].bind(cls.__name__, "")

    def _add_namespace_to_manager(cls):
        """Bind the class name to ``rdf_namespace`` in the configured namespace manager."""
        cls.du['rdf.namespace_manager'].bind(
            cls.__name__,
            cls.rdf_namespace,
            replace=True)

    def _add_parents_to_graph(cls):
        """Relate this class's type object to each DataObject ancestor via ``rdfs_subClassOf``."""
        from .dataObject import RDFSSubClassOfProperty, DataObject
        for parent in cls.parents:
            # Every DataObject-derived class in the parent's MRO, the parent
            # itself included.
            ancestors = (x for x in parent.mro() if issubclass(x, DataObject))
            for ancestor in ancestors:
                cls.rdf_type_object.relate(
                    'rdfs_subClassOf',
                    ancestor.rdf_type_object,
                    RDFSSubClassOfProperty)

    def addProperties(cls, listName):
        """Create the property classes described by the list ``cls.<listName>``.

        Each entry may be a tuple (name first, value type as optional third
        item), a dict (either ``{'prop': <registered property name>}`` or
        ``{'name': ..., 'type': ..., <extra options>}``), or anything else,
        which is used directly as the property name. The processed list is
        kept as ``_<listName>`` and ``<listName>`` is reset to ``[]``.

        Failures are printed with :func:`traceback.print_exc` rather than
        propagated.
        """
        # TODO: Make an option string to abbreviate these options
        try:
            if hasattr(cls, listName):
                propList = getattr(cls, listName)
                propType = ProplistToProptype[listName]

                assert(isinstance(propList, (tuple, list, set)))
                for x in propList:
                    value_type = None
                    if isinstance(x, tuple):
                        if len(x) > 2:
                            value_type = x[2]
                        p = _create_property(
                            cls,
                            x[0],
                            propType,
                            value_type=value_type)
                    elif isinstance(x, dict):
                        if 'prop' in x:
                            p = DataObjectProperties[x['prop']]
                        else:
                            # Work on a copy so the caller's specification
                            # (also kept as `_<listName>`) stays intact.
                            options = dict(x)
                            name = options.pop('name')
                            value_type = options.pop('type', None)
                            p = _create_property(
                                cls,
                                name,
                                propType,
                                value_type=value_type,
                                **options)
                    else:
                        p = _create_property(cls, x, propType)
                    cls.dataObjectProperties.append(p)
                setattr(cls, '_' + listName, propList)
                setattr(cls, listName, [])
        except Exception:
            # Best effort: report the problem but let class creation go on.
            # KeyboardInterrupt/SystemExit are deliberately not caught.
            traceback.print_exc()

    def _cleanupGraph(cls):
        """ Cleans up the graph by removing statements that can't be connected to typed statement. """
        # XXX: This might belong in DataUser instead
        q = ("DELETE { ?b ?x ?y }\n"
             "WHERE\n"
             "{\n"
             "?b ?x ?y .\n"
             "FILTER (NOT EXISTS { ?b rdf:type ?c } ) .\n"
             "}")
        cls.du.rdf.update(q)
def warnMismapping(mapping, key, should_be, is_actually=None):
    """Log a mismapping warning and raise.

    Parameters
    ----------
    mapping
        The mapping (or a label describing it) in which the mismatch was found
    key
        The key that is mapped to the wrong value
    should_be
        The value that was expected
    is_actually
        The value actually found. When ``None``, it is looked up as
        ``mapping[key]`` if that is possible.

    Raises
    ------
    Exception
        Always, with the message ``"Mismapping"``, after the warning is logged
    """
    if is_actually is None:
        try:
            is_actually = mapping[key]
        except (KeyError, IndexError, TypeError):
            # No such entry, or `mapping` is not subscriptable by `key`
            # (e.g., a list of names or a plain label). The warning below is
            # still worth emitting, so report the value as None.
            is_actually = None
    L.warning(
        "Mismapping of {} in {}. Is {}. Should be {}".format(
            key,
            mapping,
            is_actually,
            should_be))
    raise Exception("Mismapping")
class MappedPropertyClass(type):

    """Metaclass for property classes.

    Each class created with it gets a ``link`` and is registered in the
    module-level tables as soon as it is defined.
    """

    def __init__(cls, name, bases, dct):
        """Initialize a newly defined property class and register it.

        ``link`` is taken from the class body (``dct``) when present;
        otherwise it is derived from ``cls.conf['rdf.namespace']`` and the
        class name.
        """
        super(MappedPropertyClass, cls).__init__(name, bases, dct)
        if 'link' not in dct:
            cls.link = cls.conf['rdf.namespace'][cls.__name__]
        else:
            cls.link = dct['link']
        cls.register()

    def register(cls):
        """Record the class in the module tables and on the yarom module; return it."""
        # This is how we create the RDF predicate that points from the owner
        # to this property
        MappedClasses[cls.__name__] = cls
        DataObjectProperties[cls.__name__] = cls
        setattr(Y, cls.__name__, cls)

        return cls

    def deregister(cls):
        """Remove the class from the module tables and from the yarom module.

        Any entry that does not refer to this class is reported through
        ``warnMismapping``. NOTE(review): ``warnMismapping``, as defined in
        this module, raises after logging, so the first mismatch ends the
        clean-up -- confirm that this is intended.

        Returns the class.
        """
        name = cls.__name__

        # The found value is passed explicitly ("nothing" when absent) so
        # that the warning never has to look the key up itself.
        if MappedClasses.get(name) is cls:
            del MappedClasses[name]
        else:
            warnMismapping(
                MappedClasses,
                name,
                cls,
                MappedClasses.get(name, "nothing"))

        if DataObjectProperties.get(name) is cls:
            del DataObjectProperties[name]
        else:
            warnMismapping(
                DataObjectProperties,
                name,
                cls,
                DataObjectProperties.get(name, "nothing"))

        if getattr(Y, name, None) is cls:
            # Deregistering must drop the attribute, not set it again.
            delattr(Y, name)
        else:
            warnMismapping(
                'yarom module',
                name,
                cls,
                getattr(Y, name, "nothing"))

        return cls

    def map(cls):
        """Create the property's ``rdf_object`` and relate it to its owner and value types.

        The ``rdfs_domain`` / ``rdfs_range`` relations are only added when
        the class has an ``owner_type`` / ``value_type`` attribute.
        """
        from .dataObject import PropertyDataObject, RDFSDomainProperty, RDFSRangeProperty
        cls.rdf_object = PropertyDataObject(ident=cls.link)
        if hasattr(cls, 'owner_type'):
            cls.rdf_object.relate(
                'rdfs_domain',
                cls.owner_type.rdf_type_object,
                RDFSDomainProperty)

        if hasattr(cls, 'value_type'):
            cls.rdf_object.relate(
                'rdfs_range',
                cls.value_type.rdf_type_object,
                RDFSRangeProperty)

    def __lt__(cls, other):
        """Ordering used when sorting registered classes.

        A property class is less than ``other`` when it is a subclass of
        ``other``, when ``other`` is a ``MappedClass``, or when the two are
        unrelated and its name sorts first.
        """
        return issubclass(cls, other) \
            or isinstance(other, MappedClass) \
            or ((not issubclass(other, cls))
                and (cls.__name__ < other.__name__))
def remap():
    """ Calls `map` on all of the registered classes

    The classes are visited in the reverse of their sorted order; the
    ordering itself comes from the ``__lt__`` of their metaclasses.
    """
    ordered = sorted(MappedClasses.values())
    for mapped in reversed(ordered):
        mapped.map()
def unmap_all():
    """Call ``unmap`` on every registered class that provides it.

    ``MappedClasses`` maps class names to classes, so the classes are its
    values. Property classes are stored in the same table and do not get an
    ``unmap`` from their metaclass, so entries without one are skipped.
    """
    # Iterate over a snapshot in case unmapping changes the registry.
    for mapped in list(MappedClasses.values()):
        unmap = getattr(mapped, 'unmap', None)
        if unmap is not None:
            unmap()
def deregister_all():
    """Call ``deregister`` on every class currently in ``MappedClasses``."""
    # Snapshot the names first: deregistering removes entries from the table.
    for class_name in list(MappedClasses):
        MappedClasses[class_name].deregister()
def resolve_classes_from_rdf(graph):
    """ Gathers Python classes from the RDF graph.

    Every subject that is, transitively, an ``rdfs:subClassOf`` the
    DataObject rdf type is passed to :func:`resolve_class`, which looks the
    class up or tries to load it from a module registered for it.
    """
    # Walk the transitive subclasses of the DataObject class resource and
    # resolve each one into a Python class.
    find_subjects = graph.transitive_subjects
    subclass_of = R.RDFS['subClassOf']
    root_type = Y.DataObject.rdf_type
    for class_uri in find_subjects(subclass_of, root_type):
        L.debug("RESOLVING {}".format(class_uri))
        resolve_class(class_uri)
def resolve_class(uri):
    """Return the Python class for the rdf type ``uri``.

    A class already present in ``RDFTypeTable`` is returned directly.
    Otherwise the class registry is queried for ``uri`` and the first class
    description found is loaded. Returns ``None`` when the registry yields
    no description.
    """
    from .classRegistry import RegistryEntry

    # Cache hit: the class has been mapped already.
    if uri in RDFTypeTable:
        return RDFTypeTable[uri]

    # Cache miss: attempt to load the class by reading the RDF graph.
    entry = RegistryEntry()
    entry.rdfClass(uri)
    for description in get_class_descriptions(entry):
        # TODO: if load fails, attempt to construct the class
        return load_class_from_description(description)
def load_class_from_description(cd):
    """Import the module named by ``cd`` and return the described class.

    The module name and class name are read with ``cd.moduleName.one()`` and
    ``cd.className.one()``. After the module is loaded, the class is looked
    up by name on the yarom module.

    Raises
    ------
    Exception
        If the yarom module has no usable attribute with the class name
        after the module was loaded
    """
    # TODO: Undo the effects to YAROM of loading a class when the
    #       module doesn't, in fact, have the class being searched
    #       for.
    mod_name = cd.moduleName.one()
    class_name = cd.className.one()
    load_module(mod_name)

    # A missing attribute and a None attribute both mean the class could not
    # be found; report them the same way instead of returning None silently.
    cls = getattr(Y, class_name, None)
    if cls is None:
        raise Exception("Cannot find class " + class_name)
    return cls
def get_class_descriptions(re):
    """Return a flat list of everything yielded by ``pythonClass()`` on each item of ``re.load()``."""
    return [description
            for entry in re.load()
            for description in entry.pythonClass()]
def load_module(module_name):
    """Import ``module_name``, re-map all registered classes and return the module."""
    module = I.import_module(module_name)
    remap()
    return module
def reload_module(mod):
    """Reload ``mod``, re-map all registered classes and return the result of the reload."""
    # Aliased so the imported helper is not confused with this function.
    from six.moves import reload_module as _reload
    reloaded = _reload(mod)
    remap()
    return reloaded
def oid(identifier_or_rdf_type, rdf_type=False):
    """ Create an object from its rdf type

    Parameters
    ----------
    identifier_or_rdf_type : :class:`str` or :class:`rdflib.term.URIRef`
        With `rdf_type` given, this is the identifier of the new object.
        Without it, this value is itself the :attr:`rdf_type` used to pick
        the Python type, and the object is created with ``generate_key=True``
        instead of an identifier.
    rdf_type : :class:`str`, :class:`rdflib.term.URIRef`, :const:`False`
        If provided, this will be the :attr:`rdf_type` of the newly created object.

    Returns
    -------
    The newly created object

    Raises
    ------
    KeyError
        If no class is recorded in ``RDFTypeTable`` for the rdf type
    """
    if rdf_type:
        identifier = identifier_or_rdf_type
    else:
        # Only one value was given: it is the type, and there is no identifier.
        rdf_type = identifier_or_rdf_type
        identifier = False

    L.debug("oid making a {} with ident {}".format(rdf_type, identifier))
    target_class = RDFTypeTable[rdf_type]

    if identifier:
        return target_class(ident=identifier)
    return target_class(generate_key=True)
def _slice_dict(d, s):
return {k: v for k, v in d.items() if k in s}
def _create_property(
        owner_type,
        linkName,
        property_type,
        value_type=None,
        multiple=True,
        link=None):
    """Create the property class ``<owner class name>_<linkName>``.

    ``property_type`` selects the base class: ``'ObjectProperty'``,
    ``'DatatypeProperty'``, or anything else for a union property.
    ``value_type`` is only used for object properties, where it defaults to
    the yarom ``DataObject``. ``link`` defaults to ``linkName`` looked up in
    the owner's ``rdf_namespace``.

    Returns the new class, created with ``MappedPropertyClass``.
    """
    # XXX This should actually get called for all of the properties when their owner
    #     classes are defined.
    #     The initialization, however, must happen with the owner object's
    #     creation
    from .simpleProperty import ObjectProperty, DatatypeProperty, UnionProperty

    # Class attributes of the property class being built.
    attributes = {
        'owner_type': owner_type,
        'linkName': linkName,
        'multiple': multiple,
    }
    class_name = owner_type.__name__ + "_" + linkName

    if property_type == 'ObjectProperty':
        base = ObjectProperty
        if value_type is None:
            value_type = Y.DataObject
        attributes['value_type'] = value_type
        attributes['value_rdf_type'] = value_type.rdf_type
    elif property_type == 'DatatypeProperty':
        base = DatatypeProperty
    else:
        base = UnionProperty

    attributes['link'] = (
        link if link is not None else owner_type.rdf_namespace[linkName])

    return MappedPropertyClass(class_name, (base,), attributes)
def get_most_specific_rdf_type(types):
    """ Gets the most specific rdf_type.

    Returns the URI corresponding to the lowest in the DataObject class hierarchy
    from among the given URIs. URIs with no entry in ``RDFTypeTable`` are
    ignored; with no usable URI the ``DataObject`` rdf type is returned.
    """
    from .dataObject import DataObject

    most_specific = DataObject
    for uri in types:
        try:
            candidate = RDFTypeTable[uri]
            is_more_specific = candidate < most_specific
        except KeyError:
            # Unknown type: nothing to compare against.
            continue
        if is_more_specific:
            most_specific = candidate
    return most_specific.rdf_type
class Resolver(RDFTypeResolver):

    """Holder for a shared, lazily created ``RDFTypeResolver``."""

    # The shared resolver; created by the first call to `get_instance`.
    instance = None

    @classmethod
    def get_instance(cls):
        """Return the shared resolver, creating it on first use."""
        from .dataObject import DataObject
        from .rdfUtils import deserialize_rdflib_term

        if cls.instance is not None:
            return cls.instance

        cls.instance = RDFTypeResolver(
            DataObject.rdf_type,
            get_most_specific_rdf_type,
            oid,
            deserialize_rdflib_term)
        return cls.instance