Skip to content

Commit 3416ca7

Browse files
leehinmanmergify[bot]
authored andcommitted
enforce ordering of libbeat API server Start/Stop functions (#46865)
* enforce ordering of libbeat API server Start/Stop functions (cherry picked from commit 4490460)
1 parent b2d1609 commit 3416ca7

File tree

2 files changed

+143
-22
lines changed

2 files changed

+143
-22
lines changed

libbeat/api/server.go

Lines changed: 57 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,14 @@ import (
3131
"github.com/elastic/elastic-agent-libs/logp"
3232
)
3333

34+
type serverState int
35+
36+
const (
37+
stateNew = iota
38+
stateStarted
39+
stateStopped
40+
)
41+
3442
// Server takes care of correctly starting the HTTP component of the API
3543
// and will answer all the routes defined in the received ServeMux.
3644
type Server struct {
@@ -41,6 +49,7 @@ type Server struct {
4149
wg sync.WaitGroup
4250
mutex sync.Mutex
4351
httpServer *http.Server
52+
state serverState
4453
}
4554

4655
// New creates a new API Server with no routes attached.
@@ -61,37 +70,69 @@ func New(log *logp.Logger, config *config.C) (*Server, error) {
6170
l: l,
6271
config: cfg,
6372
log: log.Named("api"),
73+
state: stateNew,
6474
}, nil
6575
}
6676

6777
// Start starts the HTTP server and accepting new connection.
6878
func (s *Server) Start() {
6979
s.mutex.Lock()
7080
defer s.mutex.Unlock()
71-
s.log.Info("Starting stats endpoint")
72-
s.wg.Add(1)
73-
s.httpServer = &http.Server{Handler: s.mux} //nolint:gosec // Keep original behavior
74-
go func(l net.Listener) {
75-
defer s.wg.Done()
76-
s.log.Infof("Metrics endpoint listening on: %s (configured: %s)", l.Addr().String(), s.config.Host)
77-
78-
err := s.httpServer.Serve(l)
79-
s.log.Infof("Stats endpoint (%s) finished: %v", l.Addr().String(), err)
80-
}(s.l)
81+
82+
switch s.state {
83+
case stateNew:
84+
s.state = stateStarted
85+
s.log.Info("Starting stats endpoint")
86+
s.wg.Add(1)
87+
s.httpServer = &http.Server{Handler: s.mux} //nolint:gosec // Keep original behavior
88+
go func(l net.Listener) {
89+
defer s.wg.Done()
90+
s.log.Infof("Metrics endpoint listening on: %s (configured: %s)", l.Addr().String(), s.config.Host)
91+
92+
err := s.httpServer.Serve(l)
93+
s.log.Infof("Stats endpoint (%s) finished: %v", l.Addr().String(), err)
94+
}(s.l)
95+
return
96+
case stateStarted:
97+
// only call Start once
98+
s.log.Debug("not starting stats endpoint because start was already called")
99+
return
100+
case stateStopped:
101+
s.log.Debug("not starting stats endpoint because stop was already called")
102+
return
103+
default:
104+
s.log.Errorf("unknown stats server state: %d", s.state)
105+
}
81106
}
82107

83108
// Stop stops the API server and free any resource associated with the process like unix sockets.
84109
func (s *Server) Stop() error {
85110
s.mutex.Lock()
86111
defer s.mutex.Unlock()
87-
if s.httpServer == nil {
112+
113+
switch s.state {
114+
case stateNew:
115+
s.state = stateStopped
116+
// New always creates a listener, need to close it even if the server hasn't started
117+
if err := s.l.Close(); err != nil {
118+
s.log.Infof("error closing stats endpoint (%s): %v", s.l.Addr().String(), err)
119+
}
88120
return nil
121+
case stateStarted:
122+
s.state = stateStopped
123+
// Closing the server will also close the listener
124+
if err := s.httpServer.Close(); err != nil {
125+
return fmt.Errorf("error closing monitoring server: %w", err)
126+
}
127+
s.wg.Wait()
128+
return nil
129+
case stateStopped:
130+
// only need to call Stop once
131+
s.log.Debug("not stopping stats endpoint because stop was already called")
132+
return nil
133+
default:
134+
return fmt.Errorf("unknown stats server state: %d", s.state)
89135
}
90-
if err := s.httpServer.Close(); err != nil {
91-
return fmt.Errorf("error closing monitoring server: %w", err)
92-
}
93-
s.wg.Wait()
94-
return nil
95136
}
96137

97138
// AttachHandler will attach a handler at the specified route. Routes are

libbeat/api/server_test.go

Lines changed: 86 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,20 @@ package api
1919

2020
import (
2121
"context"
22+
"encoding/base64"
2223
"io"
24+
"math/rand/v2"
2325
"net"
2426
"net/http"
2527
"net/http/httptest"
2628
"os"
29+
"path/filepath"
2730
"runtime"
2831
"testing"
2932

3033
"github.com/stretchr/testify/assert"
3134
"github.com/stretchr/testify/require"
35+
"go.uber.org/goleak"
3236

3337
"github.com/elastic/elastic-agent-libs/config"
3438
"github.com/elastic/elastic-agent-libs/logp/logptest"
@@ -41,7 +45,7 @@ func TestConfiguration(t *testing.T) {
4145
return
4246
}
4347
t.Run("when user is set", func(t *testing.T) {
44-
cfg := config.MustNewConfigFrom(map[string]interface{}{
48+
cfg := config.MustNewConfigFrom(map[string]any{
4549
"host": "unix:///tmp/ok",
4650
"user": "admin",
4751
})
@@ -51,7 +55,7 @@ func TestConfiguration(t *testing.T) {
5155
})
5256

5357
t.Run("when security descriptor is set", func(t *testing.T) {
54-
cfg := config.MustNewConfigFrom(map[string]interface{}{
58+
cfg := config.MustNewConfigFrom(map[string]any{
5559
"host": "unix:///tmp/ok",
5660
"security_descriptor": "D:P(A;;GA;;;1234)",
5761
})
@@ -87,7 +91,7 @@ func TestSocket(t *testing.T) {
8791
sockFile := tmpDir + "/test.sock"
8892
t.Log(sockFile)
8993

90-
cfg := config.MustNewConfigFrom(map[string]interface{}{
94+
cfg := config.MustNewConfigFrom(map[string]any{
9195
"host": "unix://" + sockFile,
9296
})
9397

@@ -130,7 +134,7 @@ func TestSocket(t *testing.T) {
130134
require.NoError(t, err)
131135
f.Close()
132136

133-
cfg := config.MustNewConfigFrom(map[string]interface{}{
137+
cfg := config.MustNewConfigFrom(map[string]any{
134138
"host": "unix://" + sockFile,
135139
})
136140

@@ -167,7 +171,7 @@ func TestHTTP(t *testing.T) {
167171
// select a random free port.
168172
url := "http://localhost:0"
169173

170-
cfg := config.MustNewConfigFrom(map[string]interface{}{
174+
cfg := config.MustNewConfigFrom(map[string]any{
171175
"host": url,
172176
})
173177
logger := logptest.NewTestingLogger(t, "")
@@ -198,7 +202,7 @@ func attachEchoHelloHandler(t *testing.T, s *Server) {
198202
}
199203

200204
func TestAttachHandler(t *testing.T) {
201-
cfg := config.MustNewConfigFrom(map[string]interface{}{
205+
cfg := config.MustNewConfigFrom(map[string]any{
202206
"host": "http://localhost:0",
203207
})
204208

@@ -223,8 +227,84 @@ func TestAttachHandler(t *testing.T) {
223227
assert.Equal(t, "test!", resp.Body.String())
224228
}
225229

230+
func TestOrdering(t *testing.T) {
231+
monitorSocket := genSocketPath()
232+
var monitorHost string
233+
if runtime.GOOS == "windows" {
234+
monitorHost = "npipe:///" + filepath.Base(monitorSocket)
235+
} else {
236+
monitorHost = "unix://" + monitorSocket
237+
}
238+
cfg := config.MustNewConfigFrom(map[string]any{
239+
"host": monitorHost,
240+
})
241+
242+
t.Run("NewStartStop", func(t *testing.T) {
243+
defer goleak.VerifyNone(t)
244+
logger := logptest.NewTestingLogger(t, "")
245+
s, err := New(logger, cfg)
246+
require.NoError(t, err)
247+
s.Start()
248+
err = s.Stop()
249+
require.NoError(t, err)
250+
s.wg.Wait()
251+
})
252+
t.Run("NewStopStart", func(t *testing.T) {
253+
defer goleak.VerifyNone(t)
254+
logger := logptest.NewTestingLogger(t, "")
255+
s, err := New(logger, cfg)
256+
require.NoError(t, err)
257+
err = s.Stop()
258+
require.NoError(t, err)
259+
s.Start()
260+
s.wg.Wait()
261+
})
262+
t.Run("NewStop", func(t *testing.T) {
263+
defer goleak.VerifyNone(t)
264+
logger := logptest.NewTestingLogger(t, "")
265+
s, err := New(logger, cfg)
266+
require.NoError(t, err)
267+
err = s.Stop()
268+
require.NoError(t, err)
269+
s.wg.Wait()
270+
})
271+
t.Run("NewStopStop", func(t *testing.T) {
272+
defer goleak.VerifyNone(t)
273+
logger := logptest.NewTestingLogger(t, "")
274+
s, err := New(logger, cfg)
275+
require.NoError(t, err)
276+
err = s.Stop()
277+
require.NoError(t, err)
278+
err = s.Stop()
279+
require.NoError(t, err)
280+
s.wg.Wait()
281+
})
282+
t.Run("NewStartStartStop", func(t *testing.T) {
283+
defer goleak.VerifyNone(t)
284+
logger := logptest.NewTestingLogger(t, "")
285+
s, err := New(logger, cfg)
286+
require.NoError(t, err)
287+
s.Start()
288+
s.Start()
289+
err = s.Stop()
290+
require.NoError(t, err)
291+
s.wg.Wait()
292+
})
293+
}
294+
226295
func newTestHandler(response string) http.Handler {
227296
return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
228297
_, _ = io.WriteString(w, response)
229298
})
230299
}
300+
301+
func genSocketPath() string {
302+
randData := make([]byte, 16)
303+
for i := range len(randData) {
304+
randData[i] = uint8(rand.UintN(255)) //nolint:gosec // 0-255 fits in a uint8
305+
}
306+
socketName := base64.URLEncoding.EncodeToString(randData) + ".sock"
307+
// don't use t.TempDir() because it can be too long
308+
socketDir := os.TempDir()
309+
return filepath.Join(socketDir, socketName)
310+
}

0 commit comments

Comments
 (0)