Skip to content

Commit 141cfe2

Browse files
committed
Add access logging as requested in linkerd/linkerd2#1913
Will write space-delimited access logs to a file specified by the LINKERD2_PROXY_ACCESS_LOG_FILE environment variable in a best-effort fashion Signed-off-by: Raphael Taylor-Davies <[email protected]>
1 parent 95d950f commit 141cfe2

File tree

12 files changed

+412
-4
lines changed

12 files changed

+412
-4
lines changed

Cargo.lock

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -716,6 +716,22 @@ version = "0.5.3"
716716
source = "registry+https://github.com/rust-lang/crates.io-index"
717717
checksum = "8dd5a6d5999d9907cda8ed67bbd137d3af8085216c2ac62de5be860bd41f304a"
718718

719+
[[package]]
720+
name = "linkerd2-access-log"
721+
version = "0.1.0"
722+
dependencies = [
723+
"base64 0.10.1",
724+
"bytes 0.5.4",
725+
"futures 0.3.5",
726+
"hex",
727+
"http 0.2.1",
728+
"linkerd2-error",
729+
"pin-project",
730+
"tokio",
731+
"tower",
732+
"tracing",
733+
]
734+
719735
[[package]]
720736
name = "linkerd2-addr"
721737
version = "0.1.0"
@@ -739,6 +755,7 @@ name = "linkerd2-app"
739755
version = "0.1.0"
740756
dependencies = [
741757
"bytes 0.5.4",
758+
"chrono",
742759
"futures 0.3.5",
743760
"h2 0.2.5",
744761
"http 0.2.1",
@@ -755,6 +772,7 @@ dependencies = [
755772
"linkerd2-opencensus",
756773
"linkerd2-proxy-api",
757774
"net2",
775+
"pin-project",
758776
"quickcheck",
759777
"regex 1.0.0",
760778
"ring",
@@ -783,6 +801,7 @@ dependencies = [
783801
"hyper",
784802
"indexmap",
785803
"libc",
804+
"linkerd2-access-log",
786805
"linkerd2-addr",
787806
"linkerd2-admit",
788807
"linkerd2-buffer",

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
[workspace]
22
members = [
33
"hyper-balance",
4+
"linkerd/access-log",
45
"linkerd/addr",
56
"linkerd/admit",
67
"linkerd/app/core",

linkerd/access-log/Cargo.toml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
[package]
2+
name = "linkerd2-access-log"
3+
version = "0.1.0"
4+
authors = ["Linkerd Developers <[email protected]>"]
5+
edition = "2018"
6+
publish = false
7+
8+
[dependencies]
9+
base64 = "0.10.1"
10+
bytes = "0.5"
11+
futures = "0.3"
12+
hex = "0.3.2"
13+
http = "0.2"
14+
linkerd2-error = { path = "../error" }
15+
tower = { version = "0.3", default-features = false }
16+
tracing = "0.1.2"
17+
tokio = {version = "0.2", features = ["sync"]}
18+
pin-project = "0.4"

linkerd/access-log/src/layer.rs

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
use crate::AccessLog;
2+
use futures::{ready, TryFuture};
3+
use pin_project::pin_project;
4+
use std::future::Future;
5+
use std::pin::Pin;
6+
use std::task::{Context, Poll};
7+
use std::time::SystemTime;
8+
use tokio::sync::mpsc;
9+
use tracing::warn;
10+
11+
/// A layer that adds access logging
12+
#[derive(Clone)]
13+
pub struct AccessLogLayer {
14+
sink: Option<mpsc::Sender<AccessLog>>,
15+
}
16+
17+
#[derive(Clone)]
18+
pub struct AccessLogContext<Svc> {
19+
inner: Svc,
20+
sink: Option<mpsc::Sender<AccessLog>>,
21+
}
22+
23+
#[pin_project]
24+
pub struct ResponseFuture<F> {
25+
state: Option<(AccessLog, mpsc::Sender<AccessLog>)>,
26+
27+
#[pin]
28+
inner: F,
29+
}
30+
31+
impl<Svc> tower::layer::Layer<Svc> for AccessLogLayer {
32+
type Service = AccessLogContext<Svc>;
33+
34+
fn layer(&self, inner: Svc) -> Self::Service {
35+
Self::Service {
36+
inner,
37+
sink: self.sink.clone(),
38+
}
39+
}
40+
}
41+
42+
impl<Svc, B1, B2> tower::Service<http::Request<B1>> for AccessLogContext<Svc>
43+
where
44+
Svc: tower::Service<http::Request<B1>, Response = http::Response<B2>>,
45+
{
46+
type Response = Svc::Response;
47+
type Error = Svc::Error;
48+
type Future = ResponseFuture<Svc::Future>;
49+
50+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Svc::Error>> {
51+
self.inner.poll_ready(cx)
52+
}
53+
54+
fn call(&mut self, request: http::Request<B1>) -> Self::Future {
55+
let sink = match &self.sink {
56+
Some(sink) => sink,
57+
None => {
58+
return ResponseFuture {
59+
state: None,
60+
inner: self.inner.call(request),
61+
}
62+
}
63+
};
64+
65+
let t0 = SystemTime::now();
66+
67+
let host = request.headers().get("Host").map(|x| x.clone());
68+
69+
let trace_id = request
70+
.headers()
71+
.get("X-Amzn-Trace-Id")
72+
.or_else(|| request.headers().get("X-Request-ID"))
73+
.map(|x| x.clone());
74+
75+
let user_agent = request.headers().get("User-Agent").map(|x| x.clone());
76+
77+
let log = AccessLog {
78+
uri: request.uri().clone(),
79+
method: request.method().clone(),
80+
status: Default::default(),
81+
host,
82+
user_agent,
83+
trace_id,
84+
start_time: t0,
85+
end_time: t0,
86+
};
87+
88+
ResponseFuture {
89+
state: Some((log, sink.clone())),
90+
inner: self.inner.call(request),
91+
}
92+
}
93+
}
94+
95+
impl AccessLogLayer {
96+
pub fn new(sink: Option<mpsc::Sender<AccessLog>>) -> Self {
97+
Self { sink }
98+
}
99+
}
100+
101+
impl<F, B2> Future for ResponseFuture<F>
102+
where
103+
F: TryFuture<Ok = http::Response<B2>>,
104+
{
105+
type Output = Result<F::Ok, F::Error>;
106+
107+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
108+
let this = self.project();
109+
let response: http::Response<B2> = ready!(this.inner.try_poll(cx))?;
110+
111+
if let Some((mut log, mut sink)) = this.state.take() {
112+
log.end_time = SystemTime::now();
113+
log.status = response.status().clone();
114+
115+
if let Err(error) = sink.try_send(log) {
116+
warn!(message = "access log dropped", %error);
117+
}
118+
}
119+
120+
Poll::Ready(Ok(response))
121+
}
122+
}

linkerd/access-log/src/lib.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
#![deny(warnings, rust_2018_idioms)]
2+
3+
use linkerd2_error::Error;
4+
use tokio::sync::mpsc;
5+
6+
pub mod layer;
7+
8+
use http::{HeaderValue, Method, StatusCode, Uri};
9+
pub use layer::{AccessLogContext, AccessLogLayer};
10+
use std::time::SystemTime;
11+
12+
#[derive(Debug)]
13+
pub struct AccessLog {
14+
pub uri: Uri,
15+
pub method: Method,
16+
pub status: StatusCode,
17+
pub host: Option<HeaderValue>,
18+
pub user_agent: Option<HeaderValue>,
19+
pub trace_id: Option<HeaderValue>,
20+
pub start_time: SystemTime,
21+
pub end_time: SystemTime,
22+
}
23+
24+
pub trait AccessLogSink {
25+
fn try_send(&mut self, log: AccessLog) -> Result<(), Error>;
26+
}
27+
28+
impl AccessLogSink for mpsc::Sender<AccessLog> {
29+
fn try_send(&mut self, span: AccessLog) -> Result<(), Error> {
30+
self.try_send(span).map_err(Into::into)
31+
}
32+
}

linkerd/app/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ This is used by tests and the executable.
1414
mock-orig-dst = ["linkerd2-app-core/mock-orig-dst"]
1515

1616
[dependencies]
17+
chrono = "0.4"
1718
futures = { version = "0.3" }
1819
http-body = "0.3"
1920
indexmap = "1.0"
@@ -24,8 +25,9 @@ linkerd2-app-inbound = { path = "./inbound" }
2425
linkerd2-app-outbound = { path = "./outbound" }
2526
linkerd2-opencensus = { path = "../opencensus" }
2627
linkerd2-error = { path = "../error" }
28+
pin-project = "0.4"
2729
regex = "1.0.0"
28-
tokio = { version = "0.2", features = ["rt-util"] }
30+
tokio = { version = "0.2", features = ["rt-util", "fs"] }
2931
tonic = { version = "0.2", default-features = false, features = ["prost"] }
3032
tower = "0.3"
3133
tracing = "0.1.9"

linkerd/app/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ http-body = "0.3"
2222
hyper = "0.13"
2323
futures = "0.3"
2424
indexmap = "1.0"
25+
linkerd2-access-log = { path = "../../access-log" }
2526
linkerd2-addr = { path = "../../addr" }
2627
linkerd2-admit = { path = "../../admit" }
2728
linkerd2-cache = { path = "../../cache" }

linkerd/app/core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#![type_length_limit = "1586225"]
1010
#![deny(warnings, rust_2018_idioms)]
1111

12+
pub use linkerd2_access_log::{AccessLog, AccessLogLayer};
1213
pub use linkerd2_addr::{self as addr, Addr, NameAddr};
1314
pub use linkerd2_admit as admit;
1415
pub use linkerd2_cache as cache;

linkerd/app/inbound/src/lib.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ use linkerd2_app_core::{
2727
spans::SpanConverter,
2828
svc::{self, NewService},
2929
transport::{self, io::BoxedIo, tls},
30-
Error, ProxyMetrics, TraceContextLayer, DST_OVERRIDE_HEADER, L5D_CLIENT_ID, L5D_REMOTE_IP,
31-
L5D_SERVER_ID,
30+
AccessLog, AccessLogLayer, Error, ProxyMetrics, TraceContextLayer, DST_OVERRIDE_HEADER,
31+
L5D_CLIENT_ID, L5D_REMOTE_IP, L5D_SERVER_ID,
3232
};
3333
use std::collections::HashMap;
3434
use tokio::sync::mpsc;
@@ -61,6 +61,7 @@ impl Config {
6161
tap_layer: tap::Layer,
6262
metrics: ProxyMetrics,
6363
span_sink: Option<mpsc::Sender<oc::Span>>,
64+
access_log_sink: Option<mpsc::Sender<AccessLog>>,
6465
drain: drain::Watch,
6566
) -> Result<(), Error>
6667
where
@@ -98,6 +99,7 @@ impl Config {
9899
local_identity,
99100
metrics,
100101
span_sink,
102+
access_log_sink,
101103
drain,
102104
)
103105
.await
@@ -304,6 +306,7 @@ impl Config {
304306
local_identity: tls::Conditional<identity::Local>,
305307
metrics: ProxyMetrics,
306308
span_sink: Option<mpsc::Sender<oc::Span>>,
309+
access_log_sink: Option<mpsc::Sender<AccessLog>>,
307310
drain: drain::Watch,
308311
) -> Result<(), Error>
309312
where
@@ -363,6 +366,7 @@ impl Config {
363366
.push(errors::layer());
364367

365368
let http_server_observability = svc::layers()
369+
.push(AccessLogLayer::new(access_log_sink))
366370
.push(TraceContextLayer::new(span_sink.map(|span_sink| {
367371
SpanConverter::server(span_sink, trace_labels())
368372
})))

0 commit comments

Comments
 (0)