diff --git a/pyproject.toml b/pyproject.toml index 8ea231a..7209fe2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -116,6 +116,7 @@ extend-fixable = ["B", "SIM", "RUF", "C4", "UP"] [tool.ruff.lint.per-file-ignores] "tests/**/*.py" = ["S101"] "scripts/**/*.py" = ["T201", "RUF003"] +"src/fluxopt/types.py" = ["UP040"] # TypeAlias for mkdocstrings compatibility "docs/notebooks/*.ipynb" = ["T201", "RUF001", "RUF003"] [tool.ruff.format] diff --git a/src/fluxopt/__init__.py b/src/fluxopt/__init__.py index 0abbe29..585f299 100644 --- a/src/fluxopt/__init__.py +++ b/src/fluxopt/__init__.py @@ -7,9 +7,13 @@ from fluxopt.model_data import Dims, ModelData from fluxopt.results import Result from fluxopt.types import ( + ArrayInput, IdList, + OnceEffectInput, + PeriodicInput, + TemporalInput, + TemporalPeriodicInput, TimeIndex, - TimeSeries, Timesteps, as_dataarray, ) @@ -63,6 +67,7 @@ def optimize( __all__ = [ 'PENALTY_EFFECT_ID', + 'ArrayInput', 'Carrier', 'Converter', 'Dims', @@ -72,13 +77,16 @@ def optimize( 'IdList', 'Investment', 'ModelData', + 'OnceEffectInput', + 'PeriodicInput', 'Port', 'Result', 'Sizing', 'Status', 'Storage', + 'TemporalInput', + 'TemporalPeriodicInput', 'TimeIndex', - 'TimeSeries', 'Timesteps', 'as_dataarray', 'optimize', diff --git a/src/fluxopt/components.py b/src/fluxopt/components.py index 701749d..18c9c1e 100644 --- a/src/fluxopt/components.py +++ b/src/fluxopt/components.py @@ -8,7 +8,7 @@ if TYPE_CHECKING: from fluxopt.elements import Flow - from fluxopt.types import TimeSeries + from fluxopt.types import TemporalInput def _qualify_flows(component_id: str, flows: list[Flow]) -> IdList[Flow]: @@ -49,7 +49,7 @@ class Converter: id: str inputs: list[Flow] | IdList[Flow] outputs: list[Flow] | IdList[Flow] - conversion_factors: list[dict[str, TimeSeries]] = field(default_factory=list) # a_f + conversion_factors: list[dict[str, TemporalInput]] = field(default_factory=list) # a_f _short_to_id: dict[str, str] = field(init=False, default_factory=dict) def __post_init__(self) -> None: @@ -59,7 +59,7 @@ def __post_init__(self) -> None: self._short_to_id = {f.short_id: f.id for f in (*self.inputs, *self.outputs)} @classmethod - def _single_io(cls, id: str, coefficient: TimeSeries, input_flow: Flow, output_flow: Flow) -> Converter: + def _single_io(cls, id: str, coefficient: TemporalInput, input_flow: Flow, output_flow: Flow) -> Converter: """Create a single-input/single-output converter: input * coefficient = output.""" return cls( id, @@ -69,7 +69,7 @@ def _single_io(cls, id: str, coefficient: TimeSeries, input_flow: Flow, output_f ) @classmethod - def boiler(cls, id: str, thermal_efficiency: TimeSeries, fuel_flow: Flow, thermal_flow: Flow) -> Converter: + def boiler(cls, id: str, thermal_efficiency: TemporalInput, fuel_flow: Flow, thermal_flow: Flow) -> Converter: """Create a boiler converter: fuel * eta = thermal. Args: @@ -84,7 +84,7 @@ def boiler(cls, id: str, thermal_efficiency: TimeSeries, fuel_flow: Flow, therma def heat_pump( cls, id: str, - cop: TimeSeries, + cop: TemporalInput, electrical_flow: Flow, source_flow: Flow, thermal_flow: Flow, @@ -113,7 +113,7 @@ def heat_pump( ) @classmethod - def power2heat(cls, id: str, efficiency: TimeSeries, electrical_flow: Flow, thermal_flow: Flow) -> Converter: + def power2heat(cls, id: str, efficiency: TemporalInput, electrical_flow: Flow, thermal_flow: Flow) -> Converter: """Create an electric resistance heater: electrical * eta = thermal. Args: @@ -128,8 +128,8 @@ def power2heat(cls, id: str, efficiency: TimeSeries, electrical_flow: Flow, ther def chp( cls, id: str, - eta_el: TimeSeries, - eta_th: TimeSeries, + eta_el: TemporalInput, + eta_th: TemporalInput, fuel_flow: Flow, electrical_flow: Flow, thermal_flow: Flow, diff --git a/src/fluxopt/elements.py b/src/fluxopt/elements.py index 2c290d7..c096407 100644 --- a/src/fluxopt/elements.py +++ b/src/fluxopt/elements.py @@ -4,7 +4,12 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: - from fluxopt.types import TimeSeries + from fluxopt.types import ( + OnceEffectInput, + PeriodicInput, + TemporalInput, + TemporalPeriodicInput, + ) PENALTY_EFFECT_ID = 'penalty' @@ -55,8 +60,8 @@ class Sizing: min_size: float max_size: float mandatory: bool = True - effects_per_size: dict[str, TimeSeries] = field(default_factory=dict) - effects_fixed: dict[str, TimeSeries] = field(default_factory=dict) + effects_per_size: dict[str, PeriodicInput] = field(default_factory=dict) + effects_fixed: dict[str, PeriodicInput] = field(default_factory=dict) @dataclass @@ -73,8 +78,10 @@ class Investment: mandatory: If True, must build exactly once; if False, may build at most once. lifetime: Periods active after build; None = forever. prior_size: Pre-existing capacity available from period 0. - effects_per_size: One-time per-MW costs charged in the build period. - effects_fixed: One-time fixed costs charged in the build period. + effects_per_size: One-time per-MW costs. Scalar or 1D ``(build_period,)`` + → diagonal (cost lands in the build period). 2D ``(period, build_period)`` + → as-is (e.g. installment plans, learning curves). + effects_fixed: One-time fixed costs. Same expansion rules as effects_per_size. effects_per_size_periodic: Recurring per-MW costs charged every active period. effects_fixed_periodic: Recurring fixed costs charged every active period. """ @@ -84,10 +91,10 @@ class Investment: mandatory: bool = True lifetime: int | None = None prior_size: float = 0.0 - effects_per_size: dict[str, TimeSeries] = field(default_factory=dict) - effects_fixed: dict[str, TimeSeries] = field(default_factory=dict) - effects_per_size_periodic: dict[str, TimeSeries] = field(default_factory=dict) - effects_fixed_periodic: dict[str, TimeSeries] = field(default_factory=dict) + effects_per_size: dict[str, OnceEffectInput] = field(default_factory=dict) + effects_fixed: dict[str, OnceEffectInput] = field(default_factory=dict) + effects_per_size_periodic: dict[str, PeriodicInput] = field(default_factory=dict) + effects_fixed_periodic: dict[str, PeriodicInput] = field(default_factory=dict) @dataclass @@ -102,8 +109,8 @@ class Status: max_uptime: float | None = None # [h] min_downtime: float | None = None # [h] max_downtime: float | None = None # [h] - effects_per_running_hour: dict[str, TimeSeries] = field(default_factory=dict) - effects_per_startup: dict[str, TimeSeries] = field(default_factory=dict) + effects_per_running_hour: dict[str, TemporalPeriodicInput] = field(default_factory=dict) + effects_per_startup: dict[str, PeriodicInput] = field(default_factory=dict) @dataclass(eq=False) @@ -129,10 +136,10 @@ class Flow: id: str = field(init=False, default='') node: str | None = None size: float | Sizing | Investment | None = None # P̄_f [MW] - relative_minimum: TimeSeries = 0.0 # p̲_f [-] - relative_maximum: TimeSeries = 1.0 # p̄_f [-] - fixed_relative_profile: TimeSeries | None = None # π_f [-] - effects_per_flow_hour: dict[str, TimeSeries] = field(default_factory=dict) # c_{f,k} [varies] + relative_minimum: TemporalInput = 0.0 # p̲_f [-] + relative_maximum: TemporalInput = 1.0 # p̄_f [-] + fixed_relative_profile: TemporalInput | None = None # π_f [-] + effects_per_flow_hour: dict[str, TemporalPeriodicInput] = field(default_factory=dict) # c_{f,k} [varies] status: Status | None = None prior_rates: list[float] | None = None # flow rates before horizon [MW] @@ -156,10 +163,10 @@ class Effect: is_objective: bool = False maximum_total: float | None = None # Φ̄_k [unit] minimum_total: float | None = None # Φ̲_k [unit] - maximum_per_hour: TimeSeries | None = None # Φ̄_{k,t} [unit] - minimum_per_hour: TimeSeries | None = None # Φ̲_{k,t} [unit] - contribution_from: dict[str, TimeSeries] = field(default_factory=dict) - contribution_from_per_hour: dict[str, TimeSeries] = field(default_factory=dict) + maximum_per_hour: TemporalInput | None = None # Φ̄_{k,t} [unit] + minimum_per_hour: TemporalInput | None = None # Φ̲_{k,t} [unit] + contribution_from: dict[str, PeriodicInput] = field(default_factory=dict) + contribution_from_per_hour: dict[str, TemporalPeriodicInput] = field(default_factory=dict) period_weights_periodic: list[float] | None = None # ω_periodic[p] — scales temporal+periodic period_weights_once: list[float] | None = None # ω_once[p] — scales once @@ -183,13 +190,13 @@ class Storage: charging: Flow discharging: Flow capacity: float | Sizing | Investment | None = None # Ē_s [MWh] - eta_charge: TimeSeries = 1.0 # η^c_s [-] - eta_discharge: TimeSeries = 1.0 # η^d_s [-] - relative_loss_per_hour: TimeSeries = 0.0 # δ_s [1/h] + eta_charge: TemporalInput = 1.0 # η^c_s [-] + eta_discharge: TemporalInput = 1.0 # η^d_s [-] + relative_loss_per_hour: TemporalInput = 0.0 # δ_s [1/h] prior_level: float | None = None # E_{s,0} [MWh] cyclic: bool = True # E_{s,first} == E_{s,last} - relative_minimum_level: TimeSeries = 0.0 # e̲_s [-] - relative_maximum_level: TimeSeries = 1.0 # ē_s [-] + relative_minimum_level: TemporalInput = 0.0 # e̲_s [-] + relative_maximum_level: TemporalInput = 1.0 # ē_s [-] def __post_init__(self) -> None: """Validate carrier match, rename colliding flow ids, and qualify.""" diff --git a/src/fluxopt/model.py b/src/fluxopt/model.py index 715b1a3..f21d652 100644 --- a/src/fluxopt/model.py +++ b/src/fluxopt/model.py @@ -877,17 +877,20 @@ def _create_effects(self) -> None: once_direct: Any = 0 - # Investment: one-time per-size costs (charged in build period) + # Investment: one-time per-size costs — (flow, effect, period, build_period) + # Multiply by invest_size_at_build renamed to build_period, sum over both. if self.invest_size_at_build is not None and d.flows.invest_effects_per_size is not None: eps_once = d.flows.invest_effects_per_size.rename({'invest_flow': 'flow'}) if (eps_once != 0).any(): - once_direct = once_direct + (eps_once * self.invest_size_at_build).sum('flow') + sab_bp = self.invest_size_at_build.rename({'period': 'build_period'}) + once_direct = once_direct + (eps_once * sab_bp).sum(['flow', 'build_period']) - # Investment: one-time fixed costs (charged in build period) + # Investment: one-time fixed costs — (flow, effect, period, build_period) if self.invest_build is not None and d.flows.invest_effects_fixed is not None: ef_once = d.flows.invest_effects_fixed.rename({'invest_flow': 'flow'}) if (ef_once != 0).any(): - once_direct = once_direct + (ef_once * self.invest_build).sum('flow') + bld_bp = self.invest_build.rename({'period': 'build_period'}) + once_direct = once_direct + (ef_once * bld_bp).sum(['flow', 'build_period']) self.m.add_constraints(self.effect_once == once_direct, name='effect_once_eq') diff --git a/src/fluxopt/model_data.py b/src/fluxopt/model_data.py index c4d1c4a..20deb74 100644 --- a/src/fluxopt/model_data.py +++ b/src/fluxopt/model_data.py @@ -13,7 +13,7 @@ if TYPE_CHECKING: from fluxopt.components import Converter, Port from fluxopt.elements import Carrier, Effect, Flow, Investment, Sizing, Status, Storage - from fluxopt.types import TimeIndex, Timesteps + from fluxopt.types import OnceEffectInput, TimeIndex, Timesteps @dataclass(frozen=True) @@ -59,6 +59,55 @@ def _effect_template( ) +def _expand_once_effect(value: OnceEffectInput, period: pd.Index) -> xr.DataArray: + """Expand an investment once-effect value to 2D (period, build_period). + + Construction rule: + - Scalar → diagonal filled with that constant + - list/array → treated as ``(build_period,)`` → diagonal + - 1D DataArray ``(build_period,)`` or ``(period,)`` → diagonal + - 2D DataArray ``(period, build_period)`` → as-is + + Args: + value: Scalar, list, 1D, or 2D effect value. + period: Period index (shared by both axes). + """ + n = len(period) + coords: dict[str, Any] = {'period': period, 'build_period': period} + dims = ['period', 'build_period'] + + if isinstance(value, (int, float)): + return xr.DataArray(np.eye(n) * float(value), dims=dims, coords=coords) + + if isinstance(value, xr.DataArray): + vdims = {str(d) for d in value.dims} + if vdims == {'period', 'build_period'}: + aligned = value.reindex(period=period, build_period=period) + if aligned.isnull().any(): + raise ValueError( + f'Once-effect DataArray (period, build_period) has coords that do not ' + f'fully cover the model periods {list(period)}' + ) + return aligned + if vdims <= {'period', 'build_period'} and len(vdims) == 1: + dim_name = next(iter(vdims)) + aligned = value.reindex({dim_name: period}) + if aligned.isnull().any(): + raise ValueError( + f'Once-effect DataArray with dim {dim_name!r} has coords that do not ' + f'match the model periods {list(period)}' + ) + return xr.DataArray(np.diag(aligned.values), dims=dims, coords=coords) + foreign = [str(d) for d in value.dims if d not in ('period', 'build_period')] + if foreign: + raise ValueError( + f'Once-effect DataArray has unexpected dims {foreign}. Expected subset of (period, build_period).' + ) + + da = as_dataarray(value, {'build_period': period}) + return xr.DataArray(np.diag(da.values), dims=dims, coords=coords) + + _NC_GROUPS = { 'flows': 'model/flows', 'carriers': 'model/carriers', @@ -175,8 +224,8 @@ class _InvestmentArrays: mandatory: xr.DataArray | None = None # (invest_dim,) lifetime: xr.DataArray | None = None # (invest_dim,) — NaN = forever prior_size: xr.DataArray | None = None # (invest_dim,) - effects_per_size: xr.DataArray | None = None # (invest_dim, effect, period?) — once - effects_fixed: xr.DataArray | None = None # (invest_dim, effect, period?) — once + effects_per_size: xr.DataArray | None = None # (invest_dim, effect, period, build_period) — once + effects_fixed: xr.DataArray | None = None # (invest_dim, effect, period, build_period) — once effects_per_size_periodic: xr.DataArray | None = None # (invest_dim, effect, period?) effects_fixed_periodic: xr.DataArray | None = None # (invest_dim, effect, period?) @@ -200,7 +249,25 @@ def build( return cls() effect_set = set(effect_ids) - tmpl = _effect_template({'effect': effect_ids}, period) + periodic_tmpl = _effect_template({'effect': effect_ids}, period) + + # Once-effect template: (effect, period, build_period) when multi-period + once_coords: dict[str, Any] + once_shape: tuple[int, ...] + once_dims: tuple[str, ...] + if period is not None: + n_p = len(period) + once_coords = { + 'effect': effect_ids, + 'period': period, + 'build_period': period, + } + once_shape = (len(effect_ids), n_p, n_p) + once_dims = ('effect', 'period', 'build_period') + else: + once_coords = {'effect': effect_ids} + once_shape = (len(effect_ids),) + once_dims = ('effect',) ids: list[str] = [] mins: list[float] = [] @@ -230,17 +297,31 @@ def build( lifetimes.append(float(inv.lifetime) if inv.lifetime is not None else np.nan) prior_sizes.append(inv.prior_size) + # Once-effects: expand to (effect, period, build_period) via diagonal rule for label, src_dict, dest_key in [ ('Investment.effects_per_size', inv.effects_per_size, 'eps'), ('Investment.effects_fixed', inv.effects_fixed, 'ef'), + ]: + arr = xr.DataArray(np.zeros(once_shape), dims=list(once_dims), coords=once_coords) + for ek, ev in src_dict.items(): + if ek not in effect_set: + raise ValueError(f'Unknown effect {ek!r} in {label} on {item_id!r}') + if period is not None: + arr.loc[ek] = _expand_once_effect(ev, period) + else: + arr.loc[ek] = as_dataarray(ev, {}) + all_slices[dest_key].append(arr) + + # Periodic effects: (effect, period?) — no build_period axis + for label, src_dict, dest_key in [ ('Investment.effects_per_size_periodic', inv.effects_per_size_periodic, 'eps_p'), ('Investment.effects_fixed_periodic', inv.effects_fixed_periodic, 'ef_p'), ]: - arr = tmpl.zeros() + arr = periodic_tmpl.zeros() for ek, ev in src_dict.items(): if ek not in effect_set: raise ValueError(f'Unknown effect {ek!r} in {label} on {item_id!r}') - arr.loc[ek] = as_dataarray(ev, tmpl.as_da_coords) + arr.loc[ek] = as_dataarray(ev, periodic_tmpl.as_da_coords) all_slices[dest_key].append(arr) coords = {dim: ids} @@ -420,8 +501,8 @@ class FlowsData: invest_mandatory: xr.DataArray | None = None # (invest_flow,) invest_lifetime: xr.DataArray | None = None # (invest_flow,) — NaN = forever invest_prior_size: xr.DataArray | None = None # (invest_flow,) - invest_effects_per_size: xr.DataArray | None = None # (invest_flow, effect, period?) — once - invest_effects_fixed: xr.DataArray | None = None # (invest_flow, effect, period?) — once + invest_effects_per_size: xr.DataArray | None = None # (invest_flow, effect, period, build_period) — once + invest_effects_fixed: xr.DataArray | None = None # (invest_flow, effect, period, build_period) — once invest_effects_per_size_periodic: xr.DataArray | None = None # (invest_flow, effect, period?) invest_effects_fixed_periodic: xr.DataArray | None = None # (invest_flow, effect, period?) @@ -1030,8 +1111,8 @@ class StoragesData: invest_mandatory: xr.DataArray | None = None # (invest_storage,) invest_lifetime: xr.DataArray | None = None # (invest_storage,) — NaN = forever invest_prior_size: xr.DataArray | None = None # (invest_storage,) - invest_effects_per_size: xr.DataArray | None = None # (invest_storage, effect, period?) — once - invest_effects_fixed: xr.DataArray | None = None # (invest_storage, effect, period?) — once + invest_effects_per_size: xr.DataArray | None = None # (invest_storage, effect, period, build_period) — once + invest_effects_fixed: xr.DataArray | None = None # (invest_storage, effect, period, build_period) — once invest_effects_per_size_periodic: xr.DataArray | None = None # (invest_storage, effect, period?) invest_effects_fixed_periodic: xr.DataArray | None = None # (invest_storage, effect, period?) diff --git a/src/fluxopt/types.py b/src/fluxopt/types.py index 304ea58..4b3c0c1 100644 --- a/src/fluxopt/types.py +++ b/src/fluxopt/types.py @@ -1,7 +1,7 @@ from __future__ import annotations from datetime import datetime -from typing import TYPE_CHECKING, Any, Protocol, overload, runtime_checkable +from typing import TYPE_CHECKING, Annotated, Any, Protocol, TypeAlias, overload, runtime_checkable import numpy as np import pandas as pd @@ -11,11 +11,32 @@ from collections.abc import Iterable, Iterator, Mapping # -- User input types -------------------------------------------------- -type TimeSeries = float | int | list[float] | np.ndarray | pd.Series | xr.DataArray -type Timesteps = list[datetime] | list[int] | pd.DatetimeIndex | pd.Index + +ArrayInput: TypeAlias = float | int | list[float] | np.ndarray | pd.Series | xr.DataArray +"""Flexible user input: scalar, sequence, or labeled array. + +Normalized to `xr.DataArray` by `as_dataarray()`. +""" + +TemporalInput: TypeAlias = Annotated[ArrayInput, 'dims ⊆ {time}'] +"""Scalar or array with dims ⊆ {time}.""" + +TemporalPeriodicInput: TypeAlias = Annotated[ArrayInput, 'dims ⊆ {time, period}'] +"""Scalar or array with dims ⊆ {time, period}.""" + +PeriodicInput: TypeAlias = Annotated[ArrayInput, 'dims ⊆ {period}'] +"""Scalar or array with dims ⊆ {period}.""" + +OnceEffectInput: TypeAlias = Annotated[ArrayInput, 'dims ⊆ {period, build_period}'] +"""Scalar or array with dims ⊆ {period, build_period}.""" + +Timesteps: TypeAlias = list[datetime] | list[int] | pd.DatetimeIndex | pd.Index +"""User-provided timestep labels.""" # -- Internal types (after normalization) ------------------------------ -type TimeIndex = pd.DatetimeIndex | pd.Index + +TimeIndex: TypeAlias = pd.DatetimeIndex | pd.Index +"""Normalized time index (after `normalize_timesteps`).""" @runtime_checkable @@ -109,13 +130,13 @@ def fast_concat(arrays: list[xr.DataArray], dim: pd.Index) -> xr.DataArray: def as_dataarray( - value: TimeSeries, + value: ArrayInput, coords: Mapping[str, Any], *, name: str = 'value', broadcast: bool = True, ) -> xr.DataArray: - """Convert a TimeSeries to a DataArray aligned to given coordinates. + """Convert an ArrayInput to a DataArray aligned to given coordinates. Args: value: Scalar, list, ndarray, Series, or DataArray. @@ -161,7 +182,7 @@ def as_dataarray( elif isinstance(value, list): arr = np.array(value, dtype=float) else: - raise TypeError(f'Unsupported TimeSeries type: {type(value)}') + raise TypeError(f'Unsupported ArrayInput type: {type(value)}') n = len(arr) matches = [k for k, v in coord_idx.items() if len(v) == n] diff --git a/tests/math_port/test_multi_period.py b/tests/math_port/test_multi_period.py index 88c7d37..308d341 100644 --- a/tests/math_port/test_multi_period.py +++ b/tests/math_port/test_multi_period.py @@ -778,3 +778,156 @@ def test_storage_sizing_effects_per_size_vary_by_period(self, optimize): period_weights=[1, 1], ) assert_allclose(result.objective, 40.0, rtol=1e-4) + + +class TestBuildPeriodEffects: + def test_capex_per_size_build_period_dim(self, optimize): + """Proves: 1D (build_period,) input produces diagonal — cost charged when built. + + CAPEX = 10 if built in 2020, 20 if built in 2025. Mandatory, size=10. + Optimizer builds in cheapest period (2020). Once cost = 10*10 = 100. + """ + _xfail_if_validate(optimize) + periods = [2020, 2025] + capex = xr.DataArray([10.0, 20.0], dims=['build_period'], coords={'build_period': periods}) + result = optimize( + timesteps=ts(3), + carriers=[Carrier('Heat')], + effects=[Effect('cost', is_objective=True)], + ports=[ + Port( + 'Demand', + exports=[ + Flow('Heat', size=1, fixed_relative_profile=np.array([10, 10, 10])), + ], + ), + Port( + 'Grid', + imports=[ + Flow( + 'Heat', + size=Investment(10, 10, effects_per_size={'cost': capex}), + ), + ], + ), + ], + periods=periods, + period_weights=[1, 1], + ) + assert_allclose(result.objective, 100.0, rtol=1e-4) + + def test_capex_fixed_build_period_dim(self, optimize): + """Proves: 1D (build_period,) fixed cost — charged when built. + + Fixed CAPEX = 50 if built in 2020, 100 if built in 2025. Mandatory. + Builds in 2020. Once cost = 50. + """ + _xfail_if_validate(optimize) + periods = [2020, 2025] + capex = xr.DataArray([50.0, 100.0], dims=['build_period'], coords={'build_period': periods}) + result = optimize( + timesteps=ts(3), + carriers=[Carrier('Heat')], + effects=[Effect('cost', is_objective=True)], + ports=[ + Port( + 'Demand', + exports=[ + Flow('Heat', size=1, fixed_relative_profile=np.array([10, 10, 10])), + ], + ), + Port( + 'Grid', + imports=[ + Flow( + 'Heat', + size=Investment(10, 10, effects_fixed={'cost': capex}), + ), + ], + ), + ], + periods=periods, + period_weights=[1, 1], + ) + assert_allclose(result.objective, 50.0, rtol=1e-4) + + def test_capex_2d_spread_across_periods(self, optimize): + """Proves: 2D (period, build_period) matrix spreads costs across periods. + + Building in 2020 costs 5/MW in 2020 + 5/MW in 2025 (installment plan). + Building in 2025 costs 15/MW in 2025 only. + Size=10, mandatory. Optimizer picks 2020: total = 10*(5+5) = 100. + vs 2025: total = 10*15 = 150. Objective = 100. + """ + _xfail_if_validate(optimize) + periods = [2020, 2025] + capex_2d = xr.DataArray( + [[5.0, 0.0], [5.0, 15.0]], + dims=['period', 'build_period'], + coords={'period': periods, 'build_period': periods}, + ) + result = optimize( + timesteps=ts(3), + carriers=[Carrier('Heat')], + effects=[Effect('cost', is_objective=True)], + ports=[ + Port( + 'Demand', + exports=[ + Flow('Heat', size=1, fixed_relative_profile=np.array([10, 10, 10])), + ], + ), + Port( + 'Grid', + imports=[ + Flow( + 'Heat', + size=Investment(10, 10, effects_per_size={'cost': capex_2d}), + ), + ], + ), + ], + periods=periods, + period_weights=[1, 1], + ) + assert_allclose(result.objective, 100.0, rtol=1e-4) + + def test_fixed_2d_installments(self, optimize): + """Proves: 2D fixed costs can spread across periods. + + Building in 2020: fixed cost 30 in 2020 + 30 in 2025. + Building in 2025: fixed cost 80 in 2025 only. + Mandatory. Build in 2020: total = 30+30 = 60. Objective = 60. + """ + _xfail_if_validate(optimize) + periods = [2020, 2025] + fixed_2d = xr.DataArray( + [[30.0, 0.0], [30.0, 80.0]], + dims=['period', 'build_period'], + coords={'period': periods, 'build_period': periods}, + ) + result = optimize( + timesteps=ts(3), + carriers=[Carrier('Heat')], + effects=[Effect('cost', is_objective=True)], + ports=[ + Port( + 'Demand', + exports=[ + Flow('Heat', size=1, fixed_relative_profile=np.array([10, 10, 10])), + ], + ), + Port( + 'Grid', + imports=[ + Flow( + 'Heat', + size=Investment(10, 10, effects_fixed={'cost': fixed_2d}), + ), + ], + ), + ], + periods=periods, + period_weights=[1, 1], + ) + assert_allclose(result.objective, 60.0, rtol=1e-4) diff --git a/tests/test_data.py b/tests/test_data.py index ccb9e6e..6576b53 100644 --- a/tests/test_data.py +++ b/tests/test_data.py @@ -1,11 +1,13 @@ from __future__ import annotations import numpy as np +import pandas as pd import pytest import xarray as xr from conftest import ts from fluxopt import Carrier, Converter, Dims, Effect, Flow, ModelData, Port, Storage, optimize +from fluxopt.model_data import _expand_once_effect class TestFlowsTable: @@ -301,3 +303,78 @@ def test_mismatched_coords_raises(self): bad_weights = xr.DataArray(np.ones(3), dims=['time'], coords={'time': [0, 1, 2]}) with pytest.raises(ValueError, match='does not match'): Dims(time=time, dt=dt, weights=bad_weights) + + +class TestExpandOnceEffect: + """Unit tests for _expand_once_effect diagonal expansion.""" + + period = pd.Index([2020, 2025, 2030]) + + def test_scalar(self): + result = _expand_once_effect(5.0, self.period) + expected = np.eye(3) * 5.0 + np.testing.assert_array_equal(result.values, expected) + assert list(result.dims) == ['period', 'build_period'] + + def test_list_input(self): + result = _expand_once_effect([1.0, 2.0, 3.0], self.period) + expected = np.diag([1.0, 2.0, 3.0]) + np.testing.assert_array_equal(result.values, expected) + + def test_1d_build_period(self): + da = xr.DataArray([10.0, 20.0, 30.0], dims=['build_period'], coords={'build_period': self.period}) + result = _expand_once_effect(da, self.period) + expected = np.diag([10.0, 20.0, 30.0]) + np.testing.assert_array_equal(result.values, expected) + + def test_1d_period_dim_treated_as_diagonal(self): + da = xr.DataArray([10.0, 20.0, 30.0], dims=['period'], coords={'period': self.period}) + result = _expand_once_effect(da, self.period) + expected = np.diag([10.0, 20.0, 30.0]) + np.testing.assert_array_equal(result.values, expected) + + def test_1d_reordered_coords_aligned(self): + """Coords in different order are aligned to model period.""" + da = xr.DataArray([30.0, 10.0, 20.0], dims=['build_period'], coords={'build_period': [2030, 2020, 2025]}) + result = _expand_once_effect(da, self.period) + # After alignment: [10.0, 20.0, 30.0] matching [2020, 2025, 2030] + expected = np.diag([10.0, 20.0, 30.0]) + np.testing.assert_array_equal(result.values, expected) + + def test_2d_passthrough(self): + data = np.array([[1, 2, 0], [0, 3, 4], [0, 0, 5]], dtype=float) + da = xr.DataArray( + data, dims=['period', 'build_period'], coords={'period': self.period, 'build_period': self.period} + ) + result = _expand_once_effect(da, self.period) + np.testing.assert_array_equal(result.values, data) + + def test_2d_reordered_aligned(self): + """2D input with swapped period order is reindexed.""" + rev = pd.Index([2030, 2025, 2020]) + data = np.array([[9, 8, 7], [6, 5, 4], [3, 2, 1]], dtype=float) + da = xr.DataArray(data, dims=['period', 'build_period'], coords={'period': rev, 'build_period': rev}) + result = _expand_once_effect(da, self.period) + # After reindex to [2020, 2025, 2030]: reversed rows and columns + expected = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]], dtype=float) + np.testing.assert_array_equal(result.values, expected) + + def test_1d_mismatched_coords_raises(self): + da = xr.DataArray([1.0, 2.0], dims=['build_period'], coords={'build_period': [2020, 9999]}) + with pytest.raises(ValueError, match='do not match'): + _expand_once_effect(da, self.period) + + def test_2d_mismatched_coords_raises(self): + """2D input with coords not covering model periods raises.""" + da = xr.DataArray( + np.eye(2), + dims=['period', 'build_period'], + coords={'period': [2020, 9999], 'build_period': [2020, 9999]}, + ) + with pytest.raises(ValueError, match='do not fully cover'): + _expand_once_effect(da, self.period) + + def test_foreign_dim_raises(self): + da = xr.DataArray([1.0, 2.0, 3.0], dims=['time'], coords={'time': [0, 1, 2]}) + with pytest.raises(ValueError, match='unexpected dims'): + _expand_once_effect(da, self.period)