Skip to content

Commit a8568d2

Browse files
craig[bot]knz
andcommitted
Merge #84031
84031: rpc: fix the maxoffset check on the incoming path r=erikgrinaker a=knz Fixes #84017. Fixes #84027. ### Bug fix 1 Back in PR #34197 we mistakenly removed the .Offset field sent by each RPC heartbeat, through which the remote node monitors the current/client node's offset. This looks bad, but is actually somewhat innocuous. This is because we update the offset map and do the check in two situations: - when a server node gets pinged by a remote node using the remote-provided .Offset, - when receiving a ping response as client, using an estimate of the roundtrip latency between PingRequest-PingResponse. The bug above only invalidates the first check. The second check still occurs. Since all nodes are both client to another node, and server for the same other node, we still get a check both ways on all pairs of nodes. Nonetheless, the half-way broken check reduces robustness overall. So it's good to fix it. ### Bug fix 2 Release note (bug fix): CLI `cockroach` commands connecting to a remote server will not produce spurious "latency jump" warnings any more. This bug had been introduced in CockroachDB v21.2. Co-authored-by: Raphael 'kena' Poss <[email protected]>
2 parents 262dabe + 50c1196 commit a8568d2

File tree

3 files changed

+141
-30
lines changed

3 files changed

+141
-30
lines changed

pkg/rpc/context.go

Lines changed: 47 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -577,13 +577,19 @@ func NewContext(ctx context.Context, opts ContextOptions) *Context {
577577
breakerClock: breakerClock{
578578
clock: opts.Clock,
579579
},
580-
RemoteClocks: newRemoteClockMonitor(
581-
opts.Clock, opts.MaxOffset, 10*opts.Config.RPCHeartbeatInterval, opts.Config.HistogramWindowInterval()),
582580
rpcCompression: enableRPCCompression,
583581
MasterCtx: masterCtx,
584582
metrics: makeMetrics(),
585583
heartbeatTimeout: 2 * opts.Config.RPCHeartbeatInterval,
586584
}
585+
586+
// We only monitor remote clocks in server-to-server connections.
587+
// CLI commands are exempted.
588+
if !opts.ClientOnly {
589+
rpcCtx.RemoteClocks = newRemoteClockMonitor(
590+
opts.Clock, opts.MaxOffset, 10*opts.Config.RPCHeartbeatInterval, opts.Config.HistogramWindowInterval())
591+
}
592+
587593
if id := opts.Knobs.StorageClusterID; id != nil {
588594
rpcCtx.StorageClusterID.Set(masterCtx, *id)
589595
}
@@ -1666,6 +1672,16 @@ func (rpcCtx *Context) runHeartbeat(
16661672
maxOffset := rpcCtx.MaxOffset
16671673
maxOffsetNanos := maxOffset.Nanoseconds()
16681674

1675+
// The request object. Note that we keep the same object from
1676+
// heartbeat to heartbeat: we compute a new .Offset at the end of
1677+
// the current heartbeat as input to the next one.
1678+
request := &PingRequest{
1679+
OriginAddr: rpcCtx.Config.Addr,
1680+
OriginMaxOffsetNanos: maxOffsetNanos,
1681+
TargetNodeID: conn.remoteNodeID,
1682+
ServerVersion: rpcCtx.Settings.Version.BinaryVersion(),
1683+
}
1684+
16691685
heartbeatClient := NewHeartbeatClient(conn.grpcConn)
16701686

16711687
var heartbeatTimer timeutil.Timer
@@ -1692,16 +1708,10 @@ func (rpcCtx *Context) runHeartbeat(
16921708
}
16931709

16941710
if err := rpcCtx.Stopper.RunTaskWithErr(ctx, "rpc heartbeat", func(ctx context.Context) error {
1695-
// We re-mint the PingRequest to pick up any asynchronous update to clusterID.
1711+
// Pick up any asynchronous update to clusterID and NodeID.
16961712
clusterID := rpcCtx.StorageClusterID.Get()
1697-
request := &PingRequest{
1698-
OriginNodeID: rpcCtx.NodeID.Get(),
1699-
OriginAddr: rpcCtx.Config.Addr,
1700-
OriginMaxOffsetNanos: maxOffsetNanos,
1701-
ClusterID: &clusterID,
1702-
TargetNodeID: conn.remoteNodeID,
1703-
ServerVersion: rpcCtx.Settings.Version.BinaryVersion(),
1704-
}
1713+
request.ClusterID = &clusterID
1714+
request.OriginNodeID = rpcCtx.NodeID.Get()
17051715

17061716
interceptor := func(context.Context, *PingRequest) error { return nil }
17071717
if fn := rpcCtx.OnOutgoingPing; fn != nil {
@@ -1760,25 +1770,33 @@ func (rpcCtx *Context) runHeartbeat(
17601770

17611771
if err == nil {
17621772
everSucceeded = true
1763-
receiveTime := rpcCtx.Clock.Now()
1764-
1765-
// Only update the clock offset measurement if we actually got a
1766-
// successful response from the server.
1767-
pingDuration := receiveTime.Sub(sendTime)
1768-
if pingDuration > maximumPingDurationMult*rpcCtx.MaxOffset {
1769-
request.Offset.Reset()
1770-
} else {
1771-
// Offset and error are measured using the remote clock reading
1772-
// technique described in
1773-
// http://se.inf.tu-dresden.de/pubs/papers/SRDS1994.pdf, page 6.
1774-
// However, we assume that drift and min message delay are 0, for
1775-
// now.
1776-
request.Offset.MeasuredAt = receiveTime.UnixNano()
1777-
request.Offset.Uncertainty = (pingDuration / 2).Nanoseconds()
1778-
remoteTimeNow := timeutil.Unix(0, response.ServerTime).Add(pingDuration / 2)
1779-
request.Offset.Offset = remoteTimeNow.Sub(receiveTime).Nanoseconds()
1773+
1774+
// Only a server connecting to another server needs to check
1775+
// clock offsets. A CLI command does not need to update its
1776+
// local HLC, nor does it care that strictly about
1777+
// client-server latency, nor does it need to track the
1778+
// offsets.
1779+
if rpcCtx.RemoteClocks != nil {
1780+
receiveTime := rpcCtx.Clock.Now()
1781+
1782+
// Only update the clock offset measurement if we actually got a
1783+
// successful response from the server.
1784+
pingDuration := receiveTime.Sub(sendTime)
1785+
if pingDuration > maximumPingDurationMult*rpcCtx.MaxOffset {
1786+
request.Offset.Reset()
1787+
} else {
1788+
// Offset and error are measured using the remote clock reading
1789+
// technique described in
1790+
// http://se.inf.tu-dresden.de/pubs/papers/SRDS1994.pdf, page 6.
1791+
// However, we assume that drift and min message delay are 0, for
1792+
// now.
1793+
request.Offset.MeasuredAt = receiveTime.UnixNano()
1794+
request.Offset.Uncertainty = (pingDuration / 2).Nanoseconds()
1795+
remoteTimeNow := timeutil.Unix(0, response.ServerTime).Add(pingDuration / 2)
1796+
request.Offset.Offset = remoteTimeNow.Sub(receiveTime).Nanoseconds()
1797+
}
1798+
rpcCtx.RemoteClocks.UpdateOffset(ctx, target, request.Offset, pingDuration)
17801799
}
1781-
rpcCtx.RemoteClocks.UpdateOffset(ctx, target, request.Offset, pingDuration)
17821800

17831801
if cb := rpcCtx.HeartbeatCB; cb != nil {
17841802
cb()

pkg/rpc/context_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,96 @@ func TestPingInterceptors(t *testing.T) {
226226
}
227227
}
228228

229+
// TestClockOffsetInPingRequest ensures that all ping requests
230+
// after the first one have a non-zero offset.
231+
// (Regression test for issue #84027.)
232+
func TestClockOffsetInPingRequest(t *testing.T) {
233+
defer leaktest.AfterTest(t)()
234+
testClockOffsetInPingRequestInternal(t, false /* clientOnly */)
235+
}
236+
237+
// TestClockOffsetInClientPingRequest ensures that all ping requests
238+
// have a zero offset and there is no RemoteClocks to update.
239+
// (Regression test for issue #84017.)
240+
func TestClockOffsetInClientPingRequest(t *testing.T) {
241+
defer leaktest.AfterTest(t)()
242+
testClockOffsetInPingRequestInternal(t, true /* clientOnly */)
243+
}
244+
245+
func testClockOffsetInPingRequestInternal(t *testing.T, clientOnly bool) {
246+
ctx := context.Background()
247+
stopper := stop.NewStopper()
248+
defer stopper.Stop(ctx)
249+
250+
pings := make(chan PingRequest, 5)
251+
done := make(chan struct{})
252+
defer func() { close(done) }()
253+
254+
// Build a minimal server.
255+
opts := ContextOptions{
256+
TenantID: roachpb.SystemTenantID,
257+
Config: testutils.NewNodeTestBaseContext(),
258+
Clock: &timeutil.DefaultTimeSource{},
259+
MaxOffset: 500 * time.Millisecond,
260+
Stopper: stopper,
261+
Settings: cluster.MakeTestingClusterSettings(),
262+
}
263+
rpcCtxServer := NewContext(ctx, opts)
264+
265+
clientOpts := opts
266+
clientOpts.Config = testutils.NewNodeTestBaseContext()
267+
// Experimentally, values below 50ms seem to incur flakiness.
268+
clientOpts.Config.RPCHeartbeatInterval = 100 * time.Millisecond
269+
clientOpts.ClientOnly = clientOnly
270+
clientOpts.OnOutgoingPing = func(ctx context.Context, req *PingRequest) error {
271+
select {
272+
case <-done:
273+
case pings <- *req:
274+
}
275+
return nil
276+
}
277+
rpcCtxClient := NewContext(ctx, clientOpts)
278+
279+
require.NotNil(t, rpcCtxServer.RemoteClocks)
280+
if !clientOnly {
281+
require.NotNil(t, rpcCtxClient.RemoteClocks)
282+
} else {
283+
require.Nil(t, rpcCtxClient.RemoteClocks)
284+
}
285+
286+
t.Logf("server listen")
287+
s := newTestServer(t, rpcCtxServer)
288+
RegisterHeartbeatServer(s, rpcCtxServer.NewHeartbeatService())
289+
rpcCtxServer.NodeID.Set(ctx, 1)
290+
ln, err := netutil.ListenAndServeGRPC(stopper, s, util.TestAddr)
291+
require.NoError(t, err)
292+
293+
t.Logf("client dial")
294+
// Dial: this causes the heartbeats to start.
295+
remoteAddr := ln.Addr().String()
296+
_, err = rpcCtxClient.GRPCDialNode(remoteAddr, 1, SystemClass).Connect(ctx)
297+
require.NoError(t, err)
298+
299+
t.Logf("first ping check")
300+
firstPing := <-pings
301+
require.Zero(t, firstPing.Offset.Offset)
302+
require.Zero(t, firstPing.Offset.Uncertainty)
303+
require.Zero(t, firstPing.Offset.MeasuredAt)
304+
for i := 1; i < 3; i++ {
305+
t.Logf("ping %d check", i)
306+
nextPing := <-pings
307+
if !clientOnly {
308+
require.NotZero(t, nextPing.Offset.Offset, i)
309+
require.NotZero(t, nextPing.Offset.Uncertainty, i)
310+
require.NotZero(t, nextPing.Offset.MeasuredAt, i)
311+
} else {
312+
require.Zero(t, nextPing.Offset.Offset, i)
313+
require.Zero(t, nextPing.Offset.Uncertainty, i)
314+
require.Zero(t, nextPing.Offset.MeasuredAt, i)
315+
}
316+
}
317+
}
318+
229319
var _ roachpb.InternalServer = &internalServer{}
230320

231321
type internalServer struct {

pkg/rpc/nodedialer/nodedialer.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,10 @@ func (n *Dialer) getBreaker(nodeID roachpb.NodeID, class rpc.ConnectionClass) *w
294294
// samples to compute a reliable average.
295295
func (n *Dialer) Latency(nodeID roachpb.NodeID) (time.Duration, error) {
296296
if n == nil || n.resolver == nil {
297-
return 0, errors.New("no node dialer configured")
297+
return 0, errors.AssertionFailedf("no node dialer configured")
298+
}
299+
if n.rpcContext.RemoteClocks == nil {
300+
return 0, errors.AssertionFailedf("can't call Latency in a client command")
298301
}
299302
addr, err := n.resolver(nodeID)
300303
if err != nil {

0 commit comments

Comments
 (0)