Skip to content

Commit 9e8e503

Browse files
committed
copy open-telemetry#9357 changes for testing
1 parent 15201f1 commit 9e8e503

File tree

4 files changed

+151
-38
lines changed

4 files changed

+151
-38
lines changed

receiver/otlpreceiver/internal/errors/errors.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
package errors // import "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/errors"
55

66
import (
7+
"net/http"
8+
79
"google.golang.org/grpc/codes"
810
"google.golang.org/grpc/status"
911

@@ -24,3 +26,25 @@ func GetStatusFromError(err error) error {
2426
}
2527
return s.Err()
2628
}
29+
30+
func GetHTTPStatusCodeFromStatus(err error) int {
31+
s, ok := status.FromError(err)
32+
if !ok {
33+
return http.StatusInternalServerError
34+
}
35+
// See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures
36+
// to see if a code is retryable.
37+
// See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures-1
38+
// to see a list of retryable http status codes.
39+
switch s.Code() {
40+
// Retryable
41+
case codes.Canceled, codes.DeadlineExceeded, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss:
42+
return http.StatusServiceUnavailable
43+
// Retryable
44+
case codes.ResourceExhausted:
45+
return http.StatusTooManyRequests
46+
// Not Retryable
47+
default:
48+
return http.StatusInternalServerError
49+
}
50+
}

receiver/otlpreceiver/internal/errors/errors_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package errors // import "go.opentelemetry.io/collector/receiver/otlpreceiver/in
55

66
import (
77
"fmt"
8+
"net/http"
89
"testing"
910

1011
"github.com/stretchr/testify/assert"
@@ -43,3 +44,38 @@ func Test_GetStatusFromError(t *testing.T) {
4344
})
4445
}
4546
}
47+
48+
func Test_GetHTTPStatusCodeFromStatus(t *testing.T) {
49+
tests := []struct {
50+
name string
51+
input error
52+
expected int
53+
}{
54+
{
55+
name: "Not a Status",
56+
input: fmt.Errorf("not a status error"),
57+
expected: http.StatusInternalServerError,
58+
},
59+
{
60+
name: "Retryable Status",
61+
input: status.New(codes.Unavailable, "test").Err(),
62+
expected: http.StatusServiceUnavailable,
63+
},
64+
{
65+
name: "Non-retryable Status",
66+
input: status.New(codes.InvalidArgument, "test").Err(),
67+
expected: http.StatusInternalServerError,
68+
},
69+
{
70+
name: "Specifically 429",
71+
input: status.New(codes.ResourceExhausted, "test").Err(),
72+
expected: http.StatusTooManyRequests,
73+
},
74+
}
75+
for _, tt := range tests {
76+
t.Run(tt.name, func(t *testing.T) {
77+
result := GetHTTPStatusCodeFromStatus(tt.input)
78+
assert.Equal(t, tt.expected, result)
79+
})
80+
}
81+
}

receiver/otlpreceiver/otlp_test.go

Lines changed: 87 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,12 @@ var otlpReceiverID = component.MustNewIDWithName("otlp", otlpReceiverName)
5252

5353
func TestJsonHttp(t *testing.T) {
5454
tests := []struct {
55-
name string
56-
encoding string
57-
contentType string
58-
err error
55+
name string
56+
encoding string
57+
contentType string
58+
err error
59+
expectedStatus *spb.Status
60+
expectedStatusCode int
5961
}{
6062
{
6163
name: "JSONUncompressed",
@@ -83,16 +85,36 @@ func TestJsonHttp(t *testing.T) {
8385
contentType: "application/json",
8486
},
8587
{
86-
name: "NotGRPCError",
87-
encoding: "",
88-
contentType: "application/json",
89-
err: errors.New("my error"),
88+
name: "Permanent NotGRPCError",
89+
encoding: "",
90+
contentType: "application/json",
91+
err: consumererror.NewPermanent(errors.New("my error")),
92+
expectedStatus: &spb.Status{Code: int32(codes.Internal), Message: "Permanent error: my error"},
93+
expectedStatusCode: http.StatusInternalServerError,
9094
},
9195
{
92-
name: "GRPCError",
93-
encoding: "",
94-
contentType: "application/json",
95-
err: status.New(codes.Unavailable, "").Err(),
96+
name: "Retryable NotGRPCError",
97+
encoding: "",
98+
contentType: "application/json",
99+
err: errors.New("my error"),
100+
expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: "my error"},
101+
expectedStatusCode: http.StatusServiceUnavailable,
102+
},
103+
{
104+
name: "Permanent GRPCError",
105+
encoding: "",
106+
contentType: "application/json",
107+
err: status.New(codes.InvalidArgument, "").Err(),
108+
expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: ""},
109+
expectedStatusCode: http.StatusInternalServerError,
110+
},
111+
{
112+
name: "Retryable GRPCError",
113+
encoding: "",
114+
contentType: "application/json",
115+
err: status.New(codes.Unavailable, "").Err(),
116+
expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: ""},
117+
expectedStatusCode: http.StatusServiceUnavailable,
96118
},
97119
}
98120
addr := testutil.GetAvailableLocalAddress(t)
@@ -108,7 +130,7 @@ func TestJsonHttp(t *testing.T) {
108130

109131
for _, dr := range generateDataRequests(t) {
110132
url := "http://" + addr + dr.path
111-
respBytes := doHTTPRequest(t, url, tt.encoding, tt.contentType, dr.jsonBytes, tt.err != nil)
133+
respBytes := doHTTPRequest(t, url, tt.encoding, tt.contentType, dr.jsonBytes, tt.expectedStatusCode)
112134
if tt.err == nil {
113135
tr := ptraceotlp.NewExportResponse()
114136
assert.NoError(t, tr.UnmarshalJSON(respBytes), "Unable to unmarshal response to Response")
@@ -120,7 +142,7 @@ func TestJsonHttp(t *testing.T) {
120142
assert.True(t, proto.Equal(errStatus, s.Proto()))
121143
} else {
122144
fmt.Println(errStatus)
123-
assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unavailable), Message: "my error"}))
145+
assert.True(t, proto.Equal(errStatus, tt.expectedStatus))
124146
}
125147
sink.checkData(t, dr.data, 0)
126148
}
@@ -302,9 +324,11 @@ func TestHandleInvalidRequests(t *testing.T) {
302324

303325
func TestProtoHttp(t *testing.T) {
304326
tests := []struct {
305-
name string
306-
encoding string
307-
err error
327+
name string
328+
encoding string
329+
err error
330+
expectedStatus *spb.Status
331+
expectedStatusCode int
308332
}{
309333
{
310334
name: "ProtoUncompressed",
@@ -319,14 +343,32 @@ func TestProtoHttp(t *testing.T) {
319343
encoding: "zstd",
320344
},
321345
{
322-
name: "NotGRPCError",
323-
encoding: "",
324-
err: errors.New("my error"),
346+
name: "Permanent NotGRPCError",
347+
encoding: "",
348+
err: consumererror.NewPermanent(errors.New("my error")),
349+
expectedStatus: &spb.Status{Code: int32(codes.Internal), Message: "Permanent error: my error"},
350+
expectedStatusCode: http.StatusInternalServerError,
325351
},
326352
{
327-
name: "GRPCError",
328-
encoding: "",
329-
err: status.New(codes.Unavailable, "").Err(),
353+
name: "Retryable NotGRPCError",
354+
encoding: "",
355+
err: errors.New("my error"),
356+
expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: "my error"},
357+
expectedStatusCode: http.StatusServiceUnavailable,
358+
},
359+
{
360+
name: "Permanent GRPCError",
361+
encoding: "",
362+
err: status.New(codes.InvalidArgument, "").Err(),
363+
expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: ""},
364+
expectedStatusCode: http.StatusInternalServerError,
365+
},
366+
{
367+
name: "Retryable GRPCError",
368+
encoding: "",
369+
err: status.New(codes.Unavailable, "").Err(),
370+
expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: ""},
371+
expectedStatusCode: http.StatusServiceUnavailable,
330372
},
331373
}
332374
addr := testutil.GetAvailableLocalAddress(t)
@@ -345,7 +387,7 @@ func TestProtoHttp(t *testing.T) {
345387

346388
for _, dr := range generateDataRequests(t) {
347389
url := "http://" + addr + dr.path
348-
respBytes := doHTTPRequest(t, url, tt.encoding, "application/x-protobuf", dr.protoBytes, tt.err != nil)
390+
respBytes := doHTTPRequest(t, url, tt.encoding, "application/x-protobuf", dr.protoBytes, tt.expectedStatusCode)
349391
if tt.err == nil {
350392
tr := ptraceotlp.NewExportResponse()
351393
assert.NoError(t, tr.UnmarshalProto(respBytes))
@@ -356,7 +398,7 @@ func TestProtoHttp(t *testing.T) {
356398
if s, ok := status.FromError(tt.err); ok {
357399
assert.True(t, proto.Equal(errStatus, s.Proto()))
358400
} else {
359-
assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unavailable), Message: "my error"}))
401+
assert.True(t, proto.Equal(errStatus, tt.expectedStatus))
360402
}
361403
sink.checkData(t, dr.data, 0)
362404
}
@@ -560,20 +602,30 @@ func TestOTLPReceiverGRPCTracesIngestTest(t *testing.T) {
560602
// trace receiver.
561603
func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) {
562604
type ingestionStateTest struct {
563-
okToIngest bool
564-
expectedCode codes.Code
605+
okToIngest bool
606+
err error
607+
expectedCode codes.Code
608+
expectedStatusCode int
565609
}
566610

567611
expectedReceivedBatches := 2
568-
expectedIngestionBlockedRPCs := 1
612+
expectedIngestionBlockedRPCs := 2
569613
ingestionStates := []ingestionStateTest{
570614
{
571615
okToIngest: true,
572616
expectedCode: codes.OK,
573617
},
574618
{
575-
okToIngest: false,
576-
expectedCode: codes.Unavailable,
619+
okToIngest: false,
620+
err: consumererror.NewPermanent(errors.New("consumer error")),
621+
expectedCode: codes.Internal,
622+
expectedStatusCode: http.StatusInternalServerError,
623+
},
624+
{
625+
okToIngest: false,
626+
err: errors.New("consumer error"),
627+
expectedCode: codes.Unavailable,
628+
expectedStatusCode: http.StatusServiceUnavailable,
577629
},
578630
{
579631
okToIngest: true,
@@ -599,7 +651,7 @@ func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) {
599651
if ingestionState.okToIngest {
600652
sink.SetConsumeError(nil)
601653
} else {
602-
sink.SetConsumeError(errors.New("consumer error"))
654+
sink.SetConsumeError(ingestionState.err)
603655
}
604656

605657
pbMarshaler := ptrace.ProtoMarshaler{}
@@ -620,7 +672,7 @@ func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) {
620672
} else {
621673
errStatus := &spb.Status{}
622674
assert.NoError(t, proto.Unmarshal(respBytes, errStatus))
623-
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
675+
assert.Equal(t, ingestionState.expectedStatusCode, resp.StatusCode)
624676
assert.Equal(t, ingestionState.expectedCode, codes.Code(errStatus.Code))
625677
}
626678
}
@@ -853,7 +905,7 @@ func doHTTPRequest(
853905
encoding string,
854906
contentType string,
855907
data []byte,
856-
expectErr bool,
908+
expectStatusCode int,
857909
) []byte {
858910
req := createHTTPRequest(t, url, encoding, contentType, data)
859911
resp, err := http.DefaultClient.Do(req)
@@ -866,10 +918,10 @@ func doHTTPRequest(
866918
// For cases like "application/json; charset=utf-8", the response will be only "application/json"
867919
require.True(t, strings.HasPrefix(strings.ToLower(contentType), resp.Header.Get("Content-Type")))
868920

869-
if !expectErr {
921+
if expectStatusCode == 0 {
870922
require.Equal(t, http.StatusOK, resp.StatusCode)
871923
} else {
872-
require.Equal(t, http.StatusInternalServerError, resp.StatusCode)
924+
require.Equal(t, expectStatusCode, resp.StatusCode)
873925
}
874926

875927
return respBytes

receiver/otlpreceiver/otlphttp.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"google.golang.org/grpc/codes"
1414
"google.golang.org/grpc/status"
1515

16+
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/errors"
1617
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/logs"
1718
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/metrics"
1819
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/trace"
@@ -42,7 +43,7 @@ func handleTraces(resp http.ResponseWriter, req *http.Request, tracesReceiver *t
4243

4344
otlpResp, err := tracesReceiver.Export(req.Context(), otlpReq)
4445
if err != nil {
45-
writeError(resp, enc, err, http.StatusInternalServerError)
46+
writeError(resp, enc, err, errors.GetHTTPStatusCodeFromStatus(err))
4647
return
4748
}
4849

@@ -73,7 +74,7 @@ func handleMetrics(resp http.ResponseWriter, req *http.Request, metricsReceiver
7374

7475
otlpResp, err := metricsReceiver.Export(req.Context(), otlpReq)
7576
if err != nil {
76-
writeError(resp, enc, err, http.StatusInternalServerError)
77+
writeError(resp, enc, err, errors.GetHTTPStatusCodeFromStatus(err))
7778
return
7879
}
7980

@@ -104,7 +105,7 @@ func handleLogs(resp http.ResponseWriter, req *http.Request, logsReceiver *logs.
104105

105106
otlpResp, err := logsReceiver.Export(req.Context(), otlpReq)
106107
if err != nil {
107-
writeError(resp, enc, err, http.StatusInternalServerError)
108+
writeError(resp, enc, err, errors.GetHTTPStatusCodeFromStatus(err))
108109
return
109110
}
110111

0 commit comments

Comments
 (0)