Skip to content

cftime 1.5.0 changes behaviour upon pickling: breaks get_clean_interp_index with a dask distributed scheduler. #5551

Open
@aulemahal

Description

@aulemahal

What happened:

Quite a specific bug! Using map_blocks to wrap a polyfit computation, with a dask client (not the local scheduler) and a time axis with a cftime calendar, I got the error: TypeError: cannot compute the time difference between dates with different calendars.

What you expected to happen:

No bug.

Minimal Complete Verifiable Example:

# NOTE(review): assumes `import xarray as xr` and a dask.distributed `Client`
# import were executed beforehand; the report does not show them.
# Open the 'rasm' tutorial dataset, chunked in 25x25 pieces along x and y.
ds = xr.tutorial.open_dataset('rasm').chunk({'x': 25, 'y': 25})

# Tair serves as the template passed to map_blocks below.
templ = ds.Tair

def func(ds, verbose=False):
    # Dummy function that calls get_clean_interp_index on the dataset it receives.
    # Returns Tair as-is, just for the test.
    if verbose:
        # Show the time coordinate and the class of its first element,
        # as seen from inside the function.
        print(ds.time)
        print(type(ds.time[0].item()))
    # The result is unused: the call itself is what is being exercised.
    x = xr.core.missing.get_clean_interp_index(ds, 'time')

    return ds.Tair

# This works (time is a coordinate, so it is already loaded).
x = xr.core.missing.get_clean_interp_index(ds, 'time')

# This works too: the local scheduler is used.
out = ds.map_blocks(
    func,
    template=templ,
    kwargs={'verbose': False}
)
out.load()

# This fails! Here a distributed Client (one worker, 8 threads) runs the
# computation instead of the local scheduler.
with Client(n_workers=1, threads_per_worker=8, dashboard_address=8786, memory_limit='7GB') as c:
    out = ds.map_blocks(
        func,
        template=templ,
        kwargs={'verbose': True}
    )
    out.load()

The full traceback is here:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-12-de89288ffcd5> in <module>
     27         kwargs={'verbose': True}
     28     )
---> 29     out.load()

/exec/pbourg/.conda/x38/lib/python3.9/site-packages/xarray/core/dataarray.py in load(self, **kwargs)
    883         dask.compute
    884         """
--> 885         ds = self._to_temp_dataset().load(**kwargs)
    886         new = self._from_temp_dataset(ds)
    887         self._variable = new._variable

/exec/pbourg/.conda/x38/lib/python3.9/site-packages/xarray/core/dataset.py in load(self, **kwargs)
    848 
    849             # evaluate all the dask arrays simultaneously
--> 850             evaluated_data = da.compute(*lazy_data.values(), **kwargs)
    851 
    852             for k, data in zip(lazy_data, evaluated_data):

/exec/pbourg/.conda/x38/lib/python3.9/site-packages/dask/base.py in compute(*args, **kwargs)
    565         postcomputes.append(x.__dask_postcompute__())
    566 
--> 567     results = schedule(dsk, keys, **kwargs)
    568     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    569 

/exec/pbourg/.conda/x38/lib/python3.9/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2705                     should_rejoin = False
   2706             try:
-> 2707                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2708             finally:
   2709                 for f in futures.values():

/exec/pbourg/.conda/x38/lib/python3.9/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   2019             else:
   2020                 local_worker = None
-> 2021             return self.sync(
   2022                 self._gather,
   2023                 futures,

/exec/pbourg/.conda/x38/lib/python3.9/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    860             return future
    861         else:
--> 862             return sync(
    863                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    864             )

/exec/pbourg/.conda/x38/lib/python3.9/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    336     if error[0]:
    337         typ, exc, tb = error[0]
--> 338         raise exc.with_traceback(tb)
    339     else:
    340         return result[0]

/exec/pbourg/.conda/x38/lib/python3.9/site-packages/distributed/utils.py in f()
    319             if callback_timeout is not None:
    320                 future = asyncio.wait_for(future, callback_timeout)
--> 321             result[0] = yield future
    322         except Exception:
    323             error[0] = sys.exc_info()

/exec/pbourg/.conda/x38/lib/python3.9/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

/exec/pbourg/.conda/x38/lib/python3.9/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1884                             exc = CancelledError(key)
   1885                         else:
-> 1886                             raise exception.with_traceback(traceback)
   1887                         raise exc
   1888                     if errors == "skip":

/exec/pbourg/.conda/x38/lib/python3.9/site-packages/xarray/core/parallel.py in _wrapper()
    284         ]
    285 
--> 286         result = func(*converted_args, **kwargs)
    287 
    288         # check all dims are present

<ipython-input-12-de89288ffcd5> in func()
      8         print(ds.time)
      9         print(type(ds.time[0].item()))
---> 10     x = xr.core.missing.get_clean_interp_index(ds, 'time')
     11 
     12     return ds.Tair

/exec/pbourg/.conda/x38/lib/python3.9/site-packages/xarray/core/missing.py in get_clean_interp_index()
    276             index = index.values
    277         index = Variable(
--> 278             data=datetime_to_numeric(index, offset=offset, datetime_unit="ns"),
    279             dims=(dim,),
    280         )

/exec/pbourg/.conda/x38/lib/python3.9/site-packages/xarray/core/duck_array_ops.py in datetime_to_numeric()
    462     # For np.datetime64, this can silently yield garbage due to overflow.
    463     # One option is to enforce 1970-01-01 as the universal offset.
--> 464     array = array - offset
    465 
    466     # Scalar is converted to 0d-array

src/cftime/_cftime.pyx in cftime._cftime.datetime.__sub__()

TypeError: cannot compute the time difference between dates with different calendars

Here is the printout to the console. I am calling this in a Jupyter notebook, so the prints from within the workers appear in the console, not in the cell's output. I removed unneeded lines to shorten it.

<xarray.DataArray 'time' (time: 36)>
array([cftime.datetime(1980, 9, 16, 12, 0, 0, 0, calendar='noleap', has_year_zero=True),
       cftime.datetime(1980, 10, 17, 0, 0, 0, 0, calendar='noleap', has_year_zero=True),
       cftime.datetime(1980, 11, 16, 12, 0, 0, 0, calendar='noleap', has_year_zero=True),
         ....
)],
      dtype=object)
Coordinates:
  * time     (time) object 1980-09-16 12:00:00 ... 1983-08-17 00:00:00
Attributes:
    long_name:       time
    type_preferred:  int

<class 'cftime._cftime.datetime'>

And for reference:

>>> ds.time
array([cftime.DatetimeNoLeap(1980, 9, 16, 12, 0, 0, 0, has_year_zero=True),
       cftime.DatetimeNoLeap(1980, 10, 17, 0, 0, 0, 0, has_year_zero=True),
       cftime.DatetimeNoLeap(1980, 11, 16, 12, 0, 0, 0, has_year_zero=True),
        ...
>>> type(ds.time[0].item())
cftime._cftime.DatetimeNoLeap

Anything else we need to know?:

I'm not sure where the exact breaking change lies (dask or cftime?), but this worked with dask 2021.5 and cftime <= 1.4.1. The problem lies in get_clean_interp_index, specifically these lines:

offset = type(index[0])(1970, 1, 1)
if isinstance(index, CFTimeIndex):
    index = index.values
index = Variable(
    data=datetime_to_numeric(index, offset=offset, datetime_unit="ns"),
    dims=(dim,),
)

On the original dataset, the class of the time values is DatetimeNoLeap, whereas the time coordinates received by func are of class datetime, where the calendar is only a keyword argument. Thus, in get_clean_interp_index, the offset is created with the default "standard" calendar and becomes incompatible with the array itself, which makes datetime_to_numeric fail.

Environment:

Output of xr.show_versions()

INSTALLED VERSIONS
------------------
commit: None
python: 3.9.5 | packaged by conda-forge | (default, Jun 19 2021, 00:32:32) [GCC 9.3.0]
python-bits: 64
OS: Linux
OS-release: 3.10.0-514.2.2.el7.x86_64
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: en_CA.UTF-8
LOCALE: ('en_CA', 'UTF-8')
libhdf5: 1.10.6
libnetcdf: 4.7.4

xarray: 0.18.2
pandas: 1.2.5
numpy: 1.21.0
scipy: 1.7.0
netCDF4: 1.5.6
pydap: None
h5netcdf: None
h5py: None
Nio: None
zarr: 2.8.3
cftime: 1.5.0
nc_time_axis: None
PseudoNetCDF: None
rasterio: None
cfgrib: None
iris: None
bottleneck: 1.3.2
dask: 2021.06.2
distributed: 2021.06.2
matplotlib: 3.4.2
cartopy: 0.19.0
seaborn: None
numbagg: None
pint: 0.17
setuptools: 49.6.0.post20210108
pip: 21.1.3
conda: None
pytest: None
IPython: 7.25.0
sphinx: None

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions