Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions examples/highfreq/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from highfreq_ops import get_calendar_day, DayLast, FFillNan, BFillNan, Date, Select, IsNull, Cut


class HighfreqWorkflow(object):
class HighfreqWorkflow:

SPEC_CONF = {"custom_ops": [DayLast, FFillNan, BFillNan, Date, Select, IsNull, Cut], "expression_cache": None}

Expand Down Expand Up @@ -145,35 +145,40 @@ def dump_and_load_dataset(self):

self._prepare_calender_cache()
##=============reinit dataset=============
dataset.init(
dataset.config(
handler_kwargs={
"init_type": DataHandlerLP.IT_LS,
"start_time": "2021-01-19 00:00:00",
"end_time": "2021-01-25 16:00:00",
},
segment_kwargs={
segments={
"test": (
"2021-01-19 00:00:00",
"2021-01-25 16:00:00",
),
},
)
dataset_backtest.init(
dataset.setup_data(
handler_kwargs={
"init_type": DataHandlerLP.IT_LS,
},
)
dataset_backtest.config(
handler_kwargs={
"start_time": "2021-01-19 00:00:00",
"end_time": "2021-01-25 16:00:00",
},
segment_kwargs={
segments={
"test": (
"2021-01-19 00:00:00",
"2021-01-25 16:00:00",
),
},
)
dataset_backtest.setup_data(handler_kwargs={})

##=============get data=============
xtest = dataset.prepare(["test"])
backtest_test = dataset_backtest.prepare(["test"])
xtest = dataset.prepare("test")
backtest_test = dataset_backtest.prepare("test")

print(xtest, backtest_test)
return
Expand Down
16 changes: 16 additions & 0 deletions examples/rolling_process_data/README.md
Original file line number Diff line number Diff line change
@@ -1 +1,17 @@
# Rolling Process Data

This workflow is an example for `Rolling Process Data`.

## Background

When rolling-training the models, data also needs to be generated for each of the different rolling windows. When the rolling window moves, the training data will change, and the processor's learnable state (such as standard deviation, mean, etc.) will also change.

In order to avoid regenerating data, this example uses the `DataHandler-based DataLoader` to load the raw features that are not related to the rolling window, and then uses Processors to generate processed features related to the rolling window.


## Run the Code

Run the example by running the following command:
```bash
python workflow.py rolling_process
```
18 changes: 12 additions & 6 deletions examples/rolling_process_data/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from qlib.tests.data import GetData


class RollingDataWorkflow(object):
class RollingDataWorkflow:

MARKET = "csi300"
start_time = "2010-01-01"
Expand Down Expand Up @@ -101,15 +101,16 @@ def rolling_process(self):

print(f"===========rolling{rolling_offset} start===========")
if rolling_offset:
dataset.init(
dataset.config(
handler_kwargs={
"init_type": DataHandlerLP.IT_FIT_SEQ,
"start_time": datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]),
"end_time": datetime(test_end_time[0] + rolling_offset, *test_end_time[1:]),
"fit_start_time": datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]),
"fit_end_time": datetime(train_end_time[0] + rolling_offset, *train_end_time[1:]),
"processor_kwargs": {
"fit_start_time": datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]),
"fit_end_time": datetime(train_end_time[0] + rolling_offset, *train_end_time[1:]),
},
},
segment_kwargs={
segments={
"train": (
datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]),
datetime(train_end_time[0] + rolling_offset, *train_end_time[1:]),
Expand All @@ -124,6 +125,11 @@ def rolling_process(self):
),
},
)
dataset.setup_data(
handler_kwargs={
"init_type": DataHandlerLP.IT_FIT_SEQ,
}
)

dtrain, dvalid, dtest = dataset.prepare(["train", "valid", "test"])
print(dtrain, dvalid, dtest)
Expand Down
119 changes: 68 additions & 51 deletions qlib/data/dataset/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from ...utils import init_instance_by_config, np_ffill
from ...log import get_module_logger
from .handler import DataHandler, DataHandlerLP
from copy import deepcopy
from inspect import getfullargspec
import pandas as pd
import numpy as np
Expand All @@ -16,38 +17,44 @@ class Dataset(Serializable):
Preparing data for model training and inferencing.
"""

def __init__(self, *args, **kwargs):
def __init__(self, **kwargs):
"""
init is designed to finish following steps:

- init the sub instance and the state of the dataset(info to prepare the data)
- The name of essential state for preparing data should not start with '_' so that it could be serialized on disk when serializing.

- setup data
- The data related attributes' names should start with '_' so that it will not be saved on disk when serializing.

- initialize the state of the dataset(info to prepare the data)
- The name of essential state for preparing data should not start with '_' so that it could be serialized on disk when serializing.

The data could specify the info to calculate the essential data for preparation
"""
self.setup_data(*args, **kwargs)
self.setup_data(**kwargs)
super().__init__()

def setup_data(self, *args, **kwargs):
def config(self, **kwargs):
"""
config is designed to configure the parameters that cannot be learned from the data
"""
super().config(**kwargs)

def setup_data(self, **kwargs):
"""
Setup the data.

We split the setup_data function for following situation:

- User has a Dataset object with learned status on disk.

- User load the Dataset object from the disk(Note the init function is skiped).
- User load the Dataset object from the disk.

- User call `setup_data` to load new data.

- User prepare data for model based on previous status.
"""
pass

def prepare(self, *args, **kwargs) -> object:
def prepare(self, **kwargs) -> object:
"""
The type of dataset depends on the model. (It could be pd.DataFrame, pytorch.DataLoader, etc.)
The parameters should specify the scope for the prepared data
Expand Down Expand Up @@ -76,44 +83,7 @@ class DatasetH(Dataset):
- The processing is related to data split.
"""

def init(self, handler_kwargs: dict = None, segment_kwargs: dict = None):
"""
Initialize the DatasetH

Parameters
----------
handler_kwargs : dict
Config of DataHandler, which could include the following arguments:

- arguments of DataHandler.conf_data, such as 'instruments', 'start_time' and 'end_time'.

- arguments of DataHandler.init, such as 'enable_cache', etc.

segment_kwargs : dict
Config of segments which is same as 'segments' in DatasetH.setup_data

"""
if handler_kwargs:
if not isinstance(handler_kwargs, dict):
raise TypeError(f"param handler_kwargs must be type dict, not {type(handler_kwargs)}")
kwargs_init = {}
kwargs_conf_data = {}
conf_data_arg = {"instruments", "start_time", "end_time", "fit_start_time", "fit_end_time"}
for k, v in handler_kwargs.items():
if k in conf_data_arg:
kwargs_conf_data.update({k: v})
else:
kwargs_init.update({k: v})

self.handler.conf_data(**kwargs_conf_data)
self.handler.init(**kwargs_init)

if segment_kwargs:
if not isinstance(segment_kwargs, dict):
raise TypeError(f"param handler_kwargs must be type dict, not {type(segment_kwargs)}")
self.segments = segment_kwargs.copy()

def setup_data(self, handler: Union[Dict, DataHandler], segments: Dict[Text, Tuple]):
def __init__(self, handler: Union[Dict, DataHandler], segments: Dict[Text, Tuple], **kwargs):
"""
Setup the underlying data.

Expand Down Expand Up @@ -144,6 +114,49 @@ def setup_data(self, handler: Union[Dict, DataHandler], segments: Dict[Text, Tup
"""
self.handler = init_instance_by_config(handler, accept_types=DataHandler)
self.segments = segments.copy()
super().__init__(**kwargs)

def config(self, handler_kwargs: dict = None, **kwargs):
"""
Initialize the DatasetH

Parameters
----------
handler_kwargs : dict
Config of DataHandler, which could include the following arguments:

- arguments of DataHandler.conf_data, such as 'instruments', 'start_time' and 'end_time'.

kwargs : dict
Config of DatasetH, such as

- segments : dict
Config of segments which is same as 'segments' in self.__init__

"""
if handler_kwargs is not None:
self.handler.config(**handler_kwargs)
if "segments" in kwargs:
self.segments = deepcopy(kwargs.pop("segments"))
super().config(**kwargs)

def setup_data(self, handler_kwargs: dict = None, **kwargs):
"""
Setup the Data

Parameters
----------
handler_kwargs : dict
init arguments of DataHandler, which could include the following arguments:

- init_type : Init Type of Handler

- enable_cache : whether to enable cache

"""
super().setup_data(**kwargs)
if handler_kwargs is not None:
self.handler.setup_data(**handler_kwargs)

def __repr__(self):
return "{name}(handler={handler}, segments={segments})".format(
Expand Down Expand Up @@ -433,15 +446,19 @@ class TSDatasetH(DatasetH):
- The dimension of a batch of data <batch_idx, feature, timestep>
"""

def __init__(self, step_len=30, *args, **kwargs):
def __init__(self, step_len=30, **kwargs):
self.step_len = step_len
super().__init__(*args, **kwargs)
super().__init__(**kwargs)

def config(self, **kwargs):
if "step_len" in kwargs:
self.step_len = kwargs.pop("step_len")
super().config(**kwargs)

def setup_data(self, *args, **kwargs):
super().setup_data(*args, **kwargs)
def setup_data(self, **kwargs):
super().setup_data(**kwargs)
cal = self.handler.fetch(col_set=self.handler.CS_RAW).index.get_level_values("datetime").unique()
cal = sorted(cal)
# Get the datatime index for building timestamp
self.cal = cal

def _prepare_seg(self, slc: slice, **kwargs) -> TSDataSampler:
Expand Down
39 changes: 19 additions & 20 deletions qlib/data/dataset/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import bisect
import logging
import warnings
from inspect import getfullargspec
from typing import Union, Tuple, List, Iterator, Optional

import pandas as pd
Expand Down Expand Up @@ -99,10 +100,10 @@ def __init__(
self.fetch_orig = fetch_orig
if init_data:
with TimeInspector.logt("Init data"):
self.init()
self.setup_data()
super().__init__()

def conf_data(self, **kwargs):
def config(self, **kwargs):
"""
configuration of data.
# what data to be loaded from data source
Expand All @@ -116,9 +117,15 @@ def conf_data(self, **kwargs):
if k in attr_list:
setattr(self, k, v)

def init(self, enable_cache: bool = False):
for attr in attr_list:
if attr in kwargs:
kwargs.pop(attr)

super().config(**kwargs)

def setup_data(self, enable_cache: bool = False):
"""
initialize the data.
Set Up the data.
In case initialization is run multiple times, it will do nothing for the second time.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will do nothing for the second time is no longer the right docs


It is responsible for maintaining following variable
Expand Down Expand Up @@ -403,7 +410,7 @@ def process_data(self, with_fit: bool = False):
if self.drop_raw:
del self._data

def conf_data(self, **kwargs):
def config(self, processor_kwargs: dict = None, **kwargs):
"""
configuration of data.
# what data to be loaded from data source
Expand All @@ -412,27 +419,19 @@ def conf_data(self, **kwargs):
The data will be initialized with different time range.

"""
attr_list = {"fit_start_time", "fit_end_time"}
for k, v in kwargs.items():
if k in attr_list:
for infer_processor in self.infer_processors:
if getattr(infer_processor, k, None):
setattr(infer_processor, k, v)

for learn_processor in self.learn_processors:
if getattr(learn_processor, k, None):
setattr(learn_processor, k, v)

super().conf_data(**kwargs)
super().config(**kwargs)
if processor_kwargs is not None:
for processor in self.get_all_processors():
processor.config(**processor_kwargs)

# init type
IT_FIT_SEQ = "fit_seq" # the input of `fit` will be the output of the previous processor
IT_FIT_IND = "fit_ind" # the input of `fit` will be the original df
IT_LS = "load_state" # The state of the object has been load by pickle

def init(self, init_type: str = IT_FIT_SEQ, enable_cache: bool = False):
def setup_data(self, init_type: str = IT_FIT_SEQ, **kwargs):
"""
Initialize the data of Qlib
Set up the data of Qlib

Parameters
----------
Expand All @@ -447,7 +446,7 @@ def init(self, init_type: str = IT_FIT_SEQ, enable_cache: bool = False):
when we call `init` next time
"""
# init raw data
super().init(enable_cache=enable_cache)
super().setup_data(**kwargs)

with TimeInspector.logt("fit & process data"):
if init_type == DataHandlerLP.IT_FIT_IND:
Expand Down
Loading