"""Association Registry"""
from importlib import import_module
from inspect import (
getmembers,
isclass,
isfunction,
ismethod,
ismodule
)
import logging
from os.path import (
basename,
dirname,
expanduser,
expandvars,
)
import sys
from . import libpath
from .exceptions import (
AssociationError,
AssociationNotValidError
)
from .lib.callback_registry import CallbackRegistry
from .lib.constraint import ConstraintTrue
# Public names of this module, i.e. what a star-import of it exposes.
__all__ = [
'AssociationRegistry',
'RegistryMarker'
]
# Configure logging: a NullHandler is attached to this module's logger;
# it discards records, so output only appears once the application
# installs its own handlers.
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
# Library files
# File name of the default rule definitions. `AssociationRegistry.__init__`
# resolves it through `libpath` when `include_default` is True.
_ASN_RULE = 'association_rules.py'
class AssociationRegistry(dict):
    """The available associations

    The registry is a dictionary mapping rule names to rule classes.

    Parameters
    ----------
    definition_files: [str,]
        The files to find the association definitions in.
        The given sequence is copied; it is never modified.

    include_default: bool
        True to include the default definitions.

    global_constraints: Constraint
        Constraints to be added to each rule.

    name: str
        An identifying string, used to prefix rule names.

    include_bases: bool
        If True, include base classes not considered
        rules.

    Attributes
    ----------
    rule_set: {rule [, ...]}
        The rules in the registry.

    Methods
    -------
    match(item)
        Return associations where `item` matches any of the rules.

    validate(association)
        Determine whether an association is valid, or complete,
        according to any of the rules in the registry.

    finalize(associations)
        Validate and execute post-processing hooks to
        produce a completed and valid set of associations.

    load(serialized)
        Create an association from a serialized form.

    Notes
    -----
    The general workflow is as follows:

        * Create the registry
            >>> registry = AssociationRegistry()

        * Create associations from an item
            >>> associations, reprocess = registry.match(item)

        * Finalize the associations
            >>> final_asns = registry.finalize(associations)

    In practice, this is one step in a larger loop over all items to
    be associated. This does not account for adding items to already
    existing associations. See :ref:`generate` for a full example of
    using the registry.
    """
    # NOTE(review): `finalize` is listed in the docstring but no such
    # method is defined in this class as written; confirm whether
    # finalization is expected to go through `self.callback` instead.

    def __init__(self,
                 definition_files=None,
                 include_default=True,
                 global_constraints=None,
                 name=None,
                 include_bases=False):
        super(AssociationRegistry, self).__init__()

        # Optional identifier. `add_rule` uses it to prefix rule names.
        self.name = name

        # Callback registry
        self.callback = CallbackRegistry()

        # Precache the set of rules
        self._rule_set = set()

        # Work on a private copy: the default definitions are inserted
        # below and the caller's list must not be changed by that.
        if definition_files is None:
            definition_files = []
        else:
            definition_files = list(definition_files)
        if include_default:
            definition_files.insert(0, libpath(_ASN_RULE))
        if not definition_files:
            raise AssociationError('No rule definition files specified.')

        self.schemas = []
        # Starts as an empty class; `populate` folds every marked
        # utility class into it through inheritance.
        self.Utility = type('Utility', (object,), {})
        for fname in definition_files:
            module = import_from_file(fname)
            self.populate(
                module,
                global_constraints=global_constraints,
                include_bases=include_bases
            )

    @property
    def rule_set(self):
        """The set of rule classes added to the registry"""
        return self._rule_set

    def match(self, item, version_id=None, allow=None, ignore=None):
        """See if item belongs to any of the associations defined.

        Parameters
        ----------
        item: dict
            An item, like from a Pool, to find associations for.

        version_id: str
            If specified, a string appended to association names.
            If None, nothing is used.

        allow: [type(Association), ...]
            List of rules to allow to be matched. If None, all
            available rules will be used.

        ignore: list
            A list of associations to ignore when looking for a match.
            Intended to ensure that already created associations
            are not re-created.

        Returns
        -------
        (associations, reprocess_list): 2-tuple
            associations: [association,...]
                List of associations item belongs to. Empty if none match
            reprocess_list: [AssociationReprocess, ...]
                List of reprocess events.
        """
        if allow is None:
            allow = self.rule_set
        if ignore is None:
            ignore = []
        associations = []
        process_list = []
        for rule in self.values():
            if rule in ignore or rule not in allow:
                continue
            asn, reprocess = rule.create(item, version_id)
            # Reprocess events are collected even when the rule did
            # not produce an association.
            process_list.extend(reprocess)
            if asn is not None:
                associations.append(asn)
        return associations, process_list

    def validate(self, association):
        """Validate a given association against schema

        Parameters
        ----------
        association: association-like
            The data to validate

        Returns
        -------
        rules: list
            List of rules that validated

        Raises
        ------
        AssociationNotValidError
            Association did not validate
        """

        # Change rule validation from an exception
        # to a boolean
        def is_valid(rule, association):
            try:
                rule.validate(association)
            except AssociationNotValidError:
                return False
            else:
                return True

        results = [
            rule
            for rule in self.values()
            if is_valid(rule, association)
        ]
        if not results:
            raise AssociationNotValidError(
                'Structure did not validate: "{}"'.format(association)
            )
        return results

    def load(
            self,
            serialized,
            format=None,
            validate=True,
            first=True,
            **kwargs
    ):
        """Marshall a previously serialized association

        Parameters
        ----------
        serialized: object
            The serialized form of the association.

        format: str or None
            The format to force. If None, try all available.

        validate: bool
            Validate against the class' defined schema, if any.

        first: bool
            A serialization potentially matches many rules.
            Only return the first successful load.

        kwargs: dict
            Other arguments to pass to the `load` method

        Returns
        -------
        The Association object, or the list of association objects.

        Raises
        ------
        AssociationError
            Cannot create or validate the association.
            Also raised when the registry holds no rules at all.
        AttributeError
            Re-raised if that was the last failure and no rule
            could load the serialization.
        """
        results = []
        lasterr = None
        for rule in self.values():
            try:
                results.append(
                    rule.load(
                        serialized,
                        format=format,
                        validate=validate,
                        **kwargs
                    )
                )
            except (AssociationError, AttributeError) as err:
                # Remember the failure; another rule may still succeed.
                lasterr = err
                continue
            if first:
                break

        if not results:
            if lasterr is None:
                # No rule was tried at all, so there is no underlying
                # error to re-raise.
                raise AssociationError(
                    'Cannot load association: registry contains no rules.'
                )
            raise lasterr
        if first:
            return results[0]
        return results

    def populate(self,
                 module,
                 global_constraints=None,
                 include_bases=None
                 ):
        """Parse out all rules and callbacks in a module

        Parameters
        ----------
        module: module
            The module to be parsed. Objects are located using
            `get_marked`.

        global_constraints: Constraint
            Constraints passed on to every rule that is added.

        include_bases: bool
            If True, every class found is added as a rule,
            whatever role it was marked with.

        Modifies
        --------
        self
            Found rules are added through `add_rule`

        self.callback
            Found callbacks are added to the callback registry

        self.schemas
            Found schemas are appended

        self.Utility
            Replaced by a class that also inherits from each
            utility class found.
        """
        for name, obj in get_marked(module, include_bases=include_bases):

            # Add rules.
            if (include_bases and isclass(obj)) or \
                    obj._asnreg_role == 'rule':
                self.add_rule(name, obj, global_constraints=global_constraints)
                continue

            # Add callbacks
            if obj._asnreg_role == 'callback':
                for event in obj._asnreg_events:
                    self.callback.add(event, obj)
                continue

            # Add schema
            if obj._asnreg_role == 'schema':
                self.schemas.append(obj._asnreg_schema)
                continue

            # Add utility classes
            if obj._asnreg_role == 'utility':
                self.Utility = type(
                    'Utility',
                    (obj, self.Utility),
                    {}
                )

    def add_rule(self, name, obj, global_constraints=None):
        """Add object as rule to registry

        A new class deriving from `obj` is created and stored, so
        `obj` itself is not modified.

        Parameters
        ----------
        name: str
            Name of the object

        obj: object
            The object to be considered a rule

        global_constraints: dict
            The global constraints to attach to the rule.
        """
        try:
            rule_name = '_'.join([self.name, name])
        except TypeError:
            # `self.name` is not a string, e.g. None: no prefix.
            rule_name = name
        rule = type(rule_name, (obj,), {})
        rule.GLOBAL_CONSTRAINT = global_constraints
        rule.registry = self
        self[rule_name] = rule
        self._rule_set.add(rule)
class RegistryMarker:
    """Mark rules, callbacks, and module

    All markers work by setting attributes on the object:
    `_asnreg_marked` flags the object as belonging to a registry and
    `_asnreg_role` names the role it plays.
    """

    class Schema:
        """Marked wrapper around a schema source

        Parameters
        ----------
        obj: object
            The schema source, stored as `_asnreg_schema`.
        """

        def __init__(self, obj):
            self._asnreg_role = 'schema'
            self._asnreg_schema = obj
            RegistryMarker.mark(self)

        @property
        def schema(self):
            """The wrapped schema source"""
            return self._asnreg_schema

    @staticmethod
    def mark(obj):
        """Mark that object should be part of the registry

        Parameters
        ----------
        obj: object
            The object to mark

        Modifies
        --------
        _asnreg_marked: True
            Attribute added to object and is set to True

        _asnreg_role: str or None
            Attribute added to object indicating role this object plays.
            An existing value is kept. If None, no particular role
            is indicated.

        Returns
        -------
        obj: object
            Return object to enable use as a decorator.
        """
        obj._asnreg_marked = True
        obj._asnreg_role = getattr(obj, '_asnreg_role', None)
        return obj

    @staticmethod
    def rule(obj):
        """Mark object as rule

        Parameters
        ----------
        obj: object
            The object that should be treated as a rule

        Modifies
        --------
        _asnreg_role: 'rule'
            Attribute added to object and set to `rule`

        _asnreg_marked: True
            Attribute added to object and set to True

        Returns
        -------
        obj: object
            Return object to enable use as a decorator.
        """
        obj._asnreg_role = 'rule'
        RegistryMarker.mark(obj)
        return obj

    @staticmethod
    def callback(event):
        """Mark object as a callback for an event

        This is a decorator factory: call it with the event and apply
        the result to the function, or any callable, that is to be
        called when the corresponding event is triggered. It may be
        applied repeatedly to register one callable for several events.

        Parameters
        ----------
        event: str
            Event this is a callback for.

        Modifies
        --------
        _asnreg_role: 'callback'
            Attribute added to object and set to `callback`

        _asnreg_events: [event[, ...]]
            The events this callable object is a callback for.

        _asnreg_marked: True
            Attribute added to object and set to True

        Returns
        -------
        decorator: func
            Decorator that marks the callable it is given and
            returns that same callable.
        """
        def decorator(func):
            # Extend the events of an already decorated callable
            # rather than discarding them.
            events = list(getattr(func, '_asnreg_events', []))
            events.append(event)
            RegistryMarker.mark(func)
            func._asnreg_role = 'callback'
            func._asnreg_events = events
            return func
        return decorator

    @staticmethod
    def schema(filename):
        """Mark a file as a schema source

        Parameters
        ----------
        filename: object
            The schema source to wrap.

        Returns
        -------
        schema: RegistryMarker.Schema
            The marked wrapper holding `filename`.
        """
        return RegistryMarker.Schema(filename)

    @staticmethod
    def utility(class_obj):
        """Mark the class as a Utility class

        Parameters
        ----------
        class_obj: class
            The class to mark.

        Modifies
        --------
        _asnreg_role: 'utility'
            Attribute added to object and set to `utility`

        _asnreg_marked: True
            Attribute added to object and set to True

        Returns
        -------
        class_obj: class
            Return object to enable use as a decorator.
        """
        class_obj._asnreg_role = 'utility'
        RegistryMarker.mark(class_obj)
        return class_obj

    @staticmethod
    def is_marked(obj):
        """Determine whether the object has been marked

        Parameters
        ----------
        obj: object
            The object to check

        Returns
        -------
        bool
            True if the object has the `_asnreg_marked` attribute.
        """
        return hasattr(obj, '_asnreg_marked')
# Utilities
def import_from_file(filename):
    """Import a file as a module

    The folder containing the file is temporarily placed at the front
    of `sys.path`; the module name is the file's base name up to the
    first period.

    Parameters
    ----------
    filename: str
        The file to import. `~` and environment variables are expanded.

    Returns
    -------
    module: python module
        The imported module

    Raises
    ------
    ImportError
        Propagated from `import_module` if the import fails.
    """
    path = expandvars(expanduser(filename))
    module_name = basename(path).split('.')[0]
    folder = dirname(path)
    sys.path.insert(0, folder)
    try:
        module = import_module(module_name)
    finally:
        # Remove the entry by value, not by position: the imported
        # module may itself have changed `sys.path`, in which case
        # index 0 is no longer the folder inserted above.
        try:
            sys.path.remove(folder)
        except ValueError:
            pass
    return module
def get_marked(module, predicate=None, include_bases=False):
    """Recursively get all marked objects

    Parameters
    ----------
    module: python module
        The module to examine. Classes are passed in as well when
        their members are searched.

    predicate: bool func(object)
        Determinant of what gets examined; handed to `getmembers`.
        If None, all object types are examined

    include_bases: bool
        If True, include base classes not considered
        rules, i.e. yield classes even when they are not marked.

    Returns
    -------
    generator
        A generator that yields `(name, object)` pairs:
        for every class, its marked functions and methods followed
        by the class itself if it is marked or `include_bases` is set;
        for every marked module, whatever this function finds in it;
        and every other marked object.
    """
    def is_method(obj):
        return (isfunction(obj) or
                ismethod(obj)
                )

    for name, obj in getmembers(module, predicate):
        if isclass(obj):
            # Members of a class are searched whether or not the
            # class itself is marked.
            for sub_name, sub_obj in get_marked(obj, predicate=is_method):
                yield sub_name, sub_obj
            if RegistryMarker.is_marked(obj) or include_bases:
                yield name, obj
        elif RegistryMarker.is_marked(obj):
            if ismodule(obj):
                # Only marked modules are descended into.
                for sub_name, sub_obj in get_marked(
                        obj, predicate=predicate, include_bases=include_bases
                ):
                    yield sub_name, sub_obj
            else:
                yield name, obj
# ##########
# Unit Tests
# ##########
def test_import_from_file():
    """Check that a failing import leaves `sys.path` as it was

    The path of a temporary file is handed to `import_from_file`;
    the import is expected to raise ImportError.
    """
    from copy import deepcopy
    from pytest import raises as pytest_raises
    from tempfile import NamedTemporaryFile

    current_path = deepcopy(sys.path)
    with NamedTemporaryFile() as junk_fh:
        junk_path = junk_fh.name
        with pytest_raises(ImportError):
            import_from_file(junk_path)
        # The folder added for the import attempt must be gone again.
        assert current_path == sys.path