Skip to content
Merged
3 changes: 2 additions & 1 deletion internal/pkg/agent/operation/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ func (o *Operator) getMonitoringMetricbeatConfig(outputType string, output inter
if len(hosts) == 0 {
return nil, false
}
//nolint:prealloc // false positive
var modules []interface{}
fixedAgentName := strings.ReplaceAll(agentName, "-", "_")

Expand Down Expand Up @@ -668,7 +669,7 @@ func normalizeHTTPCopyRules(name string) []map[string]interface{} {
return fromToMap
}

for _, exportedMetric := range spec.ExprtedMetrics {
for _, exportedMetric := range spec.ExportedMetrics {
fromToMap = append(fromToMap, map[string]interface{}{
"from": fmt.Sprintf("http.agent.%s", exportedMetric),
"to": exportedMetric,
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/operation/monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
func TestExportedMetrics(t *testing.T) {
programName := "testing"
expectedMetricsName := "metric_name"
program.SupportedMap[programName] = program.Spec{ExprtedMetrics: []string{expectedMetricsName}}
program.SupportedMap[programName] = program.Spec{ExportedMetrics: []string{expectedMetricsName}}

exportedMetrics := normalizeHTTPCopyRules(programName)

Expand Down
16 changes: 15 additions & 1 deletion internal/pkg/agent/program/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,25 @@ type Spec struct {
When string `yaml:"when"`
Constraints string `yaml:"constraints"`
RestartOnOutputChange bool `yaml:"restart_on_output_change,omitempty"`
ExprtedMetrics []string `yaml:"exported_metrics,omitempty"`
ExportedMetrics []string `yaml:"exported_metrics,omitempty"`
Process *ProcessSettings `yaml:"process,omitempty"`
}

// ProcessSettings process specific settings
type ProcessSettings struct {
// Allows to override the agent stop timeout settings and specify a different stop timeout for Endpoint service
StopTimeoutSecs int `yaml:"stop_timeout"`
}

// Service info
type ServiceInfo struct {
Name string `yaml:"name"`
Label string `yaml:"label"`
}

// ReadSpecs reads all the specs that match the provided globbing path.
func ReadSpecs(path string) ([]Spec, error) {
//nolint:prealloc // do not lint
var specs []Spec
files, err := filepath.Glob(path)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/program/supported.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 36 additions & 14 deletions internal/pkg/core/plugin/service/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ import (
"github.com/elastic/elastic-agent/pkg/core/server"
)

const (
darwin = "darwin"
windows = "windows"
unknown = "Unknown"
running = "Running"
)

var (
// ErrAppNotInstalled is returned when configuration is performed on not installed application.
ErrAppNotInstalled = errors.New("application is not installed", errors.TypeApplication)
Expand Down Expand Up @@ -165,8 +172,8 @@ func (a *Application) Start(ctx context.Context, _ app.Taggable, cfg map[string]
// already started
if a.srvState != nil {
a.setState(state.Starting, "Starting", nil)
a.srvState.SetStatus(proto.StateObserved_STARTING, a.state.Message, a.state.Payload)
a.srvState.UpdateConfig(a.srvState.Config())
_ = a.srvState.SetStatus(proto.StateObserved_STARTING, a.state.Message, a.state.Payload)
_ = a.srvState.UpdateConfig(a.srvState.Config())
} else {
a.setState(state.Starting, "Starting", nil)
a.srvState, err = a.srv.Register(a, string(cfgStr))
Expand Down Expand Up @@ -247,6 +254,14 @@ func (a *Application) Configure(ctx context.Context, config map[string]interface
return err
}

func (a *Application) getStopTimeout() time.Duration {
if a.desc.Spec().Process != nil && a.desc.Spec().Process.StopTimeoutSecs > 0 {
to := time.Duration(a.desc.Spec().Process.StopTimeoutSecs) * time.Second
return to
}
return a.processConfig.StopTimeout
}

// Stop stops the current application.
func (a *Application) Stop() {
a.appLock.Lock()
Expand All @@ -257,21 +272,28 @@ func (a *Application) Stop() {
return
}

if err := srvState.Stop(a.processConfig.StopTimeout); err != nil {
a.appLock.Lock()
a.setState(
state.Failed,
fmt.Errorf("failed to stop after %s: %w", a.processConfig.StopTimeout, err).Error(),
nil)
} else {
a.appLock.Lock()
a.setState(state.Stopped, "Stopped", nil)
name := a.desc.Spec().Name
to := a.getStopTimeout()

a.logger.Infof("Stop %v service, with %v timeout", name, to)

// Try to stop the service with 3 minutes (default) timeout
if err := srvState.Stop(to); err != nil {
// Log the error
a.logger.Errorf("Failed to stop %v service after %v timeout", name, to)
}
a.srvState = nil

// Cleanup
a.appLock.Lock()
defer a.appLock.Unlock()

a.srvState = nil
a.cleanUp()
a.stopCredsListener()
a.appLock.Unlock()

// Set the service state to "stopped", otherwise the agent is stuck in the failed stop state until restarted
a.logger.Infof("setting %s service status to Stopped", name)
a.setState(state.Stopped, "Stopped", nil)
}

// Shutdown disconnects the service, but doesn't signal it to stop.
Expand Down Expand Up @@ -327,7 +349,7 @@ func (a *Application) setState(s state.Status, msg string, payload map[string]in
}

func (a *Application) cleanUp() {
a.monitor.Cleanup(a.desc.Spec(), a.pipelineID)
_ = a.monitor.Cleanup(a.desc.Spec(), a.pipelineID)
}

func (a *Application) startCredsListener() error {
Expand Down
Loading