Skip to content

Commit 4302c5b

Browse files
diurnalistAlex Boten
andauthored
[statsdreceiver] support simple tags (#29012)
dogstatsd supports two types of tags on metrics: simple and dimensional tags[^1]. the former is just a key, the latter is a key and a value. with the assumption that many users of the statsdreceiver are enabling ingest of dogstatsd metrics, this makes the statsd parser more optimistic, so it can handle tags w/o a value. this functionality is gated behind a new `enable_simple_tags` flag. when this happens, we set an attribute that has a zero value. so far as i know, this is allowed in the opentelemetry spec. the decision of how to handle attributes w/ zero values is best left to configuration w/in the pipeline itself, as different users may have different opinions or approaches that work best with their systems. [^1]: https://www.datadoghq.com/blog/the-power-of-tagged-metrics/#whats-a-metric-tag **Testing:** added coverage to unit tests to enable parsing simple tags. --------- Co-authored-by: Alex Boten <[email protected]>
1 parent 80deba0 commit 4302c5b

File tree

7 files changed

+126
-22
lines changed

7 files changed

+126
-22
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: "enhancement"
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: "statsdreceiver"
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add support for 'simple' tags that do not have a defined value, to accommodate DogStatsD metrics that may utilize these.
11+
12+
13+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
14+
issues: [29012]
15+
16+
# (Optional) One or more lines of additional information to render under the primary note.
17+
# These lines will be padded with 2 spaces and then inserted directly into the document.
18+
# Use pipe (|) for multiline entries.
19+
subtext: "This functionality is gated behind a new `enable_simple_tags` config boolean, as it is not part of the StatsD spec."
20+
21+
# If your change doesn't affect end users or the exported elements of any package,
22+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
23+
# Optional: The change log or logs in which this entry should be included.
24+
# e.g. '[user]' or '[user, api]'
25+
# Include 'user' if the change is relevant to end users.
26+
# Include 'api' if there is a change to a library API.
27+
# Default: '[user]'
28+
change_logs: ['user']

receiver/statsdreceiver/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ The Following settings are optional:
3333

3434
- `enable_metric_type: true`(default value is false): Enable the statsd receiver to be able to emit the metric type(gauge, counter, timer(in the future), histogram(in the future)) as a label.
3535

36+
- `enable_simple_tags: true`(default value is false): Enable parsing tags that do not have a value, e.g. `#mykey` instead of `#mykey:myvalue`. DogStatsD supports such tagging.
37+
3638
- `is_monotonic_counter` (default value is false): Set all counter-type metrics the statsd receiver received as monotonic.
3739

3840
- `timer_histogram_mapping:`(default value is below): Specify what OTLP type to convert received timing/histogram data to.

receiver/statsdreceiver/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ type Config struct {
1919
NetAddr confignet.NetAddr `mapstructure:",squash"`
2020
AggregationInterval time.Duration `mapstructure:"aggregation_interval"`
2121
EnableMetricType bool `mapstructure:"enable_metric_type"`
22+
EnableSimpleTags bool `mapstructure:"enable_simple_tags"`
2223
IsMonotonicCounter bool `mapstructure:"is_monotonic_counter"`
2324
TimerHistogramMapping []protocol.TimerHistogramMapping `mapstructure:"timer_histogram_mapping"`
2425
}

receiver/statsdreceiver/internal/protocol/parser.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212

1313
// Parser is something that can map input StatsD strings to OTLP Metric representations.
1414
type Parser interface {
15-
Initialize(enableMetricType bool, isMonotonicCounter bool, sendTimerHistogram []TimerHistogramMapping) error
15+
Initialize(enableMetricType bool, enableSimpleTags bool, isMonotonicCounter bool, sendTimerHistogram []TimerHistogramMapping) error
1616
GetMetrics() []BatchMetrics
1717
Aggregate(line string, addr net.Addr) error
1818
}

receiver/statsdreceiver/internal/protocol/statsd_parser.go

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ var defaultObserverCategory = ObserverCategory{
7979
type StatsDParser struct {
8080
instrumentsByAddress map[netAddr]*instruments
8181
enableMetricType bool
82+
enableSimpleTags bool
8283
isMonotonicCounter bool
8384
timerEvents ObserverCategory
8485
histogramEvents ObserverCategory
@@ -156,12 +157,13 @@ func (p *StatsDParser) resetState(when time.Time) {
156157
p.instrumentsByAddress = make(map[netAddr]*instruments)
157158
}
158159

159-
func (p *StatsDParser) Initialize(enableMetricType bool, isMonotonicCounter bool, sendTimerHistogram []TimerHistogramMapping) error {
160+
func (p *StatsDParser) Initialize(enableMetricType bool, enableSimpleTags bool, isMonotonicCounter bool, sendTimerHistogram []TimerHistogramMapping) error {
160161
p.resetState(timeNowFunc())
161162

162163
p.histogramEvents = defaultObserverCategory
163164
p.timerEvents = defaultObserverCategory
164165
p.enableMetricType = enableMetricType
166+
p.enableSimpleTags = enableSimpleTags
165167
p.isMonotonicCounter = isMonotonicCounter
166168
// Note: validation occurs in ("../".Config).validate()
167169
for _, eachMap := range sendTimerHistogram {
@@ -270,7 +272,7 @@ func (p *StatsDParser) observerCategoryFor(t MetricType) ObserverCategory {
270272

271273
// Aggregate for each metric line.
272274
func (p *StatsDParser) Aggregate(line string, addr net.Addr) error {
273-
parsedMetric, err := parseMessageToMetric(line, p.enableMetricType)
275+
parsedMetric, err := parseMessageToMetric(line, p.enableMetricType, p.enableSimpleTags)
274276
if err != nil {
275277
return err
276278
}
@@ -349,7 +351,7 @@ func (p *StatsDParser) Aggregate(line string, addr net.Addr) error {
349351
return nil
350352
}
351353

352-
func parseMessageToMetric(line string, enableMetricType bool) (statsDMetric, error) {
354+
func parseMessageToMetric(line string, enableMetricType bool, enableSimpleTags bool) (statsDMetric, error) {
353355
result := statsDMetric{}
354356

355357
parts := strings.Split(line, "|")
@@ -410,11 +412,22 @@ func parseMessageToMetric(line string, enableMetricType bool) (statsDMetric, err
410412

411413
for _, tagSet := range tagSets {
412414
tagParts := strings.SplitN(tagSet, ":", 2)
413-
if len(tagParts) != 2 {
414-
return result, fmt.Errorf("invalid tag format: %s", tagParts)
415-
}
416415
k := tagParts[0]
417-
v := tagParts[1]
416+
if k == "" {
417+
return result, fmt.Errorf("invalid tag format: %q", tagSet)
418+
}
419+
420+
// support both simple tags (w/o value) and dimension tags (w/ value).
421+
// dogstatsd notably allows simple tags.
422+
var v string
423+
if len(tagParts) == 2 {
424+
v = tagParts[1]
425+
}
426+
427+
if v == "" && !enableSimpleTags {
428+
return result, fmt.Errorf("invalid tag format: %q", tagSet)
429+
}
430+
418431
kvs = append(kvs, attribute.String(k, v))
419432
}
420433
default:

receiver/statsdreceiver/internal/protocol/statsd_parser_test.go

Lines changed: 73 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ func Test_ParseMessageToMetric(t *testing.T) {
5353
},
5454
{
5555
name: "invalid tag format",
56-
input: "test.metric:42|c|#key1",
57-
err: errors.New("invalid tag format: [key1]"),
56+
input: "test.metric:42|c|#:val1",
57+
err: errors.New("invalid tag format: \":val1\""),
5858
},
5959
{
6060
name: "unrecognized message part",
@@ -235,7 +235,7 @@ func Test_ParseMessageToMetric(t *testing.T) {
235235

236236
for _, tt := range tests {
237237
t.Run(tt.name, func(t *testing.T) {
238-
got, err := parseMessageToMetric(tt.input, false)
238+
got, err := parseMessageToMetric(tt.input, false, false)
239239

240240
if tt.err != nil {
241241
assert.Equal(t, tt.err, err)
@@ -433,7 +433,66 @@ func Test_ParseMessageToMetricWithMetricType(t *testing.T) {
433433

434434
for _, tt := range tests {
435435
t.Run(tt.name, func(t *testing.T) {
436-
got, err := parseMessageToMetric(tt.input, true)
436+
got, err := parseMessageToMetric(tt.input, true, false)
437+
438+
if tt.err != nil {
439+
assert.Equal(t, tt.err, err)
440+
} else {
441+
assert.NoError(t, err)
442+
assert.Equal(t, tt.wantMetric, got)
443+
}
444+
})
445+
}
446+
}
447+
448+
func Test_ParseMessageToMetricWithSimpleTags(t *testing.T) {
449+
tests := []struct {
450+
name string
451+
input string
452+
wantMetric statsDMetric
453+
err error
454+
}{
455+
{
456+
name: "counter metric with sample rate and (dimensional) tag",
457+
input: "test.metric:42|c|@0.1|#key:value",
458+
wantMetric: testStatsDMetric(
459+
"test.metric",
460+
42,
461+
false,
462+
"c",
463+
0.1,
464+
[]string{"key"},
465+
[]string{"value"}),
466+
},
467+
{
468+
name: "counter metric with sample rate and (simple) tag",
469+
input: "test.metric:42|c|@0.1|#key",
470+
wantMetric: testStatsDMetric(
471+
"test.metric",
472+
42,
473+
false,
474+
"c",
475+
0.1,
476+
[]string{"key"},
477+
[]string{""}),
478+
},
479+
{
480+
name: "counter metric with sample rate and two (simple) tags",
481+
input: "test.metric:42|c|@0.1|#key,key2",
482+
wantMetric: testStatsDMetric(
483+
"test.metric",
484+
42,
485+
false,
486+
"c",
487+
0.1,
488+
[]string{"key", "key2"},
489+
[]string{"", ""}),
490+
},
491+
}
492+
493+
for _, tt := range tests {
494+
t.Run(tt.name, func(t *testing.T) {
495+
got, err := parseMessageToMetric(tt.input, false, true)
437496

438497
if tt.err != nil {
439498
assert.Equal(t, tt.err, err)
@@ -677,7 +736,7 @@ func TestStatsDParser_Aggregate(t *testing.T) {
677736
t.Run(tt.name, func(t *testing.T) {
678737
var err error
679738
p := &StatsDParser{}
680-
assert.NoError(t, p.Initialize(false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}}))
739+
assert.NoError(t, p.Initialize(false, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}}))
681740
p.lastIntervalTime = time.Unix(611, 0)
682741
addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678")
683742
addrKey := newNetAddr(addr)
@@ -746,7 +805,7 @@ func TestStatsDParser_AggregateByAddress(t *testing.T) {
746805
for _, tt := range tests {
747806
t.Run(tt.name, func(t *testing.T) {
748807
p := &StatsDParser{}
749-
assert.NoError(t, p.Initialize(true, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}}))
808+
assert.NoError(t, p.Initialize(true, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}}))
750809
p.lastIntervalTime = time.Unix(611, 0)
751810
for i, addr := range tt.addresses {
752811
for _, line := range tt.input[i] {
@@ -814,7 +873,7 @@ func TestStatsDParser_AggregateWithMetricType(t *testing.T) {
814873
t.Run(tt.name, func(t *testing.T) {
815874
var err error
816875
p := &StatsDParser{}
817-
assert.NoError(t, p.Initialize(true, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}}))
876+
assert.NoError(t, p.Initialize(true, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}}))
818877
p.lastIntervalTime = time.Unix(611, 0)
819878
addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678")
820879
addrKey := newNetAddr(addr)
@@ -864,7 +923,7 @@ func TestStatsDParser_AggregateWithIsMonotonicCounter(t *testing.T) {
864923
t.Run(tt.name, func(t *testing.T) {
865924
var err error
866925
p := &StatsDParser{}
867-
assert.NoError(t, p.Initialize(false, true, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}}))
926+
assert.NoError(t, p.Initialize(false, false, true, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}}))
868927
p.lastIntervalTime = time.Unix(611, 0)
869928
addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678")
870929
addrKey := newNetAddr(addr)
@@ -986,7 +1045,7 @@ func TestStatsDParser_AggregateTimerWithSummary(t *testing.T) {
9861045
t.Run(tt.name, func(t *testing.T) {
9871046
var err error
9881047
p := &StatsDParser{}
989-
assert.NoError(t, p.Initialize(false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "summary"}, {StatsdType: "histogram", ObserverType: "summary"}}))
1048+
assert.NoError(t, p.Initialize(false, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "summary"}, {StatsdType: "histogram", ObserverType: "summary"}}))
9901049
addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678")
9911050
addrKey := newNetAddr(addr)
9921051
for _, line := range tt.input {
@@ -1003,7 +1062,7 @@ func TestStatsDParser_AggregateTimerWithSummary(t *testing.T) {
10031062

10041063
func TestStatsDParser_Initialize(t *testing.T) {
10051064
p := &StatsDParser{}
1006-
assert.NoError(t, p.Initialize(true, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}}))
1065+
assert.NoError(t, p.Initialize(true, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}}))
10071066
teststatsdDMetricdescription := statsDMetricDescription{
10081067
name: "test",
10091068
metricType: "g",
@@ -1022,7 +1081,7 @@ func TestStatsDParser_Initialize(t *testing.T) {
10221081

10231082
func TestStatsDParser_GetMetricsWithMetricType(t *testing.T) {
10241083
p := &StatsDParser{}
1025-
assert.NoError(t, p.Initialize(true, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}}))
1084+
assert.NoError(t, p.Initialize(true, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}}))
10261085
instrument := newInstruments(nil)
10271086
instrument.gauges[testDescription("statsdTestMetric1", "g",
10281087
[]string{"mykey", "metric_type"}, []string{"myvalue", "gauge"})] = buildGaugeMetric(testStatsDMetric("testGauge1", 1, false, "g", 0, []string{"mykey", "metric_type"}, []string{"myvalue", "gauge"}), time.Unix(711, 0))
@@ -1095,7 +1154,7 @@ func TestStatsDParser_Mappings(t *testing.T) {
10951154
t.Run(tc.name, func(t *testing.T) {
10961155
p := &StatsDParser{}
10971156

1098-
assert.NoError(t, p.Initialize(false, false, tc.mapping))
1157+
assert.NoError(t, p.Initialize(false, false, false, tc.mapping))
10991158

11001159
addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678")
11011160
assert.NoError(t, p.Aggregate("H:10|h", addr))
@@ -1129,7 +1188,7 @@ func TestStatsDParser_ScopeIsIncluded(t *testing.T) {
11291188
}
11301189
testAddress, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678")
11311190

1132-
err := p.Initialize(true, false,
1191+
err := p.Initialize(true, false, false,
11331192
[]TimerHistogramMapping{
11341193
{StatsdType: "timer", ObserverType: "summary"},
11351194
{StatsdType: "histogram", ObserverType: "histogram"},
@@ -1399,7 +1458,7 @@ func TestStatsDParser_AggregateTimerWithHistogram(t *testing.T) {
13991458
t.Run(tt.name, func(t *testing.T) {
14001459
var err error
14011460
p := &StatsDParser{}
1402-
assert.NoError(t, p.Initialize(false, false, tt.mapping))
1461+
assert.NoError(t, p.Initialize(false, false, false, tt.mapping))
14031462
addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678")
14041463
for _, line := range tt.input {
14051464
err = p.Aggregate(line, addr)

receiver/statsdreceiver/receiver.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ func (r *statsdReceiver) Start(ctx context.Context, host component.Host) error {
9191
ticker := time.NewTicker(r.config.AggregationInterval)
9292
err = r.parser.Initialize(
9393
r.config.EnableMetricType,
94+
r.config.EnableSimpleTags,
9495
r.config.IsMonotonicCounter,
9596
r.config.TimerHistogramMapping,
9697
)

0 commit comments

Comments
 (0)