diff --git a/linkerd/app/integration/src/proxy.rs b/linkerd/app/integration/src/proxy.rs index ca8887fb77..da7b986c51 100644 --- a/linkerd/app/integration/src/proxy.rs +++ b/linkerd/app/integration/src/proxy.rs @@ -41,7 +41,8 @@ pub struct Listening { controller: controller::Listening, identity: Option, - _shutdown: Shutdown, + shutdown: Shutdown, + terminated: oneshot::Receiver<()>, thread: thread::JoinHandle<()>, } @@ -162,9 +163,19 @@ impl Listening { controller, identity, thread, + shutdown, + terminated, .. } = self; - drop(thread); + + debug!("signaling shutdown"); + shutdown.signal(); + + debug!("waiting for proxy termination"); + terminated.await.unwrap(); + + debug!("proxy terminated"); + thread.join().unwrap(); let outbound = async move { if let Some(srv) = outbound_server { @@ -292,6 +303,7 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening { let (trace, trace_handle) = super::trace_subscriber(); let (running_tx, running_rx) = oneshot::channel(); + let (term_tx, term_rx) = oneshot::channel(); let (tx, mut rx) = shutdown_signal(); if let Some(fut) = proxy.shutdown_signal { @@ -339,9 +351,12 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening { let drain = main.spawn(); on_shutdown.await; debug!("after on_shutdown"); + drain.drain().await; + debug!("after drain"); - debug!("after on_shutdown"); + // Suppress error as not all tests wait for graceful shutdown + let _ = term_tx.send(()); }); }) }) @@ -383,7 +398,8 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening { controller, identity, - _shutdown: tx, + shutdown: tx, + terminated: term_rx, thread, } } diff --git a/linkerd/app/integration/tests/transparency.rs b/linkerd/app/integration/tests/transparency.rs index cffe0e178f..547307c840 100644 --- a/linkerd/app/integration/tests/transparency.rs +++ b/linkerd/app/integration/tests/transparency.rs @@ -69,6 +69,10 @@ async fn outbound_tcp() { tcp_client.write(msg1).await; assert_eq!(tcp_client.read().await, msg2.as_bytes()); + + // TCP client must close first + tcp_client.shutdown().await; + // ensure panics from the server are propagated proxy.join_servers().await; } @@ -95,6 +99,10 @@ async fn inbound_tcp() { tcp_client.write(msg1).await; assert_eq!(tcp_client.read().await, msg2.as_bytes()); + + // TCP client must close first + tcp_client.shutdown().await; + // ensure panics from the server are propagated proxy.join_servers().await; } @@ -181,6 +189,10 @@ async fn test_server_speaks_first(env: TestEnv) { assert_eq!(s(&tcp_client.read_timeout(TIMEOUT).await), msg1); tcp_client.write(msg2).await; timeout(TIMEOUT, rx.recv()).await.unwrap(); + + // TCP client must close first + tcp_client.shutdown().await; + // ensure panics from the server are propagated proxy.join_servers().await; } @@ -491,6 +503,9 @@ macro_rules! http1_tests { let chat_resp = tcp_client.read().await; assert_eq!(s(&chat_resp), chatproto_res); + // TCP client must close first + tcp_client.shutdown().await; + // ensure panics from the server are propagated proxy.join_servers().await; } @@ -640,6 +655,9 @@ macro_rules! http1_tests { let resp2 = tcp_client.read().await; assert_eq!(s(&resp2), s(&tunneled_res[..])); + // TCP client must close first + tcp_client.shutdown().await; + // ensure panics from the server are propagated proxy.join_servers().await; } diff --git a/linkerd/app/src/lib.rs b/linkerd/app/src/lib.rs index c5b977607a..a16ad523ab 100644 --- a/linkerd/app/src/lib.rs +++ b/linkerd/app/src/lib.rs @@ -291,7 +291,7 @@ impl App { .. } = self; - // Run a daemon thread for all administative tasks. + // Run a daemon thread for all administrative tasks. // // The main reactor holds `admin_shutdown_tx` until the reactor drops // the task. This causes the daemon reactor to stop.