Commit 360b9ce

mauri870 authored and MikeGoldsmith committed
[processor/logdedup] Add include_fields option (open-telemetry#37219)
#### Link to tracking issue
Fixes open-telemetry#36965

#### Testing
Added tests that validate the new `include_fields` option, as well as integration tests with ConsumeLogs.

#### Documentation
Added documentation for the new `include_fields` option, as well as a README example.

---------

Co-authored-by: Mike Goldsmith <[email protected]>
1 parent 6f23b2e commit 360b9ce

File tree: 10 files changed, +432 −23 lines changed
Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: logdedupprocessor
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add 'include_fields' option to deduplicate log records via body or attribute fields.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [36965]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]

processor/logdedupprocessor/README.md

Lines changed: 27 additions & 1 deletion
@@ -30,8 +30,9 @@ This processor is used to deduplicate logs by detecting identical logs over a ra
 | interval | duration | `10s` | The interval at which logs are aggregated. The counter will reset after each interval. |
 | conditions | []string | `[]` | A slice of [OTTL] expressions used to evaluate which log records are deduped. All paths in the [log context] are available to reference. All [converters] are available to use. |
 | log_count_attribute | string | `log_count` | The name of the count attribute of deduplicated logs that will be added to the emitted aggregated log. |
+| include_fields | []string | `[]` | Fields to include in duplication matching. Fields can be from the log `body` or `attributes`. Nested fields must be `.` delimited. If a field contains a `.`, it can be escaped with a `\`. This option is **mutually exclusive** with `exclude_fields`. See [example config](#example-config-with-include-fields). |
 | timezone | string | `UTC` | The timezone of the `first_observed_timestamp` and `last_observed_timestamp` timestamps on the emitted aggregated log. The available locations depend on the local IANA Time Zone database. [This page](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones) contains many examples, such as `America/New_York`. |
-| exclude_fields | []string | `[]` | Fields to exclude from duplication matching. Fields can be excluded from the log `body` or `attributes`. These fields will not be present in the emitted aggregated log. Nested fields must be `.` delimited. If a field contains a `.` it can be escaped by using a `\` see [example config](#example-config-with-excluded-fields).<br><br>**Note**: The entire `body` cannot be excluded. If the body is a map then fields within it can be excluded. |
+| exclude_fields | []string | `[]` | Fields to exclude from duplication matching. Fields can be excluded from the log `body` or `attributes`. These fields will not be present in the emitted aggregated log. Nested fields must be `.` delimited. This option is **mutually exclusive** with `include_fields`. If a field contains a `.`, it can be escaped with a `\`; see [example config](#example-config-with-excluded-fields).<br><br>**Note**: The entire `body` cannot be excluded. If the body is a map then fields within it can be excluded. |

 [OTTL]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.109.0/pkg/ottl#readme
 [converters]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.109.0/pkg/ottl/ottlfuncs/README.md#converters
@@ -87,6 +88,31 @@ service:
       exporters: [googlecloud]
 ```

+### Example Config with Include Fields
+This example demonstrates a configuration where deduplication is applied to telemetry based on specified fields. Only logs with the same values for the fields defined in the `include_fields` parameter are deduplicated:
+
+```yaml
+receivers:
+  filelog:
+    include: [./example/*.log]
+processors:
+  logdedup:
+    include_fields:
+      - attributes.id
+      - attributes.name
+    interval: 60s
+    log_count_attribute: dedup_count
+    timezone: 'America/Los_Angeles'
+exporters:
+  googlecloud:
+
+service:
+  pipelines:
+    logs:
+      receivers: [filelog]
+      processors: [logdedup]
+      exporters: [googlecloud]
+```

 ### Example Config with Conditions
 The following config is an example configuration that only performs the deduping process on telemetry where Attribute `ID` equals `1` OR where Resource Attribute `service.name` equals `my-service`:
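
To make the new grouping rule concrete: with the include-fields config above, only the values of `attributes.id` and `attributes.name` feed the dedup key, so records that differ in any other field still collapse into one aggregated log. A minimal, self-contained sketch of that keying behavior, using hypothetical sample records (plain Go, no collector APIs; the real implementation lives in `counter.go` below):

```go
package main

import "fmt"

func main() {
	// Hypothetical log records, reduced to their attributes.
	records := []map[string]string{
		{"id": "1", "name": "svc-a", "message": "disk full"},
		{"id": "1", "name": "svc-a", "message": "disk almost full"}, // same key -> deduped
		{"id": "2", "name": "svc-a", "message": "disk full"},        // different id -> kept separate
	}

	// Only the include_fields values form the dedup key.
	counts := make(map[string]int)
	for _, r := range records {
		key := r["id"] + "|" + r["name"]
		counts[key]++
	}
	fmt.Println(counts) // map[1|svc-a:2 2|svc-a:1]
}
```

Under the config above, the first two records would produce a single aggregated log carrying `dedup_count: 2`, while the third is emitted separately.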

processor/logdedupprocessor/config.go

Lines changed: 43 additions & 1 deletion
@@ -36,6 +36,7 @@ var (
 	errInvalidLogCountAttribute = errors.New("log_count_attribute must be set")
 	errInvalidInterval          = errors.New("interval must be greater than 0")
 	errCannotExcludeBody        = errors.New("cannot exclude the entire body")
+	errCannotIncludeBody        = errors.New("cannot include the entire body")
 )

@@ -44,6 +45,7 @@ type Config struct {
 	Interval      time.Duration `mapstructure:"interval"`
 	Timezone      string        `mapstructure:"timezone"`
 	ExcludeFields []string      `mapstructure:"exclude_fields"`
+	IncludeFields []string      `mapstructure:"include_fields"`
 	Conditions    []string      `mapstructure:"conditions"`
 }

@@ -54,6 +56,7 @@ func createDefaultConfig() component.Config {
 		Interval:      defaultInterval,
 		Timezone:      defaultTimezone,
 		ExcludeFields: []string{},
+		IncludeFields: []string{},
 		Conditions:    []string{},
 	}
 }

@@ -73,7 +76,19 @@ func (c Config) Validate() error {
 		return fmt.Errorf("timezone is invalid: %w", err)
 	}

-	return c.validateExcludeFields()
+	if len(c.ExcludeFields) > 0 && len(c.IncludeFields) > 0 {
+		return errors.New("cannot define both exclude_fields and include_fields")
+	}
+
+	if err = c.validateExcludeFields(); err != nil {
+		return err
+	}
+
+	if err = c.validateIncludeFields(); err != nil {
+		return err
+	}
+
+	return nil
 }

 // validateExcludeFields validates that all the exclude fields
@@ -102,3 +117,30 @@ func (c Config) validateExcludeFields() error {

 	return nil
 }
+
+// validateIncludeFields validates that all the include fields
+func (c Config) validateIncludeFields() error {
+	knownFields := make(map[string]struct{})
+
+	for _, field := range c.IncludeFields {
+		// Special check to make sure the entire body is not included
+		if field == bodyField {
+			return errCannotIncludeBody
+		}
+
+		// Split and ensure the field starts with `body` or `attributes`
+		parts := strings.Split(field, fieldDelimiter)
+		if parts[0] != bodyField && parts[0] != attributeField {
+			return fmt.Errorf("an include_fields must start with %s or %s", bodyField, attributeField)
+		}
+
+		// If a field is valid make sure we haven't already seen it
+		if _, ok := knownFields[field]; ok {
+			return fmt.Errorf("duplicate include_fields %s", field)
+		}
+
+		knownFields[field] = struct{}{}
+	}
+
+	return nil
+}
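
For illustration, here is a minimal sketch of how the new mutual-exclusion check surfaces to a user of the exported `Config` — assuming the standard contrib module path; this snippet is not part of the commit:

```go
package main

import (
	"fmt"
	"time"

	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/logdedupprocessor"
)

func main() {
	cfg := logdedupprocessor.Config{
		LogCountAttribute: "log_count",
		Interval:          10 * time.Second,
		Timezone:          "UTC",
		// Setting both options now fails validation.
		ExcludeFields: []string{"attributes.timestamp"},
		IncludeFields: []string{"attributes.id"},
	}
	fmt.Println(cfg.Validate())
	// cannot define both exclude_fields and include_fields
}
```

Note that the mutual-exclusion check runs before the per-field validation, so a config that sets both options fails fast even if every individual field path is well formed.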

processor/logdedupprocessor/config_test.go

Lines changed: 53 additions & 0 deletions
@@ -84,6 +84,36 @@ func TestValidateConfig(t *testing.T) {
 			},
 			expectedErr: errors.New("duplicate exclude_field"),
 		},
+		{
+			desc: "invalid include_fields using entire body",
+			cfg: &Config{
+				LogCountAttribute: defaultLogCountAttribute,
+				Interval:          defaultInterval,
+				Timezone:          defaultTimezone,
+				IncludeFields:     []string{bodyField},
+			},
+			expectedErr: errors.New("cannot include the entire body"),
+		},
+		{
+			desc: "invalid include_fields not starting with body or attributes",
+			cfg: &Config{
+				LogCountAttribute: defaultLogCountAttribute,
+				Interval:          defaultInterval,
+				Timezone:          defaultTimezone,
+				IncludeFields:     []string{"not.valid"},
+			},
+			expectedErr: errors.New("an include_fields must start with body or attributes"),
+		},
+		{
+			desc: "empty include_fields is the default behavior",
+			cfg: &Config{
+				LogCountAttribute: defaultLogCountAttribute,
+				Interval:          defaultInterval,
+				Timezone:          defaultTimezone,
+				IncludeFields:     []string{},
+			},
+			expectedErr: nil,
+		},
 		{
 			desc: "valid config",
 			cfg: &Config{
@@ -95,6 +125,29 @@ func TestValidateConfig(t *testing.T) {
 			},
 			expectedErr: nil,
 		},
+		{
+			desc: "valid config include_fields",
+			cfg: &Config{
+				LogCountAttribute: defaultLogCountAttribute,
+				Interval:          defaultInterval,
+				Timezone:          defaultTimezone,
+				Conditions:        []string{},
+				IncludeFields:     []string{"body.thing", "attributes.otherthing"},
+			},
+			expectedErr: nil,
+		},
+		{
+			desc: "invalid config defines both exclude_fields and include_fields",
+			cfg: &Config{
+				LogCountAttribute: defaultLogCountAttribute,
+				Interval:          defaultInterval,
+				Timezone:          defaultTimezone,
+				Conditions:        []string{},
+				ExcludeFields:     []string{"body.thing", "attributes.otherthing"},
+				IncludeFields:     []string{"body.thing", "attributes.otherthing"},
+			},
+			expectedErr: errors.New("cannot define both exclude_fields and include_fields"),
+		},
 	}

 	for _, tc := range testCases {
processor/logdedupprocessor/counter.go

Lines changed: 64 additions & 8 deletions
@@ -29,15 +29,17 @@ type logAggregator struct {
 	logCountAttribute string
 	timezone          *time.Location
 	telemetryBuilder  *metadata.TelemetryBuilder
+	dedupFields       []string
 }

 // newLogAggregator creates a new LogCounter.
-func newLogAggregator(logCountAttribute string, timezone *time.Location, telemetryBuilder *metadata.TelemetryBuilder) *logAggregator {
+func newLogAggregator(logCountAttribute string, timezone *time.Location, telemetryBuilder *metadata.TelemetryBuilder, dedupFields []string) *logAggregator {
 	return &logAggregator{
 		resources:         make(map[uint64]*resourceAggregator),
 		logCountAttribute: logCountAttribute,
 		timezone:          timezone,
 		telemetryBuilder:  telemetryBuilder,
+		dedupFields:       dedupFields,
 	}
 }

@@ -83,7 +85,7 @@ func (l *logAggregator) Add(resource pcommon.Resource, scope pcommon.Instrumenta
 	key := getResourceKey(resource)
 	resourceAggregator, ok := l.resources[key]
 	if !ok {
-		resourceAggregator = newResourceAggregator(resource)
+		resourceAggregator = newResourceAggregator(resource, l.dedupFields)
 		l.resources[key] = resourceAggregator
 	}
 	resourceAggregator.Add(scope, logRecord)

@@ -98,13 +100,15 @@ func (l *logAggregator) Reset() {
 type resourceAggregator struct {
 	resource      pcommon.Resource
 	scopeCounters map[uint64]*scopeAggregator
+	dedupFields   []string
 }

 // newResourceAggregator creates a new ResourceCounter.
-func newResourceAggregator(resource pcommon.Resource) *resourceAggregator {
+func newResourceAggregator(resource pcommon.Resource, dedupFields []string) *resourceAggregator {
 	return &resourceAggregator{
 		resource:      resource,
 		scopeCounters: make(map[uint64]*scopeAggregator),
+		dedupFields:   dedupFields,
 	}
 }

@@ -113,7 +117,7 @@ func (r *resourceAggregator) Add(scope pcommon.InstrumentationScope, logRecord p
 	key := getScopeKey(scope)
 	scopeAggregator, ok := r.scopeCounters[key]
 	if !ok {
-		scopeAggregator = newScopeAggregator(scope)
+		scopeAggregator = newScopeAggregator(scope, r.dedupFields)
 		r.scopeCounters[key] = scopeAggregator
 	}
 	scopeAggregator.Add(logRecord)

@@ -123,19 +127,21 @@ func (r *resourceAggregator) Add(scope pcommon.InstrumentationScope, logRecord p
 type scopeAggregator struct {
 	scope       pcommon.InstrumentationScope
 	logCounters map[uint64]*logCounter
+	dedupFields []string
 }

 // newScopeAggregator creates a new ScopeCounter.
-func newScopeAggregator(scope pcommon.InstrumentationScope) *scopeAggregator {
+func newScopeAggregator(scope pcommon.InstrumentationScope, dedupFields []string) *scopeAggregator {
 	return &scopeAggregator{
 		scope:       scope,
 		logCounters: make(map[uint64]*logCounter),
+		dedupFields: dedupFields,
 	}
 }

 // Add increments the counter that the logRecord matches.
 func (s *scopeAggregator) Add(logRecord plog.LogRecord) {
-	key := getLogKey(logRecord)
+	key := getLogKey(logRecord, s.dedupFields)
 	lc, ok := s.logCounters[key]
 	if !ok {
 		lc = newLogCounter(logRecord)

@@ -184,12 +190,62 @@ func getScopeKey(scope pcommon.InstrumentationScope) uint64 {
 	)
 }

-// getLogKey creates a unique hash for the log record to use as a map key
-func getLogKey(logRecord plog.LogRecord) uint64 {
+// getLogKey creates a unique hash for the log record to use as a map key.
+// If dedupFields is non-empty, it is used to determine the fields whose values are hashed.
+func getLogKey(logRecord plog.LogRecord, dedupFields []string) uint64 {
+	if len(dedupFields) > 0 {
+		var opts []pdatautil.HashOption
+
+		for _, field := range dedupFields {
+			parts := splitField(field)
+			var m pcommon.Map
+			switch parts[0] {
+			case bodyField:
+				if logRecord.Body().Type() == pcommon.ValueTypeMap {
+					m = logRecord.Body().Map()
+				}
+			case attributeField:
+				m = logRecord.Attributes()
+			}
+
+			value, ok := getKeyValue(m, parts[1:])
+			if ok {
+				opts = append(opts, pdatautil.WithString(value.AsString()))
+			}
+		}
+
+		if len(opts) > 0 {
+			return pdatautil.Hash64(opts...)
+		}
+	}
+
 	return pdatautil.Hash64(
 		pdatautil.WithMap(logRecord.Attributes()),
 		pdatautil.WithValue(logRecord.Body()),
 		pdatautil.WithString(logRecord.SeverityNumber().String()),
 		pdatautil.WithString(logRecord.SeverityText()),
 	)
 }
+
+func getKeyValue(valueMap pcommon.Map, keyParts []string) (pcommon.Value, bool) {
+	nextKeyPart, remainingParts := keyParts[0], keyParts[1:]
+
+	// Look for the value associated with the next key part.
+	// If we don't find it then return
+	value, ok := valueMap.Get(nextKeyPart)
+	if !ok {
+		return pcommon.NewValueEmpty(), false
+	}
+
+	// No more key parts that means we have found the value
+	if len(remainingParts) == 0 {
+		return valueMap.Get(nextKeyPart)
+	}
+
+	// If the value is a map then recurse through with the remaining parts
+	if value.Type() == pcommon.ValueTypeMap {
+		return getKeyValue(value.Map(), remainingParts)
+	}
+
+	return pcommon.NewValueEmpty(), false
+}
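
`getLogKey` depends on a `splitField` helper that predates this commit and is not shown in the diff. As a rough, self-contained sketch of the full lookup pipeline — assuming `splitField` splits a field path on unescaped `.` and treats `\.` as a literal dot (a hypothetical re-implementation, not the committed helper), with `getKeyValue` condensed from the code above:

```go
package main

import (
	"fmt"
	"strings"

	"go.opentelemetry.io/collector/pdata/pcommon"
)

// splitField sketches the helper referenced by getLogKey: split a field path
// on unescaped "." and treat "\." as a literal dot (hypothetical implementation).
func splitField(field string) []string {
	var parts []string
	var current strings.Builder
	escaped := false
	for _, r := range field {
		switch {
		case escaped:
			current.WriteRune(r) // keep the escaped character literally
			escaped = false
		case r == '\\':
			escaped = true
		case r == '.':
			parts = append(parts, current.String())
			current.Reset()
		default:
			current.WriteRune(r)
		}
	}
	return append(parts, current.String())
}

// getKeyValue is condensed from the recursive lookup added in this commit.
func getKeyValue(valueMap pcommon.Map, keyParts []string) (pcommon.Value, bool) {
	next, rest := keyParts[0], keyParts[1:]
	value, ok := valueMap.Get(next)
	if !ok {
		return pcommon.NewValueEmpty(), false
	}
	if len(rest) == 0 {
		return value, true
	}
	if value.Type() == pcommon.ValueTypeMap {
		return getKeyValue(value.Map(), rest)
	}
	return pcommon.NewValueEmpty(), false
}

func main() {
	attrs := pcommon.NewMap()
	attrs.PutEmptyMap("user").PutStr("login.name", "alice")

	parts := splitField(`attributes.user.login\.name`)
	fmt.Println(parts) // [attributes user login.name]

	// parts[0] ("attributes") selects the map; the remaining parts walk into it.
	if v, ok := getKeyValue(attrs, parts[1:]); ok {
		fmt.Println(v.AsString()) // alice
	}
}
```

If none of the configured fields resolve to a value, `getLogKey` falls back to the original whole-record hash, preserving the previous behavior.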
