From 9d5db6c0b13f945757a2f9a9f2e943e20845851d Mon Sep 17 00:00:00 2001 From: Jake Ross Date: Thu, 4 Dec 2025 22:50:23 -0700 Subject: [PATCH 1/2] Stream after-hook processing in WellTransferer --- transfers/well_transfer.py | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index 50ea868b3..bb83750a8 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -624,18 +624,16 @@ def _after_hook(self, session): measuring_point_estimator = MeasuringPointEstimator() # add things thate need well id query = session.query(Thing).filter(Thing.thing_type == "water well") - count = query.count() - wells = query.all() - # for j, chunk in enumerate(chunk_by_size(query.all(), 100)): chunk_size = 100 - for j, chunki in enumerate(range(0, count, chunk_size)): - chunk = wells[chunki : chunki + chunk_size] + count = query.count() + processed = 0 + chunk = [] + + def _process_chunk(chunk_index: int, wells_chunk: list[Thing]): step_start_time = time.time() all_objects = [] - for i, well in enumerate(chunk): - objs = self._after_hook_chunk( - well, formations, measuring_point_estimator - ) + for well in wells_chunk: + objs = self._after_hook_chunk(well, formations, measuring_point_estimator) if objs: all_objects.extend(objs) @@ -649,10 +647,21 @@ def _after_hook(self, session): finally: save_time = time.time() - save_time + processed_count = chunk_index * chunk_size + len(wells_chunk) logger.info( - f"After hook: {(j+1)*100}/{count} took {time.time() - step_start_time:.2f}s, " + f"After hook: {processed_count}/{count} took {time.time() - step_start_time:.2f}s, " f"n_objects={len(all_objects)}, save_time={save_time}" ) + return processed_count + + for well in query.yield_per(chunk_size): + chunk.append(well) + if len(chunk) == chunk_size: + processed = _process_chunk(processed // chunk_size, chunk) + chunk = [] + + if chunk: + _process_chunk(processed // chunk_size, chunk) def _after_hook_chunk(self, well, formations, measuring_point_estimator): From 0056c63b16458e105f0614fd5599902b4b82b428 Mon Sep 17 00:00:00 2001 From: jirhiker Date: Fri, 5 Dec 2025 05:50:42 +0000 Subject: [PATCH 2/2] Formatting changes --- transfers/well_transfer.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index bb83750a8..2aa70ba01 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -633,7 +633,9 @@ def _process_chunk(chunk_index: int, wells_chunk: list[Thing]): step_start_time = time.time() all_objects = [] for well in wells_chunk: - objs = self._after_hook_chunk(well, formations, measuring_point_estimator) + objs = self._after_hook_chunk( + well, formations, measuring_point_estimator + ) if objs: all_objects.extend(objs)