"""
Core tools for creating and analyzing Mapper graphs.
The Mapper algorithm is a method for exploring the shape and structure of
high-dimensional datasets, by constructing a graph representation called Mapper
graph. The algorithm has three main steps:
1. **Filtering**: Apply a lens function (also called filter) to map the data
points to a lower-dimensional space, such as a scalar value or a 2D plane.
2. **Covering**: Arrange the lens space into overlapping open sets, using a
cover algorithm such as uniform intervals or balls.
3. **Clustering**: Group the data points in each open set into clusters, using
a clustering algorithm such as single-linkage or DBSCAN.
The Mapper graph consists of nodes that represent clusters of data
points, and edges that connect overlapping clusters (clusters obtained from
different open sets can possibly overlap). For more details on the Mapper
algorithm and its applications, see
Gurjeet Singh, Facundo Mémoli and Gunnar Carlsson, "Topological Methods for
the Analysis of High Dimensional Data Sets and 3D Object Recognition",
Eurographics Symposium on Point-Based Graphics, 2007.
This module provides the class :class:`tdamapper.core.MapperAlgorithm`, which
encapsulates the algorithm and its parameters. The Mapper graph produced by
this module is a NetworkX graph object.
"""
from __future__ import annotations
import logging
from typing import Any, Callable, Generic, Iterator, Optional, TypeVar
import networkx as nx
from joblib import Parallel, delayed
from tdamapper._common import ParamsMixin, clone
from tdamapper.protocols import ArrayRead, Clustering, Cover, SpatialSearch
from tdamapper.utils.unionfind import UnionFind
ATTR_IDS = "ids"
ATTR_SIZE = "size"
_logger = logging.getLogger(__name__)
logging.basicConfig(
format="%(asctime)s %(module)s %(levelname)s: %(message)s",
datefmt="%m/%d/%Y %I:%M:%S %p",
level=logging.INFO,
handlers=[logging.StreamHandler()],
)
S = TypeVar("S")
T = TypeVar("T")
[docs]
def mapper_labels(
X: ArrayRead[S],
y: ArrayRead[T],
cover: Cover[T],
clustering: Clustering[S],
n_jobs: int = 1,
) -> list[list[int]]:
"""
Identify the nodes of the Mapper graph.
The function first covers the lens space with overlapping sets, using the
cover algorithm provided. Then, for each set, it clusters the points of the
dataset that have lens values within that set, using the clustering
algorithm provided. The clusters are then labeled with unique integers,
starting from zero for each set. The function then adds an offset to the
cluster labels, such that the labels are distinct across all sets. The
offset is equal to the maximum label of the previous set plus one.
The function returns a list of node labels for each point in the dataset.
The list at position i contains the labels of the nodes that the point at
position i belongs to. The labels are sorted in ascending order, and there
are no duplicates. If i < j, the labels at position i are strictly less
than those at position j.
:param X: A dataset of n points.
:param y: Lens values for the n points of the dataset.
:param cover: A cover algorithm.
:param clustering: A clustering algorithm.
:param n_jobs: The maximum number of parallel clustering jobs. This
parameter is passed to the constructor of :class:`joblib.Parallel`.
:return: A list of node labels for each point in the dataset.
"""
def _run_clustering(
local_ids: list[int], X_local: ArrayRead[S], clust: Clustering[S]
) -> tuple[list[int], list[int]]:
local_lbls = clust.fit(X_local).labels_
return local_ids, local_lbls
_lbls = Parallel(n_jobs, prefer="threads")(
delayed(_run_clustering)(
local_ids, [X[j] for j in local_ids], clone(clustering)
)
for local_ids in cover.apply(y)
)
itm_lbls: list[list[int]] = [[] for _ in X]
max_lbl = 0
for local_ids, local_lbls in _lbls:
max_local_lbl = 0
for local_id, local_lbl in zip(local_ids, local_lbls):
if local_lbl >= 0:
itm_lbls[local_id].append(max_lbl + local_lbl)
if local_lbl > max_local_lbl:
max_local_lbl = local_lbl
max_lbl += max_local_lbl + 1
return itm_lbls
[docs]
def mapper_connected_components(
X: ArrayRead[S],
y: ArrayRead[T],
cover: Cover[T],
clustering: Clustering[S],
n_jobs: int = 1,
) -> list[int]:
"""
Identify the connected components of the Mapper graph.
A connected component is a maximal set of nodes that are reachable from
each other by following the edges. This function assigns a unique integer
label to each point in the dataset, based on the connected component of
the Mapper graph that it belongs to.
This function uses a union-find data structure to efficiently keep track of
the connected components as it scans the points of the dataset. This
approach should be faster than computing the Mapper graph by first calling
:func:`tdamapper.core.mapper_graph` and then calling
:func:`networkx.connected_components` on it.
:param X: A dataset of n points.
:param y: Lens values for the n points of the dataset.
:param cover: A cover algorithm.
:param clustering: The clustering algorithm to apply to each subset of the
dataset.
:param n_jobs: The maximum number of parallel clustering jobs. This
parameter is passed to the constructor of :class:`joblib.Parallel`.
:return: A list of labels. The label at position i identifies the connected
component of the point at position i in the dataset.
"""
itm_lbls = mapper_labels(X, y, cover, clustering, n_jobs=n_jobs)
label_values = set()
for lbls in itm_lbls:
label_values.update(lbls)
uf = UnionFind(label_values)
for lbls in itm_lbls:
if len(lbls) > 1:
for first, second in zip(lbls, lbls[1:]):
uf.union(first, second)
labels = [-1 for _ in X]
for i, lbls in enumerate(itm_lbls):
# assign -1 to noise points
root = uf.find(lbls[0]) if lbls else -1
labels[i] = root
return labels
[docs]
def mapper_graph(
X: ArrayRead[S],
y: ArrayRead[T],
cover: Cover[T],
clustering: Clustering[S],
n_jobs: int = 1,
) -> nx.Graph:
"""
Create the Mapper graph.
This function first identifies the unique cluster labels that each point of
the dataset belongs to. These labels are used to identify the nodes of the
Mapper graph. Then the edges of the Mapper graph are created by checking if
any two nodes share some points in their corresponding clusters.
This function return the Mapper graph as an object of type
:class:`networkx.Graph`. Each node has an attribute 'size' that stores the
number of points contained in its corresponding cluster, and an attribute
'ids' that stores the indices of the points in the dataset that are
contained in the cluster.
:param X: A dataset of n points.
:param y: Lens values for the n points of the dataset.
:param cover: A cover algorithm.
:param clustering: The clustering algorithm to apply to each subset of the
dataset.
:param n_jobs: The maximum number of parallel clustering jobs. This
parameter is passed to the constructor of :class:`joblib.Parallel`.
:return: The Mapper graph.
"""
itm_lbls = mapper_labels(X, y, cover, clustering, n_jobs=n_jobs)
graph: nx.Graph = nx.Graph()
for n, lbls in enumerate(itm_lbls):
for lbl in lbls:
if not graph.has_node(lbl):
graph.add_node(lbl, **{ATTR_SIZE: 0, ATTR_IDS: []})
graph.nodes[lbl][ATTR_SIZE] += 1
graph.nodes[lbl][ATTR_IDS].append(n)
for lbls in itm_lbls:
lbls_len = len(lbls)
for i in range(lbls_len):
source_lbl = lbls[i]
for j in range(i + 1, lbls_len):
target_lbl = lbls[j]
if target_lbl not in graph[source_lbl]:
graph.add_edge(source_lbl, target_lbl)
return graph
[docs]
def aggregate_graph(
X: ArrayRead[S], graph: nx.Graph, agg: Callable[..., Any]
) -> dict[int, Any]:
"""
Apply an aggregation function to the nodes of a graph.
This function takes a dataset and a graph, and computes an aggregation
value for each node of the graph, based on the data points that are
associated with that node. The aggregation function can be any callable
that takes a list of values and returns a single value, such as `sum`,
`mean`, `max`, `min`, etc.
The function returns a dictionary that maps each node of the graph to its
aggregation value. The keys of the dictionary are the nodes of the graph,
and the values are the aggregation values.
:param X: A dataset of n points.
:param graph: The graph to apply the aggregation function to.
:param agg: The aggregation function to use.
:return: A dictionary of node-aggregation pairs.
"""
agg_values = {}
for node_id in graph.nodes:
node_values = [X[i] for i in graph.nodes[node_id][ATTR_IDS]]
agg_value = agg(node_values)
agg_values[node_id] = agg_value
return agg_values
[docs]
def proximity_net(search: SpatialSearch[S], X: ArrayRead[S]) -> Iterator[list[int]]:
"""
Covers the dataset using proximity-net.
This function applies an iterative algorithm to create the
proximity-net. It picks an arbitrary point and forms an open cover
calling the proximity function on the chosen point. The points
contained in the open cover are then marked as covered, and discarded
in the following steps. The procedure is repeated on the leftover
points until every point is eventually covered.
This function returns a generator that yields each element of the
proximity-net as a list of ids. The ids are the indices of the points
in the original dataset.
:param X: A dataset of n points.
:return: A generator of lists of ids.
"""
covered_ids = set()
search.fit(X)
for i, xi in enumerate(X):
if i not in covered_ids:
neigh_ids = search.search(xi)
covered_ids.update(neigh_ids)
if neigh_ids:
yield neigh_ids
[docs]
class TrivialCover(ParamsMixin, Generic[T]):
"""
Cover algorithm that covers data with a single subset containing the whole
dataset.
This class creates a single open set that contains all the points in the
dataset.
"""
[docs]
def fit(self, X: ArrayRead[T]) -> TrivialCover[T]:
"""
Fit the cover algorithm to the data.
:param X: A dataset of n points. Ignored.
:return: self
"""
return self
[docs]
def apply(self, X: ArrayRead[T]) -> Iterator[list[int]]:
"""
Covers the dataset with a single open set.
:param X: A dataset of n points.
:return: A generator of lists of ids.
"""
if len(X) > 0:
yield list(range(0, len(X)))
[docs]
class FailSafeClustering(ParamsMixin, Generic[T]):
"""
A delegating clustering algorithm that prevents failure.
This class wraps a clustering algorithm and handles any exceptions that may
occur during the fitting process. If the clustering algorithm fails,
instead of throwing an exception, a single cluster containing all points is
returned. This can be useful for robustness and debugging purposes.
:param clustering: A clustering algorithm to delegate to.
:param verbose: A flag to log clustering exceptions. Set to True to
enable logging, or False to suppress it.
"""
_clustering: Optional[Clustering[T]]
_verbose: bool
labels_: list[int]
def __init__(
self, clustering: Optional[Clustering[T]] = None, verbose: bool = True
) -> None:
self.clustering = clustering
self.verbose = verbose
[docs]
def fit(
self, X: ArrayRead[T], y: Optional[ArrayRead[T]] = None
) -> FailSafeClustering[T]:
"""
Fit the clustering algorithm to the data.
If the clustering algorithm fails, a single cluster containing all
points is returned.
:param X: A dataset of n points.
:param y: Ignored.
:return: self
"""
self._clustering = (
TrivialClustering() if self.clustering is None else self.clustering
)
self._verbose = self.verbose
try:
self._clustering.fit(X, y)
self.labels_ = self._clustering.labels_
except ValueError as err:
if self._verbose:
_logger.warning("Unable to perform clustering on local chart: %s", err)
self.labels_ = [0 for _ in X]
return self
[docs]
class TrivialClustering(ParamsMixin, Generic[T]):
"""
A clustering algorithm that returns a single cluster.
This class implements a trivial clustering algorithm that assigns all data
points to the same cluster. It can be used as an argument of the class
:class:`tdamapper.core.MapperAlgorithm` to skip clustering in the
construction of the Mapper graph.
"""
labels_: list[int]
def __init__(self) -> None:
pass
[docs]
def fit(
self, X: ArrayRead[T], y: Optional[ArrayRead[T]] = None
) -> TrivialClustering[T]:
"""
Fit the clustering algorithm to the data.
:param X: A dataset of n points.
:param y: Ignored.
:return: self
"""
self.labels_ = [0 for _ in X]
return self