Skip to content

Commit c8b11b3

Browse files
authored
feat: CLI interface for validation of logged features (#2718)
* store validation reference in registry Signed-off-by: Oleksii Moskalenko <[email protected]> * CLI test Signed-off-by: Oleksii Moskalenko <[email protected]> * clean function before pickle Signed-off-by: Oleksii Moskalenko <[email protected]> * ignore "too complex" lint rule Signed-off-by: Oleksii Moskalenko <[email protected]> * lazy import & correct feature status in logs Signed-off-by: Oleksii Moskalenko <[email protected]> * pygments dependency Signed-off-by: Oleksii Moskalenko <[email protected]> * ttl for regular feature views Signed-off-by: Oleksii Moskalenko <[email protected]> * some apidocs Signed-off-by: Oleksii Moskalenko <[email protected]> * address comments Signed-off-by: Oleksii Moskalenko <[email protected]>
1 parent 8ec0790 commit c8b11b3

30 files changed

+505
-45
lines changed

go/embedded/online_features.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -269,12 +269,12 @@ func (s *OnlineFeatureService) StartGprcServerWithLogging(host string, port int,
269269
go func() {
270270
// As soon as these signals are received from OS, try to gracefully stop the gRPC server
271271
<-s.grpcStopCh
272-
fmt.Println("Stopping the gRPC server...")
272+
log.Println("Stopping the gRPC server...")
273273
grpcServer.GracefulStop()
274274
if loggingService != nil {
275275
loggingService.Stop()
276276
}
277-
fmt.Println("gRPC server terminated")
277+
log.Println("gRPC server terminated")
278278
}()
279279

280280
err = grpcServer.Serve(lis)
@@ -314,11 +314,15 @@ func (s *OnlineFeatureService) StartHttpServerWithLogging(host string, port int,
314314
go func() {
315315
// As soon as these signals are received from OS, try to gracefully stop the HTTP server
316316
<-s.httpStopCh
317-
fmt.Println("Stopping the HTTP server...")
317+
log.Println("Stopping the HTTP server...")
318318
err := ser.Stop()
319319
if err != nil {
320-
fmt.Printf("Error when stopping the HTTP server: %v\n", err)
320+
log.Printf("Error when stopping the HTTP server: %v\n", err)
321321
}
322+
if loggingService != nil {
323+
loggingService.Stop()
324+
}
325+
log.Println("HTTP server terminated")
322326
}()
323327

324328
return ser.Serve(host, port)

protos/feast/core/FeatureService.proto

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ message FeatureServiceMeta {
5454

5555
message LoggingConfig {
5656
float sample_rate = 1;
57-
google.protobuf.Duration partition_interval = 2;
5857

5958
oneof destination {
6059
FileDestination file_destination = 3;

protos/feast/core/Registry.proto

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,10 @@ import "feast/core/OnDemandFeatureView.proto";
3030
import "feast/core/RequestFeatureView.proto";
3131
import "feast/core/DataSource.proto";
3232
import "feast/core/SavedDataset.proto";
33+
import "feast/core/ValidationProfile.proto";
3334
import "google/protobuf/timestamp.proto";
3435

35-
// Next id: 13
36+
// Next id: 14
3637
message Registry {
3738
repeated Entity entities = 1;
3839
repeated FeatureTable feature_tables = 2;
@@ -42,6 +43,7 @@ message Registry {
4243
repeated RequestFeatureView request_feature_views = 9;
4344
repeated FeatureService feature_services = 7;
4445
repeated SavedDataset saved_datasets = 11;
46+
repeated ValidationReference validation_references = 13;
4547
Infra infra = 10;
4648

4749
string registry_schema_version = 3; // to support migrations; incremented when schema is changed

protos/feast/core/ValidationProfile.proto

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,24 @@ message GEValidationProfile {
3939
}
4040

4141
message ValidationReference {
42-
SavedDataset dataset = 1;
43-
42+
// Unique name of validation reference within the project
43+
string name = 1;
44+
// Name of saved dataset used as reference dataset
45+
string reference_dataset_name = 2;
46+
// Name of the Feast project that this validation reference belongs to
47+
string project = 3;
48+
// Description of the validation reference
49+
string description = 4;
50+
// User defined metadata
51+
map<string,string> tags = 5;
52+
53+
// validation profiler
4454
oneof profiler {
45-
GEValidationProfiler ge_profiler = 2;
55+
GEValidationProfiler ge_profiler = 6;
56+
}
57+
58+
// (optional) cached validation profile (to avoid constant recalculation)
59+
oneof cached_profile {
60+
GEValidationProfile ge_profile = 7;
4661
}
4762
}

sdk/python/feast/cli.py

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
14+
import json
1515
import logging
1616
import warnings
1717
from datetime import datetime
@@ -23,6 +23,7 @@
2323
import yaml
2424
from colorama import Fore, Style
2525
from dateutil import parser
26+
from pygments import formatters, highlight, lexers
2627

2728
from feast import flags, flags_helper, utils
2829
from feast.constants import DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT
@@ -758,5 +759,61 @@ def disable_alpha_features(ctx: click.Context):
758759
store.config.write_to_path(Path(repo_path))
759760

760761

762+
@cli.command("validate")
763+
@click.option(
764+
"--feature-service", "-f", help="Specify a feature service name",
765+
)
766+
@click.option(
767+
"--reference", "-r", help="Specify a validation reference name",
768+
)
769+
@click.option(
770+
"--no-profile-cache", is_flag=True, help="Do not store cached profile in registry",
771+
)
772+
@click.argument("start_ts")
773+
@click.argument("end_ts")
774+
@click.pass_context
775+
def validate(
776+
ctx: click.Context,
777+
feature_service: str,
778+
reference: str,
779+
start_ts: str,
780+
end_ts: str,
781+
no_profile_cache,
782+
):
783+
"""
784+
Perform validation of logged features (produced by a given feature service) against provided reference.
785+
786+
START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'
787+
"""
788+
repo = ctx.obj["CHDIR"]
789+
cli_check_repo(repo)
790+
store = FeatureStore(repo_path=str(repo))
791+
792+
feature_service = store.get_feature_service(name=feature_service)
793+
reference = store.get_validation_reference(reference)
794+
795+
result = store.validate_logged_features(
796+
source=feature_service,
797+
reference=reference,
798+
start=datetime.fromisoformat(start_ts),
799+
end=datetime.fromisoformat(end_ts),
800+
throw_exception=False,
801+
cache_profile=not no_profile_cache,
802+
)
803+
804+
if not result:
805+
print(f"{Style.BRIGHT + Fore.GREEN}Validation successful!{Style.RESET_ALL}")
806+
return
807+
808+
errors = [e.to_dict() for e in result.report.errors]
809+
formatted_json = json.dumps(errors, indent=4)
810+
colorful_json = highlight(
811+
formatted_json, lexers.JsonLexer(), formatters.TerminalFormatter()
812+
)
813+
print(f"{Style.BRIGHT + Fore.RED}Validation failed!{Style.RESET_ALL}")
814+
print(colorful_json)
815+
exit(1)
816+
817+
761818
if __name__ == "__main__":
762819
cli()

sdk/python/feast/diff/registry_diff.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
from feast.protos.feast.core.RequestFeatureView_pb2 import (
2121
RequestFeatureView as RequestFeatureViewProto,
2222
)
23+
from feast.protos.feast.core.ValidationProfile_pb2 import (
24+
ValidationReference as ValidationReferenceProto,
25+
)
2326
from feast.registry import FEAST_OBJECT_TYPES, FeastObjectType, Registry
2427
from feast.repo_contents import RepoContents
2528

@@ -103,6 +106,7 @@ def tag_objects_for_keep_delete_update_add(
103106
FeatureServiceProto,
104107
OnDemandFeatureViewProto,
105108
RequestFeatureViewProto,
109+
ValidationReferenceProto,
106110
)
107111

108112

@@ -120,9 +124,9 @@ def diff_registry_objects(
120124

121125
current_spec: FeastObjectSpecProto
122126
new_spec: FeastObjectSpecProto
123-
if isinstance(current_proto, DataSourceProto) or isinstance(
124-
new_proto, DataSourceProto
125-
):
127+
if isinstance(
128+
current_proto, (DataSourceProto, ValidationReferenceProto)
129+
) or isinstance(new_proto, (DataSourceProto, ValidationReferenceProto)):
126130
assert type(current_proto) == type(new_proto)
127131
current_spec = cast(DataSourceProto, current_proto)
128132
new_spec = cast(DataSourceProto, new_proto)

sdk/python/feast/dqm/profilers/ge_profiler.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import json
2+
from types import FunctionType
23
from typing import Any, Callable, Dict, List
34

45
import dill
@@ -140,9 +141,12 @@ def analyze_dataset(self, df: pd.DataFrame) -> Profile:
140141
return GEProfile(expectation_suite=self.user_defined_profiler(dataset))
141142

142143
def to_proto(self):
144+
# keep only the function's code object and drop its context for now (globals are replaced with an empty dict)
145+
# ToDo (pyalex): include some context, but not all (dill tries to pull too much)
146+
udp = FunctionType(self.user_defined_profiler.__code__, {})
143147
return GEValidationProfilerProto(
144148
profiler=GEValidationProfilerProto.UserDefinedProfiler(
145-
body=dill.dumps(self.user_defined_profiler, recurse=True)
149+
body=dill.dumps(udp, recurse=False)
146150
)
147151
)
148152

sdk/python/feast/dqm/profilers/profiler.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ class ValidationError:
6969

7070
missing_count: Optional[int]
7171
missing_percent: Optional[float]
72+
observed_value: Optional[float]
7273

7374
def __init__(
7475
self,
@@ -77,12 +78,24 @@ def __init__(
7778
check_config: Optional[Any] = None,
7879
missing_count: Optional[int] = None,
7980
missing_percent: Optional[float] = None,
81+
observed_value: Optional[float] = None,
8082
):
8183
self.check_name = check_name
8284
self.column_name = column_name
8385
self.check_config = check_config
8486
self.missing_count = missing_count
8587
self.missing_percent = missing_percent
88+
self.observed_value = observed_value
8689

8790
def __repr__(self):
8891
return f"<ValidationError {self.check_name}:{self.column_name}>"
92+
93+
def to_dict(self):
94+
return dict(
95+
check_name=self.check_name,
96+
column_name=self.column_name,
97+
check_config=self.check_config,
98+
missing_count=self.missing_count,
99+
missing_percent=self.missing_percent,
100+
observed_value=self.observed_value,
101+
)

sdk/python/feast/errors.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,13 @@ def __init__(self, name: str, project: str):
9494
super().__init__(f"Saved dataset {name} does not exist in project {project}")
9595

9696

97+
class ValidationReferenceNotFound(FeastObjectNotFoundException):
98+
def __init__(self, name: str, project: str):
99+
super().__init__(
100+
f"Validation reference {name} does not exist in project {project}"
101+
)
102+
103+
97104
class FeastProviderLoginError(Exception):
98105
"""Error class that indicates a user has not authenticated with their provider."""
99106

sdk/python/feast/feast_object.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@
1111
from .protos.feast.core.FeatureView_pb2 import FeatureViewSpec
1212
from .protos.feast.core.OnDemandFeatureView_pb2 import OnDemandFeatureViewSpec
1313
from .protos.feast.core.RequestFeatureView_pb2 import RequestFeatureViewSpec
14+
from .protos.feast.core.ValidationProfile_pb2 import (
15+
ValidationReference as ValidationReferenceProto,
16+
)
1417
from .request_feature_view import RequestFeatureView
18+
from .saved_dataset import ValidationReference
1519

1620
# Convenience type representing all Feast objects
1721
FeastObject = Union[
@@ -21,6 +25,7 @@
2125
Entity,
2226
FeatureService,
2327
DataSource,
28+
ValidationReference,
2429
]
2530

2631
FeastObjectSpecProto = Union[
@@ -30,4 +35,5 @@
3035
EntitySpecV2,
3136
FeatureServiceSpec,
3237
DataSourceProto,
38+
ValidationReferenceProto,
3339
]

0 commit comments

Comments
 (0)