Skip to content

Commit 8612538

Browse files
committed
Changed thrift_processor_test.go
1 parent 81c5eee commit 8612538

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+3507
-56
lines changed

receiver/jaegerreceiver/go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaeger
33
go 1.23.0
44

55
require (
6+
github.com/HdrHistogram/hdrhistogram-go v1.1.2
67
github.com/apache/thrift v0.21.0
78
github.com/gorilla/mux v1.8.1
89
github.com/jaegertracing/jaeger v1.66.0
@@ -11,7 +12,7 @@ require (
1112
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.120.1
1213
github.com/stretchr/testify v1.10.0
1314
go.opentelemetry.io/collector/component v0.120.1-0.20250226024140-8099e51f9a77
14-
go.opentelemetry.io/collector/component/componentstatus v0.120.1-0.20250226024140-8099e51f9a77
15+
go.opentelemetry.io/collector/component/componentstatus v0.119.0
1516
go.opentelemetry.io/collector/component/componenttest v0.120.1-0.20250226024140-8099e51f9a77
1617
go.opentelemetry.io/collector/config/configgrpc v0.120.1-0.20250226024140-8099e51f9a77
1718
go.opentelemetry.io/collector/config/confighttp v0.120.1-0.20250226024140-8099e51f9a77

receiver/jaegerreceiver/go.sum

Lines changed: 39 additions & 50 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright The OpenTelemetry Authors
2+
// Copyright (c) 2019 The Jaeger Authors.
3+
// Copyright (c) 2017 Uber Technologies, Inc.
4+
// SPDX-License-Identifier: Apache-2.0
5+
6+
package customtransport
7+
8+
import (
9+
"bytes"
10+
"context"
11+
12+
"github.com/apache/thrift/lib/go/thrift"
13+
)
14+
15+
// TBufferedReadTransport is a thrift.TTransport that reads from a buffer
16+
type TBufferedReadTransport struct {
17+
readBuf *bytes.Buffer
18+
}
19+
20+
var _ thrift.TTransport = (*TBufferedReadTransport)(nil)
21+
22+
// NewTBufferedReadTransport creates a buffer backed TTransport
23+
func NewTBufferedReadTransport(readBuf *bytes.Buffer) (*TBufferedReadTransport, error) {
24+
return &TBufferedReadTransport{readBuf: readBuf}, nil
25+
}
26+
27+
// IsOpen does nothing as transport is not maintaining the connection
28+
// Required to maintain thrift.TTransport interface
29+
func (*TBufferedReadTransport) IsOpen() bool {
30+
return true
31+
}
32+
33+
// Open does nothing as transport is not maintaining the connection
34+
// Required to maintain thrift.TTransport interface
35+
func (*TBufferedReadTransport) Open() error {
36+
return nil
37+
}
38+
39+
// Close does nothing as transport is not maintaining the connection
40+
// Required to maintain thrift.TTransport interface
41+
func (*TBufferedReadTransport) Close() error {
42+
return nil
43+
}
44+
45+
// Read reads bytes from the local buffer and puts them in the specified buf
46+
func (p *TBufferedReadTransport) Read(buf []byte) (int, error) {
47+
in, err := p.readBuf.Read(buf)
48+
return in, thrift.NewTTransportExceptionFromError(err)
49+
}
50+
51+
// RemainingBytes returns the number of bytes left to be read from the readBuf
52+
func (p *TBufferedReadTransport) RemainingBytes() uint64 {
53+
//nolint: gosec // G115
54+
return uint64(p.readBuf.Len())
55+
}
56+
57+
// Write writes bytes into the read buffer
58+
// Required to maintain thrift.TTransport interface
59+
func (p *TBufferedReadTransport) Write(buf []byte) (int, error) {
60+
p.readBuf = bytes.NewBuffer(buf)
61+
return len(buf), nil
62+
}
63+
64+
// Flush does nothing as udp server does not write responses back
65+
// Required to maintain thrift.TTransport interface
66+
func (*TBufferedReadTransport) Flush(_ context.Context) error {
67+
return nil
68+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright The OpenTelemetry Authors
2+
// Copyright (c) 2019 The Jaeger Authors.
3+
// Copyright (c) 2017 Uber Technologies, Inc.
4+
// SPDX-License-Identifier: Apache-2.0
5+
6+
package customtransport
7+
8+
import (
9+
"bytes"
10+
"context"
11+
"testing"
12+
13+
"github.com/stretchr/testify/require"
14+
15+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver/internal/cmd/testutils"
16+
)
17+
18+
// TestTBufferedReadTransport tests the TBufferedReadTransport
19+
func TestTBufferedReadTransport(t *testing.T) {
20+
buffer := bytes.NewBuffer([]byte("testString"))
21+
trans, err := NewTBufferedReadTransport(buffer)
22+
require.NotNil(t, trans)
23+
require.NoError(t, err)
24+
require.Equal(t, uint64(10), trans.RemainingBytes())
25+
26+
firstRead := make([]byte, 4)
27+
n, err := trans.Read(firstRead)
28+
require.NoError(t, err)
29+
require.Equal(t, 4, n)
30+
require.Equal(t, []byte("test"), firstRead)
31+
require.Equal(t, uint64(6), trans.RemainingBytes())
32+
33+
secondRead := make([]byte, 7)
34+
n, err = trans.Read(secondRead)
35+
require.NoError(t, err)
36+
require.Equal(t, 6, n)
37+
require.Equal(t, []byte("String"), secondRead[0:6])
38+
require.Equal(t, uint64(0), trans.RemainingBytes())
39+
}
40+
41+
// TestTBufferedReadTransportEmptyFunctions tests the empty functions in TBufferedReadTransport
42+
func TestTBufferedReadTransportEmptyFunctions(t *testing.T) {
43+
byteArr := make([]byte, 1)
44+
trans, err := NewTBufferedReadTransport(bytes.NewBuffer(byteArr))
45+
require.NotNil(t, trans)
46+
require.NoError(t, err)
47+
48+
err = trans.Open()
49+
require.NoError(t, err)
50+
51+
err = trans.Close()
52+
require.NoError(t, err)
53+
54+
err = trans.Flush(context.Background())
55+
require.NoError(t, err)
56+
57+
n, err := trans.Write(byteArr)
58+
require.Equal(t, 1, n)
59+
require.NoError(t, err)
60+
61+
isOpen := trans.IsOpen()
62+
require.True(t, isOpen)
63+
}
64+
65+
func TestMain(m *testing.M) {
66+
testutils.VerifyGoLeaks(m)
67+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
// Copyright The OpenTelemetry Authors
2+
// Copyright (c) 2024 The Jaeger Authors.
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
package processors
6+
7+
import (
8+
"testing"
9+
10+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver/internal/cmd/testutils"
11+
)
12+
13+
func TestMain(m *testing.M) {
14+
testutils.VerifyGoLeaks(m)
15+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
// Copyright The OpenTelemetry Authors
2+
// Copyright (c) 2019 The Jaeger Authors.
3+
// Copyright (c) 2017 Uber Technologies, Inc.
4+
// SPDX-License-Identifier: Apache-2.0
5+
6+
package processors
7+
8+
// Processor processes metrics in multiple formats
9+
type Processor interface {
10+
Serve()
11+
Stop()
12+
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
// Copyright The OpenTelemetry Authors
2+
// Copyright (c) 2019 The Jaeger Authors.
3+
// Copyright (c) 2017 Uber Technologies, Inc.
4+
// SPDX-License-Identifier: Apache-2.0
5+
6+
package processors
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"sync"
12+
13+
"github.com/apache/thrift/lib/go/thrift"
14+
"go.uber.org/zap"
15+
16+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver/internal/cmd/customtransport"
17+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver/internal/cmd/servers"
18+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver/internal/pkg/metrics"
19+
)
20+
21+
// ThriftProcessor is a server that processes spans using a TBuffered Server
22+
type ThriftProcessor struct {
23+
server servers.Server
24+
handler AgentProcessor
25+
protocolPool *sync.Pool
26+
numProcessors int
27+
processing sync.WaitGroup
28+
logger *zap.Logger
29+
metrics struct {
30+
// Amount of time taken for processor to close
31+
ProcessorCloseTimer metrics.Timer `metric:"thrift.udp.t-processor.close-time"`
32+
33+
// Number of failed buffer process operations
34+
HandlerProcessError metrics.Counter `metric:"thrift.udp.t-processor.handler-errors"`
35+
}
36+
}
37+
38+
// AgentProcessor handler used by the processor to process thrift and call the reporter
39+
// with the deserialized struct. This interface is implemented directly by Thrift generated
40+
// code, e.g. jaegerThrift.NewAgentProcessor(handler), where handler implements the Agent
41+
// Thrift service interface, which is invoked with the deserialized struct.
42+
type AgentProcessor interface {
43+
Process(ctx context.Context, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException)
44+
}
45+
46+
// NewThriftProcessor creates a TBufferedServer backed ThriftProcessor
47+
func NewThriftProcessor(
48+
server servers.Server,
49+
numProcessors int,
50+
mFactory metrics.Factory,
51+
factory thrift.TProtocolFactory,
52+
handler AgentProcessor,
53+
logger *zap.Logger,
54+
) (*ThriftProcessor, error) {
55+
if numProcessors <= 0 {
56+
return nil, fmt.Errorf(
57+
"number of processors must be greater than 0, called with %d", numProcessors)
58+
}
59+
protocolPool := &sync.Pool{
60+
New: func() any {
61+
trans := &customtransport.TBufferedReadTransport{}
62+
return factory.GetProtocol(trans)
63+
},
64+
}
65+
66+
res := &ThriftProcessor{
67+
server: server,
68+
handler: handler,
69+
protocolPool: protocolPool,
70+
logger: logger,
71+
numProcessors: numProcessors,
72+
}
73+
metrics.Init(&res.metrics, mFactory, nil)
74+
res.processing.Add(res.numProcessors)
75+
for i := 0; i < res.numProcessors; i++ {
76+
go func() {
77+
res.processBuffer()
78+
res.processing.Done()
79+
}()
80+
}
81+
return res, nil
82+
}
83+
84+
// Serve starts serving traffic
85+
func (s *ThriftProcessor) Serve() {
86+
s.server.Serve()
87+
}
88+
89+
// IsServing indicates whether the server is currently serving traffic
90+
func (s *ThriftProcessor) IsServing() bool {
91+
return s.server.IsServing()
92+
}
93+
94+
// Stop stops the serving of traffic and waits until the queue is
95+
// emptied by the readers
96+
func (s *ThriftProcessor) Stop() {
97+
stopwatch := metrics.StartStopwatch(s.metrics.ProcessorCloseTimer)
98+
s.server.Stop()
99+
s.processing.Wait()
100+
stopwatch.Stop()
101+
}
102+
103+
// processBuffer reads data off the channel and puts it into a custom transport for
104+
// the processor to process
105+
func (s *ThriftProcessor) processBuffer() {
106+
for readBuf := range s.server.DataChan() {
107+
protocol := s.protocolPool.Get().(thrift.TProtocol)
108+
payload := readBuf.GetBytes()
109+
protocol.Transport().Write(payload)
110+
s.logger.Debug("Span(s) received by the agent", zap.Int("bytes-received", len(payload)))
111+
112+
// NB: oddly, thrift-gen/agent/agent.go:L156 does this: `return true, thrift.WrapTException(err2)`
113+
// So we check for both OK and error.
114+
if ok, err := s.handler.Process(context.Background(), protocol, protocol); !ok || err != nil {
115+
s.logger.Error("Processor failed", zap.Error(err))
116+
s.metrics.HandlerProcessError.Inc(1)
117+
}
118+
s.protocolPool.Put(protocol)
119+
s.server.DataRecd(readBuf) // acknowledge receipt and release the buffer
120+
}
121+
}

0 commit comments

Comments
 (0)