Source code for pyuvdata.uvcal.fhd_cal

# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Class for reading FHD calibration save files."""

import os
import warnings

import numpy as np
from docstring_parser import DocstringStyle
from scipy.io import readsav

from .. import utils as uvutils
from ..docstrings import copy_replace_short_description
from ..uvdata.fhd import get_fhd_history, get_fhd_layout_info
from .uvcal import UVCal, _future_array_shapes_warning

__all__ = ["FHDCal"]


[docs]class FHDCal(UVCal): """ Defines a FHD-specific subclass of UVCal for reading FHD calibration save files. This class should not be interacted with directly, instead use the read_fhd_cal method on the UVCal class. """
[docs] @copy_replace_short_description(UVCal.read_fhd_cal, style=DocstringStyle.NUMPYDOC) def read_fhd_cal( self, cal_file, obs_file, layout_file=None, settings_file=None, raw=True, read_data=True, background_lsts=True, extra_history=None, run_check=True, check_extra=True, run_check_acceptability=True, use_future_array_shapes=False, astrometry_library=None, ): """Read data from an FHD cal.sav file.""" if not read_data and settings_file is None: raise ValueError("A settings_file must be provided if read_data is False.") filelist = [cal_file, obs_file] if layout_file is not None: filelist.append(layout_file) if settings_file is not None: filelist.append(settings_file) for filename in filelist: # update filelist basename = os.path.basename(filename) self.filename = uvutils._combine_filenames(self.filename, [basename]) self._filename.form = (len(self.filename),) this_dict = readsav(obs_file, python_dict=True) obs_data = this_dict["obs"] bl_info = obs_data["BASELINE_INFO"][0] self.Nspws = 1 self.spw_array = np.array([0]) self.Nfreqs = int(obs_data["N_FREQ"][0]) self.freq_array = np.zeros((1, len(bl_info["FREQ"][0])), dtype=np.float64) self.freq_array[0, :] = bl_info["FREQ"][0] self.channel_width = float(obs_data["FREQ_RES"][0]) self.flex_spw_id_array = np.zeros(self.Nfreqs, dtype=int) # FHD only calculates one calibration over all the times. # obs_data.n_time /cal_data.n_times gives the number of times that goes into # that one calibration, UVCal.Ntimes gives the number of separate calibrations # along the time axis. self.Ntimes = 1 time_array = bl_info["jdate"][0] self.time_array = np.array([np.mean(time_array)]) # this is generated in FHD by subtracting the JD of neighboring # integrations. This can have limited accuracy, so it can be slightly # off the actual value. # (e.g. 1.999426... rather than 2) time_res = obs_data["TIME_RES"][0] # time_res is constrained to be a scalar currently self.integration_time = np.float64(time_res) # array of used frequencies (1: used, 0: flagged) freq_use = bl_info["freq_use"][0] # array of used antennas (1: used, 0: flagged) ant_use = bl_info["tile_use"][0] # array of used times (1: used, 0: flagged) time_use = bl_info["time_use"][0] time_array_use = time_array[np.where(time_use > 0)] self.time_range = np.asarray([np.min(time_array_use), np.max(time_array_use)]) self.telescope_name = obs_data["instrument"][0].decode("utf8") latitude = np.deg2rad(float(obs_data["LAT"][0])) longitude = np.deg2rad(float(obs_data["LON"][0])) altitude = float(obs_data["ALT"][0]) # get the stuff FHD read from the antenna table (in layout file) if layout_file is not None: obs_tile_names = [ ant.decode("utf8") for ant in bl_info["TILE_NAMES"][0].tolist() ] if self.telescope_name.lower() == "mwa": obs_tile_names = [ "Tile" + "0" * (3 - len(ant.strip())) + ant.strip() for ant in obs_tile_names ] layout_param_dict = get_fhd_layout_info( layout_file, self.telescope_name, latitude, longitude, altitude, self._lst_array.tols, self._telescope_location.tols, obs_tile_names, run_check_acceptability=True, ) layout_params_to_ignore = [ "gst0", "rdate", "earth_omega", "dut1", "timesys", "diameters", ] for key, value in layout_param_dict.items(): if key not in layout_params_to_ignore: setattr(self, key, value) else: warnings.warn( "No layout file, antenna_postions will not be defined " "and antenna_names might be incorrect." ) self.telescope_location_lat_lon_alt = (latitude, longitude, altitude) # FHD stores antenna numbers, not names, in the "TILE_NAMES" field self.antenna_names = [ ant.decode("utf8") for ant in bl_info["TILE_NAMES"][0].tolist() ] self.antenna_numbers = np.array([int(ant) for ant in self.antenna_names]) if self.telescope_name.lower() == "mwa": self.antenna_names = [ "Tile" + "0" * (3 - len(ant.strip())) + ant.strip() for ant in self.antenna_names ] self.Nants_telescope = len(self.antenna_names) self.antenna_names = np.asarray(self.antenna_names) try: self.set_telescope_params() except ValueError as ve: warnings.warn(str(ve)) # need to make sure telescope location is defined properly before this call if self.telescope_location is not None: proc = self.set_lsts_from_time_array( background=background_lsts, astrometry_library=astrometry_library ) self._set_sky() self.gain_convention = "divide" self.x_orientation = "east" self._set_gain() # currently don't have branch info. may change in future. self.git_origin_cal = "https://github.com/EoRImaging/FHD" self.git_hash_cal = obs_data["code_version"][0].decode("utf8") if "DELAYS" in obs_data.dtype.names: if obs_data["delays"][0] is not None: self.extra_keywords["delays"] = ( "[" + ", ".join(str(int(d)) for d in obs_data["delays"][0]) + "]" ) if settings_file is not None: self.history, self.observer = get_fhd_history( settings_file, return_user=True ) else: warnings.warn("No settings file, history will be incomplete") self.history = "" if extra_history is not None: if isinstance(extra_history, (list, tuple)): self.history += "\n" + "\n".join(extra_history) else: self.history += "\n" + extra_history if not uvutils._check_history_version(self.history, self.pyuvdata_version_str): if self.history.endswith("\n"): self.history += self.pyuvdata_version_str else: self.history += "\n" + self.pyuvdata_version_str if not read_data: n_pols = int(obs_data["N_POL"][0]) # FHD only has the diagonal elements (jxx, jyy), so limit to 2 self.Njones = int(np.min([n_pols, 2])) # for calibration FHD includes all antennas in the antenna table, # regardless of whether or not they have data self.Nants_data = len(self.antenna_names) # get details from settings file keywords = [ "ref_antenna_name", "catalog_name", "n_sources", "min_cal_baseline", "max_cal_baseline", "galaxy_model", "diffuse_model", "auto_scale", "n_vis_cal", "time_avg", "conv_thresh", ] if not raw: keywords += [ "polyfit", "bandpass", "mode_fit", "amp_degree", "phase_degree", ] settings_lines = {} with open(settings_file, "r") as read_obj: cal_start = False for line in read_obj: if not cal_start: if line.startswith("##CAL"): cal_start = True else: if line.startswith("##"): break # in cal structure section for kw in keywords: if line.strip().startswith(kw.upper()): settings_lines[kw] = line.split()[1:] self.ref_antenna_name = settings_lines["ref_antenna_name"][0] self.Nsources = int(settings_lines["n_sources"][0]) self.sky_catalog = settings_lines["catalog_name"][0] self.baseline_range = [ float(settings_lines["min_cal_baseline"][0]), float(settings_lines["max_cal_baseline"][0]), ] galaxy_model = int(settings_lines["galaxy_model"][0]) if len(settings_lines["diffuse_model"]) > 0: diffuse_model = settings_lines["diffuse_model"][0] else: diffuse_model = "" auto_scale = settings_lines["auto_scale"] n_vis_cal = np.int64(settings_lines["n_vis_cal"][0]) time_avg = int(settings_lines["time_avg"][0]) conv_thresh = float(settings_lines["conv_thresh"][0]) if not raw: polyfit = int(settings_lines["polyfit"][0]) bandpass = int(settings_lines["bandpass"][0]) mode_fit = settings_lines["mode_fit"] # for some reason, it's a float if it's one value, # and integers otherwise if len(mode_fit) == 1: mode_fit = float(mode_fit[0]) else: mode_fit = np.array(mode_fit, dtype=np.int64) amp_degree = int(settings_lines["amp_degree"][0]) phase_degree = int(settings_lines["phase_degree"][0]) else: this_dict = readsav(cal_file, python_dict=True) cal_data = this_dict["cal"] self.Njones = int(cal_data["n_pol"][0]) self.Nants_data = int(cal_data["n_tile"][0]) self.sky_catalog = cal_data["skymodel"][0]["catalog_name"][0].decode("utf8") self.ref_antenna_name = ( cal_data["ref_antenna_name"][0].decode("utf8").strip() ) self.Nsources = int(cal_data["skymodel"][0]["n_sources"][0]) self.baseline_range = [ float(cal_data["min_cal_baseline"][0]), float(cal_data["max_cal_baseline"][0]), ] galaxy_model = cal_data["skymodel"][0]["galaxy_model"][0] diffuse_model = cal_data["skymodel"][0]["diffuse_model"][0] auto_scale = cal_data["auto_scale"][0] n_vis_cal = cal_data["n_vis_cal"][0] time_avg = cal_data["time_avg"][0] conv_thresh = cal_data["conv_thresh"][0] if not raw: polyfit = cal_data["polyfit"][0] bandpass = cal_data["bandpass"][0] mode_fit = cal_data["mode_fit"][0] amp_degree = cal_data["amp_degree"][0] phase_degree = cal_data["phase_degree"][0] # Now read data like arrays fit_gain_array_in = cal_data["gain"][0] fit_gain_array = np.zeros( self._gain_array.expected_shape(self), dtype=np.complex128 ) for jones_i, arr in enumerate(fit_gain_array_in): fit_gain_array[:, 0, :, 0, jones_i] = arr if raw: res_gain_array_in = cal_data["gain_residual"][0] res_gain_array = np.zeros( self._gain_array.expected_shape(self), dtype=np.complex128 ) for jones_i, arr in enumerate(res_gain_array_in): res_gain_array[:, 0, :, 0, jones_i] = arr self.gain_array = fit_gain_array + res_gain_array else: self.gain_array = fit_gain_array # FHD doesn't really have a chi^2 measure. What is has is a convergence # measure. The solution converged well if this is less than the convergence # threshold ('conv_thresh' in extra_keywords). self.quality_array = np.zeros_like(self.gain_array, dtype=np.float64) convergence = cal_data["convergence"][0] for jones_i, arr in enumerate(convergence): self.quality_array[:, 0, :, 0, jones_i] = arr # Currently this can't include the times because the flag array # dimensions has to match the gain array dimensions. # This is somewhat artificial... self.flag_array = np.zeros_like(self.gain_array, dtype=np.bool_) flagged_ants = np.where(ant_use == 0)[0] for ant in flagged_ants: self.flag_array[ant, :] = 1 flagged_freqs = np.where(freq_use == 0)[0] for freq in flagged_freqs: self.flag_array[:, :, freq] = 1 if self.telescope_name.lower() == "mwa": self.ref_antenna_name = ( "Tile" + "0" * (3 - len(self.ref_antenna_name)) + self.ref_antenna_name ) # In Python 3, we sometimes get Unicode, sometimes bytes # doesn't reliably show up in tests though, so excluding it from coverage if isinstance(galaxy_model, bytes): # pragma: nocover galaxy_model = galaxy_model.decode("utf8") if galaxy_model == 0: galaxy_model = None else: galaxy_model = "gsm" if isinstance(diffuse_model, bytes): diffuse_model = diffuse_model.decode("utf8") if diffuse_model == "": diffuse_model = None else: diffuse_model = os.path.basename(diffuse_model) if galaxy_model is not None: if diffuse_model is not None: self.diffuse_model = galaxy_model + " + " + diffuse_model else: self.diffuse_model = galaxy_model elif diffuse_model is not None: self.diffuse_model = diffuse_model # FHD only has the diagonal elements (jxx, jyy) and if there's only one # present it must be jxx if self.Njones == 1: self.jones_array = np.array([-5]) else: self.jones_array = np.array([-5, -6]) # for calibration FHD creates gain array of shape (Nfreqs, Nants_telescope) # rather than (Nfreqs, Nants_data). This means the antenna array will # contain all antennas in the antenna table instead of only those # which had data in the original uvfits file self.ant_array = self.antenna_numbers self.extra_keywords["autoscal".upper()] = ( "[" + ", ".join(str(d) for d in auto_scale) + "]" ) self.extra_keywords["nvis_cal".upper()] = n_vis_cal self.extra_keywords["time_avg".upper()] = time_avg self.extra_keywords["cvgthres".upper()] = conv_thresh if not raw: self.extra_keywords["polyfit".upper()] = polyfit self.extra_keywords["bandpass".upper()] = bandpass if isinstance(mode_fit, (list, tuple, np.ndarray)): self.extra_keywords["mode_fit".upper()] = ( "[" + ", ".join(str(m) for m in mode_fit) + "]" ) else: self.extra_keywords["mode_fit".upper()] = mode_fit self.extra_keywords["amp_deg".upper()] = amp_degree self.extra_keywords["phse_deg".upper()] = phase_degree # wait for LSTs if set in background if proc is not None: proc.join() if use_future_array_shapes: self.use_future_array_shapes() else: warnings.warn(_future_array_shapes_warning, DeprecationWarning) if run_check: self.check( check_extra=check_extra, run_check_acceptability=run_check_acceptability )