"""
Core quantitative engine for backtesting, training, tuning, and inference of trading strategies.

This module provides a comprehensive suite of tools built on top of the `pybroker`
backtesting library. It is designed to be run from the command line using `click`
for various tasks:

- `train`: Runs a walk-forward analysis for a given strategy and ticker, trains a
  machine learning model on the final data fold, and saves all artifacts (model,
  results, parameters).
- `tune-strategy`: Performs Bayesian optimization on strategy-level parameters
  (e.g., indicator periods, risk levels) to find the optimal combination based on
  a defined objective function.
- `visualize-model`: Loads and displays the out-of-sample performance metrics and
  charts from a completed `train` run.
- `predict`: Loads a trained model and runs inference on the latest market data
  to generate a trading decision (BUY/HOLD/SELL).
- `scan`: Scans a universe of tickers for active trading signals based on one or
  more strategies.
- `batch-train`: Automates the tuning and training process across a universe of
  tickers and strategies, logging results to a CSV.

The engine is designed to be extensible, with strategies encapsulated in their own
modules within the `pybroker_trainer` directory.
"""
import csv
import logging
from typing import Dict, Optional
import pandas as pd
import numpy as np
import pybroker
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import StratifiedKFold
from sklearn.inspection import permutation_importance
from decimal import Decimal
from skopt import gp_minimize
from skopt.space import Real, Integer
from pybroker import ExecContext, FeeMode, PositionMode, Strategy, StrategyConfig, TestResult
from pybroker.strategy import WalkforwardWindow
from lightgbm import LGBMClassifier
import os
import json
import pickle
import click
from datetime import datetime, timedelta
from dataclasses import asdict, replace
import matplotlib.pyplot as plt
import traceback
from core.db import get_db
from core.model import Company, PriceHistory
from tools.file_wrapper import convert_to_json_serializable
from tools.yfinance_tool import get_earnings_dates, load_ticker_data
from pybroker_trainer.strategy_loader import load_strategy_class, get_strategy_defaults, get_strategy_tuning_space, STRATEGY_CLASS_MAP
from pybroker_trainer.config_loader import load_strategy_config
from load_cfg import WORKING_DIRECTORY
from core.logging_config import setup_logging
# --- Logging Configuration ---
setup_logging('quant_engine.log')
logger = logging.getLogger(__name__)
class PassThroughModel:
    """
    A dummy model for strategies without a trained classifier.

    For binary problems it always predicts probability 1.0 for the positive class;
    for multiclass problems it returns a uniform (neutral) distribution.
    """

    def __init__(self, n_classes=2):
        self.n_classes_ = n_classes
        self.objective = 'binary' if n_classes == 2 else 'multiclass'

    def predict_proba(self, X):
        # For binary, return [0.0, 1.0] to always pass the probability threshold.
        # For multiclass, return neutral probability as a safe default.
        if self.n_classes_ == 2:
            # np.tile keeps the (n_rows, 2) shape even when X has zero rows.
            return np.tile([0.0, 1.0], (len(X), 1))
        else:
            return np.full((len(X), self.n_classes_), 1.0 / self.n_classes_)

    @property
    def feature_name_(self):
        return []  # No features needed
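class NumpyEncoder(json.JSONEncoder):
    """JSON encoder that converts NumPy scalars and arrays to native Python types."""

    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        elif isinstance(obj, np.floating):
            return float(obj)
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        # Anything else falls through to the base class, which raises TypeError.
        return super().default(obj)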
def _load_price_data(ticker: str, start_date: Optional[str] = None, end_date: Optional[str] = None) -> pd.DataFrame:
    """
    Loads historical price data for the ticker from the database.

    Returns an empty DataFrame if the ticker is unknown, no rows fall in the
    requested date range, an essential column is missing, or the query fails.
    """
    db_session = next(get_db())
    try:
        company = db_session.query(Company).filter(Company.symbol == ticker).first()
        if not company:
            logger.error(f"Company with ticker {ticker} not found in database.")
            return pd.DataFrame()
        query = db_session.query(PriceHistory).filter(PriceHistory.company_id == company.id)
        if start_date:
            query = query.filter(PriceHistory.date >= start_date)
        if end_date:
            query = query.filter(PriceHistory.date <= end_date)
        query = query.order_by(PriceHistory.date.asc())
        price_data = pd.read_sql(query.statement, db_session.bind)
        if price_data.empty:
            logger.warning(f"No price data found for {ticker} between {start_date} and {end_date}.")
            return pd.DataFrame()
        # Ensure essential columns are present
        for col in ['open', 'high', 'low', 'close', 'adjclose', 'volume']:
            if col not in price_data.columns:
                logger.error(f"Essential column '{col}' missing from price data for {ticker}.")
                return pd.DataFrame()
        # Use a timezone-naive DatetimeIndex so dates compare cleanly downstream.
        price_data['date'] = pd.to_datetime(price_data['date'])
        if price_data['date'].dt.tz is not None:
            price_data['date'] = price_data['date'].dt.tz_localize(None)
        price_data.set_index('date', inplace=True)
        logger.info(f"Loaded {len(price_data)} data points for {ticker}.")
        return price_data
    except Exception as e:
        logger.error(f"Error loading price data for {ticker}: {e}")
        return pd.DataFrame()
    finally:
        if db_session and db_session.is_active:
            db_session.close()
def _prepare_base_data(ticker: str, start_date: str, end_date: str, strategy_params: dict) -> pd.DataFrame:
    """
    Performs the initial, static data loading and feature calculation.
    This includes loading price data, calculating seasonality, and fetching earnings.
    This part of the process does not depend on tunable strategy parameters.
    """
    logger.info(f"Preparing base data for {ticker} from {start_date} to {end_date}...")
    data_df = _load_price_data(ticker, start_date, end_date)
    if data_df.empty:
        logger.error(f"No data found for {ticker} between {start_date} and {end_date}. Returning empty DataFrame.")
        return pd.DataFrame()

    # Calculate Buy-and-Hold Performance for the period
    # (data_df is guaranteed non-empty here because of the early return above).
    buy_and_hold_return_pct = 0.0
    first_price = data_df['adjclose'].iloc[0]
    last_price = data_df['adjclose'].iloc[-1]
    if first_price != 0:
        buy_and_hold_return_pct = ((last_price - first_price) / first_price) * 100
    logger.info(f"Buy-and-Hold Performance for {ticker} between {data_df.index[0].date()} and {data_df.index[-1].date()}: {buy_and_hold_return_pct:.2f}%")

    # --- Add Seasonality Features ---
    data_df['month'] = data_df.index.month
    month_dummies = pd.get_dummies(data_df['month'], prefix='month', dtype=int)
    # Guarantee all 12 month columns exist even if the period does not cover every month.
    for m in range(1, 13):
        if f'month_{m}' not in month_dummies.columns:
            month_dummies[f'month_{m}'] = 0
    data_df = pd.concat([data_df, month_dummies], axis=1)
    data_df.drop('month', axis=1, inplace=True)

    # --- Add Turn-of-Month Seasonality Feature ---
    data_df['day_of_month'] = data_df.index.day
    data_df['is_turn_of_month'] = ((data_df['day_of_month'] >= 28) | (data_df['day_of_month'] <= 4)).astype(int)

    include_earnings_dates = strategy_params.get('include_earnings_dates', False)
    if include_earnings_dates:
        try:
            earnings_dates = get_earnings_dates(ticker)
            if isinstance(earnings_dates, dict) and "error" in earnings_dates:
                logger.warning(f"Could not fetch earnings dates for {ticker}. Error: {earnings_dates['error']}. Skipping pre-earnings features.")
            elif earnings_dates is not None and not earnings_dates.empty:
                earnings_dates = earnings_dates.index.tz_localize(None).normalize()
                # For every bar, find the next earnings date strictly after it.
                earnings_series = pd.Series(pd.NaT, index=data_df.index)
                for date in data_df.index:
                    future_earnings = earnings_dates[earnings_dates > date]
                    if not future_earnings.empty:
                        earnings_series.loc[date] = future_earnings.min()
                data_df['days_to_earnings'] = (earnings_series - data_df.index).dt.days
                pre_earnings_window = strategy_params.get('pre_earnings_window', 21)
                data_df['is_pre_earnings_window'] = ((data_df['days_to_earnings'] >= 0) & (data_df['days_to_earnings'] <= pre_earnings_window)).astype(int)
            else:
                data_df['days_to_earnings'], data_df['is_pre_earnings_window'] = -1, 0
        except Exception as e:
            logger.warning(f"Could not fetch earnings dates for {ticker}. Error: {e}. Skipping pre-earnings features.")
            data_df['days_to_earnings'], data_df['is_pre_earnings_window'] = -1, 0

    # --- Reformat DataFrame for PyBroker ---
    # PyBroker's Strategy class expects lowercase column names and 'date'/'symbol' as columns.
    df_formatted = data_df.reset_index()  # Move 'date' from the index to a column
    df_formatted['symbol'] = ticker

    # --- Force pybroker to use adjusted prices for backtesting and features ---
    # This aligns the backtest environment with the training target calculation by
    # replacing the raw OHLC columns with their split/dividend-adjusted values.
    adjustment_factor = df_formatted['adjclose'] / df_formatted['close']
    df_formatted['open'] = df_formatted['open'] * adjustment_factor
    df_formatted['high'] = df_formatted['high'] * adjustment_factor
    df_formatted['low'] = df_formatted['low'] * adjustment_factor
    df_formatted['close'] = df_formatted['adjclose']
    return df_formatted
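# Illustrative sketch, not part of the engine: the OHLC adjustment performed at the
# end of _prepare_base_data, applied to a single made-up bar. The prices are
# hypothetical and assume the adjusted close is half the raw close (as it would be
# after a later 2-for-1 split); `bar` and `adjusted` are names chosen for this example.

```python
# One hypothetical daily bar with raw prices and an adjusted close.
bar = {'open': 98.0, 'high': 102.0, 'low': 97.0, 'close': 100.0, 'adjclose': 50.0}

# Same ratio the engine computes per row: adjusted close over raw close.
adjustment_factor = bar['adjclose'] / bar['close']

# Scale open/high/low by the ratio and replace close with the adjusted close.
adjusted = {key: bar[key] * adjustment_factor for key in ('open', 'high', 'low')}
adjusted['close'] = bar['adjclose']

print(adjusted)
```

# Scaling all four prices by the same ratio keeps the bar's shape intact
# (high >= open, close >= low) while moving it onto the adjusted price scale.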
def check_noise_impact(model, X_test, y_test, feature_names):
    """
    Performs a stress test on the model to check for data leakage and overfitting.

    1. Permutation Importance: Checks which features actually contribute to performance.
    2. Noise Test: Replaces data with random noise to ensure performance drops.

    Returns a dictionary with test results.
    """
    if X_test.empty or y_test.empty:
        return {}
    logger.info("--- Running Noise/Permutation Test on Out-of-Sample Data ---")
    test_results = {}
    try:
        # 1. Measure Baseline Performance
        baseline_score = model.score(X_test, y_test)
        logger.info(f"Baseline Accuracy on Test Data: {baseline_score:.4f}")
        test_results['baseline_score'] = baseline_score

        # 2. Run Permutation Importance
        # n_repeats=5 is sufficient for a quick diagnostic
        result = permutation_importance(
            model, X_test, y_test,
            n_repeats=5,
            random_state=42,
            n_jobs=-1
        )

        # 3. Analyze Results
        sorted_idx = result.importances_mean.argsort()[::-1]
        logger.info("Top 5 Features by Permutation Importance (Drop in Accuracy):")
        top_features = []
        for i in sorted_idx[:5]:
            feature_name = feature_names[i]
            importance = result.importances_mean[i]
            logger.info(f" {feature_name}: {importance:.4f}")
            top_features.append({'feature': feature_name, 'importance': importance})
        test_results['permutation_importances'] = top_features

        # 4. The "Total Noise" Test
        # Create a completely random dataset of the same shape
        X_noise = pd.DataFrame(
            np.random.rand(*X_test.shape),
            columns=X_test.columns,
            index=X_test.index
        )
        noise_score = model.score(X_noise, y_test)
        logger.info(f"Score on Pure Noise Data: {noise_score:.4f}")
        test_results['noise_score'] = noise_score

        # Heuristic check
        if noise_score >= baseline_score * 0.98 and baseline_score > 0.5:
            msg = "CRITICAL WARNING: Model performs well on random noise. Check for Data Leakage!"
            logger.warning(msg)
            test_results['status'] = 'FAIL'
            test_results['message'] = msg
        elif noise_score < baseline_score:
            msg = "Test Passed: Model performance collapsed on noise (as expected)."
            logger.info(msg)
            test_results['status'] = 'PASS'
            test_results['message'] = msg
        else:
            test_results['status'] = 'INCONCLUSIVE'
            if noise_score > baseline_score:
                test_results['message'] = "Model performed worse than random noise. No signal found in this fold."
            else:
                test_results['message'] = "Baseline is too low (<= 0.5) to verify leakage, but no signal is present."
        return test_results
    except Exception as e:
        if "not fitted" in str(e).lower():
            return {'status': 'SKIPPED', 'message': 'Model not fitted (insufficient training data).', 'baseline_score': 0, 'noise_score': 0}
        logger.warning(f"Noise impact check failed: {e}")
        return {}
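# Illustrative sketch, not part of the engine: the status heuristic used in
# check_noise_impact, isolated as a pure function so its three outcomes are easy to
# see. The function name `classify_noise_test` and the scores below are hypothetical.

```python
def classify_noise_test(baseline_score: float, noise_score: float) -> str:
    """Mirror the FAIL / PASS / INCONCLUSIVE branching of the noise heuristic."""
    # Scoring almost as well on pure noise as on real data suggests leakage,
    # but only when the baseline itself is better than a coin flip.
    if noise_score >= baseline_score * 0.98 and baseline_score > 0.5:
        return 'FAIL'
    # Expected case: accuracy drops once the features are replaced by noise.
    if noise_score < baseline_score:
        return 'PASS'
    # Remaining case: the baseline is at or below 0.5 and noise does no worse.
    return 'INCONCLUSIVE'


print(classify_noise_test(0.62, 0.50))  # clear drop on noise
print(classify_noise_test(0.62, 0.61))  # within 2% of the baseline
print(classify_noise_test(0.45, 0.50))  # baseline below 0.5
```

# Note that the FAIL branch is checked first, so a noise score inside the 2% band
# fails even though it is technically lower than the baseline.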
def _calculate_drawdown(series: pd.Series) -> pd.Series:
    """Calculates the drawdown for a given time series of values."""
    # Calculate the running maximum
    cumulative_max = series.cummax()
    # Calculate drawdown as the percentage drop from the running maximum
    drawdown = (series - cumulative_max) / cumulative_max
    return drawdown * 100  # Return as a percentage
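# Illustrative sketch, not part of the engine: the same drawdown arithmetic as
# _calculate_drawdown, written with plain lists so it can be checked by hand. The
# helper name `drawdown_pct` and the equity values are made up for this example.

```python
from itertools import accumulate


def drawdown_pct(values):
    """Percentage drop of each value from the highest value seen so far."""
    running_max = list(accumulate(values, max))  # list equivalent of Series.cummax()
    return [(v - m) / m * 100 for v, m in zip(values, running_max)]


equity = [100.0, 120.0, 90.0, 120.0, 150.0]
dd = drawdown_pct(equity)
max_drawdown = min(dd)  # the deepest point of the underwater curve

print(dd)            # [0.0, 0.0, -25.0, 0.0, 0.0]
print(max_drawdown)  # -25.0
```

# Drawdown is zero whenever equity sits at a new high and negative otherwise; the
# dip from 120 to 90 is (90 - 120) / 120 = -25%.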
def plot_performance_vs_benchmark(result: TestResult, title: str, ticker: Optional[str] = None) -> Optional[plt.Figure]:
    """
    Generates a plot to analyze strategy performance.

    If a benchmark ticker is provided, it generates a three-panel plot comparing
    the strategy to the benchmark (equity, relative performance, and drawdown).
    If no benchmark ticker is provided (e.g., for portfolio backtests), it plots
    a simple equity curve of the strategy.

    Returns the matplotlib Figure object.
    """
    if not hasattr(result, 'portfolio') or result.portfolio.empty:
        logger.warning("No portfolio data found in results to plot.")
        return None
    portfolio_df = result.portfolio
    if 'market_value' not in portfolio_df.columns:
        logger.warning("Portfolio DataFrame is missing 'market_value' column.")
        return None

    # --- Prepare Benchmark Data ---
    benchmark_ticker = ticker
    normalized_benchmark = None
    if benchmark_ticker:
        start_date = result.start_date.strftime('%Y-%m-%d')
        end_date = result.end_date.strftime('%Y-%m-%d')
        logger.info(f"Loading benchmark data for {benchmark_ticker}...")
        data_dict = load_ticker_data(benchmark_ticker, start_date, end_date)
        if data_dict and 'shareprices' in data_dict and not data_dict['shareprices'].empty:
            price_data = data_dict['shareprices']
            initial_capital = portfolio_df['market_value'].iloc[0]
            benchmark_series = price_data['Adj Close']
            portfolio_dates = portfolio_df.index
            benchmark_series = benchmark_series.reindex(portfolio_dates, method='ffill').dropna()
            if not benchmark_series.empty:
                # Rescale the benchmark so it starts at the strategy's initial capital.
                normalized_benchmark = (benchmark_series / benchmark_series.iloc[0]) * initial_capital
            else:
                logger.warning(f"Benchmark data for {benchmark_ticker} could not be aligned with portfolio dates.")
        else:
            logger.warning(f"Could not load benchmark data for {benchmark_ticker}.")

    # --- Create Plots ---
    # If no benchmark is available, plot a simple equity curve.
    if normalized_benchmark is None:
        fig, ax = plt.subplots(figsize=(15, 7))
        portfolio_df['market_value'].plot(ax=ax, label='Strategy Equity')
        ax.set_title(title)
        ax.set_ylabel('Portfolio Value ($)')
        ax.set_xlabel('Date')
        ax.legend()
        ax.grid(True)
        plt.tight_layout()
        return fig

    # If a benchmark is available, create the full 3-panel comparison plot.
    fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(15, 12), sharex=True,
                                        gridspec_kw={'height_ratios': [3, 1, 2]})
    fig.suptitle(title, fontsize=16)

    # Top plot: Equity Curve vs. Benchmark
    portfolio_df['market_value'].plot(ax=ax1, label='Strategy Equity')
    normalized_benchmark.plot(ax=ax1, label=f'Buy & Hold {benchmark_ticker}', linestyle='--', color='gray')
    ax1.set_ylabel('Portfolio Value ($)')
    ax1.grid(True)
    ax1.legend()

    # Middle plot: Relative Performance Ratio
    relative_performance = portfolio_df['market_value'] / normalized_benchmark
    relative_performance.plot(ax=ax2, label='Relative Performance (Strategy / Benchmark)', color='purple')
    ax2.axhline(1, color='black', linestyle='--', linewidth=1)
    ax2.set_ylabel('Ratio')
    ax2.grid(True, which='both', linestyle='--', linewidth=0.5)
    ax2.legend()

    # Third plot: Underwater (Drawdown)
    strategy_drawdown = _calculate_drawdown(portfolio_df['market_value'])
    strategy_drawdown.plot(ax=ax3, label='Strategy Drawdown', color='red')
    ax3.fill_between(strategy_drawdown.index, strategy_drawdown, 0, color='red', alpha=0.3)
    benchmark_drawdown = _calculate_drawdown(normalized_benchmark)
    benchmark_drawdown.plot(ax=ax3, label=f'Benchmark Drawdown ({benchmark_ticker})', color='gray', linestyle='--')
    ax3.set_ylabel('Drawdown (%)')
    ax3.axhline(0, color='black', linestyle='-', linewidth=1)
    ax3.grid(True, which='both', linestyle='--', linewidth=0.5)
    ax3.legend()
    ax3.set_xlabel('Date')
    plt.tight_layout(rect=[0, 0.03, 1, 0.97])
    return fig
def prepare_metrics_df_for_display(metrics_df: pd.DataFrame, timeframe: str = '1d') -> pd.DataFrame:
    """
    Prepares the pybroker metrics DataFrame for display by annualizing key ratios.

    Args:
        metrics_df: The raw metrics_df from a TestResult.
        timeframe: The timeframe of the backtest data (e.g., '1d', '1h').

    Returns:
        A new DataFrame with annualized Sharpe and Sortino ratios.
    """
    display_df = metrics_df.copy()
    sharpe_mask = display_df['name'] == 'sharpe'
    if sharpe_mask.any():
        original_sharpe = float(display_df.loc[sharpe_mask, 'value'].iloc[0])
        display_df.loc[sharpe_mask, 'value'] = _calculate_annualized_ratio(original_sharpe, timeframe)
    sortino_mask = display_df['name'] == 'sortino'
    if sortino_mask.any():
        original_sortino = float(display_df.loc[sortino_mask, 'value'].iloc[0])
        display_df.loc[sortino_mask, 'value'] = _calculate_annualized_ratio(original_sortino, timeframe)
    # Cast object columns to strings so the frame renders consistently in the UI.
    for col in display_df.columns:
        if display_df[col].dtype == 'object':
            display_df[col] = display_df[col].astype(str)
    return display_df
def _calculate_annualized_ratio(daily_ratio: float | None, timeframe: str = '1d') -> float | None:
    """
    Annualizes a per-period risk-adjusted return ratio (like Sharpe or Sortino).

    Args:
        daily_ratio: The raw, per-period ratio from pybroker metrics.
        timeframe: The timeframe of the backtest data (e.g., '1d', '1h').

    Returns:
        The annualized ratio, or None if it cannot be calculated.
    """
    if daily_ratio is None:
        return None
    # Bars per year, assuming 252 trading days per year. The 6.5-hour session is
    # counted as 7 hourly bars, 13 thirty-minute bars, or 26 fifteen-minute bars.
    # This is a simplification but standard practice.
    annualization_factors = {'1d': 252, '1h': 252 * 7, '30m': 252 * 13, '15m': 252 * 26}
    factor = annualization_factors.get(timeframe, 252)  # Unknown timeframes default to daily
    # The ratio scales with the square root of the number of periods per year.
    return float(daily_ratio * np.sqrt(factor))
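# Illustrative sketch, not part of the engine: the square-root-of-time scaling used
# by _calculate_annualized_ratio, worked through for a hypothetical per-day Sharpe
# ratio of 0.1. The names `BARS_PER_YEAR` and `annualize` exist only in this example;
# the factors are copied from the function above.

```python
import math

# Bars per year for each timeframe, as in the engine's lookup table.
BARS_PER_YEAR = {'1d': 252, '1h': 252 * 7, '30m': 252 * 13, '15m': 252 * 26}


def annualize(per_period_ratio: float, timeframe: str = '1d') -> float:
    """Scale a per-period ratio by sqrt(periods per year); unknown timeframes use 252."""
    return per_period_ratio * math.sqrt(BARS_PER_YEAR.get(timeframe, 252))


daily = annualize(0.1, '1d')     # 0.1 * sqrt(252), roughly 1.59
hourly = annualize(0.1, '1h')    # 0.1 * sqrt(1764) = 0.1 * 42
fallback = annualize(0.1, '5m')  # '5m' is not in the table, so it is treated as daily

print(round(daily, 4), round(hourly, 4), round(fallback, 4))
```

# The square root appears because, for independent per-period returns, the mean
# return grows with the number of periods while its standard deviation grows only
# with the square root, so their ratio grows with the square root as well.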
def plot_trades_on_chart(result: TestResult, ticker: str, title: str) -> Optional[plt.Figure]:
    """
    Plots the executed trades on top of the price chart to visualize strategy behavior.

    Returns the matplotlib Figure object.
    """
    if not hasattr(result, 'trades') or result.trades.empty:
        logger.warning("No trade data found in results to plot.")
        return None
    trades_df = result.trades

    # Load the price data for the backtest period to plot the price curve
    start_date = result.start_date.strftime('%Y-%m-%d')
    end_date = result.end_date.strftime('%Y-%m-%d')
    data_dict = load_ticker_data(ticker, start_date, end_date)
    if not data_dict or 'shareprices' not in data_dict or data_dict['shareprices'] is None:
        logger.error(f"Could not load price data for {ticker} to plot trades.")
        return None
    price_data = data_dict['shareprices']
    if price_data.empty:
        logger.error(f"Price data DataFrame is empty for {ticker}.")
        return None
    if 'Date' in price_data.columns:
        price_data['Date'] = pd.to_datetime(price_data['Date'])
        price_data = price_data.set_index('Date')
    elif not isinstance(price_data.index, pd.DatetimeIndex):
        price_data.index = pd.to_datetime(price_data.index)

    fig, ax = plt.subplots(figsize=(15, 7))
    ax.plot(price_data.index, price_data['Adj Close'], label=f'{ticker} Price', color='skyblue', alpha=0.7, zorder=1)

    # Separate winning and losing trades for different coloring
    winning_trades = trades_df[trades_df['pnl'] > 0]
    losing_trades = trades_df[trades_df['pnl'] <= 0]
    ax.scatter(winning_trades['entry_date'], winning_trades['entry'], marker='^', color='green', s=100, label='Winning Entry', zorder=2)
    ax.scatter(losing_trades['entry_date'], losing_trades['entry'], marker='^', color='red', s=100, label='Losing Entry', zorder=2)

    # Connect entry and exit points with lines
    for _, trade in trades_df.iterrows():
        color = 'green' if trade['pnl'] > 0 else 'red'
        ax.plot([trade['entry_date'], trade['exit_date']], [trade['entry'], trade['exit']], color=color, linestyle='--', linewidth=1.5, zorder=3)

    ax.set_title(title)
    ax.set_xlabel('Date')
    ax.set_ylabel('Price ($)')
    ax.legend()
    ax.grid(True)
    return fig
def plot_feature_importance(feature_names, all_importances, top_n=20) -> Optional[plt.Figure]:
    """
    Calculates and plots the average feature importance from all walk-forward folds.

    Returns the matplotlib Figure object.
    """
    if not all_importances or not feature_names:
        logger.warning("No feature importances or feature names were collected to plot.")
        return None

    # Calculate the mean and standard deviation of importances across all folds
    mean_importances = np.mean(all_importances, axis=0)
    std_importances = np.std(all_importances, axis=0)
    importance_df = pd.DataFrame({
        'feature': feature_names,
        'importance': mean_importances,
        'std': std_importances
    }).sort_values(by='importance', ascending=True)

    # Plot the top N features
    top_features_df = importance_df.tail(top_n)
    # The log should show features from most to least important, so we reverse the list from the tail.
    logger.info(f"Top {top_n} Feature Importances (most to least): {top_features_df['feature'].tolist()[::-1]}")

    fig, ax = plt.subplots(figsize=(12, 8))
    ax.barh(top_features_df['feature'], top_features_df['importance'], xerr=top_features_df['std'], align='center', capsize=5, color='skyblue', edgecolor='black', alpha=0.8)
    ax.set_xlabel('Mean Feature Importance (LGBM Gain)')
    ax.set_ylabel('Features')
    ax.set_title(f'Top {top_n} Feature Importances (Averaged Across Walk-Forward Folds)')
    # With ascending sort and tail(), the most important is last, which barh plots at the top. No inversion needed.
    ax.grid(axis='x', linestyle='--', alpha=0.6)
    plt.tight_layout()  # Adjust layout to make room for labels
    return fig
def run_visualize_model(ticker: str, strategy_type: str) -> Optional[Dict]:
    """
    Loads saved model artifacts from a 'train' run and generates visualization assets.

    Args:
        ticker: The stock ticker symbol.
        strategy_type: The strategy type.

    Returns:
        A dictionary containing DataFrames and matplotlib Figures for UI display,
        or None if artifacts cannot be loaded.
    """
    logger.info(f"--- Visualizing Results for {ticker} with {strategy_type} strategy ---")
    model_dir = os.path.join('pybroker_trainer', 'artifacts')

    # --- Construct file paths ---
    results_filename = os.path.join(model_dir, f'{ticker}_{strategy_type}_results.pkl')
    features_filename = os.path.join(model_dir, f'{ticker}_{strategy_type}_features.json')
    importances_filename = os.path.join(model_dir, f'{ticker}_{strategy_type}_importances.pkl')  # type: ignore
    model_params_filename = os.path.join(model_dir, f'{ticker}_{strategy_type}_best_params.json')
    strategy_params_filename = os.path.join(model_dir, f'{ticker}_{strategy_type}_best_strategy_params.json')

    # --- Load artifacts, handling missing files gracefully for non-ML strategies ---
    try:
        # The results file is mandatory for any visualization.
        with open(results_filename, 'rb') as f:
            result = pickle.load(f)
    except FileNotFoundError:
        logger.error(f"Could not find required results file: {results_filename}. Please run the 'train' command first.")
        return None

    # Optional artifacts (may not exist for non-ML strategies)
    features = None
    all_importances = None
    best_model_params = None
    best_strategy_params = None
    if os.path.exists(features_filename):
        with open(features_filename, 'r') as f:
            features = json.load(f)
    if os.path.exists(importances_filename):
        with open(importances_filename, 'rb') as f:
            all_importances = pickle.load(f)
    if os.path.exists(model_params_filename):
        with open(model_params_filename, 'r') as f:
            best_model_params = json.load(f)
    if os.path.exists(strategy_params_filename):
        with open(strategy_params_filename, 'r') as f:
            best_strategy_params = json.load(f)

    # --- Check for the annualization flag in the correct file ---
    # The flag is saved in `_best_strategy_params.json` during a tune run,
    # and in `_strategy_params.json` during a regular train run. We need to check both.
    params_to_check = best_strategy_params
    if not params_to_check:
        # Fallback to the regular strategy params file if the 'best' one doesn't exist
        regular_params_filename = os.path.join(model_dir, f'{ticker}_{strategy_type}_strategy_params.json')
        if os.path.exists(regular_params_filename):
            with open(regular_params_filename, 'r') as f:
                params_to_check = json.load(f)
    if params_to_check and params_to_check.get('ratios_annualized'):
        metrics_df = result.metrics_df  # Ratios are already annualized.
    else:
        metrics_df = prepare_metrics_df_for_display(result.metrics_df, '1d')  # Legacy file, annualize on-the-fly.

    trades_df = result.trades.copy()
    for col in trades_df.columns:
        if trades_df[col].dtype == 'object':
            trades_df[col] = trades_df[col].astype(str)

    # --- Generate plots, checking if the necessary data exists ---
    importance_fig = None
    if features and all_importances:
        importance_fig = plot_feature_importance(features, all_importances)
    return {
        "metrics_df": metrics_df,
        "trades_df": trades_df,
        "performance_fig": plot_performance_vs_benchmark(result, f'Walk-Forward Performance for {ticker} ({strategy_type})'),
        "trades_fig": plot_trades_on_chart(result, ticker, f'Trades for {ticker} ({strategy_type})'),
        "importance_fig": importance_fig,
        "best_model_params": best_model_params,
        "best_strategy_params": best_strategy_params
    }
def _load_strategy_params(ticker: str, strategy_type: str) -> Optional[dict]:
    """
    Loads strategy parameters for a given ticker and strategy, prioritizing
    tuned parameters over last-run parameters, and falling back to defaults.
    """
    strategy_class = load_strategy_class(strategy_type)
    if not strategy_class:
        logger.error(f"Cannot load params: Strategy class for '{strategy_type}' not found.")
        return None

    # Start with defaults
    params = get_strategy_defaults(strategy_class)
    model_dir = os.path.join('pybroker_trainer', 'artifacts')
    best_params_path = os.path.join(model_dir, f'{ticker}_{strategy_type}_best_strategy_params.json')
    last_run_params_path = os.path.join(model_dir, f'{ticker}_{strategy_type}_strategy_params.json')

    # Prioritize best tuned params
    if os.path.exists(best_params_path):
        try:
            with open(best_params_path, 'r') as f:
                tuned_params = json.load(f)
            params.update(tuned_params)
            logger.info(f"Loaded BEST strategy parameters for {ticker}-{strategy_type}")
        except Exception as e:
            logger.warning(f"Could not load best strategy params file {best_params_path}. Error: {e}")
    # Fallback to last-run params
    elif os.path.exists(last_run_params_path):
        try:
            with open(last_run_params_path, 'r') as f:
                last_run_params = json.load(f)
            params.update(last_run_params)
            logger.info(f"Loaded last-run strategy parameters for {ticker}-{strategy_type}")
        except Exception as e:
            logger.warning(f"Could not load last-run strategy params file {last_run_params_path}. Error: {e}")
    return params
def custom_predict_fn(model_bundle, data):
    """
    Custom predict function for LGBM models.
    Relies solely on the model_bundle returned by train_fn.
    Returns a 2D NumPy array compatible with PyBroker's ctx.preds.
    """
    # Unpack the model and feature list from the bundle.
    model = model_bundle['model']
    features = model_bundle['features']
    if isinstance(model, PassThroughModel):
        return model.predict_proba(data)

    # --- Handle cases where the model was not fitted due to lack of data ---
    # The train_fn returns an unfitted model if the training data for a fold is empty.
    # This check prevents a crash during the prediction phase for that fold.
    if not hasattr(model, 'n_classes_'):
        # This model is unfitted. Determine the expected number of classes from its objective.
        n_classes = 3 if model.objective == 'multiclass' else 2
        logger.warning("custom_predict_fn received an unfitted model. Returning neutral probabilities.")
        # Return a neutral probability for every row in the input data.
        # This ensures the backtest can continue without taking trades in this fold.
        return np.full((len(data), n_classes), 1.0 / n_classes, dtype=np.float64)

    n_classes = model.n_classes_
    expected_features = model.feature_name_ if hasattr(model, 'feature_name_') else features
    if data.empty:
        logger.warning("custom_predict_fn received empty data, returning an empty probability array.")
        # Return an empty array with the correct number of columns (classes)
        return np.empty((0, n_classes), dtype=np.float64)
    if not all(f in data.columns for f in expected_features):
        missing = set(expected_features) - set(data.columns)
        raise ValueError(f"Missing features: {missing}")

    # Select the columns in the order the model was trained on.
    input_data = data[expected_features]
    # Return the full (n_rows, n_classes) probability array; PyBroker handles multi-row output.
    probabilities = model.predict_proba(input_data)
    return probabilities
# --- NEW: Custom Strategy class for Expanding Window Walk-Forward ---
class ExpandingWindowStrategy(pybroker.Strategy):
"""
A custom Strategy class that overrides the default walk-forward split logic
to implement an expanding window instead of a sliding one. This is necessary
for versions of pybroker that do not have a `rolling` parameter.
"""
def walkforward_split(
self,
df: pd.DataFrame,
windows: int,
lookahead: int,
train_size: float, # This parameter is ignored in this implementation
shuffle: bool = False,
) -> iter:
"""
Generates train/test splits for an expanding window walk-forward analysis.
The training data grows with each window to include the previous test set.
"""
logger.info("Using custom ExpandingWindowStrategy to generate expanding window splits...")
date_col = 'date' # pybroker lowercases column names
unique_dates = np.unique(df[date_col])
n_dates = len(unique_dates)
if windows <= 0 or n_dates == 0: return
# The data is divided into `windows + 1` chunks. The first is for initial training.
test_size_days = n_dates // (windows + 1)
if test_size_days < 1:
logger.error(f"Not enough data for {windows} windows. Each test set would have < 1 day.")
return
train_end_date_idx = n_dates - (windows * test_size_days) - 1
for i in range(windows):
test_start_date_idx = train_end_date_idx + lookahead
# For the last window, extend the test set to the end of the data to include any remainder.
if i == windows - 1:
test_end_date_idx = n_dates - 1
else:
test_end_date_idx = test_start_date_idx + test_size_days - 1
if test_end_date_idx >= n_dates: test_end_date_idx = n_dates - 1
if test_start_date_idx > test_end_date_idx: break
train_end_date = unique_dates[train_end_date_idx]
test_start_date = unique_dates[test_start_date_idx]
test_end_date = unique_dates[test_end_date_idx]
train_indices = df.index[df[date_col] <= train_end_date].to_numpy()
test_indices = df.index[(df[date_col] >= test_start_date) & (df[date_col] <= test_end_date)].to_numpy()
if shuffle: np.random.shuffle(train_indices)
yield WalkforwardWindow(train_indices, test_indices)
train_end_date_idx = test_end_date_idx
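# Illustrative sketch, not part of the original module: the index arithmetic that
# walkforward_split above performs, isolated as a pure function so the window
# boundaries can be inspected without a DataFrame. The name
# `expanding_window_bounds` is hypothetical, and the default lookahead of 1 is an
# assumption made for this example only.
def expanding_window_bounds(n_dates, windows, lookahead=1):
    """Return (train_end_idx, test_start_idx, test_end_idx) for each window."""
    if windows <= 0 or n_dates == 0:
        return []
    # The dates are split into `windows + 1` chunks; the first seeds the training set.
    test_size = n_dates // (windows + 1)
    if test_size < 1:
        return []
    bounds = []
    train_end = n_dates - windows * test_size - 1
    for i in range(windows):
        test_start = train_end + lookahead
        if i == windows - 1:
            # The last window absorbs any remainder up to the final date.
            test_end = n_dates - 1
        else:
            test_end = min(test_start + test_size - 1, n_dates - 1)
        if test_start > test_end:
            break
        bounds.append((train_end, test_start, test_end))
        # Expanding window: the next training set ends where this test set ended.
        train_end = test_end
    return bounds
# Usage: with 10 unique dates and 4 windows, training always starts at index 0 and
# its end index advances 1, 3, 5, 7 while each test set covers the next two dates.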
BASE_CONTEXT_COLUMNS = ['open', 'high', 'low', 'close', 'volume', 'target', 'setup_mask', 'atr']
def run_pybroker_walkforward(ticker: str = 'SPY', start_date: str = '2000-01-01', end_date: Optional[str] = None, strategy_type: str = 'trend_following', tune_hyperparameters: bool = True, plot_results: bool = True, save_assets: bool = False, override_params: Optional[dict] = None, use_tuned_strategy_params: bool = False, disable_inner_parallelism: bool = False, preloaded_data_df: Optional[pd.DataFrame] = None, preloaded_features: Optional[list] = None, commission_cost: float = 0.0, calc_bootstrap: bool = True, perform_noise_test: bool = False, stop_event_checker=None):
"""
Runs the full walk-forward analysis for a given ticker.
"""
features = [] # Ensure features is defined in the outer scope for the finally block
context_columns_to_register = [] # for the finally block
last_best_params = None
last_trained_model = None
is_ml = True # Assume ML strategy by default
all_quality_scores = [] # For tracking model quality across folds
noise_test_results = [] # For tracking noise test results across folds
try:
if stop_event_checker and stop_event_checker():
logger.warning("Stop event detected before starting walkforward analysis.")
# Return the expected tuple format
return None, [], []
# --- Unify default end_date handling ---
if end_date is None:
end_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
# --- Load the strategy class and its default parameters ---
strategy_class = load_strategy_class(strategy_type)
base_params = get_strategy_defaults(strategy_class) if strategy_class else {}
base_params.update({ # Add runtime params
'disable_inner_parallelism': disable_inner_parallelism,
})
# Load tuned strategy parameters if requested
if use_tuned_strategy_params:
strategy_params = load_strategy_config(strategy_type, base_params)
else:
strategy_params = base_params
# Allow overriding parameters for optimization
if override_params:
logger.info(f"Overriding strategy parameters with: {override_params}")
strategy_params.update(override_params)
if strategy_class:
strategy_instance = strategy_class(params=strategy_params)
is_ml = strategy_instance.is_ml_strategy
# --- OPTIMIZATION: Use pre-loaded data if provided (e.g., from tune_strategy) ---
if preloaded_data_df is not None and preloaded_features is not None:
logger.info("Using pre-loaded data for backtest.")
data_df = preloaded_data_df
features = preloaded_features
else:
if strategy_class:
base_df = _prepare_base_data(ticker, start_date, end_date, strategy_params)
data_df = strategy_instance.prepare_data(data=base_df)
features = strategy_instance.get_feature_list()
context_columns_to_register = BASE_CONTEXT_COLUMNS + strategy_instance.get_extra_context_columns_to_register()
else: # Fallback for legacy strategies
raise NotImplementedError(f"Strategy '{strategy_type}' has not been migrated to the new encapsulated format. Please create a strategy module for it.")
if is_ml:
# --- NEW: Add a global check for minimum setups before starting the walk-forward ---
min_total_setups_for_run = 60 # A reasonable minimum for the entire dataset
valid_setups = data_df.dropna(subset=['target'])
if len(valid_setups) < min_total_setups_for_run:
logger.error(f"Strategy '{strategy_type}' for {ticker} has only {len(valid_setups)} total setups.")
logger.error(f"This is insufficient for a reliable walk-forward backtest (min: {min_total_setups_for_run}).")
logger.error("Consider using the 'pre-scan-universe' command to find tickers with more frequent setups.")
return None, [], [] # Abort the run early
# Filter out features that might not have been calculated or don't exist
# features = [f for f in data_df.columns if f in features]
# data_df.dropna(subset=features + ['target'], inplace=True)
if is_ml:
pybroker.register_columns(features)
pybroker.register_columns(context_columns_to_register) # Always register context columns
# Define a function to provide input data for the model.
# This is required when custom columns are registered. It tells pybroker
# which columns from the main DataFrame should be fed to the model for prediction.
def model_input_data_fn(data):
return data[features]
# Step 2: Define the training function for the model.
# This function will be called by PyBroker for each walk-forward window.
def train_fn(symbol, train_data, test_data, **kwargs):
nonlocal all_feature_importances, last_best_params, last_trained_model, noise_test_results
from sklearn.model_selection import train_test_split # Local import for this function
model_config = strategy_instance.get_model_config()
# --- NEW: Enhanced logging and checks for data validity ---
train_start = train_data['date'].min().date() if not train_data.empty else 'N/A'
train_end = train_data['date'].max().date() if not train_data.empty else 'N/A'
logger.info(f"[{symbol}] Training fold: {train_start} to {train_end}. Initial samples: {len(train_data)}")
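# The nonlocal declaration above lets this function update the accumulators defined in the outer scope.
# Suppress PyBroker's own log output from this point on.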
pybroker.disable_logging()
if train_data.empty:
logger.warning(f"[{symbol}] Training data is empty for this fold. This is expected if the stock did not exist for the full period. Returning untrained model.")
model = LGBMClassifier(random_state=42, n_jobs=1, **model_config)
if perform_noise_test:
noise_test_results.append({'status': 'SKIPPED', 'message': 'Training data empty.', 'baseline_score': 0, 'noise_score': 0})
return {'model': model, 'features': features}
# --- Filter training data to only include rows with valid targets ---
if 'target' in train_data.columns:
train_data = train_data.dropna(subset=['target'])
# --- Apply an embargo to prevent label leakage across the train/test boundary ---
# Each target looks `target_eval_bars` (or, failing that, `stop_out_window`) bars
# into the future, so the labels of the last training rows are computed from prices
# that fall inside the test window. Those trailing rows are dropped here to avoid
# look-ahead bias in the walk-forward validation.
if not train_data.empty:
eval_bars = strategy_instance.params.get('target_eval_bars')
if eval_bars is None:
eval_bars = strategy_instance.params.get('stop_out_window', 15) # Default to 15 if neither is found
last_train_date = train_data['date'].max()
embargo_start_date = last_train_date - pd.Timedelta(days=eval_bars * 1.5) # Bars are converted to calendar days; assuming daily bars, 1.5 covers weekends (about 7/5) plus a safety margin
original_len = len(train_data)
train_data = train_data[train_data['date'] < embargo_start_date]
logger.info(f"[{symbol}] Applied embargo: Purged {original_len - len(train_data)} samples from the end of the training set.")
# --- More robust check for minimum samples per fold ---
min_total_samples = 30 # Increased minimum total setups required for training
min_class_samples = 10 # Minimum required setups for EACH class (win and loss)
if train_data.empty or len(train_data) < min_total_samples:
logger.warning(f"[{symbol}] Training data is empty or has insufficient samples ({len(train_data)} < {min_total_samples}). No model will be trained for this fold.")
model = LGBMClassifier(random_state=42, n_jobs=1, **model_config)
if perform_noise_test:
noise_test_results.append({'status': 'SKIPPED', 'message': f'Insufficient samples ({len(train_data)} < {min_total_samples}).', 'baseline_score': 0, 'noise_score': 0})
return {'model': model, 'features': features}
# Check for minimum samples in the minority class
if 'target' in train_data.columns and not train_data['target'].value_counts().empty:
if len(train_data['target'].unique()) < 2 or train_data['target'].value_counts().min() < min_class_samples:
logger.warning(f"[{symbol}] Minority class has too few samples ({train_data['target'].value_counts().min()} < {min_class_samples}). No model will be trained for this fold.")
model = LGBMClassifier(random_state=42, n_jobs=1, **model_config)
if perform_noise_test:
noise_test_results.append({'status': 'SKIPPED', 'message': f'Class imbalance (min class < {min_class_samples}).', 'baseline_score': 0, 'noise_score': 0})
return {'model': model, 'features': features}
# --- Determine if tuning is feasible for this specific fold ---
can_tune = tune_hyperparameters
if can_tune:
n_splits = 3 # Must match the cv_splitter below
min_samples_for_tuning = 20 # A reasonable minimum to attempt tuning
# Check if any class has fewer samples than n_splits, which would break StratifiedKFold
if 'target' in train_data.columns and not train_data['target'].value_counts().empty:
min_class_count = train_data['target'].value_counts().min()
if min_class_count < n_splits or len(train_data) < min_samples_for_tuning:
logger.warning(f"[{symbol}] Insufficient samples for tuning (Total: {len(train_data)}, Min Class: {min_class_count}).")
logger.warning(f"[{symbol}] Disabling hyperparameter tuning for this fold and using default parameters.")
can_tune = False
else:
logger.warning(f"[{symbol}] Target column missing or empty in train_data. Disabling tuning for this fold.")
can_tune = False
if can_tune:
best_params = _tune_hyperparameters_with_gp_minimize(
train_data=train_data,
features=features,
model_config=model_config,
stop_event_checker=stop_event_checker,
symbol=symbol
)
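# Honor disable_inner_parallelism here too, as the default-parameter branch does.
n_jobs = 1 if strategy_params.get('disable_inner_parallelism') else -1
final_model = LGBMClassifier(random_state=42, n_jobs=n_jobs, class_weight='balanced', **model_config, **best_params)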
final_model.fit(train_data[features], train_data['target'].astype(int))
last_best_params = best_params
last_trained_model = final_model
if hasattr(final_model, 'feature_importances_'):
all_feature_importances.append(final_model.feature_importances_)
return {'model': final_model, 'features': features}
else:
# --- Train LGBM with default hyperparameters (no tuning) ---
logger.info(f"[{symbol}] Running with default hyperparameters (no tuning).")
try:
sub_train, sub_val = train_test_split(train_data, test_size=0.2, random_state=42, stratify=train_data['target'])
except ValueError:
logger.warning(f"[{symbol}] Could not create a stratified validation split. Validation AUC defaults to 0.5 for this fold, so the quality gate will reject the model.")
sub_train, sub_val = train_data, pd.DataFrame()
n_jobs = 1 if strategy_params.get('disable_inner_parallelism') else -1
default_lgbm_params = {'random_state': 42, 'n_jobs': n_jobs, 'class_weight': 'balanced', 'min_child_samples': 5, **model_config}
temp_model = LGBMClassifier(**default_lgbm_params)
temp_model.fit(sub_train[features], sub_train['target'].astype(int))
# Add a model quality gate to prevent poorly performing models from being used in backtest
auc_score = 0.5
if not sub_val.empty and len(np.unique(sub_val['target'])) > 1:
val_preds = temp_model.predict_proba(sub_val[features])
if model_config.get('objective') == 'binary':
auc_score = roc_auc_score(sub_val['target'], val_preds[:, 1])
else:
auc_score = roc_auc_score(sub_val['target'], val_preds, multi_class='ovr')
# Track the quality score for this fold
all_quality_scores.append(auc_score)
min_auc_threshold = 0.52
if auc_score < min_auc_threshold:
logger.warning(f"[{symbol}] Model quality check failed for this fold (AUC: {auc_score:.3f} < {min_auc_threshold}).")
logger.warning(f"[{symbol}] Discarding trained model and using a pass-through model instead.")
final_model = PassThroughModel(n_classes=model_config.get('num_class', 2))
else:
logger.info(f"[{symbol}] Model quality check passed (AUC: {auc_score:.3f}). Retraining on full fold data.")
final_model = LGBMClassifier(**default_lgbm_params)
final_model.fit(train_data[features], train_data['target'].astype(int))
if hasattr(final_model, 'feature_importances_'):
all_feature_importances.append(final_model.feature_importances_)
last_trained_model = final_model
# --- NEW: Noise Impact Check ---
if perform_noise_test:
if isinstance(final_model, PassThroughModel):
# The model was discarded by the quality gate, so record the fold as a failure.
# auc_score is always set above (it stays at 0.5 when no usable validation split exists).
score = auc_score
noise_test_results.append({'status': 'FAIL', 'message': f'Model discarded due to low validation AUC ({score:.3f} < {min_auc_threshold}).', 'baseline_score': score, 'noise_score': 0})
elif not test_data.empty and 'target' in test_data.columns:
test_data_clean = test_data.dropna(subset=['target'])
if not test_data_clean.empty:
res = check_noise_impact(final_model, test_data_clean[features], test_data_clean['target'].astype(int), features)
if res:
noise_test_results.append(res)
return {'model': final_model, 'features': features}
# Step 3: Register the model with PyBroker.
# We don't pass `indicators` because they are already calculated and part of `data_df`.
model_name = 'binary_classifier'
model_source = pybroker.model(name=model_name, fn=train_fn, predict_fn=custom_predict_fn, input_data_fn=model_input_data_fn)
# Step 4: Configure StrategyConfig and instantiate the correct trader
strategy_config = StrategyConfig(
position_mode=PositionMode.LONG_ONLY,
fee_mode=pybroker.FeeMode.PER_SHARE if commission_cost > 0 else None,
fee_amount=commission_cost
)
strategy = ExpandingWindowStrategy(data_source=data_df, start_date=start_date, end_date=end_date, config=strategy_config)
trader = strategy_instance.get_trader(model_name if is_ml else None, {ticker: strategy_params})
if is_ml:
models_to_use = model_source if isinstance(model_source, list) else [model_source]
strategy.add_execution(trader.execute, [ticker], models=models_to_use)
else:
# Non-ML strategies don't need a model passed to their execution
strategy.add_execution(trader.execute, [ticker])
# Step 5: Run the walk-forward analysis
all_feature_importances = [] # Reset before running
logger.info("Starting PyBroker walk-forward analysis...")
# --- Walk-forward configuration for compatibility ---
# Specifying `test_size` for a walk-forward run is a feature of newer pybroker versions.
# To stay compatible, the number of `windows` is derived from the total time span
# of the dataset instead, which works across versions.
total_years = (data_df['date'].max() - data_df['date'].min()).days / 365.25 if not data_df.empty else 0
if total_years < 4: # Need at least ~4 years for a meaningful split
logger.error(f"Not enough data ({total_years:.2f} years) for a walk-forward. Minimum 4 years required.")