diff --git a/distributed/shuffle/_arrow.py b/distributed/shuffle/_arrow.py index 08ef2b3f616..e460f3a3a43 100644 --- a/distributed/shuffle/_arrow.py +++ b/distributed/shuffle/_arrow.py @@ -56,7 +56,7 @@ def convert_partition(data: bytes, meta: pd.DataFrame) -> pd.DataFrame: while file.tell() < end: sr = pa.RecordBatchStreamReader(file) shards.append(sr.read_all()) - table = pa.concat_tables(shards, promote=True) + table = pa.concat_tables(shards, promote=True).combine_chunks() df = from_pyarrow_table_dispatch(meta, table, self_destruct=True) return df.astype(meta.dtypes, copy=False) @@ -68,7 +68,7 @@ def list_of_buffers_to_table(data: list[bytes]) -> pa.Table: return pa.concat_tables( (deserialize_table(buffer) for buffer in data), promote=True - ) + ).combine_chunks() def serialize_table(table: pa.Table) -> bytes: diff --git a/distributed/shuffle/_worker_plugin.py b/distributed/shuffle/_worker_plugin.py index f41c9b5dfd0..0280204f988 100644 --- a/distributed/shuffle/_worker_plugin.py +++ b/distributed/shuffle/_worker_plugin.py @@ -104,10 +104,11 @@ async def shuffle_inputs_done(self, shuffle_id: ShuffleId, run_id: int) -> None: await shuffle.inputs_done() async def _close_shuffle_run(self, shuffle: ShuffleRun) -> None: - await shuffle.close() - async with self._runs_cleanup_condition: - self._runs.remove(shuffle) - self._runs_cleanup_condition.notify_all() + pass + # await shuffle.close() + # async with self._runs_cleanup_condition: + # self._runs.remove(shuffle) + # self._runs_cleanup_condition.notify_all() def shuffle_fail(self, shuffle_id: ShuffleId, run_id: int, message: str) -> None: """Fails the shuffle run with the message as exception and triggers cleanup.