Skip to content

Commit 7cabfc1

Browse files
author
Ignacio Bonafonte
authored
Merge pull request #199 from bryce-b/bryce/metric-processor-improvements
This PR fixes a retention loop between MetricProviderSdk & PushMetricController, and also fixes the resulting crash by using a DispatchSourceTimer to clean up the dispatched thread when the PushMetricController is released. fixes #198
2 parents 0bcf4da + f2ec48e commit 7cabfc1

File tree

4 files changed

+46
-31
lines changed

4 files changed

+46
-31
lines changed

Sources/OpenTelemetrySdk/Metrics/Export/PushMetricController.swift

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,35 +6,46 @@ import Foundation
66

77
class PushMetricController {
88
var meterSharedState: MeterSharedState
9-
var meterProvider: MeterProviderSdk
9+
weak var meterProvider: MeterProviderSdk?
1010

1111
let pushMetricQueue = DispatchQueue(label: "org.opentelemetry.PushMetricController.pushMetricQueue")
12+
let metricPushTimer: DispatchSourceTimer
1213

1314
init(meterProvider: MeterProviderSdk, meterSharedState: MeterSharedState, shouldCancel: (() -> Bool)? = nil) {
1415
self.meterProvider = meterProvider
1516
self.meterSharedState = meterSharedState
16-
pushMetricQueue.asyncAfter(deadline: .now() + meterSharedState.metricPushInterval) { [weak self] in
17-
guard let self = self else {
18-
return
19-
}
20-
while !(shouldCancel?() ?? false) {
21-
autoreleasepool {
22-
let start = Date()
23-
let values = self.meterProvider.getMeters().values
24-
values.forEach {
25-
$0.collect()
26-
}
17+
metricPushTimer = DispatchSource.makeTimerSource(flags: DispatchSource.TimerFlags(), queue: pushMetricQueue)
18+
metricPushTimer.setEventHandler { [weak self] in
19+
autoreleasepool {
20+
guard let self = self,
21+
let meterProvider = self.meterProvider else {
22+
return
23+
}
24+
if shouldCancel?() ?? false {
25+
self.metricPushTimer.cancel()
26+
return
27+
}
28+
let start = Date()
29+
let values = meterProvider.getMeters().values
30+
values.forEach {
31+
$0.collect()
32+
}
2733

28-
let metricToExport = self.meterSharedState.metricProcessor.finishCollectionCycle()
34+
let metricToExport = self.meterSharedState.metricProcessor.finishCollectionCycle()
2935

30-
_ = meterSharedState.metricExporter.export(metrics: metricToExport, shouldCancel: shouldCancel)
31-
let timeInterval = Date().timeIntervalSince(start)
32-
let remainingWait = meterSharedState.metricPushInterval - timeInterval
33-
if remainingWait > 0 {
34-
usleep(UInt32(remainingWait * 1000000))
35-
}
36-
}
36+
_ = meterSharedState.metricExporter.export(metrics: metricToExport, shouldCancel: shouldCancel)
3737
}
3838
}
39+
40+
metricPushTimer.schedule(deadline: .now() + meterSharedState.metricPushInterval, repeating: meterSharedState.metricPushInterval)
41+
metricPushTimer.activate()
42+
}
43+
44+
deinit {
45+
metricPushTimer.suspend() // suspending the timer prior to checking `isCancelled()` prevents a race condition between the check and actually calling `cancel()`
46+
if !metricPushTimer.isCancelled {
47+
metricPushTimer.cancel()
48+
metricPushTimer.resume()
49+
}
3950
}
4051
}

Sources/OpenTelemetrySdk/Metrics/MeterProviderSdk.swift

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,15 @@ public class MeterProviderSdk: MeterProvider {
2626
metricPushInterval: TimeInterval = MeterProviderSdk.defaultPushInterval,
2727
resource: Resource = EnvVarResource.resource)
2828
{
29-
self.meterSharedState = MeterSharedState(metricProcessor: metricProcessor, metricPushInterval: metricPushInterval, metricExporter: metricExporter, resource: resource)
29+
meterSharedState = MeterSharedState(metricProcessor: metricProcessor, metricPushInterval: metricPushInterval, metricExporter: metricExporter, resource: resource)
3030

31-
defaultMeter = MeterSdk(meterSharedState: self.meterSharedState, instrumentationLibraryInfo: InstrumentationLibraryInfo())
31+
defaultMeter = MeterSdk(meterSharedState: meterSharedState, instrumentationLibraryInfo: InstrumentationLibraryInfo())
3232

3333
pushMetricController = PushMetricController(
3434
meterProvider: self,
35-
meterSharedState: self.meterSharedState) {
36-
false
35+
meterSharedState: meterSharedState
36+
) {
37+
false
3738
}
3839
}
3940

@@ -49,7 +50,7 @@ public class MeterProviderSdk: MeterProvider {
4950
let instrumentationLibraryInfo = InstrumentationLibraryInfo(name: instrumentationName, version: instrumentationVersion)
5051
var meter: MeterSdk! = meterRegistry[instrumentationLibraryInfo]
5152
if meter == nil {
52-
meter = MeterSdk(meterSharedState: self.meterSharedState, instrumentationLibraryInfo: instrumentationLibraryInfo)
53+
meter = MeterSdk(meterSharedState: meterSharedState, instrumentationLibraryInfo: instrumentationLibraryInfo)
5354
meterRegistry[instrumentationLibraryInfo] = meter!
5455
}
5556
return meter!

Tests/ExportersTests/DatadogExporter/DatadogExporterTests.swift

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,10 +120,11 @@ class DatadogExporterTests: XCTestCase {
120120

121121
let datadogExporter = try! DatadogExporter(config: exporterConfiguration)
122122

123+
let provider = MeterProviderSdk(metricProcessor: UngroupedBatcher(),
124+
metricExporter: datadogExporter,
125+
metricPushInterval: 0.1)
123126

124-
let meter = MeterProviderSdk(metricProcessor: UngroupedBatcher(),
125-
metricExporter: datadogExporter,
126-
metricPushInterval: 0.1).get(instrumentationName: "MyMeter")
127+
let meter = provider.get(instrumentationName: "MyMeter")
127128

128129
let testCounter = meter.createIntCounter(name: "MyCounter")
129130

Tests/ExportersTests/Prometheus/PrometheusExporterTests.swift

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ class PrometheusExporterTests: XCTestCase {
1212
let metricPushIntervalSec = 0.05
1313
let waitDuration = 0.1 + 0.1
1414

15+
1516
func testMetricsHttpServerAsync() {
1617
let promOptions = PrometheusExporterOptions(url: "http://localhost:9184/metrics/")
1718
let promExporter = PrometheusExporter(options: promOptions)
@@ -29,7 +30,7 @@ class PrometheusExporterTests: XCTestCase {
2930
}
3031
}
3132

32-
collectMetrics(simpleProcessor: simpleProcessor, exporter: promExporter)
33+
let retain_me = collectMetrics(simpleProcessor: simpleProcessor, exporter: promExporter)
3334
usleep(useconds_t(waitDuration * 1000000))
3435
let url = URL(string: "http://localhost:9184/metrics/")!
3536
let task = URLSession.shared.dataTask(with: url) { data, response, error in
@@ -59,10 +60,10 @@ class PrometheusExporterTests: XCTestCase {
5960
metricsHttpServer.stop()
6061
}
6162

62-
private func collectMetrics(simpleProcessor: UngroupedBatcher, exporter: MetricExporter) {
63+
private func collectMetrics(simpleProcessor: UngroupedBatcher, exporter: MetricExporter) -> MeterProviderSdk {
6364

6465
let meterProvider = MeterProviderSdk(metricProcessor: simpleProcessor, metricExporter: exporter, metricPushInterval: metricPushIntervalSec)
65-
66+
6667
let meter = meterProvider.get(instrumentationName: "library1")
6768

6869
let testCounter = meter.createIntCounter(name: "testCounter")
@@ -81,6 +82,7 @@ class PrometheusExporterTests: XCTestCase {
8182
testMeasure.record(value: 5, labelset: meter.getLabelSet(labels: labels1))
8283
testMeasure.record(value: 500, labelset: meter.getLabelSet(labels: labels1))
8384
}
85+
return meterProvider
8486
}
8587

8688
private func validateResponse(responseText: String) {

0 commit comments

Comments
 (0)