Source code for fiberassign.hardware

# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
fiberassign.hardware
=======================

Functions for loading information about the telescope hardware.

"""
from __future__ import absolute_import, division, print_function

from datetime import datetime, timezone

import numpy as np

from scipy.interpolate import interp1d
import astropy.table

import desimodel.io as dmio

from .utils import Logger

from ._internal import (
    Hardware,
    FIBER_STATE_OK,
    FIBER_STATE_UNASSIGNED,
    FIBER_STATE_STUCK,
    FIBER_STATE_BROKEN,
    FIBER_STATE_RESTRICT,
    Circle,
    Segments,
    Shape,
)

def get_default_exclusion_margins():
    '''
    We add an additional margin around the exclusion zones in the hardware descriptions.
    This function returns the defaults used by the fiberassign scripts.

    Returns:
    margins (dict): with keys "pos", "gfa" and "petal", to floating-point margins in millimeters.
    '''
    return dict(pos=0.05,
                petal=0.4,
                gfa=0.4)

def expand_closed_curve(xx, yy, margin):
    '''
    For the --margin-{pos,petal,gfa} options, we can add a buffer zone
    around the positioner keep-out polygons.  This function implements
    the geometry to achieve this.

    Given a RIGHT-HANDED closed polygon xx,yy and margin, returns new
    x,y coordinates expanded by a margin of `margin`.

    (By right-handed, I mean that the points are listed
    counter-clockwise, and if you walk the boundary, the inside of the
    shape is to the left; the expanded points will be to the right.)

    If the order of the polygon is reversed, the "expanded" points
    will actually be on the *inside* of the polygon.  Setting the
    margin negative counteracts this.

    Note that we strictly require a closed curve.  Collinear polygon
    segments will cause problems!

    '''
    ex, ey = [],[]

    # These are closed curves (last point = first point)
    # (this isn't strictly required by the fundamental algorithm, but is assumed
    # in the way we select previous and next points in the loop below.)
    if (xx[0] != xx[-1]) or (yy[0] != yy[-1]):
        log = Logger.get()
        log.warning('Expected exclusion polygons to be closed curves; got x, y = %s, %s' % (str(xx), str(yy)))
        return xx, yy
    assert(xx[0] == xx[-1])
    assert(yy[0] == yy[-1])

    N = len(xx)

    for j in range(N):
        # We go through the points, and for each point we consider the previous
        # and next point.  The "expanded" point will be defined according to the
        # two edges (vector) coming from the point.
        i = j - 1
        if i == -1:
            # wrap around, skipping repeated point
            i = N-2
        k = j + 1
        if k == N:
            k = 1

        x1 = xx[i]
        y1 = yy[i]
        x2 = xx[j]
        y2 = yy[j]
        x3 = xx[k]
        y3 = yy[k]

        # Vectors to and from the central point.
        vx1 = x2 - x1
        vy1 = y2 - y1
        vx2 = x3 - x2
        vy2 = y3 - y2
        # We can't handle repeated points!
        assert(not(vx2 == 0. and vy2 == 0.))

        # Get the angle between the vectors -- our expanded point is going to
        # be halfway between these two vectors.
        cross1 = vx1 * vy2 - vx2 * vy1
        vv1 = np.hypot(vx1,vy1)
        vv2 = np.hypot(vx2,vy2)
        theta = np.arcsin(np.clip(cross1 / (vv1 * vv2), -1., 1.))
        # Detect sharp (>90 degree) turns
        dot = vx1*vx2 + vy1*vy2
        # the angle of the expanded point is relative to vector 1
        a = np.arctan2(vy1, vx1)
        if dot < 0:
            # sharp turn -- the theta=arcsin aliases the angle, which is outside
            # the range of [-pi/2, +pi/2]; adjust theta to the aliased angle.
            if theta > 0:
                theta =  np.pi - theta
            else:
                theta = -np.pi - theta
        da = np.pi/2. + theta/2.
        # This places the point further from the original keeps the vectors parallel to their originals
        stretch = 1./np.cos(theta/2.)
        dx = -margin * stretch * np.cos(a + da)
        dy = -margin * stretch * np.sin(a + da)

        ex.append(x2 + dx)
        ey.append(y2 + dy)

    return ex, ey

# Global cache of exclusions.  (exclusions are geometric regions describing the keep-out
# areas around positioners, GFAs, and petal edges)
_cache_shape_to_excl = dict()

def get_exclusions(exclude, exclname, margins, local_cache=None):
    '''
    Fetches an exclusion region from a data file, applying additional margins as required.

    A "local_cache", valid for a single "exclude" file, can be passed in.

    Args:
    * exclude: the exclusions file, eg from desimodel.io.load_focalplane.
    * exclname: the string name to look up
    * margins: dict of additional margins to apply
    * local_cache: dict
    '''
    global _cache_shape_to_excl

    if local_cache is not None:
        # check the local cache, where names are unique
        e = local_cache.get(exclname)
        if e is not None:
            return e

    shp = exclude[exclname]

    # NOTE that different exclusions files can assign
    # different values to the same key!  ie, code 0000fbb5303ebbc2 appears in
    #   desi-exclusion_2021-03-17T23:20:01.json and
    #   desi-exclusion_2021-06-25T22:38:59+00:00.json
    # with totally different values.
    # We therefore cannot use the "exclname" as the (global) cache key.
    #
    # I tried caching the "expand_closed_curve" computation using a hash of the
    # shape itself as the cache key, but that turned out not to help -- computing
    # the key was too expensive?
    #
    # For further speedups, I suspect that moving expand_closed_curve to C++ would be
    # very effective.

    rtn = dict()
    for obj in shp.keys():
        cr = list()
        for crc in shp[obj]["circles"]:
            cr.append(Circle(crc[0], crc[1]))
        sg = list()
        for sgm in shp[obj]["segments"]:
            if obj in margins:
                # Create a hashable version of the line-segment list
                key = (obj, tuple(tuple(xy) for xy in sgm))
                cached = _cache_shape_to_excl.get(key)
                if cached is not None:
                    sgm = cached
                else:
                    sx = [x for x,y in sgm]
                    sy = [y for x,y in sgm]
                    ex,ey = expand_closed_curve(sx, sy, margins[obj])
                    sgm = list(zip(ex, ey))
                    _cache_shape_to_excl[key] = sgm
            sg.append(Segments(sgm))
        fshp = Shape((0.0, 0.0), cr, sg)
        rtn[obj] = fshp
    if local_cache is not None:
        local_cache[exclname] = rtn
    return rtn

[docs]def load_hardware(focalplane=None, rundate=None, add_margins={}, get_time_range=False): """Create a hardware class representing properties of the telescope. Args: focalplane (tuple): Override the focalplane model. If not None, this should be a tuple of the same data types returned by desimodel.io.load_focalplane() rundate (str): ISO 8601 format time stamp as a string in the format YYYY-MM-DDTHH:MM:SS+-zz:zz. If None, uses current time. add_margins (dict): additional margins to add around positioners, GFAs, and petals. Dict with keys "pos", "gfa", "petal" and values of millimeters. get_time_range (bool): if True, return (hw, time_lo, time_hi), where time_lo and time_hi are datetime objects corresponding to the first and last dates when the hardware was in this state. Returns: if get_time_range is True: (hardware, time_lo, time_hi) else: hardware (Hardware): The hardware object. """ args = load_hardware_args(focalplane=focalplane, rundate=rundate, add_margins=add_margins) args, time_lo, time_hi = args hw = Hardware(*args) if get_time_range: return hw, time_lo, time_hi return hw
def load_hardware_args(focalplane=None, rundate=None, add_margins={}): """Reads the arguments needed to create a Hardware object representing the properties of the instrument at a given date. Also returns the range of dates when this hardware configuration is valid. Args: focalplane (tuple): Override the focalplane model. If not None, this should be a tuple of the same data types returned by desimodel.io.load_focalplane() rundate (str): ISO 8601 format time stamp as a string in the format YYYY-MM-DDTHH:MM:SS+-zz:zz. If None, uses current time. add_margins (dict): additional margins to add around positioners, GFAs, and petals. Dict with keys "pos", "gfa", "petal" and values of millimeters. Returns: args (tuple): The arguments to be passed to the Hardware constructor time_lo (datetime): The earliest date when this hardware configuration is valid time_hi (datetime): The latest date when this hardware configuration is valid """ log = Logger.get() # The timestamp for this run. runtime = None if rundate is None: runtime = datetime.now(tz=timezone.utc) else: try: runtime = datetime.strptime(rundate, "%Y-%m-%dT%H:%M:%S%z") except ValueError: runtime = datetime.strptime(rundate, "%Y-%m-%dT%H:%M:%S") msg = "Requested run date '{}' is not timezone-aware. Assuming UTC.".format(runtime) log.warning(msg) runtime = runtime.replace(tzinfo=timezone.utc) runtimestr = None try: runtimestr = runtime.isoformat(timespec="seconds") except TypeError: runtimestr = runtime.isoformat() # Get the focalplane information fp = None exclude = None state = None create_time = "UNKNOWN" time_lo = time_hi = None if focalplane is None: fp, exclude, state, create_time, time_lo, time_hi = dmio.load_focalplane(runtime, get_time_range=True) else: fp, exclude, state = focalplane # Get the plate scale platescale = dmio.load_platescale() # We are going to do a quadratic interpolation to the platescale on a fine grid, # and then use that for *linear* interpolation inside the compiled code. The # default platescale data is on a one mm grid spacing. We also do the same # interpolation of the arclength S(R). fine_radius = np.linspace( platescale["radius"][0], platescale["radius"][-1], num=10000, dtype=np.float64 ) fn = interp1d(platescale["radius"], platescale["theta"], kind="quadratic") fine_theta = fn(fine_radius).astype(np.float64) fn = interp1d(platescale["radius"], platescale["arclength"], kind="quadratic") fine_arc = fn(fine_radius).astype(np.float64) # We are only going to keep rows for LOCATIONs that are assigned to a # science or sky monitor positioner. log.info("Loaded focalplane for time stamp {}".format(runtime)) pos_rows = np.where(fp["DEVICE_TYPE"].astype(str) == "POS")[0] etc_rows = np.where(fp["DEVICE_TYPE"].astype(str) == "ETC")[0] keep_rows = np.unique(np.concatenate((pos_rows, etc_rows))) nloc = len(keep_rows) log.debug(" focalplane table keeping {} rows for POS and ETC devices".format(nloc)) device_type = np.full(nloc, "OOPSBUG", dtype="a8") device_type[:] = fp["DEVICE_TYPE"][keep_rows] locations = np.copy(fp["LOCATION"][keep_rows]) # Map location to row in the table loc_to_fp = dict() for rw, loc in enumerate(fp["LOCATION"]): loc_to_fp[loc] = rw # FIXME: Here we assume that the 32bit STATE column has the same bit # definitions as what is used by fiberassign (defined in hardware.h): # If this is not true, then re-map those values here inside the "state" # table loaded above. # Map location to row of the state table loc_to_state = dict() for rw, loc in enumerate(state["LOCATION"]): loc_to_state[loc] = rw # Slightly reformat the 'add_margins' dict: 'pos' gets copied to # 'theta' and 'phi'. margins = {} if 'pos' in add_margins: margins['theta'] = add_margins['pos'] margins['phi'] = add_margins['pos'] if 'gfa' in add_margins: margins['gfa'] = add_margins['gfa'] # PLUS -- and this is a HACK -- # because the 'petal' exclusion polygons are listed in the db dump files # in "left-handed" order, unlike the other polygons, we must NEGATE the # margin! if 'petal' in add_margins: margins['petal'] = -add_margins['petal'] # For each positioner, select the exclusion polynomials. positioners = dict() excl_cache = dict() for loc in locations: exclname = state["EXCLUSION"][loc_to_state[loc]] positioners[loc] = dict() posexcl = get_exclusions(exclude, exclname, margins, local_cache=excl_cache) positioners[loc]["theta"] = Shape(posexcl["theta"]) positioners[loc]["phi"] = Shape(posexcl["phi"]) if "gfa" in posexcl: positioners[loc]["gfa"] = Shape(posexcl["gfa"]) else: positioners[loc]["gfa"] = Shape() if "petal" in posexcl: positioners[loc]["petal"] = Shape(posexcl["petal"]) else: positioners[loc]["petal"] = Shape() del excl_cache if "MIN_P" in state.colnames: # This is a new-format focalplane model (after desimodel PR #143) Istate = np.array([loc_to_state[x] for x in locations]) Ifp = np.array([loc_to_fp [x] for x in locations]) args = ( runtimestr, locations, fp["PETAL"][keep_rows], fp["DEVICE"][keep_rows], fp["SLITBLOCK"][keep_rows], fp["BLOCKFIBER"][keep_rows], fp["FIBER"][keep_rows], device_type, fp["OFFSET_X"][keep_rows], fp["OFFSET_Y"][keep_rows], state['STATE'][Istate], fp['OFFSET_T'][Ifp], state['MIN_T'][Istate], state['MAX_T'][Istate], state['POS_T'][Istate], fp['LENGTH_R1'][Ifp], fp['OFFSET_P'][Ifp], state['MIN_P'][Istate], state['MAX_P'][Istate], state['POS_P'][Istate], fp['LENGTH_R2'][Ifp], fine_radius, fine_theta, fine_arc, [positioners[x]["theta"] for x in locations], [positioners[x]["phi"] for x in locations], [positioners[x]["gfa"] for x in locations], [positioners[x]["petal"] for x in locations], add_margins ) else: # This is an old-format focalplane model (prior to desimodel PR #143). For # stuck positioners, we want to specify a default POS_T / POS_P to use. # These old models did not include any information about that, so we use the # minimum Theta value and either the maximum Phi value or PI, whichever is # smaller fake_pos_p = np.zeros(len(locations), dtype=np.float64) fake_pos_t = np.zeros(len(locations), dtype=np.float64) for ilid, lid in enumerate(locations): pt = fp["MIN_T"][loc_to_fp[lid]] + fp["OFFSET_T"][loc_to_fp[lid]] pp = fp["MAX_P"][loc_to_fp[lid]] + fp["OFFSET_P"][loc_to_fp[lid]] if pp > 180.0: pp = 180.0 fake_pos_p[ilid] = pp fake_pos_t[ilid] = pt args = ( runtimestr, locations, fp["PETAL"][keep_rows], fp["DEVICE"][keep_rows], fp["SLITBLOCK"][keep_rows], fp["BLOCKFIBER"][keep_rows], fp["FIBER"][keep_rows], device_type, fp["OFFSET_X"][keep_rows], fp["OFFSET_Y"][keep_rows], np.array([state["STATE"][loc_to_state[x]] for x in locations]), np.array([fp["OFFSET_T"][loc_to_fp[x]] for x in locations]), np.array([fp["MIN_T"][loc_to_fp[x]] for x in locations]), np.array([fp["MAX_T"][loc_to_fp[x]] for x in locations]), fake_pos_t, np.array([fp["LENGTH_R1"][loc_to_fp[x]] for x in locations]), np.array([fp["OFFSET_P"][loc_to_fp[x]] for x in locations]), np.array([fp["MIN_P"][loc_to_fp[x]] for x in locations]), np.array([fp["MAX_P"][loc_to_fp[x]] for x in locations]), fake_pos_p, np.array([fp["LENGTH_R2"][loc_to_fp[x]] for x in locations]), fine_radius, fine_theta, fine_arc, [positioners[x]["theta"] for x in locations], [positioners[x]["phi"] for x in locations], [positioners[x]["gfa"] for x in locations], [positioners[x]["petal"] for x in locations], add_margins ) aa = [] for a in args: if type(a) == astropy.table.Column: a = a.data aa.append(a) return aa, time_lo, time_hi def radec2xy(hw, tile_ra, tile_dec, tile_obstime, tile_obstheta, tile_obsha, ra, dec, use_cs5, threads=0): ''' For the tile pointed at (tilera, tiledec), project the (ra, dec) value into X/Y mm. Args: hw: Hardware object tile_ra (float): Tile RA tile_dec (float): Tile Dec tile_obstime (string): Tile observation time, YYYY-MM-DDTHH:MM:SS / Astropy "isot" format. tile_obstheta (float): Tile "fieldrot" rotation angle. tile_obsha (float): Tile designed Hour Angle, in degrees. ra (numpy array): RA to project, in degrees dec (numpy array): Dec to project, in degrees use_CS5 (bool): If True, return CS5 coordinates, else curved. threads=0 (int): currently unused; for backward compatibility. Returns: x, y: numpy arrays: the (X, Y) projected locations. ''' #xy = hw.radec2xy_multi( # tile_ra, tile_dec, tile_obstheta, ra, dec, use_cs5, threads=0 #) #x = np.array([x for x,y in xy]) #y = np.array([y for x,y in xy]) from astropy.time import Time from desimeter.fiberassign import fiberassign_radec2xy_cs5, fiberassign_radec2xy_flat # Note that MJD is only used for precession, so no need for # high precision. t = Time(tile_obstime, format='isot') mjd = t.mjd # Don't pass adc[12]: Let desimeter use its pm-alike routines if use_cs5: x, y = fiberassign_radec2xy_cs5(ra, dec, tile_ra, tile_dec, mjd, tile_obsha, tile_obstheta) else: x, y = fiberassign_radec2xy_flat(ra, dec, tile_ra, tile_dec, mjd, tile_obsha, tile_obstheta) return x,y def xy2radec(hw, tile_ra, tile_dec, tile_obstime, tile_obstheta, tile_obsha, x, y, use_cs5, threads=0): ''' For the tile pointed at (tilera, tiledec), compute the RA,Dec pointing of the specified X/Y location in millimeters. Args: hw: Hardware object tile_obstime (string): Tile observation time, YYYY-MM-DDTHH:MM:SS / Astropy "isot" format. tile_obstheta (float): Tile "fieldrot" rotation angle. tile_obsha (float): Tile designed Hour Angle, in degrees. x (numpy array): X position in mm. y (numpy array): Y position in mm. use_CS5 (bool): If True, assume X,Y are CS5 coordinates, else curved. threads=0 (int): currently unused; for backward compatibility. Returns: ra, dec (numpy arrays): the (RA, Dec) values of the focalplane locations, in degrees. ''' # radec = hw.xy2radec_multi( # tile_ra, tile_dec, tile_obstheta, x, y, use_cs5, threads # ) # ra = np.array([r for r,d in radec]) # dec = np.array([d for r,d in radec]) from desimeter.fiberassign import fiberassign_cs5_xy2radec, fiberassign_flat_xy2radec from astropy.time import Time t = Time(tile_obstime, format='isot') mjd = t.mjd if use_cs5: ra,dec = fiberassign_cs5_xy2radec(x, y, tile_ra, tile_dec, mjd, tile_obsha, tile_obstheta) else: ra,dec = fiberassign_flat_xy2radec(x, y, tile_ra, tile_dec, mjd, tile_obsha, tile_obstheta) return ra,dec def xy2cs5(x, y): ''' Converts from curved focal-plane X,Y coordinates in mm into CS5 coordinates in mm. Args: x (numpy array): X coord (mm) y (numpy array): Y coord (mm) Returns: cs5x (numpy array): CS5 X coord (mm) cs5y (numpy array): CS5 Y coord (mm) ''' # There's a change in terminology between the focal-plane team and # the outside world here... from desimeter.transform.pos2ptl import flat2ptl return flat2ptl(x, y) def cs52xy(x, y): ''' Converts from CS5 coordinates (mm) into curved focal-plane X,Y coordinates in mm. Args: cs5x (numpy array): CS5 X coord (mm) cs5y (numpy array): CS5 Y coord (mm) Returns: x (numpy array): X coord (mm) y (numpy array): Y coord (mm) ''' # There's a change in terminology between the focal-plane team and # the outside world here... from desimeter.transform.pos2ptl import ptl2flat return ptl2flat(x, y)