Skip to content

Commit d9352db

Browse files
Add support for CONSUMER and PRODUCER span kinds (#520)
Previously these span kinds were not supported because OpenCensus does not have equivalent span kinds and they were translated as "Unspecified" span kind when received by Zipkin receiver. Now these span kinds are internally stored in a span attribute named "span.kind" when received by Zipkin receiver. When exporting, this attribute is used to perform a reverse translation in Zipkin exporter. "span.kind" also matches the OpenTracing semantic conventions so it should be correctly interpreted by any backend that follows this convention. ## To reviewers: The code that does translation from OC to Zipkin is moved from exporter/zipkinexporter/zipkin.go to translator/trace/zipkin/protospan_to_zipkinv1.go. protospan_to_zipkinv1.go is not all new code, only the code that deals with span kind is to be reviewed. Testing done: - Added a unit test to verify translations between Zipkin,OC and Jaeger. - TODO: Add E2E test to verify these translations on real pipelines.
1 parent 1f803d7 commit d9352db

File tree

10 files changed

+655
-401
lines changed

10 files changed

+655
-401
lines changed

exporter/zipkinexporter/zipkin.go

Lines changed: 1 addition & 279 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@ import (
1818
"bytes"
1919
"context"
2020
"fmt"
21-
"net"
2221
"net/http"
23-
"strconv"
2422
"time"
2523

2624
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
@@ -35,7 +33,6 @@ import (
3533
"github.com/open-telemetry/opentelemetry-collector/consumer/consumererror"
3634
"github.com/open-telemetry/opentelemetry-collector/exporter"
3735
"github.com/open-telemetry/opentelemetry-collector/exporter/exporterhelper"
38-
tracetranslator "github.com/open-telemetry/opentelemetry-collector/translator/trace"
3936
spandatatranslator "github.com/open-telemetry/opentelemetry-collector/translator/trace/spandata"
4037
"github.com/open-telemetry/opentelemetry-collector/translator/trace/zipkin"
4138
)
@@ -107,86 +104,6 @@ func createZipkinExporter(logger *zap.Logger, config configmodels.Exporter) (*zi
107104
return ze, nil
108105
}
109106

110-
// zipkinEndpointFromAttributes extracts zipkin endpoint information
111-
// from a set of attributes (in the format of OC SpanData). It returns the built
112-
// zipkin endpoint and insert the attribute keys that were made redundant
113-
// (because now they can be represented by the endpoint) into the redundantKeys
114-
// map (function assumes that this was created by the caller). Per call at most
115-
// 3 attribute keys can be made redundant.
116-
func zipkinEndpointFromAttributes(
117-
attributes map[string]interface{},
118-
serviceName string,
119-
endpointType zipkinDirection,
120-
redundantKeys map[string]bool,
121-
) (endpoint *zipkinmodel.Endpoint) {
122-
123-
if attributes == nil {
124-
return nil
125-
}
126-
127-
// The data in the Attributes map was saved in the format
128-
// {
129-
// "ipv4": "192.168.99.101",
130-
// "port": "9000",
131-
// "serviceName": "backend",
132-
// }
133-
134-
var ipv4Key, ipv6Key, portKey string
135-
if endpointType == isLocalEndpoint {
136-
ipv4Key, ipv6Key, portKey = zipkin.LocalEndpointIPv4, zipkin.LocalEndpointIPv6, zipkin.LocalEndpointPort
137-
} else {
138-
ipv4Key, ipv6Key, portKey = zipkin.RemoteEndpointIPv4, zipkin.RemoteEndpointIPv6, zipkin.RemoteEndpointPort
139-
}
140-
141-
var ip net.IP
142-
ipv6Selected := false
143-
144-
if ipv4Str, ok := extractStringAttribute(attributes, ipv4Key); ok {
145-
ip = net.ParseIP(ipv4Str)
146-
redundantKeys[ipv4Key] = true
147-
} else if ipv6Str, ok := extractStringAttribute(attributes, ipv6Key); ok {
148-
ip = net.ParseIP(ipv6Str)
149-
ipv6Selected = true
150-
redundantKeys[ipv6Key] = true
151-
}
152-
153-
var port uint64
154-
if portStr, ok := extractStringAttribute(attributes, portKey); ok {
155-
port, _ = strconv.ParseUint(portStr, 10, 16)
156-
redundantKeys[portKey] = true
157-
}
158-
159-
if serviceName == "" && len(ip) == 0 && port == 0 {
160-
// Nothing to put on the endpoint
161-
return nil
162-
}
163-
164-
zEndpoint := &zipkinmodel.Endpoint{
165-
ServiceName: serviceName,
166-
Port: uint16(port),
167-
}
168-
169-
if ipv6Selected {
170-
zEndpoint.IPv6 = ip
171-
} else {
172-
zEndpoint.IPv4 = ip
173-
}
174-
175-
return zEndpoint
176-
}
177-
178-
func extractStringAttribute(
179-
attributes map[string]interface{},
180-
key string,
181-
) (value string, ok bool) {
182-
var i interface{}
183-
if i, ok = attributes[key]; ok {
184-
value, ok = i.(string)
185-
}
186-
187-
return value, ok
188-
}
189-
190107
func (ze *zipkinExporter) PushTraceData(ctx context.Context, td consumerdata.TraceData) (droppedSpans int, err error) {
191108
tbatch := []*zipkinmodel.SpanModel{}
192109
for _, span := range td.Spans {
@@ -220,204 +137,9 @@ func (ze *zipkinExporter) PushTraceData(ctx context.Context, td consumerdata.Tra
220137
return 0, nil
221138
}
222139

223-
// This code from down below is mostly copied from
224-
// https://github.com/census-instrumentation/opencensus-go/blob/96e75b88df843315da521168a0e3b11792088728/exporter/zipkin/zipkin.go#L57-L194
225-
// but that is because the Zipkin Go exporter requires process to change
226-
// and was designed without taking into account that LocalEndpoint and RemoteEndpoint
227-
// are per-span-Node attributes instead of global/system variables.
228-
// The alternative is to create a single exporter for every single combination
229-
// but this wastes resources i.e. an HTTP client for every single combination
230-
// but also requires the exporter to be changed entirely as per
231-
// https://github.com/census-instrumentation/opencensus-go/issues/959
232-
//
233-
// TODO: (@odeke-em) whenever we come to consensus with the OpenCensus-Go repository
234-
// on the Zipkin exporter and they have the same logic, then delete all the code
235-
// below here to allow per-span configuration changes.
236-
237-
const (
238-
statusCodeTagKey = "error"
239-
statusDescriptionTagKey = "opencensus.status_description"
240-
)
241-
242-
var (
243-
sampledTrue = true
244-
canonicalCodes = [...]string{
245-
"OK",
246-
"CANCELLED",
247-
"UNKNOWN",
248-
"INVALID_ARGUMENT",
249-
"DEADLINE_EXCEEDED",
250-
"NOT_FOUND",
251-
"ALREADY_EXISTS",
252-
"PERMISSION_DENIED",
253-
"RESOURCE_EXHAUSTED",
254-
"FAILED_PRECONDITION",
255-
"ABORTED",
256-
"OUT_OF_RANGE",
257-
"UNIMPLEMENTED",
258-
"INTERNAL",
259-
"UNAVAILABLE",
260-
"DATA_LOSS",
261-
"UNAUTHENTICATED",
262-
}
263-
)
264-
265-
func canonicalCodeString(code int32) string {
266-
if code < 0 || int(code) >= len(canonicalCodes) {
267-
return "error code " + strconv.FormatInt(int64(code), 10)
268-
}
269-
return canonicalCodes[code]
270-
}
271-
272-
func convertTraceID(t trace.TraceID) zipkinmodel.TraceID {
273-
h, l, _ := tracetranslator.BytesToUInt64TraceID(t[:])
274-
return zipkinmodel.TraceID{High: h, Low: l}
275-
}
276-
277-
func convertSpanID(s trace.SpanID) zipkinmodel.ID {
278-
id, _ := tracetranslator.BytesToUInt64SpanID(s[:])
279-
return zipkinmodel.ID(id)
280-
}
281-
282-
func spanKind(s *trace.SpanData) zipkinmodel.Kind {
283-
switch s.SpanKind {
284-
case trace.SpanKindClient:
285-
return zipkinmodel.Client
286-
case trace.SpanKindServer:
287-
return zipkinmodel.Server
288-
}
289-
return zipkinmodel.Undetermined
290-
}
291-
292-
func (ze *zipkinExporter) serviceNameOrDefault(node *commonpb.Node) string {
293-
// ze.defaultServiceName should never change
294-
defaultServiceName := ze.defaultServiceName
295-
296-
if node == nil || node.ServiceInfo == nil {
297-
return defaultServiceName
298-
}
299-
300-
if node.ServiceInfo.Name == "" {
301-
return defaultServiceName
302-
}
303-
304-
return node.ServiceInfo.Name
305-
}
306-
307-
type zipkinDirection bool
308-
309-
const (
310-
isLocalEndpoint zipkinDirection = true
311-
isRemoteEndpoint zipkinDirection = false
312-
)
313-
314140
func (ze *zipkinExporter) zipkinSpan(
315141
node *commonpb.Node,
316142
s *trace.SpanData,
317143
) (zc zipkinmodel.SpanModel) {
318-
319-
// Per call to zipkinEndpointFromAttributes at most 3 attribute keys can be
320-
// made redundant, give a hint when calling make that we expect at most 6
321-
// items on the map.
322-
redundantKeys := make(map[string]bool, 6)
323-
localEndpointServiceName := ze.serviceNameOrDefault(node)
324-
localEndpoint := zipkinEndpointFromAttributes(
325-
s.Attributes, localEndpointServiceName, isLocalEndpoint, redundantKeys)
326-
327-
remoteServiceName := ""
328-
if remoteServiceEntry, ok := s.Attributes[zipkin.RemoteEndpointServiceName]; ok {
329-
if remoteServiceName, ok = remoteServiceEntry.(string); ok {
330-
redundantKeys[zipkin.RemoteEndpointServiceName] = true
331-
}
332-
}
333-
remoteEndpoint := zipkinEndpointFromAttributes(
334-
s.Attributes, remoteServiceName, isRemoteEndpoint, redundantKeys)
335-
336-
sc := s.SpanContext
337-
z := zipkinmodel.SpanModel{
338-
SpanContext: zipkinmodel.SpanContext{
339-
TraceID: convertTraceID(sc.TraceID),
340-
ID: convertSpanID(sc.SpanID),
341-
Sampled: &sampledTrue,
342-
},
343-
Kind: spanKind(s),
344-
Name: s.Name,
345-
Timestamp: s.StartTime,
346-
Shared: false,
347-
LocalEndpoint: localEndpoint,
348-
RemoteEndpoint: remoteEndpoint,
349-
}
350-
351-
if s.ParentSpanID != (trace.SpanID{}) {
352-
id := convertSpanID(s.ParentSpanID)
353-
z.ParentID = &id
354-
}
355-
356-
if s, e := s.StartTime, s.EndTime; !s.IsZero() && !e.IsZero() {
357-
z.Duration = e.Sub(s)
358-
}
359-
360-
// construct Tags from s.Attributes and s.Status.
361-
if len(s.Attributes) != 0 {
362-
m := make(map[string]string, len(s.Attributes)+2)
363-
for key, value := range s.Attributes {
364-
if redundantKeys[key] {
365-
// Already represented by something other than an attribute,
366-
// skip it.
367-
continue
368-
}
369-
370-
switch v := value.(type) {
371-
case string:
372-
m[key] = v
373-
case bool:
374-
if v {
375-
m[key] = "true"
376-
} else {
377-
m[key] = "false"
378-
}
379-
case int64:
380-
m[key] = strconv.FormatInt(v, 10)
381-
}
382-
}
383-
z.Tags = m
384-
}
385-
if s.Status.Code != 0 || s.Status.Message != "" {
386-
if z.Tags == nil {
387-
z.Tags = make(map[string]string, 2)
388-
}
389-
if s.Status.Code != 0 {
390-
z.Tags[statusCodeTagKey] = canonicalCodeString(s.Status.Code)
391-
}
392-
if s.Status.Message != "" {
393-
z.Tags[statusDescriptionTagKey] = s.Status.Message
394-
}
395-
}
396-
397-
// construct Annotations from s.Annotations and s.MessageEvents.
398-
if len(s.Annotations) != 0 || len(s.MessageEvents) != 0 {
399-
z.Annotations = make([]zipkinmodel.Annotation, 0, len(s.Annotations)+len(s.MessageEvents))
400-
for _, a := range s.Annotations {
401-
z.Annotations = append(z.Annotations, zipkinmodel.Annotation{
402-
Timestamp: a.Time,
403-
Value: a.Message,
404-
})
405-
}
406-
for _, m := range s.MessageEvents {
407-
a := zipkinmodel.Annotation{
408-
Timestamp: m.Time,
409-
}
410-
switch m.EventType {
411-
case trace.MessageEventTypeSent:
412-
a.Value = "SENT"
413-
case trace.MessageEventTypeRecv:
414-
a.Value = "RECV"
415-
default:
416-
a.Value = "<?>"
417-
}
418-
z.Annotations = append(z.Annotations, a)
419-
}
420-
}
421-
422-
return z
144+
return zipkin.OCSpanDataToZipkin(node, s, ze.defaultServiceName)
423145
}

0 commit comments

Comments
 (0)