Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `statsd` source now supports Unix datagram sockets via `socket_type = "datagram"` when using `mode = "unix"`.

authors: ianneub
31 changes: 31 additions & 0 deletions src/sources/statsd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,8 @@ mod test {
event::{EventContainer, metric::TagValue},
};

#[cfg(unix)]
use super::unix::UnixSocketType;
use super::*;
use crate::{
series,
Expand Down Expand Up @@ -506,6 +508,8 @@ mod test {
let in_path = tempfile::tempdir().unwrap().keep().join("unix_test");
let config = StatsdConfig::Unix(UnixConfig {
path: in_path.clone(),
socket_type: UnixSocketType::Stream,
max_length: crate::serde::default_max_length(),
sanitize: true,
convert_to: ConversionUnit::Seconds,
});
Expand All @@ -525,6 +529,33 @@ mod test {
.await;
}

#[cfg(unix)]
#[tokio::test]
async fn test_statsd_unix_datagram() {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
let in_path = tempfile::tempdir()
.unwrap()
.keep()
.join("unix_datagram_test");
let config = StatsdConfig::Unix(UnixConfig {
path: in_path.clone(),
socket_type: UnixSocketType::Datagram,
max_length: crate::serde::default_max_length(),
sanitize: true,
convert_to: ConversionUnit::Seconds,
});
let (sender, mut receiver) = mpsc::channel(200);
tokio::spawn(async move {
let socket = tokio::net::UnixDatagram::unbound().unwrap();
while let Some(bytes) = receiver.next().await {
socket.send_to(bytes, &in_path).await.unwrap();
}
});
test_statsd(config, sender).await;
})
.await;
}

#[tokio::test]
async fn test_statsd_udp_conversion_disabled() {
let (_guard, in_addr) = next_addr();
Expand Down
61 changes: 52 additions & 9 deletions src/sources/statsd/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,29 @@ use crate::{
SourceSender,
codecs::Decoder,
shutdown::ShutdownSignal,
sources::{Source, util::build_unix_stream_source},
sources::{
Source,
util::{build_unix_datagram_source, build_unix_stream_source},
},
};

/// The type of Unix socket to use.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum UnixSocketType {
/// Stream socket (connection-oriented).
#[default]
Stream,

/// Datagram socket (connectionless).
Datagram,
}

fn default_max_length() -> usize {
crate::serde::default_max_length()
}

/// Unix domain socket configuration for the `statsd` source.
#[configurable_component]
#[derive(Clone, Debug)]
Expand All @@ -26,6 +46,18 @@ pub struct UnixConfig {
#[configurable(metadata(docs::examples = "/path/to/socket"))]
pub path: PathBuf,

/// The type of Unix socket to use.
#[serde(default)]
#[configurable(derived)]
pub socket_type: UnixSocketType,

/// The maximum buffer size of incoming messages.
///
/// Messages larger than this are truncated.
#[serde(default = "default_max_length")]
#[configurable(metadata(docs::type_unit = "bytes"))]
pub max_length: usize,

#[serde(default = "default_sanitize")]
#[configurable(derived)]
pub sanitize: bool,
Expand All @@ -48,12 +80,23 @@ pub fn statsd_unix(
))),
);

build_unix_stream_source(
config.path,
None,
decoder,
|_events, _host| {},
shutdown,
out,
)
match config.socket_type {
UnixSocketType::Stream => build_unix_stream_source(
config.path,
None,
decoder,
|_events, _host| {},
shutdown,
out,
),
UnixSocketType::Datagram => build_unix_datagram_source(
config.path,
None,
config.max_length,
decoder,
|_events, _host| {},
shutdown,
out,
),
}
}
25 changes: 25 additions & 0 deletions website/cue/reference/components/sources/generated/statsd.cue
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@ generated: components: sources: statsd: configuration: {
type: uint: unit: "seconds"
}
}
max_length: {
description: """
The maximum buffer size of incoming messages.

Messages larger than this are truncated.
"""
relevant_when: "mode = \"unix\""
required: false
type: uint: {
default: 102400
unit: "bytes"
}
}
mode: {
description: "The type of socket to use."
required: true
Expand Down Expand Up @@ -89,6 +102,18 @@ generated: components: sources: statsd: configuration: {
unit: "seconds"
}
}
socket_type: {
description: "The type of Unix socket to use."
relevant_when: "mode = \"unix\""
required: false
type: string: {
default: "stream"
enum: {
datagram: "Datagram socket (connectionless)."
stream: "Stream socket (connection-oriented)."
}
}
}
tls: {
description: "`TlsEnableableConfig` for `sources`, adding metadata from the client certificate."
relevant_when: "mode = \"tcp\""
Expand Down
Loading