Skip to content

Commit f04b3f1

Browse files
committed
[receiver/receiver_creator] Add support for logs' hints
Signed-off-by: ChrsMark <[email protected]>
1 parent 040cb47 commit f04b3f1

File tree

8 files changed

+643
-15
lines changed

8 files changed

+643
-15
lines changed

.chloggen/f_hints_logs.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: receivercreator
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add support for starting logs' collection based on provided k8s annotations' hints
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [34427]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/receivercreator/README.md

Lines changed: 100 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,8 @@ receiver_creator/metrics:
458458
# ignore_receivers: []
459459
```
460460

461-
Find bellow the supported annotations that user can define to automatically enable receivers to start collecting metrics signals from the target Pods/containers.
461+
Find bellow the supported annotations that user can define to automatically enable receivers to start
462+
collecting metrics signals from the target Pods/containers.
462463

463464
### Supported metrics annotations
464465

@@ -511,11 +512,82 @@ The current implementation relies on the implementation of `k8sobserver` extensi
511512
the [pod_endpoint](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.111.0/extension/observer/k8sobserver/pod_endpoint.go).
512513
The hints are evaluated per container by extracting the annotations from each [`Port` endpoint](#Port) that is emitted.
513514

515+
### Supported logs annotations
516+
517+
This feature enables `filelog` receiver along with the `container` parser in order to collect logs from the discovered
518+
Pods.
519+
520+
#### Enable/disable discovery
521+
522+
`io.opentelemetry.discovery.logs/enabled` (Required. Example: `"true"`)
523+
524+
By default `"false"`.
525+
526+
The following configuration can be used:
527+
528+
```yaml
529+
receiver_creator/logs:
530+
watch_observers: [ k8s_observer ]
531+
discovery:
532+
enabled: true
533+
```
534+
#### Define configuration
535+
The default configuration for the `filelog` receiver is the following:
536+
537+
```yaml
538+
include:
539+
- /var/log/pods/`pod.namespace`_`pod.name`_`pod.uid`/`container_name`/*.log
540+
include_file_name: false
541+
include_file_path: true
542+
operators:
543+
- id: container-parser
544+
type: container
545+
```
546+
This default can be extended using the respective annotation:
547+
`io.opentelemetry.discovery.logs/config`
548+
549+
**Example:**
550+
551+
```yaml
552+
io.opentelemetry.discovery.logs/config: |
553+
include_file_name: true
554+
max_log_size: "2MiB"
555+
operators:
556+
- type: regex_parser
557+
regex: "^(?P<time>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$"
558+
```
559+
560+
Note that individual settings are overridden by the configuration provided by the hints while the operators list
561+
is extended keeping first the `container` parser.
562+
563+
564+
#### Support multiple target containers
565+
566+
Users can target the annotation to a specific container by suffixing it with the name of that container:
567+
`io.opentelemetry.discovery.logs.<container_name>/endpoint`.
568+
For example:
569+
```yaml
570+
io.opentelemetry.discovery.logs.busybox/config: |
571+
max_log_size: "3MiB"
572+
operators:
573+
- id: some
574+
type: add
575+
field: attributes.tag
576+
value: hints
577+
```
578+
where `busybox` is the name of the target container.
579+
580+
If a Pod is annotated with both container level hints and pod level hints the container level hints have priority and
581+
the Pod level hints are used as a fallback (see detailed example bellow).
582+
583+
The current implementation relies on the implementation of `k8sobserver` extension and specifically
584+
the [pod_endpoint](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.111.0/extension/observer/k8sobserver/pod_endpoint.go).
585+
The hints are evaluated per container by extracting the annotations from each [`Pod Container` endpoint](#Pod Container) that is emitted.
514586

515587

516588
### Examples
517589

518-
#### Metrics example
590+
#### Metrics and Logs example
519591

520592
Collector's configuration:
521593
```yaml
@@ -525,12 +597,22 @@ receivers:
525597
discovery:
526598
enabled: true
527599
receivers:
600+
601+
receiver_creator/logs:
602+
watch_observers: [ k8s_observer ]
603+
discovery:
604+
enabled: true
605+
receivers:
528606
529607
service:
530608
extensions: [ k8s_observer]
531609
pipelines:
532610
metrics:
533-
receivers: [ receiver_creator ]
611+
receivers: [ receiver_creator/metrics ]
612+
processors: []
613+
exporters: [ debug ]
614+
logs:
615+
receivers: [ receiver_creator/logs ]
534616
processors: []
535617
exporters: [ debug ]
536618
```
@@ -600,6 +682,21 @@ spec:
600682
endpoint: "http://`endpoint`/nginx_status"
601683
collection_interval: "30s"
602684
timeout: "20s"
685+
686+
# redis pod container logs hints
687+
io.opentelemetry.discovery.logs.redis/enabled: "true"
688+
io.opentelemetry.discovery.logs.redis/config: |
689+
max_log_size: "4MiB"
690+
operators:
691+
- id: some
692+
type: add
693+
field: attributes.tag
694+
value: logs_hints
695+
696+
# nginx pod container logs hints
697+
io.opentelemetry.discovery.logs.webserver/enabled: "true"
698+
io.opentelemetry.discovery.logs.webserver/config: |
699+
max_log_size: "3MiB"
603700
spec:
604701
volumes:
605702
- name: nginx-conf

receiver/receivercreator/config_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,3 +299,40 @@ func (*nopWithoutEndpointFactory) CreateTraces(
299299
cfg: cfg,
300300
}, nil
301301
}
302+
303+
type nopWithFilelogConfig struct {
304+
Include []string `mapstructure:"include"`
305+
IncludeFileName bool `mapstructure:"include_file_name"`
306+
IncludeFilePath bool `mapstructure:"include_file_path"`
307+
Operators []any `mapstructure:"operators"`
308+
}
309+
310+
type nopWithFilelogFactory struct {
311+
rcvr.Factory
312+
}
313+
314+
type nopWithFilelogReceiver struct {
315+
mockComponent
316+
consumer.Logs
317+
consumer.Metrics
318+
consumer.Traces
319+
rcvr.Settings
320+
cfg component.Config
321+
}
322+
323+
func (*nopWithFilelogFactory) CreateDefaultConfig() component.Config {
324+
return &nopWithFilelogConfig{}
325+
}
326+
327+
func (*nopWithFilelogFactory) CreateLogs(
328+
_ context.Context,
329+
rcs rcvr.Settings,
330+
cfg component.Config,
331+
nextConsumer consumer.Logs,
332+
) (rcvr.Logs, error) {
333+
return &nopWithEndpointReceiver{
334+
Logs: nextConsumer,
335+
Settings: rcs,
336+
cfg: cfg,
337+
}, nil
338+
}

receiver/receivercreator/discovery.go

Lines changed: 99 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,16 @@ const (
2121

2222
// hint suffix for metrics
2323
otelMetricsHints = otelHints + ".metrics"
24+
otelLogsHints = otelHints + ".logs"
2425

2526
// hints definitions
2627
discoveryEnabledHint = "enabled"
2728
scraperHint = "scraper"
2829
configHint = "config"
30+
31+
logsReceiver = "filelog"
32+
defaultLogPathPattern = "/var/log/pods/%s_%s_%s/%s/*.log"
33+
filelogOperatorsConfigKey = "operators"
2934
)
3035

3136
// k8sHintsBuilder creates configurations from hints provided as Pod's annotations.
@@ -57,7 +62,7 @@ func (builder *k8sHintsBuilder) createReceiverTemplateFromHints(env observer.End
5762
return nil, fmt.Errorf("could not get endpoint type: %v", zap.Any("env", env))
5863
}
5964

60-
if endpointType != string(observer.PortType) {
65+
if endpointType != string(observer.PortType) && endpointType != string(observer.PodContainerType) {
6166
return nil, nil
6267
}
6368

@@ -72,7 +77,14 @@ func (builder *k8sHintsBuilder) createReceiverTemplateFromHints(env observer.End
7277
return nil, nil
7378
}
7479

75-
return builder.createScraper(pod.Annotations, env)
80+
switch endpointType {
81+
case string(observer.PortType):
82+
return builder.createScraper(pod.Annotations, env)
83+
case string(observer.PodContainerType):
84+
return builder.createLogsReceiver(pod.Annotations, env)
85+
default:
86+
return nil, nil
87+
}
7688
}
7789

7890
func (builder *k8sHintsBuilder) createScraper(
@@ -91,7 +103,7 @@ func (builder *k8sHintsBuilder) createScraper(
91103
port = p.Port
92104
pod := p.Pod
93105

94-
if !discoveryMetricsEnabled(annotations, otelMetricsHints, fmt.Sprint(port)) {
106+
if !discoveryEnabled(annotations, otelMetricsHints, fmt.Sprint(port)) {
95107
return nil, nil
96108
}
97109

@@ -118,6 +130,48 @@ func (builder *k8sHintsBuilder) createScraper(
118130
return &recTemplate, err
119131
}
120132

133+
func (builder *k8sHintsBuilder) createLogsReceiver(
134+
annotations map[string]string,
135+
env observer.EndpointEnv,
136+
) (*receiverTemplate, error) {
137+
if _, ok := builder.ignoreReceivers[logsReceiver]; ok {
138+
// receiver is ignored
139+
return nil, nil
140+
}
141+
142+
var containerName string
143+
var c observer.PodContainer
144+
err := mapstructure.Decode(env, &c)
145+
if err != nil {
146+
return nil, fmt.Errorf("could not extract pod's container: %v", zap.Any("env", env))
147+
}
148+
if c.Name == "" {
149+
return nil, fmt.Errorf("could not extract container name: %v", zap.Any("container", c))
150+
}
151+
containerName = c.Name
152+
pod := c.Pod
153+
154+
if !discoveryEnabled(annotations, otelLogsHints, containerName) {
155+
return nil, nil
156+
}
157+
158+
subreceiverKey := logsReceiver
159+
builder.logger.Debug("handling added hinted receiver", zap.Any("subreceiverKey", subreceiverKey))
160+
161+
userConfMap := createLogsConfig(
162+
annotations,
163+
containerName,
164+
pod.UID,
165+
pod.Name,
166+
pod.Namespace,
167+
builder.logger)
168+
169+
recTemplate, err := newReceiverTemplate(fmt.Sprintf("%v/%v_%v", subreceiverKey, pod.UID, containerName), userConfMap)
170+
recTemplate.signals = receiverSignals{false, true, false}
171+
172+
return &recTemplate, err
173+
}
174+
121175
func getScraperConfFromAnnotations(
122176
annotations map[string]string,
123177
defaultEndpoint, scopeSuffix string,
@@ -149,6 +203,47 @@ func getScraperConfFromAnnotations(
149203
return conf, nil
150204
}
151205

206+
func createLogsConfig(
207+
annotations map[string]string,
208+
containerName, podUID, podName, namespace string,
209+
logger *zap.Logger,
210+
) userConfigMap {
211+
scopeSuffix := containerName
212+
logPath := fmt.Sprintf(defaultLogPathPattern, namespace, podName, podUID, containerName)
213+
cont := []any{map[string]any{"id": "container-parser", "type": "container"}}
214+
defaultConfMap := userConfigMap{
215+
"include": []string{logPath},
216+
"include_file_path": true,
217+
"include_file_name": false,
218+
filelogOperatorsConfigKey: cont,
219+
}
220+
221+
configStr, found := getHintAnnotation(annotations, otelLogsHints, configHint, scopeSuffix)
222+
if !found || configStr == "" {
223+
return defaultConfMap
224+
}
225+
226+
userConf := make(map[string]any)
227+
if err := yaml.Unmarshal([]byte(configStr), &userConf); err != nil {
228+
logger.Debug("could not unmarshal configuration from hint", zap.Error(err))
229+
}
230+
231+
for k, v := range userConf {
232+
if k == filelogOperatorsConfigKey {
233+
vlist, ok := v.([]any)
234+
if !ok {
235+
logger.Debug("could not parse operators configuration from hint", zap.Any("config", userConf))
236+
}
237+
vlist = append(cont, vlist...)
238+
defaultConfMap[k] = vlist
239+
} else {
240+
defaultConfMap[k] = v
241+
}
242+
}
243+
244+
return defaultConfMap
245+
}
246+
152247
func getHintAnnotation(annotations map[string]string, hintBase string, hintKey string, suffix string) (string, bool) {
153248
// try to scope the hint more on container level by suffixing
154249
// with .<port> in case of Port event or # TODO: .<container_name> in case of Pod Container event
@@ -162,7 +257,7 @@ func getHintAnnotation(annotations map[string]string, hintBase string, hintKey s
162257
return podLevelHint, ok
163258
}
164259

165-
func discoveryMetricsEnabled(annotations map[string]string, hintBase string, scopeSuffix string) bool {
260+
func discoveryEnabled(annotations map[string]string, hintBase string, scopeSuffix string) bool {
166261
enabledHint, found := getHintAnnotation(annotations, hintBase, discoveryEnabledHint, scopeSuffix)
167262
if !found {
168263
return false

0 commit comments

Comments
 (0)