@@ -12,50 +12,87 @@ import (
1212 "github.com/stretchr/testify/require"
1313 "go.opentelemetry.io/collector/component/componenttest"
1414 "go.opentelemetry.io/collector/consumer/consumertest"
15+ "go.opentelemetry.io/collector/exporter"
1516 "go.opentelemetry.io/collector/exporter/exportertest"
1617 "go.opentelemetry.io/collector/pdata/pcommon"
1718 "go.opentelemetry.io/collector/pdata/pmetric"
19+ "go.opentelemetry.io/collector/receiver"
1820 "go.opentelemetry.io/collector/receiver/receivertest"
1921 "go.uber.org/zap"
2022
2123 "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/stefexporter"
24+ "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
2225 "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/stefreceiver/internal/metadata"
2326)
2427
25- func TestRoundtrip (t * testing.T ) {
28+ func genMetrics () pmetric.Metrics {
29+ data := pmetric .NewMetrics ()
30+ metricPoint := data .ResourceMetrics ().AppendEmpty ().ScopeMetrics ().AppendEmpty ().Metrics ().AppendEmpty ()
31+ metricPoint .SetName ("foo" )
32+ gauge := metricPoint .SetEmptyGauge ()
33+ dp := gauge .DataPoints ().AppendEmpty ()
34+ dp .SetIntValue (1 )
35+ dp .SetTimestamp (pcommon .NewTimestampFromTime (time .Now ()))
36+ return data
37+ }
38+
39+ func createReceiver (t * testing.T , endpoint string ) (receiver.Metrics , * consumertest.MetricsSink ) {
2640 sink := & consumertest.MetricsSink {}
2741 settings := receivertest .NewNopSettings (metadata .Type )
2842 settings .Logger , _ = zap .NewDevelopment ()
29- m , err := NewFactory ().CreateMetrics (context .Background (), settings , createDefaultConfig (), sink )
30- t .Cleanup (func () {
31- err = m .Shutdown (context .Background ())
32- require .NoError (t , err )
33- })
43+ rcvCfg := createDefaultConfig ()
44+ rcvCfg .(* Config ).NetAddr .Endpoint = endpoint
45+ m , err := NewFactory ().CreateMetrics (context .Background (), settings , rcvCfg , sink )
3446 require .NoError (t , m .Start (context .Background (), componenttest .NewNopHost ()))
3547 require .NoError (t , err )
48+ return m , sink
49+ }
50+
51+ func createExporter (t * testing.T , endpoint string ) exporter.Metrics {
3652 cfg := stefexporter .NewFactory ().CreateDefaultConfig ().(* stefexporter.Config )
37- cfg .Endpoint = "localhost:4320"
53+ cfg .Endpoint = endpoint
3854 cfg .TLSSetting .Insecure = true
39- exporterSettings := exportertest .NewNopSettings (metadata .Type )
40- exporterSettings .Logger , _ = zap .NewDevelopment ()
41- exporter , err := stefexporter .NewFactory ().CreateMetrics (context .Background (), exporterSettings , cfg )
55+ settings := exportertest .NewNopSettings (metadata .Type )
56+ settings .Logger , _ = zap .NewDevelopment ()
57+ exp , err := stefexporter .NewFactory ().CreateMetrics (context .Background (), settings , cfg )
4258 require .NoError (t , err )
43- t .Cleanup (func () {
44- err = exporter .Shutdown (context .Background ())
45- require .NoError (t , err )
46- })
59+ require .NoError (t , exp .Start (context .Background (), componenttest .NewNopHost ()))
60+ return exp
61+ }
4762
48- require . NoError ( t , exporter . Start ( context . Background (), componenttest . NewNopHost ()))
49- data := pmetric . NewMetrics ( )
50- metricPoint := data . ResourceMetrics (). AppendEmpty (). ScopeMetrics (). AppendEmpty (). Metrics (). AppendEmpty ( )
51- metricPoint . SetName ( "foo" )
52- gauge := metricPoint . SetEmptyGauge ()
53- dp := gauge . DataPoints (). AppendEmpty ( )
54- dp . SetIntValue ( 1 )
55- dp . SetTimestamp ( pcommon . NewTimestampFromTime ( time . Now ()))
56- err = exporter .ConsumeMetrics (context .Background (), data )
63+ func TestRoundtrip ( t * testing. T ) {
64+ endpoint := testutil . GetAvailableLocalAddress ( t )
65+ m , sink := createReceiver ( t , endpoint )
66+ t . Cleanup ( func () { require . NoError ( t , m . Shutdown ( context . Background ())) } )
67+
68+ exporter := createExporter ( t , endpoint )
69+ t . Cleanup ( func () { require . NoError ( t , exporter . Shutdown ( context . Background ())) } )
70+
71+ err : = exporter .ConsumeMetrics (context .Background (), genMetrics () )
5772 require .NoError (t , err )
5873 assert .EventuallyWithT (t , func (tt * assert.CollectT ) {
5974 assert .Len (tt , sink .AllMetrics (), 1 )
6075 }, 1 * time .Minute , 10 * time .Millisecond )
6176}
77+
78+ func TestShutdownWhenConnected (t * testing.T ) {
79+ endpoint := testutil .GetAvailableLocalAddress (t )
80+ receiver , sink := createReceiver (t , endpoint )
81+ exporter := createExporter (t , endpoint )
82+
83+ require .NoError (t , exporter .ConsumeMetrics (context .Background (), genMetrics ()))
84+
85+ assert .EventuallyWithT (
86+ t , func (tt * assert.CollectT ) {
87+ assert .Len (tt , sink .AllMetrics (), 1 )
88+ }, 1 * time .Minute , 10 * time .Millisecond ,
89+ )
90+
91+ // Try shutdown receiver before shutting down exporter.
92+ // This means there is an active connection at the receiver.
93+ // Previously we had a bug causing the receiver Shutdown to hang forever
94+ // in this situation.
95+ require .NoError (t , receiver .Shutdown (context .Background ()))
96+
97+ require .NoError (t , exporter .Shutdown (context .Background ()))
98+ }
0 commit comments