Skip to content

Commit 186038a

Browse files
add token get from context with highest priority
1 parent 570f619 commit 186038a

File tree

3 files changed

+252
-4
lines changed

3 files changed

+252
-4
lines changed

exporter/signalfxexporter/dpclient.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"sync"
1616

1717
sfxpb "github.com/signalfx/com_signalfx_metrics_protobuf/model"
18+
"go.opentelemetry.io/collector/client"
1819
"go.opentelemetry.io/collector/consumer/consumererror"
1920
"go.opentelemetry.io/collector/pdata/pmetric"
2021
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
@@ -88,7 +89,7 @@ func (s *sfxDPClient) pushMetricsData(
8889
}
8990

9091
// All metrics in the pmetric.Metrics will have the same access token because of the BatchPerResourceMetrics.
91-
metricToken := s.retrieveAccessToken(rms.At(0))
92+
metricToken := s.retrieveAccessToken(ctx, rms.At(0))
9293

9394
// export SFx format
9495
sfxDataPoints := s.converter.MetricsToSignalFxV2(md)
@@ -194,12 +195,18 @@ func (s *sfxDPClient) encodeBody(dps []*sfxpb.DataPoint) (bodyReader io.Reader,
194195
return s.getReader(body)
195196
}
196197

197-
func (s *sfxDPClient) retrieveAccessToken(md pmetric.ResourceMetrics) string {
198+
func (s *sfxDPClient) retrieveAccessToken(ctx context.Context, md pmetric.ResourceMetrics) string {
198199
if !s.accessTokenPassthrough {
199200
// Nothing to do if token is pass through not configured or resource is nil.
200201
return ""
201202
}
202203

204+
cl := client.FromContext(ctx)
205+
ss := cl.Metadata.Get(splunk.SFxAccessTokenHeader)
206+
if len(ss) > 0 {
207+
return ss[0]
208+
}
209+
203210
attrs := md.Resource().Attributes()
204211
if accessToken, ok := attrs.Get(splunk.SFxAccessTokenLabel); ok {
205212
return accessToken.Str()

exporter/signalfxexporter/eventclient.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"strings"
1212

1313
sfxpb "github.com/signalfx/com_signalfx_metrics_protobuf/model"
14+
"go.opentelemetry.io/collector/client"
1415
"go.opentelemetry.io/collector/consumer/consumererror"
1516
"go.opentelemetry.io/collector/pdata/pcommon"
1617
"go.opentelemetry.io/collector/pdata/plog"
@@ -33,7 +34,7 @@ func (s *sfxEventClient) pushLogsData(ctx context.Context, ld plog.Logs) (int, e
3334
return 0, nil
3435
}
3536

36-
accessToken := s.retrieveAccessToken(rls.At(0))
37+
accessToken := s.retrieveAccessToken(ctx, rls.At(0))
3738

3839
var sfxEvents []*sfxpb.Event
3940
numDroppedLogRecords := 0
@@ -104,7 +105,18 @@ func (s *sfxEventClient) encodeBody(events []*sfxpb.Event) (bodyReader io.Reader
104105
return s.getReader(body)
105106
}
106107

107-
func (s *sfxEventClient) retrieveAccessToken(rl plog.ResourceLogs) string {
108+
func (s *sfxEventClient) retrieveAccessToken(ctx context.Context, rl plog.ResourceLogs) string {
109+
if !s.accessTokenPassthrough {
110+
// Nothing to do if token is pass through not configured or resource is nil.
111+
return ""
112+
}
113+
114+
cl := client.FromContext(ctx)
115+
ss := cl.Metadata.Get(splunk.SFxAccessTokenHeader)
116+
if len(ss) > 0 {
117+
return ss[0]
118+
}
119+
108120
attrs := rl.Resource().Attributes()
109121
if accessToken, ok := attrs.Get(splunk.SFxAccessTokenLabel); ok && accessToken.Type() == pcommon.ValueTypeStr {
110122
return accessToken.Str()

exporter/signalfxexporter/exporter_test.go

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
sfxpb "github.com/signalfx/com_signalfx_metrics_protobuf/model"
2424
"github.com/stretchr/testify/assert"
2525
"github.com/stretchr/testify/require"
26+
"go.opentelemetry.io/collector/client"
2627
"go.opentelemetry.io/collector/component/componenttest"
2728
"go.opentelemetry.io/collector/config/confighttp"
2829
"go.opentelemetry.io/collector/config/configopaque"
@@ -567,6 +568,138 @@ func TestConsumeMetricsWithAccessTokenPassthrough(t *testing.T) {
567568
}
568569
}
569570

571+
func TestConsumeMetricsAccessTokenPassthroughPriorityToContext(t *testing.T) {
572+
fromHeaders := "AccessTokenFromClientHeaders"
573+
fromLabels := []string{"AccessTokenFromLabel0", "AccessTokenFromLabel1"}
574+
fromContext := "AccessTokenFromContext"
575+
576+
validMetricsWithToken := func(includeToken bool, token string, histogram bool) pmetric.Metrics {
577+
out := pmetric.NewMetrics()
578+
rm := out.ResourceMetrics().AppendEmpty()
579+
580+
if includeToken {
581+
rm.Resource().Attributes().PutStr("com.splunk.signalfx.access_token", token)
582+
}
583+
584+
ilm := rm.ScopeMetrics().AppendEmpty()
585+
m := ilm.Metrics().AppendEmpty()
586+
587+
if histogram {
588+
buildHistogram(m, "test_histogram", pcommon.Timestamp(100000000), 1)
589+
} else {
590+
m.SetName("test_gauge")
591+
592+
dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
593+
dp.Attributes().PutStr("k0", "v0")
594+
dp.Attributes().PutStr("k1", "v1")
595+
dp.SetDoubleValue(123)
596+
}
597+
598+
return out
599+
}
600+
601+
tests := []struct {
602+
name string
603+
accessTokenPassthrough bool
604+
metrics pmetric.Metrics
605+
additionalHeaders map[string]string
606+
pushedTokens []string
607+
sendOTLPHistograms bool
608+
inContext bool
609+
}{
610+
{
611+
name: "passthrough access token and included in md",
612+
accessTokenPassthrough: true,
613+
inContext: true,
614+
metrics: validMetricsWithToken(true, fromLabels[0], false),
615+
pushedTokens: []string{fromContext},
616+
},
617+
{
618+
name: "passthrough access token and not included in md",
619+
accessTokenPassthrough: true,
620+
inContext: true,
621+
metrics: validMetricsWithToken(false, fromLabels[0], false),
622+
pushedTokens: []string{fromContext},
623+
sendOTLPHistograms: false,
624+
},
625+
{
626+
name: "passthrough access token and included in md",
627+
accessTokenPassthrough: true,
628+
inContext: false,
629+
metrics: validMetricsWithToken(true, fromLabels[0], false),
630+
pushedTokens: []string{fromLabels[0]},
631+
},
632+
{
633+
name: "passthrough access token and not included in md",
634+
accessTokenPassthrough: true,
635+
inContext: false,
636+
metrics: validMetricsWithToken(false, fromLabels[0], false),
637+
pushedTokens: []string{fromHeaders},
638+
sendOTLPHistograms: false,
639+
},
640+
}
641+
for _, tt := range tests {
642+
receivedTokens := struct {
643+
sync.Mutex
644+
tokens []string
645+
}{}
646+
receivedTokens.tokens = []string{}
647+
t.Run(tt.name, func(t *testing.T) {
648+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
649+
assert.Equal(t, tt.name, r.Header.Get("test_header_"))
650+
receivedTokens.Lock()
651+
652+
token := r.Header.Get("x-sf-token")
653+
receivedTokens.tokens = append(receivedTokens.tokens, token)
654+
655+
receivedTokens.Unlock()
656+
w.WriteHeader(http.StatusAccepted)
657+
}))
658+
defer server.Close()
659+
660+
factory := NewFactory()
661+
cfg := factory.CreateDefaultConfig().(*Config)
662+
cfg.IngestURL = server.URL
663+
cfg.APIURL = server.URL
664+
cfg.ClientConfig.Headers = make(map[string]configopaque.String)
665+
for k, v := range tt.additionalHeaders {
666+
cfg.ClientConfig.Headers[k] = configopaque.String(v)
667+
}
668+
cfg.ClientConfig.Headers["test_header_"] = configopaque.String(tt.name)
669+
cfg.AccessToken = configopaque.String(fromHeaders)
670+
cfg.AccessTokenPassthrough = tt.accessTokenPassthrough
671+
cfg.SendOTLPHistograms = tt.sendOTLPHistograms
672+
sfxExp, err := NewFactory().CreateMetrics(context.Background(), exportertest.NewNopSettings(), cfg)
673+
require.NoError(t, err)
674+
ctx := context.Background()
675+
if tt.inContext {
676+
ctx = client.NewContext(
677+
ctx,
678+
client.Info{Metadata: client.NewMetadata(
679+
map[string][]string{splunk.SFxAccessTokenHeader: {fromContext}},
680+
)},
681+
)
682+
}
683+
require.NoError(t, sfxExp.Start(ctx, componenttest.NewNopHost()))
684+
defer func() {
685+
require.NoError(t, sfxExp.Shutdown(context.Background()))
686+
}()
687+
688+
err = sfxExp.ConsumeMetrics(ctx, tt.metrics)
689+
690+
assert.NoError(t, err)
691+
require.Eventually(t, func() bool {
692+
receivedTokens.Lock()
693+
defer receivedTokens.Unlock()
694+
return len(tt.pushedTokens) == len(receivedTokens.tokens)
695+
}, 1*time.Second, 10*time.Millisecond)
696+
sort.Strings(tt.pushedTokens)
697+
sort.Strings(receivedTokens.tokens)
698+
assert.Equal(t, tt.pushedTokens, receivedTokens.tokens)
699+
})
700+
}
701+
}
702+
570703
func TestNewEventExporter(t *testing.T) {
571704
got, err := newEventExporter(nil, exportertest.NewNopSettings())
572705
assert.EqualError(t, err, "nil config")
@@ -812,6 +945,102 @@ func TestConsumeLogsDataWithAccessTokenPassthrough(t *testing.T) {
812945
}
813946
}
814947

948+
func TestConsumeLogsAccessTokenPassthrough(t *testing.T) {
949+
fromHeaders := "AccessTokenFromClientHeaders"
950+
fromLabels := "AccessTokenFromLabel"
951+
fromContext := "AccessTokenFromContext"
952+
953+
newLogData := func(includeToken bool) plog.Logs {
954+
out := makeSampleResourceLogs()
955+
makeSampleResourceLogs().ResourceLogs().At(0).CopyTo(out.ResourceLogs().AppendEmpty())
956+
957+
if includeToken {
958+
out.ResourceLogs().At(0).Resource().Attributes().PutStr("com.splunk.signalfx.access_token", fromLabels)
959+
out.ResourceLogs().At(1).Resource().Attributes().PutStr("com.splunk.signalfx.access_token", fromLabels)
960+
}
961+
return out
962+
}
963+
964+
tests := []struct {
965+
name string
966+
accessTokenPassthrough bool
967+
includedInLogData bool
968+
inContext bool
969+
expectedToken string
970+
}{
971+
{
972+
name: "passthrough access token and not included in request context",
973+
inContext: true,
974+
accessTokenPassthrough: true,
975+
includedInLogData: true,
976+
expectedToken: fromContext,
977+
},
978+
{
979+
name: "passthrough access token and included in logs",
980+
inContext: false,
981+
accessTokenPassthrough: true,
982+
includedInLogData: true,
983+
expectedToken: fromLabels,
984+
},
985+
{
986+
name: "passthrough access token and not included in logs",
987+
inContext: false,
988+
accessTokenPassthrough: false,
989+
includedInLogData: false,
990+
expectedToken: fromHeaders,
991+
},
992+
}
993+
for _, tt := range tests {
994+
t.Run(tt.name, func(t *testing.T) {
995+
receivedTokens := struct {
996+
sync.Mutex
997+
tokens []string
998+
}{}
999+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1000+
assert.Equal(t, tt.name, r.Header.Get("test_header_"))
1001+
receivedTokens.Lock()
1002+
receivedTokens.tokens = append(receivedTokens.tokens, r.Header.Get("x-sf-token"))
1003+
receivedTokens.Unlock()
1004+
w.WriteHeader(http.StatusAccepted)
1005+
}))
1006+
defer server.Close()
1007+
1008+
factory := NewFactory()
1009+
cfg := factory.CreateDefaultConfig().(*Config)
1010+
cfg.IngestURL = server.URL
1011+
cfg.APIURL = server.URL
1012+
cfg.Headers = make(map[string]configopaque.String)
1013+
cfg.Headers["test_header_"] = configopaque.String(tt.name)
1014+
cfg.AccessToken = configopaque.String(fromHeaders)
1015+
cfg.AccessTokenPassthrough = tt.accessTokenPassthrough
1016+
sfxExp, err := NewFactory().CreateLogs(context.Background(), exportertest.NewNopSettings(), cfg)
1017+
require.NoError(t, err)
1018+
require.NoError(t, sfxExp.Start(context.Background(), componenttest.NewNopHost()))
1019+
defer func() {
1020+
require.NoError(t, sfxExp.Shutdown(context.Background()))
1021+
}()
1022+
1023+
ctx := context.Background()
1024+
if tt.inContext {
1025+
ctx = client.NewContext(
1026+
ctx,
1027+
client.Info{Metadata: client.NewMetadata(
1028+
map[string][]string{splunk.SFxAccessTokenHeader: {"AccessTokenFromContext"}},
1029+
)},
1030+
)
1031+
}
1032+
assert.NoError(t, sfxExp.ConsumeLogs(ctx, newLogData(tt.includedInLogData)))
1033+
1034+
require.Eventually(t, func() bool {
1035+
receivedTokens.Lock()
1036+
defer receivedTokens.Unlock()
1037+
return len(receivedTokens.tokens) == 1
1038+
}, 1*time.Second, 10*time.Millisecond)
1039+
assert.Equal(t, tt.expectedToken, receivedTokens.tokens[0])
1040+
})
1041+
}
1042+
}
1043+
8151044
func generateLargeDPBatch() pmetric.Metrics {
8161045
md := pmetric.NewMetrics()
8171046
md.ResourceMetrics().EnsureCapacity(6500)

0 commit comments

Comments
 (0)