Skip to content

Commit e69f817

Browse files
authored
[OTel] Propagate IndexPrefix to OTel context (#46638)
Propagate beat IndexPrefix to otelctx. Logstash Exporter needs IndexPrefix to initialise logstash client
1 parent c5e8d26 commit e69f817

File tree

5 files changed

+227
-44
lines changed

5 files changed

+227
-44
lines changed

libbeat/otelbeat/otelctx/otelctx.go

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,24 @@ import (
2626
)
2727

2828
const (
29-
BeatNameCtxKey = "beat_name"
30-
BeatVersionCtxKey = "beat_version"
29+
BeatNameCtxKey = "beat_name"
30+
BeatVersionCtxKey = "beat_version"
31+
BeatIndexPrefixCtxKey = "beat_index_prefix"
32+
33+
MetadataBeatKey = "beat"
34+
MetadataVersionKey = "version"
35+
MetadataIndexPrefixKey = "index_prefix"
3136
)
3237

3338
// NewConsumerContext creates a new context.Context adding the beats metadata
34-
// to the client.Info. This is used to pass the beat name and version to the
39+
// to the client.Info. This is used to pass the beat name, version and index prefix to the
3540
// Collector, so it can be used by the components to access that data.
3641
func NewConsumerContext(ctx context.Context, beatInfo beat.Info) context.Context {
3742
clientInfo := client.Info{
3843
Metadata: client.NewMetadata(map[string][]string{
39-
BeatNameCtxKey: {beatInfo.Beat},
40-
BeatVersionCtxKey: {beatInfo.Version},
44+
BeatNameCtxKey: {beatInfo.Beat},
45+
BeatVersionCtxKey: {beatInfo.Version},
46+
BeatIndexPrefixCtxKey: {beatInfo.IndexPrefix},
4147
}),
4248
}
4349
return client.NewContext(ctx, clientInfo)
@@ -63,18 +69,32 @@ func GetBeatVersion(ctx context.Context) string {
6369
return ""
6470
}
6571

72+
// GetBeatIndexPrefix retrieves the beat index prefix from the context metadata
73+
// If it is not found, it returns an empty string.
74+
func GetBeatIndexPrefix(ctx context.Context) string {
75+
clientInfo := client.FromContext(ctx)
76+
if values := clientInfo.Metadata.Get(BeatIndexPrefixCtxKey); len(values) > 0 {
77+
return values[0]
78+
}
79+
return ""
80+
}
81+
6682
// GetBeatEventMeta gives beat.Event.Meta from the context metadata
6783
func GetBeatEventMeta(ctx context.Context) map[string]any {
6884
ctxData := client.FromContext(ctx)
69-
var beatName, beatVersion string
85+
var beatName, beatVersion, beatIndexPrefix string
7086
if v := ctxData.Metadata.Get(BeatNameCtxKey); len(v) > 0 {
7187
beatName = v[0]
7288
}
7389
if v := ctxData.Metadata.Get(BeatVersionCtxKey); len(v) > 0 {
7490
beatVersion = v[0]
7591
}
92+
if v := ctxData.Metadata.Get(BeatIndexPrefixCtxKey); len(v) > 0 {
93+
beatIndexPrefix = v[0]
94+
}
7695
return map[string]any{
77-
"beat": beatName,
78-
"version": beatVersion,
96+
MetadataBeatKey: beatName,
97+
MetadataVersionKey: beatVersion,
98+
MetadataIndexPrefixKey: beatIndexPrefix,
7999
}
80100
}

libbeat/otelbeat/otelctx/otelctx_test.go

Lines changed: 87 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,17 @@ func TestGetBeatEventMeta(t *testing.T) {
3737
ctx := t.Context()
3838
info := client.Info{
3939
Metadata: client.NewMetadata(map[string][]string{
40-
BeatNameCtxKey: {"filebeat"},
41-
BeatVersionCtxKey: {"8.0.0"},
40+
BeatNameCtxKey: {"something"},
41+
BeatIndexPrefixCtxKey: {"filebeat"},
42+
BeatVersionCtxKey: {"8.0.0"},
4243
}),
4344
}
4445
return client.NewContext(ctx, info)
4546
},
4647
expected: map[string]any{
47-
"beat": "filebeat",
48-
"version": "8.0.0",
48+
MetadataBeatKey: "something",
49+
MetadataVersionKey: "8.0.0",
50+
MetadataIndexPrefixKey: "filebeat",
4951
},
5052
},
5153
{
@@ -54,14 +56,34 @@ func TestGetBeatEventMeta(t *testing.T) {
5456
ctx := t.Context()
5557
info := client.Info{
5658
Metadata: client.NewMetadata(map[string][]string{
59+
BeatIndexPrefixCtxKey: {"filebeat"},
60+
BeatVersionCtxKey: {"8.0.0"},
61+
}),
62+
}
63+
return client.NewContext(ctx, info)
64+
},
65+
expected: map[string]any{
66+
MetadataBeatKey: "",
67+
MetadataVersionKey: "8.0.0",
68+
MetadataIndexPrefixKey: "filebeat",
69+
},
70+
},
71+
{
72+
name: "missing beat index prefix",
73+
setupCtx: func() context.Context {
74+
ctx := t.Context()
75+
info := client.Info{
76+
Metadata: client.NewMetadata(map[string][]string{
77+
BeatNameCtxKey: {"filebeat"},
5778
BeatVersionCtxKey: {"8.0.0"},
5879
}),
5980
}
6081
return client.NewContext(ctx, info)
6182
},
6283
expected: map[string]any{
63-
"beat": "",
64-
"version": "8.0.0",
84+
MetadataBeatKey: "filebeat",
85+
MetadataVersionKey: "8.0.0",
86+
MetadataIndexPrefixKey: "",
6587
},
6688
},
6789
{
@@ -70,14 +92,16 @@ func TestGetBeatEventMeta(t *testing.T) {
7092
ctx := t.Context()
7193
info := client.Info{
7294
Metadata: client.NewMetadata(map[string][]string{
73-
BeatNameCtxKey: {"filebeat"},
95+
BeatNameCtxKey: {"something"},
96+
BeatIndexPrefixCtxKey: {"filebeat"},
7497
}),
7598
}
7699
return client.NewContext(ctx, info)
77100
},
78101
expected: map[string]any{
79-
"beat": "filebeat",
80-
"version": "",
102+
MetadataBeatKey: "something",
103+
MetadataVersionKey: "",
104+
MetadataIndexPrefixKey: "filebeat",
81105
},
82106
},
83107
{
@@ -90,8 +114,9 @@ func TestGetBeatEventMeta(t *testing.T) {
90114
return client.NewContext(ctx, info)
91115
},
92116
expected: map[string]any{
93-
"beat": "",
94-
"version": "",
117+
MetadataBeatKey: "",
118+
MetadataVersionKey: "",
119+
MetadataIndexPrefixKey: "",
95120
},
96121
},
97122
{
@@ -100,8 +125,9 @@ func TestGetBeatEventMeta(t *testing.T) {
100125
return t.Context()
101126
},
102127
expected: map[string]any{
103-
"beat": "",
104-
"version": "",
128+
MetadataBeatKey: "",
129+
MetadataVersionKey: "",
130+
MetadataIndexPrefixKey: "",
105131
},
106132
},
107133
}
@@ -217,3 +243,51 @@ func TestGetBeatName(t *testing.T) {
217243
})
218244
}
219245
}
246+
247+
func TestGetBeatIndexPrefix(t *testing.T) {
248+
tests := []struct {
249+
name string
250+
setupCtx func() context.Context
251+
expected string
252+
}{
253+
{
254+
name: "prefix exists",
255+
setupCtx: func() context.Context {
256+
ctx := t.Context()
257+
info := client.Info{
258+
Metadata: client.NewMetadata(map[string][]string{
259+
BeatIndexPrefixCtxKey: {"filebeat"},
260+
}),
261+
}
262+
return client.NewContext(ctx, info)
263+
},
264+
expected: "filebeat",
265+
},
266+
{
267+
name: "prefix missing",
268+
setupCtx: func() context.Context {
269+
ctx := t.Context()
270+
info := client.Info{
271+
Metadata: client.NewMetadata(map[string][]string{}),
272+
}
273+
return client.NewContext(ctx, info)
274+
},
275+
expected: "",
276+
},
277+
{
278+
name: "no client info",
279+
setupCtx: func() context.Context {
280+
return t.Context()
281+
},
282+
expected: "",
283+
},
284+
}
285+
286+
for _, tt := range tests {
287+
t.Run(tt.name, func(t *testing.T) {
288+
ctx := tt.setupCtx()
289+
name := GetBeatIndexPrefix(ctx)
290+
assert.Equal(t, tt.expected, name)
291+
})
292+
}
293+
}

x-pack/otel/exporter/logstashexporter/internal/batch_test.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"go.opentelemetry.io/collector/pdata/plog"
1717

1818
"github.com/elastic/beats/v7/libbeat/beat"
19+
"github.com/elastic/beats/v7/libbeat/otelbeat/otelctx"
1920
"github.com/elastic/beats/v7/libbeat/publisher"
2021
)
2122

@@ -32,8 +33,9 @@ func TestNewLogBatch(t *testing.T) {
3233
ctx := t.Context()
3334
info := client.Info{
3435
Metadata: client.NewMetadata(map[string][]string{
35-
"beat_name": {"filebeat"},
36-
"beat_version": {"8.0.0"},
36+
otelctx.BeatNameCtxKey: {"filebeat"},
37+
otelctx.BeatVersionCtxKey: {"8.0.0"},
38+
otelctx.BeatIndexPrefixCtxKey: {"filebeat"},
3739
}),
3840
}
3941
return client.NewContext(ctx, info)
@@ -67,8 +69,9 @@ func TestNewLogBatch(t *testing.T) {
6769
ctx := t.Context()
6870
info := client.Info{
6971
Metadata: client.NewMetadata(map[string][]string{
70-
"beat_name": {"filebeat"},
71-
"beat_version": {"8.0.0"},
72+
otelctx.BeatNameCtxKey: {"filebeat"},
73+
otelctx.BeatVersionCtxKey: {"8.0.0"},
74+
otelctx.BeatIndexPrefixCtxKey: {"filebeat"},
7275
}),
7376
}
7477
return client.NewContext(ctx, info)
@@ -123,8 +126,9 @@ func TestCreateEvents(t *testing.T) {
123126
ctx := t.Context()
124127
info := client.Info{
125128
Metadata: client.NewMetadata(map[string][]string{
126-
"beat_name": {"filebeat"},
127-
"beat_version": {"9.0.0"},
129+
otelctx.BeatNameCtxKey: {"filebeat"},
130+
otelctx.BeatVersionCtxKey: {"9.0.0"},
131+
otelctx.BeatIndexPrefixCtxKey: {"filebeat"},
128132
}),
129133
}
130134
return client.NewContext(ctx, info)

x-pack/otel/exporter/logstashexporter/internal/event.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818
)
1919

2020
func parseEvent(ctx context.Context, logRecord *plog.LogRecord) (beat.Event, error) {
21-
metadata := otelctx.GetBeatEventMeta(ctx)
21+
metadata := getEventMeta(ctx)
2222
if !isBeatsEvent(metadata) {
2323
return beat.Event{}, consumererror.NewPermanent(errors.New("invalid beats event metadata"))
2424
}
@@ -66,3 +66,14 @@ func isBeatsEvent(metadata map[string]any) bool {
6666
v, ok := metadata["beat"]
6767
return ok && v != nil && v != ""
6868
}
69+
70+
// getEventMeta gives beat.Event.Meta from the context metadata
71+
// The value of `[@metadata][beat]` is taken from the `Index` option of logstash output.
72+
// In Elastic Agent, `Index` option is not available, hence, the value of `[@metadata][beat]` is derived from `IndexPrefix`
73+
func getEventMeta(ctx context.Context) map[string]any {
74+
metadata := otelctx.GetBeatEventMeta(ctx)
75+
return map[string]any{
76+
otelctx.MetadataBeatKey: metadata[otelctx.MetadataIndexPrefixKey],
77+
otelctx.MetadataVersionKey: metadata[otelctx.MetadataVersionKey],
78+
}
79+
}

0 commit comments

Comments
 (0)