diff --git a/examples/highfreq/workflow.py b/examples/highfreq/workflow.py index 01de59c0e77..5660ab2e9e4 100644 --- a/examples/highfreq/workflow.py +++ b/examples/highfreq/workflow.py @@ -27,12 +27,11 @@ from highfreq_ops import get_calendar_day, DayLast, FFillNan, BFillNan, Date, Select, IsNull, Cut -class HighfreqWorkflow(object): +class HighfreqWorkflow: SPEC_CONF = {"custom_ops": [DayLast, FFillNan, BFillNan, Date, Select, IsNull, Cut], "expression_cache": None} MARKET = "all" - BENCHMARK = "SH000300" start_time = "2020-09-15 00:00:00" end_time = "2021-01-18 16:00:00" @@ -146,35 +145,40 @@ def dump_and_load_dataset(self): self._prepare_calender_cache() ##=============reinit dataset============= - dataset.init( + dataset.config( handler_kwargs={ - "init_type": DataHandlerLP.IT_LS, "start_time": "2021-01-19 00:00:00", "end_time": "2021-01-25 16:00:00", }, - segment_kwargs={ + segments={ "test": ( "2021-01-19 00:00:00", "2021-01-25 16:00:00", ), }, ) - dataset_backtest.init( + dataset.setup_data( + handler_kwargs={ + "init_type": DataHandlerLP.IT_LS, + }, + ) + dataset_backtest.config( handler_kwargs={ "start_time": "2021-01-19 00:00:00", "end_time": "2021-01-25 16:00:00", }, - segment_kwargs={ + segments={ "test": ( "2021-01-19 00:00:00", "2021-01-25 16:00:00", ), }, ) + dataset_backtest.setup_data(handler_kwargs={}) ##=============get data============= - xtest = dataset.prepare(["test"]) - backtest_test = dataset_backtest.prepare(["test"]) + xtest = dataset.prepare("test") + backtest_test = dataset_backtest.prepare("test") print(xtest, backtest_test) return diff --git a/examples/rolling_process_data/README.md b/examples/rolling_process_data/README.md new file mode 100644 index 00000000000..315fe2eede9 --- /dev/null +++ b/examples/rolling_process_data/README.md @@ -0,0 +1,17 @@ +# Rolling Process Data + +This workflow is an example for `Rolling Process Data`. + +## Background + +When rolling train the models, data also needs to be generated in the different rolling windows. When the rolling window moves, the training data will change, and the processor's learnable state (such as standard deviation, mean, etc.) will also change. + +In order to avoid regenerating data, this example uses the `DataHandler-based DataLoader` to load the raw features that are not related to the rolling window, and then used Processors to generate processed-features related to the rolling window. + + +## Run the Code + +Run the example by running the following command: +```bash + python workflow.py rolling_process +``` \ No newline at end of file diff --git a/examples/rolling_process_data/rolling_handler.py b/examples/rolling_process_data/rolling_handler.py new file mode 100644 index 00000000000..13b399afd87 --- /dev/null +++ b/examples/rolling_process_data/rolling_handler.py @@ -0,0 +1,32 @@ +from qlib.data.dataset.handler import DataHandlerLP +from qlib.data.dataset.loader import DataLoaderDH +from qlib.contrib.data.handler import check_transform_proc + + +class RollingDataHandler(DataHandlerLP): + def __init__( + self, + start_time=None, + end_time=None, + infer_processors=[], + learn_processors=[], + fit_start_time=None, + fit_end_time=None, + data_loader_kwargs={}, + ): + infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time) + learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time) + + data_loader = { + "class": "DataLoaderDH", + "kwargs": {**data_loader_kwargs}, + } + + super().__init__( + instruments=None, + start_time=start_time, + end_time=end_time, + data_loader=data_loader, + infer_processors=infer_processors, + learn_processors=learn_processors, + ) diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py new file mode 100644 index 00000000000..5757aaa876e --- /dev/null +++ b/examples/rolling_process_data/workflow.py @@ -0,0 +1,141 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import qlib +import fire +import pickle +import pandas as pd + +from datetime import datetime +from qlib.config import REG_CN +from qlib.data.dataset.handler import DataHandlerLP +from qlib.contrib.data.handler import Alpha158 +from qlib.utils import exists_qlib_data, init_instance_by_config +from qlib.tests.data import GetData + + +class RollingDataWorkflow: + + MARKET = "csi300" + start_time = "2010-01-01" + end_time = "2019-12-31" + rolling_cnt = 5 + + def _init_qlib(self): + """initialize qlib""" + # use yahoo_cn_1min data + provider_uri = "~/.qlib/qlib_data/cn_data" # target_dir + if not exists_qlib_data(provider_uri): + print(f"Qlib data is not found in {provider_uri}") + GetData().qlib_data(target_dir=provider_uri, region=REG_CN) + qlib.init(provider_uri=provider_uri, region=REG_CN) + + def _dump_pre_handler(self, path): + handler_config = { + "class": "Alpha158", + "module_path": "qlib.contrib.data.handler", + "kwargs": { + "start_time": self.start_time, + "end_time": self.end_time, + "instruments": self.MARKET, + "infer_processors": [], + "learn_processors": [], + }, + } + pre_handler = init_instance_by_config(handler_config) + pre_handler.config(dump_all=True) + pre_handler.to_pickle(path) + + def _load_pre_handler(self, path): + with open(path, "rb") as file_dataset: + pre_handler = pickle.load(file_dataset) + return pre_handler + + def rolling_process(self): + self._init_qlib() + self._dump_pre_handler("pre_handler.pkl") + pre_handler = self._load_pre_handler("pre_handler.pkl") + + train_start_time = (2010, 1, 1) + train_end_time = (2012, 12, 31) + valid_start_time = (2013, 1, 1) + valid_end_time = (2013, 12, 31) + test_start_time = (2014, 1, 1) + test_end_time = (2014, 12, 31) + + dataset_config = { + "class": "DatasetH", + "module_path": "qlib.data.dataset", + "kwargs": { + "handler": { + "class": "RollingDataHandler", + "module_path": "rolling_handler", + "kwargs": { + "start_time": datetime(*train_start_time), + "end_time": datetime(*test_end_time), + "fit_start_time": datetime(*train_start_time), + "fit_end_time": datetime(*train_end_time), + "infer_processors": [ + {"class": "RobustZScoreNorm", "kwargs": {"fields_group": "feature"}}, + ], + "learn_processors": [ + {"class": "DropnaLabel"}, + {"class": "CSZScoreNorm", "kwargs": {"fields_group": "label"}}, + ], + "data_loader_kwargs": { + "handler_config": pre_handler, + }, + }, + }, + "segments": { + "train": (datetime(*train_start_time), datetime(*train_end_time)), + "valid": (datetime(*valid_start_time), datetime(*valid_end_time)), + "test": (datetime(*test_start_time), datetime(*test_end_time)), + }, + }, + } + + dataset = init_instance_by_config(dataset_config) + + for rolling_offset in range(self.rolling_cnt): + + print(f"===========rolling{rolling_offset} start===========") + if rolling_offset: + dataset.config( + handler_kwargs={ + "start_time": datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]), + "end_time": datetime(test_end_time[0] + rolling_offset, *test_end_time[1:]), + "processor_kwargs": { + "fit_start_time": datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]), + "fit_end_time": datetime(train_end_time[0] + rolling_offset, *train_end_time[1:]), + }, + }, + segments={ + "train": ( + datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]), + datetime(train_end_time[0] + rolling_offset, *train_end_time[1:]), + ), + "valid": ( + datetime(valid_start_time[0] + rolling_offset, *valid_start_time[1:]), + datetime(valid_end_time[0] + rolling_offset, *valid_end_time[1:]), + ), + "test": ( + datetime(test_start_time[0] + rolling_offset, *test_start_time[1:]), + datetime(test_end_time[0] + rolling_offset, *test_end_time[1:]), + ), + }, + ) + dataset.setup_data( + handler_kwargs={ + "init_type": DataHandlerLP.IT_FIT_SEQ, + } + ) + + dtrain, dvalid, dtest = dataset.prepare(["train", "valid", "test"]) + print(dtrain, dvalid, dtest) + ## print or dump data + print(f"===========rolling{rolling_offset} end===========") + + +if __name__ == "__main__": + fire.Fire(RollingDataWorkflow) diff --git a/qlib/data/dataset/__init__.py b/qlib/data/dataset/__init__.py index 0f5d2baba0c..b3eaac7a33d 100644 --- a/qlib/data/dataset/__init__.py +++ b/qlib/data/dataset/__init__.py @@ -3,6 +3,7 @@ from ...utils import init_instance_by_config, np_ffill from ...log import get_module_logger from .handler import DataHandler, DataHandlerLP +from copy import deepcopy from inspect import getfullargspec import pandas as pd import numpy as np @@ -16,22 +17,28 @@ class Dataset(Serializable): Preparing data for model training and inferencing. """ - def __init__(self, *args, **kwargs): + def __init__(self, **kwargs): """ init is designed to finish following steps: + - init the sub instance and the state of the dataset(info to prepare the data) + - The name of essential state for preparing data should not start with '_' so that it could be serialized on disk when serializing. + - setup data - The data related attributes' names should start with '_' so that it will not be saved on disk when serializing. - - initialize the state of the dataset(info to prepare the data) - - The name of essential state for preparing data should not start with '_' so that it could be serialized on disk when serializing. - The data could specify the info to caculate the essential data for preparation """ - self.setup_data(*args, **kwargs) + self.setup_data(**kwargs) super().__init__() - def setup_data(self, *args, **kwargs): + def config(self, **kwargs): + """ + config is designed to configure and parameters that cannot be learned from the data + """ + super().config(**kwargs) + + def setup_data(self, **kwargs): """ Setup the data. @@ -39,7 +46,7 @@ def setup_data(self, *args, **kwargs): - User have a Dataset object with learned status on disk. - - User load the Dataset object from the disk(Note the init function is skiped). + - User load the Dataset object from the disk. - User call `setup_data` to load new data. @@ -47,7 +54,7 @@ def setup_data(self, *args, **kwargs): """ pass - def prepare(self, *args, **kwargs) -> object: + def prepare(self, **kwargs) -> object: """ The type of dataset depends on the model. (It could be pd.DataFrame, pytorch.DataLoader, etc.) The parameters should specify the scope for the prepared data @@ -76,44 +83,7 @@ class DatasetH(Dataset): - The processing is related to data split. """ - def init(self, handler_kwargs: dict = None, segment_kwargs: dict = None): - """ - Initialize the DatasetH - - Parameters - ---------- - handler_kwargs : dict - Config of DataHanlder, which could include the following arguments: - - - arguments of DataHandler.conf_data, such as 'instruments', 'start_time' and 'end_time'. - - - arguments of DataHandler.init, such as 'enable_cache', etc. - - segment_kwargs : dict - Config of segments which is same as 'segments' in DatasetH.setup_data - - """ - if handler_kwargs: - if not isinstance(handler_kwargs, dict): - raise TypeError(f"param handler_kwargs must be type dict, not {type(handler_kwargs)}") - kwargs_init = {} - kwargs_conf_data = {} - conf_data_arg = {"instruments", "start_time", "end_time"} - for k, v in handler_kwargs.items(): - if k in conf_data_arg: - kwargs_conf_data.update({k: v}) - else: - kwargs_init.update({k: v}) - - self.handler.conf_data(**kwargs_conf_data) - self.handler.init(**kwargs_init) - - if segment_kwargs: - if not isinstance(segment_kwargs, dict): - raise TypeError(f"param handler_kwargs must be type dict, not {type(segment_kwargs)}") - self.segments = segment_kwargs.copy() - - def setup_data(self, handler: Union[Dict, DataHandler], segments: Dict[Text, Tuple]): + def __init__(self, handler: Union[Dict, DataHandler], segments: Dict[Text, Tuple], **kwargs): """ Setup the underlying data. @@ -144,6 +114,49 @@ def setup_data(self, handler: Union[Dict, DataHandler], segments: Dict[Text, Tup """ self.handler = init_instance_by_config(handler, accept_types=DataHandler) self.segments = segments.copy() + super().__init__(**kwargs) + + def config(self, handler_kwargs: dict = None, **kwargs): + """ + Initialize the DatasetH + + Parameters + ---------- + handler_kwargs : dict + Config of DataHanlder, which could include the following arguments: + + - arguments of DataHandler.conf_data, such as 'instruments', 'start_time' and 'end_time'. + + kwargs : dict + Config of DatasetH, such as + + - segments : dict + Config of segments which is same as 'segments' in self.__init__ + + """ + if handler_kwargs is not None: + self.handler.config(**handler_kwargs) + if "segments" in kwargs: + self.segments = deepcopy(kwargs.pop("segments")) + super().config(**kwargs) + + def setup_data(self, handler_kwargs: dict = None, **kwargs): + """ + Setup the Data + + Parameters + ---------- + handler_kwargs : dict + init arguments of DataHanlder, which could include the following arguments: + + - init_type : Init Type of Handler + + - enable_cache : wheter to enable cache + + """ + super().setup_data(**kwargs) + if handler_kwargs is not None: + self.handler.setup_data(**handler_kwargs) def __repr__(self): return "{name}(handler={handler}, segments={segments})".format( @@ -433,15 +446,19 @@ class TSDatasetH(DatasetH): - The dimension of a batch of data """ - def __init__(self, step_len=30, *args, **kwargs): + def __init__(self, step_len=30, **kwargs): self.step_len = step_len - super().__init__(*args, **kwargs) + super().__init__(**kwargs) + + def config(self, **kwargs): + if "step_len" in kwargs: + self.step_len = kwargs.pop("step_len") + super().config(**kwargs) - def setup_data(self, *args, **kwargs): - super().setup_data(*args, **kwargs) + def setup_data(self, **kwargs): + super().setup_data(**kwargs) cal = self.handler.fetch(col_set=self.handler.CS_RAW).index.get_level_values("datetime").unique() cal = sorted(cal) - # Get the datatime index for building timestamp self.cal = cal def _prepare_seg(self, slc: slice, **kwargs) -> TSDataSampler: diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py index 050043ba607..201d2459dbf 100644 --- a/qlib/data/dataset/handler.py +++ b/qlib/data/dataset/handler.py @@ -6,6 +6,7 @@ import bisect import logging import warnings +from inspect import getfullargspec from typing import Union, Tuple, List, Iterator, Optional import pandas as pd @@ -16,7 +17,7 @@ from ...config import C from ...utils import parse_config, transform_end_date, init_instance_by_config from ...utils.serial import Serializable -from .utils import get_level_index, fetch_df_by_index +from .utils import fetch_df_by_index from pathlib import Path from .loader import DataLoader @@ -99,10 +100,10 @@ def __init__( self.fetch_orig = fetch_orig if init_data: with TimeInspector.logt("Init data"): - self.init() + self.setup_data() super().__init__() - def conf_data(self, **kwargs): + def config(self, **kwargs): """ configuration of data. # what data to be loaded from data source @@ -115,13 +116,16 @@ def conf_data(self, **kwargs): for k, v in kwargs.items(): if k in attr_list: setattr(self, k, v) - else: - raise KeyError("Such config is not supported.") - def init(self, enable_cache: bool = False): + for attr in attr_list: + if attr in kwargs: + kwargs.pop(attr) + + super().config(**kwargs) + + def setup_data(self, enable_cache: bool = False): """ - initialize the data. - In case of running intialization for multiple time, it will do nothing for the second time. + Set Up the data in case of running intialization for multiple time It is responsible for maintaining following variable 1) self._data @@ -405,14 +409,28 @@ def process_data(self, with_fit: bool = False): if self.drop_raw: del self._data + def config(self, processor_kwargs: dict = None, **kwargs): + """ + configuration of data. + # what data to be loaded from data source + + This method will be used when loading pickled handler from dataset. + The data will be initialized with different time range. + + """ + super().config(**kwargs) + if processor_kwargs is not None: + for processor in self.get_all_processors(): + processor.config(**processor_kwargs) + # init type IT_FIT_SEQ = "fit_seq" # the input of `fit` will be the output of the previous processor IT_FIT_IND = "fit_ind" # the input of `fit` will be the original df IT_LS = "load_state" # The state of the object has been load by pickle - def init(self, init_type: str = IT_FIT_SEQ, enable_cache: bool = False): + def setup_data(self, init_type: str = IT_FIT_SEQ, **kwargs): """ - Initialize the data of Qlib + Set up the data in case of running intialization for multiple time Parameters ---------- @@ -427,7 +445,7 @@ def init(self, init_type: str = IT_FIT_SEQ, enable_cache: bool = False): when we call `init` next time """ # init raw data - super().init(enable_cache=enable_cache) + super().setup_data(**kwargs) with TimeInspector.logt("fit & process data"): if init_type == DataHandlerLP.IT_FIT_IND: diff --git a/qlib/data/dataset/loader.py b/qlib/data/dataset/loader.py index 921bf01c57e..58aca1d4f71 100644 --- a/qlib/data/dataset/loader.py +++ b/qlib/data/dataset/loader.py @@ -217,3 +217,64 @@ def _maybe_load_raw_data(self): join=self.join, ) self._data.sort_index(inplace=True) + + +class DataLoaderDH(DataLoader): + """DataLoaderDH + DataLoader based on (D)ata (H)andler + It is designed to load multiple data from data handler + - If you just want to load data from single datahandler, you can write them in single data handler + """ + + def __init__(self, handler_config: dict, fetch_kwargs: dict = {}, is_group=False): + """ + Parameters + ---------- + handler_config : dict + handler_config will be used to describe the handlers + + .. code-block:: + + := { + "group_name1": + "group_name2": + } + or + := + := DataHandler Instance | DataHandler Config + + fetch_kwargs : dict + fetch_kwargs will be used to describe the different arguments of fetch method, such as col_set, squeeze, data_key, etc. + + is_group: bool + is_group will be used to describe whether the key of handler_config is group + + """ + from qlib.data.dataset.handler import DataHandler + + if is_group: + self.handlers = { + grp: init_instance_by_config(config, accept_types=DataHandler) for grp, config in handler_config.items() + } + else: + self.handlers = init_instance_by_config(handler_config, accept_types=DataHandler) + + self.is_group = is_group + self.fetch_kwargs = {"col_set": DataHandler.CS_RAW} + self.fetch_kwargs.update(fetch_kwargs) + + def load(self, instruments=None, start_time=None, end_time=None) -> pd.DataFrame: + if instruments is not None: + LOG.warning(f"instruments[{instruments}] is ignored") + + if self.is_group: + df = pd.concat( + { + grp: dh.fetch(selector=slice(start_time, end_time), level="datetime", **self.fetch_kwargs) + for grp, dh in self.handlers.items() + }, + axis=1, + ) + else: + df = self.handlers.fetch(selector=slice(start_time, end_time), level="datetime", **self.fetch_kwargs) + return df diff --git a/qlib/data/dataset/processor.py b/qlib/data/dataset/processor.py index 5a06f66beff..e035f562423 100755 --- a/qlib/data/dataset/processor.py +++ b/qlib/data/dataset/processor.py @@ -72,6 +72,17 @@ def is_for_infer(self) -> bool: """ return True + def config(self, **kwargs): + attr_list = {"fit_start_time", "fit_end_time"} + for k, v in kwargs.items(): + if k in attr_list and hasattr(self, k): + setattr(self, k, v) + + for attr in attr_list: + if attr in kwargs: + kwargs.pop(attr) + super().config(**kwargs) + class DropnaProcessor(Processor): def __init__(self, fields_group=None):