Skip to content

Commit dd14f56

Browse files
swsnrhawkw
authored andcommitted
journald: send large journal payloads through memfd (#1744)
See #1698: Properly write large payloads to journal. I'd appreciate a very careful review; this cmsg stuff is nasty, and while it's well documented in `cmsg(3)` I had to fiddle a bit because the corresponding functions in libc aren't const and thus don't permit a direct allocation of the buffer as most `cmsg` C code around does. Closes #1698 ## Motivation Linux limits the maximum amount of data permitted for a single Unix datagram; sending large payloads directly will fail. ## Solution Follow systemd.io/JOURNAL_NATIVE_PROTOCOL/ and check for `EMSGSIZE` from `send()`; in this case write the payload to a memfd, seal it, and pass it on to journald via a corresponding SCM_RIGHTS control message. Per discussion in #1698 this adds no dependency on `nix`, and instead implements fd forwarding directly with some bits of unsafe `libc` code.
1 parent 3be0f8c commit dd14f56

File tree

5 files changed

+167
-10
lines changed

5 files changed

+167
-10
lines changed

tracing-journald/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ keywords = ["tracing", "journald"]
1616
rust-version = "1.42.0"
1717

1818
[dependencies]
19+
libc = "0.2.107"
1920
tracing-core = { path = "../tracing-core", version = "0.1.10" }
2021
tracing-subscriber = { path = "../tracing-subscriber", version = "0.3" }
2122

tracing-journald/src/lib.rs

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ use tracing_core::{
4949
};
5050
use tracing_subscriber::{layer::Context, registry::LookupSpan};
5151

52+
#[cfg(target_os = "linux")]
53+
mod memfd;
54+
#[cfg(target_os = "linux")]
55+
mod socket;
56+
5257
/// Sends events and their fields to journald
5358
///
5459
/// [journald conventions] for structured field names differ from typical tracing idioms, and journald
@@ -109,6 +114,48 @@ impl Layer {
109114
self.field_prefix = x;
110115
self
111116
}
117+
118+
#[cfg(not(unix))]
119+
fn send_payload(&self, _opayload: &[u8]) -> io::Result<()> {
120+
Err(io::Error::new(
121+
io::ErrorKind::Unsupported,
122+
"journald not supported on non-Unix",
123+
))
124+
}
125+
126+
#[cfg(unix)]
127+
fn send_payload(&self, payload: &[u8]) -> io::Result<usize> {
128+
self.socket.send(payload).or_else(|error| {
129+
if Some(libc::EMSGSIZE) == error.raw_os_error() {
130+
self.send_large_payload(payload)
131+
} else {
132+
Err(error)
133+
}
134+
})
135+
}
136+
137+
#[cfg(all(unix, not(target_os = "linux")))]
138+
fn send_large_payload(&self, _payload: &[u8]) -> io::Result<usize> {
139+
Err(io::Error::new(
140+
io::ErrorKind::Unsupported,
141+
"Large payloads not supported on non-Linux OS",
142+
))
143+
}
144+
145+
/// Send large payloads to journald via a memfd.
146+
#[cfg(target_os = "linux")]
147+
fn send_large_payload(&self, payload: &[u8]) -> io::Result<usize> {
148+
// If the payload's too large for a single datagram, send it through a memfd, see
149+
// https://systemd.io/JOURNAL_NATIVE_PROTOCOL/
150+
use std::os::unix::prelude::AsRawFd;
151+
// Write the whole payload to a memfd
152+
let mut mem = memfd::create_sealable()?;
153+
mem.write_all(payload)?;
154+
// Fully seal the memfd to signal journald that its backing data won't resize anymore
155+
// and so is safe to mmap.
156+
memfd::seal_fully(mem.as_raw_fd())?;
157+
socket::send_one_fd(&self.socket, mem.as_raw_fd())
158+
}
112159
}
113160

114161
/// Construct a journald layer
@@ -174,9 +221,8 @@ where
174221
self.field_prefix.as_ref().map(|x| &x[..]),
175222
));
176223

177-
// What could we possibly do on error?
178-
#[cfg(unix)]
179-
let _ = self.socket.send(&buf);
224+
// At this point we can't handle the error anymore so just ignore it.
225+
let _ = self.send_payload(&buf);
180226
}
181227
}
182228

tracing-journald/src/memfd.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
//! memfd helpers.
2+
3+
use libc::*;
4+
use std::fs::File;
5+
use std::io::Error;
6+
use std::io::Result;
7+
use std::os::raw::c_uint;
8+
use std::os::unix::prelude::{FromRawFd, RawFd};
9+
10+
fn create(flags: c_uint) -> Result<File> {
11+
let fd = unsafe { memfd_create("tracing-journald\0".as_ptr() as *const c_char, flags) };
12+
if fd < 0 {
13+
Err(Error::last_os_error())
14+
} else {
15+
Ok(unsafe { File::from_raw_fd(fd as RawFd) })
16+
}
17+
}
18+
19+
pub fn create_sealable() -> Result<File> {
20+
create(MFD_ALLOW_SEALING | MFD_CLOEXEC)
21+
}
22+
23+
pub fn seal_fully(fd: RawFd) -> Result<()> {
24+
let all_seals = F_SEAL_SHRINK | F_SEAL_GROW | F_SEAL_WRITE | F_SEAL_SEAL;
25+
let result = unsafe { fcntl(fd, F_ADD_SEALS, all_seals) };
26+
if result < 0 {
27+
Err(Error::last_os_error())
28+
} else {
29+
Ok(())
30+
}
31+
}

tracing-journald/src/socket.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
//! socket helpers.
2+
3+
use std::io::{Error, Result};
4+
use std::mem::{size_of, zeroed};
5+
use std::os::unix::net::UnixDatagram;
6+
use std::os::unix::prelude::{AsRawFd, RawFd};
7+
use std::ptr;
8+
9+
use libc::*;
10+
11+
const CMSG_BUFSIZE: usize = 64;
12+
13+
#[repr(C)]
14+
union AlignedBuffer<T: Copy + Clone> {
15+
buffer: T,
16+
align: cmsghdr,
17+
}
18+
19+
fn assert_cmsg_bufsize() {
20+
let space_one_fd = unsafe { CMSG_SPACE(size_of::<RawFd>() as u32) };
21+
assert!(
22+
space_one_fd <= CMSG_BUFSIZE as u32,
23+
"cmsghdr buffer too small (< {}) to hold a single fd",
24+
space_one_fd
25+
);
26+
}
27+
28+
#[cfg(test)]
29+
#[test]
30+
fn cmsg_buffer_size_for_one_fd() {
31+
assert_cmsg_bufsize()
32+
}
33+
34+
pub fn send_one_fd(socket: &UnixDatagram, fd: RawFd) -> Result<usize> {
35+
assert_cmsg_bufsize();
36+
37+
let mut cmsg_buffer = AlignedBuffer {
38+
buffer: ([0u8; CMSG_BUFSIZE]),
39+
};
40+
let mut msg: msghdr = unsafe { zeroed() };
41+
42+
// We send no data body with this message.
43+
msg.msg_iov = ptr::null_mut();
44+
msg.msg_iovlen = 0;
45+
46+
msg.msg_control = unsafe { cmsg_buffer.buffer.as_mut_ptr() as _ };
47+
msg.msg_controllen = unsafe { CMSG_SPACE(size_of::<RawFd>() as _) as _ };
48+
49+
let mut cmsg: &mut cmsghdr =
50+
unsafe { CMSG_FIRSTHDR(&msg).as_mut() }.expect("Control message buffer exhausted");
51+
52+
cmsg.cmsg_level = SOL_SOCKET;
53+
cmsg.cmsg_type = SCM_RIGHTS;
54+
cmsg.cmsg_len = unsafe { CMSG_LEN(size_of::<RawFd>() as _) as _ };
55+
56+
unsafe { ptr::write(CMSG_DATA(cmsg) as *mut RawFd, fd) };
57+
58+
let result = unsafe { sendmsg(socket.as_raw_fd(), &msg, libc::MSG_NOSIGNAL) };
59+
60+
if result < 0 {
61+
Err(Error::last_os_error())
62+
} else {
63+
// sendmsg returns the number of bytes written
64+
Ok(result as usize)
65+
}
66+
}

tracing-journald/tests/journal.rs

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,14 @@ impl PartialEq<[u8]> for Field {
5454
}
5555
}
5656

57-
/// Retry `f` 10 times 100ms apart.
57+
/// Retry `f` 30 times 100ms apart, i.e. a total of three seconds.
5858
///
5959
/// When `f` returns an error wait 100ms and try it again, up to ten times.
6060
/// If the last attempt failed return the error returned by that attempt.
6161
///
6262
/// If `f` returns Ok immediately return the result.
6363
fn retry<T, E>(f: impl Fn() -> Result<T, E>) -> Result<T, E> {
64-
let attempts = 10;
64+
let attempts = 30;
6565
let interval = Duration::from_millis(100);
6666
for attempt in (0..attempts).rev() {
6767
match f() {
@@ -85,7 +85,8 @@ fn retry<T, E>(f: impl Fn() -> Result<T, E>) -> Result<T, E> {
8585
fn read_from_journal(test_name: &str) -> Vec<HashMap<String, Field>> {
8686
let stdout = String::from_utf8(
8787
Command::new("journalctl")
88-
.args(&["--user", "--output=json"])
88+
// We pass --all to circumvent journalctl's default limit of 4096 bytes for field values
89+
.args(&["--user", "--output=json", "--all"])
8990
// Filter by the PID of the current test process
9091
.arg(format!("_PID={}", std::process::id()))
9192
.arg(format!("TEST_NAME={}", test_name))
@@ -97,10 +98,7 @@ fn read_from_journal(test_name: &str) -> Vec<HashMap<String, Field>> {
9798

9899
stdout
99100
.lines()
100-
.map(|l| {
101-
dbg!(l);
102-
serde_json::from_str(l).unwrap()
103-
})
101+
.map(|l| serde_json::from_str(l).unwrap())
104102
.collect()
105103
}
106104

@@ -169,3 +167,18 @@ fn internal_null_byte() {
169167
assert_eq!(message["PRIORITY"], "6");
170168
});
171169
}
170+
171+
#[test]
172+
fn large_message() {
173+
let large_string = "b".repeat(512_000);
174+
with_journald(|| {
175+
debug!(test.name = "large_message", "Message: {}", large_string);
176+
177+
let message = retry_read_one_line_from_journal("large_message");
178+
assert_eq!(
179+
message["MESSAGE"],
180+
format!("Message: {}", large_string).as_str()
181+
);
182+
assert_eq!(message["PRIORITY"], "6");
183+
});
184+
}

0 commit comments

Comments
 (0)