Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,32 @@ func (l *raftLog) restore(s pb.Snapshot) {
l.unstable.restore(s)
}

// scan walks the log entries in the [lo, hi) range and hands them to the
// visitor v in consecutive batches, so v may be called several times. Each
// batch carries up to pageSize bytes worth of entries; a batch may exceed that
// limit if a single entry is larger than pageSize.
//
// All entries in [lo, hi) are required to exist. If some do not, scan()
// eventually fails with an error, though v may already have been called with a
// prefix of the range by then.
//
// A non-nil error returned by v stops the scan at once and is returned to the
// caller as is, which lets the visitor "break" out of the loop early.
func (l *raftLog) scan(lo, hi uint64, pageSize entryEncodingSize, v func([]pb.Entry) error) error {
	for next := lo; next < hi; {
		batch, err := l.slice(next, hi, pageSize)
		if err != nil {
			return err
		}
		if len(batch) == 0 {
			// No progress is possible; bail out instead of looping forever.
			return fmt.Errorf("got 0 entries in [%d, %d)", next, hi)
		}
		if err := v(batch); err != nil {
			return err
		}
		next += uint64(len(batch))
	}
	return nil
}

// slice returns a slice of log entries from lo through hi-1, inclusive.
func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, error) {
err := l.mustCheckOutOfBounds(lo, hi)
Expand Down
58 changes: 58 additions & 0 deletions log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,64 @@ func TestSlice(t *testing.T) {
}
}

// TestScan checks that raftLog.scan() yields the same entries as slice() for
// a variety of ranges and page sizes, that an error returned by the callback
// is propagated to the caller, and that pages are filled up to the size limit.
func TestScan(t *testing.T) {
	offset := uint64(47)
	num := uint64(20)
	last := offset + num
	half := offset + num/2
	entries := func(from, to uint64) []pb.Entry {
		res := make([]pb.Entry, 0, to-from)
		for i := from; i < to; i++ {
			res = append(res, pb.Entry{Index: i, Term: i})
		}
		return res
	}
	entrySize := entsSize(entries(half, half+1))

	// The first half of the entries goes to storage, the second half is
	// appended to the log directly.
	storage := NewMemoryStorage()
	require.NoError(t, storage.ApplySnapshot(pb.Snapshot{
		Metadata: pb.SnapshotMetadata{Index: offset}}))
	require.NoError(t, storage.Append(entries(offset+1, half)))
	l := newLog(storage, raftLogger)
	l.append(entries(half, last)...)

	// Test that scan() returns the same entries as slice(), on all inputs.
	for _, pageSize := range []entryEncodingSize{0, 1, 10, 100, entrySize, entrySize + 1} {
		for lo := offset + 1; lo < last; lo++ {
			for hi := lo; hi <= last; hi++ {
				var got []pb.Entry
				require.NoError(t, l.scan(lo, hi, pageSize, func(e []pb.Entry) error {
					got = append(got, e...)
					// A page may exceed the limit only if it is a single entry.
					require.True(t, len(e) == 1 || entsSize(e) <= pageSize)
					return nil
				}))
				want, err := l.slice(lo, hi, noLimit)
				require.NoError(t, err)
				require.Equal(t, want, got, "scan() and slice() mismatch on [%d, %d) @ %d", lo, hi, pageSize)
			}
		}
	}

	// Test that the callback error is propagated to the caller.
	// NB: require.ErrorIs takes the actual error first and the target second.
	iters := 0
	require.ErrorIs(t, l.scan(offset+1, half, 0, func([]pb.Entry) error {
		iters++
		if iters == 2 {
			return errBreak
		}
		return nil
	}), errBreak)
	require.Equal(t, 2, iters)

	// Test that we max out the limit, and not just always return a single entry.
	// NB: this test works only because the requested range length is even.
	require.NoError(t, l.scan(offset+1, offset+11, entrySize*2, func(ents []pb.Entry) error {
		require.Len(t, ents, 2)
		require.Equal(t, entrySize*2, entsSize(ents))
		return nil
	}))
}

func mustTerm(term uint64, err error) uint64 {
if err != nil {
panic(err)
Expand Down
49 changes: 33 additions & 16 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -916,19 +916,46 @@ func (r *raft) hup(t CampaignType) {
r.logger.Warningf("%x is unpromotable and can not campaign", r.id)
return
}
ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
if err != nil {
r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
}
if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
if r.hasUnappliedConfChanges() {
r.logger.Warningf("%x cannot campaign at term %d since there are still pending configuration changes to apply", r.id, r.Term)
return
}

r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
r.campaign(t)
}

// errBreak is a sentinel error used to break a callback-based loop: a callback
// returns it to stop the iteration early. hasUnappliedConfChanges returns it
// from its raftLog.scan callback and does not treat it as a failure.
var errBreak = errors.New("break")

// hasUnappliedConfChanges reports whether any committed but not yet applied
// log entry is a configuration change (EntryConfChange or EntryConfChangeV2).
// If scanning the unapplied entries fails, it calls r.logger.Panicf.
func (r *raft) hasUnappliedConfChanges() bool {
	if r.raftLog.applied >= r.raftLog.committed { // in fact applied == committed
		return false
	}
	found := false
	// Scan all unapplied committed entries to find a config change. Paginate the
	// scan, to avoid a potentially unlimited memory spike.
	lo, hi := r.raftLog.applied+1, r.raftLog.committed+1
	// Reuse the maxApplyingEntsSize limit because it is used for similar purposes
	// (limiting the read of unapplied committed entries) when raft sends entries
	// via the Ready struct for application.
	// TODO(pavelkalinnikov): find a way to budget memory/bandwidth for this scan
	// outside the raft package.
	pageSize := r.raftLog.maxApplyingEntsSize
	err := r.raftLog.scan(lo, hi, pageSize, func(ents []pb.Entry) error {
		for i := range ents {
			if ents[i].Type == pb.EntryConfChange || ents[i].Type == pb.EntryConfChangeV2 {
				found = true
				// One match is enough; stop the scan early.
				return errBreak
			}
		}
		return nil
	})
	// errBreak only signals the early exit above; any other error is unexpected.
	// Use errors.Is so the check keeps working if the error is ever wrapped.
	if err != nil && !errors.Is(err, errBreak) {
		r.logger.Panicf("error scanning unapplied entries [%d, %d): %v", lo, hi, err)
	}
	return found
}

// campaign transitions the raft instance to candidate state. This must only be
// called after verifying that this is a legitimate transition.
func (r *raft) campaign(t CampaignType) {
Expand Down Expand Up @@ -1971,16 +1998,6 @@ func (r *raft) reduceUncommittedSize(s entryPayloadSize) {
}
}

// numOfPendingConf counts the entries in ents whose type is EntryConfChange or
// EntryConfChangeV2.
func numOfPendingConf(ents []pb.Entry) int {
	count := 0
	for i := range ents {
		switch ents[i].Type {
		case pb.EntryConfChange, pb.EntryConfChangeV2:
			count++
		}
	}
	return count
}

func releasePendingReadIndexMessages(r *raft) {
if len(r.pendingReadIndexMessages) == 0 {
// Fast path for the common case to avoid a call to storage.LastIndex()
Expand Down
3 changes: 2 additions & 1 deletion tracker/inflights.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func (in *Inflights) Count() int { return in.count }

// reset frees all inflights by zeroing the start, count and bytes fields.
func (in *Inflights) reset() {
	in.start = 0
	in.count = 0
	in.bytes = 0
}
22 changes: 22 additions & 0 deletions tracker/inflights_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,28 @@ func TestInflightsFull(t *testing.T) {
}
}

// TestInflightsReset runs many add/ack cycles against one tracker that is
// reset at the start of every cycle, and checks that the tracker never reports
// itself as full and that the entry count is as expected after each cycle.
func TestInflightsReset(t *testing.T) {
	const (
		addsPerEpoch = 5 // messages added in every epoch
		leftUnacked  = 2 // messages left in flight at the end of every epoch
	)
	in := NewInflights(10, 1000)
	// Imitate a semi-realistic flow during which the inflight tracker is
	// periodically reset to empty. Byte usage must not "leak" across resets.
	var index uint64
	for epoch := 0; epoch < 100; epoch++ {
		in.reset()
		// Add a handful of messages. They should not max out the limit yet.
		for added := 0; added < addsPerEpoch; added++ {
			require.False(t, in.Full())
			index++
			in.Add(index, 16)
		}
		// Ack everything except the trailing indices.
		in.FreeLE(index - leftUnacked)
		require.False(t, in.Full())
		require.Equal(t, leftUnacked, in.Count())
	}
	// Acking the rest must leave the tracker empty.
	in.FreeLE(index)
	require.Equal(t, 0, in.Count())
}

func inflightsBuffer(indices []uint64, sizes []uint64) []inflight {
if len(indices) != len(sizes) {
panic("len(indices) != len(sizes)")
Expand Down