Skip to content

Commit 89a469f

Browse files
bogdandrutucparkins
authored andcommitted
[chore] fix resourcetotelemetry usage in carbonexporter (open-telemetry#29888)
In open-telemetry#29879 I forgot to actually plugin the logic, only the config was added there. Added a test to confirm it. Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 9def75b commit 89a469f

File tree

2 files changed

+51
-22
lines changed

2 files changed

+51
-22
lines changed

exporter/carbonexporter/exporter.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
"go.opentelemetry.io/collector/exporter"
1313
"go.opentelemetry.io/collector/exporter/exporterhelper"
1414
"go.opentelemetry.io/collector/pdata/pmetric"
15+
16+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry"
1517
)
1618

1719
// newCarbonExporter returns a new Carbon exporter.
@@ -20,7 +22,7 @@ func newCarbonExporter(cfg *Config, set exporter.CreateSettings) (exporter.Metri
2022
connPool: newTCPConnPool(cfg.Endpoint, cfg.Timeout),
2123
}
2224

23-
return exporterhelper.NewMetricsExporter(
25+
exp, err := exporterhelper.NewMetricsExporter(
2426
context.TODO(),
2527
set,
2628
cfg,
@@ -29,6 +31,11 @@ func newCarbonExporter(cfg *Config, set exporter.CreateSettings) (exporter.Metri
2931
exporterhelper.WithQueue(cfg.QueueConfig),
3032
exporterhelper.WithRetry(cfg.RetryConfig),
3133
exporterhelper.WithShutdown(sender.Shutdown))
34+
if err != nil {
35+
return nil, err
36+
}
37+
38+
return resourcetotelemetry.WrapMetricsExporter(cfg.ResourceToTelemetryConfig, exp), nil
3239
}
3340

3441
// carbonSender is the struct tying the translation function and the TCP

exporter/carbonexporter/exporter_test.go

Lines changed: 43 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
conventions "go.opentelemetry.io/collector/semconv/v1.9.0"
2828

2929
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
30+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry"
3031
)
3132

3233
func TestNewWithDefaultConfig(t *testing.T) {
@@ -45,10 +46,31 @@ func TestConsumeMetricsNoServer(t *testing.T) {
4546
exportertest.NewNopCreateSettings())
4647
require.NoError(t, err)
4748
require.NoError(t, exp.Start(context.Background(), componenttest.NewNopHost()))
48-
require.Error(t, exp.ConsumeMetrics(context.Background(), generateLargeBatch()))
49+
require.Error(t, exp.ConsumeMetrics(context.Background(), generateSmallBatch()))
4950
require.NoError(t, exp.Shutdown(context.Background()))
5051
}
5152

53+
func TestConsumeMetricsWithResourceToTelemetry(t *testing.T) {
54+
addr := testutil.GetAvailableLocalAddress(t)
55+
cs := newCarbonServer(t, addr, "test_0;k0=v0;k1=v1;service.name=test_carbon 0")
56+
// Each metric point will generate one Carbon line, set up the wait
57+
// for all of them.
58+
cs.start(t, 1)
59+
60+
exp, err := newCarbonExporter(
61+
&Config{
62+
TCPAddr: confignet.TCPAddr{Endpoint: addr},
63+
TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: 5 * time.Second},
64+
ResourceToTelemetryConfig: resourcetotelemetry.Settings{Enabled: true},
65+
},
66+
exportertest.NewNopCreateSettings())
67+
require.NoError(t, err)
68+
require.NoError(t, exp.Start(context.Background(), componenttest.NewNopHost()))
69+
require.NoError(t, exp.ConsumeMetrics(context.Background(), generateSmallBatch()))
70+
assert.NoError(t, exp.Shutdown(context.Background()))
71+
cs.shutdownAndVerify(t)
72+
}
73+
5274
func TestConsumeMetrics(t *testing.T) {
5375
if runtime.GOOS == "windows" {
5476
t.Skip("skipping test on windows, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/10147")
@@ -94,7 +116,7 @@ func TestConsumeMetrics(t *testing.T) {
94116
for _, tt := range tests {
95117
t.Run(tt.name, func(t *testing.T) {
96118
addr := testutil.GetAvailableLocalAddress(t)
97-
cs := newCarbonServer(t, addr)
119+
cs := newCarbonServer(t, addr, "")
98120
// Each metric point will generate one Carbon line, set up the wait
99121
// for all of them.
100122
cs.start(t, tt.numProducers*tt.writesPerProducer*tt.md.DataPointCount())
@@ -133,25 +155,21 @@ func TestConsumeMetrics(t *testing.T) {
133155
}
134156

135157
func generateSmallBatch() pmetric.Metrics {
136-
metrics := pmetric.NewMetrics()
137-
m := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
138-
m.SetName("test_gauge")
139-
dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
140-
dp.Attributes().PutStr("k0", "v0")
141-
dp.Attributes().PutStr("k1", "v1")
142-
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
143-
dp.SetDoubleValue(123)
144-
return metrics
158+
return generateMetricsBatch(1)
145159
}
146160

147161
func generateLargeBatch() pmetric.Metrics {
162+
return generateMetricsBatch(1024)
163+
}
164+
165+
func generateMetricsBatch(size int) pmetric.Metrics {
148166
ts := time.Now()
149167
metrics := pmetric.NewMetrics()
150168
rm := metrics.ResourceMetrics().AppendEmpty()
151169
rm.Resource().Attributes().PutStr(conventions.AttributeServiceName, "test_carbon")
152170
ms := rm.ScopeMetrics().AppendEmpty().Metrics()
153171

154-
for i := 0; i < 1028; i++ {
172+
for i := 0; i < size; i++ {
155173
m := ms.AppendEmpty()
156174
m.SetName("test_" + strconv.Itoa(i))
157175
dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
@@ -165,19 +183,21 @@ func generateLargeBatch() pmetric.Metrics {
165183
}
166184

167185
type carbonServer struct {
168-
ln *net.TCPListener
169-
doneServer *atomic.Bool
170-
wg sync.WaitGroup
186+
ln *net.TCPListener
187+
doneServer *atomic.Bool
188+
wg sync.WaitGroup
189+
expectedContainsValue string
171190
}
172191

173-
func newCarbonServer(t *testing.T, addr string) *carbonServer {
192+
func newCarbonServer(t *testing.T, addr string, expectedContainsValue string) *carbonServer {
174193
laddr, err := net.ResolveTCPAddr("tcp", addr)
175194
require.NoError(t, err)
176195
ln, err := net.ListenTCP("tcp", laddr)
177196
require.NoError(t, err)
178197
return &carbonServer{
179-
ln: ln,
180-
doneServer: &atomic.Bool{},
198+
ln: ln,
199+
doneServer: &atomic.Bool{},
200+
expectedContainsValue: expectedContainsValue,
181201
}
182202
}
183203

@@ -198,14 +218,16 @@ func (cs *carbonServer) start(t *testing.T, numExpectedReq int) {
198218

199219
reader := bufio.NewReader(conn)
200220
for {
201-
// Actual metric validation is done by other tests, here it
202-
// is just flow.
203-
_, err := reader.ReadBytes(byte('\n'))
221+
buf, err := reader.ReadBytes(byte('\n'))
204222
if errors.Is(err, io.EOF) {
205223
return
206224
}
207225
require.NoError(t, err)
208226

227+
if cs.expectedContainsValue != "" {
228+
assert.Contains(t, string(buf), cs.expectedContainsValue)
229+
}
230+
209231
cs.wg.Done()
210232
}
211233
}(conn)

0 commit comments

Comments
 (0)