Source code for jwst.associations.registry

"""Association Registry"""
from importlib import import_module
from inspect import (
    getmembers,
    isclass,
    isfunction,
    ismethod,
    ismodule
)
import logging
from os.path import (
    basename,
    dirname,
    expanduser,
    expandvars,
)
import sys

from . import libpath
from .exceptions import (
    AssociationError,
    AssociationNotValidError
)
from .lib.callback_registry import CallbackRegistry
from .lib.constraint import ConstraintTrue

__all__ = [
    'AssociationRegistry',
    'RegistryMarker'
]

# Configure logging
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

# Library files
_ASN_RULE = 'association_rules.py'


[docs]class AssociationRegistry(dict): """The available assocations Parameters ---------- definition_files: [str,] The files to find the association definitions in. include_default: bool True to include the default definitions. global_constraints: Constraint Constraints to be added to each rule. name: str An identifying string, used to prefix rule names. include_bases: bool If True, include base classes not considered rules. Attributes ---------- rule_set: {rule [, ...]} The rules in the registry. Methods ------- match(item) Return associations where `item` matches any of the rules. validate(association) Determine whether an association is valid, or complete, according to any of the rules in the registry. finalize(associations) Validate and execute post-processing hooks to produce a completed and valid set of associations. load(serialized) Create an association from a serialized form. Notes ----- The general workflow is as follows: * Create the registry >>> registry = AssociationRegistry() * Create associations from an item >>> associations, reprocess = registry.match(item) * Finalize the associations >>> final_asns = registry.finalize(assocations) In practice, this is one step in a larger loop over all items to be associated. This does not account for adding items to already existing associations. See :ref:`generate` for a full example of using the registry. """ def __init__(self, definition_files=None, include_default=True, global_constraints=None, name=None, include_bases=False): super(AssociationRegistry, self).__init__() # Generate a UUID for this instance. Used to modify rule # names. self.name = name # Callback registry self.callback = CallbackRegistry() # Precache the set of rules self._rule_set = set() if definition_files is None: definition_files = [] if include_default: definition_files.insert(0, libpath(_ASN_RULE)) if len(definition_files) <= 0: raise AssociationError('No rule definition files specified.') self.schemas = [] self.Utility = type('Utility', (object,), {}) for fname in definition_files: module = import_from_file(fname) self.populate( module, global_constraints=global_constraints, include_bases=include_bases ) @property def rule_set(self): return self._rule_set
[docs] def match(self, item, version_id=None, allow=None, ignore=None): """See if item belongs to any of the associations defined. Parameters ---------- item: dict A item, like from a Pool, to find assocations for. version_id: str If specified, a string appened to association names. If None, nothing is used. allow: [type(Association), ...] List of rules to allow to be matched. If None, all available rules will be used. ignore: list A list of associations to ignore when looking for a match. Intended to ensure that already created associations are not re-created. Returns ------- (associations, reprocess_list): 2-tuple associations: [association,...] List of associations item belongs to. Empty if none match reprocess_list: [AssociationReprocess, ...] List of reprocess events. """ if allow is None: allow = self.rule_set if ignore is None: ignore = [] associations = [] process_list = [] for name, rule in self.items(): if rule not in ignore and rule in allow: asn, reprocess = rule.create(item, version_id) process_list.extend(reprocess) if asn is not None: associations.append(asn) return associations, process_list
[docs] def validate(self, association): """Validate a given association against schema Parameters ---------- association: association-like The data to validate Returns ------- rules: list List of rules that validated Raises ------ AssociationNotValidError Association did not validate """ # Change rule validation from an exception # to a boolean def is_valid(rule, association): try: rule.validate(association) except AssociationNotValidError: return False else: return True results = [ rule for rule_name, rule in self.items() if is_valid(rule, association) ] if len(results) == 0: raise AssociationNotValidError( 'Structure did not validate: "{}"'.format(association) ) return results
[docs] def load( self, serialized, format=None, validate=True, first=True, **kwargs ): """Marshall a previously serialized association Parameters ---------- serialized: object The serialized form of the association. format: str or None The format to force. If None, try all available. validate: bool Validate against the class' defined schema, if any. first: bool A serialization potentially matches many rules. Only return the first succesful load. kwargs: dict Other arguments to pass to the `load` method Returns ------- The Association object, or the list of association objects. Raises ------ AssociationError Cannot create or validate the association. """ results = [] for rule_name, rule in self.items(): try: results.append( rule.load( serialized, format=format, validate=validate, **kwargs ) ) except (AssociationError, AttributeError) as err: lasterr = err continue if first: break if len(results) == 0: raise lasterr if first: return results[0] else: return results
[docs] def populate(self, module, global_constraints=None, include_bases=None ): """Parse out all rules and callbacks in a module Parameters ---------- module: module The module, and all submodules, to be parsed. Modifies -------- self.callback Found callbacks are added to the callback registry """ for name, obj in get_marked(module, include_bases=include_bases): # Add rules. if (include_bases and isclass(obj)) or\ obj._asnreg_role == 'rule': self.add_rule(name, obj, global_constraints=global_constraints) continue # Add callbacks if obj._asnreg_role == 'callback': for event in obj._asnreg_events: self.callback.add(event, obj) continue # Add schema if obj._asnreg_role == 'schema': self.schemas.append(obj._asnreg_schema) continue # Add utility classes if obj._asnreg_role == 'utility': self.Utility = type( 'Utility', (obj, self.Utility), {} )
[docs] def add_rule(self, name, obj, global_constraints=None): """Add object as rule to registry Parameters ---------- name: str Name of the object obj: object The object to be considered a rule global_constraints: dict The global constraints to attach to the rule. """ try: rule_name = '_'.join([self.name, name]) except TypeError: rule_name = name rule = type(rule_name, (obj,), {}) rule.GLOBAL_CONSTRAINT = global_constraints rule.registry = self self.__setitem__(rule_name, rule) self._rule_set.add(rule)
[docs]class RegistryMarker: """Mark rules, callbacks, and module""" class Schema: def __init__(self, obj): self._asnreg_role = 'schema' self._asnreg_schema = obj RegistryMarker.mark(self) @property def schema(self): return self._asnreg_schema
[docs] @staticmethod def mark(obj): """Mark that object should be part of the registry Parameters ---------- obj: object The object to mark Modifies -------- _asnreg_mark: True Attribute added to object and is set to True _asnreg_role: str or None Attribute added to object indicating role this object plays. If None, no particular role is indicated. Returns ------- obj: object Return object to enable use as a decorator. """ obj._asnreg_marked = True obj._asnreg_role = getattr(obj, '_asnreg_role', None) return obj
[docs] @staticmethod def rule(obj): """Mark object as rule Parameters ---------- obj: object The object that should be treated as a rule Modifies -------- _asnreg_role: 'rule' Attributed added to object and set to `rule` _asnreg_mark: True Attributed added to object and set to True Returns ------- obj: object Return object to enable use as a decorator. """ obj._asnreg_role = 'rule' RegistryMarker.mark(obj) return obj
[docs] @staticmethod def callback(event): """Mark object as a callback for an event Parameters ---------- event: str Event this is a callback for. obj: func Function, or any callable, to be called when the corresponding event is triggered. Modifies -------- _asnreg_role: 'callback' Attributed added to object and set to `rule` _asnreg_events: [event[, ...]] The events this callable object is a callback for. _asnreg_mark: True Attributed added to object and set to True Returns ------- obj: object Return object to enable use as a decorator. """ def decorator(func): try: events = func._asnreg_events except AttributeError: events = list() events.append(event) RegistryMarker.mark(func) func._asnreg_role = 'callback' func._asnreg_events = events return func return decorator
[docs] @staticmethod def schema(filename): """Mark a file as a schema source""" schema = RegistryMarker.Schema(filename) return schema
[docs] @staticmethod def utility(class_obj): """Mark the class as a Utility class""" class_obj._asnreg_role = 'utility' RegistryMarker.mark(class_obj) return class_obj
[docs] @staticmethod def is_marked(obj): return hasattr(obj, '_asnreg_marked')
# Utilities def import_from_file(filename): """Import a file as a module Parameters --------- filename: str The file to import Returns ------- module: python module The imported module """ path = expandvars(expanduser(filename)) module_name = basename(path).split('.')[0] folder = dirname(path) sys.path.insert(0, folder) try: module = import_module(module_name) finally: sys.path.pop(0) return module def get_marked(module, predicate=None, include_bases=False): """Recursively get all executable objects Parameters ---------- module: python module The module to examine predicate: bool func(object) Determinant of what gets returned. If None, all object types are examined include_bases: bool If True, include base classes not considered rules. Returns ------- class object: generator A generator that will yield all class members in the module. """ def is_method(obj): return (isfunction(obj) or ismethod(obj) ) for name, obj in getmembers(module, predicate): if isclass(obj): for sub_name, sub_obj in get_marked(obj, predicate=is_method): yield sub_name, sub_obj if RegistryMarker.is_marked(obj) or include_bases: yield name, obj elif RegistryMarker.is_marked(obj): if ismodule(obj): for sub_name, sub_obj in get_marked( obj, predicate=predicate, include_bases=include_bases ): yield sub_name, sub_obj else: yield name, obj # ########## # Unit Tests # ########## def test_import_from_file(): from copy import deepcopy from pytest import raises as pytest_raises from tempfile import NamedTemporaryFile current_path = deepcopy(sys.path) with NamedTemporaryFile() as junk_fh: junk_path = junk_fh.name with pytest_raises(ImportError): module = import_from_file(junk_path) assert current_path == sys.path