Skip to content

Commit 2e044e1

Browse files
authored
Make ReadIndex work safely. (#2469)
Fix a long-standing bug where we were overwriting raftpb.Message.Context before sending it over the wire. Our understanding was that the Context field was unused by the raft library, but that was not the case. Specifically, MsgReadIndex was being sent as part of the heartbeat using the Context field. Changed the proto that we use for sending Raft message batches so that it carries RaftContext directly, instead of making it part of the raftpb.Message. Turned linearizable reads back on for Zero and for retrieving state from Zero. More context here: etcd-io/etcd#9893
1 parent eb3910c commit 2e044e1

File tree

7 files changed

+436
-228
lines changed

7 files changed

+436
-228
lines changed

conn/node.go

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -331,10 +331,14 @@ func (n *Node) doSendMessage(pool *Pool, data []byte) {
331331

332332
c := intern.NewRaftClient(client)
333333
p := &api.Payload{Data: data}
334+
batch := &intern.RaftBatch{
335+
Context: n.RaftContext,
336+
Payload: p,
337+
}
334338

335339
ch := make(chan error, 1)
336340
go func() {
337-
_, err := c.RaftMessage(ctx, p)
341+
_, err := c.RaftMessage(ctx, batch)
338342
if err != nil {
339343
x.Printf("Error while sending message to node with addr: %s, err: %v\n", pool.Addr, err)
340344
}
@@ -464,6 +468,7 @@ var errReadIndex = x.Errorf("cannot get linearized read (time expired or no conf
464468

465469
func (n *Node) WaitLinearizableRead(ctx context.Context) error {
466470
indexCh := make(chan uint64, 1)
471+
467472
select {
468473
case n.requestCh <- linReadReq{indexCh: indexCh}:
469474
case <-ctx.Done():
@@ -489,7 +494,7 @@ func (n *Node) RunReadIndexLoop(closer *y.Closer, readStateCh <-chan raft.ReadSt
489494
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
490495
defer cancel()
491496

492-
activeRctx := make([]byte, 8)
497+
var activeRctx [8]byte
493498
x.Check2(n.Rand.Read(activeRctx[:]))
494499
if err := n.Raft().ReadIndex(ctx, activeRctx[:]); err != nil {
495500
x.Errorf("Error while trying to call ReadIndex: %v\n", err)
@@ -543,6 +548,7 @@ func (n *Node) RunReadIndexLoop(closer *y.Closer, readStateCh <-chan raft.ReadSt
543548
for _, req := range requests {
544549
req.indexCh <- index
545550
}
551+
break
546552
}
547553
requests = requests[:0]
548554
}
@@ -625,17 +631,10 @@ var (
625631
)
626632

627633
func (w *RaftServer) applyMessage(ctx context.Context, msg raftpb.Message) error {
628-
var rc intern.RaftContext
629-
x.Check(rc.Unmarshal(msg.Context))
630-
631634
node := w.GetNode()
632635
if node == nil || node.Raft() == nil {
633636
return errNoNode
634637
}
635-
if rc.Group != node.RaftContext.Group {
636-
return errNoNode
637-
}
638-
node.Connect(msg.From, rc.Addr)
639638

640639
c := make(chan error, 1)
641640
go func() { c <- node.Raft().Step(ctx, msg) }()
@@ -648,24 +647,33 @@ func (w *RaftServer) applyMessage(ctx context.Context, msg raftpb.Message) error
648647
}
649648
}
650649
func (w *RaftServer) RaftMessage(ctx context.Context,
651-
query *api.Payload) (*api.Payload, error) {
650+
batch *intern.RaftBatch) (*api.Payload, error) {
652651
if ctx.Err() != nil {
653652
return &api.Payload{}, ctx.Err()
654653
}
655654

656-
for idx := 0; idx < len(query.Data); {
657-
x.AssertTruef(len(query.Data[idx:]) >= 4,
658-
"Slice left of size: %v. Expected at least 4.", len(query.Data[idx:]))
655+
rc := batch.GetContext()
656+
if rc != nil {
657+
w.GetNode().Connect(rc.Id, rc.Addr)
658+
}
659+
if batch.GetPayload() == nil {
660+
return &api.Payload{}, nil
661+
}
662+
data := batch.Payload.Data
663+
664+
for idx := 0; idx < len(data); {
665+
x.AssertTruef(len(data[idx:]) >= 4,
666+
"Slice left of size: %v. Expected at least 4.", len(data[idx:]))
659667

660-
sz := int(binary.LittleEndian.Uint32(query.Data[idx : idx+4]))
668+
sz := int(binary.LittleEndian.Uint32(data[idx : idx+4]))
661669
idx += 4
662670
msg := raftpb.Message{}
663-
if idx+sz > len(query.Data) {
671+
if idx+sz > len(data) {
664672
return &api.Payload{}, x.Errorf(
665673
"Invalid query. Specified size %v overflows slice [%v,%v)\n",
666-
sz, idx, len(query.Data))
674+
sz, idx, len(data))
667675
}
668-
if err := msg.Unmarshal(query.Data[idx : idx+sz]); err != nil {
676+
if err := msg.Unmarshal(data[idx : idx+sz]); err != nil {
669677
x.Check(err)
670678
}
671679
if err := w.applyMessage(ctx, msg); err != nil {

dgraph/cmd/zero/http.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,13 @@ func (st *state) getState(w http.ResponseWriter, r *http.Request) {
138138
x.AddCorsHeaders(w)
139139
w.Header().Set("Content-Type", "application/json")
140140

141+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
142+
defer cancel()
143+
if err := st.node.WaitLinearizableRead(ctx); err != nil {
144+
w.WriteHeader(http.StatusInternalServerError)
145+
x.SetStatus(w, x.Error, err.Error())
146+
return
147+
}
141148
mstate := st.zero.membershipState()
142149
if mstate == nil {
143150
x.SetStatus(w, x.ErrorNoData, "No membership state found.")

dgraph/cmd/zero/raft.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -526,8 +526,6 @@ func (n *node) Run() {
526526
var leader bool
527527
ticker := time.NewTicker(20 * time.Millisecond)
528528
defer ticker.Stop()
529-
rcBytes, err := n.RaftContext.Marshal()
530-
x.Check(err)
531529

532530
closer := y.NewCloser(4)
533531
// snapshot can cause select loop to block while deleting entries, so run
@@ -592,7 +590,6 @@ func (n *node) Run() {
592590
}
593591

594592
for _, msg := range rd.Messages {
595-
msg.Context = rcBytes
596593
n.Send(msg)
597594
}
598595
// Need to send membership state to dgraph nodes on leader change also.

dgraph/cmd/zero/zero.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -564,10 +564,8 @@ func (s *Server) Update(stream intern.Zero_UpdateServer) error {
564564
}
565565

566566
func (s *Server) latestMembershipState(ctx context.Context) (*intern.MembershipState, error) {
567-
// TODO: Bring lin read for Zero back, once Etcd folks can tell why ReadStates are not being
568-
// populated. NOTE: This is important to fix quickly.
569-
// if err := s.Node.WaitLinearizableRead(ctx); err != nil {
570-
// return nil, err
571-
// }
567+
if err := s.Node.WaitLinearizableRead(ctx); err != nil {
568+
return nil, err
569+
}
572570
return s.membershipState(), nil
573571
}

0 commit comments

Comments
 (0)