Source code for pyuvdata.uvcal.fhd_cal

# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Class for reading FHD calibration save files."""

from __future__ import absolute_import, division, print_function

import os
import numpy as np
import warnings
from scipy.io.idl import readsav

from .uvcal import UVCal
from .. import utils as uvutils
from ..uvdata.fhd import get_fhd_history

__all__ = ["FHDCal"]


[docs]class FHDCal(UVCal): """ Defines a FHD-specific subclass of UVCal for reading FHD calibration save files. This class should not be interacted with directly, instead use the read_fhd_cal method on the UVCal class. """
[docs] def read_fhd_cal( self, cal_file, obs_file, settings_file=None, raw=True, extra_history=None, run_check=True, check_extra=True, run_check_acceptability=True, ): """ Read data from an FHD cal.sav file. Parameters ---------- cal_file : str The cal.sav file to read from. obs_file : str The obs.sav file to read from. settings_file : str, optional The settings_file to read from. Optional, but very useful for provenance. raw : bool Option to use the raw (per antenna, per frequency) solution or to use the fitted (polynomial over phase/amplitude) solution. Default is True (meaning use the raw solutions). extra_history : str or list of str, optional String(s) to add to the object's history parameter. run_check : bool Option to check for the existence and proper shapes of parameters after reading in the file. check_extra : bool Option to check optional parameters as well as required ones. run_check_acceptability : bool Option to check acceptable range of the values of parameters after reading in the file. """ this_dict = readsav(cal_file, python_dict=True) cal_data = this_dict["cal"] this_dict = readsav(obs_file, python_dict=True) obs_data = this_dict["obs"] self.Nspws = 1 self.spw_array = np.array([0]) self.Nfreqs = int(cal_data["n_freq"][0]) self.freq_array = np.zeros( (self.Nspws, len(cal_data["freq"][0])), dtype=np.float_ ) self.freq_array[0, :] = cal_data["freq"][0] self.channel_width = float(np.mean(np.diff(self.freq_array))) # FHD only calculates one calibration over all the times. # cal_data.n_times gives the number of times that goes into that one # calibration, UVCal.Ntimes gives the number of separate calibrations # along the time axis. self.Ntimes = 1 time_array = obs_data["baseline_info"][0]["jdate"][0] self.integration_time = np.round(np.mean(np.diff(time_array)) * 24 * 3600, 2) self.time_array = np.array([np.mean(time_array)]) self.Njones = int(cal_data["n_pol"][0]) # FHD only has the diagonal elements (jxx, jyy) and if there's only one # present it must be jxx if self.Njones == 1: self.jones_array = np.array([-5]) else: self.jones_array = np.array([-5, -6]) self.telescope_name = obs_data["instrument"][0].decode("utf8") self.Nants_data = int(cal_data["n_tile"][0]) self.Nants_telescope = int(cal_data["n_tile"][0]) self.antenna_names = np.array( [n.decode("utf8") for n in cal_data["tile_names"][0].tolist()] ) self.antenna_numbers = np.arange(self.Nants_telescope) self.ant_array = np.arange(self.Nants_data) self.set_sky() self.sky_field = "phase center (RA, Dec): ({ra}, {dec})".format( ra=obs_data["orig_phasera"][0], dec=obs_data["orig_phasedec"][0] ) self.sky_catalog = cal_data["skymodel"][0]["catalog_name"][0].decode("utf8") self.ref_antenna_name = cal_data["ref_antenna_name"][0].decode("utf8") self.Nsources = int(cal_data["skymodel"][0]["n_sources"][0]) self.baseline_range = [ float(cal_data["min_cal_baseline"][0]), float(cal_data["max_cal_baseline"][0]), ] galaxy_model = cal_data["skymodel"][0]["galaxy_model"][0] # In Python 3, we sometimes get Unicode, sometimes bytes if isinstance(galaxy_model, bytes): galaxy_model = galaxy_model.decode("utf8") if galaxy_model == 0: galaxy_model = None else: galaxy_model = "gsm" diffuse_model = cal_data["skymodel"][0]["diffuse_model"][0] if isinstance(diffuse_model, bytes): diffuse_model = diffuse_model.decode("utf8") if diffuse_model == "": diffuse_model = None else: diffuse_model = os.path.basename(diffuse_model) if galaxy_model is not None: if diffuse_model is not None: self.diffuse_model = galaxy_model + " + " + diffuse_model else: self.diffuse_model = galaxy_model elif diffuse_model is not None: self.diffuse_model = diffuse_model self.gain_convention = "divide" self.x_orientation = "east" self.set_gain() fit_gain_array_in = cal_data["gain"][0] fit_gain_array = np.zeros( self._gain_array.expected_shape(self), dtype=np.complex_ ) for jones_i, arr in enumerate(fit_gain_array_in): fit_gain_array[:, 0, :, 0, jones_i] = arr if raw: res_gain_array_in = cal_data["gain_residual"][0] res_gain_array = np.zeros( self._gain_array.expected_shape(self), dtype=np.complex_ ) for jones_i, arr in enumerate(res_gain_array_in): res_gain_array[:, 0, :, 0, jones_i] = arr self.gain_array = fit_gain_array + res_gain_array else: self.gain_array = fit_gain_array # FHD doesn't really have a chi^2 measure. What is has is a convergence measure. # The solution converged well if this is less than the convergence # threshold ('conv_thresh' in extra_keywords). self.quality_array = np.zeros_like(self.gain_array, dtype=np.float) convergence = cal_data["convergence"][0] for jones_i, arr in enumerate(convergence): self.quality_array[:, 0, :, 0, jones_i] = arr # array of used frequencies (1: used, 0: flagged) freq_use = obs_data["baseline_info"][0]["freq_use"][0] # array of used antennas (1: used, 0: flagged) ant_use = obs_data["baseline_info"][0]["tile_use"][0] # array of used times (1: used, 0: flagged) time_use = obs_data["baseline_info"][0]["time_use"][0] time_array_use = time_array[np.where(time_use > 0)] self.time_range = [np.min(time_array_use), np.max(time_array_use)] # Currently this can't include the times because the flag array # dimensions has to match the gain array dimensions. # This is somewhat artificial... self.flag_array = np.zeros_like(self.gain_array, dtype=np.bool) flagged_ants = np.where(ant_use == 0)[0] for ant in flagged_ants: self.flag_array[ant, :] = 1 flagged_freqs = np.where(freq_use == 0)[0] for freq in flagged_freqs: self.flag_array[:, :, freq] = 1 # currently don't have branch info. may change in future. self.git_origin_cal = "https://github.com/EoRImaging/FHD" self.git_hash_cal = obs_data["code_version"][0].decode("utf8") self.extra_keywords["autoscal"] = ( "[" + ", ".join(str(d) for d in cal_data["auto_scale"][0]) + "]" ) self.extra_keywords["nvis_cal"] = cal_data["n_vis_cal"][0] self.extra_keywords["time_avg"] = cal_data["time_avg"][0] self.extra_keywords["cvgthres"] = cal_data["conv_thresh"][0] if "DELAYS" in obs_data.dtype.names: if obs_data["delays"][0] is not None: self.extra_keywords["delays"] = ( "[" + ", ".join(str(int(d)) for d in obs_data["delays"][0]) + "]" ) if not raw: self.extra_keywords["polyfit"] = cal_data["polyfit"][0] self.extra_keywords["bandpass"] = cal_data["bandpass"][0] self.extra_keywords["mode_fit"] = cal_data["mode_fit"][0] self.extra_keywords["amp_deg"] = cal_data["amp_degree"][0] self.extra_keywords["phse_deg"] = cal_data["phase_degree"][0] if settings_file is not None: self.history, self.observer = get_fhd_history( settings_file, return_user=True ) else: warnings.warn("No settings file, history will be incomplete") self.history = "" if extra_history is not None: if isinstance(extra_history, (list, tuple)): self.history += "\n" + "\n".join(extra_history) else: self.history += "\n" + extra_history if not uvutils._check_history_version(self.history, self.pyuvdata_version_str): if self.history.endswith("\n"): self.history += self.pyuvdata_version_str else: self.history += "\n" + self.pyuvdata_version_str if run_check: self.check( check_extra=check_extra, run_check_acceptability=run_check_acceptability )