Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; an 80ish-character description of the change.
summary: Add agent_policy_id and policy_revision_idx to checkin requests

# Long description; in case the summary is not enough to describe the change
# this field accommodates a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: |
Add agent_policy_id and policy_revision_idx attributes to checkin requests.
These attributes are used to inform fleet-server of the policy id and revision that the agent is currently running.
Add a feature flag to disable sending acks for POLICY_CHANGE actions in a future release.

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# PR URL; optional; the PR number that added the changeset.
# If not present, it is automatically filled by the tooling, which finds the PR where this changelog fragment was added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/9931

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present, it is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/6446
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
"github.com/elastic/elastic-agent/internal/pkg/remote"
"github.com/elastic/elastic-agent/pkg/core/logger"
"github.com/elastic/elastic-agent/pkg/features"
)

// PolicyChangeHandler is a handler for POLICY_CHANGE action.
Expand All @@ -41,6 +42,7 @@ type PolicyChangeHandler struct {
setters []actions.ClientSetter
policyLogLevelSetter logLevelSetter
coordinator *coordinator.Coordinator
disableAckFn func() bool
// Disabled for 8.8.0 release in order to limit the surface
// https://github.com/elastic/security-team/issues/6501
// // Last known valid signature validation key
Expand All @@ -67,6 +69,7 @@ func NewPolicyChangeHandler(
setters: setters,
coordinator: coordinator,
policyLogLevelSetter: policyLogLevelSetter,
disableAckFn: features.DisablePolicyChangeAcks,
}
}

Expand Down Expand Up @@ -111,7 +114,7 @@ func (h *PolicyChangeHandler) Handle(ctx context.Context, a fleetapi.Action, ack
return err
}

h.ch <- newPolicyChange(ctx, c, a, acker, false)
h.ch <- newPolicyChange(ctx, c, a, acker, false, h.disableAckFn())
return nil
}

Expand Down Expand Up @@ -473,18 +476,19 @@ type policyChange struct {
cfg *config.Config
action fleetapi.Action
acker acker.Acker
commit bool
ackWatcher chan struct{}
disableAck bool
}

func newPolicyChange(
ctx context.Context,
config *config.Config,
action fleetapi.Action,
acker acker.Acker,
commit bool) *policyChange {
makeCh bool,
disableAck bool) *policyChange {
var ackWatcher chan struct{}
if commit {
if makeCh {
// we don't need it otherwise
ackWatcher = make(chan struct{})
}
Expand All @@ -493,39 +497,38 @@ func newPolicyChange(
cfg: config,
action: action,
acker: acker,
commit: true,
ackWatcher: ackWatcher,
disableAck: disableAck,
}
}

func (l *policyChange) Config() *config.Config {
return l.cfg
}

// Ack sends an ack for the associated action if the results are expected.
// An ack will be sent for UNENROLL actions, or for POLICY_CHANGE actions unless acks have been explicitly disabled.
func (l *policyChange) Ack() error {
if l.action == nil {
if l.disableAck || l.action == nil {
return nil
}
err := l.acker.Ack(l.ctx, l.action)
if err != nil {
return err
}
if l.commit {
err := l.acker.Commit(l.ctx)
if l.ackWatcher != nil && err == nil {
close(l.ackWatcher)
}
return err
err = l.acker.Commit(l.ctx)
if err == nil && l.ackWatcher != nil {
close(l.ackWatcher)
}
return nil
return err
}

// WaitAck waits for policy change to be acked.
// Policy change ack is awaitable only when an ack watcher channel was created.
// Caller is responsible to use any reasonable deadline otherwise
// function call can be endlessly blocking.
func (l *policyChange) WaitAck(ctx context.Context) {
if !l.commit || l.ackWatcher == nil {
if l.ackWatcher == nil {
return
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestPolicyAcked(t *testing.T) {
agentInfo := &info.AgentInfo{}
nullStore := &storage.NullStore{}

t.Run("Config change should ACK", func(t *testing.T) {
t.Run("Default: Config changes are ACKed", func(t *testing.T) {
ch := make(chan coordinator.ConfigChange, 1)
tacker := &testAcker{}

Expand All @@ -119,6 +119,7 @@ func TestPolicyAcked(t *testing.T) {
},
}

// Test default FF value
cfg := configuration.DefaultConfiguration()
handler := NewPolicyChangeHandler(log, agentInfo, cfg, nullStore, ch, nilLogLevelSet(t), &coordinator.Coordinator{})

Expand All @@ -129,9 +130,64 @@ func TestPolicyAcked(t *testing.T) {
require.NoError(t, change.Ack())

actions := tacker.Items()
assert.EqualValues(t, 1, len(actions))
assert.Len(t, actions, 1)
assert.Equal(t, actionID, actions[0])
})
t.Run("Config change acks when forced", func(t *testing.T) {
ch := make(chan coordinator.ConfigChange, 1)
tacker := &testAcker{}

config := map[string]interface{}{"hello": "world"}
actionID := "abc123"
action := &fleetapi.ActionPolicyChange{
ActionID: actionID,
ActionType: "POLICY_CHANGE",
Data: fleetapi.ActionPolicyChangeData{
Policy: config,
},
}

cfg := configuration.DefaultConfiguration()
handler := NewPolicyChangeHandler(log, agentInfo, cfg, nullStore, ch, nilLogLevelSet(t), &coordinator.Coordinator{})
handler.disableAckFn = func() bool { return false }

err := handler.Handle(context.Background(), action, tacker)
require.NoError(t, err)

change := <-ch
require.NoError(t, change.Ack())

actions := tacker.Items()
assert.Len(t, actions, 1)
assert.Equal(t, actionID, actions[0])
})
t.Run("Config change do not ack when disabled", func(t *testing.T) {
ch := make(chan coordinator.ConfigChange, 1)
tacker := &testAcker{}

config := map[string]interface{}{"hello": "world"}
actionID := "abc123"
action := &fleetapi.ActionPolicyChange{
ActionID: actionID,
ActionType: "POLICY_CHANGE",
Data: fleetapi.ActionPolicyChangeData{
Policy: config,
},
}

cfg := configuration.DefaultConfiguration()
handler := NewPolicyChangeHandler(log, agentInfo, cfg, nullStore, ch, nilLogLevelSet(t), &coordinator.Coordinator{})
handler.disableAckFn = func() bool { return true }

err := handler.Handle(context.Background(), action, tacker)
require.NoError(t, err)

change := <-ch
require.NoError(t, change.Ack())

actions := tacker.Items()
assert.Empty(t, actions)
})
}

func TestPolicyChangeHandler_handlePolicyChange_FleetClientSettings(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (h *Unenroll) handle(ctx context.Context, a fleetapi.Action, acker acker.Ac
}

// Generate an empty policy change, thus removing all the running components
unenrollPolicy := newPolicyChange(ctx, config.New(), a, acker, true)
unenrollPolicy := newPolicyChange(ctx, config.New(), a, acker, true, false)
h.ch <- unenrollPolicy

// backup action for future start to avoid starting fleet gateway loop
Expand Down
59 changes: 53 additions & 6 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type stateStore interface {
AckToken() string
SetAckToken(ackToken string)
Save() error
Action() fleetapi.Action
}

type FleetGateway struct {
Expand Down Expand Up @@ -356,15 +357,21 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
// Fix loglevel with the current log level used by coordinator
ecsMeta.Elastic.Agent.LogLevel = state.LogLevel.String()

action := f.stateStore.Action()
agentPolicyID := getPolicyID(action)
policyRevisionIDX := getPolicyRevisionIDX(action)

// checkin
cmd := fleetapi.NewCheckinCmd(f.agentInfo, f.client)
req := &fleetapi.CheckinRequest{
AckToken: ackToken,
Metadata: ecsMeta,
Status: agentStateToString(state.State),
Message: state.Message,
Components: components,
UpgradeDetails: state.UpgradeDetails,
AckToken: ackToken,
Metadata: ecsMeta,
Status: agentStateToString(state.State),
Message: state.Message,
Components: components,
UpgradeDetails: state.UpgradeDetails,
AgentPolicyID: agentPolicyID,
PolicyRevisionIDX: policyRevisionIDX,
}

resp, took, err := cmd.Execute(ctx, req)
Expand Down Expand Up @@ -447,3 +454,43 @@ func RequestBackoff(done <-chan struct{}) backoff.Backoff {
defaultFleetBackoffSettings.Max,
)
}

// getPolicyID extracts the "policy_id" attribute from a POLICY_CHANGE action.
// It returns the empty string when the action is not a
// *fleetapi.ActionPolicyChange, when the policy has no "policy_id" key, or
// when the stored value is not a string.
func getPolicyID(action fleetapi.Action) string {
	change, isPolicyChange := action.(*fleetapi.ActionPolicyChange)
	if !isPolicyChange {
		return ""
	}
	// A missing key yields a nil interface value, which fails the string
	// assertion just like a value of the wrong type; both leave id empty.
	id, _ := change.Data.Policy["policy_id"].(string)
	return id
}

// getPolicyRevisionIDX extracts the "policy_revision_idx" attribute from a
// POLICY_CHANGE action as an int64.
// Values stored as int64, int, or float64 are accepted so that the numeric
// type changing across serialization does not cause issues; a float64 is
// converted with a plain int64 conversion, which drops any fractional part.
// It returns 0 when the action is not a *fleetapi.ActionPolicyChange, when the
// key is absent, or when the value has any other type.
func getPolicyRevisionIDX(action fleetapi.Action) int64 {
	change, isPolicyChange := action.(*fleetapi.ActionPolicyChange)
	if !isPolicyChange {
		return 0
	}
	// A missing key produces a nil interface value, which matches none of the
	// cases below and falls through to the zero result.
	switch rev := change.Data.Policy["policy_revision_idx"].(type) {
	case int64:
		return rev
	case int:
		return int64(rev)
	case float64:
		return int64(rev)
	}
	return 0
}
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,74 @@ func TestFleetGateway(t *testing.T) {
default:
}
})

t.Run("sends agent_policy_id and policy_revision_idx", func(t *testing.T) {
ctx, cancel := context.WithCancel(t.Context())
defer cancel()

scheduler := scheduler.NewStepper()
client := newTestingClient()

log, _ := loggertest.New("fleet_gateway")

stateStore := newStateStore(t, log)
stateStore.SetAction(&fleetapi.ActionPolicyChange{
ActionID: "test-action-id",
ActionType: fleetapi.ActionTypePolicyChange,
Data: fleetapi.ActionPolicyChangeData{
Policy: map[string]interface{}{
"policy_id": "test-policy-id",
"policy_revision_idx": 1,
},
},
})
err := stateStore.Save()
require.NoError(t, err)

gateway, err := newFleetGatewayWithScheduler(
log,
settings,
agentInfo,
client,
scheduler,
noop.New(),
emptyStateFetcher,
stateStore,
)
require.NoError(t, err)

waitFn := ackSeq(
client.Answer(func(headers http.Header, body io.Reader) (*http.Response, error) {
data, err := io.ReadAll(body)
require.NoError(t, err)

var checkinRequest fleetapi.CheckinRequest
err = json.Unmarshal(data, &checkinRequest)
require.NoError(t, err)

require.Equal(t, "test-policy-id", checkinRequest.AgentPolicyID)
require.Equal(t, int64(1), checkinRequest.PolicyRevisionIDX)

resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`)
return resp, nil
}),
)

errCh := runFleetGateway(ctx, gateway)

// Synchronize scheduler and acking of calls from the worker go routine.
scheduler.Next()
waitFn()

cancel()
err = <-errCh
require.NoError(t, err)
select {
case actions := <-gateway.Actions():
t.Errorf("Expected no actions, got %v", actions)
default:
}
})
}

func TestRetriesOnFailures(t *testing.T) {
Expand Down
Loading
Loading