
Set HAS_GPU = False in dispatch if relevant packages fail to import (#112)

Merged
gabrielspmoreira merged 1 commit into NVIDIA-Merlin:main from oliverholworthy:dispatch-has-gpu
Aug 3, 2022

Conversation

@oliverholworthy
Contributor

Follow-up to #99

Restore `HAS_GPU` in `merlin.core.dispatch` so that it also requires the packages needed for GPU usage ({cudf, cupy, rmm, dask_cudf}) to be installed. This avoids an import-time error when a GPU is available but the required packages are not installed:

File ~/.virtualenv/evalrs/lib/python3.8/site-packages/merlin/core/dispatch.py:79, in <module>
     75         return inner1
     78 if HAS_GPU:
---> 79     DataFrameType = Union[pd.DataFrame, cudf.DataFrame]  # type: ignore
     80     SeriesType = Union[pd.Series, cudf.Series]  # type: ignore
     81 else:

AttributeError: 'NoneType' object has no attribute 'DataFrame'
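The failing line reduces to attribute access on a module name that a failed, swallowed import left bound to `None`. A minimal reconstruction of the error (illustrative code, not the actual `merlin.core.dispatch` source):

```python
# Minimal reconstruction of the reported failure: when `import cudf` fails
# and the exception is swallowed, the old code left `cudf = None` but still
# evaluated the Union type alias at import time.
cudf = None  # what a failed, swallowed `import cudf` left behind

try:
    # Stand-in for: DataFrameType = Union[pd.DataFrame, cudf.DataFrame]
    DataFrameType = cudf.DataFrame
except AttributeError as exc:
    # "'NoneType' object has no attribute 'DataFrame'"
    error_message = str(exc)
```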
  • merlin.core.compat.HAS_GPU
    returns True if numba is installed and at least one GPU is visible to the process.
  • merlin.core.dispatch.HAS_GPU
    returns True if merlin.core.compat.HAS_GPU is True and the following packages are installed: {cudf, cupy, rmm, dask_cudf}.
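The distinction above can be sketched as follows. The helper name is ours for illustration (merlin implements this with guarded `try`/`except ImportError` imports rather than a helper):

```python
# Sketch of the check this PR restores: dispatch's HAS_GPU is True only
# when every RAPIDS package needed for GPU dataframes is importable.
# `gpu_packages_available` is an illustrative helper, not a merlin API.
from importlib.util import find_spec

RAPIDS_PACKAGES = ("cudf", "cupy", "rmm", "dask_cudf")

def gpu_packages_available(names=RAPIDS_PACKAGES):
    """Return True only if every named top-level package is importable."""
    return all(find_spec(name) is not None for name in names)

# compat.HAS_GPU: numba is installed and a device is visible (assumed here).
COMPAT_HAS_GPU = True
# dispatch.HAS_GPU: device visibility AND the packages above.
HAS_GPU = COMPAT_HAS_GPU and gpu_packages_available()
```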

@oliverholworthy oliverholworthy added the bug Something isn't working label Aug 3, 2022
@nvidia-merlin-bot

CI Results
GitHub pull request #112 of commit 78799c0a50e1a66e69731df00af7d6b70a2bf18f, no merge conflicts.
Running as SYSTEM
Setting status of 78799c0a50e1a66e69731df00af7d6b70a2bf18f to PENDING with url https://10.20.13.93:8080/job/merlin_core/90/console and message: 'Pending'
Using context: Jenkins
Building on master in workspace /var/jenkins_home/workspace/merlin_core
using credential ce87ff3c-94f0-400a-8303-cb4acb4918b5
 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url https://github.com/NVIDIA-Merlin/core # timeout=10
Fetching upstream changes from https://github.com/NVIDIA-Merlin/core
 > git --version # timeout=10
using GIT_ASKPASS to set credentials login for merlin-systems username and pass
 > git fetch --tags --force --progress -- https://github.com/NVIDIA-Merlin/core +refs/pull/112/*:refs/remotes/origin/pr/112/* # timeout=10
 > git rev-parse 78799c0a50e1a66e69731df00af7d6b70a2bf18f^{commit} # timeout=10
Checking out Revision 78799c0a50e1a66e69731df00af7d6b70a2bf18f (detached)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 78799c0a50e1a66e69731df00af7d6b70a2bf18f # timeout=10
Commit message: "Set `HAS_GPU = False` in `dispatch` if relevant pkgs fail to import"
 > git rev-list --no-walk 4516dbfa2b45a8580bbc964afe05e09e5c5a54b5 # timeout=10
[merlin_core] $ /bin/bash /tmp/jenkins3318522559532991450.sh
============================= test session starts ==============================
platform linux -- Python 3.8.10, pytest-7.1.2, pluggy-1.0.0
rootdir: /var/jenkins_home/workspace/merlin_core/core, configfile: pyproject.toml
plugins: anyio-3.6.1, xdist-2.5.0, forked-1.4.0, cov-3.0.0
collected 343 items / 1 skipped

tests/unit/core/test_dispatch.py .. [ 0%]
tests/unit/dag/test_base_operator.py .... [ 1%]
tests/unit/dag/test_column_selector.py .......................... [ 9%]
tests/unit/dag/test_graph.py . [ 9%]
tests/unit/dag/test_tags.py ...... [ 11%]
tests/unit/dag/ops/test_selection.py ... [ 12%]
tests/unit/io/test_io.py ..................................FFFF......... [ 25%]
................................................................ [ 44%]
tests/unit/schema/test_column_schemas.py ............................... [ 53%]
........................................................................ [ 74%]
....................................................................... [ 95%]
tests/unit/schema/test_schema.py ...... [ 97%]
tests/unit/schema/test_schema_io.py .. [ 97%]
tests/unit/utils/test_utils.py ........ [100%]

=================================== FAILURES ===================================
_________________ test_dask_dataset_from_dataframe[True-cudf] __________________

tmpdir = local('/tmp/pytest-of-jenkins/pytest-12/test_dask_dataset_from_datafra4')
origin = 'cudf', cpu = True

@pytest.mark.parametrize("origin", ["cudf", "dask_cudf", "pd", "dd"])
@pytest.mark.parametrize("cpu", [None, True])
def test_dask_dataset_from_dataframe(tmpdir, origin, cpu):

    # Generate a DataFrame-based input
    if origin in ("pd", "dd"):
        df = pd.DataFrame({"a": range(100)})
        if origin == "dd":
            df = dask.dataframe.from_pandas(df, npartitions=4)
    elif origin in ("cudf", "dask_cudf"):
        df = cudf.DataFrame({"a": range(100)})
        if origin == "dask_cudf":
            df = dask_cudf.from_cudf(df, npartitions=4)

    # Convert to an NVTabular Dataset and back to a ddf
    dataset = merlin.io.Dataset(df, cpu=cpu)
    result = dataset.to_ddf()

    # Check resulting data
    assert_eq(df, result)

    # Check that the cpu kwarg is working correctly
    if cpu:
        assert isinstance(result.compute(), pd.DataFrame)

        # Should still work if we move to the GPU
        # (test behavior after repetitive conversion)
        dataset.to_gpu()
        dataset.to_cpu()
        dataset.to_cpu()
        dataset.to_gpu()
        result = dataset.to_ddf()
        assert isinstance(result.compute(), cudf.DataFrame)
        dataset.to_cpu()
    else:
        assert isinstance(result.compute(), cudf.DataFrame)

        # Should still work if we move to the CPU
        # (test behavior after repetitive conversion)
        dataset.to_cpu()
        dataset.to_gpu()
        dataset.to_gpu()
        dataset.to_cpu()
        result = dataset.to_ddf()
        assert isinstance(result.compute(), pd.DataFrame)
        dataset.to_gpu()

    # Write to disk and read back
    path = str(tmpdir)
    dataset.to_parquet(path, out_files_per_proc=1, shuffle=None)
>   ddf_check = dask_cudf.read_parquet(path).compute()

tests/unit/io/test_io.py:290:


/usr/local/lib/python3.8/dist-packages/dask/base.py:288: in compute
(result,) = compute(self, traverse=False, **kwargs)
/usr/local/lib/python3.8/dist-packages/dask/base.py:571: in compute
results = schedule(dsk, keys, **kwargs)
/usr/local/lib/python3.8/dist-packages/dask/local.py:553: in get_sync
return get_async(
/usr/local/lib/python3.8/dist-packages/dask/local.py:496: in get_async
for key, res_info, failed in queue_get(queue).result():
/usr/lib/python3.8/concurrent/futures/_base.py:437: in result
return self.__get_result()
/usr/lib/python3.8/concurrent/futures/_base.py:389: in __get_result
raise self._exception
/usr/local/lib/python3.8/dist-packages/dask/local.py:538: in submit
fut.set_result(fn(*args, **kwargs))
/usr/local/lib/python3.8/dist-packages/dask/local.py:234: in batch_execute_tasks
return [execute_task(a) for a in it]
/usr/local/lib/python3.8/dist-packages/dask/local.py:234: in <listcomp>
return [execute_task(a) for a in it]
/usr/local/lib/python3.8/dist-packages/dask/local.py:225: in execute_task
result = pack_exception(e, dumps)
/usr/local/lib/python3.8/dist-packages/dask/local.py:220: in execute_task
result = _execute_task(task, data)
/usr/local/lib/python3.8/dist-packages/dask/core.py:119: in _execute_task
return func(
(_execute_task(a, cache) for a in args))
/usr/local/lib/python3.8/dist-packages/dask/optimization.py:969: in __call__
return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
/usr/local/lib/python3.8/dist-packages/dask/core.py:149: in get
result = _execute_task(task, cache)
/usr/local/lib/python3.8/dist-packages/dask/core.py:119: in _execute_task
return func(
(_execute_task(a, cache) for a in args))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:87: in __call__
return read_parquet_part(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:431: in read_parquet_part
dfs = [
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:432: in <listcomp>
func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
/usr/local/lib/python3.8/dist-packages/dask_cudf/io/parquet.py:216: in read_partition
cls._read_paths(
/usr/local/lib/python3.8/dist-packages/dask_cudf/io/parquet.py:92: in _read_paths
df = cudf.read_parquet(
/usr/local/lib/python3.8/dist-packages/nvtx/nvtx.py:101: in inner
result = func(*args, **kwargs)
/usr/local/lib/python3.8/dist-packages/cudf/io/parquet.py:379: in read_parquet
) = _process_dataset(
/usr/local/lib/python3.8/dist-packages/nvtx/nvtx.py:101: in inner
result = func(*args, **kwargs)
/usr/local/lib/python3.8/dist-packages/cudf/io/parquet.py:205: in _process_dataset
dataset = ds.dataset(
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:683: in dataset
return _filesystem_dataset(source, **kwargs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:435: in _filesystem_dataset
return factory.finish(schema)
pyarrow/_dataset.pyx:2473: in pyarrow._dataset.DatasetFactory.finish
???
pyarrow/error.pxi:143: in pyarrow.lib.pyarrow_internal_check_status
???


???
E pyarrow.lib.ArrowInvalid: Error creating dataset. Could not read schema from '/tmp/pytest-of-jenkins/pytest-12/test_dask_dataset_from_datafra4/part_0.parquet': Could not open Parquet input source '/tmp/pytest-of-jenkins/pytest-12/test_dask_dataset_from_datafra4/part_0.parquet': Parquet file size is 0 bytes. Is this a 'parquet' file?

pyarrow/error.pxi:99: ArrowInvalid
_______________ test_dask_dataset_from_dataframe[True-dask_cudf] _______________

tmpdir = local('/tmp/pytest-of-jenkins/pytest-12/test_dask_dataset_from_datafra5')
origin = 'dask_cudf', cpu = True

@pytest.mark.parametrize("origin", ["cudf", "dask_cudf", "pd", "dd"])
@pytest.mark.parametrize("cpu", [None, True])
def test_dask_dataset_from_dataframe(tmpdir, origin, cpu):

    # Generate a DataFrame-based input
    if origin in ("pd", "dd"):
        df = pd.DataFrame({"a": range(100)})
        if origin == "dd":
            df = dask.dataframe.from_pandas(df, npartitions=4)
    elif origin in ("cudf", "dask_cudf"):
        df = cudf.DataFrame({"a": range(100)})
        if origin == "dask_cudf":
            df = dask_cudf.from_cudf(df, npartitions=4)

    # Convert to an NVTabular Dataset and back to a ddf
    dataset = merlin.io.Dataset(df, cpu=cpu)
    result = dataset.to_ddf()

    # Check resulting data
    assert_eq(df, result)

    # Check that the cpu kwarg is working correctly
    if cpu:
        assert isinstance(result.compute(), pd.DataFrame)

        # Should still work if we move to the GPU
        # (test behavior after repetitive conversion)
        dataset.to_gpu()
        dataset.to_cpu()
        dataset.to_cpu()
        dataset.to_gpu()
        result = dataset.to_ddf()
        assert isinstance(result.compute(), cudf.DataFrame)
        dataset.to_cpu()
    else:
        assert isinstance(result.compute(), cudf.DataFrame)

        # Should still work if we move to the CPU
        # (test behavior after repetitive conversion)
        dataset.to_cpu()
        dataset.to_gpu()
        dataset.to_gpu()
        dataset.to_cpu()
        result = dataset.to_ddf()
        assert isinstance(result.compute(), pd.DataFrame)
        dataset.to_gpu()

    # Write to disk and read back
    path = str(tmpdir)
    dataset.to_parquet(path, out_files_per_proc=1, shuffle=None)
>   ddf_check = dask_cudf.read_parquet(path).compute()

tests/unit/io/test_io.py:290:


/usr/local/lib/python3.8/dist-packages/dask/base.py:288: in compute
(result,) = compute(self, traverse=False, **kwargs)
/usr/local/lib/python3.8/dist-packages/dask/base.py:571: in compute
results = schedule(dsk, keys, **kwargs)
/usr/local/lib/python3.8/dist-packages/dask/local.py:553: in get_sync
return get_async(
/usr/local/lib/python3.8/dist-packages/dask/local.py:496: in get_async
for key, res_info, failed in queue_get(queue).result():
/usr/lib/python3.8/concurrent/futures/_base.py:437: in result
return self.__get_result()
/usr/lib/python3.8/concurrent/futures/_base.py:389: in __get_result
raise self._exception
/usr/local/lib/python3.8/dist-packages/dask/local.py:538: in submit
fut.set_result(fn(*args, **kwargs))
/usr/local/lib/python3.8/dist-packages/dask/local.py:234: in batch_execute_tasks
return [execute_task(a) for a in it]
/usr/local/lib/python3.8/dist-packages/dask/local.py:234: in <listcomp>
return [execute_task(a) for a in it]
/usr/local/lib/python3.8/dist-packages/dask/local.py:225: in execute_task
result = pack_exception(e, dumps)
/usr/local/lib/python3.8/dist-packages/dask/local.py:220: in execute_task
result = _execute_task(task, data)
/usr/local/lib/python3.8/dist-packages/dask/core.py:119: in _execute_task
return func(
(_execute_task(a, cache) for a in args))
/usr/local/lib/python3.8/dist-packages/dask/optimization.py:969: in __call__
return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
/usr/local/lib/python3.8/dist-packages/dask/core.py:149: in get
result = _execute_task(task, cache)
/usr/local/lib/python3.8/dist-packages/dask/core.py:119: in _execute_task
return func(
(_execute_task(a, cache) for a in args))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:87: in __call__
return read_parquet_part(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:431: in read_parquet_part
dfs = [
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:432: in <listcomp>
func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
/usr/local/lib/python3.8/dist-packages/dask_cudf/io/parquet.py:216: in read_partition
cls._read_paths(
/usr/local/lib/python3.8/dist-packages/dask_cudf/io/parquet.py:92: in _read_paths
df = cudf.read_parquet(
/usr/local/lib/python3.8/dist-packages/nvtx/nvtx.py:101: in inner
result = func(*args, **kwargs)
/usr/local/lib/python3.8/dist-packages/cudf/io/parquet.py:379: in read_parquet
) = _process_dataset(
/usr/local/lib/python3.8/dist-packages/nvtx/nvtx.py:101: in inner
result = func(*args, **kwargs)
/usr/local/lib/python3.8/dist-packages/cudf/io/parquet.py:205: in _process_dataset
dataset = ds.dataset(
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:683: in dataset
return _filesystem_dataset(source, **kwargs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:435: in _filesystem_dataset
return factory.finish(schema)
pyarrow/_dataset.pyx:2473: in pyarrow._dataset.DatasetFactory.finish
???
pyarrow/error.pxi:143: in pyarrow.lib.pyarrow_internal_check_status
???


???
E pyarrow.lib.ArrowInvalid: Error creating dataset. Could not read schema from '/tmp/pytest-of-jenkins/pytest-12/test_dask_dataset_from_datafra5/part_0.parquet': Could not open Parquet input source '/tmp/pytest-of-jenkins/pytest-12/test_dask_dataset_from_datafra5/part_0.parquet': Parquet file size is 0 bytes. Is this a 'parquet' file?

pyarrow/error.pxi:99: ArrowInvalid
__________________ test_dask_dataset_from_dataframe[True-pd] ___________________

tmpdir = local('/tmp/pytest-of-jenkins/pytest-12/test_dask_dataset_from_datafra6')
origin = 'pd', cpu = True

@pytest.mark.parametrize("origin", ["cudf", "dask_cudf", "pd", "dd"])
@pytest.mark.parametrize("cpu", [None, True])
def test_dask_dataset_from_dataframe(tmpdir, origin, cpu):

    # Generate a DataFrame-based input
    if origin in ("pd", "dd"):
        df = pd.DataFrame({"a": range(100)})
        if origin == "dd":
            df = dask.dataframe.from_pandas(df, npartitions=4)
    elif origin in ("cudf", "dask_cudf"):
        df = cudf.DataFrame({"a": range(100)})
        if origin == "dask_cudf":
            df = dask_cudf.from_cudf(df, npartitions=4)

    # Convert to an NVTabular Dataset and back to a ddf
    dataset = merlin.io.Dataset(df, cpu=cpu)
    result = dataset.to_ddf()

    # Check resulting data
    assert_eq(df, result)

    # Check that the cpu kwarg is working correctly
    if cpu:
        assert isinstance(result.compute(), pd.DataFrame)

        # Should still work if we move to the GPU
        # (test behavior after repetitive conversion)
        dataset.to_gpu()
        dataset.to_cpu()
        dataset.to_cpu()
        dataset.to_gpu()
        result = dataset.to_ddf()
        assert isinstance(result.compute(), cudf.DataFrame)
        dataset.to_cpu()
    else:
        assert isinstance(result.compute(), cudf.DataFrame)

        # Should still work if we move to the CPU
        # (test behavior after repetitive conversion)
        dataset.to_cpu()
        dataset.to_gpu()
        dataset.to_gpu()
        dataset.to_cpu()
        result = dataset.to_ddf()
        assert isinstance(result.compute(), pd.DataFrame)
        dataset.to_gpu()

    # Write to disk and read back
    path = str(tmpdir)
    dataset.to_parquet(path, out_files_per_proc=1, shuffle=None)
>   ddf_check = dask_cudf.read_parquet(path).compute()

tests/unit/io/test_io.py:290:


/usr/local/lib/python3.8/dist-packages/dask/base.py:288: in compute
(result,) = compute(self, traverse=False, **kwargs)
/usr/local/lib/python3.8/dist-packages/dask/base.py:571: in compute
results = schedule(dsk, keys, **kwargs)
/usr/local/lib/python3.8/dist-packages/dask/local.py:553: in get_sync
return get_async(
/usr/local/lib/python3.8/dist-packages/dask/local.py:496: in get_async
for key, res_info, failed in queue_get(queue).result():
/usr/lib/python3.8/concurrent/futures/_base.py:437: in result
return self.__get_result()
/usr/lib/python3.8/concurrent/futures/_base.py:389: in __get_result
raise self._exception
/usr/local/lib/python3.8/dist-packages/dask/local.py:538: in submit
fut.set_result(fn(*args, **kwargs))
/usr/local/lib/python3.8/dist-packages/dask/local.py:234: in batch_execute_tasks
return [execute_task(a) for a in it]
/usr/local/lib/python3.8/dist-packages/dask/local.py:234: in <listcomp>
return [execute_task(a) for a in it]
/usr/local/lib/python3.8/dist-packages/dask/local.py:225: in execute_task
result = pack_exception(e, dumps)
/usr/local/lib/python3.8/dist-packages/dask/local.py:220: in execute_task
result = _execute_task(task, data)
/usr/local/lib/python3.8/dist-packages/dask/core.py:119: in _execute_task
return func(
(_execute_task(a, cache) for a in args))
/usr/local/lib/python3.8/dist-packages/dask/optimization.py:969: in __call__
return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
/usr/local/lib/python3.8/dist-packages/dask/core.py:149: in get
result = _execute_task(task, cache)
/usr/local/lib/python3.8/dist-packages/dask/core.py:119: in _execute_task
return func(
(_execute_task(a, cache) for a in args))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:87: in __call__
return read_parquet_part(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:431: in read_parquet_part
dfs = [
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:432: in <listcomp>
func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
/usr/local/lib/python3.8/dist-packages/dask_cudf/io/parquet.py:216: in read_partition
cls._read_paths(
/usr/local/lib/python3.8/dist-packages/dask_cudf/io/parquet.py:92: in _read_paths
df = cudf.read_parquet(
/usr/local/lib/python3.8/dist-packages/nvtx/nvtx.py:101: in inner
result = func(*args, **kwargs)
/usr/local/lib/python3.8/dist-packages/cudf/io/parquet.py:379: in read_parquet
) = _process_dataset(
/usr/local/lib/python3.8/dist-packages/nvtx/nvtx.py:101: in inner
result = func(*args, **kwargs)
/usr/local/lib/python3.8/dist-packages/cudf/io/parquet.py:205: in _process_dataset
dataset = ds.dataset(
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:683: in dataset
return _filesystem_dataset(source, **kwargs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:435: in _filesystem_dataset
return factory.finish(schema)
pyarrow/_dataset.pyx:2473: in pyarrow._dataset.DatasetFactory.finish
???
pyarrow/error.pxi:143: in pyarrow.lib.pyarrow_internal_check_status
???


???
E pyarrow.lib.ArrowInvalid: Error creating dataset. Could not read schema from '/tmp/pytest-of-jenkins/pytest-12/test_dask_dataset_from_datafra6/part_0.parquet': Could not open Parquet input source '/tmp/pytest-of-jenkins/pytest-12/test_dask_dataset_from_datafra6/part_0.parquet': Parquet file size is 0 bytes. Is this a 'parquet' file?

pyarrow/error.pxi:99: ArrowInvalid
__________________ test_dask_dataset_from_dataframe[True-dd] ___________________

tmpdir = local('/tmp/pytest-of-jenkins/pytest-12/test_dask_dataset_from_datafra7')
origin = 'dd', cpu = True

@pytest.mark.parametrize("origin", ["cudf", "dask_cudf", "pd", "dd"])
@pytest.mark.parametrize("cpu", [None, True])
def test_dask_dataset_from_dataframe(tmpdir, origin, cpu):

    # Generate a DataFrame-based input
    if origin in ("pd", "dd"):
        df = pd.DataFrame({"a": range(100)})
        if origin == "dd":
            df = dask.dataframe.from_pandas(df, npartitions=4)
    elif origin in ("cudf", "dask_cudf"):
        df = cudf.DataFrame({"a": range(100)})
        if origin == "dask_cudf":
            df = dask_cudf.from_cudf(df, npartitions=4)

    # Convert to an NVTabular Dataset and back to a ddf
    dataset = merlin.io.Dataset(df, cpu=cpu)
    result = dataset.to_ddf()

    # Check resulting data
    assert_eq(df, result)

    # Check that the cpu kwarg is working correctly
    if cpu:
        assert isinstance(result.compute(), pd.DataFrame)

        # Should still work if we move to the GPU
        # (test behavior after repetitive conversion)
        dataset.to_gpu()
        dataset.to_cpu()
        dataset.to_cpu()
        dataset.to_gpu()
        result = dataset.to_ddf()
        assert isinstance(result.compute(), cudf.DataFrame)
        dataset.to_cpu()
    else:
        assert isinstance(result.compute(), cudf.DataFrame)

        # Should still work if we move to the CPU
        # (test behavior after repetitive conversion)
        dataset.to_cpu()
        dataset.to_gpu()
        dataset.to_gpu()
        dataset.to_cpu()
        result = dataset.to_ddf()
        assert isinstance(result.compute(), pd.DataFrame)
        dataset.to_gpu()

    # Write to disk and read back
    path = str(tmpdir)
    dataset.to_parquet(path, out_files_per_proc=1, shuffle=None)
>   ddf_check = dask_cudf.read_parquet(path).compute()

tests/unit/io/test_io.py:290:


/usr/local/lib/python3.8/dist-packages/dask/base.py:288: in compute
(result,) = compute(self, traverse=False, **kwargs)
/usr/local/lib/python3.8/dist-packages/dask/base.py:571: in compute
results = schedule(dsk, keys, **kwargs)
/usr/local/lib/python3.8/dist-packages/dask/local.py:553: in get_sync
return get_async(
/usr/local/lib/python3.8/dist-packages/dask/local.py:496: in get_async
for key, res_info, failed in queue_get(queue).result():
/usr/lib/python3.8/concurrent/futures/_base.py:437: in result
return self.__get_result()
/usr/lib/python3.8/concurrent/futures/_base.py:389: in __get_result
raise self._exception
/usr/local/lib/python3.8/dist-packages/dask/local.py:538: in submit
fut.set_result(fn(*args, **kwargs))
/usr/local/lib/python3.8/dist-packages/dask/local.py:234: in batch_execute_tasks
return [execute_task(a) for a in it]
/usr/local/lib/python3.8/dist-packages/dask/local.py:234: in <listcomp>
return [execute_task(a) for a in it]
/usr/local/lib/python3.8/dist-packages/dask/local.py:225: in execute_task
result = pack_exception(e, dumps)
/usr/local/lib/python3.8/dist-packages/dask/local.py:220: in execute_task
result = _execute_task(task, data)
/usr/local/lib/python3.8/dist-packages/dask/core.py:119: in _execute_task
return func(
(_execute_task(a, cache) for a in args))
/usr/local/lib/python3.8/dist-packages/dask/optimization.py:969: in __call__
return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
/usr/local/lib/python3.8/dist-packages/dask/core.py:149: in get
result = _execute_task(task, cache)
/usr/local/lib/python3.8/dist-packages/dask/core.py:119: in _execute_task
return func(
(_execute_task(a, cache) for a in args))
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:87: in __call__
return read_parquet_part(
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:431: in read_parquet_part
dfs = [
/usr/local/lib/python3.8/dist-packages/dask/dataframe/io/parquet/core.py:432: in <listcomp>
func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
/usr/local/lib/python3.8/dist-packages/dask_cudf/io/parquet.py:216: in read_partition
cls._read_paths(
/usr/local/lib/python3.8/dist-packages/dask_cudf/io/parquet.py:92: in _read_paths
df = cudf.read_parquet(
/usr/local/lib/python3.8/dist-packages/nvtx/nvtx.py:101: in inner
result = func(*args, **kwargs)
/usr/local/lib/python3.8/dist-packages/cudf/io/parquet.py:379: in read_parquet
) = _process_dataset(
/usr/local/lib/python3.8/dist-packages/nvtx/nvtx.py:101: in inner
result = func(*args, **kwargs)
/usr/local/lib/python3.8/dist-packages/cudf/io/parquet.py:205: in _process_dataset
dataset = ds.dataset(
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:683: in dataset
return _filesystem_dataset(source, **kwargs)
/usr/local/lib/python3.8/dist-packages/pyarrow/dataset.py:435: in _filesystem_dataset
return factory.finish(schema)
pyarrow/_dataset.pyx:2473: in pyarrow._dataset.DatasetFactory.finish
???
pyarrow/error.pxi:143: in pyarrow.lib.pyarrow_internal_check_status
???


???
E pyarrow.lib.ArrowInvalid: Error creating dataset. Could not read schema from '/tmp/pytest-of-jenkins/pytest-12/test_dask_dataset_from_datafra7/part_0.parquet': Could not open Parquet input source '/tmp/pytest-of-jenkins/pytest-12/test_dask_dataset_from_datafra7/part_0.parquet': Parquet file size is 0 bytes. Is this a 'parquet' file?

pyarrow/error.pxi:99: ArrowInvalid
=============================== warnings summary ===============================
tests/unit/dag/test_base_operator.py: 4 warnings
tests/unit/io/test_io.py: 71 warnings
/usr/local/lib/python3.8/dist-packages/cudf/core/frame.py:384: UserWarning: The deep parameter is ignored and is only included for pandas compatibility.
warnings.warn(

tests/unit/io/test_io.py::test_validate_and_regenerate_dataset
/var/jenkins_home/workspace/merlin_core/core/merlin/io/parquet.py:551: DeprecationWarning: 'ParquetDataset.pieces' attribute is deprecated as of pyarrow 5.0.0 and will be removed in a future version. Specify 'use_legacy_dataset=False' while constructing the ParquetDataset, and then use the '.fragments' attribute instead.
paths = [p.path for p in pa_dataset.pieces]

tests/unit/utils/test_utils.py::test_nvt_distributed[True-True]
/usr/local/lib/python3.8/dist-packages/distributed/node.py:180: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 40139 instead
warnings.warn(

tests/unit/utils/test_utils.py::test_nvt_distributed[True-False]
/usr/local/lib/python3.8/dist-packages/distributed/node.py:180: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 40105 instead
warnings.warn(

tests/unit/utils/test_utils.py::test_nvt_distributed[False-True]
/usr/local/lib/python3.8/dist-packages/distributed/node.py:180: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 35611 instead
warnings.warn(

tests/unit/utils/test_utils.py::test_nvt_distributed[False-False]
/usr/local/lib/python3.8/dist-packages/distributed/node.py:180: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 34703 instead
warnings.warn(

tests/unit/utils/test_utils.py::test_nvt_distributed_force[True]
/usr/local/lib/python3.8/dist-packages/distributed/node.py:180: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 45457 instead
warnings.warn(

tests/unit/utils/test_utils.py::test_nvt_distributed_force[False]
/usr/local/lib/python3.8/dist-packages/distributed/node.py:180: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 43757 instead
warnings.warn(

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
=========================== short test summary info ============================
FAILED tests/unit/io/test_io.py::test_dask_dataset_from_dataframe[True-cudf]
FAILED tests/unit/io/test_io.py::test_dask_dataset_from_dataframe[True-dask_cudf]
FAILED tests/unit/io/test_io.py::test_dask_dataset_from_dataframe[True-pd] - ...
FAILED tests/unit/io/test_io.py::test_dask_dataset_from_dataframe[True-dd] - ...
============ 4 failed, 339 passed, 1 skipped, 82 warnings in 52.11s ============
Build step 'Execute shell' marked build as failure
Performing Post build task...
Match found for : : True
Logical operation result is TRUE
Running script : #!/bin/bash
cd /var/jenkins_home/
CUDA_VISIBLE_DEVICES=1 python test_res_push.py "https://api.github.com/repos/NVIDIA-Merlin/core/issues/$ghprbPullId/comments" "/var/jenkins_home/jobs/$JOB_NAME/builds/$BUILD_NUMBER/log"
[merlin_core] $ /bin/bash /tmp/jenkins4290429288741292030.sh
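
All four CI failures above share one symptom: `dask_cudf.read_parquet` hit a zero-byte `part_0.parquet`. A quick stdlib-only pre-flight check (our own sketch, not part of merlin or this CI job) that would flag such files before PyArrow raises `ArrowInvalid`:

```python
# Illustrative pre-flight check for the failure mode in the log above:
# a valid Parquet file is non-empty and starts *and* ends with b"PAR1"
# (the magic bytes defined by the Apache Parquet format).
import os

PARQUET_MAGIC = b"PAR1"

def looks_like_parquet(path):
    """True if `path` is non-empty and carries Parquet magic at both ends."""
    if os.path.getsize(path) < 2 * len(PARQUET_MAGIC):
        return False  # the zero-byte part_0.parquet case lands here
    with open(path, "rb") as f:
        head = f.read(4)
        f.seek(-4, os.SEEK_END)
        tail = f.read(4)
    return head == PARQUET_MAGIC and tail == PARQUET_MAGIC
```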

@github-actions

github-actions bot commented Aug 3, 2022

Documentation preview

https://nvidia-merlin.github.io/core/review/pr-112

@benfred (Member) left a comment

lgtm - thanks!

@gabrielspmoreira gabrielspmoreira merged commit ad142af into NVIDIA-Merlin:main Aug 3, 2022
