Skip to content

Commit 20c5453

Browse files
committed
Return proper http response code based on retryable error
1 parent 9047c0e commit 20c5453

File tree

4 files changed

+143
-38
lines changed

4 files changed

+143
-38
lines changed

receiver/otlpreceiver/internal/errors/errors.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
package errors // import "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/errors"
55

66
import (
7+
"net/http"
8+
79
"google.golang.org/grpc/codes"
810
"google.golang.org/grpc/status"
911

@@ -23,3 +25,22 @@ func GetStatusFromError(err error) error {
2325
}
2426
return s.Err()
2527
}
28+
29+
func GetHTTPStatusCodeFromStatus(err error) int {
30+
s, ok := status.FromError(err)
31+
if !ok {
32+
return http.StatusInternalServerError
33+
}
34+
// See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures
35+
// to see if a code is retryable.
36+
// See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures-1
37+
// to see a list of retryable http status codes.
38+
switch s.Code() {
39+
// Retryable
40+
case codes.Canceled, codes.DeadlineExceeded, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss:
41+
return http.StatusServiceUnavailable
42+
// Not Retryable
43+
default:
44+
return http.StatusInternalServerError
45+
}
46+
}

receiver/otlpreceiver/internal/errors/errors_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package errors // import "go.opentelemetry.io/collector/receiver/otlpreceiver/in
55

66
import (
77
"fmt"
8+
"net/http"
89
"testing"
910

1011
"github.com/stretchr/testify/assert"
@@ -43,3 +44,33 @@ func Test_GetStatusFromError(t *testing.T) {
4344
})
4445
}
4546
}
47+
48+
func Test_GetHTTPStatusCodeFromStatus(t *testing.T) {
49+
tests := []struct {
50+
name string
51+
input error
52+
expected int
53+
}{
54+
{
55+
name: "Not a Status",
56+
input: fmt.Errorf("not a status error"),
57+
expected: http.StatusInternalServerError,
58+
},
59+
{
60+
name: "Retryable Status",
61+
input: status.New(codes.Unavailable, "test").Err(),
62+
expected: http.StatusServiceUnavailable,
63+
},
64+
{
65+
name: "Non-retryable Status",
66+
input: status.New(codes.InvalidArgument, "test").Err(),
67+
expected: http.StatusInternalServerError,
68+
},
69+
}
70+
for _, tt := range tests {
71+
t.Run(tt.name, func(t *testing.T) {
72+
result := GetHTTPStatusCodeFromStatus(tt.input)
73+
assert.Equal(t, tt.expected, result)
74+
})
75+
}
76+
}

receiver/otlpreceiver/otlp_test.go

Lines changed: 87 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,12 @@ var otlpReceiverID = component.NewIDWithName("otlp", otlpReceiverName)
5252

5353
func TestJsonHttp(t *testing.T) {
5454
tests := []struct {
55-
name string
56-
encoding string
57-
contentType string
58-
err error
55+
name string
56+
encoding string
57+
contentType string
58+
err error
59+
expectedStatus *spb.Status
60+
expectedStatusCode int
5961
}{
6062
{
6163
name: "JSONUncompressed",
@@ -83,16 +85,36 @@ func TestJsonHttp(t *testing.T) {
8385
contentType: "application/json",
8486
},
8587
{
86-
name: "NotGRPCError",
87-
encoding: "",
88-
contentType: "application/json",
89-
err: errors.New("my error"),
88+
name: "Permanent NotGRPCError",
89+
encoding: "",
90+
contentType: "application/json",
91+
err: consumererror.NewPermanent(errors.New("my error")),
92+
expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: "Permanent error: my error"},
93+
expectedStatusCode: http.StatusInternalServerError,
9094
},
9195
{
92-
name: "GRPCError",
93-
encoding: "",
94-
contentType: "application/json",
95-
err: status.New(codes.Unavailable, "").Err(),
96+
name: "Retryable NotGRPCError",
97+
encoding: "",
98+
contentType: "application/json",
99+
err: errors.New("my error"),
100+
expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: "my error"},
101+
expectedStatusCode: http.StatusServiceUnavailable,
102+
},
103+
{
104+
name: "Permanent GRPCError",
105+
encoding: "",
106+
contentType: "application/json",
107+
err: status.New(codes.InvalidArgument, "").Err(),
108+
expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: ""},
109+
expectedStatusCode: http.StatusInternalServerError,
110+
},
111+
{
112+
name: "Retryable GRPCError",
113+
encoding: "",
114+
contentType: "application/json",
115+
err: status.New(codes.Unavailable, "").Err(),
116+
expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: ""},
117+
expectedStatusCode: http.StatusServiceUnavailable,
96118
},
97119
}
98120
addr := testutil.GetAvailableLocalAddress(t)
@@ -108,7 +130,7 @@ func TestJsonHttp(t *testing.T) {
108130

109131
for _, dr := range generateDataRequests(t) {
110132
url := "http://" + addr + dr.path
111-
respBytes := doHTTPRequest(t, url, tt.encoding, tt.contentType, dr.jsonBytes, tt.err != nil)
133+
respBytes := doHTTPRequest(t, url, tt.encoding, tt.contentType, dr.jsonBytes, tt.expectedStatusCode)
112134
if tt.err == nil {
113135
tr := ptraceotlp.NewExportResponse()
114136
assert.NoError(t, tr.UnmarshalJSON(respBytes), "Unable to unmarshal response to Response")
@@ -120,7 +142,7 @@ func TestJsonHttp(t *testing.T) {
120142
assert.True(t, proto.Equal(errStatus, s.Proto()))
121143
} else {
122144
fmt.Println(errStatus)
123-
assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unavailable), Message: "my error"}))
145+
assert.True(t, proto.Equal(errStatus, tt.expectedStatus))
124146
}
125147
sink.checkData(t, dr.data, 0)
126148
}
@@ -302,9 +324,11 @@ func TestHandleInvalidRequests(t *testing.T) {
302324

303325
func TestProtoHttp(t *testing.T) {
304326
tests := []struct {
305-
name string
306-
encoding string
307-
err error
327+
name string
328+
encoding string
329+
err error
330+
expectedStatus *spb.Status
331+
expectedStatusCode int
308332
}{
309333
{
310334
name: "ProtoUncompressed",
@@ -319,14 +343,32 @@ func TestProtoHttp(t *testing.T) {
319343
encoding: "zstd",
320344
},
321345
{
322-
name: "NotGRPCError",
323-
encoding: "",
324-
err: errors.New("my error"),
346+
name: "Permanent NotGRPCError",
347+
encoding: "",
348+
err: consumererror.NewPermanent(errors.New("my error")),
349+
expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: "Permanent error: my error"},
350+
expectedStatusCode: http.StatusInternalServerError,
325351
},
326352
{
327-
name: "GRPCError",
328-
encoding: "",
329-
err: status.New(codes.Unavailable, "").Err(),
353+
name: "Retryable NotGRPCError",
354+
encoding: "",
355+
err: errors.New("my error"),
356+
expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: "my error"},
357+
expectedStatusCode: http.StatusServiceUnavailable,
358+
},
359+
{
360+
name: "Permanent GRPCError",
361+
encoding: "",
362+
err: status.New(codes.InvalidArgument, "").Err(),
363+
expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: ""},
364+
expectedStatusCode: http.StatusInternalServerError,
365+
},
366+
{
367+
name: "Retryable GRPCError",
368+
encoding: "",
369+
err: status.New(codes.Unavailable, "").Err(),
370+
expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: ""},
371+
expectedStatusCode: http.StatusServiceUnavailable,
330372
},
331373
}
332374
addr := testutil.GetAvailableLocalAddress(t)
@@ -345,7 +387,7 @@ func TestProtoHttp(t *testing.T) {
345387

346388
for _, dr := range generateDataRequests(t) {
347389
url := "http://" + addr + dr.path
348-
respBytes := doHTTPRequest(t, url, tt.encoding, "application/x-protobuf", dr.protoBytes, tt.err != nil)
390+
respBytes := doHTTPRequest(t, url, tt.encoding, "application/x-protobuf", dr.protoBytes, tt.expectedStatusCode)
349391
if tt.err == nil {
350392
tr := ptraceotlp.NewExportResponse()
351393
assert.NoError(t, tr.UnmarshalProto(respBytes))
@@ -356,7 +398,7 @@ func TestProtoHttp(t *testing.T) {
356398
if s, ok := status.FromError(tt.err); ok {
357399
assert.True(t, proto.Equal(errStatus, s.Proto()))
358400
} else {
359-
assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unavailable), Message: "my error"}))
401+
assert.True(t, proto.Equal(errStatus, tt.expectedStatus))
360402
}
361403
sink.checkData(t, dr.data, 0)
362404
}
@@ -560,20 +602,30 @@ func TestOTLPReceiverGRPCTracesIngestTest(t *testing.T) {
560602
// trace receiver.
561603
func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) {
562604
type ingestionStateTest struct {
563-
okToIngest bool
564-
expectedCode codes.Code
605+
okToIngest bool
606+
err error
607+
expectedCode codes.Code
608+
expectedStatusCode int
565609
}
566610

567611
expectedReceivedBatches := 2
568-
expectedIngestionBlockedRPCs := 1
612+
expectedIngestionBlockedRPCs := 2
569613
ingestionStates := []ingestionStateTest{
570614
{
571615
okToIngest: true,
572616
expectedCode: codes.OK,
573617
},
574618
{
575-
okToIngest: false,
576-
expectedCode: codes.Unavailable,
619+
okToIngest: false,
620+
err: consumererror.NewPermanent(errors.New("consumer error")),
621+
expectedCode: codes.InvalidArgument,
622+
expectedStatusCode: http.StatusInternalServerError,
623+
},
624+
{
625+
okToIngest: false,
626+
err: errors.New("consumer error"),
627+
expectedCode: codes.Unavailable,
628+
expectedStatusCode: http.StatusServiceUnavailable,
577629
},
578630
{
579631
okToIngest: true,
@@ -599,7 +651,7 @@ func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) {
599651
if ingestionState.okToIngest {
600652
sink.SetConsumeError(nil)
601653
} else {
602-
sink.SetConsumeError(errors.New("consumer error"))
654+
sink.SetConsumeError(ingestionState.err)
603655
}
604656

605657
pbMarshaler := ptrace.ProtoMarshaler{}
@@ -620,7 +672,7 @@ func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) {
620672
} else {
621673
errStatus := &spb.Status{}
622674
assert.NoError(t, proto.Unmarshal(respBytes, errStatus))
623-
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
675+
assert.Equal(t, ingestionState.expectedStatusCode, resp.StatusCode)
624676
assert.Equal(t, ingestionState.expectedCode, codes.Code(errStatus.Code))
625677
}
626678
}
@@ -853,7 +905,7 @@ func doHTTPRequest(
853905
encoding string,
854906
contentType string,
855907
data []byte,
856-
expectErr bool,
908+
expectStatusCode int,
857909
) []byte {
858910
req := createHTTPRequest(t, url, encoding, contentType, data)
859911
resp, err := http.DefaultClient.Do(req)
@@ -866,10 +918,10 @@ func doHTTPRequest(
866918
// For cases like "application/json; charset=utf-8", the response will be only "application/json"
867919
require.True(t, strings.HasPrefix(strings.ToLower(contentType), resp.Header.Get("Content-Type")))
868920

869-
if !expectErr {
921+
if expectStatusCode == 0 {
870922
require.Equal(t, http.StatusOK, resp.StatusCode)
871923
} else {
872-
require.Equal(t, http.StatusInternalServerError, resp.StatusCode)
924+
require.Equal(t, expectStatusCode, resp.StatusCode)
873925
}
874926

875927
return respBytes

receiver/otlpreceiver/otlphttp.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"mime"
1010
"net/http"
1111

12+
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/errors"
1213
spb "google.golang.org/genproto/googleapis/rpc/status"
1314
"google.golang.org/grpc/codes"
1415
"google.golang.org/grpc/status"
@@ -42,7 +43,7 @@ func handleTraces(resp http.ResponseWriter, req *http.Request, tracesReceiver *t
4243

4344
otlpResp, err := tracesReceiver.Export(req.Context(), otlpReq)
4445
if err != nil {
45-
writeError(resp, enc, err, http.StatusInternalServerError)
46+
writeError(resp, enc, err, errors.GetHTTPStatusCodeFromStatus(err))
4647
return
4748
}
4849

@@ -73,7 +74,7 @@ func handleMetrics(resp http.ResponseWriter, req *http.Request, metricsReceiver
7374

7475
otlpResp, err := metricsReceiver.Export(req.Context(), otlpReq)
7576
if err != nil {
76-
writeError(resp, enc, err, http.StatusInternalServerError)
77+
writeError(resp, enc, err, errors.GetHTTPStatusCodeFromStatus(err))
7778
return
7879
}
7980

@@ -104,7 +105,7 @@ func handleLogs(resp http.ResponseWriter, req *http.Request, logsReceiver *logs.
104105

105106
otlpResp, err := logsReceiver.Export(req.Context(), otlpReq)
106107
if err != nil {
107-
writeError(resp, enc, err, http.StatusInternalServerError)
108+
writeError(resp, enc, err, errors.GetHTTPStatusCodeFromStatus(err))
108109
return
109110
}
110111

0 commit comments

Comments
 (0)