# Copyright (c) 2024 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Utilities for phasing."""
from copy import deepcopy
import erfa
import numpy as np
from astropy import units
from astropy.coordinates import AltAz, Distance, EarthLocation, SkyCoord
from astropy.time import Time
from astropy.utils import iers
from . import _phasing
from .coordinates import get_loc_obj
from .times import get_lst_for_time
from .tools import _nants_to_nblts, _ntimes_to_nblts
[docs]def old_uvw_calc(ra, dec, initial_uvw):
"""
Calculate old uvws from unphased ones in an icrs or gcrs frame.
This method should not be used and is only retained for testing the
undo_old_uvw_calc method, which is needed for fixing phases.
This code expects input uvws relative to the telescope location in the same frame
that ra/dec are in (e.g. icrs or gcrs) and returns phased ones in the same frame.
Parameters
----------
ra : float
Right ascension of phase center.
dec : float
Declination of phase center.
initial_uvw : ndarray of float
Unphased uvws or positions relative to the array center,
shape (Nlocs, 3).
Returns
-------
uvw : ndarray of float
uvw array in the same frame as initial_uvws, ra and dec.
"""
if initial_uvw.ndim == 1:
initial_uvw = initial_uvw[np.newaxis, :]
return _phasing._old_uvw_calc(
np.float64(ra),
np.float64(dec),
np.ascontiguousarray(initial_uvw.T, dtype=np.float64),
).T
[docs]def undo_old_uvw_calc(ra, dec, uvw):
"""
Undo the old phasing calculation on uvws in an icrs or gcrs frame.
This code expects phased uvws or positions in the same frame that ra/dec
are in (e.g. icrs or gcrs) and returns unphased ones in the same frame.
Parameters
----------
ra : float
Right ascension of phase center.
dec : float
Declination of phase center.
uvw : ndarray of float
Phased uvws or positions relative to the array center,
shape (Nlocs, 3).
Returns
-------
unphased_uvws : ndarray of float
Unphased uvws or positions relative to the array center,
shape (Nlocs, 3).
"""
if uvw.ndim == 1:
uvw = uvw[np.newaxis, :]
return _phasing._undo_old_uvw_calc(
np.float64(ra), np.float64(dec), np.ascontiguousarray(uvw.T, dtype=np.float64)
).T
[docs]def polar2_to_cart3(*, lon_array, lat_array):
"""
Convert 2D polar coordinates into 3D cartesian coordinates.
This is a simple routine for converting a set of spherical angular coordinates
into a 3D cartesian vectors, where the x-direction is set by the position (0, 0).
Parameters
----------
lon_array : float or ndarray
Longitude coordinates, which increases in the counter-clockwise direction.
Units of radians. Can either be a float or ndarray -- if the latter, must have
the same shape as lat_array.
lat_array : float or ndarray
Latitude coordinates, where 0 falls on the equator of the sphere. Units of
radians. Can either be a float or ndarray -- if the latter, must have the same
shape as lat_array.
Returns
-------
xyz_array : ndarray of float
Cartesian coordinates of the given longitude and latitude on a unit sphere.
Shape is (3, coord_shape), where coord_shape is the shape of lon_array and
lat_array if they were provided as type ndarray, otherwise (3,).
"""
# Check to make sure that we are not playing with mixed types
if type(lon_array) is not type(lat_array):
raise ValueError(
"lon_array and lat_array must either both be floats or ndarrays."
)
if isinstance(lon_array, np.ndarray) and lon_array.shape != lat_array.shape:
raise ValueError("lon_array and lat_array must have the same shape.")
# Once we know that lon_array and lat_array are of the same shape,
# time to create our 3D set of vectors!
xyz_array = np.array(
[
np.cos(lon_array) * np.cos(lat_array),
np.sin(lon_array) * np.cos(lat_array),
np.sin(lat_array),
],
dtype=float,
)
return xyz_array
[docs]def cart3_to_polar2(xyz_array):
"""
Convert 3D cartesian coordinates into 2D polar coordinates.
This is a simple routine for converting a set of 3D cartesian vectors into
spherical coordinates, where the position (0, 0) lies along the x-direction.
Parameters
----------
xyz_array : ndarray of float
Cartesian coordinates, need not be of unit vector length. Shape is
(3, coord_shape).
Returns
-------
lon_array : ndarray of float
Longitude coordinates, which increases in the counter-clockwise direction.
Units of radians, shape is (coord_shape,).
lat_array : ndarray of float
Latitude coordinates, where 0 falls on the equator of the sphere. Units of
radians, shape is (coord_shape,).
"""
if not isinstance(xyz_array, np.ndarray):
raise ValueError("xyz_array must be an ndarray.")
if xyz_array.ndim == 0:
raise ValueError("xyz_array must have ndim > 0")
if xyz_array.shape[0] != 3:
raise ValueError("xyz_array must be length 3 across the zeroth axis.")
# The longitude coord is relatively easy to calculate, just take the X and Y
# components and find the arctac of the pair.
lon_array = np.mod(np.arctan2(xyz_array[1], xyz_array[0]), 2.0 * np.pi, dtype=float)
# If we _knew_ that xyz_array was always of length 1, then this call could be a much
# simpler one to arcsin. But to make this generic, we'll use the length of the XY
# component along with arctan2.
lat_array = np.arctan2(
xyz_array[2], np.sqrt((xyz_array[0:2] ** 2.0).sum(axis=0)), dtype=float
)
# Return the two arrays
return lon_array, lat_array
[docs]def _rotate_matmul_wrapper(*, xyz_array, rot_matrix, n_rot):
"""
Apply a rotation matrix to a series of vectors.
This is a simple convenience function which wraps numpy's matmul function for use
with various vector rotation functions in this module. This code could, in
principle, be replaced by a cythonized piece of code, although the matmul function
is _pretty_ well optimized already. This function is not meant to be called by
users, but is instead used by multiple higher-level utility functions (namely those
that perform rotations).
Parameters
----------
xyz_array : ndarray of floats
Array of vectors to be rotated. When nrot > 1, shape may be (n_rot, 3, n_vec)
or (1, 3, n_vec), the latter is useful for when performing multiple rotations
on a fixed set of vectors. If nrot = 1, shape may be (1, 3, n_vec), (3, n_vec),
or (3,).
rot_matrix : ndarray of floats
Series of rotation matricies to be applied to the stack of vectors. Must be
of shape (n_rot, 3, 3)
n_rot : int
Number of individual rotation matricies to be applied.
Returns
-------
rotated_xyz : ndarray of floats
Array of vectors that have been rotated, of shape (n_rot, 3, n_vectors,).
"""
# Do a quick check to make sure that things look sensible
if rot_matrix.shape != (n_rot, 3, 3):
raise ValueError(
f"rot_matrix must be of shape (n_rot, 3, 3), where n_rot={n_rot}."
)
if (xyz_array.ndim == 3) and (
(xyz_array.shape[0] not in [1, n_rot]) or (xyz_array.shape[-2] != 3)
):
raise ValueError("Misshaped xyz_array - expected shape (n_rot, 3, n_vectors).")
if (xyz_array.ndim < 3) and (xyz_array.shape[0] != 3):
raise ValueError("Misshaped xyz_array - expected shape (3, n_vectors) or (3,).")
rotated_xyz = np.matmul(rot_matrix, xyz_array)
return rotated_xyz
[docs]def _rotate_one_axis(xyz_array, *, rot_amount, rot_axis):
"""
Rotate an array of 3D positions around the a single axis (x, y, or z).
This function performs a basic rotation of 3D vectors about one of the priciple
axes -- the x-axis, the y-axis, or the z-axis.
Note that the rotations here obey the right-hand rule -- that is to say, from the
perspective of the positive side of the axis of rotation, a positive rotation will
cause points on the plane intersecting this axis to move in a counter-clockwise
fashion.
Parameters
----------
xyz_array : ndarray of float
Set of 3-dimensional vectors be rotated, in typical right-handed cartesian
order, e.g. (x, y, z). Shape is (Nrot, 3, Nvectors).
rot_amount : float or ndarray of float
Amount (in radians) to rotate the given set of coordinates. Can either be a
single float (or ndarray of shape (1,)) if rotating all vectors by the same
amount, otherwise expected to be shape (Nrot,).
rot_axis : int
Axis around which the rotation is applied. 0 is the x-axis, 1 is the y-axis,
and 2 is the z-axis.
Returns
-------
rotated_xyz : ndarray of float
Set of rotated 3-dimensional vectors, shape (Nrot, 3, Nvector).
"""
# If rot_amount is None or all zeros, then this is just one big old no-op.
if (rot_amount is None) or np.all(rot_amount == 0.0):
if np.ndim(xyz_array) == 1:
return deepcopy(xyz_array[np.newaxis, :, np.newaxis])
elif np.ndim(xyz_array) == 2:
return deepcopy(xyz_array[np.newaxis, :, :])
else:
return deepcopy(xyz_array)
# Check and see how big of a rotation matrix we need
n_rot = 1 if (not isinstance(rot_amount, np.ndarray)) else (rot_amount.shape[0])
n_vec = xyz_array.shape[-1]
# The promotion of values to float64 is to suppress numerical precision issues,
# since the matrix math can - in limited circumstances - introduce precision errors
# of order 10x the limiting numerical precision of the float. For a float32/single,
# thats a part in 1e6 (~arcsec-level errors), but for a float64 it translates to
# a part in 1e15.
rot_matrix = np.zeros((3, 3, n_rot), dtype=np.float64)
# Figure out which pieces of the matrix we need to update
temp_jdx = (rot_axis + 1) % 3
temp_idx = (rot_axis + 2) % 3
# Fill in the rotation matricies accordingly
rot_matrix[rot_axis, rot_axis] = 1
rot_matrix[temp_idx, temp_idx] = np.cos(rot_amount, dtype=np.float64)
rot_matrix[temp_jdx, temp_jdx] = rot_matrix[temp_idx, temp_idx]
rot_matrix[temp_idx, temp_jdx] = np.sin(rot_amount, dtype=np.float64)
rot_matrix[temp_jdx, temp_idx] = -rot_matrix[temp_idx, temp_jdx]
# The rot matrix was shape (3, 3, n_rot) to help speed up filling in the elements
# of each matrix, but now we want to flip it into its proper shape of (n_rot, 3, 3)
rot_matrix = np.transpose(rot_matrix, axes=[2, 0, 1])
if (n_rot == 1) and (n_vec == 1) and (xyz_array.ndim == 3):
# This is a special case where we allow the rotation axis to "expand" along
# the 0th axis of the rot_amount arrays. For xyz_array, if n_vectors = 1
# but n_rot !=1, then it's a lot faster (by about 10x) to "switch it up" and
# swap the n_vector and n_rot axes, and then swap them back once everything
# else is done.
return np.transpose(
_rotate_matmul_wrapper(
xyz_array=np.transpose(xyz_array, axes=[2, 1, 0]),
rot_matrix=rot_matrix,
n_rot=n_rot,
),
axes=[2, 1, 0],
)
else:
return _rotate_matmul_wrapper(
xyz_array=xyz_array, rot_matrix=rot_matrix, n_rot=n_rot
)
[docs]def _rotate_two_axis(xyz_array, *, rot_amount1, rot_amount2, rot_axis1, rot_axis2):
"""
Rotate an array of 3D positions sequentially around a pair of axes (x, y, or z).
This function performs a sequential pair of basic rotations of 3D vectors about
the priciple axes -- the x-axis, the y-axis, or the z-axis.
Note that the rotations here obey the right-hand rule -- that is to say, from the
perspective of the positive side of the axis of rotation, a positive rotation will
cause points on the plane intersecting this axis to move in a counter-clockwise
fashion.
Parameters
----------
xyz_array : ndarray of float
Set of 3-dimensional vectors be rotated, in typical right-handed cartesian
order, e.g. (x, y, z). Shape is (Nrot, 3, Nvectors).
rot_amount1 : float or ndarray of float
Amount (in radians) of rotatation to apply during the first rotation of the
sequence, to the given set of coordinates. Can either be a single float (or
ndarray of shape (1,)) if rotating all vectors by the same amount, otherwise
expected to be shape (Nrot,).
rot_amount2 : float or ndarray of float
Amount (in radians) of rotatation to apply during the second rotation of the
sequence, to the given set of coordinates. Can either be a single float (or
ndarray of shape (1,)) if rotating all vectors by the same amount, otherwise
expected to be shape (Nrot,).
rot_axis1 : int
Axis around which the first rotation is applied. 0 is the x-axis, 1 is the
y-axis, and 2 is the z-axis.
rot_axis2 : int
Axis around which the second rotation is applied. 0 is the x-axis, 1 is the
y-axis, and 2 is the z-axis.
Returns
-------
rotated_xyz : ndarray of float
Set of rotated 3-dimensional vectors, shape (Nrot, 3, Nvector).
"""
# Capture some special cases upfront, where we can save ourselves a bit of work
no_rot1 = (rot_amount1 is None) or np.all(rot_amount1 == 0.0)
no_rot2 = (rot_amount2 is None) or np.all(rot_amount2 == 0.0)
if no_rot1 and no_rot2:
# If rot_amount is None, then this is just one big old no-op.
return deepcopy(xyz_array)
elif no_rot1:
# If rot_amount1 is None, then ignore it and just work w/ the 2nd rotation
return _rotate_one_axis(xyz_array, rot_amount=rot_amount2, rot_axis=rot_axis2)
elif no_rot2:
# If rot_amount2 is None, then ignore it and just work w/ the 1st rotation
return _rotate_one_axis(xyz_array, rot_amount=rot_amount1, rot_axis=rot_axis1)
elif rot_axis1 == rot_axis2:
# Capture the case where someone wants to do a sequence of rotations on the same
# axis. Also known as just rotating a single axis.
return _rotate_one_axis(
xyz_array, rot_amount=rot_amount1 + rot_amount2, rot_axis=rot_axis1
)
# Figure out how many individual rotation matricies we need, accounting for the
# fact that these can either be floats or ndarrays.
n_rot = max(
rot_amount1.shape[0] if isinstance(rot_amount1, np.ndarray) else 1,
rot_amount2.shape[0] if isinstance(rot_amount2, np.ndarray) else 1,
)
n_vec = xyz_array.shape[-1]
# The promotion of values to float64 is to suppress numerical precision issues,
# since the matrix math can - in limited circumstances - introduce precision errors
# of order 10x the limiting numerical precision of the float. For a float32/single,
# thats a part in 1e6 (~arcsec-level errors), but for a float64 it translates to
# a part in 1e15.
rot_matrix = np.empty((3, 3, n_rot), dtype=np.float64)
# There are two permulations per pair of axes -- when the pair is right-hand
# oriented vs left-hand oriented. Check here which one it is. For example,
# rotating first on the x-axis, second on the y-axis is considered a
# "right-handed" pair, whereas z-axis first, then y-axis would be considered
# a "left-handed" pair.
lhd_order = np.mod(rot_axis2 - rot_axis1, 3) != 1
temp_idx = [
np.mod(rot_axis1 - lhd_order, 3),
np.mod(rot_axis1 + 1 - lhd_order, 3),
np.mod(rot_axis1 + 2 - lhd_order, 3),
]
# We're using lots of sin and cos calculations -- doing them once upfront saves
# quite a bit of time by eliminating redundant calculations
sin_lo = np.sin(rot_amount2 if lhd_order else rot_amount1, dtype=np.float64)
cos_lo = np.cos(rot_amount2 if lhd_order else rot_amount1, dtype=np.float64)
sin_hi = np.sin(rot_amount1 if lhd_order else rot_amount2, dtype=np.float64)
cos_hi = np.cos(rot_amount1 if lhd_order else rot_amount2, dtype=np.float64)
# Take care of the diagonal terms first, since they aren't actually affected by the
# order of rotational opertations
rot_matrix[temp_idx[0], temp_idx[0]] = cos_hi
rot_matrix[temp_idx[1], temp_idx[1]] = cos_lo
rot_matrix[temp_idx[2], temp_idx[2]] = cos_lo * cos_hi
# Now time for the off-diagonal terms, as a set of 3 pairs. The rotation matrix
# for a left-hand oriented pair of rotation axes (e.g., x-rot, then y-rot) is just
# a transpose of the right-hand orientation of the same pair (e.g., y-rot, then
# x-rot).
rot_matrix[temp_idx[0 + lhd_order], temp_idx[1 - lhd_order]] = sin_lo * sin_hi
rot_matrix[temp_idx[0 - lhd_order], temp_idx[lhd_order - 1]] = (
cos_lo * sin_hi * ((-1.0) ** lhd_order)
)
rot_matrix[temp_idx[1 - lhd_order], temp_idx[0 + lhd_order]] = 0.0
rot_matrix[temp_idx[1 + lhd_order], temp_idx[2 - lhd_order]] = sin_lo * (
(-1.0) ** (1 + lhd_order)
)
rot_matrix[temp_idx[lhd_order - 1], temp_idx[0 - lhd_order]] = sin_hi * (
(-1.0) ** (1 + lhd_order)
)
rot_matrix[temp_idx[2 - lhd_order], temp_idx[1 + lhd_order]] = (
sin_lo * cos_hi * ((-1.0) ** (lhd_order))
)
# The rot matrix was shape (3, 3, n_rot) to help speed up filling in the elements
# of each matrix, but now we want to flip it into its proper shape of (n_rot, 3, 3)
rot_matrix = np.transpose(rot_matrix, axes=[2, 0, 1])
if (n_rot == 1) and (n_vec == 1) and (xyz_array.ndim == 3):
# This is a special case where we allow the rotation axis to "expand" along
# the 0th axis of the rot_amount arrays. For xyz_array, if n_vectors = 1
# but n_rot !=1, then it's a lot faster (by about 10x) to "switch it up" and
# swap the n_vector and n_rot axes, and then swap them back once everything
# else is done.
return np.transpose(
_rotate_matmul_wrapper( # xyz_array, rot_matrix, n_rot
xyz_array=np.transpose(xyz_array, axes=[2, 1, 0]),
rot_matrix=rot_matrix,
n_rot=n_rot,
),
axes=[2, 1, 0],
)
else:
return _rotate_matmul_wrapper(
xyz_array=xyz_array, rot_matrix=rot_matrix, n_rot=n_rot
)
[docs]def calc_uvw(
*,
app_ra=None,
app_dec=None,
frame_pa=None,
lst_array=None,
use_ant_pos=True,
uvw_array=None,
antenna_positions=None,
antenna_numbers=None,
ant_1_array=None,
ant_2_array=None,
old_app_ra=None,
old_app_dec=None,
old_frame_pa=None,
telescope_lat=None,
telescope_lon=None,
from_enu=False,
to_enu=False,
):
"""
Calculate an array of baseline coordinates, in either uvw or ENU.
This routine is meant as a convenience function for producing baseline coordinates
based under a few different circumstances:
1) Calculating ENU coordinates using antenna positions
2) Calculating uvw coordinates at a given sky position using antenna positions
3) Converting from ENU coordinates to uvw coordinates
4) Converting from uvw coordinate to ENU coordinates
5) Converting from uvw coordinates at one sky position to another sky position
Different conversion pathways have different parameters that are required.
Parameters
----------
app_ra : ndarray of float
Apparent RA of the target phase center, required if calculating baseline
coordinates in uvw-space (vs ENU-space). Shape is (Nblts,), units are
radians.
app_dec : ndarray of float
Apparent declination of the target phase center, required if calculating
baseline coordinates in uvw-space (vs ENU-space). Shape is (Nblts,),
units are radians.
frame_pa : ndarray of float
Position angle between the great circle of declination in the apparent frame
versus that of the reference frame, used for making sure that "North" on
the derived maps points towards a particular celestial pole (not just the
topocentric one). Required if not deriving baseline coordinates from antenna
positions, from_enu=False, and a value for old_frame_pa is given. Shape is
(Nblts,), units are radians.
old_app_ra : ndarray of float
Apparent RA of the previous phase center, required if not deriving baseline
coordinates from antenna positions and from_enu=False. Shape is (Nblts,),
units are radians.
old_app_dec : ndarray of float
Apparent declination of the previous phase center, required if not deriving
baseline coordinates from antenna positions and from_enu=False. Shape is
(Nblts,), units are radians.
old_frame_pa : ndarray of float
Frame position angle of the previous phase center, required if not deriving
baseline coordinates from antenna positions, from_enu=False, and a value
for frame_pa is supplied. Shape is (Nblts,), units are radians.
lst_array : ndarray of float
Local apparent sidereal time, required if deriving baseline coordinates from
antenna positions, or converting to/from ENU coordinates. Shape is (Nblts,).
use_ant_pos : bool
Switch to determine whether to derive uvw values from the antenna positions
(if set to True), or to use the previously calculated uvw coordinates to derive
new the new baseline vectors (if set to False). Default is True.
uvw_array : ndarray of float
Array of previous baseline coordinates (in either uvw or ENU), required if
not deriving new coordinates from antenna positions. Shape is (Nblts, 3).
antenna_positions : ndarray of float
List of antenna positions relative to array center in ECEF coordinates,
required if not providing `uvw_array`. Shape is (Nants, 3).
antenna_numbers: ndarray of int
List of antenna numbers, ordered in the same way as `antenna_positions` (e.g.,
`antenna_numbers[0]` should given the number of antenna that resides at ECEF
position given by `antenna_positions[0]`). Shape is (Nants,), requred if not
providing `uvw_array`. Contains all unique entires of the joint set of
`ant_1_array` and `ant_2_array`.
ant_1_array : ndarray of int
Antenna number of the first antenna in the baseline pair, for all baselines
Required if not providing `uvw_array`, shape is (Nblts,).
ant_2_array : ndarray of int
Antenna number of the second antenna in the baseline pair, for all baselines
Required if not providing `uvw_array`, shape is (Nblts,).
telescope_lat : float
Latitude of the phase center, units radians, required if deriving baseline
coordinates from antenna positions, or converting to/from ENU coordinates.
telescope_lon : float
Longitude of the phase center, units radians, required if deriving baseline
coordinates from antenna positions, or converting to/from ENU coordinates.
from_enu : boolean
Set to True if uvw_array is expressed in ENU coordinates. Default is False.
to_enu : boolean
Set to True if you would like the output expressed in ENU coordinates. Default
is False.
Returns
-------
new_coords : ndarray of float64
Set of baseline coordinates, shape (Nblts, 3).
"""
if to_enu:
if lst_array is None and not use_ant_pos:
raise ValueError(
"Must include lst_array to calculate baselines in ENU coordinates!"
)
if telescope_lat is None:
raise ValueError(
"Must include telescope_lat to calculate baselines in ENU coordinates!"
)
else:
if ((app_ra is None) or (app_dec is None)) and frame_pa is None:
raise ValueError(
"Must include both app_ra and app_dec, or frame_pa to calculate "
"baselines in uvw coordinates!"
)
if use_ant_pos:
# Assume at this point we are dealing w/ antenna positions
if antenna_positions is None:
raise ValueError("Must include antenna_positions if use_ant_pos=True.")
if (ant_1_array is None) or (ant_2_array is None) or (antenna_numbers is None):
raise ValueError(
"Must include ant_1_array, ant_2_array, and antenna_numbers "
"setting use_ant_pos=True."
)
if lst_array is None and not to_enu:
raise ValueError(
"Must include lst_array if use_ant_pos=True and not calculating "
"baselines in ENU coordinates."
)
if telescope_lon is None:
raise ValueError("Must include telescope_lon if use_ant_pos=True.")
ant_dict = {ant_num: idx for idx, ant_num in enumerate(antenna_numbers)}
ant_1_index = np.array(
[ant_dict[ant_num] for ant_num in ant_1_array], dtype=int
)
ant_2_index = np.array(
[ant_dict[ant_num] for ant_num in ant_2_array], dtype=int
)
N_ants = antenna_positions.shape[0]
# Use the app_ra, app_dec, and lst_array arrays to figure out how many unique
# rotations are actually needed. If the ratio of Nblts to number of unique
# entries is favorable, we can just rotate the antenna positions and save
# outselves a bit of work.
if to_enu:
# If to_enu, skip all this -- there's only one unique ha + dec combo
unique_mask = np.zeros(len(ant_1_index), dtype=np.bool_)
unique_mask[0] = True
else:
unique_mask = np.append(
True,
(
((lst_array[:-1] - app_ra[:-1]) != (lst_array[1:] - app_ra[1:]))
| (app_dec[:-1] != app_dec[1:])
),
)
# GHA -> Hour Angle as measured at Greenwich (because antenna coords are
# centered such that x-plane intersects the meridian at longitude 0).
if to_enu:
# Unprojected coordinates are given in the ENU convention -- that's
# equivalent to calculating uvw's based on zenith. We can use that to our
# advantage and spoof the gha and dec based on telescope lon and lat
unique_gha = np.zeros(1) - telescope_lon
unique_dec = np.zeros(1) + telescope_lat
unique_pa = None
else:
unique_gha = (lst_array[unique_mask] - app_ra[unique_mask]) - telescope_lon
unique_dec = app_dec[unique_mask]
unique_pa = 0.0 if frame_pa is None else frame_pa[unique_mask]
# Tranpose the ant vectors so that they are in the proper shape
ant_vectors = np.transpose(antenna_positions)[np.newaxis, :, :]
# Apply rotations, and then reorganize the ndarray so that you can access
# individual antenna vectors quickly.
ant_rot_vectors = np.reshape(
np.transpose(
_rotate_one_axis(
_rotate_two_axis(
ant_vectors,
rot_amount1=unique_gha,
rot_amount2=unique_dec,
rot_axis1=2,
rot_axis2=1,
),
rot_amount=unique_pa,
rot_axis=0,
),
axes=[0, 2, 1],
),
(-1, 3),
)
unique_mask[0] = False
unique_map = np.cumsum(unique_mask) * N_ants
new_coords = (
ant_rot_vectors[unique_map + ant_2_index]
- ant_rot_vectors[unique_map + ant_1_index]
)
else:
if uvw_array is None:
raise ValueError("Must include uvw_array if use_ant_pos=False.")
if from_enu:
if to_enu:
# Well this was pointless... returning your uvws unharmed
return uvw_array
# Unprojected coordinates appear to be stored in ENU coordinates -- that's
# equivalent to calculating uvw's based on zenith. We can use that to our
# advantage and spoof old_app_ra and old_app_dec based on lst_array and
# telescope_lat
if telescope_lat is None:
raise ValueError(
"Must include telescope_lat if moving between "
"ENU (i.e., 'unprojected') and uvw coordinates!"
)
if lst_array is None:
raise ValueError(
"Must include lst_array if moving between ENU "
"(i.e., 'unprojected') and uvw coordinates!"
)
else:
if (old_frame_pa is None) and not (frame_pa is None or to_enu):
raise ValueError(
"Must include old_frame_pa values if data are phased and "
"applying new position angle values (frame_pa)."
)
if ((old_app_ra is None) and not (app_ra is None or to_enu)) or (
(old_app_dec is None) and not (app_dec is None or to_enu)
):
raise ValueError(
"Must include old_app_ra and old_app_dec values when data are "
"already phased and phasing to a new position."
)
# For this operation, all we need is the delta-ha coverage, which _should_ be
# entirely encapsulated by the change in RA.
if (app_ra is None) and (old_app_ra is None):
gha_delta_array = 0.0
else:
gha_delta_array = (lst_array if from_enu else old_app_ra) - (
lst_array if to_enu else app_ra
)
# Notice below there's an axis re-orientation here, to go from uvw -> XYZ,
# where X is pointing in the direction of the source. This is mostly here
# for convenience and code legibility -- a slightly different pair of
# rotations would give you the same result w/o needing to cycle the axes.
# Up front, we want to trap the corner-case where the sky position you are
# phasing up to hasn't changed, just the position angle (i.e., which way is
# up on the map). This is a much easier transform to handle.
if np.all(gha_delta_array == 0.0) and np.all(old_app_dec == app_dec):
new_coords = _rotate_one_axis(
uvw_array[:, [2, 0, 1], np.newaxis],
rot_amount=frame_pa - (0.0 if old_frame_pa is None else old_frame_pa),
rot_axis=0,
)[:, :, 0]
else:
new_coords = _rotate_two_axis(
_rotate_two_axis(
uvw_array[:, [2, 0, 1], np.newaxis],
rot_amount1=(
0.0 if (from_enu or old_frame_pa is None) else (-old_frame_pa)
),
rot_amount2=(-telescope_lat) if from_enu else (-old_app_dec),
rot_axis1=0,
rot_axis2=1,
),
rot_amount1=gha_delta_array,
rot_amount2=telescope_lat if to_enu else app_dec,
rot_axis1=2,
rot_axis2=1,
)
# One final rotation applied here, to compensate for the fact that we want
# the Dec-axis of our image (Fourier dual to the v-axis) to be aligned with
# the chosen frame, if we not in ENU coordinates
if not to_enu:
new_coords = _rotate_one_axis(
new_coords, rot_amount=frame_pa, rot_axis=0
)
# Finally drop the now-vestigal last axis of the array
new_coords = new_coords[:, :, 0]
# There's one last task to do, which is to re-align the axes from projected
# XYZ -> uvw, where X (which points towards the source) falls on the w axis,
# and Y and Z fall on the u and v axes, respectively.
return new_coords[:, [1, 2, 0]]
[docs]def calc_parallactic_angle(*, app_ra, app_dec, lst_array, telescope_lat):
"""
Calculate the parallactic angle between RA/Dec and the AltAz frame.
Parameters
----------
app_ra : ndarray of floats
Array of apparent RA values in units of radians, shape (Ntimes,).
app_dec : ndarray of floats
Array of apparent dec values in units of radians, shape (Ntimes,).
telescope_lat : float
Latitude of the observatory, in units of radians.
lst_array : float or ndarray of float
Array of local apparent sidereal timesto calculate position angle values
for, in units of radians. Can either be a single float or an array of shape
(Ntimes,).
"""
# This is just a simple wrapped around the pas function in ERFA
return erfa.pas(app_ra, app_dec, lst_array, telescope_lat)
[docs]def calc_frame_pos_angle(
*,
time_array,
app_ra,
app_dec,
telescope_loc,
ref_frame,
ref_epoch=None,
telescope_frame="itrs",
ellipsoid="SPHERE",
offset_pos=(np.pi / 360.0),
):
"""
Calculate an position angle given apparent position and reference frame.
This function is used to determine the position angle between the great
circle of declination in apparent coordinates, versus that in a given
reference frame. Note that this is slightly different than parallactic
angle, which is the difference between apparent declination and elevation.
Paramters
---------
time_array : ndarray of floats
Array of julian dates to calculate position angle values for, of shape
(Ntimes,).
app_ra : ndarray of floats
Array of apparent RA values in units of radians, shape (Ntimes,).
app_dec : ndarray of floats
Array of apparent dec values in units of radians, shape (Ntimes,).
telescope_loc : tuple of floats or EarthLocation
ITRF latitude, longitude, and altitude (rel to sea-level) of the observer.
Can either be provided as an astropy EarthLocation, or an array-like of shape
(3,) containing the latitude, longitude, and altitude, in that order, with units
of radians, radians, and meters, respectively.
ref_frame : str
Coordinate frame to calculate position angles for. Can be any of the
several supported frames in astropy (a limited list: fk4, fk5, icrs,
gcrs, cirs, galactic).
ref_epoch : str or flt
Epoch of the coordinates, only used when ref_frame = fk4 or fk5. Given
in unites of fractional years, either as a float or as a string with
the epoch abbreviation (e.g, Julian epoch 2000.0 would be J2000.0).
telescope_frame: str, optional
Reference frame for telescope location. Options are itrs (default) or mcmf.
Only used if telescope_loc is not an EarthLocation or MoonLocation.
ellipsoid : str
Ellipsoid to use for lunar coordinates. Must be one of "SPHERE",
"GSFC", "GRAIL23", "CE-1-LAM-GEO" (see lunarsky package for details). Default
is "SPHERE". Only used if frame is mcmf.
offset_pos : float
Distance of the offset position used to calculate the frame PA. Default
is 0.5 degrees, which should be sufficent for most applications.
Returns
-------
frame_pa : ndarray of floats
Array of position angles, in units of radians.
"""
# Check to see if the position angles should default to zero
if (ref_frame is None) or (ref_frame == "topo"):
# No-op detected, ENGAGE MAXIMUM SNARK!
return np.zeros_like(time_array)
if offset_pos <= 0:
raise ValueError(
"offset_pos must be greater than 0. This should not happen when called "
"from higher level functions, please make an issue in our issue log."
)
# This creates an array of unique entries of ra + dec + time, since the processing
# time for each element can be non-negligible, and entries along the Nblt axis can
# be highly redundant.
unique_mask = np.union1d(
np.union1d(
np.unique(app_ra, return_index=True)[1],
np.unique(app_dec, return_index=True)[1],
),
np.unique(time_array, return_index=True)[1],
)
# Pluck out the unique entries for each
unique_ra = app_ra[unique_mask]
unique_dec = app_dec[unique_mask]
unique_time = time_array[unique_mask]
# Figure out how many elements we need to transform
n_coord = len(unique_mask)
# Offset north/south positions by 0.5 deg, such that the PA is determined over a
# 1 deg arc.
up_dec = unique_dec + offset_pos
dn_dec = unique_dec - offset_pos
up_ra = np.array(unique_ra)
dn_ra = np.array(unique_ra)
# Wrap the positions if they happen to go over the poles
select_mask = up_dec > (np.pi / 2.0)
up_ra[select_mask] = np.mod(up_ra[select_mask] + np.pi, 2.0 * np.pi)
up_dec[select_mask] = np.pi - up_dec[select_mask]
select_mask = dn_dec < (-np.pi / 2.0)
dn_ra[select_mask] = np.mod(dn_ra[select_mask] + np.pi, 2.0 * np.pi)
dn_dec[select_mask] = (-np.pi) - dn_dec[select_mask]
# Run the set of offset coordinates through the "reverse" transform. The two offset
# positions are concat'd together to help reduce overheads
ref_ra, ref_dec = calc_sidereal_coords(
time_array=np.tile(unique_time, 2),
app_ra=np.concatenate((dn_ra, up_ra)),
app_dec=np.concatenate((dn_dec, up_dec)),
telescope_loc=telescope_loc,
coord_frame=ref_frame,
telescope_frame=telescope_frame,
ellipsoid=ellipsoid,
coord_epoch=ref_epoch,
)
# Use the pas function from ERFA to calculate the position angle. The negative sign
# is here because we're measuring PA of app -> frame, but we want frame -> app.
unique_pa = -erfa.pas(
ref_ra[:n_coord], ref_dec[:n_coord], ref_ra[n_coord:], ref_dec[n_coord:]
)
# Finally, we have to go back through and "fill in" the redundant entries
frame_pa = np.zeros_like(app_ra)
for idx in range(n_coord):
select_mask = np.logical_and(
np.logical_and(unique_ra[idx] == app_ra, unique_dec[idx] == app_dec),
unique_time[idx] == time_array,
)
frame_pa[select_mask] = unique_pa[idx]
return frame_pa
[docs]def lookup_jplhorizons(
target_name,
time_array,
*,
telescope_loc=None,
high_cadence=False,
force_indv_lookup=None,
):
"""
Lookup solar system body coordinates via the JPL-Horizons service.
This utility is useful for generating ephemerides, which can then be interpolated in
order to provide positional data for a target which is moving, such as planetary
bodies and other solar system objects. Use of this function requires the
installation of the `astroquery` module.
Parameters
----------
target_name : str
Name of the target to gather an ephemeris for. Must match the name
in the JPL-Horizons database.
time_array : array-like of float
Times in UTC Julian days to gather an ephemeris for.
telescope_loc : tuple of floats or EarthLocation
ITRS latitude, longitude, and altitude (rel to sea-level) of the observer.
Can either be provided as an EarthLocation object, or an
array-like of shape (3,) containing the latitude, longitude, and altitude,
in that order, with units of radians, radians, and meters, respectively.
high_cadence : bool
If set to True, will calculate ephemeris points every 3 minutes in time, as
opposed to the default of every 3 hours.
force_indv_lookup : bool
If set to True, will calculate coordinate values for each value found within
`time_array`. If False, a regularized time grid is sampled that encloses the
values contained within `time_array`. Default is False, unless `time_array` is
of length 1, in which the default is set to True.
Returns
-------
ephem_times : ndarray of float
Times for which the ephemeris values were calculated, in UTC Julian days.
ephem_ra : ndarray of float
ICRS Right ascension of the target at the values within `ephem_times`, in
units of radians.
ephem_dec : ndarray of float
ICRS Declination of the target at the values within `ephem_times`, in units
of radians.
ephem_dist : ndarray of float
Distance of the target relative to the observer, at the values within
`ephem_times`, in units of parsecs.
ephem_vel : ndarray of float
Velocity of the targets relative to the observer, at the values within
`ephem_times`, in units of km/sec.
"""
try:
from astroquery.jplhorizons import Horizons
except ImportError as err:
raise ImportError(
"astroquery is not installed but is required for "
"planet ephemeris functionality"
) from err
from json import load as json_load
from os.path import join as path_join
from pyuvdata.data import DATA_PATH
# Get the telescope location into a format that JPL-Horizons can understand,
# which is nominally a dict w/ entries for lon (units of deg), lat (units of
# deg), and elevation (units of km).
if isinstance(telescope_loc, EarthLocation):
site_loc = {
"lon": telescope_loc.lon.deg,
"lat": telescope_loc.lat.deg,
"elevation": telescope_loc.height.to_value(unit=units.km),
}
elif telescope_loc is None:
# Setting to None will report the geocentric position
site_loc = None
elif isinstance(telescope_loc, tuple | list) or (
isinstance(telescope_loc, np.ndarray) and telescope_loc.size > 1
):
# MoonLocations are instances of np.ndarray but have size 1
site_loc = {
"lon": telescope_loc[1] * (180.0 / np.pi),
"lat": telescope_loc[0] * (180.0 / np.pi),
"elevation": telescope_loc[2] * (0.001), # m -> km
}
else:
bad_type = False
try:
from lunarsky import MoonLocation
if isinstance(telescope_loc, MoonLocation):
raise NotImplementedError(
"Cannot lookup JPL positions for telescopes with a MoonLocation"
)
else:
bad_type = True
except ImportError: # pragma: no cover
# getting here requires having astroquery but not lunarsky. We don't
# have a CI like that.
bad_type = True
if bad_type:
raise ValueError(
f"telescope_loc is not a valid type: {type(telescope_loc)}"
)
# If force_indv_lookup is True, or unset but only providing a single value, then
# just calculate the RA/Dec for the times requested rather than creating a table
# to interpolate from.
if force_indv_lookup or (
(np.array(time_array).size == 1) and (force_indv_lookup is None)
):
epoch_list = np.unique(time_array)
if len(epoch_list) > 50:
raise ValueError(
"Requesting too many individual ephem points from JPL-Horizons. This "
"can be remedied by setting force_indv_lookup=False or limiting the "
"number of values in time_array."
)
else:
# When querying for multiple times, its faster (and kinder to the
# good folks at JPL) to create a range to query, and then interpolate
# between values. The extra buffer of 0.001 or 0.25 days for high and
# low cadence is to give enough data points to allow for spline
# interpolation of the data.
if high_cadence:
start_time = np.min(time_array) - 0.001
stop_time = np.max(time_array) + 0.001
step_time = "3m"
n_entries = (stop_time - start_time) * (1440.0 / 3.0)
else:
# The start/stop time here are setup to maximize reusability of the
# data, since astroquery appears to cache the results from previous
# queries.
start_time = (0.25 * np.floor(4.0 * np.min(time_array))) - 0.25
stop_time = (0.25 * np.ceil(4.0 * np.max(time_array))) + 0.25
step_time = "3h"
n_entries = (stop_time - start_time) * (24.0 / 3.0)
# We don't want to overtax the JPL service, so limit ourselves to 1000
# individual queries at a time. Note that this is likely a conservative
# cap for JPL-Horizons, but there should be exceptionally few applications
# that actually require more than this.
if n_entries > 1000:
if (len(np.unique(time_array)) <= 50) and (force_indv_lookup is None):
# If we have a _very_ sparse set of epochs, pass that along instead
epoch_list = np.unique(time_array)
else:
# Otherwise, time to raise an error
raise ValueError(
"Too many ephem points requested from JPL-Horizons. This "
"can be remedied by setting high_cadance=False or limiting "
"the number of values in time_array."
)
else:
epoch_list = {
"start": Time(start_time, format="jd").isot,
"stop": Time(stop_time, format="jd").isot,
"step": step_time,
}
# Check to make sure dates are within the 1700-2200 time range,
# since not all targets are supported outside of this range
if (np.min(time_array) < 2341973.0) or (np.max(time_array) > 2524593.0):
raise ValueError(
"No current support for JPL ephems outside of 1700 - 2300 AD. "
"Check back later (or possibly earlier)..."
)
# JPL-Horizons has a separate catalog with what it calls 'major bodies',
# and will throw an error if you use the wrong catalog when calling for
# astrometry. We'll use the dict below to capture this behavior.
with open(path_join(DATA_PATH, "jpl_major_bodies.json")) as fhandle:
major_body_dict = json_load(fhandle)
target_id = target_name
id_type = "smallbody"
# If we find the target in the major body database, then we can extract the
# target ID to make the query a bit more robust (otherwise JPL-Horizons will fail
# on account that id will find multiple partial matches: e.g., "Mars" will be
# matched with "Mars", "Mars Explorer", "Mars Barycenter"..., and JPL-Horizons will
# not know which to choose).
if target_name in major_body_dict:
target_id = major_body_dict[target_name]
id_type = None
query_obj = Horizons(
id=target_id, location=site_loc, epochs=epoch_list, id_type=id_type
)
# If not in the major bodies catalog, try the minor bodies list, and if
# still not found, throw an error.
try:
ephem_data = query_obj.ephemerides(extra_precision=True)
except KeyError:
# This is a fix for an astroquery + JPL-Horizons bug, that's related to
# API change on JPL's side. In this case, the source is identified, but
# astroquery can't correctly parse the return message from JPL-Horizons.
# See astroquery issue #2169.
ephem_data = query_obj.ephemerides(extra_precision=False) # pragma: no cover
except ValueError as err:
query_obj._session.close()
if "Unknown target" in str(err):
raise ValueError(
"Target ID is not recognized in either the small or major bodies "
"catalogs, please consult the JPL-Horizons database for supported "
"targets (https://ssd.jpl.nasa.gov/?horizons)."
) from err
else:
raise # pragma: no cover
# This is explicitly closed here to trap a bug that occassionally throws an
# unexpected warning, see astroquery issue #1807
query_obj._session.close()
# Now that we have the ephem data, extract out the relevant data
ephem_times = np.array(ephem_data["datetime_jd"])
ephem_ra = np.array(ephem_data["RA"]) * (np.pi / 180.0)
ephem_dec = np.array(ephem_data["DEC"]) * (np.pi / 180.0)
ephem_dist = np.array(ephem_data["delta"]) # AU
ephem_vel = np.array(ephem_data["delta_rate"]) # km/s
return ephem_times, ephem_ra, ephem_dec, ephem_dist, ephem_vel
[docs]def interpolate_ephem(
*, time_array, ephem_times, ephem_ra, ephem_dec, ephem_dist=None, ephem_vel=None
):
"""
Interpolates ephemerides to give positions for requested times.
This is a simple tool for calculated interpolated RA and Dec positions, as well
as distances and velocities, for a given ephemeris. Under the hood, the method
uses as cubic spline interpolation to calculate values at the requested times,
provided that there are enough values to interpolate over to do so (requires
>= 4 points), otherwise a linear interpolation is used.
Parameters
----------
time_array : array-like of floats
Times to interpolate positions for, in UTC Julian days.
ephem_times : array-like of floats
Times in UTC Julian days which describe that match to the recorded postions
of the target. Must be array-like, of shape (Npts,), where Npts is the number
of ephemeris points.
ephem_ra : array-like of floats
Right ascencion of the target, at the times given in `ephem_times`. Units are
in radians, must have the same shape as `ephem_times`.
ephem_dec : array-like of floats
Declination of the target, at the times given in `ephem_times`. Units are
in radians, must have the same shape as `ephem_times`.
ephem_dist : array-like of floats
Distance of the target from the observer, at the times given in `ephem_times`.
Optional argument, in units of parsecs. Must have the same shape as
`ephem_times`.
ephem_vel : array-like of floats
Velocities of the target, at the times given in `ephem_times`. Optional
argument, in units of km/sec. Must have the same shape as `ephem_times`.
Returns
-------
ra_vals : ndarray of float
Interpolated RA values, returned as an ndarray of floats with
units of radians, and the same shape as `time_array`.
dec_vals : ndarray of float
Interpolated declination values, returned as an ndarray of floats with
units of radians, and the same shape as `time_array`.
dist_vals : None or ndarray of float
If `ephem_dist` was provided, an ndarray of floats (with same shape as
`time_array`) with the interpolated target distances, in units of parsecs.
If `ephem_dist` was not provided, this returns as None.
vel_vals : None or ndarray of float
If `ephem_vals` was provided, an ndarray of floats (with same shape as
`time_array`) with the interpolated target velocities, in units of km/sec.
If `ephem_vals` was not provided, this returns as None.
"""
# We're importing this here since it's only used for this one function
from scipy.interpolate import interp1d
ephem_shape = np.array(ephem_times).shape
# Make sure that things look reasonable
if np.array(ephem_ra).shape != ephem_shape:
raise ValueError("ephem_ra must have the same shape as ephem_times.")
if np.array(ephem_dec).shape != ephem_shape:
raise ValueError("ephem_dec must have the same shape as ephem_times.")
if (np.array(ephem_dist).shape != ephem_shape) and (ephem_dist is not None):
raise ValueError("ephem_dist must have the same shape as ephem_times.")
if (np.array(ephem_vel).shape != ephem_shape) and (ephem_vel is not None):
raise ValueError("ephem_vel must have the same shape as ephem_times.")
ra_vals = np.zeros_like(time_array, dtype=float)
dec_vals = np.zeros_like(time_array, dtype=float)
dist_vals = None if ephem_dist is None else np.zeros_like(time_array, dtype=float)
vel_vals = None if ephem_vel is None else np.zeros_like(time_array, dtype=float)
if len(ephem_times) == 1:
ra_vals += ephem_ra
dec_vals += ephem_dec
if ephem_dist is not None:
dist_vals += ephem_dist
if ephem_vel is not None:
vel_vals += ephem_vel
else:
if len(ephem_times) > 3:
interp_kind = "cubic"
else:
interp_kind = "linear"
# If we have values that line up perfectly, just use those directly
select_mask = np.isin(time_array, ephem_times)
if np.any(select_mask):
time_select = time_array[select_mask]
ra_vals[select_mask] = interp1d(ephem_times, ephem_ra, kind="nearest")(
time_select
)
dec_vals[select_mask] = interp1d(ephem_times, ephem_dec, kind="nearest")(
time_select
)
if ephem_dist is not None:
dist_vals[select_mask] = interp1d(
ephem_times, ephem_dist, kind="nearest"
)(time_select)
if ephem_vel is not None:
vel_vals[select_mask] = interp1d(
ephem_times, ephem_vel, kind="nearest"
)(time_select)
# If we have values lining up between grid points, use spline interpolation
# to calculate their values
select_mask = ~select_mask
if np.any(select_mask):
time_select = time_array[select_mask]
ra_vals[select_mask] = interp1d(ephem_times, ephem_ra, kind=interp_kind)(
time_select
)
dec_vals[select_mask] = interp1d(ephem_times, ephem_dec, kind=interp_kind)(
time_select
)
if ephem_dist is not None:
dist_vals[select_mask] = interp1d(
ephem_times, ephem_dist, kind=interp_kind
)(time_select)
if ephem_vel is not None:
vel_vals[select_mask] = interp1d(
ephem_times, ephem_vel, kind=interp_kind
)(time_select)
return (ra_vals, dec_vals, dist_vals, vel_vals)
[docs]def calc_app_coords(
*,
lon_coord,
lat_coord,
coord_frame="icrs",
coord_epoch=None,
coord_times=None,
coord_type="sidereal",
time_array=None,
lst_array=None,
telescope_loc=None,
telescope_frame="itrs",
ellipsoid=None,
pm_ra=None,
pm_dec=None,
vrad=None,
dist=None,
all_times_unique: bool | None = None,
):
"""
Calculate apparent coordinates for several different coordinate types.
This function calculates apparent positions at the current epoch.
Parameters
----------
lon_coord : float or ndarray of float
Longitudinal (e.g., RA) coordinates, units of radians. Must match the same
shape as lat_coord.
lat_coord : float or ndarray of float
Latitudinal (e.g., Dec) coordinates, units of radians. Must match the same
shape as lon_coord.
coord_frame : string
The requested reference frame for the output coordinates, can be any frame
that is presently supported by astropy.
coord_epoch : float or str or Time object
Epoch for ref_frame, nominally only used if converting to either the FK4 or
FK5 frames, in units of fractional years. If provided as a float and the
coord_frame is an FK4-variant, value will assumed to be given in Besselian
years (i.e., 1950 would be 'B1950'), otherwise the year is assumed to be
in Julian years.
coord_times : float or ndarray of float
Only used when `coord_type="ephem"`, the JD UTC time for each value of
`lon_coord` and `lat_coord`. These values are used to interpolate `lon_coord`
and `lat_coord` values to those times listed in `time_array`.
coord_type : str
Type of source to calculate coordinates for. Must be one of:
- "sidereal" (fixed RA/Dec),
- "ephem" (RA/Dec that moves with time),
- "driftscan" (fixed az/el position),
- "unprojected" (alias for "driftscan" with (Az, Alt) = (0 deg, 90 deg)).
- "near_field" (equivalent to sidereal, with the addition of
near-field corrections)
time_array : float or ndarray of float or Time object
Times for which the apparent coordinates are to be calculated, in UTC JD.
If more than a single element, must be the same shape as lon_coord and
lat_coord if both of those are arrays (instead of single floats).
telescope_loc : array-like of floats or EarthLocation or MoonLocation
ITRF latitude, longitude, and altitude (rel to sea-level) of the phase center
of the array. Can either be provided as an astropy EarthLocation, a lunarsky
Moonlocation, or a tuple of shape (3,) containing (in order) the latitude,
longitude, and altitude for a position on Earth in units of radians, radians,
and meters, respectively.
telescope_frame: str, optional
Reference frame for telescope location. Options are itrs (default) or mcmf.
Only used if telescope_loc is not an EarthLocation or MoonLocation.
ellipsoid : str
Ellipsoid to use for lunar coordinates. Must be one of "SPHERE",
"GSFC", "GRAIL23", "CE-1-LAM-GEO" (see lunarsky package for details). Default
is "SPHERE". Only used if frame is mcmf.
pm_ra : float or ndarray of float
Proper motion in RA of the source, expressed in units of milliarcsec / year.
Can either be a single float or array of shape (Ntimes,), although this must
be consistent with other parameters (namely ra_coord and dec_coord). Not
required, motion is calculated relative to the value of `coord_epoch`.
pm_dec : float or ndarray of float
Proper motion in Dec of the source, expressed in units of milliarcsec / year.
Can either be a single float or array of shape (Ntimes,), although this must
be consistent with other parameters (namely ra_coord and dec_coord). Not
required, motion is calculated relative to the value of `coord_epoch`.
vrad : float or ndarray of float
Radial velocity of the source, expressed in units of km / sec. Can either be
a single float or array of shape (Ntimes,), although this must be consistent
with other parameters (namely ra_coord and dec_coord). Not required.
dist : float or ndarray of float
Distance of the source, expressed in milliarcseconds. Can either be a single
float or array of shape (Ntimes,), although this must be consistent with other
parameters (namely ra_coord and dec_coord). Not required.
all_times_unique
Boolean specifying whether all the times that were passed are unique. Default
is to determine this within the function, but specifying it here can improve
performance.
Returns
-------
app_ra : ndarray of floats
Apparent right ascension coordinates, in units of radians.
app_dec : ndarray of floats
Apparent declination coordinates, in units of radians.
"""
site_loc = get_loc_obj(
telescope_loc,
telescope_frame=telescope_frame,
ellipsoid=ellipsoid,
angle_units=units.rad,
)
# Time objects and unique don't seem to play well together, so we break apart
# their handling here
if isinstance(time_array, Time):
time_array = time_array.utc.jd
if not all_times_unique:
unique_time_array, unique_mask = np.unique(time_array, return_index=True)
else:
unique_time_array = time_array
unique_mask = slice(None)
if coord_type in ["driftscan", "unprojected"]:
if lst_array is None:
unique_lst = get_lst_for_time(unique_time_array, telescope_loc=site_loc)
else:
unique_lst = lst_array[unique_mask]
if coord_type == "sidereal" or coord_type == "near_field":
# If the coordinates are not in the ICRS frame, go ahead and transform them now
if coord_frame != "icrs":
icrs_ra, icrs_dec = transform_sidereal_coords(
longitude=lon_coord,
latitude=lat_coord,
in_coord_frame=coord_frame,
out_coord_frame="icrs",
in_coord_epoch=coord_epoch,
time_array=unique_time_array,
)
else:
icrs_ra = lon_coord
icrs_dec = lat_coord
unique_app_ra, unique_app_dec = transform_icrs_to_app(
time_array=unique_time_array,
ra=icrs_ra,
dec=icrs_dec,
telescope_loc=site_loc,
pm_ra=pm_ra,
pm_dec=pm_dec,
vrad=vrad,
dist=dist,
)
elif coord_type == "driftscan":
# Use the ERFA function ae2hd, which will do all the heavy
# lifting for us
unique_app_ha, unique_app_dec = erfa.ae2hd(
lon_coord, lat_coord, site_loc.lat.rad
)
# The above returns HA/Dec, so we just need to rotate by
# the LST to get back app RA and Dec
unique_app_ra = np.mod(unique_app_ha + unique_lst, 2 * np.pi)
unique_app_dec = unique_app_dec + np.zeros_like(unique_app_ra)
elif coord_type == "ephem":
interp_ra, interp_dec, _, _ = interpolate_ephem(
time_array=unique_time_array,
ephem_times=coord_times,
ephem_ra=lon_coord,
ephem_dec=lat_coord,
)
if coord_frame != "icrs":
icrs_ra, icrs_dec = transform_sidereal_coords(
longitude=interp_ra,
latitude=interp_dec,
in_coord_frame=coord_frame,
out_coord_frame="icrs",
in_coord_epoch=coord_epoch,
time_array=unique_time_array,
)
else:
icrs_ra = interp_ra
icrs_dec = interp_dec
# TODO: Vel and distance handling to be integrated here, once they are are
# needed for velocity frame tracking
unique_app_ra, unique_app_dec = transform_icrs_to_app(
time_array=unique_time_array,
ra=icrs_ra,
dec=icrs_dec,
telescope_loc=site_loc,
pm_ra=pm_ra,
pm_dec=pm_dec,
)
elif coord_type == "unprojected":
# This is the easiest one - this is just supposed to be ENU, so set the
# apparent coords to the current lst and telescope_lat.
unique_app_ra = unique_lst.copy()
unique_app_dec = np.zeros_like(unique_app_ra) + site_loc.lat.rad
else:
raise ValueError(f"Object type {coord_type} is not recognized.")
# Now that we've calculated all the unique values, time to backfill through the
# "redundant" entries in the Nblt axis.
app_ra = np.zeros(np.array(time_array).shape)
app_dec = np.zeros(np.array(time_array).shape)
for idx, unique_time in enumerate(unique_time_array):
if not all_times_unique:
select_mask = time_array == unique_time
else:
select_mask = idx
app_ra[select_mask] = unique_app_ra[idx]
app_dec[select_mask] = unique_app_dec[idx]
return app_ra, app_dec
[docs]def calc_sidereal_coords(
*,
time_array,
app_ra,
app_dec,
telescope_loc,
coord_frame,
telescope_frame="itrs",
ellipsoid=None,
coord_epoch=None,
):
"""
Calculate sidereal coordinates given apparent coordinates.
This function calculates coordinates in the requested frame (at a given epoch)
from a set of apparent coordinates.
Parameters
----------
time_array : float or ndarray of float or Time object
Times for which the apparent coordinates were calculated, in UTC JD. Must
match the shape of app_ra and app_dec.
app_ra : float or ndarray of float
Array of apparent right ascension coordinates, units of radians. Must match
the shape of time_array and app_dec.
app_ra : float or ndarray of float
Array of apparent right declination coordinates, units of radians. Must match
the shape of time_array and app_dec.
telescope_loc : tuple of floats or EarthLocation
ITRF latitude, longitude, and altitude (rel to sea-level) of the phase center
of the array. Can either be provided as an astropy EarthLocation, or a tuple
of shape (3,) containing (in order) the latitude, longitude, and altitude,
in units of radians, radians, and meters, respectively.
coord_frame : string
The requested reference frame for the output coordinates, can be any frame
that is presently supported by astropy. Default is ICRS.
telescope_frame: str, optional
Reference frame for telescope location. Options are itrs (default) or mcmf.
Only used if telescope_loc is not an EarthLocation or MoonLocation.
ellipsoid : str
Ellipsoid to use for lunar coordinates. Must be one of "SPHERE",
"GSFC", "GRAIL23", "CE-1-LAM-GEO" (see lunarsky package for details). Default
is "SPHERE". Only used if frame is mcmf.
coord_epoch : float or str or Time object
Epoch for ref_frame, nominally only used if converting to either the FK4 or
FK5 frames, in units of fractional years. If provided as a float and the
ref_frame is an FK4-variant, value will assumed to be given in Besselian
years (i.e., 1950 would be 'B1950'), otherwise the year is assumed to be
in Julian years.
Returns
-------
ref_ra : ndarray of floats
Right ascension coordinates in the requested frame, in units of radians.
Either shape (Ntimes,) if Ntimes >1, otherwise (Ncoord,).
ref_dec : ndarray of floats
Declination coordinates in the requested frame, in units of radians.
Either shape (Ntimes,) if Ntimes >1, otherwise (Ncoord,).
"""
# Check to make sure that we have a properly formatted epoch for our in-bound
# coordinate frame
epoch = None
if isinstance(coord_epoch, str | Time):
# If its a string or a Time object, we don't need to do anything more
epoch = Time(coord_epoch)
elif coord_epoch is not None:
if coord_frame.lower() in ["fk4", "fk4noeterms"]:
epoch = Time(coord_epoch, format="byear")
else:
epoch = Time(coord_epoch, format="jyear")
if telescope_frame == "mcmf" and ellipsoid is None:
ellipsoid = "SPHERE"
icrs_ra, icrs_dec = transform_app_to_icrs(
time_array=time_array,
app_ra=app_ra,
app_dec=app_dec,
telescope_loc=telescope_loc,
telescope_frame=telescope_frame,
ellipsoid=ellipsoid,
)
if coord_frame == "icrs":
ref_ra, ref_dec = (icrs_ra, icrs_dec)
else:
ref_ra, ref_dec = transform_sidereal_coords(
longitude=icrs_ra,
latitude=icrs_dec,
in_coord_frame="icrs",
out_coord_frame=coord_frame,
out_coord_epoch=epoch,
time_array=time_array,
)
return ref_ra, ref_dec
[docs]def uvw_track_generator(
*,
lon_coord=None,
lat_coord=None,
coord_frame="icrs",
coord_epoch=None,
coord_type="sidereal",
time_array=None,
telescope_loc=None,
telescope_frame="itrs",
ellipsoid=None,
antenna_positions=None,
antenna_numbers=None,
ant_1_array=None,
ant_2_array=None,
uvw_array=None,
force_postive_u=False,
):
"""
Calculate uvw coordinates (among other values) for a given position on the sky.
This function is meant to be a user-friendly wrapper around several pieces of code
for effectively simulating a track.
Parameters
----------
lon_coord : float or ndarray of float
Longitudinal (e.g., RA) coordinates, units of radians. Must match the same
shape as lat_coord.
lat_coord : float or ndarray of float
Latitudinal (e.g., Dec) coordinates, units of radians. Must match the same
shape as lon_coord.
coord_frame : string
The requested reference frame for the output coordinates, can be any frame
that is presently supported by astropy.
coord_epoch : float or str or Time object, optional
Epoch for ref_frame, nominally only used if converting to either the FK4 or
FK5 frames, in units of fractional years. If provided as a float and the
ref_frame is an FK4-variant, value will assumed to be given in Besselian
years (i.e., 1950 would be 'B1950'), otherwise the year is assumed to be
in Julian years.
coord_type : str
Type of source to calculate coordinates for. Must be one of:
"sidereal" (fixed RA/Dec),
"ephem" (RA/Dec that moves with time),
"driftscan" (fixed az/el position),
"unprojected" (alias for "driftscan" with (Az, Alt) = (0 deg, 90 deg)).
time_array : ndarray of float or Time object
Times for which the apparent coordinates were calculated, in UTC JD. Must
match the shape of lon_coord and lat_coord.
telescope_loc : array-like of floats or EarthLocation or MoonLocation
ITRF latitude, longitude, and altitude (rel to sea-level) of the phase center
of the array. Can either be provided as an astropy EarthLocation, a lunarsky
Moonlocation, or a tuple of shape (3,) containing (in order) the latitude,
longitude, and altitude for a position on Earth in units of degrees, degrees,
and meters, respectively.
telescope_frame : str, optional
Reference frame for latitude/longitude/altitude. Options are itrs (default) or
mcmf. Only used if telescope_loc is not an EarthLocation or MoonLocation.
ellipsoid : str
Ellipsoid to use for lunar coordinates. Must be one of "SPHERE",
"GSFC", "GRAIL23", "CE-1-LAM-GEO" (see lunarsky package for details). Default
is "SPHERE". Only used if frame is mcmf.
antenna_positions : ndarray of float
List of antenna positions relative to array center in ECEF coordinates,
required if not providing `uvw_array`. Shape is (Nants, 3).
antenna_numbers: ndarray of int, optional
List of antenna numbers, ordered in the same way as `antenna_positions` (e.g.,
`antenna_numbers[0]` should given the number of antenna that resides at ECEF
position given by `antenna_positions[0]`). Shape is (Nants,), requred if
supplying ant_1_array and ant_2_array.
ant_1_array : ndarray of int, optional
Antenna number of the first antenna in the baseline pair, for all baselines
Required if not providing `uvw_array`, shape is (Nblts,). If not supplied, then
the method will automatically fill in ant_1_array with all unique antenna
pairings for each time/position.
ant_2_array : ndarray of int, optional
Antenna number of the second antenna in the baseline pair, for all baselines
Required if not providing `uvw_array`, shape is (Nblts,). If not supplied, then
the method will automatically fill in ant_2_array with all unique antenna
pairings for each time/position.
uvw_array : ndarray of float, optional
Array of baseline coordinates (in ENU), required if not deriving new coordinates
from antenna positions. Setting this value will will cause antenna positions to
be ignored. Shape is (Nblts, 3).
force_positive_u : bool, optional
If set to true, then forces the conjugation of each individual baseline to be
set such that the uvw coordinates land on the positive-u side of the uv-plane.
Default is False.
Returns
-------
obs_dict : dict
Dictionary containing the results of the simulation, which includes:
"uvw" the uvw-coordinates (meters),
"app_ra" apparent RA of the sources (radians),
"app_dec" apparent Dec of the sources (radians),
"frame_pa" ngle between apparent north and `coord_frame` north (radians),
"lst" local apparent sidereal time (radians),
"site_loc" EarthLocation or MoonLocation for the telescope site.
"""
site_loc = get_loc_obj(
telescope_loc,
telescope_frame=telescope_frame,
ellipsoid=ellipsoid,
angle_units=units.deg,
)
if not isinstance(lon_coord, np.ndarray):
lon_coord = np.array(lon_coord)
if not isinstance(lat_coord, np.ndarray):
lat_coord = np.array(lat_coord)
if not isinstance(time_array, np.ndarray):
time_array = np.array(time_array)
if lon_coord.ndim == 0:
lon_coord = lon_coord.reshape(1)
if lat_coord.ndim == 0:
lat_coord = lat_coord.reshape(1)
if time_array.ndim == 0:
time_array = time_array.reshape(1)
Ntimes = len(time_array)
if uvw_array is None and all(
item is None for item in [antenna_numbers, ant_1_array, ant_2_array]
):
antenna_numbers = np.arange(1, 1 + len(antenna_positions))
ant_1_array = []
ant_2_array = []
for idx in range(len(antenna_positions)):
for jdx in range(idx + 1, len(antenna_positions)):
ant_1_array.append(idx + 1)
ant_2_array.append(jdx + 1)
Nbase = len(ant_1_array)
ant_1_array = np.tile(ant_1_array, Ntimes)
ant_2_array = np.tile(ant_2_array, Ntimes)
if len(lon_coord) == len(time_array):
lon_coord = np.repeat(lon_coord, Nbase)
lat_coord = np.repeat(lat_coord, Nbase)
time_array = np.repeat(time_array, Nbase)
lst_array = get_lst_for_time(jd_array=time_array, telescope_loc=site_loc)
app_ra, app_dec = calc_app_coords(
lon_coord=lon_coord,
lat_coord=lat_coord,
coord_frame=coord_frame,
coord_type=coord_type,
time_array=time_array,
lst_array=lst_array,
telescope_loc=site_loc,
)
frame_pa = calc_frame_pos_angle(
time_array=time_array,
app_ra=app_ra,
app_dec=app_dec,
telescope_loc=site_loc,
ref_frame=coord_frame,
ref_epoch=coord_epoch,
)
uvws = calc_uvw(
app_ra=app_ra,
app_dec=app_dec,
frame_pa=frame_pa,
lst_array=lst_array,
antenna_positions=antenna_positions,
antenna_numbers=antenna_numbers,
ant_1_array=ant_1_array,
ant_2_array=ant_2_array,
telescope_lon=site_loc.lon.rad,
telescope_lat=site_loc.lat.rad,
uvw_array=uvw_array,
use_ant_pos=(uvw_array is None),
from_enu=(uvw_array is not None),
)
if force_postive_u:
mask = (uvws[:, 0] < 0.0) | ((uvws[:, 0] == 0.0) & (uvws[:, 1] < 0.0))
uvws[mask, :] *= -1.0
return {
"uvw": uvws,
"app_ra": app_ra,
"app_dec": app_dec,
"frame_pa": frame_pa,
"lst": lst_array,
"site_loc": site_loc,
}
[docs]def _get_focus_xyz(uvd, focus, ra, dec):
"""
Return the x,y,z coordinates of the focal point.
The focal point corresponds to the location of
the near-field object of interest in the ENU
frame centered on the median position of the
antennas.
Parameters
----------
uvd : UVData object
UVData object
focus : float
Focal distance of the array (km)
ra : float
Right ascension of the focal point ie phase center (deg; shape (Ntimes,))
dec : float
Declination of the focal point ie phase center (deg; shape (Ntimes,))
Returns
-------
x, y, z: ndarray, ndarray, ndarray
ENU-frame coordinates of the focal point (meters) (shape (Ntimes,))
"""
# Obtain timesteps
timesteps = Time(np.unique(uvd.time_array), format="jd")
# Initialize sky-based coordinates using right ascension and declination
obj = SkyCoord(ra * units.deg, dec * units.deg)
# Initialize EarthLocation object centred on the telescope
loc = uvd.telescope.location.itrs.cartesian.xyz.value
antpos = uvd.telescope.antenna_positions + loc
x, y, z = np.median(antpos, axis=0)
telescope = EarthLocation(x, y, z, unit=units.m)
# Convert sky object to an AltAz frame centered on the telescope
obj = obj.transform_to(AltAz(obstime=timesteps, location=telescope))
# Obtain altitude and azimuth
theta, phi = obj.alt.to(units.rad), obj.az.to(units.rad)
# Obtain x,y,z ENU coordinates
x = focus * 1e3 * np.cos(theta) * np.sin(phi)
y = focus * 1e3 * np.cos(theta) * np.cos(phi)
z = focus * 1e3 * np.sin(theta)
return x, y, z
[docs]def _get_nearfield_delay(uvd, focus_x, focus_y, focus_z):
"""
Calculate near-field phase/delay along the Nblts axis.
Parameters
----------
uvd : UVData object
UVData object
focus_x, focus_y, focus_z : ndarray, ndarray, ndarray
ENU-frame coordinates of focal point (Each of shape (Ntimes,))
Returns
-------
new_w : ndarray
The calculated near-field delay (or w-term) for each visibility along
the Nblts axis
"""
# Get indices to convert between Nants and Nblts
ind1, ind2 = _nants_to_nblts(uvd)
# The center of the ENU frame should be located at the median position of the array
antpos = uvd.telescope.get_enu_antpos() - np.median(
uvd.telescope.get_enu_antpos(), axis=0
)
# Get tile positions for each baseline
tile1 = antpos[ind1] # Shape (Nblts, 3)
tile2 = antpos[ind2] # Shape (Nblts, 3)
# Focus points have shape (Ntimes,); convert to shape (Nblts,)
t_inds = _ntimes_to_nblts(uvd)
(focus_x, focus_y, focus_z) = (focus_x[t_inds], focus_y[t_inds], focus_z[t_inds])
# Calculate distance from antennas to focal point
# for each visibility along the Nblts axis
r1 = np.sqrt(
(tile1[:, 0] - focus_x) ** 2
+ (tile1[:, 1] - focus_y) ** 2
+ (tile1[:, 2] - focus_z) ** 2
)
r2 = np.sqrt(
(tile2[:, 0] - focus_x) ** 2
+ (tile2[:, 1] - focus_y) ** 2
+ (tile2[:, 2] - focus_z) ** 2
)
# Get the uvw array along the Nblts axis; select only the w's
old_w = uvd.uvw_array[:, -1]
# Calculate near-field delay
new_w = r1 - r2
# Mask autocorrelations
mask = np.not_equal(uvd.ant_1_array, uvd.ant_2_array)
new_w = np.where(mask, new_w, old_w)
return new_w # Shape (Nblts,)