@@ -5,12 +5,8 @@ package stefexporter // import "github.com/open-telemetry/opentelemetry-collecto

 import (
     "context"
-    "fmt"
-    "sync"
+    "time"

-    stefgrpc "github.com/splunk/stef/go/grpc"
-    "github.com/splunk/stef/go/grpc/stef_proto"
-    "github.com/splunk/stef/go/otel/oteltef"
     stefpdatametrics "github.com/splunk/stef/go/pdata/metrics"
     stefpkg "github.com/splunk/stef/go/pkg"
     "go.opentelemetry.io/collector/component"
@@ -36,42 +32,26 @@ type stefExporter struct {
     set         component.TelemetrySettings
     cfg         Config
     compression stefpkg.Compression
+    started     bool

-    // connMutex is taken when connecting, disconnecting or checking connection status.
-    connMutex   sync.Mutex
-    isConnected bool
-    connID      uint64
-    grpcConn    *grpc.ClientConn
-    client      *stefgrpc.Client
-    connCancel  context.CancelFunc
-
-    // The STEF writer we write metrics to and which in turns sends them over gRPC.
-    stefWriter      *oteltef.MetricsWriter
-    stefWriterMutex sync.Mutex // protects stefWriter
-
-    // lastAckID is the maximum ack ID received so far.
-    lastAckID uint64
-    // Cond to protect and signal lastAckID.
-    ackCond *internal.CancellableCond
-}
+    grpcConn *grpc.ClientConn

-type loggerWrapper struct {
-    logger *zap.Logger
+    connMan    *internal.ConnManager
+    sync2Async *internal.Sync2Async
 }

-func (w *loggerWrapper) Debugf(_ context.Context, format string, v ...any) {
-    w.logger.Debug(fmt.Sprintf(format, v...))
-}
+const (
+    flushPeriod     = 100 * time.Millisecond
+    reconnectPeriod = 10 * time.Minute
+)

-func (w *loggerWrapper) Errorf(_ context.Context, format string, v ...any) {
-    w.logger.Error(fmt.Sprintf(format, v...))
-}
+// TODO: make connection count configurable.
+const connCount = 1

 func newStefExporter(set component.TelemetrySettings, cfg *Config) *stefExporter {
     exp := &stefExporter{
-        set:     set,
-        cfg:     *cfg,
-        ackCond: internal.NewCancellableCond(),
+        set: set,
+        cfg: *cfg,
     }

     exp.compression = stefpkg.CompressionNone
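Note: the new flushPeriod and reconnectPeriod constants are consumed by internal.NewConnManager, whose implementation is not part of this diff. The snippet below is only a rough sketch of what a manager's per-connection loop could do with such values, assuming it flushes the connection on a fixed period and recycles it after a fixed age; the manage and flush names are illustrative, not the actual internal API.

package main

import (
    "context"
    "fmt"
    "time"
)

const (
    flushPeriod     = 100 * time.Millisecond
    reconnectPeriod = 10 * time.Minute
)

// manage flushes a single connection on every flushPeriod tick and returns
// once the connection reaches reconnectPeriod of age (signalling that the
// caller should open a fresh one) or the context is cancelled.
func manage(ctx context.Context, flush func() error) error {
    ticker := time.NewTicker(flushPeriod)
    defer ticker.Stop()
    recycle := time.NewTimer(reconnectPeriod)
    defer recycle.Stop()
    for {
        select {
        case <-ticker.C:
            if err := flush(); err != nil {
                return err // broken stream: caller should reconnect immediately
            }
        case <-recycle.C:
            return nil // reached the reconnect period: recycle the connection
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
    defer cancel()
    err := manage(ctx, func() error { fmt.Println("flush"); return nil })
    fmt.Println(err)
}

Under this reading, flushPeriod bounds how long written records may sit in the writer's buffer before being pushed to the stream, and reconnectPeriod bounds the lifetime of any single stream.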
@@ -86,160 +66,83 @@ func newStefExporter(set component.TelemetrySettings, cfg *Config) *stefExporter
 func (s *stefExporter) Start(ctx context.Context, host component.Host) error {
     // Prepare gRPC connection.
     var err error
-    s.grpcConn, err = s.cfg.ToClientConn(ctx, host, s.set)
+    s.grpcConn, err = s.cfg.ClientConfig.ToClientConn(ctx, host, s.set)
     if err != nil {
         return err
     }

-    // No need to block Start(), we will begin connection attempt in a goroutine.
+    // Create a connection creator and manager to take care of the connections.
+    connCreator := internal.NewStefConnCreator(s.set.Logger, s.grpcConn, s.compression)
+    s.connMan = internal.NewConnManager(s.set.Logger, connCreator, connCount, flushPeriod, reconnectPeriod)
+    s.connMan.Start()
+
+    // Wrap async implementation of sendMetricsAsync into a sync-callable API.
+    s.sync2Async = internal.NewSync2Async(s.set.Logger, s.cfg.QueueConfig.NumConsumers, s.sendMetricsAsync)
+
+    // Begin connection attempt in a goroutine to avoid blocking Start().
     go func() {
-        if err := s.ensureConnected(ctx); err != nil {
+        // Acquire() triggers a connection attempt and blocks until it succeeds or fails.
+        conn, err := s.connMan.Acquire(ctx)
+        if err != nil {
             s.set.Logger.Error("Error connecting to destination", zap.Error(err))
-            // This is not a fatal error. exportMetrics() will try to connect again as needed.
+            // This is not a fatal error. Next sending attempt will try to
+            // connect again as needed.
+            return
         }
+        // Connection is established. Return it, this is all we needed for now.
+        s.connMan.Release(conn)
     }()
-    return nil
-}

-func (s *stefExporter) Shutdown(ctx context.Context) error {
-    s.disconnect(ctx)
-    if s.grpcConn != nil {
-        if err := s.grpcConn.Close(); err != nil {
-            s.set.Logger.Error("Failed to close grpc connection", zap.Error(err))
-        }
-        s.grpcConn = nil
-    }
+    s.started = true
     return nil
 }

-func (s *stefExporter) ensureConnected(ctx context.Context) error {
-    s.connMutex.Lock()
-    defer s.connMutex.Unlock()
-
-    if s.isConnected {
+func (s *stefExporter) Shutdown(ctx context.Context) error {
+    if !s.started {
         return nil
     }

-    s.set.Logger.Debug("Connecting to destination", zap.String("endpoint", s.cfg.Endpoint))
-
-    s.ackCond.Cond.L.Lock()
-    // Reset lastAckID. New STEF stream ack IDs will start from 1.
-    s.lastAckID = 0
-    // Increment connection ID, to make sure we don't confuse the new and
-    // previous (stale) connections.
-    s.connID++
-    connID := s.connID
-    s.ackCond.Cond.L.Unlock()
-
-    // Prepare to open a STEF/gRPC stream to the server.
-    grpcClient := stef_proto.NewSTEFDestinationClient(s.grpcConn)
-
-    // Let server know about our schema.
-    schema, err := oteltef.MetricsWireSchema()
-    if err != nil {
-        return err
+    if err := s.connMan.Stop(ctx); err != nil {
+        s.set.Logger.Error("Failed to stop connection manager", zap.Error(err))
     }

-    settings := stefgrpc.ClientSettings{
-        Logger:       &loggerWrapper{s.set.Logger},
-        GrpcClient:   grpcClient,
-        ClientSchema: &schema,
-        Callbacks: stefgrpc.ClientCallbacks{
-            OnAck: func(ackId uint64) error { return s.onGrpcAck(connID, ackId) },
-        },
-    }
-    s.client = stefgrpc.NewClient(settings)
-
-    s.connCancel = nil
-    connCtx, connCancel := context.WithCancel(context.Background())
-
-    connectionAttemptDone := make(chan struct{})
-    defer close(connectionAttemptDone)
-
-    // Start a goroutine that waits for success, failure or cancellation of
-    // the connection attempt.
-    go func() {
-        // Wait for either connection attempt to be done or for the caller
-        // of ensureConnected() to give up.
-        select {
-        case <-ctx.Done():
-            // The caller of ensureConnected() cancelled while we are waiting
-            // for connection to be established. We have to cancel the
-            // connection attempt (and the whole connection if it raced us and
-            // managed to connect - we will reconnect later again in that case).
-            s.set.Logger.Debug("Canceling connection context because ensureConnected() caller cancelled.")
-            connCancel()
-        case <-connectionAttemptDone:
-            // Connection attempt finished (successfully or no). No need to wait for the
-            // previous case, calling connCancel() is not needed anymore now. It will be
-            // called later, when disconnecting.
-            // From this moment we are essentially detaching from the Context
-            // that passed to ensureConnected() since we wanted to honor it only
-            // for the duration of the connection attempt, but not for the duration
-            // of the entire existence of the connection.
+    if s.grpcConn != nil {
+        if err := s.grpcConn.Close(); err != nil {
+            s.set.Logger.Error("Failed to close grpc connection", zap.Error(err))
         }
-    }()
-
-    grpcWriter, opts, err := s.client.Connect(connCtx)
-    if err != nil {
-        connCancel()
-        return fmt.Errorf("failed to connect to destination: %w", err)
-    }
-
-    opts.Compression = s.compression
-
-    // Create STEF record writer over gRPC.
-    s.stefWriter, err = oteltef.NewMetricsWriter(grpcWriter, opts)
-    if err != nil {
-        connCancel()
-        return err
+        s.grpcConn = nil
     }

-    // From this point on we consider the connection successfully established.
-    s.isConnected = true
-
-    // We need to call the cancel func when this connection is over so that we don't
-    // leak the Context we just created. This will be done in disconnect().
-    s.connCancel = connCancel
-
-    s.set.Logger.Debug("Connected to destination", zap.String("endpoint", s.cfg.Endpoint))
-
+    s.started = false
     return nil
 }

-func (s *stefExporter) disconnect(ctx context.Context) {
-    s.connMutex.Lock()
-    defer s.connMutex.Unlock()
-
-    if !s.isConnected {
-        return
-    }
-
-    if s.connCancel != nil {
-        s.set.Logger.Debug("Calling cancel on connection context to avoid leaks")
-        s.connCancel()
-        s.connCancel = nil
-    }
-
-    if err := s.client.Disconnect(ctx); err != nil {
-        s.set.Logger.Error("Failed to disconnect", zap.Error(err))
-    }
-
-    s.set.Logger.Debug("Disconnected.")
-    s.isConnected = false
+func (s *stefExporter) exportMetrics(ctx context.Context, data pmetric.Metrics) error {
+    return s.sync2Async.DoSync(ctx, data)
 }

-func (s *stefExporter) exportMetrics(ctx context.Context, md pmetric.Metrics) error {
-    if err := s.ensureConnected(ctx); err != nil {
-        return err
+// sendMetricsAsync is an async implementation of sending metric data.
+// The result of sending will be reported via resultChan.
+func (s *stefExporter) sendMetricsAsync(
+    ctx context.Context,
+    data any,
+    resultChan internal.ResultChan,
+) (internal.DataID, error) {
+    // Acquire a connection to send the data over.
+    conn, err := s.connMan.Acquire(ctx)
+    if err != nil {
+        return 0, err
     }

-    // stefWriter is not safe for concurrent writing, protect it.
-    s.stefWriterMutex.Lock()
-    defer s.stefWriterMutex.Unlock()
+    // It must be a StefConn with a Writer.
+    stefConn := conn.Conn().(*internal.StefConn)
+    stefWriter := stefConn.Writer()
+
+    md := data.(pmetric.Metrics)

+    // Convert and write the data to the Writer.
     converter := stefpdatametrics.OtlpToSTEFUnsorted{}
-    err := converter.WriteMetrics(md, s.stefWriter)
+    err = converter.WriteMetrics(md, stefWriter)
     if err != nil {
         s.set.Logger.Debug("WriteMetrics failed", zap.Error(err))

@@ -251,62 +154,27 @@ func (s *stefExporter) exportMetrics(ctx context.Context, md pmetric.Metrics) er
         //
         // We need to reconnect. Disconnect here and the next exportMetrics()
         // call will connect again.
-        s.disconnect(ctx)
+        s.connMan.DiscardAndClose(conn)

         // TODO: check if err is because STEF encoding failed. If so we must not
         // try to re-encode the same data. Return consumererror.NewPermanent(err)
         // to the caller. This requires changes in STEF Go library.

         // Return an error to retry sending these metrics again next time.
-        return err
+        return 0, err
     }

     // According to STEF gRPC spec the destination ack IDs match written record number.
     // When the data we have just written is received by destination it will send us
-    // back and ack ID that numerically matches the last written record number.
-    expectedAckID := s.stefWriter.RecordCount()
-
-    // stefWriter normally buffers written records in memory. Flush() ensures buffered
-    // data is sent to network. This is necessary so that the server receives it and
-    // sends an acknowledgement back.
-    if err = s.stefWriter.Flush(); err != nil {
-        s.set.Logger.Debug("Flush failed", zap.Error(err))
+    // back an ack ID that numerically matches the last written record number.
+    expectedAckID := stefWriter.RecordCount()

-        // Failure to write the gRPC stream normally means something is
-        // wrong with the connection. We need to reconnect. Disconnect here
-        // and the next exportMetrics() call will connect again.
-        s.disconnect(ctx)
+    // Register to be notified via resultChan when the ack of the
+    // written record is received.
+    stefConn.OnAck(expectedAckID, resultChan)

-        // Return an error to retry sending these metrics again next time.
-        return err
-    }
+    // We are done with the connection.
+    s.connMan.Release(conn)

-    // Wait for acknowledgement.
-    err = s.ackCond.Wait(ctx, func() bool { return s.lastAckID >= expectedAckID })
-    if err != nil {
-        return fmt.Errorf("error waiting for ack ID %d: %w", expectedAckID, err)
-    }
-
-    return nil
-}
-
-func (s *stefExporter) onGrpcAck(connID uint64, ackID uint64) error {
-    s.ackCond.Cond.L.Lock()
-    defer s.ackCond.Cond.L.Unlock()
-
-    if s.connID != connID {
-        // This is an ack from a previous (stale) connection. This can happen
-        // due to a race if the ack from the old stream arrives after we decided
-        // to reconnect but the old stream is still functioning. We just need
-        // to ignore this ack, it is no longer relevant.
-        return nil
-    }
-
-    // The IDs are expected to always monotonically increase. Check it anyway in case
-    // the server misbehaves and violates the expectation.
-    if s.lastAckID < ackID {
-        s.lastAckID = ackID
-        s.ackCond.Cond.Broadcast()
-    }
-    return nil
+    return internal.DataID(expectedAckID), nil
 }
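For reference, the new data path is: exportMetrics calls sync2Async.DoSync, which dispatches to sendMetricsAsync and then blocks until the result for the written records is reported back on a result channel (delivered by stefConn.OnAck once the destination acknowledges expectedAckID). The internal.Sync2Async type itself is not shown in this diff, so the following is only a rough sketch of such a sync-over-async bridge; the result, asyncSender and doSync names are illustrative, not the actual internal API.

package main

import (
    "context"
    "errors"
    "fmt"
)

// result carries the outcome of one async send.
type result struct{ err error }

// asyncSender starts sending data and later reports completion on resultChan.
// It mirrors the shape of (*stefExporter).sendMetricsAsync above.
type asyncSender func(ctx context.Context, data any, resultChan chan<- result) (uint64, error)

// doSync turns an asyncSender into a blocking call: start the async send,
// then wait for the acknowledgement or for the caller's context to end.
func doSync(ctx context.Context, send asyncSender, data any) error {
    resultChan := make(chan result, 1)
    if _, err := send(ctx, data, resultChan); err != nil {
        return err // the send could not even be started
    }
    select {
    case r := <-resultChan:
        return r.err // ack (or failure) reported by the destination
    case <-ctx.Done():
        return ctx.Err() // caller gave up waiting for the ack
    }
}

func main() {
    // A toy sender that acknowledges immediately.
    send := func(_ context.Context, data any, resultChan chan<- result) (uint64, error) {
        if data == nil {
            return 0, errors.New("nothing to send")
        }
        resultChan <- result{}
        return 1, nil
    }
    fmt.Println(doSync(context.Background(), send, "metrics batch"))
}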