Skip to content

[chore] Move metrics initialization in service/telemetry #11185

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions service/internal/promtest/server_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package promtest // import "go.opentelemetry.io/collector/service/internal/promtest"

import (
"net"
"strconv"
"testing"

"go.opentelemetry.io/contrib/config"

"go.opentelemetry.io/collector/internal/testutil"
)

func GetAvailableLocalIPv6AddressPrometheus(t testing.TB) *config.Prometheus {
return addrToPrometheus(testutil.GetAvailableLocalIPv6Address(t))
}

func GetAvailableLocalAddressPrometheus(t testing.TB) *config.Prometheus {
return addrToPrometheus(testutil.GetAvailableLocalAddress(t))
}

func addrToPrometheus(address string) *config.Prometheus {
host, port, err := net.SplitHostPort(address)
if err != nil {
return nil

Check warning on line 27 in service/internal/promtest/server_util.go

View check run for this annotation

Codecov / codecov/patch

service/internal/promtest/server_util.go#L27

Added line #L27 was not covered by tests
}
portInt, err := strconv.Atoi(port)
if err != nil {
return nil

Check warning on line 31 in service/internal/promtest/server_util.go

View check run for this annotation

Codecov / codecov/patch

service/internal/promtest/server_util.go#L31

Added line #L31 was not covered by tests
}
return &config.Prometheus{
Host: &host,
Port: &portInt,
}
}
25 changes: 2 additions & 23 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,6 @@
featuregate.WithRegisterDescription("controls whether the collector supports extended OpenTelemetry"+
"configuration for internal telemetry"))

// disableHighCardinalityMetricsfeatureGate is the feature gate that controls whether the collector should enable
// potentially high cardinality metrics. The gate will be removed when the collector allows for view configuration.
var disableHighCardinalityMetricsfeatureGate = featuregate.GlobalRegistry().MustRegister(
"telemetry.disableHighCardinalityMetrics",
featuregate.StageAlpha,
featuregate.WithRegisterDescription("controls whether the collector should enable potentially high"+
"cardinality metrics. The gate will be removed when the collector allows for view configuration."))

// Settings holds configuration for building a new Service.
type Settings struct {
// BuildInfo provides collector start information.
Expand Down Expand Up @@ -104,8 +96,6 @@

// New creates a new Service, its telemetry, and Components.
func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
disableHighCard := disableHighCardinalityMetricsfeatureGate.IsEnabled()

srv := &Service{
buildInfo: set.BuildInfo,
host: &graph.Host{
Expand Down Expand Up @@ -144,14 +134,7 @@

logger.Info("Setting up own telemetry...")

mp, err := newMeterProvider(
meterProviderSettings{
res: res,
cfg: cfg.Telemetry.Metrics,
asyncErrorChannel: set.AsyncErrorChannel,
},
disableHighCard,
)
mp, err := telFactory.CreateMeterProvider(ctx, telset, &cfg.Telemetry)
if err != nil {
return nil, fmt.Errorf("failed to create metric provider: %w", err)
}
Expand Down Expand Up @@ -198,11 +181,7 @@

func logsAboutMeterProvider(logger *zap.Logger, cfg telemetry.MetricsConfig, mp metric.MeterProvider) {
if cfg.Level == configtelemetry.LevelNone || (cfg.Address == "" && len(cfg.Readers) == 0) {
logger.Info(
"Skipped telemetry setup.",
zap.String(zapKeyTelemetryAddress, cfg.Address),
zap.Stringer(zapKeyTelemetryLevel, cfg.Level),
)
logger.Info("Skipped telemetry setup.")

Check warning on line 184 in service/service.go

View check run for this annotation

Codecov / codecov/patch

service/service.go#L184

Added line #L184 was not covered by tests
return
}

Expand Down
30 changes: 3 additions & 27 deletions service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ import (
"context"
"errors"
"fmt"
"net"
"net/http"
"strconv"
"strings"
"sync"
"testing"
Expand All @@ -35,6 +33,7 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/service/extensions"
"go.opentelemetry.io/collector/service/internal/builders"
"go.opentelemetry.io/collector/service/internal/promtest"
"go.opentelemetry.io/collector/service/pipelines"
"go.opentelemetry.io/collector/service/telemetry"
)
Expand Down Expand Up @@ -355,10 +354,10 @@ func testCollectorStartHelperWithReaders(t *testing.T, tc ownMetricsTestCase, ne
)
switch network {
case "tcp", "tcp4":
metricsAddr = getAvailableLocalAddressPrometheus(t)
metricsAddr = promtest.GetAvailableLocalAddressPrometheus(t)
zpagesAddr = testutil.GetAvailableLocalAddress(t)
case "tcp6":
metricsAddr = getAvailableLocalIPv6AddressPrometheus(t)
metricsAddr = promtest.GetAvailableLocalIPv6AddressPrometheus(t)
zpagesAddr = testutil.GetAvailableLocalIPv6Address(t)
}
require.NotZero(t, metricsAddr, "network must be either of tcp, tcp4 or tcp6")
Expand Down Expand Up @@ -745,26 +744,3 @@ func newConfigWatcherExtensionFactory(name component.Type) extension.Factory {
component.StabilityLevelDevelopment,
)
}

func getAvailableLocalIPv6AddressPrometheus(t testing.TB) *config.Prometheus {
return addrToPrometheus(testutil.GetAvailableLocalIPv6Address(t))
}

func getAvailableLocalAddressPrometheus(t testing.TB) *config.Prometheus {
return addrToPrometheus(testutil.GetAvailableLocalAddress(t))
}

func addrToPrometheus(address string) *config.Prometheus {
host, port, err := net.SplitHostPort(address)
if err != nil {
return nil
}
portInt, err := strconv.Atoi(port)
if err != nil {
return nil
}
return &config.Prometheus{
Host: &host,
Port: &portInt,
}
}
23 changes: 23 additions & 0 deletions service/telemetry/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,26 @@ import (
"context"
"time"

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/service/internal/resource"
"go.opentelemetry.io/collector/service/telemetry/internal"
)

// disableHighCardinalityMetricsfeatureGate is the feature gate that controls whether the collector should enable
// potentially high cardinality metrics. The gate will be removed when the collector allows for view configuration.
var disableHighCardinalityMetricsfeatureGate = featuregate.GlobalRegistry().MustRegister(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be moved to globalgates instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? Where else is used? My understanding is that in globalgates we put things shared between components/modules.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, i just thought of it as a convenient location for all feature gates 🤣

"telemetry.disableHighCardinalityMetrics",
featuregate.StageAlpha,
featuregate.WithRegisterDescription("controls whether the collector should enable potentially high"+
"cardinality metrics. The gate will be removed when the collector allows for view configuration."))

func createDefaultConfig() component.Config {
return &Config{
Logs: LogsConfig{
Expand Down Expand Up @@ -55,5 +66,17 @@ func NewFactory() Factory {
c := *cfg.(*Config)
return newTracerProvider(ctx, set, c)
}),
internal.WithMeterProvider(func(_ context.Context, set Settings, cfg component.Config) (metric.MeterProvider, error) {
c := *cfg.(*Config)
disableHighCard := disableHighCardinalityMetricsfeatureGate.IsEnabled()
return newMeterProvider(
meterProviderSettings{
res: resource.New(set.BuildInfo, c.Resource),
cfg: c.Metrics,
asyncErrorChannel: set.AsyncErrorChannel,
},
disableHighCard,
)
}),
)
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package proctelemetry // import "go.opentelemetry.io/collector/service/internal/proctelemetry"
package otelinit // import "go.opentelemetry.io/collector/service/telemetry/internal/otelinit"

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package proctelemetry
package otelinit

import (
"context"
Expand Down
13 changes: 6 additions & 7 deletions service/telemetry.go → service/telemetry/metrics.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package service // import "go.opentelemetry.io/collector/service"
package telemetry // import "go.opentelemetry.io/collector/service/telemetry"

import (
"context"
Expand All @@ -19,8 +19,7 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/service/internal/proctelemetry"
"go.opentelemetry.io/collector/service/telemetry"
"go.opentelemetry.io/collector/service/telemetry/internal/otelinit"
)

const (
Expand All @@ -36,7 +35,7 @@ type meterProvider struct {

type meterProviderSettings struct {
res *resource.Resource
cfg telemetry.MetricsConfig
cfg MetricsConfig
asyncErrorChannel chan error
}

Expand Down Expand Up @@ -73,7 +72,7 @@ func newMeterProvider(set meterProviderSettings, disableHighCardinality bool) (m
var opts []sdkmetric.Option
for _, reader := range set.cfg.Readers {
// https://github.com/open-telemetry/opentelemetry-collector/issues/8045
r, server, err := proctelemetry.InitMetricReader(context.Background(), reader, set.asyncErrorChannel, &mp.serverWG)
r, server, err := otelinit.InitMetricReader(context.Background(), reader, set.asyncErrorChannel, &mp.serverWG)
if err != nil {
return nil, err
}
Expand All @@ -85,15 +84,15 @@ func newMeterProvider(set meterProviderSettings, disableHighCardinality bool) (m
}

var err error
mp.MeterProvider, err = proctelemetry.InitOpenTelemetry(set.res, opts, disableHighCardinality)
mp.MeterProvider, err = otelinit.InitOpenTelemetry(set.res, opts, disableHighCardinality)
if err != nil {
return nil, err
}
return mp, nil
}

// LogAboutServers logs about the servers that are serving metrics.
func (mp *meterProvider) LogAboutServers(logger *zap.Logger, cfg telemetry.MetricsConfig) {
func (mp *meterProvider) LogAboutServers(logger *zap.Logger, cfg MetricsConfig) {
for _, server := range mp.servers {
logger.Info(
"Serving metrics",
Expand Down
30 changes: 16 additions & 14 deletions service/telemetry_test.go → service/telemetry/metrics_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package service
package telemetry

import (
"context"
Expand All @@ -19,9 +19,9 @@ import (
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/internal/testutil"
semconv "go.opentelemetry.io/collector/semconv/v1.18.0"
"go.opentelemetry.io/collector/service/internal/proctelemetry"
"go.opentelemetry.io/collector/service/internal/promtest"
"go.opentelemetry.io/collector/service/internal/resource"
"go.opentelemetry.io/collector/service/telemetry"
"go.opentelemetry.io/collector/service/telemetry/internal/otelinit"
)

const (
Expand All @@ -32,6 +32,8 @@ const (
counterName = "test_counter"
)

var testInstanceID = "test_instance_id"

func TestTelemetryInit(t *testing.T) {
type metricValue struct {
value float64
Expand All @@ -43,7 +45,7 @@ func TestTelemetryInit(t *testing.T) {
disableHighCard bool
expectedMetrics map[string]metricValue
extendedConfig bool
cfg *telemetry.Config
cfg *Config
}{
{
name: "UseOpenTelemetryForInternalMetrics",
Expand Down Expand Up @@ -128,11 +130,11 @@ func TestTelemetryInit(t *testing.T) {
{
name: "UseOTelWithSDKConfiguration",
extendedConfig: true,
cfg: &telemetry.Config{
Metrics: telemetry.MetricsConfig{
cfg: &Config{
Metrics: MetricsConfig{
Level: configtelemetry.LevelDetailed,
},
Traces: telemetry.TracesConfig{
Traces: TracesConfig{
Processors: []config.SpanProcessor{
{
Batch: &config.BatchSpanProcessor{
Expand Down Expand Up @@ -194,18 +196,18 @@ func TestTelemetryInit(t *testing.T) {
{
Pull: &config.PullMetricReader{
Exporter: config.MetricExporter{
Prometheus: getAvailableLocalAddressPrometheus(t),
Prometheus: promtest.GetAvailableLocalAddressPrometheus(t),
},
},
},
}
}
if tc.cfg == nil {
tc.cfg = &telemetry.Config{
tc.cfg = &Config{
Resource: map[string]*string{
semconv.AttributeServiceInstanceID: &testInstanceID,
},
Metrics: telemetry.MetricsConfig{
Metrics: MetricsConfig{
Level: configtelemetry.LevelDetailed,
Address: testutil.GetAvailableLocalAddress(t),
},
Expand Down Expand Up @@ -253,13 +255,13 @@ func createTestMetrics(t *testing.T, mp metric.MeterProvider) {
require.NoError(t, err)
counter.Add(context.Background(), 13)

grpcExampleCounter, err := mp.Meter(proctelemetry.GRPCInstrumentation).Int64Counter(metricPrefix + grpcPrefix + counterName)
grpcExampleCounter, err := mp.Meter(otelinit.GRPCInstrumentation).Int64Counter(metricPrefix + grpcPrefix + counterName)
require.NoError(t, err)
grpcExampleCounter.Add(context.Background(), 11, metric.WithAttributes(proctelemetry.GRPCUnacceptableKeyValues...))
grpcExampleCounter.Add(context.Background(), 11, metric.WithAttributes(otelinit.GRPCUnacceptableKeyValues...))

httpExampleCounter, err := mp.Meter(proctelemetry.HTTPInstrumentation).Int64Counter(metricPrefix + httpPrefix + counterName)
httpExampleCounter, err := mp.Meter(otelinit.HTTPInstrumentation).Int64Counter(metricPrefix + httpPrefix + counterName)
require.NoError(t, err)
httpExampleCounter.Add(context.Background(), 10, metric.WithAttributes(proctelemetry.HTTPUnacceptableKeyValues...))
httpExampleCounter.Add(context.Background(), 10, metric.WithAttributes(otelinit.HTTPUnacceptableKeyValues...))
}

func getMetricsFromPrometheus(t *testing.T, handler http.Handler) map[string]*io_prometheus_client.MetricFamily {
Expand Down
Loading