Skip to content

Merge observability report sender for all signals, remove duplicate code #12165

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .chloggen/merge-obs-report.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Rename exporter span signal specific attributes (e.g. "sent_spans" / "send_failed_span") to "items.sent" / "items.failed".

# One or more tracking issues or pull requests related to the change
issues: [12165]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api, user]
6 changes: 3 additions & 3 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type BaseExporter struct {
}

func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSenderFactory, options ...Option) (*BaseExporter, error) {
obsReport, err := NewExporter(ObsReportSettings{ExporterSettings: set, Signal: signal})
obsReport, err := NewObsReport(ObsReportSettings{ExporterSettings: set, Signal: signal})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -121,7 +121,7 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
return be, nil
}

// send sends the request using the first sender in the chain.
// Send sends the request using the first sender in the chain.
func (be *BaseExporter) Send(ctx context.Context, req internal.Request) error {
err := be.QueueSender.Send(ctx, req)
if err != nil {
Expand Down Expand Up @@ -282,7 +282,7 @@ func WithMarshaler(marshaler exporterqueue.Marshaler[internal.Request]) Option {
}
}

// withUnmarshaler is used to set the request unmarshaler for the new exporter helper.
// WithUnmarshaler is used to set the request unmarshaler for the new exporter helper.
// It must be provided as the first option when creating a new exporter helper.
func WithUnmarshaler(unmarshaler exporterqueue.Unmarshaler[internal.Request]) Option {
return func(o *BaseExporter) error {
Expand Down
141 changes: 141 additions & 0 deletions exporter/exporterhelper/internal/obs_report.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"

import (
"context"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
"go.opentelemetry.io/collector/pipeline"
)

const (
// spanNameSep is duplicate between receiver and exporter.
spanNameSep = "/"

// ExporterKey used to identify exporters in metrics and traces.
ExporterKey = "exporter"

// DataTypeKey used to identify the data type in the queue size metric.
DataTypeKey = "data_type"

// ItemsSent used to track number of items sent by exporters.
ItemsSent = "items.sent"
// ItemsFailed used to track number of items that failed to be sent by exporters.
ItemsFailed = "items.failed"
)

// ObsReport is a helper to add observability to an exporter.
type ObsReport struct {
spanName string
tracer trace.Tracer
Signal pipeline.Signal

spanAttrs trace.SpanStartEventOption
metricAttr metric.MeasurementOption
TelemetryBuilder *metadata.TelemetryBuilder
enqueueFailedInst metric.Int64Counter
itemsSentInst metric.Int64Counter
itemsFailedInst metric.Int64Counter
}

// ObsReportSettings are settings for creating an ObsReport.
type ObsReportSettings struct {
ExporterSettings exporter.Settings
Signal pipeline.Signal
}

func NewObsReport(set ObsReportSettings) (*ObsReport, error) {
telemetryBuilder, err := metadata.NewTelemetryBuilder(set.ExporterSettings.TelemetrySettings)
if err != nil {
return nil, err
}

Check warning on line 59 in exporter/exporterhelper/internal/obs_report.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/obs_report.go#L58-L59

Added lines #L58 - L59 were not covered by tests

idStr := set.ExporterSettings.ID.String()
expAttr := attribute.String(ExporterKey, idStr)

or := &ObsReport{
spanName: ExporterKey + spanNameSep + idStr + spanNameSep + set.Signal.String(),
tracer: metadata.Tracer(set.ExporterSettings.TelemetrySettings),
Signal: set.Signal,
spanAttrs: trace.WithAttributes(expAttr, attribute.String(DataTypeKey, set.Signal.String())),
metricAttr: metric.WithAttributeSet(attribute.NewSet(expAttr)),
TelemetryBuilder: telemetryBuilder,
}

switch set.Signal {
case pipeline.SignalTraces:
or.enqueueFailedInst = or.TelemetryBuilder.ExporterEnqueueFailedSpans
or.itemsSentInst = or.TelemetryBuilder.ExporterSentSpans
or.itemsFailedInst = or.TelemetryBuilder.ExporterSendFailedSpans

case pipeline.SignalMetrics:
or.enqueueFailedInst = or.TelemetryBuilder.ExporterEnqueueFailedMetricPoints
or.itemsSentInst = or.TelemetryBuilder.ExporterSentMetricPoints
or.itemsFailedInst = or.TelemetryBuilder.ExporterSendFailedMetricPoints

case pipeline.SignalLogs:
or.enqueueFailedInst = or.TelemetryBuilder.ExporterEnqueueFailedLogRecords
or.itemsSentInst = or.TelemetryBuilder.ExporterSentLogRecords
or.itemsFailedInst = or.TelemetryBuilder.ExporterSendFailedLogRecords
}

return or, nil
}

// StartOp creates the span used to trace the operation. Returning
// the updated context and the created span.
func (or *ObsReport) StartOp(ctx context.Context) context.Context {
ctx, _ = or.tracer.Start(ctx, or.spanName, or.spanAttrs)
return ctx
}

// EndOp completes the export operation that was started with StartOp.
func (or *ObsReport) EndOp(ctx context.Context, numLogRecords int, err error) {
numSent, numFailedToSend := toNumItems(numLogRecords, err)

// No metrics recorded for profiles.
if or.itemsSentInst != nil {
or.itemsSentInst.Add(ctx, numSent, or.metricAttr)
}
// No metrics recorded for profiles.
if or.itemsFailedInst != nil {
or.itemsFailedInst.Add(ctx, numFailedToSend, or.metricAttr)
}

span := trace.SpanFromContext(ctx)
defer span.End()
// End the span according to errors.
if span.IsRecording() {
span.SetAttributes(
attribute.Int64(ItemsSent, numSent),
attribute.Int64(ItemsFailed, numFailedToSend),
)
if err != nil {
span.SetStatus(codes.Error, err.Error())
}
}
}

func toNumItems(numExportedItems int, err error) (int64, int64) {
if err != nil {
return 0, int64(numExportedItems)
}
return int64(numExportedItems), 0
}

func (or *ObsReport) RecordEnqueueFailure(ctx context.Context, failed int64) {
// No metrics recorded for profiles.
if or.enqueueFailedInst == nil {
return
}

Check warning on line 138 in exporter/exporterhelper/internal/obs_report.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/obs_report.go#L137-L138

Added lines #L137 - L138 were not covered by tests

or.enqueueFailedInst.Add(ctx, failed, or.metricAttr)
}
28 changes: 28 additions & 0 deletions exporter/exporterhelper/internal/obs_report_sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"

import (
"context"

"go.opentelemetry.io/collector/exporter/internal"
)

type obsReportSender[K internal.Request] struct {
BaseSender[K]
obsrep *ObsReport
}

func NewObsReportSender[K internal.Request](obsrep *ObsReport) Sender[K] {
return &obsReportSender[K]{obsrep: obsrep}
}

func (ors *obsReportSender[K]) Send(ctx context.Context, req K) error {
c := ors.obsrep.StartOp(ctx)
items := req.ItemsCount()
// Forward the data to the next consumer (this pusher is the next).
err := ors.NextSender.Send(c, req)
ors.obsrep.EndOp(c, items, err)
return err
}
Loading
Loading