Source code for pyuvdata.utils.times

# Copyright (c) 2024 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Utilities for working LSTs."""

import warnings

import erfa
import numpy as np
from astropy import units
from astropy.time import Time
from astropy.utils import iers

from .coordinates import get_loc_obj
from .tools import (
    _is_between,
    _strict_raise,
    _test_array_constant,
    _test_array_constant_spacing,
    _where_combine,
)


[docs]def get_lst_for_time( jd_array=None, *, telescope_loc=None, latitude=None, longitude=None, altitude=None, astrometry_library=None, frame="itrs", ellipsoid=None, ): """ Get the local apparent sidereal time for a set of jd times at an earth location. This function calculates the local apparent sidereal time (LAST), given a UTC time and a position on the Earth, using either the astropy or NOVAS libraries. It is important to note that there is an apporoximate 20 microsecond difference between the two methods, presumably due to small differences in the apparent reference frame. These differences will cancel out when calculating coordinates in the TOPO frame, so long as apparent coordinates are calculated using the same library (i.e., astropy or NOVAS). Failing to do so can introduce errors up to ~1 mas in the horizontal coordinate system (i.e., AltAz). Parameters ---------- jd_array : ndarray of float JD times to get lsts for. telescope_loc : tuple or EarthLocation or MoonLocation Alternative way of specifying telescope lat/lon/alt, either as a 3-element tuple with latitude and longitude in degrees, or as an astropy EarthLocation (or lunarsky MoonLocation). Cannot supply both `telescope_loc` and `latitute`, `longitude`, or `altitude`. latitude : float Latitude of location to get lst for in degrees. Cannot specify both `latitute` and `telescope_loc`. longitude : float Longitude of location to get lst for in degrees. Cannot specify both `longitude` and `telescope_loc`. altitude : float Altitude of location to get lst for in meters. Cannot specify both `altitude` and `telescope_loc`. astrometry_library : str Library used for running the LST calculations. Allowed options are 'erfa' (which uses the pyERFA), 'novas' (which uses the python-novas library), and 'astropy' (which uses the astropy utilities). Default is erfa unless the telescope_location is a MoonLocation object, in which case the default is astropy. frame : str Reference frame for latitude/longitude/altitude. Options are itrs (default) or mcmf. Not used if telescope_loc is an EarthLocation or MoonLocation object. ellipsoid : str Ellipsoid to use for lunar coordinates. Must be one of "SPHERE", "GSFC", "GRAIL23", "CE-1-LAM-GEO" (see lunarsky package for details). Default is "SPHERE". Only used if frame is mcmf. Not used if telescope_loc is an EarthLocation or MoonLocation object. Returns ------- ndarray of float LASTs in radians corresponding to the jd_array. """ if telescope_loc is None: if all(item is not None for item in [latitude, longitude, altitude]): telescope_loc = (latitude, longitude, altitude) else: raise ValueError( "Must supply all of latitude, longitude and altitude if " "telescope_loc is not supplied" ) else: if not all(item is None for item in [latitude, longitude, altitude]): raise ValueError( "Cannot set both telescope_loc and latitude/longitude/altitude" ) site_loc, on_moon = get_loc_obj( telescope_loc, telescope_frame=frame, ellipsoid=ellipsoid, angle_units=units.deg, return_moon=True, ) if astrometry_library is None: if not on_moon: astrometry_library = "erfa" else: astrometry_library = "astropy" if astrometry_library not in ["erfa", "astropy", "novas"]: raise ValueError( "Requested coordinate transformation library is not supported, please " "select either 'erfa' or 'astropy' for astrometry_library." ) if isinstance(jd_array, np.ndarray): lst_array = np.zeros_like(jd_array) if lst_array.ndim == 0: lst_array = lst_array.reshape(1) else: lst_array = np.zeros(1) jd, reverse_inds = np.unique(jd_array, return_inverse=True) if not on_moon: TimeClass = Time else: if not astrometry_library == "astropy": raise NotImplementedError( "The MCMF frame is only supported with the 'astropy' astrometry library" ) from lunarsky import Time as LTime TimeClass = LTime times = TimeClass(jd, format="jd", scale="utc", location=site_loc) if iers.conf.auto_max_age is None: # pragma: no cover delta, status = times.get_delta_ut1_utc(return_status=True) if np.any( np.isin(status, (iers.TIME_BEFORE_IERS_RANGE, iers.TIME_BEYOND_IERS_RANGE)) ): warnings.warn( "time is out of IERS range, setting delta ut1 utc to extrapolated value" ) times.delta_ut1_utc = delta if astrometry_library == "erfa": # This appears to be what astropy is using under the hood, # so it _should_ be totally consistent. gast_array = erfa.gst06a( times.ut1.jd1, times.ut1.jd2, times.tt.jd1, times.tt.jd2 ) # Technically one should correct for the polar wobble here, but the differences # along the equitorial are miniscule -- of order 10s of nanoradians, well below # the promised accuracy of IERS -- and rotation matricies can be expensive. # We do want to correct though for for secular polar drift (s'/TIO locator), # which nudges the Earth rotation angle of order 47 uas per century. sp = erfa.sp00(times.tt.jd1, times.tt.jd2) lst_array = np.mod(gast_array + sp + site_loc.lon.rad, 2.0 * np.pi)[ reverse_inds ] elif astrometry_library == "astropy": lst_array = times.sidereal_time("apparent").radian if lst_array.ndim == 0: lst_array = lst_array.reshape(1) lst_array = lst_array[reverse_inds] elif astrometry_library == "novas": # Import the NOVAS library only if it's needed/available. try: import novas_de405 # noqa from novas import compat as novas from novas.compat import eph_manager except ImportError as e: raise ImportError( "novas and/or novas_de405 are not installed but is required for " "NOVAS functionality" ) from e jd_start, jd_end, number = eph_manager.ephem_open() tt_time_array = times.tt.value ut1_high_time_array = times.ut1.jd1 ut1_low_time_array = times.ut1.jd2 full_ut1_time_array = ut1_high_time_array + ut1_low_time_array polar_motion_data = iers.earth_orientation_table.get() delta_x_array = np.interp( times.mjd, polar_motion_data["MJD"].value, polar_motion_data["dX_2000A_B"].value, left=0.0, right=0.0, ) delta_y_array = np.interp( times.mjd, polar_motion_data["MJD"].value, polar_motion_data["dY_2000A_B"].value, left=0.0, right=0.0, ) # Catch the case where we don't have CIP delta values yet (they don't typically # have predictive values like the polar motion does) delta_x_array[np.isnan(delta_x_array)] = 0.0 delta_y_array[np.isnan(delta_y_array)] = 0.0 for idx in range(len(times)): novas.cel_pole( tt_time_array[idx], 2, delta_x_array[idx], delta_y_array[idx] ) # The NOVAS routine will return Greenwich Apparent Sidereal Time (GAST), # in units of hours lst_array[reverse_inds == idx] = novas.sidereal_time( ut1_high_time_array[idx], ut1_low_time_array[idx], (tt_time_array[idx] - full_ut1_time_array[idx]) * 86400.0, ) # Add the telescope lon to convert from GAST to LAST (local) lst_array = np.mod(lst_array + (site_loc.lon.deg / 15.0), 24.0) # Convert from hours back to rad lst_array *= np.pi / 12.0 lst_array = np.reshape(lst_array, jd_array.shape) return lst_array
[docs]def check_lsts_against_times( *, jd_array, lst_array, lst_tols, latitude=None, longitude=None, altitude=None, frame="itrs", ellipsoid=None, telescope_loc=None, ): """ Check that LSTs are consistent with the time_array and telescope location. This just calls `get_lst_for_time`, compares that result to the `lst_array` and warns if they are not within the tolerances specified by `lst_tols`. Parameters ---------- jd_array : ndarray of float JD times to get lsts for. lst_array : ndarray of float LSTs to check to see if they match the jd_array at the location. latitude : float Latitude of location to check the lst for in degrees. longitude : float Longitude of location to check the lst for in degrees. altitude : float Altitude of location to check the lst for in meters. lst_tops : tuple of float A length 2 tuple giving the (relative, absolute) tolerances to check the LST agreement to. These are passed directly to numpy.allclose. frame : str Reference frame for latitude/longitude/altitude. Options are itrs (default) or mcmf. ellipsoid : str Ellipsoid to use for lunar coordinates. Must be one of "SPHERE", "GSFC", "GRAIL23", "CE-1-LAM-GEO" (see lunarsky package for details). Default is "SPHERE". Only used if frame is mcmf. telescope_loc : tuple or EarthLocation or MoonLocation Alternative way of specifying telescope lat/lon/alt, either as a 3-element tuple or as an astropy EarthLocation (or lunarsky MoonLocation). Cannot supply both `telescope_loc` and `latitute`, `longitude`, or `altitude`. Returns ------- None Warns ----- If the `lst_array` does not match the calculated LSTs to the lst_tols. """ # Don't worry about passing the astrometry library because we test that they agree # to better than our standard lst tolerances. lsts = get_lst_for_time( jd_array=jd_array, telescope_loc=telescope_loc, latitude=latitude, longitude=longitude, altitude=altitude, frame=frame, ellipsoid=ellipsoid, ) if not np.allclose(lst_array, lsts, rtol=lst_tols[0], atol=lst_tols[1]): warnings.warn( "The lst_array is not self-consistent with the time_array and " "telescope location. Consider recomputing with the " "`set_lsts_from_time_array` method." )
[docs]def _select_times_helper( *, times, time_range, lsts, lst_range, obj_time_array, obj_time_range, obj_lst_array, obj_lst_range, time_tols, lst_tols, time_inds=None, invert=False, strict=False, warn_spacing=False, ): """ Get time indices in a select. Parameters ---------- times : array_like of float The times to keep in the object, each value passed here should exist in the time_array. Can be None, cannot be set with `time_range`, `lsts` or `lst_array`. time_range : array_like of float The time range in Julian Date to keep in the object, must be length 2. Some of the times in the object should fall between the first and last elements. Can be None, cannot be set with `times`, `lsts` or `lst_array`. lsts : array_like of float The local sidereal times (LSTs) to keep in the object, each value passed here should exist in the lst_array. Can be None, cannot be set with `times`, `time_range`, or `lst_range`. lst_range : array_like of float The local sidereal time (LST) range in radians to keep in the object, must be of length 2. Some of the LSTs in the object should fall between the first and last elements. If the second value is smaller than the first, the LSTs are treated as having phase-wrapped around LST = 2*pi = 0, and the LSTs kept on the object will run from the larger value, through 0, and end at the smaller value. Can be None, cannot be set with `times`, `time_range`, or `lsts`. obj_time_array : array_like of float Time array on object. Can be None if `object_time_range` is set. obj_time_range : array_like of float Time range on object. Can be None if `object_time_array` is set. obj_lst_array : array_like of float LST array on object. Can be None if `object_lst_range` is set. obj_lst_range : array_like of float LST range on object. Can be None if `object_lst_array` is set. time_tols : tuple of float Length 2 tuple giving (rtol, atol) to use for time matching. lst_tols : tuple of float Length 2 tuple giving (rtol, atol) to use for lst matching. time_inds : array_like of int, optional The time indices to keep in the object. This is not commonly used. invert : bool Normally indices matching given criteria are what are included in the subsequent list. However, if set to True, these indices are excluded instead. Default is False. strict : bool or None Normally, select will warn when an element of the selection criteria does not match any element for the parameter, as long as the selection criteria results in *at least one* element being selected. However, if set to True, an error is thrown if any selection criteria does not match what is given for the object parameters element. If set to None, then neither errors nor warnings are raised, unless no records are selected. Default is False. warn_spacing Whether or not to warn about time spacing. Only used if no ranges from the object are supplied. Default is False. Returns ------- time_inds : list of int Indices of times to keep on the object. selections : list of str list of selections done. """ have_times = times is not None have_time_range = time_range is not None have_lsts = lsts is not None have_lst_range = lst_range is not None n_time_params = np.count_nonzero( [have_times, have_time_range, have_lsts, have_lst_range] ) if n_time_params > 1: raise ValueError( "Only one of [times, time_range, lsts, lst_range] may be " "specified per selection operation." ) if n_time_params == 0: return None, [] if time_range is not None: time_range = np.asarray(time_range) if time_range.size != 2: raise ValueError("time_range must be length 2.") if have_lsts and np.any(np.asarray(lsts) > 2 * np.pi): warnings.warn( "The lsts parameter contained a value greater than 2*pi. " "LST values are assumed to be in radians, not hours." ) if have_lst_range: lst_range = np.asarray(lst_range) if lst_range.size != 2: raise ValueError("lst_range must be length 2.") if np.any(lst_range > 2 * np.pi): warnings.warn( "The lst_range contained a value greater than 2*pi. " "LST values are assumed to be in radians, not hours." ) selections = ["lsts"] if (have_lsts or have_lst_range) else ["times"] sel_name = "LST" if (have_lsts or have_lst_range) else "Time" obj_range = obj_lst_range if (have_lsts or have_lst_range) else obj_time_range obj_array = obj_lst_array if (have_lsts or have_lst_range) else obj_time_array sel_range = lst_range if (have_lsts or have_lst_range) else time_range sel_array = lsts if (have_lsts or have_lst_range) else times rtol, atol = lst_tols if (have_lsts or have_lst_range) else time_tols if obj_range is not None: err_txt = "does not fall in any" attr_name = "lst_range" if (have_lsts or have_lst_range) else "time_range" else: err_txt = "is not present in the" attr_name = "lst_array" if (have_lsts or have_lst_range) else "time_array" if sel_range is not None: # This is a range-based selection, so act accordingly if obj_range is not None: wrap_range = have_lst_range and ( (sel_range[0] > sel_range[1]) or any(obj_range[:, 0] < obj_range[:, 1]) ) if wrap_range: # If we _do_ need to wrap ranges, obj_range = np.mod(obj_range, 2 * np.pi) lo_lim, hi_lim = np.mod(sel_range, 2 * np.pi) if lo_lim > hi_lim: lo_lim -= 2 * np.pi # If LST is wrapped in obj_range, then do the unwrap obj_range[obj_range[:, 0] > obj_range[:, 1], 1] += 2 * np.pi # Check that the select end-time is after the range start-time and the # select start-time is before the range end-time. mask = (obj_range[:, 0] <= hi_lim) & (obj_range[:, 1] >= lo_lim) # Now check the wrap -- push the start/stop up one wrap, and then # re-evaluate the mask one one time. lo_lim += 2 * np.pi hi_lim += 2 * np.pi mask |= (obj_range[:, 0] <= hi_lim) & (obj_range[:, 1] >= lo_lim) else: # If we don't need to wrap ranges, then this is a lot easier to handle. # Check that the select end-time is after the range start-time and the # select start-time is before the range end-time. mask = obj_range[:, 0] <= sel_range[1] mask &= obj_range[:, 1] >= sel_range[0] else: wrap_range = have_lst_range and (sel_range[0] > sel_range[1]) # Get sel_range into the correct shape mask = _is_between(obj_array, sel_range, wrap=wrap_range) if not any(mask): msg = ( f"No elements in {attr_name} between {sel_range[0]} and {sel_range[1]}." ) _strict_raise(msg, strict=strict) else: # This is a match-based selection, so move forward accordingly if obj_range is None: # Because of tols, everything is in effect a range -- construct the # effect object ranges that we need. del_range = np.maximum(abs(obj_array * rtol), abs(atol)) obj_range = np.vstack([obj_array - del_range, obj_array + del_range]).T mask = np.zeros(len(obj_range), dtype=bool) for item in np.asarray(sel_array).flat: submask = _is_between(item, obj_range, wrap=have_lsts) if not any(submask): msg = f"{sel_name} {item} {err_txt} {attr_name}." _strict_raise(msg, strict=strict) mask |= submask time_inds = _where_combine(mask, inds=time_inds, invert=invert) if len(time_inds) == 0: raise ValueError( f"No data matching this {sel_name.lower()} selection present in object." ) time_inds = time_inds.tolist() if warn_spacing: if (len(time_inds) > 1) and (obj_time_range is not None): warnings.warn( "Selected times include multiple time ranges. This " "is not supported by some file formats." ) elif ( (obj_time_range is None) and (obj_time_array is not None) and (not _test_array_constant(np.diff(obj_time_array[time_inds]))) ): warnings.warn( "Selected times are not evenly spaced. This " "is not supported by some file formats." ) return time_inds, selections
[docs]def _check_time_spacing( *, times_array, time_range=None, time_tols=None, integration_time=None, int_tols=None, strict=True, ): """ Check if times are evenly spaced. This is a requirement for calfits files. Parameters ---------- times_array : array-like of float or UVParameter Array of times, shape (Ntimes,). time_range : array-like of float or UVParameter Array of time ranges, shape (Ntime_ranges). Optional. time_tols : tuple of float time_array tolerances (from uvobj._time_array.tols). Optional. integration_time : array-like of float or UVParameter Array of integration times, shape (Ntimes,). Optional. channel_width_tols : tuple of float integration_time tolerances (from uvobj._integration_time.tols). Optional. strict : bool If set to True, then the function will raise an error if checks are failed. If set to False, then a warning is raised instead. If set to None, then no errors or warnings are raised. Default is True. """ # Import UVParameter here rather than at the top to avoid circular imports from pyuvdata.parameter import UVParameter if isinstance(time_range, UVParameter): time_range = time_range.value if not _test_array_constant_spacing(times_array, tols=time_tols): err_msg = ( "The times are not evenly spaced. This will make it impossible to write " "this data out to calfits." ) _strict_raise(err_msg=err_msg, strict=strict) if time_range is not None and len(time_range) != 1: err_msg = ( "Object contains multiple time ranges. This will make it impossible to " "write this data out to calfits." ) _strict_raise(err_msg=err_msg, strict=strict) if not _test_array_constant(integration_time): err_msg = ( "The integration times are variable. The calfits format " "does not support variable integration times." ) _strict_raise(err_msg=err_msg, strict=strict)