Skip to content

Commit fe7ba4e

Browse files
Add ForcePolicyChangeAcks feature flag
1 parent 9535385 commit fe7ba4e

File tree

4 files changed

+64
-6
lines changed

4 files changed

+64
-6
lines changed

internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
3030
"github.com/elastic/elastic-agent/internal/pkg/remote"
3131
"github.com/elastic/elastic-agent/pkg/core/logger"
32+
"github.com/elastic/elastic-agent/pkg/features"
3233
)
3334

3435
// PolicyChangeHandler is a handler for POLICY_CHANGE action.
@@ -41,6 +42,7 @@ type PolicyChangeHandler struct {
4142
setters []actions.ClientSetter
4243
policyLogLevelSetter logLevelSetter
4344
coordinator *coordinator.Coordinator
45+
forceAckFn func() bool
4446
// Disabled for 8.8.0 release in order to limit the surface
4547
// https://github.com/elastic/security-team/issues/6501
4648
// // Last known valid signature validation key
@@ -67,6 +69,7 @@ func NewPolicyChangeHandler(
6769
setters: setters,
6870
coordinator: coordinator,
6971
policyLogLevelSetter: policyLogLevelSetter,
72+
forceAckFn: features.ForcePolicyChangeAcks,
7073
}
7174
}
7275

@@ -111,7 +114,7 @@ func (h *PolicyChangeHandler) Handle(ctx context.Context, a fleetapi.Action, ack
111114
return err
112115
}
113116

114-
h.ch <- newPolicyChange(ctx, c, a, acker, false)
117+
h.ch <- newPolicyChange(ctx, c, a, acker, false, h.forceAckFn())
115118
return nil
116119
}
117120

@@ -474,14 +477,16 @@ type policyChange struct {
474477
action fleetapi.Action
475478
acker acker.Acker
476479
ackWatcher chan struct{}
480+
forceAck bool
477481
}
478482

479483
func newPolicyChange(
480484
ctx context.Context,
481485
config *config.Config,
482486
action fleetapi.Action,
483487
acker acker.Acker,
484-
makeCh bool) *policyChange {
488+
makeCh bool,
489+
forceAck bool) *policyChange {
485490
var ackWatcher chan struct{}
486491
if makeCh {
487492
// we don't need it otherwise
@@ -493,6 +498,7 @@ func newPolicyChange(
493498
action: action,
494499
acker: acker,
495500
ackWatcher: ackWatcher,
501+
forceAck: forceAck,
496502
}
497503
}
498504

@@ -503,15 +509,15 @@ func (l *policyChange) Config() *config.Config {
503509
// Ack sends an ack for the associated action if the results are expected.
504510
// An ack will not be sent for a POLICY_CHANGE action, but will be when this method is used by UNENROLL actions.
505511
func (l *policyChange) Ack() error {
506-
if l.action == nil || l.ackWatcher == nil {
512+
if !l.forceAck || l.action == nil {
507513
return nil
508514
}
509515
err := l.acker.Ack(l.ctx, l.action)
510516
if err != nil {
511517
return err
512518
}
513519
err = l.acker.Commit(l.ctx)
514-
if err == nil {
520+
if err == nil && l.ackWatcher != nil {
515521
close(l.ackWatcher)
516522
}
517523
return err

internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,34 @@ func TestPolicyAcked(t *testing.T) {
131131
actions := tacker.Items()
132132
assert.Empty(t, actions)
133133
})
134+
t.Run("Config change acks when forced", func(t *testing.T) {
135+
ch := make(chan coordinator.ConfigChange, 1)
136+
tacker := &testAcker{}
137+
138+
config := map[string]interface{}{"hello": "world"}
139+
actionID := "abc123"
140+
action := &fleetapi.ActionPolicyChange{
141+
ActionID: actionID,
142+
ActionType: "POLICY_CHANGE",
143+
Data: fleetapi.ActionPolicyChangeData{
144+
Policy: config,
145+
},
146+
}
147+
148+
cfg := configuration.DefaultConfiguration()
149+
handler := NewPolicyChangeHandler(log, agentInfo, cfg, nullStore, ch, nilLogLevelSet(t), &coordinator.Coordinator{})
150+
handler.forceAckFn = func() bool { return true }
151+
152+
err := handler.Handle(context.Background(), action, tacker)
153+
require.NoError(t, err)
154+
155+
change := <-ch
156+
require.NoError(t, change.Ack())
157+
158+
actions := tacker.Items()
159+
assert.Len(t, actions, 1)
160+
assert.Equal(t, actionID, actions[0])
161+
})
134162
}
135163

136164
func TestPolicyChangeHandler_handlePolicyChange_FleetClientSettings(t *testing.T) {

internal/pkg/agent/application/actions/handlers/handler_action_unenroll.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (h *Unenroll) handle(ctx context.Context, a fleetapi.Action, acker acker.Ac
9393
}
9494

9595
// Generate empty policy change, this removing all the running components
96-
unenrollPolicy := newPolicyChange(ctx, config.New(), a, acker, true)
96+
unenrollPolicy := newPolicyChange(ctx, config.New(), a, acker, true, true)
9797
h.ch <- unenrollPolicy
9898

9999
// backup action for future start to avoid starting fleet gateway loop

pkg/features/features.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ type Flags struct {
3636
fqdn bool
3737
fqdnCallbacks map[string]BoolValueOnChangeCallback
3838

39-
tamperProtection bool
39+
tamperProtection bool
40+
forcePolicyChangeAcks bool
4041
}
4142

4243
type cfg struct {
@@ -48,6 +49,9 @@ type cfg struct {
4849
TamperProtection *struct {
4950
Enabled bool `json:"enabled" yaml:"enabled" config:"enabled"`
5051
} `json:"tamper_protection,omitempty" yaml:"tamper_protection,omitempty" config:"tamper_protection,omitempty"`
52+
ForcePolicyChangeAcks struct {
53+
Enabled bool `json:"enabled" yaml:"enabled" config:"enabled"`
54+
} `json:"force_policy_change_acks" toml: "force_policy_change_acks" config:"force_policy_change_acks"`
5155
} `json:"features" yaml:"features" config:"features"`
5256
} `json:"agent" yaml:"agent" config:"agent"`
5357
}
@@ -66,6 +70,13 @@ func (f *Flags) TamperProtection() bool {
6670
return f.tamperProtection
6771
}
6872

73+
func (f *Flags) ForcePolicyChangeAcks() bool {
74+
f.mu.RLock()
75+
defer f.mu.RUnlock()
76+
77+
return f.forcePolicyChangeAcks
78+
}
79+
6980
func (f *Flags) AsProto() *proto.Features {
7081
return &proto.Features{
7182
Fqdn: &proto.FQDNFeature{
@@ -121,6 +132,13 @@ func (f *Flags) setTamperProtection(newValue bool) {
121132
f.tamperProtection = newValue
122133
}
123134

135+
func (f *Flags) setForcePolicyChangeAcks(newValue bool) {
136+
f.mu.Lock()
137+
defer f.mu.Unlock()
138+
139+
f.forcePolicyChangeAcks = newValue
140+
}
141+
124142
// setSource sets the source from he given cfg.
125143
func (f *Flags) setSource(c cfg) error {
126144
// Use JSON marshalling-unmarshalling to convert cfg to mapstr
@@ -208,6 +226,7 @@ func Apply(c *config.Config) error {
208226

209227
current.setFQDN(parsed.FQDN())
210228
current.setTamperProtection(parsed.TamperProtection())
229+
current.setForcePolicyChangeAcks(parsed.ForcePolicyChangeAcks())
211230
return err
212231
}
213232

@@ -220,3 +239,8 @@ func FQDN() bool {
220239
func TamperProtection() bool {
221240
return current.TamperProtection()
222241
}
242+
243+
// ForcePolicyChangeAcks reports if the agent should force sending an ACK for POLICY_CHANGE actions.
244+
func ForcePolicyChangeAcks() bool {
245+
return current.ForcePolicyChangeAcks()
246+
}

0 commit comments

Comments
 (0)