Skip to content

Commit 3667e9f

Browse files
committed
Expose metrics for batch sizes
Signed-off-by: Israel Blancas <[email protected]>
1 parent c9aaed8 commit 3667e9f

31 files changed

+446
-132
lines changed

.chloggen/12894.yaml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Expose otelcol_exporter_batch_send_size_bytes from the default batcher.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12894]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
Added the following metric to the exporter helper default batcher:
20+
- otelcol_exporter_batch_send_size_bytes: Batches size in the queue (in bytes)
21+
22+
23+
# Optional: The change log or logs in which this entry should be included.
24+
# e.g. '[user]' or '[user, api]'
25+
# Include 'user' if the change is relevant to end users.
26+
# Include 'api' if there is a change to a library API.
27+
# Default: '[user]'
28+
change_logs: [user]

exporter/exporterhelper/documentation.md

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,14 @@
66

77
The following telemetry is emitted by this component.
88

9+
### otelcol_exporter_batch_send_size_bytes
10+
11+
Batches size in the queue (in bytes). [alpha]
12+
13+
| Unit | Metric Type | Value Type |
14+
| ---- | ----------- | ---------- |
15+
| {By} | Histogram | Int |
16+
917
### otelcol_exporter_enqueue_failed_log_records
1018

1119
Number of log records failed to be added to the sending queue. [alpha]
@@ -32,15 +40,15 @@ Number of spans failed to be added to the sending queue. [alpha]
3240

3341
### otelcol_exporter_queue_capacity
3442

35-
Fixed capacity of the retry queue (in batches) [alpha]
43+
Fixed capacity of the retry queue (in batches). [alpha]
3644

3745
| Unit | Metric Type | Value Type |
3846
| ---- | ----------- | ---------- |
3947
| {batches} | Gauge | Int |
4048

4149
### otelcol_exporter_queue_size
4250

43-
Current size of the retry queue (in batches) [alpha]
51+
Current size of the retry queue (in batches). [alpha]
4452

4553
| Unit | Metric Type | Value Type |
4654
| ---- | ----------- | ---------- |

exporter/exporterhelper/internal/base_exporter.go

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"go.opentelemetry.io/collector/config/configretry"
1515
"go.opentelemetry.io/collector/consumer"
1616
"go.opentelemetry.io/collector/exporter"
17+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
1718
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1819
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1920
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
@@ -49,20 +50,27 @@ type BaseExporter struct {
4950
queueBatchSettings QueueBatchSettings[request.Request]
5051
queueCfg queuebatch.Config
5152
batcherCfg BatcherConfig
53+
tb *metadata.TelemetryBuilder
5254
}
5355

5456
func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sender.SendFunc[request.Request], options ...Option) (*BaseExporter, error) {
57+
var err error
5558
be := &BaseExporter{
5659
Set: set,
5760
timeoutCfg: NewDefaultTimeoutConfig(),
5861
}
5962

6063
for _, op := range options {
61-
if err := op(be); err != nil {
64+
if err = op(be); err != nil {
6265
return nil, err
6366
}
6467
}
6568

69+
be.tb, err = metadata.NewTelemetryBuilder(set.TelemetrySettings)
70+
if err != nil {
71+
return nil, err
72+
}
73+
6674
// Consumer Sender is always initialized.
6775
be.firstSender = sender.NewSender(pusher)
6876

@@ -77,11 +85,7 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sende
7785
be.firstSender = be.RetrySender
7886
}
7987

80-
var err error
81-
be.firstSender, err = newObsReportSender(set, signal, be.firstSender)
82-
if err != nil {
83-
return nil, err
84-
}
88+
be.firstSender = newObsReportSender(set, signal, be.firstSender, be.tb)
8589

8690
if be.batcherCfg.Enabled || be.queueCfg.Batch != nil {
8791
// Batcher mutates the data.
@@ -90,12 +94,13 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sende
9094

9195
if be.queueCfg.Enabled || be.batcherCfg.Enabled {
9296
qSet := queuebatch.Settings[request.Request]{
93-
Signal: signal,
94-
ID: set.ID,
95-
Telemetry: set.TelemetrySettings,
96-
Encoding: be.queueBatchSettings.Encoding,
97-
Sizers: be.queueBatchSettings.Sizers,
98-
Partitioner: be.queueBatchSettings.Partitioner,
97+
Signal: signal,
98+
ID: set.ID,
99+
Telemetry: set.TelemetrySettings,
100+
Encoding: be.queueBatchSettings.Encoding,
101+
Sizers: be.queueBatchSettings.Sizers,
102+
Partitioner: be.queueBatchSettings.Partitioner,
103+
TelemetryBuilder: be.tb,
99104
}
100105
be.QueueSender, err = NewQueueSender(qSet, be.queueCfg, be.batcherCfg, be.ExportFailureMessage, be.firstSender)
101106
if err != nil {

exporter/exporterhelper/internal/base_exporter_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ import (
1010

1111
"github.com/stretchr/testify/assert"
1212
"github.com/stretchr/testify/require"
13+
"go.opentelemetry.io/otel/metric"
14+
embeddedmetric "go.opentelemetry.io/otel/metric/embedded"
15+
noopmetric "go.opentelemetry.io/otel/metric/noop"
1316
"go.uber.org/zap"
1417
"go.uber.org/zap/zaptest/observer"
1518

@@ -143,6 +146,29 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
143146
}
144147
}
145148

149+
type failingMeterProvider struct {
150+
embeddedmetric.MeterProvider
151+
}
152+
153+
func (m failingMeterProvider) Meter(_ string, _ ...metric.MeterOption) metric.Meter {
154+
return failingMeter{}
155+
}
156+
157+
type failingMeter struct {
158+
noopmetric.Meter
159+
}
160+
161+
func (m failingMeter) Int64Counter(_ string, _ ...metric.Int64CounterOption) (metric.Int64Counter, error) {
162+
return nil, errors.New("failed to create counter")
163+
}
164+
165+
func TestBaseExporterWithTelemetryError(t *testing.T) {
166+
set := exportertest.NewNopSettings(exportertest.NopType)
167+
set.MeterProvider = failingMeterProvider{}
168+
_, err := NewBaseExporter(set, pipeline.SignalMetrics, noopExport)
169+
require.Error(t, err)
170+
}
171+
146172
func errExport(context.Context, request.Request) error {
147173
return errors.New("my error")
148174
}

exporter/exporterhelper/internal/metadata/generated_telemetry.go

Lines changed: 9 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

exporter/exporterhelper/internal/metadatatest/generated_telemetrytest.go

Lines changed: 17 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

exporter/exporterhelper/internal/metadatatest/generated_telemetrytest_test.go

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

exporter/exporterhelper/internal/obs_report_sender.go

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,7 @@ type obsReportSender[K request.Request] struct {
4949
next sender.Sender[K]
5050
}
5151

52-
func newObsReportSender[K request.Request](set exporter.Settings, signal pipeline.Signal, next sender.Sender[K]) (sender.Sender[K], error) {
53-
telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
54-
if err != nil {
55-
return nil, err
56-
}
57-
52+
func newObsReportSender[K request.Request](set exporter.Settings, signal pipeline.Signal, next sender.Sender[K], tb *metadata.TelemetryBuilder) sender.Sender[K] {
5853
idStr := set.ID.String()
5954
expAttr := attribute.String(ExporterKey, idStr)
6055

@@ -68,26 +63,27 @@ func newObsReportSender[K request.Request](set exporter.Settings, signal pipelin
6863

6964
switch signal {
7065
case pipeline.SignalTraces:
71-
or.itemsSentInst = telemetryBuilder.ExporterSentSpans
72-
or.itemsFailedInst = telemetryBuilder.ExporterSendFailedSpans
66+
or.itemsSentInst = tb.ExporterSentSpans
67+
or.itemsFailedInst = tb.ExporterSendFailedSpans
7368

7469
case pipeline.SignalMetrics:
75-
or.itemsSentInst = telemetryBuilder.ExporterSentMetricPoints
76-
or.itemsFailedInst = telemetryBuilder.ExporterSendFailedMetricPoints
70+
or.itemsSentInst = tb.ExporterSentMetricPoints
71+
or.itemsFailedInst = tb.ExporterSendFailedMetricPoints
7772

7873
case pipeline.SignalLogs:
79-
or.itemsSentInst = telemetryBuilder.ExporterSentLogRecords
80-
or.itemsFailedInst = telemetryBuilder.ExporterSendFailedLogRecords
74+
or.itemsSentInst = tb.ExporterSentLogRecords
75+
or.itemsFailedInst = tb.ExporterSendFailedLogRecords
8176
}
8277

83-
return or, nil
78+
return or
8479
}
8580

8681
func (ors *obsReportSender[K]) Send(ctx context.Context, req K) error {
8782
// Have to read the number of items before sending the request since the request can
8883
// be modified by the downstream components like the batcher.
8984
c := ors.startOp(ctx)
9085
items := req.ItemsCount()
86+
9187
// Forward the data to the next consumer (this pusher is the next).
9288
err := ors.next.Send(c, req)
9389
ors.endOp(c, items, err)

exporter/exporterhelper/internal/obs_report_sender_test.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"go.opentelemetry.io/collector/component"
1919
"go.opentelemetry.io/collector/component/componenttest"
2020
"go.opentelemetry.io/collector/exporter"
21+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
2122
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadatatest"
2223
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
2324
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
@@ -38,13 +39,17 @@ func TestExportTraceDataOp(t *testing.T) {
3839
parentCtx, parentSpan := tt.NewTelemetrySettings().TracerProvider.Tracer("test").Start(context.Background(), t.Name())
3940
defer parentSpan.End()
4041

42+
tb, err := metadata.NewTelemetryBuilder(tt.NewTelemetrySettings())
43+
require.NoError(t, err)
44+
t.Cleanup(func() { tb.Shutdown() })
45+
4146
var exporterErr error
42-
obsrep, err := newObsReportSender(
47+
obsrep := newObsReportSender(
4348
exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
4449
pipeline.SignalTraces,
4550
sender.NewSender(func(context.Context, request.Request) error { return exporterErr }),
51+
tb,
4652
)
47-
require.NoError(t, err)
4853

4954
params := []testParams{
5055
{items: 22, err: nil},
@@ -106,13 +111,17 @@ func TestExportMetricsOp(t *testing.T) {
106111
parentCtx, parentSpan := tt.NewTelemetrySettings().TracerProvider.Tracer("test").Start(context.Background(), t.Name())
107112
defer parentSpan.End()
108113

114+
tb, err := metadata.NewTelemetryBuilder(tt.NewTelemetrySettings())
115+
require.NoError(t, err)
116+
t.Cleanup(func() { tb.Shutdown() })
117+
109118
var exporterErr error
110-
obsrep, err := newObsReportSender(
119+
obsrep := newObsReportSender(
111120
exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
112121
pipeline.SignalMetrics,
113122
sender.NewSender(func(context.Context, request.Request) error { return exporterErr }),
123+
tb,
114124
)
115-
require.NoError(t, err)
116125

117126
params := []testParams{
118127
{items: 17, err: nil},
@@ -174,13 +183,17 @@ func TestExportLogsOp(t *testing.T) {
174183
parentCtx, parentSpan := tt.NewTelemetrySettings().TracerProvider.Tracer("test").Start(context.Background(), t.Name())
175184
defer parentSpan.End()
176185

186+
tb, err := metadata.NewTelemetryBuilder(tt.NewTelemetrySettings())
187+
require.NoError(t, err)
188+
t.Cleanup(func() { tb.Shutdown() })
189+
177190
var exporterErr error
178-
obsrep, err := newObsReportSender(
191+
obsrep := newObsReportSender(
179192
exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
180193
pipeline.SignalLogs,
181194
sender.NewSender(func(context.Context, request.Request) error { return exporterErr }),
195+
tb,
182196
)
183-
require.NoError(t, err)
184197

185198
params := []testParams{
186199
{items: 17, err: nil},

0 commit comments

Comments
 (0)