
Commit 3e3b201

Fix the scraper/discovery manager coordination in the Prometheus receiver (#2089)
* Fix the scraper/discovery manager coordination in the Prometheus receiver. The receiver's Start contained several unnecessary sections; rewrite it for better maintainability. Related to #1909.
* Use the background context
* Remove dead code
1 parent: b0bb74e · commit: 3e3b201
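The heart of the change is the coordination order inside Start: configure the discovery manager, run it, and let the scrape manager consume the discovery sync channel directly, instead of the old syncConfig channel plus a 100 ms sleep. The plain-Go sketch below illustrates that pattern; discoverer, scrapeLoop, and the target strings are illustrative stand-ins, not the actual discovery.Manager and scrape.Manager types used in the diff further down.

package main

import (
	"context"
	"fmt"
	"time"
)

// discoverer stands in for discovery.Manager: Run pushes target updates to a
// sync channel until its context is cancelled.
type discoverer struct {
	out chan []string
}

func (d *discoverer) SyncCh() <-chan []string { return d.out }

func (d *discoverer) Run(ctx context.Context) {
	for _, targets := range [][]string{{"job-a:9090"}, {"job-a:9090", "job-b:9090"}} {
		select {
		case <-ctx.Done():
			return
		case d.out <- targets:
		}
	}
}

// scrapeLoop stands in for scrape.Manager.Run: it blocks, applying every
// update received from the discovery side.
func scrapeLoop(ctx context.Context, updates <-chan []string) {
	for {
		select {
		case <-ctx.Done():
			return
		case targets := <-updates:
			fmt.Println("applying targets:", targets)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	d := &discoverer{out: make(chan []string)}
	go d.Run(ctx)                  // discovery runs in the background...
	go scrapeLoop(ctx, d.SyncCh()) // ...and the scraper only consumes its sync channel

	time.Sleep(100 * time.Millisecond) // give the goroutines time to exchange updates
}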

File tree · 3 files changed · +51 -68 lines

receiver/prometheusreceiver/internal/ocastore.go

Lines changed: 5 additions & 6 deletions
@@ -17,7 +17,6 @@ package internal
 import (
 	"context"
 	"io"
-	"sync"
 	"sync/atomic"
 
 	"github.com/prometheus/prometheus/pkg/labels"
@@ -47,16 +46,17 @@ type OcaStore interface
 
 // OpenCensus Store for prometheus
 type ocaStore struct {
-	running              int32
-	logger               *zap.Logger
+	ctx context.Context
+
+	running              int32 // access atomically
 	sink                 consumer.MetricsConsumer
 	mc                   *mService
-	once                 *sync.Once
-	ctx                  context.Context
 	jobsMap              *JobsMap
 	useStartTimeMetric   bool
 	startTimeMetricRegex string
 	receiverName         string
+
+	logger *zap.Logger
 }
 
 // NewOcaStore returns an ocaStore instance, which can be acted as prometheus' scrape.Appendable
@@ -66,7 +66,6 @@ func NewOcaStore(ctx context.Context, sink consumer.MetricsConsumer, logger *zap
 		ctx:                  ctx,
 		sink:                 sink,
 		logger:               logger,
-		once:                 &sync.Once{},
 		jobsMap:              jobsMap,
 		useStartTimeMetric:   useStartTimeMetric,
 		startTimeMetricRegex: startTimeMetricRegex,
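
The struct reshuffle above also documents running as an atomic-only field ("access atomically"), which is why "sync/atomic" stays imported while the *sync.Once goes away. Below is a minimal sketch of that access pattern; the state constants and helper methods are invented for illustration and are not taken from ocastore.go.

package main

import (
	"fmt"
	"sync/atomic"
)

const (
	stateRunning int32 = iota
	stateClosed
)

// store mirrors only the running field of ocaStore; everything else here is
// invented for the example.
type store struct {
	running int32 // access atomically
}

// close flips the flag exactly once; later calls see the swap fail.
func (s *store) close() bool {
	return atomic.CompareAndSwapInt32(&s.running, stateRunning, stateClosed)
}

func (s *store) isRunning() bool {
	return atomic.LoadInt32(&s.running) == stateRunning
}

func main() {
	s := &store{running: stateRunning}
	fmt.Println(s.isRunning()) // true
	fmt.Println(s.close())     // true: the first close wins
	fmt.Println(s.close())     // false: already closed
	fmt.Println(s.isRunning()) // false
}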

receiver/prometheusreceiver/metrics_receiver.go

Lines changed: 40 additions & 56 deletions
@@ -16,7 +16,6 @@ package prometheusreceiver
 
 import (
 	"context"
-	"sync"
 	"time"
 
 	"github.com/prometheus/prometheus/discovery"
@@ -25,18 +24,16 @@ import (
 
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/consumer"
-	"go.opentelemetry.io/collector/obsreport"
 	"go.opentelemetry.io/collector/receiver/prometheusreceiver/internal"
 )
 
 // pReceiver is the type that provides Prometheus scraper/receiver functionality.
 type pReceiver struct {
-	startOnce sync.Once
-	stopOnce  sync.Once
-	cfg       *Config
-	consumer  consumer.MetricsConsumer
-	cancel    context.CancelFunc
-	logger    *zap.Logger
+	cfg        *Config
+	consumer   consumer.MetricsConsumer
+	cancelFunc context.CancelFunc
+
+	logger *zap.Logger
 }
 
 // New creates a new prometheus.Receiver reference.
@@ -51,62 +48,49 @@ func newPrometheusReceiver(logger *zap.Logger, cfg *Config, next consumer.Metric
 
 // Start is the method that starts Prometheus scraping and it
 // is controlled by having previously defined a Configuration using perhaps New.
-func (pr *pReceiver) Start(_ context.Context, host component.Host) error {
-	pr.startOnce.Do(func() {
-		ctx := context.Background()
-		c, cancel := context.WithCancel(ctx)
-		pr.cancel = cancel
-		c = obsreport.ReceiverContext(c, pr.cfg.Name(), "http")
-		var jobsMap *internal.JobsMap
-		if !pr.cfg.UseStartTimeMetric {
-			jobsMap = internal.NewJobsMap(2 * time.Minute)
-		}
-		app := internal.NewOcaStore(c, pr.consumer, pr.logger, jobsMap, pr.cfg.UseStartTimeMetric, pr.cfg.StartTimeMetricRegex, pr.cfg.Name())
-		// need to use a logger with the gokitLog interface
-		l := internal.NewZapToGokitLogAdapter(pr.logger)
-		scrapeManager := scrape.NewManager(l, app)
-		app.SetScrapeManager(scrapeManager)
-		discoveryManagerScrape := discovery.NewManager(ctx, l)
-		go func() {
-			if err := discoveryManagerScrape.Run(); err != nil {
-				host.ReportFatalError(err)
-			}
-		}()
-		if err := scrapeManager.ApplyConfig(pr.cfg.PrometheusConfig); err != nil {
-			host.ReportFatalError(err)
-			return
-		}
+func (r *pReceiver) Start(ctx context.Context, host component.Host) error {
+	discoveryCtx, cancel := context.WithCancel(context.Background())
+	r.cancelFunc = cancel
 
-		// Run the scrape manager.
-		syncConfig := make(chan bool)
-		errsChan := make(chan error, 1)
-		go func() {
-			defer close(errsChan)
-			<-time.After(100 * time.Millisecond)
-			close(syncConfig)
-			if err := scrapeManager.Run(discoveryManagerScrape.SyncCh()); err != nil {
-				errsChan <- err
-			}
-		}()
-		<-syncConfig
-		// By this point we've given time to the scrape manager
-		// to start applying its original configuration.
+	logger := internal.NewZapToGokitLogAdapter(r.logger)
 
-		discoveryCfg := make(map[string]discovery.Configs)
-		for _, scrapeConfig := range pr.cfg.PrometheusConfig.ScrapeConfigs {
-			discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs
+	discoveryManager := discovery.NewManager(discoveryCtx, logger)
+	discoveryCfg := make(map[string]discovery.Configs)
+	for _, scrapeConfig := range r.cfg.PrometheusConfig.ScrapeConfigs {
+		discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs
+	}
+	if err := discoveryManager.ApplyConfig(discoveryCfg); err != nil {
+		return err
+	}
+	go func() {
+		if err := discoveryManager.Run(); err != nil {
+			r.logger.Error("Discovery manager failed", zap.Error(err))
+			host.ReportFatalError(err)
		}
+	}()
+
+	var jobsMap *internal.JobsMap
+	if !r.cfg.UseStartTimeMetric {
+		jobsMap = internal.NewJobsMap(2 * time.Minute)
+	}
+	ocaStore := internal.NewOcaStore(ctx, r.consumer, r.logger, jobsMap, r.cfg.UseStartTimeMetric, r.cfg.StartTimeMetricRegex, r.cfg.Name())
 
-		// Now trigger the discovery notification to the scrape manager.
-		if err := discoveryManagerScrape.ApplyConfig(discoveryCfg); err != nil {
-			errsChan <- err
+	scrapeManager := scrape.NewManager(logger, ocaStore)
+	ocaStore.SetScrapeManager(scrapeManager)
+	if err := scrapeManager.ApplyConfig(r.cfg.PrometheusConfig); err != nil {
+		return err
+	}
+	go func() {
+		if err := scrapeManager.Run(discoveryManager.SyncCh()); err != nil {
+			r.logger.Error("Scrape manager failed", zap.Error(err))
+			host.ReportFatalError(err)
		}
-	})
+	}()
 	return nil
 }
 
 // Shutdown stops and cancels the underlying Prometheus scrapers.
-func (pr *pReceiver) Shutdown(context.Context) error {
-	pr.stopOnce.Do(pr.cancel)
+func (r *pReceiver) Shutdown(context.Context) error {
+	r.cancelFunc()
 	return nil
 }
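
With both sync.Once guards gone, the lifecycle reduces to: Start derives a cancellable context from context.Background() for the discovery manager and stores its cancel function, and Shutdown simply invokes it, which assumes the collector calls Start and Shutdown at most once. A self-contained sketch of that cancellation pattern, with loopReceiver and run as illustrative stand-ins for pReceiver and the manager loops:

package main

import (
	"context"
	"fmt"
	"time"
)

type loopReceiver struct {
	cancelFunc context.CancelFunc
}

// Start mirrors the shape of pReceiver.Start: it derives its own context from
// context.Background() so the background loop outlives the ctx passed by the
// caller, and keeps only the cancel function.
func (r *loopReceiver) Start(_ context.Context) error {
	loopCtx, cancel := context.WithCancel(context.Background())
	r.cancelFunc = cancel
	go run(loopCtx) // stands in for discoveryManager.Run()
	return nil
}

// Shutdown mirrors pReceiver.Shutdown: cancelling the stored context stops the
// background loop; the receiver assumes Shutdown is called only once.
func (r *loopReceiver) Shutdown(context.Context) error {
	r.cancelFunc()
	return nil
}

// run loops until its context is cancelled, like a discovery or scrape loop.
func run(ctx context.Context) {
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			fmt.Println("loop stopped:", ctx.Err())
			return
		case <-ticker.C:
			fmt.Println("tick")
		}
	}
}

func main() {
	r := &loopReceiver{}
	_ = r.Start(context.Background())
	time.Sleep(120 * time.Millisecond) // let the loop tick a couple of times
	_ = r.Shutdown(context.Background())
	time.Sleep(20 * time.Millisecond) // give the loop time to observe cancellation
}

Deriving the loop's context from context.Background() rather than from the ctx passed to Start keeps the discovery loop alive after Start returns, mirroring the discoveryCtx handling in the diff above.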

service/internal/resources.go

Lines changed: 6 additions & 6 deletions
Generated file; diff not rendered by default.
