@@ -30,23 +30,9 @@ import (
3030 "github.com/open-telemetry/opentelemetry-collector/oterr"
3131)
3232
33- const (
34- defaultNumWorkers = 4
35-
36- messageChannelSize = 64
37- )
38-
3933// Receiver is the type used to handle spans from OpenCensus exporters.
4034type Receiver struct {
4135 nextConsumer consumer.TraceConsumer
42- numWorkers int
43- workers []* receiverWorker
44- messageChan chan * traceDataWithCtx
45- }
46-
47- type traceDataWithCtx struct {
48- data * consumerdata.TraceData
49- ctx context.Context
5036}
5137
5238// New creates a new opencensus.Receiver reference.
@@ -55,25 +41,13 @@ func New(nextConsumer consumer.TraceConsumer, opts ...Option) (*Receiver, error)
5541 return nil , oterr .ErrNilNextConsumer
5642 }
5743
58- messageChan := make (chan * traceDataWithCtx , messageChannelSize )
5944 ocr := & Receiver {
6045 nextConsumer : nextConsumer ,
61- numWorkers : defaultNumWorkers ,
62- messageChan : messageChan ,
6346 }
6447 for _ , opt := range opts {
6548 opt (ocr )
6649 }
6750
68- // Setup and startup worker pool
69- workers := make ([]* receiverWorker , 0 , ocr .numWorkers )
70- for index := 0 ; index < ocr .numWorkers ; index ++ {
71- worker := newReceiverWorker (ocr )
72- go worker .listenOn (messageChan )
73- workers = append (workers , worker )
74- }
75- ocr .workers = workers
76-
7751 return ocr , nil
7852}
7953
@@ -89,13 +63,11 @@ func (ocr *Receiver) Config(tcs agenttracepb.TraceService_ConfigServer) error {
8963
9064var errTraceExportProtocolViolation = errors .New ("protocol violation: Export's first message must have a Node" )
9165
92- const receiverTagValue = "oc_trace"
93-
9466// Export is the gRPC method that receives streamed traces from
9567// OpenCensus-traceproto compatible libraries/applications.
9668func (ocr * Receiver ) Export (tes agenttracepb.TraceService_ExportServer ) error {
9769 // We need to ensure that it propagates the receiver name as a tag
98- ctxWithReceiverName := observability .ContextWithReceiverName (tes .Context (), receiverTagValue )
70+ ctxWithReceiverName := observability .ContextWithReceiverName (tes .Context (), "oc_trace" )
9971
10072 // The first message MUST have a non-nil Node.
10173 recv , err := tes .Recv ()
@@ -112,28 +84,13 @@ func (ocr *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
11284 var resource * resourcepb.Resource
11385 // Now that we've got the first message with a Node, we can start to receive streamed up spans.
11486 for {
115- // If a Node has been sent from downstream, save and use it.
116- if recv .Node != nil {
117- lastNonNilNode = recv .Node
118- }
119-
120- // TODO(songya): differentiate between unset and nil resource. See
121- // https://github.com/census-instrumentation/opencensus-proto/issues/146.
122- if recv .Resource != nil {
123- resource = recv .Resource
124- }
125-
126- td := & consumerdata.TraceData {
127- Node : lastNonNilNode ,
128- Resource : resource ,
129- Spans : recv .Spans ,
130- SourceFormat : "oc_trace" ,
87+ lastNonNilNode , resource , err = ocr .processReceivedMsg (ctxWithReceiverName , lastNonNilNode , resource , recv )
88+ if err != nil {
89+ // Metrics and z-pages record data loss but there is no back pressure.
90+ // However, cause the stream to be closed.
91+ return nil
13192 }
13293
133- ocr .messageChan <- & traceDataWithCtx {data : td , ctx : ctxWithReceiverName }
134-
135- observability .RecordMetricsForTraceReceiver (ctxWithReceiverName , len (td .Spans ), 0 )
136-
13794 recv , err = tes .Recv ()
13895 if err != nil {
13996 if err == io .EOF {
@@ -146,63 +103,64 @@ func (ocr *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
146103 }
147104}
148105
149- // Stop the receiver and its workers
150- func (ocr * Receiver ) Stop () {
151- for _ , worker := range ocr .workers {
152- worker .stopListening ()
106+ func (ocr * Receiver ) processReceivedMsg (
107+ ctx context.Context ,
108+ lastNonNilNode * commonpb.Node ,
109+ resource * resourcepb.Resource ,
110+ recv * agenttracepb.ExportTraceServiceRequest ,
111+ ) (* commonpb.Node , * resourcepb.Resource , error ) {
112+ // If a Node has been sent from downstream, save and use it.
113+ if recv .Node != nil {
114+ lastNonNilNode = recv .Node
153115 }
154- }
155116
156- type receiverWorker struct {
157- receiver * Receiver
158- cancel chan struct {}
159- }
160-
161- func newReceiverWorker (receiver * Receiver ) * receiverWorker {
162- return & receiverWorker {
163- receiver : receiver ,
164- cancel : make (chan struct {}),
117+ // TODO(songya): differentiate between unset and nil resource. See
118+ // https://github.com/census-instrumentation/opencensus-proto/issues/146.
119+ if recv .Resource != nil {
120+ resource = recv .Resource
165121 }
166- }
167122
168- func (rw * receiverWorker ) listenOn (cn <- chan * traceDataWithCtx ) {
169- for {
170- select {
171- case tdWithCtx := <- cn :
172- rw .export (tdWithCtx .ctx , tdWithCtx .data )
173- case <- rw .cancel :
174- return
175- }
123+ td := & consumerdata.TraceData {
124+ Node : lastNonNilNode ,
125+ Resource : resource ,
126+ Spans : recv .Spans ,
127+ SourceFormat : "oc_trace" ,
176128 }
177- }
178129
179- func ( rw * receiverWorker ) stopListening () {
180- close ( rw . cancel )
130+ err := ocr . sendToNextConsumer ( ctx , td )
131+ return lastNonNilNode , resource , err
181132}
182133
183- func (rw * receiverWorker ) export (longLivedCtx context.Context , tracedata * consumerdata.TraceData ) {
134+ func (ocr * Receiver ) sendToNextConsumer (longLivedCtx context.Context , tracedata * consumerdata.TraceData ) error {
184135 if tracedata == nil {
185- return
136+ return nil
186137 }
187138
188139 if len (tracedata .Spans ) == 0 {
189- return
140+ observability .RecordMetricsForTraceReceiver (longLivedCtx , 0 , 0 )
141+ return nil
190142 }
191143
192144 // Trace this method
193145 ctx , span := trace .StartSpan (context .Background (), "OpenCensusTraceReceiver.Export" )
194146 defer span .End ()
195147
196- // TODO: (@odeke-em) investigate if it is necessary
197- // to group nodes with their respective spans during
198- // spansAndNode list unfurling then send spans grouped per node
199-
200148 // If the starting RPC has a parent span, then add it as a parent link.
201149 observability .SetParentLink (longLivedCtx , span )
202150
203- rw .receiver .nextConsumer .ConsumeTraceData (ctx , * tracedata )
151+ err := ocr .nextConsumer .ConsumeTraceData (ctx , * tracedata )
152+ if err != nil {
153+ observability .RecordMetricsForTraceReceiver (longLivedCtx , 0 , len (tracedata .Spans ))
154+ span .AddAttributes (trace .Int64Attribute ("dropped_spans" , int64 (len (tracedata .Spans ))))
155+
156+ span .SetStatus (trace.Status {
157+ Code : trace .StatusCodeUnknown ,
158+ Message : err .Error (),
159+ })
160+ } else {
161+ observability .RecordMetricsForTraceReceiver (longLivedCtx , len (tracedata .Spans ), 0 )
162+ span .AddAttributes (trace .Int64Attribute ("num_spans" , int64 (len (tracedata .Spans ))))
163+ }
204164
205- span .Annotate ([]trace.Attribute {
206- trace .Int64Attribute ("num_spans" , int64 (len (tracedata .Spans ))),
207- }, "" )
165+ return err
208166}
0 commit comments