Skip to content

Commit 9639a8c

Browse files
authored
add default protocol_version (#677)
* add default protocol_version * add comment to serial.Serializable.get_backend
1 parent cae4c9c commit 9639a8c

File tree

11 files changed

+139
-127
lines changed

11 files changed

+139
-127
lines changed

qlib/config.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ def set_conf_from_C(self, config_c):
7373
REG_CN = "cn"
7474
REG_US = "us"
7575

76+
# pickle.dump protocol version: https://docs.python.org/3/library/pickle.html#data-stream-format
77+
PROTOCOL_VERSION = 4
78+
7679
NUM_USABLE_CPU = max(multiprocessing.cpu_count() - 2, 1)
7780

7881
DISK_DATASET_CACHE = "DiskDatasetCache"
@@ -107,6 +110,8 @@ def set_conf_from_C(self, config_c):
107110
# for simple dataset cache
108111
"local_cache_path": None,
109112
"kernels": NUM_USABLE_CPU,
113+
# pickle.dump protocol version
114+
"dump_protocol_version": PROTOCOL_VERSION,
110115
# How many tasks belong to one process. Recommend 1 for high-frequency data and None for daily data.
111116
"maxtasksperchild": None,
112117
# If joblib_backend is None, use loky

qlib/contrib/online/manager.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,14 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT License.
33

4-
import os
5-
import pickle
64
import yaml
75
import pathlib
86
import pandas as pd
97
import shutil
10-
from ..backtest.account import Account
11-
from ..backtest.exchange import Exchange
8+
from ...backtest.account import Account
129
from .user import User
13-
from .utils import load_instance
14-
from ...utils import save_instance, init_instance_by_config
10+
from .utils import load_instance, save_instance
11+
from ...utils import init_instance_by_config
1512

1613

1714
class UserManager:

qlib/contrib/online/utils.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66
import yaml
77
import pandas as pd
88
from ...data import D
9+
from ...config import C
910
from ...log import get_module_logger
10-
from ...utils import get_module_by_module_path, init_instance_by_config
1111
from ...utils import get_next_trading_date
12-
from ..backtest.exchange import Exchange
12+
from ...backtest.exchange import Exchange
1313

1414
log = get_module_logger("utils")
1515

@@ -42,7 +42,7 @@ def save_instance(instance, file_path):
4242
"""
4343
file_path = pathlib.Path(file_path)
4444
with file_path.open("wb") as fr:
45-
pickle.dump(instance, fr)
45+
pickle.dump(instance, fr, C.dump_protocol_version)
4646

4747

4848
def create_user_folder(path):

qlib/data/base.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,10 +154,11 @@ def load(self, instrument, start_index, end_index, freq):
154154
raise ValueError("Invalid index range: {} {}".format(start_index, end_index))
155155
try:
156156
series = self._load_internal(instrument, start_index, end_index, freq)
157-
except Exception:
157+
except Exception as e:
158158
get_module_logger("data").error(
159159
f"Loading data error: instrument={instrument}, expression={str(self)}, "
160-
f"start_index={start_index}, end_index={end_index}, freq={freq}"
160+
f"start_index={start_index}, end_index={end_index}, freq={freq}. "
161+
f"error info: {str(e)}"
161162
)
162163
raise
163164
series.name = str(self)

qlib/data/cache.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ def visit(cache_path: Union[str, Path]):
230230
d["meta"]["visits"] = d["meta"]["visits"] + 1
231231
except KeyError:
232232
raise KeyError("Unknown meta keyword")
233-
pickle.dump(d, f)
233+
pickle.dump(d, f, protocol=C.dump_protocol_version)
234234
except Exception as e:
235235
get_module_logger("CacheUtils").warning(f"visit {cache_path} cache error: {e}")
236236

@@ -573,7 +573,7 @@ def gen_expression_cache(self, expression_data, cache_path, instrument, field, f
573573
meta_path = cache_path.with_suffix(".meta")
574574

575575
with meta_path.open("wb") as f:
576-
pickle.dump(meta, f)
576+
pickle.dump(meta, f, protocol=C.dump_protocol_version)
577577
meta_path.chmod(stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH)
578578
df = expression_data.to_frame()
579579

@@ -638,7 +638,7 @@ def update(self, sid, cache_uri, freq: str = "day"):
638638
# update meta file
639639
d["info"]["last_update"] = str(new_calendar[-1])
640640
with meta_path.open("wb") as f:
641-
pickle.dump(d, f)
641+
pickle.dump(d, f, protocol=C.dump_protocol_version)
642642
return 0
643643

644644

@@ -935,7 +935,7 @@ def gen_dataset_cache(self, cache_path: Union[str, Path], instruments, fields, f
935935
"meta": {"last_visit": time.time(), "visits": 1},
936936
}
937937
with cache_path.with_suffix(".meta").open("wb") as f:
938-
pickle.dump(meta, f)
938+
pickle.dump(meta, f, protocol=C.dump_protocol_version)
939939
cache_path.with_suffix(".meta").chmod(stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH)
940940
# write index file
941941
im = DiskDatasetCache.IndexManager(cache_path)
@@ -1057,7 +1057,7 @@ def update(self, cache_uri, freq: str = "day"):
10571057
# update meta file
10581058
d["info"]["last_update"] = str(new_calendar[-1])
10591059
with meta_path.open("wb") as f:
1060-
pickle.dump(d, f)
1060+
pickle.dump(d, f, protocol=C.dump_protocol_version)
10611061
return 0
10621062

10631063

qlib/data/client.py

Lines changed: 103 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -1,102 +1,103 @@
1-
# Copyright (c) Microsoft Corporation.
2-
# Licensed under the MIT License.
3-
4-
5-
from __future__ import division
6-
from __future__ import print_function
7-
8-
import socketio
9-
10-
import qlib
11-
from ..log import get_module_logger
12-
import pickle
13-
14-
15-
class Client:
16-
"""A client class
17-
18-
Provide the connection tool functions for ClientProvider.
19-
"""
20-
21-
def __init__(self, host, port):
22-
super(Client, self).__init__()
23-
self.sio = socketio.Client()
24-
self.server_host = host
25-
self.server_port = port
26-
self.logger = get_module_logger(self.__class__.__name__)
27-
# bind connect/disconnect callbacks
28-
self.sio.on(
29-
"connect",
30-
lambda: self.logger.debug("Connect to server {}".format(self.sio.connection_url)),
31-
)
32-
self.sio.on("disconnect", lambda: self.logger.debug("Disconnect from server!"))
33-
34-
def connect_server(self):
35-
"""Connect to server."""
36-
try:
37-
self.sio.connect("ws://" + self.server_host + ":" + str(self.server_port))
38-
except socketio.exceptions.ConnectionError:
39-
self.logger.error("Cannot connect to server - check your network or server status")
40-
41-
def disconnect(self):
42-
"""Disconnect from server."""
43-
try:
44-
self.sio.eio.disconnect(True)
45-
except Exception as e:
46-
self.logger.error("Cannot disconnect from server : %s" % e)
47-
48-
def send_request(self, request_type, request_content, msg_queue, msg_proc_func=None):
49-
"""Send a certain request to server.
50-
51-
Parameters
52-
----------
53-
request_type : str
54-
type of proposed request, 'calendar'/'instrument'/'feature'.
55-
request_content : dict
56-
records the information of the request.
57-
msg_proc_func : func
58-
the function to process the message when receiving response, should have arg `*args`.
59-
msg_queue: Queue
60-
The queue to pass the messsage after callback.
61-
"""
62-
head_info = {"version": qlib.__version__}
63-
64-
def request_callback(*args):
65-
"""callback_wrapper
66-
67-
:param *args: args[0] is the response content
68-
"""
69-
# args[0] is the response content
70-
self.logger.debug("receive data and enter queue")
71-
msg = dict(args[0])
72-
if msg["detailed_info"] is not None:
73-
if msg["status"] != 0:
74-
self.logger.error(msg["detailed_info"])
75-
else:
76-
self.logger.info(msg["detailed_info"])
77-
if msg["status"] != 0:
78-
ex = ValueError(f"Bad response(status=={msg['status']}), detailed info: {msg['detailed_info']}")
79-
msg_queue.put(ex)
80-
else:
81-
if msg_proc_func is not None:
82-
try:
83-
ret = msg_proc_func(msg["result"])
84-
except Exception as e:
85-
self.logger.exception("Error when processing message.")
86-
ret = e
87-
else:
88-
ret = msg["result"]
89-
msg_queue.put(ret)
90-
self.disconnect()
91-
self.logger.debug("disconnected")
92-
93-
self.logger.debug("try connecting")
94-
self.connect_server()
95-
self.logger.debug("connected")
96-
# The pickle is for passing some parameters with special type(such as
97-
# pd.Timestamp)
98-
request_content = {"head": head_info, "body": pickle.dumps(request_content)}
99-
self.sio.on(request_type + "_response", request_callback)
100-
self.logger.debug("try sending")
101-
self.sio.emit(request_type + "_request", request_content)
102-
self.sio.wait()
1+
# Copyright (c) Microsoft Corporation.
2+
# Licensed under the MIT License.
3+
4+
5+
from __future__ import division
6+
from __future__ import print_function
7+
8+
import socketio
9+
10+
import qlib
11+
from ..config import C
12+
from ..log import get_module_logger
13+
import pickle
14+
15+
16+
class Client:
17+
"""A client class
18+
19+
Provide the connection tool functions for ClientProvider.
20+
"""
21+
22+
def __init__(self, host, port):
23+
super(Client, self).__init__()
24+
self.sio = socketio.Client()
25+
self.server_host = host
26+
self.server_port = port
27+
self.logger = get_module_logger(self.__class__.__name__)
28+
# bind connect/disconnect callbacks
29+
self.sio.on(
30+
"connect",
31+
lambda: self.logger.debug("Connect to server {}".format(self.sio.connection_url)),
32+
)
33+
self.sio.on("disconnect", lambda: self.logger.debug("Disconnect from server!"))
34+
35+
def connect_server(self):
36+
"""Connect to server."""
37+
try:
38+
self.sio.connect("ws://" + self.server_host + ":" + str(self.server_port))
39+
except socketio.exceptions.ConnectionError:
40+
self.logger.error("Cannot connect to server - check your network or server status")
41+
42+
def disconnect(self):
43+
"""Disconnect from server."""
44+
try:
45+
self.sio.eio.disconnect(True)
46+
except Exception as e:
47+
self.logger.error("Cannot disconnect from server : %s" % e)
48+
49+
def send_request(self, request_type, request_content, msg_queue, msg_proc_func=None):
50+
"""Send a certain request to server.
51+
52+
Parameters
53+
----------
54+
request_type : str
55+
type of proposed request, 'calendar'/'instrument'/'feature'.
56+
request_content : dict
57+
records the information of the request.
58+
msg_proc_func : func
59+
the function to process the message when receiving response, should have arg `*args`.
60+
msg_queue: Queue
61+
The queue to pass the messsage after callback.
62+
"""
63+
head_info = {"version": qlib.__version__}
64+
65+
def request_callback(*args):
66+
"""callback_wrapper
67+
68+
:param *args: args[0] is the response content
69+
"""
70+
# args[0] is the response content
71+
self.logger.debug("receive data and enter queue")
72+
msg = dict(args[0])
73+
if msg["detailed_info"] is not None:
74+
if msg["status"] != 0:
75+
self.logger.error(msg["detailed_info"])
76+
else:
77+
self.logger.info(msg["detailed_info"])
78+
if msg["status"] != 0:
79+
ex = ValueError(f"Bad response(status=={msg['status']}), detailed info: {msg['detailed_info']}")
80+
msg_queue.put(ex)
81+
else:
82+
if msg_proc_func is not None:
83+
try:
84+
ret = msg_proc_func(msg["result"])
85+
except Exception as e:
86+
self.logger.exception("Error when processing message.")
87+
ret = e
88+
else:
89+
ret = msg["result"]
90+
msg_queue.put(ret)
91+
self.disconnect()
92+
self.logger.debug("disconnected")
93+
94+
self.logger.debug("try connecting")
95+
self.connect_server()
96+
self.logger.debug("connected")
97+
# The pickle is for passing some parameters with special type(such as
98+
# pd.Timestamp)
99+
request_content = {"head": head_info, "body": pickle.dumps(request_content, protocol=C.dump_protocol_version)}
100+
self.sio.on(request_type + "_response", request_callback)
101+
self.logger.debug("try sending")
102+
self.sio.emit(request_type + "_request", request_content)
103+
self.sio.wait()

qlib/data/data.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -726,10 +726,11 @@ def expression(self, instrument, field, start_time=None, end_time=None, freq="da
726726
lft_etd, rght_etd = expression.get_extended_window_size()
727727
try:
728728
series = expression.load(instrument, max(0, start_index - lft_etd), end_index + rght_etd, freq)
729-
except Exception:
729+
except Exception as e:
730730
get_module_logger("data").error(
731731
f"Loading expression error: "
732-
f"instrument={instrument}, field=({field}), start_time={start_time}, end_time={end_time}, freq={freq}"
732+
f"instrument={instrument}, field=({field}), start_time={start_time}, end_time={end_time}, freq={freq}. "
733+
f"error info: {str(e)}"
733734
)
734735
raise
735736
# Ensure that each column type is consistent

qlib/data/ops.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -312,12 +312,12 @@ def _load_internal(self, instrument, start_index, end_index, freq):
312312
warning_info = (
313313
f"Loading {instrument}: {str(self)}; np.{self.func}(series_left, series_right), "
314314
f"The length of series_left and series_right is different: ({len(series_left)}, {len(series_right)}), "
315-
f"series_left is {str(self.feature_left)}, series_right is {str(self.feature_left)}. Please check the data"
315+
f"series_left is {str(self.feature_left)}, series_right is {str(self.feature_right)}. Please check the data"
316316
)
317317
else:
318318
warning_info = (
319319
f"Loading {instrument}: {str(self)}; np.{self.func}(series_left, series_right), "
320-
f"series_left is {str(self.feature_left)}, series_right is {str(self.feature_left)}. Please check the data"
320+
f"series_left is {str(self.feature_left)}, series_right is {str(self.feature_right)}. Please check the data"
321321
)
322322
try:
323323
res = getattr(np, self.func)(series_left, series_right)

qlib/utils/objm.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ def create_path(self) -> str:
106106

107107
def save_obj(self, obj, name):
108108
with (self.path / name).open("wb") as f:
109-
pickle.dump(obj, f)
109+
pickle.dump(obj, f, protocol=C.dump_protocol_version)
110110

111111
def save_objs(self, obj_name_l):
112112
for obj, name in obj_name_l:

qlib/utils/serial.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import dill
66
from pathlib import Path
77
from typing import Union
8+
from ..config import C
89

910

1011
class Serializable:
@@ -85,7 +86,8 @@ def to_pickle(self, path: Union[Path, str], dump_all: bool = None, exclude: list
8586
"""
8687
self.config(dump_all=dump_all, exclude=exclude)
8788
with Path(path).open("wb") as f:
88-
self.get_backend().dump(self, f)
89+
# pickle interface like backend; such as dill
90+
self.get_backend().dump(self, f, protocol=C.dump_protocol_version)
8991

9092
@classmethod
9193
def load(cls, filepath):
@@ -116,6 +118,7 @@ def get_backend(cls):
116118
Returns:
117119
module: pickle or dill module based on pickle_backend
118120
"""
121+
# NOTE: pickle interface like backend; such as dill
119122
if cls.pickle_backend == "pickle":
120123
return pickle
121124
elif cls.pickle_backend == "dill":
@@ -140,4 +143,4 @@ def general_dump(obj, path: Union[Path, str]):
140143
obj.to_pickle(path)
141144
else:
142145
with path.open("wb") as f:
143-
pickle.dump(obj, f)
146+
pickle.dump(obj, f, protocol=C.dump_protocol_version)

0 commit comments

Comments
 (0)