# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Class for reading FHD save files."""
import os
import warnings
import numpy as np
from astropy import constants as const
from docstring_parser import DocstringStyle
from scipy.io import readsav
from .. import telescopes as uvtel
from .. import utils as uvutils
from ..docstrings import copy_replace_short_description
from .uvdata import UVData, _future_array_shapes_warning
__all__ = ["get_fhd_history", "get_fhd_layout_info", "FHD"]
[docs]def get_fhd_history(settings_file, return_user=False):
"""
Small function to get the important history from an FHD settings text file.
Includes information about the command line call, the user, machine name and date
Parameters
----------
settings_file : str
FHD settings file name
return_user : bool
optionally return the username who ran FHD
Returns
-------
history : str
string of history extracted from the settings file
user : str
Only returned if return_user is True
"""
with open(settings_file, "r") as f:
settings_lines = f.readlines()
main_loc = None
command_loc = None
obs_loc = None
user_line = None
for ind, line in enumerate(settings_lines):
if line.startswith("##MAIN"):
main_loc = ind
if line.startswith("##COMMAND_LINE"):
command_loc = ind
if line.startswith("##OBS"):
obs_loc = ind
if line.startswith("User"):
user_line = ind
if (
main_loc is not None
and command_loc is not None
and obs_loc is not None
and user_line is not None
):
break
main_lines = settings_lines[main_loc + 1 : command_loc]
command_lines = settings_lines[command_loc + 1 : obs_loc]
history_lines = ["FHD history\n"] + main_lines + command_lines
for ind, line in enumerate(history_lines):
history_lines[ind] = line.rstrip().replace("\t", " ")
history = "\n".join(history_lines)
user = settings_lines[user_line].split()[1]
if return_user:
return history, user
else:
return history
def _xyz_close(xyz1, xyz2, loc_tols):
return np.allclose(xyz1, xyz2, rtol=loc_tols[0], atol=loc_tols[1])
def _latlonalt_close(latlonalt1, latlonalt2, radian_tol, loc_tols):
latlon_close = np.allclose(
np.array(latlonalt1[0:2]), np.array(latlonalt2[0:2]), rtol=0, atol=radian_tol
)
alt_close = np.isclose(
latlonalt1[2], latlonalt2[2], rtol=loc_tols[0], atol=loc_tols[1]
)
return latlon_close and alt_close
def get_fhd_layout_info(
layout_file,
telescope_name,
latitude,
longitude,
altitude,
radian_tol,
loc_tols,
obs_tile_names,
run_check_acceptability=True,
):
"""
Get the telescope and antenna positions from an FHD layout file.
Parameters
----------
layout_file : str
FHD layout file name
telescope_name : str
Telescope name
latitude : float
telescope latitude in radians
longitude : float
telescope longitude in radians
altitude : float
telescope altitude in meters
loc_tols : float
telescope_location tolerance in meters.
radian_tols : float
lat/lon tolerance in radians.
obs_tile_names : array-like of str
Tile names from the bl_info structure inside the obs structure.
Only used if telescope_name is "mwa".
run_check_acceptability : bool
Option to check acceptable range of the telescope locations.
Returns
-------
dict
A dictionary of parameters from the layout file to assign to the object. The
keys are:
* telescope_xyz : Telescope location in ECEF, shape (3,) (float)
* Nants_telescope : Number of antennas in the telescope (int)
* antenna_postions : Antenna positions in relative ECEF,
shape (Nants_telescope, 3) (float)
* antenna_names : Antenna names, length Nants_telescope (list of str)
* antenna_numbers : Antenna numbers, shape (Nants_telescope,) (array of int)
* gst0 : Greenwich sidereal time at midnight on reference date. (float)
* earth_omega : Earth's rotation rate in degrees per day. (float)
* dut1 : DUT1 (google it) AIPS 117 calls it UT1UTC. (float)
* timesys : Time system (should only ever be UTC). (str)
* diameters : Antenna diameters in meters. shape (Nants_telescope,) (float)
* extra_keywords : Dictionary of extra keywords to preserve on the object.
"""
layout_dict = readsav(layout_file, python_dict=True)
layout = layout_dict["layout"]
layout_fields = [name.lower() for name in layout.dtype.names]
# Try to get the telescope location from the layout file &
# compare it to the position from the obs structure.
arr_center = layout["array_center"][0]
layout_fields.remove("array_center")
xyz_telescope_frame = layout["coordinate_frame"][0].decode("utf8").lower()
layout_fields.remove("coordinate_frame")
if xyz_telescope_frame.strip() == "itrf":
# compare to lat/lon/alt
location_latlonalt = uvutils.XYZ_from_LatLonAlt(latitude, longitude, altitude)
latlonalt_arr_center = uvutils.LatLonAlt_from_XYZ(
arr_center, check_acceptability=run_check_acceptability
)
# check both lat/lon/alt and xyz because of subtle differences
# in tolerances
if _xyz_close(location_latlonalt, arr_center, loc_tols) or _latlonalt_close(
(latitude, longitude, altitude), latlonalt_arr_center, radian_tol, loc_tols
):
telescope_location = arr_center
else:
# values do not agree with each other to within the tolerances.
# this is a known issue with FHD runs on cotter uvfits
# files for the MWA
# compare with the known_telescopes values
telescope_obj = uvtel.get_telescope(telescope_name)
# start warning message
message = (
"Telescope location derived from obs lat/lon/alt "
"values does not match the location in the layout file."
)
if telescope_obj is not False:
message += " Using the value from known_telescopes."
telescope_location = telescope_obj.telescope_location
else:
message += (
" Telescope is not in known_telescopes. "
"Defaulting to using the obs derived values."
)
telescope_location = location_latlonalt
# issue warning
warnings.warn(message)
else:
telescope_location = uvutils.XYZ_from_LatLonAlt(latitude, longitude, altitude)
# The FHD positions derive directly from uvfits, so they are in the rotated
# ECEF frame and must be converted to ECEF
rot_ecef_positions = layout["antenna_coords"][0]
layout_fields.remove("antenna_coords")
# use the longitude from the layout file because that's how the antenna
# positions were calculated
latitude, longitude, altitude = uvutils.LatLonAlt_from_XYZ(
arr_center, check_acceptability=run_check_acceptability
)
antenna_positions = uvutils.ECEF_from_rotECEF(rot_ecef_positions, longitude)
antenna_names = [ant.decode("utf8") for ant in layout["antenna_names"][0].tolist()]
layout_fields.remove("antenna_names")
# make these 0-indexed (rather than one indexed)
antenna_numbers = layout["antenna_numbers"][0]
layout_fields.remove("antenna_numbers")
Nants_telescope = int(layout["n_antenna"][0])
layout_fields.remove("n_antenna")
if telescope_name.lower() == "mwa":
# check that obs.baseline_info.tile_names match the antenna names
# (accounting for possible differences in white space)
# this only applies for MWA because the tile_names come from
# metafits files. layout["antenna_names"] comes from the antenna table
# in the uvfits file and will be used if no metafits was submitted
if [ant.strip() for ant in obs_tile_names] != [
ant.strip() for ant in antenna_names
]:
warnings.warn(
"tile_names from obs structure does not match "
"antenna_names from layout"
)
gst0 = float(layout["gst0"][0])
layout_fields.remove("gst0")
if layout["ref_date"][0] != "":
rdate = layout["ref_date"][0].decode("utf8").lower()
else:
rdate = None
layout_fields.remove("ref_date")
earth_omega = float(layout["earth_degpd"][0])
layout_fields.remove("earth_degpd")
dut1 = float(layout["dut1"][0])
layout_fields.remove("dut1")
timesys = layout["time_system"][0].decode("utf8").upper().strip()
layout_fields.remove("time_system")
if "diameters" in layout_fields:
diameters = np.asarray(layout["diameters"])
layout_fields.remove("diameters")
else:
diameters = None
extra_keywords = {}
# ignore some fields, put everything else in extra_keywords
layout_fields_ignore = [
"diff_utc",
"pol_type",
"n_pol_cal_params",
"mount_type",
"axis_offset",
"pola",
"pola_orientation",
"pola_cal_params",
"polb",
"polb_orientation",
"polb_cal_params",
"beam_fwhm",
]
for field in layout_fields_ignore:
if field in layout_fields:
layout_fields.remove(field)
for field in layout_fields:
keyword = field
if len(keyword) > 8:
keyword = field.replace("_", "")
value = layout[field][0]
if isinstance(value, bytes):
value = value.decode("utf8")
extra_keywords[keyword.upper()] = value
layout_param_dict = {
"telescope_location": telescope_location,
"Nants_telescope": Nants_telescope,
"antenna_positions": antenna_positions,
"antenna_names": antenna_names,
"antenna_numbers": antenna_numbers,
"gst0": gst0,
"rdate": rdate,
"earth_omega": earth_omega,
"dut1": dut1,
"timesys": timesys,
"diameters": diameters,
"extra_keywords": extra_keywords,
}
return layout_param_dict
[docs]class FHD(UVData):
"""
Defines a FHD-specific subclass of UVData for reading FHD save files.
This class should not be interacted with directly, instead use the read_fhd
method on the UVData class.
"""
[docs] @copy_replace_short_description(UVData.read_fhd, style=DocstringStyle.NUMPYDOC)
def read_fhd(
self,
filelist,
use_model=False,
background_lsts=True,
read_data=True,
run_check=True,
check_extra=True,
run_check_acceptability=True,
strict_uvw_antpos_check=False,
check_autos=True,
fix_autos=True,
use_future_array_shapes=False,
astrometry_library=None,
):
"""Read in data from a list of FHD files."""
datafiles = {}
params_file = None
obs_file = None
flags_file = None
layout_file = None
settings_file = None
if use_model:
data_name = "_vis_model_"
else:
data_name = "_vis_"
for filename in filelist:
# update filelist
basename = os.path.basename(filename)
self.filename = uvutils._combine_filenames(self.filename, [basename])
self._filename.form = (len(self.filename),)
if filename.lower().endswith(data_name + "xx.sav"):
if "xx" in list(datafiles.keys()):
raise ValueError("multiple xx datafiles in filelist")
datafiles["xx"] = filename
elif filename.lower().endswith(data_name + "yy.sav"):
if "yy" in list(datafiles.keys()):
raise ValueError("multiple yy datafiles in filelist")
datafiles["yy"] = filename
elif filename.lower().endswith(data_name + "xy.sav"):
if "xy" in list(datafiles.keys()):
raise ValueError("multiple xy datafiles in filelist")
datafiles["xy"] = filename
elif filename.lower().endswith(data_name + "yx.sav"):
if "yx" in list(datafiles.keys()):
raise ValueError("multiple yx datafiles in filelist")
datafiles["yx"] = filename
elif filename.lower().endswith("_params.sav"):
if params_file is not None:
raise ValueError("multiple params files in filelist")
params_file = filename
elif filename.lower().endswith("_obs.sav"):
if obs_file is not None:
raise ValueError("multiple obs files in filelist")
obs_file = filename
elif filename.lower().endswith("_flags.sav"):
if flags_file is not None:
raise ValueError("multiple flags files in filelist")
flags_file = filename
elif filename.lower().endswith("_layout.sav"):
if layout_file is not None:
raise ValueError("multiple layout files in filelist")
layout_file = filename
elif filename.lower().endswith("_settings.txt"):
if settings_file is not None:
raise ValueError("multiple settings files in filelist")
settings_file = filename
else:
# this is reached in tests but marked as uncovered because
# CPython's peephole optimizer replaces a jump to a continue
# with a jump to the top of the loop
continue # pragma: no cover
if len(datafiles) < 1 and read_data is True:
raise ValueError(
"No data files included in file list and read_data is True."
)
if obs_file is None and read_data is False:
raise ValueError(
"No obs file included in file list and read_data is False."
)
if params_file is None:
raise ValueError("No params file included in file list")
if flags_file is None:
raise ValueError("No flags file included in file list")
if layout_file is None:
warnings.warn(
"No layout file included in file list, "
"antenna_postions will not be defined "
"and antenna names and numbers might be incorrect."
)
if settings_file is None:
warnings.warn("No settings file included in file list")
if not read_data:
obs_dict = readsav(obs_file, python_dict=True)
this_obs = obs_dict["obs"]
self.Npols = int(this_obs[0]["N_POL"])
else:
# TODO: add checking to make sure params, flags and datafiles are
# consistent with each other
vis_data = {}
for pol, file in datafiles.items():
this_dict = readsav(file, python_dict=True)
if use_model:
vis_data[pol] = this_dict["vis_model_ptr"]
else:
vis_data[pol] = this_dict["vis_ptr"]
this_obs = this_dict["obs"]
self.Npols = len(list(vis_data.keys()))
obs = this_obs
bl_info = obs["BASELINE_INFO"][0]
astrometry = obs["ASTR"][0]
fhd_pol_list = []
for pol in obs["POL_NAMES"][0]:
fhd_pol_list.append(pol.decode("utf8").lower())
params_dict = readsav(params_file, python_dict=True)
params = params_dict["params"]
if read_data:
flag_file_dict = readsav(flags_file, python_dict=True)
# The name for this variable changed recently (July 2016). Test for both.
vis_weights_data = {}
if "flag_arr" in flag_file_dict:
weights_key = "flag_arr"
elif "vis_weights" in flag_file_dict:
weights_key = "vis_weights"
else:
raise ValueError(
"No recognized key for visibility weights in flags_file."
)
for index, w in enumerate(flag_file_dict[weights_key]):
vis_weights_data[fhd_pol_list[index]] = w
self.Ntimes = int(obs["N_TIME"][0])
self.Nbls = int(obs["NBASELINES"][0])
self.Nblts = params["UU"][0].size
self.Nfreqs = int(obs["N_FREQ"][0])
self.Nspws = 1
self.spw_array = np.array([0])
# Future proof: set the flex_spw_id_array.
self.flex_spw_id_array = np.zeros(self.Nfreqs, dtype=int)
self.vis_units = "Jy"
# bl_info.JDATE (a vector of length Ntimes) is the only safe date/time
# to use in FHD files.
# (obs.JD0 (float) and params.TIME (vector of length Nblts) are
# context dependent and are not safe
# because they depend on the phasing of the visibilities)
# the values in bl_info.JDATE are the JD for each integration.
# We need to expand up to Nblts.
int_times = list(uvutils._get_iterable(bl_info["JDATE"][0]))
bin_offset = bl_info["BIN_OFFSET"][0]
if self.Ntimes != len(int_times):
warnings.warn(
"Ntimes does not match the number of unique times in the data"
)
self.time_array = np.zeros(self.Nblts)
if self.Ntimes == 1:
self.time_array.fill(int_times[0])
else:
for ii in range(0, len(int_times)):
if ii < (len(int_times) - 1):
self.time_array[bin_offset[ii] : bin_offset[ii + 1]] = int_times[ii]
else:
self.time_array[bin_offset[ii] :] = int_times[ii]
# this is generated in FHD by subtracting the JD of neighboring
# integrations. This can have limited accuracy, so it can be slightly
# off the actual value.
# (e.g. 1.999426... rather than 2)
time_res = obs["TIME_RES"]
# time_res is constrained to be a scalar currently
self.integration_time = (
np.ones_like(self.time_array, dtype=np.float64) * time_res[0]
)
# # --- observation information ---
self.telescope_name = obs["INSTRUMENT"][0].decode("utf8")
# This is a bit of a kludge because nothing like a phase center name exists
# in FHD files.
# At least for the MWA, obs.ORIG_PHASERA and obs.ORIG_PHASEDEC specify
# the field the telescope was nominally pointing at
# (May need to be revisited, but probably isn't too important)
cat_name = (
"Field RA(deg): "
+ str(obs["ORIG_PHASERA"][0])
+ ", Dec:"
+ str(obs["ORIG_PHASEDEC"][0])
)
# For the MWA, this can sometimes be converted to EoR fields
if self.telescope_name.lower() == "mwa":
if np.isclose(obs["ORIG_PHASERA"][0], 0) and np.isclose(
obs["ORIG_PHASEDEC"][0], -27
):
cat_name = "EoR 0 Field"
self.instrument = self.telescope_name
latitude = np.deg2rad(float(obs["LAT"][0]))
longitude = np.deg2rad(float(obs["LON"][0]))
altitude = float(obs["ALT"][0])
# get the stuff FHD read from the antenna table (in layout file)
if layout_file is not None:
# in older FHD versions, incorrect tile names for the mwa
# might have been stored in bl_info, so pull the obs_tile_names
# and check them against the layout file
obs_tile_names = [
ant.decode("utf8") for ant in bl_info["TILE_NAMES"][0].tolist()
]
if self.telescope_name.lower() == "mwa" and obs_tile_names[0][0] != "T":
obs_tile_names = [
"Tile" + "0" * (3 - len(ant.strip())) + ant.strip()
for ant in obs_tile_names
]
layout_param_dict = get_fhd_layout_info(
layout_file,
self.telescope_name,
latitude,
longitude,
altitude,
uvutils.RADIAN_TOL,
self._telescope_location.tols,
obs_tile_names,
run_check_acceptability=True,
)
for key, value in layout_param_dict.items():
setattr(self, key, value)
else:
self.telescope_location_lat_lon_alt = (latitude, longitude, altitude)
# we don't have layout info, so go ahead and set the antenna_names,
# antenna_numbers and Nants_telescope from the baseline info struct.
self.antenna_names = [
ant.decode("utf8") for ant in bl_info["TILE_NAMES"][0].tolist()
]
self.antenna_numbers = np.array([int(ant) for ant in self.antenna_names])
if self.telescope_name.lower() == "mwa":
self.antenna_names = [
"Tile" + "0" * (3 - len(ant.strip())) + ant.strip()
for ant in self.antenna_names
]
self.Nants_telescope = len(self.antenna_names)
# need to make sure telescope location is defined properly before this call
proc = self.set_lsts_from_time_array(
background=background_lsts, astrometry_library=astrometry_library
)
if not np.isclose(obs["OBSRA"][0], obs["PHASERA"][0]) or not np.isclose(
obs["OBSDEC"][0], obs["PHASEDEC"][0]
):
warnings.warn(
"These visibilities may have been phased "
"improperly -- without changing the uvw locations"
)
cat_id = self._add_phase_center(
cat_name=cat_name,
cat_type="sidereal",
cat_lon=np.deg2rad(float(obs["OBSRA"][0])),
cat_lat=np.deg2rad(float(obs["OBSDEC"][0])),
cat_frame=astrometry["RADECSYS"][0].decode().lower(),
cat_epoch=astrometry["EQUINOX"][0],
)
self.phase_center_id_array = np.zeros(self.Nblts, dtype=int) + cat_id
# Note that FHD antenna arrays are 1-indexed so we subtract 1
# to get 0-indexed arrays
ind_1_array = bl_info["TILE_A"][0] - 1
ind_2_array = bl_info["TILE_B"][0] - 1
self.ant_1_array = self.antenna_numbers[ind_1_array]
self.ant_2_array = self.antenna_numbers[ind_2_array]
self.Nants_data = int(np.union1d(self.ant_1_array, self.ant_2_array).size)
self.baseline_array = self.antnums_to_baseline(
self.ant_1_array, self.ant_2_array
)
if self.Nbls != len(np.unique(self.baseline_array)):
warnings.warn(
"Nbls does not match the number of unique baselines in the data"
)
# TODO: Spw axis to be collapsed in future release
self.freq_array = np.zeros((1, len(bl_info["FREQ"][0])), dtype=np.float64)
self.freq_array[0, :] = bl_info["FREQ"][0]
self.channel_width = float(obs["FREQ_RES"][0])
# In FHD, uvws are in seconds not meters.
# FHD follows the FITS uvw direction convention, which is opposite
# ours and Miriad's.
# So conjugate the visibilities and flip the uvws:
self.uvw_array = np.zeros((self.Nblts, 3))
self.uvw_array[:, 0] = (-1) * params["UU"][0] * const.c.to("m/s").value
self.uvw_array[:, 1] = (-1) * params["VV"][0] * const.c.to("m/s").value
self.uvw_array[:, 2] = (-1) * params["WW"][0] * const.c.to("m/s").value
lin_pol_order = ["xx", "yy", "xy", "yx"]
linear_pol_dict = dict(zip(lin_pol_order, np.arange(5, 9) * -1))
pol_list = []
if read_data:
for pol in lin_pol_order:
if pol in vis_data:
pol_list.append(linear_pol_dict[pol])
self.polarization_array = np.asarray(pol_list)
else:
# Use Npols because for FHD, npol fully specifies which pols to use
pol_strings = lin_pol_order[: self.Npols]
self.polarization_array = np.asarray(
[linear_pol_dict[pol] for pol in pol_strings]
)
# history: add the first few lines from the settings file
if settings_file is not None:
self.history = get_fhd_history(settings_file)
else:
self.history = ""
if not uvutils._check_history_version(self.history, self.pyuvdata_version_str):
self.history += self.pyuvdata_version_str
if read_data:
# TODO: Spw axis to be collapsed in future release
self.data_array = np.zeros(
(self.Nblts, 1, self.Nfreqs, self.Npols), dtype=np.complex_
)
# TODO: Spw axis to be collapsed in future release
self.nsample_array = np.zeros(
(self.Nblts, 1, self.Nfreqs, self.Npols), dtype=np.float64
)
# TODO: Spw axis to be collapsed in future release
self.flag_array = np.zeros(
(self.Nblts, 1, self.Nfreqs, self.Npols), dtype=np.bool_
)
for pol, vis in vis_data.items():
pol_i = pol_list.index(linear_pol_dict[pol])
# FHD follows the FITS uvw direction convention, which is opposite
# ours and Miriad's.
# So conjugate the visibilities and flip the uvws:
self.data_array[:, 0, :, pol_i] = np.conj(vis)
self.flag_array[:, 0, :, pol_i] = vis_weights_data[pol] <= 0
self.nsample_array[:, 0, :, pol_i] = np.abs(vis_weights_data[pol])
# wait for LSTs if set in background
if proc is not None:
proc.join()
self._set_app_coords_helper()
if use_future_array_shapes:
self.use_future_array_shapes()
else:
warnings.warn(_future_array_shapes_warning, DeprecationWarning)
# check if object has all required uv_properties set
if run_check:
self.check(
check_extra=check_extra,
run_check_acceptability=run_check_acceptability,
strict_uvw_antpos_check=strict_uvw_antpos_check,
allow_flip_conj=True,
check_autos=check_autos,
fix_autos=fix_autos,
)