Skip to content

Commit 134dc5f

Browse files
authored
feat: Write logged features to an offline store (Python API) (#2574)
* write logs to offline store Signed-off-by: pyalex <[email protected]> * format Signed-off-by: pyalex <[email protected]> * fix after rebase Signed-off-by: pyalex <[email protected]> * fix tests Signed-off-by: pyalex <[email protected]> * handle table not found in tests Signed-off-by: pyalex <[email protected]> * some api docs Signed-off-by: pyalex <[email protected]> * fix import Signed-off-by: pyalex <[email protected]> * use predefined schema in tests Signed-off-by: pyalex <[email protected]> * address pr comments Signed-off-by: pyalex <[email protected]> * more api docs Signed-off-by: pyalex <[email protected]> * add proto attr to snowflake dest Signed-off-by: pyalex <[email protected]> * add prefixes to system fields Signed-off-by: pyalex <[email protected]> * add custom destination Signed-off-by: pyalex <[email protected]> * move partition columns to destination config Signed-off-by: pyalex <[email protected]> * after rebase Signed-off-by: pyalex <[email protected]> * allow data source creator implementations w/o logging destination Signed-off-by: pyalex <[email protected]>
1 parent 689d20b commit 134dc5f

23 files changed

+936
-44
lines changed

protos/feast/core/FeatureService.proto

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ option go_package = "github.com/feast-dev/feast/go/protos/feast/core";
55
option java_outer_classname = "FeatureServiceProto";
66
option java_package = "feast.proto.core";
77

8+
import "google/protobuf/duration.proto";
89
import "google/protobuf/timestamp.proto";
910
import "feast/core/FeatureViewProjection.proto";
1011

@@ -35,6 +36,9 @@ message FeatureServiceSpec {
3536

3637
// Owner of the feature service.
3738
string owner = 6;
39+
40+
// (optional) if provided logging will be enabled for this feature service.
41+
LoggingConfig logging_config = 7;
3842
}
3943

4044

@@ -46,3 +50,45 @@ message FeatureServiceMeta {
4650
google.protobuf.Timestamp last_updated_timestamp = 2;
4751

4852
}
53+
54+
55+
// Configuration of feature-serving logs for a feature service: how often to
// sample served features and where to write the resulting log table.
message LoggingConfig {
  // Fraction of serving requests to log.
  // NOTE(review): presumably in [0.0, 1.0] — confirm against the Python SDK.
  float sample_rate = 1;
  // Time interval used to partition written logs in the destination.
  google.protobuf.Duration partition_interval = 2;

  // Exactly one destination may be set; the populated field selects which
  // offline-store implementation receives the logs.
  oneof destination {
    FileDestination file_destination = 3;
    BigQueryDestination bigquery_destination = 4;
    RedshiftDestination redshift_destination = 5;
    SnowflakeDestination snowflake_destination = 6;
    CustomDestination custom_destination = 7;
  }

  message FileDestination {
    // Destination path (local or object-store URI).
    string path = 1;
    // Optional override for the S3 endpoint (e.g. for MinIO or localstack).
    string s3_endpoint_override = 2;

    // column names to use for partitioning
    repeated string partition_by = 3;
  }

  message BigQueryDestination {
    // Full table reference in the form of [project:dataset.table]
    string table_ref = 1;
  }

  message RedshiftDestination {
    // Destination table name. ClusterId and database will be taken from an offline store config
    string table_name = 1;
  }

  message SnowflakeDestination {
    // Destination table name. Schema and database will be taken from an offline store config
    string table_name = 1;
  }

  // Escape hatch for custom offline stores: `kind` identifies the registered
  // destination implementation and `config` carries its free-form options.
  message CustomDestination {
    string kind = 1;
    map<string, string> config = 2;
  }
}

sdk/python/feast/feature_logging.py

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
import abc
2+
from typing import TYPE_CHECKING, Dict, Optional, Type, cast
3+
4+
import pyarrow as pa
5+
from pytz import UTC
6+
7+
from feast.data_source import DataSource
8+
from feast.embedded_go.type_map import FEAST_TYPE_TO_ARROW_TYPE, PA_TIMESTAMP_TYPE
9+
from feast.errors import (
10+
FeastObjectNotFoundException,
11+
FeatureViewNotFoundException,
12+
OnDemandFeatureViewNotFoundException,
13+
)
14+
from feast.protos.feast.core.FeatureService_pb2 import (
15+
LoggingConfig as LoggingConfigProto,
16+
)
17+
from feast.types import from_value_type
18+
19+
if TYPE_CHECKING:
20+
from feast import FeatureService
21+
from feast.registry import Registry
22+
23+
24+
# Names of system columns appended to each row of logged features
# (see FeatureServiceLoggingSource.get_schema).
REQUEST_ID_FIELD = "__request_id"
LOG_TIMESTAMP_FIELD = "__log_timestamp"
# NOTE(review): LOG_DATE_FIELD is not referenced in this module; presumably a
# derived date-partition column used by offline-store writers — confirm at call sites.
LOG_DATE_FIELD = "__log_date"
27+
28+
29+
class LoggingSource(abc.ABC):
    """
    Logging source describes object that produces logs (eg, feature service produces logs of served features).
    It should be able to provide schema of produced logs table and additional metadata that describes logs data.

    Inherits ``abc.ABC`` so that instantiating a source without implementing
    both abstract methods fails with ``TypeError`` instead of silently
    producing an unusable object (the original declared ``@abc.abstractmethod``
    without an ABC base, which does not enforce abstractness).
    """

    @abc.abstractmethod
    def get_schema(self, registry: "Registry") -> "pa.Schema":
        """Generate schema for logs destination."""
        raise NotImplementedError

    @abc.abstractmethod
    def get_log_timestamp_column(self) -> str:
        """Return timestamp column that must exist in generated schema."""
        raise NotImplementedError
44+
45+
46+
class FeatureServiceLoggingSource(LoggingSource):
    """
    Logging source backed by a feature service: builds the Arrow schema of the
    log table from the service's feature view projections plus system columns.
    """

    def __init__(self, feature_service: "FeatureService", project: str):
        # feature_service: the service whose served features are being logged
        # project: feast project name, used for registry lookups
        self._feature_service = feature_service
        self._project = project

    def get_schema(self, registry: "Registry") -> pa.Schema:
        """
        Generate the Arrow schema for this service's logs.

        For every feature in every projection three columns are produced
        (value, event timestamp, status), prefixed with the projection name
        to avoid collisions across feature views. Entity join keys (for
        regular feature views) or request-source fields (for on-demand
        feature views) are added as well, followed by system columns.

        Raises:
            FeastObjectNotFoundException: if a projection's name matches
                neither a feature view nor an on-demand feature view.
        """
        fields: Dict[str, pa.DataType] = {}

        for projection in self._feature_service.feature_view_projections:
            for feature in projection.features:
                fields[
                    f"{projection.name_to_use()}__{feature.name}"
                ] = FEAST_TYPE_TO_ARROW_TYPE[feature.dtype]
                fields[
                    f"{projection.name_to_use()}__{feature.name}__timestamp"
                ] = PA_TIMESTAMP_TYPE
                fields[
                    f"{projection.name_to_use()}__{feature.name}__status"
                ] = pa.int32()

            # Resolve the projection: first as a regular feature view, then as
            # an on-demand feature view; fail only if it is neither.
            try:
                feature_view = registry.get_feature_view(projection.name, self._project)
            except FeatureViewNotFoundException:
                try:
                    on_demand_feature_view = registry.get_on_demand_feature_view(
                        projection.name, self._project
                    )
                except OnDemandFeatureViewNotFoundException:
                    raise FeastObjectNotFoundException(
                        f"Can't recognize feature view with a name {projection.name}"
                    )

                # On-demand feature view: log the fields of its request
                # sources, since those are the inputs it was served with.
                for (
                    request_source
                ) in on_demand_feature_view.source_request_sources.values():
                    for field in request_source.schema:
                        fields[field.name] = FEAST_TYPE_TO_ARROW_TYPE[field.dtype]

            else:
                # Regular feature view: log its entity join keys, honoring any
                # per-projection join_key_map renames.
                for entity_name in feature_view.entities:
                    entity = registry.get_entity(entity_name, self._project)
                    join_key = projection.join_key_map.get(
                        entity.join_key, entity.join_key
                    )
                    fields[join_key] = FEAST_TYPE_TO_ARROW_TYPE[
                        from_value_type(entity.value_type)
                    ]

        # system columns
        fields[REQUEST_ID_FIELD] = pa.string()
        fields[LOG_TIMESTAMP_FIELD] = pa.timestamp("us", tz=UTC)

        return pa.schema(
            [pa.field(name, data_type) for name, data_type in fields.items()]
        )

    def get_log_timestamp_column(self) -> str:
        """Name of the timestamp column present in the generated schema."""
        return LOG_TIMESTAMP_FIELD
105+
106+
class _DestinationRegistry(type):
107+
classes_by_proto_attr_name: Dict[str, Type["LoggingDestination"]] = {}
108+
109+
def __new__(cls, name, bases, dct):
110+
kls = type.__new__(cls, name, bases, dct)
111+
if dct.get("_proto_attr_name"):
112+
cls.classes_by_proto_attr_name[dct["_proto_attr_name"]] = kls
113+
return kls
114+
115+
116+
class LoggingDestination:
    """
    Logging destination contains details about where exactly logs should be written inside an offline store.
    It is implementation specific - each offline store must implement LoggingDestination subclass.

    Kind of logging destination will be determined by matching attribute name in LoggingConfig protobuf message
    and "_proto_attr_name" attribute of each subclass.
    """

    # Name of the field in the LoggingConfig proto's "destination" oneof that
    # this subclass handles; _DestinationRegistry keys its lookup table on it.
    # (Renamed from the stale `_proto_kind`, which nothing in this module read —
    # the registry and LoggingConfig.from_proto both use `_proto_attr_name`.)
    _proto_attr_name: str

    @classmethod
    @abc.abstractmethod
    def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination":
        """Build a destination from its LoggingConfig protobuf representation."""
        raise NotImplementedError

    @abc.abstractmethod
    def to_proto(self) -> LoggingConfigProto:
        """Serialize this destination into a LoggingConfig protobuf message."""
        raise NotImplementedError

    @abc.abstractmethod
    def to_data_source(self) -> DataSource:
        """
        Convert this object into a data source to read logs from an offline store.
        """
        raise NotImplementedError
142+
143+
144+
class LoggingConfig:
    """
    Logging configuration attached to a feature service: wraps the single
    destination that logged features should be written to.
    """

    destination: LoggingDestination

    def __init__(self, destination: LoggingDestination):
        self.destination = destination

    @classmethod
    def from_proto(cls, config_proto: LoggingConfigProto) -> Optional["LoggingConfig"]:
        """Parse a LoggingConfig proto; returns None if no destination is set."""
        # Which member of the "destination" oneof is populated (None if empty).
        attr_name = cast(str, config_proto.WhichOneof("destination"))
        if attr_name is None:
            return

        # Custom destinations are further dispatched by their user-defined kind.
        if attr_name == "custom_destination":
            attr_name = config_proto.custom_destination.kind

        destination_class = _DestinationRegistry.classes_by_proto_attr_name[attr_name]
        destination = destination_class.from_proto(config_proto)
        return LoggingConfig(destination=destination)

    def to_proto(self) -> LoggingConfigProto:
        """Serialize by delegating to the wrapped destination."""
        return self.destination.to_proto()

sdk/python/feast/feature_service.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from google.protobuf.json_format import MessageToJson
66

77
from feast.base_feature_view import BaseFeatureView
8+
from feast.feature_logging import LoggingConfig
89
from feast.feature_view import FeatureView
910
from feast.feature_view_projection import FeatureViewProjection
1011
from feast.on_demand_feature_view import OnDemandFeatureView
@@ -44,6 +45,7 @@ class FeatureService:
4445
owner: str
4546
created_timestamp: Optional[datetime] = None
4647
last_updated_timestamp: Optional[datetime] = None
48+
logging_config: Optional[LoggingConfig] = None
4749

4850
@log_exceptions
4951
def __init__(
@@ -54,6 +56,7 @@ def __init__(
5456
tags: Dict[str, str] = None,
5557
description: str = "",
5658
owner: str = "",
59+
logging_config: Optional[LoggingConfig] = None,
5760
):
5861
"""
5962
Creates a FeatureService object.
@@ -106,6 +109,7 @@ def __init__(
106109
self.owner = owner
107110
self.created_timestamp = None
108111
self.last_updated_timestamp = None
112+
self.logging_config = logging_config
109113

110114
def __repr__(self):
111115
items = (f"{k} = {v}" for k, v in self.__dict__.items())
@@ -152,6 +156,9 @@ def from_proto(cls, feature_service_proto: FeatureServiceProto):
152156
tags=dict(feature_service_proto.spec.tags),
153157
description=feature_service_proto.spec.description,
154158
owner=feature_service_proto.spec.owner,
159+
logging_config=LoggingConfig.from_proto(
160+
feature_service_proto.spec.logging_config
161+
),
155162
)
156163
fs.feature_view_projections.extend(
157164
[
@@ -192,6 +199,9 @@ def to_proto(self) -> FeatureServiceProto:
192199
tags=self.tags,
193200
description=self.description,
194201
owner=self.owner,
202+
logging_config=self.logging_config.to_proto()
203+
if self.logging_config
204+
else None,
195205
)
196206

197207
return FeatureServiceProto(spec=spec, meta=meta)

sdk/python/feast/feature_store.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
)
3535

3636
import pandas as pd
37+
import pyarrow as pa
3738
from colorama import Fore, Style
3839
from google.protobuf.timestamp_pb2 import Timestamp
3940
from tqdm import tqdm
@@ -1976,6 +1977,25 @@ def serve_transformations(self, port: int) -> None:
19761977
def _teardown_go_server(self):
19771978
self._go_server = None
19781979

1980+
def write_logged_features(self, logs: pa.Table, source: FeatureService):
    """
    Write logs produced by a source (currently only feature service is supported as a source)
    to an offline store.

    Args:
        logs: Arrow table with logged feature rows.
        source: Feature service that produced these logs. It must have a
            logging config attached, since the destination is taken from it.
            (Annotation was `Union[FeatureService]`, which is identical to
            plain `FeatureService` — the degenerate one-member Union is dropped.)

    Raises:
        ValueError: If `source` is not a feature service, or if the feature
            service has no logging config.
    """
    if not isinstance(source, FeatureService):
        raise ValueError("Only feature service is currently supported as a source")

    # Explicit check instead of `assert`: asserts are stripped under `python -O`,
    # which would turn a misconfiguration into a confusing downstream failure.
    if source.logging_config is None:
        raise ValueError(
            "Feature service must be configured with logging config in order to use this functionality"
        )

    self._get_provider().write_feature_service_logs(
        feature_service=source,
        logs=logs,
        config=self.config,
        registry=self._registry,
    )
1998+
19791999

19802000
def _validate_entity_values(join_key_values: Dict[str, List[Value]]):
19812001
set_of_row_lengths = {len(v) for v in join_key_values.values()}

0 commit comments

Comments
 (0)