diff --git a/xarray/core/computation.py b/xarray/core/computation.py
index 784c722989c..a3838a004ce 100644
--- a/xarray/core/computation.py
+++ b/xarray/core/computation.py
@@ -64,32 +64,13 @@ def all_core_dims(self):
         return self._all_core_dims

     @property
-    def n_inputs(self):
+    def num_inputs(self):
         return len(self.input_core_dims)

     @property
-    def n_outputs(self):
+    def num_outputs(self):
         return len(self.output_core_dims)

-    @classmethod
-    def default(cls, n_inputs):
-        return cls([()] * n_inputs, [()])
-
-    @classmethod
-    def from_sequence(cls, nested):
-        if (not isinstance(nested, collections.Sequence) or
-                not len(nested) == 2 or
-                any(not isinstance(arg_list, collections.Sequence)
-                    for arg_list in nested) or
-                any(isinstance(arg, basestring) or
-                    not isinstance(arg, collections.Sequence)
-                    for arg_list in nested for arg in arg_list)):
-            raise TypeError('functions signatures not provided as a string '
-                            'must be a triply nested sequence providing the '
-                            'list of core dimensions for each variable, for '
-                            'both input and output.')
-        return cls(*nested)
-
     def __eq__(self, other):
         try:
             return (self.input_core_dims == other.input_core_dims and
@@ -190,7 +171,7 @@ def apply_dataarray_ufunc(func, *args, **kwargs):
     data_vars = [getattr(a, 'variable', a) for a in args]
     result_var = func(*data_vars)

-    if signature.n_outputs > 1:
+    if signature.num_outputs > 1:
         out = tuple(DataArray(variable, coords, name=name, fastpath=True)
                     for variable, coords in zip(result_var, result_coords))
     else:
@@ -269,10 +250,10 @@ def _as_variables_or_variable(arg):

 def _unpack_dict_tuples(
         result_vars,  # type: Mapping[Any, Tuple[Variable]]
-        n_outputs,  # type: int
+        num_outputs,  # type: int
 ):
     # type: (...) -> Tuple[Dict[Any, Variable]]
-    out = tuple(OrderedDict() for _ in range(n_outputs))
+    out = tuple(OrderedDict() for _ in range(num_outputs))
     for name, values in result_vars.items():
         for value, results_dict in zip(values, out):
             results_dict[name] = value
@@ -298,8 +279,8 @@ def apply_dict_of_variables_ufunc(func, *args, **kwargs):
     for name, variable_args in zip(names, grouped_by_name):
         result_vars[name] = func(*variable_args)

-    if signature.n_outputs > 1:
-        return _unpack_dict_tuples(result_vars, signature.n_outputs)
+    if signature.num_outputs > 1:
+        return _unpack_dict_tuples(result_vars, signature.num_outputs)
     else:
         return result_vars

@@ -335,8 +316,8 @@ def apply_dataset_ufunc(func, *args, **kwargs):

     if (dataset_join not in _JOINS_WITHOUT_FILL_VALUES and
             fill_value is _DEFAULT_FILL_VALUE):
-        raise TypeError('To apply an operation to datasets with different ',
-                        'data variables, you must supply the ',
+        raise TypeError('to apply an operation to datasets with different '
+                        'data variables with apply_ufunc, you must supply the '
                         'dataset_fill_value argument.')

     if kwargs:
@@ -353,7 +334,7 @@ def apply_dataset_ufunc(func, *args, **kwargs):
         func, *args, signature=signature, join=dataset_join,
         fill_value=fill_value)

-    if signature.n_outputs > 1:
+    if signature.num_outputs > 1:
         out = tuple(_fast_dataset(*args)
                     for args in zip(result_vars, list_of_coords))
     else:
@@ -388,12 +369,12 @@ def apply_groupby_ufunc(func, *args):
     from .variable import Variable

     groupbys = [arg for arg in args if isinstance(arg, GroupBy)]
-    if not groupbys:
-        raise ValueError('must have at least one groupby to iterate over')
+    assert groupbys, 'must have at least one groupby to iterate over'
     first_groupby = groupbys[0]
     if any(not first_groupby._group.equals(gb._group) for gb in groupbys[1:]):
-        raise ValueError('can only perform operations over multiple groupbys '
-                         'at once if they are all grouped the same way')
+        raise ValueError('apply_ufunc can only perform operations over '
+                         'multiple GroupBy objects at once if they are all '
+                         'grouped the same way')
     grouped_dim = first_groupby._group.name
     unique_values = first_groupby._unique_coord.values

@@ -430,7 +411,7 @@ def unified_dim_sizes(variables, exclude_dims=frozenset()):
     for var in variables:
         if len(set(var.dims)) < len(var.dims):
             raise ValueError('broadcasting cannot handle duplicate '
-                             'dimensions: %r' % list(var.dims))
+                             'dimensions on a variable: %r' % list(var.dims))
         for dim, size in zip(var.dims, var.shape):
             if dim not in exclude_dims:
                 if dim not in dim_sizes:
@@ -462,15 +443,17 @@ def broadcast_compat_data(variable, broadcast_dims, core_dims):
     set_old_dims = set(old_dims)
     missing_core_dims = [d for d in core_dims if d not in set_old_dims]
     if missing_core_dims:
-        raise ValueError('operation requires dimensions missing on input '
-                         'variable: %r' % missing_core_dims)
+        raise ValueError('operand to apply_ufunc has required core dimensions '
+                         '%r, but some of these are missing on the input '
+                         'variable: %r' % (list(core_dims), missing_core_dims))

     set_new_dims = set(new_dims)
     unexpected_dims = [d for d in old_dims if d not in set_new_dims]
     if unexpected_dims:
-        raise ValueError('operation encountered unexpected dimensions %r '
-                         'on input variable: these are core dimensions on '
-                         'other input or output variables' % unexpected_dims)
+        raise ValueError('operand to apply_ufunc encountered unexpected '
+                         'dimensions %r on an input variable: these are core '
+                         'dimensions on other input or output variables'
+                         % unexpected_dims)

     # for consistency with numpy, keep broadcast dimensions to the left
     old_broadcast_dims = tuple(d for d in broadcast_dims if d in set_old_dims)
@@ -500,6 +483,9 @@ def apply_variable_ufunc(func, *args, **kwargs):
     signature = kwargs.pop('signature')
     exclude_dims = kwargs.pop('exclude_dims', _DEFAULT_FROZEN_SET)
+    dask = kwargs.pop('dask', 'forbidden')
+    output_dtypes = kwargs.pop('output_dtypes', None)
+    output_sizes = kwargs.pop('output_sizes', None)
     if kwargs:
         raise TypeError('apply_variable_ufunc() got unexpected keyword '
                         'arguments: %s' % list(kwargs))
@@ -515,9 +501,28 @@ def apply_variable_ufunc(func, *args, **kwargs):
                   else arg
                   for arg, core_dims in zip(args, signature.input_core_dims)]

+    if any(isinstance(array, dask_array_type) for array in input_data):
+        if dask == 'forbidden':
+            raise ValueError('apply_ufunc encountered a dask array on an '
+                             'argument, but handling for dask arrays has not '
+                             'been enabled. Either set the ``dask`` argument '
+                             'or load your data into memory first with '
+                             '``.load()`` or ``.compute()``')
+        elif dask == 'parallelized':
+            input_dims = [broadcast_dims + input_dims
+                          for input_dims in signature.input_core_dims]
+            numpy_func = func
+            func = lambda *arrays: _apply_with_dask_atop(
+                numpy_func, arrays, input_dims, output_dims, signature,
+                output_dtypes, output_sizes)
+        elif dask == 'allowed':
+            pass
+        else:
+            raise ValueError('unknown setting for dask array handling in '
+                             'apply_ufunc: {}'.format(dask))
     result_data = func(*input_data)

-    if signature.n_outputs > 1:
+    if signature.num_outputs > 1:
         output = []
         for dims, data in zip(output_dims, result_data):
             output.append(Variable(dims, data))
@@ -527,24 +532,83 @@ def apply_variable_ufunc(func, *args, **kwargs):
         return Variable(dims, result_data)


+def _apply_with_dask_atop(func, args, input_dims, output_dims, signature,
+                          output_dtypes, output_sizes=None):
+    import dask.array as da
+
+    if signature.num_outputs > 1:
+        raise NotImplementedError('multiple outputs from apply_ufunc not yet '
+                                  "supported with dask='parallelized'")
+
+    if output_dtypes is None:
+        raise ValueError('output dtypes (output_dtypes) must be supplied to '
+                         "apply_ufunc when using dask='parallelized'")
+    if not isinstance(output_dtypes, list):
+        raise TypeError('output_dtypes must be a list of objects coercible to '
+                        'numpy dtypes, got {}'.format(output_dtypes))
+    if len(output_dtypes) != signature.num_outputs:
+        raise ValueError('apply_ufunc arguments output_dtypes and '
+                         'output_core_dims must have the same length: {} vs {}'
+                         .format(len(output_dtypes), signature.num_outputs))
+    (dtype,) = output_dtypes
+
+    if output_sizes is None:
+        output_sizes = {}
+
+    new_dims = signature.all_output_core_dims - signature.all_input_core_dims
+    if any(dim not in output_sizes for dim in new_dims):
+        raise ValueError("when using dask='parallelized' with apply_ufunc, "
+                         'output core dimensions not found on inputs must have '
+                         'explicitly set sizes with ``output_sizes``: {}'
+                         .format(new_dims))
+
+    for n, (data, core_dims) in enumerate(
+            zip(args, signature.input_core_dims)):
+        if isinstance(data, dask_array_type):
+            # core dimensions cannot span multiple chunks
+            for axis, dim in enumerate(core_dims, start=-len(core_dims)):
+                if len(data.chunks[axis]) != 1:
+                    raise ValueError(
+                        'dimension {!r} on {}th function argument to '
+                        "apply_ufunc with dask='parallelized' consists of "
+                        'multiple chunks, but is also a core dimension. To '
+                        'fix, rechunk into a single dask array chunk along '
+                        'this dimension, i.e., ``.rechunk({})``, but beware '
+                        'that this may significantly increase memory usage.'
+                        .format(dim, n, {dim: -1}))
+
+    (out_ind,) = output_dims
+    # skip leading dimensions that we did not insert with broadcast_compat_data
+    atop_args = [element
+                 for (arg, dims) in zip(args, input_dims)
+                 for element in (arg, dims[-getattr(arg, 'ndim', 0):])]
+    return da.atop(func, out_ind, *atop_args, dtype=dtype, concatenate=True,
+                   new_axes=output_sizes)
+
+
 def apply_array_ufunc(func, *args, **kwargs):
-    """apply_variable_ufunc(func, *args, dask_array='forbidden')
+    """apply_array_ufunc(func, *args, dask='forbidden')
     """
-    dask_array = kwargs.pop('dask_array', 'forbidden')
+    dask = kwargs.pop('dask', 'forbidden')
     if kwargs:
         raise TypeError('apply_array_ufunc() got unexpected keyword '
                         'arguments: %s' % list(kwargs))

     if any(isinstance(arg, dask_array_type) for arg in args):
-        # TODO: add a mode dask_array='auto' when dask.array gets a function
-        # for applying arbitrary gufuncs
-        if dask_array == 'forbidden':
-            raise ValueError('encountered dask array, but did not set '
-                             "dask_array='allowed'")
-        elif dask_array != 'allowed':
-            raise ValueError('unknown setting for dask array handling: %r'
-                             % dask_array)
-        # fall through
+        if dask == 'forbidden':
+            raise ValueError('apply_ufunc encountered a dask array on an '
+                             'argument, but handling for dask arrays has not '
+                             'been enabled. Either set the ``dask`` argument '
+                             'or load your data into memory first with '
+                             '``.load()`` or ``.compute()``')
+        elif dask == 'parallelized':
+            raise ValueError("cannot use dask='parallelized' for apply_ufunc "
+                             'unless at least one input is an xarray object')
+        elif dask == 'allowed':
+            pass
+        else:
+            raise ValueError('unknown setting for dask array handling: {}'
+                             .format(dask))
     return func(*args)
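
Note: ``da.atop`` (renamed ``da.blockwise`` in later dask releases) is the engine behind ``_apply_with_dask_atop``: each argument is paired with index labels for its dimensions, and core dimensions are simply dropped from the output indices. A minimal standalone sketch of that pattern follows; the reduction function, shapes, and chunk sizes are invented for illustration and are not part of this change:

    import dask.array as da

    def mean_over_core_dim(x):
        # plain NumPy function; the last axis plays the role of a core dimension
        return x.mean(axis=-1)

    x = da.ones((4, 6), chunks=(2, 6))  # core axis 'j' lives in a single chunk

    # 'ij' -> 'i': the core index 'j' is omitted from the output indices, and
    # concatenate=True hands the function contiguous NumPy blocks along 'j'.
    result = da.blockwise(mean_over_core_dim, 'i', x, 'ij',
                          dtype=x.dtype, concatenate=True)
    assert result.compute().shape == (4,)

Because of ``concatenate=True``, each call sees its core dimension as one contiguous axis, which is why the check above forbids core dimensions that span multiple chunks.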
@@ -559,7 +623,9 @@ def apply_ufunc(func, *args, **kwargs):
                 dataset_fill_value : Any = _DEFAULT_FILL_VALUE,
                 keep_attrs : bool = False,
                 kwargs : Mapping = None,
-                dask_array : str = 'forbidden')
+                dask : str = 'forbidden',
+                output_dtypes : Optional[Sequence] = None,
+                output_sizes : Optional[Mapping[Any, int]] = None)

     Apply a vectorized function for unlabeled arrays on xarray objects.

@@ -581,8 +647,8 @@ def apply_ufunc(func, *args, **kwargs):
         Mix of labeled and/or unlabeled arrays to which to apply the function.
     input_core_dims : Sequence[Sequence], optional
         List of the same length as ``args`` giving the list of core dimensions
-        on each input argument that should be broadcast. By default, we assume
-        there are no core dimensions on any input arguments.
+        on each input argument that should not be broadcast. By default, we
+        assume there are no core dimensions on any input arguments.

         For example ,``input_core_dims=[[], ['time']]`` indicates that all
         dimensions on the first argument and all dimensions other than 'time'
@@ -630,10 +696,20 @@ def apply_ufunc(func, *args, **kwargs):
         Whether to copy attributes from the first argument to the output.
     kwargs: dict, optional
         Optional keyword arguments passed directly on to call ``func``.
-    dask_array: 'forbidden' or 'allowed', optional
-        Whether or not to allow applying the ufunc to objects containing lazy
-        data in the form of dask arrays. By default, this is forbidden, to
-        avoid implicitly converting lazy data.
+    dask: 'forbidden', 'allowed' or 'parallelized', optional
+        How to handle applying to objects containing lazy data in the form of
+        dask arrays:
+        - 'forbidden' (default): raise an error if a dask array is encountered.
+        - 'allowed': pass dask arrays directly on to ``func``.
+        - 'parallelized': automatically parallelize ``func`` if any of the
+          inputs are a dask array. If used, the ``output_dtypes`` argument must
+          also be provided. Multiple output arguments are not yet supported.
+    output_dtypes : list of dtypes, optional
+        Optional list of output dtypes. Only used if dask='parallelized'.
+    output_sizes : dict, optional
+        Optional mapping from dimension names to sizes for outputs. Only used
+        if dask='parallelized' and new dimensions (not found on inputs) appear
+        on outputs.

     Returns
     -------
@@ -710,7 +786,9 @@ def stack(objects, dim, new_coord):
     exclude_dims = kwargs.pop('exclude_dims', frozenset())
     dataset_fill_value = kwargs.pop('dataset_fill_value', _DEFAULT_FILL_VALUE)
     kwargs_ = kwargs.pop('kwargs', None)
-    dask_array = kwargs.pop('dask_array', 'forbidden')
+    dask = kwargs.pop('dask', 'forbidden')
+    output_dtypes = kwargs.pop('output_dtypes', None)
+    output_sizes = kwargs.pop('output_sizes', None)
     if kwargs:
         raise TypeError('apply_ufunc() got unexpected keyword arguments: %s'
                         % list(kwargs))
@@ -727,12 +805,12 @@ def stack(objects, dim, new_coord):
     if kwargs_:
         func = functools.partial(func, **kwargs_)

-    array_ufunc = functools.partial(
-        apply_array_ufunc, func, dask_array=dask_array)
-
-    variables_ufunc = functools.partial(apply_variable_ufunc, array_ufunc,
+    variables_ufunc = functools.partial(apply_variable_ufunc, func,
                                         signature=signature,
-                                        exclude_dims=exclude_dims)
+                                        exclude_dims=exclude_dims,
+                                        dask=dask,
+                                        output_dtypes=output_dtypes,
+                                        output_sizes=output_sizes)

     if any(isinstance(a, GroupBy) for a in args):
         # kwargs has already been added into func
@@ -744,7 +822,7 @@ def stack(objects, dim, new_coord):
                                        dataset_join=dataset_join,
                                        dataset_fill_value=dataset_fill_value,
                                        keep_attrs=keep_attrs,
-                                       dask_array=dask_array)
+                                       dask=dask)
         return apply_groupby_ufunc(this_apply, *args)
     elif any(is_dict_like(a) for a in args):
         return apply_dataset_ufunc(variables_ufunc, *args,
@@ -763,7 +841,7 @@ def stack(objects, dim, new_coord):
     elif any(isinstance(a, Variable) for a in args):
         return variables_ufunc(*args)
     else:
-        return array_ufunc(*args)
+        return apply_array_ufunc(func, *args, dask=dask)


 def where(cond, x, y):
@@ -805,4 +883,4 @@ def where(cond, x, y):
         cond, x, y,
         join='exact',
         dataset_join='exact',
-        dask_array='allowed')
+        dask='allowed')
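
Note: putting the keyword arguments documented above together, a hedged usage sketch of the new ``dask='parallelized'`` mode; the ``magnitude`` function, shapes, and chunk sizes are invented for illustration:

    import numpy as np
    import xarray as xr
    import dask.array as da
    from xarray.core.computation import apply_ufunc

    def magnitude(a, b):
        # ordinary NumPy code; apply_ufunc calls it once per dask chunk
        return np.sqrt(a ** 2 + b ** 2)

    u = xr.DataArray(da.ones((4, 4), chunks=(2, 2)), dims=('x', 'y'))
    v = xr.DataArray(da.ones((4, 4), chunks=(2, 2)), dims=('x', 'y'))

    # dask='parallelized' requires declaring the result dtype(s) up front
    result = apply_ufunc(magnitude, u, v,
                         dask='parallelized', output_dtypes=[float])
    print(result.compute())  # all values sqrt(2); nothing computed until here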
diff --git a/xarray/core/ops.py b/xarray/core/ops.py
index b840f26073c..04f156a2998 100644
--- a/xarray/core/ops.py
+++ b/xarray/core/ops.py
@@ -148,7 +148,7 @@ def fillna(data, other, join="left", dataset_join="left"):
     return apply_ufunc(duck_array_ops.fillna, data, other,
                        join=join,
-                       dask_array="allowed",
+                       dask="allowed",
                        dataset_join=dataset_join,
                        dataset_fill_value=np.nan,
                        keep_attrs=True)
@@ -176,7 +176,7 @@ def where_method(self, cond, other=dtypes.NA):
                        self, cond, other,
                        join=join,
                        dataset_join=join,
-                       dask_array='allowed',
+                       dask='allowed',
                        keep_attrs=True)
diff --git a/xarray/tests/__init__.py b/xarray/tests/__init__.py
index 424484b438a..33dd4305e9f 100644
--- a/xarray/tests/__init__.py
+++ b/xarray/tests/__init__.py
@@ -188,8 +188,8 @@ def raises_regex(error, pattern):
     with pytest.raises(error) as excinfo:
         yield
     message = str(excinfo.value)
-    if not re.match(pattern, message):
-        raise AssertionError('exception %r did not match pattern %s'
+    if not re.search(pattern, message):
+        raise AssertionError('exception %r did not match pattern %r'
                              % (excinfo.value, pattern))
diff --git a/xarray/tests/test_computation.py b/xarray/tests/test_computation.py
index 76d12d51616..bb19692cb03 100644
--- a/xarray/tests/test_computation.py
+++ b/xarray/tests/test_computation.py
@@ -13,7 +13,7 @@
     join_dict_keys, ordered_set_intersection, ordered_set_union,
     unified_dim_sizes, apply_ufunc)

-from . import requires_dask
+from . import requires_dask, raises_regex


 def assert_identical(a, b):
@@ -30,8 +30,8 @@ def test_signature_properties():
     assert sig.output_core_dims == (('z',),)
     assert sig.all_input_core_dims == frozenset(['x', 'y'])
     assert sig.all_output_core_dims == frozenset(['z'])
-    assert sig.n_inputs == 2
-    assert sig.n_outputs == 1
+    assert sig.num_inputs == 2
+    assert sig.num_outputs == 1
     # dimension names matter
     assert _UFuncSignature([['x']]) != _UFuncSignature([['y']])
@@ -275,7 +275,7 @@ def test_apply_output_core_dimension():
     def stack_negative(obj):
         def func(x):
-            return xr.core.npcompat.stack([x, -x], axis=-1)
+            return np.stack([x, -x], axis=-1)
         result = apply_ufunc(func, obj, output_core_dims=[['sign']])
         if isinstance(result, (xr.Dataset, xr.DataArray)):
             result.coords['sign'] = [1, -1]
@@ -303,7 +303,7 @@ def func(x):
     def original_and_stack_negative(obj):
         def func(x):
-            return (x, xr.core.npcompat.stack([x, -x], axis=-1))
+            return (x, np.stack([x, -x], axis=-1))
         result = apply_ufunc(func, obj, output_core_dims=[[], ['sign']])
         if isinstance(result[1], (xr.Dataset, xr.DataArray)):
             result[1].coords['sign'] = [1, -1]
@@ -344,7 +344,7 @@ def func(*x):
                              output_core_dims=[[dim]],
                              exclude_dims={dim})
         if isinstance(result, (xr.Dataset, xr.DataArray)):
-            # note: this will fail dim is not a coordinate on any input
+            # note: this will fail if dim is not a coordinate on any input
             new_coord = np.concatenate([obj.coords[dim] for obj in objects])
             result.coords[dim] = new_coord
         return result
@@ -530,25 +530,17 @@ def add(a, b, join, dataset_join):
             assert_identical(actual, expected)


-class _NoCacheVariable(xr.Variable):
-    """Subclass of Variable for testing that does not cache values."""
-    # TODO: remove this class when we change the default behavior for caching
-    # dask.array objects.
-
-    def _data_cached(self):
-        return np.asarray(self._data)
-
-
 @requires_dask
 def test_apply_dask():
     import dask.array as da

     array = da.ones((2,), chunks=2)
-    variable = _NoCacheVariable('x', array)
+    variable = xr.Variable('x', array)
     coords = xr.DataArray(variable).coords.variables
     data_array = xr.DataArray(variable, coords, fastpath=True)
     dataset = xr.Dataset({'y': variable})

-    # encountered dask array, but did not set dask_array='allowed'
+    # encountered dask array, but did not set dask='allowed'
     with pytest.raises(ValueError):
         apply_ufunc(identity, array)
     with pytest.raises(ValueError):
@@ -560,10 +552,10 @@ def test_apply_dask():

     # unknown setting for dask array handling
     with pytest.raises(ValueError):
-        apply_ufunc(identity, array, dask_array='auto')
+        apply_ufunc(identity, array, dask='unknown')

     def dask_safe_identity(x):
-        return apply_ufunc(identity, x, dask_array='allowed')
+        return apply_ufunc(identity, x, dask='allowed')
     assert array is dask_safe_identity(array)

@@ -580,6 +572,104 @@ def dask_safe_identity(x):
         assert_identical(dataset, actual)


+@requires_dask
+def test_apply_dask_parallelized():
+    import dask.array as da
+
+    array = da.ones((2, 2), chunks=(1, 1))
+    data_array = xr.DataArray(array, dims=('x', 'y'))
+
+    actual = apply_ufunc(identity, data_array, dask='parallelized',
+                         output_dtypes=[float])
+    assert isinstance(actual.data, da.Array)
+    assert actual.data.chunks == array.chunks
+    assert_identical(data_array, actual)
+
+
+@requires_dask
+def test_apply_dask_parallelized_errors():
+    import dask.array as da
+
+    array = da.ones((2, 2), chunks=(1, 1))
+    data_array = xr.DataArray(array, dims=('x', 'y'))
+
+    with pytest.raises(NotImplementedError):
+        apply_ufunc(identity, data_array, output_core_dims=[['z'], ['z']],
+                    dask='parallelized')
+    with raises_regex(ValueError, 'dtypes'):
+        apply_ufunc(identity, data_array, dask='parallelized')
+    with raises_regex(TypeError, 'list'):
+        apply_ufunc(identity, data_array, dask='parallelized',
+                    output_dtypes=float)
+    with raises_regex(ValueError, 'must have the same length'):
+        apply_ufunc(identity, data_array, dask='parallelized',
+                    output_dtypes=[float, float])
+    with raises_regex(ValueError, 'output_sizes'):
+        apply_ufunc(identity, data_array, output_core_dims=[['z']],
+                    output_dtypes=[float], dask='parallelized')
+    with raises_regex(ValueError, 'at least one input is an xarray object'):
+        apply_ufunc(identity, array, dask='parallelized')
+
+    with raises_regex(ValueError, 'consists of multiple chunks'):
+        apply_ufunc(identity, data_array, dask='parallelized',
+                    output_dtypes=[float],
+                    input_core_dims=[('y',)],
+                    output_core_dims=[('y',)])
+
+
+@requires_dask
+def test_apply_dask_multiple_inputs():
+    import dask.array as da
+
+    def covariance(x, y):
+        return ((x - x.mean(axis=-1, keepdims=True))
+                * (y - y.mean(axis=-1, keepdims=True))).mean(axis=-1)
+
+    rs = np.random.RandomState(42)
+    array1 = da.from_array(rs.randn(4, 4), chunks=(2, 4))
+    array2 = da.from_array(rs.randn(4, 4), chunks=(2, 4))
+    data_array_1 = xr.DataArray(array1, dims=('x', 'z'))
+    data_array_2 = xr.DataArray(array2, dims=('y', 'z'))
+
+    expected = apply_ufunc(
+        covariance, data_array_1.compute(), data_array_2.compute(),
+        input_core_dims=[['z'], ['z']])
+    allowed = apply_ufunc(
+        covariance, data_array_1, data_array_2,
+        input_core_dims=[['z'], ['z']], dask='allowed')
+    assert isinstance(allowed.data, da.Array)
+    xr.testing.assert_allclose(expected, allowed.compute())
+
+    parallelized = apply_ufunc(
+        covariance, data_array_1, data_array_2,
+        input_core_dims=[['z'], ['z']],
+        dask='parallelized', output_dtypes=[float])
+    assert isinstance(parallelized.data, da.Array)
+    xr.testing.assert_allclose(expected, parallelized.compute())
+
+
+@requires_dask
+def test_apply_dask_new_output_dimension():
+    import dask.array as da
+
+    array = da.ones((2, 2), chunks=(1, 1))
+    data_array = xr.DataArray(array, dims=('x', 'y'))
+
+    def stack_negative(obj):
+        def func(x):
+            return np.stack([x, -x], axis=-1)
+        return apply_ufunc(func, obj, output_core_dims=[['sign']],
+                           dask='parallelized', output_dtypes=[obj.dtype],
+                           output_sizes={'sign': 2})
+
+    expected = stack_negative(data_array.compute())
+
+    actual = stack_negative(data_array)
+    assert actual.dims == ('x', 'y', 'sign')
+    assert actual.shape == (2, 2, 2)
+    assert isinstance(actual.data, da.Array)
+    assert_identical(expected, actual)
+
+
 def test_where():
     cond = xr.DataArray([True, False], dims='x')
     actual = xr.where(cond, 1, 0)
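
Note: one practical point on the chunking restriction exercised in test_apply_dask_parallelized_errors: a core dimension must occupy a single chunk. A hedged sketch of the workaround suggested by the 'consists of multiple chunks' error; the ``span`` function and all names and shapes are illustrative only:

    import xarray as xr
    import dask.array as da
    from xarray.core.computation import apply_ufunc

    def span(x):
        # reduces the core dimension with plain NumPy operations
        return x.max(axis=-1) - x.min(axis=-1)

    arr = xr.DataArray(da.ones((8, 100), chunks=(4, 25)),
                       dims=('space', 'time'))

    # 'time' is a core dimension but spans four chunks, which would raise the
    # error above; merge it into one chunk first (the DataArray-level
    # counterpart of the suggested ``.rechunk({dim: -1})``).
    arr = arr.chunk({'time': 100})

    result = apply_ufunc(span, arr, input_core_dims=[['time']],
                         dask='parallelized', output_dtypes=[arr.dtype])
    assert result.shape == (8,)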