Skip to content

Commit ef00455

Browse files
jinja2XinRanZhAWS
authored andcommitted
[exporter/signalfx] Add option to send histograms in OTLP format (open-telemetry#31197)
**Description:** This PR introduces a new config `send_otlp_histograms` to the `signalfx` exporter. When the option is enabled. the exporter will send histograms in OTLP format to the Splunk Observability backend. **Link to tracking Issue:** [26298](open-telemetry#26298)
1 parent afd8819 commit ef00455

File tree

18 files changed

+1333
-82
lines changed

18 files changed

+1333
-82
lines changed

.chloggen/signalfx-exp-otlp.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: exporter/signalfx
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Send histograms in otlp format with new config `send_otlp_histograms` option
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [26298]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

.chloggen/signalfx-recv-otlp.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ component: receiver/signalfx
1010
note: Accept otlp protobuf requests when content-type is "application/x-protobuf;format=otlp"
1111

1212
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13-
issues: [31052]
13+
issues: [26298]
1414

1515
# (Optional) One or more lines of additional information to render under the primary note.
1616
# These lines will be padded with 2 spaces and then inserted directly into the document.

exporter/signalfxexporter/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,8 @@ will be replaced with a `_`.
145145
api_tls:
146146
ca_file: "/etc/opt/certs/ca.pem"
147147
```
148-
- `drop_histogram_buckets`: (default = `false`) if set to true, histogram buckets will not be translated into datapoints with `_bucket` suffix but will be dropped instead, only datapoints with `_sum`, `_count`, `_min` (optional) and `_max` (optional) suffixes will be sent.
148+
- `drop_histogram_buckets`: (default = `false`) if set to true, histogram buckets will not be translated into datapoints with `_bucket` suffix but will be dropped instead, only datapoints with `_sum`, `_count`, `_min` (optional) and `_max` (optional) suffixes will be sent. Please note that this option does not apply to histograms sent in OTLP format with `send_otlp_histograms` enabled.
149+
- `send_otlp_histograms`: (default: `false`) if set to true, any histogram metrics receiver by the exporter will be sent to Splunk Observability backend in OTLP format without conversion to SignalFx format. This can only be enabled if the Splunk Observability environment (realm) has the new Histograms feature rolled out. Please note that histograms sent in OTLP format do not apply to the exporter configurations `include_metrics` and `exclude_metrics`.
149150
In addition, this exporter offers queued retry which is enabled by default.
150151
Information about queued retry configuration parameters can be found
151152
[here](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md).

exporter/signalfxexporter/config.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ type Config struct {
112112

113113
// ExcludeMetrics defines dpfilter.MetricFilters that will determine metrics to be
114114
// excluded from sending to SignalFx backend. If translations enabled with
115-
// TranslationRules options, the exclusion will be applie on translated metrics.
115+
// TranslationRules options, the exclusion will be applied on translated metrics.
116116
ExcludeMetrics []dpfilters.MetricFilter `mapstructure:"exclude_metrics"`
117117

118118
// IncludeMetrics defines dpfilter.MetricFilters to override exclusion any of metric.
@@ -134,6 +134,10 @@ type Config struct {
134134
// Whether to drop histogram bucket metrics dispatched to Splunk Observability.
135135
// Default value is set to false.
136136
DropHistogramBuckets bool `mapstructure:"drop_histogram_buckets"`
137+
138+
// Whether to send histogram metrics in OTLP format to Splunk Observability.
139+
// Default value is set to false.
140+
SendOTLPHistograms bool `mapstructure:"send_otlp_histograms"`
137141
}
138142

139143
type DimensionClientConfig struct {

exporter/signalfxexporter/config_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ func TestLoadConfig(t *testing.T) {
104104
},
105105
},
106106
NonAlphanumericDimensionChars: "_-.",
107+
SendOTLPHistograms: false,
107108
},
108109
},
109110
{
@@ -263,6 +264,7 @@ func TestLoadConfig(t *testing.T) {
263264
},
264265
},
265266
NonAlphanumericDimensionChars: "_-.",
267+
SendOTLPHistograms: true,
266268
},
267269
},
268270
}

exporter/signalfxexporter/dpclient.go

Lines changed: 125 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,20 @@ import (
1717
sfxpb "github.com/signalfx/com_signalfx_metrics_protobuf/model"
1818
"go.opentelemetry.io/collector/consumer/consumererror"
1919
"go.opentelemetry.io/collector/pdata/pmetric"
20+
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
2021
"go.uber.org/zap"
2122

2223
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/translation"
24+
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/utils"
2325
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
2426
)
2527

28+
const (
29+
contentEncodingHeader = "Content-Encoding"
30+
contentTypeHeader = "Content-Type"
31+
otlpProtobufContentType = "application/x-protobuf;format=otlp"
32+
)
33+
2634
type sfxClientBase struct {
2735
ingestURL *url.URL
2836
headers map[string]string
@@ -58,6 +66,7 @@ type sfxDPClient struct {
5866
logger *zap.Logger
5967
accessTokenPassthrough bool
6068
converter *translation.MetricsConverter
69+
sendOTLPHistograms bool
6170
}
6271

6372
func (s *sfxDPClient) pushMetricsData(
@@ -81,48 +90,55 @@ func (s *sfxDPClient) pushMetricsData(
8190
// All metrics in the pmetric.Metrics will have the same access token because of the BatchPerResourceMetrics.
8291
metricToken := s.retrieveAccessToken(rms.At(0))
8392

93+
// export SFx format
8494
sfxDataPoints := s.converter.MetricsToSignalFxV2(md)
85-
if s.logDataPoints {
86-
for _, dp := range sfxDataPoints {
87-
s.logger.Debug("Dispatching SFx datapoint", zap.Stringer("dp", dp))
95+
if len(sfxDataPoints) > 0 {
96+
droppedCount, err := s.pushMetricsDataForToken(ctx, sfxDataPoints, metricToken)
97+
if err != nil {
98+
return droppedCount, err
8899
}
89100
}
90-
return s.pushMetricsDataForToken(ctx, sfxDataPoints, metricToken)
91-
}
92101

93-
func (s *sfxDPClient) pushMetricsDataForToken(ctx context.Context, sfxDataPoints []*sfxpb.DataPoint, accessToken string) (int, error) {
94-
body, compressed, err := s.encodeBody(sfxDataPoints)
95-
if err != nil {
96-
return len(sfxDataPoints), consumererror.NewPermanent(err)
102+
// export any histograms in otlp if sendOTLPHistograms is true
103+
if s.sendOTLPHistograms {
104+
histogramData, metricCount := utils.GetHistograms(md)
105+
if metricCount > 0 {
106+
droppedCount, err := s.pushOTLPMetricsDataForToken(ctx, histogramData, metricToken)
107+
if err != nil {
108+
return droppedCount, err
109+
}
110+
}
97111
}
98112

113+
return 0, nil
114+
115+
}
116+
117+
func (s *sfxDPClient) postData(ctx context.Context, body io.Reader, headers map[string]string) error {
99118
datapointURL := *s.ingestURL
100119
if !strings.HasSuffix(datapointURL.Path, "v2/datapoint") {
101120
datapointURL.Path = path.Join(datapointURL.Path, "v2/datapoint")
102121
}
103122
req, err := http.NewRequestWithContext(ctx, "POST", datapointURL.String(), body)
104123
if err != nil {
105-
return len(sfxDataPoints), consumererror.NewPermanent(err)
124+
return consumererror.NewPermanent(err)
106125
}
107126

127+
// Set the headers configured in sfxDPClient
108128
for k, v := range s.headers {
109129
req.Header.Set(k, v)
110130
}
111131

112-
// Override access token in headers map if it's non empty.
113-
if accessToken != "" {
114-
req.Header.Set(splunk.SFxAccessTokenHeader, accessToken)
115-
}
116-
117-
if compressed {
118-
req.Header.Set("Content-Encoding", "gzip")
132+
// Set any extra headers passed by the caller
133+
for k, v := range headers {
134+
req.Header.Set(k, v)
119135
}
120136

121137
// TODO: Mark errors as partial errors wherever applicable when, partial
122138
// error for metrics is available.
123139
resp, err := s.client.Do(req)
124140
if err != nil {
125-
return len(sfxDataPoints), err
141+
return err
126142
}
127143

128144
defer func() {
@@ -132,7 +148,39 @@ func (s *sfxDPClient) pushMetricsDataForToken(ctx context.Context, sfxDataPoints
132148

133149
err = splunk.HandleHTTPCode(resp)
134150
if err != nil {
135-
return len(sfxDataPoints), err
151+
return err
152+
}
153+
return nil
154+
}
155+
156+
func (s *sfxDPClient) pushMetricsDataForToken(ctx context.Context, sfxDataPoints []*sfxpb.DataPoint, accessToken string) (int, error) {
157+
158+
if s.logDataPoints {
159+
for _, dp := range sfxDataPoints {
160+
s.logger.Debug("Dispatching SFx datapoint", zap.Stringer("dp", dp))
161+
}
162+
}
163+
164+
body, compressed, err := s.encodeBody(sfxDataPoints)
165+
dataPointCount := len(sfxDataPoints)
166+
if err != nil {
167+
return dataPointCount, consumererror.NewPermanent(err)
168+
}
169+
170+
headers := make(map[string]string)
171+
172+
// Override access token in headers map if it's non empty.
173+
if accessToken != "" {
174+
headers[splunk.SFxAccessTokenHeader] = accessToken
175+
}
176+
177+
if compressed {
178+
headers[contentEncodingHeader] = "gzip"
179+
}
180+
181+
err = s.postData(ctx, body, headers)
182+
if err != nil {
183+
return dataPointCount, err
136184
}
137185
return 0, nil
138186
}
@@ -160,3 +208,61 @@ func (s *sfxDPClient) retrieveAccessToken(md pmetric.ResourceMetrics) string {
160208
}
161209
return ""
162210
}
211+
212+
func (s *sfxDPClient) pushOTLPMetricsDataForToken(ctx context.Context, mh pmetric.Metrics, accessToken string) (int, error) {
213+
214+
dataPointCount := mh.DataPointCount()
215+
if s.logDataPoints {
216+
s.logger.Debug("Count of metrics to send in OTLP format",
217+
zap.Int("resource metrics", mh.ResourceMetrics().Len()),
218+
zap.Int("metrics", mh.MetricCount()),
219+
zap.Int("data points", dataPointCount))
220+
buf, err := metricsMarshaler.MarshalMetrics(mh)
221+
if err != nil {
222+
s.logger.Error("Failed to marshal metrics for logging otlp histograms", zap.Error(err))
223+
} else {
224+
s.logger.Debug("Dispatching OTLP metrics", zap.String("pmetrics", string(buf)))
225+
}
226+
}
227+
228+
body, compressed, err := s.encodeOTLPBody(mh)
229+
if err != nil {
230+
return dataPointCount, consumererror.NewPermanent(err)
231+
}
232+
233+
headers := make(map[string]string)
234+
235+
// Set otlp content-type header
236+
headers[contentTypeHeader] = otlpProtobufContentType
237+
238+
// Override access token in headers map if it's non-empty.
239+
if accessToken != "" {
240+
headers[splunk.SFxAccessTokenHeader] = accessToken
241+
}
242+
243+
if compressed {
244+
headers[contentEncodingHeader] = "gzip"
245+
}
246+
247+
s.logger.Debug("Sending metrics in OTLP format")
248+
249+
err = s.postData(ctx, body, headers)
250+
251+
if err != nil {
252+
return dataPointCount, consumererror.NewMetrics(err, mh)
253+
}
254+
255+
return 0, nil
256+
}
257+
258+
func (s *sfxDPClient) encodeOTLPBody(md pmetric.Metrics) (bodyReader io.Reader, compressed bool, err error) {
259+
260+
tr := pmetricotlp.NewExportRequestFromMetrics(md)
261+
262+
body, err := tr.MarshalProto()
263+
264+
if err != nil {
265+
return nil, false, err
266+
}
267+
return s.getReader(body)
268+
}

exporter/signalfxexporter/exporter.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ func newSignalFxExporter(
8484
config.IncludeMetrics,
8585
config.NonAlphanumericDimensionChars,
8686
config.DropHistogramBuckets,
87+
!config.SendOTLPHistograms, // if SendOTLPHistograms is true, do not process histograms when converting to SFx
8788
)
8889
if err != nil {
8990
return nil, fmt.Errorf("failed to create metric converter: %w", err)
@@ -121,6 +122,7 @@ func (se *signalfxExporter) start(ctx context.Context, host component.Host) (err
121122
logger: se.logger,
122123
accessTokenPassthrough: se.config.AccessTokenPassthrough,
123124
converter: se.converter,
125+
sendOTLPHistograms: se.config.SendOTLPHistograms,
124126
}
125127

126128
apiTLSCfg, err := se.config.APITLSSettings.LoadTLSConfig()

0 commit comments

Comments
 (0)