# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
Models used by the JWST pipeline.
The models are written using the astropy.modeling framework.
Since they are specific to JWST, the models and their ASDF schemas
are kept here separately from astropy. An ASDF extension for this package is
registered with ASDF through entry points.
"""
import math
from collections import namedtuple
import numpy as np
from astropy.modeling.core import Model
from astropy.modeling.parameters import Parameter, InputParameterError
from astropy.modeling.models import Rotation2D
from astropy.utils import isiterable
__all__ = ['AngleFromGratingEquation', 'WavelengthFromGratingEquation',
'Unitless2DirCos', 'DirCos2Unitless', 'Rotation3DToGWA', 'Gwa2Slit',
'Slit2Msa', 'Snell', 'Logical', 'NirissSOSSModel', 'V23ToSky', 'Slit',
'NIRCAMForwardRowGrismDispersion', 'NIRCAMForwardColumnGrismDispersion',
'NIRCAMBackwardGrismDispersion', 'MIRI_AB2Slice', 'GrismObject',
'NIRISSForwardRowGrismDispersion', 'NIRISSForwardColumnGrismDispersion',
'NIRISSBackwardGrismDispersion', 'V2V3ToIdeal', 'IdealToV2V3']
N_SHUTTERS_QUADRANT = 62415
""" Number of shutters per quadrant in the NIRSPEC MSA shutter array"""
Slit = namedtuple('Slit', ["name", "shutter_id", "xcen", "ycen",
"ymin", "ymax", "quadrant", "source_id", "shutter_state",
"source_name", "source_alias", "stellarity",
"source_xpos", "source_ypos"])
""" Nirspec Slit structure definition"""
Slit.__new__.__defaults__ = ("", 0, 0.0, 0.0, 0.0, 0.0, 0, 0, "", "", "", "",
0.0, 0.0, 0.0)
[docs]class GrismObject(namedtuple('GrismObject', ("sid",
"order_bounding",
"sky_centroid",
"partial_order",
"waverange",
"sky_bbox_ll",
"sky_bbox_lr",
"sky_bbox_ur",
"sky_bbox_ul",
"xcentroid",
"ycentroid",
), rename=False)):
""" Grism Objects identified from a direct image catalog and segment map.
Notes
-----
The object bounding box is computed from the segementation map,
using the min and max wavelegnth for each of the orders that
are available. The order_bounding member is a dictionary of
bounding boxes for the object keyed by order
ra and dec are the sky ra and dec of the center of the object as measured
from the non-dispersed image.
the segment_[ra/dec][min/max] are also as measured on the direct image
order_bounding is stored as a lookup dictionary per order and contains
the object x,y bounding location on the grism image
GrismObject(order_bounding={"+1":((xmin,xmax),(ymin,ymax)),"+2":((2,3),(2,3))})
``sky_bbox_??`` contains the ra,dec,frame information for the bbox from the catalog
"""
__slots__ = () # prevent instance dictionary for lower memory
def __new__(cls,
sid=None,
order_bounding={},
sky_centroid=None,
partial_order=False,
waverange=None,
sky_bbox_ll=None,
sky_bbox_lr=None,
sky_bbox_ur=None,
sky_bbox_ul=None,
xcentroid=None,
ycentroid=None):
return super(GrismObject, cls).__new__(cls,
sid=sid,
order_bounding=order_bounding,
sky_centroid=sky_centroid,
partial_order=partial_order,
waverange=waverange,
sky_bbox_ll=sky_bbox_ll,
sky_bbox_lr=sky_bbox_lr,
sky_bbox_ur=sky_bbox_ur,
sky_bbox_ul=sky_bbox_ul,
xcentroid=xcentroid,
ycentroid=ycentroid)
def __str__(self):
"""Return a pretty print for the object information."""
return ("id: {0}\n"
"order_bounding {1}\n"
"sky_centroid: {2}\n"
"sky_bbox_ll: {3}\n"
"sky_bbox_lr: {4}\n"
"sky_bbox_ur: {5}\n"
"sky_bbox_ul:{6}\n"
"xcentroid: {7}\n"
"ycentroid: {8}\n"
"partial_order: {9}\n"
"waverange: {10}\n"
.format(self.sid,
str(self.order_bounding),
str(self.sky_centroid),
str(self.sky_bbox_ll),
str(self.sky_bbox_lr),
str(self.sky_bbox_ur),
str(self.sky_bbox_ul),
self.xcentroid,
self.ycentroid,
str(self.partial_order),
str(self.waverange)))
[docs]class MIRI_AB2Slice(Model):
"""
MIRI MRS alpha, beta to slice transform
Parameters
----------
beta_zero : float
beta_del : float
"""
standard_broadcasting = False
_separable = False
inputs = ("beta",)
""" "beta": the beta angle """
outputs = ("slice",)
""" "slice": Slice number"""
beta_zero = Parameter('beta_zero', default=0)
""" Beta_zero parameter"""
beta_del = Parameter('beta_del', default=1)
""" Beta_del parameter"""
channel = Parameter("channel", default=1)
""" MIRI MRS channel: one of 1, 2, 3, 4"""
[docs] @staticmethod
def evaluate(beta, beta_zero, beta_del, channel):
s = channel * 100 + (beta - beta_zero) / beta_del + 1
return _toindex(s)
[docs]class Snell(Model):
"""
Apply transforms, including Snell law, through the NIRSpec prism.
Parameters
----------
angle : flaot
Prism angle in deg.
kcoef : list
K coefficients in Sellmeir equation.
lcoef : list
L coefficients in Sellmeir equation.
tcoef : list
Thermal coefficients of glass.
tref : float
Refernce temperature in K.
pref : float
Refernce pressure in ATM.
temperature : float
System temperature during observation in K
pressure : float
System pressure during observation in ATM.
"""
standard_broadcasting = False
_separable = False
inputs = ("lam", "alpha_in", "beta_in", "zin")
outputs = ("alpha_out", "beta_out", "zout")
def __init__(self, angle, kcoef, lcoef, tcoef, tref, pref,
temperature, pressure, name=None):
self.prism_angle = angle
self.kcoef = np.array(kcoef, dtype=np.float)
self.lcoef = np.array(lcoef, dtype=np.float)
self.tcoef = np.array(tcoef, dtype=np.float)
self.tref = tref
self.pref = pref
self.temp = temperature
self.pressure = pref
super(Snell, self).__init__(name=name)
[docs] @staticmethod
def compute_refraction_index(lam, temp, tref, pref, pressure, kcoef, lcoef, tcoef):
"""Calculate and retrun the refraction index."""
# Convert to microns
lam = np.asarray(lam * 1e6)
KtoC = 273.15 # kelvin to celcius conversion
temp -= KtoC
tref -= KtoC
delt = temp - tref
K1, K2, K3 = kcoef
L1, L2, L3 = lcoef
D0, D1, D2, E0, E1, lam_tk = tcoef
if delt < 20:
n = np.sqrt(1. +
K1 * lam**2 / (lam**2 - L1) +
K2 * lam**2 / (lam**2 - L2) +
K3 * lam**2 / (lam**2 - L3)
)
else:
# Derive the refractive index of air at the reference temperature and pressure
# and at the operational system's temperature and pressure.
nref = 1. + (6432.8 + 2949810. * lam**2 /
(146.0 * lam**2 - 1.) + (5540.0 * lam**2) /
(41.0 * lam**2 - 1.)) * 1e-8
# T should be in C, P should be in ATM
nair_obs = 1.0 + ((nref - 1.0) * pressure) / (1.0 + (temp - 15.) * 3.4785e-3)
nair_ref = 1.0 + ((nref - 1.0) * pref) / (1.0 + (tref - 15) * 3.4785e-3)
# Compute the relative index of the glass at Tref and Pref using Sellmeier equation I.
lamrel = lam * nair_obs / nair_ref
nrel = np.sqrt(1. +
K1 * lamrel ** 2 / (lamrel ** 2 - L1) +
K2 * lamrel ** 2 / (lamrel ** 2 - L2) +
K3 * lamrel ** 2 / (lamrel ** 2 - L3)
)
# Convert the relative index of refraction at the reference temperature and pressure
# to absolute.
nabs_ref = nrel * nair_ref
# Compute the absolute index of the glass
delnabs = (0.5 * (nrel ** 2 - 1.) / nrel) * \
(D0 * delt + D1 * delt ** 2 + D2 * delt ** 3 + \
(E0 * delt + E1 * delt ** 2) / (lamrel ** 2 - lam_tk ** 2))
nabs_obs = nabs_ref + delnabs
# Define the relative index at the system's operating T and P.
n = nabs_obs / nair_obs
return n
[docs] def evaluate(self, lam, alpha_in, beta_in, zin):
"""Go through the prism"""
n = self.compute_refraction_index(lam, self.temp, self.tref, self.pref, self.pressure,
self.kcoef, self.lcoef, self.tcoef)
# Apply Snell's law through front surface, eq 5.3.3 II
xout = alpha_in / n
yout = beta_in / n
zout = np.sqrt(1.0 - xout**2 - yout**2)
# Go to back surface frame # eq 5.3.3 III
y_rotation = Rotation3DToGWA([self.prism_angle], "y")
xout, yout, zout = y_rotation(xout, yout, zout)
# Reflection on back surface
xout = -1 * xout
yout = -1 * yout
# Back to front surface
y_rotation = Rotation3DToGWA([-self.prism_angle], "y")
xout, yout, zout = y_rotation(xout, yout, zout)
# Snell's refraction law through front surface
xout = xout * n
yout = yout * n
zout = np.sqrt(1.0 - xout**2 - yout**2)
return xout, yout, zout
class RefractionIndexFromPrism(Model):
"""
Compute the refraction index of a prism (NIRSpec).
Parameters
----------
prism_angle : float
Prism angle in deg.
"""
standard_broadcasting = False
_separable = False
inputs = ("alpha_in", "beta_in", "alpha_out",)
outputs = ("n")
prism_angle = Parameter(setter=np.deg2rad, getter=np.rad2deg)
def __init__(self, prism_angle, name=None):
super(RefractionIndexFromPrism, self).__init__(prism_angle=prism_angle, name=name)
def evaluate(self, alpha_in, beta_in, alpha_out, prism_angle):
sangle = (math.sin(prism_angle))
cangle = (math.cos(prism_angle))
nsq = ((alpha_out + alpha_in * (1 - 2 * sangle ** 2)) / (2 * sangle * cangle)) ** 2 + \
alpha_in ** 2 + beta_in ** 2
return np.sqrt(nsq)
[docs]class AngleFromGratingEquation(Model):
"""
Solve the 3D Grating Dispersion Law for the refracted angle.
Parameters
----------
groove_density : int
Grating ruling density.
order : int
Spectral order.
"""
_separable = False
inputs = ("lam", "alpha_in", "beta_in", "z")
""" Wavelength and 3 angle coordinates going into the grating."""
outputs = ("alpha_out", "beta_out", "zout")
""" Three angles coming out of the grating. """
groove_density = Parameter()
""" Grating ruling density."""
order = Parameter(default=-1)
""" Spectral order."""
[docs] def evaluate(self, lam, alpha_in, beta_in, z, groove_density, order):
if alpha_in.shape != beta_in.shape != z.shape:
raise ValueError("Expected input arrays to have the same shape")
orig_shape = alpha_in.shape or (1,)
xout = -alpha_in - groove_density * order * lam
yout = - beta_in
zout = np.sqrt(1 - xout**2 - yout**2)
xout.shape = yout.shape = zout.shape = orig_shape
return xout, yout, zout
[docs]class WavelengthFromGratingEquation(Model):
"""
Solve the 3D Grating Dispersion Law for the wavelength.
Parameters
----------
groove_density : int
Grating ruling density.
order : int
Spectral order.
"""
_separable = False
inputs = ("alpha_in", "beta_in", "alpha_out")
""" three angle - alpha_in and beta_in going into the grating and alpha_out coming out of the grating."""
outputs = ("lam",)
""" Wavelength."""
groove_density = Parameter()
""" Grating ruling density."""
order = Parameter(default=1)
""" Spectral order."""
[docs] def evaluate(self, alpha_in, beta_in, alpha_out, groove_density, order):
# beta_in is not used in this equation but is here because it's
# needed for the prism computation. Currently these two computations
# need to have the same interface.
return -(alpha_in + alpha_out) / (groove_density * order)
[docs]class Unitless2DirCos(Model):
"""
Transform a vector to directional cosines.
"""
_separable = False
inputs = ('x', 'y')
outputs = ('x', 'y', 'z')
[docs] def evaluate(self, x, y):
vabs = np.sqrt(1. + x**2 + y**2)
cosa = x / vabs
cosb = y / vabs
cosc = 1. / vabs
return cosa, cosb, cosc
def inverse(self):
return DirCos2Unitless()
[docs]class DirCos2Unitless(Model):
"""
Transform directional cosines to vector.
"""
_separable = False
inputs = ('x', 'y', 'z')
outputs = ('x', 'y')
[docs] def evaluate(self, x, y, z):
return x / z, y / z
def inverse(self):
return Unitless2DirCos()
[docs]class Rotation3DToGWA(Model):
"""
Perform a 3D rotation given an angle in degrees.
Positive angles represent a counter-clockwise rotation and vice-versa.
Parameters
----------
angles : array-like
Angles of rotation in deg in the order of axes_order.
axes_order : str
A sequence of 'x', 'y', 'z' corresponding of axis of rotation/
"""
standard_broadcasting = False
_separable = False
separable = False
inputs = ('x', 'y', 'z')
outputs = ('x', 'y', 'z')
angles = Parameter(getter=np.rad2deg, setter=np.deg2rad)
def __init__(self, angles, axes_order, name=None):
if len(angles) != len(axes_order):
raise InputParameterError(
"Number of angles must equal number of axes in axes_order.")
self.axes = ['x', 'y', 'z']
unrecognized = set(axes_order).difference(self.axes)
if unrecognized:
raise ValueError("Unrecognized axis label {0}; "
"should be one of {1} ".format(unrecognized,
self.axes))
self.axes_order = axes_order
self._func_map = {'x': self._xrot,
'y': self._yrot,
'z': self._zrot
}
super(Rotation3DToGWA, self).__init__(angles, name=name)
@property
def inverse(self):
"""Inverse rotation."""
angles = self.angles.value[::-1] * -1
return self.__class__(angles, self.axes_order[::-1])
def _xrot(self, x, y, z, theta):
xout = x
yout = y * np.cos(theta) + z * np.sin(theta)
zout = np.sqrt(1 - xout ** 2 - yout ** 2)
return [xout, yout, zout]
def _yrot(self, x, y, z, theta):
xout = x * np.cos(theta) - z * np.sin(theta)
yout = y
zout = np.sqrt(1 - xout ** 2 - yout ** 2)
return [xout, yout, zout]
def _zrot(self, x, y, z, theta):
xout = x * np.cos(theta) + y * np.sin(theta)
yout = -x * np.sin(theta) + y * np.cos(theta)
zout = np.sqrt(1 - xout ** 2 - yout ** 2)
return [xout, yout, zout]
[docs] def evaluate(self, x, y, z, angles):
"""
Apply the rotation to a set of 3D Cartesian coordinates.
"""
if x.shape != y.shape != z.shape:
raise ValueError("Expected input arrays to have the same shape")
# Note: If the original shape was () (an array scalar) convert to a
# 1-element 1-D array on output for consistency with most other models
orig_shape = x.shape or (1,)
for ang, ax in zip(angles[0], self.axes_order):
x, y, z = self._func_map[ax](x, y, z, theta=ang)
x.shape = y.shape = z.shape = orig_shape
return x, y, z
class Rotation3D(Model):
"""
Perform a series of rotations about different axis in 3D space.
Positive angles represent a counter-clockwise rotation.
Parameters
----------
angles : array-like
Angles of rotation in deg in the order of axes_order.
axes_order : str
A sequence of 'x', 'y', 'z' corresponding of axis of rotation.
"""
standard_broadcasting = False
_separable = False
inputs = ('x', 'y', 'z')
outputs = ('x', 'y', 'z')
angles = Parameter(getter=np.rad2deg, setter=np.deg2rad)
def __init__(self, angles, axes_order, name=None):
self.axes = ['x', 'y', 'z']
unrecognized = set(axes_order).difference(self.axes)
if unrecognized:
raise ValueError("Unrecognized axis label {0}; "
"should be one of {1} ".format(unrecognized,
self.axes))
self.axes_order = axes_order
if len(angles) != len(axes_order):
raise ValueError("The number of angles {0} should match the number \
of axes {1}.".format(len(angles),
len(axes_order)))
super(Rotation3D, self).__init__(angles, name=name)
@property
def inverse(self):
"""Inverse rotation."""
angles = self.angles.value[::-1] * -1
return self.__class__(angles, axes_order=self.axes_order[::-1])
@staticmethod
def _compute_matrix(angles, axes_order):
if len(angles) != len(axes_order):
raise InputParameterError(
"Number of angles must equal number of axes in axes_order.")
matrices = []
for angle, axis in zip(angles, axes_order):
matrix = np.zeros((3, 3), dtype=np.float)
if axis == 'x':
mat = Rotation3D.rotation_matrix_from_angle(angle)
matrix[0, 0] = 1
matrix[1:, 1:] = mat
elif axis == 'y':
mat = Rotation3D.rotation_matrix_from_angle(-angle)
matrix[1, 1] = 1
matrix[0, 0] = mat[0, 0]
matrix[0, 2] = mat[0, 1]
matrix[2, 0] = mat[1, 0]
matrix[2, 2] = mat[1, 1]
elif axis == 'z':
mat = Rotation3D.rotation_matrix_from_angle(angle)
matrix[2, 2] = 1
matrix[:2, :2] = mat
else:
raise ValueError("Expected axes_order to be a combination \
of characters 'x', 'y' and 'z', got {0}".format(
set(axes_order).difference(['x', 'y', 'z'])))
matrices.append(matrix)
if len(angles) == 1:
return matrix
elif len(matrices) == 2:
return np.dot(matrices[1], matrices[0])
else:
prod = np.dot(matrices[1], matrices[0])
for m in matrices[2:]:
prod = np.dot(m, prod)
return prod
@staticmethod
def rotation_matrix_from_angle(angle):
"""
Clockwise rotation matrix.
"""
return np.array([[math.cos(angle), -math.sin(angle)],
[math.sin(angle), math.cos(angle)]])
def evaluate(self, x, y, z, angles):
"""
Apply the rotation to a set of 3D Cartesian coordinates.
"""
if x.shape != y.shape != z.shape:
raise ValueError("Expected input arrays to have the same shape")
# Note: If the original shape was () (an array scalar) convert to a
# 1-element 1-D array on output for consistency with most other models
orig_shape = x.shape or (1,)
inarr = np.array([x.flatten(), y.flatten(), z.flatten()])
result = np.dot(self._compute_matrix(angles[0], self.axes_order), inarr)
x, y, z = result[0], result[1], result[2]
x.shape = y.shape = z.shape = orig_shape
return x, y, z
class LRSWavelength(Model):
"""
The MIRI LRS wavelength solution implemented as an astropy.modeling.Model.
Parameters
----------
wavetable : ndarray
Array of wavelengths.
zero_point : tuple
The (X, Y) pixel coordinates of the wavelength zero point.
"""
standard_broadcasting = False
_separable = False
linear = False
fittable = False
inputs = ('x', 'y')
outputs = ('lambda',)
def __init__(self, wavetable, zero_point, name=None):
self._wavetable = wavetable
self._zero_point = zero_point
super(LRSWavelength, self).__init__(name=name)
@property
def wavetable(self):
return self._wavetable
@property
def zero_point(self):
return self._zero_point
def evaluate(self, x, y):
slitsize = 1.00076751 # The MIRI LRS slit size.
imx, imy = self.zero_point
dx = x - imx
dy = y - imy
if x.shape != y.shape:
raise ValueError("Inputs have different shape.")
x0 = self._wavetable[:, 3]
y0 = self._wavetable[:, 4]
x1 = self._wavetable[:, 5]
#y1 = self._wavetable[:, 6]
wave = self._wavetable[:, 2]
diff0 = (dy - y0[0])
ind = np.abs(np.asarray(diff0 / slitsize, dtype=np.int))
condition = np.logical_and(dy < y0[0], dy > y0[-1]) #, dx>x0, dx<x1)
xyind = condition.nonzero()
wavelength = np.zeros(condition.shape)
wavelength += np.nan
wavelength[xyind] = wave[ind[xyind]]
wavelength = wavelength.flatten()
wavelength[(dx[xyind] < x0[ind[xyind]]).nonzero()[0]] = np.nan
wavelength[(dx[xyind] > x1[ind[xyind]]).nonzero()[0]] = np.nan
wavelength.shape = condition.shape
return wavelength
[docs]class Gwa2Slit(Model):
"""
NIRSpec GWA to slit transform.
Parameters
----------
slits : list
A list of open slits.
A slit is a namedtupe of type `~jwst.transforms.models.Slit`
Slit("name", "shutter_id", "xcen", "ycen", "ymin", "ymax",
"quadrant", "source_id", "shutter_state", "source_name",
"source_alias", "stellarity", "source_xpos", "source_ypos"])
models : list
List of models (`~astropy.modeling.core.Model`) corresponding to the
list of slits.
"""
_separable = False
inputs = ('name', 'angle1', 'angle2', 'angle3')
""" Name of the slit and the three angle coordinates at the GWA going from detector to sky."""
outputs = ('name', 'x_slit', 'y_slit', 'lam')
""" Name of the slit, x and y coordinates within the virtual slit and wavelength."""
def __init__(self, slits, models):
if isiterable(slits[0]):
self._slits = [tuple(s) for s in slits]
self.slit_ids = [s[0] for s in self._slits]
else:
self._slits = list(slits)
self.slit_ids = self._slits
self.models = models
super(Gwa2Slit, self).__init__()
@property
def slits(self):
if isiterable(self._slits[0]):
return [Slit(*row) for row in self._slits]
else:
return self.slit_ids
[docs] def get_model(self, name):
index = self.slit_ids.index(name)
return self.models[index]
[docs] def evaluate(self, name, x, y, z):
index = self.slit_ids.index(name)
return (name, ) + self.models[index](x, y, z)
[docs]class Slit2Msa(Model):
"""
NIRSpec slit to MSA transform.
Parameters
----------
slits : list
A list of open slits.
A slit is a namedtupe, `~jwst.transforms.models.Slit`
Slit("name", "shutter_id", "xcen", "ycen", "ymin", "ymax",
"quadrant", "source_id", "shutter_state", "source_name",
"source_alias", "stellarity", "source_xpos", "source_ypos")
models : list
List of models (`~astropy.modeling.core.Model`) corresponding to the
list of slits.
"""
_separable = False
inputs = ('name', 'x_slit', 'y_slit')
""" Name of the slit, x and y coordinates within the virtual slit."""
outputs = ('x_msa', 'y_msa')
""" x and y coordinates in the MSA frame."""
def __init__(self, slits, models):
super(Slit2Msa, self).__init__()
if isiterable(slits[0]):
self._slits = [tuple(s) for s in slits]
self.slit_ids = [s[0] for s in self._slits]
else:
self._slits = list(slits)
self.slit_ids = self._slits
self.models = models
@property
def slits(self):
if isiterable(self._slits[0]):
return [Slit(*row) for row in self._slits]
else:
return self.slit_ids
[docs] def get_model(self, name):
index = self.slit_ids.index(name)
return self.models[index]
[docs] def evaluate(self, name, x, y):
index = self.slit_ids.index(name)
return self.models[index](x, y)
[docs]class NirissSOSSModel(Model):
"""
NIRISS SOSS wavelength solution implemented as a Model.
Parameters
----------
spectral_orders : list of int
Spectral orders for which there is a wavelength solution.
models : list of `~astropy.modeling.core.Model`
A list of transforms representing the wavelength solution for
each order in spectral orders. It should match the order in
``spectral_orders``.
"""
_separable = False
inputs = ('x', 'y', 'spectral_order')
""" x and y pixel coordinates and spectral order"""
outputs = ('ra', 'dec', 'lam')
""" RA and DEC coordinates and wavelength"""
def __init__(self, spectral_orders, models):
super(NirissSOSSModel, self).__init__()
self.spectral_orders = spectral_orders
self.models = dict(zip(spectral_orders, models))
[docs] def get_model(self, spectral_order):
return self.models[spectral_order]
[docs] def evaluate(self, x, y, spectral_order):
# The spectral_order variable is coming in as an array/list of one element.
# So, we are going to just take the 0'th element and use that as the index.
try:
order_number = int(spectral_order[0])
except Exception as e:
raise ValueError('Spectral order is not between 1 and 3, {}'.format(spectral_order))
return self.models[order_number](x, y)
[docs]class Logical(Model):
"""
Substitute values in an array where the condition is evaluated to True.
Similar to numpy's where function.
Parameters
----------
condition : str
A string representing the logical, one of GT, LT, NE, EQ
compareto : float, ndarray
A number to compare to using the condition
If ndarray then the input array, compareto and value should have the
same shape.
value : float, ndarray
Value to substitute where condition is True.
"""
inputs = ('x', )
outputs = ('x', )
_separable = False
conditions = {'GT': np.greater,
'LT': np.less,
'EQ': np.equal,
'NE': np.not_equal
}
def __init__(self, condition, compareto, value, **kwargs):
self.condition = condition.upper()
self.compareto = compareto
self.value = value
super(Logical, self).__init__(**kwargs)
[docs] def evaluate(self, x):
x = x.copy()
m = ~np.isnan(x)
m_ind = np.flatnonzero(m)
if isinstance(self.compareto, np.ndarray):
cond = self.conditions[self.condition](x[m], self.compareto[m])
x.flat[m_ind[cond]] = self.value.flat[m_ind[cond]]
else:
cond = self.conditions[self.condition](x[m], self.compareto)
x.flat[m_ind[cond]] = self.value
return x
def __repr__(self):
txt = "{0}(condition={1}, compareto={2}, value={3})"
return txt.format(self.__class__.__name__, self.condition,
self.compareto, self.value)
[docs]class V23ToSky(Rotation3D):
"""
Transform from V2V3 to a standard coordinate system (ICRS).
Parameters
----------
angles : list
A sequence of angles (in deg).
The angles are [-V2_REF, V3_REF, -ROLL_REF, -DEC_REF, RA_REF].
axes_order : str
A sequence of characters ('x', 'y', or 'z') corresponding to the
axis of rotation and matching the order in ``angles``.
The axes are "zyxyz".
"""
_separable = False
inputs = ("v2", "v3")
""" ("v2", "v3"): Coordinates in the (V2, V3) telescope frame."""
outputs = ("ra", "dec")
""" ("ra", "dec"): RA, DEC cooridnates in ICRS."""
def __init__(self, angles, axes_order, name=None):
super(V23ToSky, self).__init__(angles, axes_order=axes_order, name=name)
[docs] @staticmethod
def spherical2cartesian(alpha, delta):
"""
Convert spherical coordinates (in deg) to cartesian.
"""
alpha = np.deg2rad(alpha)
delta = np.deg2rad(delta)
x = np.cos(alpha) * np.cos(delta)
y = np.cos(delta) * np.sin(alpha)
z = np.sin(delta)
return np.array([x, y, z])
[docs] @staticmethod
def cartesian2spherical(x, y, z):
"""
Convert cartesian coordinates to spherical coordinates (in deg).
"""
h = np.hypot(x, y)
alpha = np.rad2deg(np.arctan2(y, x))
delta = np.rad2deg(np.arctan2(z, h))
return alpha, delta
[docs] def evaluate(self, v2, v3, angles):
x, y, z = self.spherical2cartesian(v2, v3)
x1, y1, z1 = super(V23ToSky, self).evaluate(x, y, z, angles)
ra, dec = self.cartesian2spherical(x1, y1, z1)
return ra, dec
[docs] def __call__(self, v2, v3):
from itertools import chain
inputs, format_info = self.prepare_inputs(v2, v3)
parameters = self._param_sets(raw=True)
outputs = self.evaluate(*chain([v2, v3], parameters))
if self.n_outputs == 1:
outputs = (outputs,)
return self.prepare_outputs(format_info, *outputs)
[docs]class IdealToV2V3(Model):
"""
Performs the transform from Ideal to telescope V2,V3 coordinate system.
The two systems have the same origin: V2_REF, V3_REF.
Note: This model has no schema implemented - add schema if needed.
"""
_separable = False
inputs = ('xidl', 'yidl')
""" x and y coordinates in the telescope Ideal frame."""
outputs = ('v2', 'v3')
""" coorinates in the telescope (V2,V3) frame."""
v3idlyangle = Parameter() # in deg
v2ref = Parameter() # in arcsec
v3ref = Parameter() # in arcsec
vparity = Parameter()
def __init__(self, v3idlyangle, v2ref, v3ref, vparity, name='idl2V', **kwargs):
super(IdealToV2V3, self).__init__(v3idlyangle=v3idlyangle, v2ref=v2ref,
v3ref=v3ref, vparity=vparity, name=name,
**kwargs)
[docs] @staticmethod
def evaluate(xidl, yidl, v3idlyangle, v2ref, v3ref, vparity):
"""
Parameters
----------
xidl, yidl : ndarray-like
Coordinates in Ideal System [in arcsec]
v3idlyangle : float
Angle between Ideal Y-axis and V3 [ in deg]
v2ref, v3ref : ndarray-like
Coordinates in V2, V3 [in arcsec]
vparity : int
Parity.
Returns
-------
v2, v3 : ndarray-like
Coordinates in the (V2, V3) telescope system [in arcsec].
"""
v3idlyangle = np.deg2rad(v3idlyangle)
v2 = v2ref + vparity * xidl * np.cos(v3idlyangle) + yidl * np.sin(v3idlyangle)
v3 = v3ref - vparity * xidl * np.sin(v3idlyangle) + yidl * np.cos(v3idlyangle)
return v2, v3
def inverse(self):
return V2V3ToIdeal(self.v3idlyangle, self.v2ref, self.v3ref, self.vparity)
[docs]class V2V3ToIdeal(Model):
"""
Performs the transform from telescope V2,V3 to Ideal coordinate system.
The two systems have the same origin - V2_REF, V3_REF.
Note: This model has no schema implemented - add if needed.
"""
_separable = False
inputs = ('v2', 'v3')
""" ('v2', 'v3'): coorinates in the telescope (V2,V3) frame."""
outputs = ('xidl', 'yidl')
""" ('xidl', 'yidl'): x and y coordinates in the telescope Ideal frame."""
v3idlyangle = Parameter() # in deg
v2ref = Parameter() # in arcsec
v3ref = Parameter() # in arcsec
vparity = Parameter()
def __init__(self, v3idlyangle, v2ref, v3ref, vparity, name='V2idl', **kwargs):
super(V2V3ToIdeal, self).__init__(v3idlyangle=v3idlyangle, v2ref=v2ref,
v3ref=v3ref, vparity=vparity, name=name,
**kwargs)
[docs] @staticmethod
def evaluate(v2, v3, v3idlyangle, v2ref, v3ref, vparity):
"""
Parameters
----------
xidl, yidl : ndarray-like
Coordinates in Ideal System [in arcsec]
v3idlyangle : float
Angle between Ideal Y-axis and V3 [ in deg]
v2ref, v3ref : ndarray-like
Coordinates in V2, V3 [in arcsec]
vparity : int
Parity.
Returns
-------
xidl, yidl : ndarray-like
Coordinates in the Ideal telescope system [in arcsec].
"""
v3idlyangle = np.deg2rad(v3idlyangle)
xidl = vparity * ((v2 - v2ref) * np.cos(v3idlyangle) -
(v3 - v3ref) * np.sin(v3idlyangle))
yidl = ((v2 - v2ref) * np.sin(v3idlyangle) +
(v3 - v3ref) * np.cos(v3idlyangle))
return xidl, yidl
def inverse(self):
return IdealToV2V3(self.v3idlyangle, self.v2ref, self.v3ref, self.vparity)
def _toindex(value):
"""
Convert value to an int or an int array.
Input coordinates converted to integers
corresponding to the center of the pixel.
The convention is that the center of the pixel is
(0, 0), while the lower left corner is (-0.5, -0.5).
Examples
--------
>>> _toindex(np.array([-0.5, 0.49999]))
array([0, 0])
>>> _toindex(np.array([0.5, 1.49999]))
array([1, 1])
>>> _toindex(np.array([1.5, 2.49999]))
array([2, 2])
"""
indx = np.asarray(np.floor(np.asarray(value) + 0.5), dtype=np.int)
return indx
[docs]class NIRCAMForwardRowGrismDispersion(Model):
"""Return the transform from grism to image for the given spectral order.
Parameters
----------
orders : list [int]
List of orders which are available
lmodels : list [astropy.modeling.Model]
List of models which govern the wavelength solutions for each order
xmodels : list [astropy.modeling.Model]
List of models which govern the x solutions for each order
ymodels : list [astropy.modeling.Model]
List of models which givern the y solutions for each order
Returns
-------
x, y, wavelength, order in the grism image for the pixel at x0,y0 that was
specified as input using the input delta pix for the specified order
Notes
-----
The evaluation here is linear currently because higher orders have not yet been
defined for NIRCAM (NIRCAM polynomials currently do not have any field
dependence)
"""
standard_broadcasting = False
_separable = False
fittable = False
linear = False
inputs = ("x", "y", "x0", "y0", "order")
outputs = ("x", "y", "wavelength", "order")
def __init__(self, orders, lmodels=None, xmodels=None,
ymodels=None, name=None, meta=None):
self.orders = orders
self.lmodels = lmodels
self.xmodels = xmodels
self.ymodels = ymodels
self._order_mapping = {int(k): v for v, k in enumerate(orders)}
meta = {"orders": orders} # informational for users
if name is None:
name = 'nircam_forward_row_grism_dispersion'
super(NIRCAMForwardRowGrismDispersion, self).__init__(name=name,
meta=meta)
[docs] def evaluate(self, x, y, x0, y0, order):
"""Return the transform from grism to image for the given spectral order.
Parameters
----------
x : float
input x pixel
y : float
intput y pixel
x0 : float
input x-center of object
y0 : float
input y-center of object
order : int
the spectral order to use
"""
try:
iorder = self._order_mapping[int(order)]
except KeyError:
raise ValueError("Specified order is not available")
# for accepting the dy and known source object center
t = self.xmodels[iorder](x - x0)
dy = self.ymodels[iorder](t)
wavelength = self.lmodels[iorder](t)
return (x0, y0+dy, wavelength, order)
[docs]class NIRCAMForwardColumnGrismDispersion(Model):
"""Return the transform from grism to image for the given spectral order.
Parameters
----------
orders : list [int]
List of orders which are available
lmodels : list [astropy.modeling.Model]
List of models which govern the wavelength solutions
xmodels : list [astropy.modeling.Model]
List of models which govern the x solutions
ymodels : list [astropy.modeling.Model]
List of models which givern the y solutions
Returns
-------
x, y, lam, order in the grism image for the pixel at x0,y0 that was
specified as input using the input delta pix for the specified order
Notes
-----
The evaluation here is lineaer because higher orders have not yet been
defined for NIRCAM (NIRCAM polynomials currently do not have any field
dependence)
"""
standard_broadcasting = False
_separable = False
fittable = False
linear = False
inputs = ("x", "y", "x0", "y0", "order")
outputs = ("x", "y", "wavelength", "order")
def __init__(self, orders, lmodels=None, xmodels=None,
ymodels=None, name=None, meta=None):
self.orders = orders
self.lmodels = lmodels
self.xmodels = xmodels
self.ymodels = ymodels
self._order_mapping = {int(k): v for v, k in enumerate(orders)}
meta = {"orders": orders} # informational for users
if name is None:
name = 'nircam_forward_column_grism_dispersion'
super(NIRCAMForwardColumnGrismDispersion, self).__init__(name=name,
meta=meta)
[docs] def evaluate(self, x, y, x0, y0, order):
"""Return the transform from grism to image for the given spectral order.
Parameters
----------
x : float
input x pixel
y : float
intput y pixel
x0 : float
input x-center of object
y0 : float
input y-center of object
order : int
the spectral order to use
"""
try:
iorder = self._order_mapping[int(order)]
except KeyError:
raise ValueError("Specified order is not available")
# for accepting the dy and known source object center
t = self.ymodels[iorder](y-y0)
dx = self.xmodels[iorder](t)
wavelength = self.lmodels[iorder](t)
return (x0+dx, y0, wavelength, order)
[docs]class NIRCAMBackwardGrismDispersion(Model):
"""Return the valid pixel(s) and wavelengths given center x,y and lam
Parameters
----------
orders : list [int]
List of orders which are available
lmodels : list [astropy.modeling.Model]
List of models which govern the wavelength solutions
xmodels : list [astropy.modeling.Model]
List of models which govern the x solutions
ymodels : list [astropy.modeling.Model]
List of models which givern the y solutions
Returns
-------
x, y, lam, order in the grism image for the pixel at x0,y0 that was
specified as input using the wavelength l for the specified order
Notes
-----
The evaluation here is lineaer because higher orders have not yet been defined for NIRCAM
(NIRCAM polynomials currently do not have any field dependence)
"""
standard_broadcasting = False
_separable = False
fittable = False
linear = False
inputs = ("x", "y", "wavelength", "order")
outputs = ("x", "y", "x0", "y0", "order")
def __init__(self, orders, lmodels=None, xmodels=None,
ymodels=None, name=None, meta=None):
self._order_mapping = {int(k): v for v, k in enumerate(orders)}
self.lmodels = lmodels
self.xmodels = xmodels
self.ymodels = ymodels
self.orders = orders
meta = {"orders": orders}
if name is None:
name = "nircam_backward_grism_dispersion"
super(NIRCAMBackwardGrismDispersion, self).__init__(name=name,
meta=meta)
[docs] def evaluate(self, x, y, wavelength, order):
"""Return the tranfrom from image to grism for the given spectral order.
Parameters
----------
x : float
input x pixel
y : float
intput y pixel
wavelength : float
input wavelength in angstroms
order : int
specifies the spectral order
"""
try:
iorder = self._order_mapping[int(order)]
except KeyError:
raise ValueError("Specified order is not available")
if wavelength < 0:
raise ValueError("wavelength should be greater than zero")
t = self.lmodels[iorder](float(wavelength))
dx = self.xmodels[iorder](float(t))
dy = self.ymodels[iorder](float(t))
return (x+dx, y+dy, x, y, order)
[docs]class NIRISSBackwardGrismDispersion(Model):
"""This model calculates the dispersion extent of NIRISS pixels.
The dispersion is relative to the input x,y for a given wavelength.
Parameters
----------
xmodels : list[tuple]
The list of tuple(models) for the polynomial model in x
ymodels : list[tuple]
The list of tuple(models) for the polynomial model in y
lmodels : list
The list of models for the polynomial model in l
orders : list
The list of orders which are available to the model
theta : float
The rotation to apply
Notes
-----
Given the x,y, wave, order as known on the direct image,
it returns the tuple of x, y, wave, order for that wave in the dispersed image.
This model needs to be generalized, at the moment it satisfies the
2t x 6(xy)th order polynomial currently used by NIRISS.
There's spatial dependence for NIRISS so the forward transform is
iterative
"""
standard_broadcasting = False
_separable = False
fittable = False
linear = False
inputs = ("x", "y", "wavelength", "order")
outputs = ("x", "y", "x0", "y0", "order")
def __init__(self, orders, lmodels=None, xmodels=None,
ymodels=None, theta=None, name=None, meta=None):
self._order_mapping = {int(k): v for v, k in enumerate(orders)}
self.xmodels = xmodels
self.ymodels = ymodels
self.lmodels = lmodels
self.orders = orders
self.theta = theta
meta = {"orders": orders}
if name is None:
name = 'niriss_backward_grism_dispersion'
super(NIRISSBackwardGrismDispersion, self).__init__(name=name,
meta=meta)
[docs] def evaluate(self, x, y, wavelength, order):
"""Return the valid pixel(s) and wavelengths given center x,y and lam
Parameters
----------
wavelength : int,float
Input wavelength you want to know about, will be converted to float
x : int,float
Input x location
y : int,float
Input y location
wavelength : float
Wavelength to disperse
order : list
The order to use
Returns
-------
x, y, wavelength, order in the grism image for the pixel at x,y that was
specified as input using the wavelength and order specified
Notes
-----
There's spatial dependence for NIRISS so the forward transform
dependes on x,y as well as the filter wheel rotation. Theta is
usu. taken to be the different between fwcpos_ref in the specwcs
reference file and fwcpos from the input image.
"""
if wavelength < 0:
raise ValueError("Wavelength should be greater than zero")
try:
iorder = self._order_mapping[int(order)]
except KeyError:
raise ValueError("Specified order is not available")
t = self.lmodels[iorder](wavelength)
# use that t to compute the dx and dy
dx = self.xmodels[iorder][0](x, y) + t * self.xmodels[iorder][1](x, y)
dy = self.ymodels[iorder][0](x, y) + t * self.ymodels[iorder][1](x, y)
# rotate by theta
if self.theta != 0.0:
rotate = Rotation2D(self.theta)
dx, dy = rotate(dx, dy)
return (x+dx, y+dy, x, y, order)
[docs]class NIRISSForwardRowGrismDispersion(Model):
"""This model calculates the dispersion extent of NIRISS pixels.
The dispersion polynomial is relative to the input x,y pixels
in the direct image for a given wavelength.
Parameters
----------
xmodels : list[tuples]
The list of tuple(models) for the polynomial model in x
ymodels : list[tuples]
The list of tuple(models) for the polynomial model in y
lmodels : list
The list of models for the polynomial model in l
orders : list
The list of orders which are available to the model
Notes
-----
Given the x,y, source location as known on the dispersed image, as well as order,
it returns the tuple of x,y,wavelength,order.
This model needs to be generalized, at the moment it satisfies the
2t x 6(xy)th order polynomial currently used by NIRISS.
"""
standard_broadcasting = False
_separable = False
fittable = False
linear = False
# starts with the backwards pixel and calculates the forward pixel
inputs = ("x", "y", "x0", "y0", "order")
outputs = ("x", "y", "wavelength", "order")
def __init__(self, orders, lmodels=None, xmodels=None,
ymodels=None, theta=0., name=None, meta=None):
self._order_mapping = {int(k): v for v, k in enumerate(orders)}
self.xmodels = xmodels
self.ymodels = ymodels
self.lmodels = lmodels
self.theta = theta
self.orders = orders
meta = {"orders": orders}
if name is None:
name = 'niriss_forward_row_grism_dispersion'
super(NIRISSForwardRowGrismDispersion, self).__init__(name=name,
meta=meta)
[docs] def evaluate(self, x, y, x0, y0, order):
"""Return the valid pixel(s) and wavelengths given center x,y and lam
Parameters
----------
x0: int,float,list
Source object x-center
y0: int,float,list
Source object y-center
x : int,float,list
Input x location
y : int,float,list
Input y location
order : int
Spectral order to use
Returns
-------
x, y, lambda, order, theta, in the direct image for the pixel that was
specified as input using the wavelength l and spectral order
Notes
-----
There's spatial dependence for NIRISS as well as dependence on the
filter wheel rotation during the exposure.
"""
try:
iorder = self._order_mapping[int(order)]
except KeyError:
raise ValueError("Specified order is not available")
dxr = x-x0 # delta x in rotated trace coordinates
t = np.linspace(0, 1, 10) #sample t
dx = self.xmodels[iorder][0](x0, y0) + t * self.xmodels[iorder][1](x0, y0)
dy = self.ymodels[iorder][0](x0, y0) + t * self.ymodels[iorder][1](x0, y0)
if self.theta != 0.0:
rotate = Rotation2D(self.theta)
dx, dy = rotate(dx, dy)
so = np.argsort(dx)
tr = np.interp(dxr, dx[so], t[so])
wavelength = self.lmodels[iorder](tr)
return (x0, y0, wavelength, order)
[docs]class NIRISSForwardColumnGrismDispersion(Model):
"""This model calculates the dispersion extent of NIRISS pixels.
The dispersion polynomial is relative to the input x,y pixels
in the direct image for a given wavelength.
Parameters
----------
xmodels : list[tuple]
The list of tuple(models) for the polynomial model in x
ymodels : list[tuple]
The list of tuple(models) for the polynomial model in y
lmodels : list
The list of models for the polynomial model in l
orders : list
The list of orders which are available to the model
Notes
-----
Given the x,y, source location, order, it returns the tuple of
x,y,wavelength,order on the dispersed image. It also requires
FWCPOS from the image header, this is the filter wheel position
in degrees.
"""
standard_broadcasting = False
_separable = False
fittable = False
linear = False
# starts with the backwards pixel and calculates the forward pixel
inputs = ("x", "y", "x0", "y0", "order")
outputs = ("x", "y", "wavelength", "order")
def __init__(self, orders, lmodels=None, xmodels=None,
ymodels=None, theta=None, name=None, meta=None):
self._order_mapping = {int(k): v for v, k in enumerate(orders)}
self.xmodels = xmodels
self.ymodels = ymodels
self.lmodels = lmodels
self.orders = orders
self.theta = theta
meta = {"orders": orders}
if name is None:
name = 'niriss_forward_column_grism_dispersion'
super(NIRISSForwardColumnGrismDispersion, self).__init__(name=name,
meta=meta)
[docs] def evaluate(self, x, y, x0, y0, order):
"""Return the valid pixel(s) and wavelengths given center x,y and lam
Parameters
----------
x0: int,float
Source object x-center
y0: int,float
Source object y-center
x : int,float
Input x location
y : int,float
Input y location
order : int
Spectral order to use
theta : float
input filter wheel rotation angle in degrees
Returns
-------
x, y, lambda, order, in the direct image for the pixel that was
specified as input using the wavelength l and spectral order
Notes
-----
There's spatial dependence for NIRISS as well as rotation for the filter wheel
"""
try:
iorder = self._order_mapping[int(order)]
except KeyError:
raise ValueError("Specified order is not available")
dyr = y - y0 # delta x in rotated trace coordinate
t = np.linspace(0, 1, 10)
dx = self.xmodels[iorder][0](x0, y0) + t * self.xmodels[iorder][1](x0, y0)
dy = self.ymodels[iorder][0](x0, y0) + t * self.ymodels[iorder][1](x0, y0)
if self.theta != 0.0:
rotate = Rotation2D(self.theta)
dx, dy = rotate(dx, dy)
so = np.argsort(dy)
tr = np.interp(dyr, dy[so], t[so])
wavelength = self.lmodels[iorder](tr)
return (x0, y0, wavelength, order)