From 795031ecee31e2b79e97ee4e6b4efbdf26397c7e Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 17 May 2023 15:54:51 +0200 Subject: [PATCH 1/4] Fix missing newline This makes `go test -rewrite ./...` stable. Signed-off-by: Tobias Grieger --- testdata/slow_follower_after_compaction.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testdata/slow_follower_after_compaction.txt b/testdata/slow_follower_after_compaction.txt index e0d7e2fa..0b169b78 100644 --- a/testdata/slow_follower_after_compaction.txt +++ b/testdata/slow_follower_after_compaction.txt @@ -118,4 +118,4 @@ status 1 ---- 1: StateReplicate match=18 next=19 2: StateReplicate match=18 next=19 -3: StateReplicate match=18 next=19 \ No newline at end of file +3: StateReplicate match=18 next=19 From 029dfee08d35a44cc1788688fa0a3fde73b37062 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 17 May 2023 19:21:01 +0200 Subject: [PATCH 2/4] Use t.Run in TestLeaderAppResp It will be touched in this PR. Signed-off-by: Tobias Grieger --- raft_test.go | 65 +++++++++++++++++++++++++++++----------------------- 1 file changed, 36 insertions(+), 29 deletions(-) diff --git a/raft_test.go b/raft_test.go index 5637c4a4..e50bab56 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2611,40 +2611,47 @@ func TestLeaderAppResp(t *testing.T) { {0, false, 0, 3, 0, 0, 0}, // ignore heartbeat replies } - for i, tt := range tests { - // sm term is 1 after it becomes the leader. - // thus the last log term must be 1 to be committed. - sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) - sm.raftLog = &raftLog{ - storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 1}, {Index: 2, Term: 1}}}, - unstable: unstable{offset: 3}, - } - sm.becomeCandidate() - sm.becomeLeader() - sm.readMessages() - sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject, RejectHint: tt.index}) + for _, tt := range tests { + t.Run("", func(t *testing.T) { + // sm term is 1 after it becomes the leader. + // thus the last log term must be 1 to be committed. + sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) + sm.raftLog = &raftLog{ + storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 1}, {Index: 2, Term: 1}}}, + unstable: unstable{offset: 3}, + } + sm.becomeCandidate() + sm.becomeLeader() + sm.readMessages() + require.NoError(t, sm.Step( + pb.Message{ + From: 2, + Type: pb.MsgAppResp, + Index: tt.index, + Term: sm.Term, + Reject: tt.reject, + RejectHint: tt.index, + }, + )) - p := sm.prs.Progress[2] - if p.Match != tt.wmatch { - t.Errorf("#%d match = %d, want %d", i, p.Match, tt.wmatch) - } - if p.Next != tt.wnext { - t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext) - } + p := sm.prs.Progress[2] + require.EqualValues(t, tt.wmatch, p.Match) + require.EqualValues(t, tt.wnext, p.Next) - msgs := sm.readMessages() + msgs := sm.readMessages() - if len(msgs) != tt.wmsgNum { - t.Errorf("#%d msgNum = %d, want %d", i, len(msgs), tt.wmsgNum) - } - for j, msg := range msgs { - if msg.Index != tt.windex { - t.Errorf("#%d.%d index = %d, want %d", i, j, msg.Index, tt.windex) + if len(msgs) != tt.wmsgNum { + require.Equal(t, len(msgs), tt.wmsgNum) } - if msg.Commit != tt.wcommitted { - t.Errorf("#%d.%d commit = %d, want %d", i, j, msg.Commit, tt.wcommitted) + for _, msg := range msgs { + if msg.Index != tt.windex { + require.EqualValues(t, tt.windex, msg.Index, "%v", DescribeMessage(msg, nil)) + } + if msg.Commit != tt.wcommitted { + require.EqualValues(t, tt.wcommitted, msg.Commit, "%v", DescribeMessage(msg, nil)) + } } - } + }) } } From 85e68cfce699583813964d9734c6c90ffb4041ad Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 17 May 2023 15:56:06 +0200 Subject: [PATCH 3/4] Add test documenting follower stuck in StateReplicate After a call to `ReportUnreachable`, a fully caught up follower would end up in StateReplicate and not leave it despite responding to heartbeats. This is a bug which is going to be fixed in a follow-up commit. Signed-off-by: Tobias Grieger --- rafttest/interaction_env_handler.go | 6 + ...eraction_env_handler_report_unreachable.go | 30 ++++ .../heartbeat_resp_recovers_from_probing.txt | 146 ++++++++++++++++++ 3 files changed, 182 insertions(+) create mode 100644 rafttest/interaction_env_handler_report_unreachable.go create mode 100644 testdata/heartbeat_resp_recovers_from_probing.txt diff --git a/rafttest/interaction_env_handler.go b/rafttest/interaction_env_handler.go index 9f95bc12..74845fd1 100644 --- a/rafttest/interaction_env_handler.go +++ b/rafttest/interaction_env_handler.go @@ -157,6 +157,12 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string { // propose-conf-change 2 v1=true // v5 err = env.handleProposeConfChange(t, d) + case "report-unreachable": + // Calls <1st>.ReportUnreachable(<2nd>). + // + // Example: + // report-unreachable 1 2 + err = env.handleReportUnreachable(t, d) default: err = fmt.Errorf("unknown command") } diff --git a/rafttest/interaction_env_handler_report_unreachable.go b/rafttest/interaction_env_handler_report_unreachable.go new file mode 100644 index 00000000..dce46ba2 --- /dev/null +++ b/rafttest/interaction_env_handler_report_unreachable.go @@ -0,0 +1,30 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package rafttest + +import ( + "errors" + "testing" + + "github.com/cockroachdb/datadriven" +) + +func (env *InteractionEnv) handleReportUnreachable(t *testing.T, d datadriven.TestData) error { + sl := nodeIdxs(t, d) + if len(sl) != 2 { + return errors.New("must specify exactly two node indexes: node on which to report, and reported node") + } + env.Nodes[sl[0]].ReportUnreachable(env.Nodes[sl[1]].Config.ID) + return nil +} diff --git a/testdata/heartbeat_resp_recovers_from_probing.txt b/testdata/heartbeat_resp_recovers_from_probing.txt new file mode 100644 index 00000000..8fb3ba05 --- /dev/null +++ b/testdata/heartbeat_resp_recovers_from_probing.txt @@ -0,0 +1,146 @@ +# This test explores what happens when a fully caught-up follower transitions +# into StateProbe (for example due to a call to ReportUnreachable). Ideally, it +# would transition back to StateReplicate via a heartbeat response, but it +# currently doesn't. + +# Turn off output during the setup of the test. +log-level none +---- +ok + +add-nodes 3 voters=(1,2,3) index=10 +---- +ok + +campaign 1 +---- +ok + +stabilize +---- +ok (quiet) + +log-level debug +---- +ok + +status 1 +---- +1: StateReplicate match=11 next=12 +2: StateReplicate match=11 next=12 +3: StateReplicate match=11 next=12 + +# On the first replica, report the second one as not reachable. +report-unreachable 1 2 +---- +DEBUG 1 failed to send message to 2 because it is unreachable [StateProbe match=11 next=12] + +status 1 +---- +1: StateReplicate match=11 next=12 +2: StateProbe match=11 next=12 +3: StateReplicate match=11 next=12 + +tick-heartbeat 1 +---- +ok + +stabilize +---- +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->2 MsgHeartbeat Term:1 Log:0/0 Commit:11 + 1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11 +> 2 receiving messages + 1->2 MsgHeartbeat Term:1 Log:0/0 Commit:11 +> 3 receiving messages + 1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11 +> 2 handling Ready + Ready MustSync=false: + Messages: + 2->1 MsgHeartbeatResp Term:1 Log:0/0 +> 3 handling Ready + Ready MustSync=false: + Messages: + 3->1 MsgHeartbeatResp Term:1 Log:0/0 +> 1 receiving messages + 2->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgHeartbeatResp Term:1 Log:0/0 + +# Sad, replica is still tracked as probing. +status 1 +---- +1: StateReplicate match=11 next=12 +2: StateProbe match=11 next=12 +3: StateReplicate match=11 next=12 + +# Need to actually distribute a command to sort things out. +# This should not be necessary. +propose 1 foo +---- +ok + +stabilize +---- +> 1 handling Ready + Ready MustSync=true: + Entries: + 1/12 EntryNormal "foo" + Messages: + 1->2 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "foo"] + 1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "foo"] +> 2 receiving messages + 1->2 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "foo"] +> 3 receiving messages + 1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "foo"] +> 2 handling Ready + Ready MustSync=true: + Entries: + 1/12 EntryNormal "foo" + Messages: + 2->1 MsgAppResp Term:1 Log:0/12 +> 3 handling Ready + Ready MustSync=true: + Entries: + 1/12 EntryNormal "foo" + Messages: + 3->1 MsgAppResp Term:1 Log:0/12 +> 1 receiving messages + 2->1 MsgAppResp Term:1 Log:0/12 + 3->1 MsgAppResp Term:1 Log:0/12 +> 1 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:12 + CommittedEntries: + 1/12 EntryNormal "foo" + Messages: + 1->2 MsgApp Term:1 Log:1/12 Commit:12 + 1->3 MsgApp Term:1 Log:1/12 Commit:12 +> 2 receiving messages + 1->2 MsgApp Term:1 Log:1/12 Commit:12 +> 3 receiving messages + 1->3 MsgApp Term:1 Log:1/12 Commit:12 +> 2 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:12 + CommittedEntries: + 1/12 EntryNormal "foo" + Messages: + 2->1 MsgAppResp Term:1 Log:0/12 +> 3 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:12 + CommittedEntries: + 1/12 EntryNormal "foo" + Messages: + 3->1 MsgAppResp Term:1 Log:0/12 +> 1 receiving messages + 2->1 MsgAppResp Term:1 Log:0/12 + 3->1 MsgAppResp Term:1 Log:0/12 + +status 1 +---- +1: StateReplicate match=12 next=13 +2: StateReplicate match=12 next=13 +3: StateReplicate match=12 next=13 From 49bac4871e5f9d8650c09beff47a96e2004b76cf Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 17 May 2023 16:14:04 +0200 Subject: [PATCH 4/4] Move from StatePause->StateReplicate on heartbeat response when possible See individual commits. Essentially, when a fully caught-up follower was reported unreachable, it'd transition to `StateProbe` but then wouldn't recover from that via heartbeats (once they resumed). This caused some issues in CRDB because we rely on the reported status to reason about the safety of leadership changes, etc. This PR makes it such that StateProbe resolves on its own: when the leader hears back from the follower via a heartbeat, it sends an empty MsgApp, and as response to this moves the follower back into StateProbe. Signed-off-by: Tobias Grieger --- raft.go | 21 ++++- raft_test.go | 7 +- .../heartbeat_resp_recovers_from_probing.txt | 84 ++++--------------- 3 files changed, 39 insertions(+), 73 deletions(-) diff --git a/raft.go b/raft.go index d1048294..170d4902 100644 --- a/raft.go +++ b/raft.go @@ -1405,7 +1405,15 @@ func stepLeader(r *raft, m pb.Message) error { } } else { oldPaused := pr.IsPaused() - if pr.MaybeUpdate(m.Index) { + // We want to update our tracking if the response updates our + // matched index or if the response can move a probing peer back + // into StateReplicate (see heartbeat_rep_recovers_from_probing.txt + // for an example of the latter case). + // NB: the same does not make sense for StateSnapshot - if `m.Index` + // equals pr.Match we know we don't m.Index+1 in our log, so moving + // back to replicating state is not useful; besides pr.PendingSnapshot + // would prevent it. + if pr.MaybeUpdate(m.Index) || (pr.Match == m.Index && pr.State == tracker.StateProbe) { switch { case pr.State == tracker.StateProbe: pr.BecomeReplicate() @@ -1460,7 +1468,16 @@ func stepLeader(r *raft, m pb.Message) error { // empty append, allowing it to recover from situations in which all the // messages that filled up Inflights in the first place were dropped. Note // also that the outgoing heartbeat already communicated the commit index. - if pr.Match < r.raftLog.lastIndex() { + // + // If the follower is fully caught up but also in StateProbe (as can happen + // if ReportUnreachable was called), we also want to send an append (it will + // be empty) to allow the follower to transition back to StateReplicate once + // it responds. + // + // Note that StateSnapshot typically satisfies pr.Match < lastIndex, but + // `pr.Paused()` is always true for StateSnapshot, so sendAppend is a + // no-op. + if pr.Match < r.raftLog.lastIndex() || pr.State == tracker.StateProbe { r.sendAppend(m.From) } diff --git a/raft_test.go b/raft_test.go index e50bab56..9f8f4a45 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2608,7 +2608,12 @@ func TestLeaderAppResp(t *testing.T) { {3, true, 0, 3, 0, 0, 0}, // stale resp; no replies {2, true, 0, 2, 1, 1, 0}, // denied resp; leader does not commit; decrease next and send probing msg {2, false, 2, 4, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index - {0, false, 0, 3, 0, 0, 0}, // ignore heartbeat replies + // Follower is StateProbing at 0, it sends MsgAppResp for 0 (which + // matches the pr.Match) so it is moved to StateReplicate and as many + // entries as possible are sent to it (1, 2, and 3). Correspondingly the + // Next is then 4 (an Entry at 4 does not exist, indicating the follower + // will be up to date should it process the emitted MsgApp). + {0, false, 0, 4, 1, 0, 0}, } for _, tt := range tests { diff --git a/testdata/heartbeat_resp_recovers_from_probing.txt b/testdata/heartbeat_resp_recovers_from_probing.txt index 8fb3ba05..690f563a 100644 --- a/testdata/heartbeat_resp_recovers_from_probing.txt +++ b/testdata/heartbeat_resp_recovers_from_probing.txt @@ -1,7 +1,9 @@ -# This test explores what happens when a fully caught-up follower transitions -# into StateProbe (for example due to a call to ReportUnreachable). Ideally, it -# would transition back to StateReplicate via a heartbeat response, but it -# currently doesn't. +# This test checks that if a fully caught-up follower transitions +# into StateProbe (for example due to a call to ReportUnreachable), the +# leader will react to a subsequent heartbeat response from the probing +# follower by sending an empty MsgApp, the response of which restores +# StateReplicate for the follower. In other words, we don't end up in +# a stable state with a fully caught up follower in StateProbe. # Turn off output during the setup of the test. log-level none @@ -45,6 +47,7 @@ tick-heartbeat 1 ---- ok +# Heartbeat -> HeartbeatResp -> MsgApp -> MsgAppResp -> StateReplicate. stabilize ---- > 1 handling Ready @@ -67,80 +70,21 @@ stabilize > 1 receiving messages 2->1 MsgHeartbeatResp Term:1 Log:0/0 3->1 MsgHeartbeatResp Term:1 Log:0/0 - -# Sad, replica is still tracked as probing. -status 1 ----- -1: StateReplicate match=11 next=12 -2: StateProbe match=11 next=12 -3: StateReplicate match=11 next=12 - -# Need to actually distribute a command to sort things out. -# This should not be necessary. -propose 1 foo ----- -ok - -stabilize ----- -> 1 handling Ready - Ready MustSync=true: - Entries: - 1/12 EntryNormal "foo" - Messages: - 1->2 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "foo"] - 1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "foo"] -> 2 receiving messages - 1->2 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "foo"] -> 3 receiving messages - 1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "foo"] -> 2 handling Ready - Ready MustSync=true: - Entries: - 1/12 EntryNormal "foo" - Messages: - 2->1 MsgAppResp Term:1 Log:0/12 -> 3 handling Ready - Ready MustSync=true: - Entries: - 1/12 EntryNormal "foo" - Messages: - 3->1 MsgAppResp Term:1 Log:0/12 -> 1 receiving messages - 2->1 MsgAppResp Term:1 Log:0/12 - 3->1 MsgAppResp Term:1 Log:0/12 > 1 handling Ready Ready MustSync=false: - HardState Term:1 Vote:1 Commit:12 - CommittedEntries: - 1/12 EntryNormal "foo" Messages: - 1->2 MsgApp Term:1 Log:1/12 Commit:12 - 1->3 MsgApp Term:1 Log:1/12 Commit:12 + 1->2 MsgApp Term:1 Log:1/11 Commit:11 > 2 receiving messages - 1->2 MsgApp Term:1 Log:1/12 Commit:12 -> 3 receiving messages - 1->3 MsgApp Term:1 Log:1/12 Commit:12 + 1->2 MsgApp Term:1 Log:1/11 Commit:11 > 2 handling Ready Ready MustSync=false: - HardState Term:1 Vote:1 Commit:12 - CommittedEntries: - 1/12 EntryNormal "foo" - Messages: - 2->1 MsgAppResp Term:1 Log:0/12 -> 3 handling Ready - Ready MustSync=false: - HardState Term:1 Vote:1 Commit:12 - CommittedEntries: - 1/12 EntryNormal "foo" Messages: - 3->1 MsgAppResp Term:1 Log:0/12 + 2->1 MsgAppResp Term:1 Log:0/11 > 1 receiving messages - 2->1 MsgAppResp Term:1 Log:0/12 - 3->1 MsgAppResp Term:1 Log:0/12 + 2->1 MsgAppResp Term:1 Log:0/11 status 1 ---- -1: StateReplicate match=12 next=13 -2: StateReplicate match=12 next=13 -3: StateReplicate match=12 next=13 +1: StateReplicate match=11 next=12 +2: StateReplicate match=11 next=12 +3: StateReplicate match=11 next=12