
Commit 1cca801

Stream Raft Messages and Fix Check Quorum (#3138)
Instead of sending Raft messages one message per gRPC call, this PR creates a one-way stream between the sender and the receiver. Each message gets pushed to a channel, and smart batching picks up as many messages as possible and sends them over the stream in order. If connection issues arise, there are mechanisms in place to recreate the stream.

Another issue I saw was Zero being unable to maintain quorum. The cause was an unbuffered channel in checkQuorum asking for a read index, which didn't allow multiple requests to be batched together, causing check quorum to fail even with a one-second timeout. After switching to a buffered channel, all check quorum requests finish within a millisecond, rarely going above 7ms in my tests.

Changes:
* Stream Raft messages instead of sending them one by one.
* Set duration to 10s.
* Zero checkQuorum works well now.
* Address Martin's comments.
* Adjust timeouts in contexts, so the deeper one has a shorter timeout and the outer one has a longer one.
* Batch up multiple Raft messages from the channel and send them in one request.
1 parent 90b2260 commit 1cca801
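
For reference, here is a minimal Go sketch (not code from this commit) of the smart-batching pattern described in the commit message above: messages pile up on a channel, and each send first drains whatever is already queued into a single payload. The names sendBatch and maxBatchSize are illustrative only.

package main

import "fmt"

// maxBatchSize is an illustrative soft cap on how many bytes one send may carry.
const maxBatchSize = 64 << 10

// sendBatch blocks for the first message, then drains everything already
// buffered on msgCh without blocking, so one send call carries many messages.
func sendBatch(msgCh <-chan []byte, send func([]byte) error) error {
    for data := range msgCh {
        batch := append([]byte(nil), data...)
    slurp:
        for len(batch) < maxBatchSize {
            select {
            case more, ok := <-msgCh:
                if !ok {
                    break slurp // channel closed; send what we have
                }
                batch = append(batch, more...)
            default:
                break slurp // channel drained; send what we have
            }
        }
        if err := send(batch); err != nil {
            return err // caller can recreate the stream and retry
        }
    }
    return nil
}

func main() {
    msgCh := make(chan []byte, 100)
    for i := 0; i < 5; i++ {
        msgCh <- []byte(fmt.Sprintf("msg-%d;", i))
    }
    close(msgCh)
    _ = sendBatch(msgCh, func(b []byte) error {
        fmt.Printf("one send carrying %d bytes: %s\n", len(b), b)
        return nil
    })
}

The same reasoning explains the checkQuorum fix: with an unbuffered requestCh only one pending read-index request could ever be picked up at a time, whereas a buffered channel lets several queued requests ride in one batch.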

File tree

5 files changed: +428 -311 lines changed

conn/node.go

Lines changed: 109 additions & 45 deletions
@@ -25,6 +25,7 @@ import (
 	"math/rand"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"

 	"github.com/dgraph-io/badger/y"
@@ -35,6 +36,7 @@ import (
 	"github.com/golang/glog"
 	"go.etcd.io/etcd/raft"
 	"go.etcd.io/etcd/raft/raftpb"
+	otrace "go.opencensus.io/trace"
 	"golang.org/x/net/context"
 )

@@ -143,7 +145,7 @@ func NewNode(rc *pb.RaftContext, store *raftwal.DiskStorage) *Node {
 		confChanges: make(map[uint64]chan error),
 		messages:    make(chan sendmsg, 100),
 		peers:       make(map[uint64]string),
-		requestCh:   make(chan linReadReq),
+		requestCh:   make(chan linReadReq, 100),
 	}
 	n.Applied.Init()
 	// This should match up to the Applied index set above.
@@ -301,12 +303,18 @@ func (n *Node) PastLife() (uint64, bool, error) {
 }

 const (
-	messageBatchSoftLimit = 10000000
+	messageBatchSoftLimit = 10e6
 )

+type Stream struct {
+	msgCh chan []byte
+	alive int32
+}
+
 func (n *Node) BatchAndSendMessages() {
 	batches := make(map[uint64]*bytes.Buffer)
-	failedConn := make(map[uint64]bool)
+	streams := make(map[uint64]*Stream)
+
 	for {
 		totalSize := 0
 		sm := <-n.messages
@@ -342,59 +350,106 @@ func (n *Node) BatchAndSendMessages() {
 			if buf.Len() == 0 {
 				continue
 			}
-
-			addr, has := n.Peer(to)
-			pool, err := Get().Get(addr)
-			if !has || err != nil {
-				if exists := failedConn[to]; !exists {
-					// So that we print error only the first time we are not able to connect.
-					// Otherwise, the log is polluted with multiple errors.
-					glog.Warningf("No healthy connection to node Id: %#x addr: [%s], err: %v\n",
-						to, addr, err)
-					failedConn[to] = true
+			stream, ok := streams[to]
+			if !ok || atomic.LoadInt32(&stream.alive) <= 0 {
+				stream = &Stream{
+					msgCh: make(chan []byte, 100),
+					alive: 1,
 				}
-				continue
+				go n.streamMessages(to, stream)
+				streams[to] = stream
 			}
-
-			failedConn[to] = false
 			data := make([]byte, buf.Len())
 			copy(data, buf.Bytes())
-			go n.doSendMessage(to, pool, data)
 			buf.Reset()
+
+			select {
+			case stream.msgCh <- data:
+			default:
+			}
 		}
 	}
 }

-func (n *Node) doSendMessage(to uint64, pool *Pool, data []byte) {
-	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
-	defer cancel()
-
-	client := pool.Get()
-
-	c := pb.NewRaftClient(client)
-	p := &api.Payload{Data: data}
-	batch := &pb.RaftBatch{
-		Context: n.RaftContext,
-		Payload: p,
+func (n *Node) streamMessages(to uint64, stream *Stream) {
+	defer atomic.StoreInt32(&stream.alive, 0)
+
+	const dur = 10 * time.Second
+	deadline := time.Now().Add(dur)
+	var lastLog time.Time
+	// Exit after a thousand tries or at least 10s. Let BatchAndSendMessages create another
+	// goroutine, if needed.
+	for i := 0; ; i++ {
+		if err := n.doSendMessage(to, stream.msgCh); err != nil {
+			// Update lastLog so we print error only a few times if we are not able to connect.
+			// Otherwise, the log is polluted with repeated errors.
+			if time.Since(lastLog) > dur {
+				glog.Warningf("Unable to send message to peer: %#x. Error: %v", to, err)
+			}
+			lastLog = time.Now()
+		}
+		if i >= 1e3 {
+			if time.Now().After(deadline) {
+				return
+			}
+			i = 0
+		}
 	}
+}

-	// We don't need to run this in a goroutine, because doSendMessage is
-	// already being run in one.
-	_, err := c.RaftMessage(ctx, batch)
+func (n *Node) doSendMessage(to uint64, msgCh chan []byte) error {
+	addr, has := n.Peer(to)
+	if !has {
+		return x.Errorf("Do not have address of peer %#x", to)
+	}
+	pool, err := Get().Get(addr)
 	if err != nil {
-		switch {
-		case strings.Contains(err.Error(), "TransientFailure"):
-			glog.Warningf("Reporting node: %d addr: %s as unreachable.", to, pool.Addr)
-			n.Raft().ReportUnreachable(to)
-			pool.SetUnhealthy()
-		default:
-			glog.V(3).Infof("Error while sending Raft message to node with addr: %s, err: %v\n",
-				pool.Addr, err)
+		return err
+	}
+	c := pb.NewRaftClient(pool.Get())
+	mc, err := c.RaftMessage(context.Background())
+	if err != nil {
+		return err
+	}
+
+	slurp := func(batch *pb.RaftBatch) {
+		for {
+			if len(batch.Payload.Data) > messageBatchSoftLimit {
+				return
+			}
+			select {
+			case data := <-msgCh:
+				batch.Payload.Data = append(batch.Payload.Data, data...)
+			default:
+				return
+			}
+		}
+	}
+	ctx := mc.Context()
+	for {
+		select {
+		case data := <-msgCh:
+			batch := &pb.RaftBatch{
+				Context: n.RaftContext,
+				Payload: &api.Payload{Data: data},
+			}
+			slurp(batch) // Pick up more entries from msgCh, if present.
+			if err := mc.Send(batch); err != nil {
+				switch {
+				case strings.Contains(err.Error(), "TransientFailure"):
+					glog.Warningf("Reporting node: %d addr: %s as unreachable.", to, pool.Addr)
+					n.Raft().ReportUnreachable(to)
+					pool.SetUnhealthy()
+				default:
+				}
+				// We don't need to do anything if we receive any error while sending message.
+				// RAFT would automatically retry.
+				return err
+			}
+		case <-ctx.Done():
+			return ctx.Err()
 		}
 	}
-	// We don't need to do anything if we receive any error while sending message.
-	// RAFT would automatically retry.
-	return
 }

 // Connects the node and makes its peerPool refer to the constructed pool and address
@@ -508,21 +563,29 @@ type linReadReq struct {
 var errReadIndex = x.Errorf("Cannot get linearized read (time expired or no configured leader)")

 func (n *Node) WaitLinearizableRead(ctx context.Context) error {
-	indexCh := make(chan uint64, 1)
+	span := otrace.FromContext(ctx)
+	span.Annotate(nil, "WaitLinearizableRead")

+	indexCh := make(chan uint64, 1)
 	select {
 	case n.requestCh <- linReadReq{indexCh: indexCh}:
+		span.Annotate(nil, "Pushed to requestCh")
 	case <-ctx.Done():
+		span.Annotate(nil, "Context expired")
 		return ctx.Err()
 	}

 	select {
 	case index := <-indexCh:
+		span.Annotatef(nil, "Received index: %d", index)
 		if index == 0 {
 			return errReadIndex
 		}
-		return n.Applied.WaitForMark(ctx, index)
+		err := n.Applied.WaitForMark(ctx, index)
+		span.Annotatef(nil, "Error from Applied.WaitForMark: %v", err)
+		return err
 	case <-ctx.Done():
+		span.Annotate(nil, "Context expired")
 		return ctx.Err()
 	}
 }
@@ -532,7 +595,7 @@ func (n *Node) RunReadIndexLoop(closer *y.Closer, readStateCh <-chan raft.ReadSt
 	readIndex := func() (uint64, error) {
 		// Read Request can get rejected then we would wait idefinitely on the channel
 		// so have a timeout.
-		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 		defer cancel()

 		var activeRctx [8]byte
@@ -548,6 +611,7 @@ func (n *Node) RunReadIndexLoop(closer *y.Closer, readStateCh <-chan raft.ReadSt
 			return 0, errors.New("Closer has been called")
 		case rs := <-readStateCh:
 			if !bytes.Equal(activeRctx[:], rs.RequestCtx) {
+				glog.V(1).Infof("Read state: %x != requested %x", rs.RequestCtx, activeRctx[:])
 				goto again
 			}
 			return rs.Index, nil

conn/raft_server.go

Lines changed: 43 additions & 33 deletions
@@ -187,48 +187,58 @@ func (w *RaftServer) JoinCluster(ctx context.Context,
 	return &api.Payload{}, err
 }

-func (w *RaftServer) RaftMessage(ctx context.Context,
-	batch *pb.RaftBatch) (*api.Payload, error) {
+func (w *RaftServer) RaftMessage(server pb.Raft_RaftMessageServer) error {
+	ctx := server.Context()
 	if ctx.Err() != nil {
-		return &api.Payload{}, ctx.Err()
+		return ctx.Err()
 	}

-	rc := batch.GetContext()
-	if rc != nil {
-		n := w.GetNode()
-		if n == nil || n.Raft() == nil {
-			return &api.Payload{}, ErrNoNode
-		}
-		n.Connect(rc.Id, rc.Addr)
+	n := w.GetNode()
+	if n == nil || n.Raft() == nil {
+		return ErrNoNode
 	}
-	if batch.GetPayload() == nil {
-		return &api.Payload{}, nil
-	}
-	data := batch.Payload.Data
 	raft := w.GetNode().Raft()
-
-	for idx := 0; idx < len(data); {
-		x.AssertTruef(len(data[idx:]) >= 4,
-			"Slice left of size: %v. Expected at least 4.", len(data[idx:]))
-
-		sz := int(binary.LittleEndian.Uint32(data[idx : idx+4]))
-		idx += 4
-		msg := raftpb.Message{}
-		if idx+sz > len(data) {
-			return &api.Payload{}, x.Errorf(
-				"Invalid query. Specified size %v overflows slice [%v,%v)\n",
-				sz, idx, len(data))
+	for loop := 1; ; loop++ {
+		batch, err := server.Recv()
+		if err != nil {
+			return err
 		}
-		if err := msg.Unmarshal(data[idx : idx+sz]); err != nil {
-			x.Check(err)
+		if loop%1e6 == 0 {
+			glog.V(2).Infof("%d messages received by %#x", loop, n.Id)
+		}
+		if loop == 1 {
+			rc := batch.GetContext()
+			if rc != nil {
+				n.Connect(rc.Id, rc.Addr)
+			}
 		}
-		// This should be done in order, and not via a goroutine.
-		if err := raft.Step(ctx, msg); err != nil {
-			return &api.Payload{}, err
+		if batch.Payload == nil {
+			continue
+		}
+		data := batch.Payload.Data
+
+		for idx := 0; idx < len(data); {
+			x.AssertTruef(len(data[idx:]) >= 4,
+				"Slice left of size: %v. Expected at least 4.", len(data[idx:]))
+
+			sz := int(binary.LittleEndian.Uint32(data[idx : idx+4]))
+			idx += 4
+			msg := raftpb.Message{}
+			if idx+sz > len(data) {
+				return x.Errorf(
+					"Invalid query. Specified size %v overflows slice [%v,%v)\n",
+					sz, idx, len(data))
+			}
+			if err := msg.Unmarshal(data[idx : idx+sz]); err != nil {
+				x.Check(err)
+			}
+			// This should be done in order, and not via a goroutine.
+			if err := raft.Step(ctx, msg); err != nil {
+				return err
+			}
+			idx += sz
 		}
-		idx += sz
 	}
-	return &api.Payload{}, nil
 }

 // Hello rpc call is used to check connection with other workers after worker

dgraph/cmd/zero/raft.go

Lines changed: 11 additions & 2 deletions
@@ -523,24 +523,33 @@ func (n *node) updateZeroMembershipPeriodically(closer *y.Closer) {
 	}
 }

+var startOption = otrace.WithSampler(otrace.ProbabilitySampler(0.01))
+
 func (n *node) checkQuorum(closer *y.Closer) {
 	defer closer.Done()
 	ticker := time.NewTicker(time.Second)
 	defer ticker.Stop()

 	quorum := func() {
-		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+		// Make this timeout 3x the timeout on RunReadIndexLoop.
+		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
 		defer cancel()

+		ctx, span := otrace.StartSpan(ctx, "Zero.checkQuorum", startOption)
+		defer span.End()
+		span.Annotatef(nil, "Node id: %d", n.Id)
+
 		if state, err := n.server.latestMembershipState(ctx); err == nil {
 			n.mu.Lock()
 			n.lastQuorum = time.Now()
 			n.mu.Unlock()
 			// Also do some connection cleanup.
 			conn.Get().RemoveInvalid(state)
+			span.Annotate(nil, "Updated lastQuorum")

 		} else if glog.V(1) {
+			span.Annotatef(nil, "Got error: %v", err)
-			glog.Warningf("Zero node: %#x unable to reach quorum.", n.Id)
+			glog.Warningf("Zero node: %#x unable to reach quorum. Error: %v", n.Id, err)
 		}
 	}

protos/pb.proto

Lines changed: 4 additions & 4 deletions
@@ -386,10 +386,10 @@ message RaftBatch {
 }

 service Raft {
-	rpc Heartbeat (api.Payload) returns (stream api.Payload) {}
-	rpc RaftMessage (RaftBatch) returns (api.Payload) {}
-	rpc JoinCluster (RaftContext) returns (api.Payload) {}
-	rpc IsPeer (RaftContext) returns (PeerResponse) {}
+	rpc Heartbeat (api.Payload) returns (stream api.Payload) {}
+	rpc RaftMessage (stream RaftBatch) returns (api.Payload) {}
+	rpc JoinCluster (RaftContext) returns (api.Payload) {}
+	rpc IsPeer (RaftContext) returns (PeerResponse) {}
 }

 service Zero {
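
The switch to a client-streaming RPC above is what drives the new handler signature in conn/raft_server.go. As a rough, hedged sketch (placeholder message types, not the actual generated pb code), the stream handles that protoc generates for rpc RaftMessage (stream RaftBatch) returns (api.Payload) look approximately like this:

package sketch

import "context"

// Placeholders standing in for the generated pb.RaftBatch and api.Payload types.
type RaftBatch struct{}
type Payload struct{}

// Client-side handle: the sender pushes batches with Send and may finish
// the call with CloseAndRecv.
type Raft_RaftMessageClient interface {
    Send(*RaftBatch) error
    CloseAndRecv() (*Payload, error)
    Context() context.Context
}

// Server-side handle: the receiver pulls batches with Recv until the client
// closes the stream or an error occurs.
type Raft_RaftMessageServer interface {
    Recv() (*RaftBatch, error)
    SendAndClose(*Payload) error
    Context() context.Context
}

This matches how the diffs use the handles: the sender calls mc.Send(batch) in a loop, and the receiver loops over server.Recv().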
