Skip to content

Commit 402b80c

Browse files
Add Capabilities to Processor and use for Fanout cloning decision (#374)
This is part 1 of the task to introduce capabilities and declare processor's intent to mutate or not mutate consumed data for the purpose of optimizing pipeline data ownership. If a processor declares that they mutate data the pipeline that the processor is in will use cloning fan out connector. This is done only if the pipeline shares a receiver with another pipeline. This ensures that it is safe for processor modify the data (because pipelines work concurrently) and avoids cloning for pipelines that consume data from the same receiver but do not modify it. For more details see: #372
1 parent b27d824 commit 402b80c

File tree

18 files changed

+440
-200
lines changed

18 files changed

+440
-200
lines changed

consumer/consumer.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,3 @@ type MetricsConsumer interface {
3636
type TraceConsumer interface {
3737
ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error
3838
}
39-
40-
// DataConsumer is a union type that can accept traces and/or metrics.
41-
type DataConsumer interface {
42-
TraceConsumer
43-
MetricsConsumer
44-
}

processor/attributesprocessor/attributes.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@ func (a *attributesProcessor) ConsumeTraceData(ctx context.Context, td consumerd
122122
return a.nextConsumer.ConsumeTraceData(ctx, td)
123123
}
124124

125+
func (a *attributesProcessor) GetCapabilities() processor.Capabilities {
126+
return processor.Capabilities{MutatesConsumedData: true}
127+
}
128+
125129
func insertAttribute(action attributeAction, attributesMap map[string]*tracepb.AttributeValue) {
126130
// Insert is only performed when the target key does not already exist
127131
// in the attribute map.
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
// Copyright 2019, OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package processor
16+
17+
import (
18+
"context"
19+
20+
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
21+
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
22+
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
23+
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
24+
"github.com/golang/protobuf/proto"
25+
26+
"github.com/open-telemetry/opentelemetry-collector/consumer"
27+
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
28+
"github.com/open-telemetry/opentelemetry-collector/oterr"
29+
)
30+
31+
// This file contains implementations of cloning Trace/Metrics connectors
32+
// that fan out the data to multiple other consumers. Cloning connectors create
33+
// clones of data before fanning out, which ensures each consumer gets their
34+
// own copy of data and is free to modify it.
35+
36+
// NewMetricsCloningFanOutConnector wraps multiple metrics consumers in a single one.
37+
func NewMetricsCloningFanOutConnector(mcs []consumer.MetricsConsumer) consumer.MetricsConsumer {
38+
return metricsCloningFanOutConnector(mcs)
39+
}
40+
41+
type metricsCloningFanOutConnector []consumer.MetricsConsumer
42+
43+
var _ consumer.MetricsConsumer = (*metricsCloningFanOutConnector)(nil)
44+
45+
// ConsumeMetricsData exports the MetricsData to all consumers wrapped by the current one.
46+
func (mfc metricsCloningFanOutConnector) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error {
47+
var errs []error
48+
49+
// Fan out to first len-1 consumers.
50+
for i := 0; i < len(mfc)-1; i++ {
51+
// Create a clone of data. We need to clone because consumers may modify the data.
52+
clone := cloneMetricsData(&md)
53+
if err := mfc[i].ConsumeMetricsData(ctx, *clone); err != nil {
54+
errs = append(errs, err)
55+
}
56+
}
57+
58+
if len(mfc) > 0 {
59+
// Give the original data to the last consumer.
60+
lastTc := mfc[len(mfc)-1]
61+
if err := lastTc.ConsumeMetricsData(ctx, md); err != nil {
62+
errs = append(errs, err)
63+
}
64+
}
65+
66+
return oterr.CombineErrors(errs)
67+
}
68+
69+
// NewTraceCloningFanOutConnector wraps multiple trace consumers in a single one.
70+
func NewTraceCloningFanOutConnector(tcs []consumer.TraceConsumer) consumer.TraceConsumer {
71+
return traceCloningFanOutConnector(tcs)
72+
}
73+
74+
type traceCloningFanOutConnector []consumer.TraceConsumer
75+
76+
var _ consumer.TraceConsumer = (*traceCloningFanOutConnector)(nil)
77+
78+
// ConsumeTraceData exports the span data to all trace consumers wrapped by the current one.
79+
func (tfc traceCloningFanOutConnector) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error {
80+
var errs []error
81+
82+
// Fan out to first len-1 consumers.
83+
for i := 0; i < len(tfc)-1; i++ {
84+
// Create a clone of data. We need to clone because consumers may modify the data.
85+
clone := cloneTraceData(&td)
86+
if err := tfc[i].ConsumeTraceData(ctx, *clone); err != nil {
87+
errs = append(errs, err)
88+
}
89+
}
90+
91+
if len(tfc) > 0 {
92+
// Give the original data to the last consumer.
93+
lastTc := tfc[len(tfc)-1]
94+
if err := lastTc.ConsumeTraceData(ctx, td); err != nil {
95+
errs = append(errs, err)
96+
}
97+
}
98+
99+
return oterr.CombineErrors(errs)
100+
}
101+
102+
func cloneTraceData(td *consumerdata.TraceData) *consumerdata.TraceData {
103+
clone := &consumerdata.TraceData{
104+
SourceFormat: td.SourceFormat,
105+
Node: proto.Clone(td.Node).(*commonpb.Node),
106+
Resource: proto.Clone(td.Resource).(*resourcepb.Resource),
107+
}
108+
109+
if td.Spans != nil {
110+
clone.Spans = make([]*tracepb.Span, 0, len(td.Spans))
111+
112+
for _, span := range td.Spans {
113+
spanClone := proto.Clone(span).(*tracepb.Span)
114+
clone.Spans = append(clone.Spans, spanClone)
115+
}
116+
}
117+
118+
return clone
119+
}
120+
121+
func cloneMetricsData(md *consumerdata.MetricsData) *consumerdata.MetricsData {
122+
clone := &consumerdata.MetricsData{
123+
Node: proto.Clone(md.Node).(*commonpb.Node),
124+
Resource: proto.Clone(md.Resource).(*resourcepb.Resource),
125+
}
126+
127+
if md.Metrics != nil {
128+
clone.Metrics = make([]*metricspb.Metric, 0, len(md.Metrics))
129+
130+
for _, metric := range md.Metrics {
131+
metricClone := proto.Clone(metric).(*metricspb.Metric)
132+
clone.Metrics = append(clone.Metrics, metricClone)
133+
}
134+
}
135+
136+
return clone
137+
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
// Copyright 2019, OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package processor
16+
17+
import (
18+
"context"
19+
"math/rand"
20+
"strconv"
21+
"testing"
22+
23+
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
24+
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
25+
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
26+
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
27+
"github.com/stretchr/testify/assert"
28+
29+
"github.com/open-telemetry/opentelemetry-collector/consumer"
30+
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
31+
)
32+
33+
func TestTraceProcessorCloningMultiplexing(t *testing.T) {
34+
processors := make([]consumer.TraceConsumer, 3)
35+
for i := range processors {
36+
processors[i] = &mockTraceConsumer{}
37+
}
38+
39+
tfc := NewTraceCloningFanOutConnector(processors)
40+
td := consumerdata.TraceData{
41+
Spans: make([]*tracepb.Span, 7),
42+
Resource: &resourcepb.Resource{
43+
Type: "testtype",
44+
},
45+
}
46+
47+
var wantSpansCount = 0
48+
for i := 0; i < 2; i++ {
49+
wantSpansCount += len(td.Spans)
50+
err := tfc.ConsumeTraceData(context.Background(), td)
51+
if err != nil {
52+
t.Errorf("Wanted nil got error")
53+
return
54+
}
55+
}
56+
57+
for i, p := range processors {
58+
m := p.(*mockTraceConsumer)
59+
assert.Equal(t, wantSpansCount, m.TotalSpans)
60+
if i < len(processors)-1 {
61+
assert.True(t, td.Resource != m.Traces[0].Resource)
62+
} else {
63+
assert.True(t, td.Resource == m.Traces[0].Resource)
64+
}
65+
assert.EqualValues(t, td.Resource, m.Traces[0].Resource)
66+
}
67+
}
68+
69+
func TestMetricsProcessorCloningMultiplexing(t *testing.T) {
70+
processors := make([]consumer.MetricsConsumer, 3)
71+
for i := range processors {
72+
processors[i] = &mockMetricsConsumer{}
73+
}
74+
75+
mfc := NewMetricsCloningFanOutConnector(processors)
76+
md := consumerdata.MetricsData{
77+
Metrics: make([]*metricspb.Metric, 7),
78+
Resource: &resourcepb.Resource{
79+
Type: "testtype",
80+
},
81+
}
82+
83+
var wantMetricsCount = 0
84+
for i := 0; i < 2; i++ {
85+
wantMetricsCount += len(md.Metrics)
86+
err := mfc.ConsumeMetricsData(context.Background(), md)
87+
if err != nil {
88+
t.Errorf("Wanted nil got error")
89+
return
90+
}
91+
}
92+
93+
for i, p := range processors {
94+
m := p.(*mockMetricsConsumer)
95+
assert.Equal(t, wantMetricsCount, m.TotalMetrics)
96+
if i < len(processors)-1 {
97+
assert.True(t, md.Resource != m.Metrics[0].Resource)
98+
} else {
99+
assert.True(t, md.Resource == m.Metrics[0].Resource)
100+
}
101+
assert.EqualValues(t, md.Resource, m.Metrics[0].Resource)
102+
}
103+
}
104+
105+
func Benchmark100SpanClone(b *testing.B) {
106+
107+
b.StopTimer()
108+
109+
name := tracepb.TruncatableString{Value: "testspanname"}
110+
traceData := &consumerdata.TraceData{
111+
SourceFormat: "test-source-format",
112+
Node: &commonpb.Node{
113+
ServiceInfo: &commonpb.ServiceInfo{
114+
Name: "servicename",
115+
},
116+
},
117+
Resource: &resourcepb.Resource{
118+
Type: "resourcetype",
119+
},
120+
}
121+
for i := 0; i < 100; i++ {
122+
span := &tracepb.Span{
123+
TraceId: genRandBytes(16),
124+
SpanId: genRandBytes(8),
125+
Name: &name,
126+
Attributes: &tracepb.Span_Attributes{
127+
AttributeMap: map[string]*tracepb.AttributeValue{},
128+
},
129+
}
130+
131+
for j := 0; j < 5; j++ {
132+
span.Attributes.AttributeMap["intattr"+strconv.Itoa(j)] = &tracepb.AttributeValue{
133+
Value: &tracepb.AttributeValue_IntValue{IntValue: int64(i)},
134+
}
135+
span.Attributes.AttributeMap["strattr"+strconv.Itoa(j)] = &tracepb.AttributeValue{
136+
Value: &tracepb.AttributeValue_StringValue{
137+
StringValue: &tracepb.TruncatableString{Value: string(genRandBytes(20))},
138+
},
139+
}
140+
}
141+
142+
traceData.Spans = append(traceData.Spans, span)
143+
}
144+
145+
b.StartTimer()
146+
147+
for i := 0; i < b.N; i++ {
148+
cloneTraceData(traceData)
149+
}
150+
}
151+
152+
func genRandBytes(len int) []byte {
153+
b := make([]byte, len)
154+
for i := range b {
155+
b[i] = byte(rand.Intn(256))
156+
}
157+
return b
158+
}

0 commit comments

Comments
 (0)