@@ -39,8 +39,8 @@ import (
39
39
"go.opentelemetry.io/collector/receiver/otlpreceiver"
40
40
"go.opentelemetry.io/otel"
41
41
"go.opentelemetry.io/otel/attribute"
42
- "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc "
43
- "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc "
42
+ "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp "
43
+ "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp "
44
44
"go.opentelemetry.io/otel/sdk/log"
45
45
"go.opentelemetry.io/otel/sdk/resource"
46
46
sdktrace "go.opentelemetry.io/otel/sdk/trace"
@@ -73,46 +73,35 @@ type point struct {
73
73
Value float64
74
74
}
75
75
76
- func TestIntegration_NativeOTelAPMStatsIngest (t * testing.T ) {
77
- previousVal := datadogconnector .NativeIngestFeatureGate .IsEnabled ()
78
- err := featuregate .GlobalRegistry ().Set (datadogconnector .NativeIngestFeatureGate .ID (), true )
79
- require .NoError (t , err )
80
- defer func () {
81
- err = featuregate .GlobalRegistry ().Set (datadogconnector .NativeIngestFeatureGate .ID (), previousVal )
82
- require .NoError (t , err )
83
- }()
84
-
85
- testIntegration (t )
86
- }
87
-
88
- func TestIntegration_LegacyOTelAPMStatsIngest (t * testing.T ) {
89
- testIntegration (t )
90
- }
91
-
92
- func testIntegration (t * testing.T ) {
76
+ func TestIntegration (t * testing.T ) {
93
77
// 1. Set up mock Datadog server
94
78
// See also https://github.com/DataDog/datadog-agent/blob/49c16e0d4deab396626238fa1d572b684475a53f/cmd/trace-agent/test/backend.go
95
79
apmstatsRec := & testutil.HTTPRequestRecorderWithChan {Pattern : testutil .APMStatsEndpoint , ReqChan : make (chan []byte )}
96
80
tracesRec := & testutil.HTTPRequestRecorderWithChan {Pattern : testutil .TraceEndpoint , ReqChan : make (chan []byte )}
97
81
server := testutil .DatadogServerMock (apmstatsRec .HandlerFunc , tracesRec .HandlerFunc )
98
82
defer server .Close ()
99
83
t .Setenv ("SERVER_URL" , server .URL )
100
- t .Setenv ("OTLP_HTTP_SERVER" , commonTestutil .GetAvailableLocalAddress (t ))
101
- otlpGRPCEndpoint := commonTestutil .GetAvailableLocalAddress (t )
102
- t .Setenv ("OTLP_GRPC_SERVER" , otlpGRPCEndpoint )
84
+ otlpEndpoint := commonTestutil .GetAvailableLocalAddress (t )
85
+ t .Setenv ("OTLP_HTTP_SERVER" , otlpEndpoint )
103
86
104
87
// 2. Start in-process collector
105
88
factories := getIntegrationTestComponents (t )
106
89
app := getIntegrationTestCollector (t , "integration_test_config.yaml" , factories )
90
+ var wg sync.WaitGroup
91
+ wg .Add (1 )
107
92
go func () {
108
- assert .NoError (t , app .Run (context .Background ()))
93
+ _ = app .Run (context .Background ()) // ignore shutdown error, core collector has race in shutdown: https://github.com/open-telemetry/opentelemetry-collector/issues/12944
94
+ wg .Done ()
95
+ }()
96
+ defer func () {
97
+ app .Shutdown ()
98
+ wg .Wait ()
109
99
}()
110
- defer app .Shutdown ()
111
100
112
101
waitForReadiness (app )
113
102
114
103
// 3. Generate and send traces
115
- sendTraces (t , otlpGRPCEndpoint )
104
+ sendTraces (t , otlpEndpoint )
116
105
117
106
// 4. Validate traces and APM stats from the mock server
118
107
var spans []* pb.Span
@@ -233,7 +222,7 @@ func sendTraces(t *testing.T, endpoint string) {
233
222
ctx := context .Background ()
234
223
235
224
// Set up OTel-Go SDK and exporter
236
- traceExporter , err := otlptracegrpc .New (ctx , otlptracegrpc .WithInsecure (), otlptracegrpc .WithEndpoint (endpoint ))
225
+ traceExporter , err := otlptracehttp .New (ctx , otlptracehttp .WithInsecure (), otlptracehttp .WithEndpoint (endpoint ))
237
226
require .NoError (t , err )
238
227
bsp := sdktrace .NewBatchSpanProcessor (traceExporter )
239
228
r1 , _ := resource .New (ctx , resource .WithAttributes (attribute .String ("k8s.node.name" , "aaaa" )))
@@ -290,22 +279,27 @@ func TestIntegrationComputeTopLevelBySpanKind(t *testing.T) {
290
279
server := testutil .DatadogServerMock (apmstatsRec .HandlerFunc , tracesRec .HandlerFunc )
291
280
defer server .Close ()
292
281
t .Setenv ("SERVER_URL" , server .URL )
293
- t .Setenv ("OTLP_HTTP_SERVER" , commonTestutil .GetAvailableLocalAddress (t ))
294
- otlpGRPCEndpoint := commonTestutil .GetAvailableLocalAddress (t )
295
- t .Setenv ("OTLP_GRPC_SERVER" , otlpGRPCEndpoint )
282
+ otlpEndpoint := commonTestutil .GetAvailableLocalAddress (t )
283
+ t .Setenv ("OTLP_HTTP_SERVER" , otlpEndpoint )
296
284
297
285
// 2. Start in-process collector
298
286
factories := getIntegrationTestComponents (t )
299
287
app := getIntegrationTestCollector (t , "integration_test_toplevel_config.yaml" , factories )
288
+ var wg sync.WaitGroup
289
+ wg .Add (1 )
300
290
go func () {
301
- assert .NoError (t , app .Run (context .Background ()))
291
+ _ = app .Run (context .Background ()) // ignore shutdown error, core collector has race in shutdown: https://github.com/open-telemetry/opentelemetry-collector/issues/12944
292
+ wg .Done ()
293
+ }()
294
+ defer func () {
295
+ app .Shutdown ()
296
+ wg .Wait ()
302
297
}()
303
- defer app .Shutdown ()
304
298
305
299
waitForReadiness (app )
306
300
307
301
// 3. Generate and send traces
308
- sendTracesComputeTopLevelBySpanKind (t , otlpGRPCEndpoint )
302
+ sendTracesComputeTopLevelBySpanKind (t , otlpEndpoint )
309
303
310
304
// 4. Validate traces and APM stats from the mock server
311
305
var spans []* pb.Span
@@ -397,7 +391,7 @@ func sendTracesComputeTopLevelBySpanKind(t *testing.T, endpoint string) {
397
391
ctx := context .Background ()
398
392
399
393
// Set up OTel-Go SDK and exporter
400
- traceExporter , err := otlptracegrpc .New (ctx , otlptracegrpc .WithInsecure (), otlptracegrpc .WithEndpoint (endpoint ))
394
+ traceExporter , err := otlptracehttp .New (ctx , otlptracehttp .WithInsecure (), otlptracehttp .WithEndpoint (endpoint ))
401
395
require .NoError (t , err )
402
396
bsp := sdktrace .NewBatchSpanProcessor (traceExporter )
403
397
r1 , _ := resource .New (ctx , resource .WithAttributes (attribute .String ("k8s.node.name" , "aaaa" )))
@@ -478,17 +472,16 @@ func TestIntegrationLogs(t *testing.T) {
478
472
})
479
473
defer server .Close ()
480
474
t .Setenv ("SERVER_URL" , server .URL )
481
- t .Setenv ("OTLP_HTTP_SERVER" , commonTestutil .GetAvailableLocalAddress (t ))
482
- otlpGRPCEndpoint := commonTestutil .GetAvailableLocalAddress (t )
483
- t .Setenv ("OTLP_GRPC_SERVER" , otlpGRPCEndpoint )
475
+ otlpEndpoint := commonTestutil .GetAvailableLocalAddress (t )
476
+ t .Setenv ("OTLP_HTTP_SERVER" , otlpEndpoint )
484
477
485
478
// 2. Start in-process collector
486
479
factories := getIntegrationTestComponents (t )
487
480
app := getIntegrationTestCollector (t , "integration_test_logs_config.yaml" , factories )
488
481
var wg sync.WaitGroup
489
482
wg .Add (1 )
490
483
go func () {
491
- _ = app .Run (context .Background ()) // ignore shutdown error
484
+ _ = app .Run (context .Background ()) // ignore shutdown error, core collector has race in shutdown: https://github.com/open-telemetry/opentelemetry-collector/issues/12944
492
485
wg .Done ()
493
486
}()
494
487
defer func () {
@@ -499,7 +492,7 @@ func TestIntegrationLogs(t *testing.T) {
499
492
waitForReadiness (app )
500
493
501
494
// 3. Generate and send logs
502
- sendLogs (t , 5 , otlpGRPCEndpoint )
495
+ sendLogs (t , 5 , otlpEndpoint )
503
496
504
497
// 4. Validate logs and metrics from the mock server
505
498
// Wait until `doneChannel` is closed and internal metrics are received.
@@ -545,7 +538,7 @@ func TestIntegrationLogs(t *testing.T) {
545
538
546
539
func sendLogs (t * testing.T , numLogs int , endpoint string ) {
547
540
ctx := context .Background ()
548
- logExporter , err := otlploggrpc .New (ctx , otlploggrpc .WithInsecure (), otlploggrpc .WithEndpoint (endpoint ))
541
+ logExporter , err := otlploghttp .New (ctx , otlploghttp .WithInsecure (), otlploghttp .WithEndpoint (endpoint ))
549
542
assert .NoError (t , err )
550
543
lr := make ([]log.Record , numLogs )
551
544
assert .NoError (t , logExporter .Export (ctx , lr ))
@@ -642,10 +635,16 @@ func testIntegrationHostMetrics(t *testing.T, expectedMetrics map[string]struct{
642
635
// 2. Start in-process collector
643
636
factories := getIntegrationTestComponents (t )
644
637
app := getIntegrationTestCollector (t , "integration_test_host_metrics_config.yaml" , factories )
638
+ var wg sync.WaitGroup
639
+ wg .Add (1 )
645
640
go func () {
646
- assert .NoError (t , app .Run (context .Background ()))
641
+ _ = app .Run (context .Background ()) // ignore shutdown error, core collector has race in shutdown: https://github.com/open-telemetry/opentelemetry-collector/issues/12944
642
+ wg .Done ()
643
+ }()
644
+ defer func () {
645
+ app .Shutdown ()
646
+ wg .Wait ()
647
647
}()
648
- defer app .Shutdown ()
649
648
650
649
waitForReadiness (app )
651
650
@@ -730,3 +729,95 @@ func seriesFromAPIClient(t *testing.T, metricsBytes []byte, expectedMetrics map[
730
729
}
731
730
return metricMap , nil
732
731
}
732
+
733
+ func TestIntegrationInternalMetrics (t * testing.T ) {
734
+ require .NoError (t , featuregate .GlobalRegistry ().Set ("exporter.datadogexporter.metricexportserializerclient" , false ))
735
+ defer func () {
736
+ require .NoError (t , featuregate .GlobalRegistry ().Set ("exporter.datadogexporter.metricexportserializerclient" , true ))
737
+ }()
738
+ expectedMetrics := map [string ]struct {}{
739
+ // Datadog internal metrics on trace and stats writers
740
+ "datadog.otlp_translator.resources.missing_source" : {},
741
+ "datadog.trace_agent.stats_writer.bytes" : {},
742
+ "datadog.trace_agent.stats_writer.retries" : {},
743
+ "datadog.trace_agent.stats_writer.stats_buckets" : {},
744
+ "datadog.trace_agent.stats_writer.stats_entries" : {},
745
+ "datadog.trace_agent.stats_writer.payloads" : {},
746
+ "datadog.trace_agent.stats_writer.client_payloads" : {},
747
+ "datadog.trace_agent.stats_writer.errors" : {},
748
+ "datadog.trace_agent.stats_writer.splits" : {},
749
+ "datadog.trace_agent.trace_writer.bytes" : {},
750
+ "datadog.trace_agent.trace_writer.retries" : {},
751
+ "datadog.trace_agent.trace_writer.spans" : {},
752
+ "datadog.trace_agent.trace_writer.traces" : {},
753
+ "datadog.trace_agent.trace_writer.payloads" : {},
754
+ "datadog.trace_agent.trace_writer.errors" : {},
755
+ "datadog.trace_agent.trace_writer.events" : {},
756
+
757
+ // OTel collector internal metrics
758
+ "otelcol_process_memory_rss" : {},
759
+ "otelcol_process_runtime_total_sys_memory_bytes" : {},
760
+ "otelcol_process_uptime" : {},
761
+ "otelcol_process_cpu_seconds" : {},
762
+ "otelcol_process_runtime_heap_alloc_bytes" : {},
763
+ "otelcol_process_runtime_total_alloc_bytes" : {},
764
+ "otelcol_receiver_accepted_metric_points" : {},
765
+ "otelcol_receiver_accepted_spans" : {},
766
+ "otelcol_exporter_queue_capacity" : {},
767
+ "otelcol_exporter_queue_size" : {},
768
+ "otelcol_exporter_sent_spans" : {},
769
+ "otelcol_exporter_sent_metric_points" : {},
770
+ }
771
+ testIntegrationInternalMetrics (t , expectedMetrics )
772
+ }
773
+
774
+ func testIntegrationInternalMetrics (t * testing.T , expectedMetrics map [string ]struct {}) {
775
+ // 1. Set up mock Datadog server
776
+ seriesRec := & testutil.HTTPRequestRecorderWithChan {Pattern : testutil .MetricV2Endpoint , ReqChan : make (chan []byte , 100 )}
777
+ tracesRec := & testutil.HTTPRequestRecorderWithChan {Pattern : testutil .TraceEndpoint , ReqChan : make (chan []byte , 100 )}
778
+ server := testutil .DatadogServerMock (seriesRec .HandlerFunc , tracesRec .HandlerFunc )
779
+ defer server .Close ()
780
+ t .Setenv ("SERVER_URL" , server .URL )
781
+ otlpEndpoint := commonTestutil .GetAvailableLocalAddress (t )
782
+ t .Setenv ("OTLP_HTTP_SERVER" , otlpEndpoint )
783
+
784
+ // 2. Start in-process collector
785
+ factories := getIntegrationTestComponents (t )
786
+ app := getIntegrationTestCollector (t , "integration_test_internal_metrics_config.yaml" , factories )
787
+ var wg sync.WaitGroup
788
+ wg .Add (1 )
789
+ go func () {
790
+ _ = app .Run (context .Background ()) // ignore shutdown error, core collector has race in shutdown: https://github.com/open-telemetry/opentelemetry-collector/issues/12944
791
+ wg .Done ()
792
+ }()
793
+ defer func () {
794
+ app .Shutdown ()
795
+ wg .Wait ()
796
+ }()
797
+
798
+ waitForReadiness (app )
799
+
800
+ // 3. Generate and send traces
801
+ sendTraces (t , otlpEndpoint )
802
+
803
+ // 4. Validate Datadog trace agent & OTel internal metrics are sent to the mock server
804
+ metricMap := make (map [string ]series )
805
+ for len (metricMap ) < len (expectedMetrics ) {
806
+ select {
807
+ case <- tracesRec .ReqChan :
808
+ // Drain the channel, no need to look into the traces
809
+ case metricsBytes := <- seriesRec .ReqChan :
810
+ var metrics seriesSlice
811
+ gz := getGzipReader (t , metricsBytes )
812
+ dec := json .NewDecoder (gz )
813
+ assert .NoError (t , dec .Decode (& metrics ))
814
+ for _ , s := range metrics .Series {
815
+ if _ , ok := expectedMetrics [s .Metric ]; ok {
816
+ metricMap [s .Metric ] = s
817
+ }
818
+ }
819
+ case <- time .After (60 * time .Second ):
820
+ t .Fail ()
821
+ }
822
+ }
823
+ }
0 commit comments