diff --git a/doc/conf.py b/doc/conf.py
index f201af859b9..6c6efb47f6b 100644
--- a/doc/conf.py
+++ b/doc/conf.py
@@ -323,6 +323,7 @@
     "dask": ("https://docs.dask.org/en/latest", None),
     "cftime": ("https://unidata.github.io/cftime", None),
     "sparse": ("https://sparse.pydata.org/en/latest/", None),
+    "cubed": ("https://tom-e-white.com/cubed/", None),
 }
diff --git a/doc/internals/chunked-arrays.rst b/doc/internals/chunked-arrays.rst
new file mode 100644
index 00000000000..7192c3f0bc5
--- /dev/null
+++ b/doc/internals/chunked-arrays.rst
@@ -0,0 +1,102 @@
+.. currentmodule:: xarray
+
+.. _internals.chunkedarrays:
+
+Alternative chunked array types
+===============================
+
+.. warning::
+
+    This is a *highly* experimental feature. Please report any bugs or other difficulties on `xarray's issue tracker <https://github.com/pydata/xarray/issues>`_.
+    In particular see discussion on `xarray issue #6807 <https://github.com/pydata/xarray/issues/6807>`_.
+
+Xarray can wrap chunked dask arrays (see :ref:`dask`), but can also wrap any other chunked array type that exposes the correct interface.
+This allows us to support other frameworks for distributed and out-of-core processing, with user code still written as xarray commands.
+In particular xarray also supports wrapping :py:class:`cubed.Array` objects
+(see `Cubed's documentation <https://tom-e-white.com/cubed/>`_ and the `cubed-xarray package <https://github.com/xarray-contrib/cubed-xarray>`_).
+
+The basic idea is that by wrapping an array that has an explicit notion of ``.chunks``, xarray can expose control over
+the choice of chunking scheme to users via methods like :py:meth:`DataArray.chunk`, whilst the wrapped array actually
+implements the handling of processing all of the chunks.
+
+Chunked array methods and "core operations"
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+A chunked array needs to meet all the :ref:`requirements for normal duck arrays <internals.duckarrays.requirements>`, but must also
+implement additional features.
+
+Chunked arrays have additional attributes and methods, such as ``.chunks`` and ``.rechunk``.
+Furthermore, Xarray dispatches chunk-aware computations across one or more chunked arrays using special functions known
+as "core operations". Examples include ``map_blocks``, ``blockwise``, and ``apply_gufunc``.
+
+The core operations are generalizations of functions first implemented in :py:mod:`dask.array`.
+The implementation of these functions is specific to the type of arrays passed to them. For example, when applying the
+``map_blocks`` core operation, :py:class:`dask.array.Array` objects must be processed by :py:func:`dask.array.map_blocks`,
+whereas :py:class:`cubed.Array` objects must be processed by :py:func:`cubed.map_blocks`.
+
+In order to use the correct implementation of a core operation for the array type encountered, xarray dispatches to the
+corresponding subclass of :py:class:`~xarray.core.parallelcompat.ChunkManagerEntrypoint`,
+also known as a "Chunk Manager". Therefore **a full list of the operations that need to be defined is set by the
+API of the** :py:class:`~xarray.core.parallelcompat.ChunkManagerEntrypoint` **abstract base class**. Note that chunked array
+methods are also currently dispatched using this class.
+
+Chunked array creation is also handled by this class. As chunked array objects have a one-to-one correspondence with
+in-memory numpy arrays, it should be possible to create a chunked array from a numpy array by passing the desired
+chunking pattern to an implementation of :py:meth:`~xarray.core.parallelcompat.ChunkManagerEntrypoint.from_array`.
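+
+For example, chunking an in-memory xarray object dispatches to the ``from_array`` implementation of the
+registered chunkmanager. A minimal sketch, assuming dask is installed so that the ``"dask"`` chunkmanager
+described below is registered:
+
+.. code-block:: python
+
+    import numpy as np
+    import xarray as xr
+
+    da = xr.DataArray(np.arange(6), dims="x")
+    # .chunk calls DaskManager.from_array under the hood,
+    # wrapping the resulting dask array in a new xarray object
+    chunked = da.chunk({"x": 2}, chunked_array_type="dask")
+    print(type(chunked.data))  # <class 'dask.array.core.Array'>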
+
+.. note::
+
+    The :py:class:`~xarray.core.parallelcompat.ChunkManagerEntrypoint` abstract base class is mostly just acting as a
+    namespace for containing the chunk-aware function primitives. Ideally in the future we would have an API standard
+    for chunked array types which codified this structure, making the entrypoint system unnecessary.
+
+.. currentmodule:: xarray.core.parallelcompat
+
+.. autoclass:: xarray.core.parallelcompat.ChunkManagerEntrypoint
+   :members:
+
+Registering a new ChunkManagerEntrypoint subclass
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Rather than hard-coding various chunk managers to deal with specific chunked array implementations, xarray uses an
+entrypoint system to allow developers of new chunked array implementations to register their corresponding subclass of
+:py:class:`~xarray.core.parallelcompat.ChunkManagerEntrypoint`.
+
+
+To register a new entrypoint you need to add an entry to the ``setup.cfg`` like this::
+
+    [options.entry_points]
+    xarray.chunkmanagers =
+        dask = xarray.core.daskmanager:DaskManager
+
+See also `cubed-xarray <https://github.com/xarray-contrib/cubed-xarray>`_ for another example.
+
+To check that the entrypoint has worked correctly, you may find it useful to display the available chunkmanagers using
+the internal function :py:func:`~xarray.core.parallelcompat.list_chunkmanagers`.
+
+.. autofunction:: list_chunkmanagers
+
+
+User interface
+~~~~~~~~~~~~~~
+
+Once the chunkmanager subclass has been registered, xarray objects wrapping the desired array type can be created in three ways:
+
+#. By manually passing the array type to the :py:class:`~xarray.DataArray` constructor (see the examples for :ref:`numpy-like arrays <userguide.duckarrays>`),
+
+#. Calling :py:meth:`~xarray.DataArray.chunk`, passing the keyword arguments ``chunked_array_type`` and ``from_array_kwargs``,
+
+#. Calling :py:func:`~xarray.open_dataset`, passing the keyword arguments ``chunked_array_type`` and ``from_array_kwargs``.
+
+The latter two methods ultimately call the chunkmanager's implementation of ``.from_array``, to which they pass the ``from_array_kwargs`` dict.
+The ``chunked_array_type`` kwarg selects which registered chunkmanager subclass to dispatch to. It defaults to ``'dask'``
+if Dask is installed; otherwise, if exactly one other chunkmanager is registered, it defaults to that one.
+If multiple chunkmanagers are registered, it will raise an error by default.
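+
+For example, the latter two ways might look like the following. This is a sketch rather than a prescription:
+``"example.nc"`` is a placeholder filename, and ``inline_array`` is just one example of an argument accepted
+by :py:func:`dask.array.from_array`.
+
+.. code-block:: python
+
+    import xarray as xr
+
+    # open a dataset lazily, then chunk it
+    ds = xr.open_dataset("example.nc")
+    ds = ds.chunk(
+        {"x": 100}, chunked_array_type="dask", from_array_kwargs={"inline_array": True}
+    )
+
+    # or create the chunked arrays at opening time
+    ds = xr.open_dataset(
+        "example.nc",
+        chunks={"x": 100},
+        chunked_array_type="dask",
+        from_array_kwargs={"inline_array": True},
+    )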
+
+Parallel processing without chunks
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+To use a parallel array type that does not expose a concept of chunks explicitly, none of the information on this page
+is theoretically required. Such an array type (e.g. `Ramba <https://github.com/Python-for-HPC/ramba>`_ or
+`Arkouda <https://github.com/Bears-R-Us/arkouda>`_) could be wrapped using xarray's existing support for
+:ref:`numpy-like "duck" arrays <userguide.duckarrays>`.
diff --git a/doc/internals/duck-arrays-integration.rst b/doc/internals/duck-arrays-integration.rst
index 3b6313dbf2f..1f1f57974df 100644
--- a/doc/internals/duck-arrays-integration.rst
+++ b/doc/internals/duck-arrays-integration.rst
@@ -11,6 +11,8 @@ Integrating with duck arrays
 Xarray can wrap custom numpy-like arrays (":term:`duck array`\s") - see the :ref:`user guide documentation <userguide.duckarrays>`.
 This page is intended for developers who are interested in wrapping a new custom array type with xarray.
 
+.. _internals.duckarrays.requirements:
+
 Duck array requirements
 ~~~~~~~~~~~~~~~~~~~~~~~
diff --git a/doc/internals/index.rst b/doc/internals/index.rst
index 132f6c40ede..666f8fd2343 100644
--- a/doc/internals/index.rst
+++ b/doc/internals/index.rst
@@ -21,6 +21,7 @@ The pages in this section are intended for:
    variable-objects
    duck-arrays-integration
+   chunked-arrays
    extending-xarray
    zarr-encoding-spec
    how-to-add-new-backend
diff --git a/doc/user-guide/duckarrays.rst b/doc/user-guide/duckarrays.rst
index dc1d2d1cb8a..f0650ac61b5 100644
--- a/doc/user-guide/duckarrays.rst
+++ b/doc/user-guide/duckarrays.rst
@@ -27,7 +27,7 @@ Some numpy-like array types that xarray already has some support for:
    For information on wrapping dask arrays see :ref:`dask`. Whilst xarray wraps dask arrays in a similar way to
    that described on this page, chunked array types like :py:class:`dask.array.Array` implement additional methods that require
-   slightly different user code (e.g. calling ``.chunk`` or ``.compute``).
+   slightly different user code (e.g. calling ``.chunk`` or ``.compute``). See the docs on :ref:`wrapping chunked arrays <internals.chunkedarrays>`.
 
 Why "duck"?
 -----------
diff --git a/doc/whats-new.rst b/doc/whats-new.rst
index 1667fd2fb9c..be7f8ade57c 100644
--- a/doc/whats-new.rst
+++ b/doc/whats-new.rst
@@ -38,6 +38,8 @@ Bug fixes
 Documentation
 ~~~~~~~~~~~~~
 
+- Added page on wrapping chunked numpy-like arrays as alternatives to dask arrays.
+  (:pull:`7951`) By `Tom Nicholas <https://github.com/TomNicholas>`_.
 - Expanded the page on wrapping numpy-like "duck" arrays.
   (:pull:`7911`) By `Tom Nicholas <https://github.com/TomNicholas>`_.
 - Added examples to docstrings of :py:meth:`Dataset.isel`, :py:meth:`Dataset.reduce`, :py:meth:`Dataset.argmin`,
diff --git a/xarray/core/parallelcompat.py b/xarray/core/parallelcompat.py
index 4df4ff235c6..26efc5fc412 100644
--- a/xarray/core/parallelcompat.py
+++ b/xarray/core/parallelcompat.py
@@ -31,7 +31,13 @@
 @functools.lru_cache(maxsize=1)
 def list_chunkmanagers() -> dict[str, ChunkManagerEntrypoint]:
     """
-    Return a dictionary of available chunk managers and their ChunkManagerEntrypoint objects.
+    Return a dictionary of available chunk managers and their ChunkManagerEntrypoint subclass objects.
+
+    Returns
+    -------
+    chunkmanagers : dict
+        Dictionary whose keys are the strings under which the chunkmanagers are registered, and whose values
+        are the corresponding ChunkManagerEntrypoint subclass instances.
 
     Notes
     -----
@@ -143,7 +149,13 @@ def get_chunked_array_type(*args) -> ChunkManagerEntrypoint:
 
 class ChunkManagerEntrypoint(ABC, Generic[T_ChunkedArray]):
     """
-    Adapter between a particular parallel computing framework and xarray.
+    Interface between a particular parallel computing framework and xarray.
+
+    This abstract base class must be subclassed by libraries implementing chunked array types, and
+    registered via the ``xarray.chunkmanagers`` entrypoint.
+
+    Abstract methods on this class must be implemented, whereas non-abstract methods are only required in order to
+    enable a subset of xarray functionality, and by default will raise a ``NotImplementedError`` if called.
 
     Attributes
     ----------
@@ -151,7 +163,7 @@ class ChunkManagerEntrypoint(ABC, Generic[T_ChunkedArray]):
         Type of the array class this parallel computing framework provides.
 
         Parallel frameworks need to provide an array class that supports the array API standard.
-        Used for type checking.
+        This attribute is used for array instance type checking at runtime.
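+
+    Examples
+    --------
+    A minimal sketch of a new subclass; ``mylib`` and ``MyChunkedArray`` are hypothetical names used
+    purely for illustration::
+
+        class MyChunkManager(ChunkManagerEntrypoint):
+            def __init__(self) -> None:
+                # store the array class, used by is_chunked_array for type checking
+                from mylib import MyChunkedArray
+
+                self.array_cls = MyChunkedArray
+
+            # ... plus implementations of the abstract methods chunks,
+            # normalize_chunks, from_array, compute and apply_gufunc.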
""" array_cls: type[T_ChunkedArray] @@ -159,13 +171,51 @@ class ChunkManagerEntrypoint(ABC, Generic[T_ChunkedArray]): @abstractmethod def __init__(self) -> None: + """Used to set the array_cls attribute at import time.""" raise NotImplementedError() def is_chunked_array(self, data: Any) -> bool: + """ + Check if the given object is an instance of this type of chunked array. + + Compares against the type stored in the array_cls attribute by default. + + Parameters + ---------- + data : Any + + Returns + ------- + is_chunked : bool + + See Also + -------- + dask.is_dask_collection + """ return isinstance(data, self.array_cls) @abstractmethod def chunks(self, data: T_ChunkedArray) -> T_NormalizedChunks: + """ + Return the current chunks of the given array. + + Returns chunks explicitly as a tuple of tuple of ints. + + Used internally by xarray objects' .chunks and .chunksizes properties. + + Parameters + ---------- + data : chunked array + + Returns + ------- + chunks : tuple[tuple[int, ...], ...] + + See Also + -------- + dask.array.Array.chunks + cubed.Array.chunks + """ raise NotImplementedError() @abstractmethod @@ -177,14 +227,58 @@ def normalize_chunks( dtype: np.dtype | None = None, previous_chunks: T_NormalizedChunks | None = None, ) -> T_NormalizedChunks: - """Called by open_dataset""" + """ + Normalize given chunking pattern into an explicit tuple of tuples representation. + + Exposed primarily because different chunking backends may want to make different decisions about how to + automatically chunk along dimensions not given explicitly in the input chunks. + + Called internally by xarray.open_dataset. + + Parameters + ---------- + chunks : tuple, int, dict, or string + The chunks to be normalized. + shape : Tuple[int] + The shape of the array + limit : int (optional) + The maximum block size to target in bytes, + if freedom is given to choose + dtype : np.dtype + previous_chunks : Tuple[Tuple[int]], optional + Chunks from a previous array that we should use for inspiration when + rechunking dimensions automatically. + + See Also + -------- + dask.array.core.normalize_chunks + """ raise NotImplementedError() @abstractmethod def from_array( self, data: np.ndarray, chunks: T_Chunks, **kwargs ) -> T_ChunkedArray: - """Called when .chunk is called on an xarray object that is not already chunked.""" + """ + Create a chunked array from a non-chunked numpy-like array. + + Generally input should have a ``.shape``, ``.ndim``, ``.dtype`` and support numpy-style slicing. + + Called when the .chunk method is called on an xarray object that is not already chunked. + Also called within open_dataset (when chunks is not None) to create a chunked array from + an xarray lazily indexed array. + + Parameters + ---------- + data : array_like + chunks : int, tuple + How to chunk the array. + + See Also + -------- + dask.array.from_array + cubed.from_array + """ raise NotImplementedError() def rechunk( @@ -193,17 +287,71 @@ def rechunk( chunks: T_NormalizedChunks | tuple[int, ...] | T_Chunks, **kwargs, ) -> T_ChunkedArray: - """Called when .chunk is called on an xarray object that is already chunked.""" + """ + Changes the chunking pattern of the given array. + + Called when the .chunk method is called on an xarray object that is already chunked. + + Parameters + ---------- + data : dask array + Array to be rechunked. + chunks : int, tuple, dict or str, optional + The new block dimensions to create. -1 indicates the full size of the + corresponding dimension. 
Default is "auto" which automatically + determines chunk sizes. + + Returns + ------- + chunked array + + See Also + -------- + dask.array.Array.rechunk + cubed.Array.rechunk + """ return data.rechunk(chunks, **kwargs) # type: ignore[attr-defined] @abstractmethod - def compute(self, *data: T_ChunkedArray, **kwargs) -> tuple[np.ndarray, ...]: - """Used anytime something needs to computed, including multiple arrays at once.""" + def compute(self, *data: T_ChunkedArray | Any, **kwargs) -> tuple[np.ndarray, ...]: + """ + Computes one or more chunked arrays, returning them as eager numpy arrays. + + Called anytime something needs to computed, including multiple arrays at once. + Used by `.compute`, `.persist`, `.values`. + + Parameters + ---------- + *data : object + Any number of objects. If an object is an instance of the chunked array type, it is computed + and the in-memory result returned as a numpy array. All other types should be passed through unchanged. + + Returns + ------- + objs + The input, but with all chunked arrays now computed. + + See Also + -------- + dask.compute + cubed.compute + """ raise NotImplementedError() @property def array_api(self) -> Any: - """Return the array_api namespace following the python array API standard.""" + """ + Return the array_api namespace following the python array API standard. + + See https://data-apis.org/array-api/latest/ . Currently used to access the array API function + ``full_like``, which is called within the xarray constructors ``xarray.full_like``, ``xarray.ones_like``, + ``xarray.zeros_like``, etc. + + See Also + -------- + dask.array + cubed.array_api + """ raise NotImplementedError() def reduction( @@ -216,7 +364,43 @@ def reduction( dtype: np.dtype | None = None, keepdims: bool = False, ) -> T_ChunkedArray: - """Used in some reductions like nanfirst, which is used by groupby.first""" + """ + A general version of array reductions along one or more axes. + + Used inside some reductions like nanfirst, which is used by ``groupby.first``. + + Parameters + ---------- + arr : chunked array + Data to be reduced along one or more axes. + func : Callable(x_chunk, axis, keepdims) + First function to be executed when resolving the dask graph. + This function is applied in parallel to all original chunks of x. + See below for function parameters. + combine_func : Callable(x_chunk, axis, keepdims), optional + Function used for intermediate recursive aggregation (see + split_every below). If omitted, it defaults to aggregate_func. + aggregate_func : Callable(x_chunk, axis, keepdims) + Last function to be executed, producing the final output. It is always invoked, even when the reduced + Array counts a single chunk along the reduced axes. + axis : int or sequence of ints, optional + Axis or axes to aggregate upon. If omitted, aggregate along all axes. + dtype : np.dtype + data type of output. This argument was previously optional, but + leaving as ``None`` will now raise an exception. + keepdims : boolean, optional + Whether the reduction function should preserve the reduced axes, + leaving them at size ``output_size``, or remove them. + + Returns + ------- + chunked array + + See Also + -------- + dask.array.reduction + cubed.core.reduction + """ raise NotImplementedError() @abstractmethod @@ -232,7 +416,77 @@ def apply_gufunc( **kwargs, ): """ - Called inside xarray.apply_ufunc, so must be supplied for vast majority of xarray computations to be supported. + Apply a generalized ufunc or similar python function to arrays. 
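+
+        Examples
+        --------
+        A sketch of the expected behaviour, shown via the dask-backed manager purely for
+        illustration (``dask_arr`` stands for any dask array)::
+
+            manager = list_chunkmanagers()["dask"]
+            (result,) = manager.compute(dask_arr)
+            isinstance(result, np.ndarray)  # True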
+        """
         raise NotImplementedError()
 
     @property
     def array_api(self) -> Any:
-        """Return the array_api namespace following the python array API standard."""
+        """
+        Return the array_api namespace following the Python array API standard.
+
+        See https://data-apis.org/array-api/latest/ . Currently used to access the array API function
+        ``full_like``, which is called within the xarray creation functions ``xarray.full_like``, ``xarray.ones_like``,
+        ``xarray.zeros_like``, etc.
+
+        See Also
+        --------
+        dask.array
+        cubed.array_api
+        """
         raise NotImplementedError()
 
     def reduction(
         self,
@@ -216,7 +364,43 @@ def reduction(
         dtype: np.dtype | None = None,
         keepdims: bool = False,
     ) -> T_ChunkedArray:
-        """Used in some reductions like nanfirst, which is used by groupby.first"""
+        """
+        A general version of array reductions along one or more axes.
+
+        Used inside some reductions like ``nanfirst``, which is used by ``groupby.first``.
+
+        Parameters
+        ----------
+        arr : chunked array
+            Data to be reduced along one or more axes.
+        func : Callable(x_chunk, axis, keepdims)
+            First function to be executed when resolving the task graph.
+            This function is applied in parallel to all original chunks of x.
+            See below for function parameters.
+        combine_func : Callable(x_chunk, axis, keepdims), optional
+            Function used for intermediate recursive aggregation (see
+            split_every below). If omitted, it defaults to aggregate_func.
+        aggregate_func : Callable(x_chunk, axis, keepdims)
+            Last function to be executed, producing the final output. It is always invoked, even when the reduced
+            Array contains only a single chunk along the reduced axes.
+        axis : int or sequence of ints, optional
+            Axis or axes to aggregate upon. If omitted, aggregate along all axes.
+        dtype : np.dtype
+            Data type of output. This argument was previously optional, but
+            leaving as ``None`` will now raise an exception.
+        keepdims : boolean, optional
+            Whether the reduction function should preserve the reduced axes,
+            leaving them at size ``output_size``, or remove them.
+
+        Returns
+        -------
+        chunked array
+
+        See Also
+        --------
+        dask.array.reduction
+        cubed.core.reduction
+        """
         raise NotImplementedError()
 
     @abstractmethod
@@ -232,7 +416,77 @@ def apply_gufunc(
         **kwargs,
     ):
         """
-        Called inside xarray.apply_ufunc, so must be supplied for vast majority of xarray computations to be supported.
+        Apply a generalized ufunc or similar Python function to arrays.
+
+        ``signature`` determines if the function consumes or produces core
+        dimensions. The remaining dimensions in given input arrays (``*args``)
+        are considered loop dimensions and are required to broadcast
+        naturally against each other.
+
+        In other terms, this function is like ``np.vectorize``, but for
+        the blocks of chunked arrays. If the function itself shall also
+        be vectorized use ``vectorize=True`` for convenience.
+
+        Called inside ``xarray.apply_ufunc``, which is called internally for most xarray operations.
+        Therefore this method must be implemented for the vast majority of xarray computations to be supported.
+
+        Parameters
+        ----------
+        func : callable
+            Function to call like ``func(*args, **kwargs)`` on input arrays
+            (``*args``) that returns an array or tuple of arrays. If multiple
+            arguments with non-matching dimensions are supplied, this function is
+            expected to vectorize (broadcast) over axes of positional arguments in
+            the style of NumPy universal functions [1]_ (if this is not the case,
+            set ``vectorize=True``). If this function returns multiple outputs,
+            ``output_core_dims`` has to be set as well.
+        signature : string
+            Specifies what core dimensions are consumed and produced by ``func``,
+            according to the specification of numpy.gufunc signature [2]_.
+        *args : numeric
+            Input arrays or scalars to the callable function.
+        axes : list of tuples, optional, keyword only
+            A list of tuples with indices of axes a generalized ufunc should operate on.
+            For instance, for a signature of ``"(i,j),(j,k)->(i,k)"`` appropriate for
+            matrix multiplication, the base elements are two-dimensional matrices
+            and these are taken to be stored in the two last axes of each argument. The
+            corresponding axes keyword would be ``[(-2, -1), (-2, -1), (-2, -1)]``.
+            For simplicity, for generalized ufuncs that operate on 1-dimensional arrays
+            (vectors), a single integer is accepted instead of a single-element tuple,
+            and for generalized ufuncs for which all outputs are scalars, the output
+            tuples can be omitted.
+        keepdims : bool, optional, keyword only
+            If this is set to True, axes which are reduced over will be left in the result as
+            a dimension with size one, so that the result will broadcast correctly against the
+            inputs. This option can only be used for generalized ufuncs that operate on inputs
+            that all have the same number of core dimensions and with outputs that have no core
+            dimensions, i.e., with signatures like ``"(i),(i)->()"`` or ``"(m,m)->()"``.
+            If used, the location of the dimensions in the output can be controlled with axes
+            and axis.
+        output_dtypes : dtype or list of dtypes, optional, keyword only
+            Valid numpy dtype specification or list thereof.
+            If not given, a call of ``func`` with a small set of data
+            is performed in order to try to automatically determine the
+            output dtypes.
+        vectorize : bool, keyword only
+            If set to ``True``, ``np.vectorize`` is applied to ``func`` for
+            convenience. Defaults to ``False``.
+        **kwargs : dict
+            Extra keyword arguments to pass to ``func``.
+
+        Returns
+        -------
+        Single chunked array or tuple of chunked arrays
+
+        See Also
+        --------
+        dask.array.gufunc.apply_gufunc
+        cubed.apply_gufunc
+
+        References
+        ----------
+        .. [1] https://docs.scipy.org/doc/numpy/reference/ufuncs.html
+        .. [2] https://docs.scipy.org/doc/numpy/reference/c-api/generalized-ufuncs.html
+        """
         raise NotImplementedError()
 
     def map_blocks(
         self,
@@ -246,7 +500,42 @@ def map_blocks(
         new_axis: int | Sequence[int] | None = None,
         **kwargs,
     ):
-        """Called in elementwise operations, but notably not called in xarray.map_blocks."""
+        """
+        Map a function across all blocks of a chunked array.
+
+        Called in elementwise operations, but notably not (currently) called within xarray.map_blocks.
+
+        Parameters
+        ----------
+        func : callable
+            Function to apply to every block in the array.
+            If ``func`` accepts ``block_info=`` or ``block_id=``
+            as keyword arguments, these will be passed dictionaries
+            containing information about input and output chunks/arrays
+            during computation. See :py:func:`dask.array.map_blocks` for details.
+        args : chunked arrays or other objects
+        dtype : np.dtype, optional
+            The ``dtype`` of the output array. It is recommended to provide this.
+            If not provided, will be inferred by applying the function to a small
+            set of fake data.
+        chunks : tuple, optional
+            Chunk shape of resulting blocks if the function does not preserve
+            shape. If not provided, the resulting array is assumed to have the same
+            block structure as the first input array.
+        drop_axis : number or iterable, optional
+            Dimensions lost by the function.
+        new_axis : number or iterable, optional
+            New dimensions created by the function. Note that these are applied
+            after ``drop_axis`` (if present).
+        **kwargs :
+            Other keyword arguments to pass to function. Values must be constants
+            (not chunked arrays).
+
+        See Also
+        --------
+        dask.array.map_blocks
+        cubed.map_blocks
+        """
         raise NotImplementedError()
 
     def blockwise(
         self,
@@ -259,7 +548,45 @@ def blockwise(
         align_arrays: bool = True,
         **kwargs,
     ):
-        """Called by some niche functions in xarray."""
+        """
+        Tensor operation: Generalized inner and outer products.
+
+        A broad class of blocked algorithms and patterns can be specified with a
+        concise multi-index notation. The ``blockwise`` function applies an in-memory
+        function across multiple blocks of multiple inputs in a variety of ways.
+        Many chunked array operations are special cases of blockwise, including
+        elementwise operations, broadcasting, reductions, tensordot, and transpose.
+
+        Currently only called explicitly in xarray when performing multidimensional interpolation.
+
+        Parameters
+        ----------
+        func : callable
+            Function to apply to individual tuples of blocks.
+        out_ind : iterable
+            Block pattern of the output, something like 'ijk' or (1, 2, 3).
+        *args : sequence of Array, index pairs
+            You may also pass literal arguments, accompanied by None index,
+            e.g. (x, 'ij', y, 'jk', z, 'i', some_literal, None).
+        **kwargs : dict
+            Extra keyword arguments to pass to function.
+        adjust_chunks : dict
+            Dictionary mapping index to function to be applied to chunk sizes.
+        new_axes : dict, keyword only
+            New indexes and their dimension lengths.
+        align_arrays : bool
+            Whether or not to align chunks along equally sized dimensions when
+            multiple arrays are provided. This allows for larger chunks in some
+            arrays to be broken into smaller ones that match chunk sizes in other
+            arrays such that they are compatible for block function mapping. If
+            this is False, then an error will be thrown if arrays do not already
+            have the same number of blocks in each dimension.
+
+        See Also
+        --------
+        dask.array.blockwise
+        cubed.core.blockwise
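+
+        Examples
+        --------
+        A transpose expressed with blockwise, shown via the dask-backed manager purely for
+        illustration (``arr`` stands for any 2D dask array)::
+
+            manager = list_chunkmanagers()["dask"]
+            transposed = manager.blockwise(np.transpose, "ji", arr, "ij", dtype=arr.dtype)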
+        """
         raise NotImplementedError()
 
     def unify_chunks(
         self,
@@ -267,7 +594,21 @@ def unify_chunks(
         *args: Any,  # can't type this as mypy assumes args are all same type, but dask unify_chunks args alternate types
         **kwargs,
     ) -> tuple[dict[str, T_NormalizedChunks], list[T_ChunkedArray]]:
-        """Called by xr.unify_chunks."""
+        """
+        Unify chunks across a sequence of arrays.
+
+        Called by xarray.unify_chunks.
+
+        Parameters
+        ----------
+        *args : sequence of Array, index pairs
+            Sequence like ``(x, 'ij', y, 'jk', z, 'i')``.
+
+        See Also
+        --------
+        dask.array.core.unify_chunks
+        cubed.core.unify_chunks
+        """
         raise NotImplementedError()
 
     def store(
         self,
         sources: T_ChunkedArray | Sequence[T_ChunkedArray],
@@ -276,5 +617,29 @@ def store(
         targets: Any,
         **kwargs: dict[str, Any],
     ):
-        """Used when writing to any backend."""
+        """
+        Store chunked arrays in array-like objects, overwriting data in the target.
+
+        This stores chunked arrays into objects that support numpy-style setitem
+        indexing (e.g. a Zarr Store). Allows storing values chunk by chunk so that it does not have to
+        fill up memory. For best performance you likely want to align the block size of
+        the storage target with the block size of your array.
+
+        Used when writing to any registered xarray I/O backend.
+
+        Parameters
+        ----------
+        sources : Array or collection of Arrays
+        targets : array-like or collection of array-likes
+            These should support setitem syntax ``target[10:20] = ...``.
+            If sources is a single item, targets must be a single item; if sources is a
+            collection of arrays, targets must be a matching collection.
+        kwargs :
+            Parameters passed to compute/persist (only used if compute=True).
+
+        See Also
+        --------
+        dask.array.store
+        cubed.store
+        """
         raise NotImplementedError()