Skip to content

Commit 7076fe0

Browse files
authored
fix: Fix DynamoDB fetches when there are entities that are not found (#2573)
* fix: Fix DynamoDB fetches when there are entities that are not found Signed-off-by: Danny Chiao <[email protected]> * remove sort_keys from dynamo since they must be sorted. Add better test for different unknowns Signed-off-by: Danny Chiao <[email protected]>
1 parent 8a17041 commit 7076fe0

File tree

3 files changed

+55
-21
lines changed

3 files changed

+55
-21
lines changed

sdk/python/feast/infra/online_stores/dynamodb.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,6 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
5959
region: StrictStr
6060
"""AWS Region Name"""
6161

62-
sort_response: bool = True
63-
"""Whether or not to sort BatchGetItem response."""
64-
6562
table_name_template: StrictStr = "{project}.{table_name}"
6663
"""DynamoDB table name template"""
6764

@@ -204,9 +201,6 @@ def online_read(
204201
"""
205202
Retrieve feature values from the online DynamoDB store.
206203
207-
Note: This method is currently not optimized to retrieve a lot of data at a time
208-
as it does sequential gets from the DynamoDB table.
209-
210204
Args:
211205
config: The RepoConfig for the current FeatureStore.
212206
table: Feast FeatureView.
@@ -224,7 +218,6 @@ def online_read(
224218
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
225219
entity_ids = [compute_entity_id(entity_key) for entity_key in entity_keys]
226220
batch_size = online_config.batch_size
227-
sort_response = online_config.sort_response
228221
entity_ids_iter = iter(entity_ids)
229222
while True:
230223
batch = list(itertools.islice(entity_ids_iter, batch_size))
@@ -243,20 +236,27 @@ def online_read(
243236
response = response.get("Responses")
244237
table_responses = response.get(table_instance.name)
245238
if table_responses:
246-
if sort_response:
247-
table_responses = self._sort_dynamodb_response(
248-
table_responses, entity_ids
249-
)
239+
table_responses = self._sort_dynamodb_response(
240+
table_responses, entity_ids
241+
)
242+
entity_idx = 0
250243
for tbl_res in table_responses:
244+
entity_id = tbl_res["entity_id"]
245+
while entity_id != batch[entity_idx]:
246+
result.append((None, None))
247+
entity_idx += 1
251248
res = {}
252249
for feature_name, value_bin in tbl_res["values"].items():
253250
val = ValueProto()
254251
val.ParseFromString(value_bin.value)
255252
res[feature_name] = val
256253
result.append((datetime.fromisoformat(tbl_res["event_ts"]), res))
257-
else:
258-
batch_size_nones = ((None, None),) * len(batch)
259-
result.extend(batch_size_nones)
254+
entity_idx += 1
255+
256+
# Not all entities in a batch may have responses
257+
# Pad with remaining values in batch that were not found
258+
batch_size_nones = ((None, None),) * (len(batch) - len(result))
259+
result.extend(batch_size_nones)
260260
return result
261261

262262
def _get_dynamodb_client(self, region: str, endpoint_url: Optional[str] = None):

sdk/python/feast/infra/online_stores/online_store.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,9 @@ def online_read(
7676
entity_keys: a list of entity keys that should be read from the FeatureStore.
7777
requested_features: (Optional) A subset of the features that should be read from the FeatureStore.
7878
Returns:
79-
Data is returned as a list, one item per entity key. Each item in the list is a tuple
80-
of event_ts for the row, and the feature data as a dict from feature names to values.
81-
Values are returned as Value proto message.
79+
Data is returned as a list, one item per entity key in the original order as the entity_keys argument.
80+
Each item in the list is a tuple of event_ts for the row, and the feature data as a dict from feature names
81+
to values. Values are returned as Value proto message.
8282
"""
8383
...
8484

sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
DynamoDBOnlineStoreConfig,
1212
DynamoDBTable,
1313
)
14+
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
15+
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
1416
from feast.repo_config import RepoConfig
1517
from tests.utils.online_store_utils import (
1618
_create_n_customer_test_samples,
@@ -49,7 +51,6 @@ def test_online_store_config_default():
4951
assert dynamodb_store_config.batch_size == 40
5052
assert dynamodb_store_config.endpoint_url is None
5153
assert dynamodb_store_config.region == aws_region
52-
assert dynamodb_store_config.sort_response is True
5354
assert dynamodb_store_config.table_name_template == "{project}.{table_name}"
5455

5556

@@ -70,20 +71,17 @@ def test_online_store_config_custom_params():
7071
aws_region = "us-west-2"
7172
batch_size = 20
7273
endpoint_url = "http://localhost:8000"
73-
sort_response = False
7474
table_name_template = "feast_test.dynamodb_table"
7575
dynamodb_store_config = DynamoDBOnlineStoreConfig(
7676
region=aws_region,
7777
batch_size=batch_size,
7878
endpoint_url=endpoint_url,
79-
sort_response=sort_response,
8079
table_name_template=table_name_template,
8180
)
8281
assert dynamodb_store_config.type == "dynamodb"
8382
assert dynamodb_store_config.batch_size == batch_size
8483
assert dynamodb_store_config.endpoint_url == endpoint_url
8584
assert dynamodb_store_config.region == aws_region
86-
assert dynamodb_store_config.sort_response == sort_response
8785
assert dynamodb_store_config.table_name_template == table_name_template
8886

8987

@@ -175,6 +173,42 @@ def test_online_read(repo_config, n_samples):
175173
assert [item[1] for item in returned_items] == list(features)
176174

177175

176+
@mock_dynamodb2
177+
def test_online_read_unknown_entity(repo_config):
178+
"""Test DynamoDBOnlineStore online_read method."""
179+
n_samples = 2
180+
_create_test_table(PROJECT, f"{TABLE_NAME}_{n_samples}", REGION)
181+
data = _create_n_customer_test_samples(n=n_samples)
182+
_insert_data_test_table(data, PROJECT, f"{TABLE_NAME}_{n_samples}", REGION)
183+
184+
entity_keys, features, *rest = zip(*data)
185+
# Append a nonsensical entity to search for
186+
entity_keys = list(entity_keys)
187+
features = list(features)
188+
dynamodb_store = DynamoDBOnlineStore()
189+
190+
# Have the unknown entity be in the beginning, middle, and end of the list of entities.
191+
for pos in range(len(entity_keys)):
192+
entity_keys_with_unknown = deepcopy(entity_keys)
193+
entity_keys_with_unknown.insert(
194+
pos,
195+
EntityKeyProto(
196+
join_keys=["customer"], entity_values=[ValueProto(string_val="12359")]
197+
),
198+
)
199+
features_with_none = deepcopy(features)
200+
features_with_none.insert(pos, None)
201+
returned_items = dynamodb_store.online_read(
202+
config=repo_config,
203+
table=MockFeatureView(name=f"{TABLE_NAME}_{n_samples}"),
204+
entity_keys=entity_keys_with_unknown,
205+
)
206+
assert len(returned_items) == len(entity_keys_with_unknown)
207+
assert [item[1] for item in returned_items] == list(features_with_none)
208+
# The order should match the original entity key order
209+
assert returned_items[pos] == (None, None)
210+
211+
178212
@mock_dynamodb2
179213
def test_write_batch_non_duplicates(repo_config):
180214
"""Test DynamoDBOnline Store deduplicate write batch request items."""

0 commit comments

Comments
 (0)