Skip to content

Commit a8d282d

Browse files
fix: Fix SparkKafkaProcessor query_timeout parameter (#2789)
Signed-off-by: Felix Wang <[email protected]>
1 parent 336fdd1 commit a8d282d

File tree

3 files changed

+6
-2
lines changed

3 files changed

+6
-2
lines changed

sdk/python/feast/infra/contrib/spark_kafka_processor.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
class SparkProcessorConfig(ProcessorConfig):
1919
spark_session: SparkSession
20+
processing_time: str
21+
query_timeout: int
2022

2123

2224
class SparkKafkaProcessor(StreamProcessor):
@@ -31,7 +33,7 @@ def __init__(
3133
config: ProcessorConfig,
3234
write_function: MethodType,
3335
processing_time: str = "30 seconds",
34-
query_timeout: str = "15 seconds",
36+
query_timeout: int = 15,
3537
):
3638
if not isinstance(sfv.stream_source, KafkaSource):
3739
raise ValueError("data source is not kafka source")

sdk/python/feast/infra/contrib/stream_processor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ def get_stream_processor_object(
8181
if config.mode == "spark" and config.source == "kafka":
8282
stream_processor = STREAM_PROCESSOR_CLASS_FOR_TYPE[("spark", "kafka")]
8383
module_name, class_name = stream_processor.rsplit(".", 1)
84-
cls = import_class(module_name, class_name, "Processor")
84+
cls = import_class(module_name, class_name, "StreamProcessor")
8585
return cls(sfv=sfv, config=config, write_function=write_function,)
8686
else:
8787
raise ValueError("other processors besides spark-kafka not supported")

sdk/python/feast/stream_feature_view.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,12 @@ def __eq__(self, other):
132132

133133
if not super().__eq__(other):
134134
return False
135+
135136
if not self.udf:
136137
return not other.udf
137138
if not other.udf:
138139
return False
140+
139141
if (
140142
self.mode != other.mode
141143
or self.timestamp_field != other.timestamp_field

0 commit comments

Comments
 (0)