Skip to content

Commit 47c3de7

Browse files
authored
Merge branch 'main' into autoops/otel-send-smaller-batches
2 parents 3a7fec4 + f2c4cfa commit 47c3de7

File tree

10 files changed

+318
-79
lines changed

10 files changed

+318
-79
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: feature
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: Add agent_policy_id and policy_revision_idx to checkin requests
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodate a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
description: |
20+
Add agent_policy_id and policy_revision_idx attributes to checkin requests.
21+
These attributes are used to inform fleet-server of the policy id and revision that the agent is currently running.
22+
Add a feature flag to disable sending acks for POLICY_CHANGE actions on a future release.
23+
24+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
25+
component: elastic-agent
26+
27+
# PR URL; optional; the PR number that added the changeset.
28+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
29+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
30+
# Please provide it if you are adding a fragment for a different PR.
31+
pr: https://github.com/elastic/elastic-agent/pull/9931
32+
33+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
34+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
35+
issue: https://github.com/elastic/elastic-agent/issues/6446
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: feature
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: Remove resource/k8s processor and use k8sattributes processor for mOTEL service attributes
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodate a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
description: |
20+
This PR removes the `resource/k8s` processor in honour of the k8sattributes processor that
21+
provides native support for the Service attributes:
22+
https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.127.0/processor/k8sattributesprocessor#configuring-recommended-resource-attributes
23+
This change is aligned with the respective Semantic Conventions' guidance:
24+
https://opentelemetry.io/docs/specs/semconv/non-normative/k8s-attributes/#service-attributes
25+
26+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
27+
component: elastic-agent
28+
29+
# PR URL; optional; the PR number that added the changeset.
30+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
31+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
32+
# Please provide it if you are adding a fragment for a different PR.
33+
#pr: https://github.com/owner/repo/1234
34+
35+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
36+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
37+
#issue: https://github.com/owner/repo/1234

deploy/helm/edot-collector/kube-stack/managed_otlp/values.yaml

Lines changed: 6 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -73,22 +73,6 @@ collectors:
7373
resource_attributes:
7474
k8s.cluster.name:
7575
enabled: true
76-
# [Resource Processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/resourceprocessor)
77-
resource/k8s: # Resource attributes tailored for services within Kubernetes.
78-
attributes:
79-
- key: service.name # Set the service.name resource attribute based on the well-known app.kubernetes.io/name label
80-
from_attribute: app.label.name
81-
action: insert
82-
- key: service.name # Set the service.name resource attribute based on the k8s.container.name attribute
83-
from_attribute: k8s.container.name
84-
action: insert
85-
- key: app.label.name # Delete app.label.name attribute previously used for service.name
86-
action: delete
87-
- key: service.version # Set the service.version resource attribute based on the well-known app.kubernetes.io/version label
88-
from_attribute: app.label.version
89-
action: insert
90-
- key: app.label.version # Delete app.label.version attribute previously used for service.version
91-
action: delete
9276
resource/hostname:
9377
attributes:
9478
- key: host.name
@@ -121,13 +105,9 @@ collectors:
121105
- "k8s.pod.ip"
122106
- "k8s.pod.uid"
123107
- "k8s.pod.start_time"
124-
labels:
125-
- tag_name: app.label.name
126-
key: app.kubernetes.io/name
127-
from: pod
128-
- tag_name: app.label.version
129-
key: app.kubernetes.io/version
130-
from: pod
108+
# Service attributes added based on https://opentelemetry.io/docs/specs/semconv/non-normative/k8s-attributes/#service-attributes
109+
- "service.name"
110+
- "service.version"
131111
receivers:
132112
# [K8s Objects Receiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/k8sobjectsreceiver)
133113
k8sobjects:
@@ -171,7 +151,6 @@ collectors:
171151
- resourcedetection/eks
172152
- resourcedetection/gcp
173153
- resourcedetection/aks
174-
- resource/k8s
175154
- resource/hostname
176155
receivers:
177156
- k8s_cluster
@@ -290,22 +269,6 @@ collectors:
290269
enabled: false
291270
host.id:
292271
enabled: true
293-
# [Resource Processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/resourceprocessor)
294-
resource/k8s: # Resource attributes tailored for services within Kubernetes.
295-
attributes:
296-
- key: service.name # Set the service.name resource attribute based on the well-known app.kubernetes.io/name label
297-
from_attribute: app.label.name
298-
action: insert
299-
- key: service.name # Set the service.name resource attribute based on the k8s.container.name attribute
300-
from_attribute: k8s.container.name
301-
action: insert
302-
- key: app.label.name # Delete app.label.name attribute previously used for service.name
303-
action: delete
304-
- key: service.version # Set the service.version resource attribute based on the well-known app.kubernetes.io/version label
305-
from_attribute: app.label.version
306-
action: insert
307-
- key: app.label.version # Delete app.label.version attribute previously used for service.version
308-
action: delete
309272
resource/cloud:
310273
attributes:
311274
- key: cloud.instance.id
@@ -341,13 +304,9 @@ collectors:
341304
- "k8s.pod.ip"
342305
- "k8s.pod.uid"
343306
- "k8s.pod.start_time"
344-
labels:
345-
- tag_name: app.label.name
346-
key: app.kubernetes.io/name
347-
from: pod
348-
- tag_name: app.label.version
349-
key: app.kubernetes.io/version
350-
from: pod
307+
# Service attributes added based on https://opentelemetry.io/docs/specs/semconv/non-normative/k8s-attributes/#service-attributes
308+
- "service.name"
309+
- "service.version"
351310
receivers:
352311
# [OTLP Receiver](https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver/otlpreceiver)
353312
otlp:
@@ -493,7 +452,6 @@ collectors:
493452
- resourcedetection/eks
494453
- resourcedetection/gcp
495454
- resourcedetection/aks
496-
- resource/k8s
497455
- resource/hostname
498456
- resource/cloud
499457
exporters:
@@ -509,7 +467,6 @@ collectors:
509467
- resourcedetection/eks
510468
- resourcedetection/gcp
511469
- resourcedetection/aks
512-
- resource/k8s
513470
- resource/hostname
514471
- resource/cloud
515472
exporters:

internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
3030
"github.com/elastic/elastic-agent/internal/pkg/remote"
3131
"github.com/elastic/elastic-agent/pkg/core/logger"
32+
"github.com/elastic/elastic-agent/pkg/features"
3233
)
3334

3435
// PolicyChangeHandler is a handler for POLICY_CHANGE action.
@@ -41,6 +42,7 @@ type PolicyChangeHandler struct {
4142
setters []actions.ClientSetter
4243
policyLogLevelSetter logLevelSetter
4344
coordinator *coordinator.Coordinator
45+
disableAckFn func() bool
4446
// Disabled for 8.8.0 release in order to limit the surface
4547
// https://github.com/elastic/security-team/issues/6501
4648
// // Last known valid signature validation key
@@ -67,6 +69,7 @@ func NewPolicyChangeHandler(
6769
setters: setters,
6870
coordinator: coordinator,
6971
policyLogLevelSetter: policyLogLevelSetter,
72+
disableAckFn: features.DisablePolicyChangeAcks,
7073
}
7174
}
7275

@@ -111,7 +114,7 @@ func (h *PolicyChangeHandler) Handle(ctx context.Context, a fleetapi.Action, ack
111114
return err
112115
}
113116

114-
h.ch <- newPolicyChange(ctx, c, a, acker, false)
117+
h.ch <- newPolicyChange(ctx, c, a, acker, false, h.disableAckFn())
115118
return nil
116119
}
117120

@@ -473,18 +476,19 @@ type policyChange struct {
473476
cfg *config.Config
474477
action fleetapi.Action
475478
acker acker.Acker
476-
commit bool
477479
ackWatcher chan struct{}
480+
disableAck bool
478481
}
479482

480483
func newPolicyChange(
481484
ctx context.Context,
482485
config *config.Config,
483486
action fleetapi.Action,
484487
acker acker.Acker,
485-
commit bool) *policyChange {
488+
makeCh bool,
489+
disableAck bool) *policyChange {
486490
var ackWatcher chan struct{}
487-
if commit {
491+
if makeCh {
488492
// we don't need it otherwise
489493
ackWatcher = make(chan struct{})
490494
}
@@ -493,39 +497,38 @@ func newPolicyChange(
493497
cfg: config,
494498
action: action,
495499
acker: acker,
496-
commit: true,
497500
ackWatcher: ackWatcher,
501+
disableAck: disableAck,
498502
}
499503
}
500504

501505
func (l *policyChange) Config() *config.Config {
502506
return l.cfg
503507
}
504508

509+
// Ack sends an ack for the associated action if the results are expected.
510+
// An ack will be sent for UNENROLL actions, or by POLICY_CHANGE actions if it has not been explicitly disabled.
505511
func (l *policyChange) Ack() error {
506-
if l.action == nil {
512+
if l.disableAck || l.action == nil {
507513
return nil
508514
}
509515
err := l.acker.Ack(l.ctx, l.action)
510516
if err != nil {
511517
return err
512518
}
513-
if l.commit {
514-
err := l.acker.Commit(l.ctx)
515-
if l.ackWatcher != nil && err == nil {
516-
close(l.ackWatcher)
517-
}
518-
return err
519+
err = l.acker.Commit(l.ctx)
520+
if err == nil && l.ackWatcher != nil {
521+
close(l.ackWatcher)
519522
}
520-
return nil
523+
return err
521524
}
522525

523526
// WaitAck waits for policy change to be acked.
524527
// Policy change ack is awaitable only in case commit flag was set.
525528
// Caller is responsible to use any reasonable deadline otherwise
526529
// function call can be endlessly blocking.
527530
func (l *policyChange) WaitAck(ctx context.Context) {
528-
if !l.commit || l.ackWatcher == nil {
531+
if l.ackWatcher == nil {
529532
return
530533
}
531534

internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func TestPolicyAcked(t *testing.T) {
105105
agentInfo := &info.AgentInfo{}
106106
nullStore := &storage.NullStore{}
107107

108-
t.Run("Config change should ACK", func(t *testing.T) {
108+
t.Run("Default: Config changes are ACKed", func(t *testing.T) {
109109
ch := make(chan coordinator.ConfigChange, 1)
110110
tacker := &testAcker{}
111111

@@ -119,6 +119,7 @@ func TestPolicyAcked(t *testing.T) {
119119
},
120120
}
121121

122+
// Test default FF value
122123
cfg := configuration.DefaultConfiguration()
123124
handler := NewPolicyChangeHandler(log, agentInfo, cfg, nullStore, ch, nilLogLevelSet(t), &coordinator.Coordinator{})
124125

@@ -129,9 +130,64 @@ func TestPolicyAcked(t *testing.T) {
129130
require.NoError(t, change.Ack())
130131

131132
actions := tacker.Items()
132-
assert.EqualValues(t, 1, len(actions))
133+
assert.Len(t, actions, 1)
133134
assert.Equal(t, actionID, actions[0])
134135
})
136+
t.Run("Config change acks when forced", func(t *testing.T) {
137+
ch := make(chan coordinator.ConfigChange, 1)
138+
tacker := &testAcker{}
139+
140+
config := map[string]interface{}{"hello": "world"}
141+
actionID := "abc123"
142+
action := &fleetapi.ActionPolicyChange{
143+
ActionID: actionID,
144+
ActionType: "POLICY_CHANGE",
145+
Data: fleetapi.ActionPolicyChangeData{
146+
Policy: config,
147+
},
148+
}
149+
150+
cfg := configuration.DefaultConfiguration()
151+
handler := NewPolicyChangeHandler(log, agentInfo, cfg, nullStore, ch, nilLogLevelSet(t), &coordinator.Coordinator{})
152+
handler.disableAckFn = func() bool { return false }
153+
154+
err := handler.Handle(context.Background(), action, tacker)
155+
require.NoError(t, err)
156+
157+
change := <-ch
158+
require.NoError(t, change.Ack())
159+
160+
actions := tacker.Items()
161+
assert.Len(t, actions, 1)
162+
assert.Equal(t, actionID, actions[0])
163+
})
164+
t.Run("Config change do not ack when disabled", func(t *testing.T) {
165+
ch := make(chan coordinator.ConfigChange, 1)
166+
tacker := &testAcker{}
167+
168+
config := map[string]interface{}{"hello": "world"}
169+
actionID := "abc123"
170+
action := &fleetapi.ActionPolicyChange{
171+
ActionID: actionID,
172+
ActionType: "POLICY_CHANGE",
173+
Data: fleetapi.ActionPolicyChangeData{
174+
Policy: config,
175+
},
176+
}
177+
178+
cfg := configuration.DefaultConfiguration()
179+
handler := NewPolicyChangeHandler(log, agentInfo, cfg, nullStore, ch, nilLogLevelSet(t), &coordinator.Coordinator{})
180+
handler.disableAckFn = func() bool { return true }
181+
182+
err := handler.Handle(context.Background(), action, tacker)
183+
require.NoError(t, err)
184+
185+
change := <-ch
186+
require.NoError(t, change.Ack())
187+
188+
actions := tacker.Items()
189+
assert.Empty(t, actions)
190+
})
135191
}
136192

137193
func TestPolicyChangeHandler_handlePolicyChange_FleetClientSettings(t *testing.T) {

internal/pkg/agent/application/actions/handlers/handler_action_unenroll.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (h *Unenroll) handle(ctx context.Context, a fleetapi.Action, acker acker.Ac
9393
}
9494

9595
// Generate empty policy change, this removing all the running components
96-
unenrollPolicy := newPolicyChange(ctx, config.New(), a, acker, true)
96+
unenrollPolicy := newPolicyChange(ctx, config.New(), a, acker, true, false)
9797
h.ch <- unenrollPolicy
9898

9999
// backup action for future start to avoid starting fleet gateway loop

0 commit comments

Comments
 (0)