Skip to content

Commit d904716

Browse files
Add documentation example for processor package (#5675) (#13070)
#### Description This PR introduces a testable example to the package [processor](/processor). it's a similar example to the documentation one on https://opentelemetry.io/docs/collector/building/receiver/#designing-and-validating-receiver-settings #### Issue: #5675 (comment) #### Testing the testable example was executed successfully: ![image](https://github.com/user-attachments/assets/a57a2bc8-22f0-4063-a1b4-aac640423219) #### Documentation this testable example follows https://go.dev/blog/examples#larger-examples format and should be automatically added to `processor` package documentation
1 parent fd4d3c4 commit d904716

File tree

2 files changed

+105
-0
lines changed

2 files changed

+105
-0
lines changed

processor/README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ Some important aspects of pipelines and processors to be aware of:
1111
- [Exclusive Ownership](#exclusive-ownership)
1212
- [Shared Ownership](#shared-ownership)
1313
- [Ordering Processors](#ordering-processors)
14+
- [Creating Custom Processor](#creating-custom-processors)
1415

1516
Supported processors (sorted alphabetically):
1617
- [Batch Processor](batchprocessor/README.md)
@@ -107,3 +108,8 @@ data cloning described in Exclusive Ownership section.
107108

108109
The order processors are specified in a pipeline is important as this is the
109110
order in which each processor is applied.
111+
112+
## Creating Custom Processors
113+
114+
To create a custom processor for the OpenTelemetry Collector, you need to implement the processor interface, define the processor's configuration, and register it with the Collector. The process typically involves creating a factory, implementing the required processing logic, and handling configuration options. For a practical example and guidance, refer to the [`processorhelper`](https://pkg.go.dev/go.opentelemetry.io/collector/processor/processorhelper) package, which provides utilities and patterns to simplify processor development.
115+
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package processorhelper_test
5+
6+
import (
7+
"context"
8+
"fmt"
9+
10+
"go.opentelemetry.io/collector/component"
11+
"go.opentelemetry.io/collector/consumer"
12+
"go.opentelemetry.io/collector/pdata/pmetric"
13+
"go.opentelemetry.io/collector/processor"
14+
"go.opentelemetry.io/collector/processor/processorhelper"
15+
)
16+
17+
// typeStr defines the unique type identifier for the processor.
18+
var typeStr = component.MustNewType("example")
19+
20+
// exampleConfig holds configuration settings for the processor.
21+
type exampleConfig struct{}
22+
23+
// exampleProcessor implements the OpenTelemetry processor interface.
24+
type exampleProcessor struct {
25+
cancel context.CancelFunc
26+
config exampleConfig
27+
}
28+
29+
// Example demonstrates the usage of the processor factory.
30+
func Example() {
31+
// Instantiate the processor factory and print its type.
32+
exampleProcessor := NewFactory()
33+
fmt.Println(exampleProcessor.Type())
34+
35+
// Output:
36+
// example
37+
}
38+
39+
// NewFactory creates a new processor factory.
40+
func NewFactory() processor.Factory {
41+
return processor.NewFactory(
42+
typeStr,
43+
createDefaultConfig,
44+
processor.WithMetrics(createExampleProcessor, component.StabilityLevelAlpha),
45+
)
46+
}
47+
48+
// createDefaultConfig returns the default configuration for the processor.
49+
func createDefaultConfig() component.Config {
50+
return &exampleConfig{}
51+
}
52+
53+
// createExampleProcessor initializes an instance of the example processor.
54+
func createExampleProcessor(ctx context.Context, params processor.Settings, baseCfg component.Config, next consumer.Metrics) (processor.Metrics, error) {
55+
// Convert baseCfg to the correct type.
56+
cfg := baseCfg.(*exampleConfig)
57+
58+
// Create a new processor instance.
59+
pcsr := newExampleProcessor(ctx, cfg)
60+
61+
// Wrap the processor with the helper utilities.
62+
return processorhelper.NewMetrics(
63+
ctx,
64+
params,
65+
cfg,
66+
next,
67+
pcsr.consumeMetrics,
68+
processorhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}),
69+
processorhelper.WithShutdown(pcsr.shutdown),
70+
)
71+
}
72+
73+
// newExampleProcessor constructs a new instance of the example processor.
74+
func newExampleProcessor(ctx context.Context, cfg *exampleConfig) *exampleProcessor {
75+
pcsr := &exampleProcessor{
76+
config: *cfg,
77+
}
78+
79+
// Create a cancelable context.
80+
_, pcsr.cancel = context.WithCancel(ctx)
81+
82+
return pcsr
83+
}
84+
85+
// ConsumeMetrics modify metrics adding one attribute to resource.
86+
func (pcsr *exampleProcessor) consumeMetrics(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
87+
rm := md.ResourceMetrics()
88+
for i := 0; i < rm.Len(); i++ {
89+
rm.At(i).Resource().Attributes().PutStr("processed_by", "exampleProcessor")
90+
}
91+
92+
return md, nil
93+
}
94+
95+
// Shutdown properly stops the processor and releases resources.
96+
func (pcsr *exampleProcessor) shutdown(_ context.Context) error {
97+
pcsr.cancel()
98+
return nil
99+
}

0 commit comments

Comments
 (0)