From 320de59d0c4a66a38da98676f9c4c4ce7d157d9c Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 23 Aug 2023 18:56:40 +0200 Subject: [PATCH 1/2] Reduce final footprint --- distributed/shuffle/_arrow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/shuffle/_arrow.py b/distributed/shuffle/_arrow.py index 08ef2b3f616..e460f3a3a43 100644 --- a/distributed/shuffle/_arrow.py +++ b/distributed/shuffle/_arrow.py @@ -56,7 +56,7 @@ def convert_partition(data: bytes, meta: pd.DataFrame) -> pd.DataFrame: while file.tell() < end: sr = pa.RecordBatchStreamReader(file) shards.append(sr.read_all()) - table = pa.concat_tables(shards, promote=True) + table = pa.concat_tables(shards, promote=True).combine_chunks() df = from_pyarrow_table_dispatch(meta, table, self_destruct=True) return df.astype(meta.dtypes, copy=False) @@ -68,7 +68,7 @@ def list_of_buffers_to_table(data: list[bytes]) -> pa.Table: return pa.concat_tables( (deserialize_table(buffer) for buffer in data), promote=True - ) + ).combine_chunks() def serialize_table(table: pa.Table) -> bytes: From aa2711f599565d750479cc3f580e71146744bed5 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 24 Aug 2023 14:11:12 +0200 Subject: [PATCH 2/2] Experiment --- distributed/shuffle/_worker_plugin.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/distributed/shuffle/_worker_plugin.py b/distributed/shuffle/_worker_plugin.py index f41c9b5dfd0..0280204f988 100644 --- a/distributed/shuffle/_worker_plugin.py +++ b/distributed/shuffle/_worker_plugin.py @@ -104,10 +104,11 @@ async def shuffle_inputs_done(self, shuffle_id: ShuffleId, run_id: int) -> None: await shuffle.inputs_done() async def _close_shuffle_run(self, shuffle: ShuffleRun) -> None: - await shuffle.close() - async with self._runs_cleanup_condition: - self._runs.remove(shuffle) - self._runs_cleanup_condition.notify_all() + pass + # await shuffle.close() + # async with self._runs_cleanup_condition: + # self._runs.remove(shuffle) + # self._runs_cleanup_condition.notify_all() def shuffle_fail(self, shuffle_id: ShuffleId, run_id: int, message: str) -> None: """Fails the shuffle run with the message as exception and triggers cleanup.