Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions lib/vector-core/src/event/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub(super) struct Inner {
pub(crate) secrets: Secrets,

#[serde(default, skip)]
finalizers: EventFinalizers,
pub(crate) finalizers: EventFinalizers,

/// The id of the source
pub(crate) source_id: Option<Arc<ComponentKey>>,
Expand All @@ -60,15 +60,15 @@ pub(super) struct Inner {
///
/// TODO(Jean): must not skip serialization to track schemas across restarts.
#[serde(default = "default_schema_definition", skip)]
schema_definition: Arc<schema::Definition>,
pub(crate) schema_definition: Arc<schema::Definition>,

/// A store of values that may be dropped during the encoding process but may be needed
/// later on. The map is indexed by meaning.
/// Currently this is just used for the `service`. If the service field is dropped by `only_fields`
/// we need to ensure it is still available later on for emitting metrics tagged by the service.
/// This field could almost be keyed by `&'static str`, but because it needs to be deserializable
/// we have to use `String`.
dropped_fields: ObjectMap,
pub(crate) dropped_fields: ObjectMap,

/// Metadata to track the origin of metrics. This is always `None` for log and trace events.
/// Only a small set of Vector sources and transforms explicitly set this field.
Expand Down Expand Up @@ -264,7 +264,7 @@ impl Default for EventMetadata {
}
}

fn default_schema_definition() -> Arc<schema::Definition> {
pub(super) fn default_schema_definition() -> Arc<schema::Definition> {
Arc::new(schema::Definition::new_with_default_metadata(
Kind::any(),
[LogNamespace::Legacy, LogNamespace::Vector],
Expand Down
66 changes: 34 additions & 32 deletions lib/vector-core/src/event/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ pub use metric::Value as MetricValue;
pub use proto_event::*;
use vrl::value::{ObjectMap, Value as VrlValue};

use super::EventFinalizers;
use super::metadata::{Inner, default_schema_definition};
use super::{EventMetadata, array, metric::MetricSketch};

impl event_array::Events {
Expand Down Expand Up @@ -644,49 +646,49 @@ impl From<EventMetadata> for Metadata {

impl From<Metadata> for EventMetadata {
fn from(value: Metadata) -> Self {
let mut metadata = EventMetadata::default();

if let Some(value) = value.value.and_then(decode_value) {
*metadata.value_mut() = value;
}

if let Some(source_id) = value.source_id {
metadata.set_source_id(Arc::new(source_id.into()));
}

if let Some(source_type) = value.source_type {
metadata.set_source_type(source_type);
}

if let Some(upstream_id) = value.upstream_id {
metadata.set_upstream_id(Arc::new(upstream_id.into()));
}

if let Some(secrets) = value.secrets {
metadata.secrets_mut().merge(secrets.into());
}

if let Some(origin_metadata) = value.datadog_origin_metadata {
metadata = metadata.with_origin_metadata(origin_metadata.into());
}

let maybe_source_event_id = if value.source_event_id.is_empty() {
let Metadata {
value: metadata_value,
source_id,
source_type,
upstream_id,
secrets,
datadog_origin_metadata,
source_event_id,
} = value;

let metadata_value = metadata_value.and_then(decode_value);
let source_id = source_id.map(|s| Arc::new(s.into()));
let upstream_id = upstream_id.map(|id| Arc::new(id.into()));
let secrets = secrets.map(Into::into);
let datadog_origin_metadata = datadog_origin_metadata.map(Into::into);
let source_event_id = if source_event_id.is_empty() {
None
} else {
match Uuid::from_slice(&value.source_event_id) {
match Uuid::from_slice(&source_event_id) {
Ok(id) => Some(id),
Err(error) => {
error!(
message = "Failed to parse source_event_id: {}",
%error
%error,
source_event_id = %String::from_utf8_lossy(&source_event_id),
"Failed to parse source_event_id.",
);
None
}
}
};
metadata = metadata.with_source_event_id(maybe_source_event_id);

metadata
EventMetadata(Arc::new(Inner {
value: metadata_value.unwrap_or_else(|| vrl::value::Value::Object(ObjectMap::new())),
secrets: secrets.unwrap_or_default(),
finalizers: EventFinalizers::default(),
source_id,
source_type: source_type.map(Into::into),
upstream_id,
schema_definition: default_schema_definition(),
dropped_fields: ObjectMap::new(),
datadog_origin_metadata,
source_event_id,
}))
}
}

Expand Down
Loading