diff --git a/raft.go b/raft.go index d1048294..170d4902 100644 --- a/raft.go +++ b/raft.go @@ -1405,7 +1405,15 @@ func stepLeader(r *raft, m pb.Message) error { } } else { oldPaused := pr.IsPaused() - if pr.MaybeUpdate(m.Index) { + // We want to update our tracking if the response updates our + // matched index or if the response can move a probing peer back + // into StateReplicate (see heartbeat_resp_recovers_from_probing.txt + // for an example of the latter case). + // NB: the same does not make sense for StateSnapshot - if `m.Index` + // equals pr.Match we know we don't have m.Index+1 in our log, so moving + // back to replicating state is not useful; besides pr.PendingSnapshot + // would prevent it. + if pr.MaybeUpdate(m.Index) || (pr.Match == m.Index && pr.State == tracker.StateProbe) { switch { case pr.State == tracker.StateProbe: pr.BecomeReplicate() @@ -1460,7 +1468,16 @@ func stepLeader(r *raft, m pb.Message) error { // empty append, allowing it to recover from situations in which all the // messages that filled up Inflights in the first place were dropped. Note // also that the outgoing heartbeat already communicated the commit index. - if pr.Match < r.raftLog.lastIndex() { + // + // If the follower is fully caught up but also in StateProbe (as can happen + // if ReportUnreachable was called), we also want to send an append (it will + // be empty) to allow the follower to transition back to StateReplicate once + // it responds. + // + // Note that StateSnapshot typically satisfies pr.Match < lastIndex, but + // `pr.IsPaused()` is always true for StateSnapshot, so sendAppend is a + // no-op. 
+ if pr.Match < r.raftLog.lastIndex() || pr.State == tracker.StateProbe { r.sendAppend(m.From) } diff --git a/raft_test.go b/raft_test.go index 5637c4a4..9f8f4a45 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2608,43 +2608,55 @@ func TestLeaderAppResp(t *testing.T) { {3, true, 0, 3, 0, 0, 0}, // stale resp; no replies {2, true, 0, 2, 1, 1, 0}, // denied resp; leader does not commit; decrease next and send probing msg {2, false, 2, 4, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index - {0, false, 0, 3, 0, 0, 0}, // ignore heartbeat replies + // Follower is StateProbing at 0, it sends MsgAppResp for 0 (which + // matches the pr.Match) so it is moved to StateReplicate and as many + // entries as possible are sent to it (1, 2, and 3). Correspondingly the + // Next is then 4 (an Entry at 4 does not exist, indicating the follower + // will be up to date should it process the emitted MsgApp). + {0, false, 0, 4, 1, 0, 0}, } - for i, tt := range tests { - // sm term is 1 after it becomes the leader. - // thus the last log term must be 1 to be committed. - sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) - sm.raftLog = &raftLog{ - storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 1}, {Index: 2, Term: 1}}}, - unstable: unstable{offset: 3}, - } - sm.becomeCandidate() - sm.becomeLeader() - sm.readMessages() - sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject, RejectHint: tt.index}) + for _, tt := range tests { + t.Run("", func(t *testing.T) { + // sm term is 1 after it becomes the leader. + // thus the last log term must be 1 to be committed. 
+ sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) + sm.raftLog = &raftLog{ + storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 1}, {Index: 2, Term: 1}}}, + unstable: unstable{offset: 3}, + } + sm.becomeCandidate() + sm.becomeLeader() + sm.readMessages() + require.NoError(t, sm.Step( + pb.Message{ + From: 2, + Type: pb.MsgAppResp, + Index: tt.index, + Term: sm.Term, + Reject: tt.reject, + RejectHint: tt.index, + }, + )) - p := sm.prs.Progress[2] - if p.Match != tt.wmatch { - t.Errorf("#%d match = %d, want %d", i, p.Match, tt.wmatch) - } - if p.Next != tt.wnext { - t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext) - } + p := sm.prs.Progress[2] + require.EqualValues(t, tt.wmatch, p.Match) + require.EqualValues(t, tt.wnext, p.Next) - msgs := sm.readMessages() + msgs := sm.readMessages() - if len(msgs) != tt.wmsgNum { - t.Errorf("#%d msgNum = %d, want %d", i, len(msgs), tt.wmsgNum) - } - for j, msg := range msgs { - if msg.Index != tt.windex { - t.Errorf("#%d.%d index = %d, want %d", i, j, msg.Index, tt.windex) + if len(msgs) != tt.wmsgNum { + require.Equal(t, len(msgs), tt.wmsgNum) } - if msg.Commit != tt.wcommitted { - t.Errorf("#%d.%d commit = %d, want %d", i, j, msg.Commit, tt.wcommitted) + for _, msg := range msgs { + if msg.Index != tt.windex { + require.EqualValues(t, tt.windex, msg.Index, "%v", DescribeMessage(msg, nil)) + } + if msg.Commit != tt.wcommitted { + require.EqualValues(t, tt.wcommitted, msg.Commit, "%v", DescribeMessage(msg, nil)) + } } - } + }) } } diff --git a/rafttest/interaction_env_handler.go b/rafttest/interaction_env_handler.go index 9f95bc12..74845fd1 100644 --- a/rafttest/interaction_env_handler.go +++ b/rafttest/interaction_env_handler.go @@ -157,6 +157,12 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string { // propose-conf-change 2 v1=true // v5 err = env.handleProposeConfChange(t, d) + case "report-unreachable": + // Calls <1st>.ReportUnreachable(<2nd>). 
+ // + // Example: + // report-unreachable 1 2 + err = env.handleReportUnreachable(t, d) default: err = fmt.Errorf("unknown command") } diff --git a/rafttest/interaction_env_handler_report_unreachable.go b/rafttest/interaction_env_handler_report_unreachable.go new file mode 100644 index 00000000..dce46ba2 --- /dev/null +++ b/rafttest/interaction_env_handler_report_unreachable.go @@ -0,0 +1,30 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package rafttest + +import ( + "errors" + "testing" + + "github.com/cockroachdb/datadriven" +) + +func (env *InteractionEnv) handleReportUnreachable(t *testing.T, d datadriven.TestData) error { + sl := nodeIdxs(t, d) + if len(sl) != 2 { + return errors.New("must specify exactly two node indexes: node on which to report, and reported node") + } + env.Nodes[sl[0]].ReportUnreachable(env.Nodes[sl[1]].Config.ID) + return nil +} diff --git a/testdata/heartbeat_resp_recovers_from_probing.txt b/testdata/heartbeat_resp_recovers_from_probing.txt new file mode 100644 index 00000000..690f563a --- /dev/null +++ b/testdata/heartbeat_resp_recovers_from_probing.txt @@ -0,0 +1,90 @@ +# This test checks that if a fully caught-up follower transitions +# into StateProbe (for example due to a call to ReportUnreachable), the +# leader will react to a subsequent heartbeat response from the probing +# follower by sending an empty MsgApp, the response of which restores +# StateReplicate for the follower. 
In other words, we don't end up in +# a stable state with a fully caught up follower in StateProbe. + +# Turn off output during the setup of the test. +log-level none +---- +ok + +add-nodes 3 voters=(1,2,3) index=10 +---- +ok + +campaign 1 +---- +ok + +stabilize +---- +ok (quiet) + +log-level debug +---- +ok + +status 1 +---- +1: StateReplicate match=11 next=12 +2: StateReplicate match=11 next=12 +3: StateReplicate match=11 next=12 + +# On the first replica, report the second one as not reachable. +report-unreachable 1 2 +---- +DEBUG 1 failed to send message to 2 because it is unreachable [StateProbe match=11 next=12] + +status 1 +---- +1: StateReplicate match=11 next=12 +2: StateProbe match=11 next=12 +3: StateReplicate match=11 next=12 + +tick-heartbeat 1 +---- +ok + +# Heartbeat -> HeartbeatResp -> MsgApp -> MsgAppResp -> StateReplicate. +stabilize +---- +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->2 MsgHeartbeat Term:1 Log:0/0 Commit:11 + 1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11 +> 2 receiving messages + 1->2 MsgHeartbeat Term:1 Log:0/0 Commit:11 +> 3 receiving messages + 1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11 +> 2 handling Ready + Ready MustSync=false: + Messages: + 2->1 MsgHeartbeatResp Term:1 Log:0/0 +> 3 handling Ready + Ready MustSync=false: + Messages: + 3->1 MsgHeartbeatResp Term:1 Log:0/0 +> 1 receiving messages + 2->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgHeartbeatResp Term:1 Log:0/0 +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->2 MsgApp Term:1 Log:1/11 Commit:11 +> 2 receiving messages + 1->2 MsgApp Term:1 Log:1/11 Commit:11 +> 2 handling Ready + Ready MustSync=false: + Messages: + 2->1 MsgAppResp Term:1 Log:0/11 +> 1 receiving messages + 2->1 MsgAppResp Term:1 Log:0/11 + +status 1 +---- +1: StateReplicate match=11 next=12 +2: StateReplicate match=11 next=12 +3: StateReplicate match=11 next=12 diff --git a/testdata/slow_follower_after_compaction.txt b/testdata/slow_follower_after_compaction.txt index 
e0d7e2fa..0b169b78 100644 --- a/testdata/slow_follower_after_compaction.txt +++ b/testdata/slow_follower_after_compaction.txt @@ -118,4 +118,4 @@ status 1 ---- 1: StateReplicate match=18 next=19 2: StateReplicate match=18 next=19 -3: StateReplicate match=18 next=19 \ No newline at end of file +3: StateReplicate match=18 next=19