Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,6 @@ type Context struct {

heartbeatInterval time.Duration
heartbeatTimeout time.Duration
HeartbeatCB func()

rpcCompression bool

Expand Down Expand Up @@ -483,12 +482,13 @@ func (c connKey) SafeFormat(p redact.SafePrinter, _ rune) {
// ContextOptions are passed to NewContext to set up a new *Context.
// All pointer fields and TenantID are required.
type ContextOptions struct {
TenantID roachpb.TenantID
Config *base.Config
Clock hlc.WallClock
ToleratedOffset time.Duration
Stopper *stop.Stopper
Settings *cluster.Settings
TenantID roachpb.TenantID
Config *base.Config
Clock hlc.WallClock
ToleratedOffset time.Duration
FatalOnOffsetViolation bool
Stopper *stop.Stopper
Settings *cluster.Settings
// OnIncomingPing is called when handling a PingRequest, after
// preliminary checks but before recording clock offset information.
// It can inject an error or modify the response.
Expand Down Expand Up @@ -2546,10 +2546,9 @@ func (rpcCtx *Context) runHeartbeat(
request.Offset.Offset = remoteTimeNow.Sub(receiveTime).Nanoseconds()
}
rpcCtx.RemoteClocks.UpdateOffset(ctx, conn.remoteNodeID, request.Offset, pingDuration)
}

if cb := rpcCtx.HeartbeatCB; cb != nil {
cb()
if err := rpcCtx.RemoteClocks.VerifyClockOffset(ctx); err != nil && rpcCtx.FatalOnOffsetViolation {
log.Ops.Fatalf(ctx, "%v", err)
}
}

return nil
Expand Down
53 changes: 0 additions & 53 deletions pkg/rpc/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,59 +112,6 @@ func newTestContext(
})
}

func TestHeartbeatCB(t *testing.T) {
defer leaktest.AfterTest(t)()

testutils.RunTrueAndFalse(t, "compression", func(t *testing.T, compression bool) {
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

// Shared cluster ID by all RPC peers (this ensures that the peers
// don't talk to servers from unrelated tests by accident).
clusterID := uuid.MakeV4()

clock := timeutil.NewManualTime(timeutil.Unix(0, 20))
maxOffset := time.Duration(0)
serverCtx := newTestContext(clusterID, clock, maxOffset, stopper)
serverCtx.rpcCompression = compression
const serverNodeID = 1
serverCtx.NodeID.Set(context.Background(), serverNodeID)
s := newTestServer(t, serverCtx)
RegisterHeartbeatServer(s, &HeartbeatService{
clock: clock,
remoteClockMonitor: serverCtx.RemoteClocks,
clusterID: serverCtx.StorageClusterID,
nodeID: serverCtx.NodeID,
version: serverCtx.Settings.Version,
})

ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr)
if err != nil {
t.Fatal(err)
}
remoteAddr := ln.Addr().String()

// Clocks don't matter in this test.
clientCtx := newTestContext(clusterID, clock, maxOffset, stopper)
clientCtx.rpcCompression = compression

var once sync.Once
ch := make(chan struct{})

clientCtx.HeartbeatCB = func() {
once.Do(func() {
close(ch)
})
}

if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(context.Background()); err != nil {
t.Fatal(err)
}

<-ch
})
}

// TestPingInterceptors checks that OnOutgoingPing and OnIncomingPing can inject errors.
func TestPingInterceptors(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand Down
24 changes: 10 additions & 14 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,15 +304,16 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
tenantCapabilitiesTestingKnobs, _ := cfg.TestingKnobs.TenantCapabilitiesTestingKnobs.(*tenantcapabilities.TestingKnobs)
authorizer := tenantcapabilitiesauthorizer.New(cfg.Settings, tenantCapabilitiesTestingKnobs)
rpcCtxOpts := rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
UseNodeAuth: true,
NodeID: cfg.IDContainer,
StorageClusterID: cfg.ClusterIDContainer,
Config: cfg.Config,
Clock: clock.WallClock(),
ToleratedOffset: clock.ToleratedOffset(),
Stopper: stopper,
Settings: cfg.Settings,
TenantID: roachpb.SystemTenantID,
UseNodeAuth: true,
NodeID: cfg.IDContainer,
StorageClusterID: cfg.ClusterIDContainer,
Config: cfg.Config,
Clock: clock.WallClock(),
ToleratedOffset: clock.ToleratedOffset(),
FatalOnOffsetViolation: true,
Stopper: stopper,
Settings: cfg.Settings,
OnOutgoingPing: func(ctx context.Context, req *rpc.PingRequest) error {
// Outgoing ping will block requests with codes.FailedPrecondition to
// notify caller that this replica is decommissioned but others could
Expand Down Expand Up @@ -344,11 +345,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
return rpcContext.VerifyDialback(ctx, req, resp, cfg.Locality)
}

rpcContext.HeartbeatCB = func() {
if err := rpcContext.RemoteClocks.VerifyClockOffset(ctx); err != nil {
log.Ops.Fatalf(ctx, "%v", err)
}
}
registry.AddMetricStruct(rpcContext.Metrics())

// Attempt to load TLS configs right away, failures are permanent.
Expand Down