Skip to content

Commit dddd44c

Browse files
authored
[grpc][v2] Implement gRPC v2 factory (#6968)
<!-- !! Please DELETE this comment before posting. We appreciate your contribution to the Jaeger project! πŸ‘‹πŸŽ‰ --> ## Which problem is this PR solving? - Towards #6965 ## Description of the changes - This PR implements the following factories for gRPC storage - `io.Closer` - `tracestore.Factory` - `depstore.Factory` ## How was this change tested? - Added unit tests ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `npm run lint` and `npm run test` --------- Signed-off-by: Mahad Zaryab <[email protected]>
1 parent a76ea22 commit dddd44c

File tree

4 files changed

+290
-0
lines changed

4 files changed

+290
-0
lines changed

β€Žinternal/storage/v2/grpc/config.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Copyright (c) 2025 The Jaeger Authors.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package grpc
5+
6+
import (
7+
"time"
8+
9+
"go.opentelemetry.io/collector/config/configgrpc"
10+
"go.opentelemetry.io/collector/exporter/exporterhelper"
11+
12+
"github.com/jaegertracing/jaeger/internal/tenancy"
13+
)
14+
15+
type Config struct {
16+
Tenancy tenancy.Options `mapstructure:"multi_tenancy"`
17+
configgrpc.ClientConfig `mapstructure:",squash"`
18+
exporterhelper.TimeoutConfig `mapstructure:",squash"`
19+
}
20+
21+
func DefaultConfig() Config {
22+
return Config{
23+
TimeoutConfig: exporterhelper.TimeoutConfig{
24+
Timeout: time.Duration(5 * time.Second),
25+
},
26+
}
27+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
// Copyright (c) 2025 The Jaeger Authors.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package grpc
5+
6+
import (
7+
"testing"
8+
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestDefaultConfig(t *testing.T) {
13+
cfg := DefaultConfig()
14+
require.NotEmpty(t, cfg.Timeout)
15+
}
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
// Copyright (c) 2025 The Jaeger Authors.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package grpc
5+
6+
import (
7+
"context"
8+
"errors"
9+
"fmt"
10+
"io"
11+
12+
"go.opentelemetry.io/collector/component"
13+
"go.opentelemetry.io/collector/config/configgrpc"
14+
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
15+
"go.opentelemetry.io/otel/trace"
16+
"go.opentelemetry.io/otel/trace/noop"
17+
"google.golang.org/grpc"
18+
19+
"github.com/jaegertracing/jaeger/internal/bearertoken"
20+
"github.com/jaegertracing/jaeger/internal/storage/v2/api/depstore"
21+
"github.com/jaegertracing/jaeger/internal/storage/v2/api/tracestore"
22+
"github.com/jaegertracing/jaeger/internal/telemetry"
23+
"github.com/jaegertracing/jaeger/internal/tenancy"
24+
)
25+
26+
var (
27+
_ io.Closer = (*Factory)(nil)
28+
_ tracestore.Factory = (*Factory)(nil)
29+
_ depstore.Factory = (*Factory)(nil)
30+
)
31+
32+
type Factory struct {
33+
telset telemetry.Settings
34+
config Config
35+
// readerConn is the gRPC connection used for reading data from the remote storage backend.
36+
// It is safe for this connection to have instrumentation enabled without
37+
// the risk of recursively generating traces.
38+
readerConn *grpc.ClientConn
39+
// writerConn is the gRPC connection used for writing data to the remote storage backend.
40+
// This connection should not have instrumentation enabled to avoid recursively generating traces.
41+
writerConn *grpc.ClientConn
42+
}
43+
44+
// NewFactory initializes a new gRPC (remote) storage backend.
45+
func NewFactory(
46+
cfg Config,
47+
telset telemetry.Settings,
48+
) (*Factory, error) {
49+
f := &Factory{
50+
telset: telset,
51+
config: cfg,
52+
}
53+
54+
readerTelset := getTelset(f.telset, f.telset.TracerProvider)
55+
writerTelset := getTelset(f.telset, noop.NewTracerProvider())
56+
newClientFn := func(telset component.TelemetrySettings, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
57+
clientOpts := make([]configgrpc.ToClientConnOption, 0)
58+
for _, opt := range opts {
59+
clientOpts = append(clientOpts, configgrpc.WithGrpcDialOption(opt))
60+
}
61+
return f.config.ToClientConn(context.Background(), f.telset.Host, telset, clientOpts...)
62+
}
63+
64+
if err := f.initializeConnections(readerTelset, writerTelset, newClientFn); err != nil {
65+
return nil, err
66+
}
67+
68+
return f, nil
69+
}
70+
71+
func (f *Factory) CreateTraceReader() (tracestore.Reader, error) {
72+
return NewTraceReader(f.readerConn), nil
73+
}
74+
75+
func (f *Factory) CreateTraceWriter() (tracestore.Writer, error) {
76+
return NewTraceWriter(f.writerConn), nil
77+
}
78+
79+
func (f *Factory) CreateDependencyReader() (depstore.Reader, error) {
80+
return NewDependencyReader(f.readerConn), nil
81+
}
82+
83+
func (f *Factory) Close() error {
84+
var errs []error
85+
if f.readerConn != nil {
86+
errs = append(errs, f.readerConn.Close())
87+
}
88+
if f.writerConn != nil {
89+
errs = append(errs, f.writerConn.Close())
90+
}
91+
return errors.Join(errs...)
92+
}
93+
94+
func getTelset(
95+
telset telemetry.Settings,
96+
tracerProvider trace.TracerProvider,
97+
) component.TelemetrySettings {
98+
return component.TelemetrySettings{
99+
Logger: telset.Logger,
100+
TracerProvider: tracerProvider,
101+
MeterProvider: telset.MeterProvider,
102+
}
103+
}
104+
105+
type newClientFn func(telset component.TelemetrySettings, opts ...grpc.DialOption) (*grpc.ClientConn, error)
106+
107+
func (f *Factory) initializeConnections(
108+
readerTelset, writerTelset component.TelemetrySettings,
109+
newClient newClientFn,
110+
) error {
111+
if f.config.Auth != nil {
112+
return errors.New("authenticator is not supported")
113+
}
114+
115+
unaryInterceptors := []grpc.UnaryClientInterceptor{bearertoken.NewUnaryClientInterceptor()}
116+
streamInterceptors := []grpc.StreamClientInterceptor{bearertoken.NewStreamClientInterceptor()}
117+
118+
if tenancyMgr := tenancy.NewManager(&f.config.Tenancy); tenancyMgr.Enabled {
119+
unaryInterceptors = append(unaryInterceptors, tenancy.NewClientUnaryInterceptor(tenancyMgr))
120+
streamInterceptors = append(streamInterceptors, tenancy.NewClientStreamInterceptor(tenancyMgr))
121+
}
122+
123+
baseOpts := []grpc.DialOption{
124+
grpc.WithChainUnaryInterceptor(unaryInterceptors...),
125+
grpc.WithChainStreamInterceptor(streamInterceptors...),
126+
}
127+
128+
createConn := func(telset component.TelemetrySettings) (*grpc.ClientConn, error) {
129+
opts := append(baseOpts, grpc.WithStatsHandler(
130+
otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(telset.TracerProvider)),
131+
))
132+
return newClient(telset, opts...)
133+
}
134+
135+
readerConn, err := createConn(readerTelset)
136+
if err != nil {
137+
return fmt.Errorf("error creating reader client connection: %w", err)
138+
}
139+
writerConn, err := createConn(writerTelset)
140+
if err != nil {
141+
return fmt.Errorf("error creating writer client connection: %w", err)
142+
}
143+
144+
f.readerConn, f.writerConn = readerConn, writerConn
145+
146+
return nil
147+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// Copyright (c) 2025 The Jaeger Authors.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package grpc
5+
6+
import (
7+
"net"
8+
"testing"
9+
"time"
10+
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
13+
"go.opentelemetry.io/collector/component"
14+
"go.opentelemetry.io/collector/config/configauth"
15+
"go.opentelemetry.io/collector/config/configgrpc"
16+
"go.opentelemetry.io/collector/exporter/exporterhelper"
17+
"google.golang.org/grpc"
18+
19+
"github.com/jaegertracing/jaeger/internal/telemetry"
20+
"github.com/jaegertracing/jaeger/internal/tenancy"
21+
)
22+
23+
func TestNewFactory_NonEmptyAuthenticator(t *testing.T) {
24+
cfg := &Config{
25+
ClientConfig: configgrpc.ClientConfig{
26+
Auth: &configauth.Authentication{},
27+
},
28+
}
29+
_, err := NewFactory(*cfg, telemetry.NoopSettings())
30+
require.ErrorContains(t, err, "authenticator is not supported")
31+
}
32+
33+
func TestNewFactory(t *testing.T) {
34+
lis, err := net.Listen("tcp", ":0")
35+
require.NoError(t, err, "failed to listen")
36+
37+
s := grpc.NewServer()
38+
39+
startServer(t, s, lis)
40+
41+
cfg := Config{
42+
ClientConfig: configgrpc.ClientConfig{
43+
Endpoint: lis.Addr().String(),
44+
},
45+
TimeoutConfig: exporterhelper.TimeoutConfig{
46+
Timeout: 1 * time.Second,
47+
},
48+
Tenancy: tenancy.Options{
49+
Enabled: true,
50+
},
51+
}
52+
telset := telemetry.NoopSettings()
53+
f, err := NewFactory(cfg, telset)
54+
require.NoError(t, err)
55+
require.NoError(t, f.Close())
56+
}
57+
58+
func TestFactory(t *testing.T) {
59+
lis, err := net.Listen("tcp", ":0")
60+
require.NoError(t, err, "failed to listen")
61+
62+
s := grpc.NewServer()
63+
64+
conn := startServer(t, s, lis)
65+
f := &Factory{
66+
readerConn: conn,
67+
}
68+
69+
t.Run("CreateTraceReader", func(t *testing.T) {
70+
tr, err := f.CreateTraceReader()
71+
require.NoError(t, err)
72+
require.NotNil(t, tr)
73+
})
74+
75+
t.Run("CreateTraceWriter", func(t *testing.T) {
76+
tr, err := f.CreateTraceWriter()
77+
require.NoError(t, err)
78+
require.NotNil(t, tr)
79+
})
80+
81+
t.Run("CreateDependencyReader", func(t *testing.T) {
82+
tr, err := f.CreateDependencyReader()
83+
require.NoError(t, err)
84+
require.NotNil(t, tr)
85+
})
86+
}
87+
88+
func TestInitializeConnections_ClientError(t *testing.T) {
89+
f, err := NewFactory(Config{
90+
ClientConfig: configgrpc.ClientConfig{
91+
Endpoint: ":0",
92+
},
93+
}, telemetry.NoopSettings())
94+
require.NoError(t, err)
95+
t.Cleanup(func() { require.NoError(t, f.Close()) })
96+
newClientFn := func(_ component.TelemetrySettings, _ ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
97+
return nil, assert.AnError
98+
}
99+
err = f.initializeConnections(component.TelemetrySettings{}, component.TelemetrySettings{}, newClientFn)
100+
assert.ErrorContains(t, err, "error creating reader client connection")
101+
}

0 commit comments

Comments
Β (0)