diff --git a/src/sysls/backtest/optimize.py b/src/sysls/backtest/optimize.py
new file mode 100644
index 0000000..aafd7f4
--- /dev/null
+++ b/src/sysls/backtest/optimize.py
@@ -0,0 +1,449 @@
+"""Walk-forward analysis, grid search, and parameter optimization.
+
+Provides tools for systematic parameter optimization of trading strategies
+using the vectorized backtester. All functions are pure computation --
+no async code, no I/O.
+
+Typical usage::
+
+    import numpy as np
+    from sysls.backtest.optimize import (
+        ParameterGrid,
+        grid_search,
+        walk_forward,
+    )
+
+    prices = np.array([100.0, 102.0, 101.0, 105.0, 103.0, 107.0])
+
+    def my_signal(prices, fast=2, slow=5):
+        # simple moving-average crossover signal generator
+        ...
+        return signals
+
+    param_grid = ParameterGrid({"fast": [2, 3], "slow": [5, 10]})
+    result = grid_search(prices, my_signal, param_grid)
+    print(result.best_params, result.best_score)
+"""
+
+from __future__ import annotations
+
+import itertools
+import math
+from typing import TYPE_CHECKING, Any
+
+import structlog
+from pydantic import BaseModel, ConfigDict
+
+from sysls.backtest.metrics import BacktestResult
+from sysls.backtest.vectorized import run_vectorized_backtest
+
+if TYPE_CHECKING:
+    from collections.abc import Callable, Iterator
+
+    import numpy as np
+
+logger = structlog.get_logger(__name__)
+
+# ---------------------------------------------------------------------------
+# ParameterGrid
+# ---------------------------------------------------------------------------
+
+
+class ParameterGrid:
+    """Generates all Cartesian-product combinations of parameter values.
+
+    Takes a dictionary mapping parameter names to lists of candidate values
+    and yields every combination as a ``dict[str, Any]``.
+
+    Args:
+        param_dict: Mapping of parameter names to lists of candidate values.
+
+    Example::
+
+        grid = ParameterGrid({"fast": [2, 3], "slow": [5, 10]})
+        for combo in grid:
+            print(combo)
+        # {"fast": 2, "slow": 5}
+        # {"fast": 2, "slow": 10}
+        # {"fast": 3, "slow": 5}
+        # {"fast": 3, "slow": 10}
+    """
+
+    def __init__(self, param_dict: dict[str, list[Any]]) -> None:
+        self._keys = list(param_dict.keys())
+        self._values = [list(v) for v in param_dict.values()]
+
+    def __iter__(self) -> Iterator[dict[str, Any]]:
+        """Yield each parameter combination as a dictionary."""
+        if not self._keys:
+            yield {}
+            return
+        for combo in itertools.product(*self._values):
+            yield dict(zip(self._keys, combo, strict=True))
+
+    def __len__(self) -> int:
+        """Return the total number of parameter combinations."""
+        if not self._keys:
+            return 1
+        return math.prod(len(v) for v in self._values)
+
+
+# ---------------------------------------------------------------------------
+# Pydantic result models (frozen)
+# ---------------------------------------------------------------------------
+
+
+class GridSearchResult(BaseModel, frozen=True):
+    """Result of a grid search optimization.
+
+    Attributes:
+        best_params: Parameter combination that produced the best score.
+        best_score: Value of the optimization metric for the best params.
+        all_results: List of ``(params, BacktestResult)`` for every
+            combination evaluated, sorted by score (best first).
+    """
+
+    model_config = ConfigDict(ser_json_inf_nan="constants")
+
+    best_params: dict[str, Any]
+    best_score: float
+    all_results: list[tuple[dict[str, Any], BacktestResult]]
+
+
+class WalkForwardSplit(BaseModel, frozen=True):
+    """Result for a single walk-forward split.
+
+    Attributes:
+        split_index: Zero-based index of this split.
+        train_start: Start index of the training window (inclusive).
+        train_end: End index of the training window (exclusive).
+        oos_start: Start index of the out-of-sample window (inclusive).
+        oos_end: End index of the out-of-sample window (exclusive).
+        best_params: Best parameters found during in-sample optimization.
+        oos_result: Backtest result on the out-of-sample data.
+    """
+
+    split_index: int
+    train_start: int
+    train_end: int
+    oos_start: int
+    oos_end: int
+    best_params: dict[str, Any]
+    oos_result: BacktestResult
+
+
+class WalkForwardResult(BaseModel, frozen=True):
+    """Aggregated result of a walk-forward analysis.
+
+    Attributes:
+        splits: Per-split results with train/OOS boundaries and metrics.
+        combined_oos_equity: Concatenated out-of-sample equity curves
+            across all splits.
+        combined_metrics: Performance metrics computed over the combined
+            out-of-sample equity curve.
+    """
+
+    model_config = ConfigDict(ser_json_inf_nan="constants")
+
+    splits: list[WalkForwardSplit]
+    combined_oos_equity: list[float]
+    combined_metrics: BacktestResult
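+
+# Note on serialization: metrics such as the Sharpe ratio can be NaN or
+# infinite (e.g. on a constant equity curve), and strict JSON has no
+# encoding for those values. ``ser_json_inf_nan="constants"`` above makes
+# ``model_dump_json`` emit the JavaScript-style ``NaN``/``Infinity``
+# constants instead of nulling the values out. A minimal sketch, with
+# illustrative values only:
+#
+#     gsr = GridSearchResult(best_params={}, best_score=float("nan"),
+#                            all_results=[])
+#     gsr.model_dump_json()
+#     # -> '{"best_params":{},"best_score":NaN,"all_results":[]}'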
+
+
+# ---------------------------------------------------------------------------
+# Grid search
+# ---------------------------------------------------------------------------
+
+
+def grid_search(
+    prices: np.ndarray,
+    signal_func: Callable[..., np.ndarray],
+    param_grid: ParameterGrid,
+    initial_capital: float = 100_000.0,
+    commission_rate: float = 0.0,
+    slippage_rate: float = 0.0,
+    metric: str = "sharpe_ratio",
+    periods_per_year: int = 252,
+) -> GridSearchResult:
+    """Run a grid search over parameter combinations.
+
+    Evaluates every combination in *param_grid* by calling *signal_func*
+    with the candidate parameters, running a vectorized backtest, and
+    ranking by the chosen *metric*.
+
+    Args:
+        prices: 1-D array of asset prices.
+        signal_func: Callable with signature
+            ``signal_func(prices, **params) -> signals``.
+        param_grid: :class:`ParameterGrid` of candidate parameter values.
+        initial_capital: Starting capital for each backtest.
+        commission_rate: Commission rate per trade.
+        slippage_rate: Slippage rate per trade.
+        metric: Name of the :class:`BacktestResult` attribute to optimize.
+            For ``"max_drawdown"``, lower is better; for all other metrics,
+            higher is better.
+        periods_per_year: Annualization factor.
+
+    Returns:
+        A :class:`GridSearchResult` with the best parameters, best score,
+        and all evaluated results sorted by score.
+
+    Raises:
+        ValueError: If *param_grid* is empty or *metric* is not a valid
+            :class:`BacktestResult` attribute.
+    """
+    import numpy as np
+
+    # Validate metric name against BacktestResult fields.
+    if metric not in BacktestResult.model_fields:
+        raise ValueError(
+            f"Invalid metric {metric!r}. Must be one of: {sorted(BacktestResult.model_fields)}"
+        )
+
+    # An empty grid (e.g. a parameter with no candidate values) would leave
+    # nothing to rank below, so fail fast as documented.
+    if len(param_grid) == 0:
+        raise ValueError("param_grid is empty; nothing to search.")
+
+    prices_arr = np.asarray(prices, dtype=np.float64)
+    all_results: list[tuple[dict[str, Any], BacktestResult]] = []
+
+    for params in param_grid:
+        signals = signal_func(prices_arr, **params)
+        result = run_vectorized_backtest(
+            prices_arr,
+            signals,
+            initial_capital=initial_capital,
+            commission_rate=commission_rate,
+            slippage_rate=slippage_rate,
+            periods_per_year=periods_per_year,
+        )
+        all_results.append((params, result))
+        logger.debug("grid_search.evaluated", params=params, score=getattr(result, metric))
+
+    # Sort: for max_drawdown lower is better, otherwise higher is better.
+    reverse = metric != "max_drawdown"
+    all_results.sort(key=lambda x: getattr(x[1], metric), reverse=reverse)
+
+    best_params, best_result = all_results[0]
+    best_score = float(getattr(best_result, metric))
+
+    logger.info(
+        "grid_search.complete",
+        n_combos=len(all_results),
+        best_params=best_params,
+        best_score=best_score,
+    )
+
+    return GridSearchResult(
+        best_params=best_params,
+        best_score=best_score,
+        all_results=all_results,
+    )
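+
+
+# Usage sketch for ``grid_search`` (the ``threshold_signal`` helper here is
+# hypothetical, shown only to illustrate the expected signal_func shape):
+#
+#     def threshold_signal(prices, threshold=0.0):
+#         rets = np.zeros_like(prices)
+#         rets[1:] = prices[1:] / prices[:-1] - 1.0
+#         return (rets > threshold).astype(np.float64)
+#
+#     grid = ParameterGrid({"threshold": [0.0, 0.005, 0.01]})
+#     res = grid_search(prices, threshold_signal, grid, metric="sharpe_ratio")
+#     res.all_results  # every (params, BacktestResult) pair, best first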
+
+
+# ---------------------------------------------------------------------------
+# Time-series cross-validation splits
+# ---------------------------------------------------------------------------
+
+
+class TimeSeriesSplit:
+    """Generate expanding-window time-series train/test splits.
+
+    The training window always starts at index 0 and expands with each
+    split. The out-of-sample (test) window immediately follows the
+    training window.
+
+    Args:
+        n_samples: Total number of data points.
+        n_splits: Number of train/test splits to generate.
+        train_ratio: Fraction of the total data allocated to training
+            in the *first* split. The training window grows with each
+            subsequent split.
+    """
+
+    def __init__(
+        self,
+        n_samples: int,
+        n_splits: int,
+        train_ratio: float = 0.7,
+    ) -> None:
+        if n_splits < 1:
+            raise ValueError(f"n_splits must be >= 1, got {n_splits}")
+        if not 0.0 < train_ratio < 1.0:
+            raise ValueError(f"train_ratio must be in (0, 1), got {train_ratio}")
+        self._n_samples = n_samples
+        self._n_splits = n_splits
+        self._train_ratio = train_ratio
+
+        # First train window ends at train_ratio of total data.
+        # Remaining data is divided into n_splits equal OOS segments.
+        self._first_train_end = int(n_samples * train_ratio)
+        remaining = n_samples - self._first_train_end
+        self._oos_step = remaining // n_splits
+
+        if self._oos_step < 1:
+            raise ValueError(
+                f"Data too short for {n_splits} splits with "
+                f"train_ratio={train_ratio}. Need at least "
+                f"{self._first_train_end + n_splits} samples, got {n_samples}."
+            )
+
+    def __iter__(
+        self,
+    ) -> Iterator[tuple[int, int, int, int]]:
+        """Yield ``(train_start, train_end, oos_start, oos_end)`` tuples."""
+        for i in range(self._n_splits):
+            train_start = 0
+            train_end = self._first_train_end + i * self._oos_step
+            oos_start = train_end
+            # Last split absorbs any remainder from integer division.
+            oos_end = self._n_samples if i == self._n_splits - 1 else oos_start + self._oos_step
+            yield (train_start, train_end, oos_start, oos_end)
+
+    def __len__(self) -> int:
+        """Return the number of splits."""
+        return self._n_splits
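+
+
+# Worked example of the split arithmetic above: with n_samples=100,
+# n_splits=3, and train_ratio=0.7, the first training window ends at index
+# 70 and the remaining 30 points divide into OOS steps of 10, yielding
+# (train_start, train_end, oos_start, oos_end) tuples of:
+#
+#     (0, 70, 70, 80), (0, 80, 80, 90), (0, 90, 90, 100)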
+
+
+# ---------------------------------------------------------------------------
+# Walk-forward analysis
+# ---------------------------------------------------------------------------
+
+
+def walk_forward(
+    prices: np.ndarray,
+    signal_func: Callable[..., np.ndarray],
+    param_grid: ParameterGrid,
+    n_splits: int = 5,
+    train_ratio: float = 0.7,
+    initial_capital: float = 100_000.0,
+    commission_rate: float = 0.0,
+    slippage_rate: float = 0.0,
+    metric: str = "sharpe_ratio",
+    periods_per_year: int = 252,
+) -> WalkForwardResult:
+    """Run walk-forward analysis with expanding training windows.
+
+    For each split the data is divided into a training (in-sample) window
+    and an out-of-sample (OOS) window. A grid search is performed on the
+    training data to find the best parameters, which are then used to
+    backtest the OOS window. The OOS equity curves are concatenated to
+    produce a combined performance estimate.
+
+    Args:
+        prices: 1-D array of asset prices.
+        signal_func: Callable with signature
+            ``signal_func(prices, **params) -> signals``.
+        param_grid: :class:`ParameterGrid` of candidate parameter values.
+        n_splits: Number of expanding-window splits.
+        train_ratio: Fraction of total data for training in the first
+            split.
+        initial_capital: Starting capital for each backtest.
+        commission_rate: Commission rate per trade.
+        slippage_rate: Slippage rate per trade.
+        metric: :class:`BacktestResult` attribute to optimize.
+        periods_per_year: Annualization factor.
+
+    Returns:
+        A :class:`WalkForwardResult` with per-split results, concatenated
+        OOS equity, and combined metrics.
+
+    Raises:
+        ValueError: If *n_splits* < 1 or data is too short to split.
+    """
+    import numpy as np
+
+    from sysls.backtest.metrics import summarize_backtest
+
+    prices_arr = np.asarray(prices, dtype=np.float64)
+    n_samples = prices_arr.size
+
+    splitter = TimeSeriesSplit(
+        n_samples=n_samples,
+        n_splits=n_splits,
+        train_ratio=train_ratio,
+    )
+
+    splits: list[WalkForwardSplit] = []
+    oos_equity_segments: list[np.ndarray] = []
+
+    for idx, (train_start, train_end, oos_start, oos_end) in enumerate(splitter):
+        train_prices = prices_arr[train_start:train_end]
+        oos_prices = prices_arr[oos_start:oos_end]
+
+        # Grid search on training data to find best params.
+        train_result = grid_search(
+            train_prices,
+            signal_func,
+            param_grid,
+            initial_capital=initial_capital,
+            commission_rate=commission_rate,
+            slippage_rate=slippage_rate,
+            metric=metric,
+            periods_per_year=periods_per_year,
+        )
+
+        best_params = train_result.best_params
+
+        # Backtest OOS data with the best params from training.
+        oos_signals = signal_func(oos_prices, **best_params)
+        oos_result = run_vectorized_backtest(
+            oos_prices,
+            oos_signals,
+            initial_capital=initial_capital,
+            commission_rate=commission_rate,
+            slippage_rate=slippage_rate,
+            periods_per_year=periods_per_year,
+        )
+
+        oos_equity_segments.append(np.array(oos_result.equity_curve, dtype=np.float64))
+
+        splits.append(
+            WalkForwardSplit(
+                split_index=idx,
+                train_start=train_start,
+                train_end=train_end,
+                oos_start=oos_start,
+                oos_end=oos_end,
+                best_params=best_params,
+                oos_result=oos_result,
+            )
+        )
+
+        logger.debug(
+            "walk_forward.split_complete",
+            split=idx,
+            best_params=best_params,
+            oos_return=oos_result.total_return,
+        )
+
+    # Concatenate OOS equity curves, chaining each segment's end
+    # value as the next segment's starting capital.
+    combined_equity_list: list[float] = []
+    current_capital = initial_capital
+    for segment in oos_equity_segments:
+        if segment.size == 0:
+            continue
+        # Scale segment so it starts at current_capital.
+        scale = current_capital / segment[0] if segment[0] != 0.0 else 1.0
+        scaled = segment * scale
+        combined_equity_list.extend(scaled.tolist())
+        current_capital = float(scaled[-1])
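+
+    # Worked example of the chaining above (illustrative numbers): a first
+    # OOS segment [100_000, 105_000] leaves current_capital at 105_000, so
+    # a second segment [100_000, 102_000] is scaled by 1.05 to
+    # [105_000, 107_100], keeping the combined curve continuous across the
+    # split boundary.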
+
+    combined_equity_arr = np.array(combined_equity_list, dtype=np.float64)
+
+    combined_metrics = summarize_backtest(
+        equity_curve=combined_equity_arr,
+        trades=[],  # Individual trades not tracked across splits
+        initial_capital=initial_capital,
+        periods_per_year=periods_per_year,
+    )
+
+    logger.info(
+        "walk_forward.complete",
+        n_splits=n_splits,
+        combined_return=combined_metrics.total_return,
+        combined_sharpe=combined_metrics.sharpe_ratio,
+    )
+
+    return WalkForwardResult(
+        splits=splits,
+        combined_oos_equity=combined_equity_list,
+        combined_metrics=combined_metrics,
+    )
diff --git a/tests/backtest/test_optimize.py b/tests/backtest/test_optimize.py
new file mode 100644
index 0000000..8c849e8
--- /dev/null
+++ b/tests/backtest/test_optimize.py
@@ -0,0 +1,630 @@
+"""Tests for sysls.backtest.optimize.
+
+Tests cover parameter grid generation, grid search optimization,
+time-series cross-validation splits, and walk-forward analysis.
+"""
+
+from __future__ import annotations
+
+import numpy as np
+import pytest
+
+from sysls.backtest.metrics import BacktestResult
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _make_trending_prices(n: int = 100, start: float = 100.0) -> np.ndarray:
+    """Create a simple upward-trending price series for testing."""
+    rng = np.random.default_rng(42)
+    returns = 0.001 + rng.normal(0, 0.01, n)
+    prices = start * np.cumprod(1 + returns)
+    return prices
+
+
+def _simple_signal_func(
+    prices: np.ndarray,
+    *,
+    threshold: float = 0.0,
+) -> np.ndarray:
+    """Signal function: long when return > threshold, else flat."""
+    signals = np.zeros(len(prices), dtype=np.float64)
+    for i in range(1, len(prices)):
+        ret = (prices[i] - prices[i - 1]) / prices[i - 1]
+        signals[i] = 1.0 if ret > threshold else 0.0
+    return signals
+
+
+def _dual_param_signal(
+    prices: np.ndarray,
+    *,
+    fast: int = 2,
+    slow: int = 5,
+) -> np.ndarray:
+    """MA crossover signal: long when fast MA > slow MA."""
+    signals = np.zeros(len(prices), dtype=np.float64)
+    for i in range(slow, len(prices)):
+        fast_ma = np.mean(prices[max(0, i - fast + 1) : i + 1])
+        slow_ma = np.mean(prices[max(0, i - slow + 1) : i + 1])
+        signals[i] = 1.0 if fast_ma > slow_ma else -1.0
+    return signals
+
+
+# ---------------------------------------------------------------------------
+# ParameterGrid tests
+# ---------------------------------------------------------------------------
+
+
+class TestParameterGrid:
+    """Tests for ParameterGrid."""
+
+    def test_basic_grid(self) -> None:
+        """Two-param grid produces correct Cartesian product."""
+        from sysls.backtest.optimize import ParameterGrid
+
+        grid = ParameterGrid({"fast": [2, 3], "slow": [5, 10]})
+        combos = list(grid)
+        assert len(combos) == 4
+        assert {"fast": 2, "slow": 5} in combos
+        assert {"fast": 2, "slow": 10} in combos
+        assert {"fast": 3, "slow": 5} in combos
+        assert {"fast": 3, "slow": 10} in combos
+
+    def test_single_param(self) -> None:
+        """Single-parameter grid yields one dict per value."""
+        from sysls.backtest.optimize import ParameterGrid
+
+        grid = ParameterGrid({"threshold": [0.01, 0.02, 0.03]})
+        combos = list(grid)
+        assert len(combos) == 3
+        assert combos[0] == {"threshold": 0.01}
+        assert combos[1] == {"threshold": 0.02}
+        assert combos[2] == {"threshold": 0.03}
+
+    def test_empty_grid(self) -> None:
+        """Grid with no parameters yields one empty dict."""
+        from sysls.backtest.optimize import ParameterGrid
+
+        grid = ParameterGrid({})
+        combos = list(grid)
+        assert combos == [{}]
+
+    def test_single_value_per_param(self) -> None:
+        """Grid with one value per param yields exactly one combo."""
+        from sysls.backtest.optimize import ParameterGrid
+
+        grid = ParameterGrid({"a": [1], "b": [2]})
+        combos = list(grid)
+        assert combos == [{"a": 1, "b": 2}]
+
+    def test_length(self) -> None:
+        """__len__ matches number of yielded combinations."""
+        from sysls.backtest.optimize import ParameterGrid
+
+        grid = ParameterGrid({"a": [1, 2, 3], "b": [4, 5]})
+        assert len(grid) == 6
+        assert len(grid) == len(list(grid))
+
+    def test_iteration_multiple_times(self) -> None:
+        """Grid can be iterated multiple times."""
+        from sysls.backtest.optimize import ParameterGrid
+
+        grid = ParameterGrid({"x": [1, 2]})
+        first = list(grid)
+        second = list(grid)
+        assert first == second
+
+
+# ---------------------------------------------------------------------------
+# Pydantic model tests
+# ---------------------------------------------------------------------------
+
+
+class TestPydanticModels:
+    """Tests for GridSearchResult, WalkForwardSplit, WalkForwardResult."""
+
+    @staticmethod
+    def _make_backtest_result() -> BacktestResult:
+        """Create a minimal BacktestResult for model tests."""
+        return BacktestResult(
+            equity_curve=[100_000.0, 101_000.0, 102_000.0],
+            returns=[0.01, 0.0099],
+            trades=[],
+            total_return=0.02,
+            sharpe_ratio=1.5,
+            sortino_ratio=2.0,
+            max_drawdown=0.01,
+            calmar_ratio=2.0,
+            annualized_return=0.15,
+            annualized_volatility=0.10,
+            win_rate=0.0,
+            profit_factor=0.0,
+            total_trades=0,
+            initial_capital=100_000.0,
+            final_equity=102_000.0,
+        )
+
+    def test_grid_search_result_construction(self) -> None:
+        """GridSearchResult can be constructed with valid data."""
+        from sysls.backtest.optimize import GridSearchResult
+
+        br = self._make_backtest_result()
+        gsr = GridSearchResult(
+            best_params={"fast": 2},
+            best_score=1.5,
+            all_results=[({"fast": 2}, br)],
+        )
+        assert gsr.best_params == {"fast": 2}
+        assert gsr.best_score == 1.5
+        assert len(gsr.all_results) == 1
+
+    def test_grid_search_result_frozen(self) -> None:
+        """GridSearchResult is immutable."""
+        from sysls.backtest.optimize import GridSearchResult
+
+        br = self._make_backtest_result()
+        gsr = GridSearchResult(
+            best_params={"fast": 2},
+            best_score=1.5,
+            all_results=[({"fast": 2}, br)],
+        )
+        with pytest.raises(Exception):  # noqa: B017
+            gsr.best_score = 2.0  # type: ignore[misc]
+
+    def test_walk_forward_split_construction(self) -> None:
+        """WalkForwardSplit can be constructed with valid data."""
+        from sysls.backtest.optimize import WalkForwardSplit
+
+        br = self._make_backtest_result()
+        wfs = WalkForwardSplit(
+            split_index=0,
+            train_start=0,
+            train_end=70,
+            oos_start=70,
+            oos_end=100,
+            best_params={"threshold": 0.01},
+            oos_result=br,
+        )
+        assert wfs.split_index == 0
+        assert wfs.train_end == 70
+        assert wfs.oos_start == 70
+
+    def test_walk_forward_result_construction(self) -> None:
+        """WalkForwardResult can be constructed with valid data."""
+        from sysls.backtest.optimize import WalkForwardResult
+
+        br = self._make_backtest_result()
+        wfr = WalkForwardResult(
+            splits=[],
+            combined_oos_equity=[100_000.0, 101_000.0],
+            combined_metrics=br,
+        )
+        assert len(wfr.splits) == 0
+        assert len(wfr.combined_oos_equity) == 2
+
+    def test_serialization_round_trip(self) -> None:
+        """Models survive JSON serialization round-trip."""
+        from sysls.backtest.optimize import GridSearchResult
+
+        br = self._make_backtest_result()
+        gsr = GridSearchResult(
+            best_params={"fast": 2, "slow": 5},
+            best_score=1.5,
+            all_results=[({"fast": 2, "slow": 5}, br)],
+        )
+        json_str = gsr.model_dump_json()
+        reconstructed = GridSearchResult.model_validate_json(json_str)
+        assert reconstructed.best_params == gsr.best_params
+        assert reconstructed.best_score == gsr.best_score
+        assert len(reconstructed.all_results) == len(gsr.all_results)
+
+
+# ---------------------------------------------------------------------------
+# grid_search tests
+# ---------------------------------------------------------------------------
+
+
+class TestGridSearch:
+    """Tests for grid_search."""
+
+    def test_basic_search(self) -> None:
+        """Grid search returns valid result with correct structure."""
+        from sysls.backtest.optimize import GridSearchResult, ParameterGrid, grid_search
+
+        prices = _make_trending_prices(50)
+        param_grid = ParameterGrid({"threshold": [0.0, 0.005, 0.01]})
+        result = grid_search(prices, _simple_signal_func, param_grid)
+
+        assert isinstance(result, GridSearchResult)
+        assert "threshold" in result.best_params
+        assert isinstance(result.best_score, float)
+        assert len(result.all_results) == 3
+
+    def test_best_params_highest_metric(self) -> None:
+        """Best params correspond to the highest metric value."""
+        from sysls.backtest.optimize import ParameterGrid, grid_search
+
+        prices = _make_trending_prices(50)
+        param_grid = ParameterGrid({"threshold": [0.0, 0.005, 0.01]})
+        result = grid_search(prices, _simple_signal_func, param_grid, metric="sharpe_ratio")
+
+        # Best should have highest sharpe among all results
+        all_sharpes = [r.sharpe_ratio for _, r in result.all_results]
+        assert result.best_score == all_sharpes[0]  # first after sort
+        # Verify descending order
+        for i in range(len(all_sharpes) - 1):
+            assert all_sharpes[i] >= all_sharpes[i + 1]
+
+    def test_max_drawdown_ascending(self) -> None:
+        """When metric is max_drawdown, lower is better."""
+        from sysls.backtest.optimize import ParameterGrid, grid_search
+
+        prices = _make_trending_prices(50)
+        param_grid = ParameterGrid({"threshold": [0.0, 0.005, 0.01]})
+        result = grid_search(prices, _simple_signal_func, param_grid, metric="max_drawdown")
+
+        # Sorted ascending: lowest drawdown first
+        all_dd = [r.max_drawdown for _, r in result.all_results]
+        for i in range(len(all_dd) - 1):
+            assert all_dd[i] <= all_dd[i + 1]
+
+    def test_single_param_grid(self) -> None:
+        """Grid search works with a single parameter."""
+        from sysls.backtest.optimize import ParameterGrid, grid_search
+
+        prices = _make_trending_prices(50)
+        param_grid = ParameterGrid({"threshold": [0.005]})
+        result = grid_search(prices, _simple_signal_func, param_grid)
+
+        assert len(result.all_results) == 1
+        assert result.best_params == {"threshold": 0.005}
+
+    def test_all_results_populated(self) -> None:
+        """All parameter combinations appear in all_results."""
+        from sysls.backtest.optimize import ParameterGrid, grid_search
+
+        prices = _make_trending_prices(50)
+        param_grid = ParameterGrid({"threshold": [0.0, 0.005, 0.01, 0.02]})
+        result = grid_search(prices, _simple_signal_func, param_grid)
+
+        result_params = [p for p, _ in result.all_results]
+        for expected in [{"threshold": t} for t in [0.0, 0.005, 0.01, 0.02]]:
+            assert expected in result_params
+
+    def test_invalid_metric_raises(self) -> None:
+        """Invalid metric name raises ValueError."""
+        from sysls.backtest.optimize import ParameterGrid, grid_search
+
+        prices = _make_trending_prices(50)
+        param_grid = ParameterGrid({"threshold": [0.0]})
+        with pytest.raises(ValueError, match="Invalid metric"):
+            grid_search(prices, _simple_signal_func, param_grid, metric="not_a_metric")
+
+
+# ---------------------------------------------------------------------------
+# TimeSeriesSplit tests
+# ---------------------------------------------------------------------------
+
+
+class TestTimeSeriesSplit:
+    """Tests for TimeSeriesSplit."""
+
+    def test_basic_splits(self) -> None:
+        """Splits generate correct expanding windows."""
+        from sysls.backtest.optimize import TimeSeriesSplit
+
+        splitter = TimeSeriesSplit(n_samples=100, n_splits=3, train_ratio=0.7)
+        splits = list(splitter)
+        assert len(splits) == 3
+
+        # Each split is a 4-tuple of ints
+        for train_start, train_end, oos_start, oos_end in splits:
+            assert isinstance(train_start, int)
+            assert isinstance(train_end, int)
+            assert isinstance(oos_start, int)
+            assert isinstance(oos_end, int)
+
+    def test_expanding_window(self) -> None:
+        """Training window starts at 0 and grows each split."""
+        from sysls.backtest.optimize import TimeSeriesSplit
+
+        splitter = TimeSeriesSplit(n_samples=100, n_splits=3, train_ratio=0.7)
+        splits = list(splitter)
+
+        # All training windows start at 0
+        for train_start, _, _, _ in splits:
+            assert train_start == 0
+
+        # Training end grows monotonically
+        train_ends = [te for _, te, _, _ in splits]
+        for i in range(len(train_ends) - 1):
+            assert train_ends[i] < train_ends[i + 1]
+
+    def test_no_overlap(self) -> None:
+        """Training and OOS windows do not overlap."""
+        from sysls.backtest.optimize import TimeSeriesSplit
+
+        splitter = TimeSeriesSplit(n_samples=100, n_splits=3, train_ratio=0.7)
+        for _, train_end, oos_start, _ in splitter:
+            assert train_end == oos_start  # contiguous, no gap/overlap
+
+    def test_length(self) -> None:
+        """__len__ returns the configured number of splits."""
+        from sysls.backtest.optimize import TimeSeriesSplit
+
+        splitter = TimeSeriesSplit(n_samples=100, n_splits=5, train_ratio=0.5)
+        assert len(splitter) == 5
+
+    def test_single_split(self) -> None:
+        """A single split covers training + OOS correctly."""
+        from sysls.backtest.optimize import TimeSeriesSplit
+
+        splitter = TimeSeriesSplit(n_samples=100, n_splits=1, train_ratio=0.7)
+        splits = list(splitter)
+        assert len(splits) == 1
+
+        train_start, train_end, oos_start, oos_end = splits[0]
+        assert train_start == 0
+        assert train_end == 70
+        assert oos_start == 70
+        assert oos_end == 100
+
+    def test_coverage(self) -> None:
+        """Last split's OOS extends to the end of the data."""
+        from sysls.backtest.optimize import TimeSeriesSplit
+
+        splitter = TimeSeriesSplit(n_samples=100, n_splits=3, train_ratio=0.7)
+        splits = list(splitter)
+
+        # Last split's OOS end must be n_samples
+        _, _, _, last_oos_end = splits[-1]
+        assert last_oos_end == 100
+
+        # OOS windows are contiguous across splits
+        for i in range(len(splits) - 1):
+            _, _, _, oos_end_i = splits[i]
+            _, _, oos_start_next, _ = splits[i + 1]
+            assert oos_end_i == oos_start_next
+
+    def test_too_short_data_raises(self) -> None:
+        """Data too short for requested splits raises ValueError."""
+        from sysls.backtest.optimize import TimeSeriesSplit
+
+        with pytest.raises(ValueError, match="Data too short"):
+            TimeSeriesSplit(n_samples=10, n_splits=10, train_ratio=0.9)
+
+
+# ---------------------------------------------------------------------------
+# walk_forward tests
+# ---------------------------------------------------------------------------
+
+
+class TestWalkForward:
+    """Tests for walk_forward."""
+
+    def test_basic_walk_forward(self) -> None:
+        """Walk-forward produces valid result with correct split count."""
+        from sysls.backtest.optimize import (
+            ParameterGrid,
+            WalkForwardResult,
+            walk_forward,
+        )
+
+        prices = _make_trending_prices(100)
+        param_grid = ParameterGrid({"threshold": [0.0, 0.005]})
+        result = walk_forward(
+            prices,
+            _simple_signal_func,
+            param_grid,
+            n_splits=3,
+            train_ratio=0.7,
+        )
+
+        assert isinstance(result, WalkForwardResult)
+        assert len(result.splits) == 3
+        assert len(result.combined_oos_equity) > 0
+
+    def test_oos_equity_concatenation(self) -> None:
+        """Combined OOS equity has entries from all splits."""
+        from sysls.backtest.optimize import ParameterGrid, walk_forward
+
+        prices = _make_trending_prices(100)
+        param_grid = ParameterGrid({"threshold": [0.0, 0.005]})
+        result = walk_forward(
+            prices,
+            _simple_signal_func,
+            param_grid,
+            n_splits=3,
+            train_ratio=0.7,
+        )
+
+        # Total OOS equity points should equal sum of per-split equity lengths
+        total_oos_points = sum(len(s.oos_result.equity_curve) for s in result.splits)
+        assert len(result.combined_oos_equity) == total_oos_points
+
+    def test_combined_metrics_populated(self) -> None:
+        """Combined metrics are computed over the full OOS equity."""
+        from sysls.backtest.optimize import ParameterGrid, walk_forward
+
+        prices = _make_trending_prices(100)
+        param_grid = ParameterGrid({"threshold": [0.0]})
+        result = walk_forward(
+            prices,
+            _simple_signal_func,
+            param_grid,
+            n_splits=2,
+            train_ratio=0.7,
+        )
+
+        metrics = result.combined_metrics
+        assert metrics.initial_capital == 100_000.0
+        assert len(metrics.equity_curve) == len(result.combined_oos_equity)
+        assert isinstance(metrics.sharpe_ratio, float)
+
+    def test_split_params_may_differ(self) -> None:
+        """Different splits can select different best params."""
+        from sysls.backtest.optimize import ParameterGrid, walk_forward
+
+        # Use a signal that behaves differently on different data slices
+        prices = _make_trending_prices(200)
+        param_grid = ParameterGrid({"threshold": [0.0, 0.005, 0.01]})
+        result = walk_forward(
+            prices,
+            _simple_signal_func,
+            param_grid,
+            n_splits=3,
+            train_ratio=0.5,
+        )
+
+        # Each split has best_params (they may be same or different)
+        for split in result.splits:
+            assert "threshold" in split.best_params
+
+    def test_invalid_n_splits_raises(self) -> None:
+        """n_splits < 1 raises ValueError."""
+        from sysls.backtest.optimize import ParameterGrid, walk_forward
+
+        prices = _make_trending_prices(100)
+        param_grid = ParameterGrid({"threshold": [0.0]})
+        with pytest.raises(ValueError, match="n_splits must be >= 1"):
+            walk_forward(
+                prices,
+                _simple_signal_func,
+                param_grid,
+                n_splits=0,
+            )
+
+    def test_single_split_walk_forward(self) -> None:
+        """Walk-forward with a single split still works."""
+        from sysls.backtest.optimize import ParameterGrid, walk_forward
+
+        prices = _make_trending_prices(100)
+        param_grid = ParameterGrid({"threshold": [0.0]})
+        result = walk_forward(
+            prices,
+            _simple_signal_func,
+            param_grid,
+            n_splits=1,
+            train_ratio=0.7,
+        )
+        assert len(result.splits) == 1
+        assert result.splits[0].train_start == 0
+        assert result.splits[0].oos_end == 100
+
+
+# ---------------------------------------------------------------------------
+# Edge case tests
+# ---------------------------------------------------------------------------
+
+
+class TestEdgeCases:
+    """Edge case tests for the optimize module."""
+
+    def test_grid_search_with_multi_param(self) -> None:
+        """Grid search works with multiple parameters (MA crossover)."""
+        from sysls.backtest.optimize import ParameterGrid, grid_search
+
+        prices = _make_trending_prices(100)
+        param_grid = ParameterGrid({"fast": [2, 3], "slow": [5, 10]})
+        result = grid_search(prices, _dual_param_signal, param_grid)
+
+        assert len(result.all_results) == 4
+        assert "fast" in result.best_params
+        assert "slow" in result.best_params
+
+    def test_grid_search_total_return_metric(self) -> None:
+        """Grid search can optimize on total_return metric."""
+        from sysls.backtest.optimize import ParameterGrid, grid_search
+
+        prices = _make_trending_prices(50)
+        param_grid = ParameterGrid({"threshold": [0.0, 0.005, 0.01]})
+        result = grid_search(prices, _simple_signal_func, param_grid, metric="total_return")
+
+        all_returns = [r.total_return for _, r in result.all_results]
+        # Sorted descending
+        for i in range(len(all_returns) - 1):
+            assert all_returns[i] >= all_returns[i + 1]
+
+    def test_grid_search_with_costs(self) -> None:
+        """Grid search passes commission and slippage through correctly."""
+        from sysls.backtest.optimize import ParameterGrid, grid_search
+
+        prices = _make_trending_prices(50)
+        param_grid = ParameterGrid({"threshold": [0.0]})
+
+        result_no_cost = grid_search(prices, _simple_signal_func, param_grid)
+        result_with_cost = grid_search(
+            prices,
+            _simple_signal_func,
+            param_grid,
+            commission_rate=0.01,
+            slippage_rate=0.005,
+        )
+
+        # With costs, final equity should be lower
+        no_cost_equity = result_no_cost.all_results[0][1].final_equity
+        with_cost_equity = result_with_cost.all_results[0][1].final_equity
+        assert with_cost_equity < no_cost_equity
+
+    def test_parameter_grid_three_params(self) -> None:
+        """Grid with 3 parameters produces correct product."""
+        from sysls.backtest.optimize import ParameterGrid
+
+        grid = ParameterGrid({"a": [1, 2], "b": [3, 4], "c": [5, 6]})
+        assert len(grid) == 8
+        combos = list(grid)
+        assert len(combos) == 8
+        # Check one specific combo exists
+        assert {"a": 1, "b": 3, "c": 5} in combos
+
+    def test_time_series_split_invalid_train_ratio(self) -> None:
+        """Invalid train_ratio raises ValueError."""
+        from sysls.backtest.optimize import TimeSeriesSplit
+
+        with pytest.raises(ValueError, match="train_ratio"):
+            TimeSeriesSplit(n_samples=100, n_splits=3, train_ratio=0.0)
+        with pytest.raises(ValueError, match="train_ratio"):
+            TimeSeriesSplit(n_samples=100, n_splits=3, train_ratio=1.0)
+
+    def test_walk_forward_equity_continuity(self) -> None:
+        """Combined OOS equity is scaled so segments chain smoothly."""
+        from sysls.backtest.optimize import ParameterGrid, walk_forward
+
+        prices = _make_trending_prices(100)
+        param_grid = ParameterGrid({"threshold": [0.0]})
+        result = walk_forward(
+            prices,
+            _simple_signal_func,
+            param_grid,
+            n_splits=2,
+            train_ratio=0.7,
+        )
+
+        # First point should equal initial_capital
+        assert result.combined_oos_equity[0] == pytest.approx(100_000.0)
+
+        # At the boundary between splits, equity should be continuous
+        first_split_len = len(result.splits[0].oos_result.equity_curve)
+        if first_split_len < len(result.combined_oos_equity):
+            end_of_first = result.combined_oos_equity[first_split_len - 1]
+            start_of_second = result.combined_oos_equity[first_split_len]
+            # Should be approximately equal (scaled to chain)
+            assert start_of_second == pytest.approx(end_of_first, rel=0.01)
+
+    def test_walk_forward_combined_metrics_has_correct_length(self) -> None:
+        """Combined metrics equity curve matches combined_oos_equity."""
+        from sysls.backtest.optimize import ParameterGrid, walk_forward
+
+        prices = _make_trending_prices(100)
+        param_grid = ParameterGrid({"threshold": [0.0, 0.005]})
+        result = walk_forward(
+            prices,
+            _simple_signal_func,
+            param_grid,
+            n_splits=3,
+            train_ratio=0.7,
+        )
+
+        assert len(result.combined_metrics.equity_curve) == len(result.combined_oos_equity)
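+
+
+# Illustrative sketch: multi-parameter combination order is not pinned down
+# by the tests above. ParameterGrid preserves dict insertion order for keys
+# and itertools.product varies the last key fastest, so iteration order is
+# deterministic.
+def test_parameter_grid_product_order() -> None:
+    """First combo pairs every parameter with its first listed value."""
+    from sysls.backtest.optimize import ParameterGrid
+
+    grid = ParameterGrid({"fast": [2, 3], "slow": [5, 10]})
+    combos = list(grid)
+    assert combos[0] == {"fast": 2, "slow": 5}
+    assert combos[-1] == {"fast": 3, "slow": 10}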