From fc8a51f410b541aba1d6e3d53ec957a05b307fd3 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Wed, 12 Dec 2018 17:50:04 -0500 Subject: [PATCH 1/4] Support HighLevelGraphs Fixes #4291 --- xarray/core/dataarray.py | 3 +++ xarray/core/dataset.py | 13 +++++++++++-- xarray/core/variable.py | 3 +++ 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/xarray/core/dataarray.py b/xarray/core/dataarray.py index 17af3cf2cd1..938b05f963b 100644 --- a/xarray/core/dataarray.py +++ b/xarray/core/dataarray.py @@ -587,6 +587,9 @@ def __dask_graph__(self): def __dask_keys__(self): return self._to_temp_dataset().__dask_keys__() + def __dask_layers__(self): + return self._to_temp_dataset().__dask_layers__() + @property def __dask_optimize__(self): return self._to_temp_dataset().__dask_optimize__ diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py index 888d2566f21..47befe6461d 100644 --- a/xarray/core/dataset.py +++ b/xarray/core/dataset.py @@ -509,14 +509,23 @@ def __dask_graph__(self): if not graphs: return None else: - from dask import sharedict - return sharedict.merge(*graphs.values()) + try: + from dask.highlevelgraph import HighLevelGraph + return HighLevelGraph.merge(*graphs.values()) + except ImportError: + from dask import sharedict + return sharedict.merge(*graphs.values()) + def __dask_keys__(self): import dask return [v.__dask_keys__() for v in self.variables.values() if dask.is_dask_collection(v)] + def __dask_layers__(self): + return sum([v.__dask_layers__() for v in self.variables.values() if + dask.is_dask_collection(v)], []) + @property def __dask_optimize__(self): import dask.array as da diff --git a/xarray/core/variable.py b/xarray/core/variable.py index 7a921805258..469e8741a29 100644 --- a/xarray/core/variable.py +++ b/xarray/core/variable.py @@ -352,6 +352,9 @@ def __dask_graph__(self): def __dask_keys__(self): return self._data.__dask_keys__() + def __dask_layers__(self): + return self._data.__dask_layers__() + @property def 
__dask_optimize__(self): return self._data.__dask_optimize__ From 9f6e9335a61bd51bb0ab5dd42fdeda5128095757 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Thu, 13 Dec 2018 09:41:24 -0500 Subject: [PATCH 2/4] test __dask_layers__ --- doc/whats-new.rst | 7 ++++--- xarray/core/dataset.py | 3 ++- xarray/tests/test_dask.py | 11 +++++++++++ 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/doc/whats-new.rst b/doc/whats-new.rst index 87a0e425693..9273292255b 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -43,7 +43,8 @@ Enhancements By `Stephan Hoyer `_ - Like :py:class:`pandas.DatetimeIndex`, :py:class:`CFTimeIndex` now supports "dayofyear" and "dayofweek" accessors (:issue:`2597`). By `Spencer Clark - `_. + `_. +- Support Dask ``HighLevelGraphs`` by `Matthew Rocklin `_. Bug fixes @@ -159,9 +160,9 @@ Enhancements to returning (and is now deprecated). This was changed in order to facilitate using tutorial datasets with dask. By `Joe Hamman `_. -- ``DataArray`` can now use ``xr.set_option(keep_attrs=True)`` and retain attributes in binary operations, +- ``DataArray`` can now use ``xr.set_option(keep_attrs=True)`` and retain attributes in binary operations, such as (``+, -, * ,/``). Default behaviour is unchanged (*Attributes will be dismissed*). 
By `Michael Blaschek `_ - + Bug fixes ~~~~~~~~~ diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py index 47befe6461d..8f28798a8a9 100644 --- a/xarray/core/dataset.py +++ b/xarray/core/dataset.py @@ -523,8 +523,9 @@ def __dask_keys__(self): if dask.is_dask_collection(v)] def __dask_layers__(self): + import dask return sum([v.__dask_layers__() for v in self.variables.values() if - dask.is_dask_collection(v)], []) + dask.is_dask_collection(v)], ()) @property def __dask_optimize__(self): diff --git a/xarray/tests/test_dask.py b/xarray/tests/test_dask.py index 62ce7d074fa..a6ffd7a380f 100644 --- a/xarray/tests/test_dask.py +++ b/xarray/tests/test_dask.py @@ -843,3 +843,14 @@ def test_basic_compute(): ds.compute() ds.foo.compute() ds.foo.variable.compute() + + +def test_dask_layers_and_dependencies(): + ds = Dataset({'foo': ('x', range(5)), + 'bar': ('x', range(5))}).chunk() + + x = dask.delayed(ds) + assert set(x.__dask_graph__().dependencies).issuperset( + ds.__dask_graph__().dependencies) + assert set(x.foo.__dask_graph__().dependencies).issuperset( + ds.__dask_graph__().dependencies) From 6b5d5517f52fe3dd52fdbfef9b8d00326479382c Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Thu, 13 Dec 2018 08:15:57 -0800 Subject: [PATCH 3/4] Skip dependencies test with old dask --- xarray/tests/test_dask.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/xarray/tests/test_dask.py b/xarray/tests/test_dask.py index a6ffd7a380f..c77384c5733 100644 --- a/xarray/tests/test_dask.py +++ b/xarray/tests/test_dask.py @@ -845,6 +845,8 @@ def test_basic_compute(): ds.foo.variable.compute() +@pytest.mark.skipif(LooseVersion(dask.__version__) < LooseVersion('0.20.0'), + reason='needs newer dask') def test_dask_layers_and_dependencies(): ds = Dataset({'foo': ('x', range(5)), 'bar': ('x', range(5))}).chunk() From bfa69b59d6269046f4505ac73fec6506a7c6956c Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Thu, 13 Dec 2018 08:16:20 -0800 Subject: [PATCH 4/4] Reenable dask-dev test 
on Travis-CI --- .travis.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 4ebd4f392a9..defb37ec8aa 100644 --- a/.travis.yml +++ b/.travis.yml @@ -44,7 +44,6 @@ matrix: - libhdf5-serial-dev - netcdf-bin - libnetcdf-dev - - env: CONDA_ENV=py36-dask-dev - env: CONDA_ENV=py36-pandas-dev - env: CONDA_ENV=py36-bottleneck-dev - env: CONDA_ENV=py36-condaforge-rc