Skip to content

Commit b585e91

Browse files
Change FF to explicitly disable acks
1 parent fe7ba4e commit b585e91

File tree

5 files changed

+65
-26
lines changed

5 files changed

+65
-26
lines changed

changelog/fragments/1757710842-Add-agent_policy_id-and-policy_revision_idx-to-checkin-requests.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ summary: Add agent_policy_id and policy_revision_idx to checkin requests
1919
description: |
2020
Add agent_policy_id and policy_revision_idx attributes to checkin requests.
2121
These attributes are used to inform fleet-server of the policy id and revision that the agent is currently running.
22-
Agents that use these policies no longer need to send acks for POLICY_CHANGE actions.
22+
Add a feature flag to disable sending acks for POLICY_CHANGE actions on a future release.
2323
2424
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
2525
component: elastic-agent
@@ -28,7 +28,7 @@ component: elastic-agent
2828
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
2929
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
3030
# Please provide it if you are adding a fragment for a different PR.
31-
#pr: https://github.com/owner/repo/1234
31+
pr: https://github.com/elastic/elastic-agent/pull/9931
3232

3333
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
3434
# If not present is automatically filled by the tooling with the issue linked to the PR number.

internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ type PolicyChangeHandler struct {
4242
setters []actions.ClientSetter
4343
policyLogLevelSetter logLevelSetter
4444
coordinator *coordinator.Coordinator
45-
forceAckFn func() bool
45+
disableAckFn func() bool
4646
// Disabled for 8.8.0 release in order to limit the surface
4747
// https://github.com/elastic/security-team/issues/6501
4848
// // Last known valid signature validation key
@@ -69,7 +69,7 @@ func NewPolicyChangeHandler(
6969
setters: setters,
7070
coordinator: coordinator,
7171
policyLogLevelSetter: policyLogLevelSetter,
72-
forceAckFn: features.ForcePolicyChangeAcks,
72+
disableAckFn: features.DisablePolicyChangeAcks,
7373
}
7474
}
7575

@@ -114,7 +114,7 @@ func (h *PolicyChangeHandler) Handle(ctx context.Context, a fleetapi.Action, ack
114114
return err
115115
}
116116

117-
h.ch <- newPolicyChange(ctx, c, a, acker, false, h.forceAckFn())
117+
h.ch <- newPolicyChange(ctx, c, a, acker, false, h.disableAckFn())
118118
return nil
119119
}
120120

@@ -477,7 +477,7 @@ type policyChange struct {
477477
action fleetapi.Action
478478
acker acker.Acker
479479
ackWatcher chan struct{}
480-
forceAck bool
480+
disableAck bool
481481
}
482482

483483
func newPolicyChange(
@@ -486,7 +486,7 @@ func newPolicyChange(
486486
action fleetapi.Action,
487487
acker acker.Acker,
488488
makeCh bool,
489-
forceAck bool) *policyChange {
489+
disableAck bool) *policyChange {
490490
var ackWatcher chan struct{}
491491
if makeCh {
492492
// we don't need it otherwise
@@ -498,7 +498,7 @@ func newPolicyChange(
498498
action: action,
499499
acker: acker,
500500
ackWatcher: ackWatcher,
501-
forceAck: forceAck,
501+
disableAck: disableAck,
502502
}
503503
}
504504

@@ -507,9 +507,9 @@ func (l *policyChange) Config() *config.Config {
507507
}
508508

509509
// Ack sends an ack for the associated action if the results are expected.
510-
// An ack will not be sent for a POLICY_CHANGE action, but will be when this method is used by UNENROLL actions.
510+
// An ack will be sent for UNENROLL actions, or by POLICY_CHANGE actions if it has not been explicitly disabled.
511511
func (l *policyChange) Ack() error {
512-
if !l.forceAck || l.action == nil {
512+
if l.disableAck || l.action == nil {
513513
return nil
514514
}
515515
err := l.acker.Ack(l.ctx, l.action)

internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func TestPolicyAcked(t *testing.T) {
105105
agentInfo := &info.AgentInfo{}
106106
nullStore := &storage.NullStore{}
107107

108-
t.Run("Config change shouldn't ACK", func(t *testing.T) {
108+
t.Run("Default: Config changes are ACKed", func(t *testing.T) {
109109
ch := make(chan coordinator.ConfigChange, 1)
110110
tacker := &testAcker{}
111111

@@ -119,6 +119,7 @@ func TestPolicyAcked(t *testing.T) {
119119
},
120120
}
121121

122+
// Test default FF value
122123
cfg := configuration.DefaultConfiguration()
123124
handler := NewPolicyChangeHandler(log, agentInfo, cfg, nullStore, ch, nilLogLevelSet(t), &coordinator.Coordinator{})
124125

@@ -129,7 +130,8 @@ func TestPolicyAcked(t *testing.T) {
129130
require.NoError(t, change.Ack())
130131

131132
actions := tacker.Items()
132-
assert.Empty(t, actions)
133+
assert.Len(t, actions, 1)
134+
assert.Equal(t, actionID, actions[0])
133135
})
134136
t.Run("Config change acks when forced", func(t *testing.T) {
135137
ch := make(chan coordinator.ConfigChange, 1)
@@ -147,7 +149,7 @@ func TestPolicyAcked(t *testing.T) {
147149

148150
cfg := configuration.DefaultConfiguration()
149151
handler := NewPolicyChangeHandler(log, agentInfo, cfg, nullStore, ch, nilLogLevelSet(t), &coordinator.Coordinator{})
150-
handler.forceAckFn = func() bool { return true }
152+
handler.disableAckFn = func() bool { return false }
151153

152154
err := handler.Handle(context.Background(), action, tacker)
153155
require.NoError(t, err)
@@ -159,6 +161,33 @@ func TestPolicyAcked(t *testing.T) {
159161
assert.Len(t, actions, 1)
160162
assert.Equal(t, actionID, actions[0])
161163
})
164+
t.Run("Config change do not ack when disabled", func(t *testing.T) {
165+
ch := make(chan coordinator.ConfigChange, 1)
166+
tacker := &testAcker{}
167+
168+
config := map[string]interface{}{"hello": "world"}
169+
actionID := "abc123"
170+
action := &fleetapi.ActionPolicyChange{
171+
ActionID: actionID,
172+
ActionType: "POLICY_CHANGE",
173+
Data: fleetapi.ActionPolicyChangeData{
174+
Policy: config,
175+
},
176+
}
177+
178+
cfg := configuration.DefaultConfiguration()
179+
handler := NewPolicyChangeHandler(log, agentInfo, cfg, nullStore, ch, nilLogLevelSet(t), &coordinator.Coordinator{})
180+
handler.disableAckFn = func() bool { return true }
181+
182+
err := handler.Handle(context.Background(), action, tacker)
183+
require.NoError(t, err)
184+
185+
change := <-ch
186+
require.NoError(t, change.Ack())
187+
188+
actions := tacker.Items()
189+
assert.Empty(t, actions)
190+
})
162191
}
163192

164193
func TestPolicyChangeHandler_handlePolicyChange_FleetClientSettings(t *testing.T) {

internal/pkg/agent/application/actions/handlers/handler_action_unenroll.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (h *Unenroll) handle(ctx context.Context, a fleetapi.Action, acker acker.Ac
9393
}
9494

9595
// Generate empty policy change, this removing all the running components
96-
unenrollPolicy := newPolicyChange(ctx, config.New(), a, acker, true, true)
96+
unenrollPolicy := newPolicyChange(ctx, config.New(), a, acker, true, false)
9797
h.ch <- unenrollPolicy
9898

9999
// backup action for future start to avoid starting fleet gateway loop

pkg/features/features.go

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ import (
2121
// 8.11+ - default is enabled
2222
const defaultTamperProtection = true
2323

24+
// The default value of the disable policy change acks flag if the flag is missing.
25+
// 9.2 - disabled (acks are sent)
26+
const defaultDisablePolicyChangeAcks = false
27+
2428
var (
2529
current = Flags{
2630
tamperProtection: defaultTamperProtection,
@@ -36,8 +40,8 @@ type Flags struct {
3640
fqdn bool
3741
fqdnCallbacks map[string]BoolValueOnChangeCallback
3842

39-
tamperProtection bool
40-
forcePolicyChangeAcks bool
43+
tamperProtection bool
44+
disablePolicyChangeAcks bool
4145
}
4246

4347
type cfg struct {
@@ -49,9 +53,9 @@ type cfg struct {
4953
TamperProtection *struct {
5054
Enabled bool `json:"enabled" yaml:"enabled" config:"enabled"`
5155
} `json:"tamper_protection,omitempty" yaml:"tamper_protection,omitempty" config:"tamper_protection,omitempty"`
52-
ForcePolicyChangeAcks struct {
56+
DisablePolicyChangeAcks struct {
5357
Enabled bool `json:"enabled" yaml:"enabled" config:"enabled"`
54-
} `json:"force_policy_change_acks" toml: "force_policy_change_acks" config:"force_policy_change_acks"`
58+
} `json:"disable_policy_change_acks" toml: "disable_policy_change_acks" config:"disable_policy_change_acks"`
5559
} `json:"features" yaml:"features" config:"features"`
5660
} `json:"agent" yaml:"agent" config:"agent"`
5761
}
@@ -70,11 +74,11 @@ func (f *Flags) TamperProtection() bool {
7074
return f.tamperProtection
7175
}
7276

73-
func (f *Flags) ForcePolicyChangeAcks() bool {
77+
func (f *Flags) DisablePolicyChangeAcks() bool {
7478
f.mu.RLock()
7579
defer f.mu.RUnlock()
7680

77-
return f.forcePolicyChangeAcks
81+
return f.disablePolicyChangeAcks
7882
}
7983

8084
func (f *Flags) AsProto() *proto.Features {
@@ -132,11 +136,11 @@ func (f *Flags) setTamperProtection(newValue bool) {
132136
f.tamperProtection = newValue
133137
}
134138

135-
func (f *Flags) setForcePolicyChangeAcks(newValue bool) {
139+
func (f *Flags) setDisablePolicyChangeAcks(newValue bool) {
136140
f.mu.Lock()
137141
defer f.mu.Unlock()
138142

139-
f.forcePolicyChangeAcks = newValue
143+
f.disablePolicyChangeAcks = newValue
140144
}
141145

142146
// setSource sets the source from he given cfg.
@@ -204,6 +208,12 @@ func Parse(policy any) (*Flags, error) {
204208
flags.setTamperProtection(defaultTamperProtection)
205209
}
206210

211+
if parsedFlags.Agent.Features.DisablePolicyChangeAcks != nil {
212+
flags.setDisablePolicyChangeAcks(parsedFlags.Agent.Features.DisablePolicyChangeAcks.Enabled)
213+
} else {
214+
flags.setDisablePolicyChangeAcks(defaultDisablePolicyChangeAcks)
215+
}
216+
207217
if err := flags.setSource(parsedFlags); err != nil {
208218
return nil, fmt.Errorf("error creating feature flags source: %w", err)
209219
}
@@ -226,7 +236,7 @@ func Apply(c *config.Config) error {
226236

227237
current.setFQDN(parsed.FQDN())
228238
current.setTamperProtection(parsed.TamperProtection())
229-
current.setForcePolicyChangeAcks(parsed.ForcePolicyChangeAcks())
239+
current.setDisablePolicyChangeAcks(parsed.DisablePolicyChangeAcks())
230240
return err
231241
}
232242

@@ -240,7 +250,7 @@ func TamperProtection() bool {
240250
return current.TamperProtection()
241251
}
242252

243-
// ForcePolicyChangeAcks reports if the agent should force sending an ACK for POLICY_CHANGE actions.
244-
func ForcePolicyChangeAcks() bool {
245-
return current.ForcePolicyChangeAcks()
253+
// DisablePolicyChangeAcks reports if the agent will stop using ACKs for POLICY_CHANGE actions.
254+
func DisablePolicyChangeAcks() bool {
255+
return current.DisablePolicyChangeAcks()
246256
}

0 commit comments

Comments
 (0)