Skip to content

Commit 2527d36

Browse files
Add context handling to sinks in consumertest (#13048)
#### Description This PR enhances the `consumertest.Sink` types (TracesSink, MetricsSink, LogsSink, ProfilesSink) to store and expose request contexts during consumption Changes include: - Added context storage to all sink types - Added `AllContexts()` method to retrieve stored contexts - Updated `Reset()` to clear stored contexts - Added comprehensive tests for context transformation verification #### Link to tracking issue Fixes [#13039](#13039) #### Testing Added new test cases to verify context handling: - `TestSinkContextTransformation`: Verifies context preservation across all sink types - `TestContextTransformationChain`: Tests complex chains of context transformations - `TestConcurrentContextTransformations`: Ensures thread-safe context handling - Added context verification to existing sink tests #### Documentation
1 parent e6c05b8 commit 2527d36

File tree

3 files changed

+348
-5
lines changed

3 files changed

+348
-5
lines changed

.chloggen/context-in-sinks.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: consumer/consumertest
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add context to sinks
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [13039]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [api]

consumer/consumertest/sink.go

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,19 @@ type TracesSink struct {
2121
nonMutatingConsumer
2222
mu sync.Mutex
2323
traces []ptrace.Traces
24+
contexts []context.Context
2425
spanCount int
2526
}
2627

2728
var _ consumer.Traces = (*TracesSink)(nil)
2829

2930
// ConsumeTraces stores traces to this sink.
30-
func (ste *TracesSink) ConsumeTraces(_ context.Context, td ptrace.Traces) error {
31+
func (ste *TracesSink) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
3132
ste.mu.Lock()
3233
defer ste.mu.Unlock()
3334

3435
ste.traces = append(ste.traces, td)
36+
ste.contexts = append(ste.contexts, ctx)
3537
ste.spanCount += td.SpanCount()
3638

3739
return nil
@@ -47,6 +49,16 @@ func (ste *TracesSink) AllTraces() []ptrace.Traces {
4749
return copyTraces
4850
}
4951

52+
// Contexts returns the contexts stored by this sink since last Reset.
53+
func (ste *TracesSink) Contexts() []context.Context {
54+
ste.mu.Lock()
55+
defer ste.mu.Unlock()
56+
57+
copyContexts := make([]context.Context, len(ste.contexts))
58+
copy(copyContexts, ste.contexts)
59+
return copyContexts
60+
}
61+
5062
// SpanCount returns the number of spans sent to this sink.
5163
func (ste *TracesSink) SpanCount() int {
5264
ste.mu.Lock()
@@ -60,6 +72,7 @@ func (ste *TracesSink) Reset() {
6072
defer ste.mu.Unlock()
6173

6274
ste.traces = nil
75+
ste.contexts = nil
6376
ste.spanCount = 0
6477
}
6578

@@ -69,17 +82,19 @@ type MetricsSink struct {
6982
nonMutatingConsumer
7083
mu sync.Mutex
7184
metrics []pmetric.Metrics
85+
contexts []context.Context
7286
dataPointCount int
7387
}
7488

7589
var _ consumer.Metrics = (*MetricsSink)(nil)
7690

7791
// ConsumeMetrics stores metrics to this sink.
78-
func (sme *MetricsSink) ConsumeMetrics(_ context.Context, md pmetric.Metrics) error {
92+
func (sme *MetricsSink) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
7993
sme.mu.Lock()
8094
defer sme.mu.Unlock()
8195

8296
sme.metrics = append(sme.metrics, md)
97+
sme.contexts = append(sme.contexts, ctx)
8398
sme.dataPointCount += md.DataPointCount()
8499

85100
return nil
@@ -95,6 +110,16 @@ func (sme *MetricsSink) AllMetrics() []pmetric.Metrics {
95110
return copyMetrics
96111
}
97112

113+
// Contexts returns the contexts stored by this sink since last Reset.
114+
func (sme *MetricsSink) Contexts() []context.Context {
115+
sme.mu.Lock()
116+
defer sme.mu.Unlock()
117+
118+
copyContexts := make([]context.Context, len(sme.contexts))
119+
copy(copyContexts, sme.contexts)
120+
return copyContexts
121+
}
122+
98123
// DataPointCount returns the number of metrics stored by this sink since last Reset.
99124
func (sme *MetricsSink) DataPointCount() int {
100125
sme.mu.Lock()
@@ -108,6 +133,7 @@ func (sme *MetricsSink) Reset() {
108133
defer sme.mu.Unlock()
109134

110135
sme.metrics = nil
136+
sme.contexts = nil
111137
sme.dataPointCount = 0
112138
}
113139

@@ -117,19 +143,20 @@ type LogsSink struct {
117143
nonMutatingConsumer
118144
mu sync.Mutex
119145
logs []plog.Logs
146+
contexts []context.Context
120147
logRecordCount int
121148
}
122149

123150
var _ consumer.Logs = (*LogsSink)(nil)
124151

125152
// ConsumeLogs stores logs to this sink.
126-
func (sle *LogsSink) ConsumeLogs(_ context.Context, ld plog.Logs) error {
153+
func (sle *LogsSink) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
127154
sle.mu.Lock()
128155
defer sle.mu.Unlock()
129156

130157
sle.logs = append(sle.logs, ld)
131158
sle.logRecordCount += ld.LogRecordCount()
132-
159+
sle.contexts = append(sle.contexts, ctx)
133160
return nil
134161
}
135162

@@ -156,26 +183,39 @@ func (sle *LogsSink) Reset() {
156183
defer sle.mu.Unlock()
157184

158185
sle.logs = nil
186+
sle.contexts = nil
159187
sle.logRecordCount = 0
160188
}
161189

190+
// Contexts returns the contexts stored by this sink since last Reset.
191+
func (sle *LogsSink) Contexts() []context.Context {
192+
sle.mu.Lock()
193+
defer sle.mu.Unlock()
194+
195+
copyContexts := make([]context.Context, len(sle.contexts))
196+
copy(copyContexts, sle.contexts)
197+
return copyContexts
198+
}
199+
162200
// ProfilesSink is a xconsumer.Profiles that acts like a sink that
163201
// stores all profiles and allows querying them for testing.
164202
type ProfilesSink struct {
165203
nonMutatingConsumer
166204
mu sync.Mutex
167205
profiles []pprofile.Profiles
206+
contexts []context.Context
168207
sampleCount int
169208
}
170209

171210
var _ xconsumer.Profiles = (*ProfilesSink)(nil)
172211

173212
// ConsumeProfiles stores profiles to this sink.
174-
func (ste *ProfilesSink) ConsumeProfiles(_ context.Context, td pprofile.Profiles) error {
213+
func (ste *ProfilesSink) ConsumeProfiles(ctx context.Context, td pprofile.Profiles) error {
175214
ste.mu.Lock()
176215
defer ste.mu.Unlock()
177216

178217
ste.profiles = append(ste.profiles, td)
218+
ste.contexts = append(ste.contexts, ctx)
179219
ste.sampleCount += td.SampleCount()
180220

181221
return nil
@@ -204,5 +244,16 @@ func (ste *ProfilesSink) Reset() {
204244
defer ste.mu.Unlock()
205245

206246
ste.profiles = nil
247+
ste.contexts = nil
207248
ste.sampleCount = 0
208249
}
250+
251+
// Contexts returns the contexts stored by this sink since last Reset.
252+
func (ste *ProfilesSink) Contexts() []context.Context {
253+
ste.mu.Lock()
254+
defer ste.mu.Unlock()
255+
256+
copyContexts := make([]context.Context, len(ste.contexts))
257+
copy(copyContexts, ste.contexts)
258+
return copyContexts
259+
}

0 commit comments

Comments
 (0)