Skip to content

Commit bfb2d24

Browse files
committed
PR review improvements
1 parent 53a58b0 commit bfb2d24

File tree

10 files changed

+69
-69
lines changed

10 files changed

+69
-69
lines changed

component/otelcol/config_grpc.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -137,11 +137,11 @@ type GRPCClientArguments struct {
137137
TLS TLSClientArguments `river:"tls,block,optional"`
138138
Keepalive *KeepaliveClientArguments `river:"keepalive,block,optional"`
139139

140-
ReadBufferSize units.Base2Bytes `river:"read_buffer_size,attr,optional"`
141-
WriteBufferSize units.Base2Bytes `river:"write_buffer_size,attr,optional"`
142-
WaitForReady bool `river:"wait_for_ready,attr,optional"`
143-
Headers map[string]configopaque.String `river:"headers,attr,optional"`
144-
BalancerName string `river:"balancer_name,attr,optional"`
140+
ReadBufferSize units.Base2Bytes `river:"read_buffer_size,attr,optional"`
141+
WriteBufferSize units.Base2Bytes `river:"write_buffer_size,attr,optional"`
142+
WaitForReady bool `river:"wait_for_ready,attr,optional"`
143+
Headers map[string]string `river:"headers,attr,optional"`
144+
BalancerName string `river:"balancer_name,attr,optional"`
145145

146146
// Auth is a binding to an otelcol.auth.* component extension which handles
147147
// authentication.
@@ -154,6 +154,11 @@ func (args *GRPCClientArguments) Convert() *otelconfiggrpc.GRPCClientSettings {
154154
return nil
155155
}
156156

157+
opaqueHeaders := make(map[string]configopaque.String)
158+
for headerName, headerVal := range args.Headers {
159+
opaqueHeaders[headerName] = configopaque.String(headerVal)
160+
}
161+
157162
// Configure the authentication if args.Auth is set.
158163
var auth *otelconfigauth.Authentication
159164
if args.Auth != nil {
@@ -171,7 +176,7 @@ func (args *GRPCClientArguments) Convert() *otelconfiggrpc.GRPCClientSettings {
171176
ReadBufferSize: int(args.ReadBufferSize),
172177
WriteBufferSize: int(args.WriteBufferSize),
173178
WaitForReady: args.WaitForReady,
174-
Headers: args.Headers,
179+
Headers: opaqueHeaders,
175180
BalancerName: args.BalancerName,
176181

177182
Auth: auth,

component/otelcol/config_http.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
otelcomponent "go.opentelemetry.io/collector/component"
99
otelconfigauth "go.opentelemetry.io/collector/config/configauth"
1010
otelconfighttp "go.opentelemetry.io/collector/config/confighttp"
11-
otelconfigopaque "go.opentelemetry.io/collector/config/configopaque"
11+
"go.opentelemetry.io/collector/config/configopaque"
1212
otelextension "go.opentelemetry.io/collector/extension"
1313
)
1414

@@ -80,10 +80,10 @@ type HTTPClientArguments struct {
8080

8181
TLS TLSClientArguments `river:"tls,block,optional"`
8282

83-
ReadBufferSize units.Base2Bytes `river:"read_buffer_size,attr,optional"`
84-
WriteBufferSize units.Base2Bytes `river:"write_buffer_size,attr,optional"`
85-
Timeout time.Duration `river:"timeout,attr,optional"`
86-
Headers map[string]otelconfigopaque.String `river:"headers,attr,optional"`
83+
ReadBufferSize units.Base2Bytes `river:"read_buffer_size,attr,optional"`
84+
WriteBufferSize units.Base2Bytes `river:"write_buffer_size,attr,optional"`
85+
Timeout time.Duration `river:"timeout,attr,optional"`
86+
Headers map[string]string `river:"headers,attr,optional"`
8787
// CustomRoundTripper func(next http.RoundTripper) (http.RoundTripper, error) TODO (@tpaschalis)
8888
MaxIdleConns *int `river:"max_idle_conns,attr,optional"`
8989
MaxIdleConnsPerHost *int `river:"max_idle_conns_per_host,attr,optional"`
@@ -107,6 +107,11 @@ func (args *HTTPClientArguments) Convert() *otelconfighttp.HTTPClientSettings {
107107
auth = &otelconfigauth.Authentication{AuthenticatorID: args.Auth.ID}
108108
}
109109

110+
opaqueHeaders := make(map[string]configopaque.String)
111+
for headerName, headerVal := range args.Headers {
112+
opaqueHeaders[headerName] = configopaque.String(headerVal)
113+
}
114+
110115
return &otelconfighttp.HTTPClientSettings{
111116
Endpoint: args.Endpoint,
112117

@@ -117,7 +122,7 @@ func (args *HTTPClientArguments) Convert() *otelconfighttp.HTTPClientSettings {
117122
ReadBufferSize: int(args.ReadBufferSize),
118123
WriteBufferSize: int(args.WriteBufferSize),
119124
Timeout: args.Timeout,
120-
Headers: args.Headers,
125+
Headers: opaqueHeaders,
121126
// CustomRoundTripper: func(http.RoundTripper) (http.RoundTripper, error) { panic("not implemented") }, TODO (@tpaschalis)
122127
MaxIdleConns: args.MaxIdleConns,
123128
MaxIdleConnsPerHost: args.MaxIdleConnsPerHost,

component/otelcol/exporter/jaeger/jaeger.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"github.com/grafana/agent/component/otelcol/exporter"
1010
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/jaegerexporter"
1111
otelcomponent "go.opentelemetry.io/collector/component"
12-
"go.opentelemetry.io/collector/config/configopaque"
1312
otelpexporterhelper "go.opentelemetry.io/collector/exporter/exporterhelper"
1413
otelextension "go.opentelemetry.io/collector/extension"
1514
)
@@ -83,7 +82,7 @@ type GRPCClientArguments otelcol.GRPCClientArguments
8382
// DefaultGRPCClientArguments holds component-specific default settings for
8483
// GRPCClientArguments.
8584
var DefaultGRPCClientArguments = GRPCClientArguments{
86-
Headers: map[string]configopaque.String{},
85+
Headers: map[string]string{},
8786
Compression: otelcol.CompressionTypeGzip,
8887
WriteBufferSize: 512 * 1024,
8988
}

component/otelcol/exporter/otlp/otlp.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"github.com/grafana/agent/component/otelcol"
99
"github.com/grafana/agent/component/otelcol/exporter"
1010
otelcomponent "go.opentelemetry.io/collector/component"
11-
"go.opentelemetry.io/collector/config/configopaque"
1211
otelpexporterhelper "go.opentelemetry.io/collector/exporter/exporterhelper"
1312
"go.opentelemetry.io/collector/exporter/otlpexporter"
1413
otelextension "go.opentelemetry.io/collector/extension"
@@ -83,7 +82,7 @@ type GRPCClientArguments otelcol.GRPCClientArguments
8382
// DefaultGRPCClientArguments holds component-specific default settings for
8483
// GRPCClientArguments.
8584
var DefaultGRPCClientArguments = GRPCClientArguments{
86-
Headers: map[string]configopaque.String{},
85+
Headers: map[string]string{},
8786
Compression: otelcol.CompressionTypeGzip,
8887
WriteBufferSize: 512 * 1024,
8988
}

component/otelcol/exporter/otlphttp/otlphttp.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"github.com/grafana/agent/component/otelcol"
1010
"github.com/grafana/agent/component/otelcol/exporter"
1111
otelcomponent "go.opentelemetry.io/collector/component"
12-
"go.opentelemetry.io/collector/config/configopaque"
1312
"go.opentelemetry.io/collector/exporter/otlphttpexporter"
1413
otelextension "go.opentelemetry.io/collector/extension"
1514
)
@@ -98,7 +97,7 @@ var (
9897
IdleConnTimeout: &DefaultIdleConnTimeout,
9998

10099
Timeout: 30 * time.Second,
101-
Headers: map[string]configopaque.String{},
100+
Headers: map[string]string{},
102101
Compression: otelcol.CompressionTypeGzip,
103102
ReadBufferSize: 0,
104103
WriteBufferSize: 512 * 1024,

component/otelcol/receiver/prometheus/internal/transaction_test.go

Lines changed: 26 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -61,33 +61,25 @@ var (
6161
)
6262

6363
func TestTransactionCommitWithoutAdding(t *testing.T) {
64-
recv, err := nopObsRecv()
65-
assert.NoError(t, err)
66-
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), recv)
64+
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), nopObsRecv(t))
6765
assert.NoError(t, tr.Commit())
6866
}
6967

7068
func TestTransactionRollbackDoesNothing(t *testing.T) {
71-
recv, err := nopObsRecv()
72-
assert.NoError(t, err)
73-
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), recv)
69+
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), nopObsRecv(t))
7470
assert.NoError(t, tr.Rollback())
7571
}
7672

7773
func TestTransactionUpdateMetadataDoesNothing(t *testing.T) {
78-
recv, err := nopObsRecv()
79-
assert.NoError(t, err)
80-
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), recv)
81-
_, err = tr.UpdateMetadata(0, labels.New(), metadata.Metadata{})
74+
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), nopObsRecv(t))
75+
_, err := tr.UpdateMetadata(0, labels.New(), metadata.Metadata{})
8276
assert.NoError(t, err)
8377
}
8478

8579
func TestTransactionAppendNoTarget(t *testing.T) {
86-
recv, err := nopObsRecv()
87-
assert.NoError(t, err)
8880
badLabels := labels.FromStrings(model.MetricNameLabel, "counter_test")
89-
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), recv)
90-
_, err = tr.Append(0, badLabels, time.Now().Unix()*1000, 1.0)
81+
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), nopObsRecv(t))
82+
_, err := tr.Append(0, badLabels, time.Now().Unix()*1000, 1.0)
9183
assert.Error(t, err)
9284
}
9385

@@ -96,20 +88,16 @@ func TestTransactionAppendNoMetricName(t *testing.T) {
9688
model.InstanceLabel: "localhost:8080",
9789
model.JobLabel: "test2",
9890
})
99-
recv, err := nopObsRecv()
100-
assert.NoError(t, err)
101-
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), recv)
102-
_, err = tr.Append(0, jobNotFoundLb, time.Now().Unix()*1000, 1.0)
91+
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), nopObsRecv(t))
92+
_, err := tr.Append(0, jobNotFoundLb, time.Now().Unix()*1000, 1.0)
10393
assert.ErrorIs(t, err, errMetricNameNotFound)
10494

10595
assert.ErrorIs(t, tr.Commit(), errNoDataToBuild)
10696
}
10797

10898
func TestTransactionAppendEmptyMetricName(t *testing.T) {
109-
recv, err := nopObsRecv()
110-
assert.NoError(t, err)
111-
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), recv)
112-
_, err = tr.Append(0, labels.FromMap(map[string]string{
99+
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), nopObsRecv(t))
100+
_, err := tr.Append(0, labels.FromMap(map[string]string{
113101
model.InstanceLabel: "localhost:8080",
114102
model.JobLabel: "test2",
115103
model.MetricNameLabel: "",
@@ -119,10 +107,8 @@ func TestTransactionAppendEmptyMetricName(t *testing.T) {
119107

120108
func TestTransactionAppendResource(t *testing.T) {
121109
sink := new(consumertest.MetricsSink)
122-
recv, err := nopObsRecv()
123-
assert.NoError(t, err)
124-
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), recv)
125-
_, err = tr.Append(0, labels.FromMap(map[string]string{
110+
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t))
111+
_, err := tr.Append(0, labels.FromMap(map[string]string{
126112
model.InstanceLabel: "localhost:8080",
127113
model.JobLabel: "test",
128114
model.MetricNameLabel: "counter_test",
@@ -150,20 +136,16 @@ func TestTransactionCommitErrorWhenAdjusterError(t *testing.T) {
150136
})
151137
sink := new(consumertest.MetricsSink)
152138
adjusterErr := errors.New("adjuster error")
153-
recv, err := nopObsRecv()
154-
assert.NoError(t, err)
155-
tr := newTransaction(scrapeCtx, &errorAdjuster{err: adjusterErr}, sink, nil, receivertest.NewNopCreateSettings(), recv)
156-
_, err = tr.Append(0, goodLabels, time.Now().Unix()*1000, 1.0)
139+
tr := newTransaction(scrapeCtx, &errorAdjuster{err: adjusterErr}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t))
140+
_, err := tr.Append(0, goodLabels, time.Now().Unix()*1000, 1.0)
157141
assert.NoError(t, err)
158142
assert.ErrorIs(t, tr.Commit(), adjusterErr)
159143
}
160144

161145
// Ensure that we reject duplicate label keys. See https://github.com/open-telemetry/wg-prometheus/issues/44.
162146
func TestTransactionAppendDuplicateLabels(t *testing.T) {
163147
sink := new(consumertest.MetricsSink)
164-
recv, err := nopObsRecv()
165-
assert.NoError(t, err)
166-
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), recv)
148+
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t))
167149

168150
dupLabels := labels.FromStrings(
169151
model.InstanceLabel, "0.0.0.0:8855",
@@ -174,49 +156,48 @@ func TestTransactionAppendDuplicateLabels(t *testing.T) {
174156
"z", "9",
175157
)
176158

177-
_, err = tr.Append(0, dupLabels, 1917, 1.0)
159+
_, err := tr.Append(0, dupLabels, 1917, 1.0)
178160
require.Error(t, err)
179161
assert.Contains(t, err.Error(), `invalid sample: non-unique label names: "a"`)
180162
}
181163

182164
func TestTransactionAppendHistogramNoLe(t *testing.T) {
183165
sink := new(consumertest.MetricsSink)
184-
recv, err := nopObsRecv()
185-
assert.NoError(t, err)
186-
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), recv)
166+
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t))
187167

188168
goodLabels := labels.FromStrings(
189169
model.InstanceLabel, "0.0.0.0:8855",
190170
model.JobLabel, "test",
191171
model.MetricNameLabel, "hist_test_bucket",
192172
)
193173

194-
_, err = tr.Append(0, goodLabels, 1917, 1.0)
174+
_, err := tr.Append(0, goodLabels, 1917, 1.0)
195175
require.ErrorIs(t, err, errEmptyLeLabel)
196176
}
197177

198178
func TestTransactionAppendSummaryNoQuantile(t *testing.T) {
199179
sink := new(consumertest.MetricsSink)
200-
recv, err := nopObsRecv()
201-
assert.NoError(t, err)
202-
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), recv)
180+
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t))
203181

204182
goodLabels := labels.FromStrings(
205183
model.InstanceLabel, "0.0.0.0:8855",
206184
model.JobLabel, "test",
207185
model.MetricNameLabel, "summary_test",
208186
)
209187

210-
_, err = tr.Append(0, goodLabels, 1917, 1.0)
188+
_, err := tr.Append(0, goodLabels, 1917, 1.0)
211189
require.ErrorIs(t, err, errEmptyQuantileLabel)
212190
}
213191

214-
func nopObsRecv() (*obsreport.Receiver, error) {
215-
return obsreport.NewReceiver(obsreport.ReceiverSettings{
192+
func nopObsRecv(t *testing.T) *obsreport.Receiver {
193+
res, err := obsreport.NewReceiver(obsreport.ReceiverSettings{
216194
ReceiverID: component.NewID("prometheus"),
217195
Transport: transport,
218196
ReceiverCreateSettings: receivertest.NewNopCreateSettings(),
219197
})
198+
199+
assert.NoError(t, err)
200+
return res
220201
}
221202

222203
func TestMetricBuilderCounters(t *testing.T) {
@@ -1047,9 +1028,7 @@ func (tt buildTestData) run(t *testing.T) {
10471028
st := ts
10481029
for i, page := range tt.inputs {
10491030
sink := new(consumertest.MetricsSink)
1050-
recv, err := nopObsRecv()
1051-
assert.NoError(t, err)
1052-
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), recv)
1031+
tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t))
10531032
for _, pt := range page.pts {
10541033
// set ts for testing
10551034
pt.t = st

docs/sources/flow/reference/components/otelcol.auth.bearer.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ configuration.
5151

5252
`otelcol.auth.bearer` does not expose any component-specific debug information.
5353

54-
## Example
54+
## Examples
5555

5656
### Default scheme via gRPC transport
5757

pkg/traces/instance.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,14 +132,28 @@ func (i *Instance) buildAndStartPipeline(ctx context.Context, cfg InstanceConfig
132132
Version: build.Version,
133133
}
134134

135+
// useOtelForInternalMetrics is required so that the Collector service configures Collector components using the Otel SDK
136+
// instead of OpenCensus. If this is not specified, then the OtelMetricViews and OtelMetricReader parameters which we
137+
// pass to service.New() below will not be taken into account. This would mean that metrics from custom components such as
138+
// the one in pkg/traces/servicegraphprocessor would not work.
139+
//
140+
// disableHighCardinalityMetrics is required so that we don't include labels containing ports and IP addresses in gRPC metrics.
141+
// Example metric with high cardinality...
142+
// rpc_server_duration_bucket{net_sock_peer_addr="127.0.0.1",net_sock_peer_port="59947",rpc_grpc_status_code="0",rpc_method="Export",rpc_service="opentelemetry.proto.collector.trace.v1.TraceService",rpc_system="grpc",traces_config="default",le="7500"} 294
143+
// ... the same metric when disableHighCardinalityMetrics is switched on looks like this:
144+
// rpc_server_duration_bucket{rpc_grpc_status_code="0",rpc_method="Export",rpc_service="opentelemetry.proto.collector.trace.v1.TraceService",rpc_system="grpc",traces_config="default",le="7500"} 32
145+
// For more context:
146+
// https://opentelemetry.io/docs/specs/otel/metrics/semantic_conventions/rpc-metrics/
147+
// https://github.com/open-telemetry/opentelemetry-go-contrib/pull/2700
148+
// https://github.com/open-telemetry/opentelemetry-collector/pull/6788/files
135149
err = enableOtelFeatureGates(
136150
"telemetry.useOtelForInternalMetrics",
137151
"telemetry.disableHighCardinalityMetrics")
138152
if err != nil {
139153
return err
140154
}
141155

142-
promExporter, err := traceutils.PromeheusExporter(reg)
156+
promExporter, err := traceutils.PrometheusExporter(reg)
143157
if err != nil {
144158
return fmt.Errorf("error creating otel prometheus exporter: %w", err)
145159
}

pkg/traces/internal/traceutils/otel_meter_settings.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
66
)
77

8-
func PromeheusExporter(reg prometheus.Registerer) (*otelprom.Exporter, error) {
8+
func PrometheusExporter(reg prometheus.Registerer) (*otelprom.Exporter, error) {
99
return otelprom.New(
1010
otelprom.WithRegisterer(reg),
1111
otelprom.WithoutUnits(),

pkg/traces/servicegraphprocessor/processor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ func TestConsumeMetrics(t *testing.T) {
109109
}
110110

111111
func getTestMeterProvider(t *testing.T, reg prometheus.Registerer) *sdkmetric.MeterProvider {
112-
promExporter, err := traceutils.PromeheusExporter(reg)
112+
promExporter, err := traceutils.PrometheusExporter(reg)
113113
require.NoError(t, err)
114114

115115
mp := sdkmetric.NewMeterProvider(

0 commit comments

Comments
 (0)