Skip to content

Commit b3c781b

Browse files
authored
[chore] Remove telemetry from graph initialization (#10446)
This PR refactors the service graph initialization so that the graph can be assembled without the intention to start it. 1. Do not save telemetry on the graph. The only use of this was for reporting component status. Instead, pass in a reporter when starting or stopping. Correspondingly, add an internal `statustest` package to make it easy to pass in a status reporter in tests. 2. Decouple graph building from extension building. There isn't any direct relationship so these things should be separated.
1 parent 426e660 commit b3c781b

File tree

7 files changed

+101
-54
lines changed

7 files changed

+101
-54
lines changed

service/internal/graph/graph.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"go.opentelemetry.io/collector/receiver"
3333
"go.opentelemetry.io/collector/service/internal/capabilityconsumer"
3434
"go.opentelemetry.io/collector/service/internal/servicetelemetry"
35+
"go.opentelemetry.io/collector/service/internal/status"
3536
"go.opentelemetry.io/collector/service/pipelines"
3637
)
3738

@@ -69,7 +70,6 @@ func Build(ctx context.Context, set Settings) (*Graph, error) {
6970
componentGraph: simple.NewDirectedGraph(),
7071
pipelines: make(map[component.ID]*pipelineNodes, len(set.PipelineConfigs)),
7172
instanceIDs: make(map[int64]*component.InstanceID),
72-
telemetry: set.Telemetry,
7373
}
7474
for pipelineID := range set.PipelineConfigs {
7575
pipelines.pipelines[pipelineID] = &pipelineNodes{
@@ -394,7 +394,7 @@ type pipelineNodes struct {
394394
exporters map[int64]graph.Node
395395
}
396396

397-
func (g *Graph) StartAll(ctx context.Context, host component.Host) error {
397+
func (g *Graph) StartAll(ctx context.Context, host component.Host, reporter status.Reporter) error {
398398
nodes, err := topo.Sort(g.componentGraph)
399399
if err != nil {
400400
return err
@@ -413,25 +413,25 @@ func (g *Graph) StartAll(ctx context.Context, host component.Host) error {
413413
}
414414

415415
instanceID := g.instanceIDs[node.ID()]
416-
g.telemetry.Status.ReportStatus(
416+
reporter.ReportStatus(
417417
instanceID,
418418
component.NewStatusEvent(component.StatusStarting),
419419
)
420420

421421
if compErr := comp.Start(ctx, host); compErr != nil {
422-
g.telemetry.Status.ReportStatus(
422+
reporter.ReportStatus(
423423
instanceID,
424424
component.NewPermanentErrorEvent(compErr),
425425
)
426426
return compErr
427427
}
428428

429-
g.telemetry.Status.ReportOKIfStarting(instanceID)
429+
reporter.ReportOKIfStarting(instanceID)
430430
}
431431
return nil
432432
}
433433

434-
func (g *Graph) ShutdownAll(ctx context.Context) error {
434+
func (g *Graph) ShutdownAll(ctx context.Context, reporter status.Reporter) error {
435435
nodes, err := topo.Sort(g.componentGraph)
436436
if err != nil {
437437
return err
@@ -452,21 +452,21 @@ func (g *Graph) ShutdownAll(ctx context.Context) error {
452452
}
453453

454454
instanceID := g.instanceIDs[node.ID()]
455-
g.telemetry.Status.ReportStatus(
455+
reporter.ReportStatus(
456456
instanceID,
457457
component.NewStatusEvent(component.StatusStopping),
458458
)
459459

460460
if compErr := comp.Shutdown(ctx); compErr != nil {
461461
errs = multierr.Append(errs, compErr)
462-
g.telemetry.Status.ReportStatus(
462+
reporter.ReportStatus(
463463
instanceID,
464464
component.NewPermanentErrorEvent(compErr),
465465
)
466466
continue
467467
}
468468

469-
g.telemetry.Status.ReportStatus(
469+
reporter.ReportStatus(
470470
instanceID,
471471
component.NewStatusEvent(component.StatusStopped),
472472
)

service/internal/graph/graph_test.go

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"go.opentelemetry.io/collector/receiver/receivertest"
3131
"go.opentelemetry.io/collector/service/internal/servicetelemetry"
3232
"go.opentelemetry.io/collector/service/internal/status"
33+
"go.opentelemetry.io/collector/service/internal/status/statustest"
3334
"go.opentelemetry.io/collector/service/internal/testcomponents"
3435
"go.opentelemetry.io/collector/service/pipelines"
3536
)
@@ -154,13 +155,13 @@ func TestGraphStartStop(t *testing.T) {
154155
pg.componentGraph.SetEdge(simple.Edge{F: f, T: t})
155156
}
156157

157-
require.NoError(t, pg.StartAll(ctx, componenttest.NewNopHost()))
158+
require.NoError(t, pg.StartAll(ctx, componenttest.NewNopHost(), statustest.NewNopStatusReporter()))
158159
for _, edge := range tt.edges {
159160
assert.Greater(t, ctx.order[edge[0]], ctx.order[edge[1]])
160161
}
161162

162163
ctx.order = map[component.ID]int{}
163-
require.NoError(t, pg.ShutdownAll(ctx))
164+
require.NoError(t, pg.ShutdownAll(ctx, statustest.NewNopStatusReporter()))
164165
for _, edge := range tt.edges {
165166
assert.Less(t, ctx.order[edge[0]], ctx.order[edge[1]])
166167
}
@@ -188,11 +189,11 @@ func TestGraphStartStopCycle(t *testing.T) {
188189
pg.componentGraph.SetEdge(simple.Edge{F: c1, T: e1})
189190
pg.componentGraph.SetEdge(simple.Edge{F: c1, T: p1}) // loop back
190191

191-
err := pg.StartAll(context.Background(), componenttest.NewNopHost())
192+
err := pg.StartAll(context.Background(), componenttest.NewNopHost(), statustest.NewNopStatusReporter())
192193
assert.Error(t, err)
193194
assert.Contains(t, err.Error(), `topo: no topological ordering: cyclic components`)
194195

195-
err = pg.ShutdownAll(context.Background())
196+
err = pg.ShutdownAll(context.Background(), statustest.NewNopStatusReporter())
196197
assert.Error(t, err)
197198
assert.Contains(t, err.Error(), `topo: no topological ordering: cyclic components`)
198199
}
@@ -216,8 +217,8 @@ func TestGraphStartStopComponentError(t *testing.T) {
216217
F: r1,
217218
T: e1,
218219
})
219-
assert.EqualError(t, pg.StartAll(context.Background(), componenttest.NewNopHost()), "foo")
220-
assert.EqualError(t, pg.ShutdownAll(context.Background()), "bar")
220+
assert.EqualError(t, pg.StartAll(context.Background(), componenttest.NewNopHost(), statustest.NewNopStatusReporter()), "foo")
221+
assert.EqualError(t, pg.ShutdownAll(context.Background(), statustest.NewNopStatusReporter()), "bar")
221222
}
222223

223224
func TestConnectorPipelinesGraph(t *testing.T) {
@@ -768,7 +769,7 @@ func TestConnectorPipelinesGraph(t *testing.T) {
768769

769770
assert.Equal(t, len(test.pipelineConfigs), len(pg.pipelines))
770771

771-
assert.NoError(t, pg.StartAll(context.Background(), componenttest.NewNopHost()))
772+
assert.NoError(t, pg.StartAll(context.Background(), componenttest.NewNopHost(), statustest.NewNopStatusReporter()))
772773

773774
mutatingPipelines := make(map[component.ID]bool, len(test.pipelineConfigs))
774775

@@ -892,7 +893,7 @@ func TestConnectorPipelinesGraph(t *testing.T) {
892893
}
893894

894895
// Shut down the entire component graph
895-
assert.NoError(t, pg.ShutdownAll(context.Background()))
896+
assert.NoError(t, pg.ShutdownAll(context.Background(), statustest.NewNopStatusReporter()))
896897

897898
// Check each pipeline individually, ensuring that all components are stopped.
898899
for pipelineID := range test.pipelineConfigs {
@@ -2148,8 +2149,8 @@ func TestGraphFailToStartAndShutdown(t *testing.T) {
21482149
}
21492150
pipelines, err := Build(context.Background(), set)
21502151
assert.NoError(t, err)
2151-
assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost()))
2152-
assert.Error(t, pipelines.ShutdownAll(context.Background()))
2152+
assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost(), statustest.NewNopStatusReporter()))
2153+
assert.Error(t, pipelines.ShutdownAll(context.Background(), statustest.NewNopStatusReporter()))
21532154
})
21542155

21552156
t.Run(dt.String()+"/processor", func(t *testing.T) {
@@ -2162,8 +2163,8 @@ func TestGraphFailToStartAndShutdown(t *testing.T) {
21622163
}
21632164
pipelines, err := Build(context.Background(), set)
21642165
assert.NoError(t, err)
2165-
assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost()))
2166-
assert.Error(t, pipelines.ShutdownAll(context.Background()))
2166+
assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost(), statustest.NewNopStatusReporter()))
2167+
assert.Error(t, pipelines.ShutdownAll(context.Background(), statustest.NewNopStatusReporter()))
21672168
})
21682169

21692170
t.Run(dt.String()+"/exporter", func(t *testing.T) {
@@ -2176,8 +2177,8 @@ func TestGraphFailToStartAndShutdown(t *testing.T) {
21762177
}
21772178
pipelines, err := Build(context.Background(), set)
21782179
assert.NoError(t, err)
2179-
assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost()))
2180-
assert.Error(t, pipelines.ShutdownAll(context.Background()))
2180+
assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost(), statustest.NewNopStatusReporter()))
2181+
assert.Error(t, pipelines.ShutdownAll(context.Background(), statustest.NewNopStatusReporter()))
21812182
})
21822183

21832184
for _, dt2 := range dataTypes {
@@ -2196,8 +2197,8 @@ func TestGraphFailToStartAndShutdown(t *testing.T) {
21962197
}
21972198
pipelines, err := Build(context.Background(), set)
21982199
assert.NoError(t, err)
2199-
assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost()))
2200-
assert.Error(t, pipelines.ShutdownAll(context.Background()))
2200+
assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost(), statustest.NewNopStatusReporter()))
2201+
assert.Error(t, pipelines.ShutdownAll(context.Background(), statustest.NewNopStatusReporter()))
22012202
})
22022203
}
22032204
}
@@ -2350,8 +2351,8 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
23502351
}
23512352
pg.componentGraph.SetEdge(simple.Edge{F: e0, T: e1})
23522353

2353-
assert.Equal(t, tc.startupErr, pg.StartAll(context.Background(), componenttest.NewNopHost()))
2354-
assert.Equal(t, tc.shutdownErr, pg.ShutdownAll(context.Background()))
2354+
assert.Equal(t, tc.startupErr, pg.StartAll(context.Background(), componenttest.NewNopHost(), rep))
2355+
assert.Equal(t, tc.shutdownErr, pg.ShutdownAll(context.Background(), rep))
23552356
assertEqualStatuses(t, tc.expectedStatuses, actualStatuses)
23562357
})
23572358
}

service/internal/servicetelemetry/telemetry_settings.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ type TelemetrySettings struct {
3737

3838
// Status contains a Reporter that allows the service to report status on behalf of a
3939
// component.
40-
Status *status.Reporter
40+
Status status.Reporter
4141
}
4242

4343
// ToComponentTelemetrySettings returns a TelemetrySettings for a specific component derived from

service/internal/status/status.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,13 @@ type ServiceStatusFunc func(*component.InstanceID, *component.StatusEvent)
9696
var ErrStatusNotReady = errors.New("report component status is not ready until service start")
9797

9898
// Reporter handles component status reporting
99-
type Reporter struct {
99+
type Reporter interface {
100+
Ready()
101+
ReportStatus(id *component.InstanceID, ev *component.StatusEvent)
102+
ReportOKIfStarting(id *component.InstanceID)
103+
}
104+
105+
type reporter struct {
100106
mu sync.Mutex
101107
ready bool
102108
fsmMap map[*component.InstanceID]*fsm
@@ -106,23 +112,23 @@ type Reporter struct {
106112

107113
// NewReporter returns a reporter that will invoke the NotifyStatusFunc when a component's status
108114
// has changed.
109-
func NewReporter(onStatusChange NotifyStatusFunc, onInvalidTransition InvalidTransitionFunc) *Reporter {
110-
return &Reporter{
115+
func NewReporter(onStatusChange NotifyStatusFunc, onInvalidTransition InvalidTransitionFunc) Reporter {
116+
return &reporter{
111117
fsmMap: make(map[*component.InstanceID]*fsm),
112118
onStatusChange: onStatusChange,
113119
onInvalidTransition: onInvalidTransition,
114120
}
115121
}
116122

117123
// Ready enables status reporting
118-
func (r *Reporter) Ready() {
124+
func (r *reporter) Ready() {
119125
r.mu.Lock()
120126
defer r.mu.Unlock()
121127
r.ready = true
122128
}
123129

124130
// ReportStatus reports status for the given InstanceID
125-
func (r *Reporter) ReportStatus(
131+
func (r *reporter) ReportStatus(
126132
id *component.InstanceID,
127133
ev *component.StatusEvent,
128134
) {
@@ -137,7 +143,7 @@ func (r *Reporter) ReportStatus(
137143
}
138144
}
139145

140-
func (r *Reporter) ReportOKIfStarting(id *component.InstanceID) {
146+
func (r *reporter) ReportOKIfStarting(id *component.InstanceID) {
141147
r.mu.Lock()
142148
defer r.mu.Unlock()
143149
if !r.ready {
@@ -152,7 +158,7 @@ func (r *Reporter) ReportOKIfStarting(id *component.InstanceID) {
152158
}
153159

154160
// Note: a lock must be acquired before calling this method.
155-
func (r *Reporter) componentFSM(id *component.InstanceID) *fsm {
161+
func (r *reporter) componentFSM(id *component.InstanceID) *fsm {
156162
fsm, ok := r.fsmMap[id]
157163
if !ok {
158164
fsm = newFSM(func(ev *component.StatusEvent) { r.onStatusChange(id, ev) })
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package statustest // import "go.opentelemetry.io/collector/service/internal/status/statustest"
5+
6+
import (
7+
"go.opentelemetry.io/collector/component"
8+
"go.opentelemetry.io/collector/service/internal/status"
9+
)
10+
11+
func NewNopStatusReporter() status.Reporter {
12+
return &nopStatusReporter{}
13+
}
14+
15+
type nopStatusReporter struct{}
16+
17+
func (r *nopStatusReporter) Ready() {}
18+
19+
func (r *nopStatusReporter) ReportStatus(*component.InstanceID, *component.StatusEvent) {}
20+
21+
func (r *nopStatusReporter) ReportOKIfStarting(*component.InstanceID) {}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package statustest // import "go.opentelemetry.io/collector/service/internal/status/statustest"
5+
6+
import "testing"
7+
8+
func TestNopStatusReporter(*testing.T) {
9+
nop := NewNopStatusReporter()
10+
nop.Ready()
11+
nop.ReportOKIfStarting(nil)
12+
nop.ReportStatus(nil, nil)
13+
}

0 commit comments

Comments
 (0)