Skip to content

Commit d2fd181

Browse files
author
Jacob Marble
authored
[influxdb] update InfluxDB receiver and exporter (#18080)
Update InfluxDB dependencies (influxdbreceiver and influxdbexporter). Also update influxexporter to adopt to breaking API changes in the github.com/influxdata/influxdb-observability line protocol converter and writer.
1 parent fb9baa0 commit d2fd181

File tree

16 files changed

+223
-155
lines changed

16 files changed

+223
-155
lines changed
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: enhancement
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
5+
component: influxdbexporter
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: update influxdb-observability and influxdbexporter, to better utilize InfluxDB/IOx
9+
10+
# One or more tracking issues related to the change
11+
issues: [18080]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext:
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: enhancement
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
5+
component: influxdbreceiver
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: routine update to dependency influxdb-observability for influxdbreceiver
9+
10+
# One or more tracking issues related to the change
11+
issues: [18080]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext:

cmd/configschema/go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -274,9 +274,9 @@ require (
274274
github.com/imdario/mergo v0.3.13 // indirect
275275
github.com/inconshreveable/mousetrap v1.0.1 // indirect
276276
github.com/influxdata/go-syslog/v3 v3.0.1-0.20210608084020-ac565dc76ba6 // indirect
277-
github.com/influxdata/influxdb-observability/common v0.2.35 // indirect
278-
github.com/influxdata/influxdb-observability/influx2otel v0.2.35 // indirect
279-
github.com/influxdata/influxdb-observability/otel2influx v0.2.35 // indirect
277+
github.com/influxdata/influxdb-observability/common v0.3.0 // indirect
278+
github.com/influxdata/influxdb-observability/influx2otel v0.3.0 // indirect
279+
github.com/influxdata/influxdb-observability/otel2influx v0.3.0 // indirect
280280
github.com/influxdata/line-protocol/v2 v2.2.1 // indirect
281281
github.com/influxdata/telegraf v1.16.3 // indirect
282282
github.com/ionos-cloud/sdk-go/v6 v6.1.3 // indirect

cmd/configschema/go.sum

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/otelcontribcol/go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -386,9 +386,9 @@ require (
386386
github.com/imdario/mergo v0.3.12 // indirect
387387
github.com/inconshreveable/mousetrap v1.0.1 // indirect
388388
github.com/influxdata/go-syslog/v3 v3.0.1-0.20210608084020-ac565dc76ba6 // indirect
389-
github.com/influxdata/influxdb-observability/common v0.2.35 // indirect
390-
github.com/influxdata/influxdb-observability/influx2otel v0.2.35 // indirect
391-
github.com/influxdata/influxdb-observability/otel2influx v0.2.35 // indirect
389+
github.com/influxdata/influxdb-observability/common v0.3.0 // indirect
390+
github.com/influxdata/influxdb-observability/influx2otel v0.3.0 // indirect
391+
github.com/influxdata/influxdb-observability/otel2influx v0.3.0 // indirect
392392
github.com/influxdata/line-protocol/v2 v2.2.1 // indirect
393393
github.com/ionos-cloud/sdk-go/v6 v6.1.3 // indirect
394394
github.com/jaegertracing/jaeger v1.41.0 // indirect

cmd/otelcontribcol/go.sum

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

exporter/influxdbexporter/exporter.go

Lines changed: 50 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -26,142 +26,126 @@ import (
2626
"go.opentelemetry.io/collector/pdata/plog"
2727
"go.opentelemetry.io/collector/pdata/pmetric"
2828
"go.opentelemetry.io/collector/pdata/ptrace"
29+
"go.uber.org/multierr"
2930
)
3031

3132
type tracesExporter struct {
3233
logger common.Logger
33-
cfg *Config
3434
writer *influxHTTPWriter
3535
converter *otel2influx.OtelTracesToLineProtocol
36-
settings component.TelemetrySettings
3736
}
3837

39-
func newTracesExporter(config *Config, params exporter.CreateSettings) *tracesExporter {
40-
logger := newZapInfluxLogger(params.Logger)
41-
converter := otel2influx.NewOtelTracesToLineProtocol(logger)
38+
func newTracesExporter(config *Config, settings exporter.CreateSettings) (*tracesExporter, error) {
39+
logger := newZapInfluxLogger(settings.Logger)
40+
41+
writer, err := newInfluxHTTPWriter(logger, config, settings.TelemetrySettings)
42+
if err != nil {
43+
return nil, err
44+
}
45+
46+
converter, err := otel2influx.NewOtelTracesToLineProtocol(logger, writer)
47+
if err != nil {
48+
return nil, err
49+
}
4250

4351
return &tracesExporter{
4452
logger: logger,
45-
cfg: config,
53+
writer: writer,
4654
converter: converter,
47-
settings: params.TelemetrySettings,
48-
}
55+
}, nil
4956
}
5057

5158
func (e *tracesExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {
52-
batch := e.writer.newBatch()
53-
54-
err := e.converter.WriteTraces(ctx, td, batch)
59+
err := e.converter.WriteTraces(ctx, td)
5560
if err != nil {
5661
return consumererror.NewPermanent(err)
5762
}
58-
return batch.flushAndClose(ctx)
63+
return nil
5964
}
6065

61-
// start starts the traces exporter
62-
func (e *tracesExporter) start(_ context.Context, host component.Host) (err error) {
63-
64-
writer, err := newInfluxHTTPWriter(e.logger, e.cfg, host, e.settings)
65-
if err != nil {
66-
return err
67-
}
68-
e.writer = writer
66+
func (e *tracesExporter) Start(ctx context.Context, host component.Host) error {
67+
e.logger.Debug("starting traces exporter")
68+
return multierr.Combine(
69+
e.writer.Start(ctx, host),
70+
e.converter.Start(ctx, host))
71+
}
6972

70-
return nil
73+
func (e *tracesExporter) Shutdown(ctx context.Context) error {
74+
return e.converter.Shutdown(ctx)
7175
}
7276

7377
type metricsExporter struct {
7478
logger common.Logger
75-
cfg *Config
7679
writer *influxHTTPWriter
7780
converter *otel2influx.OtelMetricsToLineProtocol
78-
settings component.TelemetrySettings
79-
}
80-
81-
var metricsSchemata = map[string]common.MetricsSchema{
82-
"telegraf-prometheus-v1": common.MetricsSchemaTelegrafPrometheusV1,
83-
"telegraf-prometheus-v2": common.MetricsSchemaTelegrafPrometheusV2,
8481
}
8582

8683
func newMetricsExporter(config *Config, params exporter.CreateSettings) (*metricsExporter, error) {
8784
logger := newZapInfluxLogger(params.Logger)
88-
schema, found := metricsSchemata[config.MetricsSchema]
85+
schema, found := common.MetricsSchemata[config.MetricsSchema]
8986
if !found {
9087
return nil, fmt.Errorf("schema '%s' not recognized", config.MetricsSchema)
9188
}
9289

93-
converter, err := otel2influx.NewOtelMetricsToLineProtocol(logger, schema)
90+
writer, err := newInfluxHTTPWriter(logger, config, params.TelemetrySettings)
91+
if err != nil {
92+
return nil, err
93+
}
94+
95+
converter, err := otel2influx.NewOtelMetricsToLineProtocol(logger, writer, schema)
9496
if err != nil {
9597
return nil, err
9698
}
9799

98100
return &metricsExporter{
99101
logger: logger,
100-
cfg: config,
102+
writer: writer,
101103
converter: converter,
102-
settings: params.TelemetrySettings,
103104
}, nil
104105
}
105106

106107
func (e *metricsExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error {
107-
batch := e.writer.newBatch()
108-
109-
err := e.converter.WriteMetrics(ctx, md, batch)
108+
err := e.converter.WriteMetrics(ctx, md)
110109
if err != nil {
111110
return consumererror.NewPermanent(err)
112111
}
113-
return batch.flushAndClose(ctx)
112+
return nil
114113
}
115114

116-
// start starts the metrics exporter
117-
func (e *metricsExporter) start(_ context.Context, host component.Host) (err error) {
118-
119-
writer, err := newInfluxHTTPWriter(e.logger, e.cfg, host, e.settings)
120-
if err != nil {
121-
return err
122-
}
123-
e.writer = writer
124-
125-
return nil
115+
func (e *metricsExporter) Start(ctx context.Context, host component.Host) error {
116+
return e.writer.Start(ctx, host)
126117
}
127118

128119
type logsExporter struct {
129120
logger common.Logger
130-
cfg *Config
131121
writer *influxHTTPWriter
132122
converter *otel2influx.OtelLogsToLineProtocol
133-
settings component.TelemetrySettings
134123
}
135124

136-
func newLogsExporter(config *Config, params exporter.CreateSettings) *logsExporter {
125+
func newLogsExporter(config *Config, params exporter.CreateSettings) (*logsExporter, error) {
137126
logger := newZapInfluxLogger(params.Logger)
138-
converter := otel2influx.NewOtelLogsToLineProtocol(logger)
127+
128+
writer, err := newInfluxHTTPWriter(logger, config, params.TelemetrySettings)
129+
if err != nil {
130+
return nil, err
131+
}
132+
converter := otel2influx.NewOtelLogsToLineProtocol(logger, writer)
139133

140134
return &logsExporter{
141135
logger: logger,
136+
writer: writer,
142137
converter: converter,
143-
cfg: config,
144-
settings: params.TelemetrySettings,
145-
}
138+
}, nil
146139
}
147140

148141
func (e *logsExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
149-
batch := e.writer.newBatch()
150-
151-
err := e.converter.WriteLogs(ctx, ld, batch)
142+
err := e.converter.WriteLogs(ctx, ld)
152143
if err != nil {
153144
return consumererror.NewPermanent(err)
154145
}
155-
return batch.flushAndClose(ctx)
146+
return nil
156147
}
157148

158-
// start starts the logs exporter
159-
func (e *logsExporter) start(_ context.Context, host component.Host) (err error) {
160-
writer, err := newInfluxHTTPWriter(e.logger, e.cfg, host, e.settings)
161-
if err != nil {
162-
return err
163-
}
164-
e.writer = writer
165-
166-
return nil
149+
func (e *logsExporter) Start(ctx context.Context, host component.Host) error {
150+
return e.writer.Start(ctx, host)
167151
}

exporter/influxdbexporter/factory.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"time"
2020

21+
"github.com/influxdata/influxdb-observability/common"
2122
"go.opentelemetry.io/collector/component"
2223
"go.opentelemetry.io/collector/config/confighttp"
2324
"go.opentelemetry.io/collector/config/configopaque"
@@ -39,7 +40,10 @@ func NewFactory() exporter.Factory {
3940
func createTraceExporter(ctx context.Context, set exporter.CreateSettings, config component.Config) (exporter.Traces, error) {
4041
cfg := config.(*Config)
4142

42-
exporter := newTracesExporter(cfg, set)
43+
exporter, err := newTracesExporter(cfg, set)
44+
if err != nil {
45+
return nil, err
46+
}
4347

4448
return exporterhelper.NewTracesExporter(
4549
ctx,
@@ -48,7 +52,8 @@ func createTraceExporter(ctx context.Context, set exporter.CreateSettings, confi
4852
exporter.pushTraces,
4953
exporterhelper.WithQueue(cfg.QueueSettings),
5054
exporterhelper.WithRetry(cfg.RetrySettings),
51-
exporterhelper.WithStart(exporter.start),
55+
exporterhelper.WithStart(exporter.Start),
56+
exporterhelper.WithShutdown(exporter.Shutdown),
5257
)
5358
}
5459

@@ -67,14 +72,17 @@ func createMetricsExporter(ctx context.Context, set exporter.CreateSettings, con
6772
exporter.pushMetrics,
6873
exporterhelper.WithQueue(cfg.QueueSettings),
6974
exporterhelper.WithRetry(cfg.RetrySettings),
70-
exporterhelper.WithStart(exporter.start),
75+
exporterhelper.WithStart(exporter.Start),
7176
)
7277
}
7378

7479
func createLogsExporter(ctx context.Context, set exporter.CreateSettings, config component.Config) (exporter.Logs, error) {
7580
cfg := config.(*Config)
7681

77-
exporter := newLogsExporter(cfg, set)
82+
exporter, err := newLogsExporter(cfg, set)
83+
if err != nil {
84+
return nil, err
85+
}
7886

7987
return exporterhelper.NewLogsExporter(
8088
ctx,
@@ -83,7 +91,7 @@ func createLogsExporter(ctx context.Context, set exporter.CreateSettings, config
8391
exporter.pushLogs,
8492
exporterhelper.WithQueue(cfg.QueueSettings),
8593
exporterhelper.WithRetry(cfg.RetrySettings),
86-
exporterhelper.WithStart(exporter.start),
94+
exporterhelper.WithStart(exporter.Start),
8795
)
8896
}
8997

@@ -97,6 +105,6 @@ func createDefaultConfig() component.Config {
97105
},
98106
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
99107
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
100-
MetricsSchema: "telegraf-prometheus-v1",
108+
MetricsSchema: common.MetricsSchemaTelegrafPrometheusV1.String(),
101109
}
102110
}

0 commit comments

Comments
 (0)