Skip to content

Commit a4e7796

Browse files
felixbarny, carsonip, and ChrsMark
authored and committed
[elasticsearchexporter] Direct serialization without objmodel in OTel mode (open-telemetry#37032)
Directly serializes pdata to JSON in OTel mode

* Improved performance as no `objmodel.Document` needs to be created first
* Fixes issue discovered in open-telemetry#37021 where map bodies with dotted field names are de-dotted

---------

Co-authored-by: Carson Ip <[email protected]>
Co-authored-by: Christos Markou <[email protected]>
1 parent bb3bfe3 commit a4e7796

File tree

11 files changed

+1075
-612
lines changed

11 files changed

+1075
-612
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: elasticsearchexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: More efficient JSON encoding for OTel mode
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [37032]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: Increases throughput for metrics by 2x and for logs and traces by 3x
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/elasticsearchexporter/exporter.go

Lines changed: 56 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"
55

66
import (
7-
"bytes"
87
"context"
98
"errors"
109
"fmt"
@@ -20,7 +19,7 @@ import (
2019
"go.opentelemetry.io/collector/pdata/ptrace"
2120
"go.uber.org/zap"
2221

23-
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel"
22+
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/pool"
2423
)
2524

2625
type elasticsearchExporter struct {
@@ -36,6 +35,8 @@ type elasticsearchExporter struct {
3635

3736
wg sync.WaitGroup // active sessions
3837
bulkIndexer bulkIndexer
38+
39+
bufferPool *pool.BufferPool
3940
}
4041

4142
func newExporter(
@@ -69,6 +70,7 @@ func newExporter(
6970
model: model,
7071
logstashFormat: cfg.LogstashFormat,
7172
otel: otel,
73+
bufferPool: pool.NewBufferPool(),
7274
}
7375
}
7476

@@ -173,11 +175,14 @@ func (e *elasticsearchExporter) pushLogRecord(
173175
fIndex = formattedIndex
174176
}
175177

176-
document, err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL)
178+
buf := e.bufferPool.NewPooledBuffer()
179+
err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL, buf.Buffer)
177180
if err != nil {
181+
buf.Recycle()
178182
return fmt.Errorf("failed to encode log event: %w", err)
179183
}
180-
return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil)
184+
// not recycling after Add returns an error as we don't know if it's already recycled
185+
return bulkIndexerSession.Add(ctx, fIndex, buf, nil)
181186
}
182187

183188
func (e *elasticsearchExporter) pushMetricsData(
@@ -193,21 +198,18 @@ func (e *elasticsearchExporter) pushMetricsData(
193198
}
194199
defer session.End()
195200

196-
var (
197-
validationErrs []error // log instead of returning these so that upstream does not retry
198-
errs []error
199-
)
201+
var errs []error
200202
resourceMetrics := metrics.ResourceMetrics()
201203
for i := 0; i < resourceMetrics.Len(); i++ {
202204
resourceMetric := resourceMetrics.At(i)
203205
resource := resourceMetric.Resource()
204206
scopeMetrics := resourceMetric.ScopeMetrics()
205207

206-
resourceDocs := make(map[string]map[uint32]objmodel.Document)
207-
208208
for j := 0; j < scopeMetrics.Len(); j++ {
209+
var validationErrs []error // log instead of returning these so that upstream does not retry
209210
scopeMetrics := scopeMetrics.At(j)
210211
scope := scopeMetrics.Scope()
212+
groupedDataPointsByIndex := make(map[string]map[uint32][]dataPoint)
211213
for k := 0; k < scopeMetrics.Metrics().Len(); k++ {
212214
metric := scopeMetrics.Metrics().At(k)
213215

@@ -216,13 +218,17 @@ func (e *elasticsearchExporter) pushMetricsData(
216218
if err != nil {
217219
return err
218220
}
219-
if _, ok := resourceDocs[fIndex]; !ok {
220-
resourceDocs[fIndex] = make(map[uint32]objmodel.Document)
221+
groupedDataPoints, ok := groupedDataPointsByIndex[fIndex]
222+
if !ok {
223+
groupedDataPoints = make(map[uint32][]dataPoint)
224+
groupedDataPointsByIndex[fIndex] = groupedDataPoints
221225
}
222-
223-
if err = e.model.upsertMetricDataPointValue(resourceDocs[fIndex], resource,
224-
resourceMetric.SchemaUrl(), scope, scopeMetrics.SchemaUrl(), metric, dp); err != nil {
225-
return err
226+
dpHash := e.model.hashDataPoint(dp)
227+
dataPoints, ok := groupedDataPoints[dpHash]
228+
if !ok {
229+
groupedDataPoints[dpHash] = []dataPoint{dp}
230+
} else {
231+
groupedDataPoints[dpHash] = append(dataPoints, dp)
226232
}
227233
return nil
228234
}
@@ -232,7 +238,7 @@ func (e *elasticsearchExporter) pushMetricsData(
232238
dps := metric.Sum().DataPoints()
233239
for l := 0; l < dps.Len(); l++ {
234240
dp := dps.At(l)
235-
if err := upsertDataPoint(newNumberDataPoint(dp)); err != nil {
241+
if err := upsertDataPoint(newNumberDataPoint(metric, dp)); err != nil {
236242
validationErrs = append(validationErrs, err)
237243
continue
238244
}
@@ -241,7 +247,7 @@ func (e *elasticsearchExporter) pushMetricsData(
241247
dps := metric.Gauge().DataPoints()
242248
for l := 0; l < dps.Len(); l++ {
243249
dp := dps.At(l)
244-
if err := upsertDataPoint(newNumberDataPoint(dp)); err != nil {
250+
if err := upsertDataPoint(newNumberDataPoint(metric, dp)); err != nil {
245251
validationErrs = append(validationErrs, err)
246252
continue
247253
}
@@ -254,7 +260,7 @@ func (e *elasticsearchExporter) pushMetricsData(
254260
dps := metric.ExponentialHistogram().DataPoints()
255261
for l := 0; l < dps.Len(); l++ {
256262
dp := dps.At(l)
257-
if err := upsertDataPoint(newExponentialHistogramDataPoint(dp)); err != nil {
263+
if err := upsertDataPoint(newExponentialHistogramDataPoint(metric, dp)); err != nil {
258264
validationErrs = append(validationErrs, err)
259265
continue
260266
}
@@ -267,7 +273,7 @@ func (e *elasticsearchExporter) pushMetricsData(
267273
dps := metric.Histogram().DataPoints()
268274
for l := 0; l < dps.Len(); l++ {
269275
dp := dps.At(l)
270-
if err := upsertDataPoint(newHistogramDataPoint(dp)); err != nil {
276+
if err := upsertDataPoint(newHistogramDataPoint(metric, dp)); err != nil {
271277
validationErrs = append(validationErrs, err)
272278
continue
273279
}
@@ -276,37 +282,35 @@ func (e *elasticsearchExporter) pushMetricsData(
276282
dps := metric.Summary().DataPoints()
277283
for l := 0; l < dps.Len(); l++ {
278284
dp := dps.At(l)
279-
if err := upsertDataPoint(newSummaryDataPoint(dp)); err != nil {
285+
if err := upsertDataPoint(newSummaryDataPoint(metric, dp)); err != nil {
280286
validationErrs = append(validationErrs, err)
281287
continue
282288
}
283289
}
284290
}
285291
}
286-
}
287292

288-
if len(validationErrs) > 0 {
289-
e.Logger.Warn("validation errors", zap.Error(errors.Join(validationErrs...)))
290-
}
291-
292-
for fIndex, docs := range resourceDocs {
293-
for _, doc := range docs {
294-
var (
295-
docBytes []byte
296-
err error
297-
)
298-
docBytes, err = e.model.encodeDocument(doc)
299-
if err != nil {
300-
errs = append(errs, err)
301-
continue
302-
}
303-
if err := session.Add(ctx, fIndex, bytes.NewReader(docBytes), doc.DynamicTemplates()); err != nil {
304-
if cerr := ctx.Err(); cerr != nil {
305-
return cerr
293+
for fIndex, groupedDataPoints := range groupedDataPointsByIndex {
294+
for _, dataPoints := range groupedDataPoints {
295+
buf := e.bufferPool.NewPooledBuffer()
296+
dynamicTemplates, err := e.model.encodeMetrics(resource, resourceMetric.SchemaUrl(), scope, scopeMetrics.SchemaUrl(), dataPoints, &validationErrs, buf.Buffer)
297+
if err != nil {
298+
buf.Recycle()
299+
errs = append(errs, err)
300+
continue
301+
}
302+
if err := session.Add(ctx, fIndex, buf, dynamicTemplates); err != nil {
303+
// not recycling after Add returns an error as we don't know if it's already recycled
304+
if cerr := ctx.Err(); cerr != nil {
305+
return cerr
306+
}
307+
errs = append(errs, err)
306308
}
307-
errs = append(errs, err)
308309
}
309310
}
311+
if len(validationErrs) > 0 {
312+
e.Logger.Warn("validation errors", zap.Error(errors.Join(validationErrs...)))
313+
}
310314
}
311315
}
312316

@@ -411,11 +415,14 @@ func (e *elasticsearchExporter) pushTraceRecord(
411415
fIndex = formattedIndex
412416
}
413417

414-
document, err := e.model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL)
418+
buf := e.bufferPool.NewPooledBuffer()
419+
err := e.model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL, buf.Buffer)
415420
if err != nil {
421+
buf.Recycle()
416422
return fmt.Errorf("failed to encode trace record: %w", err)
417423
}
418-
return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil)
424+
// not recycling after Add returns an error as we don't know if it's already recycled
425+
return bulkIndexerSession.Add(ctx, fIndex, buf, nil)
419426
}
420427

421428
func (e *elasticsearchExporter) pushSpanEvent(
@@ -440,14 +447,12 @@ func (e *elasticsearchExporter) pushSpanEvent(
440447
}
441448
fIndex = formattedIndex
442449
}
443-
444-
document := e.model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL)
445-
if document == nil {
450+
buf := e.bufferPool.NewPooledBuffer()
451+
e.model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL, buf.Buffer)
452+
if buf.Buffer.Len() == 0 {
453+
buf.Recycle()
446454
return nil
447455
}
448-
docBytes, err := e.model.encodeDocument(*document)
449-
if err != nil {
450-
return err
451-
}
452-
return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(docBytes), nil)
456+
// not recycling after Add returns an error as we don't know if it's already recycled
457+
return bulkIndexerSession.Add(ctx, fIndex, buf, nil)
453458
}

0 commit comments

Comments
 (0)