Skip to content

Default to RemoteStore for fsspec URIs #2198

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Sep 19, 2024
Merged
61 changes: 49 additions & 12 deletions src/zarr/api/asynchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ async def open(
zarr_version: ZarrFormat | None = None, # deprecated
zarr_format: ZarrFormat | None = None,
path: str | None = None,
storage_options: dict[str, Any] | None = None,
**kwargs: Any, # TODO: type kwargs as valid args to open_array
) -> AsyncArray | AsyncGroup:
"""Convenience function to open a group or array using file-mode-like semantics.
Expand All @@ -211,6 +212,9 @@ async def open(
The zarr format to use when saving.
path : str or None, optional
The path within the store to open.
storage_options : dict
If using an fsspec URL to create the store, these will be passed to
the backend implementation. Ignored otherwise.
**kwargs
Additional parameters are passed through to :func:`zarr.creation.open_array` or
:func:`zarr.hierarchy.open_group`.
Expand All @@ -221,7 +225,7 @@ async def open(
Return type depends on what exists in the given store.
"""
zarr_format = _handle_zarr_version_or_format(zarr_version=zarr_version, zarr_format=zarr_format)
store_path = await make_store_path(store, mode=mode)
store_path = await make_store_path(store, mode=mode, storage_options=storage_options)

if path is not None:
store_path = store_path / path
Expand Down Expand Up @@ -276,6 +280,7 @@ async def save_array(
zarr_version: ZarrFormat | None = None, # deprecated
zarr_format: ZarrFormat | None = None,
path: str | None = None,
storage_options: dict[str, Any] | None = None,
**kwargs: Any, # TODO: type kwargs as valid args to create
) -> None:
"""Convenience function to save a NumPy array to the local file system, following a
Expand All @@ -291,6 +296,9 @@ async def save_array(
The zarr format to use when saving.
path : str or None, optional
The path within the store where the array will be saved.
storage_options : dict
If using an fsspec URL to create the store, these will be passed to
the backend implementation. Ignored otherwise.
kwargs
Passed through to :func:`create`, e.g., compressor.
"""
Expand All @@ -299,7 +307,7 @@ async def save_array(
or _default_zarr_version()
)

store_path = await make_store_path(store, mode="w")
store_path = await make_store_path(store, mode="w", storage_options=storage_options)
if path is not None:
store_path = store_path / path
new = await AsyncArray.create(
Expand All @@ -319,6 +327,7 @@ async def save_group(
zarr_version: ZarrFormat | None = None, # deprecated
zarr_format: ZarrFormat | None = None,
path: str | None = None,
storage_options: dict[str, Any] | None = None,
**kwargs: NDArrayLike,
) -> None:
"""Convenience function to save several NumPy arrays to the local file system, following a
Expand All @@ -334,22 +343,40 @@ async def save_group(
The zarr format to use when saving.
path : str or None, optional
Path within the store where the group will be saved.
storage_options : dict
If using an fsspec URL to create the store, these will be passed to
the backend implementation. Ignored otherwise.
kwargs
NumPy arrays with data to save.
"""
zarr_format = (
_handle_zarr_version_or_format(zarr_version=zarr_version, zarr_format=zarr_format)
_handle_zarr_version_or_format(
zarr_version=zarr_version,
zarr_format=zarr_format,
)
or _default_zarr_version()
)

if len(args) == 0 and len(kwargs) == 0:
raise ValueError("at least one array must be provided")
aws = []
for i, arr in enumerate(args):
aws.append(save_array(store, arr, zarr_format=zarr_format, path=f"{path}/arr_{i}"))
aws.append(
save_array(
store,
arr,
zarr_format=zarr_format,
path=f"{path}/arr_{i}",
storage_options=storage_options,
)
)
for k, arr in kwargs.items():
_path = f"{path}/{k}" if path is not None else k
aws.append(save_array(store, arr, zarr_format=zarr_format, path=_path))
aws.append(
save_array(
store, arr, zarr_format=zarr_format, path=_path, storage_options=storage_options
)
)
await asyncio.gather(*aws)


Expand Down Expand Up @@ -418,6 +445,7 @@ async def group(
zarr_format: ZarrFormat | None = None,
meta_array: Any | None = None, # not used
attributes: dict[str, JSON] | None = None,
storage_options: dict[str, Any] | None = None,
) -> AsyncGroup:
"""Create a group.

Expand All @@ -444,6 +472,9 @@ async def group(
to users. Use `numpy.empty(())` by default.
zarr_format : {2, 3, None}, optional
The zarr format to use when saving.
storage_options : dict
If using an fsspec URL to create the store, these will be passed to
the backend implementation. Ignored otherwise.

Returns
-------
Expand All @@ -453,7 +484,7 @@ async def group(

zarr_format = _handle_zarr_version_or_format(zarr_version=zarr_version, zarr_format=zarr_format)

store_path = await make_store_path(store)
store_path = await make_store_path(store, storage_options=storage_options)
if path is not None:
store_path = store_path / path

Expand Down Expand Up @@ -488,7 +519,7 @@ async def open_group(
synchronizer: Any = None, # not used
path: str | None = None,
chunk_store: StoreLike | None = None, # not used
storage_options: dict[str, Any] | None = None, # not used
storage_options: dict[str, Any] | None = None,
zarr_version: ZarrFormat | None = None, # deprecated
zarr_format: ZarrFormat | None = None,
meta_array: Any | None = None, # not used
Expand Down Expand Up @@ -548,10 +579,8 @@ async def open_group(
warnings.warn("meta_array is not yet implemented", RuntimeWarning, stacklevel=2)
if chunk_store is not None:
warnings.warn("chunk_store is not yet implemented", RuntimeWarning, stacklevel=2)
if storage_options is not None:
warnings.warn("storage_options is not yet implemented", RuntimeWarning, stacklevel=2)

store_path = await make_store_path(store, mode=mode)
store_path = await make_store_path(store, mode=mode, storage_options=storage_options)
if path is not None:
store_path = store_path / path

Expand Down Expand Up @@ -603,6 +632,7 @@ async def create(
) = None,
codecs: Iterable[Codec | dict[str, JSON]] | None = None,
dimension_names: Iterable[str] | None = None,
storage_options: dict[str, Any] | None = None,
**kwargs: Any,
) -> AsyncArray:
"""Create an array.
Expand Down Expand Up @@ -674,6 +704,9 @@ async def create(
to users. Use `numpy.empty(())` by default.

.. versionadded:: 2.13
storage_options : dict
If using an fsspec URL to create the store, these will be passed to
the backend implementation. Ignored otherwise.

Returns
-------
Expand Down Expand Up @@ -725,7 +758,7 @@ async def create(
warnings.warn("meta_array is not yet implemented", RuntimeWarning, stacklevel=2)

mode = kwargs.pop("mode", cast(AccessModeLiteral, "r" if read_only else "w"))
store_path = await make_store_path(store, mode=mode)
store_path = await make_store_path(store, mode=mode, storage_options=storage_options)
if path is not None:
store_path = store_path / path

Expand Down Expand Up @@ -875,6 +908,7 @@ async def open_array(
zarr_version: ZarrFormat | None = None, # deprecated
zarr_format: ZarrFormat | None = None,
path: PathLike | None = None,
storage_options: dict[str, Any] | None = None,
**kwargs: Any, # TODO: type kwargs as valid args to save
) -> AsyncArray:
"""Open an array using file-mode-like semantics.
Expand All @@ -887,6 +921,9 @@ async def open_array(
The zarr format to use when saving.
path : string, optional
Path in store to array.
storage_options : dict
If using an fsspec URL to create the store, these will be passed to
the backend implementation. Ignored otherwise.
**kwargs
Any keyword arguments to pass to the array constructor.

Expand All @@ -896,7 +933,7 @@ async def open_array(
The opened array.
"""

store_path = await make_store_path(store)
store_path = await make_store_path(store, storage_options=storage_options)
if path is not None:
store_path = store_path / path

Expand Down
2 changes: 2 additions & 0 deletions src/zarr/api/synchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ def save_group(
zarr_version: ZarrFormat | None = None, # deprecated
zarr_format: ZarrFormat | None = None,
path: str | None = None,
storage_options: dict[str, Any] | None = None,
**kwargs: NDArrayLike,
) -> None:
return sync(
Expand All @@ -143,6 +144,7 @@ def save_group(
zarr_version=zarr_version,
zarr_format=zarr_format,
path=path,
storage_options=storage_options,
**kwargs,
)
)
Expand Down
41 changes: 33 additions & 8 deletions src/zarr/store/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal

import fsspec
import fsspec.implementations

from zarr.abc.store import AccessMode, Store
from zarr.core.buffer import Buffer, default_buffer_prototype
from zarr.core.common import ZARR_JSON, ZARRAY_JSON, ZGROUP_JSON, ZarrFormat
from zarr.errors import ContainsArrayAndGroupError, ContainsArrayError, ContainsGroupError
from zarr.store.local import LocalStore
from zarr.store.memory import MemoryStore

# from zarr.store.remote import RemoteStore

if TYPE_CHECKING:
from zarr.core.buffer import BufferPrototype
from zarr.core.common import AccessModeLiteral
Expand Down Expand Up @@ -75,30 +80,50 @@ def __eq__(self, other: Any) -> bool:


async def make_store_path(
store_like: StoreLike | None, *, mode: AccessModeLiteral | None = None
store_like: StoreLike | None,
*,
path: str | None = None,
mode: AccessModeLiteral | None = None,
storage_options: dict[str, Any] | None = None,
) -> StorePath:
from zarr.store.remote import RemoteStore # circular import

if isinstance(store_like, StorePath):
if mode is not None:
assert AccessMode.from_literal(mode) == store_like.store.mode
return store_like
result = store_like
elif isinstance(store_like, Store):
if mode is not None:
assert AccessMode.from_literal(mode) == store_like.mode
await store_like._ensure_open()
return StorePath(store_like)
result = StorePath(store_like)
elif store_like is None:
if mode is None:
mode = "w" # exception to the default mode = 'r'
return StorePath(await MemoryStore.open(mode=mode))
result = StorePath(await MemoryStore.open(mode=mode))
elif isinstance(store_like, Path):
return StorePath(await LocalStore.open(root=store_like, mode=mode or "r"))
result = StorePath(await LocalStore.open(root=store_like, mode=mode or "r"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can pass **storage_options to LocalStore as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could be missing something, but I don't think that'll work. LocalStore.open will call LocalStore.__init__, which just takes root and mode, which are passed as regular args here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you mean. I was thinking auto_mkdir would be passed through but if that's not the case, let's not get distracted here.

elif isinstance(store_like, str):
return StorePath(await LocalStore.open(root=Path(store_like), mode=mode or "r"))
try:
fs, path = fsspec.url_to_fs(store_like, **storage_options)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: this constructs the actual filesystem class instance, but we have to go back to a URL (and reconstruct the filesystem instance) for the RemoteStore interface. It'd be nice to avoid that.

except Exception:
# not sure what to do here, but I don't want this to fail...
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is bad, but in my experience fsspec.url_to_fs can run pretty much arbitrary code, depending on what packages you have installed and what URI you pass. I really don't want this to fail and cause issues for people who just want a local path. maybe we'd always get an fsspec LocalFileSystem for a plain, non-fsspec URI string, but I'm not sure.

pass
else:
if "file" not in fs.protocol:
storage_options = storage_options or {}
return StorePath(RemoteStore(url=store_like, mode=mode or "r", **storage_options))
result = StorePath(await LocalStore.open(root=Path(store_like), mode=mode or "r"))
elif isinstance(store_like, dict):
# We deliberately only consider dict[str, Buffer] here, and not arbitrary mutable mappings.
# By only allowing dictionaries, which are in-memory, we know that MemoryStore is appropriate.
return StorePath(await MemoryStore.open(store_dict=store_like, mode=mode))
raise TypeError
result = StorePath(await MemoryStore.open(store_dict=store_like, mode=mode))
else:
raise TypeError

if path is not None:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, I forgot to revert this. It's not used yet.

We call make_store_path multiple times in a single call by the user (e.g. save_array calls make_store_path and passes the result to AsyncArray.create which also calls make_store_path). I was thinking about consolidating all the store / path stuff in a single spot, but it ended up being a bit much and I wasn't sure what the tolerance was for expanding the API of methods like AsyncArray.create even more. Do we want users to do

AsyncArray.create(store=StorePath(RemoteStore(..., **storage_options), path))

or

AsyncArray.create(store="s3://bucket/path.zarr", storage_options=...)

result = result / path
return result


async def ensure_no_existing_node(store_path: StorePath, zarr_format: ZarrFormat) -> None:
Expand Down
9 changes: 9 additions & 0 deletions tests/v3/test_store/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from zarr.store.common import make_store_path
from zarr.store.local import LocalStore
from zarr.store.memory import MemoryStore
from zarr.store.remote import RemoteStore


async def test_make_store_path(tmpdir: str) -> None:
Expand Down Expand Up @@ -34,3 +35,11 @@ async def test_make_store_path(tmpdir: str) -> None:

with pytest.raises(TypeError):
await make_store_path(1) # type: ignore[arg-type]


async def test_make_store_path_fsspec(monkeypatch) -> None:
    """Check that an fsspec-style URI string is resolved to a ``RemoteStore``.

    The in-memory fsspec filesystem has its ``async_impl`` attribute patched
    to ``True`` for the duration of the test, and ``make_store_path`` is then
    called with the bare ``memory://`` URI and no storage options.
    """
    from fsspec.implementations.memory import MemoryFileSystem

    # NOTE(review): presumably RemoteStore only accepts async filesystems,
    # hence the patch -- confirm against RemoteStore's constructor.
    monkeypatch.setattr(MemoryFileSystem, "async_impl", True)

    resolved = await make_store_path("memory://")
    assert isinstance(resolved.store, RemoteStore)
41 changes: 41 additions & 0 deletions tests/v3/test_store/test_remote.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import os
from collections.abc import Generator

Expand All @@ -6,6 +7,7 @@
import pytest
from upath import UPath

import zarr.api.asynchronous
from zarr.core.buffer import Buffer, cpu, default_buffer_prototype
from zarr.core.sync import sync
from zarr.store import RemoteStore
Expand Down Expand Up @@ -158,3 +160,42 @@ def test_store_supports_partial_writes(self, store: RemoteStore) -> None:

def test_store_supports_listing(self, store: RemoteStore) -> None:
assert True

async def test_remote_store_from_uri(
    self, store: RemoteStore, store_kwargs: dict[str, str | bool]
):
    """Open groups from a URI string plus ``storage_options``.

    Group metadata is written directly through the ``store`` fixture, then
    read back via ``zarr.api.asynchronous.open_group`` in three ways:

    1. the store's own URL,
    2. the store's URL with a sub-directory appended to the URL string,
    3. the store's URL with the sub-directory given through ``path``.

    Each time, the attributes of the opened group must match what was written.
    """
    storage_options = {
        "endpoint_url": endpoint_url,
        "anon": False,
    }
    base_url = store._url

    async def write_group_meta(key: str, attr_value: str) -> None:
        # Serialize a minimal v3 group document and store it under ``key``.
        document = {
            "attributes": {"key": attr_value},
            "zarr_format": 3,
            "node_type": "group",
        }
        payload = json.dumps(document).encode()
        await store.set(key, self.buffer_cls.from_bytes(payload))

    # Case 1: group at the root of the store URL.
    await write_group_meta("zarr.json", "value")
    root_group = await zarr.api.asynchronous.open_group(
        store=base_url, storage_options=storage_options
    )
    assert dict(root_group.attrs) == {"key": "value"}

    # Case 2: sub-directory is part of the URL string itself.
    await write_group_meta("directory-2/zarr.json", "value-2")
    nested_url = f"{base_url.rstrip('/')}/directory-2"
    url_group = await zarr.api.asynchronous.open_group(
        store=nested_url, storage_options=storage_options
    )
    assert dict(url_group.attrs) == {"key": "value-2"}

    # Case 3: sub-directory is supplied through the ``path`` argument.
    await write_group_meta("directory-3/zarr.json", "value-3")
    path_group = await zarr.api.asynchronous.open_group(
        store=base_url, path="directory-3", storage_options=storage_options
    )
    assert dict(path_group.attrs) == {"key": "value-3"}
Loading