diff --git a/docs/guide.md b/docs/guide.md index f825cebe2..cb7156c52 100644 --- a/docs/guide.md +++ b/docs/guide.md @@ -334,6 +334,42 @@ the context of a `Session` with the `DataClient`: >>> asyncio.run(main()) ``` +#### Downloading an Asset + +Downloading an asset is a multi-step process involving: activating the asset, +waiting for the asset to be active, downloading the asset, and, optionally, +validating the downloaded file. + + +Waiting and downloading can take a long time, so it is often useful to track +progress. In this example, `print` is passed as the `wait_asset` callback to +report the asset status. `download_asset` has progress reporting built in. + +```python +>>> async def download_and_validate(): +... async with Session() as sess: +... cl = DataClient(sess) +... +... # get asset description +... item_type_id = 'PSScene' +... item_id = '20221003_002705_38_2461' +... asset_type_id = 'ortho_analytic_4b' +... asset = await cl.get_asset(item_type_id, item_id, asset_type_id) +... +... # activate asset +... await cl.activate_asset(asset) +... +... # wait for asset to become active +... asset = await cl.wait_asset(asset, callback=print) +... +... # download asset +... path = await cl.download_asset(asset) +... +... # validate download file +... cl.validate_checksum(asset, path) +``` + + ## CLI ### Authentication diff --git a/planet/clients/data.py b/planet/clients/data.py index b2f5d50b2..4fd3eaa03 100644 --- a/planet/clients/data.py +++ b/planet/clients/data.py @@ -12,13 +12,17 @@ # License for the specific language governing permissions and limitations under # the License. """Functionality for interacting with the data api""" +import asyncio +import hashlib import logging +from pathlib import Path +import time import typing from ..
async def activate_asset(self, asset: dict):
    """Activate an item asset.

    An activation request is only sent when the asset status is
    'inactive'. An asset that is already activating or active is left
    untouched and no request is made.

    Parameters:
        asset: Description of the asset. Obtained from get_asset().

    Raises:
        planet.exceptions.APIError: On API error.
        planet.exceptions.ClientError: If asset description is not
            valid.
    """
    # TypeError covers a description (or "_links" entry) that is not a
    # mapping at all, e.g. None. That is just as invalid as a missing key
    # and must surface as the documented ClientError.
    try:
        status = asset['status']
    except (KeyError, TypeError) as err:
        raise exceptions.ClientError(
            'asset missing ["status"] entry.') from err

    # The link is validated even when no request ends up being sent, so a
    # malformed description is always reported.
    try:
        url = asset['_links']['activate']
    except (KeyError, TypeError) as err:
        raise exceptions.ClientError(
            'asset missing ["_links"]["activate"] entry') from err

    # lets not try to activate an asset already activating or active
    if status == 'inactive':
        request = self._request(url, method='GET')
        # no response is returned
        await self._do_request(request)
""" - # NOTE: this is not an API endpoint - # This is getting and checking the asset status and waiting until - # the asset is active - # NOTE: use the url at asset['_links']['_self'] to get the current - # asset status - raise NotImplementedError + # loop without end if max_attempts is zero + # otherwise, loop until num_attempts reaches max_attempts + num_attempts = 0 + while not max_attempts or num_attempts < max_attempts: + t = time.time() + + try: + current_status = asset['status'] + except KeyError: + raise exceptions.ClientError('asset missing ["status"] entry.') + + LOGGER.debug(current_status) + + if callback: + callback(current_status) + + if current_status == 'active': + break + + sleep_time = max(delay - (time.time() - t), 0) + LOGGER.debug(f'sleeping {sleep_time}s') + await asyncio.sleep(sleep_time) + + num_attempts += 1 + + try: + asset_url = asset['_links']['_self'] + except KeyError: + raise exceptions.ClientError( + 'asset missing ["_links"]["_self"] entry.') + + request = self._request(asset_url, method='GET') + response = await self._do_request(request) + asset = response.json() + + if max_attempts and num_attempts >= max_attempts: + raise exceptions.ClientError( + f'Maximum number of attempts ({max_attempts}) reached.') + + return asset async def download_asset(self, asset: dict, filename: str = None, - directory: str = None, + directory: Path = Path('.'), overwrite: bool = False, progress_bar: bool = True) -> str: """Download an asset. - Asset description is obtained from get_asset() or wait_asset(). + The asset must be active before it can be downloaded. This can be + achieved with activate_asset() followed by wait_asset(). + + If overwrite is False and the file already exists, download will be + skipped and the file path will be returned as usual. Parameters: - asset: Description of the asset. - location: Download location url including download token. + asset: Description of the asset. Obtained from get_asset() or + wait_asset(). 
@staticmethod
def validate_checksum(asset: dict, filename: Path):
    """Validate checksum of downloaded file

    Compares the MD5 checksum calculated from the file against the value
    provided in the asset. The file is hashed in fixed-size chunks so that
    large downloads are never held in memory all at once.

    Parameters:
        asset: Description of the asset. Obtained from get_asset() or
            wait_asset().
        filename: Full path to downloaded file.

    Raises:
        planet.exceptions.ClientError: If the file does not exist, if the
            asset has no ["md5_digest"] entry, or if checksums do not
            match.
    """
    chunk_size = 1024 * 1024  # bytes read per update() call

    digest = hashlib.md5()
    try:
        with open(filename, 'rb') as fp:
            # iter() with a sentinel stops at the first empty read (EOF)
            for chunk in iter(lambda: fp.read(chunk_size), b''):
                digest.update(chunk)
    except FileNotFoundError as err:
        raise exceptions.ClientError(
            f'File ({filename}) does not exist.') from err
    file_hash = digest.hexdigest()

    try:
        origin_hash = asset['md5_digest']
    except KeyError as err:
        raise exceptions.ClientError(
            'asset missing ["md5_digest"] entry. Is asset active?') from err

    if origin_hash != file_hash:
        raise exceptions.ClientError(
            f'File ({filename}) checksums do not match.')
@respx.mock
@pytest.mark.asyncio
@pytest.mark.parametrize("asset_type_id, expectation",
                         [('basic_udm2', does_not_raise()),
                          ('invalid', pytest.raises(exceptions.ClientError))])
async def test_get_asset(asset_type_id, expectation, session):
    """get_asset() returns the named asset or rejects an unknown type."""

    def _inactive_asset(name):
        # mocked description of an asset that has not been activated
        return {
            "_links": {
                "_self": "SELFURL",
                "activate": "ACTIVATEURL",
                "type": f"https://api.planet.com/data/v1/asset-types/{name}"
            },
            "_permissions": ["download"],
            "md5_digest": None,
            "status": "inactive",
            "type": name
        }

    item_type_id, item_id = 'PSScene', '20221003_002705_38_2461'

    expected = _inactive_asset('basic_udm2')
    listing = {
        "basic_analytic_4b": _inactive_asset('basic_analytic_4b'),
        "basic_udm2": expected
    }

    # get_asset() picks its result out of the item's asset listing
    listing_url = (
        f'{TEST_URL}/item-types/{item_type_id}/items/{item_id}/assets')
    respx.get(listing_url).return_value = httpx.Response(HTTPStatus.OK,
                                                         json=listing)

    client = DataClient(session, base_url=TEST_URL)

    with expectation:
        fetched = await client.get_asset(item_type_id,
                                         item_id,
                                         asset_type_id)
        assert fetched == expected
@respx.mock
@pytest.mark.asyncio
async def test_activate_asset_invalid_asset(session):
    """An asset description lacking the required entries is rejected."""
    client = DataClient(session, base_url=TEST_URL)

    with pytest.raises(exceptions.ClientError):
        await client.activate_asset({})


@respx.mock
@pytest.mark.asyncio
async def test_wait_asset_success(session):
    """wait_asset() polls until the asset reports 'active'."""
    asset_url = f'{TEST_URL}/asset'

    pending = {
        "_links": {
            "_self": asset_url,
            "activate": "ACTIVATEURL",
            "type": "https://api.planet.com/data/v1/asset-types/basic_udm2"
        },
        "_permissions": ["download"],
        "md5_digest": None,
        "status": 'activating',
        "type": "basic_udm2"
    }
    ready = {**pending, 'status': 'active'}

    # the first two polls still report 'activating', the third 'active'
    poll_results = [pending, pending, ready]
    respx.get(asset_url).side_effect = [
        httpx.Response(HTTPStatus.OK, json=description)
        for description in poll_results
    ]

    client = DataClient(session, base_url=TEST_URL)
    result = await client.wait_asset(pending, delay=0)

    assert result == ready
[ + httpx.Response(HTTPStatus.OK, json=basic_udm2_asset), + httpx.Response(HTTPStatus.OK, json=basic_udm2_asset), + ] + + cl = DataClient(session, base_url=TEST_URL) + + with pytest.raises(exceptions.ClientError): + await cl.wait_asset(basic_udm2_asset, delay=0, max_attempts=1) + + +@respx.mock +@pytest.mark.asyncio +@pytest.mark.parametrize("exists, overwrite", + [(False, False), (True, False), (True, True), + (False, True)]) +async def test_download_asset(exists, + overwrite, + tmpdir, + open_test_img, + session): + # NOTE: this is a slightly edited version of test_download_asset_img from + # tests/integration/test_orders_api + dl_url = f'{TEST_URL}/1?token=IAmAToken' + + img_headers = { + 'Content-Type': 'image/tiff', + 'Content-Length': '527', + 'Content-Disposition': 'attachment; filename="img.tif"' + } + + async def _stream_img(): + data = open_test_img.read() + v = memoryview(data) + + chunksize = 100 + for i in range(math.ceil(len(v) / (chunksize))): + yield v[i * chunksize:min((i + 1) * chunksize, len(v))] + + # populate request parameter to avoid respx cloning, which throws + # an error caused by respx and not this code + # https://github.com/lundberg/respx/issues/130 + mock_resp = httpx.Response(HTTPStatus.OK, + stream=_stream_img(), + headers=img_headers, + request='donotcloneme') + respx.get(dl_url).return_value = mock_resp + + basic_udm2_asset = { + "_links": { + "_self": "SELFURL", + "activate": "ACTIVATEURL", + "type": "https://api.planet.com/data/v1/asset-types/basic_udm2" + }, + "_permissions": ["download"], + "md5_digest": None, + "status": 'active', + "location": dl_url, + "type": "basic_udm2" + } + + cl = DataClient(session, base_url=TEST_URL) + + if exists: + Path(tmpdir, 'img.tif').write_text('i exist') + + path = await cl.download_asset(basic_udm2_asset, + directory=tmpdir, + overwrite=overwrite) + assert path.name == 'img.tif' + assert path.is_file() + + if exists and not overwrite: + assert path.read_text() == 'i exist' + else: + assert 
@respx.mock
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "file_exists, hashes_match, md5_entry, expectation",
    [(True, True, True, does_not_raise()),
     (True, False, True, pytest.raises(exceptions.ClientError)),
     (True, True, False, pytest.raises(exceptions.ClientError)),
     (False, True, True, pytest.raises(exceptions.ClientError))])
async def test_validate_checksum(file_exists,
                                 hashes_match,
                                 md5_entry,
                                 expectation,
                                 tmpdir):
    """validate_checksum() accepts a matching digest and rejects the rest.

    Cases covered: matching digest, mismatching digest, asset without an
    "md5_digest" entry, and a file that does not exist.
    """
    test_bytes = b'foo bar'
    testfile = Path(tmpdir, 'test.txt')
    if file_exists:
        testfile.write_bytes(test_bytes)

    hash_md5 = hashlib.md5(test_bytes).hexdigest()

    basic_udm2_asset = {
        "_links": {
            "_self": "SELFURL",
            "activate": "ACTIVATEURL",
            "type": "https://api.planet.com/data/v1/asset-types/basic_udm2"
        },
        "_permissions": ["download"],
        "status": 'active',
        "location": "DOWNLOADURL",
        "type": "basic_udm2"
    }

    # the digest entry is left out entirely for the "missing entry" case
    if md5_entry:
        asset_hash = hash_md5 if hashes_match else 'invalid'
        basic_udm2_asset["md5_digest"] = asset_hash

    with expectation:
        DataClient.validate_checksum(basic_udm2_asset, testfile)
mock_resp = httpx.Response(HTTPStatus.OK, + stream=_stream_img(), + headers=img_headers, + request='donotcloneme') + respx.get(dl_url).return_value = mock_resp + + cl = OrdersClient(session, base_url=TEST_URL) + filename = await cl.download_asset(dl_url, directory=str(tmpdir)) + + assert Path(filename).name == 'img.tif' + assert os.path.isfile(filename) + + @respx.mock @pytest.mark.asyncio @pytest.mark.parametrize("checksum", [("MD5"), ("SHA256")]) @@ -644,41 +679,6 @@ async def test_validate_checksum_manifest( OrdersClient.validate_checksum(Path(tmpdir), 'md5') -@respx.mock -@pytest.mark.asyncio -async def test_download_asset_img(tmpdir, open_test_img, session): - dl_url = TEST_DOWNLOAD_URL + '/1?token=IAmAToken' - - img_headers = { - 'Content-Type': 'image/tiff', - 'Content-Length': '527', - 'Content-Disposition': 'attachment; filename="img.tif"' - } - - async def _stream_img(): - data = open_test_img.read() - v = memoryview(data) - - chunksize = 100 - for i in range(math.ceil(len(v) / (chunksize))): - yield v[i * chunksize:min((i + 1) * chunksize, len(v))] - - # populate request parameter to avoid respx cloning, which throws - # an error caused by respx and not this code - # https://github.com/lundberg/respx/issues/130 - mock_resp = httpx.Response(HTTPStatus.OK, - stream=_stream_img(), - headers=img_headers, - request='donotcloneme') - respx.get(dl_url).return_value = mock_resp - - cl = OrdersClient(session, base_url=TEST_URL) - filename = await cl.download_asset(dl_url, directory=str(tmpdir)) - - assert Path(filename).name == 'img.tif' - assert os.path.isfile(filename) - - @respx.mock @pytest.mark.asyncio @pytest.mark.parametrize(