Skip to content

Commit f6df1f1

Browse files
committed
metricstarttimeprocessor: Add subtractinitial strategy implementation
This change implements the strategy defined in open-telemetry#38379.
1 parent 0b45594 commit f6df1f1

File tree

4 files changed

+402
-63
lines changed

4 files changed

+402
-63
lines changed

processor/metricstarttimeprocessor/internal/starttimecache/start_time_cache.go renamed to processor/metricstarttimeprocessor/internal/datapointstorage/cache.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package starttimecache // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstarttimeprocessor/internal/starttimecache"
4+
package datapointstorage // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstarttimeprocessor/internal/starttimecache"
55

66
import (
77
"sync"
@@ -30,7 +30,7 @@ import (
3030
// The gc strategy uses a standard mark-and-sweep approach - each time a timeseriesMap is accessed,
3131
// it is marked. Similarly, each time a timeseriesInfo is accessed, it is also marked.
3232
//
33-
// At the end of each StartTimeCache.get(), if the last time the StartTimeCache was gc'd exceeds the 'gcInterval',
33+
// At the end of each StartTimeCache.Get(), if the last time the StartTimeCache was gc'd exceeds the 'gcInterval',
3434
// the StartTimeCache is locked and any timeseriesMaps that are unmarked are removed from the StartTimeCache
3535
// otherwise the timeseriesMap is gc'd
3636
//
@@ -57,20 +57,20 @@ type TimeseriesInfo struct {
5757
}
5858

5959
type NumberInfo struct {
60-
StartTime pcommon.Timestamp
61-
PreviousValue float64
60+
StartTime pcommon.Timestamp
61+
Value float64
6262
}
6363

6464
type HistogramInfo struct {
65-
StartTime pcommon.Timestamp
66-
PreviousCount uint64
67-
PreviousSum float64
65+
StartTime pcommon.Timestamp
66+
Count uint64
67+
Sum float64
6868
}
6969

7070
type SummaryInfo struct {
71-
StartTime pcommon.Timestamp
72-
PreviousCount uint64
73-
PreviousSum float64
71+
StartTime pcommon.Timestamp
72+
Count uint64
73+
Sum float64
7474
}
7575

7676
type TimeseriesKey struct {
@@ -157,8 +157,8 @@ func newTimeseriesMap() *TimeseriesMap {
157157
return &TimeseriesMap{Mark: true, TsiMap: map[TimeseriesKey]*TimeseriesInfo{}}
158158
}
159159

160-
// StartTimeCache maps from a resource to a map of timeseries instances for the resource.
161-
type StartTimeCache struct {
160+
// DataPointCache maps from a resource to a map of timeseries instances for the resource.
161+
type DataPointCache struct {
162162
sync.RWMutex
163163
// The mutex is used to protect access to the member fields. It is acquired for most of
164164
// get() and also acquired by gc().
@@ -168,13 +168,13 @@ type StartTimeCache struct {
168168
resourceMap map[[16]byte]*TimeseriesMap
169169
}
170170

171-
// NewStartTimeCache creates a new (empty) JobsMap.
172-
func NewStartTimeCache(gcInterval time.Duration) *StartTimeCache {
173-
return &StartTimeCache{gcInterval: gcInterval, lastGC: time.Now(), resourceMap: make(map[[16]byte]*TimeseriesMap)}
171+
// NewDataPointCache creates a new (empty) JobsMap.
172+
func NewDataPointCache(gcInterval time.Duration) *DataPointCache {
173+
return &DataPointCache{gcInterval: gcInterval, lastGC: time.Now(), resourceMap: make(map[[16]byte]*TimeseriesMap)}
174174
}
175175

176176
// Remove jobs and timeseries that have aged out.
177-
func (c *StartTimeCache) gc() {
177+
func (c *DataPointCache) gc() {
178178
c.Lock()
179179
defer c.Unlock()
180180
// once the structure is locked, confirm that gc() is still necessary
@@ -196,7 +196,7 @@ func (c *StartTimeCache) gc() {
196196
}
197197

198198
// Speculatively check if gc() is necessary, recheck once the structure is locked
199-
func (c *StartTimeCache) MaybeGC() {
199+
func (c *DataPointCache) MaybeGC() {
200200
c.RLock()
201201
defer c.RUnlock()
202202
if time.Since(c.lastGC) > c.gcInterval {
@@ -205,7 +205,7 @@ func (c *StartTimeCache) MaybeGC() {
205205
}
206206

207207
// Fetches the TimeseriesMap for the given resource hash. Creates a new map if required.
208-
func (c *StartTimeCache) Get(resourceHash [16]byte) *TimeseriesMap {
208+
func (c *DataPointCache) Get(resourceHash [16]byte) *TimeseriesMap {
209209
// a read lock is taken here as we will not need to modify resourceMap if the target timeseriesMap is available.
210210
c.RLock()
211211
tsm, ok := c.resourceMap[resourceHash]

processor/metricstarttimeprocessor/internal/starttimecache/start_time_cache_test.go renamed to processor/metricstarttimeprocessor/internal/datapointstorage/cache_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package starttimecache // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstarttimeprocessor/internal/starttimecache"
4+
package datapointstorage // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstarttimeprocessor/internal/starttimecache"
55

66
import (
77
"testing"
@@ -16,7 +16,7 @@ import (
1616

1717
func TestStartTimeCache_NewStartTimeCache(t *testing.T) {
1818
gcInterval := time.Minute
19-
stc := NewStartTimeCache(gcInterval)
19+
stc := NewDataPointCache(gcInterval)
2020

2121
assert.NotNil(t, stc)
2222
assert.Equal(t, gcInterval, stc.gcInterval)
@@ -25,7 +25,7 @@ func TestStartTimeCache_NewStartTimeCache(t *testing.T) {
2525
}
2626

2727
func TestStartTimeCache_Get(t *testing.T) {
28-
stc := NewStartTimeCache(time.Minute)
28+
stc := NewDataPointCache(time.Minute)
2929
resourceAttrs := pcommon.NewMap()
3030
resourceAttrs.PutStr("k1", "v1")
3131
resourceHash := pdatautil.MapHash(resourceAttrs)
@@ -40,7 +40,7 @@ func TestStartTimeCache_Get(t *testing.T) {
4040
}
4141

4242
func TestStartTimeCache_MaybeGC(t *testing.T) {
43-
stc := NewStartTimeCache(time.Millisecond)
43+
stc := NewDataPointCache(time.Millisecond)
4444
resourceAttrs := pcommon.NewMap()
4545
resourceAttrs.PutStr("k1", "v1")
4646
resourceHash := pdatautil.MapHash(resourceAttrs)

0 commit comments

Comments
 (0)