Skip to content

Commit 839e464

Browse files
[pkg/ottl] enhance flatten editor to resolve attribute key conflicts (#37006)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Introduce an option into `flatten()` editor to resolve conflicts in map keys and avoid data loss. The original behavior is perserved <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes #35793 --------- Signed-off-by: odubajDT <[email protected]> Co-authored-by: Evan Bradley <[email protected]>
1 parent fdb0a20 commit 839e464

File tree

5 files changed

+502
-27
lines changed

5 files changed

+502
-27
lines changed

.chloggen/flatten-conflict.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: pkg/ottl
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "Enhance flatten() editor to resolve attribute key conflicts by adding a number suffix to the conflicting keys."
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [35793]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

pkg/ottl/e2e/e2e_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,16 +61,21 @@ func Test_e2e_editors(t *testing.T) {
6161
tCtx.GetLogRecord().Attributes().Remove("total.string")
6262
tCtx.GetLogRecord().Attributes().Remove("foo")
6363
tCtx.GetLogRecord().Attributes().Remove("things")
64+
tCtx.GetLogRecord().Attributes().Remove("conflict.conflict1")
65+
tCtx.GetLogRecord().Attributes().Remove("conflict")
6466
},
6567
},
6668
{
6769
statement: `flatten(attributes)`,
6870
want: func(tCtx ottllog.TransformContext) {
6971
tCtx.GetLogRecord().Attributes().Remove("foo")
72+
tCtx.GetLogRecord().Attributes().Remove("conflict.conflict1")
73+
tCtx.GetLogRecord().Attributes().Remove("conflict")
7074
tCtx.GetLogRecord().Attributes().PutStr("foo.bar", "pass")
7175
tCtx.GetLogRecord().Attributes().PutStr("foo.flags", "pass")
7276
tCtx.GetLogRecord().Attributes().PutStr("foo.slice.0", "val")
7377
tCtx.GetLogRecord().Attributes().PutStr("foo.nested.test", "pass")
78+
tCtx.GetLogRecord().Attributes().PutStr("conflict.conflict1.conflict2", "nopass")
7479

7580
tCtx.GetLogRecord().Attributes().Remove("things")
7681
tCtx.GetLogRecord().Attributes().PutStr("things.0.name", "foo")
@@ -95,6 +100,7 @@ func Test_e2e_editors(t *testing.T) {
95100
m.PutStr("test.foo.flags", "pass")
96101
m.PutStr("test.foo.slice.0", "val")
97102
m.PutStr("test.foo.nested.test", "pass")
103+
m.PutStr("test.conflict.conflict1.conflict2", "nopass")
98104

99105
m.PutStr("test.things.0.name", "foo")
100106
m.PutInt("test.things.0.value", 2)
@@ -103,6 +109,34 @@ func Test_e2e_editors(t *testing.T) {
103109
m.CopyTo(tCtx.GetLogRecord().Attributes())
104110
},
105111
},
112+
{
113+
statement: `flatten(attributes, "test", resolveConflicts=true)`,
114+
want: func(tCtx ottllog.TransformContext) {
115+
m := pcommon.NewMap()
116+
m.PutStr("test.http.method", "get")
117+
m.PutStr("test.http.path", "/health")
118+
m.PutStr("test.http.url", "http://localhost/health")
119+
m.PutStr("test.flags", "A|B|C")
120+
m.PutStr("test.total.string", "123456789")
121+
m.PutStr("test.foo.bar", "pass")
122+
m.PutStr("test.foo.flags", "pass")
123+
m.PutStr("test.foo.bar", "pass")
124+
m.PutStr("test.foo.flags", "pass")
125+
m.PutStr("test.foo.slice", "val")
126+
m.PutStr("test.foo.nested.test", "pass")
127+
128+
m.PutStr("test.conflict.conflict1.conflict2", "pass")
129+
m.PutStr("test.conflict.conflict1.conflict2.0", "nopass")
130+
131+
m.PutStr("test.things.0.name", "foo")
132+
m.PutInt("test.things.0.value", 2)
133+
134+
m.PutStr("test.things.1.name", "bar")
135+
m.PutInt("test.things.1.value", 5)
136+
137+
m.CopyTo(tCtx.GetLogRecord().Attributes())
138+
},
139+
},
106140
{
107141
statement: `flatten(attributes, depth=1)`,
108142
want: func(tCtx ottllog.TransformContext) {
@@ -117,6 +151,9 @@ func Test_e2e_editors(t *testing.T) {
117151
m.PutStr("foo.bar", "pass")
118152
m.PutStr("foo.flags", "pass")
119153
m.PutEmptySlice("foo.slice").AppendEmpty().SetStr("val")
154+
m.PutStr("conflict.conflict1.conflict2", "nopass")
155+
mm := m.PutEmptyMap("conflict.conflict1")
156+
mm.PutStr("conflict2", "pass")
120157

121158
m1 := m.PutEmptyMap("things.0")
122159
m1.PutStr("name", "foo")
@@ -139,6 +176,8 @@ func Test_e2e_editors(t *testing.T) {
139176
tCtx.GetLogRecord().Attributes().Remove("http.url")
140177
tCtx.GetLogRecord().Attributes().Remove("foo")
141178
tCtx.GetLogRecord().Attributes().Remove("things")
179+
tCtx.GetLogRecord().Attributes().Remove("conflict.conflict1")
180+
tCtx.GetLogRecord().Attributes().Remove("conflict")
142181
},
143182
},
144183
{
@@ -154,6 +193,8 @@ func Test_e2e_editors(t *testing.T) {
154193
tCtx.GetLogRecord().Attributes().Remove("flags")
155194
tCtx.GetLogRecord().Attributes().Remove("foo")
156195
tCtx.GetLogRecord().Attributes().Remove("things")
196+
tCtx.GetLogRecord().Attributes().Remove("conflict.conflict1")
197+
tCtx.GetLogRecord().Attributes().Remove("conflict")
157198
},
158199
},
159200
{
@@ -1459,6 +1500,11 @@ func constructLogTransformContextEditors() ottllog.TransformContext {
14591500
logRecord.Attributes().PutStr("http.url", "http://localhost/health")
14601501
logRecord.Attributes().PutStr("flags", "A|B|C")
14611502
logRecord.Attributes().PutStr("total.string", "123456789")
1503+
mm := logRecord.Attributes().PutEmptyMap("conflict")
1504+
mm1 := mm.PutEmptyMap("conflict1")
1505+
mm1.PutStr("conflict2", "pass")
1506+
mmm := logRecord.Attributes().PutEmptyMap("conflict.conflict1")
1507+
mmm.PutStr("conflict2", "nopass")
14621508
m := logRecord.Attributes().PutEmptyMap("foo")
14631509
m.PutStr("bar", "pass")
14641510
m.PutStr("flags", "pass")

pkg/ottl/ottlfuncs/README.md

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,12 @@ Examples:
126126

127127
### flatten
128128

129-
`flatten(target, Optional[prefix], Optional[depth])`
129+
`flatten(target, Optional[prefix], Optional[depth], Optional[resolveConflicts])`
130130

131131
The `flatten` function flattens a `pcommon.Map` by moving items from nested maps to the root.
132132

133-
`target` is a path expression to a `pcommon.Map` type field. `prefix` is an optional string. `depth` is an optional non-negative int.
133+
`target` is a path expression to a `pcommon.Map` type field. `prefix` is an optional string. `depth` is an optional non-negative int, `resolveConflicts` resolves the potential conflicts in the map keys by adding a number suffix starting with `0` from the first duplicated key.
134+
134135

135136
For example, the following map
136137

@@ -199,6 +200,46 @@ the result would be
199200

200201
A `depth` of `0` means that no flattening will occur.
201202

203+
If `resolveConflicts` is set to `true`, conflicts within the map will be resolved
204+
205+
```json
206+
{
207+
"address": {
208+
"street": {
209+
"number": "first",
210+
},
211+
"house": "1234",
212+
},
213+
"address.street": {
214+
"number": ["second", "third"],
215+
},
216+
"address.street.number": "fourth",
217+
"occupants": [
218+
"user 1",
219+
"user 2",
220+
],
221+
}
222+
```
223+
224+
the result would be
225+
226+
```json
227+
{
228+
"address.street.number": "first",
229+
"address.house": "1234",
230+
"address.street.number.0": "second",
231+
"address.street.number.1": "third",
232+
"occupants": "user 1",
233+
"occupants.0": "user 2",
234+
"address.street.number.2": "fourth",
235+
}
236+
237+
```
238+
239+
**Note:**
240+
Please note that when the `resolveConflicts` parameter is set to `true`, the flattening of arrays is managed differently.
241+
With conflict resolution enabled, arrays and any potentially conflicting keys are handled in a standardized manner. Specifically, a `.<number>` suffix is added to the first conflicting key, with the `number` incrementing for each additional conflict.
242+
202243
Examples:
203244

204245
- `flatten(resource.attributes)`
@@ -210,6 +251,9 @@ Examples:
210251
- `flatten(log.body, depth=2)`
211252

212253

254+
- `flatten(body, resolveConflicts=true)`
255+
256+
213257
### keep_keys
214258

215259
`keep_keys(target, keys[])`

pkg/ottl/ottlfuncs/func_flatten.go

Lines changed: 66 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,25 @@ import (
77
"context"
88
"fmt"
99
"math"
10+
"strconv"
1011

1112
"go.opentelemetry.io/collector/pdata/pcommon"
1213

1314
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
1415
)
1516

1617
type FlattenArguments[K any] struct {
17-
Target ottl.PMapGetter[K]
18-
Prefix ottl.Optional[string]
19-
Depth ottl.Optional[int64]
18+
Target ottl.PMapGetter[K]
19+
Prefix ottl.Optional[string]
20+
Depth ottl.Optional[int64]
21+
ResolveConflicts ottl.Optional[bool]
22+
}
23+
24+
type flattenData struct {
25+
result pcommon.Map
26+
existingKeys map[string]int
27+
resolveConflict bool
28+
maxDepth int64
2029
}
2130

2231
func NewFlattenFactory[K any]() ottl.Factory[K] {
@@ -30,10 +39,10 @@ func createFlattenFunction[K any](_ ottl.FunctionContext, oArgs ottl.Arguments)
3039
return nil, fmt.Errorf("FlattenFactory args must be of type *FlattenArguments[K]")
3140
}
3241

33-
return flatten(args.Target, args.Prefix, args.Depth)
42+
return flatten(args.Target, args.Prefix, args.Depth, args.ResolveConflicts)
3443
}
3544

36-
func flatten[K any](target ottl.PMapGetter[K], p ottl.Optional[string], d ottl.Optional[int64]) (ottl.ExprFunc[K], error) {
45+
func flatten[K any](target ottl.PMapGetter[K], p ottl.Optional[string], d ottl.Optional[int64], c ottl.Optional[bool]) (ottl.ExprFunc[K], error) {
3746
depth := int64(math.MaxInt64)
3847
if !d.IsEmpty() {
3948
depth = d.Get()
@@ -47,52 +56,87 @@ func flatten[K any](target ottl.PMapGetter[K], p ottl.Optional[string], d ottl.O
4756
prefix = p.Get()
4857
}
4958

59+
resolveConflict := false
60+
if !c.IsEmpty() {
61+
resolveConflict = c.Get()
62+
}
63+
5064
return func(ctx context.Context, tCtx K) (any, error) {
5165
m, err := target.Get(ctx, tCtx)
5266
if err != nil {
5367
return nil, err
5468
}
5569

56-
result := pcommon.NewMap()
57-
flattenMap(m, result, prefix, 0, depth)
58-
result.MoveTo(m)
70+
flattenData := initFlattenData(resolveConflict, depth)
71+
flattenData.flattenMap(m, prefix, 0)
72+
flattenData.result.MoveTo(m)
5973

6074
return nil, nil
6175
}, nil
6276
}
6377

64-
func flattenMap(m pcommon.Map, result pcommon.Map, prefix string, currentDepth, maxDepth int64) {
78+
func initFlattenData(resolveConflict bool, maxDepth int64) *flattenData {
79+
return &flattenData{
80+
result: pcommon.NewMap(),
81+
existingKeys: map[string]int{},
82+
resolveConflict: resolveConflict,
83+
maxDepth: maxDepth,
84+
}
85+
}
86+
87+
func (f *flattenData) flattenMap(m pcommon.Map, prefix string, currentDepth int64) {
6588
if len(prefix) > 0 {
6689
prefix += "."
6790
}
6891
m.Range(func(k string, v pcommon.Value) bool {
69-
return flattenValue(k, v, currentDepth, maxDepth, result, prefix)
92+
return f.flattenValue(k, v, currentDepth, prefix)
7093
})
7194
}
7295

73-
func flattenSlice(s pcommon.Slice, result pcommon.Map, prefix string, currentDepth int64, maxDepth int64) {
96+
func (f *flattenData) flattenSlice(s pcommon.Slice, prefix string, currentDepth int64) {
7497
for i := 0; i < s.Len(); i++ {
75-
flattenValue(fmt.Sprintf("%d", i), s.At(i), currentDepth+1, maxDepth, result, prefix)
98+
f.flattenValue(fmt.Sprintf("%d", i), s.At(i), currentDepth+1, prefix)
7699
}
77100
}
78101

79-
func flattenValue(k string, v pcommon.Value, currentDepth int64, maxDepth int64, result pcommon.Map, prefix string) bool {
102+
func (f *flattenData) flattenValue(k string, v pcommon.Value, currentDepth int64, prefix string) bool {
80103
switch {
81-
case v.Type() == pcommon.ValueTypeMap && currentDepth < maxDepth:
82-
flattenMap(v.Map(), result, prefix+k, currentDepth+1, maxDepth)
83-
case v.Type() == pcommon.ValueTypeSlice && currentDepth < maxDepth:
104+
case v.Type() == pcommon.ValueTypeMap && currentDepth < f.maxDepth:
105+
f.flattenMap(v.Map(), prefix+k, currentDepth+1)
106+
case v.Type() == pcommon.ValueTypeSlice && currentDepth < f.maxDepth:
84107
for i := 0; i < v.Slice().Len(); i++ {
85108
switch {
86-
case v.Slice().At(i).Type() == pcommon.ValueTypeMap && currentDepth+1 < maxDepth:
87-
flattenMap(v.Slice().At(i).Map(), result, fmt.Sprintf("%v.%v", prefix+k, i), currentDepth+2, maxDepth)
88-
case v.Slice().At(i).Type() == pcommon.ValueTypeSlice && currentDepth+1 < maxDepth:
89-
flattenSlice(v.Slice().At(i).Slice(), result, fmt.Sprintf("%v.%v", prefix+k, i), currentDepth+2, maxDepth)
109+
case v.Slice().At(i).Type() == pcommon.ValueTypeMap && currentDepth+1 < f.maxDepth:
110+
f.flattenMap(v.Slice().At(i).Map(), fmt.Sprintf("%v.%v", prefix+k, i), currentDepth+2)
111+
case v.Slice().At(i).Type() == pcommon.ValueTypeSlice && currentDepth+1 < f.maxDepth:
112+
f.flattenSlice(v.Slice().At(i).Slice(), fmt.Sprintf("%v.%v", prefix+k, i), currentDepth+2)
90113
default:
91-
v.Slice().At(i).CopyTo(result.PutEmpty(fmt.Sprintf("%v.%v", prefix+k, i)))
114+
key := prefix + k
115+
if f.resolveConflict {
116+
f.handleConflict(key, v.Slice().At(i))
117+
} else {
118+
v.Slice().At(i).CopyTo(f.result.PutEmpty(fmt.Sprintf("%v.%v", key, i)))
119+
}
92120
}
93121
}
94122
default:
95-
v.CopyTo(result.PutEmpty(prefix + k))
123+
key := prefix + k
124+
if f.resolveConflict {
125+
f.handleConflict(key, v)
126+
} else {
127+
v.CopyTo(f.result.PutEmpty(key))
128+
}
96129
}
97130
return true
98131
}
132+
133+
func (f *flattenData) handleConflict(key string, v pcommon.Value) {
134+
if _, exists := f.result.Get(key); exists {
135+
newKey := key + "." + strconv.Itoa(f.existingKeys[key])
136+
f.existingKeys[key]++
137+
v.CopyTo(f.result.PutEmpty(newKey))
138+
} else {
139+
f.existingKeys[key] = 0
140+
v.CopyTo(f.result.PutEmpty(key))
141+
}
142+
}

0 commit comments

Comments
 (0)