Skip to content

Commit cfbb0f6

Browse files
authored
Small cleanups in service/builder (#3693)
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 6814536 commit cfbb0f6

File tree

6 files changed

+62
-60
lines changed

6 files changed

+62
-60
lines changed

service/internal/builder/exporters_builder.go

Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -139,31 +139,38 @@ type dataTypeRequirement struct {
139139
type dataTypeRequirements map[config.DataType]dataTypeRequirement
140140

141141
// Data type requirements for all exporters.
142-
type exportersRequiredDataTypes map[config.Exporter]dataTypeRequirements
142+
type exportersRequiredDataTypes map[config.ComponentID]dataTypeRequirements
143143

144144
// BuildExporters builds Exporters from config.
145145
func BuildExporters(
146146
logger *zap.Logger,
147147
tracerProvider trace.TracerProvider,
148148
buildInfo component.BuildInfo,
149-
config *config.Config,
149+
cfg *config.Config,
150150
factories map[config.Type]component.ExporterFactory,
151151
) (Exporters, error) {
152152
logger = logger.With(zap.String(zapKindKey, zapKindLogExporter))
153153

154154
// We need to calculate required input data types for each exporter so that we know
155155
// which data type must be started for each exporter.
156-
exporterInputDataTypes := calcExportersRequiredDataTypes(config)
156+
exporterInputDataTypes := calcExportersRequiredDataTypes(cfg)
157157

158158
exporters := make(Exporters)
159-
// BuildExporters exporters based on configuration and required input data types.
160-
for _, expCfg := range config.Exporters {
159+
160+
// Build exporters exporters based on configuration and required input data types.
161+
for expID, expCfg := range cfg.Exporters {
161162
set := component.ExporterCreateSettings{
162-
Logger: logger.With(zap.Stringer(zapNameKey, expCfg.ID())),
163+
Logger: logger.With(zap.String(zapNameKey, expID.String())),
163164
TracerProvider: tracerProvider,
164165
BuildInfo: buildInfo,
165166
}
166-
exp, err := buildExporter(context.Background(), factories, set, expCfg, exporterInputDataTypes)
167+
168+
factory, exists := factories[expID.Type()]
169+
if !exists || factory == nil {
170+
return nil, fmt.Errorf("exporter factory not found for type: %s", expID.Type())
171+
}
172+
173+
exp, err := buildExporter(context.Background(), factory, set, expCfg, exporterInputDataTypes[expID])
167174
if err != nil {
168175
return nil, err
169176
}
@@ -174,7 +181,7 @@ func BuildExporters(
174181
return exporters, nil
175182
}
176183

177-
func calcExportersRequiredDataTypes(config *config.Config) exportersRequiredDataTypes {
184+
func calcExportersRequiredDataTypes(cfg *config.Config) exportersRequiredDataTypes {
178185
// Go over all pipelines. The data type of the pipeline defines what data type
179186
// each exporter is expected to receive. Collect all required types for each
180187
// exporter.
@@ -187,43 +194,34 @@ func calcExportersRequiredDataTypes(config *config.Config) exportersRequiredData
187194
result := make(exportersRequiredDataTypes)
188195

189196
// Iterate over pipelines.
190-
for _, pipeline := range config.Service.Pipelines {
197+
for _, pipeline := range cfg.Service.Pipelines {
191198
// Iterate over all exporters for this pipeline.
192-
for _, expName := range pipeline.Exporters {
193-
// Find the exporter config by name.
194-
exporter := config.Exporters[expName]
195-
196-
// Create the data type requirement for the exporter if it does not exist.
197-
if result[exporter] == nil {
198-
result[exporter] = make(dataTypeRequirements)
199+
for _, expID := range pipeline.Exporters {
200+
// Create the data type requirement for the expCfg if it does not exist.
201+
if _, ok := result[expID]; !ok {
202+
result[expID] = make(dataTypeRequirements)
199203
}
200204

201-
// Remember that this data type is required for the exporter and also which
205+
// Remember that this data type is required for the expCfg and also which
202206
// pipeline the requirement is coming from.
203-
result[exporter][pipeline.InputType] = dataTypeRequirement{pipeline}
207+
result[expID][pipeline.InputType] = dataTypeRequirement{pipeline}
204208
}
205209
}
206210
return result
207211
}
208212

209213
func buildExporter(
210214
ctx context.Context,
211-
factories map[config.Type]component.ExporterFactory,
215+
factory component.ExporterFactory,
212216
set component.ExporterCreateSettings,
213217
cfg config.Exporter,
214-
exportersInputDataTypes exportersRequiredDataTypes,
218+
inputDataTypes dataTypeRequirements,
215219
) (*builtExporter, error) {
216-
factory := factories[cfg.ID().Type()]
217-
if factory == nil {
218-
return nil, fmt.Errorf("exporter factory not found for type: %s", cfg.ID().Type())
219-
}
220-
221220
exporter := &builtExporter{
222221
logger: set.Logger,
223222
expByDataType: make(map[config.DataType]component.Exporter, 3),
224223
}
225224

226-
inputDataTypes := exportersInputDataTypes[cfg]
227225
if inputDataTypes == nil {
228226
set.Logger.Info("Ignoring exporter as it is not used by any pipeline")
229227
return exporter, nil

service/internal/builder/exporters_builder_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func TestBuildExporters(t *testing.T) {
8383
// Since the endpoint of opencensus exporter doesn't actually exist, e1 may
8484
// already stop because it cannot connect.
8585
// The test should stop running if this isn't the error cause.
86-
require.Equal(t, err.Error(), "rpc error: code = Canceled desc = grpc: the client connection is closing")
86+
require.EqualError(t, err, "rpc error: code = Canceled desc = grpc: the client connection is closing")
8787
}
8888

8989
// Remove the pipeline so that the exporter is not attached to any pipeline.

service/internal/builder/extensions_builder.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -119,20 +119,26 @@ func BuildExtensions(
119119
config *config.Config,
120120
factories map[config.Type]component.ExtensionFactory,
121121
) (Extensions, error) {
122-
logger = logger.With(zap.String(zapKindKey, zapKindExtension))
123122
extensions := make(Extensions)
124-
for _, extName := range config.Service.Extensions {
125-
extCfg, exists := config.Extensions[extName]
126-
if !exists {
127-
return nil, fmt.Errorf("extension %q is not configured", extName)
123+
for _, extID := range config.Service.Extensions {
124+
extCfg, existsCfg := config.Extensions[extID]
125+
if !existsCfg {
126+
return nil, fmt.Errorf("extension %q is not configured", extID)
127+
}
128+
129+
factory, existsFactory := factories[extID.Type()]
130+
if !existsFactory {
131+
return nil, fmt.Errorf("extension factory for type %q is not configured", extID.Type())
128132
}
129133

130134
set := component.ExtensionCreateSettings{
131-
Logger: logger.With(zap.Stringer(zapNameKey, extCfg.ID())),
135+
Logger: logger.With(
136+
zap.String(zapKindKey, zapKindExtension),
137+
zap.String(zapNameKey, extID.String())),
132138
TracerProvider: tracerProvider,
133139
BuildInfo: buildInfo,
134140
}
135-
ext, err := buildExtension(context.Background(), factories, set, extCfg)
141+
ext, err := buildExtension(context.Background(), factory, set, extCfg)
136142
if err != nil {
137143
return nil, err
138144
}
@@ -143,12 +149,7 @@ func BuildExtensions(
143149
return extensions, nil
144150
}
145151

146-
func buildExtension(ctx context.Context, factories map[config.Type]component.ExtensionFactory, creationSet component.ExtensionCreateSettings, cfg config.Extension) (*builtExtension, error) {
147-
factory := factories[cfg.ID().Type()]
148-
if factory == nil {
149-
return nil, fmt.Errorf("extension factory for type %q is not configured", cfg.ID().Type())
150-
}
151-
152+
func buildExtension(ctx context.Context, factory component.ExtensionFactory, creationSet component.ExtensionCreateSettings, cfg config.Extension) (*builtExtension, error) {
152153
ext := &builtExtension{
153154
logger: creationSet.Logger,
154155
}

service/internal/builder/extensions_builder_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func TestService_setupExtensions(t *testing.T) {
119119
ext, err := BuildExtensions(zap.NewNop(), trace.NewNoopTracerProvider(), component.DefaultBuildInfo(), tt.config, tt.factories.Extensions)
120120

121121
assert.Error(t, err)
122-
assert.Equal(t, tt.wantErrMsg, err.Error())
122+
assert.EqualError(t, err, tt.wantErrMsg)
123123
assert.Equal(t, 0, len(ext))
124124
})
125125
}

service/internal/builder/pipelines_builder.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -144,17 +144,24 @@ func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineCfg *conf
144144
// the processor itself becomes a consumer for the one that precedes it in
145145
// in the pipeline and so on.
146146
for i := len(pipelineCfg.Processors) - 1; i >= 0; i-- {
147-
procName := pipelineCfg.Processors[i]
148-
procCfg := pb.config.Processors[procName]
147+
procID := pipelineCfg.Processors[i]
149148

150-
factory := pb.factories[procCfg.ID().Type()]
149+
procCfg, existsCfg := pb.config.Processors[procID]
150+
if !existsCfg {
151+
return nil, fmt.Errorf("processor %q is not configured", procID)
152+
}
153+
154+
factory, existsFactory := pb.factories[procID.Type()]
155+
if !existsFactory {
156+
return nil, fmt.Errorf("processor factory for type %q is not configured", procID.Type())
157+
}
151158

152159
// This processor must point to the next consumer and then
153160
// it becomes the next for the previous one (previous in the pipeline,
154161
// which we will build in the next loop iteration).
155162
var err error
156163
set := component.ProcessorCreateSettings{
157-
Logger: pb.logger.With(zap.String(zapKindKey, zapKindProcessor), zap.Stringer(zapNameKey, procCfg.ID())),
164+
Logger: pb.logger.With(zap.String(zapKindKey, zapKindProcessor), zap.String(zapNameKey, procID.String())),
158165
TracerProvider: pb.tracerProvider,
159166
BuildInfo: pb.buildInfo,
160167
}
@@ -188,17 +195,17 @@ func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineCfg *conf
188195

189196
default:
190197
return nil, fmt.Errorf("error creating processor %q in pipeline %q, data type %s is not supported",
191-
procName, pipelineCfg.Name, pipelineCfg.InputType)
198+
procID, pipelineCfg.Name, pipelineCfg.InputType)
192199
}
193200

194201
if err != nil {
195202
return nil, fmt.Errorf("error creating processor %q in pipeline %q: %v",
196-
procName, pipelineCfg.Name, err)
203+
procID, pipelineCfg.Name, err)
197204
}
198205

199206
// Check if the factory really created the processor.
200207
if tc == nil && mc == nil && lc == nil {
201-
return nil, fmt.Errorf("factory for %v produced a nil processor", procCfg.ID())
208+
return nil, fmt.Errorf("factory for %v produced a nil processor", procID)
202209
}
203210
}
204211

service/internal/builder/receivers_builder.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,6 @@ func (rcvs Receivers) StartAll(ctx context.Context, host component.Host) error {
8080

8181
// receiversBuilder builds receivers from config.
8282
type receiversBuilder struct {
83-
logger *zap.Logger
84-
buildInfo component.BuildInfo
8583
config *config.Config
8684
builtPipelines BuiltPipelines
8785
factories map[config.Type]component.ReceiverFactory
@@ -92,19 +90,20 @@ func BuildReceivers(
9290
logger *zap.Logger,
9391
tracerProvider trace.TracerProvider,
9492
buildInfo component.BuildInfo,
95-
config *config.Config,
93+
cfg *config.Config,
9694
builtPipelines BuiltPipelines,
9795
factories map[config.Type]component.ReceiverFactory,
9896
) (Receivers, error) {
99-
rb := &receiversBuilder{logger.With(zap.String(zapKindKey, zapKindReceiver)), buildInfo, config, builtPipelines, factories}
97+
rb := &receiversBuilder{cfg, builtPipelines, factories}
10098

10199
receivers := make(Receivers)
102-
for _, recvCfg := range rb.config.Receivers {
100+
for recvID, recvCfg := range cfg.Receivers {
103101
set := component.ReceiverCreateSettings{
104-
Logger: rb.logger.With(zap.Stringer(zapNameKey, recvCfg.ID())),
102+
Logger: logger.With(zap.String(zapKindKey, zapKindReceiver), zap.String(zapNameKey, recvID.String())),
105103
TracerProvider: tracerProvider,
106104
BuildInfo: buildInfo,
107105
}
106+
108107
rcv, err := rb.buildReceiver(context.Background(), set, recvCfg)
109108
if err != nil {
110109
if err == errUnusedReceiver {
@@ -199,13 +198,10 @@ func attachReceiverToPipelines(
199198
if err != nil {
200199
if err == componenterror.ErrDataTypeIsNotSupported {
201200
return fmt.Errorf(
202-
"receiver %v does not support %s but it was used in a "+
203-
"%s pipeline",
204-
cfg.ID(),
205-
dataType,
206-
dataType)
201+
"receiver %v does not support %s but it was used in a %s pipeline",
202+
cfg.ID(), dataType, dataType)
207203
}
208-
return fmt.Errorf("cannot create receiver %v: %s", cfg.ID(), err.Error())
204+
return fmt.Errorf("cannot create receiver %v: %w", cfg.ID(), err)
209205
}
210206

211207
// Check if the factory really created the receiver.

0 commit comments

Comments
 (0)