@@ -17,9 +17,10 @@ import (
17
17
)
18
18
19
19
type batch struct {
20
- ctx context.Context
21
- req request.Request
22
- done multiDone
20
+ ctx context.Context
21
+ req request.Request
22
+ done multiDone
23
+ created time.Time
23
24
}
24
25
25
26
// defaultBatcher continuously batch incoming requests and flushes asynchronously if minimum size limit is met or on timeout.
@@ -30,7 +31,7 @@ type defaultBatcher struct {
30
31
stopWG sync.WaitGroup
31
32
currentBatchMu sync.Mutex
32
33
currentBatch * batch
33
- timer * time.Timer
34
+ ticker * time.Ticker
34
35
shutdownCh chan struct {}
35
36
}
36
37
@@ -52,12 +53,6 @@ func newDefaultBatcher(batchCfg BatchConfig, consumeFunc sender.SendFunc[request
52
53
}
53
54
}
54
55
55
- func (qb * defaultBatcher ) resetTimer () {
56
- if qb .batchCfg .FlushTimeout > 0 {
57
- qb .timer .Reset (qb .batchCfg .FlushTimeout )
58
- }
59
- }
60
-
61
56
func (qb * defaultBatcher ) Consume (ctx context.Context , req request.Request , done Done ) {
62
57
qb .currentBatchMu .Lock ()
63
58
@@ -81,11 +76,11 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
81
76
// Do not flush the last item and add it to the current batch.
82
77
reqList = reqList [:len (reqList )- 1 ]
83
78
qb .currentBatch = & batch {
84
- ctx : ctx ,
85
- req : lastReq ,
86
- done : multiDone {done },
79
+ ctx : ctx ,
80
+ req : lastReq ,
81
+ done : multiDone {done },
82
+ created : time .Now (),
87
83
}
88
- qb .resetTimer ()
89
84
}
90
85
91
86
qb .currentBatchMu .Unlock ()
@@ -136,11 +131,11 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
136
131
// Do not flush the last item and add it to the current batch.
137
132
reqList = reqList [:len (reqList )- 1 ]
138
133
qb .currentBatch = & batch {
139
- ctx : ctx ,
140
- req : lastReq ,
141
- done : multiDone {done },
134
+ ctx : ctx ,
135
+ req : lastReq ,
136
+ done : multiDone {done },
137
+ created : time .Now (),
142
138
}
143
- qb .resetTimer ()
144
139
}
145
140
}
146
141
@@ -162,8 +157,8 @@ func (qb *defaultBatcher) startTimeBasedFlushingGoroutine() {
162
157
select {
163
158
case <- qb .shutdownCh :
164
159
return
165
- case <- qb .timer .C :
166
- qb .flushCurrentBatchIfNecessary ()
160
+ case <- qb .ticker .C :
161
+ qb .flushCurrentBatchIfNecessary (false )
167
162
}
168
163
}
169
164
}()
@@ -172,27 +167,30 @@ func (qb *defaultBatcher) startTimeBasedFlushingGoroutine() {
172
167
// Start starts the goroutine that reads from the queue and flushes asynchronously.
173
168
func (qb * defaultBatcher ) Start (_ context.Context , _ component.Host ) error {
174
169
if qb .batchCfg .FlushTimeout > 0 {
175
- qb .timer = time .NewTimer (qb .batchCfg .FlushTimeout )
170
+ qb .ticker = time .NewTicker (qb .batchCfg .FlushTimeout )
176
171
qb .startTimeBasedFlushingGoroutine ()
177
172
}
178
173
179
174
return nil
180
175
}
181
176
182
177
// flushCurrentBatchIfNecessary sends out the current request batch if it is not nil
183
- func (qb * defaultBatcher ) flushCurrentBatchIfNecessary () {
178
+ func (qb * defaultBatcher ) flushCurrentBatchIfNecessary (forceFlush bool ) {
184
179
qb .currentBatchMu .Lock ()
185
180
if qb .currentBatch == nil {
186
181
qb .currentBatchMu .Unlock ()
187
182
return
188
183
}
184
+ if ! forceFlush && time .Since (qb .currentBatch .created ) < qb .batchCfg .FlushTimeout {
185
+ qb .currentBatchMu .Unlock ()
186
+ return
187
+ }
189
188
batchToFlush := qb .currentBatch
190
189
qb .currentBatch = nil
191
190
qb .currentBatchMu .Unlock ()
192
191
193
192
// flush() blocks until successfully started a goroutine for flushing.
194
193
qb .flush (batchToFlush .ctx , batchToFlush .req , batchToFlush .done )
195
- qb .resetTimer ()
196
194
}
197
195
198
196
// flush starts a goroutine that calls consumeFunc. It blocks until a worker is available if necessary.
@@ -214,7 +212,7 @@ func (qb *defaultBatcher) flush(ctx context.Context, req request.Request, done D
214
212
func (qb * defaultBatcher ) Shutdown (_ context.Context ) error {
215
213
close (qb .shutdownCh )
216
214
// Make sure execute one last flush if necessary.
217
- qb .flushCurrentBatchIfNecessary ()
215
+ qb .flushCurrentBatchIfNecessary (true )
218
216
qb .stopWG .Wait ()
219
217
return nil
220
218
}
0 commit comments