@@ -5,12 +5,8 @@ package stefexporter // import "github.com/open-telemetry/opentelemetry-collecto
5
5
6
6
import (
7
7
"context"
8
- "fmt"
9
- "sync"
8
+ "time"
10
9
11
- stefgrpc "github.com/splunk/stef/go/grpc"
12
- "github.com/splunk/stef/go/grpc/stef_proto"
13
- "github.com/splunk/stef/go/otel/oteltef"
14
10
stefpdatametrics "github.com/splunk/stef/go/pdata/metrics"
15
11
stefpkg "github.com/splunk/stef/go/pkg"
16
12
"go.opentelemetry.io/collector/component"
@@ -36,42 +32,26 @@ type stefExporter struct {
36
32
set component.TelemetrySettings
37
33
cfg Config
38
34
compression stefpkg.Compression
35
+ started bool
39
36
40
- // connMutex is taken when connecting, disconnecting or checking connection status.
41
- connMutex sync.Mutex
42
- isConnected bool
43
- connID uint64
44
- grpcConn * grpc.ClientConn
45
- client * stefgrpc.Client
46
- connCancel context.CancelFunc
47
-
48
- // The STEF writer we write metrics to and which in turns sends them over gRPC.
49
- stefWriter * oteltef.MetricsWriter
50
- stefWriterMutex sync.Mutex // protects stefWriter
51
-
52
- // lastAckID is the maximum ack ID received so far.
53
- lastAckID uint64
54
- // Cond to protect and signal lastAckID.
55
- ackCond * internal.CancellableCond
56
- }
37
+ grpcConn * grpc.ClientConn
57
38
58
- type loggerWrapper struct {
59
- logger * zap. Logger
39
+ connMan * internal. ConnManager
40
+ sync2Async * internal. Sync2Async
60
41
}
61
42
62
- func (w * loggerWrapper ) Debugf (_ context.Context , format string , v ... any ) {
63
- w .logger .Debug (fmt .Sprintf (format , v ... ))
64
- }
43
+ const (
44
+ flushPeriod = 100 * time .Millisecond
45
+ reconnectPeriod = 10 * time .Minute
46
+ )
65
47
66
- func (w * loggerWrapper ) Errorf (_ context.Context , format string , v ... any ) {
67
- w .logger .Error (fmt .Sprintf (format , v ... ))
68
- }
48
+ // TODO: make connection count configurable.
49
+ const connCount = 1
69
50
70
51
func newStefExporter (set component.TelemetrySettings , cfg * Config ) * stefExporter {
71
52
exp := & stefExporter {
72
- set : set ,
73
- cfg : * cfg ,
74
- ackCond : internal .NewCancellableCond (),
53
+ set : set ,
54
+ cfg : * cfg ,
75
55
}
76
56
77
57
exp .compression = stefpkg .CompressionNone
@@ -91,155 +71,89 @@ func (s *stefExporter) Start(ctx context.Context, host component.Host) error {
91
71
return err
92
72
}
93
73
94
- // No need to block Start(), we will begin connection attempt in a goroutine.
95
- go func () {
96
- if err := s .ensureConnected (ctx ); err != nil {
97
- s .set .Logger .Error ("Error connecting to destination" , zap .Error (err ))
98
- // This is not a fatal error. exportMetrics() will try to connect again as needed.
99
- }
100
- }()
101
- return nil
102
- }
103
-
104
- func (s * stefExporter ) Shutdown (ctx context.Context ) error {
105
- s .disconnect (ctx )
106
- if s .grpcConn != nil {
107
- if err := s .grpcConn .Close (); err != nil {
108
- s .set .Logger .Error ("Failed to close grpc connection" , zap .Error (err ))
109
- }
110
- s .grpcConn = nil
111
- }
112
- return nil
113
- }
114
-
115
- func (s * stefExporter ) ensureConnected (ctx context.Context ) error {
116
- s .connMutex .Lock ()
117
- defer s .connMutex .Unlock ()
118
-
119
- if s .isConnected {
120
- return nil
74
+ // Create a connection creator and manager to take care of the connections.
75
+ connCreator := internal .NewStefConnCreator (s .set .Logger , s .grpcConn , s .compression )
76
+ set := internal.ConnManagerSettings {
77
+ Logger : s .set .Logger ,
78
+ Creator : connCreator ,
79
+ TargetConnCount : connCount ,
80
+ FlushPeriod : flushPeriod ,
81
+ ReconnectPeriod : reconnectPeriod ,
121
82
}
122
-
123
- s .set .Logger .Debug ("Connecting to destination" , zap .String ("endpoint" , s .cfg .Endpoint ))
124
-
125
- s .ackCond .Cond .L .Lock ()
126
- // Reset lastAckID. New STEF stream ack IDs will start from 1.
127
- s .lastAckID = 0
128
- // Increment connection ID, to make sure we don't confuse the new and
129
- // previous (stale) connections.
130
- s .connID ++
131
- connID := s .connID
132
- s .ackCond .Cond .L .Unlock ()
133
-
134
- // Prepare to open a STEF/gRPC stream to the server.
135
- grpcClient := stef_proto .NewSTEFDestinationClient (s .grpcConn )
136
-
137
- // Let server know about our schema.
138
- schema , err := oteltef .MetricsWireSchema ()
83
+ s .connMan , err = internal .NewConnManager (set )
139
84
if err != nil {
140
85
return err
141
86
}
142
87
143
- settings := stefgrpc.ClientSettings {
144
- Logger : & loggerWrapper {s .set .Logger },
145
- GrpcClient : grpcClient ,
146
- ClientSchema : & schema ,
147
- Callbacks : stefgrpc.ClientCallbacks {
148
- OnAck : func (ackId uint64 ) error { return s .onGrpcAck (connID , ackId ) },
149
- },
150
- }
151
- s .client = stefgrpc .NewClient (settings )
152
-
153
- s .connCancel = nil
154
- connCtx , connCancel := context .WithCancel (context .Background ())
88
+ s .connMan .Start ()
155
89
156
- connectionAttemptDone := make ( chan struct {})
157
- defer close ( connectionAttemptDone )
90
+ // Wrap async implementation of sendMetricsAsync into a sync-callable API.
91
+ s . sync2Async = internal . NewSync2Async ( s . set . Logger , s . cfg . QueueConfig . NumConsumers , s . sendMetricsAsync )
158
92
159
- // Start a goroutine that waits for success, failure or cancellation of
160
- // the connection attempt.
93
+ // Begin connection attempt in a goroutine to avoid blocking Start().
161
94
go func () {
162
- // Wait for either connection attempt to be done or for the caller
163
- // of ensureConnected() to give up.
164
- select {
165
- case <- ctx .Done ():
166
- // The caller of ensureConnected() cancelled while we are waiting
167
- // for connection to be established. We have to cancel the
168
- // connection attempt (and the whole connection if it raced us and
169
- // managed to connect - we will reconnect later again in that case).
170
- s .set .Logger .Debug ("Canceling connection context because ensureConnected() caller cancelled." )
171
- connCancel ()
172
- case <- connectionAttemptDone :
173
- // Connection attempt finished (successfully or no). No need to wait for the
174
- // previous case, calling connCancel() is not needed anymore now. It will be
175
- // called later, when disconnecting.
176
- // From this moment we are essentially detaching from the Context
177
- // that passed to ensureConnected() since we wanted to honor it only
178
- // for the duration of the connection attempt, but not for the duration
179
- // of the entire existence of the connection.
95
+ // Acquire() triggers a connection attempt and blocks until it succeeds or fails.
96
+ conn , err := s .connMan .Acquire (ctx )
97
+ if err != nil {
98
+ s .set .Logger .Error ("Error connecting to destination" , zap .Error (err ))
99
+ // This is not a fatal error. Next sending attempt will try to
100
+ // connect again as needed.
101
+ return
180
102
}
103
+ // Connection is established. Return it, this is all we needed for now.
104
+ s .connMan .Release (ctx , conn )
181
105
}()
182
106
183
- grpcWriter , opts , err := s .client .Connect (connCtx )
184
- if err != nil {
185
- connCancel ()
186
- return fmt .Errorf ("failed to connect to destination: %w" , err )
187
- }
188
-
189
- opts .Compression = s .compression
190
-
191
- // Create STEF record writer over gRPC.
192
- s .stefWriter , err = oteltef .NewMetricsWriter (grpcWriter , opts )
193
- if err != nil {
194
- connCancel ()
195
- return err
196
- }
197
-
198
- // From this point on we consider the connection successfully established.
199
- s .isConnected = true
200
-
201
- // We need to call the cancel func when this connection is over so that we don't
202
- // leak the Context we just created. This will be done in disconnect().
203
- s .connCancel = connCancel
204
-
205
- s .set .Logger .Debug ("Connected to destination" , zap .String ("endpoint" , s .cfg .Endpoint ))
206
-
107
+ s .started = true
207
108
return nil
208
109
}
209
110
210
- func (s * stefExporter ) disconnect (ctx context.Context ) {
211
- s .connMutex .Lock ()
212
- defer s .connMutex .Unlock ()
213
-
214
- if ! s .isConnected {
215
- return
111
+ func (s * stefExporter ) Shutdown (ctx context.Context ) error {
112
+ if ! s .started {
113
+ return nil
216
114
}
217
115
218
- if s .connCancel != nil {
219
- s .set .Logger .Debug ("Calling cancel on connection context to avoid leaks" )
220
- s .connCancel ()
221
- s .connCancel = nil
116
+ if err := s .connMan .Stop (ctx ); err != nil {
117
+ s .set .Logger .Error ("Failed to stop connection manager" , zap .Error (err ))
222
118
}
223
119
224
- if err := s .client .Disconnect (ctx ); err != nil {
225
- s .set .Logger .Error ("Failed to disconnect" , zap .Error (err ))
120
+ if s .grpcConn != nil {
121
+ if err := s .grpcConn .Close (); err != nil {
122
+ s .set .Logger .Error ("Failed to close grpc connection" , zap .Error (err ))
123
+ }
124
+ s .grpcConn = nil
226
125
}
227
126
228
- s .set . Logger . Debug ( "Disconnected." )
229
- s . isConnected = false
127
+ s .started = false
128
+ return nil
230
129
}
231
130
232
- func (s * stefExporter ) exportMetrics (ctx context.Context , md pmetric.Metrics ) error {
233
- if err := s .ensureConnected (ctx ); err != nil {
234
- return err
131
+ func (s * stefExporter ) exportMetrics (ctx context.Context , data pmetric.Metrics ) error {
132
+ return s .sync2Async .DoSync (ctx , data )
133
+ }
134
+
135
+ // sendMetricsAsync is an async implementation of sending metric data.
136
+ // The result of sending will be reported via resultChan.
137
+ func (s * stefExporter ) sendMetricsAsync (
138
+ ctx context.Context ,
139
+ data any ,
140
+ resultChan internal.ResultChan ,
141
+ ) (internal.DataID , error ) {
142
+ // Acquire a connection to send the data over.
143
+ conn , err := s .connMan .Acquire (ctx )
144
+ if err != nil {
145
+ return 0 , err
235
146
}
236
147
237
- // stefWriter is not safe for concurrent writing, protect it .
238
- s . stefWriterMutex . Lock ( )
239
- defer s . stefWriterMutex . Unlock ()
148
+ // It must be a StefConn with a Writer .
149
+ stefConn := conn . Conn ().( * internal. StefConn )
150
+ stefWriter := stefConn . Writer ()
240
151
152
+ md := data .(pmetric.Metrics )
153
+
154
+ // Convert and write the data to the Writer.
241
155
converter := stefpdatametrics.OtlpToSTEFUnsorted {}
242
- err : = converter .WriteMetrics (md , s . stefWriter )
156
+ err = converter .WriteMetrics (md , stefWriter )
243
157
if err != nil {
244
158
s .set .Logger .Debug ("WriteMetrics failed" , zap .Error (err ))
245
159
@@ -251,62 +165,27 @@ func (s *stefExporter) exportMetrics(ctx context.Context, md pmetric.Metrics) er
251
165
//
252
166
// We need to reconnect. Disconnect here and the next exportMetrics()
253
167
// call will connect again.
254
- s .disconnect ( ctx )
168
+ s .connMan . DiscardAndClose ( conn )
255
169
256
170
// TODO: check if err is because STEF encoding failed. If so we must not
257
171
// try to re-encode the same data. Return consumererror.NewPermanent(err)
258
172
// to the caller. This requires changes in STEF Go library.
259
173
260
174
// Return an error to retry sending these metrics again next time.
261
- return err
175
+ return 0 , err
262
176
}
263
177
264
178
// According to STEF gRPC spec the destination ack IDs match written record number.
265
179
// When the data we have just written is received by destination it will send us
266
- // back and ack ID that numerically matches the last written record number.
267
- expectedAckID := s .stefWriter .RecordCount ()
268
-
269
- // stefWriter normally buffers written records in memory. Flush() ensures buffered
270
- // data is sent to network. This is necessary so that the server receives it and
271
- // sends an acknowledgement back.
272
- if err = s .stefWriter .Flush (); err != nil {
273
- s .set .Logger .Debug ("Flush failed" , zap .Error (err ))
274
-
275
- // Failure to write the gRPC stream normally means something is
276
- // wrong with the connection. We need to reconnect. Disconnect here
277
- // and the next exportMetrics() call will connect again.
278
- s .disconnect (ctx )
279
-
280
- // Return an error to retry sending these metrics again next time.
281
- return err
282
- }
283
-
284
- // Wait for acknowledgement.
285
- err = s .ackCond .Wait (ctx , func () bool { return s .lastAckID >= expectedAckID })
286
- if err != nil {
287
- return fmt .Errorf ("error waiting for ack ID %d: %w" , expectedAckID , err )
288
- }
180
+ // back an ack ID that numerically matches the last written record number.
181
+ expectedAckID := stefWriter .RecordCount ()
289
182
290
- return nil
291
- }
183
+ // Register to be notified via resultChan when the ack of the
184
+ // written record is received.
185
+ stefConn .OnAck (expectedAckID , resultChan )
292
186
293
- func (s * stefExporter ) onGrpcAck (connID uint64 , ackID uint64 ) error {
294
- s .ackCond .Cond .L .Lock ()
295
- defer s .ackCond .Cond .L .Unlock ()
187
+ // We are done with the connection.
188
+ s .connMan .Release (ctx , conn )
296
189
297
- if s .connID != connID {
298
- // This is an ack from a previous (stale) connection. This can happen
299
- // due to a race if the ack from the old stream arrives after we decided
300
- // to reconnect but the old stream is still functioning. We just need
301
- // to ignore this ack, it is no longer relevant.
302
- return nil
303
- }
304
-
305
- // The IDs are expected to always monotonically increase. Check it anyway in case
306
- // the server misbehaves and violates the expectation.
307
- if s .lastAckID < ackID {
308
- s .lastAckID = ackID
309
- s .ackCond .Cond .Broadcast ()
310
- }
311
- return nil
190
+ return internal .DataID (expectedAckID ), nil
312
191
}
0 commit comments