# Copyright (c) 2024 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Utilities for working with MeasurementSet files."""
import contextlib
import os
import warnings
import numpy as np
from astropy.coordinates import EarthLocation
from astropy.time import Time
from ... import __version__, utils
from ...telescopes import known_telescope_location, known_telescopes
from ...uvdata.uvdata import reporting_request
no_casa_message = (
"casacore is not installed but is required for measurement set functionality"
)
casa_present = True
casa_error = None
try:
from casacore import tables
from casacore.tables import tableutil
except ImportError as error:
casa_present = False
casa_error = error
"""
This dictionary defines the mapping between CASA polarization numbers and
AIPS polarization numbers
"""
# convert from casa polarization integers to pyuvdata
POL_CASA2AIPS_DICT = {
1: 1,
2: 2,
3: 3,
4: 4,
5: -1,
6: -3,
7: -4,
8: -2,
9: -5,
10: -7,
11: -8,
12: -6,
}
POL_AIPS2CASA_DICT = {
aipspol: casapol for casapol, aipspol in POL_CASA2AIPS_DICT.items()
}
VEL_DICT = {
"REST": 0,
"LSRK": 1,
"LSRD": 2,
"BARY": 3,
"GEO": 4,
"TOPO": 5,
"GALACTO": 6,
"LGROUP": 7,
"CMB": 8,
"Undefined": 64,
}
# In CASA 'J2000' refers to a specific frame -- FK5 w/ an epoch of
# J2000. We'll plug that in here directly, noting that CASA has an
# explicit list of supported reference frames, located here:
# casa.nrao.edu/casadocs/casa-5.0.0/reference-material/coordinate-frames
COORD_PYUVDATA2CASA_DICT = {
"J2000": ("fk5", 2000.0), # mean equator and equinox at J2000.0 (FK5)
"JNAT": None, # geocentric natural frame
"JMEAN": None, # mean equator and equinox at frame epoch
"JTRUE": None, # true equator and equinox at frame epoch
"APP": ("gcrs", 2000.0), # apparent geocentric position
"B1950": ("fk4", 1950.0), # mean epoch and ecliptic at B1950.0.
"B1950_VLA": ("fk4", 1979.0), # mean epoch (1979.9) and ecliptic at B1950.0
"BMEAN": None, # mean equator and equinox at frame epoch
"BTRUE": None, # true equator and equinox at frame epoch
"GALACTIC": None, # Galactic coordinates
"HADEC": None, # topocentric HA and declination
"AZEL": None, # topocentric Azimuth and Elevation (N through E)
"AZELSW": None, # topocentric Azimuth and Elevation (S through W)
"AZELNE": None, # topocentric Azimuth and Elevation (N through E)
"AZELGEO": None, # geodetic Azimuth and Elevation (N through E)
"AZELSWGEO": None, # geodetic Azimuth and Elevation (S through W)
"AZELNEGEO": None, # geodetic Azimuth and Elevation (N through E)
"ECLIPTIC": None, # ecliptic for J2000 equator and equinox
"MECLIPTIC": None, # ecliptic for mean equator of date
"TECLIPTIC": None, # ecliptic for true equator of date
"SUPERGAL": None, # supergalactic coordinates
"ITRF": None, # coordinates wrt ITRF Earth frame
"TOPO": None, # apparent topocentric position
"ICRS": ("icrs", 2000.0), # International Celestial reference system
}
[docs]def _ms_utils_call_checks(filepath, invert_check=False):
# Check for casa.
if not casa_present:
raise ImportError(no_casa_message) from casa_error
if invert_check:
if os.path.exists(filepath):
raise FileExistsError(filepath + " already exists.")
elif not tables.tableexists(filepath):
raise FileNotFoundError(
filepath + " not found or not recognized as an MS table."
)
[docs]def _parse_pyuvdata_frame_ref(frame_name, epoch_val, *, raise_error=True):
"""
Interpret a UVData pair of frame + epoch into a CASA frame name.
Parameters
----------
frame_name : str
Name of the frame. Typically matched to the UVData attribute
`phase_center_frame`.
epoch_val : float
Epoch value for the given frame, in Julian years unless `frame_name="FK4"`,
in which case the value is in Besselian years. Typically matched to the
UVData attribute `phase_center_epoch`.
raise_error : bool
Whether to raise an error if the name has no match. Default is True, if set
to false will raise a warning instead.
Returns
-------
ref_name : str
Name of the CASA-based spatial coordinate reference frame.
Raises
------
ValueError
If the provided coordinate frame and/or epoch value has no matching
counterpart to those supported in CASA.
"""
# N.B. -- this is something of a stub for a more sophisticated function to
# handle this. For now, this just does a reverse lookup on the limited frames
# supported by UVData objects, although eventually it can be expanded to support
# more native MS frames.
reverse_dict = {ref: key for key, ref in COORD_PYUVDATA2CASA_DICT.items()}
ref_name = None
try:
ref_name = reverse_dict[
(str(frame_name), 2000.0 if (epoch_val is None) else float(epoch_val))
]
except KeyError as err:
epoch_msg = (
"no epoch" if epoch_val is None else f"epoch {format(epoch_val, 'g')}"
)
message = (
f"Frame {frame_name} ({epoch_msg}) does not have a "
"corresponding match to supported frames in the MS file format."
)
if raise_error:
raise ValueError(message) from err
else:
warnings.warn(message)
return ref_name
[docs]def _get_time_scale(ms_table, *, raise_error=False):
"""
Read time scale from TIME column in an MS table.
Parameters
----------
ms_table : table
Handle for the MeasurementSet table that contains a "TIME" column.
raise_error : bool
Whether to raise an error if the name has no match. Default is True, if set
to false will raise a warning instead.
Returns
-------
time_scale_name : str
Name of the time scale.
Raises
------
ValueError
If the CASA time scale frame does not match the known supported list of
time scales for astropy.
"""
timescale = ms_table.getcolkeyword("TIME", "MEASINFO")["Ref"]
if timescale.lower() not in Time.SCALES:
msg = (
"This file has a timescale that is not supported by astropy. "
"If you need support for this timescale please file an issue in our "
"GitHub issue log: "
"https://github.com/RadioAstronomySoftwareGroup/pyuvdata/issues."
)
if raise_error:
raise NotImplementedError(
msg + " To bypass this error, you can set raise_error=False, which "
"will raise a warning instead and treat the time as being in UTC."
)
else:
warnings.warn(msg + " Defaulting to treating it as being in UTC.")
timescale = "utc"
return timescale.lower()
[docs]def _parse_casa_frame_ref(ref_name, *, raise_error=True):
"""
Interpret a CASA frame into an astropy-friendly frame and epoch.
Parameters
----------
ref_name : str
Name of the CASA-based spatial coordinate reference frame.
raise_error : bool
Whether to raise an error if the name has no match. Default is True, if set
to false will raise a warning instead.
Returns
-------
frame_name : str
Name of the frame. Typically matched to the UVData attribute
`phase_center_frame`.
epoch_val : float
Epoch value for the given frame, in Julian years unless `frame_name="FK4"`,
in which case the value is in Besselian years. Typically matched to the
UVData attribute `phase_center_epoch`.
Raises
------
ValueError
If the CASA coordinate frame does not match the known supported list of
frames for CASA.
NotImplementedError
If using a CASA coordinate frame that does not yet have a corresponding
astropy frame that is supported by pyuvdata.
"""
frame_name = None
epoch_val = None
try:
frame_tuple = COORD_PYUVDATA2CASA_DICT[ref_name]
if frame_tuple is None:
message = f"Support for the {ref_name} frame is not yet supported."
if raise_error:
raise NotImplementedError(message)
else:
warnings.warn(message)
else:
frame_name = frame_tuple[0]
epoch_val = frame_tuple[1]
except KeyError as err:
message = (
f"The coordinate frame {ref_name} is not one of the supported frames "
"for measurement sets."
)
if raise_error:
raise ValueError(message) from err
else:
warnings.warn(message)
return frame_name, epoch_val
[docs]def read_ms_antenna(filepath, check_frame=True):
"""
Read Measurement Set ANTENNA table.
Parameters
----------
filepath : str
path to MS (without ANTENNA suffix)
check_frame : bool
If set to True and the "telescope_frame" keyword is found within the Measurement
Set, check that the frame is one supported by pyuvdata (and if not, an error is
raised). Currently supported frames include ITRS/ITRF and MCMF.
Returns
-------
ant_dict : dict
A dictionary with keys that map to columns of the MS, including "antenna_names"
(list of type string, shape (Nants_telescope,)), "station_names" (list of type
string, shape (Nants_telescope,)), "antenna_numbers" (ndarray of type int,
shape (Nants_telescope,)), "antenna_diameters" (ndarray of type float, shape
(Nants_telescope,)), "telescope_frame" (str), "telescope_ellipsoid" (str),
and "antenna_positions" (ndarray of type float, shape (Nants_telescope, 3)).
Raises
------
ValueError
If `check_frame=True` and "telescope_frame" does not match to a supported type.
FileNotFoundError
If no MS file is found with the provided name.
"""
_ms_utils_call_checks(filepath + "/ANTENNA")
# open table with antenna location information
with tables.table(filepath + "/ANTENNA", ack=False) as tb_ant:
antenna_positions = tb_ant.getcol("POSITION")
meas_info_dict = tb_ant.getcolkeyword("POSITION", "MEASINFO")
telescope_frame = meas_info_dict["Ref"].lower()
try:
telescope_ellipsoid = str(meas_info_dict["RefEllipsoid"])
except KeyError:
# No keyword means go to defaults
telescope_ellipsoid = "SPHERE" if telescope_frame == "mcmf" else None
if check_frame:
# Check the telescope frame to make sure it's supported
if telescope_frame not in ["itrs", "mcmf", "itrf"]:
raise ValueError(
f"Telescope frame in file is {telescope_frame}. "
"Only 'itrs' and 'mcmf' are currently supported."
)
# MS uses "ITRF" while astropy uses "itrs". They are the same.
elif telescope_frame == "itrf":
telescope_frame = "itrs"
# Note: measurement sets use the antenna number as an index into the antenna
# table. This means that if the antenna numbers do not start from 0 and/or are
# not contiguous, empty rows are inserted into the antenna table (similar to
# miriad)). These 'dummy' rows have positions of zero and need to be removed.
n_ants_table = antenna_positions.shape[0]
good_mask = np.any(antenna_positions, axis=1)
antenna_positions = antenna_positions[good_mask, :]
antenna_numbers = np.arange(n_ants_table)[good_mask]
# antenna names
antenna_names = np.asarray(tb_ant.getcol("NAME"))[good_mask].tolist()
station_names = np.asarray(tb_ant.getcol("STATION"))[good_mask].tolist()
ant_diameters = np.asarray(tb_ant.getcol("DISH_DIAMETER"))[good_mask]
antenna_mount = np.asarray(np.char.lower(tb_ant.getcol("MOUNT")))[
good_mask
].tolist()
if all(mount == "" for mount in antenna_mount):
# If no mounts recorded, discard the information.
antenna_mount = None
if not np.any(ant_diameters > 0):
ant_diameters = None
# Build a dict with all the relevant entries we need.
ant_dict = {
"antenna_positions": antenna_positions,
"antenna_numbers": antenna_numbers,
"telescope_frame": telescope_frame,
"telescope_ellipsoid": telescope_ellipsoid,
"antenna_names": antenna_names,
"station_names": station_names,
"antenna_mount": antenna_mount,
"antenna_diameters": ant_diameters,
}
# Return the dict
return ant_dict
[docs]def write_ms_antenna(
filepath,
uvobj=None,
*,
antenna_numbers=None,
antenna_names=None,
antenna_positions=None,
antenna_diameters=None,
antenna_mount=None,
telescope_location=None,
telescope_frame=None,
telescope_ellipsoid=None,
):
"""
Write out the antenna information into a CASA table.
Parameters
----------
filepath : str
Path to MS (without ANTENNA suffix).
uvobj : UVBase (with matching parameters)
Optional parameter, can be used to automatically fill the other required
keywords for this function. Note that the UVBase object must have a telescope
parameter with parameters that match by name to the other keywords
required here (with the exception of telescope_frame and telescope_ellipsoid,
which are derived from the telescope.location UVParameter).
antenna_numbers : ndarray
Required if uvobj not provided, antenna numbers for all antennas of the
telescope, dtype int and shape (Nants_telescope,).
antenna_names : list
Required if uvobj not provided, antenna names for all antennas of the telescope.
List should be length Nants_telescope, and contain elements of type str.
antenna_positions : ndarray
Required if uvobj not provided, ITRF/MCMF 3D position (in meters) of antennas
relative to the array center, of dtype float with shape (Nants_telescope, 3).
antenna_diameters : ndarray
Required if uvobj not provided, diameter (in meters) of each antenna, dtype
float with shape (Nants_telescope,).
telescope_location : ndarray
Required if uvobj not provided, ITRF/MCMF 3D location of the array center (in
meters), dtype float with shape (3,).
telescope_frame : str
Required if uvobj not provided, name of the frame in which the telescope
positions are provided (typically "MCMF", "ITRS", or "ITRF").
telescope_ellipsoid : str
Required if uvobj not provided and telescope frame is "MCMF", ellipsoid to use
for lunar coordinates. Must be one of "SPHERE", "GSFC", "GRAIL23",
"CE-1-LAM-GEO" (see lunarsky package for details).
Raises
------
FileNotFoundError
If no main MS table is found when looking at filepath.
"""
_ms_utils_call_checks(filepath)
filepath += "::ANTENNA"
if uvobj is not None:
antenna_numbers = uvobj.telescope.antenna_numbers
antenna_names = uvobj.telescope.antenna_names
antenna_positions = uvobj.telescope.antenna_positions
antenna_diameters = uvobj.telescope.antenna_diameters
antenna_mount = uvobj.telescope.mount_type
telescope_location = uvobj.telescope._location.xyz()
telescope_frame = uvobj.telescope._location.frame
telescope_ellipsoid = uvobj.telescope._location.ellipsoid
tabledesc = tables.required_ms_desc("ANTENNA")
dminfo = tables.makedminfo(tabledesc)
with tables.table(
filepath, tabledesc=tabledesc, dminfo=dminfo, ack=False, readonly=False
) as antenna_table:
# Note: measurement sets use the antenna number as an index into the antenna
# table. This means that if the antenna numbers do not start from 0 and/or are
# not contiguous, empty rows need to be inserted into the antenna table
# (this is somewhat similar to miriad)
nants_table = np.max(antenna_numbers) + 1
antenna_table.addrows(nants_table)
ant_names_table = [""] * nants_table
ant_mount_table = [""] * nants_table
for ai, num in enumerate(antenna_numbers):
ant_names_table[num] = antenna_names[ai]
ant_mount_table[num] = "" if antenna_mount is None else antenna_mount[ai]
# There seem to be some variation on whether the antenna names are stored
# in the NAME or STATION column (importuvfits puts them in the STATION column
# while Cotter and the MS definition doc puts them in the NAME column).
# The MS definition doc suggests that antenna names belong in the NAME column
# and the telescope name belongs in the STATION column (it gives the example of
# GREENBANK for this column.) so we follow that here. For a reconfigurable
# array, the STATION can be though of as the "pad" name, which is distinct from
# the antenna name/number, and nominally fixed in position.
antenna_table.putcol("NAME", ant_names_table)
antenna_table.putcol("STATION", ant_names_table)
antenna_table.putcol("MOUNT", ant_mount_table)
# Antenna positions in measurement sets appear to be in absolute ECEF
ant_pos_absolute = antenna_positions + telescope_location.reshape(1, 3)
ant_pos_table = np.zeros((nants_table, 3), dtype=np.float64)
for ai, num in enumerate(antenna_numbers):
ant_pos_table[num, :] = ant_pos_absolute[ai, :]
antenna_table.putcol("POSITION", ant_pos_table)
if antenna_diameters is not None:
ant_diam_table = np.zeros((nants_table), dtype=np.float64)
# This is here is suppress an error that arises when one has antennas of
# different diameters (which CASA can't handle), since otherwise the
# "padded" antennas have zero diameter (as opposed to any real telescope).
if len(np.unique(antenna_diameters)) == 1:
ant_diam_table[:] = antenna_diameters[0]
else:
for ai, num in enumerate(antenna_numbers):
ant_diam_table[num] = antenna_diameters[ai]
antenna_table.putcol("DISH_DIAMETER", ant_diam_table)
# Add telescope frame
telescope_frame = telescope_frame.upper()
telescope_frame = "ITRF" if (telescope_frame == "ITRS") else telescope_frame
meas_info_dict = antenna_table.getcolkeyword("POSITION", "MEASINFO")
meas_info_dict["Ref"] = telescope_frame
if telescope_frame == "MCMF" and telescope_ellipsoid is not None:
meas_info_dict["RefEllipsoid"] = telescope_ellipsoid
antenna_table.putcolkeyword("POSITION", "MEASINFO", meas_info_dict)
[docs]def read_ms_data_description(filepath):
"""Read Measurement Set DATA_DESCRIPTION table.
Parameters
----------
filepath : str
path to MS (without DATA_DESCRIPTION suffix)
Returns
-------
data_desc_dict : dict
A dictionary with keys that map to columns of the MS, used to index against
other tables. These include "SPECTRAL_WINDOW_ID", which match to given rows
from the SPECTRAL_WINDOW table, "POLARIZATION_ID", which match to given rows
in the POLARIZATION table, and "FLAG_ROW", which denotes of the ID in question
has been flagged.
Raises
------
FileNotFoundError
If no MS file is found with the provided name.
"""
_ms_utils_call_checks(filepath + "/DATA_DESCRIPTION")
# open table with the general data description
with tables.table(filepath + "/DATA_DESCRIPTION", ack=False) as tb_desc:
data_desc_dict = {}
for idx in range(tb_desc.nrows()):
data_desc_dict[idx] = {
"SPECTRAL_WINDOW_ID": tb_desc.getcell("SPECTRAL_WINDOW_ID", idx),
"POLARIZATION_ID": tb_desc.getcell("POLARIZATION_ID", idx),
"FLAG_ROW": tb_desc.getcell("FLAG_ROW", idx),
}
return data_desc_dict
[docs]def write_ms_data_description(
filepath, uvobj=None, nspws=None, flex_spw_polarization_array=None
):
"""
Write out the data description information into a CASA table.
Parameters
----------
filepath : str
Path to MS (without DATA_DESCRIPTION suffix).
uvobj : UVBase (with matching parameters)
Optional parameter, can be used to automatically fill the other required
keywords for this function. Note that the UVBase object must have parameters
that match by name to the other keywords required here.
nspws : int
Required if uvobj is not supplied, the total number of spectral windows to be
recorded.
flex_spw_polarization_array : list of int
Optional argument, required if trying to record a flex-pol data set, which
denotes the polarization per spectral window, with length equal to nspws.
Raises
------
FileNotFoundError
If no main MS table is found when looking at filepath.
"""
_ms_utils_call_checks(filepath)
filepath += "::DATA_DESCRIPTION"
if uvobj is not None:
nspws = uvobj.Nspws
flex_spw_polarization_array = uvobj.flex_spw_polarization_array
with tables.table(filepath, ack=False, readonly=False) as data_descrip_table:
data_descrip_table.addrows(nspws)
data_descrip_table.putcol("SPECTRAL_WINDOW_ID", np.arange(nspws))
if flex_spw_polarization_array is not None:
pol_dict = {
pol: idx
for idx, pol in enumerate(np.unique(flex_spw_polarization_array))
}
data_descrip_table.putcol(
"POLARIZATION_ID",
np.array([pol_dict[key] for key in flex_spw_polarization_array]),
)
[docs]def read_ms_field(filepath, return_phase_center_catalog=False):
"""
Read Measurement Set FIELD table.
Parameters
----------
filepath : str
path to MS (without FIELD suffix)
return_phase_center_catalog : bool
Nominally this function will return a dict containing the columns of the table,
but if set to True, instead a catalog will be supplied in the style of the
UVData/UVCal `phase_center_catalog` parameters (further description of this
parameter can be found in the class documentation).
Returns
-------
field_dict : dict
A dictionary with keys that map to columns of the MS, including "name" (field
names, list of len Nfield with elements of type str), "ra" (RA in radians,
ndarray of floats with shape (Nfield,)), "dec" (Dec in radians, ndarray of
floats with shape (Nfield,)), "source_id" (row number in MS file, ndarray of
ints with shape (Nfield,)), and "alt_id" (matching source ID number, ndarray of
ints with shape (Nfield,)). Only given if return_phase_center_catalog=False.
phase_center_catalog : dict
A catalog stylized like the UVData/UVCal `phase_center_catalog` parameters, with
keys matching to the "alt_id"/source ID entries. Useful for plugging directly
into the `phase_center_catalog` attribute of a UVBase object. Only supplied if
return_phase_center_catalog=True.
field_map : dict
A dict between row number (keys) and the source ID (values), which can be used
to map the phase center ID array from the main MS table. Only supplied if
return_phase_center_catalog=True.
Raises
------
FileNotFoundError
If no MS file is found with the provided name.
"""
_ms_utils_call_checks(filepath + "/FIELD")
with tables.table(filepath + "/FIELD", ack=False) as tb_field:
n_rows = tb_field.nrows()
field_dict = {
"name": tb_field.getcol("NAME"),
"ra": [None] * n_rows,
"dec": [None] * n_rows,
"source_id": [None] * n_rows,
"alt_id": [None] * n_rows,
}
# MSv2.0 appears to assume J2000. Not sure how to specifiy otherwise
measinfo_keyword = tb_field.getcolkeyword("PHASE_DIR", "MEASINFO")
var_frame = "VarRefCol" in measinfo_keyword
if var_frame:
# This seems to be a yet-undocumented feature for CASA, which allows one
# to specify an additional (optional?) column that defines the reference
# frame on a per-source basis.
ref_dir_dict = dict(
zip(
measinfo_keyword["TabRefCodes"],
measinfo_keyword["TabRefTypes"],
strict=True,
)
)
frame_list = []
epoch_list = []
for key in tb_field.getcol(measinfo_keyword["VarRefCol"]):
frame, epoch = _parse_casa_frame_ref(ref_dir_dict[key])
frame_list.append(frame)
epoch_list.append(epoch)
field_dict["frame"] = frame_list
field_dict["epoch"] = epoch_list
elif "Ref" in measinfo_keyword:
field_dict["frame"], field_dict["epoch"] = _parse_casa_frame_ref(
measinfo_keyword["Ref"]
)
else:
warnings.warn("Coordinate reference frame not detected, defaulting to ICRS")
field_dict["frame"] = "icrs"
field_dict["epoch"] = 2000.0
for idx in range(n_rows):
phase_dir = tb_field.getcell("PHASE_DIR", idx)
# Error if the phase_dir has a polynomial term because we don't know
# how to handle that
if phase_dir.shape[0] != 1: # pragma: no cover
raise NotImplementedError(
"PHASE_DIR is expressed as a polynomial. We do not currently "
"support this mode, if you need support for it please file "
"an issue in our GitHub issue log: "
"https://github.com/RadioAstronomySoftwareGroup/pyuvdata/issues."
)
field_dict["ra"][idx] = float(phase_dir[0, 0])
field_dict["dec"][idx] = float(phase_dir[0, 1])
field_dict["source_id"][idx] = idx
# If no column named SOURCE_ID exists, or if it does exist but is
# completely unfilled, just move on.
with contextlib.suppress(RuntimeError):
field_dict["alt_id"][idx] = int(tb_field.getcell("SOURCE_ID", idx))
if not return_phase_center_catalog:
return field_dict
phase_center_catalog = {}
for idx in range(n_rows):
phase_center_catalog[field_dict["source_id"][idx]] = {
"cat_name": field_dict["name"][idx],
"cat_type": "sidereal",
"cat_lon": field_dict["ra"][idx],
"cat_lat": field_dict["dec"][idx],
"cat_frame": field_dict["frame"][idx] if var_frame else field_dict["frame"],
"cat_epoch": field_dict["epoch"][idx] if var_frame else field_dict["epoch"],
"cat_times": None,
"cat_pm_ra": None,
"cat_pm_dec": None,
"cat_vrad": None,
"cat_dist": None,
"info_source": "file",
}
if any(item is None or item < 0 for item in field_dict["alt_id"]):
# If there's no alt id, return a blank mapping for the field IDs
return phase_center_catalog, {}
# Construct the mappings of row number to preferred ID, map back to catalog
field_id_map = dict(zip(field_dict["source_id"], field_dict["alt_id"], strict=True))
phase_center_catalog = {
newkey: phase_center_catalog[oldkey] for oldkey, newkey in field_id_map.items()
}
return phase_center_catalog, field_id_map
[docs]def write_ms_field(filepath, uvobj=None, phase_center_catalog=None, time_val=None):
"""
Write out the field information into a CASA table.
Parameters
----------
filepath : str
path to MS (without FIELD suffix)
uvobj : UVBase (with matching parameters)
Optional parameter, can be used to automatically fill the other required
keywords for this function. Note that the UVBase object must have parameters
that match by name to the other keywords required here (with the exception of
time_val, which is calculated from the time_array UVParameter).
phase_center_catalog : dict
A catalog stylized like the UVData/UVCal `phase_center_catalog` parameters (see
documentation of those classes for more details on expected structure). Required
if uvobj is not supplied.
time_val : float
Required if uvobj is not supplied, representative JD date for the catalog to
be recorded into the MS file.
Raises
------
FileNotFoundError
If no main MS table is found when looking at filepath.
"""
_ms_utils_call_checks(filepath)
filepath += "::FIELD"
if uvobj is not None:
phase_center_catalog = uvobj.phase_center_catalog
time_val = (
Time(np.median(uvobj.time_array), format="jd", scale="utc").mjd * 86400.0
)
tabledesc = tables.required_ms_desc("FIELD")
dminfo = tables.makedminfo(tabledesc)
with tables.table(
filepath, tabledesc=tabledesc, dminfo=dminfo, ack=False, readonly=False
) as field_table:
n_poly = 0
var_ref = False
for ind, phase_dict in enumerate(phase_center_catalog.values()):
if ind == 0:
sou_frame = phase_dict["cat_frame"]
sou_epoch = phase_dict["cat_epoch"]
continue
if (sou_frame != phase_dict["cat_frame"]) or (
sou_epoch != phase_dict["cat_epoch"]
):
var_ref = True
break
if var_ref:
var_ref_dict = {
key: val for val, key in enumerate(COORD_PYUVDATA2CASA_DICT)
}
col_ref_dict = {
"PHASE_DIR": "PhaseDir_Ref",
"DELAY_DIR": "DelayDir_Ref",
"REFERENCE_DIR": "RefDir_Ref",
}
for key in col_ref_dict:
fieldcoldesc = tables.makearrcoldesc(
col_ref_dict[key],
0,
valuetype="int",
datamanagertype="StandardStMan",
datamanagergroup="field standard manager",
)
del fieldcoldesc["desc"]["shape"]
del fieldcoldesc["desc"]["ndim"]
del fieldcoldesc["desc"]["_c_order"]
field_table.addcols(fieldcoldesc)
field_table.getcolkeyword(key, "MEASINFO")
ref_frame = _parse_pyuvdata_frame_ref(sou_frame, sou_epoch)
for col in ["PHASE_DIR", "DELAY_DIR", "REFERENCE_DIR"]:
meas_info_dict = field_table.getcolkeyword(col, "MEASINFO")
meas_info_dict["Ref"] = ref_frame
if var_ref:
rev_ref_dict = {value: key for key, value in var_ref_dict.items()}
meas_info_dict["TabRefTypes"] = [
rev_ref_dict[key] for key in sorted(rev_ref_dict.keys())
]
meas_info_dict["TabRefCodes"] = np.arange(
len(rev_ref_dict.keys()), dtype=np.int32
)
meas_info_dict["VarRefCol"] = col_ref_dict[col]
field_table.putcolkeyword(col, "MEASINFO", meas_info_dict)
sou_id_list = list(phase_center_catalog)
for idx, sou_id in enumerate(sou_id_list):
cat_dict = phase_center_catalog[sou_id]
phase_dir = np.array([[cat_dict["cat_lon"], cat_dict["cat_lat"]]])
if (cat_dict["cat_type"] == "ephem") and (phase_dir.ndim == 3):
phase_dir = np.median(phase_dir, axis=2)
sou_name = cat_dict["cat_name"]
ref_dir = _parse_pyuvdata_frame_ref(
cat_dict["cat_frame"], cat_dict["cat_epoch"], raise_error=var_ref
)
field_table.addrows()
field_table.putcell("DELAY_DIR", idx, phase_dir)
field_table.putcell("PHASE_DIR", idx, phase_dir)
field_table.putcell("REFERENCE_DIR", idx, phase_dir)
field_table.putcell("NAME", idx, sou_name)
field_table.putcell("NUM_POLY", idx, n_poly)
field_table.putcell("TIME", idx, time_val)
field_table.putcell("SOURCE_ID", idx, sou_id)
if var_ref:
for key in col_ref_dict:
field_table.putcell(col_ref_dict[key], idx, var_ref_dict[ref_dir])
[docs]def read_ms_history(filepath, pyuvdata_version_str, check_origin=False, raise_err=True):
"""
Read a CASA history table into a string for the uvdata history parameter.
Also stores messages column as a list for consistency with other uvdata types
Parameters
----------
filepath : str
Path to CASA table with history information.
pyuvdata_version_str : str
String containing the version of pyuvdata used to read in the MS file, which is
appended to the history if not previously encoded into the history table.
check_origin : bool
If set to True, check whether the MS in question was created by pyuvdata, as
determined by the history table. Default is False.
raise_err : bool
Normally an error is raised if a HISTORY table cannot be found. However, if
set to False, the history string containing just the pyuvdata version will
be returned instead.
Returns
-------
str
string encoding complete casa history table converted with a new
line denoting rows and a ';' denoting column breaks.
pyuvdata_written : bool
boolean indicating whether or not this MS was written by pyuvdata. Only returned
of `check_origin=True`.
Raises
------
FileNotFoundError
If no MS file is found with the provided name.
"""
try:
_ms_utils_call_checks(filepath + "/HISTORY")
except FileNotFoundError as err:
if raise_err:
raise err
# Just return the defaults, since no history file was found.
return (pyuvdata_version_str, False) if check_origin else pyuvdata_version_str
# Set up the history string and pyuvdata check
history_str = ""
pyuvdata_written = False
# Skip reading the history table if it has no information
with tables.table(filepath + "/HISTORY", ack=False) as tb_hist:
nrows = tb_hist.nrows()
if nrows > 0:
history_str = "Begin measurement set history\n"
# Grab the standard history columns to stitch together
application = tb_hist.getcol("APPLICATION")
message = tb_hist.getcol("MESSAGE")
obj_id = tb_hist.getcol("OBJECT_ID")
obs_id = tb_hist.getcol("OBSERVATION_ID")
origin = tb_hist.getcol("ORIGIN")
priority = tb_hist.getcol("PRIORITY")
times = tb_hist.getcol("TIME")
cols = []
# APP_PARAMS and CLI_COMMAND are not consistently filled, even though they
# appear to be required columns. Fill them in with zero-length strings
# if they don't conform to what we expect.
default_col = [""] * len(times)
for field in ["APP_PARAMS", "CLI_COMMAND"]:
try:
check_val = tb_hist.getcol(field)["array"]
cols.append(check_val if (len(check_val) > 0) else default_col)
except RuntimeError:
cols.append(default_col)
# Now add the rest of the columns and generate history string
cols += [application, message, obj_id, obs_id, origin, priority, times]
history_str += (
"APP_PARAMS;CLI_COMMAND;APPLICATION;MESSAGE;OBJECT_ID;"
"OBSERVATION_ID;ORIGIN;PRIORITY;TIME\n"
)
# if this MS was written by pyuvdata, some history that originated in
# pyuvdata is in the MS history table. We separate that out since it doesn't
# really belong to the MS history block (and so round tripping works)
pyuvdata_line_idx = [
idx for idx, line in enumerate(application) if "pyuvdata" in line
]
for row_idx in range(nrows):
# first check to see if this row was put in by pyuvdata.
# If so, don't mix them in with the MS stuff
if row_idx in pyuvdata_line_idx:
continue
newline = ";".join([str(col[row_idx]) for col in cols]) + "\n"
history_str += newline
history_str += "End measurement set history.\n"
# make a list of lines that are MS specific (i.e. not pyuvdata lines)
ms_line_idx = list(np.arange(nrows))
for drop_idx in reversed(pyuvdata_line_idx):
# Drop the pyuvdata-related lines, since we handle them separately.
# We do this in reverse to keep from messing up the indexing of the
# earlier entries.
ms_line_idx.pop(drop_idx)
# Handle the case where there is no history but pyuvdata
if len(ms_line_idx) == 0:
ms_line_idx = [-1]
if len(pyuvdata_line_idx) > 0:
pyuvdata_written = True
for idx in pyuvdata_line_idx:
if idx < min(ms_line_idx):
# prepend these lines to the history
history_str = message[idx] + "\n" + history_str
else:
# append these lines to the history
history_str += message[idx] + "\n"
# Check and make sure the pyuvdata version is in the history if it's not already
if not utils.history._check_history_version(history_str, pyuvdata_version_str):
history_str += pyuvdata_version_str
# Finally, return the completed string
if check_origin:
return history_str, pyuvdata_written
else:
return history_str
[docs]def write_ms_history(filepath, history=None, uvobj=None):
"""
Parse the history into an MS history table.
If the history string contains output from `_ms_hist_to_string`, parse that back
into the MS history table.
Parameters
----------
filepath : str
path to MS (without HISTORY suffix)
history : str
Required if uvobj is not given, a string containing the history to be written.
uvobj : UVBase (with matching parameters)
Optional parameter, can be used to automatically fill the history string.
Raises
------
FileNotFoundError
If no main MS table is found when looking at filepath.
"""
_ms_utils_call_checks(filepath)
filepath += "::HISTORY"
if uvobj is not None:
history = uvobj.history
app_params = []
cli_command = []
application = []
message = []
obj_id = []
obs_id = []
origin = []
priority = []
times = []
ms_history = "APP_PARAMS;CLI_COMMAND;APPLICATION;MESSAGE" in history
if ms_history:
# this history contains info from an MS history table. Need to parse it.
ms_header_line_no = None
ms_end_line_no = None
pre_ms_history_lines = []
post_ms_history_lines = []
for line_no, line in enumerate(history.splitlines()):
if not ms_history:
continue
if "APP_PARAMS;CLI_COMMAND;APPLICATION;MESSAGE" in line:
ms_header_line_no = line_no
# we don't need this line anywhere below so continue
continue
if "End measurement set history" in line:
ms_end_line_no = line_no
# we don't need this line anywhere below so continue
continue
if ms_header_line_no is not None and ms_end_line_no is None:
# this is part of the MS history block. Parse it.
line_parts = line.split(";")
if len(line_parts) != 9:
# If the line has the wrong number of elements, then the history
# is mangled and we shouldn't try to parse it -- just record
# line-by-line as we do with any other pyuvdata history.
warnings.warn(
"Failed to parse prior history of MS file, "
"switching to standard recording method."
)
pre_ms_history_lines = post_ms_history_lines = []
ms_history = False
continue
app_params.append(line_parts[0])
cli_command.append(line_parts[1])
application.append(line_parts[2])
message.append(line_parts[3])
obj_id.append(int(line_parts[4]))
obs_id.append(int(line_parts[5]))
origin.append(line_parts[6])
priority.append(line_parts[7])
times.append(np.float64(line_parts[8]))
elif ms_header_line_no is None:
# this is before the MS block
if "Begin measurement set history" not in line:
pre_ms_history_lines.append(line)
else:
# this is after the MS block
post_ms_history_lines.append(line)
for line_no, line in enumerate(pre_ms_history_lines):
app_params.insert(line_no, "")
cli_command.insert(line_no, "")
application.insert(line_no, "pyuvdata")
message.insert(line_no, line)
obj_id.insert(line_no, 0)
obs_id.insert(line_no, -1)
origin.insert(line_no, "pyuvdata")
priority.insert(line_no, "INFO")
times.insert(line_no, Time.now().mjd * 3600.0 * 24.0)
for line in post_ms_history_lines:
app_params.append("")
cli_command.append("")
application.append("pyuvdata")
message.append(line)
obj_id.append(0)
obs_id.append(-1)
origin.append("pyuvdata")
priority.append("INFO")
times.append(Time.now().mjd * 3600.0 * 24.0)
if not ms_history:
# no prior MS history detected in the history. Put all of our history in
# the message column
for line in history.splitlines():
app_params.append("")
cli_command.append("")
application.append("pyuvdata")
message.append(line)
obj_id.append(0)
obs_id.append(-1)
origin.append("pyuvdata")
priority.append("INFO")
times.append(Time.now().mjd * 3600.0 * 24.0)
tabledesc = tables.required_ms_desc("HISTORY")
dminfo = tables.makedminfo(tabledesc)
with tables.table(
filepath, tabledesc=tabledesc, dminfo=dminfo, ack=False, readonly=False
) as history_table:
nrows = len(message)
history_table.addrows(nrows)
# the first two lines below break on python-casacore < 3.1.0
history_table.putcol("APP_PARAMS", np.asarray(app_params)[:, np.newaxis])
history_table.putcol("CLI_COMMAND", np.asarray(cli_command)[:, np.newaxis])
history_table.putcol("APPLICATION", application)
history_table.putcol("MESSAGE", message)
history_table.putcol("OBJECT_ID", obj_id)
history_table.putcol("OBSERVATION_ID", obs_id)
history_table.putcol("ORIGIN", origin)
history_table.putcol("PRIORITY", priority)
history_table.putcol("TIME", times)
[docs]def read_ms_observation(filepath):
"""
Read Measurement Set OBSERVATION table.
Parameters
----------
filepath : str
path to MS (without OBSERVATION suffix)
Returns
-------
obs_dict : dict
A dictionary containing observation information, including "telescope_name"
(str with the name of the telescope), "observer" (str of observer name), and
if present, "telescope_location" (ndarray of floats and shape (3,) describing
the ITRF/MCMF 3D location in meters of the array center).
Raises
------
FileNotFoundError
If no MS file is found with the provided name.
"""
_ms_utils_call_checks(filepath + "/OBSERVATION")
obs_dict = {}
with tables.table(filepath + "/OBSERVATION", ack=False) as tb_obs:
obs_dict["telescope_name"] = tb_obs.getcol("TELESCOPE_NAME")[0]
obs_dict["observer"] = tb_obs.getcol("OBSERVER")[0]
# check to see if a TELESCOPE_LOCATION column is present in the observation
# table. This is non-standard, but inserted by pyuvdata
if "TELESCOPE_LOCATION" in tb_obs.colnames():
telescope_location = np.squeeze(tb_obs.getcol("TELESCOPE_LOCATION"))
obs_dict["telescope_location"] = telescope_location
return obs_dict
[docs]def write_ms_observation(
filepath, uvobj=None, *, telescope_name=None, telescope_location=None, observer=None
):
"""
Write out the observation information into a CASA table.
Parameters
----------
filepath : str
path to MS (without OBSERVATION suffix)
uvobj : UVBase (with matching parameters)
Optional parameter, can be used to automatically fill the other required
keywords for this function. Note that the UVBase object must have parameters
that match by name to the other keywords required here (with the exception of
telescope_frame, which is pulled from the telescope_location UVParameter).
telescope_name :str
Required if uvobj not provided, name of the telescope.
telescope_location : ndarray
Required if uvobj not provided, 3D location (in ITRF/MCMF coordinates in meters)
of the array center.
observer : str
Required if uvobj not provided, name of the observer.
Raises
------
FileNotFoundError
If no main MS table is found when looking at filepath.
"""
_ms_utils_call_checks(filepath)
filepath += "::OBSERVATION"
if uvobj is not None:
telescope_name = uvobj.telescope.name
telescope_location = uvobj.telescope._location.xyz()
observer = telescope_name
for key in uvobj.extra_keywords:
if key.upper() == "OBSERVER":
observer = uvobj.extra_keywords[key]
tabledesc = tables.required_ms_desc("OBSERVATION")
dminfo = tables.makedminfo(tabledesc)
with tables.table(
filepath, tabledesc=tabledesc, dminfo=dminfo, ack=False, readonly=False
) as observation_table:
observation_table.addrows()
observation_table.putcell("TELESCOPE_NAME", 0, telescope_name)
# It appears that measurement sets do not have a concept of a telescope location
# We add it here as a non-standard column in order to round trip it properly
name_col_desc = tableutil.makearrcoldesc(
"TELESCOPE_LOCATION", telescope_location[0], shape=[3], valuetype="double"
)
observation_table.addcols(name_col_desc)
observation_table.putcell("TELESCOPE_LOCATION", 0, telescope_location)
observation_table.putcell("OBSERVER", 0, observer)
[docs]def read_ms_spectral_window(filepath):
"""
Read Measurement Set SPECTRAL_WINDOW table.
Parameters
----------
filepath : str
path to MS (without SPECTRAL_WINDOW suffix)
Returns
-------
spw_dict : dict
Dictionary containing spectral window information, including "chan_freq"
(ndarray of float and shape (Nspws,), center frequencies of the channels),
"chan_width" (ndarray of float and shape (Nspws,), channel bandwidths),
"num_chan" (ndarray of int and shape (Nspws,), number of channels per window),
and "row_idx" (list of length Nspws with int elements, containing the table row
number for each window).
Raises
------
FileNotFoundError
If no MS file is found with the provided name.
"""
_ms_utils_call_checks(filepath + "/SPECTRAL_WINDOW")
with tables.table(filepath + "/SPECTRAL_WINDOW", ack=False) as tb_spws:
n_rows = tb_spws.nrows()
# The SPECTRAL_WINDOW table is a little special, in that some rows can
# contain arrays of different shapes. For that reason, we'll pre-populate lists
# for each element that we're interested in plugging things into.
spw_dict = {
"chan_freq": [None] * n_rows,
"chan_width": [None] * n_rows,
"num_chan": tb_spws.getcol("NUM_CHAN"),
"row_idx": list(range(n_rows)),
}
try:
spw_dict["assoc_spw_id"] = [
int(idx[0]) for idx in tb_spws.getcol("ASSOC_SPW_ID")
]
spw_dict["spoof_spw_id"] = False
except RuntimeError:
spw_dict["assoc_spw_id"] = np.arange(n_rows)
spw_dict["spoof_spw"] = True
for idx in range(n_rows):
spw_dict["chan_freq"][idx] = tb_spws.getcell("CHAN_FREQ", idx)
spw_dict["chan_width"][idx] = tb_spws.getcell("CHAN_WIDTH", idx)
return spw_dict
[docs]def write_ms_spectral_window(
filepath=None,
uvobj=None,
*,
freq_array=None,
channel_width=None,
spw_array=None,
id_array=None,
):
"""
Write out the spectral information into a CASA table.
Parameters
----------
filepath : str
path to MS (without SPECTRAL_WINDOW suffix)
uvobj : UVBase (with matching parameters)
Optional parameter, can be used to automatically fill the other required
keywords for this function. Note that the UVBase object must have parameters
that match by name to the other keywords required here (with the exception of
id_array, which is pulled from spw_array or flex_spw_id_array depending on the
context).
freq_array : ndarray
Required if uvobj not provided, frequency centers for each channel. Expected
shape is (Nfreqs,), type float.
channel_width : ndarray or float
Required if uvobj not provided, frequency centers for each channel. Expected
to be of shape (Nfreqs,), type float.
spw_array : ndarray
Required if uvobj not provided, ID numbers for spectral windows.
id_array : ndarray
Map of how each entry in `freq_array` matches to the spectral windows in
`spw_array`. Required if uvobj not provided.
Raises
------
FileNotFoundError
If no main MS table is found when looking at filepath.
"""
_ms_utils_call_checks(filepath)
filepath += "::SPECTRAL_WINDOW"
if uvobj is not None:
if "_freq_range" in uvobj and uvobj.freq_range is not None:
freq_array = np.mean(uvobj.freq_range, axis=-1)
channel_width = np.squeeze(np.diff(uvobj.freq_range, axis=-1), axis=-1)
id_array = np.array(uvobj.spw_array)
else:
freq_array = uvobj.freq_array
channel_width = uvobj.channel_width
id_array = uvobj.flex_spw_id_array
spw_array = uvobj.spw_array
# Construct a couple of columns we're going to use that are not part of
# the MS v2.0 baseline format (though are useful for pyuvdata objects).
tabledesc = tables.required_ms_desc("SPECTRAL_WINDOW")
extended_desc = tables.complete_ms_desc("SPECTRAL_WINDOW")
tabledesc["ASSOC_SPW_ID"] = extended_desc["ASSOC_SPW_ID"]
tabledesc["ASSOC_NATURE"] = extended_desc["ASSOC_NATURE"]
dminfo = tables.makedminfo(tabledesc)
with tables.table(
filepath, tabledesc=tabledesc, dminfo=dminfo, ack=False, readonly=False
) as sw_table:
for idx, spw_id in enumerate(spw_array):
ch_mask = ... if id_array is None else id_array == spw_id
sw_table.addrows()
sw_table.putcell("NUM_CHAN", idx, np.sum(ch_mask))
sw_table.putcell("NAME", idx, f"SPW{spw_id}")
sw_table.putcell("ASSOC_SPW_ID", idx, spw_id)
sw_table.putcell("ASSOC_NATURE", idx, "") # Blank for now
sw_table.putcell("CHAN_FREQ", idx, freq_array[ch_mask])
sw_table.putcell("CHAN_WIDTH", idx, channel_width[ch_mask])
sw_table.putcell("EFFECTIVE_BW", idx, channel_width[ch_mask])
sw_table.putcell("TOTAL_BANDWIDTH", idx, np.sum(channel_width[ch_mask]))
sw_table.putcell("RESOLUTION", idx, channel_width[ch_mask])
# TODO: These are placeholders for now, but should be replaced with
# actual frequency reference info (once pyuvdata handles that)
sw_table.putcell("MEAS_FREQ_REF", idx, VEL_DICT["TOPO"])
sw_table.putcell("REF_FREQUENCY", idx, freq_array[0])
[docs]def read_ms_feed(filepath, select_ants=None):
"""
Read Measurement Set FEED table.
Note that this method is not yet implemented, and is a placeholder for future
development.
Parameters
----------
filepath : str
path to MS (without FEED suffix)
Returns
-------
feed_dict : dict
Dictionary containing feed information.
Raises
------
FileNotFoundError
If no MS file is found with the provided name.
"""
_ms_utils_call_checks(filepath + "/FEED")
filepath += "::FEED"
with tables.table(filepath, ack=False) as feed_table:
if "pyuvdata_has_feed" in feed_table.getkeywords() and not (
feed_table.getkeyword("pyuvdata_has_feed")
):
feed_array = feed_angle = Nfeeds = antenna_numbers = None
else:
Nfeeds = feed_table.getcol("NUM_RECEPTORS")
if not all(max(Nfeeds) == Nfeeds):
# This seems like a rare possibility, but screen for it here.
raise NotImplementedError( # pragma: no cover
"Support for differing numbers of feeds is not supported. If "
"you need support for it please file an issue in our GitHub "
"issue log: "
"https://github.com/RadioAstronomySoftwareGroup/pyuvdata/issues."
)
Nfeeds = int(Nfeeds[0])
ant_arr = feed_table.getcol("ANTENNA_ID")
pol_arr = np.char.lower(feed_table.getcol("POLARIZATION_TYPE")["array"])
pol_arr = pol_arr.reshape(feed_table.getcol("POLARIZATION_TYPE")["shape"])
rang_arr = feed_table.getcol("RECEPTOR_ANGLE")
max_ant = max(ant_arr) + 1
feed_array = np.full((max_ant, Nfeeds), "", dtype=np.object_)
feed_angle = np.zeros((max_ant, Nfeeds), dtype=float)
antenna_numbers = np.arange(max_ant)
feed_array[ant_arr] = pol_arr
feed_angle[ant_arr] = rang_arr
# Set default case to be lower to be consistent w/ pyuvdata standards
feed_array.flat[:] = [item.lower() for item in feed_array.flat]
if select_ants is not None and Nfeeds is not None:
mask = np.isin(np.arange(max_ant), select_ants)
feed_array = feed_array[mask]
feed_angle = feed_angle[mask]
antenna_numbers = select_ants
tb_feed_dict = {
"feed_array": feed_array,
"feed_angle": feed_angle,
"Nfeeds": Nfeeds,
"antenna_numbers": antenna_numbers,
}
return tb_feed_dict
[docs]def write_ms_feed(
filepath,
uvobj=None,
nfeeds=None,
nspws=None,
feed_array=None,
feed_angle=None,
antenna_numbers=None,
time_val=None,
):
"""
Write out the feed information into a CASA table.
Parameters
----------
filepath : str
path to MS (without FEED suffix)
uvobj : UVBase (with matching parameters)
Optional parameter, can be used to automatically fill the other required
keywords for this function. Note that the UVBase object must have parameters
that match by name to the other keywords required here.
nfeeds : int
Required if uvobj not provided, number of feeds in the telescope for the object.
feed_array : ndarray of str
Required if uvobj not provided, describes the polarization of each receiver,
should be one of "X", "Y", "L", or "R". Shape (nants, nfeeds).
feed_angle : ndarray of float
Required if uvobj not provided, orientation of the receiver with respect to the
great circle at fixed azimuth, shape (nants, nfeeds).
antenna_numbers : array-like of int
Required if uvobj not provided, antenna numbers for all antennas of the
telescope, dtype int and shape (nants,).
time_val : float
Required if uvobj is not supplied, representative JD date for the catalog to
be recorded into the MS file.
Raises
------
FileNotFoundError
If no main MS table is found when looking at filepath.
"""
_ms_utils_call_checks(filepath)
filepath += "::FEED"
has_feed = feed_array is None
if uvobj is not None:
if uvobj.telescope.feed_array is not None:
feed_array = uvobj.telescope.feed_array
feed_angle = uvobj.telescope.feed_angle
nfeeds = int(uvobj.telescope.Nfeeds)
has_feed = True
else:
if uvobj.flex_spw_polarization_array is None:
pols = uvobj.polarization_array
else:
pols = uvobj.flex_spw_polarization_array
feed_pols = utils.pol.get_feeds_from_pols(pols)
nfeeds = len(feed_pols)
feed_array = np.tile(sorted(feed_pols), (uvobj.telescope.Nants, 1))
feed_angle = np.zeros((uvobj.telescope.Nants, nfeeds))
has_feed = False
antenna_numbers = uvobj.telescope.antenna_numbers
nspws = uvobj.Nspws
time_val = (
Time(np.median(uvobj.time_array), format="jd", scale="utc").mjd * 86400.0
)
nrows = np.max(antenna_numbers) + 1
antenna_id_table = np.arange(nrows, dtype=np.int32)
with tables.table(filepath, ack=False, readonly=False) as feed_table:
# Record whether or not we actually have the feed information specified, versus
# deriving it from a polarization table
feed_table.putkeyword("pyuvdata_has_feed", has_feed)
# Plug in what we need here. Tile based on the first element to plug in valid
# entries for all antennas so that CASA doesn't complain.
pol_type_table = np.tile(feed_array[0], (nrows, 1)).astype("<U1")
pol_type_table[antenna_numbers] = feed_array
pol_type_table = np.char.upper(pol_type_table)
receptor_angle_table = np.zeros((nrows, nfeeds), dtype=np.float64)
receptor_angle_table[antenna_numbers] = feed_angle
for idx in range(nrows):
if pol_type_table[idx].tolist() in [["Y", "X"], ["L", "R"]]:
pol_type_table[idx, :] = pol_type_table[idx, ::-1]
receptor_angle_table[idx, :] = receptor_angle_table[idx, ::-1]
pol_type_table = np.repeat(pol_type_table, nspws, axis=0)
receptor_angle_table = np.repeat(receptor_angle_table, nspws, axis=0)
antenna_id_table = np.repeat(antenna_id_table, nspws, axis=0)
spectral_window_id_table = np.tile(np.arange(nspws, dtype=np.int32), nrows)
num_receptors_table = np.full(nrows * nspws, nfeeds, dtype=np.int32)
# These are all "sensible defaults" for now.
beam_id_table = -1 * np.ones(nrows * nspws, dtype=np.int32)
beam_offset_table = np.zeros((nrows * nspws, 2, 2), dtype=np.float64)
feed_id_table = np.zeros(nrows * nspws, dtype=np.int32)
time_table = np.full(nrows * nspws, time_val, dtype=np.float64)
interval_table = np.full(nrows * nspws, np.finfo(float).max, dtype=np.float64)
position_table = np.zeros((nrows * nspws, 3), dtype=np.float64)
# TODO: Check and see if this needs additional info for polcal...
pol_response_table = np.zeros((nspws * nrows, 2, 2), dtype=np.complex64)
feed_table.addrows(nrows * nspws)
feed_table.putcol("ANTENNA_ID", antenna_id_table)
feed_table.putcol("BEAM_ID", beam_id_table)
feed_table.putcol("BEAM_OFFSET", beam_offset_table)
feed_table.putcol("FEED_ID", feed_id_table)
feed_table.putcol("TIME", time_table)
feed_table.putcol("INTERVAL", interval_table)
feed_table.putcol("NUM_RECEPTORS", num_receptors_table)
feed_table.putcol("POLARIZATION_TYPE", pol_type_table)
feed_table.putcol("POL_RESPONSE", pol_response_table)
feed_table.putcol("POSITION", position_table)
feed_table.putcol("RECEPTOR_ANGLE", receptor_angle_table)
feed_table.putcol("SPECTRAL_WINDOW_ID", spectral_window_id_table)
[docs]def read_ms_source(filepath):
"""
Read Measurement Set SOURCE table.
Parameters
----------
filepath : str
path to MS (without SOURCE suffix)
Returns
-------
source_dict : dict
A dictionary with keys matched to the source ID, and up to six values which
map to specific keys in entries of the `phase_center_catalog` parameter
found in UVData/UVCal objects. They are "cat_lon" (longitudinal position in
radians), "cat_lat" (latitudinal position in radians), "cat_times"
(JD dates of ephemeris entries), "cat_type" (type of catalog entry), "cat_pm_ra"
(proper motion in RA), and "cat_pm_dec" (proper motion in Dec). These entries
can be used to update keys in `phase_center_catalog` as derived from the FIELD
table, which only has space to record a single RA/Dec coordinate plus reference
frame information.
Raises
------
FileNotFoundError
If no MS file is found with the provided name.
"""
_ms_utils_call_checks(filepath + "/SOURCE")
tb_sou_dict = {}
with tables.table(filepath + "/SOURCE", ack=False) as tb_source:
for idx in range(tb_source.nrows()):
sou_id = tb_source.getcell("SOURCE_ID", idx)
pm_vec = tb_source.getcell("PROPER_MOTION", idx)
time_stamp = tb_source.getcell("TIME", idx)
sou_vec = tb_source.getcell("DIRECTION", idx)
try:
for idx in np.where(
np.isclose(
tb_sou_dict[sou_id]["cat_times"], time_stamp, rtol=0, atol=1e-3
)
)[0]:
if not (
(tb_sou_dict[sou_id]["cat_lon"][idx] == sou_vec[0])
and (tb_sou_dict[sou_id]["cat_lat"][idx] == sou_vec[1])
and (tb_sou_dict[sou_id]["cat_pm_ra"][idx] == pm_vec[0])
and (tb_sou_dict[sou_id]["cat_pm_dec"][idx] == pm_vec[1])
):
warnings.warn(
"Different windows in this MS file contain different "
"metadata for the same integration. Be aware that "
"UVData objects do not allow for this, and thus will "
"default to using the metadata from the last row read "
"from the SOURCE table." + reporting_request
)
_ = tb_sou_dict[sou_id]["cat_times"].pop(idx)
_ = tb_sou_dict[sou_id]["cat_lon"].pop(idx)
_ = tb_sou_dict[sou_id]["cat_lat"].pop(idx)
_ = tb_sou_dict[sou_id]["cat_pm_ra"].pop(idx)
_ = tb_sou_dict[sou_id]["cat_pm_dec"].pop(idx)
tb_sou_dict[sou_id]["cat_times"].append(time_stamp)
tb_sou_dict[sou_id]["cat_lon"].append(sou_vec[0])
tb_sou_dict[sou_id]["cat_lat"].append(sou_vec[1])
tb_sou_dict[sou_id]["cat_pm_ra"].append(pm_vec[0])
tb_sou_dict[sou_id]["cat_pm_dec"].append(pm_vec[1])
except KeyError:
tb_sou_dict[sou_id] = {
"cat_times": [time_stamp],
"cat_lon": [sou_vec[0]],
"cat_lat": [sou_vec[1]],
"cat_pm_ra": [pm_vec[0]],
"cat_pm_dec": [pm_vec[1]],
}
for cat_dict in tb_sou_dict.values():
make_arr = len(cat_dict["cat_times"]) != 1
if make_arr:
# Convert CASA time to JD (pyuvdata standard)
cat_dict["cat_times"] = Time(
np.array(cat_dict["cat_times"]) / 86400.0, format="mjd", scale="utc"
).jd
cat_dict["cat_type"] = "ephem"
else:
del cat_dict["cat_times"]
for key in cat_dict:
if make_arr:
cat_dict[key] = np.array(cat_dict[key])
else:
cat_dict[key] = cat_dict[key][0]
if np.allclose(cat_dict["cat_pm_ra"], 0) and np.allclose(
cat_dict["cat_pm_dec"], 0
):
cat_dict["cat_pm_ra"] = cat_dict["cat_pm_dec"] = None
return tb_sou_dict
[docs]def write_ms_source(filepath, uvobj=None, time_default=None, phase_center_catalog=None):
"""
Write out the source information into a CASA table.
Parameters
----------
filepath : str
path to MS (without SOURCE suffix)
uvobj : UVBase (with matching parameters)
Optional parameter, can be used to automatically fill the other required
keywords for this function. Note that the UVBase object must have parameters
that match by name to the other keywords required here (with the exception of
time_default, which is derived from the time_array UVParameter).
time_default : float
Default time (in MJD seconds) to use for the catalog entries.
phase_center_catalog : dict
A catalog stylized like the UVData/UVCal `phase_center_catalog` parameters (see
documentation of those classes for more details on expected structure). Required
if uvobj is not supplied.
Raises
------
FileNotFoundError
If no main MS table is found when looking at filepath.
"""
_ms_utils_call_checks(filepath)
filepath += "::SOURCE"
if uvobj is not None:
time_default = (
Time(np.median(uvobj.time_array), format="jd", scale="utc").mjd * 86400.0
)
phase_center_catalog = uvobj.phase_center_catalog
# Handle this table a bit specially, since it's not a strictly required table
source_desc = tables.complete_ms_desc("SOURCE")
dminfo = tables.makedminfo(source_desc)
with tables.table(
filepath, tabledesc=source_desc, dminfo=dminfo, ack=False, readonly=False
) as source_table:
# Make the default time be the midpoint of the obs
# Default interval should be several times a Hubble time. Gotta make this code
# future-proofed until the eventual heat death of the Universe.
int_default = np.finfo(float).max
row_count = 0
for sou_id, pc_dict in phase_center_catalog.items():
# Get some pieces of info that should not depend on the cat type, like name,
# proper motions, (others possible)
int_val = int_default
sou_name = pc_dict["cat_name"]
pm_dir = np.array(
[pc_dict.get("cat_pm_ra"), pc_dict.get("cat_pm_ra")], dtype=np.double
)
if not np.all(np.isfinite(pm_dir)):
pm_dir[:] = 0.0
if pc_dict["cat_type"] == "sidereal":
# If this is just a single set of points, set up values to have shape
# (1, X) so that we can iterate through them later.
sou_dir = np.array([[pc_dict["cat_lon"], pc_dict["cat_lat"]]])
time_val = np.array([time_default])
elif pc_dict["cat_type"] == "ephem":
# Otherwise for ephem, make time the outer-most axis so that we
# can easily iterate through.
sou_dir = np.vstack((pc_dict["cat_lon"], pc_dict["cat_lat"])).T
time_val = (
Time(pc_dict["cat_times"], format="jd", scale="utc").mjd * 86400.0
).flatten()
# If there are multiple time entries, then approximate a value for the
# interval (needed for CASA, not UVData) by taking the range divided
# by the number of points in the ephem.
if len(time_val) > 1:
int_val = (time_val.max() - time_val.min()) / (len(time_val) - 1)
for idx in range(len(sou_dir)):
source_table.addrows()
source_table.putcell("NAME", row_count, sou_name)
source_table.putcell("SOURCE_ID", row_count, sou_id)
source_table.putcell("INTERVAL", row_count, int_val)
source_table.putcell("SPECTRAL_WINDOW_ID", row_count, -1)
source_table.putcell("NUM_LINES", row_count, 0)
source_table.putcell("TIME", row_count, time_val[idx])
source_table.putcell("DIRECTION", row_count, sou_dir[idx])
source_table.putcell("PROPER_MOTION", row_count, pm_dir)
row_count += 1
[docs]def read_ms_pointing(filepath):
"""
Read Measurement Set POINTING table.
Note that this method is not yet implemented, and is a placeholder for future
development.
Parameters
----------
filepath : str
path to MS (without POINTING suffix)
Returns
-------
pointing_dict : dict
Dictionary containing pointing information.
Raises
------
FileNotFoundError
If no MS file is found with the provided name.
"""
_ms_utils_call_checks(filepath + "/POINTING")
raise NotImplementedError("Reading of MS POINTING tables not available yet.")
[docs]def write_ms_pointing(
filepath, uvobj=None, max_ant=None, integration_time=None, time_array=None
):
"""
Write out the pointing information into a CASA table.
Parameters
----------
filepath : str
path to MS (without POINTING suffix)
uvobj : UVBase (with matching parameters)
Optional parameter, can be used to automatically fill the other required
keywords for this function. Note that the UVBase object must have parameters
that match by name to the other keywords required here (with the exception of
max_ant, which is derived from from the antenna_numbers UVParameter).
max_ant : int
Required if uvobj not provided, the highest-number antenna for the telescope.
integration_time : ndarray
Required if uvobj not provided, integration time per entry. Ndarray of float,
should match in shape to `time_array`.
time_array : ndarray
Required if uvobj not provided, JD date of each entry. Ndarray of float, should
match in shape to `integration_time`.
Raises
------
FileNotFoundError
If no main MS table is found when looking at filepath.
"""
_ms_utils_call_checks(filepath)
filepath += "::POINTING"
if uvobj is not None:
max_ant = np.max(uvobj.telescope.antenna_numbers)
integration_time = uvobj.integration_time
time_array = uvobj.time_array
with tables.table(filepath, ack=False, readonly=False) as pointing_table:
nants_casa = max_ant + 1
antennas = np.arange(nants_casa, dtype=np.int32)
# We want the number of unique timestamps here, since CASA wants a
# per-timestamp, per-antenna entry
unique_times = np.unique(time_array)
times = Time(unique_times, format="jd", scale="utc").mjd * 86400.0
ntimes = len(times)
# Same for the number of intervals
intervals = np.zeros_like(times)
for idx, ind_time in enumerate(unique_times):
intervals[idx] = np.median(integration_time[time_array == ind_time])
nrows = nants_casa * ntimes
# This extra step of adding a single row and plugging in values for DIRECTION
# and TARGET are a workaround for a weird issue that pops up where, because the
# values are not a fixed size (they are shape(2, Npoly), where Npoly is allowed
# to vary from integration to integration), casacore seems to be very slow
# filling in the data after the fact. By "pre-filling" the first row, the
# addrows operation will automatically fill in an appropriately shaped array.
# TODO: This works okay for steerable dishes, but less well for non-tracking
# arrays (i.e., where primary beam center != phase center). Fix later.
pointing_table.addrows(1)
pointing_table.putcell("DIRECTION", 0, np.zeros((2, 1), dtype=np.float64))
pointing_table.putcell("TARGET", 0, np.zeros((2, 1), dtype=np.float64))
pointing_table.addrows(nrows - 1)
pointing_table.putcol("ANTENNA_ID", np.tile(antennas, ntimes))
pointing_table.putcol("TIME", np.repeat(times, nants_casa))
pointing_table.putcol("INTERVAL", np.repeat(intervals, nants_casa))
name_col = np.asarray(["ZENITH"] * nrows, dtype=np.bytes_)
pointing_table.putcol("NAME", name_col, nrow=nrows)
pointing_table.putcol("NUM_POLY", np.zeros(nrows, dtype=np.int32))
pointing_table.putcol("TIME_ORIGIN", np.repeat(times, nants_casa))
# we always "point" at zenith
# TODO: Fix this for steerable arrays
direction_col = np.zeros((nrows, 2, 1), dtype=np.float64)
direction_col[:, 1, :] = np.pi / 2
pointing_table.putcol("DIRECTION", direction_col)
# just reuse "direction" for "target"
pointing_table.putcol("TARGET", direction_col)
# set tracking info to `False`
pointing_table.putcol("TRACKING", np.zeros(nrows, dtype=bool))
[docs]def read_ms_polarization(filepath):
"""
Read Measurement Set POLARIZATION table.
Parameters
----------
filepath : str
path to MS (without POLARIZATION suffix)
Returns
-------
pol_dict : dict
A dictionary with keys matched to the polarization ID, with values "corr_type"
(ndarray of int, shape (Npols,), matched to polarization code) and "num_corr"
(ndarray of int, shape (Npols,), tallying the total number of polarization
entries per record).
Raises
------
FileNotFoundError
If no MS file is found with the provided name.
"""
_ms_utils_call_checks(filepath + "/POLARIZATION")
with tables.table(filepath + "/POLARIZATION", ack=False) as tb_pol:
pol_dict = {}
for pol_id in range(tb_pol.nrows()):
pol_dict[pol_id] = {
"corr_type": tb_pol.getcell("CORR_TYPE", pol_id).astype(int),
"num_corr": tb_pol.getcell("NUM_CORR", pol_id),
}
return pol_dict
[docs]def write_ms_polarization(
filepath, *, pol_order=..., uvobj=None, polarization_array=None, flex_pol=False
):
"""
Write out the polarization information into a CASA table.
Parameters
----------
filepath : str
path to MS (without POLARIZATION suffix)
pol_order : slice or list of int
Ordering of the polarization axis on write, only used if not writing a
flex-pol dataset.
uvobj : UVBase (with matching parameters)
Optional parameter, can be used to automatically fill the other required
keywords for this function. Note that the UVBase object must have parameters
that match by name to the other keywords required here (with the exception of
flex_pol, which is derived from the presence of the flex_spw_polarization array
UVParameter).
polarization_array : ndarray
Required if uvobj not provided, array containing polarization ID codes (ndarray
of dtype int and shape (Npols,) if flex_pol=False, otherwise shape (Nspws,)).
flex_pol : bool
Required if uvobj not provided, whether or not the supplied polarization_array
is derived for a flex-pol object (can contain duplicate entries of codes, with
positions indexed against entries in the spectral window table). Default is
False.
Raises
------
FileNotFoundError
If no main MS table is found when looking at filepath.
"""
_ms_utils_call_checks(filepath)
filepath += "::POLARIZATION"
if uvobj is not None:
flex_pol = uvobj.flex_spw_polarization_array is not None
if flex_pol:
polarization_array = uvobj.flex_spw_polarization_array
else:
polarization_array = uvobj.polarization_array
pol_arr = polarization_array if flex_pol else polarization_array[pol_order]
with tables.table(filepath, ack=False, readonly=False) as pol_table:
if flex_pol:
for idx, spw_pol in enumerate(np.unique(pol_arr)):
pol_str = utils.polnum2str([spw_pol])
feed_pols = {
feed for pol in pol_str for feed in utils.pol.POL_TO_FEED_DICT[pol]
}
pol_types = [pol.lower() for pol in sorted(feed_pols)]
pol_tuples = np.asarray(
[(pol_types.index(i), pol_types.index(j)) for i, j in pol_str],
dtype=np.int32,
)
pol_table.addrows()
pol_table.putcell(
"CORR_TYPE", idx, np.array([POL_AIPS2CASA_DICT[spw_pol]])
)
pol_table.putcell("CORR_PRODUCT", idx, pol_tuples)
pol_table.putcell("NUM_CORR", idx, 1)
else:
pol_str = utils.polnum2str(pol_arr)
feed_pols = {
feed for pol in pol_str for feed in utils.pol.POL_TO_FEED_DICT[pol]
}
pol_types = [pol.lower() for pol in sorted(feed_pols)]
pol_tuples = np.asarray(
[(pol_types.index(i), pol_types.index(j)) for i, j in pol_str],
dtype=np.int32,
)
pol_table.addrows()
pol_table.putcell(
"CORR_TYPE", 0, np.array([POL_AIPS2CASA_DICT[pol] for pol in pol_arr])
)
pol_table.putcell("CORR_PRODUCT", 0, pol_tuples)
pol_table.putcell("NUM_CORR", 0, len(polarization_array))
[docs]def init_ms_file(filepath, make_model_col=False, make_corr_col=False):
"""
Create a skeleton MS dataset to fill.
Parameters
----------
filepath : str
Path to MS to be created.
make_model_col : bool
If set to True, will construct a measurement set that contains a MODEL_DATA
column in addition to the DATA column. Default is False.
make_model_col : bool
If set to True, will construct a measurement set that contains a CORRECTED_DATA
column in addition to the DATA column. Default is False.
Returns
-------
ms_table : casacore Table
Table object linked to the newly created MS file.
"""
# The required_ms_desc returns the defaults for a CASA MS table
ms_desc = tables.required_ms_desc("MAIN")
# The tables have a different choice of dataManagerType and dataManagerGroup
# based on a test ALMA dataset and comparison with what gets generated with
# a dataset that comes through importuvfits.
ms_desc["FLAG"].update(
dataManagerType="TiledShapeStMan", dataManagerGroup="TiledFlag"
)
ms_desc["UVW"].update(
dataManagerType="TiledColumnStMan", dataManagerGroup="TiledUVW"
)
# TODO: Can stuff UVFLAG objects into this
ms_desc["FLAG_CATEGORY"].update(
dataManagerType="TiledShapeStMan",
dataManagerGroup="TiledFlagCategory",
keywords={"CATEGORY": np.array("baddata")},
)
ms_desc["WEIGHT"].update(
dataManagerType="TiledShapeStMan", dataManagerGroup="TiledWgt"
)
ms_desc["SIGMA"].update(
dataManagerType="TiledShapeStMan", dataManagerGroup="TiledSigma"
)
# The ALMA default for the next set of columns from the MAIN table use the
# name of the column as the dataManagerGroup, so we update the casacore
# defaults accordingly.
for key in ["ANTENNA1", "ANTENNA2", "DATA_DESC_ID", "FLAG_ROW"]:
ms_desc[key].update(dataManagerGroup=key)
# The ALMA default for he next set of columns from the MAIN table use the
# IncrementalStMan dataMangerType, and so we update the casacore defaults
# (along with the name dataManagerGroup name to the column name, which is
# also the apparent default for ALMA).
incremental_list = [
"ARRAY_ID",
"EXPOSURE",
"FEED1",
"FEED2",
"FIELD_ID",
"INTERVAL",
"OBSERVATION_ID",
"PROCESSOR_ID",
"SCAN_NUMBER",
"STATE_ID",
"TIME",
"TIME_CENTROID",
]
for key in incremental_list:
ms_desc[key].update(dataManagerType="IncrementalStMan", dataManagerGroup=key)
# TODO: Verify that the casacore defaults for coldesc are satisfactory for
# the tables and columns below (note that these were columns with apparent
# discrepancies between a test ALMA dataset and what casacore generated).
# FEED:FOCUS_LENGTH
# FIELD
# POINTING:TARGET
# POINTING:POINTING_OFFSET
# POINTING:ENCODER
# POINTING:ON_SOURCE
# POINTING:OVER_THE_TOP
# SPECTRAL_WINDOW:BBC_NO
# SPECTRAL_WINDOW:ASSOC_SPW_ID
# SPECTRAL_WINDOW:ASSOC_NATURE
# SPECTRAL_WINDOW:SDM_WINDOW_FUNCTION
# SPECTRAL_WINDOW:SDM_NUM_BIN
# Create a column for the data, which is amusingly enough not actually
# creaed by default.
datacoldesc = tables.makearrcoldesc(
"DATA",
0.0 + 0.0j,
valuetype="complex",
ndim=2,
datamanagertype="TiledShapeStMan",
datamanagergroup="TiledData",
comment="The data column",
)
del datacoldesc["desc"]["shape"]
ms_desc.update(tables.maketabdesc(datacoldesc))
if make_model_col:
datacoldesc = tables.makearrcoldesc(
"MODEL_DATA",
0.0 + 0.0j,
valuetype="complex",
ndim=2,
datamanagertype="TiledShapeStMan",
datamanagergroup="TiledData",
comment="The data column",
)
del datacoldesc["desc"]["shape"]
ms_desc.update(tables.maketabdesc(datacoldesc))
if make_corr_col:
datacoldesc = tables.makearrcoldesc(
"CORRECTED_DATA",
0.0 + 0.0j,
valuetype="complex",
ndim=2,
datamanagertype="TiledShapeStMan",
datamanagergroup="TiledData",
comment="The data column",
)
del datacoldesc["desc"]["shape"]
ms_desc.update(tables.maketabdesc(datacoldesc))
# Now create a column for the weight spectrum, which we plug nsample_array into
weightspeccoldesc = tables.makearrcoldesc(
"WEIGHT_SPECTRUM",
0.0,
valuetype="float",
ndim=2,
datamanagertype="TiledShapeStMan",
datamanagergroup="TiledWgtSpectrum",
comment="Weight for each data point",
)
del weightspeccoldesc["desc"]["shape"]
ms_desc.update(tables.maketabdesc(weightspeccoldesc))
# Finally, create the dataset, and return a handle to the freshly created
# skeleton measurement set.
return tables.default_ms(filepath, ms_desc, tables.makedminfo(ms_desc))
[docs]def init_ms_cal_file(filename, delay_table=False):
"""
Create a skeleton MS calibration table to fill.
Parameters
----------
filepath : str
Path to MS table to be created.
delay_table : bool
Set to False by default, which will create a MS table capable of storing
complex gains. However, if set to True, the method will instead construct a
table which can store delay information.
"""
standard_desc = tables.required_ms_desc()
tabledesc = {}
tabledesc["TIME"] = standard_desc["TIME"]
tabledesc["FIELD_ID"] = standard_desc["FIELD_ID"]
tabledesc["ANTENNA1"] = standard_desc["ANTENNA1"]
tabledesc["ANTENNA2"] = standard_desc["ANTENNA2"]
tabledesc["INTERVAL"] = standard_desc["INTERVAL"]
tabledesc["EXPOSURE"] = standard_desc["EXPOSURE"] # Used to track int time
tabledesc["SCAN_NUMBER"] = standard_desc["SCAN_NUMBER"]
tabledesc["OBSERVATION_ID"] = standard_desc["OBSERVATION_ID"]
# This is kind of a weird aliasing that's done for tables -- may not be always true,
# but this seems to be needed as of now (circa 2024).
tabledesc["SPECTRAL_WINDOW_ID"] = standard_desc["DATA_DESC_ID"]
for field in tabledesc:
# Option seems to be set to 5 for the above fields, based on CASA testing
tabledesc[field]["option"] = 5
# FLAG and weight are _mostly_ standard, just needs ndim modified
tabledesc["FLAG"] = standard_desc["FLAG"]
tabledesc["WEIGHT"] = standard_desc["WEIGHT"]
tabledesc["FLAG"]["ndim"] = tabledesc["WEIGHT"]["ndim"] = -1
# PARAMERR and SNR are very similar to SIGMA, so we'll boot-strap from it, with
# the comments just being updated
tabledesc["PARAMERR"] = standard_desc["SIGMA"]
tabledesc["SNR"] = standard_desc["SIGMA"]
tabledesc["SNR"]["ndim"] = tabledesc["PARAMERR"]["ndim"] = -1
tabledesc["SNR"]["comment"] = "Signal-to-noise of the gain solution."
tabledesc["PARAMERR"]["comment"] = "Uncertainty in the gains."
tabledesc["FPARAM" if delay_table else "CPARAM"] = tables.makearrcoldesc(
None,
None,
valuetype="float" if delay_table else "complex",
ndim=-1,
datamanagertype="StandardStMan",
comment="Delay values." if delay_table else "Complex gain values.",
)["desc"]
del tabledesc["FPARAM" if delay_table else "CPARAM"]["shape"]
for field in tabledesc:
tabledesc[field]["dataManagerGroup"] = "MSMTAB"
dminfo = tables.makedminfo(tabledesc)
with tables.table(
filename, tabledesc=tabledesc, dminfo=dminfo, ack=False, readonly=False
) as ms:
# Put some general stuff into the top level dict, default to wideband gains.
ms.putinfo(
{
"type": "Calibration",
"subType": "K Jones" if delay_table else "G Jones",
"readme": f"Written with pyuvdata version: {__version__}.",
}
)
# Finally, set up some de
ms.putkeyword("ParType", "Float" if delay_table else "Complex")
ms.putkeyword("MSName", "unknown")
ms.putkeyword("VisCal", "unknown")
ms.putkeyword("PolBasis", "unknown")
ms.putkeyword("CASA_Version", "unknown")
[docs]def get_ms_telescope_location(*, tb_ant_dict, obs_dict):
"""
Get the telescope location object.
Parameters
----------
tb_ant_dict : dict
dict returned by `read_ms_antenna`
obs_dict : dict
dict returned by `read_ms_observation`
"""
xyz_telescope_frame = tb_ant_dict["telescope_frame"]
xyz_telescope_ellipsoid = tb_ant_dict["telescope_ellipsoid"]
# check to see if a TELESCOPE_LOCATION column is present in the observation
# table. This is non-standard, but inserted by pyuvdata
if "telescope_location" not in obs_dict and (
obs_dict["telescope_name"] in known_telescopes()
or obs_dict["telescope_name"].upper() in known_telescopes()
):
# get it from known telescopes
telname = obs_dict["telescope_name"]
if telname.upper() in known_telescopes():
telname = telname.upper()
telescope_loc = known_telescope_location(telname)
warnings.warn(
f"Setting telescope_location to value in known_telescopes for {telname}."
)
return telescope_loc
else:
if xyz_telescope_frame == "mcmf":
try:
from lunarsky import MoonLocation
except ImportError as ie: # pragma: no cover
# There is a test for this, but it is always skipped with our
# current CI setup because it requires that python-casacore is
# installed but lunarsky isn't. Doesn't seem worth setting up a
# whole separate CI for this.
raise ValueError( # pragma: no cover
"Need to install `lunarsky` package to work with MCMF frames."
) from ie
if "telescope_location" in obs_dict:
if xyz_telescope_frame == "itrs":
return EarthLocation.from_geocentric(
*np.squeeze(obs_dict["telescope_location"]), unit="m"
)
else:
loc = MoonLocation.from_selenocentric(
*np.squeeze(obs_dict["telescope_location"]), unit="m"
)
loc.ellipsoid = xyz_telescope_ellipsoid
return loc
else:
# Set it to be the mean of the antenna positions (this is not ideal!)
if xyz_telescope_frame == "itrs":
return EarthLocation.from_geocentric(
*np.array(np.mean(tb_ant_dict["antenna_positions"], axis=0)),
unit="m",
)
else:
loc = MoonLocation.from_selenocentric(
*np.array(np.mean(tb_ant_dict["antenna_positions"], axis=0)),
unit="m",
)
loc.ellipsoid = xyz_telescope_ellipsoid
return loc