Skip to content

Commit f361e4d

Browse files
Add signalfx metric token passthrough (#325)
Adding an `access_token_passthrough` configuration option and feature to the SignalFx metric receiver and exporter to allow continued association of datapoints with an [SFx access token](https://docs.signalfx.com/en/latest/admin-guide/tokens.html). This will provide the option having collector-proxied datapoints keep the initial access token set by metric clients instead of requiring all datapoints being tied to the access token sourced from the SFx exporter configuration. **Testing:** Updated relevant unit tests and tested local deployment with multiple clients and batch processor. **Documentation:** Updated readme for both receiver and exporter.
1 parent bb6c8c2 commit f361e4d

File tree

17 files changed

+274
-7
lines changed

17 files changed

+274
-7
lines changed

exporter/signalfxexporter/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ be used instead. If path is not specified, `/v2/datapoint` is used.
2626
If `realm` is set, this option is derived and will be `https://api.{realm}.signalfx.com/`. If a value is explicitly
2727
set, the value of `realm` will not be used in determining `api_url`. The explicit value will be used instead.
2828
- `log_dimension_updates` (default = `false`): Whether or not to log dimension updates.
29+
- `access_token_passthrough`: (default = `true`) Whether to use `"com.splunk.signalfx.access_token"` metric resource label, if any, as SFx access token. In either case this label will be dropped during final translation. Intended to be used in tandem with identical configuration option for [SignalFx receiver](../../receiver/signalfxreceiver/README.md) to preserve datapoint origin.
2930

3031
Note: Either `realm` or both `ingest_url` and `api_url` should be explicitly set.
3132

@@ -35,6 +36,7 @@ Example:
3536
exporters:
3637
signalfx:
3738
access_token: <replace_with_actual_access_token>
39+
access_token_passthrough: true
3840
headers:
3941
added-entry: "added value"
4042
dot.test: test

exporter/signalfxexporter/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"time"
2323

2424
"go.opentelemetry.io/collector/config/configmodels"
25+
26+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/splunk"
2527
)
2628

2729
// Config defines configuration for SignalFx exporter.
@@ -58,6 +60,8 @@ type Config struct {
5860

5961
// Whether to log dimension updates being sent to SignalFx.
6062
LogDimensionUpdates bool `mapstructure:"log_dimension_updates"`
63+
64+
splunk.AccessTokenPassthroughConfig `mapstructure:",squash"`
6165
}
6266

6367
func (cfg *Config) getOptionsFromConfig() (*exporterOptions, error) {

exporter/signalfxexporter/config_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import (
2525
"go.opentelemetry.io/collector/config"
2626
"go.opentelemetry.io/collector/config/configmodels"
2727
"go.uber.org/zap"
28+
29+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/splunk"
2830
)
2931

3032
func TestLoadConfig(t *testing.T) {
@@ -60,6 +62,9 @@ func TestLoadConfig(t *testing.T) {
6062
"dot.test": "test",
6163
},
6264
Timeout: 2 * time.Second,
65+
AccessTokenPassthroughConfig: splunk.AccessTokenPassthroughConfig{
66+
AccessTokenPassthrough: false,
67+
},
6368
}
6469
assert.Equal(t, &expectedCfg, e1)
6570

exporter/signalfxexporter/dpclient.go

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,22 +31,25 @@ import (
3131
"go.opentelemetry.io/collector/consumer/consumererror"
3232
"go.opentelemetry.io/collector/exporter/exporterhelper"
3333
"go.uber.org/zap"
34+
35+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/splunk"
3436
)
3537

3638
// sfxDPClient sends the data to the SignalFx backend.
3739
type sfxDPClient struct {
38-
ingestURL *url.URL
39-
headers map[string]string
40-
client *http.Client
41-
logger *zap.Logger
42-
zippers sync.Pool
40+
ingestURL *url.URL
41+
headers map[string]string
42+
client *http.Client
43+
logger *zap.Logger
44+
zippers sync.Pool
45+
accessTokenPassthrough bool
4346
}
4447

4548
func (s *sfxDPClient) pushMetricsData(
4649
ctx context.Context,
4750
md consumerdata.MetricsData,
4851
) (droppedTimeSeries int, err error) {
49-
52+
accessToken := s.retrieveAccessToken(md)
5053
sfxDataPoints, numDroppedTimeseries, err := metricDataToSignalFxV2(s.logger, md)
5154
if err != nil {
5255
return exporterhelper.NumTimeSeries(md), consumererror.Permanent(err)
@@ -66,6 +69,10 @@ func (s *sfxDPClient) pushMetricsData(
6669
req.Header.Set(k, v)
6770
}
6871

72+
if s.accessTokenPassthrough && accessToken != "" {
73+
req.Header.Set(splunk.SFxAccessTokenHeader, accessToken)
74+
}
75+
6976
if compressed {
7077
req.Header.Set("Content-Encoding", "gzip")
7178
}
@@ -98,7 +105,7 @@ func buildHeaders(config *Config) (map[string]string, error) {
98105
}
99106

100107
if config.AccessToken != "" {
101-
headers["X-Sf-Token"] = config.AccessToken
108+
headers[splunk.SFxAccessTokenHeader] = config.AccessToken
102109
}
103110

104111
// Add any custom headers from the config. They will override the pre-defined
@@ -122,6 +129,16 @@ func (s *sfxDPClient) encodeBody(dps []*sfxpb.DataPoint) (bodyReader io.Reader,
122129
return s.getReader(body)
123130
}
124131

132+
func (s *sfxDPClient) retrieveAccessToken(md consumerdata.MetricsData) string {
133+
accessToken := ""
134+
if labels := md.Resource.GetLabels(); labels != nil {
135+
accessToken = labels[splunk.SFxAccessTokenLabel]
136+
// Drop internally passed access token in all cases
137+
delete(labels, splunk.SFxAccessTokenLabel)
138+
}
139+
return accessToken
140+
}
141+
125142
// avoid attempting to compress things that fit into a single ethernet frame
126143
func (s *sfxDPClient) getReader(b []byte) (io.Reader, bool, error) {
127144
var err error

exporter/signalfxexporter/exporter.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ func New(
8181
zippers: sync.Pool{New: func() interface{} {
8282
return gzip.NewWriter(nil)
8383
}},
84+
accessTokenPassthrough: config.AccessTokenPassthrough,
8485
}
8586

8687
dimClient := dimensions.NewDimensionClient(

exporter/signalfxexporter/exporter_test.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,103 @@ func TestConsumeMetricsData(t *testing.T) {
144144
}
145145
}
146146

147+
func TestConsumeMetricsDataWithAccessTokenPassthrough(t *testing.T) {
148+
fromHeaders := "AccessTokenFromClientHeaders"
149+
fromLabels := "AccessTokenFromLabel"
150+
151+
newMetricData := func(includeToken bool) consumerdata.MetricsData {
152+
md := consumerdata.MetricsData{
153+
Node: &commonpb.Node{
154+
ServiceInfo: &commonpb.ServiceInfo{Name: "test_signalfx"},
155+
},
156+
Resource: &resourcepb.Resource{
157+
Type: "test",
158+
Labels: map[string]string{
159+
"com.splunk.signalfx.access_token": fromLabels,
160+
},
161+
},
162+
Metrics: []*metricspb.Metric{
163+
metricstestutils.Gauge(
164+
"test_gauge",
165+
[]string{"k0", "k1"},
166+
metricstestutils.Timeseries(
167+
time.Now(),
168+
[]string{"v0", "v1"},
169+
metricstestutils.Double(time.Now(), 123))),
170+
},
171+
}
172+
if !includeToken {
173+
delete(md.Resource.Labels, "com.splunk.signalfx.access_token")
174+
}
175+
return md
176+
}
177+
178+
tests := []struct {
179+
name string
180+
accessTokenPassthrough bool
181+
includedInMetricData bool
182+
expectedToken string
183+
}{
184+
{
185+
name: "passthrough access token and included in md",
186+
accessTokenPassthrough: true,
187+
includedInMetricData: true,
188+
expectedToken: fromLabels,
189+
},
190+
{
191+
name: "passthrough access token and not included in md",
192+
accessTokenPassthrough: true,
193+
includedInMetricData: false,
194+
expectedToken: fromHeaders,
195+
},
196+
{
197+
name: "don't passthrough access token and included in md",
198+
accessTokenPassthrough: false,
199+
includedInMetricData: true,
200+
expectedToken: fromHeaders,
201+
},
202+
{
203+
name: "don't passthrough access token and not included in md",
204+
accessTokenPassthrough: false,
205+
includedInMetricData: false,
206+
expectedToken: fromHeaders,
207+
},
208+
}
209+
for _, tt := range tests {
210+
t.Run(tt.name, func(t *testing.T) {
211+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
212+
assert.Equal(t, "test", r.Header.Get("test_header_"))
213+
assert.Equal(t, tt.expectedToken, r.Header.Get("x-sf-token"))
214+
w.WriteHeader(http.StatusAccepted)
215+
}))
216+
defer server.Close()
217+
218+
serverURL, err := url.Parse(server.URL)
219+
assert.NoError(t, err)
220+
221+
dpClient := &sfxDPClient{
222+
ingestURL: serverURL,
223+
headers: map[string]string{
224+
"test_header_": "test",
225+
"X-Sf-Token": fromHeaders,
226+
},
227+
client: &http.Client{
228+
Timeout: 1 * time.Second,
229+
},
230+
logger: zap.NewNop(),
231+
zippers: sync.Pool{New: func() interface{} {
232+
return gzip.NewWriter(nil)
233+
}},
234+
accessTokenPassthrough: tt.accessTokenPassthrough,
235+
}
236+
237+
numDroppedTimeSeries, err := dpClient.pushMetricsData(context.Background(), newMetricData(tt.includedInMetricData))
238+
assert.Equal(t, 0, numDroppedTimeSeries)
239+
assert.NoError(t, err)
240+
})
241+
}
242+
}
243+
147244
func generateLargeBatch(t *testing.T) *consumerdata.MetricsData {
148245
md := &consumerdata.MetricsData{
149246
Node: &commonpb.Node{

exporter/signalfxexporter/factory.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"go.opentelemetry.io/collector/config/configerror"
2222
"go.opentelemetry.io/collector/config/configmodels"
2323
"go.uber.org/zap"
24+
25+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/splunk"
2426
)
2527

2628
const (
@@ -47,6 +49,9 @@ func (f *Factory) CreateDefaultConfig() configmodels.Exporter {
4749
NameVal: typeStr,
4850
},
4951
Timeout: defaultHTTPTimeout,
52+
AccessTokenPassthroughConfig: splunk.AccessTokenPassthroughConfig{
53+
AccessTokenPassthrough: true,
54+
},
5055
}
5156
}
5257

exporter/signalfxexporter/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.14
55
require (
66
github.com/census-instrumentation/opencensus-proto v0.2.1
77
github.com/golang/protobuf v1.3.5
8+
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.0.0
89
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver v0.0.0
910
github.com/signalfx/com_signalfx_metrics_protobuf v0.0.0-20190530013331-054be550cb49
1011
github.com/stretchr/testify v1.5.1

exporter/signalfxexporter/testdata/config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ exporters:
1414
headers:
1515
added-entry: "added value"
1616
dot.test: test
17+
access_token_passthrough: false
1718

1819
service:
1920
pipelines:

internal/common/splunk/common.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// Copyright 2020, OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package splunk
16+
17+
const (
18+
SFxAccessTokenHeader = "X-Sf-Token"
19+
SFxAccessTokenLabel = "com.splunk.signalfx.access_token"
20+
)
21+
22+
type AccessTokenPassthroughConfig struct {
23+
// Whether to associate datapoints with an organization access token received in request.
24+
AccessTokenPassthrough bool `mapstructure:"access_token_passthrough"`
25+
}

receiver/signalfxreceiver/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@ Example:
1212
receivers:
1313
signalfx:
1414
endpoint: localhost:7276
15+
access_token_passthrough: true
1516
tls:
1617
cert_file: /test.crt
1718
key_file: /test.key
1819
```
1920
2021
* `endpoint`: Address and port that the SignalFx receiver should bind to. Note that this must be 0.0.0.0:<port> instead of localhost if you want to receive spans from sources exporting to IPs other than localhost on the same host. For example, when the collector is deployed as a k8s deployment and exposed using a service.
22+
* `access_token_passthrough`: (default = `false`) Whether to preserve incoming access token (`X-Sf-Token` header value) as `"com.splunk.signalfx.access_token"` metric resource label. Can be used in tandem with identical configuration option for [SignalFx exporter](../../exporter/signalfxexporter/README.md) to preserve datapoint origin.
2123
* `tls`: This is an optional object used to specify if TLS should be used for incoming connections.
2224
* `cert_file`: Specifies the certificate file to use for TLS connection. Note: Both `key_file` and `cert_file` are required for TLS connection.
2325
* `key_file`: Specifies the key file to use for TLS connection. Note: Both `key_file` and `cert_file` are required for TLS connection.

receiver/signalfxreceiver/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ package signalfxreceiver
1717
import (
1818
"go.opentelemetry.io/collector/config/configmodels"
1919
"go.opentelemetry.io/collector/config/configtls"
20+
21+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/splunk"
2022
)
2123

2224
// Config defines configuration for the SignalFx receiver.
@@ -26,4 +28,6 @@ type Config struct {
2628
// Configures the receiver to use TLS.
2729
// The default value is nil, which will cause the receiver to not use TLS.
2830
TLSCredentials *configtls.TLSSetting `mapstructure:"tls, omitempty"`
31+
32+
splunk.AccessTokenPassthroughConfig `mapstructure:",squash"`
2933
}

receiver/signalfxreceiver/config_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"go.opentelemetry.io/collector/config"
2424
"go.opentelemetry.io/collector/config/configmodels"
2525
"go.opentelemetry.io/collector/config/configtls"
26+
27+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/splunk"
2628
)
2729

2830
func TestLoadConfig(t *testing.T) {
@@ -51,6 +53,9 @@ func TestLoadConfig(t *testing.T) {
5153
NameVal: "signalfx/allsettings",
5254
Endpoint: "localhost:8080",
5355
},
56+
AccessTokenPassthroughConfig: splunk.AccessTokenPassthroughConfig{
57+
AccessTokenPassthrough: true,
58+
},
5459
})
5560

5661
r2 := cfg.Receivers["signalfx/tls"].(*Config)
@@ -64,5 +69,8 @@ func TestLoadConfig(t *testing.T) {
6469
CertFile: "/test.crt",
6570
KeyFile: "/test.key",
6671
},
72+
AccessTokenPassthroughConfig: splunk.AccessTokenPassthroughConfig{
73+
AccessTokenPassthrough: false,
74+
},
6775
})
6876
}

receiver/signalfxreceiver/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
github.com/golang/protobuf v1.3.5
88
github.com/gorilla/mux v1.7.3
99
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter v0.0.0
10+
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.0.0
1011
github.com/signalfx/com_signalfx_metrics_protobuf v0.0.0-20190530013331-054be550cb49
1112
github.com/stretchr/testify v1.5.1
1213
go.opencensus.io v0.22.3

receiver/signalfxreceiver/receiver.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"time"
2626
"unsafe"
2727

28+
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
2829
"github.com/golang/protobuf/proto"
2930
"github.com/gorilla/mux"
3031
sfxpb "github.com/signalfx/com_signalfx_metrics_protobuf"
@@ -35,6 +36,8 @@ import (
3536
"go.opentelemetry.io/collector/obsreport"
3637
"go.opentelemetry.io/collector/translator/conventions"
3738
"go.uber.org/zap"
39+
40+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/splunk"
3841
)
3942

4043
const (
@@ -211,6 +214,18 @@ func (r *sfxReceiver) handleReq(resp http.ResponseWriter, req *http.Request) {
211214

212215
md, _ := SignalFxV2ToMetricsData(r.logger, msg.Datapoints)
213216

217+
if r.config.AccessTokenPassthrough {
218+
if accessToken := req.Header.Get(splunk.SFxAccessTokenHeader); accessToken != "" {
219+
if md.Resource == nil {
220+
md.Resource = &resourcepb.Resource{}
221+
}
222+
if md.Resource.Labels == nil {
223+
md.Resource.Labels = make(map[string]string, 1)
224+
}
225+
md.Resource.Labels[splunk.SFxAccessTokenLabel] = accessToken
226+
}
227+
}
228+
214229
err = r.nextConsumer.ConsumeMetricsData(ctx, *md)
215230
obsreport.EndMetricsReceiveOp(
216231
ctx,

0 commit comments

Comments
 (0)