From 4c67f3eb2793bfdb14e28379edd31fabc158a487 Mon Sep 17 00:00:00 2001 From: james hughes Date: Mon, 3 Jun 2024 11:45:28 -0700 Subject: [PATCH 1/3] initial kafkametrics discovery integ test using docker_observer --- .github/workflows/integration-test.yml | 44 ++++ Makefile | 4 + docker/docker-compose.yml | 13 ++ .../kafkametrics_discovery_test.go | 206 ++++++++++++++++++ ...erver_without_ssl_kafkametrics_config.yaml | 57 +++++ 5 files changed, 324 insertions(+) create mode 100644 tests/receivers/kafkametrics/kafkametrics_discovery_test.go create mode 100644 tests/receivers/kafkametrics/testdata/docker_observer_without_ssl_kafkametrics_config.yaml diff --git a/.github/workflows/integration-test.yml b/.github/workflows/integration-test.yml index b76a2ec85a..1a9ef2a465 100644 --- a/.github/workflows/integration-test.yml +++ b/.github/workflows/integration-test.yml @@ -282,3 +282,47 @@ jobs: exit $exit_status env: SPLUNK_OTEL_COLLECTOR_IMAGE: 'otelcol:latest' + integration-test-kafkametrics-discovery: + name: integration-test-kafkametrics-discovery + # Use 20.04.5 until https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/16450 is resolved + runs-on: ubuntu-20.04 + needs: [docker-otelcol] + strategy: + matrix: + ARCH: [ "amd64", "arm64" ] + fail-fast: false + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + - run: docker compose -f docker/docker-compose.yml --profile integration-test-kafkametrics-discovery up -d --quiet-pull + - uses: actions/setup-go@v5 + with: + go-version: ${{ env.GO_VERSION }} + cache-dependency-path: '**/go.sum' + - uses: actions/download-artifact@v4 + with: + name: otelcol + path: ./bin + - uses: actions/download-artifact@v4 + with: + name: docker-otelcol-${{ matrix.ARCH }} + path: ./docker-otelcol/${{ matrix.ARCH }} + - uses: docker/setup-qemu-action@v3 + if: ${{ matrix.ARCH != 'amd64' }} + with: + platforms: ${{ matrix.ARCH }} + image: tonistiigi/binfmt:qemu-v7.0.0 + - run: docker load -i ./docker-otelcol/${{ 
matrix.ARCH }}/image.tar + - run: ln -sf otelcol_linux_${{ matrix.ARCH }} ./bin/otelcol + - run: chmod a+x ./bin/* + - name: Run Discovery Integration Test + run: | + set -o pipefail + target="integration-test-kafkametrics-discovery" + make $target 2>&1 | tee kafkametrics-${{ github.run_id }}-${{ matrix.ARCH }}.out + exit_status=${PIPESTATUS[0]} + echo "Exit status: $exit_status" + exit $exit_status + env: + SPLUNK_OTEL_COLLECTOR_IMAGE: 'otelcol:latest' diff --git a/Makefile b/Makefile index 83de351af8..bee7644122 100644 --- a/Makefile +++ b/Makefile @@ -79,6 +79,10 @@ integration-test: integration-test-mongodb-discovery: @set -e; cd tests && $(GOTEST_SERIAL) $(BUILD_INFO_TESTS) --tags=discovery_integration_mongodb -v -timeout 5m -count 1 ./... +.PHONY: integration-test-kafkametrics-discovery +integration-test-kafkametrics-discovery: + @set -e; cd tests && $(GOTEST_SERIAL) $(BUILD_INFO_TESTS) --tags=discovery_integration_kafkametrics -v -timeout 5m -count 1 ./... + .PHONY: smartagent-integration-test smartagent-integration-test: @set -e; cd tests && $(GOTEST_SERIAL) $(BUILD_INFO_TESTS) --tags=smartagent_integration -v -timeout 5m -count 1 ./... 
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index b1cb354f64..9f39ad980b 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -62,12 +62,15 @@ services: depends_on: - zookeeper profiles: + - integration-test-kafkametrics-discovery - smartagent build: ./kafka environment: START_AS: broker KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 JMX_PORT: 7099 + USER: "kafka" + PASSWORD: "insecure" ports: - "7099:7099" - "9092:9092" @@ -87,6 +90,7 @@ services: depends_on: - zookeeper profiles: + - integration-test-kafkametrics-discovery - smartagent build: ./kafka environment: @@ -94,6 +98,8 @@ services: KAFKA_BROKER: kafka-broker:9092 JMX_PORT: 9099 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + USER: "kafka" + PASSWORD: "insecure" ports: - "9099:9099" healthcheck: @@ -112,6 +118,7 @@ services: depends_on: - zookeeper profiles: + - integration-test-kafkametrics-discovery - smartagent build: ./kafka environment: @@ -119,6 +126,8 @@ services: KAFKA_BROKER: kafka-broker:9092 JMX_PORT: 8099 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + USER: "kafka" + PASSWORD: "insecure" ports: - "8099:8099" healthcheck: @@ -137,12 +146,15 @@ services: depends_on: - zookeeper profiles: + - integration-test-kafkametrics-discovery - smartagent build: ./kafka environment: START_AS: create-topic KAFKA_BROKER: kafka-broker:9092 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + USER: "kafka" + PASSWORD: "insecure" kong: image: quay.io/splunko11ytest/kong:latest profiles: @@ -217,6 +229,7 @@ services: zookeeper: image: zookeeper:3.5 profiles: + - integration-test-kafkametrics-discovery - smartagent ports: - "2181:2181" diff --git a/tests/receivers/kafkametrics/kafkametrics_discovery_test.go b/tests/receivers/kafkametrics/kafkametrics_discovery_test.go new file mode 100644 index 0000000000..7a27fb55d8 --- /dev/null +++ b/tests/receivers/kafkametrics/kafkametrics_discovery_test.go @@ -0,0 +1,206 @@ +// Copyright Splunk, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build discovery_integration_kafkametrics + +package tests + +import ( + "context" + "errors" + "fmt" + "os" + "path/filepath" + "runtime" + "syscall" + "testing" + "time" + + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/network" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/receiver/otlpreceiver" + "go.opentelemetry.io/collector/receiver/receivertest" +) + +type otelContainer struct { + testcontainers.Container +} + +const ( + ReceiverTypeAttr = "discovery.receiver.type" + MessageAttr = "discovery.message" + OtelEntityAttributesAttr = "otel.entity.attributes" +) + +func getDockerGID() (string, error) { + finfo, err := os.Stat("/var/run/docker.sock") + if err != nil { + return "", err + } + fsys := finfo.Sys() + stat, ok := fsys.(*syscall.Stat_t) + if !ok { + return "", fmt.Errorf("OS error occurred while trying to get GID ") + } + dockerGID := fmt.Sprintf("%d", stat.Gid) + return dockerGID, nil +} + +func kafkaMetricsAutoDiscoveryHelper(t *testing.T, ctx context.Context, configFile string, logMessageToAssert string) (*otelContainer, error) { + factory := otlpreceiver.NewFactory() + port := 16745 + c := 
factory.CreateDefaultConfig().(*otlpreceiver.Config) + c.GRPC.NetAddr.Endpoint = fmt.Sprintf("localhost:%d", port) + endpoint := c.GRPC.NetAddr.Endpoint + sink := &consumertest.LogsSink{} + receiver, err := factory.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), c, sink) + require.NoError(t, err) + require.NoError(t, receiver.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, receiver.Shutdown(context.Background())) + }) + + dockerGID, err := getDockerGID() + require.NoError(t, err) + + otelConfigPath, err := filepath.Abs(filepath.Join(".", "testdata", configFile)) + if err != nil { + return nil, err + } + r, err := os.Open(otelConfigPath) + if err != nil { + return nil, err + } + + currPath, err := filepath.Abs(filepath.Join(".", "testdata")) + if err != nil { + return nil, err + } + req := testcontainers.ContainerRequest{ + Image: "otelcol:latest", + HostConfigModifier: func(hc *container.HostConfig) { + hc.Binds = []string{"/var/run/docker.sock:/var/run/docker.sock"} + hc.NetworkMode = network.NetworkHost + hc.GroupAdd = []string{dockerGID} + }, + Env: map[string]string{ + "SPLUNK_REALM": "us2", + "SPLUNK_ACCESS_TOKEN": "12345", + "SPLUNK_DISCOVERY_LOG_LEVEL": "info", + "OTLP_ENDPOINT": endpoint, + "SPLUNK_OTEL_COLLECTOR_IMAGE": "otelcol:latest", + }, + Entrypoint: []string{"/otelcol", "--config", "/home/otel-local-config.yaml"}, + Files: []testcontainers.ContainerFile{ + { + Reader: r, + HostFilePath: otelConfigPath, + ContainerFilePath: "/home/otel-local-config.yaml", + FileMode: 0o777, + }, + { + HostFilePath: currPath, + ContainerFilePath: "/home/", + FileMode: 0o777, + }, + }, + } + + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + if err != nil { + return nil, err + } + + seenMessageAttr := 0 + seenReceiverTypeAttr := 0 + expectedReceiver := "kafkametrics" + assert.EventuallyWithT(t, 
func(tt *assert.CollectT) { + //require.Equal(t, sink.LogRecordCount(), len(sink.AllLogs()), "log record count and all logs count differ, could be race") + if len(sink.AllLogs()) == 0 { + assert.Fail(tt, "No logs collected") + return + } + for i := 0; i < len(sink.AllLogs()); i++ { + plogs := sink.AllLogs()[i] + lrs := plogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords() + for j := 0; j < lrs.Len(); j++ { + lr := lrs.At(j) + attrMap, ok := lr.Attributes().Get(OtelEntityAttributesAttr) + if ok { + m := attrMap.Map() + discoveryMsg, ok := m.Get(MessageAttr) + if ok { + seenMessageAttr++ + assert.Equal(t, logMessageToAssert, discoveryMsg.AsString()) + } + discoveryType, ok := m.Get(ReceiverTypeAttr) + if ok { + seenReceiverTypeAttr++ + assert.Equal(t, expectedReceiver, discoveryType.AsString()) + } + } + } + } + assert.Greater(t, seenMessageAttr, 0, "Did not see message '%s'", logMessageToAssert) + assert.Greater(t, seenReceiverTypeAttr, 0, "Did not see expected type '%s'", expectedReceiver) + }, 60*time.Second, 1*time.Second, "Did not get '%s' discovery in time", expectedReceiver) + + return &otelContainer{Container: container}, nil +} + +func TestIntegrationKafkaMetricsAutoDiscovery(t *testing.T) { + if runtime.GOOS == "darwin" || runtime.GOOS == "windows" { + t.Skip("Integration tests are only run on linux architecture: https://github.com/signalfx/splunk-otel-collector/blob/main/.github/workflows/integration-test.yml#L35") + } + + successfulDiscoveryMsg := `kafkametrics receiver is working!` + ctx := context.Background() + + tests := map[string]struct { + ctx context.Context + configFileName string + logMessageToAssert string + expected error + }{ + "Successful Discovery test": { + ctx: ctx, + configFileName: "docker_observer_without_ssl_kafkametrics_config.yaml", + logMessageToAssert: successfulDiscoveryMsg, + expected: nil, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + container, err := kafkaMetricsAutoDiscoveryHelper(t, 
test.ctx, test.configFileName, test.logMessageToAssert) + + if !errors.Is(err, test.expected) { + t.Fatalf(" Expected %v, got %v", test.expected, err) + } + t.Cleanup(func() { + if err := container.Terminate(ctx); err != nil { + t.Fatalf("failed to terminate container: %s", err) + } + }) + }) + } +} diff --git a/tests/receivers/kafkametrics/testdata/docker_observer_without_ssl_kafkametrics_config.yaml b/tests/receivers/kafkametrics/testdata/docker_observer_without_ssl_kafkametrics_config.yaml new file mode 100644 index 0000000000..6bd9e29857 --- /dev/null +++ b/tests/receivers/kafkametrics/testdata/docker_observer_without_ssl_kafkametrics_config.yaml @@ -0,0 +1,57 @@ +extensions: + docker_observer: + +receivers: + # discovery receiver under test; its log records are exported over OTLP to the test's receiver and used as assertion material + discovery: + embed_receiver_config: true + receivers: + kafkametrics: + config: + auth: + plain_text: + username: "kafka" + password: "insecure" + protocol_version: "2.0.0" + scrapers: + - brokers + - consumers + - topics + rule: type == "container" and any([name, image, command], {# matches "(?i)(.*)kafka(.*)"}) and not (command matches "splunk.discovery") + status: + metrics: + - status: successful + strict: kafka.brokers + message: kafkametrics receiver is working! + statements: + - status: failed + regexp: 'connect: network is unreachable' + message: The container cannot be reached by the Collector. Make sure they're in the same network. + - status: failed + regexp: 'connect: connection refused' + message: The container is refusing kafka connections. 
+ + watch_observers: + - docker_observer + +# drop scrape_timestamp attributes until we can accept arbitrary values +processors: + +exporters: + debug: + verbosity: detailed + otlp: + endpoint: "${OTLP_ENDPOINT}" + tls: + insecure: true + +service: + telemetry: + logs: + level: debug + extensions: + - docker_observer + pipelines: + logs: + receivers: [discovery] + exporters: [otlp, debug] From edf231efb7a3976b2178a3a4fcb1d6f062a6bf2f Mon Sep 17 00:00:00 2001 From: james hughes Date: Tue, 11 Jun 2024 14:53:31 -0700 Subject: [PATCH 2/3] bugfixes --- docker/docker-compose.yml | 13 ------------- .../kafkametrics/kafkametrics_discovery_test.go | 9 ++++----- ...er_observer_without_ssl_kafkametrics_config.yaml | 7 +------ 3 files changed, 5 insertions(+), 24 deletions(-) diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 9f39ad980b..b1cb354f64 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -62,15 +62,12 @@ services: depends_on: - zookeeper profiles: - - integration-test-kafkametrics-discovery - smartagent build: ./kafka environment: START_AS: broker KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 JMX_PORT: 7099 - USER: "kafka" - PASSWORD: "insecure" ports: - "7099:7099" - "9092:9092" @@ -90,7 +87,6 @@ services: depends_on: - zookeeper profiles: - - integration-test-kafkametrics-discovery - smartagent build: ./kafka environment: @@ -98,8 +94,6 @@ services: KAFKA_BROKER: kafka-broker:9092 JMX_PORT: 9099 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - USER: "kafka" - PASSWORD: "insecure" ports: - "9099:9099" healthcheck: @@ -118,7 +112,6 @@ services: depends_on: - zookeeper profiles: - - integration-test-kafkametrics-discovery - smartagent build: ./kafka environment: @@ -126,8 +119,6 @@ services: KAFKA_BROKER: kafka-broker:9092 JMX_PORT: 8099 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - USER: "kafka" - PASSWORD: "insecure" ports: - "8099:8099" healthcheck: @@ -146,15 +137,12 @@ services: depends_on: - zookeeper profiles: - - 
integration-test-kafkametrics-discovery - smartagent build: ./kafka environment: START_AS: create-topic KAFKA_BROKER: kafka-broker:9092 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - USER: "kafka" - PASSWORD: "insecure" kong: image: quay.io/splunko11ytest/kong:latest profiles: @@ -229,7 +217,6 @@ services: zookeeper: image: zookeeper:3.5 profiles: - - integration-test-kafkametrics-discovery - smartagent ports: - "2181:2181" diff --git a/tests/receivers/kafkametrics/kafkametrics_discovery_test.go b/tests/receivers/kafkametrics/kafkametrics_discovery_test.go index 7a27fb55d8..122266f8ed 100644 --- a/tests/receivers/kafkametrics/kafkametrics_discovery_test.go +++ b/tests/receivers/kafkametrics/kafkametrics_discovery_test.go @@ -134,7 +134,6 @@ func kafkaMetricsAutoDiscoveryHelper(t *testing.T, ctx context.Context, configFi seenReceiverTypeAttr := 0 expectedReceiver := "kafkametrics" assert.EventuallyWithT(t, func(tt *assert.CollectT) { - //require.Equal(t, sink.LogRecordCount(), len(sink.AllLogs()), "log record count and all logs count differ, could be race") if len(sink.AllLogs()) == 0 { assert.Fail(tt, "No logs collected") return @@ -150,18 +149,18 @@ func kafkaMetricsAutoDiscoveryHelper(t *testing.T, ctx context.Context, configFi discoveryMsg, ok := m.Get(MessageAttr) if ok { seenMessageAttr++ - assert.Equal(t, logMessageToAssert, discoveryMsg.AsString()) + assert.Equal(tt, logMessageToAssert, discoveryMsg.AsString()) } discoveryType, ok := m.Get(ReceiverTypeAttr) if ok { seenReceiverTypeAttr++ - assert.Equal(t, expectedReceiver, discoveryType.AsString()) + assert.Equal(tt, expectedReceiver, discoveryType.AsString()) } } } } - assert.Greater(t, seenMessageAttr, 0, "Did not see message '%s'", logMessageToAssert) - assert.Greater(t, seenReceiverTypeAttr, 0, "Did not see expected type '%s'", expectedReceiver) + assert.Greater(tt, seenMessageAttr, 0, "Did not see message '%s'", logMessageToAssert) + assert.Greater(tt, seenReceiverTypeAttr, 0, "Did not see expected type 
'%s'", expectedReceiver) }, 60*time.Second, 1*time.Second, "Did not get '%s' discovery in time", expectedReceiver) return &otelContainer{Container: container}, nil diff --git a/tests/receivers/kafkametrics/testdata/docker_observer_without_ssl_kafkametrics_config.yaml b/tests/receivers/kafkametrics/testdata/docker_observer_without_ssl_kafkametrics_config.yaml index 6bd9e29857..0a2878f119 100644 --- a/tests/receivers/kafkametrics/testdata/docker_observer_without_ssl_kafkametrics_config.yaml +++ b/tests/receivers/kafkametrics/testdata/docker_observer_without_ssl_kafkametrics_config.yaml @@ -8,15 +8,10 @@ receivers: receivers: kafkametrics: config: - auth: - plain_text: - username: "kafka" - password: "insecure" + client_id: "otel-integration-test" protocol_version: "2.0.0" scrapers: - brokers - - consumers - - topics rule: type == "container" and any([name, image, command], {# matches "(?i)(.*)kafka(.*)"}) and not (command matches "splunk.discovery") status: metrics: From 57351eae7091482f17ff12ebb1bf9caa16a5b244 Mon Sep 17 00:00:00 2001 From: james hughes Date: Tue, 11 Jun 2024 14:58:36 -0700 Subject: [PATCH 3/3] change to use net new kafka in docker composition --- .github/workflows/integration-test.yml | 5 ++--- docker/docker-compose.yml | 28 ++++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/.github/workflows/integration-test.yml b/.github/workflows/integration-test.yml index 1a9ef2a465..8c4fd8c854 100644 --- a/.github/workflows/integration-test.yml +++ b/.github/workflows/integration-test.yml @@ -284,8 +284,7 @@ jobs: SPLUNK_OTEL_COLLECTOR_IMAGE: 'otelcol:latest' integration-test-kafkametrics-discovery: name: integration-test-kafkametrics-discovery - # Use 20.04.5 until https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/16450 is resolved - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 needs: [docker-otelcol] strategy: matrix: @@ -295,7 +294,7 @@ jobs: - uses: actions/checkout@v4 with: fetch-depth: 0 - - 
run: docker compose -f docker/docker-compose.yml --profile integration-test-kafkametrics-discovery up -d --quiet-pull + - run: docker compose -f docker/docker-compose.yml --profile kafka-kraft-single up -d --quiet-pull - uses: actions/setup-go@v5 with: go-version: ${{ env.GO_VERSION }} diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index b1cb354f64..7ca53f1a8d 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -57,6 +57,34 @@ services: build: ./httpd_basicauth ports: - "8001:80" + # new-style kafka + kafka-kraft-single: + image: apache/kafka:3.7.0 + # following configuration is taken DIRECTLY from + # kafka examples in the upstream apache (licensed + hosted) repo + # Thanks all from the Kafka team! + # https://github.com/apache/kafka/blob/3.7.0/docker/examples/jvm/single-node/plaintext/docker-compose.yml + hostname: broker + container_name: broker + ports: + - '9092:9092' + environment: + KAFKA_NODE_ID: 1 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://localhost:9092,PLAINTEXT://broker:19092' + KAFKA_PROCESS_ROLES: 'broker,controller' + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093' + KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092' + KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' + KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' + CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' + profiles: + - kafka-kraft-single kafka-broker: image: quay.io/splunko11ytest/kafka:latest depends_on: