Skip to content

Commit f1981ab

Browse files
mahadzaryab1amol-verma-allen
authored andcommitted
[grpc][v2] Implement FindTraces Call in gRPC reader for remote storage api v2 (jaegertracing#6962)
<!-- !! Please DELETE this comment before posting. We appreciate your contribution to the Jaeger project! 👋🎉 --> ## Which problem is this PR solving? - Towards jaegertracing#6789 ## Description of the changes - This PR implements the `FindTraces` call in the gRPC v2 API client ## How was this change tested? - Added unit tests ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `npm run lint` and `npm run test` --------- Signed-off-by: Mahad Zaryab <[email protected]> Signed-off-by: amol-verma-allen <[email protected]>
1 parent 88e0784 commit f1981ab

File tree

2 files changed

+150
-13
lines changed

2 files changed

+150
-13
lines changed

internal/storage/v2/grpc/tracereader.go

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (tr *TraceReader) GetTraces(
5050
Query: query,
5151
})
5252
if err != nil {
53-
yield(nil, fmt.Errorf("received error from grpc reader client: %w", err))
53+
yield(nil, fmt.Errorf("failed to execute GetTraces: %w", err))
5454
return
5555
}
5656
for received, err := stream.Recv(); !errors.Is(err, io.EOF); received, err = stream.Recv() {
@@ -68,7 +68,7 @@ func (tr *TraceReader) GetTraces(
6868
func (tr *TraceReader) GetServices(ctx context.Context) ([]string, error) {
6969
resp, err := tr.client.GetServices(ctx, &storage.GetServicesRequest{})
7070
if err != nil {
71-
return nil, fmt.Errorf("failed to get services: %w", err)
71+
return nil, fmt.Errorf("failed to execute GetServices: %w", err)
7272
}
7373
return resp.Services, nil
7474
}
@@ -82,7 +82,7 @@ func (tr *TraceReader) GetOperations(
8282
SpanKind: params.SpanKind,
8383
})
8484
if err != nil {
85-
return nil, fmt.Errorf("failed to get operations: %w", err)
85+
return nil, fmt.Errorf("failed to execute GetOperations: %w", err)
8686
}
8787
operations := make([]tracestore.Operation, len(resp.Operations))
8888
for i, op := range resp.Operations {
@@ -94,11 +94,28 @@ func (tr *TraceReader) GetOperations(
9494
return operations, nil
9595
}
9696

97-
func (*TraceReader) FindTraces(
98-
context.Context,
99-
tracestore.TraceQueryParams,
97+
func (tr *TraceReader) FindTraces(
98+
ctx context.Context,
99+
params tracestore.TraceQueryParams,
100100
) iter.Seq2[[]ptrace.Traces, error] {
101-
panic("not implemented")
101+
return func(yield func([]ptrace.Traces, error) bool) {
102+
stream, err := tr.client.FindTraces(ctx, &storage.FindTracesRequest{
103+
Query: toProtoQueryParameters(params),
104+
})
105+
if err != nil {
106+
yield(nil, fmt.Errorf("failed to execute FindTraces: %w", err))
107+
return
108+
}
109+
for received, err := stream.Recv(); !errors.Is(err, io.EOF); received, err = stream.Recv() {
110+
if err != nil {
111+
yield(nil, fmt.Errorf("received error from grpc stream: %w", err))
112+
return
113+
}
114+
if !yield([]ptrace.Traces{received.ToTraces()}, nil) {
115+
return
116+
}
117+
}
118+
}
102119
}
103120

104121
func (tr *TraceReader) FindTraceIDs(

internal/storage/v2/grpc/tracereader_test.go

Lines changed: 126 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,16 @@ func (ts *testServer) GetOperations(
5959
}, ts.err
6060
}
6161

62+
func (ts *testServer) FindTraces(
63+
_ *storage.FindTracesRequest,
64+
s storage.TraceReader_FindTracesServer,
65+
) error {
66+
for _, trace := range ts.traces {
67+
s.Send(trace)
68+
}
69+
return ts.err
70+
}
71+
6272
func (ts *testServer) FindTraceIDs(
6373
context.Context,
6474
*storage.FindTracesRequest,
@@ -215,7 +225,7 @@ func TestTraceReader_GetTraces_GRPCClientError(t *testing.T) {
215225
reader := NewTraceReader(conn)
216226
getTracesIter := reader.GetTraces(context.Background(), tracestore.GetTraceParams{})
217227
_, err = jiter.FlattenWithErrors(getTracesIter)
218-
require.ErrorContains(t, err, "received error from grpc reader client")
228+
require.ErrorContains(t, err, "failed to execute GetTraces")
219229
}
220230

221231
func TestTraceReader_GetServices(t *testing.T) {
@@ -237,7 +247,7 @@ func TestTraceReader_GetServices(t *testing.T) {
237247
testServer: &testServer{
238248
err: assert.AnError,
239249
},
240-
expectedError: "failed to get services",
250+
expectedError: "failed to execute GetServices",
241251
},
242252
}
243253

@@ -282,7 +292,7 @@ func TestTraceReader_GetOperations(t *testing.T) {
282292
testServer: &testServer{
283293
err: assert.AnError,
284294
},
285-
expectedError: "failed to get operations",
295+
expectedError: "failed to execute GetOperations",
286296
},
287297
}
288298

@@ -306,11 +316,121 @@ func TestTraceReader_GetOperations(t *testing.T) {
306316
}
307317

308318
func TestTraceReader_FindTraces(t *testing.T) {
309-
tr := &TraceReader{}
319+
queryParams := tracestore.TraceQueryParams{
320+
ServiceName: "service-a",
321+
OperationName: "operation-a",
322+
Attributes: pcommon.NewMap(),
323+
}
324+
tests := []struct {
325+
name string
326+
testServer *testServer
327+
traces []*jptrace.TracesData
328+
expectedTraces []ptrace.Traces
329+
expectedError string
330+
}{
331+
{
332+
name: "single trace",
333+
testServer: &testServer{
334+
traces: func() []*jptrace.TracesData {
335+
trace := makeTestTrace()
336+
traces := []*jptrace.TracesData{(*jptrace.TracesData)(&trace)}
337+
return traces
338+
}(),
339+
},
340+
expectedTraces: []ptrace.Traces{makeTestTrace()},
341+
},
342+
{
343+
name: "multiple traces",
344+
testServer: &testServer{
345+
traces: func() []*jptrace.TracesData {
346+
traceA := makeTestTrace()
347+
traceB := makeTestTrace()
348+
traces := []*jptrace.TracesData{
349+
(*jptrace.TracesData)(&traceA),
350+
(*jptrace.TracesData)(&traceB),
351+
}
352+
return traces
353+
}(),
354+
},
355+
expectedTraces: []ptrace.Traces{makeTestTrace(), makeTestTrace()},
356+
},
357+
{
358+
name: "error",
359+
testServer: &testServer{
360+
traces: func() []*jptrace.TracesData {
361+
trace := ptrace.NewTraces()
362+
traces := []*jptrace.TracesData{(*jptrace.TracesData)(&trace)}
363+
return traces
364+
}(),
365+
err: assert.AnError,
366+
},
367+
expectedError: "received error from grpc stream",
368+
},
369+
}
310370

311-
require.Panics(t, func() {
312-
tr.FindTraces(context.Background(), tracestore.TraceQueryParams{})
371+
for _, test := range tests {
372+
t.Run(test.name, func(t *testing.T) {
373+
conn := startTestServer(t, test.testServer)
374+
375+
reader := NewTraceReader(conn)
376+
getTracesIter := reader.FindTraces(context.Background(), queryParams)
377+
traces, err := jiter.FlattenWithErrors(getTracesIter)
378+
379+
if test.expectedError != "" {
380+
require.ErrorContains(t, err, test.expectedError)
381+
} else {
382+
require.NoError(t, err)
383+
require.Equal(t, test.expectedTraces, traces)
384+
}
385+
})
386+
}
387+
}
388+
389+
func TestTraceReader_FindTraces_YieldStopsIteration(t *testing.T) {
390+
queryParams := tracestore.TraceQueryParams{
391+
ServiceName: "service-a",
392+
OperationName: "operation-a",
393+
Attributes: pcommon.NewMap(),
394+
}
395+
traceA := makeTestTrace()
396+
traceB := makeTestTrace()
397+
testServer := &testServer{
398+
traces: []*jptrace.TracesData{
399+
(*jptrace.TracesData)(&traceA),
400+
(*jptrace.TracesData)(&traceB),
401+
},
402+
}
403+
404+
conn := startTestServer(t, testServer)
405+
reader := NewTraceReader(conn)
406+
407+
getTracesIter := reader.FindTraces(context.Background(), queryParams)
408+
var gotTraces []ptrace.Traces
409+
getTracesIter(func(traces []ptrace.Traces, _ error) bool {
410+
gotTraces = append(gotTraces, traces...)
411+
return false
412+
})
413+
414+
require.Len(t, gotTraces, 1)
415+
}
416+
417+
func TestTraceReader_FindTraces_GRPCClientError(t *testing.T) {
418+
queryParams := tracestore.TraceQueryParams{
419+
ServiceName: "service-a",
420+
OperationName: "operation-a",
421+
Attributes: pcommon.NewMap(),
422+
}
423+
conn, err := grpc.NewClient(":0",
424+
grpc.WithTransportCredentials(insecure.NewCredentials()),
425+
) // create client without a started server
426+
require.NoError(t, err)
427+
t.Cleanup(func() {
428+
conn.Close()
313429
})
430+
reader := NewTraceReader(conn)
431+
getTracesIter := reader.FindTraces(context.Background(), queryParams)
432+
_, err = jiter.FlattenWithErrors(getTracesIter)
433+
require.ErrorContains(t, err, "failed to execute FindTraces")
314434
}
315435

316436
func TestTraceReader_FindTraceIDs(t *testing.T) {

0 commit comments

Comments
 (0)