Skip to content

Commit f2c4cfa

Browse files
Add agent_policy_id and policy_revision_idx to checkin requests (#9931)
* Add agent_policy_id and policy_revision_idx to checkin requests Add the agent_policy_id and policy_revision_idx attributes to checkin requests. * Remove policy change action acks * Add ForcePolicyChangeAcks feature flag * Change FF to explicitly disable acks * Fix feature flag
1 parent 395d0da commit f2c4cfa

File tree

8 files changed

+275
-30
lines changed

8 files changed

+275
-30
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: feature
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: Add agent_policy_id and policy_revision_idx to checkin requests
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodate a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
description: |
20+
Add agent_policy_id and policy_revision_idx attributes to checkin requests.
21+
These attributes are used to inform fleet-server of the policy id and revision that the agent is currently running.
22+
Add a feature flag to disable sending acks for POLICY_CHANGE actions on a future release.
23+
24+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
25+
component: elastic-agent
26+
27+
# PR URL; optional; the PR number that added the changeset.
28+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
29+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
30+
# Please provide it if you are adding a fragment for a different PR.
31+
pr: https://github.com/elastic/elastic-agent/pull/9931
32+
33+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
34+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
35+
issue: https://github.com/elastic/elastic-agent/issues/6446

internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
3030
"github.com/elastic/elastic-agent/internal/pkg/remote"
3131
"github.com/elastic/elastic-agent/pkg/core/logger"
32+
"github.com/elastic/elastic-agent/pkg/features"
3233
)
3334

3435
// PolicyChangeHandler is a handler for POLICY_CHANGE action.
@@ -41,6 +42,7 @@ type PolicyChangeHandler struct {
4142
setters []actions.ClientSetter
4243
policyLogLevelSetter logLevelSetter
4344
coordinator *coordinator.Coordinator
45+
disableAckFn func() bool
4446
// Disabled for 8.8.0 release in order to limit the surface
4547
// https://github.com/elastic/security-team/issues/6501
4648
// // Last known valid signature validation key
@@ -67,6 +69,7 @@ func NewPolicyChangeHandler(
6769
setters: setters,
6870
coordinator: coordinator,
6971
policyLogLevelSetter: policyLogLevelSetter,
72+
disableAckFn: features.DisablePolicyChangeAcks,
7073
}
7174
}
7275

@@ -111,7 +114,7 @@ func (h *PolicyChangeHandler) Handle(ctx context.Context, a fleetapi.Action, ack
111114
return err
112115
}
113116

114-
h.ch <- newPolicyChange(ctx, c, a, acker, false)
117+
h.ch <- newPolicyChange(ctx, c, a, acker, false, h.disableAckFn())
115118
return nil
116119
}
117120

@@ -473,18 +476,19 @@ type policyChange struct {
473476
cfg *config.Config
474477
action fleetapi.Action
475478
acker acker.Acker
476-
commit bool
477479
ackWatcher chan struct{}
480+
disableAck bool
478481
}
479482

480483
func newPolicyChange(
481484
ctx context.Context,
482485
config *config.Config,
483486
action fleetapi.Action,
484487
acker acker.Acker,
485-
commit bool) *policyChange {
488+
makeCh bool,
489+
disableAck bool) *policyChange {
486490
var ackWatcher chan struct{}
487-
if commit {
491+
if makeCh {
488492
// we don't need it otherwise
489493
ackWatcher = make(chan struct{})
490494
}
@@ -493,39 +497,38 @@ func newPolicyChange(
493497
cfg: config,
494498
action: action,
495499
acker: acker,
496-
commit: true,
497500
ackWatcher: ackWatcher,
501+
disableAck: disableAck,
498502
}
499503
}
500504

501505
func (l *policyChange) Config() *config.Config {
502506
return l.cfg
503507
}
504508

509+
// Ack sends an ack for the associated action if the results are expected.
510+
// An ack will be sent for UNENROLL actions, or by POLICY_CHANGE actions if it has not been explicitly disabled.
505511
func (l *policyChange) Ack() error {
506-
if l.action == nil {
512+
if l.disableAck || l.action == nil {
507513
return nil
508514
}
509515
err := l.acker.Ack(l.ctx, l.action)
510516
if err != nil {
511517
return err
512518
}
513-
if l.commit {
514-
err := l.acker.Commit(l.ctx)
515-
if l.ackWatcher != nil && err == nil {
516-
close(l.ackWatcher)
517-
}
518-
return err
519+
err = l.acker.Commit(l.ctx)
520+
if err == nil && l.ackWatcher != nil {
521+
close(l.ackWatcher)
519522
}
520-
return nil
523+
return err
521524
}
522525

523526
// WaitAck waits for policy change to be acked.
524527
// Policy change ack is awaitable only in case commit flag was set.
525528
// Caller is responsible to use any reasonable deadline otherwise
526529
// function call can be endlessly blocking.
527530
func (l *policyChange) WaitAck(ctx context.Context) {
528-
if !l.commit || l.ackWatcher == nil {
531+
if l.ackWatcher == nil {
529532
return
530533
}
531534

internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func TestPolicyAcked(t *testing.T) {
105105
agentInfo := &info.AgentInfo{}
106106
nullStore := &storage.NullStore{}
107107

108-
t.Run("Config change should ACK", func(t *testing.T) {
108+
t.Run("Default: Config changes are ACKed", func(t *testing.T) {
109109
ch := make(chan coordinator.ConfigChange, 1)
110110
tacker := &testAcker{}
111111

@@ -119,6 +119,7 @@ func TestPolicyAcked(t *testing.T) {
119119
},
120120
}
121121

122+
// Test default FF value
122123
cfg := configuration.DefaultConfiguration()
123124
handler := NewPolicyChangeHandler(log, agentInfo, cfg, nullStore, ch, nilLogLevelSet(t), &coordinator.Coordinator{})
124125

@@ -129,9 +130,64 @@ func TestPolicyAcked(t *testing.T) {
129130
require.NoError(t, change.Ack())
130131

131132
actions := tacker.Items()
132-
assert.EqualValues(t, 1, len(actions))
133+
assert.Len(t, actions, 1)
133134
assert.Equal(t, actionID, actions[0])
134135
})
136+
t.Run("Config change acks when forced", func(t *testing.T) {
137+
ch := make(chan coordinator.ConfigChange, 1)
138+
tacker := &testAcker{}
139+
140+
config := map[string]interface{}{"hello": "world"}
141+
actionID := "abc123"
142+
action := &fleetapi.ActionPolicyChange{
143+
ActionID: actionID,
144+
ActionType: "POLICY_CHANGE",
145+
Data: fleetapi.ActionPolicyChangeData{
146+
Policy: config,
147+
},
148+
}
149+
150+
cfg := configuration.DefaultConfiguration()
151+
handler := NewPolicyChangeHandler(log, agentInfo, cfg, nullStore, ch, nilLogLevelSet(t), &coordinator.Coordinator{})
152+
handler.disableAckFn = func() bool { return false }
153+
154+
err := handler.Handle(context.Background(), action, tacker)
155+
require.NoError(t, err)
156+
157+
change := <-ch
158+
require.NoError(t, change.Ack())
159+
160+
actions := tacker.Items()
161+
assert.Len(t, actions, 1)
162+
assert.Equal(t, actionID, actions[0])
163+
})
164+
t.Run("Config change do not ack when disabled", func(t *testing.T) {
165+
ch := make(chan coordinator.ConfigChange, 1)
166+
tacker := &testAcker{}
167+
168+
config := map[string]interface{}{"hello": "world"}
169+
actionID := "abc123"
170+
action := &fleetapi.ActionPolicyChange{
171+
ActionID: actionID,
172+
ActionType: "POLICY_CHANGE",
173+
Data: fleetapi.ActionPolicyChangeData{
174+
Policy: config,
175+
},
176+
}
177+
178+
cfg := configuration.DefaultConfiguration()
179+
handler := NewPolicyChangeHandler(log, agentInfo, cfg, nullStore, ch, nilLogLevelSet(t), &coordinator.Coordinator{})
180+
handler.disableAckFn = func() bool { return true }
181+
182+
err := handler.Handle(context.Background(), action, tacker)
183+
require.NoError(t, err)
184+
185+
change := <-ch
186+
require.NoError(t, change.Ack())
187+
188+
actions := tacker.Items()
189+
assert.Empty(t, actions)
190+
})
135191
}
136192

137193
func TestPolicyChangeHandler_handlePolicyChange_FleetClientSettings(t *testing.T) {

internal/pkg/agent/application/actions/handlers/handler_action_unenroll.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (h *Unenroll) handle(ctx context.Context, a fleetapi.Action, acker acker.Ac
9393
}
9494

9595
// Generate empty policy change, this removing all the running components
96-
unenrollPolicy := newPolicyChange(ctx, config.New(), a, acker, true)
96+
unenrollPolicy := newPolicyChange(ctx, config.New(), a, acker, true, false)
9797
h.ch <- unenrollPolicy
9898

9999
// backup action for future start to avoid starting fleet gateway loop

internal/pkg/agent/application/gateway/fleet/fleet_gateway.go

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ type stateStore interface {
7171
AckToken() string
7272
SetAckToken(ackToken string)
7373
Save() error
74+
Action() fleetapi.Action
7475
}
7576

7677
type FleetGateway struct {
@@ -356,15 +357,21 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
356357
// Fix loglevel with the current log level used by coordinator
357358
ecsMeta.Elastic.Agent.LogLevel = state.LogLevel.String()
358359

360+
action := f.stateStore.Action()
361+
agentPolicyID := getPolicyID(action)
362+
policyRevisionIDX := getPolicyRevisionIDX(action)
363+
359364
// checkin
360365
cmd := fleetapi.NewCheckinCmd(f.agentInfo, f.client)
361366
req := &fleetapi.CheckinRequest{
362-
AckToken: ackToken,
363-
Metadata: ecsMeta,
364-
Status: agentStateToString(state.State),
365-
Message: state.Message,
366-
Components: components,
367-
UpgradeDetails: state.UpgradeDetails,
367+
AckToken: ackToken,
368+
Metadata: ecsMeta,
369+
Status: agentStateToString(state.State),
370+
Message: state.Message,
371+
Components: components,
372+
UpgradeDetails: state.UpgradeDetails,
373+
AgentPolicyID: agentPolicyID,
374+
PolicyRevisionIDX: policyRevisionIDX,
368375
}
369376

370377
resp, took, err := cmd.Execute(ctx, req)
@@ -447,3 +454,43 @@ func RequestBackoff(done <-chan struct{}) backoff.Backoff {
447454
defaultFleetBackoffSettings.Max,
448455
)
449456
}
457+
458+
// getPolicyID will check that the passed action is a POLICY_CHANGE action and return the policy_id attribute of the policy as a string.
459+
func getPolicyID(action fleetapi.Action) string {
460+
policyChange, ok := action.(*fleetapi.ActionPolicyChange)
461+
if !ok {
462+
return ""
463+
}
464+
v, ok := policyChange.Data.Policy["policy_id"]
465+
if !ok {
466+
return ""
467+
}
468+
vv, ok := v.(string)
469+
if !ok {
470+
return ""
471+
}
472+
return vv
473+
}
474+
475+
// getPolicyRevisionIDX will check that the passed action is a POLICY_CHANGE action and return the policy_revision_idx attribute of the policy as an int64.
476+
// The function will attempt to convert the attribute to int64 if int or float64 is used in order to prevent issues from serialization.
477+
func getPolicyRevisionIDX(action fleetapi.Action) int64 {
478+
policyChange, ok := action.(*fleetapi.ActionPolicyChange)
479+
if !ok {
480+
return 0
481+
}
482+
v, ok := policyChange.Data.Policy["policy_revision_idx"]
483+
if !ok {
484+
return 0
485+
}
486+
switch vv := v.(type) {
487+
case int64:
488+
return vv
489+
case int:
490+
return int64(vv)
491+
case float64:
492+
return int64(vv)
493+
default:
494+
return 0
495+
}
496+
}

internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,74 @@ func TestFleetGateway(t *testing.T) {
377377
default:
378378
}
379379
})
380+
381+
t.Run("sends agent_policy_id and policy_revision_idx", func(t *testing.T) {
382+
ctx, cancel := context.WithCancel(t.Context())
383+
defer cancel()
384+
385+
scheduler := scheduler.NewStepper()
386+
client := newTestingClient()
387+
388+
log, _ := loggertest.New("fleet_gateway")
389+
390+
stateStore := newStateStore(t, log)
391+
stateStore.SetAction(&fleetapi.ActionPolicyChange{
392+
ActionID: "test-action-id",
393+
ActionType: fleetapi.ActionTypePolicyChange,
394+
Data: fleetapi.ActionPolicyChangeData{
395+
Policy: map[string]interface{}{
396+
"policy_id": "test-policy-id",
397+
"policy_revision_idx": 1,
398+
},
399+
},
400+
})
401+
err := stateStore.Save()
402+
require.NoError(t, err)
403+
404+
gateway, err := newFleetGatewayWithScheduler(
405+
log,
406+
settings,
407+
agentInfo,
408+
client,
409+
scheduler,
410+
noop.New(),
411+
emptyStateFetcher,
412+
stateStore,
413+
)
414+
require.NoError(t, err)
415+
416+
waitFn := ackSeq(
417+
client.Answer(func(headers http.Header, body io.Reader) (*http.Response, error) {
418+
data, err := io.ReadAll(body)
419+
require.NoError(t, err)
420+
421+
var checkinRequest fleetapi.CheckinRequest
422+
err = json.Unmarshal(data, &checkinRequest)
423+
require.NoError(t, err)
424+
425+
require.Equal(t, "test-policy-id", checkinRequest.AgentPolicyID)
426+
require.Equal(t, int64(1), checkinRequest.PolicyRevisionIDX)
427+
428+
resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`)
429+
return resp, nil
430+
}),
431+
)
432+
433+
errCh := runFleetGateway(ctx, gateway)
434+
435+
// Synchronize scheduler and acking of calls from the worker go routine.
436+
scheduler.Next()
437+
waitFn()
438+
439+
cancel()
440+
err = <-errCh
441+
require.NoError(t, err)
442+
select {
443+
case actions := <-gateway.Actions():
444+
t.Errorf("Expected no actions, got %v", actions)
445+
default:
446+
}
447+
})
380448
}
381449

382450
func TestRetriesOnFailures(t *testing.T) {

0 commit comments

Comments
 (0)