Skip to content

Commit 88c3006

Browse files
zcrossatoulme
andauthored
[extension/jaegerremotesampling] gRPC remote mode propagates configured HTTP headers (#24414)
**Description:** This could be described as a fix or an improvement to the [jaegerremotesampling extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/jaegerremotesampling). Status quo: For usages in which a `remote` source is specified using collector-wide standard [configgrpc.GRPCClientSettings](https://github.com/open-telemetry/opentelemetry-collector/blob/b5e511ce31f22fd3d4817236792245fe1bd88ef8/config/configgrpc/configgrpc.go#L54), the given HTTP headers are not actually set on outbound calls to the destination `grpcstore.SamplingManager` endpoint. After this PR: Outbound calls will add any HTTP headers specified in the gRPC client settings for the remote source. This will mean that drop-in extension usage will support use cases in which header additions are necessary for remote interactions. I took an approach that I observed in several other exporters/extensions: "enhancing" the gRPC context. **Link to tracking Issue:** N/A **Testing:** Existing extension integration tests have been updated to perform a (previously not performed) client-like HTTP call to the extension's running gRPC server, and then to verify that an observed call to a gRPC remote includes configured HTTP header additions (as gRPC metadata). **Documentation:** N/A: I assumed the behavior that this PR now implements, because the gRPC client settings config is so "standard" throughout the opentelemetry-collector repo (and other extensions in this contrib repo). --------- Co-authored-by: Antoine Toulme <[email protected]>
1 parent 2cb7308 commit 88c3006

File tree

5 files changed

+161
-40
lines changed

5 files changed

+161
-40
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Use this changelog template to create an entry for release notes.
2+
# If your change doesn't affect end users, such as a test fix or a tooling change,
3+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
4+
5+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
6+
change_type: enhancement
7+
8+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
9+
component: extension/jaegerremotesampling
10+
11+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
12+
note: gRPC remote source usage in jaegerremotesampling extension propagates HTTP headers if set in gRPC client config
13+
14+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
15+
issues: [24414]
16+
17+
# (Optional) One or more lines of additional information to render under the primary note.
18+
# These lines will be padded with 2 spaces and then inserted directly into the document.
19+
# Use pipe (|) for multiline entries.
20+
subtext:

extension/jaegerremotesampling/extension.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"context"
88
"fmt"
99

10-
grpcStore "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager/grpc"
1110
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
1211
"github.com/jaegertracing/jaeger/plugin/sampling/strategystore/static"
1312
"go.opentelemetry.io/collector/component"
@@ -62,13 +61,10 @@ func (jrse *jrsExtension) Start(ctx context.Context, host component.Host) error
6261
if jrse.cfg.Source.Remote != nil {
6362
conn, err := jrse.cfg.Source.Remote.ToClientConn(ctx, host, jrse.telemetry)
6463
if err != nil {
65-
return fmt.Errorf("error while connecting to the remote sampling source: %w", err)
64+
return fmt.Errorf("failed to create the remote strategy store: %w", err)
6665
}
67-
68-
jrse.samplingStore = grpcStore.NewConfigManager(conn)
69-
jrse.closers = append(jrse.closers, func() error {
70-
return conn.Close()
71-
})
66+
jrse.closers = append(jrse.closers, conn.Close)
67+
jrse.samplingStore = internal.NewRemoteStrategyStore(conn, jrse.cfg.Source.Remote)
7268
}
7369

7470
if jrse.cfg.HTTPServerSettings != nil {

extension/jaegerremotesampling/extension_test.go

Lines changed: 88 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"fmt"
99
"net"
10+
"net/http"
1011
"path/filepath"
1112
"testing"
1213

@@ -15,7 +16,10 @@ import (
1516
"github.com/stretchr/testify/require"
1617
"go.opentelemetry.io/collector/component/componenttest"
1718
"go.opentelemetry.io/collector/config/configgrpc"
19+
"go.opentelemetry.io/collector/config/configopaque"
20+
"go.opentelemetry.io/collector/config/configtls"
1821
"google.golang.org/grpc"
22+
"google.golang.org/grpc/metadata"
1923
)
2024

2125
func TestNewExtension(t *testing.T) {
@@ -41,44 +45,97 @@ func TestStartAndShutdownLocalFile(t *testing.T) {
4145
assert.NoError(t, e.Shutdown(context.Background()))
4246
}
4347

44-
func TestStartAndShutdownRemote(t *testing.T) {
45-
// prepare the socket the mock server will listen at
46-
lis, err := net.Listen("tcp", "127.0.0.1:0")
47-
require.NoError(t, err)
48-
49-
// create the mock server
50-
server := grpc.NewServer()
51-
52-
// register the service
53-
api_v2.RegisterSamplingManagerServer(server, &samplingServer{})
54-
55-
go func() {
56-
err = server.Serve(lis)
57-
require.NoError(t, err)
58-
}()
59-
60-
// create the config, pointing to the mock server
61-
cfg := testConfig()
62-
cfg.GRPCServerSettings.NetAddr.Endpoint = "127.0.0.1:0"
63-
cfg.Source.Remote = &configgrpc.GRPCClientSettings{
64-
Endpoint: fmt.Sprintf("127.0.0.1:%d", lis.Addr().(*net.TCPAddr).Port),
65-
WaitForReady: true,
48+
func TestStartAndCallAndShutdownRemote(t *testing.T) {
49+
for _, tc := range []struct {
50+
name string
51+
remoteClientHeaderConfig map[string]configopaque.String
52+
}{
53+
{
54+
name: "no configured header additions",
55+
},
56+
{
57+
name: "configured header additions",
58+
remoteClientHeaderConfig: map[string]configopaque.String{
59+
"testheadername": "testheadervalue",
60+
"anotherheadername": "anotherheadervalue",
61+
},
62+
},
63+
} {
64+
t.Run(tc.name, func(t *testing.T) {
65+
66+
// prepare the socket the mock server will listen at
67+
lis, err := net.Listen("tcp", "127.0.0.1:0")
68+
require.NoError(t, err)
69+
70+
// create the mock server
71+
server := grpc.NewServer()
72+
73+
// register the service
74+
mockServer := &samplingServer{}
75+
api_v2.RegisterSamplingManagerServer(server, mockServer)
76+
77+
go func() {
78+
err = server.Serve(lis)
79+
require.NoError(t, err)
80+
}()
81+
82+
// create the config, pointing to the mock server
83+
cfg := testConfig()
84+
cfg.GRPCServerSettings.NetAddr.Endpoint = "127.0.0.1:0"
85+
cfg.Source.Remote = &configgrpc.GRPCClientSettings{
86+
Endpoint: fmt.Sprintf("127.0.0.1:%d", lis.Addr().(*net.TCPAddr).Port),
87+
TLSSetting: configtls.TLSClientSetting{
88+
Insecure: true, // test only
89+
},
90+
WaitForReady: true,
91+
Headers: tc.remoteClientHeaderConfig,
92+
}
93+
94+
// create the extension
95+
e := newExtension(cfg, componenttest.NewNopTelemetrySettings())
96+
require.NotNil(t, e)
97+
98+
// start the server
99+
assert.NoError(t, e.Start(context.Background(), componenttest.NewNopHost()))
100+
101+
// make a call
102+
resp, err := http.Get("http://127.0.0.1:5778/sampling?service=foo")
103+
assert.NoError(t, err)
104+
assert.Equal(t, 200, resp.StatusCode)
105+
106+
// shut down the server
107+
assert.NoError(t, e.Shutdown(context.Background()))
108+
109+
// verify observed calls
110+
assert.Len(t, mockServer.observedCalls, 1)
111+
singleCall := mockServer.observedCalls[0]
112+
assert.Equal(t, &api_v2.SamplingStrategyParameters{
113+
ServiceName: "foo",
114+
}, singleCall.params)
115+
md, ok := metadata.FromIncomingContext(singleCall.ctx)
116+
assert.True(t, ok)
117+
for expectedHeaderName, expectedHeaderValue := range tc.remoteClientHeaderConfig {
118+
assert.Equal(t, []string{string(expectedHeaderValue)}, md.Get(expectedHeaderName))
119+
}
120+
})
66121
}
67-
68-
// create the extension
69-
e := newExtension(cfg, componenttest.NewNopTelemetrySettings())
70-
require.NotNil(t, e)
71-
72-
// test
73-
assert.NoError(t, e.Start(context.Background(), componenttest.NewNopHost()))
74-
assert.NoError(t, e.Shutdown(context.Background()))
75122
}
76123

77124
type samplingServer struct {
78125
api_v2.UnimplementedSamplingManagerServer
126+
observedCalls []observedCall
127+
}
128+
129+
type observedCall struct {
130+
ctx context.Context
131+
params *api_v2.SamplingStrategyParameters
79132
}
80133

81-
func (s samplingServer) GetSamplingStrategy(_ context.Context, _ *api_v2.SamplingStrategyParameters) (*api_v2.SamplingStrategyResponse, error) {
134+
func (s *samplingServer) GetSamplingStrategy(ctx context.Context, params *api_v2.SamplingStrategyParameters) (*api_v2.SamplingStrategyResponse, error) {
135+
s.observedCalls = append(s.observedCalls, observedCall{
136+
ctx: ctx,
137+
params: params,
138+
})
82139
return &api_v2.SamplingStrategyResponse{
83140
StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC,
84141
}, nil

extension/jaegerremotesampling/go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ require (
99
go.opentelemetry.io/collector/config/configgrpc v0.82.0
1010
go.opentelemetry.io/collector/config/confighttp v0.82.0
1111
go.opentelemetry.io/collector/config/confignet v0.82.0
12+
go.opentelemetry.io/collector/config/configopaque v0.82.0
13+
go.opentelemetry.io/collector/config/configtls v0.82.0
1214
go.opentelemetry.io/collector/confmap v0.82.0
1315
go.opentelemetry.io/collector/extension v0.82.0
1416
go.uber.org/zap v1.24.0
@@ -51,9 +53,7 @@ require (
5153
go.opentelemetry.io/collector v0.82.0 // indirect
5254
go.opentelemetry.io/collector/config/configauth v0.82.0 // indirect
5355
go.opentelemetry.io/collector/config/configcompression v0.82.0 // indirect
54-
go.opentelemetry.io/collector/config/configopaque v0.82.0 // indirect
5556
go.opentelemetry.io/collector/config/configtelemetry v0.82.0 // indirect
56-
go.opentelemetry.io/collector/config/configtls v0.82.0 // indirect
5757
go.opentelemetry.io/collector/config/internal v0.82.0 // indirect
5858
go.opentelemetry.io/collector/extension/auth v0.82.0 // indirect
5959
go.opentelemetry.io/collector/featuregate v1.0.0-rcv0014 // indirect
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal"
5+
6+
import (
7+
"context"
8+
9+
grpcstore "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager/grpc"
10+
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
11+
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
12+
"go.opentelemetry.io/collector/config/configgrpc"
13+
"go.opentelemetry.io/collector/config/configopaque"
14+
"google.golang.org/grpc"
15+
"google.golang.org/grpc/metadata"
16+
)
17+
18+
type grpcRemoteStrategyStore struct {
19+
headerAdditions map[string]configopaque.String
20+
delegate *grpcstore.SamplingManager
21+
}
22+
23+
// NewRemoteStrategyStore returns a StrategyStore that delegates to the configured Jaeger gRPC endpoint, making
24+
// extension-configured enhancements (header additions only for now) to the gRPC context of every outbound gRPC call.
25+
// Note: it would be nice to expand the configuration surface to include an optional TTL-based caching behavior
26+
// for service-specific outbound GetSamplingStrategy calls.
27+
func NewRemoteStrategyStore(
28+
conn *grpc.ClientConn,
29+
grpcClientSettings *configgrpc.GRPCClientSettings,
30+
) strategystore.StrategyStore {
31+
return &grpcRemoteStrategyStore{
32+
headerAdditions: grpcClientSettings.Headers,
33+
delegate: grpcstore.NewConfigManager(conn),
34+
}
35+
}
36+
37+
func (g *grpcRemoteStrategyStore) GetSamplingStrategy(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error) {
38+
return g.delegate.GetSamplingStrategy(g.enhanceContext(ctx), serviceName)
39+
}
40+
41+
// This function is used to add the extension configuration defined HTTP headers to a given outbound gRPC call's context.
42+
func (g *grpcRemoteStrategyStore) enhanceContext(ctx context.Context) context.Context {
43+
md := metadata.New(nil)
44+
for k, v := range g.headerAdditions {
45+
md.Set(k, string(v))
46+
}
47+
return metadata.NewOutgoingContext(ctx, md)
48+
}

0 commit comments

Comments
 (0)