Skip to content

Commit 466abaa

Browse files
committed
[chore] Create service/internal/obsconsumer package
1 parent 0f5d764 commit 466abaa

File tree

10 files changed

+1275
-0
lines changed

10 files changed

+1275
-0
lines changed

service/internal/obsconsumer/logs.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package obsconsumer // import "go.opentelemetry.io/collector/service/internal/obsconsumer"
5+
6+
import (
7+
"context"
8+
9+
"go.opentelemetry.io/otel/attribute"
10+
"go.opentelemetry.io/otel/metric"
11+
12+
"go.opentelemetry.io/collector/consumer"
13+
"go.opentelemetry.io/collector/pdata/plog"
14+
)
15+
16+
var _ consumer.Logs = Logs{}
17+
18+
func NewLogs(consumer consumer.Logs, itemCounter metric.Int64Counter, opts ...Option) Logs {
19+
o := options{}
20+
for _, opt := range opts {
21+
opt.apply(&o)
22+
}
23+
return Logs{
24+
consumer: consumer,
25+
itemCounter: itemCounter,
26+
options: o,
27+
}
28+
}
29+
30+
type Logs struct {
31+
consumer consumer.Logs
32+
itemCounter metric.Int64Counter
33+
options
34+
}
35+
36+
func (c Logs) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
37+
// Measure before calling ConsumeLogs because the data may be mutated downstream
38+
itemCount := ld.LogRecordCount()
39+
40+
err := c.consumer.ConsumeLogs(ctx, ld)
41+
outcome := "success"
42+
if err != nil {
43+
outcome = "failure"
44+
}
45+
46+
var attrs []attribute.KeyValue
47+
attrs = append(attrs, c.staticDataPointAttributes...)
48+
attrs = append(attrs, attribute.String("outcome", outcome))
49+
c.itemCounter.Add(ctx, int64(itemCount), metric.WithAttributes(attrs...))
50+
return err
51+
}
52+
53+
func (c Logs) Capabilities() consumer.Capabilities {
54+
return c.consumer.Capabilities()
55+
}
Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package obsconsumer_test
5+
6+
import (
7+
"context"
8+
"errors"
9+
"testing"
10+
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
13+
"go.opentelemetry.io/otel/attribute"
14+
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
15+
"go.opentelemetry.io/otel/sdk/metric/metricdata"
16+
17+
"go.opentelemetry.io/collector/consumer"
18+
"go.opentelemetry.io/collector/pdata/plog"
19+
"go.opentelemetry.io/collector/service/internal/obsconsumer"
20+
)
21+
22+
type mockLogsConsumer struct {
23+
err error
24+
capabilities consumer.Capabilities
25+
}
26+
27+
func (m *mockLogsConsumer) ConsumeLogs(_ context.Context, _ plog.Logs) error {
28+
return m.err
29+
}
30+
31+
func (m *mockLogsConsumer) Capabilities() consumer.Capabilities {
32+
return m.capabilities
33+
}
34+
35+
func TestLogsConsumeSuccess(t *testing.T) {
36+
ctx := context.Background()
37+
mockConsumer := &mockLogsConsumer{}
38+
39+
reader := sdkmetric.NewManualReader()
40+
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
41+
meter := mp.Meter("test")
42+
counter, err := meter.Int64Counter("test_counter")
43+
require.NoError(t, err)
44+
45+
consumer := obsconsumer.NewLogs(mockConsumer, counter)
46+
47+
ld := plog.NewLogs()
48+
r := ld.ResourceLogs().AppendEmpty()
49+
sl := r.ScopeLogs().AppendEmpty()
50+
sl.LogRecords().AppendEmpty()
51+
52+
err = consumer.ConsumeLogs(ctx, ld)
53+
require.NoError(t, err)
54+
55+
var rm metricdata.ResourceMetrics
56+
err = reader.Collect(ctx, &rm)
57+
require.NoError(t, err)
58+
require.Len(t, rm.ScopeMetrics, 1)
59+
require.Len(t, rm.ScopeMetrics[0].Metrics, 1)
60+
61+
metric := rm.ScopeMetrics[0].Metrics[0]
62+
require.Equal(t, "test_counter", metric.Name)
63+
64+
data := metric.Data.(metricdata.Sum[int64])
65+
require.Len(t, data.DataPoints, 1)
66+
require.Equal(t, int64(1), data.DataPoints[0].Value)
67+
68+
attrs := data.DataPoints[0].Attributes
69+
require.Equal(t, 1, attrs.Len())
70+
val, ok := attrs.Value(attribute.Key("outcome"))
71+
require.True(t, ok)
72+
require.Equal(t, "success", val.Emit())
73+
}
74+
75+
func TestLogsConsumeFailure(t *testing.T) {
76+
ctx := context.Background()
77+
expectedErr := errors.New("test error")
78+
mockConsumer := &mockLogsConsumer{err: expectedErr}
79+
80+
reader := sdkmetric.NewManualReader()
81+
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
82+
meter := mp.Meter("test")
83+
counter, err := meter.Int64Counter("test_counter")
84+
require.NoError(t, err)
85+
86+
consumer := obsconsumer.NewLogs(mockConsumer, counter)
87+
88+
ld := plog.NewLogs()
89+
r := ld.ResourceLogs().AppendEmpty()
90+
sl := r.ScopeLogs().AppendEmpty()
91+
sl.LogRecords().AppendEmpty()
92+
93+
err = consumer.ConsumeLogs(ctx, ld)
94+
assert.Equal(t, expectedErr, err)
95+
96+
var rm metricdata.ResourceMetrics
97+
err = reader.Collect(ctx, &rm)
98+
require.NoError(t, err)
99+
require.Len(t, rm.ScopeMetrics, 1)
100+
require.Len(t, rm.ScopeMetrics[0].Metrics, 1)
101+
102+
metric := rm.ScopeMetrics[0].Metrics[0]
103+
require.Equal(t, "test_counter", metric.Name)
104+
105+
data := metric.Data.(metricdata.Sum[int64])
106+
require.Len(t, data.DataPoints, 1)
107+
require.Equal(t, int64(1), data.DataPoints[0].Value)
108+
109+
attrs := data.DataPoints[0].Attributes
110+
require.Equal(t, 1, attrs.Len())
111+
val, ok := attrs.Value(attribute.Key("outcome"))
112+
require.True(t, ok)
113+
require.Equal(t, "failure", val.Emit())
114+
}
115+
116+
func TestLogsWithStaticAttributes(t *testing.T) {
117+
ctx := context.Background()
118+
mockConsumer := &mockLogsConsumer{}
119+
120+
reader := sdkmetric.NewManualReader()
121+
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
122+
meter := mp.Meter("test")
123+
counter, err := meter.Int64Counter("test_counter")
124+
require.NoError(t, err)
125+
126+
staticAttr := attribute.String("test", "value")
127+
consumer := obsconsumer.NewLogs(mockConsumer, counter, obsconsumer.WithStaticDataPointAttribute(staticAttr))
128+
129+
ld := plog.NewLogs()
130+
r := ld.ResourceLogs().AppendEmpty()
131+
sl := r.ScopeLogs().AppendEmpty()
132+
sl.LogRecords().AppendEmpty()
133+
134+
err = consumer.ConsumeLogs(ctx, ld)
135+
require.NoError(t, err)
136+
137+
var rm metricdata.ResourceMetrics
138+
err = reader.Collect(ctx, &rm)
139+
require.NoError(t, err)
140+
require.Len(t, rm.ScopeMetrics, 1)
141+
require.Len(t, rm.ScopeMetrics[0].Metrics, 1)
142+
143+
metric := rm.ScopeMetrics[0].Metrics[0]
144+
require.Equal(t, "test_counter", metric.Name)
145+
146+
data := metric.Data.(metricdata.Sum[int64])
147+
require.Len(t, data.DataPoints, 1)
148+
require.Equal(t, int64(1), data.DataPoints[0].Value)
149+
150+
attrs := data.DataPoints[0].Attributes
151+
require.Equal(t, 2, attrs.Len())
152+
val, ok := attrs.Value(attribute.Key("test"))
153+
require.True(t, ok)
154+
require.Equal(t, "value", val.Emit())
155+
val, ok = attrs.Value(attribute.Key("outcome"))
156+
require.True(t, ok)
157+
require.Equal(t, "success", val.Emit())
158+
}
159+
160+
func TestLogsMultipleItemsMixedOutcomes(t *testing.T) {
161+
ctx := context.Background()
162+
expectedErr := errors.New("test error")
163+
mockConsumer := &mockLogsConsumer{}
164+
165+
reader := sdkmetric.NewManualReader()
166+
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
167+
meter := mp.Meter("test")
168+
counter, err := meter.Int64Counter("test_counter")
169+
require.NoError(t, err)
170+
171+
consumer := obsconsumer.NewLogs(mockConsumer, counter)
172+
173+
// First batch: 2 successful items
174+
ld1 := plog.NewLogs()
175+
for range 2 {
176+
r := ld1.ResourceLogs().AppendEmpty()
177+
sl := r.ScopeLogs().AppendEmpty()
178+
sl.LogRecords().AppendEmpty()
179+
}
180+
err = consumer.ConsumeLogs(ctx, ld1)
181+
require.NoError(t, err)
182+
183+
// Second batch: 1 failed item
184+
mockConsumer.err = expectedErr
185+
ld2 := plog.NewLogs()
186+
r := ld2.ResourceLogs().AppendEmpty()
187+
sl := r.ScopeLogs().AppendEmpty()
188+
sl.LogRecords().AppendEmpty()
189+
err = consumer.ConsumeLogs(ctx, ld2)
190+
assert.Equal(t, expectedErr, err)
191+
192+
// Third batch: 2 successful items
193+
mockConsumer.err = nil
194+
ld3 := plog.NewLogs()
195+
for range 2 {
196+
r = ld3.ResourceLogs().AppendEmpty()
197+
sl = r.ScopeLogs().AppendEmpty()
198+
sl.LogRecords().AppendEmpty()
199+
}
200+
err = consumer.ConsumeLogs(ctx, ld3)
201+
require.NoError(t, err)
202+
203+
// Fourth batch: 1 failed item
204+
mockConsumer.err = expectedErr
205+
ld4 := plog.NewLogs()
206+
r = ld4.ResourceLogs().AppendEmpty()
207+
sl = r.ScopeLogs().AppendEmpty()
208+
sl.LogRecords().AppendEmpty()
209+
err = consumer.ConsumeLogs(ctx, ld4)
210+
assert.Equal(t, expectedErr, err)
211+
212+
var rm metricdata.ResourceMetrics
213+
err = reader.Collect(ctx, &rm)
214+
require.NoError(t, err)
215+
require.Len(t, rm.ScopeMetrics, 1)
216+
require.Len(t, rm.ScopeMetrics[0].Metrics, 1)
217+
218+
metric := rm.ScopeMetrics[0].Metrics[0]
219+
require.Equal(t, "test_counter", metric.Name)
220+
221+
data := metric.Data.(metricdata.Sum[int64])
222+
require.Len(t, data.DataPoints, 2)
223+
224+
// Find success and failure data points
225+
var successDP, failureDP metricdata.DataPoint[int64]
226+
for _, dp := range data.DataPoints {
227+
val, ok := dp.Attributes.Value(attribute.Key("outcome"))
228+
if ok && val.Emit() == "success" {
229+
successDP = dp
230+
} else {
231+
failureDP = dp
232+
}
233+
}
234+
235+
require.Equal(t, int64(4), successDP.Value)
236+
require.Equal(t, int64(2), failureDP.Value)
237+
}
238+
239+
func TestLogsCapabilities(t *testing.T) {
240+
mockConsumer := &mockLogsConsumer{
241+
capabilities: consumer.Capabilities{MutatesData: true},
242+
}
243+
reader := sdkmetric.NewManualReader()
244+
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
245+
meter := mp.Meter("test")
246+
counter, err := meter.Int64Counter("test_counter")
247+
require.NoError(t, err)
248+
249+
consumer := obsconsumer.NewLogs(mockConsumer, counter)
250+
require.Equal(t, consumer.Capabilities(), mockConsumer.capabilities)
251+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package obsconsumer // import "go.opentelemetry.io/collector/service/internal/obsconsumer"
5+
6+
import (
7+
"context"
8+
9+
"go.opentelemetry.io/otel/attribute"
10+
"go.opentelemetry.io/otel/metric"
11+
12+
"go.opentelemetry.io/collector/consumer"
13+
"go.opentelemetry.io/collector/pdata/pmetric"
14+
)
15+
16+
var _ consumer.Metrics = Metrics{}
17+
18+
func NewMetrics(consumer consumer.Metrics, itemCounter metric.Int64Counter, opts ...Option) Metrics {
19+
o := options{}
20+
for _, opt := range opts {
21+
opt.apply(&o)
22+
}
23+
return Metrics{
24+
consumer: consumer,
25+
itemCounter: itemCounter,
26+
options: o,
27+
}
28+
}
29+
30+
type Metrics struct {
31+
consumer consumer.Metrics
32+
itemCounter metric.Int64Counter
33+
options
34+
}
35+
36+
func (c Metrics) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
37+
// Measure before calling ConsumeMetrics because the data may be mutated downstream
38+
itemCount := md.DataPointCount()
39+
40+
err := c.consumer.ConsumeMetrics(ctx, md)
41+
outcome := "success"
42+
if err != nil {
43+
outcome = "failure"
44+
}
45+
46+
var attrs []attribute.KeyValue
47+
attrs = append(attrs, c.staticDataPointAttributes...)
48+
attrs = append(attrs, attribute.String("outcome", outcome))
49+
c.itemCounter.Add(ctx, int64(itemCount), metric.WithAttributes(attrs...))
50+
return err
51+
}
52+
53+
func (c Metrics) Capabilities() consumer.Capabilities {
54+
return c.consumer.Capabilities()
55+
}

0 commit comments

Comments
 (0)