"""Graph-based wrapper around CoreSG-HDBSCAN."""
from .core import CoreSGHDBSCAN
import importlib
import networkx as nx
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import hdbscan
from scipy.spatial.distance import cdist
from scipy.spatial import distance
import scipy.sparse as sp
from scipy.sparse import csr_matrix, triu
from scipy.sparse.csgraph import minimum_spanning_tree
from sklearn.cluster import HDBSCAN
from sklearn.metrics import pairwise_distances
from sklearn.metrics.pairwise import rbf_kernel
from sklearn.neighbors import NearestNeighbors as NN, kneighbors_graph
import heapq
from collections.abc import Iterable
try:
from numba import njit, prange
_HAS_NUMBA = True
except Exception:
njit = None
prange = range
_HAS_NUMBA = False
def _optional_import(module_name, package_name=None):
try:
return importlib.import_module(module_name)
except Exception as e:
pkg = package_name or module_name
raise ImportError(f"Optional dependency '{pkg}' is required for this functionality. Please install it.") from e
def _get_scanpy_modules():
sc = _optional_import("scanpy")
sce = _optional_import("scanpy.external")
sc_neighbors_connectivity = _optional_import("scanpy.neighbors._connectivity")
sc_neighbors_common = _optional_import("scanpy.neighbors._common")
return (
sc,
sce,
sc_neighbors_connectivity.gauss,
sc_neighbors_connectivity.umap,
sc_neighbors_common._get_indices_distances_from_dense_matrix,
)
if _HAS_NUMBA:
@njit(parallel=True, cache=True)
def _directed_jaccard_weights_numba(knn_idx, sorted_knn_idx):
n, k = knn_idx.shape
weights = np.zeros((n, k), dtype=np.float64)
for i in prange(n):
ni = sorted_knn_idx[i]
for t in range(k):
j = knn_idx[i, t]
nj = sorted_knn_idx[j]
a = 0
b = 0
shared = 0
while a < k and b < k:
va = ni[a]
vb = nj[b]
if va == vb:
shared += 1
a += 1
b += 1
elif va < vb:
a += 1
else:
b += 1
if shared > 0:
weights[i, t] = shared / ((2.0 * k) - shared)
return weights
[docs]
class GraphCoreSGHDBSCAN(CoreSGHDBSCAN):
"""
Graph-based CoreSG + HDBSCAN interface.
This class constructs a similarity graph from feature data or accepts a
precomputed graph representation, transforms that graph into a
graph-derived distance representation, and then runs the CoreSG-HDBSCAN
clustering pipeline.
Parameters
----------
min_samples : int or iterable of int, default=10
Main clustering hyperparameter. A single integer gives one fitted
solution, while an iterable allows fitting multiple values in one run.
sim_graph_method : {"sc_umap", "sc_gauss", "jaccard_phenograph", "precomputed"}, default="sc_umap"
Graph-construction backend.
metric : str, callable, or None, default="euclidean"
Distance metric used during similarity graph construction.
Supported string metrics are "cityblock", "cosine", "euclidean",
"l1", "l2", "manhattan", "braycurtis", "canberra",
"chebyshev", "correlation", "dice", "hamming", "jaccard",
"mahalanobis", "minkowski", "rogerstanimoto",
"russellrao", "seuclidean", "sokalmichener", "sokalsneath",
"sqeuclidean", "yule", and the package-specific
"hybrid_euclidean_cosine".
The metric "kulsinski" is not supported. The combination
metric="yule" with sim_graph_method="sc_gauss" is also not
supported because it can produce non-finite graph weights.
metric_kwds : dict or None, default=None
Additional keyword arguments passed to the selected distance metric.
add_neighbor : bool, default=True
Controls how weighted structural similarity is expanded into graph edges.
no_noise : bool, default=True
If True, points initially labeled as noise are reassigned after
clustering.
n_neighbors : int, default=15
Number of neighbors used during graph construction.
heuristic_connect : bool, default=False
If True, increase ``n_neighbors`` until the WSS dissimilarity graph becomes connected,
except in precomputed mode, where bridge edges are used instead.
If False, disconnected components are connected with synthetic bridge
edges.
min_cluster_size : int or None, default=None
Minimum cluster size used in the clustering stage. If None, the package
follows the selected ``min_samples`` value for each run.
save_models : bool, default=False
If True, save hdbscan models for different min_samples which can add some memory overhead.
If False, just save labels and condensed trees for each min_samples.
**kwargs
Additional keyword arguments passed to internal graph-construction
helpers.
Attributes
----------
similarity_graph_ : networkx.Graph
Initial similarity graph.
similarity_graph_WSS : networkx.Graph
Weighted structural similarity graph.
dissimilarity_graph_ : networkx.Graph
Graph after conversion from similarity to dissimilarity.
connected_graph_ : networkx.Graph
Final connected graph used by the clustering stage.
dist_matrix_ : numpy.ndarray
Dense matrix used by the CoreSG-HDBSCAN pipeline.
coresg_ : CoreSGHDBSCAN
Internal fitted CoreSG-HDBSCAN object.
models_ : dict
Dictionary of saved per-``min_samples`` models. Populated only when
``save_models=True``.
condensed_trees_ : dict
Dictionary of condensed tree objects keyed by fitted
``min_samples`` value.
labels_by_m_ : dict
Dictionary of stored labels keyed by fitted ``min_samples`` value.
"""
[docs]
def __init__(
self,
min_samples=10,
sim_graph_method='sc_umap',
metric='euclidean',
metric_kwds=None,
add_neighbor=True,
no_noise=True,
n_neighbors=15,
heuristic_connect=False,
min_cluster_size=None,
save_models=False,
similarity_backend="auto",
**kwargs,
):
# store graph params
valid_graph_methods = {'sc_gauss', 'sc_umap', 'jaccard_phenograph', 'precomputed'}
if sim_graph_method not in valid_graph_methods:
raise ValueError(
f"Unsupported sim_graph_method '{sim_graph_method}'. "
f"Use one of {sorted(valid_graph_methods)}."
)
if metric is None:
metric = 'euclidean'
valid_metrics = {
'cityblock',
'cosine',
'euclidean',
'l1',
'l2',
'manhattan',
'braycurtis',
'canberra',
'chebyshev',
'correlation',
'dice',
'hamming',
'jaccard',
'mahalanobis',
'minkowski',
'rogerstanimoto',
'russellrao',
'seuclidean',
'sokalmichener',
'sokalsneath',
'sqeuclidean',
'yule',
'hybrid_euclidean_cosine',
}
if not isinstance(metric, str) and not callable(metric):
raise TypeError(
"metric must be a string metric name, a callable distance function, or None."
)
if isinstance(metric, str) and metric not in valid_metrics:
raise ValueError(
f"Unsupported metric '{metric}'. "
f"Use one of {sorted(valid_metrics)}, or pass a callable metric."
)
if sim_graph_method == 'sc_gauss' and metric == 'yule':
raise ValueError(
"metric='yule' is not supported with sim_graph_method='sc_gauss' "
"because this combination can produce non-finite graph weights. "
"Use sim_graph_method='sc_umap' or sim_graph_method='jaccard_phenograph' "
"with metric='yule', or choose a different metric with sim_graph_method='sc_gauss'."
)
valid_similarity_backends = {"auto", "default", "numba"}
if similarity_backend not in valid_similarity_backends:
raise ValueError(
"similarity_backend must be one of "
f"{sorted(valid_similarity_backends)}, got {similarity_backend!r}."
)
self.similarity_backend = similarity_backend
self.sim_graph_method = sim_graph_method
self.metric = metric
self.metric_kwds = {} if metric_kwds is None else dict(metric_kwds)
self.add_neighbor = add_neighbor
self.no_noise = no_noise
self.n_neighbors = n_neighbors
if 'mst_approx' in kwargs:
heuristic_connect = kwargs.pop('mst_approx')
self.heuristic_connect = bool(heuristic_connect)
self.save_models = bool(save_models)
self.models_ = {}
self.condensed_trees_ = {}
self.labels_by_m_ = {}
# Backward-compatible handling of removed parameters.
kwargs.pop('force_connected', None)
kwargs.pop('gamma', None)
kwargs.pop('min_dist', None)
# ``m_list`` is now internal rather than a public hyperparameter.
# Keep a legacy escape hatch through kwargs only.
legacy_m_list = kwargs.pop('m_list', None)
if legacy_m_list is not None:
resolved_m_list = list(legacy_m_list)
elif isinstance(min_samples, Iterable) and not isinstance(min_samples, (str, bytes, np.str_)):
resolved_m_list = list(min_samples)
else:
resolved_m_list = [int(min_samples)]
if len(resolved_m_list) == 0:
raise ValueError("min_samples must define at least one value.")
self.m_list = [int(m) for m in resolved_m_list]
self.min_samples = list(self.m_list) if len(self.m_list) > 1 else int(self.m_list[0])
resolved_min_cluster_size = None if min_cluster_size is None else int(min_cluster_size)
core_metric = 'euclidean' if (metric == 'hybrid_euclidean_cosine' or callable(metric)) else metric
super().__init__(
min_samples_list=self.m_list,
metric=core_metric,
min_cluster_size=resolved_min_cluster_size,
save_models=self.save_models,
**kwargs
)
self.min_cluster_size = resolved_min_cluster_size
def __repr__(self):
fitted = hasattr(self, "coresg_") and self.coresg_ is not None
if fitted:
n_models = len(getattr(self.coresg_, "models_", {}))
n_trees = len(getattr(self.coresg_, "condensed_trees_", {}))
n_label_sets = len(getattr(self.coresg_, "labels_by_m_", {}))
else:
n_models = 0
n_trees = 0
n_label_sets = 0
return (
f"GraphCoreSGHDBSCAN("
f"min_samples={list(self.m_list)}, "
f"sim_graph_method={self.sim_graph_method!r}, "
f"metric={self.metric!r}, "
f"n_neighbors={self.n_neighbors}, "
f"min_cluster_size={self.min_cluster_size}, "
f"save_models={self.save_models}, "
f"fitted={fitted}, "
f"n_models={n_models}, "
f"n_condensed_trees={n_trees}, "
f"n_label_sets={n_label_sets}"
f")"
)
def _min_cluster_size_for(self, m):
m = int(m)
return m if self.min_cluster_size is None else int(self.min_cluster_size)
[docs]
def compute_similarity_sparse(self, graph) -> sp.csr_matrix:
"""Fast weighted structural similarity as a sparse matrix.
This is algebraically equivalent to the original ``compute_similarity``
implementation, but avoids Python-level all-pairs iteration. The
weighted adjacency vector for each node includes an explicit self-loop
of weight 1 before cosine normalization.
"""
n = graph.number_of_nodes()
if n == 0:
return sp.csr_matrix((0, 0))
u_list, v_list, w_list = [], [], []
for u, v, d in graph.edges(data=True):
u_list.append(u)
v_list.append(v)
w_list.append(d.get("weight", 1.0))
if u_list:
u = np.asarray(u_list, dtype=np.int32)
v = np.asarray(v_list, dtype=np.int32)
w = np.asarray(w_list, dtype=np.float64)
rows = np.concatenate([u, v, np.arange(n, dtype=np.int32)])
cols = np.concatenate([v, u, np.arange(n, dtype=np.int32)])
data = np.concatenate([w, w, np.ones(n, dtype=np.float64)])
else:
rows = np.arange(n, dtype=np.int32)
cols = np.arange(n, dtype=np.int32)
data = np.ones(n, dtype=np.float64)
A = sp.csr_matrix((data, (rows, cols)), shape=(n, n))
A.eliminate_zeros()
norms = np.sqrt(np.asarray(A.multiply(A).sum(axis=1)).ravel())
norms[norms == 0.0] = 1.0
inv = 1.0 / norms
if not self.add_neighbor:
A_norm = A.multiply(inv[:, None]).tocsr()
out_rows, out_cols, out_data = [], [], []
for u, v in graph.edges():
sim = float(A_norm[u].multiply(A_norm[v]).sum())
out_rows.extend((u, v))
out_cols.extend((v, u))
out_data.extend((sim, sim))
S = sp.csr_matrix((out_data, (out_rows, out_cols)), shape=(n, n))
S.eliminate_zeros()
return S
numerators = (A @ A.T).tocsr()
S = numerators.multiply(inv[:, None]).multiply(inv[None, :]).tocsr()
S.setdiag(0.0)
S.eliminate_zeros()
return S
[docs]
def compute_similarity(self, graph):
"""Backward-compatible NetworkX wrapper over the sparse implementation."""
S = self.compute_similarity_sparse(graph)
if self.add_neighbor:
S = triu(S, k=1).tocsr()
if S.nnz:
mask = S.data > 0.0
if not np.all(mask):
S = sp.csr_matrix((S.data[mask], S.indices[mask], S.indptr.copy()), shape=S.shape)
S.eliminate_zeros()
out = nx.from_scipy_sparse_array(S, edge_attribute='weight')
out.add_nodes_from(range(graph.number_of_nodes()))
return out
[docs]
@staticmethod
def similarity_to_dissimilarity_sparse(similarity_matrix: sp.csr_matrix) -> sp.csr_matrix:
D = similarity_matrix.copy().tocsr()
D.data = 1.0 - D.data
D.setdiag(0.0)
D.eliminate_zeros()
return D
[docs]
@staticmethod
def similarity_to_dissimilarity(similarity_graph):
dissimilarity_graph = nx.Graph()
for u, v, data in similarity_graph.edges(data=True):
dissimilarity_graph.add_edge(u, v, weight=1 - data['weight'])
return dissimilarity_graph
[docs]
@staticmethod
def is_graph_connected(graph):
return nx.is_connected(graph)
@staticmethod
def _coerce_precomputed_graph(graph_like):
"""Convert a supported precomputed graph representation into a NetworkX graph."""
if isinstance(graph_like, nx.Graph):
graph = nx.convert_node_labels_to_integers(
graph_like,
first_label=0,
ordering="default",
label_attribute="original_label",
)
elif hasattr(graph_like, 'tocoo'):
graph = nx.from_scipy_sparse_array(graph_like, edge_attribute='weight')
else:
arr = np.asarray(graph_like)
if arr.ndim != 2 or arr.shape[0] != arr.shape[1]:
raise ValueError(
"For sim_graph_method='precomputed', input must be a NetworkX graph, "
"a scipy sparse adjacency matrix, or a square dense adjacency matrix."
)
graph = nx.from_numpy_array(arr)
graph.remove_edges_from([(u, v) for u, v, d in graph.edges(data=True) if d.get('weight', 0) == 0])
return graph
def _fast_phenograph_jaccard_from_knn_graph(self, knn_graph):
"""
Fast replacement for PhenoGraph's Jaccard graph construction.
Input is the same sparse kNN graph that the old code passed to:
sce.tl.phenograph(knn_dist, directed=False, clustering_algo=None)
Output matches PhenoGraph's default undirected Jaccard graph.
"""
if not _HAS_NUMBA:
raise ImportError(
"Fast jaccard_phenograph requires numba. "
"Install it with `pip install numba`."
)
knn_graph = knn_graph.tocsr()
n = knn_graph.shape[0]
if n <= 1:
return sp.csr_matrix((n, n), dtype=np.float64)
# The old branch passes a kNN graph with exactly self.n_neighbors - 1
# neighbors per row.
k = int(self.n_neighbors) - 1
if k < 1:
raise ValueError("n_neighbors must be at least 2 for jaccard_phenograph.")
indptr = knn_graph.indptr
indices = knn_graph.indices
row_counts = np.diff(indptr)
if not np.all(row_counts == k):
raise ValueError(
"Expected the kNN graph to have exactly "
f"{k} neighbors per row, but got row counts from "
f"{row_counts.min()} to {row_counts.max()}."
)
knn_idx = indices.reshape(n, k).astype(np.int32, copy=False)
sorted_knn_idx = np.sort(knn_idx, axis=1).astype(np.int32, copy=False)
weights = _directed_jaccard_weights_numba(
knn_idx,
sorted_knn_idx,
)
rows = np.repeat(np.arange(n, dtype=np.int32), k)
cols = knn_idx.ravel()
data = weights.ravel()
mask = data > 0.0
directed = sp.csr_matrix(
(data[mask], (rows[mask], cols[mask])),
shape=(n, n),
dtype=np.float64,
)
directed.eliminate_zeros()
conn = (directed + directed.T).multiply(0.5)
conn = sp.tril(conn, k=-1).tocsr()
conn.eliminate_zeros()
return conn
[docs]
def create_similarity_graph(self, data):
if self.sim_graph_method == 'precomputed':
return self._coerce_precomputed_graph(data)
sc, sce, sc_gauss, sc_umap, _get_indices_distances_from_dense_matrix = _get_scanpy_modules()
X = data.toarray() if hasattr(data, "toarray") else np.asarray(data)
if X.ndim != 2:
raise ValueError("Input data must be a 2D array-like object.")
if self.metric == 'hybrid_euclidean_cosine':
distances_full = pairwise_distances(X, metric='euclidean')
knn_metric = 'cosine'
use_precomputed_knn = False
else:
distances_full = pairwise_distances(
X,
metric=self.metric,
**self.metric_kwds,
)
knn_metric = 'precomputed'
use_precomputed_knn = True
self.distances_full_ = distances_full
if self.sim_graph_method == 'jaccard_phenograph':
if use_precomputed_knn:
knn_dist = kneighbors_graph(
distances_full,
n_neighbors=self.n_neighbors - 1,
mode='distance',
metric='precomputed',
include_self=False,
)
else:
knn_dist = kneighbors_graph(
distances_full,
n_neighbors=self.n_neighbors - 1,
mode='distance',
metric='cosine',
include_self=False,
)
if self.similarity_backend == "numba":
conn = self._fast_phenograph_jaccard_from_knn_graph(knn_dist)
elif self.similarity_backend == "default":
_, conn, _ = sce.tl.phenograph(
knn_dist,
directed=False,
clustering_algo=None,
)
else: # similarity_backend == "auto"
if _HAS_NUMBA:
conn = self._fast_phenograph_jaccard_from_knn_graph(knn_dist)
else:
_, conn, _ = sce.tl.phenograph(
knn_dist,
directed=False,
clustering_algo=None,
)
return nx.from_scipy_sparse_array(conn.tocsr(), edge_attribute='weight')
if self.sim_graph_method == 'sc_gauss':
if use_precomputed_knn:
knn_dist = kneighbors_graph(
distances_full,
n_neighbors=self.n_neighbors - 1,
mode='distance',
metric='precomputed',
include_self=False,
)
else:
knn_dist = kneighbors_graph(
distances_full,
n_neighbors=self.n_neighbors - 1,
mode='distance',
metric='cosine',
include_self=False,
)
conn = sc_gauss(knn_dist, n_neighbors=self.n_neighbors, knn=True)
return nx.from_scipy_sparse_array(conn, edge_attribute='weight')
if self.sim_graph_method == 'sc_umap':
if use_precomputed_knn:
idx, dists = _get_indices_distances_from_dense_matrix(
distances_full, self.n_neighbors
)
else:
nn = NN(n_neighbors=self.n_neighbors, metric=knn_metric)
nn.fit(X)
dists, idx = nn.kneighbors(X, return_distance=True)
conn = sc_umap(
idx,
dists,
n_obs=distances_full.shape[0],
n_neighbors=self.n_neighbors,
)
return nx.from_scipy_sparse_array(conn, edge_attribute='weight')
raise ValueError(
"Unsupported sim_graph_method. Use one of 'sc_gauss', 'sc_umap', 'jaccard_phenograph', or 'precomputed'."
)
[docs]
def connect_graph_heuristically(self, graph, n_obs):
"""Connect disconnected components with synthetic bridge edges.
This function assumes `graph` is already a dissimilarity graph:
smaller weight = closer
larger weight = farther
It does not rebuild the similarity graph.
It only adds bridge edges of distance weight 1 between disconnected components.
"""
new_graph = graph.copy()
new_graph.add_nodes_from(range(n_obs))
if nx.is_connected(new_graph):
return new_graph
components = list(nx.connected_components(new_graph))
for i in range(len(components) - 1):
u = next(iter(components[i]))
v = next(iter(components[i + 1]))
# weight=1 means weakest / maximum-distance bridge
new_graph.add_edge(u, v, weight=1)
return new_graph
[docs]
@staticmethod
def compute_full_distance_matrix(graph):
"""
Compute the full dense matrix of shortest path distances using Floyd–Warshall.
"""
return np.array(nx.floyd_warshall_numpy(graph, weight='weight'))
[docs]
@staticmethod
def compute_sparse_distance_dict(graph):
"""
Compute a dictionary-of-dictionaries of shortest path distances.
For each node, run single_source_dijkstra_path_length and store the results.
"""
distance_dict = {}
for node in graph.nodes():
# Compute shortest path lengths from 'node' to all others.
distance_dict[node] = nx.single_source_dijkstra_path_length(graph, node, weight='weight')
return distance_dict
[docs]
def graph_metric(self, u, v):
"""
Custom distance metric that uses the precomputed sparse distance dictionary.
The data points are mapped to node indices using self._point_to_index.
"""
idx_u = self._point_to_index.get(tuple(u))
idx_v = self._point_to_index.get(tuple(v))
try:
return self.distance_dict_[idx_u][idx_v]
except KeyError:
# Since the graph is connected, this should rarely happen.
# Check the reverse ordering as a fallback.
return self.distance_dict_[idx_v][idx_u]
[docs]
@staticmethod
def compute_custom_distance_matrix(graph):
"""Compute the pairwise distance matrix used by the graph-based pipeline.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Input feature matrix.
Returns
-------
numpy.ndarray
Pairwise distance matrix.
"""
n = graph.number_of_nodes()
dist = np.full((n, n), 1, dtype=np.float64)
np.fill_diagonal(dist, 0)
for u, v, data in graph.edges(data=True):
weight = data['weight']
dist[u, v] = weight
dist[v, u] = weight
return dist
[docs]
@staticmethod
def dense_from_sparse_edges_fill1(D_sparse: sp.csr_matrix) -> np.ndarray:
"""Create the dense edge-distance matrix expected by CoreSG/HDBSCAN.
Non-edges are filled with 1, diagonal with 0, and sparse entries
overwrite the corresponding distances.
"""
D_sparse = D_sparse.tocsr()
n = D_sparse.shape[0]
D = np.ones((n, n), dtype=np.float64)
np.fill_diagonal(D, 0.0)
coo = D_sparse.tocoo()
D[coo.row, coo.col] = coo.data
return np.minimum(D, D.T)
[docs]
@staticmethod
def reassign_noise_via_mst(mst_graph, labels0, c=5):
"""
Reassign noise labels by propagating labels over a precomputed MST.
Parameters
----------
mst_graph : networkx.Graph
Minimum spanning tree of the final connected WSS graph.
labels0 : ndarray
Initial labels with noise marked as -1.
c : int, default=5
Number of largest edge weights to keep in the lexicographic path
signature during propagation.
"""
if not isinstance(mst_graph, nx.Graph):
raise TypeError("mst_graph must be a networkx.Graph.")
n = len(labels0)
labels = np.asarray(labels0).copy()
if mst_graph.number_of_nodes() != n:
raise ValueError("mst_graph and labels0 must have the same number of nodes.")
# adjacency of the tree
adj = [[] for _ in range(n)]
for u, v, data in mst_graph.edges(data=True):
w = float(data.get('weight', 1.0))
adj[int(u)].append((int(v), w))
adj[int(v)].append((int(u), w))
# Multi-source propagation from labeled vertices.
pq = []
paths = [None] * n
for u in range(n):
if labels[u] != -1:
paths[u] = [0.0] * c
for v, w in adj[u]:
if labels[v] == -1:
heapq.heappush(pq, (w, u, v))
while pq:
w, u, v = heapq.heappop(pq)
if labels[v] != -1:
continue
same = [(u, v)]
while pq and pq[0][0] == w and pq[0][2] == v:
_, u2, _ = heapq.heappop(pq)
same.append((u2, v))
def top_c_path(u_idx):
vec = list(paths[u_idx]) + [w]
return sorted(vec, reverse=True)[:c]
candidates = [(top_c_path(u_idx), labels[u_idx]) for u_idx, _ in same]
best_path, best_label = min(candidates, key=lambda x: tuple(x[0]))
labels[v] = best_label
paths[v] = best_path
for nbr, w2 in adj[v]:
if labels[nbr] == -1:
heapq.heappush(pq, (w2, v, nbr))
return labels
# ------------------------------------------------------------------
# ------------------------ GRAPH PREPROCESSING ---------------------
# ------------------------------------------------------------------
def _build_graph_distance(self, X):
"""Build graph-derived dense distances using the sparse fast path.
Pipeline:
data / precomputed graph
-> initial similarity graph
-> weighted structural similarity graph
-> WSS dissimilarity graph
-> connected graph
-> dense precomputed distance matrix
"""
self.data_ = (
X if self.sim_graph_method == "precomputed"
else (np.array(X) if isinstance(X, pd.DataFrame) else X)
)
# ------------------------------------------------------------
# 1. Build initial similarity graph
# ------------------------------------------------------------
self.similarity_graph_ = self.create_similarity_graph(self.data_)
# Use the graph size, not len(self.data_).
# This is safer for precomputed NetworkX graphs and scipy sparse matrices.
n_obs = self.similarity_graph_.number_of_nodes()
self.n_obs_ = n_obs
self.similarity_graph_.add_nodes_from(range(n_obs))
# ------------------------------------------------------------
# 2. Compute WSS similarity, then convert to dissimilarity
# ------------------------------------------------------------
self.similarity_graph_WSS_sparse_ = self.compute_similarity_sparse(
self.similarity_graph_
)
self.dissimilarity_graph_sparse_ = self.similarity_to_dissimilarity_sparse(
self.similarity_graph_WSS_sparse_
)
# ------------------------------------------------------------
# 3. Check whether WSS dissimilarity graph is connected
# ------------------------------------------------------------
n_components, _ = sp.csgraph.connected_components(
self.dissimilarity_graph_sparse_,
directed=False
)
# ------------------------------------------------------------
# 4. If disconnected and NOT precomputed, optionally increase
# n_neighbors until the WSS dissimilarity graph is connected.
#
# In precomputed mode, n_neighbors cannot change the graph,
# so this block is skipped.
# ------------------------------------------------------------
self.n_neighbors_initial_ = self.n_neighbors
self.n_neighbors_used_ = self.n_neighbors
if (
n_components > 1
and self.heuristic_connect
and self.sim_graph_method != "precomputed"
):
original_n_neighbors = self.n_neighbors
new_n_neighbors = self.n_neighbors
max_neighbors = n_obs
while n_components > 1 and new_n_neighbors < max_neighbors:
new_n_neighbors += 1
print("Trying n_neighbors =", new_n_neighbors)
self.n_neighbors = new_n_neighbors
# Rebuild the full correct pipeline:
# similarity graph -> WSS similarity -> WSS dissimilarity
self.similarity_graph_ = self.create_similarity_graph(self.data_)
self.similarity_graph_.add_nodes_from(range(n_obs))
self.similarity_graph_WSS_sparse_ = self.compute_similarity_sparse(
self.similarity_graph_
)
self.dissimilarity_graph_sparse_ = self.similarity_to_dissimilarity_sparse(
self.similarity_graph_WSS_sparse_
)
n_components, _ = sp.csgraph.connected_components(
self.dissimilarity_graph_sparse_,
directed=False
)
self.n_neighbors_used_ = self.n_neighbors
if n_components > 1:
raise RuntimeError(
"Could not build a connected WSS dissimilarity graph even after "
f"increasing n_neighbors from {original_n_neighbors} "
f"to {max_neighbors}."
)
# ------------------------------------------------------------
# 5. If connected, use WSS dissimilarity graph directly
# ------------------------------------------------------------
if n_components <= 1:
self.dist_matrix_ = self.dense_from_sparse_edges_fill1(
self.dissimilarity_graph_sparse_
)
self.connected_graph_ = nx.from_scipy_sparse_array(
self.dissimilarity_graph_sparse_,
edge_attribute="weight"
)
self.connected_graph_.add_nodes_from(range(n_obs))
# ------------------------------------------------------------
# 6. If still disconnected, connect components with bridge
# edges of distance 1.
#
# This is used when:
# - heuristic_connect=False
# - or sim_graph_method="precomputed"
# ------------------------------------------------------------
else:
sparse_nx = nx.from_scipy_sparse_array(
self.dissimilarity_graph_sparse_,
edge_attribute="weight"
)
sparse_nx.add_nodes_from(range(n_obs))
self.connected_graph_ = self.connect_graph_heuristically(
sparse_nx,
n_obs
)
self.dist_matrix_ = self.compute_custom_distance_matrix(
self.connected_graph_
)
# ------------------------------------------------------------
# 7. MST used later for optional noise reassignment
# ------------------------------------------------------------
self.mst_graph_ = nx.minimum_spanning_tree(
self.connected_graph_,
weight="weight"
)
# ------------------------------------------------------------
# 8. Store NetworkX versions for inspection/debugging
# ------------------------------------------------------------
self.similarity_graph_WSS = nx.from_scipy_sparse_array(
self.similarity_graph_WSS_sparse_,
edge_attribute="weight"
)
self.similarity_graph_WSS.add_nodes_from(range(n_obs))
self.dissimilarity_graph_ = nx.from_scipy_sparse_array(
self.dissimilarity_graph_sparse_,
edge_attribute="weight"
)
self.dissimilarity_graph_.add_nodes_from(range(n_obs))
# ------------------------------------------------------------------
# ------------------------- FIT ------------------------------------
# ------------------------------------------------------------------
[docs]
def fit(self, X, y=None):
"""
Fit the model on feature data or a precomputed graph.
Parameters
----------
X : array-like of shape (n_samples, n_features) or graph-like
Input feature matrix when ``sim_graph_method`` is not ``"precomputed"``.
In ``"precomputed"`` mode, this may be a ``networkx.Graph``, a SciPy
sparse adjacency matrix, or a square dense adjacency matrix.
Returns
-------
self : GraphCoreSGHDBSCAN
Fitted estimator.
"""
self._build_graph_distance(X)
self.coresg_ = CoreSGHDBSCAN(
min_samples_list=self.m_list,
metric="precomputed",
min_cluster_size=self.min_cluster_size,
save_models=self.save_models,
)
self.coresg_.fit_from_distance_matrix(self.dist_matrix_)
self.coresg_.run()
self.models_ = self.coresg_.models_
self.condensed_trees_ = self.coresg_.condensed_trees_
self.labels_by_m_ = self.coresg_.labels_by_m_
return self
[docs]
def fit_predict(self, X, y=None, m=None, c=5, **fit_params):
"""
Fit the model and return cluster labels.
Parameters
----------
X : array-like of shape (n_samples, n_features) or graph-like
Input feature matrix or supported precomputed graph representation.
Returns
-------
numpy.ndarray
Cluster labels for the fitted solution.
"""
self.fit(X, y, **fit_params)
if m is None:
if len(self.m_list) != 1:
raise ValueError(
"fit_predict requires `m` when m_list contains multiple values. "
"Use labels_for(m) or pass m=... explicitly."
)
m = self.m_list[0]
labels = self.coresg_.labels_by_m_[int(m)]
if self.no_noise:
return self.reassign_noise_via_mst(
self.mst_graph_,
labels,
c=c,
)
return labels
[docs]
def fit_coresg(self, X, m_list, coresg_kwargs=None):
"""Build graph-derived distances and run CoreSGHDBSCAN on them."""
self._build_graph_distance(X)
if coresg_kwargs is None:
coresg_kwargs = {}
self.coresg_ = CoreSGHDBSCAN(
min_samples_list=list(m_list),
min_cluster_size=self.min_cluster_size,
save_models=self.save_models,
**coresg_kwargs
).fit_from_distance_matrix(self.dist_matrix_)
self.coresg_.run()
self.models_ = self.coresg_.models_
self.condensed_trees_ = self.coresg_.condensed_trees_
self.labels_by_m_ = self.coresg_.labels_by_m_
return self
# ------------------------------------------------------------------
# -------------------------- ACCESSORS -----------------------------
# ------------------------------------------------------------------
[docs]
def labels_for(self, m, no_noise=None, c=5):
"""
Return labels for a selected ``min_samples`` value.
Parameters
----------
m : int
Selected ``min_samples`` value.
no_noise : bool or None, optional
If ``True``, apply MST-based noise reassignment. If ``None``, use
the instance-level ``no_noise`` setting.
c : int, optional
Tie-breaking path length used during noise reassignment.
Returns
-------
numpy.ndarray
Cluster labels for the requested fitted solution.
Notes
-----
``labels_by_m_[m]`` stores the directly fitted labels.
``labels_for(m)`` may additionally apply noise reassignment.
"""
labels = self.coresg_.labels_by_m_[int(m)]
if no_noise is None:
no_noise = self.no_noise
if no_noise:
labels = self.reassign_noise_via_mst(
self.mst_graph_,
labels,
c=c,
)
return labels
[docs]
def plot_condensed_tree(self, m, figsize=(10, 6), **kwargs):
"""
Plot the condensed tree for a selected ``min_samples`` value.
Parameters
----------
m : int
The ``min_samples`` value whose condensed tree should be displayed.
figsize : tuple of float, optional
Figure size passed to Matplotlib, by default ``(8, 5)``.
**kwargs
Additional keyword arguments forwarded to
``CondensedTree.plot()``.
Returns
-------
None
Displays the condensed tree plot.
Raises
------
ValueError
If the model has not been fitted yet.
KeyError
If the requested ``m`` is not available in the stored results.
Notes
-----
This method first looks for the condensed tree in
``self.coresg_.condensed_trees_``. If it is not found there, it falls
back to ``self.coresg_.models_[m].condensed_tree_`` when full models
have been saved.
Examples
--------
>>> g.fit(X)
>>> g.plot_condensed_tree(10)
"""
import matplotlib.pyplot as plt
if not hasattr(self, "coresg_") or self.coresg_ is None:
raise ValueError("Model is not fitted yet. Call fit(...) first.")
m = int(m)
if m in getattr(self.coresg_, "condensed_trees_", {}):
ct = self.coresg_.condensed_trees_[m]
elif m in getattr(self.coresg_, "models_", {}):
ct = self.coresg_.models_[m].condensed_tree_
else:
raise KeyError(f"m={m} not found in CORE-SG results.")
if ct is None or not hasattr(ct, "plot"):
print(f"No condensed tree for CORE-SG m={m}")
return
plt.figure(figsize=figsize)
ct.plot(select_clusters=False, label_clusters=False, **kwargs)
plt.title(f"CORE-SG Condensed Tree (min_samples = {m})")
plt.show()
[docs]
def interactive_condensed_tree(self, figsize=(10, 6)):
"""
Create an interactive condensed tree explorer across fitted
``min_samples`` values.
Parameters
----------
figsize : tuple of float, optional
Figure size passed to Matplotlib for each displayed condensed
tree, by default ``(10, 6)``.
Returns
-------
ipywidgets.Widget
A selection slider widget for browsing condensed trees across
available ``min_samples`` values.
Raises
------
ImportError
If ``ipywidgets`` is not installed.
RuntimeError
If the model has not been fitted yet.
ValueError
If no condensed trees are available.
Notes
-----
This method is intended for use in an interactive Jupyter
environment. It uses the stored condensed trees in
``self.coresg_.condensed_trees_`` and falls back to any available
entries in ``self.coresg_.models_``.
Examples
--------
>>> g.fit(X)
>>> widget = g.interactive_condensed_tree()
"""
try:
import ipywidgets as widgets
from IPython.display import display, clear_output
except ImportError as e:
raise ImportError(
"ipywidgets is required for interactive plotting. "
"Install it with `pip install ipywidgets`."
) from e
import matplotlib.pyplot as plt
if not hasattr(self, "coresg_") or self.coresg_ is None:
raise RuntimeError("Call fit(...) before interactive_condensed_tree().")
m_list = sorted(
set(getattr(self.coresg_, "condensed_trees_", {}).keys()) |
set(getattr(self.coresg_, "models_", {}).keys())
)
if len(m_list) == 0:
raise ValueError("No condensed trees are available.")
output = widgets.Output()
slider = widgets.SelectionSlider(
options=m_list,
value=m_list[0],
description="min_samples",
continuous_update=False,
style={"description_width": "initial"},
layout=widgets.Layout(width="500px"),
)
def redraw(m):
with output:
clear_output(wait=True)
self.plot_condensed_tree(m=int(m), figsize=figsize)
def on_change(change):
if change["name"] == "value":
redraw(change["new"])
slider.observe(on_change, names="value")
display(widgets.VBox([slider, output]))
redraw(m_list[0])
return slider