Skip to content

Commit 20bf15f

Browse files
Remove use of HAS_GPU from dispatch functions (#244)
* Remove use of HAS_GPU from `dispatch` functions * Add cudf import to compat module * Correct return to raise for ValueError * Check for RangeIndex in `random_state` Used in the Target Encoding NVTabular Operator * Update {arange, array, zeros, and random_state} to handle base case * Update read_dispatch to handle string argument --------- Co-authored-by: Karl Higley <karlb@nvidia.com>
1 parent 0702d9a commit 20bf15f

File tree

2 files changed

+117
-68
lines changed

2 files changed

+117
-68
lines changed

merlin/core/compat.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ def _get_gpu_count():
6161
except ImportError:
6262
cupy = None
6363

64+
try:
65+
import cudf
66+
except ImportError:
67+
cudf = None
68+
6469
try:
6570
import tensorflow
6671
from tensorflow.python.framework import ops as tf_ops

merlin/core/dispatch.py

Lines changed: 112 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,16 @@
2424
import pyarrow as pa
2525
import pyarrow.parquet as pq
2626

27-
from merlin.core.compat import HAS_GPU
27+
# unused HAS_GPU import is here for backwards compatibility
28+
from merlin.core.compat import HAS_GPU # pylint: disable=unused-import # noqa: F401
29+
from merlin.core.compat import cudf
30+
from merlin.core.compat import cupy as cp
2831
from merlin.core.protocols import DataFrameLike, DictLike, SeriesLike
2932

30-
cudf = None
31-
cp = None
3233
rmm = None
3334

34-
if HAS_GPU:
35+
if cudf:
3536
try:
36-
import cudf # type: ignore[no-redef]
3737
import dask_cudf
3838
import rmm # type: ignore[no-redef]
3939
from cudf.core.column import as_column, build_column
@@ -48,10 +48,6 @@
4848
from cudf.utils.dtypes import is_string_dtype as cudf_is_string_dtype
4949
except ImportError:
5050
pass
51-
try:
52-
import cupy as cp # type: ignore[no-redef]
53-
except ImportError:
54-
pass
5551

5652
try:
5753
# Dask >= 2021.5.1
@@ -77,7 +73,7 @@ def inner2(*args, **kwargs):
7773
return inner1
7874

7975

80-
if HAS_GPU and cudf:
76+
if cudf:
8177
DataFrameType = Union[pd.DataFrame, cudf.DataFrame] # type: ignore
8278
SeriesType = Union[pd.Series, cudf.Series] # type: ignore
8379
else:
@@ -124,7 +120,7 @@ def create_merlin_dataset(df):
124120

125121
def read_parquet_metadata(path):
126122
"""Read parquet metadata from path"""
127-
if HAS_GPU:
123+
if cudf:
128124
return cudf.io.read_parquet_metadata(path)
129125
full_meta = pq.read_metadata(path)
130126
pf = pq.ParquetFile(path)
@@ -133,7 +129,7 @@ def read_parquet_metadata(path):
133129

134130
def get_lib():
135131
"""Dispatch to the appropriate library (cudf or pandas) for the current environment"""
136-
return cudf if HAS_GPU else pd
132+
return cudf or pd
137133

138134

139135
def reinitialize(managed_memory=False):
@@ -144,15 +140,15 @@ def reinitialize(managed_memory=False):
144140

145141
def random_uniform(size):
146142
"""Dispatch for numpy.random.RandomState"""
147-
if HAS_GPU:
143+
if cp:
148144
return cp.random.uniform(size=size)
149145
else:
150146
return np.random.uniform(size=size)
151147

152148

153149
def coo_matrix(data, row, col):
154150
"""Dispatch for scipy.sparse.coo_matrix"""
155-
if HAS_GPU:
151+
if cp:
156152
return cp.sparse.coo_matrix((data, row, col))
157153
else:
158154
import scipy
@@ -163,9 +159,9 @@ def coo_matrix(data, row, col):
163159
def is_dataframe_object(x):
164160
# Simple check if object is a cudf or pandas
165161
# DataFrame object
166-
if not HAS_GPU:
167-
return isinstance(x, pd.DataFrame)
168-
return isinstance(x, (cudf.DataFrame, pd.DataFrame))
162+
if cudf:
163+
return isinstance(x, (cudf.DataFrame, pd.DataFrame))
164+
return isinstance(x, pd.DataFrame)
169165

170166

171167
def nullable_series(data, like_df, dtype):
@@ -182,9 +178,9 @@ def nullable_series(data, like_df, dtype):
182178
def is_series_object(x):
183179
# Simple check if object is a cudf or pandas
184180
# Series object
185-
if not HAS_GPU:
186-
return isinstance(x, pd.Series)
187-
return isinstance(x, (cudf.Series, pd.Series))
181+
if cudf:
182+
return isinstance(x, (cudf.Series, pd.Series))
183+
return isinstance(x, pd.Series)
188184

189185

190186
def is_cpu_object(x):
@@ -217,50 +213,86 @@ def pd_convert_hex(x):
217213

218214
def random_state(seed, like_df=None):
219215
"""Dispatch for numpy.random.RandomState"""
220-
if not HAS_GPU or isinstance(like_df, (pd.DataFrame, pd.Series)):
216+
if like_df is None:
217+
return cp.random.RandomState(seed) if cp else np.random.RandomState(seed)
218+
elif isinstance(like_df, (pd.DataFrame, pd.Series, pd.RangeIndex)):
221219
return np.random.RandomState(seed)
222-
else:
220+
elif cudf and isinstance(like_df, (cudf.DataFrame, cudf.Series, cudf.RangeIndex)):
223221
return cp.random.RandomState(seed)
222+
else:
223+
raise ValueError(
224+
"Unsupported dataframe type: "
225+
f"{type(like_df)}"
226+
" Supported types: a DataFrame, Series, or RangeIndex (cudf or pandas)."
227+
)
224228

225229

226230
def arange(size, like_df=None, dtype=None):
227231
"""Dispatch for numpy.arange"""
228-
if not HAS_GPU or isinstance(like_df, (np.ndarray, pd.DataFrame, pd.Series)):
232+
if like_df is None:
233+
return cp.arange(size, dtype=dtype) if cp else np.arange(size, dtype=dtype)
234+
elif isinstance(like_df, (np.ndarray, pd.DataFrame, pd.Series, pd.RangeIndex)):
229235
return np.arange(size, dtype=dtype)
230-
else:
236+
elif cudf and isinstance(like_df, (cp.ndarray, cudf.DataFrame, cudf.Series, cudf.RangeIndex)):
231237
return cp.arange(size, dtype=dtype)
238+
else:
239+
raise ValueError(
240+
"Unsupported dataframe type: "
241+
f"{type(like_df)}"
242+
" Expected either a pandas or cudf DataFrame or Series."
243+
)
232244

233245

234246
def array(x, like_df=None, dtype=None):
235247
"""Dispatch for numpy.array"""
236-
if not HAS_GPU or isinstance(like_df, (np.ndarray, pd.DataFrame, pd.Series)):
248+
if like_df is None:
249+
return cp.array(x, dtype=dtype) if cp else np.array(x, dtype=dtype)
250+
elif isinstance(like_df, (np.ndarray, pd.DataFrame, pd.Series, pd.RangeIndex)):
237251
return np.array(x, dtype=dtype)
238-
else:
252+
elif cudf and isinstance(like_df, (cp.ndarray, cudf.DataFrame, cudf.Series, cudf.RangeIndex)):
239253
return cp.array(x, dtype=dtype)
254+
else:
255+
raise ValueError(
256+
"Unsupported dataframe type: "
257+
f"{type(like_df)}"
258+
" Expected either a pandas or cudf DataFrame or Series."
259+
)
240260

241261

242262
def zeros(size, like_df=None, dtype=None):
243263
"""Dispatch for numpy.array"""
244-
if not HAS_GPU or isinstance(like_df, (np.ndarray, pd.DataFrame, pd.Series)):
264+
if like_df is None:
265+
return cp.zeros(size, dtype=dtype) if cp else np.zeros(size, dtype=dtype)
266+
elif isinstance(like_df, (np.ndarray, pd.DataFrame, pd.Series, cudf.RangeIndex)):
245267
return np.zeros(size, dtype=dtype)
246-
else:
268+
elif cudf and isinstance(like_df, (cp.ndarray, cudf.DataFrame, cudf.Series, cudf.RangeIndex)):
247269
return cp.zeros(size, dtype=dtype)
270+
else:
271+
raise ValueError(
272+
"Unsupported dataframe type: "
273+
f"{type(like_df)}"
274+
" Expected either a pandas or cudf DataFrame or Series."
275+
)
248276

249277

250278
def hash_series(ser):
251279
"""Row-wise Series hash"""
252-
if not HAS_GPU or isinstance(ser, pd.Series):
280+
if isinstance(ser, pd.Series):
253281
# Using pandas hashing, which does not produce the
254282
# same result as cudf.Series.hash_values(). Do not
255283
# expect hash-based data transformations to be the
256284
# same on CPU and CPU. TODO: Fix this (maybe use
257285
# murmurhash3 manually on CPU).
258286
return hash_object_dispatch(ser).values
259-
else:
287+
elif cudf and isinstance(ser, cudf.Series):
260288
if is_list_dtype(ser):
261289
return ser.list.leaves.hash_values()
262290
else:
263291
return ser.hash_values()
292+
else:
293+
raise ValueError(
294+
"Unsupported series type: " f"{type(ser)}" " Expected either a pandas or cudf Series."
295+
)
264296

265297

266298
def series_has_nulls(s):
@@ -309,12 +341,11 @@ def is_list_dtype(ser):
309341
if not len(ser): # pylint: disable=len-as-condition
310342
return False
311343
return pd.api.types.is_list_like(ser.values[0])
312-
elif not HAS_GPU:
313-
# either np.ndarray or a dtype
314-
if isinstance(ser, np.ndarray):
315-
ser = ser[0]
316-
return pd.api.types.is_list_like(ser)
317-
return cudf_is_list_dtype(ser)
344+
elif cudf and isinstance(ser, cudf.Series):
345+
return cudf_is_list_dtype(ser)
346+
elif isinstance(ser, np.ndarray):
347+
return pd.api.types.is_list_like(ser[0])
348+
return pd.api.types.is_list_like(ser)
318349

319350

320351
def is_string_dtype(dtype: np.dtype) -> bool:
@@ -330,18 +361,21 @@ def is_string_dtype(dtype: np.dtype) -> bool:
330361
bool
331362
`True` if the dtype of `obj` is a string type
332363
"""
333-
if not HAS_GPU:
334-
return pd.api.types.is_string_dtype(dtype)
335-
else:
364+
if cudf:
336365
return cudf_is_string_dtype(dtype)
366+
return pd.api.types.is_string_dtype(dtype)
337367

338368

339369
def flatten_list_column_values(s):
340370
"""returns a flattened list from a list column"""
341-
if isinstance(s, pd.Series) or not cudf:
371+
if isinstance(s, pd.Series):
342372
return pd.Series(itertools.chain(*s))
343-
else:
373+
elif cudf and isinstance(s, cudf.Series):
344374
return s.list.leaves
375+
else:
376+
raise ValueError(
377+
"Unsupported series type: " f"{type(s)}" " Expected either a pandas or cudf Series."
378+
)
345379

346380

347381
def flatten_list_column(s):
@@ -380,21 +414,21 @@ def read_parquet_dispatch(df: DataFrameLike) -> Callable:
380414
return read_dispatch(df=df, fmt="parquet")
381415

382416

383-
def read_dispatch(df: DataFrameLike = None, cpu=None, collection=False, fmt="parquet") -> Callable:
417+
def read_dispatch(
418+
df: Union[DataFrameLike, str] = None, cpu=None, collection=False, fmt="parquet"
419+
) -> Callable:
384420
"""Return the necessary read_parquet function to generate
385421
data of a specified type.
386422
"""
387-
if cpu or isinstance(df, pd.DataFrame) or not HAS_GPU:
423+
if cpu or isinstance(df, pd.DataFrame):
388424
_mod = dd if collection else pd
425+
elif cudf and isinstance(df, cudf.DataFrame):
426+
_mod = dask_cudf if collection else cudf.io
389427
else:
390428
if collection:
391-
_mod = dask_cudf
392-
elif cudf is not None:
393-
_mod = cudf.io
429+
_mod = dask_cudf if cudf else dd
394430
else:
395-
raise ValueError(
396-
"Unable to load cudf. Please check your environment GPU and cudf available."
397-
)
431+
_mod = cudf.io if cudf else pd
398432
_attr = "read_csv" if fmt == "csv" else "read_parquet"
399433
return getattr(_mod, _attr)
400434

@@ -485,10 +519,16 @@ def concat(objs, **kwargs):
485519
"""dispatch function for concat"""
486520
if isinstance(objs[0], dd.DataFrame):
487521
return dd.multi.concat(objs)
488-
elif isinstance(objs[0], (pd.DataFrame, pd.Series)) or not HAS_GPU:
522+
elif isinstance(objs[0], (pd.DataFrame, pd.Series)):
489523
return pd.concat(objs, **kwargs)
490-
else:
524+
elif cudf and isinstance(objs[0], (cudf.DataFrame, cudf.Series)):
491525
return cudf.core.reshape.concat(objs, **kwargs)
526+
else:
527+
raise ValueError(
528+
"Unsupported dataframe type: "
529+
f"{type(objs[0])}"
530+
" Expected a pandas, cudf, or dask DataFrame."
531+
)
492532

493533

494534
def make_df(_like_df=None, device=None):
@@ -584,7 +624,7 @@ def convert_data(x, cpu=True, to_collection=None, npartitions=1):
584624
_x = x if isinstance(x, pd.DataFrame) else x.to_pandas()
585625
# Output a collection if `to_collection=True`
586626
return dd.from_pandas(_x, sort=False, npartitions=npartitions) if to_collection else _x
587-
else:
627+
elif cudf and dask_cudf:
588628
if isinstance(x, dd.DataFrame):
589629
# If input is a Dask collection, convert to dask_cudf
590630
if isinstance(x, dask_cudf.DataFrame):
@@ -607,26 +647,30 @@ def convert_data(x, cpu=True, to_collection=None, npartitions=1):
607647
if to_collection
608648
else _x
609649
)
650+
else:
651+
raise RuntimeError(
652+
"Unable to move data to GPU. "
653+
"cudf and dask_cudf are not available. "
654+
"Make sure these packages are installed and can be imported in this environment. "
655+
)
610656

611657

612658
def to_host(x):
613659
"""Move cudf.DataFrame to host memory for caching.
614660
615661
All other data will pass through unchanged.
616662
"""
617-
if not HAS_GPU or isinstance(x, (pd.DataFrame, dd.DataFrame)):
618-
return x
619-
else:
663+
if cudf and isinstance(x, cudf.DataFrame):
620664
return x.to_arrow()
665+
return x
621666

622667

623668
def from_host(x):
624-
if not HAS_GPU:
625-
return x
626-
elif isinstance(x, cudf.DataFrame):
627-
return x
628-
else:
669+
if isinstance(x, pd.DataFrame):
670+
return cudf.DataFrame.from_pandas(x)
671+
elif isinstance(x, pa.Table):
629672
return cudf.DataFrame.from_arrow(x)
673+
return x
630674

631675

632676
def build_cudf_list_column(new_elements, new_offsets):
@@ -645,14 +689,14 @@ def build_cudf_list_column(new_elements, new_offsets):
645689
cudf.Series
646690
The list column with corresponding elements and row_lengths as a series.
647691
"""
648-
if not HAS_GPU:
649-
return []
650-
return build_column(
651-
None,
652-
dtype=cudf.core.dtypes.ListDtype(new_elements.dtype),
653-
size=new_offsets.size - 1,
654-
children=(as_column(new_offsets), as_column(new_elements)),
655-
)
692+
if cudf:
693+
return build_column(
694+
None,
695+
dtype=cudf.core.dtypes.ListDtype(new_elements.dtype),
696+
size=new_offsets.size - 1,
697+
children=(as_column(new_offsets), as_column(new_elements)),
698+
)
699+
return []
656700

657701

658702
def build_pandas_list_column(elements, row_lengths):

0 commit comments

Comments
 (0)