4
4
package faroexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/faroexporter"
5
5
6
6
import (
7
+ "bytes"
7
8
"context"
9
+ "encoding/json"
8
10
"errors"
9
11
"fmt"
12
+ "io"
13
+ "net/http"
10
14
"net/url"
11
15
"runtime"
16
+ "strconv"
17
+ "sync"
18
+ "time"
12
19
20
+ faro "github.com/grafana/faro/pkg/go"
13
21
"go.opentelemetry.io/collector/component"
14
22
"go.opentelemetry.io/collector/consumer"
23
+ "go.opentelemetry.io/collector/consumer/consumererror"
15
24
"go.opentelemetry.io/collector/exporter"
25
+ "go.opentelemetry.io/collector/exporter/exporterhelper"
16
26
"go.opentelemetry.io/collector/pdata/plog"
17
27
"go.opentelemetry.io/collector/pdata/ptrace"
28
+ "go.uber.org/multierr"
18
29
"go.uber.org/zap"
30
+
31
+ farotranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/faro"
19
32
)
20
33
21
34
type faroExporter struct {
22
- config * Config
23
- // client *http.Client
35
+ config * Config
36
+ client * http.Client
24
37
logger * zap.Logger
25
38
settings component.TelemetrySettings
26
39
userAgent string
27
40
}
28
41
42
+ const (
43
+ headerRetryAfter = "Retry-After"
44
+ jsonContentType = "application/json"
45
+ )
46
+
29
47
func newExporter (cfg component.Config , set exporter.Settings ) (* faroExporter , error ) {
30
48
oCfg := cfg .(* Config )
31
49
@@ -47,16 +65,97 @@ func newExporter(cfg component.Config, set exporter.Settings) (*faroExporter, er
47
65
}, nil
48
66
}
49
67
50
- func (fe * faroExporter ) start (_ context.Context , _ component.Host ) error {
68
+ func (fe * faroExporter ) start (ctx context.Context , host component.Host ) error {
69
+ client , err := fe .config .ClientConfig .ToClient (ctx , host , fe .settings )
70
+ if err != nil {
71
+ return err
72
+ }
73
+ fe .client = client
51
74
return nil
52
75
}
53
76
54
- func (fe * faroExporter ) ConsumeTraces (_ context.Context , _ ptrace.Traces ) error {
55
- return nil
77
+ func (fe * faroExporter ) ConsumeTraces (ctx context.Context , td ptrace.Traces ) error {
78
+ fp , err := farotranslator .TranslateFromTraces (ctx , td )
79
+ if err != nil {
80
+ return fmt .Errorf ("failed to translate traces to faro payloads: %w" , err )
81
+ }
82
+ return fe .consume (ctx , fp )
56
83
}
57
84
58
- func (fe * faroExporter ) ConsumeLogs (_ context.Context , _ plog.Logs ) error {
59
- return nil
85
+ func (fe * faroExporter ) ConsumeLogs (ctx context.Context , ld plog.Logs ) error {
86
+ fp , err := farotranslator .TranslateFromLogs (ctx , ld )
87
+ if err != nil {
88
+ return fmt .Errorf ("failed to translate logs to faro payloads: %w" , err )
89
+ }
90
+ return fe .consume (ctx , fp )
91
+ }
92
+
93
+ func (fe * faroExporter ) export (ctx context.Context , fp * faro.Payload ) error {
94
+ fe .logger .Debug ("Preparing to make HTTP request" , zap .String ("endpoint" , fe .config .Endpoint ))
95
+ request , err := json .Marshal (fp )
96
+ if err != nil {
97
+ return consumererror .NewPermanent (err )
98
+ }
99
+ req , err := http .NewRequestWithContext (ctx , http .MethodPost , fe .config .Endpoint , bytes .NewReader (request ))
100
+ if err != nil {
101
+ return consumererror .NewPermanent (err )
102
+ }
103
+ req .Header .Set ("Content-Type" , jsonContentType )
104
+ req .Header .Set ("User-Agent" , fe .userAgent )
105
+
106
+ resp , err := fe .client .Do (req )
107
+ if err != nil {
108
+ return fmt .Errorf ("failed to make an HTTP request: %w" , err )
109
+ }
110
+ defer resp .Body .Close ()
111
+
112
+ if resp .StatusCode == http .StatusAccepted {
113
+ return nil
114
+ }
115
+
116
+ var errString string
117
+ var formattedErr error
118
+ bodyBytes , err := io .ReadAll (resp .Body )
119
+ bodyContent := "unknown response"
120
+ if err == nil {
121
+ bodyContent = string (bodyBytes )
122
+ }
123
+
124
+ errString = fmt .Sprintf (
125
+ "error exporting items, request to %s responded with HTTP Status Code %d, Message=%s" ,
126
+ fe .config .Endpoint , resp .StatusCode , bodyContent )
127
+ formattedErr = newStatusFromMsgAndHTTPCode (errString , resp .StatusCode ).Err ()
128
+
129
+ if isRetryableStatusCode (resp .StatusCode ) {
130
+ retryAfter := 0
131
+ isThrottleError := resp .StatusCode == http .StatusTooManyRequests || resp .StatusCode == http .StatusServiceUnavailable
132
+ if val := resp .Header .Get (headerRetryAfter ); isThrottleError && val != "" {
133
+ if seconds , err := strconv .Atoi (val ); err == nil {
134
+ retryAfter = seconds
135
+ }
136
+ }
137
+ return exporterhelper .NewThrottleRetry (formattedErr , time .Duration (retryAfter )* time .Second )
138
+ }
139
+
140
+ return consumererror .NewPermanent (formattedErr )
141
+ }
142
+
143
+ func (fe * faroExporter ) consume (ctx context.Context , fp []faro.Payload ) error {
144
+ var errs error
145
+ var wg sync.WaitGroup
146
+ wg .Add (len (fp ))
147
+ var mu sync.Mutex
148
+ for _ , p := range fp {
149
+ go func () {
150
+ defer wg .Done ()
151
+ err := fe .export (ctx , & p )
152
+ mu .Lock ()
153
+ errs = multierr .Append (errs , err )
154
+ mu .Unlock ()
155
+ }()
156
+ }
157
+ wg .Wait ()
158
+ return errs
60
159
}
61
160
62
161
func (fe * faroExporter ) Capabilities () consumer.Capabilities {
0 commit comments