Skip to content

Commit 3347a57

Browse files
author
Tsotne Tabidze
authored
feat: Add http endpoint to the Go feature server (#2658)
* feat: Add http endpoint to the Go feature server Signed-off-by: Tsotne Tabidze <[email protected]> * Remove debugging prints Signed-off-by: Tsotne Tabidze <[email protected]> * Remove grpc keyword where not necessary Signed-off-by: Tsotne Tabidze <[email protected]> * Add feature logging to http server Signed-off-by: Tsotne Tabidze <[email protected]> * Format timestamp correctly Signed-off-by: Tsotne Tabidze <[email protected]> * Add unit test for UnmarshalJSON Signed-off-by: Tsotne Tabidze <[email protected]> * Add e2e http server test & fix logging bug Signed-off-by: Tsotne Tabidze <[email protected]> * Add separate methods for stopping http & grpc servers Signed-off-by: Tsotne Tabidze <[email protected]>
1 parent f4eed30 commit 3347a57

File tree

10 files changed

+501
-27
lines changed

10 files changed

+501
-27
lines changed

go/embedded/online_features.go

Lines changed: 65 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
type OnlineFeatureService struct {
3232
fs *feast.FeatureStore
3333
grpcStopCh chan os.Signal
34+
httpStopCh chan os.Signal
3435
}
3536

3637
type OnlineFeatureServiceConfig struct {
@@ -63,11 +64,13 @@ func NewOnlineFeatureService(conf *OnlineFeatureServiceConfig, transformationCal
6364
log.Fatalln(err)
6465
}
6566

66-
// Notify this channel when receiving interrupt or termination signals from OS
67-
c := make(chan os.Signal, 1)
68-
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
67+
// Notify these channels when receiving interrupt or termination signals from OS
68+
httpStopCh := make(chan os.Signal, 1)
69+
grpcStopCh := make(chan os.Signal, 1)
70+
signal.Notify(httpStopCh, syscall.SIGINT, syscall.SIGTERM)
71+
signal.Notify(grpcStopCh, syscall.SIGINT, syscall.SIGTERM)
6972

70-
return &OnlineFeatureService{fs: fs, grpcStopCh: c}
73+
return &OnlineFeatureService{fs: fs, httpStopCh: httpStopCh, grpcStopCh: grpcStopCh}
7174
}
7275

7376
func (s *OnlineFeatureService) GetEntityTypesMap(featureRefs []string) (map[string]int32, error) {
@@ -225,15 +228,12 @@ func (s *OnlineFeatureService) StartGprcServerWithLoggingDefaultOpts(host string
225228
return s.StartGprcServerWithLogging(host, port, writeLoggedFeaturesCallback, defaultOpts)
226229
}
227230

228-
// StartGprcServerWithLogging starts gRPC server with enabled feature logging
229-
// Caller of this function must provide Python callback to flush buffered logs as well as logging configuration (loggingOpts)
230-
func (s *OnlineFeatureService) StartGprcServerWithLogging(host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts LoggingOptions) error {
231+
func (s *OnlineFeatureService) constructLoggingService(writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts LoggingOptions) (*logging.LoggingService, error) {
231232
var loggingService *logging.LoggingService = nil
232-
var err error
233233
if writeLoggedFeaturesCallback != nil {
234234
sink, err := logging.NewOfflineStoreSink(writeLoggedFeaturesCallback)
235235
if err != nil {
236-
return err
236+
return nil, err
237237
}
238238

239239
loggingService, err = logging.NewLoggingService(s.fs, sink, logging.LoggingOptions{
@@ -243,9 +243,19 @@ func (s *OnlineFeatureService) StartGprcServerWithLogging(host string, port int,
243243
FlushInterval: loggingOpts.FlushInterval,
244244
})
245245
if err != nil {
246-
return err
246+
return nil, err
247247
}
248248
}
249+
return loggingService, nil
250+
}
251+
252+
// StartGprcServerWithLogging starts gRPC server with enabled feature logging
253+
// Caller of this function must provide Python callback to flush buffered logs as well as logging configuration (loggingOpts)
254+
func (s *OnlineFeatureService) StartGprcServerWithLogging(host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts LoggingOptions) error {
255+
loggingService, err := s.constructLoggingService(writeLoggedFeaturesCallback, loggingOpts)
256+
if err != nil {
257+
return err
258+
}
249259
ser := server.NewGrpcServingServiceServer(s.fs, loggingService)
250260
log.Printf("Starting a gRPC server on host %s port %d\n", host, port)
251261
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port))
@@ -274,7 +284,51 @@ func (s *OnlineFeatureService) StartGprcServerWithLogging(host string, port int,
274284
return nil
275285
}
276286

277-
func (s *OnlineFeatureService) Stop() {
287+
// StartHttpServer starts HTTP server with disabled feature logging and blocks the thread
288+
func (s *OnlineFeatureService) StartHttpServer(host string, port int) error {
289+
return s.StartHttpServerWithLogging(host, port, nil, LoggingOptions{})
290+
}
291+
292+
// StartHttpServerWithLoggingDefaultOpts starts HTTP server with enabled feature logging but default configuration for logging
293+
// Caller of this function must provide Python callback to flush buffered logs
294+
func (s *OnlineFeatureService) StartHttpServerWithLoggingDefaultOpts(host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback) error {
295+
defaultOpts := LoggingOptions{
296+
ChannelCapacity: logging.DefaultOptions.ChannelCapacity,
297+
EmitTimeout: logging.DefaultOptions.EmitTimeout,
298+
WriteInterval: logging.DefaultOptions.WriteInterval,
299+
FlushInterval: logging.DefaultOptions.FlushInterval,
300+
}
301+
return s.StartHttpServerWithLogging(host, port, writeLoggedFeaturesCallback, defaultOpts)
302+
}
303+
304+
// StartHttpServerWithLogging starts HTTP server with enabled feature logging
305+
// Caller of this function must provide Python callback to flush buffered logs as well as logging configuration (loggingOpts)
306+
func (s *OnlineFeatureService) StartHttpServerWithLogging(host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts LoggingOptions) error {
307+
loggingService, err := s.constructLoggingService(writeLoggedFeaturesCallback, loggingOpts)
308+
if err != nil {
309+
return err
310+
}
311+
ser := server.NewHttpServer(s.fs, loggingService)
312+
log.Printf("Starting a HTTP server on host %s port %d\n", host, port)
313+
314+
go func() {
315+
// As soon as these signals are received from OS, try to gracefully stop the gRPC server
316+
<-s.httpStopCh
317+
fmt.Println("Stopping the HTTP server...")
318+
err := ser.Stop()
319+
if err != nil {
320+
fmt.Printf("Error when stopping the HTTP server: %v\n", err)
321+
}
322+
}()
323+
324+
return ser.Serve(host, port)
325+
}
326+
327+
func (s *OnlineFeatureService) StopHttpServer() {
328+
s.httpStopCh <- syscall.SIGINT
329+
}
330+
331+
func (s *OnlineFeatureService) StopGrpcServer() {
278332
s.grpcStopCh <- syscall.SIGINT
279333
}
280334

go/internal/feast/server/grpc_server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func (s *grpcServingServiceServer) GetOnlineFeatures(ctx context.Context, reques
8686
fmt.Printf("Couldn't instantiate logger for feature service %s: %+v", featuresOrService.FeatureService.Name, err)
8787
}
8888

89-
err = logger.Log(entityValuesMap, resp.Results[len(request.Entities):], resp.Metadata.FeatureNames.Val[len(request.Entities):], request.RequestContext, requestId)
89+
err = logger.Log(request.Entities, resp.Results[len(request.Entities):], resp.Metadata.FeatureNames.Val[len(request.Entities):], request.RequestContext, requestId)
9090
if err != nil {
9191
fmt.Printf("LoggerImpl error[%s]: %+v", featuresOrService.FeatureService.Name, err)
9292
}
Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
package server
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"github.com/feast-dev/feast/go/internal/feast"
8+
"github.com/feast-dev/feast/go/internal/feast/model"
9+
"github.com/feast-dev/feast/go/internal/feast/server/logging"
10+
"github.com/feast-dev/feast/go/protos/feast/serving"
11+
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
12+
"github.com/feast-dev/feast/go/types"
13+
"net/http"
14+
"time"
15+
)
16+
17+
type httpServer struct {
18+
fs *feast.FeatureStore
19+
loggingService *logging.LoggingService
20+
server *http.Server
21+
}
22+
23+
// Some Feast types aren't supported during JSON conversion
24+
type repeatedValue struct {
25+
stringVal []string
26+
int64Val []int64
27+
doubleVal []float64
28+
boolVal []bool
29+
stringListVal [][]string
30+
int64ListVal [][]int64
31+
doubleListVal [][]float64
32+
boolListVal [][]bool
33+
}
34+
35+
func (u *repeatedValue) UnmarshalJSON(data []byte) error {
36+
isString := false
37+
isDouble := false
38+
isInt64 := false
39+
isArray := false
40+
openBraketCounter := 0
41+
for _, b := range data {
42+
if b == '"' {
43+
isString = true
44+
}
45+
if b == '.' {
46+
isDouble = true
47+
}
48+
if b >= '0' && b <= '9' {
49+
isInt64 = true
50+
}
51+
if b == '[' {
52+
openBraketCounter++
53+
if openBraketCounter > 1 {
54+
isArray = true
55+
}
56+
}
57+
}
58+
var err error
59+
if !isArray {
60+
if isString {
61+
err = json.Unmarshal(data, &u.stringVal)
62+
} else if isDouble {
63+
err = json.Unmarshal(data, &u.doubleVal)
64+
} else if isInt64 {
65+
err = json.Unmarshal(data, &u.int64Val)
66+
} else {
67+
err = json.Unmarshal(data, &u.boolVal)
68+
}
69+
} else {
70+
if isString {
71+
err = json.Unmarshal(data, &u.stringListVal)
72+
} else if isDouble {
73+
err = json.Unmarshal(data, &u.doubleListVal)
74+
} else if isInt64 {
75+
err = json.Unmarshal(data, &u.int64ListVal)
76+
} else {
77+
err = json.Unmarshal(data, &u.boolListVal)
78+
}
79+
}
80+
return err
81+
}
82+
83+
func (u *repeatedValue) ToProto() *prototypes.RepeatedValue {
84+
proto := new(prototypes.RepeatedValue)
85+
if u.stringVal != nil {
86+
for _, val := range u.stringVal {
87+
proto.Val = append(proto.Val, &prototypes.Value{Val: &prototypes.Value_StringVal{StringVal: val}})
88+
}
89+
}
90+
if u.int64Val != nil {
91+
for _, val := range u.int64Val {
92+
proto.Val = append(proto.Val, &prototypes.Value{Val: &prototypes.Value_Int64Val{Int64Val: val}})
93+
}
94+
}
95+
if u.doubleVal != nil {
96+
for _, val := range u.doubleVal {
97+
proto.Val = append(proto.Val, &prototypes.Value{Val: &prototypes.Value_DoubleVal{DoubleVal: val}})
98+
}
99+
}
100+
if u.boolVal != nil {
101+
for _, val := range u.boolVal {
102+
proto.Val = append(proto.Val, &prototypes.Value{Val: &prototypes.Value_BoolVal{BoolVal: val}})
103+
}
104+
}
105+
if u.stringListVal != nil {
106+
for _, val := range u.stringListVal {
107+
proto.Val = append(proto.Val, &prototypes.Value{Val: &prototypes.Value_StringListVal{StringListVal: &prototypes.StringList{Val: val}}})
108+
}
109+
}
110+
if u.int64ListVal != nil {
111+
for _, val := range u.int64ListVal {
112+
proto.Val = append(proto.Val, &prototypes.Value{Val: &prototypes.Value_Int64ListVal{Int64ListVal: &prototypes.Int64List{Val: val}}})
113+
}
114+
}
115+
if u.doubleListVal != nil {
116+
for _, val := range u.doubleListVal {
117+
proto.Val = append(proto.Val, &prototypes.Value{Val: &prototypes.Value_DoubleListVal{DoubleListVal: &prototypes.DoubleList{Val: val}}})
118+
}
119+
}
120+
if u.boolListVal != nil {
121+
for _, val := range u.boolListVal {
122+
proto.Val = append(proto.Val, &prototypes.Value{Val: &prototypes.Value_BoolListVal{BoolListVal: &prototypes.BoolList{Val: val}}})
123+
}
124+
}
125+
return proto
126+
}
127+
128+
type getOnlineFeaturesRequest struct {
129+
FeatureService *string `json:"feature_service"`
130+
Features []string `json:"features"`
131+
Entities map[string]repeatedValue `json:"entities"`
132+
FullFeatureNames bool `json:"full_feature_names"`
133+
RequestContext map[string]repeatedValue `json:"request_context"`
134+
}
135+
136+
func NewHttpServer(fs *feast.FeatureStore, loggingService *logging.LoggingService) *httpServer {
137+
return &httpServer{fs: fs, loggingService: loggingService}
138+
}
139+
140+
func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) {
141+
if r.Method != "POST" {
142+
http.NotFound(w, r)
143+
return
144+
}
145+
146+
decoder := json.NewDecoder(r.Body)
147+
var request getOnlineFeaturesRequest
148+
err := decoder.Decode(&request)
149+
if err != nil {
150+
http.Error(w, fmt.Sprintf("Error decoding JSON request data: %+v", err), http.StatusInternalServerError)
151+
return
152+
}
153+
var featureService *model.FeatureService
154+
if request.FeatureService != nil {
155+
featureService, err = s.fs.GetFeatureService(*request.FeatureService)
156+
if err != nil {
157+
http.Error(w, fmt.Sprintf("Error getting feature service from registry: %+v", err), http.StatusInternalServerError)
158+
return
159+
}
160+
}
161+
entitiesProto := make(map[string]*prototypes.RepeatedValue)
162+
for key, value := range request.Entities {
163+
entitiesProto[key] = value.ToProto()
164+
}
165+
requestContextProto := make(map[string]*prototypes.RepeatedValue)
166+
for key, value := range request.RequestContext {
167+
requestContextProto[key] = value.ToProto()
168+
}
169+
170+
featureVectors, err := s.fs.GetOnlineFeatures(
171+
r.Context(),
172+
request.Features,
173+
featureService,
174+
entitiesProto,
175+
requestContextProto,
176+
request.FullFeatureNames)
177+
178+
if err != nil {
179+
http.Error(w, fmt.Sprintf("Error getting feature vector: %+v", err), http.StatusInternalServerError)
180+
return
181+
}
182+
183+
var featureNames []string
184+
var results []map[string]interface{}
185+
for _, vector := range featureVectors {
186+
featureNames = append(featureNames, vector.Name)
187+
result := make(map[string]interface{})
188+
var statuses []string
189+
for _, status := range vector.Statuses {
190+
statuses = append(statuses, status.String())
191+
}
192+
var timestamps []string
193+
for _, timestamp := range vector.Timestamps {
194+
timestamps = append(timestamps, timestamp.AsTime().Format(time.RFC3339))
195+
}
196+
197+
result["statuses"] = statuses
198+
result["event_timestamps"] = timestamps
199+
// Note, that vector.Values is an Arrow Array, but this type implements JSON Marshaller.
200+
// So, it's not necessary to pre-process it in any way.
201+
result["values"] = vector.Values
202+
203+
results = append(results, result)
204+
}
205+
206+
response := map[string]interface{}{
207+
"metadata": map[string]interface{}{
208+
"feature_names": featureNames,
209+
},
210+
"results": results,
211+
}
212+
213+
err = json.NewEncoder(w).Encode(response)
214+
215+
if err != nil {
216+
http.Error(w, fmt.Sprintf("Error encoding response: %+v", err), http.StatusInternalServerError)
217+
return
218+
}
219+
220+
w.Header().Set("Content-Type", "application/json")
221+
222+
if featureService != nil && featureService.LoggingConfig != nil && s.loggingService != nil {
223+
logger, err := s.loggingService.GetOrCreateLogger(featureService)
224+
if err != nil {
225+
http.Error(w, fmt.Sprintf("Couldn't instantiate logger for feature service %s: %+v", featureService.Name, err), http.StatusInternalServerError)
226+
return
227+
}
228+
229+
requestId := GenerateRequestId()
230+
231+
// Note: we're converting arrow to proto for feature logging. In the future we should
232+
// base feature logging on arrow so that we don't have to do this extra conversion.
233+
var featureVectorProtos []*serving.GetOnlineFeaturesResponse_FeatureVector
234+
for _, vector := range featureVectors[len(request.Entities):] {
235+
values, err := types.ArrowValuesToProtoValues(vector.Values)
236+
if err != nil {
237+
http.Error(w, fmt.Sprintf("Couldn't convert arrow values into protobuf: %+v", err), http.StatusInternalServerError)
238+
return
239+
}
240+
featureVectorProtos = append(featureVectorProtos, &serving.GetOnlineFeaturesResponse_FeatureVector{
241+
Values: values,
242+
Statuses: vector.Statuses,
243+
EventTimestamps: vector.Timestamps,
244+
})
245+
}
246+
247+
err = logger.Log(entitiesProto, featureVectorProtos, featureNames[len(request.Entities):], requestContextProto, requestId)
248+
if err != nil {
249+
http.Error(w, fmt.Sprintf("LoggerImpl error[%s]: %+v", featureService.Name, err), http.StatusInternalServerError)
250+
return
251+
}
252+
}
253+
}
254+
255+
func (s *httpServer) Serve(host string, port int) error {
256+
s.server = &http.Server{Addr: fmt.Sprintf("%s:%d", host, port), Handler: nil}
257+
http.HandleFunc("/get-online-features", s.getOnlineFeatures)
258+
err := s.server.ListenAndServe()
259+
// Don't return the error if it's caused by graceful shutdown using Stop()
260+
if err == http.ErrServerClosed {
261+
return nil
262+
}
263+
return err
264+
}
265+
func (s *httpServer) Stop() error {
266+
if s.server != nil {
267+
return s.server.Shutdown(context.Background())
268+
}
269+
return nil
270+
}

0 commit comments

Comments
 (0)