@@ -7,15 +7,18 @@ import (
7
7
"context"
8
8
"errors"
9
9
"sync"
10
+ "sync/atomic"
10
11
"testing"
11
12
"time"
12
13
13
14
"github.com/IBM/sarama"
14
15
"github.com/cenkalti/backoff/v4"
15
16
"github.com/stretchr/testify/assert"
16
17
"github.com/stretchr/testify/require"
18
+ "go.opentelemetry.io/collector/client"
17
19
"go.opentelemetry.io/collector/component"
18
20
"go.opentelemetry.io/collector/component/componenttest"
21
+ "go.opentelemetry.io/collector/consumer"
19
22
"go.opentelemetry.io/collector/consumer/consumertest"
20
23
"go.opentelemetry.io/collector/pdata/plog"
21
24
"go.opentelemetry.io/collector/pdata/pmetric"
@@ -111,11 +114,22 @@ func TestTracesConsumerGroupHandler(t *testing.T) {
111
114
112
115
obsrecv , err := receiverhelper .NewObsReport (receiverhelper.ObsReportSettings {ReceiverCreateSettings : receivertest .NewNopSettings (metadata .Type )})
113
116
require .NoError (t , err )
117
+ var called atomic.Bool
114
118
c := tracesConsumerGroupHandler {
115
- unmarshaler : newPdataTracesUnmarshaler (& ptrace.ProtoUnmarshaler {}, defaultEncoding ),
116
- logger : zap .NewNop (),
117
- ready : make (chan bool ),
118
- nextConsumer : consumertest .NewNop (),
119
+ unmarshaler : newPdataTracesUnmarshaler (& ptrace.ProtoUnmarshaler {}, defaultEncoding ),
120
+ logger : zap .NewNop (),
121
+ ready : make (chan bool ),
122
+ nextConsumer : func () consumer.Traces {
123
+ c , err := consumer .NewTraces (func (ctx context.Context , _ ptrace.Traces ) error {
124
+ defer called .CompareAndSwap (false , true )
125
+ info := client .FromContext (ctx )
126
+ assert .Equal (t , []string {"abcdefg" }, info .Metadata .Get ("x-tenant-id" ))
127
+ assert .Equal (t , []string {"1234" , "5678" }, info .Metadata .Get ("x-request-ids" ))
128
+ return nil
129
+ })
130
+ require .NoError (t , err )
131
+ return c
132
+ }(),
119
133
obsrecv : obsrecv ,
120
134
headerExtractor : & nopHeaderExtractor {},
121
135
telemetryBuilder : telemetryBuilder ,
@@ -141,9 +155,25 @@ func TestTracesConsumerGroupHandler(t *testing.T) {
141
155
wg .Done ()
142
156
}()
143
157
144
- groupClaim .messageChan <- & sarama.ConsumerMessage {}
158
+ groupClaim .messageChan <- & sarama.ConsumerMessage {
159
+ Headers : []* sarama.RecordHeader {
160
+ {
161
+ Key : []byte ("x-tenant-id" ),
162
+ Value : []byte ("abcdefg" ),
163
+ },
164
+ {
165
+ Key : []byte ("x-request-ids" ),
166
+ Value : []byte ("1234" ),
167
+ },
168
+ {
169
+ Key : []byte ("x-request-ids" ),
170
+ Value : []byte ("5678" ),
171
+ },
172
+ },
173
+ }
145
174
close (groupClaim .messageChan )
146
175
wg .Wait ()
176
+ assert .True (t , called .Load ()) // Ensure nextConsumer was called.
147
177
}
148
178
149
179
func TestTracesConsumerGroupHandler_session_done (t * testing.T ) {
@@ -410,11 +440,22 @@ func TestMetricsConsumerGroupHandler(t *testing.T) {
410
440
411
441
obsrecv , err := receiverhelper .NewObsReport (receiverhelper.ObsReportSettings {ReceiverCreateSettings : receivertest .NewNopSettings (metadata .Type )})
412
442
require .NoError (t , err )
443
+ var called atomic.Bool
413
444
c := metricsConsumerGroupHandler {
414
- unmarshaler : newPdataMetricsUnmarshaler (& pmetric.ProtoUnmarshaler {}, defaultEncoding ),
415
- logger : zap .NewNop (),
416
- ready : make (chan bool ),
417
- nextConsumer : consumertest .NewNop (),
445
+ unmarshaler : newPdataMetricsUnmarshaler (& pmetric.ProtoUnmarshaler {}, defaultEncoding ),
446
+ logger : zap .NewNop (),
447
+ ready : make (chan bool ),
448
+ nextConsumer : func () consumer.Metrics {
449
+ c , err := consumer .NewMetrics (func (ctx context.Context , _ pmetric.Metrics ) error {
450
+ defer called .CompareAndSwap (false , true )
451
+ info := client .FromContext (ctx )
452
+ assert .Equal (t , []string {"abcdefg" }, info .Metadata .Get ("x-tenant-id" ))
453
+ assert .Equal (t , []string {"1234" , "5678" }, info .Metadata .Get ("x-request-ids" ))
454
+ return nil
455
+ })
456
+ require .NoError (t , err )
457
+ return c
458
+ }(),
418
459
obsrecv : obsrecv ,
419
460
headerExtractor : & nopHeaderExtractor {},
420
461
telemetryBuilder : telemetryBuilder ,
@@ -440,9 +481,25 @@ func TestMetricsConsumerGroupHandler(t *testing.T) {
440
481
wg .Done ()
441
482
}()
442
483
443
- groupClaim .messageChan <- & sarama.ConsumerMessage {}
484
+ groupClaim .messageChan <- & sarama.ConsumerMessage {
485
+ Headers : []* sarama.RecordHeader {
486
+ {
487
+ Key : []byte ("x-tenant-id" ),
488
+ Value : []byte ("abcdefg" ),
489
+ },
490
+ {
491
+ Key : []byte ("x-request-ids" ),
492
+ Value : []byte ("1234" ),
493
+ },
494
+ {
495
+ Key : []byte ("x-request-ids" ),
496
+ Value : []byte ("5678" ),
497
+ },
498
+ },
499
+ }
444
500
close (groupClaim .messageChan )
445
501
wg .Wait ()
502
+ assert .True (t , called .Load ()) // Ensure nextConsumer was called.
446
503
}
447
504
448
505
func TestMetricsConsumerGroupHandler_session_done (t * testing.T ) {
@@ -722,11 +779,22 @@ func TestLogsConsumerGroupHandler(t *testing.T) {
722
779
723
780
obsrecv , err := receiverhelper .NewObsReport (receiverhelper.ObsReportSettings {ReceiverCreateSettings : receivertest .NewNopSettings (metadata .Type )})
724
781
require .NoError (t , err )
782
+ var called atomic.Bool
725
783
c := logsConsumerGroupHandler {
726
- unmarshaler : newPdataLogsUnmarshaler (& plog.ProtoUnmarshaler {}, defaultEncoding ),
727
- logger : zap .NewNop (),
728
- ready : make (chan bool ),
729
- nextConsumer : consumertest .NewNop (),
784
+ unmarshaler : newPdataLogsUnmarshaler (& plog.ProtoUnmarshaler {}, defaultEncoding ),
785
+ logger : zap .NewNop (),
786
+ ready : make (chan bool ),
787
+ nextConsumer : func () consumer.Logs {
788
+ c , err := consumer .NewLogs (func (ctx context.Context , _ plog.Logs ) error {
789
+ defer called .CompareAndSwap (false , true )
790
+ info := client .FromContext (ctx )
791
+ assert .Equal (t , []string {"abcdefg" }, info .Metadata .Get ("x-tenant-id" ))
792
+ assert .Equal (t , []string {"1234" , "5678" }, info .Metadata .Get ("x-request-ids" ))
793
+ return nil
794
+ })
795
+ require .NoError (t , err )
796
+ return c
797
+ }(),
730
798
obsrecv : obsrecv ,
731
799
headerExtractor : & nopHeaderExtractor {},
732
800
telemetryBuilder : telemetryBuilder ,
@@ -752,9 +820,25 @@ func TestLogsConsumerGroupHandler(t *testing.T) {
752
820
wg .Done ()
753
821
}()
754
822
755
- groupClaim .messageChan <- & sarama.ConsumerMessage {}
823
+ groupClaim .messageChan <- & sarama.ConsumerMessage {
824
+ Headers : []* sarama.RecordHeader {
825
+ {
826
+ Key : []byte ("x-tenant-id" ),
827
+ Value : []byte ("abcdefg" ),
828
+ },
829
+ {
830
+ Key : []byte ("x-request-ids" ),
831
+ Value : []byte ("1234" ),
832
+ },
833
+ {
834
+ Key : []byte ("x-request-ids" ),
835
+ Value : []byte ("5678" ),
836
+ },
837
+ },
838
+ }
756
839
close (groupClaim .messageChan )
757
840
wg .Wait ()
841
+ assert .True (t , called .Load ()) // Ensure nextConsumer was called.
758
842
}
759
843
760
844
func TestLogsConsumerGroupHandler_session_done (t * testing.T ) {
@@ -1268,3 +1352,74 @@ func (c *nopNoUnmarshalerComponent) Start(_ context.Context, _ component.Host) e
1268
1352
func (c * nopNoUnmarshalerComponent ) Shutdown (_ context.Context ) error {
1269
1353
return nil
1270
1354
}
1355
+
1356
+ func Test_newContextWithHeaders (t * testing.T ) {
1357
+ type args struct {
1358
+ ctx context.Context
1359
+ headers []* sarama.RecordHeader
1360
+ }
1361
+ tests := []struct {
1362
+ name string
1363
+ args args
1364
+ want map [string ][]string
1365
+ }{
1366
+ {
1367
+ name : "no headers" ,
1368
+ args : args {
1369
+ ctx : context .Background (),
1370
+ headers : []* sarama.RecordHeader {},
1371
+ },
1372
+ want : map [string ][]string {},
1373
+ },
1374
+ {
1375
+ name : "single header" ,
1376
+ args : args {
1377
+ ctx : context .Background (),
1378
+ headers : []* sarama.RecordHeader {
1379
+ {Key : []byte ("key1" ), Value : []byte ("value1" )},
1380
+ },
1381
+ },
1382
+ want : map [string ][]string {
1383
+ "key1" : {"value1" },
1384
+ },
1385
+ },
1386
+ {
1387
+ name : "multiple headers" ,
1388
+ args : args {
1389
+ ctx : context .Background (),
1390
+ headers : []* sarama.RecordHeader {
1391
+ {Key : []byte ("key1" ), Value : []byte ("value1" )},
1392
+ {Key : []byte ("key2" ), Value : []byte ("value2" )},
1393
+ },
1394
+ },
1395
+ want : map [string ][]string {
1396
+ "key1" : {"value1" },
1397
+ "key2" : {"value2" },
1398
+ },
1399
+ },
1400
+ {
1401
+ name : "duplicate keys" ,
1402
+ args : args {
1403
+ ctx : context .Background (),
1404
+ headers : []* sarama.RecordHeader {
1405
+ {Key : []byte ("key1" ), Value : []byte ("value1" )},
1406
+ {Key : []byte ("key1" ), Value : []byte ("value2" )},
1407
+ },
1408
+ },
1409
+ want : map [string ][]string {
1410
+ "key1" : {"value1" , "value2" },
1411
+ },
1412
+ },
1413
+ }
1414
+
1415
+ for _ , tt := range tests {
1416
+ t .Run (tt .name , func (t * testing.T ) {
1417
+ ctx := newContextWithHeaders (tt .args .ctx , tt .args .headers )
1418
+ clientInfo := client .FromContext (ctx )
1419
+ for k , wantVal := range tt .want {
1420
+ val := clientInfo .Metadata .Get (k )
1421
+ assert .Equal (t , wantVal , val )
1422
+ }
1423
+ })
1424
+ }
1425
+ }
0 commit comments