Skip to content

Commit 712c1bc

Browse files
authored
[agent] Refactor UDP server (#6852)
## Which problem is this PR solving? - Simplify code, part of #6704 ## Description of the changes - Remove the need for custom buffer type and use `bytes.Buffer` - Rename TBufferedServer to UDPServer since there's nothing Thrift-specific about it - Simplify tests of UDPServer to not depend on external Thrift types ## How was this change tested? - CI --------- Signed-off-by: Yuri Shkuro <[email protected]>
1 parent 302d99e commit 712c1bc

File tree

9 files changed

+140
-163
lines changed

9 files changed

+140
-163
lines changed

cmd/agent/app/builder.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ func (c *ServerConfiguration) applyDefaults() {
190190
}
191191

192192
// getUDPServer gets a TBufferedServer backed server using the server configuration
193-
func (c *ServerConfiguration) getUDPServer(mFactory metrics.Factory) (servers.Server, error) {
193+
func (c *ServerConfiguration) getUDPServer(mFactory metrics.Factory) (*servers.UDPServer, error) {
194194
c.applyDefaults()
195195

196196
if c.HostPort == "" {
@@ -206,7 +206,7 @@ func (c *ServerConfiguration) getUDPServer(mFactory metrics.Factory) (servers.Se
206206
}
207207
}
208208

209-
return servers.NewTBufferedServer(transport, c.QueueSize, c.MaxPacketSize, mFactory)
209+
return servers.NewUDPServer(transport, c.QueueSize, c.MaxPacketSize, mFactory)
210210
}
211211

212212
func defaultInt(value int, defaultVal int) int {

cmd/agent/app/processors/thrift_processor.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919

2020
// ThriftProcessor is a server that processes spans using a TBuffered Server
2121
type ThriftProcessor struct {
22-
server servers.Server
22+
server *servers.UDPServer
2323
handler AgentProcessor
2424
protocolPool *sync.Pool
2525
numProcessors int
@@ -44,7 +44,7 @@ type AgentProcessor interface {
4444

4545
// NewThriftProcessor creates a TBufferedServer backed ThriftProcessor
4646
func NewThriftProcessor(
47-
server servers.Server,
47+
server *servers.UDPServer,
4848
numProcessors int,
4949
mFactory metrics.Factory,
5050
factory thrift.TProtocolFactory,
@@ -102,11 +102,10 @@ func (s *ThriftProcessor) Stop() {
102102
// processBuffer reads data off the channel and puts it into a custom transport for
103103
// the processor to process
104104
func (s *ThriftProcessor) processBuffer() {
105-
for readBuf := range s.server.DataChan() {
105+
for buf := range s.server.DataChan() {
106106
protocol := s.protocolPool.Get().(thrift.TProtocol)
107-
payload := readBuf.GetBytes()
108-
protocol.Transport().Write(payload)
109-
s.logger.Debug("Span(s) received by the agent", zap.Int("bytes-received", len(payload)))
107+
buf.WriteTo(protocol.Transport())
108+
s.logger.Debug("Span(s) received by the agent", zap.Int("bytes-received", buf.Len()))
110109

111110
// NB: oddly, thrift-gen/agent/agent.go:L156 does this: `return true, thrift.WrapTException(err2)`
112111
// So we check for both OK and error.
@@ -115,6 +114,6 @@ func (s *ThriftProcessor) processBuffer() {
115114
s.metrics.HandlerProcessError.Inc(1)
116115
}
117116
s.protocolPool.Put(protocol)
118-
s.server.DataRecd(readBuf) // acknowledge receipt and release the buffer
117+
s.server.DataRecd(buf) // acknowledge receipt and release the buffer
119118
}
120119
}

cmd/agent/app/processors/thrift_processor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func createProcessor(t *testing.T, mFactory metrics.Factory, tFactory thrift.TPr
4949

5050
queueSize := 10
5151
maxPacketSize := 65000
52-
server, err := servers.NewTBufferedServer(transport, queueSize, maxPacketSize, mFactory)
52+
server, err := servers.NewUDPServer(transport, queueSize, maxPacketSize, mFactory)
5353
require.NoError(t, err)
5454

5555
numProcessors := 1

cmd/agent/app/servers/package_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
// Copyright (c) 2025 The Jaeger Authors.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package servers
5+
6+
import (
7+
"testing"
8+
9+
"github.com/jaegertracing/jaeger/internal/testutils"
10+
)
11+
12+
func TestMain(m *testing.M) {
13+
testutils.VerifyGoLeaks(m)
14+
}

cmd/agent/app/servers/server.go

Lines changed: 0 additions & 36 deletions
This file was deleted.

cmd/agent/app/servers/server_test.go

Lines changed: 0 additions & 35 deletions
This file was deleted.

cmd/agent/app/servers/thriftudp/transport_test.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,21 +121,36 @@ func TestWriteRead(t *testing.T) {
121121
require.NoError(t, err)
122122
defer client.Close()
123123

124+
// client makes two writes and one flush, resulting in one packet
124125
n, err := client.Write([]byte("test"))
125126
require.NoError(t, err)
126127
require.Equal(t, 4, n)
127-
n, err = client.Write([]byte("string"))
128+
n, err = client.Write([]byte("string1"))
128129
require.NoError(t, err)
129-
require.Equal(t, 6, n)
130-
err = client.Flush(context.Background())
130+
require.Equal(t, 7, n)
131+
require.NoError(t, client.Flush(context.Background()))
132+
133+
// another packet
134+
n, err = client.Write([]byte("test"))
135+
require.NoError(t, err)
136+
require.Equal(t, 4, n)
137+
n, err = client.Write([]byte("string2"))
131138
require.NoError(t, err)
139+
require.Equal(t, 7, n)
140+
require.NoError(t, client.Flush(context.Background()))
132141

133-
expected := []byte("teststring")
142+
expected := "teststring1"
134143
readBuf := make([]byte, 20)
135144
n, err = server.Read(readBuf)
136145
require.NoError(t, err)
137-
require.Len(t, expected, n)
138-
require.Equal(t, expected, readBuf[0:n])
146+
assert.Len(t, expected, n)
147+
assert.Equal(t, expected, string(readBuf[0:n]))
148+
149+
expected = "teststring2"
150+
n, err = server.Read(readBuf)
151+
require.NoError(t, err)
152+
assert.Len(t, expected, n)
153+
assert.Equal(t, expected, string(readBuf[0:n]))
139154
}
140155

141156
func TestDoubleCloseError(t *testing.T) {

cmd/agent/app/servers/tbuffered_server.go renamed to cmd/agent/app/servers/udpserver.go

Lines changed: 77 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -5,31 +5,40 @@
55
package servers
66

77
import (
8+
"bytes"
89
"io"
910
"sync"
1011
"sync/atomic"
1112

1213
"github.com/jaegertracing/jaeger/pkg/metrics"
1314
)
1415

15-
// ThriftTransport is a subset of thrift.TTransport methods, for easier mocking.
16-
type ThriftTransport interface {
16+
// TBufferedServer is an alias for UDPServer, for backwards compatibility.
17+
type TBufferedServer = UDPServer
18+
19+
// NewTBufferedServer is an alias for NewUDPServer, for backwards compatibility.
20+
var NewTBufferedServer = NewUDPServer
21+
22+
// UDPConn is a an abstraction of *net.UDPConn, for easier mocking.
23+
type UDPConn interface {
1724
io.Reader
1825
io.Closer
1926
}
2027

21-
// TBufferedServer is a custom thrift server that reads traffic using the transport provided
22-
// and places messages into a buffered channel to be processed by the processor provided
23-
type TBufferedServer struct {
28+
// UDPServer reads packets from a UDP connection into bytes.Buffer and places
29+
// each buffer into a bounded channel to be consumed by the receiver.
30+
// After consuming the buffer, the receiver SHOULD call DataRecd() to signal
31+
// that the buffer is no longer in use and to return it to the pool.
32+
type UDPServer struct {
2433
// NB. queueLength HAS to be at the top of the struct or it will SIGSEV for certain architectures.
2534
// See https://github.com/golang/go/issues/13868
2635
queueSize int64
27-
dataChan chan *ReadBuf
36+
dataChan chan *bytes.Buffer
2837
maxPacketSize int
2938
maxQueueSize int
3039
serving uint32
31-
transport ThriftTransport
32-
readBufPool *sync.Pool
40+
transport UDPConn
41+
readBufPool sync.Pool
3342
metrics struct {
3443
// Size of the current server queue
3544
QueueSize metrics.Gauge `metric:"thrift.udp.server.queue_size"`
@@ -58,86 +67,117 @@ const (
5867
stateInit
5968
)
6069

61-
// NewTBufferedServer creates a TBufferedServer
62-
func NewTBufferedServer(
63-
transport ThriftTransport,
70+
// NewUDPServer creates a UDPServer
71+
func NewUDPServer(
72+
transport UDPConn,
6473
maxQueueSize int,
6574
maxPacketSize int,
6675
mFactory metrics.Factory,
67-
) (*TBufferedServer, error) {
68-
dataChan := make(chan *ReadBuf, maxQueueSize)
69-
70-
readBufPool := &sync.Pool{
71-
New: func() any {
72-
return &ReadBuf{bytes: make([]byte, maxPacketSize)}
73-
},
74-
}
75-
76-
res := &TBufferedServer{
77-
dataChan: dataChan,
76+
) (*UDPServer, error) {
77+
srv := &UDPServer{
78+
dataChan: make(chan *bytes.Buffer, maxQueueSize),
7879
transport: transport,
7980
maxQueueSize: maxQueueSize,
8081
maxPacketSize: maxPacketSize,
81-
readBufPool: readBufPool,
8282
serving: stateInit,
83+
readBufPool: sync.Pool{
84+
New: func() any {
85+
return new(bytes.Buffer)
86+
},
87+
},
88+
}
89+
90+
metrics.MustInit(&srv.metrics, mFactory, nil)
91+
return srv, nil
92+
}
93+
94+
// packetReader is a helper for reading a single packet no larger than maxPacketSize
95+
// from the underlying reader. Without it the ReadFrom() method of bytes.Buffer would
96+
// read multiple packets and won't even stop at maxPacketSize.
97+
type packetReader struct {
98+
maxPacketSize int
99+
reader io.LimitedReader
100+
attempt int
101+
}
102+
103+
func (r *packetReader) Read(p []byte) (int, error) {
104+
if r.attempt > 0 {
105+
return 0, io.EOF
83106
}
107+
r.attempt = 1
108+
return r.reader.Read(p)
109+
}
84110

85-
metrics.MustInit(&res.metrics, mFactory, nil)
86-
return res, nil
111+
func (r *packetReader) readPacket(buf *bytes.Buffer) (int, error) {
112+
// reset the readers since we're reusing them to avoid allocations
113+
r.attempt = 0
114+
r.reader.N = int64(r.maxPacketSize)
115+
// prepare the buffer for expected packet size
116+
buf.Grow(r.maxPacketSize)
117+
buf.Reset()
118+
// use Buffer's ReadFrom() as otherwise it's hard to get it into the right state
119+
n, err := buf.ReadFrom(r)
120+
return int(n), err
87121
}
88122

89123
// Serve initiates the readers and starts serving traffic
90-
func (s *TBufferedServer) Serve() {
124+
func (s *UDPServer) Serve() {
91125
defer close(s.dataChan)
92126
if !atomic.CompareAndSwapUint32(&s.serving, stateInit, stateServing) {
93127
return // Stop already called
94128
}
95129

130+
pr := &packetReader{
131+
maxPacketSize: s.maxPacketSize,
132+
reader: io.LimitedReader{
133+
R: s.transport,
134+
},
135+
}
136+
96137
for s.IsServing() {
97-
readBuf := s.readBufPool.Get().(*ReadBuf)
98-
n, err := s.transport.Read(readBuf.bytes)
138+
buf := s.readBufPool.Get().(*bytes.Buffer)
139+
n, err := pr.readPacket(buf)
99140
if err == nil {
100-
readBuf.n = n
101141
s.metrics.PacketSize.Update(int64(n))
102142
select {
103-
case s.dataChan <- readBuf:
143+
case s.dataChan <- buf:
104144
s.metrics.PacketsProcessed.Inc(1)
105145
s.updateQueueSize(1)
106146
default:
107-
s.readBufPool.Put(readBuf)
147+
s.readBufPool.Put(buf)
108148
s.metrics.PacketsDropped.Inc(1)
109149
}
110150
} else {
111-
s.readBufPool.Put(readBuf)
151+
s.readBufPool.Put(buf)
112152
s.metrics.ReadError.Inc(1)
113153
}
114154
}
115155
}
116156

117-
func (s *TBufferedServer) updateQueueSize(delta int64) {
157+
func (s *UDPServer) updateQueueSize(delta int64) {
118158
atomic.AddInt64(&s.queueSize, delta)
119159
s.metrics.QueueSize.Update(atomic.LoadInt64(&s.queueSize))
120160
}
121161

122162
// IsServing indicates whether the server is currently serving traffic
123-
func (s *TBufferedServer) IsServing() bool {
163+
func (s *UDPServer) IsServing() bool {
124164
return atomic.LoadUint32(&s.serving) == stateServing
125165
}
126166

127167
// Stop stops the serving of traffic and waits until the queue is
128168
// emptied by the readers
129-
func (s *TBufferedServer) Stop() {
169+
func (s *UDPServer) Stop() {
130170
atomic.StoreUint32(&s.serving, stateStopped)
131171
_ = s.transport.Close()
132172
}
133173

134174
// DataChan returns the data chan of the buffered server
135-
func (s *TBufferedServer) DataChan() chan *ReadBuf {
175+
func (s *UDPServer) DataChan() chan *bytes.Buffer {
136176
return s.dataChan
137177
}
138178

139179
// DataRecd is called by the consumers every time they read a data item from DataChan
140-
func (s *TBufferedServer) DataRecd(buf *ReadBuf) {
180+
func (s *UDPServer) DataRecd(buf *bytes.Buffer) {
141181
s.updateQueueSize(-1)
142182
s.readBufPool.Put(buf)
143183
}

0 commit comments

Comments
 (0)