Skip to content

Commit 26ff342

Browse files
kafka exporter: seperate message conversion for otlp and jaeger (#3166)
* kafka exporter: seperate message conversion for otlp and jaeger * fix test * fix format * Revert "fix format" This reverts commit f02e8fb. * Revert "fix test" This reverts commit 23230cc. * Revert "kafka exporter: seperate message conversion for otlp and jaeger" This reverts commit 4184718. * convert traces/metrics/logs to sarama.ProducerMessage directly * fix linting error (impi) and changed CHANGELOG.md Co-authored-by: Bogdan Drutu <[email protected]>
1 parent 8391c1e commit 26ff342

File tree

9 files changed

+70
-54
lines changed

9 files changed

+70
-54
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
## 🛑 Breaking changes 🛑
88

9+
- Change `Marshal` signatures in kafkaexporter's Marshalers to directly convert pdata to `sarama.ProducerMessage` (#3162)
910
- Remove `tracetranslator.DetermineValueType`, only used internally by Zipkin (#3114)
1011
- Remove OpenCensus conventions, should not be used (#3113)
1112
- Remove Zipkin specific translation constants, move to internal (#3112)

exporter/kafkaexporter/factory_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"testing"
2020

21+
"github.com/Shopify/sarama"
2122
"github.com/stretchr/testify/assert"
2223
"github.com/stretchr/testify/require"
2324
"go.uber.org/zap"
@@ -128,7 +129,7 @@ type customMarshaler struct {
128129

129130
var _ TracesMarshaler = (*customMarshaler)(nil)
130131

131-
func (c customMarshaler) Marshal(_ pdata.Traces) ([]Message, error) {
132+
func (c customMarshaler) Marshal(_ pdata.Traces, topic string) ([]*sarama.ProducerMessage, error) {
132133
panic("implement me")
133134
}
134135

exporter/kafkaexporter/jaeger_marshaler.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package kafkaexporter
1717
import (
1818
"bytes"
1919

20+
"github.com/Shopify/sarama"
2021
"github.com/gogo/protobuf/jsonpb"
2122
jaegerproto "github.com/jaegertracing/jaeger/model"
2223

@@ -31,12 +32,13 @@ type jaegerMarshaler struct {
3132

3233
var _ TracesMarshaler = (*jaegerMarshaler)(nil)
3334

34-
func (j jaegerMarshaler) Marshal(traces pdata.Traces) ([]Message, error) {
35+
func (j jaegerMarshaler) Marshal(traces pdata.Traces, topic string) ([]*sarama.ProducerMessage, error) {
3536
batches, err := jaegertranslator.InternalTracesToJaegerProto(traces)
3637
if err != nil {
3738
return nil, err
3839
}
39-
var messages []Message
40+
var messages []*sarama.ProducerMessage
41+
4042
var errs []error
4143
for _, batch := range batches {
4244
for _, span := range batch.Spans {
@@ -48,7 +50,11 @@ func (j jaegerMarshaler) Marshal(traces pdata.Traces) ([]Message, error) {
4850
continue
4951
}
5052
key := []byte(span.TraceID.String())
51-
messages = append(messages, Message{Value: bts, Key: key})
53+
messages = append(messages, &sarama.ProducerMessage{
54+
Topic: topic,
55+
Value: sarama.ByteEncoder(bts),
56+
Key: sarama.ByteEncoder(key),
57+
})
5258
}
5359
}
5460
return messages, consumererror.Combine(errs)

exporter/kafkaexporter/jaeger_marshaler_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"bytes"
1919
"testing"
2020

21+
"github.com/Shopify/sarama"
2122
"github.com/gogo/protobuf/jsonpb"
2223
"github.com/stretchr/testify/assert"
2324
"github.com/stretchr/testify/require"
@@ -50,14 +51,14 @@ func TestJaegerMarshaler(t *testing.T) {
5051
tests := []struct {
5152
unmarshaler TracesMarshaler
5253
encoding string
53-
messages []Message
54+
messages []*sarama.ProducerMessage
5455
}{
5556
{
5657
unmarshaler: jaegerMarshaler{
5758
marshaler: jaegerProtoSpanMarshaler{},
5859
},
5960
encoding: "jaeger_proto",
60-
messages: []Message{{Value: jaegerProtoBytes, Key: messageKey}},
61+
messages: []*sarama.ProducerMessage{{Topic: "topic", Value: sarama.ByteEncoder(jaegerProtoBytes), Key: sarama.ByteEncoder(messageKey)}},
6162
},
6263
{
6364
unmarshaler: jaegerMarshaler{
@@ -66,12 +67,12 @@ func TestJaegerMarshaler(t *testing.T) {
6667
},
6768
},
6869
encoding: "jaeger_json",
69-
messages: []Message{{Value: jsonByteBuffer.Bytes(), Key: messageKey}},
70+
messages: []*sarama.ProducerMessage{{Topic: "topic", Value: sarama.ByteEncoder(jsonByteBuffer.Bytes()), Key: sarama.ByteEncoder(messageKey)}},
7071
},
7172
}
7273
for _, test := range tests {
7374
t.Run(test.encoding, func(t *testing.T) {
74-
messages, err := test.unmarshaler.Marshal(td)
75+
messages, err := test.unmarshaler.Marshal(td, "topic")
7576
require.NoError(t, err)
7677
assert.Equal(t, test.messages, messages)
7778
assert.Equal(t, test.encoding, test.unmarshaler.Encoding())
@@ -86,7 +87,7 @@ func TestJaegerMarshaler_error_covert_traceID(t *testing.T) {
8687
td := pdata.NewTraces()
8788
td.ResourceSpans().AppendEmpty().InstrumentationLibrarySpans().AppendEmpty().Spans().AppendEmpty()
8889
// fails in zero traceID
89-
messages, err := marshaler.Marshal(td)
90+
messages, err := marshaler.Marshal(td, "topic")
9091
require.Error(t, err)
9192
assert.Nil(t, messages)
9293
}

exporter/kafkaexporter/kafka_exporter.go

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,11 @@ type kafkaTracesProducer struct {
3737
}
3838

3939
func (e *kafkaTracesProducer) traceDataPusher(_ context.Context, td pdata.Traces) error {
40-
messages, err := e.marshaler.Marshal(td)
40+
messages, err := e.marshaler.Marshal(td, e.topic)
4141
if err != nil {
4242
return consumererror.Permanent(err)
4343
}
44-
err = e.producer.SendMessages(producerMessages(messages, e.topic))
44+
err = e.producer.SendMessages(messages)
4545
if err != nil {
4646
return err
4747
}
@@ -61,11 +61,11 @@ type kafkaMetricsProducer struct {
6161
}
6262

6363
func (e *kafkaMetricsProducer) metricsDataPusher(_ context.Context, md pdata.Metrics) error {
64-
messages, err := e.marshaler.Marshal(md)
64+
messages, err := e.marshaler.Marshal(md, e.topic)
6565
if err != nil {
6666
return consumererror.Permanent(err)
6767
}
68-
err = e.producer.SendMessages(producerMessages(messages, e.topic))
68+
err = e.producer.SendMessages(messages)
6969
if err != nil {
7070
return err
7171
}
@@ -85,11 +85,11 @@ type kafkaLogsProducer struct {
8585
}
8686

8787
func (e *kafkaLogsProducer) logsDataPusher(_ context.Context, ld pdata.Logs) error {
88-
messages, err := e.marshaler.Marshal(ld)
88+
messages, err := e.marshaler.Marshal(ld, e.topic)
8989
if err != nil {
9090
return consumererror.Permanent(err)
9191
}
92-
err = e.producer.SendMessages(producerMessages(messages, e.topic))
92+
err = e.producer.SendMessages(messages)
9393
if err != nil {
9494
return err
9595
}
@@ -184,15 +184,3 @@ func newLogsExporter(config Config, params component.ExporterCreateParams, marsh
184184
}, nil
185185

186186
}
187-
188-
func producerMessages(messages []Message, topic string) []*sarama.ProducerMessage {
189-
producerMessages := make([]*sarama.ProducerMessage, len(messages))
190-
for i := range messages {
191-
producerMessages[i] = &sarama.ProducerMessage{
192-
Topic: topic,
193-
Value: sarama.ByteEncoder(messages[i].Value),
194-
Key: sarama.ByteEncoder(messages[i].Key),
195-
}
196-
}
197-
return producerMessages
198-
}

exporter/kafkaexporter/kafka_exporter_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ type logsErrorMarshaler struct {
270270
err error
271271
}
272272

273-
func (e metricsErrorMarshaler) Marshal(_ pdata.Metrics) ([]Message, error) {
273+
func (e metricsErrorMarshaler) Marshal(_ pdata.Metrics, _ string) ([]*sarama.ProducerMessage, error) {
274274
return nil, e.err
275275
}
276276

@@ -280,15 +280,15 @@ func (e metricsErrorMarshaler) Encoding() string {
280280

281281
var _ TracesMarshaler = (*tracesErrorMarshaler)(nil)
282282

283-
func (e tracesErrorMarshaler) Marshal(_ pdata.Traces) ([]Message, error) {
283+
func (e tracesErrorMarshaler) Marshal(_ pdata.Traces, _ string) ([]*sarama.ProducerMessage, error) {
284284
return nil, e.err
285285
}
286286

287287
func (e tracesErrorMarshaler) Encoding() string {
288288
panic("implement me")
289289
}
290290

291-
func (e logsErrorMarshaler) Marshal(_ pdata.Logs) ([]Message, error) {
291+
func (e logsErrorMarshaler) Marshal(_ pdata.Logs, _ string) ([]*sarama.ProducerMessage, error) {
292292
return nil, e.err
293293
}
294294

exporter/kafkaexporter/marshaler.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,42 +15,38 @@
1515
package kafkaexporter
1616

1717
import (
18+
"github.com/Shopify/sarama"
19+
1820
"go.opentelemetry.io/collector/consumer/pdata"
1921
)
2022

2123
// TracesMarshaler marshals traces into Message array.
2224
type TracesMarshaler interface {
23-
// Marshal serializes spans into Messages
24-
Marshal(traces pdata.Traces) ([]Message, error)
25+
// Marshal serializes spans into sarama's ProducerMessages
26+
Marshal(traces pdata.Traces, topic string) ([]*sarama.ProducerMessage, error)
2527

2628
// Encoding returns encoding name
2729
Encoding() string
2830
}
2931

3032
// MetricsMarshaler marshals metrics into Message array
3133
type MetricsMarshaler interface {
32-
// Marshal serializes metrics into Messages
33-
Marshal(metrics pdata.Metrics) ([]Message, error)
34+
// Marshal serializes metrics into sarama's ProducerMessages
35+
Marshal(metrics pdata.Metrics, topic string) ([]*sarama.ProducerMessage, error)
3436

3537
// Encoding returns encoding name
3638
Encoding() string
3739
}
3840

3941
// LogsMarshaler marshals logs into Message array
4042
type LogsMarshaler interface {
41-
// Marshal serializes logs into Messages
42-
Marshal(logs pdata.Logs) ([]Message, error)
43+
// Marshal serializes logs into sarama's ProducerMessages
44+
Marshal(logs pdata.Logs, topic string) ([]*sarama.ProducerMessage, error)
4345

4446
// Encoding returns encoding name
4547
Encoding() string
4648
}
4749

48-
// Message encapsulates Kafka's message payload.
49-
type Message struct {
50-
Value []byte
51-
Key []byte
52-
}
53-
5450
// tracesMarshalers returns map of supported encodings with TracesMarshaler.
5551
func tracesMarshalers() map[string]TracesMarshaler {
5652
otlppb := &otlpTracesPbMarshaler{}

exporter/kafkaexporter/otlp_marshaler.go

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
package kafkaexporter
1616

1717
import (
18+
"github.com/Shopify/sarama"
19+
1820
"go.opentelemetry.io/collector/consumer/pdata"
1921
)
2022

@@ -28,12 +30,17 @@ func (m *otlpTracesPbMarshaler) Encoding() string {
2830
return defaultEncoding
2931
}
3032

31-
func (m *otlpTracesPbMarshaler) Marshal(td pdata.Traces) ([]Message, error) {
33+
func (m *otlpTracesPbMarshaler) Marshal(td pdata.Traces, topic string) ([]*sarama.ProducerMessage, error) {
3234
bts, err := td.ToOtlpProtoBytes()
3335
if err != nil {
3436
return nil, err
3537
}
36-
return []Message{{Value: bts}}, nil
38+
return []*sarama.ProducerMessage{
39+
{
40+
Topic: topic,
41+
Value: sarama.ByteEncoder(bts),
42+
},
43+
}, nil
3744
}
3845

3946
type otlpMetricsPbMarshaler struct {
@@ -43,12 +50,17 @@ func (m *otlpMetricsPbMarshaler) Encoding() string {
4350
return defaultEncoding
4451
}
4552

46-
func (m *otlpMetricsPbMarshaler) Marshal(md pdata.Metrics) ([]Message, error) {
53+
func (m *otlpMetricsPbMarshaler) Marshal(md pdata.Metrics, topic string) ([]*sarama.ProducerMessage, error) {
4754
bts, err := md.ToOtlpProtoBytes()
4855
if err != nil {
4956
return nil, err
5057
}
51-
return []Message{{Value: bts}}, nil
58+
return []*sarama.ProducerMessage{
59+
{
60+
Topic: topic,
61+
Value: sarama.ByteEncoder(bts),
62+
},
63+
}, nil
5264
}
5365

5466
type otlpLogsPbMarshaler struct {
@@ -58,10 +70,15 @@ func (m *otlpLogsPbMarshaler) Encoding() string {
5870
return defaultEncoding
5971
}
6072

61-
func (m *otlpLogsPbMarshaler) Marshal(ld pdata.Logs) ([]Message, error) {
73+
func (m *otlpLogsPbMarshaler) Marshal(ld pdata.Logs, topic string) ([]*sarama.ProducerMessage, error) {
6274
bts, err := ld.ToOtlpProtoBytes()
6375
if err != nil {
6476
return nil, err
6577
}
66-
return []Message{{Value: bts}}, nil
78+
return []*sarama.ProducerMessage{
79+
{
80+
Topic: topic,
81+
Value: sarama.ByteEncoder(bts),
82+
},
83+
}, nil
6784
}

exporter/kafkaexporter/otlp_marshaler_test.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@ func TestOTLPTracesPbMarshaler(t *testing.T) {
2828
td := testdata.GenerateTracesTwoSpansSameResource()
2929
m := otlpTracesPbMarshaler{}
3030
assert.Equal(t, "otlp_proto", m.Encoding())
31-
messages, err := m.Marshal(td)
31+
messages, err := m.Marshal(td, "topic")
3232
require.NoError(t, err)
3333
require.Len(t, messages, 1)
34-
extracted, err := pdata.TracesFromOtlpProtoBytes(messages[0].Value)
34+
messageBytes, err := messages[0].Value.Encode()
35+
require.NoError(t, err)
36+
extracted, err := pdata.TracesFromOtlpProtoBytes(messageBytes)
3537
require.NoError(t, err)
3638
assert.EqualValues(t, td, extracted)
3739
}
@@ -40,10 +42,12 @@ func TestOTLPMetricsPbMarshaler(t *testing.T) {
4042
md := testdata.GenerateMetricsTwoMetrics()
4143
m := otlpMetricsPbMarshaler{}
4244
assert.Equal(t, "otlp_proto", m.Encoding())
43-
messages, err := m.Marshal(md)
45+
messages, err := m.Marshal(md, "topic")
4446
require.NoError(t, err)
4547
require.Len(t, messages, 1)
46-
extracted, err := pdata.MetricsFromOtlpProtoBytes(messages[0].Value)
48+
messageBytes, err := messages[0].Value.Encode()
49+
require.NoError(t, err)
50+
extracted, err := pdata.MetricsFromOtlpProtoBytes(messageBytes)
4751
require.NoError(t, err)
4852
assert.EqualValues(t, md, extracted)
4953
}
@@ -52,10 +56,12 @@ func TestOTLPLogsPbMarshaler(t *testing.T) {
5256
ld := testdata.GenerateLogsOneLogRecord()
5357
m := otlpLogsPbMarshaler{}
5458
assert.Equal(t, "otlp_proto", m.Encoding())
55-
messages, err := m.Marshal(ld)
59+
messages, err := m.Marshal(ld, "topic")
5660
require.NoError(t, err)
5761
require.Len(t, messages, 1)
58-
extracted, err := pdata.LogsFromOtlpProtoBytes(messages[0].Value)
62+
messageBytes, err := messages[0].Value.Encode()
63+
require.NoError(t, err)
64+
extracted, err := pdata.LogsFromOtlpProtoBytes(messageBytes)
5965
require.NoError(t, err)
6066
assert.EqualValues(t, ld, extracted)
6167
}

0 commit comments

Comments
 (0)