Skip to content

Commit f1d1506

Browse files
committed
wip
1 parent 8daf962 commit f1d1506

File tree

13 files changed

+588
-26
lines changed

13 files changed

+588
-26
lines changed

.chloggen/routing-by-metrics.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: routingconnector
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add ability to route by metric context
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [36236]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

cmd/otelcontribcol/builder-config.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ dist:
1111
name: otelcontribcol
1212
description: Local OpenTelemetry Collector Contrib binary, testing only.
1313
version: 0.112.0-dev
14-
otelcol_version: 0.112.0
1514

1615
extensions:
1716
- gomod: go.opentelemetry.io/collector/extension/zpagesextension v0.112.0

connector/routingconnector/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ If you are not already familiar with connectors, you may find it helpful to firs
3333
The following settings are available:
3434

3535
- `table (required)`: the routing table for this connector.
36-
- `table.context (optional, default: resource)`: the [OTTL Context] in which the statement will be evaluated. Currently, only `resource`, `log`, and `request` are supported.
36+
- `table.context (optional, default: resource)`: the [OTTL Context] in which the statement will be evaluated. Currently, only `resource`, `metric`, `log`, and `request` are supported.
3737
- `table.statement`: the routing condition provided as the [OTTL] statement. Required if `table.condition` is not provided. May not be used for `request` context.
3838
- `table.condition`: the routing condition provided as the [OTTL] condition. Required if `table.statement` is not provided. Required for `request` context.
3939
- `table.pipelines (required)`: the list of pipelines to use when the routing condition is met.
@@ -43,7 +43,7 @@ The following settings are available:
4343

4444
### Limitations
4545

46-
- The `match_once` setting is only supported when using the `resource` context. If any routes use `log` or `request` context, `match_once` must be set to `true`.
46+
- The `match_once` setting is only supported when using the `resource` context. If any routes use `metric`, `log` or `request` context, `match_once` must be set to `true`.
4747
- The `request` context requires use of the `condition` setting, and relies on a very limited grammar. Conditions must be in the form of `request["key"] == "value"` or `request["key"] != "value"`. (In the future, this grammar may be expanded to support more complex conditions.)
4848

4949
### Supported [OTTL] functions

connector/routingconnector/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func (c *Config) Validate() error {
7777
return err
7878
}
7979
fallthrough
80-
case "log": // ok
80+
case "metric", "log": // ok
8181
if !c.MatchOnce {
8282
return fmt.Errorf(`%q context is not supported with "match_once: false"`, item.Context)
8383
}

connector/routingconnector/config_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,22 @@ func TestValidateConfig(t *testing.T) {
218218
},
219219
error: "invalid context: invalid",
220220
},
221+
{
222+
name: "metric context with match_once false",
223+
config: &Config{
224+
MatchOnce: false,
225+
Table: []RoutingTableItem{
226+
{
227+
Context: "metric",
228+
Statement: `route() where attributes["attr"] == "acme"`,
229+
Pipelines: []pipeline.ID{
230+
pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"),
231+
},
232+
},
233+
},
234+
},
235+
error: `"metric" context is not supported with "match_once: false"`,
236+
},
221237
{
222238
name: "log context with match_once false",
223239
config: &Config{

connector/routingconnector/internal/pmetricutil/metrics.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,46 @@ func MoveResourcesIf(from, to pmetric.Metrics, f func(pmetric.ResourceMetrics) b
1616
return true
1717
})
1818
}
19+
20+
// MoveMetricsWithContextIf calls f sequentially for each Metric present in the first pmetric.Metrics.
21+
// If f returns true, the element is removed from the first pmetric.Metrics and added to the second pmetric.Metrics.
22+
// Notably, the Resource and Scope associated with the Metric are created in the second pmetric.Metrics only once.
23+
// Resources or Scopes are removed from the original if they become empty. All ordering is preserved.
24+
func MoveMetricsWithContextIf(from, to pmetric.Metrics, f func(pmetric.ResourceMetrics, pmetric.ScopeMetrics, pmetric.Metric) bool) {
25+
rms := from.ResourceMetrics()
26+
for i := 0; i < rms.Len(); i++ {
27+
rm := rms.At(i)
28+
sms := rm.ScopeMetrics()
29+
var rmCopy *pmetric.ResourceMetrics
30+
for j := 0; j < sms.Len(); j++ {
31+
sm := sms.At(j)
32+
ms := sm.Metrics()
33+
var smCopy *pmetric.ScopeMetrics
34+
ms.RemoveIf(func(m pmetric.Metric) bool {
35+
if !f(rm, sm, m) {
36+
return false
37+
}
38+
if rmCopy == nil {
39+
rmc := to.ResourceMetrics().AppendEmpty()
40+
rmCopy = &rmc
41+
rm.Resource().CopyTo(rmCopy.Resource())
42+
rmCopy.SetSchemaUrl(rm.SchemaUrl())
43+
}
44+
if smCopy == nil {
45+
smc := rmCopy.ScopeMetrics().AppendEmpty()
46+
smCopy = &smc
47+
sm.Scope().CopyTo(smCopy.Scope())
48+
smCopy.SetSchemaUrl(sm.SchemaUrl())
49+
}
50+
m.CopyTo(smCopy.Metrics().AppendEmpty())
51+
return true
52+
})
53+
}
54+
sms.RemoveIf(func(sm pmetric.ScopeMetrics) bool {
55+
return sm.Metrics().Len() == 0
56+
})
57+
}
58+
rms.RemoveIf(func(rm pmetric.ResourceMetrics) bool {
59+
return rm.ScopeMetrics().Len() == 0
60+
})
61+
}

connector/routingconnector/internal/pmetricutil/metrics_test.go

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,3 +80,147 @@ func TestMoveResourcesIf(t *testing.T) {
8080
})
8181
}
8282
}
83+
84+
func TestMoveMetricsWithContextIf(t *testing.T) {
85+
testCases := []struct {
86+
name string
87+
moveIf func(pmetric.ResourceMetrics, pmetric.ScopeMetrics, pmetric.Metric) bool
88+
from pmetric.Metrics
89+
to pmetric.Metrics
90+
expectFrom pmetric.Metrics
91+
expectTo pmetric.Metrics
92+
}{
93+
{
94+
name: "move_none",
95+
moveIf: func(_ pmetric.ResourceMetrics, _ pmetric.ScopeMetrics, _ pmetric.Metric) bool {
96+
return false
97+
},
98+
from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
99+
to: pmetric.NewMetrics(),
100+
expectFrom: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
101+
expectTo: pmetric.NewMetrics(),
102+
},
103+
{
104+
name: "move_all",
105+
moveIf: func(_ pmetric.ResourceMetrics, _ pmetric.ScopeMetrics, _ pmetric.Metric) bool {
106+
return true
107+
},
108+
from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
109+
to: pmetric.NewMetrics(),
110+
expectFrom: pmetric.NewMetrics(),
111+
expectTo: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
112+
},
113+
{
114+
name: "move_all_from_one_resource",
115+
moveIf: func(rl pmetric.ResourceMetrics, _ pmetric.ScopeMetrics, _ pmetric.Metric) bool {
116+
rname, ok := rl.Resource().Attributes().Get("resourceName")
117+
return ok && rname.AsString() == "resourceB"
118+
},
119+
from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
120+
to: pmetric.NewMetrics(),
121+
expectFrom: pmetricutiltest.NewMetrics("A", "CD", "EF", "GH"),
122+
expectTo: pmetricutiltest.NewMetrics("B", "CD", "EF", "GH"),
123+
},
124+
{
125+
name: "move_all_from_one_scope",
126+
moveIf: func(rl pmetric.ResourceMetrics, sl pmetric.ScopeMetrics, _ pmetric.Metric) bool {
127+
rname, ok := rl.Resource().Attributes().Get("resourceName")
128+
return ok && rname.AsString() == "resourceB" && sl.Scope().Name() == "scopeC"
129+
},
130+
from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
131+
to: pmetric.NewMetrics(),
132+
expectFrom: pmetricutiltest.NewMetricsFromOpts(
133+
pmetricutiltest.WithResource('A',
134+
pmetricutiltest.WithScope('C', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")),
135+
pmetricutiltest.WithScope('D', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")),
136+
),
137+
pmetricutiltest.WithResource('B',
138+
pmetricutiltest.WithScope('D', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")),
139+
),
140+
),
141+
expectTo: pmetricutiltest.NewMetrics("B", "C", "EF", "GH"),
142+
},
143+
{
144+
name: "move_all_from_one_scope_in_each_resource",
145+
moveIf: func(_ pmetric.ResourceMetrics, sl pmetric.ScopeMetrics, _ pmetric.Metric) bool {
146+
return sl.Scope().Name() == "scopeD"
147+
},
148+
from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
149+
to: pmetric.NewMetrics(),
150+
expectFrom: pmetricutiltest.NewMetrics("AB", "C", "EF", "GH"),
151+
expectTo: pmetricutiltest.NewMetrics("AB", "D", "EF", "GH"),
152+
},
153+
{
154+
name: "move_one",
155+
moveIf: func(rl pmetric.ResourceMetrics, sl pmetric.ScopeMetrics, m pmetric.Metric) bool {
156+
rname, ok := rl.Resource().Attributes().Get("resourceName")
157+
return ok && rname.AsString() == "resourceA" && sl.Scope().Name() == "scopeD" && m.Name() == "metricF"
158+
},
159+
from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
160+
to: pmetric.NewMetrics(),
161+
expectFrom: pmetricutiltest.NewMetricsFromOpts(
162+
pmetricutiltest.WithResource('A',
163+
pmetricutiltest.WithScope('C', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")),
164+
pmetricutiltest.WithScope('D', pmetricutiltest.WithMetric('E', "GH")),
165+
),
166+
pmetricutiltest.WithResource('B',
167+
pmetricutiltest.WithScope('C', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")),
168+
pmetricutiltest.WithScope('D', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")),
169+
),
170+
),
171+
expectTo: pmetricutiltest.NewMetrics("A", "D", "F", "GH"),
172+
},
173+
{
174+
name: "move_one_from_each_scope",
175+
moveIf: func(_ pmetric.ResourceMetrics, _ pmetric.ScopeMetrics, m pmetric.Metric) bool {
176+
return m.Name() == "metricE"
177+
},
178+
from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
179+
to: pmetric.NewMetrics(),
180+
expectFrom: pmetricutiltest.NewMetrics("AB", "CD", "F", "GH"),
181+
expectTo: pmetricutiltest.NewMetrics("AB", "CD", "E", "GH"),
182+
},
183+
{
184+
name: "move_one_from_each_scope_in_one_resource",
185+
moveIf: func(rl pmetric.ResourceMetrics, _ pmetric.ScopeMetrics, m pmetric.Metric) bool {
186+
rname, ok := rl.Resource().Attributes().Get("resourceName")
187+
return ok && rname.AsString() == "resourceB" && m.Name() == "metricE"
188+
},
189+
from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
190+
to: pmetric.NewMetrics(),
191+
expectFrom: pmetricutiltest.NewMetricsFromOpts(
192+
pmetricutiltest.WithResource('A',
193+
pmetricutiltest.WithScope('C', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")),
194+
pmetricutiltest.WithScope('D', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")),
195+
),
196+
pmetricutiltest.WithResource('B',
197+
pmetricutiltest.WithScope('C', pmetricutiltest.WithMetric('F', "GH")),
198+
pmetricutiltest.WithScope('D', pmetricutiltest.WithMetric('F', "GH")),
199+
),
200+
),
201+
expectTo: pmetricutiltest.NewMetrics("B", "CD", "E", "GH"),
202+
},
203+
{
204+
name: "move_some_to_preexisting",
205+
moveIf: func(_ pmetric.ResourceMetrics, sl pmetric.ScopeMetrics, _ pmetric.Metric) bool {
206+
return sl.Scope().Name() == "scopeD"
207+
},
208+
from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
209+
to: pmetricutiltest.NewMetrics("1", "2", "3", "4"),
210+
expectFrom: pmetricutiltest.NewMetrics("AB", "C", "EF", "GH"),
211+
expectTo: pmetricutiltest.NewMetricsFromOpts(
212+
pmetricutiltest.WithResource('1', pmetricutiltest.WithScope('2', pmetricutiltest.WithMetric('3', "4"))),
213+
pmetricutiltest.WithResource('A', pmetricutiltest.WithScope('D', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH"))),
214+
pmetricutiltest.WithResource('B', pmetricutiltest.WithScope('D', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH"))),
215+
),
216+
},
217+
}
218+
219+
for _, tt := range testCases {
220+
t.Run(tt.name, func(t *testing.T) {
221+
pmetricutil.MoveMetricsWithContextIf(tt.from, tt.to, tt.moveIf)
222+
assert.NoError(t, pmetrictest.CompareMetrics(tt.expectFrom, tt.from), "from not modified as expected")
223+
assert.NoError(t, pmetrictest.CompareMetrics(tt.expectTo, tt.to), "to not as expected")
224+
})
225+
}
226+
}

connector/routingconnector/internal/pmetricutiltest/metrics.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,59 @@ func NewMetrics(resourceIDs, scopeIDs, metricIDs, dataPointIDs string) pmetric.M
4343
}
4444
return md
4545
}
46+
47+
type Resource struct {
48+
id byte
49+
scopes []Scope
50+
}
51+
52+
type Scope struct {
53+
id byte
54+
metrics []Metric
55+
}
56+
57+
type Metric struct {
58+
id byte
59+
dataPoints string
60+
}
61+
62+
func WithResource(id byte, scopes ...Scope) Resource {
63+
r := Resource{id: id}
64+
r.scopes = append(r.scopes, scopes...)
65+
return r
66+
}
67+
68+
func WithScope(id byte, metrics ...Metric) Scope {
69+
s := Scope{id: id}
70+
s.metrics = append(s.metrics, metrics...)
71+
return s
72+
}
73+
74+
func WithMetric(id byte, dataPoints string) Metric {
75+
return Metric{id: id, dataPoints: dataPoints}
76+
}
77+
78+
// NewMetricsFromOpts creates a pmetric.Metrics with the specified resources, scopes, metrics,
79+
// and data points. The general idea is the same as NewMetrics, but this function allows for
80+
// more flexibility in creating non-uniform structures.
81+
func NewMetricsFromOpts(resources ...Resource) pmetric.Metrics {
82+
md := pmetric.NewMetrics()
83+
for _, resource := range resources {
84+
r := md.ResourceMetrics().AppendEmpty()
85+
r.Resource().Attributes().PutStr("resourceName", "resource"+string(resource.id))
86+
for _, scope := range resource.scopes {
87+
s := r.ScopeMetrics().AppendEmpty()
88+
s.Scope().SetName("scope" + string(scope.id))
89+
for _, metric := range scope.metrics {
90+
m := s.Metrics().AppendEmpty()
91+
m.SetName("metric" + string(metric.id))
92+
dps := m.SetEmptyGauge().DataPoints()
93+
for i := 0; i < len(metric.dataPoints); i++ {
94+
dp := dps.AppendEmpty()
95+
dp.Attributes().PutStr("dpName", "dp"+string(metric.dataPoints[i]))
96+
}
97+
}
98+
}
99+
}
100+
return md
101+
}

0 commit comments

Comments
 (0)