Skip to content

Commit 74170c0

Browse files
authored
[exporter/file] add encoding extension support (#31774)
**Description:** This adds support to set an encoding extension on the file exporter, overriding the format type.
1 parent e957541 commit 74170c0

File tree

11 files changed

+220
-15
lines changed

11 files changed

+220
-15
lines changed

.chloggen/encoding_fileexporter.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: fileexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Adopt the encoding extension with the file exporter.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [31774]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

cmd/configschema/go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1151,3 +1151,7 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/stor
11511151
replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/dbstorage => ../../extension/storage/dbstorage
11521152

11531153
replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage => ../../extension/storage
1154+
1155+
replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension => ../../extension/encoding/otlpencodingextension
1156+
1157+
replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ../../extension/encoding

exporter/fileexporter/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ The following settings are optional:
4747
- localtime : [default: false (use UTC)] whether or not the timestamps in backup files is formatted according to the host's local time.
4848

4949
- `format`[default: json]: define the data format of encoded telemetry data. The setting can be overridden with `proto`.
50+
- `encoding`[default: none]: if specified, uses an encoding extension to encode telemetry data. Overrides `format`.
5051
- `append`[default: `false`] defines whether append to the file (`true`) or truncate (`false`). If `append: true` is set then setting `rotation` or `compression` is currently not supported.
5152
- `compression`[no default]: the compression algorithm used when exporting telemetry data to file. Supported compression algorithms:`zstd`
5253
- `flush_interval`[default: 1s]: `time.Duration` interval between flushes. See [time.ParseDuration](https://pkg.go.dev/time#ParseDuration) for valid formats.

exporter/fileexporter/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ type Config struct {
4040
// - proto: OTLP binary protobuf bytes.
4141
FormatType string `mapstructure:"format"`
4242

43+
// Encoding defines the encoding of the telemetry data.
44+
// If specified, it overrides `FormatType` and applies an encoding extension.
45+
Encoding *component.ID `mapstructure:"encoding"`
46+
4347
// Compression Codec used to export telemetry data
4448
// Supported compression algorithms:`zstd`
4549
Compression string `mapstructure:"compression"`
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package fileexporter
5+
6+
import (
7+
"context"
8+
"os"
9+
"path/filepath"
10+
"testing"
11+
"time"
12+
13+
"github.com/stretchr/testify/require"
14+
"go.opentelemetry.io/collector/component"
15+
"go.opentelemetry.io/collector/component/componenttest"
16+
"go.opentelemetry.io/collector/exporter/exportertest"
17+
"go.opentelemetry.io/collector/extension/extensiontest"
18+
"go.opentelemetry.io/collector/pdata/pcommon"
19+
"go.opentelemetry.io/collector/pdata/plog"
20+
"go.opentelemetry.io/collector/pdata/pmetric"
21+
"go.opentelemetry.io/collector/pdata/ptrace"
22+
23+
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension"
24+
)
25+
26+
type hostWithEncoding struct {
27+
encodings map[component.ID]component.Component
28+
}
29+
30+
func (h hostWithEncoding) GetFactory(_ component.Kind, _ component.Type) component.Factory {
31+
panic("unsupported")
32+
}
33+
34+
func (h hostWithEncoding) GetExtensions() map[component.ID]component.Component {
35+
return h.encodings
36+
}
37+
38+
func (h hostWithEncoding) GetExporters() map[component.DataType]map[component.ID]component.Component {
39+
panic("unsupported")
40+
}
41+
42+
func TestEncoding(t *testing.T) {
43+
f := NewFactory()
44+
cfg := f.CreateDefaultConfig().(*Config)
45+
cfg.Path = filepath.Join(t.TempDir(), "encoding.txt")
46+
id := component.MustNewID("otlpjson")
47+
cfg.Encoding = &id
48+
49+
ef := otlpencodingextension.NewFactory()
50+
efCfg := ef.CreateDefaultConfig().(*otlpencodingextension.Config)
51+
efCfg.Protocol = "otlp_json"
52+
ext, err := ef.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), efCfg)
53+
require.NoError(t, err)
54+
require.NoError(t, ext.Start(context.Background(), componenttest.NewNopHost()))
55+
56+
me, err := f.CreateMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg)
57+
require.NoError(t, err)
58+
te, err := f.CreateTracesExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg)
59+
require.NoError(t, err)
60+
le, err := f.CreateLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg)
61+
require.NoError(t, err)
62+
host := hostWithEncoding{
63+
map[component.ID]component.Component{id: ext},
64+
}
65+
require.NoError(t, me.Start(context.Background(), host))
66+
require.NoError(t, te.Start(context.Background(), host))
67+
require.NoError(t, le.Start(context.Background(), host))
68+
t.Cleanup(func() {
69+
70+
})
71+
72+
require.NoError(t, me.ConsumeMetrics(context.Background(), generateMetrics()))
73+
require.NoError(t, te.ConsumeTraces(context.Background(), generateTraces()))
74+
require.NoError(t, le.ConsumeLogs(context.Background(), generateLogs()))
75+
76+
require.NoError(t, me.Shutdown(context.Background()))
77+
require.NoError(t, te.Shutdown(context.Background()))
78+
require.NoError(t, le.Shutdown(context.Background()))
79+
80+
b, err := os.ReadFile(cfg.Path)
81+
require.NoError(t, err)
82+
require.Contains(t, string(b), `{"resourceMetrics":`)
83+
require.Contains(t, string(b), `{"resourceSpans":`)
84+
require.Contains(t, string(b), `{"resourceLogs":`)
85+
}
86+
87+
func generateLogs() plog.Logs {
88+
logs := plog.NewLogs()
89+
rl := logs.ResourceLogs().AppendEmpty()
90+
rl.Resource().Attributes().PutStr("resource", "R1")
91+
l := rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
92+
l.Body().SetStr("test log message")
93+
l.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
94+
return logs
95+
}
96+
97+
func generateMetrics() pmetric.Metrics {
98+
metrics := pmetric.NewMetrics()
99+
rm := metrics.ResourceMetrics().AppendEmpty()
100+
rm.Resource().Attributes().PutStr("resource", "R1")
101+
m := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
102+
m.SetName("test_metric")
103+
dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
104+
dp.Attributes().PutStr("test_attr", "value_1")
105+
dp.SetIntValue(123)
106+
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
107+
return metrics
108+
}
109+
110+
func generateTraces() ptrace.Traces {
111+
traces := ptrace.NewTraces()
112+
rs := traces.ResourceSpans().AppendEmpty()
113+
rs.Resource().Attributes().PutStr("resource", "R1")
114+
span := rs.ScopeSpans().AppendEmpty().Spans().AppendEmpty()
115+
span.Attributes().PutStr("test_attr", "value_1")
116+
span.SetName("test_span")
117+
span.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(-1 * time.Second)))
118+
span.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now()))
119+
return traces
120+
}

exporter/fileexporter/file_exporter.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,14 @@ func (e *fileExporter) consumeLogs(_ context.Context, ld plog.Logs) error {
4444
}
4545

4646
// Start starts the flush timer if set.
47-
func (e *fileExporter) Start(_ context.Context, _ component.Host) error {
48-
e.marshaller = newMarshaller(e.conf)
47+
func (e *fileExporter) Start(_ context.Context, host component.Host) error {
48+
var err error
49+
e.marshaller, err = newMarshaller(e.conf, host)
50+
if err != nil {
51+
return err
52+
}
4953
export := buildExportFunc(e.conf)
5054

51-
var err error
5255
e.writer, err = newFileWriter(e.conf.Path, e.conf.Append, e.conf.Rotation, e.conf.FlushInterval, export)
5356
if err != nil {
5457
return err

exporter/fileexporter/go.mod

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@ go 1.21
55
require (
66
github.com/hashicorp/golang-lru/v2 v2.0.7
77
github.com/klauspost/compress v1.17.7
8+
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension v0.96.0
89
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.96.0
910
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.96.0
1011
github.com/stretchr/testify v1.9.0
1112
go.opentelemetry.io/collector/component v0.96.1-0.20240315172937-3b5aee0c7a16
1213
go.opentelemetry.io/collector/confmap v0.96.1-0.20240315172937-3b5aee0c7a16
1314
go.opentelemetry.io/collector/consumer v0.96.1-0.20240315172937-3b5aee0c7a16
1415
go.opentelemetry.io/collector/exporter v0.96.1-0.20240315172937-3b5aee0c7a16
16+
go.opentelemetry.io/collector/extension v0.96.1-0.20240315172937-3b5aee0c7a16
1517
go.opentelemetry.io/collector/pdata v1.3.1-0.20240315172937-3b5aee0c7a16
1618
go.opentelemetry.io/otel/metric v1.24.0
1719
go.opentelemetry.io/otel/trace v1.24.0
@@ -24,7 +26,7 @@ require (
2426
github.com/beorn7/perks v1.0.1 // indirect
2527
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
2628
github.com/cespare/xxhash/v2 v2.2.0 // indirect
27-
github.com/davecgh/go-spew v1.1.1 // indirect
29+
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
2830
github.com/go-logr/logr v1.4.1 // indirect
2931
github.com/go-logr/stdr v1.2.2 // indirect
3032
github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect
@@ -39,15 +41,15 @@ require (
3941
github.com/mitchellh/reflectwalk v1.0.2 // indirect
4042
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
4143
github.com/modern-go/reflect2 v1.0.2 // indirect
42-
github.com/pmezard/go-difflib v1.0.0 // indirect
44+
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.96.0 // indirect
45+
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
4346
github.com/prometheus/client_golang v1.19.0 // indirect
4447
github.com/prometheus/client_model v0.6.0 // indirect
4548
github.com/prometheus/common v0.48.0 // indirect
4649
github.com/prometheus/procfs v0.12.0 // indirect
4750
go.opentelemetry.io/collector v0.96.1-0.20240315172937-3b5aee0c7a16 // indirect
48-
go.opentelemetry.io/collector/config/configretry v0.96.1-0.20240315172937-3b5aee0c7a16 // indirect
51+
go.opentelemetry.io/collector/config/configretry v0.96.1-0.20240306115632-b2693620eff6 // indirect
4952
go.opentelemetry.io/collector/config/configtelemetry v0.96.1-0.20240315172937-3b5aee0c7a16 // indirect
50-
go.opentelemetry.io/collector/extension v0.96.1-0.20240315172937-3b5aee0c7a16 // indirect
5153
go.opentelemetry.io/collector/receiver v0.96.1-0.20240315172937-3b5aee0c7a16 // indirect
5254
go.opentelemetry.io/otel v1.24.0 // indirect
5355
go.opentelemetry.io/otel/exporters/prometheus v0.46.0 // indirect
@@ -78,3 +80,7 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil
7880
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest
7981

8082
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden
83+
84+
replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension => ../../extension/encoding/otlpencodingextension
85+
86+
replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ../../extension/encoding

exporter/fileexporter/go.sum

Lines changed: 6 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

exporter/fileexporter/grouping_file_exporter.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,8 +244,12 @@ func group[T any](e *groupingFileExporter, groups map[string][]T, resource pcomm
244244
}
245245

246246
// Start initializes and starts the exporter.
247-
func (e *groupingFileExporter) Start(context.Context, component.Host) error {
248-
e.marshaller = newMarshaller(e.conf)
247+
func (e *groupingFileExporter) Start(_ context.Context, host component.Host) error {
248+
var err error
249+
e.marshaller, err = newMarshaller(e.conf, host)
250+
if err != nil {
251+
return err
252+
}
249253
export := buildExportFunc(e.conf)
250254

251255
pathParts := strings.Split(e.conf.Path, "*")

exporter/fileexporter/marshaller.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44
package fileexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter"
55

66
import (
7+
"errors"
8+
"fmt"
9+
10+
"go.opentelemetry.io/collector/component"
711
"go.opentelemetry.io/collector/pdata/plog"
812
"go.opentelemetry.io/collector/pdata/pmetric"
913
"go.opentelemetry.io/collector/pdata/ptrace"
@@ -34,18 +38,38 @@ type marshaller struct {
3438
formatType string
3539
}
3640

37-
func newMarshaller(conf *Config) *marshaller {
41+
func newMarshaller(conf *Config, host component.Host) (*marshaller, error) {
42+
if conf.Encoding != nil {
43+
encoding := host.GetExtensions()[*conf.Encoding]
44+
if encoding == nil {
45+
return nil, fmt.Errorf("unknown encoding %q", conf.Encoding)
46+
}
47+
// cast with ok to avoid panics.
48+
tm, _ := encoding.(ptrace.Marshaler)
49+
pm, _ := encoding.(pmetric.Marshaler)
50+
lm, _ := encoding.(plog.Marshaler)
51+
return &marshaller{
52+
tracesMarshaler: tm,
53+
metricsMarshaler: pm,
54+
logsMarshaler: lm,
55+
compression: conf.Compression,
56+
compressor: buildCompressor(conf.Compression),
57+
}, nil
58+
}
3859
return &marshaller{
3960
formatType: conf.FormatType,
4061
tracesMarshaler: tracesMarshalers[conf.FormatType],
4162
metricsMarshaler: metricsMarshalers[conf.FormatType],
4263
logsMarshaler: logsMarshalers[conf.FormatType],
4364
compression: conf.Compression,
4465
compressor: buildCompressor(conf.Compression),
45-
}
66+
}, nil
4667
}
4768

4869
func (m *marshaller) marshalTraces(td ptrace.Traces) ([]byte, error) {
70+
if m.metricsMarshaler == nil {
71+
return nil, errors.New("traces are not supported by encoding")
72+
}
4973
buf, err := m.tracesMarshaler.MarshalTraces(td)
5074
if err != nil {
5175
return nil, err
@@ -55,6 +79,9 @@ func (m *marshaller) marshalTraces(td ptrace.Traces) ([]byte, error) {
5579
}
5680

5781
func (m *marshaller) marshalMetrics(md pmetric.Metrics) ([]byte, error) {
82+
if m.metricsMarshaler == nil {
83+
return nil, errors.New("metrics are not supported by encoding")
84+
}
5885
buf, err := m.metricsMarshaler.MarshalMetrics(md)
5986
if err != nil {
6087
return nil, err
@@ -64,6 +91,9 @@ func (m *marshaller) marshalMetrics(md pmetric.Metrics) ([]byte, error) {
6491
}
6592

6693
func (m *marshaller) marshalLogs(ld plog.Logs) ([]byte, error) {
94+
if m.metricsMarshaler == nil {
95+
return nil, errors.New("logs are not supported by encoding")
96+
}
6797
buf, err := m.logsMarshaler.MarshalLogs(ld)
6898
if err != nil {
6999
return nil, err

0 commit comments

Comments
 (0)