-
Notifications
You must be signed in to change notification settings - Fork 2.6k
Use GRPC interceptors instead of explicit context wrappers #6133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -12,11 +12,9 @@ import ( | |||
|
||||
"google.golang.org/grpc" | ||||
"google.golang.org/grpc/codes" | ||||
"google.golang.org/grpc/metadata" | ||||
"google.golang.org/grpc/status" | ||||
|
||||
"github.com/jaegertracing/jaeger/model" | ||||
"github.com/jaegertracing/jaeger/pkg/bearertoken" | ||||
_ "github.com/jaegertracing/jaeger/pkg/gogocodec" // force gogo codec registration | ||||
"github.com/jaegertracing/jaeger/proto-gen/storage_v1" | ||||
"github.com/jaegertracing/jaeger/storage/dependencystore" | ||||
|
@@ -30,9 +28,6 @@ var ( | |||
_ StoragePlugin = (*GRPCClient)(nil) | ||||
_ ArchiveStoragePlugin = (*GRPCClient)(nil) | ||||
_ PluginCapabilities = (*GRPCClient)(nil) | ||||
|
||||
// upgradeContext composites several steps of upgrading context | ||||
upgradeContext = composeContextUpgradeFuncs(upgradeContextWithBearerToken) | ||||
) | ||||
|
||||
// GRPCClient implements shared.StoragePlugin and reads/writes spans and dependencies | ||||
|
@@ -58,36 +53,6 @@ func NewGRPCClient(tracedConn *grpc.ClientConn, untracedConn *grpc.ClientConn) * | |||
} | ||||
} | ||||
|
||||
// ContextUpgradeFunc is a functional type that can be composed to upgrade context | ||||
type ContextUpgradeFunc func(ctx context.Context) context.Context | ||||
|
||||
// composeContextUpgradeFuncs composes ContextUpgradeFunc and returns a composed function | ||||
// to run the given func in strict order. | ||||
func composeContextUpgradeFuncs(funcs ...ContextUpgradeFunc) ContextUpgradeFunc { | ||||
return func(ctx context.Context) context.Context { | ||||
for _, fun := range funcs { | ||||
ctx = fun(ctx) | ||||
} | ||||
return ctx | ||||
} | ||||
} | ||||
|
||||
// upgradeContextWithBearerToken turns the context into a gRPC outgoing context with bearer token | ||||
// in the request metadata, if the original context has bearer token attached. | ||||
// Otherwise returns original context. | ||||
func upgradeContextWithBearerToken(ctx context.Context) context.Context { | ||||
bearerToken, hasToken := bearertoken.GetBearerToken(ctx) | ||||
if hasToken { | ||||
md, ok := metadata.FromOutgoingContext(ctx) | ||||
if !ok { | ||||
md = metadata.New(nil) | ||||
} | ||||
md.Set(BearerTokenKey, bearerToken) | ||||
return metadata.NewOutgoingContext(ctx, md) | ||||
} | ||||
return ctx | ||||
} | ||||
|
||||
// DependencyReader implements shared.StoragePlugin. | ||||
func (c *GRPCClient) DependencyReader() dependencystore.Reader { | ||||
return c | ||||
|
@@ -117,7 +82,7 @@ func (c *GRPCClient) ArchiveSpanWriter() spanstore.Writer { | |||
|
||||
// GetTrace takes a traceID and returns a Trace associated with that traceID | ||||
func (c *GRPCClient) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { | ||||
stream, err := c.readerClient.GetTrace(upgradeContext(ctx), &storage_v1.GetTraceRequest{ | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. where is the code that attaches interceptors to the client that would do these upgrades instead? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Interceptors are being attached to shared client in jaeger/plugin/storage/grpc/factory.go Line 162 in 6a520a5
|
||||
stream, err := c.readerClient.GetTrace(ctx, &storage_v1.GetTraceRequest{ | ||||
TraceID: traceID, | ||||
}) | ||||
if status.Code(err) == codes.NotFound { | ||||
|
@@ -132,7 +97,7 @@ func (c *GRPCClient) GetTrace(ctx context.Context, traceID model.TraceID) (*mode | |||
|
||||
// GetServices returns a list of all known services | ||||
func (c *GRPCClient) GetServices(ctx context.Context) ([]string, error) { | ||||
resp, err := c.readerClient.GetServices(upgradeContext(ctx), &storage_v1.GetServicesRequest{}) | ||||
resp, err := c.readerClient.GetServices(ctx, &storage_v1.GetServicesRequest{}) | ||||
if err != nil { | ||||
return nil, fmt.Errorf("plugin error: %w", err) | ||||
} | ||||
|
@@ -145,7 +110,7 @@ func (c *GRPCClient) GetOperations( | |||
ctx context.Context, | ||||
query spanstore.OperationQueryParameters, | ||||
) ([]spanstore.Operation, error) { | ||||
resp, err := c.readerClient.GetOperations(upgradeContext(ctx), &storage_v1.GetOperationsRequest{ | ||||
resp, err := c.readerClient.GetOperations(ctx, &storage_v1.GetOperationsRequest{ | ||||
Service: query.ServiceName, | ||||
SpanKind: query.SpanKind, | ||||
}) | ||||
|
@@ -173,7 +138,7 @@ func (c *GRPCClient) GetOperations( | |||
|
||||
// FindTraces retrieves traces that match the traceQuery | ||||
func (c *GRPCClient) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) { | ||||
stream, err := c.readerClient.FindTraces(upgradeContext(ctx), &storage_v1.FindTracesRequest{ | ||||
stream, err := c.readerClient.FindTraces(ctx, &storage_v1.FindTracesRequest{ | ||||
Query: &storage_v1.TraceQueryParameters{ | ||||
ServiceName: query.ServiceName, | ||||
OperationName: query.OperationName, | ||||
|
@@ -212,7 +177,7 @@ func (c *GRPCClient) FindTraces(ctx context.Context, query *spanstore.TraceQuery | |||
|
||||
// FindTraceIDs retrieves traceIDs that match the traceQuery | ||||
func (c *GRPCClient) FindTraceIDs(ctx context.Context, query *spanstore.TraceQueryParameters) ([]model.TraceID, error) { | ||||
resp, err := c.readerClient.FindTraceIDs(upgradeContext(ctx), &storage_v1.FindTraceIDsRequest{ | ||||
resp, err := c.readerClient.FindTraceIDs(ctx, &storage_v1.FindTraceIDsRequest{ | ||||
Query: &storage_v1.TraceQueryParameters{ | ||||
ServiceName: query.ServiceName, | ||||
OperationName: query.OperationName, | ||||
|
Uh oh!
There was an error while loading. Please reload this page.