Skip to content

Commit e609f9b

Browse files
hovavzadjaglowski
authored andcommitted
Add async config block & concurrent readers to UDP input operator (open-telemetry#27647)
**Description:** adding a feature - Adding asynchronous & concurrency mode to the UDP receiver/stanza input operator - goal is to reduce UDP packet loss in high-scale scenarios. Added 'async' block that holds 'FixedAReaderRoutineCount' field - it determines how many concurrent readers will read from the UDP port, process logs, and send them downstream. **Link to tracking Issue:** 27613 **Testing:** Local stress tests ran all types of config (no 'async', with empty 'async', with 'async' that contains FixedAReaderRoutineCount=2). In repo, added single test to udp_test, config_test (in stanza udp operator), and udp_test (in udplogreceiver). **Documentation:** Updated md file for both udplogreceiver & stanza udp_input operator with the new flags. --------- Co-authored-by: Daniel Jaglowski <[email protected]>
1 parent 2a0e096 commit e609f9b

File tree

8 files changed

+160
-48
lines changed

8 files changed

+160
-48
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: pkg/stanza
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add option to run udp logs receiver (and stanza udp input operator) concurrently to reduce data-loss during high-scale scenarios
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [27613]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

pkg/stanza/docs/operators/udp_input.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@ The `udp_input` operator listens for logs from UDP packets.
1212
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. |
1313
| `one_log_per_packet` | false | Skip log tokenization, set to true if logs contains one log per record and multiline is not used. This will improve performance. |
1414
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource. |
15-
| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes]. |
15+
| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/semantic-conventions/blob/cee22ec91448808ebcfa53df689c800c7171c9e1/docs/general/attributes.md#other-network-attributes]. |
1616
| `multiline` | | A `multiline` configuration block. See below for details. |
1717
| `preserve_leading_whitespaces` | false | Whether to preserve leading whitespaces. |
1818
| `preserve_trailing_whitespaces` | false | Whether to preserve trailing whitespaces. |
1919
| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options. |
20+
| `async` | nil | An `async` configuration block. See below for details. |
2021

2122
#### `multiline` configuration
2223

@@ -45,6 +46,16 @@ Other less common encodings are supported on a best-effort basis.
4546
See [https://www.iana.org/assignments/character-sets/character-sets.xhtml](https://www.iana.org/assignments/character-sets/character-sets.xhtml)
4647
for other encodings available.
4748

49+
#### `async` configuration
50+
51+
If set, the `async` configuration block instructs the `udp_input` operator to read and process logs asynchronsouly and concurrently.
52+
53+
**note** If `async` is not set at all, a single thread will read lines synchronously.
54+
55+
| Field | Default | Description |
56+
| --- | --- | --- |
57+
| `readers` | 1 | Concurrency level - Determines how many go routines read from UDP port (and process logs before sending downstream). |
58+
4859
### Example Configurations
4960

5061
#### Simple

pkg/stanza/operator/input/udp/config_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,21 @@ func TestUnmarshal(t *testing.T) {
3333
return cfg
3434
}(),
3535
},
36+
{
37+
Name: "all_with_async",
38+
ExpectErr: false,
39+
Expect: func() *Config {
40+
cfg := NewConfig()
41+
cfg.ListenAddress = "10.0.0.1:9000"
42+
cfg.AddAttributes = true
43+
cfg.Encoding = "utf-8"
44+
cfg.SplitConfig.LineStartPattern = "ABC"
45+
cfg.SplitConfig.LineEndPattern = ""
46+
cfg.AsyncConfig = NewAsyncConfig()
47+
cfg.AsyncConfig.Readers = 2
48+
return cfg
49+
}(),
50+
},
3651
},
3752
}.Run(t)
3853
}

pkg/stanza/operator/input/udp/testdata/config.yaml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,13 @@ all:
88
multiline:
99
line_start_pattern: ABC
1010
line_end_pattern: ""
11+
all_with_async:
12+
type: udp_input
13+
listen_address: 10.0.0.1:9000
14+
add_attributes: true
15+
encoding: utf-8
16+
multiline:
17+
line_start_pattern: ABC
18+
line_end_pattern: ""
19+
async:
20+
readers: 2

pkg/stanza/operator/input/udp/udp.go

Lines changed: 57 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,17 @@ type Config struct {
5858
BaseConfig `mapstructure:",squash"`
5959
}
6060

61+
type AsyncConfig struct {
62+
Readers int `mapstructure:"readers,omitempty"`
63+
}
64+
65+
// NewAsyncConfig creates a new AsyncConfig with default values.
66+
func NewAsyncConfig() *AsyncConfig {
67+
return &AsyncConfig{
68+
Readers: 1,
69+
}
70+
}
71+
6172
// BaseConfig is the details configuration of a udp input operator.
6273
type BaseConfig struct {
6374
ListenAddress string `mapstructure:"listen_address,omitempty"`
@@ -66,6 +77,7 @@ type BaseConfig struct {
6677
Encoding string `mapstructure:"encoding,omitempty"`
6778
SplitConfig split.Config `mapstructure:"multiline,omitempty"`
6879
TrimConfig trim.Config `mapstructure:",squash"`
80+
AsyncConfig *AsyncConfig `mapstructure:"async,omitempty"`
6981
}
7082

7183
// Build will build a udp input operator.
@@ -101,6 +113,14 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
101113
resolver = helper.NewIPResolver()
102114
}
103115

116+
if c.AsyncConfig == nil {
117+
c.AsyncConfig = NewAsyncConfig()
118+
}
119+
120+
if c.AsyncConfig.Readers <= 0 {
121+
return nil, fmt.Errorf("async readers must be greater than 0")
122+
}
123+
104124
udpInput := &Input{
105125
InputOperator: inputOperator,
106126
address: address,
@@ -110,6 +130,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
110130
splitFunc: splitFunc,
111131
resolver: resolver,
112132
OneLogPerPacket: c.OneLogPerPacket,
133+
AsyncConfig: c.AsyncConfig,
113134
}
114135
return udpInput, nil
115136
}
@@ -121,6 +142,7 @@ type Input struct {
121142
address *net.UDPAddr
122143
addAttributes bool
123144
OneLogPerPacket bool
145+
AsyncConfig *AsyncConfig
124146

125147
connection net.PacketConn
126148
cancel context.CancelFunc
@@ -148,44 +170,47 @@ func (u *Input) Start(_ operator.Persister) error {
148170

149171
// goHandleMessages will handle messages from a udp connection.
150172
func (u *Input) goHandleMessages(ctx context.Context) {
151-
u.wg.Add(1)
152-
153-
go func() {
154-
defer u.wg.Done()
155-
156-
dec := decode.New(u.encoding)
157-
buf := make([]byte, 0, MaxUDPSize)
158-
for {
159-
message, remoteAddr, err := u.readMessage()
160-
if err != nil {
161-
select {
162-
case <-ctx.Done():
163-
return
164-
default:
165-
u.Errorw("Failed reading messages", zap.Error(err))
166-
}
167-
break
168-
}
173+
for i := 0; i < u.AsyncConfig.Readers; i++ {
174+
u.wg.Add(1)
175+
go u.readAndProcessMessages(ctx)
176+
}
177+
}
169178

170-
if u.OneLogPerPacket {
171-
log := truncateMaxLog(message)
172-
u.handleMessage(ctx, remoteAddr, dec, log)
173-
continue
179+
func (u *Input) readAndProcessMessages(ctx context.Context) {
180+
defer u.wg.Done()
181+
182+
dec := decode.New(u.encoding)
183+
buf := make([]byte, 0, MaxUDPSize)
184+
for {
185+
message, remoteAddr, err := u.readMessage()
186+
if err != nil {
187+
select {
188+
case <-ctx.Done():
189+
return
190+
default:
191+
u.Errorw("Failed reading messages", zap.Error(err))
174192
}
193+
break
194+
}
175195

176-
scanner := bufio.NewScanner(bytes.NewReader(message))
177-
scanner.Buffer(buf, MaxUDPSize)
196+
if u.OneLogPerPacket {
197+
log := truncateMaxLog(message)
198+
u.handleMessage(ctx, remoteAddr, dec, log)
199+
continue
200+
}
178201

179-
scanner.Split(u.splitFunc)
202+
scanner := bufio.NewScanner(bytes.NewReader(message))
203+
scanner.Buffer(buf, MaxUDPSize)
180204

181-
for scanner.Scan() {
182-
u.handleMessage(ctx, remoteAddr, dec, scanner.Bytes())
183-
}
184-
if err := scanner.Err(); err != nil {
185-
u.Errorw("Scanner error", zap.Error(err))
186-
}
205+
scanner.Split(u.splitFunc)
206+
207+
for scanner.Scan() {
208+
u.handleMessage(ctx, remoteAddr, dec, scanner.Bytes())
187209
}
188-
}()
210+
if err := scanner.Err(); err != nil {
211+
u.Errorw("Scanner error", zap.Error(err))
212+
}
213+
}
189214
}
190215

191216
func truncateMaxLog(data []byte) (token []byte) {

pkg/stanza/operator/input/udp/udp_test.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,8 @@ import (
1818
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
1919
)
2020

21-
func udpInputTest(input []byte, expected []string) func(t *testing.T) {
21+
func udpInputTest(input []byte, expected []string, cfg *Config) func(t *testing.T) {
2222
return func(t *testing.T) {
23-
cfg := NewConfigWithID("test_input")
24-
cfg.ListenAddress = ":0"
25-
2623
op, err := cfg.Build(testutil.Logger(t))
2724
require.NoError(t, err)
2825

@@ -138,10 +135,17 @@ func udpInputAttributesTest(input []byte, expected []string) func(t *testing.T)
138135
}
139136

140137
func TestInput(t *testing.T) {
141-
t.Run("Simple", udpInputTest([]byte("message1"), []string{"message1"}))
142-
t.Run("TrailingNewlines", udpInputTest([]byte("message1\n"), []string{"message1"}))
143-
t.Run("TrailingCRNewlines", udpInputTest([]byte("message1\r\n"), []string{"message1"}))
144-
t.Run("NewlineInMessage", udpInputTest([]byte("message1\nmessage2\n"), []string{"message1\nmessage2"}))
138+
cfg := NewConfigWithID("test_input")
139+
cfg.ListenAddress = ":0"
140+
141+
t.Run("Simple", udpInputTest([]byte("message1"), []string{"message1"}, cfg))
142+
t.Run("TrailingNewlines", udpInputTest([]byte("message1\n"), []string{"message1"}, cfg))
143+
t.Run("TrailingCRNewlines", udpInputTest([]byte("message1\r\n"), []string{"message1"}, cfg))
144+
t.Run("NewlineInMessage", udpInputTest([]byte("message1\nmessage2\n"), []string{"message1\nmessage2"}, cfg))
145+
146+
cfg.AsyncConfig = NewAsyncConfig()
147+
cfg.AsyncConfig.Readers = 2
148+
t.Run("SimpleAsync", udpInputTest([]byte("message1"), []string{"message1"}, cfg))
145149
}
146150

147151
func TestInputAttributes(t *testing.T) {

receiver/udplogreceiver/README.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@ Receives logs over UDP.
2424
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes |
2525
| `one_log_per_packet` | false | Skip log tokenization, set to true if logs contains one log per record and multiline is not used. This will improve performance. |
2626
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource |
27-
| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes] |
27+
| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][hhttps://github.com/open-telemetry/semantic-conventions/blob/cee22ec91448808ebcfa53df689c800c7171c9e1/docs/general/attributes.md#other-network-attributes] |
2828
| `multiline` | | A `multiline` configuration block. See below for details |
2929
| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options |
3030
| `operators` | [] | An array of [operators](../../pkg/stanza/docs/operators/README.md#what-operators-are-available). See below for more details |
31+
| `async` | nil | An `async` configuration block. See below for details. |
3132

3233
### Operators
3334

@@ -69,6 +70,16 @@ Other less common encodings are supported on a best-effort basis.
6970
See [https://www.iana.org/assignments/character-sets/character-sets.xhtml](https://www.iana.org/assignments/character-sets/character-sets.xhtml)
7071
for other encodings available.
7172

73+
#### `async` configuration
74+
75+
If set, the `async` configuration block instructs the `udp_input` operator to read and process logs asynchronsouly and concurrently.
76+
77+
**note** If `async` is not set at all, a single thread will read lines synchronously.
78+
79+
| Field | Default | Description |
80+
| --- | --- | --- |
81+
| `readers` | 1 | Concurrency level - Determines how many go routines read from UDP port (and process logs before sending downstream). |
82+
7283
## Example Configurations
7384

7485
### Simple

receiver/udplogreceiver/udp_test.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,19 @@ import (
2525
)
2626

2727
func TestUdp(t *testing.T) {
28-
testUDP(t, testdataConfigYaml())
28+
listenAddress := "127.0.0.1:29018"
29+
testUDP(t, testdataConfigYaml(listenAddress), listenAddress)
2930
}
3031

31-
func testUDP(t *testing.T, cfg *UDPLogConfig) {
32+
func TestUdpAsync(t *testing.T) {
33+
listenAddress := "127.0.0.1:29019"
34+
cfg := testdataConfigYaml(listenAddress)
35+
cfg.InputConfig.AsyncConfig = udp.NewAsyncConfig()
36+
cfg.InputConfig.AsyncConfig.Readers = 2
37+
testUDP(t, testdataConfigYaml(listenAddress), listenAddress)
38+
}
39+
40+
func testUDP(t *testing.T, cfg *UDPLogConfig, listenAddress string) {
3241
numLogs := 5
3342

3443
f := NewFactory()
@@ -38,7 +47,7 @@ func testUDP(t *testing.T, cfg *UDPLogConfig) {
3847
require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))
3948

4049
var conn net.Conn
41-
conn, err = net.Dial("udp", "127.0.0.1:29018")
50+
conn, err = net.Dial("udp", listenAddress)
4251
require.NoError(t, err)
4352

4453
for i := 0; i < numLogs; i++ {
@@ -78,17 +87,17 @@ func TestLoadConfig(t *testing.T) {
7887
require.NoError(t, component.UnmarshalConfig(sub, cfg))
7988

8089
assert.NoError(t, component.ValidateConfig(cfg))
81-
assert.Equal(t, testdataConfigYaml(), cfg)
90+
assert.Equal(t, testdataConfigYaml("127.0.0.1:29018"), cfg)
8291
}
8392

84-
func testdataConfigYaml() *UDPLogConfig {
93+
func testdataConfigYaml(listenAddress string) *UDPLogConfig {
8594
return &UDPLogConfig{
8695
BaseConfig: adapter.BaseConfig{
8796
Operators: []operator.Config{},
8897
},
8998
InputConfig: func() udp.Config {
9099
c := udp.NewConfig()
91-
c.ListenAddress = "127.0.0.1:29018"
100+
c.ListenAddress = listenAddress
92101
return *c
93102
}(),
94103
}

0 commit comments

Comments
 (0)