Skip to content

Commit b24db54

Browse files
authored
Small cleanups in testbed, hide unnecessary public structs (#3446)
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 40d6a55 commit b24db54

12 files changed

+208
-179
lines changed

testbed/correctness/metrics/correctness_test_case.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ type correctnessTestCase struct {
3232
sender testbed.DataSender
3333
receiver testbed.DataReceiver
3434
harness *testHarness
35-
collector *testbed.InProcessCollector
35+
collector testbed.OtelcolRunner
3636
}
3737

3838
func newCorrectnessTestCase(

testbed/testbed/child_process.go renamed to testbed/testbed/child_process_collector.go

Lines changed: 15 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -36,34 +36,9 @@ import (
3636
"go.uber.org/atomic"
3737
)
3838

39-
// ResourceSpec is a resource consumption specification.
40-
type ResourceSpec struct {
41-
// Percentage of one core the process is expected to consume at most.
42-
// Test is aborted and failed if consumption during
43-
// ResourceCheckPeriod exceeds this number. If 0 the CPU
44-
// consumption is not monitored and does not affect the test result.
45-
ExpectedMaxCPU uint32
46-
47-
// Maximum RAM in MiB the process is expected to consume.
48-
// Test is aborted and failed if consumption exceeds this number.
49-
// If 0 memory consumption is not monitored and does not affect
50-
// the test result.
51-
ExpectedMaxRAM uint32
52-
53-
// Period during which CPU and RAM of the process are measured.
54-
// Bigger numbers will result in more averaging of short spikes.
55-
ResourceCheckPeriod time.Duration
56-
}
57-
58-
// isSpecified returns true if any part of ResourceSpec is specified,
59-
// i.e. has non-zero value.
60-
func (rs *ResourceSpec) isSpecified() bool {
61-
return rs != nil && (rs.ExpectedMaxCPU != 0 || rs.ExpectedMaxRAM != 0)
62-
}
63-
64-
// ChildProcess implements the OtelcolRunner interface as a child process on the same machine executing
39+
// childProcessCollector implements the OtelcolRunner interface as a child process on the same machine executing
6540
// the test. The process can be monitored and the output of which will be written to a log file.
66-
type ChildProcess struct {
41+
type childProcessCollector struct {
6742
// Path to agent executable. If unset the default executable in
6843
// bin/otelcol_{{.GOOS}}_{{.GOARCH}} will be used.
6944
// Can be set for example to use the unstable executable for a specific test.
@@ -121,21 +96,12 @@ type ChildProcess struct {
12196
ramMiBMax uint32
12297
}
12398

124-
type StartParams struct {
125-
Name string
126-
LogFilePath string
127-
CmdArgs []string
128-
resourceSpec *ResourceSpec
129-
}
130-
131-
type ResourceConsumption struct {
132-
CPUPercentAvg float64
133-
CPUPercentMax float64
134-
RAMMiBAvg uint32
135-
RAMMiBMax uint32
99+
// NewChildProcessCollector crewtes a new OtelcolRunner as a child process on the same machine executing the test.
100+
func NewChildProcessCollector() OtelcolRunner {
101+
return &childProcessCollector{}
136102
}
137103

138-
func (cp *ChildProcess) PrepareConfig(configStr string) (configCleanup func(), err error) {
104+
func (cp *childProcessCollector) PrepareConfig(configStr string) (configCleanup func(), err error) {
139105
configCleanup = func() {
140106
// NoOp
141107
}
@@ -198,7 +164,7 @@ func expandExeFileName(exeName string) string {
198164
// logFilePath is the file path to write the standard output and standard error of
199165
// the process to.
200166
// cmdArgs is the command line arguments to pass to the process.
201-
func (cp *ChildProcess) Start(params StartParams) error {
167+
func (cp *childProcessCollector) Start(params StartParams) error {
202168

203169
cp.name = params.Name
204170
cp.doneSignal = make(chan struct{})
@@ -275,7 +241,7 @@ func (cp *ChildProcess) Start(params StartParams) error {
275241
return err
276242
}
277243

278-
func (cp *ChildProcess) Stop() (stopped bool, err error) {
244+
func (cp *childProcessCollector) Stop() (stopped bool, err error) {
279245
if !cp.isStarted || cp.isStopped {
280246
return false, nil
281247
}
@@ -341,7 +307,7 @@ func (cp *ChildProcess) Stop() (stopped bool, err error) {
341307
return stopped, err
342308
}
343309

344-
func (cp *ChildProcess) WatchResourceConsumption() error {
310+
func (cp *childProcessCollector) WatchResourceConsumption() error {
345311
if !cp.resourceSpec.isSpecified() {
346312
// Resource monitoring is not enabled.
347313
return nil
@@ -388,11 +354,11 @@ func (cp *ChildProcess) WatchResourceConsumption() error {
388354
}
389355
}
390356

391-
func (cp *ChildProcess) GetProcessMon() *process.Process {
357+
func (cp *childProcessCollector) GetProcessMon() *process.Process {
392358
return cp.processMon
393359
}
394360

395-
func (cp *ChildProcess) fetchRAMUsage() {
361+
func (cp *childProcessCollector) fetchRAMUsage() {
396362
// Get process memory and CPU times
397363
mi, err := cp.processMon.MemoryInfo()
398364
if err != nil {
@@ -415,7 +381,7 @@ func (cp *ChildProcess) fetchRAMUsage() {
415381
cp.ramMiBCur.Store(ramMiBCur)
416382
}
417383

418-
func (cp *ChildProcess) fetchCPUUsage() {
384+
func (cp *childProcessCollector) fetchCPUUsage() {
419385
times, err := cp.processMon.Times()
420386
if err != nil {
421387
log.Printf("cannot get process times for %d: %s",
@@ -448,7 +414,7 @@ func (cp *ChildProcess) fetchCPUUsage() {
448414
cp.cpuPercentX1000Cur.Store(curCPUPercentageX1000)
449415
}
450416

451-
func (cp *ChildProcess) checkAllowedResourceUsage() error {
417+
func (cp *childProcessCollector) checkAllowedResourceUsage() error {
452418
// Check if current CPU usage exceeds expected.
453419
var errMsg string
454420
if cp.resourceSpec.ExpectedMaxCPU != 0 && cp.cpuPercentX1000Cur.Load()/1000 > cp.resourceSpec.ExpectedMaxCPU {
@@ -472,7 +438,7 @@ func (cp *ChildProcess) checkAllowedResourceUsage() error {
472438
}
473439

474440
// GetResourceConsumption returns resource consumption as a string
475-
func (cp *ChildProcess) GetResourceConsumption() string {
441+
func (cp *childProcessCollector) GetResourceConsumption() string {
476442
if !cp.resourceSpec.isSpecified() {
477443
// Monitoring is not enabled.
478444
return ""
@@ -486,7 +452,7 @@ func (cp *ChildProcess) GetResourceConsumption() string {
486452
}
487453

488454
// GetTotalConsumption returns total resource consumption since start of process
489-
func (cp *ChildProcess) GetTotalConsumption() *ResourceConsumption {
455+
func (cp *childProcessCollector) GetTotalConsumption() *ResourceConsumption {
490456
rc := &ResourceConsumption{}
491457

492458
if cp.processMon != nil {
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package testbed
16+
17+
import (
18+
"fmt"
19+
"strings"
20+
21+
"github.com/shirou/gopsutil/process"
22+
"go.uber.org/zap"
23+
"go.uber.org/zap/zapcore"
24+
25+
"go.opentelemetry.io/collector/component"
26+
"go.opentelemetry.io/collector/internal/version"
27+
"go.opentelemetry.io/collector/service"
28+
"go.opentelemetry.io/collector/service/parserprovider"
29+
)
30+
31+
// inProcessCollector implements the OtelcolRunner interfaces running a single otelcol as a go routine within the
32+
// same process as the test executor.
33+
type inProcessCollector struct {
34+
logger *zap.Logger
35+
factories component.Factories
36+
configStr string
37+
svc *service.Collector
38+
appDone chan struct{}
39+
stopped bool
40+
}
41+
42+
// NewInProcessCollector crewtes a new inProcessCollector using the supplied component factories.
43+
func NewInProcessCollector(factories component.Factories) OtelcolRunner {
44+
return &inProcessCollector{
45+
factories: factories,
46+
}
47+
}
48+
49+
func (ipp *inProcessCollector) PrepareConfig(configStr string) (configCleanup func(), err error) {
50+
configCleanup = func() {
51+
// NoOp
52+
}
53+
var logger *zap.Logger
54+
logger, err = configureLogger()
55+
if err != nil {
56+
return configCleanup, err
57+
}
58+
ipp.logger = logger
59+
ipp.configStr = configStr
60+
return configCleanup, err
61+
}
62+
63+
func (ipp *inProcessCollector) Start(args StartParams) error {
64+
settings := service.CollectorSettings{
65+
BuildInfo: component.BuildInfo{
66+
Command: "otelcol",
67+
Version: version.Version,
68+
},
69+
Factories: ipp.factories,
70+
ParserProvider: parserprovider.NewInMemory(strings.NewReader(ipp.configStr)),
71+
}
72+
var err error
73+
ipp.svc, err = service.New(settings)
74+
if err != nil {
75+
return err
76+
}
77+
ipp.svc.Command().SetArgs(args.CmdArgs)
78+
79+
ipp.appDone = make(chan struct{})
80+
go func() {
81+
defer close(ipp.appDone)
82+
appErr := ipp.svc.Run()
83+
if appErr != nil {
84+
err = appErr
85+
}
86+
}()
87+
88+
for state := range ipp.svc.GetStateChannel() {
89+
switch state {
90+
case service.Starting:
91+
// NoOp
92+
case service.Running:
93+
return err
94+
default:
95+
err = fmt.Errorf("unable to start, otelcol state is %d", state)
96+
}
97+
}
98+
return err
99+
}
100+
101+
func (ipp *inProcessCollector) Stop() (stopped bool, err error) {
102+
if !ipp.stopped {
103+
ipp.stopped = true
104+
ipp.svc.Shutdown()
105+
}
106+
<-ipp.appDone
107+
stopped = ipp.stopped
108+
return stopped, err
109+
}
110+
111+
func (ipp *inProcessCollector) WatchResourceConsumption() error {
112+
return nil
113+
}
114+
115+
func (ipp *inProcessCollector) GetProcessMon() *process.Process {
116+
return nil
117+
}
118+
119+
func (ipp *inProcessCollector) GetTotalConsumption() *ResourceConsumption {
120+
return &ResourceConsumption{
121+
CPUPercentAvg: 0,
122+
CPUPercentMax: 0,
123+
RAMMiBAvg: 0,
124+
RAMMiBMax: 0,
125+
}
126+
}
127+
128+
func (ipp *inProcessCollector) GetResourceConsumption() string {
129+
return ""
130+
}
131+
132+
func configureLogger() (*zap.Logger, error) {
133+
conf := zap.NewDevelopmentConfig()
134+
conf.Level.SetLevel(zapcore.InfoLevel)
135+
logger, err := conf.Build()
136+
return logger, err
137+
}

testbed/testbed/otelcol_runner_test.go renamed to testbed/testbed/in_process_collector_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ func TestNewInProcessPipeline(t *testing.T) {
2929
assert.NoError(t, err)
3030
sender := NewOTLPTraceDataSender(DefaultHost, GetAvailablePort(t))
3131
receiver := NewOTLPDataReceiver(DefaultOTLPPort)
32-
runner := NewInProcessCollector(factories)
32+
runner, ok := NewInProcessCollector(factories).(*inProcessCollector)
33+
require.True(t, ok)
3334

3435
format := `
3536
receivers:%v

testbed/testbed/options.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,35 @@
1717

1818
package testbed
1919

20+
import (
21+
"time"
22+
)
23+
24+
// ResourceSpec is a resource consumption specification.
25+
type ResourceSpec struct {
26+
// Percentage of one core the process is expected to consume at most.
27+
// Test is aborted and failed if consumption during
28+
// ResourceCheckPeriod exceeds this number. If 0 the CPU
29+
// consumption is not monitored and does not affect the test result.
30+
ExpectedMaxCPU uint32
31+
32+
// Maximum RAM in MiB the process is expected to consume.
33+
// Test is aborted and failed if consumption exceeds this number.
34+
// If 0 memory consumption is not monitored and does not affect
35+
// the test result.
36+
ExpectedMaxRAM uint32
37+
38+
// Period during which CPU and RAM of the process are measured.
39+
// Bigger numbers will result in more averaging of short spikes.
40+
ResourceCheckPeriod time.Duration
41+
}
42+
43+
// isSpecified returns true if any part of ResourceSpec is specified,
44+
// i.e. has non-zero value.
45+
func (rs *ResourceSpec) isSpecified() bool {
46+
return rs != nil && (rs.ExpectedMaxCPU != 0 || rs.ExpectedMaxRAM != 0)
47+
}
48+
2049
// TestCaseOption defines a TestCase option.
2150
type TestCaseOption func(t *TestCase)
2251

0 commit comments

Comments
 (0)