Skip to content

Use lithops to parallelize open_mfdataset#9932

Draft
TomNicholas wants to merge 17 commits into
pydata:mainfrom
TomNicholas:lithops_open_mfdataset
Draft

Use lithops to parallelize open_mfdataset#9932
TomNicholas wants to merge 17 commits into
pydata:mainfrom
TomNicholas:lithops_open_mfdataset

Conversation

@TomNicholas

@TomNicholas TomNicholas commented Jan 8, 2025

Copy link
Copy Markdown
Member

Experiment generalizing the parallel kwarg to open_mfdataset to accept 'lithops', instead of a assuming that dask is the only option. Lithops can perform each open_dataset on a different serverless worker, though the test case here uses lithops' default configuration to just run on one machine.

As cubed can run the computations on its lithops executor, this could allow an entirely serverless user workflow like:

ds = open_mfdataset(
    's3://bucket/files*.nc',
    combine='by_coords',
    parallel='lithops,
    chunked_array_type='cubed',
)

Related to #7810, although the lithops API uses futures and the dask API uses delayed, which makes the case-handling logic in this PR a little convoluted.

Still has the same downside described in #8523, in that each lazy dataset will be sent back to the client (over the network).

Inspired by messing around with the same idea in virtualizarr zarr-developers/VirtualiZarr#349

  • Closes #xxxx
  • Tests added
  • User visible changes (including notable bug fixes) are documented in whats-new.rst
  • New functions/methods are listed in api.rst

cc @dcherian @keewis @tomwhite

@TomNicholas TomNicholas added topic-combine combine/concat/merge dependencies Pull requests that update a dependency file labels Jan 8, 2025
@TomNicholas

Copy link
Copy Markdown
Member Author

Hmm the test I added passes locally but it looks like I need to install lithops via pip in the CI?

@dcherian

dcherian commented Jan 8, 2025

Copy link
Copy Markdown
Contributor

I think we should strongly consider just taking an Executor and calling executor.submit or executor.map. The use of Delayed here to create a "lazy" future is pointless, everything is immediately "computed"/executed two lines later anyway.

@TomNicholas

Copy link
Copy Markdown
Member Author

That makes total sense, but what's the dask equivalent of the Executor?

@dcherian

dcherian commented Jan 8, 2025

Copy link
Copy Markdown
Contributor

distributed.Client has map and submit. For the rest we might need to write a wrapper class that implements .submit as a .get on whichever scheduler is active.

cc @phofl

Comment thread xarray/backends/api.py
Comment on lines +1648 to +1651
def generate_lazy_ds(path):
# allows passing the open_dataset function to lithops without evaluating it
ds = open_(path, **kwargs)
return ds

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks potentially like a functools.partial with **kwargs?

Comment thread xarray/backends/api.py
Comment on lines +1653 to +1658
futures = fn_exec.map(generate_lazy_ds, paths1d)

# wait for all the serverless workers to finish, and send their resulting lazy datasets back to the client
# TODO do we need download_results?
completed_futures, _ = fn_exec.wait(futures, download_results=True)
datasets = completed_futures.get_result()

@keewis keewis Jan 8, 2025

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can find an abstraction that works for both this (which is kinda like concurrent.futures' pool executors) and dask.

For example, maybe we can use functools.partial to mimic dask.delayed. The result would be a bunch of function objects without parameters, which would then be evaluated in fn_exec.map using operator.call.

(But I guess if we refactor the dask code as well we don't really need that idea)

@phofl

phofl commented Jan 8, 2025

Copy link
Copy Markdown
Contributor

distributed.Client has map and submit. For the rest we might need to write a wrapper class that implements .submit as a .get on whichever scheduler is active.

Yep that sounds sensible

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

dependencies Pull requests that update a dependency file topic-combine combine/concat/merge

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants