Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1405,7 +1405,15 @@ func stepLeader(r *raft, m pb.Message) error {
}
} else {
oldPaused := pr.IsPaused()
if pr.MaybeUpdate(m.Index) {
// We want to update our tracking if the response updates our
// matched index or if the response can move a probing peer back
// into StateReplicate (see heartbeat_resp_recovers_from_probing.txt
// for an example of the latter case).
// NB: the same does not make sense for StateSnapshot - if `m.Index`
// equals pr.Match we know we don't have m.Index+1 in our log, so moving
// back to replicating state is not useful; besides pr.PendingSnapshot
// would prevent it.
if pr.MaybeUpdate(m.Index) || (pr.Match == m.Index && pr.State == tracker.StateProbe) {
switch {
case pr.State == tracker.StateProbe:
pr.BecomeReplicate()
Expand Down Expand Up @@ -1460,7 +1468,16 @@ func stepLeader(r *raft, m pb.Message) error {
// empty append, allowing it to recover from situations in which all the
// messages that filled up Inflights in the first place were dropped. Note
// also that the outgoing heartbeat already communicated the commit index.
if pr.Match < r.raftLog.lastIndex() {
//
// If the follower is fully caught up but also in StateProbe (as can happen
// if ReportUnreachable was called), we also want to send an append (it will
// be empty) to allow the follower to transition back to StateReplicate once
// it responds.
//
// Note that StateSnapshot typically satisfies pr.Match < lastIndex, but
// `pr.IsPaused()` is always true for StateSnapshot, so sendAppend is a
// no-op.
if pr.Match < r.raftLog.lastIndex() || pr.State == tracker.StateProbe {
r.sendAppend(m.From)
}

Expand Down
72 changes: 42 additions & 30 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2608,43 +2608,55 @@ func TestLeaderAppResp(t *testing.T) {
{3, true, 0, 3, 0, 0, 0}, // stale resp; no replies
{2, true, 0, 2, 1, 1, 0}, // denied resp; leader does not commit; decrease next and send probing msg
{2, false, 2, 4, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index
{0, false, 0, 3, 0, 0, 0}, // ignore heartbeat replies
// Follower is in StateProbe at 0, it sends MsgAppResp for 0 (which
// matches pr.Match) so it is moved to StateReplicate and as many
// entries as possible are sent to it (1, 2, and 3). Correspondingly the
// Next is then 4 (an Entry at 4 does not exist, indicating the follower
// will be up to date should it process the emitted MsgApp).
{0, false, 0, 4, 1, 0, 0},
}

for i, tt := range tests {
// sm term is 1 after it becomes the leader.
// thus the last log term must be 1 to be committed.
sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
sm.raftLog = &raftLog{
storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 1}, {Index: 2, Term: 1}}},
unstable: unstable{offset: 3},
}
sm.becomeCandidate()
sm.becomeLeader()
sm.readMessages()
sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject, RejectHint: tt.index})
for _, tt := range tests {
t.Run("", func(t *testing.T) {
// sm term is 1 after it becomes the leader.
// thus the last log term must be 1 to be committed.
sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
sm.raftLog = &raftLog{
storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 1}, {Index: 2, Term: 1}}},
unstable: unstable{offset: 3},
}
sm.becomeCandidate()
sm.becomeLeader()
sm.readMessages()
require.NoError(t, sm.Step(
pb.Message{
From: 2,
Type: pb.MsgAppResp,
Index: tt.index,
Term: sm.Term,
Reject: tt.reject,
RejectHint: tt.index,
},
))

p := sm.prs.Progress[2]
if p.Match != tt.wmatch {
t.Errorf("#%d match = %d, want %d", i, p.Match, tt.wmatch)
}
if p.Next != tt.wnext {
t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext)
}
p := sm.prs.Progress[2]
require.EqualValues(t, tt.wmatch, p.Match)
require.EqualValues(t, tt.wnext, p.Next)

msgs := sm.readMessages()
msgs := sm.readMessages()

if len(msgs) != tt.wmsgNum {
t.Errorf("#%d msgNum = %d, want %d", i, len(msgs), tt.wmsgNum)
}
for j, msg := range msgs {
if msg.Index != tt.windex {
t.Errorf("#%d.%d index = %d, want %d", i, j, msg.Index, tt.windex)
if len(msgs) != tt.wmsgNum {
require.Equal(t, len(msgs), tt.wmsgNum)
}
if msg.Commit != tt.wcommitted {
t.Errorf("#%d.%d commit = %d, want %d", i, j, msg.Commit, tt.wcommitted)
for _, msg := range msgs {
if msg.Index != tt.windex {
require.EqualValues(t, tt.windex, msg.Index, "%v", DescribeMessage(msg, nil))
}
if msg.Commit != tt.wcommitted {
require.EqualValues(t, tt.wcommitted, msg.Commit, "%v", DescribeMessage(msg, nil))
}
}
}
})
}
}

Expand Down
6 changes: 6 additions & 0 deletions rafttest/interaction_env_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string {
// propose-conf-change 2 v1=true
// v5
err = env.handleProposeConfChange(t, d)
case "report-unreachable":
// Calls <1st>.ReportUnreachable(<2nd>).
//
// Example:
// report-unreachable 1 2
err = env.handleReportUnreachable(t, d)
default:
err = fmt.Errorf("unknown command")
}
Expand Down
30 changes: 30 additions & 0 deletions rafttest/interaction_env_handler_report_unreachable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafttest

import (
"errors"
"testing"

"github.com/cockroachdb/datadriven"
)

// handleReportUnreachable implements the "report-unreachable" command. The
// directive must name exactly two nodes: ReportUnreachable is invoked on the
// first one and is passed the configured ID of the second one.
//
// An error is returned, and no node is touched, when the number of node
// indexes supplied is anything other than two.
func (env *InteractionEnv) handleReportUnreachable(t *testing.T, d datadriven.TestData) error {
	idxs := nodeIdxs(t, d)
	if got := len(idxs); got != 2 {
		return errors.New("must specify exactly two node indexes: node on which to report, and reported node")
	}
	// First index: the node receiving the report. Second index: the peer
	// being reported as unreachable.
	reporter, reported := idxs[0], idxs[1]
	env.Nodes[reporter].ReportUnreachable(env.Nodes[reported].Config.ID)
	return nil
}
90 changes: 90 additions & 0 deletions testdata/heartbeat_resp_recovers_from_probing.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# This test checks that if a fully caught-up follower transitions
# into StateProbe (for example due to a call to ReportUnreachable), the
# leader will react to a subsequent heartbeat response from the probing
# follower by sending an empty MsgApp, the response of which restores
# StateReplicate for the follower. In other words, we don't end up in
# a stable state with a fully caught up follower in StateProbe.

# Turn off output during the setup of the test.
log-level none
----
ok

add-nodes 3 voters=(1,2,3) index=10
----
ok

campaign 1
----
ok

stabilize
----
ok (quiet)

log-level debug
----
ok

status 1
----
1: StateReplicate match=11 next=12
2: StateReplicate match=11 next=12
3: StateReplicate match=11 next=12

# On the first replica, report the second one as not reachable.
report-unreachable 1 2
----
DEBUG 1 failed to send message to 2 because it is unreachable [StateProbe match=11 next=12]

status 1
----
1: StateReplicate match=11 next=12
2: StateProbe match=11 next=12
3: StateReplicate match=11 next=12

tick-heartbeat 1
----
ok

# Heartbeat -> HeartbeatResp -> MsgApp -> MsgAppResp -> StateReplicate.
stabilize
----
> 1 handling Ready
Ready MustSync=false:
Messages:
1->2 MsgHeartbeat Term:1 Log:0/0 Commit:11
1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11
> 2 receiving messages
1->2 MsgHeartbeat Term:1 Log:0/0 Commit:11
> 3 receiving messages
1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11
> 2 handling Ready
Ready MustSync=false:
Messages:
2->1 MsgHeartbeatResp Term:1 Log:0/0
> 3 handling Ready
Ready MustSync=false:
Messages:
3->1 MsgHeartbeatResp Term:1 Log:0/0
> 1 receiving messages
2->1 MsgHeartbeatResp Term:1 Log:0/0
3->1 MsgHeartbeatResp Term:1 Log:0/0
> 1 handling Ready
Ready MustSync=false:
Messages:
1->2 MsgApp Term:1 Log:1/11 Commit:11
> 2 receiving messages
1->2 MsgApp Term:1 Log:1/11 Commit:11
> 2 handling Ready
Ready MustSync=false:
Messages:
2->1 MsgAppResp Term:1 Log:0/11
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/11

status 1
----
1: StateReplicate match=11 next=12
2: StateReplicate match=11 next=12
3: StateReplicate match=11 next=12
2 changes: 1 addition & 1 deletion testdata/slow_follower_after_compaction.txt
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,4 @@ status 1
----
1: StateReplicate match=18 next=19
2: StateReplicate match=18 next=19
3: StateReplicate match=18 next=19
3: StateReplicate match=18 next=19