Skip to content

Commit d23e351

Browse files
metricstarttimeprocessor: Implementation of the subtractinitial strategy (#38594)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Addresses #38379. Some of this logic already existed in the [GCM exporter](https://github.com/GoogleCloudPlatform/opentelemetry-operations-go/blob/main/exporter/collector/internal/normalization/standard_normalizer.go). For each resource, and for each timeseries - Skip if the point is not a cumulative sum, histogram, exp histogram, or summary. - Lookup the timeseries in the cache. If it is not present, drop this point from the batch, and store the timestamp and value (including bucket counts, etc) in the cache. - Subtract cumulative values (e.g. sum, bucket counts) of the cached timeseries from the current timeseries, and set the start timestamp to the cached point's timestamp. <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes #38379. <!--Describe what testing was performed and which tests were added.--> #### Testing Added unit and integration tests. <!--Describe the documentation added.--> #### Documentation Readme already covers this strategy <!--Please delete paragraphs that you did not use before submitting.--> --------- Signed-off-by: Ridwan Sharif <[email protected]>
1 parent 5db059a commit d23e351

File tree

8 files changed

+1776
-518
lines changed

8 files changed

+1776
-518
lines changed

.chloggen/subtract-strategy-v2.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: metricstarttimeprocessor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Implementation of the subtractinitial strategy
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [38379]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

processor/metricstarttimeprocessor/internal/datapointstorage/timeseries_map.go

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -101,20 +101,20 @@ func (tsm *TimeseriesMap) GC() {
101101
tsm.Mark = false
102102
}
103103

104-
// IsResetHistogram compares the given histogram datapoint h, to tsi.Histogram
104+
// IsResetHistogram compares the given histogram datapoint h, to ref
105105
// and determines whether the metric has been reset based on the values. It is
106106
// a reset if any of the bucket boundaries have changed, if any of the bucket
107107
// counts have decreased or if the total sum or count have decreased.
108-
func (tsi *TimeseriesInfo) IsResetHistogram(h pmetric.HistogramDataPoint) bool {
109-
if h.Count() < tsi.Histogram.Count() {
108+
func IsResetHistogram(h, ref pmetric.HistogramDataPoint) bool {
109+
if h.Count() < ref.Count() {
110110
return true
111111
}
112-
if h.Sum() < tsi.Histogram.Sum() {
112+
if h.Sum() < ref.Sum() {
113113
return true
114114
}
115115

116116
// Guard against bucket boundaries changes.
117-
refBounds := tsi.Histogram.ExplicitBounds().AsRaw()
117+
refBounds := ref.ExplicitBounds().AsRaw()
118118
hBounds := h.ExplicitBounds().AsRaw()
119119
if len(refBounds) != len(hBounds) {
120120
return true
@@ -126,69 +126,69 @@ func (tsi *TimeseriesInfo) IsResetHistogram(h pmetric.HistogramDataPoint) bool {
126126
}
127127

128128
// We need to check individual buckets to make sure the counts are all increasing.
129-
if tsi.Histogram.BucketCounts().Len() != h.BucketCounts().Len() {
129+
if ref.BucketCounts().Len() != h.BucketCounts().Len() {
130130
return true
131131
}
132-
for i := range tsi.Histogram.BucketCounts().Len() {
133-
if h.BucketCounts().At(i) < tsi.Histogram.BucketCounts().At(i) {
132+
for i := range ref.BucketCounts().Len() {
133+
if h.BucketCounts().At(i) < ref.BucketCounts().At(i) {
134134
return true
135135
}
136136
}
137137
return false
138138
}
139139

140140
// IsResetExponentialHistogram compares the given exponential histogram
141-
// datapoint eh, to tsi.ExponentialHistogram and determines whether the metric
141+
// datapoint eh, to ref and determines whether the metric
142142
// has been reset based on the values. It is a reset if any of the bucket
143143
// boundaries have changed, if any of the bucket counts have decreased or if the
144144
// total sum or count have decreased.
145-
func (tsi *TimeseriesInfo) IsResetExponentialHistogram(eh pmetric.ExponentialHistogramDataPoint) bool {
145+
func IsResetExponentialHistogram(eh, ref pmetric.ExponentialHistogramDataPoint) bool {
146146
// Same as the histogram implementation
147-
if eh.Count() < tsi.ExponentialHistogram.Count() {
147+
if eh.Count() < ref.Count() {
148148
return true
149149
}
150-
if eh.Sum() < tsi.ExponentialHistogram.Sum() {
150+
if eh.Sum() < ref.Sum() {
151151
return true
152152
}
153153

154154
// Guard against bucket boundaries changes.
155-
if tsi.ExponentialHistogram.Scale() != eh.Scale() {
155+
if ref.Scale() != eh.Scale() {
156156
return true
157157
}
158158

159159
// We need to check individual buckets to make sure the counts are all increasing.
160-
if tsi.ExponentialHistogram.Positive().BucketCounts().Len() != eh.Positive().BucketCounts().Len() {
160+
if ref.Positive().BucketCounts().Len() != eh.Positive().BucketCounts().Len() {
161161
return true
162162
}
163-
for i := range tsi.ExponentialHistogram.Positive().BucketCounts().Len() {
164-
if eh.Positive().BucketCounts().At(i) < tsi.ExponentialHistogram.Positive().BucketCounts().At(i) {
163+
for i := range ref.Positive().BucketCounts().Len() {
164+
if eh.Positive().BucketCounts().At(i) < ref.Positive().BucketCounts().At(i) {
165165
return true
166166
}
167167
}
168-
if tsi.ExponentialHistogram.Negative().BucketCounts().Len() != eh.Negative().BucketCounts().Len() {
168+
if ref.Negative().BucketCounts().Len() != eh.Negative().BucketCounts().Len() {
169169
return true
170170
}
171-
for i := range tsi.ExponentialHistogram.Negative().BucketCounts().Len() {
172-
if eh.Negative().BucketCounts().At(i) < tsi.ExponentialHistogram.Negative().BucketCounts().At(i) {
171+
for i := range ref.Negative().BucketCounts().Len() {
172+
if eh.Negative().BucketCounts().At(i) < ref.Negative().BucketCounts().At(i) {
173173
return true
174174
}
175175
}
176176

177177
return false
178178
}
179179

180-
// IsResetSummary compares the given summary datapoint s to tsi.Summary and
180+
// IsResetSummary compares the given summary datapoint s to ref and
181181
// determines whether the metric has been reset based on the values. It is a
182182
// reset if the count or sum has decreased.
183-
func (tsi *TimeseriesInfo) IsResetSummary(s pmetric.SummaryDataPoint) bool {
184-
return s.Count() < tsi.Summary.Count() || s.Sum() < tsi.Summary.Sum()
183+
func IsResetSummary(s, ref pmetric.SummaryDataPoint) bool {
184+
return s.Count() < ref.Count() || s.Sum() < ref.Sum()
185185
}
186186

187-
// IsResetSum compares the given number datapoint s to tsi.Number and determines
187+
// IsResetSum compares the given number datapoint s to ref and determines
188188
// whether the metric has been reset based on the values. It is a reset if the
189189
// value has decreased.
190-
func (tsi *TimeseriesInfo) IsResetSum(s pmetric.NumberDataPoint) bool {
191-
return s.DoubleValue() < tsi.Number.DoubleValue()
190+
func IsResetSum(s, ref pmetric.NumberDataPoint) bool {
191+
return s.DoubleValue() < ref.DoubleValue()
192192
}
193193

194194
func newTimeseriesMap() *TimeseriesMap {

processor/metricstarttimeprocessor/internal/datapointstorage/timeseries_map_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ func TestTimeseriesInfo_IsResetHistogram(t *testing.T) {
316316
t.Run(tt.name, func(t *testing.T) {
317317
tsi := tt.setupTsi()
318318
h := tt.setupH()
319-
assert.Equal(t, tt.expectedReset, tsi.IsResetHistogram(h))
319+
assert.Equal(t, tt.expectedReset, IsResetHistogram(h, tsi.Histogram))
320320
})
321321
}
322322
}
@@ -518,7 +518,7 @@ func TestTimeseriesInfo_IsResetExponentialHistogram(t *testing.T) {
518518
t.Run(tt.name, func(t *testing.T) {
519519
tsi := tt.setupTsi()
520520
eh := tt.setupEh()
521-
assert.Equal(t, tt.expectedReset, tsi.IsResetExponentialHistogram(eh))
521+
assert.Equal(t, tt.expectedReset, IsResetExponentialHistogram(eh, tsi.ExponentialHistogram))
522522
})
523523
}
524524
}
@@ -604,7 +604,7 @@ func TestTimeseriesInfo_IsResetSummary(t *testing.T) {
604604
t.Run(tt.name, func(t *testing.T) {
605605
tsi := tt.setupTsi()
606606
s := tt.setupS()
607-
assert.Equal(t, tt.expectedReset, tsi.IsResetSummary(s))
607+
assert.Equal(t, tt.expectedReset, IsResetSummary(s, tsi.Summary))
608608
})
609609
}
610610
}
@@ -652,7 +652,7 @@ func TestTimeseriesInfo_IsResetSum(t *testing.T) {
652652
t.Run(tt.name, func(t *testing.T) {
653653
tsi := tt.setupTsi()
654654
s := tt.setupS()
655-
assert.Equal(t, tt.expectedReset, tsi.IsResetSum(s))
655+
assert.Equal(t, tt.expectedReset, IsResetSum(s, tsi.Number))
656656
})
657657
}
658658
}

0 commit comments

Comments
 (0)