Skip to content

Commit 045fb26

Browse files
committed
Remove Receive from obsreport.Receiver funcs
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 66c4908 commit 045fb26

File tree

14 files changed

+73
-109
lines changed

14 files changed

+73
-109
lines changed

obsreport/doc.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@
2323
// being received, ie.:
2424
//
2525
// * TraceData receive operations should use the pair:
26-
// StartTraceDataReceiveOp/EndTraceDataReceiveOp
26+
// StartTracesOp/EndTracesOp
2727
//
2828
// * Metrics receive operations should use the pair:
29-
// StartMetricsReceiveOp/EndMetricsReceiveOp
29+
// StartMetricsOp/EndMetricsOp
3030
//
3131
// Similar for exporters:
3232
//

obsreport/obsreport_receiver.go

Lines changed: 28 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type StartReceiveOption func(*StartReceiveOptions)
5151
// for {
5252
// // Since the context outlives the individual receive operations call obsreport using
5353
// // WithLongLivedCtx().
54-
// ctx := obsreport.StartTraceDataReceiveOp(
54+
// ctx := obsreport.StartTracesOp(
5555
// longLivedCtx,
5656
// r.config.Name(),
5757
// r.transport,
@@ -62,7 +62,7 @@ type StartReceiveOption func(*StartReceiveOptions)
6262
// if ok {
6363
// err = r.nextConsumer.ConsumeTraces(ctx, td)
6464
// }
65-
// obsreport.EndTraceDataReceiveOp(
65+
// obsreport.EndTracesOp(
6666
// ctx,
6767
// r.format,
6868
// len(td.Spans),
@@ -99,94 +99,58 @@ func NewReceiver(cfg ReceiverSettings) *Receiver {
9999
}
100100
}
101101

102-
// StartTraceDataReceiveOp is called when a request is received from a client.
102+
// StartTracesOp is called when a request is received from a client.
103103
// The returned context should be used in other calls to the obsreport functions
104104
// dealing with the same receive operation.
105-
func (rec *Receiver) StartTraceDataReceiveOp(
106-
operationCtx context.Context,
107-
opt ...StartReceiveOption,
108-
) context.Context {
109-
return rec.traceReceiveOp(
110-
operationCtx,
111-
obsmetrics.ReceiveTraceDataOperationSuffix,
112-
opt...)
105+
func (rec *Receiver) StartTracesOp(operationCtx context.Context, opt ...StartReceiveOption) context.Context {
106+
return rec.startOp(operationCtx, obsmetrics.ReceiveTraceDataOperationSuffix, opt...)
113107
}
114108

115-
// EndTraceDataReceiveOp completes the receive operation that was started with
116-
// StartTraceDataReceiveOp.
117-
func (rec *Receiver) EndTraceDataReceiveOp(
109+
// EndTracesOp completes the receive operation that was started with
110+
// StartTracesOp.
111+
func (rec *Receiver) EndTracesOp(
118112
receiverCtx context.Context,
119113
format string,
120114
numReceivedSpans int,
121115
err error,
122116
) {
123-
rec.endReceiveOp(
124-
receiverCtx,
125-
format,
126-
numReceivedSpans,
127-
err,
128-
config.TracesDataType,
129-
)
117+
rec.endOp(receiverCtx, format, numReceivedSpans, err, config.TracesDataType)
130118
}
131119

132-
// StartLogsReceiveOp is called when a request is received from a client.
120+
// StartLogsOp is called when a request is received from a client.
133121
// The returned context should be used in other calls to the obsreport functions
134122
// dealing with the same receive operation.
135-
func (rec *Receiver) StartLogsReceiveOp(
136-
operationCtx context.Context,
137-
opt ...StartReceiveOption,
138-
) context.Context {
139-
return rec.traceReceiveOp(
140-
operationCtx,
141-
obsmetrics.ReceiverLogsOperationSuffix,
142-
opt...)
123+
func (rec *Receiver) StartLogsOp(operationCtx context.Context, opt ...StartReceiveOption) context.Context {
124+
return rec.startOp(operationCtx, obsmetrics.ReceiverLogsOperationSuffix, opt...)
143125
}
144126

145-
// EndLogsReceiveOp completes the receive operation that was started with
146-
// StartLogsReceiveOp.
147-
func (rec *Receiver) EndLogsReceiveOp(
127+
// EndLogsOp completes the receive operation that was started with
128+
// StartLogsOp.
129+
func (rec *Receiver) EndLogsOp(
148130
receiverCtx context.Context,
149131
format string,
150132
numReceivedLogRecords int,
151133
err error,
152134
) {
153-
rec.endReceiveOp(
154-
receiverCtx,
155-
format,
156-
numReceivedLogRecords,
157-
err,
158-
config.LogsDataType,
159-
)
135+
rec.endOp(receiverCtx, format, numReceivedLogRecords, err, config.LogsDataType)
160136
}
161137

162-
// StartMetricsReceiveOp is called when a request is received from a client.
138+
// StartMetricsOp is called when a request is received from a client.
163139
// The returned context should be used in other calls to the obsreport functions
164140
// dealing with the same receive operation.
165-
func (rec *Receiver) StartMetricsReceiveOp(
166-
operationCtx context.Context,
167-
opt ...StartReceiveOption,
168-
) context.Context {
169-
return rec.traceReceiveOp(
170-
operationCtx,
171-
obsmetrics.ReceiverMetricsOperationSuffix,
172-
opt...)
141+
func (rec *Receiver) StartMetricsOp(operationCtx context.Context, opt ...StartReceiveOption) context.Context {
142+
return rec.startOp(operationCtx, obsmetrics.ReceiverMetricsOperationSuffix, opt...)
173143
}
174144

175-
// EndMetricsReceiveOp completes the receive operation that was started with
176-
// StartMetricsReceiveOp.
177-
func (rec *Receiver) EndMetricsReceiveOp(
145+
// EndMetricsOp completes the receive operation that was started with
146+
// StartMetricsOp.
147+
func (rec *Receiver) EndMetricsOp(
178148
receiverCtx context.Context,
179149
format string,
180150
numReceivedPoints int,
181151
err error,
182152
) {
183-
rec.endReceiveOp(
184-
receiverCtx,
185-
format,
186-
numReceivedPoints,
187-
err,
188-
config.MetricsDataType,
189-
)
153+
rec.endOp(receiverCtx, format, numReceivedPoints, err, config.MetricsDataType)
190154
}
191155

192156
// ReceiverContext adds the keys used when recording observability metrics to
@@ -205,9 +169,9 @@ func ReceiverContext(
205169
return ctx
206170
}
207171

208-
// traceReceiveOp creates the span used to trace the operation. Returning
172+
// startOp creates the span used to trace the operation. Returning
209173
// the updated context with the created span.
210-
func (rec *Receiver) traceReceiveOp(
174+
func (rec *Receiver) startOp(
211175
receiverCtx context.Context,
212176
operationSuffix string,
213177
opt ...StartReceiveOption,
@@ -224,7 +188,7 @@ func (rec *Receiver) traceReceiveOp(
224188
ctx, span = trace.StartSpan(receiverCtx, spanName)
225189
} else {
226190
// Since the receiverCtx is long lived do not use it to start the span.
227-
// This way this trace ends when the EndTraceDataReceiveOp is called.
191+
// This way this trace ends when the EndTracesOp is called.
228192
// Here is safe to ignore the returned context since it is not used below.
229193
_, span = trace.StartSpan(context.Background(), spanName)
230194

@@ -240,8 +204,8 @@ func (rec *Receiver) traceReceiveOp(
240204
return ctx
241205
}
242206

243-
// endReceiveOp records the observability signals at the end of an operation.
244-
func (rec *Receiver) endReceiveOp(
207+
// endOp records the observability signals at the end of an operation.
208+
func (rec *Receiver) endOp(
245209
receiverCtx context.Context,
246210
format string,
247211
numReceivedItems int,

obsreport/obsreport_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,10 @@ func TestReceiveTraceDataOp(t *testing.T) {
7272
rcvdSpans := []int{13, 42}
7373
for i, param := range params {
7474
rec := NewReceiver(ReceiverSettings{ReceiverID: receiver, Transport: param.transport})
75-
ctx := rec.StartTraceDataReceiveOp(receiverCtx)
75+
ctx := rec.StartTracesOp(receiverCtx)
7676
assert.NotNil(t, ctx)
7777

78-
rec.EndTraceDataReceiveOp(
78+
rec.EndTracesOp(
7979
ctx,
8080
format,
8181
rcvdSpans[i],
@@ -133,10 +133,10 @@ func TestReceiveLogsOp(t *testing.T) {
133133
rcvdLogRecords := []int{13, 42}
134134
for i, param := range params {
135135
rec := NewReceiver(ReceiverSettings{ReceiverID: receiver, Transport: param.transport})
136-
ctx := rec.StartLogsReceiveOp(receiverCtx)
136+
ctx := rec.StartLogsOp(receiverCtx)
137137
assert.NotNil(t, ctx)
138138

139-
rec.EndLogsReceiveOp(
139+
rec.EndLogsOp(
140140
ctx,
141141
format,
142142
rcvdLogRecords[i],
@@ -194,10 +194,10 @@ func TestReceiveMetricsOp(t *testing.T) {
194194
rcvdMetricPts := []int{23, 29}
195195
for i, param := range params {
196196
rec := NewReceiver(ReceiverSettings{ReceiverID: receiver, Transport: param.transport})
197-
ctx := rec.StartMetricsReceiveOp(receiverCtx)
197+
ctx := rec.StartMetricsOp(receiverCtx)
198198
assert.NotNil(t, ctx)
199199

200-
rec.EndMetricsReceiveOp(
200+
rec.EndMetricsOp(
201201
ctx,
202202
format,
203203
rcvdMetricPts[i],
@@ -465,12 +465,12 @@ func TestReceiveWithLongLivedCtx(t *testing.T) {
465465
// Use a new context on each operation to simulate distinct operations
466466
// under the same long lived context.
467467
rec := NewReceiver(ReceiverSettings{ReceiverID: receiver, Transport: transport})
468-
ctx := rec.StartTraceDataReceiveOp(
468+
ctx := rec.StartTracesOp(
469469
longLivedCtx,
470470
WithLongLivedCtx())
471471
assert.NotNil(t, ctx)
472472

473-
rec.EndTraceDataReceiveOp(
473+
rec.EndTracesOp(
474474
ctx,
475475
format,
476476
op.numSpans,

obsreport/obsreporttest/obsreporttest_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ func TestCheckReceiverTracesViews(t *testing.T) {
4444

4545
receiverCtx := obsreport.ReceiverContext(context.Background(), receiver, transport)
4646
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: transport})
47-
ctx := rec.StartTraceDataReceiveOp(receiverCtx)
47+
ctx := rec.StartTracesOp(receiverCtx)
4848
assert.NotNil(t, ctx)
49-
rec.EndTraceDataReceiveOp(
49+
rec.EndTracesOp(
5050
ctx,
5151
format,
5252
7,
@@ -62,9 +62,9 @@ func TestCheckReceiverMetricsViews(t *testing.T) {
6262

6363
receiverCtx := obsreport.ReceiverContext(context.Background(), receiver, transport)
6464
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: transport})
65-
ctx := rec.StartMetricsReceiveOp(receiverCtx)
65+
ctx := rec.StartMetricsOp(receiverCtx)
6666
assert.NotNil(t, ctx)
67-
rec.EndMetricsReceiveOp(ctx, format, 7, nil)
67+
rec.EndMetricsOp(ctx, format, 7, nil)
6868

6969
obsreporttest.CheckReceiverMetrics(t, receiver, transport, 7, 0)
7070
}
@@ -76,9 +76,9 @@ func TestCheckReceiverLogsViews(t *testing.T) {
7676

7777
receiverCtx := obsreport.ReceiverContext(context.Background(), receiver, transport)
7878
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: transport})
79-
ctx := rec.StartLogsReceiveOp(receiverCtx)
79+
ctx := rec.StartLogsOp(receiverCtx)
8080
assert.NotNil(t, ctx)
81-
rec.EndLogsReceiveOp(ctx, format, 7, nil)
81+
rec.EndLogsOp(ctx, format, 7, nil)
8282

8383
obsreporttest.CheckReceiverLogs(t, receiver, transport, 7, 0)
8484
}

receiver/jaegerreceiver/trace_receiver.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -245,10 +245,10 @@ func (h *agentHandler) EmitZipkinBatch(context.Context, []*zipkincore.Span) (err
245245
// Jaeger spans received by the Jaeger agent processor.
246246
func (h *agentHandler) EmitBatch(ctx context.Context, batch *jaeger.Batch) error {
247247
ctx = obsreport.ReceiverContext(ctx, h.id, h.transport)
248-
ctx = h.obsrecv.StartTraceDataReceiveOp(ctx)
248+
ctx = h.obsrecv.StartTracesOp(ctx)
249249

250250
numSpans, err := consumeTraces(ctx, batch, h.nextConsumer)
251-
h.obsrecv.EndTraceDataReceiveOp(ctx, thriftFormat, numSpans, err)
251+
h.obsrecv.EndTracesOp(ctx, thriftFormat, numSpans, err)
252252
return err
253253
}
254254

@@ -273,12 +273,12 @@ func (jr *jReceiver) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest)
273273
}
274274

275275
ctx = obsreport.ReceiverContext(ctx, jr.id, grpcTransport)
276-
ctx = jr.grpcObsrecv.StartTraceDataReceiveOp(ctx)
276+
ctx = jr.grpcObsrecv.StartTracesOp(ctx)
277277

278278
td := jaegertranslator.ProtoBatchToInternalTraces(r.GetBatch())
279279

280280
err := jr.nextConsumer.ConsumeTraces(ctx, td)
281-
jr.grpcObsrecv.EndTraceDataReceiveOp(ctx, protobufFormat, len(r.GetBatch().Spans), err)
281+
jr.grpcObsrecv.EndTracesOp(ctx, protobufFormat, len(r.GetBatch().Spans), err)
282282
if err != nil {
283283
return nil, err
284284
}
@@ -423,12 +423,12 @@ func (jr *jReceiver) HandleThriftHTTPBatch(w http.ResponseWriter, r *http.Reques
423423
}
424424

425425
ctx = obsreport.ReceiverContext(ctx, jr.id, collectorHTTPTransport)
426-
ctx = jr.httpObsrecv.StartTraceDataReceiveOp(ctx)
426+
ctx = jr.httpObsrecv.StartTracesOp(ctx)
427427

428428
batch, hErr := jr.decodeThriftHTTPBody(r)
429429
if hErr != nil {
430430
http.Error(w, html.EscapeString(hErr.msg), hErr.statusCode)
431-
jr.httpObsrecv.EndTraceDataReceiveOp(ctx, thriftFormat, 0, hErr)
431+
jr.httpObsrecv.EndTracesOp(ctx, thriftFormat, 0, hErr)
432432
return
433433
}
434434

@@ -438,7 +438,7 @@ func (jr *jReceiver) HandleThriftHTTPBatch(w http.ResponseWriter, r *http.Reques
438438
} else {
439439
w.WriteHeader(http.StatusAccepted)
440440
}
441-
jr.httpObsrecv.EndTraceDataReceiveOp(ctx, thriftFormat, numSpans, err)
441+
jr.httpObsrecv.EndTracesOp(ctx, thriftFormat, numSpans, err)
442442
}
443443

444444
func (jr *jReceiver) startCollector(host component.Host) error {

receiver/kafkareceiver/kafka_receiver.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
260260
session.MarkMessage(message, "")
261261

262262
ctx := obsreport.ReceiverContext(session.Context(), c.id, transport)
263-
ctx = c.obsrecv.StartTraceDataReceiveOp(ctx)
263+
ctx = c.obsrecv.StartTracesOp(ctx)
264264
statsTags := []tag.Mutator{tag.Insert(tagInstanceName, c.id.String())}
265265
_ = stats.RecordWithTags(ctx, statsTags,
266266
statMessageCount.M(1),
@@ -275,7 +275,7 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
275275

276276
spanCount := traces.SpanCount()
277277
err = c.nextConsumer.ConsumeTraces(session.Context(), traces)
278-
c.obsrecv.EndTraceDataReceiveOp(ctx, c.unmarshaler.Encoding(), spanCount, err)
278+
c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(), spanCount, err)
279279
if err != nil {
280280
return err
281281
}
@@ -312,7 +312,7 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
312312
session.MarkMessage(message, "")
313313

314314
ctx := obsreport.ReceiverContext(session.Context(), c.id, transport)
315-
ctx = c.obsrecv.StartTraceDataReceiveOp(ctx)
315+
ctx = c.obsrecv.StartTracesOp(ctx)
316316
_ = stats.RecordWithTags(
317317
ctx,
318318
[]tag.Mutator{tag.Insert(tagInstanceName, c.id.String())},
@@ -328,7 +328,7 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
328328

329329
err = c.nextConsumer.ConsumeLogs(session.Context(), logs)
330330
// TODO
331-
c.obsrecv.EndTraceDataReceiveOp(ctx, c.unmarshaler.Encoding(), logs.LogRecordCount(), err)
331+
c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(), logs.LogRecordCount(), err)
332332
if err != nil {
333333
return err
334334
}

receiver/opencensusreceiver/ocmetrics/opencensus.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func (ocr *Receiver) processReceivedMsg(
124124
}
125125

126126
func (ocr *Receiver) sendToNextConsumer(longLivedRPCCtx context.Context, node *commonpb.Node, resource *resourcepb.Resource, metrics []*ocmetrics.Metric) error {
127-
ctx := ocr.obsrecv.StartMetricsReceiveOp(
127+
ctx := ocr.obsrecv.StartMetricsOp(
128128
longLivedRPCCtx,
129129
obsreport.WithLongLivedCtx())
130130

@@ -141,7 +141,7 @@ func (ocr *Receiver) sendToNextConsumer(longLivedRPCCtx context.Context, node *c
141141
consumerErr = ocr.nextConsumer.ConsumeMetrics(ctx, internaldata.OCToMetrics(node, resource, metrics))
142142
}
143143

144-
ocr.obsrecv.EndMetricsReceiveOp(
144+
ocr.obsrecv.EndMetricsOp(
145145
ctx,
146146
receiverDataFormat,
147147
numPoints,

receiver/opencensusreceiver/octrace/opencensus.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,12 +144,12 @@ func (ocr *Receiver) processReceivedMsg(
144144
}
145145

146146
func (ocr *Receiver) sendToNextConsumer(longLivedRPCCtx context.Context, td pdata.Traces) error {
147-
ctx := ocr.obsrecv.StartTraceDataReceiveOp(
147+
ctx := ocr.obsrecv.StartTracesOp(
148148
longLivedRPCCtx,
149149
obsreport.WithLongLivedCtx())
150150

151151
err := ocr.nextConsumer.ConsumeTraces(ctx, td)
152-
ocr.obsrecv.EndTraceDataReceiveOp(ctx, receiverDataFormat, td.SpanCount(), err)
152+
ocr.obsrecv.EndTracesOp(ctx, receiverDataFormat, td.SpanCount(), err)
153153

154154
return err
155155
}

0 commit comments

Comments
 (0)