Skip to content

Commit 38cd7f9

Browse files
authored
fix: Update on demand feature view api (#2587)
* Migrate over Signed-off-by: Kevin Zhang <[email protected]> * Fix Signed-off-by: Kevin Zhang <[email protected]> * Fix Signed-off-by: Kevin Zhang <[email protected]> * Fix Signed-off-by: Kevin Zhang <[email protected]> * temp fix Signed-off-by: Kevin Zhang <[email protected]> * Fix lint Signed-off-by: Kevin Zhang <[email protected]> * Fix lint Signed-off-by: Kevin Zhang <[email protected]> * Fix lint Signed-off-by: Kevin Zhang <[email protected]> * Fix Signed-off-by: Kevin Zhang <[email protected]> * Fix import Signed-off-by: Kevin Zhang <[email protected]> * Fix Signed-off-by: Kevin Zhang <[email protected]> * Fix Signed-off-by: Kevin Zhang <[email protected]> * Fix lint Signed-off-by: Kevin Zhang <[email protected]> * Fix Signed-off-by: Kevin Zhang <[email protected]> * Fix lint Signed-off-by: Kevin Zhang <[email protected]> * Fix integration tests Signed-off-by: Kevin Zhang <[email protected]> * Fix integration tests Signed-off-by: Kevin Zhang <[email protected]> * Fix Signed-off-by: Kevin Zhang <[email protected]> * lilnt Signed-off-by: Kevin Zhang <[email protected]> * Fix lint? Signed-off-by: Kevin Zhang <[email protected]> * Fix Signed-off-by: Kevin Zhang <[email protected]>
1 parent 4606d7c commit 38cd7f9

File tree

11 files changed

+255
-77
lines changed

11 files changed

+255
-77
lines changed

docs/tutorials/validating-historical-features.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ pyarrow.parquet.write_table(entities_2019_table, "entities.parquet")
107107
import pyarrow.parquet
108108
import pandas as pd
109109

110-
from feast import FeatureView, Entity, FeatureStore, Field
110+
from feast import FeatureView, Entity, FeatureStore, Field, BatchFeatureView
111111
from feast.types import Float64, Int64
112112
from feast.value_type import ValueType
113113
from feast.data_format import ParquetFormat
@@ -134,7 +134,7 @@ taxi_entity = Entity(name='taxi', join_keys=['taxi_id'])
134134

135135

136136
```python
137-
trips_stats_fv = FeatureView(
137+
trips_stats_fv = BatchFeatureView(
138138
name='trip_stats',
139139
entities=['taxi'],
140140
features=[
@@ -160,9 +160,9 @@ trips_stats_fv = FeatureView(
160160
Field("avg_trip_seconds", Float64),
161161
Field("earned_per_hour", Float64),
162162
],
163-
sources={
164-
"stats": trips_stats_fv
165-
}
163+
sources=[
164+
trips_stats_fv,
165+
]
166166
)
167167
def on_demand_stats(inp):
168168
out = pd.DataFrame()

examples/java-demo/feature_repo/driver_repo.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,15 @@
77
from google.protobuf.duration_pb2 import Duration
88
from feast.field import Field
99

10-
from feast import Entity, Feature, FeatureView, FileSource, ValueType
10+
from feast import Entity, Feature, BatchFeatureView, FileSource, ValueType
1111

1212
driver_hourly_stats = FileSource(
1313
path="data/driver_stats_with_string.parquet",
1414
timestamp_field="event_timestamp",
1515
created_timestamp_column="created",
1616
)
1717
driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",)
18-
driver_hourly_stats_view = FeatureView(
18+
driver_hourly_stats_view = BatchFeatureView(
1919
name="driver_hourly_stats",
2020
entities=["driver_id"],
2121
ttl=Duration(seconds=86400000),
@@ -43,10 +43,10 @@
4343
# Define an on demand feature view which can generate new features based on
4444
# existing feature views and RequestSource features
4545
@on_demand_feature_view(
46-
inputs={
47-
"driver_hourly_stats": driver_hourly_stats_view,
48-
"vals_to_add": input_request,
49-
},
46+
inputs=[
47+
driver_hourly_stats_view,
48+
input_request,
49+
],
5050
schema=[
5151
Field(name="conv_rate_plus_val1", dtype=Float64),
5252
Field(name="conv_rate_plus_val2", dtype=Float64),

sdk/python/feast/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from feast.infra.offline_stores.redshift_source import RedshiftSource
88
from feast.infra.offline_stores.snowflake_source import SnowflakeSource
99

10+
from .batch_feature_view import BatchFeatureView
1011
from .data_source import (
1112
KafkaSource,
1213
KinesisSource,
@@ -23,6 +24,7 @@
2324
from .on_demand_feature_view import OnDemandFeatureView
2425
from .repo_config import RepoConfig
2526
from .request_feature_view import RequestFeatureView
27+
from .stream_feature_view import StreamFeatureView
2628
from .value_type import ValueType
2729

2830
logging.basicConfig(
@@ -38,6 +40,7 @@
3840
pass
3941

4042
__all__ = [
43+
"BatchFeatureView",
4144
"Entity",
4245
"KafkaSource",
4346
"KinesisSource",
@@ -49,6 +52,7 @@
4952
"OnDemandFeatureView",
5053
"RepoConfig",
5154
"SourceType",
55+
"StreamFeatureView",
5256
"ValueType",
5357
"BigQuerySource",
5458
"FileSource",

sdk/python/feast/feature_view.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -439,8 +439,9 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
439439
else feature_view_proto.spec.ttl.ToTimedelta()
440440
),
441441
source=batch_source,
442-
stream_source=stream_source,
443442
)
443+
if stream_source:
444+
feature_view.stream_source = stream_source
444445

445446
# FeatureViewProjections are not saved in the FeatureView proto.
446447
# Create the default projection.

sdk/python/feast/on_demand_feature_view.py

Lines changed: 111 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import pandas as pd
99

1010
from feast.base_feature_view import BaseFeatureView
11+
from feast.batch_feature_view import BatchFeatureView
1112
from feast.data_source import RequestSource
1213
from feast.errors import RegistryInferenceFailure, SpecifiedFeaturesNotPresentError
1314
from feast.feature import Feature
@@ -25,6 +26,7 @@
2526
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
2627
UserDefinedFunction as UserDefinedFunctionProto,
2728
)
29+
from feast.stream_feature_view import StreamFeatureView
2830
from feast.type_map import (
2931
feast_value_type_to_pandas_type,
3032
python_type_to_feast_value_type,
@@ -66,14 +68,21 @@ class OnDemandFeatureView(BaseFeatureView):
6668
tags: Dict[str, str]
6769
owner: str
6870

69-
@log_exceptions
70-
def __init__(
71+
@log_exceptions # noqa: C901
72+
def __init__( # noqa: C901
7173
self,
7274
*args,
7375
name: Optional[str] = None,
7476
features: Optional[List[Feature]] = None,
7577
sources: Optional[
76-
Dict[str, Union[FeatureView, FeatureViewProjection, RequestSource]]
78+
List[
79+
Union[
80+
BatchFeatureView,
81+
StreamFeatureView,
82+
RequestSource,
83+
FeatureViewProjection,
84+
]
85+
]
7786
] = None,
7887
udf: Optional[MethodType] = None,
7988
inputs: Optional[
@@ -92,11 +101,11 @@ def __init__(
92101
features (deprecated): The list of features in the output of the on demand
93102
feature view, after the transformation has been applied.
94103
sources (optional): A map from input source names to the actual input sources,
95-
which may be feature views, feature view projections, or request data sources.
104+
which may be feature views, or request data sources.
96105
These sources serve as inputs to the udf, which will refer to them by name.
97106
udf (optional): The user defined transformation function, which must take pandas
98107
dataframes as inputs.
99-
inputs (optional): A map from input source names to the actual input sources,
108+
inputs (optional): (Deprecated) A map from input source names to the actual input sources,
100109
which may be feature views, feature view projections, or request data sources.
101110
These sources serve as inputs to the udf, which will refer to them by name.
102111
schema (optional): The list of features in the output of the on demand feature
@@ -123,8 +132,7 @@ def __init__(
123132
),
124133
DeprecationWarning,
125134
)
126-
127-
_sources = sources or inputs
135+
_sources = sources or []
128136
if inputs and sources:
129137
raise ValueError("At most one of `sources` or `inputs` can be specified.")
130138
elif inputs:
@@ -135,7 +143,17 @@ def __init__(
135143
),
136144
DeprecationWarning,
137145
)
138-
146+
for _, source in inputs.items():
147+
if isinstance(source, FeatureView):
148+
_sources.append(feature_view_to_batch_feature_view(source))
149+
elif isinstance(source, RequestSource) or isinstance(
150+
source, FeatureViewProjection
151+
):
152+
_sources.append(source)
153+
else:
154+
raise ValueError(
155+
"input can only accept FeatureView, FeatureViewProjection, or RequestSource"
156+
)
139157
_udf = udf
140158

141159
if args:
@@ -169,7 +187,18 @@ def __init__(
169187
DeprecationWarning,
170188
)
171189
if len(args) >= 3:
172-
_sources = args[2]
190+
_inputs = args[2]
191+
for _, source in _inputs.items():
192+
if isinstance(source, FeatureView):
193+
_sources.append(feature_view_to_batch_feature_view(source))
194+
elif isinstance(source, RequestSource) or isinstance(
195+
source, FeatureViewProjection
196+
):
197+
_sources.append(source)
198+
else:
199+
raise ValueError(
200+
"input can only accept FeatureView, FeatureViewProjection, or RequestSource"
201+
)
173202
warnings.warn(
174203
(
175204
"The `inputs` parameter is being deprecated. Please use `sources` instead. "
@@ -195,18 +224,17 @@ def __init__(
195224
tags=tags,
196225
owner=owner,
197226
)
198-
199227
assert _sources is not None
200228
self.source_feature_view_projections: Dict[str, FeatureViewProjection] = {}
201229
self.source_request_sources: Dict[str, RequestSource] = {}
202-
for source_name, odfv_source in _sources.items():
230+
for odfv_source in _sources:
203231
if isinstance(odfv_source, RequestSource):
204-
self.source_request_sources[source_name] = odfv_source
232+
self.source_request_sources[odfv_source.name] = odfv_source
205233
elif isinstance(odfv_source, FeatureViewProjection):
206-
self.source_feature_view_projections[source_name] = odfv_source
234+
self.source_feature_view_projections[odfv_source.name] = odfv_source
207235
else:
208236
self.source_feature_view_projections[
209-
source_name
237+
odfv_source.name
210238
] = odfv_source.projection
211239

212240
if _udf is None:
@@ -219,12 +247,12 @@ def proto_class(self) -> Type[OnDemandFeatureViewProto]:
219247
return OnDemandFeatureViewProto
220248

221249
def __copy__(self):
250+
222251
fv = OnDemandFeatureView(
223252
name=self.name,
224253
schema=self.features,
225-
sources=dict(
226-
**self.source_feature_view_projections, **self.source_request_sources,
227-
),
254+
sources=list(self.source_feature_view_projections.values())
255+
+ list(self.source_request_sources.values()),
228256
udf=self.udf,
229257
description=self.description,
230258
tags=self.tags,
@@ -302,22 +330,21 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
302330
Returns:
303331
A OnDemandFeatureView object based on the on-demand feature view protobuf.
304332
"""
305-
sources = {}
306-
for (
307-
source_name,
308-
on_demand_source,
309-
) in on_demand_feature_view_proto.spec.sources.items():
333+
sources = []
334+
for (_, on_demand_source,) in on_demand_feature_view_proto.spec.sources.items():
310335
if on_demand_source.WhichOneof("source") == "feature_view":
311-
sources[source_name] = FeatureView.from_proto(
312-
on_demand_source.feature_view
313-
).projection
336+
sources.append(
337+
FeatureView.from_proto(on_demand_source.feature_view).projection
338+
)
314339
elif on_demand_source.WhichOneof("source") == "feature_view_projection":
315-
sources[source_name] = FeatureViewProjection.from_proto(
316-
on_demand_source.feature_view_projection
340+
sources.append(
341+
FeatureViewProjection.from_proto(
342+
on_demand_source.feature_view_projection
343+
)
317344
)
318345
else:
319-
sources[source_name] = RequestSource.from_proto(
320-
on_demand_source.request_data_source
346+
sources.append(
347+
RequestSource.from_proto(on_demand_source.request_data_source)
321348
)
322349
on_demand_feature_view_obj = cls(
323350
name=on_demand_feature_view_proto.spec.name,
@@ -476,7 +503,16 @@ def get_requested_odfvs(feature_refs, project, registry):
476503
def on_demand_feature_view(
477504
*args,
478505
features: Optional[List[Feature]] = None,
479-
sources: Optional[Dict[str, Union[FeatureView, RequestSource]]] = None,
506+
sources: Optional[
507+
List[
508+
Union[
509+
BatchFeatureView,
510+
StreamFeatureView,
511+
RequestSource,
512+
FeatureViewProjection,
513+
]
514+
]
515+
] = None,
480516
inputs: Optional[Dict[str, Union[FeatureView, RequestSource]]] = None,
481517
schema: Optional[List[Field]] = None,
482518
description: str = "",
@@ -490,7 +526,7 @@ def on_demand_feature_view(
490526
features (deprecated): The list of features in the output of the on demand
491527
feature view, after the transformation has been applied.
492528
sources (optional): A map from input source names to the actual input sources,
493-
which may be feature views, feature view projections, or request data sources.
529+
which may be feature views, or request data sources.
494530
These sources serve as inputs to the udf, which will refer to them by name.
495531
inputs (optional): A map from input source names to the actual input sources,
496532
which may be feature views, feature view projections, or request data sources.
@@ -517,8 +553,7 @@ def on_demand_feature_view(
517553
),
518554
DeprecationWarning,
519555
)
520-
521-
_sources = sources or inputs
556+
_sources = sources or []
522557
if inputs and sources:
523558
raise ValueError("At most one of `sources` or `inputs` can be specified.")
524559
elif inputs:
@@ -529,6 +564,17 @@ def on_demand_feature_view(
529564
),
530565
DeprecationWarning,
531566
)
567+
for _, source in inputs.items():
568+
if isinstance(source, FeatureView):
569+
_sources.append(feature_view_to_batch_feature_view(source))
570+
elif isinstance(source, RequestSource) or isinstance(
571+
source, FeatureViewProjection
572+
):
573+
_sources.append(source)
574+
else:
575+
raise ValueError(
576+
"input can only accept FeatureView, FeatureViewProjection, or RequestSource"
577+
)
532578

533579
if args:
534580
warnings.warn(
@@ -559,14 +605,25 @@ def on_demand_feature_view(
559605
DeprecationWarning,
560606
)
561607
if len(args) >= 2:
562-
_sources = args[1]
563-
warnings.warn(
564-
(
565-
"The `inputs` parameter is being deprecated. Please use `sources` instead. "
566-
"Feast 0.21 and onwards will not support the `inputs` parameter."
567-
),
568-
DeprecationWarning,
569-
)
608+
_inputs = args[1]
609+
for _, source in _inputs.items():
610+
if isinstance(source, FeatureView):
611+
_sources.append(feature_view_to_batch_feature_view(source))
612+
elif isinstance(source, RequestSource) or isinstance(
613+
source, FeatureViewProjection
614+
):
615+
_sources.append(source)
616+
else:
617+
raise ValueError(
618+
"input can only accept FeatureView, FeatureViewProjection, or RequestSource"
619+
)
620+
warnings.warn(
621+
(
622+
"The `inputs` parameter is being deprecated. Please use `sources` instead. "
623+
"Feast 0.21 and onwards will not support the `inputs` parameter."
624+
),
625+
DeprecationWarning,
626+
)
570627

571628
if not _sources:
572629
raise ValueError("The `sources` parameter must be specified.")
@@ -587,3 +644,16 @@ def decorator(user_function):
587644
return on_demand_feature_view_obj
588645

589646
return decorator
647+
648+
649+
def feature_view_to_batch_feature_view(fv: FeatureView) -> BatchFeatureView:
650+
return BatchFeatureView(
651+
name=fv.name,
652+
entities=fv.entities,
653+
ttl=fv.ttl,
654+
tags=fv.tags,
655+
online=fv.online,
656+
owner=fv.owner,
657+
schema=fv.schema,
658+
source=fv.source,
659+
)

0 commit comments

Comments
 (0)