Skip to content

Commit 57dfbba

Browse files
Merge pull request #5018 from hashicorp/backport/bosorawis-feat-configurable-get-downstream-worker-timeout/ultimately-apparent-anteater
This pull request was automerged via backport-assistant
2 parents 1998225 + 93d68aa commit 57dfbba

File tree

4 files changed

+278
-29
lines changed

4 files changed

+278
-29
lines changed

internal/cmd/config/config.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,12 @@ type Controller struct {
210210
LivenessTimeToStale any `hcl:"liveness_time_to_stale"`
211211
LivenessTimeToStaleDuration time.Duration `hcl:"-"`
212212

213+
// TODO: This isn't documented (on purpose) because the right place for this
214+
// is central configuration so you can't drift across controllers and workers
215+
// but we don't have that yet.
216+
GetDownstreamWorkersTimeout any `hcl:"get_downstream_workers_timeout"`
217+
GetDownstreamWorkersTimeoutDuration time.Duration `hcl:"-"`
218+
213219
// SchedulerRunJobInterval is the time interval between waking up the
214220
// scheduler to run pending jobs.
215221
//
@@ -279,6 +285,13 @@ type Worker struct {
279285
StatusCallTimeout any `hcl:"status_call_timeout"`
280286
StatusCallTimeoutDuration time.Duration `hcl:"-"`
281287

288+
// GetDownstreamWorkersTimeout represents the period of time (as a duration) timeout
289+
// for GetDownstreamWorkers call in DownstreamWorkerTicker
290+
//
291+
// TODO: This is currently not documented and considered internal.
292+
GetDownstreamWorkersTimeout any `hcl:"get_downstream_workers_timeout"`
293+
GetDownstreamWorkersTimeoutDuration time.Duration `hcl:"-"`
294+
282295
// SuccessfulStatusGracePeriod represents the period of time (as a duration)
283296
// that the worker will wait before disconnecting connections if it cannot
284297
// successfully complete a status report to a controller. This cannot be
@@ -648,6 +661,21 @@ func Parse(d string) (*Config, error) {
648661
return nil, errors.New("Controller liveness time to stale value is negative")
649662
}
650663

664+
getDownstreamWorkersTimeout := result.Controller.GetDownstreamWorkersTimeout
665+
if util.IsNil(getDownstreamWorkersTimeout) {
666+
getDownstreamWorkersTimeout = os.Getenv("BOUNDARY_CONTROLLER_GET_DOWNSTREAM_WORKERS_TIMEOUT")
667+
}
668+
if getDownstreamWorkersTimeout != nil {
669+
t, err := parseutil.ParseDurationSecond(getDownstreamWorkersTimeout)
670+
if err != nil {
671+
return result, fmt.Errorf("error trying to parse controller get_downstream_workers_timeout: %w", err)
672+
}
673+
result.Controller.GetDownstreamWorkersTimeoutDuration = t
674+
}
675+
if result.Controller.GetDownstreamWorkersTimeoutDuration < 0 {
676+
return nil, errors.New("get downstream workers timeout must be greater than 0")
677+
}
678+
651679
if result.Controller.MaxPageSizeRaw != nil {
652680
switch t := result.Controller.MaxPageSizeRaw.(type) {
653681
case string:
@@ -785,6 +813,21 @@ func Parse(d string) (*Config, error) {
785813
return nil, errors.New("Status call timeout value is negative")
786814
}
787815

816+
getDownstreamWorkersTimeoutDuration := result.Worker.GetDownstreamWorkersTimeout
817+
if util.IsNil(getDownstreamWorkersTimeoutDuration) {
818+
getDownstreamWorkersTimeoutDuration = os.Getenv("BOUNDARY_WORKER_GET_DOWNSTREAM_WORKERS_TIMEOUT")
819+
}
820+
if getDownstreamWorkersTimeoutDuration != nil {
821+
t, err := parseutil.ParseDurationSecond(getDownstreamWorkersTimeoutDuration)
822+
if err != nil {
823+
return result, fmt.Errorf("error trying to parse worker get_downstream_workers_timeout: %w", err)
824+
}
825+
result.Worker.GetDownstreamWorkersTimeoutDuration = t
826+
}
827+
if result.Worker.GetDownstreamWorkersTimeoutDuration < 0 {
828+
return nil, errors.New("get downstream workers timeout must be greater than 0")
829+
}
830+
788831
successfulStatusGracePeriod := result.Worker.SuccessfulStatusGracePeriod
789832
if util.IsNil(successfulStatusGracePeriod) {
790833
successfulStatusGracePeriod = os.Getenv("BOUNDARY_WORKER_SUCCESSFUL_STATUS_GRACE_PERIOD")

internal/cmd/config/config_test.go

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2832,6 +2832,190 @@ func TestSetupWorkerInitialUpstreams(t *testing.T) {
28322832
}
28332833
}
28342834

2835+
func TestGetDownstreamWorkersTimeout(t *testing.T) {
2836+
tests := []struct {
2837+
name string
2838+
in string
2839+
wantController bool
2840+
wantWorker bool
2841+
wantControllerTimeout time.Duration
2842+
wantWorkerTimeout time.Duration
2843+
assertErr func(*testing.T, error)
2844+
}{
2845+
{
2846+
name: "controller_valid_time_value",
2847+
in: `
2848+
controller {
2849+
name = "example-controller"
2850+
get_downstream_workers_timeout = "10s"
2851+
}`,
2852+
wantControllerTimeout: 10 * time.Second,
2853+
wantWorkerTimeout: 0,
2854+
wantController: true,
2855+
wantWorker: false,
2856+
assertErr: nil,
2857+
},
2858+
{
2859+
name: "worker_valid_time_value",
2860+
in: `
2861+
worker {
2862+
name = "example-worker"
2863+
get_downstream_workers_timeout = "5s"
2864+
}`,
2865+
wantControllerTimeout: 0,
2866+
wantWorkerTimeout: 5 * time.Second,
2867+
wantController: false,
2868+
wantWorker: true,
2869+
assertErr: nil,
2870+
},
2871+
{
2872+
name: "both_valid_time_value",
2873+
in: `
2874+
controller {
2875+
name = "example-controller"
2876+
get_downstream_workers_timeout = "5s"
2877+
}
2878+
worker {
2879+
name = "example-worker"
2880+
get_downstream_workers_timeout = "500ms"
2881+
}`,
2882+
wantControllerTimeout: 5 * time.Second,
2883+
wantWorkerTimeout: 500 * time.Millisecond,
2884+
wantController: true,
2885+
wantWorker: true,
2886+
assertErr: nil,
2887+
},
2888+
{
2889+
name: "both_unspecified_defaults_to_zero",
2890+
in: `
2891+
controller {
2892+
name = "example-controller"
2893+
}
2894+
worker {
2895+
name = "example-worker"
2896+
}`,
2897+
wantController: true,
2898+
wantWorker: true,
2899+
wantControllerTimeout: 0,
2900+
wantWorkerTimeout: 0,
2901+
assertErr: nil,
2902+
},
2903+
{
2904+
name: "controller_int_value_no_unit_assumes_seconds",
2905+
in: `
2906+
controller {
2907+
name = "example-controller"
2908+
get_downstream_workers_timeout = 100
2909+
}`,
2910+
wantController: true,
2911+
wantWorker: false,
2912+
wantControllerTimeout: 100 * time.Second,
2913+
wantWorkerTimeout: 0,
2914+
},
2915+
{
2916+
name: "worker_int_value_no_unit_assumes_seconds",
2917+
in: `
2918+
worker {
2919+
name = "example-worker"
2920+
get_downstream_workers_timeout = 30
2921+
}`,
2922+
wantController: false,
2923+
wantWorker: true,
2924+
wantWorkerTimeout: 30 * time.Second,
2925+
},
2926+
{
2927+
name: "controller_invalid_bool_value",
2928+
in: `
2929+
controller {
2930+
name = "example-controller"
2931+
get_downstream_workers_timeout = true
2932+
}`,
2933+
wantController: true,
2934+
wantWorker: false,
2935+
assertErr: func(t *testing.T, err error) {
2936+
require.Error(t, err)
2937+
require.ErrorContains(t, err, `error trying to parse controller get_downstream_workers_timeout`)
2938+
},
2939+
},
2940+
{
2941+
name: "worker_invalid_bool_value",
2942+
in: `
2943+
worker {
2944+
name = "example-worker"
2945+
get_downstream_workers_timeout = false
2946+
}`,
2947+
wantController: false,
2948+
wantWorker: true,
2949+
assertErr: func(t *testing.T, err error) {
2950+
require.Error(t, err)
2951+
require.ErrorContains(t, err, `error trying to parse worker get_downstream_workers_timeout`)
2952+
},
2953+
},
2954+
{
2955+
name: "controller_invalid_empty_value",
2956+
in: `
2957+
controller {
2958+
name = "example-controller"
2959+
get_downstream_workers_timeout = ""
2960+
}`,
2961+
wantController: true,
2962+
wantWorker: false,
2963+
wantControllerTimeout: 0,
2964+
},
2965+
{
2966+
name: "worker_invalid_empty_value",
2967+
in: `
2968+
worker {
2969+
name = "example-worker"
2970+
get_downstream_workers_timeout = ""
2971+
}`,
2972+
wantController: false,
2973+
wantWorker: true,
2974+
wantWorkerTimeout: 0,
2975+
},
2976+
{
2977+
name: "controller_invalid_zero_value",
2978+
in: `
2979+
controller {
2980+
name = "example-controller"
2981+
get_downstream_workers_timeout = "0s"
2982+
}`,
2983+
wantController: true,
2984+
wantWorker: false,
2985+
wantControllerTimeout: 0,
2986+
},
2987+
{
2988+
name: "worker_invalid_zero_value",
2989+
in: `
2990+
worker {
2991+
name = "example-worker"
2992+
get_downstream_workers_timeout = "0s"
2993+
}`,
2994+
wantController: false,
2995+
wantWorker: true,
2996+
wantWorkerTimeout: 0,
2997+
},
2998+
}
2999+
for _, tt := range tests {
3000+
t.Run(tt.name, func(t *testing.T) {
3001+
c, err := Parse(tt.in)
3002+
if tt.assertErr != nil {
3003+
tt.assertErr(t, err)
3004+
return
3005+
}
3006+
require.NoError(t, err)
3007+
if tt.wantController {
3008+
require.NotNil(t, c.Controller)
3009+
require.Equal(t, tt.wantControllerTimeout, c.Controller.GetDownstreamWorkersTimeoutDuration)
3010+
}
3011+
if tt.wantWorker {
3012+
require.NotNil(t, c.Worker)
3013+
require.Equal(t, tt.wantWorkerTimeout, c.Worker.GetDownstreamWorkersTimeoutDuration)
3014+
}
3015+
})
3016+
}
3017+
}
3018+
28353019
func TestMaxPageSize(t *testing.T) {
28363020
tests := []struct {
28373021
name string

internal/daemon/controller/controller.go

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"strings"
1111
"sync"
1212
"sync/atomic"
13+
"time"
1314

1415
"github.com/hashicorp/boundary/internal/alias"
1516
talias "github.com/hashicorp/boundary/internal/alias/target"
@@ -91,7 +92,7 @@ var (
9192
downstreamReceiverFactory func() downstreamReceiver
9293

9394
downstreamersFactory func(context.Context, string, string) (common.Downstreamers, error)
94-
downstreamWorkersTickerFactory func(context.Context, string, string, common.Downstreamers, downstreamReceiver) (downstreamWorkersTicker, error)
95+
downstreamWorkersTickerFactory func(context.Context, string, string, common.Downstreamers, downstreamReceiver, time.Duration) (downstreamWorkersTicker, error)
9596
commandClientFactory func(context.Context, *Controller) error
9697
extControllerFactory func(ctx context.Context, c *Controller, r db.Reader, w db.Writer, kms *kms.Kms) (intglobals.ControllerExtension, error)
9798
)
@@ -121,8 +122,9 @@ type Controller struct {
121122

122123
// Timing variables. These are atomics for SIGHUP support, and are int64
123124
// because they are casted to time.Duration.
124-
workerStatusGracePeriod *atomic.Int64
125-
livenessTimeToStale *atomic.Int64
125+
workerStatusGracePeriod *atomic.Int64
126+
livenessTimeToStale *atomic.Int64
127+
getDownstreamWorkersTimeout *atomic.Pointer[time.Duration]
126128

127129
apiGrpcServer *grpc.Server
128130
apiGrpcServerListener grpcServerListener
@@ -176,18 +178,19 @@ func New(ctx context.Context, conf *Config) (*Controller, error) {
176178
metric.InitializeApiCollectors(conf.PrometheusRegisterer)
177179
ratelimit.InitializeMetrics(conf.PrometheusRegisterer)
178180
c := &Controller{
179-
conf: conf,
180-
logger: conf.Logger.Named("controller"),
181-
started: ua.NewBool(false),
182-
tickerWg: new(sync.WaitGroup),
183-
schedulerWg: new(sync.WaitGroup),
184-
workerAuthCache: new(sync.Map),
185-
workerStatusUpdateTimes: new(sync.Map),
186-
enabledPlugins: conf.Server.EnabledPlugins,
187-
apiListeners: make([]*base.ServerListener, 0),
188-
downstreamConnManager: cluster.NewDownstreamManager(),
189-
workerStatusGracePeriod: new(atomic.Int64),
190-
livenessTimeToStale: new(atomic.Int64),
181+
conf: conf,
182+
logger: conf.Logger.Named("controller"),
183+
started: ua.NewBool(false),
184+
tickerWg: new(sync.WaitGroup),
185+
schedulerWg: new(sync.WaitGroup),
186+
workerAuthCache: new(sync.Map),
187+
workerStatusUpdateTimes: new(sync.Map),
188+
enabledPlugins: conf.Server.EnabledPlugins,
189+
apiListeners: make([]*base.ServerListener, 0),
190+
downstreamConnManager: cluster.NewDownstreamManager(),
191+
workerStatusGracePeriod: new(atomic.Int64),
192+
livenessTimeToStale: new(atomic.Int64),
193+
getDownstreamWorkersTimeout: new(atomic.Pointer[time.Duration]),
191194
}
192195

193196
if downstreamReceiverFactory != nil {
@@ -238,6 +241,15 @@ func New(ctx context.Context, conf *Config) (*Controller, error) {
238241
c.livenessTimeToStale.Store(int64(conf.RawConfig.Controller.LivenessTimeToStaleDuration))
239242
}
240243

244+
switch conf.RawConfig.Controller.GetDownstreamWorkersTimeoutDuration {
245+
case 0:
246+
to := server.DefaultLiveness
247+
c.getDownstreamWorkersTimeout.Store(&to)
248+
default:
249+
to := conf.RawConfig.Controller.GetDownstreamWorkersTimeoutDuration
250+
c.getDownstreamWorkersTimeout.Store(&to)
251+
}
252+
241253
clusterListeners := make([]*base.ServerListener, 0)
242254
for i := range conf.Listeners {
243255
l := conf.Listeners[i]
@@ -579,7 +591,7 @@ func (c *Controller) Start() error {
579591
// we'll use "root" to designate that this is the root of the graph (aka
580592
// a controller)
581593
boundVer := version.Get().VersionNumber()
582-
dswTicker, err := downstreamWorkersTickerFactory(c.baseContext, "root", boundVer, c.downstreamWorkers, c.downstreamConns)
594+
dswTicker, err := downstreamWorkersTickerFactory(c.baseContext, "root", boundVer, c.downstreamWorkers, c.downstreamConns, *c.getDownstreamWorkersTimeout.Load())
583595
if err != nil {
584596
return fmt.Errorf("error creating downstream workers ticker: %w", err)
585597
}

0 commit comments

Comments
 (0)