Skip to content

Commit 3f9d9d7

Browse files
author
Ignacio Bonafonte
authored
Merge pull request #273 from nachoBonafonte/Implement-ForceFlush-spec
Implement ForceFlush spec adding a timeout parameter. This parameter takes effect only in BatchSpanProcessor built-in processors
2 parents bc01286 + 80b6cf4 commit 3f9d9d7

File tree

8 files changed

+36
-23
lines changed

8 files changed

+36
-23
lines changed

Sources/Instrumentation/SignPostIntegration/SignPostIntegration.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,5 @@ public class SignPostIntegration: SpanProcessor {
2828
}
2929

3030
public func shutdown() {}
31-
public func forceFlush() {}
31+
public func forceFlush(timeout: TimeInterval? = nil) {}
3232
}

Sources/OpenTelemetrySdk/Trace/SpanProcessor.swift

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ public protocol SpanProcessor {
1818
/// Called when a Span is started, if the Span.isRecording is true.
1919
/// This method is called synchronously on the execution thread, should not throw or block the
2020
/// execution thread.
21+
/// - Parameter parentContext: the context of the span parent, if exists
2122
/// - Parameter span: the ReadableSpan that just started
2223
func onStart(parentContext: SpanContext?, span: ReadableSpan)
2324

@@ -33,5 +34,13 @@ public protocol SpanProcessor {
3334

3435
/// Processes all span events that have not yet been processed.
3536
/// This method is executed synchronously on the calling thread
36-
func forceFlush()
37+
/// - Parameter timeout: Maximum time the flush complete or abort. If nil, it will wait indefinitely
38+
func forceFlush(timeout: TimeInterval?)
3739
}
40+
41+
extension SpanProcessor {
42+
func forceFlush() {
43+
return forceFlush(timeout: nil)
44+
}
45+
}
46+

Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public struct BatchSpanProcessor: SpanProcessor {
1818
fileprivate var worker: BatchWorker
1919

2020
public init(spanExporter: SpanExporter, scheduleDelay: TimeInterval = 5, exportTimeout: TimeInterval = 30,
21-
maxQueueSize: Int = 2048, maxExportBatchSize: Int = 512, willExportCallback:((inout [SpanData]) -> Void)? = nil)
21+
maxQueueSize: Int = 2048, maxExportBatchSize: Int = 512, willExportCallback: ((inout [SpanData]) -> Void)? = nil)
2222
{
2323
worker = BatchWorker(spanExporter: spanExporter,
2424
scheduleDelay: scheduleDelay,
@@ -46,8 +46,8 @@ public struct BatchSpanProcessor: SpanProcessor {
4646
worker.shutdown()
4747
}
4848

49-
public func forceFlush() {
50-
worker.forceFlush()
49+
public func forceFlush(timeout: TimeInterval? = nil) {
50+
worker.forceFlush(explicitTimeout: timeout)
5151
}
5252
}
5353

@@ -60,13 +60,13 @@ private class BatchWorker: Thread {
6060
let maxQueueSize: Int
6161
let exportTimeout: TimeInterval
6262
let maxExportBatchSize: Int
63-
let willExportCallback:((inout [SpanData]) -> Void)?
63+
let willExportCallback: ((inout [SpanData]) -> Void)?
6464
let halfMaxQueueSize: Int
6565
private let cond = NSCondition()
6666
var spanList = [ReadableSpan]()
6767
var queue: OperationQueue
6868

69-
init(spanExporter: SpanExporter, scheduleDelay: TimeInterval, exportTimeout:TimeInterval, maxQueueSize: Int, maxExportBatchSize: Int, willExportCallback:((inout [SpanData]) -> Void)?) {
69+
init(spanExporter: SpanExporter, scheduleDelay: TimeInterval, exportTimeout: TimeInterval, maxQueueSize: Int, maxExportBatchSize: Int, willExportCallback: ((inout [SpanData]) -> Void)?) {
7070
self.spanExporter = spanExporter
7171
self.scheduleDelay = scheduleDelay
7272
self.exportTimeout = exportTimeout
@@ -109,33 +109,34 @@ private class BatchWorker: Thread {
109109
spansCopy = spanList
110110
spanList.removeAll()
111111
cond.unlock()
112-
self.exportBatch(spanList: spansCopy)
112+
self.exportBatch(spanList: spansCopy, explicitTimeout: nil)
113113
}
114114
} while true
115115
}
116116

117117
func shutdown() {
118-
forceFlush()
118+
forceFlush(explicitTimeout: nil)
119119
spanExporter.shutdown()
120120
}
121121

122-
public func forceFlush() {
122+
public func forceFlush(explicitTimeout: TimeInterval?) {
123123
var spansCopy: [ReadableSpan]
124124
cond.lock()
125125
spansCopy = spanList
126126
spanList.removeAll()
127127
cond.unlock()
128128
// Execute the batch export outside the synchronized to not block all producers.
129-
exportBatch(spanList: spansCopy)
129+
exportBatch(spanList: spansCopy, explicitTimeout: explicitTimeout)
130130
}
131131

132-
private func exportBatch(spanList: [ReadableSpan]) {
132+
private func exportBatch(spanList: [ReadableSpan], explicitTimeout: TimeInterval?) {
133133
let exportOperation = BlockOperation { [weak self] in
134134
self?.exportAction(spanList: spanList)
135135
}
136136
let timeoutTimer = DispatchSource.makeTimerSource(queue: DispatchQueue.global())
137-
timeoutTimer.setEventHandler{ exportOperation.cancel() }
138-
timeoutTimer.schedule(deadline: .now() + .milliseconds(Int(exportTimeout.toMilliseconds)), leeway: .milliseconds(1))
137+
timeoutTimer.setEventHandler { exportOperation.cancel() }
138+
let maxTimeOut = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, exportTimeout)
139+
timeoutTimer.schedule(deadline: .now() + .milliseconds(Int(maxTimeOut.toMilliseconds)), leeway: .milliseconds(1))
139140
timeoutTimer.activate()
140141
queue.addOperation(exportOperation)
141142
queue.waitUntilAllOperationsAreFinished()

Sources/OpenTelemetrySdk/Trace/SpanProcessors/MultiSpanProcessor.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ public struct MultiSpanProcessor: SpanProcessor {
5151
}
5252
}
5353

54-
public func forceFlush() {
54+
public func forceFlush(timeout: TimeInterval? = nil) {
5555
spanProcessorsAll.forEach {
56-
$0.forceFlush()
56+
$0.forceFlush(timeout: timeout)
5757
}
5858
}
5959
}

Sources/OpenTelemetrySdk/Trace/SpanProcessors/NoopSpanProcessor.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,5 @@ struct NoopSpanProcessor: SpanProcessor {
1818

1919
func shutdown() {}
2020

21-
func forceFlush() {}
21+
func forceFlush(timeout: TimeInterval? = nil) {}
2222
}

Sources/OpenTelemetrySdk/Trace/SpanProcessors/SimpleSpanProcessor.swift

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@
66
import Foundation
77
import OpenTelemetryApi
88

9-
/// An implementation of the SpanProcessor that converts the ReadableSpan SpanData
10-
/// and passes it to the configured exporter.
9+
/// A really simple implementation of the SpanProcessor that converts the ReadableSpan SpanData
10+
/// and passes it to the configured exporter.
11+
/// For production environment BatchSpanProcessor is configurable and is preferred.
1112
public struct SimpleSpanProcessor: SpanProcessor {
1213
private let spanExporter: SpanExporter
1314
private var sampled: Bool = true
@@ -35,7 +36,9 @@ public struct SimpleSpanProcessor: SpanProcessor {
3536
}
3637
}
3738

38-
public func forceFlush() {
39+
/// Forces the processing of the remaining spans
40+
/// - Parameter timeout: unused in this processor
41+
public func forceFlush(timeout: TimeInterval? = nil) {
3942
processorQueue.sync {
4043
_ = spanExporter.flush()
4144
}

Sources/OpenTelemetrySdk/Trace/TracerProviderSdk.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ public class TracerProviderSdk: TracerProvider {
140140
}
141141

142142
/// Requests the active span processor to process all span events that have not yet been processed.
143-
public func forceFlush() {
144-
sharedState.activeSpanProcessor.forceFlush()
143+
public func forceFlush(timeout: TimeInterval? = nil) {
144+
sharedState.activeSpanProcessor.forceFlush(timeout: timeout)
145145
}
146146
}

Tests/OpenTelemetrySdkTests/Trace/Mocks/SpanProcessorMock.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class SpanProcessorMock: SpanProcessor {
3636
shutdownCalledTimes += 1
3737
}
3838

39-
func forceFlush() {
39+
func forceFlush(timeout: TimeInterval? = nil) {
4040
forceFlushCalledTimes += 1
4141
}
4242
}

0 commit comments

Comments
 (0)