Skip to content

Commit 8eda24f

Browse files
committed
switch marshal from JSON to byte-based
1 parent 65c9e49 commit 8eda24f

File tree

2 files changed

+59
-39
lines changed

2 files changed

+59
-39
lines changed

exporter/exporterhelper/internal/queuebatch/persistent_queue.go

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"encoding/json"
1111
"errors"
1212
"fmt"
13+
"math"
1314
"strconv"
1415
"sync"
1516

@@ -243,11 +244,6 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error {
243244
return pq.putInternal(ctx, req)
244245
}
245246

246-
type marshaledRequestWithSpanContext struct {
247-
RequestBytes []byte `json:"request"`
248-
SpanContextJSON json.RawMessage `json:"span_context,omitempty"`
249-
}
250-
251247
type spanContextConfigWrapper struct {
252248
TraceID string
253249
SpanID string
@@ -289,31 +285,42 @@ func spanContextFromWrapper(wrapper spanContextConfigWrapper) (*trace.SpanContex
289285
return &sc, nil
290286
}
291287

292-
// unmarshalRequestWithSpanContext unmarshals a marshaledRequestWithSpanContext from bytes, returning the request
293-
// and a context with the restored SpanContext (if present).
288+
// unmarshalRequestWithSpanContext unmarshals a binary envelope, returning the request and a context with the restored SpanContext (if present).
294289
func unmarshalRequestWithSpanContext[T any](encoding Encoding[T], value []byte) (T, context.Context, error) {
295290
var req T
296291
restoredContext := context.Background()
297-
var envelope marshaledRequestWithSpanContext
298-
if err := json.Unmarshal(value, &envelope); err != nil {
299-
return req, restoredContext, err
292+
if len(value) < 8 {
293+
return req, restoredContext, errors.New("envelope too short")
294+
}
295+
reqLen := binary.LittleEndian.Uint32(value[:4])
296+
if len(value) < int(4+reqLen+4) {
297+
return req, restoredContext, errors.New("envelope too short for request")
300298
}
301-
request, err := encoding.Unmarshal(envelope.RequestBytes)
299+
reqBytes := value[4 : 4+reqLen]
300+
scLen := binary.LittleEndian.Uint32(value[4+reqLen : 8+reqLen])
301+
if len(value) < int(8+reqLen+scLen) {
302+
return req, restoredContext, errors.New("envelope too short for span context")
303+
}
304+
scBytes := value[8+reqLen : 8+reqLen+scLen]
305+
// Unmarshal request
306+
r, err := encoding.Unmarshal(reqBytes)
302307
if err != nil {
303308
return req, restoredContext, err
304309
}
305-
if len(envelope.SpanContextJSON) > 0 {
310+
req = r
311+
// Unmarshal span context if present
312+
if scLen > 0 {
306313
var wrapper spanContextConfigWrapper
307-
if err := json.Unmarshal(envelope.SpanContextJSON, &wrapper); err == nil {
314+
if err := json.Unmarshal(scBytes, &wrapper); err == nil {
308315
if sc, err := spanContextFromWrapper(wrapper); err == nil && sc != nil {
309316
restoredContext = trace.ContextWithSpanContext(restoredContext, *sc)
310317
}
311318
}
312319
}
313-
return request, restoredContext, nil
320+
return req, restoredContext, nil
314321
}
315322

316-
// marshalRequestWithSpanContext marshals the request and the SpanContext from ctx into a marshaledRequestWithSpanContext envelope as bytes.
323+
// marshalRequestWithSpanContext marshals the request and the SpanContext from ctx into a binary envelope as bytes.
317324
func marshalRequestWithSpanContext[T any](ctx context.Context, encoding Encoding[T], req T) ([]byte, error) {
318325
reqBuf, err := encoding.Marshal(req)
319326
if err != nil {
@@ -326,14 +333,22 @@ func marshalRequestWithSpanContext[T any](ctx context.Context, encoding Encoding
326333
if err != nil {
327334
return nil, err
328335
}
329-
} else {
330-
scJSON = nil // Will be omitted due to omitempty
331-
}
332-
envelope := marshaledRequestWithSpanContext{
333-
RequestBytes: reqBuf,
334-
SpanContextJSON: scJSON,
335336
}
336-
return json.Marshal(envelope)
337+
if len(reqBuf) > int(math.MaxUint32) {
338+
return nil, fmt.Errorf("request too large to encode: %d bytes", len(reqBuf))
339+
}
340+
if len(scJSON) > int(math.MaxUint32) {
341+
return nil, fmt.Errorf("span context too large to encode: %d bytes", len(scJSON))
342+
}
343+
// Compose binary envelope: [4 bytes reqLen][req][4 bytes scLen][scJSON]
344+
buf := make([]byte, 0, 8+len(reqBuf)+len(scJSON))
345+
//nolint:gosec // G115: integer overflow conversion int -> uint32
346+
buf = binary.LittleEndian.AppendUint32(buf, uint32(len(reqBuf)))
347+
buf = append(buf, reqBuf...)
348+
//nolint:gosec // G115: integer overflow conversion int -> uint32
349+
buf = binary.LittleEndian.AppendUint32(buf, uint32(len(scJSON)))
350+
buf = append(buf, scJSON...)
351+
return buf, nil
337352
}
338353

339354
// putInternal is the internal version that requires caller to hold the mutex lock.

exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ package queuebatch
66
import (
77
"context"
88
"encoding/binary"
9-
"encoding/json"
109
"errors"
1110
"fmt"
1211
"math"
@@ -1441,28 +1440,34 @@ func TestMarshalUnmarshalRequestWithSpanContext(t *testing.T) {
14411440
})
14421441

14431442
t.Run("corrupted span_context field", func(t *testing.T) {
1444-
// Manually create a bad envelope
1445-
envelope := marshaledRequestWithSpanContext{
1446-
RequestBytes: []byte{0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
1447-
SpanContextJSON: []byte(`{"TraceID":123}`), // invalid TraceID
1448-
}
1449-
data, err := json.Marshal(envelope)
1450-
require.NoError(t, err)
1451-
_, gotCtx, err := unmarshalRequestWithSpanContext(uint64Encoding{}, data)
1443+
// Manually create a bad envelope using the new binary format
1444+
requestBytes := []byte{0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
1445+
badSpanContext := []byte(`{"TraceID":123}`) // invalid TraceID
1446+
buf := make([]byte, 0, 8+len(requestBytes)+len(badSpanContext))
1447+
//nolint:gosec // G115: integer overflow conversion int -> uint32
1448+
buf = binary.LittleEndian.AppendUint32(buf, uint32(len(requestBytes)))
1449+
buf = append(buf, requestBytes...)
1450+
//nolint:gosec // G115: integer overflow conversion int -> uint32
1451+
buf = binary.LittleEndian.AppendUint32(buf, uint32(len(badSpanContext)))
1452+
buf = append(buf, badSpanContext...)
1453+
_, gotCtx, err := unmarshalRequestWithSpanContext(uint64Encoding{}, buf)
14521454
require.NoError(t, err)
14531455
// Should not panic, should return background context
14541456
restoredSC := trace.SpanContextFromContext(gotCtx)
14551457
assert.False(t, restoredSC.IsValid())
14561458
})
14571459

1458-
t.Run("valid JSON, invalid RequestBytes returns error", func(t *testing.T) {
1459-
envelope := marshaledRequestWithSpanContext{
1460-
RequestBytes: []byte{0x01, 0x02}, // too short for uint64Encoding.Unmarshal
1461-
SpanContextJSON: nil,
1462-
}
1463-
data, err := json.Marshal(envelope)
1464-
require.NoError(t, err)
1465-
_, _, err = unmarshalRequestWithSpanContext(uint64Encoding{}, data)
1460+
t.Run("valid binary, invalid RequestBytes returns error", func(t *testing.T) {
1461+
requestBytes := []byte{0x01, 0x02} // too short for uint64Encoding.Unmarshal
1462+
badSpanContext := []byte{}
1463+
buf := make([]byte, 0, 8+len(requestBytes)+len(badSpanContext))
1464+
//nolint:gosec // G115: integer overflow conversion int -> uint32
1465+
buf = binary.LittleEndian.AppendUint32(buf, uint32(len(requestBytes)))
1466+
buf = append(buf, requestBytes...)
1467+
//nolint:gosec // G115: integer overflow conversion int -> uint32
1468+
buf = binary.LittleEndian.AppendUint32(buf, uint32(len(badSpanContext)))
1469+
buf = append(buf, badSpanContext...)
1470+
_, _, err := unmarshalRequestWithSpanContext(uint64Encoding{}, buf)
14661471
require.Error(t, err)
14671472
})
14681473
}

0 commit comments

Comments
 (0)