
Commit 09d924f

status identifies failing component, fleet gateway may report degraded, liveness endpoint added (#569)
* Add liveness endpoint

  Add a /liveness route to the metrics server. This route reports the status from pkg/core/status. fleet-gateway will now report a degraded state if a checkin fails. This may not propagate to fleet-server, since a failed checkin means communications between the agent and the server are not working. It may also lead to the server reporting degraded for up to 30s (the fleet-server polling interval) even when the agent is able to connect successfully.

* linter fix
* add nolint directive
* Linter fix
* Review feedback, add doc strings
* Rename noop controller file to _test file
1 parent 11ce214 commit 09d924f
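
The commit message above can be exercised end to end once the agent is running: an external monitor polls the new /liveness route and maps the HTTP status code to agent health. The sketch below is a minimal illustration of that, not part of the commit; the localhost:6791 address is an assumption standing in for whatever host and port the agent's monitoring HTTP configuration uses.

// liveness_probe.go - minimal sketch of an external liveness check (illustrative only).
package main

import (
    "fmt"
    "net/http"
    "time"
)

func main() {
    client := &http.Client{Timeout: 5 * time.Second}
    // Hypothetical address; substitute the agent's configured monitoring endpoint.
    resp, err := client.Get("http://localhost:6791/liveness")
    if err != nil {
        fmt.Println("agent unreachable:", err)
        return
    }
    defer resp.Body.Close()

    switch resp.StatusCode {
    case http.StatusOK: // 200: agent reports Healthy
        fmt.Println("agent healthy")
    case http.StatusServiceUnavailable: // 503: agent reports Degraded or Failed
        fmt.Println("agent degraded or failed")
    default:
        fmt.Println("unexpected status:", resp.StatusCode)
    }
}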

File tree

10 files changed: +319 lines, -75 lines

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
@@ -185,3 +185,4 @@
 - Bump node.js version for heartbeat/synthetics to 16.15.0
 - Support scheduled actions and cancellation of pending actions. {issue}393[393] {pull}419[419]
 - Add `@metadata.input_id` and `@metadata.stream_id` when applying the inject stream processor {pull}527[527]
+- Add liveness endpoint, allow fleet-gateway component to report degraded state, add update time and messages to status output. {issue}390[390] {pull}569[569]

internal/pkg/agent/application/gateway/fleet/fleet_gateway.go

Lines changed: 29 additions & 17 deletions
@@ -2,6 +2,8 @@
 // or more contributor license agreements. Licensed under the Elastic License;
 // you may not use this file except in compliance with the Elastic License.

+// Package fleet handles interactions between the elastic-agent and fleet-server.
+// Specifically it will handle agent checkins, and action queueing/dispatch.
 package fleet

 import (
@@ -75,23 +77,24 @@ type actionQueue interface {
 }

 type fleetGateway struct {
-    bgContext        context.Context
-    log              *logger.Logger
-    dispatcher       pipeline.Dispatcher
-    client           client.Sender
-    scheduler        scheduler.Scheduler
-    backoff          backoff.Backoff
-    settings         *fleetGatewaySettings
-    agentInfo        agentInfo
-    reporter         fleetReporter
-    done             chan struct{}
-    wg               sync.WaitGroup
-    acker            store.FleetAcker
-    unauthCounter    int
-    statusController status.Controller
-    statusReporter   status.Reporter
-    stateStore       stateStore
-    queue            actionQueue
+    bgContext          context.Context
+    log                *logger.Logger
+    dispatcher         pipeline.Dispatcher
+    client             client.Sender
+    scheduler          scheduler.Scheduler
+    backoff            backoff.Backoff
+    settings           *fleetGatewaySettings
+    agentInfo          agentInfo
+    reporter           fleetReporter
+    done               chan struct{}
+    wg                 sync.WaitGroup
+    acker              store.FleetAcker
+    unauthCounter      int
+    checkinFailCounter int
+    statusController   status.Controller
+    statusReporter     status.Reporter
+    stateStore         stateStore
+    queue              actionQueue
 }

 // New creates a new fleet gateway
@@ -286,6 +289,7 @@ func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) {
         f.log.Debugf("Checking started")
         resp, err := f.execute(f.bgContext)
         if err != nil {
+            f.checkinFailCounter++
             f.log.Errorf("Could not communicate with fleet-server Checking API will retry, error: %s", err)
             if !f.backoff.Wait() {
                 // Something bad has happened and we log it and we should update our current state.
@@ -299,8 +303,16 @@
                 f.statusReporter.Update(state.Failed, err.Error(), nil)
                 return nil, err
             }
+            if f.checkinFailCounter > 1 {
+                // Update status reporter for gateway to degraded when there are two consecutive failures.
+                // Note that this may not propagate to fleet-server as the agent is having issues checking in.
+                // It may also (falsely) report a degraded session for 30s if it is eventually successful.
+                // However this component will allow the agent to report fleet gateway degradation locally.
+                f.statusReporter.Update(state.Degraded, fmt.Sprintf("checkin failed: %v", err), nil)
+            }
             continue
         }
+        f.checkinFailCounter = 0
         // Request was successful, return the collected actions.
         return resp, nil
     }
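
The hunk above is the core of the change: a consecutive-failure counter that only reports Degraded after two failed checkins in a row and resets on the first success. The standalone sketch below restates that pattern outside the gateway; checkin and reportDegraded are placeholder functions, not elastic-agent APIs.

package main

import (
    "errors"
    "fmt"
)

// checkin and reportDegraded stand in for the gateway's execute() call and
// statusReporter.Update(); they are illustrative placeholders only.
func runCheckinLoop(checkin func() error, reportDegraded func(error)) {
    failCounter := 0
    for i := 0; i < 5; i++ { // bounded here only so the example terminates
        if err := checkin(); err != nil {
            failCounter++
            // Only the second consecutive failure flips the component to degraded,
            // so a single transient error does not change the reported state.
            if failCounter > 1 {
                reportDegraded(err)
            }
            continue
        }
        // Any successful checkin clears the streak.
        failCounter = 0
    }
}

func main() {
    calls := 0
    checkin := func() error {
        calls++
        if calls <= 3 {
            return errors.New("fleet-server unreachable")
        }
        return nil
    }
    runCheckinLoop(checkin, func(err error) {
        fmt.Println("degraded:", err) // printed on the 2nd and 3rd failures
    })
}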

internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go

Lines changed: 84 additions & 46 deletions
@@ -26,12 +26,14 @@ import (
     "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
     "github.com/elastic/elastic-agent/internal/pkg/agent/storage"
     "github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
+    "github.com/elastic/elastic-agent/internal/pkg/core/state"
     "github.com/elastic/elastic-agent/internal/pkg/fleetapi"
     noopacker "github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/noop"
     repo "github.com/elastic/elastic-agent/internal/pkg/reporter"
     fleetreporter "github.com/elastic/elastic-agent/internal/pkg/reporter/fleet"
     fleetreporterConfig "github.com/elastic/elastic-agent/internal/pkg/reporter/fleet/config"
     "github.com/elastic/elastic-agent/internal/pkg/scheduler"
+    "github.com/elastic/elastic-agent/internal/pkg/testutils"
     "github.com/elastic/elastic-agent/pkg/core/logger"
 )
@@ -705,59 +707,95 @@ func TestRetriesOnFailures(t *testing.T) {
         Backoff: backoffSettings{Init: 100 * time.Millisecond, Max: 5 * time.Second},
     }

-    t.Run("When the gateway fails to communicate with the checkin API we will retry",
-        withGateway(agentInfo, settings, func(
-            t *testing.T,
-            gateway gateway.FleetGateway,
-            client *testingClient,
-            dispatcher *testingDispatcher,
-            scheduler *scheduler.Stepper,
-            rep repo.Backend,
-        ) {
-            fail := func(_ http.Header, _ io.Reader) (*http.Response, error) {
-                return wrapStrToResp(http.StatusInternalServerError, "something is bad"), nil
-            }
-            clientWaitFn := client.Answer(fail)
-            err := gateway.Start()
-            require.NoError(t, err)
+    t.Run("When the gateway fails to communicate with the checkin API we will retry", func(t *testing.T) {
+        scheduler := scheduler.NewStepper()
+        client := newTestingClient()
+        dispatcher := newTestingDispatcher()
+        log, _ := logger.New("fleet_gateway", false)
+        rep := getReporter(agentInfo, log, t)

-            _ = rep.Report(context.Background(), &testStateEvent{})
+        ctx, cancel := context.WithCancel(context.Background())
+        defer cancel()

-            // Initial tick is done out of bound so we can block on channels.
-            scheduler.Next()
+        diskStore := storage.NewDiskStore(paths.AgentStateStoreFile())
+        stateStore, err := store.NewStateStore(log, diskStore)
+        require.NoError(t, err)

-            // Simulate a 500 errors for the next 3 calls.
-            <-clientWaitFn
-            <-clientWaitFn
-            <-clientWaitFn
-
-            // API recover
-            waitFn := ackSeq(
-                client.Answer(func(_ http.Header, body io.Reader) (*http.Response, error) {
-                    cr := &request{}
-                    content, err := ioutil.ReadAll(body)
-                    if err != nil {
-                        t.Fatal(err)
-                    }
-                    err = json.Unmarshal(content, &cr)
-                    if err != nil {
-                        t.Fatal(err)
-                    }
+        queue := &mockQueue{}
+        queue.On("DequeueActions").Return([]fleetapi.Action{})
+        queue.On("Actions").Return([]fleetapi.Action{})
+
+        fleetReporter := &testutils.MockReporter{}
+        fleetReporter.On("Update", state.Degraded, mock.Anything, mock.Anything).Times(2)
+        fleetReporter.On("Update", mock.Anything, mock.Anything, mock.Anything).Maybe()
+        fleetReporter.On("Unregister").Maybe()
+
+        statusController := &testutils.MockController{}
+        statusController.On("RegisterComponent", "gateway").Return(fleetReporter).Once()
+        statusController.On("StatusString").Return("string")
+
+        gateway, err := newFleetGatewayWithScheduler(
+            ctx,
+            log,
+            settings,
+            agentInfo,
+            client,
+            dispatcher,
+            scheduler,
+            rep,
+            noopacker.NewAcker(),
+            statusController,
+            stateStore,
+            queue,
+        )
+        require.NoError(t, err)
+
+        fail := func(_ http.Header, _ io.Reader) (*http.Response, error) {
+            return wrapStrToResp(http.StatusInternalServerError, "something is bad"), nil
+        }
+        clientWaitFn := client.Answer(fail)
+        err = gateway.Start()
+        require.NoError(t, err)

-            require.Equal(t, 1, len(cr.Events))
+        _ = rep.Report(context.Background(), &testStateEvent{})

-            resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`)
-            return resp, nil
-        }),
+        // Initial tick is done out of bound so we can block on channels.
+        scheduler.Next()

-            dispatcher.Answer(func(actions ...fleetapi.Action) error {
-                require.Equal(t, 0, len(actions))
-                return nil
-            }),
-        )
+        // Simulate a 500 errors for the next 3 calls.
+        <-clientWaitFn
+        <-clientWaitFn
+        <-clientWaitFn

-            waitFn()
-        }))
+        // API recover
+        waitFn := ackSeq(
+            client.Answer(func(_ http.Header, body io.Reader) (*http.Response, error) {
+                cr := &request{}
+                content, err := ioutil.ReadAll(body)
+                if err != nil {
+                    t.Fatal(err)
+                }
+                err = json.Unmarshal(content, &cr)
+                if err != nil {
+                    t.Fatal(err)
+                }
+
+                require.Equal(t, 1, len(cr.Events))
+
+                resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`)
+                return resp, nil
+            }),
+
+            dispatcher.Answer(func(actions ...fleetapi.Action) error {
+                require.Equal(t, 0, len(actions))
+                return nil
+            }),
+        )
+
+        waitFn()
+        statusController.AssertExpectations(t)
+        fleetReporter.AssertExpectations(t)
+    })

     t.Run("The retry loop is interruptible",
         withGateway(agentInfo, &fleetGatewaySettings{

internal/pkg/agent/application/gateway/fleet/noop_status_controller.go renamed to internal/pkg/agent/application/gateway/fleet/noop_status_controller_test.go

Lines changed: 9 additions & 5 deletions
@@ -5,21 +5,25 @@
 package fleet

 import (
+    "net/http"
+
     "github.com/elastic/elastic-agent/internal/pkg/core/state"
     "github.com/elastic/elastic-agent/internal/pkg/core/status"
 )

 type noopController struct{}

+func (*noopController) SetAgentID(_ string) {}
 func (*noopController) RegisterComponent(_ string) status.Reporter { return &noopReporter{} }
 func (*noopController) RegisterComponentWithPersistance(_ string, _ bool) status.Reporter {
     return &noopReporter{}
 }
-func (*noopController) RegisterApp(_ string, _ string) status.Reporter { return &noopReporter{} }
-func (*noopController) Status() status.AgentStatus { return status.AgentStatus{Status: status.Healthy} }
-func (*noopController) StatusCode() status.AgentStatusCode { return status.Healthy }
-func (*noopController) UpdateStateID(_ string) {}
-func (*noopController) StatusString() string { return "online" }
+func (*noopController) RegisterApp(_ string, _ string) status.Reporter { return &noopReporter{} }
+func (*noopController) Status() status.AgentStatus { return status.AgentStatus{Status: status.Healthy} }
+func (*noopController) StatusCode() status.AgentStatusCode { return status.Healthy }
+func (*noopController) UpdateStateID(_ string) {}
+func (*noopController) StatusString() string { return "online" }
+func (*noopController) ServeHTTP(_ http.ResponseWriter, _ *http.Request) {}

 type noopReporter struct{}

internal/pkg/agent/cmd/run.go

Lines changed: 4 additions & 2 deletions
@@ -169,6 +169,7 @@ func run(override cfgOverrider) error {
     rex := reexec.NewManager(rexLogger, execPath)

     statusCtrl := status.NewController(logger)
+    statusCtrl.SetAgentID(agentInfo.AgentID())

     tracer, err := initTracer(agentName, release.Version(), cfg.Settings.MonitoringConfig)
     if err != nil {
@@ -199,7 +200,7 @@
     control.SetRouteFn(app.Routes)
     control.SetMonitoringCfg(cfg.Settings.MonitoringConfig)

-    serverStopFn, err := setupMetrics(agentInfo, logger, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, app, tracer)
+    serverStopFn, err := setupMetrics(agentInfo, logger, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, app, tracer, statusCtrl)
     if err != nil {
         return err
     }
@@ -354,6 +355,7 @@
     cfg *monitoringCfg.MonitoringConfig,
     app application.Application,
     tracer *apm.Tracer,
+    statusCtrl status.Controller,
 ) (func() error, error) {
     if err := report.SetupMetrics(logger, agentName, version.GetDefaultVersion()); err != nil {
         return nil, err
@@ -366,7 +368,7 @@
     }

     bufferEnabled := cfg.HTTP.Buffer != nil && cfg.HTTP.Buffer.Enabled
-    s, err := monitoringServer.New(logger, endpointConfig, monitoring.GetNamespace, app.Routes, isProcessStatsEnabled(cfg.HTTP), bufferEnabled, tracer)
+    s, err := monitoringServer.New(logger, endpointConfig, monitoring.GetNamespace, app.Routes, isProcessStatsEnabled(cfg.HTTP), bufferEnabled, tracer, statusCtrl)
     if err != nil {
         return nil, errors.New(err, "could not start the HTTP server for the API")
     }

internal/pkg/core/monitoring/server/server.go

Lines changed: 5 additions & 1 deletion
@@ -33,6 +33,7 @@ func New(
     enableProcessStats bool,
     enableBuffer bool,
     tracer *apm.Tracer,
+    statusController http.Handler,
 ) (*api.Server, error) {
     if err := createAgentMonitoringDrop(endpointConfig.Host); err != nil {
         // log but ignore
@@ -44,7 +45,7 @@
         return nil, err
     }

-    return exposeMetricsEndpoint(log, cfg, ns, routesFetchFn, enableProcessStats, enableBuffer, tracer)
+    return exposeMetricsEndpoint(log, cfg, ns, routesFetchFn, enableProcessStats, enableBuffer, tracer, statusController)
 }

 func exposeMetricsEndpoint(
@@ -55,6 +56,7 @@
     enableProcessStats bool,
     enableBuffer bool,
     tracer *apm.Tracer,
+    statusController http.Handler,
 ) (*api.Server, error) {
     r := mux.NewRouter()
     if tracer != nil {
@@ -63,6 +65,8 @@
     statsHandler := statsHandler(ns("stats"))
     r.Handle("/stats", createHandler(statsHandler))

+    r.Handle("/liveness", statusController)
+
     if enableProcessStats {
         r.HandleFunc("/processes", processesHandler(routesFetchFn))
         r.Handle("/processes/{processID}", createHandler(processHandler(statsHandler)))
Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
+package status
+
+import (
+    "encoding/json"
+    "net/http"
+    "time"
+)
+
+// LivenessResponse is the response body for the liveness endpoint.
+type LivenessResponse struct {
+    ID         string    `json:"id"`
+    Status     string    `json:"status"`
+    Message    string    `json:"message"`
+    UpdateTime time.Time `json:"update_timestamp"`
+}
+
+// ServeHTTP is an HTTP Handler for the status controller.
+// Response code is 200 for a healthy agent, and 503 otherwise.
+// Response body is a JSON object that contains the agent ID, status, message, and the last status update time.
+func (r *controller) ServeHTTP(wr http.ResponseWriter, req *http.Request) {
+    s := r.Status()
+    lr := LivenessResponse{
+        ID:         r.agentID,
+        Status:     s.Status.String(),
+        Message:    s.Message,
+        UpdateTime: s.UpdateTime,
+    }
+    status := http.StatusOK
+    if s.Status != Healthy {
+        status = http.StatusServiceUnavailable
+    }
+
+    wr.Header().Set("Content-Type", "application/json")
+    wr.WriteHeader(status)
+    enc := json.NewEncoder(wr)
+    if err := enc.Encode(lr); err != nil {
+        r.log.Errorf("Unable to encode liveness response: %v", err)
+    }
+}
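
As a usage note, the handler above writes plain JSON even on a 503, so a caller can decode the body to see why the agent is unhealthy. The sketch below mirrors LivenessResponse on the client side; the endpoint address is again a placeholder for the configured monitoring socket.

package main

import (
    "encoding/json"
    "fmt"
    "net/http"
    "time"
)

// livenessResponse mirrors the JSON written by the handler above.
type livenessResponse struct {
    ID         string    `json:"id"`
    Status     string    `json:"status"`
    Message    string    `json:"message"`
    UpdateTime time.Time `json:"update_timestamp"`
}

func main() {
    // Placeholder address; use the agent's configured monitoring endpoint.
    resp, err := http.Get("http://localhost:6791/liveness")
    if err != nil {
        fmt.Println("request failed:", err)
        return
    }
    defer resp.Body.Close()

    var lr livenessResponse
    if err := json.NewDecoder(resp.Body).Decode(&lr); err != nil {
        fmt.Println("decode failed:", err)
        return
    }
    // A 503 still carries a body, so the status text and message explain why.
    fmt.Printf("agent %s is %s (%s) as of %s\n", lr.ID, lr.Status, lr.Message, lr.UpdateTime)
}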
