diff --git a/linkerd/proxy/api-resolve/src/resolve.rs b/linkerd/proxy/api-resolve/src/resolve.rs index 78076229a5..18823da0de 100644 --- a/linkerd/proxy/api-resolve/src/resolve.rs +++ b/linkerd/proxy/api-resolve/src/resolve.rs @@ -104,14 +104,10 @@ where } } -impl resolve::Resolution for Resolution { - type Endpoint = Metadata; - type Error = grpc::Status; +impl Stream for Resolution { + type Item = Result, grpc::Status>; - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); loop { match ready!(this.inner.as_mut().poll_next(cx)) { @@ -126,7 +122,7 @@ impl resolve::Resolution for Resolution { .collect::>(); if !addr_metas.is_empty() { debug!(endpoints = %addr_metas.len(), "Add"); - return Poll::Ready(Ok(Update::Add(addr_metas))); + return Poll::Ready(Some(Ok(Update::Add(addr_metas)))); } } @@ -137,7 +133,7 @@ impl resolve::Resolution for Resolution { .collect::>(); if !sock_addrs.is_empty() { debug!(endpoints = %sock_addrs.len(), "Remove"); - return Poll::Ready(Ok(Update::Remove(sock_addrs))); + return Poll::Ready(Some(Ok(Update::Remove(sock_addrs)))); } } @@ -148,14 +144,12 @@ impl resolve::Resolution for Resolution { } else { Update::DoesNotExist }; - return Poll::Ready(Ok(update.into())); + return Poll::Ready(Some(Ok(update.into()))); } None => {} // continue }, - None => { - return Poll::Ready(Err(grpc::Status::new(grpc::Code::Ok, "end of stream"))) - } + None => return Poll::Ready(None), }; } } diff --git a/linkerd/proxy/core/src/lib.rs b/linkerd/proxy/core/src/lib.rs index 7102ead4cb..0e5a43e03b 100644 --- a/linkerd/proxy/core/src/lib.rs +++ b/linkerd/proxy/core/src/lib.rs @@ -2,4 +2,4 @@ pub mod resolve; -pub use self::resolve::{Resolution, Resolve}; +pub use self::{resolve::Resolve, resolve::Update}; diff --git a/linkerd/proxy/core/src/resolve.rs b/linkerd/proxy/core/src/resolve.rs index 7fe22fb5ef..16f9362fba 100644 --- a/linkerd/proxy/core/src/resolve.rs +++ b/linkerd/proxy/core/src/resolve.rs @@ -1,14 +1,14 @@ +use futures::stream::TryStream; use linkerd2_error::Error; use std::future::Future; use std::net::SocketAddr; -use std::pin::Pin; use std::task::{Context, Poll}; -/// Resolves `T`-typed names/addresses as a `Resolution`. +/// Resolves `T`-typed names/addresses as an infinite stream of `Update`. pub trait Resolve { type Endpoint; type Error: Into; - type Resolution: Resolution; + type Resolution: TryStream, Error = Self::Error>; type Future: Future>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll>; @@ -23,27 +23,6 @@ pub trait Resolve { } } -/// An infinite stream of endpoint updates. -pub trait Resolution { - type Endpoint; - type Error: Into; - - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>>; - - fn poll_unpin( - &mut self, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> - where - Self: Unpin, - { - Pin::new(self).poll(cx) - } -} - #[derive(Clone, Debug)] pub struct Service(S); @@ -57,13 +36,13 @@ pub enum Update { // === impl Resolve === -impl Resolve for S +impl Resolve for S where S: tower::Service, S::Error: Into, - R: Resolution, + R: TryStream, Error = S::Error>, { - type Endpoint = ::Endpoint; + type Endpoint = E; type Error = S::Error; type Resolution = S::Response; type Future = S::Future; diff --git a/linkerd/proxy/discover/src/from_resolve.rs b/linkerd/proxy/discover/src/from_resolve.rs index a031b0fc88..feb8370864 100644 --- a/linkerd/proxy/discover/src/from_resolve.rs +++ b/linkerd/proxy/discover/src/from_resolve.rs @@ -1,6 +1,6 @@ -use futures::{ready, Stream, TryFuture}; +use futures::{ready, Stream, TryFuture, TryStream}; use indexmap::IndexSet; -use linkerd2_proxy_core::resolve::{Resolution, Resolve, Update}; +use linkerd2_proxy_core::resolve::{Resolve, Update}; use pin_project::pin_project; use std::collections::VecDeque; use std::future::Future; @@ -10,45 +10,50 @@ use std::task::{Context, Poll}; use tower::discover::Change; #[derive(Clone, Debug)] -pub struct FromResolve { +pub struct FromResolve { resolve: R, + _marker: std::marker::PhantomData, } #[pin_project] #[derive(Debug)] -pub struct DiscoverFuture { +pub struct DiscoverFuture { #[pin] future: F, + _marker: std::marker::PhantomData, } /// Observes an `R`-typed resolution stream, using an `M`-typed endpoint stack to /// build a service for each endpoint. #[pin_project] -pub struct Discover { +pub struct Discover { #[pin] resolution: R, active: IndexSet, - pending: VecDeque>, + pending: VecDeque>, } // === impl FromResolve === -impl FromResolve { +impl FromResolve { pub fn new(resolve: R) -> Self where R: Resolve, { - Self { resolve } + Self { + resolve, + _marker: std::marker::PhantomData, + } } } -impl tower::Service for FromResolve +impl tower::Service for FromResolve where R: Resolve + Clone, { - type Response = Discover; + type Response = Discover; type Error = R::Error; - type Future = DiscoverFuture; + type Future = DiscoverFuture; #[inline] fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { @@ -59,18 +64,19 @@ where fn call(&mut self, target: T) -> Self::Future { Self::Future { future: self.resolve.resolve(target), + _marker: std::marker::PhantomData, } } } // === impl DiscoverFuture === -impl Future for DiscoverFuture +impl Future for DiscoverFuture where F: TryFuture, - F::Ok: Resolution, + F::Ok: TryStream, { - type Output = Result, F::Error>; + type Output = Result, F::Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let resolution = ready!(self.project().future.try_poll(cx))?; @@ -80,7 +86,7 @@ where // === impl Discover === -impl Discover { +impl Discover { pub fn new(resolution: R) -> Self { Self { resolution, @@ -90,8 +96,11 @@ impl Discover { } } -impl Stream for Discover { - type Item = Result, R::Error>; +impl Stream for Discover +where + R: TryStream>, +{ + type Item = Result, R::Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { @@ -100,24 +109,27 @@ impl Stream for Discover { return Poll::Ready(Some(Ok(change))); } - match ready!(this.resolution.poll(cx))? { - Update::Add(endpoints) => { - for (addr, endpoint) in endpoints.into_iter() { - this.active.insert(addr); - this.pending.push_back(Change::Insert(addr, endpoint)); + match ready!(this.resolution.try_poll_next(cx)) { + Some(update) => match update? { + Update::Add(endpoints) => { + for (addr, endpoint) in endpoints.into_iter() { + this.active.insert(addr); + this.pending.push_back(Change::Insert(addr, endpoint)); + } } - } - Update::Remove(addrs) => { - for addr in addrs.into_iter() { - if this.active.remove(&addr) { - this.pending.push_back(Change::Remove(addr)); + Update::Remove(addrs) => { + for addr in addrs.into_iter() { + if this.active.remove(&addr) { + this.pending.push_back(Change::Remove(addr)); + } } } - } - Update::DoesNotExist | Update::Empty => { - this.pending - .extend(this.active.drain(..).map(Change::Remove)); - } + Update::DoesNotExist | Update::Empty => { + this.pending + .extend(this.active.drain(..).map(Change::Remove)); + } + }, + None => return Poll::Ready(None), } } } diff --git a/linkerd/proxy/discover/src/lib.rs b/linkerd/proxy/discover/src/lib.rs index 798fde7ff3..cedc7faea7 100644 --- a/linkerd/proxy/discover/src/lib.rs +++ b/linkerd/proxy/discover/src/lib.rs @@ -43,7 +43,7 @@ where T: fmt::Display, R: Resolve + Send + Clone + 'static, R::Error: Into, - R::Endpoint: fmt::Debug + Clone + PartialEq + Send, + R::Endpoint: fmt::Debug + Clone + PartialEq + Send + 'static, R::Resolution: Send + 'static, R::Future: Send + 'static, M: tower::Service + Clone + Send + 'static, @@ -51,7 +51,7 @@ where M::Response: Send + 'static, M::Future: Send + 'static, { - type Service = Buffer, M>>; + type Service = Buffer, M>>; fn layer(&self, make_endpoint: M) -> Self::Service { let make_discover = diff --git a/linkerd/proxy/discover/src/make_endpoint.rs b/linkerd/proxy/discover/src/make_endpoint.rs index 6d44e30fb7..ae67f80410 100644 --- a/linkerd/proxy/discover/src/make_endpoint.rs +++ b/linkerd/proxy/discover/src/make_endpoint.rs @@ -195,18 +195,17 @@ where // services. Don't process any updates until we can do so. ready!(this.make_endpoint.poll_ready(cx)).map_err(Into::into)?; - match ready!(this.discover.poll_discover(cx)) - .expect("XXX(eliza): can this ever be none???") - .map_err(Into::into)? - { - Change::Insert(key, target) => { - // Start building the service and continue. If a pending - // service exists for this addr, it will be canceled. - let fut = this.make_endpoint.call(target); - this.make_futures.push(key, fut); - } - Change::Remove(key) => { - this.pending_removals.push(key); + if let Some(change) = ready!(this.discover.poll_discover(cx)) { + match change.map_err(Into::into)? { + Change::Insert(key, target) => { + // Start building the service and continue. If a pending + // service exists for this addr, it will be canceled. + let fut = this.make_endpoint.call(target); + this.make_futures.push(key, fut); + } + Change::Remove(key) => { + this.pending_removals.push(key); + } } } } diff --git a/linkerd/proxy/resolve/src/make_unpin.rs b/linkerd/proxy/resolve/src/make_unpin.rs index fef01e3aa5..fad1a82e6c 100644 --- a/linkerd/proxy/resolve/src/make_unpin.rs +++ b/linkerd/proxy/resolve/src/make_unpin.rs @@ -1,5 +1,6 @@ +use futures::stream::{Stream, TryStream}; use futures::TryFuture; -use linkerd2_proxy_core::resolve::{self, Resolution, Update}; +use linkerd2_proxy_core::resolve; use pin_project::pin_project; use std::future::Future; use std::pin::Pin; @@ -38,15 +39,11 @@ impl MakeUnpin { } } -impl Resolution for MakeUnpin { - type Endpoint = T::Endpoint; - type Error = T::Error; +impl Stream for MakeUnpin { + type Item = Result; - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - self.project().0.as_mut().as_mut().poll(cx) + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().0.as_mut().as_mut().try_poll_next(cx) } } diff --git a/linkerd/proxy/resolve/src/map_endpoint.rs b/linkerd/proxy/resolve/src/map_endpoint.rs index 6032ec93f2..5b31f74bf1 100644 --- a/linkerd/proxy/resolve/src/map_endpoint.rs +++ b/linkerd/proxy/resolve/src/map_endpoint.rs @@ -1,6 +1,9 @@ //! A middleware that wraps `Resolutions`, modifying their endpoint type. +use futures::stream::Stream; +use futures::stream::TryStream; use futures::{ready, TryFuture}; +use linkerd2_error::Error; use linkerd2_proxy_core::resolve; use pin_project::pin_project; use std::future::Future; @@ -30,11 +33,12 @@ pub struct ResolveFuture { #[pin_project] #[derive(Clone, Debug)] -pub struct Resolution { +pub struct Resolution { #[pin] resolution: R, target: T, map: M, + _marker: std::marker::PhantomData, } // === impl Resolve === @@ -54,7 +58,7 @@ where R: resolve::Resolve, M: MapEndpoint + Clone, { - type Response = Resolution; + type Response = Resolution; type Error = R::Error; type Future = ResolveFuture; @@ -76,13 +80,14 @@ where // === impl ResolveFuture === -impl Future for ResolveFuture +impl Future for ResolveFuture where F: TryFuture, - F::Ok: resolve::Resolution, - M: MapEndpoint::Endpoint>, + F::Ok: TryStream>, + ::Error: Into, + M: MapEndpoint, { - type Output = Result, F::Error>; + type Output = Result, F::Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); @@ -93,40 +98,41 @@ where resolution, target, map, + _marker: std::marker::PhantomData, })) } } // === impl Resolution === -impl resolve::Resolution for Resolution +impl Stream for Resolution where - R: resolve::Resolution, - M: MapEndpoint, + R: TryStream>, + R::Error: Into, + M: MapEndpoint, { - type Endpoint = M::Out; - type Error = R::Error; + type Item = Result, R::Error>; - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); - let update = match ready!(this.resolution.poll(cx))? { - resolve::Update::Add(eps) => { - let mut update = Vec::new(); - for (a, ep) in eps.into_iter() { - let ep = this.map.map_endpoint(&this.target, a, ep); - update.push((a, ep)); + let update = match ready!(this.resolution.try_poll_next(cx)) { + Some(result) => match result? { + resolve::Update::Add(eps) => { + let mut update = Vec::new(); + for (a, ep) in eps.into_iter() { + let ep = this.map.map_endpoint(&this.target, a, ep); + update.push((a, ep)); + } + + resolve::Update::Add(update) } - - resolve::Update::Add(update) - } - resolve::Update::Remove(addrs) => resolve::Update::Remove(addrs), - resolve::Update::DoesNotExist => resolve::Update::DoesNotExist, - resolve::Update::Empty => resolve::Update::Empty, + resolve::Update::Remove(addrs) => resolve::Update::Remove(addrs), + resolve::Update::DoesNotExist => resolve::Update::DoesNotExist, + resolve::Update::Empty => resolve::Update::Empty, + }, + None => return Poll::Ready(None), }; - Poll::Ready(Ok(update)) + Poll::Ready(Some(Ok(update))) } } diff --git a/linkerd/proxy/resolve/src/recover.rs b/linkerd/proxy/resolve/src/recover.rs index 0c562bfd20..0db7e2b66b 100644 --- a/linkerd/proxy/resolve/src/recover.rs +++ b/linkerd/proxy/resolve/src/recover.rs @@ -1,15 +1,19 @@ //! A middleware that recovers a resolution after some failures. -use futures::{prelude::*, ready}; +use futures::stream::TryStream; +use futures::{prelude::*, ready, FutureExt, Stream}; use indexmap::IndexMap; use linkerd2_error::{Error, Recover}; -use linkerd2_proxy_core::resolve::{self, Resolution as _, Update}; +use linkerd2_proxy_core::resolve::{self, Update}; use pin_project::pin_project; use std::future::Future; use std::net::SocketAddr; use std::pin::Pin; use std::task::{Context, Poll}; +#[derive(Clone, Debug)] +pub struct Eos(()); + #[derive(Clone, Debug)] pub struct Resolve { resolve: R, @@ -42,7 +46,7 @@ struct Cache { } #[pin_project] -enum State { +enum State { Disconnected { backoff: Option, }, @@ -62,7 +66,7 @@ enum State { Connected { #[pin] resolution: R, - initial: Option>, + initial: Option, }, Recover { @@ -151,7 +155,7 @@ where // === impl Resolution === -impl resolve::Resolution for Resolution +impl Stream for Resolution where T: Clone, R: resolve::Resolve, @@ -161,20 +165,16 @@ where E: Recover, E::Backoff: Unpin, { - type Endpoint = R::Endpoint; - type Error = Error; + type Item = Result, Error>; - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); loop { // If a reconciliation update is buffered (i.e. after // reconcile_after_reconnect), process it immediately. if let Some(update) = this.reconcile.take() { this.update_active(&update); - return Poll::Ready(Ok(update)); + return Poll::Ready(Some(Ok(update))); } match this.inner.state { @@ -195,24 +195,26 @@ where { *this.reconcile = reconcile; this.update_active(&update); - return Poll::Ready(Ok(update)); + return Poll::Ready(Some(Ok(update))); } } // Process the resolution stream, updating the cache. // // Attempt recovery/backoff if the resolution fails. - match ready!(resolution.poll_unpin(cx)) { - Ok(update) => { + + match ready!(resolution.try_poll_next_unpin(cx)) { + Some(Ok(update)) => { this.update_active(&update); - return Poll::Ready(Ok(update)); + return Poll::Ready(Some(Ok(update))); } - Err(e) => { + Some(Err(e)) => { this.inner.state = State::Recover { error: Some(e.into()), backoff: None, } } + None => return Poll::Ready(None), } } // XXX(eliza): note that this match was originally an `if let`, @@ -300,18 +302,26 @@ where State::Pending { ref mut resolution, ref mut backoff, - } => match ready!(resolution.as_mut().expect("illegal state").poll_unpin(cx)) { - Err(e) => State::Recover { + } => match ready!(resolution + .as_mut() + .expect("illegal state") + .try_poll_next_unpin(cx)) + { + Some(Err(e)) => State::Recover { error: Some(e.into()), backoff: backoff.take(), }, - Ok(initial) => { + Some(Ok(initial)) => { tracing::trace!("connected"); State::Connected { resolution: resolution.take().expect("illegal state"), initial: Some(initial), } } + None => State::Recover { + error: Some(Eos(()).into()), + backoff: backoff.take(), + }, }, State::Connected { .. } => return Poll::Ready(Ok(())), @@ -394,6 +404,14 @@ fn reconcile_after_connect( } } +impl std::fmt::Display for Eos { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "end of stream reached") + } +} + +impl std::error::Error for Eos {} + #[cfg(test)] mod tests { use super::*;