# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Class for reading FHD save files."""
from __future__ import annotations
import warnings
import numpy as np
from astropy import constants as const, units
from astropy.coordinates import EarthLocation
from docstring_parser import DocstringStyle
from scipy.io import readsav
from .. import utils
from ..docstrings import copy_replace_short_description
from ..utils.io import fhd as fhd_utils
from . import UVData
__all__ = ["FHD"]
[docs]class FHD(UVData):
"""
Defines a FHD-specific subclass of UVData for reading FHD save files.
This class should not be interacted with directly, instead use the read_fhd
method on the UVData class.
"""
[docs] @copy_replace_short_description(UVData.read_fhd, style=DocstringStyle.NUMPYDOC)
def read_fhd(
self,
vis_files: list[str] | np.ndarray | str,
*,
params_file: str,
obs_file: str | None = None,
flags_file: str | None = None,
layout_file: str | None = None,
settings_file: str | None = None,
background_lsts=True,
read_data=True,
default_mount_type="other",
run_check=True,
check_extra=True,
run_check_acceptability=True,
strict_uvw_antpos_check=False,
check_autos=True,
fix_autos=True,
astrometry_library=None,
):
"""Read in data from a list of FHD files."""
datafiles_dict = {}
use_model = None
if isinstance(vis_files, str):
vis_files = [vis_files]
for filename in vis_files:
if filename is None:
continue
if filename.lower().endswith("xx.sav"):
if "xx" in list(datafiles_dict.keys()):
raise ValueError("multiple xx datafiles in vis_files")
datafiles_dict["xx"] = filename
elif filename.lower().endswith("yy.sav"):
if "yy" in list(datafiles_dict.keys()):
raise ValueError("multiple yy datafiles in vis_files")
datafiles_dict["yy"] = filename
elif filename.lower().endswith("xy.sav"):
if "xy" in list(datafiles_dict.keys()):
raise ValueError("multiple xy datafiles in vis_files")
datafiles_dict["xy"] = filename
elif filename.lower().endswith("yx.sav"):
if "yx" in list(datafiles_dict.keys()):
raise ValueError("multiple yx datafiles in vis_files")
datafiles_dict["yx"] = filename
else:
raise ValueError("unrecognized file in vis_files")
if "_vis_model_" in filename:
this_model = True
else:
this_model = False
if use_model is None:
use_model = this_model
elif this_model != use_model:
raise ValueError(
"The vis_files parameter has a mix of model and data files."
)
if len(datafiles_dict) < 1 and read_data is True:
raise ValueError(
"The vis_files parameter must be passed if read_data is True"
)
if flags_file is None and read_data is True:
raise ValueError(
"The flags_file parameter must be passed if read_data is True"
)
if obs_file is None and read_data is False:
raise ValueError(
"The obs_file parameter must be passed if read_data is False."
)
if layout_file is None:
warnings.warn(
"The layout_file parameter was not passed, so antenna_postions will "
"not be defined and antenna names and numbers might be incorrect."
)
if settings_file is None:
warnings.warn(
"The settings_file parameter was not passed, so some history "
"information will be missing."
)
filenames = fhd_utils.fhd_filenames(
vis_files=vis_files,
params_file=params_file,
obs_file=obs_file,
flags_file=flags_file,
layout_file=layout_file,
settings_file=settings_file,
)
self.filename = filenames
self._filename.form = (len(self.filename),)
if not read_data:
obs_dict = readsav(obs_file, python_dict=True)
this_obs = obs_dict["obs"]
self.Npols = int(this_obs[0]["N_POL"])
else:
vis_data = {}
for pol, file in datafiles_dict.items():
this_dict = readsav(file, python_dict=True)
if use_model:
vis_data[pol] = this_dict["vis_model_ptr"]
else:
vis_data[pol] = this_dict["vis_ptr"]
this_obs = this_dict["obs"]
self.Npols = len(list(vis_data.keys()))
obs = this_obs
bl_info = obs["BASELINE_INFO"][0]
astrometry = obs["ASTR"][0]
fhd_pol_list = []
for pol in obs["POL_NAMES"][0]:
fhd_pol_list.append(pol.decode("utf8").lower())
params_dict = readsav(params_file, python_dict=True)
params = params_dict["params"]
if read_data:
flag_file_dict = readsav(flags_file, python_dict=True)
# The name for this variable changed recently (July 2016). Test for both.
vis_weights_data = {}
if "flag_arr" in flag_file_dict:
weights_key = "flag_arr"
elif "vis_weights" in flag_file_dict:
weights_key = "vis_weights"
else:
raise ValueError(
"No recognized key for visibility weights in flags_file."
)
for index, w in enumerate(flag_file_dict[weights_key]):
vis_weights_data[fhd_pol_list[index]] = w
self.Ntimes = int(obs["N_TIME"][0])
self.Nbls = int(obs["NBASELINES"][0])
self.Nblts = params["UU"][0].size
self.Nfreqs = int(obs["N_FREQ"][0])
self.Nspws = 1
self.spw_array = np.array([0])
self.flex_spw_id_array = np.zeros(self.Nfreqs, dtype=int)
self.vis_units = "Jy"
self.pol_convention = "sum"
# bl_info.JDATE (a vector of length Ntimes) is the only safe date/time
# to use in FHD files.
# (obs.JD0 (float) and params.TIME (vector of length Nblts) are
# context dependent and are not safe
# because they depend on the phasing of the visibilities)
# the values in bl_info.JDATE are the JD for each integration.
# We need to expand up to Nblts.
int_times = list(utils.tools._get_iterable(bl_info["JDATE"][0]))
bin_offset = bl_info["BIN_OFFSET"][0]
if self.Ntimes != len(int_times):
warnings.warn(
"Ntimes does not match the number of unique times in the data"
)
self.time_array = np.zeros(self.Nblts)
if self.Ntimes == 1:
self.time_array.fill(int_times[0])
else:
for ii in range(0, len(int_times)):
if ii < (len(int_times) - 1):
self.time_array[bin_offset[ii] : bin_offset[ii + 1]] = int_times[ii]
else:
self.time_array[bin_offset[ii] :] = int_times[ii]
# this is generated in FHD by subtracting the JD of neighboring
# integrations. This can have limited accuracy, so it can be slightly
# off the actual value.
# (e.g. 1.999426... rather than 2)
time_res = obs["TIME_RES"]
# time_res is constrained to be a scalar currently
self.integration_time = (
np.ones_like(self.time_array, dtype=np.float64) * time_res[0]
)
# # --- observation information ---
self.telescope.name = obs["INSTRUMENT"][0].decode("utf8")
# This is a bit of a kludge because nothing like a phase center name exists
# in FHD files.
# At least for the MWA, obs.ORIG_PHASERA and obs.ORIG_PHASEDEC specify
# the field the telescope was nominally pointing at
# (May need to be revisited, but probably isn't too important)
cat_name = (
"Field RA(deg): "
+ str(obs["ORIG_PHASERA"][0])
+ ", Dec:"
+ str(obs["ORIG_PHASEDEC"][0])
)
# For the MWA, this can sometimes be converted to EoR fields
if (
self.telescope.name.lower() == "mwa"
and np.isclose(obs["ORIG_PHASERA"][0], 0)
and np.isclose(obs["ORIG_PHASEDEC"][0], -27)
):
cat_name = "EoR 0 Field"
self.telescope.instrument = self.telescope.name
latitude = np.deg2rad(float(obs["LAT"][0]))
longitude = np.deg2rad(float(obs["LON"][0]))
altitude = float(obs["ALT"][0])
# get the stuff FHD read from the antenna table (in layout file)
if layout_file is not None:
# in older FHD versions, incorrect tile names for the mwa
# might have been stored in bl_info, so pull the obs_tile_names
# and check them against the layout file
obs_tile_names = [
ant.decode("utf8") for ant in bl_info["TILE_NAMES"][0].tolist()
]
if self.telescope.name.lower() == "mwa" and obs_tile_names[0][0] != "T":
obs_tile_names = [
"Tile" + "0" * (3 - len(ant.strip())) + ant.strip()
for ant in obs_tile_names
]
layout_param_dict = fhd_utils.get_fhd_layout_info(
layout_file=layout_file,
telescope_name=self.telescope.name,
latitude=latitude,
longitude=longitude,
altitude=altitude,
obs_tile_names=obs_tile_names,
run_check_acceptability=True,
)
telescope_attrs = {
"telescope_location": "location",
"Nants_telescope": "Nants",
"antenna_names": "antenna_names",
"antenna_numbers": "antenna_numbers",
"antenna_positions": "antenna_positions",
"diameters": "antenna_diameters",
}
for key, value in layout_param_dict.items():
if key in telescope_attrs:
setattr(self.telescope, telescope_attrs[key], value)
else:
setattr(self, key, value)
else:
self.telescope.location = EarthLocation.from_geodetic(
lat=latitude * units.rad,
lon=longitude * units.rad,
height=altitude * units.m,
)
# we don't have layout info, so go ahead and set the antenna_names,
# antenna_numbers and Nants_telescope from the baseline info struct.
self.telescope.antenna_names = [
ant.decode("utf8") for ant in bl_info["TILE_NAMES"][0].tolist()
]
self.telescope.antenna_numbers = np.array(
[int(ant) for ant in self.telescope.antenna_names]
)
if self.telescope.name.lower() == "mwa":
self.telescope.antenna_names = [
"Tile" + "0" * (3 - len(ant.strip())) + ant.strip()
for ant in self.telescope.antenna_names
]
self.telescope.Nants = len(self.telescope.antenna_names)
# Polarization information
lin_pol_order = ["xx", "yy", "xy", "yx"]
linear_pol_dict = dict(zip(lin_pol_order, np.arange(5, 9) * -1, strict=True))
pol_list = []
if read_data:
for pol in lin_pol_order:
if pol in vis_data:
pol_list.append(linear_pol_dict[pol])
self.polarization_array = np.asarray(pol_list)
else:
# Use Npols because for FHD, npol fully specifies which pols to use
pol_strings = lin_pol_order[: self.Npols]
self.polarization_array = np.asarray(
[linear_pol_dict[pol] for pol in pol_strings]
)
if self.telescope.name.lower() == "mwa":
# Setting this explicitly to avoid superfluous warnings, given that MWA
# seems to be the most common telescope to pass through FHD
self.telescope.mount_type = ["phased"] * self.telescope.Nants
self.telescope.set_feeds_from_x_orientation(
x_orientation="east", polarization_array=self.polarization_array
)
self.set_telescope_params(mount_type=default_mount_type)
# need to make sure telescope location is defined properly before this call
proc = self.set_lsts_from_time_array(
background=background_lsts, astrometry_library=astrometry_library
)
if not np.isclose(obs["OBSRA"][0], obs["PHASERA"][0]) or not np.isclose(
obs["OBSDEC"][0], obs["PHASEDEC"][0]
):
warnings.warn(
"These visibilities may have been phased "
"improperly -- without changing the uvw locations"
)
cat_id = self._add_phase_center(
cat_name=cat_name,
cat_type="sidereal",
cat_lon=np.deg2rad(float(obs["OBSRA"][0])),
cat_lat=np.deg2rad(float(obs["OBSDEC"][0])),
cat_frame=astrometry["RADECSYS"][0].decode().lower(),
cat_epoch=astrometry["EQUINOX"][0],
)
self.phase_center_id_array = np.zeros(self.Nblts, dtype=int) + cat_id
# Note that FHD antenna arrays are 1-indexed so we subtract 1
# to get 0-indexed arrays
ind_1_array = bl_info["TILE_A"][0] - 1
ind_2_array = bl_info["TILE_B"][0] - 1
self.ant_1_array = self.telescope.antenna_numbers[ind_1_array]
self.ant_2_array = self.telescope.antenna_numbers[ind_2_array]
self.Nants_data = int(np.union1d(self.ant_1_array, self.ant_2_array).size)
self.baseline_array = self.antnums_to_baseline(
self.ant_1_array, self.ant_2_array
)
if self.Nbls != len(np.unique(self.baseline_array)):
warnings.warn(
"Nbls does not match the number of unique baselines in the data"
)
self.freq_array = np.zeros(len(bl_info["FREQ"][0]), dtype=np.float64)
self.freq_array[:] = bl_info["FREQ"][0]
self.channel_width = np.full(self.Nfreqs, float(obs["FREQ_RES"][0]))
# In FHD, uvws are in seconds not meters.
# FHD follows the FITS uvw direction convention, which is opposite
# ours and Miriad's.
# So conjugate the visibilities and flip the uvws:
self.uvw_array = np.zeros((self.Nblts, 3))
self.uvw_array[:, 0] = (-1) * params["UU"][0] * const.c.to_value("m/s")
self.uvw_array[:, 1] = (-1) * params["VV"][0] * const.c.to_value("m/s")
self.uvw_array[:, 2] = (-1) * params["WW"][0] * const.c.to_value("m/s")
# history: add the first few lines from the settings file
if settings_file is not None:
self.history = fhd_utils.get_fhd_history(settings_file)
else:
self.history = ""
if not utils.history._check_history_version(
self.history, self.pyuvdata_version_str
):
self.history += self.pyuvdata_version_str
if read_data:
self.data_array = np.zeros(
(self.Nblts, self.Nfreqs, self.Npols), dtype=np.complex128
)
self.nsample_array = np.zeros(
(self.Nblts, self.Nfreqs, self.Npols), dtype=np.float64
)
self.flag_array = np.zeros(
(self.Nblts, self.Nfreqs, self.Npols), dtype=np.bool_
)
for pol, vis in vis_data.items():
pol_i = pol_list.index(linear_pol_dict[pol])
# FHD follows the FITS uvw direction convention, which is opposite
# ours and Miriad's.
# So conjugate the visibilities and flip the uvws:
self.data_array[:, :, pol_i] = np.conj(vis)
self.flag_array[:, :, pol_i] = vis_weights_data[pol] <= 0
self.nsample_array[:, :, pol_i] = np.abs(vis_weights_data[pol])
# wait for LSTs if set in background
if proc is not None:
proc.join()
self._set_app_coords_helper()
# check if object has all required uv_properties set
if run_check:
self.check(
check_extra=check_extra,
run_check_acceptability=run_check_acceptability,
strict_uvw_antpos_check=strict_uvw_antpos_check,
allow_flip_conj=True,
check_autos=check_autos,
fix_autos=fix_autos,
)