Skip to content

Commit c9eda79

Browse files
authored
feat: Add hbase online store support in feast (#2590)
* Resolved conflict with latest master Signed-off-by: aurobindoc <[email protected]> * Fixed lint and format Signed-off-by: aurobindoc <[email protected]> * Added documentation for hbase online store Signed-off-by: aurobindoc <[email protected]> * Minor update in documentation for hbase online store Signed-off-by: aurobindoc <[email protected]> * Added docstrings for all the method of hbase online store Signed-off-by: aurobindoc <[email protected]> * Added template for hbase online store Signed-off-by: aurobindoc <[email protected]> * fixed docstring tests Signed-off-by: aurobindoc <[email protected]> * Addressed review comments Signed-off-by: aurobindoc <[email protected]> * resolves a couple of nits Signed-off-by: aurobindoc <[email protected]>
1 parent 00ed65a commit c9eda79

File tree

14 files changed

+627
-1
lines changed

14 files changed

+627
-1
lines changed

CONTRIBUTING.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ The services with containerized replacements currently implemented are:
177177
- DynamoDB
178178
- Redis
179179
- Trino
180+
- HBase
180181

181182
You can run `make test-python-integration-container` to run tests against the containerized versions of dependencies.
182183

sdk/python/feast/cli.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,8 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List
539539
"--template",
540540
"-t",
541541
type=click.Choice(
542-
["local", "gcp", "aws", "snowflake", "spark", "postgres"], case_sensitive=False
542+
["local", "gcp", "aws", "snowflake", "spark", "postgres", "hbase"],
543+
case_sensitive=False,
543544
),
544545
help="Specify a template for the created project",
545546
default="local",
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
# HBase Online Store
2+
HBase is not included in the current [Feast](https://github.com/feast-dev/feast) roadmap; this project adds HBase support for the online store.
3+
We create a table named `<project_name>_<feature_view_name>`, which is updated with data on every materialize call.
4+
5+
6+
#### Create a feature repository
7+
8+
```shell
9+
feast init feature_repo
10+
cd feature_repo
11+
```
12+
13+
#### Edit `feature_store.yaml`
14+
15+
Set the `online_store` type to `hbase`:
16+
17+
```yaml
18+
project: feature_repo
19+
registry: data/registry.db
20+
provider: local
21+
online_store:
22+
type: hbase
23+
host: 127.0.0.1 # hbase thrift endpoint
24+
port: 9090 # hbase thrift api port
25+
```
26+
27+
#### Apply the feature definitions in `example.py`
28+
29+
```shell
30+
feast -c feature_repo apply
31+
```
32+
##### Output
33+
```
34+
Registered entity driver_id
35+
Registered feature view driver_hourly_stats_view
36+
Deploying infrastructure for driver_hourly_stats_view
37+
```
38+
39+
### Materialize Latest Data to Online Feature Store (HBase)
40+
```
41+
$ CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S")
42+
$ feast -c feature_repo materialize-incremental $CURRENT_TIME
43+
```
44+
#### Output
45+
```
46+
Materializing 1 feature views from 2022-04-16 15:30:39+05:30 to 2022-04-19 15:31:04+05:30 into the hbase online store.
47+
48+
driver_hourly_stats_view from 2022-04-16 15:30:39+05:30 to 2022-04-19 15:31:04+05:30:
49+
100%|████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 120.59it/s]
50+
```
51+
52+
### Fetch the latest features for some entity id
53+
```python
54+
from pprint import pprint
55+
from feast import FeatureStore
56+
57+
store = FeatureStore(repo_path=".")
58+
feature_vector = store.get_online_features(
59+
features=[
60+
"driver_hourly_stats:conv_rate",
61+
"driver_hourly_stats:acc_rate",
62+
"driver_hourly_stats:avg_daily_trips",
63+
],
64+
entity_rows=[
65+
{"driver_id": 1004},
66+
{"driver_id": 1005},
67+
],
68+
).to_dict()
69+
pprint(feature_vector)
70+
71+
```
72+
#### Output
73+
```
74+
{'acc_rate': [0.01390857808291912, 0.4063614010810852],
75+
'avg_daily_trips': [69, 706],
76+
'conv_rate': [0.6624961495399475, 0.7595928311347961],
77+
'driver_id': [1004, 1005]}
78+
```

sdk/python/feast/infra/online_stores/contrib/hbase_online_store/__init__.py

Whitespace-only changes.
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
import calendar
2+
import struct
3+
from datetime import datetime
4+
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple
5+
6+
from happybase import Connection
7+
from pydantic.typing import Literal
8+
9+
from feast import Entity
10+
from feast.feature_view import FeatureView
11+
from feast.infra.key_encoding_utils import serialize_entity_key
12+
from feast.infra.online_stores.online_store import OnlineStore
13+
from feast.infra.utils.hbase_utils import HbaseConstants, HbaseUtils
14+
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
15+
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
16+
from feast.repo_config import FeastConfigBaseModel, RepoConfig
17+
18+
19+
class HbaseOnlineStoreConfig(FeastConfigBaseModel):
    """Online store config for the Hbase store.

    Holds the `online_store` settings that select the Hbase backend and
    locate the Hbase Thrift server it connects to.
    """

    # Discriminator value that selects this online store.
    type: Literal["hbase"] = "hbase"
    """Online store type selector"""

    host: str
    """Hostname of Hbase Thrift server"""

    # Declared as a string; users of this config in this file convert it
    # with int() before opening the connection.
    port: str
    """Port in which Hbase Thrift server is running"""
30+
31+
32+
class HbaseConnection:
    """
    Wrapper that owns a happybase connection to an Hbase Thrift server.

    Attributes:
        store_config: Online store config for Hbase store.
    """

    def __init__(self, store_config: HbaseOnlineStoreConfig):
        """Open a happybase connection using the host and port in store_config."""
        self._store_config = store_config
        thrift_host = store_config.host
        # The config declares the port as a string; happybase is given an int.
        thrift_port = int(store_config.port)
        self._real_conn = Connection(host=thrift_host, port=thrift_port)

    @property
    def real_conn(self) -> Connection:
        """The underlying happybase Connection used to talk to hbase."""
        return self._real_conn

    def close(self) -> None:
        """Close the underlying happybase connection."""
        connection = self.real_conn
        connection.close()
54+
55+
56+
class HbaseOnlineStore(OnlineStore):
    """
    Online feature store for Hbase.

    Attributes:
        _conn: Happybase Connection to connect to hbase thrift server.
    """

    _conn: Connection = None

    def _get_conn(self, config: RepoConfig):
        """
        Get or Create Hbase Connection from Repoconfig.

        The connection is created on first use and cached on the instance.

        Args:
            config: The RepoConfig for the current FeatureStore.

        Returns:
            The happybase Connection for the configured Thrift endpoint.
        """

        store_config = config.online_store
        assert isinstance(store_config, HbaseOnlineStoreConfig)

        if not self._conn:
            self._conn = Connection(host=store_config.host, port=int(store_config.port))
        return self._conn

    @staticmethod
    def _encode_timestamp(ts: Any) -> Any:
        """
        Pack a datetime as a big-endian unsigned 32-bit count of UTC epoch seconds.

        Values that are not datetime instances are returned unchanged.
        """
        if isinstance(ts, datetime):
            # utctimetuple() normalises tz-aware datetimes to UTC before
            # timegm() interprets the fields as UTC; for naive datetimes it
            # yields the same fields as timetuple().
            return struct.pack(">L", int(calendar.timegm(ts.utctimetuple())))
        return ts

    def online_write_batch(
        self,
        config: RepoConfig,
        table: FeatureView,
        data: List[
            Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
        ],
        progress: Optional[Callable[[int], Any]],
    ) -> None:
        """
        Write a batch of feature rows to Hbase online store.

        Args:
            config: The RepoConfig for the current FeatureStore.
            table: Feast FeatureView.
            data: a list of quadruplets containing Feature data. Each quadruplet contains an Entity Key,
                a dict containing feature values, an event timestamp for the row, and
                the created timestamp for the row if it exists.
            progress: Optional function to be called once every mini-batch of rows is written to
                the online store. Can be used to display progress.
        """

        hbase = HbaseUtils(self._get_conn(config))
        project = config.project
        table_name = _table_id(project, table)

        b = hbase.batch(table_name)
        for entity_key, values, timestamp, created_ts in data:
            # The row key is the hex form of the serialized entity key.
            row_key = serialize_entity_key(entity_key).hex()
            values_dict = {
                HbaseConstants.get_col_from_feature(feature_name): val.SerializeToString()
                for feature_name, val in values.items()
            }
            values_dict[HbaseConstants.DEFAULT_EVENT_TS] = self._encode_timestamp(
                timestamp
            )
            if created_ts is not None:
                values_dict[HbaseConstants.DEFAULT_CREATED_TS] = self._encode_timestamp(
                    created_ts
                )
            b.put(row_key, values_dict)
        b.send()

        # Report the rows only after the batch has been sent.
        if progress and data:
            progress(len(data))

    def online_read(
        self,
        config: RepoConfig,
        table: FeatureView,
        entity_keys: List[EntityKeyProto],
        requested_features: Optional[List[str]] = None,
    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
        """
        Retrieve feature values from the Hbase online store.

        Args:
            config: The RepoConfig for the current FeatureStore.
            table: Feast FeatureView.
            entity_keys: a list of entity keys that should be read from the FeatureStore.
            requested_features: names of the features to return for each row.

        Returns:
            One tuple per row returned by Hbase: (event timestamp, feature values),
            or (None, None) when the row holds none of the requested features.
        """
        hbase = HbaseUtils(self._get_conn(config))
        project = config.project
        table_name = _table_id(project, table)

        result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []

        # NOTE(review): when requested_features is None no feature is selected,
        # so every row yields (None, None). Returning all features instead would
        # require excluding the timestamp columns explicitly - confirm intent.
        wanted = set(requested_features or ())

        row_keys = [
            serialize_entity_key(entity_key).hex() for entity_key in entity_keys
        ]
        rows = hbase.rows(table_name, row_keys=row_keys)

        for _, row in rows:
            res = {}
            res_ts = None
            for feature_name, feature_value in row.items():
                f_name = HbaseConstants.get_feature_from_col(feature_name)
                if f_name in wanted:
                    v = ValueProto()
                    v.ParseFromString(feature_value)
                    res[f_name] = v
                # Compare by value: an identity check on strings is not reliable.
                if f_name == HbaseConstants.EVENT_TS:
                    ts = struct.unpack(">L", feature_value)[0]
                    # Timestamps are written as UTC epoch seconds, so decode
                    # them as UTC rather than in the machine's local timezone.
                    res_ts = datetime.utcfromtimestamp(ts)
            if not res:
                result.append((None, None))
            else:
                result.append((res_ts, res))
        return result

    def update(
        self,
        config: RepoConfig,
        tables_to_delete: Sequence[FeatureView],
        tables_to_keep: Sequence[FeatureView],
        entities_to_delete: Sequence[Entity],
        entities_to_keep: Sequence[Entity],
        partial: bool,
    ):
        """
        Update tables from the Hbase Online Store.

        Creates any table in tables_to_keep that does not exist yet and deletes
        every table in tables_to_delete. The entity arguments and `partial`
        are not used.

        Args:
            config: The RepoConfig for the current FeatureStore.
            tables_to_delete: Tables to delete from the Hbase Online Store.
            tables_to_keep: Tables to keep in the Hbase Online Store.
        """
        hbase = HbaseUtils(self._get_conn(config))
        project = config.project

        # We don't create any special state for the entities in this implementation.
        for table in tables_to_keep:
            table_name = _table_id(project, table)
            if not hbase.check_if_table_exist(table_name):
                hbase.create_table_with_default_cf(table_name)

        for table in tables_to_delete:
            table_name = _table_id(project, table)
            hbase.delete_table(table_name)

    def teardown(
        self,
        config: RepoConfig,
        tables: Sequence[FeatureView],
        entities: Sequence[Entity],
    ):
        """
        Delete tables from the Hbase Online Store.

        Args:
            config: The RepoConfig for the current FeatureStore.
            tables: Tables to delete from the feature repo.
        """
        hbase = HbaseUtils(self._get_conn(config))
        project = config.project

        for table in tables:
            table_name = _table_id(project, table)
            hbase.delete_table(table_name)
224+
225+
226+
def _table_id(project: str, table: FeatureView) -> str:
    """
    Build the Hbase table name for a feature view.

    The name is the project name and the feature view name joined by an
    underscore.

    Args:
        project: Name of the feast project.
        table: Feast FeatureView.
    """
    view_name = table.name
    return "{}_{}".format(project, view_name)
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from tests.integration.feature_repos.integration_test_repo_config import (
2+
IntegrationTestRepoConfig,
3+
)
4+
from tests.integration.feature_repos.universal.online_store.hbase import (
5+
HbaseOnlineStoreCreator,
6+
)
7+
8+
# Repo configurations for the integration tests: a single configuration whose
# online store is created by HbaseOnlineStoreCreator.
FULL_REPO_CONFIGS = [
    IntegrationTestRepoConfig(online_store_creator=HbaseOnlineStoreCreator),
]

0 commit comments

Comments
 (0)