Skip to content

Commit 153c50d

Browse files
krisztianfeketevincentfree
authored andcommitted
[receiver/k8sobjectsreceiver] Handle missing objects via error_mode (open-telemetry#38851)
#### Description This PR adopts a similar logic to `ottl.ErrorMode` for `k8sobjectsreceiver`, enabling users to choose between ignoring, silencing, and propagating errors for missing objects. The default is `propagate`, therefore it is backward compatible with the current state. <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes open-telemetry#38803 <!--Describe what testing was performed and which tests were added.--> #### Testing Added tests, and also tested by building my own image and deployed into a Kubernetes cluster. <!--Describe the documentation added.--> #### Documentation I've updated `receiver/k8sobjectsreceiver/config.yaml` and README with the new config options. <!--Please delete paragraphs that you did not use before submitting.-->
1 parent 27c0067 commit 153c50d

File tree

9 files changed

+225
-23
lines changed

9 files changed

+225
-23
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: k8sobjectsreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Introduces `error_mode`, so users can choose between propagating, ignoring, or silencing missing objects.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [38803]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/k8sobjectsreceiver/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ Brief description of configuration properties:
4242
the K8s API server. This can be one of `none` (for no auth), `serviceAccount`
4343
(to use the standard service account token provided to the agent pod), or
4444
`kubeConfig` to use credentials from `~/.kube/config`.
45+
- `error_mode` (default = `propagate`): Determines how to handle errors when the receiver is unable to pull or watch objects due to missing resources. This can be one of `propagate`, `ignore`, or `silent`.
46+
- `propagate` will propagate the error to the collector as an Error.
47+
- `ignore` will log and ignore the error and continue.
48+
- `silent` will ignore the error and continue without logging.
4549
- `name`: Name of the resource object to collect
4650
- `mode`: define in which way it collects this type of object, either "pull" or "watch".
4751
- `pull` mode will read all objects of this type use the list API at an interval.

receiver/k8sobjectsreceiver/config.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@ var modeMap = map[mode]bool{
3333
WatchMode: true,
3434
}
3535

36+
type ErrorMode string
37+
38+
const (
39+
PropagateError ErrorMode = "propagate"
40+
IgnoreError ErrorMode = "ignore"
41+
SilentError ErrorMode = "silent"
42+
)
43+
3644
type K8sObjectsConfig struct {
3745
Name string `mapstructure:"name"`
3846
Group string `mapstructure:"group"`
@@ -50,14 +58,21 @@ type K8sObjectsConfig struct {
5058
type Config struct {
5159
k8sconfig.APIConfig `mapstructure:",squash"`
5260

53-
Objects []*K8sObjectsConfig `mapstructure:"objects"`
61+
Objects []*K8sObjectsConfig `mapstructure:"objects"`
62+
ErrorMode ErrorMode `mapstructure:"error_mode"`
5463

5564
// For mocking purposes only.
5665
makeDiscoveryClient func() (discovery.ServerResourcesInterface, error)
5766
makeDynamicClient func() (dynamic.Interface, error)
5867
}
5968

6069
func (c *Config) Validate() error {
70+
switch c.ErrorMode {
71+
case PropagateError, IgnoreError, SilentError:
72+
default:
73+
return fmt.Errorf("invalid error_mode %q: must be one of 'propagate', 'ignore', or 'silent'", c.ErrorMode)
74+
}
75+
6176
for _, object := range c.Objects {
6277
if object.Mode == "" {
6378
object.Mode = defaultMode
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
receivers:
2+
k8sobjects:
3+
error_mode: ignore # Can be "propagate", "ignore", or "silent"
4+
objects:
5+
# ... other configuration ...

receiver/k8sobjectsreceiver/config_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,13 @@ func TestLoadConfig(t *testing.T) {
112112

113113
assert.Equal(t, tt.expected.AuthType, cfg.AuthType)
114114
assert.Equal(t, tt.expected.Objects, cfg.Objects)
115+
116+
err = cfg.Validate()
117+
if tt.expected == nil {
118+
assert.Error(t, err)
119+
} else {
120+
assert.NoError(t, err)
121+
}
115122
})
116123
}
117124
}
@@ -125,6 +132,7 @@ func TestValidate(t *testing.T) {
125132
{
126133
desc: "invalid mode",
127134
cfg: &Config{
135+
ErrorMode: PropagateError,
128136
Objects: []*K8sObjectsConfig{
129137
{
130138
Name: "pods",
@@ -137,6 +145,7 @@ func TestValidate(t *testing.T) {
137145
{
138146
desc: "exclude watch type with pull mode",
139147
cfg: &Config{
148+
ErrorMode: PropagateError,
140149
Objects: []*K8sObjectsConfig{
141150
{
142151
Name: "pods",
@@ -152,6 +161,7 @@ func TestValidate(t *testing.T) {
152161
{
153162
desc: "default mode is set",
154163
cfg: &Config{
164+
ErrorMode: PropagateError,
155165
Objects: []*K8sObjectsConfig{
156166
{
157167
Name: "pods",
@@ -162,6 +172,7 @@ func TestValidate(t *testing.T) {
162172
{
163173
desc: "default interval for pull mode",
164174
cfg: &Config{
175+
ErrorMode: PropagateError,
165176
Objects: []*K8sObjectsConfig{
166177
{
167178
Name: "pods",

receiver/k8sobjectsreceiver/factory.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ func createDefaultConfig() component.Config {
3030
APIConfig: k8sconfig.APIConfig{
3131
AuthType: k8sconfig.AuthTypeServiceAccount,
3232
},
33+
ErrorMode: PropagateError,
3334
}
3435
}
3536

receiver/k8sobjectsreceiver/factory_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ func TestDefaultConfig(t *testing.T) {
2727
APIConfig: k8sconfig.APIConfig{
2828
AuthType: k8sconfig.AuthTypeServiceAccount,
2929
},
30+
ErrorMode: PropagateError,
3031
}, rCfg)
3132
}
3233

receiver/k8sobjectsreceiver/receiver.go

Lines changed: 65 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ func newReceiver(params receiver.Settings, config *Config, consumer consumer.Log
5959
for _, item := range objects[i].ExcludeWatchType {
6060
objects[i].exclude[item] = true
6161
}
62+
// Set default interval if in PullMode and interval is 0
63+
if objects[i].Mode == PullMode && objects[i].Interval == 0 {
64+
objects[i].Interval = defaultPullInterval
65+
}
6266
}
6367

6468
return &k8sobjectsreceiver{
@@ -84,14 +88,19 @@ func (kr *k8sobjectsreceiver) Start(ctx context.Context, _ component.Host) error
8488
return err
8589
}
8690

91+
var validConfigs []*K8sObjectsConfig
8792
for _, object := range kr.objects {
8893
gvrs, ok := validObjects[object.Name]
8994
if !ok {
90-
availableResource := make([]string, len(validObjects))
95+
availableResource := make([]string, 0, len(validObjects))
9196
for k := range validObjects {
9297
availableResource = append(availableResource, k)
9398
}
94-
return fmt.Errorf("resource %v not found. Valid resources are: %v", object.Name, availableResource)
99+
err := fmt.Errorf("resource not found: %s. Available resources in cluster: %v", object.Name, availableResource)
100+
if handlerErr := kr.handleError(err, ""); handlerErr != nil {
101+
return handlerErr
102+
}
103+
continue
95104
}
96105

97106
gvr := gvrs[0]
@@ -101,15 +110,21 @@ func (kr *k8sobjectsreceiver) Start(ctx context.Context, _ component.Host) error
101110
break
102111
}
103112
}
113+
104114
object.gvr = gvr
115+
validConfigs = append(validConfigs, object)
105116
}
106117

107-
kr.setting.Logger.Info("Object Receiver started")
118+
if len(validConfigs) == 0 {
119+
err := errors.New("no valid Kubernetes objects found to watch")
120+
return err
121+
}
108122

123+
kr.setting.Logger.Info("Object Receiver started")
109124
cctx, cancel := context.WithCancel(ctx)
110125
kr.cancel = cancel
111126

112-
for _, object := range kr.objects {
127+
for _, object := range validConfigs {
113128
kr.start(cctx, object)
114129
}
115130
return nil
@@ -131,7 +146,10 @@ func (kr *k8sobjectsreceiver) Shutdown(context.Context) error {
131146

132147
func (kr *k8sobjectsreceiver) start(ctx context.Context, object *K8sObjectsConfig) {
133148
resource := kr.client.Resource(*object.gvr)
134-
kr.setting.Logger.Info("Started collecting", zap.Any("gvr", object.gvr), zap.Any("mode", object.Mode), zap.Any("namespaces", object.Namespaces))
149+
kr.setting.Logger.Info("Started collecting",
150+
zap.Any("gvr", object.gvr),
151+
zap.Any("mode", object.Mode),
152+
zap.Any("namespaces", object.Namespaces))
135153

136154
switch object.Mode {
137155
case PullMode:
@@ -176,7 +194,9 @@ func (kr *k8sobjectsreceiver) startPull(ctx context.Context, config *K8sObjectsC
176194
case <-ticker.C:
177195
objects, err := resource.List(ctx, listOption)
178196
if err != nil {
179-
kr.setting.Logger.Error("error in pulling object", zap.String("resource", config.gvr.String()), zap.Error(err))
197+
kr.setting.Logger.Error("error in pulling object",
198+
zap.String("resource", config.gvr.String()),
199+
zap.Error(err))
180200
} else if len(objects.Items) > 0 {
181201
logs := pullObjectsToLogData(objects, time.Now(), config)
182202
obsCtx := kr.obsrecv.StartLogsOp(ctx)
@@ -207,7 +227,9 @@ func (kr *k8sobjectsreceiver) startWatch(ctx context.Context, config *K8sObjects
207227
wait.UntilWithContext(cancelCtx, func(newCtx context.Context) {
208228
resourceVersion, err := getResourceVersion(newCtx, &cfgCopy, resource)
209229
if err != nil {
210-
kr.setting.Logger.Error("could not retrieve a resourceVersion", zap.String("resource", cfgCopy.gvr.String()), zap.Error(err))
230+
kr.setting.Logger.Error("could not retrieve a resourceVersion",
231+
zap.String("resource", cfgCopy.gvr.String()),
232+
zap.Error(err))
211233
cancel()
212234
return
213235
}
@@ -227,7 +249,9 @@ func (kr *k8sobjectsreceiver) startWatch(ctx context.Context, config *K8sObjects
227249
func (kr *k8sobjectsreceiver) doWatch(ctx context.Context, config *K8sObjectsConfig, resourceVersion string, watchFunc func(options metav1.ListOptions) (apiWatch.Interface, error), stopperChan chan struct{}) bool {
228250
watcher, err := watch.NewRetryWatcher(resourceVersion, &cache.ListWatch{WatchFunc: watchFunc})
229251
if err != nil {
230-
kr.setting.Logger.Error("error in watching object", zap.String("resource", config.gvr.String()), zap.Error(err))
252+
kr.setting.Logger.Error("error in watching object",
253+
zap.String("resource", config.gvr.String()),
254+
zap.Error(err))
231255
return true
232256
}
233257

@@ -240,19 +264,22 @@ func (kr *k8sobjectsreceiver) doWatch(ctx context.Context, config *K8sObjectsCon
240264
errObject := apierrors.FromObject(data.Object)
241265
//nolint:errorlint
242266
if errObject.(*apierrors.StatusError).ErrStatus.Code == http.StatusGone {
243-
kr.setting.Logger.Info("received a 410, grabbing new resource version", zap.Any("data", data))
267+
kr.setting.Logger.Info("received a 410, grabbing new resource version",
268+
zap.Any("data", data))
244269
// we received a 410 so we need to restart
245270
return false
246271
}
247272
}
248273

249274
if !ok {
250-
kr.setting.Logger.Warn("Watch channel closed unexpectedly", zap.String("resource", config.gvr.String()))
275+
kr.setting.Logger.Warn("Watch channel closed unexpectedly",
276+
zap.String("resource", config.gvr.String()))
251277
return true
252278
}
253279

254280
if config.exclude[data.Type] {
255-
kr.setting.Logger.Debug("dropping excluded data", zap.String("type", string(data.Type)))
281+
kr.setting.Logger.Debug("dropping excluded data",
282+
zap.String("type", string(data.Type)))
256283
continue
257284
}
258285

@@ -321,3 +348,30 @@ func newTicker(ctx context.Context, repeat time.Duration) *time.Ticker {
321348
ticker.C = nc
322349
return ticker
323350
}
351+
352+
// handleError handles errors according to the configured error mode
353+
func (kr *k8sobjectsreceiver) handleError(err error, msg string) error {
354+
if err == nil {
355+
return nil
356+
}
357+
358+
switch kr.config.ErrorMode {
359+
case PropagateError:
360+
if msg != "" {
361+
return fmt.Errorf("%s: %w", msg, err)
362+
}
363+
return err
364+
case IgnoreError:
365+
if msg != "" {
366+
kr.setting.Logger.Info(msg, zap.Error(err))
367+
} else {
368+
kr.setting.Logger.Info(err.Error())
369+
}
370+
return nil
371+
case SilentError:
372+
return nil
373+
default:
374+
// This shouldn't happen as we validate ErrorMode during config validation
375+
return fmt.Errorf("invalid error_mode %q: %w", kr.config.ErrorMode, err)
376+
}
377+
}

0 commit comments

Comments
 (0)