Skip to content

Commit 452dcd3

Browse files
authored
fix: Hydrate infra object in the sql registry proto() method (#2782)
* fix: Implement apply_materialziation and infra methods in sql registry Signed-off-by: Achal Shah <[email protected]> * fix: hydrate infra object in the sql registry proto() method Signed-off-by: Achal Shah <[email protected]> * rm old comment Signed-off-by: Achal Shah <[email protected]>
1 parent 331a214 commit 452dcd3

File tree

1 file changed

+53
-1
lines changed
  • sdk/python/feast/infra/registry_stores

1 file changed

+53
-1
lines changed

sdk/python/feast/infra/registry_stores/sql.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,15 @@
148148
)
149149

150150

151+
feast_metadata = Table(
152+
"feast_metadata",
153+
metadata,
154+
Column("metadata_key", String(50), primary_key=True),
155+
Column("metadata_value", String(50), nullable=False),
156+
Column("last_updated_timestamp", BigInteger, nullable=False),
157+
)
158+
159+
151160
class SqlRegistry(BaseRegistry):
152161
def __init__(
153162
self, registry_config: Optional[RegistryConfig], repo_path: Optional[Path]
@@ -575,7 +584,6 @@ def get_user_metadata(
575584
def proto(self) -> RegistryProto:
576585
r = RegistryProto()
577586
project = ""
578-
# TODO(achal): Support Infra object, and last_updated_timestamp.
579587
for lister, registry_proto_field in [
580588
(self.list_entities, r.entities),
581589
(self.list_feature_views, r.feature_views),
@@ -591,6 +599,11 @@ def proto(self) -> RegistryProto:
591599
if objs:
592600
registry_proto_field.extend([obj.to_proto() for obj in objs])
593601

602+
r.infra.CopyFrom(self.get_infra(project).to_proto())
603+
last_updated_timestamp = self._get_last_updated_metadata()
604+
if last_updated_timestamp:
605+
r.last_updated.FromDatetime(last_updated_timestamp)
606+
594607
return r
595608

596609
def commit(self):
@@ -626,13 +639,15 @@ def _apply_object(self, table, id_field_name, obj, proto_field_name, name=None):
626639
}
627640
insert_stmt = insert(table).values(values,)
628641
conn.execute(insert_stmt)
642+
self._set_last_updated_metadata(update_datetime)
629643

630644
def _delete_object(self, table, name, project, id_field_name, not_found_exception):
631645
with self.engine.connect() as conn:
632646
stmt = delete(table).where(getattr(table.c, id_field_name) == name)
633647
rows = conn.execute(stmt)
634648
if rows.rowcount < 1 and not_found_exception:
635649
raise not_found_exception(name, project)
650+
self._set_last_updated_metadata(datetime.utcnow())
636651
return rows.rowcount
637652

638653
def _get_object(
@@ -666,3 +681,40 @@ def _list_objects(self, table, proto_class, python_class, proto_field_name):
666681
for row in rows
667682
]
668683
return []
684+
685+
def _set_last_updated_metadata(self, last_updated: datetime):
686+
with self.engine.connect() as conn:
687+
stmt = select(feast_metadata).where(
688+
feast_metadata.c.metadata_key == "last_updated_timestamp"
689+
)
690+
row = conn.execute(stmt).first()
691+
692+
update_time = int(last_updated.timestamp())
693+
694+
values = {
695+
"metadata_key": "last_updated_timestamp",
696+
"metadata_value": f"{update_time}",
697+
"last_updated_timestamp": update_time,
698+
}
699+
if row:
700+
update_stmt = (
701+
update(feast_metadata)
702+
.where(feast_metadata.c.metadata_key == "last_updated_timestamp")
703+
.values(values)
704+
)
705+
conn.execute(update_stmt)
706+
else:
707+
insert_stmt = insert(feast_metadata).values(values,)
708+
conn.execute(insert_stmt)
709+
710+
def _get_last_updated_metadata(self):
711+
with self.engine.connect() as conn:
712+
stmt = select(feast_metadata).where(
713+
feast_metadata.c.metadata_key == "last_updated_timestamp"
714+
)
715+
row = conn.execute(stmt).first()
716+
if not row:
717+
return None
718+
update_time = int(row["last_updated_timestamp"])
719+
720+
return datetime.utcfromtimestamp(update_time)

0 commit comments

Comments
 (0)