@@ -16,8 +16,10 @@ import (
16
16
"go.opentelemetry.io/collector/consumer"
17
17
"go.opentelemetry.io/collector/pdata/pmetric"
18
18
"go.opentelemetry.io/collector/receiver"
19
+ "go.opentelemetry.io/collector/receiver/receiverhelper"
19
20
"go.uber.org/zap"
20
21
22
+ "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/metadata"
21
23
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol"
22
24
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"
23
25
)
@@ -31,6 +33,7 @@ type statsdReceiver struct {
31
33
32
34
server transport.Server
33
35
reporter transport.Reporter
36
+ obsrecv * receiverhelper.ObsReport
34
37
parser protocol.Parser
35
38
nextConsumer consumer.Metrics
36
39
cancel context.CancelFunc
@@ -52,10 +55,19 @@ func newReceiver(
52
55
return nil , err
53
56
}
54
57
58
+ obsrecv , err := receiverhelper .NewObsReport (receiverhelper.ObsReportSettings {
59
+ ReceiverID : set .ID ,
60
+ ReceiverCreateSettings : set ,
61
+ })
62
+ if err != nil {
63
+ return nil , err
64
+ }
65
+
55
66
r := & statsdReceiver {
56
67
settings : set ,
57
68
config : & config ,
58
69
nextConsumer : nextConsumer ,
70
+ obsrecv : obsrecv ,
59
71
reporter : rep ,
60
72
parser : & protocol.StatsDParser {
61
73
BuildInfo : set .BuildInfo ,
@@ -110,10 +122,13 @@ func (r *statsdReceiver) Start(ctx context.Context, _ component.Host) error {
110
122
batchMetrics := r .parser .GetMetrics ()
111
123
for _ , batch := range batchMetrics {
112
124
batchCtx := client .NewContext (ctx , batch .Info )
113
-
114
- if err := r .Flush (batchCtx , batch .Metrics , r .nextConsumer ); err != nil {
125
+ numPoints := batch .Metrics .DataPointCount ()
126
+ flushCtx := r .obsrecv .StartMetricsOp (batchCtx )
127
+ err := r .Flush (flushCtx , batch .Metrics , r .nextConsumer )
128
+ if err != nil {
115
129
r .reporter .OnDebugf ("Error flushing metrics" , zap .Error (err ))
116
130
}
131
+ r .obsrecv .EndMetricsOp (flushCtx , metadata .Type .String (), numPoints , err )
117
132
}
118
133
case metric := <- transferChan :
119
134
if err := r .parser .Aggregate (metric .Raw , metric .Addr ); err != nil {
0 commit comments