Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,4 @@
- Save the agent configuration and the state encrypted on the disk. {issue}535[535] {pull}398[398]
- Bump node.js version for heartbeat/synthetics to 16.15.0
- Support scheduled actions and cancellation of pending actions. {issue}393[393] {pull}419[419]
- Add `source.input_id` and `source.stream_id` when applying the inject stream processor {pull}527[527]
53 changes: 53 additions & 0 deletions internal/pkg/agent/transpiler/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,42 @@ func (r *InjectStreamProcessorRule) Apply(_ AgentInfo, ast *AST) (err error) {
namespace := datastreamNamespaceFromInputNode(inputNode)
datastreamType := datastreamTypeFromInputNode(inputNode, r.Type)

var inputID *StrVal
inputIDNode, found := inputNode.Find("id")
if found {
inputID, _ = inputIDNode.Value().(*StrVal)
}

if inputID != nil {
// get input-level processors node
processorsNode, found := inputNode.Find("processors")
if !found {
processorsNode = &Key{
name: "processors",
value: &List{value: make([]Node, 0)},
}

inputMap, ok := inputNode.(*Dict)
if ok {
inputMap.value = append(inputMap.value, processorsNode)
}
}

processorsList, ok := processorsNode.Value().(*List)
if !ok {
return errors.New("InjectStreamProcessorRule: input processors is not a list")
}

// inject `input_id` on the input level
processorMap := &Dict{value: make([]Node, 0)}
processorMap.value = append(processorMap.value, &Key{name: "target", value: &StrVal{value: "source"}})
processorMap.value = append(processorMap.value, &Key{name: "fields", value: &Dict{value: []Node{
&Key{name: "input_id", value: inputID},
}}})
addFieldsMap := &Dict{value: []Node{&Key{"add_fields", processorMap}}}
processorsList.value = mergeStrategy(r.OnConflict).InjectItem(processorsList.value, addFieldsMap)
}

streamsNode, ok := inputNode.Find("streams")
if !ok {
continue
Expand All @@ -680,6 +716,12 @@ func (r *InjectStreamProcessorRule) Apply(_ AgentInfo, ast *AST) (err error) {
}

for _, streamNode := range streamsList.value {
var streamID *StrVal
streamIDNode, ok := streamNode.Find("id")
if ok {
streamID, _ = streamIDNode.Value().(*StrVal)
}

streamMap, ok := streamNode.(*Dict)
if !ok {
continue
Expand Down Expand Up @@ -722,6 +764,17 @@ func (r *InjectStreamProcessorRule) Apply(_ AgentInfo, ast *AST) (err error) {
}}})
addFieldsMap = &Dict{value: []Node{&Key{"add_fields", processorMap}}}
processorsList.value = mergeStrategy(r.OnConflict).InjectItem(processorsList.value, addFieldsMap)

if streamID != nil {
// source stream
processorMap = &Dict{value: make([]Node, 0)}
processorMap.value = append(processorMap.value, &Key{name: "target", value: &StrVal{value: "source"}})
processorMap.value = append(processorMap.value, &Key{name: "fields", value: &Dict{value: []Node{
&Key{name: "stream_id", value: streamID.Clone()},
}}})
addFieldsMap = &Dict{value: []Node{&Key{"add_fields", processorMap}}}
processorsList.value = mergeStrategy(r.OnConflict).InjectItem(processorsList.value, addFieldsMap)
}
}
}

Expand Down
108 changes: 108 additions & 0 deletions internal/pkg/agent/transpiler/rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,114 @@ inputs:
},
},

"inject stream": {
givenYAML: `
inputs:
- name: No streams, no IDs
type: file
- name: With streams and IDs
id: input-id
type: file
data_stream.namespace: nsns
streams:
- paths: /var/log/mysql/error.log
id: stream-id
data_stream.dataset: dsds
- name: With processors
id: input-id
type: file
data_stream.namespace: nsns
processors:
- add_fields:
target: some
fields:
dataset: value
streams:
- paths: /var/log/mysql/error.log
id: stream-id
data_stream.dataset: dsds
processors:
- add_fields:
target: another
fields:
dataset: value
`,
expectedYAML: `
inputs:
- name: No streams, no IDs
type: file
- name: With streams and IDs
id: input-id
type: file
data_stream.namespace: nsns
processors:
- add_fields:
target: source
fields:
input_id: input-id
streams:
- paths: /var/log/mysql/error.log
id: stream-id
data_stream.dataset: dsds
processors:
- add_fields:
target: data_stream
fields:
type: stream-type
namespace: nsns
dataset: dsds
- add_fields:
target: event
fields:
dataset: dsds
- add_fields:
target: source
fields:
stream_id: stream-id
- name: With processors
id: input-id
type: file
data_stream.namespace: nsns
processors:
- add_fields:
target: some
fields:
dataset: value
- add_fields:
target: source
fields:
input_id: input-id
streams:
- paths: /var/log/mysql/error.log
id: stream-id
data_stream.dataset: dsds
processors:
- add_fields:
target: another
fields:
dataset: value
- add_fields:
target: data_stream
fields:
type: stream-type
namespace: nsns
dataset: dsds
- add_fields:
target: event
fields:
dataset: dsds
- add_fields:
target: source
fields:
stream_id: stream-id
`,
rule: &RuleList{
Rules: []Rule{
InjectStreamProcessor("insert_after", "stream-type"),
},
},
},

"inject agent info": {
givenYAML: `
inputs:
Expand Down