Skip to content

profiles: perform profile resolution for IP addresses #626

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1448,7 +1448,6 @@ dependencies = [
"http-body",
"indexmap",
"linkerd2-addr",
"linkerd2-dns",
"linkerd2-error",
"linkerd2-proxy-api",
"linkerd2-stack",
Expand Down
6 changes: 6 additions & 0 deletions linkerd/addr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ impl From<NameAddr> for Addr {
}
}

impl AsRef<Self> for Addr {
fn as_ref(&self) -> &Self {
self
}
}

// === impl NameAddr ===

impl NameAddr {
Expand Down
6 changes: 6 additions & 0 deletions linkerd/app/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,9 @@ impl std::fmt::Display for DiscoveryRejected {
}

impl std::error::Error for DiscoveryRejected {}

impl From<Addr> for DiscoveryRejected {
fn from(_: Addr) -> Self {
Self::new()
}
}
6 changes: 5 additions & 1 deletion linkerd/app/core/src/svc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

pub use crate::proxy::http;
use crate::transport::Connect;
use crate::{cache, Error};
use crate::{cache, request_filter, Error};
pub use linkerd2_buffer as buffer;
use linkerd2_concurrency_limit as concurrency_limit;
pub use linkerd2_stack::{self as stack, layer, NewService};
Expand Down Expand Up @@ -299,6 +299,10 @@ impl<S> Stack<S> {
self.push(stack::FallbackLayer::new(fallback).with_predicate(predicate))
}

pub fn push_request_filter<F: Clone>(self, filter: F) -> Stack<request_filter::Service<F, S>> {
self.push(request_filter::RequestFilterLayer::new(filter))
}

// pub fn box_http_request<B>(self) -> Stack<http::boxed::BoxRequest<S, B>>
// where
// B: hyper::body::HttpBody<Data = http::boxed::Data, Error = Error> + 'static,
Expand Down
6 changes: 6 additions & 0 deletions linkerd/app/outbound/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ impl<T: connect::ConnectAddr> connect::ConnectAddr for Target<T> {
}
}

impl<T> AsRef<Addr> for Target<T> {
fn as_ref(&self) -> &Addr {
&self.addr
}
}

// === impl HttpEndpoint ===

impl HttpEndpoint {
Expand Down
33 changes: 23 additions & 10 deletions linkerd/app/src/dst/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
mod permit;
mod resolve;

use http_body::Body as HttpBody;
use indexmap::IndexSet;
use linkerd2_app_core::{
config::{ControlAddr, ControlConfig},
dns, profiles, Error,
dns, profiles, request_filter, svc, Error,
};
use permit::PermitConfiguredDsts;
use std::time::Duration;
use tonic::{
body::{Body, BoxBody},
Expand All @@ -19,6 +21,7 @@ pub struct Config {
pub get_suffixes: IndexSet<dns::Suffix>,
pub get_networks: IndexSet<ipnet::IpNet>,
pub profile_suffixes: IndexSet<dns::Suffix>,
pub profile_networks: IndexSet<ipnet::IpNet>,
pub initial_profile_timeout: Duration,
}

Expand All @@ -27,8 +30,11 @@ pub struct Config {
/// The addr is preserved for logging.
pub struct Dst<S> {
pub addr: ControlAddr,
pub profiles: profiles::Client<S, resolve::BackoffUnlessInvalidArgument>,
pub resolve: resolve::Resolve<S>,
pub profiles: request_filter::Service<
PermitConfiguredDsts<profiles::InvalidProfileAddr>,
profiles::Client<S, resolve::BackoffUnlessInvalidArgument>,
>,
pub resolve: request_filter::Service<PermitConfiguredDsts, resolve::Resolve<S>>,
}

impl Config {
Expand All @@ -42,21 +48,28 @@ impl Config {
<S::ResponseBody as HttpBody>::Error: Into<Error> + Send,
S::Future: Send,
{
let resolve = resolve::new(
let resolve = svc::stack(resolve::new(
svc.clone(),
self.get_suffixes,
self.get_networks,
&self.context,
self.control.connect.backoff,
);
))
.push_request_filter(PermitConfiguredDsts::new(
self.get_suffixes,
self.get_networks,
))
.into_inner();

let profiles = profiles::Client::new(
let profiles = svc::stack(profiles::Client::new(
svc,
resolve::BackoffUnlessInvalidArgument::from(self.control.connect.backoff),
self.initial_profile_timeout,
self.context,
self.profile_suffixes,
);
))
.push_request_filter(
PermitConfiguredDsts::new(self.profile_suffixes, self.profile_networks)
.with_error::<profiles::InvalidProfileAddr>(),
)
.into_inner();

Ok(Dst {
addr: self.control.addr,
Expand Down
78 changes: 78 additions & 0 deletions linkerd/app/src/dst/permit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use ipnet::{Contains, IpNet};
use linkerd2_app_core::{dns::Suffix, request_filter, Addr, DiscoveryRejected, Error};
use std::marker::PhantomData;
use std::net::IpAddr;
use std::sync::Arc;

pub struct PermitConfiguredDsts<E = DiscoveryRejected> {
name_suffixes: Arc<Vec<Suffix>>,
networks: Arc<Vec<IpNet>>,
_error: PhantomData<fn(E)>,
}

// === impl PermitConfiguredDsts ===

impl PermitConfiguredDsts {
pub fn new(
name_suffixes: impl IntoIterator<Item = Suffix>,
nets: impl IntoIterator<Item = IpNet>,
) -> Self {
Self {
name_suffixes: Arc::new(name_suffixes.into_iter().collect()),
networks: Arc::new(nets.into_iter().collect()),
_error: PhantomData,
}
}

/// Configures the returned error type when the target is outside of the
/// configured set of destinations.
pub fn with_error<E>(self) -> PermitConfiguredDsts<E>
where
E: Into<Error> + From<Addr>,
{
PermitConfiguredDsts {
name_suffixes: self.name_suffixes,
networks: self.networks,
_error: PhantomData,
}
}
}

impl<E> Clone for PermitConfiguredDsts<E> {
fn clone(&self) -> Self {
Self {
name_suffixes: self.name_suffixes.clone(),
networks: self.networks.clone(),
_error: PhantomData,
}
}
}

impl<T, E> request_filter::RequestFilter<T> for PermitConfiguredDsts<E>
where
E: Into<Error> + From<Addr>,
T: AsRef<Addr>,
{
type Error = E;

fn filter(&self, t: T) -> Result<T, Self::Error> {
let addr = t.as_ref();
let permitted = match addr {
Addr::Name(ref name) => self
.name_suffixes
.iter()
.any(|suffix| suffix.contains(name.name())),
Addr::Socket(sa) => self.networks.iter().any(|net| match (net, sa.ip()) {
(IpNet::V4(net), IpAddr::V4(addr)) => net.contains(&addr),
(IpNet::V6(net), IpAddr::V6(addr)) => net.contains(&addr),
_ => false,
}),
};

if permitted {
Ok(t)
} else {
Err(E::from(addr.clone()))
}
}
}
75 changes: 8 additions & 67 deletions linkerd/app/src/dst/resolve.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,23 @@
pub use super::permit::PermitConfiguredDsts;
use http_body::Body as HttpBody;
use ipnet::{Contains, IpNet};
use linkerd2_app_core::{
dns::Suffix,
exp_backoff::{ExponentialBackoff, ExponentialBackoffStream},
proxy::{
api_resolve as api,
resolve::{self, recover},
},
request_filter, Addr, DiscoveryRejected, Error, Recover,
DiscoveryRejected, Error, Recover,
};
use linkerd2_app_outbound::Target;
use std::net::IpAddr;
use std::sync::Arc;
use tonic::{
body::{Body, BoxBody},
client::GrpcService,
Code, Status,
};

pub type Resolve<S> = request_filter::Service<
PermitConfiguredDsts,
recover::Resolve<BackoffUnlessInvalidArgument, resolve::make_unpin::Resolve<api::Resolve<S>>>,
>;
pub type Resolve<S> =
recover::Resolve<BackoffUnlessInvalidArgument, resolve::make_unpin::Resolve<api::Resolve<S>>>;

pub fn new<S>(
service: S,
suffixes: impl IntoIterator<Item = Suffix>,
nets: impl IntoIterator<Item = IpNet>,
token: &str,
backoff: ExponentialBackoff,
) -> Resolve<S>
pub fn new<S>(service: S, token: &str, backoff: ExponentialBackoff) -> Resolve<S>
where
S: GrpcService<BoxBody> + Clone + Send + 'static,
S::Error: Into<Error> + Send,
Expand All @@ -38,62 +26,15 @@ where
<S::ResponseBody as HttpBody>::Error: Into<Error> + Send,
S::Future: Send,
{
request_filter::Service::new(
PermitConfiguredDsts::new(suffixes, nets),
recover::Resolve::new(
backoff.into(),
resolve::make_unpin(api::Resolve::new(service).with_context_token(token)),
),
recover::Resolve::new(
backoff.into(),
resolve::make_unpin(api::Resolve::new(service).with_context_token(token)),
)
}

#[derive(Clone, Debug)]
pub struct PermitConfiguredDsts {
name_suffixes: Arc<Vec<Suffix>>,
networks: Arc<Vec<IpNet>>,
}

#[derive(Clone, Debug, Default)]
pub struct BackoffUnlessInvalidArgument(ExponentialBackoff);

// === impl PermitConfiguredDsts ===

impl PermitConfiguredDsts {
fn new(
name_suffixes: impl IntoIterator<Item = Suffix>,
nets: impl IntoIterator<Item = IpNet>,
) -> Self {
Self {
name_suffixes: Arc::new(name_suffixes.into_iter().collect()),
networks: Arc::new(nets.into_iter().collect()),
}
}
}

impl<T> request_filter::RequestFilter<Target<T>> for PermitConfiguredDsts {
type Error = DiscoveryRejected;

fn filter(&self, t: Target<T>) -> Result<Target<T>, Self::Error> {
let permitted = match t.addr {
Addr::Name(ref name) => self
.name_suffixes
.iter()
.any(|suffix| suffix.contains(name.name())),
Addr::Socket(sa) => self.networks.iter().any(|net| match (net, sa.ip()) {
(IpNet::V4(net), IpAddr::V4(addr)) => net.contains(&addr),
(IpNet::V6(net), IpAddr::V6(addr)) => net.contains(&addr),
_ => false,
}),
};

if permitted {
Ok(t)
} else {
Err(DiscoveryRejected::new())
}
}
}

// === impl BackoffUnlessInvalidArgument ===

impl From<ExponentialBackoff> for BackoffUnlessInvalidArgument {
Expand Down
12 changes: 12 additions & 0 deletions linkerd/app/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ pub const ENV_DESTINATION_GET_NETWORKS: &str = "LINKERD2_PROXY_DESTINATION_GET_N
/// If unspecified, a default value is used.
pub const ENV_DESTINATION_PROFILE_SUFFIXES: &str = "LINKERD2_PROXY_DESTINATION_PROFILE_SUFFIXES";

/// Constrains which destination addresses may be used for profile/route discovery.
///
/// The value is a comma-separated list of networks that may be
/// resolved via the destination service.
///
/// If specified and empty, the destination service is not used for route discovery.
///
/// If unspecified, a default value is used.
pub const ENV_DESTINATION_PROFILE_NETWORKS: &str = "LINKERD2_PROXY_DESTINATION_PROFILE_NETWORKS";

/// Constrains which destination names are permitted.
///
/// If unspecified or empty, no inbound gateway is configured.
Expand Down Expand Up @@ -331,6 +341,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
ENV_DESTINATION_PROFILE_SUFFIXES,
parse_dns_suffixes,
);
let dst_profile_networks = parse(strings, ENV_DESTINATION_PROFILE_NETWORKS, parse_networks);

let initial_stream_window_size = parse(strings, ENV_INITIAL_STREAM_WINDOW_SIZE, parse_number);
let initial_connection_window_size =
Expand Down Expand Up @@ -486,6 +497,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
get_networks: dst_get_networks?.unwrap_or_default(),
profile_suffixes: dst_profile_suffixes?
.unwrap_or(parse_dns_suffixes(DEFAULT_DESTINATION_PROFILE_SUFFIXES).unwrap()),
profile_networks: dst_profile_networks?.unwrap_or_default(),
initial_profile_timeout: dst_profile_initial_timeout?
.unwrap_or(DEFAULT_DESTINATION_PROFILE_INITIAL_TIMEOUT),
control: ControlConfig {
Expand Down
21 changes: 21 additions & 0 deletions linkerd/request-filter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ pub trait RequestFilter<T> {
fn filter(&self, request: T) -> Result<T, Self::Error>;
}

#[derive(Clone, Debug)]
pub struct RequestFilterLayer<T> {
filter: T,
}

#[derive(Clone, Debug)]
pub struct Service<I, S> {
filter: I,
Expand All @@ -28,6 +33,22 @@ pub enum ResponseFuture<F> {
Rejected(Option<Error>),
}

// === impl Layer ===

impl<T: Clone> RequestFilterLayer<T> {
pub fn new(filter: T) -> Self {
Self { filter }
}
}

impl<T: Clone, S> tower::Layer<S> for RequestFilterLayer<T> {
type Service = Service<T, S>;

fn layer(&self, inner: S) -> Self::Service {
Service::new(self.filter.clone(), inner)
}
}

// === impl Service ===

impl<I, S> Service<I, S> {
Expand Down
1 change: 0 additions & 1 deletion linkerd/service-profiles/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ http = "0.2"
http-body = "0.3"
indexmap = "1.0"
linkerd2-addr = { path = "../addr" }
linkerd2-dns = { path = "../dns" }
linkerd2-error = { path = "../error" }
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.13" }
linkerd2-stack = { path = "../stack" }
Expand Down
Loading