@@ -16,7 +16,6 @@ package prometheusreceiver
1616
1717import (
1818 "context"
19- "sync"
2019 "time"
2120
2221 "github.com/prometheus/prometheus/discovery"
@@ -25,18 +24,16 @@ import (
2524
2625 "go.opentelemetry.io/collector/component"
2726 "go.opentelemetry.io/collector/consumer"
28- "go.opentelemetry.io/collector/obsreport"
2927 "go.opentelemetry.io/collector/receiver/prometheusreceiver/internal"
3028)
3129
3230// pReceiver is the type that provides Prometheus scraper/receiver functionality.
3331type pReceiver struct {
34- startOnce sync.Once
35- stopOnce sync.Once
36- cfg * Config
37- consumer consumer.MetricsConsumer
38- cancel context.CancelFunc
39- logger * zap.Logger
32+ cfg * Config
33+ consumer consumer.MetricsConsumer
34+ cancelFunc context.CancelFunc
35+
36+ logger * zap.Logger
4037}
4138
4239// New creates a new prometheus.Receiver reference.
@@ -51,62 +48,49 @@ func newPrometheusReceiver(logger *zap.Logger, cfg *Config, next consumer.Metric
5148
5249// Start is the method that starts Prometheus scraping and it
5350// is controlled by having previously defined a Configuration using perhaps New.
54- func (pr * pReceiver ) Start (_ context.Context , host component.Host ) error {
55- pr .startOnce .Do (func () {
56- ctx := context .Background ()
57- c , cancel := context .WithCancel (ctx )
58- pr .cancel = cancel
59- c = obsreport .ReceiverContext (c , pr .cfg .Name (), "http" )
60- var jobsMap * internal.JobsMap
61- if ! pr .cfg .UseStartTimeMetric {
62- jobsMap = internal .NewJobsMap (2 * time .Minute )
63- }
64- app := internal .NewOcaStore (c , pr .consumer , pr .logger , jobsMap , pr .cfg .UseStartTimeMetric , pr .cfg .StartTimeMetricRegex , pr .cfg .Name ())
65- // need to use a logger with the gokitLog interface
66- l := internal .NewZapToGokitLogAdapter (pr .logger )
67- scrapeManager := scrape .NewManager (l , app )
68- app .SetScrapeManager (scrapeManager )
69- discoveryManagerScrape := discovery .NewManager (ctx , l )
70- go func () {
71- if err := discoveryManagerScrape .Run (); err != nil {
72- host .ReportFatalError (err )
73- }
74- }()
75- if err := scrapeManager .ApplyConfig (pr .cfg .PrometheusConfig ); err != nil {
76- host .ReportFatalError (err )
77- return
78- }
51+ func (r * pReceiver ) Start (ctx context.Context , host component.Host ) error {
52+ discoveryCtx , cancel := context .WithCancel (context .Background ())
53+ r .cancelFunc = cancel
7954
80- // Run the scrape manager.
81- syncConfig := make (chan bool )
82- errsChan := make (chan error , 1 )
83- go func () {
84- defer close (errsChan )
85- <- time .After (100 * time .Millisecond )
86- close (syncConfig )
87- if err := scrapeManager .Run (discoveryManagerScrape .SyncCh ()); err != nil {
88- errsChan <- err
89- }
90- }()
91- <- syncConfig
92- // By this point we've given time to the scrape manager
93- // to start applying its original configuration.
55+ logger := internal .NewZapToGokitLogAdapter (r .logger )
9456
95- discoveryCfg := make (map [string ]discovery.Configs )
96- for _ , scrapeConfig := range pr .cfg .PrometheusConfig .ScrapeConfigs {
97- discoveryCfg [scrapeConfig .JobName ] = scrapeConfig .ServiceDiscoveryConfigs
57+ discoveryManager := discovery .NewManager (discoveryCtx , logger )
58+ discoveryCfg := make (map [string ]discovery.Configs )
59+ for _ , scrapeConfig := range r .cfg .PrometheusConfig .ScrapeConfigs {
60+ discoveryCfg [scrapeConfig .JobName ] = scrapeConfig .ServiceDiscoveryConfigs
61+ }
62+ if err := discoveryManager .ApplyConfig (discoveryCfg ); err != nil {
63+ return err
64+ }
65+ go func () {
66+ if err := discoveryManager .Run (); err != nil {
67+ r .logger .Error ("Discovery manager failed" , zap .Error (err ))
68+ host .ReportFatalError (err )
9869 }
70+ }()
71+
72+ var jobsMap * internal.JobsMap
73+ if ! r .cfg .UseStartTimeMetric {
74+ jobsMap = internal .NewJobsMap (2 * time .Minute )
75+ }
76+ ocaStore := internal .NewOcaStore (ctx , r .consumer , r .logger , jobsMap , r .cfg .UseStartTimeMetric , r .cfg .StartTimeMetricRegex , r .cfg .Name ())
9977
100- // Now trigger the discovery notification to the scrape manager.
101- if err := discoveryManagerScrape .ApplyConfig (discoveryCfg ); err != nil {
102- errsChan <- err
78+ scrapeManager := scrape .NewManager (logger , ocaStore )
79+ ocaStore .SetScrapeManager (scrapeManager )
80+ if err := scrapeManager .ApplyConfig (r .cfg .PrometheusConfig ); err != nil {
81+ return err
82+ }
83+ go func () {
84+ if err := scrapeManager .Run (discoveryManager .SyncCh ()); err != nil {
85+ r .logger .Error ("Scrape manager failed" , zap .Error (err ))
86+ host .ReportFatalError (err )
10387 }
104- })
88+ }( )
10589 return nil
10690}
10791
10892// Shutdown stops and cancels the underlying Prometheus scrapers.
109- func (pr * pReceiver ) Shutdown (context.Context ) error {
110- pr . stopOnce . Do ( pr . cancel )
93+ func (r * pReceiver ) Shutdown (context.Context ) error {
94+ r . cancelFunc ( )
11195 return nil
11296}