4
4
package configuration_switching
5
5
6
6
import (
7
- "bytes"
8
7
"context"
9
8
"fmt"
10
9
"os"
11
10
"path/filepath"
12
11
"strings"
13
12
"testing"
14
- "text/template"
15
- "time"
16
13
17
14
"github.com/stretchr/testify/assert"
18
15
"github.com/stretchr/testify/require"
19
16
"go.opentelemetry.io/collector/consumer/consumertest"
20
17
"go.opentelemetry.io/collector/pdata/plog"
21
18
"go.opentelemetry.io/collector/pdata/pmetric"
22
- "gopkg.in/yaml.v3"
23
- "helm.sh/helm/v3/pkg/action"
24
- "helm.sh/helm/v3/pkg/kube"
25
19
corev1 "k8s.io/api/core/v1"
26
20
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27
21
"k8s.io/client-go/kubernetes"
@@ -52,12 +46,6 @@ func deployChartsAndApps(t *testing.T, valuesFileName string, repl map[string]in
52
46
client , err := kubernetes .NewForConfig (kubeConfig )
53
47
require .NoError (t , err )
54
48
55
- chart := internal .LoadCollectorChart (t )
56
-
57
- var valuesBytes []byte
58
- valuesBytes , err = os .ReadFile (filepath .Join (testDir , valuesDir , valuesFileName ))
59
- require .NoError (t , err )
60
-
61
49
hostEp := internal .HostEndpoint (t )
62
50
if len (hostEp ) == 0 {
63
51
require .Fail (t , "Host endpoint not found" )
@@ -70,36 +58,10 @@ func deployChartsAndApps(t *testing.T, valuesFileName string, repl map[string]in
70
58
replacements [k ] = v
71
59
}
72
60
73
- tmpl , err := template . New ( "" ). Parse ( string ( valuesBytes ))
61
+ valuesFile , err := filepath . Abs ( filepath . Join ( testDir , valuesDir , valuesFileName ))
74
62
require .NoError (t , err )
75
- var buf bytes.Buffer
76
- err = tmpl .Execute (& buf , replacements )
77
- require .NoError (t , err )
78
- var values map [string ]interface {}
79
- err = yaml .Unmarshal (buf .Bytes (), & values )
80
- require .NoError (t , err )
81
-
82
- actionConfig := new (action.Configuration )
83
- if err := actionConfig .Init (kube .GetConfig (testKubeConfig , "" , "default" ), "default" , os .Getenv ("HELM_DRIVER" ), func (format string , v ... interface {}) {
84
- t .Logf (format + "\n " , v ... )
85
- }); err != nil {
86
- require .NoError (t , err )
87
- }
88
- install := action .NewInstall (actionConfig )
89
- install .Namespace = "default"
90
- install .ReleaseName = "sock"
91
- _ , err = install .Run (chart , values )
92
- if err != nil {
93
- t .Logf ("error reported during helm install: %v\n " , err )
94
- retryUpgrade := action .NewUpgrade (actionConfig )
95
- retryUpgrade .Namespace = "default"
96
- retryUpgrade .Install = true
97
- _ , err = retryUpgrade .Run ("sock" , chart , values )
98
- require .NoError (t , err )
99
- }
100
-
101
- waitForAllDeploymentsToStart (t , client )
102
- t .Log ("Deployments started" )
63
+ internal .ChartInstallOrUpgrade (t , testKubeConfig , valuesFile , replacements )
64
+ internal .WaitForAllDeploymentsToStart (t , client )
103
65
104
66
t .Cleanup (func () {
105
67
if os .Getenv ("SKIP_TEARDOWN" ) == "true" {
@@ -114,27 +76,9 @@ func deployChartsAndApps(t *testing.T, valuesFileName string, repl map[string]in
114
76
}
115
77
func teardown (t * testing.T ) {
116
78
t .Log ("Running teardown" )
117
- uninstallDeployment (t )
118
- }
119
-
120
- func waitForAllDeploymentsToStart (t * testing.T , client * kubernetes.Clientset ) {
121
- require .Eventually (t , func () bool {
122
- di , err := client .AppsV1 ().Deployments ("default" ).List (context .Background (), metav1.ListOptions {})
123
- require .NoError (t , err )
124
- for _ , d := range di .Items {
125
- if d .Status .ReadyReplicas != d .Status .Replicas {
126
- var messages string
127
- for _ , c := range d .Status .Conditions {
128
- messages += c .Message
129
- messages += "\n "
130
- }
131
-
132
- t .Logf ("Deployment not ready: %s, %s" , d .Name , messages )
133
- return false
134
- }
135
- }
136
- return true
137
- }, 5 * time .Minute , 10 * time .Second )
79
+ testKubeConfig , setKubeConfig := os .LookupEnv ("KUBECONFIG" )
80
+ require .True (t , setKubeConfig , "the environment variable KUBECONFIG must be set" )
81
+ internal .ChartUninstall (t , testKubeConfig )
138
82
}
139
83
140
84
func Test_Functions (t * testing.T ) {
@@ -152,14 +96,10 @@ func Test_Functions(t *testing.T) {
152
96
return
153
97
}
154
98
155
- t .Setenv ("KUBECONFIG" , "/tmp/kube-config-splunk-otel-collector-chart-functional-testing" )
156
- t .Setenv ("KUBE_TEST_ENV" , "kind" )
157
-
158
99
t .Run ("agent logs and metrics enabled or disabled" , testAgentLogsAndMetrics )
159
100
t .Run ("logs and metrics index switch" , testIndexSwitch )
160
101
t .Run ("cluster receiver enabled or disabled" , testClusterReceiverEnabledOrDisabled )
161
102
t .Run ("logs and metrics attributes verification" , testVerifyLogsAndMetricsAttributes )
162
-
163
103
}
164
104
165
105
func testAgentLogsAndMetrics (t * testing.T ) {
@@ -183,7 +123,6 @@ func testAgentLogsAndMetrics(t *testing.T) {
183
123
184
124
internal .WaitForMetrics (t , 5 , hecMetricsConsumer )
185
125
internal .WaitForLogs (t , 5 , agentLogsConsumer )
186
- uninstallDeployment (t )
187
126
})
188
127
189
128
t .Run ("check metrics only enabled" , func (t * testing.T ) {
@@ -201,7 +140,6 @@ func testAgentLogsAndMetrics(t *testing.T) {
201
140
202
141
internal .WaitForMetrics (t , 5 , hecMetricsConsumer )
203
142
internal .CheckNoEventsReceived (t , agentLogsConsumer )
204
- uninstallDeployment (t )
205
143
})
206
144
207
145
t .Run ("check logs only enabled" , func (t * testing.T ) {
@@ -215,26 +153,25 @@ func testAgentLogsAndMetrics(t *testing.T) {
215
153
deployChartsAndApps (t , valuesFileName , replacements )
216
154
217
155
internal .WaitForLogs (t , 5 , agentLogsConsumer )
218
- uninstallDeployment (t )
219
- internal .ResetLogsSink (t , agentLogsConsumer )
220
- internal .ResetMetricsSink (t , hecMetricsConsumer )
221
156
})
222
157
}
223
158
224
159
func testIndexSwitch (t * testing.T ) {
225
- var metricsIndex string = "metricsIndex"
226
- var newMetricsIndex string = "newMetricsIndex"
227
- var logsIndex string = "main"
228
- var newLogsIndex string = "newLogsIndex"
160
+ var metricsIndex = "metricsIndex"
161
+ var newMetricsIndex = "newMetricsIndex"
162
+ var logsIndex = "main"
163
+ var newLogsIndex = "newLogsIndex"
229
164
var nonDefaultSourcetype = "my-sourcetype"
230
165
231
166
valuesFileName := "values_indexes_switching.yaml.tmpl"
232
167
hecMetricsConsumer := globalSinks .hecMetricsConsumer
168
+ internal .ResetMetricsSink (t , hecMetricsConsumer )
233
169
internal .CheckNoMetricsReceived (t , hecMetricsConsumer )
234
170
agentLogsConsumer := globalSinks .logsConsumer
171
+ internal .ResetLogsSink (t , agentLogsConsumer )
235
172
internal .CheckNoEventsReceived (t , agentLogsConsumer )
236
173
237
- t .Run ("check logs and metrics index switching " , func (t * testing.T ) {
174
+ t .Run ("default_source_type " , func (t * testing.T ) {
238
175
replacements := map [string ]interface {}{
239
176
"MetricsIndex" : metricsIndex ,
240
177
"LogsIndex" : logsIndex ,
@@ -257,82 +194,73 @@ func testIndexSwitch(t *testing.T) {
257
194
}
258
195
}
259
196
assert .NotContains (t , sourcetypes , nonDefaultSourcetype )
260
- assert .True (t , len ( indices ) == 1 )
261
- assert .True (t , indices [0 ] == logsIndex )
197
+ assert .Len (t , indices , 1 )
198
+ assert .Equal (t , logsIndex , indices [0 ])
262
199
263
200
var mIndices []string
264
201
mIndices = getMetricsIndex (hecMetricsConsumer .AllMetrics ())
265
- assert .True (t , len (mIndices ) == 1 )
266
- assert .True (t , mIndices [0 ] == metricsIndex )
202
+ assert .Len (t , mIndices , 1 )
203
+ assert .Equal (t , metricsIndex , mIndices [0 ])
204
+ })
267
205
268
- replacements = map [string ]interface {}{
206
+ t .Run ("non_default_source_type" , func (t * testing.T ) {
207
+ replacements := map [string ]interface {}{
269
208
"MetricsIndex" : newMetricsIndex ,
270
209
"LogsIndex" : newLogsIndex ,
271
210
"NonDefaultSourcetype" : true ,
272
211
"Sourcetype" : nonDefaultSourcetype ,
273
212
}
274
213
deployChartsAndApps (t , valuesFileName , replacements )
275
- internal .ResetLogsSink (t , agentLogsConsumer )
276
- internal .ResetMetricsSink (t , hecMetricsConsumer )
277
214
278
215
internal .WaitForLogs (t , 3 , agentLogsConsumer )
279
- logs = agentLogsConsumer .AllLogs ()
280
- sourcetypes , indices = getLogsIndexAndSourceType (logs )
216
+ logs : = agentLogsConsumer .AllLogs ()
217
+ sourcetypes , indices : = getLogsIndexAndSourceType (logs )
281
218
t .Logf ("Indices: %v" , indices )
282
219
assert .Contains (t , indices , newLogsIndex )
283
220
assert .Contains (t , sourcetypes , nonDefaultSourcetype )
284
- assert .True (t , len (indices ) == 1 )
285
- assert .True (t , len (sourcetypes ) == 1 )
286
221
287
222
internal .WaitForMetrics (t , 3 , hecMetricsConsumer )
288
- mIndices = getMetricsIndex (hecMetricsConsumer .AllMetrics ())
289
- assert .True (t , len (mIndices ) == 1 )
290
- assert .True (t , mIndices [0 ] == newMetricsIndex )
223
+ mIndices := getMetricsIndex (hecMetricsConsumer .AllMetrics ())
224
+ assert .Contains (t , mIndices , newMetricsIndex )
291
225
})
292
- uninstallDeployment (t )
293
- internal .ResetLogsSink (t , agentLogsConsumer )
294
- internal .ResetMetricsSink (t , hecMetricsConsumer )
295
226
}
296
227
297
228
func testClusterReceiverEnabledOrDisabled (t * testing.T ) {
298
229
valuesFileName := "values_cluster_receiver_switching.yaml.tmpl"
299
- namespace := "default"
300
230
logsObjectsConsumer := globalSinks .logsObjectsConsumer
301
231
hostEp := internal .HostEndpoint (t )
302
232
if len (hostEp ) == 0 {
303
233
require .Fail (t , "Host endpoint not found" )
304
234
}
305
235
logsObjectsHecEndpoint := fmt .Sprintf ("http://%s:%d/services/collector" , hostEp , internal .HECObjectsReceiverPort )
306
236
307
- t .Run ("check cluster receiver enabled " , func (t * testing.T ) {
237
+ t .Run ("check cluster receiver disabled " , func (t * testing.T ) {
308
238
internal .ResetLogsSink (t , logsObjectsConsumer )
309
239
replacements := map [string ]interface {}{
310
240
"ClusterReceiverEnabled" : false ,
311
241
"LogObjectsHecEndpoint" : logsObjectsHecEndpoint ,
312
242
}
313
243
deployChartsAndApps (t , valuesFileName , replacements )
314
- var pods * corev1.PodList
315
- pods = listPodsInNamespace (t , namespace )
316
- assert .True (t , len (pods .Items ) == 1 )
244
+ pods := listPodsInNamespace (t , internal .Namespace )
245
+ assert .Len (t , pods .Items , 1 )
317
246
assert .True (t , strings .HasPrefix (pods .Items [0 ].Name , "sock-splunk-otel-collector-agent" ))
318
247
internal .CheckNoEventsReceived (t , logsObjectsConsumer )
248
+ })
319
249
320
- t .Log ("cluster receiver enabled" )
321
- replacements = map [string ]interface {}{
250
+ t .Run ("check cluster receiver enabled" , func (t * testing.T ) {
251
+ internal .ResetLogsSink (t , logsObjectsConsumer )
252
+ replacements := map [string ]interface {}{
322
253
"ClusterReceiverEnabled" : true ,
323
254
"LogObjectsHecEndpoint" : logsObjectsHecEndpoint ,
324
255
}
325
256
deployChartsAndApps (t , valuesFileName , replacements )
326
257
internal .ResetLogsSink (t , logsObjectsConsumer )
327
-
328
- pods = listPodsInNamespace (t , namespace )
329
- assert .True (t , len (pods .Items ) == 2 )
258
+ pods := listPodsInNamespace (t , internal .Namespace )
259
+ assert .Len (t , pods .Items , 2 )
330
260
assert .True (t , checkPodExists (pods , "sock-splunk-otel-collector-agent" ))
331
261
assert .True (t , checkPodExists (pods , "sock-splunk-otel-collector-k8s-cluster-receiver" ))
332
262
internal .WaitForLogs (t , 5 , logsObjectsConsumer )
333
263
})
334
- uninstallDeployment (t )
335
- internal .ResetLogsSink (t , logsObjectsConsumer )
336
264
}
337
265
338
266
func testVerifyLogsAndMetricsAttributes (t * testing.T ) {
@@ -456,13 +384,6 @@ func listPodsInNamespace(t *testing.T, namespace string) *corev1.PodList {
456
384
return pods
457
385
}
458
386
459
- func waitForAllPodsToBeRemoved (t * testing.T , namespace string ) {
460
- timeoutMinutes := 2
461
- require .Eventuallyf (t , func () bool {
462
- return len (listPodsInNamespace (t , namespace ).Items ) == 0
463
- }, time .Duration (timeoutMinutes )* time .Minute , 5 * time .Second , "There are still %d pods in the namespace" , len (listPodsInNamespace (t , namespace ).Items ))
464
- }
465
-
466
387
func getLogsIndexAndSourceType (logs []plog.Logs ) ([]string , []string ) {
467
388
var sourcetypes []string
468
389
var indices []string
@@ -595,22 +516,3 @@ func getMetricsAttributes(metrics []pmetric.Metrics, attributeName string) ([]st
595
516
fmt .Printf ("Counters: Found: %d | Skipped: %d | Not Found: %d\n " , foundCounter , skippedCounter , notFoundCounter )
596
517
return resourceAttributes , notFoundCounter
597
518
}
598
-
599
- func uninstallDeployment (t * testing.T ) {
600
- testKubeConfig , setKubeConfig := os .LookupEnv ("KUBECONFIG" )
601
- require .True (t , setKubeConfig , "the environment variable KUBECONFIG must be set" )
602
- actionConfig := new (action.Configuration )
603
- if err := actionConfig .Init (kube .GetConfig (testKubeConfig , "" , "default" ), "default" , os .Getenv ("HELM_DRIVER" ), func (format string , v ... interface {}) {
604
- t .Logf (format + "\n " , v ... )
605
- }); err != nil {
606
- require .NoError (t , err )
607
- }
608
-
609
- uninstall := action .NewUninstall (actionConfig )
610
- uninstallResponse , err := uninstall .Run ("sock" )
611
- if err != nil {
612
- t .Logf ("Failed to uninstall release: %v" , err )
613
- }
614
- t .Logf ("Uninstalled release: %v" , uninstallResponse )
615
- waitForAllPodsToBeRemoved (t , "default" )
616
- }
0 commit comments