Source code for tdamapper.cover

"""
Open cover construction for the Mapper algorithm.

An open cover is a collection of open subsets of a dataset whose union spans
the whole dataset. Unlike clustering, open subsets do not need to be disjoint.
Indeed, the overlaps of the open subsets define the edges of the Mapper graph.
"""

from __future__ import annotations

import math
from typing import Any, Callable, Generic, Iterator, Literal, Optional, TypeVar, Union

import numpy as np
from numpy.typing import NDArray

from tdamapper._common import ParamsMixin, warn_user
from tdamapper.core import proximity_net
from tdamapper.protocols import ArrayRead, Metric
from tdamapper.utils.metrics import MetricLiteral, chebyshev, get_metric
from tdamapper.utils.vptree import PivotingStrategy, VPTree, VPTreeKind

T = TypeVar("T")
S = TypeVar("S")

T_contra = TypeVar("T_contra", contravariant=True)
S_contra = TypeVar("S_contra", contravariant=True)

CubicalAlgorithm = Literal["standard", "proximity"]


class _Pullback(Generic[S_contra, T_contra]):
    """
    Pullback pseudo-metric function.

    This class is used to adapt a metric function that operates on a
    transformed space to work with the original space. It applies a function
    to the input data before computing the distance, effectively pulling back
    the metric to the original space.

    Given a function :math:`f: X \to Y` and a metric
    :math:`d: Y \times Y \to \\mathbb{R}`,
    this class defines a new pseudo-metric
    :math:`d': X \times X \to \\mathbb{R}` such that:
    :math:`d'(x_1, x_2) = d(f(x_1), f(x_2))`.

    When :math:`f` is injective, this pseudo-metric :math:`d'` is a true
    metric. If :math:`f` is not injective, it is a pseudo-metric, meaning it
    may not satisfy the identity of two objects :math:`x_1`, :math:`x_2` with
    :math:`d'(x_1, x_2) = 0`.

    :param fun: A function that transforms the input data.
    :param dist: A metric function that operates on the transformed data.
    """

    def __init__(
        self, fun: Callable[[S_contra], T_contra], dist: Metric[T_contra]
    ) -> None:
        self.fun = fun
        self.dist = dist

    def __call__(self, x: S_contra, y: S_contra) -> float:
        """
        Compute the distance between two points in the original space
        using the pullback metric.

        This method applies the transformation function to both points and
        then computes the distance using the provided metric function.

        :param x: A point in the original space.
        :param y: Another point in the original space.
        :return: The distance between the transformed points in the metric space.
        """
        return self.dist(self.fun(x), self.fun(y))


def _snd(x: tuple[T, ...]) -> T:
    """
    Extract the second element from a tuple.
    """
    return x[1]


[docs] class BallCover(ParamsMixin, Generic[T_contra]): """ Cover algorithm based on `ball proximity function`, which covers data with open balls of fixed radius. An open ball is a set of points within a specified distance from a center point. This class maps each point to its corresponding open ball with a fixed radius centered on the point itself. :param radius: The radius of the open balls. Must be a positive value. :param metric: The metric used to define the distance between points. Accepts any value compatible with `tdamapper.utils.metrics.get_metric`. :param metric_params: Additional parameters for the metric function, to be passed to `tdamapper.utils.metrics.get_metric`. :param kind: Specifies whether to use a flat or a hierarchical vantage point tree. :param leaf_capacity: The maximum number of points in a leaf node of the vantage point tree. Must be a positive value. :param leaf_radius: The radius of the leaf nodes. If not specified, it defaults to the value of `radius`. Must be a positive value. :param pivoting: The method used for pivoting in the vantage point tree. """ _radius: float _data: list[tuple[int, T_contra]] _vptree: VPTree[tuple[int, T_contra]] def __init__( self, radius: float = 1.0, metric: Union[MetricLiteral, Metric[Any]] = "euclidean", metric_params: Optional[dict[str, Any]] = None, kind: VPTreeKind = "flat", leaf_capacity: int = 1, leaf_radius: Optional[float] = None, pivoting: PivotingStrategy = "disabled", ) -> None: self.radius = radius self.metric = metric self.metric_params = metric_params self.kind = kind self.leaf_capacity = leaf_capacity self.leaf_radius = leaf_radius self.pivoting = pivoting
[docs] def fit(self, X: ArrayRead[T_contra]) -> BallCover[T_contra]: """ Train internal parameters. This method creates a vptree on the dataset in order to perform fast range queries in the func:`tdamapper.cover.BallCover.search` method. :param X: A dataset of n points. :return: The object itself. """ metric = get_metric(self.metric, **(self.metric_params or {})) self._radius = self.radius self._data = list(enumerate(X)) self._vptree = VPTree( self._data, metric=_Pullback(_snd, metric), kind=self.kind, leaf_capacity=self.leaf_capacity, leaf_radius=self.leaf_radius or self.radius, pivoting=self.pivoting, ) return self
[docs] def search(self, x: T_contra) -> list[int]: """ Return a list of neighbors for the query point. This method uses the internal vptree to perform fast range queries. :param x: A query point for which we want to find neighbors. :return: The indices of the neighbors contained in the dataset. """ if self._vptree is None: return [] neighs = self._vptree.ball_search( (-1, x), self._radius, inclusive=False, ) return [x for (x, _) in neighs]
[docs] def apply(self, X: ArrayRead[T_contra]) -> Iterator[list[int]]: """ Covers the dataset using proximity-net. This function returns a generator that yields each element of the open cover as a list of ids. The ids are the indices of the points in the original dataset. :param X: A dataset of n points. :return: A generator of lists of ids. """ return proximity_net(self, X)
[docs] class KNNCover(ParamsMixin, Generic[T_contra]): """ Cover algorithm based on `KNN proximity function`, which covers data using k-nearest neighbors (KNN). This class maps each point to the set of the k nearest neighbors to the point itself. :param neighbors: The number of neighbors to use for the KNN Proximity function, must be positive and less than the length of the dataset. :param metric: The metric used to define the distance between points. Accepts any value compatible with `tdamapper.utils.metrics.get_metric`. :param metric_params: Additional parameters for the metric function, to be passed to `tdamapper.utils.metrics.get_metric`. :param kind: Specifies whether to use a flat or a hierarchical vantage point tree. :param leaf_capacity: The maximum number of points in a leaf node of the vantage point tree. If not specified, it defaults to the value of `neighbors`. Must be a positive value. :param leaf_radius: The radius of the leaf nodes. Must be a positive value. :param pivoting: The method used for pivoting in the vantage point tree. """ _neighbors: int _data: list[tuple[int, T_contra]] _vptree: VPTree[tuple[int, T_contra]] def __init__( self, neighbors: int = 1, metric: Union[MetricLiteral, Metric[Any]] = "euclidean", metric_params: Optional[dict[str, Any]] = None, kind: VPTreeKind = "flat", leaf_capacity: Optional[int] = None, leaf_radius: float = 0.0, pivoting: PivotingStrategy = "disabled", ) -> None: self.neighbors = neighbors self.metric = metric self.metric_params = metric_params self.kind = kind self.leaf_capacity = leaf_capacity self.leaf_radius = leaf_radius self.pivoting = pivoting
[docs] def fit(self, X: ArrayRead[T_contra]) -> KNNCover[T_contra]: """ Train internal parameters. This method creates a vptree on the dataset in order to perform fast KNN queries in the func:`tdamapper.cover.BallCover.search` method. :param X: A dataset of n points. :return: The object itself. """ metric = get_metric(self.metric, **(self.metric_params or {})) self._neighbors = self.neighbors self._data = list(enumerate(X)) self._vptree = VPTree( self._data, metric=_Pullback(_snd, metric), kind=self.kind, leaf_capacity=self.leaf_capacity or self.neighbors, leaf_radius=self.leaf_radius, pivoting=self.pivoting, ) return self
[docs] def search(self, x: T_contra) -> list[int]: """ Return a list of neighbors for the query point. This method queries the internal vptree in order to perform fast KNN queries. :param x: A query point for which we want to find neighbors. :return: The indices of the neighbors contained in the dataset. """ if self._vptree is None: return [] neighs = self._vptree.knn_search((-1, x), self._neighbors) return [x for (x, _) in neighs]
[docs] def apply(self, X: ArrayRead[T_contra]) -> Iterator[list[int]]: """ Covers the dataset using proximity-net. This function returns a generator that yields each element of the open cover as a list of ids. The ids are the indices of the points in the original dataset. :param X: A dataset of n points. :return: A generator of lists of ids. """ return proximity_net(self, X)
[docs] class BaseCubicalCover: """ Base class for cubical cover algorithms, which cover data with open hypercubes of uniform size and overlap. This class provides the basic functionality for cubical covers, including the initialization of parameters and the methods for computing the center of a hypercube and its overlap. A hypercube is a multidimensional generalization of a square or a cube. The size and overlap of the hypercubes are determined by the number of intervals and the overlap fraction parameters. This class maps each point to the hypercube with the nearest center. :param n_intervals: The number of intervals to use for each dimension. Must be positive and less than or equal to the length of the dataset. :param overlap_frac: The fraction of overlap between adjacent intervals on each dimension, must be in the range (0.0, 0.5]. If not specified, the overlap_frac is computed such that the volume of the overlap within each hypercube is half the total volume. :param kind: Specifies whether to use a flat or a hierarchical vantage point tree. :param leaf_capacity: The maximum number of points in a leaf node of the vantage point tree. Must be a positive value. :param leaf_radius: The radius of the leaf nodes. If not specified, it defaults to the value of `radius`. Must be a positive value. :param pivoting: The method used for pivoting in the vantage point tree. """ _overlap_frac: float _n_intervals: int _min: NDArray[np.float_] _max: NDArray[np.float_] _delta: NDArray[np.float_] _cover: BallCover[NDArray[np.float_]] def __init__( self, n_intervals: int = 1, overlap_frac: Optional[float] = None, kind: VPTreeKind = "flat", leaf_capacity: int = 1, leaf_radius: Optional[float] = None, pivoting: PivotingStrategy = "disabled", ) -> None: self.n_intervals = n_intervals self.overlap_frac = overlap_frac self.kind = kind self.leaf_capacity = leaf_capacity self.leaf_radius = leaf_radius self.pivoting = pivoting def _get_center( self, x: NDArray[np.float_] ) -> tuple[tuple[float], NDArray[np.float_]]: offset = self._offset(x) center = self._phi(x) return tuple(offset), center def _get_overlap_frac(self, dim: int, overlap_vol_frac: float) -> float: beta = math.pow(1.0 - overlap_vol_frac, 1.0 / dim) return 1.0 - 1.0 / (2.0 - beta) def _offset(self, x: NDArray[np.float_]) -> NDArray[np.float_]: return np.minimum(self._n_intervals - 1, np.floor(self._gamma_n(x))) def _phi(self, x: NDArray[np.float_]) -> NDArray[np.float_]: offset = self._offset(x) return self._gamma_n_inv(0.5 + offset) def _gamma_n(self, x: NDArray[np.float_]) -> NDArray[np.float_]: return self._n_intervals * (x - self._min) / self._delta def _gamma_n_inv(self, x: NDArray[np.float_]) -> NDArray[np.float_]: return self._min + self._delta * x / self._n_intervals def _get_bounds( self, X: NDArray[np.float_] ) -> tuple[NDArray[np.float_], NDArray[np.float_], NDArray[np.float_]]: if (X is None) or len(X) == 0: raise ValueError("The dataset is empty.") _min, _max = X[0], X[0] eps = np.finfo(np.float64).eps _min = np.min(X, axis=0) _max = np.max(X, axis=0) _delta = _max - _min _delta[(_delta >= -eps) & (_delta <= eps)] = self._n_intervals return _min, _max, _delta
[docs] def fit(self, X: ArrayRead[NDArray[np.float_]]) -> BaseCubicalCover: """ Train internal parameters. This method builds an internal :class:`tdamapper.cover.BallCover` attribute that allows efficient queries of the dataset. :param X: A dataset of n points. :return: The object itself. """ if len(X) == 0: return self X_ = np.asarray(X).reshape(len(X), -1).astype(float) if self.overlap_frac is None: dim = 1 if X_.ndim == 1 else X_.shape[1] self._overlap_frac = self._get_overlap_frac(dim, 0.5) else: self._overlap_frac = self.overlap_frac self._n_intervals = self.n_intervals if self._overlap_frac <= 0.0: raise ValueError("The parameter overlap_frac is expected to be > 0.0") if self._overlap_frac > 0.5: warn_user("The parameter overlap_frac is expected to be <= 0.5") self._min, self._max, self._delta = self._get_bounds(X_) radius = 1.0 / (2.0 - 2.0 * self._overlap_frac) self._cover = BallCover( radius, metric=_Pullback(self._gamma_n, chebyshev()), kind=self.kind, leaf_capacity=self.leaf_capacity, leaf_radius=self.leaf_radius, pivoting=self.pivoting, ) self._cover.fit(X_) return self
[docs] def search(self, x: NDArray[np.float_]) -> list[int]: """ Return a list of neighbors for the query point. This method takes a target point as input and returns the hypercube whose center is closest to the target point. :param x: A query point for which we want to find neighbors. :return: The indices of the neighbors contained in the dataset. """ center = self._phi(x) return self._cover.search(center)
[docs] class ProximityCubicalCover(BaseCubicalCover, ParamsMixin): """ Cover algorithm based on the `cubical proximity function`, which covers data with open hypercubes of uniform size and overlap. The cubical cover is obtained by selecting a subsect of all the hypercubes that intersect the dataset using proximity net (see :class:`tdamapper.core.Proximity`). For an open cover containing all the hypercubes interecting the dataset use :class:`tdamapper.core.StandardCubicalCover`. A hypercube is a multidimensional generalization of a square or a cube. The size and overlap of the hypercubes are determined by the number of intervals and the overlap fraction parameters. This class maps each point to the hypercube with the nearest center. :param n_intervals: The number of intervals to use for each dimension. Must be positive and less than or equal to the length of the dataset. :param overlap_frac: The fraction of overlap between adjacent intervals on each dimension, must be in the range (0.0, 0.5]. If not specified, the overlap_frac is computed such that the volume of the overlap within each hypercube is half the total volume. :param kind: Specifies whether to use a flat or a hierarchical vantage point tree. :param leaf_capacity: The maximum number of points in a leaf node of the vantage point tree. Must be a positive value. :param leaf_radius: The radius of the leaf nodes. If not specified, it defaults to the value of `radius`. Must be a positive value. :param pivoting: The method used for pivoting in the vantage point tree. """ def __init__( self, n_intervals: int = 1, overlap_frac: Optional[float] = None, kind: VPTreeKind = "flat", leaf_capacity: int = 1, leaf_radius: Optional[float] = None, pivoting: PivotingStrategy = "disabled", ) -> None: super().__init__( n_intervals=n_intervals, overlap_frac=overlap_frac, kind=kind, leaf_capacity=leaf_capacity, leaf_radius=leaf_radius, pivoting=pivoting, )
[docs] def apply(self, X: ArrayRead[NDArray[np.float_]]) -> Iterator[list[int]]: """ Covers the dataset using proximity-net. This function returns a generator that yields each element of the open cover as a list of ids. The ids are the indices of the points in the original dataset. :param X: A dataset of n points. :return: A generator of lists of ids. """ return proximity_net(self, X)
[docs] class StandardCubicalCover(BaseCubicalCover, ParamsMixin): """ Cover algorithm based on the standard open cover, which covers data with open hypercubes of uniform size and overlap. The standard cover is obtained by selecting all the hypercubes that intersect the dataset. A hypercube is a multidimensional generalization of a square or a cube. The size and overlap of the hypercubes are determined by the number of intervals and the overlap fraction parameters. This class maps each point to the hypercube with the nearest center. :param n_intervals: The number of intervals to use for each dimension. Must be positive and less than or equal to the length of the dataset. :param overlap_frac: The fraction of overlap between adjacent intervals on each dimension, must be in the range (0.0, 0.5]. If not specified, the overlap_frac is computed such that the volume of the overlap within each hypercube is half the total volume. :param kind: Specifies whether to use a flat or a hierarchical vantage point tree. :param leaf_capacity: The maximum number of points in a leaf node of the vantage point tree. Must be a positive value. :param leaf_radius: The radius of the leaf nodes. If not specified, it defaults to the value of `radius`. Must be a positive value. :param pivoting: The method used for pivoting in the vantage point tree. """ def __init__( self, n_intervals: int = 1, overlap_frac: Optional[float] = None, kind: VPTreeKind = "flat", leaf_capacity: int = 1, leaf_radius: Optional[float] = None, pivoting: PivotingStrategy = "disabled", ) -> None: super().__init__( n_intervals=n_intervals, overlap_frac=overlap_frac, kind=kind, leaf_capacity=leaf_capacity, leaf_radius=leaf_radius, pivoting=pivoting, ) def _landmarks( self, X: ArrayRead[NDArray[np.float_]] ) -> dict[tuple[float], NDArray[np.float_]]: lmrks = {} for x in X: lmrk, _ = self._get_center(x) if lmrk not in lmrks: lmrks[lmrk] = x return lmrks
[docs] def apply(self, X: ArrayRead[NDArray[np.float_]]) -> Iterator[list[int]]: """ Covers the dataset using landmarks. This function yields all the hypercubes intersecting the dataset. This function returns a generator that yields each element of the open cover as a list of ids. The ids are the indices of the points in the original dataset. :param X: A dataset of n points. :return: A generator of lists of ids. """ self.fit(X) lmrks_to_cover = self._landmarks(X) while lmrks_to_cover: _, x = lmrks_to_cover.popitem() neigh_ids = self.search(x) if neigh_ids: yield neigh_ids
[docs] class CubicalCover(ParamsMixin): """ Wrapper class for cubical cover algorithms, which cover data with open hypercubes of uniform size and overlap. This class delegates its methods to either :class:`tdamapper.cover.StandardCubicalCover` or :class:`tdamapper.cover.ProximityCubicalCover`, based on the `algorithm` parameter. A hypercube is a multidimensional generalization of a square or a cube. The size and overlap of the hypercubes are determined by the number of intervals and the overlap fraction parameters. :param n_intervals: The number of intervals to use for each dimension. Must be positive and less than or equal to the length of the dataset. :param overlap_frac: The fraction of overlap between adjacent intervals on each dimension, must be in the range (0.0, 0.5]. If not specified, the overlap_frac is computed such that the volume of the overlap within each hypercube is half the total volume. :param algorithm: Specifies whether to use standard cubical cover, as in :class:`tdamapper.cover.StandardCubicalCover` or proximity cubical cover, as in :class:`tdamapper.cover.ProximityCubicalCover`. :param kind: Specifies whether to use a flat or a hierarchical vantage point tree. :param leaf_capacity: The maximum number of points in a leaf node of the vantage point tree. Must be a positive value. :param leaf_radius: The radius of the leaf nodes. If not specified, it defaults to the value of `radius`. Must be a positive value. :param pivoting: The method used for pivoting in the vantage point tree. """ _cubical_cover: Union[ProximityCubicalCover, StandardCubicalCover] def __init__( self, n_intervals: int = 1, overlap_frac: Optional[float] = None, algorithm: CubicalAlgorithm = "proximity", kind: VPTreeKind = "flat", leaf_capacity: int = 1, leaf_radius: Optional[float] = None, pivoting: PivotingStrategy = "disabled", ): self.n_intervals = n_intervals self.overlap_frac = overlap_frac self.algorithm = algorithm self.kind = kind self.leaf_capacity = leaf_capacity self.leaf_radius = leaf_radius self.pivoting = pivoting def _get_cubical_cover(self) -> Union[ProximityCubicalCover, StandardCubicalCover]: params: dict[str, Any] = dict( n_intervals=self.n_intervals, overlap_frac=self.overlap_frac, kind=self.kind, leaf_capacity=self.leaf_capacity, leaf_radius=self.leaf_radius, pivoting=self.pivoting, ) if self.algorithm == "proximity": return ProximityCubicalCover(**params) if self.algorithm == "standard": return StandardCubicalCover(**params) raise ValueError( "The only possible values for algorithm are 'standard' and 'proximity'." )
[docs] def fit(self, X: ArrayRead[NDArray[np.float_]]) -> CubicalCover: """ Train internal parameters. This method delegates to the :func:`fit` method of the internal cubical cover used. :param X: A dataset of n points. :return: The object itself. """ self._cubical_cover = self._get_cubical_cover() self._cubical_cover.fit(X) return self
[docs] def search(self, x: NDArray[np.float_]) -> list[int]: """ Return a list of neighbors for the query point. This method delegates to the `search` method of the internal cubical cover used. :param x: A query point for which we want to find neighbors. :return: The indices of the neighbors contained in the dataset. """ return self._cubical_cover.search(x)
[docs] def apply(self, X: ArrayRead[NDArray[np.float_]]) -> Iterator[list[int]]: """ Covers the dataset using hypercubes. This method delegates to the `apply` method of the internal cubical cover used. :param X: A dataset of n points. :return: A generator of lists of ids. """ self._cubical_cover = self._get_cubical_cover() return self._cubical_cover.apply(X)