@@ -14,6 +14,9 @@ import (
1414 "go.opentelemetry.io/collector/component"
1515 "go.opentelemetry.io/collector/component/componentstatus"
1616 "go.opentelemetry.io/collector/consumer"
17+ "go.opentelemetry.io/collector/pdata/plog"
18+ "go.opentelemetry.io/collector/pdata/pmetric"
19+ "go.opentelemetry.io/collector/pdata/ptrace"
1720 "go.opentelemetry.io/collector/receiver"
1821 "go.opentelemetry.io/collector/receiver/receiverhelper"
1922 "go.opentelemetry.io/otel/attribute"
@@ -100,11 +103,7 @@ var _ receiver.Traces = (*kafkaTracesConsumer)(nil)
100103var _ receiver.Metrics = (* kafkaMetricsConsumer )(nil )
101104var _ receiver.Logs = (* kafkaLogsConsumer )(nil )
102105
103- func newTracesReceiver (config Config , set receiver.Settings , unmarshaler TracesUnmarshaler , nextConsumer consumer.Traces ) (* kafkaTracesConsumer , error ) {
104- if unmarshaler == nil {
105- return nil , errUnrecognizedEncoding
106- }
107-
106+ func newTracesReceiver (config Config , set receiver.Settings , nextConsumer consumer.Traces ) (* kafkaTracesConsumer , error ) {
108107 telemetryBuilder , err := metadata .NewTelemetryBuilder (set .TelemetrySettings )
109108 if err != nil {
110109 return nil , err
@@ -114,7 +113,6 @@ func newTracesReceiver(config Config, set receiver.Settings, unmarshaler TracesU
114113 config : config ,
115114 topics : []string {config .Topic },
116115 nextConsumer : nextConsumer ,
117- unmarshaler : unmarshaler ,
118116 settings : set ,
119117 autocommitEnabled : config .AutoCommit .Enable ,
120118 messageMarking : config .MessageMarking ,
@@ -170,6 +168,22 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro
170168 if err != nil {
171169 return err
172170 }
171+ // extensions take precedence over internal encodings
172+ if unmarshaler , errExt := loadEncodingExtension [ptrace.Unmarshaler ](
173+ host ,
174+ c .config .Encoding ,
175+ ); errExt == nil {
176+ c .unmarshaler = & tracesEncodingUnmarshaler {
177+ unmarshaler : * unmarshaler ,
178+ encoding : c .config .Encoding ,
179+ }
180+ }
181+ if unmarshaler , ok := defaultTracesUnmarshalers ()[c .config .Encoding ]; c .unmarshaler == nil && ok {
182+ c .unmarshaler = unmarshaler
183+ }
184+ if c .unmarshaler == nil {
185+ return errUnrecognizedEncoding
186+ }
173187 // consumerGroup may be set in tests to inject fake implementation.
174188 if c .consumerGroup == nil {
175189 if c .consumerGroup , err = createKafkaClient (c .config ); err != nil {
@@ -229,11 +243,7 @@ func (c *kafkaTracesConsumer) Shutdown(context.Context) error {
229243 return c .consumerGroup .Close ()
230244}
231245
232- func newMetricsReceiver (config Config , set receiver.Settings , unmarshaler MetricsUnmarshaler , nextConsumer consumer.Metrics ) (* kafkaMetricsConsumer , error ) {
233- if unmarshaler == nil {
234- return nil , errUnrecognizedEncoding
235- }
236-
246+ func newMetricsReceiver (config Config , set receiver.Settings , nextConsumer consumer.Metrics ) (* kafkaMetricsConsumer , error ) {
237247 telemetryBuilder , err := metadata .NewTelemetryBuilder (set .TelemetrySettings )
238248 if err != nil {
239249 return nil , err
@@ -243,7 +253,6 @@ func newMetricsReceiver(config Config, set receiver.Settings, unmarshaler Metric
243253 config : config ,
244254 topics : []string {config .Topic },
245255 nextConsumer : nextConsumer ,
246- unmarshaler : unmarshaler ,
247256 settings : set ,
248257 autocommitEnabled : config .AutoCommit .Enable ,
249258 messageMarking : config .MessageMarking ,
@@ -267,6 +276,22 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) err
267276 if err != nil {
268277 return err
269278 }
279+ // extensions take precedence over internal encodings
280+ if unmarshaler , errExt := loadEncodingExtension [pmetric.Unmarshaler ](
281+ host ,
282+ c .config .Encoding ,
283+ ); errExt == nil {
284+ c .unmarshaler = & metricsEncodingUnmarshaler {
285+ unmarshaler : * unmarshaler ,
286+ encoding : c .config .Encoding ,
287+ }
288+ }
289+ if unmarshaler , ok := defaultMetricsUnmarshalers ()[c .config .Encoding ]; c .unmarshaler == nil && ok {
290+ c .unmarshaler = unmarshaler
291+ }
292+ if c .unmarshaler == nil {
293+ return errUnrecognizedEncoding
294+ }
270295 // consumerGroup may be set in tests to inject fake implementation.
271296 if c .consumerGroup == nil {
272297 if c .consumerGroup , err = createKafkaClient (c .config ); err != nil {
@@ -326,11 +351,7 @@ func (c *kafkaMetricsConsumer) Shutdown(context.Context) error {
326351 return c .consumerGroup .Close ()
327352}
328353
329- func newLogsReceiver (config Config , set receiver.Settings , unmarshaler LogsUnmarshaler , nextConsumer consumer.Logs ) (* kafkaLogsConsumer , error ) {
330- if unmarshaler == nil {
331- return nil , errUnrecognizedEncoding
332- }
333-
354+ func newLogsReceiver (config Config , set receiver.Settings , nextConsumer consumer.Logs ) (* kafkaLogsConsumer , error ) {
334355 telemetryBuilder , err := metadata .NewTelemetryBuilder (set .TelemetrySettings )
335356 if err != nil {
336357 return nil , err
@@ -340,7 +361,6 @@ func newLogsReceiver(config Config, set receiver.Settings, unmarshaler LogsUnmar
340361 config : config ,
341362 topics : []string {config .Topic },
342363 nextConsumer : nextConsumer ,
343- unmarshaler : unmarshaler ,
344364 settings : set ,
345365 autocommitEnabled : config .AutoCommit .Enable ,
346366 messageMarking : config .MessageMarking ,
@@ -364,6 +384,25 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error
364384 if err != nil {
365385 return err
366386 }
387+ // extensions take precedence over internal encodings
388+ if unmarshaler , errExt := loadEncodingExtension [plog.Unmarshaler ](
389+ host ,
390+ c .config .Encoding ,
391+ ); errExt == nil {
392+ c .unmarshaler = & logsEncodingUnmarshaler {
393+ unmarshaler : * unmarshaler ,
394+ encoding : c .config .Encoding ,
395+ }
396+ }
397+ if unmarshaler , errInt := getLogsUnmarshaler (
398+ c .config .Encoding ,
399+ defaultLogsUnmarshalers (c .settings .BuildInfo .Version , c .settings .Logger ),
400+ ); c .unmarshaler == nil && errInt == nil {
401+ c .unmarshaler = unmarshaler
402+ }
403+ if c .unmarshaler == nil {
404+ return errUnrecognizedEncoding
405+ }
367406 // consumerGroup may be set in tests to inject fake implementation.
368407 if c .consumerGroup == nil {
369408 if c .consumerGroup , err = createKafkaClient (c .config ); err != nil {
@@ -720,3 +759,30 @@ func toSaramaInitialOffset(initialOffset string) (int64, error) {
720759 return 0 , errInvalidInitialOffset
721760 }
722761}
762+
763+ // loadEncodingExtension tries to load an available extension for the given encoding.
764+ func loadEncodingExtension [T any ](host component.Host , encoding string ) (* T , error ) {
765+ extensionID , err := encodingToComponentID (encoding )
766+ if err != nil {
767+ return nil , err
768+ }
769+ encodingExtension , ok := host .GetExtensions ()[* extensionID ]
770+ if ! ok {
771+ return nil , fmt .Errorf ("unknown encoding extension %q" , encoding )
772+ }
773+ unmarshaler , ok := encodingExtension .(T )
774+ if ! ok {
775+ return nil , fmt .Errorf ("extension %q is not an unmarshaler" , encoding )
776+ }
777+ return & unmarshaler , nil
778+ }
779+
780+ // encodingToComponentID converts an encoding string to a component ID using the given encoding as type.
781+ func encodingToComponentID (encoding string ) (* component.ID , error ) {
782+ componentType , err := component .NewType (encoding )
783+ if err != nil {
784+ return nil , fmt .Errorf ("invalid component type: %w" , err )
785+ }
786+ id := component .NewID (componentType )
787+ return & id , nil
788+ }
0 commit comments