Skip to content

Commit 6aa4367

Browse files
committed
[pdata] Enable the pdata mutation safeguards in the fanout consumers
This required introducing extra API to get the pdata mutability state: - p[metric|trace|log].[Metrics|Traces|Logs].IsReadOnly()
1 parent 5361583 commit 6aa4367

File tree

15 files changed

+324
-118
lines changed

15 files changed

+324
-118
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: enhancement
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
5+
component: fanoutconsumer
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: Enable runtime assertions to catch incorrect pdata mutations in the components claiming as non-mutating pdata.
9+
10+
# One or more tracking issues or pull requests related to the change
11+
issues: [6794]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext: |
17+
This change introduces enables the runtime assertions to catch unintentional pdata mutations in components
18+
that are claimed as non-mutating pdata. Without these assertions, runtime errors may still occur, but thrown by
19+
unrelated components, making it very difficult to troubleshoot.
20+
21+
# Optional: The change log or logs in which this entry should be included.
22+
# e.g. '[user]' or '[user, api]'
23+
# Include 'user' if the change is relevant to end users.
24+
# Include 'api' if there is a change to a library API.
25+
# Default: '[user]'
26+
change_logs: [user]

.chloggen/pdata-mutation-assertions.yaml

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ change_type: enhancement
55
component: pdata
66

77
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8-
note: Introduce runtime assertions to catch incorrect pdata mutations
8+
note: Introduce API to control pdata mutability.
99

1010
# One or more tracking issues or pull requests related to the change
1111
issues: [6794]
@@ -14,6 +14,14 @@ issues: [6794]
1414
# These lines will be padded with 2 spaces and then inserted directly into the document.
1515
# Use pipe (|) for multiline entries.
1616
subtext: |
17-
This change introduces an option to enable runtime assertions to catch unintentional pdata mutations in components
18-
that are claimed as non-mutating pdata. Without these assertions, runtime errors may still occur, but thrown by
19-
unrelated components, making it very difficult to troubleshoot.
17+
This change introduces new API pdata methods to control the mutability:
18+
- p[metric|trace|log].[Metrics|Traces|Logs].MarkReadOnly() - marks the pdata as read-only. Any subsequent
19+
mutations will result in a panic.
20+
- p[metric|trace|log].[Metrics|Traces|Logs].IsReadOnly() - returns true if the pdata is marked as read-only.
21+
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [api]

internal/fanoutconsumer/logs.go

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -20,36 +20,22 @@ import (
2020
// NewLogs wraps multiple log consumers in a single one.
2121
// It fanouts the incoming data to all the consumers, and does smart routing:
2222
// - Clones only to the consumer that needs to mutate the data.
23-
// - If all consumers needs to mutate the data one will get the original data.
23+
// - If all consumers needs to mutate the data one will get the original mutable data.
2424
func NewLogs(lcs []consumer.Logs) consumer.Logs {
25-
if len(lcs) == 1 {
26-
// Don't wrap if no need to do it.
27-
return lcs[0]
28-
}
29-
var pass []consumer.Logs
30-
var clone []consumer.Logs
31-
for i := 0; i < len(lcs)-1; i++ {
32-
if !lcs[i].Capabilities().MutatesData {
33-
pass = append(pass, lcs[i])
25+
lc := &logsConsumer{}
26+
for i := 0; i < len(lcs); i++ {
27+
if lcs[i].Capabilities().MutatesData {
28+
lc.mutable = append(lc.mutable, lcs[i])
3429
} else {
35-
clone = append(clone, lcs[i])
30+
lc.readonly = append(lc.readonly, lcs[i])
3631
}
3732
}
38-
// Give the original data to the last consumer if no other read-only consumer,
39-
// otherwise put it in the right bucket. Never share the same data between
40-
// a mutating and a non-mutating consumer since the non-mutating consumer may process
41-
// data async and the mutating consumer may change the data before that.
42-
if len(pass) == 0 || !lcs[len(lcs)-1].Capabilities().MutatesData {
43-
pass = append(pass, lcs[len(lcs)-1])
44-
} else {
45-
clone = append(clone, lcs[len(lcs)-1])
46-
}
47-
return &logsConsumer{pass: pass, clone: clone}
33+
return lc
4834
}
4935

5036
type logsConsumer struct {
51-
pass []consumer.Logs
52-
clone []consumer.Logs
37+
mutable []consumer.Logs
38+
readonly []consumer.Logs
5339
}
5440

5541
func (lsc *logsConsumer) Capabilities() consumer.Capabilities {
@@ -59,17 +45,30 @@ func (lsc *logsConsumer) Capabilities() consumer.Capabilities {
5945
// ConsumeLogs exports the plog.Logs to all consumers wrapped by the current one.
6046
func (lsc *logsConsumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
6147
var errs error
62-
// Initially pass to clone exporter to avoid the case where the optimization of sending
63-
// the incoming data to a mutating consumer is used that may change the incoming data before
64-
// cloning.
65-
for _, lc := range lsc.clone {
66-
clonedLogs := plog.NewLogs()
67-
ld.CopyTo(clonedLogs)
68-
errs = multierr.Append(errs, lc.ConsumeLogs(ctx, clonedLogs))
48+
49+
// Clone the data before sending to mutable consumers.
50+
// The only exception is the last consumer which is allowed to mutate the data only if there are no
51+
// other non-mutating consumers and the data is mutable. Never share the same data between
52+
// a mutating and a non-mutating consumer since the non-mutating consumer may process
53+
// data async and the mutating consumer may change the data before that.
54+
for i, lc := range lsc.mutable {
55+
if i < len(lsc.mutable)-1 || ld.IsReadOnly() || len(lsc.readonly) > 0 {
56+
clonedLogs := plog.NewLogs()
57+
ld.CopyTo(clonedLogs)
58+
errs = multierr.Append(errs, lc.ConsumeLogs(ctx, clonedLogs))
59+
} else {
60+
errs = multierr.Append(errs, lc.ConsumeLogs(ctx, ld))
61+
}
6962
}
70-
for _, lc := range lsc.pass {
63+
64+
// Mark the data as read-only if it was sent to more than one read-only consumer.
65+
if len(lsc.readonly) > 1 {
66+
ld.MarkReadOnly()
67+
}
68+
for _, lc := range lsc.readonly {
7169
errs = multierr.Append(errs, lc.ConsumeLogs(ctx, ld))
7270
}
71+
7372
return errs
7473
}
7574

internal/fanoutconsumer/logs_test.go

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,6 @@ import (
2020
"go.opentelemetry.io/collector/pdata/plog"
2121
)
2222

23-
func TestLogsNotMultiplexing(t *testing.T) {
24-
nop := consumertest.NewNop()
25-
lfc := NewLogs([]consumer.Logs{nop})
26-
assert.Same(t, nop, lfc)
27-
}
28-
2923
func TestLogsMultiplexingNonMutating(t *testing.T) {
3024
p1 := new(consumertest.LogsSink)
3125
p2 := new(consumertest.LogsSink)
@@ -57,6 +51,9 @@ func TestLogsMultiplexingNonMutating(t *testing.T) {
5751
assert.True(t, ld == p3.AllLogs()[1])
5852
assert.EqualValues(t, ld, p3.AllLogs()[0])
5953
assert.EqualValues(t, ld, p3.AllLogs()[1])
54+
55+
// The data should be marked as read only.
56+
assert.True(t, ld.IsReadOnly())
6057
}
6158

6259
func TestLogsMultiplexingMutating(t *testing.T) {
@@ -91,6 +88,46 @@ func TestLogsMultiplexingMutating(t *testing.T) {
9188
assert.True(t, ld == p3.AllLogs()[1])
9289
assert.EqualValues(t, ld, p3.AllLogs()[0])
9390
assert.EqualValues(t, ld, p3.AllLogs()[1])
91+
92+
// The data should not be marked as read only.
93+
assert.False(t, ld.IsReadOnly())
94+
}
95+
96+
func TestReadOnlyLogsMultiplexingMutating(t *testing.T) {
97+
p1 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)}
98+
p2 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)}
99+
p3 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)}
100+
101+
lfc := NewLogs([]consumer.Logs{p1, p2, p3})
102+
assert.False(t, lfc.Capabilities().MutatesData)
103+
ldOrig := testdata.GenerateLogs(1)
104+
ld := testdata.GenerateLogs(1)
105+
ld.MarkReadOnly()
106+
107+
for i := 0; i < 2; i++ {
108+
err := lfc.ConsumeLogs(context.Background(), ld)
109+
if err != nil {
110+
t.Errorf("Wanted nil got error")
111+
return
112+
}
113+
}
114+
115+
// All consumers should receive the cloned data.
116+
117+
assert.True(t, ld != p1.AllLogs()[0])
118+
assert.True(t, ld != p1.AllLogs()[1])
119+
assert.EqualValues(t, ldOrig, p1.AllLogs()[0])
120+
assert.EqualValues(t, ldOrig, p1.AllLogs()[1])
121+
122+
assert.True(t, ld != p2.AllLogs()[0])
123+
assert.True(t, ld != p2.AllLogs()[1])
124+
assert.EqualValues(t, ldOrig, p2.AllLogs()[0])
125+
assert.EqualValues(t, ldOrig, p2.AllLogs()[1])
126+
127+
assert.True(t, ld != p3.AllLogs()[0])
128+
assert.True(t, ld != p3.AllLogs()[1])
129+
assert.EqualValues(t, ldOrig, p3.AllLogs()[0])
130+
assert.EqualValues(t, ldOrig, p3.AllLogs()[1])
94131
}
95132

96133
func TestLogsMultiplexingMixLastMutating(t *testing.T) {
@@ -126,6 +163,9 @@ func TestLogsMultiplexingMixLastMutating(t *testing.T) {
126163
assert.True(t, ld != p3.AllLogs()[1])
127164
assert.EqualValues(t, ld, p3.AllLogs()[0])
128165
assert.EqualValues(t, ld, p3.AllLogs()[1])
166+
167+
// The data should not be marked as read only.
168+
assert.False(t, ld.IsReadOnly())
129169
}
130170

131171
func TestLogsMultiplexingMixLastNonMutating(t *testing.T) {
@@ -160,6 +200,9 @@ func TestLogsMultiplexingMixLastNonMutating(t *testing.T) {
160200
assert.True(t, ld == p3.AllLogs()[1])
161201
assert.EqualValues(t, ld, p3.AllLogs()[0])
162202
assert.EqualValues(t, ld, p3.AllLogs()[1])
203+
204+
// The data should not be marked as read only.
205+
assert.False(t, ld.IsReadOnly())
163206
}
164207

165208
func TestLogsWhenErrors(t *testing.T) {

internal/fanoutconsumer/metrics.go

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,36 +18,22 @@ import (
1818
// NewMetrics wraps multiple metrics consumers in a single one.
1919
// It fanouts the incoming data to all the consumers, and does smart routing:
2020
// - Clones only to the consumer that needs to mutate the data.
21-
// - If all consumers needs to mutate the data one will get the original data.
21+
// - If all consumers needs to mutate the data one will get the original mutable data.
2222
func NewMetrics(mcs []consumer.Metrics) consumer.Metrics {
23-
if len(mcs) == 1 {
24-
// Don't wrap if no need to do it.
25-
return mcs[0]
26-
}
27-
var pass []consumer.Metrics
28-
var clone []consumer.Metrics
29-
for i := 0; i < len(mcs)-1; i++ {
30-
if !mcs[i].Capabilities().MutatesData {
31-
pass = append(pass, mcs[i])
23+
mc := &metricsConsumer{}
24+
for i := 0; i < len(mcs); i++ {
25+
if mcs[i].Capabilities().MutatesData {
26+
mc.mutable = append(mc.mutable, mcs[i])
3227
} else {
33-
clone = append(clone, mcs[i])
28+
mc.readonly = append(mc.readonly, mcs[i])
3429
}
3530
}
36-
// Give the original data to the last consumer if no other read-only consumer,
37-
// otherwise put it in the right bucket. Never share the same data between
38-
// a mutating and a non-mutating consumer since the non-mutating consumer may process
39-
// data async and the mutating consumer may change the data before that.
40-
if len(pass) == 0 || !mcs[len(mcs)-1].Capabilities().MutatesData {
41-
pass = append(pass, mcs[len(mcs)-1])
42-
} else {
43-
clone = append(clone, mcs[len(mcs)-1])
44-
}
45-
return &metricsConsumer{pass: pass, clone: clone}
31+
return mc
4632
}
4733

4834
type metricsConsumer struct {
49-
pass []consumer.Metrics
50-
clone []consumer.Metrics
35+
mutable []consumer.Metrics
36+
readonly []consumer.Metrics
5137
}
5238

5339
func (msc *metricsConsumer) Capabilities() consumer.Capabilities {
@@ -57,17 +43,30 @@ func (msc *metricsConsumer) Capabilities() consumer.Capabilities {
5743
// ConsumeMetrics exports the pmetric.Metrics to all consumers wrapped by the current one.
5844
func (msc *metricsConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
5945
var errs error
60-
// Initially pass to clone exporter to avoid the case where the optimization of sending
61-
// the incoming data to a mutating consumer is used that may change the incoming data before
62-
// cloning.
63-
for _, mc := range msc.clone {
64-
clonedMetrics := pmetric.NewMetrics()
65-
md.CopyTo(clonedMetrics)
66-
errs = multierr.Append(errs, mc.ConsumeMetrics(ctx, clonedMetrics))
46+
47+
// Clone the data before sending to mutable consumers.
48+
// The only exception is the last consumer which is allowed to mutate the data only if there are no
49+
// other non-mutating consumers and the data is mutable. Never share the same data between
50+
// a mutating and a non-mutating consumer since the non-mutating consumer may process
51+
// data async and the mutating consumer may change the data before that.
52+
for i, mc := range msc.mutable {
53+
if i < len(msc.mutable)-1 || md.IsReadOnly() || len(msc.readonly) > 0 {
54+
clonedMetrics := pmetric.NewMetrics()
55+
md.CopyTo(clonedMetrics)
56+
errs = multierr.Append(errs, mc.ConsumeMetrics(ctx, clonedMetrics))
57+
} else {
58+
errs = multierr.Append(errs, mc.ConsumeMetrics(ctx, md))
59+
}
6760
}
68-
for _, mc := range msc.pass {
61+
62+
// Mark the data as read-only if it was sent to more than one read-only consumer.
63+
if len(msc.readonly) > 1 {
64+
md.MarkReadOnly()
65+
}
66+
for _, mc := range msc.readonly {
6967
errs = multierr.Append(errs, mc.ConsumeMetrics(ctx, md))
7068
}
69+
7170
return errs
7271
}
7372

0 commit comments

Comments
 (0)