Skip to content

Commit 458cab8

Browse files
committed
Allow multiple OTLP to be created
1 parent 94a6cdd commit 458cab8

File tree

3 files changed

+87
-16
lines changed

3 files changed

+87
-16
lines changed

receiver/otlpreceiver/factory.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func customUnmarshaler(componentViperSection *viper.Viper, intoCfg interface{})
121121

122122
// CreateTracesReceiver creates a trace receiver based on provided config.
123123
func createTraceReceiver(
124-
ctx context.Context,
124+
_ context.Context,
125125
params component.ReceiverCreateParams,
126126
cfg configmodels.Receiver,
127127
nextConsumer consumer.TracesConsumer,
@@ -130,15 +130,15 @@ func createTraceReceiver(
130130
if err != nil {
131131
return nil, err
132132
}
133-
if err = r.registerTraceConsumer(ctx, nextConsumer); err != nil {
133+
if err = r.registerTraceConsumer(nextConsumer); err != nil {
134134
return nil, err
135135
}
136136
return r, nil
137137
}
138138

139139
// CreateMetricsReceiver creates a metrics receiver based on provided config.
140140
func createMetricsReceiver(
141-
ctx context.Context,
141+
_ context.Context,
142142
params component.ReceiverCreateParams,
143143
cfg configmodels.Receiver,
144144
consumer consumer.MetricsConsumer,
@@ -147,15 +147,15 @@ func createMetricsReceiver(
147147
if err != nil {
148148
return nil, err
149149
}
150-
if err = r.registerMetricsConsumer(ctx, consumer); err != nil {
150+
if err = r.registerMetricsConsumer(consumer); err != nil {
151151
return nil, err
152152
}
153153
return r, nil
154154
}
155155

156156
// CreateLogReceiver creates a log receiver based on provided config.
157157
func createLogReceiver(
158-
ctx context.Context,
158+
_ context.Context,
159159
params component.ReceiverCreateParams,
160160
cfg configmodels.Receiver,
161161
consumer consumer.LogsConsumer,
@@ -164,7 +164,7 @@ func createLogReceiver(
164164
if err != nil {
165165
return nil, err
166166
}
167-
if err = r.registerLogsConsumer(ctx, consumer); err != nil {
167+
if err = r.registerLogsConsumer(consumer); err != nil {
168168
return nil, err
169169
}
170170
return r, nil

receiver/otlpreceiver/otlp.go

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func (r *otlpReceiver) startGRPCServer(cfg *configgrpc.GRPCServerSettings, host
9797
return err
9898
}
9999
go func() {
100-
if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil {
100+
if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil && errGrpc != grpc.ErrServerStopped {
101101
host.ReportFatalError(errGrpc)
102102
}
103103
}()
@@ -112,14 +112,32 @@ func (r *otlpReceiver) startHTTPServer(cfg *confighttp.HTTPServerSettings, host
112112
return err
113113
}
114114
go func() {
115-
if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil {
115+
if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil && errHTTP != http.ErrServerClosed {
116116
host.ReportFatalError(errHTTP)
117117
}
118118
}()
119119
return nil
120120
}
121121

122-
func (r *otlpReceiver) startProtocolServers(host component.Host) error {
122+
func (r *otlpReceiver) startProtocolServers(ctx context.Context, host component.Host) error {
123+
if r.traceReceiver != nil {
124+
if err := r.registerTraceServers(ctx); err != nil {
125+
return err
126+
}
127+
}
128+
129+
if r.metricsReceiver != nil {
130+
if err := r.registerMetricsServers(ctx); err != nil {
131+
return err
132+
}
133+
}
134+
135+
if r.logReceiver != nil {
136+
if err := r.registerLogsServers(ctx); err != nil {
137+
return err
138+
}
139+
}
140+
123141
var err error
124142
if r.cfg.GRPC != nil {
125143
err = r.startGRPCServer(r.cfg.GRPC, host)
@@ -155,14 +173,14 @@ func (r *otlpReceiver) startProtocolServers(host component.Host) error {
155173

156174
// Start runs the trace receiver on the gRPC server. Currently
157175
// it also enables the metrics receiver too.
158-
func (r *otlpReceiver) Start(_ context.Context, host component.Host) error {
176+
func (r *otlpReceiver) Start(ctx context.Context, host component.Host) error {
159177
if r.traceReceiver == nil && r.metricsReceiver == nil && r.logReceiver == nil {
160178
return errors.New("cannot start receiver: no consumers were specified")
161179
}
162180

163181
var err error
164182
r.startServerOnce.Do(func() {
165-
err = r.startProtocolServers(host)
183+
err = r.startProtocolServers(ctx, host)
166184
})
167185
return err
168186
}
@@ -184,11 +202,15 @@ func (r *otlpReceiver) Shutdown(ctx context.Context) error {
184202
return err
185203
}
186204

187-
func (r *otlpReceiver) registerTraceConsumer(ctx context.Context, tc consumer.TracesConsumer) error {
205+
func (r *otlpReceiver) registerTraceConsumer(tc consumer.TracesConsumer) error {
188206
if tc == nil {
189207
return componenterror.ErrNilNextConsumer
190208
}
191209
r.traceReceiver = trace.New(r.cfg.Name(), tc)
210+
return nil
211+
}
212+
213+
func (r *otlpReceiver) registerTraceServers(ctx context.Context) error {
192214
if r.serverGRPC != nil {
193215
collectortrace.RegisterTraceServiceServer(r.serverGRPC, r.traceReceiver)
194216
}
@@ -203,11 +225,15 @@ func (r *otlpReceiver) registerTraceConsumer(ctx context.Context, tc consumer.Tr
203225
return nil
204226
}
205227

206-
func (r *otlpReceiver) registerMetricsConsumer(ctx context.Context, mc consumer.MetricsConsumer) error {
228+
func (r *otlpReceiver) registerMetricsConsumer(mc consumer.MetricsConsumer) error {
207229
if mc == nil {
208230
return componenterror.ErrNilNextConsumer
209231
}
210232
r.metricsReceiver = metrics.New(r.cfg.Name(), mc)
233+
return nil
234+
}
235+
236+
func (r *otlpReceiver) registerMetricsServers(ctx context.Context) error {
211237
if r.serverGRPC != nil {
212238
collectormetrics.RegisterMetricsServiceServer(r.serverGRPC, r.metricsReceiver)
213239
}
@@ -217,11 +243,15 @@ func (r *otlpReceiver) registerMetricsConsumer(ctx context.Context, mc consumer.
217243
return nil
218244
}
219245

220-
func (r *otlpReceiver) registerLogsConsumer(ctx context.Context, tc consumer.LogsConsumer) error {
221-
if tc == nil {
246+
func (r *otlpReceiver) registerLogsConsumer(lc consumer.LogsConsumer) error {
247+
if lc == nil {
222248
return componenterror.ErrNilNextConsumer
223249
}
224-
r.logReceiver = logs.New(r.cfg.Name(), tc)
250+
r.logReceiver = logs.New(r.cfg.Name(), lc)
251+
return nil
252+
}
253+
254+
func (r *otlpReceiver) registerLogsServers(ctx context.Context) error {
225255
if r.serverGRPC != nil {
226256
collectorlog.RegisterLogsServiceServer(r.serverGRPC, r.logReceiver)
227257
}

receiver/otlpreceiver/otlp_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,31 @@ func TestHTTPNewPortAlreadyUsed(t *testing.T) {
542542
require.Error(t, r.Start(context.Background(), componenttest.NewNopHost()))
543543
}
544544

545+
func TestReceiverLifecycle(t *testing.T) {
546+
endpointGrpc := testutil.GetAvailableLocalAddress(t)
547+
endpointHTTP := testutil.GetAvailableLocalAddress(t)
548+
549+
tracesSink := new(consumertest.TracesSink)
550+
metricsSink := new(consumertest.MetricsSink)
551+
552+
// Create OTLP receiver with gRPC and HTTP protocols.
553+
factory := NewFactory()
554+
cfg := factory.CreateDefaultConfig().(*Config)
555+
cfg.GRPC.NetAddr.Endpoint = endpointGrpc
556+
cfg.HTTP.Endpoint = endpointHTTP
557+
558+
fstReceiver := newReceiver(t, factory, cfg, tracesSink, metricsSink)
559+
mh := newAssertNoErrorHost(t)
560+
require.NoError(t, fstReceiver.Start(context.Background(), mh))
561+
562+
sndReceiver := newReceiver(t, factory, cfg, tracesSink, metricsSink)
563+
564+
require.NoError(t, fstReceiver.Shutdown(context.Background()))
565+
566+
require.NoError(t, sndReceiver.Start(context.Background(), mh))
567+
require.NoError(t, sndReceiver.Shutdown(context.Background()))
568+
}
569+
545570
func TestGRPCStartWithoutConsumers(t *testing.T) {
546571
addr := testutil.GetAvailableLocalAddress(t)
547572
r := newGRPCReceiver(t, otlpReceiverName, addr, nil, nil)
@@ -860,3 +885,19 @@ loop:
860885
// Indicate that we are done.
861886
close(doneSignal)
862887
}
888+
889+
type assertNoErrorHost struct {
890+
component.Host
891+
*testing.T
892+
}
893+
894+
func newAssertNoErrorHost(t *testing.T) component.Host {
895+
return &assertNoErrorHost{
896+
componenttest.NewNopHost(),
897+
t,
898+
}
899+
}
900+
901+
func (aneh *assertNoErrorHost) ReportFatalError(err error) {
902+
assert.NoError(aneh, err)
903+
}

0 commit comments

Comments
 (0)