@@ -13,9 +13,14 @@ import (
13
13
"time"
14
14
15
15
"cloud.google.com/go/pubsub/apiv1/pubsubpb"
16
+ "go.opentelemetry.io/collector/receiver"
17
+ "go.opentelemetry.io/otel/attribute"
18
+ "go.opentelemetry.io/otel/metric"
16
19
"go.uber.org/zap"
17
20
"google.golang.org/grpc/codes"
18
21
"google.golang.org/grpc/status"
22
+
23
+ "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal/metadata"
19
24
)
20
25
21
26
// Time to wait before restarting, when the stream stopped
@@ -36,7 +41,8 @@ type StreamHandler struct {
36
41
streamWaitGroup sync.WaitGroup
37
42
// wait group for the handler
38
43
handlerWaitGroup sync.WaitGroup
39
- logger * zap.Logger
44
+ settings receiver.Settings
45
+ telemetryBuilder * metadata.TelemetryBuilder
40
46
// time that acknowledge loop waits before acknowledging messages
41
47
ackBatchWait time.Duration
42
48
@@ -51,19 +57,21 @@ func (handler *StreamHandler) ack(ackID string) {
51
57
52
58
func NewHandler (
53
59
ctx context.Context ,
54
- logger * zap.Logger ,
60
+ settings receiver.Settings ,
61
+ telemetryBuilder * metadata.TelemetryBuilder ,
55
62
client SubscriberClient ,
56
63
clientID string ,
57
64
subscription string ,
58
65
callback func (ctx context.Context , message * pubsubpb.ReceivedMessage ) error ,
59
66
) (* StreamHandler , error ) {
60
67
handler := StreamHandler {
61
- logger : logger ,
62
- client : client ,
63
- clientID : clientID ,
64
- subscription : subscription ,
65
- pushMessage : callback ,
66
- ackBatchWait : 10 * time .Second ,
68
+ settings : settings ,
69
+ telemetryBuilder : telemetryBuilder ,
70
+ client : client ,
71
+ clientID : clientID ,
72
+ subscription : subscription ,
73
+ pushMessage : callback ,
74
+ ackBatchWait : 10 * time .Second ,
67
75
}
68
76
return & handler , handler .initStream (ctx )
69
77
}
@@ -85,6 +93,11 @@ func (handler *StreamHandler) initStream(ctx context.Context) error {
85
93
_ = handler .stream .CloseSend ()
86
94
return err
87
95
}
96
+ handler .telemetryBuilder .ReceiverGooglecloudpubsubStreamRestarts .Add (ctx , 1 ,
97
+ metric .WithAttributes (
98
+ attribute .String ("otelcol.component.kind" , "receiver" ),
99
+ attribute .String ("otelcol.component.id" , handler .settings .ID .String ()),
100
+ ))
88
101
return nil
89
102
}
90
103
@@ -102,7 +115,7 @@ func (handler *StreamHandler) recoverableStream(ctx context.Context) {
102
115
var loopCtx context.Context
103
116
loopCtx , cancel := context .WithCancel (ctx )
104
117
105
- handler .logger . Info ("Starting Streaming Pull" )
118
+ handler .settings . Logger . Debug ("Starting Streaming Pull" )
106
119
handler .streamWaitGroup .Add (2 )
107
120
go handler .requestStream (loopCtx , cancel )
108
121
go handler .responseStream (loopCtx , cancel )
@@ -117,13 +130,13 @@ func (handler *StreamHandler) recoverableStream(ctx context.Context) {
117
130
if handler .isRunning .Load () {
118
131
err := handler .initStream (ctx )
119
132
if err != nil {
120
- handler .logger .Error ("Failed to recovery stream." )
133
+ handler .settings . Logger .Error ("Failed to recovery stream." )
121
134
}
122
135
}
123
- handler .logger . Warn ("End of recovery loop, restarting." )
136
+ handler .settings . Logger . Debug ("End of recovery loop, restarting." )
124
137
time .Sleep (streamRecoveryBackoffPeriod )
125
138
}
126
- handler .logger .Warn ("Shutting down recovery loop." )
139
+ handler .settings . Logger .Warn ("Shutting down recovery loop." )
127
140
handler .handlerWaitGroup .Done ()
128
141
}
129
142
@@ -157,15 +170,15 @@ func (handler *StreamHandler) requestStream(ctx context.Context, cancel context.
157
170
for {
158
171
if err := handler .acknowledgeMessages (); err != nil {
159
172
if errors .Is (err , io .EOF ) {
160
- handler .logger .Warn ("EOF reached" )
173
+ handler .settings . Logger .Warn ("EOF reached" )
161
174
break
162
175
}
163
- handler .logger .Error (fmt .Sprintf ("Failed in acknowledge messages with error %v" , err ))
176
+ handler .settings . Logger .Error (fmt .Sprintf ("Failed in acknowledge messages with error %v" , err ))
164
177
break
165
178
}
166
179
select {
167
180
case <- ctx .Done ():
168
- handler .logger . Warn ("requestStream <-ctx.Done()" )
181
+ handler .settings . Logger . Debug ("requestStream <-ctx.Done()" )
169
182
case <- timer .C :
170
183
timer .Reset (handler .ackBatchWait )
171
184
}
@@ -176,7 +189,7 @@ func (handler *StreamHandler) requestStream(ctx context.Context, cancel context.
176
189
}
177
190
}
178
191
cancel ()
179
- handler .logger . Warn ("Request Stream loop ended." )
192
+ handler .settings . Logger . Debug ("Request Stream loop ended." )
180
193
_ = handler .stream .CloseSend ()
181
194
handler .streamWaitGroup .Done ()
182
195
}
@@ -202,30 +215,30 @@ func (handler *StreamHandler) responseStream(ctx context.Context, cancel context
202
215
case errors .Is (err , io .EOF ):
203
216
activeStreaming = false
204
217
case ! grpcStatus :
205
- handler .logger .Warn ("response stream breaking on error" ,
218
+ handler .settings . Logger .Warn ("response stream breaking on error" ,
206
219
zap .Error (err ))
207
220
activeStreaming = false
208
221
case s .Code () == codes .Unavailable :
209
- handler .logger . Info ("response stream breaking on gRPC s 'Unavailable'" )
222
+ handler .settings . Logger . Debug ("response stream breaking on gRPC s 'Unavailable'" )
210
223
activeStreaming = false
211
224
case s .Code () == codes .NotFound :
212
- handler .logger .Error ("resource doesn't exist, wait 60 seconds, and restarting stream" )
225
+ handler .settings . Logger .Error ("resource doesn't exist, wait 60 seconds, and restarting stream" )
213
226
time .Sleep (time .Second * 60 )
214
227
activeStreaming = false
215
228
default :
216
- handler .logger .Warn ("response stream breaking on gRPC s " + s .Message (),
229
+ handler .settings . Logger .Warn ("response stream breaking on gRPC s " + s .Message (),
217
230
zap .String ("s" , s .Message ()),
218
231
zap .Error (err ))
219
232
activeStreaming = false
220
233
}
221
234
}
222
235
if errors .Is (ctx .Err (), context .Canceled ) {
223
236
// Canceling the loop, collector is probably stopping
224
- handler .logger .Warn ("response stream ctx.Err() == context.Canceled" )
237
+ handler .settings . Logger .Warn ("response stream ctx.Err() == context.Canceled" )
225
238
break
226
239
}
227
240
}
228
241
cancel ()
229
- handler .logger . Warn ("Response Stream loop ended." )
242
+ handler .settings . Logger . Debug ("Response Stream loop ended." )
230
243
handler .streamWaitGroup .Done ()
231
244
}
0 commit comments