Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 150 additions & 27 deletions qlib/contrib/data/highfreq_handler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from qlib.data.dataset.handler import DataHandler, DataHandlerLP

from .handler import check_transform_proc

EPSILON = 1e-4


Expand All @@ -15,20 +17,9 @@ def __init__(
fit_end_time=None,
drop_raw=True,
):
def check_transform_proc(proc_l):
new_l = []
for p in proc_l:
p["kwargs"].update(
{
"fit_start_time": fit_start_time,
"fit_end_time": fit_end_time,
}
)
new_l.append(p)
return new_l

infer_processors = check_transform_proc(infer_processors)
learn_processors = check_transform_proc(learn_processors)
infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time)
learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time)

data_loader = {
"class": "QlibDataLoader",
Expand Down Expand Up @@ -110,6 +101,103 @@ def get_normalized_price_feature(price_field, shift=0):
return fields, names


class HighFreqGeneralHandler(DataHandlerLP):
def __init__(
self,
instruments="csi300",
start_time=None,
end_time=None,
infer_processors=[],
learn_processors=[],
fit_start_time=None,
fit_end_time=None,
drop_raw=True,
day_length=240,
):
self.day_length = day_length

infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time)
learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time)

data_loader = {
"class": "QlibDataLoader",
"kwargs": {
"config": self.get_feature_config(),
"swap_level": False,
"freq": "1min",
},
}
super().__init__(
instruments=instruments,
start_time=start_time,
end_time=end_time,
data_loader=data_loader,
infer_processors=infer_processors,
learn_processors=learn_processors,
drop_raw=drop_raw,
)

def get_feature_config(self):
fields = []
names = []

template_if = "If(IsNull({1}), {0}, {1})"
template_paused = f"Cut({{0}}, {self.day_length * 2}, None)"

def get_normalized_price_feature(price_field, shift=0):
# norm with the close price of 237th minute of yesterday.
if shift == 0:
template_norm = f"{{0}}/DayLast(Ref({{1}}, {self.day_length * 2}))"
else:
template_norm = f"Ref({{0}}, " + str(shift) + f")/DayLast(Ref({{1}}, {self.day_length}))"

template_fillnan = "FFillNan({0})"
# calculate -> ffill -> remove paused
feature_ops = template_paused.format(
template_fillnan.format(
template_norm.format(template_if.format("$close", price_field), template_fillnan.format("$close"))
)
)
return feature_ops

fields += [get_normalized_price_feature("$open", 0)]
fields += [get_normalized_price_feature("$high", 0)]
fields += [get_normalized_price_feature("$low", 0)]
fields += [get_normalized_price_feature("$close", 0)]
fields += [get_normalized_price_feature("$vwap", 0)]
names += ["$open", "$high", "$low", "$close", "$vwap"]

fields += [get_normalized_price_feature("$open", self.day_length)]
fields += [get_normalized_price_feature("$high", self.day_length)]
fields += [get_normalized_price_feature("$low", self.day_length)]
fields += [get_normalized_price_feature("$close", self.day_length)]
fields += [get_normalized_price_feature("$vwap", self.day_length)]
names += ["$open_1", "$high_1", "$low_1", "$close_1", "$vwap_1"]

# calculate and fill nan with 0
fields += [
template_paused.format(
"If(IsNull({0}), 0, {0})".format(
f"{{0}}/Ref(DayLast(Mean({{0}}, {self.day_length * 30})), {self.day_length})".format("$volume")
)
)
]
names += ["$volume"]

fields += [
template_paused.format(
"If(IsNull({0}), 0, {0})".format(
f"Ref({{0}}, {self.day_length})/Ref(DayLast(Mean({{0}}, {self.day_length * 30})), {self.day_length})".format(
"$volume"
)
)
)
]
names += ["$volume_1"]

return fields, names


class HighFreqBacktestHandler(DataHandler):
def __init__(
self,
Expand Down Expand Up @@ -163,6 +251,53 @@ def get_feature_config(self):
return fields, names


class HighFreqGeneralBacktestHandler(DataHandler):
def __init__(
self,
instruments="csi300",
start_time=None,
end_time=None,
day_length=240,
):
self.day_length = day_length
data_loader = {
"class": "QlibDataLoader",
"kwargs": {
"config": self.get_feature_config(),
"swap_level": False,
"freq": "1min",
},
}
super().__init__(
instruments=instruments,
start_time=start_time,
end_time=end_time,
data_loader=data_loader,
)

def get_feature_config(self):
fields = []
names = []

template_paused = f"Cut({{0}}, {self.day_length * 2}, None)"
template_fillnan = "FFillNan({0})"
template_if = "If(IsNull({1}), {0}, {1})"
fields += [
template_paused.format(template_fillnan.format("$close")),
]
names += ["$close0"]

fields += [
template_paused.format(template_if.format(template_fillnan.format("$close"), "$vwap")),
]
names += ["$vwap0"]

fields += [template_paused.format("If(IsNull({0}), 0, {0})".format("$volume"))]
names += ["$volume0"]

return fields, names


class HighFreqOrderHandler(DataHandlerLP):
def __init__(
self,
Expand All @@ -175,20 +310,9 @@ def __init__(
fit_end_time=None,
drop_raw=True,
):
def check_transform_proc(proc_l):
new_l = []
for p in proc_l:
p["kwargs"].update(
{
"fit_start_time": fit_start_time,
"fit_end_time": fit_end_time,
}
)
new_l.append(p)
return new_l

infer_processors = check_transform_proc(infer_processors)
learn_processors = check_transform_proc(learn_processors)
infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time)
learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time)

data_loader = {
"class": "QlibDataLoader",
Expand Down Expand Up @@ -356,7 +480,6 @@ def get_feature_config(self):

template_if = "If(IsNull({1}), {0}, {1})"
template_paused = "Select(Gt($hx_paused_num, 1.001), {0})"
# template_paused = "{0}"
template_fillnan = "FFillNan({0})"
fields += [
template_fillnan.format(template_paused.format("$close")),
Expand Down