Skip to content

Commit e289237

Browse files
authored
[processor/resourcedetection] introduce retry mechanism for detectors (#37506)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Introduce retry mechanism for detectors. <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes #34761 --------- Signed-off-by: odubajDT <[email protected]>
1 parent a826350 commit e289237

File tree

9 files changed

+138
-38
lines changed

9 files changed

+138
-38
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: processor/resourcedetection
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "Introduce retry logic for failed resource detection."
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [34761]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

connector/datadogconnector/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

exporter/datadogexporter/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ require (
178178
github.com/bmatcuk/doublestar/v4 v4.8.1 // indirect
179179
github.com/briandowns/spinner v1.23.0 // indirect
180180
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
181+
github.com/cenkalti/backoff/v5 v5.0.2 // indirect
181182
github.com/cespare/xxhash/v2 v2.3.0 // indirect
182183
github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 // indirect
183184
github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78 // indirect

exporter/datadogexporter/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

exporter/datadogexporter/integrationtest/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

processor/resourcedetectionprocessor/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ require (
1010
github.com/aws/aws-sdk-go-v2/config v1.29.7
1111
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.29
1212
github.com/aws/aws-sdk-go-v2/service/ec2 v1.203.1
13+
github.com/cenkalti/backoff/v5 v5.0.2
1314
github.com/google/go-cmp v0.6.0
1415
github.com/hashicorp/consul/api v1.31.2
1516
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/ecsutil v0.120.1

processor/resourcedetectionprocessor/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

processor/resourcedetectionprocessor/internal/resourcedetection.go

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@ package internal // import "github.com/open-telemetry/opentelemetry-collector-co
77

88
import (
99
"context"
10+
"errors"
1011
"fmt"
1112
"net/http"
1213
"sync"
1314
"time"
1415

16+
backoff "github.com/cenkalti/backoff/v5"
1517
"go.opentelemetry.io/collector/featuregate"
1618
"go.opentelemetry.io/collector/pdata/pcommon"
1719
"go.opentelemetry.io/collector/processor"
@@ -120,31 +122,64 @@ func (p *ResourceProvider) Get(ctx context.Context, client *http.Client) (resour
120122
var cancel context.CancelFunc
121123
ctx, cancel = context.WithTimeout(ctx, client.Timeout)
122124
defer cancel()
123-
p.detectResource(ctx)
125+
p.detectResource(ctx, client.Timeout)
124126
})
125127

126128
return p.detectedResource.resource, p.detectedResource.schemaURL, p.detectedResource.err
127129
}
128130

129-
func (p *ResourceProvider) detectResource(ctx context.Context) {
131+
func (p *ResourceProvider) detectResource(ctx context.Context, timeout time.Duration) {
130132
p.detectedResource = &resourceResult{}
131133

132134
res := pcommon.NewResource()
133135
mergedSchemaURL := ""
134136

135137
p.logger.Info("began detecting resource information")
136138

137-
for _, detector := range p.detectors {
138-
r, schemaURL, err := detector.Detect(ctx)
139-
if err != nil {
140-
p.logger.Warn("failed to detect resource", zap.Error(err))
139+
resultsChan := make([]chan resourceResult, len(p.detectors))
140+
for i, detector := range p.detectors {
141+
resultsChan[i] = make(chan resourceResult)
142+
go func(detector Detector) {
143+
sleep := backoff.ExponentialBackOff{
144+
InitialInterval: 1 * time.Second,
145+
RandomizationFactor: 1.5,
146+
Multiplier: 2,
147+
MaxInterval: timeout,
148+
}
149+
sleep.Reset()
150+
var err error
151+
var r pcommon.Resource
152+
var schemaURL string
153+
for {
154+
r, schemaURL, err = detector.Detect(ctx)
155+
if err == nil {
156+
resultsChan[i] <- resourceResult{resource: r, schemaURL: schemaURL, err: nil}
157+
return
158+
}
159+
p.logger.Warn("failed to detect resource", zap.Error(err))
160+
161+
timer := time.NewTimer(sleep.NextBackOff())
162+
select {
163+
case <-timer.C:
164+
fmt.Println("Retrying fetching data...")
165+
case <-ctx.Done():
166+
p.logger.Warn("Context was cancelled: %w", zap.Error(ctx.Err()))
167+
resultsChan[i] <- resourceResult{resource: r, schemaURL: schemaURL, err: err}
168+
return
169+
}
170+
}
171+
}(detector)
172+
}
173+
174+
for _, ch := range resultsChan {
175+
result := <-ch
176+
if result.err != nil {
141177
if allowErrorPropagationFeatureGate.IsEnabled() {
142-
p.detectedResource.err = err
143-
return
178+
p.detectedResource.err = errors.Join(p.detectedResource.err, result.err)
144179
}
145180
} else {
146-
mergedSchemaURL = MergeSchemaURL(mergedSchemaURL, schemaURL)
147-
MergeResource(res, r, false)
181+
mergedSchemaURL = MergeSchemaURL(mergedSchemaURL, result.schemaURL)
182+
MergeResource(res, result.resource, false)
148183
}
149184
}
150185

processor/resourcedetectionprocessor/internal/resourcedetection_test.go

Lines changed: 56 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ func TestDetect(t *testing.T) {
107107
p, err := f.CreateResourceProvider(processortest.NewNopSettings(metadata.Type), time.Second, tt.attributes, &mockDetectorConfig{}, mockDetectorTypes...)
108108
require.NoError(t, err)
109109

110-
got, _, err := p.Get(context.Background(), http.DefaultClient)
110+
got, _, err := p.Get(context.Background(), &http.Client{Timeout: 10 * time.Second})
111111
require.NoError(t, err)
112112

113113
assert.Equal(t, tt.expectedResource, got.Attributes().AsRaw())
@@ -133,38 +133,46 @@ func TestDetectResource_DetectorFactoryError(t *testing.T) {
133133
require.EqualError(t, err, fmt.Sprintf("failed creating detector type %q: %v", mockDetectorKey, "creation failed"))
134134
}
135135

136-
func TestDetectResource_Error(t *testing.T) {
137-
md1 := &MockDetector{}
138-
res := pcommon.NewResource()
139-
require.NoError(t, res.Attributes().FromRaw(map[string]any{"a": "1", "b": "2"}))
140-
md1.On("Detect").Return(res, nil)
141-
142-
md2 := &MockDetector{}
143-
md2.On("Detect").Return(pcommon.NewResource(), errors.New("err1"))
144-
145-
p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2)
146-
_, _, err := p.Get(context.Background(), http.DefaultClient)
147-
require.NoError(t, err)
148-
}
149-
150-
func TestDetectResource_Error_PropagationEnabled(t *testing.T) {
136+
func TestDetectResource_Error_ContextDeadline_WithErrPropagation(t *testing.T) {
151137
err := featuregate.GlobalRegistry().Set(allowErrorPropagationFeatureGate.ID(), true)
152138
assert.NoError(t, err)
153139
defer func() {
154140
_ = featuregate.GlobalRegistry().Set(allowErrorPropagationFeatureGate.ID(), false)
155141
}()
156142

157143
md1 := &MockDetector{}
158-
res := pcommon.NewResource()
159-
require.NoError(t, res.Attributes().FromRaw(map[string]any{"a": "1", "b": "2"}))
160-
md1.On("Detect").Return(res, nil)
144+
md1.On("Detect").Return(pcommon.NewResource(), fmt.Errorf("err1"))
161145

162146
md2 := &MockDetector{}
163-
md2.On("Detect").Return(pcommon.NewResource(), errors.New("err1"))
147+
md2.On("Detect").Return(pcommon.NewResource(), errors.New("err2"))
164148

165149
p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2)
166-
_, _, err = p.Get(context.Background(), http.DefaultClient)
150+
151+
var cancel context.CancelFunc
152+
ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Second)
153+
defer cancel()
154+
155+
_, _, err = p.Get(ctx, &http.Client{Timeout: 10 * time.Second})
167156
require.Error(t, err)
157+
require.Contains(t, err.Error(), "err1")
158+
require.Contains(t, err.Error(), "err2")
159+
}
160+
161+
func TestDetectResource_Error_ContextDeadline_WithoutErrPropagation(t *testing.T) {
162+
md1 := &MockDetector{}
163+
md1.On("Detect").Return(pcommon.NewResource(), fmt.Errorf("err1"))
164+
165+
md2 := &MockDetector{}
166+
md2.On("Detect").Return(pcommon.NewResource(), errors.New("err2"))
167+
168+
p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2)
169+
170+
var cancel context.CancelFunc
171+
ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Second)
172+
defer cancel()
173+
174+
_, _, err := p.Get(ctx, &http.Client{Timeout: 10 * time.Second})
175+
require.NoError(t, err)
168176
}
169177

170178
func TestMergeResource(t *testing.T) {
@@ -230,20 +238,17 @@ func TestDetectResource_Parallel(t *testing.T) {
230238
require.NoError(t, res2.Attributes().FromRaw(map[string]any{"a": "11", "c": "3"}))
231239
md2.On("Detect").Return(res2, nil)
232240

233-
md3 := NewMockParallelDetector()
234-
md3.On("Detect").Return(pcommon.NewResource(), errors.New("an error"))
235-
236241
expectedResourceAttrs := map[string]any{"a": "1", "b": "2", "c": "3"}
237242

238-
p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2, md3)
243+
p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2)
239244

240245
// call p.Get multiple times
241246
wg := &sync.WaitGroup{}
242247
wg.Add(iterations)
243248
for i := 0; i < iterations; i++ {
244249
go func() {
245250
defer wg.Done()
246-
detected, _, err := p.Get(context.Background(), http.DefaultClient)
251+
detected, _, err := p.Get(context.Background(), &http.Client{Timeout: 10 * time.Second})
247252
assert.NoError(t, err)
248253
assert.Equal(t, expectedResourceAttrs, detected.Attributes().AsRaw())
249254
}()
@@ -255,13 +260,36 @@ func TestDetectResource_Parallel(t *testing.T) {
255260
// detector.Detect should only be called once, so we only need to notify each channel once
256261
md1.ch <- struct{}{}
257262
md2.ch <- struct{}{}
258-
md3.ch <- struct{}{}
259263

260264
// then wait until all goroutines are finished, and ensure p.Detect was only called once
261265
wg.Wait()
262266
md1.AssertNumberOfCalls(t, "Detect", 1)
263267
md2.AssertNumberOfCalls(t, "Detect", 1)
264-
md3.AssertNumberOfCalls(t, "Detect", 1)
268+
}
269+
270+
func TestDetectResource_Reconnect(t *testing.T) {
271+
md1 := &MockDetector{}
272+
res1 := pcommon.NewResource()
273+
require.NoError(t, res1.Attributes().FromRaw(map[string]any{"a": "1", "b": "2"}))
274+
md1.On("Detect").Return(pcommon.NewResource(), errors.New("connection error1")).Twice()
275+
md1.On("Detect").Return(res1, nil)
276+
277+
md2 := &MockDetector{}
278+
res2 := pcommon.NewResource()
279+
require.NoError(t, res2.Attributes().FromRaw(map[string]any{"c": "3"}))
280+
md2.On("Detect").Return(pcommon.NewResource(), errors.New("connection error2")).Once()
281+
md2.On("Detect").Return(res2, nil)
282+
283+
expectedResourceAttrs := map[string]any{"a": "1", "b": "2", "c": "3"}
284+
285+
p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2)
286+
287+
detected, _, err := p.Get(context.Background(), &http.Client{Timeout: 15 * time.Second})
288+
assert.NoError(t, err)
289+
assert.Equal(t, expectedResourceAttrs, detected.Attributes().AsRaw())
290+
291+
md1.AssertNumberOfCalls(t, "Detect", 3) // 2 errors + 1 success
292+
md2.AssertNumberOfCalls(t, "Detect", 2) // 1 error + 1 success
265293
}
266294

267295
func TestFilterAttributes_Match(t *testing.T) {

0 commit comments

Comments
 (0)