Source code for pyuvdata.utils.tools

# Copyright (c) 2024 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Basic utility functions."""

from __future__ import annotations

import warnings
from collections.abc import Iterable, Iterable as IterableType

import numpy as np

from .types import FloatArray, IntArray, StrArray


[docs]def _get_iterable(x): """Return iterable version of input.""" if isinstance(x, Iterable): return x else: return (x,)
[docs]def _combine_filenames(filename1, filename2): """Combine the filename attribute from multiple UVBase objects. The 4 cases are: 1. `filename1` has been set, `filename2` has not 2. `filename1` has not been set, `filename2` has 3. `filename1` and `filename2` both have been set 4. `filename1` and `filename2` both have not been set In case (1), we do not want to update the attribute, because it is already set correctly. In case (2), we want to replace `filename1` with the value from `filename2. In case (3), we want to take the union of the sets of the filenames. In case (4), we want the filename attribute to still be `None`. Parameters ---------- filename1 : list of str or None The list of filenames for the first UVBase object. If it is not set, it should be `None`. filename2 : list of str or None The list of filenames for the second UVData object. If it is not set, it should be `None`. Returns ------- combined_filenames : list of str or None The combined list, with potentially duplicate entries removed. """ combined_filenames = filename1 if filename1 is not None: if filename2 is not None: combined_filenames = sorted(set(filename1).union(set(filename2))) elif filename2 is not None: combined_filenames = filename2 return combined_filenames
[docs]def _convert_to_slices( indices, *, max_nslice_frac=0.1, max_nslice=None, return_index_on_fail=False ): """ Convert list of indices to a list of slices. Parameters ---------- indices : list A 1D list of integers for array indexing (boolean ndarrays are also supported). max_nslice_frac : float A float from 0 -- 1. If the number of slices needed to represent input 'indices' divided by len(indices) exceeds this fraction, then we determine that we cannot easily represent 'indices' with a list of slices. max_nslice : int Optional argument, defines the maximum number of slices for determining if `indices` can be easily represented with a list of slices. If set, then the argument supplied to `max_nslice_frac` is ignored. return_index_on_fail : bool If set to True and the list of input indexes cannot easily be represented by a list of slices (as defined by `max_nslice` or `max_nslice_frac`), then return the input list of index values instead of a list of suboptimal slices. Returns ------- slice_list : list Nominally the list of slice objects used to represent indices. However, if `return_index_on_fail=True` and input indexes cannot easily be represented, return a 1-element list containing the input for `indices`. check : bool If True, indices is easily represented by slices (`max_nslice_frac` or `max_nslice` conditions met), otherwise False. Notes ----- Example: if: indices = [1, 2, 3, 4, 10, 11, 12, 13, 14] then: slices = [slice(1, 5, 1), slice(11, 15, 1)] """ # check for already a slice or a single index position if isinstance(indices, slice): return [indices], True if isinstance(indices, int | np.integer): return [slice(indices, indices + 1, 1)], True # check for boolean index if isinstance(indices, np.ndarray) and (indices.dtype == bool): eval_ind = np.where(indices)[0] else: eval_ind = indices # assert indices is longer than 2, or return trivial solutions if len(eval_ind) == 0: return [slice(0, 0)], False if len(eval_ind) <= 2: step = 1 if (len(eval_ind) < 2) else eval_ind[-1] - eval_ind[0] start = eval_ind[0] stop = eval_ind[-1] + step return [slice(start, None if (stop < 0) else stop, step)], True # Catch the simplest case of "give me a single slice or exit" if (max_nslice == 1) and return_index_on_fail: step = eval_ind[1] - eval_ind[0] start = eval_ind[0] stop = eval_ind[-1] + step if all(np.diff(eval_ind) == step): return [slice(start, None if (stop < 0) else stop, step)], True return [indices], False # setup empty slices list slices = [] # iterate over indices start = eval_ind[0] step = None for ind in eval_ind[1:]: if step is None: step = ind - start stop = ind + step continue # if the next index doesn't line up w/ the stop, this ends the slice if ind != stop: # append to list slices.append(slice(start, None if (stop < 0) else stop, step)) # setup next step start = ind stop = ind + 1 # Set this in case loop ends here step = None else: stop += step # Append the last slice slices.append(slice(start, None if (stop < 0) else stop, step)) # determine whether slices are a reasonable representation, and determine max_nslice # if only max_nslice_frac was supplied. if max_nslice is None and max_nslice_frac is not None: max_nslice = max_nslice_frac * len(eval_ind) check = len(slices) <= max_nslice if return_index_on_fail and not check: return [indices], check else: return slices, check
[docs]def slicify( ind: slice | None | IterableType[int], allow_empty: bool = False ) -> slice | None | IterableType[int]: """ Convert an iterable of integers into a slice object if possible. Parameters ---------- ind : list A 1D list of integers for array indexing. allow_empty : bool If set to False (default) and ind is a zero-length list, None is returned. If set to True, then a "zero-length slice" (e.g., `slice(0,0)`) is returned instead. Returns ------- index_obj : slice or list If the list of indices can be represented by a slice, a slice is returned, otherwise the list of indices is returned. """ if ind is None or isinstance(ind, slice): return ind if len(ind) == 0: return slice(0, 0, 1) if allow_empty else None if len(ind) == 1: return slice(ind[0], ind[0] + 1, 1) step = ind[1] - ind[0] if all(np.ediff1d(ind) == step): start = ind[0] stop = ind[-1] + step return slice(start, None if (stop < 0) else stop, step) else: # can't slicify return ind
[docs]def _multidim_ind2sub(dims_dict, dims): """ Build a flag index array based on a multi-dimensional index array. Parameters ---------- dims_dict : dict Dict whose keys are the axes being selected on, and the values are list of index positions along that axis. dims : tuple Shape of the array being accessed. """ Ndims = len(dims) indices = [None] * Ndims for axis in range(Ndims): arr = np.asarray(dims_dict.get(axis, np.arange(dims[axis]))) indices[axis] = arr.reshape([-1 if axis == idx else 1 for idx in range(Ndims)]) ravel_arr = np.ravel_multi_index(indices, dims=dims).flatten() new_dims = tuple(indices[idx].shape[idx] for idx in range(Ndims)) return ravel_arr, new_dims
[docs]def _test_array_constant(array, *, tols=None, mask=...): """ Check if an array contains constant values to some tolerance. Uses np.isclose on the min & max of the arrays with the given tolerances. Parameters ---------- array : np.ndarray or UVParameter UVParameter or array to check for constant values. tols : tuple of float, optional length 2 tuple giving (rtol, atol) to pass to np.isclose, defaults to (0, 0) if passing an array, otherwise defaults to using the tolerance on the UVParameter. mask : array-like (of ints or booleans) or Ellipses Mask which indicates which indices to evaluate. Default is all elements. Returns ------- bool True if the array is constant to the given tolerances, False otherwise. """ # Import UVParameter here rather than at the top to avoid circular imports from pyuvdata.parameter import UVParameter if isinstance(array, UVParameter): array_to_test = np.asarray(array.value)[mask] if tols is None: tols = array.tols else: array_to_test = np.asarray(array)[mask] if tols is None: tols = (0, 0) if not isinstance(tols, tuple) or len(tols) != 2: raise ValueError( "Something went wrong in utils.tools._test_array_constant. " "Please file an issue in our GitHub issue log so that we can help: " "https://github.com/RadioAstronomySoftwareGroup/pyuvdata/issues. " "Developer info: tols must be a length-2 tuple." ) if array_to_test.size < 2: # arrays with 0 or 1 elements are constant by definition return True min_val = np.min(array_to_test) max_val = np.max(array_to_test) # if min and max are equal don't bother with tolerance checking if min_val == max_val: return True return np.isclose(min_val, max_val, rtol=tols[0], atol=tols[1])
[docs]def _test_array_consistent(array, deltas, *, tols=None, mask=...): """ Check if an the spacing of an array is consistent with expect intervals. Parameters ---------- array : np.ndarray or UVParameter UVParameter or array to check for constant values. deltas : np.ndarray or UVParameter Expected widths of each entry in array, should be >= 0. tols : tuple of float, optional length 2 tuple giving (rtol, atol) to pass to np.isclose, defaults to (0, 0) if passing an array, otherwise defaults to using the tolerance on the UVParameter. mask : array-like (of ints or booleans) or Ellipses Mask which indicates which indices to evaluate. Default is all elements. Returns ------- bool True if the array is constant to the given tolerances, False otherwise. """ # Import UVParameter here rather than at the top to avoid circular imports from pyuvdata.parameter import UVParameter if isinstance(array, UVParameter): array_to_test = np.asarray(array.value)[mask] if tols is None: tols = array.tols else: array_to_test = np.asarray(array)[mask] if tols is None: tols = (0, 0) if isinstance(deltas, UVParameter): deltas_to_test = np.asarray(deltas.value)[mask] else: deltas_to_test = np.asarray(deltas)[mask] if deltas_to_test.size == 1: exp_deltas = deltas_to_test else: if array_to_test.shape != deltas_to_test.shape: raise ValueError( "Something went wrong in utils.tools._test_array_consistent. " "Please file an issue in our GitHub issue log so that we can help: " "https://github.com/RadioAstronomySoftwareGroup/pyuvdata/issues. " "Developer info: array and deltas must have same shape." ) exp_deltas = (deltas_to_test[:-1] + deltas_to_test[1:]) * 0.5 if not isinstance(tols, tuple) or len(tols) != 2: raise ValueError( "Something went wrong in utils.tools._test_array_consistent. " "Please file an issue in our GitHub issue log so that we can help: " "https://github.com/RadioAstronomySoftwareGroup/pyuvdata/issues. " "Developer info: tols must be a length-2 tuple." ) if array is None or deltas is None or array_to_test.size < 2: # arrays with 0 or 1 elements are constant by definition return True # Call the mask after isclose to handle non-ndarrays like lists return np.allclose( np.abs(np.diff(array_to_test)), exp_deltas, rtol=tols[0], atol=tols[1] )
[docs]def _test_array_constant_spacing(array, *, tols=None, mask=..., allow_resort=False): """ Check if an array is constantly spaced to some tolerance. Calls _test_array_constant on the np.diff of the array. Parameters ---------- array : np.ndarray or UVParameter UVParameter or array to check for constant spacing. tols : tuple of float, optional length 2 tuple giving (rtol, atol) to pass to np.isclose, defaults to (0, 0) if passing an array, otherwise defaults to using the tolerance on the UVParameter. mask : array-like (of ints or booleans) or Ellipses Mask which indicates which indices to evaluate. Default is all elements. allow_resort : bool If set to False, values in array are checked in their present order. If set to True, values are sorted prior to evaluating (useful for arrays that _can_ be reindexed). Default is False. Returns ------- bool True if the array spacing is constant to the given tolerances, False otherwise. """ # Import UVParameter here rather than at the top to avoid circular imports from pyuvdata.parameter import UVParameter if isinstance(array, UVParameter): array_to_test = np.asarray(array.value)[mask] if tols is None: tols = array.tols else: array_to_test = np.asarray(array)[mask] if tols is None: tols = (0, 0) if array is None or array_to_test.size <= 2: # arrays with 1 or 2 elements are constantly spaced by definition return True if allow_resort: array_to_test = np.sort(array_to_test) array_diff = np.diff(array_to_test) return _test_array_constant(array_diff, tols=tols)
[docs]def _is_between(val, val_range, wrap=False, wrap_amount=(2 * np.pi)): """ Detect if a value is between a specified range(s). Parameters ---------- val : float or ndarray Value to evaluate, either float/singleton, otherwise of shape (Nranges,). val_range : np.array Array of ranges, shape (Nranges, 2). wrap : bool Apply wrapping. Default is False. wrap_amount : float Top end of the range for the wrap (bottom is 0). Default is 2 * pi. Returns ------- bool True if any range overlaps """ lo_lim = val_range[..., 0] hi_lim = val_range[..., 1] if val_range.ndim == 1: if wrap and (hi_lim < lo_lim): lo_lim = lo_lim - wrap_amount elif wrap: hi_lim[hi_lim < lo_lim] += wrap_amount mask = (val >= lo_lim) & (val <= hi_lim) if wrap: if val_range.ndim == 1: lo_lim = wrap_amount + lo_lim hi_lim = wrap_amount + hi_lim else: val += wrap_amount mask |= (val >= lo_lim) & (val <= hi_lim) return mask
[docs]def _check_range_overlap(val_range, range_type="time"): """ Detect if any val_range in an array overlap. Parameters ---------- val_range : np.array of float Array of ranges, shape (Nranges, 2). range_type : str Type of range (for good error messages) Returns ------- bool True if any range overlaps. """ # first check that time ranges are well formed (stop is >= than start) if np.any((val_range[:, 1] - val_range[:, 0]) < 0): raise ValueError( f"The {range_type} ranges are not well-formed, some stop {range_type}s " f"are after start {range_type}s." ) # Sort by start time sorted_ranges = val_range[np.argsort(val_range[:, 0]), :] # then check if adjacent pairs overlap for ind in range(sorted_ranges.shape[0] - 1): range1 = sorted_ranges[ind] range2 = sorted_ranges[ind + 1] if range2[0] < range1[1]: return True
[docs]def _sorted_unique_union(obj1, obj2=None): """ Determine the union of unique elements from two lists. Convenience function for handling various actions with indices. Parameters ---------- obj1 : list or tuple or set or 1D ndarray First list from which to determine unique entries. obj2 : list or tuple or set or 1D ndarray Second list from which to determine unique entries, which is joined with the first list. If None, the method will simply return the sorted list of unique elements in obj1. Returns ------- sorted_unique : list List containing the union of unique entries between obj1 and obj2. """ return sorted(set(obj1)) if obj2 is None else sorted(set(obj1).union(obj2))
[docs]def _sorted_unique_intersection(obj1, obj2=None): """ Determine the intersection of unique elements from two lists. Convenience function for handling various actions with indices. Parameters ---------- obj1 : list or tuple or set or 1D ndarray First list from which to determine unique entries. obj2 : list or tuple or set or 1D ndarray Second list from which to determine unique entries, which is intersected with the first list. If None, the method will simply return the sorted list of unique elements in obj1. Returns ------- sorted_unique : list List containing the intersection of unique entries between obj1 and obj2. """ return sorted(set(obj1)) if obj2 is None else sorted(set(obj1).intersection(obj2))
[docs]def _sorted_unique_difference(obj1, obj2=None): """ Determine the difference of unique elements from two lists. Convenience function for handling various actions with indices. Parameters ---------- obj1 : list or tuple or set or 1D ndarray First list from which to determine unique entries. obj2 : list or tuple or set or 1D ndarray Second list from which to determine unique entries, which is differenced with the first list. If None, the method will simply return the sorted list of unique elements in obj1. Returns ------- sorted_unique : list List containing the difference in unique entries between obj1 and obj2. """ return sorted(set(obj1)) if obj2 is None else sorted(set(obj1).difference(obj2))
[docs]def _strict_raise( err_msg: str, strict: (bool | None), err_type=ValueError, warn_type=UserWarning ): """ Determine whether to raise a warning or an error. Parameters ---------- err_msg : str Message to pass along with the warning/error. strict : bool | None If True, raise an error. If False, raise a warning. If None, no message is raised at all (warning is silenced). err_type : Exception Type of error to raise if `strict=True`. Default is ValueError. warn_type : Warning Type of warning to raise if `strict=False`. Default is UserWarning. """ if strict: raise err_type(err_msg) elif strict is not None: warnings.warn(err_msg, warn_type)
[docs]def _eval_inds(inds, nrecs, name="inds", invert=False, strict=True): """ Determine if indices are outside of the expected range. Parameters ---------- inds : array-like of int Indices to check. nrecs : int Number of records in the underlying array. name : str Name of underlying array, default is "inds". invert : bool If False, inds are treated as the positions in the array that should be preserved, but if True, those positions are discarded instead. Default is False. strict : bool If True, raise an error. If False, raise a warning. Returns ------- inds : ndarray of int Array of well-conditioned, sorted index values (whose value will be within the range of [0, nrecs - 1]). """ if inds is None: return None inds = np.asarray(inds).flatten() mask = np.full(nrecs, invert, dtype=bool) if len(inds) > 0: fix_inds = False if max(inds) >= nrecs: _strict_raise(f"{name} contains indices that are too large", strict=strict) fix_inds = True if min(inds) < 0: _strict_raise(f"{name} contains indices that are negative", strict=strict) fix_inds = True if fix_inds: inds = [i for i in inds if ((i >= 0) and (i < nrecs))] mask[inds] = not invert return np.nonzero(mask)[0]
[docs]def _where_combine(mask, inds=None, invert=False, use_and=True): """ Combine masked array with an existing index list. Parameters ---------- mask : array-like of bool Array that marks whether or not entries meet matching criteria. inds : array-like of int or None Existing list of index positions that meet matching criteria. Can be None, in which case only mask is evaluated. invert : bool If False, then indices where mask == True are returned. But if set to True, indices where mask == False are returned instead. Default is False. use_and : bool If True, then what is returned is the intersection of value derived from both mask and inds. If False, then the union of mask and inds is returned instead. Default is True. Returns ------- new_inds : ndarray of int Index positions which meet the selection criterion recorded in mask and inds. """ eval_func = np.logical_and if use_and else np.logical_or if inds is not None: postmask = np.full(len(mask), invert, dtype=bool) postmask[inds] = not invert mask = eval_func(mask, postmask) return np.nonzero(np.logical_not(mask) if invert else mask)[0]
[docs]def _nants_to_nblts(uvd): """ Obtain indices to convert (Nants,) to (Nblts,). Parameters ---------- uvd : UVData object Returns ------- ind1, ind2 : ndarray, ndarray index pairs to compose (Nblts,) shaped arrays for each baseline from an (Nants,) shaped array """ ant_map = {ant: idx for idx, ant in enumerate(uvd.telescope.antenna_numbers)} ind1 = [ant_map[ant] for ant in uvd.ant_1_array] ind2 = [ant_map[ant] for ant in uvd.ant_2_array] return np.asarray(ind1), np.asarray(ind2)
[docs]def _ntimes_to_nblts(uvd): """ Obtain indices to convert (Ntimes,) to (Nblts,). Parameters ---------- uvd : UVData object UVData object Returns ------- inds : ndarray Indices that, when applied to an array of shape (Ntimes,), correctly convert it to shape (Nblts,) """ unique_t = np.unique(uvd.time_array) t = uvd.time_array inds = [] for i in t: inds.append(np.where(unique_t == i)[0][0]) return np.asarray(inds)
[docs]def float_int_to_str_array( *, fltarr: FloatArray, intarr: IntArray, flt_tol: tuple[float, float], flt_first: bool = True, ) -> StrArray: """ Create a string array built from float and integer arrays for matching. Parameters ---------- fltarr : np.ndarray of float float array to be used in output string array intarr : np.ndarray of int integer array to be used in output string array flt_tol : 2-tuple of float Absolute tolerance to use in formatting the floats as strings. Note that this is converted to a decimal place for print formatting, so the precision might be slightly higher. flt_first : bool Whether to put the float first in the out put string or not (if False the int comes first.) Returns ------- np.ndarray of str String array that combines the float and integer values, useful for matching. Examples -------- >>> float_int_to_str_array(fltarr=[np.pi, np.pi/2], intarr=[1, 2], flt_tol=.01) array(['3.14_00000001', '1.57_00000002'], dtype='<U13') >>> float_int_to_str_array( ... fltarr=[np.pi, np.pi/2], intarr=[1, 2], flt_tol=.001, flt_first=False ... ) array(['00000001_3.142', '00000002_1.571'], dtype='<U14') """ prec_flt = -1 * np.floor(np.log10(flt_tol)).astype(int) prec_int = 8 flt_str_list = ["{1:.{0}f}".format(prec_flt, flt) for flt in fltarr] int_str_list = [str(intv).zfill(prec_int) for intv in intarr] list_of_lists = [] if flt_first: list_of_lists = [flt_str_list, int_str_list] else: list_of_lists = [int_str_list, flt_str_list] return np.array(["_".join(zpval) for zpval in zip(*list_of_lists, strict=True)])