Skip to content

Commit 16130e5

Browse files
yaroliakhjmsnll
authored andcommitted
[exporter/kafkaexporter] add zipkin encoding for traces (open-telemetry#23947)
**Description:** <Describe what has changed.> Adding Zipkin v2 encoding support for traces in kafkaexporter **Link to tracking Issue:** <Issue number if applicable> open-telemetry#21102 **Testing:** <Describe what testing was performed and which tests were added.> Test for `tracesMarshalers` has been extended with zipkin JSON test case.
1 parent a277117 commit 16130e5

File tree

8 files changed

+106
-36
lines changed

8 files changed

+106
-36
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Use this changelog template to create an entry for release notes.
2+
# If your change doesn't affect end users, such as a test fix or a tooling change,
3+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
4+
5+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
6+
change_type: enhancement
7+
8+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
9+
component: exporter/kafkaexporter
10+
11+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
12+
note: Adding Zipkin encoding option for traces to kafkaexporter
13+
14+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
15+
issues: [21102]
16+
17+
# (Optional) One or more lines of additional information to render under the primary note.
18+
# These lines will be padded with 2 spaces and then inserted directly into the document.
19+
# Use pipe (|) for multiline entries.
20+
subtext:

exporter/kafkaexporter/README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ The following settings can be optionally configured:
3131
- `otlp_json`: payload is JSON serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
3232
- The following encodings are valid *only* for **traces**.
3333
- `jaeger_proto`: the payload is serialized to a single Jaeger proto `Span`, and keyed by TraceID.
34-
- `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`, and keyed by TraceID.\
34+
- `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`, and keyed by TraceID.
35+
- `zipkin_proto`: the payload is serialized to Zipkin v2 proto Span.
36+
- `zipkin_json`: the payload is serialized to Zipkin v2 JSON Span.
3537
- The following encodings are valid *only* for **logs**.
3638
- `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded.
3739
- `auth`

exporter/kafkaexporter/go.mod

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@ require (
77
github.com/aws/aws-sdk-go v1.45.20
88
github.com/cenkalti/backoff/v4 v4.2.1
99
github.com/gogo/protobuf v1.3.2
10-
github.com/jaegertracing/jaeger v1.41.0
10+
github.com/jaegertracing/jaeger v1.48.0
1111
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.86.0
1212
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.86.0
13+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.86.0
14+
github.com/openzipkin/zipkin-go v0.4.2
1315
github.com/stretchr/testify v1.8.4
1416
github.com/xdg-go/scram v1.1.2
1517
go.opentelemetry.io/collector/component v0.86.0
@@ -34,7 +36,7 @@ require (
3436
github.com/go-logr/stdr v1.2.2 // indirect
3537
github.com/golang/protobuf v1.5.3 // indirect
3638
github.com/golang/snappy v0.0.4 // indirect
37-
github.com/hashicorp/errwrap v1.0.0 // indirect
39+
github.com/hashicorp/errwrap v1.1.0 // indirect
3840
github.com/hashicorp/go-multierror v1.1.1 // indirect
3941
github.com/hashicorp/go-uuid v1.0.3 // indirect
4042
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
@@ -71,7 +73,7 @@ require (
7173
go.opentelemetry.io/otel/sdk v1.19.0 // indirect
7274
go.opentelemetry.io/otel/sdk/metric v1.19.0 // indirect
7375
go.opentelemetry.io/otel/trace v1.19.0 // indirect
74-
go.uber.org/atomic v1.10.0 // indirect
76+
go.uber.org/atomic v1.11.0 // indirect
7577
golang.org/x/crypto v0.13.0 // indirect
7678
golang.org/x/net v0.15.0 // indirect
7779
golang.org/x/sys v0.12.0 // indirect
@@ -95,3 +97,5 @@ retract (
9597
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil
9698

9799
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest
100+
101+
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin => ../../pkg/translator/zipkin

exporter/kafkaexporter/go.sum

Lines changed: 8 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

exporter/kafkaexporter/marshaler.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"go.opentelemetry.io/collector/pdata/plog"
99
"go.opentelemetry.io/collector/pdata/pmetric"
1010
"go.opentelemetry.io/collector/pdata/ptrace"
11+
12+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinv2"
1113
)
1214

1315
// TracesMarshaler marshals traces into Message array.
@@ -41,11 +43,15 @@ type LogsMarshaler interface {
4143
func tracesMarshalers() map[string]TracesMarshaler {
4244
otlpPb := newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding)
4345
otlpJSON := newPdataTracesMarshaler(&ptrace.JSONMarshaler{}, "otlp_json")
46+
zipkinProto := newPdataTracesMarshaler(zipkinv2.NewProtobufTracesMarshaler(), "zipkin_proto")
47+
zipkinJSON := newPdataTracesMarshaler(zipkinv2.NewJSONTracesMarshaler(), "zipkin_json")
4448
jaegerProto := jaegerMarshaler{marshaler: jaegerProtoSpanMarshaler{}}
4549
jaegerJSON := jaegerMarshaler{marshaler: newJaegerJSONMarshaler()}
4650
return map[string]TracesMarshaler{
4751
otlpPb.Encoding(): otlpPb,
4852
otlpJSON.Encoding(): otlpJSON,
53+
zipkinProto.Encoding(): zipkinProto,
54+
zipkinJSON.Encoding(): zipkinJSON,
4955
jaegerProto.Encoding(): jaegerProto,
5056
jaegerJSON.Encoding(): jaegerJSON,
5157
}

exporter/kafkaexporter/marshaler_test.go

Lines changed: 50 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"testing"
1010
"time"
1111

12+
zipkin "github.com/openzipkin/zipkin-go/model"
1213
"github.com/stretchr/testify/assert"
1314
"github.com/stretchr/testify/require"
1415
"go.opentelemetry.io/collector/pdata/pcommon"
@@ -20,6 +21,8 @@ func TestDefaultTracesMarshalers(t *testing.T) {
2021
expectedEncodings := []string{
2122
"otlp_proto",
2223
"otlp_json",
24+
"zipkin_proto",
25+
"zipkin_json",
2326
"jaeger_proto",
2427
"jaeger_json",
2528
}
@@ -84,27 +87,17 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) {
8487
ils.Spans().AppendEmpty()
8588

8689
span := ils.Spans().At(0)
87-
span.SetKind(ptrace.SpanKindInternal)
88-
span.SetName(t.Name())
90+
span.SetKind(ptrace.SpanKindServer)
91+
span.SetName("foo")
8992
span.SetStartTimestamp(pcommon.NewTimestampFromTime(now))
9093
span.SetEndTimestamp(pcommon.NewTimestampFromTime(now.Add(time.Second)))
94+
span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
9195
span.SetSpanID([8]byte{0, 1, 2, 3, 4, 5, 6, 7})
9296
span.SetParentSpanID([8]byte{8, 9, 10, 11, 12, 13, 14})
9397

94-
marshaler, ok := tracesMarshalers()["otlp_json"]
95-
require.True(t, ok, "Must have otlp json marshaller")
96-
97-
msg, err := marshaler.Marshal(traces, t.Name())
98-
require.NoError(t, err, "Must have marshaled the data without error")
99-
require.Len(t, msg, 1, "Must have one entry in the message")
100-
101-
data, err := msg[0].Value.Encode()
102-
require.NoError(t, err, "Must not error when encoding value")
103-
require.NotNil(t, data, "Must have valid data to test")
104-
10598
// Since marshaling json is not guaranteed to be in order
10699
// within a string, using a map to compare that the expected values are there
107-
expectedJSON := map[string]interface{}{
100+
otlpJSON := map[string]interface{}{
108101
"resourceSpans": []interface{}{
109102
map[string]interface{}{
110103
"resource": map[string]interface{}{},
@@ -113,11 +106,11 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) {
113106
"scope": map[string]interface{}{},
114107
"spans": []interface{}{
115108
map[string]interface{}{
116-
"traceId": "",
109+
"traceId": "0102030405060708090a0b0c0d0e0f10",
117110
"spanId": "0001020304050607",
118111
"parentSpanId": "08090a0b0c0d0e00",
119-
"name": t.Name(),
120-
"kind": float64(ptrace.SpanKindInternal),
112+
"name": "foo",
113+
"kind": float64(ptrace.SpanKindServer),
121114
"startTimeUnixNano": fmt.Sprint(now.UnixNano()),
122115
"endTimeUnixNano": fmt.Sprint(now.Add(time.Second).UnixNano()),
123116
"status": map[string]interface{}{},
@@ -131,9 +124,45 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) {
131124
},
132125
}
133126

134-
var final map[string]interface{}
135-
err = json.Unmarshal(data, &final)
136-
require.NoError(t, err, "Must not error marshaling expected data")
127+
zipkinJSON := []interface{}{
128+
map[string]interface{}{
129+
"traceId": "0102030405060708090a0b0c0d0e0f10",
130+
"id": "0001020304050607",
131+
"parentId": "08090a0b0c0d0e00",
132+
"name": "foo",
133+
"timestamp": float64(time.Second.Microseconds()),
134+
"duration": float64(time.Second.Microseconds()),
135+
"kind": string(zipkin.Server),
136+
"localEndpoint": map[string]interface{}{"serviceName": "otlpresourcenoservicename"},
137+
},
138+
}
139+
140+
tests := []struct {
141+
encoding string
142+
expectedJSON interface{}
143+
unmarshaled interface{}
144+
}{
145+
{encoding: "otlp_json", expectedJSON: otlpJSON, unmarshaled: map[string]interface{}{}},
146+
{encoding: "zipkin_json", expectedJSON: zipkinJSON, unmarshaled: []map[string]interface{}{}},
147+
}
148+
149+
for _, test := range tests {
150+
151+
marshaler, ok := tracesMarshalers()[test.encoding]
152+
require.True(t, ok, fmt.Sprintf("Must have %s marshaller", test.encoding))
153+
154+
msg, err := marshaler.Marshal(traces, t.Name())
155+
require.NoError(t, err, "Must have marshaled the data without error")
156+
require.Len(t, msg, 1, "Must have one entry in the message")
157+
158+
data, err := msg[0].Value.Encode()
159+
require.NoError(t, err, "Must not error when encoding value")
160+
require.NotNil(t, data, "Must have valid data to test")
137161

138-
assert.Equal(t, expectedJSON, final, "Must match the expected value")
162+
err = json.Unmarshal(data, &test.unmarshaled)
163+
require.NoError(t, err, "Must not error marshaling expected data")
164+
165+
assert.Equal(t, test.expectedJSON, test.unmarshaled, "Must match the expected value")
166+
167+
}
139168
}

receiver/kafkametricsreceiver/go.mod

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ require (
5050
github.com/hashicorp/errwrap v1.1.0 // indirect
5151
github.com/hashicorp/go-multierror v1.1.1 // indirect
5252
github.com/hashicorp/go-uuid v1.0.3 // indirect
53-
github.com/jaegertracing/jaeger v1.41.0 // indirect
53+
github.com/jaegertracing/jaeger v1.48.0 // indirect
5454
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
5555
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
5656
github.com/jcmturner/gofork v1.7.6 // indirect
@@ -74,10 +74,12 @@ require (
7474
github.com/morikuni/aec v1.0.0 // indirect
7575
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.86.0 // indirect
7676
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.86.0 // indirect
77+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.86.0 // indirect
7778
github.com/opencontainers/go-digest v1.0.0 // indirect
7879
github.com/opencontainers/image-spec v1.1.0-rc4 // indirect
7980
github.com/opencontainers/runc v1.1.5 // indirect
8081
github.com/opentracing/opentracing-go v1.2.0 // indirect
82+
github.com/openzipkin/zipkin-go v0.4.2 // indirect
8183
github.com/pierrec/lz4/v4 v4.1.18 // indirect
8284
github.com/pkg/errors v0.9.1 // indirect
8385
github.com/pmezard/go-difflib v1.0.0 // indirect
@@ -108,7 +110,7 @@ require (
108110
go.opentelemetry.io/otel/sdk v1.19.0 // indirect
109111
go.opentelemetry.io/otel/sdk/metric v1.19.0 // indirect
110112
go.opentelemetry.io/otel/trace v1.19.0 // indirect
111-
go.uber.org/atomic v1.10.0 // indirect
113+
go.uber.org/atomic v1.11.0 // indirect
112114
golang.org/x/crypto v0.13.0 // indirect
113115
golang.org/x/exp v0.0.0-20230711023510-fffb14384f22 // indirect
114116
golang.org/x/mod v0.12.0 // indirect
@@ -141,5 +143,7 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil
141143

142144
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest
143145

146+
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin => ../../pkg/translator/zipkin
147+
144148
// see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/24240
145149
replace github.com/docker/docker v24.0.4+incompatible => github.com/docker/docker v24.0.5-0.20230719162248-f022632503d1+incompatible

receiver/kafkametricsreceiver/go.sum

Lines changed: 6 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)