|
1 | | -import shutil |
2 | | -import subprocess |
3 | | -from datetime import datetime |
4 | | -from pathlib import Path |
5 | | -from typing import Any, Optional, Union |
6 | | - |
7 | 1 | import fire |
8 | 2 |
|
9 | 3 | from rdagent.app.data_science.conf import DS_RD_SETTING |
10 | | -from rdagent.components.coder.data_science.ensemble import EnsembleCoSTEER |
11 | | -from rdagent.components.coder.data_science.ensemble.exp import EnsembleTask |
12 | | -from rdagent.components.coder.data_science.feature import FeatureCoSTEER |
13 | | -from rdagent.components.coder.data_science.feature.exp import FeatureTask |
14 | | -from rdagent.components.coder.data_science.model import ModelCoSTEER |
15 | | -from rdagent.components.coder.data_science.model.exp import ModelTask |
16 | | -from rdagent.components.coder.data_science.pipeline import PipelineCoSTEER |
17 | | -from rdagent.components.coder.data_science.pipeline.exp import PipelineTask |
18 | | -from rdagent.components.coder.data_science.raw_data_loader import DataLoaderCoSTEER |
19 | | -from rdagent.components.coder.data_science.raw_data_loader.exp import DataLoaderTask |
20 | | -from rdagent.components.coder.data_science.share.doc import DocDev |
21 | | -from rdagent.components.coder.data_science.workflow import WorkflowCoSTEER |
22 | | -from rdagent.components.coder.data_science.workflow.exp import WorkflowTask |
23 | | -from rdagent.components.workflow.conf import BasePropSetting |
24 | | -from rdagent.components.workflow.rd_loop import RDLoop |
25 | | -from rdagent.core.conf import RD_AGENT_SETTINGS |
26 | | -from rdagent.core.exception import CoderError, RunnerError |
27 | | -from rdagent.core.proposal import ExperimentFeedback |
28 | | -from rdagent.core.scenario import Scenario |
29 | 4 | from rdagent.core.utils import import_class |
30 | 5 | from rdagent.log import rdagent_logger as logger |
31 | | -from rdagent.scenarios.data_science.dev.feedback import DSExperiment2Feedback |
32 | | -from rdagent.scenarios.data_science.dev.runner import DSCoSTEERRunner |
33 | | -from rdagent.scenarios.data_science.experiment.experiment import DSExperiment |
34 | | -from rdagent.scenarios.data_science.proposal.exp_gen import DSExpGen, DSTrace |
35 | | -from rdagent.scenarios.data_science.proposal.exp_gen.ckp_select import ( |
36 | | - BackJumpCKPSelector, |
37 | | - LatestCKPSelector, |
38 | | - SOTAJumpCKPSelector, |
39 | | -) |
40 | | -from rdagent.scenarios.data_science.proposal.exp_gen.idea_pool import DSKnowledgeBase |
41 | | -from rdagent.scenarios.data_science.proposal.exp_gen.sota_exp_select import ( |
42 | | - AutoSOTAexpSelector, |
43 | | - BestValidSelector, |
44 | | - GlobalSOTASelector, |
45 | | -) |
46 | | -from rdagent.scenarios.kaggle.kaggle_crawler import download_data |
47 | | - |
48 | | -CKP_SELECTOR_NAME_MAP = { |
49 | | - "latest": LatestCKPSelector, |
50 | | - "sota_jump": SOTAJumpCKPSelector, |
51 | | - "back_jump": BackJumpCKPSelector, |
52 | | -} |
53 | | - |
54 | | -SOTA_EXP_SELECTOR_NAME_MAP = { |
55 | | - "global_sota": GlobalSOTASelector, |
56 | | - "auto_sota": AutoSOTAexpSelector, |
57 | | - "best_valid_sota": BestValidSelector, |
58 | | -} |
59 | | - |
60 | | - |
61 | | -class DataScienceRDLoop(RDLoop): |
62 | | - skip_loop_error = (CoderError, RunnerError) |
63 | | - |
64 | | - def __init__(self, PROP_SETTING: BasePropSetting): |
65 | | - logger.log_object(PROP_SETTING.competition, tag="competition") |
66 | | - scen: Scenario = import_class(PROP_SETTING.scen)(PROP_SETTING.competition) |
67 | | - |
68 | | - # 1) task generation from scratch |
69 | | - # self.scratch_gen: tuple[HypothesisGen, Hypothesis2Experiment] = DummyHypothesisGen(scen), |
70 | | - |
71 | | - # 2) task generation from a complete solution |
72 | | - # self.exp_gen: ExpGen = import_class(PROP_SETTING.exp_gen)(scen) |
73 | | - |
74 | | - # self.ckp_selector = CKP_SELECTOR_NAME_MAP[DS_RD_SETTING.selector_name]() |
75 | | - # self.sota_exp_selector = SOTA_EXP_SELECTOR_NAME_MAP[DS_RD_SETTING.sota_exp_selector_name]() |
76 | | - self.ckp_selector = import_class(PROP_SETTING.selector_name)() |
77 | | - self.sota_exp_selector = import_class(PROP_SETTING.sota_exp_selector_name)() |
78 | | - |
79 | | - self.exp_gen = import_class(PROP_SETTING.hypothesis_gen)(scen) |
80 | | - |
81 | | - # coders |
82 | | - self.data_loader_coder = DataLoaderCoSTEER(scen) |
83 | | - self.feature_coder = FeatureCoSTEER(scen) |
84 | | - self.model_coder = ModelCoSTEER(scen) |
85 | | - self.ensemble_coder = EnsembleCoSTEER(scen) |
86 | | - self.workflow_coder = WorkflowCoSTEER(scen) |
87 | | - |
88 | | - self.pipeline_coder = PipelineCoSTEER(scen) |
89 | | - |
90 | | - self.runner = DSCoSTEERRunner(scen) |
91 | | - if DS_RD_SETTING.enable_doc_dev: |
92 | | - self.docdev = DocDev(scen) |
93 | | - # self.summarizer: Experiment2Feedback = import_class(PROP_SETTING.summarizer)(scen) |
94 | | - # logger.log_object(self.summarizer, tag="summarizer") |
95 | | - |
96 | | - if DS_RD_SETTING.enable_knowledge_base and DS_RD_SETTING.knowledge_base_version == "v1": |
97 | | - knowledge_base = DSKnowledgeBase( |
98 | | - path=DS_RD_SETTING.knowledge_base_path, idea_pool_json_path=DS_RD_SETTING.idea_pool_json_path |
99 | | - ) |
100 | | - self.trace = DSTrace(scen=scen, knowledge_base=knowledge_base) |
101 | | - else: |
102 | | - self.trace = DSTrace(scen=scen) |
103 | | - self.summarizer = DSExperiment2Feedback(scen) |
104 | | - super(RDLoop, self).__init__() |
105 | | - |
106 | | - def direct_exp_gen(self, prev_out: dict[str, Any]): |
107 | | - |
108 | | - # set the SOTA experiment to submit |
109 | | - sota_exp_to_submit = self.sota_exp_selector.get_sota_exp_to_submit(self.trace) |
110 | | - self.trace.set_sota_exp_to_submit(sota_exp_to_submit) |
111 | | - |
112 | | - # set the checkpoint to start from |
113 | | - selection = self.ckp_selector.get_selection(self.trace) |
114 | | - exp = self.exp_gen.gen(self.trace, selection) |
115 | | - logger.log_object(exp) |
116 | | - |
117 | | - # FIXME: this is for LLM debug webapp, remove this when the debugging is done. |
118 | | - logger.log_object(exp, tag="debug_exp_gen") |
119 | | - return exp |
120 | | - |
121 | | - def coding(self, prev_out: dict[str, Any]): |
122 | | - exp = prev_out["direct_exp_gen"] |
123 | | - for tasks in exp.pending_tasks_list: |
124 | | - exp.sub_tasks = tasks |
125 | | - with logger.tag(f"{exp.sub_tasks[0].__class__.__name__}"): |
126 | | - if isinstance(exp.sub_tasks[0], DataLoaderTask): |
127 | | - exp = self.data_loader_coder.develop(exp) |
128 | | - elif isinstance(exp.sub_tasks[0], FeatureTask): |
129 | | - exp = self.feature_coder.develop(exp) |
130 | | - elif isinstance(exp.sub_tasks[0], ModelTask): |
131 | | - exp = self.model_coder.develop(exp) |
132 | | - elif isinstance(exp.sub_tasks[0], EnsembleTask): |
133 | | - exp = self.ensemble_coder.develop(exp) |
134 | | - elif isinstance(exp.sub_tasks[0], WorkflowTask): |
135 | | - exp = self.workflow_coder.develop(exp) |
136 | | - elif isinstance(exp.sub_tasks[0], PipelineTask): |
137 | | - exp = self.pipeline_coder.develop(exp) |
138 | | - else: |
139 | | - raise NotImplementedError(f"Unsupported component in DataScienceRDLoop: {exp.hypothesis.component}") |
140 | | - exp.sub_tasks = [] |
141 | | - logger.log_object(exp) |
142 | | - return exp |
143 | | - |
144 | | - def running(self, prev_out: dict[str, Any]): |
145 | | - exp: DSExperiment = prev_out["coding"] |
146 | | - if exp.is_ready_to_run(): |
147 | | - new_exp = self.runner.develop(exp) |
148 | | - logger.log_object(new_exp) |
149 | | - exp = new_exp |
150 | | - if DS_RD_SETTING.enable_doc_dev: |
151 | | - self.docdev.develop(exp) |
152 | | - return exp |
153 | | - |
154 | | - def feedback(self, prev_out: dict[str, Any]) -> ExperimentFeedback: |
155 | | - """ |
156 | | - Assumption: |
157 | | - - If we come to feedback phase, the previous development steps are successful. |
158 | | - """ |
159 | | - exp: DSExperiment = prev_out["running"] |
160 | | - if self.trace.next_incomplete_component() is None or DS_RD_SETTING.coder_on_whole_pipeline: |
161 | | - # we have alreadly completed components in previous trace. So current loop is focusing on a new proposed idea. |
162 | | - # So we need feedback for the proposal. |
163 | | - feedback = self.summarizer.generate_feedback(exp, self.trace) |
164 | | - else: |
165 | | - # Otherwise, it is on drafting stage, don't need complicated feedbacks. |
166 | | - feedback = ExperimentFeedback( |
167 | | - reason=f"{exp.hypothesis.component} is completed.", |
168 | | - decision=True, |
169 | | - ) |
170 | | - logger.log_object(feedback) |
171 | | - return feedback |
172 | | - |
173 | | - def record(self, prev_out: dict[str, Any]): |
174 | | - # set the DAG parent for the trace |
175 | | - self.trace.sync_dag_parent_and_hist() |
176 | | - |
177 | | - e = prev_out.get(self.EXCEPTION_KEY, None) |
178 | | - if e is None: |
179 | | - self.trace.hist.append((prev_out["running"], prev_out["feedback"])) |
180 | | - else: |
181 | | - self.trace.hist.append( |
182 | | - ( |
183 | | - prev_out["direct_exp_gen"] if isinstance(e, CoderError) else prev_out["coding"], |
184 | | - ExperimentFeedback.from_exception(e), |
185 | | - ) |
186 | | - ) |
187 | | - if self.trace.sota_experiment() is None: |
188 | | - if DS_RD_SETTING.coder_on_whole_pipeline: |
189 | | - # check if feedback is not generated |
190 | | - if len(self.trace.hist) >= DS_RD_SETTING.coding_fail_reanalyze_threshold: |
191 | | - recent_hist = self.trace.hist[-DS_RD_SETTING.coding_fail_reanalyze_threshold :] |
192 | | - if all(isinstance(fb.exception, (CoderError, RunnerError)) for _, fb in recent_hist): |
193 | | - new_scen = self.trace.scen |
194 | | - if hasattr(new_scen, "reanalyze_competition_description"): |
195 | | - logger.info( |
196 | | - "Reanalyzing the competition description after three consecutive coding failures." |
197 | | - ) |
198 | | - new_scen.reanalyze_competition_description() |
199 | | - self.trace.scen = new_scen |
200 | | - else: |
201 | | - logger.info("Can not reanalyze the competition description.") |
202 | | - elif len(self.trace.hist) >= DS_RD_SETTING.consecutive_errors: |
203 | | - # if {in inital/drafting stage} and {tried enough times} |
204 | | - for _, fb in self.trace.hist[-DS_RD_SETTING.consecutive_errors :]: |
205 | | - if fb: |
206 | | - break # any success will stop restarting. |
207 | | - else: # otherwise restart it |
208 | | - logger.error("Consecutive errors reached the limit. Dumping trace.") |
209 | | - logger.log_object(self.trace, tag="trace before restart") |
210 | | - self.trace = DSTrace(scen=self.trace.scen, knowledge_base=self.trace.knowledge_base) |
211 | | - |
212 | | - logger.log_object(self.trace, tag="trace") |
213 | | - logger.log_object(self.trace.sota_experiment(), tag="SOTA experiment") |
214 | | - |
215 | | - if DS_RD_SETTING.enable_knowledge_base and DS_RD_SETTING.knowledge_base_version == "v1": |
216 | | - logger.log_object(self.trace.knowledge_base, tag="knowledge_base") |
217 | | - self.trace.knowledge_base.dump() |
218 | | - |
219 | | - if ( |
220 | | - DS_RD_SETTING.enable_log_archive |
221 | | - and DS_RD_SETTING.log_archive_path is not None |
222 | | - and Path(DS_RD_SETTING.log_archive_path).is_dir() |
223 | | - ): |
224 | | - start_archive_datetime = datetime.now() |
225 | | - logger.info(f"Archiving log and workspace folder after loop {self.loop_idx}") |
226 | | - mid_log_tar_path = ( |
227 | | - Path( |
228 | | - DS_RD_SETTING.log_archive_temp_path |
229 | | - if DS_RD_SETTING.log_archive_temp_path |
230 | | - else DS_RD_SETTING.log_archive_path |
231 | | - ) |
232 | | - / "mid_log.tar" |
233 | | - ) |
234 | | - mid_workspace_tar_path = ( |
235 | | - Path( |
236 | | - DS_RD_SETTING.log_archive_temp_path |
237 | | - if DS_RD_SETTING.log_archive_temp_path |
238 | | - else DS_RD_SETTING.log_archive_path |
239 | | - ) |
240 | | - / "mid_workspace.tar" |
241 | | - ) |
242 | | - subprocess.run(["tar", "-cf", str(mid_log_tar_path), "-C", (Path().cwd() / "log"), "."], check=True) |
243 | | - |
244 | | - # remove all files and folders in the workspace except for .py, .md, and .csv files to avoid large workspace dump |
245 | | - for workspace_id in Path(RD_AGENT_SETTINGS.workspace_path).iterdir(): |
246 | | - for file_and_folder in workspace_id.iterdir(): |
247 | | - if file_and_folder.is_dir(): |
248 | | - shutil.rmtree(file_and_folder) |
249 | | - elif file_and_folder.is_file() and file_and_folder.suffix not in [".py", ".md", ".csv"]: |
250 | | - file_and_folder.unlink() |
251 | | - |
252 | | - subprocess.run( |
253 | | - ["tar", "-cf", str(mid_workspace_tar_path), "-C", (RD_AGENT_SETTINGS.workspace_path), "."], check=True |
254 | | - ) |
255 | | - if DS_RD_SETTING.log_archive_temp_path is not None: |
256 | | - shutil.move(mid_log_tar_path, Path(DS_RD_SETTING.log_archive_path) / "mid_log.tar") |
257 | | - mid_log_tar_path = Path(DS_RD_SETTING.log_archive_path) / "mid_log.tar" |
258 | | - shutil.move(mid_workspace_tar_path, Path(DS_RD_SETTING.log_archive_path) / "mid_workspace.tar") |
259 | | - mid_workspace_tar_path = Path(DS_RD_SETTING.log_archive_path) / "mid_workspace.tar" |
260 | | - shutil.copy( |
261 | | - mid_log_tar_path, Path(DS_RD_SETTING.log_archive_path) / "mid_log_bak.tar" |
262 | | - ) # backup when upper code line is killed when running |
263 | | - shutil.copy( |
264 | | - mid_workspace_tar_path, Path(DS_RD_SETTING.log_archive_path) / "mid_workspace_bak.tar" |
265 | | - ) # backup when upper code line is killed when running |
266 | | - self.timer.add_duration(datetime.now() - start_archive_datetime) |
267 | | - |
268 | | - @classmethod |
269 | | - def load( |
270 | | - cls, |
271 | | - path: Union[str, Path], |
272 | | - output_path: Optional[Union[str, Path]] = None, |
273 | | - do_truncate: bool = False, |
274 | | - replace_timer: bool = True, |
275 | | - ) -> "LoopBase": |
276 | | - session = super().load(path, output_path, do_truncate, replace_timer) |
277 | | - logger.log_object(DS_RD_SETTING.competition, tag="competition") # NOTE: necessary to make mle_summary work. |
278 | | - if DS_RD_SETTING.enable_knowledge_base and DS_RD_SETTING.knowledge_base_version == "v1": |
279 | | - session.trace.knowledge_base = DSKnowledgeBase( |
280 | | - path=DS_RD_SETTING.knowledge_base_path, idea_pool_json_path=DS_RD_SETTING.idea_pool_json_path |
281 | | - ) |
282 | | - return session |
283 | | - |
284 | | - def dump(self, path: str | Path) -> None: |
285 | | - """ |
286 | | - Since knowledge_base is big and we don't want to dump it every time |
287 | | - So we remove it from the trace before dumping and restore it after. |
288 | | - """ |
289 | | - backup_knowledge_base = None |
290 | | - if self.trace.knowledge_base is not None: |
291 | | - backup_knowledge_base = self.trace.knowledge_base |
292 | | - self.trace.knowledge_base = None |
293 | | - super().dump(path) |
294 | | - if backup_knowledge_base is not None: |
295 | | - self.trace.knowledge_base = backup_knowledge_base |
| 6 | +from rdagent.scenarios.data_science.loop import DataScienceRDLoop |
296 | 7 |
|
297 | 8 |
|
298 | 9 | def main( |
|
0 commit comments