Skip to content

Commit 17bab9d

Browse files
committed
[chore] Create service/internal/obsconsumer package
1 parent 0f5d764 commit 17bab9d

File tree

11 files changed

+801
-0
lines changed

11 files changed

+801
-0
lines changed

service/internal/obsconsumer/logs.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package obsconsumer // import "go.opentelemetry.io/collector/service/internal/obsconsumer"
5+
6+
import (
7+
"context"
8+
9+
"go.opentelemetry.io/otel/attribute"
10+
"go.opentelemetry.io/otel/metric"
11+
12+
"go.opentelemetry.io/collector/consumer"
13+
"go.opentelemetry.io/collector/pdata/plog"
14+
)
15+
16+
var _ consumer.Logs = Logs{}
17+
18+
func NewLogs(consumer consumer.Logs, itemCounter metric.Int64Counter, opts ...Option) Logs {
19+
o := options{}
20+
for _, opt := range opts {
21+
opt.apply(&o)
22+
}
23+
return Logs{
24+
consumer: consumer,
25+
itemCounter: itemCounter,
26+
options: o,
27+
}
28+
}
29+
30+
type Logs struct {
31+
consumer consumer.Logs
32+
itemCounter metric.Int64Counter
33+
options
34+
}
35+
36+
func (c Logs) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
37+
var attrs []attribute.KeyValue
38+
attrs = append(attrs, c.options.staticDataPointAttributes...)
39+
err := c.consumer.ConsumeLogs(ctx, ld)
40+
outcome := "success"
41+
if err != nil {
42+
outcome = "failure"
43+
}
44+
attrs = append(attrs, attribute.String("outcome", outcome))
45+
c.itemCounter.Add(ctx, int64(ld.ResourceLogs().Len()), metric.WithAttributes(attrs...))
46+
return err
47+
}
48+
49+
func (c Logs) Capabilities() consumer.Capabilities {
50+
return c.consumer.Capabilities()
51+
}
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package obsconsumer_test
5+
6+
import (
7+
"context"
8+
"errors"
9+
"testing"
10+
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
13+
"go.opentelemetry.io/collector/consumer"
14+
"go.opentelemetry.io/collector/pdata/plog"
15+
"go.opentelemetry.io/otel/attribute"
16+
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
17+
"go.opentelemetry.io/otel/sdk/metric/metricdata"
18+
19+
"go.opentelemetry.io/collector/service/internal/obsconsumer"
20+
)
21+
22+
type mockLogsConsumer struct {
23+
err error
24+
}
25+
26+
func (m *mockLogsConsumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
27+
return m.err
28+
}
29+
30+
func (m *mockLogsConsumer) Capabilities() consumer.Capabilities {
31+
return consumer.Capabilities{MutatesData: false}
32+
}
33+
34+
func TestLogsConsumeSuccess(t *testing.T) {
35+
ctx := context.Background()
36+
mockConsumer := &mockLogsConsumer{}
37+
38+
reader := sdkmetric.NewManualReader()
39+
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
40+
meter := mp.Meter("test")
41+
counter, err := meter.Int64Counter("test_counter")
42+
require.NoError(t, err)
43+
44+
consumer := obsconsumer.NewLogs(mockConsumer, counter)
45+
46+
ld := plog.NewLogs()
47+
rl := ld.ResourceLogs().AppendEmpty()
48+
rl.ScopeLogs().AppendEmpty()
49+
50+
err = consumer.ConsumeLogs(ctx, ld)
51+
require.NoError(t, err)
52+
53+
var rm metricdata.ResourceMetrics
54+
err = reader.Collect(ctx, &rm)
55+
require.NoError(t, err)
56+
require.Len(t, rm.ScopeMetrics, 1)
57+
require.Len(t, rm.ScopeMetrics[0].Metrics, 1)
58+
59+
metric := rm.ScopeMetrics[0].Metrics[0]
60+
require.Equal(t, "test_counter", metric.Name)
61+
62+
data := metric.Data.(metricdata.Sum[int64])
63+
require.Len(t, data.DataPoints, 1)
64+
require.Equal(t, int64(1), data.DataPoints[0].Value)
65+
66+
attrs := data.DataPoints[0].Attributes
67+
require.Equal(t, 1, attrs.Len())
68+
val, ok := attrs.Value(attribute.Key("outcome"))
69+
require.True(t, ok)
70+
require.Equal(t, "success", val.Emit())
71+
}
72+
73+
func TestLogsConsumeFailure(t *testing.T) {
74+
ctx := context.Background()
75+
expectedErr := errors.New("test error")
76+
mockConsumer := &mockLogsConsumer{err: expectedErr}
77+
78+
reader := sdkmetric.NewManualReader()
79+
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
80+
meter := mp.Meter("test")
81+
counter, err := meter.Int64Counter("test_counter")
82+
require.NoError(t, err)
83+
84+
consumer := obsconsumer.NewLogs(mockConsumer, counter)
85+
86+
ld := plog.NewLogs()
87+
rl := ld.ResourceLogs().AppendEmpty()
88+
rl.ScopeLogs().AppendEmpty()
89+
90+
err = consumer.ConsumeLogs(ctx, ld)
91+
assert.Equal(t, expectedErr, err)
92+
93+
var rm metricdata.ResourceMetrics
94+
err = reader.Collect(ctx, &rm)
95+
require.NoError(t, err)
96+
require.Len(t, rm.ScopeMetrics, 1)
97+
require.Len(t, rm.ScopeMetrics[0].Metrics, 1)
98+
99+
metric := rm.ScopeMetrics[0].Metrics[0]
100+
require.Equal(t, "test_counter", metric.Name)
101+
102+
data := metric.Data.(metricdata.Sum[int64])
103+
require.Len(t, data.DataPoints, 1)
104+
require.Equal(t, int64(1), data.DataPoints[0].Value)
105+
106+
attrs := data.DataPoints[0].Attributes
107+
require.Equal(t, 1, attrs.Len())
108+
val, ok := attrs.Value(attribute.Key("outcome"))
109+
require.True(t, ok)
110+
require.Equal(t, "failure", val.Emit())
111+
}
112+
113+
func TestLogsWithStaticAttributes(t *testing.T) {
114+
ctx := context.Background()
115+
mockConsumer := &mockLogsConsumer{}
116+
117+
reader := sdkmetric.NewManualReader()
118+
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
119+
meter := mp.Meter("test")
120+
counter, err := meter.Int64Counter("test_counter")
121+
require.NoError(t, err)
122+
123+
staticAttr := attribute.String("test", "value")
124+
consumer := obsconsumer.NewLogs(mockConsumer, counter, obsconsumer.WithStaticDataPointAttribute(staticAttr))
125+
126+
ld := plog.NewLogs()
127+
rl := ld.ResourceLogs().AppendEmpty()
128+
rl.ScopeLogs().AppendEmpty()
129+
130+
err = consumer.ConsumeLogs(ctx, ld)
131+
require.NoError(t, err)
132+
133+
var rm metricdata.ResourceMetrics
134+
err = reader.Collect(ctx, &rm)
135+
require.NoError(t, err)
136+
require.Len(t, rm.ScopeMetrics, 1)
137+
require.Len(t, rm.ScopeMetrics[0].Metrics, 1)
138+
139+
metric := rm.ScopeMetrics[0].Metrics[0]
140+
require.Equal(t, "test_counter", metric.Name)
141+
142+
data := metric.Data.(metricdata.Sum[int64])
143+
require.Len(t, data.DataPoints, 1)
144+
require.Equal(t, int64(1), data.DataPoints[0].Value)
145+
146+
attrs := data.DataPoints[0].Attributes
147+
require.Equal(t, 2, attrs.Len())
148+
val, ok := attrs.Value(attribute.Key("test"))
149+
require.True(t, ok)
150+
require.Equal(t, "value", val.Emit())
151+
val, ok = attrs.Value(attribute.Key("outcome"))
152+
require.True(t, ok)
153+
require.Equal(t, "success", val.Emit())
154+
}
155+
156+
func TestLogsMultipleItemsMixedOutcomes(t *testing.T) {
157+
ctx := context.Background()
158+
expectedErr := errors.New("test error")
159+
mockConsumer := &mockLogsConsumer{}
160+
161+
reader := sdkmetric.NewManualReader()
162+
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
163+
meter := mp.Meter("test")
164+
counter, err := meter.Int64Counter("test_counter")
165+
require.NoError(t, err)
166+
167+
consumer := obsconsumer.NewLogs(mockConsumer, counter)
168+
169+
// First batch: 2 successful items
170+
ld1 := plog.NewLogs()
171+
for i := 0; i < 2; i++ {
172+
rl := ld1.ResourceLogs().AppendEmpty()
173+
rl.ScopeLogs().AppendEmpty()
174+
}
175+
err = consumer.ConsumeLogs(ctx, ld1)
176+
require.NoError(t, err)
177+
178+
// Second batch: 1 failed item
179+
mockConsumer.err = expectedErr
180+
ld2 := plog.NewLogs()
181+
rl := ld2.ResourceLogs().AppendEmpty()
182+
rl.ScopeLogs().AppendEmpty()
183+
err = consumer.ConsumeLogs(ctx, ld2)
184+
assert.Equal(t, expectedErr, err)
185+
186+
// Third batch: 2 successful items
187+
mockConsumer.err = nil
188+
ld3 := plog.NewLogs()
189+
for i := 0; i < 2; i++ {
190+
rl := ld3.ResourceLogs().AppendEmpty()
191+
rl.ScopeLogs().AppendEmpty()
192+
}
193+
err = consumer.ConsumeLogs(ctx, ld3)
194+
require.NoError(t, err)
195+
196+
// Fourth batch: 1 failed item
197+
mockConsumer.err = expectedErr
198+
ld4 := plog.NewLogs()
199+
rl = ld4.ResourceLogs().AppendEmpty()
200+
rl.ScopeLogs().AppendEmpty()
201+
err = consumer.ConsumeLogs(ctx, ld4)
202+
assert.Equal(t, expectedErr, err)
203+
204+
var rm metricdata.ResourceMetrics
205+
err = reader.Collect(ctx, &rm)
206+
require.NoError(t, err)
207+
require.Len(t, rm.ScopeMetrics, 1)
208+
require.Len(t, rm.ScopeMetrics[0].Metrics, 1)
209+
210+
metric := rm.ScopeMetrics[0].Metrics[0]
211+
require.Equal(t, "test_counter", metric.Name)
212+
213+
data := metric.Data.(metricdata.Sum[int64])
214+
require.Len(t, data.DataPoints, 2)
215+
216+
// Find success and failure data points
217+
var successDP, failureDP metricdata.DataPoint[int64]
218+
for _, dp := range data.DataPoints {
219+
val, ok := dp.Attributes.Value(attribute.Key("outcome"))
220+
if ok && val.Emit() == "success" {
221+
successDP = dp
222+
} else {
223+
failureDP = dp
224+
}
225+
}
226+
227+
require.Equal(t, int64(4), successDP.Value)
228+
require.Equal(t, int64(2), failureDP.Value)
229+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package obsconsumer // import "go.opentelemetry.io/collector/service/internal/obsconsumer"
5+
6+
import (
7+
"context"
8+
9+
"go.opentelemetry.io/otel/attribute"
10+
"go.opentelemetry.io/otel/metric"
11+
12+
"go.opentelemetry.io/collector/consumer"
13+
"go.opentelemetry.io/collector/pdata/pmetric"
14+
)
15+
16+
var _ consumer.Metrics = Metrics{}
17+
18+
func NewMetrics(consumer consumer.Metrics, itemCounter metric.Int64Counter, opts ...Option) Metrics {
19+
o := options{}
20+
for _, opt := range opts {
21+
opt.apply(&o)
22+
}
23+
return Metrics{
24+
consumer: consumer,
25+
itemCounter: itemCounter,
26+
options: o,
27+
}
28+
}
29+
30+
type Metrics struct {
31+
consumer consumer.Metrics
32+
itemCounter metric.Int64Counter
33+
options
34+
}
35+
36+
func (c Metrics) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
37+
var attrs []attribute.KeyValue
38+
attrs = append(attrs, c.options.staticDataPointAttributes...)
39+
err := c.consumer.ConsumeMetrics(ctx, md)
40+
outcome := "success"
41+
if err != nil {
42+
outcome = "failure"
43+
}
44+
attrs = append(attrs, attribute.String("outcome", outcome))
45+
c.itemCounter.Add(ctx, int64(md.ResourceMetrics().Len()), metric.WithAttributes(attrs...))
46+
return err
47+
}
48+
49+
func (c Metrics) Capabilities() consumer.Capabilities {
50+
return c.consumer.Capabilities()
51+
}

0 commit comments

Comments
 (0)