@@ -5,78 +5,238 @@ package googlecloudpubsubexporter
5
5
6
6
import (
7
7
"context"
8
+ "fmt"
8
9
"testing"
9
- "time"
10
10
11
11
pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
12
- "cloud.google.com/go/pubsub/pstest"
12
+ "github.com/google/uuid"
13
+ "github.com/googleapis/gax-go/v2"
13
14
"github.com/stretchr/testify/assert"
14
- "go.opentelemetry.io/collector/exporter/exporterhelper"
15
+ "github.com/stretchr/testify/require"
16
+ "go.opentelemetry.io/collector/component/componenttest"
15
17
"go.opentelemetry.io/collector/exporter/exportertest"
16
18
"go.opentelemetry.io/collector/pdata/plog"
17
19
"go.opentelemetry.io/collector/pdata/pmetric"
18
20
"go.opentelemetry.io/collector/pdata/ptrace"
19
21
)
20
22
21
- func TestName (t * testing.T ) {
22
- exporter := & pubsubExporter {}
23
- assert .Equal (t , "googlecloudpubsub" , exporter .Name ())
24
- }
23
+ const (
24
+ defaultUUID = "00000000-0000-0000-0000-000000000000"
25
+ defaultProjectID = "my-project"
26
+ defaultTopic = "projects/my-project/topics/otlp"
27
+ )
25
28
26
- func TestExporterDefaultSettings (t * testing.T ) {
27
- ctx := context .Background ()
28
- // Start a fake server running locally.
29
- srv := pstest .NewServer ()
30
- defer srv .Close ()
31
- _ , err := srv .GServer .CreateTopic (ctx , & pb.Topic {
32
- Name : "projects/my-project/topics/otlp" ,
29
+ func TestExporterNoData (t * testing.T ) {
30
+ exporter , publisher := newTestExporter (t , func (config * Config ) {
31
+ config .Watermark .Behavior = "earliest"
33
32
})
34
- assert .NoError (t , err )
35
33
36
- factory := NewFactory ()
37
- cfg := factory .CreateDefaultConfig ()
38
- exporterConfig := cfg .(* Config )
39
- exporterConfig .Endpoint = srv .Addr
40
- exporterConfig .Insecure = true
41
- exporterConfig .ProjectID = "my-project"
42
- exporterConfig .Topic = "projects/my-project/topics/otlp"
43
- exporterConfig .TimeoutSettings = exporterhelper.TimeoutConfig {
44
- Timeout : 12 * time .Second ,
45
- }
46
- exporter := ensureExporter (exportertest .NewNopSettings (), exporterConfig )
47
- assert .NoError (t , exporter .start (ctx , nil ))
48
- assert .NoError (t , exporter .consumeTraces (ctx , ptrace .NewTraces ()))
49
- assert .NoError (t , exporter .consumeMetrics (ctx , pmetric .NewMetrics ()))
34
+ ctx := context .Background ()
50
35
assert .NoError (t , exporter .consumeLogs (ctx , plog .NewLogs ()))
51
- assert .NoError (t , exporter .shutdown (ctx ))
36
+ assert .NoError (t , exporter .consumeMetrics (ctx , pmetric .NewMetrics ()))
37
+ assert .NoError (t , exporter .consumeTraces (ctx , ptrace .NewTraces ()))
38
+
39
+ assert .Zero (t , publisher .requests )
52
40
}
53
41
54
- func TestExporterCompression (t * testing.T ) {
55
- ctx := context .Background ()
56
- // Start a fake server running locally.
57
- srv := pstest .NewServer ()
58
- defer srv .Close ()
59
- _ , err := srv .GServer .CreateTopic (ctx , & pb.Topic {
60
- Name : "projects/my-project/topics/otlp" ,
42
+ func TestExporterClientError (t * testing.T ) {
43
+ cfg := NewFactory ().CreateDefaultConfig ().(* Config )
44
+ cfg .ProjectID = defaultProjectID
45
+ cfg .Topic = defaultTopic
46
+ require .NoError (t , cfg .Validate ())
47
+
48
+ exporter := ensureExporter (exportertest .NewNopSettings (), cfg )
49
+ exporter .makeClient = func (context.Context , * Config , string ) (publisherClient , error ) {
50
+ return nil , fmt .Errorf ("something went wrong" )
51
+ }
52
+
53
+ require .Error (t , exporter .start (context .Background (), componenttest .NewNopHost ()))
54
+ }
55
+
56
+ func TestExporterSimpleData (t * testing.T ) {
57
+ t .Run ("logs" , func (t * testing.T ) {
58
+ exporter , publisher := newTestExporter (t )
59
+
60
+ logs := plog .NewLogs ()
61
+ logs .ResourceLogs ().AppendEmpty ().ScopeLogs ().AppendEmpty ().LogRecords ().AppendEmpty ().Body ().SetStr ("some log message" )
62
+
63
+ require .NoError (t , exporter .consumeLogs (context .Background (), logs ))
64
+ require .Len (t , publisher .requests , 1 )
65
+
66
+ request := publisher .requests [0 ]
67
+ assert .Equal (t , defaultTopic , request .Topic )
68
+ assert .Len (t , request .Messages , 1 )
69
+
70
+ message := request .Messages [0 ]
71
+ assert .NotEmpty (t , message .Data )
72
+ assert .Subset (t , message .Attributes , map [string ]string {
73
+ "ce-type" : "org.opentelemetry.otlp.logs.v1" ,
74
+ "content-type" : "application/protobuf" ,
75
+ })
76
+ })
77
+
78
+ t .Run ("metrics" , func (t * testing.T ) {
79
+ exporter , publisher := newTestExporter (t )
80
+
81
+ metrics := pmetric .NewMetrics ()
82
+ metric := metrics .ResourceMetrics ().AppendEmpty ().ScopeMetrics ().AppendEmpty ().Metrics ().AppendEmpty ()
83
+ metric .SetName ("some.metric" )
84
+ metric .SetEmptyGauge ().DataPoints ().AppendEmpty ().SetIntValue (42 )
85
+
86
+ require .NoError (t , exporter .consumeMetrics (context .Background (), metrics ))
87
+ require .Len (t , publisher .requests , 1 )
88
+
89
+ request := publisher .requests [0 ]
90
+ assert .Equal (t , defaultTopic , request .Topic )
91
+ assert .Len (t , request .Messages , 1 )
92
+
93
+ message := request .Messages [0 ]
94
+ assert .NotEmpty (t , message .Data )
95
+ assert .Subset (t , message .Attributes , map [string ]string {
96
+ "ce-type" : "org.opentelemetry.otlp.metrics.v1" ,
97
+ "content-type" : "application/protobuf" ,
98
+ })
61
99
})
62
- assert .NoError (t , err )
100
+
101
+ t .Run ("traces" , func (t * testing.T ) {
102
+ exporter , publisher := newTestExporter (t )
103
+
104
+ traces := ptrace .NewTraces ()
105
+ span := traces .ResourceSpans ().AppendEmpty ().ScopeSpans ().AppendEmpty ().Spans ().AppendEmpty ()
106
+ span .SetName ("some span" )
107
+
108
+ require .NoError (t , exporter .consumeTraces (context .Background (), traces ))
109
+ require .Len (t , publisher .requests , 1 )
110
+
111
+ request := publisher .requests [0 ]
112
+ assert .Equal (t , defaultTopic , request .Topic )
113
+ assert .Len (t , request .Messages , 1 )
114
+
115
+ message := request .Messages [0 ]
116
+ assert .NotEmpty (t , message .Data )
117
+ assert .Subset (t , message .Attributes , map [string ]string {
118
+ "ce-type" : "org.opentelemetry.otlp.traces.v1" ,
119
+ "content-type" : "application/protobuf" ,
120
+ })
121
+ })
122
+ }
123
+
124
+ func TestExporterSimpleDataWithCompression (t * testing.T ) {
125
+ withCompression := func (config * Config ) {
126
+ config .Compression = "gzip"
127
+ }
128
+
129
+ t .Run ("logs" , func (t * testing.T ) {
130
+ exporter , publisher := newTestExporter (t , withCompression )
131
+
132
+ logs := plog .NewLogs ()
133
+ logs .ResourceLogs ().AppendEmpty ().ScopeLogs ().AppendEmpty ().LogRecords ().AppendEmpty ().Body ().SetStr ("some log message" )
134
+
135
+ require .NoError (t , exporter .consumeLogs (context .Background (), logs ))
136
+ require .Len (t , publisher .requests , 1 )
137
+
138
+ request := publisher .requests [0 ]
139
+ assert .Equal (t , defaultTopic , request .Topic )
140
+ assert .Len (t , request .Messages , 1 )
141
+
142
+ message := request .Messages [0 ]
143
+ assert .NotEmpty (t , message .Data )
144
+ assert .Subset (t , message .Attributes , map [string ]string {
145
+ "ce-id" : "00000000-0000-0000-0000-000000000000" ,
146
+ "ce-source" : "/opentelemetry/collector/googlecloudpubsub/latest" ,
147
+ "ce-specversion" : "1.0" ,
148
+ "ce-type" : "org.opentelemetry.otlp.logs.v1" ,
149
+ "content-type" : "application/protobuf" ,
150
+ "content-encoding" : "gzip" ,
151
+ })
152
+ })
153
+
154
+ t .Run ("metrics" , func (t * testing.T ) {
155
+ exporter , publisher := newTestExporter (t , withCompression )
156
+
157
+ metrics := pmetric .NewMetrics ()
158
+ metric := metrics .ResourceMetrics ().AppendEmpty ().ScopeMetrics ().AppendEmpty ().Metrics ().AppendEmpty ()
159
+ metric .SetName ("some.metric" )
160
+ metric .SetEmptyGauge ().DataPoints ().AppendEmpty ().SetIntValue (42 )
161
+
162
+ require .NoError (t , exporter .consumeMetrics (context .Background (), metrics ))
163
+ require .Len (t , publisher .requests , 1 )
164
+
165
+ request := publisher .requests [0 ]
166
+ assert .Equal (t , defaultTopic , request .Topic )
167
+ assert .Len (t , request .Messages , 1 )
168
+
169
+ message := request .Messages [0 ]
170
+ assert .NotEmpty (t , message .Data )
171
+ assert .Subset (t , message .Attributes , map [string ]string {
172
+ "ce-type" : "org.opentelemetry.otlp.metrics.v1" ,
173
+ "content-type" : "application/protobuf" ,
174
+ "content-encoding" : "gzip" ,
175
+ })
176
+ })
177
+
178
+ t .Run ("traces" , func (t * testing.T ) {
179
+ exporter , publisher := newTestExporter (t , withCompression )
180
+
181
+ traces := ptrace .NewTraces ()
182
+ span := traces .ResourceSpans ().AppendEmpty ().ScopeSpans ().AppendEmpty ().Spans ().AppendEmpty ()
183
+ span .SetName ("some span" )
184
+
185
+ require .NoError (t , exporter .consumeTraces (context .Background (), traces ))
186
+ require .Len (t , publisher .requests , 1 )
187
+
188
+ request := publisher .requests [0 ]
189
+ assert .Equal (t , defaultTopic , request .Topic )
190
+ assert .Len (t , request .Messages , 1 )
191
+
192
+ message := request .Messages [0 ]
193
+ assert .NotEmpty (t , message .Data )
194
+ assert .Subset (t , message .Attributes , map [string ]string {
195
+ "ce-type" : "org.opentelemetry.otlp.traces.v1" ,
196
+ "content-type" : "application/protobuf" ,
197
+ "content-encoding" : "gzip" ,
198
+ })
199
+ })
200
+ }
201
+
202
+ // Helpers
203
+
204
+ func newTestExporter (t * testing.T , options ... func (* Config )) (* pubsubExporter , * mockPublisher ) {
205
+ t .Helper ()
63
206
64
207
factory := NewFactory ()
65
- cfg := factory .CreateDefaultConfig ()
66
- exporterConfig := cfg .(* Config )
67
- exporterConfig .Endpoint = srv .Addr
68
- exporterConfig .UserAgent = "test-user-agent"
69
- exporterConfig .Insecure = true
70
- exporterConfig .ProjectID = "my-project"
71
- exporterConfig .Topic = "projects/my-project/topics/otlp"
72
- exporterConfig .TimeoutSettings = exporterhelper.TimeoutConfig {
73
- Timeout : 12 * time .Second ,
208
+ cfg := factory .CreateDefaultConfig ().(* Config )
209
+ cfg .ProjectID = defaultProjectID
210
+ cfg .Topic = defaultTopic
211
+ for _ , option := range options {
212
+ option (cfg )
74
213
}
75
- exporterConfig .Compression = "gzip"
76
- exporter := ensureExporter (exportertest .NewNopSettings (), exporterConfig )
77
- assert .NoError (t , exporter .start (ctx , nil ))
78
- assert .NoError (t , exporter .consumeTraces (ctx , ptrace .NewTraces ()))
79
- assert .NoError (t , exporter .consumeMetrics (ctx , pmetric .NewMetrics ()))
80
- assert .NoError (t , exporter .consumeLogs (ctx , plog .NewLogs ()))
81
- assert .NoError (t , exporter .shutdown (ctx ))
214
+ require .NoError (t , cfg .Validate ())
215
+
216
+ exporter := ensureExporter (exportertest .NewNopSettings (), cfg )
217
+ publisher := & mockPublisher {}
218
+ exporter .makeClient = func (context.Context , * Config , string ) (publisherClient , error ) {
219
+ return publisher , nil
220
+ }
221
+ exporter .makeUUID = func () (uuid.UUID , error ) {
222
+ return uuid .Parse (defaultUUID )
223
+ }
224
+
225
+ require .NoError (t , exporter .start (context .Background (), componenttest .NewNopHost ()))
226
+ t .Cleanup (func () { assert .NoError (t , exporter .shutdown (context .Background ())) })
227
+
228
+ return exporter , publisher
229
+ }
230
+
231
+ type mockPublisher struct {
232
+ requests []* pb.PublishRequest
233
+ }
234
+
235
+ func (m * mockPublisher ) Publish (_ context.Context , request * pb.PublishRequest , _ ... gax.CallOption ) (* pb.PublishResponse , error ) {
236
+ m .requests = append (m .requests , request )
237
+ return & pb.PublishResponse {}, nil
238
+ }
239
+
240
+ func (m * mockPublisher ) Close () error {
241
+ return nil
82
242
}
0 commit comments