Skip to content

Commit 349f3c1

Browse files
committed
Update carbon and wavefront to the new interface
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 0ad332d commit 349f3c1

File tree

14 files changed

+128
-215
lines changed

14 files changed

+128
-215
lines changed

cmd/otelcontribcol/components.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ func components() (component.Factories, error) {
7777
collectdreceiver.NewFactory(),
7878
sapmreceiver.NewFactory(),
7979
signalfxreceiver.NewFactory(),
80-
&carbonreceiver.Factory{},
81-
&wavefrontreceiver.Factory{},
80+
carbonreceiver.NewFactory(),
81+
wavefrontreceiver.NewFactory(),
8282
redisreceiver.NewFactory(),
8383
kubeletstatsreceiver.NewFactory(),
8484
simpleprometheusreceiver.NewFactory(),

receiver/carbonreceiver/config_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func TestLoadConfig(t *testing.T) {
3333
factories, err := componenttest.ExampleComponents()
3434
assert.Nil(t, err)
3535

36-
factory := &Factory{}
36+
factory := NewFactory()
3737
factories.Receivers[configmodels.Type(typeStr)] = factory
3838
cfg, err := configtest.LoadConfigFile(
3939
t, path.Join(".", "testdata", "config.yaml"), factories,

receiver/carbonreceiver/factory.go

Lines changed: 41 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,10 @@ import (
2020

2121
"github.com/spf13/viper"
2222
"go.opentelemetry.io/collector/component"
23-
"go.opentelemetry.io/collector/config/configerror"
2423
"go.opentelemetry.io/collector/config/configmodels"
2524
"go.opentelemetry.io/collector/config/confignet"
2625
"go.opentelemetry.io/collector/consumer"
27-
"go.uber.org/zap"
26+
"go.opentelemetry.io/collector/receiver/receiverhelper"
2827

2928
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol"
3029
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/transport"
@@ -37,57 +36,50 @@ const (
3736
typeStr = "carbon"
3837
)
3938

40-
// Factory is the factory for carbon receiver.
41-
type Factory struct {
39+
// NewFactory creates a factory for Carbon receiver.
40+
func NewFactory() component.ReceiverFactory {
41+
return receiverhelper.NewFactory(
42+
typeStr,
43+
createDefaultConfig,
44+
receiverhelper.WithCustomUnmarshaler(customUnmarshaler),
45+
receiverhelper.WithMetrics(createMetricsReceiver))
4246
}
4347

44-
var _ component.ReceiverFactoryOld = (*Factory)(nil)
45-
46-
// Type gets the type of the Receiver config created by this factory.
47-
func (f *Factory) Type() configmodels.Type {
48-
return configmodels.Type(typeStr)
49-
}
50-
51-
// CustomUnmarshaler returns the custom function to handle the special settings
52-
// used on the receiver.
53-
func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler {
54-
return func(sourceViperSection *viper.Viper, intoCfg interface{}) error {
55-
if sourceViperSection == nil {
56-
// The section is empty nothing to do, using the default config.
57-
return nil
58-
}
59-
60-
// Unmarshal but not exact yet so the different keys under config do not
61-
// trigger errors, this is needed so that the types of protocol and transport
62-
// are read.
63-
if err := sourceViperSection.Unmarshal(intoCfg); err != nil {
64-
return err
65-
}
48+
func customUnmarshaler(sourceViperSection *viper.Viper, intoCfg interface{}) error {
49+
if sourceViperSection == nil {
50+
// The section is empty nothing to do, using the default config.
51+
return nil
52+
}
6653

67-
// Unmarshal the protocol, so the type of config can be properly set.
68-
rCfg := intoCfg.(*Config)
69-
vParserCfg := sourceViperSection.Sub(parserConfigSection)
70-
if vParserCfg != nil {
71-
if err := protocol.LoadParserConfig(vParserCfg, rCfg.Parser); err != nil {
72-
return fmt.Errorf(
73-
"error on %q section for %s: %v",
74-
parserConfigSection,
75-
rCfg.Name(),
76-
err)
77-
}
78-
}
54+
// Unmarshal but not exact yet so the different keys under config do not
55+
// trigger errors, this is needed so that the types of protocol and transport
56+
// are read.
57+
if err := sourceViperSection.Unmarshal(intoCfg); err != nil {
58+
return err
59+
}
7960

80-
// Unmarshal exact to validate the config keys.
81-
if err := sourceViperSection.UnmarshalExact(intoCfg); err != nil {
82-
return err
61+
// Unmarshal the protocol, so the type of config can be properly set.
62+
rCfg := intoCfg.(*Config)
63+
vParserCfg := sourceViperSection.Sub(parserConfigSection)
64+
if vParserCfg != nil {
65+
if err := protocol.LoadParserConfig(vParserCfg, rCfg.Parser); err != nil {
66+
return fmt.Errorf(
67+
"error on %q section for %s: %v",
68+
parserConfigSection,
69+
rCfg.Name(),
70+
err)
8371
}
72+
}
8473

85-
return nil
74+
// Unmarshal exact to validate the config keys.
75+
if err := sourceViperSection.UnmarshalExact(intoCfg); err != nil {
76+
return err
8677
}
78+
79+
return nil
8780
}
8881

89-
// CreateDefaultConfig creates the default configuration for Carbon receiver.
90-
func (f *Factory) CreateDefaultConfig() configmodels.Receiver {
82+
func createDefaultConfig() configmodels.Receiver {
9183
return &Config{
9284
ReceiverSettings: configmodels.ReceiverSettings{
9385
TypeVal: configmodels.Type(typeStr),
@@ -105,25 +97,13 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver {
10597
}
10698
}
10799

108-
// CreateTraceReceiver creates a trace receiver based on provided config.
109-
func (f *Factory) CreateTraceReceiver(
110-
ctx context.Context,
111-
logger *zap.Logger,
112-
cfg configmodels.Receiver,
113-
consumer consumer.TraceConsumerOld,
114-
) (component.TraceReceiver, error) {
115-
116-
return nil, configerror.ErrDataTypeIsNotSupported
117-
}
118-
119-
// CreateMetricsReceiver creates a metrics receiver based on provided config.
120-
func (f *Factory) CreateMetricsReceiver(
121-
ctx context.Context,
122-
logger *zap.Logger,
100+
func createMetricsReceiver(
101+
_ context.Context,
102+
params component.ReceiverCreateParams,
123103
cfg configmodels.Receiver,
124-
consumer consumer.MetricsConsumerOld,
104+
consumer consumer.MetricsConsumer,
125105
) (component.MetricsReceiver, error) {
126106

127107
rCfg := cfg.(*Config)
128-
return New(logger, *rCfg, consumer)
108+
return New(params.Logger, *rCfg, consumer)
129109
}

receiver/carbonreceiver/factory_test.go

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,43 +19,25 @@ import (
1919
"testing"
2020

2121
"github.com/stretchr/testify/assert"
22+
"go.opentelemetry.io/collector/component"
2223
"go.opentelemetry.io/collector/config/configcheck"
23-
"go.opentelemetry.io/collector/config/configerror"
24-
"go.opentelemetry.io/collector/consumer"
25-
"go.opentelemetry.io/collector/consumer/consumerdata"
24+
"go.opentelemetry.io/collector/exporter/exportertest"
2625
"go.uber.org/zap"
2726
)
2827

2928
func TestCreateDefaultConfig(t *testing.T) {
30-
factory := &Factory{}
29+
factory := NewFactory()
3130
cfg := factory.CreateDefaultConfig()
3231
assert.NotNil(t, cfg, "failed to create default config")
3332
assert.NoError(t, configcheck.ValidateConfig(cfg))
3433
}
3534

36-
type mockMetricsConsumer struct {
37-
}
38-
39-
var _ (consumer.MetricsConsumerOld) = (*mockMetricsConsumer)(nil)
40-
41-
func (m *mockMetricsConsumer) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error {
42-
return nil
43-
}
44-
4535
func TestCreateReceiver(t *testing.T) {
46-
factory := &Factory{}
47-
cfg := factory.CreateDefaultConfig().(*Config)
36+
cfg := createDefaultConfig().(*Config)
4837
cfg.Endpoint = "localhost:0" // Endpoint is required, not going to be used here.
4938

50-
tReceiver, err := factory.CreateMetricsReceiver(context.Background(), zap.NewNop(), cfg, &mockMetricsConsumer{})
51-
assert.Nil(t, err, "receiver creation failed")
52-
assert.NotNil(t, tReceiver, "receiver creation failed")
53-
54-
tReceiver, err = factory.CreateMetricsReceiver(context.Background(), zap.NewNop(), cfg, &mockMetricsConsumer{})
55-
assert.Nil(t, err, "receiver creation failed")
39+
params := component.ReceiverCreateParams{Logger: zap.NewNop()}
40+
tReceiver, err := createMetricsReceiver(context.Background(), params, cfg, exportertest.NewNopMetricsExporter())
41+
assert.NoError(t, err)
5642
assert.NotNil(t, tReceiver, "receiver creation failed")
57-
58-
mReceiver, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil)
59-
assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported)
60-
assert.Nil(t, mReceiver)
6143
}

receiver/carbonreceiver/receiver.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ type carbonReceiver struct {
4545
server transport.Server
4646
reporter transport.Reporter
4747
parser protocol.Parser
48-
nextConsumer consumer.MetricsConsumerOld
48+
nextConsumer consumer.MetricsConsumer
4949

5050
startOnce sync.Once
5151
stopOnce sync.Once
@@ -57,7 +57,7 @@ var _ component.MetricsReceiver = (*carbonReceiver)(nil)
5757
func New(
5858
logger *zap.Logger,
5959
config Config,
60-
nextConsumer consumer.MetricsConsumerOld,
60+
nextConsumer consumer.MetricsConsumer,
6161
) (component.MetricsReceiver, error) {
6262

6363
if nextConsumer == nil {

receiver/carbonreceiver/receiver_test.go

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"go.opentelemetry.io/collector/config/configmodels"
3131
"go.opentelemetry.io/collector/config/confignet"
3232
"go.opentelemetry.io/collector/consumer"
33+
"go.opentelemetry.io/collector/consumer/pdatautil"
3334
"go.opentelemetry.io/collector/exporter/exportertest"
3435
"go.opentelemetry.io/collector/testutil"
3536
"go.uber.org/zap"
@@ -40,10 +41,10 @@ import (
4041
)
4142

4243
func Test_carbonreceiver_New(t *testing.T) {
43-
defaultConfig := (&Factory{}).CreateDefaultConfig().(*Config)
44+
defaultConfig := createDefaultConfig().(*Config)
4445
type args struct {
4546
config Config
46-
nextConsumer consumer.MetricsConsumerOld
47+
nextConsumer consumer.MetricsConsumer
4748
}
4849
tests := []struct {
4950
name string
@@ -54,7 +55,7 @@ func Test_carbonreceiver_New(t *testing.T) {
5455
name: "default_config",
5556
args: args{
5657
config: *defaultConfig,
57-
nextConsumer: new(exportertest.SinkMetricsExporterOld),
58+
nextConsumer: exportertest.NewNopMetricsExporter(),
5859
},
5960
},
6061
{
@@ -68,7 +69,7 @@ func Test_carbonreceiver_New(t *testing.T) {
6869
},
6970
TCPIdleTimeout: defaultConfig.TCPIdleTimeout,
7071
},
71-
nextConsumer: new(exportertest.SinkMetricsExporterOld),
72+
nextConsumer: exportertest.NewNopMetricsExporter(),
7273
},
7374
},
7475
{
@@ -84,7 +85,7 @@ func Test_carbonreceiver_New(t *testing.T) {
8485
config: Config{
8586
ReceiverSettings: configmodels.ReceiverSettings{},
8687
},
87-
nextConsumer: new(exportertest.SinkMetricsExporterOld),
88+
nextConsumer: exportertest.NewNopMetricsExporter(),
8889
},
8990
wantErr: errEmptyEndpoint,
9091
},
@@ -104,7 +105,7 @@ func Test_carbonreceiver_New(t *testing.T) {
104105
Config: &protocol.PlaintextConfig{},
105106
},
106107
},
107-
nextConsumer: new(exportertest.SinkMetricsExporterOld),
108+
nextConsumer: exportertest.NewNopMetricsExporter(),
108109
},
109110
wantErr: errors.New("unsupported transport \"unknown_transp\" for receiver \"invalid_transport_rcv\""),
110111
},
@@ -130,7 +131,7 @@ func Test_carbonreceiver_New(t *testing.T) {
130131
},
131132
},
132133
},
133-
nextConsumer: new(exportertest.SinkMetricsExporterOld),
134+
nextConsumer: exportertest.NewNopMetricsExporter(),
134135
},
135136
},
136137
{
@@ -150,7 +151,7 @@ func Test_carbonreceiver_New(t *testing.T) {
150151
Config: &protocol.PlaintextConfig{},
151152
},
152153
},
153-
nextConsumer: new(exportertest.SinkMetricsExporterOld),
154+
nextConsumer: exportertest.NewNopMetricsExporter(),
154155
},
155156
wantErr: errors.New("invalid idle timeout: -1s"),
156157
},
@@ -184,7 +185,7 @@ func Test_carbonreceiver_EndToEnd(t *testing.T) {
184185
{
185186
name: "default_config",
186187
configFn: func() *Config {
187-
return (&Factory{}).CreateDefaultConfig().(*Config)
188+
return createDefaultConfig().(*Config)
188189
},
189190
clientFn: func(t *testing.T) *client.Graphite {
190191
c, err := client.NewGraphite(client.TCP, host, port)
@@ -195,7 +196,7 @@ func Test_carbonreceiver_EndToEnd(t *testing.T) {
195196
{
196197
name: "default_config_udp",
197198
configFn: func() *Config {
198-
cfg := (&Factory{}).CreateDefaultConfig().(*Config)
199+
cfg := createDefaultConfig().(*Config)
199200
cfg.Transport = "udp"
200201
return cfg
201202
},
@@ -210,7 +211,7 @@ func Test_carbonreceiver_EndToEnd(t *testing.T) {
210211
t.Run(tt.name, func(t *testing.T) {
211212
cfg := tt.configFn()
212213
cfg.Endpoint = addr
213-
sink := new(exportertest.SinkMetricsExporterOld)
214+
sink := new(exportertest.SinkMetricsExporter)
214215
rcv, err := New(zap.NewNop(), *cfg, sink)
215216
require.NoError(t, err)
216217
r := rcv.(*carbonReceiver)
@@ -237,9 +238,11 @@ func Test_carbonreceiver_EndToEnd(t *testing.T) {
237238
mr.WaitAllOnMetricsProcessedCalls()
238239

239240
mdd := sink.AllMetrics()
240-
require.Equal(t, 1, len(mdd))
241-
require.Equal(t, 1, len(mdd[0].Metrics))
242-
metric := mdd[0].Metrics[0]
241+
require.Len(t, mdd, 1)
242+
ocmd := pdatautil.MetricsToMetricsData(mdd[0])
243+
require.Len(t, ocmd, 1)
244+
require.Len(t, ocmd[0].Metrics, 1)
245+
metric := ocmd[0].Metrics[0]
243246
assert.Equal(t, carbonMetric.Name, metric.GetMetricDescriptor().GetName())
244247
tss := metric.GetTimeseries()
245248
require.Equal(t, 1, len(tss))

receiver/carbonreceiver/transport/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ type Server interface {
3636
// the Parser and passed to the next consumer.
3737
ListenAndServe(
3838
p protocol.Parser,
39-
mc consumer.MetricsConsumerOld,
39+
mc consumer.MetricsConsumer,
4040
r Reporter,
4141
) error
4242

0 commit comments

Comments
 (0)