Skip to content
Merged
156 changes: 134 additions & 22 deletions flixopt/clustering/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
from typing import TYPE_CHECKING, Any

import numpy as np
import pandas as pd
import xarray as xr

if TYPE_CHECKING:
from ..color_processing import ColorType
from ..flow_system import FlowSystem
from ..plot_result import PlotResult
from ..statistics_accessor import SelectType

Expand Down Expand Up @@ -98,6 +98,29 @@ def __repr__(self) -> str:
f')'
)

def _create_reference_structure(self) -> tuple[dict, dict[str, xr.DataArray]]:
    """Create reference structure for serialization.

    Builds a JSON-serializable dict describing this ClusterStructure, with
    every DataArray replaced by a ':::<name>' reference into the returned
    arrays mapping.

    Returns:
        Tuple of (reference dict, mapping of array name -> xr.DataArray).
    """
    ref = {'__class__': self.__class__.__name__}
    arrays: dict[str, xr.DataArray] = {}

    # DataArrays are stored once in `arrays` and referenced by name in `ref`.
    arrays[str(self.cluster_order.name)] = self.cluster_order
    ref['cluster_order'] = f':::{self.cluster_order.name}'

    arrays[str(self.cluster_occurrences.name)] = self.cluster_occurrences
    ref['cluster_occurrences'] = f':::{self.cluster_occurrences.name}'

    # n_clusters may be a plain scalar or a DataArray (e.g. varying per period).
    if isinstance(self.n_clusters, xr.DataArray):
        # Guard against an unnamed DataArray so the reference key is stable
        # instead of the string 'None' (mirrors the n_representatives
        # handling in ClusterResult._create_reference_structure).
        n_clusters_name = self.n_clusters.name or 'n_clusters'
        arrays[n_clusters_name] = self.n_clusters.rename(n_clusters_name)
        ref['n_clusters'] = f':::{n_clusters_name}'
    else:
        ref['n_clusters'] = int(self.n_clusters)

    ref['timesteps_per_cluster'] = self.timesteps_per_cluster

    return ref, arrays

@property
def n_original_periods(self) -> int:
"""Number of original periods (before clustering)."""
Expand Down Expand Up @@ -281,10 +304,9 @@ def __post_init__(self):
self.timestep_mapping = self.timestep_mapping.rename('timestep_mapping')

# Ensure representative_weights is a DataArray
# Can be (cluster, time) for 2D structure or (time,) for flat structure
if not isinstance(self.representative_weights, xr.DataArray):
self.representative_weights = xr.DataArray(
self.representative_weights, dims=['time'], name='representative_weights'
)
self.representative_weights = xr.DataArray(self.representative_weights, name='representative_weights')
elif self.representative_weights.name is None:
self.representative_weights = self.representative_weights.rename('representative_weights')

Expand All @@ -304,6 +326,37 @@ def __repr__(self) -> str:
f')'
)

def _create_reference_structure(self) -> tuple[dict, dict[str, xr.DataArray]]:
    """Create reference structure for serialization.

    Builds a JSON-serializable dict describing this ClusterResult, with
    every DataArray replaced by a ':::<name>' reference into the returned
    arrays mapping.

    Returns:
        Tuple of (reference dict, mapping of array name -> xr.DataArray).
    """
    ref = {'__class__': self.__class__.__name__}
    arrays: dict[str, xr.DataArray] = {}

    # DataArrays are stored once in `arrays` and referenced by name in `ref`.
    # __post_init__ guarantees both have a name.
    arrays[str(self.timestep_mapping.name)] = self.timestep_mapping
    ref['timestep_mapping'] = f':::{self.timestep_mapping.name}'

    arrays[str(self.representative_weights.name)] = self.representative_weights
    ref['representative_weights'] = f':::{self.representative_weights.name}'

    # n_representatives may be a plain scalar or a DataArray.
    if isinstance(self.n_representatives, xr.DataArray):
        # Serialize a renamed *copy* instead of mutating self: a serializer
        # should not have side effects on the object it reads.
        n_rep_name = self.n_representatives.name or 'n_representatives'
        arrays[n_rep_name] = self.n_representatives.rename(n_rep_name)
        ref['n_representatives'] = f':::{n_rep_name}'
    else:
        ref['n_representatives'] = int(self.n_representatives)

    # Recurse into the nested ClusterStructure if present and merge its arrays.
    if self.cluster_structure is not None:
        cs_ref, cs_arrays = self.cluster_structure._create_reference_structure()
        ref['cluster_structure'] = cs_ref
        arrays.update(cs_arrays)

    # aggregated_data and original_data are deliberately skipped — they are
    # not needed for serialization.

    return ref, arrays

@property
def n_original_timesteps(self) -> int:
"""Number of original timesteps (before aggregation)."""
Expand Down Expand Up @@ -460,24 +513,50 @@ def validate(self) -> None:
if max_idx >= n_rep:
raise ValueError(f'timestep_mapping contains index {max_idx} but n_representatives is {n_rep}')

# Check weights length matches n_representatives
if len(self.representative_weights) != n_rep:
raise ValueError(
f'representative_weights has {len(self.representative_weights)} elements '
f'but n_representatives is {n_rep}'
)
# Check weights dimensions
# representative_weights should have (cluster,) dimension with n_clusters elements
# (plus optional period/scenario dimensions)
if self.cluster_structure is not None:
n_clusters = self.cluster_structure.n_clusters
if 'cluster' in self.representative_weights.dims:
weights_n_clusters = self.representative_weights.sizes['cluster']
if weights_n_clusters != n_clusters:
raise ValueError(
f'representative_weights has {weights_n_clusters} clusters '
f'but cluster_structure has {n_clusters}'
)

# Check weights sum roughly equals original timesteps
weight_sum = float(self.representative_weights.sum().values)
n_original = self.n_original_timesteps
if abs(weight_sum - n_original) > 1e-6:
# Warning only - some aggregation methods may not preserve this exactly
import warnings
# Check weights sum roughly equals number of original periods
# (each weight is how many original periods that cluster represents)
# Sum should be checked per period/scenario slice, not across all dimensions
if self.cluster_structure is not None:
n_original_periods = self.cluster_structure.n_original_periods
# Sum over cluster dimension only (keep period/scenario if present)
weight_sum_per_slice = self.representative_weights.sum(dim='cluster')
# Check each slice
if weight_sum_per_slice.size == 1:
# Simple case: no period/scenario
weight_sum = float(weight_sum_per_slice.values)
if abs(weight_sum - n_original_periods) > 1e-6:
import warnings

warnings.warn(
f'representative_weights sum ({weight_sum}) does not match '
f'n_original_periods ({n_original_periods})',
stacklevel=2,
)
else:
# Multi-dimensional: check each slice
for val in weight_sum_per_slice.values.flat:
if abs(float(val) - n_original_periods) > 1e-6:
import warnings

warnings.warn(
f'representative_weights sum ({weight_sum}) does not match n_original_timesteps ({n_original})',
stacklevel=2,
)
warnings.warn(
f'representative_weights sum per slice ({float(val)}) does not match '
f'n_original_periods ({n_original_periods})',
stacklevel=2,
)
break # Only warn once
Comment on lines +518 to +561
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Fix validate() when cluster_structure.n_clusters is a DataArray and guard cluster dim assumptions.

Two things here can cause runtime errors depending on how clustering backends populate cluster_structure:

  • n_clusters = self.cluster_structure.n_clusters followed by if weights_n_clusters != n_clusters: will break when n_clusters is an xr.DataArray (the comparison yields a DataArray and if ... raises an ambiguity error). Coerce to a Python int first, as done elsewhere:

    n_clusters_da = self.cluster_structure.n_clusters
    n_clusters = (
        int(n_clusters_da)
        if isinstance(n_clusters_da, (int, np.integer))
        else int(n_clusters_da.values)
    )
  • Later, weight_sum_per_slice = self.representative_weights.sum(dim='cluster') assumes a 'cluster' dimension whenever cluster_structure is present. If representative_weights is created in a flat time‑only form for some backends, this will raise a KeyError. Either enforce 'cluster' in representative_weights.dims at construction time or short‑circuit/raise a clearer error here if 'cluster' is missing.

These issues are correctness problems once n_clusters is provided as a DataArray or if a backend supplies non‑clustered weights alongside a cluster_structure.



class ClusteringPlotAccessor:
Expand Down Expand Up @@ -911,7 +990,6 @@ class Clustering:

Attributes:
result: The ClusterResult from the aggregation backend.
original_flow_system: Reference to the FlowSystem before aggregation.
backend_name: Name of the aggregation backend used (e.g., 'tsam', 'manual').

Example:
Expand All @@ -923,9 +1001,23 @@ class Clustering:
"""

result: ClusterResult
original_flow_system: FlowSystem # FlowSystem - avoid circular import
backend_name: str = 'unknown'

def _create_reference_structure(self) -> tuple[dict, dict[str, xr.DataArray]]:
    """Create reference structure for serialization.

    Delegates array extraction to the nested ClusterResult and adds the
    scalar metadata of this wrapper.

    Returns:
        Tuple of (reference dict, mapping of array name -> xr.DataArray).
    """
    # The nested ClusterResult contributes all DataArrays.
    nested_ref, nested_arrays = self.result._create_reference_structure()

    reference = {
        '__class__': self.__class__.__name__,
        'result': nested_ref,
        'backend_name': self.backend_name,
    }
    return reference, dict(nested_arrays)

def __repr__(self) -> str:
cs = self.result.cluster_structure
if cs is not None:
Expand Down Expand Up @@ -1024,6 +1116,14 @@ def cluster_start_positions(self) -> np.ndarray:
n_timesteps = self.n_clusters * self.timesteps_per_period
return np.arange(0, n_timesteps, self.timesteps_per_period)

@property
def original_timesteps(self) -> pd.DatetimeIndex:
    """Original timesteps before clustering.

    Derived from the 'original_time' coordinate of timestep_mapping.
    """
    original_time = self.result.timestep_mapping.coords['original_time']
    return pd.DatetimeIndex(original_time.values)


def create_cluster_structure_from_mapping(
timestep_mapping: xr.DataArray,
Expand Down Expand Up @@ -1073,3 +1173,15 @@ def create_cluster_structure_from_mapping(
n_clusters=n_clusters,
timesteps_per_cluster=timesteps_per_cluster,
)


def _register_clustering_classes():
    """Register clustering classes for IO.

    Called from flow_system.py after all imports are complete to avoid circular imports.
    """
    # Imported lazily for the same circular-import reason.
    from ..structure import CLASS_REGISTRY

    for registered_class in (ClusterStructure, ClusterResult, Clustering):
        CLASS_REGISTRY[registered_class.__name__] = registered_class
18 changes: 7 additions & 11 deletions flixopt/elements.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,14 +677,13 @@ def _do_modeling(self):
self._constraint_flow_rate()

# Total flow hours tracking (per period)
# Sum over all temporal dimensions (time, and cluster if present)
weighted_flow = self.flow_rate * self._model.aggregation_weight
# Get temporal_dims from aggregation_weight (not weighted_flow which has linopy's _term dim)
temporal_dims = [d for d in self._model.aggregation_weight.dims if d not in ('period', 'scenario')]
# Sum over temporal dimensions, weighted by cluster_weight
flow_hours = self.flow_rate * self._model.timestep_duration
weighted_flow_hours = flow_hours * self._model.cluster_weight
ModelingPrimitives.expression_tracking_variable(
model=self,
name=f'{self.label_full}|total_flow_hours',
tracked_expression=weighted_flow.sum(temporal_dims),
tracked_expression=weighted_flow_hours.sum(self._model.temporal_dims),
bounds=(
self.element.flow_hours_min if self.element.flow_hours_min is not None else 0,
self.element.flow_hours_max if self.element.flow_hours_max is not None else None,
Expand Down Expand Up @@ -841,9 +840,8 @@ def _create_bounds_for_load_factor(self):
# Get the size (either from element or investment)
size = self.investment.size if self.with_investment else self.element.size

# Sum over all temporal dimensions (time, and cluster if present)
temporal_dims = [d for d in self._model.aggregation_weight.dims if d not in ('period', 'scenario')]
total_hours = self._model.aggregation_weight.sum(temporal_dims)
# Sum over temporal dimensions, weighted by cluster_weight
total_hours = (self._model.timestep_duration * self._model.cluster_weight).sum(self._model.temporal_dims)

# Maximum load factor constraint
if self.element.load_factor_max is not None:
Expand Down Expand Up @@ -959,9 +957,7 @@ def _do_modeling(self):

# Add virtual supply/demand to balance and penalty if needed
if self.element.allows_imbalance:
imbalance_penalty = np.multiply(
self._model.aggregation_weight, self.element.imbalance_penalty_per_flow_hour
)
imbalance_penalty = self.element.imbalance_penalty_per_flow_hour * self._model.timestep_duration

self.virtual_supply = self.add_variables(
lower=0, coords=self._model.get_coords(), short_name='virtual_supply'
Expand Down
17 changes: 8 additions & 9 deletions flixopt/features.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,21 +197,20 @@ def _do_modeling(self):
self.add_constraints(self.status + inactive == 1, short_name='complementary')

# 3. Total duration tracking using existing pattern
# Sum over all temporal dimensions (time, and cluster if present)
weighted_status = self.status * self._model.aggregation_weight
# Get temporal_dims from aggregation_weight (not weighted_status which has linopy's _term dim)
temporal_dims = [d for d in self._model.aggregation_weight.dims if d not in ('period', 'scenario')]
agg_weight_sum = self._model.aggregation_weight.sum(temporal_dims)
# Sum over temporal dimensions, weighted by cluster_weight
active_hours = self.status * self._model.timestep_duration
weighted_active_hours = active_hours * self._model.cluster_weight
total_hours = (self._model.timestep_duration * self._model.cluster_weight).sum(self._model.temporal_dims)
ModelingPrimitives.expression_tracking_variable(
self,
tracked_expression=weighted_status.sum(temporal_dims),
tracked_expression=weighted_active_hours.sum(self._model.temporal_dims),
bounds=(
self.parameters.active_hours_min if self.parameters.active_hours_min is not None else 0,
self.parameters.active_hours_max
if self.parameters.active_hours_max is not None
else agg_weight_sum.max().item()
if hasattr(agg_weight_sum, 'max')
else agg_weight_sum,
else total_hours.max().item()
if hasattr(total_hours, 'max')
else total_hours,
),
short_name='active_hours',
coords=['period', 'scenario'],
Expand Down
Loading
Loading