diff --git a/xarray/core/common.py b/xarray/core/common.py
index 936a6bb2489..28ffc80e4e7 100644
--- a/xarray/core/common.py
+++ b/xarray/core/common.py
@@ -1067,7 +1067,8 @@ def _resample(
         # TODO support non-string indexer after removing the old API.
 
         from xarray.core.dataarray import DataArray
-        from xarray.core.groupby import ResolvedGrouper, TimeResampler
+        from xarray.core.groupby import ResolvedGrouper
+        from xarray.core.groupers import TimeResampler
         from xarray.core.resample import RESAMPLE_DIM
 
         # note: the second argument (now 'skipna') use to be 'dim'
diff --git a/xarray/core/dataarray.py b/xarray/core/dataarray.py
index 16b9330345b..d3390d26655 100644
--- a/xarray/core/dataarray.py
+++ b/xarray/core/dataarray.py
@@ -6751,9 +6751,9 @@ def groupby(
         from xarray.core.groupby import (
             DataArrayGroupBy,
             ResolvedGrouper,
-            UniqueGrouper,
             _validate_groupby_squeeze,
         )
+        from xarray.core.groupers import UniqueGrouper
 
         _validate_groupby_squeeze(squeeze)
         rgrouper = ResolvedGrouper(UniqueGrouper(), group, self)
@@ -6833,11 +6833,11 @@ def groupby_bins(
         .. [1] http://pandas.pydata.org/pandas-docs/stable/generated/pandas.cut.html
         """
         from xarray.core.groupby import (
-            BinGrouper,
             DataArrayGroupBy,
             ResolvedGrouper,
             _validate_groupby_squeeze,
         )
+        from xarray.core.groupers import BinGrouper
 
         _validate_groupby_squeeze(squeeze)
         grouper = BinGrouper(
diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py
index 872cb482fe8..0923c5d4822 100644
--- a/xarray/core/dataset.py
+++ b/xarray/core/dataset.py
@@ -10301,9 +10301,9 @@ def groupby(
         from xarray.core.groupby import (
             DatasetGroupBy,
             ResolvedGrouper,
-            UniqueGrouper,
             _validate_groupby_squeeze,
         )
+        from xarray.core.groupers import UniqueGrouper
 
         _validate_groupby_squeeze(squeeze)
         rgrouper = ResolvedGrouper(UniqueGrouper(), group, self)
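The mechanical changes above all have the same shape: the concrete grouper classes move out of `xarray.core.groupby` into a new `xarray.core.groupers` module, and call sites update their imports. A sketch of the resulting import surface (names as introduced by this diff; user-facing behavior is unchanged):

```python
# Where the grouping machinery lives after this refactor: ResolvedGrouper
# and the GroupBy classes stay in groupby.py, while the concrete Grouper
# implementations move to the new groupers.py module.
from xarray.core.groupby import ResolvedGrouper
from xarray.core.groupers import (
    BinGrouper,
    EncodedGroups,
    Grouper,
    TimeResampler,
    UniqueGrouper,
)
```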
@@ -10384,11 +10384,11 @@ def groupby_bins(
         .. [1] http://pandas.pydata.org/pandas-docs/stable/generated/pandas.cut.html
         """
         from xarray.core.groupby import (
-            BinGrouper,
             DatasetGroupBy,
             ResolvedGrouper,
             _validate_groupby_squeeze,
         )
+        from xarray.core.groupers import BinGrouper
 
         _validate_groupby_squeeze(squeeze)
         grouper = BinGrouper(
diff --git a/xarray/core/groupby.py b/xarray/core/groupby.py
index 5966c32df92..eceb4e62199 100644
--- a/xarray/core/groupby.py
+++ b/xarray/core/groupby.py
@@ -1,9 +1,7 @@
 from __future__ import annotations
 
 import copy
-import datetime
 import warnings
-from abc import ABC, abstractmethod
 from collections.abc import Hashable, Iterator, Mapping, Sequence
 from dataclasses import dataclass, field
 from typing import TYPE_CHECKING, Any, Callable, Generic, Literal, Union
@@ -12,7 +10,6 @@
 import pandas as pd
 from packaging.version import Version
 
-from xarray.coding.cftime_offsets import _new_to_legacy_freq
 from xarray.core import dtypes, duck_array_ops, nputils, ops
 from xarray.core._aggregations import (
     DataArrayGroupByAggregations,
@@ -26,7 +23,6 @@
 from xarray.core.indexes import (
     create_default_index_implicit,
     filter_indexes_from_coords,
-    safe_cast_to_index,
 )
 from xarray.core.options import OPTIONS, _get_keep_attrs
 from xarray.core.types import (
@@ -55,14 +51,10 @@
     from xarray.core.dataarray import DataArray
     from xarray.core.dataset import Dataset
-    from xarray.core.resample_cftime import CFTimeGrouper
-    from xarray.core.types import DatetimeLike, SideOptions
+    from xarray.core.groupers import Grouper
+    from xarray.core.types import GroupIndex, GroupKey, T_GroupIndices
     from xarray.core.utils import Frozen
 
-    GroupKey = Any
-    GroupIndex = Union[int, slice, list[int]]
-    T_GroupIndices = list[GroupIndex]
-
 
 def check_reduce_dims(reduce_dims, dimensions):
     if reduce_dims is not ...:
@@ -79,6 +71,8 @@ def check_reduce_dims(reduce_dims, dimensions):
 def _maybe_squeeze_indices(
     indices, squeeze: bool | None, grouper: ResolvedGrouper, warn: bool
 ):
+    from xarray.core.groupers import UniqueGrouper
+
     is_unique_grouper = isinstance(grouper.grouper, UniqueGrouper)
     can_squeeze = is_unique_grouper and grouper.grouper.can_squeeze
     if squeeze in [None, True] and can_squeeze:
@@ -95,32 +89,6 @@ def _maybe_squeeze_indices(
     return indices
 
 
-def unique_value_groups(
-    ar, sort: bool = True
-) -> tuple[np.ndarray | pd.Index, np.ndarray]:
-    """Group an array by its unique values.
-
-    Parameters
-    ----------
-    ar : array-like
-        Input array. This will be flattened if it is not already 1-D.
-    sort : bool, default: True
-        Whether or not to sort unique values.
-
-    Returns
-    -------
-    values : np.ndarray
-        Sorted, unique values as returned by `np.unique`.
-    indices : list of lists of int
-        Each element provides the integer indices in `ar` with values given by
-        the corresponding value in `unique_values`.
-    """
-    inverse, values = pd.factorize(ar, sort=sort)
-    if isinstance(values, pd.MultiIndex):
-        values.names = ar.names
-    return values, inverse
-
-
 def _codes_to_group_indices(inverse: np.ndarray, N: int) -> T_GroupIndices:
     assert inverse.ndim == 1
     groups: T_GroupIndices = [[] for _ in range(N)]
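A note on the `_maybe_squeeze_indices` change above: the `UniqueGrouper` import is deferred into the function body because the new `groupers.py` itself imports `T_Group` and `_DummyGroup` from `groupby.py` (see the new file below), so a module-level import in the other direction would be circular. A minimal sketch of the pattern, with a hypothetical helper name:

```python
# Sketch of the deferred-import pattern used above: importing groupers at
# call time, not module import time, breaks the groupby <-> groupers cycle.
def _uses_unique_grouper(grouper) -> bool:
    from xarray.core.groupers import UniqueGrouper  # deferred to avoid a cycle

    return isinstance(grouper, UniqueGrouper)
```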
@@ -308,96 +276,6 @@ def _ensure_1d(group: T_Group, obj: T_DataWithCoords) -> tuple[
     )
 
 
-def _apply_loffset(
-    loffset: str | pd.DateOffset | datetime.timedelta | pd.Timedelta,
-    result: pd.Series | pd.DataFrame,
-):
-    """
-    (copied from pandas)
-    if loffset is set, offset the result index
-
-    This is NOT an idempotent routine, it will be applied
-    exactly once to the result.
-
-    Parameters
-    ----------
-    result : Series or DataFrame
-        the result of resample
-    """
-    # pd.Timedelta is a subclass of datetime.timedelta so we do not need to
-    # include it in instance checks.
-    if not isinstance(loffset, (str, pd.DateOffset, datetime.timedelta)):
-        raise ValueError(
-            f"`loffset` must be a str, pd.DateOffset, datetime.timedelta, or pandas.Timedelta object. "
-            f"Got {loffset}."
-        )
-
-    if isinstance(loffset, str):
-        loffset = pd.tseries.frequencies.to_offset(loffset)
-
-    needs_offset = (
-        isinstance(loffset, (pd.DateOffset, datetime.timedelta))
-        and isinstance(result.index, pd.DatetimeIndex)
-        and len(result.index) > 0
-    )
-
-    if needs_offset:
-        result.index = result.index + loffset
-
-
-class Grouper(ABC):
-    """Base class for Grouper objects that allow specializing GroupBy instructions."""
-
-    @property
-    def can_squeeze(self) -> bool:
-        """TODO: delete this when the `squeeze` kwarg is deprecated. Only `UniqueGrouper`
-        should override it."""
-        return False
-
-    @abstractmethod
-    def factorize(self, group) -> EncodedGroups:
-        """
-        Takes the group, and creates intermediates necessary for GroupBy.
-        These intermediates are
-        1. codes - Same shape as `group` containing a unique integer code for each group.
-        2. group_indices - Indexes that let us index out the members of each group.
-        3. unique_coord - Unique groups present in the dataset.
-        4. full_index - Unique groups in the output. This differs from `unique_coord` in the
-        case of resampling and binning, where certain groups in the output are not present in
-        the input.
-        """
-        pass
-
-
-class Resampler(Grouper):
-    """Base class for Grouper objects that allow specializing resampling-type GroupBy instructions.
-    Currently only used for TimeResampler, but could be used for SpaceResampler in the future.
-    """
-
-    pass
-@dataclass
-class EncodedGroups:
-    """
-    Dataclass for storing intermediate values for GroupBy operation.
-    Returned by factorize method on Grouper objects.
-
-    Parameters
-    ----------
-    codes: integer codes for each group
-    full_index: pandas Index for the group coordinate
-    group_indices: optional, List of indices of array elements belonging
-       to each group. Inferred if not provided.
-    unique_coord: Unique group values present in dataset. Inferred if not provided
-    """
-
-    codes: DataArray
-    full_index: pd.Index
-    group_indices: T_GroupIndices | None = field(default=None)
-    unique_coord: IndexVariable | _DummyGroup | None = field(default=None)
-
-
 @dataclass
 class ResolvedGrouper(Generic[T_DataWithCoords]):
     """
@@ -481,248 +359,12 @@ def factorize(self) -> None:
         if encoded.unique_coord is None:
             unique_values = self.full_index[np.unique(encoded.codes)]
             self.unique_coord = IndexVariable(
-                self.group.name, unique_values, attrs=self.group.attrs
+                self.codes.name, unique_values, attrs=self.group.attrs
             )
         else:
             self.unique_coord = encoded.unique_coord
 
 
-@dataclass
-class UniqueGrouper(Grouper):
-    """Grouper object for grouping by a categorical variable."""
-
-    _group_as_index: pd.Index | None = None
-
-    @property
-    def is_unique_and_monotonic(self) -> bool:
-        if isinstance(self.group, _DummyGroup):
-            return True
-        index = self.group_as_index
-        return index.is_unique and index.is_monotonic_increasing
-
-    @property
-    def group_as_index(self) -> pd.Index:
-        if self._group_as_index is None:
-            self._group_as_index = self.group.to_index()
-        return self._group_as_index
-
-    @property
-    def can_squeeze(self) -> bool:
-        is_dimension = self.group.dims == (self.group.name,)
-        return is_dimension and self.is_unique_and_monotonic
-
-    def factorize(self, group1d) -> EncodedGroups:
-        self.group = group1d
-
-        if self.can_squeeze:
-            return self._factorize_dummy()
-        else:
-            return self._factorize_unique()
-
-    def _factorize_unique(self) -> EncodedGroups:
-        # look through group to find the unique values
-        sort = not isinstance(self.group_as_index, pd.MultiIndex)
-        unique_values, codes_ = unique_value_groups(self.group_as_index, sort=sort)
-        if (codes_ == -1).all():
-            raise ValueError(
-                "Failed to group data. Are you grouping by a variable that is all NaN?"
-            )
-        codes = self.group.copy(data=codes_)
-        unique_coord = IndexVariable(
-            self.group.name, unique_values, attrs=self.group.attrs
-        )
-        full_index = unique_coord
-
-        return EncodedGroups(
-            codes=codes, full_index=full_index, unique_coord=unique_coord
-        )
-
-    def _factorize_dummy(self) -> EncodedGroups:
-        size = self.group.size
-        # no need to factorize
-        # use slices to do views instead of fancy indexing
-        # equivalent to: group_indices = group_indices.reshape(-1, 1)
-        group_indices: T_GroupIndices = [slice(i, i + 1) for i in range(size)]
-        size_range = np.arange(size)
-        if isinstance(self.group, _DummyGroup):
-            codes = self.group.to_dataarray().copy(data=size_range)
-        else:
-            codes = self.group.copy(data=size_range)
-        unique_coord = self.group
-        full_index = IndexVariable(
-            self.group.name, unique_coord.values, self.group.attrs
-        )
-        return EncodedGroups(
-            codes=codes,
-            group_indices=group_indices,
-            full_index=full_index,
-            unique_coord=unique_coord,
-        )
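The in-place fix above in `ResolvedGrouper.factorize` is that an inferred `unique_coord` now takes its name from `codes` rather than from the raw group. For `BinGrouper` the two differ, because binning renames the output coordinate; an illustration through the public API (data made up):

```python
# Illustrative: groupby_bins names its output coordinate f"{name}_bins",
# so an inferred unique_coord must use the codes' name, not the group's.
import numpy as np
import xarray as xr

da = xr.DataArray(np.arange(4.0), dims="x", coords={"x": [1, 2, 3, 4]})
binned = da.groupby_bins("x", bins=[0, 2, 4]).mean()
print(binned.dims)  # ('x_bins',) -- named after codes, not the group name 'x'
```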
-@dataclass
-class BinGrouper(Grouper):
-    """Grouper object for binning numeric data."""
-
-    bins: Any  # TODO: What is the typing?
-    cut_kwargs: Mapping = field(default_factory=dict)
-    binned: Any = None
-    name: Any = None
-
-    def __post_init__(self) -> None:
-        if duck_array_ops.isnull(self.bins).all():
-            raise ValueError("All bin edges are NaN.")
-
-    def factorize(self, group) -> EncodedGroups:
-        from xarray.core.dataarray import DataArray
-
-        data = group.data
-
-        binned, self.bins = pd.cut(data, self.bins, **self.cut_kwargs, retbins=True)
-
-        binned_codes = binned.codes
-        if (binned_codes == -1).all():
-            raise ValueError(
-                f"None of the data falls within bins with edges {self.bins!r}"
-            )
-
-        new_dim_name = f"{group.name}_bins"
-
-        full_index = binned.categories
-        uniques = np.sort(pd.unique(binned_codes))
-        unique_values = full_index[uniques[uniques != -1]]
-
-        codes = DataArray(
-            binned_codes, getattr(group, "coords", None), name=new_dim_name
-        )
-        unique_coord = IndexVariable(new_dim_name, pd.Index(unique_values), group.attrs)
-        return EncodedGroups(
-            codes=codes, full_index=full_index, unique_coord=unique_coord
-        )
-@dataclass
-class TimeResampler(Resampler):
-    """Grouper object specialized to resampling the time coordinate."""
-
-    freq: str
-    closed: SideOptions | None = field(default=None)
-    label: SideOptions | None = field(default=None)
-    origin: str | DatetimeLike = field(default="start_day")
-    offset: pd.Timedelta | datetime.timedelta | str | None = field(default=None)
-    loffset: datetime.timedelta | str | None = field(default=None)
-    base: int | None = field(default=None)
-
-    index_grouper: CFTimeGrouper | pd.Grouper = field(init=False)
-    group_as_index: pd.Index = field(init=False)
-
-    def __post_init__(self):
-        if self.loffset is not None:
-            emit_user_level_warning(
-                "Following pandas, the `loffset` parameter to resample is deprecated. "
-                "Switch to updating the resampled dataset time coordinate using "
-                "time offset arithmetic. For example:\n"
-                "    >>> offset = pd.tseries.frequencies.to_offset(freq) / 2\n"
-                '    >>> resampled_ds["time"] = resampled_ds.get_index("time") + offset',
-                FutureWarning,
-            )
-
-        if self.base is not None:
-            emit_user_level_warning(
-                "Following pandas, the `base` parameter to resample will be deprecated in "
-                "a future version of xarray. Switch to using `origin` or `offset` instead.",
-                FutureWarning,
-            )
-
-        if self.base is not None and self.offset is not None:
-            raise ValueError("base and offset cannot be present at the same time")
-
-    def _init_properties(self, group: T_Group) -> None:
-        from xarray import CFTimeIndex
-        from xarray.core.pdcompat import _convert_base_to_offset
-
-        group_as_index = safe_cast_to_index(group)
-
-        if self.base is not None:
-            # grouper constructor verifies that grouper.offset is None at this point
-            offset = _convert_base_to_offset(self.base, self.freq, group_as_index)
-        else:
-            offset = self.offset
-
-        if not group_as_index.is_monotonic_increasing:
-            # TODO: sort instead of raising an error
-            raise ValueError("index must be monotonic for resampling")
-
-        if isinstance(group_as_index, CFTimeIndex):
-            from xarray.core.resample_cftime import CFTimeGrouper
-
-            index_grouper = CFTimeGrouper(
-                freq=self.freq,
-                closed=self.closed,
-                label=self.label,
-                origin=self.origin,
-                offset=offset,
-                loffset=self.loffset,
-            )
-        else:
-            index_grouper = pd.Grouper(
-                # TODO remove once requiring pandas >= 2.2
-                freq=_new_to_legacy_freq(self.freq),
-                closed=self.closed,
-                label=self.label,
-                origin=self.origin,
-                offset=offset,
-            )
-        self.index_grouper = index_grouper
-        self.group_as_index = group_as_index
-
-    def _get_index_and_items(self) -> tuple[pd.Index, pd.Series, np.ndarray]:
-        first_items, codes = self.first_items()
-        full_index = first_items.index
-        if first_items.isnull().any():
-            first_items = first_items.dropna()
-
-        full_index = full_index.rename("__resample_dim__")
-        return full_index, first_items, codes
-
-    def first_items(self) -> tuple[pd.Series, np.ndarray]:
-        from xarray import CFTimeIndex
-
-        if isinstance(self.group_as_index, CFTimeIndex):
-            return self.index_grouper.first_items(self.group_as_index)
-        else:
-            s = pd.Series(np.arange(self.group_as_index.size), self.group_as_index)
-            grouped = s.groupby(self.index_grouper)
-            first_items = grouped.first()
-            counts = grouped.count()
-            # This way we generate codes for the final output index: full_index.
-            # So for _flox_reduce we avoid one reindex and copy by avoiding
-            # _maybe_restore_empty_groups
-            codes = np.repeat(np.arange(len(first_items)), counts)
-            if self.loffset is not None:
-                _apply_loffset(self.loffset, first_items)
-            return first_items, codes
-
-    def factorize(self, group) -> EncodedGroups:
-        self._init_properties(group)
-        full_index, first_items, codes_ = self._get_index_and_items()
-        sbins = first_items.values.astype(np.int64)
-        group_indices: T_GroupIndices = [
-            slice(i, j) for i, j in zip(sbins[:-1], sbins[1:])
-        ]
-        group_indices += [slice(sbins[-1], None)]
-
-        unique_coord = IndexVariable(group.name, first_items.index, group.attrs)
-        codes = group.copy(data=codes_)
-
-        return EncodedGroups(
-            codes=codes,
-            group_indices=group_indices,
-            full_index=full_index,
-            unique_coord=unique_coord,
-        )
-
-
 def _validate_groupby_squeeze(squeeze: bool | None) -> None:
     # While we don't generally check the type of every arg, passing
     # multiple dimensions as multiple arguments is common enough, and the
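For context on `_maybe_restore_empty_groups`, patched in the next hunk: resampling and binning can define output groups that receive no data, and the full index (including empty groups) is restored after a reduction. An illustration through the public API (dates made up):

```python
# Illustrative: resampling can create bins with no samples; the full daily
# index, including the empty bins, is restored after the reduction.
import pandas as pd
import xarray as xr

time = pd.to_datetime(["2000-01-01", "2000-01-02", "2000-01-05"])
da = xr.DataArray([1.0, 2.0, 3.0], coords={"time": time}, dims="time")
print(da.resample(time="1D").mean())  # 2000-01-03 and 2000-01-04 are NaN
```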
""" + from xarray.core.groupers import BinGrouper, TimeResampler + (grouper,) = self.groupers if ( isinstance(grouper.grouper, (BinGrouper, TimeResampler)) @@ -1118,6 +762,7 @@ def _flox_reduce( from flox.xarray import xarray_reduce from xarray.core.dataset import Dataset + from xarray.core.groupers import BinGrouper obj = self._original_obj (grouper,) = self.groupers @@ -1513,14 +1158,8 @@ def _restore_dim_order(self, stacked: DataArray) -> DataArray: (grouper,) = self.groupers group = grouper.group1d - groupby_coord = ( - f"{group.name}_bins" - if isinstance(grouper.grouper, BinGrouper) - else group.name - ) - def lookup_order(dimension): - if dimension == groupby_coord: + if dimension == grouper.name: (dimension,) = group.dims if dimension in self._obj.dims: axis = self._obj.get_axis_num(dimension) diff --git a/xarray/core/groupers.py b/xarray/core/groupers.py new file mode 100644 index 00000000000..e33cd3ad99f --- /dev/null +++ b/xarray/core/groupers.py @@ -0,0 +1,392 @@ +""" +This module provides Grouper objects that encapsulate the +"factorization" process - conversion of value we are grouping by +to integer codes (one per group). +""" + +from __future__ import annotations + +import datetime +from abc import ABC, abstractmethod +from collections.abc import Mapping, Sequence +from dataclasses import dataclass, field +from typing import Any + +import numpy as np +import pandas as pd + +from xarray.coding.cftime_offsets import _new_to_legacy_freq +from xarray.core import duck_array_ops +from xarray.core.dataarray import DataArray +from xarray.core.groupby import T_Group, _DummyGroup +from xarray.core.indexes import safe_cast_to_index +from xarray.core.resample_cftime import CFTimeGrouper +from xarray.core.types import DatetimeLike, SideOptions, T_GroupIndices +from xarray.core.utils import emit_user_level_warning +from xarray.core.variable import IndexVariable + +__all__ = [ + "EncodedGroups", + "Grouper", + "Resampler", + "UniqueGrouper", + "BinGrouper", + "TimeResampler", +] + +RESAMPLE_DIM = "__resample_dim__" + + +@dataclass +class EncodedGroups: + """ + Dataclass for storing intermediate values for GroupBy operation. + Returned by factorize method on Grouper objects. + + Parameters + ---------- + codes: integer codes for each group + full_index: pandas Index for the group coordinate + group_indices: optional, List of indices of array elements belonging + to each group. Inferred if not provided. + unique_coord: Unique group values present in dataset. Inferred if not provided + """ + + codes: DataArray + full_index: pd.Index + group_indices: T_GroupIndices | None = field(default=None) + unique_coord: IndexVariable | _DummyGroup | None = field(default=None) + + +class Grouper(ABC): + """Base class for Grouper objects that allow specializing GroupBy instructions.""" + + @property + def can_squeeze(self) -> bool: + """TODO: delete this when the `squeeze` kwarg is deprecated. Only `UniqueGrouper` + should override it.""" + return False + + @abstractmethod + def factorize(self, group) -> EncodedGroups: + """ + Takes the group, and creates intermediates necessary for GroupBy. + These intermediates are + 1. codes - Same shape as `group` containing a unique integer code for each group. + 2. group_indices - Indexes that let us index out the members of each group. + 3. unique_coord - Unique groups present in the dataset. + 4. full_index - Unique groups in the output. 
+
+
+class Grouper(ABC):
+    """Base class for Grouper objects that allow specializing GroupBy instructions."""
+
+    @property
+    def can_squeeze(self) -> bool:
+        """TODO: delete this when the `squeeze` kwarg is deprecated. Only `UniqueGrouper`
+        should override it."""
+        return False
+
+    @abstractmethod
+    def factorize(self, group) -> EncodedGroups:
+        """
+        Takes the group, and creates intermediates necessary for GroupBy.
+        These intermediates are
+        1. codes - Same shape as `group` containing a unique integer code for each group.
+        2. group_indices - Indexes that let us index out the members of each group.
+        3. unique_coord - Unique groups present in the dataset.
+        4. full_index - Unique groups in the output. This differs from `unique_coord` in the
+        case of resampling and binning, where certain groups in the output are not present in
+        the input.
+
+        Returns an instance of EncodedGroups.
+        """
+        pass
+
+
+class Resampler(Grouper):
+    """Base class for Grouper objects that allow specializing resampling-type GroupBy instructions.
+    Currently only used for TimeResampler, but could be used for SpaceResampler in the future.
+    """
+
+    pass
+
+
+@dataclass
+class UniqueGrouper(Grouper):
+    """Grouper object for grouping by a categorical variable."""
+
+    _group_as_index: pd.Index | None = None
+
+    @property
+    def is_unique_and_monotonic(self) -> bool:
+        if isinstance(self.group, _DummyGroup):
+            return True
+        index = self.group_as_index
+        return index.is_unique and index.is_monotonic_increasing
+
+    @property
+    def group_as_index(self) -> pd.Index:
+        if self._group_as_index is None:
+            self._group_as_index = self.group.to_index()
+        return self._group_as_index
+
+    @property
+    def can_squeeze(self) -> bool:
+        """This is a deprecated method and will be removed eventually."""
+        is_dimension = self.group.dims == (self.group.name,)
+        return is_dimension and self.is_unique_and_monotonic
+
+    def factorize(self, group1d) -> EncodedGroups:
+        self.group = group1d
+
+        if self.can_squeeze:
+            return self._factorize_dummy()
+        else:
+            return self._factorize_unique()
+
+    def _factorize_unique(self) -> EncodedGroups:
+        # look through group to find the unique values
+        sort = not isinstance(self.group_as_index, pd.MultiIndex)
+        unique_values, codes_ = unique_value_groups(self.group_as_index, sort=sort)
+        if (codes_ == -1).all():
+            raise ValueError(
+                "Failed to group data. Are you grouping by a variable that is all NaN?"
+            )
+        codes = self.group.copy(data=codes_)
+        unique_coord = IndexVariable(
+            self.group.name, unique_values, attrs=self.group.attrs
+        )
+        full_index = unique_coord
+
+        return EncodedGroups(
+            codes=codes, full_index=full_index, unique_coord=unique_coord
+        )
+
+    def _factorize_dummy(self) -> EncodedGroups:
+        size = self.group.size
+        # no need to factorize
+        # use slices to do views instead of fancy indexing
+        # equivalent to: group_indices = group_indices.reshape(-1, 1)
+        group_indices: T_GroupIndices = [slice(i, i + 1) for i in range(size)]
+        size_range = np.arange(size)
+        if isinstance(self.group, _DummyGroup):
+            codes = self.group.to_dataarray().copy(data=size_range)
+        else:
+            codes = self.group.copy(data=size_range)
+        unique_coord = self.group
+        full_index = IndexVariable(
+            self.group.name, unique_coord.values, self.group.attrs
+        )
+        return EncodedGroups(
+            codes=codes,
+            group_indices=group_indices,
+            full_index=full_index,
+            unique_coord=unique_coord,
+        )
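Through the public API, `UniqueGrouper` is what backs a plain `groupby` call; one observable consequence of `_factorize_unique` above is that output groups come out sorted (except when grouping by a MultiIndex). A quick illustrative check (data made up):

```python
# Illustrative: UniqueGrouper sorts the unique values it finds, so the
# reduced output's group coordinate is ordered regardless of input order.
import numpy as np
import xarray as xr

da = xr.DataArray(np.arange(6), dims="x", coords={"letters": ("x", list("bbaacc"))})
print(da.groupby("letters").sum()["letters"].values)  # ['a' 'b' 'c']
```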
+
+
+@dataclass
+class BinGrouper(Grouper):
+    """Grouper object for binning numeric data."""
+
+    bins: int | Sequence | pd.IntervalIndex
+    cut_kwargs: Mapping = field(default_factory=dict)
+    binned: Any = None
+    name: Any = None
+
+    def __post_init__(self) -> None:
+        if duck_array_ops.isnull(self.bins).all():
+            raise ValueError("All bin edges are NaN.")
+
+    def factorize(self, group) -> EncodedGroups:
+        from xarray.core.dataarray import DataArray
+
+        data = group.data
+
+        binned, self.bins = pd.cut(data, self.bins, **self.cut_kwargs, retbins=True)
+
+        binned_codes = binned.codes
+        if (binned_codes == -1).all():
+            raise ValueError(
+                f"None of the data falls within bins with edges {self.bins!r}"
+            )
+
+        new_dim_name = f"{group.name}_bins"
+
+        full_index = binned.categories
+        uniques = np.sort(pd.unique(binned_codes))
+        unique_values = full_index[uniques[uniques != -1]]
+
+        codes = DataArray(
+            binned_codes, getattr(group, "coords", None), name=new_dim_name
+        )
+        unique_coord = IndexVariable(new_dim_name, pd.Index(unique_values), group.attrs)
+        return EncodedGroups(
+            codes=codes, full_index=full_index, unique_coord=unique_coord
+        )
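`BinGrouper.factorize` above delegates the heavy lifting to `pandas.cut`; a small sketch of the intermediates it works with (bin edges illustrative):

```python
# Illustrative: pd.cut yields per-element interval codes (-1 for values
# outside all bins) and an IntervalIndex of categories (the full_index).
import numpy as np
import pandas as pd

data = np.array([0.5, 1.5, 3.5])
binned, bins = pd.cut(data, [0, 1, 2, 4], retbins=True)
print(binned.codes)       # [0 1 2]
print(binned.categories)  # IntervalIndex([(0, 1], (1, 2], (2, 4]], ...)
```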
+
+
+@dataclass
+class TimeResampler(Resampler):
+    """Grouper object specialized to resampling the time coordinate."""
+
+    freq: str
+    closed: SideOptions | None = field(default=None)
+    label: SideOptions | None = field(default=None)
+    origin: str | DatetimeLike = field(default="start_day")
+    offset: pd.Timedelta | datetime.timedelta | str | None = field(default=None)
+    loffset: datetime.timedelta | str | None = field(default=None)
+    base: int | None = field(default=None)
+
+    index_grouper: CFTimeGrouper | pd.Grouper = field(init=False)
+    group_as_index: pd.Index = field(init=False)
+
+    def __post_init__(self):
+        if self.loffset is not None:
+            emit_user_level_warning(
+                "Following pandas, the `loffset` parameter to resample is deprecated. "
+                "Switch to updating the resampled dataset time coordinate using "
+                "time offset arithmetic. For example:\n"
+                "    >>> offset = pd.tseries.frequencies.to_offset(freq) / 2\n"
+                '    >>> resampled_ds["time"] = resampled_ds.get_index("time") + offset',
+                FutureWarning,
+            )
+
+        if self.base is not None:
+            emit_user_level_warning(
+                "Following pandas, the `base` parameter to resample will be deprecated in "
+                "a future version of xarray. Switch to using `origin` or `offset` instead.",
+                FutureWarning,
+            )
+
+        if self.base is not None and self.offset is not None:
+            raise ValueError("base and offset cannot be present at the same time")
+
+    def _init_properties(self, group: T_Group) -> None:
+        from xarray import CFTimeIndex
+        from xarray.core.pdcompat import _convert_base_to_offset
+
+        group_as_index = safe_cast_to_index(group)
+
+        if self.base is not None:
+            # grouper constructor verifies that grouper.offset is None at this point
+            offset = _convert_base_to_offset(self.base, self.freq, group_as_index)
+        else:
+            offset = self.offset
+
+        if not group_as_index.is_monotonic_increasing:
+            # TODO: sort instead of raising an error
+            raise ValueError("index must be monotonic for resampling")
+
+        if isinstance(group_as_index, CFTimeIndex):
+            from xarray.core.resample_cftime import CFTimeGrouper
+
+            index_grouper = CFTimeGrouper(
+                freq=self.freq,
+                closed=self.closed,
+                label=self.label,
+                origin=self.origin,
+                offset=offset,
+                loffset=self.loffset,
+            )
+        else:
+            index_grouper = pd.Grouper(
+                # TODO remove once requiring pandas >= 2.2
+                freq=_new_to_legacy_freq(self.freq),
+                closed=self.closed,
+                label=self.label,
+                origin=self.origin,
+                offset=offset,
+            )
+        self.index_grouper = index_grouper
+        self.group_as_index = group_as_index
+
+    def _get_index_and_items(self) -> tuple[pd.Index, pd.Series, np.ndarray]:
+        first_items, codes = self.first_items()
+        full_index = first_items.index
+        if first_items.isnull().any():
+            first_items = first_items.dropna()
+
+        full_index = full_index.rename("__resample_dim__")
+        return full_index, first_items, codes
+
+    def first_items(self) -> tuple[pd.Series, np.ndarray]:
+        from xarray import CFTimeIndex
+
+        if isinstance(self.group_as_index, CFTimeIndex):
+            return self.index_grouper.first_items(self.group_as_index)
+        else:
+            s = pd.Series(np.arange(self.group_as_index.size), self.group_as_index)
+            grouped = s.groupby(self.index_grouper)
+            first_items = grouped.first()
+            counts = grouped.count()
+            # This way we generate codes for the final output index: full_index.
+            # So for _flox_reduce we avoid one reindex and copy by avoiding
+            # _maybe_restore_empty_groups
+            codes = np.repeat(np.arange(len(first_items)), counts)
+            if self.loffset is not None:
+                _apply_loffset(self.loffset, first_items)
+            return first_items, codes
+
+    def factorize(self, group) -> EncodedGroups:
+        self._init_properties(group)
+        full_index, first_items, codes_ = self._get_index_and_items()
+        sbins = first_items.values.astype(np.int64)
+        group_indices: T_GroupIndices = [
+            slice(i, j) for i, j in zip(sbins[:-1], sbins[1:])
+        ]
+        group_indices += [slice(sbins[-1], None)]
+
+        unique_coord = IndexVariable(group.name, first_items.index, group.attrs)
+        codes = group.copy(data=codes_)
+
+        return EncodedGroups(
+            codes=codes,
+            group_indices=group_indices,
+            full_index=full_index,
+            unique_coord=unique_coord,
+        )
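The codes construction in `first_items` above can be seen in isolation: group a series of integer positions by the `pd.Grouper`, then repeat each output bin's label by its count, giving one code per input element aligned with `full_index`. A hedged sketch with pandas directly (dates illustrative):

```python
# Sketch of the codes trick: positions grouped by a pd.Grouper, with each
# bin's label repeated by that bin's element count.
import numpy as np
import pandas as pd

index = pd.date_range("2000-01-01", periods=5, freq="D")
s = pd.Series(np.arange(index.size), index)
grouped = s.groupby(pd.Grouper(freq="2D"))
counts = grouped.count()
codes = np.repeat(np.arange(len(counts)), counts)
print(codes)  # [0 0 1 1 2]
```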
+ """ + inverse, values = pd.factorize(ar, sort=sort) + if isinstance(values, pd.MultiIndex): + values.names = ar.names + return values, inverse diff --git a/xarray/core/resample.py b/xarray/core/resample.py index 9181bb4ee9b..ceab0a891c9 100644 --- a/xarray/core/resample.py +++ b/xarray/core/resample.py @@ -15,7 +15,7 @@ from xarray.core.dataarray import DataArray from xarray.core.dataset import Dataset -RESAMPLE_DIM = "__resample_dim__" +from xarray.core.groupers import RESAMPLE_DIM class Resample(GroupBy[T_Xarray]): diff --git a/xarray/core/types.py b/xarray/core/types.py index 41078d29c0e..7477963aadf 100644 --- a/xarray/core/types.py +++ b/xarray/core/types.py @@ -84,14 +84,20 @@ # anything with a dtype attribute _SupportsDType[np.dtype[Any]], ] - try: - from cftime import datetime as CFTimeDatetime - except ImportError: - CFTimeDatetime = Any - DatetimeLike = Union[pd.Timestamp, datetime.datetime, np.datetime64, CFTimeDatetime] + else: DTypeLikeSave: Any = None +# https://mypy.readthedocs.io/en/stable/common_issues.html#variables-vs-type-aliases +try: + from cftime import datetime as CFTimeDatetime +except ImportError: + CFTimeDatetime = np.datetime64 + +DatetimeLike: TypeAlias = Union[ + pd.Timestamp, datetime.datetime, np.datetime64, CFTimeDatetime +] + class Alignable(Protocol): """Represents any Xarray type that supports alignment. @@ -283,3 +289,7 @@ def copy( NetcdfWriteModes = Literal["w", "a"] ZarrWriteModes = Literal["w", "w-", "a", "a-", "r+", "r"] + +GroupKey = Any +GroupIndex = Union[int, slice, list[int]] +T_GroupIndices = list[GroupIndex]