From fc9c4efeccaadff591c847fffccae265b9c59723 Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Sun, 23 Jun 2019 20:55:58 +0300 Subject: [PATCH] Fix rolling window operations with dask when bottleneck is installed xref GH2940, GH2942 Previously, these operations could silently return incorrect results (dask 2.0), or use unbounded amounts of memory (older versions of dask). This requires a fairly large refactoring, because deciding when to use bottleneck now needs to be done at runtime rather than at import-time. These methods are now constructed as methods rather being injected aftewards into the class, which should also be a much more standard and understable design. --- doc/whats-new.rst | 3 + xarray/core/ops.py | 75 ----------- xarray/core/rolling.py | 227 ++++++++++++++++++--------------- xarray/tests/test_dataarray.py | 1 - 4 files changed, 130 insertions(+), 176 deletions(-) diff --git a/doc/whats-new.rst b/doc/whats-new.rst index 373cb8d13dc..61cc0a8d55e 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -70,6 +70,9 @@ Enhancements Bug fixes ~~~~~~~~~ +- Rolling operations on xarray objects containing dask arrays could silently + compute the incorrect result or use large amounts of memory (:issue:`2940`). + By `Stephan Hoyer `_. - NetCDF4 output: variables with unlimited dimensions must be chunked (not contiguous) on output. (:issue:`1849`) By `James McCreight `_. diff --git a/xarray/core/ops.py b/xarray/core/ops.py index 97e240c5126..3759a7c5634 100644 --- a/xarray/core/ops.py +++ b/xarray/core/ops.py @@ -38,11 +38,6 @@ NAN_REDUCE_METHODS = ['argmax', 'argmin', 'max', 'min', 'mean', 'prod', 'sum', 'std', 'var', 'median'] NAN_CUM_METHODS = ['cumsum', 'cumprod'] -BOTTLENECK_ROLLING_METHODS = {'move_sum': 'sum', 'move_mean': 'mean', - 'move_std': 'std', 'move_min': 'min', - 'move_max': 'max', 'move_var': 'var', - 'move_argmin': 'argmin', 'move_argmax': 'argmax', - 'move_median': 'median'} # TODO: wrap take, dot, sort @@ -103,20 +98,6 @@ If fewer than min_count non-NA values are present the result will be NA. New in version 0.10.8: Added with the default being None.""" -_ROLLING_REDUCE_DOCSTRING_TEMPLATE = """\ -Reduce this {da_or_ds}'s data windows by applying `{name}` along its dimension. - -Parameters ----------- -**kwargs : dict - Additional keyword arguments passed on to `{name}`. - -Returns -------- -reduced : {da_or_ds} - New {da_or_ds} object with `{name}` applied along its rolling dimnension. -""" - _COARSEN_REDUCE_DOCSTRING_TEMPLATE = """\ Coarsen this object by applying `{name}` along its dimensions. @@ -236,13 +217,6 @@ def func(self, *args, **kwargs): return func -def rolling_count(rolling): - - rolling_count = rolling._counts() - enough_periods = rolling_count >= rolling._min_periods - return rolling_count.where(enough_periods) - - def inject_reduce_methods(cls): methods = ([(name, getattr(duck_array_ops, 'array_%s' % name), False) for name in REDUCE_METHODS] + @@ -340,55 +314,6 @@ def inject_all_ops_and_reduce_methods(cls, priority=50, array_only=True): inject_cum_methods(cls) -def inject_bottleneck_rolling_methods(cls): - # standard numpy reduce methods - methods = [(name, getattr(duck_array_ops, name)) - for name in NAN_REDUCE_METHODS] - for name, f in methods: - func = cls._reduce_method(f) - func.__name__ = name - func.__doc__ = _ROLLING_REDUCE_DOCSTRING_TEMPLATE.format( - name=func.__name__, da_or_ds='DataArray') - setattr(cls, name, func) - - # bottleneck doesn't offer rolling_count, so we construct it ourselves - func = rolling_count - func.__name__ = 'count' - func.__doc__ = _ROLLING_REDUCE_DOCSTRING_TEMPLATE.format( - name=func.__name__, da_or_ds='DataArray') - setattr(cls, 'count', func) - - # bottleneck rolling methods - if not has_bottleneck: - return - - for bn_name, method_name in BOTTLENECK_ROLLING_METHODS.items(): - f = getattr(bn, bn_name) - func = cls._bottleneck_reduce(f) - func.__name__ = method_name - func.__doc__ = _ROLLING_REDUCE_DOCSTRING_TEMPLATE.format( - name=func.__name__, da_or_ds='DataArray') - setattr(cls, method_name, func) - - -def inject_datasetrolling_methods(cls): - # standard numpy reduce methods - methods = [(name, getattr(duck_array_ops, name)) - for name in NAN_REDUCE_METHODS] - for name, f in methods: - func = cls._reduce_method(f) - func.__name__ = name - func.__doc__ = _ROLLING_REDUCE_DOCSTRING_TEMPLATE.format( - name=func.__name__, da_or_ds='Dataset') - setattr(cls, name, func) - # bottleneck doesn't offer rolling_count, so we construct it ourselves - func = rolling_count - func.__name__ = 'count' - func.__doc__ = _ROLLING_REDUCE_DOCSTRING_TEMPLATE.format( - name=func.__name__, da_or_ds='Dataset') - setattr(cls, 'count', func) - - def inject_coarsen_methods(cls): # standard numpy reduce methods methods = [(name, getattr(duck_array_ops, name)) diff --git a/xarray/core/rolling.py b/xarray/core/rolling.py index 4773512cdc4..fdd51304520 100644 --- a/xarray/core/rolling.py +++ b/xarray/core/rolling.py @@ -1,3 +1,4 @@ +import functools import warnings from collections import OrderedDict from distutils.version import LooseVersion @@ -6,11 +7,30 @@ from . import dtypes, duck_array_ops, utils from .dask_array_ops import dask_rolling_wrapper -from .ops import ( - bn, has_bottleneck, inject_bottleneck_rolling_methods, - inject_coarsen_methods, inject_datasetrolling_methods) +from .ops import inject_coarsen_methods from .pycompat import dask_array_type +try: + import bottleneck +except ImportError: + # use numpy methods instead + bottleneck = None + + +_ROLLING_REDUCE_DOCSTRING_TEMPLATE = """\ +Reduce this object's data windows by applying `{name}` along its dimension. + +Parameters +---------- +**kwargs : dict + Additional keyword arguments passed on to `{name}`. + +Returns +------- +reduced : same type as caller + New object with `{name}` applied along its rolling dimnension. +""" + class Rolling: """A object that implements the moving window pattern. @@ -51,8 +71,8 @@ def __init__(self, obj, windows, min_periods=None, center=False): rolling : type of input argument """ - if (has_bottleneck and - (LooseVersion(bn.__version__) < LooseVersion('1.0'))): + if (bottleneck is not None and + (LooseVersion(bottleneck.__version__) < LooseVersion('1.0'))): warnings.warn('xarray requires bottleneck version of 1.0 or ' 'greater for rolling operations. Rolling ' 'aggregation methods will use numpy instead' @@ -94,6 +114,34 @@ def __repr__(self): def __len__(self): return self.obj.sizes[self.dim] + def _reduce_method(name): + array_agg_func = getattr(duck_array_ops, name) + bottleneck_move_func = getattr(bottleneck, 'move_' + name, None) + + def method(self, **kwargs): + return self._numpy_or_bottleneck_reduce( + array_agg_func, bottleneck_move_func, **kwargs) + method.__name__ = name + method.__doc__ = _ROLLING_REDUCE_DOCSTRING_TEMPLATE.format(name=name) + return method + + argmax = _reduce_method('argmax') + argmin = _reduce_method('argmin') + max = _reduce_method('max') + min = _reduce_method('min') + mean = _reduce_method('mean') + prod = _reduce_method('prod') + sum = _reduce_method('sum') + std = _reduce_method('std') + var = _reduce_method('var') + median = _reduce_method('median') + + def count(self): + rolling_count = self._counts() + enough_periods = rolling_count >= self._min_periods + return rolling_count.where(enough_periods) + count.__doc__ = _ROLLING_REDUCE_DOCSTRING_TEMPLATE.format(name='count') + class DataArrayRolling(Rolling): def __init__(self, obj, windows, min_periods=None, center=False): @@ -257,71 +305,65 @@ def _counts(self): .sum(dim=rolling_dim, skipna=False)) return counts - @classmethod - def _reduce_method(cls, func): - """ - Methods to return a wrapped function for any function `func` for - numpy methods. - """ - - def wrapped_func(self, **kwargs): - return self.reduce(func, **kwargs) - return wrapped_func + def _bottleneck_reduce(self, func, **kwargs): + from .dataarray import DataArray - @classmethod - def _bottleneck_reduce(cls, func): - """ - Methods to return a wrapped function for any function `func` for - bottoleneck method, except for `median`. - """ + # bottleneck doesn't allow min_count to be 0, although it should + # work the same as if min_count = 1 + if self.min_periods is not None and self.min_periods == 0: + min_count = 1 + else: + min_count = self.min_periods - def wrapped_func(self, **kwargs): - from .dataarray import DataArray + axis = self.obj.get_axis_num(self.dim) - # bottleneck doesn't allow min_count to be 0, although it should - # work the same as if min_count = 1 - if self.min_periods is not None and self.min_periods == 0: - min_count = 1 - else: - min_count = self.min_periods - - axis = self.obj.get_axis_num(self.dim) - - padded = self.obj.variable - if self.center: - if (LooseVersion(np.__version__) < LooseVersion('1.13') and - self.obj.dtype.kind == 'b'): - # with numpy < 1.13 bottleneck cannot handle np.nan-Boolean - # mixed array correctly. We cast boolean array to float. - padded = padded.astype(float) - - if isinstance(padded.data, dask_array_type): - # Workaround to make the padded chunk size is larger than - # self.window-1 - shift = - (self.window + 1) // 2 - offset = (self.window - 1) // 2 - valid = (slice(None), ) * axis + ( - slice(offset, offset + self.obj.shape[axis]), ) - else: - shift = (-self.window // 2) + 1 - valid = (slice(None), ) * axis + (slice(-shift, None), ) - padded = padded.pad_with_fill_value({self.dim: (0, -shift)}) + padded = self.obj.variable + if self.center: + if (LooseVersion(np.__version__) < LooseVersion('1.13') and + self.obj.dtype.kind == 'b'): + # with numpy < 1.13 bottleneck cannot handle np.nan-Boolean + # mixed array correctly. We cast boolean array to float. + padded = padded.astype(float) if isinstance(padded.data, dask_array_type): - values = dask_rolling_wrapper(func, padded, - window=self.window, - min_count=min_count, - axis=axis) + # Workaround to make the padded chunk size is larger than + # self.window-1 + shift = - (self.window + 1) // 2 + offset = (self.window - 1) // 2 + valid = (slice(None), ) * axis + ( + slice(offset, offset + self.obj.shape[axis]), ) else: - values = func(padded.data, window=self.window, - min_count=min_count, axis=axis) - - if self.center: - values = values[valid] - result = DataArray(values, self.obj.coords) - - return result - return wrapped_func + shift = (-self.window // 2) + 1 + valid = (slice(None), ) * axis + (slice(-shift, None), ) + padded = padded.pad_with_fill_value({self.dim: (0, -shift)}) + + if isinstance(padded.data, dask_array_type): + raise AssertionError('should not be reachable') + values = dask_rolling_wrapper(func, padded.data, + window=self.window, + min_count=min_count, + axis=axis) + else: + values = func(padded.data, window=self.window, + min_count=min_count, axis=axis) + + if self.center: + values = values[valid] + result = DataArray(values, self.obj.coords) + + return result + + def _numpy_or_bottleneck_reduce( + self, array_agg_func, bottleneck_move_func, **kwargs + ): + if (bottleneck_move_func is not None and + not isinstance(self.obj.data, dask_array_type)): + # TODO: renable bottleneck with dask after the issues + # underlying https://github.com/pydata/xarray/issues/2940 are + # fixed. + return self._bottleneck_reduce(bottleneck_move_func, **kwargs) + else: + return self.reduce(array_agg_func, **kwargs) class DatasetRolling(Rolling): @@ -370,6 +412,16 @@ def __init__(self, obj, windows, min_periods=None, center=False): self.rollings[key] = DataArrayRolling( da, windows, min_periods, center) + def _dataset_implementation(self, func, **kwargs): + from .dataset import Dataset + reduced = OrderedDict() + for key, da in self.obj.data_vars.items(): + if self.dim in da.dims: + reduced[key] = func(self.rollings[key], **kwargs) + else: + reduced[key] = self.obj[key] + return Dataset(reduced, coords=self.obj.coords) + def reduce(self, func, **kwargs): """Reduce the items in this group by applying `func` along some dimension(s). @@ -388,43 +440,20 @@ def reduce(self, func, **kwargs): reduced : DataArray Array with summarized data. """ - from .dataset import Dataset - reduced = OrderedDict() - for key, da in self.obj.data_vars.items(): - if self.dim in da.dims: - reduced[key] = self.rollings[key].reduce(func, **kwargs) - else: - reduced[key] = self.obj[key] - return Dataset(reduced, coords=self.obj.coords) + return self._dataset_implementation( + functools.partial(DataArrayRolling.reduce, func=func), **kwargs) def _counts(self): - from .dataset import Dataset - reduced = OrderedDict() - for key, da in self.obj.data_vars.items(): - if self.dim in da.dims: - reduced[key] = self.rollings[key]._counts() - else: - reduced[key] = self.obj[key] - return Dataset(reduced, coords=self.obj.coords) + return self._dataset_implementation(DataArrayRolling._counts) - @classmethod - def _reduce_method(cls, func): - """ - Return a wrapped function for injecting numpy and bottoleneck methods. - see ops.inject_datasetrolling_methods - """ - - def wrapped_func(self, **kwargs): - from .dataset import Dataset - reduced = OrderedDict() - for key, da in self.obj.data_vars.items(): - if self.dim in da.dims: - reduced[key] = getattr(self.rollings[key], - func.__name__)(**kwargs) - else: - reduced[key] = self.obj[key] - return Dataset(reduced, coords=self.obj.coords) - return wrapped_func + def _numpy_or_bottleneck_reduce( + self, array_agg_func, bottleneck_move_func, **kwargs + ): + return self._dataset_implementation( + functools.partial(DataArrayRolling._numpy_or_bottleneck_reduce, + array_agg_func=array_agg_func, + bottleneck_move_func=bottleneck_move_func), + **kwargs) def construct(self, window_dim, stride=1, fill_value=dtypes.NA): """ @@ -572,7 +601,5 @@ def wrapped_func(self, **kwargs): return wrapped_func -inject_bottleneck_rolling_methods(DataArrayRolling) -inject_datasetrolling_methods(DatasetRolling) inject_coarsen_methods(DataArrayCoarsen) inject_coarsen_methods(DatasetCoarsen) diff --git a/xarray/tests/test_dataarray.py b/xarray/tests/test_dataarray.py index 47222194151..3dfe12bd310 100644 --- a/xarray/tests/test_dataarray.py +++ b/xarray/tests/test_dataarray.py @@ -3747,7 +3747,6 @@ def test_rolling_wrapped_bottleneck(da, name, center, min_periods): @pytest.mark.parametrize('center', (True, False, None)) @pytest.mark.parametrize('min_periods', (1, None)) @pytest.mark.parametrize('window', (7, 8)) -@pytest.mark.xfail(reason='https://github.com/pydata/xarray/issues/2940') def test_rolling_wrapped_dask(da_dask, name, center, min_periods, window): pytest.importorskip('dask.array') # dask version