From 56a088617833928ff50ee4d8daac58785da5d662 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Fri, 6 May 2022 21:06:50 -0700 Subject: [PATCH 1/7] Use `ensure_memoryview` in `array` deserialization --- distributed/protocol/serialize.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/distributed/protocol/serialize.py b/distributed/protocol/serialize.py index f9bc91cf423..35b371d4208 100644 --- a/distributed/protocol/serialize.py +++ b/distributed/protocol/serialize.py @@ -23,7 +23,7 @@ pack_frames_prelude, unpack_frames, ) -from distributed.utils import has_keyword +from distributed.utils import ensure_memoryview, has_keyword dask_serialize = dask.utils.Dispatch("dask_serialize") dask_deserialize = dask.utils.Dispatch("dask_deserialize") @@ -765,12 +765,8 @@ def _serialize_array(obj): @dask_deserialize.register(array) def _deserialize_array(header, frames): a = array(header["typecode"]) - for f in map(memoryview, frames): - try: - f = f.cast("B") - except TypeError: - f = f.tobytes() - a.frombytes(f) + for f in frames: + a.frombytes(ensure_memoryview(f)) return a From ba31fa5f288c37cf6c4638a42d7e46c68d08ad51 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Fri, 6 May 2022 21:18:04 -0700 Subject: [PATCH 2/7] Test deserializing `array` with split frames too Make sure that if `frames` get split up `array` is still able to handle combining them back into an `array` during deserialization. --- distributed/protocol/tests/test_serialize.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/distributed/protocol/tests/test_serialize.py b/distributed/protocol/tests/test_serialize.py index 8c4eb51c02b..9b63b774fad 100644 --- a/distributed/protocol/tests/test_serialize.py +++ b/distributed/protocol/tests/test_serialize.py @@ -94,6 +94,8 @@ def test_serialize_bytestrings(): def test_serialize_arrays(typecode): a = array(typecode) a.extend(range(5)) + + # handle normal round trip through serialization header, frames = serialize(a) assert frames[0] == memoryview(a) a2 = deserialize(header, frames) @@ -101,6 +103,15 @@ def test_serialize_arrays(typecode): assert a2.typecode == a.typecode assert a2 == a + # split up frames to test joining them back together + header, frames = serialize(a) + (f,) = frames + frames = [f[:2], f[2:]] + a2 = deserialize(header, frames) + assert type(a2) == type(a) + assert a2.typecode == a.typecode + assert a2 == a + def test_Serialize(): s = Serialize(123) From f96d7543d2c14774e162ae12b9daa9cc7cf2635f Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Sat, 7 May 2022 03:41:48 -0700 Subject: [PATCH 3/7] Simplify test `array` construction --- distributed/protocol/tests/test_serialize.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/distributed/protocol/tests/test_serialize.py b/distributed/protocol/tests/test_serialize.py index 9b63b774fad..9016ea22dc7 100644 --- a/distributed/protocol/tests/test_serialize.py +++ b/distributed/protocol/tests/test_serialize.py @@ -92,8 +92,7 @@ def test_serialize_bytestrings(): "typecode", ["b", "B", "h", "H", "i", "I", "l", "L", "q", "Q", "f", "d"] ) def test_serialize_arrays(typecode): - a = array(typecode) - a.extend(range(5)) + a = array(typecode, range(5)) # handle normal round trip through serialization header, frames = serialize(a) From 36e498482cf2c38201547477d53b15fb6538f942 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Sat, 7 May 2022 03:56:10 -0700 Subject: [PATCH 4/7] Use `a3` for second array serialization test --- distributed/protocol/tests/test_serialize.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/distributed/protocol/tests/test_serialize.py b/distributed/protocol/tests/test_serialize.py index 9016ea22dc7..157376ef54f 100644 --- a/distributed/protocol/tests/test_serialize.py +++ b/distributed/protocol/tests/test_serialize.py @@ -106,10 +106,10 @@ def test_serialize_arrays(typecode): header, frames = serialize(a) (f,) = frames frames = [f[:2], f[2:]] - a2 = deserialize(header, frames) - assert type(a2) == type(a) - assert a2.typecode == a.typecode - assert a2 == a + a3 = deserialize(header, frames) + assert type(a3) == type(a) + assert a3.typecode == a.typecode + assert a3 == a def test_Serialize(): From fa5148d0d7dc8b17d5eb21e10e753d566e978d3f Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Sat, 7 May 2022 21:08:27 -0700 Subject: [PATCH 5/7] Call `array.frombytes(...)` once Just join all `frames` into a single `bytes` object if there are multiple to avoid resizing the array repeatedly and possibly running into `memoryview`s with incomplete array elements. --- distributed/protocol/serialize.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/distributed/protocol/serialize.py b/distributed/protocol/serialize.py index 35b371d4208..082b274a483 100644 --- a/distributed/protocol/serialize.py +++ b/distributed/protocol/serialize.py @@ -765,8 +765,11 @@ def _serialize_array(obj): @dask_deserialize.register(array) def _deserialize_array(header, frames): a = array(header["typecode"]) - for f in frames: - a.frombytes(ensure_memoryview(f)) + nframes = len(frames) + if nframes == 1: + a.frombytes(ensure_memoryview(frames[0])) + elif nframes > 1: + a.frombytes(b"".join(map(ensure_memoryview, frames))) return a From e3cc39cc2e45233410fd330cc2931166f9430a41 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Sat, 7 May 2022 21:08:28 -0700 Subject: [PATCH 6/7] Test `array` with `frames` that split elements As a frame could be split so that a whole element from the array is split into two different frames, make sure we test this case for good measure. --- distributed/protocol/tests/test_serialize.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/distributed/protocol/tests/test_serialize.py b/distributed/protocol/tests/test_serialize.py index 157376ef54f..fa5ac8a40e4 100644 --- a/distributed/protocol/tests/test_serialize.py +++ b/distributed/protocol/tests/test_serialize.py @@ -32,7 +32,7 @@ to_serialize, ) from distributed.protocol.serialize import check_dask_serializable -from distributed.utils import nbytes +from distributed.utils import ensure_memoryview, nbytes from distributed.utils_test import gen_test, inc @@ -105,7 +105,8 @@ def test_serialize_arrays(typecode): # split up frames to test joining them back together header, frames = serialize(a) (f,) = frames - frames = [f[:2], f[2:]] + f = ensure_memoryview(f) + frames = [f[:1], f[1:2], f[2:-1], f[-1:]] a3 = deserialize(header, frames) assert type(a3) == type(a) assert a3.typecode == a.typecode From b24513ee139c8f75b3b7d7efe9cb58d4ba0b5204 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 9 May 2022 11:01:47 -0700 Subject: [PATCH 7/7] Test serializing `array` with no `frames` --- distributed/protocol/tests/test_serialize.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/distributed/protocol/tests/test_serialize.py b/distributed/protocol/tests/test_serialize.py index fa5ac8a40e4..27368393b7d 100644 --- a/distributed/protocol/tests/test_serialize.py +++ b/distributed/protocol/tests/test_serialize.py @@ -88,6 +88,21 @@ def test_serialize_bytestrings(): assert bb == b +def test_serialize_empty_array(): + a = array("I") + + # serialize array + header, frames = serialize(a) + assert frames[0] == memoryview(a) + # drop empty frame + del frames[:] + # deserialize with no frames + a2 = deserialize(header, frames) + assert type(a2) == type(a) + assert a2.typecode == a.typecode + assert a2 == a + + @pytest.mark.parametrize( "typecode", ["b", "B", "h", "H", "i", "I", "l", "L", "q", "Q", "f", "d"] )