From 8e3d41fcbccca7649687a930c45ac88dd4af6c1e Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 02:51:15 -0700 Subject: [PATCH 01/51] Move `ensure_memoryview` to `distributed.utils` --- distributed/protocol/core.py | 10 +--------- distributed/utils.py | 9 +++++++++ 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/distributed/protocol/core.py b/distributed/protocol/core.py index 0e0ae003b5f..db063922794 100644 --- a/distributed/protocol/core.py +++ b/distributed/protocol/core.py @@ -17,19 +17,11 @@ serialize_and_split, ) from distributed.protocol.utils import msgpack_opts +from distributed.utils import ensure_memoryview logger = logging.getLogger(__name__) -def ensure_memoryview(obj): - """Ensure `obj` is a memoryview of datatype bytes""" - ret = memoryview(obj) - if ret.nbytes: - return ret.cast("B") - else: - return ret - - def dumps( msg, serializers=None, on_error="message", context=None, frame_split_size=None ) -> list: diff --git a/distributed/utils.py b/distributed/utils.py index 40116f6b01b..71f2b89c2b7 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -1013,6 +1013,15 @@ def ensure_bytes(s): ) from e +def ensure_memoryview(obj): + """Ensure `obj` is a memoryview of datatype bytes""" + ret = memoryview(obj) + if ret.nbytes: + return ret.cast("B") + else: + return ret + + def open_port(host=""): """Return a probably-open port From 20b3f9fc2745d798af4061d19e0146875570b540 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 02:51:16 -0700 Subject: [PATCH 02/51] Replace `ret` with `mv` for clarity --- distributed/utils.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/distributed/utils.py b/distributed/utils.py index 71f2b89c2b7..fecb4e44a73 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -1015,11 +1015,11 @@ def ensure_bytes(s): def ensure_memoryview(obj): """Ensure `obj` is a memoryview of datatype bytes""" - ret = memoryview(obj) - if ret.nbytes: - return ret.cast("B") + mv = memoryview(obj) + if mv.nbytes: + return mv.cast("B") else: - return ret + return mv def open_port(host=""): From 6fd7179bbcb17cfd29f6bed68c8380b095e8ecab Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 02:51:17 -0700 Subject: [PATCH 03/51] Coerce `obj` to `memoryview` only if needed --- distributed/utils.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/distributed/utils.py b/distributed/utils.py index fecb4e44a73..0dd48af307b 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -1015,7 +1015,12 @@ def ensure_bytes(s): def ensure_memoryview(obj): """Ensure `obj` is a memoryview of datatype bytes""" - mv = memoryview(obj) + mv: memoryview + if type(obj) is memoryview: + mv = obj + else: + mv = memoryview(obj) + if mv.nbytes: return mv.cast("B") else: From 007c8d160b6a2414723b0b3788bd9f519e8ffd1d Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 02:51:17 -0700 Subject: [PATCH 04/51] Require `contiguous` data for `.cast("B")` Otherwise `memoryview` will raise a `TypeError`. --- distributed/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/utils.py b/distributed/utils.py index 0dd48af307b..7720ad986ef 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -1021,7 +1021,7 @@ def ensure_memoryview(obj): else: mv = memoryview(obj) - if mv.nbytes: + if mv.nbytes and mv.contiguous: return mv.cast("B") else: return mv From ec4220c43816db773e5456d4561b194f37223f19 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 02:51:18 -0700 Subject: [PATCH 05/51] Copy to `bytes` first in non-contiguous case --- distributed/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/utils.py b/distributed/utils.py index 7720ad986ef..72def81e59f 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -1024,7 +1024,7 @@ def ensure_memoryview(obj): if mv.nbytes and mv.contiguous: return mv.cast("B") else: - return mv + return memoryview(mv.tobytes()) def open_port(host=""): From 0a63b8dacf0173a22debbdc722935a0cf3faca8f Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 02:51:18 -0700 Subject: [PATCH 06/51] Shortcut trivial `memoryview` case No need to pay the cost for copying here. Just use an empty `bytes` object for the `memoryview`. Should be faster in this case and saves us a check in the `cast` case. --- distributed/utils.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/distributed/utils.py b/distributed/utils.py index 72def81e59f..8531d58f8d3 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -1021,7 +1021,9 @@ def ensure_memoryview(obj): else: mv = memoryview(obj) - if mv.nbytes and mv.contiguous: + if not mv.nbytes: + return memoryview(b"") + elif mv.contiguous: return mv.cast("B") else: return memoryview(mv.tobytes()) From 5fa88cfdf8122fca3ef629509ac0ec161a826ed9 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 02:51:19 -0700 Subject: [PATCH 07/51] Fill out docstring & add comments --- distributed/utils.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/distributed/utils.py b/distributed/utils.py index 8531d58f8d3..814301a9776 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -1014,7 +1014,7 @@ def ensure_bytes(s): def ensure_memoryview(obj): - """Ensure `obj` is a memoryview of datatype bytes""" + """Ensure `obj` is a `memoryview` that is 1-D contiguous of `uint8` type""" mv: memoryview if type(obj) is memoryview: mv = obj @@ -1022,10 +1022,14 @@ def ensure_memoryview(obj): mv = memoryview(obj) if not mv.nbytes: + # empty return memoryview(b"") elif mv.contiguous: + # can reshape & cast zero-copy return mv.cast("B") else: + # `memoryview` is non-trivial & not contiguous. + # Copy it to contiguous form of the expected type. return memoryview(mv.tobytes()) From e932c42bef606abacd3383bca7686b48266cbe7e Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 02:51:20 -0700 Subject: [PATCH 08/51] Drop unused conversion of `min_size` This is converted to `int` here, but is unused below. So go ahead and drop it as it doesn't seem to be needed. --- distributed/protocol/compression.py | 1 - 1 file changed, 1 deletion(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 5995d1d9d51..0427f84b763 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -161,7 +161,6 @@ def maybe_compress( if len(payload) > 2**31: # Too large, compression libraries often fail return None, payload - min_size = int(min_size) sample_size = int(sample_size) compress = compressions[compression]["compress"] From c3297f743edb7392a9c6700dbeb4221d8d26a7f4 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 02:51:20 -0700 Subject: [PATCH 09/51] Join `if` cases together --- distributed/protocol/compression.py | 1 - 1 file changed, 1 deletion(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 0427f84b763..8077200c050 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -153,7 +153,6 @@ def maybe_compress( """ if compression == "auto": compression = default_compression - if not compression: return None, payload if len(payload) < min_size: From 288bdb473f042656efe6f4ac30befeb13d4ce3c9 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 02:51:21 -0700 Subject: [PATCH 10/51] Use `nbytes` to get `payload` size Also rename `nbytes` variable to `payload_nbytes` for clarity. --- distributed/protocol/compression.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 8077200c050..9aebfacbbcd 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -16,7 +16,7 @@ import dask -from distributed.utils import ensure_bytes +from distributed.utils import ensure_bytes, nbytes compressions: dict[ str | None | Literal[False], @@ -155,9 +155,13 @@ def maybe_compress( compression = default_compression if not compression: return None, payload - if len(payload) < min_size: + + # Store size as it is used in a few cases. + payload_nbytes = nbytes(payload) + + if payload_nbytes < min_size: return None, payload - if len(payload) > 2**31: # Too large, compression libraries often fail + if payload_nbytes > 2**31: # Too large, compression libraries often fail return None, payload sample_size = int(sample_size) @@ -169,14 +173,9 @@ def maybe_compress( if len(compress(sample)) > 0.9 * len(sample): # sample not very compressible return None, payload - if type(payload) is memoryview: - nbytes = payload.itemsize * len(payload) - else: - nbytes = len(payload) - compressed = compress(ensure_bytes(payload)) - if len(compressed) > 0.9 * nbytes: # full data not very compressible + if len(compressed) > 0.9 * payload_nbytes: # full data not very compressible return None, payload else: return compression, compressed From 7871181fbead83c3fbb759356041a367c67affb7 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 02:51:22 -0700 Subject: [PATCH 11/51] Consolidate payload size checks --- distributed/protocol/compression.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 9aebfacbbcd..cf1bab3fa64 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -159,9 +159,9 @@ def maybe_compress( # Store size as it is used in a few cases. payload_nbytes = nbytes(payload) - if payload_nbytes < min_size: - return None, payload - if payload_nbytes > 2**31: # Too large, compression libraries often fail + # Either too small to bother or + # too large, compression libraries often fail + if not (min_size < payload_nbytes < 2**31): return None, payload sample_size = int(sample_size) From e8672d36fb982486146e9bb44fbc350e45342ff4 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 02:51:23 -0700 Subject: [PATCH 12/51] Use `memoryview` of `payload` To allow more efficient accessing of the `payload` (like when selecting portions in `byte_sample`), take a `memoryview` of the data. Ensure that is 1-D contiguous `uint8` data. This makes it very similar to `bytes`, which will work well in `byte_sample` and compressors that handle only a narrow form of the Python Buffer Protocol. This allows us to drop various `ensure_bytes` calls in compression that would otherwise copy the data. Should reduce memory usage when serializing as part of transmission or spilling. --- distributed/protocol/compression.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index cf1bab3fa64..32243bdb7bf 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -16,7 +16,7 @@ import dask -from distributed.utils import ensure_bytes, nbytes +from distributed.utils import ensure_memoryview, nbytes compressions: dict[ str | None | Literal[False], @@ -131,7 +131,7 @@ def byte_sample(b, size, n): ends.append(starts[-1] + size) parts = [b[start:end] for start, end in zip(starts, ends)] - return b"".join(map(ensure_bytes, parts)) + return b"".join(parts) def maybe_compress( @@ -168,12 +168,15 @@ def maybe_compress( compress = compressions[compression]["compress"] + # Take a view of payload for efficient usage + mv = ensure_memoryview(payload) + # Compress a sample, return original if not very compressed - sample = byte_sample(payload, sample_size, nsamples) + sample = byte_sample(mv, sample_size, nsamples) if len(compress(sample)) > 0.9 * len(sample): # sample not very compressible return None, payload - compressed = compress(ensure_bytes(payload)) + compressed = compress(mv) if len(compressed) > 0.9 * payload_nbytes: # full data not very compressible return None, payload From d5b9c6952798097fab926438a84c89fe37f3e852 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 02:51:23 -0700 Subject: [PATCH 13/51] Add tests of `ensure_memoryview` --- distributed/tests/test_utils.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/distributed/tests/test_utils.py b/distributed/tests/test_utils.py index 3358dbc1907..1659eb20ee4 100644 --- a/distributed/tests/test_utils.py +++ b/distributed/tests/test_utils.py @@ -28,6 +28,7 @@ _maybe_complex, ensure_bytes, ensure_ip, + ensure_memoryview, format_dashboard_link, get_ip_interface, get_traceback, @@ -269,6 +270,30 @@ def test_ensure_bytes_pyarrow_buffer(): assert isinstance(result, bytes) +def test_ensure_memoryview(): + data = [b"1", memoryview(b"1"), bytearray(b"1"), array.array("b", [49])] + for d in data: + result = ensure_memoryview(d) + assert isinstance(result, memoryview) + assert result == memoryview(b"1") + + +def test_ensure_memoryview_ndarray(): + np = pytest.importorskip("numpy") + result = ensure_memoryview(np.arange(12).reshape(3, 4)[:, ::2].T) + assert isinstance(result, memoryview) + assert result.ndim == 1 + assert result.format == "B" + assert result.contiguous + + +def test_ensure_memoryview_pyarrow_buffer(): + pa = pytest.importorskip("pyarrow") + buf = pa.py_buffer(b"123") + result = ensure_memoryview(buf) + assert isinstance(result, memoryview) + + def test_nbytes(): np = pytest.importorskip("numpy") From 8a8125d83adff93379b4d8b7608660acd0afb1e1 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 02:51:24 -0700 Subject: [PATCH 14/51] Test compression with `memoryview` --- distributed/protocol/tests/test_protocol.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/distributed/protocol/tests/test_protocol.py b/distributed/protocol/tests/test_protocol.py index b2d0abd64fd..3eff8fe29f1 100644 --- a/distributed/protocol/tests/test_protocol.py +++ b/distributed/protocol/tests/test_protocol.py @@ -58,11 +58,21 @@ def test_compression_2(): pytest.importorskip("lz4") np = pytest.importorskip("numpy") x = np.random.random(10000) - msg = dumps(to_serialize(x.tobytes())) + msg = dumps(to_serialize(x.data)) compression = msgpack.loads(msg[1]).get("compression") assert all(c is None for c in compression) +def test_compression_3(): + pytest.importorskip("lz4") + np = pytest.importorskip("numpy") + x = np.ones(1000000) + frames = dumps({"x": Serialize(x.data)}) + assert sum(map(nbytes, frames)) < x.nbytes + y = loads(frames) + assert {"x": x.data} == y + + def test_compression_without_deserialization(): pytest.importorskip("lz4") np = pytest.importorskip("numpy") From bbed121542a0598a81f09d36729a5cde59aaf2dc Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 02:51:25 -0700 Subject: [PATCH 15/51] Drop blank line & add a comment --- distributed/protocol/compression.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 32243bdb7bf..401aaf1e6b0 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -176,8 +176,8 @@ def maybe_compress( if len(compress(sample)) > 0.9 * len(sample): # sample not very compressible return None, payload + # Try compressing the real thing compressed = compress(mv) - if len(compressed) > 0.9 * payload_nbytes: # full data not very compressible return None, payload else: From 2658a4ee6fdfbb6ebe29bd274a5990b86555ea2e Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 02:51:26 -0700 Subject: [PATCH 16/51] Unwrap comment --- distributed/protocol/compression.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 401aaf1e6b0..6235ec18406 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -159,8 +159,7 @@ def maybe_compress( # Store size as it is used in a few cases. payload_nbytes = nbytes(payload) - # Either too small to bother or - # too large, compression libraries often fail + # Either too small to bother or too large, compression libraries often fail if not (min_size < payload_nbytes < 2**31): return None, payload From c4299c175b96d63918f054becfad70e97c00122d Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 02:51:26 -0700 Subject: [PATCH 17/51] Test empty `bytes` with `memoryview` --- distributed/tests/test_utils.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/distributed/tests/test_utils.py b/distributed/tests/test_utils.py index 1659eb20ee4..f50bd2c0081 100644 --- a/distributed/tests/test_utils.py +++ b/distributed/tests/test_utils.py @@ -270,6 +270,12 @@ def test_ensure_bytes_pyarrow_buffer(): assert isinstance(result, bytes) +def test_ensure_memoryview_empty(): + result = ensure_memoryview(b"") + assert isinstance(result, memoryview) + assert result == memoryview(b"") + + def test_ensure_memoryview(): data = [b"1", memoryview(b"1"), bytearray(b"1"), array.array("b", [49])] for d in data: From 6ef14e7483706cb80832f626b55bdf71e775f451 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 03:19:57 -0700 Subject: [PATCH 18/51] Coerce `b` to `memoryview` to avoid copies --- distributed/protocol/compression.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 6235ec18406..5977566e424 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -124,6 +124,7 @@ def byte_sample(b, size, n): n : int number of samples to collect """ + b = ensure_memoryview(b) starts = [random.randint(0, len(b) - size) for j in range(n)] ends = [] for i, start in enumerate(starts[:-1]): From e74d3c837accd191580b205313f9a5fd2f9caf80 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 03:20:47 -0700 Subject: [PATCH 19/51] Add blank line --- distributed/protocol/compression.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 5977566e424..402b070488d 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -125,6 +125,7 @@ def byte_sample(b, size, n): number of samples to collect """ b = ensure_memoryview(b) + starts = [random.randint(0, len(b) - size) for j in range(n)] ends = [] for i, start in enumerate(starts[:-1]): From f99b5238da283f4858650b25808344cb175bb822 Mon Sep 17 00:00:00 2001 From: jakirkham Date: Thu, 5 May 2022 12:41:53 -0700 Subject: [PATCH 20/51] Special case fewer `parts` in `byte_sample` Co-authored-by: Martin Durant --- distributed/protocol/compression.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 402b070488d..6ee2f110513 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -133,7 +133,12 @@ def byte_sample(b, size, n): ends.append(starts[-1] + size) parts = [b[start:end] for start, end in zip(starts, ends)] - return b"".join(parts) + if n == 0: + return b"" + elif n == 1: + return parts[0] + else: + return b"".join(parts) def maybe_compress( From e4500b9e5ff4ceb784e7018b4a9b94f680969dfd Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 12:59:49 -0700 Subject: [PATCH 21/51] Shortcut `n == 0` in `byte_sample` sooner --- distributed/protocol/compression.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 6ee2f110513..6ef75ac4463 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -124,6 +124,9 @@ def byte_sample(b, size, n): n : int number of samples to collect """ + if n == 0: + return b"" + b = ensure_memoryview(b) starts = [random.randint(0, len(b) - size) for j in range(n)] @@ -133,9 +136,7 @@ def byte_sample(b, size, n): ends.append(starts[-1] + size) parts = [b[start:end] for start, end in zip(starts, ends)] - if n == 0: - return b"" - elif n == 1: + if n == 1: return parts[0] else: return b"".join(parts) From 7c801e5cff91bb1bf8be7fc4c024e9e38baa3c4d Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 15:48:13 -0700 Subject: [PATCH 22/51] Always return a `memoryview` from `byte_sample` Make the `return` type more consistent. --- distributed/protocol/compression.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 6ef75ac4463..4582fe914c9 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -125,7 +125,7 @@ def byte_sample(b, size, n): number of samples to collect """ if n == 0: - return b"" + return memoryview(b"") b = ensure_memoryview(b) @@ -139,7 +139,7 @@ def byte_sample(b, size, n): if n == 1: return parts[0] else: - return b"".join(parts) + return memoryview(b"".join(parts)) def maybe_compress( From 2a2650275d55e07c15dfd155fa6c2024bc0b35dc Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 15:53:15 -0700 Subject: [PATCH 23/51] Nest compressibility checks --- distributed/protocol/compression.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 4582fe914c9..dcbb5d9c064 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -178,17 +178,15 @@ def maybe_compress( # Take a view of payload for efficient usage mv = ensure_memoryview(payload) - # Compress a sample, return original if not very compressed + # Try compressing a sample to see if it compresses well sample = byte_sample(mv, sample_size, nsamples) - if len(compress(sample)) > 0.9 * len(sample): # sample not very compressible - return None, payload - - # Try compressing the real thing - compressed = compress(mv) - if len(compressed) > 0.9 * payload_nbytes: # full data not very compressible - return None, payload - else: - return compression, compressed + if len(compress(sample)) <= 0.9 * len(sample): # sample is compressible + # Try compressing the real thing and check how compressed it is + compressed = compress(mv) + if len(compressed) <= 0.9 * payload_nbytes: # full data is compressible + return compression, compressed + # Sample or payload didn't compress well. Skip compressing. + return None, payload def decompress(header, frames): From d3064f2c62a04624a7b8c6af663eb49adc15b344 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 15:53:22 -0700 Subject: [PATCH 24/51] Lighten up on comments around compressibility Some of these comments are redundant now. So drop some of them to focus the content more. --- distributed/protocol/compression.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index dcbb5d9c064..2becd0413c2 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -180,10 +180,10 @@ def maybe_compress( # Try compressing a sample to see if it compresses well sample = byte_sample(mv, sample_size, nsamples) - if len(compress(sample)) <= 0.9 * len(sample): # sample is compressible + if len(compress(sample)) <= 0.9 * len(sample): # Try compressing the real thing and check how compressed it is compressed = compress(mv) - if len(compressed) <= 0.9 * payload_nbytes: # full data is compressible + if len(compressed) <= 0.9 * payload_nbytes: return compression, compressed # Sample or payload didn't compress well. Skip compressing. return None, payload From 2f71f887f9205c7006e26a03f4cad5fe413c6727 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 15:56:36 -0700 Subject: [PATCH 25/51] Use `;` instead of `,` in comment --- distributed/protocol/compression.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 2becd0413c2..f43cf12f82f 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -167,7 +167,7 @@ def maybe_compress( # Store size as it is used in a few cases. payload_nbytes = nbytes(payload) - # Either too small to bother or too large, compression libraries often fail + # Either too small to bother or too large; compression libraries often fail if not (min_size < payload_nbytes < 2**31): return None, payload From 17677b82da52748cbc0d2dcc71d7f899a1a91c0b Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 16:00:55 -0700 Subject: [PATCH 26/51] Use `.nbytes` with `memoryview`s --- distributed/protocol/compression.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index f43cf12f82f..f5fe1802f17 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -129,7 +129,7 @@ def byte_sample(b, size, n): b = ensure_memoryview(b) - starts = [random.randint(0, len(b) - size) for j in range(n)] + starts = [random.randint(0, b.nbytes - size) for j in range(n)] ends = [] for i, start in enumerate(starts[:-1]): ends.append(min(start + size, starts[i + 1])) @@ -180,7 +180,7 @@ def maybe_compress( # Try compressing a sample to see if it compresses well sample = byte_sample(mv, sample_size, nsamples) - if len(compress(sample)) <= 0.9 * len(sample): + if len(compress(sample)) <= 0.9 * sample.nbytes: # Try compressing the real thing and check how compressed it is compressed = compress(mv) if len(compressed) <= 0.9 * payload_nbytes: From 28d45a3eb291204bdf23d1d27b0486c36c040569 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 16:05:07 -0700 Subject: [PATCH 27/51] Clarify `ensure_memoryview` cases in comments --- distributed/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/utils.py b/distributed/utils.py index 814301a9776..6c1b2b5f365 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -1022,10 +1022,10 @@ def ensure_memoryview(obj): mv = memoryview(obj) if not mv.nbytes: - # empty + # Drop `obj` reference so underlying data can be freed. return memoryview(b"") elif mv.contiguous: - # can reshape & cast zero-copy + # Perform zero-copy reshape & cast. return mv.cast("B") else: # `memoryview` is non-trivial & not contiguous. From 4fd06a1302e4ae994baae8916bf122e6007a7b44 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 16:22:03 -0700 Subject: [PATCH 28/51] Also fast path `size == 0` --- distributed/protocol/compression.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index f5fe1802f17..95fc81b2806 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -124,7 +124,7 @@ def byte_sample(b, size, n): n : int number of samples to collect """ - if n == 0: + if size == 0 or n == 0: return memoryview(b"") b = ensure_memoryview(b) From afb1a99afad77b2db3f85a2f350c02964e469302 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 16:22:42 -0700 Subject: [PATCH 29/51] `assert` both `size` & `n` are well behaved --- distributed/protocol/compression.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 95fc81b2806..92232860d69 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -127,6 +127,7 @@ def byte_sample(b, size, n): if size == 0 or n == 0: return memoryview(b"") + assert size > 0 and n > 0 b = ensure_memoryview(b) starts = [random.randint(0, b.nbytes - size) for j in range(n)] From fc35a135a0efb1a7c59664443491fefa9a493645 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 16:26:19 -0700 Subject: [PATCH 30/51] Use `islice` with `starts` --- distributed/protocol/compression.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 92232860d69..9ae1f97d6f3 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -9,6 +9,7 @@ import random from collections.abc import Callable from contextlib import suppress +from itertools import islice from typing import Literal from packaging.version import parse as parse_version @@ -132,7 +133,7 @@ def byte_sample(b, size, n): starts = [random.randint(0, b.nbytes - size) for j in range(n)] ends = [] - for i, start in enumerate(starts[:-1]): + for i, start in enumerate(islice(starts, n - 1)): ends.append(min(start + size, starts[i + 1])) ends.append(starts[-1] + size) From 689c8a5872f7d677950aee2bf48324a0b373726b Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 16:27:33 -0700 Subject: [PATCH 31/51] From `random` just `import` `randint` --- distributed/protocol/compression.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 9ae1f97d6f3..934b94fece1 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -6,10 +6,10 @@ from __future__ import annotations import logging -import random from collections.abc import Callable from contextlib import suppress from itertools import islice +from random import randint from typing import Literal from packaging.version import parse as parse_version @@ -131,7 +131,7 @@ def byte_sample(b, size, n): assert size > 0 and n > 0 b = ensure_memoryview(b) - starts = [random.randint(0, b.nbytes - size) for j in range(n)] + starts = [randint(0, b.nbytes - size) for j in range(n)] ends = [] for i, start in enumerate(islice(starts, n - 1)): ends.append(min(start + size, starts[i + 1])) From a25fb6cfbc8d0fb3003d148620d0da01740ee88f Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 16:29:35 -0700 Subject: [PATCH 32/51] Fast path `not compression` case Go ahead and exit immediately in this case before doing anything else. --- distributed/protocol/compression.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 934b94fece1..ae5f1d7558c 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -161,10 +161,10 @@ def maybe_compress( return the original 4. We return the compressed result """ - if compression == "auto": - compression = default_compression if not compression: return None, payload + if compression == "auto": + compression = default_compression # Store size as it is used in a few cases. payload_nbytes = nbytes(payload) From 72dbad015199933aee76fddc14020372ddaa6e0b Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 16:38:27 -0700 Subject: [PATCH 33/51] Normalize args after size check --- distributed/protocol/compression.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index ae5f1d7558c..6b7be9f4588 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -163,8 +163,6 @@ def maybe_compress( """ if not compression: return None, payload - if compression == "auto": - compression = default_compression # Store size as it is used in a few cases. payload_nbytes = nbytes(payload) @@ -173,8 +171,10 @@ def maybe_compress( if not (min_size < payload_nbytes < 2**31): return None, payload + # Normalize function arguments sample_size = int(sample_size) - + if compression == "auto": + compression = default_compression compress = compressions[compression]["compress"] # Take a view of payload for efficient usage From 3fd096af1dae43bc2df50438762501e1eb1be582 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 16:38:27 -0700 Subject: [PATCH 34/51] Use `mv.nbytes` in compression check This is a bit clearer while being just as fast. --- distributed/protocol/compression.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 6b7be9f4588..09c8ba90a68 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -185,7 +185,7 @@ def maybe_compress( if len(compress(sample)) <= 0.9 * sample.nbytes: # Try compressing the real thing and check how compressed it is compressed = compress(mv) - if len(compressed) <= 0.9 * payload_nbytes: + if len(compressed) <= 0.9 * mv.nbytes: return compression, compressed # Sample or payload didn't compress well. Skip compressing. return None, payload From 2dab94795956b9724ccb1ebd49e8bf68e657ed38 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 16:38:29 -0700 Subject: [PATCH 35/51] Consolidate size check code --- distributed/protocol/compression.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 09c8ba90a68..1ec5dcc9a26 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -164,11 +164,8 @@ def maybe_compress( if not compression: return None, payload - # Store size as it is used in a few cases. - payload_nbytes = nbytes(payload) - # Either too small to bother or too large; compression libraries often fail - if not (min_size < payload_nbytes < 2**31): + if not (min_size < nbytes(payload) < 2**31): return None, payload # Normalize function arguments From d02a4e90eec8f8f32bb46b7540f7dd63d8bc6e43 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 16:40:13 -0700 Subject: [PATCH 36/51] Consolidate `size` & `n` handling --- distributed/protocol/compression.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 1ec5dcc9a26..4dc7edff293 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -125,10 +125,10 @@ def byte_sample(b, size, n): n : int number of samples to collect """ + assert size >= 0 and n >= 0 if size == 0 or n == 0: return memoryview(b"") - assert size > 0 and n > 0 b = ensure_memoryview(b) starts = [randint(0, b.nbytes - size) for j in range(n)] From dcb8c60ff7508ba8188d240e3725b07cd990b3bd Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 16:41:46 -0700 Subject: [PATCH 37/51] Compute largest `start` once --- distributed/protocol/compression.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 4dc7edff293..1118d396512 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -131,7 +131,8 @@ def byte_sample(b, size, n): b = ensure_memoryview(b) - starts = [randint(0, b.nbytes - size) for j in range(n)] + max_start = b.nbytes - size + starts = [randint(0, max_start) for j in range(n)] ends = [] for i, start in enumerate(islice(starts, n - 1)): ends.append(min(start + size, starts[i + 1])) From 63d94e02ef87ac78325db92d0725d145e0e54af4 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 16:44:06 -0700 Subject: [PATCH 38/51] Consolidate fast paths --- distributed/protocol/compression.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 1118d396512..9dc18ddec53 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -164,9 +164,9 @@ def maybe_compress( """ if not compression: return None, payload - - # Either too small to bother or too large; compression libraries often fail if not (min_size < nbytes(payload) < 2**31): + # Either too small to bother + # or too large (compression libraries often fail) return None, payload # Normalize function arguments From 5d6aa1cd2a7d6825a204b271f58ac490452007bb Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 16:46:21 -0700 Subject: [PATCH 39/51] Simplify final comment --- distributed/protocol/compression.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 9dc18ddec53..37e302c745e 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -185,7 +185,7 @@ def maybe_compress( compressed = compress(mv) if len(compressed) <= 0.9 * mv.nbytes: return compression, compressed - # Sample or payload didn't compress well. Skip compressing. + # Skip compression as `sample` or `payload` didn't compress well. return None, payload From cf760cc35cf17f774a7e4aeb5fd2f2a52e4153f7 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 17:11:27 -0700 Subject: [PATCH 40/51] Fuse loops in `byte_sample` to make `parts` --- distributed/protocol/compression.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 37e302c745e..3e9b53cd4d5 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -8,7 +8,6 @@ import logging from collections.abc import Callable from contextlib import suppress -from itertools import islice from random import randint from typing import Literal @@ -131,14 +130,16 @@ def byte_sample(b, size, n): b = ensure_memoryview(b) + parts = [] max_start = b.nbytes - size - starts = [randint(0, max_start) for j in range(n)] - ends = [] - for i, start in enumerate(islice(starts, n - 1)): - ends.append(min(start + size, starts[i + 1])) - ends.append(starts[-1] + size) + next_start = randint(0, max_start) + for i in range(n - 1): + start = next_start + next_start = randint(0, max_start) + end = min(start + size, next_start) + parts.append(b[start:end]) + parts.append(b[next_start : next_start + size]) - parts = [b[start:end] for start, end in zip(starts, ends)] if n == 1: return parts[0] else: From 8612caa6bb90eedba5dc2c1f1df5a14a3b82ecfb Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 17:18:39 -0700 Subject: [PATCH 41/51] Set `start` to `next_start` at end --- distributed/protocol/compression.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 3e9b53cd4d5..bc6973237a5 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -132,13 +132,13 @@ def byte_sample(b, size, n): parts = [] max_start = b.nbytes - size - next_start = randint(0, max_start) + start = randint(0, max_start) for i in range(n - 1): - start = next_start next_start = randint(0, max_start) end = min(start + size, next_start) parts.append(b[start:end]) - parts.append(b[next_start : next_start + size]) + start = next_start + parts.append(b[start : start + size]) if n == 1: return parts[0] From 883f43f99fe9fbe9a671f6209efe22be4ac4523e Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 17:25:21 -0700 Subject: [PATCH 42/51] Tidy comments --- distributed/protocol/compression.py | 2 +- distributed/utils.py | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index bc6973237a5..e9be5e9da59 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -186,7 +186,7 @@ def maybe_compress( compressed = compress(mv) if len(compressed) <= 0.9 * mv.nbytes: return compression, compressed - # Skip compression as `sample` or `payload` didn't compress well. + # Skip compression as `sample` or `payload` didn't compress well return None, payload diff --git a/distributed/utils.py b/distributed/utils.py index 6c1b2b5f365..3c289b29a1f 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -1022,14 +1022,13 @@ def ensure_memoryview(obj): mv = memoryview(obj) if not mv.nbytes: - # Drop `obj` reference so underlying data can be freed. + # Drop `obj` reference to allow freeing underlying data return memoryview(b"") elif mv.contiguous: - # Perform zero-copy reshape & cast. + # Perform zero-copy reshape & cast return mv.cast("B") else: - # `memoryview` is non-trivial & not contiguous. - # Copy it to contiguous form of the expected type. + # Copy to contiguous form of expected type return memoryview(mv.tobytes()) From 8de6296e751448c9f6603ec13275ce1b2bcacc05 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 17:26:04 -0700 Subject: [PATCH 43/51] Tweak wording --- distributed/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/utils.py b/distributed/utils.py index 3c289b29a1f..9f002f89378 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -1022,7 +1022,7 @@ def ensure_memoryview(obj): mv = memoryview(obj) if not mv.nbytes: - # Drop `obj` reference to allow freeing underlying data + # Drop `obj` reference to permit freeing underlying data return memoryview(b"") elif mv.contiguous: # Perform zero-copy reshape & cast From e4e86bcf43522ae4427c668faa35aec6453a2172 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 17:26:52 -0700 Subject: [PATCH 44/51] Also note `shape` change in comment --- distributed/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/utils.py b/distributed/utils.py index 9f002f89378..634349f1815 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -1028,7 +1028,7 @@ def ensure_memoryview(obj): # Perform zero-copy reshape & cast return mv.cast("B") else: - # Copy to contiguous form of expected type + # Copy to contiguous form of expected shape & type return memoryview(mv.tobytes()) From 57df6fdec59b810056eace6b77deea0df2fbd119 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 17:36:47 -0700 Subject: [PATCH 45/51] Clarify `size` given sample selection behavior --- distributed/protocol/compression.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index e9be5e9da59..bd18dfd8161 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -120,7 +120,8 @@ def byte_sample(b, size, n): ---------- b : bytes or memoryview size : int - size of each sample to collect + target size of each sample to collect + (may be smaller if samples collide) n : int number of samples to collect """ From 3aded1e2578473cf2c66deb2a2319c495c71d259 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 17:49:05 -0700 Subject: [PATCH 46/51] Tweak comment --- distributed/protocol/compression.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index bd18dfd8161..ed26e67d557 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -187,7 +187,7 @@ def maybe_compress( compressed = compress(mv) if len(compressed) <= 0.9 * mv.nbytes: return compression, compressed - # Skip compression as `sample` or `payload` didn't compress well + # Skip compression as the sample or the data didn't compress well return None, payload From 8aaf04d41a6dd324ab382c7ea0e06c3ada231489 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 23:43:30 -0700 Subject: [PATCH 47/51] Fix comparisons As comparisons were effectively flipped from how they were before, these should have `=`s as a condition as well. --- distributed/protocol/compression.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index ed26e67d557..458d53bd4fa 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -166,7 +166,7 @@ def maybe_compress( """ if not compression: return None, payload - if not (min_size < nbytes(payload) < 2**31): + if not (min_size <= nbytes(payload) <= 2**31): # Either too small to bother # or too large (compression libraries often fail) return None, payload From dc04e4c2190599e3bff88240531192d4e720c920 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 23:46:01 -0700 Subject: [PATCH 48/51] Shorten docstring in `ensure_memoryview` --- distributed/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/utils.py b/distributed/utils.py index 634349f1815..475ea836aa5 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -1014,7 +1014,7 @@ def ensure_bytes(s): def ensure_memoryview(obj): - """Ensure `obj` is a `memoryview` that is 1-D contiguous of `uint8` type""" + """Ensure `obj` is a 1-D contiguous `uint8` `memoryview`""" mv: memoryview if type(obj) is memoryview: mv = obj From 75213a4c7bab27e142d52f51a200060c8bc98c50 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 5 May 2022 23:57:09 -0700 Subject: [PATCH 49/51] Preallocate `parts` to match intended size This can be quite a bit faster than `append`ing each value (particularly if resizing of the underlying array needs to occur). --- distributed/protocol/compression.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 458d53bd4fa..bc8ca2d4eea 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -131,15 +131,15 @@ def byte_sample(b, size, n): b = ensure_memoryview(b) - parts = [] + parts = n * [None] max_start = b.nbytes - size start = randint(0, max_start) for i in range(n - 1): next_start = randint(0, max_start) end = min(start + size, next_start) - parts.append(b[start:end]) + parts[i] = b[start:end] start = next_start - parts.append(b[start : start + size]) + parts[-1] = b[start : start + size] if n == 1: return parts[0] From a2d891a928b49cb40f5fb280279614950bb74dd2 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Fri, 6 May 2022 00:29:41 -0700 Subject: [PATCH 50/51] Just use `int`s for `min_size` & `sample_size` These are basically unused and are expected to be `int`s internally. So just pick default values that are `int`s to start. --- distributed/protocol/compression.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index bc8ca2d4eea..5cbc21445c7 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -149,8 +149,8 @@ def byte_sample(b, size, n): def maybe_compress( payload, - min_size=1e4, - sample_size=1e4, + min_size=10_000, + sample_size=10_000, nsamples=5, compression=dask.config.get("distributed.comm.compression"), ): @@ -172,7 +172,6 @@ def maybe_compress( return None, payload # Normalize function arguments - sample_size = int(sample_size) if compression == "auto": compression = default_compression compress = compressions[compression]["compress"] From 86615643d1c2292f0e82c2984abd341c30f5b052 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Fri, 6 May 2022 00:29:51 -0700 Subject: [PATCH 51/51] Call `x.tobytes()` once and assign it Avoid repeated copies while testing that don't add value here. --- distributed/protocol/tests/test_protocol.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/distributed/protocol/tests/test_protocol.py b/distributed/protocol/tests/test_protocol.py index 3eff8fe29f1..8c281003a76 100644 --- a/distributed/protocol/tests/test_protocol.py +++ b/distributed/protocol/tests/test_protocol.py @@ -48,10 +48,11 @@ def test_compression_1(): pytest.importorskip("lz4") np = pytest.importorskip("numpy") x = np.ones(1000000) - frames = dumps({"x": Serialize(x.tobytes())}) - assert sum(map(nbytes, frames)) < x.nbytes + b = x.tobytes() + frames = dumps({"x": Serialize(b)}) + assert sum(map(nbytes, frames)) < nbytes(b) y = loads(frames) - assert {"x": x.tobytes()} == y + assert {"x": b} == y def test_compression_2():