Skip to content

Commit f758f9e

Browse files
feat: Add snowflake online store (#2902)
* feat: Add snowflake online store
  Signed-off-by: Miles Adkins <[email protected]>
* lint/format
  Signed-off-by: Miles Adkins <[email protected]>
* removing missing testing env variables
  Signed-off-by: Miles Adkins <[email protected]>
* test offline store first
  Signed-off-by: Miles Adkins <[email protected]>
* snowflake online test fixes
  Signed-off-by: Miles Adkins <[email protected]>
* format
  Signed-off-by: Miles Adkins <[email protected]>
* fix snowflake testing (#2903)
  Signed-off-by: Miles Adkins <[email protected]>
1 parent 0ceb39c commit f758f9e

File tree

3 files changed

+423
-2
lines changed

3 files changed

+423
-2
lines changed
Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
import itertools
2+
import os
3+
from binascii import hexlify
4+
from datetime import datetime
5+
from pathlib import Path
6+
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple
7+
8+
import pandas as pd
9+
import pytz
10+
from pydantic import Field
11+
from pydantic.schema import Literal
12+
13+
from feast import Entity, FeatureView
14+
from feast.infra.key_encoding_utils import serialize_entity_key
15+
from feast.infra.online_stores.online_store import OnlineStore
16+
from feast.infra.utils.snowflake_utils import get_snowflake_conn, write_pandas_binary
17+
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
18+
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
19+
from feast.repo_config import FeastConfigBaseModel, RepoConfig
20+
from feast.usage import log_exceptions_and_usage
21+
22+
23+
class SnowflakeOnlineStoreConfig(FeastConfigBaseModel):
    """Online store config for Snowflake.

    Every connection-related field is optional and defaults to ``None``
    (``schema_`` defaults to ``"PUBLIC"``). The object is handed as-is to
    ``get_snowflake_conn`` by the online store implementation.
    """

    type: Literal["snowflake.online"] = "snowflake.online"
    """ Online store type selector"""

    # The home directory is resolved with os.path.expanduser() rather than
    # os.environ["HOME"]: the default is evaluated when the class body runs,
    # so a missing HOME variable (e.g. on Windows or in a minimal container)
    # would otherwise raise KeyError as soon as this module is imported.
    # When HOME is set the resulting path is the same as before.
    config_path: Optional[str] = str(
        Path(os.path.expanduser("~")) / ".snowsql/config"
    )
    """ Snowflake config path -- absolute path required (Can't use ~)"""

    account: Optional[str] = None
    """ Snowflake deployment identifier -- drop .snowflakecomputing.com"""

    user: Optional[str] = None
    """ Snowflake user name """

    password: Optional[str] = None
    """ Snowflake password """

    role: Optional[str] = None
    """ Snowflake role name"""

    warehouse: Optional[str] = None
    """ Snowflake warehouse name """

    database: Optional[str] = None
    """ Snowflake database name """

    # Declared as ``schema_`` with alias "schema"; allow_population_by_field_name
    # below lets callers populate it under either name.
    schema_: Optional[str] = Field("PUBLIC", alias="schema")
    """ Snowflake schema name """

    class Config:
        allow_population_by_field_name = True
57+
58+
59+
class SnowflakeOnlineStore(OnlineStore):
    """Online store that keeps feature values in Snowflake tables.

    Each feature view maps to the table
    ``"<database>"."<schema>"."<project>_<feature view name>"`` with the
    columns ``entity_feature_key``, ``entity_key``, ``feature_name``,
    ``value``, ``event_ts`` and ``created_ts`` (see :meth:`update`).
    """

    @log_exceptions_and_usage(online_store="snowflake")
    def online_write_batch(
        self,
        config: RepoConfig,
        table: FeatureView,
        data: List[
            Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
        ],
        progress: Optional[Callable[[int], Any]],
    ) -> None:
        """Write a batch of feature rows to the feature view's table.

        Args:
            config: Repo config; ``config.online_store`` must be a
                ``SnowflakeOnlineStoreConfig``.
            table: Feature view whose table receives the rows.
            data: Tuples of (entity key, feature name -> value, event
                timestamp, optional created timestamp).
            progress: Optional callback, invoked with ``1`` after each
                element of ``data`` has been converted.

        After uploading, the table is rewritten so that only the newest row
        per (entity_key, feature_name) is kept, ordered by ``event_ts`` and
        then ``created_ts``.
        """
        assert isinstance(config.online_store, SnowflakeOnlineStoreConfig)

        dfs = [None] * len(data)
        for i, (entity_key, values, timestamp, created_ts) in enumerate(data):

            df = pd.DataFrame(
                columns=[
                    "entity_feature_key",
                    "entity_key",
                    "feature_name",
                    "value",
                    "event_ts",
                    "created_ts",
                ],
                index=range(0, len(values)),
            )

            timestamp = _to_naive_utc(timestamp)
            if created_ts is not None:
                created_ts = _to_naive_utc(created_ts)

            # The serialized key is identical for every feature of this
            # entity, so compute it once instead of twice per feature.
            entity_key_bin = serialize_entity_key(entity_key)

            for j, (feature_name, val) in enumerate(values.items()):
                # entity_feature_key = serialized entity key + UTF-8 feature
                # name; online_read() filters on this column.
                df.loc[j, "entity_feature_key"] = entity_key_bin + bytes(
                    feature_name, encoding="utf-8"
                )
                df.loc[j, "entity_key"] = entity_key_bin
                df.loc[j, "feature_name"] = feature_name
                df.loc[j, "value"] = val.SerializeToString()
                df.loc[j, "event_ts"] = timestamp
                df.loc[j, "created_ts"] = created_ts

            dfs[i] = df
            if progress:
                progress(1)

        # Nothing to upload (and pd.concat would raise) when data is empty.
        if dfs:
            agg_df = pd.concat(dfs)

            # NOTE(review): autocommit is disabled here; the commit is
            # presumably performed when the connection context exits --
            # confirm against get_snowflake_conn.
            with get_snowflake_conn(config.online_store, autocommit=False) as conn:

                write_pandas_binary(conn, agg_df, f"{config.project}_{table.name}")

                # Deduplicate in place: keep only the latest row for each
                # (entity_key, feature_name) pair.
                query = f"""
                    INSERT OVERWRITE INTO "{config.online_store.database}"."{config.online_store.schema_}"."{config.project}_{table.name}"
                        SELECT
                            "entity_feature_key",
                            "entity_key",
                            "feature_name",
                            "value",
                            "event_ts",
                            "created_ts"
                        FROM
                            (SELECT
                                *,
                                ROW_NUMBER() OVER(PARTITION BY "entity_key","feature_name" ORDER BY "event_ts" DESC, "created_ts" DESC) AS "_feast_row"
                            FROM
                                "{config.online_store.database}"."{config.online_store.schema_}"."{config.project}_{table.name}")
                        WHERE
                            "_feast_row" = 1;
                """

                conn.cursor().execute(query)

        return None

    @log_exceptions_and_usage(online_store="snowflake")
    def online_read(
        self,
        config: RepoConfig,
        table: FeatureView,
        entity_keys: List[EntityKeyProto],
        requested_features: List[str],
    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
        """Read the requested features for each entity key.

        Args:
            config: Repo config; ``config.online_store`` must be a
                ``SnowflakeOnlineStoreConfig``.
            table: Feature view whose table is queried.
            entity_keys: Entity keys to look up.
            requested_features: Names of the features to fetch.

        Returns:
            One tuple per entity key, in the order of ``entity_keys``:
            ``(event timestamp, {feature name: value})`` when at least one
            feature row was found, otherwise ``(None, None)``.
        """
        assert isinstance(config.online_store, SnowflakeOnlineStoreConfig)

        result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []

        # An empty key or feature list would render as ``IN ()``, which is
        # not valid SQL; answer without touching the database instead.
        if not entity_keys:
            return result
        if not requested_features:
            return [(None, None) for _ in entity_keys]

        # Serialize each key once; the bytes are reused for the SQL filter
        # and for matching the returned rows.
        entity_key_bins = [serialize_entity_key(key) for key in entity_keys]

        # One TO_BINARY('<hex>') literal per (entity key, feature) pair,
        # mirroring how entity_feature_key is built in online_write_batch().
        key_literals = ",".join(
            "TO_BINARY('"
            + hexlify(key_bin + bytes(feature, encoding="utf-8")).decode("ascii")
            + "')"
            for key_bin, feature in itertools.product(
                entity_key_bins, requested_features
            )
        )

        with get_snowflake_conn(config.online_store) as conn:

            df = (
                conn.cursor()
                .execute(
                    f"""
                    SELECT
                        "entity_key", "feature_name", "value", "event_ts"
                    FROM
                        "{config.online_store.database}"."{config.online_store.schema_}"."{config.project}_{table.name}"
                    WHERE
                        "entity_feature_key" IN ({key_literals})
                    """,
                )
                .fetch_pandas_all()
            )

        for entity_key_bin in entity_key_bins:
            res = {}
            res_ts = None
            for _, row in df[df["entity_key"] == entity_key_bin].iterrows():
                val = ValueProto()
                val.ParseFromString(row["value"])
                res[row["feature_name"]] = val
                # The timestamp of the last row iterated wins.
                res_ts = row["event_ts"].to_pydatetime()

            if not res:
                result.append((None, None))
            else:
                result.append((res_ts, res))
        return result

    @log_exceptions_and_usage(online_store="snowflake")
    def update(
        self,
        config: RepoConfig,
        tables_to_delete: Sequence[FeatureView],
        tables_to_keep: Sequence[FeatureView],
        entities_to_delete: Sequence[Entity],
        entities_to_keep: Sequence[Entity],
        partial: bool,
    ):
        """Create tables for kept feature views and drop tables for deleted ones.

        Args:
            config: Repo config; ``config.online_store`` must be a
                ``SnowflakeOnlineStoreConfig``.
            tables_to_delete: Feature views whose tables are dropped.
            tables_to_keep: Feature views whose tables are created if missing.
            entities_to_delete: Not used by this implementation.
            entities_to_keep: Not used by this implementation.
            partial: Not used by this implementation.
        """
        assert isinstance(config.online_store, SnowflakeOnlineStoreConfig)

        with get_snowflake_conn(config.online_store) as conn:

            for table in tables_to_keep:

                conn.cursor().execute(
                    f"""CREATE TABLE IF NOT EXISTS "{config.online_store.database}"."{config.online_store.schema_}"."{config.project}_{table.name}" (
                        "entity_feature_key" BINARY,
                        "entity_key" BINARY,
                        "feature_name" VARCHAR,
                        "value" BINARY,
                        "event_ts" TIMESTAMP,
                        "created_ts" TIMESTAMP
                        )"""
                )

            for table in tables_to_delete:

                conn.cursor().execute(
                    f'DROP TABLE IF EXISTS "{config.online_store.database}"."{config.online_store.schema_}"."{config.project}_{table.name}"'
                )

    def teardown(
        self,
        config: RepoConfig,
        tables: Sequence[FeatureView],
        entities: Sequence[Entity],
    ):
        """Drop the table of every given feature view.

        Args:
            config: Repo config; ``config.online_store`` must be a
                ``SnowflakeOnlineStoreConfig``.
            tables: Feature views whose tables are dropped if they exist.
            entities: Not used by this implementation.
        """
        assert isinstance(config.online_store, SnowflakeOnlineStoreConfig)

        with get_snowflake_conn(config.online_store) as conn:

            for table in tables:
                query = f'DROP TABLE IF EXISTS "{config.online_store.database}"."{config.online_store.schema_}"."{config.project}_{table.name}"'
                conn.cursor().execute(query)
226+
227+
228+
def _to_naive_utc(ts: datetime):
229+
if ts.tzinfo is None:
230+
return ts
231+
else:
232+
return ts.astimezone(pytz.utc).replace(tzinfo=None)

0 commit comments

Comments
 (0)