From 1ca3c6a61c11cff9adf79b1657af555cf68a365a Mon Sep 17 00:00:00 2001
From: bxdd
Date: Thu, 25 Mar 2021 01:29:59 +0800
Subject: [PATCH 01/23] add DataHandlerDL

---
 qlib/data/dataset/loader.py | 58 +++++++++++++++++++++++++++++++++++++
 1 file changed, 58 insertions(+)

diff --git a/qlib/data/dataset/loader.py b/qlib/data/dataset/loader.py
index 921bf01c57e..faabe2c021c 100644
--- a/qlib/data/dataset/loader.py
+++ b/qlib/data/dataset/loader.py
@@ -217,3 +217,61 @@ def _maybe_load_raw_data(self):
             join=self.join,
         )
         self._data.sort_index(inplace=True)
+
+class DataHandlerDL(DataLoader):
+    '''DataHandlerDL
+    DataHandler-based (D)ata (L)oader
+    It is designed to load multiple data from data handler
+    - If you just want to load data from single datahandler, you can write them in single data handler
+    '''
+
+    def __init__(self, handler_config:dict, fetch_config:dict = {}, is_group=False):
+        """
+        Parameters
+        ----------
+        handler_config : dict
+            handler_config will be used to describe the handlers
+
+            .. code-block::
+
+                <handler_config> := {
+                    "group_name1": <handler>
+                    "group_name2": <handler>
+                }
+                or
+                <handler_config> := <handler>
+                <handler> := DataHandler Instance | DataHandler Config
+
+        fetch_config : dict
+            fetch_config will be used to describe the different arguments of fetch method, such as squeeze, data_key, etc.
+
+        is_group: bool
+            is_group will be used to describe whether the key of handler_config is group
+
+        """
+        if self.is_group:
+            self.handlers = {
+                grp: init_instance_by_config(config, accept_types=DataHandler)
+                for grp, config in handler_config.items()
+            }
+        else:
+            self.handlers = init_instance_by_config(handler_config, accept_types=DataHandler)
+
+        self.is_group = is_group
+        self.fetch_config = fetch_config
+
+    def load(self, instruments=None, start_time=None, end_time=None) -> pd.DataFrame:
+        if instruments is not None:
+            LOG.warning(f"instruments[{instruments}] is ignored")
+
+        if self.is_group:
+            df = pd.concat(
+                {
+                    grp: dh.fetch(slice(start_time, end_time), col_set=DataHandler.CS_RAW, **fetch_config)
+                    for grp, dh in self.handlers.items()
+                },
+                axis=1,
+            )
+        else:
+            df = self.handler.fetch(slice(start_time, end_time), col_set=DataHandler.CS_RAW, **fetch_config)
+        return df

From b1a28358adb9b9e15abd09fe59f7ff4544e399ed Mon Sep 17 00:00:00 2001
From: bxdd
Date: Thu, 25 Mar 2021 01:30:31 +0800
Subject: [PATCH 02/23] black format

---
 qlib/data/dataset/loader.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/qlib/data/dataset/loader.py b/qlib/data/dataset/loader.py
index faabe2c021c..884d15635a1 100644
--- a/qlib/data/dataset/loader.py
+++ b/qlib/data/dataset/loader.py
@@ -218,14 +218,15 @@ def _maybe_load_raw_data(self):
         )
         self._data.sort_index(inplace=True)
 
+
 class DataHandlerDL(DataLoader):
-    '''DataHandlerDL
+    """DataHandlerDL
     DataHandler-based (D)ata (L)oader
     It is designed to load multiple data from data handler
     - If you just want to load data from single datahandler, you can write them in single data handler
-    '''
+    """
 
-    def __init__(self, handler_config:dict, fetch_config:dict = {}, is_group=False):
+    def __init__(self, handler_config: dict, fetch_config: dict = {}, is_group=False):
         """
         Parameters
         ----------
@@ -251,12 +252,11 @@ def __init__(self, handler_config:dict, fetch_config:dict = {}, is_group=False):
         """
         if self.is_group:
             self.handlers = {
-                grp: init_instance_by_config(config, accept_types=DataHandler)
-                for grp, config in handler_config.items()
+                grp: init_instance_by_config(config, accept_types=DataHandler) for grp, config in handler_config.items()
             }
         else:
-            self.handlers = init_instance_by_config(handler_config, accept_types=DataHandler)
-
+            self.handlers = init_instance_by_config(handler_config, accept_types=DataHandler)
+
         self.is_group = is_group
         self.fetch_config = fetch_config

From 1fcfe8e4ba6e655ba59ae95180c491ea3fe85c8e Mon Sep 17 00:00:00 2001
From: bxdd
Date: Thu, 25 Mar 2021 01:37:17 +0800
Subject: [PATCH 03/23] add rolling process data

---
 examples/rolling_process_data/README.md   | 2 ++
 examples/rolling_process_data/workflow.py | 0
 2 files changed, 2 insertions(+)
 create mode 100644 examples/rolling_process_data/README.md
 create mode 100644 examples/rolling_process_data/workflow.py

diff --git a/examples/rolling_process_data/README.md b/examples/rolling_process_data/README.md
new file mode 100644
index 00000000000..3f1c8768d5e
--- /dev/null
+++ b/examples/rolling_process_data/README.md
@@ -0,0 +1,2 @@
+# Rolling Process Data
+
diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py
new file mode 100644
index 00000000000..e69de29bb2d

From f6dc25b22982d5e80b4cd2f9c2fc823ed98d244b Mon Sep 17 00:00:00 2001
From: bxdd
Date: Thu, 25 Mar 2021 16:14:22 +0800
Subject: [PATCH 04/23] update rolling process

---
 examples/highfreq/workflow.py                 |   1 -
 .../rolling_process_data/rolling_handler.py   |  34 ++++
 examples/rolling_process_data/workflow.py     | 145 ++++++++++++++++++
 qlib/data/dataset/handler.py                  |   2 +-
 qlib/data/dataset/loader.py                   |  21 +--
 5 files changed, 192 insertions(+), 11 deletions(-)
 create mode 100644 examples/rolling_process_data/rolling_handler.py

diff --git a/examples/highfreq/workflow.py b/examples/highfreq/workflow.py
index 01de59c0e77..c2ca36db344 100644
--- a/examples/highfreq/workflow.py
+++ b/examples/highfreq/workflow.py
@@ -32,7 +32,6 @@ class HighfreqWorkflow(object):
     SPEC_CONF = {"custom_ops": [DayLast, FFillNan, BFillNan, Date, Select, IsNull, Cut], "expression_cache": None}
 
     MARKET = "all"
-    BENCHMARK = "SH000300"
 
     start_time = "2020-09-15 00:00:00"
     end_time = "2021-01-18 16:00:00"
diff --git a/examples/rolling_process_data/rolling_handler.py b/examples/rolling_process_data/rolling_handler.py
new file mode 100644
index 00000000000..50a36f21973
--- /dev/null
+++ b/examples/rolling_process_data/rolling_handler.py
@@ -0,0 +1,34 @@
+from qlib.data.dataset.handler import DataHandlerLP
+from qlib.data.dataset.loader import DataLoaderDH
+from qlib.contrib.data.handler import check_transform_proc
+
+
+class RollingDataHandler(DataHandlerLP):
+    def __init__(
+        self,
+        start_time=None,
+        end_time=None,
+        infer_processors=[],
+        learn_processors=[],
+        fit_start_time=None,
+        fit_end_time=None,
+        data_loader_kwargs={}
+    ):
+        infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time)
+        learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time)
+
+        data_loader = {
+            "class": "DataLoaderDH",
+            "kwargs": {
+                **data_loader_kwargs
+            },
+        }
+
+        super().__init__(
+            instruments=None,
+            start_time=start_time,
+            end_time=end_time,
+            data_loader=data_loader,
+            infer_processors=infer_processors,
+            learn_processors=learn_processors,
+        )
diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py
index e69de29bb2d..8581f149b02 100644
--- a/examples/rolling_process_data/workflow.py
+++ b/examples/rolling_process_data/workflow.py
@@ -0,0 +1,145 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+ +import qlib +import pickle +import datetime +import pandas as pd +from qlib.config import REG_CN +from qlib.data.dataset.handler import DataHandlerLP +from qlib.contrib.data.handler import Alpha158 +from qlib.utils import exists_qlib_data, init_instance_by_config +from qlib.tests.data import GetData + +class RollingDataWorkflow(object): + + MARKET = "csi300" + + start_time = "2010-01-01" + end_time = "2019-12-31" + rolling_cnt = 5 + + def _init_qlib(self): + """initialize qlib""" + # use yahoo_cn_1min data + provider_uri = "~/.qlib/qlib_data/cn_data" # target_dir + if not exists_qlib_data(provider_uri): + print(f"Qlib data is not found in {provider_uri}") + GetData().qlib_data(target_dir=provider_uri, region=REG_CN) + qlib.init(provider_uri=provider_uri, region=REG_CN) + + def _dump_pre_handler(self, path): + handler_config = { + "class": "Alpha158", + "module_path": "qlib.contrib.data.handler", + "kwargs": { + "start_time": start_time, + "end_time": end_time, + "instruments": MARKET, + }, + } + pre_handler = init_instance_by_config(handler_config) + pre_handler.to_pickle(path) + + def _load_pre_handler(self, path): + with open(path, "rb") as file_dataset: + pre_handler = pickle.load(file_dataset) + return pre_handler + + def rolling_process(self): + self._init_qlib() + self._dump_pre_handler("pre_handler.py") + pre_handler = self._load_pre_handler("pre_handler.py") + + init_start_time = datetime.datetime(2010,1,1) + init_end_time = datetime.datetime(2014,12,31) + init_fit_end_time = datetime.datetime(2012,12,31) + + dataset_config = { + "class": "DatasetH", + "module_path": "qlib.data.dataset", + "kwargs": { + "handler": { + "class": "RollingDataHandler", + "module_path": "rolling_handler", + "kwargs": { + "start_time": init_start_time, + "end_time": init_start_time, + "fit_start_time": init_fit_start_time, + "fit_end_time": init_fit_end_time, + "data_loader_kwargs":{ + "handler_config": pre_handler, + } + }, + }, + "segments": { + "train": (init_start_time, init_fit_end_time), + "valid": (init_start_time, "2013-12-31"), + "test": (init_start_time, init_end_time), + }, + }, + } + + dataset = init_instance_by_config(dataset_config) + + for rolling_offset in range(rolling_cnt): + if rolling_offset: + dataset.init( + handler_kwargs={ + "init_type": DataHandlerLP.IT_FIT_IND, + "start_time": "2021-01-19 00:00:00", + "end_time": "2021-01-25 16:00:00", + }, + segment_kwargs={ + "train": ("2010-01-01", "2012-12-31"), + "valid": ("2013-01-01", "2013-12-31"), + "test": ("2014-01-01", "2014-12-31"), + }, + ) + + +if __name__ == "__main__": + + # use default data + provider_uri = "~/.qlib/qlib_data/cn_data" # target_dir + if not exists_qlib_data(provider_uri): + print(f"Qlib data is not found in {provider_uri}") + GetData().qlib_data(target_dir=provider_uri, region=REG_CN) + + qlib.init(provider_uri=provider_uri, region=REG_CN) + + market = "csi300" + benchmark = "SH000300" + + ################################### + # train model + ################################### + data_handler_config = { + "start_time": "2008-01-01", + "end_time": "2020-08-01", + "fit_start_time": "2008-01-01", + "fit_end_time": "2014-12-31", + "instruments": market, + } + + task = { + "dataset": { + "class": "DatasetH", + "module_path": "qlib.data.dataset", + "kwargs": { + "handler": { + "class": "Alpha158", + "module_path": "qlib.contrib.data.handler", + "kwargs": data_handler_config, + }, + "segments": { + "train": ("2008-01-01", "2014-12-31"), + "valid": ("2015-01-01", "2016-12-31"), + "test": ("2017-01-01", "2020-08-01"), 
+ }, + }, + }, + } + + dataset = init_instance_by_config(task["dataset"]) + diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py index 050043ba607..f4795c5667d 100644 --- a/qlib/data/dataset/handler.py +++ b/qlib/data/dataset/handler.py @@ -16,7 +16,7 @@ from ...config import C from ...utils import parse_config, transform_end_date, init_instance_by_config from ...utils.serial import Serializable -from .utils import get_level_index, fetch_df_by_index +from .utils import fetch_df_by_index from pathlib import Path from .loader import DataLoader diff --git a/qlib/data/dataset/loader.py b/qlib/data/dataset/loader.py index 884d15635a1..f88aaf05e8e 100644 --- a/qlib/data/dataset/loader.py +++ b/qlib/data/dataset/loader.py @@ -219,14 +219,14 @@ def _maybe_load_raw_data(self): self._data.sort_index(inplace=True) -class DataHandlerDL(DataLoader): - """DataHandlerDL - DataHandler-based (D)ata (L)oader +class DataLoaderDH(DataLoader): + """DataLoaderDH + DataLoader based on (D)ata (H)andler It is designed to load multiple data from data handler - If you just want to load data from single datahandler, you can write them in single data handler """ - def __init__(self, handler_config: dict, fetch_config: dict = {}, is_group=False): + def __init__(self, handler_config: dict, fetch_kwargs: dict = {}, is_group=False): """ Parameters ---------- @@ -243,8 +243,8 @@ def __init__(self, handler_config: dict, fetch_config: dict = {}, is_group=False := := DataHandler Instance | DataHandler Config - fetch_config : dict - fetch_config will be used to describe the different arguments of fetch method, such as squeeze, data_key, etc. + fetch_kwargs : dict + fetch_kwargs will be used to describe the different arguments of fetch method, such as col_set, squeeze, data_key, etc. 
is_group: bool is_group will be used to describe whether the key of handler_config is group @@ -258,7 +258,10 @@ def __init__(self, handler_config: dict, fetch_config: dict = {}, is_group=False self.handlers = init_instance_by_config(handler_config, accept_types=DataHandler) self.is_group = is_group - self.fetch_config = fetch_config + self.fetch_kwargs = { + "col_set":DataHandler.CS_RAW + } + self.fetch_kwargs = {**self.fetch_kwargs, **fetch_kwargs} def load(self, instruments=None, start_time=None, end_time=None) -> pd.DataFrame: if instruments is not None: @@ -267,11 +270,11 @@ def load(self, instruments=None, start_time=None, end_time=None) -> pd.DataFrame if self.is_group: df = pd.concat( { - grp: dh.fetch(slice(start_time, end_time), col_set=DataHandler.CS_RAW, **fetch_config) + grp: dh.fetch(selector=slice(start_time, end_time), level="datetime", **self.fetch_kwargs) for grp, dh in self.handlers.items() }, axis=1, ) else: - df = self.handler.fetch(slice(start_time, end_time), col_set=DataHandler.CS_RAW, **fetch_config) + df = self.handler.fetch(selector=slice(start_time, end_time), level="datetime", **self.fetch_kwargs) return df From 4ec300787efc87900db522145f43e20d52402bc1 Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 25 Mar 2021 19:54:52 +0800 Subject: [PATCH 05/23] update rolling workflow --- examples/rolling_process_data/workflow.py | 49 +++++++++++++---------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py index 8581f149b02..62523aefd74 100644 --- a/examples/rolling_process_data/workflow.py +++ b/examples/rolling_process_data/workflow.py @@ -3,8 +3,9 @@ import qlib import pickle -import datetime import pandas as pd + +from datetime import datetime from qlib.config import REG_CN from qlib.data.dataset.handler import DataHandlerLP from qlib.contrib.data.handler import Alpha158 @@ -14,7 +15,6 @@ class RollingDataWorkflow(object): MARKET = "csi300" - start_time = "2010-01-01" end_time = "2019-12-31" rolling_cnt = 5 @@ -33,9 +33,9 @@ def _dump_pre_handler(self, path): "class": "Alpha158", "module_path": "qlib.contrib.data.handler", "kwargs": { - "start_time": start_time, - "end_time": end_time, - "instruments": MARKET, + "start_time": self.start_time, + "end_time": self.end_time, + "instruments": self.MARKET, }, } pre_handler = init_instance_by_config(handler_config) @@ -51,10 +51,13 @@ def rolling_process(self): self._dump_pre_handler("pre_handler.py") pre_handler = self._load_pre_handler("pre_handler.py") - init_start_time = datetime.datetime(2010,1,1) - init_end_time = datetime.datetime(2014,12,31) - init_fit_end_time = datetime.datetime(2012,12,31) - + train_start_time = (2010,1,1) + train_end_time = (2012,12,31) + valid_start_time = (2013,1,1) + valid_end_time = (2013,12,31) + test_start_time = (2014,1,1) + test_end_time = (2014,12,31) + dataset_config = { "class": "DatasetH", "module_path": "qlib.data.dataset", @@ -63,19 +66,19 @@ def rolling_process(self): "class": "RollingDataHandler", "module_path": "rolling_handler", "kwargs": { - "start_time": init_start_time, - "end_time": init_start_time, - "fit_start_time": init_fit_start_time, - "fit_end_time": init_fit_end_time, + "start_time": datetime(*train_start_time), + "end_time": datetime(*test_end_time), + "fit_start_time": datetime(*train_start_time), + "fit_end_time": datetime(*train_end_time), "data_loader_kwargs":{ "handler_config": pre_handler, } }, }, "segments": { - "train": (init_start_time, init_fit_end_time), - 
"valid": (init_start_time, "2013-12-31"), - "test": (init_start_time, init_end_time), + "train": (datetime(*train_start_time), datetime(*train_end_time)), + "valid": (datetime(*valid_start_time), datetime(*valid_end_time)), + "test": (datetime(*test_start_time), datetime(*test_end_time)), }, }, } @@ -86,17 +89,19 @@ def rolling_process(self): if rolling_offset: dataset.init( handler_kwargs={ - "init_type": DataHandlerLP.IT_FIT_IND, - "start_time": "2021-01-19 00:00:00", - "end_time": "2021-01-25 16:00:00", + "init_type": DataHandlerLP.IT_FIT_SEQ, + "start_time": datetime(train_start_time[0] + 1, *train_start_time[1:]), + "end_time": datetime(test_end_time[0] + 1, *test_end_time[1:]), }, segment_kwargs={ - "train": ("2010-01-01", "2012-12-31"), - "valid": ("2013-01-01", "2013-12-31"), - "test": ("2014-01-01", "2014-12-31"), + "train": (datetime(train_start_time[0] + 1, *train_start_time[1:]), datetime(train_end_time[0], *train_end_time[1:])), + "valid": (datetime(valid_start_time[0] + 1, *valid_start_time[1:]), datetime(valid_end_time[0], *valid_end_time[1:])), + "test": (datetime(test_start_time[0] + 1, *test_start_time[1:]), datetime(test_end_time[0], *test_end_time[1:])), }, ) + dtrain, dvalid, dtest = dataset.prepare(["train", "valid", "test"]) + if __name__ == "__main__": From efe134e9f4f5445055f9c1cd30576bf5f6b42217 Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 25 Mar 2021 19:56:04 +0800 Subject: [PATCH 06/23] update workflow --- examples/rolling_process_data/rolling_handler.py | 8 +++----- examples/rolling_process_data/workflow.py | 2 +- qlib/data/dataset/loader.py | 4 +--- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/examples/rolling_process_data/rolling_handler.py b/examples/rolling_process_data/rolling_handler.py index 50a36f21973..13b399afd87 100644 --- a/examples/rolling_process_data/rolling_handler.py +++ b/examples/rolling_process_data/rolling_handler.py @@ -12,17 +12,15 @@ def __init__( learn_processors=[], fit_start_time=None, fit_end_time=None, - data_loader_kwargs={} + data_loader_kwargs={}, ): infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time) learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time) data_loader = { "class": "DataLoaderDH", - "kwargs": { - **data_loader_kwargs - }, - } + "kwargs": {**data_loader_kwargs}, + } super().__init__( instruments=None, diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py index 62523aefd74..9b61af47e58 100644 --- a/examples/rolling_process_data/workflow.py +++ b/examples/rolling_process_data/workflow.py @@ -101,7 +101,7 @@ def rolling_process(self): ) dtrain, dvalid, dtest = dataset.prepare(["train", "valid", "test"]) - + if __name__ == "__main__": diff --git a/qlib/data/dataset/loader.py b/qlib/data/dataset/loader.py index f88aaf05e8e..539b930ec3f 100644 --- a/qlib/data/dataset/loader.py +++ b/qlib/data/dataset/loader.py @@ -258,9 +258,7 @@ def __init__(self, handler_config: dict, fetch_kwargs: dict = {}, is_group=False self.handlers = init_instance_by_config(handler_config, accept_types=DataHandler) self.is_group = is_group - self.fetch_kwargs = { - "col_set":DataHandler.CS_RAW - } + self.fetch_kwargs = {"col_set": DataHandler.CS_RAW} self.fetch_kwargs = {**self.fetch_kwargs, **fetch_kwargs} def load(self, instruments=None, start_time=None, end_time=None) -> pd.DataFrame: From a04c6bd6c941027d1beab07d65be8712d41e2406 Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 25 Mar 2021 19:56:22 +0800 Subject: 
[PATCH 07/23] balck format --- examples/rolling_process_data/workflow.py | 43 ++++++++++++++--------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py index 9b61af47e58..9dd4285da94 100644 --- a/examples/rolling_process_data/workflow.py +++ b/examples/rolling_process_data/workflow.py @@ -12,11 +12,12 @@ from qlib.utils import exists_qlib_data, init_instance_by_config from qlib.tests.data import GetData + class RollingDataWorkflow(object): MARKET = "csi300" start_time = "2010-01-01" - end_time = "2019-12-31" + end_time = "2019-12-31" rolling_cnt = 5 def _init_qlib(self): @@ -27,7 +28,7 @@ def _init_qlib(self): print(f"Qlib data is not found in {provider_uri}") GetData().qlib_data(target_dir=provider_uri, region=REG_CN) qlib.init(provider_uri=provider_uri, region=REG_CN) - + def _dump_pre_handler(self, path): handler_config = { "class": "Alpha158", @@ -51,13 +52,13 @@ def rolling_process(self): self._dump_pre_handler("pre_handler.py") pre_handler = self._load_pre_handler("pre_handler.py") - train_start_time = (2010,1,1) - train_end_time = (2012,12,31) - valid_start_time = (2013,1,1) - valid_end_time = (2013,12,31) - test_start_time = (2014,1,1) - test_end_time = (2014,12,31) - + train_start_time = (2010, 1, 1) + train_end_time = (2012, 12, 31) + valid_start_time = (2013, 1, 1) + valid_end_time = (2013, 12, 31) + test_start_time = (2014, 1, 1) + test_end_time = (2014, 12, 31) + dataset_config = { "class": "DatasetH", "module_path": "qlib.data.dataset", @@ -70,9 +71,9 @@ def rolling_process(self): "end_time": datetime(*test_end_time), "fit_start_time": datetime(*train_start_time), "fit_end_time": datetime(*train_end_time), - "data_loader_kwargs":{ + "data_loader_kwargs": { "handler_config": pre_handler, - } + }, }, }, "segments": { @@ -94,14 +95,23 @@ def rolling_process(self): "end_time": datetime(test_end_time[0] + 1, *test_end_time[1:]), }, segment_kwargs={ - "train": (datetime(train_start_time[0] + 1, *train_start_time[1:]), datetime(train_end_time[0], *train_end_time[1:])), - "valid": (datetime(valid_start_time[0] + 1, *valid_start_time[1:]), datetime(valid_end_time[0], *valid_end_time[1:])), - "test": (datetime(test_start_time[0] + 1, *test_start_time[1:]), datetime(test_end_time[0], *test_end_time[1:])), + "train": ( + datetime(train_start_time[0] + 1, *train_start_time[1:]), + datetime(train_end_time[0], *train_end_time[1:]), + ), + "valid": ( + datetime(valid_start_time[0] + 1, *valid_start_time[1:]), + datetime(valid_end_time[0], *valid_end_time[1:]), + ), + "test": ( + datetime(test_start_time[0] + 1, *test_start_time[1:]), + datetime(test_end_time[0], *test_end_time[1:]), + ), }, ) - dtrain, dvalid, dtest = dataset.prepare(["train", "valid", "test"]) - + dtrain, dvalid, dtest = dataset.prepare(["train", "valid", "test"]) + if __name__ == "__main__": @@ -147,4 +157,3 @@ def rolling_process(self): } dataset = init_instance_by_config(task["dataset"]) - From 68246b3b6d7037f3134ceb6e59aef869e96f1d8f Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 25 Mar 2021 19:58:55 +0800 Subject: [PATCH 08/23] update workflow --- examples/rolling_process_data/workflow.py | 87 +++++------------------ 1 file changed, 18 insertions(+), 69 deletions(-) diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py index 9dd4285da94..2f48662bd65 100644 --- a/examples/rolling_process_data/workflow.py +++ b/examples/rolling_process_data/workflow.py @@ -2,6 +2,7 @@ # Licensed 
under the MIT License. import qlib +import fire import pickle import pandas as pd @@ -12,12 +13,11 @@ from qlib.utils import exists_qlib_data, init_instance_by_config from qlib.tests.data import GetData - class RollingDataWorkflow(object): MARKET = "csi300" start_time = "2010-01-01" - end_time = "2019-12-31" + end_time = "2019-12-31" rolling_cnt = 5 def _init_qlib(self): @@ -28,7 +28,7 @@ def _init_qlib(self): print(f"Qlib data is not found in {provider_uri}") GetData().qlib_data(target_dir=provider_uri, region=REG_CN) qlib.init(provider_uri=provider_uri, region=REG_CN) - + def _dump_pre_handler(self, path): handler_config = { "class": "Alpha158", @@ -52,13 +52,13 @@ def rolling_process(self): self._dump_pre_handler("pre_handler.py") pre_handler = self._load_pre_handler("pre_handler.py") - train_start_time = (2010, 1, 1) - train_end_time = (2012, 12, 31) - valid_start_time = (2013, 1, 1) - valid_end_time = (2013, 12, 31) - test_start_time = (2014, 1, 1) - test_end_time = (2014, 12, 31) - + train_start_time = (2010,1,1) + train_end_time = (2012,12,31) + valid_start_time = (2013,1,1) + valid_end_time = (2013,12,31) + test_start_time = (2014,1,1) + test_end_time = (2014,12,31) + dataset_config = { "class": "DatasetH", "module_path": "qlib.data.dataset", @@ -71,9 +71,9 @@ def rolling_process(self): "end_time": datetime(*test_end_time), "fit_start_time": datetime(*train_start_time), "fit_end_time": datetime(*train_end_time), - "data_loader_kwargs": { + "data_loader_kwargs":{ "handler_config": pre_handler, - }, + } }, }, "segments": { @@ -95,65 +95,14 @@ def rolling_process(self): "end_time": datetime(test_end_time[0] + 1, *test_end_time[1:]), }, segment_kwargs={ - "train": ( - datetime(train_start_time[0] + 1, *train_start_time[1:]), - datetime(train_end_time[0], *train_end_time[1:]), - ), - "valid": ( - datetime(valid_start_time[0] + 1, *valid_start_time[1:]), - datetime(valid_end_time[0], *valid_end_time[1:]), - ), - "test": ( - datetime(test_start_time[0] + 1, *test_start_time[1:]), - datetime(test_end_time[0], *test_end_time[1:]), - ), + "train": (datetime(train_start_time[0] + 1, *train_start_time[1:]), datetime(train_end_time[0], *train_end_time[1:])), + "valid": (datetime(valid_start_time[0] + 1, *valid_start_time[1:]), datetime(valid_end_time[0], *valid_end_time[1:])), + "test": (datetime(test_start_time[0] + 1, *test_start_time[1:]), datetime(test_end_time[0], *test_end_time[1:])), }, ) - dtrain, dvalid, dtest = dataset.prepare(["train", "valid", "test"]) - + dtrain, dvalid, dtest = dataset.prepare(["train", "valid", "test"]) + if __name__ == "__main__": - - # use default data - provider_uri = "~/.qlib/qlib_data/cn_data" # target_dir - if not exists_qlib_data(provider_uri): - print(f"Qlib data is not found in {provider_uri}") - GetData().qlib_data(target_dir=provider_uri, region=REG_CN) - - qlib.init(provider_uri=provider_uri, region=REG_CN) - - market = "csi300" - benchmark = "SH000300" - - ################################### - # train model - ################################### - data_handler_config = { - "start_time": "2008-01-01", - "end_time": "2020-08-01", - "fit_start_time": "2008-01-01", - "fit_end_time": "2014-12-31", - "instruments": market, - } - - task = { - "dataset": { - "class": "DatasetH", - "module_path": "qlib.data.dataset", - "kwargs": { - "handler": { - "class": "Alpha158", - "module_path": "qlib.contrib.data.handler", - "kwargs": data_handler_config, - }, - "segments": { - "train": ("2008-01-01", "2014-12-31"), - "valid": ("2015-01-01", "2016-12-31"), - "test": 
("2017-01-01", "2020-08-01"), - }, - }, - }, - } - - dataset = init_instance_by_config(task["dataset"]) + fire.Fire(RollingDataWorkflow) From e119c8576c78f7729364358ce1a3515ca682177a Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 25 Mar 2021 19:59:22 +0800 Subject: [PATCH 09/23] black format --- examples/rolling_process_data/workflow.py | 42 ++++++++++++++--------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py index 2f48662bd65..d5f7fec1074 100644 --- a/examples/rolling_process_data/workflow.py +++ b/examples/rolling_process_data/workflow.py @@ -13,11 +13,12 @@ from qlib.utils import exists_qlib_data, init_instance_by_config from qlib.tests.data import GetData + class RollingDataWorkflow(object): MARKET = "csi300" start_time = "2010-01-01" - end_time = "2019-12-31" + end_time = "2019-12-31" rolling_cnt = 5 def _init_qlib(self): @@ -28,7 +29,7 @@ def _init_qlib(self): print(f"Qlib data is not found in {provider_uri}") GetData().qlib_data(target_dir=provider_uri, region=REG_CN) qlib.init(provider_uri=provider_uri, region=REG_CN) - + def _dump_pre_handler(self, path): handler_config = { "class": "Alpha158", @@ -52,13 +53,13 @@ def rolling_process(self): self._dump_pre_handler("pre_handler.py") pre_handler = self._load_pre_handler("pre_handler.py") - train_start_time = (2010,1,1) - train_end_time = (2012,12,31) - valid_start_time = (2013,1,1) - valid_end_time = (2013,12,31) - test_start_time = (2014,1,1) - test_end_time = (2014,12,31) - + train_start_time = (2010, 1, 1) + train_end_time = (2012, 12, 31) + valid_start_time = (2013, 1, 1) + valid_end_time = (2013, 12, 31) + test_start_time = (2014, 1, 1) + test_end_time = (2014, 12, 31) + dataset_config = { "class": "DatasetH", "module_path": "qlib.data.dataset", @@ -71,9 +72,9 @@ def rolling_process(self): "end_time": datetime(*test_end_time), "fit_start_time": datetime(*train_start_time), "fit_end_time": datetime(*train_end_time), - "data_loader_kwargs":{ + "data_loader_kwargs": { "handler_config": pre_handler, - } + }, }, }, "segments": { @@ -95,14 +96,23 @@ def rolling_process(self): "end_time": datetime(test_end_time[0] + 1, *test_end_time[1:]), }, segment_kwargs={ - "train": (datetime(train_start_time[0] + 1, *train_start_time[1:]), datetime(train_end_time[0], *train_end_time[1:])), - "valid": (datetime(valid_start_time[0] + 1, *valid_start_time[1:]), datetime(valid_end_time[0], *valid_end_time[1:])), - "test": (datetime(test_start_time[0] + 1, *test_start_time[1:]), datetime(test_end_time[0], *test_end_time[1:])), + "train": ( + datetime(train_start_time[0] + 1, *train_start_time[1:]), + datetime(train_end_time[0], *train_end_time[1:]), + ), + "valid": ( + datetime(valid_start_time[0] + 1, *valid_start_time[1:]), + datetime(valid_end_time[0], *valid_end_time[1:]), + ), + "test": ( + datetime(test_start_time[0] + 1, *test_start_time[1:]), + datetime(test_end_time[0], *test_end_time[1:]), + ), }, ) - dtrain, dvalid, dtest = dataset.prepare(["train", "valid", "test"]) - + dtrain, dvalid, dtest = dataset.prepare(["train", "valid", "test"]) + if __name__ == "__main__": fire.Fire(RollingDataWorkflow) From 9cc3b18e4e9cd61f7745271a01d628063b1b48a3 Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 25 Mar 2021 20:36:07 +0800 Subject: [PATCH 10/23] fix but --- examples/rolling_process_data/README.md | 1 - examples/rolling_process_data/workflow.py | 19 ++++++++++++++++--- qlib/data/dataset/loader.py | 6 ++++-- 3 files changed, 20 insertions(+), 6 
deletions(-) diff --git a/examples/rolling_process_data/README.md b/examples/rolling_process_data/README.md index 3f1c8768d5e..6a6af0d3d6c 100644 --- a/examples/rolling_process_data/README.md +++ b/examples/rolling_process_data/README.md @@ -1,2 +1 @@ # Rolling Process Data - diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py index d5f7fec1074..29b1c19f89c 100644 --- a/examples/rolling_process_data/workflow.py +++ b/examples/rolling_process_data/workflow.py @@ -38,9 +38,12 @@ def _dump_pre_handler(self, path): "start_time": self.start_time, "end_time": self.end_time, "instruments": self.MARKET, + "infer_processors": [], + "learn_processors": [], }, } pre_handler = init_instance_by_config(handler_config) + pre_handler.config(dump_all=True) pre_handler.to_pickle(path) def _load_pre_handler(self, path): @@ -50,8 +53,8 @@ def _load_pre_handler(self, path): def rolling_process(self): self._init_qlib() - self._dump_pre_handler("pre_handler.py") - pre_handler = self._load_pre_handler("pre_handler.py") + self._dump_pre_handler("pre_handler.pkl") + pre_handler = self._load_pre_handler("pre_handler.pkl") train_start_time = (2010, 1, 1) train_end_time = (2012, 12, 31) @@ -72,6 +75,13 @@ def rolling_process(self): "end_time": datetime(*test_end_time), "fit_start_time": datetime(*train_start_time), "fit_end_time": datetime(*train_end_time), + "infer_processors": [ + {"class":"RobustZScoreNorm", "kwargs": {"fields_group": "feature"}}, + ], + "learn_processors": [ + {"class": "DropnaLabel"}, + {"class": "CSZScoreNorm", "kwargs": {"fields_group": "label"}}, + ], "data_loader_kwargs": { "handler_config": pre_handler, }, @@ -87,7 +97,8 @@ def rolling_process(self): dataset = init_instance_by_config(dataset_config) - for rolling_offset in range(rolling_cnt): + for rolling_offset in range(self.rolling_cnt): + print(f"===========rolling{rolling_offset} start===========") if rolling_offset: dataset.init( handler_kwargs={ @@ -112,6 +123,8 @@ def rolling_process(self): ) dtrain, dvalid, dtest = dataset.prepare(["train", "valid", "test"]) + ## print or dump data + print(f"===========rolling{rolling_offset} end===========") if __name__ == "__main__": diff --git a/qlib/data/dataset/loader.py b/qlib/data/dataset/loader.py index 539b930ec3f..1cda5c0250d 100644 --- a/qlib/data/dataset/loader.py +++ b/qlib/data/dataset/loader.py @@ -250,7 +250,9 @@ def __init__(self, handler_config: dict, fetch_kwargs: dict = {}, is_group=False is_group will be used to describe whether the key of handler_config is group """ - if self.is_group: + from qlib.data.dataset.handler import DataHandler + + if is_group: self.handlers = { grp: init_instance_by_config(config, accept_types=DataHandler) for grp, config in handler_config.items() } @@ -274,5 +276,5 @@ def load(self, instruments=None, start_time=None, end_time=None) -> pd.DataFrame axis=1, ) else: - df = self.handler.fetch(selector=slice(start_time, end_time), level="datetime", **self.fetch_kwargs) + df = self.handlers.fetch(selector=slice(start_time, end_time), level="datetime", **self.fetch_kwargs) return df From d6ff764bb270017b74099205dcfb78ade161a9e7 Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 25 Mar 2021 20:36:45 +0800 Subject: [PATCH 11/23] black format --- examples/rolling_process_data/workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py index 29b1c19f89c..3b38faa3156 100644 --- 
a/examples/rolling_process_data/workflow.py +++ b/examples/rolling_process_data/workflow.py @@ -76,7 +76,7 @@ def rolling_process(self): "fit_start_time": datetime(*train_start_time), "fit_end_time": datetime(*train_end_time), "infer_processors": [ - {"class":"RobustZScoreNorm", "kwargs": {"fields_group": "feature"}}, + {"class": "RobustZScoreNorm", "kwargs": {"fields_group": "feature"}}, ], "learn_processors": [ {"class": "DropnaLabel"}, From 194217fb07696530d5b575567c5bb664d479948d Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 25 Mar 2021 21:47:17 +0800 Subject: [PATCH 12/23] fix bug --- examples/rolling_process_data/workflow.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py index 3b38faa3156..719d93a1bea 100644 --- a/examples/rolling_process_data/workflow.py +++ b/examples/rolling_process_data/workflow.py @@ -103,21 +103,21 @@ def rolling_process(self): dataset.init( handler_kwargs={ "init_type": DataHandlerLP.IT_FIT_SEQ, - "start_time": datetime(train_start_time[0] + 1, *train_start_time[1:]), - "end_time": datetime(test_end_time[0] + 1, *test_end_time[1:]), + "start_time": datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]), + "end_time": datetime(test_end_time[0] + rolling_offset, *test_end_time[1:]), }, segment_kwargs={ "train": ( - datetime(train_start_time[0] + 1, *train_start_time[1:]), - datetime(train_end_time[0], *train_end_time[1:]), + datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]), + datetime(train_end_time[0] + rolling_offset, *train_end_time[1:]), ), "valid": ( - datetime(valid_start_time[0] + 1, *valid_start_time[1:]), - datetime(valid_end_time[0], *valid_end_time[1:]), + datetime(valid_start_time[0] + rolling_offset, *valid_start_time[1:]), + datetime(valid_end_time[0] + rolling_offset, *valid_end_time[1:]), ), "test": ( - datetime(test_start_time[0] + 1, *test_start_time[1:]), - datetime(test_end_time[0], *test_end_time[1:]), + datetime(test_start_time[0] + rolling_offset, *test_start_time[1:]), + datetime(test_end_time[0] + rolling_offset, *test_end_time[1:]), ), }, ) From 5f60d18dfe2fa71d341ee7e8128f0f4c1f79c119 Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 25 Mar 2021 22:08:23 +0800 Subject: [PATCH 13/23] fix config_data bug --- examples/rolling_process_data/workflow.py | 4 ++++ qlib/data/dataset/__init__.py | 2 +- qlib/data/dataset/handler.py | 28 ++++++++++++++++++++--- 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py index 719d93a1bea..0be88dddcc5 100644 --- a/examples/rolling_process_data/workflow.py +++ b/examples/rolling_process_data/workflow.py @@ -98,6 +98,7 @@ def rolling_process(self): dataset = init_instance_by_config(dataset_config) for rolling_offset in range(self.rolling_cnt): + print(f"===========rolling{rolling_offset} start===========") if rolling_offset: dataset.init( @@ -105,6 +106,8 @@ def rolling_process(self): "init_type": DataHandlerLP.IT_FIT_SEQ, "start_time": datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]), "end_time": datetime(test_end_time[0] + rolling_offset, *test_end_time[1:]), + "fit_start_time": datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]), + "fit_end_time": datetime(train_end_time[0] + rolling_offset, *train_end_time[1:]), }, segment_kwargs={ "train": ( @@ -123,6 +126,7 @@ def rolling_process(self): ) dtrain, dvalid, dtest = 
dataset.prepare(["train", "valid", "test"]) + print(dtrain, dvalid, dtest) ## print or dump data print(f"===========rolling{rolling_offset} end===========") diff --git a/qlib/data/dataset/__init__.py b/qlib/data/dataset/__init__.py index 0f5d2baba0c..518b8eecd93 100644 --- a/qlib/data/dataset/__init__.py +++ b/qlib/data/dataset/__init__.py @@ -98,7 +98,7 @@ def init(self, handler_kwargs: dict = None, segment_kwargs: dict = None): raise TypeError(f"param handler_kwargs must be type dict, not {type(handler_kwargs)}") kwargs_init = {} kwargs_conf_data = {} - conf_data_arg = {"instruments", "start_time", "end_time"} + conf_data_arg = {"instruments", "start_time", "end_time", "fit_start_time", "fit_end_time"} for k, v in handler_kwargs.items(): if k in conf_data_arg: kwargs_conf_data.update({k: v}) diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py index f4795c5667d..40db5e4f38f 100644 --- a/qlib/data/dataset/handler.py +++ b/qlib/data/dataset/handler.py @@ -115,8 +115,7 @@ def conf_data(self, **kwargs): for k, v in kwargs.items(): if k in attr_list: setattr(self, k, v) - else: - raise KeyError("Such config is not supported.") + def init(self, enable_cache: bool = False): """ @@ -405,11 +404,34 @@ def process_data(self, with_fit: bool = False): if self.drop_raw: del self._data + + def conf_data(self, **kwargs): + """ + configuration of data. + # what data to be loaded from data source + + This method will be used when loading pickled handler from dataset. + The data will be initialized with different time range. + + """ + attr_list = {"fit_start_time", "fit_end_time"} + for k, v in kwargs.items(): + if k in attr_list: + for infer_processor in self.infer_processors: + if getattr(infer_processor, k, None): + setattr(infer_processor, k, v) + + for learn_processor in self.learn_processors: + if getattr(learn_processor, k, None): + setattr(learn_processor, k, v) + + super().conf_data(**kwargs) + # init type IT_FIT_SEQ = "fit_seq" # the input of `fit` will be the output of the previous processor IT_FIT_IND = "fit_ind" # the input of `fit` will be the original df IT_LS = "load_state" # The state of the object has been load by pickle - + def init(self, init_type: str = IT_FIT_SEQ, enable_cache: bool = False): """ Initialize the data of Qlib From 4ee0240c2483383a28099d97e5688bce8ea030b1 Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 25 Mar 2021 22:08:39 +0800 Subject: [PATCH 14/23] black format --- qlib/data/dataset/handler.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py index 40db5e4f38f..9aa05b9b908 100644 --- a/qlib/data/dataset/handler.py +++ b/qlib/data/dataset/handler.py @@ -116,7 +116,6 @@ def conf_data(self, **kwargs): if k in attr_list: setattr(self, k, v) - def init(self, enable_cache: bool = False): """ initialize the data. @@ -404,7 +403,6 @@ def process_data(self, with_fit: bool = False): if self.drop_raw: del self._data - def conf_data(self, **kwargs): """ configuration of data. 
@@ -431,7 +429,7 @@ def conf_data(self, **kwargs): IT_FIT_SEQ = "fit_seq" # the input of `fit` will be the output of the previous processor IT_FIT_IND = "fit_ind" # the input of `fit` will be the original df IT_LS = "load_state" # The state of the object has been load by pickle - + def init(self, init_type: str = IT_FIT_SEQ, enable_cache: bool = False): """ Initialize the data of Qlib From 31bc85bf867ba2161c638b819b41e3cb7e863ce1 Mon Sep 17 00:00:00 2001 From: bxdd Date: Mon, 29 Mar 2021 19:49:30 +0800 Subject: [PATCH 15/23] restructure data layer config & setup --- examples/highfreq/highfreq_processor.py | 7 ++ examples/highfreq/workflow.py | 21 +++-- qlib/data/dataset/__init__.py | 116 ++++++++++++++---------- qlib/data/dataset/handler.py | 46 +++++----- qlib/data/dataset/loader.py | 1 - qlib/data/dataset/processor.py | 22 +++++ 6 files changed, 132 insertions(+), 81 deletions(-) diff --git a/examples/highfreq/highfreq_processor.py b/examples/highfreq/highfreq_processor.py index f0ab0dec2b1..4ec8f3dd2ca 100644 --- a/examples/highfreq/highfreq_processor.py +++ b/examples/highfreq/highfreq_processor.py @@ -70,3 +70,10 @@ def __call__(self, df_features): columns=["FEATURE_%d" % i for i in range(12 * 240)], ).sort_index() return df_new_features + + def config(fit_start_time=None, fit_end_time=None, **kwargs): + if fit_start_time: + self.fit_start_time = fit_start_time + if fit_end_time: + self.fit_end_time = fit_end_time + super().config(**kwargs) diff --git a/examples/highfreq/workflow.py b/examples/highfreq/workflow.py index c2ca36db344..0b48b971f9f 100644 --- a/examples/highfreq/workflow.py +++ b/examples/highfreq/workflow.py @@ -31,7 +31,7 @@ class HighfreqWorkflow(object): SPEC_CONF = {"custom_ops": [DayLast, FFillNan, BFillNan, Date, Select, IsNull, Cut], "expression_cache": None} - MARKET = "all" + MARKET = "csi300" start_time = "2020-09-15 00:00:00" end_time = "2021-01-18 16:00:00" @@ -145,35 +145,40 @@ def dump_and_load_dataset(self): self._prepare_calender_cache() ##=============reinit dataset============= - dataset.init( + dataset.config( handler_kwargs={ - "init_type": DataHandlerLP.IT_LS, "start_time": "2021-01-19 00:00:00", "end_time": "2021-01-25 16:00:00", }, - segment_kwargs={ + segments={ "test": ( "2021-01-19 00:00:00", "2021-01-25 16:00:00", ), }, ) - dataset_backtest.init( + dataset.setup_data( + handler_kwargs={ + "init_type": DataHandlerLP.IT_LS, + }, + ) + dataset_backtest.config( handler_kwargs={ "start_time": "2021-01-19 00:00:00", "end_time": "2021-01-25 16:00:00", }, - segment_kwargs={ + segments={ "test": ( "2021-01-19 00:00:00", "2021-01-25 16:00:00", ), }, ) + dataset_backtest.setup_data(handler_kwargs={}) ##=============get data============= - xtest = dataset.prepare(["test"]) - backtest_test = dataset_backtest.prepare(["test"]) + xtest, = dataset.prepare(["test"]) + backtest_test, = dataset_backtest.prepare(["test"]) print(xtest, backtest_test) return diff --git a/qlib/data/dataset/__init__.py b/qlib/data/dataset/__init__.py index 518b8eecd93..aa90cee2fed 100644 --- a/qlib/data/dataset/__init__.py +++ b/qlib/data/dataset/__init__.py @@ -20,17 +20,25 @@ def __init__(self, *args, **kwargs): """ init is designed to finish following steps: - - setup data - - The data related attributes' names should start with '_' so that it will not be saved on disk when serializing. 
+ - init instance - - initialize the state of the dataset(info to prepare the data) + - config the state of the dataset(info to prepare the data) - The name of essential state for preparing data should not start with '_' so that it could be serialized on disk when serializing. + - setup data + - The data related attributes' names should start with '_' so that it will not be saved on disk when serializing. + The data could specify the info to caculate the essential data for preparation """ self.setup_data(*args, **kwargs) super().__init__() + def config(self, *arg, **kwargs): + """ + config is designed to configure and parameters that cannot be learned from the data + """ + super().config(*arg, **kwargs) + def setup_data(self, *args, **kwargs): """ Setup the data. @@ -39,7 +47,7 @@ def setup_data(self, *args, **kwargs): - User have a Dataset object with learned status on disk. - - User load the Dataset object from the disk(Note the init function is skiped). + - User load the Dataset object from the disk. - User call `setup_data` to load new data. @@ -76,44 +84,7 @@ class DatasetH(Dataset): - The processing is related to data split. """ - def init(self, handler_kwargs: dict = None, segment_kwargs: dict = None): - """ - Initialize the DatasetH - - Parameters - ---------- - handler_kwargs : dict - Config of DataHanlder, which could include the following arguments: - - - arguments of DataHandler.conf_data, such as 'instruments', 'start_time' and 'end_time'. - - - arguments of DataHandler.init, such as 'enable_cache', etc. - - segment_kwargs : dict - Config of segments which is same as 'segments' in DatasetH.setup_data - - """ - if handler_kwargs: - if not isinstance(handler_kwargs, dict): - raise TypeError(f"param handler_kwargs must be type dict, not {type(handler_kwargs)}") - kwargs_init = {} - kwargs_conf_data = {} - conf_data_arg = {"instruments", "start_time", "end_time", "fit_start_time", "fit_end_time"} - for k, v in handler_kwargs.items(): - if k in conf_data_arg: - kwargs_conf_data.update({k: v}) - else: - kwargs_init.update({k: v}) - - self.handler.conf_data(**kwargs_conf_data) - self.handler.init(**kwargs_init) - - if segment_kwargs: - if not isinstance(segment_kwargs, dict): - raise TypeError(f"param handler_kwargs must be type dict, not {type(segment_kwargs)}") - self.segments = segment_kwargs.copy() - - def setup_data(self, handler: Union[Dict, DataHandler], segments: Dict[Text, Tuple]): + def __init__(self, handler: Union[Dict, DataHandler], segments: Dict[Text, Tuple], **kwargs): """ Setup the underlying data. @@ -144,6 +115,52 @@ def setup_data(self, handler: Union[Dict, DataHandler], segments: Dict[Text, Tup """ self.handler = init_instance_by_config(handler, accept_types=DataHandler) self.segments = segments.copy() + super().__init__(**kwargs) + + def config(self, handler_kwargs:dict = None, segments:dict = None, **kwargs): + """ + Initialize the DatasetH + + Parameters + ---------- + handler_kwargs : dict + Config of DataHanlder, which could include the following arguments: + + - arguments of DataHandler.conf_data, such as 'instruments', 'start_time' and 'end_time'. 
+ + kwargs : dict + Config of DatasetH, such as + + - segments : dict + Config of segments which is same as 'segments' in self.__init__ + + """ + super().config(**kwargs) + if handler_kwargs is not None: + self.handler.config(**handler_kwargs) + if segments is not None: + self.segments = segments.copy() + + + + def setup_data(self, handler_kwargs: dict = None, **kwargs): + """ + Setup the Data + + Parameters + ---------- + handler_kwargs : dict + init arguments of DataHanlder, which could include the following arguments: + + - init_type : Init Type of Handler + + - enable_cache : wheter to enable cache + + """ + super().setup_data(**kwargs) + if handler_kwargs is not None: + self.handler.setup_data(**handler_kwargs) + def __repr__(self): return "{name}(handler={handler}, segments={segments})".format( @@ -433,16 +450,21 @@ class TSDatasetH(DatasetH): - The dimension of a batch of data """ - def __init__(self, step_len=30, *args, **kwargs): + def __init__(self, step_len=30, **kwargs): self.step_len = step_len - super().__init__(*args, **kwargs) + super().__init__(**kwargs) - def setup_data(self, *args, **kwargs): - super().setup_data(*args, **kwargs) + def config(self, step_len=None, **kwargs): + super().config(**kwargs) + if step_len: + self.step_len = step_len + + def setup_data(self, **kwargs): + super().setup_data(**kwargs) cal = self.handler.fetch(col_set=self.handler.CS_RAW).index.get_level_values("datetime").unique() cal = sorted(cal) - # Get the datatime index for building timestamp self.cal = cal + def _prepare_seg(self, slc: slice, **kwargs) -> TSDataSampler: # Dataset decide how to slice data(Get more data for timeseries). diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py index 9aa05b9b908..712cd62327f 100644 --- a/qlib/data/dataset/handler.py +++ b/qlib/data/dataset/handler.py @@ -6,6 +6,7 @@ import bisect import logging import warnings +from inspect import getfullargspec from typing import Union, Tuple, List, Iterator, Optional import pandas as pd @@ -99,10 +100,10 @@ def __init__( self.fetch_orig = fetch_orig if init_data: with TimeInspector.logt("Init data"): - self.init() + self.setup_data() super().__init__() - def conf_data(self, **kwargs): + def config(self, instruments=None, start_time=None, end_time=None, **kwargs): """ configuration of data. # what data to be loaded from data source @@ -111,14 +112,17 @@ def conf_data(self, **kwargs): The data will be initialized with different time range. """ - attr_list = {"instruments", "start_time", "end_time"} - for k, v in kwargs.items(): - if k in attr_list: - setattr(self, k, v) - - def init(self, enable_cache: bool = False): + super().config(**kwargs) + if instruments: + self.instruments = instruments + if start_time: + self.start_time = start_time + if end_time: + self.end_time = end_time + + def setup_data(self, enable_cache: bool = False): """ - initialize the data. + Set Up the data. In case of running intialization for multiple time, it will do nothing for the second time. It is responsible for maintaining following variable @@ -403,7 +407,7 @@ def process_data(self, with_fit: bool = False): if self.drop_raw: del self._data - def conf_data(self, **kwargs): + def config(self, processors_kwargs:dict = None, **kwargs): """ configuration of data. # what data to be loaded from data source @@ -412,27 +416,19 @@ def conf_data(self, **kwargs): The data will be initialized with different time range. 
""" - attr_list = {"fit_start_time", "fit_end_time"} - for k, v in kwargs.items(): - if k in attr_list: - for infer_processor in self.infer_processors: - if getattr(infer_processor, k, None): - setattr(infer_processor, k, v) - - for learn_processor in self.learn_processors: - if getattr(learn_processor, k, None): - setattr(learn_processor, k, v) - - super().conf_data(**kwargs) + super().config(**kwargs) + if processors_kwargs is not None: + for processor in self.get_all_processors(): + processor.config(**processor_kwargs) # init type IT_FIT_SEQ = "fit_seq" # the input of `fit` will be the output of the previous processor IT_FIT_IND = "fit_ind" # the input of `fit` will be the original df IT_LS = "load_state" # The state of the object has been load by pickle - def init(self, init_type: str = IT_FIT_SEQ, enable_cache: bool = False): + def setup_data(self, init_type: str = IT_FIT_SEQ, **kwargs): """ - Initialize the data of Qlib + Set up the data of Qlib Parameters ---------- @@ -447,7 +443,7 @@ def init(self, init_type: str = IT_FIT_SEQ, enable_cache: bool = False): when we call `init` next time """ # init raw data - super().init(enable_cache=enable_cache) + super().setup_data(**kwargs) with TimeInspector.logt("fit & process data"): if init_type == DataHandlerLP.IT_FIT_IND: diff --git a/qlib/data/dataset/loader.py b/qlib/data/dataset/loader.py index 1cda5c0250d..a58bca5e87a 100644 --- a/qlib/data/dataset/loader.py +++ b/qlib/data/dataset/loader.py @@ -53,7 +53,6 @@ def load(self, instruments, start_time=None, end_time=None) -> pd.DataFrame: """ pass - class DLWParser(DataLoader): """ (D)ata(L)oader (W)ith (P)arser for features and names diff --git a/qlib/data/dataset/processor.py b/qlib/data/dataset/processor.py index 5a06f66beff..e14e85831b0 100755 --- a/qlib/data/dataset/processor.py +++ b/qlib/data/dataset/processor.py @@ -72,6 +72,9 @@ def is_for_infer(self) -> bool: """ return True + def config(**kwargs): + super().config(kwargs.get("dump_all", None), kwargs.get("exclude", None)) + class DropnaProcessor(Processor): def __init__(self, fields_group=None): @@ -192,6 +195,12 @@ def normalize(x, min_val=self.min_val, max_val=self.max_val, ignore=self.ignore) df.loc(axis=1)[self.cols] = normalize(df[self.cols].values) return df + def config(fit_start_time=None, fit_end_time=None, **kwargs): + if fit_start_time: + self.fit_start_time = fit_start_time + if fit_end_time: + self.fit_end_time = fit_end_time + super().config(**kwargs) class ZScoreNorm(Processor): """ZScore Normalization""" @@ -220,6 +229,13 @@ def normalize(x, mean_train=self.mean_train, std_train=self.std_train, ignore=se df.loc(axis=1)[self.cols] = normalize(df[self.cols].values) return df + + def config(fit_start_time=None, fit_end_time=None, **kwargs): + if fit_start_time: + self.fit_start_time = fit_start_time + if fit_end_time: + self.fit_end_time = fit_end_time + super().config(**kwargs) class RobustZScoreNorm(Processor): @@ -257,6 +273,12 @@ def __call__(self, df): df.clip(-3, 3, inplace=True) return df + def config(fit_start_time=None, fit_end_time=None, **kwargs): + if fit_start_time: + self.fit_start_time = fit_start_time + if fit_end_time: + self.fit_end_time = fit_end_time + super().config(**kwargs) class CSZScoreNorm(Processor): """Cross Sectional ZScore Normalization""" From fb7f84f31e4e3b6a6e76cf496d97b6a62fe2fe04 Mon Sep 17 00:00:00 2001 From: bxdd Date: Mon, 29 Mar 2021 20:15:42 +0800 Subject: [PATCH 16/23] fix ubg --- examples/highfreq/highfreq_processor.py | 2 +- examples/highfreq/workflow.py | 2 +- 
examples/rolling_process_data/workflow.py | 14 +++++++++----- qlib/data/dataset/handler.py | 4 ++-- qlib/data/dataset/processor.py | 8 ++++---- 5 files changed, 17 insertions(+), 13 deletions(-) diff --git a/examples/highfreq/highfreq_processor.py b/examples/highfreq/highfreq_processor.py index 4ec8f3dd2ca..6ed68ff38d5 100644 --- a/examples/highfreq/highfreq_processor.py +++ b/examples/highfreq/highfreq_processor.py @@ -71,7 +71,7 @@ def __call__(self, df_features): ).sort_index() return df_new_features - def config(fit_start_time=None, fit_end_time=None, **kwargs): + def config(self, fit_start_time=None, fit_end_time=None, **kwargs): if fit_start_time: self.fit_start_time = fit_start_time if fit_end_time: diff --git a/examples/highfreq/workflow.py b/examples/highfreq/workflow.py index 0b48b971f9f..97762f182a6 100644 --- a/examples/highfreq/workflow.py +++ b/examples/highfreq/workflow.py @@ -31,7 +31,7 @@ class HighfreqWorkflow(object): SPEC_CONF = {"custom_ops": [DayLast, FFillNan, BFillNan, Date, Select, IsNull, Cut], "expression_cache": None} - MARKET = "csi300" + MARKET = "all" start_time = "2020-09-15 00:00:00" end_time = "2021-01-18 16:00:00" diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py index 0be88dddcc5..ffdd8329aad 100644 --- a/examples/rolling_process_data/workflow.py +++ b/examples/rolling_process_data/workflow.py @@ -101,15 +101,16 @@ def rolling_process(self): print(f"===========rolling{rolling_offset} start===========") if rolling_offset: - dataset.init( + dataset.config( handler_kwargs={ - "init_type": DataHandlerLP.IT_FIT_SEQ, "start_time": datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]), "end_time": datetime(test_end_time[0] + rolling_offset, *test_end_time[1:]), - "fit_start_time": datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]), - "fit_end_time": datetime(train_end_time[0] + rolling_offset, *train_end_time[1:]), + "processor_kwargs":{ + "fit_start_time": datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]), + "fit_end_time": datetime(train_end_time[0] + rolling_offset, *train_end_time[1:]), + }, }, - segment_kwargs={ + segments={ "train": ( datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]), datetime(train_end_time[0] + rolling_offset, *train_end_time[1:]), @@ -124,6 +125,9 @@ def rolling_process(self): ), }, ) + dataset.setup_data( + handler_kwargs={"init_type": DataHandlerLP.IT_FIT_SEQ,} + ) dtrain, dvalid, dtest = dataset.prepare(["train", "valid", "test"]) print(dtrain, dvalid, dtest) diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py index 712cd62327f..4adef23a048 100644 --- a/qlib/data/dataset/handler.py +++ b/qlib/data/dataset/handler.py @@ -407,7 +407,7 @@ def process_data(self, with_fit: bool = False): if self.drop_raw: del self._data - def config(self, processors_kwargs:dict = None, **kwargs): + def config(self, processor_kwargs:dict = None, **kwargs): """ configuration of data. 
From 8743576f7238003530ae55e78fa50554d8d6ba33 Mon Sep 17 00:00:00 2001
From: bxdd
Date: Mon, 29 Mar 2021 20:16:00 +0800
Subject: [PATCH 17/23] black format

---
 examples/highfreq/highfreq_processor.py   |  2 +-
 examples/highfreq/workflow.py             |  4 ++--
 examples/rolling_process_data/workflow.py |  6 ++++--
 qlib/data/dataset/__init__.py             | 14 +++++---------
 qlib/data/dataset/handler.py              |  4 ++--
 qlib/data/dataset/loader.py               |  1 +
 qlib/data/dataset/processor.py            |  4 +++-
 7 files changed, 18 insertions(+), 17 deletions(-)

diff --git a/examples/highfreq/highfreq_processor.py b/examples/highfreq/highfreq_processor.py
index 6ed68ff38d5..d843c6ac081 100644
--- a/examples/highfreq/highfreq_processor.py
+++ b/examples/highfreq/highfreq_processor.py
@@ -70,7 +70,7 @@ def __call__(self, df_features):
             columns=["FEATURE_%d" % i for i in range(12 * 240)],
         ).sort_index()
         return df_new_features
-    
+
     def config(self, fit_start_time=None, fit_end_time=None, **kwargs):
         if fit_start_time:
             self.fit_start_time = fit_start_time
diff --git a/examples/highfreq/workflow.py b/examples/highfreq/workflow.py
index 97762f182a6..94c9b689f40 100644
--- a/examples/highfreq/workflow.py
+++ b/examples/highfreq/workflow.py
@@ -177,8 +177,8 @@ def dump_and_load_dataset(self):
         dataset_backtest.setup_data(handler_kwargs={})
 
         ##=============get data=============
-        xtest, = dataset.prepare(["test"])
-        backtest_test, = dataset_backtest.prepare(["test"])
+        (xtest,) = dataset.prepare(["test"])
+        (backtest_test,) = dataset_backtest.prepare(["test"])
 
         print(xtest, backtest_test)
         return
diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py
index ffdd8329aad..02f43889d10 100644
--- a/examples/rolling_process_data/workflow.py
+++ b/examples/rolling_process_data/workflow.py
@@ -105,7 +105,7 @@ def rolling_process(self):
                     handler_kwargs={
                         "start_time": datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]),
                         "end_time": datetime(test_end_time[0] + rolling_offset, *test_end_time[1:]),
-                        "processor_kwargs":{
+                        "processor_kwargs": {
                             "fit_start_time": datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]),
                             "fit_end_time": datetime(train_end_time[0] + rolling_offset, *train_end_time[1:]),
                         },
@@ -126,7 +126,9 @@ def rolling_process(self):
                     },
                 )
                 dataset.setup_data(
-                    handler_kwargs={"init_type": DataHandlerLP.IT_FIT_SEQ,}
+                    handler_kwargs={
+                        "init_type": DataHandlerLP.IT_FIT_SEQ,
+                    }
                 )
 
                 dtrain, dvalid, dtest = dataset.prepare(["train", "valid", "test"])
diff --git a/qlib/data/dataset/__init__.py b/qlib/data/dataset/__init__.py
index aa90cee2fed..d8a9e0209a5 100644
--- a/qlib/data/dataset/__init__.py
+++ b/qlib/data/dataset/__init__.py
@@ -35,7 +35,7 @@ def __init__(self, *args, **kwargs):
 
     def config(self, *arg, **kwargs):
         """
-        config is designed to configure and parameters that cannot be learned from the data 
+        config is designed to configure the parameters that cannot be learned from the data
         """
         super().config(*arg, **kwargs)
 
@@ -117,7 +117,7 @@ def __init__(self, handler: Union[Dict, DataHandler], segments: Dict[Text, Tuple
         self.segments = segments.copy()
         super().__init__(**kwargs)
 
-    def config(self, handler_kwargs:dict = None, segments:dict = None, **kwargs):
+    def config(self, handler_kwargs: dict = None, segments: dict = None, **kwargs):
         """
         Initialize the DatasetH
 
@@ -130,7 +130,7 @@ def config(self, handler_kwargs:dict = None, segments:dict = None, **kwargs):
         kwargs : dict
             Config of DatasetH, such as
-            
+
             - segments : dict
                 Config of segments which is same as 'segments' in self.__init__
 
@@ -141,8 +141,6 @@
         if segments is not None:
             self.segments = segments.copy()
 
-
-
     def setup_data(self, handler_kwargs: dict = None, **kwargs):
         """
         Setup the Data
@@ -151,16 +149,15 @@ def setup_data(self, handler_kwargs: dict = None, **kwargs):
         ----------
         handler_kwargs : dict
             init arguments of DataHandler, which could include the following arguments:
-            
+
             - init_type : Init Type of Handler
-            
+
             - enable_cache : whether to enable cache
 
         """
         super().setup_data(**kwargs)
         if handler_kwargs is not None:
             self.handler.setup_data(**handler_kwargs)
 
-
     def __repr__(self):
         return "{name}(handler={handler}, segments={segments})".format(
@@ -464,7 +461,6 @@ def setup_data(self, **kwargs):
         cal = self.handler.fetch(col_set=self.handler.CS_RAW).index.get_level_values("datetime").unique()
         cal = sorted(cal)
         self.cal = cal
-
     def _prepare_seg(self, slc: slice, **kwargs) -> TSDataSampler:
         # Dataset decide how to slice data(Get more data for timeseries).
diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py
index 4adef23a048..2190deeb150 100644
--- a/qlib/data/dataset/handler.py
+++ b/qlib/data/dataset/handler.py
@@ -119,7 +119,7 @@ def config(self, instruments=None, start_time=None, end_time=None, **kwargs):
             self.start_time = start_time
         if end_time:
             self.end_time = end_time
-    
+
     def setup_data(self, enable_cache: bool = False):
         """
         Set Up the data.
@@ -407,7 +407,7 @@ def process_data(self, with_fit: bool = False):
         if self.drop_raw:
             del self._data
 
-    def config(self, processor_kwargs:dict = None, **kwargs):
+    def config(self, processor_kwargs: dict = None, **kwargs):
         """
         configuration of data.
         # what data to be loaded from data source
diff --git a/qlib/data/dataset/loader.py b/qlib/data/dataset/loader.py
index a58bca5e87a..1cda5c0250d 100644
--- a/qlib/data/dataset/loader.py
+++ b/qlib/data/dataset/loader.py
@@ -53,6 +53,7 @@ def load(self, instruments, start_time=None, end_time=None) -> pd.DataFrame:
         """
         pass
 
+
 class DLWParser(DataLoader):
     """
     (D)ata(L)oader (W)ith (P)arser for features and names
diff --git a/qlib/data/dataset/processor.py b/qlib/data/dataset/processor.py
index 5be178c5c03..d25d36c88f0 100755
--- a/qlib/data/dataset/processor.py
+++ b/qlib/data/dataset/processor.py
@@ -202,6 +202,7 @@ def config(self, fit_start_time=None, fit_end_time=None, **kwargs):
             self.fit_end_time = fit_end_time
         super().config(**kwargs)
 
+
 class ZScoreNorm(Processor):
     """ZScore Normalization"""
 
@@ -229,7 +230,7 @@ def normalize(x, mean_train=self.mean_train, std_train=self.std_train, ignore=se
         df.loc(axis=1)[self.cols] = normalize(df[self.cols].values)
         return df
-    
+
     def config(self, fit_start_time=None, fit_end_time=None, **kwargs):
         if fit_start_time:
             self.fit_start_time = fit_start_time
@@ -280,6 +281,7 @@ def config(self, fit_start_time=None, fit_end_time=None, **kwargs):
         self.fit_end_time = fit_end_time
         super().config(**kwargs)
 
+
 class CSZScoreNorm(Processor):
     """Cross Sectional ZScore Normalization"""
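The split the README describes maps onto two layers of configuration: a raw-feature handler fitted once, and a window-aware layer that merely re-fetches from it on every roll. A hedged sketch of the wiring — assuming the DataHandler-based loader is exposed as `DataLoaderDH` in `qlib.data.dataset.loader` (its `handler_config` kwarg matches the loader signature visible later in this series), and with the raw handler config itself a placeholder:

```python
# Hypothetical wiring for the reuse pattern described above. The raw-feature
# handler is built and fitted once; it holds features that do not depend on
# the rolling window.
raw_handler_config = {
    "class": "Alpha158",  # placeholder: any handler producing rolling-invariant raw features
    "module_path": "qlib.contrib.data.handler",
    "kwargs": {"instruments": "csi300", "start_time": "2010-01-01", "end_time": "2017-12-31"},
}

# The window-aware handler's loader fetches from the shared raw handler
# instead of re-running the expression engine on each roll.
window_loader_config = {
    "class": "DataLoaderDH",
    "module_path": "qlib.data.dataset.loader",
    "kwargs": {"handler_config": raw_handler_config},
}
```

Because the raw handler is constructed once and shared, each rolling step only re-runs the window-dependent processors, not the raw data loading.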
From 1074284666113389cbcb6c5707f59e5c69f07f99 Mon Sep 17 00:00:00 2001
From: bxdd
Date: Mon, 29 Mar 2021 20:38:09 +0800
Subject: [PATCH 19/23] fix docstring

---
 qlib/data/dataset/__init__.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/qlib/data/dataset/__init__.py b/qlib/data/dataset/__init__.py
index d8a9e0209a5..668ea833b5b 100644
--- a/qlib/data/dataset/__init__.py
+++ b/qlib/data/dataset/__init__.py
@@ -20,9 +20,7 @@ def __init__(self, *args, **kwargs):
         """
         init is designed to finish following steps:
 
-        - init instance
-
-        - config the state of the dataset(info to prepare the data)
+        - init the sub instance and the state of the dataset (info to prepare the data)
 
         - The name of essential state for preparing data should not start with '_' so that it could be serialized on disk when serializing.
         - setup data
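The corrected docstring pins down the `__init__` contract: wire up sub-instances and serializable state, then hand off to `setup_data`. A toy subclass showing the shape of that contract (illustrative only; assumes qlib is importable):

```python
from qlib.data.dataset import Dataset


class RangeDataset(Dataset):
    """Toy Dataset whose only prepared state is a (start, end) range."""

    def setup_data(self, start=None, end=None, **kwargs):
        # Essential state that should survive serialization must not start with "_".
        self.start = start
        self.end = end

    def prepare(self, **kwargs):
        return self.start, self.end


ds = RangeDataset(start="2010-01-01", end="2017-12-31")  # __init__ forwards to setup_data
print(ds.prepare())  # ('2010-01-01', '2017-12-31')
```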
""" - def __init__(self, *args, **kwargs): + def __init__(self, **kwargs): """ init is designed to finish following steps: @@ -28,16 +29,16 @@ def __init__(self, *args, **kwargs): The data could specify the info to caculate the essential data for preparation """ - self.setup_data(*args, **kwargs) + self.setup_data(**kwargs) super().__init__() - def config(self, *arg, **kwargs): + def config(self, **kwargs): """ config is designed to configure and parameters that cannot be learned from the data """ - super().config(*arg, **kwargs) + super().config(**kwargs) - def setup_data(self, *args, **kwargs): + def setup_data(self, **kwargs): """ Setup the data. @@ -53,7 +54,7 @@ def setup_data(self, *args, **kwargs): """ pass - def prepare(self, *args, **kwargs) -> object: + def prepare(self, **kwargs) -> object: """ The type of dataset depends on the model. (It could be pd.DataFrame, pytorch.DataLoader, etc.) The parameters should specify the scope for the prepared data @@ -115,7 +116,7 @@ def __init__(self, handler: Union[Dict, DataHandler], segments: Dict[Text, Tuple self.segments = segments.copy() super().__init__(**kwargs) - def config(self, handler_kwargs: dict = None, segments: dict = None, **kwargs): + def config(self, handler_kwargs: dict = None, **kwargs): """ Initialize the DatasetH @@ -133,11 +134,11 @@ def config(self, handler_kwargs: dict = None, segments: dict = None, **kwargs): Config of segments which is same as 'segments' in self.__init__ """ - super().config(**kwargs) if handler_kwargs is not None: self.handler.config(**handler_kwargs) - if segments is not None: - self.segments = segments.copy() + if "segments" in kwargs: + self.segments = deepcopy(kwargs.pop("segments")) + super().config(**kwargs) def setup_data(self, handler_kwargs: dict = None, **kwargs): """ @@ -449,10 +450,10 @@ def __init__(self, step_len=30, **kwargs): self.step_len = step_len super().__init__(**kwargs) - def config(self, step_len=None, **kwargs): + def config(self, **kwargs): + if "step_len" in kwargs: + self.step_len = kwargs.pop("step_len") super().config(**kwargs) - if step_len: - self.step_len = step_len def setup_data(self, **kwargs): super().setup_data(**kwargs) diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py index 2190deeb150..7fb7090d2ef 100644 --- a/qlib/data/dataset/handler.py +++ b/qlib/data/dataset/handler.py @@ -103,7 +103,7 @@ def __init__( self.setup_data() super().__init__() - def config(self, instruments=None, start_time=None, end_time=None, **kwargs): + def config(self, **kwargs): """ configuration of data. # what data to be loaded from data source @@ -112,13 +112,16 @@ def config(self, instruments=None, start_time=None, end_time=None, **kwargs): The data will be initialized with different time range. 
""" + attr_list = {"instruments", "start_time", "end_time"} + for k, v in kwargs.items(): + if k in attr_list: + setattr(self, k, v) + + for attr in attr_list: + if attr in kwargs: + kwargs.pop(attr) + super().config(**kwargs) - if instruments: - self.instruments = instruments - if start_time: - self.start_time = start_time - if end_time: - self.end_time = end_time def setup_data(self, enable_cache: bool = False): """ diff --git a/qlib/data/dataset/loader.py b/qlib/data/dataset/loader.py index 1cda5c0250d..58aca1d4f71 100644 --- a/qlib/data/dataset/loader.py +++ b/qlib/data/dataset/loader.py @@ -261,7 +261,7 @@ def __init__(self, handler_config: dict, fetch_kwargs: dict = {}, is_group=False self.is_group = is_group self.fetch_kwargs = {"col_set": DataHandler.CS_RAW} - self.fetch_kwargs = {**self.fetch_kwargs, **fetch_kwargs} + self.fetch_kwargs.update(fetch_kwargs) def load(self, instruments=None, start_time=None, end_time=None) -> pd.DataFrame: if instruments is not None: diff --git a/qlib/data/dataset/processor.py b/qlib/data/dataset/processor.py index d25d36c88f0..8f69a5dff12 100755 --- a/qlib/data/dataset/processor.py +++ b/qlib/data/dataset/processor.py @@ -73,7 +73,15 @@ def is_for_infer(self) -> bool: return True def config(self, **kwargs): - super().config(kwargs.get("dump_all", None), kwargs.get("exclude", None)) + attr_list = {"fit_start_time", "fit_end_time"} + for k, v in kwargs.items(): + if k in attr_list and getattr(self, k, None) is not None: + setattr(self, k, v) + + for attr in attr_list: + if attr in kwargs: + kwargs.pop(attr) + super().config(**kwargs) class DropnaProcessor(Processor): @@ -195,13 +203,6 @@ def normalize(x, min_val=self.min_val, max_val=self.max_val, ignore=self.ignore) df.loc(axis=1)[self.cols] = normalize(df[self.cols].values) return df - def config(self, fit_start_time=None, fit_end_time=None, **kwargs): - if fit_start_time: - self.fit_start_time = fit_start_time - if fit_end_time: - self.fit_end_time = fit_end_time - super().config(**kwargs) - class ZScoreNorm(Processor): """ZScore Normalization""" @@ -231,13 +232,6 @@ def normalize(x, mean_train=self.mean_train, std_train=self.std_train, ignore=se df.loc(axis=1)[self.cols] = normalize(df[self.cols].values) return df - def config(self, fit_start_time=None, fit_end_time=None, **kwargs): - if fit_start_time: - self.fit_start_time = fit_start_time - if fit_end_time: - self.fit_end_time = fit_end_time - super().config(**kwargs) - class RobustZScoreNorm(Processor): """Robust ZScore Normalization @@ -274,13 +268,6 @@ def __call__(self, df): df.clip(-3, 3, inplace=True) return df - def config(self, fit_start_time=None, fit_end_time=None, **kwargs): - if fit_start_time: - self.fit_start_time = fit_start_time - if fit_end_time: - self.fit_end_time = fit_end_time - super().config(**kwargs) - class CSZScoreNorm(Processor): """Cross Sectional ZScore Normalization""" From f8da79b802d617234f6ae20bea2ae2bc771c39a9 Mon Sep 17 00:00:00 2001 From: bxdd Date: Tue, 30 Mar 2021 00:54:00 +0800 Subject: [PATCH 21/23] fix readme --- examples/rolling_process_data/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/rolling_process_data/README.md b/examples/rolling_process_data/README.md index b04f5ed7fe3..c84eaac20c8 100644 --- a/examples/rolling_process_data/README.md +++ b/examples/rolling_process_data/README.md @@ -9,7 +9,7 @@ When rolling train the models, data also needs to be generated in the different In order to avoid regenerating data, this example uses the `DataHandler-based 
From f8da79b802d617234f6ae20bea2ae2bc771c39a9 Mon Sep 17 00:00:00 2001
From: bxdd
Date: Tue, 30 Mar 2021 00:54:00 +0800
Subject: [PATCH 21/23] fix readme

---
 examples/rolling_process_data/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/examples/rolling_process_data/README.md b/examples/rolling_process_data/README.md
index b04f5ed7fe3..c84eaac20c8 100644
--- a/examples/rolling_process_data/README.md
+++ b/examples/rolling_process_data/README.md
@@ -9,7 +9,7 @@ When rolling-training the models, data also needs to be generated in the differ
 In order to avoid regenerating data, this example uses the `DataHandler-based DataLoader` to load the raw features that are not related to the rolling window, and then uses Processors to generate processed-features related to the sliding window.
 
 
-### Run the Code
+## Run the Code
 
 Run the example by running the following command:
 ```bash
From 023603479c5e451671d2c68fcec65574ec847fe7 Mon Sep 17 00:00:00 2001
From: bxdd
Date: Tue, 30 Mar 2021 01:00:12 +0800
Subject: [PATCH 22/23] fix readme

---
 examples/rolling_process_data/README.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/examples/rolling_process_data/README.md b/examples/rolling_process_data/README.md
index c84eaac20c8..315fe2eede9 100644
--- a/examples/rolling_process_data/README.md
+++ b/examples/rolling_process_data/README.md
@@ -4,9 +4,9 @@ This workflow is an example for `Rolling Process Data`.
 
 ## Background
 
-When rolling-training the models, data also needs to be generated in the different rolling windows. When the rolling window moves, the training data will also change, and the processor's learnable state (such as standard deviation, mean, etc.) will also be changed.
+When rolling-training the models, data also needs to be generated in the different rolling windows. When the rolling window moves, the training data will change, and the processor's learnable state (such as standard deviation, mean, etc.) will also change.
 
-In order to avoid regenerating data, this example uses the `DataHandler-based DataLoader` to load the raw features that are not related to the rolling window, and then uses Processors to generate processed-features related to the sliding window.
+In order to avoid regenerating data, this example uses the `DataHandler-based DataLoader` to load the raw features that are not related to the rolling window, and then uses Processors to generate processed-features related to the rolling window.
 
 
 ## Run the Code
From 7a2203f116bd79338481ffe439ad389b247c0e03 Mon Sep 17 00:00:00 2001
From: bxdd
Date: Tue, 30 Mar 2021 11:03:07 +0800
Subject: [PATCH 23/23] update comments

---
 qlib/data/dataset/handler.py   | 5 ++---
 qlib/data/dataset/processor.py | 2 +-
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py
index 7fb7090d2ef..201d2459dbf 100644
--- a/qlib/data/dataset/handler.py
+++ b/qlib/data/dataset/handler.py
@@ -125,8 +125,7 @@ def config(self, **kwargs):
 
     def setup_data(self, enable_cache: bool = False):
         """
-        Set Up the data.
-        In case of running intialization for multiple time, it will do nothing for the second time.
+        Set Up the data in case of running initialization multiple times
 
         It is responsible for maintaining following variable
         1) self._data
@@ -431,7 +430,7 @@ def config(self, processor_kwargs: dict = None, **kwargs):
 
     def setup_data(self, init_type: str = IT_FIT_SEQ, **kwargs):
         """
-        Set up the data of Qlib
+        Set up the data in case of running initialization multiple times
 
         Parameters
         ----------
diff --git a/qlib/data/dataset/processor.py b/qlib/data/dataset/processor.py
index 8f69a5dff12..e035f562423 100755
--- a/qlib/data/dataset/processor.py
+++ b/qlib/data/dataset/processor.py
@@ -75,7 +75,7 @@ def is_for_infer(self) -> bool:
     def config(self, **kwargs):
         attr_list = {"fit_start_time", "fit_end_time"}
         for k, v in kwargs.items():
-            if k in attr_list and getattr(self, k, None) is not None:
+            if k in attr_list and hasattr(self, k):
                 setattr(self, k, v)
 
         for attr in attr_list:
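The closing `hasattr` change is subtle enough to deserve a demonstration: under the previous `getattr(self, k, None) is not None` guard, a processor whose `fit_start_time` was declared but still `None` (i.e., not yet fitted) silently rejected reconfiguration — which is exactly the rolling use case. Toy classes contrasting the two guards (not qlib's actual processors):

```python
class OldGuard:
    fit_start_time = None  # declared, but not yet fitted

    def config(self, **kwargs):
        for k, v in kwargs.items():
            if getattr(self, k, None) is not None:  # None looks like "attribute absent"
                setattr(self, k, v)


class NewGuard:
    fit_start_time = None

    def config(self, **kwargs):
        for k, v in kwargs.items():
            if hasattr(self, k):  # a declared attribute is enough
                setattr(self, k, v)


old, new = OldGuard(), NewGuard()
old.config(fit_start_time="2012-01-01")
new.config(fit_start_time="2012-01-01")
print(old.fit_start_time)  # None -- the update was silently dropped
print(new.fit_start_time)  # 2012-01-01
```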