Skip to content

Commit 9320d12

Browse files
author
Ryan Fitzpatrick
committed
smartagent: remove event clients in favor of log pipelines
1 parent d8d0cba commit 9320d12

File tree

7 files changed

+201
-151
lines changed

7 files changed

+201
-151
lines changed

internal/receiver/smartagentreceiver/README.md

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,30 +19,58 @@ should be used.
1919
[Collector processors](https://github.com/open-telemetry/opentelemetry-collector/blob/master/processor/README.md).
2020
1. Monitors with [dimension property and tag update
2121
functionality](https://dev.splunk.com/observability/docs/datamodel#Creating-or-updating-custom-properties-and-tags)
22-
require an associated `dimensionClients` field that references the name of the SignalFx exporter you are using in your
23-
pipeline.
22+
allow an associated `dimensionClients` field that references the name of the SignalFx exporter you are using in your
23+
pipeline. If you do not specify any exporters via this field, the receiver will attempt to use the associated
24+
pipeline. If the next element of the pipeline isn't compatible with dimension update behavior, if a lone SignalFx
25+
exporter was configured for your deployment, it will be selected. If no dimension update behavior is desired,
26+
you can specify the empty array `[]` to disable.
2427
1. Monitors with [event-sending
25-
functionality](https://dev.splunk.com/observability/docs/datamodel/ingest#Send-custom-events)
26-
require an associated `eventClients` field that references the name of the SignalFx exporter you are using in your
27-
pipeline.
28+
functionality](https://dev.splunk.com/observability/docs/datamodel/ingest#Send-custom-events) should be made members of
29+
a `logs` pipeline that utilizes a [SignalFx
30+
exporter](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/signalfxexporter/README.md)
31+
that will make the event submission requests. It's recommended, and in the case of the Processlist monitor required,
32+
to use a Resource Detection to ensure that host identity and other useful information is made available as event
33+
dimensions.
2834

2935
Example:
3036

3137
```yaml
32-
exporters:
33-
signalfx:
38+
extensions:
39+
smartagent:
3440

3541
receivers:
36-
smartagent/haproxy:
37-
type: haproxy
38-
host: myhaproxyinstance
39-
port: 8080
4042
smartagent/postgresql:
4143
type: postgresql
4244
host: mypostgresinstance
4345
port: 5432
4446
dimensionClients: [signalfx]
45-
eventClients: [signalfx]
47+
smartagent/processlist:
48+
type: processlist
49+
smartagent/kafka:
50+
type: collectd/kafka
51+
host: mykafkabroker
52+
port: 7099
53+
clusterName: mykafkacluster
54+
intervalSeconds: 1
55+
56+
exporters:
57+
signalfx:
58+
59+
processors:
60+
resourcedetection:
61+
detectors: [system]
62+
63+
service:
64+
extensions: [smartagent]
65+
pipelines:
66+
metrics:
67+
receivers: [smartagent/postgresql, smartagent/kafka]
68+
processors: [resourcedetection]
69+
exporters: [signalfx]
70+
logs:
71+
receivers: [smartagent/processlist]
72+
processors: [resourcedetection]
73+
exporters: [signalfx]
4674
```
4775
4876
The full list of settings exposed for this receiver are documented for

internal/receiver/smartagentreceiver/config.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import (
3333
const defaultIntervalSeconds = 10
3434

3535
var errDimensionClientValue = fmt.Errorf("dimensionClients must be an array of compatible exporter names")
36-
var errEventClientValue = fmt.Errorf("eventClients must be an array of compatible exporter names")
3736

3837
type Config struct {
3938
monitorConfig config.MonitorCustomConfig
@@ -42,7 +41,6 @@ type Config struct {
4241
// Will expand to MonitorCustomConfig Host and Port values if unset.
4342
Endpoint string `mapstructure:"endpoint"`
4443
DimensionClients []string `mapstructure:"dimensionclients"`
45-
EventClients []string `mapstructure:"eventclients"`
4644
}
4745

4846
func (rCfg *Config) validate() error {
@@ -87,10 +85,6 @@ func mergeConfigs(componentViperSection *viper.Viper, intoCfg interface{}) error
8785
if err != nil {
8886
return err
8987
}
90-
receiverCfg.EventClients, err = getStringSliceFromAllSettings(allSettings, "eventclients", errEventClientValue)
91-
if err != nil {
92-
return err
93-
}
9488

9589
// monitors.ConfigTemplates is a map that all monitors use to register their custom configs in the Smart Agent.
9690
// The values are always pointers to an actual custom config.

internal/receiver/smartagentreceiver/config_test.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,12 @@ func TestLoadConfig(t *testing.T) {
5454

5555
haproxyCfg := cfg.Receivers["smartagent/haproxy"].(*Config)
5656
expectedDimensionClients := []string{"exampleexporter/one", "exampleexporter/two"}
57-
expectedEventClients := []string{"exampleexporter/two", "exampleexporter/three"}
5857
require.Equal(t, &Config{
5958
ReceiverSettings: configmodels.ReceiverSettings{
6059
TypeVal: typeStr,
6160
NameVal: typeStr + "/haproxy",
6261
},
6362
DimensionClients: expectedDimensionClients,
64-
EventClients: expectedEventClients,
6563
monitorConfig: &haproxy.Config{
6664
MonitorConfig: config.MonitorConfig{
6765
Type: "haproxy",
@@ -84,7 +82,6 @@ func TestLoadConfig(t *testing.T) {
8482
NameVal: typeStr + "/redis",
8583
},
8684
DimensionClients: []string{},
87-
EventClients: []string{},
8885
monitorConfig: &redis.Config{
8986
MonitorConfig: config.MonitorConfig{
9087
Type: "collectd/redis",
@@ -388,18 +385,18 @@ func TestLoadInvalidConfigWithUnsupportedEndpoint(t *testing.T) {
388385
require.Nil(t, cfg)
389386
}
390387

391-
func TestLoadInvalidConfigWithNonArrayEventClients(t *testing.T) {
388+
func TestLoadInvalidConfigWithNonArrayDimensionClients(t *testing.T) {
392389
factories, err := componenttest.ExampleComponents()
393390
assert.Nil(t, err)
394391

395392
factory := NewFactory()
396393
factories.Receivers[configmodels.Type(typeStr)] = factory
397394
cfg, err := configtest.LoadConfigFile(
398-
t, path.Join(".", "testdata", "invalid_nonarray_event_clients.yaml"), factories,
395+
t, path.Join(".", "testdata", "invalid_nonarray_dimension_clients.yaml"), factories,
399396
)
400397
require.Error(t, err)
401398
require.EqualError(t, err,
402-
"error reading receivers configuration for smartagent/haproxy: eventClients must be an array of compatible exporter names")
399+
"error reading receivers configuration for smartagent/haproxy: dimensionClients must be an array of compatible exporter names")
403400
require.Nil(t, cfg)
404401
}
405402

internal/receiver/smartagentreceiver/output.go

Lines changed: 75 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ package smartagentreceiver
1616

1717
import (
1818
"context"
19-
"fmt"
2019
"time"
2120

2221
metadata "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
@@ -36,31 +35,33 @@ import (
3635
const internalTransport = "internal"
3736

3837
// Output is an implementation of a Smart Agent FilteringOutput that receives datapoints, events, and dimension updates
39-
// from a configured monitor. It will forward all datapoints to the nextConsumer, all dimension updates to the
40-
// nextDimensionClients, and all events to the nextEventClients as determined by the associated
41-
// items in Config.MetadataClients.
38+
// from a configured monitor. It will forward all datapoints to the nextMetricsConsumer, all dimension updates to the
39+
// nextDimensionClients as determined by the associated items in Config.MetadataClients, and all events to the
40+
// nextLogsConsumer.
4241
type Output struct {
43-
nextConsumer consumer.MetricsConsumer
42+
nextMetricsConsumer consumer.MetricsConsumer
43+
nextLogsConsumer consumer.LogsConsumer
4444
extraDimensions map[string]string
4545
logger *zap.Logger
4646
converter Converter
4747
monitorFiltering *monitorFiltering
4848
receiverName string
4949
nextDimensionClients []*metadata.MetadataExporter
50-
nextEventClients []*consumer.LogsConsumer
5150
}
5251

5352
var _ types.Output = (*Output)(nil)
5453
var _ types.FilteringOutput = (*Output)(nil)
5554

56-
func NewOutput(config Config, filtering *monitorFiltering, nextConsumer consumer.MetricsConsumer, host component.Host, logger *zap.Logger) *Output {
57-
metadataExporters := getMetadataExporters(config, host, &nextConsumer, logger)
58-
logConsumers := getLogsConsumers(config, host, &nextConsumer, logger)
55+
func NewOutput(
56+
config Config, filtering *monitorFiltering, nextMetricsConsumer consumer.MetricsConsumer,
57+
nextLogsConsumer consumer.LogsConsumer, host component.Host, logger *zap.Logger,
58+
) *Output {
59+
metadataExporters := getMetadataExporters(config, host, &nextMetricsConsumer, logger)
5960
return &Output{
6061
receiverName: config.Name(),
61-
nextConsumer: nextConsumer,
62+
nextMetricsConsumer: nextMetricsConsumer,
63+
nextLogsConsumer: nextLogsConsumer,
6264
nextDimensionClients: metadataExporters,
63-
nextEventClients: logConsumers,
6465
logger: logger,
6566
converter: Converter{logger: logger},
6667
extraDimensions: map[string]string{},
@@ -71,11 +72,11 @@ func NewOutput(config Config, filtering *monitorFiltering, nextConsumer consumer
7172
// getMetadataExporters walks through obtained Config.MetadataClients and returns all matching registered MetadataExporters,
7273
// if any. At this time the SignalFx exporter is the only supported use case and adopter of this type.
7374
func getMetadataExporters(
74-
config Config, host component.Host, nextConsumer *consumer.MetricsConsumer, logger *zap.Logger,
75+
config Config, host component.Host, nextMetricsConsumer *consumer.MetricsConsumer, logger *zap.Logger,
7576
) []*metadata.MetadataExporter {
7677
var exporters []*metadata.MetadataExporter
7778

78-
metadataExporters := getClientsFromMetricsExporters(config.DimensionClients, host, nextConsumer, "dimensionClients", logger)
79+
metadataExporters, noClientsSpecified := getDimensionClientsFromMetricsExporters(config.DimensionClients, host, nextMetricsConsumer, logger)
7980
for _, client := range metadataExporters {
8081
if metadataExporter, ok := (*client).(metadata.MetadataExporter); ok {
8182
exporters = append(exporters, &metadataExporter)
@@ -84,68 +85,75 @@ func getMetadataExporters(
8485
}
8586
}
8687

88+
if len(exporters) == 0 && noClientsSpecified {
89+
sfxExporter := getLoneSFxExporter(host, configmodels.MetricsDataType)
90+
if sfxExporter != nil {
91+
if sfx, ok := sfxExporter.(metadata.MetadataExporter); ok {
92+
exporters = append(exporters, &sfx)
93+
}
94+
}
95+
}
96+
8797
if len(exporters) == 0 {
8898
logger.Debug("no dimension updates are possible as no valid dimensionClients have been provided and next pipeline component isn't a MetadataExporter")
8999
}
90100

91101
return exporters
92102
}
93103

94-
// getLogsConsumers walks through obtained Config.EventClients and returns all matching registered LogsConsumers,
95-
// if any. At this time the SignalFx exporter is the only real target use case, but it's unexported and
96-
// as implemented all specified combination MetricsExporters and LogsConsumers will be returned.
97-
func getLogsConsumers(
98-
config Config, host component.Host, nextConsumer *consumer.MetricsConsumer, logger *zap.Logger,
99-
) []*consumer.LogsConsumer {
100-
var consumers []*consumer.LogsConsumer
101-
102-
eventClients := getClientsFromMetricsExporters(config.EventClients, host, nextConsumer, "eventClients", logger)
103-
for _, client := range eventClients {
104-
if logsExporter, ok := (*client).(consumer.LogsConsumer); ok {
105-
consumers = append(consumers, &logsExporter)
106-
} else {
107-
logger.Info("cannot send events to event client", zap.Any("client", *client))
104+
// getDimensionClientsFromMetricsExporters will walk through all provided config.DimensionClients and retrieve matching registered
105+
// MetricsExporters, the only truly supported component type.
106+
// If config.MetadataClients is nil, it will return a slice with nextMetricsConsumer if it's a MetricsExporter.
107+
func getDimensionClientsFromMetricsExporters(
108+
specifiedClients []string, host component.Host, nextMetricsConsumer *consumer.MetricsConsumer, logger *zap.Logger,
109+
) (clients []*metadata.MetadataExporter, wasNil bool) {
110+
if specifiedClients == nil {
111+
wasNil = true
112+
// default to nextMetricsConsumer if no clients have been provided
113+
if asMetadataExporter, ok := (*nextMetricsConsumer).(metadata.MetadataExporter); ok {
114+
clients = append(clients, &asMetadataExporter)
108115
}
116+
return
109117
}
110118

111-
if len(consumers) == 0 {
112-
logger.Debug("no SFx events are possible as no valid eventClients have been provided and next pipeline component isn't a LogsConsumer")
119+
if builtExporters, ok := host.GetExporters()[configmodels.MetricsDataType]; ok {
120+
for _, client := range specifiedClients {
121+
var found bool
122+
for exporterConfig, exporter := range builtExporters {
123+
if exporterConfig.Name() == client {
124+
if asMetadataExporter, ok := exporter.(metadata.MetadataExporter); ok {
125+
clients = append(clients, &asMetadataExporter)
126+
}
127+
found = true
128+
}
129+
}
130+
if !found {
131+
logger.Info(
132+
"specified dimension client is not an available exporter",
133+
zap.String("client", client),
134+
)
135+
}
136+
}
113137
}
114-
115-
return consumers
138+
return
116139
}
117140

118-
// getClientsFromMetricsExporters will walk through all provided config.DimensionClients and retrieve matching registered
119-
// MetricsExporters, the only truly supported component type.
120-
// If config.MetadataClients is nil, it will return a slice with nextConsumer if it's a MetricsExporter.
121-
func getClientsFromMetricsExporters(
122-
specifiedClients []string, host component.Host, nextConsumer *consumer.MetricsConsumer, fieldName string, logger *zap.Logger,
123-
) (clients []*interface{}) {
124-
if specifiedClients == nil {
125-
// default to nextConsumer if no clients have been provided
126-
asInterface := (*nextConsumer).(interface{})
127-
clients = append(clients, &asInterface)
128-
return
129-
}
130-
131-
builtExporters := host.GetExporters()[configmodels.MetricsDataType]
132-
for _, client := range specifiedClients {
133-
var found bool
141+
func getLoneSFxExporter(host component.Host, exporterType configmodels.DataType) component.Exporter {
142+
var sfxExporter component.Exporter
143+
if builtExporters, ok := host.GetExporters()[exporterType]; ok {
134144
for exporterConfig, exporter := range builtExporters {
135-
if exporterConfig.Name() == client {
136-
asInterface := exporter.(interface{})
137-
clients = append(clients, &asInterface)
138-
found = true
145+
if exporterConfig.Type() == "signalfx" {
146+
if sfxExporter == nil {
147+
sfxExporter = exporter
148+
} else { // we've already found one so no lone instance to use as default
149+
return nil
150+
}
151+
139152
}
140153
}
141-
if !found {
142-
logger.Info(
143-
fmt.Sprintf("specified %s is not an available exporter", fieldName),
144-
zap.String("client", client),
145-
)
146-
}
147154
}
148-
return clients
155+
return sfxExporter
156+
149157
}
150158

151159
func (output *Output) AddDatapointExclusionFilter(filter dpfilters.DatapointFilter) {
@@ -177,6 +185,10 @@ func (output *Output) Copy() types.Output {
177185
}
178186

179187
func (output *Output) SendDatapoints(datapoints ...*datapoint.Datapoint) {
188+
if output.nextMetricsConsumer == nil {
189+
return
190+
}
191+
180192
ctx := obsreport.ReceiverContext(context.Background(), output.receiverName, internalTransport)
181193
ctx = obsreport.StartMetricsReceiveOp(ctx, typeStr, internalTransport)
182194

@@ -192,21 +204,19 @@ func (output *Output) SendDatapoints(datapoints ...*datapoint.Datapoint) {
192204
}
193205

194206
_, numPoints := metrics.MetricAndDataPointCount()
195-
err := output.nextConsumer.ConsumeMetrics(context.Background(), metrics)
207+
err := output.nextMetricsConsumer.ConsumeMetrics(context.Background(), metrics)
196208
obsreport.EndMetricsReceiveOp(ctx, typeStr, numPoints, err)
197209
}
198210

199211
func (output *Output) SendEvent(event *event.Event) {
200-
if len(output.nextEventClients) == 0 {
212+
if output.nextLogsConsumer == nil {
201213
return
202214
}
203215

204216
logRecord := eventToLog(event, output.logger)
205-
for _, logsConsumer := range output.nextEventClients {
206-
err := (*logsConsumer).ConsumeLogs(context.Background(), logRecord)
207-
if err != nil {
208-
output.logger.Debug("SendEvent has failed", zap.Error(err))
209-
}
217+
err := output.nextLogsConsumer.ConsumeLogs(context.Background(), logRecord)
218+
if err != nil {
219+
output.logger.Debug("SendEvent has failed", zap.Error(err))
210220
}
211221
}
212222

0 commit comments

Comments
 (0)