Skip to content

Zookeeper config source #318

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/OneOfOne/xxhash v1.2.5 // indirect
github.com/client9/misspell v0.3.4
github.com/go-playground/universal-translator v0.17.0 // indirect
github.com/go-zookeeper/zk v1.0.2
github.com/gogo/googleapis v1.4.0 // indirect
github.com/golangci/golangci-lint v1.38.0
github.com/google/addlicense v0.0.0-20200906110928-a0294312aa76
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,8 @@ github.com/go-xmlfmt/xmlfmt v0.0.0-20191208150333-d5b6f63a941b h1:khEcpUM4yFcxg4
github.com/go-xmlfmt/xmlfmt v0.0.0-20191208150333-d5b6f63a941b/go.mod h1:aUCEOzzezBEjDBbFBoSiya/gduyIiWYRP6CnSFIV8AM=
github.com/go-yaml/yaml v2.1.0+incompatible h1:RYi2hDdss1u4YE7GwixGzWwVo47T8UQwnTLB6vQiq+o=
github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0=
github.com/go-zookeeper/zk v1.0.2 h1:4mx0EYENAdX/B/rbunjlt5+4RTA/a9SMHBRuSKdGxPM=
github.com/go-zookeeper/zk v1.0.2/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
github.com/gobuffalo/attrs v0.0.0-20190224210810-a9411de4debd/go.mod h1:4duuawTqi2wkkpB4ePgWMaai6/Kc6WEz83bhFwpHzj0=
github.com/gobuffalo/depgen v0.0.0-20190329151759-d478694a28d3/go.mod h1:3STtPUQYuzV0gBVOY3vy6CfMm/ljR4pABfrTeHNLHUY=
github.com/gobuffalo/depgen v0.1.0/go.mod h1:+ifsuy7fhi15RWncXQQKjWS9JPkdah5sZvtHc2RXGlg=
Expand Down
43 changes: 43 additions & 0 deletions internal/configsource/zookeeperconfigsource/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Zookeeper Config Source (Alpha)

Use the [Zookeeper](https://zookeeper.apache.org/) config source to retrieve data from
Zookeeper and inject it into your collector configuration.

## Configuration

Under the `config_sources:` use `zookeeper:` or `zookeeper/<name>:` to create a
Zookeeper config source. The following parameters are available to customize
Zookeeper config sources:

```yaml
config_sources:
zookeeper:
# endpoint is the Zookeeper server addresses. Config source will try to connect to
# these endpoints to access an Zookeeper cluster.
endpoints: [http://localhost:2181]
# timeout sets the amount of time for which a session is considered valid after
# losing connection to a server. Within the session timeout it's possible to
# reestablish a connection to a different server and keep the same session.
timeout: 10s
```

If multiple paths are needed create different instances of the config source, example:

```yaml
config_sources:
# Assuming that the environment variables ZOOKEEPER_ADDR is the defined and the
# different secrets are on the same server but at different paths.
zookeeper:
endpoints: [$ZOOKEEPER_ADDR]
zookeeper/another_cluster:
endpoints: [$ZOOKEEPER_2_ADDR]
timeout: 15s

# Both Zookeeper config sources can be used via their full name. Hypothetical example:
components:
component_using_zookeeper:
token: $zookeeper:/data/token

component_using_zookeeper_another_cluster:
token: $zookeeper/another_cluster:/data/token
```
29 changes: 29 additions & 0 deletions internal/configsource/zookeeperconfigsource/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2020 Splunk, Inc.
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package zookeeperconfigsource

import (
"time"

"github.com/signalfx/splunk-otel-collector/internal/configprovider"
)

// Config defines zookeeperconfigsource configuration
type Config struct {
*configprovider.Settings
Endpoints []string `mapstructure:"endpoints"`
Timeout time.Duration `mapstructure:"timeout"`
}
69 changes: 69 additions & 0 deletions internal/configsource/zookeeperconfigsource/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2020 Splunk, Inc.
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package zookeeperconfigsource

import (
"context"
"path"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/config"
"go.uber.org/zap"

"github.com/signalfx/splunk-otel-collector/internal/configprovider"
)

func TestZookeeperLoadConfig(t *testing.T) {
fileName := path.Join("testdata", "config.yaml")
v, err := config.NewParserFromFile(fileName)
require.NoError(t, err)

factories := map[config.Type]configprovider.Factory{
typeStr: NewFactory(),
}

actualSettings, err := configprovider.Load(context.Background(), v, factories)
require.NoError(t, err)

expectedSettings := map[string]configprovider.ConfigSettings{
"zookeeper": &Config{
Settings: &configprovider.Settings{
TypeVal: "zookeeper",
NameVal: "zookeeper",
},
Endpoints: []string{"http://localhost:1234"},
Timeout: time.Second * 10,
},
"zookeeper/timeout": &Config{
Settings: &configprovider.Settings{
TypeVal: "zookeeper",
NameVal: "zookeeper/timeout",
},
Endpoints: []string{"https://localhost:3010"},
Timeout: time.Second * 8,
},
}

require.Equal(t, expectedSettings, actualSettings)

params := configprovider.CreateParams{
Logger: zap.NewNop(),
}
_, err = configprovider.Build(context.Background(), actualSettings, params, factories)
require.NoError(t, err)
}
55 changes: 55 additions & 0 deletions internal/configsource/zookeeperconfigsource/configsource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2020 Splunk, Inc.
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package zookeeperconfigsource

import (
"context"
"time"

"github.com/go-zookeeper/zk"
"go.opentelemetry.io/collector/config/experimental/configsource"
"go.uber.org/zap"
)

type zookeeperconfigsource struct {
logger *zap.Logger
endpoints []string
timeout time.Duration
}

var _ configsource.ConfigSource = (*zookeeperconfigsource)(nil)

func (z *zookeeperconfigsource) NewSession(ctx context.Context) (configsource.Session, error) {
return newSession(z.logger, newConnectFunc(z.endpoints, z.timeout)), nil
}

// newConnectFunc returns a new function that can be used to establish and return a connection
// to a zookeeper cluster. Every function returned by newConnectFunc will return the same
// underlying connection until it is lost.
func newConnectFunc(endpoints []string, timeout time.Duration) connectFunc {
var conn *zk.Conn
return func(ctx context.Context) (zkConnection, error) {
if conn != nil && conn.State() != zk.StateDisconnected {
return conn, nil
}

conn, _, err := zk.Connect(endpoints, timeout, zk.WithLogInfo(false))
if err != nil {
return nil, err
}
return conn, nil
}
}
31 changes: 31 additions & 0 deletions internal/configsource/zookeeperconfigsource/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2020 Splunk, Inc.
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package zookeeperconfigsource

import (
"context"

"github.com/go-zookeeper/zk"
)

// zkConnection defines an interface that satisfies all functionality
// a session needs from zk.Conn. This allows us to easily mock
// the connection in tests.
type zkConnection interface {
GetW(string) ([]byte, *zk.Stat, <-chan zk.Event, error)
}

type connectFunc func(context.Context) (zkConnection, error)
81 changes: 81 additions & 0 deletions internal/configsource/zookeeperconfigsource/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package zookeeperconfigsource

import (
"context"
"errors"
"fmt"
"net/url"
"time"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/experimental/configsource"

"github.com/signalfx/splunk-otel-collector/internal/configprovider"
)

const (
// The "type" of zookeeper config sources in configuration.
typeStr = "zookeeper"

defaultEndpoint = "localhost:2181"
defaultTimeout = time.Second * 10
)

// Private error types to help with testability.
type (
errMissingEndpoint struct{ error }
errInvalidEndpoint struct{ error }
)

type zkFactory struct{}

func (v *zkFactory) Type() config.Type {
return typeStr
}

func (v *zkFactory) CreateDefaultConfig() configprovider.ConfigSettings {
return &Config{
Settings: configprovider.NewSettings(typeStr),
Endpoints: []string{defaultEndpoint},
Timeout: defaultTimeout,
}
}

func (v *zkFactory) CreateConfigSource(_ context.Context, params configprovider.CreateParams, cfg configprovider.ConfigSettings) (configsource.ConfigSource, error) {
zkCfg := cfg.(*Config)

if len(zkCfg.Endpoints) == 0 {
return nil, &errMissingEndpoint{errors.New("cannot connect to zk without any endpoints")}
}

for _, uri := range zkCfg.Endpoints {
if _, err := url.ParseRequestURI(uri); err != nil {
return nil, &errInvalidEndpoint{fmt.Errorf("invalid endpoint %q: %w", uri, err)}
}
}

return &zookeeperconfigsource{
logger: params.Logger,
endpoints: zkCfg.Endpoints,
timeout: zkCfg.Timeout,
}, nil
}

// NewFactory returns a new zookeekeper config source factory
func NewFactory() configprovider.Factory {
return &zkFactory{}
}
Loading