"""
Core tools for creating and analyzing Mapper graphs.
The Mapper algorithm is a method for exploring the shape and structure of
high-dimensional datasets, by constructing a graph representation called Mapper
graph. The algorithm has three main steps:
1. **Filtering**: Apply a lens function (also called filter) to map the data
points to a lower-dimensional space, such as a scalar value or a 2D plane.
2. **Covering**: Arrange the lens space into overlapping open sets, using a
cover algorithm such as uniform intervals or balls.
3. **Clustering**: Group the data points in each open set into clusters, using
a clustering algorithm such as single-linkage or DBSCAN.
The Mapper graph consists of nodes that represent clusters of data
points, and edges that connect overlapping clusters (clusters obtained from
different open sets can possibly overlap). For more details on the Mapper
algorithm and its applications, see
Gurjeet Singh, Facundo Mémoli and Gunnar Carlsson, "Topological Methods for
the Analysis of High Dimensional Data Sets and 3D Object Recognition",
Eurographics Symposium on Point-Based Graphics, 2007.
This module provides the class :class:`tdamapper.core.MapperAlgorithm`, which
encapsulates the algorithm and its parameters. The Mapper graph produced by
this module is a NetworkX graph object.
"""
import logging
import networkx as nx
from joblib import Parallel, delayed
from tdamapper._common import EstimatorMixin, ParamsMixin, clone, deprecated
from tdamapper.utils.unionfind import UnionFind
ATTR_IDS = "ids"
ATTR_SIZE = "size"
_logger = logging.getLogger(__name__)
logging.basicConfig(
format="%(asctime)s %(module)s %(levelname)s: %(message)s",
datefmt="%m/%d/%Y %I:%M:%S %p",
level=logging.INFO,
handlers=[logging.StreamHandler()],
)
[docs]
def mapper_labels(X, y, cover, clustering, n_jobs=1):
"""
Identify the nodes of the Mapper graph.
The function first covers the lens space with overlapping sets, using the
cover algorithm provided. Then, for each set, it clusters the points of the
dataset that have lens values within that set, using the clustering
algorithm provided. The clusters are then labeled with unique integers,
starting from zero for each set. The function then adds an offset to the
cluster labels, such that the labels are distinct across all sets. The
offset is equal to the maximum label of the previous set plus one.
The function returns a list of node labels for each point in the dataset.
The list at position i contains the labels of the nodes that the point at
position i belongs to. The labels are sorted in ascending order, and there
are no duplicates. If i < j, the labels at position i are strictly less
than those at position j.
:param X: A dataset of n points.
:type X: array-like of shape (n, m) or list-like of length n
:param y: Lens values for the n points of the dataset.
:type y: array-like of shape (n, k) or list-like of length n
:param cover: A cover algorithm.
:type cover: A class compatible with :class:`tdamapper.core.Cover`
:param clustering: A clustering algorithm.
:type clustering: An estimator compatible with scikit-learn's clustering
interface, typically from :mod:`sklearn.cluster`.
:param n_jobs: The maximum number of parallel clustering jobs. This
parameter is passed to the constructor of :class:`joblib.Parallel`.
Defaults to 1.
:type n_jobs: int
:return: A list of node labels for each point in the dataset.
:rtype: list[list[int]]
"""
def _run_clustering(local_ids, X_local, clust):
local_lbls = clust.fit(X_local).labels_
return local_ids, local_lbls
_lbls = Parallel(n_jobs, prefer="threads")(
delayed(_run_clustering)(
local_ids, [X[j] for j in local_ids], clone(clustering)
)
for local_ids in cover.apply(y)
)
itm_lbls = [[] for _ in X]
max_lbl = 0
for local_ids, local_lbls in _lbls:
max_local_lbl = 0
for local_id, local_lbl in zip(local_ids, local_lbls):
if local_lbl >= 0:
itm_lbls[local_id].append(max_lbl + local_lbl)
if local_lbl > max_local_lbl:
max_local_lbl = local_lbl
max_lbl += max_local_lbl + 1
return itm_lbls
[docs]
def mapper_connected_components(X, y, cover, clustering, n_jobs=1):
"""
Identify the connected components of the Mapper graph.
A connected component is a maximal set of nodes that are reachable from
each other by following the edges. This function assigns a unique integer
label to each point in the dataset, based on the connected component of
the Mapper graph that it belongs to.
This function uses a union-find data structure to efficiently keep track of
the connected components as it scans the points of the dataset. This
approach should be faster than computing the Mapper graph by first calling
:func:`tdamapper.core.mapper_graph` and then calling
:func:`networkx.connected_components` on it.
:param X: A dataset of n points.
:type X: array-like of shape (n, m) or list-like of length n
:param y: Lens values for the n points of the dataset.
:type y: array-like of shape (n, k) or list-like of length n
:param cover: A cover algorithm.
:type cover: A class compatible with :class:`tdamapper.core.Cover`
:param clustering: The clustering algorithm to apply to each subset of the
dataset.
:type clustering: An estimator compatible with scikit-learn's clustering
interface, typically from :mod:`sklearn.cluster`.
:param n_jobs: The maximum number of parallel clustering jobs. This
parameter is passed to the constructor of :class:`joblib.Parallel`.
Defaults to 1.
:type n_jobs: int
:return: A list of labels. The label at position i identifies the connected
component of the point at position i in the dataset.
:rtype: list[int]
"""
itm_lbls = mapper_labels(X, y, cover, clustering, n_jobs=n_jobs)
label_values = set()
for lbls in itm_lbls:
label_values.update(lbls)
uf = UnionFind(label_values)
for lbls in itm_lbls:
if len(lbls) > 1:
for first, second in zip(lbls, lbls[1:]):
uf.union(first, second)
labels = [-1 for _ in X]
for i, lbls in enumerate(itm_lbls):
# assign -1 to noise points
root = uf.find(lbls[0]) if lbls else -1
labels[i] = root
return labels
[docs]
def mapper_graph(X, y, cover, clustering, n_jobs=1):
"""
Create the Mapper graph.
This function first identifies the unique cluster labels that each point of
the dataset belongs to. These labels are used to identify the nodes of the
Mapper graph. Then the edges of the Mapper graph are created by checking if
any two nodes share some points in their corresponding clusters.
This function return the Mapper graph as an object of type
:class:`networkx.Graph`. Each node has an attribute 'size' that stores the
number of points contained in its corresponding cluster, and an attribute
'ids' that stores the indices of the points in the dataset that are
contained in the cluster.
:param X: A dataset of n points.
:type X: array-like of shape (n, m) or list-like of length n
:param y: Lens values for the n points of the dataset.
:type y: array-like of shape (n, k) or list-like of length n
:param cover: A cover algorithm.
:type cover: A class compatible with :class:`tdamapper.core.Cover`
:param clustering: The clustering algorithm to apply to each subset of the
dataset.
:type clustering: An estimator compatible with scikit-learn's clustering
interface, typically from :mod:`sklearn.cluster`.
:param n_jobs: The maximum number of parallel clustering jobs. This
parameter is passed to the constructor of :class:`joblib.Parallel`.
Defaults to 1.
:type n_jobs: int
:return: The Mapper graph.
:rtype: :class:`networkx.Graph`
"""
itm_lbls = mapper_labels(X, y, cover, clustering, n_jobs=n_jobs)
graph = nx.Graph()
for n, lbls in enumerate(itm_lbls):
for lbl in lbls:
if not graph.has_node(lbl):
graph.add_node(lbl, **{ATTR_SIZE: 0, ATTR_IDS: []})
nodes = graph.nodes()
nodes[lbl][ATTR_SIZE] += 1
nodes[lbl][ATTR_IDS].append(n)
for lbls in itm_lbls:
lbls_len = len(lbls)
for i in range(lbls_len):
source_lbl = lbls[i]
for j in range(i + 1, lbls_len):
target_lbl = lbls[j]
if target_lbl not in graph[source_lbl]:
graph.add_edge(source_lbl, target_lbl)
return graph
[docs]
def aggregate_graph(X, graph, agg):
"""
Apply an aggregation function to the nodes of a graph.
This function takes a dataset and a graph, and computes an aggregation
value for each node of the graph, based on the data points that are
associated with that node. The aggregation function can be any callable
that takes a list of values and returns a single value, such as `sum`,
`mean`, `max`, `min`, etc.
The function returns a dictionary that maps each node of the graph to its
aggregation value. The keys of the dictionary are the nodes of the graph,
and the values are the aggregation values.
:param X: A dataset of n points.
:type X: array-like of shape (n, m) or list-like of length n
:param graph: The graph to apply the aggregation function to.
:type graph: :class:`networkx.Graph`.
:param agg: The aggregation function to use.
:type agg: Callable.
:return: A dictionary of node-aggregation pairs.
:rtype: dict
"""
agg_values = {}
nodes = graph.nodes()
for node_id in nodes:
node_values = [X[i] for i in nodes[node_id][ATTR_IDS]]
agg_value = agg(node_values)
agg_values[node_id] = agg_value
return agg_values
[docs]
class Cover(ParamsMixin):
"""
Abstract interface for cover algorithms.
This is a naive implementation. Subclasses should override the methods of
this class to implement more meaningful cover algorithms.
"""
[docs]
def apply(self, X):
"""
Covers the dataset with a single open set.
This is a naive implementation that returns a generator producing a
single list containing all the ids if the original dataset. This
method should be overridden by subclasses to implement more meaningful
cover algorithms.
:param X: A dataset of n points.
:type X: array-like of shape (n, m) or list-like of length n
:return: A generator of lists of ids.
:rtype: generator of lists of ints
"""
yield list(range(0, len(X)))
[docs]
class Proximity(Cover):
"""
Abstract interface for proximity functions. A proximity function is a
function that maps each point into a subset of the dataset that contains
the point itself. Every proximity function defines also a covering
algorithm based on proximity-netm that is implemented in this class.
Proximity functions, implemented as subclasses of this class, are a
convenient way to implement open cover algorithms by using the
proximity-net construction. Proximity-net is implemented by function
:func:`tdamapper.core.Proximity.apply`.
Subclasses should override the methods :func:`tdamapper.core.Proximity.fit`
and :func:`tdamapper.core.Proximity.search` of this class to implement
more meaningful proximity functions.
"""
[docs]
def fit(self, X):
"""
Train internal parameters.
This is a naive implementation that should be overridden by subclasses
to implement more meaningful proximity functions.
:param X: A dataset of n points.
:type X: array-like of shape (n, m) or list-like of length n
:return: The object itself.
:rtype: self
"""
self.__X = X
return self
[docs]
def search(self, x):
"""
Return a list of neighbors for the query point.
This is a naive implementation that returns all the points in the
dataset as neighbors. This method should be overridden by subclasses
to implement more meaningful proximity functions.
:param x: A query point for which we want to find neighbors.
:type x: Any
:return: A list containing all the indices of the points in the
dataset.
:rtype: list[int]
"""
return list(range(0, len(self.__X)))
[docs]
def apply(self, X):
"""
Covers the dataset using proximity-net.
This function applies an iterative algorithm to create the
proximity-net. It picks an arbitrary point and forms an open cover
calling the proximity function on the chosen point. The points
contained in the open cover are then marked as covered, and discarded
in the following steps. The procedure is repeated on the leftover
points until every point is eventually covered.
This function returns a generator that yields each element of the
proximity-net as a list of ids. The ids are the indices of the points
in the original dataset.
:param X: A dataset of n points.
:type X: array-like of shape (n, m) or list-like of length n
:return: A generator of lists of ids.
:rtype: generator of lists of ints
"""
covered_ids = set()
self.fit(X)
for i, xi in enumerate(X):
if i not in covered_ids:
neigh_ids = self.search(xi)
covered_ids.update(neigh_ids)
if neigh_ids:
yield neigh_ids
[docs]
class TrivialCover(Cover):
"""
Cover algorithm that covers data with a single subset containing the whole
dataset.
This class creates a single open set that contains all the points in the
dataset.
"""
[docs]
def apply(self, X):
"""
Covers the dataset with a single open set.
:param X: A dataset of n points.
:type X: array-like of shape (n, m) or list-like of length n
:return: A generator of lists of ids.
:rtype: generator of lists of ints
"""
yield list(range(0, len(X)))
class _MapperAlgorithm(EstimatorMixin, ParamsMixin):
def __init__(
self,
cover=None,
clustering=None,
failsafe=True,
verbose=True,
n_jobs=1,
):
self.cover = cover
self.clustering = clustering
self.failsafe = failsafe
self.verbose = verbose
self.n_jobs = n_jobs
def fit(self, X, y=None):
X, y = self._validate_X_y(X, y)
self.__cover = TrivialCover() if self.cover is None else self.cover
self.__clustering = (
TrivialClustering() if self.clustering is None else self.clustering
)
self.__verbose = self.verbose
self.__failsafe = self.failsafe
if self.__failsafe:
self.__clustering = FailSafeClustering(
clustering=self.__clustering,
verbose=self.__verbose,
)
self.__cover = clone(self.__cover)
self.__clustering = clone(self.__clustering)
self.__n_jobs = self.n_jobs
y = X if y is None else y
self.graph_ = mapper_graph(
X,
y,
self.__cover,
self.__clustering,
n_jobs=self.__n_jobs,
)
self._set_n_features_in(X)
return self
def fit_transform(self, X, y):
self.fit(X, y)
return self.graph_
[docs]
class MapperAlgorithm(_MapperAlgorithm):
"""
**DEPRECATED**: This class is deprecated and will be removed in a future
release. Use :class:`tdamapper.learn.MapperAlgorithm`.
"""
@deprecated(
"This class is deprecated and will be removed in a future release. "
"Use tdamapper.learn.MapperAlgorithm."
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
[docs]
class FailSafeClustering(ParamsMixin):
"""
A delegating clustering algorithm that prevents failure.
This class wraps a clustering algorithm and handles any exceptions that may
occur during the fitting process. If the clustering algorithm fails,
instead of throwing an exception, a single cluster containing all points is
returned. This can be useful for robustness and debugging purposes.
:param clustering: A clustering algorithm to delegate to.
:type clustering: An estimator compatible with scikit-learn's clustering
interface, typically from :mod:`sklearn.cluster`.
:param verbose: A flag to log clustering exceptions. Set to True to
enable logging, or False to suppress it. Defaults to True.
:type verbose: bool, optional.
"""
def __init__(self, clustering=None, verbose=True):
self.clustering = clustering
self.verbose = verbose
[docs]
def fit(self, X, y=None):
self.__clustering = (
TrivialClustering() if self.clustering is None else self.clustering
)
self.__verbose = self.verbose
self.labels_ = None
try:
self.__clustering.fit(X, y)
self.labels_ = self.__clustering.labels_
except ValueError as err:
if self.__verbose:
_logger.warning("Unable to perform clustering on local chart: %s", err)
self.labels_ = [0 for _ in X]
return self
[docs]
class TrivialClustering(ParamsMixin):
"""
A clustering algorithm that returns a single cluster.
This class implements a trivial clustering algorithm that assigns all data
points to the same cluster. It can be used as an argument of the class
:class:`tdamapper.core.MapperAlgorithm` to skip clustering in the
construction of the Mapper graph.
"""
def __init__(self):
pass
[docs]
def fit(self, X, y=None):
"""
Fit the clustering algorithm to the data.
:param X: A dataset of n points.
:type X: array-like of shape (n, m) or list-like of length n
:param y: Ignored.
:return: self
"""
self.labels_ = [0 for _ in X]
return self