processor/spanmetricsprocessor/internal/cache/cache.go (10 changes: 5 additions & 5 deletions)
@@ -25,7 +25,7 @@ import (
 // batch of spans.
 type Cache struct {
 	*lru.Cache
-	evictedItems map[interface{}]interface{}
+	EvictedItems map[interface{}]interface{}
 }

 // NewCache creates a Cache.
@@ -40,15 +40,15 @@ func NewCache(size int) (*Cache, error) {

 	return &Cache{
 		Cache:        lruCache,
-		evictedItems: evictedItems,
+		EvictedItems: evictedItems,
 	}, nil
 }

 // RemoveEvictedItems cleans all the evicted items.
 func (c *Cache) RemoveEvictedItems() {
 	// we need to keep the original pointer to evictedItems map as it is used in the closure of lru.NewWithEvict
-	for k := range c.evictedItems {
-		delete(c.evictedItems, k)
+	for k := range c.EvictedItems {
+		delete(c.EvictedItems, k)
 	}
 }

@@ -57,7 +57,7 @@ func (c *Cache) Get(key interface{}) (interface{}, bool) {
 	if val, ok := c.Cache.Get(key); ok {
 		return val, ok
 	}
-	val, ok := c.evictedItems[key]
+	val, ok := c.EvictedItems[key]
 	return val, ok
 }
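
For context, here is a minimal sketch of how NewCache presumably wires the eviction callback, assuming lru is github.com/hashicorp/golang-lru (the diff elides the actual NewCache body, so this is a reconstruction, not the author's exact code). The callback closes over the evicted-items map, which is why RemoveEvictedItems must delete keys in place rather than reassign the map:

	func NewCache(size int) (*Cache, error) {
		evictedItems := make(map[interface{}]interface{})
		lruCache, err := lru.NewWithEvict(size, func(key, value interface{}) {
			// keep evicted entries around until RemoveEvictedItems is called
			evictedItems[key] = value
		})
		if err != nil {
			return nil, err
		}

		return &Cache{
			Cache:        lruCache,
			EvictedItems: evictedItems,
		}, nil
	}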
processor/spanmetricsprocessor/internal/cache/cache_test.go (24 changes: 12 additions & 12 deletions)
@@ -69,16 +69,16 @@ func TestCache_Get(t *testing.T) {
 	tests := []struct {
 		name         string
 		lruCache     func() *Cache
-		evictedItems map[interface{}]interface{}
+		EvictedItems map[interface{}]interface{}
 		key          interface{}
 		wantValue    interface{}
 		wantOk       bool
 	}{
 		{
-			name: "if key is not found in LRUCache, will get key from evictedItems",
+			name: "if key is not found in LRUCache, will get key from EvictedItems",
 			lruCache: func() *Cache {
 				cache, _ := NewCache(1)
-				cache.evictedItems["key"] = "val"
+				cache.EvictedItems["key"] = "val"
 				return cache
 			},
 			key: "key",
@@ -90,7 +90,7 @@ func TestCache_Get(t *testing.T) {
 			lruCache: func() *Cache {
 				cache, _ := NewCache(1)
 				cache.Add("key", "val_from_LRU")
-				cache.evictedItems["key"] = "val_from_evicted_items"
+				cache.EvictedItems["key"] = "val_from_evicted_items"
 				return cache
 			},
 			key: "key",
@@ -142,8 +142,8 @@ func TestCache_RemoveEvictedItems(t *testing.T) {
 				if err != nil {
 					return nil, err
 				}
-				cache.evictedItems["key0"] = "val0"
-				cache.evictedItems["key1"] = "val1"
+				cache.EvictedItems["key0"] = "val0"
+				cache.EvictedItems["key1"] = "val1"
 				return cache, nil
 			},
 		},
@@ -155,7 +155,7 @@ func TestCache_RemoveEvictedItems(t *testing.T) {
 			cache, err := tt.lruCache()
 			assert.NoError(t, err)
 			cache.RemoveEvictedItems()
-			assert.Empty(t, cache.evictedItems)
+			assert.Empty(t, cache.EvictedItems)
 		})
 	}
 }
@@ -178,8 +178,8 @@ func TestCache_PurgeItems(t *testing.T) {
 				if err != nil {
 					return nil, err
 				}
-				cache.evictedItems["key0"] = "val0"
-				cache.evictedItems["key1"] = "val1"
+				cache.EvictedItems["key0"] = "val0"
+				cache.EvictedItems["key1"] = "val1"
 				return cache, nil
 			},
 		},
@@ -192,8 +192,8 @@ func TestCache_PurgeItems(t *testing.T) {
 				}
 				cache.Add("key", "val")
 				cache.Add("key2", "val2")
-				cache.evictedItems["key0"] = "val0"
-				cache.evictedItems["key1"] = "val1"
+				cache.EvictedItems["key0"] = "val0"
+				cache.EvictedItems["key1"] = "val1"
 				return cache, nil
 			},
 		},
@@ -206,7 +206,7 @@ func TestCache_PurgeItems(t *testing.T) {
 			assert.NoError(t, err)
 			cache.Purge()
 			assert.Zero(t, cache.Len())
-			assert.Empty(t, cache.evictedItems)
+			assert.Empty(t, cache.EvictedItems)
 		})
 	}
 }
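
To make the behavior these tests pin down concrete, a short hedged usage sketch (assuming, as the tests imply, that adding to a full size-1 cache evicts the oldest entry into EvictedItems):

	cache, _ := NewCache(1)
	cache.Add("key0", "val0")
	cache.Add("key1", "val1")    // evicts "key0" into cache.EvictedItems
	val, ok := cache.Get("key0") // still found: Get falls back to EvictedItems
	// val == "val0", ok == true
	cache.RemoveEvictedItems()
	_, ok = cache.Get("key0")    // now gone: ok == false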
processor/spanmetricsprocessor/processor.go (7 changes: 6 additions & 1 deletion)
@@ -339,8 +339,13 @@ func (p *processorImp) getDimensionsByMetricKey(k metricKey) (*pcommon.Map, error) {
 		}
 		return nil, fmt.Errorf("type assertion of metricKeyToDimensions attributes failed, the key is %q", k)
 	}
+	if item, ok := p.metricKeyToDimensions.EvictedItems[k]; ok {

Contributor:
I thought metricKeyToDimensions.Get(...) already does this?

Contributor Author:
Yes, it does use the evictedItems, but that still doesn't explain why this error occurs.

Contributor Author:
I debugged for a while, and this time I caught the point:

for key := range p.callSum {
	mCalls := ilm.Metrics().AppendEmpty()
	mCalls.SetName("calls_total")
	mCalls.SetEmptySum().SetIsMonotonic(true)
	mCalls.Sum().SetAggregationTemporality(p.config.GetAggregationTemporality())
	dpCalls := mCalls.Sum().DataPoints().AppendEmpty()
	dpCalls.SetStartTimestamp(pcommon.NewTimestampFromTime(p.startTime))
	dpCalls.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
	dpCalls.SetIntValue(p.callSum[key])
	// fails once the key has been evicted from the bounded dimensions cache:
	dimensions, err := p.getDimensionsByMetricKey(key)

It loops over p.callSum, which holds every key accumulated since the collector started, and looks each key up in the cache, but the cache only holds 1000 items by default. Once more than 1000 distinct keys have been built, the next loop over p.callSum will raise this error.

Contributor:
Ah yes, I think you're right about the fact that p.callSum accumulates keys indefinitely since startup, unless the user has configured delta temporality.

For cumulative temporality, while I think we should still keep all keys since collector startup (so the counts are correct), I don't think we should loop over this entire set of metric keys. Instead we should only loop over those that relate to the current batch of spans received, by somehow marking a key as "dirty" if it relates to a span's set of metric keys.

What do you think?

I can put together some tests locally to test this theory.
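
A hypothetical sketch of the "dirty keys" idea discussed above; the field and method names (dirtyKeys, recordCall, buildMetrics) are invented for illustration and are not the processor's actual API:

	package main

	import "fmt"

	type metricKey string

	// pared-down stand-in for the real processorImp; only relevant fields shown
	type processorImp struct {
		callSum   map[metricKey]int64    // cumulative counts since collector startup
		dirtyKeys map[metricKey]struct{} // metric keys touched by the current batch
	}

	// recordCall would run once per span while aggregating a batch
	func (p *processorImp) recordCall(k metricKey) {
		p.callSum[k]++
		p.dirtyKeys[k] = struct{}{} // mark the key as dirty for this batch
	}

	// buildMetrics ranges only over keys dirtied by the current batch, so it
	// never asks the bounded dimensions cache for a long-evicted key
	func (p *processorImp) buildMetrics(emit func(metricKey, int64)) {
		for k := range p.dirtyKeys {
			emit(k, p.callSum[k])
		}
		// reset for the next batch; callSum keeps growing so cumulative counts stay correct
		p.dirtyKeys = make(map[metricKey]struct{})
	}

	func main() {
		p := &processorImp{callSum: map[metricKey]int64{}, dirtyKeys: map[metricKey]struct{}{}}
		p.recordCall("service|operation")
		p.buildMetrics(func(k metricKey, v int64) { fmt.Println(k, v) })
	}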

Contributor Author:
I think so; we would only loop over the keys that relate to the current batch of spans received. Do I need to close this PR and wait for yours, or should I fix it in this PR?

Contributor:
Please go ahead with the fix 👍🏼

Note that #15710 involves a fairly major refactor of the spanmetrics processor, so I suggest waiting for #15710 to be merged first.

As it's quite a large refactor, it might make more sense to close this PR and create a new one based off the refactored version.

Contributor:
@Frapschen FYI #15710 has been merged.

+		if attributeMap, ok := item.(pcommon.Map); ok {
+			return &attributeMap, nil
+		}
+	}
 
-	return nil, fmt.Errorf("value not found in metricKeyToDimensions cache by key %q", k)
+	return nil, fmt.Errorf("value not found in metricKeyToDimensions cache and its EvictedItems by key %q", k)
 }

 // aggregateMetrics aggregates the raw metrics from the input trace data.
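
Pieced together from the hunks above (the context lines the diff elides are reconstructed here, so treat this as an approximation rather than the file's exact contents), the patched lookup reads roughly:

	func (p *processorImp) getDimensionsByMetricKey(k metricKey) (*pcommon.Map, error) {
		// Cache.Get already falls back to EvictedItems internally (see cache.go above)...
		if item, ok := p.metricKeyToDimensions.Get(k); ok {
			if attributeMap, ok := item.(pcommon.Map); ok {
				return &attributeMap, nil
			}
			return nil, fmt.Errorf("type assertion of metricKeyToDimensions attributes failed, the key is %q", k)
		}
		// ...and the PR additionally checks the now-exported map directly.
		if item, ok := p.metricKeyToDimensions.EvictedItems[k]; ok {
			if attributeMap, ok := item.(pcommon.Map); ok {
				return &attributeMap, nil
			}
		}
		return nil, fmt.Errorf("value not found in metricKeyToDimensions cache and its EvictedItems by key %q", k)
	}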