Skip to content

Commit a70fa09

Browse files
authored
Add back "[exporter/awsemfexporter]Split EMF log with larger than 100 buckets." (#36771)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This PR fixes the flaky unit test from the previous PR (#36336) and adds back the implementation of the EMF log splitting logic. <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue #36727 <!--Describe what testing was performed and which tests were added.--> #### Testing Unit test updated and passed with 10 runs: ``` go test -run TestAddToGroupedMetric -count 10 -tags=always PASS ok github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter 0.016s ```
1 parent 9cb09d5 commit a70fa09

File tree

6 files changed

+891
-66
lines changed

6 files changed

+891
-66
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: awsemfexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Split an EMF log into multiple logs when it contains more than 100 buckets.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [36771]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

exporter/awsemfexporter/datapoint.go

Lines changed: 179 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,33 @@ type summaryMetricEntry struct {
109109
count uint64
110110
}
111111

// dataPointSplit is a structure used to manage segments of data points split from a histogram.
// It is not safe for concurrent use.
type dataPointSplit struct {
	// cWMetricHistogram accumulates the values/counts pairs plus Max/Min/Count/Sum for this segment.
	cWMetricHistogram *cWMetricHistogram
	// length is the number of (value, count) pairs appended so far.
	length int
	// capacity is the maximum number of pairs this split may hold.
	capacity int
}

// isFull reports whether the split has reached its capacity and can take no more buckets.
func (split *dataPointSplit) isFull() bool {
	return split.length >= split.capacity
}

// setMax overrides the split's Max with the given bucket boundary.
func (split *dataPointSplit) setMax(maxVal float64) {
	split.cWMetricHistogram.Max = maxVal
}

// setMin overrides the split's Min with the given bucket boundary.
func (split *dataPointSplit) setMin(minVal float64) {
	split.cWMetricHistogram.Min = minVal
}

// appendMetricData records one bucket's mid-point value and count, and
// accumulates the bucket count into the split's running Count.
func (split *dataPointSplit) appendMetricData(metricVal float64, count uint64) {
	split.cWMetricHistogram.Values = append(split.cWMetricHistogram.Values, metricVal)
	split.cWMetricHistogram.Counts = append(split.cWMetricHistogram.Counts, float64(count))
	split.length++
	split.cWMetricHistogram.Count += count
}
112139
// CalculateDeltaDatapoints retrieves the NumberDataPoint at the given index and performs rate/delta calculation if necessary.
113140
func (dps numberDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationScopeName string, _ bool, calculators *emfCalculators) ([]dataPoint, bool) {
114141
metric := dps.NumberDataPointSlice.At(i)
@@ -193,85 +220,195 @@ func (dps histogramDataPointSlice) IsStaleNaNInf(i int) (bool, pcommon.Map) {
193220
}
194221

195222
// CalculateDeltaDatapoints retrieves the ExponentialHistogramDataPoint at the given index.
// CloudWatch EMF logs allow a maximum of 100 target members, so the exponential histogram metric is
// split into multiple data points as needed, each containing at most 100 buckets, to comply with
// CloudWatch EMF log constraints.
// Note that a split may hold fewer than splitThreshold value/count pairs, because only non-zero
// bucket counts are appended while the bucket index still advances for every bucket.
//
// For each split data point:
// - Min and Max values are recalculated based on the bucket boundary within that specific split.
// - Sum is only assigned to the first split to ensure the total sum of the datapoints after aggregation is correct.
// - Count is accumulated based on the bucket counts within each split.
func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int, instrumentationScopeName string, _ bool, _ *emfCalculators) ([]dataPoint, bool) {
	metric := dps.ExponentialHistogramDataPointSlice.At(idx)

	const splitThreshold = 100
	currentBucketIndex := 0
	// Positive buckets are walked from the highest index down, so values land high-to-low.
	currentPositiveIndex := metric.Positive().BucketCounts().Len() - 1
	currentZeroIndex := 0
	currentNegativeIndex := 0
	var datapoints []dataPoint
	totalBucketLen := metric.Positive().BucketCounts().Len() + metric.Negative().BucketCounts().Len()
	if metric.ZeroCount() > 0 {
		totalBucketLen++
	}

	for currentBucketIndex < totalBucketLen {
		// Create a new dataPointSplit with a capacity of up to splitThreshold buckets
		capacity := min(splitThreshold, totalBucketLen-currentBucketIndex)

		sum := 0.0
		// Only assign `Sum` if this is the first split to make sure the total sum of the datapoints after aggregation is correct.
		if currentBucketIndex == 0 {
			sum = metric.Sum()
		}

		split := dataPointSplit{
			cWMetricHistogram: &cWMetricHistogram{
				Values: []float64{},
				Counts: []float64{},
				Max:    metric.Max(),
				Min:    metric.Min(),
				Count:  0,
				Sum:    sum,
			},
			length:   0,
			capacity: capacity,
		}

		// Set collect values from positive buckets and save into split.
		currentBucketIndex, currentPositiveIndex = collectDatapointsWithPositiveBuckets(&split, metric, currentBucketIndex, currentPositiveIndex)
		// Set collect values from zero buckets and save into split.
		currentBucketIndex, currentZeroIndex = collectDatapointsWithZeroBucket(&split, metric, currentBucketIndex, currentZeroIndex)
		// Set collect values from negative buckets and save into split.
		currentBucketIndex, currentNegativeIndex = collectDatapointsWithNegativeBuckets(&split, metric, currentBucketIndex, currentNegativeIndex)

		if split.length > 0 {
			// Add the current split to the datapoints list
			datapoints = append(datapoints, dataPoint{
				name:        dps.metricName,
				value:       split.cWMetricHistogram,
				labels:      createLabels(metric.Attributes(), instrumentationScopeName),
				timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
			})
		}
	}

	// No non-zero buckets at all: emit a single empty-histogram data point carrying the raw aggregates.
	if len(datapoints) == 0 {
		return []dataPoint{{
			name: dps.metricName,
			value: &cWMetricHistogram{
				Values: []float64{},
				Counts: []float64{},
				Count:  metric.Count(),
				Sum:    metric.Sum(),
				Max:    metric.Max(),
				Min:    metric.Min(),
			},
			labels:      createLabels(metric.Attributes(), instrumentationScopeName),
			timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
		}}, true
	}

	// Override the min and max values of the first and last splits with the raw data of the metric.
	datapoints[0].value.(*cWMetricHistogram).Max = metric.Max()
	datapoints[len(datapoints)-1].value.(*cWMetricHistogram).Min = metric.Min()

	return datapoints, true
}
// collectDatapointsWithPositiveBuckets appends mid-points of non-zero positive buckets into split,
// walking from the highest bucket index downward until the split is full or the buckets are
// exhausted. It returns the updated overall bucket index and positive bucket cursor so iteration
// can resume in the next split.
func collectDatapointsWithPositiveBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentPositiveIndex int) (int, int) {
	if split.isFull() || currentPositiveIndex < 0 {
		return currentBucketIndex, currentPositiveIndex
	}

	scale := metric.Scale()
	// Exponential histogram bucket boundaries are powers of base = 2^(2^-scale).
	base := math.Pow(2, math.Pow(2, float64(-scale)))
	positiveBuckets := metric.Positive()
	positiveOffset := positiveBuckets.Offset()
	positiveBucketCounts := positiveBuckets.BucketCounts()
	bucketBegin := 0.0
	bucketEnd := 0.0

	for !split.isFull() && currentPositiveIndex >= 0 {
		index := currentPositiveIndex + int(positiveOffset)
		// bucketEnd == 0 marks the first iteration of this call; afterwards the previous
		// bucket's lower bound becomes the current bucket's upper bound.
		if bucketEnd == 0 {
			bucketEnd = math.Pow(base, float64(index+1))
		} else {
			bucketEnd = bucketBegin
		}
		bucketBegin = math.Pow(base, float64(index))
		metricVal := (bucketBegin + bucketEnd) / 2
		count := positiveBucketCounts.At(currentPositiveIndex)
		if count > 0 {
			split.appendMetricData(metricVal, count)

			// The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value)
			if split.length == 1 {
				split.setMax(bucketEnd)
			}
			if split.isFull() {
				split.setMin(bucketBegin)
			}
		}
		// The overall index advances for every bucket, even zero-count ones.
		currentBucketIndex++
		currentPositiveIndex--
	}

	return currentBucketIndex, currentPositiveIndex
}
// collectDatapointsWithZeroBucket appends the zero bucket (value 0 with ZeroCount) into split, at
// most once (guarded by currentZeroIndex == 0), if the metric has a zero count and the split has
// room. It returns the updated overall bucket index and zero-bucket cursor.
func collectDatapointsWithZeroBucket(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentZeroIndex int) (int, int) {
	if metric.ZeroCount() > 0 && !split.isFull() && currentZeroIndex == 0 {
		split.appendMetricData(0, metric.ZeroCount())

		// The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value)
		if split.length == 1 {
			split.setMax(0)
		}
		if split.isFull() {
			split.setMin(0)
		}
		currentZeroIndex++
		currentBucketIndex++
	}

	return currentBucketIndex, currentZeroIndex
}
// collectDatapointsWithNegativeBuckets appends mid-points of non-zero negative buckets into split,
// walking negative bucket indices upward (so values go from values nearest zero toward more
// negative) until the split is full or the buckets are exhausted. It returns the updated overall
// bucket index and negative bucket cursor so iteration can resume in the next split.
func collectDatapointsWithNegativeBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentNegativeIndex int) (int, int) {
	// According to metrics spec, the value in histogram is expected to be non-negative.
	// https://opentelemetry.io/docs/specs/otel/metrics/api/#histogram
	// However, the negative support is defined in metrics data model.
	// https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram
	// The negative is also supported but only verified with unit test.
	if split.isFull() || currentNegativeIndex >= metric.Negative().BucketCounts().Len() {
		return currentBucketIndex, currentNegativeIndex
	}

	scale := metric.Scale()
	// Exponential histogram bucket boundaries are powers of base = 2^(2^-scale), negated here.
	base := math.Pow(2, math.Pow(2, float64(-scale)))
	negativeBuckets := metric.Negative()
	negativeOffset := negativeBuckets.Offset()
	negativeBucketCounts := negativeBuckets.BucketCounts()
	bucketBegin := 0.0
	bucketEnd := 0.0

	for !split.isFull() && currentNegativeIndex < metric.Negative().BucketCounts().Len() {
		index := currentNegativeIndex + int(negativeOffset)
		// bucketEnd == 0 marks the first iteration of this call; afterwards the previous
		// bucket's lower bound becomes the current bucket's upper bound.
		if bucketEnd == 0 {
			bucketEnd = -math.Pow(base, float64(index))
		} else {
			bucketEnd = bucketBegin
		}
		bucketBegin = -math.Pow(base, float64(index+1))
		metricVal := (bucketBegin + bucketEnd) / 2
		count := negativeBucketCounts.At(currentNegativeIndex)
		if count > 0 {
			split.appendMetricData(metricVal, count)

			// The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value)
			if split.length == 1 {
				split.setMax(bucketEnd)
			}
			if split.isFull() {
				split.setMin(bucketBegin)
			}
		}
		// The overall index advances for every bucket, even zero-count ones.
		currentBucketIndex++
		currentNegativeIndex++
	}

	return currentBucketIndex, currentNegativeIndex
}
276413

277414
func (dps exponentialHistogramDataPointSlice) IsStaleNaNInf(i int) (bool, pcommon.Map) {

0 commit comments

Comments
 (0)