@@ -25,53 +25,115 @@ import (
25
25
"go.opentelemetry.io/collector/exporter/exporterhelper"
26
26
"go.opentelemetry.io/collector/translator/trace/jaeger"
27
27
"go.uber.org/zap"
28
+
29
+ "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/splunk"
28
30
)
29
31
30
32
// sapmExporter is a wrapper struct of SAPM exporter
31
33
type sapmExporter struct {
32
34
client * sapmclient.Client
33
35
logger * zap.Logger
36
+ config * Config
34
37
}
35
38
36
39
func (se * sapmExporter ) Shutdown (context.Context ) error {
37
40
se .client .Stop ()
38
41
return nil
39
42
}
40
43
41
- func newSAPMTraceExporter (cfg * Config , params component.ExporterCreateParams ) (component. TraceExporter , error ) {
44
+ func newSAPMExporter (cfg * Config , params component.ExporterCreateParams ) (sapmExporter , error ) {
42
45
err := cfg .validate ()
43
46
if err != nil {
44
- return nil , err
47
+ return sapmExporter {} , err
45
48
}
46
49
47
50
client , err := sapmclient .New (cfg .clientOptions ()... )
48
51
if err != nil {
49
- return nil , err
52
+ return sapmExporter {} , err
50
53
}
51
- se := sapmExporter {
54
+ return sapmExporter {
52
55
client : client ,
53
56
logger : params .Logger ,
57
+ config : cfg ,
58
+ }, err
59
+ }
60
+
61
+ func newSAPMTraceExporter (cfg * Config , params component.ExporterCreateParams ) (component.TraceExporter , error ) {
62
+ se , err := newSAPMExporter (cfg , params )
63
+ if err != nil {
64
+ return nil , err
54
65
}
66
+
55
67
return exporterhelper .NewTraceExporter (
56
68
cfg ,
57
69
se .pushTraceData ,
58
70
exporterhelper .WithShutdown (se .Shutdown ))
59
71
}
60
72
61
- // pushTraceData exports traces in SAPM proto and returns number of dropped spans and error if export failed
62
- func (se * sapmExporter ) pushTraceData (ctx context.Context , td pdata.Traces ) (droppedSpansCount int , err error ) {
63
- batches , err := jaeger .InternalTracesToJaegerProto (td )
64
- if err != nil {
65
- return td .SpanCount (), consumererror .Permanent (err )
73
+ // tracesByAccessToken takes a pdata.Traces struct and will iterate through its ResourceSpans' attributes,
74
+ // regrouping by any SFx access token label value if Config.AccessTokenPassthrough is enabled. It will delete any
75
+ // set token label in any case to prevent serialization.
76
+ // It returns a map of newly constructed pdata.Traces keyed by access token, defaulting to empty string.
77
+ func (se * sapmExporter ) tracesByAccessToken (td pdata.Traces ) map [string ]pdata.Traces {
78
+ tracesByToken := make (map [string ]pdata.Traces , 1 )
79
+ resourceSpans := td .ResourceSpans ()
80
+ for i := 0 ; i < resourceSpans .Len (); i ++ {
81
+ resourceSpan := resourceSpans .At (i )
82
+ if resourceSpan .IsNil () {
83
+ // Invalid trace so nothing to export
84
+ continue
85
+ }
86
+
87
+ accessToken := ""
88
+ if ! resourceSpan .Resource ().IsNil () {
89
+ attrs := resourceSpan .Resource ().Attributes ()
90
+ attributeValue , ok := attrs .Get (splunk .SFxAccessTokenLabel )
91
+ if ok {
92
+ attrs .Delete (splunk .SFxAccessTokenLabel )
93
+ if se .config .AccessTokenPassthrough {
94
+ accessToken = attributeValue .StringVal ()
95
+ }
96
+ }
97
+ }
98
+
99
+ traceForToken , ok := tracesByToken [accessToken ]
100
+ if ! ok {
101
+ traceForToken = pdata .NewTraces ()
102
+ tracesByToken [accessToken ] = traceForToken
103
+ }
104
+
105
+ // Append ResourceSpan to trace for this access token
106
+ traceForTokenSize := traceForToken .ResourceSpans ().Len ()
107
+ traceForToken .ResourceSpans ().Resize (traceForTokenSize + 1 )
108
+ traceForToken .ResourceSpans ().At (traceForTokenSize ).InitEmpty ()
109
+ resourceSpan .CopyTo (traceForToken .ResourceSpans ().At (traceForTokenSize ))
66
110
}
67
- err = se .client .Export (ctx , batches )
68
- if err != nil {
69
- if sendErr , ok := err .(* sapmclient.ErrSend ); ok {
70
- if sendErr .Permanent {
71
- return 0 , consumererror .Permanent (sendErr )
111
+
112
+ return tracesByToken
113
+ }
114
+
115
+ // pushTraceData exports traces in SAPM proto by associated SFx access token and returns number of dropped spans
116
+ // and the last experienced error if any translation or export failed
117
+ func (se * sapmExporter ) pushTraceData (ctx context.Context , td pdata.Traces ) (droppedSpansCount int , err error ) {
118
+ traces := se .tracesByAccessToken (td )
119
+ droppedSpansCount = 0
120
+ for accessToken , trace := range traces {
121
+ batches , translateErr := jaeger .InternalTracesToJaegerProto (trace )
122
+ if translateErr != nil {
123
+ droppedSpansCount += trace .SpanCount ()
124
+ err = consumererror .Permanent (translateErr )
125
+ continue
126
+ }
127
+
128
+ exportErr := se .client .ExportWithAccessToken (ctx , batches , accessToken )
129
+ if exportErr != nil {
130
+ if sendErr , ok := exportErr .(* sapmclient.ErrSend ); ok {
131
+ if sendErr .Permanent {
132
+ err = consumererror .Permanent (sendErr )
133
+ }
72
134
}
135
+ droppedSpansCount += trace .SpanCount ()
73
136
}
74
- return td .SpanCount (), err
75
137
}
76
- return 0 , nil
138
+ return
77
139
}
0 commit comments