diff --git a/Cargo.lock b/Cargo.lock index cc17e85b4e..8f0b4dd90d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1448,7 +1448,6 @@ dependencies = [ "http-body", "indexmap", "linkerd2-addr", - "linkerd2-dns", "linkerd2-error", "linkerd2-proxy-api", "linkerd2-stack", diff --git a/linkerd/addr/src/lib.rs b/linkerd/addr/src/lib.rs index 5214620fa3..4b8e417fc0 100644 --- a/linkerd/addr/src/lib.rs +++ b/linkerd/addr/src/lib.rs @@ -129,6 +129,12 @@ impl From for Addr { } } +impl AsRef for Addr { + fn as_ref(&self) -> &Self { + self + } +} + // === impl NameAddr === impl NameAddr { diff --git a/linkerd/app/core/src/lib.rs b/linkerd/app/core/src/lib.rs index 4074f01001..eaeede22e8 100644 --- a/linkerd/app/core/src/lib.rs +++ b/linkerd/app/core/src/lib.rs @@ -117,3 +117,9 @@ impl std::fmt::Display for DiscoveryRejected { } impl std::error::Error for DiscoveryRejected {} + +impl From for DiscoveryRejected { + fn from(_: Addr) -> Self { + Self::new() + } +} diff --git a/linkerd/app/core/src/svc.rs b/linkerd/app/core/src/svc.rs index c30a158476..c142566649 100644 --- a/linkerd/app/core/src/svc.rs +++ b/linkerd/app/core/src/svc.rs @@ -2,7 +2,7 @@ pub use crate::proxy::http; use crate::transport::Connect; -use crate::{cache, Error}; +use crate::{cache, request_filter, Error}; pub use linkerd2_buffer as buffer; use linkerd2_concurrency_limit as concurrency_limit; pub use linkerd2_stack::{self as stack, layer, NewService}; @@ -299,6 +299,10 @@ impl Stack { self.push(stack::FallbackLayer::new(fallback).with_predicate(predicate)) } + pub fn push_request_filter(self, filter: F) -> Stack> { + self.push(request_filter::RequestFilterLayer::new(filter)) + } + // pub fn box_http_request(self) -> Stack> // where // B: hyper::body::HttpBody + 'static, diff --git a/linkerd/app/outbound/src/endpoint.rs b/linkerd/app/outbound/src/endpoint.rs index cbe8a20af7..14491a2c21 100644 --- a/linkerd/app/outbound/src/endpoint.rs +++ b/linkerd/app/outbound/src/endpoint.rs @@ -162,6 +162,12 @@ impl connect::ConnectAddr for Target { } } +impl AsRef for Target { + fn as_ref(&self) -> &Addr { + &self.addr + } +} + // === impl HttpEndpoint === impl HttpEndpoint { diff --git a/linkerd/app/src/dst/mod.rs b/linkerd/app/src/dst/mod.rs index 4edf99cab1..01252e5fe1 100644 --- a/linkerd/app/src/dst/mod.rs +++ b/linkerd/app/src/dst/mod.rs @@ -1,11 +1,13 @@ +mod permit; mod resolve; use http_body::Body as HttpBody; use indexmap::IndexSet; use linkerd2_app_core::{ config::{ControlAddr, ControlConfig}, - dns, profiles, Error, + dns, profiles, request_filter, svc, Error, }; +use permit::PermitConfiguredDsts; use std::time::Duration; use tonic::{ body::{Body, BoxBody}, @@ -19,6 +21,7 @@ pub struct Config { pub get_suffixes: IndexSet, pub get_networks: IndexSet, pub profile_suffixes: IndexSet, + pub profile_networks: IndexSet, pub initial_profile_timeout: Duration, } @@ -27,8 +30,11 @@ pub struct Config { /// The addr is preserved for logging. pub struct Dst { pub addr: ControlAddr, - pub profiles: profiles::Client, - pub resolve: resolve::Resolve, + pub profiles: request_filter::Service< + PermitConfiguredDsts, + profiles::Client, + >, + pub resolve: request_filter::Service>, } impl Config { @@ -42,21 +48,28 @@ impl Config { ::Error: Into + Send, S::Future: Send, { - let resolve = resolve::new( + let resolve = svc::stack(resolve::new( svc.clone(), - self.get_suffixes, - self.get_networks, &self.context, self.control.connect.backoff, - ); + )) + .push_request_filter(PermitConfiguredDsts::new( + self.get_suffixes, + self.get_networks, + )) + .into_inner(); - let profiles = profiles::Client::new( + let profiles = svc::stack(profiles::Client::new( svc, resolve::BackoffUnlessInvalidArgument::from(self.control.connect.backoff), self.initial_profile_timeout, self.context, - self.profile_suffixes, - ); + )) + .push_request_filter( + PermitConfiguredDsts::new(self.profile_suffixes, self.profile_networks) + .with_error::(), + ) + .into_inner(); Ok(Dst { addr: self.control.addr, diff --git a/linkerd/app/src/dst/permit.rs b/linkerd/app/src/dst/permit.rs new file mode 100644 index 0000000000..1fe17f6c47 --- /dev/null +++ b/linkerd/app/src/dst/permit.rs @@ -0,0 +1,78 @@ +use ipnet::{Contains, IpNet}; +use linkerd2_app_core::{dns::Suffix, request_filter, Addr, DiscoveryRejected, Error}; +use std::marker::PhantomData; +use std::net::IpAddr; +use std::sync::Arc; + +pub struct PermitConfiguredDsts { + name_suffixes: Arc>, + networks: Arc>, + _error: PhantomData, +} + +// === impl PermitConfiguredDsts === + +impl PermitConfiguredDsts { + pub fn new( + name_suffixes: impl IntoIterator, + nets: impl IntoIterator, + ) -> Self { + Self { + name_suffixes: Arc::new(name_suffixes.into_iter().collect()), + networks: Arc::new(nets.into_iter().collect()), + _error: PhantomData, + } + } + + /// Configures the returned error type when the target is outside of the + /// configured set of destinations. + pub fn with_error(self) -> PermitConfiguredDsts + where + E: Into + From, + { + PermitConfiguredDsts { + name_suffixes: self.name_suffixes, + networks: self.networks, + _error: PhantomData, + } + } +} + +impl Clone for PermitConfiguredDsts { + fn clone(&self) -> Self { + Self { + name_suffixes: self.name_suffixes.clone(), + networks: self.networks.clone(), + _error: PhantomData, + } + } +} + +impl request_filter::RequestFilter for PermitConfiguredDsts +where + E: Into + From, + T: AsRef, +{ + type Error = E; + + fn filter(&self, t: T) -> Result { + let addr = t.as_ref(); + let permitted = match addr { + Addr::Name(ref name) => self + .name_suffixes + .iter() + .any(|suffix| suffix.contains(name.name())), + Addr::Socket(sa) => self.networks.iter().any(|net| match (net, sa.ip()) { + (IpNet::V4(net), IpAddr::V4(addr)) => net.contains(&addr), + (IpNet::V6(net), IpAddr::V6(addr)) => net.contains(&addr), + _ => false, + }), + }; + + if permitted { + Ok(t) + } else { + Err(E::from(addr.clone())) + } + } +} diff --git a/linkerd/app/src/dst/resolve.rs b/linkerd/app/src/dst/resolve.rs index 6dc4bf7d01..3688e6d537 100644 --- a/linkerd/app/src/dst/resolve.rs +++ b/linkerd/app/src/dst/resolve.rs @@ -1,35 +1,23 @@ +pub use super::permit::PermitConfiguredDsts; use http_body::Body as HttpBody; -use ipnet::{Contains, IpNet}; use linkerd2_app_core::{ - dns::Suffix, exp_backoff::{ExponentialBackoff, ExponentialBackoffStream}, proxy::{ api_resolve as api, resolve::{self, recover}, }, - request_filter, Addr, DiscoveryRejected, Error, Recover, + DiscoveryRejected, Error, Recover, }; -use linkerd2_app_outbound::Target; -use std::net::IpAddr; -use std::sync::Arc; use tonic::{ body::{Body, BoxBody}, client::GrpcService, Code, Status, }; -pub type Resolve = request_filter::Service< - PermitConfiguredDsts, - recover::Resolve>>, ->; +pub type Resolve = + recover::Resolve>>; -pub fn new( - service: S, - suffixes: impl IntoIterator, - nets: impl IntoIterator, - token: &str, - backoff: ExponentialBackoff, -) -> Resolve +pub fn new(service: S, token: &str, backoff: ExponentialBackoff) -> Resolve where S: GrpcService + Clone + Send + 'static, S::Error: Into + Send, @@ -38,62 +26,15 @@ where ::Error: Into + Send, S::Future: Send, { - request_filter::Service::new( - PermitConfiguredDsts::new(suffixes, nets), - recover::Resolve::new( - backoff.into(), - resolve::make_unpin(api::Resolve::new(service).with_context_token(token)), - ), + recover::Resolve::new( + backoff.into(), + resolve::make_unpin(api::Resolve::new(service).with_context_token(token)), ) } -#[derive(Clone, Debug)] -pub struct PermitConfiguredDsts { - name_suffixes: Arc>, - networks: Arc>, -} - #[derive(Clone, Debug, Default)] pub struct BackoffUnlessInvalidArgument(ExponentialBackoff); -// === impl PermitConfiguredDsts === - -impl PermitConfiguredDsts { - fn new( - name_suffixes: impl IntoIterator, - nets: impl IntoIterator, - ) -> Self { - Self { - name_suffixes: Arc::new(name_suffixes.into_iter().collect()), - networks: Arc::new(nets.into_iter().collect()), - } - } -} - -impl request_filter::RequestFilter> for PermitConfiguredDsts { - type Error = DiscoveryRejected; - - fn filter(&self, t: Target) -> Result, Self::Error> { - let permitted = match t.addr { - Addr::Name(ref name) => self - .name_suffixes - .iter() - .any(|suffix| suffix.contains(name.name())), - Addr::Socket(sa) => self.networks.iter().any(|net| match (net, sa.ip()) { - (IpNet::V4(net), IpAddr::V4(addr)) => net.contains(&addr), - (IpNet::V6(net), IpAddr::V6(addr)) => net.contains(&addr), - _ => false, - }), - }; - - if permitted { - Ok(t) - } else { - Err(DiscoveryRejected::new()) - } - } -} - // === impl BackoffUnlessInvalidArgument === impl From for BackoffUnlessInvalidArgument { diff --git a/linkerd/app/src/env.rs b/linkerd/app/src/env.rs index ffc9e777b7..e92a6dd299 100644 --- a/linkerd/app/src/env.rs +++ b/linkerd/app/src/env.rs @@ -121,6 +121,16 @@ pub const ENV_DESTINATION_GET_NETWORKS: &str = "LINKERD2_PROXY_DESTINATION_GET_N /// If unspecified, a default value is used. pub const ENV_DESTINATION_PROFILE_SUFFIXES: &str = "LINKERD2_PROXY_DESTINATION_PROFILE_SUFFIXES"; +/// Constrains which destination addresses may be used for profile/route discovery. +/// +/// The value is a comma-separated list of networks that may be +/// resolved via the destination service. +/// +/// If specified and empty, the destination service is not used for route discovery. +/// +/// If unspecified, a default value is used. +pub const ENV_DESTINATION_PROFILE_NETWORKS: &str = "LINKERD2_PROXY_DESTINATION_PROFILE_NETWORKS"; + /// Constrains which destination names are permitted. /// /// If unspecified or empty, no inbound gateway is configured. @@ -331,6 +341,7 @@ pub fn parse_config(strings: &S) -> Result ENV_DESTINATION_PROFILE_SUFFIXES, parse_dns_suffixes, ); + let dst_profile_networks = parse(strings, ENV_DESTINATION_PROFILE_NETWORKS, parse_networks); let initial_stream_window_size = parse(strings, ENV_INITIAL_STREAM_WINDOW_SIZE, parse_number); let initial_connection_window_size = @@ -486,6 +497,7 @@ pub fn parse_config(strings: &S) -> Result get_networks: dst_get_networks?.unwrap_or_default(), profile_suffixes: dst_profile_suffixes? .unwrap_or(parse_dns_suffixes(DEFAULT_DESTINATION_PROFILE_SUFFIXES).unwrap()), + profile_networks: dst_profile_networks?.unwrap_or_default(), initial_profile_timeout: dst_profile_initial_timeout? .unwrap_or(DEFAULT_DESTINATION_PROFILE_INITIAL_TIMEOUT), control: ControlConfig { diff --git a/linkerd/request-filter/src/lib.rs b/linkerd/request-filter/src/lib.rs index 4652ec688c..d2eba8437f 100644 --- a/linkerd/request-filter/src/lib.rs +++ b/linkerd/request-filter/src/lib.rs @@ -15,6 +15,11 @@ pub trait RequestFilter { fn filter(&self, request: T) -> Result; } +#[derive(Clone, Debug)] +pub struct RequestFilterLayer { + filter: T, +} + #[derive(Clone, Debug)] pub struct Service { filter: I, @@ -28,6 +33,22 @@ pub enum ResponseFuture { Rejected(Option), } +// === impl Layer === + +impl RequestFilterLayer { + pub fn new(filter: T) -> Self { + Self { filter } + } +} + +impl tower::Layer for RequestFilterLayer { + type Service = Service; + + fn layer(&self, inner: S) -> Self::Service { + Service::new(self.filter.clone(), inner) + } +} + // === impl Service === impl Service { diff --git a/linkerd/service-profiles/Cargo.toml b/linkerd/service-profiles/Cargo.toml index 7e634394e4..632342f558 100644 --- a/linkerd/service-profiles/Cargo.toml +++ b/linkerd/service-profiles/Cargo.toml @@ -15,7 +15,6 @@ http = "0.2" http-body = "0.3" indexmap = "1.0" linkerd2-addr = { path = "../addr" } -linkerd2-dns = { path = "../dns" } linkerd2-error = { path = "../error" } linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.13" } linkerd2-stack = { path = "../stack" } diff --git a/linkerd/service-profiles/src/client.rs b/linkerd/service-profiles/src/client.rs index b37beb00f7..765d0f638b 100644 --- a/linkerd/service-profiles/src/client.rs +++ b/linkerd/service-profiles/src/client.rs @@ -4,7 +4,6 @@ use futures::{future, prelude::*, ready, select_biased}; use http; use http_body::Body as HttpBody; use linkerd2_addr::{Addr, NameAddr}; -use linkerd2_dns as dns; use linkerd2_error::{Error, Recover}; use linkerd2_proxy_api::destination as api; use pin_project::pin_project; @@ -32,7 +31,6 @@ pub struct Client { recover: R, initial_timeout: Duration, context_token: String, - suffixes: Vec, } pub type Receiver = watch::Receiver; @@ -47,17 +45,9 @@ where R: Recover, { #[pin] - inner: ProfileFutureInner, -} - -#[pin_project(project = ProfileFutureInnerProj)] -enum ProfileFutureInner -where - S: GrpcService, - R: Recover, -{ - Invalid(Addr), - Pending(#[pin] Option>, #[pin] Delay), + inner: Option>, + #[pin] + timeout: Delay, } #[pin_project] @@ -112,19 +102,12 @@ where R: Recover, R::Backoff: Unpin, { - pub fn new( - service: S, - recover: R, - initial_timeout: Duration, - context_token: String, - suffixes: impl IntoIterator, - ) -> Self { + pub fn new(service: S, recover: R, initial_timeout: Duration, context_token: String) -> Self { Self { service: DestinationClient::new(service), recover, initial_timeout, context_token, - suffixes: suffixes.into_iter().collect(), } } } @@ -150,23 +133,7 @@ where } fn call(&mut self, dst: Addr) -> Self::Future { - let dst = match dst { - Addr::Name(n) => n, - Addr::Socket(_) => { - self.service = self.service.clone(); - return ProfileFuture { - inner: ProfileFutureInner::Invalid(dst), - }; - } - }; - - if !self.suffixes.iter().any(|s| s.contains(dst.name())) { - debug!("name not in profile suffixes"); - self.service = self.service.clone(); - return ProfileFuture { - inner: ProfileFutureInner::Invalid(dst.into()), - }; - } + let path = dst.to_string(); let service = { // In case the ready service holds resources, pass it into the @@ -176,7 +143,7 @@ where }; let request = api::GetDestination { - path: dst.to_string(), + path, context_token: self.context_token.clone(), ..Default::default() }; @@ -190,7 +157,8 @@ where state: State::Disconnected { backoff: None }, }; ProfileFuture { - inner: ProfileFutureInner::Pending(Some(inner), timeout), + inner: Some(inner), + timeout, } } } @@ -209,68 +177,63 @@ where type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.project().inner.project() { - ProfileFutureInnerProj::Invalid(addr) => { - Poll::Ready(Err(InvalidProfileAddr(addr.clone()).into())) + let mut this = self.project(); + let profile = match this + .inner + .as_mut() + .as_pin_mut() + .expect("polled after ready") + .poll_profile(cx) + { + Poll::Ready(Err(error)) => { + trace!(%error, "failed to fetch profile"); + return Poll::Ready(Err(error)); } - ProfileFutureInnerProj::Pending(mut inner, timeout) => { - let profile = match inner - .as_mut() - .as_pin_mut() - .expect("polled after ready") - .poll_profile(cx) - { - Poll::Ready(Err(error)) => { - trace!(%error, "failed to fetch profile"); - return Poll::Ready(Err(error)); - } - Poll::Pending => { - if timeout.poll(cx).is_pending() { - return Poll::Pending; - } + Poll::Pending => { + if this.timeout.poll(cx).is_pending() { + return Poll::Pending; + } - info!("Using default service profile after timeout"); - profiles::Routes::default() - } - Poll::Ready(Ok(profile)) => profile, - }; - - trace!("daemonizing"); - let (mut tx, rx) = watch::channel(profile); - let inner = inner.take().expect("polled after ready"); - let daemon = async move { - tokio::pin!(inner); - loop { - select_biased! { - _ = tx.closed().fuse() => { - trace!("profile observation dropped"); + info!("Using default service profile after timeout"); + profiles::Routes::default() + } + Poll::Ready(Ok(profile)) => profile, + }; + + trace!("daemonizing"); + let (mut tx, rx) = watch::channel(profile); + let inner = this.inner.take().expect("polled after ready"); + let daemon = async move { + tokio::pin!(inner); + loop { + select_biased! { + _ = tx.closed().fuse() => { + trace!("profile observation dropped"); + return; + }, + profile = future::poll_fn(|cx| + inner.as_mut().poll_profile(cx) + ).fuse() => { + match profile { + Err(error) => { + error!(%error, "profile client died"); return; - }, - profile = future::poll_fn(|cx| - inner.as_mut().poll_profile(cx) - ).fuse() => { - match profile { - Err(error) => { - error!(%error, "profile client died"); - return; - } - Ok(profile) => { - trace!(?profile, "publishing"); - if tx.broadcast(profile).is_err() { - trace!("failed to publish profile"); - return - } - } + } + Ok(profile) => { + trace!(?profile, "publishing"); + if tx.broadcast(profile).is_err() { + trace!("failed to publish profile"); + return } } } } - }; - tokio::spawn(daemon.in_current_span()); - - Poll::Ready(Ok(rx)) + } } - } + }; + tokio::spawn(daemon.in_current_span()); + + Poll::Ready(Ok(rx)) } } @@ -582,3 +545,9 @@ impl std::fmt::Display for InvalidProfileAddr { } impl std::error::Error for InvalidProfileAddr {} + +impl From for InvalidProfileAddr { + fn from(addr: Addr) -> Self { + Self(addr) + } +}