Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions pb/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func NewServer(opts ...grpc.ServerOption) *grpc.Server {
}

// NewServerWithInterceptors creates new grpc.Server with custom message size and default interceptors.
// The provided interceptros will be appended to the predefined ones.
// The provided interceptors be executed after the default ones.
func NewServerWithInterceptors(
streamInterceptors []grpc.StreamServerInterceptor,
unaryInterceptors []grpc.UnaryServerInterceptor,
Expand Down Expand Up @@ -112,7 +112,7 @@ func DialContext(ctx context.Context, target string, opts ...grpc.DialOption) (*
}

// DialContextWithInterceptors creates a client connection to the given target with custom message size and default interceptors.
// The provided interceptros will be appended to the predefined ones.
// The provided interceptors will be executed before the default ones.
func DialContextWithInterceptors(
ctx context.Context,
target string,
Expand All @@ -121,11 +121,13 @@ func DialContextWithInterceptors(
opts ...grpc.DialOption,
) (*grpc.ClientConn, error) {
streamInterceptors = append(
[]grpc.StreamClientInterceptor{CtxlogStreamClientInterceptor},
streamInterceptors...)
streamInterceptors,
CtxlogStreamClientInterceptor,
)
unaryInterceptors = append(
[]grpc.UnaryClientInterceptor{CtxlogUnaryClientInterceptor},
unaryInterceptors...)
unaryInterceptors,
CtxlogUnaryClientInterceptor,
)

opts = append(opts,
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
Expand Down
209 changes: 209 additions & 0 deletions pb/grpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package pb_test

import (
"context"
"fmt"
"net"
"testing"
time "time"

"gopkg.in/src-d/lookout-sdk.v0/pb"

"github.com/stretchr/testify/suite"
grpc "google.golang.org/grpc"
)

// Implements AnalyzerClient interface
type mockAnalyzer struct {
Callback func(ctx context.Context)
}

func (a *mockAnalyzer) NotifyReviewEvent(ctx context.Context, event *pb.ReviewEvent) (*pb.EventResponse, error) {
if a.Callback != nil {
a.Callback(ctx)
}
return &pb.EventResponse{}, nil
}

func (a *mockAnalyzer) NotifyPushEvent(ctx context.Context, event *pb.PushEvent) (*pb.EventResponse, error) {
if a.Callback != nil {
a.Callback(ctx)
}
return &pb.EventResponse{}, nil
}

const port = 50050

// GrpcSuite is this file's main test suite
type GrpcSuite struct {
suite.Suite

listener net.Listener
grpcServer *grpc.Server
grpcAddr string
}

func TestGrpcSuite(t *testing.T) {
suite.Run(t, new(GrpcSuite))
}

func (s *GrpcSuite) SetupSuite() {
var err error
s.grpcAddr, err = pb.ToGoGrpcAddress(fmt.Sprintf("ipv4://127.0.0.1:%d", port))
s.Require().NoError(err)
}

func (s *GrpcSuite) TearDownTest() {
s.grpcServer.Stop()
s.listener.Close()
}

func (s *GrpcSuite) startServer() {
var err error
s.listener, err = pb.Listen(fmt.Sprintf("ipv4://0.0.0.0:%d", port))
s.Require().NoError(err, "failed to listen on port: %d", port)

err = s.grpcServer.Serve(s.listener)
s.Require().NoError(err)
}

func (s *GrpcSuite) TestAnalyzerService() {
// Basic test for the Analyzer service methods

require := s.Require()

s.grpcServer = pb.NewServer()
pb.RegisterAnalyzerServer(s.grpcServer, &mockAnalyzer{})

go s.startServer()
// Wait for the server to start listening
time.Sleep(time.Second)

conn, err := pb.DialContext(context.TODO(), s.grpcAddr)
require.NoError(err)

client := pb.NewAnalyzerClient(conn)

res, err := client.NotifyReviewEvent(context.TODO(), &pb.ReviewEvent{})
require.NoError(err)
require.NotNil(res)

res, err = client.NotifyPushEvent(context.TODO(), &pb.PushEvent{})
require.NoError(err)
require.NotNil(res)
}

func (s *GrpcSuite) TestAnalyzerCtxlogInterceptor() {
// In this scenario we use pb.AddFields before the gRPC call is started,
// and pb.GetLogFields inside the gRPC handler.

require := s.Require()

var serverFields pb.Fields

s.grpcServer = pb.NewServer()
pb.RegisterAnalyzerServer(s.grpcServer, &mockAnalyzer{
Callback: func(ctx context.Context) {
serverFields = pb.GetLogFields(ctx)
},
})

go s.startServer()
// Wait for the server to start listening
time.Sleep(time.Second)

conn, err := pb.DialContext(context.TODO(), s.grpcAddr)
require.NoError(err)

client := pb.NewAnalyzerClient(conn)

clientFields := pb.Fields{
"key-A": "value-A",
}
ctx := pb.AddLogFields(context.TODO(), clientFields)

res, err := client.NotifyReviewEvent(ctx, &pb.ReviewEvent{})
require.NoError(err)
require.NotNil(res)
require.EqualValues(clientFields, serverFields)

// New context, replaces the fields
clientFields = pb.Fields{
"key-B": "value-B",
}
ctx = pb.AddLogFields(context.TODO(), clientFields)

res, err = client.NotifyPushEvent(ctx, &pb.PushEvent{})
require.NoError(err)
require.NotNil(res)
require.EqualValues(clientFields, serverFields)
}

func (s *GrpcSuite) TestAnalyzerAddLogFieldsInterceptor() {
// In this scenario we use pb.AddFields in a client interceptor,
// and pb.GetLogFields in a server interceptor

require := s.Require()

var serverFields pb.Fields

myUnaryServerInterceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
serverFields = pb.GetLogFields(ctx)
return handler(ctx, req)
}

myStreamServerInterceptor := func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
ctx := stream.Context()
serverFields = pb.GetLogFields(ctx)
return handler(srv, stream)
}

s.grpcServer = pb.NewServerWithInterceptors(
[]grpc.StreamServerInterceptor{myStreamServerInterceptor},
[]grpc.UnaryServerInterceptor{myUnaryServerInterceptor},
)
pb.RegisterAnalyzerServer(s.grpcServer, &mockAnalyzer{})

go s.startServer()
// Wait for the server to start listening
time.Sleep(time.Second)

var clientFields pb.Fields

myUnaryClientInterceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx = pb.AddLogFields(context.TODO(), clientFields)
return invoker(ctx, method, req, reply, cc, opts...)
}

myStreamClientInterceptor := func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
ctx = pb.AddLogFields(context.TODO(), clientFields)
return streamer(ctx, desc, cc, method, opts...)
}

conn, err := pb.DialContextWithInterceptors(context.TODO(), s.grpcAddr,
[]grpc.StreamClientInterceptor{myStreamClientInterceptor},
[]grpc.UnaryClientInterceptor{myUnaryClientInterceptor},
)
require.NoError(err)

client := pb.NewAnalyzerClient(conn)

clientFields = pb.Fields{
"key-A": "value-A",
}

res, err := client.NotifyReviewEvent(context.TODO(), &pb.ReviewEvent{})
require.NoError(err)
require.NotNil(res)
require.EqualValues(clientFields, serverFields)

// New context, replaces the fields
clientFields = pb.Fields{
"key-B": "value-B",
}

res, err = client.NotifyPushEvent(context.TODO(), &pb.PushEvent{})
require.NoError(err)
require.NotNil(res)
require.EqualValues(clientFields, serverFields)
}