Source code for jwst.tweakreg.wcsimage

"""
This module provides support for working with image footprints on the sky,
source catalogs, and setting and manipulating tangent-plane corrections
of image WCS.

:Authors: Mihai Cara (contact: help@stsci.edu)


"""

# STDLIB
import logging
import numpy as np
from copy import deepcopy

# THIRD-PARTY
import gwcs
from astropy import table
from spherical_geometry.polygon import SphericalPolygon
from stsci.stimage import xyxymatch

# LOCAL
from ..transforms.tpcorr import TPCorr, rot_mat3D
from . import linearfit
from . import matchutils


__all__ = ['convex_hull', 'ImageWCS', 'RefCatalog', 'WCSImageCatalog',
           'WCSGroupCatalog']


log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)


[docs]class ImageWCS(): """ A class for holding JWST GWCS information and for managing tangent-plane corrections. """ def __init__(self, wcs, v2_ref, v3_ref, roll_ref, ra_ref, dec_ref): """ Parameters ---------- wcs : GWCS A `GWCS` object. v2ref : float V2 position of the reference point in degrees. v3ref : float V3 position of the reference point in degrees. roll : float Roll angle in degrees. ra_ref : float RA of the reference point in degrees. dec_ref : float DEC of the reference point in degrees. """ if not self._check_wcs_structure(wcs): raise ValueError("Unsupported WCS structure.") self._ra_ref = ra_ref self._dec_ref = dec_ref self._v2_ref = v2_ref self._v3_ref = v3_ref self._roll_ref = roll_ref # perform additional check that if tangent plane correction is already # present in the WCS pipeline, it is of TPCorr class and that # its parameters are consistent with reference angles: frms = [f[0] for f in wcs.pipeline] if 'v2v3corr' in frms: self._v23name = 'v2v3corr' self._tpcorr = deepcopy(wcs.pipeline[frms.index('v2v3corr')-1][1]) self._default_tpcorr = None if not isinstance(self._tpcorr, TPCorr): raise ValueError("Unsupported tangent-plance correction type.") # check that transformation parameters are consistent with # reference angles: v2ref = self._tpcorr.v2ref.value v3ref = self._tpcorr.v3ref.value roll = self._tpcorr.roll.value eps_v2 = 10.0 * np.finfo(v2_ref).eps eps_v3 = 10.0 * np.finfo(v3_ref).eps eps_roll = 10.0 * np.finfo(roll_ref).eps if not (np.isclose(v2_ref, v2ref, rtol=eps_v2) and np.isclose(v3_ref, v3ref, rtol=eps_v3) and np.isclose(roll_ref, roll, rtol=eps_roll)): raise ValueError( "WCS/TPCorr parameters 'v2ref', 'v3ref', and/or 'roll' " "differ from the corresponding reference values." ) else: self._v23name = 'v2v3' self._tpcorr = None self._default_tpcorr = TPCorr( v2ref=v2_ref, v3ref=v3_ref, roll=roll_ref, name='tangent-plane linear correction' ) self._owcs = wcs self._wcs = deepcopy(wcs) self._update_transformations() def _update_transformations(self): # define transformations from detector/world coordinates to # the tangent plane: detname = self._wcs.pipeline[0][0] worldname = self._wcs.pipeline[-1][0] self._world_to_v23 = self._wcs.get_transform(worldname, self._v23name) self._v23_to_world = self._wcs.get_transform(self._v23name, worldname) self._det_to_v23 = self._wcs.get_transform(detname, self._v23name) self._v23_to_det = self._wcs.get_transform(self._v23name, detname) self._det_to_world = self._wcs.__call__ self._world_to_det = self._wcs.invert @property def ref_angles(self): """ Return a ``wcsinfo``-like dictionary of main WCS parameters. """ wcsinfo = { 'ra_ref': self._ra_ref, 'dec_ref': self._dec_ref, 'v2_ref': self._v2_ref, 'v3_ref': self._v3_ref, 'roll_ref': self._roll_ref } return wcsinfo @property def wcs(self): """ Get current GWCS object. """ return self._wcs @property def original_wcs(self): """ Get original GWCS object. """ return self._owcs
[docs] def copy(self): """ Returns a deep copy of this `ImageWCS` object. """ return deepcopy(self)
[docs] def set_correction(self, matrix=[[1, 0], [0, 1]], shift=[0, 0]): """ Sets a tangent-plane correction of the GWCS object according to the provided liniar parameters. Parameters ---------- matrix : list, numpy.ndarray A ``2x2`` array or list of lists coefficients representing scale, rotation, and/or skew transformations. shift : list, numpy.ndarray A list of two coordinate shifts to be applied to coordinates *before* ``matrix`` transformations are applied. """ frms = [f[0] for f in self._wcs.pipeline] # if original WCS did not have tangent-plane corrections, create # new correction and add it to the WCs pipeline: if self._tpcorr is None: self._tpcorr = TPCorr( v2ref=self._v2_ref, v3ref=self._v3_ref, roll=self._roll_ref, matrix=matrix, shift=shift, name='tangent-plane linear correction' ) idx_v2v3 = frms.index(self._v23name) pipeline = deepcopy(self._wcs.pipeline) pf, pt = pipeline[idx_v2v3] pipeline[idx_v2v3] = (pf, deepcopy(self._tpcorr)) pipeline.insert(idx_v2v3 + 1, ('v2v3corr', pt)) # The following is a hack around the fact that gwcs does not # currently provide support for inserting a step. for i in range(len(pipeline)): try: frame = getattr(self._wcs, pipeline[i][0]) except AttributeError: continue pipeline[i] = (frame, pipeline[i][1]) self._wcs = gwcs.WCS(pipeline, name=self._owcs.name) self._v23name = 'v2v3corr' else: # combine old and new corrections into a single one and replace # old transformation with the combined correction transformation: tpcorr2 = self._tpcorr.__class__( v2ref=self._tpcorr.v2ref, v3ref=self._tpcorr.v3ref, roll=self._tpcorr.roll, matrix=matrix, shift=shift, name='tangent-plane linear correction' ) self._tpcorr = tpcorr2.combine(tpcorr2, self._tpcorr) idx_v2v3 = frms.index(self._v23name) pipeline = deepcopy(self._wcs.pipeline) pipeline[idx_v2v3 - 1] = (pipeline[idx_v2v3 - 1][0], deepcopy(self._tpcorr)) self._wcs = gwcs.WCS(pipeline, name=self._owcs.name) # reset definitions of the transformations from detector/world # coordinates to the tangent plane: self._update_transformations()
def _check_wcs_structure(self, wcs): if wcs is None or wcs.pipeline is None: return False frms = [f[0] for f in wcs.pipeline] nframes = len(frms) if nframes < 3: return False if frms.count(frms[0]) > 1 or frms.count(frms[-1]) > 1: return False if frms.count('v2v3') != 1: return False idx_v2v3 = frms.index('v2v3') if idx_v2v3 == 0 or idx_v2v3 == (nframes - 1): return False nv2v3corr = frms.count('v2v3corr') if nv2v3corr == 0: return True elif nv2v3corr > 1: return False idx_v2v3corr = frms.index('v2v3corr') if idx_v2v3corr != (idx_v2v3 + 1) or idx_v2v3corr == (nframes - 1): return False return True
[docs] def det_to_world(self, x, y): """ Convert pixel coordinates to sky coordinates using full (i.e., including distortions) transformations. """ ra, dec = self._det_to_world(x, y) return ra, dec
[docs] def world_to_det(self, ra, dec): """ Convert sky coordinates to image's pixel coordinates using full (i.e., including distortions) transformations. """ x, y = self._world_to_det(ra, dec) return x, y
[docs] def det_to_tanp(self, x, y): """ Convert detector (pixel) coordinates to tangent plane coordinates. """ tpc = self._default_tpcorr if self._tpcorr is None else self._tpcorr v2, v3 = self._det_to_v23(x, y) x, y = tpc.v2v3_to_tanp(v2, v3) return x, y
[docs] def tanp_to_det(self, x, y): """ Convert tangent plane coordinates to detector (pixel) coordinates. """ tpc = self._default_tpcorr if self._tpcorr is None else self._tpcorr v2, v3 = tpc.tanp_to_v2v3(x, y) x, y = self._v23_to_det(v2, v3) return x, y
[docs] def world_to_tanp(self, ra, dec): """ Convert tangent plane coordinates to detector (pixel) coordinates. """ tpc = self._default_tpcorr if self._tpcorr is None else self._tpcorr v2, v3 = self._world_to_v23(ra, dec) x, y = tpc.v2v3_to_tanp(v2, v3) return x, y
[docs] def tanp_to_world(self, x, y): """ Convert tangent plane coordinates to world coordinates. """ tpc = self._default_tpcorr if self._tpcorr is None else self._tpcorr v2, v3 = tpc.tanp_to_v2v3(x, y) ra, dec = self._v23_to_world(v2, v3) return ra, dec
[docs]class WCSImageCatalog(): """ A class that holds information pertinent to an image WCS and a source catalog of the sources found in that image. """ def __init__(self, shape, wcs, ref_angles, catalog, name=None, meta={}): """ Parameters ---------- shape : tuple A tuple of two integer values indicating the size of the image along each axis. Must follow the same convention as the shape of a :py:class:`numpy.ndarray` objects. Specifically, first size should be indicate the number of rows in the image and second size should indicate the number of columns in the image. wcs : gwcs.WCS WCS associated with the image and the catalog. ref_angles : dict A Python dictionary providing essential WCS reference angles. This parameter must contain at least the following keys: ``ra_ref``, ``dec_ref``, ``v2_ref``, ``v3_ref``, and ``roll_ref``. catalog : astropy.table.Table Source catalog associated with an image. Must contain 'x' and 'y' columns which indicate source coordinates (in pixels) in the associated image. name : str, None, optional Image catalog's name. meta : dict, optional Additional information about image, catalog, and/or WCS to be stored (for convenience) within `WCSImageCatalog` object. """ self._shape = shape self._name = name self._catalog = None self._bb_radec = None self._imwcs = None self.img_bounding_ra = None self.img_bounding_dec = None self.meta = {} self.meta.update(meta) self.set_wcs(wcs, ref_angles) self.catalog = catalog @property def imwcs(self): """ Get :py:class:`ImageWCS` WCS. """ return self._imwcs @property def wcs(self): """ Get :py:class:`gwcs.WCS`. """ if self._imwcs is None: return None return self._imwcs.wcs @property def ref_angles(self): """ Get ``wcsinfo``. """ if self._imwcs is None: return None return self._imwcs.ref_angles
[docs] def set_wcs(self, wcs, ref_angles): """ Set :py:class:`gwcs.WCS` and the associated ``wcsinfo```. .. note:: Setting the WCS triggers automatic bounding polygon recalculation. Parameters ---------- wcs : gwcs.WCS WCS object. ref_angles : dict A Python dictionary providing essential WCS reference angles. This parameter must contain at least the following keys: ``ra_ref``, ``dec_ref``, ``v2_ref``, ``v3_ref``, and ``roll_ref``. """ for key in ['dec_ref', 'v2_ref', 'v3_ref', 'roll_ref']: if key not in ref_angles: raise KeyError("Parameter 'ref_angles' must contain " "'{:s}' key.".format(key)) self._imwcs = ImageWCS( wcs=wcs, v2_ref=ref_angles['v2_ref'], v3_ref=ref_angles['v3_ref'], roll_ref=ref_angles['roll_ref'], ra_ref=ref_angles['ra_ref'], dec_ref=ref_angles['dec_ref'] ) # create spherical polygon bounding the image self.calc_bounding_polygon()
@property def name(self): """ Get/set :py:class:`WCSImageCatalog` object's name. """ return self._name @name.setter def name(self, name): self._name = name if hasattr(self, '_catalog'): if self._catalog is not None: self._catalog.meta['name'] = name @property def imshape(self): """ Get/set image's shape. This must be a tuple of two dimensions following the same convention as the shape of `numpy.ndarray`. """ return self._shape @imshape.setter def imshape(self, shape): self._shape = shape @property def catalog(self): """ Get/set image's catalog. """ return self._catalog @catalog.setter def catalog(self, catalog): if catalog is None: self._catalog = None return else: self._catalog = table.Table(catalog.copy(), masked=True) self._catalog.meta['name'] = self._name # create spherical polygon bounding the image self.calc_bounding_polygon()
[docs] def det_to_world(self, x, y): """ Convert pixel coordinates to sky coordinates using full (i.e., including distortions) transformations. """ if self._imwcs is None: raise RuntimeError("WCS has not been set") return self._imwcs.det_to_world(x, y)
[docs] def world_to_det(self, ra, dec): """ Convert sky coordinates to image's pixel coordinates using full (i.e., including distortions) transformations. """ if self._imwcs is None: raise RuntimeError("WCS has not been set") return self._imwcs.world_to_det(ra, dec)
[docs] def det_to_tanp(self, x, y): """ Convert detector (pixel) coordinates to tangent plane coordinates. """ if self._imwcs is None: raise RuntimeError("WCS has not been set") return self._imwcs.det_to_tanp(x, y)
[docs] def tanp_to_det(self, x, y): """ Convert tangent plane coordinates to detector (pixel) coordinates. """ if self._imwcs is None: raise RuntimeError("WCS has not been set") return self._imwcs.tanp_to_det(x, y)
[docs] def tanp_to_world(self, x, y): """ Convert tangent plane coordinates to world coordinates. """ if self._imwcs is None: raise RuntimeError("WCS has not been set") return self._imwcs.tanp_to_world(x, y)
[docs] def world_to_tanp(self, ra, dec): """ Convert tangent plane coordinates to detector (pixel) coordinates. """ if self._imwcs is None: raise RuntimeError("WCS has not been set") return self._imwcs.world_to_tanp(ra, dec)
@property def polygon(self): """ Get image's (or catalog's) bounding spherical polygon. """ return self._polygon
[docs] def intersection(self, wcsim): """ Compute intersection of this `WCSImageCatalog` object and another `WCSImageCatalog`, `WCSGroupCatalog`, or :py:class:`~spherical_geometry.polygon.SphericalPolygon` object. Parameters ---------- wcsim : WCSImageCatalog, WCSGroupCatalog, SphericalPolygon Another object that should be intersected with this `WCSImageCatalog`. Returns ------- polygon : SphericalPolygon A :py:class:`~spherical_geometry.polygon.SphericalPolygon` that is the intersection of this `WCSImageCatalog` and `wcsim`. """ if isinstance(wcsim, (WCSImageCatalog, WCSGroupCatalog)): return self._polygon.intersection(wcsim.polygon) else: return self._polygon.intersection(wcsim)
# TODO: due to a bug in the sphere package, see # https://github.com/spacetelescope/sphere/issues/74 # intersections with polygons formed as union does not work. # For this reason I re-implement 'intersection_area' below with # a workaround for the bug. # The original implementation should be uncommented once the bug # is fixed. # #def intersection_area(self, wcsim): #""" Calculate the area of the intersection polygon. #""" #return np.fabs(self.intersection(wcsim).area())
[docs] def intersection_area(self, wcsim): """ Calculate the area of the intersection polygon. """ if isinstance(wcsim, (WCSImageCatalog, RefCatalog)): return np.fabs(self.intersection(wcsim).area()) else: # this is bug workaround for image groups (multi-unions): area = 0.0 for wim in wcsim: area += np.fabs( self.polygon.intersection(wim.polygon).area() ) return area
def _calc_chip_bounding_polygon(self, stepsize=None): """ Compute image's bounding polygon. Parameters ---------- stepsize : int, None, optional Indicates the maximum separation between two adjacent vertices of the bounding polygon along each side of the image. Corners of the image are included automatically. If `stepsize` is `None`, bounding polygon will contain only vertices of the image. """ if self.wcs is None: return ny, nx = self.imshape if stepsize is None: nintx = 2 ninty = 2 else: nintx = max(2, int(np.ceil((nx + 1.0) / stepsize))) ninty = max(2, int(np.ceil((ny + 1.0) / stepsize))) xs = np.linspace(-0.5, nx - 0.5, nintx, dtype=np.float) ys = np.linspace(-0.5, ny - 0.5, ninty, dtype=np.float)[1:-1] nptx = xs.size npty = ys.size npts = 2 * (nptx + npty) borderx = np.empty((npts + 1,), dtype=np.float) bordery = np.empty((npts + 1,), dtype=np.float) # "bottom" points: borderx[:nptx] = xs bordery[:nptx] = -0.5 # "right" sl = np.s_[nptx:nptx + npty] borderx[sl] = nx - 0.5 bordery[sl] = ys # "top" sl = np.s_[nptx + npty:2 * nptx + npty] borderx[sl] = xs[::-1] bordery[sl] = ny - 0.5 # "left" sl = np.s_[2 * nptx + npty:-1] borderx[sl] = -0.5 bordery[sl] = ys[::-1] # close polygon: borderx[-1] = borderx[0] bordery[-1] = bordery[0] ra, dec = self.det_to_world(borderx, bordery) # TODO: for strange reasons, occasionally ra[0] != ra[-1] and/or # dec[0] != dec[-1] (even though we close the polygon in the # previous two lines). Then SphericalPolygon fails because # points are not closed. Threfore we force it to be closed: ra[-1] = ra[0] dec[-1] = dec[0] self.img_bounding_ra = ra self.img_bounding_dec = dec self._polygon = SphericalPolygon.from_radec(ra, dec) def _calc_cat_convex_hull(self): """ Compute convex hull that bounds the sources in the catalog. """ if self.wcs is None or self.catalog is None: return x = self.catalog['x'] y = self.catalog['y'] if len(x) == 0: # no points raise RuntimeError("Unexpected error: Contact sofware developer") elif len(x) == 1: # one point. build a small box around it: x, y = convex_hull(x, y, wcs=self.det_to_tanp) xv = [x[0] - 0.5, x[0] - 0.5, x[0] + 0.5, x[0] + 0.5, x[0] - 0.5] yv = [y[0] - 0.5, y[0] + 0.5, y[0] + 0.5, y[0] - 0.5, y[0] - 0.5] ra, dec = self.tanp_to_world(xv, yv) elif len(x) == 3: # two points. build a small box around them: x, y = convex_hull(x, y, wcs=self.det_to_tanp) vx = -(y[1] - y[0]) vy = x[1] - x[0] norm = 2.0 * np.sqrt(vx * vx + vy * vy) vx /= norm vy /= norm xv = [x[0] + vx, x[1] + vx, x[1] + vx, x[0] - vx, x[0] + vx] yv = [y[0] + vy, y[1] + vy, y[1] + vy, x[0] - vx, x[0] + vx] ra, dec = self.tanp_to_world(xv, yv) else: ra, dec = convex_hull(x, y, wcs=self.det_to_world) # TODO: for strange reasons, occasionally ra[0] != ra[-1] and/or # dec[0] != dec[-1] (even though we close the polygon in the # previous two lines). Then SphericalPolygon fails because # points are not closed. Threfore we force it to be closed: ra[-1] = ra[0] dec[-1] = dec[0] self._bb_radec = (ra, dec) self._polygon = SphericalPolygon.from_radec(ra, dec) self._poly_area = np.fabs(self._polygon.area())
[docs] def calc_bounding_polygon(self): """ Calculate bounding polygon of the image or of the sources in the catalog (if catalog was set). """ # we need image's footprint for later: self._calc_chip_bounding_polygon() # create smallest convex spherical polygon bounding all sources: if self._catalog is not None and len(self.catalog) > 0: self._calc_cat_convex_hull()
@property def bb_radec(self): """ Get a 2xN `numpy.ndarray` of RA and DEC of the vertices of the bounding polygon. """ return self._bb_radec
[docs]class WCSGroupCatalog(): """ A class that holds together `WCSImageCatalog` image catalog objects whose relative positions are fixed and whose source catalogs should be fitted together to a reference catalog. """ def __init__(self, images, name=None): """ Parameters ---------- images : list of WCSImageCatalog A list of `WCSImageCatalog` image catalogs. name : str, None, optional Name of the group. """ self._catalog = None if isinstance(images, WCSImageCatalog): self._images = [images] elif hasattr(images, '__iter__'): self._images = [] for im in images: if not isinstance(im, WCSImageCatalog): raise TypeError("Each element of the 'images' parameter " "must be an 'WCSImageCatalog' object.") self._images.append(im) else: raise TypeError("Parameter 'images' must be either a single " "'WCSImageCatalog' object or a list of " "'WCSImageCatalog' objects") self._name = name self.update_bounding_polygon() self._catalog = self.create_group_catalog() @property def name(self): """ Get/set :py:class:`WCSImageCatalog` object's name. """ return self._name @name.setter def name(self, name): self._name = name @property def polygon(self): """ Get image's (or catalog's) bounding spherical polygon. """ return self._polygon
[docs] def intersection(self, wcsim): """ Compute intersection of this `WCSGroupCatalog` object and another `WCSImageCatalog`, `WCSGroupCatalog`, or :py:class:`~spherical_geometry.polygon.SphericalPolygon` object. Parameters ---------- wcsim : WCSImageCatalog, WCSGroupCatalog, SphericalPolygon Another object that should be intersected with this `WCSGroupCatalog`. Returns ------- polygon : SphericalPolygon A :py:class:`~spherical_geometry.polygon.SphericalPolygon` that is the intersection of this `WCSGroupCatalog` and `wcsim`. """ if isinstance(wcsim, (WCSGroupCatalog, WCSImageCatalog)): return self._polygon.intersection(wcsim.polygon) else: return self._polygon.intersection(wcsim)
# TODO: due to a bug in the sphere package, see # https://github.com/spacetelescope/sphere/issues/74 # intersections with polygons formed as union does not work. # For this reason I re-implement 'intersection_area' below with # a workaround for the bug. # The original implementation should be uncommented once the bug # is fixed. # #def intersection_area(self, wcsim): #""" Calculate the area of the intersection polygon. #""" #return np.fabs(self.intersection(wcsim).area())
[docs] def intersection_area(self, wcsim): """ Calculate the area of the intersection polygon. """ area = 0.0 for im in self._images: area += im.intersection_area(wcsim) return area
[docs] def update_bounding_polygon(self): """ Recompute bounding polygons of the member images. """ polygons = [im.polygon for im in self._images] if len(polygons) == 0: self._polygon = SphericalPolygon([]) else: self._polygon = SphericalPolygon.multi_union(polygons)
def __len__(self): return len(self._images) def __getitem__(self, idx): return self._images[idx] def __iter__(self): for image in self._images: yield image @property def catalog(self): """ Get/set image's catalog. """ return self._catalog
[docs] def create_group_catalog(self): """ Combine member's image catalogs into a single group's catalog. Returns ------- group_catalog : astropy.table.Table Combined group catalog. """ catalogs = [] catno = 0 for image in self._images: catlen = len(image.catalog) if image.name is None: catname = 'Catalog #{:d}'.format(catno) else: catname = image.name col_catname = table.MaskedColumn(catlen * [catname], name='cat_name') col_imcatidx = table.MaskedColumn(catlen * [catno], name='_imcat_idx') col_id = table.MaskedColumn(image.catalog['id']) col_x = table.MaskedColumn(image.catalog['x'], dtype=np.float64) col_y = table.MaskedColumn(image.catalog['y'], dtype=np.float64) ra, dec = image.det_to_world( image.catalog['x'], image.catalog['y'] ) col_ra = table.MaskedColumn(ra, dtype=np.float64, name='RA') col_dec = table.MaskedColumn(dec, dtype=np.float64, name='DEC') cat = table.Table( [col_imcatidx, col_catname, col_id, col_x, col_y, col_ra, col_dec], masked=True ) catalogs.append(cat) catno += 1 return table.vstack(catalogs, join_type='exact')
[docs] def get_unmatched_cat(self): """ Retrieve only those sources from the catalog that have **not** been matched to the sources in the reference catalog. """ mask = self._catalog['matched_ref_id'].mask return self._catalog[mask]
[docs] def get_matched_cat(self): """ Retrieve only those sources from the catalog that **have been** matched to the sources in the reference catalog. """ mask = np.logical_not(self._catalog['matched_ref_id'].mask) return self._catalog[mask]
[docs] def recalc_catalog_radec(self): """ Recalculate RA and DEC of the sources in the catalog. """ for k, image in enumerate(self._images): idx = (self._catalog['_imcat_idx'] == k) if not np.any(idx): continue ra, dec = image.det_to_world( self._catalog['x'][idx], self._catalog['y'][idx] ) self._catalog['RA'][idx] = ra self._catalog['DEC'][idx] = dec
[docs] def calc_tanp_xy(self, tanplane_wcs): """ Compute x- and y-positions of the sources from the image catalog in the tangent plane. This creates the following columns in the catalog's table: ``'xtanp'`` and ``'ytanp'``. Parameters ---------- tanplane_wcs : ImageWCS A `ImageWCS` object that will provide transformations to the tangent plane to which sources of this catalog a should be "projected". """ if 'RA' not in self._catalog.colnames or \ 'DEC' not in self._catalog.colnames: raise RuntimeError("'recalc_catalog_radec()' should have been run " "prior to calc_tanp_xy()") # compute x & y in the reference WCS: xtp, ytp = tanplane_wcs.world_to_tanp(self.catalog['RA'], self.catalog['DEC']) self._catalog['xtanp'] = table.MaskedColumn( xtp, name='xtanp', dtype=np.float64, mask=False ) self._catalog['ytanp'] = table.MaskedColumn( ytp, name='ytanp', dtype=np.float64, mask=False )
[docs] def match2ref(self, refcat, minobj=15, searchrad=1.0, separation=0.5, use2dhist=True, xoffset=0.0, yoffset=0.0, tolerance=1.0): """ Uses xyxymatch to cross-match sources between this catalog and a reference catalog. Parameters ---------- refcat : RefCatalog A `RefCatalog` object that contains a catalog of reference sources as well as a valid reference WCS. minobj : int, None, optional Minimum number of identified objects from each input image to use in matching objects from other images. If the default `None` value is used then `align` will automatically deternmine the minimum number of sources from the value of the `fitgeom` parameter. searchrad : float, optional The search radius for a match. separation : float, optional The minimum separation for sources in the input and reference catalogs in order to be considered to be disctinct sources. Objects closer together than 'separation' pixels are removed from the input and reference coordinate lists prior to matching. This parameter gets passed directly to :py:func:`~stsci.stimage.xyxymatch` for use in matching the object lists from each image with the reference image's object list. use2dhist : bool, optional Use 2D histogram to find initial offset? xoffset : float, optional Initial estimate for the offset in X between the images and the reference frame. This offset will be used for all input images provided. This parameter is ignored when `use2dhist` is `True`. yoffset : float (Default = 0.0) Initial estimate for the offset in Y between the images and the reference frame. This offset will be used for all input images provided. This parameter is ignored when `use2dhist` is `True`. tolerance : float, optional The matching tolerance in pixels after applying an initial solution derived from the 'triangles' algorithm. This parameter gets passed directly to :py:func:`~stsci.stimage.xyxymatch` for use in matching the object lists from each image with the reference image's object list. """ colnames = self._catalog.colnames if 'xtanp' not in colnames or 'ytanp' not in colnames: raise RuntimeError("'calc_tanp_xy()' should have been run prior " "to match2ref()") im_xyref = np.asanyarray([self._catalog['xtanp'], self._catalog['ytanp']]).T refxy = np.asanyarray([refcat.catalog['xtanp'], refcat.catalog['ytanp']]).T log.info("Matching sources from '{}' with sources from reference " "{:s} '{}'".format(self.name, 'image', refcat.name)) xyoff = (xoffset, yoffset) if use2dhist: # Determine xyoff (X,Y offset) and tolerance # to be used with xyxymatch: zpxoff, zpyoff, flux, zpqual = matchutils.build_xy_zeropoint( im_xyref, refxy, searchrad=searchrad ) if zpqual is not None: xyoff = (zpxoff, zpyoff) # set tolerance as well # This value allows initial guess to be off by 1 in both and # still pick up the identified matches tolerance = 1.5 matches = xyxymatch( im_xyref, refxy, origin=xyoff, tolerance=tolerance, separation=separation ) nmatches = len(matches) self._catalog.meta['nmatches'] = nmatches minput_idx = matches['input_idx'] catlen = len(self._catalog) # matched_ref_id: if 'matched_ref_id' not in colnames: c = table.MaskedColumn(name='matched_ref_id', dtype=int, length=catlen, mask=True) self._catalog.add_column(c) else: self._catalog['matched_ref_id'].mask = True self._catalog['matched_ref_id'][minput_idx] = \ self._catalog['id'][minput_idx] self._catalog['matched_ref_id'].mask[minput_idx] = False # this is needed to index reference catalog directly without using # astropy table indexing which at this moment is experimental: if '_raw_matched_ref_idx' not in colnames: c = table.MaskedColumn(name='_raw_matched_ref_idx', dtype=int, length=catlen, mask=True) self._catalog.add_column(c) else: self._catalog['_raw_matched_ref_idx'].mask = True self._catalog['_raw_matched_ref_idx'][minput_idx] = \ matches['ref_idx'] self._catalog['_raw_matched_ref_idx'].mask[minput_idx] = False log.info("Found {:d} matches for '{}'...".format(nmatches, self.name)) return matches
[docs] def fit2ref(self, refcat, tanplane_wcs, fitgeom='general', nclip=3, sigma=3.0): """ Perform linear fit of this group's combined catalog to the reference catalog. Parameters ---------- refcat : RefCatalog A `RefCatalog` object that contains a catalog of reference sources. tanplane_wcs : ImageWCS A `ImageWCS` object that will provide transformations to the tangent plane to which sources of this catalog a should be "projected". fitgeom : {'shift', 'rscale', 'general'}, optional The fitting geometry to be used in fitting the matched object lists. This parameter is used in fitting the offsets, rotations and/or scale changes from the matched object lists. The 'general' fit geometry allows for independent scale and rotation for each axis. nclip : int, optional Number (a non-negative integer) of clipping iterations in fit. sigma : float, optional Clipping limit in sigma units. """ im_xyref = np.asanyarray([self._catalog['xtanp'], self._catalog['ytanp']]).T refxy = np.asanyarray([refcat.catalog['xtanp'], refcat.catalog['ytanp']]).T mask = np.logical_not(self._catalog['matched_ref_id'].mask) im_xyref = im_xyref[mask] ref_idx = self._catalog['_raw_matched_ref_idx'][mask] refxy = refxy[ref_idx] fit = linearfit.iter_linear_fit( refxy, im_xyref, fitgeom=fitgeom, nclip=nclip, sigma=sigma, center=(0, 0) ) xy_fit = fit['img_coords'] + fit['resids'] fit['fit_xy'] = xy_fit fit['fit_RA'], fit['fit_DEC'] = tanplane_wcs.tanp_to_world(*(xy_fit.T)) log.info("Computed '{:s}' fit for {}:".format(fitgeom, self.name)) if fitgeom == 'shift': log.info("XSH: {:.6g} YSH: {:.6g}" .format(fit['offset'][0], fit['offset'][1])) elif fitgeom == 'rscale' and fit['proper']: log.info("XSH: {:.6g} YSH: {:.6g} ROT: {:.6g} SCALE: {:.6g}" .format(fit['offset'][0], fit['offset'][1], fit['rot'], fit['scale'][0])) elif fitgeom == 'general' or (fitgeom == 'rscale' and not fit['proper']): log.info("XSH: {:.6g} YSH: {:.6g} PROPER ROT: {:.6g} " .format(fit['offset'][0], fit['offset'][1], fit['rot'])) log.info("<ROT>: {:.6g} SKEW: {:.6g} ROT_X: {:.6g} " "ROT_Y: {:.6g}".format(fit['rotxy'][2], fit['skew'], fit['rotxy'][0], fit['rotxy'][1])) log.info("<SCALE>: {:.6g} SCALE_X: {:.6g} SCALE_Y: {:.6g}" .format(fit['scale'][0], fit['scale'][1], fit['scale'][2])) else: raise ValueError("Unsupported fit geometry.") log.info("") log.info("XRMS: {:.6g} YRMS: {:.6g}".format(fit['rms'][0], fit['rms'][1])) log.info("Final solution based on {:d} objects." .format(fit['resids'].shape[0])) return fit
[docs] def apply_affine_to_wcs(self, tanplane_wcs, matrix, shift): """ Applies a general affine transformation to the WCS. """ # compute the matrix for the scale and rotation correction matrix = matrix.T shift = -np.dot(np.linalg.inv(matrix), shift) for imcat in self: # compute linear transformation from the tangent plane used for # alignment to the tangent plane of another image in the group: if imcat.imwcs == tanplane_wcs: m = matrix.copy() s = shift.copy() else: r1, t1 = _tp2tp(imcat.imwcs, tanplane_wcs) r2, t2 = _tp2tp(tanplane_wcs, imcat.imwcs) m = np.linalg.multi_dot([r2, matrix, r1]) s = t1 + np.dot(np.linalg.inv(r1), shift) + \ np.dot(np.linalg.inv(np.dot(matrix, r1)), t2) imcat.imwcs.set_correction(m, s) imcat.meta['image_model'].meta.wcs = imcat.wcs
[docs] def align_to_ref(self, refcat, minobj=15, searchrad=1.0, separation=0.5, use2dhist=True, xoffset=0.0, yoffset=0.0, tolerance=1.0, fitgeom='rscale', nclip=3, sigma=3.0): """ Matches sources from the image catalog to the sources in the reference catalog, finds the affine transformation between matched sources, and adjusts images' WCS according to this fit. Parameters ---------- refcat : RefCatalog A `RefCatalog` object that contains a catalog of reference sources as well as a valid reference WCS. minobj : int, None, optional Minimum number of identified objects from each input image to use in matching objects from other images. If the default `None` value is used then `align` will automatically deternmine the minimum number of sources from the value of the `fitgeom` parameter. searchrad : float, optional The search radius for a match. separation : float, optional The minimum separation for sources in the input and reference catalogs in order to be considered to be disctinct sources. Objects closer together than 'separation' pixels are removed from the input and reference coordinate lists prior to matching. This parameter gets passed directly to :py:func:`~stsci.stimage.xyxymatch` for use in matching the object lists from each image with the reference image's object list. use2dhist : bool, optional Use 2D histogram to find initial offset? xoffset : float, optional Initial estimate for the offset in X between the images and the reference frame. This offset will be used for all input images provided. This parameter is ignored when `use2dhist` is `True`. yoffset : float (Default = 0.0) Initial estimate for the offset in Y between the images and the reference frame. This offset will be used for all input images provided. This parameter is ignored when `use2dhist` is `True`. tolerance : float, optional The matching tolerance in pixels after applying an initial solution derived from the 'triangles' algorithm. This parameter gets passed directly to :py:func:`~stsci.stimage.xyxymatch` for use in matching the object lists from each image with the reference image's object list. fitgeom : {'shift', 'rscale', 'general'}, optional The fitting geometry to be used in fitting the matched object lists. This parameter is used in fitting the offsets, rotations and/or scale changes from the matched object lists. The 'general' fit geometry allows for independent scale and rotation for each axis. nclip : int, optional Number (a non-negative integer) of clipping iterations in fit. sigma : float, optional Clipping limit in sigma units. """ if len(self._images) == 0: return tanplane_wcs = deepcopy(self._images[0].imwcs) self.calc_tanp_xy(tanplane_wcs=tanplane_wcs) refcat.calc_tanp_xy(tanplane_wcs=tanplane_wcs) self.match2ref(refcat=refcat, minobj=minobj, searchrad=searchrad, separation=separation, use2dhist=use2dhist, xoffset=xoffset, yoffset=yoffset, tolerance=tolerance) fit = self.fit2ref(refcat=refcat, tanplane_wcs=tanplane_wcs, fitgeom=fitgeom, nclip=nclip, sigma=sigma) self.apply_affine_to_wcs( tanplane_wcs=tanplane_wcs, matrix=fit['fit_matrix'], shift=fit['offset'] )
def _tp2tp(imwcs1, imwcs2): x = np.array([0.0, 1.0, 0.0], dtype=np.float) y = np.array([0.0, 0.0, 1.0], dtype=np.float) xrp, yrp = imwcs2.world_to_tanp(*imwcs1.tanp_to_world(x, y)) matrix = np.array([(xrp[1:] - xrp[0]), (yrp[1:] - yrp[0])]) shift = -np.dot(np.linalg.inv(matrix), [xrp[0], yrp[0]]) return matrix, shift
[docs]class RefCatalog(): """ An object that holds a reference catalog and provides tools for coordinate convertions using reference WCS as well as catalog manipulation and expansion. """ def __init__(self, catalog, name=None): """ Parameters ---------- catalog : astropy.table.Table Reference catalog. ..note:: Reference catalogs (:py:class:`~astropy.table.Table`) *must* contain *both* ``'RA'`` and ``'DEC'`` columns. name : str, None, optional Name of the reference catalog. """ self._name = name self._catalog = None # make sure catalog has RA & DEC if catalog is not None: self.catalog = catalog def _check_catalog(self, catalog): if catalog is None: raise ValueError("Reference catalogs cannot be None") if 'RA' not in catalog.colnames or 'DEC' not in catalog.colnames: raise KeyError("Reference catalogs *must* contain *both* 'RA' " "and 'DEC' columns.") @property def name(self): """ Get/set :py:class:`WCSImageCatalog` object's name. """ return self._name @name.setter def name(self, name): self._name = name @property def catalog(self): """ Get/set image's catalog. """ return self._catalog @catalog.setter def catalog(self, catalog): self._check_catalog(catalog) if len(catalog) == 0: raise ValueError("Catalog must contain at least one source.") self._catalog = catalog.copy() # create spherical polygon bounding the sources self.calc_bounding_polygon() @property def poly_area(self): """ Area of the bounding polygon (in srad). """ return self._poly_area @property def polygon(self): """ Get image's (or catalog's) bounding spherical polygon. """ return self._polygon
[docs] def intersection(self, wcsim): """ Compute intersection of this `WCSImageCatalog` object and another `WCSImageCatalog`, `WCSGroupCatalog`, `RefCatalog`, or :py:class:`~spherical_geometry.polygon.SphericalPolygon` object. Parameters ---------- wcsim : WCSImageCatalog, WCSGroupCatalog, RefCatalog, SphericalPolygon Another object that should be intersected with this `WCSImageCatalog`. Returns ------- polygon : SphericalPolygon A :py:class:`~spherical_geometry.polygon.SphericalPolygon` that is the intersection of this `WCSImageCatalog` and `wcsim`. """ if isinstance(wcsim, (WCSImageCatalog, WCSGroupCatalog, RefCatalog)): return self._polygon.intersection(wcsim.polygon) else: return self._polygon.intersection(wcsim)
# TODO: due to a bug in the sphere package, see # https://github.com/spacetelescope/sphere/issues/74 # intersections with polygons formed as union does not work. # For this reason I re-implement 'intersection_area' below with # a workaround for the bug. # The original implementation should be uncommented once the bug # is fixed. # #def intersection_area(self, wcsim): #""" Calculate the area of the intersection polygon. #""" #return np.fabs(self.intersection(wcsim).area())
[docs] def intersection_area(self, wcsim): """ Calculate the area of the intersection polygon. """ if isinstance(wcsim, (WCSImageCatalog, RefCatalog)): return np.fabs(self.intersection(wcsim).area()) else: # this is bug workaround: area = 0.0 for wim in wcsim: area += np.fabs( self.polygon.intersection(wim.polygon).area() ) return area
def _calc_cat_convex_hull(self): """ Calculate spherical polygon corresponding to the convex hull bounding the sources in the catalog. """ if self.catalog is None: return # Find an "optimal" tangent plane to the catalog points based on the # mean point and then construct a WCS based on the mean point. # Compute x, y coordinates in this tangent plane based on the # previously computed WCS and return the set of x, y coordinates and # "reference WCS". x, y, z = TPCorr.spherical2cartesian( self.catalog['RA'], self.catalog['DEC'] ) ra_ref, dec_ref = TPCorr.cartesian2spherical( x.mean(dtype=np.float64), y.mean(dtype=np.float64), z.mean(dtype=np.float64) ) rotm = [rot_mat3D(np.deg2rad(alpha), 2 - axis) for axis, alpha in enumerate([ra_ref, dec_ref])] euler_rot = np.linalg.multi_dot(rotm) inv_euler_rot = np.linalg.inv(euler_rot) xr, yr, zr = np.dot(euler_rot, (x, y, z)) r0 = TPCorr.r0 x = r0 * yr / xr y = r0 * zr / xr xv, yv = convex_hull(x, y) if len(xv) == 0: # no points raise RuntimeError("Unexpected error: Contact sofware developer") elif len(xv) == 1: # one point. build a small box around it: x, y = convex_hull(x, y, wcs=None) xv = [x[0] - 0.5, x[0] - 0.5, x[0] + 0.5, x[0] + 0.5, x[0] - 0.5] yv = [y[0] - 0.5, y[0] + 0.5, y[0] + 0.5, y[0] - 0.5, y[0] - 0.5] elif len(xv) == 3: # two points. build a small box around them: x, y = convex_hull(x, y, wcs=None) vx = -(y[1] - y[0]) vy = x[1] - x[0] norm = 2.0 * np.sqrt(vx * vx + vy * vy) vx /= norm vy /= norm xv = [x[0] + vx, x[1] + vx, x[1] + vx, x[0] - vx, x[0] + vx] yv = [y[0] + vy, y[1] + vy, y[1] + vy, x[0] - vx, x[0] + vx] # "unrotate" cartezian coordinates back to their original # ra_ref and dec_ref "positions": xt = np.full_like(xv, r0) xcr, ycr, zcr = np.dot(inv_euler_rot, (xt, xv, yv)) # convert cartesian to spherical coordinates: ra, dec = TPCorr.cartesian2spherical(xcr, ycr, zcr) # TODO: for strange reasons, occasionally ra[0] != ra[-1] and/or # dec[0] != dec[-1] (even though we close the polygon in the # previous two lines). Then SphericalPolygon fails because # points are not closed. Threfore we force it to be closed: ra[-1] = ra[0] dec[-1] = dec[0] self._radec = [(ra, dec)] self._polygon = SphericalPolygon.from_radec(ra, dec) self._poly_area = np.fabs(self._polygon.area())
[docs] def calc_bounding_polygon(self): """ Calculate bounding polygon of the sources in the catalog. """ # create spherical polygon bounding the sources self._calc_cat_convex_hull()
[docs] def expand_catalog(self, catalog): """ Expand current reference catalog with sources from another catalog. Parameters ---------- catalog : astropy.table.Table A catalog of new sources to be added to the existing reference catalog. `catalog` *must* contain *both* ``'RA'`` and ``'DEC'`` columns. """ self._check_catalog(catalog) cat = catalog.copy() if self._catalog is None: self._catalog = cat else: self._catalog = table.vstack([self.catalog, cat], join_type='outer') self.calc_bounding_polygon()
[docs] def calc_tanp_xy(self, tanplane_wcs): """ Compute x- and y-positions of the sources from the reference catalog in the tangent plane provided by the `tanplane_wcs`. This creates the following columns in the catalog's table: ``'xtanp'`` and ``'ytanp'``. Parameters ---------- tanplane_wcs : ImageWCS A `ImageWCS` object that will provide transformations to the tangent plane to which sources of this catalog a should be "projected". """ # compute x & y in the reference WCS: xtp, ytp = tanplane_wcs.world_to_tanp(self.catalog['RA'], self.catalog['DEC']) self._catalog['xtanp'] = table.MaskedColumn( xtp, name='xtanp', dtype=np.float64, mask=False ) self._catalog['ytanp'] = table.MaskedColumn( ytp, name='ytanp', dtype=np.float64, mask=False )
[docs]def convex_hull(x, y, wcs=None): """Computes the convex hull of a set of 2D points. Implements `Andrew's monotone chain algorithm <http://en.wikibooks.org\ /wiki/Algorithm_Implementation/Geometry/Convex_hull/Monotone_chain>`_. The algorithm has O(n log n) complexity. Credit: `<http://en.wikibooks.org/wiki/Algorithm_Implementation/Geometry/\ Convex_hull/Monotone_chain>`_ Parameters ---------- points : list of tuples An iterable sequence of (x, y) pairs representing the points. Returns ------- Output : list A list of vertices of the convex hull in counter-clockwise order, starting from the vertex with the lexicographically smallest coordinates. """ ndarray = isinstance(x, np.ndarray) or isinstance(y, np.ndarray) # Sort the points lexicographically (tuples are compared # lexicographically). Remove duplicates to detect the case we have # just one unique point. points = sorted(set(zip(x, y))) # Boring case: no points or a single point, # possibly repeated multiple times. if len(points) == 0: if not ndarray: return (np.array([]), np.array([])) else: return ([], []) elif len(points) == 1: if not ndarray: return (np.array([points[0][0]]), np.array([points[0][1]])) else: return ([points[0][0]], [points[0][1]]) # 2D cross product of OA and OB vectors, i.e. z-component of their # 3D cross product. # Returns a positive value, if OAB makes a counter-clockwise turn, # negative for clockwise turn, and zero if the points are collinear. def cross(o, a, b): return (a[0] - o[0]) * (b[1] - o[1]) - (a[1] - o[1]) * (b[0] - o[0]) # Build lower hull lower = [] for p in points: while len(lower) >= 2 and cross(lower[-2], lower[-1], p) <= 0: lower.pop() lower.append(p) # Build upper hull upper = [] for p in reversed(points): while len(upper) >= 2 and cross(upper[-2], upper[-1], p) <= 0: upper.pop() upper.append(p) # Concatenation of the lower and upper hulls gives the convex hull. # Last point of each list is omitted because it is repeated at the # beginning of the other list. total_hull = np.asanyarray(lower[:-1] + upper) ptx = total_hull[:, 0] pty = total_hull[:, 1] if wcs is None: if not ndarray: return (ptx.tolist(), pty.tolist()) else: return (ptx, pty) # convert x, y vertex coordinates to RA & DEC: ra, dec = wcs(ptx, pty) ra[-1] = ra[0] dec[-1] = dec[0] if not ndarray: return (ra.tolist(), dec.tolist()) else: return (ra, dec)