Skip to content

Proposed "snappy" vs "x-snappy-framed" content-type confusion #12825

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions .chloggen/snappy-done-right.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely not enhancement, this is a complete breaking change.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we keep it as enhancement if we switch the strategy to using feature flags?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would no longer be a breaking change if we delay the client changes (unless there are bugs in the protocol detection).


# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: confighttp

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Fix handling of `snappy` content-encoding in a backwards-compatible way"

# One or more tracking issues or pull requests related to the change
issues: [10584, 12825]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The collector used the Snappy compression type of "framed" to handle the HTTP
content-encoding "snappy". However, this encoding is typically used to indicate
the "block" compression variant of "snappy". This change allows the collector to:
- The server endpoints will now accept "x-snappy-framed" as a valid
content-encoding.
- Client compression type "snappy" will now compress to the "block" variant of snappy
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't do this, since as @jpkrohling explained, this will break existing servers that are not updated.

First we need to add support for both on both server/client. The "hack" for the server can also go now. Then wait for few versions until you can change the client.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What specific behavior do we want here? I tried to avoid breaking receiving as right now that felt like the pain point. Should we add x-snappy-framed as a type first, and then at some point later switch the compression to use non-framed for "snappy"?

If I changed the PR to do this, would this satisfy the path forward and start the ball rolling?

Copy link
Contributor

@jade-guiton-dd jade-guiton-dd Apr 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your PR avoids breakage in the situation where the receiver is updated but not the sender (and that's great!), but not in the opposite scenario: if the sender is updated but not the receiver, the sender will switch to block mode, and the receiver will not accept it. Of course, the sender can "just" update their config to explicitly switch back to framed mode, but this could still be a source of breakage.

Bogdan's suggestion is to add the protocol detection in the receiver (and the explicit framed mode) now, but to wait a few versions before switching "snappy" to use the block mode. This means that breakage will only occur if the sender gets both updates and the receiver gets none, which is less likely.

I think one additional possibility, to ease the transition and minimize how much code change needs to be postponed, would be to put the switch of snappy from framed to block mode behind an Alpha (off by default) feature gate, and add a warning log in confighttp when "snappy" is used with the gate disabled. Then later, we will only need to switch it to Beta (this is where the breaking change would be). What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this suggestion. For the components that need to use snappy encoding/decoding correctly, we just need to document that the new feature gate is required for them to work.

I don't see anybody else requiring this besides the prometheus remote-write receiver, which is not even an alpha component yet, so it sounds reasonable

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 with all the comments.

instead of "framed". If you want the old behavior, you can set the
compression type to "x-snappy-framed".
- When a server endpoint receives a content-encoding of "snappy", it will
look at the first bytes of the payload to determine if it is "framed" or "block" snappy,
and will decompress accordingly. This is a backwards-compatible change.
- In a future release, this checking behavior may be removed.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 2 additions & 0 deletions config/configcompression/compressiontype.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
TypeZlib Type = "zlib"
TypeDeflate Type = "deflate"
TypeSnappy Type = "snappy"
TypeSnappyFramed Type = "x-snappy-framed"
TypeZstd Type = "zstd"
TypeLz4 Type = "lz4"
typeNone Type = "none"
Expand All @@ -41,6 +42,7 @@ func (ct *Type) UnmarshalText(in []byte) error {
typ == TypeZlib ||
typ == TypeDeflate ||
typ == TypeSnappy ||
typ == TypeSnappyFramed ||
typ == TypeZstd ||
typ == TypeLz4 ||
typ == typeNone ||
Expand Down
17 changes: 17 additions & 0 deletions config/configcompression/compressiontype_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ func TestUnmarshalText(t *testing.T) {
shouldError: false,
isCompressed: true,
},
{
name: "ValidSnappyFramed",
compressionName: []byte("x-snappy-framed"),
shouldError: false,
isCompressed: true,
},
{
name: "ValidZstd",
compressionName: []byte("zstd"),
Expand Down Expand Up @@ -128,6 +134,17 @@ func TestValidateParams(t *testing.T) {
compressionLevel: 1,
shouldError: true,
},
{
name: "ValidSnappyFramed",
compressionName: []byte("x-snappy-framed"),
shouldError: false,
},
{
name: "InvalidSnappyFramed",
compressionName: []byte("x-snappy-framed"),
compressionLevel: 1,
shouldError: true,
},
{
name: "ValidZstd",
compressionName: []byte("zstd"),
Expand Down
44 changes: 43 additions & 1 deletion config/confighttp/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
package confighttp // import "go.opentelemetry.io/collector/config/confighttp"

import (
"bufio"
"bytes"
"compress/gzip"
"compress/zlib"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -27,6 +29,45 @@
compressor *compressor
}

// snappyFramingHeader is always the first 10 bytes of a snappy framed stream.
var snappyFramingHeader = []byte{
0xff, 0x06, 0x00, 0x00,
0x73, 0x4e, 0x61, 0x50, 0x70, 0x59, // "sNaPpY"
}

// snappyHandler returns an io.ReadCloser that auto-detects the snappy format.
// This is necessary because the collector previously used "content-encoding: snappy"
// but decompressed and compressed the payloads using the snappy framing format.
// However, "content-encoding: snappy" is uses the block format, and "x-snappy-framed"
// is the framing format. This handler is a (hopefully temporary) hack to
// make this work in a backwards-compatible way.
func snappyHandler(body io.ReadCloser) (io.ReadCloser, error) {
br := bufio.NewReader(body)

peekBytes, err := br.Peek(len(snappyFramingHeader))
if err != nil && !errors.Is(err, io.EOF) {
return nil, err
}

Check warning on line 50 in config/confighttp/compression.go

View check run for this annotation

Codecov / codecov/patch

config/confighttp/compression.go#L49-L50

Added lines #L49 - L50 were not covered by tests

isFramed := len(peekBytes) >= len(snappyFramingHeader) && bytes.Equal(peekBytes[:len(snappyFramingHeader)], snappyFramingHeader)

if isFramed {
return &compressReadCloser{
Reader: snappy.NewReader(br),
orig: body,
}, nil
}
compressed, err := io.ReadAll(br)
if err != nil {
return nil, err
}

Check warning on line 63 in config/confighttp/compression.go

View check run for this annotation

Codecov / codecov/patch

config/confighttp/compression.go#L62-L63

Added lines #L62 - L63 were not covered by tests
decoded, err := snappy.Decode(nil, compressed)
if err != nil {
return nil, err
}
return io.NopCloser(bytes.NewReader(decoded)), nil
}

var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, error){
"": func(io.ReadCloser) (io.ReadCloser, error) {
// Not a compressed payload. Nothing to do.
Expand Down Expand Up @@ -60,8 +101,9 @@
}
return zr, nil
},
"snappy": snappyHandler,
//nolint:unparam // Ignoring the linter request to remove error return since it needs to match the method signature
"snappy": func(body io.ReadCloser) (io.ReadCloser, error) {
"x-snappy-framed": func(body io.ReadCloser) (io.ReadCloser, error) {
// Lazy Reading content to improve memory efficiency
return &compressReadCloser{
Reader: snappy.NewReader(body),
Expand Down
52 changes: 49 additions & 3 deletions config/confighttp/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func TestHTTPClientCompression(t *testing.T) {
compressedGzipBody := compressGzip(t, testBody)
compressedZlibBody := compressZlib(t, testBody)
compressedDeflateBody := compressZlib(t, testBody)
compressedSnappyFramedBody := compressSnappyFramed(t, testBody)
compressedSnappyBody := compressSnappy(t, testBody)
compressedZstdBody := compressZstd(t, testBody)
compressedLz4Body := compressLz4(t, testBody)
Expand Down Expand Up @@ -111,6 +112,19 @@ func TestHTTPClientCompression(t *testing.T) {
reqBody: compressedSnappyBody.Bytes(),
shouldError: true,
},
{
name: "ValidSnappyFramed",
encoding: configcompression.TypeSnappyFramed,
reqBody: compressedSnappyFramedBody.Bytes(),
shouldError: false,
},
{
name: "InvalidSnappyFramed",
encoding: configcompression.TypeSnappyFramed,
level: gzip.DefaultCompression,
reqBody: compressedSnappyFramedBody.Bytes(),
shouldError: true,
},
{
name: "ValidZstd",
encoding: configcompression.TypeZstd,
Expand Down Expand Up @@ -250,12 +264,24 @@ func TestHTTPContentDecompressionHandler(t *testing.T) {
reqBody: compressZstd(t, testBody),
respCode: http.StatusOK,
},
{
name: "ValidSnappyFramed",
encoding: "x-snappy-framed",
reqBody: compressSnappyFramed(t, testBody),
respCode: http.StatusOK,
},
{
name: "ValidSnappy",
encoding: "snappy",
reqBody: compressSnappy(t, testBody),
respCode: http.StatusOK,
},
{
name: "ValidSnappyFramedAsSnappy",
encoding: "snappy",
reqBody: compressSnappyFramed(t, testBody),
respCode: http.StatusOK,
},
{
name: "ValidLz4",
encoding: "lz4",
Expand Down Expand Up @@ -290,12 +316,19 @@ func TestHTTPContentDecompressionHandler(t *testing.T) {
respCode: http.StatusBadRequest,
respBody: "invalid input: magic number mismatch",
},
{
name: "InvalidSnappyFramed",
encoding: "x-snappy-framed",
reqBody: bytes.NewBuffer(testBody),
respCode: http.StatusBadRequest,
respBody: "snappy: corrupt input",
},
{
name: "InvalidSnappy",
encoding: "snappy",
reqBody: bytes.NewBuffer(testBody),
respCode: http.StatusBadRequest,
respBody: "snappy: corrupt input",
respBody: "snappy: corrupt input\n",
},
{
name: "UnsupportedCompression",
Expand Down Expand Up @@ -415,7 +448,7 @@ func TestOverrideCompressionList(t *testing.T) {
}), defaultMaxRequestBodySize, defaultErrorHandler, configuredDecoders, nil))
t.Cleanup(srv.Close)

req, err := http.NewRequest(http.MethodGet, srv.URL, compressSnappy(t, []byte("123decompressed body")))
req, err := http.NewRequest(http.MethodGet, srv.URL, compressSnappyFramed(t, []byte("123decompressed body")))
require.NoError(t, err, "failed to create request to test handler")
req.Header.Set("Content-Encoding", "snappy")

Expand Down Expand Up @@ -456,6 +489,11 @@ func TestDecompressorAvoidDecompressionBomb(t *testing.T) {
encoding: "zlib",
compress: compressZlib,
},
{
name: "x-snappy-framed",
encoding: "x-snappy-framed",
compress: compressSnappyFramed,
},
{
name: "snappy",
encoding: "snappy",
Expand Down Expand Up @@ -517,7 +555,7 @@ func compressZlib(tb testing.TB, body []byte) *bytes.Buffer {
return &buf
}

func compressSnappy(tb testing.TB, body []byte) *bytes.Buffer {
func compressSnappyFramed(tb testing.TB, body []byte) *bytes.Buffer {
var buf bytes.Buffer
sw := snappy.NewBufferedWriter(&buf)
_, err := sw.Write(body)
Expand All @@ -526,6 +564,14 @@ func compressSnappy(tb testing.TB, body []byte) *bytes.Buffer {
return &buf
}

func compressSnappy(tb testing.TB, body []byte) *bytes.Buffer {
var buf bytes.Buffer
compressed := snappy.Encode(nil, body)
_, err := buf.Write(compressed)
require.NoError(tb, err)
return &buf
}

func compressZstd(tb testing.TB, body []byte) *bytes.Buffer {
var buf bytes.Buffer
compression := zstd.SpeedFastest
Expand Down
40 changes: 39 additions & 1 deletion config/confighttp/compressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,14 @@
w, _ := gzip.NewWriterLevel(nil, int(compressionParams.Level))
return w
}, nil
case configcompression.TypeSnappy:
case configcompression.TypeSnappyFramed:
return func() writeCloserReset {
return snappy.NewBufferedWriter(nil)
}, nil
case configcompression.TypeSnappy:
return func() writeCloserReset {
return &rawSnappyWriter{}
}, nil
case configcompression.TypeZstd:
level := zstd.WithEncoderLevel(zstd.EncoderLevelFromZstd(int(compressionParams.Level)))
return func() writeCloserReset {
Expand Down Expand Up @@ -111,3 +115,37 @@

return writer.Close()
}

// rawSnappyWriter buffers all writes and, on Close,
// compresses the data as a raw snappy block (non-framed)
// and writes the compressed bytes to the underlying writer.
type rawSnappyWriter struct {
buffer bytes.Buffer
w io.Writer
closed bool
}

// Write buffers the data.
func (w *rawSnappyWriter) Write(p []byte) (int, error) {
return w.buffer.Write(p)
}

// Close compresses the buffered data in one shot using snappy.Encode,
// writes the compressed block to the underlying writer, and marks the writer as closed.
func (w *rawSnappyWriter) Close() error {
if w.closed {
return nil
}

Check warning on line 138 in config/confighttp/compressor.go

View check run for this annotation

Codecov / codecov/patch

config/confighttp/compressor.go#L137-L138

Added lines #L137 - L138 were not covered by tests
w.closed = true
// Compress the buffered uncompressed bytes.
compressed := snappy.Encode(nil, w.buffer.Bytes())
_, err := w.w.Write(compressed)
return err
}

// Reset sets a new underlying writer, resets the buffer and the closed flag.
func (w *rawSnappyWriter) Reset(newWriter io.Writer) {
w.buffer.Reset()
w.w = newWriter
w.closed = false
}
2 changes: 1 addition & 1 deletion config/confighttp/confighttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const (
defaultMaxRequestBodySize = 20 * 1024 * 1024 // 20MiB
)

var defaultCompressionAlgorithms = []string{"", "gzip", "zstd", "zlib", "snappy", "deflate", "lz4"}
var defaultCompressionAlgorithms = []string{"", "gzip", "zstd", "zlib", "snappy", "deflate", "lz4", "x-snappy-framed"}

// ClientConfig defines settings for creating an HTTP client.
type ClientConfig struct {
Expand Down
Loading