Skip to content

Commit 3c3c107

Browse files
committed
feat(pkg/stanza/operator/input/windows): add support for XML queries
Signed-off-by: Szilard Parrag <[email protected]>
1 parent 1774584 commit 3c3c107

File tree

6 files changed

+84
-10
lines changed

6 files changed

+84
-10
lines changed

.chloggen/eventlogreceiver-query.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: eventlogreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: add raw XML query filtering option
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [38517]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user, api]

pkg/stanza/operator/input/windows/config_all.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type Config struct {
3737
SuppressRenderingInfo bool `mapstructure:"suppress_rendering_info,omitempty"`
3838
ExcludeProviders []string `mapstructure:"exclude_providers,omitempty"`
3939
Remote RemoteConfig `mapstructure:"remote,omitempty"`
40+
Query *string `mapstructure:"query,omitempty"`
4041
}
4142

4243
// RemoteConfig is the configuration for a remote server.

pkg/stanza/operator/input/windows/config_windows.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,12 @@ func (c *Config) Build(set component.TelemetrySettings) (operator.Operator, erro
2424
return nil, err
2525
}
2626

27-
if c.Channel == "" {
28-
return nil, errors.New("missing required `channel` field")
27+
if c.Channel == "" && c.Query == nil {
28+
return nil, errors.New("either `channel` or `query` must be set")
29+
}
30+
31+
if c.Channel != "" && c.Query != nil {
32+
return nil, errors.New("either `channel` or `query` must be set, but not both")
2933
}
3034

3135
if c.MaxReads < 1 {
@@ -52,6 +56,7 @@ func (c *Config) Build(set component.TelemetrySettings) (operator.Operator, erro
5256
raw: c.Raw,
5357
excludeProviders: excludeProvidersSet(c.ExcludeProviders),
5458
remote: c.Remote,
59+
query: c.Query,
5560
}
5661
input.startRemoteSession = input.defaultStartRemoteSession
5762

pkg/stanza/operator/input/windows/input.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type Input struct {
2727
bookmark Bookmark
2828
buffer *Buffer
2929
channel string
30+
query *string
3031
maxReads int
3132
currentMaxReads int
3233
startAt string
@@ -117,7 +118,7 @@ func (i *Input) Start(persister operator.Persister) error {
117118
i.bookmark = NewBookmark()
118119
offsetXML, err := i.getBookmarkOffset(ctx)
119120
if err != nil {
120-
_ = i.persister.Delete(ctx, i.channel)
121+
_ = i.persister.Delete(ctx, i.getPersistKey())
121122
}
122123

123124
if offsetXML != "" {
@@ -133,7 +134,7 @@ func (i *Input) Start(persister operator.Persister) error {
133134
subscription = NewRemoteSubscription(i.remote.Server)
134135
}
135136

136-
if err := subscription.Open(i.startAt, uintptr(i.remoteSessionHandle), i.channel, i.bookmark); err != nil {
137+
if err := subscription.Open(i.startAt, uintptr(i.remoteSessionHandle), i.channel, i.query, i.bookmark); err != nil {
137138
if isNonTransientError(err) {
138139
if i.isRemote() {
139140
return fmt.Errorf("failed to open subscription for remote server %s: %w", i.remote.Server, err)
@@ -225,7 +226,7 @@ func (i *Input) read(ctx context.Context) {
225226
i.Logger().Error("Failed to re-establish remote session", zap.String("server", i.remote.Server), zap.Error(err))
226227
return
227228
}
228-
if err := i.subscription.Open(i.startAt, uintptr(i.remoteSessionHandle), i.channel, i.bookmark); err != nil {
229+
if err := i.subscription.Open(i.startAt, uintptr(i.remoteSessionHandle), i.channel, i.query, i.bookmark); err != nil {
229230
i.Logger().Error("Failed to re-open subscription for remote server", zap.String("server", i.remote.Server), zap.Error(err))
230231
return
231232
}
@@ -334,7 +335,7 @@ func (i *Input) sendEvent(ctx context.Context, eventXML *EventXML) error {
334335

335336
// getBookmarkXML will get the bookmark xml from the offsets database.
336337
func (i *Input) getBookmarkOffset(ctx context.Context) (string, error) {
337-
bytes, err := i.persister.Get(ctx, i.channel)
338+
bytes, err := i.persister.Get(ctx, i.getPersistKey())
338339
return string(bytes), err
339340
}
340341

@@ -351,8 +352,16 @@ func (i *Input) updateBookmarkOffset(ctx context.Context, event Event) {
351352
return
352353
}
353354

354-
if err := i.persister.Set(ctx, i.channel, []byte(bookmarkXML)); err != nil {
355+
if err := i.persister.Set(ctx, i.getPersistKey(), []byte(bookmarkXML)); err != nil {
355356
i.Logger().Error("failed to set offsets", zap.Error(err))
356357
return
357358
}
358359
}
360+
361+
func (i *Input) getPersistKey() string {
362+
if i.query != nil {
363+
return *i.query
364+
}
365+
366+
return i.channel
367+
}

pkg/stanza/operator/input/windows/subscription.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,14 @@ type Subscription struct {
2020
startAt string
2121
sessionHandle uintptr
2222
channel string
23+
query *string
2324
bookmark Bookmark
2425
}
2526

2627
// Open will open the subscription handle.
2728
// It returns an error if the subscription handle is already open or if any step in the process fails.
2829
// If the remote server is not reachable, it returns an error indicating the failure.
29-
func (s *Subscription) Open(startAt string, sessionHandle uintptr, channel string, bookmark Bookmark) error {
30+
func (s *Subscription) Open(startAt string, sessionHandle uintptr, channel string, query *string, bookmark Bookmark) error {
3031
if s.handle != 0 {
3132
return errors.New("subscription handle is already open")
3233
}
@@ -39,13 +40,24 @@ func (s *Subscription) Open(startAt string, sessionHandle uintptr, channel strin
3940
_ = windows.CloseHandle(signalEvent)
4041
}()
4142

43+
if channel != "" && query != nil {
44+
return fmt.Errorf("can not supply both query and channel")
45+
}
46+
4247
channelPtr, err := syscall.UTF16PtrFromString(channel)
4348
if err != nil {
4449
return fmt.Errorf("failed to convert channel to utf16: %w", err)
4550
}
4651

52+
var queryPtr *uint16
53+
if query != nil {
54+
if queryPtr, err = syscall.UTF16PtrFromString(*query); err != nil {
55+
return fmt.Errorf("failed to convert XML query to utf16: %w", err)
56+
}
57+
}
58+
4759
flags := s.createFlags(startAt, bookmark)
48-
subscriptionHandle, err := evtSubscribeFunc(sessionHandle, signalEvent, channelPtr, nil, bookmark.handle, 0, 0, flags)
60+
subscriptionHandle, err := evtSubscribeFunc(sessionHandle, signalEvent, channelPtr, queryPtr, bookmark.handle, 0, 0, flags)
4961
if err != nil {
5062
return fmt.Errorf("failed to subscribe to %s channel: %w", channel, err)
5163
}
@@ -55,6 +67,7 @@ func (s *Subscription) Open(startAt string, sessionHandle uintptr, channel strin
5567
s.sessionHandle = sessionHandle
5668
s.channel = channel
5769
s.bookmark = bookmark
70+
s.query = query
5871
return nil
5972
}
6073

@@ -109,7 +122,7 @@ func (s *Subscription) readWithRetry(maxReads int) ([]Event, int, error) {
109122
}
110123

111124
// reopen subscription with the same parameters
112-
if openErr := s.Open(s.startAt, s.sessionHandle, s.channel, s.bookmark); openErr != nil {
125+
if openErr := s.Open(s.startAt, s.sessionHandle, s.channel, s.query, s.bookmark); openErr != nil {
113126
return nil, maxReads, fmt.Errorf("failed to reopen subscription during recovery: %w", openErr)
114127
}
115128

receiver/windowseventlogreceiver/README.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ Tails and parses logs from windows event log API using the [opentelemetry-log-co
3535
| `retry_on_failure.max_interval` | `30 seconds` | Upper bound on retry backoff interval. Once this value is reached the delay between consecutive retries will remain constant at the specified value. |
3636
| `retry_on_failure.max_elapsed_time` | `5 minutes` | Maximum amount of time (including retries) spent trying to send a logs batch to a downstream consumer. Once this value is reached, the data is discarded. Retrying never stops if set to `0`. |
3737
| `remote` | object | Remote configuration for connecting to a remote machine to collect logs. Includes server (the address of the remote server), with username, password, and optional domain. |
38+
| `query` | none | XML query used for filtering events. See [Query Schema](https://learn.microsoft.com/en-us/windows/win32/wes/queryschema-schema) |
3839

3940
### Operators
4041

@@ -111,3 +112,21 @@ receivers:
111112
password: "password"
112113
domain: "domain"
113114
```
115+
116+
#### XML Queries
117+
118+
You can use XML queries to filter events. The query is passed to the `query` field in the configuration. The provided query must be a valid XML string. See [XML Event Queries](https://learn.microsoft.com/en-us/previous-versions/aa385231(v=vs.85)#xml-event-queries)
119+
120+
The following example only forwards logs from the `Application` from `foo` or `bar` providers.
121+
122+
```yaml
123+
receivers:
124+
windowseventlog/query:
125+
query: |
126+
<QueryList>
127+
<Query Id="0">
128+
<Select Path="Application">*[System[Provider[@Name='foo']]]</Select>
129+
<Select Path="Application">*[System[Provider[@Name='bar']]]</Select>
130+
</Query>
131+
</QueryList>
132+
```

0 commit comments

Comments
 (0)