# Copyright (c) 2024 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Utilities for working with redundant baselines."""
import warnings
from copy import deepcopy
import numpy as np
from scipy.spatial.distance import cdist
from .bls import antnums_to_baseline, baseline_index_flip
[docs]def _adj_list(vecs, tol, n_blocks=None):
"""Identify neighbors of each vec in vecs, to distance tol."""
n_items = len(vecs)
max_items = 2**10 # Max array size used is max_items**2. Avoid using > 1 GiB
if n_blocks is None:
n_blocks = max(n_items // max_items, 1)
# We may sort blocks so that some pairs of blocks may be skipped.
# Reorder vectors by x.
order = np.argsort(vecs[:, 0])
blocks = np.array_split(order, n_blocks)
adj = [{k} for k in range(n_items)] # Adjacency lists
for b1 in blocks:
for b2 in blocks:
v1, v2 = vecs[b1], vecs[b2]
# Check for no overlap, with tolerance.
xmin1 = v1[0, 0] - tol
xmax1 = v1[-1, 0] + tol
xmin2 = v2[0, 0] - tol
xmax2 = v2[-1, 0] + tol
if max(xmin1, xmin2) > min(xmax1, xmax2):
continue
adj_mat = cdist(vecs[b1], vecs[b2]) < tol
for bi, col in enumerate(adj_mat):
adj[b1[bi]] = adj[b1[bi]].union(b2[col])
return [frozenset(g) for g in adj]
[docs]def _find_cliques(adj, strict=False):
n_items = len(adj)
loc_gps = []
visited = np.zeros(n_items, dtype=bool)
for k in range(n_items):
if visited[k]:
continue
a0 = adj[k]
visited[k] = True
if all(adj[it].__hash__() == a0.__hash__() for it in a0):
group = list(a0)
group.sort()
visited[list(a0)] = True
loc_gps.append(group)
# Require all adjacency lists to be isolated maximal cliques:
if strict and not all(sorted(st) in loc_gps for st in adj):
raise ValueError("Non-isolated cliques found in graph.")
return loc_gps
[docs]def find_clusters(*, location_ids, location_vectors, tol, strict=False):
"""
Find clusters of vectors (e.g. redundant baselines, times).
Parameters
----------
location_ids : array_like of int
ID labels for locations.
location_vectors : array_like of float
location vectors, can be multidimensional
tol : float
tolerance for clusters
strict : bool
Require that all adjacency lists be isolated maximal cliques.
This ensures that vectors do not fall into multiple clusters.
Default: False
Returns
-------
list of list of location_ids
"""
location_vectors = np.asarray(location_vectors)
location_ids = np.asarray(location_ids)
if location_vectors.ndim == 1:
location_vectors = location_vectors[:, np.newaxis]
adj = _adj_list(location_vectors, tol) # adj = list of sets
loc_gps = _find_cliques(adj, strict=strict)
loc_gps = [np.sort(location_ids[gp]).tolist() for gp in loc_gps]
return loc_gps
[docs]def find_clusters_grid(location_ids, location_vectors, tol=1.0):
"""
Find redundant groups using a gridding algorithm developed by the HERA team.
This is essentially a gridding approach, but it only keeps track of the grid
points that have baselines assigned to them. It iterates through the
baselines and assigns each baseline to a an existing group if it is within
a grid spacing or makes a new group if there is no group. The location of
the group is the baseline vector of the first baseline assigned to it, rounded
to the grid spacing, so the resulting assigned grid point can depend on the
order in which baseline vectors are passed to it. It is possible for a baseline
to be assigned to a group that is up to but strictly less than 4 times the
grid spacing from its true location, so we use a grid a factor of 4 smaller
than the passed tolerance (`tol`). This method is quite robust for regular
arrays if the tolerance is properly specified, but may not behave predictably
for highly non-redundant arrays.
Parameters
----------
baselines : array_like of int
Baseline numbers, shape (Nbls,)
baseline_vecs : array_like of float
Baseline vectors in meters, shape (Nbls, 3).
tol : float
Absolute tolerance of redundancy, in meters.
Returns
-------
baseline_groups : list of lists of int
list of lists of redundant baseline numbers
baseline_ind_conj : list of int
List of baselines that are redundant when reversed. Only returned if
include_conjugates is True
"""
bl_gps = {}
# reduce the grid size to ensure baselines won't be assigned to a group
# more than the tol away from their location. The factor of 4 is a personal
# communication from Josh Dillon who developed this algorithm.
grid_size = tol / 4.0
p_or_m = (0, -1, 1)
epsilons = [[dx, dy, dz] for dx in p_or_m for dy in p_or_m for dz in p_or_m]
def check_neighbors(delta):
# Check to make sure bl_gps doesn't have the key plus or minus rounding error
for epsilon in epsilons:
newKey = (
delta[0] + epsilon[0],
delta[1] + epsilon[1],
delta[2] + epsilon[2],
)
if newKey in bl_gps:
return newKey
return
baseline_ind_conj = []
for bl_i, bl in enumerate(location_ids):
delta = tuple(np.round(location_vectors[bl_i] / grid_size).astype(int))
new_key = check_neighbors(delta)
if new_key is not None:
# this has a match
bl_gps[new_key].append(bl)
else:
# this is a new group
bl_gps[delta] = [bl]
bl_list = [sorted(gv) for gv in bl_gps.values()]
return bl_list, baseline_ind_conj
[docs]def get_baseline_redundancies(
baselines, baseline_vecs, *, tol=1.0, include_conjugates=None, use_grid_alg=True
):
"""
Find redundant baseline groups.
Parameters
----------
baselines : array_like of int
Baseline numbers, shape (Nbls,)
baseline_vecs : array_like of float
Baseline vectors in meters, shape (Nbls, 3).
tol : float
Absolute tolerance of redundancy, in meters.
include_conjugates : bool
Option to include baselines that are redundant under conjugation.
Only used if use_antpos is False. Default is currently False but will
become True in version 3.4.
use_grid_alg : bool
Option to use the gridding based algorithm (developed by the HERA team)
to find redundancies rather than the older clustering algorithm.
Returns
-------
baseline_groups : list of lists of int
list of lists of redundant baseline numbers
vec_bin_centers : list of array_like of float
List of vectors describing redundant group centers
lengths : list of float
List of redundant group baseline lengths in meters
baseline_ind_conj : list of int
List of baselines that are redundant when reversed. Only returned if
include_conjugates is True
"""
Nbls = baselines.shape[0]
if not baseline_vecs.shape == (Nbls, 3):
raise ValueError("Baseline vectors must be shape (Nbls, 3)")
if include_conjugates is None:
warnings.warn(
"The include_conjugates parameter is not set. The default is "
"currently False, which produces different groups than the groups "
"produced when using the `compress_by_redundancy` method. "
"The default will change to True in version 3.4.",
DeprecationWarning,
)
include_conjugates = False
baseline_vecs = deepcopy(baseline_vecs) # Protect the vectors passed in.
if include_conjugates:
conjugates = []
for bv in baseline_vecs:
uneg = bv[0] < -tol
uzer = np.isclose(bv[0], 0.0, atol=tol)
vneg = bv[1] < -tol
vzer = np.isclose(bv[1], 0.0, atol=tol)
wneg = bv[2] < -tol
conjugates.append(uneg or (uzer and vneg) or (uzer and vzer and wneg))
conjugates = np.array(conjugates, dtype=bool)
baseline_vecs[conjugates] *= -1
baseline_ind_conj = baselines[conjugates]
bl_gps, vec_bin_centers, lens = get_baseline_redundancies(
baselines,
baseline_vecs,
tol=tol,
include_conjugates=False,
use_grid_alg=use_grid_alg,
)
return bl_gps, vec_bin_centers, lens, baseline_ind_conj
if use_grid_alg:
output = find_clusters_grid(
location_ids=baselines, location_vectors=baseline_vecs, tol=tol
)
bl_gps, baseline_ind_conj = output
else:
try:
bl_gps = find_clusters(
location_ids=baselines,
location_vectors=baseline_vecs,
tol=tol,
strict=True,
)
except ValueError as exc:
raise ValueError(
"Some baselines are falling into multiple redundant groups. "
"Lower the tolerance to resolve ambiguity or use the gridding "
"based algorithm (developed by the HERA team) to find redundancies "
"by setting use_grid_alg=True."
) from exc
n_unique = len(bl_gps)
vec_bin_centers = np.zeros((n_unique, 3))
for gi, gp in enumerate(bl_gps):
inds = [np.where(i == baselines)[0] for i in gp]
vec_bin_centers[gi] = np.mean(baseline_vecs[inds, :], axis=0)
lens = np.sqrt(np.sum(vec_bin_centers**2, axis=1))
return bl_gps, vec_bin_centers, lens
[docs]def get_antenna_redundancies(
antenna_numbers,
antenna_positions,
*,
tol=1.0,
include_autos=False,
use_grid_alg=True,
):
"""
Find redundant baseline groups based on antenna positions.
Parameters
----------
antenna_numbers : array_like of int
Antenna numbers, shape (Nants,).
antenna_positions : array_like of float
Antenna position vectors in the ENU (topocentric) frame in meters,
shape (Nants, 3).
tol : float
Redundancy tolerance in meters.
include_autos : bool
Option to include autocorrelations.
use_grid_alg : bool
Option to use the gridding based algorithm (developed by the HERA team)
to find redundancies rather than the older clustering algorithm.
Returns
-------
baseline_groups : list of lists of int
list of lists of redundant baseline numbers
vec_bin_centers : list of array_like of float
List of vectors describing redundant group centers
lengths : list of float
List of redundant group baseline lengths in meters
Notes
-----
The baseline numbers refer to antenna pairs (a1, a2) such that
the baseline vector formed from ENU antenna positions,
blvec = enu[a1] - enu[a2]
is close to the other baselines in the group.
This is achieved by putting baselines in a form of the u>0
convention, but with a tolerance in defining the signs of
vector components.
To guarantee that the same baseline numbers are present in a UVData
object, ``UVData.conjugate_bls('u>0', uvw_tol=tol)``, where `tol` is
the tolerance used here.
"""
Nants = antenna_numbers.size
bls = []
bl_vecs = []
for aj in range(Nants):
mini = aj + 1
if include_autos:
mini = aj
for ai in range(mini, Nants):
anti, antj = antenna_numbers[ai], antenna_numbers[aj]
bidx = antnums_to_baseline(antj, anti, Nants_telescope=Nants)
bv = antenna_positions[ai] - antenna_positions[aj]
bl_vecs.append(bv)
bls.append(bidx)
bls = np.array(bls)
bl_vecs = np.array(bl_vecs)
gps, vecs, lens, conjs = get_baseline_redundancies(
bls, bl_vecs, tol=tol, include_conjugates=True, use_grid_alg=use_grid_alg
)
# Flip the baselines in the groups.
for gi, gp in enumerate(gps):
for bi, bl in enumerate(gp):
if bl in conjs:
gps[gi][bi] = baseline_index_flip(bl, Nants_telescope=Nants)
return gps, vecs, lens