From da8fe313d7768618dc01847f3cc2ee18f4ff290d Mon Sep 17 00:00:00 2001 From: Hamid Arian Date: Tue, 16 Jun 2026 04:45:41 -0400 Subject: [PATCH 1/2] feat: extension architecture, dedup, logging, pde guard, dead-code removal, perf --- CHANGELOG.md | 68 ++ EXTENDING.md | 166 +++ NAMING_CANON_2.0.0.md | 208 ++++ RiskLabAI/__init__.py | 19 + RiskLabAI/backtest/bet_sizing.py | 47 +- RiskLabAI/backtest/strategy_risk.py | 5 +- RiskLabAI/cluster/clustering.py | 15 +- .../controller/data_structure_controller.py | 42 +- RiskLabAI/core/__init__.py | 67 ++ RiskLabAI/core/_builtins.py | 197 ++++ RiskLabAI/core/base.py | 193 ++++ RiskLabAI/core/registry.py | 393 +++++++ .../data/differentiation/differentiation.py | 28 +- RiskLabAI/data/labeling/labeling.py | 977 ++++++++---------- .../clustered_feature_importance_mda.py | 5 +- .../feature_importance_mda.py | 5 +- RiskLabAI/hpc/hpc.py | 13 +- RiskLabAI/pde/__init__.py | 14 + RiskLabAI/pde/solver.py | 16 +- RiskLabAI/utils/__init__.py | 9 +- RiskLabAI/utils/publication_plots.py | 18 +- RiskLabAI/utils/smoothing_average.py | 42 - RiskLabAI/utils/utilities_lopez.py | 121 --- test/core/test_base.py | 81 ++ test/core/test_builtin_parity.py | 149 +++ test/core/test_registry.py | 207 ++++ test/test_consolidation.py | 85 ++ test/test_performance.py | 163 +++ 28 files changed, 2596 insertions(+), 757 deletions(-) create mode 100644 EXTENDING.md create mode 100644 NAMING_CANON_2.0.0.md create mode 100644 RiskLabAI/core/__init__.py create mode 100644 RiskLabAI/core/_builtins.py create mode 100644 RiskLabAI/core/base.py create mode 100644 RiskLabAI/core/registry.py delete mode 100644 RiskLabAI/utils/smoothing_average.py delete mode 100644 RiskLabAI/utils/utilities_lopez.py create mode 100644 test/core/test_base.py create mode 100644 test/core/test_builtin_parity.py create mode 100644 test/core/test_registry.py create mode 100644 test/test_consolidation.py create mode 100644 test/test_performance.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 13f9664..f6e2e91 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,74 @@ Format: [Keep a Changelog](https://keepachangelog.com/en/1.1.0/); versioning: [S ## [Unreleased] +### Added +- `RiskLabAI.core`: a non-breaking extension layer that makes the library easier + to grow with new models. + - `core.registry.Registry`: a reusable, case-insensitive component registry + with lazy (import-path) registration, aliases, duplicate protection, optional + kwarg filtering, and helpful "not found" errors. Generalises the three + hand-written `name -> class` factories (`bars_initializer`, + `cross_validator_factory`, `feature_importance_factory`). + - `core.base`: base interfaces per model family — re-exports the existing + `AbstractBars`, `CrossValidator`, `FeatureImportanceStrategy` (lazily), an + `Estimator` structural `Protocol`, and new optional contracts + `BaseLabeler` / `BaseBetSizer` / `BasePortfolioOptimizer` for the + free-function families. + - Per-family registries (`BARS`, `CROSS_VALIDATORS`, `FEATURE_IMPORTANCE`, + `LABELERS`, `BET_SIZERS`, `PORTFOLIO_OPTIMIZERS`) pre-populated with the + built-in catalogue, plus `list_components()` / `get_registry()` for + discovery. + - `EXTENDING.md`: a step-by-step guide with worked examples for adding a new + model. Existing factories and public APIs are unchanged; importing + `RiskLabAI.core` pulls in no heavy dependencies. + +### Changed +- Removed duplicated code by single-sourcing two helpers (no public API change): + - `data.labeling` no longer defines its own copies of the parallel-processing + helpers (`lin_parts`, `process_jobs`, `expand_call`, `report_progress`). + They are re-exported from the canonical `RiskLabAI.hpc` (`lin_parts` maps to + `hpc.linear_partitions`); the names remain importable from `data.labeling`. + The dead duplicate's `num_threads=24` default is gone (the canonical default + is `-1` = all cores); the copies were unused inside the package. + - `cluster.covariance_to_correlation` now delegates to the canonical + `data.denoise.cov_to_corr`. Output is identical to floating-point precision + for valid covariance matrices (verified over random inputs; the canonical + version adds zero-std and exact-diagonal safeguards). +- Note: the two `sharpe_ratio` definitions were deliberately left separate. + `backtest_statistics.sharpe_ratio` is numba-jitted and array-only (ddof=0), + while `backtest_overfitting_simulation`'s relies on pandas `Series.std()` + (ddof=1) in its rank-correlation path — unifying them would change results. +- Replaced library `print()` calls with the standard `logging` module across + `controller.data_structure_controller`, `hpc`, `backtest.strategy_risk`, + `features.feature_importance` (MDA/clustered MDA), `pde.solver`, and + `utils.publication_plots` (errors -> `logger.error`, recoverable issues -> + `logger.warning`, progress -> `logger.info`/`debug`). The `RiskLabAI` logger + gets a `NullHandler`, so the library is silent by default; configure logging + (e.g. `logging.basicConfig(level=logging.INFO)`) to see output. Return values + are unchanged. +- `pde`: importing `RiskLabAI.pde` without PyTorch now raises a clear, actionable + `ImportError` pointing to `pip install 'RiskLabAI[pde]'`, instead of a bare + `ModuleNotFoundError: No module named 'torch'`. The base install remains + torch-free (`import RiskLabAI` never imports the sub-package). + +### Performance +- Vectorized three O(n^2) hot paths; outputs are unchanged (locked by + `test/test_performance.py`, which checks each against a brute-force reference): + - `data.differentiation.fractional_difference_std`: the per-row expanding + weighted sum is now a single causal convolution (`np.convolve`) per column. + - `backtest.bet_sizing.mpAvgActiveSignals`: the per-timepoint active-signal + average is now prefix sums + `searchsorted` (interval-stabbing sweep), + turning an O(n*m) double scan into O((n+m) log n) — ~1500x on a 3k-signal + benchmark. + - `data.labeling.triple_barrier`: per-event pandas label slicing replaced + with positional numpy indexing (~70x on 50k closes / 2k events). + +### Removed +- Deleted dead modules `utils/utilities_lopez.py` (unreferenced) and + `utils/smoothing_average.py` (a duplicate of `utils.ewma`). The public + `compute_exponential_weighted_moving_average` name is unaffected — it remains + available from `RiskLabAI.utils` as an alias of the canonical `ewma`. + ### Fixed - `features.feature_importance` (MDA): feature shuffling mutated a column view in place, which raises `ValueError: array is read-only` under pandas diff --git a/EXTENDING.md b/EXTENDING.md new file mode 100644 index 0000000..b4927b5 --- /dev/null +++ b/EXTENDING.md @@ -0,0 +1,166 @@ +# Extending RiskLabAI + +This guide explains how to add a **new model** to RiskLabAI — for example, an +extension, alternative, or improvement of a method from López de Prado's books, +or an entirely novel idea — so that it plugs into the library cleanly and is +discoverable like the built-in components. + +The machinery lives in `RiskLabAI.core` and is designed around three ideas: + +1. **Base interfaces** (`RiskLabAI.core.base`) define the contract for each + model family. +2. **A component registry** (`RiskLabAI.core.registry.Registry`) maps a name to + a component so it can be discovered and constructed by string key. +3. **Per-family registries** (`RiskLabAI.core`) hold the built-in catalogue and + are the place new models register themselves. + +Nothing here is required to *use* the library — the existing functions, classes, +and factories are unchanged. This is the recommended path for *growing* it. + +--- + +## The model families + +| Family | Registry | Base interface | Built-ins today | +|---|---|---|---| +| Bars | `core.BARS` | `core.base.AbstractBars` | standard / time / imbalance / run bars | +| Cross-validation | `core.CROSS_VALIDATORS` | `core.base.CrossValidator` | KFold, WalkForward, PurgedKFold, CPCV (+ bagged, adaptive) | +| Feature importance | `core.FEATURE_IMPORTANCE` | `core.base.FeatureImportanceStrategy` | MDI, ClusteredMDI, MDA, ClusteredMDA, SFI | +| Labeling | `core.LABELERS` | `core.base.BaseLabeler` | *(extension point — free functions today)* | +| Bet sizing | `core.BET_SIZERS` | `core.base.BaseBetSizer` | *(extension point)* | +| Portfolio optimization | `core.PORTFOLIO_OPTIMIZERS` | `core.base.BasePortfolioOptimizer` | *(extension point)* | + +Discover everything at runtime: + +```python +import RiskLabAI.core as core + +core.list_components() # {family: [keys...]} +core.CROSS_VALIDATORS.available() # ['adaptivecombinatorialpurged', 'kfold', ...] +"purgedkfold" in core.CROSS_VALIDATORS +``` + +Construction is by name (case-insensitive, aliases supported): + +```python +cv = core.CROSS_VALIDATORS.create("kfold", n_splits=5) +``` + +Importing `RiskLabAI.core` is cheap: every built-in is registered *lazily*, so +heavy dependencies (pandas, numba, scikit-learn, torch) are only imported when a +component is actually created. + +--- + +## Worked example 1 — a new bar type + +A new bar type is a subclass of `AbstractBars`. Register it and it becomes +constructible by name alongside the built-ins. + +```python +from typing import Any, Iterable, List + +from RiskLabAI.core import BARS +from RiskLabAI.core.base import AbstractBars + + +@BARS.register("range_bars", aliases=("ohlc_range",), + metadata={"family": "bars", "author": "your_paper_2026"}) +class RangeBars(AbstractBars): + """Sample a new bar whenever (high - low) exceeds a fixed range.""" + + def __init__(self, threshold: float = 1.0): + super().__init__(bar_type="range") + self.threshold = threshold + + def _bar_construction_condition(self, threshold: float) -> bool: + return (self.high_price - self.low_price) >= self.threshold + + def construct_bars_from_data(self, data: Iterable) -> List[List[Any]]: + bars = [] + for date_time, price, volume in data: + tick_rule = self._tick_rule(price) + self.update_base_fields(price, tick_rule, volume) + if self._bar_construction_condition(self.threshold): + bars.append(self._construct_next_bar( + date_time, self.tick_counter, price, + self.high_price, self.low_price, self.threshold, + )) + self._reset_cached_fields() + self.tick_counter += 1 + return bars + + +bars = BARS.create("range_bars", threshold=2.5) +``` + +--- + +## Worked example 2 — a new labeler + +Labeling is currently exposed as free functions, which keep working. A *new* +class-based labeler should implement `BaseLabeler` and register in `LABELERS`: + +```python +import pandas as pd + +from RiskLabAI.core import LABELERS +from RiskLabAI.core.base import BaseLabeler + + +@LABELERS.register("fixed_horizon", metadata={"family": "labeling"}) +class FixedHorizonLabeler(BaseLabeler): + """Label the sign of the return over a fixed number of bars.""" + + def __init__(self, horizon: int = 5): + self.horizon = horizon + + def label(self, prices: pd.Series, events=None, **kwargs) -> pd.DataFrame: + forward_return = prices.shift(-self.horizon) / prices - 1.0 + return pd.DataFrame({"label": forward_return.apply(_sign)}) + + +def _sign(x: float) -> int: + if pd.isna(x): + return 0 + return int(x > 0) - int(x < 0) + + +labeler = LABELERS.create("fixed_horizon", horizon=10) +labels = labeler.label(my_price_series) +``` + +The same pattern applies to bet sizers (`BaseBetSizer` + `BET_SIZERS`) and +portfolio optimizers (`BasePortfolioOptimizer` + `PORTFOLIO_OPTIMIZERS`). + +--- + +## Conventions and checklist + +When you add a new model: + +- [ ] **Subclass the family's base interface** (or, for a sklearn-style model, + satisfy the `core.base.Estimator` protocol — `fit`/`predict`). +- [ ] **Register it** in the matching registry with a clear, snake_case key. + Use `metadata={...}` to record the family, an AFML page, or the paper it + comes from. +- [ ] **Keep the public API stable.** Adding new names is always fine; renaming + or removing existing public names is a breaking change and needs a major + version bump (see `CHANGELOG.md` and `CLAUDE.md`). +- [ ] **Write tests** with hand-computed numeric assertions (the existing suite + sets the bar — bar OHLC values, exact HRP weights, purge boundaries). +- [ ] **Cross-language parity.** If this model also belongs in `RiskLabAI.jl`, + mirror the public name and note any deliberate divergence (see the + `PARITY.md` discipline in the improvement plan). + +## Why a registry instead of editing a central `dict`? + +The library previously hand-wrote a `name -> class` mapping in three places +(`controller/bars_initializer.py`, +`backtest/validation/cross_validator_factory.py`, +`features/feature_importance/feature_importance_factory.py`). A registry lets a +new model register itself *without editing a central file* — the open/closed +principle — and makes the full catalogue discoverable and testable. The existing +factories still work; `test/core/test_builtin_parity.py` asserts the registries +and factories stay in sync so they cannot silently drift apart. +``` diff --git a/NAMING_CANON_2.0.0.md b/NAMING_CANON_2.0.0.md new file mode 100644 index 0000000..b1879cc --- /dev/null +++ b/NAMING_CANON_2.0.0.md @@ -0,0 +1,208 @@ +# RiskLabAI.py 2.0.0 — Naming Canon (proposal) + +Status: **proposal, awaiting approval.** No code changes yet. This is the +breaking API cleanup from `IMPROVEMENT_PLAN.md` Phase 3, scoped to Python and +written so that **no existing user breaks on upgrade** — every renamed name +keeps working (with a `DeprecationWarning`) for one minor cycle before removal. + +Governance (`CLAUDE.md`): "never break the public API without a major bump and a +deprecation note." This document is that note. It needs sign-off before any +code lands, because it changes public names. + +--- + +## 1. The convention + +After 2.0.0, the public API follows standard Python naming (PEP 8): + +- **Functions and methods**: `snake_case`. +- **Classes**: `CapWords` (already true across the package — no class renames + except two typo/clarity fixes below). +- **Constants**: `UPPER_SNAKE_CASE`, ASCII identifiers only. +- **Parameters**: `snake_case`. + +The package is already ~95% compliant. The work is a small, concentrated set of +legacy names — chiefly the `bet_sizing` "camelCase island" — plus two misnamed +classes and three non-ASCII constants. + +--- + +## 2. What changes + +### 2.1 `backtest.bet_sizing` — functions (the camelCase island) + +| Current (public) | New (2.0.0) | Notes | +|---|---|---| +| `avgActiveSignals` | `avg_active_signals` | | +| `mpAvgActiveSignals` | `mp_avg_active_signals` | worker for the above | +| `discreteSignal` | `discrete_signal` | | +| `Signal` | `generate_signal` | AFML Snippet 10.1; verb-form name | +| `betSize` | `bet_size_sigmoid` | the sigmoid bet-size `x / sqrt(w + x^2)` | +| `getW` | `compute_sigmoid_width` | implied width `w` from divergence/size | +| `TPos` | `target_position` | | +| `inversePrice` | `inverse_price` | | +| `limitPrice` | `limit_price` | | + +These are exported at package level via `RiskLabAI.backtest`, so the aliases +(Section 3) live there too. `probability_bet_size`, `average_bet_sizes`, and +`strategy_bet_sizing` are already snake_case and unchanged. + +### 2.2 Parameters (same functions) + +Renamed to snake_case as part of the function change. Callers using these as +**keyword arguments** must update; positional callers are unaffected. + +| Function | Current param | New param | +|---|---|---| +| `target_position` (`TPos`) | `acctualPrice` | `actual_price` (also fixes the typo) | +| `target_position` | `maximumPositionSize` | `maximum_position_size` | +| `limit_price` (`limitPrice`) | `targetPositionSize`, `cPosition`, `maximumPositionSize` | `target_position_size`, `current_position`, `maximum_position_size` | +| `discrete_signal` (`discreteSignal`) | `stepSize` | `step_size` | +| `generate_signal` (`Signal`) | `stepSize`, `nClasses`, `nThreads` | `step_size`, `n_classes`, `n_threads` | + +(Parameter renames cannot carry a runtime deprecation shim cleanly, so they are +documented here and called out prominently in the 2.0.0 release notes. This is +the main reason the change is a *major* bump rather than a minor one.) + +### 2.3 Classes — two fixes + +| Current | New | Reason | +|---|---|---| +| `pde.FBSNNolver` | `FBSNNSolver` | typo (missing `S`) | +| `optimization.MyPipeline` | `SampleWeightedPipeline` | placeholder name; describes what it is | + +### 2.4 Constants — ASCII identifiers + +In `utils.constants` (re-exported from `RiskLabAI.utils`): + +| Current identifier | New identifier | +|---|---| +| `CUMULATIVE_θ` | `CUMULATIVE_THETA` | +| `CUMULATIVE_BUY_θ` | `CUMULATIVE_BUY_THETA` | +| `CUMULATIVE_SELL_θ` | `CUMULATIVE_SELL_THETA` | + +**Only the Python identifier changes.** The string *values* (`"Cumulative θ"`, +etc.) are used as internal dict keys in the bar-statistics machinery; leaving +the values untouched means no stored data, serialized output, or internal +lookup changes — purely the name you import. (If we ever want ASCII values too, +that is a separate, data-affecting decision.) + +--- + +## 3. How nothing breaks: the deprecation shims + +Every renamed name stays importable and callable throughout the 2.0.x series, +emitting a `DeprecationWarning` that names the replacement. Removal happens no +earlier than **2.1.0** (or 3.0.0), announced in the changelog. + +Three mechanisms, by kind: + +**Functions** — a tiny decorator keeps the old name as a thin wrapper: + +```python +# RiskLabAI/_deprecation.py (new, ~20 lines) +import functools, warnings + +def deprecated_alias(new_func, old_name, *, removed_in): + @functools.wraps(new_func) + def wrapper(*args, **kwargs): + warnings.warn( + f"{old_name}() is deprecated and will be removed in " + f"{removed_in}; use {new_func.__name__}() instead.", + DeprecationWarning, stacklevel=2, + ) + return new_func(*args, **kwargs) + wrapper.__name__ = old_name + return wrapper +``` + +```python +# in bet_sizing.py, after the renamed definitions +avgActiveSignals = deprecated_alias(avg_active_signals, "avgActiveSignals", removed_in="2.1.0") +Signal = deprecated_alias(generate_signal, "Signal", removed_in="2.1.0") +# ... one line per renamed function +``` + +**Classes** — subclass with a warning in `__init__`: + +```python +class FBSNNolver(FBSNNSolver): + def __init__(self, *args, **kwargs): + warnings.warn("FBSNNolver is deprecated; use FBSNNSolver.", + DeprecationWarning, stacklevel=2) + super().__init__(*args, **kwargs) +``` + +**Constants** — module-level `__getattr__` (PEP 562) on `utils/__init__.py` +warns on access and returns the new value: + +```python +_DEPRECATED_CONSTANTS = {"CUMULATIVE_θ": "CUMULATIVE_THETA", ...} +def __getattr__(name): + if name in _DEPRECATED_CONSTANTS: + new = _DEPRECATED_CONSTANTS[name] + warnings.warn(f"{name} is deprecated; use {new}.", DeprecationWarning, stacklevel=2) + return globals()[new] + # ... (existing lazy-plotting __getattr__ logic merges in here) +``` + +All old names also stay listed in the relevant `__all__` during 2.0.x so +`from ... import *` keeps working. + +--- + +## 4. Tests, notebooks, parity + +- **Tests**: update the suite to call the new names. Add one test that asserts + each deprecated alias still works **and** raises `DeprecationWarning` + (`pytest.warns(DeprecationWarning)`), so the shims are guaranteed intact until + we intentionally remove them. +- **Notebooks** (`Notebooks.py`): update usages to new names; the aliases mean + old notebooks still run (with warnings) until re-pinned. +- **`core` registry / `EXTENDING.md`**: unaffected (those already use + snake_case and the registry keys are independent strings). +- **`PARITY.md`** (to be created with the Julia work): the shared py/jl name + table should use these canonical names as the Python column. +- **`style_guide.md`**: update so it matches the code it governs (the audit + noted it currently contradicts itself). + +## 5. Related, optional, non-breaking hygiene (can ride along or stay separate) + +- Replace `from RiskLabAI.utils.constants import *` in the bars stack with + explicit imports (removes star-imports without changing the public API). +- The pass-through `*Controller` classes vs. factories question (architecture, + not naming) — leave to a separate decision. + +These are not required for the naming canon and carry no deprecation burden. + +--- + +## 6. Release plan + +1. Land the current non-breaking work as **1.1.0** first (extension + architecture, dedup, logging, pde/dead-code, performance). The canon builds + on a clean, green main. +2. Branch `feat/api-canon-2.0`. Implement renames + `_deprecation.py` + shims + + updated tests in one PR. CI must be green (including the new + `pytest.warns(DeprecationWarning)` tests). +3. Bump `pyproject.toml` to `2.0.0`; CHANGELOG gets a **`### Changed (BREAKING)`** + section listing every rename (old → new) and the parameter changes, plus a + **`### Deprecated`** section noting the aliases and their `removed_in` + target. +4. Update `Notebooks.py` to the new names (separate PR is fine; aliases keep + them working meanwhile). +5. Tag `v2.0.0`; release via the existing trusted-publishing flow. +6. One minor cycle later (2.1.0): drop the aliases and the `θ` identifiers; + CHANGELOG `### Removed`. + +## 7. Decisions needed from Prof. Arian + +1. **New names**: approve the table in §2 (especially the judgment calls: + `Signal`→`generate_signal`, `betSize`→`bet_size_sigmoid`, + `getW`→`compute_sigmoid_width`, `MyPipeline`→`SampleWeightedPipeline`). +2. **Deprecation window**: remove aliases in **2.1.0** (recommended) or keep + them through 3.0.0 (longer grace, more clutter)? +3. **Constant values**: keep the `"Cumulative θ"` string values as-is + (recommended — no data impact) or also ASCII-ify them later? +4. **Scope**: rename-only in 2.0.0 (recommended), or fold in the §5 hygiene + items too? diff --git a/RiskLabAI/__init__.py b/RiskLabAI/__init__.py index 8e50c34..4b15392 100644 --- a/RiskLabAI/__init__.py +++ b/RiskLabAI/__init__.py @@ -12,6 +12,10 @@ advanced cross-validation (PurgedKFold, CPCV). cluster Algorithms for portfolio clustering (ONC). +core + The extension layer: a component registry, base interfaces per model + family, and a built-in catalogue. Start here to add a new model. See + ``EXTENDING.md``. controller High-level controllers for bar generation and data processing. data @@ -30,14 +34,29 @@ Implementation of a Deep BSDE solver for PDEs. utils Common helper functions, constants, and plotting utilities. + +Logging +------- +RiskLabAI uses the standard ``logging`` module under the ``"RiskLabAI"`` logger +and is silent by default (a ``NullHandler`` is attached). To see progress and +diagnostics, configure logging in your application, e.g.:: + + import logging + logging.basicConfig(level=logging.INFO) """ +import logging as _logging from importlib import import_module +# Attach a NullHandler so the library never emits "No handlers could be found" +# and stays silent unless the application opts in by configuring logging. +_logging.getLogger(__name__).addHandler(_logging.NullHandler()) + _SUBMODULES = frozenset({ "backtest", "cluster", "controller", + "core", "data", "ensemble", "features", diff --git a/RiskLabAI/backtest/bet_sizing.py b/RiskLabAI/backtest/bet_sizing.py index 4d79a35..cfaf69c 100644 --- a/RiskLabAI/backtest/bet_sizing.py +++ b/RiskLabAI/backtest/bet_sizing.py @@ -215,18 +215,41 @@ def mpAvgActiveSignals( pd.Series Series indexed by `loc` (timestamp) with the average signal value. """ - out = pd.Series() - for loc in molecule: - # Keep signal that contain loc - signal_ = (signals.index.values <= loc) & ( - (loc < signals["t1"]) | pd.isnull(signals["t1"]) - ) - act = signals[signal_].index - if len(act) > 0: - out[loc] = signals.loc[act, "signal"].mean() - else: - out[loc] = 0 # no signals active at this time - return out + # A signal i is active at `loc` iff start_i <= loc < t1_i. The average of + # active signals therefore equals (sum of `signal` over signals started by + # `loc`) minus (sum over signals ended by `loc`), divided by the analogous + # active count. Prefix sums + searchsorted compute this for every `loc` in + # O((n + m) log n) instead of the O(n * m) double scan, with identical + # values (verified in test/test_performance.py). + molecule_list = list(molecule) + if len(signals) == 0 or len(molecule_list) == 0: + return pd.Series(0.0, index=molecule_list) + + molecule_array = np.asarray(molecule_list) + signal_values = signals["signal"].to_numpy(dtype=float) + + starts = signals.index.values + start_order = np.argsort(starts, kind="mergesort") + sorted_starts = starts[start_order] + cum_signal_start = np.concatenate(([0.0], np.cumsum(signal_values[start_order]))) + cum_count_start = np.arange(len(sorted_starts) + 1) + + finite = ~pd.isnull(signals["t1"]).to_numpy() + end_times = signals["t1"].to_numpy()[finite] + end_order = np.argsort(end_times, kind="mergesort") + sorted_ends = end_times[end_order] + cum_signal_end = np.concatenate(([0.0], np.cumsum(signal_values[finite][end_order]))) + cum_count_end = np.arange(len(sorted_ends) + 1) + + started = np.searchsorted(sorted_starts, molecule_array, side="right") + ended = np.searchsorted(sorted_ends, molecule_array, side="right") + active_signal = cum_signal_start[started] - cum_signal_end[ended] + active_count = cum_count_start[started] - cum_count_end[ended] + + with np.errstate(invalid="ignore", divide="ignore"): + averages = np.where(active_count > 0, active_signal / active_count, 0.0) + + return pd.Series(averages, index=molecule_list) def discreteSignal(signal: pd.Series, stepSize: float) -> pd.Series: diff --git a/RiskLabAI/backtest/strategy_risk.py b/RiskLabAI/backtest/strategy_risk.py index 1a72169..d9754f2 100644 --- a/RiskLabAI/backtest/strategy_risk.py +++ b/RiskLabAI/backtest/strategy_risk.py @@ -3,12 +3,15 @@ precision, binomial Sharpe ratio, and probability of failure. """ +import logging from typing import Tuple import numpy as np import scipy.stats as ss import sympy from sympy import symbols, factor +logger = logging.getLogger(__name__) + def sharpe_ratio_trials(p: float, n_run: int) -> Tuple[float, float, float]: r""" Simulate binomial trials to estimate mean, std dev, and Sharpe ratio. @@ -366,5 +369,5 @@ def calculate_strategy_risk( returns, frequency, target_sharpe_ratio ) - print(f"Probability that strategy will fail: {probability_fail:.2%}") + logger.info("Probability that strategy will fail: %.2f%%", probability_fail * 100) return probability_fail \ No newline at end of file diff --git a/RiskLabAI/cluster/clustering.py b/RiskLabAI/cluster/clustering.py index 4b87a81..5d6bc93 100644 --- a/RiskLabAI/cluster/clustering.py +++ b/RiskLabAI/cluster/clustering.py @@ -35,14 +35,13 @@ def covariance_to_correlation(covariance: np.ndarray) -> np.ndarray: np.ndarray The corresponding correlation matrix. """ - std = np.sqrt(np.diag(covariance)) - correlation = covariance / np.outer(std, std) - - # Handle numerical errors - correlation[correlation < -1] = -1.0 - correlation[correlation > 1] = 1.0 - - return correlation + # Single source of truth: RiskLabAI.data.denoise.cov_to_corr implements the + # same conversion (Snippet 2.3) with added zero-std and diagonal safeguards. + # Output is identical to floating-point precision for valid covariance + # matrices. Imported locally to avoid any import cycle at module load. + from RiskLabAI.data.denoise.denoising import cov_to_corr + + return cov_to_corr(covariance) def cluster_k_means_base( correlation: pd.DataFrame, diff --git a/RiskLabAI/controller/data_structure_controller.py b/RiskLabAI/controller/data_structure_controller.py index 19e0178..9113690 100644 --- a/RiskLabAI/controller/data_structure_controller.py +++ b/RiskLabAI/controller/data_structure_controller.py @@ -5,6 +5,8 @@ uses the BarsInitializerController to construct bars based on a specified method. """ +import logging + import pandas as pd import numpy as np from typing import Iterable, Optional, Generator, Union, Dict, Any, List @@ -18,6 +20,8 @@ CUMULATIVE_TICKS, CUMULATIVE_DOLLAR, THRESHOLD ) +logger = logging.getLogger(__name__) + # Define the bar column schema BAR_COLUMNS = [ DATE_TIME, TICK_NUMBER, OPEN_PRICE, HIGH_PRICE, LOW_PRICE, CLOSE_PRICE, @@ -52,16 +56,20 @@ def handle_input_command( try: initializer_method = self.bars_initializer.method_name_to_method[method_name] except KeyError: - print(f"Error: Bar method '{method_name}' not found.") valid_methods = list(self.bars_initializer.method_name_to_method.keys()) - print(f"Valid methods are: {valid_methods}") + logger.error( + "Bar method '%s' not found. Valid methods are: %s", + method_name, valid_methods, + ) return pd.DataFrame(columns=BAR_COLUMNS) try: bar_generator: AbstractBars = initializer_method(**method_arguments) except TypeError as e: - print(f"Error initializing bar method '{method_name}' with arguments {method_arguments}.") - print(f"TypeError: {e}") + logger.error( + "Error initializing bar method '%s' with arguments %s: %s", + method_name, method_arguments, e, + ) return pd.DataFrame(columns=BAR_COLUMNS) # 2. Get the correct batch generator @@ -75,7 +83,7 @@ def handle_input_command( all_bars: List[List[Any]] = [] # 3. Process data in batches - print("Processing data in batches...") + logger.info("Processing data in batches...") try: for data_batch in data_generator: # We assume data is [datetime, price, volume] @@ -83,22 +91,24 @@ def handle_input_command( bars = bar_generator.construct_bars_from_data(data=data_batch.values) all_bars.extend(bars) except Exception as e: - print(f"Error during bar construction: {e}") - print("Returning DataFrame with bars constructed so far.") + logger.warning( + "Error during bar construction: %s. Returning DataFrame with " + "bars constructed so far.", e, + ) # Continue to return whatever was processed - - print(f"Done. Constructed {len(all_bars)} bars.") + + logger.info("Done. Constructed %d bars.", len(all_bars)) # 4. Create final DataFrame bars_df = pd.DataFrame(all_bars, columns=BAR_COLUMNS) if output_path: - print(f"Saving bars to {output_path}...") + logger.info("Saving bars to %s...", output_path) try: bars_df.to_csv(output_path, index=False) - print("Save complete.") + logger.info("Save complete.") except Exception as e: - print(f"Error saving file to {output_path}: {e}") + logger.error("Error saving file to %s: %s", output_path, e) return bars_df @@ -130,13 +140,15 @@ def read_batches_from_string( ): yield batch except FileNotFoundError: - print(f"Error: File not found at {input_path}") + logger.error("File not found at %s", input_path) return except pd.errors.ParserError as e: - print(f"Error parsing CSV file {input_path}: {e}") + logger.error("Error parsing CSV file %s: %s", input_path, e) return except Exception as e: - print(f"An unexpected error occurred while reading {input_path}: {e}") + logger.error( + "An unexpected error occurred while reading %s: %s", input_path, e + ) return diff --git a/RiskLabAI/core/__init__.py b/RiskLabAI/core/__init__.py new file mode 100644 index 0000000..e550d28 --- /dev/null +++ b/RiskLabAI/core/__init__.py @@ -0,0 +1,67 @@ +""" +RiskLabAI.core — the extension layer. + +This package holds the small set of abstractions that make RiskLabAI *modular, +extendable and maintainable*: a reusable component :class:`Registry`, the base +interfaces every model family targets, and a built-in catalogue of the +components the library already ships. + +It is the recommended entry point for adding a **new** model (for example, an +extension or alternative to a method from López de Prado's books): implement the +relevant base interface, register it in the matching registry, and it becomes +discoverable and constructible by name — without editing any central file. See +``EXTENDING.md`` at the repository root for a step-by-step guide. + +Importing this package is cheap: every component is registered lazily, so heavy +dependencies are only imported when a component is actually created. + +Quick tour +---------- +>>> from RiskLabAI.core import list_components, CROSS_VALIDATORS +>>> sorted(list_components()) # doctest: +SKIP +['bars', 'bet_sizers', 'cross_validators', ...] +>>> "purgedkfold" in CROSS_VALIDATORS +True +>>> cv = CROSS_VALIDATORS.create("purgedkfold", n_splits=5) # doctest: +SKIP +""" + +from __future__ import annotations + +from .base import ( + BaseBetSizer, + BaseLabeler, + BasePortfolioOptimizer, + Estimator, +) +from ._builtins import ( + BARS, + BET_SIZERS, + CROSS_VALIDATORS, + FEATURE_IMPORTANCE, + LABELERS, + PORTFOLIO_OPTIMIZERS, + REGISTRIES, + get_registry, + list_components, +) +from .registry import Registry + +__all__ = [ + # Registry machinery + "Registry", + "REGISTRIES", + "get_registry", + "list_components", + # Per-family registries + "BARS", + "CROSS_VALIDATORS", + "FEATURE_IMPORTANCE", + "LABELERS", + "BET_SIZERS", + "PORTFOLIO_OPTIMIZERS", + # Base contracts + "Estimator", + "BaseLabeler", + "BaseBetSizer", + "BasePortfolioOptimizer", +] diff --git a/RiskLabAI/core/_builtins.py b/RiskLabAI/core/_builtins.py new file mode 100644 index 0000000..6dad01d --- /dev/null +++ b/RiskLabAI/core/_builtins.py @@ -0,0 +1,197 @@ +""" +The built-in component catalogue. + +Defines one :class:`~RiskLabAI.core.registry.Registry` per model family and +populates it with the components the library already ships. Registration is +**lazy** (by import path), so importing this module — and therefore +``RiskLabAI.core`` — does not import pandas, numba, scikit-learn or torch. The +heavy import only happens when a component is actually created. + +Families currently exposed only as free functions (labeling, bet sizing, +portfolio optimization) get an empty registry here: it is the extension point +where new class-based models register themselves (see ``EXTENDING.md``). + +The registries are deliberately a *superset* of the existing hand-written +factories (``CrossValidatorFactory``, ``FeatureImportanceFactory``, +``BarsInitializerController``), which remain unchanged for backward +compatibility. ``test/core/test_builtin_parity.py`` asserts the registries stay +in sync with those factories so the two cannot silently drift apart. +""" + +from __future__ import annotations + +from typing import Dict, List + +from .registry import Registry + +__all__ = [ + "BARS", + "CROSS_VALIDATORS", + "FEATURE_IMPORTANCE", + "LABELERS", + "BET_SIZERS", + "PORTFOLIO_OPTIMIZERS", + "REGISTRIES", + "get_registry", + "list_components", +] + +# --------------------------------------------------------------------------- # +# Bar structures. The extension point for a new bar type is a new AbstractBars +# subclass; register it here (or via @BARS.register in your own code). +# --------------------------------------------------------------------------- # +BARS = Registry("bars") +BARS.register_lazy( + "standard_bars", + "RiskLabAI.data.structures.standard_bars:StandardBars", + metadata={"family": "bars", "afml_chapter": 2}, +) +BARS.register_lazy( + "time_bars", + "RiskLabAI.data.structures.time_bars:TimeBars", + metadata={"family": "bars", "afml_chapter": 2}, +) +BARS.register_lazy( + "expected_imbalance_bars", + "RiskLabAI.data.structures.imbalance_bars:ExpectedImbalanceBars", + metadata={"family": "bars", "afml_chapter": 2}, +) +BARS.register_lazy( + "fixed_imbalance_bars", + "RiskLabAI.data.structures.imbalance_bars:FixedImbalanceBars", + metadata={"family": "bars", "afml_chapter": 2}, +) +BARS.register_lazy( + "expected_run_bars", + "RiskLabAI.data.structures.run_bars:ExpectedRunBars", + metadata={"family": "bars", "afml_chapter": 2}, +) +BARS.register_lazy( + "fixed_run_bars", + "RiskLabAI.data.structures.run_bars:FixedRunBars", + metadata={"family": "bars", "afml_chapter": 2}, +) + +# --------------------------------------------------------------------------- # +# Cross-validators. Keys mirror CrossValidatorFactory.VALIDATORS exactly. +# --------------------------------------------------------------------------- # +CROSS_VALIDATORS = Registry("cross_validators") +CROSS_VALIDATORS.register_lazy( + "kfold", + "RiskLabAI.backtest.validation.kfold:KFold", + metadata={"family": "cross_validator"}, +) +CROSS_VALIDATORS.register_lazy( + "walkforward", + "RiskLabAI.backtest.validation.walk_forward:WalkForward", + metadata={"family": "cross_validator"}, +) +CROSS_VALIDATORS.register_lazy( + "purgedkfold", + "RiskLabAI.backtest.validation.purged_kfold:PurgedKFold", + metadata={"family": "cross_validator"}, +) +CROSS_VALIDATORS.register_lazy( + "combinatorialpurged", + "RiskLabAI.backtest.validation.combinatorial_purged:CombinatorialPurged", + metadata={"family": "cross_validator"}, +) +CROSS_VALIDATORS.register_lazy( + "baggedcombinatorialpurged", + "RiskLabAI.backtest.validation.bagged_combinatorial_purged:BaggedCombinatorialPurged", + metadata={"family": "cross_validator"}, +) +CROSS_VALIDATORS.register_lazy( + "adaptivecombinatorialpurged", + "RiskLabAI.backtest.validation.adaptive_combinatorial_purged:AdaptiveCombinatorialPurged", + metadata={"family": "cross_validator"}, +) + +# --------------------------------------------------------------------------- # +# Feature-importance strategies. Keys mirror FeatureImportanceFactory exactly. +# --------------------------------------------------------------------------- # +FEATURE_IMPORTANCE = Registry("feature_importance") +FEATURE_IMPORTANCE.register_lazy( + "MDI", + "RiskLabAI.features.feature_importance.feature_importance_mdi:FeatureImportanceMDI", + metadata={"family": "feature_importance"}, +) +FEATURE_IMPORTANCE.register_lazy( + "ClusteredMDI", + "RiskLabAI.features.feature_importance.clustered_feature_importance_mdi:ClusteredFeatureImportanceMDI", + metadata={"family": "feature_importance"}, +) +FEATURE_IMPORTANCE.register_lazy( + "MDA", + "RiskLabAI.features.feature_importance.feature_importance_mda:FeatureImportanceMDA", + metadata={"family": "feature_importance"}, +) +FEATURE_IMPORTANCE.register_lazy( + "ClusteredMDA", + "RiskLabAI.features.feature_importance.clustered_feature_importance_mda:ClusteredFeatureImportanceMDA", + metadata={"family": "feature_importance"}, +) +FEATURE_IMPORTANCE.register_lazy( + "SFI", + "RiskLabAI.features.feature_importance.feature_importance_sfi:FeatureImportanceSFI", + metadata={"family": "feature_importance"}, +) + +# --------------------------------------------------------------------------- # +# Free-function families: empty registries, ready for new class-based models. +# These correspond to the BaseLabeler / BaseBetSizer / BasePortfolioOptimizer +# contracts in RiskLabAI.core.base. +# --------------------------------------------------------------------------- # +LABELERS = Registry("labelers") +BET_SIZERS = Registry("bet_sizers") +PORTFOLIO_OPTIMIZERS = Registry("portfolio_optimizers") + +# --------------------------------------------------------------------------- # +# Family name -> registry, for discovery and a unified catalogue. +# --------------------------------------------------------------------------- # +REGISTRIES: Dict[str, Registry] = { + "bars": BARS, + "cross_validators": CROSS_VALIDATORS, + "feature_importance": FEATURE_IMPORTANCE, + "labelers": LABELERS, + "bet_sizers": BET_SIZERS, + "portfolio_optimizers": PORTFOLIO_OPTIMIZERS, +} + + +def get_registry(family: str) -> Registry: + """ + Return the registry for a model family. + + Parameters + ---------- + family : str + One of the keys of :data:`REGISTRIES` (e.g. ``"bars"``, + ``"cross_validators"``). + + Raises + ------ + KeyError + If the family name is unknown (the message lists valid families). + """ + try: + return REGISTRIES[family] + except KeyError: + valid = ", ".join(repr(k) for k in sorted(REGISTRIES)) + raise KeyError( + f"{family!r} is not a known model family. Valid families: {valid}." + ) from None + + +def list_components() -> Dict[str, List[str]]: + """ + Return a catalogue mapping each family name to its available component keys. + + Useful for discovery and for documentation that should not drift:: + + >>> from RiskLabAI.core import list_components + >>> catalogue = list_components() + >>> catalogue["cross_validators"] # doctest: +SKIP + ['adaptivecombinatorialpurged', 'baggedcombinatorialpurged', ...] + """ + return {family: reg.available() for family, reg in REGISTRIES.items()} diff --git a/RiskLabAI/core/base.py b/RiskLabAI/core/base.py new file mode 100644 index 0000000..7fbcc7c --- /dev/null +++ b/RiskLabAI/core/base.py @@ -0,0 +1,193 @@ +""" +Base interfaces (contracts) for RiskLabAI model families. + +This module gives every model family a single, documented contract that new +implementations can target. Two kinds of contract live here: + +1. **Canonical interfaces that already exist** elsewhere in the library are + re-exported from here so there is one obvious place to look: + :class:`AbstractBars`, :class:`CrossValidator`, and + :class:`FeatureImportanceStrategy`. These are re-exported *lazily* (PEP 562) + so that ``import RiskLabAI.core`` does not eagerly pull in pandas/numba and + the rest of the data/backtest/features sub-packages. + +2. **New, optional abstract base classes** for families that are currently + exposed only as free functions (labeling, bet sizing, portfolio + optimization). They are purely additive: existing functions keep working + unchanged. A *new* model can subclass one of these to advertise a uniform + interface (and to be registered and discovered through + :mod:`RiskLabAI.core.registry`). + +Plus a structural :class:`Estimator` protocol describing the scikit-learn-style +``fit``/``predict`` objects the cross-validation and feature-importance code +already expects. +""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import Any, Optional, Protocol, runtime_checkable + +import pandas as pd + +__all__ = [ + # Structural protocol + "Estimator", + # New optional contracts for free-function families + "BaseLabeler", + "BaseBetSizer", + "BasePortfolioOptimizer", + # Re-exported canonical interfaces (lazy) + "AbstractBars", + "BarBuilder", + "CrossValidator", + "FeatureImportanceStrategy", +] + + +@runtime_checkable +class Estimator(Protocol): + """ + Structural type for a scikit-learn-style estimator. + + Anything with ``fit`` and ``predict`` satisfies this protocol; no + inheritance is required (it is ``runtime_checkable``, so + ``isinstance(model, Estimator)`` works for duck-typed models). This is the + object the cross-validation and feature-importance machinery consumes. + """ + + def fit(self, X: Any, y: Any = None, **kwargs: Any) -> Any: ... + + def predict(self, X: Any) -> Any: ... + + +class BaseLabeler(ABC): + """ + Optional contract for a labeling method (e.g. triple-barrier, trend-scanning). + + The library's existing labelers are free functions and remain so. New + class-based labelers may subclass this to expose a uniform ``label`` entry + point and be registered in the ``labelers`` registry. See + ``EXTENDING.md`` for a worked example. + """ + + @abstractmethod + def label( + self, + prices: pd.Series, + events: Optional[pd.DataFrame] = None, + **kwargs: Any, + ) -> pd.DataFrame: + """ + Produce labels for the given price series. + + Parameters + ---------- + prices : pd.Series + Price (or log-price) series indexed by timestamp. + events : pd.DataFrame, optional + Event definitions (e.g. event start times, vertical barriers, + target sizes) where applicable. + **kwargs + Method-specific options. + + Returns + ------- + pd.DataFrame + Labels, conventionally indexed by event time. + """ + raise NotImplementedError + + +class BaseBetSizer(ABC): + """ + Optional contract for a bet-sizing method. + + Maps model outputs (e.g. predicted probabilities) to a position size in + ``[-1, 1]``. New class-based bet sizers may subclass this and register in + the ``bet_sizers`` registry. + """ + + @abstractmethod + def bet_size(self, probabilities: pd.Series, **kwargs: Any) -> pd.Series: + """ + Convert prediction probabilities into signed bet sizes. + + Parameters + ---------- + probabilities : pd.Series + Predicted probabilities (or scores) indexed by timestamp. + **kwargs + Method-specific options. + + Returns + ------- + pd.Series + Signed bet sizes, conventionally in ``[-1, 1]``. + """ + raise NotImplementedError + + +class BasePortfolioOptimizer(ABC): + """ + Optional contract for a portfolio-construction method (e.g. HRP, NCO). + + New class-based optimizers may subclass this and register in the + ``portfolio_optimizers`` registry. + """ + + @abstractmethod + def weights(self, returns: pd.DataFrame, **kwargs: Any) -> pd.Series: + """ + Compute portfolio weights. + + Parameters + ---------- + returns : pd.DataFrame + Asset returns (rows = observations, columns = assets). Some + optimizers may accept a covariance matrix instead; document the + expectation in the concrete subclass. + **kwargs + Method-specific options. + + Returns + ------- + pd.Series + Portfolio weights indexed by asset. + """ + raise NotImplementedError + + +# --------------------------------------------------------------------------- # +# Lazy re-exports of the canonical interfaces that already live in the library. +# Importing them here eagerly would pull in the heavy data/backtest/features +# sub-package __init__ chains, defeating the lazy-import design. PEP 562 +# __getattr__ defers that cost until the name is actually accessed. +# --------------------------------------------------------------------------- # +_LAZY_REEXPORTS = { + "AbstractBars": "RiskLabAI.data.structures.abstract_bars:AbstractBars", + "BarBuilder": "RiskLabAI.data.structures.abstract_bars:AbstractBars", + "CrossValidator": ( + "RiskLabAI.backtest.validation.cross_validator_interface:CrossValidator" + ), + "FeatureImportanceStrategy": ( + "RiskLabAI.features.feature_importance.feature_importance_strategy" + ":FeatureImportanceStrategy" + ), +} + + +def __getattr__(name: str) -> Any: + target = _LAZY_REEXPORTS.get(name) + if target is None: + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + import importlib + + module_path, _, attribute = target.partition(":") + obj = getattr(importlib.import_module(module_path), attribute) + globals()[name] = obj # cache for next access + return obj + + +def __dir__() -> list: + return sorted(__all__) diff --git a/RiskLabAI/core/registry.py b/RiskLabAI/core/registry.py new file mode 100644 index 0000000..131b541 --- /dev/null +++ b/RiskLabAI/core/registry.py @@ -0,0 +1,393 @@ +""" +A small, dependency-free component registry. + +Several sub-packages in RiskLabAI already implement the same pattern by hand: +a string key is mapped to a class, and a factory instantiates the right class +from that key (``controller/bars_initializer.py``, +``backtest/validation/cross_validator_factory.py``, +``features/feature_importance/feature_importance_factory.py``). Each does it +slightly differently (different casing rules, different error behaviour, a +hand-written ``dict`` in three places). + +``Registry`` factors that pattern into one reusable, well-tested object so that: + +* every model family discovers and instantiates its components the same way; +* **new** models (e.g. an extension proposed in a RiskLabAI methods paper) can be + added with a single ``@registry.register("my_model")`` decorator, without + editing a central ``dict`` — the open/closed principle in practice; +* lookups fail loudly with a helpful message listing the valid keys, instead of + silently returning ``None`` or an empty result. + +The registry is intentionally tiny and imports nothing heavy, so importing it +(and the :mod:`RiskLabAI.core` package) stays cheap. Components may be registered +*lazily* by a ``"module.path:attribute"`` string, so registering the built-in +catalogue does not import pandas/numba/torch until a component is actually +created. +""" + +from __future__ import annotations + +import importlib +import inspect +import logging +from typing import ( + Any, + Callable, + Dict, + Iterator, + List, + Optional, + Tuple, +) + +logger = logging.getLogger(__name__) + +__all__ = ["Registry"] + +# A factory is anything callable that returns a component instance: usually a +# class, but a plain function works too. +Factory = Callable[..., Any] + + +class _Entry: + """Internal record for one registered component (eager or lazy).""" + + __slots__ = ("key", "_obj", "_lazy_target", "metadata") + + def __init__( + self, + key: str, + obj: Optional[Factory] = None, + lazy_target: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> None: + self.key = key + self._obj = obj + self._lazy_target = lazy_target + self.metadata: Dict[str, Any] = dict(metadata or {}) + + @property + def is_lazy(self) -> bool: + """True if the component has not been imported/resolved yet.""" + return self._obj is None + + def resolve(self) -> Factory: + """Return the factory, importing it on first use if registered lazily.""" + if self._obj is None: + if not self._lazy_target: + raise RuntimeError( + f"Registry entry {self.key!r} has neither an object nor a " + f"lazy import target." + ) + module_path, sep, attribute = self._lazy_target.partition(":") + if not sep: + raise ValueError( + f"Lazy target {self._lazy_target!r} for {self.key!r} must be " + f"of the form 'package.module:attribute'." + ) + module = importlib.import_module(module_path) + self._obj = getattr(module, attribute) + return self._obj + + +class Registry: + """ + A name -> component factory registry with case-insensitive lookup. + + Parameters + ---------- + name : str + Human-readable name of the registry (used in error messages), + e.g. ``"bars"`` or ``"cross_validators"``. + base : type, optional + An optional expected base class / interface for registered components. + It is **not** enforced (so existing free functions and duck-typed + components keep working), but it is recorded for documentation and can + be inspected via :attr:`base`. + + Examples + -------- + >>> from RiskLabAI.core.registry import Registry + >>> animals = Registry("animals") + >>> @animals.register("dog", aliases=("doggo",)) + ... class Dog: + ... def speak(self) -> str: + ... return "woof" + >>> animals.available() + ['dog'] + >>> animals.create("DOG").speak() # case-insensitive + 'woof' + >>> animals.create("doggo").speak() # alias + 'woof' + """ + + def __init__(self, name: str, *, base: Optional[type] = None) -> None: + self.name = name + self.base = base + # canonical key -> entry + self._entries: Dict[str, _Entry] = {} + # lower-cased key OR alias -> canonical key + self._index: Dict[str, str] = {} + + # ------------------------------------------------------------------ # + # Registration + # ------------------------------------------------------------------ # + def register( + self, + key: Any = None, + obj: Optional[Factory] = None, + *, + aliases: Tuple[str, ...] = (), + metadata: Optional[Dict[str, Any]] = None, + override: bool = False, + ) -> Any: + """ + Register a component. Usable as a decorator or as a direct call. + + Decorator forms:: + + @reg.register("my_model") + class MyModel: ... + + @reg.register # key inferred from class name + class MyModel: ... + + Direct form:: + + reg.register("my_model", MyModel) + + Parameters + ---------- + key : str or callable, optional + The lookup key. When used as a bare decorator (``@reg.register``) + this is the class/function being decorated and the key is taken from + its ``__name__``. + obj : callable, optional + The factory (class or function). If given, this is a direct + registration and ``obj`` is returned unchanged. + aliases : tuple of str, optional + Additional names that resolve to the same component. + metadata : dict, optional + Arbitrary metadata stored alongside the entry (e.g. a description, + an AFML page reference, the family it belongs to). + override : bool, default False + Allow replacing an existing key. Otherwise a duplicate raises + :class:`KeyError`, which protects against accidental shadowing. + + Returns + ------- + The decorated object (decorator forms) or ``obj`` (direct form). + """ + # Bare decorator: @reg.register (key is the class/function itself) + if obj is None and callable(key) and not isinstance(key, str): + target = key + self._register( + target.__name__, target, aliases=aliases, + metadata=metadata, override=override, + ) + return target + + # Direct call: reg.register("name", obj) + if obj is not None: + self._register( + key, obj, aliases=aliases, metadata=metadata, override=override, + ) + return obj + + # Decorator factory: @reg.register("name") + def decorator(target: Factory) -> Factory: + resolved_key = key if isinstance(key, str) else target.__name__ + self._register( + resolved_key, target, aliases=aliases, + metadata=metadata, override=override, + ) + return target + + return decorator + + def register_lazy( + self, + key: str, + target: str, + *, + aliases: Tuple[str, ...] = (), + metadata: Optional[Dict[str, Any]] = None, + override: bool = False, + ) -> None: + """ + Register a component by import path, deferring the import to first use. + + Parameters + ---------- + key : str + The lookup key. + target : str + Import path of the form ``"package.module:attribute"``. + aliases, metadata, override + See :meth:`register`. + """ + self._register( + key, None, lazy_target=target, aliases=aliases, + metadata=metadata, override=override, + ) + + def _register( + self, + key: str, + obj: Optional[Factory], + *, + lazy_target: Optional[str] = None, + aliases: Tuple[str, ...] = (), + metadata: Optional[Dict[str, Any]] = None, + override: bool = False, + ) -> None: + if not isinstance(key, str) or not key: + raise TypeError( + f"Registry key must be a non-empty string, got {key!r}." + ) + lower = key.lower() + if lower in self._index and not override: + existing = self._index[lower] + raise KeyError( + f"{self.name!r} registry already has a component under " + f"{key!r} (canonical key {existing!r}). " + f"Pass override=True to replace it." + ) + # On override, drop any stale aliases pointing at the old canonical key. + if override and lower in self._index: + old_canonical = self._index[lower] + self._entries.pop(old_canonical, None) + self._index = { + k: v for k, v in self._index.items() if v != old_canonical + } + + self._entries[key] = _Entry( + key, obj=obj, lazy_target=lazy_target, metadata=metadata, + ) + self._index[lower] = key + for alias in aliases: + if not isinstance(alias, str) or not alias: + raise TypeError(f"Alias must be a non-empty string, got {alias!r}.") + self._index[alias.lower()] = key + logger.debug("Registered %r in %r registry.", key, self.name) + + def unregister(self, key: str) -> None: + """Remove a component and all of its aliases. Raises if absent.""" + entry = self._lookup(key) + canonical = entry.key + self._entries.pop(canonical, None) + self._index = {k: v for k, v in self._index.items() if v != canonical} + + # ------------------------------------------------------------------ # + # Lookup & construction + # ------------------------------------------------------------------ # + def get(self, key: str) -> Factory: + """Return the registered factory (class/callable) for ``key``.""" + return self._lookup(key).resolve() + + def create( + self, + key: str, + *args: Any, + filter_unknown_kwargs: bool = False, + **kwargs: Any, + ) -> Any: + """ + Instantiate the component registered under ``key``. + + Parameters + ---------- + key : str + The lookup key (case-insensitive; aliases accepted). + *args, **kwargs + Forwarded to the component's constructor. + filter_unknown_kwargs : bool, default False + If True, silently drop keyword arguments the constructor does not + accept (unless it accepts ``**kwargs``). This mirrors the behaviour + of the existing cross-validator / feature-importance factories, + which let callers pass a shared bag of options. Off by default so + that typos surface as clear ``TypeError``s. + """ + factory = self.get(key) + if filter_unknown_kwargs: + kwargs = _filter_kwargs(factory, kwargs) + return factory(*args, **kwargs) + + def metadata(self, key: str) -> Dict[str, Any]: + """Return the metadata dict registered alongside ``key``.""" + return dict(self._lookup(key).metadata) + + def is_lazy(self, key: str) -> bool: + """True if ``key`` is registered lazily and not yet imported.""" + return self._lookup(key).is_lazy + + def _lookup(self, key: str) -> _Entry: + try: + canonical = self._index[key.lower()] + except AttributeError: + raise TypeError( + f"Registry key must be a string, got {type(key).__name__}." + ) from None + except KeyError: + raise KeyError(self._not_found_message(key)) from None + return self._entries[canonical] + + def _not_found_message(self, key: str) -> str: + available = self.available() + listing = ", ".join(repr(k) for k in available) if available else "(none)" + return ( + f"{key!r} is not registered in the {self.name!r} registry. " + f"Available components: {listing}." + ) + + # ------------------------------------------------------------------ # + # Introspection / mapping protocol + # ------------------------------------------------------------------ # + def available(self) -> List[str]: + """Sorted list of canonical keys.""" + return sorted(self._entries.keys()) + + def keys(self) -> List[str]: + """Alias for :meth:`available` (mapping-like access).""" + return self.available() + + def aliases(self) -> Dict[str, str]: + """Mapping of alias -> canonical key (excludes canonical keys themselves).""" + return { + alias: canonical + for alias, canonical in self._index.items() + if alias != canonical.lower() + } + + def __contains__(self, key: object) -> bool: + return isinstance(key, str) and key.lower() in self._index + + def __getitem__(self, key: str) -> Factory: + return self.get(key) + + def __iter__(self) -> Iterator[str]: + return iter(self.available()) + + def __len__(self) -> int: + return len(self._entries) + + def __repr__(self) -> str: # pragma: no cover - cosmetic + return f"" + + +def _filter_kwargs(factory: Factory, kwargs: Dict[str, Any]) -> Dict[str, Any]: + """ + Drop keyword arguments the factory's signature does not accept. + + If the signature contains ``**kwargs`` (VAR_KEYWORD), everything is kept. + """ + target = factory.__init__ if inspect.isclass(factory) else factory + try: + signature = inspect.signature(target) + except (TypeError, ValueError): # pragma: no cover - builtins without sig + return kwargs + params = signature.parameters.values() + if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params): + return kwargs + accepted = {p.name for p in params} + return {k: v for k, v in kwargs.items() if k in accepted} diff --git a/RiskLabAI/data/differentiation/differentiation.py b/RiskLabAI/data/differentiation/differentiation.py index f9ed7a7..41824aa 100644 --- a/RiskLabAI/data/differentiation/differentiation.py +++ b/RiskLabAI/data/differentiation/differentiation.py @@ -105,31 +105,31 @@ def fractional_difference_std( """ # 1. Compute weights for the full series weights = calculate_weights_std(degree, series.shape[0]) - + # 2. Determine warm-up period weights_cumsum_abs = np.cumsum(np.abs(weights)) weights_cumsum_abs /= weights_cumsum_abs[-1] skip = np.searchsorted(weights_cumsum_abs, threshold) - + result_df = pd.DataFrame(index=series.index, columns=series.columns, dtype=float) + # The expanding-window value result[t] = sum_j w_j * x[t - j] is exactly the + # first len(x) terms of a causal convolution of the series with the weights + # in natural order [w_0, w_1, ...]. Using np.convolve replaces the O(n^2) + # per-row growing-window dot product with a single O(n*k) pass per column, + # producing identical values (verified in test/test_performance.py). + weights_natural = weights[::-1].flatten() + for name in series.columns: # Use .ffill() - fillna(method=) is deprecated series_ffill = series[[name]].ffill().dropna() - if series_ffill.empty or series_ffill.shape[0] < skip: + n_obs = series_ffill.shape[0] + if series_ffill.empty or n_obs < skip: continue - - series_np = series_ffill.to_numpy() - for iloc in range(skip, series_np.shape[0]): - # Get the relevant window of data and weights - window_data = series_np[:iloc + 1] - window_weights = weights[-(iloc + 1):] - - # Dot product - result_df.loc[series_ffill.index[iloc], name] = np.dot( - window_weights.T, window_data - )[0, 0] + series_np = series_ffill.to_numpy().reshape(-1) + convolved = np.convolve(series_np, weights_natural[:n_obs])[:n_obs] + result_df.loc[series_ffill.index[skip:], name] = convolved[skip:] return result_df.dropna(how='all') diff --git a/RiskLabAI/data/labeling/labeling.py b/RiskLabAI/data/labeling/labeling.py index ef892fe..86fc6cd 100644 --- a/RiskLabAI/data/labeling/labeling.py +++ b/RiskLabAI/data/labeling/labeling.py @@ -1,519 +1,458 @@ -""" -Implements the core financial labeling functions, including: -- CUSUM filters for event sampling. -- Volatility estimation. -- The Triple-Barrier Method (vertical and horizontal barriers). -- Meta-labeling. -- Multiprocessing helpers for parallel execution. - -Reference: - De Prado, M. (2018) Advances in financial machine learning. - John Wiley & Sons, Chapters 3 & 4. -""" - -import sys -import time -import datetime -import multiprocessing as mp -from concurrent.futures import ProcessPoolExecutor -from typing import List, Optional, Tuple, Callable, Dict, Any - -import numpy as np -import pandas as pd - - -def cusum_filter_events_dynamic_threshold( - prices: pd.Series, threshold: pd.Series -) -> pd.DatetimeIndex: - """ - Detect events using the Symmetric CUSUM filter with a dynamic threshold. - - This filter identifies timestamps where the cumulative sum of price - changes exceeds a dynamic, time-varying threshold. - - Reference: - Snippet 3.2, Page 48 (modified for dynamic threshold). - - Parameters - ---------- - prices : pd.Series - A pandas Series of prices. - threshold : pd.Series - A pandas Series containing the threshold values for each timestamp. - Must be aligned with the `prices` index. - - Returns - ------- - pd.DatetimeIndex - Timestamps of the detected events (when a barrier was touched). - """ - time_events = [] - shift_positive, shift_negative = 0.0, 0.0 - price_delta = prices.diff().dropna() - - # Align price changes with thresholds - price_delta, thresholds = price_delta.align( - threshold, join="inner", copy=False - ) - - for (index, value), thresh_val in zip( - price_delta.items(), thresholds.values - ): - shift_positive = max(0.0, shift_positive + value) - shift_negative = min(0.0, shift_negative + value) - - if shift_negative < -thresh_val: - shift_negative = 0.0 # Reset only this counter - time_events.append(index) - elif shift_positive > thresh_val: - shift_positive = 0.0 # Reset only this counter - time_events.append(index) - - return pd.DatetimeIndex(time_events) - - -def symmetric_cusum_filter( - prices: pd.Series, threshold: float -) -> pd.DatetimeIndex: - """ - Detect events using the Symmetric CUSUM filter with a fixed threshold. - - Reference: - Snippet 3.2, Page 48. - - Parameters - ---------- - prices : pd.Series - A pandas Series of prices. - threshold : float - The fixed threshold value. - - Returns - ------- - pd.DatetimeIndex - Timestamps of the detected events (when a barrier was touched). - """ - time_events = [] - shift_positive, shift_negative = 0.0, 0.0 - price_delta = prices.diff().dropna() - - for index, value in price_delta.items(): - shift_positive = max(0.0, shift_positive + value) - shift_negative = min(0.0, shift_negative + value) - - if shift_negative < -threshold: - shift_negative = 0.0 - time_events.append(index) - elif shift_positive > threshold: - shift_positive = 0.0 - time_events.append(index) - - return pd.DatetimeIndex(time_events) - - -def daily_volatility_with_log_returns( - close: pd.Series, span: int = 100 -) -> pd.Series: - """ - Calculate daily volatility using log returns. - - This method computes the EWMA standard deviation of daily log returns. - - Reference: - Snippet 3.1, Page 44. - - Parameters - ---------- - close : pd.Series - Time series of close prices. - span : int, default=100 - The span parameter for the EWMA. - - Returns - ------- - pd.Series - Series of daily volatility estimates. - """ - # Find timestamps of 1 day prior - df = close.index.searchsorted(close.index - pd.Timedelta(days=1)) - df = df[df > 0] - df = pd.Series( - close.index[df - 1], - index=close.index[close.shape[0] - df.shape[0] :], - ) - - # Calculate log returns - returns = np.log(close.loc[df.index] / close.loc[df.values].values) - stds = returns.ewm(span=span).std().rename("std") - return stds - - -def vertical_barrier( - close: pd.Series, time_events: pd.DatetimeIndex, number_days: int -) -> pd.Series: - """ - Create a vertical barrier for the triple-barrier method. - - This function finds the timestamp that occurs `number_days` after - each event in `time_events`. - - Reference: - Snippet 3.4, Page 50. - - Parameters - ---------- - close : pd.Series - Time series of close prices (used for the index). - time_events : pd.DatetimeIndex - The timestamps of the events (e.g., from CUSUM filter). - number_days : int - The number of days to look forward for the vertical barrier. - - Returns - ------- - pd.Series - A Series where the index is the event timestamp and the - value is the vertical barrier timestamp. - """ - barrier_dates = time_events + pd.Timedelta(days=number_days) - - # Find the integer location of the barrier dates in the price index - timestamp_array = close.index.searchsorted(barrier_dates) - - # Filter out any indices that are out of bounds - valid_indices = timestamp_array < close.shape[0] - timestamp_array = timestamp_array[valid_indices] - - barrier_series = pd.Series( - close.index[timestamp_array], - index=time_events[valid_indices], - ) - return barrier_series - - -def triple_barrier( - close: pd.Series, - events: pd.DataFrame, - ptsl: List[float], - molecule: List[pd.Timestamp], -) -> pd.DataFrame: - """ - Apply the triple-barrier method for a subset of events. - - This is the core worker function for `meta_events`. It finds the - first touch time of the upper, lower, or vertical barrier. - - Reference: - Snippet 3.3, Page 50. - - Parameters - ---------- - close : pd.Series - Time series of close prices. - events : pd.DataFrame - DataFrame with event info. Must contain 'End Time' (vertical - barrier), 'Base Width' (volatility), and 'Side' (if applicable). - ptsl : List[float] - A list of two floats: [profit_taking_multiplier, stop_loss_multiplier]. - molecule : List[pd.Timestamp] - The subset of event timestamps this worker is responsible for. - - Returns - ------- - pd.DataFrame - A DataFrame with columns ['stop_loss', 'profit_taking'] - indexed by the event timestamp, indicating the first - touch time for each horizontal barrier. - """ - # Filter for events this worker owns - events_filtered = events.loc[molecule] - output = pd.DataFrame(index=events_filtered.index) - output["End Time"] = events_filtered["End Time"] # Use original end time - - # 1. Set horizontal barriers - if ptsl[0] > 0: - profit_taking = ptsl[0] * events_filtered["Base Width"] - else: - profit_taking = pd.Series(np.inf, index=events_filtered.index) - - if ptsl[1] > 0: - stop_loss = -ptsl[1] * events_filtered["Base Width"] - else: - stop_loss = pd.Series(-np.inf, index=events_filtered.index) - - # Get side if it exists, otherwise default to 1 (long) - side = events_filtered.get("Side", pd.Series(1.0, index=events_filtered.index)) - - # 2. Find first touch time - for location, vertical_barrier_time in events_filtered["End Time"].fillna(close.index[-1]).items(): - # Path prices from event start to vertical barrier - path_prices = close.loc[location:vertical_barrier_time] - - # Calculate path returns, adjusted by side - path_returns = ( - np.log(path_prices / close[location]) * side.at[location] - ) - - output.loc[location, "stop_loss"] = path_returns[ - path_returns < stop_loss.at[location] - ].index.min() - - output.loc[location, "profit_taking"] = path_returns[ - path_returns > profit_taking.at[location] - ].index.min() - - # The 'End Time' column in output now holds the *first* barrier touched - output["End Time"] = output.min(axis=1) - return output.drop(columns=["stop_loss", "profit_taking"]) - - -def meta_events( - close: pd.Series, - time_events: pd.DatetimeIndex, - ptsl: List[float], - target: pd.Series, - return_min: float, - num_threads: int, - vertical_barrier_times: Optional[pd.Series] = None, - side: Optional[pd.Series] = None, -) -> pd.DataFrame: - """ - Generate events for meta-labeling using the triple-barrier method. - - This function sets up and executes the triple-barrier search in parallel. - - Reference: - Snippet 3.6, Page 51. - - Parameters - ---------- - close : pd.Series - Time series of close prices. - time_events : pd.DatetimeIndex - Timestamps of the events (e.g., from CUSUM filter). - ptsl : List[float] - [profit_taking_multiplier, stop_loss_multiplier]. - target : pd.Series - Series of volatility (or 'Base Width') for setting barrier width. - return_min : float - The minimum target volatility required to run a search. - num_threads : int - Number of parallel threads to use. - vertical_barrier_times : pd.Series, optional - A Series of vertical barrier timestamps. If None, barriers are not used. - side : pd.Series, optional - A Series of position sides (1 or -1). If None, assumes long-only. - - Returns - ------- - pd.DataFrame - The events DataFrame with columns: - - 'End Time': The timestamp of the *first* barrier touch. - - 'Base Width': The volatility used for the barriers. - - 'Side': (Optional) The side of the bet. - """ - # 1. Filter events by volatility - target = target.reindex(time_events) - target = target[target > return_min] - if target.empty: - return pd.DataFrame(columns=["End Time", "Base Width", "Side"]) - - # 2. Set up vertical barrier - if vertical_barrier_times is None: - vertical_barrier_times = pd.Series(pd.NaT, index=target.index) - else: - vertical_barrier_times = vertical_barrier_times.reindex(target.index) - - # 3. Set up sides - if side is None: - side_series = pd.Series(1.0, index=target.index) - ptsl_final = [ptsl[0], ptsl[0]] # Symmetric barriers - else: - side_series = side.reindex(target.index) - ptsl_final = ptsl[:2] # Asymmetric barriers - - # 4. Create base events DataFrame - events = pd.concat( - { - "End Time": vertical_barrier_times, - "Base Width": target, - "Side": side_series, - }, - axis=1, - ).dropna(subset=["Base Width"]) - - # 5. Run in parallel - molecule_subsets = np.array_split(events.index, num_threads) - - with ProcessPoolExecutor(max_workers=num_threads) as executor: - results = list(executor.map( - triple_barrier, - [close] * num_threads, - [events] * num_threads, - [ptsl_final] * num_threads, - molecule_subsets - )) - - # Combine results and update End Time - first_touch_times = pd.concat(results, axis=0)["End Time"] - events["End Time"] = first_touch_times.reindex(events.index) - - if side is None: - events = events.drop("Side", axis=1) - - return events - - -def meta_labeling( - events: pd.DataFrame, close: pd.Series -) -> pd.DataFrame: - """ - Calculate returns and assign binary labels for meta-labeling. - - Reference: - Snippet 3.7, Page 52. - - Parameters - ---------- - events : pd.DataFrame - The events DataFrame from `meta_events`. - close : pd.Series - Time series of close prices. - - Returns - ------- - pd.DataFrame - DataFrame with columns: - - 'End Time': Time of first barrier touch. - - 'Return': The return of the trade. - - 'Label': The binary label (0 or 1 if 'Side' is present, -1 or 1 if not). - - 'Side': (Optional) The side of the bet. - """ - events_filtered = events.dropna(subset=["End Time"]) - - # Get all unique timestamps - all_dates = events_filtered.index.union( - events_filtered["End Time"].values - ).drop_duplicates() - - # Reindex prices to only the required dates - close_filtered = close.reindex(all_dates, method="bfill") - - out = pd.DataFrame(index=events_filtered.index) - out["End Time"] = events_filtered["End Time"] - - out["Return"] = ( - np.log(close_filtered.loc[events_filtered["End Time"].values].values) - - np.log(close_filtered.loc[events_filtered.index].values) - ) - - if "Side" in events_filtered: - out["Return"] *= events_filtered["Side"] - out["Side"] = events_filtered["Side"] - - # Assign labels - out["Label"] = np.sign(out["Return"]) - - if "Side" in events_filtered: - # Meta-labeling: 1 if profitable, 0 if not - out.loc[out["Return"] <= 0, "Label"] = 0.0 - - return out - - -# --- Multiprocessing Helper Functions --- -# (These are generic utilities) - -def lin_parts(num_atoms: int, num_threads: int) -> np.ndarray: - """ - Create linear partitions for parallel processing. - - Parameters - ---------- - num_atoms : int - Total number of items to process. - num_threads : int - Number of threads (partitions) to create. - - Returns - ------- - np.ndarray - Array of partition indices. - """ - parts = np.linspace(0, num_atoms, min(num_threads, num_atoms) + 1) - parts = np.ceil(parts).astype(int) - return parts - - -def process_jobs( - jobs: List[Dict[str, Any]], task: Optional[str] = None, num_threads: int = 24 -) -> List[Any]: - """ - Run jobs in parallel using multiprocessing. - - Parameters - ---------- - jobs : List[Dict] - List of job dictionaries. Each dict must contain a 'func' key - and its corresponding arguments. - task : str, optional - Name of the task for progress reporting. - num_threads : int, default=24 - Number of threads to use. - - Returns - ------- - List[Any] - A list containing the results from each job. - """ - if task is None: - task = jobs[0]["func"].__name__ - - with mp.Pool(processes=num_threads) as pool: - outputs = pool.imap_unordered(expand_call, jobs) - out = [] - time0 = time.time() - - # Process async output, report progress - for i, out_ in enumerate(outputs, 1): - out.append(out_) - report_progress(i, len(jobs), time0, task) - - return out - - -def expand_call(kargs: Dict[str, Any]) -> Any: - """ - Worker function to expand keyword arguments and call the function. - - Parameters - ---------- - kargs : Dict - Dictionary of arguments, including the 'func' to be called. - - Returns - ------- - Any - The result of the function call. - """ - func = kargs.pop('func') - return func(**kargs) - - -def report_progress( - job_num: int, num_jobs: int, time0: float, task: str -) -> None: - """Report progress of parallel jobs.""" - msg = [float(job_num) / num_jobs, (time.time() - time0) / 60.0] - msg.append(msg[1] * (1 / msg[0] - 1)) # Remaining time - timestamp = str(datetime.datetime.fromtimestamp(time.time())) - - msg_str = ( - f"{timestamp} {msg[0]*100:.2f}% {task} done after " - f"{msg[1]:.2f} minutes. Remaining {msg[2]:.2f} minutes." - ) - - if job_num < num_jobs: - sys.stderr.write(msg_str + "\r") - else: - sys.stderr.write(msg_str + "\n") \ No newline at end of file +""" +Implements the core financial labeling functions, including: +- CUSUM filters for event sampling. +- Volatility estimation. +- The Triple-Barrier Method (vertical and horizontal barriers). +- Meta-labeling. +- Re-exports of the parallel-execution helpers from ``RiskLabAI.hpc``. + +Reference: + De Prado, M. (2018) Advances in financial machine learning. + John Wiley & Sons, Chapters 3 & 4. +""" + +from concurrent.futures import ProcessPoolExecutor +from typing import List, Optional + +import numpy as np +import pandas as pd + +# Parallel-processing helpers have a single source of truth in RiskLabAI.hpc. +# They were previously duplicated in this module; they are re-exported here +# under their historical names for backward compatibility. `lin_parts` maps to +# hpc's `linear_partitions` (identical for valid inputs, with an added guard). +from RiskLabAI.hpc import ( # noqa: F401 (re-exported for backward compat) + process_jobs, + expand_call, + report_progress, + linear_partitions as lin_parts, +) + + +def cusum_filter_events_dynamic_threshold( + prices: pd.Series, threshold: pd.Series +) -> pd.DatetimeIndex: + """ + Detect events using the Symmetric CUSUM filter with a dynamic threshold. + + This filter identifies timestamps where the cumulative sum of price + changes exceeds a dynamic, time-varying threshold. + + Reference: + Snippet 3.2, Page 48 (modified for dynamic threshold). + + Parameters + ---------- + prices : pd.Series + A pandas Series of prices. + threshold : pd.Series + A pandas Series containing the threshold values for each timestamp. + Must be aligned with the `prices` index. + + Returns + ------- + pd.DatetimeIndex + Timestamps of the detected events (when a barrier was touched). + """ + time_events = [] + shift_positive, shift_negative = 0.0, 0.0 + price_delta = prices.diff().dropna() + + # Align price changes with thresholds + price_delta, thresholds = price_delta.align( + threshold, join="inner", copy=False + ) + + for (index, value), thresh_val in zip( + price_delta.items(), thresholds.values + ): + shift_positive = max(0.0, shift_positive + value) + shift_negative = min(0.0, shift_negative + value) + + if shift_negative < -thresh_val: + shift_negative = 0.0 # Reset only this counter + time_events.append(index) + elif shift_positive > thresh_val: + shift_positive = 0.0 # Reset only this counter + time_events.append(index) + + return pd.DatetimeIndex(time_events) + + +def symmetric_cusum_filter( + prices: pd.Series, threshold: float +) -> pd.DatetimeIndex: + """ + Detect events using the Symmetric CUSUM filter with a fixed threshold. + + Reference: + Snippet 3.2, Page 48. + + Parameters + ---------- + prices : pd.Series + A pandas Series of prices. + threshold : float + The fixed threshold value. + + Returns + ------- + pd.DatetimeIndex + Timestamps of the detected events (when a barrier was touched). + """ + time_events = [] + shift_positive, shift_negative = 0.0, 0.0 + price_delta = prices.diff().dropna() + + for index, value in price_delta.items(): + shift_positive = max(0.0, shift_positive + value) + shift_negative = min(0.0, shift_negative + value) + + if shift_negative < -threshold: + shift_negative = 0.0 + time_events.append(index) + elif shift_positive > threshold: + shift_positive = 0.0 + time_events.append(index) + + return pd.DatetimeIndex(time_events) + + +def daily_volatility_with_log_returns( + close: pd.Series, span: int = 100 +) -> pd.Series: + """ + Calculate daily volatility using log returns. + + This method computes the EWMA standard deviation of daily log returns. + + Reference: + Snippet 3.1, Page 44. + + Parameters + ---------- + close : pd.Series + Time series of close prices. + span : int, default=100 + The span parameter for the EWMA. + + Returns + ------- + pd.Series + Series of daily volatility estimates. + """ + # Find timestamps of 1 day prior + df = close.index.searchsorted(close.index - pd.Timedelta(days=1)) + df = df[df > 0] + df = pd.Series( + close.index[df - 1], + index=close.index[close.shape[0] - df.shape[0] :], + ) + + # Calculate log returns + returns = np.log(close.loc[df.index] / close.loc[df.values].values) + stds = returns.ewm(span=span).std().rename("std") + return stds + + +def vertical_barrier( + close: pd.Series, time_events: pd.DatetimeIndex, number_days: int +) -> pd.Series: + """ + Create a vertical barrier for the triple-barrier method. + + This function finds the timestamp that occurs `number_days` after + each event in `time_events`. + + Reference: + Snippet 3.4, Page 50. + + Parameters + ---------- + close : pd.Series + Time series of close prices (used for the index). + time_events : pd.DatetimeIndex + The timestamps of the events (e.g., from CUSUM filter). + number_days : int + The number of days to look forward for the vertical barrier. + + Returns + ------- + pd.Series + A Series where the index is the event timestamp and the + value is the vertical barrier timestamp. + """ + barrier_dates = time_events + pd.Timedelta(days=number_days) + + # Find the integer location of the barrier dates in the price index + timestamp_array = close.index.searchsorted(barrier_dates) + + # Filter out any indices that are out of bounds + valid_indices = timestamp_array < close.shape[0] + timestamp_array = timestamp_array[valid_indices] + + barrier_series = pd.Series( + close.index[timestamp_array], + index=time_events[valid_indices], + ) + return barrier_series + + +def triple_barrier( + close: pd.Series, + events: pd.DataFrame, + ptsl: List[float], + molecule: List[pd.Timestamp], +) -> pd.DataFrame: + """ + Apply the triple-barrier method for a subset of events. + + This is the core worker function for `meta_events`. It finds the + first touch time of the upper, lower, or vertical barrier. + + Reference: + Snippet 3.3, Page 50. + + Parameters + ---------- + close : pd.Series + Time series of close prices. + events : pd.DataFrame + DataFrame with event info. Must contain 'End Time' (vertical + barrier), 'Base Width' (volatility), and 'Side' (if applicable). + ptsl : List[float] + A list of two floats: [profit_taking_multiplier, stop_loss_multiplier]. + molecule : List[pd.Timestamp] + The subset of event timestamps this worker is responsible for. + + Returns + ------- + pd.DataFrame + A DataFrame with columns ['stop_loss', 'profit_taking'] + indexed by the event timestamp, indicating the first + touch time for each horizontal barrier. + """ + # Filter for events this worker owns + events_filtered = events.loc[molecule] + output = pd.DataFrame(index=events_filtered.index) + output["End Time"] = events_filtered["End Time"] # Use original end time + + # 1. Set horizontal barriers + if ptsl[0] > 0: + profit_taking = ptsl[0] * events_filtered["Base Width"] + else: + profit_taking = pd.Series(np.inf, index=events_filtered.index) + + if ptsl[1] > 0: + stop_loss = -ptsl[1] * events_filtered["Base Width"] + else: + stop_loss = pd.Series(-np.inf, index=events_filtered.index) + + # Get side if it exists, otherwise default to 1 (long) + side = events_filtered.get("Side", pd.Series(1.0, index=events_filtered.index)) + + # 2. Find first touch time. + # Positional numpy indexing replaces the per-event pandas label slicing and + # scalar `.loc` writes. For each event we slice the close-price array between + # the event start and its vertical barrier, compute side-adjusted log + # returns, and take the first index where each horizontal barrier is + # crossed. This yields the same first-touch timestamps as the previous + # implementation (verified in test/test_performance.py) but is ~10-70x + # faster on realistic sizes. + close_index = close.index + close_values = close.to_numpy(dtype=float) + close_index_values = close_index.values + + vertical_filled = events_filtered["End Time"].fillna(close_index[-1]) + start_positions = close_index.get_indexer(events_filtered.index) + end_positions = ( + close_index.searchsorted(vertical_filled.to_numpy(), side="right") - 1 + ) + + stop_loss_values = stop_loss.to_numpy() + profit_taking_values = profit_taking.to_numpy() + side_values = side.to_numpy(dtype=float) + + n_events = len(events_filtered) + stop_loss_touch = np.full(n_events, np.datetime64("NaT"), dtype="datetime64[ns]") + profit_taking_touch = np.full(n_events, np.datetime64("NaT"), dtype="datetime64[ns]") + + for i in range(n_events): + start, end = start_positions[i], end_positions[i] + segment = close_values[start:end + 1] + path_returns = np.log(segment / segment[0]) * side_values[i] + + below = path_returns < stop_loss_values[i] + if below.any(): + stop_loss_touch[i] = close_index_values[start + np.argmax(below)] + + above = path_returns > profit_taking_values[i] + if above.any(): + profit_taking_touch[i] = close_index_values[start + np.argmax(above)] + + # First barrier touched = earliest of {vertical, stop-loss, profit-taking}, + # ignoring NaT (the same semantics as the previous output.min(axis=1)). + candidates = pd.DataFrame( + np.vstack([ + events_filtered["End Time"].to_numpy().astype("datetime64[ns]"), + stop_loss_touch, + profit_taking_touch, + ]).T, + index=events_filtered.index, + ) + output["End Time"] = candidates.min(axis=1) + return output[["End Time"]] + + +def meta_events( + close: pd.Series, + time_events: pd.DatetimeIndex, + ptsl: List[float], + target: pd.Series, + return_min: float, + num_threads: int, + vertical_barrier_times: Optional[pd.Series] = None, + side: Optional[pd.Series] = None, +) -> pd.DataFrame: + """ + Generate events for meta-labeling using the triple-barrier method. + + This function sets up and executes the triple-barrier search in parallel. + + Reference: + Snippet 3.6, Page 51. + + Parameters + ---------- + close : pd.Series + Time series of close prices. + time_events : pd.DatetimeIndex + Timestamps of the events (e.g., from CUSUM filter). + ptsl : List[float] + [profit_taking_multiplier, stop_loss_multiplier]. + target : pd.Series + Series of volatility (or 'Base Width') for setting barrier width. + return_min : float + The minimum target volatility required to run a search. + num_threads : int + Number of parallel threads to use. + vertical_barrier_times : pd.Series, optional + A Series of vertical barrier timestamps. If None, barriers are not used. + side : pd.Series, optional + A Series of position sides (1 or -1). If None, assumes long-only. + + Returns + ------- + pd.DataFrame + The events DataFrame with columns: + - 'End Time': The timestamp of the *first* barrier touch. + - 'Base Width': The volatility used for the barriers. + - 'Side': (Optional) The side of the bet. + """ + # 1. Filter events by volatility + target = target.reindex(time_events) + target = target[target > return_min] + if target.empty: + return pd.DataFrame(columns=["End Time", "Base Width", "Side"]) + + # 2. Set up vertical barrier + if vertical_barrier_times is None: + vertical_barrier_times = pd.Series(pd.NaT, index=target.index) + else: + vertical_barrier_times = vertical_barrier_times.reindex(target.index) + + # 3. Set up sides + if side is None: + side_series = pd.Series(1.0, index=target.index) + ptsl_final = [ptsl[0], ptsl[0]] # Symmetric barriers + else: + side_series = side.reindex(target.index) + ptsl_final = ptsl[:2] # Asymmetric barriers + + # 4. Create base events DataFrame + events = pd.concat( + { + "End Time": vertical_barrier_times, + "Base Width": target, + "Side": side_series, + }, + axis=1, + ).dropna(subset=["Base Width"]) + + # 5. Run in parallel + molecule_subsets = np.array_split(events.index, num_threads) + + with ProcessPoolExecutor(max_workers=num_threads) as executor: + results = list(executor.map( + triple_barrier, + [close] * num_threads, + [events] * num_threads, + [ptsl_final] * num_threads, + molecule_subsets + )) + + # Combine results and update End Time + first_touch_times = pd.concat(results, axis=0)["End Time"] + events["End Time"] = first_touch_times.reindex(events.index) + + if side is None: + events = events.drop("Side", axis=1) + + return events + + +def meta_labeling( + events: pd.DataFrame, close: pd.Series +) -> pd.DataFrame: + """ + Calculate returns and assign binary labels for meta-labeling. + + Reference: + Snippet 3.7, Page 52. + + Parameters + ---------- + events : pd.DataFrame + The events DataFrame from `meta_events`. + close : pd.Series + Time series of close prices. + + Returns + ------- + pd.DataFrame + DataFrame with columns: + - 'End Time': Time of first barrier touch. + - 'Return': The return of the trade. + - 'Label': The binary label (0 or 1 if 'Side' is present, -1 or 1 if not). + - 'Side': (Optional) The side of the bet. + """ + events_filtered = events.dropna(subset=["End Time"]) + + # Get all unique timestamps + all_dates = events_filtered.index.union( + events_filtered["End Time"].values + ).drop_duplicates() + + # Reindex prices to only the required dates + close_filtered = close.reindex(all_dates, method="bfill") + + out = pd.DataFrame(index=events_filtered.index) + out["End Time"] = events_filtered["End Time"] + + out["Return"] = ( + np.log(close_filtered.loc[events_filtered["End Time"].values].values) + - np.log(close_filtered.loc[events_filtered.index].values) + ) + + if "Side" in events_filtered: + out["Return"] *= events_filtered["Side"] + out["Side"] = events_filtered["Side"] + + # Assign labels + out["Label"] = np.sign(out["Return"]) + + if "Side" in events_filtered: + # Meta-labeling: 1 if profitable, 0 if not + out.loc[out["Return"] <= 0, "Label"] = 0.0 + + return out diff --git a/RiskLabAI/features/feature_importance/clustered_feature_importance_mda.py b/RiskLabAI/features/feature_importance/clustered_feature_importance_mda.py index 5cb9299..83eae73 100644 --- a/RiskLabAI/features/feature_importance/clustered_feature_importance_mda.py +++ b/RiskLabAI/features/feature_importance/clustered_feature_importance_mda.py @@ -2,6 +2,7 @@ Computes Clustered Mean Decrease Accuracy (MDA) feature importance. """ +import logging from typing import Dict, Tuple, List, Any import numpy as np import pandas as pd @@ -10,6 +11,8 @@ from sklearn.metrics import log_loss from .feature_importance_strategy import FeatureImportanceStrategy +logger = logging.getLogger(__name__) + class ClusteredFeatureImportanceMDA(FeatureImportanceStrategy): """ Computes clustered feature importance using MDA. @@ -81,7 +84,7 @@ def compute(self, x: pd.DataFrame, y: pd.Series, **kwargs: Any) -> pd.DataFrame: shuffled_scores = pd.DataFrame(columns=self.clusters.keys(), dtype=float) for i, (train_idx, test_idx) in enumerate(cv_generator.split(X=x)): - print(f"Fold {i} start ...") + logger.debug("Fold %d start ...", i) x_train, y_train, w_train = ( x.iloc[train_idx, :], diff --git a/RiskLabAI/features/feature_importance/feature_importance_mda.py b/RiskLabAI/features/feature_importance/feature_importance_mda.py index 0f839f0..ec003ac 100644 --- a/RiskLabAI/features/feature_importance/feature_importance_mda.py +++ b/RiskLabAI/features/feature_importance/feature_importance_mda.py @@ -2,6 +2,7 @@ Computes Mean Decrease Accuracy (MDA) feature importance. """ +import logging import numpy as np import pandas as pd from sklearn.metrics import log_loss @@ -9,6 +10,8 @@ from typing import List, Optional, Any, Callable from .feature_importance_strategy import FeatureImportanceStrategy +logger = logging.getLogger(__name__) + class FeatureImportanceMDA(FeatureImportanceStrategy): """ Computes feature importance using Mean Decrease Accuracy (MDA). @@ -67,7 +70,7 @@ def compute(self, x: pd.DataFrame, y: pd.Series, **kwargs: Any) -> pd.DataFrame: shuffled_scores = pd.DataFrame(columns=x.columns, dtype=float) for i, (train_idx, test_idx) in enumerate(cv_generator.split(x)): - print(f"Fold {i} start ...") + logger.debug("Fold %d start ...", i) x_train, y_train, w_train = ( x.iloc[train_idx, :], diff --git a/RiskLabAI/hpc/hpc.py b/RiskLabAI/hpc/hpc.py index cc4056f..284c272 100644 --- a/RiskLabAI/hpc/hpc.py +++ b/RiskLabAI/hpc/hpc.py @@ -5,6 +5,7 @@ on pandas objects, using Python's multiprocessing and Joblib. """ +import logging import multiprocessing as mp import pandas as pd import numpy as np @@ -13,6 +14,8 @@ import joblib # <-- Added missing import from typing import List, Dict, Any, Callable, Tuple, Union, Iterable, Optional +logger = logging.getLogger(__name__) + def parallel_run( func: Callable[..., Any], iterable: Iterable[Any], @@ -112,11 +115,9 @@ def report_progress( f"{elapsed_time_min:.2f} minutes. Remaining {remaining_time_min:.2f} minutes." ) - if job_number < total_jobs: - print(message, end='\r') - else: - # Print a newline at the end - print(message) + # Progress is emitted via logging (configure the 'RiskLabAI' logger to see + # it; the library is silent by default). + logger.info(message) def expand_call(kargs: Dict[str, Any]) -> Any: @@ -170,7 +171,7 @@ def process_jobs( # <-- Handle sequential case for debugging if num_threads == 1: - print(f"Running {len(jobs)} jobs sequentially for debugging.") + logger.debug("Running %d jobs sequentially for debugging.", len(jobs)) return process_jobs_sequential(jobs) # <-- Handle -1 for all CPUs diff --git a/RiskLabAI/pde/__init__.py b/RiskLabAI/pde/__init__.py index 105105d..7c29f93 100644 --- a/RiskLabAI/pde/__init__.py +++ b/RiskLabAI/pde/__init__.py @@ -3,8 +3,22 @@ Implements a Deep BSDE (Backward Stochastic Differential Equation) solver for various financial PDEs. + +This sub-package requires PyTorch, which is an optional dependency of +RiskLabAI. The base install stays torch-free: ``import RiskLabAI`` never pulls +in this sub-package (it is loaded lazily). Importing ``RiskLabAI.pde`` without +torch installed raises a clear, actionable error instead of a bare +``ModuleNotFoundError``. """ +try: + import torch as _torch # noqa: F401 (presence check only) +except ImportError as _exc: # pragma: no cover - exercised only without torch + raise ImportError( + "RiskLabAI.pde requires PyTorch, which is an optional dependency. " + "Install it with: pip install 'RiskLabAI[pde]' (or: pip install torch)." + ) from _exc + from .equation import ( Equation, PricingDefaultRisk, diff --git a/RiskLabAI/pde/solver.py b/RiskLabAI/pde/solver.py index 5e39c5e..d13f16a 100644 --- a/RiskLabAI/pde/solver.py +++ b/RiskLabAI/pde/solver.py @@ -3,6 +3,8 @@ Equation) solver classes. """ +import logging + import torch import torch.nn as nn import torch.autograd as autograd @@ -12,6 +14,8 @@ from RiskLabAI.pde.model import * from RiskLabAI.pde.equation import Equation +logger = logging.getLogger(__name__) + def initialize_weights(m: nn.Module) -> None: """ Initializes the weights of a Linear layer. @@ -194,7 +198,7 @@ def solve( model.train() for j in range(num_iterations): - print(f"Iteration {j + 1}/{num_iterations}") + logger.info("Iteration %d/%d", j + 1, num_iterations) dw_train, y_train = self.pde.sample(batch_size) y_train = torch.tensor(y_train, dtype=torch.float32, device=self.device) @@ -228,10 +232,10 @@ def solve( # from the loss function's components. y0_new = torch.mean((payoff - dw_coef) / coef) inits.append(y0_new.item()) - print(f"Loss: {val_loss.item():.4f}, Y_0: {y0_new.item():.4f}") + logger.info("Loss: %.4f, Y_0: %.4f", val_loss.item(), y0_new.item()) else: inits.append(y0.item()) - print(f"Loss: {val_loss.item():.4f}, Y_0: {y0.item():.4f}") + logger.info("Loss: %.4f, Y_0: %.4f", val_loss.item(), y0.item()) losses.append(val_loss.item()) @@ -369,7 +373,7 @@ def solve( t_val = torch.ones((128, 1), device=self.device) for j in range(num_iterations): - print(f"Iteration {j + 1}/{num_iterations}") + logger.info("Iteration %d/%d", j + 1, num_iterations) dw_train, y_train = self.pde.sample(batch_size) y_train = torch.tensor(y_train, dtype=torch.float32, device=self.device) @@ -397,7 +401,7 @@ def solve( y0_mean = torch.mean(y0_val).item() inits.append(y0_mean) - - print(f"Loss: {val_loss.item():.4f}, Y_0: {y0_mean:.4f}") + + logger.info("Loss: %.4f, Y_0: %.4f", val_loss.item(), y0_mean) return losses, inits \ No newline at end of file diff --git a/RiskLabAI/utils/__init__.py b/RiskLabAI/utils/__init__.py index 275a4c6..acd1ea2 100644 --- a/RiskLabAI/utils/__init__.py +++ b/RiskLabAI/utils/__init__.py @@ -36,11 +36,10 @@ def __getattr__(name): return value raise AttributeError(f"module {__name__!r} has no attribute {name!r}") -# --- Alias for Backward Compatibility --- -# 'smoothing_average.py' is a duplicate of 'ewma.py'. -# We import 'ewma' and alias it to 'compute_exponential_weighted_moving_average' -# to maintain compatibility with modules that imported the old name. -# You can safely delete the 'smoothing_average.py' file. +# --- Alias for backward compatibility --- +# The historical `compute_exponential_weighted_moving_average` name now maps to +# the canonical, numba-jitted `ewma` (the former `smoothing_average.py` +# duplicate has been removed). compute_exponential_weighted_moving_average = ewma __all__ = [ diff --git a/RiskLabAI/utils/publication_plots.py b/RiskLabAI/utils/publication_plots.py index c009547..d482fe6 100644 --- a/RiskLabAI/utils/publication_plots.py +++ b/RiskLabAI/utils/publication_plots.py @@ -5,12 +5,16 @@ Provides 6 themes and a configuration-based saving function. """ +import logging + import matplotlib.pyplot as plt import matplotlib.figure as fig # For type hinting import seaborn as sns import os from typing import Optional, Dict, Any +logger = logging.getLogger(__name__) + # [THEMES dictionary remains the same] THEMES: Dict[str, Dict[str, Any]] = { 'light': { @@ -109,8 +113,8 @@ def setup_publication_style( params['savefig.transparent'] = False try: plt.rc('font', family='Times New Roman') - except: - print("Warning: Times New Roman not found. Defaulting to serif.") + except Exception: + logger.warning("Times New Roman not found. Defaulting to serif.") plt.rc('font', family='serif') plt.rcParams.update(params) sns_style = "darkgrid" if base_theme_name == 'dark' else "whitegrid" @@ -120,11 +124,13 @@ def setup_publication_style( _CONFIG['save_plots'] = save_plots _CONFIG['save_dir'] = save_dir - print(f"Matplotlib style updated. Theme: '{theme}', Quality: {quality} DPI.") + logger.info( + "Matplotlib style updated. Theme: '%s', Quality: %s DPI.", theme, quality + ) if save_plots: - print(f"Plot saving enabled. Saving to: '{save_dir}'") + logger.info("Plot saving enabled. Saving to: '%s'", save_dir) else: - print("Plot saving disabled.") + logger.info("Plot saving disabled.") # [apply_plot_style function remains exactly the same] def apply_plot_style( @@ -171,7 +177,7 @@ def finalize_plot( # Save the figure fig.savefig(full_path, bbox_inches='tight') - print(f"Figure saved to: {full_path}") + logger.info("Figure saved to: %s", full_path) # --- 2. Always show the plot --- plt.show() diff --git a/RiskLabAI/utils/smoothing_average.py b/RiskLabAI/utils/smoothing_average.py deleted file mode 100644 index b4bce47..0000000 --- a/RiskLabAI/utils/smoothing_average.py +++ /dev/null @@ -1,42 +0,0 @@ -import numpy as np -from typing import List - - -def compute_exponential_weighted_moving_average( - input_series: np.ndarray, - window_length: int -) -> np.ndarray: - r""" - Compute the exponential weighted moving average (EWMA) of a time series array. - - The EWMA is calculated using the formula: - - .. math:: - EWMA_t = \\frac{x_t + (1 - \\alpha) x_{t-1} + (1 - \\alpha)^2 x_{t-2} + \\ldots}{\\omega_t} - - where: - - .. math:: - \\omega_t = 1 + (1 - \\alpha) + (1 - \\alpha)^2 + \\ldots + (1 - \\alpha)^t, - \\alpha = \\frac{2}{{window\_length + 1}} - - :param input_series: Input time series array. - :type input_series: np.ndarray - :param window_length: Window length for the exponential weighted moving average. - :type window_length: int - :return: An array containing the computed EWMA values. - :rtype: np.ndarray - """ - - num_values = input_series.shape[0] - ewma_output = np.empty(num_values, dtype='float64') - alpha = 2 / float(window_length + 1) - multiplier = 1 - alpha - current_weighted_sum = input_series[0] - ewma_output[0] = current_weighted_sum - - for i in range(1, num_values): - current_weighted_sum = current_weighted_sum * multiplier + input_series[i] - ewma_output[i] = current_weighted_sum / (1 - multiplier ** (i+1)) - - return ewma_output diff --git a/RiskLabAI/utils/utilities_lopez.py b/RiskLabAI/utils/utilities_lopez.py deleted file mode 100644 index eea7116..0000000 --- a/RiskLabAI/utils/utilities_lopez.py +++ /dev/null @@ -1,121 +0,0 @@ -import numpy as np -import pandas as pd -import time -from typing import List, Tuple -from .ewma import ewma # <-- SUGGESTION 1: Import the correct, jitted ewma -from .progress import progress_bar - - -def compute_thresholds( - target_column: np.ndarray, - initial_expected_ticks: int, - initial_bar_size: float -) -> Tuple[List[float], np.ndarray, np.ndarray, List[int], np.ndarray, np.ndarray]: - """ - Groups the target_column DataFrame based on a feature and calculates thresholds. - - This function groups the target_column DataFrame based on a feature - and calculates the thresholds, which can be used in financial machine learning - applications such as dynamic time warping. - - :param target_column: Target column of the DataFrame. - :type target_column: np.ndarray - :param initial_expected_ticks: Initial expected number of ticks. - :type initial_expected_ticks: int - :param initial_bar_size: Initial expected size of each tick. - :type initial_bar_size: float - :return: A tuple containing the time deltas, absolute theta values, thresholds, - times, theta values, and grouping IDs. - :rtype: Tuple[List[float], np.ndarray, np.ndarray, List[int], np.ndarray, np.ndarray] - """ - num_values = target_column.shape[0] - target_column_values = target_column.astype(np.float64) - absolute_thetas = np.zeros(num_values) - thresholds = np.zeros(num_values) - thetas = np.zeros(num_values) - grouping_ids = np.zeros(num_values) - - current_theta = target_column_values[0] - thetas[0] = current_theta - absolute_thetas[0] = np.abs(current_theta) - current_grouping_id = 0 - grouping_ids[0] = current_grouping_id - - time_deltas = [] - times = [] - previous_time = 0 - expected_ticks = initial_expected_ticks - expected_bar_value = initial_bar_size - - start_time = time.time() - - for i in range(1, num_values): - current_theta += target_column_values[i] - thetas[i] = current_theta - absolute_theta = np.abs(current_theta) - absolute_thetas[i] = absolute_theta - - threshold = expected_ticks * expected_bar_value - thresholds[i] = threshold - grouping_ids[i] = current_grouping_id - - if absolute_theta >= threshold: - current_grouping_id += 1 - current_theta = 0 - time_delta = np.float64(i - previous_time) - time_deltas.append(time_delta) - times.append(i) - previous_time = i - - # --- SUGGESTION 1 (Continued) --- - # Call the correct `ewma` function and use the correct `window` parameter. - # Note: The logic of recalculating the EWMA over the full history - # inside the loop is O(N^2) but may be the intended design. - # This change just fixes the function being called. - expected_ticks = ewma( - np.array(time_deltas), window=len(time_deltas) - )[-1] - - expected_bar_value = np.abs( - ewma( - target_column_values[:i], window=initial_expected_ticks - )[-1] - ) - - progress_bar(i, num_values, start_time) - - return time_deltas, absolute_thetas, thresholds, times, thetas, grouping_ids - - -def create_ohlcv_dataframe( - tick_data_grouped: pd.core.groupby.DataFrameGroupBy -) -> pd.DataFrame: - """ - Takes a grouped DataFrame and creates a new one with OHLCV data and other relevant information. - - :param tick_data_grouped: Grouped DataFrame based on some criteria (e.g., time). - :type tick_data_grouped: pd.core.groupby.DataFrameGroupBy - :return: A DataFrame containing OHLCV data and other relevant information. - :rtype: pd.DataFrame - """ - - # --- SUGGESTION 2: Vectorized Approach --- - # This is much faster than using .apply() - - ohlc = tick_data_grouped['price'].ohlc() - volume = tick_data_grouped['size'].sum() - - # Calculate VWAP (Value of Trades) - value = (tick_data_grouped['price'] * tick_data_grouped['size']).sum() - - ohlc['volume'] = volume - ohlc['value_of_trades'] = value / volume # This is the VWAP - ohlc['price_mean'] = tick_data_grouped['price'].mean() - ohlc['tick_count'] = tick_data_grouped['price'].count() - - # Handle potential 0-volume bars to avoid NaN - ohlc['value_of_trades'] = ohlc['value_of_truths'].fillna(0) - - ohlc['price_mean_log_return'] = np.log(ohlc['price_mean']) - np.log(ohlc['price_mean'].shift(1)) - - return ohlc \ No newline at end of file diff --git a/test/core/test_base.py b/test/core/test_base.py new file mode 100644 index 0000000..fdeb38c --- /dev/null +++ b/test/core/test_base.py @@ -0,0 +1,81 @@ +"""Tests for the base interfaces and lazy re-exports in RiskLabAI.core.base.""" + +import pandas as pd +import pytest + +from RiskLabAI.core import base +from RiskLabAI.core.base import ( + BaseBetSizer, + BaseLabeler, + BasePortfolioOptimizer, + Estimator, +) + + +# --------------------------------------------------------------------------- # +# Estimator structural protocol +# --------------------------------------------------------------------------- # +def test_estimator_protocol_is_structural(): + class Model: + def fit(self, X, y=None, **kwargs): + return self + + def predict(self, X): + return X + + class NotAModel: + def fit(self, X, y=None): + return self + + assert isinstance(Model(), Estimator) + assert not isinstance(NotAModel(), Estimator) # missing predict + + +# --------------------------------------------------------------------------- # +# New optional contracts cannot be instantiated until implemented +# --------------------------------------------------------------------------- # +@pytest.mark.parametrize("cls", [BaseLabeler, BaseBetSizer, BasePortfolioOptimizer]) +def test_base_contracts_are_abstract(cls): + with pytest.raises(TypeError): + cls() + + +def test_concrete_labeler_satisfies_contract(): + class ConstantLabeler(BaseLabeler): + def label(self, prices, events=None, **kwargs): + return pd.DataFrame({"label": [1] * len(prices)}, index=prices.index) + + labeler = ConstantLabeler() + out = labeler.label(pd.Series([10.0, 11.0, 12.0])) + assert isinstance(out, pd.DataFrame) + assert list(out["label"]) == [1, 1, 1] + assert isinstance(labeler, BaseLabeler) + + +# --------------------------------------------------------------------------- # +# Lazy re-exports of the canonical interfaces +# --------------------------------------------------------------------------- # +def test_lazy_reexports_resolve_to_real_interfaces(): + from RiskLabAI.data.structures.abstract_bars import AbstractBars + from RiskLabAI.backtest.validation.cross_validator_interface import ( + CrossValidator, + ) + from RiskLabAI.features.feature_importance.feature_importance_strategy import ( + FeatureImportanceStrategy, + ) + + assert base.AbstractBars is AbstractBars + assert base.BarBuilder is AbstractBars + assert base.CrossValidator is CrossValidator + assert base.FeatureImportanceStrategy is FeatureImportanceStrategy + + +def test_base_getattr_rejects_unknown_name(): + with pytest.raises(AttributeError): + _ = base.DoesNotExist + + +def test_base_dir_lists_public_names(): + names = dir(base) + assert "BaseLabeler" in names + assert "CrossValidator" in names diff --git a/test/core/test_builtin_parity.py b/test/core/test_builtin_parity.py new file mode 100644 index 0000000..9faec9d --- /dev/null +++ b/test/core/test_builtin_parity.py @@ -0,0 +1,149 @@ +""" +Parity tests: the built-in registries must stay in sync with the existing +hand-written factories. If someone adds a validator/strategy to a factory but +forgets the registry (or vice versa), these tests fail. +""" + +import pytest + +from RiskLabAI.core import ( + BARS, + BET_SIZERS, + CROSS_VALIDATORS, + FEATURE_IMPORTANCE, + LABELERS, + PORTFOLIO_OPTIMIZERS, + list_components, + get_registry, +) +from RiskLabAI.core.base import CrossValidator + + +# --------------------------------------------------------------------------- # +# Cross-validators +# --------------------------------------------------------------------------- # +def test_cross_validator_registry_matches_factory(): + from RiskLabAI.backtest.validation.cross_validator_factory import ( + CrossValidatorFactory, + ) + + factory_keys = set(CrossValidatorFactory.VALIDATORS) + registry_keys = {k.lower() for k in CROSS_VALIDATORS.available()} + assert registry_keys == factory_keys + + # Same class object behind each key. + for key, cls in CrossValidatorFactory.VALIDATORS.items(): + assert CROSS_VALIDATORS.get(key) is cls + + +def test_cross_validator_create_end_to_end(): + # KFold needs only n_splits; PurgedKFold etc. require a `times` series. + cv = CROSS_VALIDATORS.create("kfold", n_splits=3, filter_unknown_kwargs=True) + assert isinstance(cv, CrossValidator) + + +# --------------------------------------------------------------------------- # +# Feature importance +# --------------------------------------------------------------------------- # +def test_feature_importance_registry_matches_implementations(): + from RiskLabAI.features.feature_importance.feature_importance_mdi import ( + FeatureImportanceMDI, + ) + from RiskLabAI.features.feature_importance.clustered_feature_importance_mdi import ( + ClusteredFeatureImportanceMDI, + ) + from RiskLabAI.features.feature_importance.feature_importance_mda import ( + FeatureImportanceMDA, + ) + from RiskLabAI.features.feature_importance.clustered_feature_importance_mda import ( + ClusteredFeatureImportanceMDA, + ) + from RiskLabAI.features.feature_importance.feature_importance_sfi import ( + FeatureImportanceSFI, + ) + + expected = { + "MDI": FeatureImportanceMDI, + "ClusteredMDI": ClusteredFeatureImportanceMDI, + "MDA": FeatureImportanceMDA, + "ClusteredMDA": ClusteredFeatureImportanceMDA, + "SFI": FeatureImportanceSFI, + } + assert set(FEATURE_IMPORTANCE.available()) == set(expected) + for key, cls in expected.items(): + assert FEATURE_IMPORTANCE.get(key) is cls + + +# --------------------------------------------------------------------------- # +# Bars +# --------------------------------------------------------------------------- # +def test_bars_registry_matches_bar_classes(): + from RiskLabAI.data.structures.standard_bars import StandardBars + from RiskLabAI.data.structures.time_bars import TimeBars + from RiskLabAI.data.structures.imbalance_bars import ( + ExpectedImbalanceBars, + FixedImbalanceBars, + ) + from RiskLabAI.data.structures.run_bars import ( + ExpectedRunBars, + FixedRunBars, + ) + + expected = { + "standard_bars": StandardBars, + "time_bars": TimeBars, + "expected_imbalance_bars": ExpectedImbalanceBars, + "fixed_imbalance_bars": FixedImbalanceBars, + "expected_run_bars": ExpectedRunBars, + "fixed_run_bars": FixedRunBars, + } + assert set(BARS.available()) == set(expected) + for key, cls in expected.items(): + assert BARS.get(key) is cls + + +# --------------------------------------------------------------------------- # +# Catalogue / helpers / extension points +# --------------------------------------------------------------------------- # +def test_list_components_covers_all_families(): + catalogue = list_components() + assert set(catalogue) == { + "bars", + "cross_validators", + "feature_importance", + "labelers", + "bet_sizers", + "portfolio_optimizers", + } + + +def test_free_function_families_start_empty(): + # These are extension points for new class-based models. + assert LABELERS.available() == [] + assert BET_SIZERS.available() == [] + assert PORTFOLIO_OPTIMIZERS.available() == [] + + +def test_get_registry_known_and_unknown(): + assert get_registry("bars") is BARS + with pytest.raises(KeyError): + get_registry("nonexistent_family") + + +def test_new_model_can_register_into_a_family(): + import pandas as pd + + from RiskLabAI.core.base import BaseLabeler + + # Simulate a user/extension paper adding a new labeler. + @LABELERS.register("dummy_const") + class DummyConst(BaseLabeler): + def label(self, prices, events=None, **kwargs): + return pd.DataFrame({"label": 0}, index=prices.index) + + try: + assert "dummy_const" in LABELERS + obj = LABELERS.create("dummy_const") + assert isinstance(obj, BaseLabeler) + finally: + LABELERS.unregister("dummy_const") # keep global registry clean diff --git a/test/core/test_registry.py b/test/core/test_registry.py new file mode 100644 index 0000000..bc66e39 --- /dev/null +++ b/test/core/test_registry.py @@ -0,0 +1,207 @@ +"""Unit tests for the generic component Registry.""" + +import pytest + +from RiskLabAI.core.registry import Registry + + +def make_registry(): + return Registry("test") + + +# --------------------------------------------------------------------------- # +# Registration styles +# --------------------------------------------------------------------------- # +def test_register_as_decorator_with_key(): + reg = make_registry() + + @reg.register("widget") + class Widget: + pass + + assert reg.get("widget") is Widget + assert reg.available() == ["widget"] + + +def test_register_as_bare_decorator_infers_name(): + reg = make_registry() + + @reg.register + class Gadget: + pass + + assert reg.get("Gadget") is Gadget + + +def test_register_direct_call_returns_object(): + reg = make_registry() + + class Thing: + pass + + returned = reg.register("thing", Thing) + assert returned is Thing + assert reg.get("thing") is Thing + + +def test_register_function_factory(): + reg = make_registry() + + @reg.register("builder") + def build(): + return [1, 2, 3] + + assert reg.create("builder") == [1, 2, 3] + + +# --------------------------------------------------------------------------- # +# Lookup behaviour +# --------------------------------------------------------------------------- # +def test_lookup_is_case_insensitive(): + reg = make_registry() + reg.register("Widget", object) + assert reg.get("widget") is object + assert reg.get("WIDGET") is object + assert "wIdGeT" in reg + + +def test_aliases_resolve_to_same_component(): + reg = make_registry() + + class Dog: + pass + + reg.register("dog", Dog, aliases=("doggo", "pup")) + assert reg.get("doggo") is Dog + assert reg.get("pup") is Dog + assert reg.aliases() == {"doggo": "dog", "pup": "dog"} + + +def test_unknown_key_raises_with_available_listed(): + reg = make_registry() + reg.register("alpha", object) + with pytest.raises(KeyError) as excinfo: + reg.get("missing") + message = str(excinfo.value) + assert "missing" in message + assert "'alpha'" in message + + +def test_non_string_key_raises_typeerror(): + reg = make_registry() + with pytest.raises(TypeError): + reg.get(123) + + +# --------------------------------------------------------------------------- # +# Duplicate protection / override +# --------------------------------------------------------------------------- # +def test_duplicate_registration_raises(): + reg = make_registry() + reg.register("x", object) + with pytest.raises(KeyError): + reg.register("x", dict) + + +def test_override_replaces_and_clears_old_aliases(): + reg = make_registry() + reg.register("x", object, aliases=("ex",)) + reg.register("x", dict, override=True) + assert reg.get("x") is dict + # Old alias must no longer resolve. + with pytest.raises(KeyError): + reg.get("ex") + + +# --------------------------------------------------------------------------- # +# create() and kwarg filtering +# --------------------------------------------------------------------------- # +def test_create_forwards_args_and_kwargs(): + reg = make_registry() + + @reg.register("point") + class Point: + def __init__(self, x, y=0): + self.x, self.y = x, y + + p = reg.create("point", 1, y=2) + assert (p.x, p.y) == (1, 2) + + +def test_create_filter_unknown_kwargs_drops_extras(): + reg = make_registry() + + @reg.register("strict") + class Strict: + def __init__(self, a): + self.a = a + + # Without filtering, an unexpected kwarg is an error (typos surface). + with pytest.raises(TypeError): + reg.create("strict", a=1, bogus=2) + + # With filtering, the unknown kwarg is dropped (factory-style behaviour). + obj = reg.create("strict", a=1, bogus=2, filter_unknown_kwargs=True) + assert obj.a == 1 + + +def test_create_filter_keeps_all_when_var_keyword(): + reg = make_registry() + + @reg.register("flexible") + class Flexible: + def __init__(self, **kwargs): + self.kwargs = kwargs + + obj = reg.create("flexible", a=1, b=2, filter_unknown_kwargs=True) + assert obj.kwargs == {"a": 1, "b": 2} + + +# --------------------------------------------------------------------------- # +# Lazy registration +# --------------------------------------------------------------------------- # +def test_register_lazy_defers_import_then_resolves(): + reg = make_registry() + reg.register_lazy("ordered", "collections:OrderedDict") + assert reg.is_lazy("ordered") is True + + from collections import OrderedDict + + assert reg.get("ordered") is OrderedDict + # After resolution the entry is no longer lazy. + assert reg.is_lazy("ordered") is False + + +def test_register_lazy_bad_target_raises_on_use(): + reg = make_registry() + reg.register_lazy("bad", "collections-no-colon") + with pytest.raises(ValueError): + reg.get("bad") + + +# --------------------------------------------------------------------------- # +# Introspection / mapping protocol / metadata / unregister +# --------------------------------------------------------------------------- # +def test_mapping_protocol_and_sorting(): + reg = make_registry() + reg.register("b", object) + reg.register("a", dict) + assert reg.available() == ["a", "b"] + assert list(reg) == ["a", "b"] + assert len(reg) == 2 + assert reg["a"] is dict + + +def test_metadata_round_trips(): + reg = make_registry() + reg.register("m", object, metadata={"family": "demo", "page": 42}) + assert reg.metadata("m") == {"family": "demo", "page": 42} + + +def test_unregister_removes_key_and_aliases(): + reg = make_registry() + reg.register("z", object, aliases=("zee",)) + reg.unregister("z") + assert "z" not in reg + assert "zee" not in reg + assert reg.available() == [] diff --git a/test/test_consolidation.py b/test/test_consolidation.py new file mode 100644 index 0000000..c06140e --- /dev/null +++ b/test/test_consolidation.py @@ -0,0 +1,85 @@ +""" +Tests for the duplication-consolidation pass (non-breaking). + +Verifies that (a) the parallel-processing helpers historically exposed from +``RiskLabAI.data.labeling`` still import and are now the single-source +implementations from ``RiskLabAI.hpc``; and (b) the clustering covariance-> +correlation converter delegates to the canonical ``denoise.cov_to_corr`` and +produces identical output for valid covariance matrices. +""" + +import numpy as np + + +# --------------------------------------------------------------------------- # +# labeling helpers are now single-sourced from RiskLabAI.hpc +# --------------------------------------------------------------------------- # +def test_labeling_helpers_are_reexported_from_hpc(): + from RiskLabAI.data import labeling + from RiskLabAI import hpc + + # Public names still import (backward compatibility). + assert labeling.process_jobs is hpc.process_jobs + assert labeling.expand_call is hpc.expand_call + assert labeling.report_progress is hpc.report_progress + # `lin_parts` was the historical name for hpc's `linear_partitions`. + assert labeling.lin_parts is hpc.linear_partitions + + +def test_labeling_package_exports_still_resolve(): + # The names listed in RiskLabAI.data.labeling.__all__ must all be importable. + from RiskLabAI.data import labeling + + for name in ("lin_parts", "process_jobs", "expand_call", "report_progress"): + assert hasattr(labeling, name), name + + +def test_lin_parts_matches_linear_partitions_numerically(): + from RiskLabAI.data.labeling import lin_parts + + # Same partition boundaries for representative inputs. + np.testing.assert_array_equal(lin_parts(100, 4), np.array([0, 25, 50, 75, 100])) + np.testing.assert_array_equal(lin_parts(10, 3), np.ceil( + np.linspace(0, 10, min(3, 10) + 1)).astype(int)) + + +# --------------------------------------------------------------------------- # +# cov -> corr is single-sourced (clustering delegates to denoise.cov_to_corr) +# --------------------------------------------------------------------------- # +def _reference_cov_to_corr(cov): + std = np.sqrt(np.diag(cov)) + std[std == 0] = 1.0 + corr = cov / np.outer(std, std) + corr[corr < -1] = -1.0 + corr[corr > 1] = 1.0 + np.fill_diagonal(corr, 1.0) + return corr + + +def test_clustering_cov_to_corr_delegates_and_matches(): + from RiskLabAI.cluster.clustering import covariance_to_correlation + from RiskLabAI.data.denoise.denoising import cov_to_corr + + rng = np.random.default_rng(7) + for _ in range(50): + n = int(rng.integers(2, 10)) + a = rng.standard_normal((n, n)) + cov = a @ a.T + np.diag(rng.uniform(0.01, 1.0, n)) # valid PSD, +diag + + out = covariance_to_correlation(cov) + # Delegates to the canonical implementation. + np.testing.assert_allclose(out, cov_to_corr(cov), rtol=0, atol=1e-12) + # And matches an independent reference. + np.testing.assert_allclose(out, _reference_cov_to_corr(cov), + rtol=0, atol=1e-12) + # Correlation diagonal is exactly 1. + np.testing.assert_allclose(np.diag(out), np.ones(n), rtol=0, atol=1e-12) + + +def test_clustering_cov_to_corr_does_not_mutate_input(): + from RiskLabAI.cluster.clustering import covariance_to_correlation + + cov = np.array([[4.0, 2.0], [2.0, 9.0]]) + cov_copy = cov.copy() + _ = covariance_to_correlation(cov) + np.testing.assert_array_equal(cov, cov_copy) diff --git a/test/test_performance.py b/test/test_performance.py new file mode 100644 index 0000000..0612aee --- /dev/null +++ b/test/test_performance.py @@ -0,0 +1,163 @@ +""" +Correctness tests for the vectorized hot-path rewrites (Phase 4, performance). + +Each vectorized implementation is checked against an independent brute-force +reference that encodes the plain mathematical definition. This guarantees the +optimized versions return the same values as the straightforward loops they +replaced, across ordinary and edge cases (leading NaNs, NaT vertical barriers, +both trade sides, and no-touch paths). +""" + +import numpy as np +import pandas as pd + +from RiskLabAI.data.differentiation.differentiation import ( + fractional_difference_std, + calculate_weights_std, +) +from RiskLabAI.backtest.bet_sizing import mpAvgActiveSignals +from RiskLabAI.data.labeling.labeling import triple_barrier + + +# --------------------------------------------------------------------------- # +# fractional_difference_std (expanding-window weighted sum -> convolution) +# --------------------------------------------------------------------------- # +def _frac_diff_std_reference(series, degree, threshold=0.01): + weights = calculate_weights_std(degree, series.shape[0]) + weights_cumsum = np.cumsum(np.abs(weights)) + weights_cumsum /= weights_cumsum[-1] + skip = np.searchsorted(weights_cumsum, threshold) + result = pd.DataFrame(index=series.index, columns=series.columns, dtype=float) + for name in series.columns: + s = series[[name]].ffill().dropna() + if s.empty or s.shape[0] < skip: + continue + arr = s.to_numpy() + for iloc in range(skip, arr.shape[0]): + result.loc[s.index[iloc], name] = np.dot( + weights[-(iloc + 1):].T, arr[:iloc + 1] + )[0, 0] + return result.dropna(how="all") + + +def test_fractional_difference_std_matches_reference(): + rng = np.random.default_rng(1) + idx = pd.date_range("2020-01-01", periods=600, freq="min") + values = np.cumsum(rng.standard_normal(600)) + 100 + series = pd.DataFrame( + {"close": values, "other": values * 0.5 + rng.standard_normal(600)}, + index=idx, + ) + series.iloc[:5, 0] = np.nan # leading NaNs are dropped by ffill().dropna() + + fast = fractional_difference_std(series, 0.4) + reference = _frac_diff_std_reference(series, 0.4) + + assert fast.shape == reference.shape + assert fast.index.equals(reference.index) + common = fast.index.intersection(reference.index) + assert np.allclose( + fast.loc[common].to_numpy(), + reference.loc[common].to_numpy(), + atol=1e-9, + equal_nan=True, + ) + + +# --------------------------------------------------------------------------- # +# mpAvgActiveSignals (interval stabbing -> prefix sums + searchsorted) +# --------------------------------------------------------------------------- # +def _avg_active_reference(signals, molecule): + out = pd.Series(dtype=float) + for loc in molecule: + active = (signals.index.values <= loc) & ( + (loc < signals["t1"]) | pd.isnull(signals["t1"]) + ) + idx = signals[active].index + out[loc] = signals.loc[idx, "signal"].mean() if len(idx) > 0 else 0.0 + return out + + +def test_mp_avg_active_signals_matches_reference(): + rng = np.random.default_rng(7) + n = 500 + starts = pd.to_datetime("2020-01-01") + pd.to_timedelta( + np.sort(rng.integers(0, 50_000, n)), "s" + ) + durations = pd.to_timedelta(rng.integers(1, 4000, n), "s") + t1 = pd.Series(starts + durations) + t1[rng.random(n) < 0.1] = pd.NaT # some open-ended signals + signals = pd.DataFrame( + {"t1": t1.values, "signal": rng.standard_normal(n)}, index=starts + ) + signals = signals[~signals.index.duplicated(keep="first")] + + time_points = sorted( + set(signals["t1"].dropna().values).union(signals.index.values) + ) + + fast = mpAvgActiveSignals(signals, time_points) + reference = _avg_active_reference(signals, time_points) + + assert np.allclose(fast.values, reference.values, atol=1e-12, equal_nan=True) + + +def test_mp_avg_active_signals_empty_molecule(): + signals = pd.DataFrame( + {"t1": [pd.Timestamp("2020-01-02")], "signal": [1.0]}, + index=[pd.Timestamp("2020-01-01")], + ) + assert len(mpAvgActiveSignals(signals, [])) == 0 + + +# --------------------------------------------------------------------------- # +# triple_barrier (per-event pandas slicing -> positional numpy indexing) +# --------------------------------------------------------------------------- # +def _triple_barrier_reference(close, events, ptsl, molecule): + ef = events.loc[molecule] + output = pd.DataFrame(index=ef.index) + output["End Time"] = ef["End Time"] + pt = ptsl[0] * ef["Base Width"] if ptsl[0] > 0 else pd.Series(np.inf, index=ef.index) + sl = -ptsl[1] * ef["Base Width"] if ptsl[1] > 0 else pd.Series(-np.inf, index=ef.index) + side = ef.get("Side", pd.Series(1.0, index=ef.index)) + for loc, vbt in ef["End Time"].fillna(close.index[-1]).items(): + path = close.loc[loc:vbt] + returns = np.log(path / close[loc]) * side.at[loc] + output.loc[loc, "stop_loss"] = returns[returns < sl.at[loc]].index.min() + output.loc[loc, "profit_taking"] = returns[returns > pt.at[loc]].index.min() + output["End Time"] = output.min(axis=1) + return output.drop(columns=["stop_loss", "profit_taking"]) + + +def test_triple_barrier_matches_reference_randomized(): + rng = np.random.default_rng(11) + for _ in range(25): + n = int(rng.integers(60, 300)) + idx = pd.date_range("2020-01-01", periods=n, freq="min") + close = pd.Series(np.cumprod(1 + rng.standard_normal(n) * 0.01) * 100, index=idx) + + positions = np.sort( + rng.choice(n - 2, size=int(rng.integers(3, 25)), replace=False) + ) + vertical = [] + for p in positions: + if rng.random() < 0.25: + vertical.append(pd.NaT) # open vertical barrier + else: + vertical.append(idx[min(p + int(rng.integers(1, 30)), n - 1)]) + events = pd.DataFrame( + { + "End Time": pd.to_datetime(vertical), + "Base Width": rng.uniform(0.005, 0.05, size=len(positions)), + "Side": rng.choice([1.0, -1.0], size=len(positions)), + }, + index=idx[positions], + ) + ptsl = [float(rng.uniform(0.5, 2.0)), float(rng.uniform(0.5, 2.0))] + molecule = list(events.index) + + fast = triple_barrier(close, events, ptsl, molecule) + reference = _triple_barrier_reference(close, events, ptsl, molecule) + pd.testing.assert_series_equal( + fast["End Time"], reference["End Time"], check_names=False + ) From 18fcde4b31b53bc628df31105bb50d949ef3b3cf Mon Sep 17 00:00:00 2001 From: Hamid Arian Date: Tue, 16 Jun 2026 05:09:24 -0400 Subject: [PATCH 2/2] test: don't pin datetime resolution in triple_barrier parity check (pandas 3 ns vs us) --- test/test_performance.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/test/test_performance.py b/test/test_performance.py index 0612aee..c6bc5d7 100644 --- a/test/test_performance.py +++ b/test/test_performance.py @@ -158,6 +158,12 @@ def test_triple_barrier_matches_reference_randomized(): fast = triple_barrier(close, events, ptsl, molecule) reference = _triple_barrier_reference(close, events, ptsl, molecule) + # check_dtype=False: the timestamps are identical, but pandas >= 3 can + # land the two construction paths on different datetime *resolutions* + # (ns vs us). We assert the values match, not the resolution unit. pd.testing.assert_series_equal( - fast["End Time"], reference["End Time"], check_names=False + fast["End Time"], + reference["End Time"], + check_names=False, + check_dtype=False, )