Commit 83ab682

feat: Add StreamProcessor and SparkKafkaProcessor as contrib (#2777)
* Add StreamProcessor and SparkKafkaProcessor as contrib
  Signed-off-by: Felix Wang <[email protected]>
* Remove comment
  Signed-off-by: Felix Wang <[email protected]>
1 parent 0439945 commit 83ab682

File tree

2 files changed: +224 -0 lines changed
feast/infra/contrib/spark_kafka_processor.py (new module; path inferred from the import references below)
Lines changed: 137 additions & 0 deletions
@@ -0,0 +1,137 @@
from types import MethodType
from typing import List

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.functions import col, from_json

from feast.data_format import AvroFormat, JsonFormat
from feast.data_source import KafkaSource
from feast.infra.contrib.stream_processor import (
    ProcessorConfig,
    StreamProcessor,
    StreamTable,
)
from feast.stream_feature_view import StreamFeatureView


class SparkProcessorConfig(ProcessorConfig):
    spark_session: SparkSession


class SparkKafkaProcessor(StreamProcessor):
    spark: SparkSession
    format: str
    write_function: MethodType
    join_keys: List[str]

    def __init__(
        self,
        sfv: StreamFeatureView,
        config: ProcessorConfig,
        write_function: MethodType,
        processing_time: str = "30 seconds",
        query_timeout: str = "15 seconds",
    ):
        if not isinstance(sfv.stream_source, KafkaSource):
            raise ValueError("data source is not kafka source")
        if not isinstance(
            sfv.stream_source.kafka_options.message_format, AvroFormat
        ) and not isinstance(
            sfv.stream_source.kafka_options.message_format, JsonFormat
        ):
            raise ValueError(
                "spark streaming currently only supports json or avro format for kafka source schema"
            )

        self.format = (
            "json"
            if isinstance(sfv.stream_source.kafka_options.message_format, JsonFormat)
            else "avro"
        )

        if not isinstance(config, SparkProcessorConfig):
            raise ValueError("config is not spark processor config")
        self.spark = config.spark_session
        self.write_function = write_function
        self.processing_time = processing_time
        self.query_timeout = query_timeout
        super().__init__(sfv=sfv, data_source=sfv.stream_source)

    def ingest_stream_feature_view(self) -> None:
        ingested_stream_df = self._ingest_stream_data()
        transformed_df = self._construct_transformation_plan(ingested_stream_df)
        online_store_query = self._write_to_online_store(transformed_df)
        return online_store_query

    def _ingest_stream_data(self) -> StreamTable:
        """Only supports json and avro formats currently."""
        if self.format == "json":
            if not isinstance(
                self.data_source.kafka_options.message_format, JsonFormat
            ):
                raise ValueError("kafka source message format is not jsonformat")
            stream_df = (
                self.spark.readStream.format("kafka")
                .option(
                    "kafka.bootstrap.servers",
                    self.data_source.kafka_options.bootstrap_servers,
                )
                .option("subscribe", self.data_source.kafka_options.topic)
                .option("startingOffsets", "latest")  # Query start
                .load()
                .selectExpr("CAST(value AS STRING)")
                .select(
                    from_json(
                        col("value"),
                        self.data_source.kafka_options.message_format.schema_json,
                    ).alias("table")
                )
                .select("table.*")
            )
        else:
            if not isinstance(
                self.data_source.kafka_options.message_format, AvroFormat
            ):
                raise ValueError("kafka source message format is not avro format")
            stream_df = (
                self.spark.readStream.format("kafka")
                .option(
                    "kafka.bootstrap.servers",
                    self.data_source.kafka_options.bootstrap_servers,
                )
                .option("subscribe", self.data_source.kafka_options.topic)
                .option("startingOffsets", "latest")  # Query start
                .load()
                .selectExpr("CAST(value AS STRING)")
                .select(
                    from_avro(
                        col("value"),
                        self.data_source.kafka_options.message_format.schema_json,
                    ).alias("table")
                )
                .select("table.*")
            )
        return stream_df

    def _construct_transformation_plan(self, df: StreamTable) -> StreamTable:
        return self.sfv.udf.__call__(df) if self.sfv.udf else df

    def _write_to_online_store(self, df: StreamTable):
        # Validation occurs at the fs.write_to_online_store() phase against the stream feature view schema.
        def batch_write(row: DataFrame, batch_id: int):
            pd_row = row.toPandas()
            self.write_function(
                pd_row, input_timestamp="event_timestamp", output_timestamp=""
            )

        query = (
            df.writeStream.outputMode("update")
            .option("checkpointLocation", "/tmp/checkpoint/")
            .trigger(processingTime=self.processing_time)
            .foreachBatch(batch_write)
            .start()
        )

        query.awaitTermination(timeout=self.query_timeout)
        return query
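
For context on how this class is meant to be driven, here is a minimal usage sketch (not part of the commit). The feature repo import, the driver_stats_sfv view, and the write_function wrapper are illustrative assumptions; the processor itself only requires a StreamFeatureView backed by a KafkaSource (JSON or Avro format), a SparkProcessorConfig carrying a SparkSession, and a callable that accepts a pandas DataFrame plus input/output timestamp column names, matching the batch_write call above.

import pandas as pd
from pyspark.sql import SparkSession

from feast import FeatureStore
from feast.infra.contrib.spark_kafka_processor import (
    SparkKafkaProcessor,
    SparkProcessorConfig,
)

# Hypothetical: a StreamFeatureView defined in the feature repo whose
# stream_source is a KafkaSource using a JSON or Avro message format.
from feature_repo.definitions import driver_stats_sfv

store = FeatureStore(repo_path=".")


def write_function(df: pd.DataFrame, input_timestamp: str, output_timestamp: str) -> None:
    # One possible wrapper: push each micro-batch straight to the online store.
    store.write_to_online_store(feature_view_name=driver_stats_sfv.name, df=df)


config = SparkProcessorConfig(
    mode="spark",
    source="kafka",
    spark_session=SparkSession.builder.master("local[*]").getOrCreate(),
)

processor = SparkKafkaProcessor(
    sfv=driver_stats_sfv,
    config=config,
    write_function=write_function,
    processing_time="30 seconds",
    query_timeout="15 seconds",
)

# Starts the structured streaming query, waits up to query_timeout, and
# returns the streaming query handle.
query = processor.ingest_stream_feature_view()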
feast/infra/contrib/stream_processor.py (new module; path inferred from the imports above)
Lines changed: 87 additions & 0 deletions
@@ -0,0 +1,87 @@
from abc import ABC
from typing import Callable

import pandas as pd
from pyspark.sql import DataFrame

from feast.data_source import DataSource
from feast.importer import import_class
from feast.repo_config import FeastConfigBaseModel
from feast.stream_feature_view import StreamFeatureView

STREAM_PROCESSOR_CLASS_FOR_TYPE = {
    ("spark", "kafka"): "feast.infra.contrib.spark_kafka_processor.SparkKafkaProcessor",
}

# TODO: support more types other than just Spark.
StreamTable = DataFrame


class ProcessorConfig(FeastConfigBaseModel):
    # Processor mode (spark, etc)
    mode: str
    # Ingestion source (kafka, kinesis, etc)
    source: str


class StreamProcessor(ABC):
    """
    A StreamProcessor can ingest and transform data for a specific stream feature view,
    and persist that data to the online store.

    Attributes:
        sfv: The stream feature view on which the stream processor operates.
        data_source: The stream data source from which data will be ingested.
    """

    sfv: StreamFeatureView
    data_source: DataSource

    def __init__(self, sfv: StreamFeatureView, data_source: DataSource):
        self.sfv = sfv
        self.data_source = data_source

    def ingest_stream_feature_view(self) -> None:
        """
        Ingests data from the stream source attached to the stream feature view; transforms the data
        and then persists it to the online store.
        """
        pass

    def _ingest_stream_data(self) -> StreamTable:
        """
        Ingests data into a StreamTable.
        """
        pass

    def _construct_transformation_plan(self, table: StreamTable) -> StreamTable:
        """
        Applies transformations on top of StreamTable object. Since stream engines use lazy
        evaluation, the StreamTable will not be materialized until it is actually evaluated.
        For example: df.collect() in spark or tbl.execute() in Flink.
        """
        pass

    def _write_to_online_store(self, table: StreamTable) -> None:
        """
        Returns query for persisting data to the online store.
        """
        pass


def get_stream_processor_object(
    config: ProcessorConfig,
    sfv: StreamFeatureView,
    write_function: Callable[[pd.DataFrame, str, str], None],
):
    """
    Returns a stream processor object based on the config mode and stream source type. The write function is a
    function that wraps the feature store "write_to_online_store" capability.
    """
    if config.mode == "spark" and config.source == "kafka":
        stream_processor = STREAM_PROCESSOR_CLASS_FOR_TYPE[("spark", "kafka")]
        module_name, class_name = stream_processor.rsplit(".", 1)
        cls = import_class(module_name, class_name, "Processor")
        return cls(sfv=sfv, config=config, write_function=write_function,)
    else:
        raise ValueError("other processors besides spark-kafka not supported")
