Skip to content

[pkg/stanza/operator/input/windows] [receiver/windowseventlogreceiver] add raw XML query support #39055

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/eventlogreceiver-query.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: eventlogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: add raw XML query filtering option

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [38517]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user, api]
1 change: 1 addition & 0 deletions pkg/stanza/operator/input/windows/config_all.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Config struct {
SuppressRenderingInfo bool `mapstructure:"suppress_rendering_info,omitempty"`
ExcludeProviders []string `mapstructure:"exclude_providers,omitempty"`
Remote RemoteConfig `mapstructure:"remote,omitempty"`
Query *string `mapstructure:"query,omitempty"`
}

// RemoteConfig is the configuration for a remote server.
Expand Down
9 changes: 7 additions & 2 deletions pkg/stanza/operator/input/windows/config_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ func (c *Config) Build(set component.TelemetrySettings) (operator.Operator, erro
return nil, err
}

if c.Channel == "" {
return nil, errors.New("missing required `channel` field")
if c.Channel == "" && c.Query == nil {
return nil, errors.New("either `channel` or `query` must be set")
}

if c.Channel != "" && c.Query != nil {
return nil, errors.New("either `channel` or `query` must be set, but not both")
}

if c.MaxReads < 1 {
Expand All @@ -52,6 +56,7 @@ func (c *Config) Build(set component.TelemetrySettings) (operator.Operator, erro
raw: c.Raw,
excludeProviders: excludeProvidersSet(c.ExcludeProviders),
remote: c.Remote,
query: c.Query,
}
input.startRemoteSession = input.defaultStartRemoteSession

Expand Down
19 changes: 14 additions & 5 deletions pkg/stanza/operator/input/windows/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Input struct {
bookmark Bookmark
buffer *Buffer
channel string
query *string
maxReads int
currentMaxReads int
startAt string
Expand Down Expand Up @@ -117,7 +118,7 @@ func (i *Input) Start(persister operator.Persister) error {
i.bookmark = NewBookmark()
offsetXML, err := i.getBookmarkOffset(ctx)
if err != nil {
_ = i.persister.Delete(ctx, i.channel)
_ = i.persister.Delete(ctx, i.getPersistKey())
}

if offsetXML != "" {
Expand All @@ -133,7 +134,7 @@ func (i *Input) Start(persister operator.Persister) error {
subscription = NewRemoteSubscription(i.remote.Server)
}

if err := subscription.Open(i.startAt, uintptr(i.remoteSessionHandle), i.channel, i.bookmark); err != nil {
if err := subscription.Open(i.startAt, uintptr(i.remoteSessionHandle), i.channel, i.query, i.bookmark); err != nil {
if isNonTransientError(err) {
if i.isRemote() {
return fmt.Errorf("failed to open subscription for remote server %s: %w", i.remote.Server, err)
Expand Down Expand Up @@ -225,7 +226,7 @@ func (i *Input) read(ctx context.Context) {
i.Logger().Error("Failed to re-establish remote session", zap.String("server", i.remote.Server), zap.Error(err))
return
}
if err := i.subscription.Open(i.startAt, uintptr(i.remoteSessionHandle), i.channel, i.bookmark); err != nil {
if err := i.subscription.Open(i.startAt, uintptr(i.remoteSessionHandle), i.channel, i.query, i.bookmark); err != nil {
i.Logger().Error("Failed to re-open subscription for remote server", zap.String("server", i.remote.Server), zap.Error(err))
return
}
Expand Down Expand Up @@ -334,7 +335,7 @@ func (i *Input) sendEvent(ctx context.Context, eventXML *EventXML) error {

// getBookmarkXML will get the bookmark xml from the offsets database.
func (i *Input) getBookmarkOffset(ctx context.Context) (string, error) {
bytes, err := i.persister.Get(ctx, i.channel)
bytes, err := i.persister.Get(ctx, i.getPersistKey())
return string(bytes), err
}

Expand All @@ -351,8 +352,16 @@ func (i *Input) updateBookmarkOffset(ctx context.Context, event Event) {
return
}

if err := i.persister.Set(ctx, i.channel, []byte(bookmarkXML)); err != nil {
if err := i.persister.Set(ctx, i.getPersistKey(), []byte(bookmarkXML)); err != nil {
i.Logger().Error("failed to set offsets", zap.Error(err))
return
}
}

func (i *Input) getPersistKey() string {
if i.query != nil {
return *i.query
}

return i.channel
}
19 changes: 16 additions & 3 deletions pkg/stanza/operator/input/windows/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ type Subscription struct {
startAt string
sessionHandle uintptr
channel string
query *string
bookmark Bookmark
}

// Open will open the subscription handle.
// It returns an error if the subscription handle is already open or if any step in the process fails.
// If the remote server is not reachable, it returns an error indicating the failure.
func (s *Subscription) Open(startAt string, sessionHandle uintptr, channel string, bookmark Bookmark) error {
func (s *Subscription) Open(startAt string, sessionHandle uintptr, channel string, query *string, bookmark Bookmark) error {
if s.handle != 0 {
return errors.New("subscription handle is already open")
}
Expand All @@ -39,13 +40,24 @@ func (s *Subscription) Open(startAt string, sessionHandle uintptr, channel strin
_ = windows.CloseHandle(signalEvent)
}()

if channel != "" && query != nil {
return errors.New("can not supply both query and channel")
}

channelPtr, err := syscall.UTF16PtrFromString(channel)
if err != nil {
return fmt.Errorf("failed to convert channel to utf16: %w", err)
}

var queryPtr *uint16
if query != nil {
if queryPtr, err = syscall.UTF16PtrFromString(*query); err != nil {
return fmt.Errorf("failed to convert XML query to utf16: %w", err)
}
}

flags := s.createFlags(startAt, bookmark)
subscriptionHandle, err := evtSubscribeFunc(sessionHandle, signalEvent, channelPtr, nil, bookmark.handle, 0, 0, flags)
subscriptionHandle, err := evtSubscribeFunc(sessionHandle, signalEvent, channelPtr, queryPtr, bookmark.handle, 0, 0, flags)
if err != nil {
return fmt.Errorf("failed to subscribe to %s channel: %w", channel, err)
}
Expand All @@ -55,6 +67,7 @@ func (s *Subscription) Open(startAt string, sessionHandle uintptr, channel strin
s.sessionHandle = sessionHandle
s.channel = channel
s.bookmark = bookmark
s.query = query
return nil
}

Expand Down Expand Up @@ -109,7 +122,7 @@ func (s *Subscription) readWithRetry(maxReads int) ([]Event, int, error) {
}

// reopen subscription with the same parameters
if openErr := s.Open(s.startAt, s.sessionHandle, s.channel, s.bookmark); openErr != nil {
if openErr := s.Open(s.startAt, s.sessionHandle, s.channel, s.query, s.bookmark); openErr != nil {
return nil, maxReads, fmt.Errorf("failed to reopen subscription during recovery: %w", openErr)
}

Expand Down
19 changes: 19 additions & 0 deletions receiver/windowseventlogreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Tails and parses logs from windows event log API using the [opentelemetry-log-co
| `retry_on_failure.max_interval` | `30 seconds` | Upper bound on retry backoff interval. Once this value is reached the delay between consecutive retries will remain constant at the specified value. |
| `retry_on_failure.max_elapsed_time` | `5 minutes` | Maximum amount of time (including retries) spent trying to send a logs batch to a downstream consumer. Once this value is reached, the data is discarded. Retrying never stops if set to `0`. |
| `remote` | object | Remote configuration for connecting to a remote machine to collect logs. Includes server (the address of the remote server), with username, password, and optional domain. |
| `query` | none | XML query used for filtering events. See [Query Schema](https://learn.microsoft.com/en-us/windows/win32/wes/queryschema-schema) |

### Operators

Expand Down Expand Up @@ -111,3 +112,21 @@ receivers:
password: "password"
domain: "domain"
```

#### XML Queries

You can use XML queries to filter events. The query is passed to the `query` field in the configuration. The provided query must be a valid XML string. See [XML Event Queries](https://learn.microsoft.com/en-us/previous-versions/aa385231(v=vs.85)#xml-event-queries)

The following example only forwards logs from the `Application` from `foo` or `bar` providers.

```yaml
receivers:
windowseventlog/query:
query: |
<QueryList>
<Query Id="0">
<Select Path="Application">*[System[Provider[@Name='foo']]]</Select>
<Select Path="Application">*[System[Provider[@Name='bar']]]</Select>
</Query>
</QueryList>
```
76 changes: 76 additions & 0 deletions receiver/windowseventlogreceiver/receiver_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,59 @@ func TestReadWindowsEventLogger(t *testing.T) {
require.Equal(t, int64(10), eventIDMap["id"])
}

func TestReadWindowsEventLoggerWithQuery(t *testing.T) {
logMessage := "Test log"
src := "otel-windowseventlogreceiver-test"
uninstallEventSource, err := assertEventSourceInstallation(t, src)
defer uninstallEventSource()
require.NoError(t, err)

ctx := context.Background()
factory := newFactoryAdapter()
createSettings := receivertest.NewNopSettings(metadata.Type)
cfg := createTestConfigWithQuery()
sink := new(consumertest.LogsSink)

receiver, err := factory.CreateLogs(ctx, createSettings, cfg, sink)
require.NoError(t, err)

err = receiver.Start(ctx, componenttest.NewNopHost())
require.NoError(t, err)
defer func() {
require.NoError(t, receiver.Shutdown(ctx))
}()
// Start launches nested goroutines, give them a chance to run before logging the test event(s).
time.Sleep(3 * time.Second)

logger, err := eventlog.Open(src)
require.NoError(t, err)
defer logger.Close()

err = logger.Info(10, logMessage)
require.NoError(t, err)

records := assertExpectedLogRecords(t, sink, src, 1)
require.Len(t, records, 1)
record := records[0]
body := record.Body().Map().AsRaw()

require.Equal(t, logMessage, body["message"])

eventData := body["event_data"]
eventDataMap, ok := eventData.(map[string]any)
require.True(t, ok)
require.Equal(t, map[string]any{
"data": []any{map[string]any{"": "Test log"}},
}, eventDataMap)

eventID := body["event_id"]
require.NotNil(t, eventID)

eventIDMap, ok := eventID.(map[string]any)
require.True(t, ok)
require.Equal(t, int64(10), eventIDMap["id"])
}

func TestReadWindowsEventLoggerRaw(t *testing.T) {
logMessage := "Test log"
src := "otel-windowseventlogreceiver-test"
Expand Down Expand Up @@ -298,6 +351,28 @@ func createTestConfig() *WindowsLogConfig {
}
}

func createTestConfigWithQuery() *WindowsLogConfig {
queryXML := `
<QueryList>
<Query Id="0">
<Select Path="Application">*</Select>
</Query>
</QueryList>
`
return &WindowsLogConfig{
BaseConfig: adapter.BaseConfig{
Operators: []operator.Config{},
RetryOnFailure: consumerretry.NewDefaultConfig(),
},
InputConfig: func() windows.Config {
c := windows.NewConfig()
c.Query = &queryXML
c.StartAt = "end"
return *c
}(),
}
}

// assertEventSourceInstallation installs an event source and verifies that the registry key was created.
// It returns a function that can be used to uninstall the event source, that function is never nil
func assertEventSourceInstallation(t *testing.T, src string) (uninstallEventSource func(), err error) {
Expand All @@ -321,6 +396,7 @@ func assertEventSourceInstallation(t *testing.T, src string) (uninstallEventSour
return
}

//nolint:unparam // expectedEventCount might be greater than one in the future
func assertExpectedLogRecords(t *testing.T, sink *consumertest.LogsSink, expectedEventSrc string, expectedEventCount int) []plog.LogRecord {
var actualLogRecords []plog.LogRecord

Expand Down