Skip to content

Commit 4930224

Browse files
authored
[pkg/stanza] [receiver/windowseventlog] Fix: Windows Event Max Read (ERRNO 1734) (open-telemetry#38149)
1 parent 3bd2b28 commit 4930224

File tree

5 files changed

+168
-10
lines changed

5 files changed

+168
-10
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: pkg/stanza
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add retries when calls to retrieve Windows event via `EvtNext` fail with error RPC_S_INVALID_BOUND (1734).
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [38149]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
Whenever large events were read in by the Windows event log receiver, via the stanza input operator,
20+
the collector would fail with error RPC_S_INVALID_BOUND (1734). Now the operator tries to workaround
21+
this issue by reducing the number of events read on each attempt.
22+
# If your change doesn't affect end users or the exported elements of any package,
23+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
24+
# Optional: The change log or logs in which this entry should be included.
25+
# e.g. '[user]' or '[user, api]'
26+
# Include 'user' if the change is relevant to end users.
27+
# Include 'api' if there is a change to a library API.
28+
# Default: '[user]'
29+
change_logs: [user]

pkg/stanza/operator/input/windows/config_windows.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ func (c *Config) Build(set component.TelemetrySettings) (operator.Operator, erro
4646
buffer: NewBuffer(),
4747
channel: c.Channel,
4848
maxReads: c.MaxReads,
49+
currentMaxReads: c.MaxReads,
4950
startAt: c.StartAt,
5051
pollInterval: c.PollInterval,
5152
raw: c.Raw,

pkg/stanza/operator/input/windows/input.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type Input struct {
2727
buffer *Buffer
2828
channel string
2929
maxReads int
30+
currentMaxReads int
3031
startAt string
3132
raw bool
3233
excludeProviders map[string]struct{}
@@ -197,7 +198,14 @@ func (i *Input) readOnInterval(ctx context.Context) {
197198

198199
// read will read events from the subscription.
199200
func (i *Input) read(ctx context.Context) {
200-
events, err := i.subscription.Read(i.maxReads)
201+
events, actualMaxReads, err := i.subscription.Read(i.currentMaxReads)
202+
203+
// Update the current max reads if it changed
204+
if err == nil && actualMaxReads < i.currentMaxReads {
205+
i.currentMaxReads = actualMaxReads
206+
i.Logger().Debug("Encountered RPC_S_INVALID_BOUND, reduced batch size", zap.Int("current_batch_size", i.currentMaxReads), zap.Int("original_batch_size", i.maxReads))
207+
}
208+
201209
if err != nil {
202210
i.Logger().Error("Failed to read events from subscription", zap.Error(err))
203211
if i.isRemote() && (errors.Is(err, windows.ERROR_INVALID_HANDLE) || errors.Is(err, errSubscriptionHandleNotOpen)) {
@@ -230,6 +238,9 @@ func (i *Input) read(ctx context.Context) {
230238
}
231239
if len(events) == n+1 {
232240
i.updateBookmarkOffset(ctx, event)
241+
if err := i.subscription.bookmark.Update(event); err != nil {
242+
i.Logger().Error("Failed to update bookmark from event", zap.Error(err))
243+
}
233244
}
234245
event.Close()
235246
}

pkg/stanza/operator/input/windows/input_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,16 @@
66
package windows // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/windows"
77

88
import (
9+
"context"
910
"errors"
1011
"testing"
1112
"time"
1213

1314
"github.com/stretchr/testify/assert"
15+
"github.com/stretchr/testify/require"
1416
"go.opentelemetry.io/collector/component"
1517
"go.uber.org/zap"
18+
"go.uber.org/zap/zaptest/observer"
1619
"golang.org/x/sys/windows"
1720

1821
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
@@ -128,3 +131,83 @@ func TestInputStart_BadChannelName(t *testing.T) {
128131
assert.ErrorContains(t, err, "failed to open subscription for remote server")
129132
assert.ErrorContains(t, err, "The specified channel could not be found")
130133
}
134+
135+
// TestInputRead_RPCInvalidBound tests that the Input handles RPC_S_INVALID_BOUND errors properly
136+
func TestInputRead_RPCInvalidBound(t *testing.T) {
137+
// Save original procs and restore after test
138+
originalNextProc := nextProc
139+
originalCloseProc := closeProc
140+
originalSubscribeProc := subscribeProc
141+
142+
// Track calls to our mocked functions
143+
var nextCalls, closeCalls, subscribeCalls int
144+
145+
// Mock the procs
146+
closeProc = MockProc{
147+
call: func(_ ...uintptr) (uintptr, uintptr, error) {
148+
closeCalls++
149+
return 1, 0, nil
150+
},
151+
}
152+
153+
subscribeProc = MockProc{
154+
call: func(_ ...uintptr) (uintptr, uintptr, error) {
155+
subscribeCalls++
156+
return 42, 0, nil
157+
},
158+
}
159+
160+
nextProc = MockProc{
161+
call: func(_ ...uintptr) (uintptr, uintptr, error) {
162+
nextCalls++
163+
if nextCalls == 1 {
164+
return 0, 0, windows.RPC_S_INVALID_BOUND
165+
}
166+
167+
return 1, 0, nil
168+
},
169+
}
170+
171+
defer func() {
172+
nextProc = originalNextProc
173+
closeProc = originalCloseProc
174+
subscribeProc = originalSubscribeProc
175+
}()
176+
177+
// Create a logger with an observer for testing log output
178+
core, logs := observer.New(zap.DebugLevel)
179+
logger := zap.New(core)
180+
181+
// Create input instance with mocked dependencies
182+
input := newInput(component.TelemetrySettings{
183+
Logger: logger,
184+
})
185+
186+
// Set up test values
187+
input.maxReads = 100
188+
input.currentMaxReads = 100
189+
190+
// Set up subscription with valid handle and enough info to reopen
191+
input.subscription = Subscription{
192+
handle: 42, // Dummy handle
193+
startAt: "beginning",
194+
sessionHandle: 0,
195+
channel: "test-channel",
196+
}
197+
198+
// Call the method under test
199+
ctx := context.Background()
200+
input.read(ctx)
201+
202+
// Verify the correct number of calls to each mock
203+
assert.Equal(t, 2, nextCalls, "nextProc should be called twice (initial failure and retry)")
204+
assert.Equal(t, 1, closeCalls, "closeProc should be called once to close subscription")
205+
assert.Equal(t, 1, subscribeCalls, "subscribeProc should be called once to reopen subscription")
206+
207+
// Verify that batch size was reduced
208+
assert.Equal(t, 50, input.currentMaxReads)
209+
210+
// Verify that a warning log was generated
211+
require.Equal(t, 1, logs.Len())
212+
assert.Contains(t, logs.All()[0].Message, "Encountered RPC_S_INVALID_BOUND")
213+
}

pkg/stanza/operator/input/windows/subscription.go

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,12 @@ import (
1515

1616
// Subscription is a subscription to a windows eventlog channel.
1717
type Subscription struct {
18-
handle uintptr
19-
Server string
18+
handle uintptr
19+
Server string
20+
startAt string
21+
sessionHandle uintptr
22+
channel string
23+
bookmark Bookmark
2024
}
2125

2226
// Open will open the subscription handle.
@@ -47,6 +51,10 @@ func (s *Subscription) Open(startAt string, sessionHandle uintptr, channel strin
4751
}
4852

4953
s.handle = subscriptionHandle
54+
s.startAt = startAt
55+
s.sessionHandle = sessionHandle
56+
s.channel = channel
57+
s.bookmark = bookmark
5058
return nil
5159
}
5260

@@ -66,26 +74,52 @@ func (s *Subscription) Close() error {
6674

6775
var errSubscriptionHandleNotOpen = errors.New("subscription handle is not open")
6876

69-
// Read will read events from the subscription.
70-
func (s *Subscription) Read(maxReads int) ([]Event, error) {
77+
func (s *Subscription) Read(maxReads int) ([]Event, int, error) {
7178
if s.handle == 0 {
72-
return nil, errSubscriptionHandleNotOpen
79+
return nil, 0, errSubscriptionHandleNotOpen
7380
}
7481

7582
if maxReads < 1 {
76-
return nil, fmt.Errorf("max reads must be greater than 0")
83+
return nil, 0, fmt.Errorf("max reads must be greater than 0")
7784
}
7885

86+
events, actualMaxReads, err := s.readWithRetry(maxReads)
87+
if err != nil {
88+
return nil, 0, err
89+
}
90+
91+
return events, actualMaxReads, nil
92+
}
93+
94+
// readWithRetry will read events from the subscription with dynamic batch sizing if the RPC_S_INVALID_BOUND error occurs.
95+
func (s *Subscription) readWithRetry(maxReads int) ([]Event, int, error) {
7996
eventHandles := make([]uintptr, maxReads)
8097
var eventsRead uint32
98+
8199
err := evtNext(s.handle, uint32(maxReads), &eventHandles[0], 0, 0, &eventsRead)
82100

83101
if errors.Is(err, ErrorInvalidOperation) && eventsRead == 0 {
84-
return nil, nil
102+
return nil, maxReads, nil
103+
}
104+
105+
if errors.Is(err, windows.RPC_S_INVALID_BOUND) {
106+
// close current subscription
107+
if closeErr := s.Close(); closeErr != nil {
108+
return nil, maxReads, fmt.Errorf("failed to close subscription during recovery: %w", closeErr)
109+
}
110+
111+
// reopen subscription with the same parameters
112+
if openErr := s.Open(s.startAt, s.sessionHandle, s.channel, s.bookmark); openErr != nil {
113+
return nil, maxReads, fmt.Errorf("failed to reopen subscription during recovery: %w", openErr)
114+
}
115+
116+
// retry with half the batch size
117+
newMaxReads := max(maxReads/2, 1)
118+
return s.readWithRetry(newMaxReads)
85119
}
86120

87121
if err != nil && !errors.Is(err, windows.ERROR_NO_MORE_ITEMS) {
88-
return nil, err
122+
return nil, maxReads, err
89123
}
90124

91125
events := make([]Event, 0, eventsRead)
@@ -94,7 +128,7 @@ func (s *Subscription) Read(maxReads int) ([]Event, error) {
94128
events = append(events, event)
95129
}
96130

97-
return events, nil
131+
return events, maxReads, nil
98132
}
99133

100134
// createFlags will create the necessary subscription flags from the supplied arguments.

0 commit comments

Comments
 (0)