Source code for tdamapper.cover

"""
Open cover construction for the Mapper algorithm.

An open cover is a collection of open subsets of a dataset whose union spans
the whole dataset. Unlike clustering, open subsets do not need to be disjoint.
Indeed, the overlaps of the open subsets define the edges of the Mapper graph.
"""

import math
import numpy as np

from tdamapper.core import Cover, Proximity
from tdamapper.utils.metrics import get_metric, chebyshev
from tdamapper.utils.vptree import VPTree

from tdamapper._common import warn_user


class _Pullback:
    """
    Distance obtained by pulling back another distance along a function.

    Calling the object on two values applies `fun` to each of them and then
    evaluates `dist` on the two images.

    :param fun: The function applied to both arguments before measuring.
    :type fun: callable
    :param dist: The distance evaluated on the images of the arguments.
    :type dist: callable
    """

    def __init__(self, fun, dist):
        self.fun, self.dist = fun, dist

    def __call__(self, x, y):
        """
        Return the distance between the images of `x` and `y`.

        :param x: The first value.
        :type x: Any
        :param y: The second value.
        :type y: Any
        :return: The value of `dist(fun(x), fun(y))`.
        :rtype: Any
        """
        image_x = self.fun(x)
        image_y = self.fun(y)
        return self.dist(image_x, image_y)


def _snd(x):
    """
    Return the second component of an indexable value.

    In this module it is used to extract the point from the
    ``(index, point)`` pairs stored in the vantage point trees.

    :param x: An object supporting integer indexing.
    :type x: Any
    :return: The item at position 1.
    :rtype: Any
    """
    second = x[1]
    return second


class BallCover(Proximity):
    """
    Cover algorithm based on the `ball proximity function`, which covers data
    with open balls of fixed radius.

    An open ball is the set of points within a specified distance from a
    center point. This class maps each point to the open ball of fixed radius
    centered on the point itself.

    :param radius: The radius of the open balls. Must be a positive value.
        Defaults to 1.0.
    :type radius: float
    :param metric: The metric used to define the distance between points.
        Accepts any value compatible with `tdamapper.utils.metrics.get_metric`.
        Defaults to 'euclidean'.
    :type metric: str or callable
    :param metric_params: Additional parameters for the metric function, to be
        passed to `tdamapper.utils.metrics.get_metric`. Defaults to None.
    :type metric_params: dict, optional
    :param kind: Specifies whether to use a flat or a hierarchical vantage
        point tree. Acceptable values are 'flat' or 'hierarchical'. Defaults
        to 'flat'.
    :type kind: str
    :param leaf_capacity: The maximum number of points in a leaf node of the
        vantage point tree. Must be a positive value. Defaults to 1.
    :type leaf_capacity: int
    :param leaf_radius: The radius of the leaf nodes. If not specified, it
        defaults to the value of `radius`. Must be a positive value. Defaults
        to None.
    :type leaf_radius: float, optional
    :param pivoting: The method used for pivoting in the vantage point tree.
        Acceptable values are None, 'random', or 'furthest'. Defaults to None.
    :type pivoting: str or callable, optional
    """

    def __init__(
        self,
        radius=1.0,
        metric='euclidean',
        metric_params=None,
        kind='flat',
        leaf_capacity=1,
        leaf_radius=None,
        pivoting=None,
    ):
        self.radius = radius
        self.metric = metric
        self.metric_params = metric_params
        self.kind = kind
        self.leaf_capacity = leaf_capacity
        self.leaf_radius = leaf_radius
        self.pivoting = pivoting
        # Fitted state. It is initialised here so that `search` can recognise
        # an unfitted instance; without it the `is None` guard in `search`
        # would raise AttributeError instead of returning an empty list.
        self.__vptree = None

    def fit(self, X):
        """
        Train internal parameters.

        This method creates a vptree on the dataset in order to perform fast
        range queries in the :func:`tdamapper.cover.BallCover.search` method.

        :param X: A dataset of n points.
        :type X: array-like of shape (n, m) or list-like of length n
        :return: The object itself.
        :rtype: self
        """
        metric = get_metric(self.metric, **(self.metric_params or {}))
        # Snapshot the radius so that later changes to the public parameter
        # do not affect an already fitted instance.
        self.__radius = self.radius
        # Points are stored together with their position in X, so that the
        # search can report indices rather than the points themselves.
        self.__data = list(enumerate(X))
        self.__vptree = VPTree(
            self.__data,
            # The tree holds (index, point) pairs: distances are measured on
            # the point component only.
            metric=_Pullback(_snd, metric),
            metric_params=None,
            kind=self.kind,
            leaf_capacity=self.leaf_capacity,
            leaf_radius=self.leaf_radius or self.radius,
            pivoting=self.pivoting,
        )
        return self

    def search(self, x):
        """
        Return a list of neighbors for the query point.

        This method uses the internal vptree to perform fast range queries.
        If the object has not been fitted yet, an empty list is returned.

        :param x: A query point for which we want to find neighbors.
        :type x: Any
        :return: The indices of the neighbors contained in the dataset.
        :rtype: list[int]
        """
        if self.__vptree is None:
            return []
        # The query is wrapped in a pair with a dummy index to match the
        # (index, point) items stored in the tree.
        neighs = self.__vptree.ball_search(
            (-1, x),
            self.__radius,
            inclusive=False,
        )
        return [idx for (idx, _) in neighs]
class KNNCover(Proximity):
    """
    Cover algorithm based on the `KNN proximity function`, which covers data
    using k-nearest neighbors (KNN).

    This class maps each point to the set of the k nearest neighbors of the
    point itself.

    :param neighbors: The number of neighbors to use for the KNN Proximity
        function, must be positive and less than the length of the dataset.
        Defaults to 1.
    :type neighbors: int
    :param metric: The metric used to define the distance between points.
        Accepts any value compatible with `tdamapper.utils.metrics.get_metric`.
        Defaults to 'euclidean'.
    :type metric: str or callable
    :param metric_params: Additional parameters for the metric function, to be
        passed to `tdamapper.utils.metrics.get_metric`. Defaults to None.
    :type metric_params: dict, optional
    :param kind: Specifies whether to use a flat or a hierarchical vantage
        point tree. Acceptable values are 'flat' or 'hierarchical'. Defaults
        to 'flat'.
    :type kind: str
    :param leaf_capacity: The maximum number of points in a leaf node of the
        vantage point tree. If not specified, it defaults to the value of
        `neighbors`. Must be a positive value. Defaults to None.
    :type leaf_capacity: int, optional
    :param leaf_radius: The radius of the leaf nodes. Defaults to 0.0.
    :type leaf_radius: float
    :param pivoting: The method used for pivoting in the vantage point tree.
        Acceptable values are None, 'random', or 'furthest'. Defaults to None.
    :type pivoting: str or callable, optional
    """

    def __init__(
        self,
        neighbors=1,
        metric='euclidean',
        metric_params=None,
        kind='flat',
        leaf_capacity=None,
        leaf_radius=0.0,
        pivoting=None,
    ):
        self.neighbors = neighbors
        self.metric = metric
        self.metric_params = metric_params
        self.kind = kind
        self.leaf_capacity = leaf_capacity
        self.leaf_radius = leaf_radius
        self.pivoting = pivoting
        # Fitted state. It is initialised here so that `search` can recognise
        # an unfitted instance; without it the `is None` guard in `search`
        # would raise AttributeError instead of returning an empty list.
        self.__vptree = None

    def fit(self, X):
        """
        Train internal parameters.

        This method creates a vptree on the dataset in order to perform fast
        KNN queries in the :func:`tdamapper.cover.KNNCover.search` method.

        :param X: A dataset of n points.
        :type X: array-like of shape (n, m) or list-like of length n
        :return: The object itself.
        :rtype: self
        """
        metric = get_metric(self.metric, **(self.metric_params or {}))
        # Snapshot the number of neighbors so that later changes to the
        # public parameter do not affect an already fitted instance.
        self.__neighbors = self.neighbors
        # Points are stored together with their position in X, so that the
        # search can report indices rather than the points themselves.
        self.__data = list(enumerate(X))
        self.__vptree = VPTree(
            self.__data,
            # The tree holds (index, point) pairs: distances are measured on
            # the point component only.
            metric=_Pullback(_snd, metric),
            metric_params=None,
            kind=self.kind,
            leaf_capacity=self.leaf_capacity or self.neighbors,
            leaf_radius=self.leaf_radius,
            pivoting=self.pivoting,
        )
        return self

    def search(self, x):
        """
        Return a list of neighbors for the query point.

        This method queries the internal vptree in order to perform fast KNN
        queries. If the object has not been fitted yet, an empty list is
        returned.

        :param x: A query point for which we want to find neighbors.
        :type x: Any
        :return: The indices of the neighbors contained in the dataset.
        :rtype: list[int]
        """
        if self.__vptree is None:
            return []
        # The query is wrapped in a pair with a dummy index to match the
        # (index, point) items stored in the tree.
        neighs = self.__vptree.knn_search((-1, x), self.__neighbors)
        return [idx for (idx, _) in neighs]
class BaseCubicalCover:
    """
    Shared implementation of the cubical cover algorithms.

    The dataset is rescaled on each dimension so that its bounding box spans
    `n_intervals` unit intervals. Every point is assigned to the cell that
    contains it, and the neighbors of a point are found by querying a
    :class:`tdamapper.cover.BallCover` around the center of that cell, using
    the `chebyshev` metric on the rescaled coordinates.

    :param n_intervals: The number of intervals to use for each dimension.
        Defaults to 1.
    :type n_intervals: int
    :param overlap_frac: The fraction of overlap between adjacent intervals
        on each dimension, must be in the range (0.0, 1.0); a warning is
        issued for values above 0.5. If not specified, it is computed such
        that the volume of the overlap within each hypercube is half the
        total volume. Defaults to None.
    :type overlap_frac: float, optional
    :param kind: Forwarded to the internal
        :class:`tdamapper.cover.BallCover`. Defaults to 'flat'.
    :type kind: str
    :param leaf_capacity: Forwarded to the internal
        :class:`tdamapper.cover.BallCover`. Defaults to 1.
    :type leaf_capacity: int
    :param leaf_radius: Forwarded to the internal
        :class:`tdamapper.cover.BallCover`. Defaults to None.
    :type leaf_radius: float, optional
    :param pivoting: Forwarded to the internal
        :class:`tdamapper.cover.BallCover`. Defaults to None.
    :type pivoting: str or callable, optional
    """

    def __init__(
        self,
        n_intervals=1,
        overlap_frac=None,
        kind='flat',
        leaf_capacity=1,
        leaf_radius=None,
        pivoting=None,
    ):
        self.n_intervals = n_intervals
        self.overlap_frac = overlap_frac
        self.kind = kind
        self.leaf_capacity = leaf_capacity
        self.leaf_radius = leaf_radius
        self.pivoting = pivoting

    def _get_center(self, x):
        """
        Return the cell of `x` as a hashable key together with its center.

        :param x: A point of the dataset.
        :type x: array-like
        :return: The per-dimension cell offsets as a tuple, and the center of
            the cell in the original coordinates.
        :rtype: tuple
        """
        offset = self._offset(x)
        center = self._phi(x)
        return tuple(offset), center

    def _get_overlap_frac(self, dim, overlap_vol_frac):
        """
        Convert a volume overlap fraction into a per-dimension overlap
        fraction.

        :param dim: The number of dimensions of the data.
        :type dim: int
        :param overlap_vol_frac: The fraction of the volume of each hypercube
            that overlaps with other hypercubes.
        :type overlap_vol_frac: float
        :return: The corresponding overlap fraction on each dimension.
        :rtype: float
        """
        beta = math.pow(1.0 - overlap_vol_frac, 1.0 / dim)
        return 1.0 - 1.0 / (2.0 - beta)

    def _offset(self, x):
        """
        Return the per-dimension index of the cell containing `x`.

        The index is capped at `n_intervals - 1`, so that points lying on the
        upper bound of the dataset fall in the last cell.
        """
        return np.minimum(self._n_intervals - 1, np.floor(self._gamma_n(x)))

    def _phi(self, x):
        """
        Return the center, in the original coordinates, of the cell
        containing `x`.
        """
        offset = self._offset(x)
        return self._gamma_n_inv(0.5 + offset)

    def _gamma_n(self, x):
        """
        Rescale `x` so that the fitted bounds span `n_intervals` units on
        each dimension.
        """
        return self._n_intervals * (x - self._min) / self._delta

    def _gamma_n_inv(self, x):
        """
        Map rescaled coordinates back to the original ones; inverse of
        `_gamma_n`.
        """
        return self._min + self._delta * x / self._n_intervals

    def _get_bounds(self, X):
        """
        Compute the per-dimension bounds of the dataset.

        :param X: A non-empty dataset of n points.
        :type X: numpy.ndarray of shape (n, m)
        :return: The per-dimension minimum, maximum and extent of the data.
        :rtype: tuple
        :raises ValueError: If `X` is None or empty.
        """
        if (X is None) or len(X) == 0:
            # The caller always unpacks three values: failing here with an
            # explicit message is clearer than returning None.
            raise ValueError(
                'Cannot compute the bounds of an empty dataset.'
            )
        eps = np.finfo(np.float64).eps
        _min = np.min(X, axis=0)
        _max = np.max(X, axis=0)
        _delta = _max - _min
        # Degenerate (constant) dimensions would cause a division by zero in
        # `_gamma_n`. Setting their extent to `n_intervals` makes the
        # rescaling on those dimensions a plain translation by `_min`.
        _delta[(_delta >= -eps) & (_delta <= eps)] = self._n_intervals
        return _min, _max, _delta

    def fit(self, X):
        """
        Train internal parameters.

        This method builds an internal :class:`tdamapper.cover.BallCover`
        attribute that allows efficient queries of the dataset.

        :param X: A dataset of n points.
        :type X: array-like of shape (n, m) or list-like of length n
        :return: The object itself.
        :rtype: self
        :raises ValueError: If the overlap fraction is not in the range
            (0.0, 1.0).
        """
        # After this reshape X is always two-dimensional: one row per point.
        X = np.asarray(X).reshape(len(X), -1).astype(float)
        if self.overlap_frac is None:
            self._overlap_frac = self._get_overlap_frac(X.shape[1], 0.5)
        else:
            self._overlap_frac = self.overlap_frac
        self._n_intervals = self.n_intervals
        if self._overlap_frac <= 0.0:
            raise ValueError(
                'The parameter overlap_frac is expected to be '
                '> 0.0'
            )
        if self._overlap_frac >= 1.0:
            # The radius below is 1 / (2 - 2 * overlap_frac): it is undefined
            # at 1.0 and negative above it.
            raise ValueError(
                'The parameter overlap_frac is expected to be '
                '< 1.0'
            )
        if self._overlap_frac > 0.5:
            warn_user(
                'The parameter overlap_frac is expected to be <= 0.5'
            )
        self._min, self._max, self._delta = self._get_bounds(X)
        # In rescaled coordinates each cell has unit side: the radius is half
        # the side of the cell enlarged according to the overlap fraction.
        radius = 1.0 / (2.0 - 2.0 * self._overlap_frac)
        self._cover = BallCover(
            radius,
            metric=_Pullback(self._gamma_n, chebyshev()),
            kind=self.kind,
            leaf_capacity=self.leaf_capacity,
            leaf_radius=self.leaf_radius,
            pivoting=self.pivoting,
        )
        self._cover.fit(X)
        return self

    def search(self, x):
        """
        Return a list of neighbors for the query point.

        This method takes a target point as input and returns the hypercube
        whose center is closest to the target point.

        :param x: A query point for which we want to find neighbors.
        :type x: Any
        :return: The indices of the neighbors contained in the dataset.
        :rtype: list[int]
        """
        center = self._phi(x)
        return self._cover.search(center)
class ProximityCubicalCover(BaseCubicalCover, Proximity):
    """
    Cover algorithm based on the `cubical proximity function`, which covers
    data with open hypercubes of uniform size and overlap.

    The cubical cover is obtained by selecting a subset of all the hypercubes
    that intersect the dataset using a proximity net (see
    :class:`tdamapper.core.Proximity`). For an open cover containing all the
    hypercubes intersecting the dataset use
    :class:`tdamapper.cover.StandardCubicalCover`.

    A hypercube is a multidimensional generalization of a square or a cube.
    The size and overlap of the hypercubes are determined by the number of
    intervals and the overlap fraction parameters. This class maps each point
    to the hypercube with the nearest center.

    :param n_intervals: The number of intervals to use for each dimension.
        Must be positive and less than or equal to the length of the dataset.
        Defaults to 1.
    :type n_intervals: int
    :param overlap_frac: The fraction of overlap between adjacent intervals
        on each dimension, must be in the range (0.0, 0.5]. If not specified,
        the overlap_frac is computed such that the volume of the overlap
        within each hypercube is half the total volume. Defaults to None.
    :type overlap_frac: float
    :param kind: Specifies whether to use a flat or a hierarchical vantage
        point tree. Acceptable values are 'flat' or 'hierarchical'. Defaults
        to 'flat'.
    :type kind: str
    :param leaf_capacity: The maximum number of points in a leaf node of the
        vantage point tree. Must be a positive value. Defaults to 1.
    :type leaf_capacity: int
    :param leaf_radius: The radius of the leaf nodes. If not specified, it
        defaults to the radius of the internal ball cover. Must be a positive
        value. Defaults to None.
    :type leaf_radius: float, optional
    :param pivoting: The method used for pivoting in the vantage point tree.
        Acceptable values are None, 'random', or 'furthest'. Defaults to None.
    :type pivoting: str or callable, optional
    """

    def __init__(
        self,
        n_intervals=1,
        overlap_frac=None,
        kind='flat',
        leaf_capacity=1,
        leaf_radius=None,
        pivoting=None,
    ):
        # Every parameter is handled by the shared cubical implementation.
        base_params = dict(
            n_intervals=n_intervals,
            overlap_frac=overlap_frac,
            kind=kind,
            leaf_capacity=leaf_capacity,
            leaf_radius=leaf_radius,
            pivoting=pivoting,
        )
        super().__init__(**base_params)
class StandardCubicalCover(BaseCubicalCover, Cover):
    """
    Cover algorithm based on the standard open cover, which covers data with
    open hypercubes of uniform size and overlap.

    The standard cover is obtained by selecting all the hypercubes that
    intersect the dataset.

    A hypercube is a multidimensional generalization of a square or a cube.
    The size and overlap of the hypercubes are determined by the number of
    intervals and the overlap fraction parameters. This class maps each point
    to the hypercube with the nearest center.

    :param n_intervals: The number of intervals to use for each dimension.
        Must be positive and less than or equal to the length of the dataset.
        Defaults to 1.
    :type n_intervals: int
    :param overlap_frac: The fraction of overlap between adjacent intervals
        on each dimension, must be in the range (0.0, 0.5]. If not specified,
        the overlap_frac is computed such that the volume of the overlap
        within each hypercube is half the total volume. Defaults to None.
    :type overlap_frac: float
    :param kind: Specifies whether to use a flat or a hierarchical vantage
        point tree. Acceptable values are 'flat' or 'hierarchical'. Defaults
        to 'flat'.
    :type kind: str
    :param leaf_capacity: The maximum number of points in a leaf node of the
        vantage point tree. Must be a positive value. Defaults to 1.
    :type leaf_capacity: int
    :param leaf_radius: The radius of the leaf nodes. If not specified, it
        defaults to the radius of the internal ball cover. Must be a positive
        value. Defaults to None.
    :type leaf_radius: float, optional
    :param pivoting: The method used for pivoting in the vantage point tree.
        Acceptable values are None, 'random', or 'furthest'. Defaults to None.
    :type pivoting: str or callable, optional
    """

    def __init__(
        self,
        n_intervals=1,
        overlap_frac=None,
        kind='flat',
        leaf_capacity=1,
        leaf_radius=None,
        pivoting=None,
    ):
        # Every parameter is handled by the shared cubical implementation.
        base_params = dict(
            n_intervals=n_intervals,
            overlap_frac=overlap_frac,
            kind=kind,
            leaf_capacity=leaf_capacity,
            leaf_radius=leaf_radius,
            pivoting=pivoting,
        )
        super().__init__(**base_params)

    def _landmarks(self, X):
        """
        Pick one representative point for each hypercube cell hit by `X`.

        :param X: A dataset of n points.
        :type X: array-like of shape (n, m) or list-like of length n
        :return: A mapping from the cell key to the first point of `X` found
            in that cell.
        :rtype: dict
        """
        representatives = {}
        for point in X:
            cell_key, _ = self._get_center(point)
            # Only the first point met in each cell is kept.
            representatives.setdefault(cell_key, point)
        return representatives

    def apply(self, X):
        """
        Covers the dataset using landmarks.

        This function yields all the hypercubes intersecting the dataset.

        This function returns a generator that yields each element of the
        open cover as a list of ids. The ids are the indices of the points in
        the original dataset.

        :param X: A dataset of n points.
        :type X: array-like of shape (n, m) or list-like of length n
        :return: A generator of lists of ids.
        :rtype: generator of lists of ints
        """
        self.fit(X)
        representatives = self._landmarks(X)
        # Landmarks are visited from the most recently inserted to the first.
        for landmark in reversed(list(representatives.values())):
            ids = self.search(landmark)
            if ids:
                yield ids
class CubicalCover(Cover):
    """
    Wrapper class for cubical cover algorithms, which cover data with open
    hypercubes of uniform size and overlap.

    This class delegates its methods to either
    :class:`tdamapper.cover.StandardCubicalCover` or
    :class:`tdamapper.cover.ProximityCubicalCover`, based on the `algorithm`
    parameter.

    A hypercube is a multidimensional generalization of a square or a cube.
    The size and overlap of the hypercubes are determined by the number of
    intervals and the overlap fraction parameters.

    :param n_intervals: The number of intervals to use for each dimension.
        Must be positive and less than or equal to the length of the dataset.
        Defaults to 1.
    :type n_intervals: int
    :param overlap_frac: The fraction of overlap between adjacent intervals
        on each dimension, must be in the range (0.0, 0.5]. If not specified,
        the overlap_frac is computed such that the volume of the overlap
        within each hypercube is half the total volume. Defaults to None.
    :type overlap_frac: float
    :param algorithm: Specifies whether to use standard cubical cover, as in
        :class:`tdamapper.cover.StandardCubicalCover` or proximity cubical
        cover, as in :class:`tdamapper.cover.ProximityCubicalCover`.
        Acceptable values are 'standard' or 'proximity'. Defaults to
        'proximity'.
    :type algorithm: str
    :param kind: Specifies whether to use a flat or a hierarchical vantage
        point tree. Acceptable values are 'flat' or 'hierarchical'. Defaults
        to 'flat'.
    :type kind: str
    :param leaf_capacity: The maximum number of points in a leaf node of the
        vantage point tree. Must be a positive value. Defaults to 1.
    :type leaf_capacity: int
    :param leaf_radius: The radius of the leaf nodes. If not specified, it
        defaults to the radius of the internal ball cover. Must be a positive
        value. Defaults to None.
    :type leaf_radius: float, optional
    :param pivoting: The method used for pivoting in the vantage point tree.
        Acceptable values are None, 'random', or 'furthest'. Defaults to None.
    :type pivoting: str or callable, optional
    """

    def __init__(
        self,
        n_intervals=1,
        overlap_frac=None,
        algorithm='proximity',
        kind='flat',
        leaf_capacity=1,
        leaf_radius=None,
        pivoting=None,
    ):
        self.n_intervals = n_intervals
        self.overlap_frac = overlap_frac
        self.algorithm = algorithm
        self.kind = kind
        self.leaf_capacity = leaf_capacity
        self.leaf_radius = leaf_radius
        self.pivoting = pivoting

    def _get_cubical_cover(self):
        """
        Build the cubical cover selected by the `algorithm` parameter.

        :return: A new, unfitted cover configured with the parameters of
            this object.
        :rtype: ProximityCubicalCover or StandardCubicalCover
        :raises ValueError: If `algorithm` is neither 'standard' nor
            'proximity'.
        """
        shared_params = {
            'n_intervals': self.n_intervals,
            'overlap_frac': self.overlap_frac,
            'kind': self.kind,
            'leaf_capacity': self.leaf_capacity,
            'leaf_radius': self.leaf_radius,
            'pivoting': self.pivoting,
        }
        if self.algorithm == 'proximity':
            cover_class = ProximityCubicalCover
        elif self.algorithm == 'standard':
            cover_class = StandardCubicalCover
        else:
            raise ValueError(
                "The only possible values for algorithm are 'standard' and "
                "'proximity'."
            )
        return cover_class(**shared_params)

    def fit(self, X):
        """
        Train internal parameters.

        This method delegates to the :func:`fit` method of the internal
        cubical cover used.

        :param X: A dataset of n points.
        :type X: array-like of shape (n, m) or list-like of length n
        :return: The object itself.
        :rtype: self
        """
        delegate = self._get_cubical_cover()
        self._cubical_cover = delegate
        delegate.fit(X)
        return self

    def search(self, x):
        """
        Return a list of neighbors for the query point.

        This method delegates to the `search` method of the internal cubical
        cover used.

        :param x: A query point for which we want to find neighbors.
        :type x: Any
        :return: The indices of the neighbors contained in the dataset.
        :rtype: list[int]
        """
        delegate = self._cubical_cover
        return delegate.search(x)

    def apply(self, X):
        """
        Covers the dataset using hypercubes.

        This method delegates to the `apply` method of the internal cubical
        cover used.

        :param X: A dataset of n points.
        :type X: array-like of shape (n, m) or list-like of length n
        :return: A generator of lists of ids.
        :rtype: generator of lists of ints
        """
        # A fresh delegate is built on every call, so that the current values
        # of the parameters are used.
        delegate = self._get_cubical_cover()
        self._cubical_cover = delegate
        return delegate.apply(X)