Commit 32187be
Revert "What if we used zstd in our trace writers (#23806)" (#25278)
This reverts commit e496f86.
1 parent 65cc71c commit 32187be
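
For context: the reverted change (visible in the pkg/trace/writer/trace.go hunks below) picked the payload encoder based on the "zstd-encoding" feature flag and fell back to gzip otherwise. The following is a minimal, self-contained sketch of that pattern, using the same libraries and compression levels the diff shows; the helper name compressPayload, its signature, and the returned encoding string are illustrative only, not the agent's actual API.

package main

import (
    "bytes"
    "compress/gzip"
    "fmt"
    "io"

    "github.com/DataDog/zstd"
)

// compressPayload mirrors the reverted serializer logic (illustrative only):
// zstd when the feature flag is set, gzip otherwise. It returns the compressed
// body and the Content-Encoding value to send alongside it.
func compressPayload(b []byte, useZstd bool) (body *bytes.Buffer, encoding string, err error) {
    // Same pre-growth heuristic as the diff: assume roughly 2x compression.
    body = bytes.NewBuffer(make([]byte, 0, len(b)/2))

    var w io.WriteCloser
    if useZstd {
        encoding = "zstd"
        w = zstd.NewWriterLevel(body, zstd.BestSpeed)
    } else {
        encoding = "gzip"
        if w, err = gzip.NewWriterLevel(body, gzip.BestSpeed); err != nil {
            return nil, "", err // cannot happen: gzip.BestSpeed is a valid level
        }
    }
    if _, err = w.Write(b); err != nil {
        return nil, "", err
    }
    if err = w.Close(); err != nil {
        return nil, "", err
    }
    return body, encoding, nil
}

func main() {
    body, enc, err := compressPayload([]byte("example trace payload"), false)
    if err != nil {
        panic(err)
    }
    fmt.Printf("Content-Encoding: %s, %d compressed bytes\n", enc, body.Len())
}

The revert removes the zstd branch and the feature-flag lookup; only the gzip path survives, as the trace.go diff below shows.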

File tree: 4 files changed (+53, -98 lines)

pkg/trace/go.mod

Lines changed: 0 additions & 1 deletion
@@ -20,7 +20,6 @@ require (
     github.com/DataDog/datadog-go/v5 v5.5.0
     github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.14.0
     github.com/DataDog/sketches-go v1.4.2
-    github.com/DataDog/zstd v1.5.5
     github.com/Microsoft/go-winio v0.6.1
     github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575
     github.com/davecgh/go-spew v1.1.1

pkg/trace/go.sum

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default.

pkg/trace/writer/trace.go

Lines changed: 17 additions & 39 deletions
@@ -8,7 +8,6 @@ package writer
 import (
     "compress/gzip"
     "errors"
-    "io"
     "runtime"
     "strings"
     "sync"
@@ -22,7 +21,6 @@ import (
     "github.com/DataDog/datadog-agent/pkg/trace/timing"
 
     "github.com/DataDog/datadog-go/v5/statsd"
-    "github.com/DataDog/zstd"
 )
 
 // pathTraces is the target host API path for delivering traces.
@@ -71,7 +69,7 @@ type TraceWriter struct {
     senders []*sender
     stop chan struct{}
     stats *info.TraceWriterInfo
-    wg sync.WaitGroup // waits for compressors
+    wg sync.WaitGroup // waits for gzippers
     tick time.Duration // flush frequency
     agentVersion string
 
@@ -87,7 +85,6 @@ type TraceWriter struct {
     easylog *log.ThrottledLogger
     statsd statsd.ClientInterface
     timing timing.Reporter
-    useZstd bool
 }
 
 // NewTraceWriter returns a new TraceWriter. It is created for the given agent configuration and
@@ -118,7 +115,6 @@ func NewTraceWriter(
         telemetryCollector: telemetryCollector,
         statsd: statsd,
         timing: timing,
-        useZstd: cfg.HasFeature("zstd-encoding"),
     }
     climit := cfg.TraceWriter.ConnectionLimit
     if climit == 0 {
@@ -286,43 +282,25 @@ func (w *TraceWriter) serializer() {
         }
 
         w.stats.BytesUncompressed.Add(int64(len(b)))
-        var p *payload
-        var writer io.WriteCloser
-
-        if w.useZstd {
-            p = newPayload(map[string]string{
-                "Content-Type": "application/x-protobuf",
-                "Content-Encoding": "zstd",
-                headerLanguages: strings.Join(info.Languages(), "|"),
-            })
-
-            p.body.Grow(len(b) / 2)
-            writer = zstd.NewWriterLevel(p.body, zstd.BestSpeed)
-
-        } else {
-            p = newPayload(map[string]string{
-                "Content-Type": "application/x-protobuf",
-                "Content-Encoding": "gzip",
-                headerLanguages: strings.Join(info.Languages(), "|"),
-            })
-            p.body.Grow(len(b) / 2)
-
-            writer, err = gzip.NewWriterLevel(p.body, gzip.BestSpeed)
-            if err != nil {
-                // it will never happen, unless an invalid compression is chosen;
-                // we know gzip.BestSpeed is valid.
-                log.Errorf("Failed to compress trace paylod: %s", err)
-                return
-            }
+        p := newPayload(map[string]string{
+            "Content-Type": "application/x-protobuf",
+            "Content-Encoding": "gzip",
+            headerLanguages: strings.Join(info.Languages(), "|"),
+        })
+        p.body.Grow(len(b) / 2)
+        gzipw, err := gzip.NewWriterLevel(p.body, gzip.BestSpeed)
+        if err != nil {
+            // it will never happen, unless an invalid compression is chosen;
+            // we know gzip.BestSpeed is valid.
+            log.Errorf("gzip.NewWriterLevel: %d", err)
+            return
         }
-
-        if _, err := writer.Write(b); err != nil {
-            log.Errorf("Error compressing trace payload: %v", err)
+        if _, err := gzipw.Write(b); err != nil {
+            log.Errorf("Error gzipping trace payload: %v", err)
         }
-        if err := writer.Close(); err != nil {
-            log.Errorf("Error closing compressed stream when writing trace payload: %v", err)
+        if err := gzipw.Close(); err != nil {
+            log.Errorf("Error closing gzip stream when writing trace payload: %v", err)
         }
-
         sendPayloads(w.senders, p, w.syncMode)
     }()
 }
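
With the revert in place, every payload leaves the writer with Content-Encoding: gzip, so the receiving side only needs the standard library to unpack it; the test helper payloadsContain in the diff below does the same before unmarshalling the protobuf. A minimal sketch of that read path, with the protobuf step omitted and the round trip in main purely illustrative:

package main

import (
    "bytes"
    "compress/gzip"
    "fmt"
    "io"
)

// decompressPayload undoes the gzip framing the serializer applies before the
// protobuf body is unmarshalled (illustrative counterpart to the writer).
func decompressPayload(body io.Reader) ([]byte, error) {
    gzipr, err := gzip.NewReader(body)
    if err != nil {
        return nil, err
    }
    defer gzipr.Close()
    return io.ReadAll(gzipr)
}

func main() {
    // Round trip: compress with the same settings the writer uses, then unpack.
    var buf bytes.Buffer
    gzipw, err := gzip.NewWriterLevel(&buf, gzip.BestSpeed)
    if err != nil {
        panic(err)
    }
    if _, err := gzipw.Write([]byte("example trace payload")); err != nil {
        panic(err)
    }
    if err := gzipw.Close(); err != nil {
        panic(err)
    }

    out, err := decompressPayload(&buf)
    if err != nil {
        panic(err)
    }
    fmt.Println(string(out))
}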

pkg/trace/writer/trace_test.go

Lines changed: 36 additions & 56 deletions
@@ -7,13 +7,11 @@ package writer
 
 import (
     "compress/gzip"
-    "fmt"
     "io"
     "runtime"
     "sync"
     "testing"
 
-    "github.com/DataDog/zstd"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
     "google.golang.org/protobuf/proto"
@@ -44,45 +42,38 @@ func (s MockSampler) GetTargetTPS() float64 {
 var mockSampler = MockSampler{TargetTPS: 5, Enabled: true}
 
 func TestTraceWriter(t *testing.T) {
-    testCases := []bool{false, true}
+    srv := newTestServer()
+    cfg := &config.AgentConfig{
+        Hostname: testHostname,
+        DefaultEnv: testEnv,
+        Endpoints: []*config.Endpoint{{
+            APIKey: "123",
+            Host: srv.URL,
+        }},
+        TraceWriter: &config.WriterConfig{ConnectionLimit: 200, QueueSize: 40},
+    }
 
-    for _, tc := range testCases {
-        srv := newTestServer()
-        cfg := &config.AgentConfig{
-            Hostname: testHostname,
-            DefaultEnv: testEnv,
-            Endpoints: []*config.Endpoint{{
-                APIKey: "123",
-                Host: srv.URL,
-            }},
-            TraceWriter: &config.WriterConfig{ConnectionLimit: 200, QueueSize: 40},
+    t.Run("ok", func(t *testing.T) {
+        testSpans := []*SampledChunks{
+            randomSampledSpans(20, 8),
+            randomSampledSpans(10, 0),
+            randomSampledSpans(40, 5),
         }
-        if tc {
-            cfg.Features = map[string]struct{}{"zstd-encoding": {}}
+        // Use a flush threshold that allows the first two entries to not overflow,
+        // but overflow on the third.
+        defer useFlushThreshold(testSpans[0].Size + testSpans[1].Size + 10)()
+        tw := NewTraceWriter(cfg, mockSampler, mockSampler, mockSampler, telemetry.NewNoopCollector(), &statsd.NoOpClient{}, &timing.NoopReporter{})
+        tw.In = make(chan *SampledChunks)
+        go tw.Run()
+        for _, ss := range testSpans {
+            tw.In <- ss
         }
-
-        t.Run(fmt.Sprintf("zstd_%t", tc), func(t *testing.T) {
-            testSpans := []*SampledChunks{
-                randomSampledSpans(20, 8),
-                randomSampledSpans(10, 0),
-                randomSampledSpans(40, 5),
-            }
-            // Use a flush threshold that allows the first two entries to not overflow,
-            // but overflow on the third.
-            defer useFlushThreshold(testSpans[0].Size + testSpans[1].Size + 10)()
-            tw := NewTraceWriter(cfg, mockSampler, mockSampler, mockSampler, telemetry.NewNoopCollector(), &statsd.NoOpClient{}, &timing.NoopReporter{})
-            tw.In = make(chan *SampledChunks)
-            go tw.Run()
-            for _, ss := range testSpans {
-                tw.In <- ss
-            }
-            tw.Stop()
-            // One payload flushes due to overflowing the threshold, and the second one
-            // because of stop.
-            assert.Equal(t, 2, srv.Accepted())
-            payloadsContain(t, srv.Payloads(), testSpans, tc)
-        })
-    }
+        tw.Stop()
+        // One payload flushes due to overflowing the threshold, and the second one
+        // because of stop.
+        assert.Equal(t, 2, srv.Accepted())
+        payloadsContain(t, srv.Payloads(), testSpans)
+    })
 }
 
 func TestTraceWriterMultipleEndpointsConcurrent(t *testing.T) {
@@ -131,7 +122,7 @@ func TestTraceWriterMultipleEndpointsConcurrent(t *testing.T) {
 
     wg.Wait()
     tw.Stop()
-    payloadsContain(t, srv.Payloads(), testSpans, false)
+    payloadsContain(t, srv.Payloads(), testSpans)
 }
 
 // useFlushThreshold sets n as the number of bytes to be used as the flush threshold
@@ -154,25 +145,14 @@ func randomSampledSpans(spans, events int) *SampledChunks {
 }
 
 // payloadsContain checks that the given payloads contain the given set of sampled spans.
-func payloadsContain(t *testing.T, payloads []*payload, sampledSpans []*SampledChunks, shouldUseZstd bool) {
+func payloadsContain(t *testing.T, payloads []*payload, sampledSpans []*SampledChunks) {
     t.Helper()
     var all pb.AgentPayload
     for _, p := range payloads {
         assert := assert.New(t)
-        var slurp []byte
-        var err error
-        var reader io.ReadCloser
-
-        if shouldUseZstd {
-            reader = zstd.NewReader(p.body)
-            assert.NotNil(reader)
-        } else {
-            reader, err = gzip.NewReader(p.body)
-            assert.NoError(err)
-        }
-
-        slurp, err = io.ReadAll(reader)
-
+        gzipr, err := gzip.NewReader(p.body)
+        assert.NoError(err)
+        slurp, err := io.ReadAll(gzipr)
         assert.NoError(err)
         var payload pb.AgentPayload
         err = proto.Unmarshal(slurp, &payload)
@@ -227,7 +207,7 @@ func TestTraceWriterFlushSync(t *testing.T) {
         tw.FlushSync()
         // Now all trace payloads should be sent
         assert.Equal(t, 1, srv.Accepted())
-        payloadsContain(t, srv.Payloads(), testSpans, false)
+        payloadsContain(t, srv.Payloads(), testSpans)
     })
 }
 
@@ -296,7 +276,7 @@ func TestTraceWriterSyncStop(t *testing.T) {
         tw.Stop()
        // Now all trace payloads should be sent
         assert.Equal(t, 1, srv.Accepted())
-        payloadsContain(t, srv.Payloads(), testSpans, false)
+        payloadsContain(t, srv.Payloads(), testSpans)
     })
 }
 
