Commit e8aeafa

prometheus: scrapers are not stopped when receiver is shutdown (#3450)
During receiver shutdown, discovery is stopped. However, any previously discovered targets will continue to be scraped. This calls `Stop` on the scrape manager so that scraping is also stopped on shutdown.

**Issue:** https://github.com/signalfx/splunk-otel-collector/issues/471

**Testing:**
- added a check to the Prometheus end-to-end test that scraping is actually stopped
- tested end to end with receivercreator; scraping actually stopped after the pod was deleted
1 parent 329285d commit e8aeafa

5 files changed: +55 −12 lines changed
receiver/prometheusreceiver/internal/metadata.go

Lines changed: 19 additions & 1 deletion

@@ -16,6 +16,7 @@ package internal
 
 import (
 	"errors"
+	"sync"
 
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/pkg/labels"
@@ -33,10 +34,27 @@ type ScrapeManager interface {
 }
 
 type metadataService struct {
-	sm ScrapeManager
+	sync.Mutex
+	stopped bool
+	sm      ScrapeManager
+}
+
+func (s *metadataService) Close() {
+	s.Lock()
+	s.stopped = true
+	s.Unlock()
 }
 
 func (s *metadataService) Get(job, instance string) (MetadataCache, error) {
+	s.Lock()
+	defer s.Unlock()
+
+	// If we're already stopped return early so that we don't call scrapeManager.TargetsAll()
+	// which will result in deadlock if scrapeManager is being stopped.
+	if s.stopped {
+		return nil, errAlreadyStopped
+	}
+
 	targetGroup, ok := s.sm.TargetsAll()[job]
 	if !ok {
 		return nil, errors.New("unable to find a target group with job=" + job)
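
The guard added here is a plain mutex-protected flag: once Close has run, Get refuses to call into the scrape manager. A minimal standalone sketch of the same pattern, with hypothetical names (service, Get) standing in for metadataService and its TargetsAll lookup:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// service is a hypothetical stand-in for metadataService.
type service struct {
	sync.Mutex
	stopped bool
}

// Close marks the service stopped so later lookups fail fast instead of
// calling into a dependency that may itself be shutting down.
func (s *service) Close() {
	s.Lock()
	s.stopped = true
	s.Unlock()
}

func (s *service) Get() (string, error) {
	s.Lock()
	defer s.Unlock()
	if s.stopped {
		return "", errors.New("already stopped") // plays the role of errAlreadyStopped
	}
	return "metadata", nil // the real code would consult the scrape manager here
}

func main() {
	s := &service{}
	fmt.Println(s.Get()) // metadata <nil>
	s.Close()
	fmt.Println(s.Get()) // "" already stopped
}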

receiver/prometheusreceiver/internal/ocastore.go

Lines changed: 5 additions & 3 deletions

@@ -106,9 +106,11 @@ func (o *OcaStore) Appender(context.Context) storage.Appender {
 	return noop
 }
 
-func (o *OcaStore) Close() error {
-	atomic.CompareAndSwapInt32(&o.running, runningStateReady, runningStateStop)
-	return nil
+// Close OcaStore as well as the internal metadataService.
+func (o *OcaStore) Close() {
+	if atomic.CompareAndSwapInt32(&o.running, runningStateReady, runningStateStop) {
+		o.mc.Close()
+	}
 }
 
 // noopAppender, always return error on any operations
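
Because the inner close only runs when the compare-and-swap wins, the metadata service is closed exactly once, on the first Ready→Stop transition; repeated Close calls become no-ops. A small sketch of that idempotent-close pattern, using hypothetical state constants rather than the receiver's real ones:

package main

import (
	"fmt"
	"sync/atomic"
)

// Hypothetical state values; the receiver defines its own running-state constants.
const (
	stateReady int32 = iota
	stateStop
)

type store struct {
	running     int32
	innerCloses int // counts how often the wrapped close ran
}

func (s *store) Close() {
	// Only the first Ready -> Stop transition wins the swap and runs the
	// inner close; later calls do nothing.
	if atomic.CompareAndSwapInt32(&s.running, stateReady, stateStop) {
		s.innerCloses++ // stands in for o.mc.Close()
	}
}

func main() {
	s := &store{running: stateReady}
	s.Close()
	s.Close()
	fmt.Println(s.innerCloses) // 1
}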

receiver/prometheusreceiver/internal/ocastore_test.go

Lines changed: 1 addition & 1 deletion

@@ -33,7 +33,7 @@ func TestOcaStore(t *testing.T) {
 	app := o.Appender(context.Background())
 	require.NotNil(t, app, "Expecting app")
 
-	_ = o.Close()
+	o.Close()
 
 	app = o.Appender(context.Background())
 	assert.Equal(t, noop, app)

receiver/prometheusreceiver/metrics_receiver.go

Lines changed: 14 additions & 6 deletions

@@ -36,7 +36,9 @@ type pReceiver struct {
 	consumer   consumer.Metrics
 	cancelFunc context.CancelFunc
 
-	logger *zap.Logger
+	logger        *zap.Logger
+	scrapeManager *scrape.Manager
+	ocaStore      *internal.OcaStore
 }
 
 // New creates a new prometheus.Receiver reference.
@@ -79,7 +81,7 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error {
 	// Per component.Component Start instructions, for async operations we should not use the
 	// incoming context, it may get cancelled.
 	receiverCtx := obsreport.ReceiverContext(context.Background(), r.cfg.ID(), transport)
-	ocaStore := internal.NewOcaStore(
+	r.ocaStore = internal.NewOcaStore(
 		receiverCtx,
 		r.consumer,
 		r.logger,
@@ -89,13 +91,13 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error {
 		r.cfg.ID(),
 		r.cfg.PrometheusConfig.GlobalConfig.ExternalLabels,
 	)
-	scrapeManager := scrape.NewManager(logger, ocaStore)
-	ocaStore.SetScrapeManager(scrapeManager)
-	if err := scrapeManager.ApplyConfig(r.cfg.PrometheusConfig); err != nil {
+	r.scrapeManager = scrape.NewManager(logger, r.ocaStore)
+	r.ocaStore.SetScrapeManager(r.scrapeManager)
+	if err := r.scrapeManager.ApplyConfig(r.cfg.PrometheusConfig); err != nil {
 		return err
 	}
 	go func() {
-		if err := scrapeManager.Run(discoveryManager.SyncCh()); err != nil {
+		if err := r.scrapeManager.Run(discoveryManager.SyncCh()); err != nil {
 			r.logger.Error("Scrape manager failed", zap.Error(err))
 			host.ReportFatalError(err)
 		}
@@ -106,5 +108,11 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error {
 // Shutdown stops and cancels the underlying Prometheus scrapers.
 func (r *pReceiver) Shutdown(context.Context) error {
 	r.cancelFunc()
+	// ocaStore (and internally metadataService) needs to stop first to prevent deadlocks.
+	// When stopping scrapeManager it waits for all scrapes to terminate. However during
+	// scraping metadataService calls scrapeManager.AllTargets() which acquires
+	// the same lock that's acquired when scrapeManager is stopped.
+	r.ocaStore.Close()
+	r.scrapeManager.Stop()
 	return nil
 }
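
The comment in Shutdown describes a classic lock-ordering cycle: Stop holds the scrape manager's lock while it waits for in-flight scrapes to finish, and those scrapes call back into the manager through the metadata service, needing the same lock. Below is a simplified, self-contained model of that cycle; every type here is a hypothetical stand-in, not the receiver's code. Closing the store first lets the scrape loop bail out before it touches the manager, which is why Shutdown calls r.ocaStore.Close() before r.scrapeManager.Stop():

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// manager mimics the scrape manager: Stop and TargetsAll share one lock.
type manager struct {
	mu   sync.Mutex
	done chan struct{}
}

func (m *manager) TargetsAll() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return 1
}

func (m *manager) Stop() {
	m.mu.Lock() // held while waiting for the scrape loop to finish
	defer m.mu.Unlock()
	<-m.done
}

// store mimics the metadata service guard added by this commit.
type store struct {
	mu      sync.Mutex
	stopped bool
	m       *manager
}

func (s *store) Close() {
	s.mu.Lock()
	s.stopped = true
	s.mu.Unlock()
}

func (s *store) Get() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.stopped {
		return errors.New("stopped") // early return: never reaches TargetsAll
	}
	_ = s.m.TargetsAll()
	return nil
}

func main() {
	m := &manager{done: make(chan struct{})}
	s := &store{m: m}

	go func() { // stand-in for the scrape loop
		for s.Get() == nil {
			time.Sleep(10 * time.Millisecond)
		}
		close(m.done)
	}()

	time.Sleep(30 * time.Millisecond)
	s.Close() // stop the store first so the loop stops calling TargetsAll
	m.Stop()  // now safe: the loop exits and closes m.done
	fmt.Println("shut down without deadlock")
}

Swapping the last two calls in main reproduces the reported hang: Stop holds the manager's lock and waits forever for a loop that is itself blocked inside TargetsAll.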

receiver/prometheusreceiver/metrics_receiver_test.go

Lines changed: 16 additions & 1 deletion

@@ -33,6 +33,7 @@ import (
 	resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
 	gokitlog "github.com/go-kit/kit/log"
 	promcfg "github.com/prometheus/prometheus/config"
+	"github.com/prometheus/prometheus/scrape"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/zap"
@@ -1403,7 +1404,12 @@ func testEndToEnd(t *testing.T, targets []*testData, useStartTimeMetric bool) {
 		UseStartTimeMetric: useStartTimeMetric}, cms)
 
 	require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()), "Failed to invoke Start: %v", err)
-	t.Cleanup(func() { require.NoError(t, rcvr.Shutdown(context.Background())) })
+	t.Cleanup(func() {
+		// verify state after shutdown is called
+		assert.Lenf(t, flattenTargets(rcvr.scrapeManager.TargetsAll()), len(targets), "expected %v targets to be running", len(targets))
+		require.NoError(t, rcvr.Shutdown(context.Background()))
+		assert.Len(t, flattenTargets(rcvr.scrapeManager.TargetsAll()), 0, "expected scrape manager to have no targets")
+	})
 
 	// wait for all provided data to be scraped
 	mp.wg.Wait()
@@ -1436,6 +1442,15 @@ func testEndToEnd(t *testing.T, targets []*testData, useStartTimeMetric bool) {
 	}
 }
 
+// flattenTargets takes a map of jobs to target and flattens to a list of targets
+func flattenTargets(targets map[string][]*scrape.Target) []*scrape.Target {
+	var flatTargets []*scrape.Target
+	for _, target := range targets {
+		flatTargets = append(flatTargets, target...)
+	}
+	return flatTargets
+}
+
 var startTimeMetricRegexPage = `
 # HELP go_threads Number of OS threads created
 # TYPE go_threads gauge
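
The new assertions live in t.Cleanup so they run only after the test body (including mp.wg.Wait() and the metric comparisons) has finished: cleanup functions execute at test teardown, in LIFO order. A tiny sketch of that ordering, independent of the receiver:

package example

import "testing"

func TestCleanupRunsAfterBody(t *testing.T) {
	var order []string
	t.Cleanup(func() {
		order = append(order, "cleanup") // runs after the body below
		if order[0] != "body" {
			t.Errorf("cleanup ran before the body: %v", order)
		}
	})
	order = append(order, "body") // the end-to-end scraping happens here in the real test
}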
