Skip to content

Commit 170b2f9

Browse files
claudiobastosjpkrohling
authored andcommitted
[exporter/loadbalancing] allow metrics routing (open-telemetry#26378)
Closes open-telemetry#25858 **Description:** Add metrics exporter that will balance its metrics considering new routing choices **Link to tracking Issue:** open-telemetry#25858 **Testing:** Add tests for new routing choices and metrics exporter --------- Signed-off-by: Claudio B <[email protected]> Co-authored-by: Juraci Paixão Kröhling <[email protected]>
1 parent cbcda3d commit 170b2f9

File tree

10 files changed

+1241
-4
lines changed

10 files changed

+1241
-4
lines changed

.chloggen/loadbalance-metrics.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: 'enhancement'
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: exporter/loadbalancing
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Allow metrics routing
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [25858]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/loadbalancingexporter/README.md

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,32 @@
44
| Status | |
55
| ------------- |-----------|
66
| Stability | [beta]: traces, logs |
7+
| | [development]: metrics |
78
| Distributions | [contrib], [aws], [grafana], [observiq], [sumo] |
89
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aexporter%2Floadbalancing%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aexporter%2Floadbalancing) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aexporter%2Floadbalancing%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aexporter%2Floadbalancing) |
910
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@jpkrohling](https://www.github.com/jpkrohling) |
1011

1112
[beta]: https://github.com/open-telemetry/opentelemetry-collector#beta
13+
[development]: https://github.com/open-telemetry/opentelemetry-collector#development
1214
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
1315
[aws]: https://github.com/aws-observability/aws-otel-collector
1416
[grafana]: https://github.com/grafana/agent
1517
[observiq]: https://github.com/observIQ/observiq-otel-collector
1618
[sumo]: https://github.com/SumoLogic/sumologic-otel-collector
1719
<!-- end autogenerated section -->
1820

19-
This is an exporter that will consistently export spans and logs depending on the `routing_key` configured. If no `routing_key` is configured, the default routing mechanism is `traceID`. This means that spans belonging to the same `traceID` (or `service.name`, when `service` is used as the `routing_key`) will be sent to the same backend.
21+
This is an exporter that will consistently export spans, metrics and logs depending on the `routing_key` configured.
22+
23+
The options for `routing_key` are: `service`, `traceID`, `metric` (metric name), `resource`.
24+
25+
| routing_key | can be used for |
26+
| ------------- |-----------|
27+
| service | logs, spans, metrics |
28+
| traceID | logs, spans |
29+
| resource | metrics |
30+
| metric | metrics |
31+
32+
If no `routing_key` is configured, the default routing mechanism is `traceID` for traces, while `service` is the default for metrics. This means that spans belonging to the same `traceID` (or `service.name`, when `service` is used as the `routing_key`) will be sent to the same backend.
2033

2134
It requires a source of backend information to be provided: static, with a fixed list of backends, or DNS, with a hostname that will resolve to all IP addresses to use. The DNS resolver will periodically check for updates.
2235

exporter/loadbalancingexporter/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ type routingKey int
1414
const (
1515
traceIDRouting routingKey = iota
1616
svcRouting
17+
metricNameRouting
18+
resourceRouting
1719
)
1820

1921
// Config defines configuration for the exporter.

exporter/loadbalancingexporter/factory.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ func NewFactory() exporter.Factory {
2525
createDefaultConfig,
2626
exporter.WithTraces(createTracesExporter, metadata.TracesStability),
2727
exporter.WithLogs(createLogsExporter, metadata.LogsStability),
28+
exporter.WithMetrics(createMetricsExporter, metadata.MetricsStability),
2829
)
2930
}
3031

@@ -46,3 +47,7 @@ func createTracesExporter(_ context.Context, params exporter.CreateSettings, cfg
4647
func createLogsExporter(_ context.Context, params exporter.CreateSettings, cfg component.Config) (exporter.Logs, error) {
4748
return newLogsExporter(params, cfg)
4849
}
50+
51+
func createMetricsExporter(_ context.Context, params exporter.CreateSettings, cfg component.Config) (exporter.Metrics, error) {
52+
return newMetricsExporter(params, cfg)
53+
}

exporter/loadbalancingexporter/internal/metadata/generated_status.go

Lines changed: 4 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

exporter/loadbalancingexporter/metadata.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ status:
44
class: exporter
55
stability:
66
beta: [traces, logs]
7+
development: [metrics]
78
distributions:
89
- contrib
910
- grafana
Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter"
5+
6+
import (
7+
"context"
8+
"errors"
9+
"fmt"
10+
"sort"
11+
"strings"
12+
"sync"
13+
"time"
14+
15+
"go.opencensus.io/stats"
16+
"go.opencensus.io/tag"
17+
"go.opentelemetry.io/collector/component"
18+
"go.opentelemetry.io/collector/consumer"
19+
"go.opentelemetry.io/collector/exporter"
20+
"go.opentelemetry.io/collector/exporter/otlpexporter"
21+
"go.opentelemetry.io/collector/pdata/pcommon"
22+
"go.opentelemetry.io/collector/pdata/pmetric"
23+
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
24+
"go.uber.org/multierr"
25+
26+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal"
27+
)
28+
29+
var _ exporter.Metrics = (*metricExporterImp)(nil)
30+
31+
type metricExporterImp struct {
32+
loadBalancer loadBalancer
33+
routingKey routingKey
34+
35+
stopped bool
36+
shutdownWg sync.WaitGroup
37+
}
38+
39+
func newMetricsExporter(params exporter.CreateSettings, cfg component.Config) (*metricExporterImp, error) {
40+
exporterFactory := otlpexporter.NewFactory()
41+
42+
lb, err := newLoadBalancer(params, cfg, func(ctx context.Context, endpoint string) (component.Component, error) {
43+
oCfg := buildExporterConfig(cfg.(*Config), endpoint)
44+
return exporterFactory.CreateMetricsExporter(ctx, params, &oCfg)
45+
})
46+
if err != nil {
47+
return nil, err
48+
}
49+
50+
metricExporter := metricExporterImp{loadBalancer: lb, routingKey: svcRouting}
51+
52+
switch cfg.(*Config).RoutingKey {
53+
case "service", "":
54+
// default case for empty routing key
55+
metricExporter.routingKey = svcRouting
56+
case "resource":
57+
metricExporter.routingKey = resourceRouting
58+
case "metric":
59+
metricExporter.routingKey = metricNameRouting
60+
default:
61+
return nil, fmt.Errorf("unsupported routing_key: %q", cfg.(*Config).RoutingKey)
62+
}
63+
return &metricExporter, nil
64+
65+
}
66+
67+
func (e *metricExporterImp) Capabilities() consumer.Capabilities {
68+
return consumer.Capabilities{MutatesData: false}
69+
}
70+
71+
func (e *metricExporterImp) Start(ctx context.Context, host component.Host) error {
72+
return e.loadBalancer.Start(ctx, host)
73+
}
74+
75+
func (e *metricExporterImp) Shutdown(context.Context) error {
76+
e.stopped = true
77+
e.shutdownWg.Wait()
78+
return nil
79+
}
80+
81+
func (e *metricExporterImp) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
82+
var errs error
83+
batches := batchpersignal.SplitMetrics(md)
84+
for _, batch := range batches {
85+
errs = multierr.Append(errs, e.consumeMetric(ctx, batch))
86+
}
87+
88+
return errs
89+
}
90+
91+
func (e *metricExporterImp) consumeMetric(ctx context.Context, md pmetric.Metrics) error {
92+
var exp component.Component
93+
routingIds, err := routingIdentifiersFromMetrics(md, e.routingKey)
94+
if err != nil {
95+
return err
96+
}
97+
for rid := range routingIds {
98+
endpoint := e.loadBalancer.Endpoint([]byte(rid))
99+
exp, err = e.loadBalancer.Exporter(endpoint)
100+
if err != nil {
101+
return err
102+
}
103+
104+
te, ok := exp.(exporter.Metrics)
105+
if !ok {
106+
return fmt.Errorf("unable to export metrics, unexpected exporter type: expected exporter.Metrics but got %T", exp)
107+
}
108+
109+
start := time.Now()
110+
err = te.ConsumeMetrics(ctx, md)
111+
duration := time.Since(start)
112+
113+
if err == nil {
114+
_ = stats.RecordWithTags(
115+
ctx,
116+
[]tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successTrueMutator},
117+
mBackendLatency.M(duration.Milliseconds()))
118+
} else {
119+
_ = stats.RecordWithTags(
120+
ctx,
121+
[]tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator},
122+
mBackendLatency.M(duration.Milliseconds()))
123+
}
124+
}
125+
126+
return err
127+
}
128+
129+
func routingIdentifiersFromMetrics(mds pmetric.Metrics, key routingKey) (map[string]bool, error) {
130+
ids := make(map[string]bool)
131+
132+
// no need to test "empty labels"
133+
// no need to test "empty resources"
134+
135+
rs := mds.ResourceMetrics()
136+
if rs.Len() == 0 {
137+
return nil, errors.New("empty resource metrics")
138+
}
139+
140+
ils := rs.At(0).ScopeMetrics()
141+
if ils.Len() == 0 {
142+
return nil, errors.New("empty scope metrics")
143+
}
144+
145+
metrics := ils.At(0).Metrics()
146+
if metrics.Len() == 0 {
147+
return nil, errors.New("empty metrics")
148+
}
149+
150+
for i := 0; i < rs.Len(); i++ {
151+
resource := rs.At(i).Resource()
152+
switch key {
153+
default:
154+
case svcRouting, traceIDRouting:
155+
svc, ok := resource.Attributes().Get(conventions.AttributeServiceName)
156+
if !ok {
157+
return nil, errors.New("unable to get service name")
158+
}
159+
ids[svc.Str()] = true
160+
case metricNameRouting:
161+
sm := rs.At(i).ScopeMetrics()
162+
for j := 0; j < sm.Len(); j++ {
163+
metrics := sm.At(j).Metrics()
164+
for k := 0; k < metrics.Len(); k++ {
165+
md := metrics.At(k)
166+
rKey := metricRoutingKey(md)
167+
ids[rKey] = true
168+
}
169+
}
170+
case resourceRouting:
171+
sm := rs.At(i).ScopeMetrics()
172+
for j := 0; j < sm.Len(); j++ {
173+
metrics := sm.At(j).Metrics()
174+
for k := 0; k < metrics.Len(); k++ {
175+
md := metrics.At(k)
176+
rKey := resourceRoutingKey(md, resource.Attributes())
177+
ids[rKey] = true
178+
}
179+
}
180+
}
181+
}
182+
183+
return ids, nil
184+
185+
}
186+
187+
// maintain
188+
func sortedMapAttrs(attrs pcommon.Map) []string {
189+
keys := make([]string, 0)
190+
for k := range attrs.AsRaw() {
191+
keys = append(keys, k)
192+
}
193+
sort.Strings(keys)
194+
195+
attrsHash := make([]string, 0)
196+
for _, k := range keys {
197+
attrsHash = append(attrsHash, k)
198+
if v, ok := attrs.Get(k); ok {
199+
attrsHash = append(attrsHash, v.AsString())
200+
}
201+
}
202+
return attrsHash
203+
}
204+
205+
func resourceRoutingKey(md pmetric.Metric, attrs pcommon.Map) string {
206+
attrsHash := sortedMapAttrs(attrs)
207+
attrsHash = append(attrsHash, md.Name())
208+
routingRef := strings.Join(attrsHash, "")
209+
210+
return routingRef
211+
}
212+
213+
func metricRoutingKey(md pmetric.Metric) string {
214+
return md.Name()
215+
}

0 commit comments

Comments
 (0)