Skip to content

Commit d5ff217

Browse files
committed
Backport changes to confighttp and configgrpc
Upstream changes to address potential decompression-related failures in the `confighttp` and `configgrpc` modules are backported. See open-telemetry/opentelemetry-collector#10289 and open-telemetry/opentelemetry-collector#10323 for more details. Signed-off-by: Anthony J Mirabella <[email protected]>
1 parent 7449277 commit d5ff217

File tree

3 files changed

+199
-1
lines changed

3 files changed

+199
-1
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ for-all-target: $(GOMODULES)
5353

5454
PATCHES := $(shell find ./patches -name *.patch)
5555
apply-patches: $(PATCHES)
56-
$(foreach patch,$(PATCHES), patch --posix --forward -p1 < $(patch);)
56+
$(foreach patch,$(PATCHES), patch -V none --forward -p1 < $(patch);)
5757

5858
.PHONY: apply-patches
5959

patches/configgrpc.patch

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
diff --git a/vendor/go.opentelemetry.io/collector/config/configgrpc/configgrpc.go b/vendor/go.opentelemetry.io/collector/config/configgrpc/configgrpc.go
2+
index 87e7b83d7..e64b87142 100644
3+
--- a/vendor/go.opentelemetry.io/collector/config/configgrpc/configgrpc.go
4+
+++ b/vendor/go.opentelemetry.io/collector/config/configgrpc/configgrpc.go
5+
@@ -12,7 +12,6 @@ import (
6+
"time"
7+
8+
"github.com/mostynb/go-grpc-compression/nonclobbering/snappy"
9+
- "github.com/mostynb/go-grpc-compression/nonclobbering/zstd"
10+
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
11+
"go.opentelemetry.io/otel"
12+
"google.golang.org/grpc"
13+
@@ -28,6 +27,7 @@ import (
14+
"go.opentelemetry.io/collector/component"
15+
"go.opentelemetry.io/collector/config/configauth"
16+
"go.opentelemetry.io/collector/config/configcompression"
17+
+ grpcInternal "go.opentelemetry.io/collector/config/configgrpc/internal"
18+
"go.opentelemetry.io/collector/config/confignet"
19+
"go.opentelemetry.io/collector/config/configopaque"
20+
"go.opentelemetry.io/collector/config/configtelemetry"
21+
@@ -426,7 +426,7 @@ func getGRPCCompressionName(compressionType configcompression.Type) (string, err
22+
case configcompression.TypeSnappy:
23+
return snappy.Name, nil
24+
case configcompression.TypeZstd:
25+
- return zstd.Name, nil
26+
+ return grpcInternal.ZstdName, nil
27+
default:
28+
return "", fmt.Errorf("unsupported compression type %q", compressionType)
29+
}
30+
diff --git /dev/null b/vendor/go.opentelemetry.io/collector/config/configgrpc/internal/zstd.go
31+
new file mode 100644
32+
index 000000000..0718b7353
33+
--- /dev/null
34+
+++ b/vendor/go.opentelemetry.io/collector/config/configgrpc/internal/zstd.go
35+
@@ -0,0 +1,83 @@
36+
+// Copyright The OpenTelemetry Authors
37+
+// Copyright 2017 gRPC authors
38+
+// SPDX-License-Identifier: Apache-2.0
39+
+
40+
+package internal // import "go.opentelemetry.io/collector/config/configgrpc/internal"
41+
+
42+
+import (
43+
+ "errors"
44+
+ "io"
45+
+ "sync"
46+
+
47+
+ "github.com/klauspost/compress/zstd"
48+
+ "google.golang.org/grpc/encoding"
49+
+)
50+
+
51+
+const ZstdName = "zstd"
52+
+
53+
+func init() {
54+
+ encoding.RegisterCompressor(NewZstdCodec())
55+
+}
56+
+
57+
+type writer struct {
58+
+ *zstd.Encoder
59+
+ pool *sync.Pool
60+
+}
61+
+
62+
+func NewZstdCodec() encoding.Compressor {
63+
+ c := &compressor{}
64+
+ c.poolCompressor.New = func() any {
65+
+ zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1), zstd.WithWindowSize(512*1024))
66+
+ return &writer{Encoder: zw, pool: &c.poolCompressor}
67+
+ }
68+
+ return c
69+
+}
70+
+
71+
+func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) {
72+
+ z := c.poolCompressor.Get().(*writer)
73+
+ z.Encoder.Reset(w)
74+
+ return z, nil
75+
+}
76+
+
77+
+func (z *writer) Close() error {
78+
+ defer z.pool.Put(z)
79+
+ return z.Encoder.Close()
80+
+}
81+
+
82+
+type reader struct {
83+
+ *zstd.Decoder
84+
+ pool *sync.Pool
85+
+}
86+
+
87+
+func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
88+
+ z, inPool := c.poolDecompressor.Get().(*reader)
89+
+ if !inPool {
90+
+ newZ, err := zstd.NewReader(r)
91+
+ if err != nil {
92+
+ return nil, err
93+
+ }
94+
+ return &reader{Decoder: newZ, pool: &c.poolDecompressor}, nil
95+
+ }
96+
+ if err := z.Reset(r); err != nil {
97+
+ c.poolDecompressor.Put(z)
98+
+ return nil, err
99+
+ }
100+
+ return z, nil
101+
+}
102+
+
103+
+func (z *reader) Read(p []byte) (n int, err error) {
104+
+ n, err = z.Decoder.Read(p)
105+
+ if errors.Is(err, io.EOF) {
106+
+ z.pool.Put(z)
107+
+ }
108+
+ return n, err
109+
+}
110+
+
111+
+func (c *compressor) Name() string {
112+
+ return ZstdName
113+
+}
114+
+
115+
+type compressor struct {
116+
+ poolCompressor sync.Pool
117+
+ poolDecompressor sync.Pool
118+
+}

patches/confighttp.patch

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
diff --git a/vendor/go.opentelemetry.io/collector/config/confighttp/compression.go b/vendor/go.opentelemetry.io/collector/config/confighttp/compression.go
2+
index 88ecafe78..a700bec84 100644
3+
--- a/vendor/go.opentelemetry.io/collector/config/confighttp/compression.go
4+
+++ b/vendor/go.opentelemetry.io/collector/config/confighttp/compression.go
5+
@@ -67,24 +67,26 @@ func (r *compressRoundTripper) RoundTrip(req *http.Request) (*http.Response, err
6+
}
7+
8+
type decompressor struct {
9+
- errHandler func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int)
10+
- base http.Handler
11+
- decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)
12+
+ errHandler func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int)
13+
+ base http.Handler
14+
+ decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)
15+
+ maxRequestBodySize int64
16+
}
17+
18+
// httpContentDecompressor offloads the task of handling compressed HTTP requests
19+
// by identifying the compression format in the "Content-Encoding" header and re-writing
20+
// request body so that the handlers further in the chain can work on decompressed data.
21+
// It supports gzip and deflate/zlib compression.
22+
-func httpContentDecompressor(h http.Handler, eh func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int), decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)) http.Handler {
23+
+func httpContentDecompressor(h http.Handler, maxRequestBodySize int64, eh func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int), decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)) http.Handler {
24+
errHandler := defaultErrorHandler
25+
if eh != nil {
26+
errHandler = eh
27+
}
28+
29+
d := &decompressor{
30+
- errHandler: errHandler,
31+
- base: h,
32+
+ maxRequestBodySize: maxRequestBodySize,
33+
+ errHandler: errHandler,
34+
+ base: h,
35+
decoders: map[string]func(body io.ReadCloser) (io.ReadCloser, error){
36+
"": func(io.ReadCloser) (io.ReadCloser, error) {
37+
// Not a compressed payload. Nothing to do.
38+
@@ -155,7 +157,7 @@ func (d *decompressor) ServeHTTP(w http.ResponseWriter, r *http.Request) {
39+
// "Content-Length" is set to -1 as the size of the decompressed body is unknown.
40+
r.Header.Del("Content-Length")
41+
r.ContentLength = -1
42+
- r.Body = newBody
43+
+ r.Body = http.MaxBytesReader(w, newBody, d.maxRequestBodySize)
44+
}
45+
d.base.ServeHTTP(w, r)
46+
}
47+
diff --git a/vendor/go.opentelemetry.io/collector/config/confighttp/confighttp.go b/vendor/go.opentelemetry.io/collector/config/confighttp/confighttp.go
48+
index b210fa0dd..71b2f17ee 100644
49+
--- a/vendor/go.opentelemetry.io/collector/config/confighttp/confighttp.go
50+
+++ b/vendor/go.opentelemetry.io/collector/config/confighttp/confighttp.go
51+
@@ -30,6 +30,7 @@ import (
52+
)
53+
54+
const headerContentEncoding = "Content-Encoding"
55+
+const defaultMaxRequestBodySize = 20 * 1024 * 1024 // 20MiB
56+
57+
// ClientConfig defines settings for creating an HTTP client.
58+
type ClientConfig struct {
59+
@@ -269,7 +270,7 @@ type ServerConfig struct {
60+
// Auth for this receiver
61+
Auth *configauth.Authentication `mapstructure:"auth"`
62+
63+
- // MaxRequestBodySize sets the maximum request body size in bytes
64+
+ // MaxRequestBodySize sets the maximum request body size in bytes. Default: 20MiB.
65+
MaxRequestBodySize int64 `mapstructure:"max_request_body_size"`
66+
67+
// IncludeMetadata propagates the client metadata from the incoming requests to the downstream consumers
68+
@@ -340,7 +341,11 @@ func (hss *ServerConfig) ToServer(_ context.Context, host component.Host, settin
69+
o(serverOpts)
70+
}
71+
72+
- handler = httpContentDecompressor(handler, serverOpts.errHandler, serverOpts.decoders)
73+
+ if hss.MaxRequestBodySize <= 0 {
74+
+ hss.MaxRequestBodySize = defaultMaxRequestBodySize
75+
+ }
76+
+
77+
+ handler = httpContentDecompressor(handler, hss.MaxRequestBodySize, serverOpts.errHandler, serverOpts.decoders)
78+
79+
if hss.MaxRequestBodySize > 0 {
80+
handler = maxRequestBodySizeInterceptor(handler, hss.MaxRequestBodySize)

0 commit comments

Comments
 (0)