From 27c46c754b103abd6651b3729bd25845327bb8a5 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Fri, 27 Mar 2020 12:07:33 -0700 Subject: [PATCH 1/5] Prefix `for`-loop variables with `each_*` Should make it easier to disambiguate things like `frame` and `frames` as they are now `each_frame` and `frames`. --- distributed/comm/ucx.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 7295b11bb48..7be38d014bb 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -160,9 +160,9 @@ async def write( np.array([nbytes(f) for f in frames], dtype=np.uint64) ) # Send frames - for frame in frames: - if nbytes(frame) > 0: - await self.ep.send(frame) + for each_frame in frames: + if nbytes(each_frame) > 0: + await self.ep.send(each_frame) return sum(map(nbytes, frames)) except (ucp.exceptions.UCXBaseException): self.abort() @@ -190,17 +190,17 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): else: # Recv frames frames = [] - for is_cuda, size in zip(is_cudas.tolist(), sizes.tolist()): - if size > 0: + for is_cuda, each_size in zip(is_cudas.tolist(), sizes.tolist()): + if each_size > 0: if is_cuda: - frame = cuda_array(size) + each_frame = cuda_array(each_size) else: - frame = np.empty(size, dtype=np.uint8) - await self.ep.recv(frame) - frames.append(frame) + each_frame = np.empty(each_size, dtype=np.uint8) + await self.ep.recv(each_frame) + frames.append(each_frame) else: if is_cuda: - frames.append(cuda_array(size)) + frames.append(cuda_array(each_size)) else: frames.append(b"") msg = await from_frames( From 75bd867ccb0e99a6826c933eb40335f5adf50e3a Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Fri, 27 Mar 2020 12:07:36 -0700 Subject: [PATCH 2/5] Allocate frames the same way in 0-length case --- distributed/comm/ucx.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 
7be38d014bb..98226ca41d9 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -202,7 +202,7 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): if is_cuda: frames.append(cuda_array(each_size)) else: - frames.append(b"") + each_frame = np.empty(each_size, dtype=np.uint8) msg = await from_frames( frames, deserialize=self.deserialize, deserializers=deserializers ) From 8b87eddefc3b33746c834837ab2cdb8f894f6898 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Fri, 27 Mar 2020 12:07:38 -0700 Subject: [PATCH 3/5] Always allocate frames, receive non-trivial ones --- distributed/comm/ucx.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 98226ca41d9..c45f01270d6 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -191,18 +191,13 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): # Recv frames frames = [] for is_cuda, each_size in zip(is_cudas.tolist(), sizes.tolist()): + if is_cuda: + each_frame = cuda_array(each_size) + else: + each_frame = np.empty(each_size, dtype=np.uint8) if each_size > 0: - if is_cuda: - each_frame = cuda_array(each_size) - else: - each_frame = np.empty(each_size, dtype=np.uint8) await self.ep.recv(each_frame) - frames.append(each_frame) - else: - if is_cuda: - frames.append(cuda_array(each_size)) - else: - each_frame = np.empty(each_size, dtype=np.uint8) + frames.append(each_frame) msg = await from_frames( frames, deserialize=self.deserialize, deserializers=deserializers ) From 7623ac2c10a876138ddd3724e70d072abb1bf169 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Fri, 27 Mar 2020 12:07:39 -0700 Subject: [PATCH 4/5] Allocate all frames to fill before receiving --- distributed/comm/ucx.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index c45f01270d6..27bcd476be4 100644 --- 
a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -189,15 +189,15 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): raise CommClosedError("While reading, the connection was closed") else: # Recv frames - frames = [] - for is_cuda, each_size in zip(is_cudas.tolist(), sizes.tolist()): - if is_cuda: - each_frame = cuda_array(each_size) - else: - each_frame = np.empty(each_size, dtype=np.uint8) - if each_size > 0: + frames = [ + cuda_array(each_size) + if is_cuda + else np.empty(each_size, dtype=np.uint8) + for is_cuda, each_size in zip(is_cudas.tolist(), sizes.tolist()) + ] + for each_frame in frames: + if len(each_frame) > 0: await self.ep.recv(each_frame) - frames.append(each_frame) msg = await from_frames( frames, deserialize=self.deserialize, deserializers=deserializers ) From e3f0520b916d79ee84ceb5edf91fe82b2dce3fcc Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Fri, 27 Mar 2020 12:07:40 -0700 Subject: [PATCH 5/5] Keep only non-trivial frames to transmit --- distributed/comm/ucx.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 27bcd476be4..01d8df47d59 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -147,6 +147,9 @@ async def write( frames = await to_frames( msg, serializers=serializers, on_error=on_error ) + send_frames = [ + each_frame for each_frame in frames if len(each_frame) > 0 + ] # Send meta data await self.ep.send(np.array([len(frames)], dtype=np.uint64)) @@ -159,11 +162,11 @@ async def write( await self.ep.send( np.array([nbytes(f) for f in frames], dtype=np.uint64) ) + # Send frames - for each_frame in frames: - if nbytes(each_frame) > 0: - await self.ep.send(each_frame) - return sum(map(nbytes, frames)) + for each_frame in send_frames: + await self.ep.send(each_frame) + return sum(map(nbytes, send_frames)) except (ucp.exceptions.UCXBaseException): self.abort() raise CommClosedError("While 
writing, the connection was closed") @@ -195,9 +198,11 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): else np.empty(each_size, dtype=np.uint8) for is_cuda, each_size in zip(is_cudas.tolist(), sizes.tolist()) ] - for each_frame in frames: - if len(each_frame) > 0: - await self.ep.recv(each_frame) + recv_frames = [ + each_frame for each_frame in frames if len(each_frame) > 0 + ] + for each_frame in recv_frames: + await self.ep.recv(each_frame) msg = await from_frames( frames, deserialize=self.deserialize, deserializers=deserializers )