Skip to content

Commit 95778f2

Browse files
authored
[receiver/journald]: add support for matches configuration (#20852)
* [receiver/journald]: support `matches` configuration Signed-off-by: Dominik Rosiek <[email protected]>
1 parent 39681e6 commit 95778f2

File tree

5 files changed

+297
-23
lines changed

5 files changed

+297
-23
lines changed
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: enhancement
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
5+
component: journaldreceiver
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: add support for `matches` configuration
9+
10+
# One or more tracking issues related to the change
11+
issues: [20295]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext:

pkg/stanza/docs/operators/journald_input.md

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@ The `journald_input` operator will use the `__REALTIME_TIMESTAMP` field of the j
1414
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. |
1515
| `directory` | | A directory containing journal files to read entries from. |
1616
| `files` | | A list of journal files to read entries from. |
17-
| `units` | | A list of units to read entries from. |
18-
| `priority` | `info` | Filter output by message priorities or priority ranges. |
17+
| `units` | | A list of units to read entries from. See [Multiple filtering options](#multiple-filtering-options) examples, if you want to use it together with `matches` and/or `priority`. |
18+
| `matches` | | A list of matches to read entries from. See [Matches](#matches) and [Multiple filtering options](#multiple-filtering-options) examples. |
19+
| `priority` | `info` | Filter output by message priorities or priority ranges. See [Multiple filtering options](#multiple-filtering-options) examples, if you want to use it together with `units` and/or `matches`. |
1920
| `start_at` | `end` | At startup, where to start reading logs from the file. Options are `beginning` or `end`. |
2021
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. |
2122
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource. |
2223

2324
### Example Configurations
25+
2426
```yaml
2527
- type: journald_input
2628
units:
@@ -33,7 +35,66 @@ The `journald_input` operator will use the `__REALTIME_TIMESTAMP` field of the j
3335
- type: journald_input
3436
priority: emerg..err
3537
```
36-
#### Simple journald input
38+
39+
#### Matches
40+
41+
The following configuration:
42+
43+
```yaml
44+
- type: journald_input
45+
matches:
46+
- _SYSTEMD_UNIT: ssh
47+
- _SYSTEMD_UNIT: kubelet
48+
_UID: "1000"
49+
```
50+
51+
will be passed to `journalctl` as the following arguments: `journalctl ... _SYSTEMD_UNIT=ssh + _SYSTEMD_UNIT=kubelet _UID=1000`,
52+
which is going to retrieve all entries which match at least one of the following rules:
53+
54+
- `_SYSTEMD_UNIT` is `ssh`
55+
- `_SYSTEMD_UNIT` is `kubelet` and `_UID` is `1000`
56+
57+
#### Multiple filtering options
58+
59+
In case of using multiple following options, conditions between them are logically `AND`ed and within them are logically `OR`ed:
60+
61+
```text
62+
( priority )
63+
AND
64+
( units[0] OR units[1] OR units[2] OR ... units[U] )
65+
AND
66+
( matches[0] OR matches[1] OR matches[2] OR ... matches[M] )
67+
```
68+
69+
Consider the following example:
70+
71+
```yaml
72+
- type: journald_input
73+
matches:
74+
- _SYSTEMD_UNIT: ssh
75+
- _SYSTEMD_UNIT: kubelet
76+
_UID: "1000"
77+
units:
78+
- kubelet
79+
- systemd
80+
priority: info
81+
```
82+
83+
The above configuration will be passed to `journalctl` as the following arguments
84+
`journalctl ... --priority=info --unit=kubelet --unit=systemd _SYSTEMD_UNIT=ssh + _SYSTEMD_UNIT=kubelet _UID=1000`,
85+
which is going to effectively retrieve all entries which matches the following set of rules:
86+
87+
- `_PRIORITY` is `6`, and
88+
- `_SYSTEMD_UNIT` is `kubelet` or `systemd`, and
89+
- entry matches at least one of the following rules:
90+
91+
- `_SYSTEMD_UNIT` is `ssh`
92+
- `_SYSTEMD_UNIT` is `kubelet` and `_UID` is `1000`
93+
94+
Note, that if you use some fields which aren't associated with an entry, the entry will always be filtered out.
95+
Also be careful about using unit name in `matches` configuration, as for the above example, none of the entry for `ssh` and `systemd` is going to be retrieved.
96+
97+
### Simple journald input
3798

3899
Configuration:
39100
```yaml

pkg/stanza/operator/input/journald/journald.go

Lines changed: 77 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424
"fmt"
2525
"io"
2626
"os/exec"
27+
"regexp"
28+
"sort"
2729
"strconv"
2830
"sync"
2931
"time"
@@ -60,20 +62,42 @@ func NewConfigWithID(operatorID string) *Config {
6062
type Config struct {
6163
helper.InputConfig `mapstructure:",squash"`
6264

63-
Directory *string `mapstructure:"directory,omitempty"`
64-
Files []string `mapstructure:"files,omitempty"`
65-
StartAt string `mapstructure:"start_at,omitempty"`
66-
Units []string `mapstructure:"units,omitempty"`
67-
Priority string `mapstructure:"priority,omitempty"`
65+
Directory *string `mapstructure:"directory,omitempty"`
66+
Files []string `mapstructure:"files,omitempty"`
67+
StartAt string `mapstructure:"start_at,omitempty"`
68+
Units []string `mapstructure:"units,omitempty"`
69+
Priority string `mapstructure:"priority,omitempty"`
70+
Matches []MatchConfig `mapstructure:"matches,omitempty"`
6871
}
6972

73+
type MatchConfig map[string]string
74+
7075
// Build will build a journald input operator from the supplied configuration
7176
func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
7277
inputOperator, err := c.InputConfig.Build(logger)
7378
if err != nil {
7479
return nil, err
7580
}
7681

82+
args, err := c.buildArgs()
83+
if err != nil {
84+
return nil, err
85+
}
86+
87+
return &Input{
88+
InputOperator: inputOperator,
89+
newCmd: func(ctx context.Context, cursor []byte) cmd {
90+
if cursor != nil {
91+
args = append(args, "--after-cursor", string(cursor))
92+
}
93+
return exec.CommandContext(ctx, "journalctl", args...) // #nosec - ...
94+
// journalctl is an executable that is required for this operator to function
95+
},
96+
json: jsoniter.ConfigFastest,
97+
}, nil
98+
}
99+
100+
func (c Config) buildArgs() ([]string, error) {
77101
args := make([]string, 0, 10)
78102

79103
// Export logs in UTC time
@@ -108,17 +132,54 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
108132
}
109133
}
110134

111-
return &Input{
112-
InputOperator: inputOperator,
113-
newCmd: func(ctx context.Context, cursor []byte) cmd {
114-
if cursor != nil {
115-
args = append(args, "--after-cursor", string(cursor))
116-
}
117-
return exec.CommandContext(ctx, "journalctl", args...) // #nosec - ...
118-
// journalctl is an executable that is required for this operator to function
119-
},
120-
json: jsoniter.ConfigFastest,
121-
}, nil
135+
if len(c.Matches) > 0 {
136+
matches, err := c.buildMatchesConfig()
137+
if err != nil {
138+
return nil, err
139+
}
140+
args = append(args, matches...)
141+
}
142+
143+
return args, nil
144+
}
145+
146+
func buildMatchConfig(mc MatchConfig) ([]string, error) {
147+
re := regexp.MustCompile("^[_A-Z]+$")
148+
149+
// Sort keys to be consistent with every run and to be predictable for tests
150+
sortedKeys := make([]string, 0, len(mc))
151+
for key := range mc {
152+
if !re.MatchString(key) {
153+
return []string{}, fmt.Errorf("'%s' is not a valid Systemd field name", key)
154+
}
155+
sortedKeys = append(sortedKeys, key)
156+
}
157+
sort.Strings(sortedKeys)
158+
159+
configs := []string{}
160+
for _, key := range sortedKeys {
161+
configs = append(configs, fmt.Sprintf("%s=%s", key, mc[key]))
162+
}
163+
164+
return configs, nil
165+
}
166+
167+
func (c Config) buildMatchesConfig() ([]string, error) {
168+
matches := []string{}
169+
170+
for i, mc := range c.Matches {
171+
if i > 0 {
172+
matches = append(matches, "+")
173+
}
174+
mcs, err := buildMatchConfig(mc)
175+
if err != nil {
176+
return []string{}, err
177+
}
178+
179+
matches = append(matches, mcs...)
180+
}
181+
182+
return matches, nil
122183
}
123184

124185
// Input is an operator that process logs using journald

pkg/stanza/operator/input/journald/journald_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"testing"
2525
"time"
2626

27+
"github.com/stretchr/testify/assert"
2728
"github.com/stretchr/testify/mock"
2829
"github.com/stretchr/testify/require"
2930

@@ -115,3 +116,82 @@ func TestInputJournald(t *testing.T) {
115116
require.FailNow(t, "Timed out waiting for entry to be read")
116117
}
117118
}
119+
120+
func TestBuildConfig(t *testing.T) {
121+
testCases := []struct {
122+
Name string
123+
Config func(cfg *Config)
124+
Expected []string
125+
ExpectedError string
126+
}{
127+
{
128+
Name: "empty config",
129+
Config: func(cfg *Config) {},
130+
Expected: []string{"--utc", "--output=json", "--follow", "--priority", "info"},
131+
},
132+
{
133+
Name: "units",
134+
Config: func(cfg *Config) {
135+
cfg.Units = []string{
136+
"dbus.service",
137+
138+
}
139+
},
140+
Expected: []string{"--utc", "--output=json", "--follow", "--unit", "dbus.service", "--unit", "[email protected]", "--priority", "info"},
141+
},
142+
{
143+
Name: "matches",
144+
Config: func(cfg *Config) {
145+
cfg.Matches = []MatchConfig{
146+
{
147+
"_SYSTEMD_UNIT": "dbus.service",
148+
},
149+
{
150+
"_UID": "1000",
151+
"_SYSTEMD_UNIT": "[email protected]",
152+
},
153+
}
154+
},
155+
Expected: []string{"--utc", "--output=json", "--follow", "--priority", "info", "_SYSTEMD_UNIT=dbus.service", "+", "[email protected]", "_UID=1000"},
156+
},
157+
{
158+
Name: "invalid match",
159+
Config: func(cfg *Config) {
160+
cfg.Matches = []MatchConfig{
161+
{
162+
"-SYSTEMD_UNIT": "dbus.service",
163+
},
164+
}
165+
},
166+
ExpectedError: "'-SYSTEMD_UNIT' is not a valid Systemd field name",
167+
},
168+
{
169+
Name: "units and matches",
170+
Config: func(cfg *Config) {
171+
cfg.Units = []string{"ssh"}
172+
cfg.Matches = []MatchConfig{
173+
{
174+
"_SYSTEMD_UNIT": "dbus.service",
175+
},
176+
}
177+
},
178+
Expected: []string{"--utc", "--output=json", "--follow", "--unit", "ssh", "--priority", "info", "_SYSTEMD_UNIT=dbus.service"},
179+
},
180+
}
181+
182+
for _, tt := range testCases {
183+
t.Run(tt.Name, func(t *testing.T) {
184+
cfg := NewConfigWithID("my_journald_input")
185+
tt.Config(cfg)
186+
args, err := cfg.buildArgs()
187+
188+
if tt.ExpectedError != "" {
189+
require.Error(t, err)
190+
require.ErrorContains(t, err, tt.ExpectedError)
191+
return
192+
}
193+
require.NoError(t, err)
194+
assert.Equal(t, tt.Expected, args)
195+
})
196+
}
197+
}

receiver/journaldreceiver/README.md

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@ Journald receiver is dependent on `journalctl` binary to be present and must be
1616
| `directory` | `/run/log/journal` or `/run/journal` | A directory containing journal files to read entries from |
1717
| `files` | | A list of journal files to read entries from |
1818
| `start_at` | `end` | At startup, where to start reading logs from the file. Options are beginning or end |
19-
| `units` | `[ssh, kubelet, docker, containerd]` | A list of units to read entries from |
20-
| `priority` | `info` | Filter output by message priorities or priority ranges |
19+
| `units` | | A list of units to read entries from. See [Multiple filtering options](#multiple-filtering-options) examples, if you want to use it together with `matches` and/or `priority`. |
20+
| `matches` | | A list of matches to read entries from. See [Matches](#matches) and [Multiple filtering options](#multiple-filtering-options) examples. |
21+
| `priority` | `info` | Filter output by message priorities or priority ranges. See [Multiple filtering options](#multiple-filtering-options) examples, if you want to use it together with `units` and/or `matches`. |
2122
| `storage` | none | The ID of a storage extension to be used to store cursors. Cursors allow the receiver to pick up where it left off in the case of a collector restart. If no storage extension is used, the receiver will manage cursors in memory only. |
2223

2324
### Example Configurations
@@ -34,5 +35,60 @@ receivers:
3435
priority: info
3536
```
3637
37-
[alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha
38-
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
38+
#### Matches
39+
40+
The following configuration:
41+
42+
```yaml
43+
- type: journald_input
44+
matches:
45+
- _SYSTEMD_UNIT: ssh
46+
- _SYSTEMD_UNIT: kubelet
47+
_UID: "1000"
48+
```
49+
50+
will be passed to `journalctl` as the following arguments: `journalctl ... _SYSTEMD_UNIT=ssh + _SYSTEMD_UNIT=kubelet _UID=1000`,
51+
which is going to retrieve all entries which match at least one of the following rules:
52+
53+
- `_SYSTEMD_UNIT` is `ssh`
54+
- `_SYSTEMD_UNIT` is `kubelet` and `_UID` is `1000`
55+
56+
#### Multiple filtering options
57+
58+
In case of using multiple following options, conditions between them are logically `AND`ed and within them are logically `OR`ed:
59+
60+
```text
61+
( priority )
62+
AND
63+
( units[0] OR units[1] OR units[2] OR ... units[U] )
64+
AND
65+
( matches[0] OR matches[1] OR matches[2] OR ... matches[M] )
66+
```
67+
68+
Consider the following example:
69+
70+
```yaml
71+
- type: journald_input
72+
matches:
73+
- _SYSTEMD_UNIT: ssh
74+
- _SYSTEMD_UNIT: kubelet
75+
_UID: "1000"
76+
units:
77+
- kubelet
78+
- systemd
79+
priority: info
80+
```
81+
82+
The above configuration will be passed to `journalctl` as the following arguments
83+
`journalctl ... --priority=info --unit=kubelet --unit=systemd _SYSTEMD_UNIT=ssh + _SYSTEMD_UNIT=kubelet _UID=1000`,
84+
which is going to effectively retrieve all entries which matches the following set of rules:
85+
86+
- `_PRIORITY` is `6`, and
87+
- `_SYSTEMD_UNIT` is `kubelet` or `systemd`, and
88+
- entry matches at least one of the following rules:
89+
90+
- `_SYSTEMD_UNIT` is `ssh`
91+
- `_SYSTEMD_UNIT` is `kubelet` and `_UID` is `1000`
92+
93+
Note, that if you use some fields which aren't associated with an entry, the entry will always be filtered out.
94+
Also be careful about using unit name in `matches` configuration, as for the above example, none of the entry for `ssh` and `systemd` is going to be retrieved.

0 commit comments

Comments
 (0)