Source code for jwst.pipeline.calwebb_spec2

from collections import defaultdict
import os.path as op
import traceback

from .. import datamodels
from ..assign_wcs.util import NoDataOnDetectorError
from ..lib.pipe_utils import is_tso
from ..stpipe import Pipeline

# step imports
from ..assign_wcs import assign_wcs_step
from ..background import background_step
from ..imprint import imprint_step
from ..msaflagopen import msaflagopen_step
from ..extract_2d import extract_2d_step
from ..flatfield import flat_field_step
from ..srctype import srctype_step
from ..straylight import straylight_step
from ..fringe import fringe_step
from ..pathloss import pathloss_step
from ..barshadow import barshadow_step
from ..photom import photom_step
from ..cube_build import cube_build_step
from ..extract_1d import extract_1d_step
from ..resample import resample_spec_step

__all__ = ['Spec2Pipeline']


[docs]class Spec2Pipeline(Pipeline): """ Spec2Pipeline: Processes JWST spectroscopic exposures from Level 2a to 2b. Accepts a single exposure or an association as input. Included steps are: assign_wcs, background subtraction, NIRSpec MSA imprint subtraction, NIRSpec MSA bad shutter flagging, 2-D subwindow extraction, flat field, source type decision, straylight, fringe, pathloss, barshadow, photom, resample_spec, cube_build, and extract_1d. """ spec = """ save_bsub = boolean(default=False) # Save background-subracted science fail_on_exception = boolean(default=True) # Fail if any product fails. """ # Define aliases to steps step_defs = { 'bkg_subtract': background_step.BackgroundStep, 'assign_wcs': assign_wcs_step.AssignWcsStep, 'imprint_subtract': imprint_step.ImprintStep, 'msa_flagging': msaflagopen_step.MSAFlagOpenStep, 'extract_2d': extract_2d_step.Extract2dStep, 'flat_field': flat_field_step.FlatFieldStep, 'srctype': srctype_step.SourceTypeStep, 'straylight': straylight_step.StraylightStep, 'fringe': fringe_step.FringeStep, 'pathloss': pathloss_step.PathLossStep, 'barshadow': barshadow_step.BarShadowStep, 'photom': photom_step.PhotomStep, 'resample_spec': resample_spec_step.ResampleSpecStep, 'cube_build': cube_build_step.CubeBuildStep, 'extract_1d': extract_1d_step.Extract1dStep } # Main processing
[docs] def process(self, input): """Entrypoint for this pipeline Parameters ---------- input: str, Level2 Association, or DataModel The exposure or association of exposures to process """ self.log.info('Starting calwebb_spec2 ...') # Retrieve the input(s) asn = self.load_as_level2_asn(input) # Each exposure is a product in the association. # Process each exposure. has_exceptions = False for product in asn['products']: self.log.info('Processing product {}'.format(product['name'])) self.output_file = product['name'] try: result = self.process_exposure_product( product, asn['asn_pool'], asn.filename ) except NoDataOnDetectorError as exception: # This error merits a special return # status if run from the command line. # Bump it up now. raise exception except Exception as exception: traceback.print_exc() has_exceptions = True else: if result is not None: self.save_model(result) self.closeout(to_close=[result]) if has_exceptions and self.fail_on_exception: raise RuntimeError( 'One or more products failed to process. Failing calibration.' ) # We're done self.log.info('Ending calwebb_spec2')
# Process each exposure
[docs] def process_exposure_product( self, exp_product, pool_name=' ', asn_file=' ' ): """Process an exposure found in the association product Parameters --------- exp_product: dict A Level2b association product. """ # Find all the member types in the product members_by_type = defaultdict(list) for member in exp_product['members']: members_by_type[member['exptype'].lower()].append(member['expname']) # Get the science member. Technically there should only be # one. We'll just get the first one found. science = members_by_type['science'] if len(science) != 1: self.log.warn( 'Wrong number of science exposures found in {}'.format( exp_product['name'] ) ) self.log.warn(' Using only first one.') science = science[0] self.log.info('Working on input %s ...', science) input = self.open_model(science) exp_type = input.meta.exposure.type tso_mode = is_tso(input) WFSS_TYPES = ["NIS_WFSS", "NRC_WFSS"] # Apply WCS info # check the datamodel to see if it's # a grism image, if so get the catalog # name from the asn and record it to the meta if exp_type in WFSS_TYPES: try: input.meta.source_catalog.filename = members_by_type['sourcecat'][0] except IndexError: if input.meta.source_catalog.filename is None: raise IndexError("No source catalog specified in association or datamodel") assign_wcs_exception = None try: input = self.assign_wcs(input) except Exception as exception: assign_wcs_exception = exception # Do background processing, if necessary if exp_type in WFSS_TYPES or len(members_by_type['background']) > 0: if exp_type in WFSS_TYPES: bkg_list = [] # will be overwritten by the step else: bkg_list = members_by_type['background'] # Setup for saving self.bkg_subtract.suffix = 'bsub' if isinstance(input, datamodels.CubeModel): self.bkg_subtract.suffix = 'bsubints' # Backwards compatibility if self.save_bsub: self.bkg_subtract.save_results = True # Call the background subtraction step input = self.bkg_subtract(input, bkg_list) # If assign_wcs was skipped, abort the rest of processing, # because so many downstream steps depend on the WCS if assign_wcs_exception is not None or \ input.meta.cal_step.assign_wcs != 'COMPLETE': message = ( 'Assign_wcs processing was skipped.' '\nAborting remaining processing for this exposure.' '\nNo output product will be created.' ) if self.assign_wcs.skip: self.log.warning(message) return else: self.log.error(message) if assign_wcs_exception is not None: raise assign_wcs_exception else: raise RuntimeError('Cannot determine WCS.') # Apply NIRSpec MSA imprint subtraction # Technically there should be just one. # We'll just get the first one found imprint = members_by_type['imprint'] if exp_type in ['NRS_MSASPEC', 'NRS_IFU'] and \ len(imprint) > 0: if len(imprint) > 1: self.log.warn('Wrong number of imprint members') imprint = imprint[0] input = self.imprint_subtract(input, imprint) # Apply NIRSpec MSA bad shutter flagging if exp_type in ['NRS_MSASPEC', 'NRS_IFU']: input = self.msa_flagging(input) # This makes it clear that flat_field # should be done before extract_2d for all WFSS/GRISM data. if exp_type in ['NRC_WFSS', 'NIS_WFSS', 'NRC_TSGRISM']: # Apply flat-field correction input = self.flat_field(input) input = self.extract_2d(input) else: # Extract 2D sub-windows for NIRSpec slit and MSA if exp_type in ['NRS_FIXEDSLIT', 'NRS_BRIGHTOBJ', 'NRS_MSASPEC']: input = self.extract_2d(input) # Apply flat-field correction input = self.flat_field(input) # Apply the source type decision step input = self.srctype(input) # Apply the straylight correction for MIRI MRS if exp_type == 'MIR_MRS': input = self.straylight(input) # Apply the fringe correction for MIRI MRS if exp_type == 'MIR_MRS': input = self.fringe(input) # Apply pathloss correction to NIRSpec exposures if exp_type in ['NRS_FIXEDSLIT', 'NRS_MSASPEC', 'NRS_IFU']: input = self.pathloss(input) # Apply barshadow correction to NIRSPEC MSA exposures if exp_type == 'NRS_MSASPEC': input = self.barshadow(input) # Apply flux calibration result = self.photom(input) # Record ASN pool and table names in output result.meta.asn.pool_name = pool_name result.meta.asn.table_name = op.basename(asn_file) # Setup to save the calibrated exposure at end of step. if tso_mode: self.suffix = 'calints' else: self.suffix = 'cal' # Produce a resampled product, either via resample_spec for # "regular" spectra or cube_build for IFU data. No resampled # product is produced for time-series modes. if exp_type in ['NRS_FIXEDSLIT', 'NRS_MSASPEC', 'MIR_LRS-FIXEDSLIT'] \ and not isinstance(result, datamodels.CubeModel): # Call the resample_spec step for 2D slit data self.resample_spec.suffix = 's2d' result_extra = self.resample_spec(result) elif exp_type in ['MIR_MRS', 'NRS_IFU']: # Call the cube_build step for IFU data; # always create a single cube containing multiple # wavelength bands self.cube_build.output_type = 'multi' self.cube_build.suffix = 's3d' self.cube_build.save_results = False result_extra = self.cube_build(result) self.save_model(result_extra[0], 's3d') else: result_extra = result # Extract a 1D spectrum from the 2D/3D data if tso_mode: self.extract_1d.suffix = 'x1dints' else: self.extract_1d.suffix = 'x1d' x1d_result = self.extract_1d(result_extra) result_extra.close() x1d_result.close() # That's all folks self.log.info( 'Finished processing product {}'.format(exp_product['name']) ) return result