From 68637e59645047e0433e1d1d4527eac483c87d47 Mon Sep 17 00:00:00 2001 From: demon143 <59681577+demon143@users.noreply.github.com> Date: Mon, 6 Sep 2021 15:57:03 +0800 Subject: [PATCH 1/8] Update manage.py --- qlib/workflow/task/manage.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/qlib/workflow/task/manage.py b/qlib/workflow/task/manage.py index 41e243b435d..9bbc846789b 100644 --- a/qlib/workflow/task/manage.py +++ b/qlib/workflow/task/manage.py @@ -47,6 +47,14 @@ class TaskManager: The tasks manager assumes that you will only update the tasks you fetched. The mongo fetch one and update will make it date updating secure. + This class can be used as a tool from the command line. Here are several examples + + .. code-block:: shell + + python -m qlib.workflow.task.manage -t wait + python -m qlib.workflow.task.manage -t task_stat + + .. note:: Assumption: the data in MongoDB was encoded and the data out of MongoDB was decoded @@ -80,6 +88,7 @@ def __init__(self, task_pool: str): task_pool: str the name of Collection in MongoDB """ + #self.task_pool is mongodb's connection self.task_pool = getattr(get_mongodb(), task_pool) self.logger = get_module_logger(self.__class__.__name__) @@ -207,10 +216,12 @@ def create_task(self, task_def_l, dry_run=False, print_nt=False) -> List[str]: new_tasks = [] _id_list = [] for t in task_def_l: + #self.task_pool: XXX = getattr(...) 
try: r = self.task_pool.find_one({"filter": t}) except InvalidDocument: r = self.task_pool.find_one({"filter": self._dict_to_str(t)}) + # When r is None, it indicates that t is a new task if r is None: new_tasks.append(t) if not dry_run: @@ -219,6 +230,7 @@ def create_task(self, task_def_l, dry_run=False, print_nt=False) -> List[str]: else: _id_list.append(None) else: + #_decode_task is Serialization tool _id_list.append(self._decode_task(r)["_id"]) self.logger.info(f"Total Tasks: {len(task_def_l)}, New Tasks: {len(new_tasks)}") @@ -496,7 +508,7 @@ def (task_def, **kwargs) -> if task is None: break get_module_logger("run_task").info(task["def"]) - # when fetching `WAITING` task, use task["def"] to train + # when fetching `WAITING` task, use task["def"] to train. "def" means that the task has not been defined if before_status == TaskManager.STATUS_WAITING: param = task["def"] # when fetching `PART_DONE` task, use task["res"] to train because the middle result has been saved to task["res"] From 7e8801c2ec00f88b996d1812791fe439accb0c79 Mon Sep 17 00:00:00 2001 From: demon143 <59681577+demon143@users.noreply.github.com> Date: Tue, 7 Sep 2021 14:21:45 +0800 Subject: [PATCH 2/8] Update manage.py --- qlib/workflow/task/manage.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/qlib/workflow/task/manage.py b/qlib/workflow/task/manage.py index 9bbc846789b..9457849bc96 100644 --- a/qlib/workflow/task/manage.py +++ b/qlib/workflow/task/manage.py @@ -88,8 +88,7 @@ def __init__(self, task_pool: str): task_pool: str the name of Collection in MongoDB """ - #self.task_pool is mongodb's connection - self.task_pool = getattr(get_mongodb(), task_pool) + self.task_pool:pymongo.collection.Collection = getattr(get_mongodb(), task_pool) self.logger = get_module_logger(self.__class__.__name__) @staticmethod @@ -110,6 +109,19 @@ def _encode_task(self, task): return task def _decode_task(self, task): + """ + _decode_task is Serialization tool + + 
Parameters + ---------- + task : dict + task information + + Returns + ------- + bson.objectid.ObjectId + Convert dict to bson + """ for prefix in self.ENCODE_FIELDS_PREFIX: for k in list(task.keys()): if k.startswith(prefix): @@ -216,7 +228,6 @@ def create_task(self, task_def_l, dry_run=False, print_nt=False) -> List[str]: new_tasks = [] _id_list = [] for t in task_def_l: - #self.task_pool: XXX = getattr(...) try: r = self.task_pool.find_one({"filter": t}) except InvalidDocument: @@ -230,7 +241,6 @@ def create_task(self, task_def_l, dry_run=False, print_nt=False) -> List[str]: else: _id_list.append(None) else: - #_decode_task is Serialization tool _id_list.append(self._decode_task(r)["_id"]) self.logger.info(f"Total Tasks: {len(task_def_l)}, New Tasks: {len(new_tasks)}") @@ -473,11 +483,11 @@ def run_task( After running this method, here are 4 situations (before_status -> after_status): - STATUS_WAITING -> STATUS_DONE: use task["def"] as `task_func` param + STATUS_WAITING -> STATUS_DONE: use task["def"] as `task_func` param,it means that the task has not been started STATUS_WAITING -> STATUS_PART_DONE: use task["def"] as `task_func` param - STATUS_PART_DONE -> STATUS_PART_DONE: use task["res"] as `task_func` param + STATUS_PART_DONE -> STATUS_PART_DONE: use task["res"] as `task_func` param,it means that the task has been started but not completed STATUS_PART_DONE -> STATUS_DONE: use task["res"] as `task_func` param @@ -508,7 +518,7 @@ def (task_def, **kwargs) -> if task is None: break get_module_logger("run_task").info(task["def"]) - # when fetching `WAITING` task, use task["def"] to train. "def" means that the task has not been defined + # when fetching `WAITING` task, use task["def"] to train. 
if before_status == TaskManager.STATUS_WAITING: param = task["def"] # when fetching `PART_DONE` task, use task["res"] to train because the middle result has been saved to task["res"] From 892742d5b07580876c188c58f7190da5d49ab31b Mon Sep 17 00:00:00 2001 From: demon143 <59681577+demon143@users.noreply.github.com> Date: Tue, 7 Sep 2021 14:26:01 +0800 Subject: [PATCH 3/8] Update manage.py --- qlib/workflow/task/manage.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/qlib/workflow/task/manage.py b/qlib/workflow/task/manage.py index 9457849bc96..365739ecf5c 100644 --- a/qlib/workflow/task/manage.py +++ b/qlib/workflow/task/manage.py @@ -483,11 +483,11 @@ def run_task( After running this method, here are 4 situations (before_status -> after_status): - STATUS_WAITING -> STATUS_DONE: use task["def"] as `task_func` param,it means that the task has not been started + STATUS_WAITING -> STATUS_DONE: use task["def"] as `task_func` param, it means that the task has not been started STATUS_WAITING -> STATUS_PART_DONE: use task["def"] as `task_func` param - STATUS_PART_DONE -> STATUS_PART_DONE: use task["res"] as `task_func` param,it means that the task has been started but not completed + STATUS_PART_DONE -> STATUS_PART_DONE: use task["res"] as `task_func` param, it means that the task has been started but not completed STATUS_PART_DONE -> STATUS_DONE: use task["res"] as `task_func` param From a705856811d0c3c8e95cdb3a39a50573a93005e8 Mon Sep 17 00:00:00 2001 From: demon143 <59681577+demon143@users.noreply.github.com> Date: Tue, 7 Sep 2021 14:26:49 +0800 Subject: [PATCH 4/8] Update manage.py --- qlib/workflow/task/manage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qlib/workflow/task/manage.py b/qlib/workflow/task/manage.py index 365739ecf5c..f5fae774e9c 100644 --- a/qlib/workflow/task/manage.py +++ b/qlib/workflow/task/manage.py @@ -518,7 +518,7 @@ def (task_def, **kwargs) -> if task is None: break 
get_module_logger("run_task").info(task["def"]) - # when fetching `WAITING` task, use task["def"] to train. + # when fetching `WAITING` task, use task["def"] to train if before_status == TaskManager.STATUS_WAITING: param = task["def"] # when fetching `PART_DONE` task, use task["res"] to train because the middle result has been saved to task["res"] From e1f09b9b4911e42469a483430fb8cf9b2cc693e1 Mon Sep 17 00:00:00 2001 From: demon143 <59681577+demon143@users.noreply.github.com> Date: Tue, 7 Sep 2021 15:54:59 +0800 Subject: [PATCH 5/8] Update manage.py --- qlib/workflow/task/manage.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/qlib/workflow/task/manage.py b/qlib/workflow/task/manage.py index f5fae774e9c..c298c33b4a5 100644 --- a/qlib/workflow/task/manage.py +++ b/qlib/workflow/task/manage.py @@ -119,8 +119,8 @@ def _decode_task(self, task): Returns ------- - bson.objectid.ObjectId - Convert dict to bson + dict + Implement serialization """ for prefix in self.ENCODE_FIELDS_PREFIX: for k in list(task.keys()): From ff345b296fac29f44b80be350f44619dfbfd4285 Mon Sep 17 00:00:00 2001 From: demon143 <59681577+demon143@users.noreply.github.com> Date: Tue, 7 Sep 2021 20:51:29 +0800 Subject: [PATCH 6/8] Update manage.py --- qlib/workflow/task/manage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qlib/workflow/task/manage.py b/qlib/workflow/task/manage.py index c298c33b4a5..cd56b3be167 100644 --- a/qlib/workflow/task/manage.py +++ b/qlib/workflow/task/manage.py @@ -88,7 +88,7 @@ def __init__(self, task_pool: str): task_pool: str the name of Collection in MongoDB """ - self.task_pool:pymongo.collection.Collection = getattr(get_mongodb(), task_pool) + self.task_pool: pymongo.collection.Collection = getattr(get_mongodb(), task_pool) self.logger = get_module_logger(self.__class__.__name__) @staticmethod From 3116861291dd5dd429fe70ef78978856dc00591c Mon Sep 17 00:00:00 2001 From: demon143 <59681577+demon143@users.noreply.github.com> 
Date: Tue, 7 Sep 2021 21:11:07 +0800 Subject: [PATCH 7/8] Update manage.py --- qlib/workflow/task/manage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qlib/workflow/task/manage.py b/qlib/workflow/task/manage.py index cd56b3be167..03e59edb7eb 100644 --- a/qlib/workflow/task/manage.py +++ b/qlib/workflow/task/manage.py @@ -120,7 +120,7 @@ def _decode_task(self, task): Returns ------- dict - Implement serialization + return serialization's result by pickle """ for prefix in self.ENCODE_FIELDS_PREFIX: for k in list(task.keys()): From 4908d26288582f6cd5d90effe02748b8aeec73ee Mon Sep 17 00:00:00 2001 From: demon143 <59681577+demon143@users.noreply.github.com> Date: Thu, 9 Sep 2021 13:31:29 +0800 Subject: [PATCH 8/8] Update manage.py --- qlib/workflow/task/manage.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/qlib/workflow/task/manage.py b/qlib/workflow/task/manage.py index 03e59edb7eb..18b389c7345 100644 --- a/qlib/workflow/task/manage.py +++ b/qlib/workflow/task/manage.py @@ -110,7 +110,8 @@ def _encode_task(self, task): def _decode_task(self, task): """ - _decode_task is Serialization tool + _decode_task is a serialization tool. + MongoDB needs JSON, so Python objects have to be converted into JSON objects through pickle Parameters ---------- @@ -120,7 +121,7 @@ def _decode_task(self, task): Returns ------- dict - return serialization's result by pickle + JSON required by MongoDB """ for prefix in self.ENCODE_FIELDS_PREFIX: for k in list(task.keys()):