diff --git a/docs/release.rst b/docs/release.rst
index 4c978d1516..af4f621fd9 100644
--- a/docs/release.rst
+++ b/docs/release.rst
@@ -31,6 +31,17 @@ Next release
   documentation.
   By :user:`Josh Moore <joshmoore>`; :issue:`571`.
 
+* Added support for generic URL opening via ``fsspec``, where the URLs have the
+  form "protocol://[server]/path" or can be chained URLs with "::" separators.
+  The additional argument ``storage_options`` is passed to the backend; see
+  the ``fsspec`` docs.
+  By :user:`Martin Durant <martindurant>`; :issue:`546`.
+
+* Added support for fetching multiple items via the ``getitems`` method of a
+  store, if it exists. This allows for concurrent fetching of data blocks
+  from stores that implement it; presently HTTP, S3 and GCS. Currently this
+  only applies to reading.
+  By :user:`Martin Durant <martindurant>`; :issue:`606`.
 
 .. _release_2.4.0:
 
diff --git a/requirements_dev_optional.txt b/requirements_dev_optional.txt
index a5cc0e23bd..64a2081a69 100644
--- a/requirements_dev_optional.txt
+++ b/requirements_dev_optional.txt
@@ -18,6 +18,7 @@ pytest-cov==2.7.1
 pytest-doctestplus==0.4.0
 pytest-remotedata==0.3.2
 h5py==2.10.0
-s3fs==0.5.0; python_version > '3.6'
+s3fs==0.5.1; python_version > '3.6'
+fsspec==0.8.3; python_version > '3.6'
 moto>=1.3.14; python_version > '3.6'
 flask
diff --git a/zarr/core.py b/zarr/core.py
index f753de2692..8727a5deb5 100644
--- a/zarr/core.py
+++ b/zarr/core.py
@@ -1020,11 +1020,18 @@ def _get_selection(self, indexer, out=None, fields=None):
         check_array_shape('out', out, out_shape)
 
         # iterate over chunks
-        for chunk_coords, chunk_selection, out_selection in indexer:
+        if not hasattr(self.chunk_store, "getitems"):
+            # sequentially get one key at a time from storage
+            for chunk_coords, chunk_selection, out_selection in indexer:
 
-            # load chunk selection into output array
-            self._chunk_getitem(chunk_coords, chunk_selection, out, out_selection,
-                                drop_axes=indexer.drop_axes, fields=fields)
+                # load chunk selection into output array
+                self._chunk_getitem(chunk_coords, chunk_selection, out, out_selection,
+                                    drop_axes=indexer.drop_axes, fields=fields)
+        else:
+            # allow storage to get multiple items at once
+            lchunk_coords, lchunk_selection, lout_selection = zip(*indexer)
+            self._chunk_getitems(lchunk_coords, lchunk_selection, out, lout_selection,
+                                 drop_axes=indexer.drop_axes, fields=fields)
 
         if out.shape:
             return out
@@ -1548,6 +1555,52 @@ def _set_selection(self, indexer, value, fields=None):
             # put data
             self._chunk_setitem(chunk_coords, chunk_selection, chunk_value,
                                 fields=fields)
+
+    def _process_chunk(self, out, cdata, chunk_selection, drop_axes,
+                       out_is_ndarray, fields, out_selection):
+        """Take binary data from storage and fill output array"""
+        if (out_is_ndarray and
+                not fields and
+                is_contiguous_selection(out_selection) and
+                is_total_slice(chunk_selection, self._chunks) and
+                not self._filters and
+                self._dtype != object):
+
+            dest = out[out_selection]
+            write_direct = (
+                dest.flags.writeable and
+                (
+                    (self._order == 'C' and dest.flags.c_contiguous) or
+                    (self._order == 'F' and dest.flags.f_contiguous)
+                )
+            )
+
+            if write_direct:
+
+                # optimization: we want the whole chunk, and the destination is
+                # contiguous, so we can decompress directly from the chunk
+                # into the destination array
+
+                if self._compressor:
+                    self._compressor.decode(cdata, dest)
+                else:
+                    chunk = ensure_ndarray(cdata).view(self._dtype)
+                    chunk = chunk.reshape(self._chunks, order=self._order)
+                    np.copyto(dest, chunk)
+                return
+
+        # decode chunk
+        chunk = self._decode_chunk(cdata)
+
+        # select data from chunk
+        if fields:
+            chunk = chunk[fields]
+        tmp = chunk[chunk_selection]
+        if drop_axes:
+            tmp = np.squeeze(tmp, axis=drop_axes)
+
+        # store selected data in output
+        out[out_selection] = tmp
 
     def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection,
                        drop_axes=None, fields=None):
         """Obtain part or whole of a chunk.
@@ -1568,15 +1621,14 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection,
         TODO
 
         """
-
-        assert len(chunk_coords) == len(self._cdata_shape)
-
         out_is_ndarray = True
         try:
             out = ensure_ndarray(out)
         except TypeError:
             out_is_ndarray = False
 
+        assert len(chunk_coords) == len(self._cdata_shape)
+
         # obtain key for chunk
         ckey = self._chunk_key(chunk_coords)
 
@@ -1594,48 +1646,36 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection,
                 out[out_selection] = fill_value
 
         else:
+            self._process_chunk(out, cdata, chunk_selection, drop_axes,
+                                out_is_ndarray, fields, out_selection)
 
-            if (out_is_ndarray and
-                    not fields and
-                    is_contiguous_selection(out_selection) and
-                    is_total_slice(chunk_selection, self._chunks) and
-                    not self._filters and
-                    self._dtype != object):
-
-                dest = out[out_selection]
-                write_direct = (
-                    dest.flags.writeable and (
-                        (self._order == 'C' and dest.flags.c_contiguous) or
-                        (self._order == 'F' and dest.flags.f_contiguous)
-                    )
-                )
-
-                if write_direct:
+    def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection,
+                        drop_axes=None, fields=None):
+        """As _chunk_getitem, but for lists of chunks
 
-                    # optimization: we want the whole chunk, and the destination is
-                    # contiguous, so we can decompress directly from the chunk
-                    # into the destination array
+        This gets called where the storage supports ``getitems``, so that
+        it can decide how to fetch the keys, allowing concurrency.
+        """
+        out_is_ndarray = True
+        try:
+            out = ensure_ndarray(out)
+        except TypeError:  # pragma: no cover
+            out_is_ndarray = False
 
-                    if self._compressor:
-                        self._compressor.decode(cdata, dest)
+        ckeys = [self._chunk_key(ch) for ch in lchunk_coords]
+        cdatas = self.chunk_store.getitems(ckeys)
+        for ckey, chunk_select, out_select in zip(ckeys, lchunk_selection, lout_selection):
+            if ckey in cdatas:
+                self._process_chunk(out, cdatas[ckey], chunk_select, drop_axes,
+                                    out_is_ndarray, fields, out_select)
+            else:
+                # check exception type
+                if self._fill_value is not None:
+                    if fields:
+                        fill_value = self._fill_value[fields]
                     else:
-                        chunk = ensure_ndarray(cdata).view(self._dtype)
-                        chunk = chunk.reshape(self._chunks, order=self._order)
-                        np.copyto(dest, chunk)
-                    return
-
-            # decode chunk
-            chunk = self._decode_chunk(cdata)
-
-            # select data from chunk
-            if fields:
-                chunk = chunk[fields]
-            tmp = chunk[chunk_selection]
-            if drop_axes:
-                tmp = np.squeeze(tmp, axis=drop_axes)
-
-            # store selected data in output
-            out[out_selection] = tmp
+                        fill_value = self._fill_value
+                out[out_select] = fill_value
 
     def _chunk_setitem(self, chunk_coords, chunk_selection, value, fields=None):
         """Replace part or whole of a chunk.
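Note on the store contract (not part of the patch itself): ``_chunk_getitems`` only requires that the store's ``getitems`` return a mapping from each key that was found to its raw bytes, with missing keys omitted; those fall back to the fill value. A minimal sketch of a conforming store, using a hypothetical dict-backed class for illustration (the concurrent implementations this patch targets are fsspec's mappers):

    class DictStoreWithGetitems(dict):
        """A toy in-memory store that opts in to batched reads."""

        def getitems(self, keys):
            # A real backend (e.g. fsspec's FSMap over HTTP/S3/GCS) would
            # fetch these keys concurrently; here we just loop. Missing
            # keys are omitted rather than raised, mirroring fsspec's
            # ``on_error="omit"`` behaviour.
            return {key: self[key] for key in keys if key in self}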
diff --git a/zarr/storage.py b/zarr/storage.py
index f6fbe2d002..6716b3f06d 100644
--- a/zarr/storage.py
+++ b/zarr/storage.py
@@ -1038,6 +1038,10 @@ def _normalize_key(self, key):
             key = '/'.join(bits + [end.replace('.', self.key_separator)])
         return key.lower() if self.normalize_keys else key
 
+    def getitems(self, keys):
+        keys = [self._normalize_key(key) for key in keys]
+        return self.map.getitems(keys, on_error="omit")
+
     def __getitem__(self, key):
         key = self._normalize_key(key)
         try:
diff --git a/zarr/tests/test_storage.py b/zarr/tests/test_storage.py
index 7302cf1c8c..930e46a938 100644
--- a/zarr/tests/test_storage.py
+++ b/zarr/tests/test_storage.py
@@ -955,6 +955,29 @@ def test_s3(self):
 
         assert g.data[:].tolist() == [0, 1, 2, 3, 0, 0, 0, 0]
 
+    @pytest.mark.usefixtures("s3")
+    def test_s3_complex(self):
+        import zarr
+        g = zarr.open_group("s3://test/out.zarr", mode='w',
+                            storage_options=self.s3so)
+        expected = np.empty((8, 8, 8), dtype='int64')
+        expected[:] = -1
+        a = g.create_dataset("data", shape=(8, 8, 8),
+                             fill_value=-1, chunks=(1, 1, 1))
+        expected[0] = 0
+        expected[3] = 3
+        a[:4] = expected[:4]
+
+        a = g.create_dataset("data_f", shape=(8, ), chunks=(1,),
+                             dtype=[('foo', 'S3'), ('bar', 'i4')],
+                             fill_value=(b"b", 1))
+        a[:4] = (b"aaa", 2)
+        g = zarr.open_group("s3://test/out.zarr", mode='r',
+                            storage_options=self.s3so)
+
+        assert (g.data[:] == expected).all()
+        assert g.data_f['foo'].tolist() == [b"aaa"] * 4 + [b"b"] * 4
+
 
 @pytest.fixture()
 def s3(request):
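Taken together, the changes above let an array be opened straight from a URL and read with batched, potentially concurrent chunk fetches. A minimal usage sketch follows; the bucket name and credentials are hypothetical, and fsspec plus the protocol's implementation (here s3fs) must be installed:

    import zarr

    g = zarr.open_group("s3://example-bucket/out.zarr", mode="r",
                        storage_options={"anon": True})

    # A selection spanning many chunks is resolved through a single
    # ``getitems`` call on the store, letting the backend issue its
    # HTTP/S3 requests concurrently.
    data = g["data"][:]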