Skip to content

Commit 2ce33cd

Browse files
authored
fix: Forcing ODFV udfs to be __main__ module and fixing false positive duplicate data source warning (#2677)
* fix: Forcing ODFV udfs to be __main__ module so clients don't need the udf's module. Fixing duplicate data source conflict between push source and batch source Signed-off-by: Danny Chiao <[email protected]> * fix Signed-off-by: Danny Chiao <[email protected]> * fix Signed-off-by: Danny Chiao <[email protected]>
1 parent 01f9f0a commit 2ce33cd

File tree

2 files changed

+13
-1
lines changed

2 files changed

+13
-1
lines changed

sdk/python/feast/on_demand_feature_view.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -628,7 +628,14 @@ def on_demand_feature_view(
628628
if not _sources:
629629
raise ValueError("The `sources` parameter must be specified.")
630630

631+
def mainify(obj):
632+
# Needed to allow dill to properly serialize the udf. Otherwise, clients will need to have a file with the same
633+
# name as the original file defining the ODFV.
634+
if obj.__module__ != "__main__":
635+
obj.__module__ = "__main__"
636+
631637
def decorator(user_function):
638+
mainify(user_function)
632639
on_demand_feature_view_obj = OnDemandFeatureView(
633640
name=user_function.__name__,
634641
sources=_sources,

sdk/python/feast/repo_operations.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ def parse_repo(repo_root: Path) -> RepoContents:
110110
request_feature_views=[],
111111
)
112112

113+
data_sources_set = set()
113114
for repo_file in get_repo_files(repo_root):
114115
module_path = py_path_to_module(repo_file)
115116
module = importlib.import_module(module_path)
@@ -119,14 +120,18 @@ def parse_repo(repo_root: Path) -> RepoContents:
119120
(obj is ds) for ds in res.data_sources
120121
):
121122
res.data_sources.append(obj)
123+
data_sources_set.add(obj)
122124
if isinstance(obj, FeatureView) and not any(
123125
(obj is fv) for fv in res.feature_views
124126
):
125127
res.feature_views.append(obj)
126128
if isinstance(obj.stream_source, PushSource) and not any(
127129
(obj is ds) for ds in res.data_sources
128130
):
129-
res.data_sources.append(obj.stream_source.batch_source)
131+
push_source_dep = obj.stream_source.batch_source
132+
# Don't add if the push source's batch source is a duplicate of an existing batch source
133+
if push_source_dep not in data_sources_set:
134+
res.data_sources.append(push_source_dep)
130135
elif isinstance(obj, Entity) and not any(
131136
(obj is entity) for entity in res.entities
132137
):

0 commit comments

Comments
 (0)