Skip to content

Commit 4f8b075

Browse files
a-thaleratoulme
andauthored
[receiver/receivercreator] Add support for k8s service discovery (#29022)
**Description:** - Added a new watch to the k8s_observer extension for k8s services, which can be enabled using a new flag "observe_services". - Discovered entities are transformed into a new endpoint type `k8s.service`. - Adjusted the receivercreator to support the new type `k8s.service` **Link to tracking Issue:** [#29021](#29021) **Testing:** Added unit tests analogue to the available tests **Documentation:** Adjusted readme's of k8s_observer and receivercreator. Added description of new flags and typers. **Note:** Current implementation is working as described in the linked ticket. Please check the potential discussion points mentioned in the ticket: #29021 (comment) --------- Co-authored-by: Antoine Toulme <[email protected]>
1 parent 01559fb commit 4f8b075

22 files changed

+482
-32
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: receivercreator
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Added support for discovery of endpoints based on K8s services
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [29022]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: By discovering endpoints based on K8s services, a dynamic probing of K8s service leveraging for example the httpcheckreceiver get enabled
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

extension/observer/endpoints.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ const (
2323
PortType EndpointType = "port"
2424
// PodType is a pod endpoint.
2525
PodType EndpointType = "pod"
26+
// K8sServiceType is a service endpoint.
27+
K8sServiceType EndpointType = "k8s.service"
2628
// K8sNodeType is a Kubernetes Node endpoint.
2729
K8sNodeType EndpointType = "k8s.node"
2830
// HostPortType is a hostport endpoint.
@@ -34,6 +36,7 @@ const (
3436
var (
3537
_ EndpointDetails = (*Pod)(nil)
3638
_ EndpointDetails = (*Port)(nil)
39+
_ EndpointDetails = (*K8sService)(nil)
3740
_ EndpointDetails = (*K8sNode)(nil)
3841
_ EndpointDetails = (*HostPort)(nil)
3942
_ EndpointDetails = (*Container)(nil)
@@ -92,6 +95,40 @@ func (e Endpoint) equals(other Endpoint) bool {
9295
}
9396
}
9497

98+
// K8sService is a discovered k8s service.
99+
type K8sService struct {
100+
// Name of the service.
101+
Name string
102+
// UID is the unique ID in the cluster for the service.
103+
UID string
104+
// Labels is a map of user-specified metadata.
105+
Labels map[string]string
106+
// Annotations is a map of user-specified metadata.
107+
Annotations map[string]string
108+
// Namespace must be unique for services with same name.
109+
Namespace string
110+
// ClusterIP is the IP under which the service is reachable within the cluster.
111+
ClusterIP string
112+
// ServiceType is the type of the service: ClusterIP, NodePort, LoadBalancer, ExternalName
113+
ServiceType string
114+
}
115+
116+
func (s *K8sService) Env() EndpointEnv {
117+
return map[string]any{
118+
"uid": s.UID,
119+
"name": s.Name,
120+
"labels": s.Labels,
121+
"annotations": s.Annotations,
122+
"namespace": s.Namespace,
123+
"cluster_ip": s.ClusterIP,
124+
"service_type": s.ServiceType,
125+
}
126+
}
127+
128+
func (s *K8sService) Type() EndpointType {
129+
return K8sServiceType
130+
}
131+
95132
// Pod is a discovered k8s pod.
96133
type Pod struct {
97134
// Name of the pod.

extension/observer/endpoints_test.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func TestEndpointEnv(t *testing.T) {
4949
},
5050
},
5151
{
52-
name: "K8s port",
52+
name: "K8s pod port",
5353
endpoint: Endpoint{
5454
ID: EndpointID("port_id"),
5555
Target: "192.68.73.2",
@@ -90,6 +90,42 @@ func TestEndpointEnv(t *testing.T) {
9090
"transport": ProtocolTCP,
9191
},
9292
},
93+
{
94+
name: "Service",
95+
endpoint: Endpoint{
96+
ID: EndpointID("service_id"),
97+
Target: "service.namespace",
98+
Details: &K8sService{
99+
Name: "service_name",
100+
UID: "service-uid",
101+
Labels: map[string]string{
102+
"label_key": "label_val",
103+
},
104+
Annotations: map[string]string{
105+
"annotation_1": "value_1",
106+
},
107+
Namespace: "service-namespace",
108+
ServiceType: "LoadBalancer",
109+
ClusterIP: "192.68.73.2",
110+
},
111+
},
112+
want: EndpointEnv{
113+
"type": "k8s.service",
114+
"endpoint": "service.namespace",
115+
"id": "service_id",
116+
"name": "service_name",
117+
"labels": map[string]string{
118+
"label_key": "label_val",
119+
},
120+
"annotations": map[string]string{
121+
"annotation_1": "value_1",
122+
},
123+
"uid": "service-uid",
124+
"namespace": "service-namespace",
125+
"cluster_ip": "192.68.73.2",
126+
"service_type": "LoadBalancer",
127+
},
128+
},
93129
{
94130
name: "Host port",
95131
endpoint: Endpoint{

extension/observer/k8sobserver/README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
<!-- end autogenerated section -->
1616

1717
The `k8s_observer` is a [Receiver Creator](../../../receiver/receivercreator/README.md)-compatible "watch observer" that will detect and report
18-
Kubernetes pod, port, and node endpoints via the Kubernetes API.
18+
Kubernetes pod, port, service and node endpoints via the Kubernetes API.
1919

2020
## Example Config
2121

@@ -26,6 +26,7 @@ extensions:
2626
node: ${env:K8S_NODE_NAME}
2727
observe_pods: true
2828
observe_nodes: true
29+
observe_services: true
2930

3031
receivers:
3132
receiver_creator:
@@ -71,3 +72,4 @@ All fields are optional.
7172
| node | string | <no value> | The node name to limit the discovery of pod, port, and node endpoints. Providing no value (the default) results in discovering endpoints for all available nodes. |
7273
| observe_pods | bool | `true` | Whether to report observer pod and port endpoints. If `true` and `node` is specified it will only discover pod and port endpoints whose `spec.nodeName` matches the provided node name. If `true` and `node` isn't specified, it will discover all available pod and port endpoints. Please note that Collector connectivity to pods from other nodes is dependent on your cluster configuration and isn't guaranteed. |
7374
| observe_nodes | bool | `false` | Whether to report observer k8s.node endpoints. If `true` and `node` is specified it will only discover node endpoints whose `metadata.name` matches the provided node name. If `true` and `node` isn't specified, it will discover all available node endpoints. Please note that Collector connectivity to nodes is dependent on your cluster configuration and isn't guaranteed.|
75+
| observe_services | bool | `false` | Whether to report observer k8s.service endpoints.|

extension/observer/k8sobserver/config.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,14 @@ type Config struct {
3434
// it will only discover node endpoints whose `metadata.name` matches the provided node name. If `true` and
3535
// Node isn't specified, it will discover all available node endpoints. `false` by default.
3636
ObserveNodes bool `mapstructure:"observe_nodes"`
37+
// ObserveServices determines whether to report observer service and port endpoints. `false` by default.
38+
ObserveServices bool `mapstructure:"observe_services"`
3739
}
3840

3941
// Validate checks if the extension configuration is valid
4042
func (cfg *Config) Validate() error {
41-
if !cfg.ObservePods && !cfg.ObserveNodes {
42-
return fmt.Errorf("one of observe_pods and observe_nodes must be true")
43+
if !cfg.ObservePods && !cfg.ObserveNodes && !cfg.ObserveServices {
44+
return fmt.Errorf("one of observe_pods, observe_nodes and observe_services must be true")
4345
}
4446
return nil
4547
}

extension/observer/k8sobserver/config_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,11 @@ func TestLoadConfig(t *testing.T) {
3939
{
4040
id: component.NewIDWithName(metadata.Type, "observe-all"),
4141
expected: &Config{
42-
Node: "",
43-
APIConfig: k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeNone},
44-
ObservePods: true,
45-
ObserveNodes: true,
42+
Node: "",
43+
APIConfig: k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeNone},
44+
ObservePods: true,
45+
ObserveNodes: true,
46+
ObserveServices: true,
4647
},
4748
},
4849
{
@@ -51,7 +52,7 @@ func TestLoadConfig(t *testing.T) {
5152
},
5253
{
5354
id: component.NewIDWithName(metadata.Type, "invalid_no_observing"),
54-
expectedErr: "one of observe_pods and observe_nodes must be true",
55+
expectedErr: "one of observe_pods, observe_nodes and observe_services must be true",
5556
},
5657
}
5758
for _, tt := range tests {

extension/observer/k8sobserver/extension.go

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,14 @@ var _ observer.Observable = (*k8sObserver)(nil)
2525

2626
type k8sObserver struct {
2727
*observer.EndpointsWatcher
28-
telemetry component.TelemetrySettings
29-
podListerWatcher cache.ListerWatcher
30-
nodeListerWatcher cache.ListerWatcher
31-
handler *handler
32-
once *sync.Once
33-
stop chan struct{}
34-
config *Config
28+
telemetry component.TelemetrySettings
29+
podListerWatcher cache.ListerWatcher
30+
serviceListerWatcher cache.ListerWatcher
31+
nodeListerWatcher cache.ListerWatcher
32+
handler *handler
33+
once *sync.Once
34+
stop chan struct{}
35+
config *Config
3536
}
3637

3738
// Start will populate the cache.SharedInformers for pods and nodes as configured and run them as goroutines.
@@ -52,6 +53,14 @@ func (k *k8sObserver) Start(_ context.Context, _ component.Host) error {
5253
}
5354
go podInformer.Run(k.stop)
5455
}
56+
if k.serviceListerWatcher != nil {
57+
k.telemetry.Logger.Debug("creating and starting service informer")
58+
serviceInformer := cache.NewSharedInformer(k.serviceListerWatcher, &v1.Service{}, 0)
59+
if _, err := serviceInformer.AddEventHandler(k.handler); err != nil {
60+
k.telemetry.Logger.Error("error adding event handler to service informer", zap.Error(err))
61+
}
62+
go serviceInformer.Run(k.stop)
63+
}
5564
if k.nodeListerWatcher != nil {
5665
k.telemetry.Logger.Debug("creating and starting node informer")
5766
nodeInformer := cache.NewSharedInformer(k.nodeListerWatcher, &v1.Node{}, 0)
@@ -90,6 +99,13 @@ func newObserver(config *Config, set extension.CreateSettings) (extension.Extens
9099
podListerWatcher = cache.NewListWatchFromClient(restClient, "pods", v1.NamespaceAll, podSelector)
91100
}
92101

102+
var serviceListerWatcher cache.ListerWatcher
103+
if config.ObserveServices {
104+
var serviceSelector = fields.Everything()
105+
set.Logger.Debug("observing services")
106+
serviceListerWatcher = cache.NewListWatchFromClient(restClient, "services", v1.NamespaceAll, serviceSelector)
107+
}
108+
93109
var nodeListerWatcher cache.ListerWatcher
94110
if config.ObserveNodes {
95111
var nodeSelector fields.Selector
@@ -103,14 +119,15 @@ func newObserver(config *Config, set extension.CreateSettings) (extension.Extens
103119
}
104120
h := &handler{idNamespace: set.ID.String(), endpoints: &sync.Map{}, logger: set.TelemetrySettings.Logger}
105121
obs := &k8sObserver{
106-
EndpointsWatcher: observer.NewEndpointsWatcher(h, time.Second, set.TelemetrySettings.Logger),
107-
telemetry: set.TelemetrySettings,
108-
podListerWatcher: podListerWatcher,
109-
nodeListerWatcher: nodeListerWatcher,
110-
stop: make(chan struct{}),
111-
config: config,
112-
handler: h,
113-
once: &sync.Once{},
122+
EndpointsWatcher: observer.NewEndpointsWatcher(h, time.Second, set.TelemetrySettings.Logger),
123+
telemetry: set.TelemetrySettings,
124+
podListerWatcher: podListerWatcher,
125+
serviceListerWatcher: serviceListerWatcher,
126+
nodeListerWatcher: nodeListerWatcher,
127+
stop: make(chan struct{}),
128+
config: config,
129+
handler: h,
130+
once: &sync.Once{},
114131
}
115132

116133
return obs, nil

extension/observer/k8sobserver/extension_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,94 @@ func TestNewExtension(t *testing.T) {
4040
require.NotNil(t, ext)
4141
}
4242

43+
func TestExtensionObserveServices(t *testing.T) {
44+
factory := NewFactory()
45+
config := factory.CreateDefaultConfig().(*Config)
46+
mockServiceHost(t, config)
47+
48+
set := extensiontest.NewNopCreateSettings()
49+
set.ID = component.NewID(metadata.Type)
50+
ext, err := newObserver(config, set)
51+
require.NoError(t, err)
52+
require.NotNil(t, ext)
53+
54+
obs := ext.(*k8sObserver)
55+
serviceListerWatcher := framework.NewFakeControllerSource()
56+
obs.serviceListerWatcher = serviceListerWatcher
57+
58+
serviceListerWatcher.Add(serviceWithClusterIP)
59+
60+
require.NoError(t, ext.Start(context.Background(), componenttest.NewNopHost()))
61+
62+
sink := &endpointSink{}
63+
obs.ListAndWatch(sink)
64+
65+
requireSink(t, sink, func() bool {
66+
return len(sink.added) == 1
67+
})
68+
69+
assert.Equal(t, observer.Endpoint{
70+
ID: "k8s_observer/service-1-UID",
71+
Target: "service-1.default.svc.cluster.local",
72+
Details: &observer.K8sService{
73+
Name: "service-1",
74+
Namespace: "default",
75+
UID: "service-1-UID",
76+
Labels: map[string]string{
77+
"env": "prod",
78+
},
79+
ClusterIP: "1.2.3.4",
80+
ServiceType: "ClusterIP",
81+
},
82+
}, sink.added[0])
83+
84+
serviceListerWatcher.Modify(serviceWithClusterIPV2)
85+
86+
requireSink(t, sink, func() bool {
87+
return len(sink.changed) == 1
88+
})
89+
90+
assert.Equal(t, observer.Endpoint{
91+
ID: "k8s_observer/service-1-UID",
92+
Target: "service-1.default.svc.cluster.local",
93+
Details: &observer.K8sService{
94+
Name: "service-1",
95+
Namespace: "default",
96+
UID: "service-1-UID",
97+
Labels: map[string]string{
98+
"env": "prod",
99+
"service-version": "2",
100+
},
101+
ClusterIP: "1.2.3.4",
102+
ServiceType: "ClusterIP",
103+
},
104+
}, sink.changed[0])
105+
106+
serviceListerWatcher.Delete(serviceWithClusterIPV2)
107+
108+
requireSink(t, sink, func() bool {
109+
return len(sink.removed) == 1
110+
})
111+
112+
assert.Equal(t, observer.Endpoint{
113+
ID: "k8s_observer/service-1-UID",
114+
Target: "service-1.default.svc.cluster.local",
115+
Details: &observer.K8sService{
116+
Name: "service-1",
117+
Namespace: "default",
118+
UID: "service-1-UID",
119+
Labels: map[string]string{
120+
"env": "prod",
121+
"service-version": "2",
122+
},
123+
ClusterIP: "1.2.3.4",
124+
ServiceType: "ClusterIP",
125+
},
126+
}, sink.removed[0])
127+
128+
require.NoError(t, ext.Shutdown(context.Background()))
129+
}
130+
43131
func TestExtensionObservePods(t *testing.T) {
44132
factory := NewFactory()
45133
config := factory.CreateDefaultConfig().(*Config)

0 commit comments

Comments
 (0)