Skip to content

Commit 5133f5d

Browse files
committed
feat(pkg/stanza/operator/input/windows): add support for XML queries
Signed-off-by: Szilard Parrag <[email protected]>
1 parent 4930224 commit 5133f5d

File tree

5 files changed

+54
-7
lines changed

5 files changed

+54
-7
lines changed

.chloggen/eventlogreceiver-query.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: eventlogreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: add raw XML query filtering option
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [38517]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user, api]

pkg/stanza/operator/input/windows/config_all.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type Config struct {
3737
SuppressRenderingInfo bool `mapstructure:"suppress_rendering_info,omitempty"`
3838
ExcludeProviders []string `mapstructure:"exclude_providers,omitempty"`
3939
Remote RemoteConfig `mapstructure:"remote,omitempty"`
40+
Query *string `mapstructure:"query,omitempty"`
4041
}
4142

4243
// RemoteConfig is the configuration for a remote server.

pkg/stanza/operator/input/windows/config_windows.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,12 @@ func (c *Config) Build(set component.TelemetrySettings) (operator.Operator, erro
2424
return nil, err
2525
}
2626

27-
if c.Channel == "" {
28-
return nil, fmt.Errorf("missing required `channel` field")
27+
if c.Channel == "" && c.Query == nil {
28+
return nil, fmt.Errorf("either `channel` or `query` must be set")
29+
}
30+
31+
if c.Channel != "" && c.Query != nil {
32+
return nil, fmt.Errorf("either `channel` or `query` must be set, but not both")
2933
}
3034

3135
if c.MaxReads < 1 {
@@ -52,6 +56,7 @@ func (c *Config) Build(set component.TelemetrySettings) (operator.Operator, erro
5256
raw: c.Raw,
5357
excludeProviders: excludeProvidersSet(c.ExcludeProviders),
5458
remote: c.Remote,
59+
query: c.Query,
5560
}
5661
input.startRemoteSession = input.defaultStartRemoteSession
5762

pkg/stanza/operator/input/windows/input.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type Input struct {
2626
bookmark Bookmark
2727
buffer *Buffer
2828
channel string
29+
query *string
2930
maxReads int
3031
currentMaxReads int
3132
startAt string
@@ -132,7 +133,7 @@ func (i *Input) Start(persister operator.Persister) error {
132133
subscription = NewRemoteSubscription(i.remote.Server)
133134
}
134135

135-
if err := subscription.Open(i.startAt, uintptr(i.remoteSessionHandle), i.channel, i.bookmark); err != nil {
136+
if err := subscription.Open(i.startAt, uintptr(i.remoteSessionHandle), i.channel, i.query, i.bookmark); err != nil {
136137
if isNonTransientError(err) {
137138
if i.isRemote() {
138139
return fmt.Errorf("failed to open subscription for remote server %s: %w", i.remote.Server, err)
@@ -224,7 +225,7 @@ func (i *Input) read(ctx context.Context) {
224225
i.Logger().Error("Failed to re-establish remote session", zap.String("server", i.remote.Server), zap.Error(err))
225226
return
226227
}
227-
if err := i.subscription.Open(i.startAt, uintptr(i.remoteSessionHandle), i.channel, i.bookmark); err != nil {
228+
if err := i.subscription.Open(i.startAt, uintptr(i.remoteSessionHandle), i.channel, i.query, i.bookmark); err != nil {
228229
i.Logger().Error("Failed to re-open subscription for remote server", zap.String("server", i.remote.Server), zap.Error(err))
229230
return
230231
}

pkg/stanza/operator/input/windows/subscription.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,14 @@ type Subscription struct {
2020
startAt string
2121
sessionHandle uintptr
2222
channel string
23+
query *string
2324
bookmark Bookmark
2425
}
2526

2627
// Open will open the subscription handle.
2728
// It returns an error if the subscription handle is already open or if any step in the process fails.
2829
// If the remote server is not reachable, it returns an error indicating the failure.
29-
func (s *Subscription) Open(startAt string, sessionHandle uintptr, channel string, bookmark Bookmark) error {
30+
func (s *Subscription) Open(startAt string, sessionHandle uintptr, channel string, query *string, bookmark Bookmark) error {
3031
if s.handle != 0 {
3132
return fmt.Errorf("subscription handle is already open")
3233
}
@@ -39,13 +40,24 @@ func (s *Subscription) Open(startAt string, sessionHandle uintptr, channel strin
3940
_ = windows.CloseHandle(signalEvent)
4041
}()
4142

43+
if channel != "" && query != nil {
44+
return fmt.Errorf("can not supply both query and channel")
45+
}
46+
4247
channelPtr, err := syscall.UTF16PtrFromString(channel)
4348
if err != nil {
4449
return fmt.Errorf("failed to convert channel to utf16: %w", err)
4550
}
4651

52+
var queryPtr *uint16
53+
if query != nil {
54+
if queryPtr, err = syscall.UTF16PtrFromString(*query); err != nil {
55+
return fmt.Errorf("failed to convert channel to utf16: %w", err)
56+
}
57+
}
58+
4759
flags := s.createFlags(startAt, bookmark)
48-
subscriptionHandle, err := evtSubscribeFunc(sessionHandle, signalEvent, channelPtr, nil, bookmark.handle, 0, 0, flags)
60+
subscriptionHandle, err := evtSubscribeFunc(sessionHandle, signalEvent, channelPtr, queryPtr, bookmark.handle, 0, 0, flags)
4961
if err != nil {
5062
return fmt.Errorf("failed to subscribe to %s channel: %w", channel, err)
5163
}
@@ -55,6 +67,7 @@ func (s *Subscription) Open(startAt string, sessionHandle uintptr, channel strin
5567
s.sessionHandle = sessionHandle
5668
s.channel = channel
5769
s.bookmark = bookmark
70+
s.query = query
5871
return nil
5972
}
6073

@@ -109,7 +122,7 @@ func (s *Subscription) readWithRetry(maxReads int) ([]Event, int, error) {
109122
}
110123

111124
// reopen subscription with the same parameters
112-
if openErr := s.Open(s.startAt, s.sessionHandle, s.channel, s.bookmark); openErr != nil {
125+
if openErr := s.Open(s.startAt, s.sessionHandle, s.channel, s.query, s.bookmark); openErr != nil {
113126
return nil, maxReads, fmt.Errorf("failed to reopen subscription during recovery: %w", openErr)
114127
}
115128

0 commit comments

Comments
 (0)