Skip to content

Commit 62e180b

Browse files
authored
Merge pull request #1 from JMorado/sharding
Sharding
2 parents 1fd1dcd + 8c9b185 commit 62e180b

File tree

3 files changed

+56
-4
lines changed

3 files changed

+56
-4
lines changed

xarray/backends/api.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from xarray import backends, conventions
1414
from xarray.backends import plugins
1515
from xarray.backends.common import AbstractDataStore, ArrayWriter, _normalize_path
16-
from xarray.backends.locks import _get_scheduler
16+
from xarray.backends.locks import _get_scheduler, get_write_lock
1717
from xarray.core import indexing
1818
from xarray.core.combine import (
1919
_infer_concat_order_from_positions,
@@ -1650,7 +1650,11 @@ def to_zarr(
16501650
"mode='r+'. To allow writing new variables, set mode='a'."
16511651
)
16521652

1653-
writer = ArrayWriter()
1653+
if any(["storage_transformers" in encoding[var] for var in encoding]):
1654+
writer = ArrayWriter(lock=get_write_lock("ZARR_SHARDING_LOCK"))
1655+
else:
1656+
writer = ArrayWriter()
1657+
16541658
# TODO: figure out how to properly handle unlimited_dims
16551659
dump_to_store(dataset, zstore, writer, encoding=encoding)
16561660
writes = writer.sync(compute=compute)

xarray/backends/locks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
# Neither HDF5 nor the netCDF-C library are thread-safe.
1818
HDF5_LOCK = SerializableLock()
1919
NETCDFC_LOCK = SerializableLock()
20-
20+
ZARR_SHARDING_LOCK = SerializableLock()
2121

2222
_FILE_LOCKS: MutableMapping[Any, threading.Lock] = weakref.WeakValueDictionary()
2323

xarray/tests/test_backends.py

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,27 @@
109109
KVStore = None
110110

111111
have_zarr_v3 = False
112+
have_sharding_v3 = False
112113
try:
113-
# as of Zarr v2.13 these imports require environment variable
114+
# as of Zarr v2.14 these imports require environment variable
114115
# ZARR_V3_EXPERIMENTAL_API=1
115116
from zarr import DirectoryStoreV3, KVStoreV3
116117

117118
have_zarr_v3 = True
119+
120+
from zarr._storage.v3_storage_transformers import v3_sharding_available
121+
122+
# TODO: change to try except ImportError when available at the top-level zarr namespace
123+
if v3_sharding_available:
124+
# as of Zarr v2.14 these imports require environment variable
125+
# ZARR_V3_SHARDING=1
126+
# TODO: change import to
127+
# from zarr import ShardingStorageTransformer
128+
# when ShardingStorageTransformer becomes available at the top-level zarr namespace
129+
from zarr._storage.v3_storage_transformers import ShardingStorageTransformer
130+
131+
have_sharding_v3 = True
132+
118133
except ImportError:
119134
KVStoreV3 = None
120135

@@ -2660,6 +2675,39 @@ def create_zarr_target(self):
26602675
yield tmp
26612676

26622677

2678+
@pytest.mark.skipif(not have_zarr_v3, reason="requires zarr version 3")
2679+
class TestZarrStorageTransformersV3(TestZarrDirectoryStoreV3):
2680+
@pytest.mark.skipif(not have_sharding_v3, reason="requires sharding")
2681+
def test_sharding_storage_transformer(self):
2682+
original = create_test_data().chunk({"dim1": 2, "dim2": 3, "dim3": 2})
2683+
2684+
encoding = {
2685+
"var1": {
2686+
"storage_transformers": [
2687+
ShardingStorageTransformer("indexed", chunks_per_shard=(2, 1))
2688+
],
2689+
},
2690+
"var2": {
2691+
"storage_transformers": [
2692+
ShardingStorageTransformer("indexed", chunks_per_shard=(2, 2))
2693+
],
2694+
},
2695+
"var3": {
2696+
"storage_transformers": [
2697+
ShardingStorageTransformer("indexed", chunks_per_shard=(1, 1))
2698+
],
2699+
},
2700+
}
2701+
2702+
with self.roundtrip(
2703+
original, save_kwargs={"encoding": encoding}, open_kwargs={"chunks": {}}
2704+
) as ds1:
2705+
assert_identical(ds1, original)
2706+
2707+
with self.roundtrip_append(original, open_kwargs={"chunks": {}}) as ds2:
2708+
assert_identical(ds2, original)
2709+
2710+
26632711
@requires_zarr
26642712
@requires_fsspec
26652713
def test_zarr_storage_options() -> None:

0 commit comments

Comments
 (0)