Skip to content

Commit d595ecd

Browse files
mahadzaryab1yurishkuro
authored andcommitted
[grpc][v2] Implement GetTraces Call in gRPC reader for remote storage api v2 (jaegertracing#6857)
<!-- !! Please DELETE this comment before posting. We appreciate your contribution to the Jaeger project! 👋🎉 --> ## Which problem is this PR solving? - Towards jaegertracing#6789 ## Description of the changes - Implement the `GetTraces` call in the gRPC client for the remote storage API v2 ## How was this change tested? - Added unit tests ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `npm run lint` and `npm run test` --------- Signed-off-by: Mahad Zaryab <[email protected]> Signed-off-by: Mahad Zaryab <[email protected]> Signed-off-by: Yuri Shkuro <[email protected]> Co-authored-by: Yuri Shkuro <[email protected]> Signed-off-by: amol-verma-allen <[email protected]>
1 parent ef5d4ae commit d595ecd

File tree

2 files changed

+163
-16
lines changed

2 files changed

+163
-16
lines changed

internal/storage/v2/grpc/tracereader.go

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ package grpc
55

66
import (
77
"context"
8+
"errors"
89
"fmt"
10+
"io"
911
"iter"
1012

1113
"go.opentelemetry.io/collector/pdata/pcommon"
@@ -31,11 +33,36 @@ func NewTraceReader(conn *grpc.ClientConn) *TraceReader {
3133
}
3234
}
3335

34-
func (*TraceReader) GetTraces(
35-
context.Context,
36-
...tracestore.GetTraceParams,
36+
func (tr *TraceReader) GetTraces(
37+
ctx context.Context,
38+
traceIDs ...tracestore.GetTraceParams,
3739
) iter.Seq2[[]ptrace.Traces, error] {
38-
panic("not implemented")
40+
return func(yield func([]ptrace.Traces, error) bool) {
41+
query := []*storage.GetTraceParams{}
42+
for _, traceID := range traceIDs {
43+
query = append(query, &storage.GetTraceParams{
44+
TraceId: traceID.TraceID[:],
45+
StartTime: traceID.Start,
46+
EndTime: traceID.End,
47+
})
48+
}
49+
stream, err := tr.client.GetTraces(ctx, &storage.GetTracesRequest{
50+
Query: query,
51+
})
52+
if err != nil {
53+
yield(nil, fmt.Errorf("received error from grpc reader client: %w", err))
54+
return
55+
}
56+
for received, err := stream.Recv(); !errors.Is(err, io.EOF); received, err = stream.Recv() {
57+
if err != nil {
58+
yield(nil, fmt.Errorf("received error from grpc stream: %w", err))
59+
return
60+
}
61+
if !yield([]ptrace.Traces{received.ToTraces()}, nil) {
62+
return
63+
}
64+
}
65+
}
3966
}
4067

4168
func (tr *TraceReader) GetServices(ctx context.Context) ([]string, error) {

internal/storage/v2/grpc/tracereader_test.go

Lines changed: 132 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@ import (
1212
"github.com/stretchr/testify/assert"
1313
"github.com/stretchr/testify/require"
1414
"go.opentelemetry.io/collector/pdata/pcommon"
15+
"go.opentelemetry.io/collector/pdata/ptrace"
1516
"google.golang.org/grpc"
1617
"google.golang.org/grpc/credentials/insecure"
1718

1819
"github.com/jaegertracing/jaeger/internal/jiter"
20+
"github.com/jaegertracing/jaeger/internal/jptrace"
1921
"github.com/jaegertracing/jaeger/internal/proto-gen/storage/v2"
2022
"github.com/jaegertracing/jaeger/internal/storage/v2/api/tracestore"
2123
)
@@ -25,37 +27,45 @@ import (
2527
type testServer struct {
2628
storage.UnimplementedTraceReaderServer
2729

30+
traces []*jptrace.TracesData
2831
services []string
2932
operations []*storage.Operation
3033
traceIDs []*storage.FoundTraceID
3134
err error
3235
}
3336

34-
func (s *testServer) GetServices(
37+
func (ts *testServer) GetTraces(_ *storage.GetTracesRequest, s storage.TraceReader_GetTracesServer) error {
38+
for _, trace := range ts.traces {
39+
s.Send(trace)
40+
}
41+
return ts.err
42+
}
43+
44+
func (ts *testServer) GetServices(
3545
context.Context,
3646
*storage.GetServicesRequest,
3747
) (*storage.GetServicesResponse, error) {
3848
return &storage.GetServicesResponse{
39-
Services: s.services,
40-
}, s.err
49+
Services: ts.services,
50+
}, ts.err
4151
}
4252

43-
func (s *testServer) GetOperations(
53+
func (ts *testServer) GetOperations(
4454
context.Context,
4555
*storage.GetOperationsRequest,
4656
) (*storage.GetOperationsResponse, error) {
4757
return &storage.GetOperationsResponse{
48-
Operations: s.operations,
49-
}, s.err
58+
Operations: ts.operations,
59+
}, ts.err
5060
}
5161

52-
func (s *testServer) FindTraceIDs(
62+
func (ts *testServer) FindTraceIDs(
5363
context.Context,
5464
*storage.FindTracesRequest,
5565
) (*storage.FindTraceIDsResponse, error) {
5666
return &storage.FindTraceIDsResponse{
57-
TraceIds: s.traceIDs,
58-
}, s.err
67+
TraceIds: ts.traceIDs,
68+
}, ts.err
5969
}
6070

6171
func startTestServer(t *testing.T, testServer *testServer) *grpc.ClientConn {
@@ -90,12 +100,122 @@ func startServer(t *testing.T, server *grpc.Server, listener net.Listener) *grpc
90100
return conn
91101
}
92102

103+
func makeTestTrace() ptrace.Traces {
104+
trace := ptrace.NewTraces()
105+
resources := trace.ResourceSpans().AppendEmpty()
106+
scopes := resources.ScopeSpans().AppendEmpty()
107+
108+
spanA := scopes.Spans().AppendEmpty()
109+
spanA.SetName("foobar")
110+
spanA.SetTraceID(pcommon.TraceID([16]byte{1}))
111+
spanA.SetSpanID(pcommon.SpanID([8]byte{2}))
112+
spanA.SetKind(ptrace.SpanKindServer)
113+
spanA.Status().SetCode(ptrace.StatusCodeError)
114+
115+
return trace
116+
}
117+
93118
func TestTraceReader_GetTraces(t *testing.T) {
94-
tr := &TraceReader{}
119+
tests := []struct {
120+
name string
121+
testServer *testServer
122+
traces []*jptrace.TracesData
123+
expectedTraces []ptrace.Traces
124+
expectedError string
125+
}{
126+
{
127+
name: "single trace",
128+
testServer: &testServer{
129+
traces: func() []*jptrace.TracesData {
130+
trace := makeTestTrace()
131+
traces := []*jptrace.TracesData{(*jptrace.TracesData)(&trace)}
132+
return traces
133+
}(),
134+
},
135+
expectedTraces: []ptrace.Traces{makeTestTrace()},
136+
},
137+
{
138+
name: "multiple traces",
139+
testServer: &testServer{
140+
traces: func() []*jptrace.TracesData {
141+
traceA := makeTestTrace()
142+
traceB := makeTestTrace()
143+
traces := []*jptrace.TracesData{
144+
(*jptrace.TracesData)(&traceA),
145+
(*jptrace.TracesData)(&traceB),
146+
}
147+
return traces
148+
}(),
149+
},
150+
expectedTraces: []ptrace.Traces{makeTestTrace(), makeTestTrace()},
151+
},
152+
{
153+
name: "error",
154+
testServer: &testServer{
155+
traces: func() []*jptrace.TracesData {
156+
trace := ptrace.NewTraces()
157+
traces := []*jptrace.TracesData{(*jptrace.TracesData)(&trace)}
158+
return traces
159+
}(),
160+
err: assert.AnError,
161+
},
162+
expectedError: "received error from grpc stream",
163+
},
164+
}
95165

96-
require.Panics(t, func() {
97-
tr.GetTraces(context.Background(), tracestore.GetTraceParams{})
166+
for _, test := range tests {
167+
t.Run(test.name, func(t *testing.T) {
168+
conn := startTestServer(t, test.testServer)
169+
170+
reader := NewTraceReader(conn)
171+
getTracesIter := reader.GetTraces(context.Background(), tracestore.GetTraceParams{})
172+
traces, err := jiter.FlattenWithErrors(getTracesIter)
173+
174+
if test.expectedError != "" {
175+
require.ErrorContains(t, err, test.expectedError)
176+
} else {
177+
require.NoError(t, err)
178+
require.Equal(t, test.expectedTraces, traces)
179+
}
180+
})
181+
}
182+
}
183+
184+
func TestTraceReader_GetTraces_YieldStopsIteration(t *testing.T) {
185+
traceA := makeTestTrace()
186+
traceB := makeTestTrace()
187+
testServer := &testServer{
188+
traces: []*jptrace.TracesData{
189+
(*jptrace.TracesData)(&traceA),
190+
(*jptrace.TracesData)(&traceB),
191+
},
192+
}
193+
194+
conn := startTestServer(t, testServer)
195+
reader := NewTraceReader(conn)
196+
197+
getTracesIter := reader.GetTraces(context.Background(), tracestore.GetTraceParams{})
198+
var gotTraces []ptrace.Traces
199+
getTracesIter(func(traces []ptrace.Traces, _ error) bool {
200+
gotTraces = append(gotTraces, traces...)
201+
return false
202+
})
203+
204+
require.Len(t, gotTraces, 1)
205+
}
206+
207+
func TestTraceReader_GetTraces_GRPCClientError(t *testing.T) {
208+
conn, err := grpc.NewClient(":0",
209+
grpc.WithTransportCredentials(insecure.NewCredentials()),
210+
) // create client without a started server
211+
require.NoError(t, err)
212+
t.Cleanup(func() {
213+
conn.Close()
98214
})
215+
reader := NewTraceReader(conn)
216+
getTracesIter := reader.GetTraces(context.Background(), tracestore.GetTraceParams{})
217+
_, err = jiter.FlattenWithErrors(getTracesIter)
218+
require.ErrorContains(t, err, "received error from grpc reader client")
99219
}
100220

101221
func TestTraceReader_GetServices(t *testing.T) {

0 commit comments

Comments
 (0)