Source code for pyuvdata.uvcal.uvcal

# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Primary container for radio interferometer calibration solutions."""

import copy
import os
import threading
import warnings

import numpy as np
from docstring_parser import DocstringStyle

from .. import Telescope, parameter as uvp, utils
from ..docstrings import combine_docstrings, copy_replace_short_description
from ..uvbase import UVBase
from . import initializers

__all__ = ["UVCal"]


def _time_param_check(this, other):
    """Check if any time parameter is defined in this but not in other or vice-versa."""
    if not isinstance(other, list):
        other = [other]
    time_params = ["time_array", "time_range"]
    for param in time_params:
        if getattr(this, param) is not None:
            for obj in other:
                if getattr(obj, param) is None:
                    raise ValueError(
                        f"Some objects have a {param} while others do not. All "
                        f"objects must either have or not have a {param}."
                    )
        else:
            for obj in other:
                if getattr(obj, param) is not None:
                    raise ValueError(
                        f"Some objects have a {param} while others do not. All "
                        f"objects must either have or not have a {param}."
                    )


[docs]class UVCal(UVBase): """ A class defining calibration solutions for interferometric data. Attributes ---------- UVParameter objects : For full list see the documentation on ReadTheDocs: http://pyuvdata.readthedocs.io/en/latest/. Some are always required, some are required for certain cal_types and cal_styles and others are always optional. """ def __init__(self): self._Nfreqs = uvp.UVParameter( "Nfreqs", description="Number of frequency channels", expected_type=int ) self._Njones = uvp.UVParameter( "Njones", description="Number of Jones calibration " "parameters (Number of Jones matrix elements " "calculated in calibration).", expected_type=int, ) desc = ( "Number of times with different calibrations calculated " "(if a calibration is calculated over a range of integrations, " "this gives the number of separate calibrations along the time axis)." ) self._Ntimes = uvp.UVParameter("Ntimes", description=desc, expected_type=int) self._history = uvp.UVParameter( "history", description="String of history, units English", form="str", expected_type=str, ) self._Nspws = uvp.UVParameter( "Nspws", description="Number of spectral windows " "(ie non-contiguous spectral chunks). ", expected_type=int, ) self._telescope = uvp.UVParameter( "telescope", description=( ":class:`pyuvdata.Telescope` object containing the telescope metadata." ), expected_type=Telescope, ) desc = ( "Number of antennas that have data associated with them " "(i.e. length of ant_array), which may be smaller than the number" "of antennas in the telescope (i.e. length of antenna_numbers)." ) self._Nants_data = uvp.UVParameter( "Nants_data", description=desc, expected_type=int ) desc = ( "Array of integer antenna numbers that appear in self.gain_array," " with shape (Nants_data,). " "This array is ordered to match the inherent ordering of the zeroth" " axis of self.gain_array." ) self._ant_array = uvp.UVParameter( "ant_array", description=desc, expected_type=int, form=("Nants_data",) ) desc = ( "Option to support 'wide-band' calibration solutions with gains or delays " "that apply over a range of frequencies rather than having distinct values " "at each frequency. Delay type cal solutions are always 'wide-band'. If it " "is True several other parameters are affected: the data-like arrays have " "a spw axis that is Nspws long rather than a frequency axis that is Nfreqs " "long; the `freq_range` parameter is required and the `freq_array` and " "`channel_width` parameters should not be set." ) self._wide_band = uvp.UVParameter( "wide_band", description=desc, expected_type=bool, value=False ) self._spw_array = uvp.UVParameter( "spw_array", description="Array of spectral window numbers, shape (Nspws).", form=("Nspws",), expected_type=int, ) # this dimensionality of freq_array does not allow for different spws # to have different numbers of channels desc = ( "Array of frequencies, center of the channel, shape (Nfreqs,) , units Hz." "Should not be set if wide_band=True." ) self._freq_array = uvp.UVParameter( "freq_array", description=desc, form=("Nfreqs",), expected_type=float, tols=1e-3, # mHz ) desc = ( "Width of frequency channels (Hz). Array of shape (Nfreqs,), type = float." "Should not be set if wide_band=True." ) self._channel_width = uvp.UVParameter( "channel_width", description=desc, expected_type=float, form=("Nfreqs",), tols=1e-3, # 1 mHz ) desc = ( "Required if cal_type='delay' or wide_band=True. Frequency range that " "solutions are valid for, with [start_frequency, end_frequency] provided " "for each spectral window. Array of shape (Nspws, 2). Units are Hz." "Should not be set if cal_type='gain' and wide_band=False." ) self._freq_range = uvp.UVParameter( "freq_range", required=False, description=desc, form=("Nspws", 2), expected_type=float, tols=1e-3, ) desc = ( "Array of antenna polarization integers, shape (Njones). " "linear pols -5:-8 (jxx, jyy, jxy, jyx)." "circular pols -1:-4 (jrr, jll. jrl, jlr)." "unknown 0." ) self._jones_array = uvp.UVParameter( "jones_array", description=desc, expected_type=int, acceptable_vals=list(np.arange(-8, 1)), form=("Njones",), ) desc = ( "Array of calibration solution times, center of integration, shape " "(Ntimes), units Julian Date. Should only be set cal solutions were " "calculated per integration (rather than over a range of integrations). " "Only one of time_range and time_array should be set." ) self._time_array = uvp.UVParameter( "time_array", description=desc, form=("Ntimes",), expected_type=float, tols=1e-3 / (60.0 * 60.0 * 24.0), required=False, ) desc = ( "Time range (in JD) that cal solutions are valid for. This should be an " "array with shape (Ntimes, 2) where the second axis gives the start_time " "and end_time (in that order) in JD. Should only be set if the cal " "solutions apply over a range of times. Only one of time_range and " "time_array should be set." ) self._time_range = uvp.UVParameter( "time_range", description=desc, form=("Ntimes", 2), expected_type=float, tols=1e-3 / (60.0 * 60.0 * 24.0), required=False, ) desc = ( "Array of lsts, center of integration, shape (Ntimes), units radians. " "Should only be set cal solutions were calculated per integration (rather " "than over a range of integrations). Only one of lst_range and lst_array " "should be set." ) self._lst_array = uvp.UVParameter( "lst_array", description=desc, form=("Ntimes",), expected_type=float, tols=utils.RADIAN_TOL, required=False, ) desc = ( "LST range (in JD) that cal solutions are valid for. This should be an " "array with shape (Ntimes, 2) where the second axis gives the start_lst " "and end_lst (in that order) in radians. Should only be set if the cal " "solutions apply over a range of times. Only one of lst_range and " "lst_array should be set." ) self._lst_range = uvp.UVParameter( "lst_range", description=desc, form=("Ntimes", 2), expected_type=float, tols=utils.RADIAN_TOL, required=False, ) desc = ( "Integration time of a calibration solution, units seconds. Shape " "(Ntimes), type float. Should be the total integration time of the " "data that went into calculating the calibration solution (i.e. the " "visibility integration time for calibration solutions that are " "calculated per visibility integration, the sum of the integration " "times that go into a calibration solution that was calculated over " "a range of integration times)." ) self._integration_time = uvp.UVParameter( "integration_time", description=desc, expected_type=float, form=("Ntimes",), tols=1e-3, # 1ms ) desc = ( "The convention for applying the calibration solutions to data." 'Values are "divide" or "multiply", indicating that to calibrate ' "one should divide or multiply uncalibrated data by gains. " "Mathematically this indicates the alpha exponent in the equation: " "calibrated data = gain^alpha * uncalibrated data. A value of " '"divide" represents alpha=-1 and "multiply" represents alpha=1.' ) self._gain_convention = uvp.UVParameter( "gain_convention", form="str", expected_type=str, description=desc, acceptable_vals=["divide", "multiply"], ) desc = ( "Array of flags to be applied to calibrated data (logical OR " "of input and flag generated by calibration). True is flagged. " "Shape: (Nants_data, Nfreqs, Ntimes, Njones) if wide_band=False or " "(Nants_data, Nspws, Ntimes, Njones) if wide_band=True, type = bool." ) self._flag_array = uvp.UVParameter( "flag_array", description=desc, form=("Nants_data", "Nfreqs", "Ntimes", "Njones"), expected_type=bool, ) desc = ( "Array of qualities of calibration solutions." "Shape: (Nants_data, Nfreqs, Ntimes, Njones) if wide_band=False or " "(Nants_data, Nspws, Ntimes, Njones) if wide_band=True, type = float." ) self._quality_array = uvp.UVParameter( "quality_array", description=desc, form=("Nants_data", "Nfreqs", "Ntimes", "Njones"), expected_type=float, required=False, ) desc = ( "Orientation of the physical dipole corresponding to what is " 'labelled as the x polarization. Options are "east" ' '(indicating east/west orientation) and "north" (indicating ' "north/south orientation)" ) # --- cal_type parameters --- desc = "cal type parameter. Values are delay or gain." self._cal_type = uvp.UVParameter( "cal_type", form="str", expected_type=str, value="gain", description=desc, acceptable_vals=["delay", "gain"], ) desc = ( 'Required if cal_type = "gain". Array of gains, ' "shape: (Nants_data, Nfreqs, Ntimes, Njones) if wide_band=False, or " "(Nants_data, Nspws, Ntimes, Njones) if wide_band=True, " "type = complex float." ) self._gain_array = uvp.UVParameter( "gain_array", description=desc, required=False, form=("Nants_data", "Nfreqs", "Ntimes", "Njones"), expected_type=complex, ) desc = ( 'Required if cal_type = "delay". Array of delays with units of seconds. ' "Shape: (Nants_data, Nspws, Ntimes, Njones), type=float." ) self._delay_array = uvp.UVParameter( "delay_array", description=desc, required=False, form=("Nants_data", "Nspws", "Ntimes", "Njones"), expected_type=float, ) # --- flexible spectral window information --- desc = ( "Required for non-wide-band objects. Maps individual channels along the " "frequency axis to individual spectral windows, as listed in the " "spw_array. Shape (Nfreqs), type = int." ) self._flex_spw_id_array = uvp.UVParameter( "flex_spw_id_array", description=desc, form=("Nfreqs",), expected_type=int ) desc = ( "Optional parameter that allows for labeling individual spectral windows " "with different polarizations. If set, Njones must be set to 1 (i.e., only " "one Jones vector per spectral window allowed). Shape (Nspws), type = int." ) self._flex_jones_array = uvp.UVParameter( "flex_jones_array", description=desc, form=("Nspws",), expected_type=int, acceptable_vals=list(np.arange(-8, 1)), required=False, ) # --- cal_style parameters --- desc = "Style of calibration. Values are sky or redundant." self._cal_style = uvp.UVParameter( "cal_style", form="str", expected_type=str, description=desc, acceptable_vals=["sky", "redundant"], ) # --- sky based calibration parameters --- desc = 'Required if cal_style = "sky". Name of calibration catalog.' self._sky_catalog = uvp.UVParameter( "sky_catalog", form="str", required=False, expected_type=str, description=desc, ) desc = 'Required if cal_style = "sky". Phase reference antenna.' self._ref_antenna_name = uvp.UVParameter( "ref_antenna_name", form="str", required=False, expected_type=str, description=desc, ) desc = ( 'Optional argument, only used if cal_style = "sky". Allows one to specify' "a different phase reference antenna per time interval. Shape (Ntimes,), " "type=int." ) self._ref_antenna_array = uvp.UVParameter( "ref_antenna_array", description=desc, form=("Ntimes",), expected_type=int, required=False, ) desc = "Number of sources used." self._Nsources = uvp.UVParameter( "Nsources", required=False, expected_type=int, description=desc ) desc = "Range of baselines used for calibration." self._baseline_range = uvp.UVParameter( "baseline_range", form=2, required=False, expected_type=float, description=desc, ) desc = "Name of diffuse model." self._diffuse_model = uvp.UVParameter( "diffuse_model", form="str", required=False, expected_type=str, description=desc, ) desc = ( "Specifies the number of phase centers contained within the calibration " "solution catalog." ) self._Nphase = uvp.UVParameter( "Nphase", required=False, expected_type=int, description=desc ) desc = ( "Optional parameter, similar to the UVData parameter of the same name. " "Dictionary that acts as a catalog, containing information on individual " "phase centers. Keys are the catalog IDs of the different phase centers in " "the object (matched to the parameter ``phase_center_id_array``). At a " "minimum, each dictionary must contain the keys: " "'cat_name' giving the phase center name (this does not have to be unique, " "non-unique values can be used to indicate sets of phase centers that make " "up a mosaic observation), " "'cat_type', which can be 'sidereal' (fixed position in RA/Dec), 'ephem' " "(position in RA/Dec which moves with time), 'driftscan' (fixed postion in " "Az/El, NOT the same as the old ``phase_type`` = 'drift') or 'unprojected' " "(baseline coordinates in ENU, but data are not phased, similar to " "the old ``phase_type`` = 'drift') " "'cat_lon' (longitude coord, e.g. RA, either a single value or a one " "dimensional array of length Npts --the number of ephemeris data points-- " "for ephem type phase centers), " "'cat_lat' (latitude coord, e.g. Dec., either a single value or a one " "dimensional array of length Npts --the number of ephemeris data points-- " "for ephem type phase centers), " "'cat_frame' (coordinate frame, e.g. icrs, must be a frame supported by " "astropy). " "Other optional keys include " "'cat_epoch' (epoch and equinox of the coordinate frame, not needed for " "frames without an epoch (e.g. ICRS) unless the there is proper motion), " "'cat_times' (times for the coordinates, only used for 'ephem' types), " "'cat_pm_ra' (proper motion in RA), " "'cat_pm_dec' (proper motion in Dec), " "'cat_dist' (physical distance to the source in parsec, useful if parallax " "is important, either a single value or a one dimensional array of length " "Npts --the number of ephemeris data points-- for ephem type phase " "centers.), " "'cat_vrad' (rest frame velocity in km/s, either a single value or a one " "dimensional array of length Npts --the number of ephemeris data points-- " "for ephem type phase centers.), and " "'info_source' (describes where catalog info came from). " "Most typically used with MS calibration tables." ) self._phase_center_catalog = uvp.UVParameter( "phase_center_catalog", description=desc, expected_type=dict, required=False ) desc = ( "Optional parameter, similar to the UVData parameter of the same name. " "Maps individual indices along the Ntimes axis to a key in " "`phase_center_catalog`, which maps to a dict containing the other " "metadata for each phase center. Used to specify where the data were " "phased to when calibration tables were derived. Most typically used when " "reading MS calibration tables. Shape (Nblts), type = int." ) self._phase_center_id_array = uvp.UVParameter( "phase_center_id_array", description=desc, form=("Ntimes",), expected_type=int, required=False, ) # --- truly optional parameters --- desc = ( "The gain scale of the calibration, which indicates the units of the " "calibrated visibilities. For example, Jy or K str." ) self._gain_scale = uvp.UVParameter( "gain_scale", form="str", expected_type=str, description=desc, required=False, ) desc = "Origin (on github for e.g) of calibration software. Url and branch." self._git_origin_cal = uvp.UVParameter( "git_origin_cal", form="str", expected_type=str, description=desc, required=False, ) desc = ( "Commit hash of calibration software (from git_origin_cal) used " "to generate solutions." ) self._git_hash_cal = uvp.UVParameter( "git_hash_cal", form="str", expected_type=str, description=desc, required=False, ) desc = "Name of observer who calculated solutions in this file." self._observer = uvp.UVParameter( "observer", form="str", description=desc, expected_type=str, required=False ) desc = ( "Array of qualities of the calibration for entire arrays. " "The shape depends on cal_type, if the cal_type is 'gain', " "the shape is: (Nfreqs, Ntimes, Njones) if wide_band=False, " "or (Nspws, Ntimes, Njones) if wide_band=True. If the cal_type is 'delay', " "the shape is (Nspws, Ntimes, Njones), type = float." ) self._total_quality_array = uvp.UVParameter( "total_quality_array", description=desc, form=("Nfreqs", "Ntimes", "Njones"), expected_type=float, required=False, ) desc = ( "Any user supplied extra keywords, type=dict. Keys should be " "8 character or less strings if writing to calfits files. " "Use the special key 'comment' for long multi-line string comments." ) self._extra_keywords = uvp.UVParameter( "extra_keywords", required=False, description=desc, value={}, spoof_val={}, expected_type=dict, ) desc = ( "List of strings containing the unique basenames (not the full path) of " "input files." ) self._filename = uvp.UVParameter( "filename", required=False, description=desc, expected_type=str ) desc = ( "Optional when reading an MS cal table. Retains the scan number when " "reading a measurement set. Shape (Nblts), type = int." ) self._scan_number_array = uvp.UVParameter( "scan_number_array", description=desc, form=("Ntimes",), expected_type=int, required=False, ) desc = ( "The convention for how instrumental polarizations (e.g. XX and YY) " "are converted to Stokes parameters. Options are 'sum' and 'avg', " "corresponding to I=XX+YY and I=(XX+YY)/2 (for linear instrumental " "polarizations) respectively. This parameter is not required, for " "backwards-compatibility reasons, but is highly recommended. If " "pol_convention is set, gain_scale should also be set." ) self._pol_convention = uvp.UVParameter( "pol_convention", required=False, description=desc, form="str", spoof_val="avg", acceptable_vals=["sum", "avg"], ) super().__init__() # Assign attributes to UVParameters after initialization, since UVBase.__init__ # will link the properties to the underlying UVParameter.value attributes # initialize the telescope object self.telescope = Telescope() # set the appropriate telescope attributes as required self._set_telescope_requirements() def _set_telescope_requirements(self): """Set the UVParameter required fields appropriately for UVCal.""" self.telescope._instrument.required = False self.telescope._feed_array.required = True self.telescope._feed_angle.required = True self.telescope._mount_type.required = True
[docs] @staticmethod @combine_docstrings(initializers.new_uvcal, style=DocstringStyle.NUMPYDOC) def new(**kwargs): """ Create a new UVCal object. All parameters are passed through to the :func:`~pyuvdata.uvcal.initializers.new_uvcal` function. Returns ------- UVCal A new UVCal object. """ return initializers.new_uvcal(**kwargs)
def _set_wide_band(self, wide_band=True): """ Set the wide_band parameter and adjust required parameters. This method should not be called directly by users; instead it is called by the file-reading methods to indicate that an object is a wide-band calibration solution which supports gain or delay values per spectral window. """ if wide_band: if self.flex_spw_id_array is not None and not np.array_equal( self.flex_spw_id_array, self.spw_array ): # pragma: no cover raise RuntimeError( "Something went wrong in UVCal._set_wide_band. Please " "file an issue in our GitHub issue log so that we can help: " "https://github.com/RadioAstronomySoftwareGroup/pyuvdata/issues." " Developer info: flex_spw_id_array is set but != spw_array." ) elif self.cal_type == "delay": # pragma: no cover raise RuntimeError( "delay objects cannot have wide_band=False. This is a bug, " "please make an issue." ) self.wide_band = wide_band if wide_band: self._freq_array.required = False self._channel_width.required = False self._flex_spw_id_array.required = False self._freq_range.required = True self.flex_spw_id_array = None data_shape_params = [ "gain_array", "delay_array", "flag_array", "quality_array", ] data_form = ("Nants_data", "Nspws", "Ntimes", "Njones") tot_qual_form = ("Nspws", "Ntimes", "Njones") for param_name in self._data_params: if param_name in data_shape_params: getattr(self, "_" + param_name).form = data_form elif param_name == "total_quality_array": getattr(self, "_" + param_name).form = tot_qual_form else: self._freq_array.required = True self._channel_width.required = True self._flex_spw_id_array.required = True self._freq_range.required = False self._flex_spw_id_array.required = True # can only get here if not a delay solution data_shape_params = ["gain_array", "flag_array", "quality_array"] data_form = ("Nants_data", "Nfreqs", "Ntimes", "Njones") tot_qual_form = ("Nfreqs", "Ntimes", "Njones") for param_name in self._data_params: if param_name in data_shape_params: getattr(self, "_" + param_name).form = data_form elif param_name == "total_quality_array": getattr(self, "_" + param_name).form = tot_qual_form def _set_gain(self): """Set cal_type to 'gain' and adjust required parameters.""" self.cal_type = "gain" self._gain_array.required = True self._delay_array.required = False self._freq_range.required = self.wide_band self._freq_array.required = not self.wide_band self._channel_width.required = not self.wide_band self._quality_array.form = self._gain_array.form self._total_quality_array.form = self._gain_array.form[1:] def _set_delay(self): """Set cal_type to 'delay' and adjust required parameters.""" self.cal_type = "delay" self._gain_array.required = False self._delay_array.required = True self._freq_range.required = True self._freq_array.required = False self._channel_width.required = False self._quality_array.form = self._delay_array.form self._total_quality_array.form = self._delay_array.form[1:] self._set_wide_band() def _set_sky(self): """Set cal_style to 'sky' and adjust required parameters.""" self.cal_style = "sky" self._sky_catalog.required = True self._ref_antenna_name.required = True def _set_redundant(self): """Set cal_style to 'redundant' and adjust required parameters.""" self.cal_style = "redundant" self._sky_catalog.required = False self._ref_antenna_name.required = False @property def _data_params(self): """List of strings giving the data-like parameters.""" return [ "gain_array", "delay_array", "flag_array", "quality_array", "total_quality_array", ] @property def _required_data_params(self): """List of strings giving the required data-like parameters.""" cal_type = self._cal_type.value if cal_type is None: raise ValueError( "Cannot get required data params because cal_type is not set." ) if cal_type == "gain": return ["gain_array", "flag_array"] elif cal_type == "delay": return ["delay_array", "flag_array"] else: return ["flag_array"] @property def data_like_parameters(self): """Iterate defined parameters which are data-like (not metadata-like).""" for key in self._data_params: if hasattr(self, key): yield getattr(self, key) @property def metadata_only(self): """ Property that determines whether this is a metadata only object. An object is metadata only if data_array, nsample_array and flag_array are all None. """ metadata_only = all(d is None for d in self.data_like_parameters) for param_name in self._required_data_params: getattr(self, "_" + param_name).required = not metadata_only return metadata_only
[docs] def remove_flex_jones(self, *, combine_spws=True): """ Convert a flex-Jones UVCal object into one with a standard Jones axis. This will convert a flexible-Jones dataset into one with standard polarization handling, which is required for some operations or writing in certain filetypes. Note that depending on how it is used, this can inflate the size of data-like parameters by up to a factor of Nspws (the true value depends on the number of unique entries in `flex_jones_array`). Parameters ---------- combine_spws : bool If set to True, the method will attempt to recombine multiple windows carrying different Jones vectors/information into a single (multi-Jones) spectral window. Functionally, this is the inverse of what is done in the `convert_to_flex_jones` method. If set to False, the method will effectively "inflate" the jones-axis of UVCal parameters such that all windows have the same Jones codes (though the added entries will be flagged and will carry no calibration information). Default is True. """ if self.flex_jones_array is None: # There isn't anything to do, so just move along return jones_array = list(reversed(np.unique(self.flex_jones_array))) n_jones = len(jones_array) if n_jones == 1 and (self.Nspws == 1 or not combine_spws): # Just remove the flex_jones_array and fix the polarization array self.jones_array = np.array(jones_array) self.flex_jones_array = None return if combine_spws: # check to see if there are spectral windows that have matching freq_array # and channel_width (up to sorting). If so, they need to be combined. if self.wide_band: freq_array_check = self.freq_range chan_width_check = np.zeros_like(freq_array_check) index_check = self.spw_array ftol = self._freq_range.tols ctol = [0, 0] else: freq_array_check = self.freq_array chan_width_check = self.channel_width index_check = self.flex_spw_id_array ftol = self._freq_array.tols ctol = self._channel_width.tols # Build the main spw map, along with some dicts to save intermediate results # from frequencies and Jones vectors spw_map = {} spw_freqs = {} spw_jones_tally = {} for jones1, spw1 in zip(self.flex_jones_array, self.spw_array, strict=True): freqs1 = freq_array_check[index_check == spw1] chwidth1 = chan_width_check[index_check == spw1] # Default to spw1 at the start, so if no match it will correctly link # back to itself. spw_match = spw1 for spw2, (freqs2, chwidth2) in spw_freqs.items(): if np.allclose( freqs1, freqs2, rtol=ftol[0], atol=ftol[1] ) and np.allclose(chwidth1, chwidth2, rtol=ctol[0], atol=ctol[1]): spw_match = spw2 break # If spw is its own match, save freqs and make an entry in jones dict if spw1 == spw_match: spw_freqs[spw1] = (freqs1, chwidth1) spw_jones_tally[spw1] = [] # Save matching spw, record which Jones vector this window has spw_map[spw1] = spw_match spw_jones_tally[spw_match].append(jones1) # Once all windows are processed, look for failures for jones_subarr in spw_jones_tally.values(): if len(np.unique(jones_subarr)) != len(jones_subarr): raise ValueError( "Some spectral windows have identical frequencies, " "channel widths and polarizations, so spws cannot be " "combined. Set combine_spws=False to avoid this error." ) elif len(np.unique(jones_subarr)) != len(jones_array): warnings.warn( "combine_spws is True but there are not matched spws for all " "polarizations, so spws will not be combined." ) combine_spws = False break if not combine_spws: # If not combining spws, then map everything back to itself index_check = self.spw_array if self.wide_band else self.flex_spw_id_array spw_map = dict(zip(self.spw_array, self.spw_array, strict=True)) # Reverse mapping map for various metadata values rev_map = {value: key for key, value in spw_map.items()} spw_array = sorted(rev_map) if self.flex_spw_id_array is not None: freq_array = np.concatenate( [self.freq_array[self.flex_spw_id_array == spw] for spw in spw_array] ) chan_width = np.concatenate( [self.channel_width[self.flex_spw_id_array == spw] for spw in spw_array] ) flex_spw_id_array = np.concatenate( [ self.flex_spw_id_array[self.flex_spw_id_array == spw] for spw in spw_array ] ) freq_range = None n_freqs = len(flex_spw_id_array) else: old_spw_arr = list(self.spw_array) freq_array = chan_width = None flex_spw_id_array = None freq_range = np.concatenate( [self.freq_range[old_spw_arr.index(spw)] for spw in spw_array] ).reshape((-1, 2)) n_freqs = 1 # Update metadata attributes self.spw_array = np.array(spw_array) self.flex_spw_id_array = flex_spw_id_array self.freq_array = freq_array self.channel_width = chan_width self.freq_range = freq_range self.jones_array = np.array(jones_array) # Adjust the length-related attributes self.Nspws = len(spw_array) self.Nfreqs = n_freqs self.Njones = n_jones # Finally, update data attrs if they exist if not self.metadata_only: freq_index = self.spw_array if self.wide_band else self.flex_spw_id_array for name, param in zip( self._data_params, self.data_like_parameters, strict=True ): if param is None: continue # Update the parameter shape with new_shape = list(param.shape) new_shape[1] = self.Nspws if self.wide_band else self.Nfreqs new_shape[3] = self.Njones new_param = np.full(new_shape, name == "flag_array", dtype=param.dtype) for idx, old_spw in enumerate(spw_map): new_fidx = freq_index == spw_map[old_spw] # index_check is recycled from the original combine_windows check # above (and marks the original indexing positions of the object) old_fidx = index_check == old_spw jones_idx = jones_array.index(self.flex_jones_array[idx]) new_param[:, new_fidx, :, jones_idx] = param[:, old_fidx, :, 0] setattr(self, name, new_param) # Finally, drop flex_jones now that we no longer need it's info self.flex_jones_array = None
def _make_flex_jones(self): """ Convert a regular UVCal object into one with flex-Jones enabled. This is an internal helper function, which is not designed to be called by users, but rather individual read/write functions for the UVCal object. This will convert a regular UVCal object into one that uses flexible Jones arrays, which allows for each spectral window to have its own unique Jones vector/code, useful for storing data more compactly when different windows have different Jones values recorded. Note that at this time, only one Jones code per-spw is allowed -- if more than one Jones code is found to have unflagged data in a given spectral window, then an error is returned. """ if self.metadata_only: raise ValueError( "Cannot make a metadata_only UVCal object flex-Jones because flagging " "info is required. Consider using `convert_to_flex_jones` instead, but " "be aware that the behavior is somewhat different" ) if self.Njones == 1: # This is basically a no-op, fix the relevant attributes and exit if self.flex_jones_array is None: self.flex_jones_array = np.full(self.Nspws, self.jones_array[0]) self.jones_array = np.array([0]) return jones_idx_arr = np.full(self.Nspws, -1) spw_id_arr = self.spw_array if self.wide_band else self.flex_spw_id_array for idx, spw in enumerate(self.spw_array): spw_screen = spw_id_arr == spw # For each window, we want to check that there is only one polarization with # any unflagged data, which we can do by seeing if not all of the flags # are set across the non-polarization axes (hence the ~np.all()). jones_check = ~np.all(self.flag_array[:, spw_screen], axis=(0, 1, 2)) if sum(jones_check) > 1: raise ValueError( "Cannot make a flex-pol UVCal object, as some windows have " "unflagged data in multiple Jones codes." ) elif np.any(jones_check): jones_idx_arr[idx] = np.where(jones_check)[0][0] # If one window was all flagged out, but the others all belong to the same pol, # assume we just want that Jones code. if len(np.unique(jones_idx_arr[jones_idx_arr >= 0])) == 1: jones_idx_arr[:] = np.unique(jones_idx_arr[jones_idx_arr >= 0]) # Now that we have Jones values sorted out, update metadata attributes self.flex_jones_array = self.jones_array[jones_idx_arr] self.jones_array = np.array([0]) self.Njones = 1 # Now go through one-by-one with data-like parameters and update for name, param in zip( self._data_params, self.data_like_parameters, strict=True ): # Grab the shape and update the Jones (last) axis to be length 1. Note we # do it this way since total_quality_array has a different shape if param is None: continue new_shape = list(param.shape) new_shape[-1] = 1 # We can use empty here, since we know that we will be filling all values new_param = np.empty(new_shape, dtype=param.dtype) for spw, jones_idx in zip(self.spw_array, jones_idx_arr, strict=True): # Process each window individually, since jones code can vary spw_screen = spw_id_arr == spw # Grab from the right Jones-position and plug values in new_param[:, spw_screen, :, 0] = param[:, spw_screen, :, jones_idx] # Update the attribute with the new values setattr(self, name, new_param)
[docs] def convert_to_flex_jones(self): """ Convert a regular UVCal object into a flex-Jones object. This effectively combines the frequency and polarization axis with polarization changing slowest. Saving data to uvh5 files this way can speed up some kinds of data access. """ if self.flex_jones_array is not None: raise ValueError("This is already a flex-pol object") if self.Njones == 1: # This is basically a no-op, fix the relevant attributes and exit self.flex_jones_array = np.full(self.Nspws, self.jones_array[0]) self.jones_array = np.array([0]) return old_nspws = self.Nspws * self.Njones njones = self.Njones new_nspws = self.Nspws * (self.Njones - 1) new_spw_ids = list(set(range(1, old_nspws + 1)).difference(self.spw_array)) spw_array = np.concatenate((self.spw_array, new_spw_ids[:new_nspws])) flex_spw_id_arr = None if not self.wide_band: spw_reshape = spw_array.reshape((self.Njones, -1)) flex_spw_id_arr = np.repeat( self.flex_spw_id_array[np.newaxis], self.Njones, axis=0 ) for idx in range(1, self.Njones): spw_map = dict(zip(spw_reshape[0], spw_reshape[idx], strict=True)) flex_spw_id_arr[idx] = [spw_map[jdx] for jdx in flex_spw_id_arr[idx]] flex_spw_id_arr = flex_spw_id_arr.flatten() self.flex_jones_array = np.repeat(self.jones_array, self.Nspws, axis=0) self.jones_array = np.array([0]) # Update metadata attributes self.spw_array = spw_array self.flex_spw_id_array = flex_spw_id_arr # If defined, set the frequency-related attributes if self.freq_array is not None: self.freq_array = np.tile(self.freq_array, njones) if self.freq_range is not None: self.freq_range = np.tile(self.freq_range, (njones, 1)) if self.channel_width is not None: self.channel_width = np.tile(self.channel_width, njones) # Adjust the length-related attributes self.Nspws *= self.Njones self.Nfreqs *= 1 if self.wide_band else self.Njones self.Njones = 1 # Finally, if we have it, update the metadata. if not self.metadata_only: for name, param in zip( self._data_params, self.data_like_parameters, strict=True ): if param is None: continue # Make into a list so that its mutable old_shape = param.shape # ("Nants_data", "Nfreqs", "Ntimes", "Njones"), # Extend the second axis by njones, shrink the last axis to 1 new_shape = [old_shape[0], old_shape[1] * njones, old_shape[2], 1] param = np.transpose(param, (0, 3, 1, 2)).reshape(new_shape) setattr(self, name, param)
[docs] def set_telescope_params( self, *, x_orientation=None, mount_type=None, overwrite=False, warn=None, run_check=True, check_extra=True, run_check_acceptability=True, ): """ Set telescope related parameters. If the telescope_name is in the known_telescopes, set the telescope location to the value for the known telescope. Also set the antenna positions if they are not set on the object and are available for the telescope. Parameters ---------- x_orientation : str or None String describing how the x-orientation is oriented. Must be either "north"/ "n"/"ns" (x-polarization of antenna has a position angle of 0 degrees with respect to zenith/north) or "east"/"e"/"ew" (x-polarization of antenna has a position angle of 90 degrees with respect to zenith/north). Ignored if "x_orientation" is relevant entry for the known telescope, or if set to None. mount_type : str or None String describing the mount amount type, which describes the optics. Supported options include: "alt-az" (primary rotates in azimuth and elevation), "equatorial" (primary rotates in hour angle and declination), "orbiting" (antenna is in motion, and its orientation depends on orbital parameters), "x-y" (primary rotates first in the plane connecting east, west, and zenith, and then perpendicular to that plane), "alt-az+nasmyth-r" ("alt-az" mount with a right-handed 90-degree tertiary mirror), "alt-az+nasmyth-l" ("alt-az" mount with a left-handed 90-degree tertiary mirror), "phased" (antenna is "electronically steered" by summing the voltages of multiple elements, e.g. MWA), "fixed" (antenna beam pattern is fixed in azimuth and elevation, e.g., HERA), and "other" (also referred to in some formats as "bizarre"). See the "Conventions" page of the documentation for further details. overwrite : bool Option to overwrite existing telescope-associated parameters with the values from the known telescope. Default is False. warn : bool Option to issue a warning listing all modified parameters. Default is True if `overwrite=True`, otherwise False. run_check : bool Option to check for the existence and proper shapes of parameters after updating. Default is True. check_extra : bool Option to check optional parameters as well as required ones. Default is True. run_check_acceptability : bool Option to check acceptable range of the values of parameters after updating. Default is True Raises ------ ValueError if the telescope_name is not in known telescopes """ self.telescope.update_params_from_known_telescopes( overwrite=overwrite, warn=overwrite if warn is None else warn, run_check=run_check, check_extra=check_extra, run_check_acceptability=run_check_acceptability, polarization_array=self.jones_array, flex_polarization_array=self.flex_jones_array, x_orientation=x_orientation, mount_type=mount_type, override_known_params=False, )
def _set_lsts_helper(self, *, astrometry_library=None): if self.time_array is not None: self.lst_array = utils.get_lst_for_time( jd_array=self.time_array, telescope_loc=self.telescope.location ) if self.time_range is not None: self.lst_range = utils.get_lst_for_time( jd_array=self.time_range, telescope_loc=self.telescope.location ) return
[docs] def set_lsts_from_time_array(self, *, background=False, astrometry_library=None): """Set the lst_array or lst_range from the time_array or time_range. Parameters ---------- background : bool, False When set to True, start the calculation on a threading.Thread in the background and return the thread to the user. astrometry_library : str Library used for calculating LSTs. Allowed options are 'erfa' (which uses the pyERFA), 'novas' (which uses the python-novas library), and 'astropy' (which uses the astropy utilities). Default is erfa unless the telescope_location frame is MCMF (on the moon), in which case the default is astropy. Returns ------- proc : None or threading.Thread instance When background is set to True, a thread is returned which must be joined before the lst_array exists on the UVCal object. """ if not background: self._set_lsts_helper(astrometry_library=astrometry_library) return else: proc = threading.Thread( target=self._set_lsts_helper, kwargs={"astrometry_library": astrometry_library}, ) proc.start() return proc
def _check_flex_spw_contiguous(self, *, raise_errors=True): """ Check if the spectral windows are contiguous. This checks the flex_spw_id_array to make sure that all channels for each spectral window are together in one block, versus being interspersed (e.g., channel #1 and #3 is in spw #1, channels #2 and #4 are in spw #2). Parameters ---------- raise_errors : bool or None If set to True, then the function will raise an error if checks are failed. If set to False, then a warning is raised instead. If set to None, then no errors or warnings are raised. """ if not self.wide_band: utils.frequency._check_flex_spw_contiguous( spw_array=self.spw_array, flex_spw_id_array=self.flex_spw_id_array, strict=raise_errors, ) def _check_freq_spacing(self, *, raise_errors=True): """ Check if frequencies are evenly spaced. This is a requirement for writing calfits files. Parameters ---------- raise_errors : bool or None If set to True, then the function will raise an error if checks are failed. If set to False, then a warning is raised instead. If set to None, then no errors or warnings are raised. Returns ------- spacing_error : bool Flag that channel spacings or channel widths are not equal. chanwidth_error : bool Flag that channel spacing does not match channel width. """ if (self.freq_array is None) or (self.Nfreqs == 1): return False, False return utils.frequency._check_freq_spacing( freq_array=self._freq_array, channel_width=self._channel_width, spw_array=self.spw_array, flex_spw_id_array=self.flex_spw_id_array, strict=raise_errors, ignore_chanwidth_error=True, ) def _check_jones_spacing(self, *, raise_errors=True): """ Check if Jones terms are evenly spaced. This is a requirement for calfits files. Parameters ---------- raise_errors : bool If set to True, then the function will raise an error if checks are failed. If set to False, then a warning is raised instead. If set to None, then no errors or warnings are raised. """ return utils.pol._check_jones_spacing( jones_array=self._jones_array, strict=raise_errors ) def _check_time_spacing(self, *, raise_errors=True): """ Check if times are evenly spaced. This is a requirement for calfits files. Parameters ---------- raise_errors : bool If set to True, then the function will raise an error if checks are failed. If set to False, then a warning is raised instead. If set to None, then no errors or warnings are raised. """ return utils.times._check_time_spacing( times_array=self._time_array, time_range=self._time_range, integration_time=self._integration_time, strict=raise_errors, ) def _add_phase_center( self, cat_name, *, cat_type=None, cat_lon=None, cat_lat=None, cat_frame=None, cat_epoch=None, cat_times=None, cat_pm_ra=None, cat_pm_dec=None, cat_dist=None, cat_vrad=None, info_source="user", force_update=False, cat_id=None, ): """ Add an entry to the internal object/source catalog or find a matching one. This is a helper function for identifying a adding a phase center to the internal catalog, contained within the attribute `phase_center_catalog`, unless a phase center already exists that matches the passed parameters. If a matching phase center is found, the catalog ID associated with that phase center is returned. Parameters ---------- cat_name : str Name of the phase center to be added. cat_type : str Type of phase center to be added. Must be one of: "sidereal" (fixed RA/Dec), "ephem" (RA/Dec that moves with time), "driftscan" (fixed az/el position), "unprojected" (no w-projection, equivalent to the old `phase_type` == "drift"). cat_lon : float or ndarray Value of the longitudinal coordinate (e.g., RA, Az, l) in radians of the phase center. No default unless `cat_type="unprojected"`, in which case the default is zero. Expected to be a float for sidereal and driftscan phase centers, and an ndarray of floats of shape (Npts,) for ephem phase centers. cat_lat : float or ndarray Value of the latitudinal coordinate (e.g., Dec, El, b) in radians of the phase center. No default unless `cat_type="unprojected"`, in which case the default is pi/2. Expected to be a float for sidereal and driftscan phase centers, and an ndarray of floats of shape (Npts,) for ephem phase centers. cat_frame : str Coordinate frame that cat_lon and cat_lat are given in. Only used for sidereal and ephem targets. Can be any of the several supported frames in astropy (a limited list: fk4, fk5, icrs, gcrs, cirs, galactic). cat_epoch : str or float Epoch of the coordinates, only used when cat_frame = fk4 or fk5. Given in units of fractional years, either as a float or as a string with the epoch abbreviation (e.g, Julian epoch 2000.0 would be J2000.0). cat_times : ndarray of floats Only used when `cat_type="ephem"`. Describes the time for which the values of `cat_lon` and `cat_lat` are calculated, in units of JD. Shape is (Npts,). cat_pm_ra : float Proper motion in RA, in units of mas/year. Only used for sidereal phase centers. cat_pm_dec : float Proper motion in Dec, in units of mas/year. Only used for sidereal phase centers. cat_dist : float or ndarray of float Distance of the source, in units of pc. Only used for sidereal and ephem phase centers. Expected to be a float for sidereal and driftscan phase centers, and an ndarray of floats of shape (Npts,) for ephem phase centers. cat_vrad : float or ndarray of float Radial velocity of the source, in units of km/s. Only used for sidereal and ephem phase centers. Expected to be a float for sidereal and driftscan phase centers, and an ndarray of floats of shape (Npts,) for ephem phase centers. info_source : str Optional string describing the source of the information provided. Used primarily in UVCal to denote when an ephemeris has been supplied by the JPL-Horizons system, user-supplied, or read in by one of the various file interpreters. Default is 'user'. force_update : bool Normally, `_add_phase_center` will throw an error if there already exists a phase_center with the given cat_id. However, if one sets `force_update=True`, the method will overwrite the existing entry in `phase_center_catalog` with the parameters supplied. Note that doing this will _not_ update other attributes of the `UVCal` object. Default is False. cat_id : int An integer signifying the ID number for the phase center, used in the `phase_center_id_array` attribute. If a matching phase center entry exists already, that phase center ID will be returned, which may be different than the value specified to this parameter. The default is for the method to assign this value automatically. Returns ------- cat_id : int The unique ID number for the phase center that either matches the specified parameters or was added to the internal catalog. If a matching entry was found, this may not be the value passed to the `cat_id` parameter. This value is used in the `phase_center_id_array` attribute to denote which source a given baseline-time corresponds to. Raises ------ ValueError If attempting to add a non-unique source name or if adding a sidereal source without coordinates. """ cat_entry = utils.phase_center_catalog.generate_phase_center_cat_entry( cat_name=cat_name, cat_type=cat_type, cat_lon=cat_lon, cat_lat=cat_lat, cat_frame=cat_frame, cat_epoch=cat_epoch, cat_times=cat_times, cat_pm_ra=cat_pm_ra, cat_pm_dec=cat_pm_dec, cat_dist=cat_dist, cat_vrad=cat_vrad, info_source=info_source, ) # We want to create a unique ID for each source, for use in indexing arrays. # The logic below ensures that we pick the lowest positive integer that is # not currently being used by another source if cat_id is None or not force_update: cat_id = utils.phase_center_catalog.generate_new_phase_center_id( phase_center_catalog=self.phase_center_catalog, cat_id=cat_id ) if self.phase_center_catalog is None: # Initialize an empty dict to plug entries into self.phase_center_catalog = {} else: # Let's warn if this entry has the same name as an existing one temp_id, cat_diffs = utils.phase_center_catalog.look_in_catalog( self.phase_center_catalog, phase_dict=cat_entry ) # If the source does have the same name, check to see if all the # attributes match. If so, no problem, go about your business if temp_id is not None: if cat_diffs == 0: # Everything matches, return the catalog ID of the matching entry return temp_id warnings.warn( f"The provided name {cat_name} is already used but has different " "parameters. Adding another entry with the same name but a " "different ID and parameters." ) # If source is unique, begin creating a dictionary for it self.phase_center_catalog[cat_id] = cat_entry self.Nphase = len(self.phase_center_catalog) return cat_id def _remove_phase_center(self, defunct_id): """ Remove an entry from the internal object/source catalog. Removes an entry from the attribute `phase_center_catalog`. Parameters ---------- defunct_id : int Catalog ID of the source to be removed Raises ------ IndexError If the name provided is not found as a key in `phase_center_catalog` """ if defunct_id not in self.phase_center_catalog: raise IndexError("No source by that ID contained in the catalog.") del self.phase_center_catalog[defunct_id] self.Nphase = len(self.phase_center_catalog) def _clear_unused_phase_centers(self): """ Remove objects dictionaries and names that are no longer in use. Goes through the `phase_center_catalog` attribute in of a UVCal object and clears out entries that are no longer being used, and appropriately updates `phase_center_id_array` accordingly. This function is not typically called by users, but instead is used by other methods. """ unique_cat_ids = np.unique(self.phase_center_id_array) defunct_list = [] Nphase = 0 for cat_id in self.phase_center_catalog: if cat_id in unique_cat_ids: Nphase += 1 else: defunct_list.append(cat_id) # Check the number of "good" sources we have -- if we haven't dropped any, # then we are free to bail, otherwise update the Nphase attribute if Nphase == self.Nphase: return # Time to kill the entries that are no longer in the source stack for defunct_id in defunct_list: self._remove_phase_center(defunct_id)
[docs] def print_phase_center_info( self, catalog_identifier=None, *, hms_format=None, return_str=False, print_table=True, ): """ Print out the details of the phase centers. Prints out an ASCII table that contains the details of the `phase_center_catalog` attribute, which acts as the internal source catalog for UVCal objects. Parameters ---------- catalog_identifier : str or int or list of str or int Optional parameter which, if provided, will cause the method to only return information on the phase center(s) with the matching name(s) or catalog ID number(s). Default is to print out information on all catalog entries. hms_format : bool Optional parameter, which if selected, can be used to force coordinates to be printed out in Hours-Min-Sec (if set to True) or Deg-Min-Sec (if set to False) format. Default is to print out in HMS if all the objects have coordinate frames of icrs, gcrs, fk5, fk4, and top; otherwise, DMS format is used. return_str: bool If set to True, the method returns an ASCII string which contains all the table infrmation. Default is False. print_table : bool If set to True, prints the table to the terminal window. Default is True. Returns ------- table_str : bool If return_str=True, an ASCII string containing the entire table text Raises ------ ValueError If `cat_name` matches no keys in `phase_center_catalog`. """ return utils.phase_center_catalog.print_phase_center_info( self.phase_center_catalog, catalog_identifier=catalog_identifier, hms_format=hms_format, return_str=return_str, print_table=print_table, )
def _update_phase_center_id(self, cat_id, *, new_id=None, reserved_ids=None): """ Update a phase center with a new catalog ID number. Parameters ---------- cat_id : int Current catalog ID of the phase center, which corresponds to a key in the attribute `phase_center_catalog`. new_id : int Optional argument. If supplied, then the method will attempt to use the provided value as the new catalog ID, provided that an existing catalog entry is not already using the same value. If not supplied, then the method will automatically assign a value. reserved_ids : array-like in int Optional argument. An array-like of ints that denotes which ID numbers are already reserved. Useful for when combining two separate catalogs. Raises ------ ValueError If not using the method on a multi-phase-ctr data set, if there's no entry that matches `cat_name`, or of the value `new_id` is already taken. """ new_id = utils.phase_center_catalog.generate_new_phase_center_id( phase_center_catalog=self.phase_center_catalog, cat_id=new_id, old_id=cat_id, reserved_ids=reserved_ids, ) # If new_id is None, it means that the existing ID was fine and needs no update if new_id is not None: self.phase_center_id_array[self.phase_center_id_array == cat_id] = new_id self.phase_center_catalog[new_id] = self.phase_center_catalog.pop(cat_id) def _consolidate_phase_center_catalogs( self, *, reference_catalog=None, other=None, ignore_name=False ): """ Consolidate phase center catalogs with a reference or another object. This is a helper method which updates the phase_center_catalog and related parameters to make this object consistent with a reference catalog or with another object so the second object can be added or concatenated to this object. If both `reference_catalog` and `other` are provided, both this object and the one passed to `other` will have their catalogs updated. Parameters ---------- reference_catalog : dict A reference catalog to make this object consistent with. other : UVCal or UVData object Nominally a UVCal object which self needs to be consistent with because it will be added to self. The phase_center_catalog from other is used as the reference catalog if the reference_catalog is None. If `reference_catalog` is also set, the phase_center_catalog on other will also be modified to be consistent with the `reference_catalog`. Note that a UVData object can also be supplied (in order to ensure consistency between two related objects). ignore_name : bool Option to ignore the name of the phase center (`cat_name` in `phase_center_catalog`) when identifying matching phase centers. If set to True, phase centers that are the same up to their name will be combined with the name set to the reference catalog name or the name found in the first UVCal object. If set to False, phase centers that are the same up to the name will be kept as separate phase centers. Default is False. """ if reference_catalog is None and other is None: raise ValueError( "Either the reference_catalog or the other parameter must be set." ) if other is not None: # If exists, first update other to be consistent with the reference if reference_catalog is not None: other._consolidate_phase_center_catalogs( reference_catalog=reference_catalog ) # then use the updated other as the reference reference_catalog = other.phase_center_catalog reserved_ids = list(reference_catalog) # First loop, we want to update all the catalog IDs so that we know there # are no conflicts with the reference for cat_id in list(self.phase_center_catalog): self._update_phase_center_id(cat_id, reserved_ids=reserved_ids) # Next loop, we want to update the IDs of sources that are in the reference. for cat_id in list(reference_catalog): # Normally one would wrap this in an items() call above, except that for # testing it's sometimes convenient to use self.phase_center_catalog as # the ref catalog, which causes a RunTime error due to updates to the dict. cat_entry = reference_catalog[cat_id] match_id, match_diffs = utils.phase_center_catalog.look_in_catalog( self.phase_center_catalog, phase_dict=cat_entry, ignore_name=ignore_name ) if match_id is None or match_diffs != 0: # If no match, just add the entry self._add_phase_center(cat_id=cat_id, **cat_entry) continue # If match_diffs is 0 then all the keys in the phase center catalog # match, so this is functionally the same source self._update_phase_center_id(match_id, new_id=cat_id) # look_in_catalog ignores the "info_source" field, so update it to match self.phase_center_catalog[cat_id]["info_source"] = cat_entry["info_source"] if ignore_name: # Make the names match if names were ignored in matching self.phase_center_catalog[cat_id]["cat_name"] = cat_entry["cat_name"]
[docs] def check( self, *, check_extra=True, run_check_acceptability=True, check_freq_spacing=False, check_jones_spacing=False, check_time_spacing=False, raise_spacing_errors=True, lst_tol=utils.LST_RAD_TOL, ): """ Add some extra checks on top of checks on UVBase class. Check that required parameters exist. Check that parameters have appropriate shapes and optionally that the values are acceptable. Parameters ---------- check_extra : bool If true, check all parameters, otherwise only check required parameters. run_check_acceptability : bool Option to check if values in parameters are acceptable. check_freq_spacing : bool Option to check if frequencies are evenly spaced and the spacing is equal to their channel_width. This is not required for UVCal objects in general but is required to write to calfits files. check_jones_spacing : bool Option to check if Jones terms are evenly spaced. This is not required for UVCal objects in general but is required to write to calfits files. check_time_spacing : bool Option to check if times are evenly spaced. This is not required for UVCal objects in general but is required to write to calfits files. raise_spacing_errors : bool or None If set to True, then the function will raise an error if spacing checks are failed. If set to False, then a warning is raised instead. If set to None, then no errors or warnings are raised. lst_tol : float or None Tolerance level at which to test LSTs against their expected values. If provided as a float, must be in units of radians. If set to None, the default precision tolerance from the `lst_array` parameter is used (1 mas). Default value is 75 mas, which is set by the predictive uncertainty in IERS calculations of DUT1 (RMS is of order 1 ms, with with a 5-sigma threshold for detection is used to prevent false issues from being reported), which for some observatories sets the precision with which these values are written. Returns ------- bool True if check passes Raises ------ ValueError if parameter shapes or types are wrong or do not have acceptable values (if run_check_acceptability is True) """ # Make sure requirements are set properly for cal_style if self.cal_style == "sky": self._set_sky() elif self.cal_style == "redundant": self._set_redundant() self._set_telescope_requirements() # if wide_band is True, Nfreqs must be 1. if self.wide_band and self.Nfreqs != 1: raise ValueError("Nfreqs is required to be 1 for wide_band cals.") # call metadata_only to make sure that parameter requirements are set properly self.metadata_only # noqa B018 # first run the basic check from UVBase super().check( check_extra=check_extra, run_check_acceptability=run_check_acceptability ) # then run telescope object check self.telescope.check( check_extra=check_extra, run_check_acceptability=run_check_acceptability ) # gain_scale should be set if pol_convention is set. if self.pol_convention is not None and self.gain_scale is None: warnings.warn( "gain_scale should be set if pol_convention is set. When calibrating " "data with `uvcalibrate`, pol_convention will be ignored if " "gain_scale is not set." ) # deprecate having both time_array and time_range set time_like_pairs = [("time_array", "time_range"), ("lst_array", "lst_range")] for pair in time_like_pairs: if ( getattr(self, pair[0]) is not None and getattr(self, pair[1]) is not None ): raise ValueError( f"The {pair[0]} and {pair[1]} attributes are both set, but only " "one should be set." ) elif getattr(self, pair[0]) is None and getattr(self, pair[1]) is None: raise ValueError(f"Either {pair[0]} or {pair[1]} must be set.") # check that corresponding lst/time parameters are set for tp_ind, param in enumerate(time_like_pairs[0]): lst_param = time_like_pairs[1][tp_ind] if getattr(self, param) is not None and getattr(self, lst_param) is None: raise ValueError( f"If {param} is present, {lst_param} must also be present." ) # check that time ranges are well formed and do not overlap if self.time_range is not None and utils.tools._check_range_overlap( self.time_range ): raise ValueError("Some time_ranges overlap.") # note: do not check lst range overlap because of branch cut. # Assume they are ok if time_ranges are ok. # require that all entries in ant_array exist in antenna_numbers if not all(ant in self.telescope.antenna_numbers for ant in self.ant_array): raise ValueError("All antennas in ant_array must be in antenna_numbers.") if not self.wide_band: if not np.all(np.isin(self.flex_spw_id_array, self.spw_array)): raise ValueError( "All values in the flex_spw_id_array must exist in the spw_array." ) # warn if freq_range or freq_array set when it shouldn't be if self.freq_range is not None: raise ValueError( "The freq_range attribute should not be set if wide_band=False." ) if self.wide_band: if self.freq_array is not None: raise ValueError( "The freq_array attribute should not be set if wide_band=True." ) if self.channel_width is not None: raise ValueError( "The channel_width attribute should not be set if wide_band=True." ) if self.flex_spw_id_array is not None: raise ValueError( "The flex_spw_id_array attribute should not be set if " "wide_band=True." ) if check_freq_spacing: self._check_freq_spacing(raise_errors=raise_spacing_errors) if check_jones_spacing: self._check_jones_spacing(raise_errors=raise_spacing_errors) if check_time_spacing: self._check_time_spacing(raise_errors=raise_spacing_errors) if run_check_acceptability: # Check antenna positions utils.coordinates.check_surface_based_positions( antenna_positions=self.telescope.antenna_positions, telescope_loc=self.telescope.location, raise_error=False, ) if self.time_array is not None: utils.times.check_lsts_against_times( jd_array=self.time_array, lst_array=self.lst_array, telescope_loc=self.telescope.location, lst_tols=self._lst_array.tols if lst_tol is None else [0, lst_tol], ) if self.time_range is not None: utils.times.check_lsts_against_times( jd_array=self.time_range, lst_array=self.lst_range, telescope_loc=self.telescope.location, lst_tols=self._lst_array.tols if lst_tol is None else [0, lst_tol], ) return True
[docs] def copy(self, *, metadata_only=False): """ Make and return a copy of the UVCal object. Parameters ---------- metadata_only : bool If True, only copy the metadata of the object. Returns ------- UVCal Copy of self. """ if not metadata_only: return super().copy() else: uv = UVCal() # include all attributes, not just UVParameter ones. for attr in self.__iter__(uvparams_only=False): # skip properties if isinstance(getattr(type(self), attr, None), property): continue # skip data like parameters # parameter names have a leading underscore we want to ignore if attr.lstrip("_") in self._data_params: continue setattr(uv, attr, copy.deepcopy(getattr(self, attr))) return uv
def _key_exists(self, *, antnum=None, jpol=None): """ Check if this UVCal has the requested antenna or polarization. Parameters ---------- antnum : int Antenna number to check. jpol : str or int Antenna polarization string or integer to check. Returns ------- bool Boolean indicator of whether the antenna and/or antenna polarization is present on this object. """ if antnum is not None and antnum not in self.ant_array: return False if jpol is not None: if isinstance(jpol, str | np.str_): jpol = utils.jstr2num( jpol, x_orientation=self.telescope.get_x_orientation_from_feeds() ) if jpol not in self.jones_array: return False return True
[docs] def ant2ind(self, antnum): """ Get the index in data arrays for an antenna number. Parameters ---------- antnum : int Antenna number to get index for. Returns ------- int Antenna index in data arrays. """ if not self._key_exists(antnum=antnum): raise ValueError(f"{antnum} not found in ant_array") return np.argmin(np.abs(self.ant_array - antnum))
[docs] def jpol2ind(self, jpol): """ Get the index in data arrays for an antenna polarization. Parameters ---------- jpol : int or str Antenna polarization to get index for. Returns ------- int Antenna polarization index in data arrays """ if isinstance(jpol, str | np.str_): jpol = utils.jstr2num( jpol, x_orientation=self.telescope.get_x_orientation_from_feeds() ) if not self._key_exists(jpol=jpol): raise ValueError(f"{jpol} not found in jones_array") return np.argmin(np.abs(self.jones_array - jpol))
def _slice_array(self, key, data_array, *, squeeze_pol=True): """ Slice a data array given a data key. Parameters ---------- key : int or length 2 tuple of ints or int and str Antenna or antenna and polarization to get slice for. If it's a length 2 tuple, the second value must be an antenna polarization int or string parsable by jpol2ind. data_array : :class: numpy ndarray Array to get slice of. Must have the shape of the gain_array or delay_array. squeeze_pol : bool Option to squeeze pol dimension if possible. Returns ------- :class: numpy ndarray Slice of the data_array for the key. """ key = utils.tools._get_iterable(key) if len(key) == 1: # interpret as a single antenna output = data_array[self.ant2ind(key[0]), :, :, :] if squeeze_pol and output.shape[-1] == 1: output = output[:, :, 0] elif len(key) == 2: # interpret as an antenna-pol pair output = data_array[self.ant2ind(key[0]), :, :, self.jpol2ind(key[1])] return output def _parse_key(self, ant, *, jpol=None): """ Parse key inputs and return a standard antenna-polarization key. Parameters ---------- ant : int or length 2 tuple of ints or int and str Antenna or antenna and polarization to get key for. If it's a length 2 tuple, the second value must be an antenna polarization int or string parsable by jpol2ind. jpol : int or str Antenna polarization int or string parsable by jpol2ind. Only used if `ant` is an integer. Returns ------- tuple Standard key tuple. """ if isinstance(ant, list | tuple): # interpret ant as (ant,) or (ant, jpol) key = tuple(ant) elif isinstance(ant, int | np.integer): # interpret ant as antenna number key = (ant,) # add jpol if fed if jpol is not None: key += (jpol,) return key
[docs] def get_gains(self, ant, jpol=None, *, squeeze_pol=True): """ Get the gain associated with an antenna and/or polarization. Parameters ---------- ant : int or length 2 tuple of ints or int and str Antenna or antenna and polarization to get gains for. If it's a length 2 tuple, the second value must be an antenna polarization int or string parsable by jpol2ind. jpol : int or str, optional Instrumental polarization to request. Ex. 'Jxx' squeeze_pol : bool Option to squeeze pol dimension if possible. Returns ------- complex ndarray Gain solution of shape (Nfreqs, Ntimes, Njones) or (Nfreqs, Ntimes) if jpol is set or if squeeze_pol is True and Njones = 1. """ if self.cal_type != "gain": raise ValueError("cal_type must be 'gain' for get_gains() method") return self._slice_array( self._parse_key(ant, jpol=jpol), self.gain_array, squeeze_pol=squeeze_pol )
[docs] def get_flags(self, ant, jpol=None, *, squeeze_pol=True): """ Get the flags associated with an antenna and/or polarization. Parameters ---------- ant : int or length 2 tuple of ints or int and str Antenna or antenna and polarization to get gains for. If it's a length 2 tuple, the second value must be an antenna polarization int or string parsable by jpol2ind. jpol : int or str, optional Instrumental polarization to request. Ex. 'Jxx' squeeze_pol : bool Option to squeeze pol dimension if possible. Returns ------- boolean ndarray Flags of shape (Nfreqs, Ntimes, Njones) or (Nfreqs, Ntimes) if jpol is set or if squeeze_pol is True and Njones = 1. """ return self._slice_array( self._parse_key(ant, jpol=jpol), self.flag_array, squeeze_pol=squeeze_pol )
[docs] def get_quality(self, ant, jpol=None, *, squeeze_pol=True): """ Get the qualities associated with an antenna and/or polarization. Parameters ---------- ant : int or length 2 tuple of ints or int and str Antenna or antenna and polarization to get gains for. If it's a length 2 tuple, the second value must be an antenna polarization int or string parsable by jpol2ind. jpol : int or str, optional Instrumental polarization to request. Ex. 'Jxx' squeeze_pol : bool Option to squeeze pol dimension if possible. Returns ------- float ndarray Qualities of shape (Nfreqs, Ntimes, Njones) or (Nfreqs, Ntimes) if jpol is not None or if squeeze_pol is True and Njones = 1. """ return self._slice_array( self._parse_key(ant, jpol=jpol), self.quality_array, squeeze_pol=squeeze_pol )
[docs] def get_time_array(self): """ Get a time array of calibration solution times. Times are for the center of the integration, shape (Ntimes), units in Julian Date. If a time_range is defined on the object, the times are the mean of the start and stop times for each range. Otherwise return the time_array (which can be None). """ if self.time_range is not None: return np.mean(self.time_range, axis=1) else: return self.time_array
[docs] def get_lst_array(self, *, astrometry_library=None): """ Get an lst array of calibration solution lsts. LSTs are for the center of the integration, shape (Ntimes), units in radians. If an lst_range is defined on the object, the lsts are the LST of the mean of the start and stop times for each range. Otherwise return the lst_array (which can be None). Parameters ---------- astrometry_library : str Library used for calculating LSTs. Allowed options are 'erfa' (which uses the pyERFA), 'novas' (which uses the python-novas library), and 'astropy' (which uses the astropy utilities). Default is erfa unless the telescope_location frame is MCMF (on the moon), in which case the default is astropy. """ if self.lst_range is not None: return utils.get_lst_for_time( jd_array=self.get_time_array(), telescope_loc=self.telescope.location ) else: return self.lst_array
[docs] def reorder_antennas( self, order="number", *, run_check=True, check_extra=True, run_check_acceptability=True, ): """ Arrange the antenna axis according to desired order. Parameters ---------- order: str or array like of int If a string, allowed values are "name" and "number" to sort on the antenna name or number respectively. A '-' can be prepended to signify descending order instead of the default ascending order (e.g. "-number"). An array of integers of length Nants representing indexes along the existing `ant_array` can also be supplied to sort in any desired order (note these are indices into the `ant_array` not antenna numbers). Returns ------- None Raises ------ ValueError Raised if order is not an allowed string or is an array that does not contain all the required numbers. """ if isinstance(order, np.ndarray | list | tuple): order = np.array(order) if not order.size == self.Nants_data or not np.all( np.sort(order) == np.arange(self.Nants_data) ): raise ValueError( "If order is an index array, it must contain all indices for the" "ant_array, without duplicates." ) index_array = order else: if order not in ["number", "name", "-number", "-name"]: raise ValueError( "order must be one of 'number', 'name', '-number', '-name' or an " "index array of length Nants_data" ) if "number" in order: index_array = np.argsort(self.ant_array) elif "name" in order: name_map = dict( zip( self.telescope.antenna_numbers, self.telescope.antenna_names, strict=True, ) ) index_array = np.argsort([name_map[ant] for ant in self.ant_array]) if order[0] == "-": index_array = np.flip(index_array) self._select_along_param_axis({"Nants_data": index_array}) if run_check: self.check( check_extra=check_extra, run_check_acceptability=run_check_acceptability )
[docs] def reorder_freqs( self, *, spw_order=None, channel_order=None, select_spw=None, run_check=True, check_extra=True, run_check_acceptability=True, ): """ Arrange the frequency axis according to desired order. Parameters ---------- spw_order : str or array_like of int A string describing the desired order of spectral windows along the frequecy axis. Allowed strings include `number` (sort on spectral window number) and `freq` (sort on median frequency). A '-' can be prepended to signify descending order instead of the default ascending order, e.g., if you have SPW #1 and 2, and wanted them ordered as [2, 1], you would specify `-number`. Alternatively, one can supply an index array of length Nspws that specifies how to shuffle the spws (this is not the desired final spw order). Default is to apply no sorting of spectral windows. channel_order : str or array_like of int A string describing the desired order of frequency channels within a spectral window. Allowed strings are "freq" and "-freq", which will sort channels within a spectral window by ascending or descending frequency respectively. Alternatively, one can supply an index array of length Nfreqs that specifies the new order. Default is to apply no sorting of channels within a single spectral window. Note that proving an array_like of ints will cause the values given to `spw_order` and `select_spw` to be ignored. select_spw : int or array_like of int An int or array_like of ints which specifies which spectral windows to apply sorting. Note that setting this argument will cause the value given to `spw_order` to be ignored. Returns ------- None Raises ------ ValueError Raised if select_spw contains values not in spw_array, or if channel_order is not the same length as freq_array. """ if self.Nspws == 1 and self.Nfreqs == 1: warnings.warn( "Cannot reorder the frequency/spw axis with only one frequency and " "spw. Returning the object unchanged." ) return if self.wide_band: # we have an spw axis, not a frequency axis if channel_order is not None or select_spw is not None: warnings.warn( "channel_order and select_spws are ignored for wide-band " "calibration solutions" ) if spw_order is None: # default sensibly spw_order = "number" flip_spws = spw_order[0] == "-" if isinstance(spw_order, np.ndarray | list | tuple): spw_order = np.asarray(spw_order) if not spw_order.size == self.Nspws or not np.all( np.sort(spw_order) == np.arange(self.Nspws) ): raise ValueError( "If spw_order is an array, it must contain all indices for " "the spw_array, without duplicates." ) index_array = np.asarray( [ np.nonzero(self.spw_array == spw)[0][0] for spw in self.spw_array[spw_order] ] ) else: if spw_order not in ["number", "freq", "-number", "-freq", None]: raise ValueError( "spw_order can only be one of 'number', '-number', " "'freq', '-freq', None or an index array of length Nspws" ) if "number" in spw_order: index_array = np.argsort(self.spw_array) elif "freq" in spw_order: mean_freq = np.mean(self.freq_range, axis=1) index_array = np.argsort(mean_freq) if flip_spws: index_array = np.flip(index_array) self._select_along_param_axis({"Nspws": index_array}) else: index_array = utils.frequency._sort_freq_helper( Nfreqs=self.Nfreqs, freq_array=self.freq_array, Nspws=self.Nspws, spw_array=self.spw_array, flex_spw_id_array=self.flex_spw_id_array, spw_order=spw_order, channel_order=channel_order, select_spw=select_spw, ) self._select_along_param_axis({"Nfreqs": index_array}) if (self.flex_spw_id_array is not None) and (self.Nspws > 1): # Reorder the spw-axis items based on their first appearance in the data # Note that the dict will preserve first order. new_spw = dict.fromkeys(self.flex_spw_id_array) spw_map = {spw: idx for idx, spw in enumerate(self.spw_array)} self._select_along_param_axis({"Nspws": [spw_map[k] for k in new_spw]}) if run_check: self.check( check_extra=check_extra, run_check_acceptability=run_check_acceptability )
[docs] def reorder_times( self, order="time", *, run_check=True, check_extra=True, run_check_acceptability=True, ): """ Arrange the time axis according to desired order. Parameters ---------- order: str or array like of int If a string, allowed value is "time" or "-time" to sort on the time in ascending or descending order respectively. An array of integers of length Ntimes representing indexes along the existing `time_array` or `time_range` can also be supplied to sort in any desired order. Returns ------- None Raises ------ ValueError Raised if order is not an allowed string or is an array that does not contain all the required indices. """ if isinstance(order, np.ndarray | list | tuple): order = np.array(order) if not order.size == self.Ntimes or not np.all( np.sort(order) == np.arange(self.Ntimes) ): raise ValueError( "If order is an array, it must contain all indices for " "the time axis, without duplicates." ) index_array = order else: if order not in ["time", "-time"]: raise ValueError( "order must be one of 'time', '-time' or an " "index array of length Ntimes" ) if self.time_array is not None and self.time_range is not None: raise ValueError( "The time_array and time_range attributes are both set." ) if self.time_range is not None: index_array = np.argsort(self.time_range[:, 0]) else: index_array = np.argsort(self.time_array) if order[0] == "-": index_array = np.flip(index_array) self._select_along_param_axis({"Ntimes": index_array}) if run_check: self.check( check_extra=check_extra, run_check_acceptability=run_check_acceptability )
[docs] def reorder_jones( self, order="name", *, run_check=True, check_extra=True, run_check_acceptability=True, ): """ Arrange the jones element axis according to desired order. Parameters ---------- order: str or array like of int If a string, allowed values are "name" and "number" to sort on the jones element name or number respectively. A '-' can be prepended to signify descending order instead of the default ascending order (e.g. "-number"). An array of integers of length Njones representing indexes along the existing `jones_array` can also be supplied to sort in any desired order. Returns ------- None Raises ------ ValueError Raised if order is not an allowed string or is an array that does not contain all the required indices. """ if isinstance(order, np.ndarray | list | tuple): order = np.array(order) if not order.size == self.Njones or not np.all( np.sort(order) == np.arange(self.Njones) ): raise ValueError( "If order is an array, it must contain all indices for " "the jones axis, without duplicates." ) index_array = order else: if order not in ["number", "name", "-number", "-name"]: raise ValueError( "order must be one of 'number', 'name', '-number', '-name' or an " "index array of length Njones" ) if "number" in order: index_array = np.argsort(self.jones_array) elif "name" in order: name_array = np.asarray( utils.jnum2str( self.jones_array, x_orientation=self.telescope.get_x_orientation_from_feeds(), ) ) index_array = np.argsort(name_array) if order[0] == "-": index_array = np.flip(index_array) self._select_along_param_axis({"Njones": index_array}) if run_check: self.check( check_extra=check_extra, run_check_acceptability=run_check_acceptability )
[docs] def convert_to_gain( self, *, freq_array, channel_width, delay_convention="minus", run_check=True, check_extra=True, run_check_acceptability=True, ): """ Convert non-gain cal_types to gains. For the delay cal_type the gain is calculated as: gain = 1 * exp((+/-) * 2 * pi * j * delay * frequency) where the (+/-) is dictated by the delay_convention Parameters ---------- delay_convention : str Exponent sign to use in the conversion, can be "plus" or "minus". freq_array : array of float Frequencies to convert to gain at, units Hz. Required, shape (Nfreqs,). channel_width : array of float Channel widths for each channel, units Hz. Required, shape (Nfreqs,). run_check : bool Option to check for the existence and proper shapes of parameters after converting. check_extra : bool Option to check optional parameters as well as required ones. run_check_acceptability : bool Option to check acceptable range of the values of parameters after converting. """ if self.cal_type == "gain": raise ValueError("The data is already a gain cal_type.") if self.Nspws > 1: raise ValueError( "convert_to_gain currently does not support multiple spectral windows" ) if delay_convention == "minus": conv = -1 elif delay_convention == "plus": conv = 1 else: raise ValueError("delay_convention can only be 'minus' or 'plus'") if freq_array.ndim > 1: raise ValueError("freq_array parameter must be a one dimensional array") if ( not isinstance(channel_width, np.ndarray) or channel_width.shape != freq_array.shape ): raise ValueError( "The channel_width parameter be an array shaped like the freq_array" ) if self.freq_range is not None: # Already errored if more than one spw, so just use the first one here freq_range_use = self.freq_range[0, :] if np.any(freq_array < freq_range_use[0]) or np.any( freq_array > freq_range_use[1] ): raise ValueError("freq_array contains values outside the freq_range.") freq_array_use = freq_array Nfreqs_use = freq_array.size self.history += " Converted from delays to gains using pyuvdata." phase_array = np.zeros((self.Nants_data, Nfreqs_use, self.Ntimes, self.Njones)) temp = ( conv * 2 * np.pi * np.dot( self.delay_array[:, 0, :, :, np.newaxis], freq_array_use[np.newaxis, :] ) ) temp = np.transpose(temp, (0, 3, 1, 2)) phase_array = temp gain_array = np.exp(1j * phase_array) freq_axis = 1 self._set_gain() self._set_wide_band(wide_band=False) self.channel_width = channel_width self.freq_range = None self.gain_array = gain_array self.delay_array = None if self.quality_array is not None: new_quality = np.repeat(self.quality_array, Nfreqs_use, axis=freq_axis) self.quality_array = new_quality new_flag_array = np.repeat(self.flag_array, Nfreqs_use, axis=freq_axis) self.flag_array = new_flag_array if self.total_quality_array is not None: freq_axis = 0 new_total_quality_array = np.repeat( self.total_quality_array, Nfreqs_use, axis=freq_axis ) self.total_quality_array = new_total_quality_array self.freq_array = freq_array_use self.Nfreqs = Nfreqs_use self.flex_spw_id_array = np.full(self.Nfreqs, self.spw_array[0], dtype=int) # check if object is self-consistent if run_check: self.check( check_extra=check_extra, run_check_acceptability=run_check_acceptability )
def __add__( self, other, *, verbose_history=False, run_check=True, check_extra=True, run_check_acceptability=True, inplace=False, ignore_name=False, warn_spacing=False, ): """ Combine two UVCal objects along antenna, frequency, time, and/or Jones axis. Parameters ---------- other : :class: UVCal Another UVCal object which will be added to self. verbose_history : bool Option to allow more verbose history. If True and if the histories for the two objects are different, the combined object will keep all the history of both input objects (if many objects are combined in succession this can lead to very long histories). If False and if the histories for the two objects are different, the combined object will have the history of the first object and only the parts of the second object history that are unique (this is done word by word and can result in hard to interpret histories). run_check : bool Option to check for the existence and proper shapes of parameters after combining objects. check_extra : bool Option to check optional parameters as well as required ones. run_check_acceptability : bool Option to check acceptable range of the values of parameters after combining objects. inplace : bool Option to overwrite self as we go, otherwise create a third object as the sum of the two. ignore_name : bool Option to ignore the name of the phase center (`cat_name` in `phase_center_catalog`) when combining two UVCal objects. If set to True, phase centers that are the same up to their name will be combined with the name set to the name found in the first UVCal object in the sum. If set to False, phase centers that are the same up to the name will be kept as separate phase centers. Default is False. warn_spacing : bool Option to raise warnings about spacing that would prevent writing to calfits file-format. Default is False. """ if inplace: this = self else: this = self.copy() # Check that both objects are UVCal and valid this.check( check_extra=check_extra, run_check_acceptability=run_check_acceptability ) if not issubclass(other.__class__, this.__class__) and not issubclass( this.__class__, other.__class__ ): raise ValueError( "Only UVCal (or subclass) objects can be added to " "a UVCal (or subclass) object" ) other.check( check_extra=check_extra, run_check_acceptability=run_check_acceptability ) if (this.flex_jones_array is None) != (other.flex_jones_array is None): raise ValueError( "To combine these data, both objects must be either set to regular " "or flex-jones." ) if (this.ref_antenna_array is None) != (other.ref_antenna_array is None): raise ValueError( "To combine these data, both or neither object must have the " "ref_antenna_array parameter set." ) # Check that the objects both (or neither) have catalogs and phase ID arrays this_pc_cat = this.phase_center_catalog is None other_pc_cat = other.phase_center_catalog is None this_pc_ids = this.phase_center_id_array is None other_pc_ids = other.phase_center_id_array is None if (this_pc_cat != other_pc_cat) or (this_pc_ids != other_pc_ids): raise ValueError( "To combine these data, phase_center_id_array and " "_phase_center_catalog must be set for all objects." ) # Check that both objects are either wide_band or not if this.wide_band != other.wide_band: raise ValueError( "To combine these data, wide_band must be set to the same " "value (True or False) for both objects." ) # check that both either have or don't have time_array and time_range _time_param_check(this, other) # Check objects are compatible compatibility_params = [ "_cal_type", "_gain_convention", "_cal_style", "_ref_antenna_name", ] warning_params = [ "_observer", "_git_hash_cal", "_sky_catalog", "_Nsources", "_baseline_range", "_diffuse_model", ] for a in compatibility_params: if getattr(this, a) != getattr(other, a): msg = ( "UVParameter " + a[1:] + " does not match. Cannot combine objects." ) raise ValueError(msg) for a in warning_params: if getattr(this, a) != getattr(other, a): msg = "UVParameter " + a[1:] + " does not match. Combining anyway." warnings.warn(msg) # Build up history string history_update_string = " Combined data along " n_axes = 0 # Check we don't have overlapping data both_jones, this_jones_ind, other_jones_ind = np.intersect1d( this.jones_array, other.jones_array, return_indices=True ) if this.time_range is not None: if utils.tools._check_range_overlap( np.concatenate((this.time_range, other.time_range), axis=0) ): raise ValueError("A time_range overlaps in the two objects.") both_times, this_times_ind, other_times_ind = np.intersect1d( this.time_range[:, 0], other.time_range[:, 0], return_indices=True ) else: both_times, this_times_ind, other_times_ind = np.intersect1d( this.time_array, other.time_array, return_indices=True ) if this.wide_band: # this is really about spws, but that replaces the freq axis for wide_band both_freq, this_freq_ind, other_freq_ind = np.intersect1d( this.spw_array, other.spw_array, return_indices=True ) else: # With non-wideband objects, the handling here becomes a bit funky, # because we are allowed to have channels with the same frequency *if* they # belong to different spectral windows (one real-life example: might want # to preserve guard bands in the correlator, which can have overlaping RF # frequency channels) this_freq_ind = np.array([], dtype=np.int64) other_freq_ind = np.array([], dtype=np.int64) both_freq = np.array([], dtype=float) both_spw = np.intersect1d(this.spw_array, other.spw_array) for idx in both_spw: this_mask = np.where(this.flex_spw_id_array == idx)[0] other_mask = np.where(other.flex_spw_id_array == idx)[0] both_spw_freq, this_spw_ind, other_spw_ind = np.intersect1d( this.freq_array[this_mask], other.freq_array[other_mask], return_indices=True, ) this_freq_ind = np.append(this_freq_ind, this_mask[this_spw_ind]) other_freq_ind = np.append(other_freq_ind, other_mask[other_spw_ind]) both_freq = np.append(both_freq, both_spw_freq) both_ants, this_ants_ind, other_ants_ind = np.intersect1d( this.ant_array, other.ant_array, return_indices=True ) if ( len(both_jones) > 0 and len(both_times) > 0 and len(both_freq) > 0 and len(both_ants) > 0 ): raise ValueError( "These objects have overlapping data and cannot be combined." ) # First, handle the internal source catalogs, since merging them is kind of a # weird, one-off process (i.e., nothing is cat'd across a particular axis) if this.phase_center_catalog is not None: this._consolidate_phase_center_catalogs( other=other, ignore_name=ignore_name ) if ( (len(both_times) > 0) and (this.phase_center_id_array is not None) and not np.array_equal( this.phase_center_id_array[this_times_ind], other.phase_center_id_array[other_times_ind], ) ): # TODO: I think this check actually needs to be expanded to other # attributes, and along other axes raise ValueError( "Cannot combine objects due to overlapping times with " "different phase centers." ) if ( (len(both_times) > 0) and (this.ref_antenna_array is not None) and not np.array_equal( this.ref_antenna_array[this_times_ind], other.ref_antenna_array[other_times_ind], ) ): raise ValueError( "Cannot combine objects due to overlapping times with " "different reference antennas." ) # Handle the telescope object in a special fashion, since there can be partially # overlapping information that we want to preserve here. Note that this will # also do equality checking under the hood. this.telescope += other.telescope # Next, we want to make sure that the ordering of the _overlapping_ data is # the same, so that things can get plugged together in a sensible way. if len(this_ants_ind) != 0: this_argsort = np.argsort(this_ants_ind) other_argsort = np.argsort(other_ants_ind) if np.any(this_argsort != other_argsort): temp_ind = np.arange(this.Nants_data) temp_ind[this_ants_ind[this_argsort]] = temp_ind[ this_ants_ind[other_argsort] ] this.reorder_antennas(temp_ind) if len(this_times_ind) != 0: this_argsort = np.argsort(this_times_ind) other_argsort = np.argsort(other_times_ind) if np.any(this_argsort != other_argsort): temp_ind = np.arange(this.Ntimes) temp_ind[this_times_ind[this_argsort]] = temp_ind[ this_times_ind[other_argsort] ] this.reorder_times(temp_ind) if len(this_freq_ind) != 0: this_argsort = np.argsort(this_freq_ind) other_argsort = np.argsort(other_freq_ind) if np.any(this_argsort != other_argsort): if this.wide_band: temp_ind = np.arange(this.Nspws) temp_ind[this_freq_ind[this_argsort]] = temp_ind[ this_freq_ind[other_argsort] ] this.reorder_freqs(spw_order=temp_ind) else: temp_ind = np.arange(this.Nfreqs) temp_ind[this_freq_ind[this_argsort]] = temp_ind[ this_freq_ind[other_argsort] ] this.reorder_freqs(channel_order=temp_ind) if len(this_jones_ind) != 0: this_argsort = np.argsort(this_jones_ind) other_argsort = np.argsort(other_jones_ind) if np.any(this_argsort != other_argsort): temp_ind = np.arange(this.Njones) temp_ind[this_jones_ind[this_argsort]] = temp_ind[ this_jones_ind[other_argsort] ] this.reorder_jones(temp_ind) # Update filename parameter this.filename = utils.tools._combine_filenames(this.filename, other.filename) if this.filename is not None: this._filename.form = (len(this.filename),) temp = np.nonzero(~np.isin(other.ant_array, this.ant_array))[0] if len(temp) > 0: anew_inds = temp history_update_string += "antenna" n_axes += 1 else: anew_inds = [] if this.time_range is not None: temp = np.nonzero(~np.isin(other.time_range[:, 0], this.time_range[:, 0]))[ 0 ] else: temp = np.nonzero(~np.isin(other.time_array, this.time_array))[0] if len(temp) > 0: tnew_inds = temp if n_axes > 0: history_update_string += ", time" else: history_update_string += "time" n_axes += 1 else: tnew_inds = [] if this.wide_band: temp = np.nonzero(~np.isin(other.spw_array, this.spw_array))[0] if len(temp) > 0: fnew_inds = temp if n_axes > 0: history_update_string += ", spectral window" else: history_update_string += "spectral window" n_axes += 1 else: fnew_inds = [] else: # find the freq indices in "other" but not in "this" other_mask = np.ones_like(other.flex_spw_id_array, dtype=bool) for idx in np.intersect1d(this.spw_array, other.spw_array): other_mask[other.flex_spw_id_array == idx] = np.isin( other.freq_array[other.flex_spw_id_array == idx], this.freq_array[this.flex_spw_id_array == idx], invert=True, ) temp = np.where(other_mask)[0] if len(temp) > 0: fnew_inds = temp if n_axes > 0: history_update_string += ", frequency" else: history_update_string += "frequency" n_axes += 1 else: fnew_inds = [] temp = np.nonzero(~np.isin(other.jones_array, this.jones_array))[0] if len(temp) > 0: jnew_inds = temp if n_axes > 0: history_update_string += ", jones" else: history_update_string += "jones" n_axes += 1 else: jnew_inds = [] # Initialize tqa variables can_combine_tqa = True if this.cal_type == "delay": Nf_tqa = 1 else: Nf_tqa = this.Nfreqs if ( not self.metadata_only and this.quality_array is None and other.quality_array is not None ): this.quality_array = np.zeros( this._quality_array.expected_shape(this), dtype=float ) # Pad out self to accommodate new data ant_order = None if len(anew_inds) > 0: this.ant_array = np.concatenate( [this.ant_array, other.ant_array[anew_inds]] ) ant_order = np.argsort(this.ant_array) if not this.metadata_only: data_array_shape = getattr(this, this._required_data_params[0]).shape zero_pad_data = np.zeros( (len(anew_inds), data_array_shape[1], this.Ntimes, this.Njones) ) zero_pad_flags = np.zeros( (len(anew_inds), data_array_shape[1], this.Ntimes, this.Njones) ) if this.cal_type == "delay": this.delay_array = np.concatenate( [this.delay_array, zero_pad_data], axis=0 ) else: this.gain_array = np.concatenate( [this.gain_array, zero_pad_data], axis=0 ) this.flag_array = np.concatenate( [this.flag_array, 1 - zero_pad_flags], axis=0 ).astype(np.bool_) if this.quality_array is not None: this.quality_array = np.concatenate( [this.quality_array, zero_pad_data], axis=0 ) # If total_quality_array exists, we set it to None and warn the user if ( this.total_quality_array is not None or other.total_quality_array is not None ): warnings.warn( "Total quality array detected in at least one file; the " "array in the new object will be set to 'None' because " "whole-array values cannot be combined when adding antennas" ) this.total_quality_array = None can_combine_tqa = False f_order = None if len(fnew_inds) > 0: if this.wide_band: this.spw_array = np.concatenate( [this.spw_array, other.spw_array[fnew_inds]] ) this.freq_range = np.concatenate( [this.freq_range, other.freq_range[fnew_inds]], axis=0 ) if this.flex_jones_array is not None: this.flex_jones_array = np.concatenate( [this.flex_jones_array, other.flex_jones_array[fnew_inds]], axis=0, ) f_order = np.argsort(this.spw_array) else: this.flex_spw_id_array = np.concatenate( [this.flex_spw_id_array, other.flex_spw_id_array[fnew_inds]] ) this.freq_array = np.concatenate( [this.freq_array, other.freq_array[fnew_inds]] ) this.channel_width = np.concatenate( [this.channel_width, other.channel_width[fnew_inds]] ) # We want to preserve per-spw information based on first appearance # in the concatenated array. unique_index = np.sort( np.unique(this.flex_spw_id_array, return_index=True)[1] ) this.spw_array = np.sort(this.flex_spw_id_array[unique_index]) if this.flex_jones_array is not None: concat_spw_array = np.concatenate([this.spw_array, other.spw_array]) spw_index = np.asarray( [ np.nonzero(concat_spw_array == spw)[0][0] for spw in this.spw_array ] ) this.flex_jones_array = np.concatenate( [this.flex_jones_array, other.flex_jones_array] ) this.flex_jones_array = this.flex_jones_array[spw_index] # If we have a multi-spw data set, need to sort out the order of # the individual windows first. f_order = np.concatenate( [ np.where(this.flex_spw_id_array == idx)[0] for idx in this.spw_array ] ) # With spectral windows sorted, check and see if channels within # windows need sorting. If they are ordered in ascending or descending # fashion, leave them be. If not, sort in ascending order for idx in this.spw_array: select_mask = this.flex_spw_id_array[f_order] == idx check_freqs = this.freq_array[f_order[select_mask]] if (not np.all(check_freqs[1:] > check_freqs[:-1])) and ( not np.all(check_freqs[1:] < check_freqs[:-1]) ): subsort_order = f_order[select_mask] f_order[select_mask] = subsort_order[np.argsort(check_freqs)] if not self.metadata_only: data_array_shape = getattr(this, this._required_data_params[0]).shape zero_pad = np.zeros( (data_array_shape[0], len(fnew_inds), this.Ntimes, this.Njones) ) if this.cal_type == "gain": this.gain_array = np.concatenate( [this.gain_array, zero_pad], axis=1 ) else: this.delay_array = np.concatenate( [this.delay_array, zero_pad], axis=1 ) this.flag_array = np.concatenate( [this.flag_array, 1 - zero_pad], axis=1 ).astype(np.bool_) if this.quality_array is not None: this.quality_array = np.concatenate( [this.quality_array, zero_pad], axis=1 ) if this.total_quality_array is not None and can_combine_tqa: zero_pad = np.zeros((len(fnew_inds), this.Ntimes, this.Njones)) this.total_quality_array = np.concatenate( [this.total_quality_array, zero_pad], axis=0 ) elif other.total_quality_array is not None and can_combine_tqa: zero_pad = np.zeros((len(fnew_inds), this.Ntimes, this.Njones)) this.total_quality_array = np.zeros( (Nf_tqa, this.Ntimes, this.Njones) ) this.total_quality_array = np.concatenate( [this.total_quality_array, zero_pad], axis=0 ) t_order = None if len(tnew_inds) > 0: if this.time_array is not None: this.time_array = np.concatenate( [this.time_array, other.time_array[tnew_inds]] ) if this.time_range is not None: this.time_range = np.concatenate( [this.time_range, other.time_range[tnew_inds]] ) if this.lst_array is not None: this.lst_array = np.concatenate( [this.lst_array, other.lst_array[tnew_inds]] ) if this.lst_range is not None: this.lst_range = np.concatenate( [this.lst_range, other.lst_range[tnew_inds]] ) if this.phase_center_id_array is not None: this.phase_center_id_array = np.concatenate( [this.phase_center_id_array, other.phase_center_id_array[tnew_inds]] ) if this.ref_antenna_array is not None: this.ref_antenna_array = np.concatenate( [this.ref_antenna_array, other.ref_antenna_array[tnew_inds]] ) if this.time_range is not None: t_order = np.argsort(this.time_range[:, 0]) else: t_order = np.argsort(this.time_array) this.integration_time = np.concatenate( [this.integration_time, other.integration_time[tnew_inds]] ) if not self.metadata_only: data_array_shape = getattr(this, this._required_data_params[0]).shape zero_pad_data = np.zeros( ( data_array_shape[0], data_array_shape[1], len(tnew_inds), this.Njones, ) ) zero_pad_flags = np.zeros( ( this.flag_array.shape[0], this.flag_array.shape[1], len(tnew_inds), this.Njones, ) ) if this.cal_type == "delay": this.delay_array = np.concatenate( [this.delay_array, zero_pad_data], axis=2 ) else: this.gain_array = np.concatenate( [this.gain_array, zero_pad_data], axis=2 ) this.flag_array = np.concatenate( [this.flag_array, 1 - zero_pad_flags], axis=2 ).astype(np.bool_) if this.quality_array is not None: this.quality_array = np.concatenate( [this.quality_array, zero_pad_data], axis=2 ) if this.total_quality_array is not None and can_combine_tqa: zero_pad = np.zeros( (this.quality_array.shape[1], len(tnew_inds), this.Njones) ) this.total_quality_array = np.concatenate( [this.total_quality_array, zero_pad], axis=1 ) elif other.total_quality_array is not None and can_combine_tqa: zero_pad = np.zeros( (this.quality_array.shape[1], len(tnew_inds), this.Njones) ) this.total_quality_array = np.zeros( (Nf_tqa, this.Ntimes, this.Njones) ) this.total_quality_array = np.concatenate( [this.total_quality_array, zero_pad], axis=1 ) j_order = None if len(jnew_inds) > 0: this.jones_array = np.concatenate( [this.jones_array, other.jones_array[jnew_inds]] ) j_order = np.argsort(np.abs(this.jones_array)) if not self.metadata_only: data_array_shape = getattr(this, this._required_data_params[0]).shape zero_pad_data = np.zeros( ( data_array_shape[0], data_array_shape[1], data_array_shape[2], len(jnew_inds), ) ) zero_pad_flags = np.zeros( ( this.flag_array.shape[0], this.flag_array.shape[1], this.flag_array.shape[2], len(jnew_inds), ) ) if this.cal_type == "delay": this.delay_array = np.concatenate( [this.delay_array, zero_pad_data], axis=3 ) else: this.gain_array = np.concatenate( [this.gain_array, zero_pad_data], axis=3 ) this.flag_array = np.concatenate( [this.flag_array, 1 - zero_pad_flags], axis=3 ).astype(np.bool_) if this.quality_array is not None: this.quality_array = np.concatenate( [this.quality_array, zero_pad_data], axis=3 ) if this.total_quality_array is not None and can_combine_tqa: zero_pad = np.zeros( (data_array_shape[1], data_array_shape[2], len(jnew_inds)) ) this.total_quality_array = np.concatenate( [this.total_quality_array, zero_pad], axis=2 ) elif other.total_quality_array is not None and can_combine_tqa: zero_pad = np.zeros( (data_array_shape[1], data_array_shape[2], len(jnew_inds)) ) this.total_quality_array = np.zeros( (Nf_tqa, this.Ntimes, this.Njones) ) this.total_quality_array = np.concatenate( [this.total_quality_array, zero_pad], axis=2 ) # Now populate the data if not self.metadata_only: jones_t2o = np.nonzero(np.isin(this.jones_array, other.jones_array))[0] if this.time_range is not None: times_t2o = np.nonzero( (np.isin(this.time_range[:, 0], other.time_range[:, 0])) & (np.isin(this.time_range[:, 1], other.time_range[:, 1])) )[0] else: times_t2o = np.nonzero(np.isin(this.time_array, other.time_array))[0] if self.wide_band: freqs_t2o = np.nonzero(np.isin(this.spw_array, other.spw_array))[0] else: freqs_t2o = np.zeros(this.freq_array.shape, dtype=bool) for spw_id in set(this.spw_array).intersection(other.spw_array): mask = this.flex_spw_id_array == spw_id freqs_t2o[mask] |= np.isin( this.freq_array[mask], other.freq_array[other.flex_spw_id_array == spw_id], ) freqs_t2o = np.nonzero(freqs_t2o)[0] ants_t2o = np.nonzero(np.isin(this.ant_array, other.ant_array))[0] if this.cal_type == "delay": this.delay_array[np.ix_(ants_t2o, freqs_t2o, times_t2o, jones_t2o)] = ( other.delay_array ) else: this.gain_array[np.ix_(ants_t2o, freqs_t2o, times_t2o, jones_t2o)] = ( other.gain_array ) if other.quality_array is not None: this.quality_array[ np.ix_(ants_t2o, freqs_t2o, times_t2o, jones_t2o) ] = other.quality_array this.flag_array[np.ix_(ants_t2o, freqs_t2o, times_t2o, jones_t2o)] = ( other.flag_array ) if ( this.total_quality_array is not None and other.total_quality_array is not None ): this.total_quality_array[np.ix_(freqs_t2o, times_t2o, jones_t2o)] = ( other.total_quality_array ) # Fix ordering ant_axis_num = 0 faxis_num = 1 taxis_num = 2 jaxis_num = 3 axis_dict = { ant_axis_num: {"inds": anew_inds, "order": ant_order}, faxis_num: {"inds": fnew_inds, "order": f_order}, taxis_num: {"inds": tnew_inds, "order": t_order}, jaxis_num: {"inds": jnew_inds, "order": j_order}, } for axis, subdict in axis_dict.items(): for name, param in zip( this._data_params, this.data_like_parameters, strict=True ): if param is None: continue axis_delta = 0 if name == "total_quality_array": axis_delta = 1 if len(subdict["inds"]) > 0: unique_order_diffs = np.unique(np.diff(subdict["order"])) if np.array_equal(unique_order_diffs, np.array([1])): # everything is already in order continue setattr( this, name, np.take(param, subdict["order"], axis=axis - axis_delta), ) if len(anew_inds) > 0: this.ant_array = this.ant_array[ant_order] if len(fnew_inds) > 0: if this.wide_band: this.spw_array = this.spw_array[f_order] this.freq_range = this.freq_range[f_order] else: this.freq_array = this.freq_array[f_order] this.channel_width = this.channel_width[f_order] this.flex_spw_id_array = this.flex_spw_id_array[f_order] if len(tnew_inds) > 0: if this.time_array is not None: this.time_array = this.time_array[t_order] if this.time_range is not None: this.time_range = this.time_range[t_order] if this.lst_array is not None: this.lst_array = this.lst_array[t_order] if this.lst_range is not None: this.lst_range = this.lst_range[t_order] this.integration_time = this.integration_time[t_order] if self.phase_center_id_array is not None: this.phase_center_id_array = this.phase_center_id_array[t_order] if self.ref_antenna_array is not None: this.ref_antenna_array = this.ref_antenna_array[t_order] if len(jnew_inds) > 0: this.jones_array = this.jones_array[j_order] # Update N parameters (e.g. Njones) this.Njones = this.jones_array.shape[0] if this.time_range is not None: this.Ntimes = this.time_range.shape[0] else: this.Ntimes = this.time_array.shape[0] if this.cal_type == "gain" and not this.wide_band: this.Nfreqs = this.freq_array.size this.Nspws = this.spw_array.size this.Nants_data = len( np.unique(this.ant_array.tolist() + other.ant_array.tolist()) ) if n_axes > 0: history_update_string += " axis using pyuvdata." histories_match = utils.history._check_histories( this.history, other.history ) this.history += history_update_string if not histories_match: if verbose_history: this.history += " Next object history follows. " + other.history else: extra_history = utils.history._combine_history_addition( this.history, other.history ) if extra_history is not None: this.history += ( " Unique part of next object history follows. " + extra_history ) # Check final object is self-consistent if run_check: this.check( check_extra=check_extra, run_check_acceptability=run_check_acceptability, check_freq_spacing=warn_spacing, check_jones_spacing=warn_spacing, check_time_spacing=warn_spacing, raise_spacing_errors=False, ) if not inplace: return this def __iadd__( self, other, *, run_check=True, check_extra=True, run_check_acceptability=True, ignore_name=False, warn_spacing=False, ): """ Combine two UVCal objects in place. Along antenna, frequency, time, and/or Jones axis. Parameters ---------- other : :class: UVCal Another UVCal object which will be added to self. run_check : bool Option to check for the existence and proper shapes of parameters after combining objects. check_extra : bool Option to check optional parameters as well as required ones. run_check_acceptability : bool Option to check acceptable range of the values of parameters after combining objects. ignore_name : bool Option to ignore the name of the phase center (`cat_name` in `phase_center_catalog`) when combining two UVCal objects. If set to True, phase centers that are the same up to their name will be combined with the name set to the name found in the first UVCal object in the sum. If set to False, phase centers that are the same up to the name will be kept as separate phase centers. Default is False. warn_spacing : bool Option to raise warnings about spacing that would prevent writing to calfits file-format. Default is False. """ self.__add__( other, inplace=True, run_check=run_check, check_extra=check_extra, run_check_acceptability=run_check_acceptability, ignore_name=ignore_name, warn_spacing=warn_spacing, ) return self
[docs] def fast_concat( self, other, axis, *, ignore_name=None, inplace=False, verbose_history=False, run_check=True, check_extra=True, run_check_acceptability=True, warn_spacing=False, ): """ Concatenate two UVCal objects along specified axis with almost no checking. Warning! This method assumes all the metadata along other axes is sorted the same way. The __add__ method is much safer, it checks all the metadata, but it is slower. Some quick checks are run, but this method doesn't make any guarantees that the resulting object is correct. Parameters ---------- other : UVCal object or list of UVCal objects UVCal object or list of UVCal objects which will be added to self. axis : str Axis to concatenate files along. This enables fast concatenation along the specified axis without the normal checking that all other metadata agrees. Allowed values are: 'antenna', 'time', 'freq', 'spw', 'jones' ('freq' is not allowed for delay or wideband objects and 'spw' is only allowed for wideband objects). ignore_name : bool Option to ignore the name of the phase center (`cat_name` in `phase_center_catalog`) when combining two UVCal objects. If set to True, phase centers that are the same up to their name will be combined with the name set to the name found in the first UVCal object in the sum. If set to False, phase centers that are the same up to the name will be kept as separate phase centers. Default is False. inplace : bool If True, overwrite self as we go, otherwise create a third object as the sum of the two. verbose_history : bool Option to allow more verbose history. If True and if the histories for the two objects are different, the combined object will keep all the history of both input objects (if many objects are combined in succession this can lead to very long histories). If False and if the histories for the two objects are different, the combined object will have the history of the first object and only the parts of the second object history that are unique (this is done word by word and can result in hard to interpret histories). run_check : bool Option to check for the existence and proper shapes of parameters after combining objects. check_extra : bool Option to check optional parameters as well as required ones. run_check_acceptability : bool Option to check acceptable range of the values of parameters after combining objects. warn_spacing : bool Option to raise warnings about spacing that would prevent writing to calfits file-format. Default is False. """ if inplace: this = self else: this = self.copy() if not isinstance(other, list | tuple | np.ndarray): # if this is a UVCal object already, stick it in a list other = [other] # Check that both objects are UVCal and valid this.check( check_extra=check_extra, run_check_acceptability=run_check_acceptability ) for obj in other: if not issubclass(obj.__class__, this.__class__) and not issubclass( this.__class__, obj.__class__ ): raise ValueError( "Only UVCal (or subclass) objects can be " "added to a UVCal (or subclass) object" ) obj.check( check_extra=check_extra, run_check_acceptability=run_check_acceptability ) # Check that all objects are consistent w/ use of flex-Jones for obj in other: if (this.flex_jones_array is None) != (obj.flex_jones_array is None): raise ValueError( "To combine these data, all objects must be either set to regular " "or flex-jones." ) for obj in other: this_pc_cat = this.phase_center_catalog is None other_pc_cat = obj.phase_center_catalog is None this_pc_ids = this.phase_center_id_array is None other_pc_ids = obj.phase_center_id_array is None if (this_pc_cat != other_pc_cat) or (this_pc_ids != other_pc_ids): raise ValueError( "To combine these data, phase_center_id_array and " "_phase_center_catalog must be set for all objects." ) # Check that all objects are consistent w/ use of wide_band or not for obj in other: if this.wide_band != obj.wide_band: raise ValueError( "To combine these data, wide_band must be set to the same " "value (True or False) for all objects." ) allowed_axes = ["antenna", "time", "jones"] if this.wide_band is True: allowed_axes.append("spw") elif self.cal_type == "gain": allowed_axes.append("freq") if axis not in allowed_axes: raise ValueError("Axis must be one of: " + ", ".join(allowed_axes)) # Check objects are compatible compatibility_params = [ "_cal_type", "_gain_convention", "_cal_style", "_ref_antenna_name", ] warning_params = [ "_observer", "_git_hash_cal", "_sky_catalog", "_Nsources", "_baseline_range", "_diffuse_model", ] history_update_string = " Combined data along " # Check separately for phase_center_id_array, since it's optional if axis != "time" and this.phase_center_id_array is not None: compatibility_params += ["_phase_center_id_array"] if axis == "freq" or axis == "spw": if axis == "freq": history_update_string += "frequency" else: history_update_string += "spectral window" compatibility_params += [ "_jones_array", "_ant_array", "_time_array", "_integration_time", "_lst_array", "_time_range", "_lst_range", ] elif axis == "jones": history_update_string += "jones" compatibility_params += [ "_freq_array", "_channel_width", "_ant_array", "_time_array", "_integration_time", "_lst_array", "_time_range", "_lst_range", ] elif axis == "antenna": history_update_string += "antenna" compatibility_params += [ "_freq_array", "_channel_width", "_jones_array", "_time_array", "_integration_time", "_lst_array", "_time_range", "_lst_range", ] elif axis == "time": history_update_string += "time" compatibility_params += [ "_freq_array", "_channel_width", "_jones_array", "_ant_array", ] _time_param_check(this, other) history_update_string += " axis using pyuvdata." histories_match = [] for obj in other: histories_match.append( utils.history._check_histories(this.history, obj.history) ) this.history += history_update_string for obj_num, obj in enumerate(other): if not histories_match[obj_num]: if verbose_history: this.history += " Next object history follows. " + obj.history else: extra_history = utils.history._combine_history_addition( this.history, obj.history ) if extra_history is not None: this.history += ( " Unique part of next object history follows. " + extra_history ) # Actually check compatibility parameters # Handle the telescope object in a special fashion, since there can be partially # overlapping information that we want to preserve here. tel_obj = this.telescope.copy() if inplace else this.telescope for obj in other: tel_obj += obj.telescope for a in compatibility_params: params_match = getattr(this, a) == getattr(obj, a) if not params_match: msg = ( "UVParameter " + a[1:] + " does not match. Cannot combine objects." ) raise ValueError(msg) for a in warning_params: params_match = getattr(this, a) == getattr(obj, a) if not params_match: msg = "UVParameter " + a[1:] + " does not match. Combining anyway." warnings.warn(msg) this.telescope = tel_obj # update the phase_center_catalog to make them consistent across objects # Doing this as a binary tree merge # The left object in each loop will have its phase center IDs updated. if this.phase_center_catalog is not None: uv_list = [this] + other while len(uv_list) > 1: # for an odd number of files, the second argument will be shorter # so the last element in the first list won't be combined, but it # will not be lost, so it's ok. for uv1, uv2 in zip(uv_list[0::2], uv_list[1::2], strict=False): uv1._consolidate_phase_center_catalogs( other=uv2, ignore_name=ignore_name ) uv_list = uv_list[0::2] total_quality_exists = [this.total_quality_array is not None] + [ obj.total_quality_array is not None for obj in other ] this_tqa_exp_shape = this._total_quality_array.expected_shape(this) quality_exists = [this.quality_array is not None] + [ obj.quality_array is not None for obj in other ] this_qa_exp_shape = this._quality_array.expected_shape(this) if axis == "antenna": this.Nants_data = sum([this.Nants_data] + [obj.Nants_data for obj in other]) this.ant_array = np.concatenate( [this.ant_array] + [obj.ant_array for obj in other] ) axis_num = 0 elif axis == "freq": this.Nfreqs = sum([this.Nfreqs] + [obj.Nfreqs for obj in other]) this.freq_array = np.concatenate( [this.freq_array] + [obj.freq_array for obj in other] ) this.channel_width = np.concatenate( [this.channel_width] + [obj.channel_width for obj in other] ) this.flex_spw_id_array = np.concatenate( [this.flex_spw_id_array] + [obj.flex_spw_id_array for obj in other] ) # We want to preserve per-spw information based on first appearance # in the concatenated array. unique_index = np.sort( np.unique(this.flex_spw_id_array, return_index=True)[1] ) this.spw_array = this.flex_spw_id_array[unique_index] this.Nspws = this.spw_array.size if this.flex_jones_array is not None: concat_spw_array = np.concatenate( [this.spw_array] + [obj.spw_array for obj in other] ) spw_index = np.asarray( [ np.nonzero(concat_spw_array == spw)[0][0] for spw in this.spw_array ] ) this.flex_jones_array = np.concatenate( [this.flex_jones_array] + [obj.flex_jones_array for obj in other] ) this.flex_jones_array = this.flex_jones_array[spw_index] axis_num = 1 elif axis == "spw": # only get here for a wide-band cal this.Nspws = sum([this.Nspws] + [obj.Nspws for obj in other]) this.spw_array = np.concatenate( [this.spw_array] + [obj.spw_array for obj in other] ) this.freq_range = np.concatenate( [this.freq_range] + [obj.freq_range for obj in other], axis=0 ) if this.flex_jones_array is not None: this.flex_jones_array = np.concatenate( [this.flex_jones_array] + [obj.flex_jones_array for obj in other], axis=0, ) axis_num = 1 elif axis == "time": this.Ntimes = sum([this.Ntimes] + [obj.Ntimes for obj in other]) if this.time_array is not None: this.time_array = np.concatenate( [this.time_array] + [obj.time_array for obj in other] ) if this.time_range is not None: this.time_range = np.concatenate( [this.time_range] + [obj.time_range for obj in other], axis=0 ) this.integration_time = np.concatenate( [this.integration_time] + [obj.integration_time for obj in other] ) if this.lst_array is not None: this.lst_array = np.concatenate( [this.lst_array] + [obj.lst_array for obj in other] ) if this.lst_range is not None: this.lst_range = np.concatenate( [this.lst_range] + [obj.lst_range for obj in other], axis=0 ) if this.phase_center_id_array is not None: this.phase_center_id_array = np.concatenate( ( [this.phase_center_id_array] + [obj.phase_center_id_array for obj in other] ), axis=0, ) if this.ref_antenna_array is not None: this.ref_antenna_array = np.concatenate( [this.ref_antenna_array] + [obj.ref_antenna_array for obj in other], axis=0, ) axis_num = 2 elif axis == "jones": this.Njones = sum([this.Njones] + [obj.Njones for obj in other]) this.jones_array = np.concatenate( [this.jones_array] + [obj.jones_array for obj in other] ) axis_num = 3 if not self.metadata_only: this.flag_array = np.concatenate( [this.flag_array] + [obj.flag_array for obj in other], axis=axis_num ) if this.cal_type == "gain": this.gain_array = np.concatenate( [this.gain_array] + [obj.gain_array for obj in other], axis=axis_num ) else: this.delay_array = np.concatenate( [this.delay_array] + [obj.delay_array for obj in other], axis=axis_num, ) if np.any(quality_exists): if np.all(quality_exists): this.quality_array = np.concatenate( [this.quality_array] + [obj.quality_array for obj in other], axis=axis_num, ) else: qa_list = [] if this.quality_array is None: qa_list.append(np.zeros(this_qa_exp_shape, dtype=float)) else: qa_list.append(this.quality_array) for obj in other: if obj.quality_array is None: qa_list.append( np.zeros( (obj._quality_array.expected_shape(obj)), dtype=float, ) ) else: qa_list.append(obj.quality_array) this.quality_array = np.concatenate(qa_list, axis=axis_num) if np.any(total_quality_exists): if axis == "antenna": warnings.warn( "Total quality array detected in at least one file; the " "array in the new object will be set to 'None' because " "whole-array values cannot be combined when adding antennas" ) this.total_quality_array = None else: if np.all(total_quality_exists): this.total_quality_array = np.concatenate( [this.total_quality_array] + [obj.total_quality_array for obj in other], axis=axis_num - 1, ) else: tqa_list = [] if this.total_quality_array is None: tqa_list.append(np.zeros(this_tqa_exp_shape, dtype=float)) else: tqa_list.append(this.total_quality_array) for obj in other: if obj.total_quality_array is None: tqa_list.append( np.zeros( (obj._total_quality_array.expected_shape(obj)), dtype=float, ) ) else: tqa_list.append(obj.total_quality_array) this.total_quality_array = np.concatenate( tqa_list, axis=axis_num - 1 ) # update filename attribute for obj in other: this.filename = utils.tools._combine_filenames(this.filename, obj.filename) if this.filename is not None: this._filename.form = len(this.filename) # Check final object is self-consistent if run_check: this.check( check_extra=check_extra, run_check_acceptability=run_check_acceptability, check_freq_spacing=warn_spacing, check_jones_spacing=warn_spacing, check_time_spacing=warn_spacing, raise_spacing_errors=False, ) if not inplace: return this
def _select_preprocess( self, *, antenna_nums, antenna_names, frequencies, freq_chans, spws, times, time_range, lsts, lst_range, jones, phase_center_ids, catalog_names, invert=False, strict=False, warn_spacing=False, ): """ Downselect data to keep on the object along various axes. Axes that can be selected along include antennas, frequencies, times and antenna polarization (jones). The history attribute on the object will be updated to identify the operations performed. Parameters ---------- antenna_nums : array_like of int, optional The antennas numbers to keep in the object (antenna positions and names for the removed antennas will be retained). This cannot be provided if `antenna_names` is also provided. antenna_names : array_like of str, optional The antennas names to keep in the object (antenna positions and names for the removed antennas will be retained). This cannot be provided if `antenna_nums` is also provided. frequencies : array_like of float, optional The frequencies to keep in the object, each value passed here should exist in the freq_array. freq_chans : array_like of int, optional The frequency channel numbers to keep in the object. spws : array_like of in, optional The spectral window numbers to keep in the object. If this is not a wide-band object and `frequencies` or `freq_chans` is not None, frequencies that match any of the specifications will be kept (i.e. the selections will be OR'ed together). times : array_like of float, optional The times to keep in the object, each value passed here should exist in the time_array or be contained in a time_range on the object. time_range : array_like of float, optional The time range in Julian Date to keep in the object, must be length 2. Some of the times in the object should fall between the first and last elements. Cannot be used with `times`. lsts : array_like of float, optional The local sidereal times (LSTs) to keep in the object, each value passed here should exist in the lst_array. Cannot be used with `times`, `time_range`, or `lst_range`. lst_range : array_like of float, optional The local sidereal time (LST) range in radians to keep in the object, must be of length 2. Some of the LSTs in the object should fall between the first and last elements. If the second value is smaller than the first, the LSTs are treated as having phase-wrapped around LST = 2*pi = 0, and the LSTs kept on the object will run from the larger value, through 0, and end at the smaller value. jones : array_like of int or str, optional The antenna polarizations numbers to keep in the object, each value passed here should exist in the jones_array. If passing strings, the canonical polarization strings (e.g. "Jxx", "Jrr") are supported and if the `x_orientation` attribute is set, the physical dipole strings (e.g. "Jnn", "Jee") are also supported. phase_center_ids : array_like of int, optional Phase center IDs to keep on the object (effectively a selection on baseline-times). Cannot be used with `catalog_names`. catalog_names : str or array-like of str, optional The names of the phase centers (sources) to keep in the object, which should match exactly in spelling and capitalization. Cannot be used with `phase_center_ids`. invert : bool Normally records matching given criteria are what are included in the subsequent object. However, if set to True, these records are excluded instead. Default is False. strict : bool or None Normally, select will warn when no records match a one element of a parameter, as long as *at least one* element matches with what is in the object. However, if set to True, an error is thrown if any element does not match. If set to None, then neither errors nor warnings are raised. Default is False. warn_spacing : bool Option to raise warnings about spacing that would prevent writing to calfits file-format. Default is False. Returns ------- ant_inds : list of int list of antenna indices to keep. Can be None (to keep everything). time_inds : list of int list of time indices to keep. Can be None (to keep everything). spw_inds : list of int list of spw indices to keep. Can be None (to keep everything). freq_inds : list of int list of frequency indices to keep. Can be None (to keep everything). pol_inds : list of int list of polarization indices to keep. Can be None (to keep everything). history_update_string : str string to append to the end of the history. """ selections = [] if (phase_center_ids is not None) and (catalog_names is not None): raise ValueError("Cannot set both phase_center_ids and catalog_names.") if ( self.phase_center_id_array is None or self.phase_center_catalog is None ) and (phase_center_ids is not None or catalog_names is not None): raise ValueError( "Both phase_center_id_array and phase_center_catalog attributes of " "the UVCal object must be set in order to select on phase center " "IDs or catalog names." ) ant_inds, ant_selections = utils.antenna._select_antenna_helper( antenna_names=antenna_names, antenna_nums=antenna_nums, tel_ant_names=self.telescope.antenna_names, tel_ant_nums=self.telescope.antenna_numbers, obj_ant_array=self.ant_array, invert=invert, strict=strict, ) selections.extend(ant_selections) time_inds, time_selections = utils.times._select_times_helper( times=times, time_range=time_range, lsts=lsts, lst_range=lst_range, obj_time_array=self.time_array, obj_time_range=self.time_range, obj_lst_array=self.lst_array, obj_lst_range=self.lst_range, time_tols=self._time_array.tols, lst_tols=self._lst_array.tols, invert=invert, strict=strict, warn_spacing=warn_spacing, ) selections.extend(time_selections) if catalog_names is not None: phase_center_ids = utils.phase_center_catalog.look_for_name( self.phase_center_catalog, catalog_names ) if phase_center_ids is not None: pc_check = np.isin( self.phase_center_id_array, phase_center_ids, invert=invert ) time_inds = utils.tools._sorted_unique_intersection( np.where(pc_check)[0], time_inds ) pc_selection = ( "phase center IDs" if (catalog_names is None) else "catalog names" ) selections.append(pc_selection) if spws is not None and self.Nspws == 1: warnings.warn( "Cannot select on spws if Nspws=1. Ignoring the spw parameter." ) spws = None if self.freq_array is None and ( freq_chans is not None or frequencies is not None ): raise ValueError( "Cannot select on frequencies because this is a wide_band object with " "no freq_array." ) freq_inds, spw_inds, freq_selections = utils.frequency._select_freq_helper( frequencies=frequencies, freq_chans=freq_chans, obj_freq_array=self.freq_array, freq_tols=self._freq_array.tols, obj_channel_width=self.channel_width, channel_width_tols=self._channel_width.tols, obj_spw_id_array=self.flex_spw_id_array, spws=spws, obj_spw_array=self.spw_array, jones=jones, obj_flex_jones_array=self.flex_jones_array, obj_x_orientation=self.telescope.get_x_orientation_from_feeds(), invert=invert, strict=strict, warn_spacing=warn_spacing, ) selections.extend(freq_selections) jones_inds, jones_selections = utils.pol._select_jones_helper( jones=jones, obj_jones_array=self.jones_array, obj_x_orientation=self.telescope.get_x_orientation_from_feeds(), flex_jones=self.flex_jones_array is not None, invert=invert, strict=strict, warn_spacing=warn_spacing, ) selections.extend(jones_selections) # build up history string from selections history_update_string = "" if len(selections) > 0: history_update_string = ( " Downselected to specific " + ", ".join(selections) + " using pyuvdata." ) return ( ant_inds, time_inds, spw_inds, freq_inds, jones_inds, history_update_string, ) def _select_by_index( self, *, ant_inds, time_inds, spw_inds, freq_inds, jones_inds, history_update_string, ): """ Perform select based on indexing arrays. Parameters ---------- ant_inds : list of int list of antenna indices to keep. Can be None (to keep everything). time_inds : list of int list of time indices to keep. Can be None (to keep everything). freq_inds : list of int list of frequency indices to keep. Can be None (to keep everything). jones_inds : list of int list of jones indices to keep. Can be None (to keep everything). history_update_string : str string to append to the end of the history. """ # Create a dictionary to pass to _select_along_param_axis ind_dict = { "Nants_data": ant_inds, "Ntimes": time_inds, "Nspws": spw_inds, "Nfreqs": freq_inds, "Njones": jones_inds, } # During each loop interval, we pop off an element of this dict, so continue # until the dict is empty. self._select_along_param_axis(ind_dict) if ant_inds is not None and self.total_quality_array is not None: warnings.warn( "Changing number of antennas, but preserving the " "total_quality_array, which may have been defined based " "in part on antennas which will be removed." ) # Update the history string self.history += history_update_string
[docs] def select( self, *, antenna_nums=None, antenna_names=None, frequencies=None, freq_chans=None, spws=None, times=None, time_range=None, lsts=None, lst_range=None, jones=None, phase_center_ids=None, catalog_names=None, invert=False, strict=False, run_check=True, check_extra=True, run_check_acceptability=True, inplace=True, warn_spacing=False, ): """ Downselect data to keep on the object along various axes. Axes that can be selected along include antennas, frequencies, times and antenna polarization (jones). The history attribute on the object will be updated to identify the operations performed. Parameters ---------- antenna_nums : array_like of int, optional The antennas numbers to keep in the object (antenna positions and names for the removed antennas will be retained). This cannot be provided if `antenna_names` is also provided. antenna_names : array_like of str, optional The antennas names to keep in the object (antenna positions and names for the removed antennas will be retained). This cannot be provided if `antenna_nums` is also provided. frequencies : array_like of float, optional The frequencies to keep in the object, each value passed here should exist in the freq_array. freq_chans : array_like of int, optional The frequency channel numbers to keep in the object. spws : array_like of in, optional The spectral window numbers to keep in the object. If this is not a wide-band object and `frequencies` or `freq_chans` is not None, frequencies that match any of the specifications will be kept (i.e. the selections will be OR'ed together). times : array_like of float, optional The times to keep in the object, each value passed here should exist in the time_array or be contained in a time_range on the object. time_range : array_like of float, optional The time range in Julian Date to keep in the object, must be length 2. Some of the times in the object should fall between the first and last elements. Cannot be used with `times`. lsts : array_like of float, optional The local sidereal times (LSTs) to keep in the object, each value passed here should exist in the lst_array. Cannot be used with `times`, `time_range`, or `lst_range`. lst_range : array_like of float, optional The local sidereal time (LST) range in radians to keep in the object, must be of length 2. Some of the LSTs in the object should fall between the first and last elements. If the second value is smaller than the first, the LSTs are treated as having phase-wrapped around LST = 2*pi = 0, and the LSTs kept on the object will run from the larger value, through 0, and end at the smaller value. jones : array_like of int or str, optional The antenna polarizations numbers to keep in the object, each value passed here should exist in the jones_array. If passing strings, the canonical polarization strings (e.g. "Jxx", "Jrr") are supported and if the `x_orientation` attribute is set, the physical dipole strings (e.g. "Jnn", "Jee") are also supported. phase_center_ids : array_like of int, optional Phase center IDs to keep on the object (effectively a selection on baseline-times). Cannot be used with `catalog_names`. catalog_names : str or array-like of str, optional The names of the phase centers (sources) to keep in the object, which should match exactly in spelling and capitalization. Cannot be used with `phase_center_ids`. invert : bool Normally records matching given criteria are what are included in the subsequent object. However, if set to True, these records are excluded instead. Default is False. strict : bool or None Normally, select will warn when no records match a one element of a parameter, as long as *at least one* element matches with what is in the object. However, if set to True, an error is thrown if any element does not match. If set to None, then neither errors nor warnings are raised. Default is False. run_check : bool Option to check for the existence and proper shapes of parameters after downselecting data on this object (the default is True, meaning the check will be run). check_extra : bool Option to check optional parameters as well as required ones (the default is True, meaning the optional parameters will be checked). run_check_acceptability : bool Option to check acceptable range of the values of parameters after downselecting data on this object (the default is True, meaning the acceptable range check will be done). inplace : bool Option to perform the select directly on self or return a new UVCal object with just the selected data (the default is True, meaning the select will be done on self). warn_spacing : bool Option to raise warnings about spacing that would prevent writing to calfits file-format. Default is False. """ if inplace: cal_object = self else: cal_object = self.copy() # Figure out which index positions we want to hold on to. ( ant_inds, time_inds, spw_inds, freq_inds, jones_inds, history_update_string, ) = cal_object._select_preprocess( antenna_nums=antenna_nums, antenna_names=antenna_names, frequencies=frequencies, freq_chans=freq_chans, spws=spws, times=times, time_range=time_range, lsts=lsts, lst_range=lst_range, jones=jones, phase_center_ids=phase_center_ids, catalog_names=catalog_names, invert=invert, strict=strict, warn_spacing=warn_spacing, ) # Call the low-level selection method. cal_object._select_by_index( ant_inds=ant_inds, time_inds=time_inds, freq_inds=freq_inds, spw_inds=spw_inds, jones_inds=jones_inds, history_update_string=history_update_string, ) # check if object is self-consistent if run_check: cal_object.check( check_extra=check_extra, run_check_acceptability=run_check_acceptability ) if not inplace: return cal_object
def _convert_from_filetype(self, other): for p in other: param = getattr(other, p) setattr(self, p, param) def _convert_to_filetype(self, filetype): if filetype == "calfits": from . import calfits other_obj = calfits.CALFITS() elif filetype == "calh5": from . import calh5 other_obj = calh5.CalH5() elif filetype == "ms": from . import ms_cal other_obj = ms_cal.MSCal() else: raise ValueError("filetype must be calh5, calfits, or ms.") for p in self: param = getattr(self, p) setattr(other_obj, p, param) return other_obj
[docs] @classmethod @combine_docstrings(initializers.new_uvcal_from_uvdata) def initialize_from_uvdata( cls, uvdata, *, gain_convention, cal_style, metadata_only=True, **kwargs ): """ Initialize this object based on a UVData object. Parameters ---------- uvdata : UVData object The UVData object to initialize from. metadata_only : bool Option to only initialize the metadata. If False, this method also initializes the data-like arrays to zeros/ones as appropriate (or False for the flag_array) with the appropriate sizes. """ new = initializers.new_uvcal_from_uvdata( uvdata=uvdata, gain_convention=gain_convention, cal_style=cal_style, empty=not metadata_only, **kwargs, ) return new
[docs] def read_calfits(self, filename, **kwargs): """ Read in data from calfits file(s). Parameters ---------- filename : str The calfits file to read from. read_data : bool Read in the gains or delays, quality arrays and flag arrays. If set to False, only the metadata will be read in. Setting read_data to False results in a metadata only object. background_lsts : bool When set to True, the lst_array is calculated in a background thread. run_check : bool Option to check for the existence and proper shapes of parameters after reading in the file. check_extra : bool Option to check optional parameters as well as required ones. run_check_acceptability : bool Option to check acceptable range of the values of parameters after reading in the file. default_mount_type : str If not recorded in the data set or telescope is unknown to pyuvdata, the `Telescope.mount_type` parameter is automatically set to "other". However, users can specify a different default by passing an argument here. astrometry_library : str Library used for calculating LSTs. Allowed options are 'erfa' (which uses the pyERFA), 'novas' (which uses the python-novas library), and 'astropy' (which uses the astropy utilities). Default is erfa unless the telescope_location frame is MCMF (on the moon), in which case the default is astropy. """ from . import calfits if isinstance(filename, list | tuple): raise ValueError( "Use the generic `UVCal.read` method to read multiple files." ) calfits_obj = calfits.CALFITS() calfits_obj.read_calfits(filename, **kwargs) self._convert_from_filetype(calfits_obj) del calfits_obj
[docs] def read_calh5(self, filename, **kwargs): """ Read in data from calh5 file(s). Parameters ---------- filename : string, path, FastCalH5Meta, h5py.File The CalH5 file to read from. A file name or path or a FastCalH5Meta or h5py File object. Must contains a "/Header" group for CalH5 files conforming to spec. antenna_nums : array_like of int, optional The antennas numbers to include when reading data into the object (antenna positions and names for the removed antennas will be retained). This cannot be provided if `antenna_names` is also provided. Ignored if read_data is False. antenna_names : array_like of str, optional The antennas names to include when reading data into the object (antenna positions and names for the removed antennas will be retained). This cannot be provided if `antenna_nums` is also provided. Ignored if read_data is False. frequencies : array_like of float, optional The frequencies to include when reading data into the object, each value passed here should exist in the freq_array. Ignored if read_data is False. freq_chans : array_like of int, optional The frequency channel numbers to include when reading data into the object. Ignored if read_data is False. spws : array_like of in, optional The spectral window numbers to keep in the object. If this is not a wide-band object and `frequencies` or `freq_chans` is not None, frequencies that match any of the specifications will be kept (i.e. the selections will be OR'ed together). times : array_like of float, optional The times to include when reading data into the object, each value passed here should exist in the time_array. Cannot be used with `time_range`. time_range : array_like of float, optional The time range in Julian Date to keep in the object, must be length 2. Some of the times in the object should fall between the first and last elements. Cannot be used with `times`. lsts : array_like of float, optional The local sidereal times (LSTs) to keep in the object, each value passed here should exist in the lst_array. Cannot be used with `times`, `time_range`, or `lst_range`. lst_range : array_like of float, optional The local sidereal time (LST) range in radians to keep in the object, must be of length 2. Some of the LSTs in the object should fall between the first and last elements. If the second value is smaller than the first, the LSTs are treated as having phase-wrapped around LST = 2*pi = 0, and the LSTs kept on the object will run from the larger value, through 0, and end at the smaller value. jones : array_like of int, optional The jones polarizations numbers to include when reading data into the object, each value passed here should exist in the polarization_array. Ignored if read_data is False. phase_center_ids : array_like of int, optional Phase center IDs to keep on the object (effectively a selection on baseline-times). Cannot be used with `catalog_names`. catalog_names : str or array-like of str, optional The names of the phase centers (sources) to keep in the object, which should match exactly in spelling and capitalization. Cannot be used with `phase_center_ids`. read_data : bool Read in the data-like arrays (gains/delays, flags, qualities). If set to False, only the metadata will be read in. Setting read_data to False results in a metadata only object. background_lsts : bool When set to True, the lst_array is calculated in a background thread. run_check : bool Option to check for the existence and proper shapes of parameters after after reading in the file (the default is True, meaning the check will be run). Ignored if read_data is False. check_extra : bool Option to check optional parameters as well as required ones (the default is True, meaning the optional parameters will be checked). Ignored if read_data is False. run_check_acceptability : bool Option to check acceptable range of the values of parameters after reading in the file (the default is True, meaning the acceptable range check will be done). Ignored if read_data is False. default_mount_type : str If not recorded in the data set or telescope is unknown to pyuvdata, the `Telescope.mount_type` parameter is automatically set to "other". However, users can specify a different default by passing an argument here. astrometry_library : str Library used for calculating LSTs. Allowed options are 'erfa' (which uses the pyERFA), 'novas' (which uses the python-novas library), and 'astropy' (which uses the astropy utilities). Default is erfa unless the telescope_location frame is MCMF (on the moon), in which case the default is astropy. Returns ------- None Raises ------ IOError If filename doesn't exist. ValueError If incompatible select keywords are set (e.g. `times` and `time_range`) or select keywords exclude all data or if keywords are set to the wrong type. """ from . import calh5 if isinstance(filename, list | tuple): raise ValueError( "Use the generic `UVCal.read` method to read multiple files." ) calh5_obj = calh5.CalH5() calh5_obj.read_calh5(filename, **kwargs) self._convert_from_filetype(calh5_obj) del calh5_obj
[docs] def read_fhd_cal( self, cal_file, *, obs_file, layout_file=None, settings_file=None, **kwargs ): """ Read data from an FHD cal.sav file. Parameters ---------- cal_file : str or list of str The cal.sav file or list of files to read from. obs_file : str or list of str The obs.sav file or list of files to read from. layout_file : str The FHD layout file. Required for antenna_positions to be set. settings_file : str or list of str, optional The settings_file or list of files to read from. Optional, but very useful for provenance. raw : bool Option to use the raw (per antenna, per frequency) solution or to use the fitted (polynomial over phase/amplitude) solution. Default is True (meaning use the raw solutions). read_data : bool Read in the gains, quality array and flag data. If set to False, only the metadata will be read in. Setting read_data to False results in a metadata only object. If read_data is False, a settings file must be provided. background_lsts : bool When set to True, the lst_array is calculated in a background thread. extra_history : str or list of str, optional String(s) to add to the object's history parameter. run_check : bool Option to check for the existence and proper shapes of parameters after reading in the file. check_extra : bool Option to check optional parameters as well as required ones. run_check_acceptability : bool Option to check acceptable range of the values of parameters after reading in the file. default_mount_type : str If not recorded in the data set or telescope is unknown to pyuvdata, the `Telescope.mount_type` parameter is automatically set to "other". However, users can specify a different default by passing an argument here. astrometry_library : str Library used for calculating LSTs. Allowed options are 'erfa' (which uses the pyERFA), 'novas' (which uses the python-novas library), and 'astropy' (which uses the astropy utilities). Default is erfa unless the telescope_location frame is MCMF (on the moon), in which case the default is astropy. """ from . import fhd_cal if isinstance(cal_file, list | tuple): raise ValueError( "Use the generic `UVCal.read` method to read multiple files." ) fhd_cal_obj = fhd_cal.FHDCal() fhd_cal_obj.read_fhd_cal( cal_file=cal_file, obs_file=obs_file, layout_file=layout_file, settings_file=settings_file, **kwargs, ) self._convert_from_filetype(fhd_cal_obj) del fhd_cal_obj
[docs] def read_ms_cal(self, filename, **kwargs): """ Read in data from an MS calibration table. Parameters ---------- filename : str The measurement set to read from. run_check : bool Option to check for the existence and proper shapes of parameters after reading in the file. default_x_orientation : str By default, if not found on read, the x_orientation parameter will be set to "east" and a warning will be raised. However, if a value for default_x_orientation is provided, it will be used instead and the warning will be suppressed. default_jones_array : ndarray of int By default, if not found on read, the jones_array parameter will be set to [-5, -6] (linear pols) and a warning will be raised. However, if a value for default_jones_array is provided, it will be used instead and the warning will be suppressed. default_mount_type : str If not recorded in the data set or telescope is unknown to pyuvdata, the `Telescope.mount_type` parameter is automatically set to "other". However, users can specify a different default by passing an argument here. check_extra : bool Option to check optional parameters as well as required ones. run_check_acceptability : bool Option to check acceptable range of the values of parameters after reading in the file. astrometry_library : str Library used for calculating LSTs. Allowed options are 'erfa' (which uses the pyERFA), 'novas' (which uses the python-novas library), and 'astropy' (which uses the astropy utilities). Default is erfa unless the telescope_location frame is MCMF (on the moon), in which case the default is astropy. """ from . import ms_cal if isinstance(filename, list | tuple): raise ValueError( "Use the generic `UVCal.read` method to read multiple files." ) ms_cal_obj = ms_cal.MSCal() ms_cal_obj.read_ms_cal(filename, **kwargs) self._convert_from_filetype(ms_cal_obj) del ms_cal_obj
[docs] def read( self, filename, *, axis=None, file_type=None, read_data=True, background_lsts=True, astrometry_library=None, # selecting parameters antenna_nums=None, antenna_names=None, frequencies=None, freq_chans=None, spws=None, times=None, time_range=None, lsts=None, lst_range=None, jones=None, phase_center_ids=None, catalog_names=None, # checking parameters run_check=True, check_extra=True, run_check_acceptability=True, default_mount_type="other", # file-type specific parameters # FHD obs_file=None, layout_file=None, settings_file=None, raw=True, extra_history=None, # MSCal default_x_orientation=None, default_jones_array=None, ): """ Read a generic file into a UVCal object. This method supports a number of different types of files. Universal parameters (required and optional) are listed directly below, followed by parameters used by all file types related to selecting and checking. Each file type also has its own set of optional parameters that are listed at the end of this docstring. Parameters ---------- filename : str or array_like of str The file(s) or list(s) (or array(s)) of files to read from. file_type : str One of ['calfits', 'fhd'] or None. If None, the code attempts to guess what the file type is based on file extensions (FHD: .sav, .txt; uvfits: .calfits). Note that if a list of datasets is passed, the file type is determined from the first dataset. axis : str Axis to concatenate files along. This enables fast concatenation along the specified axis without the normal checking that all other metadata agrees. This method does not guarantee correct resulting objects. Please see the docstring for fast_concat for details. Allowed values are: 'antenna', 'time', 'freq', 'spw', 'jones' ('freq' is not allowed for delay or wideband objects and 'spw' is only allowed for wideband objects). Only used if multiple files are passed. read_data : bool Read in the gains or delays, quality arrays and flag arrays. If set to False, only the metadata will be read in. Setting read_data to False results in a metadata only object. background_lsts : bool When set to True, the lst_array is calculated in a background thread. astrometry_library : str Library used for calculating LSTs. Allowed options are 'erfa' (which uses the pyERFA), 'novas' (which uses the python-novas library), and 'astropy' (which uses the astropy utilities). Default is erfa unless the telescope_location frame is MCMF (on the moon), in which case the default is astropy. Selecting --------- antenna_nums : array_like of int, optional The antennas numbers to include when reading data into the object (antenna positions and names for the removed antennas will be retained). This cannot be provided if `antenna_names` is also provided. Ignored if read_data is False. antenna_names : array_like of str, optional The antennas names to include when reading data into the object (antenna positions and names for the removed antennas will be retained). This cannot be provided if `antenna_nums` is also provided. Ignored if read_data is False. frequencies : array_like of float, optional The frequencies to include when reading data into the object, each value passed here should exist in the freq_array. Ignored if read_data is False. freq_chans : array_like of int, optional The frequency channel numbers to include when reading data into the object. Ignored if read_data is False. spws : array_like of in, optional The spectral window numbers to keep in the object. If this is not a wide-band object and `frequencies` or `freq_chans` is not None, frequencies that match any of the specifications will be kept (i.e. the selections will be OR'ed together). times : array_like of float, optional The times to include when reading data into the object, each value passed here should exist in the time_array. Cannot be used with `time_range`. time_range : array_like of float, optional The time range in Julian Date to keep in the object, must be length 2. Some of the times in the object should fall between the first and last elements. Cannot be used with `times`. lsts : array_like of float, optional The local sidereal times (LSTs) to keep in the object, each value passed here should exist in the lst_array. Cannot be used with `times`, `time_range`, or `lst_range`. lst_range : array_like of float, optional The local sidereal time (LST) range in radians to keep in the object, must be of length 2. Some of the LSTs in the object should fall between the first and last elements. If the second value is smaller than the first, the LSTs are treated as having phase-wrapped around LST = 2*pi = 0, and the LSTs kept on the object will run from the larger value, through 0, and end at the smaller value. jones : array_like of int, optional The jones polarizations numbers to include when reading data into the object, each value passed here should exist in the polarization_array. Ignored if read_data is False. phase_center_ids : array_like of int, optional Phase center IDs to keep on the object (effectively a selection on baseline-times). Cannot be used with `catalog_names`. catalog_names : str or array-like of str, optional The names of the phase centers (sources) to keep in the object, which should match exactly in spelling and capitalization. Cannot be used with `phase_center_ids`. Checking -------- run_check : bool Option to check for the existence and proper shapes of parameters after reading in the file. check_extra : bool Option to check optional parameters as well as required ones. run_check_acceptability : bool Option to check acceptable range of the values of parameters after reading in the file. default_mount_type : str If not recorded in the data set or telescope is unknown to pyuvdata, the `Telescope.mount_type` parameter is automatically set to "other". However, users can specify a different default by passing an argument here. FHD --- obs_file : str or list of str The obs.sav file or list of files to read from. This is required for FHD files. layout_file : str The FHD layout file. Required for antenna_positions to be set. settings_file : str or list of str, optional The settings_file or list of files to read from. Optional, but very useful for provenance. raw : bool Option to use the raw (per antenna, per frequency) solution or to use the fitted (polynomial over phase/amplitude) solution. Default is True (meaning use the raw solutions). MSCal ----- default_x_orientation : str By default, if not found on read, the x_orientation parameter will be set to "east" and a warning will be raised. However, if a value for default_x_orientation is provided, it will be used instead and the warning will be suppressed. default_jones_array : ndarray of int By default, if not found on read, the jones_array parameter will be set to [-5, -6] (linear pols) and a warning will be raised. However, if a value for default_jones_array is provided, it will be used instead and the warning will be suppressed. """ if isinstance(filename, list | tuple | np.ndarray): for ind in range(len(filename)): if isinstance(filename[ind], list | tuple | np.ndarray): raise ValueError( "If filename is a list, tuple or array it cannot be nested or " "multi dimensional." ) _, extension = os.path.splitext(filename[0]) n_files = len(filename) else: _, extension = os.path.splitext(filename) n_files = 1 multi = False if n_files > 1: multi = True elif isinstance(filename, list | tuple | np.ndarray): filename = filename[0] if file_type is None: if extension == ".sav" or extension == ".txt": file_type = "fhd" elif "fits" in extension: file_type = "calfits" elif "h5" in extension: file_type = "calh5" if file_type is None: # Nothing could be auto-determined, so let's jump to the next level and # try to look at the file contents and see what's up. In paticular, if this # is a directory, there's some additional clues for us to look for. file_test = filename[0] if multi else filename if os.path.isdir(file_test) and os.path.exists( os.path.join(file_test, "OBSERVATION") ): # It's a measurement set. file_type = "ms" if file_type is None: raise ValueError( "File type could not be determined, use the " "file_type keyword to specify the type." ) if file_type not in ["calfits", "fhd", "calh5", "ms"]: raise ValueError( "The only supported file_types are 'calfits', 'calh5', 'fhd', and 'ms'." ) obs_file_use = None layout_file_use = None settings_file_use = None if file_type == "fhd": if obs_file is None: raise ValueError("obs_file parameter must be set for FHD files.") else: if isinstance(obs_file, list | tuple): n_obs = len(obs_file) obs_file_use = obs_file[0] else: n_obs = 1 obs_file_use = obs_file if n_obs != n_files: raise ValueError("Number of obs_files must match number of cal_files") if layout_file is not None: if isinstance(layout_file, list | tuple): n_layout = len(layout_file) layout_file_use = layout_file[0] else: n_layout = 1 layout_file_use = layout_file if n_layout != n_files: raise ValueError( "Number of layout_files must match number of cal_files" ) if settings_file is not None: if isinstance(settings_file, list | tuple): n_settings = len(settings_file) settings_file_use = settings_file[0] else: n_settings = 1 settings_file_use = settings_file if n_settings != n_files: raise ValueError( "Number of settings_files must match number of cal_files" ) if multi: self.read( filename[0], file_type=file_type, read_data=read_data, background_lsts=background_lsts, astrometry_library=astrometry_library, # selecting parameters antenna_nums=antenna_nums, antenna_names=antenna_names, frequencies=frequencies, freq_chans=freq_chans, spws=spws, times=times, time_range=time_range, lsts=lsts, lst_range=lst_range, jones=jones, phase_center_ids=phase_center_ids, catalog_names=catalog_names, # checking parameters run_check=run_check, check_extra=check_extra, run_check_acceptability=run_check_acceptability, # file-type specific parameters default_mount_type=default_mount_type, # FHD obs_file=obs_file_use, layout_file=layout_file_use, settings_file=settings_file_use, raw=raw, extra_history=extra_history, # MSCal default_x_orientation=default_x_orientation, default_jones_array=default_jones_array, ) uv_list = [] for ind, file in enumerate(filename[1:]): if file_type == "fhd": file_index = ind + 1 obs_file_use = obs_file[file_index] if layout_file is not None: layout_file_use = layout_file[file_index] if settings_file is not None: settings_file_use = settings_file[file_index] uvcal2 = UVCal() uvcal2.read( file, read_data=read_data, file_type=file_type, background_lsts=background_lsts, astrometry_library=astrometry_library, # selecting parameters antenna_nums=antenna_nums, antenna_names=antenna_names, frequencies=frequencies, freq_chans=freq_chans, spws=spws, times=times, time_range=time_range, lsts=lsts, lst_range=lst_range, jones=jones, phase_center_ids=phase_center_ids, catalog_names=catalog_names, # checking parameters run_check=run_check, check_extra=check_extra, run_check_acceptability=run_check_acceptability, # file-type specific parameters default_mount_type=default_mount_type, # FHD obs_file=obs_file_use, layout_file=layout_file_use, settings_file=settings_file_use, raw=raw, extra_history=extra_history, # MSCal default_x_orientation=default_x_orientation, default_jones_array=default_jones_array, ) uv_list.append(uvcal2) # Concatenate once at end if axis is not None: # fast_concat to operates on lists self.fast_concat( uv_list, axis, run_check=run_check, check_extra=check_extra, run_check_acceptability=run_check_acceptability, inplace=True, ) else: # Too much work to rewrite __add__ to operate on lists # of files, so instead doing a binary tree merge uv_list = [self] + uv_list while len(uv_list) > 1: # for an odd number of files, the second argument will be shorter # so the last element in the first list won't be combined, but it # will not be lost, so it's ok. for uv1, uv2 in zip(uv_list[0::2], uv_list[1::2], strict=False): uv1.__iadd__( uv2, run_check=run_check, check_extra=check_extra, run_check_acceptability=run_check_acceptability, ) uv_list = uv_list[0::2] # Because self was at the beginning of the list, # everything is merged into it at the end of this loop else: if file_type in ["fhd", "calfits", "ms"]: if ( antenna_nums is not None or antenna_names is not None or frequencies is not None or freq_chans is not None or spws is not None or times is not None or lsts is not None or time_range is not None or lst_range is not None or jones is not None or phase_center_ids is not None or catalog_names is not None ): select = True warnings.warn( "Warning: select on read keyword set, but " f'file_type is "{file_type}" which does not support select ' "on read. Entire file will be read and then select " "will be performed" ) # these file types do not have select on read, so set all # select parameters select_antenna_nums = antenna_nums select_antenna_names = antenna_names select_frequencies = frequencies select_freq_chans = freq_chans select_spws = spws select_times = times select_lsts = lsts select_time_range = time_range select_lst_range = lst_range select_jones = jones select_phase_center_ids = phase_center_ids select_catalog_names = catalog_names else: select = False elif file_type in ["calh5"]: select = False if file_type == "calfits": self.read_calfits( filename, read_data=read_data, background_lsts=background_lsts, default_mount_type=default_mount_type, run_check=run_check, check_extra=check_extra, run_check_acceptability=run_check_acceptability, astrometry_library=astrometry_library, ) elif file_type == "fhd": self.read_fhd_cal( cal_file=filename, obs_file=obs_file, layout_file=layout_file, settings_file=settings_file, raw=raw, read_data=read_data, extra_history=extra_history, default_mount_type=default_mount_type, background_lsts=background_lsts, run_check=run_check, check_extra=check_extra, run_check_acceptability=run_check_acceptability, astrometry_library=astrometry_library, ) elif file_type == "calh5": self.read_calh5( filename, antenna_nums=antenna_nums, antenna_names=antenna_names, frequencies=frequencies, freq_chans=freq_chans, spws=spws, times=times, time_range=time_range, lsts=lsts, lst_range=lst_range, jones=jones, phase_center_ids=phase_center_ids, catalog_names=catalog_names, read_data=read_data, default_mount_type=default_mount_type, background_lsts=background_lsts, run_check=run_check, check_extra=check_extra, run_check_acceptability=run_check_acceptability, astrometry_library=astrometry_library, ) elif file_type == "ms": self.read_ms_cal( filename, default_x_orientation=default_x_orientation, default_jones_array=default_jones_array, default_mount_type=default_mount_type, run_check=run_check, check_extra=check_extra, run_check_acceptability=run_check_acceptability, astrometry_library=astrometry_library, ) if select: self.select( antenna_nums=select_antenna_nums, antenna_names=select_antenna_names, frequencies=select_frequencies, freq_chans=select_freq_chans, spws=select_spws, times=select_times, lsts=select_lsts, time_range=select_time_range, lst_range=select_lst_range, jones=select_jones, phase_center_ids=select_phase_center_ids, catalog_names=select_catalog_names, run_check=run_check, check_extra=check_extra, run_check_acceptability=run_check_acceptability, )
[docs] @classmethod @copy_replace_short_description(read, style=DocstringStyle.NUMPYDOC) def from_file(cls, filename, **kwargs): """Initialize a new UVCal object by reading the input file.""" uvc = cls() uvc.read(filename, **kwargs) return uvc
[docs] def write_calfits( self, filename, *, run_check=True, check_extra=True, run_check_acceptability=True, clobber=False, ): """ Write the data to a calfits file. Parameters ---------- filename : str The calfits file to write to. run_check : bool Option to check for the existence and proper shapes of parameters before writing the file. check_extra : bool Option to check optional parameters as well as required ones. run_check_acceptability : bool Option to check acceptable range of the values of parameters before writing the file. clobber : bool Option to overwrite the filename if the file already exists. Raises ------ ValueError If the UVCal object is a metadata only object. """ if self.metadata_only: raise ValueError( "Cannot write out metadata only objects to a calfits file." ) calfits_obj = self._convert_to_filetype("calfits") calfits_obj.write_calfits( filename, run_check=run_check, check_extra=check_extra, run_check_acceptability=run_check_acceptability, clobber=clobber, ) del calfits_obj
[docs] def write_calh5(self, filename, **kwargs): """ Write the data to a calh5 file. Write an in-memory UVCal object to a CalH5 file. Parameters ---------- filename : str The CalH5 file to write to. clobber : bool Option to overwrite the file if it already exists. chunks : tuple or bool h5py.create_dataset chunks keyword. Tuple for chunk shape, True for auto-chunking, None for no chunking. Default is True. data_compression : str HDF5 filter to apply when writing the gain_array or delay. Default is None (no filter/compression). In addition to the normal HDF5 filter values, the user may specify "bitshuffle" which will set the compression to `32008` for bitshuffle and will set the `compression_opts` to `(0, 2)` to allow bitshuffle to automatically determine the block size and to use the LZF filter after bitshuffle. Using `bitshuffle` requires having the `hdf5plugin` package installed. Dataset must be chunked to use compression. flags_compression : str HDF5 filter to apply when writing the flags_array. Default is the LZF filter. Dataset must be chunked. quality_compression : str HDF5 filter to apply when writing the quality_array and/or total_quality_array if they are defined. Default is the LZF filter. Dataset must be chunked. run_check : bool Option to check for the existence and proper shapes of parameters before writing the file. check_extra : bool Option to check optional parameters as well as required ones. run_check_acceptability : bool Option to check acceptable range of the values of parameters before writing the file. Returns ------- None Raises ------ IOError If the file located at `filename` already exists and clobber=False, an IOError is raised. Notes ----- The HDF5 library allows for the application of "filters" when writing data, which can provide moderate to significant levels of compression for the datasets in question. Testing has shown that for some typical cases of UVData objects (empty/sparse flag_array objects, and/or uniform nsample_arrays), the built-in LZF filter provides significant compression for minimal computational overhead. """ if self.metadata_only: raise ValueError("Cannot write out metadata only objects to a calh5 file.") calh5_obj = self._convert_to_filetype("calh5") calh5_obj.write_calh5(filename, **kwargs) del calh5_obj
[docs] def write_ms_cal(self, filename, **kwargs): """ Write the data to a MS calibration table. Parameters ---------- filename : str The measurement set to write to. clobber : bool Option to overwrite the file if it already exists. """ ms_cal_obj = self._convert_to_filetype("ms") ms_cal_obj.write_ms_cal(filename, **kwargs) del ms_cal_obj