Add open_virtual_mfdataset #349
        datasets, closers = dask.compute(datasets, closers)
    elif parallel == "lithops":

        def generate_refs(path):
This is the equivalent of @thodson-usgs 's map_references function
    # wait for all the serverless workers to finish, and send their resulting virtual datasets back to the client
    completed_futures, _ = fn_exec.wait(futures, download_results=True)
    virtual_datasets = [future.get_result() for future in completed_futures]
IIUC this will cause every serverless worker to send a small virtual dataset back to the client process over the internet somehow
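A minimal sketch of that map-then-reduce-on-the-client pattern, using only the standard library. `ThreadPoolExecutor` stands in for the serverless workers, and `make_virtual_ref` and `combine_on_client` are hypothetical placeholders, not VirtualiZarr or Lithops functions.

```python
from concurrent.futures import ThreadPoolExecutor


def make_virtual_ref(path: str) -> dict:
    """Hypothetical stand-in for opening one file virtually.

    Returns a tiny metadata-only record, never any array data.
    """
    return {"path": path, "n_chunks": len(path)}


def combine_on_client(refs: list[dict]) -> dict:
    """Single reduction step, run in the client process."""
    return {
        "paths": [ref["path"] for ref in refs],
        "total_chunks": sum(ref["n_chunks"] for ref in refs),
    }


paths = ["a.nc", "bb.nc", "ccc.nc"]

# ThreadPoolExecutor is only a local stand-in for remote workers.
with ThreadPoolExecutor(max_workers=2) as executor:
    # Executor.map preserves input order, so results line up with paths.
    refs = list(executor.map(make_virtual_ref, paths))

combined = combine_on_client(refs)
print(combined)
```

Because each worker returns only a small record, the client can hold all of them and do the combine in one step; whether that still holds at very large file counts is the open question raised here.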
    from xarray.backends.api import _multi_file_closer
    from xarray.backends.common import _find_absolute_paths
    from xarray.core.combine import _infer_concat_order_from_positions, _nested_combine
I don't like importing these deep xarray internals like this (though _infer_concat_order_from_positions and _nested_combine haven't changed since I wrote them 6 years ago), but the only alternative would be to make a general virtualizarr backend engine for xarray (see #35).
    elif parallel == "lithops":
        import lithops
I believe all of this could also be useful upstream in xr.open_mfdataset
Yes, that should work fine. We may want to loosen/generalize blockwise slightly in Cubed to return an arbitrary object so it can be done with Cubed - but that can be done later.
Agreed - it will be interesting to see this for large datasets. (It's also similar to the approach I've taken for storing data in Icechunk where the changesets are returned to the client - again, small kB-sized UUIDs.)
* need latest version of xarray to import internals correctly * Fix metadata equality for nan fill value (#502) * add check that works for fill_values too * note about removing once merged upstream * type hint * regression test * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Remove accidental changes to pyproject.toml * Update pyproject.toml * ignore mypy --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> * Setup intersphinx mapping for docs (#503) * Setup intersphinx mapping for docs --------- Co-authored-by: Kyle Barron <kylebarron2@gmail.com> * Change default loadable_variables (and indexes) to match xarray's behaviour (#477) * draft refactor * sketch of simplified handling of loadable_variables * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * get at least some tests working * separate VirtualBackend api definition from common utilities * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * remove indexes={} everywhere in tests * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * stop passing through loadable_variables to where it isn't used * implement logic to load 1D dimension coords by default * remove more instances of indexes={} * remove more indexes={} * refactor logic for choosing loadable_variables * fix more tets * xfail Aimee's test that I don't understand * xfail test that explicitly specifies no indexes * made a bunch more stuff pass * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fix netcdf3 reader * fix bad import in FITS reader * fix import in tiff reader * fix import in icechunk test * release note * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * update docstring * 
[pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fix fits reader * xfail on empty dict for indexes * linting * actually test new expected behaviour * fix logic for setting loadable_variables * update docs page to reflect new behaviour * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fix expected behaviour in another tests * additional assert * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * use encode_dataset_coordinates in kerchunk writer * Encode zarr vars * fix some mypy errors * move drop_variables implmentation to the end of every reader * override loadable_variables and raise warning * fix failing test by not creating loadable variables that would get inlined by default * improve error message * remove some more occurrences of indexes={} * skip slow test * slay mypy errors * docs typos * should fix dmrpp test * Delete commented-out code * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * remove unecessary test skip --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Max Jones <14077947+maxrjones@users.noreply.github.com> * Update pyproject.toml deps (#504) * re-add icechunk to upstream tests * add pytest-asyncio to test envs * passing serial open_virtual_mfdataset test * passes with lithops but only for the HDF backend * add test for dask * refactored serial and lithops codepaths to use an executor pattern * xfail lithops * consolidate tests by parametrizing over parallel kwarg * re-enable lithops test * remove unneeded get_executor function * add test for using dask distributed to parallelize --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Max Jones <14077947+maxrjones@users.noreply.github.com> Co-authored-by: Kyle 
Barron <kylebarron2@gmail.com>
* need latest version of xarray to import internals correctly * passing serial open_virtual_mfdataset test * passes with lithops but only for the HDF backend * add test for dask * refactored serial and lithops codepaths to use an executor pattern * xfail lithops * consolidate tests by parametrizing over parallel kwarg * re-enable lithops test * remove unneeded get_executor function * add test for using dask distributed to parallelize * Add ManifestStore for loading data from ManifestArrays (#490) * Draft ManifestStore implementation --------- Co-authored-by: Tom Nicholas <tom@earthmover.io> Co-authored-by: Kyle Barron <kylebarron2@gmail.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> * make it work for dask delayed * correct docstring --------- Co-authored-by: Max Jones <14077947+maxrjones@users.noreply.github.com> Co-authored-by: Kyle Barron <kylebarron2@gmail.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
* need latest version of xarray to import internals correctly * passing serial open_virtual_mfdataset test * passes with lithops but only for the HDF backend * add test for dask * refactored serial and lithops codepaths to use an executor pattern * xfail lithops * consolidate tests by parametrizing over parallel kwarg * re-enable lithops test * remove unneeded get_executor function * add test for using dask distributed to parallelize * make it work for dask delayed * correct docstring * added compliant executor for lithops * add links to lithops issues * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
    pytest.mark.xfail(
        reason="Lithops bug - see https://github.com/lithops-cloud/lithops/issues/1428"
I was able to dodge this by not using functools.partial, but I'm a bit worried that my newer approach of using a closure won't work properly in a remote execution context.
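For reference, a small sketch of the two ways of binding keyword arguments being compared. `open_one` is a hypothetical per-file function, not VirtualiZarr's real signature. The last part only checks the standard-library `pickle`; how a particular remote backend serialises closures is an assumption that would need testing there.

```python
import functools
import pickle


def open_one(path, *, loadable_variables=None):
    """Hypothetical per-file function used only for illustration."""
    return (path, tuple(loadable_variables or ()))


# Option 1: bind keyword arguments with functools.partial.
open_partial = functools.partial(open_one, loadable_variables=["time"])


# Option 2: bind the same keyword arguments with a closure.
def make_opener(**kwargs):
    def opener(path):
        return open_one(path, **kwargs)

    return opener


open_closure = make_opener(loadable_variables=["time"])

# Called locally, the two are interchangeable.
same = open_partial("a.nc") == open_closure("a.nc")

# The standard-library pickle cannot serialise a nested function,
# which is one reason closures deserve a test in a remote context.
try:
    pickle.dumps(open_closure)
    closure_picklable = True
except (pickle.PicklingError, AttributeError):
    closure_picklable = False

print(same, closure_picklable)
```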
    class LithopsEagerFunctionExecutor(Executor):
        """
        Lithops-based function executor which follows the concurrent.futures.Executor API.

        Only required because lithops doesn't follow the concurrent.futures.Executor API, see https://github.com/lithops-cloud/lithops/issues/1427.
        """
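To illustrate what following the `concurrent.futures.Executor` API involves, here is a minimal sketch of an eager executor that runs each task in the calling thread. `EagerSerialExecutor` is a hypothetical name and this is not the Lithops wrapper from this PR; it only shows that implementing `submit` is enough to inherit `map` and context-manager support from the base class.

```python
from concurrent.futures import Executor, Future


class EagerSerialExecutor(Executor):
    """Hypothetical executor: runs each task immediately, returns a Future."""

    def submit(self, fn, /, *args, **kwargs):
        future = Future()
        try:
            # Run the task right away and store its outcome on the future.
            future.set_result(fn(*args, **kwargs))
        except Exception as exc:
            future.set_exception(exc)
        return future


def double(x):
    return 2 * x


# Executor.map and the with-statement come from the base class.
with EagerSerialExecutor() as executor:
    results = list(executor.map(double, [1, 2, 3]))

print(results)
```

Code written against this interface can swap in `ThreadPoolExecutor`, a Dask-backed executor, or a wrapper like the one above without changing the calling code.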
* need latest version of xarray to import internals correctly * passing serial open_virtual_mfdataset test * passes with lithops but only for the HDF backend * add test for dask * refactored serial and lithops codepaths to use an executor pattern * xfail lithops * consolidate tests by parametrizing over parallel kwarg * re-enable lithops test * remove unneeded get_executor function * add test for using dask distributed to parallelize * make it work for dask delayed * correct docstring * added compliant executor for lithops * add links to lithops issues * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * specify dask and lithops executors with a string again * fix easy typing stuff * fix typing errors by aligning executor signatures * remove open_virtual_mfdataset from public API for now * release note * refactor construction of expected result * implement preprocess arg, and dodge lithops bug * update comment --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Here I have copied the code from xr.open_mfdataset, changed it to use open_virtual_dataset, and added an option to parallelize with lithops as an alternative to using dask.delayed.

I haven't even tried to run this yet, but I think this is the right approach @tomwhite? I realised we don't need cubed's blockwise because xarray.open_mfdataset has internal logic to turn the N-dimensional concat into a 1D map already, so lithops.map should be fine?

Also I think based on our conversation we should be able to use lithops.map instead of lithops.map_reduce like @thodson-usgs did in #203 because the tiny size of the virtual datasets being returned to the client means that we should be able to get away with a single reduction step on the client even at large scale? (see also #104 for justification that we only need to send back kB-sized objects).

* open_virtual_mfdataset as suggested in open_virtual_mfdataset #345, but also sketches out how we might close both Trying to run open_virtual_dataset in parallel #95 and Serverless parallelization of reference generation #123
* docs/releases.rst
* api.rst
* New functionality has documentation
* Make it a context manager
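The point in the description about turning an N-dimensional concat into a 1D map can be illustrated in plain Python. This is a sketch of the idea only, not xarray's implementation: `flatten_with_positions` is a hypothetical helper, and `str.upper` stands in for opening a file.

```python
def flatten_with_positions(nested, position=()):
    """Yield (position, leaf) pairs from an arbitrarily nested list."""
    if isinstance(nested, list):
        for i, item in enumerate(nested):
            yield from flatten_with_positions(item, position + (i,))
    else:
        yield position, nested


# A 2D grid of files, e.g. concatenated along time and then along x.
nested_paths = [["t0_x0.nc", "t0_x1.nc"], ["t1_x0.nc", "t1_x1.nc"]]

positions, flat_paths = zip(*flatten_with_positions(nested_paths))

# The per-file work is now a plain 1D map, which any executor can run.
opened = [path.upper() for path in flat_paths]

# Keeping the positions lets the combine step rebuild the grid afterwards.
by_position = dict(zip(positions, opened))
print(by_position)
```

Since the map itself is one-dimensional, only the final combine step needs to know about the original nesting.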