Skip to content

Commit 3b01fd7

Browse files
committed
Make scrapercontroler more generic and move closer to the scraperreceiver
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent c56efd3 commit 3b01fd7

File tree

6 files changed

+313
-234
lines changed

6 files changed

+313
-234
lines changed

.chloggen/generalize-scraper.yaml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: deprecation
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: scraperhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Deprecate `scraperhelper.NewScraperControllerReceiver` and `scraperhelper.ScraperControllerOption`.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12103]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: Use `scraperhelper.NewMetricsController` instead of `scraperhelper.NewScraperControllerReceiver` |
19+
Use `scraperhelper.ScraperControllerOption` instead of `scraperhelper.ControllerOption`
20+
21+
# Optional: The change log or logs in which this entry should be included.
22+
# e.g. '[user]' or '[user, api]'
23+
# Include 'user' if the change is relevant to end users.
24+
# Include 'api' if there is a change to a library API.
25+
# Default: '[user]'
26+
change_logs: []

scraper/scraperhelper/controller.go

Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package scraperhelper // import "go.opentelemetry.io/collector/scraper/scraperhelper"
5+
6+
import (
7+
"context"
8+
"sync"
9+
"time"
10+
11+
"go.uber.org/multierr"
12+
13+
"go.opentelemetry.io/collector/component"
14+
"go.opentelemetry.io/collector/consumer"
15+
"go.opentelemetry.io/collector/pdata/pmetric"
16+
"go.opentelemetry.io/collector/receiver"
17+
"go.opentelemetry.io/collector/receiver/receiverhelper"
18+
"go.opentelemetry.io/collector/scraper"
19+
"go.opentelemetry.io/collector/scraper/scrapererror"
20+
)
21+
22+
// Deprecated: [v0.118.0] use ControllerOption.
23+
type ScraperControllerOption = ControllerOption
24+
25+
// ControllerOption apply changes to internal options.
26+
type ControllerOption interface {
27+
apply(*controllerOptions)
28+
}
29+
30+
type optionFunc func(*controllerOptions)
31+
32+
func (of optionFunc) apply(e *controllerOptions) {
33+
of(e)
34+
}
35+
36+
// AddScraper configures the scraper.Metrics to be called with the specified options,
37+
// and at the specified collection interval.
38+
//
39+
// Observability information will be reported, and the scraped metrics
40+
// will be passed to the next consumer.
41+
func AddScraper(t component.Type, sc scraper.Metrics) ControllerOption {
42+
f := scraper.NewFactory(t, nil,
43+
scraper.WithMetrics(func(ctx context.Context, set scraper.Settings, config component.Config) (scraper.Metrics, error) {
44+
return sc, nil
45+
}, component.StabilityLevelAlpha))
46+
return AddFactoryWithConfig(f, nil)
47+
}
48+
49+
// AddFactoryWithConfig configures the scraper.Factory and associated config that
50+
// will be used to create a new scraper. The created scraper will be called with
51+
// the specified options, and at the specified collection interval.
52+
//
53+
// Observability information will be reported, and the scraped metrics
54+
// will be passed to the next consumer.
55+
func AddFactoryWithConfig(f scraper.Factory, cfg component.Config) ControllerOption {
56+
return optionFunc(func(o *controllerOptions) {
57+
o.factoriesWithConfig = append(o.factoriesWithConfig, factoryWithConfig{f: f, cfg: cfg})
58+
})
59+
}
60+
61+
// WithTickerChannel allows you to override the scraper controller's ticker
62+
// channel to specify when scrape is called. This is only expected to be
63+
// used by tests.
64+
func WithTickerChannel(tickerCh <-chan time.Time) ControllerOption {
65+
return optionFunc(func(o *controllerOptions) {
66+
o.tickerCh = tickerCh
67+
})
68+
}
69+
70+
type factoryWithConfig struct {
71+
f scraper.Factory
72+
cfg component.Config
73+
}
74+
75+
type controllerOptions struct {
76+
tickerCh <-chan time.Time
77+
factoriesWithConfig []factoryWithConfig
78+
}
79+
80+
type controller[T component.Component] struct {
81+
collectionInterval time.Duration
82+
initialDelay time.Duration
83+
timeout time.Duration
84+
85+
scrapers []T
86+
scrapeFunc func(*controller[T])
87+
tickerCh <-chan time.Time
88+
89+
done chan struct{}
90+
wg sync.WaitGroup
91+
92+
obsrecv *receiverhelper.ObsReport
93+
}
94+
95+
func newController[T component.Component](
96+
cfg *ControllerConfig,
97+
rSet receiver.Settings,
98+
scrapers []T,
99+
scrapeFunc func(*controller[T]),
100+
tickerCh <-chan time.Time,
101+
) (*controller[T], error) {
102+
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
103+
ReceiverID: rSet.ID,
104+
Transport: "",
105+
ReceiverCreateSettings: rSet,
106+
})
107+
if err != nil {
108+
return nil, err
109+
}
110+
111+
cs := &controller[T]{
112+
collectionInterval: cfg.CollectionInterval,
113+
initialDelay: cfg.InitialDelay,
114+
timeout: cfg.Timeout,
115+
scrapers: scrapers,
116+
scrapeFunc: scrapeFunc,
117+
done: make(chan struct{}),
118+
tickerCh: tickerCh,
119+
obsrecv: obsrecv,
120+
}
121+
122+
return cs, nil
123+
}
124+
125+
// Start the receiver, invoked during service start.
126+
func (sc *controller[T]) Start(ctx context.Context, host component.Host) error {
127+
for _, scrp := range sc.scrapers {
128+
if err := scrp.Start(ctx, host); err != nil {
129+
return err
130+
}
131+
}
132+
133+
sc.startScraping()
134+
return nil
135+
}
136+
137+
// Shutdown the receiver, invoked during service shutdown.
138+
func (sc *controller[T]) Shutdown(ctx context.Context) error {
139+
// Signal the goroutine to stop.
140+
close(sc.done)
141+
sc.wg.Wait()
142+
var errs error
143+
for _, scrp := range sc.scrapers {
144+
errs = multierr.Append(errs, scrp.Shutdown(ctx))
145+
}
146+
147+
return errs
148+
}
149+
150+
// startScraping initiates a ticker that calls Scrape based on the configured
151+
// collection interval.
152+
func (sc *controller[T]) startScraping() {
153+
sc.wg.Add(1)
154+
go func() {
155+
defer sc.wg.Done()
156+
if sc.initialDelay > 0 {
157+
select {
158+
case <-time.After(sc.initialDelay):
159+
case <-sc.done:
160+
return
161+
}
162+
}
163+
164+
if sc.tickerCh == nil {
165+
ticker := time.NewTicker(sc.collectionInterval)
166+
defer ticker.Stop()
167+
168+
sc.tickerCh = ticker.C
169+
}
170+
// Call scrape method during initialization to ensure
171+
// that scrapers start from when the component starts
172+
// instead of waiting for the full duration to start.
173+
sc.scrapeFunc(sc)
174+
for {
175+
select {
176+
case <-sc.tickerCh:
177+
sc.scrapeFunc(sc)
178+
case <-sc.done:
179+
return
180+
}
181+
}
182+
}()
183+
}
184+
185+
// Deprecated: [v0.118.0] Use NewMetricsController.
186+
func NewScraperControllerReceiver(
187+
cfg *ControllerConfig,
188+
set receiver.Settings,
189+
nextConsumer consumer.Metrics,
190+
options ...ControllerOption,
191+
) (component.Component, error) {
192+
return NewMetricsController(cfg, set, nextConsumer, options...)
193+
}
194+
195+
// NewMetricsController creates a receiver.Metrics with the configured options, that can control multiple scraper.Metrics.
196+
func NewMetricsController(cfg *ControllerConfig,
197+
rSet receiver.Settings,
198+
nextConsumer consumer.Metrics,
199+
options ...ControllerOption,
200+
) (receiver.Metrics, error) {
201+
co := getOptions(options)
202+
scrapers := make([]scraper.Metrics, 0, len(co.factoriesWithConfig))
203+
for _, fwc := range co.factoriesWithConfig {
204+
set := getSettings(fwc.f.Type(), rSet)
205+
s, err := fwc.f.CreateMetrics(context.Background(), set, fwc.cfg)
206+
if err != nil {
207+
return nil, err
208+
}
209+
s, err = wrapObsMetrics(s, rSet.ID, set.ID, set.TelemetrySettings)
210+
if err != nil {
211+
return nil, err
212+
}
213+
scrapers = append(scrapers, s)
214+
}
215+
return newController[scraper.Metrics](
216+
cfg, rSet, scrapers, func(c *controller[scraper.Metrics]) { scrapeMetrics(c, nextConsumer) }, co.tickerCh)
217+
}
218+
219+
func scrapeMetrics(c *controller[scraper.Metrics], nextConsumer consumer.Metrics) {
220+
ctx, done := withScrapeContext(c.timeout)
221+
defer done()
222+
223+
metrics := pmetric.NewMetrics()
224+
for i := range c.scrapers {
225+
md, err := c.scrapers[i].ScrapeMetrics(ctx)
226+
if err != nil && !scrapererror.IsPartialScrapeError(err) {
227+
continue
228+
}
229+
md.ResourceMetrics().MoveAndAppendTo(metrics.ResourceMetrics())
230+
}
231+
232+
dataPointCount := metrics.DataPointCount()
233+
ctx = c.obsrecv.StartMetricsOp(ctx)
234+
err := nextConsumer.ConsumeMetrics(ctx, metrics)
235+
c.obsrecv.EndMetricsOp(ctx, "", dataPointCount, err)
236+
}
237+
238+
func getOptions(options []ControllerOption) controllerOptions {
239+
co := controllerOptions{}
240+
for _, op := range options {
241+
op.apply(&co)
242+
}
243+
return co
244+
}
245+
246+
func getSettings(sType component.Type, rSet receiver.Settings) scraper.Settings {
247+
return scraper.Settings{
248+
ID: component.NewID(sType),
249+
TelemetrySettings: rSet.TelemetrySettings,
250+
BuildInfo: rSet.BuildInfo,
251+
}
252+
}
253+
254+
// withScrapeContext will return a context that has no deadline if timeout is 0
255+
// which implies no explicit timeout had occurred, otherwise, a context
256+
// with a deadline of the provided timeout is returned.
257+
func withScrapeContext(timeout time.Duration) (context.Context, context.CancelFunc) {
258+
if timeout == 0 {
259+
return context.WithCancel(context.Background())
260+
}
261+
return context.WithTimeout(context.Background(), timeout)
262+
}

scraper/scraperhelper/scrapercontroller_test.go renamed to scraper/scraperhelper/controller_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ func TestScrapeController(t *testing.T) {
138138
cfg = test.scraperControllerSettings
139139
}
140140

141-
mr, err := NewScraperControllerReceiver(cfg, receiver.Settings{ID: receiverID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, sink, options...)
141+
mr, err := NewMetricsController(cfg, receiver.Settings{ID: receiverID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, sink, options...)
142142
require.NoError(t, err)
143143

144144
err = mr.Start(context.Background(), componenttest.NewNopHost())
@@ -194,8 +194,8 @@ func TestScrapeController(t *testing.T) {
194194
}
195195
}
196196

197-
func configureMetricOptions(t *testing.T, test metricsTestCase, initializeChs []chan bool, scrapeMetricsChs []chan int, closeChs []chan bool) []ScraperControllerOption {
198-
var metricOptions []ScraperControllerOption
197+
func configureMetricOptions(t *testing.T, test metricsTestCase, initializeChs []chan bool, scrapeMetricsChs []chan int, closeChs []chan bool) []ControllerOption {
198+
var metricOptions []ControllerOption
199199

200200
for i := 0; i < test.scrapers; i++ {
201201
var scraperOptions []scraper.Option
@@ -317,7 +317,7 @@ func TestSingleScrapePerInterval(t *testing.T) {
317317
scp, err := scraper.NewMetrics(tsm.scrape)
318318
require.NoError(t, err)
319319

320-
recv, err := NewScraperControllerReceiver(
320+
recv, err := NewMetricsController(
321321
cfg,
322322
receivertest.NewNopSettings(),
323323
new(consumertest.MetricsSink),
@@ -359,7 +359,7 @@ func TestScrapeControllerStartsOnInit(t *testing.T) {
359359
scp, err := scraper.NewMetrics(tsm.scrape)
360360
require.NoError(t, err, "Must not error when creating scraper")
361361

362-
r, err := NewScraperControllerReceiver(
362+
r, err := NewMetricsController(
363363
&ControllerConfig{
364364
CollectionInterval: time.Hour,
365365
InitialDelay: 0,
@@ -398,7 +398,7 @@ func TestScrapeControllerInitialDelay(t *testing.T) {
398398
})
399399
require.NoError(t, err, "Must not error when creating scraper")
400400

401-
r, err := NewScraperControllerReceiver(
401+
r, err := NewMetricsController(
402402
&cfg,
403403
receivertest.NewNopSettings(),
404404
new(consumertest.MetricsSink),
@@ -428,7 +428,7 @@ func TestShutdownBeforeScrapeCanStart(t *testing.T) {
428428
})
429429
require.NoError(t, err, "Must not error when creating scraper")
430430

431-
r, err := NewScraperControllerReceiver(
431+
r, err := NewMetricsController(
432432
&cfg,
433433
receivertest.NewNopSettings(),
434434
new(consumertest.MetricsSink),

scraper/scraperhelper/obs_metrics.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,28 +37,28 @@ const (
3737
formatKey = "format"
3838
)
3939

40-
func newObsMetrics(delegate scraper.ScrapeMetricsFunc, receiverID component.ID, scraperID component.ID, telSettings component.TelemetrySettings) (scraper.ScrapeMetricsFunc, error) {
41-
telemetryBuilder, errBuilder := metadata.NewTelemetryBuilder(telSettings)
40+
func wrapObsMetrics(sc scraper.Metrics, receiverID component.ID, scraperID component.ID, set component.TelemetrySettings) (scraper.Metrics, error) {
41+
telemetryBuilder, errBuilder := metadata.NewTelemetryBuilder(set)
4242
if errBuilder != nil {
4343
return nil, errBuilder
4444
}
4545

46-
tracer := metadata.Tracer(telSettings)
46+
tracer := metadata.Tracer(set)
4747
spanName := scraperKey + spanNameSep + scraperID.String() + spanNameSep + "ScrapeMetrics"
4848
otelAttrs := metric.WithAttributeSet(attribute.NewSet(
4949
attribute.String(receiverKey, receiverID.String()),
5050
attribute.String(scraperKey, scraperID.String()),
5151
))
5252

53-
return func(ctx context.Context) (pmetric.Metrics, error) {
53+
scraperFuncs := func(ctx context.Context) (pmetric.Metrics, error) {
5454
ctx, span := tracer.Start(ctx, spanName)
5555
defer span.End()
5656

57-
md, err := delegate(ctx)
57+
md, err := sc.ScrapeMetrics(ctx)
5858
numScrapedMetrics := 0
5959
numErroredMetrics := 0
6060
if err != nil {
61-
telSettings.Logger.Error("Error scraping metrics", zap.Error(err))
61+
set.Logger.Error("Error scraping metrics", zap.Error(err))
6262
var partialErr scrapererror.PartialScrapeError
6363
if errors.As(err, &partialErr) {
6464
numErroredMetrics = partialErr.Failed
@@ -85,5 +85,7 @@ func newObsMetrics(delegate scraper.ScrapeMetricsFunc, receiverID component.ID,
8585
}
8686

8787
return md, err
88-
}, nil
88+
}
89+
90+
return scraper.NewMetrics(scraperFuncs, scraper.WithStart(sc.Start), scraper.WithShutdown(sc.Shutdown))
8991
}

0 commit comments

Comments
 (0)