Source code for jwst.associations.generate

import logging

from .association import (
    make_timestamp
)
from .lib.process_list import (
    ProcessList,
    ProcessQueueSorted
)

# Configure logging.
# A module-level logger named after this module is created, and a
# `NullHandler` is attached so that this module never emits output on its
# own; records are only shown if the application installs real handlers.
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())


def generate(pool, rules, version_id=None):
    """Generate associations in the pool according to the rules.

    Parameters
    ----------
    pool : AssociationPool
        The pool to generate from.

    rules : Associations
        The association rule set.

    version_id : None, True, or str
        The string to use to tag associations and products.
        If None, no tagging occurs.
        If True, use a timestamp. (Any `bool` value, including False,
        is replaced by a timestamp.)
        If a string, the string.

    Returns
    -------
    associations : [association[,...]]
        List of associations. This is the result of the rules'
        'finalize' callback when one is registered; otherwise the
        associations exactly as generated.

    Notes
    -----
    Refer to the :ref:`Association Generator <association-generator>`
    documentation for a full description.
    """
    found = []

    # A boolean version id is a request for a generated timestamp.
    if isinstance(version_id, bool):
        version_id = make_timestamp()

    # Seed the queue with a single list covering the whole pool and
    # every rule in the rule set.
    seed = ProcessList(
        items=pool,
        rules=[rule for _, rule in rules.items()]
    )
    process_queue = ProcessQueueSorted([seed])

    # The queue is extended while it is being iterated: each item may
    # produce further process lists that need to be handled later.
    for current in process_queue:
        for item in current.items:
            _, created, follow_ups = generate_from_item(
                item, version_id, found, rules, current
            )
            found.extend(created)

            # When the current list works over EXISTING (or NONSCIENCE),
            # drop any follow-up list of the same kind.
            # Prevents infinite loops.
            mode = current.work_over
            if mode in (ProcessList.EXISTING, ProcessList.NONSCIENCE):
                follow_ups = [
                    pending
                    for pending in follow_ups
                    if pending.work_over != mode
                ]
            process_queue.extend(follow_ups)

    # Finalize found associations. A KeyError from the callback lookup
    # means there is nothing to finalize with; return them untouched.
    try:
        return rules.callback.reduce('finalize', found)
    except KeyError:
        return found
def generate_from_item(
        item,
        version_id,
        associations,
        rules,
        process_list):
    """Either match or generate a new association

    Parameters
    ----------
    item : dict
        The item to match to existing associations
        or generate new associations from

    version_id : str or None
        Version id to use with association creation.
        If None, no versioning is used.

    associations : [association, ...]
        List of already existing associations.
        If the item matches any of these, it will be added
        to them.

    rules : AssociationRegistry or None
        List of rules to create new associations.
        If None, no new associations are created; existing associations
        are then restricted only by `process_list.rules`, if any.

    process_list : ProcessList
        The `ProcessList` from which the current item belongs to.

    Returns
    -------
    (existing_asns, new_asns, process_list) : 3-tuple where
        existing_asns : [association,...]
            List of existing associations item belongs to.
            Empty if none match
        new_asns : [association,...]
            List of new associations item creates. Empty if none match
        process_list : [ProcessList, ...]
            List of process events.
    """
    # Setup the rules allowed to be examined. An explicit list on the
    # process list wins; otherwise every rule in the registry is allowed.
    # With neither available there is nothing to restrict by, which is
    # represented by `None` (previously this case raised AttributeError
    # on `rules.values()`).
    if process_list.rules:
        allowed_rules = process_list.rules
    elif rules is not None:
        allowed_rules = list(rules.values())
    else:
        allowed_rules = None

    # Check membership in existing associations.
    existing_asns = []
    reprocess_list = []
    if process_list.work_over in (
            ProcessList.BOTH,
            ProcessList.EXISTING,
            ProcessList.NONSCIENCE,
    ):
        if allowed_rules is not None:
            # Only associations whose exact type is an allowed rule
            # are candidates for this item.
            associations = [
                asn
                for asn in associations
                if type(asn) in allowed_rules
            ]
        existing_asns, reprocess_list = match_item(item, associations)

    # Now see if this item will create new associations.
    # By default, an item will not be allowed to create
    # an association based on rules of existing associations.
    new_asns = []
    if rules is not None and process_list.work_over in (
            ProcessList.BOTH,
            ProcessList.RULES,
    ):
        ignore_asns = {type(asn) for asn in existing_asns}
        new_asns, reprocess = rules.match(
            item,
            version_id=version_id,
            allow=allowed_rules,
            ignore=ignore_asns,
        )
        reprocess_list.extend(reprocess)

    return existing_asns, new_asns, reprocess_list
def match_item(item, associations):
    """Match item to a list of associations

    Each association is offered the item through its `add` method, which
    is expected to return a `(matches, reprocess)` pair.

    Parameters
    ----------
    item : dict
        The item to match to the associations.

    associations : [association, ...]
        List of already existing associations.
        If the item matches any of these, it will be added
        to them.

    Returns
    -------
    (associations, process_list) : 2-tuple where
        associations : [association,...]
            List of associations item belongs to. Empty if none match
        process_list : [ProcessList, ...]
            List of process events.
    """
    matched = []
    events = []
    for candidate in associations:
        # An association already recorded as a match is not offered
        # the item a second time.
        if candidate not in matched:
            accepted, to_reprocess = candidate.add(item)
            # Reprocess events are collected whether or not the
            # item was accepted.
            events.extend(to_reprocess)
            if accepted:
                matched.append(candidate)
    return matched, events