Skip to content

Commit 6e6c3e8

Browse files
jmacd authored and sbylica-splunk committed
(otelarrowreceiver): Use a single call to BoundedQueue.Acquire (open-telemetry#36082)
#### Description
Simplifies the admission control logic for OTAP payloads. We call Acquire() once after uncompressing the data, instead of once with compressed size and once with the difference.

#### Link to tracking issue
Part of open-telemetry#36074.

#### Testing
One test is replaced with logic to verify certain BoundedQueue actions. ~Note: the OTel-Arrow test suite will not pass with this PR until it merges with open-telemetry#36078.~ Originally developed in open-telemetry#36033.

#### Documentation
Not user-visible.
1 parent 78c0e91 commit 6e6c3e8

File tree

4 files changed

+62
-143
lines changed

4 files changed

+62
-143
lines changed
Lines changed: 27 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: otelarrowreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Simplify receiver admission control logic
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [36074]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/otelarrowreceiver/go.mod

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -34,7 +34,6 @@ require (
3434
go.uber.org/zap v1.27.0
3535
golang.org/x/net v0.30.0
3636
google.golang.org/grpc v1.67.1
37-
google.golang.org/protobuf v1.35.1
3837
)
3938

4039
require (
@@ -90,6 +89,7 @@ require (
9089
golang.org/x/tools v0.22.0 // indirect
9190
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
9291
google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 // indirect
92+
google.golang.org/protobuf v1.35.1 // indirect
9393
gopkg.in/yaml.v3 v3.0.1 // indirect
9494
)
9595

receiver/otelarrowreceiver/internal/arrow/arrow.go

Lines changed: 13 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,7 @@ import (
88
"errors"
99
"fmt"
1010
"io"
11-
"net"
1211
"runtime"
13-
"strconv"
1412
"strings"
1513
"sync"
1614
"sync/atomic"
@@ -39,7 +37,6 @@ import (
3937
"google.golang.org/grpc/codes"
4038
"google.golang.org/grpc/metadata"
4139
"google.golang.org/grpc/status"
42-
"google.golang.org/protobuf/proto"
4340

4441
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil"
4542
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission"
@@ -454,9 +451,8 @@ type inFlightData struct {
454451
// consumeAndRespond() function.
455452
refs atomic.Int32
456453

457-
numAcquired int64 // how many bytes held in the semaphore
458-
numItems int // how many items
459-
uncompSize int64 // uncompressed data size
454+
numItems int // how many items
455+
uncompSize int64 // uncompressed data size == how many bytes held in the semaphore
460456
}
461457

462458
func (id *inFlightData) recvDone(ctx context.Context, recvErrPtr *error) {
@@ -505,8 +501,8 @@ func (id *inFlightData) anyDone(ctx context.Context) {
505501

506502
id.span.End()
507503

508-
if id.numAcquired != 0 {
509-
if err := id.boundedQueue.Release(id.numAcquired); err != nil {
504+
if id.uncompSize != 0 {
505+
if err := id.boundedQueue.Release(id.uncompSize); err != nil {
510506
id.telemetry.Logger.Error("release error", zap.Error(err))
511507
}
512508
}
@@ -606,19 +602,6 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre
606602
}
607603
}
608604

609-
var prevAcquiredBytes int64
610-
uncompSizeHeaderStr, uncompSizeHeaderFound := authHdrs["otlp-pdata-size"]
611-
if !uncompSizeHeaderFound || len(uncompSizeHeaderStr) == 0 {
612-
// This is a compressed size so make sure to acquire the difference when request is decompressed.
613-
prevAcquiredBytes = int64(proto.Size(req))
614-
} else {
615-
var parseErr error
616-
prevAcquiredBytes, parseErr = strconv.ParseInt(uncompSizeHeaderStr[0], 10, 64)
617-
if parseErr != nil {
618-
return status.Errorf(codes.Internal, "failed to convert string to request size: %v", parseErr)
619-
}
620-
}
621-
622605
var callerCancel context.CancelFunc
623606
if encodedTimeout, has := authHdrs["grpc-timeout"]; has && len(encodedTimeout) == 1 {
624607
if timeout, decodeErr := grpcutil.DecodeTimeout(encodedTimeout[0]); decodeErr != nil {
@@ -638,17 +621,6 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre
638621
}
639622
}
640623

641-
// Use the bounded queue to memory limit based on incoming
642-
// uncompressed request size and waiters. Acquire will fail
643-
// immediately if there are too many waiters, or will
644-
// otherwise block until timeout or enough memory becomes
645-
// available.
646-
acquireErr := r.boundedQueue.Acquire(inflightCtx, prevAcquiredBytes)
647-
if acquireErr != nil {
648-
return status.Errorf(codes.ResourceExhausted, "otel-arrow bounded queue: %v", acquireErr)
649-
}
650-
flight.numAcquired = prevAcquiredBytes
651-
652624
data, numItems, uncompSize, consumeErr := r.consumeBatch(ac, req)
653625

654626
if consumeErr != nil {
@@ -658,19 +630,21 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre
658630
return status.Errorf(codes.Internal, "otel-arrow decode: %v", consumeErr)
659631
}
660632

633+
// Use the bounded queue to memory limit based on incoming
634+
// uncompressed request size and waiters. Acquire will fail
635+
// immediately if there are too many waiters, or will
636+
// otherwise block until timeout or enough memory becomes
637+
// available.
638+
acquireErr := r.boundedQueue.Acquire(inflightCtx, uncompSize)
639+
if acquireErr != nil {
640+
return acquireErr
641+
}
661642
flight.uncompSize = uncompSize
662643
flight.numItems = numItems
663644

664645
r.telemetryBuilder.OtelArrowReceiverInFlightBytes.Add(inflightCtx, uncompSize)
665646
r.telemetryBuilder.OtelArrowReceiverInFlightItems.Add(inflightCtx, int64(numItems))
666647

667-
numAcquired, secondAcquireErr := r.acquireAdditionalBytes(inflightCtx, prevAcquiredBytes, uncompSize, hrcv.connInfo.Addr, uncompSizeHeaderFound)
668-
669-
flight.numAcquired = numAcquired
670-
if secondAcquireErr != nil {
671-
return status.Errorf(codes.ResourceExhausted, "otel-arrow bounded queue re-acquire: %v", secondAcquireErr)
672-
}
673-
674648
// Recognize that the request is still in-flight via consumeAndRespond()
675649
flight.refs.Add(1)
676650

@@ -901,47 +875,3 @@ func (r *Receiver) consumeData(ctx context.Context, data any, flight *inFlightDa
901875
}
902876
return retErr
903877
}
904-
905-
func (r *Receiver) acquireAdditionalBytes(ctx context.Context, prevAcquired, uncompSize int64, addr net.Addr, uncompSizeHeaderFound bool) (int64, error) {
906-
diff := uncompSize - prevAcquired
907-
908-
if diff == 0 {
909-
return uncompSize, nil
910-
}
911-
912-
if uncompSizeHeaderFound {
913-
var clientAddr string
914-
if addr != nil {
915-
clientAddr = addr.String()
916-
}
917-
// a mismatch between header set by exporter and the uncompSize just calculated.
918-
r.telemetry.Logger.Debug("mismatch between uncompressed size in receiver and otlp-pdata-size header",
919-
zap.String("client-address", clientAddr),
920-
zap.Int("uncompsize", int(uncompSize)),
921-
zap.Int("otlp-pdata-size", int(prevAcquired)),
922-
)
923-
} else if diff < 0 {
924-
// proto.Size() on compressed request was greater than pdata uncompressed size.
925-
r.telemetry.Logger.Debug("uncompressed size is less than compressed size",
926-
zap.Int("uncompressed", int(uncompSize)),
927-
zap.Int("compressed", int(prevAcquired)),
928-
)
929-
}
930-
931-
if diff < 0 {
932-
// If the difference is negative, release the overage.
933-
if err := r.boundedQueue.Release(-diff); err != nil {
934-
return 0, err
935-
}
936-
} else {
937-
// Release previously acquired bytes to prevent deadlock and
938-
// reacquire the uncompressed size we just calculated.
939-
if err := r.boundedQueue.Release(prevAcquired); err != nil {
940-
return 0, err
941-
}
942-
if err := r.boundedQueue.Acquire(ctx, uncompSize); err != nil {
943-
return 0, err
944-
}
945-
}
946-
return uncompSize, nil
947-
}

receiver/otelarrowreceiver/internal/arrow/arrow_test.go

Lines changed: 21 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"encoding/json"
1010
"fmt"
1111
"io"
12-
"strconv"
1312
"strings"
1413
"sync"
1514
"testing"
@@ -408,98 +407,61 @@ func requireExhaustedStatus(t *testing.T, err error) {
408407
requireStatus(t, codes.ResourceExhausted, err)
409408
}
410409

410+
func requireInvalidArgumentStatus(t *testing.T, err error) {
411+
requireStatus(t, codes.InvalidArgument, err)
412+
}
413+
411414
func requireStatus(t *testing.T, code codes.Code, err error) {
412415
require.Error(t, err)
413416
status, ok := status.FromError(err)
414417
require.True(t, ok, "is status-wrapped %v", err)
415418
require.Equal(t, code, status.Code())
416419
}
417420

418-
func TestBoundedQueueWithPdataHeaders(t *testing.T) {
421+
func TestBoundedQueueLimits(t *testing.T) {
419422
var sizer ptrace.ProtoMarshaler
420423
stdTesting := otelAssert.NewStdUnitTest(t)
421-
pdataSizeTenTraces := sizer.TracesSize(testdata.GenerateTraces(10))
422-
defaultBoundedQueueLimit := int64(100000)
424+
td := testdata.GenerateTraces(10)
425+
tdSize := int64(sizer.TracesSize(td))
426+
423427
tests := []struct {
424-
name string
425-
numTraces int
426-
includePdataHeader bool
427-
pdataSize string
428-
rejected bool
428+
name string
429+
admitLimit int64
430+
expectErr bool
429431
}{
430432
{
431-
name: "no header compressed greater than uncompressed",
432-
numTraces: 10,
433-
},
434-
{
435-
name: "no header compressed less than uncompressed",
436-
numTraces: 100,
437-
},
438-
{
439-
name: "pdata header less than uncompressedSize",
440-
numTraces: 10,
441-
pdataSize: strconv.Itoa(pdataSizeTenTraces / 2),
442-
includePdataHeader: true,
443-
},
444-
{
445-
name: "pdata header equal uncompressedSize",
446-
numTraces: 10,
447-
pdataSize: strconv.Itoa(pdataSizeTenTraces),
448-
includePdataHeader: true,
433+
name: "admit request",
434+
admitLimit: tdSize * 2,
435+
expectErr: false,
449436
},
450437
{
451-
name: "pdata header greater than uncompressedSize",
452-
numTraces: 10,
453-
pdataSize: strconv.Itoa(pdataSizeTenTraces * 2),
454-
includePdataHeader: true,
455-
},
456-
{
457-
name: "no header compressed accepted uncompressed rejected",
458-
numTraces: 100,
459-
rejected: true,
460-
},
461-
{
462-
name: "pdata header accepted uncompressed rejected",
463-
numTraces: 100,
464-
rejected: true,
465-
pdataSize: strconv.Itoa(pdataSizeTenTraces),
466-
includePdataHeader: true,
438+
name: "reject request",
439+
admitLimit: tdSize / 2,
440+
expectErr: true,
467441
},
468442
}
469443
for _, tt := range tests {
470444
t.Run(tt.name, func(t *testing.T) {
471445
tc := newHealthyTestChannel(t)
472446
ctc := newCommonTestCase(t, tc)
473447

474-
td := testdata.GenerateTraces(tt.numTraces)
475448
batch, err := ctc.testProducer.BatchArrowRecordsFromTraces(td)
476449
require.NoError(t, err)
477-
if tt.includePdataHeader {
478-
var hpb bytes.Buffer
479-
hpe := hpack.NewEncoder(&hpb)
480-
err = hpe.WriteField(hpack.HeaderField{
481-
Name: "otlp-pdata-size",
482-
Value: tt.pdataSize,
483-
})
484-
assert.NoError(t, err)
485-
batch.Headers = make([]byte, hpb.Len())
486-
copy(batch.Headers, hpb.Bytes())
487-
}
488450

489451
var bq admission.Queue
490-
if tt.rejected {
452+
if tt.expectErr {
491453
ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(0)
492454
bq = admission.NewBoundedQueue(noopTelemetry, int64(sizer.TracesSize(td)-100), 10)
493455
} else {
494456
ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil)
495-
bq = admission.NewBoundedQueue(noopTelemetry, defaultBoundedQueueLimit, 10)
457+
bq = admission.NewBoundedQueue(noopTelemetry, tt.admitLimit, 10)
496458
}
497459

498460
ctc.start(ctc.newRealConsumer, bq)
499461
ctc.putBatch(batch, nil)
500462

501-
if tt.rejected {
502-
requireExhaustedStatus(t, ctc.wait())
463+
if tt.expectErr {
464+
requireInvalidArgumentStatus(t, ctc.wait())
503465
} else {
504466
data := <-ctc.consume
505467
actualTD := data.Data.(ptrace.Traces)

0 commit comments

Comments
 (0)