Skip to content

Commit b13bc9d

Browse files
authored
Move fanout consumers to fanoutconsumer package (#2615)
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 2688af9 commit b13bc9d

File tree

7 files changed

+84
-79
lines changed

7 files changed

+84
-79
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,13 @@
22

33
## Unreleased
44

5+
## 🛑 Breaking changes 🛑
6+
- Move fanout consumers to fanoutconsumer package (#2615)
7+
8+
## 💡 Enhancements 💡
9+
10+
## 🧰 Bug fixes 🧰
11+
512
## v0.22.0 Beta
613

714
## 🛑 Breaking changes 🛑

processor/cloningfanoutconnector.go renamed to consumer/fanoutconsumer/cloningconsumer.go

Lines changed: 23 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
package processor
15+
package fanoutconsumer
1616

1717
import (
1818
"context"
@@ -22,27 +22,22 @@ import (
2222
"go.opentelemetry.io/collector/consumer/pdata"
2323
)
2424

25-
// This file contains implementations of cloning Trace/Metrics connectors
26-
// that fan out the data to multiple other consumers. Cloning connectors create
27-
// clones of data before fanning out, which ensures each consumer gets their
28-
// own copy of data and is free to modify it.
29-
30-
// NewMetricsCloningFanOutConnector wraps multiple metrics consumers in a single one and clones the data
25+
// NewMetricsCloning wraps multiple metrics consumers in a single one and clones the data
3126
// before fanning out.
32-
func NewMetricsCloningFanOutConnector(mcs []consumer.MetricsConsumer) consumer.MetricsConsumer {
27+
func NewMetricsCloning(mcs []consumer.MetricsConsumer) consumer.MetricsConsumer {
3328
if len(mcs) == 1 {
3429
// Don't wrap if no need to do it.
3530
return mcs[0]
3631
}
37-
return metricsCloningFanOutConnector(mcs)
32+
return metricsCloningConsumer(mcs)
3833
}
3934

40-
type metricsCloningFanOutConnector []consumer.MetricsConsumer
35+
type metricsCloningConsumer []consumer.MetricsConsumer
4136

42-
var _ consumer.MetricsConsumer = (*metricsCloningFanOutConnector)(nil)
37+
var _ consumer.MetricsConsumer = (*metricsCloningConsumer)(nil)
4338

44-
// ConsumeMetrics exports the MetricsData to all consumers wrapped by the current one.
45-
func (mfc metricsCloningFanOutConnector) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
39+
// ConsumeMetrics exports the pdata.Metrics to all consumers wrapped by the current one.
40+
func (mfc metricsCloningConsumer) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
4641
var errs []error
4742

4843
// Fan out to first len-1 consumers.
@@ -64,22 +59,22 @@ func (mfc metricsCloningFanOutConnector) ConsumeMetrics(ctx context.Context, md
6459
return consumererror.CombineErrors(errs)
6560
}
6661

67-
// NewTracesCloningFanOutConnector wraps multiple traces consumers in a single one and clones the data
62+
// NewTracesCloning wraps multiple traces consumers in a single one and clones the data
6863
// before fanning out.
69-
func NewTracesCloningFanOutConnector(tcs []consumer.TracesConsumer) consumer.TracesConsumer {
64+
func NewTracesCloning(tcs []consumer.TracesConsumer) consumer.TracesConsumer {
7065
if len(tcs) == 1 {
7166
// Don't wrap if no need to do it.
7267
return tcs[0]
7368
}
74-
return tracesCloningFanOutConnector(tcs)
69+
return tracesCloningConsumer(tcs)
7570
}
7671

77-
type tracesCloningFanOutConnector []consumer.TracesConsumer
72+
type tracesCloningConsumer []consumer.TracesConsumer
7873

79-
var _ consumer.TracesConsumer = (*tracesCloningFanOutConnector)(nil)
74+
var _ consumer.TracesConsumer = (*tracesCloningConsumer)(nil)
8075

81-
// ConsumeTraces exports the span data to all trace consumers wrapped by the current one.
82-
func (tfc tracesCloningFanOutConnector) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
76+
// ConsumeTraces exports the pdata.Traces to all consumers wrapped by the current one.
77+
func (tfc tracesCloningConsumer) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
8378
var errs []error
8479

8580
// Fan out to first len-1 consumers.
@@ -101,21 +96,22 @@ func (tfc tracesCloningFanOutConnector) ConsumeTraces(ctx context.Context, td pd
10196
return consumererror.CombineErrors(errs)
10297
}
10398

104-
// NewLogsCloningFanOutConnector wraps multiple trace consumers in a single one.
105-
func NewLogsCloningFanOutConnector(lcs []consumer.LogsConsumer) consumer.LogsConsumer {
99+
// NewLogsCloning wraps multiple trace consumers in a single one and clones the data
100+
// before fanning out.
101+
func NewLogsCloning(lcs []consumer.LogsConsumer) consumer.LogsConsumer {
106102
if len(lcs) == 1 {
107103
// Don't wrap if no need to do it.
108104
return lcs[0]
109105
}
110-
return logsCloningFanOutConnector(lcs)
106+
return logsCloningConsumer(lcs)
111107
}
112108

113-
type logsCloningFanOutConnector []consumer.LogsConsumer
109+
type logsCloningConsumer []consumer.LogsConsumer
114110

115-
var _ consumer.LogsConsumer = (*logsCloningFanOutConnector)(nil)
111+
var _ consumer.LogsConsumer = (*logsCloningConsumer)(nil)
116112

117-
// ConsumeLogs exports the log data to all consumers wrapped by the current one.
118-
func (lfc logsCloningFanOutConnector) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
113+
// ConsumeLogs exports the pdata.Logs to all consumers wrapped by the current one.
114+
func (lfc logsCloningConsumer) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
119115
var errs []error
120116

121117
// Fan out to first len-1 consumers.

processor/cloningfanoutconnector_test.go renamed to consumer/fanoutconsumer/cloningconsumer_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
package processor
15+
package fanoutconsumer
1616

1717
import (
1818
"context"
@@ -27,7 +27,7 @@ import (
2727

2828
func TestTraceProcessorCloningNotMultiplexing(t *testing.T) {
2929
nop := consumertest.NewTracesNop()
30-
tfc := NewTracesCloningFanOutConnector([]consumer.TracesConsumer{nop})
30+
tfc := NewTracesCloning([]consumer.TracesConsumer{nop})
3131
assert.Same(t, nop, tfc)
3232
}
3333

@@ -37,7 +37,7 @@ func TestTraceProcessorCloningMultiplexing(t *testing.T) {
3737
processors[i] = new(consumertest.TracesSink)
3838
}
3939

40-
tfc := NewTracesCloningFanOutConnector(processors)
40+
tfc := NewTracesCloning(processors)
4141
td := testdata.GenerateTraceDataTwoSpansSameResource()
4242

4343
var wantSpansCount = 0
@@ -70,7 +70,7 @@ func TestTraceProcessorCloningMultiplexing(t *testing.T) {
7070

7171
func TestMetricsProcessorCloningNotMultiplexing(t *testing.T) {
7272
nop := consumertest.NewMetricsNop()
73-
mfc := NewMetricsFanOutConnector([]consumer.MetricsConsumer{nop})
73+
mfc := NewMetrics([]consumer.MetricsConsumer{nop})
7474
assert.Same(t, nop, mfc)
7575
}
7676

@@ -80,7 +80,7 @@ func TestMetricsProcessorCloningMultiplexing(t *testing.T) {
8080
processors[i] = new(consumertest.MetricsSink)
8181
}
8282

83-
mfc := NewMetricsCloningFanOutConnector(processors)
83+
mfc := NewMetricsCloning(processors)
8484
md := testdata.GeneratMetricsAllTypesWithSampleDatapoints()
8585

8686
var wantMetricsCount = 0
@@ -113,7 +113,7 @@ func TestMetricsProcessorCloningMultiplexing(t *testing.T) {
113113

114114
func TestLogsProcessorCloningNotMultiplexing(t *testing.T) {
115115
nop := consumertest.NewLogsNop()
116-
lfc := NewLogsCloningFanOutConnector([]consumer.LogsConsumer{nop})
116+
lfc := NewLogsCloning([]consumer.LogsConsumer{nop})
117117
assert.Same(t, nop, lfc)
118118
}
119119

@@ -123,7 +123,7 @@ func TestLogsProcessorCloningMultiplexing(t *testing.T) {
123123
processors[i] = new(consumertest.LogsSink)
124124
}
125125

126-
mfc := NewLogsCloningFanOutConnector(processors)
126+
mfc := NewLogsCloning(processors)
127127
ld := testdata.GenerateLogDataOneLog()
128128

129129
var wantMetricsCount = 0

processor/fanoutconnector.go renamed to consumer/fanoutconsumer/consumer.go

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,12 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
package processor
15+
// Package fanoutconsumer contains implementations of Traces/Metrics/Logs consumers
16+
// that fan out the data to multiple other consumers.
17+
//
18+
// Cloning connectors create clones of data before fanning out, which ensures each
19+
// consumer gets their own copy of data and is free to modify it.
20+
package fanoutconsumer
1621

1722
import (
1823
"context"
@@ -22,24 +27,21 @@ import (
2227
"go.opentelemetry.io/collector/consumer/pdata"
2328
)
2429

25-
// This file contains implementations of Trace/Metrics connectors
26-
// that fan out the data to multiple other consumers.
27-
28-
// NewMetricsFanOutConnector wraps multiple metrics consumers in a single one.
29-
func NewMetricsFanOutConnector(mcs []consumer.MetricsConsumer) consumer.MetricsConsumer {
30+
// NewMetrics wraps multiple metrics consumers in a single one.
31+
func NewMetrics(mcs []consumer.MetricsConsumer) consumer.MetricsConsumer {
3032
if len(mcs) == 1 {
3133
// Don't wrap if no need to do it.
3234
return mcs[0]
3335
}
34-
return metricsFanOutConnector(mcs)
36+
return metricsConsumer(mcs)
3537
}
3638

37-
type metricsFanOutConnector []consumer.MetricsConsumer
39+
type metricsConsumer []consumer.MetricsConsumer
3840

39-
var _ consumer.MetricsConsumer = (*metricsFanOutConnector)(nil)
41+
var _ consumer.MetricsConsumer = (*metricsConsumer)(nil)
4042

4143
// ConsumeMetrics exports the pdata.Metrics to all consumers wrapped by the current one.
42-
func (mfc metricsFanOutConnector) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
44+
func (mfc metricsConsumer) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
4345
var errs []error
4446
for _, mc := range mfc {
4547
if err := mc.ConsumeMetrics(ctx, md); err != nil {
@@ -49,21 +51,21 @@ func (mfc metricsFanOutConnector) ConsumeMetrics(ctx context.Context, md pdata.M
4951
return consumererror.CombineErrors(errs)
5052
}
5153

52-
// NewTracesFanOutConnector wraps multiple trace consumers in a single one.
53-
func NewTracesFanOutConnector(tcs []consumer.TracesConsumer) consumer.TracesConsumer {
54+
// NewTraces wraps multiple trace consumers in a single one.
55+
func NewTraces(tcs []consumer.TracesConsumer) consumer.TracesConsumer {
5456
if len(tcs) == 1 {
5557
// Don't wrap if no need to do it.
5658
return tcs[0]
5759
}
58-
return traceFanOutConnector(tcs)
60+
return traceConsumer(tcs)
5961
}
6062

61-
type traceFanOutConnector []consumer.TracesConsumer
63+
type traceConsumer []consumer.TracesConsumer
6264

63-
var _ consumer.TracesConsumer = (*traceFanOutConnector)(nil)
65+
var _ consumer.TracesConsumer = (*traceConsumer)(nil)
6466

65-
// ConsumeTraces exports the span data to all trace consumers wrapped by the current one.
66-
func (tfc traceFanOutConnector) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
67+
// ConsumeTraces exports the pdata.Traces to all consumers wrapped by the current one.
68+
func (tfc traceConsumer) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
6769
var errs []error
6870
for _, tc := range tfc {
6971
if err := tc.ConsumeTraces(ctx, td); err != nil {
@@ -73,21 +75,21 @@ func (tfc traceFanOutConnector) ConsumeTraces(ctx context.Context, td pdata.Trac
7375
return consumererror.CombineErrors(errs)
7476
}
7577

76-
// NewLogsFanOutConnector wraps multiple log consumers in a single one.
77-
func NewLogsFanOutConnector(lcs []consumer.LogsConsumer) consumer.LogsConsumer {
78+
// NewLogs wraps multiple log consumers in a single one.
79+
func NewLogs(lcs []consumer.LogsConsumer) consumer.LogsConsumer {
7880
if len(lcs) == 1 {
7981
// Don't wrap if no need to do it.
8082
return lcs[0]
8183
}
82-
return logsFanOutConnector(lcs)
84+
return logsConsumer(lcs)
8385
}
8486

85-
type logsFanOutConnector []consumer.LogsConsumer
87+
type logsConsumer []consumer.LogsConsumer
8688

87-
var _ consumer.LogsConsumer = (*logsFanOutConnector)(nil)
89+
var _ consumer.LogsConsumer = (*logsConsumer)(nil)
8890

89-
// Consume exports the log data to all consumers wrapped by the current one.
90-
func (fc logsFanOutConnector) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
91+
// ConsumeLogs exports the pdata.Logs to all consumers wrapped by the current one.
92+
func (fc logsConsumer) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
9193
var errs []error
9294
for _, tc := range fc {
9395
if err := tc.ConsumeLogs(ctx, ld); err != nil {

processor/fanoutconnector_test.go renamed to consumer/fanoutconsumer/consumer_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
package processor
15+
package fanoutconsumer
1616

1717
import (
1818
"context"
@@ -28,7 +28,7 @@ import (
2828

2929
func TestTracesProcessorNotMultiplexing(t *testing.T) {
3030
nop := consumertest.NewTracesNop()
31-
tfc := NewTracesFanOutConnector([]consumer.TracesConsumer{nop})
31+
tfc := NewTraces([]consumer.TracesConsumer{nop})
3232
assert.Same(t, nop, tfc)
3333
}
3434

@@ -38,7 +38,7 @@ func TestTracesProcessorMultiplexing(t *testing.T) {
3838
processors[i] = new(consumertest.TracesSink)
3939
}
4040

41-
tfc := NewTracesFanOutConnector(processors)
41+
tfc := NewTraces(processors)
4242
td := testdata.GenerateTraceDataOneSpan()
4343

4444
var wantSpansCount = 0
@@ -67,7 +67,7 @@ func TestTraceProcessorWhenOneErrors(t *testing.T) {
6767
// Make one processor return error
6868
processors[1] = consumertest.NewTracesErr(errors.New("my error"))
6969

70-
tfc := NewTracesFanOutConnector(processors)
70+
tfc := NewTraces(processors)
7171
td := testdata.GenerateTraceDataOneSpan()
7272

7373
var wantSpansCount = 0
@@ -82,7 +82,7 @@ func TestTraceProcessorWhenOneErrors(t *testing.T) {
8282

8383
func TestMetricsProcessorNotMultiplexing(t *testing.T) {
8484
nop := consumertest.NewMetricsNop()
85-
mfc := NewMetricsFanOutConnector([]consumer.MetricsConsumer{nop})
85+
mfc := NewMetrics([]consumer.MetricsConsumer{nop})
8686
assert.Same(t, nop, mfc)
8787
}
8888

@@ -92,7 +92,7 @@ func TestMetricsProcessorMultiplexing(t *testing.T) {
9292
processors[i] = new(consumertest.MetricsSink)
9393
}
9494

95-
mfc := NewMetricsFanOutConnector(processors)
95+
mfc := NewMetrics(processors)
9696
md := testdata.GenerateMetricsOneMetric()
9797

9898
var wantMetricsCount = 0
@@ -121,7 +121,7 @@ func TestMetricsProcessorWhenOneErrors(t *testing.T) {
121121
// Make one processor return error
122122
processors[1] = consumertest.NewMetricsErr(errors.New("my error"))
123123

124-
mfc := NewMetricsFanOutConnector(processors)
124+
mfc := NewMetrics(processors)
125125
md := testdata.GenerateMetricsOneMetric()
126126

127127
var wantMetricsCount = 0
@@ -136,7 +136,7 @@ func TestMetricsProcessorWhenOneErrors(t *testing.T) {
136136

137137
func TestLogsProcessorNotMultiplexing(t *testing.T) {
138138
nop := consumertest.NewLogsNop()
139-
lfc := NewLogsFanOutConnector([]consumer.LogsConsumer{nop})
139+
lfc := NewLogs([]consumer.LogsConsumer{nop})
140140
assert.Same(t, nop, lfc)
141141
}
142142

@@ -146,7 +146,7 @@ func TestLogsProcessorMultiplexing(t *testing.T) {
146146
processors[i] = new(consumertest.LogsSink)
147147
}
148148

149-
lfc := NewLogsFanOutConnector(processors)
149+
lfc := NewLogs(processors)
150150
ld := testdata.GenerateLogDataOneLog()
151151

152152
var wantMetricsCount = 0
@@ -175,7 +175,7 @@ func TestLogsProcessorWhenOneErrors(t *testing.T) {
175175
// Make one processor return error
176176
processors[1] = consumertest.NewLogsErr(errors.New("my error"))
177177

178-
lfc := NewLogsFanOutConnector(processors)
178+
lfc := NewLogs(processors)
179179
ld := testdata.GenerateLogDataOneLog()
180180

181181
var wantMetricsCount = 0

0 commit comments

Comments
 (0)