Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions receiver/otlpreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,6 @@ func createReceiver(cfg configmodels.Receiver, logger *zap.Logger) (*otlpReceive
// We maintain this map because the Factory is asked trace and metric receivers separately
// when it gets CreateTracesReceiver() and CreateMetricsReceiver() but they must not
// create separate objects, they must use one otlpReceiver object per configuration.
// When the receiver is shutdown it should be removed from this map so the same configuration
// can be recreated successfully.
var receivers = map[*Config]*otlpReceiver{}
16 changes: 14 additions & 2 deletions receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type otlpReceiver struct {

stopOnce sync.Once
startServerOnce sync.Once
shutdownWG sync.WaitGroup

logger *zap.Logger
}
Expand Down Expand Up @@ -96,8 +97,11 @@ func (r *otlpReceiver) startGRPCServer(cfg *configgrpc.GRPCServerSettings, host
if err != nil {
return err
}
r.shutdownWG.Add(1)
go func() {
if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil {
defer r.shutdownWG.Done()

if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil && errGrpc != grpc.ErrServerStopped {
host.ReportFatalError(errGrpc)
}
}()
Expand All @@ -111,8 +115,11 @@ func (r *otlpReceiver) startHTTPServer(cfg *confighttp.HTTPServerSettings, host
if err != nil {
return err
}
r.shutdownWG.Add(1)
go func() {
if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil {
defer r.shutdownWG.Done()

if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil && errHTTP != http.ErrServerClosed {
host.ReportFatalError(errHTTP)
}
}()
Expand Down Expand Up @@ -180,6 +187,11 @@ func (r *otlpReceiver) Shutdown(ctx context.Context) error {
if r.serverGRPC != nil {
r.serverGRPC.GracefulStop()
}

r.shutdownWG.Wait()

// delete the receiver from the map.
delete(receivers, r.cfg)
})
return err
}
Expand Down
3 changes: 1 addition & 2 deletions service/defaultcomponents/default_receivers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ func TestDefaultReceivers(t *testing.T) {
skipLifecyle: true, // TODO: Usage of CMux doesn't allow proper shutdown.
},
{
receiver: "otlp",
skipLifecyle: true, // TODO: Upcoming PR to fix zipkin lifecycle.
receiver: "otlp",
},
{
receiver: "prometheus",
Expand Down