Skip to content

Commit 4c3e37d

Browse files
committed
Moving configsource package and latest core package
1 parent d0b2280 commit 4c3e37d

File tree

2 files changed

+32
-96
lines changed

2 files changed

+32
-96
lines changed

internal/configprovider/manager.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
package configsource
15+
package configprovider
1616

1717
import (
1818
"bytes"
@@ -26,6 +26,7 @@ import (
2626

2727
"go.opentelemetry.io/collector/component"
2828
"go.opentelemetry.io/collector/config"
29+
"go.opentelemetry.io/collector/config/experimental/configsource"
2930
"go.opentelemetry.io/collector/consumer/consumererror"
3031
"go.uber.org/zap"
3132
"gopkg.in/yaml.v2"
@@ -156,10 +157,10 @@ type (
156157
type Manager struct {
157158
// configSources is map from ConfigSource names (as defined in the configuration)
158159
// and the respective instances.
159-
configSources map[string]ConfigSource
160+
configSources map[string]configsource.ConfigSource
160161
// sessions track all the Session objects used to retrieve values to be injected
161162
// into the configuration.
162-
sessions map[string]Session
163+
sessions map[string]configsource.Session
163164
// watchingCh is used to notify users of the Manager that the WatchForUpdate function
164165
// is ready and waiting for notifications.
165166
watchingCh chan struct{}
@@ -240,11 +241,11 @@ func (m *Manager) WatchForUpdate() error {
240241

241242
err := watcherFn()
242243
switch {
243-
case errors.Is(err, ErrWatcherNotSupported):
244+
case errors.Is(err, configsource.ErrWatcherNotSupported):
244245
// The watcher for the retrieved value is not supported, nothing to
245246
// do, just exit from the goroutine.
246247
return
247-
case errors.Is(err, ErrSessionClosed):
248+
case errors.Is(err, configsource.ErrSessionClosed):
248249
// The Session from which this watcher was retrieved is being closed.
249250
// There is no error to report, just exit from the goroutine.
250251
return
@@ -271,7 +272,7 @@ func (m *Manager) WatchForUpdate() error {
271272
return err
272273
case <-m.closeCh:
273274
// This covers the case that all watchers returned ErrWatcherNotSupported.
274-
return ErrSessionClosed
275+
return configsource.ErrSessionClosed
275276
}
276277
}
277278

@@ -297,10 +298,10 @@ func (m *Manager) Close(ctx context.Context) error {
297298
return consumererror.Combine(errs)
298299
}
299300

300-
func newManager(configSources map[string]ConfigSource) *Manager {
301+
func newManager(configSources map[string]configsource.ConfigSource) *Manager {
301302
return &Manager{
302303
configSources: configSources,
303-
sessions: make(map[string]Session),
304+
sessions: make(map[string]configsource.Session),
304305
watchingCh: make(chan struct{}),
305306
closeCh: make(chan struct{}),
306307
}
@@ -346,7 +347,7 @@ func (m *Manager) expandStringValues(ctx context.Context, value interface{}) (in
346347

347348
// expandConfigSource retrieve data from the specified config source and injects them into
348349
// the configuration. The Manager tracks sessions and watcher objects as needed.
349-
func (m *Manager) expandConfigSource(ctx context.Context, cfgSrc ConfigSource, s string) (interface{}, error) {
350+
func (m *Manager) expandConfigSource(ctx context.Context, cfgSrc configsource.ConfigSource, s string) (interface{}, error) {
350351
cfgSrcName, selector, params, err := parseCfgSrc(s)
351352
if err != nil {
352353
return nil, err
@@ -509,7 +510,7 @@ func parseCfgSrc(s string) (cfgSrcName, selector string, params interface{}, err
509510
selector = strings.Trim(parts[0], " ")
510511

511512
if len(parts) > 1 && len(parts[1]) > 0 {
512-
v := config.NewViper()
513+
v := config.NewParser().Viper()
513514
v.SetConfigType("yaml")
514515
if err = v.ReadConfig(bytes.NewReader([]byte(parts[1]))); err != nil {
515516
return

internal/configprovider/manager_test.go

Lines changed: 21 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,11 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
package configsource
15+
package configprovider
1616

1717
import (
1818
"context"
1919
"errors"
20-
"fmt"
2120
"os"
2221
"path"
2322
"testing"
@@ -26,6 +25,7 @@ import (
2625
"github.com/stretchr/testify/require"
2726
"go.opentelemetry.io/collector/component"
2827
"go.opentelemetry.io/collector/config"
28+
"go.opentelemetry.io/collector/config/experimental/configsource"
2929
"go.uber.org/zap"
3030
)
3131

@@ -81,7 +81,7 @@ func TestConfigSourceManager_NewManager(t *testing.T) {
8181

8282
func TestConfigSourceManager_Simple(t *testing.T) {
8383
ctx := context.Background()
84-
manager := newManager(map[string]ConfigSource{
84+
manager := newManager(map[string]configsource.ConfigSource{
8585
"tstcfgsrc": &testConfigSource{
8686
ValueMap: map[string]valueEntry{
8787
"test_selector": {Value: "test_value"},
@@ -119,7 +119,7 @@ func TestConfigSourceManager_Simple(t *testing.T) {
119119
manager.WaitForWatcher()
120120
assert.NoError(t, manager.Close(ctx))
121121
<-doneCh
122-
assert.ErrorIs(t, errWatcher, ErrSessionClosed)
122+
assert.ErrorIs(t, errWatcher, configsource.ErrSessionClosed)
123123
}
124124

125125
func TestConfigSourceManager_ResolveRemoveConfigSourceSection(t *testing.T) {
@@ -132,7 +132,7 @@ func TestConfigSourceManager_ResolveRemoveConfigSourceSection(t *testing.T) {
132132
},
133133
}
134134

135-
manager := newManager(map[string]ConfigSource{
135+
manager := newManager(map[string]configsource.ConfigSource{
136136
"tstcfgsrc": &testConfigSource{},
137137
})
138138

@@ -150,15 +150,15 @@ func TestConfigSourceManager_ResolveErrors(t *testing.T) {
150150

151151
tests := []struct {
152152
config map[string]interface{}
153-
configSourceMap map[string]ConfigSource
153+
configSourceMap map[string]configsource.ConfigSource
154154
name string
155155
}{
156156
{
157157
name: "incorrect_cfgsrc_ref",
158158
config: map[string]interface{}{
159159
"cfgsrc": "$tstcfgsrc:selector?{invalid}",
160160
},
161-
configSourceMap: map[string]ConfigSource{
161+
configSourceMap: map[string]configsource.ConfigSource{
162162
"tstcfgsrc": &testConfigSource{},
163163
},
164164
},
@@ -167,7 +167,7 @@ func TestConfigSourceManager_ResolveErrors(t *testing.T) {
167167
config: map[string]interface{}{
168168
"cfgsrc": "$tstcfgsrc:selector",
169169
},
170-
configSourceMap: map[string]ConfigSource{
170+
configSourceMap: map[string]configsource.ConfigSource{
171171
"tstcfgsrc": &testConfigSource{ErrOnNewSession: testErr},
172172
},
173173
},
@@ -176,7 +176,7 @@ func TestConfigSourceManager_ResolveErrors(t *testing.T) {
176176
config: map[string]interface{}{
177177
"cfgsrc": "$tstcfgsrc:selector",
178178
},
179-
configSourceMap: map[string]ConfigSource{
179+
configSourceMap: map[string]configsource.ConfigSource{
180180
"tstcfgsrc": &testConfigSource{ErrOnRetrieve: testErr},
181181
},
182182
},
@@ -185,7 +185,7 @@ func TestConfigSourceManager_ResolveErrors(t *testing.T) {
185185
config: map[string]interface{}{
186186
"cfgsrc": "$tstcfgsrc:selector",
187187
},
188-
configSourceMap: map[string]ConfigSource{
188+
configSourceMap: map[string]configsource.ConfigSource{
189189
"tstcfgsrc": &testConfigSource{
190190
ErrOnRetrieveEnd: testErr,
191191
ValueMap: map[string]valueEntry{
@@ -209,7 +209,7 @@ func TestConfigSourceManager_ResolveErrors(t *testing.T) {
209209

210210
func TestConfigSourceManager_ArraysAndMaps(t *testing.T) {
211211
ctx := context.Background()
212-
manager := newManager(map[string]ConfigSource{
212+
manager := newManager(map[string]configsource.ConfigSource{
213213
"tstcfgsrc": &testConfigSource{
214214
ValueMap: map[string]valueEntry{
215215
"elem0": {Value: "elem0_value"},
@@ -268,7 +268,7 @@ func TestConfigSourceManager_ParamsHandling(t *testing.T) {
268268
return nil
269269
}
270270

271-
manager := newManager(map[string]ConfigSource{
271+
manager := newManager(map[string]configsource.ConfigSource{
272272
"tstcfgsrc": &tstCfgSrc,
273273
})
274274

@@ -292,7 +292,7 @@ func TestConfigSourceManager_WatchForUpdate(t *testing.T) {
292292
ctx := context.Background()
293293
watchForUpdateCh := make(chan error, 1)
294294

295-
manager := newManager(map[string]ConfigSource{
295+
manager := newManager(map[string]configsource.ConfigSource{
296296
"tstcfgsrc": &testConfigSource{
297297
ValueMap: map[string]valueEntry{
298298
"test_selector": {
@@ -323,10 +323,10 @@ func TestConfigSourceManager_WatchForUpdate(t *testing.T) {
323323
}()
324324

325325
manager.WaitForWatcher()
326-
watchForUpdateCh <- ErrValueUpdated
326+
watchForUpdateCh <- configsource.ErrValueUpdated
327327

328328
<-doneCh
329-
assert.ErrorIs(t, errWatcher, ErrValueUpdated)
329+
assert.ErrorIs(t, errWatcher, configsource.ErrValueUpdated)
330330
assert.NoError(t, manager.Close(ctx))
331331
}
332332

@@ -341,11 +341,11 @@ func TestConfigSourceManager_MultipleWatchForUpdate(t *testing.T) {
341341
case errFromWatchForUpdate := <-watchForUpdateCh:
342342
return errFromWatchForUpdate
343343
case <-watchDoneCh:
344-
return ErrSessionClosed
344+
return configsource.ErrSessionClosed
345345
}
346346
}
347347

348-
manager := newManager(map[string]ConfigSource{
348+
manager := newManager(map[string]configsource.ConfigSource{
349349
"tstcfgsrc": &testConfigSource{
350350
ValueMap: map[string]valueEntry{
351351
"test_selector": {
@@ -379,11 +379,11 @@ func TestConfigSourceManager_MultipleWatchForUpdate(t *testing.T) {
379379
manager.WaitForWatcher()
380380

381381
for i := 0; i < watchForUpdateChSize; i++ {
382-
watchForUpdateCh <- ErrValueUpdated
382+
watchForUpdateCh <- configsource.ErrValueUpdated
383383
}
384384

385385
<-doneCh
386-
assert.ErrorIs(t, errWatcher, ErrValueUpdated)
386+
assert.ErrorIs(t, errWatcher, configsource.ErrValueUpdated)
387387
close(watchForUpdateCh)
388388
assert.NoError(t, manager.Close(ctx))
389389
}
@@ -409,7 +409,7 @@ func TestConfigSourceManager_EnvVarHandling(t *testing.T) {
409409
return nil
410410
}
411411

412-
manager := newManager(map[string]ConfigSource{
412+
manager := newManager(map[string]configsource.ConfigSource{
413413
"tstcfgsrc": &tstCfgSrc,
414414
})
415415

@@ -431,7 +431,7 @@ func TestConfigSourceManager_EnvVarHandling(t *testing.T) {
431431

432432
func TestManager_expandString(t *testing.T) {
433433
ctx := context.Background()
434-
manager := newManager(map[string]ConfigSource{
434+
manager := newManager(map[string]configsource.ConfigSource{
435435
"tstcfgsrc": &testConfigSource{
436436
ValueMap: map[string]valueEntry{
437437
"str_key": {Value: "test_value"},
@@ -621,68 +621,3 @@ func Test_parseCfgSrc(t *testing.T) {
621621
})
622622
}
623623
}
624-
625-
// testConfigSource a ConfigSource to be used in tests.
626-
type testConfigSource struct {
627-
ValueMap map[string]valueEntry
628-
629-
ErrOnNewSession error
630-
ErrOnRetrieve error
631-
ErrOnRetrieveEnd error
632-
ErrOnClose error
633-
634-
OnRetrieve func(ctx context.Context, selector string, params interface{}) error
635-
}
636-
637-
type valueEntry struct {
638-
Value interface{}
639-
WatchForUpdateFn func() error
640-
}
641-
642-
var _ (ConfigSource) = (*testConfigSource)(nil)
643-
var _ (Session) = (*testConfigSource)(nil)
644-
645-
func (t *testConfigSource) NewSession(context.Context) (Session, error) {
646-
if t.ErrOnNewSession != nil {
647-
return nil, t.ErrOnNewSession
648-
}
649-
return t, nil
650-
}
651-
652-
func (t *testConfigSource) Retrieve(ctx context.Context, selector string, params interface{}) (Retrieved, error) {
653-
if t.OnRetrieve != nil {
654-
if err := t.OnRetrieve(ctx, selector, params); err != nil {
655-
return nil, err
656-
}
657-
}
658-
659-
if t.ErrOnRetrieve != nil {
660-
return nil, t.ErrOnRetrieve
661-
}
662-
663-
entry, ok := t.ValueMap[selector]
664-
if !ok {
665-
return nil, fmt.Errorf("no value for selector %q", selector)
666-
}
667-
668-
watchForUpdateFn := func() error {
669-
return ErrWatcherNotSupported
670-
}
671-
672-
if entry.WatchForUpdateFn != nil {
673-
watchForUpdateFn = entry.WatchForUpdateFn
674-
}
675-
676-
return &retrieved{
677-
value: entry.Value,
678-
watchForUpdateFn: watchForUpdateFn,
679-
}, nil
680-
}
681-
682-
func (t *testConfigSource) RetrieveEnd(context.Context) error {
683-
return t.ErrOnRetrieveEnd
684-
}
685-
686-
func (t *testConfigSource) Close(context.Context) error {
687-
return t.ErrOnClose
688-
}

0 commit comments

Comments
 (0)