Skip to content

Commit ecb8b2a

Browse files
authored
fix: Bugfixes for how registry is loaded (#2768)
* fix: Bugfixes for how registry is loaded
  Signed-off-by: Achal Shah <[email protected]>
* more fixes
  Signed-off-by: Achal Shah <[email protected]>
* more fixes
  Signed-off-by: Achal Shah <[email protected]>
* increase timeout
  Signed-off-by: Achal Shah <[email protected]>
* more fixes
  Signed-off-by: Achal Shah <[email protected]>
* more fixes
  Signed-off-by: Achal Shah <[email protected]>
1 parent 88cc47d commit ecb8b2a

File tree

3 files changed

+85
-72
lines changed

3 files changed

+85
-72
lines changed

sdk/python/feast/infra/registry_stores/sql.py

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -472,7 +472,7 @@ def update_infra(self, infra: Infra, project: str, commit: bool = True):
472472
pass
473473

474474
def get_infra(self, project: str, allow_cache: bool = False) -> Infra:
475-
pass
475+
return Infra()
476476

477477
def apply_user_metadata(
478478
self,
@@ -550,7 +550,8 @@ def proto(self) -> RegistryProto:
550550
(self.list_validation_references, r.validation_references),
551551
]:
552552
objs: List[Any] = lister(project) # type: ignore
553-
registry_proto_field.extend([obj.to_proto() for obj in objs])
553+
if objs:
554+
registry_proto_field.extend([obj.to_proto() for obj in objs])
554555

555556
return r
556557

sdk/python/feast/registry.py

Lines changed: 81 additions & 69 deletions
Original file line number | Diff line number | Diff line change
@@ -663,6 +663,75 @@ def commit(self):
663663
def refresh(self):
664664
"""Refreshes the state of the registry cache by fetching the registry state from the remote registry store."""
665665

666+
@staticmethod
667+
def _message_to_sorted_dict(message: Message) -> Dict[str, Any]:
668+
return json.loads(MessageToJson(message, sort_keys=True))
669+
670+
def to_dict(self, project: str) -> Dict[str, List[Any]]:
671+
"""Returns a dictionary representation of the registry contents for the specified project.
672+
673+
For each list in the dictionary, the elements are sorted by name, so this
674+
method can be used to compare two registries.
675+
676+
Args:
677+
project: Feast project to convert to a dict
678+
"""
679+
registry_dict: Dict[str, Any] = defaultdict(list)
680+
registry_dict["project"] = project
681+
for data_source in sorted(
682+
self.list_data_sources(project=project), key=lambda ds: ds.name
683+
):
684+
registry_dict["dataSources"].append(
685+
self._message_to_sorted_dict(data_source.to_proto())
686+
)
687+
for entity in sorted(
688+
self.list_entities(project=project), key=lambda entity: entity.name
689+
):
690+
registry_dict["entities"].append(
691+
self._message_to_sorted_dict(entity.to_proto())
692+
)
693+
for feature_view in sorted(
694+
self.list_feature_views(project=project),
695+
key=lambda feature_view: feature_view.name,
696+
):
697+
registry_dict["featureViews"].append(
698+
self._message_to_sorted_dict(feature_view.to_proto())
699+
)
700+
for feature_service in sorted(
701+
self.list_feature_services(project=project),
702+
key=lambda feature_service: feature_service.name,
703+
):
704+
registry_dict["featureServices"].append(
705+
self._message_to_sorted_dict(feature_service.to_proto())
706+
)
707+
for on_demand_feature_view in sorted(
708+
self.list_on_demand_feature_views(project=project),
709+
key=lambda on_demand_feature_view: on_demand_feature_view.name,
710+
):
711+
odfv_dict = self._message_to_sorted_dict(on_demand_feature_view.to_proto())
712+
odfv_dict["spec"]["userDefinedFunction"]["body"] = dill.source.getsource(
713+
on_demand_feature_view.udf
714+
)
715+
registry_dict["onDemandFeatureViews"].append(odfv_dict)
716+
for request_feature_view in sorted(
717+
self.list_request_feature_views(project=project),
718+
key=lambda request_feature_view: request_feature_view.name,
719+
):
720+
registry_dict["requestFeatureViews"].append(
721+
self._message_to_sorted_dict(request_feature_view.to_proto())
722+
)
723+
for saved_dataset in sorted(
724+
self.list_saved_datasets(project=project), key=lambda item: item.name
725+
):
726+
registry_dict["savedDatasets"].append(
727+
self._message_to_sorted_dict(saved_dataset.to_proto())
728+
)
729+
for infra_object in sorted(self.get_infra(project=project).infra_objects):
730+
registry_dict["infra"].append(
731+
self._message_to_sorted_dict(infra_object.to_proto())
732+
)
733+
return registry_dict
734+
666735

667736
class Registry(BaseRegistry):
668737
"""
@@ -689,6 +758,18 @@ def get_user_metadata(
689758
cached_registry_proto_created: Optional[datetime] = None
690759
cached_registry_proto_ttl: timedelta
691760

761+
def __new__(
762+
cls, registry_config: Optional[RegistryConfig], repo_path: Optional[Path]
763+
):
764+
# We override __new__ so that we can inspect registry_config and create a SqlRegistry without callers
765+
# needing to make any changes.
766+
if registry_config and registry_config.registry_type == "sql":
767+
from feast.infra.registry_stores.sql import SqlRegistry
768+
769+
return SqlRegistry(registry_config, repo_path)
770+
else:
771+
return super(Registry, cls).__new__(cls)
772+
692773
def __init__(
693774
self, registry_config: Optional[RegistryConfig], repo_path: Optional[Path]
694775
):
@@ -1587,75 +1668,6 @@ def teardown(self):
15871668
def proto(self) -> RegistryProto:
15881669
return self.cached_registry_proto or RegistryProto()
15891670

1590-
def to_dict(self, project: str) -> Dict[str, List[Any]]:
1591-
"""Returns a dictionary representation of the registry contents for the specified project.
1592-
1593-
For each list in the dictionary, the elements are sorted by name, so this
1594-
method can be used to compare two registries.
1595-
1596-
Args:
1597-
project: Feast project to convert to a dict
1598-
"""
1599-
registry_dict: Dict[str, Any] = defaultdict(list)
1600-
registry_dict["project"] = project
1601-
for data_source in sorted(
1602-
self.list_data_sources(project=project), key=lambda ds: ds.name
1603-
):
1604-
registry_dict["dataSources"].append(
1605-
self._message_to_sorted_dict(data_source.to_proto())
1606-
)
1607-
for entity in sorted(
1608-
self.list_entities(project=project), key=lambda entity: entity.name
1609-
):
1610-
registry_dict["entities"].append(
1611-
self._message_to_sorted_dict(entity.to_proto())
1612-
)
1613-
for feature_view in sorted(
1614-
self.list_feature_views(project=project),
1615-
key=lambda feature_view: feature_view.name,
1616-
):
1617-
registry_dict["featureViews"].append(
1618-
self._message_to_sorted_dict(feature_view.to_proto())
1619-
)
1620-
for feature_service in sorted(
1621-
self.list_feature_services(project=project),
1622-
key=lambda feature_service: feature_service.name,
1623-
):
1624-
registry_dict["featureServices"].append(
1625-
self._message_to_sorted_dict(feature_service.to_proto())
1626-
)
1627-
for on_demand_feature_view in sorted(
1628-
self.list_on_demand_feature_views(project=project),
1629-
key=lambda on_demand_feature_view: on_demand_feature_view.name,
1630-
):
1631-
odfv_dict = self._message_to_sorted_dict(on_demand_feature_view.to_proto())
1632-
odfv_dict["spec"]["userDefinedFunction"]["body"] = dill.source.getsource(
1633-
on_demand_feature_view.udf
1634-
)
1635-
registry_dict["onDemandFeatureViews"].append(odfv_dict)
1636-
for request_feature_view in sorted(
1637-
self.list_request_feature_views(project=project),
1638-
key=lambda request_feature_view: request_feature_view.name,
1639-
):
1640-
registry_dict["requestFeatureViews"].append(
1641-
self._message_to_sorted_dict(request_feature_view.to_proto())
1642-
)
1643-
for saved_dataset in sorted(
1644-
self.list_saved_datasets(project=project), key=lambda item: item.name
1645-
):
1646-
registry_dict["savedDatasets"].append(
1647-
self._message_to_sorted_dict(saved_dataset.to_proto())
1648-
)
1649-
for infra_object in sorted(self.get_infra(project=project).infra_objects):
1650-
registry_dict["infra"].append(
1651-
self._message_to_sorted_dict(infra_object.to_proto())
1652-
)
1653-
return registry_dict
1654-
1655-
@staticmethod
1656-
def _message_to_sorted_dict(message: Message) -> Dict[str, Any]:
1657-
return json.loads(MessageToJson(message, sort_keys=True))
1658-
16591671
def _prepare_registry_for_changes(self):
16601672
"""Prepares the Registry for changes by refreshing the cache if necessary."""
16611673
try:

sdk/python/tests/integration/registration/test_sql_registry.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -85,7 +85,7 @@ def mysql_registry():
8585

8686
log_string_to_wait_for = "/usr/sbin/mysqld: ready for connections. Version: '8.0.29' socket: '/var/run/mysqld/mysqld.sock' port: 3306"
8787
waited = wait_for_logs(
88-
container=container, predicate=log_string_to_wait_for, timeout=30, interval=10,
88+
container=container, predicate=log_string_to_wait_for, timeout=60, interval=10,
8989
)
9090
logger.info("Waited for %s seconds until mysql container was up", waited)
9191
container_port = container.get_exposed_port(3306)

0 commit comments

Comments
 (0)