-
Notifications
You must be signed in to change notification settings - Fork 5.9k
Expand file tree
/
Copy pathtest_all_pipeline.py
More file actions
262 lines (220 loc) · 7.17 KB
/
test_all_pipeline.py
File metadata and controls
262 lines (220 loc) · 7.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import sys
import shutil
import unittest
from pathlib import Path
import numpy as np
import pandas as pd
import qlib
from qlib.config import REG_CN, C
from qlib.utils import drop_nan_by_y_index
from qlib.contrib.model.gbdt import LGBModel
from qlib.contrib.data.handler import Alpha158
from qlib.contrib.strategy.strategy import TopkDropoutStrategy
from qlib.contrib.evaluate import (
backtest as normal_backtest,
risk_analysis,
)
from qlib.utils import exists_qlib_data, init_instance_by_config, flatten_dict
from qlib.workflow import R
from qlib.workflow.record_temp import SignalRecord, SigAnaRecord, PortAnaRecord
from qlib.tests.data import GetData
from qlib.tests import TestAutoData
# Instrument universe and the benchmark used by the backtest configuration.
market = "csi300"
benchmark = "SH000300"

###################################
# train model
###################################

# Keyword arguments for the Alpha158 handler. The fit window below is the
# same date range as the "train" segment declared in `task`.
data_handler_config = {
    "start_time": "2008-01-01",
    "end_time": "2020-08-01",
    "fit_start_time": "2008-01-01",
    "fit_end_time": "2014-12-31",
    "instruments": market,
}

# Model and dataset descriptions, each given as class / module_path / kwargs.
task = {
    "model": {
        "class": "LGBModel",
        "module_path": "qlib.contrib.model.gbdt",
        "kwargs": {
            "loss": "mse",
            "colsample_bytree": 0.8879,
            "learning_rate": 0.0421,
            "subsample": 0.8789,
            "lambda_l1": 205.6999,
            "lambda_l2": 580.9768,
            "max_depth": 8,
            "num_leaves": 210,
            "num_threads": 20,
        },
    },
    "dataset": {
        "class": "DatasetH",
        "module_path": "qlib.data.dataset",
        "kwargs": {
            "handler": {
                "class": "Alpha158",
                "module_path": "qlib.contrib.data.handler",
                # Same dict object as `data_handler_config` (not a copy).
                "kwargs": data_handler_config,
            },
            "segments": {
                "train": ("2008-01-01", "2014-12-31"),
                "valid": ("2015-01-01", "2016-12-31"),
                "test": ("2017-01-01", "2020-08-01"),
            },
        },
    },
}

# Strategy and backtest settings handed to PortAnaRecord in backtest_analysis().
port_analysis_config = {
    "strategy": {
        "class": "TopkDropoutStrategy",
        "module_path": "qlib.contrib.strategy.strategy",
        "kwargs": {
            "topk": 50,
            "n_drop": 5,
        },
    },
    "backtest": {
        "verbose": False,
        "limit_threshold": 0.095,
        "account": 100000000,
        "benchmark": benchmark,
        "deal_price": "close",
        "open_cost": 0.0005,
        "close_cost": 0.0015,
        "min_cost": 5,
    },
}
def train():
    """Fit the configured model inside the "workflow" experiment.

    The run logs the flattened ``task`` parameters, fits the model, then
    generates a SignalRecord followed by a SigAnaRecord on the active recorder.

    Returns
    -------
    pred_score: pandas.DataFrame
        predict scores
    performance: dict
        model performance, with the loaded "ic" and "ric" results
    rid: str
        id of the recorder used for this run
    """
    # model initiation
    model = init_instance_by_config(task["model"])
    dataset = init_instance_by_config(task["dataset"])

    # Printing exercises __repr__ of the dataset and of R.
    print(dataset)
    print(R)

    # start exp
    with R.start(experiment_name="workflow"):
        R.log_params(**flatten_dict(task))
        model.fit(dataset)

        # prediction
        active_recorder = R.get_recorder()
        # Printing exercises __repr__ of the recorder.
        print(active_recorder)
        # Printing exercises get_local_dir.
        print(active_recorder.get_local_dir())
        rid = active_recorder.id

        signal_record = SignalRecord(model, dataset, active_recorder)
        signal_record.generate()
        pred_score = signal_record.load()

        # calculate ic and ric
        sig_analysis = SigAnaRecord(active_recorder)
        sig_analysis.generate()
        ic = sig_analysis.load(sig_analysis.get_path("ic.pkl"))
        ric = sig_analysis.load(sig_analysis.get_path("ric.pkl"))

    performance = {"ic": ic, "ric": ric}
    return pred_score, performance, rid
def train_with_sigana():
    """Fit the configured model, then run SigAnaRecord directly.

    Unlike ``train``, no separate SignalRecord is created here: the model and
    dataset are passed to SigAnaRecord, and the prediction is loaded from it
    as "pred.pkl".

    Returns
    -------
    pred_score: pandas.DataFrame
        predict scores
    performance: dict
        model performance, with the loaded "ic" and "ric" results
    uri_path: str
        value of ``R.get_uri()`` for this run
    """
    model = init_instance_by_config(task["model"])
    dataset = init_instance_by_config(task["dataset"])

    # start exp
    with R.start(experiment_name="workflow_with_sigana"):
        R.log_params(**flatten_dict(task))
        model.fit(dataset)

        # predict and calculate ic and ric
        active_recorder = R.get_recorder()
        sig_analysis = SigAnaRecord(active_recorder, model=model, dataset=dataset)
        sig_analysis.generate()
        ic = sig_analysis.load(sig_analysis.get_path("ic.pkl"))
        ric = sig_analysis.load(sig_analysis.get_path("ric.pkl"))
        pred_score = sig_analysis.load("pred.pkl")

        uri_path = R.get_uri()

    performance = {"ic": ic, "ric": ric}
    return pred_score, performance, uri_path
def fake_experiment():
    """Run a throwaway experiment to check how R reports its URI.

    The URI is read before the experiment, inside an experiment started with
    an explicit ``uri``, and again after it has ended.

    Returns
    -------
    pass_or_not_for_default_uri: bool
        True when the URI after the experiment equals the one read before it
    pass_or_not_for_current_uri: bool
        True when the URI read inside the experiment equals the one passed in
    temporary_exp_dir: str
        the URI passed to the experiment
    """
    uri_before = R.get_uri()
    current_uri = "file:./temp-test-exp-mag"

    # start exp
    with R.start(experiment_name="fake_workflow_for_expm", uri=current_uri):
        R.log_params(**flatten_dict(task))
        uri_inside = R.get_uri()

    uri_after = R.get_uri()

    default_ok = uri_before == uri_after
    current_ok = current_uri == uri_inside
    return default_ok, current_ok, current_uri
def backtest_analysis(pred, rid):
    """Run the portfolio analysis record on a stored recorder and return it.

    Parameters
    ----------
    pred : pandas.DataFrame
        predict scores (accepted for the caller's convenience; the body does
        not read it)
    rid : str
        the id of the recorder to be used in this function

    Returns
    -------
    analysis : pandas.DataFrame
        the analysis result
    """
    stored_recorder = R.get_recorder(experiment_name="workflow", recorder_id=rid)

    # backtest
    port_analysis = PortAnaRecord(stored_recorder, port_analysis_config)
    port_analysis.generate()

    result = port_analysis.load(port_analysis.get_path("port_analysis.pkl"))
    print(result)
    return result
class TestAllFlow(TestAutoData):
    """Workflow tests: train, signal analysis, backtest and experiment URI.

    Test methods carry a numeric prefix. ``test_2_backtest`` reads the
    prediction scores and recorder id that ``test_1_train`` stores on the
    class, so it relies on ``test_1_train`` having run first.
    """

    # State handed from one test method to the next through the class object.
    PRED_SCORE = None
    REPORT_NORMAL = None  # not referenced by any method of this class
    POSITIONS = None  # not referenced by any method of this class
    RID = None

    @staticmethod
    def _uri_to_dir(uri):
        """Return the resolved filesystem path for a ``file:`` experiment URI.

        ``str.strip("file:")`` treats its argument as a set of characters and
        removes them from both ends, so it would also eat the tail of a
        directory name such as ``./exp-file``. Only the literal ``file:``
        prefix is removed here.
        """
        scheme = "file:"
        if uri.startswith(scheme):
            uri = uri[len(scheme):]
        return str(Path(uri).resolve())

    @classmethod
    def tearDownClass(cls) -> None:
        # Remove the directory behind the configured experiment manager URI.
        shutil.rmtree(cls._uri_to_dir(C["exp_manager"]["kwargs"]["uri"]))

    def test_0_train_with_sigana(self):
        TestAllFlow.PRED_SCORE, ic_ric, uri_path = train_with_sigana()
        # NOTE(review): `.all()` yields a truth value and any bool is >= 0, so
        # these two checks look like they can never fail - confirm whether a
        # comparison on the IC values themselves (e.g. their mean) was intended.
        self.assertGreaterEqual(ic_ric["ic"].all(), 0, "train failed")
        self.assertGreaterEqual(ic_ric["ric"].all(), 0, "train failed")
        shutil.rmtree(self._uri_to_dir(uri_path))

    def test_1_train(self):
        # Stores the scores and recorder id that test_2_backtest consumes.
        TestAllFlow.PRED_SCORE, ic_ric, TestAllFlow.RID = train()
        # NOTE(review): same apparently vacuous `.all() >= 0` check as in
        # test_0_train_with_sigana - confirm the intended assertion.
        self.assertGreaterEqual(ic_ric["ic"].all(), 0, "train failed")
        self.assertGreaterEqual(ic_ric["ric"].all(), 0, "train failed")

    def test_2_backtest(self):
        analyze_df = backtest_analysis(TestAllFlow.PRED_SCORE, TestAllFlow.RID)
        self.assertGreaterEqual(
            analyze_df.loc(axis=0)["excess_return_with_cost", "annualized_return"].values[0],
            0.10,
            "backtest failed",
        )

    def test_3_expmanager(self):
        pass_default, pass_current, uri_path = fake_experiment()
        self.assertTrue(pass_default, msg="default uri is incorrect")
        self.assertTrue(pass_current, msg="current uri is incorrect")
        # Clean up the temporary experiment directory created by the fake run.
        shutil.rmtree(self._uri_to_dir(uri_path))
def suite():
    """Build the suite used when this file is executed as a script.

    Contains the train test followed by the backtest test, in that order,
    because the backtest reads the recorder id stored by the train test.

    Returns
    -------
    unittest.TestSuite
        suite holding ``test_1_train`` and ``test_2_backtest``
    """
    _suite = unittest.TestSuite()
    # The names must match methods defined on TestAllFlow; constructing a
    # TestCase with an unknown method name raises ValueError.
    _suite.addTest(TestAllFlow("test_1_train"))
    _suite.addTest(TestAllFlow("test_2_backtest"))
    return _suite
# Script entry point: run the hand-built suite with the plain text runner.
if __name__ == "__main__":
    unittest.TextTestRunner().run(suite())