Skip to content

Commit 040eba8

Browse files
dmitryaxpmatyjasek-sumo
authored andcommitted
Fix HEC exporter throwing 400s (open-telemetry#3032)
This commit fixes the HEC exporter sending log payload with missing the GZIP footer. gzip.Writer.Flush doesn't write GZIP footer. As a result reading the compressed payload fails with "unexpected EOF". This commit changes gzip.Writer.Flush call to gzip.Writer.Close to make sure that the GZIP footer is always set.
1 parent 885bd70 commit 040eba8

File tree

2 files changed

+90
-14
lines changed

2 files changed

+90
-14
lines changed

exporter/splunkhecexporter/client.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,6 @@ func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) (err error) {
132132
gzipBuffer := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthLogs))
133133
gzipWriter.Reset(gzipBuffer)
134134

135-
defer gzipWriter.Close()
136-
137135
// Callback when each batch is to be sent.
138136
send := func(ctx context.Context, buf *bytes.Buffer) (err error) {
139137
shouldCompress := buf.Len() >= minCompressionLen && !c.config.DisableCompression
@@ -146,7 +144,7 @@ func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) (err error) {
146144
return fmt.Errorf("failed copying buffer to gzip writer: %v", err)
147145
}
148146

149-
if err = gzipWriter.Flush(); err != nil {
147+
if err = gzipWriter.Close(); err != nil {
150148
return fmt.Errorf("failed flushing compressed data to gzip writer: %v", err)
151149
}
152150

exporter/splunkhecexporter/client_test.go

Lines changed: 89 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package splunkhecexporter
1515

1616
import (
17+
"bytes"
1718
"compress/gzip"
1819
"context"
1920
"errors"
@@ -128,7 +129,7 @@ func createLogData(numResources int, numLibraries int, numRecords int) pdata.Log
128129

129130
type CapturingData struct {
130131
testing *testing.T
131-
receivedRequest chan string
132+
receivedRequest chan []byte
132133
statusCode int
133134
checkCompression bool
134135
}
@@ -146,7 +147,7 @@ func (c *CapturingData) ServeHTTP(w http.ResponseWriter, r *http.Request) {
146147
panic(err)
147148
}
148149
go func() {
149-
c.receivedRequest <- string(body)
150+
c.receivedRequest <- body
150151
}()
151152
w.WriteHeader(c.statusCode)
152153
}
@@ -163,7 +164,7 @@ func runMetricsExport(disableCompression bool, numberOfDataPoints int, t *testin
163164
cfg.DisableCompression = disableCompression
164165
cfg.Token = "1234-1234"
165166

166-
receivedRequest := make(chan string)
167+
receivedRequest := make(chan []byte)
167168
capture := CapturingData{testing: t, receivedRequest: receivedRequest, statusCode: 200, checkCompression: !cfg.DisableCompression}
168169
s := &http.Server{
169170
Handler: &capture,
@@ -184,7 +185,7 @@ func runMetricsExport(disableCompression bool, numberOfDataPoints int, t *testin
184185
assert.NoError(t, err)
185186
select {
186187
case request := <-receivedRequest:
187-
return request, nil
188+
return string(request), nil
188189
case <-time.After(1 * time.Second):
189190
return "", errors.New("timeout")
190191
}
@@ -202,7 +203,7 @@ func runTraceExport(disableCompression bool, numberOfTraces int, t *testing.T) (
202203
cfg.DisableCompression = disableCompression
203204
cfg.Token = "1234-1234"
204205

205-
receivedRequest := make(chan string)
206+
receivedRequest := make(chan []byte)
206207
capture := CapturingData{testing: t, receivedRequest: receivedRequest, statusCode: 200, checkCompression: !cfg.DisableCompression}
207208
s := &http.Server{
208209
Handler: &capture,
@@ -223,13 +224,13 @@ func runTraceExport(disableCompression bool, numberOfTraces int, t *testing.T) (
223224
assert.NoError(t, err)
224225
select {
225226
case request := <-receivedRequest:
226-
return request, nil
227+
return string(request), nil
227228
case <-time.After(1 * time.Second):
228229
return "", errors.New("timeout")
229230
}
230231
}
231232

232-
func runLogExport(cfg *Config, ld pdata.Logs, t *testing.T) ([]string, error) {
233+
func runLogExport(cfg *Config, ld pdata.Logs, t *testing.T) ([][]byte, error) {
233234
listener, err := net.Listen("tcp", "127.0.0.1:0")
234235
if err != nil {
235236
panic(err)
@@ -238,7 +239,7 @@ func runLogExport(cfg *Config, ld pdata.Logs, t *testing.T) ([]string, error) {
238239
cfg.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
239240
cfg.Token = "1234-1234"
240241

241-
receivedRequest := make(chan string)
242+
receivedRequest := make(chan []byte)
242243
capture := CapturingData{testing: t, receivedRequest: receivedRequest, statusCode: 200, checkCompression: !cfg.DisableCompression}
243244
s := &http.Server{
244245
Handler: &capture,
@@ -256,7 +257,7 @@ func runLogExport(cfg *Config, ld pdata.Logs, t *testing.T) ([]string, error) {
256257
err = exporter.ConsumeLogs(context.Background(), ld)
257258
assert.NoError(t, err)
258259

259-
var requests []string
260+
var requests [][]byte
260261
for {
261262
select {
262263
case request := <-receivedRequest:
@@ -286,8 +287,13 @@ func TestReceiveLogs(t *testing.T) {
286287
type wantType struct {
287288
batches []string
288289
numBatches int
290+
compressed bool
289291
}
290292

293+
// The test cases depend on the constant minCompressionLen = 1500.
294+
// If the constant changed, the test cases with want.compressed=true must be updated.
295+
require.Equal(t, minCompressionLen, 1500)
296+
291297
tests := []struct {
292298
name string
293299
conf *Config
@@ -348,6 +354,62 @@ func TestReceiveLogs(t *testing.T) {
348354
numBatches: 2,
349355
},
350356
},
357+
{
358+
name: "1 compressed batch of 1837 bytes, make sure the event size is more than minCompressionLen=1500 to trigger compression",
359+
logs: createLogData(1, 1, 10),
360+
conf: func() *Config {
361+
return NewFactory().CreateDefaultConfig().(*Config)
362+
}(),
363+
want: wantType{
364+
batches: []string{
365+
`{"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
366+
`{"time":0.001,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
367+
`{"time":0.002,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
368+
`{"time":0.003,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
369+
`{"time":0.004,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
370+
`{"time":0.005,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
371+
`{"time":0.006,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
372+
`{"time":0.007,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
373+
`{"time":0.008,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
374+
`{"time":0.009,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n",
375+
},
376+
numBatches: 1,
377+
compressed: true,
378+
},
379+
},
380+
{
381+
name: "2 compressed batches - 1652 bytes each, make sure the log size is more than minCompressionLen=1500 to trigger compression",
382+
logs: createLogData(1, 1, 18), // comes to HEC events payload size - 1837 bytes
383+
conf: func() *Config {
384+
cfg := NewFactory().CreateDefaultConfig().(*Config)
385+
cfg.MaxContentLengthLogs = 1700
386+
return cfg
387+
}(),
388+
want: wantType{
389+
batches: []string{
390+
`{"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
391+
`{"time":0.001,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
392+
`{"time":0.002,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
393+
`{"time":0.003,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
394+
`{"time":0.004,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
395+
`{"time":0.005,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
396+
`{"time":0.006,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
397+
`{"time":0.007,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
398+
`{"time":0.008,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n",
399+
`{"time":0.009,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
400+
`{"time":0.01,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
401+
`{"time":0.011,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
402+
`{"time":0.012,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
403+
`{"time":0.013,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
404+
`{"time":0.014,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
405+
`{"time":0.015,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
406+
`{"time":0.016,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
407+
`{"time":0.017,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n",
408+
},
409+
numBatches: 2,
410+
compressed: true,
411+
},
412+
},
351413
}
352414

353415
for _, test := range tests {
@@ -359,7 +421,11 @@ func TestReceiveLogs(t *testing.T) {
359421

360422
for i := 0; i < test.want.numBatches; i++ {
361423
require.NotZero(t, got[i])
362-
assert.Equal(t, test.want.batches[i], got[i])
424+
if test.want.compressed {
425+
validateCompressedEqual(t, test.want.batches[i], got[i])
426+
} else {
427+
assert.Equal(t, test.want.batches[i], string(got[i]))
428+
}
363429

364430
}
365431
})
@@ -391,7 +457,7 @@ func TestReceiveMetricsWithCompression(t *testing.T) {
391457
}
392458

393459
func TestErrorReceived(t *testing.T) {
394-
receivedRequest := make(chan string)
460+
receivedRequest := make(chan []byte)
395461
capture := CapturingData{receivedRequest: receivedRequest, statusCode: 500}
396462
listener, err := net.Listen("tcp", "127.0.0.1:0")
397463
if err != nil {
@@ -720,3 +786,15 @@ func TestSubLogs(t *testing.T) {
720786
// The name of the sole log record should be 1_1_2.
721787
assert.Equal(t, "1_1_2", got.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs().At(0).Name())
722788
}
789+
790+
// validateCompressedEqual validates that GZipped `got` contains `expected` string
791+
func validateCompressedEqual(t *testing.T, expected string, got []byte) {
792+
z, err := gzip.NewReader(bytes.NewReader(got))
793+
require.NoError(t, err)
794+
defer z.Close()
795+
796+
p, err := ioutil.ReadAll(z)
797+
require.NoError(t, err)
798+
799+
assert.Equal(t, expected, string(p))
800+
}

0 commit comments

Comments
 (0)