Skip to content

Quantum Graph API

kmeanssa_ng.quantum_graph

Quantum graph implementation and utilities.

MostFrequentNode

Bases: RobustificationStrategy[list[Any]]

Strategy to find the most frequent node for each center.

Returns QGCenter objects located at the most frequently visited nodes during the robustification phase.

Source code in kmeanssa_ng/quantum_graph/robustification.py
class MostFrequentNode(RobustificationStrategy[list[Any]]):
    """Strategy to find the most frequent node for each center.

    At every robustification step the node closest to each center is
    recorded. The result places one QGCenter per center on the node that
    was recorded most often for that center.
    """

    def initialize(self, sa: SimulatedAnnealing) -> None:
        """Reset the recorded history and bind the strategy to ``sa``.

        Args:
            sa: The simulated-annealing run whose centers will be observed.
                Its ``space`` must be a QuantumGraph.

        Raises:
            TypeError: If the space is not a QuantumGraph.
        """
        # Function-scope import.
        # NOTE(review): presumably avoids a circular import with .space — confirm.
        from .space import QuantumGraph

        if not isinstance(sa.space, QuantumGraph):
            raise TypeError(
                "MostFrequentNode strategy can only be used with QuantumGraph spaces."
            )

        # One inner list per collected step; inner index i belongs to center i.
        self._central_nodes_collections: list[list] = []
        self.sa = sa

    def collect(self, sa: SimulatedAnnealing) -> None:
        """Collect the closest node for each center at the current step.

        Args:
            sa: The running simulated annealing; ``sa.centers`` is read.
        """
        current_nodes = [center._closest_node() for center in sa.centers]
        self._central_nodes_collections.append(current_nodes)

    def get_result(self) -> list[Any]:
        """Return QGCenter objects at the most frequent nodes.

        Always returns a list, even for k=1. The list is empty when no step
        has been collected. When several nodes are tied for a center,
        ``Counter.most_common`` keeps the one that was recorded first.

        Returns:
            One center per tracked center, built with
            ``self.sa.space.node_as_center``.
        """
        history = self._central_nodes_collections
        if not history:
            return []

        # The first collected step defines how many centers are tracked.
        num_centers = len(history[0])
        robust_nodes = [
            Counter(step[i] for step in history).most_common(1)[0][0]
            for i in range(num_centers)
        ]

        # initialize() already rejected non-QuantumGraph spaces, so the nodes
        # are always converted; raw node IDs are never returned.
        space = self.sa.space
        return [space.node_as_center(node) for node in robust_nodes]

collect(sa)

Collect the closest node for each center at the current step.

Source code in kmeanssa_ng/quantum_graph/robustification.py
def collect(self, sa: SimulatedAnnealing) -> None:
    """Record, for the current step, the node closest to every center.

    Args:
        sa: The running simulated annealing; ``sa.centers`` is read.
    """
    snapshot = []
    for center in sa.centers:
        snapshot.append(center._closest_node())
    # One snapshot per step, in the same order as sa.centers.
    self._central_nodes_collections.append(snapshot)

get_result()

Return QGCenter objects at the most frequent nodes.

Always returns a list of QGCenter objects, even for k=1.

Source code in kmeanssa_ng/quantum_graph/robustification.py
def get_result(self) -> list[Any]:
    """Return QGCenter objects at the most frequent nodes.

    Always returns a list, even for k=1. The list is empty when no step has
    been collected. When several nodes are tied for a center,
    ``Counter.most_common`` keeps the one that was recorded first.

    Returns:
        One center per tracked center, built with
        ``self.sa.space.node_as_center``.
    """
    history = self._central_nodes_collections
    if not history:
        return []

    # The first collected step defines how many centers are tracked.
    num_centers = len(history[0])
    robust_nodes = [
        Counter(step[i] for step in history).most_common(1)[0][0]
        for i in range(num_centers)
    ]

    # initialize() already rejected non-QuantumGraph spaces, so the nodes are
    # always converted; raw node IDs are never returned.
    space = self.sa.space
    return [space.node_as_center(node) for node in robust_nodes]

initialize(sa)

Initialize an empty list to store node collections.

Raises:

Type Description
TypeError

If the space is not a QuantumGraph.

Source code in kmeanssa_ng/quantum_graph/robustification.py
def initialize(self, sa: SimulatedAnnealing) -> None:
    """Bind the strategy to ``sa`` and start with an empty node history.

    Args:
        sa: The simulated-annealing run to observe; its ``space`` is checked.

    Raises:
        TypeError: If the space is not a QuantumGraph.
    """
    from .space import QuantumGraph

    space_is_quantum_graph = isinstance(sa.space, QuantumGraph)
    if not space_is_quantum_graph:
        raise TypeError(
            "MostFrequentNode strategy can only be used with QuantumGraph spaces."
        )

    self.sa = sa
    # One inner list is appended per collected step.
    self._central_nodes_collections: list[list] = []

QGCenter

Bases: QGPoint, Center

A movable cluster center on a quantum graph.

Centers can perform:

- Brownian motion: Random walk for exploration
- Drift: Directed movement toward target points

Attributes:

Name Type Description
space QuantumGraph

The quantum graph this center belongs to.

edge tuple[int, int]

The edge containing this center.

position float

Position along the edge.

Example
graph = QuantumGraph(...)
point = QGPoint(graph, edge=(0, 1), position=0.5)
center = QGCenter(point)
center.brownian_motion(0.1)
center.drift(target_point, 0.5)
Source code in kmeanssa_ng/quantum_graph/center.py
class QGCenter(QGPoint, Center):
    """A movable cluster center on a quantum graph.

    Centers can perform:
    - Brownian motion: Random walk for exploration
    - Drift: Directed movement toward target points

    Attributes:
        space: The quantum graph this center belongs to.
        edge: The edge containing this center.
        position: Position along the edge.

    Example:
        ```python
        graph = QuantumGraph(...)
        point = QGPoint(graph, edge=(0, 1), position=0.5)
        center = QGCenter(point)
        center.brownian_motion(0.1)
        center.drift(target_point, 0.5)
        ```
    """

    def __init__(
        self,
        point: QGPoint,
        rng: Generator | None = None,
    ) -> None:
        """Initialize a center from a point.

        Args:
            point: The initial point location.
            rng: Random number generator. If None, creates a new default_rng().
        """
        super().__init__(point.space, point.edge, point.position)
        self._rng = rng if rng is not None else default_rng()

    def _find_best_neighbor(self, n1: int, n2: int) -> int:
        """Find the neighbor of n1 that is closest to n2.

        Args:
            n1: Starting node.
            n2: Target node.

        Returns:
            The neighbor of n1 that minimizes the path to n2 (``n1`` itself
            when ``n1 == n2``). Ties are broken at random.
        """
        if n1 == n2:
            return n1

        neighbors = list(self.space.neighbors(n1))
        min_distance = np.inf
        best_neighbors = []

        for neighbor in neighbors:
            # Length of the route n1 -> neighbor -> n2.
            distance = self.space.node_distance(
                neighbor, n2
            ) + self.space.node_distance(n1, neighbor)
            if distance < min_distance:
                min_distance = distance
                best_neighbors = [neighbor]
            elif distance == min_distance:
                best_neighbors.append(neighbor)

        # NOTE(review): the tie-break uses the module-level `rd`, not
        # self._rng, so seeding `rng` alone does not fix this choice.
        return rd.choice(best_neighbors)

    def drift(self, target_point: QGPoint, prop_to_travel: float) -> None:
        """Move toward a target point.

        Moves a proportion of the distance to the target point along
        the geodesic path in the quantum graph.

        Args:
            target_point: The point to move toward.
            prop_to_travel: Proportion of distance to travel (0 to 1).

        Raises:
            ValueError: If target_point is None, prop_to_travel is not numeric
                (NaN included), or prop_to_travel is not in [0, 1].
        """
        # Validate target_point
        if target_point is None:
            raise ValueError("target_point cannot be None")

        # Validate prop_to_travel
        try:
            prop_float = float(prop_to_travel)
        except (TypeError, ValueError) as e:
            raise ValueError(
                f"prop_to_travel must be a number, got {type(prop_to_travel).__name__}"
            ) from e

        # Written as a positive range test so that NaN is rejected as well.
        if not 0 <= prop_float <= 1:
            raise ValueError(f"prop_to_travel must be in [0, 1], got {prop_float}")

        quantum_path = self.space.quantum_path(self, target_point)
        path = quantum_path["path"]
        dist_to_target = quantum_path["distance"]
        # Use the validated float: the raw argument may be any float()-able
        # value (e.g. a numeric string) that cannot be multiplied directly.
        dist_to_travel = prop_float * dist_to_target

        # Case 1: Same edge (simple movement)
        if path is None:
            self._drift_on_same_edge(target_point, dist_to_travel)
        else:
            # Case 2: Different edges (traverse graph)
            self._drift_across_edges(target_point, path, dist_to_travel)

    def _drift_on_same_edge(self, target_point: QGPoint, dist_to_travel: float) -> None:
        """Handle drift when center and target are on the same edge.

        Args:
            target_point: The point to move toward.
            dist_to_travel: Distance to move along the edge.
        """
        if self.edge == target_point.edge:
            # Same edge, same orientation
            if self.position < target_point.position:
                self.position += dist_to_travel
            else:
                self.position -= dist_to_travel
        else:
            # Same physical edge, different parametrization
            # NOTE(review): the two positions are compared as-is although they
            # are presumably measured from opposite ends here — confirm that
            # the chosen direction is right for every configuration.
            if self.position < target_point.position:
                self.position -= dist_to_travel
            else:
                self.position += dist_to_travel

    def _drift_across_edges(
        self,
        target_point: QGPoint,
        path: tuple[int, int],
        dist_to_travel: float,
    ) -> None:
        """Handle drift when center and target are on different edges.

        Note that ``target_point`` may be re-oriented in place (its
        ``reverse()`` is called) as a side effect.

        Args:
            target_point: The point to move toward.
            path: Pair ``(next_node, target_node)``: the first node to walk
                to and the node through which the target edge is entered.
            dist_to_travel: Total distance to move.
        """
        next_node, target_node = path

        # Orient edges correctly: the center's edge must not start at
        # next_node, and the target's edge must not end at target_node.
        if self.edge[0] == next_node:
            self.reverse()
        if target_point.edge[1] == target_node:
            target_point.reverse()

        remaining_dist = dist_to_travel
        dist_to_next_node = self.space.get_edge_length(*self.edge) - self.position

        # Traverse edges until we've traveled the required distance
        while remaining_dist > dist_to_next_node:
            remaining_dist -= dist_to_next_node

            if target_node == next_node:
                # Reached target edge
                self.position = 0
                self.edge = target_point.edge
                # Any value above remaining_dist ends the loop.
                dist_to_next_node = remaining_dist + 1  # Exit condition
            else:
                # Move to next edge
                self.position = 0
                cur_node = next_node
                next_node = self._find_best_neighbor(cur_node, target_node)
                self.edge = (cur_node, next_node)
                dist_to_next_node = self.space.get_edge_length(*self.edge)

        self.position += remaining_dist

    def clone(self) -> QGCenter:
        """Create an independent copy of this center.

        The cloned center shares the same quantum graph (space) and the same
        random generator object, but has its own edge and position
        attributes. This is much faster than deepcopy as it doesn't duplicate
        the entire graph structure.

        Note: This creates a shallow copy of the center's state, bypassing
        validation to handle intermediate states during brownian motion.

        Returns:
            A new center of the same class as ``self`` with the same location.

        Example:
            ```python
            original = QGCenter(...)
            copy = original.clone()
            original.brownian_motion(0.1)  # Doesn't affect copy
            ```
        """
        # Bypass __init__ (and its validation); instantiate type(self) so that
        # cloning a subclass instance does not silently downgrade it.
        new_center = object.__new__(type(self))
        new_center._quantum_graph = self._quantum_graph
        new_center._edge = self._edge
        new_center.position = self.position
        new_center._rng = self._rng
        return new_center

    def brownian_motion(self, time_to_travel: float) -> None:
        """Perform Brownian motion on the quantum graph.

        The center performs a random walk with step size proportional
        to sqrt(time_to_travel). Whenever the step reaches a node, the walk
        continues on an edge to a randomly chosen neighbor of that node.

        Args:
            time_to_travel: Time parameter (distance ~ sqrt(time)).

        Raises:
            ValueError: If time_to_travel is negative or not numeric
                (NaN included).
        """
        # Validate time_to_travel
        try:
            time_float = float(time_to_travel)
        except (TypeError, ValueError) as e:
            raise ValueError(
                f"time_to_travel must be a number, got {type(time_to_travel).__name__}"
            ) from e

        # Written as a positive test so that NaN is rejected as well.
        if not time_float >= 0:
            raise ValueError(f"time_to_travel must be non-negative, got {time_float}")

        dist_to_travel = np.sqrt(time_float) * self._rng.standard_normal()
        edge_length = self.space.get_edge_length(*self.edge)

        # A positive draw moves toward edge[1], anything else toward edge[0].
        forward = dist_to_travel > 0
        if forward:
            next_node = self.edge[1]
            dist_to_next_node = edge_length - self.position
        else:
            next_node = self.edge[0]
            dist_to_next_node = self.position

        remaining_dist = abs(dist_to_travel)

        # Traverse edges if motion exceeds current edge
        while remaining_dist >= dist_to_next_node:
            remaining_dist -= dist_to_next_node
            cur_node = next_node
            # NOTE(review): uses the module-level `rd`, not self._rng, so
            # seeding `rng` alone does not make the walk reproducible.
            next_node = rd.choice(list(self.space.neighbors(cur_node)))
            self.edge = (cur_node, next_node)
            self.position = 0
            dist_to_next_node = self.space.get_edge_length(*self.edge)
            # The new edge starts at cur_node, so whatever is left must be
            # walked in the increasing-position direction, even when the
            # step started out backward.
            forward = True

        # Final position update
        self.position += remaining_dist if forward else -remaining_dist

    def __repr__(self) -> str:
        """Detailed string representation."""
        return f"QGCenter(edge={self.edge}, position={self.position:.3f})"

    def __str__(self) -> str:
        """User-friendly string representation."""
        closest = self._closest_node()
        return f"Center near node {closest} [edge {self.edge}, pos={self.position:.3f}]"

__init__(point, rng=None)

Initialize a center from a point.

Parameters:

Name Type Description Default
point QGPoint

The initial point location.

required
rng Generator | None

Random number generator. If None, creates a new default_rng().

None
Source code in kmeanssa_ng/quantum_graph/center.py
def __init__(
    self,
    point: QGPoint,
    rng: Generator | None = None,
) -> None:
    """Build a center located where ``point`` is.

    Args:
        point: Source of the initial space, edge and position.
        rng: Random number generator to use; a fresh ``default_rng()`` is
            created when None is given.
    """
    super().__init__(point.space, point.edge, point.position)
    if rng is None:
        self._rng = default_rng()
    else:
        self._rng = rng

__repr__()

Detailed string representation.

Source code in kmeanssa_ng/quantum_graph/center.py
def __repr__(self) -> str:
    """Return a debug representation with the edge and 3-decimal position."""
    edge = self.edge
    position = self.position
    return f"QGCenter(edge={edge}, position={position:.3f})"

__str__()

User-friendly string representation.

Source code in kmeanssa_ng/quantum_graph/center.py
def __str__(self) -> str:
    """Return a readable description: closest node, edge and position."""
    return (
        f"Center near node {self._closest_node()} "
        f"[edge {self.edge}, pos={self.position:.3f}]"
    )

brownian_motion(time_to_travel)

Perform Brownian motion on the quantum graph.

The center performs a random walk with step size proportional to sqrt(time_to_travel).

Parameters:

Name Type Description Default
time_to_travel float

Time parameter (distance ~ sqrt(time)).

required

Raises:

Type Description
ValueError

If time_to_travel is negative or not numeric.

Source code in kmeanssa_ng/quantum_graph/center.py
def brownian_motion(self, time_to_travel: float) -> None:
    """Perform Brownian motion on the quantum graph.

    The center performs a random walk with step size proportional
    to sqrt(time_to_travel). Whenever the step reaches a node, the walk
    continues on an edge to a randomly chosen neighbor of that node.

    Args:
        time_to_travel: Time parameter (distance ~ sqrt(time)).

    Raises:
        ValueError: If time_to_travel is negative or not numeric
            (NaN included).
    """
    # Validate time_to_travel
    try:
        time_float = float(time_to_travel)
    except (TypeError, ValueError) as e:
        raise ValueError(
            f"time_to_travel must be a number, got {type(time_to_travel).__name__}"
        ) from e

    # Written as a positive test so that NaN is rejected as well.
    if not time_float >= 0:
        raise ValueError(f"time_to_travel must be non-negative, got {time_float}")

    dist_to_travel = np.sqrt(time_float) * self._rng.standard_normal()
    edge_length = self.space.get_edge_length(*self.edge)

    # A positive draw moves toward edge[1], anything else toward edge[0].
    forward = dist_to_travel > 0
    if forward:
        next_node = self.edge[1]
        dist_to_next_node = edge_length - self.position
    else:
        next_node = self.edge[0]
        dist_to_next_node = self.position

    remaining_dist = abs(dist_to_travel)

    # Traverse edges if motion exceeds current edge
    while remaining_dist >= dist_to_next_node:
        remaining_dist -= dist_to_next_node
        cur_node = next_node
        # NOTE(review): uses the module-level `rd`, not self._rng, so seeding
        # `rng` alone does not make the walk reproducible.
        next_node = rd.choice(list(self.space.neighbors(cur_node)))
        self.edge = (cur_node, next_node)
        self.position = 0
        dist_to_next_node = self.space.get_edge_length(*self.edge)
        # The new edge starts at cur_node, so whatever is left must be walked
        # in the increasing-position direction, even when the step started
        # out backward.
        forward = True

    # Final position update
    self.position += remaining_dist if forward else -remaining_dist

clone()

Create an independent copy of this center.

The cloned center shares the same quantum graph (space) but has independent edge and position attributes. This is much faster than deepcopy as it doesn't duplicate the entire graph structure.

Note: This creates a shallow copy of the center's state, bypassing validation to handle intermediate states during brownian motion.

Returns:

Type Description
QGCenter

A new QGCenter with the same location but independent state.

Example
original = QGCenter(...)
copy = original.clone()
original.brownian_motion(0.1)  # Doesn't affect copy
Source code in kmeanssa_ng/quantum_graph/center.py
def clone(self) -> QGCenter:
    """Create an independent copy of this center.

    The cloned center shares the same quantum graph (space) and the same
    random generator object, but has its own edge and position attributes.
    This is much faster than deepcopy as it doesn't duplicate the entire
    graph structure.

    Note: This creates a shallow copy of the center's state, bypassing
    validation to handle intermediate states during brownian motion.

    Returns:
        A new center of the same class as ``self`` with the same location.

    Example:
        ```python
        original = QGCenter(...)
        copy = original.clone()
        original.brownian_motion(0.1)  # Doesn't affect copy
        ```
    """
    # Bypass __init__ (and its validation); instantiate type(self) so that
    # cloning a subclass instance does not silently downgrade it.
    new_center = object.__new__(type(self))
    new_center._quantum_graph = self._quantum_graph
    new_center._edge = self._edge
    new_center.position = self.position
    new_center._rng = self._rng
    return new_center

drift(target_point, prop_to_travel)

Move toward a target point.

Moves a proportion of the distance to the target point along the geodesic path in the quantum graph.

Parameters:

Name Type Description Default
target_point QGPoint

The point to move toward.

required
prop_to_travel float

Proportion of distance to travel (0 to 1).

required

Raises:

Type Description
ValueError

If target_point is None, prop_to_travel is not numeric, or prop_to_travel is not in [0, 1].

Source code in kmeanssa_ng/quantum_graph/center.py
def drift(self, target_point: QGPoint, prop_to_travel: float) -> None:
    """Move toward a target point.

    Moves a proportion of the distance to the target point along
    the geodesic path in the quantum graph.

    Args:
        target_point: The point to move toward.
        prop_to_travel: Proportion of distance to travel (0 to 1).

    Raises:
        ValueError: If target_point is None, prop_to_travel is not numeric
            (NaN included), or prop_to_travel is not in [0, 1].
    """
    # Validate target_point
    if target_point is None:
        raise ValueError("target_point cannot be None")

    # Validate prop_to_travel
    try:
        prop_float = float(prop_to_travel)
    except (TypeError, ValueError) as e:
        raise ValueError(
            f"prop_to_travel must be a number, got {type(prop_to_travel).__name__}"
        ) from e

    # Written as a positive range test so that NaN is rejected as well.
    if not 0 <= prop_float <= 1:
        raise ValueError(f"prop_to_travel must be in [0, 1], got {prop_float}")

    quantum_path = self.space.quantum_path(self, target_point)
    path = quantum_path["path"]
    dist_to_target = quantum_path["distance"]
    # Use the validated float: the raw argument may be any float()-able value
    # (e.g. a numeric string) that cannot be multiplied directly.
    dist_to_travel = prop_float * dist_to_target

    # Case 1: Same edge (simple movement)
    if path is None:
        self._drift_on_same_edge(target_point, dist_to_travel)
    else:
        # Case 2: Different edges (traverse graph)
        self._drift_across_edges(target_point, path, dist_to_travel)

QGPoint

Bases: Point

A point on a quantum graph.

A quantum graph point is located on an edge at a specific position. The edge is represented as a tuple (node1, node2) and the position is the distance from node1 along the edge.

Attributes:

Name Type Description
space QuantumGraph

The quantum graph this point belongs to.

edge tuple[int, int]

The edge (node1, node2) containing this point.

position float

Distance from node1 along the edge.

Example
graph = QuantumGraph(...)
point = QGPoint(graph, edge=(0, 1), position=0.5)
Source code in kmeanssa_ng/quantum_graph/point.py
class QGPoint(Point):
    """A point on a quantum graph.

    A quantum graph point is located on an edge at a specific position.
    The edge is represented as a tuple (node1, node2) and the position
    is the distance from node1 along the edge.

    Attributes:
        space: The quantum graph this point belongs to.
        edge: The edge (node1, node2) containing this point.
        position: Distance from node1 along the edge.

    Example:
        ```python
        graph = QuantumGraph(...)
        point = QGPoint(graph, edge=(0, 1), position=0.5)
        ```
    """

    def __init__(
        self,
        quantum_graph: QuantumGraph,
        edge: tuple[int, int],
        position: float,
    ) -> None:
        """Initialize a point on a quantum graph.

        Args:
            quantum_graph: The quantum graph containing this point.
            edge: Tuple (node1, node2) representing the edge.
            position: Distance from node1 along the edge.

        Raises:
            ValueError: If quantum_graph is None, edge is not a 2-tuple,
                edge doesn't exist in graph, or position is not a number in
                [0, edge_length].
        """
        if quantum_graph is None:
            raise ValueError("quantum_graph cannot be None")

        if not isinstance(edge, tuple) or len(edge) != 2:
            raise ValueError(f"edge must be a tuple of two nodes, got {edge}")

        # Check edge exists in graph (allow self-loops)
        if edge[0] != edge[1] and edge not in quantum_graph.edges:
            # Try reversed edge
            if (edge[1], edge[0]) not in quantum_graph.edges:
                raise ValueError(f"Edge {edge} does not exist in the graph")

        self._quantum_graph = quantum_graph
        self._edge = edge

        # Validate and set position
        self._validate_and_set_position(position)

    def _validate_and_set_position(self, position: float) -> None:
        """Validate and set the position on the edge.

        Args:
            position: Position along the edge from node1.

        Raises:
            ValueError: If position is not numeric (NaN included) or outside
                [0, edge_length].
        """
        # Check if position is numeric
        try:
            position_float = float(position)
        except (TypeError, ValueError) as e:
            raise ValueError(
                f"Position must be a number, got {type(position).__name__}"
            ) from e

        # Check if position is non-negative. Written as a positive test so
        # that NaN, for which every comparison is False, is rejected too.
        if not position_float >= 0:
            raise ValueError(f"Position must be non-negative, got {position_float}")

        # Check if position is within edge length
        edge_length = self.space.get_edge_length(*self._edge)
        if position_float > edge_length:
            raise ValueError(
                f"Position {position_float} exceeds edge length {edge_length} for edge {self._edge}"
            )

        self.position = position_float

    @property
    def space(self) -> QuantumGraph:
        """The quantum graph this point belongs to."""
        return self._quantum_graph

    @property
    def edge(self) -> tuple[int, int]:
        """The edge containing this point.

        Raises:
            ValueError: If the stored edge is neither in the graph nor a
                self-loop (node, node).
        """
        is_self_loop = self._edge[0] == self._edge[1]
        if self._edge not in self.space.edges and not is_self_loop:
            raise ValueError("The edge does not belong to the graph")
        return self._edge

    @edge.setter
    def edge(self, new_edge: tuple[int, int]) -> None:
        """Set the edge containing this point.

        Args:
            new_edge: New edge (node1, node2).

        Raises:
            ValueError: If the edge doesn't belong to the graph.

        Note:
            Position validation is not performed here to allow internal
            operations (brownian_motion, drift) to change edge first, then
            update position. Position validation is done in __init__.
        """
        if not ((new_edge in self.space.edges) or (new_edge[0] == new_edge[1])):
            raise ValueError("The edge does not belong to the graph")

        self._edge = new_edge

    def _closest_node(self) -> int:
        """Get the closest node to this point.

        Returns:
            The node (edge[0] or edge[1]) closest to this point; edge[1]
            when the point is exactly at the middle of the edge.
        """
        edge_length = self.space.get_edge_length(*self.edge)
        if self.position < edge_length / 2:
            return self.edge[0]
        else:
            return self.edge[1]

    def reverse(self) -> None:
        """Reverse the edge orientation and adjust position.

        Changes edge from (a, b) to (b, a) and updates position accordingly.
        """
        self.edge = (self.edge[1], self.edge[0])
        edge_length = self.space.get_edge_length(*self.edge)
        # Same physical location, now measured from the other end.
        self.position = edge_length - self.position

    def __str__(self) -> str:
        """String representation of the point."""
        # The graph name is shown only when the space has a non-empty one.
        name = (
            f" '{self.space.name}'"
            if hasattr(self.space, "name") and self.space.name
            else ""
        )
        return f"QGPoint on{name} edge {self.edge} at position {self.position:.3f}"

    def __repr__(self) -> str:
        """Detailed string representation."""
        return f"QGPoint(edge={self.edge}, position={self.position})"

edge property writable

The edge containing this point.

space property

The quantum graph this point belongs to.

__init__(quantum_graph, edge, position)

Initialize a point on a quantum graph.

Parameters:

Name Type Description Default
quantum_graph QuantumGraph

The quantum graph containing this point.

required
edge tuple[int, int]

Tuple (node1, node2) representing the edge.

required
position float

Distance from node1 along the edge.

required

Raises:

Type Description
ValueError

If quantum_graph is None, edge doesn't exist in graph, or position is outside [0, edge_length].

Source code in kmeanssa_ng/quantum_graph/point.py
def __init__(
    self,
    quantum_graph: QuantumGraph,
    edge: tuple[int, int],
    position: float,
) -> None:
    """Create a point located on ``edge`` of ``quantum_graph``.

    Args:
        quantum_graph: The quantum graph containing this point.
        edge: Tuple (node1, node2) representing the edge.
        position: Distance from node1 along the edge.

    Raises:
        ValueError: If quantum_graph is None, edge is not a 2-tuple, edge
            doesn't exist in graph, or position is rejected by
            ``_validate_and_set_position``.
    """
    if quantum_graph is None:
        raise ValueError("quantum_graph cannot be None")

    well_formed = isinstance(edge, tuple) and len(edge) == 2
    if not well_formed:
        raise ValueError(f"edge must be a tuple of two nodes, got {edge}")

    node_a, node_b = edge
    # Self-loops (node, node) are accepted without a lookup; any other edge
    # must be known to the graph in one of its two orientations.
    if node_a != node_b:
        if edge not in quantum_graph.edges:
            if (node_b, node_a) not in quantum_graph.edges:
                raise ValueError(f"Edge {edge} does not exist in the graph")

    self._quantum_graph = quantum_graph
    self._edge = edge

    # The position check needs the graph and the edge stored above.
    self._validate_and_set_position(position)

__repr__()

Detailed string representation.

Source code in kmeanssa_ng/quantum_graph/point.py
def __repr__(self) -> str:
    """Return a debug representation with the edge and the raw position."""
    edge = self.edge
    position = self.position
    return f"QGPoint(edge={edge}, position={position})"

__str__()

String representation of the point.

Source code in kmeanssa_ng/quantum_graph/point.py
def __str__(self) -> str:
    """Return a readable description, including the graph name when set."""
    label = ""
    # Only a space exposing a non-empty ``name`` contributes a label.
    if hasattr(self.space, "name") and self.space.name:
        label = f" '{self.space.name}'"
    return f"QGPoint on{label} edge {self.edge} at position {self.position:.3f}"

reverse()

Reverse the edge orientation and adjust position.

Changes edge from (a, b) to (b, a) and updates position accordingly.

Source code in kmeanssa_ng/quantum_graph/point.py
def reverse(self) -> None:
    """Flip the orientation of the edge while keeping the same location.

    The edge (a, b) becomes (b, a) and the position is re-measured from
    the new first node.
    """
    start, end = self.edge
    self.edge = (end, start)
    # Same physical location, now measured from the other end.
    self.position = self.space.get_edge_length(*self.edge) - self.position

QuantumGraph

Bases: Graph, Space

A quantum graph is a metric graph where points can lie on edges.

This class extends NetworkX Graph to provide:

- Distance computation between points on edges
- Sampling of random points and centers
- k-means clustering support

Each edge should have a 'length' attribute representing its metric length. Nodes and edges can have 'weight' and 'distribution' attributes for sampling.

Attributes:

Name Type Description
diameter float

The diameter of the graph (max distance between nodes).

node_position dict

Layout positions for visualization.

Example
graph = QuantumGraph()
graph.add_edge(0, 1, length=1.0)
graph.add_edge(1, 2, length=2.0)
graph.precomputing()  # Cache pairwise distances

points = graph.sample_points(100)
centers = graph.sample_centers(5)
Source code in kmeanssa_ng/quantum_graph/space.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
class QuantumGraph(nx.Graph, Space):
    """A quantum graph is a metric graph where points can lie on edges.

    This class extends NetworkX Graph to provide:
    - Distance computation between points on edges
    - Sampling of random points and centers
    - k-means clustering support

    Each edge should have a 'length' attribute representing its metric length.
    Nodes and edges can have 'weight' and 'distribution' attributes for sampling.

    A point is described by an edge ``(u, v)`` and a position measured along
    that edge starting from ``u`` (position 0 is the node ``u`` itself).

    Attributes:
        diameter: The diameter of the graph (max distance between nodes).
        node_position: Layout positions for visualization.

    Example:
        ```python
        graph = QuantumGraph()
        graph.add_edge(0, 1, length=1.0)
        graph.add_edge(1, 2, length=2.0)
        graph.precomputing()  # Cache pairwise distances

        points = graph.sample_points(100)
        centers = graph.sample_centers(5)
        ```
    """

    def __init__(
        self, incoming_graph_data=None, precompute: bool = False, **attr
    ) -> None:
        """Initialize a quantum graph.

        Args:
            incoming_graph_data: Input graph data (see networkx.Graph).
            precompute: If True, automatically precompute distances after
                initialization (only done when the graph already has nodes).
            **attr: Additional graph attributes.
        """
        super().__init__(incoming_graph_data, **attr)
        # Caches filled by precomputing(); None means "not computed yet".
        self._pairwise_nodes_distance: dict[int, dict[int, float]] | None = None
        self._pairwise_nodes_distance_array: np.ndarray | None = None
        self._node_to_index: dict[int, int] | None = None
        # 0.0 doubles as the "not computed yet" marker for the diameter.
        self._diameter: float = 0.0
        self._node_position: dict | None = None

        if precompute and self.number_of_nodes() > 0:
            self.precomputing()

    def add_edge(self, u_for_edge, v_for_edge, **attr) -> None:
        """Add an edge with validation of the length attribute.

        Args:
            u_for_edge: First node.
            v_for_edge: Second node.
            **attr: Edge attributes. Must include 'length' with a positive value.

        Raises:
            ValueError: If 'length' is missing, not positive (including NaN),
                or not a number.
        """
        if "length" not in attr:
            raise ValueError(
                f"Edge ({u_for_edge}, {v_for_edge}) must have a 'length' attribute"
            )

        length = attr["length"]

        # Check if length is a number
        try:
            length_float = float(length)
        except (TypeError, ValueError) as e:
            raise ValueError(
                f"Edge ({u_for_edge}, {v_for_edge}) length must be a number, got {type(length).__name__}"
            ) from e

        # `not (x > 0)` instead of `x <= 0`: every comparison with NaN is
        # False, so the latter would silently accept a NaN length.
        if not length_float > 0:
            raise ValueError(
                f"Edge ({u_for_edge}, {v_for_edge}) length must be positive, got {length_float}"
            )

        super().add_edge(u_for_edge, v_for_edge, **attr)

    @property
    def diameter(self) -> float:
        """Compute and cache the graph diameter.

        The value is recomputed on every access for as long as it is 0.0,
        because 0.0 is also the "not computed yet" marker.

        Returns:
            Maximum distance between any two nodes.
        """
        if self._diameter == 0.0:
            cached = self._pairwise_nodes_distance_array
            if cached is not None and cached.size > 0:
                # Same values as node_distance() would return, already in a matrix.
                self._diameter = float(cached.max())
            else:
                for n1 in self.nodes:
                    for n2 in self.nodes:
                        d = self.node_distance(n1, n2)
                        if d > self._diameter:
                            self._diameter = d
        return self._diameter

    def validate_edge_lengths(self) -> None:
        """Validate that all edges have positive length attributes.

        Raises:
            ValueError: If any edge is missing 'length' or has invalid length
                (not a number, NaN, zero or negative).
        """
        for u, v, data in self.edges(data=True):
            if "length" not in data:
                raise ValueError(f"Edge ({u}, {v}) missing 'length' attribute")

            length = data["length"]
            try:
                length_float = float(length)
            except (TypeError, ValueError) as e:
                raise ValueError(
                    f"Edge ({u}, {v}) length must be a number, got {type(length).__name__}"
                ) from e

            # Written as `not >` so that NaN is rejected as well.
            if not length_float > 0:
                raise ValueError(
                    f"Edge ({u}, {v}) has invalid length {length_float}, must be positive"
                )

    def precomputing(self) -> None:
        """Precompute and cache all pairwise node distances.

        This significantly speeds up distance queries.
        Should be called once after graph construction: once the cache
        exists, later calls validate the graph again but do not rebuild it.

        Raises:
            ValueError: If graph is not connected or has invalid edge lengths.
        """
        # Validate edge lengths first
        self.validate_edge_lengths()

        # Check connectivity
        if self.number_of_nodes() > 0 and not nx.is_connected(self):
            num_components = nx.number_connected_components(self)
            raise ValueError(
                f"Graph must be connected for distance precomputing. "
                f"Found {num_components} connected components."
            )

        if self._pairwise_nodes_distance is None:
            self._pairwise_nodes_distance = dict(
                nx.all_pairs_dijkstra_path_length(self, weight="length")
            )

            # Dense copy of the same distances, indexed through _node_to_index,
            # for the array-based helpers.
            node_list = list(self.nodes())
            n = len(node_list)
            self._node_to_index = {node: i for i, node in enumerate(node_list)}
            self._pairwise_nodes_distance_array = np.zeros((n, n), dtype=np.float64)

            for i, node_i in enumerate(node_list):
                row = self._pairwise_nodes_distance[node_i]
                for j, node_j in enumerate(node_list):
                    self._pairwise_nodes_distance_array[i, j] = row[node_j]

    @property
    def node_position(self) -> dict:
        """Compute layout positions for visualization.

        On first access this stores a 'drawing' attribute (1 / length) on
        every edge and uses it as the spring-layout weight; the resulting
        layout is cached.

        Returns:
            Dictionary mapping nodes to (x, y) positions.
        """
        if self._node_position is None:
            drawing = nx.get_edge_attributes(self, "length")
            drawing_weights = {
                (i, j): {"drawing": 1 / drawing[(i, j)]} for i, j in drawing
            }
            nx.set_edge_attributes(self, drawing_weights)
            self._node_position = nx.layout.spring_layout(self, weight="drawing")
        return self._node_position

    def node_distance(self, n1: int, n2: int) -> float:
        """Compute shortest path distance between two nodes.

        Uses the cache built by precomputing() when it exists, otherwise
        runs a shortest-path query.

        Args:
            n1: First node.
            n2: Second node.

        Returns:
            Shortest path length using 'length' edge attribute.
        """
        if self._pairwise_nodes_distance is not None:
            return self._pairwise_nodes_distance[n1][n2]
        return nx.shortest_path_length(self, n1, n2, weight="length")

    def get_edge_length(self, n1: int, n2: int) -> float:
        """Get the length of an edge.

        Args:
            n1: First node of the edge.
            n2: Second node of the edge.

        Returns:
            Edge length.

        Raises:
            ValueError: If edge does not exist.
        """
        edge_data = self.get_edge_data(n1, n2)
        if edge_data is None:
            raise ValueError(f"Edge ({n1}, {n2}) does not exist in graph")
        return edge_data["length"]

    def quantum_path(self, p1: QGPoint, p2: QGPoint) -> dict[str, float | tuple | None]:
        """Compute the geodesic between two points on the graph.

        Args:
            p1: First point.
            p2: Second point.

        Returns:
            Dictionary with:
                - 'distance': geodesic distance
                - 'path': (node_from_p1_edge, node_from_p2_edge) or None if same edge
        """
        edge1, edge2 = p1.edge, p2.edge
        pos1, pos2 = p1.position, p2.position
        length1 = self.get_edge_length(*edge1)
        length2 = self.get_edge_length(*edge2)

        # One candidate per pair (endpoint of p1's edge, endpoint of p2's edge):
        # walk from p1 to the endpoint, between the endpoints, then to p2.
        d0 = self.node_distance(edge1[0], edge2[0]) + pos1 + pos2
        d1 = self.node_distance(edge1[0], edge2[1]) + pos1 + (length2 - pos2)
        d2 = self.node_distance(edge1[1], edge2[0]) + (length1 - pos1) + pos2
        d3 = (
            self.node_distance(edge1[1], edge2[1]) + (length1 - pos1) + (length2 - pos2)
        )

        # Find minimum distance (break ties randomly)
        distances = np.array([d0, d1, d2, d3])
        all_idx = np.where(distances == distances.min())[0]
        idx = rd.choice(all_idx)
        d_min = distances[idx]

        # Same edge written in the opposite orientation: p2 sits at
        # length1 - pos2 when measured from edge1[0].
        if (
            edge1[0] == edge2[1]
            and edge1[1] == edge2[0]
            and d_min > abs(length1 - pos1 - pos2)
        ):
            return {"distance": abs(length1 - pos1 - pos2), "path": None}

        # Same edge, same orientation: walking along the edge may be shorter.
        if edge1 == edge2 and abs(pos1 - pos2) < d_min:
            return {"distance": abs(pos1 - pos2), "path": None}

        # Determine path nodes
        if idx == 0:
            path = (edge1[0], edge2[0])
        elif idx == 1:
            path = (edge1[0], edge2[1])
        elif idx == 2:
            path = (edge1[1], edge2[0])
        else:
            path = (edge1[1], edge2[1])

        return {"distance": d_min, "path": path}

    def distance(self, p1: QGPoint, p2: QGPoint) -> float:
        """Compute distance between two points on the graph.

        Args:
            p1: First point.
            p2: Second point.

        Returns:
            Geodesic distance.
        """
        return self.quantum_path(p1, p2)["distance"]

    def _centers_to_arrays(
        self, centers: list[QGCenter]
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Pack centers into the flat arrays expected by the array-based helpers.

        Args:
            centers: List of k centers.

        Returns:
            Four arrays of shape (k,): index of each center's first edge node,
            index of its second edge node, its position, and its edge length.
        """
        k = len(centers)
        edges_0 = np.empty(k, dtype=np.int32)
        edges_1 = np.empty(k, dtype=np.int32)
        positions = np.empty(k, dtype=np.float64)
        lengths = np.empty(k, dtype=np.float64)

        for i, center in enumerate(centers):
            edge = center.edge
            edges_0[i] = self._node_to_index[edge[0]]
            edges_1[i] = self._node_to_index[edge[1]]
            positions[i] = center.position
            lengths[i] = self.get_edge_length(*edge)

        return edges_0, edges_1, positions, lengths

    def distances_from_centers(
        self, centers: list[QGCenter], target: QGPoint
    ) -> np.ndarray:
        """Compute distances from multiple centers to a single target point.

        This is a Numba-accelerated operation that efficiently computes distances
        from all centers to one target point. Works for any target location
        (on nodes or edges).

        Args:
            centers: List of k centers to compute distances from.
            target: The target point (can be on a node or edge).

        Returns:
            Array of shape (k,) with distances from each center to target.

        Raises:
            ValueError: If pairwise distances not precomputed.

        Example:
            ```python
            centers = graph.sample_centers(5)
            target = graph.sample_points(1)[0]
            distances = graph.distances_from_centers(centers, target)
            closest_idx = np.argmin(distances)
            closest_center = centers[closest_idx]
            ```
        """
        if self._pairwise_nodes_distance_array is None:
            raise ValueError("Must call precomputing() before distances_from_centers")

        (
            center_edges_0,
            center_edges_1,
            center_positions,
            center_lengths,
        ) = self._centers_to_arrays(centers)

        # Extract target information
        target_edge = target.edge
        target_edge_0 = self._node_to_index[target_edge[0]]
        target_edge_1 = self._node_to_index[target_edge[1]]
        target_pos = target.position
        target_length = self.get_edge_length(*target_edge)

        # Call Numba-accelerated function
        return _batch_distances_numba(
            center_edges_0,
            center_edges_1,
            center_positions,
            center_lengths,
            target_edge_0,
            target_edge_1,
            target_pos,
            target_length,
            self._pairwise_nodes_distance_array,
        )

    def light_sample_points(self, n: int) -> list[QGPoint]:
        """Fast sampling of points at random nodes.

        Nodes are drawn with replacement; each point is placed at position 0
        of an edge going from the node to one of its neighbors, chosen at
        random.

        Args:
            n: Number of points to sample.

        Returns:
            List of n points at random nodes.
        """
        nodes = rd.choices(list(self.nodes()), k=n)
        points = []
        for node in nodes:
            neighbor = rd.choice(list(self.neighbors(node)))
            points.append(QGPoint(self, (node, neighbor), 0))
        return points

    def center_from_point(self, point: QGPoint) -> QGCenter:
        """Create a QGCenter object from a QGPoint object."""
        return QGCenter(point)

    def nodes_as_points(self) -> list[QGPoint]:
        """Convert all nodes to points.

        Each point is placed at position 0 of an edge going from the node to
        one of its neighbors, chosen at random.

        Returns:
            List of points, one at each node.
        """
        points = []
        for node in self.nodes:
            neighbor = rd.choice(list(self.neighbors(node)))
            points.append(QGPoint(self, (node, neighbor), 0))
        return points

    def node_as_center(self, node: int) -> QGCenter:
        """Create a center at a specific node.

        Args:
            node: The node to place the center at.

        Returns:
            A center located at the node.
        """
        neighbor = rd.choice(list(self.neighbors(node)))
        point = QGPoint(self, (node, neighbor), 0)
        return QGCenter(point)

    def _center_to_node_distance(self, center: QGCenter, node: int) -> float:
        """Return the distance from a center lying on an edge to a node.

        The center can leave its edge through either endpoint, so the
        distance is the shorter of the two routes.
        """
        first, second = center.edge[0], center.edge[1]
        position = center.position
        via_first = self.node_distance(first, node) + position
        via_second = self.node_distance(second, node) + (
            self.get_edge_length(first, second) - position
        )
        return min(via_first, via_second)

    def compute_clusters(self, centers: list[QGCenter]) -> None:
        """Assign each node to its nearest center.

        Updates node 'cluster' attribute with the index (in `centers`) of the
        nearest center. The position of each center along its edge is taken
        into account; for a center at position 0 this is the distance from
        the first node of its edge.

        Args:
            centers: List of cluster centers.
        """
        for node in self.nodes:
            distances = np.array(
                [self._center_to_node_distance(center, node) for center in centers]
            )
            # Stored as a plain int rather than a NumPy scalar.
            self.nodes[node]["cluster"] = int(np.argmin(distances))

    def calculate_energy(
        self,
        centers: list[QGCenter],
        how: str = "uniform",
    ) -> float:
        """Calculate k-means energy for given centers.

        Args:
            centers: List of cluster centers.
            how: Energy calculation mode:
                - "uniform": Use uniform distribution over nodes
                - "obs": Weight by observed point counts at nodes
                Any value other than "uniform" is handled as "obs".

        Returns:
            Average squared distance to nearest center. In "obs" mode, 0.0
            when no node has a positive 'nb_obs'.
        """
        if how == "uniform":
            energy = 0.0
            for node in self.nodes:
                neighbor = next(self.neighbors(node))
                point = QGPoint(self, (node, neighbor), 0)
                min_dist_sq = min(
                    self.distance(center, point) ** 2 for center in centers
                )
                energy += min_dist_sq
            return energy / self.number_of_nodes()
        else:  # how == "obs"
            energy = 0.0
            total_obs = 0
            for node, data in self.nodes(data=True):
                nb_obs = data.get("nb_obs", 0)
                if nb_obs > 0:
                    neighbor = rd.choice(list(self.neighbors(node)))
                    point = QGPoint(self, (node, neighbor), 0)
                    min_dist_sq = min(
                        self.distance(center, point) ** 2 for center in centers
                    )
                    energy += min_dist_sq * nb_obs
                    total_obs += nb_obs

            return energy / total_obs if total_obs > 0 else 0.0

    def calculate_energy_numba(
        self, centers: list[QGCenter], how: str = "uniform"
    ) -> float:
        """Numba-accelerated energy calculation for centers.

        Uses precomputed distance matrix for fast computation. This method is
        automatically detected and used by MinimizeEnergy strategy when available.

        Args:
            centers: List of cluster centers.
            how: Energy calculation mode:
                - "uniform": Use uniform distribution over nodes
                - "obs": Weight by observed point counts at nodes
                Any value other than "uniform" is handled as "obs".

        Returns:
            Average squared distance to nearest center.

        Raises:
            ValueError: If pairwise distances not precomputed.

        Note:
            Requires precomputing() to have been called first.
        """
        if self._pairwise_nodes_distance_array is None:
            raise ValueError("Must call precomputing() before calculate_energy_numba")

        (
            center_edges_0,
            center_edges_1,
            center_positions,
            center_lengths,
        ) = self._centers_to_arrays(centers)

        # Every node becomes a point at position 0 of the edge to its first
        # listed neighbor.
        nodes = list(self.nodes())
        n = len(nodes)
        point_edges_0 = np.empty(n, dtype=np.int32)
        point_edges_1 = np.empty(n, dtype=np.int32)
        point_positions = np.zeros(n, dtype=np.float64)  # All at position 0
        point_lengths = np.empty(n, dtype=np.float64)

        for i, node in enumerate(nodes):
            neighbor = next(self.neighbors(node))
            edge = (node, neighbor)
            point_edges_0[i] = self._node_to_index[edge[0]]
            point_edges_1[i] = self._node_to_index[edge[1]]
            point_lengths[i] = self.get_edge_length(*edge)

        if how == "uniform":
            return _calculate_energy_uniform_numba(
                center_edges_0,
                center_edges_1,
                center_positions,
                center_lengths,
                point_edges_0,
                point_edges_1,
                point_positions,
                point_lengths,
                self._pairwise_nodes_distance_array,
            )
        else:  # how == "obs"
            point_nb_obs = np.array(
                [data.get("nb_obs", 0) for _, data in self.nodes(data=True)],
                dtype=np.int32,
            )
            return _calculate_energy_obs_numba(
                center_edges_0,
                center_edges_1,
                center_positions,
                center_lengths,
                point_edges_0,
                point_edges_1,
                point_positions,
                point_lengths,
                point_nb_obs,
                self._pairwise_nodes_distance_array,
            )

    def distance_matrix(self) -> np.ndarray:
        """Compute the pairwise distance matrix between all nodes.

        Rows and columns follow the iteration order of the graph's nodes.
        The matrix is also stored on the instance as `matrix_distance`.

        Returns:
            n×n matrix of pairwise distances.

        Raises:
            ValueError: If pairwise distances not precomputed.
        """
        if self._pairwise_nodes_distance is None:
            raise ValueError("Must call precomputing() first")

        n = self.number_of_nodes()
        node_list = list(self.nodes())
        matrix_distances = np.zeros((n, n))

        for i, node_i in enumerate(node_list):
            for j, node_j in enumerate(node_list):
                matrix_distances[i, j] = self._pairwise_nodes_distance[node_i][node_j]

        self.matrix_distance = matrix_distances
        return matrix_distances

    def index_to_centers(self, indices: list[int]) -> list[QGCenter]:
        """Convert node indices to centers.

        Args:
            indices: List of node indices (positions in the graph's node
                iteration order).

        Returns:
            List of centers at the specified nodes.
        """
        node_list = list(self.nodes())
        return [self.node_as_center(node_list[idx]) for idx in indices]

    def draw(
        self,
        color_by: str | None = "cluster",
        centers: list[QGCenter] | None = None,
        node_size_by_obs: bool = True,
        ax: "plt.Axes | None" = None,
        **kwargs,
    ):
        """Draws the graph using matplotlib.

        This method provides a flexible way to visualize the graph, its clusters,
        and the results of the k-means algorithm. To use this method, you need
        to install the 'plot' extras: `pip install kmeanssa-ng[plot]`

        Args:
            color_by: Node attribute to use for coloring.
                - "cluster": Colors nodes by their assigned cluster.
                - "block": Colors nodes by a pre-defined 'block' attribute.
                - None: All nodes will have the same default color. (Default: "skyblue")
            centers: A list of QGCenter objects to be highlighted on the graph.
                Each center is shown at the first node of its edge.
            node_size_by_obs: If True, sizes nodes based on 'nb_obs' attribute.
            ax: A matplotlib axes object to draw on. If None, a new figure
                and axes are created.
            **kwargs: Additional arguments passed to nx.draw(). A 'figsize'
                entry is only used when a new figure is created and is never
                forwarded to nx.draw().

        Raises:
            ImportError: If matplotlib is not installed.
        """
        try:
            import matplotlib.pyplot as plt
        except ImportError as e:
            raise ImportError(
                "Plotting requires matplotlib. Please install it with "
                "`pip install kmeanssa-ng[plot]`"
            ) from e

        # Always remove 'figsize' so it cannot leak into nx.draw() when the
        # caller supplies their own axes.
        figsize = kwargs.pop("figsize", (12, 12))
        if ax is None:
            _, ax = plt.subplots(figsize=figsize)

        # Use pre-computed layout for consistency
        pos = self.node_position

        # Determine node colors
        node_colors = "skyblue"  # Default color
        if color_by:
            # Check if any node has the attribute
            if not any(color_by in n[1] for n in self.nodes(data=True)):
                print(
                    f"Warning: Node attribute '{color_by}' not found. Using default color."
                )
            else:
                node_colors = [self.nodes[n].get(color_by, 0) for n in self.nodes()]

        # Determine node sizes
        node_sizes = 300  # Default size
        if node_size_by_obs:
            if not any("nb_obs" in n[1] for n in self.nodes(data=True)):
                print("Warning: Node attribute 'nb_obs' not found. Using default size.")
            else:
                # Scale size by number of observations, with a minimum size
                node_sizes = [
                    self.nodes[n].get("nb_obs", 0) * 50 + 100 for n in self.nodes()
                ]

        # Draw the graph
        nx.draw(
            self,
            pos=pos,
            ax=ax,
            node_color=node_colors,
            node_size=node_sizes,
            with_labels=False,
            **kwargs,
        )

        # Highlight centers
        if centers:
            center_nodes = [c.edge[0] for c in centers]
            node_list = list(self.nodes)
            center_node_indices = [node_list.index(n) for n in center_nodes]

            # Highlighted nodes are drawn at twice their regular size.
            center_sizes = (
                [node_sizes[i] * 2 for i in center_node_indices]
                if isinstance(node_sizes, list)
                else node_sizes * 2
            )

            nx.draw_networkx_nodes(
                self,
                pos,
                nodelist=center_nodes,
                ax=ax,
                node_size=center_sizes,
                edgecolors="grey",
                linewidths=2,
            )

diameter property

Compute and cache the graph diameter.

Returns:

Type Description
float

Maximum distance between any two nodes.

node_position property

Compute layout positions for visualization.

Returns:

Type Description
dict

Dictionary mapping nodes to (x, y) positions.

__init__(incoming_graph_data=None, precompute=False, **attr)

Initialize a quantum graph.

Parameters:

Name Type Description Default
incoming_graph_data

Input graph data (see networkx.Graph).

None
precompute bool

If True, automatically precompute distances after initialization.

False
**attr

Additional graph attributes.

{}
Source code in kmeanssa_ng/quantum_graph/space.py
def __init__(
    self, incoming_graph_data=None, precompute: bool = False, **attr
) -> None:
    """Build the graph and set up its (initially empty) caches.

    Args:
        incoming_graph_data: Data used to populate the graph, forwarded to
            networkx.Graph.
        precompute: When True and the graph already contains nodes, the
            pairwise distances are precomputed right away.
        **attr: Extra graph attributes, forwarded to networkx.Graph.
    """
    super().__init__(incoming_graph_data, **attr)

    # Layout and diameter caches.
    self._node_position: dict | None = None
    self._diameter: float = 0.0

    # Distance caches, populated by precomputing().
    self._node_to_index: dict[int, int] | None = None
    self._pairwise_nodes_distance_array: np.ndarray | None = None
    self._pairwise_nodes_distance: dict[int, dict[int, float]] | None = None

    has_nodes = self.number_of_nodes() > 0
    if precompute and has_nodes:
        self.precomputing()

add_edge(u_for_edge, v_for_edge, **attr)

Add an edge with validation of the length attribute.

Parameters:

Name Type Description Default
u_for_edge

First node.

required
v_for_edge

Second node.

required
**attr

Edge attributes. Must include 'length' with a positive value.

{}

Raises:

Type Description
ValueError

If 'length' is missing, not positive, or not a number.

Source code in kmeanssa_ng/quantum_graph/space.py
def add_edge(self, u_for_edge, v_for_edge, **attr) -> None:
    """Add an edge with validation of the length attribute.

    Args:
        u_for_edge: First node.
        v_for_edge: Second node.
        **attr: Edge attributes. Must include 'length' with a positive value.

    Raises:
        ValueError: If 'length' is missing, not positive (including NaN),
            or not a number.
    """
    if "length" not in attr:
        raise ValueError(
            f"Edge ({u_for_edge}, {v_for_edge}) must have a 'length' attribute"
        )

    length = attr["length"]

    # Check if length is a number
    try:
        length_float = float(length)
    except (TypeError, ValueError) as e:
        raise ValueError(
            f"Edge ({u_for_edge}, {v_for_edge}) length must be a number, got {type(length).__name__}"
        ) from e

    # `not (x > 0)` instead of `x <= 0`: every comparison with NaN is False,
    # so the latter would silently accept a NaN length.
    if not length_float > 0:
        raise ValueError(
            f"Edge ({u_for_edge}, {v_for_edge}) length must be positive, got {length_float}"
        )

    super().add_edge(u_for_edge, v_for_edge, **attr)

calculate_energy(centers, how='uniform')

Calculate k-means energy for given centers.

Parameters:

Name Type Description Default
centers list[QGCenter]

List of cluster centers.

required
how str

Energy calculation mode:

- "uniform": Use uniform distribution over nodes
- "obs": Weight by observed point counts at nodes

'uniform'

Returns:

Type Description
float

Average squared distance to nearest center.

Source code in kmeanssa_ng/quantum_graph/space.py
def calculate_energy(
    self,
    centers: list[QGCenter],
    how: str = "uniform",
) -> float:
    """Evaluate the k-means energy of a set of centers.

    Every node is turned into a point at position 0 of one of its incident
    edges, and the squared distance to its closest center is accumulated.

    Args:
        centers: List of cluster centers.
        how: Energy calculation mode:
            - "uniform": every node counts once
            - "obs": every node counts 'nb_obs' times (nodes without
              observations are ignored)
            Any value other than "uniform" is handled as "obs".

    Returns:
        Average squared distance to the nearest center. In "obs" mode, 0.0
        when no node has a positive 'nb_obs'.
    """

    def nearest_squared(location):
        # Squared distance from `location` to its closest center.
        return min(self.distance(center, location) ** 2 for center in centers)

    if how != "uniform":
        weighted_sum = 0.0
        observation_count = 0
        for vertex, attributes in self.nodes(data=True):
            count = attributes.get("nb_obs", 0)
            if not count > 0:
                continue
            adjacent = rd.choice(list(self.neighbors(vertex)))
            location = QGPoint(self, (vertex, adjacent), 0)
            weighted_sum += nearest_squared(location) * count
            observation_count += count

        if observation_count > 0:
            return weighted_sum / observation_count
        return 0.0

    total = 0.0
    for vertex in self.nodes:
        adjacent = next(self.neighbors(vertex))
        location = QGPoint(self, (vertex, adjacent), 0)
        total += nearest_squared(location)
    return total / self.number_of_nodes()

calculate_energy_numba(centers, how='uniform')

Numba-accelerated energy calculation for centers.

Uses precomputed distance matrix for fast computation. This method is automatically detected and used by MinimizeEnergy strategy when available.

Parameters:

Name Type Description Default
centers list[QGCenter]

List of cluster centers.

required
how str

Energy calculation mode:

- "uniform": Use uniform distribution over nodes
- "obs": Weight by observed point counts at nodes

'uniform'

Returns:

Type Description
float

Average squared distance to nearest center.

Raises:

Type Description
ValueError

If pairwise distances not precomputed.

Note

Requires precomputing() to have been called first.

Source code in kmeanssa_ng/quantum_graph/space.py
def calculate_energy_numba(
    self, centers: list[QGCenter], how: str = "uniform"
) -> float:
    """Evaluate the k-means energy through the array-based helpers.

    Centers and nodes are flattened into NumPy arrays (edge node indices,
    positions, edge lengths) and handed, together with the precomputed
    distance matrix, to the Numba energy routines. This method is
    automatically detected and used by MinimizeEnergy strategy when available.

    Args:
        centers: List of cluster centers.
        how: Energy calculation mode:
            - "uniform": Use uniform distribution over nodes
            - "obs": Weight by observed point counts at nodes
            Any value other than "uniform" is handled as "obs".

    Returns:
        Average squared distance to nearest center.

    Raises:
        ValueError: If pairwise distances not precomputed.

    Note:
        Requires precomputing() to have been called first.
    """
    distance_table = self._pairwise_nodes_distance_array
    if distance_table is None:
        raise ValueError("Must call precomputing() before calculate_energy_numba")

    index_of = self._node_to_index

    # Centers: one (first index, second index, position, edge length) row each.
    center_rows = []
    for center in centers:
        edge = center.edge
        center_rows.append(
            (
                index_of[edge[0]],
                index_of[edge[1]],
                center.position,
                self.get_edge_length(*edge),
            )
        )
    center_edges_0 = np.array([row[0] for row in center_rows], dtype=np.int32)
    center_edges_1 = np.array([row[1] for row in center_rows], dtype=np.int32)
    center_positions = np.array([row[2] for row in center_rows], dtype=np.float64)
    center_lengths = np.array([row[3] for row in center_rows], dtype=np.float64)

    # Points: every node, placed at position 0 of the edge to its first
    # listed neighbor.
    point_rows = []
    for vertex in list(self.nodes()):
        adjacent = next(self.neighbors(vertex))
        point_rows.append(
            (
                index_of[vertex],
                index_of[adjacent],
                self.get_edge_length(vertex, adjacent),
            )
        )
    point_edges_0 = np.array([row[0] for row in point_rows], dtype=np.int32)
    point_edges_1 = np.array([row[1] for row in point_rows], dtype=np.int32)
    point_lengths = np.array([row[2] for row in point_rows], dtype=np.float64)
    point_positions = np.zeros(len(point_rows), dtype=np.float64)

    if how != "uniform":
        point_nb_obs = np.array(
            [attributes.get("nb_obs", 0) for _, attributes in self.nodes(data=True)],
            dtype=np.int32,
        )
        return _calculate_energy_obs_numba(
            center_edges_0,
            center_edges_1,
            center_positions,
            center_lengths,
            point_edges_0,
            point_edges_1,
            point_positions,
            point_lengths,
            point_nb_obs,
            distance_table,
        )

    return _calculate_energy_uniform_numba(
        center_edges_0,
        center_edges_1,
        center_positions,
        center_lengths,
        point_edges_0,
        point_edges_1,
        point_positions,
        point_lengths,
        distance_table,
    )

center_from_point(point)

Create a QGCenter object from a QGPoint object.

Source code in kmeanssa_ng/quantum_graph/space.py
def center_from_point(self, point: QGPoint) -> QGCenter:
    """Wrap an existing point in a center object.

    Args:
        point: The point the center is built from.

    Returns:
        A new QGCenter constructed from ``point``.
    """
    center = QGCenter(point)
    return center

compute_clusters(centers)

Assign each node to its nearest center.

Updates node 'cluster' attribute.

Parameters:

Name Type Description Default
centers list[QGCenter]

List of cluster centers.

required
Source code in kmeanssa_ng/quantum_graph/space.py
def compute_clusters(self, centers: list[QGCenter]) -> None:
    """Assign each node to its nearest center.

    Writes the index (within ``centers``) of the nearest center into each
    node's 'cluster' attribute. The distance from a node to a center takes
    the center's position along its edge into account: the center is reached
    either through ``edge[0]`` (travelling ``position``) or through
    ``edge[1]`` (travelling ``length - position``), whichever is shorter.
    Ties go to the center with the lowest index.

    Args:
        centers: List of cluster centers.

    Raises:
        ValueError: If ``centers`` is empty while the graph has nodes
            (raised by ``np.argmin`` on an empty array).
    """
    # Endpoints and the offsets of each center from them do not depend on
    # the node being classified, so gather them once up front.
    anchors = []
    for center in centers:
        start, end = center.edge[0], center.edge[1]
        offset = center.position
        remaining = self.get_edge_length(start, end) - offset
        anchors.append((start, offset, end, remaining))

    for node in self.nodes:
        distances = np.array(
            [
                min(
                    self.node_distance(start, node) + from_start,
                    self.node_distance(end, node) + from_end,
                )
                for start, from_start, end, from_end in anchors
            ]
        )
        # Store a plain int so the attribute does not carry a NumPy scalar.
        self.nodes[node]["cluster"] = int(np.argmin(distances))

distance(p1, p2)

Compute distance between two points on the graph.

Parameters:

Name Type Description Default
p1 QGPoint

First point.

required
p2 QGPoint

Second point.

required

Returns:

Type Description
float

Geodesic distance.

Source code in kmeanssa_ng/quantum_graph/space.py
def distance(self, p1: QGPoint, p2: QGPoint) -> float:
    """Return the geodesic distance separating two points of the graph.

    Args:
        p1: First point.
        p2: Second point.

    Returns:
        The 'distance' entry of the result of ``quantum_path(p1, p2)``.
    """
    geodesic = self.quantum_path(p1, p2)
    return geodesic["distance"]

distance_matrix()

Compute the pairwise distance matrix between all nodes.

Returns:

Type Description
ndarray

n×n matrix of pairwise distances.

Raises:

Type Description
ValueError

If pairwise distances not precomputed.

Source code in kmeanssa_ng/quantum_graph/space.py
def distance_matrix(self) -> np.ndarray:
    """Build the dense matrix of node-to-node distances.

    Rows and columns follow the iteration order of ``self.nodes()``. The
    matrix is also stored on the instance as ``matrix_distance``.

    Returns:
        n×n matrix of pairwise distances.

    Raises:
        ValueError: If pairwise distances not precomputed.
    """
    cached = self._pairwise_nodes_distance
    if cached is None:
        raise ValueError("Must call precomputing() first")

    size = self.number_of_nodes()
    ordering = list(self.nodes())
    result = np.zeros((size, size))

    # Fill one row per source node from the cached distance dictionary.
    for row, source in enumerate(ordering):
        from_source = cached[source]
        result[row, :] = [from_source[target] for target in ordering]

    self.matrix_distance = result
    return result

distances_from_centers(centers, target)

Compute distances from multiple centers to a single target point.

This is a Numba-accelerated operation that efficiently computes distances from all centers to one target point. Works for any target location (on nodes or edges).

Parameters:

Name Type Description Default
centers list[QGCenter]

List of k centers to compute distances from.

required
target QGPoint

The target point (can be on a node or edge).

required

Returns:

Type Description
ndarray

Array of shape (k,) with distances from each center to target.

Raises:

Type Description
ValueError

If pairwise distances not precomputed.

Example
centers = graph.sample_centers(5)
target = graph.sample_points(1)[0]
distances = graph.distances_from_centers(centers, target)
closest_idx = np.argmin(distances)
closest_center = centers[closest_idx]
Source code in kmeanssa_ng/quantum_graph/space.py
def distances_from_centers(
    self, centers: list[QGCenter], target: QGPoint
) -> np.ndarray:
    """Compute the distance from every center to one target point.

    The centers and the target are flattened into plain numeric data (node
    indices, positions and edge lengths) and handed to the Numba helper
    ``_batch_distances_numba`` together with the precomputed node-distance
    array.

    Args:
        centers: List of k centers to compute distances from.
        target: The target point (can be on a node or edge).

    Returns:
        Array of shape (k,) with distances from each center to target.

    Raises:
        ValueError: If pairwise distances not precomputed.

    Example:
        ```python
        centers = graph.sample_centers(5)
        target = graph.sample_points(1)[0]
        distances = graph.distances_from_centers(centers, target)
        closest_idx = np.argmin(distances)
        closest_center = centers[closest_idx]
        ```
    """
    pairwise = self._pairwise_nodes_distance_array
    if pairwise is None:
        raise ValueError("Must call precomputing() before distances_from_centers")

    index_of = self._node_to_index

    # Gather per-center data in one pass, then convert to typed arrays.
    first_nodes = []
    second_nodes = []
    offsets = []
    spans = []
    for center in centers:
        c_edge = center.edge
        first_nodes.append(index_of[c_edge[0]])
        second_nodes.append(index_of[c_edge[1]])
        offsets.append(center.position)
        spans.append(self.get_edge_length(*c_edge))

    # Same flattening for the single target point.
    t_edge = target.edge
    t_first = index_of[t_edge[0]]
    t_second = index_of[t_edge[1]]
    t_offset = target.position
    t_span = self.get_edge_length(*t_edge)

    return _batch_distances_numba(
        np.array(first_nodes, dtype=np.int32),
        np.array(second_nodes, dtype=np.int32),
        np.array(offsets, dtype=np.float64),
        np.array(spans, dtype=np.float64),
        t_first,
        t_second,
        t_offset,
        t_span,
        pairwise,
    )

draw(color_by='cluster', centers=None, node_size_by_obs=True, ax=None, **kwargs)

Draws the graph using matplotlib.

This method provides a flexible way to visualize the graph, its clusters, and the results of the k-means algorithm. To use this method, you need to install the 'plot' extras: pip install kmeanssa-ng[plot]

Parameters:

Name Type Description Default
color_by str | None

Node attribute to use for coloring. - "cluster": Colors nodes by their assigned cluster. - "block": Colors nodes by a pre-defined 'block' attribute. - None: All nodes will have the same default color. (Default: "skyblue")

'cluster'
centers list[QGCenter] | None

A list of QGCenter objects to be highlighted on the graph.

None
node_size_by_obs bool

If True, sizes nodes based on 'nb_obs' attribute.

True
ax 'plt.Axes' | None

A matplotlib axes object to draw on. If None, a new figure and axes are created.

None
**kwargs

Additional arguments passed to nx.draw().

{}
Source code in kmeanssa_ng/quantum_graph/space.py
def draw(
    self,
    color_by: str | None = "cluster",
    centers: list[QGCenter] | None = None,
    node_size_by_obs: bool = True,
    ax: "plt.Axes" | None = None,
    **kwargs,
):
    """Draws the graph using matplotlib.

    This method provides a flexible way to visualize the graph, its clusters,
    and the results of the k-means algorithm. To use this method, you need
    to install the 'plot' extras: `pip install kmeanssa-ng[plot]`

    Args:
        color_by: Node attribute to use for coloring.
            - "cluster": Colors nodes by their assigned cluster.
            - "block": Colors nodes by a pre-defined 'block' attribute.
            - None: All nodes will have the same default color. (Default: "skyblue")
        centers: A list of QGCenter objects to be highlighted on the graph.
            Each center is highlighted at the first node of its edge.
        node_size_by_obs: If True, sizes nodes based on 'nb_obs' attribute.
        ax: A matplotlib axes object to draw on. If None, a new figure
            and axes are created.
        **kwargs: Additional arguments passed to nx.draw(). The optional
            'figsize' entry is consumed here: it sizes the new figure when
            ``ax`` is None (default (12, 12)) and is ignored otherwise.

    Raises:
        ImportError: If matplotlib is not installed.
    """
    try:
        import matplotlib.pyplot as plt
    except ImportError as e:
        raise ImportError(
            "Plotting requires matplotlib. Please install it with "
            "`pip install kmeanssa-ng[plot]`"
        ) from e

    # 'figsize' is a figure-creation option, not a networkx drawing option.
    # Always remove it from kwargs so it is never forwarded to nx.draw(),
    # even when the caller supplies their own axes.
    figsize = kwargs.pop("figsize", (12, 12))
    if ax is None:
        _, ax = plt.subplots(figsize=figsize)

    # Use pre-computed layout for consistency
    pos = self.node_position

    # Determine node colors
    node_colors = "skyblue"  # Default color
    if color_by:
        # Check if any node has the attribute
        if not any(color_by in n[1] for n in self.nodes(data=True)):
            print(
                f"Warning: Node attribute '{color_by}' not found. Using default color."
            )
        else:
            # Nodes lacking the attribute fall back to 0.
            node_colors = [self.nodes[n].get(color_by, 0) for n in self.nodes()]

    # Determine node sizes
    node_sizes = 300  # Default size
    if node_size_by_obs:
        if not any("nb_obs" in n[1] for n in self.nodes(data=True)):
            print("Warning: Node attribute 'nb_obs' not found. Using default size.")
        else:
            # Scale size by number of observations, with a minimum size
            node_sizes = [
                self.nodes[n].get("nb_obs", 0) * 50 + 100 for n in self.nodes()
            ]

    # Draw the graph
    nx.draw(
        self,
        pos=pos,
        ax=ax,
        node_color=node_colors,
        node_size=node_sizes,
        with_labels=False,
        **kwargs,
    )

    # Highlight centers
    if centers:
        center_nodes = [c.edge[0] for c in centers]
        node_list = list(self.nodes)
        center_node_indices = [node_list.index(n) for n in center_nodes]

        # Highlighted nodes are drawn at twice their regular size.
        center_sizes = (
            [node_sizes[i] * 2 for i in center_node_indices]
            if isinstance(node_sizes, list)
            else node_sizes * 2
        )

        nx.draw_networkx_nodes(
            self,
            pos,
            nodelist=center_nodes,
            ax=ax,
            node_size=center_sizes,
            edgecolors="grey",
            linewidths=2,
        )

get_edge_length(n1, n2)

Get the length of an edge.

Parameters:

Name Type Description Default
n1 int

First node of the edge.

required
n2 int

Second node of the edge.

required

Returns:

Type Description
float

Edge length.

Raises:

Type Description
ValueError

If edge does not exist.

Source code in kmeanssa_ng/quantum_graph/space.py
def get_edge_length(self, n1: int, n2: int) -> float:
    """Look up the 'length' attribute of the edge joining two nodes.

    Args:
        n1: First node of the edge.
        n2: Second node of the edge.

    Returns:
        Edge length.

    Raises:
        ValueError: If edge does not exist.
    """
    attributes = self.get_edge_data(n1, n2)
    if attributes is not None:
        return attributes["length"]
    raise ValueError(f"Edge ({n1}, {n2}) does not exist in graph")

index_to_centers(indices)

Convert node indices to centers.

Parameters:

Name Type Description Default
indices list[int]

List of node indices.

required

Returns:

Type Description
list[QGCenter]

List of centers at the specified nodes.

Source code in kmeanssa_ng/quantum_graph/space.py
def index_to_centers(self, indices: list[int]) -> list[QGCenter]:
    """Turn positional node indices into centers placed on those nodes.

    Indices refer to positions in the iteration order of ``self.nodes()``.

    Args:
        indices: List of node indices.

    Returns:
        List of centers at the specified nodes.
    """
    ordering = list(self.nodes())
    result = []
    for position in indices:
        result.append(self.node_as_center(ordering[position]))
    return result

light_sample_points(n)

Fast sampling of points at random nodes.

Parameters:

Name Type Description Default
n int

Number of points to sample.

required

Returns:

Type Description
list[QGPoint]

List of n points at random nodes.

Source code in kmeanssa_ng/quantum_graph/space.py
def light_sample_points(self, n: int) -> list[QGPoint]:
    """Draw points located on randomly chosen nodes.

    Nodes are drawn with replacement. Each point is placed at position 0 of
    an edge running from the drawn node to one of its neighbors, the
    neighbor being picked at random.

    Args:
        n: Number of points to sample.

    Returns:
        List of n points at random nodes.
    """
    drawn = rd.choices(list(self.nodes()), k=n)
    return [
        QGPoint(self, (node, rd.choice(list(self.neighbors(node)))), 0)
        for node in drawn
    ]

node_as_center(node)

Create a center at a specific node.

Parameters:

Name Type Description Default
node int

The node to place the center at.

required

Returns:

Type Description
QGCenter

A center located at the node.

Source code in kmeanssa_ng/quantum_graph/space.py
def node_as_center(self, node: int) -> QGCenter:
    """Build a center sitting exactly on the given node.

    The center is anchored at position 0 of an edge from ``node`` to a
    randomly chosen neighbor.

    Args:
        node: The node to place the center at.

    Returns:
        A center located at the node.
    """
    adjacent = list(self.neighbors(node))
    anchor = QGPoint(self, (node, rd.choice(adjacent)), 0)
    return QGCenter(anchor)

node_distance(n1, n2)

Compute shortest path distance between two nodes.

Parameters:

Name Type Description Default
n1 int

First node.

required
n2 int

Second node.

required

Returns:

Type Description
float

Shortest path length using 'length' edge attribute.

Source code in kmeanssa_ng/quantum_graph/space.py
def node_distance(self, n1: int, n2: int) -> float:
    """Return the shortest-path distance between two nodes.

    Uses the cached pairwise distances when they are available and falls
    back to a NetworkX shortest-path computation otherwise.

    Args:
        n1: First node.
        n2: Second node.

    Returns:
        Shortest path length using 'length' edge attribute.
    """
    cache = self._pairwise_nodes_distance
    if cache is None:
        return nx.shortest_path_length(self, n1, n2, weight="length")
    return cache[n1][n2]

nodes_as_points()

Convert all nodes to points.

Returns:

Type Description
list[QGPoint]

List of points, one at each node.

Source code in kmeanssa_ng/quantum_graph/space.py
def nodes_as_points(self) -> list[QGPoint]:
    """Create one point per node of the graph.

    Each point is placed at position 0 of an edge from the node to a
    randomly chosen neighbor.

    Returns:
        List of points, one at each node.
    """
    return [
        QGPoint(self, (node, rd.choice(list(self.neighbors(node)))), 0)
        for node in self.nodes
    ]

precomputing()

Precompute and cache all pairwise node distances.

This significantly speeds up distance queries. Should be called once after graph construction.

Raises:

Type Description
ValueError

If graph is not connected or has invalid edge lengths.

Source code in kmeanssa_ng/quantum_graph/space.py
def precomputing(self) -> None:
    """Compute and cache the distances between every pair of nodes.

    Edge lengths and connectivity are validated on every call. The cache
    itself (a nested dictionary, a node-to-index map and a dense NumPy array
    used by the Numba helpers) is only built when no cached distances exist
    yet.

    Raises:
        ValueError: If graph is not connected or has invalid edge lengths.
    """
    # Reject bad edge lengths before any distance computation.
    self.validate_edge_lengths()

    # Connectivity is only checked on non-empty graphs.
    if self.number_of_nodes() > 0 and not nx.is_connected(self):
        component_count = nx.number_connected_components(self)
        raise ValueError(
            f"Graph must be connected for distance precomputing. "
            f"Found {component_count} connected components."
        )

    if self._pairwise_nodes_distance is not None:
        return

    self._pairwise_nodes_distance = dict(
        nx.all_pairs_dijkstra_path_length(self, weight="length")
    )

    # Dense array mirror of the dictionary, indexed by node order.
    ordering = list(self.nodes())
    size = len(ordering)
    self._node_to_index = {node: slot for slot, node in enumerate(ordering)}
    self._pairwise_nodes_distance_array = np.zeros((size, size), dtype=np.float64)

    for row, source in enumerate(ordering):
        from_source = self._pairwise_nodes_distance[source]
        self._pairwise_nodes_distance_array[row, :] = [
            from_source[target] for target in ordering
        ]

quantum_path(p1, p2)

Compute the geodesic between two points on the graph.

Parameters:

Name Type Description Default
p1 QGPoint

First point.

required
p2 QGPoint

Second point.

required

Returns:

Type Description
dict[str, float | tuple | None]

Dictionary with: - 'distance': geodesic distance - 'path': (node_from_p1_edge, node_from_p2_edge) or None if same edge

Source code in kmeanssa_ng/quantum_graph/space.py
def quantum_path(self, p1: QGPoint, p2: QGPoint) -> dict[str, float | tuple | None]:
    """Compute the geodesic between two points on the graph.

    Args:
        p1: First point.
        p2: Second point.

    Returns:
        Dictionary with:
            - 'distance': geodesic distance
            - 'path': (node_from_p1_edge, node_from_p2_edge) or None if same edge
    """
    edge1, edge2 = p1.edge, p2.edge
    pos1, pos2 = p1.position, p2.position
    length1 = self.get_edge_length(*edge1)
    length2 = self.get_edge_length(*edge2)

    # Compute distances for all 4 possible paths
    d0 = self.node_distance(edge1[0], edge2[0]) + pos1 + pos2
    d1 = self.node_distance(edge1[0], edge2[1]) + pos1 + (length2 - pos2)
    d2 = self.node_distance(edge1[1], edge2[0]) + (length1 - pos1) + pos2
    d3 = (
        self.node_distance(edge1[1], edge2[1]) + (length1 - pos1) + (length2 - pos2)
    )

    # Find minimum distance (break ties randomly)
    distances = np.array([d0, d1, d2, d3])
    all_idx = np.where(distances == distances.min())[0]
    idx = rd.choice(all_idx)
    d_min = distances[idx]

    # Check if points are on the same edge
    if (
        edge1[0] == edge2[1]
        and edge1[1] == edge2[0]
        and d_min > abs(length1 - pos1 - pos2)
    ):
        return {"distance": abs(length1 - pos1 - pos2), "path": None}

    if edge1 == edge2 and abs(pos1 - pos2) < d_min:
        return {"distance": abs(pos1 - pos2), "path": None}

    # Determine path nodes
    if idx == 0:
        path = (edge1[0], edge2[0])
    elif idx == 1:
        path = (edge1[0], edge2[1])
    elif idx == 2:
        path = (edge1[1], edge2[0])
    else:
        path = (edge1[1], edge2[1])

    return {"distance": d_min, "path": path}

validate_edge_lengths()

Validate that all edges have positive length attributes.

Raises:

Type Description
ValueError

If any edge is missing 'length' or has invalid length.

Source code in kmeanssa_ng/quantum_graph/space.py
def validate_edge_lengths(self) -> None:
    """Validate that every edge carries a strictly positive 'length'.

    Raises:
        ValueError: If any edge is missing 'length', has a length that
            cannot be converted to float, or has a length that is not
            strictly positive (zero, negative, or NaN).
    """
    for u, v, data in self.edges(data=True):
        if "length" not in data:
            raise ValueError(f"Edge ({u}, {v}) missing 'length' attribute")

        length = data["length"]
        try:
            length_float = float(length)
        except (TypeError, ValueError) as e:
            raise ValueError(
                f"Edge ({u}, {v}) length must be a number, got {type(length).__name__}"
            ) from e

        # Written as `not (> 0)` rather than `<= 0` so that NaN, which
        # compares False against everything, is rejected as well.
        if not length_float > 0:
            raise ValueError(
                f"Edge ({u}, {v}) has invalid length {length_float}, must be positive"
            )

as_quantum_graph(graph, node_weight=1.0, edge_length=1.0, edge_weight=1.0, precompute=False)

Convert a NetworkX graph to a quantum graph with uniform attributes.

Parameters:

Name Type Description Default
graph Graph

The NetworkX graph to convert.

required
node_weight float

Uniform weight to assign to all nodes.

1.0
edge_length float

Uniform length to assign to all edges.

1.0
edge_weight float

Uniform weight to assign to all edges.

1.0
precompute bool

If True, precompute pairwise distances (default: False for compatibility).

False

Returns:

Type Description
QuantumGraph

The converted quantum graph.

Example
import networkx as nx
G = nx.karate_club_graph()
qg = as_quantum_graph(G, edge_length=1.0)
Source code in kmeanssa_ng/quantum_graph/generators.py
def as_quantum_graph(
    graph: nx.Graph,
    node_weight: float = 1.0,
    edge_length: float = 1.0,
    edge_weight: float = 1.0,
    precompute: bool = False,
) -> QuantumGraph:
    """Build a quantum graph from a NetworkX graph using uniform attributes.

    Every node receives the same 'weight'; every edge receives the same
    'length' and 'weight' and shares one ``UniformDistribution(edge_length)``
    instance as its 'distribution'.

    Args:
        graph: The NetworkX graph to convert.
        node_weight: Uniform weight to assign to all nodes.
        edge_length: Uniform length to assign to all edges.
        edge_weight: Uniform weight to assign to all edges.
        precompute: If True, precompute pairwise distances (default: False for compatibility).

    Returns:
        The converted quantum graph.

    Raises:
        ValueError: If ``graph`` is not a networkx.Graph, or if any of the
            numeric arguments is not a positive int or float.

    Example:
        ```python
        import networkx as nx
        G = nx.karate_club_graph()
        qg = as_quantum_graph(G, edge_length=1.0)
        ```
    """
    if not isinstance(graph, nx.Graph):
        raise ValueError("`graph` must be a networkx.Graph object.")

    # The three numeric arguments share one rule, checked in signature order.
    numeric_arguments = (
        ("node_weight", node_weight),
        ("edge_length", edge_length),
        ("edge_weight", edge_weight),
    )
    for label, value in numeric_arguments:
        if not isinstance(value, (int, float)) or value <= 0:
            raise ValueError(f"`{label}` must be a positive number.")

    qg = QuantumGraph(graph, precompute=False)
    nx.set_node_attributes(qg, node_weight, "weight")
    nx.set_edge_attributes(qg, edge_length, "length")
    nx.set_edge_attributes(qg, edge_weight, "weight")

    # One distribution object is shared by all edges.
    shared_distribution = UniformDistribution(edge_length)
    nx.set_edge_attributes(
        qg, {edge: {"distribution": shared_distribution} for edge in qg.edges}
    )

    if precompute:
        qg.precomputing()
    return qg

complete_quantum_graph(objects, similarities=None, true_labels=None, precompute=True)

Create a complete quantum graph from objects with optional similarity matrix.

Useful for clustering when you have a pairwise distance/similarity matrix.

Parameters:

Name Type Description Default
objects list

List of objects (nodes will be indexed by position).

required
similarities ndarray | None

Optional n×n matrix of similarities/distances. If None, all edges have length 1.

None
true_labels list | None

Optional true cluster labels for each object.

None
precompute bool

If True, precompute pairwise distances (default: True).

True

Returns:

Type Description
QuantumGraph

A complete quantum graph where edge lengths are given by the similarity matrix.

Example
objects = [1, 2, 3, 4, 5]
similarities = np.array([...])  # 5x5 matrix
labels = [0, 0, 1, 1, 1]
graph = complete_quantum_graph(objects, similarities, labels)
Source code in kmeanssa_ng/quantum_graph/generators.py
def complete_quantum_graph(
    objects: list,
    similarities: np.ndarray | None = None,
    true_labels: list | None = None,
    precompute: bool = True,
) -> QuantumGraph:
    """Build a complete quantum graph with one node per object.

    Nodes are the integer positions of the objects. Every pair of nodes is
    joined by an edge whose length is read from the upper triangle of
    ``similarities`` (entry ``[i][j]`` with ``i < j``), or is 1.0 when no
    matrix is given.

    Args:
        objects: List of objects (nodes will be indexed by position).
        similarities: Optional n×n matrix of similarities/distances. If None, all edges
            have length 1.
        true_labels: Optional true cluster labels for each object, stored in
            the node attribute 'group'.
        precompute: If True, precompute pairwise distances (default: True).

    Returns:
        A complete quantum graph where edge lengths are given by the similarity matrix.

    Raises:
        ValueError: If ``objects`` is not a non-empty list, if
            ``similarities`` is not a non-negative NumPy array of matching
            square shape, or if ``true_labels`` is not a list of matching
            length.

    Example:
        ```python
        objects = [1, 2, 3, 4, 5]
        similarities = np.array([...])  # 5x5 matrix
        labels = [0, 0, 1, 1, 1]
        graph = complete_quantum_graph(objects, similarities, labels)
        ```
    """
    if not isinstance(objects, list) or not objects:
        raise ValueError("`objects` must be a non-empty list.")

    num_objects = len(objects)
    has_matrix = similarities is not None
    has_labels = true_labels is not None

    if has_matrix:
        if not isinstance(similarities, np.ndarray):
            raise ValueError("`similarities` must be a numpy array.")
        if similarities.shape != (num_objects, num_objects):
            raise ValueError(
                f"`similarities` must be a square matrix of size {num_objects}x{num_objects}."
            )
        if np.any(similarities < 0):
            raise ValueError("Elements of `similarities` must be non-negative.")

    if has_labels:
        if not isinstance(true_labels, list):
            raise ValueError("`true_labels` must be a list.")
        if len(true_labels) != num_objects:
            raise ValueError(
                f"`true_labels` must have the same length as `objects` ({num_objects})."
            )

    graph = QuantumGraph(precompute=False)

    # One node per object; the label, when provided, goes into 'group'.
    for index in range(num_objects):
        node_attributes = {"weight": 1}
        if has_labels:
            node_attributes["group"] = true_labels[index]
        graph.add_node(index, **node_attributes)

    # Connect every unordered pair of nodes.
    for first in range(num_objects):
        for second in range(first + 1, num_objects):
            edge_length = similarities[first][second] if has_matrix else 1.0
            graph.add_edge(
                first,
                second,
                weight=1,
                length=edge_length,
                distribution=UniformDistribution(edge_length),
            )

    if precompute:
        graph.precomputing()
    return graph

generate_random_sbm(sizes=None, p=None, weights=None, lengths=None, precompute=True)

Generate an SBM quantum graph with block-specific edge lengths and node weights.

Parameters:

Name Type Description Default
sizes list[int] | None

Number of nodes in each block. Defaults to [50, 50].

None
p list[list[float]] | None

Matrix of edge probabilities. Defaults to [[0.7, 0.1], [0.1, 0.7]].

None
weights list[float] | None

Node weight for each block. Defaults to [1, 1].

None
lengths list[list[float]] | None

Matrix of edge lengths. Element (i, j) gives the length for edges between blocks i and j. Defaults to [[1, 4], [4, 1]].

None
precompute bool

If True, precompute pairwise distances (default: True).

True

Returns:

Type Description
QuantumGraph

A quantum graph with block-specific attributes.

Example
# Two clusters with different intra/inter-cluster distances
graph = generate_random_sbm(
    sizes=[50, 50],
    p=[[0.7, 0.1], [0.1, 0.7]],
    weights=[1, 1],
    lengths=[[1, 4], [4, 1]]  # Longer inter-cluster edges
)
Source code in kmeanssa_ng/quantum_graph/generators.py
def generate_random_sbm(
    sizes: list[int] | None = None,
    p: list[list[float]] | None = None,
    weights: list[float] | None = None,
    lengths: list[list[float]] | None = None,
    precompute: bool = True,
) -> QuantumGraph:
    """Generate an SBM quantum graph with block-specific edge lengths and node weights.

    Args:
        sizes: Number of nodes in each block. Defaults to [50, 50].
        p: Matrix of edge probabilities. Defaults to [[0.7, 0.1], [0.1, 0.7]].
        weights: Node weight for each block. Defaults to [1, 1].
        lengths: Matrix of edge lengths. Element (i, j) gives the length
            for edges between blocks i and j. Defaults to [[1, 4], [4, 1]].
        precompute: If True, precompute pairwise distances (default: True).

    Returns:
        A quantum graph with block-specific attributes.

    Raises:
        ValueError: If ``sizes`` is not a non-empty list of positive
            integers, or if ``p``, ``weights`` or ``lengths`` do not match
            the number of blocks or contain out-of-range values.

    Example:
        ```python
        # Two clusters with different intra/inter-cluster distances
        graph = generate_random_sbm(
            sizes=[50, 50],
            p=[[0.7, 0.1], [0.1, 0.7]],
            weights=[1, 1],
            lengths=[[1, 4], [4, 1]]  # Longer inter-cluster edges
        )
        ```
    """
    if sizes is None:
        sizes = [50, 50]
    if p is None:
        p = [[0.7, 0.1], [0.1, 0.7]]
    if weights is None:
        weights = [1, 1]
    if lengths is None:
        lengths = [[1, 4], [4, 1]]

    # Validate 'sizes' before taking its length, so that a non-list argument
    # is reported as a ValueError rather than failing inside len().
    if (
        not isinstance(sizes, list)
        or not sizes
        or not all(isinstance(s, int) and s > 0 for s in sizes)
    ):
        raise ValueError("`sizes` must be a non-empty list of positive integers.")

    num_blocks = len(sizes)

    # Validate 'p'
    if not isinstance(p, list) or len(p) != num_blocks:
        raise ValueError(
            f"`p` must be a square matrix of size {num_blocks}x{num_blocks}."
        )
    for row in p:
        if not isinstance(row, list) or len(row) != num_blocks:
            raise ValueError(
                f"`p` must be a square matrix of size {num_blocks}x{num_blocks}."
            )
        if not all(isinstance(val, (float, int)) and 0 <= val <= 1 for val in row):
            raise ValueError(
                "Elements of `p` must be floats or integers between 0 and 1."
            )

    # Validate 'weights'
    if not isinstance(weights, list) or len(weights) != num_blocks:
        raise ValueError(f"`weights` must be a list of size {num_blocks}.")
    if not all(isinstance(w, (float, int)) and w > 0 for w in weights):
        raise ValueError("Elements of `weights` must be positive numbers.")

    # Validate 'lengths'
    if not isinstance(lengths, list) or len(lengths) != num_blocks:
        raise ValueError(
            f"`lengths` must be a square matrix of size {num_blocks}x{num_blocks}."
        )
    for row in lengths:
        if not isinstance(row, list) or len(row) != num_blocks:
            raise ValueError(
                f"`lengths` must be a square matrix of size {num_blocks}x{num_blocks}."
            )
        if not all(isinstance(val, (float, int)) and val > 0 for val in row):
            raise ValueError("Elements of `lengths` must be positive numbers.")

    nx_graph = nx.stochastic_block_model(sizes=sizes, p=p)
    graph = QuantumGraph(nx_graph, precompute=False)

    # Read the node -> block mapping once instead of rebuilding it for every
    # node and every edge.
    block_of = nx.get_node_attributes(graph, name="block")

    # Set node weights based on block
    nx.set_node_attributes(
        graph, {node: {"weight": weights[block_of[node]]} for node in graph.nodes}
    )

    # Set edge lengths based on blocks; each edge gets its own distribution.
    edge_attributes = {}
    for edge in graph.edges:
        edge_length = lengths[block_of[edge[0]]][block_of[edge[1]]]
        edge_attributes[edge] = {
            "length": edge_length,
            "weight": 1,
            "distribution": UniformDistribution(edge_length),
        }
    nx.set_edge_attributes(graph, edge_attributes)

    if precompute:
        graph.precomputing()
    return graph

generate_sbm(sizes=None, p=None, precompute=True)

Generate a Stochastic Block Model quantum graph.

Creates a quantum graph from a stochastic block model with uniform edge lengths and node weights.

Parameters:

Name Type Description Default
sizes list[int] | None

Number of nodes in each block. Defaults to [50, 50]. Must be a non-empty list of positive integers.

None
p list[list[float]] | None

Matrix of edge probabilities. Element (r, s) gives the density of edges from block r to block s. Must be symmetric for undirected graphs. Defaults to [[0.7, 0.1], [0.1, 0.7]]. Must be a square matrix with probabilities in [0, 1].

None
precompute bool

If True, precompute pairwise distances (default: True).

True

Returns:

Type Description
QuantumGraph

A quantum graph representing the SBM.

Raises:

Type Description
ValueError

If sizes is empty, contains non-positive values, or if p is not a valid probability matrix matching sizes.

Example
# Two balanced clusters with high intra-cluster, low inter-cluster edges
graph = generate_sbm(sizes=[50, 50], p=[[0.7, 0.1], [0.1, 0.7]])
Source code in kmeanssa_ng/quantum_graph/generators.py
def generate_sbm(
    sizes: list[int] | None = None,
    p: list[list[float]] | None = None,
    precompute: bool = True,
) -> QuantumGraph:
    """Generate a Stochastic Block Model quantum graph.

    Creates a quantum graph from a stochastic block model with uniform
    edge lengths and node weights (both set to 1). Every edge also gets
    ``weight=1`` and a ``UniformDistribution`` built from its length.

    Args:
        sizes: Number of nodes in each block. Defaults to [50, 50].
            Must be a non-empty list of positive integers. Values that
            ``int()`` can convert losslessly (e.g. ``50.0``) are coerced.
        p: Matrix of edge probabilities. Element (r, s) gives the density
            of edges from block r to block s. Must be symmetric for undirected graphs.
            Defaults to [[0.7, 0.1], [0.1, 0.7]].
            Must be a square matrix (list of lists) with probabilities in [0, 1].
        precompute: If True, precompute pairwise distances (default: True).

    Returns:
        A quantum graph representing the SBM.

    Raises:
        ValueError: If sizes is empty, contains non-integer or non-positive
            values, or if p is not a valid probability matrix matching sizes
            (wrong shape, non-numeric, NaN, or outside [0, 1]).

    Example:
        ```python
        # Two balanced clusters with high intra-cluster, low inter-cluster edges
        graph = generate_sbm(sizes=[50, 50], p=[[0.7, 0.1], [0.1, 0.7]])
        ```
    """
    if sizes is None:
        sizes = [50, 50]
    if p is None:
        p = [[0.7, 0.1], [0.1, 0.7]]

    # Validate sizes, keeping the coerced integers so that exactly what was
    # validated is what gets forwarded to networkx.
    if not isinstance(sizes, list) or len(sizes) == 0:
        raise ValueError("sizes must be a non-empty list")

    block_sizes: list[int] = []
    for i, size in enumerate(sizes):
        try:
            size_int = int(size)
        except (TypeError, ValueError, OverflowError) as e:
            # OverflowError covers int(float("inf")).
            raise ValueError(
                f"sizes[{i}] must be an integer, got {type(size).__name__}"
            ) from e
        # Refuse to silently truncate fractional floats such as 2.5.
        if isinstance(size, float) and size != size_int:
            raise ValueError(f"sizes[{i}] must be an integer, got {size}")
        if size_int <= 0:
            raise ValueError(f"sizes[{i}] must be positive, got {size_int}")
        block_sizes.append(size_int)

    # Validate p
    if not isinstance(p, list) or len(p) == 0:
        raise ValueError("p must be a non-empty list")

    if len(p) != len(sizes):
        raise ValueError(f"p must have {len(sizes)} rows to match sizes, got {len(p)}")

    edge_probs: list[list[float]] = []
    for i, row in enumerate(p):
        if not isinstance(row, list):
            raise ValueError(f"p[{i}] must be a list, got {type(row).__name__}")
        if len(row) != len(sizes):
            raise ValueError(f"p[{i}] must have {len(sizes)} columns, got {len(row)}")

        row_probs: list[float] = []
        for j, prob in enumerate(row):
            try:
                prob_float = float(prob)
            except (TypeError, ValueError, OverflowError) as e:
                raise ValueError(
                    f"p[{i}][{j}] must be a number, got {type(prob).__name__}"
                ) from e
            # Negated range test: every comparison with NaN is False, so
            # "x < 0 or x > 1" would let NaN through.
            if not 0 <= prob_float <= 1:
                raise ValueError(f"p[{i}][{j}] must be in [0, 1], got {prob_float}")
            row_probs.append(prob_float)
        edge_probs.append(row_probs)

    nx_graph = nx.stochastic_block_model(sizes=block_sizes, p=edge_probs)
    graph = QuantumGraph(
        nx_graph, precompute=False, attr=nx.get_node_attributes(nx_graph, name="block")
    )

    # Set uniform attributes
    nx.set_node_attributes(graph, 1, "weight")
    nx.set_edge_attributes(graph, 1, "length")

    # Attach the edge weight and a length-based distribution to every edge
    # in a single batch call.
    nx.set_edge_attributes(
        graph,
        {
            edge: {
                "weight": 1,
                "distribution": UniformDistribution(
                    graph.get_edge_data(*edge)["length"]
                ),
            }
            for edge in graph.edges
        },
    )

    if precompute:
        graph.precomputing()
    return graph

generate_simple_graph(n_a=5, n_aa=3, bridge_length=2.0, precompute=True, **attr)

Generate a symmetric two-cluster graph connected by a bridge.

Creates a graph with two symmetric star-like clusters (A and B) connected by a single edge. Each cluster has a central node with n_a neighbors, and each neighbor has n_aa further neighbors.

Parameters:

Name Type Description Default
n_a int

Number of neighbors for each central node (must be >= 0).

5
n_aa int

Number of second-level neighbors (must be >= 0).

3
bridge_length float

Length of the edge connecting the two clusters (must be > 0).

2.0
precompute bool

If True, precompute pairwise distances (default: True).

True
**attr

Additional graph attributes.

{}

Returns:

Type Description
QuantumGraph

A quantum graph with two symmetric clusters.

Raises:

Type Description
ValueError

If n_a or n_aa is negative, or if bridge_length is not positive.

Example
graph = generate_simple_graph(n_a=5, n_aa=3, bridge_length=2.0)
Source code in kmeanssa_ng/quantum_graph/generators.py
def generate_simple_graph(
    n_a: int = 5,
    n_aa: int = 3,
    bridge_length: float = 2.0,
    precompute: bool = True,
    **attr,
) -> QuantumGraph:
    """Generate a symmetric two-cluster graph connected by a bridge.

    Creates a graph with two symmetric star-like clusters (A and B) connected
    by a single edge. Each cluster has a central node ("A0" / "B0") with n_a
    neighbors, and each neighbor has n_aa further neighbors. All nodes have
    weight 1, all non-bridge edges have length 1.0, and every edge gets
    weight 1.0 and a ``UniformDistribution`` built from its length.

    Args:
        n_a: Number of neighbors for each central node (must be >= 0).
        n_aa: Number of second-level neighbors (must be >= 0).
        bridge_length: Length of the edge connecting the two clusters (must be > 0).
        precompute: If True, precompute pairwise distances (default: True).
        **attr: Additional graph attributes.

    Returns:
        A quantum graph with two symmetric clusters.

    Raises:
        ValueError: If n_a or n_aa is not convertible to an integer or is
            negative, or if bridge_length is not a number, is NaN, or is <= 0.

    Example:
        ```python
        graph = generate_simple_graph(n_a=5, n_aa=3, bridge_length=2.0)
        ```
    """
    # Validate n_a (OverflowError covers int(float("inf")))
    try:
        n_a_int = int(n_a)
    except (TypeError, ValueError, OverflowError) as e:
        raise ValueError(f"n_a must be an integer, got {type(n_a).__name__}") from e
    if n_a_int < 0:
        raise ValueError(f"n_a must be non-negative, got {n_a_int}")

    # Validate n_aa
    try:
        n_aa_int = int(n_aa)
    except (TypeError, ValueError, OverflowError) as e:
        raise ValueError(f"n_aa must be an integer, got {type(n_aa).__name__}") from e
    if n_aa_int < 0:
        raise ValueError(f"n_aa must be non-negative, got {n_aa_int}")

    # Validate bridge_length
    try:
        bridge_length_float = float(bridge_length)
    except (TypeError, ValueError, OverflowError) as e:
        raise ValueError(
            f"bridge_length must be a number, got {type(bridge_length).__name__}"
        ) from e
    # Negated comparison so NaN is rejected too ("nan <= 0" is False).
    if not bridge_length_float > 0:
        raise ValueError(f"bridge_length must be positive, got {bridge_length_float}")

    graph = QuantumGraph(precompute=False, **attr)

    # Add central nodes
    graph.add_node("A0", weight=1)
    graph.add_node("B0", weight=1)
    graph.add_edge("A0", "B0", length=bridge_length_float)

    # Build cluster A, then the symmetric cluster B (same insertion order as
    # building them one after the other).
    # NOTE(review): names are plain concatenations, so with n_a >= 11 and
    # n_aa >= 1 a leaf such as "A1" + "1" coincides with the first-level node
    # "A11". Names are left unchanged here because callers may rely on them.
    for hub in ("A0", "B0"):
        prefix = hub[0]
        for i in range(1, n_a_int + 1):
            branch = f"{prefix}{i}"
            graph.add_node(branch, weight=1)
            graph.add_edge(hub, branch, length=1.0)

            for j in range(1, n_aa_int + 1):
                leaf = f"{branch}{j}"
                graph.add_node(leaf, weight=1)
                graph.add_edge(branch, leaf, length=1.0)

    # Set edge weights and distributions in one batch call.
    # UniformDistribution is a picklable callable class (used instead of a lambda).
    nx.set_edge_attributes(
        graph,
        {
            edge: {
                "weight": 1.0,
                "distribution": UniformDistribution(
                    graph.get_edge_data(*edge)["length"]
                ),
            }
            for edge in graph.edges
        },
    )

    if precompute:
        graph.precomputing()
    return graph

generate_simple_random_graph(n_a=5, n_b=5, lam_a=0, lam_b=0, bridge_length=10.0, precompute=True, **attr)

Generate a random two-cluster graph with Poisson branching.

Similar to generate_simple_graph, but with asymmetric clusters (different sizes), random edge lengths, and Poisson-distributed third-level neighbors.

Parameters:

Name Type Description Default
n_a int

Number of first-level neighbors of A0.

5
n_b int

Number of first-level neighbors of B0.

5
lam_a int

Poisson parameter for A cluster third-level branching.

0
lam_b int

Poisson parameter for B cluster third-level branching.

0
bridge_length float

Mean length of the bridge edge; the actual length is drawn uniformly from [0.9 * bridge_length, 1.1 * bridge_length].

10.0
precompute bool

If True, precompute pairwise distances (default: True).

True
**attr

Additional graph attributes.

{}

Returns:

Type Description
QuantumGraph

A random quantum graph with two clusters.

Source code in kmeanssa_ng/quantum_graph/generators.py
def generate_simple_random_graph(
    n_a: int = 5,
    n_b: int = 5,
    lam_a: int = 0,
    lam_b: int = 0,
    bridge_length: float = 10.0,
    precompute: bool = True,
    **attr,
) -> QuantumGraph:
    """Generate a random two-cluster graph with Poisson branching.

    Similar to generate_simple_graph but with:
    - Asymmetric clusters (different sizes)
    - Random edge lengths
    - Poisson-distributed third-level neighbors

    Central nodes "A0"/"B0" get weight 5, first-level neighbors weight 3 and
    third-level nodes weight 1. First-level edge lengths are drawn uniformly
    from [0.9, 1.1], third-level edge lengths from [0.4, 0.6].

    Args:
        n_a: Number of first-level neighbors of A0 (must be >= 0).
        n_b: Number of first-level neighbors of B0 (must be >= 0).
        lam_a: Poisson parameter for A cluster third-level branching (must be >= 0).
        lam_b: Poisson parameter for B cluster third-level branching (must be >= 0).
        bridge_length: Mean length of the bridge edge (must be > 0); the actual
            length is drawn uniformly from [0.9 * bridge_length, 1.1 * bridge_length].
        precompute: If True, precompute pairwise distances (default: True).
        **attr: Additional graph attributes.

    Returns:
        A random quantum graph with two clusters.

    Raises:
        ValueError: If n_a or n_b is not convertible to an integer or is
            negative, if lam_a or lam_b is not a number, is NaN or is negative,
            or if bridge_length is not a number, is NaN, or is <= 0.
    """
    # Validate up front, mirroring generate_simple_graph, so bad input fails
    # with a clear message before any node is created or random number drawn.
    counts: dict[str, int] = {}
    for name, value in (("n_a", n_a), ("n_b", n_b)):
        try:
            count = int(value)
        except (TypeError, ValueError, OverflowError) as e:
            raise ValueError(
                f"{name} must be an integer, got {type(value).__name__}"
            ) from e
        if count < 0:
            raise ValueError(f"{name} must be non-negative, got {count}")
        counts[name] = count

    rates: dict[str, float] = {}
    for name, value in (("lam_a", lam_a), ("lam_b", lam_b)):
        try:
            rate = float(value)
        except (TypeError, ValueError, OverflowError) as e:
            raise ValueError(
                f"{name} must be a number, got {type(value).__name__}"
            ) from e
        # Negated comparison so NaN is rejected as well.
        if not rate >= 0:
            raise ValueError(f"{name} must be non-negative, got {rate}")
        rates[name] = rate

    try:
        bridge_length_float = float(bridge_length)
    except (TypeError, ValueError, OverflowError) as e:
        raise ValueError(
            f"bridge_length must be a number, got {type(bridge_length).__name__}"
        ) from e
    if not bridge_length_float > 0:
        raise ValueError(f"bridge_length must be positive, got {bridge_length_float}")

    graph = QuantumGraph(precompute=False, **attr)
    rng = np.random.default_rng()

    # Central nodes and bridge
    graph.add_node("A0", weight=5)
    graph.add_node("B0", weight=5)
    graph.add_edge(
        "A0",
        "B0",
        length=rd.uniform(0.9 * bridge_length_float, 1.1 * bridge_length_float),
    )

    # Build cluster A, then cluster B; the order of random draws is the same
    # as building the two clusters one after the other.
    # NOTE(review): names are plain concatenations, so a third-level node such
    # as "A1" + "1" coincides with the first-level node "A11" once a cluster
    # has 11 or more first-level neighbors. Names are left unchanged here
    # because callers may rely on them.
    clusters = (
        ("A0", counts["n_a"], rates["lam_a"]),
        ("B0", counts["n_b"], rates["lam_b"]),
    )
    for hub, n_branches, lam in clusters:
        prefix = hub[0]
        for i in range(1, n_branches + 1):
            branch = f"{prefix}{i}"
            graph.add_node(branch, weight=3)
            graph.add_edge(hub, branch, length=rd.uniform(0.9, 1.1))

            # Poisson-distributed third level
            num_children = rng.poisson(lam)
            for j in range(1, num_children + 1):
                leaf = f"{branch}{j}"
                graph.add_node(leaf, weight=1)
                graph.add_edge(branch, leaf, length=rd.uniform(0.4, 0.6))

    # Set edge weights and distributions
    node_weights = nx.get_node_attributes(graph, "weight")
    edge_attrs = {}
    for edge in graph.edges:
        # 0.5 / (1/w_u + 1/w_v): one quarter of the harmonic mean of the two
        # endpoint weights (the harmonic mean itself is 2 / (1/w_u + 1/w_v)).
        w = 0.5 / (1.0 / node_weights[edge[0]] + 1.0 / node_weights[edge[1]])
        edge_length = graph.get_edge_data(*edge)["length"]
        edge_attrs[edge] = {
            "weight": w,
            "distribution": UniformDistribution(edge_length),
        }
    nx.set_edge_attributes(graph, edge_attrs)

    if precompute:
        graph.precomputing()
    return graph