Skip to content

Commit 2be002e

Browse files
panys9dehaansa
andauthored
[receiver/cloudflare] Support map fields (#40349)
#### Description Enable the receiver to consume fields from Cloudflare containing a map Also added a configuration option for setting a separator where `.` is default #### Link to tracking issue Fixes #40318 #### Testing Unit tests added --------- Signed-off-by: Patrik Nyström <[email protected]> Co-authored-by: Sam DeHaan <[email protected]>
1 parent 8feda79 commit 2be002e

File tree

7 files changed

+246
-4
lines changed

7 files changed

+246
-4
lines changed

.chloggen/feat-cloudflare-map.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: "enhancement"
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: "receiver/cloudflare"
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Enable the receiver to consume fields from Cloudflare containing a map
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [40318]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/cloudflarereceiver/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ If the receiver will be handling TLS termination:
5050
- `attributes`
5151
- This parameter allows the receiver to be configured to set log record attributes based on fields found in the log message. The fields are not removed from the log message when set in this way. Only string, boolean, integer or float fields can be mapped using this parameter.
5252
- When the `attributes` configuration is empty, the receiver will automatically ingest all fields from the log messages as attributes, using the original field names as attribute names.
53+
- `separator` (default: `.`)
54+
- The separator used to join nested fields in the log message when setting attributes. For example, if the log message contains a field `"RequestHeaders": { "Content-Type": "application/json" }`, and the `separator` is set to `.`, the attribute will be set as `RequestHeaders.Content_Type`. If the separator is set to `_`, it will be set as `RequestHeaders_Content_Type`.
5355

5456

5557
### Example:

receiver/cloudflarereceiver/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type LogsConfig struct {
2626
TLS *configtls.ServerConfig `mapstructure:"tls"`
2727
Attributes map[string]string `mapstructure:"attributes"`
2828
TimestampField string `mapstructure:"timestamp_field"`
29+
Separator string `mapstructure:"separator"`
2930

3031
// prevent unkeyed literal initialization
3132
_ struct{}
@@ -37,6 +38,7 @@ var (
3738
errNoKey = errors.New("tls was configured, but no key file was specified")
3839

3940
defaultTimestampField = "EdgeStartTimestamp"
41+
defaultSeparator = "."
4042
)
4143

4244
func (c *Config) Validate() error {

receiver/cloudflarereceiver/config_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ func TestLoadConfig(t *testing.T) {
123123
},
124124
Secret: "1234567890abcdef1234567890abcdef",
125125
TimestampField: "EdgeStartTimestamp",
126+
Separator: ".",
126127
Attributes: map[string]string{
127128
"ClientIP": "http_request.client_ip",
128129
"ClientRequestURI": "http_request.uri",

receiver/cloudflarereceiver/factory.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ func createDefaultConfig() component.Config {
3636
return &Config{
3737
Logs: LogsConfig{
3838
TimestampField: defaultTimestampField,
39+
Separator: defaultSeparator,
3940
},
4041
}
4142
}

receiver/cloudflarereceiver/logs.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"net"
1515
"net/http"
1616
"strconv"
17+
"strings"
1718
"sync"
1819
"time"
1920

@@ -310,6 +311,29 @@ func (l *logsReceiver) processLogs(now pcommon.Timestamp, logs []map[string]any)
310311
attrs.PutDouble(attrName, v)
311312
case bool:
312313
attrs.PutBool(attrName, v)
314+
case map[string]any:
315+
// Flatten the map and add each field with a prefixed key
316+
flattened := make(map[string]any)
317+
flattenMap(v, attrName+l.cfg.Separator, l.cfg.Separator, flattened)
318+
for k, val := range flattened {
319+
switch v := val.(type) {
320+
case string:
321+
attrs.PutStr(k, v)
322+
case int:
323+
attrs.PutInt(k, int64(v))
324+
case int64:
325+
attrs.PutInt(k, v)
326+
case float64:
327+
attrs.PutDouble(k, v)
328+
case bool:
329+
attrs.PutBool(k, v)
330+
default:
331+
l.logger.Warn("unable to translate flattened field to attribute, unsupported type",
332+
zap.String("field", k),
333+
zap.Any("value", v),
334+
zap.String("type", fmt.Sprintf("%T", v)))
335+
}
336+
}
313337
default:
314338
l.logger.Warn("unable to translate field to attribute, unsupported type",
315339
zap.String("field", field),
@@ -343,3 +367,20 @@ func severityFromStatusCode(statusCode int64) plog.SeverityNumber {
343367
return plog.SeverityNumberUnspecified
344368
}
345369
}
370+
371+
// flattenMap recursively flattens a map[string]any into a single level map
372+
// with keys joined by the specified separator
373+
func flattenMap(input map[string]any, prefix string, separator string, result map[string]any) {
374+
for k, v := range input {
375+
// Replace hyphens with underscores in the key. Content-Type becomes Content_Type
376+
k = strings.ReplaceAll(k, "-", "_")
377+
newKey := prefix + k
378+
switch val := v.(type) {
379+
case map[string]any:
380+
// Recursively flatten nested maps
381+
flattenMap(val, newKey+separator, separator, result)
382+
default:
383+
result[newKey] = v
384+
}
385+
}
386+
}

receiver/cloudflarereceiver/logs_test.go

Lines changed: 172 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -363,14 +363,14 @@ func TestEmptyAttributes(t *testing.T) {
363363
}{
364364
{
365365
name: "Empty Attributes Map",
366-
payload: `{ "ClientIP": "89.163.253.200", "ClientRequestHost": "www.theburritobot0.com", "ClientRequestMethod": "GET", "ClientRequestURI": "/static/img/testimonial-hipster.png", "EdgeEndTimestamp": "2023-03-03T05:30:05Z", "EdgeResponseBytes": "69045", "EdgeResponseStatus": "200", "EdgeStartTimestamp": "2023-03-03T05:29:05Z", "RayID": "3a6050bcbe121a87" }
367-
{ "ClientIP" : "89.163.253.201", "ClientRequestHost": "www.theburritobot1.com", "ClientRequestMethod": "GET", "ClientRequestURI": "/static/img/testimonial-hipster.png", "EdgeEndTimestamp": "2023-03-03T05:30:05Z", "EdgeResponseBytes": "69045", "EdgeResponseStatus": "200", "EdgeStartTimestamp": "2023-03-03T05:29:05Z", "RayID": "3a6050bcbe121a87" }`,
366+
payload: `{ "ClientIP": "89.163.253.200", "ClientRequestHost": "www.theburritobot0.com", "ClientRequestMethod": "GET", "ClientRequestURI": "/static/img/testimonial-hipster.png", "EdgeEndTimestamp": "2023-03-03T05:30:05Z", "EdgeResponseBytes": "69045", "EdgeResponseStatus": "200", "EdgeStartTimestamp": "2023-03-03T05:29:05Z", "RayID": "3a6050bcbe121a87", "RequestHeaders": { "Content-Type": "application/json" } }
367+
{ "ClientIP" : "89.163.253.201", "ClientRequestHost": "www.theburritobot1.com", "ClientRequestMethod": "GET", "ClientRequestURI": "/static/img/testimonial-hipster.png", "EdgeEndTimestamp": "2023-03-03T05:30:05Z", "EdgeResponseBytes": "69045", "EdgeResponseStatus": "200", "EdgeStartTimestamp": "2023-03-03T05:29:05Z", "RayID": "3a6050bcbe121a87", "RequestHeaders": { "Content-Type": "application/json" } }`,
368368
attributes: map[string]string{},
369369
},
370370
{
371371
name: "nil Attributes",
372-
payload: `{ "ClientIP": "89.163.253.200", "ClientRequestHost": "www.theburritobot0.com", "ClientRequestMethod": "GET", "ClientRequestURI": "/static/img/testimonial-hipster.png", "EdgeEndTimestamp": "2023-03-03T05:30:05Z", "EdgeResponseBytes": "69045", "EdgeResponseStatus": "200", "EdgeStartTimestamp": "2023-03-03T05:29:05Z", "RayID": "3a6050bcbe121a87" }
373-
{ "ClientIP" : "89.163.253.201", "ClientRequestHost": "www.theburritobot1.com", "ClientRequestMethod": "GET", "ClientRequestURI": "/static/img/testimonial-hipster.png", "EdgeEndTimestamp": "2023-03-03T05:30:05Z", "EdgeResponseBytes": "69045", "EdgeResponseStatus": "200", "EdgeStartTimestamp": "2023-03-03T05:29:05Z", "RayID": "3a6050bcbe121a87" }`,
372+
payload: `{ "ClientIP": "89.163.253.200", "ClientRequestHost": "www.theburritobot0.com", "ClientRequestMethod": "GET", "ClientRequestURI": "/static/img/testimonial-hipster.png", "EdgeEndTimestamp": "2023-03-03T05:30:05Z", "EdgeResponseBytes": "69045", "EdgeResponseStatus": "200", "EdgeStartTimestamp": "2023-03-03T05:29:05Z", "RayID": "3a6050bcbe121a87", "RequestHeaders": { "Content-Type": "application/json" } }
373+
{ "ClientIP" : "89.163.253.201", "ClientRequestHost": "www.theburritobot1.com", "ClientRequestMethod": "GET", "ClientRequestURI": "/static/img/testimonial-hipster.png", "EdgeEndTimestamp": "2023-03-03T05:30:05Z", "EdgeResponseBytes": "69045", "EdgeResponseStatus": "200", "EdgeStartTimestamp": "2023-03-03T05:29:05Z", "RayID": "3a6050bcbe121a87", "RequestHeaders": { "Content-Type": "application/json" } }`,
374374
attributes: nil,
375375
},
376376
}
@@ -381,6 +381,94 @@ func TestEmptyAttributes(t *testing.T) {
381381
sl := rl.ScopeLogs().AppendEmpty()
382382
sl.Scope().SetName("github.com/open-telemetry/opentelemetry-collector-contrib/receiver/cloudflarereceiver")
383383

384+
for idx, line := range strings.Split(payload, "\n") {
385+
lr := sl.LogRecords().AppendEmpty()
386+
387+
require.NoError(t, lr.Attributes().FromRaw(map[string]any{
388+
"ClientIP": fmt.Sprintf("89.163.253.%d", 200+idx),
389+
"ClientRequestHost": fmt.Sprintf("www.theburritobot%d.com", idx),
390+
"ClientRequestMethod": "GET",
391+
"ClientRequestURI": "/static/img/testimonial-hipster.png",
392+
"EdgeEndTimestamp": "2023-03-03T05:30:05Z",
393+
"EdgeResponseBytes": "69045",
394+
"EdgeResponseStatus": "200",
395+
"EdgeStartTimestamp": "2023-03-03T05:29:05Z",
396+
"RayID": "3a6050bcbe121a87",
397+
"RequestHeaders.Content_Type": "application/json",
398+
}))
399+
400+
lr.SetObservedTimestamp(pcommon.NewTimestampFromTime(now))
401+
ts, err := time.Parse(time.RFC3339, "2023-03-03T05:29:05Z")
402+
require.NoError(t, err)
403+
lr.SetTimestamp(pcommon.NewTimestampFromTime(ts))
404+
lr.SetSeverityNumber(plog.SeverityNumberInfo)
405+
lr.SetSeverityText(plog.SeverityNumberInfo.String())
406+
407+
var log map[string]any
408+
err = json.Unmarshal([]byte(line), &log)
409+
require.NoError(t, err)
410+
411+
payloadToExpectedBody(t, line, lr)
412+
}
413+
return logs
414+
}
415+
416+
for _, tc := range testCases {
417+
t.Run(tc.name, func(t *testing.T) {
418+
recv := newReceiver(t, &Config{
419+
Logs: LogsConfig{
420+
Endpoint: "localhost:0",
421+
TLS: &configtls.ServerConfig{},
422+
TimestampField: "EdgeStartTimestamp",
423+
Attributes: tc.attributes,
424+
Separator: ".",
425+
},
426+
},
427+
&consumertest.LogsSink{},
428+
)
429+
var logs plog.Logs
430+
rawLogs, err := parsePayload([]byte(tc.payload))
431+
if err == nil {
432+
logs = recv.processLogs(pcommon.NewTimestampFromTime(time.Now()), rawLogs)
433+
}
434+
require.NoError(t, err)
435+
require.NotNil(t, logs)
436+
require.NoError(t, plogtest.CompareLogs(expectedLogs(t, tc.payload), logs, plogtest.IgnoreObservedTimestamp()))
437+
})
438+
}
439+
}
440+
441+
func TestAttributesWithSeparator(t *testing.T) {
442+
now := time.Time{}
443+
444+
testCases := []struct {
445+
name string
446+
payload string
447+
attributes map[string]string
448+
separator string
449+
}{
450+
{
451+
name: "Empty Attributes Map",
452+
payload: `{ "ClientIP": "89.163.253.200", "ClientRequestHost": "www.theburritobot0.com", "ClientRequestMethod": "GET", "ClientRequestURI": "/static/img/testimonial-hipster.png", "EdgeEndTimestamp": "2023-03-03T05:30:05Z", "EdgeResponseBytes": "69045", "EdgeResponseStatus": "200", "EdgeStartTimestamp": "2023-03-03T05:29:05Z", "RayID": "3a6050bcbe121a87", "RequestHeaders": { "Content-Type": "application/json" } }
453+
{ "ClientIP" : "89.163.253.201", "ClientRequestHost": "www.theburritobot1.com", "ClientRequestMethod": "GET", "ClientRequestURI": "/static/img/testimonial-hipster.png", "EdgeEndTimestamp": "2023-03-03T05:30:05Z", "EdgeResponseBytes": "69045", "EdgeResponseStatus": "200", "EdgeStartTimestamp": "2023-03-03T05:29:05Z", "RayID": "3a6050bcbe121a87", "RequestHeaders": { "Content-Type": "application/json" } }`,
454+
attributes: map[string]string{},
455+
separator: ".",
456+
},
457+
{
458+
name: "nil Attributes",
459+
payload: `{ "ClientIP": "89.163.253.200", "ClientRequestHost": "www.theburritobot0.com", "ClientRequestMethod": "GET", "ClientRequestURI": "/static/img/testimonial-hipster.png", "EdgeEndTimestamp": "2023-03-03T05:30:05Z", "EdgeResponseBytes": "69045", "EdgeResponseStatus": "200", "EdgeStartTimestamp": "2023-03-03T05:29:05Z", "RayID": "3a6050bcbe121a87", "RequestHeaders": { "Content-Type": "application/json" } }
460+
{ "ClientIP" : "89.163.253.201", "ClientRequestHost": "www.theburritobot1.com", "ClientRequestMethod": "GET", "ClientRequestURI": "/static/img/testimonial-hipster.png", "EdgeEndTimestamp": "2023-03-03T05:30:05Z", "EdgeResponseBytes": "69045", "EdgeResponseStatus": "200", "EdgeStartTimestamp": "2023-03-03T05:29:05Z", "RayID": "3a6050bcbe121a87", "RequestHeaders": { "Content-Type": "application/json" } }`,
461+
attributes: nil,
462+
separator: "_test_",
463+
},
464+
}
465+
466+
expectedLogs := func(t *testing.T, payload string, separator string) plog.Logs {
467+
logs := plog.NewLogs()
468+
rl := logs.ResourceLogs().AppendEmpty()
469+
sl := rl.ScopeLogs().AppendEmpty()
470+
sl.Scope().SetName("github.com/open-telemetry/opentelemetry-collector-contrib/receiver/cloudflarereceiver")
471+
384472
for idx, line := range strings.Split(payload, "\n") {
385473
lr := sl.LogRecords().AppendEmpty()
386474

@@ -394,6 +482,7 @@ func TestEmptyAttributes(t *testing.T) {
394482
"EdgeResponseStatus": "200",
395483
"EdgeStartTimestamp": "2023-03-03T05:29:05Z",
396484
"RayID": "3a6050bcbe121a87",
485+
fmt.Sprintf("RequestHeaders%sContent_Type", separator): "application/json",
397486
}))
398487

399488
lr.SetObservedTimestamp(pcommon.NewTimestampFromTime(now))
@@ -409,7 +498,85 @@ func TestEmptyAttributes(t *testing.T) {
409498

410499
payloadToExpectedBody(t, line, lr)
411500
}
501+
return logs
502+
}
412503

504+
for _, tc := range testCases {
505+
t.Run(tc.name, func(t *testing.T) {
506+
recv := newReceiver(t, &Config{
507+
Logs: LogsConfig{
508+
Endpoint: "localhost:0",
509+
TLS: &configtls.ServerConfig{},
510+
TimestampField: "EdgeStartTimestamp",
511+
Attributes: tc.attributes,
512+
Separator: tc.separator,
513+
},
514+
},
515+
&consumertest.LogsSink{},
516+
)
517+
var logs plog.Logs
518+
rawLogs, err := parsePayload([]byte(tc.payload))
519+
if err == nil {
520+
logs = recv.processLogs(pcommon.NewTimestampFromTime(time.Now()), rawLogs)
521+
}
522+
require.NoError(t, err)
523+
require.NotNil(t, logs)
524+
require.NoError(t, plogtest.CompareLogs(expectedLogs(t, tc.payload, tc.separator), logs, plogtest.IgnoreObservedTimestamp()))
525+
})
526+
}
527+
}
528+
529+
func TestMultipleMapAttributes(t *testing.T) {
530+
now := time.Time{}
531+
532+
testCases := []struct {
533+
name string
534+
payload string
535+
attributes map[string]string
536+
}{
537+
{
538+
name: "Multi Map Attributes",
539+
payload: `{ "ClientIP": "89.163.253.200", "ClientRequestHost": "www.theburritobot0.com", "ClientRequestMethod": "GET", "ClientRequestURI": "/static/img/testimonial-hipster.png", "EdgeEndTimestamp": "2023-03-03T05:30:05Z", "EdgeResponseBytes": "69045", "EdgeResponseStatus": "200", "EdgeStartTimestamp": "2023-03-03T05:29:05Z", "RayID": "3a6050bcbe121a87", "RequestHeaders": { "Cookie-new": { "x-m2-new": "sensitive" } } }
540+
{ "ClientIP" : "89.163.253.201", "ClientRequestHost": "www.theburritobot1.com", "ClientRequestMethod": "GET", "ClientRequestURI": "/static/img/testimonial-hipster.png", "EdgeEndTimestamp": "2023-03-03T05:30:05Z", "EdgeResponseBytes": "69045", "EdgeResponseStatus": "200", "EdgeStartTimestamp": "2023-03-03T05:29:05Z", "RayID": "3a6050bcbe121a87", "RequestHeaders": { "Cookie-new": { "x-m2-new": "sensitive" } } }`,
541+
attributes: map[string]string{},
542+
},
543+
}
544+
545+
expectedLogs := func(t *testing.T, payload string) plog.Logs {
546+
logs := plog.NewLogs()
547+
rl := logs.ResourceLogs().AppendEmpty()
548+
sl := rl.ScopeLogs().AppendEmpty()
549+
sl.Scope().SetName("github.com/open-telemetry/opentelemetry-collector-contrib/receiver/cloudflarereceiver")
550+
551+
for idx, line := range strings.Split(payload, "\n") {
552+
lr := sl.LogRecords().AppendEmpty()
553+
554+
require.NoError(t, lr.Attributes().FromRaw(map[string]any{
555+
"ClientIP": fmt.Sprintf("89.163.253.%d", 200+idx),
556+
"ClientRequestHost": fmt.Sprintf("www.theburritobot%d.com", idx),
557+
"ClientRequestMethod": "GET",
558+
"ClientRequestURI": "/static/img/testimonial-hipster.png",
559+
"EdgeEndTimestamp": "2023-03-03T05:30:05Z",
560+
"EdgeResponseBytes": "69045",
561+
"EdgeResponseStatus": "200",
562+
"EdgeStartTimestamp": "2023-03-03T05:29:05Z",
563+
"RayID": "3a6050bcbe121a87",
564+
"RequestHeaders.Cookie_new.x_m2_new": "sensitive",
565+
}))
566+
567+
lr.SetObservedTimestamp(pcommon.NewTimestampFromTime(now))
568+
ts, err := time.Parse(time.RFC3339, "2023-03-03T05:29:05Z")
569+
require.NoError(t, err)
570+
lr.SetTimestamp(pcommon.NewTimestampFromTime(ts))
571+
lr.SetSeverityNumber(plog.SeverityNumberInfo)
572+
lr.SetSeverityText(plog.SeverityNumberInfo.String())
573+
574+
var log map[string]any
575+
err = json.Unmarshal([]byte(line), &log)
576+
require.NoError(t, err)
577+
578+
payloadToExpectedBody(t, line, lr)
579+
}
413580
return logs
414581
}
415582

@@ -421,6 +588,7 @@ func TestEmptyAttributes(t *testing.T) {
421588
TLS: &configtls.ServerConfig{},
422589
TimestampField: "EdgeStartTimestamp",
423590
Attributes: tc.attributes,
591+
Separator: ".",
424592
},
425593
},
426594
&consumertest.LogsSink{},

0 commit comments

Comments
 (0)