Skip to content

Commit a053a2e

Browse files
benjamin-awd, pront, thomasqueirozb
authored
fix(websocket source): reconnect indefinitely when connection fails (#24069)
* fix(websocket source): reconnect indefinitely when connection fails
* chore: add changelog fragment
* Update src/common/websocket.rs
* Remove references to now deleted fresh_backoff
* Fix expect()()
* fix: add missing protocol

---------

Co-authored-by: Pavlos Rontidis <[email protected]>
Co-authored-by: Thomas <[email protected]>
1 parent 0f99849 commit a053a2e

File tree

4 files changed

+50
-20
lines changed

4 files changed

+50
-20
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Fixed the `websocket` source entering a "zombie" state when the `connect_timeout_secs` threshold was reached with multiple sources running. The connection timeout is now applied per connect attempt with indefinite retries, rather than as a total timeout limit.
2+
3+
authors: benjamin-awd

src/common/websocket.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ pub enum WebSocketError {
4040
DnsError { source: dns::DnsError },
4141
#[snafu(display("No addresses returned."))]
4242
NoAddresses,
43+
#[snafu(display("Connection attempt timed out"))]
44+
ConnectionTimedOut,
4345
}
4446

4547
#[derive(Clone)]
@@ -138,6 +140,41 @@ impl WebSocketConnector {
138140
}
139141
}
140142

143+
/// Connects with exponential backoff, applying `timeout_duration` to each individual connection attempt rather than to the retry loop as a whole.
144+
/// This will retry forever until a connection is established: failed and timed-out attempts are reported via `WebSocketConnectionFailedError`, so no error is ever returned to the caller.
145+
pub(crate) async fn connect_backoff_with_timeout(
146+
&self,
147+
timeout_duration: Duration,
148+
) -> WebSocketStream<MaybeTlsStream<TcpStream>> {
149+
let mut backoff = ExponentialBackoff::default(); // one backoff sequence per call; it is not reset between failed attempts
150+
151+
loop {
152+
match time::timeout(timeout_duration, self.connect()).await { // outer Result: timed out or not; inner Result: outcome of `connect()`
153+
Ok(Ok(ws_stream)) => { // connected within the per-attempt timeout
154+
emit!(WebSocketConnectionEstablished {});
155+
return ws_stream;
156+
}
157+
Ok(Err(error)) => { // `connect()` finished within the timeout but failed
158+
emit!(WebSocketConnectionFailedError {
159+
error: Box::new(error)
160+
});
161+
}
162+
Err(_) => { // the timeout elapsed first; reported as `ConnectionTimedOut`
163+
emit!(WebSocketConnectionFailedError {
164+
error: Box::new(WebSocketError::ConnectionTimedOut),
165+
});
166+
}
167+
}
168+
169+
time::sleep( // reached only after a failed attempt: wait for the next backoff delay, then retry
170+
backoff
171+
.next()
172+
.expect("backoff iterator always returns some value"),
173+
)
174+
.await;
175+
}
176+
}
177+
141178
#[cfg(feature = "sinks-websocket")]
142179
pub(crate) async fn healthcheck(&self) -> crate::Result<()> {
143180
self.connect().await.map(|_| ()).map_err(Into::into)

src/internal_events/websocket.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ impl InternalEvent for WebSocketConnectionFailedError {
4242
);
4343
counter!(
4444
"component_errors_total",
45+
"protocol" => PROTOCOL,
4546
"error_code" => "websocket_connection_failed",
4647
"error_type" => error_type::CONNECTION_FAILED,
4748
"stage" => error_stage::SENDING,
@@ -209,6 +210,7 @@ impl InternalEvent for WebSocketSendError<'_> {
209210
);
210211
counter!(
211212
"component_errors_total",
213+
"protocol" => PROTOCOL,
212214
"error_code" => "websocket_send_error",
213215
"error_type" => error_type::CONNECTION_FAILED,
214216
"stage" => error_stage::PROCESSING,

src/sources/websocket/source.rs

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,9 @@ use crate::{
2121
common::websocket::{PingInterval, WebSocketConnector, is_closed},
2222
config::SourceContext,
2323
internal_events::{
24-
ConnectionOpen, OpenGauge, PROTOCOL, WebSocketBytesReceived, WebSocketConnectionError,
25-
WebSocketConnectionEstablished, WebSocketConnectionFailedError,
26-
WebSocketConnectionShutdown, WebSocketKind, WebSocketMessageReceived,
27-
WebSocketReceiveError, WebSocketSendError,
24+
ConnectionOpen, OpenGauge, PROTOCOL, WebSocketBytesReceived,
25+
WebSocketConnectionFailedError, WebSocketConnectionShutdown, WebSocketKind,
26+
WebSocketMessageReceived, WebSocketReceiveError, WebSocketSendError,
2827
},
2928
sources::websocket::config::WebSocketConfig,
3029
vector_lib::codecs::StreamDecodingError,
@@ -297,23 +296,12 @@ impl WebSocketSource {
297296
async fn try_create_sink_and_stream(
298297
&self,
299298
) -> Result<(WebSocketSink, WebSocketStream), WebSocketSourceError> {
300-
let connect_future = self.params.connector.connect_backoff();
301-
let timeout = self.config.connect_timeout_secs;
302-
303-
let ws_stream = match time::timeout(timeout, connect_future).await {
304-
Ok(ws) => ws,
305-
Err(_) => {
306-
emit!(WebSocketConnectionError {
307-
error: TungsteniteError::Io(std::io::Error::new(
308-
std::io::ErrorKind::TimedOut,
309-
"Connection attempt timed out",
310-
))
311-
});
312-
return Err(WebSocketSourceError::ConnectTimeout);
313-
}
314-
};
299+
let ws_stream = self
300+
.params
301+
.connector
302+
.connect_backoff_with_timeout(self.config.connect_timeout_secs)
303+
.await;
315304

316-
emit!(WebSocketConnectionEstablished {});
317305
let (sink, stream) = ws_stream.split();
318306

319307
Ok((Box::pin(sink), Box::pin(stream)))

0 commit comments

Comments
 (0)