Skip to content

Commit 05a3958

Browse files
authored
fix(lambda): monitor lambda server, fix performance issue, remove lambda logs from extensions (#8006)
Running the lambda script in a new context adds a performance penalty. This PR adds a health monitor in alpha that periodically pings the lambda servers to check their responsiveness. A lambda server that stays unresponsive for longer than the restart-after duration (configurable via the restart-after option of the --lambda superflag; default is 10s) is restarted. This PR also removes the lambda logs from the extensions of the GraphQL response. Instead, they are written to the alpha logs with the prefix [LAMBDA-${ns}].
1 parent 20c9a59 commit 05a3958

26 files changed

+213
-222
lines changed

compose/compose.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -705,7 +705,8 @@ func main() {
705705
"./docker-compose.yml", "name of output file")
706706
cmd.PersistentFlags().BoolVarP(&opts.LocalBin, "local", "l", true,
707707
"use locally-compiled binary if true, otherwise use binary from docker container")
708-
cmd.PersistentFlags().StringVar(&opts.Image, "image", "dgraph/dgraph",
708+
// TODO(Naman): Change this to dgraph/dgraph once the lambda changes are released.
709+
cmd.PersistentFlags().StringVar(&opts.Image, "image", "public.ecr.aws/n1e3y0t3/dgraph-lambda",
709710
"Docker image for alphas and zeros.")
710711
cmd.PersistentFlags().StringVarP(&opts.Tag, "tag", "t", "latest",
711712
"Docker tag for the --image image. Requires -l=false to use binary from docker container.")

contrib/bench-lambda/load-data.sh

100644100755
File mode changed.

contrib/bench-lambda/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ func main() {
3535
callDgraph(client, url)
3636
}
3737
if num := atomic.AddInt32(&count, 1); num%1000 == 0 {
38-
elasped := time.Since(start).Round(time.Second).Seconds()
39-
if elasped == 0 {
38+
elasped := time.Since(start).Seconds()
39+
if elasped < 1 {
4040
return
4141
}
4242
fmt.Printf("[Chan: %d] Done %d requests in time: %f QPS: %d\n",

dgraph/cmd/alpha/dist/index.js

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dgraph/cmd/alpha/dist/index.js.LICENSE.txt

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,6 @@
2525
* MIT Licensed
2626
*/
2727

28-
/*!
29-
* connect-timeout
30-
* Copyright(c) 2014 Jonathan Ong
31-
* Copyright(c) 2014-2015 Douglas Christopher Wilson
32-
* MIT Licensed
33-
*/
34-
3528
/*!
3629
* content-disposition
3730
* Copyright(c) 2014-2017 Douglas Christopher Wilson
@@ -196,12 +189,6 @@
196189
* MIT Licensed
197190
*/
198191

199-
/*!
200-
* on-headers
201-
* Copyright(c) 2014 Douglas Christopher Wilson
202-
* MIT Licensed
203-
*/
204-
205192
/*!
206193
* parseurl
207194
* Copyright(c) 2014 Jonathan Ong

dgraph/cmd/alpha/run.go

Lines changed: 69 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
"math"
2727
"net"
2828
"net/http"
29-
_ "net/http/pprof" // http profiler
29+
_ "net/http/pprof" // http profile
3030
"net/url"
3131
"os"
3232
"os/exec"
@@ -243,6 +243,8 @@ they form a Raft group and provide synchronous replication.
243243
"Number of JS lambda servers to be launched by alpha.").
244244
Flag("port",
245245
"The starting port at which the lambda server listens.").
246+
Flag("restart-after",
247+
"Restarts the lambda server after given duration of unresponsiveness").
246248
String())
247249

248250
flag.String("cdc", worker.CDCDefaults, z.NewSuperFlagHelp(worker.CDCDefaults).
@@ -468,11 +470,7 @@ func setupLambdaServer(closer *z.Closer) {
468470
return
469471
}
470472

471-
glog.Infoln("Setting up lambda servers")
472-
dgraphUrl := fmt.Sprintf("http://localhost:%d", httpPort())
473-
// Entry point of the script is index.js.
474-
filename := filepath.Join(x.WorkerConfig.TmpDir, "index.js")
475-
473+
// Copy over all the embedded files to actual files.
476474
dir := "dist"
477475
files, err := jsLambda.ReadDir(dir)
478476
x.Check(err)
@@ -488,28 +486,84 @@ func setupLambdaServer(closer *z.Closer) {
488486
x.Check(file.Close())
489487
}
490488

489+
type lambda struct {
490+
cmd *exec.Cmd
491+
active bool
492+
lastActive int64
493+
health string
494+
port int
495+
}
496+
497+
lambdas := make([]*lambda, 0, num)
491498
for i := 0; i < num; i++ {
499+
lambdas = append(lambdas, &lambda{
500+
port: port + i,
501+
health: fmt.Sprintf("http://127.0.0.1:%d/health", port+i),
502+
})
503+
}
504+
505+
// Entry point of the script is index.js.
506+
filename := filepath.Join(x.WorkerConfig.TmpDir, "index.js")
507+
dgraphUrl := fmt.Sprintf("http://127.0.0.1:%d", httpPort())
508+
509+
glog.Infoln("Setting up lambda servers")
510+
for i := range lambdas {
492511
go func(i int) {
493512
for {
494513
select {
495514
case <-closer.HasBeenClosed():
496-
break
515+
return
497516
default:
498517
cmd := exec.CommandContext(closer.Ctx(), "node", filename)
499-
cmd.Env = append(cmd.Env, fmt.Sprintf("PORT=%d", port+i))
518+
cmd.Env = append(cmd.Env, fmt.Sprintf("PORT=%d", lambdas[i].port))
500519
cmd.Env = append(cmd.Env, fmt.Sprintf("DGRAPH_URL="+dgraphUrl))
501520
cmd.Stdout = os.Stdout
502521
cmd.Stderr = os.Stderr
522+
lambdas[i].cmd = cmd
523+
lambdas[i].lastActive = time.Now().UnixNano()
524+
lambdas[i].active = true
503525
glog.Infof("Running node command: %+v\n", cmd)
504-
err := cmd.Run()
505-
if err != nil {
506-
glog.Errorf("Lambda server idx: %d stopped with error %v", i, err)
526+
if err := cmd.Run(); err != nil {
527+
glog.Errorf("Lambda server at port: %d stopped with error: %v",
528+
lambdas[i].port, err)
507529
}
508530
time.Sleep(2 * time.Second)
509531
}
510532
}
511533
}(i)
512534
}
535+
536+
// Monitor the lambda servers. If the server is unresponsive for more than restart-after time,
537+
// restart it.
538+
client := http.Client{Timeout: 1 * time.Second}
539+
go func() {
540+
ticker := time.NewTicker(2 * time.Second)
541+
defer ticker.Stop()
542+
for {
543+
select {
544+
case <-closer.HasBeenClosed():
545+
return
546+
case <-ticker.C:
547+
timestamp := time.Now().UnixNano()
548+
for _, l := range lambdas {
549+
if !l.active {
550+
continue
551+
}
552+
resp, err := client.Get(l.health)
553+
if err != nil || resp.StatusCode != 200 {
554+
if time.Duration(timestamp-l.lastActive) > x.Config.Lambda.RestartAfter {
555+
glog.Warningf("Lambda Server at port: %d not responding."+
556+
" Killed it with err: %v", l.port, l.cmd.Process.Kill())
557+
l.active = false
558+
}
559+
continue
560+
}
561+
resp.Body.Close()
562+
l.lastActive = timestamp
563+
}
564+
}
565+
}
566+
}()
513567
}
514568

515569
func serveGRPC(l net.Listener, tlsCfg *tls.Config, closer *z.Closer) {
@@ -809,9 +863,10 @@ func run() {
809863
lambda := z.NewSuperFlag(Alpha.Conf.GetString("lambda")).MergeAndCheckDefault(
810864
worker.LambdaDefaults)
811865
x.Config.Lambda = x.LambdaOptions{
812-
Url: lambda.GetString("url"),
813-
Num: lambda.GetUint32("num"),
814-
Port: lambda.GetUint32("port"),
866+
Url: lambda.GetString("url"),
867+
Num: lambda.GetUint32("num"),
868+
Port: lambda.GetUint32("port"),
869+
RestartAfter: lambda.GetDuration("restart-after"),
815870
}
816871
if x.Config.Lambda.Url != "" {
817872
graphqlLambdaUrl, err := url.Parse(x.Config.Lambda.Url)

dgraph/cmd/alpha/run_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func processToFastJSON(q string) string {
8080
log.Fatal(err)
8181
}
8282

83-
buf, _, err := query.ToJson(context.Background(), &l, qr.Subgraphs, nil)
83+
buf, err := query.ToJson(context.Background(), &l, qr.Subgraphs, nil)
8484
if err != nil {
8585
log.Fatal(err)
8686
}

edgraph/server.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1564,7 +1564,6 @@ func processQuery(ctx context.Context, qc *queryContext) (*api.Response, error)
15641564
return resp, errors.Wrap(err, "")
15651565
}
15661566

1567-
var logs []string
15681567
if len(er.SchemaNode) > 0 || len(er.Types) > 0 {
15691568
if err = authorizeSchemaQuery(ctx, &er); err != nil {
15701569
return resp, err
@@ -1587,7 +1586,7 @@ func processQuery(ctx context.Context, qc *queryContext) (*api.Response, error)
15871586
} else if qc.req.RespFormat == api.Request_RDF {
15881587
resp.Rdf, err = query.ToRDF(qc.latency, er.Subgraphs)
15891588
} else {
1590-
resp.Json, logs, err = query.ToJson(ctx, qc.latency, er.Subgraphs, qc.gqlField)
1589+
resp.Json, err = query.ToJson(ctx, qc.latency, er.Subgraphs, qc.gqlField)
15911590
}
15921591
// if err is just some error from GraphQL encoding, then we need to continue the normal
15931592
// execution ignoring the error as we still need to assign metrics and latency info to resp.
@@ -1642,7 +1641,6 @@ func processQuery(ctx context.Context, qc *queryContext) (*api.Response, error)
16421641

16431642
resp.Metrics = &api.Metrics{
16441643
NumUids: er.Metrics,
1645-
Logs: logs,
16461644
}
16471645

16481646
var total uint64

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ require (
2020
github.com/blevesearch/bleve v1.0.13
2121
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd
2222
github.com/dgraph-io/badger/v3 v3.0.0-20210825061050-c2b23c471f5e
23-
github.com/dgraph-io/dgo/v210 v210.0.0-20210825123656-d3f867fe9cc3
23+
github.com/dgraph-io/dgo/v210 v210.0.0-20210421093152-78a2fece3ebd
2424
github.com/dgraph-io/gqlgen v0.13.2
2525
github.com/dgraph-io/gqlparser/v2 v2.2.1
2626
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,8 @@ github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Ev
168168
github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
169169
github.com/dgraph-io/badger/v3 v3.0.0-20210825061050-c2b23c471f5e h1:lugmhvI1tMal0wKW0g5uxIRHUqXpE5y1lgq/vm/UP/8=
170170
github.com/dgraph-io/badger/v3 v3.0.0-20210825061050-c2b23c471f5e/go.mod h1:dULbq6ehJ5K0cGW/1TQ9iSfUk0gbSiToDWmWmTsJ53E=
171-
github.com/dgraph-io/dgo/v210 v210.0.0-20210825123656-d3f867fe9cc3 h1:/S7Dett03h3+KWRenJeuKE/1jZv76MaB9C1mbR/1Tns=
172-
github.com/dgraph-io/dgo/v210 v210.0.0-20210825123656-d3f867fe9cc3/go.mod h1:dCzdThGGTPYOAuNtrM6BiXj/86voHn7ZzkPL6noXR3s=
171+
github.com/dgraph-io/dgo/v210 v210.0.0-20210421093152-78a2fece3ebd h1:bKck5FnruuJxL1oCmrDSYWRl634IxBwL/IwwWx4UgEM=
172+
github.com/dgraph-io/dgo/v210 v210.0.0-20210421093152-78a2fece3ebd/go.mod h1:dCzdThGGTPYOAuNtrM6BiXj/86voHn7ZzkPL6noXR3s=
173173
github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM=
174174
github.com/dgraph-io/gqlgen v0.13.2/go.mod h1:iCOrOv9lngN7KAo+jMgvUPVDlYHdf7qDwsTkQby2Sis=
175175
github.com/dgraph-io/gqlparser/v2 v2.1.1/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P9fvO8TsIsQtRKU=

0 commit comments

Comments
 (0)