Custom Spaces

One of the main strengths of kmeanssa-ng is how easy it is to add support for new metric spaces. The library’s architecture separates the clustering algorithm from the geometric space, so you can make the algorithm work on your own custom space by implementing three abstract classes: Point, Center, and Space.

This guide will walk you through the process.

Step 1: Define Your Point

A Point represents an immobile element in your metric space. It’s the basic building block for your data. You need to create a class that inherits from kmeanssa_ng.core.abstract.Point.

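from typing import Any

from kmeanssa_ng.core.abstract import Point, Space

# Placeholder: replace with the type that encodes a location in your space
YourLocationType = Any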

class MyPoint(Point):
    def __init__(self, space: 'MySpace', location: YourLocationType):
        self._space = space
        self._location = location

    @property
    def space(self) -> Space:
        return self._space

    # Add any space-specific properties you need
    @property
    def location(self) -> YourLocationType:
        return self._location

Key requirements:
  • It must have a space property that returns the Space object it belongs to.
  • It should store its location or state in a way that makes sense for your space.
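As a minimal sketch of these two requirements, the snippet below uses the real line as the space. LineSpace and LinePoint are hypothetical stand-ins that do not inherit from the library's abstract classes, so the snippet runs without kmeanssa-ng installed; in real code you would subclass Space and Point as shown above.

```python
class LineSpace:
    """Hypothetical stand-in for a Space: the real line with d(x, y) = |x - y|."""

    def distance(self, p1: "LinePoint", p2: "LinePoint") -> float:
        return abs(p1.location - p2.location)


class LinePoint:
    """Hypothetical stand-in for a Point: a fixed position on the real line."""

    def __init__(self, space: LineSpace, location: float):
        self._space = space
        self._location = float(location)

    @property
    def space(self) -> LineSpace:
        # Back-reference to the space this point lives in
        return self._space

    @property
    def location(self) -> float:
        # Read-only: a plain point exposes no setter for its position
        return self._location


line = LineSpace()
a, b = LinePoint(line, 1.0), LinePoint(line, 4.5)

# Both points share one space object, so the metric is reachable from a point
print(a.space is b.space)      # True
print(a.space.distance(a, b))  # 3.5
```

Keeping the location behind a read-only property is one way to reflect that a Point is immobile; only a Center is expected to change position.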

Step 2: Define Your Center

A Center represents a mobile point that can serve as a cluster center. It inherits from your custom Point class and from kmeanssa_ng.core.abstract.Center. It must implement two methods for movement: brownian_motion and drift.

from kmeanssa_ng.core.abstract import Center

class MyCenter(MyPoint, Center):
    def brownian_motion(self, time_to_travel: float) -> None:
        '''Implement random movement in your space.

        For example:
        - Choose a random direction
        - Move distance ~ sqrt(time_to_travel)
        - Update self._location
        '''
        # Your implementation here
        pass

    def drift(self, target_point: Point, prop_to_travel: float) -> None:
        '''Implement directed movement toward a target.

        For example:
        - Compute the geodesic from self to target
        - Move a proportion `prop_to_travel` along the geodesic
        - Update self._location
        '''
        # Your implementation here
        pass

  • brownian_motion: Implements random, exploratory movement. The distance of the move should typically scale with the square root of time_to_travel.
  • drift: Implements deterministic, exploitative movement toward target_point. The prop_to_travel parameter specifies what fraction of the distance to the target should be covered.
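To make the two movements concrete, here is a small sketch for the Euclidean plane, where the geodesic is a straight segment. PlaneCenter is a hypothetical standalone class (it does not subclass the library's Center, and the rng argument is an assumption added for reproducibility), so treat it as an illustration of the scaling rules rather than the library's implementation.

```python
import math
import random
from typing import Optional


class PlaneCenter:
    """Hypothetical stand-in for a Center in the Euclidean plane."""

    def __init__(self, x: float, y: float, rng: Optional[random.Random] = None):
        self.x, self.y = float(x), float(y)
        self._rng = rng or random.Random()

    def brownian_motion(self, time_to_travel: float) -> None:
        # Each coordinate receives an independent Gaussian step whose
        # standard deviation is sqrt(time_to_travel)
        sigma = math.sqrt(time_to_travel)
        self.x += self._rng.gauss(0.0, sigma)
        self.y += self._rng.gauss(0.0, sigma)

    def drift(self, target: "PlaneCenter", prop_to_travel: float) -> None:
        # Straight-line geodesic: move by linear interpolation toward the target
        self.x += prop_to_travel * (target.x - self.x)
        self.y += prop_to_travel * (target.y - self.y)


center = PlaneCenter(0.0, 0.0, rng=random.Random(0))
target = PlaneCenter(4.0, 2.0)

center.drift(target, 0.5)     # cover half of the distance to the target
print(center.x, center.y)     # 2.0 1.0

center.brownian_motion(0.01)  # small random step, typical size about 0.1
```

With prop_to_travel equal to 1 the center lands exactly on the target, and with time_to_travel equal to 0 the Brownian step leaves it where it is.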

Step 3: Define Your Space

The Space class represents the metric space itself. It brings everything together, providing the core geometric operations. It must inherit from kmeanssa_ng.core.abstract.Space and implement several methods.

from kmeanssa_ng.core.abstract import Center, Point, Space

class MySpace(Space):
    def distance(self, p1: Point, p2: Point) -> float:
        """Compute your metric."""
        # Your distance computation
        pass

    def sample_points(self, n: int, strategy) -> list[Point]:
        """Sample random points using a strategy."""
        # Your sampling logic, e.g., by delegating to the strategy
        return strategy.sample(self, n)

    def center_from_point(self, point: Point) -> Center:
        """Create a Center from a Point."""
        # e.g., return MyCenter(self, point.location)
        pass

    def compute_clusters(self, centers: list[Center]) -> None:
        """Assign each observation to the nearest center."""
        # Your cluster assignment logic
        pass

    def calculate_energy(self, centers: list[Center]) -> float:
        """Compute sum of squared distances to nearest centers."""
        # Your energy calculation
        pass

Key responsibilities:
  • distance: The metric \(d(p_1, p_2)\).
  • sample_points: A factory method for creating points, which typically delegates to a SamplingStrategy.
  • center_from_point: A factory method for creating a Center from a Point.
  • compute_clusters and calculate_energy: Methods for assigning points to clusters and calculating the total k-means energy.
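For observations \(x_1, \dots, x_n\) and centers \(c_1, \dots, c_k\), the energy described above is \(\sum_{i=1}^{n} \min_{j} d(x_i, c_j)^2\). The sketch below shows the assignment and energy steps as free functions that only touch the space through a distance callable. The function names and the choice to return labels are assumptions made for illustration; the library's compute_clusters returns None and presumably stores the assignment on the space.

```python
def assign_clusters(observations, centers, distance) -> list[int]:
    """Return, for each observation, the index of its nearest center."""
    return [
        min(range(len(centers)), key=lambda j: distance(x, centers[j]))
        for x in observations
    ]


def energy(observations, centers, distance) -> float:
    """Sum of squared distances from each observation to its nearest center."""
    return sum(min(distance(x, c) for c in centers) ** 2 for x in observations)


def line_distance(a: float, b: float) -> float:
    # Metric on the real line, used here only as a simple test space
    return abs(a - b)


observations = [0.0, 1.0, 9.0, 11.0]
centers = [0.5, 10.0]

print(assign_clusters(observations, centers, line_distance))  # [0, 0, 1, 1]
print(energy(observations, centers, line_distance))           # 2.5
```

Because both functions depend only on the distance callable, the same logic carries over to any space that implements distance.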

Step 4: Use the Algorithm

Once you have implemented MyPoint, MyCenter, and MySpace, you can use the SimulatedAnnealing algorithm out of the box.

from kmeanssa_ng import SimulatedAnnealing
from kmeanssa_ng.core.strategies.initialization import KMeansPlusPlus
from kmeanssa_ng.core.strategies.robustification import MinimizeEnergy

# 1. Create your space and sample observations
my_space = MySpace(...)
# Assuming you've implemented a MySamplingStrategy
points = my_space.sample_points(100, strategy=MySamplingStrategy())

# 2. Run simulated annealing - it works immediately!
sa = SimulatedAnnealing(observations=points, k=5)
centers = sa.run_interleaved(
    initialization_strategy=KMeansPlusPlus(),
    robustification_strategy=MinimizeEnergy()
)

print("Found cluster centers:", centers)

That’s it! The algorithm automatically works with your new space.

Conceptual Example: Unit Circle

Here’s a conceptual implementation for clustering on the unit circle \(S^1\), where points are defined by an angle.

import numpy as np
from kmeanssa_ng.core.abstract import Point, Center, Space

class CirclePoint(Point):
    def __init__(self, space: 'CircleSpace', angle: float):
        self._space = space
        self.angle = angle % (2 * np.pi)  # Normalize to [0, 2π)

    @property
    def space(self) -> Space:
        return self._space

class CircleCenter(CirclePoint, Center):
    def brownian_motion(self, time_to_travel: float) -> None:
        # Random angular displacement
        displacement = np.random.normal(0, np.sqrt(time_to_travel))
        self.angle = (self.angle + displacement) % (2 * np.pi)

    def drift(self, target_point: Point, prop_to_travel: float) -> None:
        # Move toward target along the shorter arc
        target_angle = target_point.angle
        diff = (target_angle - self.angle) % (2 * np.pi)
        if diff > np.pi:
            diff -= 2 * np.pi # Move along the shorter arc
        self.angle = (self.angle + prop_to_travel * diff) % (2 * np.pi)

class CircleSpace(Space):
    def distance(self, p1: Point, p2: Point) -> float:
        # Angular distance (length of the shorter arc)
        diff = abs(p1.angle - p2.angle)
        return min(diff, 2 * np.pi - diff)

    # ... other methods (sampling, etc.) would be implemented here ...

This example demonstrates how the framework generalizes to any space where you can define a distance metric and a notion of movement for the centers.
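Before plugging a new space into the algorithm, it can help to sanity-check the geometry on its own. The sketch below restates the circle's distance and drift rules as plain functions on angles (using only the standard library, with hypothetical function names) and checks the wrap-around case, where the shorter arc crosses the angle 0.

```python
import math
import random

TWO_PI = 2 * math.pi


def circle_distance(a: float, b: float) -> float:
    """Length of the shorter arc between two angles."""
    diff = abs(a - b) % TWO_PI
    return min(diff, TWO_PI - diff)


def circle_drift(angle: float, target: float, prop: float) -> float:
    """Move a fraction `prop` of the way toward `target` along the shorter arc."""
    diff = (target - angle) % TWO_PI
    if diff > math.pi:
        diff -= TWO_PI  # going the other way around is shorter
    return (angle + prop * diff) % TWO_PI


def sample_uniform_angles(n: int, rng: random.Random) -> list[float]:
    """Draw n angles uniformly from the circle."""
    return [rng.uniform(0.0, TWO_PI) for _ in range(n)]


# Two angles on either side of 0: the shorter arc has length 0.2, not 2π - 0.2
print(round(circle_distance(0.1, TWO_PI - 0.1), 6))  # 0.2

# Drifting halfway along that arc ends at angle 0 (up to rounding)
mid = circle_drift(0.1, TWO_PI - 0.1, 0.5)
print(circle_distance(mid, 0.0) < 1e-9)              # True

# Drifting by a fraction p should leave a fraction (1 - p) of the distance
rng = random.Random(0)
start, goal = sample_uniform_angles(2, rng)
moved = circle_drift(start, goal, 0.25)
print(math.isclose(circle_distance(moved, goal),
                   0.75 * circle_distance(start, goal)))  # True
```

The same kind of check (symmetry, triangle inequality, drift shrinking the distance by the expected factor) applies to any custom space.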