Skip to content

Commit 1f50d4d

Browse files
authored
[chore] Integration tests: move sink initialization to a common package (#1697)
1 parent faf2847 commit 1f50d4d

File tree

8 files changed

+138
-219
lines changed

8 files changed

+138
-219
lines changed

functional_tests/configuration_switching/configuration_switching_test.go

Lines changed: 10 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,8 @@ import (
1515
"text/template"
1616
"time"
1717

18-
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkhecreceiver"
19-
"go.opentelemetry.io/collector/component/componenttest"
2018
"go.opentelemetry.io/collector/pdata/plog"
2119
"go.opentelemetry.io/collector/pdata/pmetric"
22-
"go.opentelemetry.io/collector/receiver/receivertest"
2320
"gopkg.in/yaml.v3"
2421

2522
"github.com/stretchr/testify/assert"
@@ -37,12 +34,8 @@ import (
3734
)
3835

3936
const (
40-
hecReceiverPort = 8090
41-
hecMetricsReceiverPort = 8091
42-
apiPort = 8881
43-
hecLogsObjectsReceiverPort = 8092
44-
testDir = "testdata"
45-
valuesDir = "values"
37+
testDir = "testdata"
38+
valuesDir = "values"
4639
)
4740

4841
var globalSinks *sinks
@@ -57,14 +50,10 @@ type sinks struct {
5750

5851
func setupOnce(t *testing.T) *sinks {
5952
setupRun.Do(func() {
60-
// create an API server
61-
internal.CreateApiServer(t, apiPort)
62-
// set ingest pipelines
63-
logs, metrics := setupHEC(t)
6453
globalSinks = &sinks{
65-
logsConsumer: logs,
66-
hecMetricsConsumer: metrics,
67-
logsObjectsConsumer: setupHECLogsObjects(t),
54+
logsConsumer: internal.SetupHECLogsSink(t),
55+
hecMetricsConsumer: internal.SetupHECMetricsSink(t),
56+
logsObjectsConsumer: internal.SetupHECObjectsSink(t),
6857
}
6958
if os.Getenv("TEARDOWN_BEFORE_SETUP") == "true" {
7059
teardown(t)
@@ -92,8 +81,8 @@ func deployChartsAndApps(t *testing.T, valuesFileName string, repl map[string]in
9281
require.Fail(t, "Host endpoint not found")
9382
}
9483
replacements := map[string]interface{}{
95-
"LogHecEndpoint": fmt.Sprintf("http://%s:%d", hostEp, hecReceiverPort),
96-
"MetricHecEndpoint": fmt.Sprintf("http://%s:%d/services/collector", hostEp, hecMetricsReceiverPort),
84+
"LogHecEndpoint": fmt.Sprintf("http://%s:%d", hostEp, internal.HECLogsReceiverPort),
85+
"MetricHecEndpoint": fmt.Sprintf("http://%s:%d/services/collector", hostEp, internal.HECMetricsReceiverPort),
9786
}
9887
for k, v := range repl {
9988
replacements[k] = v
@@ -315,7 +304,7 @@ func testClusterReceiverEnabledOrDisabled(t *testing.T) {
315304
if len(hostEp) == 0 {
316305
require.Fail(t, "Host endpoint not found")
317306
}
318-
logsObjectsHecEndpoint := fmt.Sprintf("http://%s:%d/services/collector", hostEp, hecLogsObjectsReceiverPort)
307+
logsObjectsHecEndpoint := fmt.Sprintf("http://%s:%d/services/collector", hostEp, internal.HECObjectsReceiverPort)
319308

320309
t.Run("check cluster receiver enabled", func(t *testing.T) {
321310
replacements := map[string]interface{}{
@@ -358,7 +347,7 @@ func testVerifyLogsAndMetricsAttributes(t *testing.T) {
358347
t.Run("verify cluster receiver attributes", func(t *testing.T) {
359348
valuesFileName := "values_cluster_receiver_only.yaml.tmpl"
360349
logsObjectsConsumer := setupOnce(t).logsObjectsConsumer
361-
logsObjectsHecEndpoint := fmt.Sprintf("http://%s:%d/services/collector", hostEp, hecLogsObjectsReceiverPort)
350+
logsObjectsHecEndpoint := fmt.Sprintf("http://%s:%d/services/collector", hostEp, internal.HECObjectsReceiverPort)
362351

363352
replacements := map[string]interface{}{
364353
"ClusterReceiverEnabled": true,
@@ -381,7 +370,7 @@ func testVerifyLogsAndMetricsAttributes(t *testing.T) {
381370
t.Run("verify cluster receiver metrics attributes", func(t *testing.T) {
382371
valuesFileName := "values_cluster_receiver_only.yaml.tmpl"
383372
hecMetricsConsumer := setupOnce(t).hecMetricsConsumer
384-
logsObjectsHecEndpoint := fmt.Sprintf("http://%s:%d/services/collector", hostEp, hecLogsObjectsReceiverPort)
373+
logsObjectsHecEndpoint := fmt.Sprintf("http://%s:%d/services/collector", hostEp, internal.HECObjectsReceiverPort)
385374

386375
replacements := map[string]interface{}{
387376
"ClusterReceiverEnabled": true,
@@ -626,51 +615,3 @@ func uninstallDeployment(t *testing.T) {
626615
t.Logf("Uninstalled release: %v", uninstallResponse)
627616
waitForAllPodsToBeRemoved(t, "default")
628617
}
629-
630-
func setupHEC(t *testing.T) (*consumertest.LogsSink, *consumertest.MetricsSink) {
631-
// the splunkhecreceiver does poorly at receiving logs and metrics. Use separate ports for now.
632-
f := splunkhecreceiver.NewFactory()
633-
cfg := f.CreateDefaultConfig().(*splunkhecreceiver.Config)
634-
cfg.Endpoint = fmt.Sprintf("0.0.0.0:%d", hecReceiverPort)
635-
636-
mCfg := f.CreateDefaultConfig().(*splunkhecreceiver.Config)
637-
mCfg.Endpoint = fmt.Sprintf("0.0.0.0:%d", hecMetricsReceiverPort)
638-
639-
lc := new(consumertest.LogsSink)
640-
mc := new(consumertest.MetricsSink)
641-
rcvr, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(f.Type()), cfg, lc)
642-
mrcvr, err := f.CreateMetrics(context.Background(), receivertest.NewNopSettings(f.Type()), mCfg, mc)
643-
require.NoError(t, err)
644-
645-
require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))
646-
require.NoError(t, err, "failed creating logs receiver")
647-
t.Cleanup(func() {
648-
assert.NoError(t, rcvr.Shutdown(context.Background()))
649-
})
650-
651-
require.NoError(t, mrcvr.Start(context.Background(), componenttest.NewNopHost()))
652-
require.NoError(t, err, "failed creating metrics receiver")
653-
t.Cleanup(func() {
654-
assert.NoError(t, mrcvr.Shutdown(context.Background()))
655-
})
656-
657-
return lc, mc
658-
}
659-
660-
func setupHECLogsObjects(t *testing.T) *consumertest.LogsSink {
661-
f := splunkhecreceiver.NewFactory()
662-
cfg := f.CreateDefaultConfig().(*splunkhecreceiver.Config)
663-
cfg.Endpoint = fmt.Sprintf("0.0.0.0:%d", hecLogsObjectsReceiverPort)
664-
665-
lc := new(consumertest.LogsSink)
666-
rcvr, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(f.Type()), cfg, lc)
667-
require.NoError(t, err)
668-
669-
require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))
670-
require.NoError(t, err, "failed creating logs receiver")
671-
t.Cleanup(func() {
672-
assert.NoError(t, rcvr.Shutdown(context.Background()))
673-
})
674-
675-
return lc
676-
}

functional_tests/functional/functional_test.go

Lines changed: 10 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,13 @@ import (
1919
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
2020
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
2121
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/ptracetest"
22-
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkhecreceiver"
2322
"github.com/stretchr/testify/assert"
2423
"github.com/stretchr/testify/require"
25-
"go.opentelemetry.io/collector/component/componenttest"
2624
"go.opentelemetry.io/collector/consumer/consumertest"
2725
"go.opentelemetry.io/collector/pdata/pcommon"
2826
"go.opentelemetry.io/collector/pdata/plog"
2927
"go.opentelemetry.io/collector/pdata/pmetric"
3028
"go.opentelemetry.io/collector/pdata/ptrace"
31-
"go.opentelemetry.io/collector/receiver/otlpreceiver"
32-
"go.opentelemetry.io/collector/receiver/receivertest"
3329
"gopkg.in/yaml.v3"
3430
"helm.sh/helm/v3/pkg/action"
3531
"helm.sh/helm/v3/pkg/kube"
@@ -53,14 +49,8 @@ import (
5349
)
5450

5551
const (
56-
hecReceiverPort = 8090
57-
hecMetricsReceiverPort = 8091
58-
hecLogsObjectsReceiverPort = 8092
5952
signalFxReceiverPort = 9443
6053
signalFxReceiverK8sClusterReceiverPort = 19443
61-
otlpReceiverPort = 4317
62-
otlpHTTPReceiverPort = 4318
63-
apiPort = 8881
6454
kindTestKubeEnv = "kind"
6555
eksTestKubeEnv = "eks"
6656
autopilotTestKubeEnv = "gke/autopilot"
@@ -93,17 +83,15 @@ type sinks struct {
9383
func setupOnce(t *testing.T) *sinks {
9484
setupRun.Do(func() {
9585
// create an API server
96-
internal.CreateApiServer(t, apiPort)
97-
// set ingest pipelines
98-
logs, metrics := setupHEC(t)
86+
internal.SetupSignalFxApiServer(t)
9987
globalSinks = &sinks{
100-
logsConsumer: logs,
101-
hecMetricsConsumer: metrics,
102-
logsObjectsConsumer: setupHECLogsObjects(t),
88+
logsConsumer: internal.SetupHECLogsSink(t),
89+
hecMetricsConsumer: internal.SetupHECMetricsSink(t),
90+
logsObjectsConsumer: internal.SetupHECObjectsSink(t),
10391
agentMetricsConsumer: internal.SetupSignalfxReceiver(t, signalFxReceiverPort),
10492
k8sclusterReceiverMetricsConsumer: internal.SetupSignalfxReceiver(t,
10593
signalFxReceiverK8sClusterReceiverPort),
106-
tracesConsumer: setupTraces(t),
94+
tracesConsumer: internal.SetupOTLPTracesSink(t),
10795
}
10896
if os.Getenv("TEARDOWN_BEFORE_SETUP") == "true" {
10997
teardown(t)
@@ -225,11 +213,11 @@ func deployChartsAndApps(t *testing.T) {
225213
}{
226214
fmt.Sprintf("http://%s:%d", hostEp, signalFxReceiverK8sClusterReceiverPort),
227215
fmt.Sprintf("http://%s:%d", hostEp, signalFxReceiverPort),
228-
fmt.Sprintf("http://%s:%d", hostEp, hecReceiverPort),
229-
fmt.Sprintf("http://%s:%d/services/collector", hostEp, hecMetricsReceiverPort),
230-
fmt.Sprintf("%s:%d", hostEp, otlpReceiverPort),
231-
fmt.Sprintf("http://%s:%d", hostEp, apiPort),
232-
fmt.Sprintf("http://%s:%d/services/collector", hostEp, hecLogsObjectsReceiverPort),
216+
fmt.Sprintf("http://%s:%d", hostEp, internal.HECLogsReceiverPort),
217+
fmt.Sprintf("http://%s:%d/services/collector", hostEp, internal.HECMetricsReceiverPort),
218+
fmt.Sprintf("%s:%d", hostEp, internal.OTLPGRPCReceiverPort),
219+
fmt.Sprintf("http://%s:%d", hostEp, internal.SignalFxAPIPort),
220+
fmt.Sprintf("http://%s:%d/services/collector", hostEp, internal.HECObjectsReceiverPort),
233221
kubeTestEnv,
234222
}
235223
tmpl, err := template.New("").Parse(string(valuesBytes))
@@ -1449,73 +1437,6 @@ func waitForAllNamespacesToBeCreated(t *testing.T, client *kubernetes.Clientset)
14491437
}, 5*time.Minute, 10*time.Second)
14501438
}
14511439

1452-
func setupTraces(t *testing.T) *consumertest.TracesSink {
1453-
tc := new(consumertest.TracesSink)
1454-
f := otlpreceiver.NewFactory()
1455-
cfg := f.CreateDefaultConfig().(*otlpreceiver.Config)
1456-
cfg.Protocols.GRPC.NetAddr.Endpoint = fmt.Sprintf("0.0.0.0:%d", otlpReceiverPort)
1457-
cfg.Protocols.HTTP.Endpoint = fmt.Sprintf("0.0.0.0:%d", otlpHTTPReceiverPort)
1458-
1459-
rcvr, err := f.CreateTraces(context.Background(), receivertest.NewNopSettings(f.Type()), cfg, tc)
1460-
require.NoError(t, err)
1461-
1462-
require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))
1463-
require.NoError(t, err, "failed creating traces receiver")
1464-
t.Cleanup(func() {
1465-
assert.NoError(t, rcvr.Shutdown(context.Background()))
1466-
})
1467-
1468-
return tc
1469-
}
1470-
1471-
func setupHEC(t *testing.T) (*consumertest.LogsSink, *consumertest.MetricsSink) {
1472-
// the splunkhecreceiver does poorly at receiving logs and metrics. Use separate ports for now.
1473-
f := splunkhecreceiver.NewFactory()
1474-
cfg := f.CreateDefaultConfig().(*splunkhecreceiver.Config)
1475-
cfg.Endpoint = fmt.Sprintf("0.0.0.0:%d", hecReceiverPort)
1476-
1477-
mCfg := f.CreateDefaultConfig().(*splunkhecreceiver.Config)
1478-
mCfg.Endpoint = fmt.Sprintf("0.0.0.0:%d", hecMetricsReceiverPort)
1479-
1480-
lc := new(consumertest.LogsSink)
1481-
mc := new(consumertest.MetricsSink)
1482-
rcvr, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(f.Type()), cfg, lc)
1483-
mrcvr, err := f.CreateMetrics(context.Background(), receivertest.NewNopSettings(f.Type()), mCfg, mc)
1484-
require.NoError(t, err)
1485-
1486-
require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))
1487-
require.NoError(t, err, "failed creating logs receiver")
1488-
t.Cleanup(func() {
1489-
assert.NoError(t, rcvr.Shutdown(context.Background()))
1490-
})
1491-
1492-
require.NoError(t, mrcvr.Start(context.Background(), componenttest.NewNopHost()))
1493-
require.NoError(t, err, "failed creating metrics receiver")
1494-
t.Cleanup(func() {
1495-
assert.NoError(t, mrcvr.Shutdown(context.Background()))
1496-
})
1497-
1498-
return lc, mc
1499-
}
1500-
1501-
func setupHECLogsObjects(t *testing.T) *consumertest.LogsSink {
1502-
f := splunkhecreceiver.NewFactory()
1503-
cfg := f.CreateDefaultConfig().(*splunkhecreceiver.Config)
1504-
cfg.Endpoint = fmt.Sprintf("0.0.0.0:%d", hecLogsObjectsReceiverPort)
1505-
1506-
lc := new(consumertest.LogsSink)
1507-
rcvr, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(f.Type()), cfg, lc)
1508-
require.NoError(t, err)
1509-
1510-
require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))
1511-
require.NoError(t, err, "failed creating logs receiver")
1512-
t.Cleanup(func() {
1513-
assert.NoError(t, rcvr.Shutdown(context.Background()))
1514-
})
1515-
1516-
return lc
1517-
}
1518-
15191440
func checkMetricsAreEmitted(t *testing.T, mc *consumertest.MetricsSink, metricNames []string, matchFn func(string, pcommon.Map) bool) {
15201441
metricsToFind := map[string]bool{}
15211442
for _, name := range metricNames {

functional_tests/histogram/histogram_test.go

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package histogram
55

66
import (
77
"bytes"
8-
"context"
98
"fmt"
109
"os"
1110
"path/filepath"
@@ -17,13 +16,10 @@ import (
1716

1817
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
1918
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
20-
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/signalfxreceiver"
2119
"github.com/stretchr/testify/assert"
2220
"github.com/stretchr/testify/require"
23-
"go.opentelemetry.io/collector/component/componenttest"
2421
"go.opentelemetry.io/collector/consumer/consumertest"
2522
"go.opentelemetry.io/collector/pdata/pmetric"
26-
"go.opentelemetry.io/collector/receiver/receivertest"
2723
"gopkg.in/yaml.v3"
2824
"helm.sh/helm/v3/pkg/action"
2925
"helm.sh/helm/v3/pkg/kube"
@@ -43,7 +39,7 @@ var histogramMetricsConsumer *consumertest.MetricsSink
4339

4440
func setupOnce(t *testing.T) *consumertest.MetricsSink {
4541
setupRun.Do(func() {
46-
histogramMetricsConsumer = setupOtlpReceiver(t, otlpReceiverPort)
42+
histogramMetricsConsumer = internal.SetupSignalfxReceiver(t, otlpReceiverPort)
4743

4844
if os.Getenv("TEARDOWN_BEFORE_SETUP") == "true" {
4945
teardown(t)
@@ -59,24 +55,6 @@ func setupOnce(t *testing.T) *consumertest.MetricsSink {
5955
return histogramMetricsConsumer
6056
}
6157

62-
func setupOtlpReceiver(t *testing.T, port int) *consumertest.MetricsSink {
63-
mc := new(consumertest.MetricsSink)
64-
f := signalfxreceiver.NewFactory()
65-
cfg := f.CreateDefaultConfig().(*signalfxreceiver.Config)
66-
cfg.Endpoint = fmt.Sprintf("0.0.0.0:%d", port)
67-
68-
rcvr, err := f.CreateMetrics(context.Background(), receivertest.NewNopSettings(f.Type()), cfg, mc)
69-
require.NoError(t, err)
70-
71-
require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))
72-
require.NoError(t, err, "failed creating metrics receiver")
73-
t.Cleanup(func() {
74-
assert.NoError(t, rcvr.Shutdown(context.Background()))
75-
})
76-
77-
return mc
78-
}
79-
8058
func deployChartsAndApps(t *testing.T) {
8159
testKubeConfig, setKubeConfig := os.LookupEnv("KUBECONFIG")
8260
require.True(t, setKubeConfig, "the environment variable KUBECONFIG must be set")

functional_tests/internal/api_server.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,17 @@ import (
1313
"github.com/stretchr/testify/require"
1414
)
1515

16-
func CreateApiServer(t *testing.T, port int) {
16+
const SignalFxAPIPort = 8881
17+
18+
func SetupSignalFxApiServer(t *testing.T) {
1719
mux := http.NewServeMux()
1820
mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
1921
writer.WriteHeader(200)
2022
})
2123

2224
_, cancelCtx := context.WithCancel(context.Background())
2325
s := &http.Server{
24-
Addr: fmt.Sprintf("0.0.0.0:%d", port),
26+
Addr: fmt.Sprintf("0.0.0.0:%d", SignalFxAPIPort),
2527
Handler: mux,
2628
}
2729

0 commit comments

Comments
 (0)