You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
**Description:**
Link to tracking Issue: Fixes#29120
Fixing a bug - panic happens during stop method in async mode only
(didn't affect the default non-async mode).
When stop is called, it closes the messageQueue channel, signaling to
processMessagesAsync to stop running. However, readMessagesAsync
sometimes tries to write into the closed channel (depends whether the
method is currently reading from the closed connection or currently
trying to write to the channel), and as a result, panic error happens.
Separated between wg (waitGroup that serves non-async code and
processMessagesAsync) & the new wg_reader (waitGroup serving
readMessagesAsync only). This allows us to first stop readMessagesAsync,
wait for it to finish, before closing the channel.
Instead, stop (in async mode) should do the following:
1. Close the connection - signaling readMessagesAsync to stop - the
messageQueue channel will remain open until that method is done so
there's no risk of panic (due to writing to a closed channel).
2. Wait for readMessagesAsync to finish (wait for new wg_reader).
3. Close messageQueue channel (signaling processMessagesAsync to stop)
4. Wait for processMessagesAsync to finish (wait sg).
**Link to tracking Issue:** 29120
**Testing:** Unitests ran. Ran concrete strato, stopped & restarted
multiple times, didn't see any panic (and stop completed successfully as
expected)
**Documentation:** None.
readBuffer:=u.readBufferPool.Get().(*[]byte) // Can't reuse the same buffer since same references would be written multiple times to the messageQueue (and cause data override of previous entries)
@@ -372,10 +373,6 @@ func (u *Input) removeTrailingCharactersAndNULsFromBuffer(buffer []byte, n int)
372
373
// Stop will stop listening for udp messages.
373
374
func (u*Input) Stop() error {
374
375
u.stopOnce.Do(func() {
375
-
ifu.AsyncConfig!=nil {
376
-
close(u.messageQueue)
377
-
}
378
-
379
376
ifu.cancel==nil {
380
377
return
381
378
}
@@ -385,6 +382,11 @@ func (u *Input) Stop() error {
385
382
u.Errorf("failed to close UDP connection: %s", err)
386
383
}
387
384
}
385
+
ifu.AsyncConfig!=nil {
386
+
u.wgReader.Wait() // only when all async readers are finished, so there's no risk of sending to a closed channel, do we close messageQueue (which allows the async processors to finish)
0 commit comments