diff --git a/Cargo.lock b/Cargo.lock index 59fb534370..c742cda184 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -537,6 +537,12 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6456b8a6c8f33fee7d958fcd1b60d55b11940a79e63ae87013e6d22e26034440" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.14.16" @@ -867,6 +873,7 @@ dependencies = [ "libfuzzer-sys", "linkerd-app-core", "linkerd-app-test", + "linkerd-http-access-log", "linkerd-io", "linkerd-meshtls", "linkerd-meshtls-rustls", @@ -1061,6 +1068,22 @@ dependencies = [ "tokio", ] +[[package]] +name = "linkerd-http-access-log" +version = "0.1.0" +dependencies = [ + "futures-core", + "http", + "humantime", + "linkerd-identity", + "linkerd-proxy-transport", + "linkerd-stack", + "linkerd-tls", + "linkerd-tracing", + "pin-project", + "tracing", +] + [[package]] name = "linkerd-http-box" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index d695ef366e..6aca68e28f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ members = [ "linkerd/errno", "linkerd/error-respond", "linkerd/exp-backoff", + "linkerd/http-access-log", "linkerd/http-box", "linkerd/http-classify", "linkerd/http-metrics", diff --git a/linkerd/app/inbound/Cargo.toml b/linkerd/app/inbound/Cargo.toml index 6141338bd8..8432814ff0 100644 --- a/linkerd/app/inbound/Cargo.toml +++ b/linkerd/app/inbound/Cargo.toml @@ -14,6 +14,7 @@ bytes = "1" http = "0.2" futures = { version = "0.3", default-features = false } linkerd-app-core = { path = "../core" } +linkerd-http-access-log = { path = "../../http-access-log" } linkerd-server-policy = { path = "../../server-policy" } linkerd-tonic-watch = { path = "../../tonic-watch" } linkerd2-proxy-api = { version = "0.3", features = ["client", "inbound"] } diff --git a/linkerd/app/inbound/src/http/server.rs b/linkerd/app/inbound/src/http/server.rs index 8f20d518a7..f976513736 100644 --- a/linkerd/app/inbound/src/http/server.rs +++ b/linkerd/app/inbound/src/http/server.rs @@ -11,9 +11,10 @@ use linkerd_app_core::{ proxy::http, svc::{self, ExtractParam, Param}, tls, - transport::OrigDstAddr, + transport::{ClientAddr, OrigDstAddr, Remote}, Error, Result, }; +use linkerd_http_access_log::NewAccessLog; use tracing::debug_span; #[derive(Copy, Clone, Debug)] @@ -26,7 +27,8 @@ impl Inbound { + Param + Param + Param - + Param, + + Param + + Param>, T: Clone + Send + Unpin + 'static, I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + Send + Unpin + 'static, H: svc::NewService + Clone + Send + Sync + Unpin + 'static, @@ -79,6 +81,7 @@ impl Inbound { .push(http::BoxResponse::layer()), ) .check_new_service::>() + .push(NewAccessLog::layer()) .instrument(|t: &T| debug_span!("http", v = %Param::::param(t))) .push(http::NewServeHttp::layer(h2_settings, rt.drain.clone())) .push_on_service(svc::BoxService::layer()) diff --git a/linkerd/http-access-log/Cargo.toml b/linkerd/http-access-log/Cargo.toml new file mode 100644 index 0000000000..a94691880d --- /dev/null +++ b/linkerd/http-access-log/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "linkerd-http-access-log" +version = "0.1.0" +authors = ["Linkerd Developers "] +license = "Apache-2.0" +edition = "2018" +publish = false + +[dependencies] +futures-core = "0.3" +http = "0.2" +humantime = "2" +pin-project = "1" +linkerd-stack = { path = "../stack" } +linkerd-identity = { path = "../identity" } +linkerd-tls = { path = "../tls" } +linkerd-proxy-transport = { path = "../proxy/transport" } +linkerd-tracing = { path = "../tracing" } +tracing = "0.1.19" diff --git a/linkerd/http-access-log/src/lib.rs b/linkerd/http-access-log/src/lib.rs new file mode 100644 index 0000000000..a01a5677bb --- /dev/null +++ b/linkerd/http-access-log/src/lib.rs @@ -0,0 +1,207 @@ +#![deny(warnings, rust_2018_idioms)] +#![forbid(unsafe_code)] + +use futures_core::TryFuture; +use linkerd_identity as identity; +use linkerd_proxy_transport::{ClientAddr, Remote}; +use linkerd_stack as svc; +use linkerd_tls as tls; +use linkerd_tracing::access_log::TRACE_TARGET; +use pin_project::pin_project; +use std::{ + future::Future, + net::SocketAddr, + pin::Pin, + task::{Context, Poll}, + time::{Duration, Instant, SystemTime}, +}; +use svc::{NewService, Param}; +use tracing::{field, span, Level, Span}; + +#[derive(Clone, Debug)] +pub struct NewAccessLog { + inner: N, +} + +#[derive(Clone, Debug)] +pub struct AccessLogContext { + inner: S, + client_addr: SocketAddr, + client_id: Option, +} + +struct ResponseFutureInner { + span: Span, + start: Instant, + processing: Duration, +} + +#[pin_project] +pub struct AccessLogFuture { + data: Option, + + #[pin] + inner: F, +} + +impl NewAccessLog { + /// Returns a new `NewAccessLog` layer that wraps an inner service with + /// access logging middleware. + /// + /// The access log is recorded by adding a `tracing` span to the service's + /// future. If access logging is not enabled by the `tracing` subscriber, + /// this span will never be enabled, and it can be skipped cheaply. When + /// access logging *is* enabled, additional data will be recorded when the + /// response future completes. + /// + /// Recording the access log will introduce additional overhead in the + /// request path, but this is largely avoided when access logging is not + /// enabled. + #[inline] + pub fn layer() -> impl svc::layer::Layer { + svc::layer::mk(|inner| NewAccessLog { inner }) + } +} + +impl NewService for NewAccessLog +where + T: Param + Param>, + N: NewService, +{ + type Service = AccessLogContext; + + fn new_service(&self, target: T) -> Self::Service { + let Remote(ClientAddr(client_addr)) = target.param(); + let tls: tls::ConditionalServerTls = target.param(); + let client_id = tls + .value() + .and_then(|tls| tls.client_id().map(|tls::ClientId(name)| name.clone())); + let inner = self.inner.new_service(target); + AccessLogContext { + inner, + client_addr, + client_id, + } + } +} + +impl svc::Service> for AccessLogContext +where + S: svc::Service, Response = http::Response>, +{ + type Response = S::Response; + type Error = S::Error; + type Future = AccessLogFuture; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, request: http::Request) -> Self::Future { + let get_header = |name: http::header::HeaderName| { + request + .headers() + .get(name) + .and_then(|x| x.to_str().ok()) + .unwrap_or_default() + }; + + let trace_id = || { + let headers = request.headers(); + headers + .get("x-b3-traceid") + .or_else(|| headers.get("x-request-id")) + .or_else(|| headers.get("x-amzn-trace-id")) + .and_then(|x| x.to_str().ok()) + .unwrap_or_default() + }; + + let span = span!(target: TRACE_TARGET, Level::INFO, "http", + client.addr = %self.client_addr, + client.id = self.client_id.as_ref().map(|n| n.as_str()).unwrap_or("-"), + timestamp = %now(), + method = request.method().as_str(), + uri = %request.uri(), + version = ?request.version(), + trace_id = trace_id(), + request_bytes = get_header(http::header::CONTENT_LENGTH), + status = field::Empty, + response_bytes = field::Empty, + total_ns = field::Empty, + processing_ns = field::Empty, + user_agent = get_header(http::header::USER_AGENT), + host = get_header(http::header::HOST), + ); + + // The access log span is only enabled by the `tracing` subscriber if + // access logs are being recorded. If it's disabled, we can skip + // recording additional data in the response future. + if span.is_disabled() { + return AccessLogFuture { + data: None, + inner: self.inner.call(request), + }; + } + + AccessLogFuture { + data: Some(ResponseFutureInner { + span, + start: Instant::now(), + processing: Duration::from_secs(0), + }), + inner: self.inner.call(request), + } + } +} + +impl Future for AccessLogFuture +where + F: TryFuture>, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + let data: &mut ResponseFutureInner = match &mut this.data { + Some(data) => data, + None => return this.inner.try_poll(cx), + }; + + let _enter = data.span.enter(); + let poll_start = Instant::now(); + + let response: http::Response = match this.inner.try_poll(cx) { + Poll::Pending => { + data.processing += Instant::now().duration_since(poll_start); + return Poll::Pending; + } + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Ready(Ok(response)) => response, + }; + + let now = Instant::now(); + let total_ns = now.duration_since(data.start).as_nanos(); + let processing_ns = (now.duration_since(poll_start) + data.processing).as_nanos(); + + let span = &data.span; + + response + .headers() + .get(http::header::CONTENT_LENGTH) + .and_then(|x| x.to_str().ok()) + .map(|x| span.record("response_bytes", &x)); + + span.record("status", &response.status().as_u16()); + span.record("total_ns", &field::display(total_ns)); + span.record("processing_ns", &field::display(processing_ns)); + + Poll::Ready(Ok(response)) + } +} + +#[inline] +fn now() -> humantime::Rfc3339Timestamp { + humantime::format_rfc3339(SystemTime::now()) +} diff --git a/linkerd/http-retry/src/lib.rs b/linkerd/http-retry/src/lib.rs index e67844123b..95a9847de7 100644 --- a/linkerd/http-retry/src/lib.rs +++ b/linkerd/http-retry/src/lib.rs @@ -938,7 +938,7 @@ mod tests { tx: Tx(tx), initial, replay, - _trace: linkerd_tracing::test::with_default_filter("linkerd_http_retry=debug"), + _trace: linkerd_tracing::test::with_default_filter("linkerd_http_retry=debug").0, } } } diff --git a/linkerd/tls/src/server.rs b/linkerd/tls/src/server.rs index 1740ced355..ff8702d9f9 100644 --- a/linkerd/tls/src/server.rs +++ b/linkerd/tls/src/server.rs @@ -294,6 +294,17 @@ impl fmt::Display for NoServerTls { } } +// === impl ServerTls === + +impl ServerTls { + pub fn client_id(&self) -> Option<&ClientId> { + match self { + ServerTls::Established { ref client_id, .. } => client_id.as_ref(), + _ => None, + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/linkerd/tracing/Cargo.toml b/linkerd/tracing/Cargo.toml index 37ed839b92..268656a593 100644 --- a/linkerd/tracing/Cargo.toml +++ b/linkerd/tracing/Cargo.toml @@ -18,5 +18,6 @@ tracing-log = "0.1.2" [dependencies.tracing-subscriber] version = "0.3" -features = ["env-filter","smallvec", "tracing-log", "json", "parking_lot"] +default-features = false +features = ["env-filter", "fmt", "smallvec", "tracing-log", "json", "parking_lot", "registry"] diff --git a/linkerd/tracing/src/access_log.rs b/linkerd/tracing/src/access_log.rs new file mode 100644 index 0000000000..d79ee63771 --- /dev/null +++ b/linkerd/tracing/src/access_log.rs @@ -0,0 +1,156 @@ +use std::fmt; +use tracing::{field, span, Id, Level, Metadata, Subscriber}; +use tracing_subscriber::{ + field::RecordFields, + filter::{Directive, FilterFn, Filtered}, + fmt::{format, FormatFields, FormattedFields}, + layer::{Context, Layer}, + registry::LookupSpan, +}; + +pub const TRACE_TARGET: &str = "_access_log"; + +pub(super) type AccessLogLayer = Filtered; + +pub(super) struct Writer { + formatter: F, +} + +#[derive(Default)] +pub(super) struct ApacheCommon { + _p: (), +} + +struct ApacheCommonVisitor<'writer> { + res: fmt::Result, + writer: format::Writer<'writer>, +} + +pub(super) fn build() -> (AccessLogLayer, Directive) +where + S: Subscriber + for<'span> LookupSpan<'span>, +{ + let writer = Writer::new().with_filter( + FilterFn::new( + (|meta| meta.level() == &Level::INFO && meta.target().starts_with(TRACE_TARGET)) + as fn(&Metadata<'_>) -> bool, + ) + .with_max_level_hint(Level::INFO), + ); + + // Also, ensure that the `tracing` filter configuration will + // always enable the access log spans. + let directive = format!("{}=info", TRACE_TARGET) + .parse() + .expect("access logging filter directive must parse"); + + (writer, directive) +} + +// === impl Writer === + +impl Writer { + pub fn new() -> Self { + Self { + formatter: Default::default(), + } + } +} + +impl Layer for Writer +where + S: Subscriber + for<'span> LookupSpan<'span>, + F: for<'writer> FormatFields<'writer> + 'static, +{ + fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) { + let span = ctx.span(id).expect("Span not found, this is a bug"); + let mut extensions = span.extensions_mut(); + + if extensions.get_mut::>().is_none() { + let mut fields = FormattedFields::::new(String::new()); + if self + .formatter + .format_fields(fields.as_writer(), attrs) + .is_ok() + { + extensions.insert(fields); + } + } + } + + fn on_record(&self, id: &Id, values: &span::Record<'_>, ctx: Context<'_, S>) { + let span = ctx.span(id).expect("Span not found, this is a bug"); + let mut extensions = span.extensions_mut(); + if let Some(fields) = extensions.get_mut::>() { + let _ = self.formatter.add_fields(fields, values); + return; + } + + let mut fields = FormattedFields::::new(String::new()); + if self + .formatter + .format_fields(fields.as_writer(), values) + .is_ok() + { + extensions.insert(fields); + } + } + + fn on_close(&self, id: Id, ctx: Context<'_, S>) { + if let Some(span) = ctx.span(&id) { + if let Some(fields) = span.extensions().get::>() { + eprintln!("{}", fields.fields); + } + } + } +} + +impl ApacheCommon { + const SKIPPED_FIELDS: &'static [&'static str] = &[ + "trace_id", + "request_bytes", + "total_ns", + "processing_ns", + "response_bytes", + "user_agent", + "host", + ]; +} + +impl<'writer> FormatFields<'writer> for ApacheCommon { + fn format_fields(&self, writer: format::Writer<'_>, fields: R) -> fmt::Result { + let mut visitor = ApacheCommonVisitor { + writer, + res: Ok(()), + }; + fields.record(&mut visitor); + visitor.res + } + + #[inline] + fn add_fields( + &self, + current: &mut FormattedFields, + fields: &span::Record<'_>, + ) -> fmt::Result { + self.format_fields(current.as_writer(), fields) + } +} + +impl field::Visit for ApacheCommonVisitor<'_> { + fn record_str(&mut self, field: &field::Field, val: &str) { + self.record_debug(field, &format_args!("{}", val)) + } + + fn record_debug(&mut self, field: &field::Field, val: &dyn fmt::Debug) { + self.res = match field.name() { + n if ApacheCommon::SKIPPED_FIELDS.contains(&n) => return, + "timestamp" => write!(&mut self.writer, " [{:?}]", val), + "client.addr" => write!(&mut self.writer, "{:?}", val), + "client.id" => write!(&mut self.writer, " {:?} -", val), + "method" => write!(&mut self.writer, " \"{:?}", val), + "version" => write!(&mut self.writer, " {:?}\"", val), + _ => write!(&mut self.writer, " {:?}", val), + } + } +} diff --git a/linkerd/tracing/src/lib.rs b/linkerd/tracing/src/lib.rs index 77a6605f42..22b75a3586 100644 --- a/linkerd/tracing/src/lib.rs +++ b/linkerd/tracing/src/lib.rs @@ -1,18 +1,26 @@ #![deny(warnings, rust_2018_idioms)] #![forbid(unsafe_code)] +pub mod access_log; pub mod level; pub mod test; mod uptime; use self::uptime::Uptime; use linkerd_error::Error; -use std::{env, str}; +use std::str; use tracing::{Dispatch, Subscriber}; -use tracing_subscriber::{fmt::format, prelude::*, registry::LookupSpan, reload, EnvFilter, Layer}; +use tracing_subscriber::{ + filter::{EnvFilter, FilterFn}, + fmt::format, + prelude::*, + registry::LookupSpan, + reload, Layer, +}; const ENV_LOG_LEVEL: &str = "LINKERD2_PROXY_LOG"; const ENV_LOG_FORMAT: &str = "LINKERD2_PROXY_LOG_FORMAT"; +const ENV_ACCESS_LOG: &str = "LINKERD2_PROXY_ACCESS_LOG"; const DEFAULT_LOG_LEVEL: &str = "warn,linkerd=info"; const DEFAULT_LOG_FORMAT: &str = "PLAIN"; @@ -21,6 +29,7 @@ const DEFAULT_LOG_FORMAT: &str = "PLAIN"; pub struct Settings { filter: Option, format: Option, + access_log: bool, is_test: bool, } @@ -72,6 +81,7 @@ impl Settings { Some(Self { filter, format: std::env::var(ENV_LOG_FORMAT).ok(), + access_log: std::env::var(ENV_ACCESS_LOG).is_ok(), is_test: false, }) } @@ -80,6 +90,7 @@ impl Settings { Self { filter: Some(filter), format: Some(format), + access_log: false, is_test: true, } } @@ -139,23 +150,40 @@ impl Settings { pub fn build(self) -> (Dispatch, Handle) { let log_level = self.filter.as_deref().unwrap_or(DEFAULT_LOG_LEVEL); - let (filter, level) = reload::Layer::new(EnvFilter::new(log_level)); + + let mut filter = EnvFilter::new(log_level); + + // If access logging is enabled, build the access log layer. + let access_log = if self.access_log { + let (access_log, directive) = access_log::build(); + filter = filter.add_directive(directive); + Some(access_log) + } else { + None + }; + + let (filter, level) = reload::Layer::new(filter); let level = level::Handle::new(level); let logger = match self.format().as_ref() { "JSON" => self.mk_json(), _ => self.mk_plain(), }; + let logger = logger.with_filter(FilterFn::new(|meta| { + !meta.target().starts_with(access_log::TRACE_TARGET) + })); + + let handle = Handle(Some(level)); let dispatch = tracing_subscriber::registry() .with(filter) + .with(access_log) .with(logger) .into(); - (dispatch, Handle(Some(level))) + (dispatch, handle) } } - // === impl Handle === impl Handle { diff --git a/linkerd/tracing/src/test.rs b/linkerd/tracing/src/test.rs index 894359456c..41969fa4a6 100644 --- a/linkerd/tracing/src/test.rs +++ b/linkerd/tracing/src/test.rs @@ -1,4 +1,5 @@ use super::*; +use std::env; /// By default, disable logging in modules that are expected to error in tests. pub const DEFAULT_LOG: &str = "warn,\ @@ -18,11 +19,13 @@ pub fn trace_subscriber(default: impl ToString) -> (Dispatch, Handle) { Settings::for_test(log_level, log_format).build() } -pub fn with_default_filter(default: impl ToString) -> tracing::dispatcher::DefaultGuard { - let (d, _) = trace_subscriber(default); - tracing::dispatcher::set_default(&d) +pub fn with_default_filter( + default: impl ToString, +) -> (tracing::dispatcher::DefaultGuard, crate::Handle) { + let (d, handle) = trace_subscriber(default); + (tracing::dispatcher::set_default(&d), handle) } -pub fn trace_init() -> tracing::dispatcher::DefaultGuard { +pub fn trace_init() -> (tracing::dispatcher::DefaultGuard, crate::Handle) { with_default_filter(DEFAULT_LOG) }