Skip to content

Commit df79d6d

Browse files
RabbitMQ exporter implementation (#32051)
**Description:** This is the completed implementation of the rabbitmq exporter. **Link to tracking Issue:** #28891 **Testing:** - Unit tests - Happy path with rabbitmq running locally and in the cloud, testing different configuration options - Error cases - Fail to connect during start-up - Invalid credentials - Connection lost midway through publishing to the queue. The component attempts reconnecting on the next publish attempt - Concurrent publishing, both with and without connection issues **Documentation:** Updated README with more configuration options --------- Co-authored-by: Andrzej Stencel <[email protected]>
1 parent 52cc8e8 commit df79d6d

16 files changed

+1323
-61
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: rabbitmqexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Implements the RabbitMQ exporter
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [28891]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/rabbitmqexporter/README.md

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@
1010
[development]: https://github.com/open-telemetry/opentelemetry-collector#development
1111
<!-- end autogenerated section -->
1212

13-
Exports metrics, traces, and logs to [RabbitMQ](https://www.rabbitmq.com/) using the AMQP 0.9.1 protocol
13+
Exports metrics, traces, and logs to [RabbitMQ](https://www.rabbitmq.com/) using the AMQP 0.9.1 protocol.
14+
15+
Messages are published to the [default exchange](https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-default) direct exchange, but optionally can be published to a different direct exchange.
16+
17+
This component expects that exchanges, queues, and bindings already exist - they are not currently created by this component.
1418

1519
## Getting Started
1620

@@ -19,16 +23,15 @@ The following settings can be configured:
1923
- `endpoint` (required, ex = amqp://localhost:5672): Endpoint to connect to RabbitMQ
2024
- `vhost` (optional): The RabbitMQ [virtual host](https://www.rabbitmq.com/docs/vhosts) to connect to
2125
- `auth`:
22-
- `sasl`: Configuration if using SASL PLAIN authentication
26+
- `plain`: Configuration if using SASL PLAIN authentication
2327
- `username` (required): username for authentication
24-
- `password` (required): password for authentication
25-
- `tls` (optional): TODO, need to add this
28+
- `password`: password for authentication
29+
- `tls` (optional): [TLS configuration](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/configtls.go#L32)
2630
- `routing`:
2731
- `routing_key` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): Routing key used to route exported messages to RabbitMQ consumers
32+
- `exchange`: Name of the exchange used to route messages. If omitted, the [default exchange](https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-default) is used which routes to a queue with the same as the routing key. Only [direct exchanges](https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-direct) are currently supported. Note that this component does not handle queue creation or binding.
2833
- `durable` (default = true): Whether to instruct RabbitMQ to make messages [durable](https://www.rabbitmq.com/docs/queues#durability) by writing to disk
29-
- `message_body_encoding`: (default = "otlp_proto"): The encoding of telemetry sent to RabbitMQ
30-
- `otlp_proto`: payload is Protobuf serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
31-
- `otlp_json`: ** EXPERIMENTAL ** payload is JSON serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
34+
- `encoding_extension`: (defaults to OTLP protobuf format): ID of the [encoding extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/encoding) to use to marshal data
3235
- `retry_on_failure`:
3336
- `enabled` (default = false)
3437

@@ -40,7 +43,12 @@ exporters:
4043
connection:
4144
endpoint: amqp://localhost:5672
4245
auth:
43-
sasl:
46+
plain:
4447
username: user
4548
password: pass
49+
encoding_extension: otlp_encoding/rabbitmq
50+
51+
extensions:
52+
otlp_encoding/rabbitmq:
53+
protocol: otlp_json
4654
```

exporter/rabbitmqexporter/config.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,33 +4,42 @@
44
package rabbitmqexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter"
55

66
import (
7+
"errors"
8+
"time"
9+
710
"go.opentelemetry.io/collector/component"
811
"go.opentelemetry.io/collector/config/configretry"
12+
"go.opentelemetry.io/collector/config/configtls"
913
)
1014

1115
type Config struct {
1216
Connection ConnectionConfig `mapstructure:"connection"`
1317
Routing RoutingConfig `mapstructure:"routing"`
14-
MessageBodyEncoding string `mapstructure:"message_body_encoding"`
18+
EncodingExtensionID *component.ID `mapstructure:"encoding_extension"`
1519
Durable bool `mapstructure:"durable"`
1620
RetrySettings configretry.BackOffConfig `mapstructure:"retry_on_failure"`
1721
}
1822

1923
type ConnectionConfig struct {
20-
Endpoint string `mapstructure:"endpoint"`
21-
VHost string `mapstructure:"vhost"`
22-
Auth AuthConfig `mapstructure:"auth"`
24+
Endpoint string `mapstructure:"endpoint"`
25+
VHost string `mapstructure:"vhost"`
26+
TLSConfig *configtls.ClientConfig `mapstructure:"tls"`
27+
Auth AuthConfig `mapstructure:"auth"`
28+
ConnectionTimeout time.Duration `mapstructure:"connection_timeout"`
29+
Heartbeat time.Duration `mapstructure:"heartbeat"`
30+
PublishConfirmationTimeout time.Duration `mapstructure:"publish_confirmation_timeout"`
2331
}
2432

2533
type RoutingConfig struct {
34+
Exchange string `mapstructure:"exchange"`
2635
RoutingKey string `mapstructure:"routing_key"`
2736
}
2837

2938
type AuthConfig struct {
30-
SASL SASLConfig `mapstructure:"sasl"`
39+
Plain PlainAuth `mapstructure:"plain"`
3140
}
3241

33-
type SASLConfig struct {
42+
type PlainAuth struct {
3443
Username string `mapstructure:"username"`
3544
Password string `mapstructure:"password"`
3645
}
@@ -39,5 +48,14 @@ var _ component.Config = (*Config)(nil)
3948

4049
// Validate checks if the exporter configuration is valid
4150
func (cfg *Config) Validate() error {
51+
if cfg.Connection.Endpoint == "" {
52+
return errors.New("connection.endpoint is required")
53+
}
54+
55+
// Password-less users are possible so only validate username
56+
if cfg.Connection.Auth.Plain.Username == "" {
57+
return errors.New("connection.auth.plain.username is required")
58+
}
59+
4260
return nil
4361
}

exporter/rabbitmqexporter/config_test.go

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,49 +4,69 @@
44
package rabbitmqexporter
55

66
import (
7+
"errors"
78
"path/filepath"
89
"testing"
10+
"time"
911

1012
"github.com/stretchr/testify/assert"
1113
"github.com/stretchr/testify/require"
1214
"go.opentelemetry.io/collector/component"
1315
"go.opentelemetry.io/collector/config/configretry"
16+
"go.opentelemetry.io/collector/config/configtls"
1417
"go.opentelemetry.io/collector/confmap/confmaptest"
1518

1619
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/metadata"
1720
)
1821

22+
var encodingComponentID = component.NewIDWithName(component.MustNewType("otlp_encoding"), "rabbitmq123")
23+
1924
func TestLoadConfig(t *testing.T) {
2025
t.Parallel()
2126

2227
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "test-config.yaml"))
2328
require.NoError(t, err)
2429

2530
tests := []struct {
26-
id component.ID
27-
expected component.Config
31+
id component.ID
32+
expected component.Config
33+
errorMessage string
2834
}{
2935
{
30-
id: component.NewIDWithName(metadata.Type, ""),
31-
expected: createDefaultConfig().(*Config),
36+
id: component.NewIDWithName(metadata.Type, "missing_endpoint"),
37+
errorMessage: "connection.endpoint is required",
38+
},
39+
{
40+
id: component.NewIDWithName(metadata.Type, "missing_plainauth_username"),
41+
errorMessage: "connection.auth.plain.username is required",
3242
},
3343
{
3444
id: component.NewIDWithName(metadata.Type, "all_fields"),
3545
expected: &Config{
3646
Connection: ConnectionConfig{
37-
Endpoint: "amqp://localhost:5672",
47+
Endpoint: "amqps://localhost:5672",
3848
VHost: "vhost1",
3949
Auth: AuthConfig{
40-
SASL: SASLConfig{
50+
Plain: PlainAuth{
4151
Username: "user",
4252
Password: "pass",
4353
},
4454
},
55+
TLSConfig: &configtls.ClientConfig{
56+
Config: configtls.Config{
57+
CAFile: "cert123",
58+
},
59+
Insecure: true,
60+
},
61+
ConnectionTimeout: time.Millisecond,
62+
Heartbeat: time.Millisecond * 2,
63+
PublishConfirmationTimeout: time.Millisecond * 3,
4564
},
4665
Routing: RoutingConfig{
66+
Exchange: "amq.direct",
4767
RoutingKey: "custom_routing_key",
4868
},
49-
MessageBodyEncoding: "otlp_json",
69+
EncodingExtensionID: &encodingComponentID,
5070
Durable: false,
5171
RetrySettings: configretry.BackOffConfig{
5272
Enabled: true,
@@ -60,14 +80,16 @@ func TestLoadConfig(t *testing.T) {
6080
Endpoint: "amqp://localhost:5672",
6181
VHost: "",
6282
Auth: AuthConfig{
63-
SASL: SASLConfig{
83+
Plain: PlainAuth{
6484
Username: "user",
6585
Password: "pass",
6686
},
6787
},
88+
ConnectionTimeout: defaultConnectionTimeout,
89+
Heartbeat: defaultConnectionHeartbeat,
90+
PublishConfirmationTimeout: defaultPublishConfirmationTimeout,
6891
},
69-
MessageBodyEncoding: "otlp_proto",
70-
Durable: true,
92+
Durable: true,
7193
RetrySettings: configretry.BackOffConfig{
7294
Enabled: false,
7395
},
@@ -84,6 +106,12 @@ func TestLoadConfig(t *testing.T) {
84106
require.NoError(t, err)
85107
require.NoError(t, component.UnmarshalConfig(sub, cfg))
86108

109+
if tt.expected == nil {
110+
err = errors.Join(err, component.ValidateConfig(cfg))
111+
assert.ErrorContains(t, err, tt.errorMessage)
112+
return
113+
}
114+
87115
assert.NoError(t, component.ValidateConfig(cfg))
88116
assert.Equal(t, tt.expected, cfg)
89117
})

exporter/rabbitmqexporter/factory.go

Lines changed: 56 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ package rabbitmqexporter // import "github.com/open-telemetry/opentelemetry-coll
55

66
import (
77
"context"
8+
"crypto/tls"
9+
"time"
810

911
"go.opentelemetry.io/collector/component"
1012
"go.opentelemetry.io/collector/config/configretry"
@@ -13,10 +15,21 @@ import (
1315
"go.opentelemetry.io/collector/exporter/exporterhelper"
1416

1517
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/metadata"
18+
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/publisher"
1619
)
1720

1821
const (
19-
defaultEncoding = "otlp_proto"
22+
defaultConnectionTimeout = time.Second * 10
23+
defaultConnectionHeartbeat = time.Second * 5
24+
defaultPublishConfirmationTimeout = time.Second * 5
25+
26+
spansRoutingKey = "otlp_spans"
27+
metricsRoutingKey = "otlp_metrics"
28+
logsRoutingKey = "otlp_logs"
29+
30+
spansConnectionName = "otel-collector-spans"
31+
metricsConnectionName = "otel-collector-metrics"
32+
logsConnectionName = "otel-collector-logs"
2033
)
2134

2235
func NewFactory() exporter.Factory {
@@ -34,9 +47,13 @@ func createDefaultConfig() component.Config {
3447
Enabled: false,
3548
}
3649
return &Config{
37-
MessageBodyEncoding: defaultEncoding,
38-
Durable: true,
39-
RetrySettings: retrySettings,
50+
Durable: true,
51+
RetrySettings: retrySettings,
52+
Connection: ConnectionConfig{
53+
ConnectionTimeout: defaultConnectionTimeout,
54+
Heartbeat: defaultConnectionHeartbeat,
55+
PublishConfirmationTimeout: defaultPublishConfirmationTimeout,
56+
},
4057
}
4158
}
4259

@@ -46,13 +63,15 @@ func createTracesExporter(
4663
cfg component.Config,
4764
) (exporter.Traces, error) {
4865
config := cfg.(*Config)
49-
r := newRabbitmqExporter(config, set.TelemetrySettings)
66+
67+
routingKey := getRoutingKeyOrDefault(config, spansRoutingKey)
68+
r := newRabbitmqExporter(config, set.TelemetrySettings, newPublisherFactory(set), newTLSFactory(config), routingKey, spansConnectionName)
5069

5170
return exporterhelper.NewTracesExporter(
5271
ctx,
5372
set,
5473
cfg,
55-
r.pushTraces,
74+
r.publishTraces,
5675
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
5776
exporterhelper.WithStart(r.start),
5877
exporterhelper.WithShutdown(r.shutdown),
@@ -66,13 +85,15 @@ func createMetricsExporter(
6685
cfg component.Config,
6786
) (exporter.Metrics, error) {
6887
config := (cfg.(*Config))
69-
r := newRabbitmqExporter(config, set.TelemetrySettings)
88+
89+
routingKey := getRoutingKeyOrDefault(config, metricsRoutingKey)
90+
r := newRabbitmqExporter(config, set.TelemetrySettings, newPublisherFactory(set), newTLSFactory(config), routingKey, metricsConnectionName)
7091

7192
return exporterhelper.NewMetricsExporter(
7293
ctx,
7394
set,
7495
cfg,
75-
r.pushMetrics,
96+
r.publishMetrics,
7697
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
7798
exporterhelper.WithStart(r.start),
7899
exporterhelper.WithShutdown(r.shutdown),
@@ -86,16 +107,41 @@ func createLogsExporter(
86107
cfg component.Config,
87108
) (exporter.Logs, error) {
88109
config := (cfg.(*Config))
89-
r := newRabbitmqExporter(config, set.TelemetrySettings)
110+
111+
routingKey := getRoutingKeyOrDefault(config, logsRoutingKey)
112+
r := newRabbitmqExporter(config, set.TelemetrySettings, newPublisherFactory(set), newTLSFactory(config), routingKey, logsConnectionName)
90113

91114
return exporterhelper.NewLogsExporter(
92115
ctx,
93116
set,
94117
cfg,
95-
r.pushLogs,
118+
r.publishLogs,
96119
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
97120
exporterhelper.WithStart(r.start),
98121
exporterhelper.WithShutdown(r.shutdown),
99122
exporterhelper.WithRetry(config.RetrySettings),
100123
)
101124
}
125+
126+
func getRoutingKeyOrDefault(config *Config, fallback string) string {
127+
routingKey := fallback
128+
if config.Routing.RoutingKey != "" {
129+
routingKey = config.Routing.RoutingKey
130+
}
131+
return routingKey
132+
}
133+
134+
func newPublisherFactory(set exporter.CreateSettings) publisherFactory {
135+
return func(dialConfig publisher.DialConfig) (publisher.Publisher, error) {
136+
return publisher.NewConnection(set.Logger, publisher.NewAmqpClient(), dialConfig)
137+
}
138+
}
139+
140+
func newTLSFactory(config *Config) tlsFactory {
141+
if config.Connection.TLSConfig != nil {
142+
return config.Connection.TLSConfig.LoadTLSConfig
143+
}
144+
return func(context.Context) (*tls.Config, error) {
145+
return nil, nil
146+
}
147+
}

0 commit comments

Comments
 (0)