Skip to content

Permanently enable 'telemetry.newPipelineTelemetry' feature gate #12856

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions .chloggen/lock-attributes-gate.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Set the `telemetry.newPipelineTelemetry` feature gate to stable.

# One or more tracking issues or pull requests related to the change
issues: [12856]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The off state of this feature gate introduced a regression, where the Collector's internal logs were missing component attributes. See issue #12870 for more details on this bug. Since there does not appear to be much benefit to disabling the gate, we are immediately moving it to stable in order to lock in the correct behavior.

This comes with a breaking change, where internal logs exported through OTLP will now use instrumentation scope attributes to identify the source component instead of log attributes. This does not affect the Collector's stderr output. See the changelog for v0.123.0 for a more detailed description of the gate's effects.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
10 changes: 2 additions & 8 deletions internal/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (

var NewPipelineTelemetryGate = featuregate.GlobalRegistry().MustRegister(
"telemetry.newPipelineTelemetry",
featuregate.StageAlpha,
featuregate.StageStable,
featuregate.WithRegisterFromVersion("v0.123.0"),
featuregate.WithRegisterDescription("Instruments Collector pipelines and injects component-identifying attributes"),
featuregate.WithRegisterToVersion("v0.127.0"),
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/rfcs/component-universal-telemetry.md"),
)

Expand Down Expand Up @@ -46,16 +46,10 @@ type TelemetrySettings struct {
// The publicization of this API is tracked in https://github.com/open-telemetry/opentelemetry-collector/issues/12405

func WithoutAttributes(ts TelemetrySettings, fields ...string) TelemetrySettings {
if !NewPipelineTelemetryGate.IsEnabled() {
return ts
}
return WithAttributeSet(ts, componentattribute.RemoveAttributes(ts.extraAttributes, fields...))
}

func WithAttributeSet(ts TelemetrySettings, attrs attribute.Set) TelemetrySettings {
if !NewPipelineTelemetryGate.IsEnabled() {
return ts
}
ts.extraAttributes = attrs
ts.Logger = componentattribute.ZapLoggerWithAttributes(ts.Logger, ts.extraAttributes)
ts.TracerProvider = componentattribute.TracerProviderWithAttributes(ts.TracerProvider, ts.extraAttributes)
Expand Down
11 changes: 0 additions & 11 deletions processor/memorylimiterprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (

"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/memorylimiter"
"go.opentelemetry.io/collector/internal/telemetry"
"go.opentelemetry.io/collector/internal/telemetry/componentattribute"
Expand All @@ -34,17 +33,7 @@ func TestCreateDefaultConfig(t *testing.T) {
assert.NoError(t, componenttest.CheckConfigStruct(cfg))
}

func setGate(t *testing.T, gate *featuregate.Gate, value bool) {
initialValue := gate.IsEnabled()
require.NoError(t, featuregate.GlobalRegistry().Set(gate.ID(), value))
t.Cleanup(func() {
_ = featuregate.GlobalRegistry().Set(gate.ID(), initialValue)
})
}

func TestCreateProcessor(t *testing.T) {
setGate(t, telemetry.NewPipelineTelemetryGate, true)

factory := NewFactory()
require.NotNil(t, factory)

Expand Down
2 changes: 1 addition & 1 deletion processor/memorylimiterprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ require (
go.opentelemetry.io/collector/consumer/consumererror v0.124.0
go.opentelemetry.io/collector/consumer/consumertest v0.124.0
go.opentelemetry.io/collector/consumer/xconsumer v0.124.0
go.opentelemetry.io/collector/featuregate v1.30.0
go.opentelemetry.io/collector/internal/memorylimiter v0.124.0
go.opentelemetry.io/collector/internal/telemetry v0.124.0
go.opentelemetry.io/collector/pdata v1.30.0
Expand Down Expand Up @@ -58,6 +57,7 @@ require (
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/collector/component/componentstatus v0.124.0 // indirect
go.opentelemetry.io/collector/featuregate v1.30.0 // indirect
go.opentelemetry.io/collector/pdata/testdata v0.124.0 // indirect
go.opentelemetry.io/contrib/bridges/otelzap v0.10.0 // indirect
go.opentelemetry.io/otel/log v0.11.0 // indirect
Expand Down
11 changes: 0 additions & 11 deletions receiver/otlpreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/xconsumer"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/telemetry"
"go.opentelemetry.io/collector/internal/telemetry/componentattribute"
"go.opentelemetry.io/collector/internal/testutil"
Expand All @@ -37,17 +36,7 @@ func TestCreateDefaultConfig(t *testing.T) {
assert.NoError(t, componenttest.CheckConfigStruct(cfg))
}

func setGate(t *testing.T, gate *featuregate.Gate, value bool) {
initialValue := gate.IsEnabled()
require.NoError(t, featuregate.GlobalRegistry().Set(gate.ID(), value))
t.Cleanup(func() {
_ = featuregate.GlobalRegistry().Set(gate.ID(), initialValue)
})
}

func TestCreateSameReceiver(t *testing.T) {
setGate(t, telemetry.NewPipelineTelemetryGate, true)

factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.GRPC.NetAddr.Endpoint = testutil.GetAvailableLocalAddress(t)
Expand Down
2 changes: 1 addition & 1 deletion receiver/otlpreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ require (
go.opentelemetry.io/collector/consumer/consumererror v0.124.0
go.opentelemetry.io/collector/consumer/consumertest v0.124.0
go.opentelemetry.io/collector/consumer/xconsumer v0.124.0
go.opentelemetry.io/collector/featuregate v1.30.0
go.opentelemetry.io/collector/internal/sharedcomponent v0.124.0
go.opentelemetry.io/collector/internal/telemetry v0.124.0
go.opentelemetry.io/collector/pdata v1.30.0
Expand Down Expand Up @@ -67,6 +66,7 @@ require (
go.opentelemetry.io/collector/client v1.30.0 // indirect
go.opentelemetry.io/collector/config/configcompression v1.30.0 // indirect
go.opentelemetry.io/collector/extension/extensionauth v1.30.0 // indirect
go.opentelemetry.io/collector/featuregate v1.30.0 // indirect
go.opentelemetry.io/collector/pipeline v0.124.0 // indirect
go.opentelemetry.io/contrib/bridges/otelzap v0.10.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion service/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ require (
go.opentelemetry.io/collector/receiver/xreceiver v0.124.0
go.opentelemetry.io/collector/semconv v0.124.0
go.opentelemetry.io/collector/service/hostcapabilities v0.124.0
go.opentelemetry.io/contrib/bridges/otelzap v0.10.0
go.opentelemetry.io/contrib/otelconf v0.15.0
go.opentelemetry.io/contrib/propagators/b3 v1.35.0
go.opentelemetry.io/otel v1.35.0
Expand Down Expand Up @@ -108,6 +107,7 @@ require (
go.opentelemetry.io/collector/config/configtls v1.30.0 // indirect
go.opentelemetry.io/collector/consumer/consumererror v0.124.0 // indirect
go.opentelemetry.io/collector/extension/extensionauth v1.30.0 // indirect
go.opentelemetry.io/contrib/bridges/otelzap v0.10.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect
go.opentelemetry.io/contrib/zpages v0.60.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.11.0 // indirect
Expand Down
8 changes: 5 additions & 3 deletions service/internal/graph/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,12 @@ func (n *connectorNode) buildComponent(
builder *builders.ConnectorBuilder,
nexts []baseConsumer,
) error {
set := connector.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
if telemetry.NewPipelineTelemetryGate.IsEnabled() {
set.TelemetrySettings = telemetry.WithAttributeSet(set.TelemetrySettings, *n.Set())
set := connector.Settings{
ID: n.componentID,
TelemetrySettings: telemetry.WithAttributeSet(tel, *n.Set()),
BuildInfo: info,
}

switch n.rcvrPipelineType {
case pipeline.SignalTraces:
return n.buildTraces(ctx, set, builder, nexts)
Expand Down
8 changes: 5 additions & 3 deletions service/internal/graph/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@ func (n *exporterNode) buildComponent(
info component.BuildInfo,
builder *builders.ExporterBuilder,
) error {
set := exporter.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
if telemetry.NewPipelineTelemetryGate.IsEnabled() {
set.TelemetrySettings = telemetry.WithAttributeSet(set.TelemetrySettings, *n.Set())
set := exporter.Settings{
ID: n.componentID,
TelemetrySettings: telemetry.WithAttributeSet(tel, *n.Set()),
BuildInfo: info,
}

var err error
switch n.pipelineType {
case pipeline.SignalTraces:
Expand Down
8 changes: 5 additions & 3 deletions service/internal/graph/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,12 @@ func (n *processorNode) buildComponent(ctx context.Context,
builder *builders.ProcessorBuilder,
next baseConsumer,
) error {
set := processor.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
if telemetry.NewPipelineTelemetryGate.IsEnabled() {
set.TelemetrySettings = telemetry.WithAttributeSet(set.TelemetrySettings, *n.Set())
set := processor.Settings{
ID: n.componentID,
TelemetrySettings: telemetry.WithAttributeSet(tel, *n.Set()),
BuildInfo: info,
}

var err error
switch n.pipelineID.Signal() {
case pipeline.SignalTraces:
Expand Down
8 changes: 5 additions & 3 deletions service/internal/graph/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ func (n *receiverNode) buildComponent(ctx context.Context,
builder *builders.ReceiverBuilder,
nexts []baseConsumer,
) error {
set := receiver.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
if telemetry.NewPipelineTelemetryGate.IsEnabled() {
set.TelemetrySettings = telemetry.WithAttributeSet(set.TelemetrySettings, *n.Set())
set := receiver.Settings{
ID: n.componentID,
TelemetrySettings: telemetry.WithAttributeSet(tel, *n.Set()),
BuildInfo: info,
}

var err error
switch n.pipelineType {
case pipeline.SignalTraces:
Expand Down
55 changes: 14 additions & 41 deletions service/telemetry/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@
package telemetry // import "go.opentelemetry.io/collector/service/telemetry"

import (
"go.opentelemetry.io/contrib/bridges/otelzap"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/log"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/internal/telemetry"
"go.opentelemetry.io/collector/internal/telemetry/componentattribute"
)

Expand Down Expand Up @@ -43,54 +41,29 @@ func newLogger(set Settings, cfg Config) (*zap.Logger, log.LoggerProvider, error

var lp log.LoggerProvider

if telemetry.NewPipelineTelemetryGate.IsEnabled() {
logger = logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
core = componentattribute.NewConsoleCoreWithAttributes(core, attribute.NewSet())
logger = logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
core = componentattribute.NewConsoleCoreWithAttributes(core, attribute.NewSet())

if len(cfg.Logs.Processors) > 0 && set.SDK != nil {
lp = set.SDK.LoggerProvider()

core = componentattribute.NewOTelTeeCoreWithAttributes(
core,
lp,
"go.opentelemetry.io/collector/service/telemetry",
cfg.Logs.Level,
attribute.NewSet(),
)
}

if cfg.Logs.Sampling != nil && cfg.Logs.Sampling.Enabled {
core = componentattribute.NewWrapperCoreWithAttributes(core, func(c zapcore.Core) zapcore.Core {
return newSampledCore(c, cfg.Logs.Sampling)
})
}

return core
}))
} else {
if len(cfg.Logs.Processors) > 0 && set.SDK != nil {
lp = set.SDK.LoggerProvider()

logger = logger.WithOptions(zap.WrapCore(func(c zapcore.Core) zapcore.Core {
core, err := zapcore.NewIncreaseLevelCore(zapcore.NewTee(
c,
otelzap.NewCore("go.opentelemetry.io/collector/service/telemetry",
otelzap.WithLoggerProvider(lp),
),
), zap.NewAtomicLevelAt(cfg.Logs.Level))
if err != nil {
panic(err)
}
return core
}))
core = componentattribute.NewOTelTeeCoreWithAttributes(
core,
lp,
"go.opentelemetry.io/collector/service/telemetry",
cfg.Logs.Level,
attribute.NewSet(),
)
}

if cfg.Logs.Sampling != nil && cfg.Logs.Sampling.Enabled {
logger = logger.WithOptions(zap.WrapCore(func(c zapcore.Core) zapcore.Core {
core = componentattribute.NewWrapperCoreWithAttributes(core, func(c zapcore.Core) zapcore.Core {
return newSampledCore(c, cfg.Logs.Sampling)
}))
})
}
}

return core
}))

return logger, lp, nil
}
Expand Down
39 changes: 9 additions & 30 deletions service/telemetry/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,14 @@ import (
"github.com/stretchr/testify/require"
config "go.opentelemetry.io/contrib/otelconf/v0.3.0"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/telemetry"
)

func setGate(t *testing.T, gate *featuregate.Gate, value bool) {
initialValue := gate.IsEnabled()
require.NoError(t, featuregate.GlobalRegistry().Set(gate.ID(), value))
t.Cleanup(func() {
_ = featuregate.GlobalRegistry().Set(gate.ID(), initialValue)
})
}

func TestNewLogger(t *testing.T) {
tests := []struct {
name string
wantCoreType any
wantCoreTypeRfc any
wantErr error
cfg Config
name string
wantCoreType any
wantErr error
cfg Config
}{
{
name: "no log config",
Expand All @@ -52,8 +40,7 @@ func TestNewLogger(t *testing.T) {
InitialFields: map[string]any{"fieldKey": "filed-value"},
},
},
wantCoreType: "*zapcore.ioCore",
wantCoreTypeRfc: "*componentattribute.consoleCoreWithAttributes",
wantCoreType: "*componentattribute.consoleCoreWithAttributes",
},
{
name: "log config with processors",
Expand All @@ -76,8 +63,7 @@ func TestNewLogger(t *testing.T) {
},
},
},
wantCoreType: "*zapcore.levelFilterCore",
wantCoreTypeRfc: "*componentattribute.otelTeeCoreWithAttributes",
wantCoreType: "*componentattribute.otelTeeCoreWithAttributes",
},
{
name: "log config with sampling",
Expand All @@ -99,8 +85,7 @@ func TestNewLogger(t *testing.T) {
InitialFields: map[string]any(nil),
},
},
wantCoreType: "*zapcore.sampler",
wantCoreTypeRfc: "*componentattribute.wrapperCoreWithAttributes",
wantCoreType: "*componentattribute.wrapperCoreWithAttributes",
},
}
for _, tt := range tests {
Expand All @@ -125,13 +110,7 @@ func TestNewLogger(t *testing.T) {
}
}
}
t.Run(tt.name, func(t *testing.T) {
setGate(t, telemetry.NewPipelineTelemetryGate, false)
testCoreType(t, tt.wantCoreType)
})
t.Run(tt.name+" (pipeline telemetry on)", func(t *testing.T) {
setGate(t, telemetry.NewPipelineTelemetryGate, true)
testCoreType(t, tt.wantCoreTypeRfc)
})

testCoreType(t, tt.wantCoreType)
}
}
Loading