Skip to content

Commit cf6e6d8

Browse files
committed
Add channel based producer integration test
1 parent e7b4bd9 commit cf6e6d8

File tree

1 file changed

+72
-19
lines changed
  • instrumentation/github.com/confluentinc/confluent-kafka-go/kafka/splunkkafka/test

1 file changed

+72
-19
lines changed

instrumentation/github.com/confluentinc/confluent-kafka-go/kafka/splunkkafka/test/kafka_test.go

Lines changed: 72 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
//go:build cgo && (linux || darwin)
16-
// +build cgo
17-
// +build linux darwin
15+
//go:build cgo && linux
16+
// +build cgo,linux
1817

1918
package test
2019

@@ -47,7 +46,54 @@ var (
4746
testTopic = "gotest"
4847
)
4948

50-
func TestSynchronous(t *testing.T) {
49+
func TestChannelBasedProducer(t *testing.T) {
50+
defer goleak.VerifyNone(t)
51+
52+
partition := int32(0)
53+
sr, opts := newFixtures()
54+
p := newProducer(t, opts...)
55+
56+
done := make(chan struct{})
57+
var sent *kafka.Message
58+
go func() {
59+
defer close(done)
60+
sent = requireEventIsMessage(t, <-p.Events())
61+
}()
62+
go func() {
63+
p.ProduceChannel() <- &kafka.Message{
64+
TopicPartition: kafka.TopicPartition{
65+
Topic: &testTopic,
66+
Partition: partition,
67+
},
68+
Key: key,
69+
Value: val,
70+
}
71+
}()
72+
73+
// Wait for the delivery report goroutine to finish.
74+
<-done
75+
require.NoError(t, sent.TopicPartition.Error)
76+
77+
p.Close()
78+
79+
recv := consumeMessage(t, kafka.TopicPartition{
80+
Topic: &testTopic,
81+
Partition: partition,
82+
Offset: sent.TopicPartition.Offset,
83+
}, opts...)
84+
85+
assert.Equal(t, sent.String(), recv.String())
86+
87+
spans := sr.Ended()
88+
require.Len(t, spans, 2)
89+
pSpan, cSpan := spans[0], spans[1]
90+
// The should be linked via propagated headers.
91+
assert.Equal(t, pSpan.SpanContext().TraceID(), cSpan.SpanContext().TraceID())
92+
assertProducerSpan(t, pSpan)
93+
assertConsumerSpan(t, cSpan)
94+
}
95+
96+
func TestFunctionBasedProducer(t *testing.T) {
5197
defer goleak.VerifyNone(t)
5298

5399
partition := int32(0)
@@ -65,27 +111,15 @@ func TestSynchronous(t *testing.T) {
65111
}, deliveryCh)
66112
require.NoError(t, err)
67113
sent := requireEventIsMessage(t, <-deliveryCh)
114+
require.NoError(t, sent.TopicPartition.Error)
68115

69116
p.Close()
70117

71-
c := newConsumer(t, opts...)
72-
require.NoError(t, c.Assign([]kafka.TopicPartition{{
118+
recv := consumeMessage(t, kafka.TopicPartition{
73119
Topic: &testTopic,
74120
Partition: partition,
75121
Offset: sent.TopicPartition.Offset,
76-
}}))
77-
recv := requireEventIsMessage(t, func() kafka.Event {
78-
for {
79-
if e := c.Poll(100); e != nil {
80-
return e
81-
}
82-
}
83-
}())
84-
_, err = c.CommitMessage(recv)
85-
assert.NoError(t, err)
86-
assert.NoError(t, c.Unassign())
87-
88-
c.Close()
122+
}, opts...)
89123

90124
assert.Equal(t, sent.String(), recv.String())
91125

@@ -124,6 +158,25 @@ func newConsumer(t *testing.T, opts ...splunkkafka.Option) *splunkkafka.Consumer
124158
return c
125159
}
126160

161+
func consumeMessage(t *testing.T, tp kafka.TopicPartition, opts ...splunkkafka.Option) *kafka.Message {
162+
c := newConsumer(t, opts...)
163+
require.NoError(t, c.Assign([]kafka.TopicPartition{tp}))
164+
recv := requireEventIsMessage(t, func() kafka.Event {
165+
for {
166+
if e := c.Poll(100); e != nil {
167+
return e
168+
}
169+
}
170+
}())
171+
assert.NoError(t, recv.TopicPartition.Error)
172+
_, err := c.CommitMessage(recv)
173+
assert.NoError(t, err)
174+
assert.NoError(t, c.Unassign())
175+
176+
c.Close()
177+
return recv
178+
}
179+
127180
func assertProducerSpan(t *testing.T, span trace.ReadOnlySpan) {
128181
assert.Equal(t, fmt.Sprintf("%s send", testTopic), span.Name())
129182
assert.Equal(t, traceapi.SpanKindProducer, span.SpanKind())

0 commit comments

Comments
 (0)