Skip to content

[pkg/loki] Group Loki requests by the attribute inferred from the loki.tenant hint. #14930

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .chloggen/loki-tenant-id-from-attributes.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/translator/loki

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for grouping Loki requests by attribute that is resolved from the `loki.tenant` hint.

# One or more tracking issues related to the change
issues: [14706]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
15 changes: 14 additions & 1 deletion pkg/translator/loki/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
const (
hintAttributes = "loki.attribute.labels"
hintResources = "loki.resource.labels"
hintTenant = "loki.tenant"
)

var defaultExporterLabels = model.LabelSet{"exporter": "OTLP"}
Expand All @@ -48,6 +49,18 @@ func convertAttributesAndMerge(logAttrs pcommon.Map, resAttrs pcommon.Map) model
out = out.Merge(labels)
}

// get tenant hint from resource attributes, fallback to record attributes
// if it is not found
if resourcesToLabel, found := resAttrs.Get(hintTenant); !found {
if attributesToLabel, found := logAttrs.Get(hintTenant); found {
labels := convertAttributesToLabels(logAttrs, attributesToLabel)
out = out.Merge(labels)
}
} else {
labels := convertAttributesToLabels(resAttrs, resourcesToLabel)
out = out.Merge(labels)
}

return out
}

Expand Down Expand Up @@ -87,7 +100,7 @@ func parseAttributeNames(attrsToSelect pcommon.Value) []string {

func removeAttributes(attrs pcommon.Map, labels model.LabelSet) {
attrs.RemoveIf(func(s string, v pcommon.Value) bool {
if s == hintAttributes || s == hintResources {
if s == hintAttributes || s == hintResources || s == hintTenant {
return true
}

Expand Down
130 changes: 130 additions & 0 deletions pkg/translator/loki/logs_to_loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,142 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
)

type PushRequest struct {
*logproto.PushRequest
Report *PushReport
}

// PushReport contains the summary for the outcome of a LogsToLoki operation
type PushReport struct {
Errors []error
NumSubmitted int
NumDropped int
}

// LogsToLokiRequests converts a Logs pipeline data into Loki PushRequests grouped
// by tenant. The tenant value is inferred from the `loki.tenant` resource or log
// attribute hint. If the `loki.tenant` attribute is present in both resource or
// log attributes, then the resource attribute takes precedence.
// Labels for each record are inferred based on the hints "loki.attribute.labels"
// and "loki.resource.labels". Each hint might contain a comma-separated list of
// attributes (resource or record) that should be promoted to a Loki label. Those
// attributes are removed from the body as a result, otherwise they would be shown
// in duplicity in Loki.
// PushStreams are created based on the labels: all records containing the same
// set of labels are part of the same stream. All streams are then packed within
// the resulting PushRequest.
// When this function isn't able to marshal a log record, the log record is dropped
// and processing continues, so that the caller can decide to either skip the entire
// batch or send only the data that could be parsed. The caller can use the PushReport
// to make this decision, as it includes all of the errors that were encountered,
// as well as the number of items dropped and submitted.
func LogsToLokiRequests(ld plog.Logs) map[string]PushRequest {
groups := map[string]pushRequestGroup{}

rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
ills := rls.At(i).ScopeLogs()

for j := 0; j < ills.Len(); j++ {
logs := ills.At(j).LogRecords()
for k := 0; k < logs.Len(); k++ {

// similarly, we may remove attributes, so change only our version
log := plog.NewLogRecord()
logs.At(k).CopyTo(log)

// we may remove attributes, so we make a copy and change our version
resource := pcommon.NewResource()
rls.At(i).Resource().CopyTo(resource)

// resolve tenant and get/create a push request group
tenant := getTenantFromTenantHint(log.Attributes(), resource.Attributes())
group, ok := groups[tenant]
if !ok {
group = pushRequestGroup{
report: &PushReport{},
streams: make(map[string]*logproto.Stream),
}
groups[tenant] = group
}

mergedLabels := convertAttributesAndMerge(log.Attributes(), resource.Attributes())
// remove the attributes that were promoted to labels
removeAttributes(log.Attributes(), mergedLabels)
removeAttributes(resource.Attributes(), mergedLabels)

// create the stream name based on the labels
labels := mergedLabels.String()

entry, err := convertLogToJSONEntry(log, resource)
if err != nil {
// Couldn't convert so dropping log.
group.report.Errors = append(group.report.Errors, fmt.Errorf("failed to convert, dropping log: %w", err))
group.report.NumDropped++
continue
}

group.report.NumSubmitted++

if stream, ok := group.streams[labels]; ok {
stream.Entries = append(stream.Entries, *entry)
continue
}

group.streams[labels] = &logproto.Stream{
Labels: labels,
Entries: []logproto.Entry{*entry},
}
}
}
}

requests := make(map[string]PushRequest)
for tenant, g := range groups {
pr := &logproto.PushRequest{
Streams: make([]logproto.Stream, len(g.streams)),
}

i := 0
for _, stream := range g.streams {
pr.Streams[i] = *stream
i++
}
requests[tenant] = PushRequest{
PushRequest: pr,
Report: g.report,
}
}
return requests
}

// getTenantFromTenantHint extract an attribute based on the tenant hint.
// it looks up for the attribute first in resource attributes and fallbacks to
// record attributes if it is not found.
func getTenantFromTenantHint(logAttr pcommon.Map, resourceAttr pcommon.Map) string {
var tenant string
hintAttr, found := resourceAttr.Get(hintTenant)
if !found {
if hintAttr, found = logAttr.Get(hintTenant); !found {
return tenant
}
}

if tenantAttr, found := resourceAttr.Get(hintAttr.Str()); found {
tenant = tenantAttr.Str()
} else {
if tenantAttr, found = logAttr.Get(hintAttr.Str()); found {
tenant = tenantAttr.Str()
}
}
return tenant
}

type pushRequestGroup struct {
streams map[string]*logproto.Stream
report *PushReport
}

// LogsToLoki converts a Logs pipeline data into a Loki PushRequest.
// Labels for each record are inferred based on the hints "loki.attribute.labels"
// and "loki.resource.labels". Each hint might contain a comma-separated list of
Expand All @@ -43,6 +172,7 @@ type PushReport struct {
// batch or send only the data that could be parsed. The caller can use the PushReport
// to make this decision, as it includes all of the errors that were encountered,
// as well as the number of items dropped and submitted.
// Deprecated: [v0.62.0] will be removed after v0.63.0. Use LogsToLokiRequests instead.
func LogsToLoki(ld plog.Logs) (*logproto.PushRequest, *PushReport) {
report := &PushReport{}

Expand Down
Loading