Source code for coresg_graphhdbscan.core

"""CoreSG-HDBSCAN core implementation"""

import time
import warnings
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

import numpy as np
import hdbscan
from scipy.spatial.distance import pdist, squareform
from scipy.sparse import coo_matrix, csr_matrix, csgraph

from sklearn.neighbors import NearestNeighbors

warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)

# =========================
# HDBSCAN generic internals
# =========================
from hdbscan._hdbscan_linkage import label as _linkage_label
from hdbscan._hdbscan_tree import (
    condense_tree as _condense_tree,
    compute_stability as _compute_stability,
    get_clusters as _get_clusters,
)
from hdbscan.plots import CondensedTree as _CondensedTree, SingleLinkageTree as _SingleLinkageTree


# ===========================================
# Dense Prim on COMPLETE MRD graph (your code)
# ===========================================
[docs] def prim_mrd_mst_edges(X: np.ndarray, core: np.ndarray) -> np.ndarray: """ Compute MST edges on a mutual-reachability graph using Prim's algorithm. Parameters ---------- D : numpy.ndarray Dense pairwise distance matrix. core : numpy.ndarray Core-distance vector. eps : float, default=1e-12 Numerical tolerance. Returns ------- numpy.ndarray Array of undirected MST edges with shape ``(n_edges, 2)``. """ X = np.asarray(X, dtype=np.float64, order="C") n = X.shape[0] in_tree = np.zeros(n, dtype=bool) key = np.full(n, np.inf, dtype=np.float64) parent = np.full(n, -1, dtype=np.int32) key[0] = 0.0 for _ in range(n): u = np.argmin(key) in_tree[u] = True key[u] = np.inf not_in = ~in_tree if not np.any(not_in): break dif = X[not_in] - X[u] d_uv = np.sqrt(np.einsum('ij,ij->i', dif, dif), dtype=np.float64) idx_not = np.flatnonzero(not_in) cand = np.maximum(np.maximum(core[u], core[idx_not]), d_uv) better = cand < key[idx_not] key[idx_not[better]] = cand[better] parent[idx_not[better]] = u edges = [] for v in range(n): p = parent[v] if p != -1: edges.append((min(p, v), max(p, v))) return np.array(edges, dtype=np.int32)
[docs] def prim_mrd_mst_edges_from_D(D: np.ndarray, core: np.ndarray) -> np.ndarray: """ Compute MST edges from a precomputed distance matrix. Parameters ---------- D : numpy.ndarray Dense pairwise distance matrix. core : numpy.ndarray Core-distance vector. eps : float, default=1e-12 Numerical tolerance. Returns ------- numpy.ndarray Array of undirected MST edges with shape ``(n_edges, 2)``. """ D = np.asarray(D, dtype=np.float64, order="C") n = D.shape[0] if D.shape[1] != n: raise ValueError("D must be a square distance matrix.") if core.shape[0] != n: raise ValueError("core must have length N.") in_tree = np.zeros(n, dtype=bool) key = np.full(n, np.inf, dtype=np.float64) parent = np.full(n, -1, dtype=np.int32) key[0] = 0.0 for _ in range(n): u = np.argmin(key) in_tree[u] = True key[u] = np.inf not_in = ~in_tree if not np.any(not_in): break idx_not = np.flatnonzero(not_in) base = D[u, idx_not] cand = np.maximum(np.maximum(core[u], core[idx_not]), base) better = cand < key[idx_not] key[idx_not[better]] = cand[better] parent[idx_not[better]] = u edges = [] for v in range(n): p = parent[v] if p != -1: edges.append((min(p, v), max(p, v))) return np.array(edges, dtype=np.int32)
# =========================================== # CORE-SG model wrapper (HDBSCAN-like) # ===========================================
[docs] class CoreSGModel: """ Lightweight wrapper that mimics the HDBSCAN attributes used by this package. Stored result object for one fitted ``min_samples`` value. Attributes ---------- labels_ : numpy.ndarray Cluster labels for each sample. probabilities_ : numpy.ndarray Membership strengths for each sample. cluster_persistence_ : numpy.ndarray Persistence score for each cluster. single_linkage_tree_ : object Single-linkage tree wrapper. cluster_persistence_ : numpy.ndarray Cluster persistence values returned by the HDBSCAN*-style cluster selection step. condensed_tree_ : hdbscan.plots.CondensedTree Condensed tree object for plotting and inspection. """
[docs] def __init__(self, labels: np.ndarray, probabilities: np.ndarray, stabilities: np.ndarray, condensed_tree_array: np.recarray, single_linkage_tree: np.ndarray): self.labels_ = labels self.probabilities_ = probabilities self.cluster_persistence_ = stabilities self.condensed_tree_ = _CondensedTree(condensed_tree_array, labels) self.single_linkage_tree_ = _SingleLinkageTree(single_linkage_tree)
# =========================================== # CORE-SG generic with fast MST logic # ===========================================
[docs] @dataclass class CoreSGHDBSCAN: """ CoreSG-based hierarchical density clustering backend. This class implements the lower-level CoreSG-HDBSCAN pipeline operating on feature vectors or distance representations. Workflow -------- 1. Compute the full distance matrix once. 2. Compute self-inclusive core distances for all values in ``min_samples_list``. 3. Build the CORE-SG graph from: - the kmax nearest-neighbor graph with ties - the MST on the complete MRD graph for kmax 4. Precompute a sparse neighbor table for fast edge distance lookup. 5. For each ``m``: - compute MRD edge weights - build the sparse weighted graph - compute the MST - build the single-linkage tree - condense the tree and extract clusters Parameters ---------- min_samples_list : list[int] List of ``min_samples`` values to evaluate. metric : str, default="euclidean" Distance metric mode. eps : float, default=1e-12 Numerical tolerance used in graph construction. min_cluster_size : int or None, default=None Minimum cluster size. If ``None``, the package default behavior is used. """ min_samples_list: List[int] metric: str = "euclidean" eps: float = 1e-12 min_cluster_size: Optional[int] = None save_models: bool = False # Filled by fit() X_: Optional[np.ndarray] = field(init=False, default=None) N_: Optional[int] = field(init=False, default=None) D_: Optional[np.ndarray] = field(init=False, default=None) # (N,N) float64 core_: Dict[int, np.ndarray] = field(init=False, default_factory=dict) # m -> (N,) float64 kmax_: Optional[int] = field(init=False, default=None) edges_ut_: Optional[np.ndarray] = field(init=False, default=None) # CORE-SG edges (E,2), i<j # kNN tables idx_with_self_: Optional[np.ndarray] = field(init=False, default=None) # (N, kmax+1) dst_with_self_: Optional[np.ndarray] = field(init=False, default=None) # (N, kmax+1) idx_no_self_: Optional[np.ndarray] = field(init=False, default=None) # (N, kmax) dst_no_self_: Optional[np.ndarray] = field(init=False, default=None) # (N, kmax) A_knn_: Optional[csr_matrix] = field(init=False, default=None) # neighbor → distance CSR # MSTs per m (optional) msts_: Dict[int, Tuple[np.ndarray, np.ndarray, np.ndarray]] = field(init=False, default_factory=dict) mst_times_: Dict[int, float] = field(init=False, default_factory=dict) # Final HDBSCAN-like models per m models_: Dict[int, CoreSGModel] = field(init=False, default_factory=dict, repr=False) condensed_trees_: Dict[int, object] = field(init=False, default_factory=dict, repr=False) labels_by_m_: Dict[int, np.ndarray] = field(init=False, default_factory=dict, repr=False) times_: Dict[int, float] = field(init=False, default_factory=dict, repr=False) # -------------------------------------------------------- # FIT: build D, self-inclusive cores, CORE-SG graph, CSR neighbor table # --------------------------------------------------------
[docs] def fit(self, X: np.ndarray) -> "CoreSGHDBSCAN": X = np.asarray(X, dtype=np.float64, order="C") N = X.shape[0] mlist = np.sort(np.unique(self.min_samples_list)).astype(int) if len(mlist) == 0: raise ValueError("min_samples_list must contain at least one integer.") kmax = int(mlist[-1]) if kmax >= N: raise ValueError(f"kmax ({kmax}) must be < N ({N}).") self.X_ = X self.N_ = N self.kmax_ = kmax # A) full distances D (for ties + fallback base distances) t0 = time.time() D = squareform(pdist(X, metric=self.metric)).astype(np.float64, copy=False) np.fill_diagonal(D, 0.0) self.D_ = D t1 = time.time() print(f"[CORE-SG] Full distance matrix computed in {t1 - t0:.3f}s") # B) kNN with self included: n_neighbors = kmax + 1, stable tie-breaking t0 = time.time() nn = NearestNeighbors(metric=self.metric) nn.fit(X) d_all, idx_all = nn.kneighbors(X, n_neighbors=kmax + 1, return_distance=True) # Force self at column 0 ar = np.arange(N, dtype=np.int32) if not np.all(idx_all[:, 0] == ar): idx_fixed = np.empty_like(idx_all) d_fixed = np.empty_like(d_all) idx_fixed[:, 0] = ar d_fixed[:, 0] = 0.0 idx_fixed[:, 1:] = idx_all[:, :kmax] d_fixed[:, 1:] = d_all[:, :kmax] idx_all, d_all = idx_fixed, d_fixed # Stable tie-break per row by (distance, index) for cols 1..kmax order = np.argsort(d_all[:, 1:], axis=1, kind="mergesort") row = np.arange(N)[:, None] order_full = np.concatenate([np.zeros((N, 1), dtype=int), order + 1], axis=1) d_all = d_all[row, order_full] idx_all = idx_all[row, order_full] self.idx_with_self_ = idx_all self.dst_with_self_ = d_all self.idx_no_self_ = idx_all[:, 1:] # neighbors without self self.dst_no_self_ = d_all[:, 1:] # SELF-INCLUSIVE cores: core_m[i] = d_with_self[i,m] self.core_.clear() for m in mlist: self.core_[m] = self.dst_with_self_[:, m].astype(np.float64, copy=False) core_kmax = self.core_[kmax] t1 = time.time() print(f"[CORE-SG] Self-inclusive core distances for {len(mlist)} m values in {t1 - t0:.3f}s") # C) kmax-NNG WITH ALL TIES (OR condition with cores) t0 = time.time() iu, ju = np.triu_indices(N, k=1) cond = (D[iu, ju] <= core_kmax[iu] + self.eps) | (D[iu, ju] <= core_kmax[ju] + self.eps) kmax_edges = np.stack((iu[cond].astype(np.int32), ju[cond].astype(np.int32)), axis=1) t1 = time.time() print(f"[CORE-SG] kmax-NNG-with-ties has {kmax_edges.shape[0]} edges (built in {t1 - t0:.3f}s)") # D) MST(kmax) on COMPLETE MRD via your dense Prim t0 = time.time() mst_kmax_edges = prim_mrd_mst_edges(X, core_kmax) # (N-1,2), i<j t1 = time.time() print(f"[CORE-SG] MST_kmax (Prim) has {mst_kmax_edges.shape[0]} edges (built in {t1 - t0:.3f}s)") # E) CORE-SG edges = union(kmax-NNG-with-ties, MST_kmax) if kmax_edges.size: all_edges = np.vstack([kmax_edges, mst_kmax_edges]) else: all_edges = mst_kmax_edges self.edges_ut_ = np.unique(all_edges, axis=0) print(f"[CORE-SG] CORE-SG graph has {self.edges_ut_.shape[0]} undirected edges") # F) Build CSR neighbor table A from kNN (no self) for fast base distances I_dir = np.repeat(np.arange(N, dtype=np.int32), kmax) J_dir = self.idx_no_self_.ravel().astype(np.int32) W_dir = self.dst_no_self_.ravel().astype(np.float64) self.A_knn_ = csr_matrix((W_dir, (I_dir, J_dir)), shape=(N, N)) return self
[docs] def fit_from_distance_matrix(self, D: np.ndarray) -> "CoreSGHDBSCAN": """ Build CORE-SG *from a precomputed distance matrix* D (NxN). - D[i,j] is the base dissimilarity between points i and j. - We compute self-inclusive core distances and kmax-NNG from D. - We build CORE-SG edges via kmax-NNG ∪ MST_kmax (on MRD_kmax). After this, you can call self.run(...) exactly as usual. """ D = np.asarray(D, dtype=np.float64, order="C") if D.ndim != 2 or D.shape[0] != D.shape[1]: raise ValueError("D must be a square matrix.") N = D.shape[0] np.fill_diagonal(D, 0.0) mlist = np.sort(np.unique(self.min_samples_list)).astype(int) if len(mlist) == 0: raise ValueError("min_samples_list must contain at least one integer.") kmax = int(mlist[-1]) if kmax >= N: raise ValueError(f"kmax ({kmax}) must be < N ({N}).") self.X_ = None # we’re working purely in distance space self.N_ = N self.kmax_ = kmax self.D_ = D # --- kNN from D (self-inclusive) --- # sort each row by distance (stable, so ties broken by index) idx_all = np.argsort(D, axis=1, kind="mergesort") d_all = np.take_along_axis(D, idx_all, axis=1) # keep self + kmax neighbors if idx_all.shape[1] < kmax + 1: raise ValueError("Distance matrix does not have enough neighbors per row.") idx_all = idx_all[:, :kmax + 1] d_all = d_all[:, :kmax + 1] # ensure self is at column 0 ar = np.arange(N, dtype=np.int32) if not np.all(idx_all[:, 0] == ar): for i in range(N): pos = int(np.where(idx_all[i] == i)[0][0]) if pos != 0: idx_all[i, 0], idx_all[i, pos] = idx_all[i, pos], idx_all[i, 0] d_all[i, 0], d_all[i, pos] = d_all[i, pos], d_all[i, 0] self.idx_with_self_ = idx_all self.dst_with_self_ = d_all self.idx_no_self_ = idx_all[:, 1:] self.dst_no_self_ = d_all[:, 1:] # --- self-inclusive core distances for all m --- self.core_.clear() for m in mlist: self.core_[m] = self.dst_with_self_[:, m].astype(np.float64, copy=False) core_kmax = self.core_[kmax] # --- kmax-NNG-with-ties from D & core_kmax (same condition as in fit) --- iu, ju = np.triu_indices(N, k=1) cond = (D[iu, ju] <= core_kmax[iu] + self.eps) | ( D[iu, ju] <= core_kmax[ju] + self.eps ) if np.any(cond): kmax_edges = np.stack( (iu[cond].astype(np.int32), ju[cond].astype(np.int32)), axis=1 ) else: kmax_edges = np.empty((0, 2), dtype=np.int32) # --- MST_kmax on COMPLETE MRD graph, using D as base distances --- mst_kmax_edges = prim_mrd_mst_edges_from_D(D, core_kmax) # (N-1,2) # --- CORE-SG edges = union(kmax-NNG-with-ties, MST_kmax) --- if kmax_edges.size: all_edges = np.vstack([kmax_edges, mst_kmax_edges]) else: all_edges = mst_kmax_edges self.edges_ut_ = np.unique(all_edges, axis=0) print(f"[CORE-SG] (precomputed) CORE-SG graph has {self.edges_ut_.shape[0]} edges") # --- CSR neighbor table A_knn_ from idx_no_self_/dst_no_self_ --- I_dir = np.repeat(np.arange(N, dtype=np.int32), kmax) J_dir = self.idx_no_self_.ravel().astype(np.int32) W_dir = self.dst_no_self_.ravel().astype(np.float64) self.A_knn_ = csr_matrix((W_dir, (I_dir, J_dir)), shape=(N, N)) return self
# -------------------------------------------------------- # Helper: base distance per edge using kNN tables or D # -------------------------------------------------------- def _base_distance_from_tables_or_D(self, r: np.ndarray, c: np.ndarray) -> np.ndarray: """ For each undirected edge (r[i], c[i]): try to get base distance from the kNN tables (if either direction appears there); otherwise fall back to D. """ A = self.A_knn_ D = self.D_ w1 = A[r, c].A.ravel() w2 = A[c, r].A.ravel() base = np.empty_like(w1) mask1 = w1 > 0 mask2 = w2 > 0 both = mask1 & mask2 none = (~mask1) & (~mask2) base[both] = np.minimum(w1[both], w2[both]) base[mask1 & (~both)] = w1[mask1 & (~both)] base[mask2 & (~both)] = w2[mask2 & (~both)] base[none] = D[r[none], c[none]] return base
[docs] def model(self, min_samples): if not self.save_models: raise ValueError( "Models were not saved. Initialize with save_models=True to access models_[min_samples]." ) if min_samples not in self.models_: raise KeyError(f"min_samples={min_samples} not found in saved models.") return self.models_[min_samples]
# -------------------------------------------------------- # RUN: per-m MST on CORE-SG graph + generic pipeline # --------------------------------------------------------
[docs] def run(self, cluster_selection_method: str = "eom", allow_single_cluster: bool = False, match_reference_implementation: bool = False, cluster_selection_epsilon: float = 0.0) -> "CoreSGHDBSCAN": """ Run Core-SG clustering for all requested ``min_samples`` values. Stores ------ models_ : dict Saved per-``m`` models when ``save_models=True``. condensed_trees_ : dict Condensed tree objects for all fitted ``m`` values. labels_by_m_ : dict Stored labels for all fitted ``m`` values. """ if self.D_ is None or self.edges_ut_ is None or not self.core_: raise RuntimeError("Call fit(X) before run().") self.models_.clear() self.msts_.clear() self.mst_times_.clear() self.times_.clear() self.condensed_trees_.clear() self.labels_by_m_.clear() N = self.N_ D = self.D_ edges = self.edges_ut_ i_idx = edges[:, 0] j_idx = edges[:, 1] for m in sorted(np.unique(self.min_samples_list)): core_m = self.core_[int(m)] # --- reweight edges with MRD_m via your base-distance scheme --- t0 = time.time() base = self._base_distance_from_tables_or_D(i_idx, j_idx) w_ut = np.maximum.reduce([core_m[i_idx], core_m[j_idx], base]) # build symmetric sparse graph and MST data = np.concatenate([w_ut, w_ut]) row = np.concatenate([i_idx, j_idx]) col = np.concatenate([j_idx, i_idx]) G = coo_matrix((data, (row, col)), shape=(N, N)) mst_sparse = csgraph.minimum_spanning_tree(G) coo_mst = mst_sparse.tocoo() u = coo_mst.row.astype(np.int32) v = coo_mst.col.astype(np.int32) w = coo_mst.data.astype(np.float64) min_spanning_tree = np.vstack([u, v, w]).T order = np.argsort(min_spanning_tree[:, 2]) min_spanning_tree = min_spanning_tree[order] mst_time = time.time() - t0 self.msts_[int(m)] = (u, v, w) self.mst_times_[int(m)] = mst_time # --- generic pipeline: MST -> single_linkage_tree -> labels --- t1 = time.time() single_linkage_tree = _linkage_label(min_spanning_tree) effective_min_cluster_size = int(m) if self.min_cluster_size is None else int(self.min_cluster_size) condensed_tree_array = _condense_tree(single_linkage_tree, effective_min_cluster_size) stability_dict = _compute_stability(condensed_tree_array) labels, probabilities, stabilities = _get_clusters( condensed_tree_array, stability_dict, cluster_selection_method, allow_single_cluster, match_reference_implementation, cluster_selection_epsilon, ) t2 = time.time() model = CoreSGModel( labels=labels, probabilities=probabilities, stabilities=stabilities, condensed_tree_array=condensed_tree_array, single_linkage_tree=single_linkage_tree, ) self.labels_by_m_[int(m)] = labels self.condensed_trees_[int(m)] = model.condensed_tree_ if self.save_models: self.models_[int(m)] = model self.times_[int(m)] = mst_time + (t2 - t1) print(f"[CORE-SG] m={m:2d}: MST+tree+labels in {self.times_[int(m)]:.4f}s") return self
# convenience plotting for one m
[docs] def plot_condensed_tree(self, m: int, figsize=(8, 5)): import matplotlib.pyplot as plt if m in self.condensed_trees_: ct = self.condensed_trees_[m] elif m in self.models_: ct = self.models_[m].condensed_tree_ else: raise KeyError(f"m={m} not found in CORE-SG results.") if ct is None: print(f"No condensed tree for CORE-SG m={m}") return plt.figure(figsize=figsize) ct.plot(select_clusters=False, label_clusters=False) plt.title(f"CORE-SG Condensed Tree (min_samples = {m})") plt.show()
# =========================================== # Helper: plot condensed tree for any model dict # ===========================================
[docs] def plot_condensed_tree_for_m(models_dict, m: int, title_prefix: str = "", figsize=(8, 5)): """ Plot the condensed tree for a selected fitted ``min_samples`` value. Parameters ---------- model : CoreSGHDBSCAN or GraphCoreSGHDBSCAN Fitted clustering object. m : int Selected ``min_samples`` value. Returns ------- None """ import matplotlib.pyplot as plt if m not in models_dict: raise KeyError(f"m={m} not found in models_dict") model = models_dict[m] ct = model.condensed_tree_ if ct is None: print(f"No condensed tree available for m={m}") return plt.figure(figsize=figsize) ct.plot(select_clusters=False, label_clusters=False) if title_prefix: plt.title(f"{title_prefix} Condensed Tree (min_samples = {m})") else: plt.title(f"Condensed Tree (min_samples = {m})") plt.show()