Implement data api asset download and supporting functions #707

Merged
merged 6 commits into from
Oct 12, 2022
36 changes: 36 additions & 0 deletions docs/guide.md
@@ -334,6 +334,42 @@ the context of a `Session` with the `DataClient`:
>>> asyncio.run(main())
```

#### Downloading an Asset

Downloading an asset is a multi-step process involving: activating the asset,
waiting for the asset to be active, downloading the asset, and, optionally,
validating the downloaded file.


Waiting and downloading can each take a long time, so it is often useful to
track their progress. In this example, a simple `print` callback reports the
wait status. `download_asset` has progress reporting built in.

```python
>>> async def download_and_validate():
Contributor

I am against having the added ">>>"s and "..."s in example code because it makes it difficult for users to copy and paste and immediately use. We're also inconsistent with its usage. We add it to many of our examples, but don't have it in the checksum example (line 262 in this MD).

Contributor Author

I'm against it too. It is included here for consistency and we need to do a sweep of the docs to remove these. I'm not sure why it isn't in the checksum example, it probably slipped through a PR. But that isn't a part of this PR.

Contributor

The tricky thing is that >>> is kinda normalized in Python and numpy. See https://docs.python.org/3/tutorial/introduction.html#numbers and https://numpydoc.readthedocs.io/en/latest/format.html#examples. So without some doc linting, this usage might continue to creep in. For what it's worth, the Python doc source includes the >>> prompt (see https://github.com/python/cpython/blob/3.10/Doc/tutorial/introduction.rst#numbers) and the website has a js widget to hide it if you want to copy it without.

...     async with Session() as sess:
...         cl = DataClient(sess)
...
...         # get asset description
...         item_type_id = 'PSScene'
...         item_id = '20221003_002705_38_2461'
...         asset_type_id = 'ortho_analytic_4b'
...         asset = await cl.get_asset(item_type_id, item_id, asset_type_id)
...
...         # activate asset
...         await cl.activate_asset(asset)
...
...         # wait for asset to become active
...         asset = await cl.wait_asset(asset, callback=print)
...
...         # download asset
...         path = await cl.download_asset(asset)
...
...         # validate download file
...         cl.validate_checksum(asset, path)
```
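
The review thread above notes that `>>>` and `...` prompts make examples harder to copy and paste. As a rough sketch of what a docs sweep could automate, the hypothetical helper `strip_prompts` below (not part of this PR or of the planet package) removes the prompts from a doctest-style example while keeping the code's indentation. It assumes every prompt is followed by a single space and it does not try to tell code from expected output.

```python
def strip_prompts(example: str) -> str:
    """Remove leading '>>> ' and '... ' prompts from a doctest-style example.

    Hypothetical helper for illustration only.
    """
    cleaned = []
    for line in example.splitlines():
        if line.startswith(('>>> ', '... ')):
            # Drop the three-character prompt and the space that follows it.
            cleaned.append(line[4:])
        elif line in ('>>>', '...'):
            # A bare prompt stands for an empty line inside the example.
            cleaned.append('')
        else:
            cleaned.append(line)
    return '\n'.join(cleaned)


example = (">>> async def main():\n"
           "...     x = 1\n"
           "...\n"
           "...     return x")
plain = strip_prompts(example)
```

A helper like this would also strip an output line that happens to start with `... `, so it is only suitable for examples that contain code and no printed output.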


## CLI

### Authentication
168 changes: 141 additions & 27 deletions planet/clients/data.py
@@ -12,13 +12,17 @@
# License for the specific language governing permissions and limitations under
# the License.
"""Functionality for interacting with the data api"""
import asyncio
import hashlib
import logging
from pathlib import Path
import time
import typing

from .. import exceptions
from ..constants import PLANET_BASE_URL
from ..http import Session
from ..models import Paged, Request, Response
from ..models import Paged, Request, Response, StreamingBody

BASE_URL = f'{PLANET_BASE_URL}/data/v1/'
SEARCHES_PATH = '/searches'
@@ -86,6 +90,9 @@ def __init__(self, session: Session, base_url: str = None):
    def _searches_url(self):
        return f'{self._base_url}{SEARCHES_PATH}'

    def _item_url(self, item_type, item_id):
        return f'{self._base_url}/item-types/{item_type}/items/{item_id}'

    def _request(self, url, method, data=None, params=None, json=None):
        return Request(url, method=method, data=data, params=params, json=json)

@@ -493,7 +500,10 @@ async def list_item_assets(self, item_type_id: str,
        Raises:
            planet.exceptions.APIError: On API error.
        """
        raise NotImplementedError
        url = f'{self._item_url(item_type_id, item_id)}/assets'
        request = self._request(url, method='GET')
        response = await self._do_request(request)
        return response.json()

    async def get_asset(self,
                        item_type_id: str,
@@ -514,26 +524,46 @@ async def get_asset(self,
            planet.exceptions.ClientError: If asset type identifier is not
                valid.
        """
        # NOTE: this is not an API endpoint
        # this is getting an asset by name from the dict returned by
        # list_item_assets()
        raise NotImplementedError
        assets = await self.list_item_assets(item_type_id, item_id)

        try:
            asset = assets[asset_type_id]
        except KeyError:
            valid = list(assets.keys())
            raise exceptions.ClientError(
                f'asset_type_id ({asset_type_id}) must be one of {valid}')

        return asset
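
The lookup that `get_asset` performs on the dictionary returned by `list_item_assets()` can be exercised without network access. The following is a minimal sketch under stated assumptions: `pick_asset` and `sample_assets` are hypothetical names, the sample dictionary is a heavily trimmed stand-in for a real response, and `ValueError` stands in for `planet.exceptions.ClientError`.

```python
def pick_asset(assets: dict, asset_type_id: str) -> dict:
    """Return the description for asset_type_id, or raise listing valid ids.

    Hypothetical stand-in for the lookup step; ValueError replaces
    planet.exceptions.ClientError so the sketch has no dependencies.
    """
    try:
        return assets[asset_type_id]
    except KeyError:
        valid = list(assets.keys())
        raise ValueError(
            f'asset_type_id ({asset_type_id}) must be one of {valid}') from None


# Assumed, heavily trimmed shape of a list_item_assets() result.
sample_assets = {
    'basic_udm2': {'status': 'inactive'},
    'ortho_analytic_4b': {'status': 'active'},
}

asset = pick_asset(sample_assets, 'ortho_analytic_4b')
```

Listing the valid identifiers in the error message lets a caller correct a mistyped asset type without a second request.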

    async def activate_asset(self, asset: dict):
        """Activate an item asset.

        Parameters:
            asset: Description of the asset.
            asset: Description of the asset. Obtained from get_asset().

        Raises:
            planet.exceptions.APIError: On API error.
            planet.exceptions.ClientError: If asset description is not
                valid.
        """
        # NOTE: this is not an API endpoint
        # This is getting the 'activate' link from the asset description
        # and then sending the activate request to that link
        raise NotImplementedError
        try:
            status = asset['status']
        except KeyError:
            raise exceptions.ClientError('asset missing ["status"] entry.')

        try:
            url = asset['_links']['activate']
        except KeyError:
            raise exceptions.ClientError(
                'asset missing ["_links"]["activate"] entry')

        # lets not try to activate an asset already activating or active
        if status == 'inactive':
            request = self._request(url, method='GET')
            # no response is returned
            await self._do_request(request)

        return

    async def wait_asset(self,
                         asset: dict,
@@ -542,8 +572,11 @@ async def wait_asset(self,
                         callback: typing.Callable[[str], None] = None) -> str:
        """Wait for an item asset to be active.

        Prior to waiting for the asset to be active, be sure to activate the
        asset with activate_asset().

        Parameters:
            asset: Description of the asset.
            asset: Description of the asset. Obtained from get_asset().
            delay: Time (in seconds) between polls.
            max_attempts: Maximum number of polls. When set to 0, no limit
                is applied.
@@ -558,26 +591,64 @@ async def wait_asset(self,
                not available or if the maximum number of attempts is reached
                before the asset is active.
        """
        # NOTE: this is not an API endpoint
        # This is getting and checking the asset status and waiting until
        # the asset is active
        # NOTE: use the url at asset['_links']['_self'] to get the current
        # asset status
        raise NotImplementedError
        # loop without end if max_attempts is zero
        # otherwise, loop until num_attempts reaches max_attempts
        num_attempts = 0
        while not max_attempts or num_attempts < max_attempts:
            t = time.time()

            try:
                current_status = asset['status']
            except KeyError:
                raise exceptions.ClientError('asset missing ["status"] entry.')

            LOGGER.debug(current_status)

            if callback:
                callback(current_status)

            if current_status == 'active':
                break

            sleep_time = max(delay - (time.time() - t), 0)
            LOGGER.debug(f'sleeping {sleep_time}s')
            await asyncio.sleep(sleep_time)

            num_attempts += 1

            try:
                asset_url = asset['_links']['_self']
            except KeyError:
                raise exceptions.ClientError(
                    'asset missing ["_links"]["_self"] entry.')

            request = self._request(asset_url, method='GET')
            response = await self._do_request(request)
            asset = response.json()

        if max_attempts and num_attempts >= max_attempts:
            raise exceptions.ClientError(
                f'Maximum number of attempts ({max_attempts}) reached.')

        return asset
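
The polling pattern in `wait_asset` (poll, report through the callback, sleep, give up after `max_attempts` polls, with 0 meaning no limit) can be sketched with the standard library alone. Everything named here is an assumption made for illustration: `wait_until_active`, `fetch_status`, and `make_fake_fetch` are hypothetical, the default values are arbitrary, canned statuses replace the HTTP request, and `TimeoutError` stands in for `planet.exceptions.ClientError`.

```python
import asyncio
import typing


async def wait_until_active(
        fetch_status: typing.Callable[[], typing.Awaitable[str]],
        delay: float = 1.0,
        max_attempts: int = 0,
        callback: typing.Optional[typing.Callable[[str], None]] = None
) -> int:
    """Poll fetch_status until it returns 'active'; return the poll count.

    Hypothetical sketch: max_attempts of 0 means no limit, as in the
    docstring above; TimeoutError replaces planet.exceptions.ClientError.
    """
    attempts = 0
    while not max_attempts or attempts < max_attempts:
        status = await fetch_status()
        attempts += 1
        if callback:
            callback(status)
        if status == 'active':
            return attempts
        await asyncio.sleep(delay)
    raise TimeoutError(f'Maximum number of attempts ({max_attempts}) reached.')


def make_fake_fetch(statuses: typing.Iterable[str]):
    """Build a stand-in for the HTTP poll that replays canned statuses."""
    remaining = iter(statuses)

    async def fetch() -> str:
        return next(remaining)

    return fetch


seen = []
polls = asyncio.run(
    wait_until_active(make_fake_fetch(['inactive', 'activating', 'active']),
                      delay=0,
                      callback=seen.append))
```

The sketch checks the status immediately after each poll, so a status fetched on the final allowed attempt is still examined. The loop in the diff, as read here, appears to raise once `num_attempts` reaches `max_attempts` without inspecting the asset returned by its last request; whether that matters in practice depends on the chosen `delay` and `max_attempts`.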

    async def download_asset(self,
                             asset: dict,
                             filename: str = None,
                             directory: str = None,
                             directory: Path = Path('.'),
                             overwrite: bool = False,
                             progress_bar: bool = True) -> str:
        """Download an asset.

        Asset description is obtained from get_asset() or wait_asset().
        The asset must be active before it can be downloaded. This can be
        achieved with activate_asset() followed by wait_asset().

        If overwrite is False and the file already exists, download will be
        skipped and the file path will be returned as usual.

        Parameters:
            asset: Description of the asset.
            location: Download location url including download token.
            asset: Description of the asset. Obtained from get_asset() or
                wait_asset().
            filename: Custom name to assign to downloaded file.
            directory: Base directory for file download.
            overwrite: Overwrite any existing files.
@@ -588,10 +659,53 @@ async def download_asset(self,

        Raises:
            planet.exceptions.APIError: On API error.
            planet.exceptions.ClientError: If asset is not activated or asset
            planet.exceptions.ClientError: If asset is not active or asset
                description is not valid.
        """
        # NOTE: this is not an API endpoint
        # This is getting the download location from the asset description
        # and then downloading the file at that location
        raise NotImplementedError
        try:
            location = asset['location']
        except KeyError:
            raise exceptions.ClientError(
                'asset missing ["location"] entry. Is asset active?')

        req = self._request(location, method='GET')

        async with self._session.stream(req) as resp:
            body = StreamingBody(resp)
            dl_path = Path(directory, filename or body.name)
            dl_path.parent.mkdir(exist_ok=True, parents=True)
            await body.write(dl_path,
                             overwrite=overwrite,
                             progress_bar=progress_bar)
        return dl_path
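
The path handling in `download_asset` (use the custom `filename` if one is given, otherwise the name reported by the response body, and create missing parent directories) can be tried in isolation. In this minimal sketch, `resolve_download_path` and its `default_name` parameter are hypothetical; `default_name` plays the role that `body.name` plays in the diff.

```python
import tempfile
from pathlib import Path
from typing import Optional


def resolve_download_path(directory: Path,
                          filename: Optional[str],
                          default_name: str) -> Path:
    """Build the target path and create any missing parent directories.

    Hypothetical helper; default_name stands in for the name taken from
    the streamed response.
    """
    dl_path = Path(directory, filename or default_name)
    dl_path.parent.mkdir(exist_ok=True, parents=True)
    return dl_path


# Usage: the nested directory does not exist yet and is created on demand.
with tempfile.TemporaryDirectory() as tmp:
    default_path = resolve_download_path(Path(tmp, 'a', 'b'), None,
                                         'scene.tif')
    custom_path = resolve_download_path(Path(tmp), 'custom.tif', 'scene.tif')
    parent_created = default_path.parent.is_dir()
```

The helper returns a `Path`, which matches the `dl_path` value returned in the diff even though the signature there is annotated `-> str`.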

    @staticmethod
    def validate_checksum(asset: dict, filename: Path):
        """Validate checksum of downloaded file

        Compares checksum calculated from the file against the value provided
        in the asset.

        Parameters:
            asset: Description of the asset. Obtained from get_asset() or
                wait_asset().
            filename: Full path to downloaded file.

        Raises:
            planet.exceptions.ClientError: If the file does not exist or if
                checksums do not match.
        """
        try:
            file_hash = hashlib.md5(filename.read_bytes()).hexdigest()
        except FileNotFoundError:
            raise exceptions.ClientError(f'File ({filename}) does not exist.')

        try:
            origin_hash = asset['md5_digest']
        except KeyError:
            raise exceptions.ClientError(
                'asset missing ["md5_digest"] entry. Is asset active?')

        if origin_hash != file_hash:
            raise exceptions.ClientError(
                f'File ({filename}) checksums do not match.')
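
`validate_checksum` reads the whole file into memory with `read_bytes()` before hashing. For large downloads, one possible alternative is to hash the file in chunks. The sketch below is an illustration and not the PR's method: `md5_of_file` and `checksums_match` are hypothetical names, the chunk size is arbitrary, and only the `md5_digest` key is taken from the diff.

```python
import hashlib
import tempfile
from pathlib import Path


def md5_of_file(path: Path, chunk_size: int = 1024 * 1024) -> str:
    """Return the MD5 hex digest of a file, reading it chunk by chunk."""
    digest = hashlib.md5()
    with path.open('rb') as f:
        # iter() with a sentinel keeps reading until read() returns b''.
        for chunk in iter(lambda: f.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()


def checksums_match(asset: dict, path: Path) -> bool:
    """Compare the asset's 'md5_digest' entry with the file's digest."""
    return asset['md5_digest'] == md5_of_file(path)


# Usage: hash a small temporary file in 2-byte chunks.
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp, 'sample.bin')
    sample.write_bytes(b'hello world')
    chunked = md5_of_file(sample, chunk_size=2)
    matches = checksums_match({'md5_digest': chunked}, sample)
    mismatch = checksums_match({'md5_digest': 'not-a-digest'}, sample)
```

Chunked hashing produces the same digest as hashing the full byte string, so it could replace the in-memory read without changing which files pass validation.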