@@ -9,10 +9,9 @@ import (
9
9
"strconv"
10
10
"strings"
11
11
12
- "github.com/Azure/azure-kusto-go/kusto"
13
- kustoerrors "github.com/Azure/azure-kusto-go/kusto/data/errors"
14
- "github.com/Azure/azure-kusto-go/kusto/ingest"
15
- "github.com/Azure/azure-kusto-go/kusto/ingest/ingestoptions"
12
+ "github.com/Azure/azure-kusto-go/azkustodata"
13
+ "github.com/Azure/azure-kusto-go/azkustoingest"
14
+ "github.com/Azure/azure-kusto-go/azkustoingest/ingestoptions"
16
15
jsoniter "github.com/json-iterator/go"
17
16
"go.opentelemetry.io/collector/pdata/pcommon"
18
17
"go.opentelemetry.io/collector/pdata/plog"
@@ -23,10 +22,9 @@ import (
23
22
24
23
// adxDataProducer uses the ADX client to perform ingestion
25
24
type adxDataProducer struct {
26
- client * kusto.Client // client for logs, traces and metrics
27
- ingestor ingest.Ingestor // ingestion for logs, traces and metrics
28
- ingestOptions []ingest.FileOption // options for the ingestion
29
- logger * zap.Logger // logger for tracing the flow
25
+ ingestor azkustoingest.Ingestor // ingestion for logs, traces and metrics
26
+ ingestOptions []azkustoingest.FileOption // options for the ingestion
27
+ logger * zap.Logger // logger for tracing the flow
30
28
}
31
29
32
30
const nextline = "\n "
@@ -127,13 +125,8 @@ func (e *adxDataProducer) tracesDataPusher(_ context.Context, traceData ptrace.T
127
125
}
128
126
129
127
func (e * adxDataProducer ) Close (context.Context ) error {
130
- var err error
131
-
132
- err = e .ingestor .Close ()
133
-
134
- if clientErr := e .client .Close (); clientErr != nil {
135
- err = kustoerrors .GetCombinedError (err , clientErr )
136
- }
128
+ // Close the ingestor and client connections
129
+ err := e .ingestor .Close ()
137
130
if err != nil {
138
131
e .logger .Warn ("Error closing connections" , zap .Error (err ))
139
132
} else {
@@ -149,94 +142,94 @@ func newExporter(config *Config, logger *zap.Logger, telemetryDataType int, vers
149
142
if err != nil {
150
143
return nil , err
151
144
}
152
- metricClient , err := buildAdxClient (config , version )
153
- if err != nil {
154
- return nil , err
155
- }
156
145
157
- var ingestor ingest .Ingestor
146
+ var ingestor azkustoingest .Ingestor
158
147
159
- var ingestOptions []ingest .FileOption
160
- ingestOptions = append (ingestOptions , ingest .FileFormat (ingest .JSON ))
161
- ingestOptions = append (ingestOptions , ingest .CompressionType (ingestoptions .GZIP ))
148
+ var ingestOptions []azkustoingest .FileOption
149
+ ingestOptions = append (ingestOptions , azkustoingest .FileFormat (azkustoingest .JSON ))
150
+ ingestOptions = append (ingestOptions , azkustoingest .CompressionType (ingestoptions .GZIP ))
162
151
// Expect that this mapping is already existent
163
152
if refOption := getMappingRef (config , telemetryDataType ); refOption != nil {
164
153
ingestOptions = append (ingestOptions , refOption )
165
154
}
166
155
// The exporter could be configured to run in either modes. Using managedstreaming or batched queueing
167
156
if strings .ToLower (config .IngestionType ) == managedIngestType {
168
- mi , err := createManagedStreamingIngestor (config , metricClient , tableName )
157
+ mi , err := createManagedStreamingIngestor (config , version , tableName )
169
158
if err != nil {
170
159
return nil , err
171
160
}
172
161
ingestor = mi
173
162
} else {
174
- qi , err := createQueuedIngestor (config , metricClient , tableName )
163
+ qi , err := createQueuedIngestor (config , version , tableName )
175
164
if err != nil {
176
165
return nil , err
177
166
}
178
167
ingestor = qi
179
168
}
180
169
return & adxDataProducer {
181
- client : metricClient ,
182
170
ingestOptions : ingestOptions ,
183
171
ingestor : ingestor ,
184
172
logger : logger ,
185
173
}, nil
186
174
}
187
175
188
176
// Fetches the corresponding ingestionRef if the mapping is provided
189
- func getMappingRef (config * Config , telemetryDataType int ) ingest .FileOption {
177
+ func getMappingRef (config * Config , telemetryDataType int ) azkustoingest .FileOption {
190
178
switch telemetryDataType {
191
179
case metricsType :
192
180
if ! isEmpty (config .MetricTableMapping ) {
193
- return ingest .IngestionMappingRef (config .MetricTableMapping , ingest .JSON )
181
+ return azkustoingest .IngestionMappingRef (config .MetricTableMapping , azkustoingest .JSON )
194
182
}
195
183
case tracesType :
196
184
if ! isEmpty (config .TraceTableMapping ) {
197
- return ingest .IngestionMappingRef (config .TraceTableMapping , ingest .JSON )
185
+ return azkustoingest .IngestionMappingRef (config .TraceTableMapping , azkustoingest .JSON )
198
186
}
199
187
case logsType :
200
188
if ! isEmpty (config .LogTableMapping ) {
201
- return ingest .IngestionMappingRef (config .LogTableMapping , ingest .JSON )
189
+ return azkustoingest .IngestionMappingRef (config .LogTableMapping , azkustoingest .JSON )
202
190
}
203
191
}
204
192
return nil
205
193
}
206
194
207
- func buildAdxClient (config * Config , version string ) (* kusto.Client , error ) {
208
- client , err := kusto .New (createKcsb (config , version ))
209
- return client , err
210
- }
211
-
212
- func createKcsb (config * Config , version string ) * kusto.ConnectionStringBuilder {
213
- var kcsb * kusto.ConnectionStringBuilder
195
+ func createKcsb (config * Config , version string ) * azkustodata.ConnectionStringBuilder {
196
+ var kcsb * azkustodata.ConnectionStringBuilder
214
197
isManagedIdentity := len (strings .TrimSpace (config .ManagedIdentityID )) > 0
215
198
isSystemManagedIdentity := strings .EqualFold (strings .TrimSpace (config .ManagedIdentityID ), "SYSTEM" )
216
199
// If the user has managed identity done, use it. For System managed identity use the MI as system
217
200
switch {
218
201
case config .UseAzureAuth :
219
- kcsb = kusto .NewConnectionStringBuilder (config .ClusterURI ).WithDefaultAzureCredential ()
202
+ kcsb = azkustodata .NewConnectionStringBuilder (config .ClusterURI ).WithDefaultAzureCredential ()
220
203
case ! isManagedIdentity :
221
- kcsb = kusto .NewConnectionStringBuilder (config .ClusterURI ).WithAadAppKey (config .ApplicationID , string (config .ApplicationKey ), config .TenantID )
204
+ kcsb = azkustodata .NewConnectionStringBuilder (config .ClusterURI ).WithAadAppKey (config .ApplicationID , string (config .ApplicationKey ), config .TenantID )
222
205
case isManagedIdentity && isSystemManagedIdentity :
223
- kcsb = kusto .NewConnectionStringBuilder (config .ClusterURI ).WithSystemManagedIdentity ()
206
+ kcsb = azkustodata .NewConnectionStringBuilder (config .ClusterURI ).WithSystemManagedIdentity ()
224
207
case isManagedIdentity && ! isSystemManagedIdentity :
225
- kcsb = kusto .NewConnectionStringBuilder (config .ClusterURI ).WithUserManagedIdentity (config .ManagedIdentityID )
208
+ kcsb = azkustodata .NewConnectionStringBuilder (config .ClusterURI ).WithUserAssignedIdentityClientId (config .ManagedIdentityID )
226
209
}
227
- kcsb .SetConnectorDetails ("OpenTelemetry" , version , "" , "" , false , "" , kusto .StringPair {Key : "isManagedIdentity" , Value : strconv .FormatBool (isManagedIdentity )})
210
+ kcsb .SetConnectorDetails ("OpenTelemetry" , version , "" , "" , false , "" , azkustodata .StringPair {Key : "isManagedIdentity" , Value : strconv .FormatBool (isManagedIdentity )})
228
211
return kcsb
229
212
}
230
213
231
214
// Depending on the table, create separate ingestors
232
- func createManagedStreamingIngestor (config * Config , adxclient * kusto.Client , tablename string ) (* ingest.Managed , error ) {
233
- ingestor , err := ingest .NewManaged (adxclient , config .Database , tablename )
215
+ func createManagedStreamingIngestor (config * Config , version string , tablename string ) (* azkustoingest.Managed , error ) {
216
+ kcsb := createKcsb (config , version )
217
+ ingestopts := []azkustoingest.Option {
218
+ azkustoingest .WithDefaultDatabase (config .Database ),
219
+ azkustoingest .WithDefaultTable (tablename ),
220
+ }
221
+ ingestor , err := azkustoingest .NewManaged (kcsb , ingestopts ... )
234
222
return ingestor , err
235
223
}
236
224
237
225
// A queued ingestor in case that is provided as the config option
238
- func createQueuedIngestor (config * Config , adxclient * kusto.Client , tablename string ) (* ingest.Ingestion , error ) {
239
- ingestor , err := ingest .New (adxclient , config .Database , tablename )
226
+ func createQueuedIngestor (config * Config , version string , tablename string ) (* azkustoingest.Ingestion , error ) {
227
+ kcsb := createKcsb (config , version )
228
+ ingestopts := []azkustoingest.Option {
229
+ azkustoingest .WithDefaultDatabase (config .Database ),
230
+ azkustoingest .WithDefaultTable (tablename ),
231
+ }
232
+ ingestor , err := azkustoingest .New (kcsb , ingestopts ... )
240
233
return ingestor , err
241
234
}
242
235
0 commit comments