diff --git a/.chloggen/snappy-done-right.yaml b/.chloggen/snappy-done-right.yaml new file mode 100644 index 00000000000..08598f40703 --- /dev/null +++ b/.chloggen/snappy-done-right.yaml @@ -0,0 +1,37 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: confighttp + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Fix handling of `snappy` content-encoding in a backwards-compatible way" + +# One or more tracking issues or pull requests related to the change +issues: [10584, 12825] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + The collector used the Snappy compression type of "framed" to handle the HTTP + content-encoding "snappy". However, this encoding is typically used to indicate + the "block" compression variant of "snappy". This change allows the collector to: + - The server endpoints will now accept "x-snappy-framed" as a valid + content-encoding. + - Client compression type "snappy" will now compress to the "block" variant of snappy + instead of "framed". If you want the old behavior, you can set the + compression type to "x-snappy-framed". + - When a server endpoint receives a content-encoding of "snappy", it will + look at the first bytes of the payload to determine if it is "framed" or "block" snappy, + and will decompress accordingly. This is a backwards-compatible change. + - In a future release, this checking behavior may be removed. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/config/configcompression/compressiontype.go b/config/configcompression/compressiontype.go index d2bc4b16d5b..a6588dca720 100644 --- a/config/configcompression/compressiontype.go +++ b/config/configcompression/compressiontype.go @@ -22,6 +22,7 @@ const ( TypeZlib Type = "zlib" TypeDeflate Type = "deflate" TypeSnappy Type = "snappy" + TypeSnappyFramed Type = "x-snappy-framed" TypeZstd Type = "zstd" TypeLz4 Type = "lz4" typeNone Type = "none" @@ -41,6 +42,7 @@ func (ct *Type) UnmarshalText(in []byte) error { typ == TypeZlib || typ == TypeDeflate || typ == TypeSnappy || + typ == TypeSnappyFramed || typ == TypeZstd || typ == TypeLz4 || typ == typeNone || diff --git a/config/configcompression/compressiontype_test.go b/config/configcompression/compressiontype_test.go index 2fb9976eb4e..8225c880639 100644 --- a/config/configcompression/compressiontype_test.go +++ b/config/configcompression/compressiontype_test.go @@ -42,6 +42,12 @@ func TestUnmarshalText(t *testing.T) { shouldError: false, isCompressed: true, }, + { + name: "ValidSnappyFramed", + compressionName: []byte("x-snappy-framed"), + shouldError: false, + isCompressed: true, + }, { name: "ValidZstd", compressionName: []byte("zstd"), @@ -128,6 +134,17 @@ func TestValidateParams(t *testing.T) { compressionLevel: 1, shouldError: true, }, + { + name: "ValidSnappyFramed", + compressionName: []byte("x-snappy-framed"), + shouldError: false, + }, + { + name: "InvalidSnappyFramed", + compressionName: []byte("x-snappy-framed"), + compressionLevel: 1, + shouldError: true, + }, { name: "ValidZstd", compressionName: []byte("zstd"), diff --git a/config/confighttp/compression.go b/config/confighttp/compression.go index 62638e0c7be..e6de8e1b0c9 100644 --- a/config/confighttp/compression.go +++ b/config/confighttp/compression.go @@ -6,9 +6,11 @@ package confighttp // import "go.opentelemetry.io/collector/config/confighttp" import ( + "bufio" "bytes" "compress/gzip" "compress/zlib" + "errors" "fmt" "io" "net/http" @@ -27,6 +29,45 @@ type compressRoundTripper struct { compressor *compressor } +// snappyFramingHeader is always the first 10 bytes of a snappy framed stream. +var snappyFramingHeader = []byte{ + 0xff, 0x06, 0x00, 0x00, + 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59, // "sNaPpY" +} + +// snappyHandler returns an io.ReadCloser that auto-detects the snappy format. +// This is necessary because the collector previously used "content-encoding: snappy" +// but decompressed and compressed the payloads using the snappy framing format. +// However, "content-encoding: snappy" is uses the block format, and "x-snappy-framed" +// is the framing format. This handler is a (hopefully temporary) hack to +// make this work in a backwards-compatible way. +func snappyHandler(body io.ReadCloser) (io.ReadCloser, error) { + br := bufio.NewReader(body) + + peekBytes, err := br.Peek(len(snappyFramingHeader)) + if err != nil && !errors.Is(err, io.EOF) { + return nil, err + } + + isFramed := len(peekBytes) >= len(snappyFramingHeader) && bytes.Equal(peekBytes[:len(snappyFramingHeader)], snappyFramingHeader) + + if isFramed { + return &compressReadCloser{ + Reader: snappy.NewReader(br), + orig: body, + }, nil + } + compressed, err := io.ReadAll(br) + if err != nil { + return nil, err + } + decoded, err := snappy.Decode(nil, compressed) + if err != nil { + return nil, err + } + return io.NopCloser(bytes.NewReader(decoded)), nil +} + var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, error){ "": func(io.ReadCloser) (io.ReadCloser, error) { // Not a compressed payload. Nothing to do. @@ -60,8 +101,9 @@ var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, erro } return zr, nil }, + "snappy": snappyHandler, //nolint:unparam // Ignoring the linter request to remove error return since it needs to match the method signature - "snappy": func(body io.ReadCloser) (io.ReadCloser, error) { + "x-snappy-framed": func(body io.ReadCloser) (io.ReadCloser, error) { // Lazy Reading content to improve memory efficiency return &compressReadCloser{ Reader: snappy.NewReader(body), diff --git a/config/confighttp/compression_test.go b/config/confighttp/compression_test.go index 4c41e8085ff..b3e7f950b7b 100644 --- a/config/confighttp/compression_test.go +++ b/config/confighttp/compression_test.go @@ -32,6 +32,7 @@ func TestHTTPClientCompression(t *testing.T) { compressedGzipBody := compressGzip(t, testBody) compressedZlibBody := compressZlib(t, testBody) compressedDeflateBody := compressZlib(t, testBody) + compressedSnappyFramedBody := compressSnappyFramed(t, testBody) compressedSnappyBody := compressSnappy(t, testBody) compressedZstdBody := compressZstd(t, testBody) compressedLz4Body := compressLz4(t, testBody) @@ -111,6 +112,19 @@ func TestHTTPClientCompression(t *testing.T) { reqBody: compressedSnappyBody.Bytes(), shouldError: true, }, + { + name: "ValidSnappyFramed", + encoding: configcompression.TypeSnappyFramed, + reqBody: compressedSnappyFramedBody.Bytes(), + shouldError: false, + }, + { + name: "InvalidSnappyFramed", + encoding: configcompression.TypeSnappyFramed, + level: gzip.DefaultCompression, + reqBody: compressedSnappyFramedBody.Bytes(), + shouldError: true, + }, { name: "ValidZstd", encoding: configcompression.TypeZstd, @@ -250,12 +264,24 @@ func TestHTTPContentDecompressionHandler(t *testing.T) { reqBody: compressZstd(t, testBody), respCode: http.StatusOK, }, + { + name: "ValidSnappyFramed", + encoding: "x-snappy-framed", + reqBody: compressSnappyFramed(t, testBody), + respCode: http.StatusOK, + }, { name: "ValidSnappy", encoding: "snappy", reqBody: compressSnappy(t, testBody), respCode: http.StatusOK, }, + { + name: "ValidSnappyFramedAsSnappy", + encoding: "snappy", + reqBody: compressSnappyFramed(t, testBody), + respCode: http.StatusOK, + }, { name: "ValidLz4", encoding: "lz4", @@ -290,12 +316,19 @@ func TestHTTPContentDecompressionHandler(t *testing.T) { respCode: http.StatusBadRequest, respBody: "invalid input: magic number mismatch", }, + { + name: "InvalidSnappyFramed", + encoding: "x-snappy-framed", + reqBody: bytes.NewBuffer(testBody), + respCode: http.StatusBadRequest, + respBody: "snappy: corrupt input", + }, { name: "InvalidSnappy", encoding: "snappy", reqBody: bytes.NewBuffer(testBody), respCode: http.StatusBadRequest, - respBody: "snappy: corrupt input", + respBody: "snappy: corrupt input\n", }, { name: "UnsupportedCompression", @@ -415,7 +448,7 @@ func TestOverrideCompressionList(t *testing.T) { }), defaultMaxRequestBodySize, defaultErrorHandler, configuredDecoders, nil)) t.Cleanup(srv.Close) - req, err := http.NewRequest(http.MethodGet, srv.URL, compressSnappy(t, []byte("123decompressed body"))) + req, err := http.NewRequest(http.MethodGet, srv.URL, compressSnappyFramed(t, []byte("123decompressed body"))) require.NoError(t, err, "failed to create request to test handler") req.Header.Set("Content-Encoding", "snappy") @@ -456,6 +489,11 @@ func TestDecompressorAvoidDecompressionBomb(t *testing.T) { encoding: "zlib", compress: compressZlib, }, + { + name: "x-snappy-framed", + encoding: "x-snappy-framed", + compress: compressSnappyFramed, + }, { name: "snappy", encoding: "snappy", @@ -517,7 +555,7 @@ func compressZlib(tb testing.TB, body []byte) *bytes.Buffer { return &buf } -func compressSnappy(tb testing.TB, body []byte) *bytes.Buffer { +func compressSnappyFramed(tb testing.TB, body []byte) *bytes.Buffer { var buf bytes.Buffer sw := snappy.NewBufferedWriter(&buf) _, err := sw.Write(body) @@ -526,6 +564,14 @@ func compressSnappy(tb testing.TB, body []byte) *bytes.Buffer { return &buf } +func compressSnappy(tb testing.TB, body []byte) *bytes.Buffer { + var buf bytes.Buffer + compressed := snappy.Encode(nil, body) + _, err := buf.Write(compressed) + require.NoError(tb, err) + return &buf +} + func compressZstd(tb testing.TB, body []byte) *bytes.Buffer { var buf bytes.Buffer compression := zstd.SpeedFastest diff --git a/config/confighttp/compressor.go b/config/confighttp/compressor.go index 750bda5795d..5cc9e6f681f 100644 --- a/config/confighttp/compressor.go +++ b/config/confighttp/compressor.go @@ -66,10 +66,14 @@ func newWriteCloserResetFunc(compressionType configcompression.Type, compression w, _ := gzip.NewWriterLevel(nil, int(compressionParams.Level)) return w }, nil - case configcompression.TypeSnappy: + case configcompression.TypeSnappyFramed: return func() writeCloserReset { return snappy.NewBufferedWriter(nil) }, nil + case configcompression.TypeSnappy: + return func() writeCloserReset { + return &rawSnappyWriter{} + }, nil case configcompression.TypeZstd: level := zstd.WithEncoderLevel(zstd.EncoderLevelFromZstd(int(compressionParams.Level))) return func() writeCloserReset { @@ -111,3 +115,37 @@ func (p *compressor) compress(buf *bytes.Buffer, body io.ReadCloser) error { return writer.Close() } + +// rawSnappyWriter buffers all writes and, on Close, +// compresses the data as a raw snappy block (non-framed) +// and writes the compressed bytes to the underlying writer. +type rawSnappyWriter struct { + buffer bytes.Buffer + w io.Writer + closed bool +} + +// Write buffers the data. +func (w *rawSnappyWriter) Write(p []byte) (int, error) { + return w.buffer.Write(p) +} + +// Close compresses the buffered data in one shot using snappy.Encode, +// writes the compressed block to the underlying writer, and marks the writer as closed. +func (w *rawSnappyWriter) Close() error { + if w.closed { + return nil + } + w.closed = true + // Compress the buffered uncompressed bytes. + compressed := snappy.Encode(nil, w.buffer.Bytes()) + _, err := w.w.Write(compressed) + return err +} + +// Reset sets a new underlying writer, resets the buffer and the closed flag. +func (w *rawSnappyWriter) Reset(newWriter io.Writer) { + w.buffer.Reset() + w.w = newWriter + w.closed = false +} diff --git a/config/confighttp/confighttp.go b/config/confighttp/confighttp.go index 43193c957ee..ea926c6495a 100644 --- a/config/confighttp/confighttp.go +++ b/config/confighttp/confighttp.go @@ -37,7 +37,7 @@ const ( defaultMaxRequestBodySize = 20 * 1024 * 1024 // 20MiB ) -var defaultCompressionAlgorithms = []string{"", "gzip", "zstd", "zlib", "snappy", "deflate", "lz4"} +var defaultCompressionAlgorithms = []string{"", "gzip", "zstd", "zlib", "snappy", "deflate", "lz4", "x-snappy-framed"} // ClientConfig defines settings for creating an HTTP client. type ClientConfig struct {