From 5f69bf161c1be45da873d7b4cd6f8dda6dc1c6f4 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 26 Jul 2023 10:57:54 -0700 Subject: [PATCH 1/6] simplify system for testing cudf --- dask_expr/_expr.py | 5 +++ dask_expr/_groupby.py | 13 ++++++ dask_expr/_reductions.py | 2 + dask_expr/io/tests/test_io.py | 11 ++--- dask_expr/tests/test_categorical.py | 11 +++-- dask_expr/tests/test_collection.py | 65 +++++++++++++++-------------- dask_expr/tests/test_concat.py | 29 +++++++------ dask_expr/tests/test_fusion.py | 11 +++-- dask_expr/tests/test_groupby.py | 28 ++++++++++--- dask_expr/tests/test_merge.py | 35 +++++++++------- 10 files changed, 130 insertions(+), 80 deletions(-) diff --git a/dask_expr/_expr.py b/dask_expr/_expr.py index 6390450bc..8275b9516 100644 --- a/dask_expr/_expr.py +++ b/dask_expr/_expr.py @@ -151,6 +151,11 @@ def __getattr__(self, key): try: return object.__getattribute__(self, key) except AttributeError as err: + if key == "_meta": + # Avoid a recursive loop if/when `self._meta` + # produces an `AttributeError` + raise RuntimeError(f"Failed to generate metadata for {self}") + # Allow operands to be accessed as attributes # as long as the keys are not already reserved # by existing methods/properties diff --git a/dask_expr/_groupby.py b/dask_expr/_groupby.py index dc113ff32..63714463a 100644 --- a/dask_expr/_groupby.py +++ b/dask_expr/_groupby.py @@ -81,6 +81,18 @@ class SingleAggregation(ApplyConcatApply): groupby_chunk = None groupby_aggregate = None + _required_groupby_attribute = None + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + if self._required_groupby_attribute: + g = self.frame._meta.groupby(self.by) + if not hasattr(g, self._required_groupby_attribute): + # Raise a ValueError instead of AttributeError to + # avoid infinite recursion + raise ValueError( + f"{g} has no attribute {self._required_groupby_attribute}" + ) @classmethod def chunk(cls, df, by=None, **kwargs): @@ -274,6 +286,7 @@ class Size(SingleAggregation): class ValueCounts(SingleAggregation): groupby_chunk = staticmethod(_value_counts) groupby_aggregate = staticmethod(_value_counts_aggregate) + _required_groupby_attribute = "value_counts" class Var(Reduction): diff --git a/dask_expr/_reductions.py b/dask_expr/_reductions.py index 4f2868661..0f37f9780 100644 --- a/dask_expr/_reductions.py +++ b/dask_expr/_reductions.py @@ -530,6 +530,7 @@ class IdxMin(Reduction): reduction_chunk = idxmaxmin_chunk reduction_combine = idxmaxmin_combine reduction_aggregate = idxmaxmin_agg + _required_attribute = "idxmin" _fn = "idxmin" @property @@ -547,6 +548,7 @@ def aggregate_kwargs(self): class IdxMax(IdxMin): + _required_attribute = "idxmax" _fn = "idxmax" diff --git a/dask_expr/io/tests/test_io.py b/dask_expr/io/tests/test_io.py index 68c2d7363..487e9c204 100644 --- a/dask_expr/io/tests/test_io.py +++ b/dask_expr/io/tests/test_io.py @@ -1,19 +1,17 @@ -import importlib import os import dask.dataframe as dd import pytest -from dask import config from dask.dataframe.utils import assert_eq from dask_expr import from_dask_dataframe, from_pandas, optimize, read_csv, read_parquet from dask_expr._expr import Expr, Lengths, Literal, Replace from dask_expr._reductions import Len from dask_expr.io import ReadParquet +from dask_expr.tests._util import _backend_library -# Import backend DataFrame library to test -BACKEND = config.get("dataframe.backend", "pandas") -lib = importlib.import_module(BACKEND) +# Set DataFrame backend for this module +lib = _backend_library() def _make_file(dir, format="parquet", df=None): @@ -215,8 +213,7 @@ def test_from_pandas_immutable(): def test_parquet_complex_filters(tmpdir): - with config.set({"dataframe.backend": BACKEND}): - df = read_parquet(_make_file(tmpdir)) + df = read_parquet(_make_file(tmpdir)) pdf = df.compute() got = df["a"][df["b"] > df["b"].mean()] expect = pdf["a"][pdf["b"] > pdf["b"].mean()] diff --git a/dask_expr/tests/test_categorical.py b/dask_expr/tests/test_categorical.py index 635ba2c1b..7fe9e9abb 100644 --- a/dask_expr/tests/test_categorical.py +++ b/dask_expr/tests/test_categorical.py @@ -1,13 +1,16 @@ -import pandas as pd import pytest from dask.dataframe import assert_eq from dask_expr import from_pandas +from dask_expr.tests._util import _backend_library + +# Set DataFrame backend for this module +lib = _backend_library() @pytest.fixture def pdf(): - pdf = pd.DataFrame({"x": [1, 2, 3, 4, 1, 2]}, dtype="category") + pdf = lib.DataFrame({"x": [1, 2, 3, 4, 1, 2]}, dtype="category") return pdf @@ -22,7 +25,7 @@ def test_set_categories(df, pdf): ser = df.x.cat.as_unknown() assert not ser.cat.known ser = ser.cat.as_known() - assert_eq(ser.cat.categories, pd.Index([1, 2, 3, 4])) + assert_eq(ser.cat.categories, lib.Index([1, 2, 3, 4])) ser = ser.cat.set_categories([1, 2, 3, 5, 4]) - assert_eq(ser.cat.categories, pd.Index([1, 2, 3, 5, 4])) + assert_eq(ser.cat.categories, lib.Index([1, 2, 3, 5, 4])) assert not ser.cat.ordered diff --git a/dask_expr/tests/test_collection.py b/dask_expr/tests/test_collection.py index f231dbbbf..06f75363b 100644 --- a/dask_expr/tests/test_collection.py +++ b/dask_expr/tests/test_collection.py @@ -1,6 +1,5 @@ from __future__ import annotations -import importlib import operator import pickle @@ -15,11 +14,10 @@ from dask_expr._expr import are_co_aligned from dask_expr._reductions import Len from dask_expr.datasets import timeseries +from dask_expr.tests._util import _backend_library, xfail_gpu -# Import backend DataFrame library to test -BACKEND = dask.config.get("dataframe.backend", "pandas") -CUDF_BACKEND = BACKEND == "cudf" -lib = importlib.import_module(BACKEND) +# Set DataFrame backend for this module +lib = _backend_library() @pytest.fixture @@ -53,7 +51,7 @@ def test_setitem(pdf, df): assert_eq(df, pdf) -@pytest.mark.xfail(CUDF_BACKEND, reason="https://github.com/rapidsai/cudf/issues/10271") +@xfail_gpu("https://github.com/rapidsai/cudf/issues/10271") def test_explode(): pdf = lib.DataFrame({"a": [[1, 2], [3, 4]]}) df = from_pandas(pdf) @@ -61,7 +59,7 @@ def test_explode(): assert_eq(pdf.a.explode(), df.a.explode()) -@pytest.mark.xfail(CUDF_BACKEND, reason="https://github.com/rapidsai/cudf/issues/10271") +@xfail_gpu("https://github.com/rapidsai/cudf/issues/10271") def test_explode_simplify(pdf): pdf["z"] = 1 df = from_pandas(pdf) @@ -115,8 +113,12 @@ def test_dask(pdf, df): M.mean, M.std, M.var, - M.idxmin, - M.idxmax, + pytest.param( + M.idxmin, marks=xfail_gpu("https://github.com/rapidsai/cudf/issues/9602") + ), + pytest.param( + M.idxmax, marks=xfail_gpu("https://github.com/rapidsai/cudf/issues/9602") + ), pytest.param( lambda df: df.size, marks=pytest.mark.skip(reason="scalars don't work yet"), @@ -124,8 +126,6 @@ def test_dask(pdf, df): ], ) def test_reductions(func, pdf, df): - if CUDF_BACKEND and func in [M.idxmin, M.idxmax]: - pytest.xfail(reason="https://github.com/rapidsai/cudf/issues/9602") result = func(df) assert result.known_divisions assert_eq(result, func(pdf)) @@ -138,11 +138,17 @@ def test_reductions(func, pdf, df): @pytest.mark.parametrize("axis", [0, 1]) -@pytest.mark.parametrize("skipna", [True, False]) +@pytest.mark.parametrize( + "skipna", + [ + True, + pytest.param( + False, marks=xfail_gpu("cudf requires skipna=True when nulls are present.") + ), + ], +) @pytest.mark.parametrize("ddof", [1, 2]) def test_std_kwargs(axis, skipna, ddof): - if CUDF_BACKEND and skipna is False: - pytest.xfail(reason="cudf requires skipna=True when nulls are present.") pdf = lib.DataFrame( {"x": range(30), "y": [1, 2, None] * 10, "z": ["dog", "cat"] * 15} ) @@ -153,7 +159,7 @@ def test_std_kwargs(axis, skipna, ddof): ) -@pytest.mark.xfail(CUDF_BACKEND, reason="nbytes not supported by cudf") +@xfail_gpu("nbytes not supported by cudf") def test_nbytes(pdf, df): with pytest.raises(NotImplementedError, match="nbytes is not implemented"): df.nbytes @@ -280,7 +286,7 @@ def test_and_or(func, pdf, df): assert_eq(func(pdf), func(df), check_names=False) -@pytest.mark.xfail(CUDF_BACKEND, reason="period_range not supported by cudf") +@xfail_gpu("period_range not supported by cudf") @pytest.mark.parametrize("how", ["start", "end"]) def test_to_timestamp(pdf, how): pdf.index = lib.period_range("2019-12-31", freq="D", periods=len(pdf)) @@ -321,7 +327,7 @@ def test_blockwise(func, pdf, df): assert_eq(func(pdf), func(df)) -@pytest.mark.xfail(CUDF_BACKEND, reason="func not supported by cudf") +@xfail_gpu("func not supported by cudf") @pytest.mark.parametrize( "func", [ @@ -353,7 +359,7 @@ def test_simplify_add_suffix_add_prefix(df, pdf): assert_eq(result, pdf.add_suffix("_2")["x_2"]) -@pytest.mark.xfail(CUDF_BACKEND, reason="rename_axis not supported by cudf") +@xfail_gpu("rename_axis not supported by cudf") def test_rename_axis(pdf): pdf.index.name = "a" pdf.columns.name = "b" @@ -386,7 +392,7 @@ def test_repr(df): assert "sum(skipna=False)" in s -@pytest.mark.xfail(CUDF_BACKEND, reason="combine_first not supported by cudf") +@xfail_gpu("combine_first not supported by cudf") def test_combine_first_simplify(pdf): df = from_pandas(pdf) pdf2 = pdf.rename(columns={"y": "z"}) @@ -681,7 +687,7 @@ def test_serialization(pdf, df): assert_eq(pickle.loads(before), pickle.loads(after)) -@pytest.mark.xfail(CUDF_BACKEND, reason="Cannot apply lambda function in cudf") +@xfail_gpu("Cannot apply lambda function in cudf") def test_size_optimized(df): expr = (df.x + 1).apply(lambda x: x).size out = optimize(expr) @@ -697,10 +703,7 @@ def test_size_optimized(df): @pytest.mark.parametrize("fuse", [True, False]) def test_tree_repr(fuse): s = from_pandas(lib.Series(range(10))).expr.tree_repr() - if BACKEND == "pandas": - assert "" in s - else: - assert "" in s + assert ("" in s) or ("" in s) df = timeseries() expr = ((df.x + 1).sum(skipna=False) + df.y.mean()).expr @@ -963,7 +966,7 @@ def test_sample(df): assert_eq(result, expected) -@pytest.mark.xfail(CUDF_BACKEND, reason="align not supported by cudf") +@xfail_gpu("align not supported by cudf") def test_align(df, pdf): result_1, result_2 = df.align(df) pdf_result_1, pdf_result_2 = pdf.align(pdf) @@ -976,7 +979,7 @@ def test_align(df, pdf): assert_eq(result_2, pdf_result_2) -@pytest.mark.xfail(CUDF_BACKEND, reason="align not supported by cudf") +@xfail_gpu("align not supported by cudf") def test_align_different_partitions(): pdf = lib.DataFrame({"a": [11, 12, 31, 1, 2, 3], "b": [1, 2, 3, 4, 5, 6]}) df = from_pandas(pdf, npartitions=2) @@ -991,7 +994,7 @@ def test_align_different_partitions(): assert_eq(result_2, pdf_result_2) -@pytest.mark.xfail(CUDF_BACKEND, reason="align not supported by cudf") +@xfail_gpu("align not supported by cudf") def test_align_unknown_partitions_same_root(): pdf = lib.DataFrame({"a": 1}, index=[3, 2, 1]) df = from_pandas(pdf, npartitions=2, sort=False) @@ -1001,7 +1004,7 @@ def test_align_unknown_partitions_same_root(): assert_eq(result_2, pdf_result_2) -@pytest.mark.skipif(CUDF_BACKEND, reason="align not supported by cudf") +@xfail_gpu(reason="align not supported by cudf") def test_unknown_partitions_different_root(): pdf = lib.DataFrame({"a": 1}, index=[3, 2, 1]) df = from_pandas(pdf, npartitions=2, sort=False) @@ -1011,7 +1014,7 @@ def test_unknown_partitions_different_root(): df.align(df2) -@pytest.mark.xfail(CUDF_BACKEND, reason="compute_hll_array doesn't work for cudf") +@xfail_gpu("compute_hll_array doesn't work for cudf") def test_nunique_approx(df): result = df.nunique_approx().compute() assert 99 < result < 101 @@ -1050,7 +1053,7 @@ def test_assign_simplify_series(pdf): assert result._name == expected._name -@pytest.mark.xfail(CUDF_BACKEND, reason="assign function not supported by cudf") +@xfail_gpu("assign function not supported by cudf") def test_assign_non_series_inputs(df, pdf): assert_eq(df.assign(a=lambda x: x.x * 2), pdf.assign(a=lambda x: x.x * 2)) assert_eq(df.assign(a=2), pdf.assign(a=2)) @@ -1081,7 +1084,7 @@ def test_are_co_aligned(pdf, df): assert not are_co_aligned(merged_first.expr, df.expr) -@pytest.mark.xfail(CUDF_BACKEND, reason="TODO") +@xfail_gpu() def test_astype_categories(df): result = df.astype("category") assert_eq(result.x._meta.cat.categories, lib.Index([UNKNOWN_CATEGORIES])) diff --git a/dask_expr/tests/test_concat.py b/dask_expr/tests/test_concat.py index 154376457..3abdc4659 100644 --- a/dask_expr/tests/test_concat.py +++ b/dask_expr/tests/test_concat.py @@ -1,14 +1,17 @@ import numpy as np -import pandas as pd import pytest from dask.dataframe import assert_eq from dask_expr import concat, from_pandas +from dask_expr.tests._util import _backend_library + +# Set DataFrame backend for this module +lib = _backend_library() @pytest.fixture def pdf(): - pdf = pd.DataFrame({"x": range(100)}) + pdf = lib.DataFrame({"x": range(100)}) pdf["y"] = pdf.x * 10.0 yield pdf @@ -26,14 +29,14 @@ def test_concat_str(df): def test_concat(pdf, df): result = concat([df, df]) - expected = pd.concat([pdf, pdf]) + expected = lib.concat([pdf, pdf]) assert_eq(result, expected) assert all(div is None for div in result.divisions) def test_concat_pdf(pdf, df): result = concat([df, pdf]) - expected = pd.concat([pdf, pdf]) + expected = lib.concat([pdf, pdf]) assert_eq(result, expected) assert all(div is None for div in result.divisions) @@ -42,7 +45,7 @@ def test_concat_divisions(pdf, df): pdf2 = pdf.set_index(np.arange(200, 300)) df2 = from_pandas(pdf2, npartitions=10) result = concat([df, df2]) - expected = pd.concat([pdf, pdf2]) + expected = lib.concat([pdf, pdf2]) assert_eq(result, expected) assert not any(div is None for div in result.divisions) @@ -62,14 +65,14 @@ def test_concat_invalid(): def test_concat_one_object(df, pdf): result = concat([df]) - expected = pd.concat([pdf]) + expected = lib.concat([pdf]) assert_eq(result, expected) assert not any(div is None for div in result.divisions) def test_concat_one_no_columns(df, pdf): result = concat([df, df[[]]]) - expected = pd.concat([pdf, pdf[[]]]) + expected = lib.concat([pdf, pdf[[]]]) assert_eq(result, expected) @@ -82,7 +85,7 @@ def test_concat_simplify(pdf, df): expected = concat([df[["x"]], df2[["x", "z"]]]).simplify()[["z", "x"]] assert result._name == expected._name - assert_eq(q, pd.concat([pdf, pdf2])[["z", "x"]]) + assert_eq(q, lib.concat([pdf, pdf2])[["z", "x"]]) def test_concat_simplify_projection_not_added(pdf, df): @@ -94,13 +97,13 @@ def test_concat_simplify_projection_not_added(pdf, df): expected = concat([df, df2[["x", "y"]]]).simplify()[["y", "x"]] assert result._name == expected._name - assert_eq(q, pd.concat([pdf, pdf2])[["y", "x"]]) + assert_eq(q, lib.concat([pdf, pdf2])[["y", "x"]]) def test_concat_axis_one_co_aligned(pdf, df): df2 = df.add_suffix("_2") pdf2 = pdf.add_suffix("_2") - assert_eq(concat([df, df2], axis=1), pd.concat([pdf, pdf2], axis=1)) + assert_eq(concat([df, df2], axis=1), lib.concat([pdf, pdf2], axis=1)) def test_concat_axis_one_all_divisions_unknown(pdf): @@ -109,10 +112,10 @@ def test_concat_axis_one_all_divisions_unknown(pdf): pdf2 = pdf.add_suffix("_2") df2 = from_pandas(pdf2, npartitions=2, sort=False) with pytest.warns(UserWarning): - assert_eq(concat([df, df2], axis=1), pd.concat([pdf, pdf2], axis=1)) + assert_eq(concat([df, df2], axis=1), lib.concat([pdf, pdf2], axis=1)) assert_eq( concat([df, df2], axis=1, ignore_unknown_divisions=True), - pd.concat([pdf, pdf2], axis=1), + lib.concat([pdf, pdf2], axis=1), ) @@ -124,4 +127,4 @@ def test_concat_axis_one_drop_dfs_not_selected(pdf, df): result = concat([df, df2, df3], axis=1)[["x", "y", "x_2"]].simplify() expected = concat([df, df2[["x_2"]]], axis=1).simplify() assert result._name == expected._name - assert_eq(result, pd.concat([pdf, pdf2, pdf3], axis=1)[["x", "y", "x_2"]]) + assert_eq(result, lib.concat([pdf, pdf2, pdf3], axis=1)[["x", "y", "x_2"]]) diff --git a/dask_expr/tests/test_fusion.py b/dask_expr/tests/test_fusion.py index c0f9cdc61..fcf3be0b6 100644 --- a/dask_expr/tests/test_fusion.py +++ b/dask_expr/tests/test_fusion.py @@ -1,13 +1,16 @@ -import pandas as pd import pytest from dask.dataframe.utils import assert_eq from dask_expr import from_pandas, optimize +from dask_expr.tests._util import _backend_library + +# Set DataFrame backend for this module +lib = _backend_library() @pytest.fixture def pdf(): - pdf = pd.DataFrame({"x": range(100)}) + pdf = lib.DataFrame({"x": range(100)}) pdf["y"] = pdf.x * 10.0 yield pdf @@ -45,8 +48,8 @@ def test_optimize_fusion_many(): # Test that many `Blockwise`` operations, # originating from various IO operations, # can all be fused together - a = from_pandas(pd.DataFrame({"x": range(100), "y": range(100)}), 10) - b = from_pandas(pd.DataFrame({"a": range(100)}), 10) + a = from_pandas(lib.DataFrame({"x": range(100), "y": range(100)}), 10) + b = from_pandas(lib.DataFrame({"a": range(100)}), 10) # some generic elemwise operations aa = a[["x"]] + 1 diff --git a/dask_expr/tests/test_groupby.py b/dask_expr/tests/test_groupby.py index f6ff21ec1..b761f5a5c 100644 --- a/dask_expr/tests/test_groupby.py +++ b/dask_expr/tests/test_groupby.py @@ -1,13 +1,16 @@ -import pandas as pd import pytest from dask.dataframe.utils import assert_eq from dask_expr import from_pandas +from dask_expr.tests._util import _backend_library, xfail_gpu + +# Set DataFrame backend for this module +lib = _backend_library() @pytest.fixture def pdf(): - pdf = pd.DataFrame({"x": list(range(10)) * 10, "y": range(100), "z": 1}) + pdf = lib.DataFrame({"x": list(range(10)) * 10, "y": range(100), "z": 1}) yield pdf @@ -24,7 +27,13 @@ def test_groupby_unsupported_by(pdf, df): @pytest.mark.parametrize( "api", ["sum", "mean", "min", "max", "prod", "first", "last", "var", "std"] ) -@pytest.mark.parametrize("numeric_only", [True, False]) +@pytest.mark.parametrize( + "numeric_only", + [ + pytest.param(True, marks=xfail_gpu("numeric_only not supported by cudf")), + False, + ], +) def test_groupby_numeric(pdf, df, api, numeric_only): if not numeric_only and api in {"var", "std"}: pytest.xfail("not implemented") @@ -41,7 +50,16 @@ def test_groupby_numeric(pdf, df, api, numeric_only): assert_eq(agg, expect) -@pytest.mark.parametrize("func", ["count", "value_counts", "size"]) +@pytest.mark.parametrize( + "func", + [ + "count", + pytest.param( + "value_counts", marks=xfail_gpu("value_counts not supported by cudf") + ), + "size", + ], +) def test_groupby_no_numeric_only(pdf, func): pdf = pdf.drop(columns="z") df = from_pandas(pdf, npartitions=10) @@ -67,7 +85,7 @@ def test_groupby_series(pdf, df): result = df.groupby("x").sum() assert_eq(result, pdf_result) - df2 = from_pandas(pd.DataFrame({"a": [1, 2, 3]})) + df2 = from_pandas(lib.DataFrame({"a": [1, 2, 3]})) with pytest.raises(ValueError, match="DataFrames columns"): df.groupby(df2.a) diff --git a/dask_expr/tests/test_merge.py b/dask_expr/tests/test_merge.py index 8b7ac408b..ceb5204bf 100644 --- a/dask_expr/tests/test_merge.py +++ b/dask_expr/tests/test_merge.py @@ -1,17 +1,20 @@ -import pandas as pd import pytest from dask.dataframe.utils import assert_eq from dask_expr import from_pandas +from dask_expr.tests._util import _backend_library + +# Set DataFrame backend for this module +lib = _backend_library() @pytest.mark.parametrize("how", ["left", "right", "inner", "outer"]) @pytest.mark.parametrize("shuffle_backend", ["tasks", "disk"]) def test_merge(how, shuffle_backend): # Make simple left & right dfs - pdf1 = pd.DataFrame({"x": range(20), "y": range(20)}) + pdf1 = lib.DataFrame({"x": range(20), "y": range(20)}) df1 = from_pandas(pdf1, 4) - pdf2 = pd.DataFrame({"x": range(0, 20, 2), "z": range(10)}) + pdf2 = lib.DataFrame({"x": range(0, 20, 2), "z": range(10)}) df2 = from_pandas(pdf2, 2) # Partition-wise merge with map_partitions @@ -29,9 +32,9 @@ def test_merge(how, shuffle_backend): @pytest.mark.parametrize("shuffle_backend", ["tasks", "disk"]) def test_merge_indexed(how, pass_name, sort, shuffle_backend): # Make simple left & right dfs - pdf1 = pd.DataFrame({"x": range(20), "y": range(20)}).set_index("x") + pdf1 = lib.DataFrame({"x": range(20), "y": range(20)}).set_index("x") df1 = from_pandas(pdf1, 4) - pdf2 = pd.DataFrame({"x": range(0, 20, 2), "z": range(10)}).set_index("x") + pdf2 = lib.DataFrame({"x": range(0, 20, 2), "z": range(10)}).set_index("x") df2 = from_pandas(pdf2, 2, sort=sort) if pass_name: @@ -67,9 +70,9 @@ def test_merge_indexed(how, pass_name, sort, shuffle_backend): @pytest.mark.parametrize("how", ["left", "right", "inner", "outer"]) def test_broadcast_merge(how): # Make simple left & right dfs - pdf1 = pd.DataFrame({"x": range(20), "y": range(20)}) + pdf1 = lib.DataFrame({"x": range(20), "y": range(20)}) df1 = from_pandas(pdf1, 4) - pdf2 = pd.DataFrame({"x": range(0, 20, 2), "z": range(10)}) + pdf2 = lib.DataFrame({"x": range(0, 20, 2), "z": range(10)}) df2 = from_pandas(pdf2, 1) df3 = df1.merge(df2, on="x", how=how) @@ -86,9 +89,9 @@ def test_broadcast_merge(how): def test_merge_column_projection(): # Make simple left & right dfs - pdf1 = pd.DataFrame({"x": range(20), "y": range(20), "z": range(20)}) + pdf1 = lib.DataFrame({"x": range(20), "y": range(20), "z": range(20)}) df1 = from_pandas(pdf1, 4) - pdf2 = pd.DataFrame({"x": range(0, 20, 2), "z": range(10)}) + pdf2 = lib.DataFrame({"x": range(0, 20, 2), "z": range(10)}) df2 = from_pandas(pdf2, 2) # Partition-wise merge with map_partitions @@ -101,9 +104,9 @@ def test_merge_column_projection(): @pytest.mark.parametrize("shuffle_backend", ["tasks", "disk"]) def test_join(how, shuffle_backend): # Make simple left & right dfs - pdf1 = pd.DataFrame({"x": range(20), "y": range(20)}) + pdf1 = lib.DataFrame({"x": range(20), "y": range(20)}) df1 = from_pandas(pdf1, 4) - pdf2 = pd.DataFrame({"z": range(10)}, index=pd.Index(range(10), name="a")) + pdf2 = lib.DataFrame({"z": range(10)}, index=lib.Index(range(10), name="a")) df2 = from_pandas(pdf2, 2) # Partition-wise merge with map_partitions @@ -120,15 +123,15 @@ def test_join(how, shuffle_backend): def test_join_recursive(): - pdf = pd.DataFrame({"x": [1, 2, 3], "y": 1}, index=pd.Index([1, 2, 3], name="a")) + pdf = lib.DataFrame({"x": [1, 2, 3], "y": 1}, index=lib.Index([1, 2, 3], name="a")) df = from_pandas(pdf, npartitions=2) - pdf2 = pd.DataFrame( - {"a": [1, 2, 3, 4, 5, 6], "b": 1}, index=pd.Index([1, 2, 3, 4, 5, 6], name="a") + pdf2 = lib.DataFrame( + {"a": [1, 2, 3, 4, 5, 6], "b": 1}, index=lib.Index([1, 2, 3, 4, 5, 6], name="a") ) df2 = from_pandas(pdf2, npartitions=2) - pdf3 = pd.DataFrame({"c": [1, 2, 3], "d": 1}, index=pd.Index([1, 2, 3], name="a")) + pdf3 = lib.DataFrame({"c": [1, 2, 3], "d": 1}, index=lib.Index([1, 2, 3], name="a")) df3 = from_pandas(pdf3, npartitions=2) result = df.join([df2, df3], how="outer") @@ -140,7 +143,7 @@ def test_join_recursive(): def test_join_recursive_raises(): - pdf = pd.DataFrame({"x": [1, 2, 3], "y": 1}, index=pd.Index([1, 2, 3], name="a")) + pdf = lib.DataFrame({"x": [1, 2, 3], "y": 1}, index=lib.Index([1, 2, 3], name="a")) df = from_pandas(pdf, npartitions=2) with pytest.raises(ValueError, match="other must be DataFrame"): df.join(["dummy"]) From 9a4b98c30a3a0ff4a79d34ed385aa19570a01eac Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 26 Jul 2023 11:10:05 -0700 Subject: [PATCH 2/6] finsh modifying all tests --- dask_expr/_expr.py | 5 ++++- dask_expr/_groupby.py | 13 ------------- dask_expr/tests/_util.py | 20 ++++++++++++++++++++ dask_expr/tests/test_quantiles.py | 11 +++++++---- dask_expr/tests/test_reshape.py | 8 +++----- dask_expr/tests/test_shuffle.py | 4 ++-- 6 files changed, 36 insertions(+), 25 deletions(-) create mode 100644 dask_expr/tests/_util.py diff --git a/dask_expr/_expr.py b/dask_expr/_expr.py index 8275b9516..8b5b5daff 100644 --- a/dask_expr/_expr.py +++ b/dask_expr/_expr.py @@ -154,7 +154,10 @@ def __getattr__(self, key): if key == "_meta": # Avoid a recursive loop if/when `self._meta` # produces an `AttributeError` - raise RuntimeError(f"Failed to generate metadata for {self}") + raise RuntimeError( + f"Failed to generate metadata for {self}. " + "This operation may not be supported by the current backend." + ) # Allow operands to be accessed as attributes # as long as the keys are not already reserved diff --git a/dask_expr/_groupby.py b/dask_expr/_groupby.py index 63714463a..dc113ff32 100644 --- a/dask_expr/_groupby.py +++ b/dask_expr/_groupby.py @@ -81,18 +81,6 @@ class SingleAggregation(ApplyConcatApply): groupby_chunk = None groupby_aggregate = None - _required_groupby_attribute = None - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - if self._required_groupby_attribute: - g = self.frame._meta.groupby(self.by) - if not hasattr(g, self._required_groupby_attribute): - # Raise a ValueError instead of AttributeError to - # avoid infinite recursion - raise ValueError( - f"{g} has no attribute {self._required_groupby_attribute}" - ) @classmethod def chunk(cls, df, by=None, **kwargs): @@ -286,7 +274,6 @@ class Size(SingleAggregation): class ValueCounts(SingleAggregation): groupby_chunk = staticmethod(_value_counts) groupby_aggregate = staticmethod(_value_counts_aggregate) - _required_groupby_attribute = "value_counts" class Var(Reduction): diff --git a/dask_expr/tests/_util.py b/dask_expr/tests/_util.py new file mode 100644 index 000000000..f2024e46a --- /dev/null +++ b/dask_expr/tests/_util.py @@ -0,0 +1,20 @@ +import importlib + +import pytest +from dask import config + + +def _backend_name() -> str: + return config.get("dataframe.backend", "pandas") + + +def _backend_library(): + return importlib.import_module(_backend_name()) + + +def xfail_gpu(reason=None, skip=False): + condition = _backend_name() == "cudf" + reason = reason or "Failure expected for cudf backend." + if not skip: + return pytest.mark.xfail(condition, reason=reason) + return pytest.mark.skipif(condition, reason=reason) diff --git a/dask_expr/tests/test_quantiles.py b/dask_expr/tests/test_quantiles.py index e60643462..4c63b603d 100644 --- a/dask_expr/tests/test_quantiles.py +++ b/dask_expr/tests/test_quantiles.py @@ -1,18 +1,21 @@ -import pandas as pd from dask.dataframe import assert_eq from dask_expr import from_pandas +from dask_expr.tests._util import _backend_library + +# Set DataFrame backend for this module +lib = _backend_library() def test_repartition_quantiles(): - pdf = pd.DataFrame({"a": [1, 2, 3, 4, 5, 15, 7, 8, 9, 10, 11], "d": 3}) + pdf = lib.DataFrame({"a": [1, 2, 3, 4, 5, 15, 7, 8, 9, 10, 11], "d": 3}) df = from_pandas(pdf, npartitions=5) result = df.a._repartition_quantiles(npartitions=5) - expected = pd.Series( + expected = lib.Series( [1, 1, 3, 7, 9, 15], index=[0, 0.2, 0.4, 0.6, 0.8, 1], name="a" ) assert_eq(result, expected) result = df.a._repartition_quantiles(npartitions=4) - expected = pd.Series([1, 2, 5, 8, 15], index=[0, 0.25, 0.5, 0.75, 1], name="a") + expected = lib.Series([1, 2, 5, 8, 15], index=[0, 0.25, 0.5, 0.75, 1], name="a") assert_eq(result, expected) diff --git a/dask_expr/tests/test_reshape.py b/dask_expr/tests/test_reshape.py index c8d4d37c6..682926ca5 100644 --- a/dask_expr/tests/test_reshape.py +++ b/dask_expr/tests/test_reshape.py @@ -1,13 +1,11 @@ -import importlib - -import dask import pytest from dask.dataframe import assert_eq from dask_expr import from_pandas +from dask_expr.tests._util import _backend_library -BACKEND = dask.config.get("dataframe.backend", "pandas") -lib = importlib.import_module(BACKEND) +# Set DataFrame backend for this module +lib = _backend_library() @pytest.fixture diff --git a/dask_expr/tests/test_shuffle.py b/dask_expr/tests/test_shuffle.py index 1e70973df..a2fbbd732 100644 --- a/dask_expr/tests/test_shuffle.py +++ b/dask_expr/tests/test_shuffle.py @@ -1,15 +1,15 @@ -import pandas as pd import pytest from dask.dataframe.utils import assert_eq from dask_expr import SetIndexBlockwise, from_pandas from dask_expr._expr import Blockwise from dask_expr.io import FromPandas +from dask_expr.tests._util import _backend_library @pytest.fixture def pdf(): - return pd.DataFrame({"x": list(range(20)) * 5, "y": range(100)}) + return _backend_library().DataFrame({"x": list(range(20)) * 5, "y": range(100)}) @pytest.fixture From 49b86b738506b6821f3881ab0b149eb2c82cd1fe Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 26 Jul 2023 11:16:01 -0700 Subject: [PATCH 3/6] roll back unnecessary change --- dask_expr/_reductions.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/dask_expr/_reductions.py b/dask_expr/_reductions.py index 0f37f9780..4f2868661 100644 --- a/dask_expr/_reductions.py +++ b/dask_expr/_reductions.py @@ -530,7 +530,6 @@ class IdxMin(Reduction): reduction_chunk = idxmaxmin_chunk reduction_combine = idxmaxmin_combine reduction_aggregate = idxmaxmin_agg - _required_attribute = "idxmin" _fn = "idxmin" @property @@ -548,7 +547,6 @@ def aggregate_kwargs(self): class IdxMax(IdxMin): - _required_attribute = "idxmax" _fn = "idxmax" From 42ecf35ee9bd11c2ad4775bce48b6950947bc802 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 26 Jul 2023 11:17:34 -0700 Subject: [PATCH 4/6] simplify xfail_gpu --- dask_expr/tests/_util.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/dask_expr/tests/_util.py b/dask_expr/tests/_util.py index f2024e46a..1343da74b 100644 --- a/dask_expr/tests/_util.py +++ b/dask_expr/tests/_util.py @@ -12,9 +12,7 @@ def _backend_library(): return importlib.import_module(_backend_name()) -def xfail_gpu(reason=None, skip=False): +def xfail_gpu(reason=None): condition = _backend_name() == "cudf" reason = reason or "Failure expected for cudf backend." - if not skip: - return pytest.mark.xfail(condition, reason=reason) - return pytest.mark.skipif(condition, reason=reason) + return pytest.mark.xfail(condition, reason=reason) From 8a04f8fe8f959e3f0f8ff9d0874e8fa05402ceee Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 26 Jul 2023 11:38:38 -0700 Subject: [PATCH 5/6] update test_shuffle.py --- dask_expr/tests/test_shuffle.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dask_expr/tests/test_shuffle.py b/dask_expr/tests/test_shuffle.py index a2fbbd732..eafb4d75c 100644 --- a/dask_expr/tests/test_shuffle.py +++ b/dask_expr/tests/test_shuffle.py @@ -6,10 +6,13 @@ from dask_expr.io import FromPandas from dask_expr.tests._util import _backend_library +# Set DataFrame backend for this module +lib = _backend_library() + @pytest.fixture def pdf(): - return _backend_library().DataFrame({"x": list(range(20)) * 5, "y": range(100)}) + return lib.DataFrame({"x": list(range(20)) * 5, "y": range(100)}) @pytest.fixture From 10da6eed3ff797bb47059b46432cf47d9a2e6037 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 26 Jul 2023 12:14:21 -0700 Subject: [PATCH 6/6] make sure backend is imported --- dask_expr/_collection.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index 810dad163..1ab753ed4 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -1051,6 +1051,7 @@ def new_collection(expr): """Create new collection from an expr""" meta = expr._meta + expr._name # Ensure backend is imported if is_dataframe_like(meta): return DataFrame(expr) elif is_series_like(meta):