Skip to content

Commit bab54d7

Browse files
Revert changes to dispatch.py. Now part of NVIDIA-Merlin#244
1 parent 2390db4 commit bab54d7

File tree

1 file changed

+60
-96
lines changed

1 file changed

+60
-96
lines changed

merlin/core/dispatch.py

Lines changed: 60 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,16 @@
2424
import pyarrow as pa
2525
import pyarrow.parquet as pq
2626

27-
# unused HAS_GPU import is here for backwards compatibility
28-
from merlin.core.compat import HAS_GPU # pylint: disable=unused-import # noqa: F401
29-
from merlin.core.compat import cupy as cp
27+
from merlin.core.compat import HAS_GPU
3028
from merlin.core.protocols import DataFrameLike, DictLike, SeriesLike
3129

3230
cudf = None
31+
cp = None
3332
rmm = None
3433

35-
if cudf:
34+
if HAS_GPU:
3635
try:
36+
import cudf # type: ignore[no-redef]
3737
import dask_cudf
3838
import rmm # type: ignore[no-redef]
3939
from cudf.core.column import as_column, build_column
@@ -48,6 +48,10 @@
4848
from cudf.utils.dtypes import is_string_dtype as cudf_is_string_dtype
4949
except ImportError:
5050
pass
51+
try:
52+
import cupy as cp # type: ignore[no-redef]
53+
except ImportError:
54+
pass
5155

5256
try:
5357
# Dask >= 2021.5.1
@@ -73,7 +77,7 @@ def inner2(*args, **kwargs):
7377
return inner1
7478

7579

76-
if cudf:
80+
if HAS_GPU and cudf:
7781
DataFrameType = Union[pd.DataFrame, cudf.DataFrame] # type: ignore
7882
SeriesType = Union[pd.Series, cudf.Series] # type: ignore
7983
else:
@@ -120,7 +124,7 @@ def create_merlin_dataset(df):
120124

121125
def read_parquet_metadata(path):
122126
"""Read parquet metadata from path"""
123-
if cudf:
127+
if HAS_GPU:
124128
return cudf.io.read_parquet_metadata(path)
125129
full_meta = pq.read_metadata(path)
126130
pf = pq.ParquetFile(path)
@@ -129,7 +133,7 @@ def read_parquet_metadata(path):
129133

130134
def get_lib():
131135
"""Dispatch to the appropriate library (cudf or pandas) for the current environment"""
132-
return cudf or pd
136+
return cudf if HAS_GPU else pd
133137

134138

135139
def reinitialize(managed_memory=False):
@@ -140,15 +144,15 @@ def reinitialize(managed_memory=False):
140144

141145
def random_uniform(size):
142146
"""Dispatch for numpy.random.RandomState"""
143-
if cp:
147+
if HAS_GPU:
144148
return cp.random.uniform(size=size)
145149
else:
146150
return np.random.uniform(size=size)
147151

148152

149153
def coo_matrix(data, row, col):
150154
"""Dispatch for scipy.sparse.coo_matrix"""
151-
if cp:
155+
if HAS_GPU:
152156
return cp.sparse.coo_matrix((data, row, col))
153157
else:
154158
import scipy
@@ -159,9 +163,9 @@ def coo_matrix(data, row, col):
159163
def is_dataframe_object(x):
160164
# Simple check if object is a cudf or pandas
161165
# DataFrame object
162-
if cudf:
163-
return isinstance(x, (cudf.DataFrame, pd.DataFrame))
164-
return isinstance(x, pd.DataFrame)
166+
if not HAS_GPU:
167+
return isinstance(x, pd.DataFrame)
168+
return isinstance(x, (cudf.DataFrame, pd.DataFrame))
165169

166170

167171
def nullable_series(data, like_df, dtype):
@@ -178,9 +182,9 @@ def nullable_series(data, like_df, dtype):
178182
def is_series_object(x):
179183
# Simple check if object is a cudf or pandas
180184
# Series object
181-
if cudf:
182-
return isinstance(x, (cudf.Series, pd.Series))
183-
return isinstance(x, pd.Series)
185+
if not HAS_GPU:
186+
return isinstance(x, pd.Series)
187+
return isinstance(x, (cudf.Series, pd.Series))
184188

185189

186190
def is_cpu_object(x):
@@ -213,78 +217,50 @@ def pd_convert_hex(x):
213217

214218
def random_state(seed, like_df=None):
215219
"""Dispatch for numpy.random.RandomState"""
216-
if isinstance(like_df, (pd.DataFrame, pd.Series)):
220+
if not HAS_GPU or isinstance(like_df, (pd.DataFrame, pd.Series)):
217221
return np.random.RandomState(seed)
218-
elif cudf and isinstance(like_df, (cudf.DataFrame, cudf.Series)):
219-
return cp.random.RandomState(seed)
220222
else:
221-
return ValueError(
222-
"Unsupported dataframe type: "
223-
f"{type(like_df)}"
224-
" Expected either a pandas or cudf DataFrame or Series."
225-
)
223+
return cp.random.RandomState(seed)
226224

227225

228226
def arange(size, like_df=None, dtype=None):
229227
"""Dispatch for numpy.arange"""
230-
if isinstance(like_df, (np.ndarray, pd.DataFrame, pd.Series)):
228+
if not HAS_GPU or isinstance(like_df, (np.ndarray, pd.DataFrame, pd.Series)):
231229
return np.arange(size, dtype=dtype)
232-
elif cudf and isinstance(like_df, (cp.ndarray, cudf.DataFrame, cudf.Series)):
233-
return cp.arange(size, dtype=dtype)
234230
else:
235-
return ValueError(
236-
"Unsupported dataframe type: "
237-
f"{type(like_df)}"
238-
" Expected either a pandas or cudf DataFrame or Series."
239-
)
231+
return cp.arange(size, dtype=dtype)
240232

241233

242234
def array(x, like_df=None, dtype=None):
243235
"""Dispatch for numpy.array"""
244-
if isinstance(like_df, (np.ndarray, pd.DataFrame, pd.Series)):
236+
if not HAS_GPU or isinstance(like_df, (np.ndarray, pd.DataFrame, pd.Series)):
245237
return np.array(x, dtype=dtype)
246-
elif cudf and isinstance(like_df, (cp.ndarray, cudf.DataFrame, cudf.Series)):
247-
return cp.array(x, dtype=dtype)
248238
else:
249-
return ValueError(
250-
"Unsupported dataframe type: "
251-
f"{type(like_df)}"
252-
" Expected either a pandas or cudf DataFrame or Series."
253-
)
239+
return cp.array(x, dtype=dtype)
254240

255241

256242
def zeros(size, like_df=None, dtype=None):
257243
"""Dispatch for numpy.array"""
258-
if isinstance(like_df, (np.ndarray, pd.DataFrame, pd.Series)):
244+
if not HAS_GPU or isinstance(like_df, (np.ndarray, pd.DataFrame, pd.Series)):
259245
return np.zeros(size, dtype=dtype)
260-
elif cudf and isinstance(like_df, (cp.ndarray, cudf.DataFrame, cudf.Series)):
261-
return cp.zeros(size, dtype=dtype)
262246
else:
263-
return ValueError(
264-
"Unsupported dataframe type: "
265-
f"{type(like_df)}"
266-
" Expected either a pandas or cudf DataFrame or Series."
267-
)
247+
return cp.zeros(size, dtype=dtype)
268248

269249

270250
def hash_series(ser):
271251
"""Row-wise Series hash"""
272-
if isinstance(ser, pd.Series):
252+
if not HAS_GPU or isinstance(ser, pd.Series):
273253
# Using pandas hashing, which does not produce the
274254
# same result as cudf.Series.hash_values(). Do not
275255
# expect hash-based data transformations to be the
276256
# same on CPU and CPU. TODO: Fix this (maybe use
277257
# murmurhash3 manually on CPU).
278258
return hash_object_dispatch(ser).values
279-
elif cudf and isinstance(ser, cudf.Series):
259+
else:
280260
if is_list_dtype(ser):
281261
return ser.list.leaves.hash_values()
282262
else:
283263
return ser.hash_values()
284-
else:
285-
return ValueError(
286-
"Unsupported series type: " f"{type(ser)}" " Expected either a pandas or cudf Series."
287-
)
288264

289265

290266
def series_has_nulls(s):
@@ -333,11 +309,12 @@ def is_list_dtype(ser):
333309
if not len(ser): # pylint: disable=len-as-condition
334310
return False
335311
return pd.api.types.is_list_like(ser.values[0])
336-
elif cudf and isinstance(ser, cudf.Series):
337-
return cudf_is_list_dtype(ser)
338-
elif isinstance(ser, np.ndarray):
339-
return pd.api.types.is_list_like(ser[0])
340-
return pd.api.types.is_list_like(ser)
312+
elif not HAS_GPU:
313+
# either np.ndarray or a dtype
314+
if isinstance(ser, np.ndarray):
315+
ser = ser[0]
316+
return pd.api.types.is_list_like(ser)
317+
return cudf_is_list_dtype(ser)
341318

342319

343320
def is_string_dtype(dtype: np.dtype) -> bool:
@@ -353,21 +330,18 @@ def is_string_dtype(dtype: np.dtype) -> bool:
353330
bool
354331
`True` if the dtype of `obj` is a string type
355332
"""
356-
if cudf:
333+
if not HAS_GPU:
334+
return pd.api.types.is_string_dtype(dtype)
335+
else:
357336
return cudf_is_string_dtype(dtype)
358-
return pd.api.types.is_string_dtype(dtype)
359337

360338

361339
def flatten_list_column_values(s):
362340
"""returns a flattened list from a list column"""
363-
if isinstance(s, pd.Series):
341+
if isinstance(s, pd.Series) or not cudf:
364342
return pd.Series(itertools.chain(*s))
365-
elif cudf and isinstance(s, cudf.Series):
366-
return s.list.leaves
367343
else:
368-
return ValueError(
369-
"Unsupported series type: " f"{type(s)}" " Expected either a pandas or cudf Series."
370-
)
344+
return s.list.leaves
371345

372346

373347
def flatten_list_column(s):
@@ -410,7 +384,7 @@ def read_dispatch(df: DataFrameLike = None, cpu=None, collection=False, fmt="par
410384
"""Return the necessary read_parquet function to generate
411385
data of a specified type.
412386
"""
413-
if cpu or isinstance(df, pd.DataFrame):
387+
if cpu or isinstance(df, pd.DataFrame) or not HAS_GPU:
414388
_mod = dd if collection else pd
415389
else:
416390
if collection:
@@ -511,16 +485,10 @@ def concat(objs, **kwargs):
511485
"""dispatch function for concat"""
512486
if isinstance(objs[0], dd.DataFrame):
513487
return dd.multi.concat(objs)
514-
elif isinstance(objs[0], (pd.DataFrame, pd.Series)):
488+
elif isinstance(objs[0], (pd.DataFrame, pd.Series)) or not HAS_GPU:
515489
return pd.concat(objs, **kwargs)
516-
elif cudf and isinstance(objs[0], (cudf.DataFrame, cudf.Series)):
517-
return cudf.core.reshape.concat(objs, **kwargs)
518490
else:
519-
return ValueError(
520-
"Unsupported dataframe type: "
521-
f"{type(objs[0])}"
522-
" Expected a pandas, cudf, or dask DataFrame."
523-
)
491+
return cudf.core.reshape.concat(objs, **kwargs)
524492

525493

526494
def make_df(_like_df=None, device=None):
@@ -616,7 +584,7 @@ def convert_data(x, cpu=True, to_collection=None, npartitions=1):
616584
_x = x if isinstance(x, pd.DataFrame) else x.to_pandas()
617585
# Output a collection if `to_collection=True`
618586
return dd.from_pandas(_x, sort=False, npartitions=npartitions) if to_collection else _x
619-
elif cudf and dask_cudf:
587+
else:
620588
if isinstance(x, dd.DataFrame):
621589
# If input is a Dask collection, convert to dask_cudf
622590
if isinstance(x, dask_cudf.DataFrame):
@@ -639,30 +607,26 @@ def convert_data(x, cpu=True, to_collection=None, npartitions=1):
639607
if to_collection
640608
else _x
641609
)
642-
else:
643-
raise RuntimeError(
644-
"Unable to move data to GPU. "
645-
"cudf and dask_cudf are not available. "
646-
"Make sure these packages are installed and can be imported in this environment. "
647-
)
648610

649611

650612
def to_host(x):
651613
"""Move cudf.DataFrame to host memory for caching.
652614
653615
All other data will pass through unchanged.
654616
"""
655-
if cudf and isinstance(x, cudf.DataFrame):
617+
if not HAS_GPU or isinstance(x, (pd.DataFrame, dd.DataFrame)):
618+
return x
619+
else:
656620
return x.to_arrow()
657-
return x
658621

659622

660623
def from_host(x):
661-
if isinstance(x, pd.DataFrame):
662-
return cudf.DataFrame.from_pandas(x)
663-
elif isinstance(x, pa.Table):
624+
if not HAS_GPU:
625+
return x
626+
elif isinstance(x, cudf.DataFrame):
627+
return x
628+
else:
664629
return cudf.DataFrame.from_arrow(x)
665-
return x
666630

667631

668632
def build_cudf_list_column(new_elements, new_offsets):
@@ -681,14 +645,14 @@ def build_cudf_list_column(new_elements, new_offsets):
681645
cudf.Series
682646
The list column with corresponding elements and row_lengths as a series.
683647
"""
684-
if cudf:
685-
return build_column(
686-
None,
687-
dtype=cudf.core.dtypes.ListDtype(new_elements.dtype),
688-
size=new_offsets.size - 1,
689-
children=(as_column(new_offsets), as_column(new_elements)),
690-
)
691-
return []
648+
if not HAS_GPU:
649+
return []
650+
return build_column(
651+
None,
652+
dtype=cudf.core.dtypes.ListDtype(new_elements.dtype),
653+
size=new_offsets.size - 1,
654+
children=(as_column(new_offsets), as_column(new_elements)),
655+
)
692656

693657

694658
def build_pandas_list_column(elements, row_lengths):

0 commit comments

Comments
 (0)