From 374d186a6c05b1861760f8004fbce1940e6c6532 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Mon, 10 Oct 2022 05:51:18 -0700 Subject: [PATCH 1/4] Raise `CommClosedError` on UCX read frame errors --- distributed/comm/ucx.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 9c73a538156..d6db40d3895 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -352,8 +352,17 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): if any(cuda_recv_frames): synchronize_stream(0) - for each_frame in recv_frames: - await self.ep.recv(each_frame) + try: + for each_frame in recv_frames: + await self.ep.recv(each_frame) + except BaseException as e: + # In addition to UCX exceptions, may be CancelledError or a another + # "low-level" exception. The only safe thing to do is to abort. + # (See also https://github.com/dask/distributed/pull/6574). + self.abort() + raise CommClosedError( + f"Connection closed by writer.\nInner exception: {e!r}" + ) msg = await from_frames( frames, deserialize=self.deserialize, From a6564a6edc3bcc7b3b044def70861069b4c7eca9 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Mon, 10 Oct 2022 05:57:07 -0700 Subject: [PATCH 2/4] Tighten exception catching of UCX write errors --- distributed/comm/ucx.py | 39 ++++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index d6db40d3895..90879f4ec24 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -254,27 +254,28 @@ async def write( ) -> int: if self.closed(): raise CommClosedError("Endpoint is closed -- unable to send message") - try: - if serializers is None: - serializers = ("cuda", "dask", "pickle", "error") - # msg can also be a list of dicts when sending batched messages - frames = await to_frames( - msg, - serializers=serializers, - on_error=on_error, - allow_offload=self.allow_offload, - ) - nframes = len(frames) - cuda_frames = tuple(hasattr(f, "__cuda_array_interface__") for f in frames) - sizes = tuple(nbytes(f) for f in frames) - cuda_send_frames, send_frames = zip( - *( - (is_cuda, each_frame) - for is_cuda, each_frame in zip(cuda_frames, frames) - if nbytes(each_frame) > 0 - ) + + if serializers is None: + serializers = ("cuda", "dask", "pickle", "error") + # msg can also be a list of dicts when sending batched messages + frames = await to_frames( + msg, + serializers=serializers, + on_error=on_error, + allow_offload=self.allow_offload, + ) + nframes = len(frames) + cuda_frames = tuple(hasattr(f, "__cuda_array_interface__") for f in frames) + sizes = tuple(nbytes(f) for f in frames) + cuda_send_frames, send_frames = zip( + *( + (is_cuda, each_frame) + for is_cuda, each_frame in zip(cuda_frames, frames) + if nbytes(each_frame) > 0 ) + ) + try: # Send meta data # Send close flag and number of frames (_Bool, int64) From e661b92b4076c78666aa6d8dfe83ff237e41387e Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Mon, 10 Oct 2022 05:53:50 -0700 Subject: [PATCH 3/4] Raise `CommClosedError` on UCX upon possibly truncated frames --- distributed/comm/ucx.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 90879f4ec24..71bdc1eb612 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -364,12 +364,18 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): raise CommClosedError( f"Connection closed by writer.\nInner exception: {e!r}" ) - msg = await from_frames( - frames, - deserialize=self.deserialize, - deserializers=deserializers, - allow_offload=self.allow_offload, - ) + + try: + msg = await from_frames( + frames, + deserialize=self.deserialize, + deserializers=deserializers, + allow_offload=self.allow_offload, + ) + except EOFError: + # Frames possibly garbled or truncated by communication error + self.abort() + raise CommClosedError("Aborted stream on truncated data") return msg async def close(self): From e5d289fbe38b4ca242200d277557fdbea2b9c706 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Mon, 10 Oct 2022 12:45:50 -0700 Subject: [PATCH 4/4] Fix comment typos --- distributed/comm/ucx.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 71bdc1eb612..fc07d0489be 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -327,7 +327,7 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): header = struct.unpack(header_fmt, header) cuda_frames, sizes = header[:nframes], header[nframes:] except BaseException as e: - # In addition to UCX exceptions, may be CancelledError or a another + # In addition to UCX exceptions, may be CancelledError or another # "low-level" exception. The only safe thing to do is to abort. # (See also https://github.com/dask/distributed/pull/6574). self.abort() @@ -357,7 +357,7 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): for each_frame in recv_frames: await self.ep.recv(each_frame) except BaseException as e: - # In addition to UCX exceptions, may be CancelledError or a another + # In addition to UCX exceptions, may be CancelledError or another # "low-level" exception. The only safe thing to do is to abort. # (See also https://github.com/dask/distributed/pull/6574). self.abort()