Skip to content

Commit a06700d

Browse files
feat: Enable stream feature view materialization (#2798)
* Enable materialization for stream feature views
  Signed-off-by: Felix Wang <[email protected]>
* Fix bugs with stream feature view materialization
  Signed-off-by: Felix Wang <[email protected]>
* Fix SFV tests to use actual data to allow for inference
  Signed-off-by: Felix Wang <[email protected]>
* Expand online retrieval test to also retrieve pushable_location_stats SFV
  Signed-off-by: Felix Wang <[email protected]>
* Remove stray comment
  Signed-off-by: Felix Wang <[email protected]>
* Add SFVs to Go serving path
  Signed-off-by: Felix Wang <[email protected]>
* Clarify comment
  Signed-off-by: Felix Wang <[email protected]>
1 parent 792751e commit a06700d

File tree

12 files changed

+428
-256
lines changed

12 files changed

+428
-256
lines changed

go/internal/feast/featurestore.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,14 @@ func (fs *FeatureStore) listAllViews() (map[string]*model.FeatureView, map[strin
224224
fvs[featureView.Base.Name] = featureView
225225
}
226226

227+
streamFeatureViews, err := fs.ListStreamFeatureViews()
228+
if err != nil {
229+
return nil, nil, err
230+
}
231+
for _, streamFeatureView := range streamFeatureViews {
232+
fvs[streamFeatureView.Base.Name] = streamFeatureView
233+
}
234+
227235
onDemandFeatureViews, err := fs.registry.ListOnDemandFeatureViews(fs.config.Project)
228236
if err != nil {
229237
return nil, nil, err
@@ -242,6 +250,14 @@ func (fs *FeatureStore) ListFeatureViews() ([]*model.FeatureView, error) {
242250
return featureViews, nil
243251
}
244252

253+
func (fs *FeatureStore) ListStreamFeatureViews() ([]*model.FeatureView, error) {
254+
streamFeatureViews, err := fs.registry.ListStreamFeatureViews(fs.config.Project)
255+
if err != nil {
256+
return streamFeatureViews, err
257+
}
258+
return streamFeatureViews, nil
259+
}
260+
245261
func (fs *FeatureStore) ListEntities(hideDummyEntity bool) ([]*model.Entity, error) {
246262

247263
allEntities, err := fs.registry.ListEntities(fs.config.Project)

go/internal/feast/model/featureview.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,24 @@ type FeatureView struct {
2424

2525
func NewFeatureViewFromProto(proto *core.FeatureView) *FeatureView {
2626
featureView := &FeatureView{Base: NewBaseFeatureView(proto.Spec.Name, proto.Spec.Features),
27-
Ttl: &(*proto.Spec.Ttl),
27+
Ttl: proto.Spec.Ttl,
28+
}
29+
if len(proto.Spec.Entities) == 0 {
30+
featureView.EntityNames = []string{DUMMY_ENTITY_NAME}
31+
} else {
32+
featureView.EntityNames = proto.Spec.Entities
33+
}
34+
entityColumns := make([]*Field, len(proto.Spec.EntityColumns))
35+
for i, entityColumn := range proto.Spec.EntityColumns {
36+
entityColumns[i] = NewFieldFromProto(entityColumn)
37+
}
38+
featureView.EntityColumns = entityColumns
39+
return featureView
40+
}
41+
42+
func NewFeatureViewFromStreamFeatureViewProto(proto *core.StreamFeatureView) *FeatureView {
43+
featureView := &FeatureView{Base: NewBaseFeatureView(proto.Spec.Name, proto.Spec.Features),
44+
Ttl: proto.Spec.Ttl,
2845
}
2946
if len(proto.Spec.Entities) == 0 {
3047
featureView.EntityNames = []string{DUMMY_ENTITY_NAME}

go/internal/feast/registry/registry.go

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type Registry struct {
3030
cachedFeatureServices map[string]map[string]*core.FeatureService
3131
cachedEntities map[string]map[string]*core.Entity
3232
cachedFeatureViews map[string]map[string]*core.FeatureView
33+
cachedStreamFeatureViews map[string]map[string]*core.StreamFeatureView
3334
cachedOnDemandFeatureViews map[string]map[string]*core.OnDemandFeatureView
3435
cachedRegistry *core.Registry
3536
cachedRegistryProtoLastUpdated time.Time
@@ -106,10 +107,12 @@ func (r *Registry) load(registry *core.Registry) {
106107
r.cachedFeatureServices = make(map[string]map[string]*core.FeatureService)
107108
r.cachedEntities = make(map[string]map[string]*core.Entity)
108109
r.cachedFeatureViews = make(map[string]map[string]*core.FeatureView)
110+
r.cachedStreamFeatureViews = make(map[string]map[string]*core.StreamFeatureView)
109111
r.cachedOnDemandFeatureViews = make(map[string]map[string]*core.OnDemandFeatureView)
110112
r.loadEntities(registry)
111113
r.loadFeatureServices(registry)
112114
r.loadFeatureViews(registry)
115+
r.loadStreamFeatureViews(registry)
113116
r.loadOnDemandFeatureViews(registry)
114117
r.cachedRegistryProtoLastUpdated = time.Now()
115118
}
@@ -144,6 +147,16 @@ func (r *Registry) loadFeatureViews(registry *core.Registry) {
144147
}
145148
}
146149

150+
func (r *Registry) loadStreamFeatureViews(registry *core.Registry) {
151+
streamFeatureViews := registry.StreamFeatureViews
152+
for _, streamFeatureView := range streamFeatureViews {
153+
if _, ok := r.cachedStreamFeatureViews[streamFeatureView.Spec.Project]; !ok {
154+
r.cachedStreamFeatureViews[streamFeatureView.Spec.Project] = make(map[string]*core.StreamFeatureView)
155+
}
156+
r.cachedStreamFeatureViews[streamFeatureView.Spec.Project][streamFeatureView.Spec.Name] = streamFeatureView
157+
}
158+
}
159+
147160
func (r *Registry) loadOnDemandFeatureViews(registry *core.Registry) {
148161
onDemandFeatureViews := registry.OnDemandFeatureViews
149162
for _, onDemandFeatureView := range onDemandFeatureViews {
@@ -193,7 +206,26 @@ func (r *Registry) ListFeatureViews(project string) ([]*model.FeatureView, error
193206
}
194207

195208
/*
196-
Look up Feature Views inside project
209+
Look up Stream Feature Views inside project
210+
Returns empty list if project not found
211+
*/
212+
213+
func (r *Registry) ListStreamFeatureViews(project string) ([]*model.FeatureView, error) {
214+
if cachedStreamFeatureViews, ok := r.cachedStreamFeatureViews[project]; !ok {
215+
return []*model.FeatureView{}, nil
216+
} else {
217+
streamFeatureViews := make([]*model.FeatureView, len(cachedStreamFeatureViews))
218+
index := 0
219+
for _, streamFeatureViewProto := range cachedStreamFeatureViews {
220+
streamFeatureViews[index] = model.NewFeatureViewFromStreamFeatureViewProto(streamFeatureViewProto)
221+
index += 1
222+
}
223+
return streamFeatureViews, nil
224+
}
225+
}
226+
227+
/*
228+
Look up Feature Services inside project
197229
Returns empty list if project not found
198230
*/
199231

@@ -254,6 +286,18 @@ func (r *Registry) GetFeatureView(project, featureViewName string) (*model.Featu
254286
}
255287
}
256288

289+
func (r *Registry) GetStreamFeatureView(project, streamFeatureViewName string) (*model.FeatureView, error) {
290+
if cachedStreamFeatureViews, ok := r.cachedStreamFeatureViews[project]; !ok {
291+
return nil, fmt.Errorf("no cached stream feature views found for project %s", project)
292+
} else {
293+
if streamFeatureViewProto, ok := cachedStreamFeatureViews[streamFeatureViewName]; !ok {
294+
return nil, fmt.Errorf("no cached stream feature view %s found for project %s", streamFeatureViewName, project)
295+
} else {
296+
return model.NewFeatureViewFromStreamFeatureViewProto(streamFeatureViewProto), nil
297+
}
298+
}
299+
}
300+
257301
func (r *Registry) GetFeatureService(project, featureServiceName string) (*model.FeatureService, error) {
258302
if cachedFeatureServices, ok := r.cachedFeatureServices[project]; !ok {
259303
return nil, fmt.Errorf("no cached feature services found for project %s", project)

sdk/python/feast/feature_store.py

Lines changed: 73 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,19 @@ def _list_feature_views(
265265
feature_views.append(fv)
266266
return feature_views
267267

268+
def _list_stream_feature_views(
269+
self, allow_cache: bool = False, hide_dummy_entity: bool = True,
270+
) -> List[StreamFeatureView]:
271+
stream_feature_views = []
272+
for sfv in self._registry.list_stream_feature_views(
273+
self.project, allow_cache=allow_cache
274+
):
275+
if hide_dummy_entity and sfv.entities[0] == DUMMY_ENTITY_NAME:
276+
sfv.entities = []
277+
sfv.entity_columns = []
278+
stream_feature_views.append(sfv)
279+
return stream_feature_views
280+
268281
@log_exceptions_and_usage
269282
def list_on_demand_feature_views(
270283
self, allow_cache: bool = False
@@ -289,9 +302,7 @@ def list_stream_feature_views(
289302
Returns:
290303
A list of stream feature views.
291304
"""
292-
return self._registry.list_stream_feature_views(
293-
self.project, allow_cache=allow_cache
294-
)
305+
return self._list_stream_feature_views(allow_cache)
295306

296307
@log_exceptions_and_usage
297308
def list_data_sources(self, allow_cache: bool = False) -> List[DataSource]:
@@ -558,6 +569,9 @@ def _make_inferences(
558569
update_feature_views_with_inferred_features_and_entities(
559570
views_to_update, entities + entities_to_update, self.config
560571
)
572+
update_feature_views_with_inferred_features_and_entities(
573+
sfvs_to_update, entities + entities_to_update, self.config
574+
)
561575
# TODO(kevjumba): Update schema inference
562576
for sfv in sfvs_to_update:
563577
if not sfv.schema:
@@ -574,6 +588,53 @@ def _make_inferences(
574588
for feature_service in feature_services_to_update:
575589
feature_service.infer_features(fvs_to_update=fvs_to_update_map)
576590

591+
def _get_feature_views_to_materialize(
592+
self, feature_views: Optional[List[str]],
593+
) -> List[FeatureView]:
594+
"""
595+
Returns the list of feature views that should be materialized.
596+
597+
If no feature views are specified, all feature views will be returned.
598+
599+
Args:
600+
feature_views: List of names of feature views to materialize.
601+
602+
Raises:
603+
FeatureViewNotFoundException: One of the specified feature views could not be found.
604+
ValueError: One of the specified feature views is not configured for materialization.
605+
"""
606+
feature_views_to_materialize: List[FeatureView] = []
607+
608+
if feature_views is None:
609+
feature_views_to_materialize = self._list_feature_views(
610+
hide_dummy_entity=False
611+
)
612+
feature_views_to_materialize = [
613+
fv for fv in feature_views_to_materialize if fv.online
614+
]
615+
stream_feature_views_to_materialize = self._list_stream_feature_views(
616+
hide_dummy_entity=False
617+
)
618+
feature_views_to_materialize += [
619+
sfv for sfv in stream_feature_views_to_materialize if sfv.online
620+
]
621+
else:
622+
for name in feature_views:
623+
try:
624+
feature_view = self._get_feature_view(name, hide_dummy_entity=False)
625+
except FeatureViewNotFoundException:
626+
feature_view = self._get_stream_feature_view(
627+
name, hide_dummy_entity=False
628+
)
629+
630+
if not feature_view.online:
631+
raise ValueError(
632+
f"FeatureView {feature_view.name} is not configured to be served online."
633+
)
634+
feature_views_to_materialize.append(feature_view)
635+
636+
return feature_views_to_materialize
637+
577638
@log_exceptions_and_usage
578639
def _plan(
579640
self, desired_repo_contents: RepoContents
@@ -873,8 +934,8 @@ def apply(
873934

874935
self._get_provider().update_infra(
875936
project=self.project,
876-
tables_to_delete=views_to_delete if not partial else [],
877-
tables_to_keep=views_to_update,
937+
tables_to_delete=views_to_delete + sfvs_to_delete if not partial else [],
938+
tables_to_keep=views_to_update + sfvs_to_update,
878939
entities_to_delete=entities_to_delete if not partial else [],
879940
entities_to_keep=entities_to_update,
880941
partial=partial,
@@ -1151,23 +1212,9 @@ def materialize_incremental(
11511212
<BLANKLINE>
11521213
...
11531214
"""
1154-
feature_views_to_materialize: List[FeatureView] = []
1155-
if feature_views is None:
1156-
feature_views_to_materialize = self._list_feature_views(
1157-
hide_dummy_entity=False
1158-
)
1159-
feature_views_to_materialize = [
1160-
fv for fv in feature_views_to_materialize if fv.online
1161-
]
1162-
else:
1163-
for name in feature_views:
1164-
feature_view = self._get_feature_view(name, hide_dummy_entity=False)
1165-
if not feature_view.online:
1166-
raise ValueError(
1167-
f"FeatureView {feature_view.name} is not configured to be served online."
1168-
)
1169-
feature_views_to_materialize.append(feature_view)
1170-
1215+
feature_views_to_materialize = self._get_feature_views_to_materialize(
1216+
feature_views
1217+
)
11711218
_print_materialization_log(
11721219
None,
11731220
end_date,
@@ -1258,23 +1305,9 @@ def materialize(
12581305
f"The given start_date {start_date} is greater than the given end_date {end_date}."
12591306
)
12601307

1261-
feature_views_to_materialize: List[FeatureView] = []
1262-
if feature_views is None:
1263-
feature_views_to_materialize = self._list_feature_views(
1264-
hide_dummy_entity=False
1265-
)
1266-
feature_views_to_materialize = [
1267-
fv for fv in feature_views_to_materialize if fv.online
1268-
]
1269-
else:
1270-
for name in feature_views:
1271-
feature_view = self._get_feature_view(name, hide_dummy_entity=False)
1272-
if not feature_view.online:
1273-
raise ValueError(
1274-
f"FeatureView {feature_view.name} is not configured to be served online."
1275-
)
1276-
feature_views_to_materialize.append(feature_view)
1277-
1308+
feature_views_to_materialize = self._get_feature_views_to_materialize(
1309+
feature_views
1310+
)
12781311
_print_materialization_log(
12791312
start_date,
12801313
end_date,
@@ -1327,6 +1360,7 @@ def push(
13271360
from feast.data_source import PushSource
13281361

13291362
all_fvs = self.list_feature_views(allow_cache=allow_registry_cache)
1363+
all_fvs += self.list_stream_feature_views(allow_cache=allow_registry_cache)
13301364

13311365
fvs_with_push_sources = {
13321366
fv

sdk/python/feast/inference.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ def update_feature_views_with_inferred_features_and_entities(
9999
other columns except designated timestamp columns are considered to be feature columns. If
100100
the feature view already has features, feature inference is skipped.
101101
102+
Note that this inference logic currently does not take any transformations (either a UDF or
103+
aggregations) into account. For example, even if a stream feature view has a transformation,
104+
this method assumes that the batch source contains transformed data with the correct final schema.
105+
102106
Args:
103107
fvs: The feature views to be updated.
104108
entities: A list containing entities associated with the feature views.

sdk/python/feast/registry.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1267,6 +1267,30 @@ def apply_materialization(
12671267
self.commit()
12681268
return
12691269

1270+
for idx, existing_stream_feature_view_proto in enumerate(
1271+
self.cached_registry_proto.stream_feature_views
1272+
):
1273+
if (
1274+
existing_stream_feature_view_proto.spec.name == feature_view.name
1275+
and existing_stream_feature_view_proto.spec.project == project
1276+
):
1277+
existing_stream_feature_view = StreamFeatureView.from_proto(
1278+
existing_stream_feature_view_proto
1279+
)
1280+
existing_stream_feature_view.materialization_intervals.append(
1281+
(start_date, end_date)
1282+
)
1283+
existing_stream_feature_view.last_updated_timestamp = datetime.utcnow()
1284+
stream_feature_view_proto = existing_stream_feature_view.to_proto()
1285+
stream_feature_view_proto.spec.project = project
1286+
del self.cached_registry_proto.stream_feature_views[idx]
1287+
self.cached_registry_proto.stream_feature_views.append(
1288+
stream_feature_view_proto
1289+
)
1290+
if commit:
1291+
self.commit()
1292+
return
1293+
12701294
raise FeatureViewNotFoundException(feature_view.name, project)
12711295

12721296
def list_feature_views(

0 commit comments

Comments
 (0)