Skip to content

Commit 0d9a3f8

Browse files
authored
[receiver/journald] Restart journalctl if it exits unexpectedly (#35635)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description According to the community, there are bugs in systemd that could corrupt the journal files or crash the log receiver: systemd/systemd#24320 systemd/systemd#24150 We've seen some issues reported to Elastic/beats project: elastic/beats#39352 elastic/beats#32782 elastic/beats#34077 Unfortunately, the otelcol is not immune from these issues. When the journalctl process exits for any reason, the log consumption from journald just stops. We've experienced this on some machines that have high log volume. Currently we monitors the journalctl processes started by otelcol, and restart the otelcol when some of them is missing. IMO, The journald receiver itself should monitor the journalctl process it starts, and does its best to keep it alive. In this PR, we try to restart the journalctl process when it exits unexpectedly. As long as the journalctl cmd can be started (via `Cmd.Start()`) successfully, the journald_input will always try to restart the journalctl process if it exits. The error reporting behaviour changes a bit in this PR. Before the PR, the `operator.Start` waits up to 1 sec to capture any immediate error returned from journalctl. After the PR, the error won't be reported back even if the journalctl exits immediately after start, instead, the error will be logged, and the process will be restarted. The fix is largely inspired by elastic/beats#40558. <!--Describe what testing was performed and which tests were added.--> #### Testing Add a simple bash script that print a line every second, and load it to systemd. `log_every_second.sh`: ```bash #!/bin/bash while true; do echo "Log message: $(date)" sleep 1 done ``` `log.service`: ``` [Unit] Description=Print logs to journald every second After=network.target [Service] ExecStart=/usr/local/bin/log_every_second.sh Restart=always StandardOutput=journal StandardError=journal [Install] WantedBy=multi-user.target ``` Start the otelcol with the following config: ```yaml service: telemetry: logs: level: debug pipelines: logs: receivers: [journald] processors: [] exporters: [debug] receivers: journald: exporters: debug: verbosity: basic sampling_initial: 1 sampling_thereafter: 1 ``` Kill the journalctl process and observe the otelcol's behaviour. The journactl process will be restarted after the backoff period (hardcoded to 2 sec): ```bash 2024-10-06T14:32:33.755Z info LogsExporter {"kind": "exporter", "data_type": "logs", "name": "debug", "resource logs": 1, "log records": 1} 2024-10-06T14:32:34.709Z error journald/input.go:98 journalctl command exited {"kind": "receiver", "name": "journald", "data_type": "logs", "operator_id": "journald_input", "operator_type": "journald_input", "error": "signal: terminated"} github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/journald.(*Input).run github.com/open-telemetry/opentelemetry-collector-contrib/pkg/[email protected]/operator/input/journald/input.go:98 2024-10-06T14:32:36.712Z debug journald/input.go:94 Starting the journalctl command {"kind": "receiver", "name": "journald", "data_type": "logs", "operator_id": "journald_input", "operator_type": "journald_input"} 2024-10-06T14:32:36.756Z info LogsExporter {"kind": "exporter", "data_type": "logs", "name": "debug", "resource logs": 1, "log records": 10} ``` <!--Please delete paragraphs that you did not use before submitting.--> --------- Signed-off-by: Mengnan Gong <[email protected]>
1 parent ed302a3 commit 0d9a3f8

File tree

4 files changed

+137
-80
lines changed

4 files changed

+137
-80
lines changed

.chloggen/restart-journalctl.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: journaldreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "Restart journalctl if it exits unexpectedly"
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [35635]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

pkg/stanza/operator/input/journald/config_linux.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,12 @@ func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error
4040
return &Input{
4141
InputOperator: inputOperator,
4242
newCmd: func(ctx context.Context, cursor []byte) cmd {
43+
// Copy args and if needed, add the cursor flag
44+
journalArgs := append([]string{}, args...)
4345
if cursor != nil {
44-
args = append(args, "--after-cursor", string(cursor))
46+
journalArgs = append(journalArgs, "--after-cursor", string(cursor))
4547
}
46-
return exec.CommandContext(ctx, "journalctl", args...) // #nosec - ...
48+
return exec.CommandContext(ctx, "journalctl", journalArgs...) // #nosec - ...
4749
// journalctl is an executable that is required for this operator to function
4850
},
4951
json: jsoniter.ConfigFastest,

pkg/stanza/operator/input/journald/input.go

Lines changed: 94 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"io"
1414
"os/exec"
1515
"strconv"
16-
"strings"
1716
"sync"
1817
"time"
1918

@@ -35,6 +34,7 @@ type Input struct {
3534
json jsoniter.API
3635
cancel context.CancelFunc
3736
wg sync.WaitGroup
37+
errChan chan error
3838
}
3939

4040
type cmd interface {
@@ -44,9 +44,10 @@ type cmd interface {
4444
Wait() error
4545
}
4646

47-
type failedCommand struct {
48-
err string
49-
output string
47+
type journalctl struct {
48+
cmd cmd
49+
stdout io.ReadCloser
50+
stderr io.ReadCloser
5051
}
5152

5253
var lastReadCursorKey = "lastReadCursor"
@@ -56,85 +57,123 @@ func (operator *Input) Start(persister operator.Persister) error {
5657
ctx, cancel := context.WithCancel(context.Background())
5758
operator.cancel = cancel
5859

60+
operator.persister = persister
61+
operator.errChan = make(chan error)
62+
63+
go operator.run(ctx)
64+
65+
select {
66+
case err := <-operator.errChan:
67+
return fmt.Errorf("journalctl command failed: %w", err)
68+
case <-time.After(waitDuration):
69+
return nil
70+
}
71+
}
72+
73+
// run starts the journalctl process and monitor it.
74+
// If there is an error in operator.newJournalctl, the error will be sent to operator.errChan.
75+
// If the journalctl process started successfully, but there is an error in operator.runJournalctl,
76+
// The error will be logged and the journalctl process will be restarted.
77+
func (operator *Input) run(ctx context.Context) {
78+
for {
79+
select {
80+
case <-ctx.Done():
81+
return
82+
default:
83+
jctl, err := operator.newJournalctl(ctx)
84+
// If we can't start journalctl, there is nothing we can do but logging the error and return.
85+
if err != nil {
86+
select {
87+
case operator.errChan <- err:
88+
case <-time.After(waitDuration):
89+
operator.Logger().Error("Failed to init and start journalctl", zap.Error(err))
90+
}
91+
return
92+
}
93+
94+
operator.Logger().Debug("Starting the journalctl command")
95+
if err := operator.runJournalctl(ctx, jctl); err != nil {
96+
ee := &exec.ExitError{}
97+
if ok := errors.As(err, &ee); ok && ee.ExitCode() != 0 {
98+
operator.Logger().Error("journalctl command exited", zap.Error(ee))
99+
} else {
100+
operator.Logger().Info("journalctl command exited")
101+
}
102+
}
103+
// Backoff before restart.
104+
select {
105+
case <-ctx.Done():
106+
return
107+
case <-time.After(2 * time.Second):
108+
}
109+
}
110+
}
111+
}
112+
113+
// newJournalctl creates a new journalctl command.
114+
func (operator *Input) newJournalctl(ctx context.Context) (*journalctl, error) {
59115
// Start from a cursor if there is a saved offset
60-
cursor, err := persister.Get(ctx, lastReadCursorKey)
116+
cursor, err := operator.persister.Get(ctx, lastReadCursorKey)
61117
if err != nil {
62-
return fmt.Errorf("failed to get journalctl state: %w", err)
118+
return nil, fmt.Errorf("failed to get journalctl state: %w", err)
63119
}
64120

65-
operator.persister = persister
66-
67-
// Start journalctl
68121
journal := operator.newCmd(ctx, cursor)
69-
stdout, err := journal.StdoutPipe()
70-
if err != nil {
71-
return fmt.Errorf("failed to get journalctl stdout: %w", err)
122+
jctl := &journalctl{
123+
cmd: journal,
72124
}
73-
stderr, err := journal.StderrPipe()
125+
126+
jctl.stdout, err = journal.StdoutPipe()
74127
if err != nil {
75-
return fmt.Errorf("failed to get journalctl stderr: %w", err)
128+
return nil, fmt.Errorf("failed to get journalctl stdout: %w", err)
76129
}
77-
err = journal.Start()
130+
jctl.stderr, err = journal.StderrPipe()
78131
if err != nil {
79-
return fmt.Errorf("start journalctl: %w", err)
132+
return nil, fmt.Errorf("failed to get journalctl stderr: %w", err)
80133
}
81134

82-
stderrChan := make(chan string)
83-
failedChan := make(chan failedCommand)
84-
85-
// Start the wait goroutine
86-
operator.wg.Add(1)
87-
go func() {
88-
defer operator.wg.Done()
89-
err := journal.Wait()
90-
message := <-stderrChan
91-
92-
f := failedCommand{
93-
output: message,
94-
}
95-
96-
if err != nil {
97-
ee := (&exec.ExitError{})
98-
if ok := errors.As(err, &ee); ok && ee.ExitCode() != 0 {
99-
f.err = ee.Error()
100-
}
101-
}
135+
if err = journal.Start(); err != nil {
136+
return nil, fmt.Errorf("start journalctl: %w", err)
137+
}
102138

103-
select {
104-
case failedChan <- f:
105-
// log an error in case channel is closed
106-
case <-time.After(waitDuration):
107-
operator.Logger().Error("journalctl command exited", zap.String("error", f.err), zap.String("output", f.output))
108-
}
109-
}()
139+
return jctl, nil
140+
}
110141

111-
// Start the stderr reader goroutine
142+
// runJournalctl runs the journalctl command. This is a blocking call that returns
143+
// when the command exits.
144+
func (operator *Input) runJournalctl(ctx context.Context, jctl *journalctl) error {
145+
// Start the stderr reader goroutine.
146+
// This goroutine reads the stderr from the journalctl process. If the
147+
// process exits for any reason, then the stderr will be closed, this
148+
// goroutine will get an EOF error and exit.
112149
operator.wg.Add(1)
113150
go func() {
114151
defer operator.wg.Done()
115152

116-
stderrBuf := bufio.NewReader(stderr)
117-
messages := []string{}
153+
stderrBuf := bufio.NewReader(jctl.stderr)
118154

119155
for {
120156
line, err := stderrBuf.ReadBytes('\n')
121157
if err != nil {
122158
if !errors.Is(err, io.EOF) {
123159
operator.Logger().Error("Received error reading from journalctl stderr", zap.Error(err))
124160
}
125-
stderrChan <- strings.Join(messages, "\n")
126161
return
127162
}
128-
messages = append(messages, string(line))
163+
operator.Logger().Error("Received from journalctl stderr", zap.ByteString("stderr", line))
129164
}
130165
}()
131166

132-
// Start the reader goroutine
167+
// Start the reader goroutine.
168+
// This goroutine reads the stdout from the journalctl process, parses
169+
// the data, and writes to output. If the journalctl process exits for
170+
// any reason, then the stdout will be closed, this goroutine will get
171+
// an EOF error and exits.
133172
operator.wg.Add(1)
134173
go func() {
135174
defer operator.wg.Done()
136175

137-
stdoutBuf := bufio.NewReader(stdout)
176+
stdoutBuf := bufio.NewReader(jctl.stdout)
138177

139178
for {
140179
line, err := stdoutBuf.ReadBytes('\n')
@@ -159,16 +198,11 @@ func (operator *Input) Start(persister operator.Persister) error {
159198
}
160199
}()
161200

162-
// Wait waitDuration for eventual error
163-
select {
164-
case err := <-failedChan:
165-
if err.err == "" {
166-
return fmt.Errorf("journalctl command exited")
167-
}
168-
return fmt.Errorf("journalctl command failed (%v): %v", err.err, err.output)
169-
case <-time.After(waitDuration):
170-
return nil
171-
}
201+
// we wait for the reader goroutines to exit before calling Cmd.Wait().
202+
// As per documentation states, "It is thus incorrect to call Wait before all reads from the pipe have completed".
203+
operator.wg.Wait()
204+
205+
return jctl.cmd.Wait()
172206
}
173207

174208
func (operator *Input) parseJournalEntry(line []byte) (*entry.Entry, string, error) {
@@ -219,6 +253,5 @@ func (operator *Input) Stop() error {
219253
if operator.cancel != nil {
220254
operator.cancel()
221255
}
222-
operator.wg.Wait()
223256
return nil
224257
}

pkg/stanza/operator/input/journald/input_test.go

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package journald
88
import (
99
"bytes"
1010
"context"
11+
"errors"
1112
"io"
1213
"os/exec"
1314
"testing"
@@ -17,19 +18,21 @@ import (
1718
"github.com/stretchr/testify/mock"
1819
"github.com/stretchr/testify/require"
1920
"go.opentelemetry.io/collector/component/componenttest"
21+
"go.uber.org/zap"
2022

2123
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
2224
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
2325
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
2426
)
2527

2628
type fakeJournaldCmd struct {
27-
exitError *exec.ExitError
28-
stdErr string
29+
startError error
30+
exitError *exec.ExitError
31+
stdErr string
2932
}
3033

3134
func (f *fakeJournaldCmd) Start() error {
32-
return nil
35+
return f.startError
3336
}
3437

3538
func (f *fakeJournaldCmd) StdoutPipe() (io.ReadCloser, error) {
@@ -73,8 +76,7 @@ func TestInputJournald(t *testing.T) {
7376
return &fakeJournaldCmd{}
7477
}
7578

76-
err = op.Start(testutil.NewUnscopedMockPersister())
77-
assert.EqualError(t, err, "journalctl command exited")
79+
require.NoError(t, op.Start(testutil.NewUnscopedMockPersister()))
7880
defer func() {
7981
require.NoError(t, op.Stop())
8082
}()
@@ -236,6 +238,7 @@ func TestInputJournaldError(t *testing.T) {
236238
cfg.OutputIDs = []string{"output"}
237239

238240
set := componenttest.NewNopTelemetrySettings()
241+
set.Logger, _ = zap.NewDevelopment()
239242
op, err := cfg.Build(set)
240243
require.NoError(t, err)
241244

@@ -250,20 +253,12 @@ func TestInputJournaldError(t *testing.T) {
250253

251254
op.(*Input).newCmd = func(_ context.Context, _ []byte) cmd {
252255
return &fakeJournaldCmd{
253-
exitError: &exec.ExitError{},
254-
stdErr: "stderr output\n",
256+
exitError: &exec.ExitError{},
257+
startError: errors.New("fail to start"),
255258
}
256259
}
257260

258261
err = op.Start(testutil.NewUnscopedMockPersister())
259-
assert.EqualError(t, err, "journalctl command failed (<nil>): stderr output\n")
260-
defer func() {
261-
require.NoError(t, op.Stop())
262-
}()
263-
264-
select {
265-
case <-received:
266-
case <-time.After(time.Second):
267-
require.FailNow(t, "Timed out waiting for entry to be read")
268-
}
262+
assert.EqualError(t, err, "journalctl command failed: start journalctl: fail to start")
263+
require.NoError(t, op.Stop())
269264
}

0 commit comments

Comments
 (0)