Skip to content

Commit f70c8a3

Browse files
authored
Header support for json_array_parser (#30814)
**Description:** <Describe what has changed.> <!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> Adding a feature following #30644. This feature allows the json_array_parser parser to accept a comma-delimited header and, for every json array it parses, output a map whose keys are the header fields and whose values are the matching values parsed from the input json array. This feature was added mainly for performance reasons; from a functional POV, it is mostly similar to chaining the 2 operators: `json_array_parser -> assign_keys` **Link to tracking Issue:** <Issue number if applicable> #30321 **Testing:** <Describe what testing was performed and which tests were added.> - unit tests - End to end tests: used generated traffic on a running otel collector that's using the parser and verified the data is as expected in the end table and performance looks good **Documentation:** <Describe the documentation added.> - [json_array_parser.md](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/58cc91ca30eabbd35c074d79db8630fc474164d9/pkg/stanza/docs/operators/json_array_parser.md)
1 parent ba3d660 commit f70c8a3

File tree

6 files changed

+198
-34
lines changed

6 files changed

+198
-34
lines changed
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: enhancement
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
5+
component: pkg/stanza
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: Add support for a header configuration in the json array parser.
9+
10+
# One or more tracking issues related to the change
11+
issues: [30321]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext: |

pkg/stanza/docs/operators/json_array_parser.md

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,9 @@ More information on json arrays can be found [here](https://json-schema.org/unde
4747
|--------------------|------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------|
4848
| `id` | `json_array_parser` | A unique identifier for the operator. |
4949
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. |
50+
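| `header` | optional | A string of comma-delimited field names. When a header is set, the output will be a map with the header fields as keys and the corresponding elements of the parsed input json array as values. The number of header fields must match the number of array elements, otherwise parsing fails with an error. |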
5051
| `parse_from` | `body` | The [field](../types/field.md) from which the value will be parsed. |
51-
| `parse_to` | required. can be one of `body` or a nested field inside `body`, `attributes` or `resource` (ie `attributes.parsed`) | The [field](../types/field.md) to which the value will be parsed. |
52+
| `parse_to` | required. Can be one of `body` or a nested field inside `body`, `attributes` or `resource` (e.g. `attributes.parsed`). When a header is used, `attributes` is also valid. | The [field](../types/field.md) to which the value will be parsed. |
5253
| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](../types/on_error.md). |
5354
| `timestamp` | `nil` | An optional [timestamp](../types/timestamp.md) block which will parse a timestamp field before passing the entry to the output operator. |
5455
| `severity` | `nil` | An optional [severity](../types/severity.md) block which will parse a severity field before passing the entry to the output operator. |
@@ -124,6 +125,46 @@ Configuration:
124125
}
125126
```
126127

128+
</td>
129+
</tr>
130+
</table>
131+
132+
#### Parse the field `body` with a json array parser and a header into attributes
133+
134+
Configuration:
135+
136+
```yaml
137+
- type: json_array_parser
138+
parse_to: attributes
139+
header: origin,sev,message,isBool
140+
```
141+
142+
<table>
143+
<tr><td> Input Entry </td> <td> Output Entry </td></tr>
144+
<tr>
145+
<td>
146+
147+
```json
148+
{
149+
"body": "[1,\"debug\",\"Debug Message\", true]"
150+
}
151+
```
152+
153+
</td>
154+
<td>
155+
156+
```json
157+
{
158+
"body": "[1,\"debug\",\"Debug Message\", true]",
159+
"attributes": {
160+
"origin": 1,
161+
"sev": "debug",
162+
"message": "Debug Message",
163+
"isBool": true
164+
}
165+
}
166+
```
167+
127168
</td>
128169
</tr>
129170
</table>

pkg/stanza/operator/parser/jsonarray/config_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,15 @@ func TestConfig(t *testing.T) {
5353
return p
5454
}(),
5555
},
56+
{
57+
Name: "parse_with_header_as_attributes",
58+
Expect: func() *Config {
59+
p := NewConfig()
60+
p.ParseTo = entry.RootableField{Field: entry.NewAttributeField()}
61+
p.Header = "A,B,C"
62+
return p
63+
}(),
64+
},
5665
},
5766
}.Run(t)
5867
}

pkg/stanza/operator/parser/jsonarray/json_array_parser.go

Lines changed: 90 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"context"
77
"errors"
88
"fmt"
9+
"strings"
910

1011
"github.com/valyala/fastjson"
1112
"go.opentelemetry.io/collector/featuregate"
@@ -17,6 +18,7 @@ import (
1718
)
1819

1920
const operatorType = "json_array_parser"
21+
const headerDelimiter = ","
2022

2123
var jsonArrayParserFeatureGate = featuregate.GlobalRegistry().MustRegister(
2224
"logs.jsonParserArray",
@@ -46,6 +48,7 @@ func NewConfigWithID(operatorID string) *Config {
4648
// Config is the configuration of a json array parser operator.
4749
type Config struct {
4850
helper.ParserConfig `mapstructure:",squash"`
51+
Header string `mapstructure:"header"`
4952
}
5053

5154
// Build will build a json array parser operator.
@@ -55,59 +58,113 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
5558
return nil, err
5659
}
5760

61+
if c.Header != "" {
62+
return &Parser{
63+
ParserOperator: parserOperator,
64+
parse: generateParseToMapFunc(new(fastjson.ParserPool), strings.Split(c.Header, headerDelimiter)),
65+
}, nil
66+
}
67+
5868
return &Parser{
5969
ParserOperator: parserOperator,
60-
pool: new(fastjson.ParserPool),
70+
parse: generateParseToArrayFunc(new(fastjson.ParserPool)),
6171
}, nil
6272
}
6373

6474
// Parser is an operator that parses json array in an entry.
6575
type Parser struct {
6676
helper.ParserOperator
67-
pool *fastjson.ParserPool
77+
parse parseFunc
6878
}
6979

80+
type parseFunc func(any) (any, error)
81+
7082
// Process will parse an entry for json array.
7183
func (r *Parser) Process(ctx context.Context, e *entry.Entry) error {
7284
return r.ParserOperator.ProcessWith(ctx, e, r.parse)
7385
}
7486

75-
func (r *Parser) parse(value any) (any, error) {
76-
jArrayLine, err := valueAsString(value)
77-
if err != nil {
78-
return nil, err
79-
}
87+
func generateParseToArrayFunc(pool *fastjson.ParserPool) parseFunc {
88+
return func(value any) (any, error) {
89+
jArrayLine, err := valueAsString(value)
90+
if err != nil {
91+
return nil, err
92+
}
8093

81-
p := r.pool.Get()
82-
v, err := p.Parse(jArrayLine)
83-
r.pool.Put(p)
84-
if err != nil {
85-
return nil, errors.New("failed to parse entry")
86-
}
94+
p := pool.Get()
95+
v, err := p.Parse(jArrayLine)
96+
pool.Put(p)
97+
if err != nil {
98+
return nil, errors.New("failed to parse entry")
99+
}
87100

88-
jArray := v.GetArray() // a is a []*Value slice
89-
parsedValues := make([]any, len(jArray))
90-
for i := range jArray {
91-
switch jArray[i].Type() {
92-
case fastjson.TypeNumber:
93-
parsedValues[i] = jArray[i].GetInt64()
94-
case fastjson.TypeString:
95-
parsedValues[i] = string(jArray[i].GetStringBytes())
96-
case fastjson.TypeTrue:
97-
parsedValues[i] = true
98-
case fastjson.TypeFalse:
99-
parsedValues[i] = false
100-
case fastjson.TypeNull:
101-
parsedValues[i] = nil
102-
case fastjson.TypeObject:
103-
// Nested objects handled as a string since this parser doesn't support nested headers
104-
parsedValues[i] = jArray[i].String()
105-
default:
106-
return nil, errors.New("failed to parse entry: " + string(jArray[i].MarshalTo(nil)))
101+
jArray := v.GetArray() // a is a []*Value slice
102+
parsedValues := make([]any, len(jArray))
103+
for i := range jArray {
104+
switch jArray[i].Type() {
105+
case fastjson.TypeNumber:
106+
parsedValues[i] = jArray[i].GetInt64()
107+
case fastjson.TypeString:
108+
parsedValues[i] = string(jArray[i].GetStringBytes())
109+
case fastjson.TypeTrue:
110+
parsedValues[i] = true
111+
case fastjson.TypeFalse:
112+
parsedValues[i] = false
113+
case fastjson.TypeNull:
114+
parsedValues[i] = nil
115+
case fastjson.TypeObject:
116+
// Nested objects handled as a string since this parser doesn't support nested headers
117+
parsedValues[i] = jArray[i].String()
118+
default:
119+
return nil, errors.New("failed to parse entry: " + string(jArray[i].MarshalTo(nil)))
120+
}
107121
}
122+
123+
return parsedValues, nil
108124
}
125+
}
126+
127+
func generateParseToMapFunc(pool *fastjson.ParserPool, header []string) parseFunc {
128+
return func(value any) (any, error) {
129+
jArrayLine, err := valueAsString(value)
130+
if err != nil {
131+
return nil, err
132+
}
133+
134+
p := pool.Get()
135+
v, err := p.Parse(jArrayLine)
136+
pool.Put(p)
137+
if err != nil {
138+
return nil, errors.New("failed to parse entry")
139+
}
109140

110-
return parsedValues, nil
141+
jArray := v.GetArray() // a is a []*Value slice
142+
if len(header) != len(jArray) {
143+
return nil, fmt.Errorf("wrong number of fields: expected %d, found %d", len(header), len(jArray))
144+
}
145+
parsedValues := make(map[string]any, len(jArray))
146+
for i := range jArray {
147+
switch jArray[i].Type() {
148+
case fastjson.TypeNumber:
149+
parsedValues[header[i]] = jArray[i].GetInt64()
150+
case fastjson.TypeString:
151+
parsedValues[header[i]] = string(jArray[i].GetStringBytes())
152+
case fastjson.TypeTrue:
153+
parsedValues[header[i]] = true
154+
case fastjson.TypeFalse:
155+
parsedValues[header[i]] = false
156+
case fastjson.TypeNull:
157+
parsedValues[header[i]] = nil
158+
case fastjson.TypeObject:
159+
// Nested objects handled as a string since this parser doesn't support nested headers
160+
parsedValues[header[i]] = jArray[i].String()
161+
default:
162+
return nil, errors.New("failed to parse entry: " + string(jArray[i].MarshalTo(nil)))
163+
}
164+
}
165+
166+
return parsedValues, nil
167+
}
111168
}
112169

113170
// valueAsString interprets the given value as a string.

pkg/stanza/operator/parser/jsonarray/json_array_parser_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,17 @@ func TestParserInvalidType(t *testing.T) {
3636
require.Contains(t, err.Error(), "type '[]int' cannot be parsed as json array")
3737
}
3838

39+
func TestParserByteFailureHeadersMismatch(t *testing.T) {
40+
cfg := NewConfigWithID("test")
41+
cfg.Header = "name,sev,msg"
42+
op, err := cfg.Build(testutil.Logger(t))
43+
require.NoError(t, err)
44+
parser := op.(*Parser)
45+
_, err = parser.parse("[\"stanza\",\"INFO\",\"started agent\", 42, true]")
46+
require.Error(t, err)
47+
require.Contains(t, err.Error(), "wrong number of fields: expected 3, found 5")
48+
}
49+
3950
func TestParserJarray(t *testing.T) {
4051
cases := []struct {
4152
name string
@@ -193,6 +204,32 @@ func TestParserJarray(t *testing.T) {
193204
false,
194205
false,
195206
},
207+
{
208+
"parse-as-attributes-with-header",
209+
func(p *Config) {
210+
p.ParseTo = entry.RootableField{Field: entry.NewAttributeField()}
211+
p.Header = "origin,sev,message,count,isBool"
212+
},
213+
[]entry.Entry{
214+
{
215+
Body: "[\"stanza\",\"INFO\",\"started agent\", 42, true]",
216+
},
217+
},
218+
[]entry.Entry{
219+
{
220+
Body: "[\"stanza\",\"INFO\",\"started agent\", 42, true]",
221+
Attributes: map[string]any{
222+
"origin": "stanza",
223+
"sev": "INFO",
224+
"message": "started agent",
225+
"count": int64(42),
226+
"isBool": true,
227+
},
228+
},
229+
},
230+
false,
231+
false,
232+
},
196233
}
197234

198235
for _, tc := range cases {

pkg/stanza/operator/parser/jsonarray/testdata/config.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,7 @@ parse_to_body:
1212
parse_to_resource:
1313
type: json_array_parser
1414
parse_to: resource.output
15+
parse_with_header_as_attributes:
16+
type: json_array_parser
17+
parse_to: attributes
18+
header: A,B,C

0 commit comments

Comments
 (0)