From 93f714ef49d49a79c7d7ab380110ed2a843448fa Mon Sep 17 00:00:00 2001
From: Matthew Rocklin
Date: Mon, 3 Jun 2019 17:29:11 -0700
Subject: [PATCH 1/3] Add tests for cudf serialization

---
 distributed/protocol/tests/test_cudf.py | 29 +++++++++++++++++++++++++
 1 file changed, 29 insertions(+)
 create mode 100644 distributed/protocol/tests/test_cudf.py

diff --git a/distributed/protocol/tests/test_cudf.py b/distributed/protocol/tests/test_cudf.py
new file mode 100644
index 00000000000..b556823ed5d
--- /dev/null
+++ b/distributed/protocol/tests/test_cudf.py
@@ -0,0 +1,29 @@
+import pytest
+
+cudf = pytest.importorskip("cudf")
+
+from distributed.protocol import serialize, deserialize
+import dask.dataframe as dd
+
+
+@pytest.mark.parametrize(
+    "df",
+    [
+        cudf.Series([1, 2, 3]),
+        cudf.Series([1, 2, None]),
+        cudf.DataFrame({"x": [1, 2, 3], "y": [1.0, 2.0, 3.0]}),
+        cudf.DataFrame({"x": [1, 2, 3], "s": ["a", "bb", "ccc"]}),
+        cudf.DataFrame(
+            {"x": [1, 2, None], "y": [1.0, 2.0, None], "s": ["a", "bb", None]}
+        ),
+    ],
+)
+def test_basic(df):
+    header, frames = serialize(
+        df, serializers=("cuda", "dask", "pickle"), on_error="raise"
+    )
+    assert header["serializer"] == "cuda"
+    assert not any(isinstance(frame, (bytes, memoryview)) for frame in frames)
+
+    df2 = deserialize(header, frames, deserializers=("cuda", "dask", "pickle"))
+    dd.assert_eq(df, df2)

From 61d6c357fc905f2a0413aaed1182e03367dc1b2e Mon Sep 17 00:00:00 2001
From: Matthew Rocklin
Date: Mon, 3 Jun 2019 17:29:37 -0700
Subject: [PATCH 2/3] Add test for cudf with ucx

---
 distributed/comm/tests/test_ucx.py | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git a/distributed/comm/tests/test_ucx.py b/distributed/comm/tests/test_ucx.py
index 55a2f4ec82c..78a10d65adc 100644
--- a/distributed/comm/tests/test_ucx.py
+++ b/distributed/comm/tests/test_ucx.py
@@ -10,7 +10,7 @@
 from distributed.comm import ucx, parse_address
 from distributed.protocol import to_serialize
 from distributed.deploy.local import LocalCluster
-from distributed.utils_test import gen_test, loop, inc  # noqa: 401
+from distributed.utils_test import gen_cluster, gen_test, loop, inc  # noqa: 401
 
 from .test_comms import check_deserialize
 
@@ -294,3 +294,20 @@ def test_tcp_localcluster(loop):
     # assert any(w.data == {x.key: 2} for w in c.workers)
     # assert e.loop is c.loop
     # print(c.scheduler.workers)
+
+
+@pytest.mark.asyncio
+async def test_cudf_join():
+    from dask.distributed import Scheduler, Worker
+    import dask
+
+    cudf = pytest.importorskip("cudf")
+    async with Scheduler(protocol="ucx", port=0, interface="ib0") as s:
+        async with Worker(s.address, port=0) as a, Worker(s.address, port=0) as b:
+            async with Client(s.address, asynchronous=True) as c:
+                df = dask.datasets.timeseries(
+                    dtypes={"x": int, "y": float}, freq="1s"
+                ).partitions[:2]
+                df = df.map_partitions(cudf.from_pandas)
+                await c.compute(df.x.sum())
+                await c.compute(df[["x"]].sum())

From 30145ab64c56202cd5ecacf60d1449b827ab1353 Mon Sep 17 00:00:00 2001
From: Matthew Rocklin
Date: Mon, 3 Jun 2019 17:30:45 -0700
Subject: [PATCH 3/3] Remove unnecessary is_cuda entry from header

---
 distributed/protocol/cudf.py  | 1 -
 distributed/protocol/cupy.py  | 1 -
 distributed/protocol/numba.py | 1 -
 3 files changed, 3 deletions(-)

diff --git a/distributed/protocol/cudf.py b/distributed/protocol/cudf.py
index 018596b1560..679824167cf 100644
--- a/distributed/protocol/cudf.py
+++ b/distributed/protocol/cudf.py
@@ -34,7 +34,6 @@ def serialize_cudf_dataframe(x):
     arrays.extend(null_masks)
 
     header = {
-        "is_cuda": len(arrays),
         "subheaders": sub_headers,
         # TODO: the header must be msgpack (de)serializable.
         # See if we can avoid names, and just use integer positions.
diff --git a/distributed/protocol/cupy.py b/distributed/protocol/cupy.py
index 13c0348a821..acdbb2e15b6 100644
--- a/distributed/protocol/cupy.py
+++ b/distributed/protocol/cupy.py
@@ -24,7 +24,6 @@ def serialize_cupy_ndarray(x):
     # used in the ucx comms for gpu/cpu message passing
     # 'lengths' set by dask
     header = x.__cuda_array_interface__.copy()
-    header["is_cuda"] = 1
     header["dtype"] = dtype
     return header, [data]
 
diff --git a/distributed/protocol/numba.py b/distributed/protocol/numba.py
index 18405ffebe0..06aeb398b08 100644
--- a/distributed/protocol/numba.py
+++ b/distributed/protocol/numba.py
@@ -29,7 +29,6 @@ def serialize_numba_ndarray(x):
     # used in the ucx comms for gpu/cpu message passing
     # 'lengths' set by dask
     header = x.__cuda_array_interface__.copy()
-    header["is_cuda"] = 1
     header["dtype"] = dtype
     return header, [data]
 