diff --git a/doc/whats-new.rst b/doc/whats-new.rst
index cd1b532ed2d..ccc323c0c02 100644
--- a/doc/whats-new.rst
+++ b/doc/whats-new.rst
@@ -97,6 +97,9 @@ Enhancements
 Bug fixes
 ~~~~~~~~~
 
+- Rolling operations on xarray objects containing dask arrays could silently
+  compute the incorrect result or use large amounts of memory (:issue:`2940`).
+  By `Stephan Hoyer <https://github.com/shoyer>`_.
 - Don't set encoding attributes on bounds variables when writing to netCDF.
   (:issue:`2921`)
   By `Deepak Cherian <https://github.com/dcherian>`_.
diff --git a/xarray/core/ops.py b/xarray/core/ops.py
index 97e240c5126..3759a7c5634 100644
--- a/xarray/core/ops.py
+++ b/xarray/core/ops.py
@@ -38,11 +38,6 @@
 NAN_REDUCE_METHODS = ['argmax', 'argmin', 'max', 'min', 'mean', 'prod', 'sum',
                       'std', 'var', 'median']
 NAN_CUM_METHODS = ['cumsum', 'cumprod']
-BOTTLENECK_ROLLING_METHODS = {'move_sum': 'sum', 'move_mean': 'mean',
-                              'move_std': 'std', 'move_min': 'min',
-                              'move_max': 'max', 'move_var': 'var',
-                              'move_argmin': 'argmin', 'move_argmax': 'argmax',
-                              'move_median': 'median'}
 # TODO: wrap take, dot, sort
@@ -103,20 +98,6 @@
     If fewer than min_count non-NA values are present the result will be
     NA. New in version 0.10.8: Added with the default being None."""
 
-_ROLLING_REDUCE_DOCSTRING_TEMPLATE = """\
-Reduce this {da_or_ds}'s data windows by applying `{name}` along its dimension.
-
-Parameters
-----------
-**kwargs : dict
-    Additional keyword arguments passed on to `{name}`.
-
-Returns
--------
-reduced : {da_or_ds}
-    New {da_or_ds} object with `{name}` applied along its rolling dimnension.
-"""
-
 _COARSEN_REDUCE_DOCSTRING_TEMPLATE = """\
 Coarsen this object by applying `{name}` along its dimensions.
@@ -236,13 +217,6 @@ def func(self, *args, **kwargs):
     return func
 
 
-def rolling_count(rolling):
-
-    rolling_count = rolling._counts()
-    enough_periods = rolling_count >= rolling._min_periods
-    return rolling_count.where(enough_periods)
-
-
 def inject_reduce_methods(cls):
     methods = ([(name, getattr(duck_array_ops, 'array_%s' % name), False)
                 for name in REDUCE_METHODS] +
@@ -340,55 +314,6 @@ def inject_all_ops_and_reduce_methods(cls, priority=50, array_only=True):
     inject_cum_methods(cls)
 
 
-def inject_bottleneck_rolling_methods(cls):
-    # standard numpy reduce methods
-    methods = [(name, getattr(duck_array_ops, name))
-               for name in NAN_REDUCE_METHODS]
-    for name, f in methods:
-        func = cls._reduce_method(f)
-        func.__name__ = name
-        func.__doc__ = _ROLLING_REDUCE_DOCSTRING_TEMPLATE.format(
-            name=func.__name__, da_or_ds='DataArray')
-        setattr(cls, name, func)
-
-    # bottleneck doesn't offer rolling_count, so we construct it ourselves
-    func = rolling_count
-    func.__name__ = 'count'
-    func.__doc__ = _ROLLING_REDUCE_DOCSTRING_TEMPLATE.format(
-        name=func.__name__, da_or_ds='DataArray')
-    setattr(cls, 'count', func)
-
-    # bottleneck rolling methods
-    if not has_bottleneck:
-        return
-
-    for bn_name, method_name in BOTTLENECK_ROLLING_METHODS.items():
-        f = getattr(bn, bn_name)
-        func = cls._bottleneck_reduce(f)
-        func.__name__ = method_name
-        func.__doc__ = _ROLLING_REDUCE_DOCSTRING_TEMPLATE.format(
-            name=func.__name__, da_or_ds='DataArray')
-        setattr(cls, method_name, func)
-
-
-def inject_datasetrolling_methods(cls):
-    # standard numpy reduce methods
-    methods = [(name, getattr(duck_array_ops, name))
-               for name in NAN_REDUCE_METHODS]
-    for name, f in methods:
-        func = cls._reduce_method(f)
-        func.__name__ = name
-        func.__doc__ = _ROLLING_REDUCE_DOCSTRING_TEMPLATE.format(
-            name=func.__name__, da_or_ds='Dataset')
-        setattr(cls, name, func)
-    # bottleneck doesn't offer rolling_count, so we construct it ourselves
-    func = rolling_count
-    func.__name__ = 'count'
-    func.__doc__ = _ROLLING_REDUCE_DOCSTRING_TEMPLATE.format(
-        name=func.__name__, da_or_ds='Dataset')
-    setattr(cls, 'count', func)
-
-
 def inject_coarsen_methods(cls):
     # standard numpy reduce methods
     methods = [(name, getattr(duck_array_ops, name))
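The removals above drop the `setattr`-based injection machinery (`inject_bottleneck_rolling_methods`, `inject_datasetrolling_methods`, `rolling_count`, and the shared docstring template); its replacement is the class-body factory added to `rolling.py` below. As a point of comparison, here is a minimal, self-contained sketch of that factory pattern. It is illustration only: `ToyRolling`, its list-based `reduce`, and the use of plain numpy aggregations are invented for this sketch and are not xarray code.

```python
import numpy as np


class ToyRolling:
    """Toy stand-in for xarray's Rolling, showing the class-body factory."""

    def __init__(self, values, window):
        self.values = values
        self.window = window

    def reduce(self, func):
        # naive trailing windows over a plain list, for illustration only
        windows = [self.values[i:i + self.window]
                   for i in range(len(self.values) - self.window + 1)]
        return [float(func(w)) for w in windows]

    def _reduce_method(name):
        # runs at class-definition time, not on instances
        agg = getattr(np, name)

        def method(self):
            return self.reduce(agg)
        method.__name__ = name
        return method

    mean = _reduce_method('mean')
    sum = _reduce_method('sum')


print(ToyRolling([1.0, 2.0, 3.0, 4.0], window=2).mean())  # [1.5, 2.5, 3.5]
```

The advantage over the removed `inject_*` helpers is that `mean`, `sum`, and friends exist as ordinary class attributes as soon as the class is defined, so they are visible to introspection and documentation tools without a post-hoc `setattr` pass.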
diff --git a/xarray/core/rolling.py b/xarray/core/rolling.py
index 4773512cdc4..fdd51304520 100644
--- a/xarray/core/rolling.py
+++ b/xarray/core/rolling.py
@@ -1,3 +1,4 @@
+import functools
 import warnings
 from collections import OrderedDict
 from distutils.version import LooseVersion
@@ -6,11 +7,30 @@
 
 from . import dtypes, duck_array_ops, utils
 from .dask_array_ops import dask_rolling_wrapper
-from .ops import (
-    bn, has_bottleneck, inject_bottleneck_rolling_methods,
-    inject_coarsen_methods, inject_datasetrolling_methods)
+from .ops import inject_coarsen_methods
 from .pycompat import dask_array_type
 
+try:
+    import bottleneck
+except ImportError:
+    # use numpy methods instead
+    bottleneck = None
+
+
+_ROLLING_REDUCE_DOCSTRING_TEMPLATE = """\
+Reduce this object's data windows by applying `{name}` along its dimension.
+
+Parameters
+----------
+**kwargs : dict
+    Additional keyword arguments passed on to `{name}`.
+
+Returns
+-------
+reduced : same type as caller
+    New object with `{name}` applied along its rolling dimension.
+"""
+
 
 class Rolling:
     """A object that implements the moving window pattern.
@@ -51,8 +71,8 @@ def __init__(self, obj, windows, min_periods=None, center=False):
         rolling : type of input argument
         """
-        if (has_bottleneck and
-                (LooseVersion(bn.__version__) < LooseVersion('1.0'))):
+        if (bottleneck is not None and
+                (LooseVersion(bottleneck.__version__) < LooseVersion('1.0'))):
             warnings.warn('xarray requires bottleneck version of 1.0 or '
                           'greater for rolling operations. Rolling '
                           'aggregation methods will use numpy instead'
                           'of bottleneck.')
@@ -94,6 +114,34 @@ def __repr__(self):
     def __len__(self):
         return self.obj.sizes[self.dim]
 
+    def _reduce_method(name):
+        array_agg_func = getattr(duck_array_ops, name)
+        bottleneck_move_func = getattr(bottleneck, 'move_' + name, None)
+
+        def method(self, **kwargs):
+            return self._numpy_or_bottleneck_reduce(
+                array_agg_func, bottleneck_move_func, **kwargs)
+        method.__name__ = name
+        method.__doc__ = _ROLLING_REDUCE_DOCSTRING_TEMPLATE.format(name=name)
+        return method
+
+    argmax = _reduce_method('argmax')
+    argmin = _reduce_method('argmin')
+    max = _reduce_method('max')
+    min = _reduce_method('min')
+    mean = _reduce_method('mean')
+    prod = _reduce_method('prod')
+    sum = _reduce_method('sum')
+    std = _reduce_method('std')
+    var = _reduce_method('var')
+    median = _reduce_method('median')
+
+    def count(self):
+        rolling_count = self._counts()
+        enough_periods = rolling_count >= self._min_periods
+        return rolling_count.where(enough_periods)
+    count.__doc__ = _ROLLING_REDUCE_DOCSTRING_TEMPLATE.format(name='count')
+
 
 class DataArrayRolling(Rolling):
     def __init__(self, obj, windows, min_periods=None, center=False):
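The hunk above moves the aggregations onto the `Rolling` base class itself: `_reduce_method` builds each named method around `_numpy_or_bottleneck_reduce` (defined for `DataArrayRolling` in the next hunk), and `count` is written out by hand because bottleneck has no moving count. A hedged usage sketch of `count` and `min_periods` as defined there; the printed values follow from `_counts()` plus the `enough_periods` mask, but the exact dtype and array formatting may differ:

```python
import numpy as np
import xarray as xr

da = xr.DataArray([1.0, np.nan, 3.0, 4.0], dims='x')

# with min_periods=1, any window containing at least one valid value is kept
print(da.rolling(x=2, min_periods=1).count().values)  # [1. 1. 1. 2.]

# min_periods defaults to the window size, so windows touching the NaN
# (or the padded edge) fall below the threshold and are masked
print(da.rolling(x=2).count().values)                 # [nan nan nan 2.]
```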
@@ -257,71 +305,65 @@ def _counts(self):
                   .sum(dim=rolling_dim, skipna=False))
         return counts
 
-    @classmethod
-    def _reduce_method(cls, func):
-        """
-        Methods to return a wrapped function for any function `func` for
-        numpy methods.
-        """
-
-        def wrapped_func(self, **kwargs):
-            return self.reduce(func, **kwargs)
-        return wrapped_func
+    def _bottleneck_reduce(self, func, **kwargs):
+        from .dataarray import DataArray
 
-    @classmethod
-    def _bottleneck_reduce(cls, func):
-        """
-        Methods to return a wrapped function for any function `func` for
-        bottoleneck method, except for `median`.
-        """
+        # bottleneck doesn't allow min_count to be 0, although it should
+        # work the same as if min_count = 1
+        if self.min_periods is not None and self.min_periods == 0:
+            min_count = 1
+        else:
+            min_count = self.min_periods
 
-        def wrapped_func(self, **kwargs):
-            from .dataarray import DataArray
+        axis = self.obj.get_axis_num(self.dim)
 
-            # bottleneck doesn't allow min_count to be 0, although it should
-            # work the same as if min_count = 1
-            if self.min_periods is not None and self.min_periods == 0:
-                min_count = 1
-            else:
-                min_count = self.min_periods
-
-            axis = self.obj.get_axis_num(self.dim)
-
-            padded = self.obj.variable
-            if self.center:
-                if (LooseVersion(np.__version__) < LooseVersion('1.13') and
-                        self.obj.dtype.kind == 'b'):
-                    # with numpy < 1.13 bottleneck cannot handle np.nan-Boolean
-                    # mixed array correctly. We cast boolean array to float.
-                    padded = padded.astype(float)
-
-                if isinstance(padded.data, dask_array_type):
-                    # Workaround to make the padded chunk size is larger than
-                    # self.window-1
-                    shift = - (self.window + 1) // 2
-                    offset = (self.window - 1) // 2
-                    valid = (slice(None), ) * axis + (
-                        slice(offset, offset + self.obj.shape[axis]), )
-                else:
-                    shift = (-self.window // 2) + 1
-                    valid = (slice(None), ) * axis + (slice(-shift, None), )
-                padded = padded.pad_with_fill_value({self.dim: (0, -shift)})
+        padded = self.obj.variable
+        if self.center:
+            if (LooseVersion(np.__version__) < LooseVersion('1.13') and
+                    self.obj.dtype.kind == 'b'):
+                # with numpy < 1.13 bottleneck cannot handle np.nan-Boolean
+                # mixed array correctly. We cast boolean array to float.
+                padded = padded.astype(float)
 
             if isinstance(padded.data, dask_array_type):
-                values = dask_rolling_wrapper(func, padded,
-                                              window=self.window,
-                                              min_count=min_count,
-                                              axis=axis)
+                # Workaround to make the padded chunk size larger than
+                # self.window - 1
+                shift = - (self.window + 1) // 2
+                offset = (self.window - 1) // 2
+                valid = (slice(None), ) * axis + (
+                    slice(offset, offset + self.obj.shape[axis]), )
             else:
-                values = func(padded.data, window=self.window,
-                              min_count=min_count, axis=axis)
-
-            if self.center:
-                values = values[valid]
-            result = DataArray(values, self.obj.coords)
-
-            return result
-        return wrapped_func
+                shift = (-self.window // 2) + 1
+                valid = (slice(None), ) * axis + (slice(-shift, None), )
+            padded = padded.pad_with_fill_value({self.dim: (0, -shift)})
+
+        if isinstance(padded.data, dask_array_type):
+            raise AssertionError('should not be reachable')
+            values = dask_rolling_wrapper(func, padded.data,
+                                          window=self.window,
+                                          min_count=min_count,
+                                          axis=axis)
+        else:
+            values = func(padded.data, window=self.window,
+                          min_count=min_count, axis=axis)
+
+        if self.center:
+            values = values[valid]
+        result = DataArray(values, self.obj.coords)
+
+        return result
+
+    def _numpy_or_bottleneck_reduce(
+        self, array_agg_func, bottleneck_move_func, **kwargs
+    ):
+        if (bottleneck_move_func is not None and
+                not isinstance(self.obj.data, dask_array_type)):
+            # TODO: re-enable bottleneck with dask after the issues
+            # underlying https://github.com/pydata/xarray/issues/2940 are
+            # fixed.
+            return self._bottleneck_reduce(bottleneck_move_func, **kwargs)
+        else:
+            return self.reduce(array_agg_func, **kwargs)
 
 
 class DatasetRolling(Rolling):
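`_numpy_or_bottleneck_reduce` above is the substance of the fix for issue #2940: bottleneck's moving-window kernels are only used when the underlying data is an in-memory numpy array, and dask-backed objects always fall back to `Rolling.reduce`. The snippet below is a deliberately simplified, numpy-only illustration of the kind of hazard being avoided — applying a moving-window kernel to each chunk independently silently loses the windows that straddle a chunk boundary. It does not reproduce the actual `dask_rolling_wrapper` code path, and `move_sum` here is a hand-written stand-in, not bottleneck's:

```python
import numpy as np


def move_sum(a, window):
    # trailing moving sum, NaN for the first window - 1 positions
    # (loosely mimicking a bottleneck-style moving kernel)
    out = np.full(a.shape, np.nan)
    c = np.cumsum(a)
    out[window - 1:] = c[window - 1:] - np.concatenate(([0.0], c[:-window]))
    return out


x = np.arange(8.0)
whole = move_sum(x, window=3)
chunked = np.concatenate([move_sum(chunk, window=3)
                          for chunk in (x[:4], x[4:])])
print(whole)    # [nan nan  3.  6.  9. 12. 15. 18.]
print(chunked)  # positions 4 and 5 are nan instead of 9 and 12
```

The real dask wrapper padded and overlapped chunks rather than ignoring the boundary, but per the linked issue it could still return wrong results or use large amounts of memory, hence the TODO-guarded blanket fallback for dask arrays.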
@@ -370,6 +412,16 @@ def __init__(self, obj, windows, min_periods=None, center=False):
             self.rollings[key] = DataArrayRolling(
                 da, windows, min_periods, center)
 
+    def _dataset_implementation(self, func, **kwargs):
+        from .dataset import Dataset
+        reduced = OrderedDict()
+        for key, da in self.obj.data_vars.items():
+            if self.dim in da.dims:
+                reduced[key] = func(self.rollings[key], **kwargs)
+            else:
+                reduced[key] = self.obj[key]
+        return Dataset(reduced, coords=self.obj.coords)
+
     def reduce(self, func, **kwargs):
         """Reduce the items in this group by applying `func` along some
         dimension(s).
@@ -388,43 +440,20 @@ def reduce(self, func, **kwargs):
         reduced : DataArray
             Array with summarized data.
         """
-        from .dataset import Dataset
-        reduced = OrderedDict()
-        for key, da in self.obj.data_vars.items():
-            if self.dim in da.dims:
-                reduced[key] = self.rollings[key].reduce(func, **kwargs)
-            else:
-                reduced[key] = self.obj[key]
-        return Dataset(reduced, coords=self.obj.coords)
+        return self._dataset_implementation(
+            functools.partial(DataArrayRolling.reduce, func=func), **kwargs)
 
     def _counts(self):
-        from .dataset import Dataset
-        reduced = OrderedDict()
-        for key, da in self.obj.data_vars.items():
-            if self.dim in da.dims:
-                reduced[key] = self.rollings[key]._counts()
-            else:
-                reduced[key] = self.obj[key]
-        return Dataset(reduced, coords=self.obj.coords)
+        return self._dataset_implementation(DataArrayRolling._counts)
 
-    @classmethod
-    def _reduce_method(cls, func):
-        """
-        Return a wrapped function for injecting numpy and bottoleneck methods.
-        see ops.inject_datasetrolling_methods
-        """
-
-        def wrapped_func(self, **kwargs):
-            from .dataset import Dataset
-            reduced = OrderedDict()
-            for key, da in self.obj.data_vars.items():
-                if self.dim in da.dims:
-                    reduced[key] = getattr(self.rollings[key],
-                                           func.__name__)(**kwargs)
-                else:
-                    reduced[key] = self.obj[key]
-            return Dataset(reduced, coords=self.obj.coords)
-        return wrapped_func
+    def _numpy_or_bottleneck_reduce(
+        self, array_agg_func, bottleneck_move_func, **kwargs
+    ):
+        return self._dataset_implementation(
+            functools.partial(DataArrayRolling._numpy_or_bottleneck_reduce,
+                              array_agg_func=array_agg_func,
+                              bottleneck_move_func=bottleneck_move_func),
+            **kwargs)
 
     def construct(self, window_dim, stride=1, fill_value=dtypes.NA):
         """
@@ -572,7 +601,5 @@ def wrapped_func(self, **kwargs):
         return wrapped_func
 
 
-inject_bottleneck_rolling_methods(DataArrayRolling)
-inject_datasetrolling_methods(DatasetRolling)
 inject_coarsen_methods(DataArrayCoarsen)
 inject_coarsen_methods(DatasetCoarsen)
diff --git a/xarray/tests/test_dataarray.py b/xarray/tests/test_dataarray.py
index 25d6c69d5a5..b0ea2cd5cc6 100644
--- a/xarray/tests/test_dataarray.py
+++ b/xarray/tests/test_dataarray.py
@@ -3747,7 +3747,6 @@ def test_rolling_wrapped_bottleneck(da, name, center, min_periods):
 @pytest.mark.parametrize('center', (True, False, None))
 @pytest.mark.parametrize('min_periods', (1, None))
 @pytest.mark.parametrize('window', (7, 8))
-@pytest.mark.xfail(reason='https://github.com/pydata/xarray/issues/2940')
 def test_rolling_wrapped_dask(da_dask, name, center, min_periods, window):
     pytest.importorskip('dask.array')
     # dask version
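On the `Dataset` side, `_dataset_implementation` plus `functools.partial` reuses the same per-variable dispatch, so `Dataset.rolling(...)` aggregations make the identical numpy-or-bottleneck decision for each data variable. With the `xfail` marker removed from `test_rolling_wrapped_dask`, dask-backed rolling is expected to agree with the numpy-backed result again. A hedged end-to-end check in that spirit (requires dask to be installed; the random data, window, and chunk size are arbitrary choices for illustration, not copied from the test):

```python
import numpy as np
import xarray as xr

da = xr.DataArray(np.random.RandomState(0).randn(30), dims='time')

# numpy-backed path (may still use bottleneck's moving kernels)
expected = da.rolling(time=7, min_periods=1).mean()

# dask-backed path, which now always goes through Rolling.reduce
actual = da.chunk({'time': 10}).rolling(time=7, min_periods=1).mean()

# `actual` is still lazy; assert_allclose computes it and compares the values
xr.testing.assert_allclose(actual, expected)
```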