Skip to content

Commit 19fab39

Browse files
add discovery converter to append discovered components to service config (#2986)
1 parent c784484 commit 19fab39

File tree

10 files changed

+417
-30
lines changed

10 files changed

+417
-30
lines changed

internal/common/discovery/discovery.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ const (
2727
ReceiverNameAttr = "discovery.receiver.name"
2828
ReceiverTypeAttr = "discovery.receiver.type"
2929
StatusAttr = "discovery.status"
30+
31+
DiscoExtensionsKey = "extensions/splunk.discovery"
32+
DiscoReceiversKey = "receivers/splunk.discovery"
3033
)
3134

3235
var NoType = component.NewID("")

internal/configconverter/discovery.go

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package configconverter
16+
17+
import (
18+
"context"
19+
"fmt"
20+
21+
"go.opentelemetry.io/collector/confmap"
22+
23+
"github.com/signalfx/splunk-otel-collector/internal/common/discovery"
24+
)
25+
26+
type Discovery struct{}
27+
28+
// Convert will find `service::<extensions|receivers>/splunk.discovery` entries
29+
// provided by the discovery confmap.Provider and relocate them to
30+
// `service::extensions` and `service::pipelines::metrics::receivers`,
31+
// by appending them to existing sequences, if any.
32+
func (Discovery) Convert(_ context.Context, in *confmap.Conf) error {
33+
if in == nil {
34+
return nil
35+
}
36+
37+
out := in.ToStringMap()
38+
service, serviceExtensions, err := getServiceExtensions(out)
39+
if err != nil {
40+
return err
41+
}
42+
out["service"] = service
43+
44+
discoExtensionsIsSet, discoExtensions, err := getDiscoExtensions(service)
45+
if err != nil {
46+
return err
47+
}
48+
49+
discoReceiversIsSet, discoReceivers, err := getDiscoReceivers(service)
50+
if err != nil {
51+
return err
52+
}
53+
54+
// do nothing if discovery provider didn't modify config
55+
if !discoExtensionsIsSet && !discoReceiversIsSet {
56+
return nil
57+
}
58+
59+
if len(discoExtensions) > 0 {
60+
service["extensions"] = appendUnique(serviceExtensions, discoExtensions)
61+
}
62+
63+
metricsPipeline, metricsReceivers, err := getMetricsPipelineAndReceivers(service)
64+
if err != nil {
65+
return err
66+
}
67+
68+
if len(discoReceivers) > 0 {
69+
metricsPipeline["receivers"] = appendUnique(metricsReceivers, discoReceivers)
70+
}
71+
72+
*in = *confmap.NewFromStringMap(out)
73+
return nil
74+
}
75+
76+
func getServiceExtensions(out map[string]any) (map[string]any, []any, error) {
77+
service := map[string]any{}
78+
var serviceExtensions []any
79+
if s, hasService := out["service"]; hasService && s != nil {
80+
service = s.(map[string]any)
81+
if ses, hasExtensions := service["extensions"]; hasExtensions && ses != nil {
82+
var err error
83+
if serviceExtensions, err = toAnySlice(ses); err != nil {
84+
return nil, nil, fmt.Errorf("cannot determine service extensions: %w", err)
85+
}
86+
}
87+
}
88+
return service, serviceExtensions, nil
89+
}
90+
91+
func getDiscoExtensions(service map[string]any) (bool, []any, error) {
92+
var isSet bool
93+
var extensions []any
94+
if des, hasDiscoExtensions := service[discovery.DiscoExtensionsKey]; hasDiscoExtensions {
95+
isSet = true
96+
delete(service, discovery.DiscoExtensionsKey)
97+
var err error
98+
if extensions, err = toAnySlice(des); err != nil {
99+
return false, nil, fmt.Errorf("cannot determine discovery extensions: %w", err)
100+
}
101+
}
102+
return isSet, extensions, nil
103+
}
104+
105+
func getDiscoReceivers(service map[string]any) (bool, []any, error) {
106+
var isSet bool
107+
var receivers []any
108+
if des, hasDiscoReceivers := service[discovery.DiscoReceiversKey]; hasDiscoReceivers {
109+
isSet = true
110+
delete(service, discovery.DiscoReceiversKey)
111+
var err error
112+
if receivers, err = toAnySlice(des); err != nil {
113+
return false, nil, fmt.Errorf("cannot determine discovery receivers: %w", err)
114+
}
115+
}
116+
return isSet, receivers, nil
117+
}
118+
119+
func getMetricsPipelineAndReceivers(service map[string]any) (map[string]any, []any, error) {
120+
pipelines := map[string]any{}
121+
if pl, ok := service["pipelines"]; ok && pl != nil {
122+
pipelines = pl.(map[string]any)
123+
}
124+
service["pipelines"] = pipelines
125+
126+
metricsPipeline := map[string]any{}
127+
if mp, ok := pipelines["metrics"]; ok && mp != nil {
128+
metricsPipeline = mp.(map[string]any)
129+
}
130+
pipelines["metrics"] = metricsPipeline
131+
132+
var metricsReceivers []any
133+
if mr, ok := metricsPipeline["receivers"]; ok && mr != nil {
134+
var err error
135+
if metricsReceivers, err = toAnySlice(mr); err != nil {
136+
return nil, nil, fmt.Errorf("cannot determine metrics pipeline receivers: %w", err)
137+
}
138+
}
139+
return metricsPipeline, metricsReceivers, nil
140+
}
141+
142+
func appendUnique(serviceComponents []any, discoComponents []any) []any {
143+
existing := map[any]struct{}{}
144+
for _, e := range serviceComponents {
145+
existing[e] = struct{}{}
146+
}
147+
for _, e := range discoComponents {
148+
if _, exists := existing[e]; !exists {
149+
serviceComponents = append(serviceComponents, e)
150+
}
151+
}
152+
return serviceComponents
153+
}
154+
155+
func toAnySlice(s any) ([]any, error) {
156+
var out []any
157+
switch v := s.(type) {
158+
case []any:
159+
out = v
160+
case []string:
161+
for _, i := range v {
162+
out = append(out, i)
163+
}
164+
default:
165+
return nil, fmt.Errorf("unexpected form %T", s)
166+
}
167+
return out, nil
168+
}
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package configconverter
16+
17+
import (
18+
"context"
19+
"testing"
20+
21+
"github.com/stretchr/testify/require"
22+
"go.opentelemetry.io/collector/confmap"
23+
"gopkg.in/yaml.v3"
24+
)
25+
26+
func TestDiscovery(t *testing.T) {
27+
in := confFromYaml(t, `service:
28+
extensions: [ext/one, ext/two, ext/three, ext/four]
29+
extensions/splunk.discovery: [ext/four, ext/five]
30+
pipelines:
31+
metrics:
32+
receivers: [recv/one, recv/two, recv/three, recv/four]
33+
processors: [proc/one, proc/two, proc/three]
34+
exporters: [exp/one, exp/two, exp/three]
35+
metrics/untouched:
36+
receivers: [recv/six, recv/seven, recv/eight]
37+
processors: [proc/six, proc/seven, proc/eight]
38+
exporters: [exp/six, exp/seven, exp/eight]
39+
receivers/splunk.discovery: [recv/four, recv/five]
40+
`)
41+
42+
expected := confFromYaml(t, `service:
43+
extensions: [ext/one, ext/two, ext/three, ext/four, ext/five]
44+
pipelines:
45+
metrics:
46+
receivers: [recv/one, recv/two, recv/three, recv/four, recv/five]
47+
processors: [proc/one, proc/two, proc/three]
48+
exporters: [exp/one, exp/two, exp/three]
49+
metrics/untouched:
50+
receivers: [recv/six, recv/seven, recv/eight]
51+
processors: [proc/six, proc/seven, proc/eight]
52+
exporters: [exp/six, exp/seven, exp/eight]
53+
`)
54+
55+
require.NoError(t, Discovery{}.Convert(context.Background(), in))
56+
require.Equal(t, expected.ToStringMap(), in.ToStringMap())
57+
}
58+
59+
func TestDiscoveryNotDetected(t *testing.T) {
60+
in := confFromYaml(t, `service:
61+
extensions: [ext/one, ext/two, ext/three]
62+
pipelines:
63+
metrics:
64+
receivers: [recv/one, recv/two, recv/three]
65+
processors: [proc/one, proc/two, proc/three]
66+
exporters: [exp/one, exp/two, exp/three]
67+
metrics/untouched:
68+
receivers: [recv/six, recv/seven, recv/eight]
69+
processors: [proc/six, proc/seven, proc/eight]
70+
exporters: [exp/six, exp/seven, exp/eight]
71+
`)
72+
73+
expected := confFromYaml(t, `service:
74+
extensions: [ext/one, ext/two, ext/three]
75+
pipelines:
76+
metrics:
77+
receivers: [recv/one, recv/two, recv/three]
78+
processors: [proc/one, proc/two, proc/three]
79+
exporters: [exp/one, exp/two, exp/three]
80+
metrics/untouched:
81+
receivers: [recv/six, recv/seven, recv/eight]
82+
processors: [proc/six, proc/seven, proc/eight]
83+
exporters: [exp/six, exp/seven, exp/eight]
84+
`)
85+
86+
require.NoError(t, Discovery{}.Convert(context.Background(), in))
87+
require.Equal(t, expected.ToStringMap(), in.ToStringMap())
88+
}
89+
90+
func TestDiscoveryExtensionsOnly(t *testing.T) {
91+
in := confFromYaml(t, `service:
92+
extensions/splunk.discovery: [ext/one, ext/two]
93+
pipelines:
94+
metrics:
95+
receivers: [recv/one, recv/two, recv/three]
96+
processors: [proc/one, proc/two, proc/three]
97+
exporters: [exp/one, exp/two, exp/three]
98+
metrics/untouched:
99+
receivers: [recv/six, recv/seven, recv/eight]
100+
processors: [proc/six, proc/seven, proc/eight]
101+
exporters: [exp/six, exp/seven, exp/eight]
102+
`)
103+
104+
expected := confFromYaml(t, `service:
105+
extensions: [ext/one, ext/two]
106+
pipelines:
107+
metrics:
108+
receivers: [recv/one, recv/two, recv/three]
109+
processors: [proc/one, proc/two, proc/three]
110+
exporters: [exp/one, exp/two, exp/three]
111+
metrics/untouched:
112+
receivers: [recv/six, recv/seven, recv/eight]
113+
processors: [proc/six, proc/seven, proc/eight]
114+
exporters: [exp/six, exp/seven, exp/eight]
115+
`)
116+
117+
require.NoError(t, Discovery{}.Convert(context.Background(), in))
118+
require.Equal(t, expected.ToStringMap(), in.ToStringMap())
119+
}
120+
121+
func TestDiscoveryEmptyExtensions(t *testing.T) {
122+
in := confFromYaml(t, `service:
123+
extensions/splunk.discovery: []
124+
pipelines:
125+
metrics:
126+
receivers: [recv/one, recv/two, recv/three]
127+
processors: [proc/one, proc/two, proc/three]
128+
exporters: [exp/one, exp/two, exp/three]
129+
metrics/untouched:
130+
receivers: [recv/six, recv/seven, recv/eight]
131+
processors: [proc/six, proc/seven, proc/eight]
132+
exporters: [exp/six, exp/seven, exp/eight]
133+
receivers/splunk.discovery: [recv/four, recv/five]
134+
`)
135+
136+
expected := confFromYaml(t, `service:
137+
pipelines:
138+
metrics:
139+
receivers: [recv/one, recv/two, recv/three, recv/four, recv/five]
140+
processors: [proc/one, proc/two, proc/three]
141+
exporters: [exp/one, exp/two, exp/three]
142+
metrics/untouched:
143+
receivers: [recv/six, recv/seven, recv/eight]
144+
processors: [proc/six, proc/seven, proc/eight]
145+
exporters: [exp/six, exp/seven, exp/eight]
146+
`)
147+
148+
require.NoError(t, Discovery{}.Convert(context.Background(), in))
149+
require.Equal(t, expected.ToStringMap(), in.ToStringMap())
150+
}
151+
152+
func TestDiscoveryReceiversOnly(t *testing.T) {
153+
in := confFromYaml(t, `service:
154+
pipelines:
155+
metrics:
156+
receivers: []
157+
processors: [proc/one, proc/two, proc/three]
158+
exporters: [exp/one, exp/two, exp/three]
159+
metrics/untouched:
160+
receivers: [recv/six, recv/seven, recv/eight]
161+
processors: [proc/six, proc/seven, proc/eight]
162+
exporters: [exp/six, exp/seven, exp/eight]
163+
receivers/splunk.discovery: [recv/one, recv/two]
164+
`)
165+
166+
expected := confFromYaml(t, `service:
167+
pipelines:
168+
metrics:
169+
receivers: [recv/one, recv/two]
170+
processors: [proc/one, proc/two, proc/three]
171+
exporters: [exp/one, exp/two, exp/three]
172+
metrics/untouched:
173+
receivers: [recv/six, recv/seven, recv/eight]
174+
processors: [proc/six, proc/seven, proc/eight]
175+
exporters: [exp/six, exp/seven, exp/eight]
176+
`)
177+
178+
require.NoError(t, Discovery{}.Convert(context.Background(), in))
179+
require.Equal(t, expected.ToStringMap(), in.ToStringMap())
180+
}
181+
182+
func TestDiscoveryEmptyReceivers(t *testing.T) {
183+
in := confFromYaml(t, `service:
184+
pipelines:
185+
metrics:
186+
processors: [proc/one, proc/two, proc/three]
187+
exporters: [exp/one, exp/two, exp/three]
188+
metrics/untouched:
189+
receivers: [recv/six, recv/seven, recv/eight]
190+
processors: [proc/six, proc/seven, proc/eight]
191+
exporters: [exp/six, exp/seven, exp/eight]
192+
receivers/splunk.discovery: []
193+
`)
194+
195+
expected := confFromYaml(t, `service:
196+
pipelines:
197+
metrics:
198+
processors: [proc/one, proc/two, proc/three]
199+
exporters: [exp/one, exp/two, exp/three]
200+
metrics/untouched:
201+
receivers: [recv/six, recv/seven, recv/eight]
202+
processors: [proc/six, proc/seven, proc/eight]
203+
exporters: [exp/six, exp/seven, exp/eight]
204+
`)
205+
206+
require.NoError(t, Discovery{}.Convert(context.Background(), in))
207+
require.Equal(t, expected.ToStringMap(), in.ToStringMap())
208+
}
209+
210+
func confFromYaml(t testing.TB, content string) *confmap.Conf {
211+
var conf map[string]any
212+
if err := yaml.Unmarshal([]byte(content), &conf); err != nil {
213+
t.Errorf("failed loading conf from yaml: %v", err)
214+
}
215+
return confmap.NewFromStringMap(conf)
216+
}

0 commit comments

Comments
 (0)