64 changes: 52 additions & 12 deletions pkg/kv/kvprober/kvprober.go
@@ -20,6 +20,7 @@ package kvprober
import (
"context"
"math/rand"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
@@ -210,6 +211,27 @@ func (p *ProberOps) Write(key roachpb.Key) func(context.Context, *kv.Txn) error
}
}

// errorIsExpectedDuringNormalOperation filters out errors that may be returned
// during normal operation of CRDB.
//
// One such example is the `was permanently removed from the cluster at` error
// that is returned to the kvclient of decommissioned nodes. This error does not
// affect user traffic, since such traffic is drained off the node by the time it
// becomes decommissioned.
//
// Since such errors do not indicate a problem with CRDB, kvprober does not report
// them as an error in its metrics.
func errorIsExpectedDuringNormalOperation(err error) bool {
// Note that errors *other* than decommissioned status errors, such as
// `use of closed network connection`, happen *occasionally* on the kvclient
// of a decommissioned node. The full set of other errors is not known exactly,
// and the errors mostly lack structure. Since they happen rarely, and since
// the intended use of kvprober is to page on a sustained error rate, not a
// single error, we choose to only filter out `was permanently removed from
// the cluster at` errors.
return strings.Contains(err.Error(), "was permanently removed from the cluster at")
}

// validateKey returns an error if the key is not valid for use by the kvprober.
// This is a sanity check to ensure that the kvprober does not corrupt user data
// in the global keyspace or other system data in the local keyspace.
@@ -352,8 +374,12 @@ func (p *Prober) readProbeImpl(ctx context.Context, ops proberOpsI, txns proberT
return
}
if err != nil {
log.Health.Errorf(ctx, "can't make a plan: %v", err)
p.metrics.ProbePlanFailures.Inc(1)
if errorIsExpectedDuringNormalOperation(err) {
log.Health.Warningf(ctx, "making a plan failed with expected error: %v", err)
} else {
log.Health.Errorf(ctx, "can't make a plan: %v", err)
p.metrics.ProbePlanFailures.Inc(1)
}
return
}

@@ -383,9 +409,13 @@ func (p *Prober) readProbeImpl(ctx context.Context, ops proberOpsI, txns proberT
return txns.TxnRootKV(ctx, f)
})
if err != nil {
// TODO(josh): Write structured events with log.Structured.
log.Health.Errorf(ctx, "kv.Get(%s), r=%v failed with: %v", step.Key, step.RangeID, err)
p.metrics.ReadProbeFailures.Inc(1)
if errorIsExpectedDuringNormalOperation(err) {
log.Health.Warningf(ctx, "kv.Get(%s), r=%v failed with expected error: %v", step.Key, step.RangeID, err)
} else {
// TODO(josh): Write structured events with log.Structured.
log.Health.Errorf(ctx, "kv.Get(%s), r=%v failed with: %v", step.Key, step.RangeID, err)
p.metrics.ReadProbeFailures.Inc(1)
}
return
}

@@ -415,8 +445,12 @@ func (p *Prober) writeProbeImpl(ctx context.Context, ops proberOpsI, txns prober
return
}
if err != nil {
log.Health.Errorf(ctx, "can't make a plan: %v", err)
p.metrics.ProbePlanFailures.Inc(1)
if errorIsExpectedDuringNormalOperation(err) {
log.Health.Warningf(ctx, "making a plan failed with expected error: %v", err)
} else {
log.Health.Errorf(ctx, "can't make a plan: %v", err)
p.metrics.ProbePlanFailures.Inc(1)
}
return
}

@@ -435,11 +469,17 @@ func (p *Prober) writeProbeImpl(ctx context.Context, ops proberOpsI, txns prober
return txns.TxnRootKV(ctx, f)
})
if err != nil {
added := p.quarantineWritePool.maybeAdd(ctx, step)
log.Health.Errorf(
ctx, "kv.Txn(Put(%s); Del(-)), r=%v failed with: %v [quarantined=%t]", step.Key, step.RangeID, err, added,
)
p.metrics.WriteProbeFailures.Inc(1)
if errorIsExpectedDuringNormalOperation(err) {
log.Health.Warningf(
ctx, "kv.Txn(Put(%s); Del(-)), r=%v failed with expected error: %v", step.Key, step.RangeID, err,
)
} else {
added := p.quarantineWritePool.maybeAdd(ctx, step)
log.Health.Errorf(
ctx, "kv.Txn(Put(%s); Del(-)), r=%v failed with: %v [quarantined=%t]", step.Key, step.RangeID, err, added,
)
p.metrics.WriteProbeFailures.Inc(1)
}
return
}
// This will no-op if not in the quarantine pool.
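For context, a minimal standalone sketch, not part of this diff, of what the new helper's substring check does and does not match. It copies the check verbatim; the wrapped-error example and the use of the standard library errors/fmt packages are illustrative assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Copied from the change above: only the decommission message counts as
// expected during normal operation.
func errorIsExpectedDuringNormalOperation(err error) bool {
	return strings.Contains(err.Error(), "was permanently removed from the cluster at")
}

func main() {
	// The error surfaced to the kvclient of a decommissioned node.
	decommissioned := errors.New(
		"n2 was permanently removed from the cluster at 2009-11-10 23:00:00 +0000 UTC; " +
			"it is not allowed to rejoin the cluster")

	// Because the check is a plain substring match on err.Error(), wrapping
	// the error with extra context still matches.
	wrapped := fmt.Errorf("probe plan: %w", decommissioned)

	// Other errors occasionally seen on a decommissioned node's kvclient are
	// deliberately not filtered; they still count as probe failures.
	closedConn := errors.New("use of closed network connection")

	fmt.Println(errorIsExpectedDuringNormalOperation(decommissioned)) // true
	fmt.Println(errorIsExpectedDuringNormalOperation(wrapped))        // true
	fmt.Println(errorIsExpectedDuringNormalOperation(closedConn))     // false
}
```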
74 changes: 74 additions & 0 deletions pkg/kv/kvprober/kvprober_test.go
@@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

@@ -78,6 +79,31 @@ func TestReadProbe(t *testing.T) {
require.Zero(t, p.Metrics().ReadProbeFailures.Count())
})

// Once a node is fully decommissioned, neither kvclient nor kvprober work from
// the node. This does not indicate a service health issue; it is expected behavior.
//
// This is not tested with an integration test, since the kvclient of a decommissioned
// node will occasionally return other errors. We choose not to filter those out for
// reasons given at errorIsExpectedDuringNormalOperation. As a result, an integration test
// would be flaky. We believe a unit test is sufficient, largely because the main risk
// in only having a unit test is false positive pages on SRE, due to changes in what errors
// are returned from the kvclient of a decommissioned node. Though false positive pages add
// ops load, they do not directly affect the customer experience.
t.Run("planning fails due to decommissioning but not counted as error", func(t *testing.T) {
m := &mock{
t: t,
read: true,
planErr: errors.New("n2 was permanently removed from the cluster at 2009-11-10 23:00:00 +0000 UTC m=+0.000000001; it is not allowed to rejoin the cluster"),
}
p := initTestProber(ctx, m)
p.readProbeImpl(ctx, m, m, m)

require.Equal(t, int64(1), p.Metrics().ProbePlanAttempts.Count())
require.Zero(t, p.Metrics().ReadProbeAttempts.Count())
require.Zero(t, p.Metrics().ProbePlanFailures.Count())
require.Zero(t, p.Metrics().ReadProbeFailures.Count())
})

t.Run("txn fails", func(t *testing.T) {
m := &mock{
t: t,
@@ -107,6 +133,22 @@ func TestReadProbe(t *testing.T) {
require.Zero(t, p.Metrics().ProbePlanFailures.Count())
require.Equal(t, int64(1), p.Metrics().ReadProbeFailures.Count())
})

// See comment above matching case in TestReadProbe regarding planning.
t.Run("read fails due to decommissioning but not counted as error", func(t *testing.T) {
m := &mock{
t: t,
read: true,
readErr: errors.New("n2 was permanently removed from the cluster at 2009-11-10 23:00:00 +0000 UTC m=+0.000000001; it is not allowed to rejoin the cluster"),
}
p := initTestProber(ctx, m)
p.readProbeImpl(ctx, m, m, m)

require.Equal(t, int64(1), p.Metrics().ProbePlanAttempts.Count())
require.Equal(t, int64(1), p.Metrics().ReadProbeAttempts.Count())
require.Zero(t, p.Metrics().ProbePlanFailures.Count())
require.Zero(t, p.Metrics().ReadProbeFailures.Count())
})
}

func TestWriteProbe(t *testing.T) {
@@ -163,6 +205,22 @@ func TestWriteProbe(t *testing.T) {
require.Zero(t, p.Metrics().WriteProbeFailures.Count())
})

// See comment above matching case in TestReadProbe regarding planning.
t.Run("planning fails due to decommissioning but not counted as error", func(t *testing.T) {
m := &mock{
t: t,
write: true,
planErr: errors.New("n2 was permanently removed from the cluster at 2009-11-10 23:00:00 +0000 UTC m=+0.000000001; it is not allowed to rejoin the cluster"),
}
p := initTestProber(ctx, m)
p.writeProbeImpl(ctx, m, m, m)

require.Equal(t, int64(1), p.Metrics().ProbePlanAttempts.Count())
require.Zero(t, p.Metrics().WriteProbeAttempts.Count())
require.Zero(t, p.Metrics().ProbePlanFailures.Count())
require.Zero(t, p.Metrics().WriteProbeFailures.Count())
})

t.Run("open txn fails", func(t *testing.T) {
m := &mock{
t: t,
@@ -192,6 +250,22 @@ func TestWriteProbe(t *testing.T) {
require.Zero(t, p.Metrics().ProbePlanFailures.Count())
require.Equal(t, int64(1), p.Metrics().WriteProbeFailures.Count())
})

// See comment above matching case in TestReadProbe regarding planning.
t.Run("write fails due to decommissioning but not counted as error", func(t *testing.T) {
m := &mock{
t: t,
write: true,
writeErr: errors.New("n2 was permanently removed from the cluster at 2009-11-10 23:00:00 +0000 UTC m=+0.000000001; it is not allowed to rejoin the cluster"),
}
p := initTestProber(ctx, m)
p.writeProbeImpl(ctx, m, m, m)

require.Equal(t, int64(1), p.Metrics().ProbePlanAttempts.Count())
require.Equal(t, int64(1), p.Metrics().WriteProbeAttempts.Count())
require.Zero(t, p.Metrics().ProbePlanFailures.Count())
require.Zero(t, p.Metrics().WriteProbeFailures.Count())
})
}

func initTestProber(ctx context.Context, m *mock) *Prober {
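Not part of this change, but a direct table-driven test of the helper could complement the mock-based probe tests above. A minimal sketch in the same testing style; the test name is an assumption, and it relies only on the errors and require packages already imported in this file.

```go
// TestErrorIsExpectedDuringNormalOperation checks the helper directly:
// the decommission message is treated as expected, while other errors
// occasionally seen on a decommissioned node's kvclient are not filtered.
func TestErrorIsExpectedDuringNormalOperation(t *testing.T) {
	tests := []struct {
		name     string
		err      error
		expected bool
	}{
		{
			name: "decommissioned node error is expected",
			err: errors.New("n2 was permanently removed from the cluster at " +
				"2009-11-10 23:00:00 +0000 UTC m=+0.000000001; it is not allowed to rejoin the cluster"),
			expected: true,
		},
		{
			name:     "other kvclient errors are not filtered",
			err:      errors.New("use of closed network connection"),
			expected: false,
		},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			require.Equal(t, tc.expected, errorIsExpectedDuringNormalOperation(tc.err))
		})
	}
}
```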