Skip to content

Commit 175f709

Browse files
committed
WIP
1 parent 95d950f commit 175f709

File tree

13 files changed

+653
-4
lines changed

13 files changed

+653
-4
lines changed

Cargo.lock

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -716,6 +716,22 @@ version = "0.5.3"
716716
source = "registry+https://github.com/rust-lang/crates.io-index"
717717
checksum = "8dd5a6d5999d9907cda8ed67bbd137d3af8085216c2ac62de5be860bd41f304a"
718718

719+
[[package]]
720+
name = "linkerd2-access-log"
721+
version = "0.1.0"
722+
dependencies = [
723+
"base64 0.10.1",
724+
"bytes 0.5.4",
725+
"futures 0.3.5",
726+
"hex",
727+
"http 0.2.1",
728+
"linkerd2-error",
729+
"pin-project",
730+
"tokio",
731+
"tower",
732+
"tracing",
733+
]
734+
719735
[[package]]
720736
name = "linkerd2-addr"
721737
version = "0.1.0"
@@ -739,6 +755,7 @@ name = "linkerd2-app"
739755
version = "0.1.0"
740756
dependencies = [
741757
"bytes 0.5.4",
758+
"chrono",
742759
"futures 0.3.5",
743760
"h2 0.2.5",
744761
"http 0.2.1",
@@ -755,6 +772,7 @@ dependencies = [
755772
"linkerd2-opencensus",
756773
"linkerd2-proxy-api",
757774
"net2",
775+
"pin-project",
758776
"quickcheck",
759777
"regex 1.0.0",
760778
"ring",
@@ -783,6 +801,7 @@ dependencies = [
783801
"hyper",
784802
"indexmap",
785803
"libc",
804+
"linkerd2-access-log",
786805
"linkerd2-addr",
787806
"linkerd2-admit",
788807
"linkerd2-buffer",

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
[workspace]
22
members = [
33
"hyper-balance",
4+
"linkerd/access-log",
45
"linkerd/addr",
56
"linkerd/admit",
67
"linkerd/app/core",

linkerd/access-log/Cargo.toml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
[package]
2+
name = "linkerd2-access-log"
3+
version = "0.1.0"
4+
authors = ["Linkerd Developers <[email protected]>"]
5+
edition = "2018"
6+
publish = false
7+
8+
[dependencies]
9+
base64 = "0.10.1"
10+
bytes = "0.5"
11+
futures = "0.3"
12+
hex = "0.3.2"
13+
http = "0.2"
14+
linkerd2-error = { path = "../error" }
15+
tower = { version = "0.3", default-features = false }
16+
tracing = "0.1.2"
17+
tokio = {version = "0.2", features = ["sync"]}
18+
pin-project = "0.4"

linkerd/access-log/src/layer.rs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
use crate::tracker::{ResponseTracker, TrackerState};
2+
use crate::AccessLog;
3+
use futures::{ready, TryFuture};
4+
use pin_project::pin_project;
5+
use std::future::Future;
6+
use std::pin::Pin;
7+
use std::sync::Arc;
8+
use std::task::{Context, Poll};
9+
use tokio::sync::mpsc;
10+
11+
/// A layer that adds access logging
12+
#[derive(Clone)]
13+
pub struct AccessLogLayer {
14+
shared: Option<Arc<TrackerState>>,
15+
}
16+
17+
#[derive(Clone)]
18+
pub struct AccessLogContext<Svc> {
19+
inner: Svc,
20+
shared: Option<Arc<TrackerState>>,
21+
}
22+
23+
#[pin_project]
24+
pub struct ResponseFuture<F> {
25+
tracker: Option<ResponseTracker>,
26+
27+
#[pin]
28+
inner: F,
29+
}
30+
31+
impl<Svc> tower::layer::Layer<Svc> for AccessLogLayer {
32+
type Service = AccessLogContext<Svc>;
33+
34+
fn layer(&self, inner: Svc) -> Self::Service {
35+
Self::Service {
36+
inner,
37+
shared: self.shared.clone(),
38+
}
39+
}
40+
}
41+
42+
impl<Svc, B1, B2> tower::Service<http::Request<B1>> for AccessLogContext<Svc>
43+
where
44+
Svc: tower::Service<http::Request<B1>, Response = http::Response<B2>>,
45+
{
46+
type Response = Svc::Response;
47+
type Error = Svc::Error;
48+
type Future = ResponseFuture<Svc::Future>;
49+
50+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Svc::Error>> {
51+
self.inner.poll_ready(cx)
52+
}
53+
54+
fn call(&mut self, mut request: http::Request<B1>) -> Self::Future {
55+
let shared = match &self.shared {
56+
Some(shared) => shared,
57+
None => {
58+
return ResponseFuture {
59+
tracker: None,
60+
inner: self.inner.call(request),
61+
}
62+
}
63+
};
64+
65+
let (request_tracker, response_tracker) = shared.tracker();
66+
request_tracker.register(&mut request);
67+
68+
ResponseFuture {
69+
inner: self.inner.call(request),
70+
tracker: Some(response_tracker),
71+
}
72+
}
73+
}
74+
75+
impl AccessLogLayer {
76+
pub fn new(sink: Option<mpsc::Sender<AccessLog>>) -> Self {
77+
Self {
78+
shared: sink.map(|s| Arc::new(TrackerState::new(s))),
79+
}
80+
}
81+
}
82+
83+
impl<F, B2> Future for ResponseFuture<F>
84+
where
85+
F: TryFuture<Ok = http::Response<B2>>,
86+
{
87+
type Output = Result<F::Ok, F::Error>;
88+
89+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
90+
let this = self.project();
91+
let mut inner: http::Response<B2> = ready!(this.inner.try_poll(cx))?;
92+
93+
if let Some(tracker) = this.tracker.take() {
94+
tracker.register(&mut inner);
95+
}
96+
97+
Poll::Ready(Ok(inner))
98+
}
99+
}

linkerd/access-log/src/lib.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
#![deny(warnings, rust_2018_idioms)]
2+
3+
use linkerd2_error::Error;
4+
use tokio::sync::mpsc;
5+
6+
pub mod layer;
7+
mod tracker;
8+
9+
use http::{HeaderValue, Method, StatusCode, Uri};
10+
pub use layer::{AccessLogContext, AccessLogLayer};
11+
12+
#[derive(Debug)]
13+
pub struct AccessLog {
14+
pub uri: Uri,
15+
pub method: Method,
16+
pub status: StatusCode,
17+
pub host: Option<HeaderValue>,
18+
pub user_agent: Option<HeaderValue>,
19+
pub trace_id: Option<HeaderValue>,
20+
pub request_processing_time_us: u64,
21+
pub upstream_processing_time_us: u64,
22+
pub response_processing_time_us: u64,
23+
}
24+
25+
impl Default for AccessLog {
26+
fn default() -> Self {
27+
AccessLog {
28+
uri: Default::default(),
29+
method: Default::default(),
30+
status: Default::default(),
31+
host: None,
32+
user_agent: None,
33+
trace_id: None,
34+
request_processing_time_us: 0,
35+
upstream_processing_time_us: 0,
36+
response_processing_time_us: 0,
37+
}
38+
}
39+
}
40+
41+
pub trait AccessLogSink {
42+
fn try_send(&mut self, log: AccessLog) -> Result<(), Error>;
43+
}
44+
45+
impl AccessLogSink for mpsc::Sender<AccessLog> {
46+
fn try_send(&mut self, span: AccessLog) -> Result<(), Error> {
47+
self.try_send(span).map_err(Into::into)
48+
}
49+
}

0 commit comments

Comments
 (0)