Skip to content

Commit 9c71935

Browse files
authored
Add pipelines.Config to remove duplicate of the pipelines configuration (#7854)
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 64f3415 commit 9c71935

File tree

15 files changed

+284
-179
lines changed

15 files changed

+284
-179
lines changed
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: deprecation
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
5+
component: service
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: Deprecate service.PipelineConfig in favor of pipelines.Config.
9+
10+
# One or more tracking issues or pull requests related to the change
11+
issues: [7854]
12+

.chloggen/pipelinesconfig.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: enhancement
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
5+
component: service/pipelines
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: Add pipelines.Config to remove duplicate of the pipelines configuration
9+
10+
# One or more tracking issues or pull requests related to the change
11+
issues: [7854]
12+

otelcol/config.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,14 +130,14 @@ func (cfg *Config) Validate() error {
130130
connectorsAsReceivers[ref] = struct{}{}
131131
continue
132132
}
133-
return fmt.Errorf("service::pipeline::%s: references receiver %q which is not configured", pipelineID, ref)
133+
return fmt.Errorf("service::pipelines::%s: references receiver %q which is not configured", pipelineID, ref)
134134
}
135135

136136
// Validate pipeline processor name references.
137137
for _, ref := range pipeline.Processors {
138138
// Check that the name referenced in the pipeline's processors exists in the top-level processors.
139139
if cfg.Processors[ref] == nil {
140-
return fmt.Errorf("service::pipeline::%s: references processor %q which is not configured", pipelineID, ref)
140+
return fmt.Errorf("service::pipelines::%s: references processor %q which is not configured", pipelineID, ref)
141141
}
142142
}
143143

@@ -151,7 +151,7 @@ func (cfg *Config) Validate() error {
151151
connectorsAsExporters[ref] = struct{}{}
152152
continue
153153
}
154-
return fmt.Errorf("service::pipeline::%s: references exporter %q which is not configured", pipelineID, ref)
154+
return fmt.Errorf("service::pipelines::%s: references exporter %q which is not configured", pipelineID, ref)
155155
}
156156
}
157157

otelcol/config_test.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"go.opentelemetry.io/collector/component"
1515
"go.opentelemetry.io/collector/config/configtelemetry"
1616
"go.opentelemetry.io/collector/service"
17+
"go.opentelemetry.io/collector/service/pipelines"
1718
"go.opentelemetry.io/collector/service/telemetry"
1819
)
1920

@@ -88,7 +89,7 @@ func TestConfigValidate(t *testing.T) {
8889
pipe.Receivers = append(pipe.Receivers, component.NewIDWithName("nop", "2"))
8990
return cfg
9091
},
91-
expected: errors.New(`service::pipeline::traces: references receiver "nop/2" which is not configured`),
92+
expected: errors.New(`service::pipelines::traces: references receiver "nop/2" which is not configured`),
9293
},
9394
{
9495
name: "invalid-processor-reference",
@@ -98,7 +99,7 @@ func TestConfigValidate(t *testing.T) {
9899
pipe.Processors = append(pipe.Processors, component.NewIDWithName("nop", "2"))
99100
return cfg
100101
},
101-
expected: errors.New(`service::pipeline::traces: references processor "nop/2" which is not configured`),
102+
expected: errors.New(`service::pipelines::traces: references processor "nop/2" which is not configured`),
102103
},
103104
{
104105
name: "invalid-exporter-reference",
@@ -108,7 +109,7 @@ func TestConfigValidate(t *testing.T) {
108109
pipe.Exporters = append(pipe.Exporters, component.NewIDWithName("nop", "2"))
109110
return cfg
110111
},
111-
expected: errors.New(`service::pipeline::traces: references exporter "nop/2" which is not configured`),
112+
expected: errors.New(`service::pipelines::traces: references exporter "nop/2" which is not configured`),
112113
},
113114
{
114115
name: "invalid-receiver-config",
@@ -199,7 +200,7 @@ func TestConfigValidate(t *testing.T) {
199200
pipe.Receivers = append(pipe.Receivers, component.NewIDWithName("nop", "conn2"))
200201
return cfg
201202
},
202-
expected: errors.New(`service::pipeline::traces: references receiver "nop/conn2" which is not configured`),
203+
expected: errors.New(`service::pipelines::traces: references receiver "nop/conn2" which is not configured`),
203204
},
204205
{
205206
name: "invalid-connector-reference-as-receiver",
@@ -209,7 +210,7 @@ func TestConfigValidate(t *testing.T) {
209210
pipe.Exporters = append(pipe.Exporters, component.NewIDWithName("nop", "conn2"))
210211
return cfg
211212
},
212-
expected: errors.New(`service::pipeline::traces: references exporter "nop/conn2" which is not configured`),
213+
expected: errors.New(`service::pipelines::traces: references exporter "nop/conn2" which is not configured`),
213214
},
214215
{
215216
name: "missing-connector-as-receiver",
@@ -238,7 +239,7 @@ func TestConfigValidate(t *testing.T) {
238239
cfg.Service.Pipelines = nil
239240
return cfg
240241
},
241-
expected: errors.New(`service must have at least one pipeline`),
242+
expected: fmt.Errorf(`service::pipelines config validation failed: %w`, errors.New(`service must have at least one pipeline`)),
242243
},
243244
}
244245

@@ -285,7 +286,7 @@ func generateConfig() *Config {
285286
},
286287
},
287288
Extensions: []component.ID{component.NewID("nop")},
288-
Pipelines: map[component.ID]*service.PipelineConfig{
289+
Pipelines: pipelines.Config{
289290
component.NewID("traces"): {
290291
Receivers: []component.ID{component.NewID("nop")},
291292
Processors: []component.ID{component.NewID("nop")},

otelcol/configprovider_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"go.opentelemetry.io/collector/processor/processortest"
2525
"go.opentelemetry.io/collector/receiver/receivertest"
2626
"go.opentelemetry.io/collector/service"
27+
"go.opentelemetry.io/collector/service/pipelines"
2728
"go.opentelemetry.io/collector/service/telemetry"
2829
)
2930

@@ -35,7 +36,7 @@ var configNop = &Config{
3536
Extensions: map[component.ID]component.Config{component.NewID("nop"): extensiontest.NewNopFactory().CreateDefaultConfig()},
3637
Service: service.Config{
3738
Extensions: []component.ID{component.NewID("nop")},
38-
Pipelines: map[component.ID]*service.PipelineConfig{
39+
Pipelines: pipelines.Config{
3940
component.NewID("traces"): {
4041
Receivers: []component.ID{component.NewID("nop")},
4142
Processors: []component.ID{component.NewID("nop")},

otelcol/otelcoltest/config_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
"github.com/stretchr/testify/require"
1212

1313
"go.opentelemetry.io/collector/component"
14-
"go.opentelemetry.io/collector/service"
14+
"go.opentelemetry.io/collector/service/pipelines"
1515
)
1616

1717
func TestLoadConfig(t *testing.T) {
@@ -50,7 +50,7 @@ func TestLoadConfig(t *testing.T) {
5050
assert.Contains(t, cfg.Service.Extensions, component.NewID("nop"))
5151
require.Len(t, cfg.Service.Pipelines, 1)
5252
assert.Equal(t,
53-
&service.PipelineConfig{
53+
&pipelines.PipelineConfig{
5454
Receivers: []component.ID{component.NewID("nop")},
5555
Processors: []component.ID{component.NewID("nop")},
5656
Exporters: []component.ID{component.NewID("nop")},

otelcol/unmarshaler_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ import (
1010
"github.com/stretchr/testify/require"
1111
"go.uber.org/zap"
1212

13-
"go.opentelemetry.io/collector/component"
1413
"go.opentelemetry.io/collector/confmap"
1514
"go.opentelemetry.io/collector/service"
15+
"go.opentelemetry.io/collector/service/pipelines"
1616
"go.opentelemetry.io/collector/service/telemetry"
1717
)
1818

@@ -131,8 +131,8 @@ func TestPipelineConfigUnmarshalError(t *testing.T) {
131131

132132
for _, tt := range testCases {
133133
t.Run(tt.name, func(t *testing.T) {
134-
pipelines := make(map[component.ID]service.PipelineConfig)
135-
err := tt.conf.Unmarshal(&pipelines, confmap.WithErrorUnused())
134+
pips := new(pipelines.Config)
135+
err := tt.conf.Unmarshal(&pips, confmap.WithErrorUnused())
136136
require.Error(t, err)
137137
assert.Contains(t, err.Error(), tt.expectError)
138138
})

service/config.go

Lines changed: 7 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,15 @@
44
package service // import "go.opentelemetry.io/collector/service"
55

66
import (
7-
"errors"
87
"fmt"
98

10-
"go.opentelemetry.io/collector/component"
119
"go.opentelemetry.io/collector/service/extensions"
10+
"go.opentelemetry.io/collector/service/pipelines"
1211
"go.opentelemetry.io/collector/service/telemetry"
1312
)
1413

15-
var (
16-
errMissingServicePipelines = errors.New("service must have at least one pipeline")
17-
errMissingServicePipelineReceivers = errors.New("must have at least one receiver")
18-
errMissingServicePipelineExporters = errors.New("must have at least one exporter")
19-
)
14+
// Deprecated: [v0.80.0] use pipelines.PipelineConfig.
15+
type PipelineConfig = pipelines.PipelineConfig
2016

2117
// Config defines the configurable components of the Service.
2218
type Config struct {
@@ -27,61 +23,16 @@ type Config struct {
2723
Extensions extensions.Config `mapstructure:"extensions"`
2824

2925
// Pipelines are the set of data pipelines configured for the service.
30-
Pipelines map[component.ID]*PipelineConfig `mapstructure:"pipelines"`
26+
Pipelines pipelines.Config `mapstructure:"pipelines"`
3127
}
3228

3329
func (cfg *Config) Validate() error {
34-
// Must have at least one pipeline.
35-
if len(cfg.Pipelines) == 0 {
36-
return errMissingServicePipelines
37-
}
38-
39-
// Check that all pipelines have at least one receiver and one exporter, and they reference
40-
// only configured components.
41-
for pipelineID, pipeline := range cfg.Pipelines {
42-
if pipelineID.Type() != component.DataTypeTraces && pipelineID.Type() != component.DataTypeMetrics && pipelineID.Type() != component.DataTypeLogs {
43-
return fmt.Errorf("service::pipeline::%s: unknown datatype %q", pipelineID, pipelineID.Type())
44-
}
45-
46-
// Validate pipeline has at least one receiver.
47-
if err := pipeline.Validate(); err != nil {
48-
return fmt.Errorf("service::pipeline::%s: %w", pipelineID, err)
49-
}
30+
if err := cfg.Pipelines.Validate(); err != nil {
31+
return fmt.Errorf("service::pipelines config validation failed: %w", err)
5032
}
5133

5234
if err := cfg.Telemetry.Validate(); err != nil {
53-
fmt.Printf("service::telemetry config validation failed, %v\n", err)
54-
}
55-
56-
return nil
57-
}
58-
59-
// PipelineConfig defines the configuration of a Pipeline.
60-
type PipelineConfig struct {
61-
Receivers []component.ID `mapstructure:"receivers"`
62-
Processors []component.ID `mapstructure:"processors"`
63-
Exporters []component.ID `mapstructure:"exporters"`
64-
}
65-
66-
func (cfg *PipelineConfig) Validate() error {
67-
// Validate pipeline has at least one receiver.
68-
if len(cfg.Receivers) == 0 {
69-
return errMissingServicePipelineReceivers
70-
}
71-
72-
// Validate pipeline has at least one exporter.
73-
if len(cfg.Exporters) == 0 {
74-
return errMissingServicePipelineExporters
75-
}
76-
77-
// Validate no processors are duplicated within a pipeline.
78-
procSet := make(map[component.ID]struct{}, len(cfg.Processors))
79-
for _, ref := range cfg.Processors {
80-
// Ensure no processors are duplicated within the pipeline
81-
if _, exists := procSet[ref]; exists {
82-
return fmt.Errorf("references processor %q multiple times", ref)
83-
}
84-
procSet[ref] = struct{}{}
35+
fmt.Printf("service::telemetry config validation failed: %v\n", err)
8536
}
8637

8738
return nil

service/config_test.go

Lines changed: 5 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"go.opentelemetry.io/collector/component"
1515
"go.opentelemetry.io/collector/config/configtelemetry"
1616
"go.opentelemetry.io/collector/service/extensions"
17+
"go.opentelemetry.io/collector/service/pipelines"
1718
"go.opentelemetry.io/collector/service/telemetry"
1819
)
1920

@@ -45,47 +46,20 @@ func TestConfigValidate(t *testing.T) {
4546
pipe.Processors = append(pipe.Processors, pipe.Processors...)
4647
return cfg
4748
},
48-
expected: fmt.Errorf(`service::pipeline::traces: %w`, errors.New(`references processor "nop" multiple times`)),
49-
},
50-
{
51-
name: "missing-pipeline-receivers",
52-
cfgFn: func() *Config {
53-
cfg := generateConfig()
54-
cfg.Pipelines[component.NewID("traces")].Receivers = nil
55-
return cfg
56-
},
57-
expected: fmt.Errorf(`service::pipeline::traces: %w`, errMissingServicePipelineReceivers),
58-
},
59-
{
60-
name: "missing-pipeline-exporters",
61-
cfgFn: func() *Config {
62-
cfg := generateConfig()
63-
cfg.Pipelines[component.NewID("traces")].Exporters = nil
64-
return cfg
65-
},
66-
expected: fmt.Errorf(`service::pipeline::traces: %w`, errMissingServicePipelineExporters),
67-
},
68-
{
69-
name: "missing-pipelines",
70-
cfgFn: func() *Config {
71-
cfg := generateConfig()
72-
cfg.Pipelines = nil
73-
return cfg
74-
},
75-
expected: errMissingServicePipelines,
49+
expected: fmt.Errorf(`service::pipelines config validation failed: %w`, fmt.Errorf(`pipeline "traces": %w`, errors.New(`references processor "nop" multiple times`))),
7650
},
7751
{
7852
name: "invalid-service-pipeline-type",
7953
cfgFn: func() *Config {
8054
cfg := generateConfig()
81-
cfg.Pipelines[component.NewID("wrongtype")] = &PipelineConfig{
55+
cfg.Pipelines[component.NewID("wrongtype")] = &pipelines.PipelineConfig{
8256
Receivers: []component.ID{component.NewID("nop")},
8357
Processors: []component.ID{component.NewID("nop")},
8458
Exporters: []component.ID{component.NewID("nop")},
8559
}
8660
return cfg
8761
},
88-
expected: errors.New(`service::pipeline::wrongtype: unknown datatype "wrongtype"`),
62+
expected: fmt.Errorf(`service::pipelines config validation failed: %w`, errors.New(`pipeline "wrongtype": unknown datatype "wrongtype"`)),
8963
},
9064
{
9165
name: "invalid-telemetry-metric-config",
@@ -126,7 +100,7 @@ func generateConfig() *Config {
126100
},
127101
},
128102
Extensions: extensions.Config{component.NewID("nop")},
129-
Pipelines: map[component.ID]*PipelineConfig{
103+
Pipelines: pipelines.Config{
130104
component.NewID("traces"): {
131105
Receivers: []component.ID{component.NewID("nop")},
132106
Processors: []component.ID{component.NewID("nop")},

service/internal/graph/graph.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"go.opentelemetry.io/collector/receiver"
2323
"go.opentelemetry.io/collector/service/internal/capabilityconsumer"
2424
"go.opentelemetry.io/collector/service/internal/fanoutconsumer"
25+
"go.opentelemetry.io/collector/service/pipelines"
2526
)
2627

2728
// Settings holds configuration for building builtPipelines.
@@ -35,13 +36,7 @@ type Settings struct {
3536
ConnectorBuilder *connector.Builder
3637

3738
// PipelineConfigs is a map of component.ID to PipelineConfig.
38-
PipelineConfigs map[component.ID]*PipelineConfig
39-
}
40-
41-
type PipelineConfig struct {
42-
Receivers []component.ID
43-
Processors []component.ID
44-
Exporters []component.ID
39+
PipelineConfigs pipelines.Config
4540
}
4641

4742
type Graph struct {

0 commit comments

Comments
 (0)