Skip to content

Commit 4310ed7

Browse files
fix: Fix feature service inference logic (#3089)
* Add __init__.py files to allow test files to share names
  Signed-off-by: Felix Wang <[email protected]>
* Add feature service tests
  Signed-off-by: Felix Wang <[email protected]>
* Fix feature service inference logic
  Signed-off-by: Felix Wang <[email protected]>
* Remove stray `__init__.py` file
  Signed-off-by: Felix Wang <[email protected]>
* Fix comments
  Signed-off-by: Felix Wang <[email protected]>
* Add check
  Signed-off-by: Felix Wang <[email protected]>
* Temp
  Signed-off-by: Felix Wang <[email protected]>
* Address comments
  Signed-off-by: Felix Wang <[email protected]>
* Fix
  Signed-off-by: Felix Wang <[email protected]>
* Address comments
  Signed-off-by: Felix Wang <[email protected]>
* Fix
  Signed-off-by: Felix Wang <[email protected]>

Signed-off-by: Felix Wang <[email protected]>
1 parent 7e887e4 commit 4310ed7

22 files changed

+342
-59
lines changed

sdk/python/feast/base_feature_view.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,16 @@ def __getitem__(self, item):
117117

118118
cp = self.__copy__()
119119
if self.features:
120+
feature_name_to_feature = {
121+
feature.name: feature for feature in self.features
122+
}
120123
referenced_features = []
121-
for feature in self.features:
122-
if feature.name in item:
123-
referenced_features.append(feature)
124+
for feature in item:
125+
if feature not in feature_name_to_feature:
126+
raise ValueError(
127+
f"Feature {feature} does not exist in this feature view."
128+
)
129+
referenced_features.append(feature_name_to_feature[feature])
124130
cp.projection.features = referenced_features
125131
else:
126132
cp.projection.desired_features = item

sdk/python/feast/errors.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,13 @@ def __init__(self, expected_column_name: str):
315315
)
316316

317317

318+
class FeatureViewMissingDuringFeatureServiceInference(Exception):
    """Raised when feature service inference cannot find a required feature view.

    Args:
        feature_view_name: Name of the feature view that was not available.
        feature_service_name: Name of the feature service whose features were
            being inferred.
    """

    def __init__(self, feature_view_name: str, feature_service_name: str):
        super().__init__(
            f"Missing {feature_view_name} feature view during inference for {feature_service_name} feature service."
        )
323+
324+
318325
class InvalidEntityType(Exception):
319326
def __init__(self, entity_type: type):
320327
super().__init__(

sdk/python/feast/feature_service.py

Lines changed: 51 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from typeguard import typechecked
66

77
from feast.base_feature_view import BaseFeatureView
8+
from feast.errors import FeatureViewMissingDuringFeatureServiceInference
89
from feast.feature_logging import LoggingConfig
910
from feast.feature_view import FeatureView
1011
from feast.feature_view_projection import FeatureViewProjection
@@ -85,32 +86,68 @@ def __init__(
8586
if isinstance(feature_grouping, BaseFeatureView):
8687
self.feature_view_projections.append(feature_grouping.projection)
8788

88-
def infer_features(self, fvs_to_update: Optional[Dict[str, FeatureView]] = None):
89+
def infer_features(self, fvs_to_update: Dict[str, FeatureView]):
90+
"""
91+
Infers the features for the projections of this feature service, and updates this feature
92+
service in place.
93+
94+
This method is necessary since feature services may rely on feature views which require
95+
feature inference.
96+
97+
Args:
98+
fvs_to_update: A mapping of feature view names to corresponding feature views that
99+
contains all the feature views necessary to run inference.
100+
"""
89101
for feature_grouping in self._features:
90102
if isinstance(feature_grouping, BaseFeatureView):
91-
# For feature services that depend on an unspecified feature view, apply inferred schema
92-
if fvs_to_update and feature_grouping.name in fvs_to_update:
93-
if feature_grouping.projection.desired_features:
94-
desired_features = set(
95-
feature_grouping.projection.desired_features
96-
)
103+
projection = feature_grouping.projection
104+
105+
if projection.desired_features:
106+
# The projection wants to select a specific set of inferred features.
107+
# Example: FeatureService(features=[fv[["inferred_feature"]]]), where
108+
# 'fv' is a feature view that was defined without a schema.
109+
if feature_grouping.name in fvs_to_update:
110+
# First we validate that the selected features have actually been inferred.
111+
desired_features = set(projection.desired_features)
97112
actual_features = set(
98113
[
99114
f.name
100115
for f in fvs_to_update[feature_grouping.name].features
101116
]
102117
)
103118
assert desired_features.issubset(actual_features)
104-
# We need to set the features for the projection at this point so we ensure we're starting with
105-
# an empty list.
106-
feature_grouping.projection.features = []
119+
120+
# Then we extract the selected features and add them to the projection.
121+
projection.features = []
107122
for f in fvs_to_update[feature_grouping.name].features:
108123
if f.name in desired_features:
109-
feature_grouping.projection.features.append(f)
124+
projection.features.append(f)
110125
else:
111-
feature_grouping.projection.features = fvs_to_update[
112-
feature_grouping.name
113-
].features
126+
raise FeatureViewMissingDuringFeatureServiceInference(
127+
feature_view_name=feature_grouping.name,
128+
feature_service_name=self.name,
129+
)
130+
131+
continue
132+
133+
if projection.features:
134+
# The projection has already selected features from a feature view with a
135+
# known schema, so no action needs to be taken.
136+
# Example: FeatureService(features=[fv[["existing_feature"]]]), where
137+
# 'existing_feature' was defined as part of the schema of 'fv'.
138+
# Example: FeatureService(features=[fv]), where 'fv' was defined with a schema.
139+
continue
140+
141+
# The projection wants to select all possible inferred features.
142+
# Example: FeatureService(features=[fv]), where 'fv' is a feature view that
143+
# was defined without a schema.
144+
if feature_grouping.name in fvs_to_update:
145+
projection.features = fvs_to_update[feature_grouping.name].features
146+
else:
147+
raise FeatureViewMissingDuringFeatureServiceInference(
148+
feature_view_name=feature_grouping.name,
149+
feature_service_name=self.name,
150+
)
114151
else:
115152
raise ValueError(
116153
f"The feature service {self.name} has been provided with an invalid type "

sdk/python/feast/feature_view_projection.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ class FeatureViewProjection:
2121
name: The unique name of the feature view from which this projection is created.
2222
name_alias: An optional alias for the name.
2323
features: The list of features represented by the feature view projection.
24+
desired_features: The list of features that this feature view projection intends to select.
25+
If empty, the projection intends to select all features. This attribute is only used
26+
for feature service inference. It should only be set if the underlying feature view
27+
is not ready to be projected, i.e. still needs to go through feature inference.
2428
join_key_map: A map to modify join key columns during retrieval of this feature
2529
view projection.
2630
"""

sdk/python/tests/doctest/__init__.py

Whitespace-only changes.
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
from datetime import timedelta

from feast import Entity, FeatureService, FeatureView, Field, FileSource
from feast.types import Float32, Int32, Int64

# Example feature repo in which every feature view declares an explicit schema.
# It defines two file sources, one entity, two feature views, and two feature
# services: one selecting whole feature views and one selecting single features.

driver_hourly_stats = FileSource(
    path="data/driver_stats.parquet",  # Fake path
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)

driver = Entity(
    name="driver_id",
)

# Feature view keyed on the driver entity, with its schema spelled out.
driver_hourly_stats_view = FeatureView(
    name="driver_hourly_stats",
    entities=[driver],
    ttl=timedelta(days=1),
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="acc_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64),
        Field(name="driver_id", dtype=Int32),
    ],
    online=True,
    source=driver_hourly_stats,
    tags={},
)

global_daily_stats = FileSource(
    path="data/global_stats.parquet",  # Fake path
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)

# Feature view with no entities, with its schema spelled out.
global_stats_feature_view = FeatureView(
    name="global_daily_stats",
    entities=[],
    ttl=timedelta(days=1),
    schema=[
        Field(name="num_rides", dtype=Int32),
        Field(name="avg_ride_length", dtype=Float32),
    ],
    online=True,
    source=global_daily_stats,
    tags={},
)

# Feature service that references both feature views in full.
all_stats_service = FeatureService(
    name="all_stats",
    features=[driver_hourly_stats_view, global_stats_feature_view],
    tags={"release": "production"},
)

# Feature service that selects a single named feature from each feature view.
some_stats_service = FeatureService(
    name="some_stats",
    features=[
        driver_hourly_stats_view[["conv_rate"]],
        global_stats_feature_view[["num_rides"]],
    ],
    tags={"release": "production"},
)
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
from datetime import timedelta

from feast import Entity, FeatureService, FeatureView, FileSource

# Example feature repo in which the feature views are declared WITHOUT a schema.
# NOTE(review): presumably this exercises feature inference for the feature
# services below -- confirm against the test that loads this file.
# The source paths are placeholders that the test substitutes before use.

driver_hourly_stats = FileSource(
    path="%PARQUET_PATH%",  # placeholder to be replaced by the test
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)

driver = Entity(
    name="driver_id",
)

# Feature view keyed on the driver entity; no schema is given.
driver_hourly_stats_view = FeatureView(
    name="driver_hourly_stats",
    entities=[driver],
    ttl=timedelta(days=1),
    online=True,
    source=driver_hourly_stats,
    tags={},
)

global_daily_stats = FileSource(
    path="%PARQUET_PATH_GLOBAL%",  # placeholder to be replaced by the test
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)

# Feature view with no entities; no schema is given.
global_stats_feature_view = FeatureView(
    name="global_daily_stats",
    entities=[],
    ttl=timedelta(days=1),
    online=True,
    source=global_daily_stats,
    tags={},
)

# Feature service that references both feature views in full.
all_stats_service = FeatureService(
    name="all_stats",
    features=[driver_hourly_stats_view, global_stats_feature_view],
    tags={"release": "production"},
)

# Feature service that selects a single named feature from each feature view.
some_stats_service = FeatureService(
    name="some_stats",
    features=[
        driver_hourly_stats_view[["conv_rate"]],
        global_stats_feature_view[["num_rides"]],
    ],
    tags={"release": "production"},
)

sdk/python/tests/integration/e2e/__init__.py

Whitespace-only changes.

sdk/python/tests/integration/materialization/__init__.py

Whitespace-only changes.

sdk/python/tests/integration/offline_store/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)