#!/usr/bin/env python
import os.path as op
from ..stpipe import Pipeline
from .. import datamodels
from ..model_blender import blendmeta
# step imports
from ..coron import stack_refs_step
from ..coron import align_refs_step
from ..coron import klip_step
from ..outlier_detection import outlier_detection_step
from ..resample import resample_step
__all__ = ['Coron3Pipeline']
[docs]class Coron3Pipeline(Pipeline):
"""Class for defining Coron3Pipeline.
Coron3Pipeline: Apply all level-3 calibration steps to a
coronagraphic association of exposures. Included steps are:
#. stack_refs (assemble reference PSF inputs)
#. align_refs (align reference PSFs to target images)
#. klip (PSF subtraction using the KLIP algorithm)
#. outlier_detection (flag outliers)
#. resample (image combination and resampling)
"""
spec = """
suffix = string(default='i2d')
"""
# Define aliases to steps
step_defs = {
'stack_refs': stack_refs_step.StackRefsStep,
'align_refs': align_refs_step.AlignRefsStep,
'klip': klip_step.KlipStep,
'outlier_detection': outlier_detection_step.OutlierDetectionStep,
'resample': resample_step.ResampleStep
}
[docs] def process(self, input):
"""Primary method for performing pipeline."""
self.log.info('Starting calwebb_coron3 ...')
# Load the input association table
asn = self.load_as_level3_asn(input)
acid = asn.get('asn_id', '')
# We assume there's one final product defined by the association
prod = asn['products'][0]
self.output_file = prod.get('name', self.output_file)
# Construct lists of all the PSF and science target members
psf_files = [m['expname'] for m in prod['members']
if m['exptype'].upper() == 'PSF']
targ_files = [m['expname'] for m in prod['members']
if m['exptype'].upper() == 'SCIENCE']
# Make sure we found some PSF and target members
if len(psf_files) == 0:
err_str1 = 'No reference PSF members found in association table.'
self.log.error(err_str1)
self.log.error('Calwebb_coron3 processing will be aborted')
return
if len(targ_files) == 0:
err_str1 = 'No science target members found in association table'
self.log.error(err_str1)
self.log.error('Calwebb_coron3 processing will be aborted')
return
# Assemble all the input psf files into a single ModelContainer
psf_models = datamodels.ModelContainer()
for i in range(len(psf_files)):
psf_input = datamodels.CubeModel(psf_files[i])
psf_models.append(psf_input)
psf_input.close()
# Stack all the PSF images into a single CubeModel
psf_stack = self.stack_refs(psf_models)
psf_models.close()
# Save the resulting PSF stack
self.save_model(psf_stack, suffix='psfstack')
# Call the sequence of steps align_refs, klip, and outlier_detection
# once for each input target exposure
resample_input = datamodels.ModelContainer()
for target_file in targ_files:
# Call align_refs
self.log.debug('Calling align_refs for member %s', target_file)
psf_aligned = self.align_refs(target_file, psf_stack)
# Save the alignment results
self.save_model(
psf_aligned, output_file=target_file,
suffix='psfalign', acid=acid
)
# Call KLIP
self.log.debug('Calling klip for member %s', target_file)
psf_sub = self.klip(target_file, psf_aligned)
psf_aligned.close()
# Save the psf subtraction results
self.save_model(
psf_sub, output_file=target_file,
suffix='psfsub', acid=acid
)
# Create a ModelContainer of the psf_sub results to send to
# outlier_detection
self.log.debug('Building ModelContainer of klip results')
target_models = datamodels.ModelContainer()
for i in range(psf_sub.data.shape[0]):
image = datamodels.ImageModel(data=psf_sub.data[i],
err=psf_sub.err[i],
dq=psf_sub.dq[i])
image.update(psf_sub)
image.meta.wcs = psf_sub.meta.wcs
target_models.append(image)
# Call outlier_detection
target_models = self.outlier_detection(target_models)
# Create Level 2c products
if target_models[0].meta.cal_step.outlier_detection == 'COMPLETE':
err_str1 = "Creating Level 2c output with updated DQ arrays..."
self.log.info(err_str1)
lev2c_model = psf_sub.copy()
# Replace Level 2b product DQ array with Level 2c DQ array
for i in range(len(target_models)):
lev2c_model.dq[i] = target_models[i].dq
lev2c_model.meta.cal_step.outlier_detection = 'COMPLETE'
self.save_model(lev2c_model, output_file=target_file,
suffix='crfints', acid=acid)
# Append results from this target exposure to resample input model
for i in range(len(target_models)):
resample_input.append(target_models[i])
# Call the resample step to combine all psf-subtracted target images
result = self.resample(resample_input)
if result == resample_input:
# Resampling was skipped,
# yet we need to return a DrizProductModel, so...
warn1 = 'Creating fake resample results until step is available'
self.log.warning(warn1)
result = datamodels.DrizProductModel(data=resample_input[0].data,
con=resample_input[0].dq,
wht=resample_input[0].err)
result.update(resample_input[0])
# The resample step blends headers already...
self.log.debug('Blending metadata for {}'.format(
result.meta.filename))
blendmeta.blendmodels(result, inputs=targ_files)
result.meta.asn.pool_name = asn['asn_pool']
result.meta.asn.table_name = op.basename(input)
# Save the final result
self.save_model(result, suffix=self.suffix)
# We're done
self.log.info('...ending calwebb_coron3')
return