Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion examples/highfreq/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ class HighfreqWorkflow(object):
SPEC_CONF = {"custom_ops": [DayLast, FFillNan, BFillNan, Date, Select, IsNull, Cut], "expression_cache": None}

MARKET = "all"
BENCHMARK = "SH000300"

start_time = "2020-09-15 00:00:00"
end_time = "2021-01-18 16:00:00"
Expand Down
1 change: 1 addition & 0 deletions examples/rolling_process_data/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Rolling Process Data
32 changes: 32 additions & 0 deletions examples/rolling_process_data/rolling_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from qlib.data.dataset.handler import DataHandlerLP
from qlib.data.dataset.loader import DataLoaderDH
from qlib.contrib.data.handler import check_transform_proc


class RollingDataHandler(DataHandlerLP):
    """DataHandlerLP whose data source is another (pickled) handler, via ``DataLoaderDH``.

    Designed for rolling workflows: the processor fit range
    (``fit_start_time`` / ``fit_end_time``) is passed in explicitly so it can be
    shifted for each rolling window.

    Parameters
    ----------
    start_time, end_time :
        time range of the data to load.
    infer_processors, learn_processors : list
        processor configs; processors that need fitting are bound to the fit
        range by ``check_transform_proc``.
    fit_start_time, fit_end_time :
        time range used to fit the processors.
    data_loader_kwargs : dict
        forwarded to ``DataLoaderDH`` (e.g. ``handler_config``).
    """

    def __init__(
        self,
        start_time=None,
        end_time=None,
        infer_processors=None,
        learn_processors=None,
        fit_start_time=None,
        fit_end_time=None,
        data_loader_kwargs=None,
    ):
        # Use None sentinels instead of mutable defaults ([], {}): a shared
        # default object would be mutated across instances.
        infer_processors = [] if infer_processors is None else infer_processors
        learn_processors = [] if learn_processors is None else learn_processors
        data_loader_kwargs = {} if data_loader_kwargs is None else data_loader_kwargs

        # Bind the fit range into any processors that require fitting.
        infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time)
        learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time)

        data_loader = {
            "class": "DataLoaderDH",
            "kwargs": {**data_loader_kwargs},
        }

        super().__init__(
            instruments=None,  # instruments come from the wrapped handler, not from here
            start_time=start_time,
            end_time=end_time,
            data_loader=data_loader,
            infer_processors=infer_processors,
            learn_processors=learn_processors,
        )
135 changes: 135 additions & 0 deletions examples/rolling_process_data/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import qlib
import fire
import pickle
import pandas as pd

from datetime import datetime
from qlib.config import REG_CN
from qlib.data.dataset.handler import DataHandlerLP
from qlib.contrib.data.handler import Alpha158
from qlib.utils import exists_qlib_data, init_instance_by_config
from qlib.tests.data import GetData


class RollingDataWorkflow(object):
    """Example workflow that rolls a dataset forward one year at a time.

    A full-period Alpha158 handler is computed once, pickled, and then reused
    as the data source of a ``RollingDataHandler`` whose fit range and segment
    boundaries are shifted by one year for each rolling step.
    """

    MARKET = "csi300"
    start_time = "2010-01-01"
    end_time = "2019-12-31"
    rolling_cnt = 5  # number of rolling (one-year) steps

    @staticmethod
    def _shift(date_tuple, years):
        """Return a ``datetime`` built from ``(year, month, day)`` with *years* added to the year."""
        return datetime(date_tuple[0] + years, *date_tuple[1:])

    def _init_qlib(self):
        """Initialize qlib with the CN daily dataset, downloading it if missing."""
        provider_uri = "~/.qlib/qlib_data/cn_data"  # target_dir
        if not exists_qlib_data(provider_uri):
            print(f"Qlib data is not found in {provider_uri}")
            GetData().qlib_data(target_dir=provider_uri, region=REG_CN)
        qlib.init(provider_uri=provider_uri, region=REG_CN)

    def _dump_pre_handler(self, path):
        """Build an Alpha158 handler over the whole period and pickle it to *path*."""
        handler_config = {
            "class": "Alpha158",
            "module_path": "qlib.contrib.data.handler",
            "kwargs": {
                "start_time": self.start_time,
                "end_time": self.end_time,
                "instruments": self.MARKET,
                "infer_processors": [],
                "learn_processors": [],
            },
        }
        pre_handler = init_instance_by_config(handler_config)
        pre_handler.config(dump_all=True)  # dump all data so it can be re-used/re-fit later
        pre_handler.to_pickle(path)

    def _load_pre_handler(self, path):
        """Load the handler previously dumped by ``_dump_pre_handler``.

        NOTE: pickle is only safe on files produced by this workflow itself;
        never unpickle untrusted data.
        """
        with open(path, "rb") as file_dataset:
            pre_handler = pickle.load(file_dataset)
        return pre_handler

    def rolling_process(self):
        """Run ``rolling_cnt`` rolling steps, re-fitting the processors on each new train window."""
        self._init_qlib()
        self._dump_pre_handler("pre_handler.pkl")
        pre_handler = self._load_pre_handler("pre_handler.pkl")

        train_start_time = (2010, 1, 1)
        train_end_time = (2012, 12, 31)
        valid_start_time = (2013, 1, 1)
        valid_end_time = (2013, 12, 31)
        test_start_time = (2014, 1, 1)
        test_end_time = (2014, 12, 31)

        dataset_config = {
            "class": "DatasetH",
            "module_path": "qlib.data.dataset",
            "kwargs": {
                "handler": {
                    "class": "RollingDataHandler",
                    "module_path": "rolling_handler",
                    "kwargs": {
                        "start_time": self._shift(train_start_time, 0),
                        "end_time": self._shift(test_end_time, 0),
                        "fit_start_time": self._shift(train_start_time, 0),
                        "fit_end_time": self._shift(train_end_time, 0),
                        "infer_processors": [
                            {"class": "RobustZScoreNorm", "kwargs": {"fields_group": "feature"}},
                        ],
                        "learn_processors": [
                            {"class": "DropnaLabel"},
                            {"class": "CSZScoreNorm", "kwargs": {"fields_group": "label"}},
                        ],
                        "data_loader_kwargs": {
                            "handler_config": pre_handler,
                        },
                    },
                },
                "segments": {
                    "train": (self._shift(train_start_time, 0), self._shift(train_end_time, 0)),
                    "valid": (self._shift(valid_start_time, 0), self._shift(valid_end_time, 0)),
                    "test": (self._shift(test_start_time, 0), self._shift(test_end_time, 0)),
                },
            },
        }

        dataset = init_instance_by_config(dataset_config)

        for rolling_offset in range(self.rolling_cnt):

            print(f"===========rolling{rolling_offset} start===========")
            if rolling_offset:
                # Shift every boundary by `rolling_offset` years and re-fit the
                # processors sequentially on the new train window.
                dataset.init(
                    handler_kwargs={
                        "init_type": DataHandlerLP.IT_FIT_SEQ,
                        "start_time": self._shift(train_start_time, rolling_offset),
                        "end_time": self._shift(test_end_time, rolling_offset),
                        "fit_start_time": self._shift(train_start_time, rolling_offset),
                        "fit_end_time": self._shift(train_end_time, rolling_offset),
                    },
                    segment_kwargs={
                        "train": (
                            self._shift(train_start_time, rolling_offset),
                            self._shift(train_end_time, rolling_offset),
                        ),
                        "valid": (
                            self._shift(valid_start_time, rolling_offset),
                            self._shift(valid_end_time, rolling_offset),
                        ),
                        "test": (
                            self._shift(test_start_time, rolling_offset),
                            self._shift(test_end_time, rolling_offset),
                        ),
                    },
                )

            dtrain, dvalid, dtest = dataset.prepare(["train", "valid", "test"])
            print(dtrain, dvalid, dtest)
            ## print or dump data
            print(f"===========rolling{rolling_offset} end===========")


# Expose the workflow's public methods (e.g. `rolling_process`) as a CLI.
if __name__ == "__main__":
    fire.Fire(RollingDataWorkflow)
2 changes: 1 addition & 1 deletion qlib/data/dataset/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def init(self, handler_kwargs: dict = None, segment_kwargs: dict = None):
raise TypeError(f"param handler_kwargs must be type dict, not {type(handler_kwargs)}")
kwargs_init = {}
kwargs_conf_data = {}
conf_data_arg = {"instruments", "start_time", "end_time"}
conf_data_arg = {"instruments", "start_time", "end_time", "fit_start_time", "fit_end_time"}
for k, v in handler_kwargs.items():
if k in conf_data_arg:
kwargs_conf_data.update({k: v})
Expand Down
26 changes: 23 additions & 3 deletions qlib/data/dataset/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from ...config import C
from ...utils import parse_config, transform_end_date, init_instance_by_config
from ...utils.serial import Serializable
from .utils import get_level_index, fetch_df_by_index
from .utils import fetch_df_by_index
from pathlib import Path
from .loader import DataLoader

Expand Down Expand Up @@ -115,8 +115,6 @@ def conf_data(self, **kwargs):
for k, v in kwargs.items():
if k in attr_list:
setattr(self, k, v)
else:
raise KeyError("Such config is not supported.")

def init(self, enable_cache: bool = False):
"""
Expand Down Expand Up @@ -405,6 +403,28 @@ def process_data(self, with_fit: bool = False):
if self.drop_raw:
del self._data

def conf_data(self, **kwargs):
    """
    Re-configure the data-related attributes of this handler.

    This method is used when a pickled handler is reloaded by a dataset and
    must be re-initialized with a different time range.  In addition to the
    base-class behavior, it propagates new ``fit_start_time`` /
    ``fit_end_time`` values into every infer/learn processor that already
    carries that attribute, so the processors are re-fit on the new window.
    """
    # Only the fit-range keys are pushed down into processors; all other
    # kwargs are handled by the base-class conf_data below.
    attr_list = {"fit_start_time", "fit_end_time"}
    for k, v in kwargs.items():
        if k in attr_list:
            # NOTE(review): `getattr(..., None)` is a truthiness test, not an
            # existence test — a processor whose attribute is currently None
            # (or otherwise falsy) will NOT be updated. Confirm this is the
            # intended behavior; `hasattr` would differ.
            for infer_processor in self.infer_processors:
                if getattr(infer_processor, k, None):
                    setattr(infer_processor, k, v)

            for learn_processor in self.learn_processors:
                if getattr(learn_processor, k, None):
                    setattr(learn_processor, k, v)

    super().conf_data(**kwargs)

# init type
IT_FIT_SEQ = "fit_seq" # the input of `fit` will be the output of the previous processor
IT_FIT_IND = "fit_ind" # the input of `fit` will be the original df
Expand Down
61 changes: 61 additions & 0 deletions qlib/data/dataset/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,64 @@ def _maybe_load_raw_data(self):
join=self.join,
)
self._data.sort_index(inplace=True)


class DataLoaderDH(DataLoader):
    """DataLoaderDH
    DataLoader based on (D)ata (H)andler

    It is designed to load (possibly multiple) data from data handlers.
    - If you just want to load data from a single data handler, you can write
      the logic in that single data handler instead.
    """

    def __init__(self, handler_config: dict, fetch_kwargs: dict = None, is_group=False):
        """
        Parameters
        ----------
        handler_config : dict
            handler_config will be used to describe the handlers

            .. code-block::

                <handler_config> := {
                    "group_name1": <handler>
                    "group_name2": <handler>
                }
                or
                <handler_config> := <handler>
                <handler> := DataHandler Instance | DataHandler Config

        fetch_kwargs : dict
            extra arguments for the handlers' ``fetch`` method, such as
            col_set, squeeze, data_key, etc.  ``None`` (default) means no
            extra arguments.

        is_group : bool
            whether the keys of handler_config are group names.
        """
        # Local import avoids a circular dependency between loader and handler modules.
        from qlib.data.dataset.handler import DataHandler

        if is_group:
            # One handler per group; load() concatenates their columns under the group key.
            self.handlers = {
                grp: init_instance_by_config(config, accept_types=DataHandler)
                for grp, config in handler_config.items()
            }
        else:
            self.handlers = init_instance_by_config(handler_config, accept_types=DataHandler)

        self.is_group = is_group
        # Default to fetching the raw column set; caller-supplied kwargs take precedence.
        # A None sentinel is used instead of a mutable `{}` default argument.
        self.fetch_kwargs = {"col_set": DataHandler.CS_RAW}
        if fetch_kwargs:
            self.fetch_kwargs.update(fetch_kwargs)

    def load(self, instruments=None, start_time=None, end_time=None) -> pd.DataFrame:
        """Fetch data from the wrapped handler(s) for the given time range.

        ``instruments`` is ignored: the instrument universe is fixed by the
        underlying handler(s).
        """
        if instruments is not None:
            LOG.warning(f"instruments[{instruments}] is ignored")

        if self.is_group:
            # Concatenate each group's data side by side, keyed by group name.
            df = pd.concat(
                {
                    grp: dh.fetch(selector=slice(start_time, end_time), level="datetime", **self.fetch_kwargs)
                    for grp, dh in self.handlers.items()
                },
                axis=1,
            )
        else:
            df = self.handlers.fetch(selector=slice(start_time, end_time), level="datetime", **self.fetch_kwargs)
        return df