diff --git a/.chloggen/clickhouse-replication.yaml b/.chloggen/clickhouse-replication.yaml
new file mode 100755
index 0000000000000..4092e7d8e5ae8
--- /dev/null
+++ b/.chloggen/clickhouse-replication.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: 'enhancement'
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: clickhouseexporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: "Allow defining `CLUSTER` and `ENGINE` when creating the database or tables"
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [24649]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: "These changes allow implementing the 'replication for fault tolerance' scenario"
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/exporter/clickhouseexporter/README.md b/exporter/clickhouseexporter/README.md
index a351eebd12f29..d24c51a514b6d 100644
--- a/exporter/clickhouseexporter/README.md
+++ b/exporter/clickhouseexporter/README.md
@@ -13,7 +13,7 @@
-This exporter supports sending OpenTelemetry data to [ClickHouse](https://clickhouse.com/).
+This exporter supports sending OpenTelemetry data to [ClickHouse](https://clickhouse.com/).
> ClickHouse is an open-source, high performance columnar OLAP database management system for real-time analytics using
> SQL.
> Throughput can be measured in rows per second or megabytes per second.
@@ -302,6 +302,18 @@ Processing:
- `max_elapsed_time` (default = 300s): The maximum amount of time spent trying to send a batch; ignored if `enabled`
is `false`
+Cluster definition:
+
+- `cluster_name` (default = ): Cluster name used when creating the database or tables. When set, the '`ON CLUSTER cluster_name`' clause is added to creation queries.
+
+Table engine:
+
+- `table_engine`
+ - `name` (default = MergeTree)
+ - `params` (default = )
+
+Modifies the `ENGINE` definition when the table is created. If not set, `ENGINE` defaults to `MergeTree`. Combined with `cluster_name`, this allows implementing the [replication for fault tolerance scenario](https://clickhouse.com/docs/en/architecture/replication).
+
## TLS
The exporter supports TLS. To enable TLS, you need to specify the `secure=true` query parameter in the `endpoint` URL or
@@ -334,6 +346,10 @@ exporters:
initial_interval: 5s
max_interval: 30s
max_elapsed_time: 300s
+ # Uncomment if there is a need for replication across cluster instances
+ # table_engine:
+ # name: ReplicatedMergeTree
+ # cluster_name: my_cluster
service:
pipelines:
logs:
diff --git a/exporter/clickhouseexporter/config.go b/exporter/clickhouseexporter/config.go
index 43dc5b7568005..2789829ad205d 100644
--- a/exporter/clickhouseexporter/config.go
+++ b/exporter/clickhouseexporter/config.go
@@ -40,6 +40,10 @@ type Config struct {
MetricsTableName string `mapstructure:"metrics_table_name"`
// TTLDays is The data time-to-live in days, 0 means no ttl.
TTLDays uint `mapstructure:"ttl_days"`
+ // TableEngine modifies the ENGINE value used when creating the table. See the `TableEngineString` function for details.
+ TableEngine TableEngine `mapstructure:"table_engine"`
+ // ClusterName, if set, appends the 'ON CLUSTER' clause when creating the database or tables. For example: CREATE DATABASE IF NOT EXISTS db1 ON CLUSTER `ClusterName`
+ ClusterName string `mapstructure:"cluster_name"`
}
// QueueSettings is a subset of exporterhelper.QueueSettings.
@@ -48,7 +52,14 @@ type QueueSettings struct {
QueueSize int `mapstructure:"queue_size"`
}
+// TableEngine encapsulates the ENGINE value used when creating a table. For example, Name = ReplicatedMergeTree and Params = "'par1', 'par2', par3" generate the string ReplicatedMergeTree('par1', 'par2', par3); see `Config.TableEngineString` for details.
+type TableEngine struct {
+ Name string `mapstructure:"name"`
+ Params string `mapstructure:"params"`
+}
+
const defaultDatabase = "default"
+const defaultTableEngineName = "MergeTree"
var (
errConfigNoEndpoint = errors.New("endpoint must be specified")
@@ -142,3 +153,26 @@ func (cfg *Config) buildDB(database string) (*sql.DB, error) {
return conn, nil
}
+
+// TableEngineString generates the ENGINE string.
+// If `TableEngine.Name` is empty it returns 'defaultTableEngineName()'. If only `TableEngine.Params` is empty it returns 'TableEngine.Name()'. Otherwise it returns 'TableEngine.Name(TableEngine.Params)'.
+func (cfg *Config) TableEngineString() (string) {
+ if cfg.TableEngine.Name == "" {
+ return fmt.Sprintf("%s()", defaultTableEngineName)
+ }
+
+ if cfg.TableEngine.Params == "" {
+ return fmt.Sprintf("%s()", cfg.TableEngine.Name)
+ }
+
+ return fmt.Sprintf("%s(%s)", cfg.TableEngine.Name, cfg.TableEngine.Params)
+}
+
+// ClusterClause returns `ON CLUSTER ClusterName` if `ClusterName` is not empty; otherwise it returns "".
+func (cfg *Config) ClusterClause() (string) {
+ if cfg.ClusterName == "" {
+ return ""
+ }
+
+ return fmt.Sprintf("ON CLUSTER %s", cfg.ClusterName)
+}
diff --git a/exporter/clickhouseexporter/config_test.go b/exporter/clickhouseexporter/config_test.go
index 6f65295f2bda6..4e1404b984129 100644
--- a/exporter/clickhouseexporter/config_test.go
+++ b/exporter/clickhouseexporter/config_test.go
@@ -21,6 +21,49 @@ import (
)
const defaultEndpoint = "clickhouse://127.0.0.1:9000"
+const defaultCluster = "cluster_1S_2R"
+
+func TestTableEngineConfigParsing(t *testing.T) {
+ t.Parallel()
+ cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config-table-engine.yml"))
+ require.NoError(t, err)
+
+ tests := []struct {
+ id component.ID
+ expected string
+ } {
+ {
+ id: component.NewIDWithName(metadata.Type, "table-engine-empty"),
+ expected: "MergeTree()",
+ },
+ {
+ id: component.NewIDWithName(metadata.Type, "table-engine-name-only"),
+ expected: "ReplicatedReplacingMergeTree()",
+ },
+ {
+ id: component.NewIDWithName(metadata.Type, "table-engine-full"),
+ expected: "ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/table_name', '{replica}', ver)",
+ },
+ {
+ id: component.NewIDWithName(metadata.Type, "table-engine-invalid"),
+ expected: "MergeTree()",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.id.String(), func(t *testing.T) {
+ factory := NewFactory()
+ cfg := factory.CreateDefaultConfig()
+
+ sub, err := cm.Sub(tt.id.String())
+ require.NoError(t, err)
+ require.NoError(t, component.UnmarshalConfig(sub, cfg))
+
+ assert.NoError(t, component.ValidateConfig(cfg))
+ assert.Equal(t, tt.expected, cfg.(*Config).TableEngineString())
+ })
+ }
+}
func TestLoadConfig(t *testing.T) {
t.Parallel()
diff --git a/exporter/clickhouseexporter/example/replication_setup/clickhouse-01/etc/clickhouse-server/config.d/macros.xml b/exporter/clickhouseexporter/example/replication_setup/clickhouse-01/etc/clickhouse-server/config.d/macros.xml
new file mode 100644
index 0000000000000..44f9b2d7e324d
--- /dev/null
+++ b/exporter/clickhouseexporter/example/replication_setup/clickhouse-01/etc/clickhouse-server/config.d/macros.xml
@@ -0,0 +1,7 @@
+
+
+ 01
+ 01
+ cluster_1S_2R
+
+
diff --git a/exporter/clickhouseexporter/example/replication_setup/clickhouse-01/etc/clickhouse-server/config.d/network-and-logging.xml b/exporter/clickhouseexporter/example/replication_setup/clickhouse-01/etc/clickhouse-server/config.d/network-and-logging.xml
new file mode 100644
index 0000000000000..f5cc408d8e9fa
--- /dev/null
+++ b/exporter/clickhouseexporter/example/replication_setup/clickhouse-01/etc/clickhouse-server/config.d/network-and-logging.xml
@@ -0,0 +1,13 @@
+
+
+ debug
+ /var/log/clickhouse-server/clickhouse-server.log
+ /var/log/clickhouse-server/clickhouse-server.err.log
+ 1000M
+ 3
+
+ cluster_1S_2R node 1
+ 0.0.0.0
+ 8123
+ 9000
+
diff --git a/exporter/clickhouseexporter/example/replication_setup/clickhouse-01/etc/clickhouse-server/config.d/remote-servers.xml b/exporter/clickhouseexporter/example/replication_setup/clickhouse-01/etc/clickhouse-server/config.d/remote-servers.xml
new file mode 100644
index 0000000000000..d410fb20a4965
--- /dev/null
+++ b/exporter/clickhouseexporter/example/replication_setup/clickhouse-01/etc/clickhouse-server/config.d/remote-servers.xml
@@ -0,0 +1,18 @@
+
+
+
+ mysecretphrase
+
+ true
+
+ clickhouse-01
+ 9000
+
+
+ clickhouse-02
+ 9000
+
+
+
+
+
diff --git a/exporter/clickhouseexporter/example/replication_setup/clickhouse-01/etc/clickhouse-server/config.d/use-keeper.xml b/exporter/clickhouseexporter/example/replication_setup/clickhouse-01/etc/clickhouse-server/config.d/use-keeper.xml
new file mode 100644
index 0000000000000..1f03f8d74f173
--- /dev/null
+++ b/exporter/clickhouseexporter/example/replication_setup/clickhouse-01/etc/clickhouse-server/config.d/use-keeper.xml
@@ -0,0 +1,9 @@
+
+
+
+
+ clickhouse-keeper-01
+ 9181
+
+
+
diff --git a/exporter/clickhouseexporter/example/replication_setup/clickhouse-01/etc/clickhouse-server/users.d/users.xml b/exporter/clickhouseexporter/example/replication_setup/clickhouse-01/etc/clickhouse-server/users.d/users.xml
new file mode 100644
index 0000000000000..a22d05d705f9d
--- /dev/null
+++ b/exporter/clickhouseexporter/example/replication_setup/clickhouse-01/etc/clickhouse-server/users.d/users.xml
@@ -0,0 +1,42 @@
+
+
+
+
+ 10000000000
+ 0
+ in_order
+ 1
+
+
+
+
+ 1
+ default
+
+ ::/0
+
+ default
+
+
+ 1
+ password
+ default
+
+ ::/0
+
+ default
+
+
+
+
+
+ 3600
+ 0
+ 0
+ 0
+ 0
+ 0
+
+
+
+
diff --git a/exporter/clickhouseexporter/example/replication_setup/clickhouse-02/etc/clickhouse-server/config.d/macros.xml b/exporter/clickhouseexporter/example/replication_setup/clickhouse-02/etc/clickhouse-server/config.d/macros.xml
new file mode 100644
index 0000000000000..142a1ea17beb2
--- /dev/null
+++ b/exporter/clickhouseexporter/example/replication_setup/clickhouse-02/etc/clickhouse-server/config.d/macros.xml
@@ -0,0 +1,7 @@
+
+
+ 01
+ 02
+ cluster_1S_2R
+
+
diff --git a/exporter/clickhouseexporter/example/replication_setup/clickhouse-02/etc/clickhouse-server/config.d/network-and-logging.xml b/exporter/clickhouseexporter/example/replication_setup/clickhouse-02/etc/clickhouse-server/config.d/network-and-logging.xml
new file mode 100644
index 0000000000000..ecf79fcfd76af
--- /dev/null
+++ b/exporter/clickhouseexporter/example/replication_setup/clickhouse-02/etc/clickhouse-server/config.d/network-and-logging.xml
@@ -0,0 +1,13 @@
+
+
+ debug
+ /var/log/clickhouse-server/clickhouse-server.log
+ /var/log/clickhouse-server/clickhouse-server.err.log
+ 1000M
+ 3
+
+ cluster_1S_2R node 2
+ 0.0.0.0
+ 8123
+ 9000
+
diff --git a/exporter/clickhouseexporter/example/replication_setup/clickhouse-02/etc/clickhouse-server/config.d/remote-servers.xml b/exporter/clickhouseexporter/example/replication_setup/clickhouse-02/etc/clickhouse-server/config.d/remote-servers.xml
new file mode 100644
index 0000000000000..d410fb20a4965
--- /dev/null
+++ b/exporter/clickhouseexporter/example/replication_setup/clickhouse-02/etc/clickhouse-server/config.d/remote-servers.xml
@@ -0,0 +1,18 @@
+
+
+
+ mysecretphrase
+
+ true
+
+ clickhouse-01
+ 9000
+
+
+ clickhouse-02
+ 9000
+
+
+
+
+
diff --git a/exporter/clickhouseexporter/example/replication_setup/clickhouse-02/etc/clickhouse-server/config.d/use-keeper.xml b/exporter/clickhouseexporter/example/replication_setup/clickhouse-02/etc/clickhouse-server/config.d/use-keeper.xml
new file mode 100644
index 0000000000000..1f03f8d74f173
--- /dev/null
+++ b/exporter/clickhouseexporter/example/replication_setup/clickhouse-02/etc/clickhouse-server/config.d/use-keeper.xml
@@ -0,0 +1,9 @@
+
+
+
+
+ clickhouse-keeper-01
+ 9181
+
+
+
diff --git a/exporter/clickhouseexporter/example/replication_setup/clickhouse-02/etc/clickhouse-server/users.d/users.xml b/exporter/clickhouseexporter/example/replication_setup/clickhouse-02/etc/clickhouse-server/users.d/users.xml
new file mode 100644
index 0000000000000..a22d05d705f9d
--- /dev/null
+++ b/exporter/clickhouseexporter/example/replication_setup/clickhouse-02/etc/clickhouse-server/users.d/users.xml
@@ -0,0 +1,42 @@
+
+
+
+
+ 10000000000
+ 0
+ in_order
+ 1
+
+
+
+
+ 1
+ default
+
+ ::/0
+
+ default
+
+
+ 1
+ password
+ default
+
+ ::/0
+
+ default
+
+
+
+
+
+ 3600
+ 0
+ 0
+ 0
+ 0
+ 0
+
+
+
+
diff --git a/exporter/clickhouseexporter/example/replication_setup/clickhouse-keeper-01/etc/clickhouse-keeper/keeper_config.xml b/exporter/clickhouseexporter/example/replication_setup/clickhouse-keeper-01/etc/clickhouse-keeper/keeper_config.xml
new file mode 100644
index 0000000000000..7ae68344028bb
--- /dev/null
+++ b/exporter/clickhouseexporter/example/replication_setup/clickhouse-keeper-01/etc/clickhouse-keeper/keeper_config.xml
@@ -0,0 +1,28 @@
+
+
+ trace
+ /var/log/clickhouse-keeper/clickhouse-keeper.log
+ /var/log/clickhouse-keeper/clickhouse-keeper.err.log
+ 1000M
+ 3
+
+ 0.0.0.0
+
+ 9181
+ 1
+ /var/lib/clickhouse/coordination/log
+ /var/lib/clickhouse/coordination/snapshots
+
+ 10000
+ 30000
+ trace
+
+
+
+ 1
+ clickhouse-keeper-01
+ 9234
+
+
+
+
diff --git a/exporter/clickhouseexporter/example/replication_setup/docker-compose.yml b/exporter/clickhouseexporter/example/replication_setup/docker-compose.yml
new file mode 100644
index 0000000000000..8904c750b05ec
--- /dev/null
+++ b/exporter/clickhouseexporter/example/replication_setup/docker-compose.yml
@@ -0,0 +1,37 @@
+version: '3.8'
+services:
+ clickhouse-keeper-01:
+ image: "clickhouse/clickhouse-keeper:${CHKVER:-latest-alpine}"
+ user: "101:101"
+ container_name: clickhouse-keeper-01
+ hostname: clickhouse-keeper-01
+ volumes:
+ - ${PWD}/clickhouse-keeper-01/etc/clickhouse-keeper/keeper_config.xml:/etc/clickhouse-keeper/keeper_config.xml
+ ports:
+ - "127.0.0.1:9181:9181"
+ clickhouse-01:
+ image: "clickhouse/clickhouse-server:${CHVER:-latest}"
+ user: "101:101"
+ container_name: clickhouse-01
+ hostname: clickhouse-01
+ volumes:
+ - ${PWD}/clickhouse-01/etc/clickhouse-server/config.d:/etc/clickhouse-server/config.d
+ - ${PWD}/clickhouse-01/etc/clickhouse-server/users.d:/etc/clickhouse-server/users.d
+ ports:
+ - "127.0.0.1:18123:8123"
+ - "127.0.0.1:19000:9000"
+ depends_on:
+ - clickhouse-keeper-01
+ clickhouse-02:
+ image: "clickhouse/clickhouse-server:${CHVER:-latest}"
+ user: "101:101"
+ container_name: clickhouse-02
+ hostname: clickhouse-02
+ volumes:
+ - ${PWD}/clickhouse-02/etc/clickhouse-server/config.d:/etc/clickhouse-server/config.d
+ - ${PWD}/clickhouse-02/etc/clickhouse-server/users.d:/etc/clickhouse-server/users.d
+ ports:
+ - "127.0.0.1:18124:8123"
+ - "127.0.0.1:19001:9000"
+ depends_on:
+ - clickhouse-keeper-01
diff --git a/exporter/clickhouseexporter/exporter_logs.go b/exporter/clickhouseexporter/exporter_logs.go
index a970bdb26c96a..edb7cf6143de4 100644
--- a/exporter/clickhouseexporter/exporter_logs.go
+++ b/exporter/clickhouseexporter/exporter_logs.go
@@ -128,7 +128,7 @@ func attributesToMap(attributes pcommon.Map) map[string]string {
const (
// language=ClickHouse SQL
createLogsTableSQL = `
-CREATE TABLE IF NOT EXISTS %s (
+CREATE TABLE IF NOT EXISTS %s %s (
Timestamp DateTime64(9) CODEC(Delta, ZSTD(1)),
TraceId String CODEC(ZSTD(1)),
SpanId String CODEC(ZSTD(1)),
@@ -152,7 +152,7 @@ CREATE TABLE IF NOT EXISTS %s (
INDEX idx_log_attr_key mapKeys(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_log_attr_value mapValues(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_body Body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 1
-) ENGINE MergeTree()
+) ENGINE = %s
%s
PARTITION BY toDate(Timestamp)
ORDER BY (ServiceName, SeverityText, toUnixTimestamp(Timestamp), TraceId)
@@ -218,7 +218,7 @@ func createDatabase(ctx context.Context, cfg *Config) error {
defer func() {
_ = db.Close()
}()
- query := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", cfg.Database)
+ query := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s %s", cfg.Database, cfg.ClusterClause())
_, err = db.ExecContext(ctx, query)
if err != nil {
return fmt.Errorf("create database:%w", err)
@@ -238,7 +238,8 @@ func renderCreateLogsTableSQL(cfg *Config) string {
if cfg.TTLDays > 0 {
ttlExpr = fmt.Sprintf(`TTL toDateTime(Timestamp) + toIntervalDay(%d)`, cfg.TTLDays)
}
- return fmt.Sprintf(createLogsTableSQL, cfg.LogsTableName, ttlExpr)
+
+ return fmt.Sprintf(createLogsTableSQL, cfg.LogsTableName, cfg.ClusterClause(), cfg.TableEngineString(), ttlExpr)
}
func renderInsertLogsSQL(cfg *Config) string {
diff --git a/exporter/clickhouseexporter/exporter_logs_test.go b/exporter/clickhouseexporter/exporter_logs_test.go
index c2d2bc44f2d75..4cc038d931ab9 100644
--- a/exporter/clickhouseexporter/exporter_logs_test.go
+++ b/exporter/clickhouseexporter/exporter_logs_test.go
@@ -8,6 +8,7 @@ import (
"database/sql"
"database/sql/driver"
"errors"
+ "fmt"
"strings"
"testing"
"time"
@@ -121,6 +122,121 @@ func TestExporter_pushLogsData(t *testing.T) {
})
}
+func TestLogsClusterConfig(t *testing.T) {
+ testClusterConfig(t, func(t *testing.T, dsn string, test clusterTestConfig, fns ...func(*Config)) {
+ exporter := newTestLogsExporter(t, dsn, fns...)
+ test.sanityCheck(t, exporter.cfg)
+ })
+}
+
+func TestLogsTableEngineConfig(t *testing.T) {
+ testTableEngineConfig(t, func(t *testing.T, dsn string, test tableEngineTestConfig, fns ...func(*Config)) {
+ exporter := newTestLogsExporter(t, dsn, fns...)
+ test.sanityCheck(t, exporter.cfg.TableEngine)
+ })
+}
+
+func testClusterConfig(t *testing.T, completion clusterTestCompletion) {
+ tests := []clusterTestConfig {
+ {
+ name: "on",
+ cluster: defaultCluster,
+ shouldSucceed: true,
+ },
+ {
+ name: "off",
+ cluster: "",
+ shouldSucceed: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run("test cluster config " + tt.name, func(t *testing.T) {
+ initClickhouseTestServer(t, func(query string, values []driver.Value) error {
+ if tt.shouldSucceed {
+ require.NoError(t, checkClusterQueryDefinition(query, tt.cluster))
+ } else {
+ require.Error(t, checkClusterQueryDefinition(query, tt.cluster))
+ }
+ return nil
+ })
+
+ var configMods []func(*Config)
+ configMods = append(configMods, func(cfg *Config) {
+ cfg.ClusterName = tt.cluster
+ cfg.Database = "test_db_" + time.Now().Format("20060102150405")
+ })
+
+ completion(t, defaultEndpoint, tt, configMods...)
+ })
+ }
+}
+
+func testTableEngineConfig(t *testing.T, completion tableEngineTestCompletion) {
+ tests := []tableEngineTestConfig {
+ {
+ name: "no params",
+ teName: "CustomEngine",
+ teParams: "",
+ expectedTableName: "CustomEngine",
+ shouldSucceed: true,
+ },
+ {
+ name: "with params",
+ teName: "CustomEngine",
+ teParams: "'/x/y/z', 'some_param', another_param, last_param",
+ expectedTableName: "CustomEngine",
+ shouldSucceed: true,
+ },
+ {
+ name: "with empty name",
+ teName: "",
+ teParams: "",
+ expectedTableName: defaultTableEngineName,
+ shouldSucceed: true,
+ },
+ {
+ name: "fail",
+ teName: "CustomEngine",
+ teParams: "",
+ expectedTableName: defaultTableEngineName,
+ shouldSucceed: false,
+ },
+ }
+
+ for _, tt := range tests {
+ te := TableEngine{Name: tt.teName, Params: tt.teParams}
+ expectedTEValue := fmt.Sprintf("%s(%s)", tt.expectedTableName, tt.teParams)
+
+ t.Run("test table engine config " + tt.name, func(t *testing.T) {
+ initClickhouseTestServer(t, func(query string, values []driver.Value) error {
+ firstLine := getQueryFirstLine(query)
+ if !strings.HasPrefix(strings.ToLower(firstLine), "create table") {
+ return nil
+ }
+
+ check := checkTableEngineQueryDefinition(query, expectedTEValue)
+ if tt.shouldSucceed {
+ require.NoError(t, check)
+ } else {
+ require.Error(t, check)
+ }
+
+ return nil
+ })
+
+ var configMods []func(*Config)
+ if te.Name != "" {
+ configMods = append(configMods, func(cfg *Config) {
+ cfg.TableEngine = te
+ })
+ }
+
+ completion(t, defaultEndpoint, tt, configMods...)
+ })
+ }
+}
+
func newTestLogsExporter(t *testing.T, dsn string, fns ...func(*Config)) *logsExporter {
exporter, err := newLogsExporter(zaptest.NewLogger(t), withTestExporterConfig(fns...)(dsn))
require.NoError(t, err)
@@ -141,6 +257,45 @@ func withTestExporterConfig(fns ...func(*Config)) func(string) *Config {
}
}
+func checkClusterQueryDefinition(query string, clusterName string) error {
+ line := getQueryFirstLine(query)
+ lowercasedLine := strings.ToLower(line)
+ suffix := fmt.Sprintf("ON CLUSTER %s", clusterName)
+ prefixes := []string{"create database", "create table", "create materialized view"}
+ for _, prefix := range prefixes {
+ if strings.HasPrefix(lowercasedLine, prefix) {
+ if strings.HasSuffix(line, suffix) {
+ return nil
+ }
+ }
+ }
+
+ return errors.New(fmt.Sprintf("Does not contain cluster clause: %s", line))
+}
+
+func checkTableEngineQueryDefinition(query string, expectedEngineName string) error {
+ lines := strings.Split(query, "\n")
+ for _, line := range lines {
+ if strings.Contains(strings.ToLower(line), "engine = ") {
+ engine := strings.Split(line, " = ")[1]
+ engine = strings.Trim(engine, " ")
+ if engine == expectedEngineName {
+ return nil
+ } else {
+ return errors.New(fmt.Sprintf("Wrong engine name: %s, expected: %s", engine, expectedEngineName))
+ }
+ }
+ }
+
+ return errors.New(fmt.Sprintf("Query does not contain engine definition: %s", query))
+}
+
+func getQueryFirstLine(query string) string {
+ trimmed := strings.Trim(query, "\n")
+ line := strings.Split(trimmed, "\n")[0]
+ return strings.Trim(line, " (")
+}
+
func simpleLogs(count int) plog.Logs {
logs := plog.NewLogs()
rl := logs.ResourceLogs().AppendEmpty()
@@ -172,11 +327,43 @@ func initClickhouseTestServer(t *testing.T, recorder recorder) {
}
type recorder func(query string, values []driver.Value) error
+type clusterTestCompletion func(t *testing.T, dsn string, test clusterTestConfig, fns ...func(*Config))
+type tableEngineTestCompletion func(t *testing.T, dsn string, test tableEngineTestConfig, fns ...func(*Config))
type testClickhouseDriver struct {
recorder recorder
}
+type clusterTestConfig struct {
+ name string
+ cluster string
+ shouldSucceed bool
+}
+
+type tableEngineTestConfig struct {
+ name string
+ teName string
+ teParams string
+ expectedTableName string
+ shouldSucceed bool
+}
+
+func (teTest tableEngineTestConfig) sanityCheck(t *testing.T, te TableEngine) {
+ if teTest.teName == "" {
+ require.Empty(t, te.Name)
+ } else {
+ require.NotEmpty(t, te.Name)
+ }
+}
+
+func (test clusterTestConfig) sanityCheck(t *testing.T, cfg *Config) {
+ if test.cluster == "" {
+ require.Empty(t, cfg.ClusterClause())
+ } else {
+ require.NotEmpty(t, cfg.ClusterClause())
+ }
+}
+
func (t *testClickhouseDriver) Open(_ string) (driver.Conn, error) {
return &testClickhouseDriverConn{
recorder: t.recorder,
diff --git a/exporter/clickhouseexporter/exporter_metrics.go b/exporter/clickhouseexporter/exporter_metrics.go
index 916b9381e2011..db86c1e4917cd 100644
--- a/exporter/clickhouseexporter/exporter_metrics.go
+++ b/exporter/clickhouseexporter/exporter_metrics.go
@@ -42,7 +42,7 @@ func (e *metricsExporter) start(ctx context.Context, _ component.Host) error {
}
internal.SetLogger(e.logger)
- return internal.NewMetricsTable(ctx, e.cfg.MetricsTableName, e.cfg.TTLDays, e.client)
+ return internal.NewMetricsTable(ctx, e.cfg.MetricsTableName, e.cfg.ClusterClause(), e.cfg.TableEngineString(), e.cfg.TTLDays, e.client)
}
// shutdown will shut down the exporter.
diff --git a/exporter/clickhouseexporter/exporter_metrics_test.go b/exporter/clickhouseexporter/exporter_metrics_test.go
index 4058a80bc8ac8..5a0de32841c09 100644
--- a/exporter/clickhouseexporter/exporter_metrics_test.go
+++ b/exporter/clickhouseexporter/exporter_metrics_test.go
@@ -28,7 +28,7 @@ func TestExporter_pushMetricsData(t *testing.T) {
}
return nil
})
- exporter := newTestMetricsExporter(t)
+ exporter := newTestMetricsExporter(t, defaultEndpoint)
mustPushMetricsData(t, exporter, simpleMetrics(1))
require.Equal(t, int32(15), items.Load())
@@ -40,7 +40,7 @@ func TestExporter_pushMetricsData(t *testing.T) {
}
return nil
})
- exporter := newTestMetricsExporter(t)
+ exporter := newTestMetricsExporter(t, defaultEndpoint)
err := exporter.pushMetricsData(context.TODO(), simpleMetrics(2))
require.Error(t, err)
})
@@ -92,7 +92,7 @@ func TestExporter_pushMetricsData(t *testing.T) {
}
return nil
})
- exporter := newTestMetricsExporter(t)
+ exporter := newTestMetricsExporter(t, defaultEndpoint)
mustPushMetricsData(t, exporter, simpleMetrics(1))
require.Equal(t, int32(15), items.Load())
@@ -101,7 +101,7 @@ func TestExporter_pushMetricsData(t *testing.T) {
func Benchmark_pushMetricsData(b *testing.B) {
pm := simpleMetrics(1)
- exporter := newTestMetricsExporter(&testing.T{})
+ exporter := newTestMetricsExporter(&testing.T{}, defaultEndpoint)
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
@@ -110,6 +110,20 @@ func Benchmark_pushMetricsData(b *testing.B) {
}
}
+func TestMetricsClusterConfig(t *testing.T) {
+ testClusterConfig(t, func(t *testing.T, dsn string, test clusterTestConfig, fns ...func(*Config)) {
+ exporter := newTestMetricsExporter(t, dsn, fns...)
+ test.sanityCheck(t, exporter.cfg)
+ })
+}
+
+func TestMetricsTableEngineConfig(t *testing.T) {
+ testTableEngineConfig(t, func(t *testing.T, dsn string, test tableEngineTestConfig, fns ...func(*Config)) {
+ exporter := newTestMetricsExporter(t, dsn, fns...)
+ test.sanityCheck(t, exporter.cfg.TableEngine)
+ })
+}
+
// simpleMetrics there will be added two ResourceMetrics and each of them have count data point
func simpleMetrics(count int) pmetric.Metrics {
metrics := pmetric.NewMetrics()
@@ -451,8 +465,8 @@ func mustPushMetricsData(t *testing.T, exporter *metricsExporter, md pmetric.Met
require.NoError(t, err)
}
-func newTestMetricsExporter(t *testing.T) *metricsExporter {
- exporter, err := newMetricsExporter(zaptest.NewLogger(t), withTestExporterConfig()(defaultEndpoint))
+func newTestMetricsExporter(t *testing.T, dsn string, fns ...func(*Config)) *metricsExporter {
+ exporter, err := newMetricsExporter(zaptest.NewLogger(t), withTestExporterConfig(fns...)(dsn))
require.NoError(t, err)
require.NoError(t, exporter.start(context.TODO(), nil))
diff --git a/exporter/clickhouseexporter/exporter_traces.go b/exporter/clickhouseexporter/exporter_traces.go
index 6f03ee295833d..8ff5ace44ae8f 100644
--- a/exporter/clickhouseexporter/exporter_traces.go
+++ b/exporter/clickhouseexporter/exporter_traces.go
@@ -158,7 +158,7 @@ func convertLinks(links ptrace.SpanLinkSlice) ([]string, []string, []string, []m
const (
// language=ClickHouse SQL
createTracesTableSQL = `
-CREATE TABLE IF NOT EXISTS %s (
+CREATE TABLE IF NOT EXISTS %s %s (
Timestamp DateTime64(9) CODEC(Delta, ZSTD(1)),
TraceId String CODEC(ZSTD(1)),
SpanId String CODEC(ZSTD(1)),
@@ -191,7 +191,7 @@ CREATE TABLE IF NOT EXISTS %s (
INDEX idx_span_attr_key mapKeys(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_span_attr_value mapValues(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_duration Duration TYPE minmax GRANULARITY 1
-) ENGINE MergeTree()
+) ENGINE = %s
%s
PARTITION BY toDate(Timestamp)
ORDER BY (ServiceName, SpanName, toUnixTimestamp(Timestamp), TraceId)
@@ -249,18 +249,18 @@ SETTINGS index_granularity=8192, ttl_only_drop_parts = 1;
const (
createTraceIDTsTableSQL = `
-create table IF NOT EXISTS %s_trace_id_ts (
+create table IF NOT EXISTS %s_trace_id_ts %s (
TraceId String CODEC(ZSTD(1)),
Start DateTime64(9) CODEC(Delta, ZSTD(1)),
End DateTime64(9) CODEC(Delta, ZSTD(1)),
INDEX idx_trace_id TraceId TYPE bloom_filter(0.01) GRANULARITY 1
-) ENGINE MergeTree()
+) ENGINE = %s
%s
ORDER BY (TraceId, toUnixTimestamp(Start))
SETTINGS index_granularity=8192;
`
createTraceIDTsMaterializedViewSQL = `
-CREATE MATERIALIZED VIEW IF NOT EXISTS %s_trace_id_ts_mv
+CREATE MATERIALIZED VIEW IF NOT EXISTS %s_trace_id_ts_mv %s
TO %s.%s_trace_id_ts
AS SELECT
TraceId,
@@ -295,7 +295,7 @@ func renderCreateTracesTableSQL(cfg *Config) string {
if cfg.TTLDays > 0 {
ttlExpr = fmt.Sprintf(`TTL toDateTime(Timestamp) + toIntervalDay(%d)`, cfg.TTLDays)
}
- return fmt.Sprintf(createTracesTableSQL, cfg.TracesTableName, ttlExpr)
+ return fmt.Sprintf(createTracesTableSQL, cfg.TracesTableName, cfg.ClusterClause(), cfg.TableEngineString(), ttlExpr)
}
func renderCreateTraceIDTsTableSQL(cfg *Config) string {
@@ -303,10 +303,10 @@ func renderCreateTraceIDTsTableSQL(cfg *Config) string {
if cfg.TTLDays > 0 {
ttlExpr = fmt.Sprintf(`TTL toDateTime(Start) + toIntervalDay(%d)`, cfg.TTLDays)
}
- return fmt.Sprintf(createTraceIDTsTableSQL, cfg.TracesTableName, ttlExpr)
+ return fmt.Sprintf(createTraceIDTsTableSQL, cfg.TracesTableName, cfg.ClusterClause(), cfg.TableEngineString(), ttlExpr)
}
func renderTraceIDTsMaterializedViewSQL(cfg *Config) string {
return fmt.Sprintf(createTraceIDTsMaterializedViewSQL, cfg.TracesTableName,
- cfg.Database, cfg.TracesTableName, cfg.Database, cfg.TracesTableName)
+ cfg.ClusterClause(), cfg.Database, cfg.TracesTableName, cfg.Database, cfg.TracesTableName)
}
diff --git a/exporter/clickhouseexporter/exporter_traces_test.go b/exporter/clickhouseexporter/exporter_traces_test.go
index 2bff1fcdc2b2b..02b28e853ad48 100644
--- a/exporter/clickhouseexporter/exporter_traces_test.go
+++ b/exporter/clickhouseexporter/exporter_traces_test.go
@@ -48,6 +48,20 @@ func TestExporter_pushTracesData(t *testing.T) {
})
}
+func TestTracesClusterConfig(t *testing.T) {
+ testClusterConfig(t, func(t *testing.T, dsn string, test clusterTestConfig, fns ...func(*Config)) {
+ exporter := newTestTracesExporter(t, dsn, fns...)
+ test.sanityCheck(t, exporter.cfg)
+ })
+}
+
+func TestTracesTableEngineConfig(t *testing.T) {
+ testTableEngineConfig(t, func(t *testing.T, dsn string, test tableEngineTestConfig, fns ...func(*Config)) {
+ exporter := newTestTracesExporter(t, dsn, fns...)
+ test.sanityCheck(t, exporter.cfg.TableEngine)
+ })
+}
+
func newTestTracesExporter(t *testing.T, dsn string, fns ...func(*Config)) *tracesExporter {
exporter, err := newTracesExporter(zaptest.NewLogger(t), withTestExporterConfig(fns...)(dsn))
require.NoError(t, err)
diff --git a/exporter/clickhouseexporter/internal/exponential_histogram_metrics.go b/exporter/clickhouseexporter/internal/exponential_histogram_metrics.go
index 421d967cd61cb..38b9e8411cf79 100644
--- a/exporter/clickhouseexporter/internal/exponential_histogram_metrics.go
+++ b/exporter/clickhouseexporter/internal/exponential_histogram_metrics.go
@@ -17,7 +17,7 @@ import (
const (
// language=ClickHouse SQL
createExpHistogramTableSQL = `
-CREATE TABLE IF NOT EXISTS %s_exponential_histogram (
+CREATE TABLE IF NOT EXISTS %s_exponential_histogram %s (
ResourceAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)),
ResourceSchemaUrl String CODEC(ZSTD(1)),
ScopeName String CODEC(ZSTD(1)),
@@ -55,7 +55,7 @@ CREATE TABLE IF NOT EXISTS %s_exponential_histogram (
INDEX idx_scope_attr_value mapValues(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_attr_key mapKeys(Attributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_attr_value mapValues(Attributes) TYPE bloom_filter(0.01) GRANULARITY 1
-) ENGINE MergeTree()
+) ENGINE = %s
%s
PARTITION BY toDate(TimeUnix)
ORDER BY (MetricName, Attributes, toUnixTimestamp64Nano(TimeUnix))
@@ -77,7 +77,7 @@ SETTINGS index_granularity=8192, ttl_only_drop_parts = 1;
StartTimeUnix,
TimeUnix,
Count,
- Sum,
+ Sum,
Scale,
ZeroCount,
PositiveOffset,
diff --git a/exporter/clickhouseexporter/internal/gauge_metrics.go b/exporter/clickhouseexporter/internal/gauge_metrics.go
index d16caa78f56a7..2f13196c2a978 100644
--- a/exporter/clickhouseexporter/internal/gauge_metrics.go
+++ b/exporter/clickhouseexporter/internal/gauge_metrics.go
@@ -17,7 +17,7 @@ import (
const (
// language=ClickHouse SQL
createGaugeTableSQL = `
-CREATE TABLE IF NOT EXISTS %s_gauge (
+CREATE TABLE IF NOT EXISTS %s_gauge %s (
ResourceAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)),
ResourceSchemaUrl String CODEC(ZSTD(1)),
ScopeName String CODEC(ZSTD(1)),
@@ -46,7 +46,7 @@ CREATE TABLE IF NOT EXISTS %s_gauge (
INDEX idx_scope_attr_value mapValues(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_attr_key mapKeys(Attributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_attr_value mapValues(Attributes) TYPE bloom_filter(0.01) GRANULARITY 1
-) ENGINE MergeTree()
+) ENGINE = %s
%s
PARTITION BY toDate(TimeUnix)
ORDER BY (MetricName, Attributes, toUnixTimestamp64Nano(TimeUnix))
diff --git a/exporter/clickhouseexporter/internal/histogram_metrics.go b/exporter/clickhouseexporter/internal/histogram_metrics.go
index 0ce19fe4d08b2..f7928fa8c8d39 100644
--- a/exporter/clickhouseexporter/internal/histogram_metrics.go
+++ b/exporter/clickhouseexporter/internal/histogram_metrics.go
@@ -17,7 +17,7 @@ import (
const (
// language=ClickHouse SQL
createHistogramTableSQL = `
-CREATE TABLE IF NOT EXISTS %s_histogram (
+CREATE TABLE IF NOT EXISTS %s_histogram %s (
ResourceAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)),
ResourceSchemaUrl String CODEC(ZSTD(1)),
ScopeName String CODEC(ZSTD(1)),
@@ -51,7 +51,7 @@ CREATE TABLE IF NOT EXISTS %s_histogram (
INDEX idx_scope_attr_value mapValues(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_attr_key mapKeys(Attributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_attr_value mapValues(Attributes) TYPE bloom_filter(0.01) GRANULARITY 1
-) ENGINE MergeTree()
+) ENGINE = %s
%s
PARTITION BY toDate(TimeUnix)
ORDER BY (MetricName, Attributes, toUnixTimestamp64Nano(TimeUnix))
diff --git a/exporter/clickhouseexporter/internal/metrics_model.go b/exporter/clickhouseexporter/internal/metrics_model.go
index 52641aae7dd04..bc9d0eef3e3c3 100644
--- a/exporter/clickhouseexporter/internal/metrics_model.go
+++ b/exporter/clickhouseexporter/internal/metrics_model.go
@@ -51,13 +51,13 @@ func SetLogger(l *zap.Logger) {
}
-// NewMetricsTable create metric tables with an expiry time to storage metric telemetry data
+// NewMetricsTable creates metric tables with an expiry time to store metric telemetry data
-func NewMetricsTable(ctx context.Context, tableName string, ttlDays uint, db *sql.DB) error {
+func NewMetricsTable(ctx context.Context, tableName string, clusterClause string, tableEngine string, ttlDays uint, db *sql.DB) error {
var ttlExpr string
if ttlDays > 0 {
ttlExpr = fmt.Sprintf(`TTL toDateTime(TimeUnix) + toIntervalDay(%d)`, ttlDays)
}
for table := range supportedMetricTypes {
- query := fmt.Sprintf(table, tableName, ttlExpr)
+ query := fmt.Sprintf(table, tableName, clusterClause, tableEngine, ttlExpr)
if _, err := db.ExecContext(ctx, query); err != nil {
return fmt.Errorf("exec create metrics table sql: %w", err)
}
diff --git a/exporter/clickhouseexporter/internal/sum_metrics.go b/exporter/clickhouseexporter/internal/sum_metrics.go
index 69f9a9d1d5d58..472d79199d6e2 100644
--- a/exporter/clickhouseexporter/internal/sum_metrics.go
+++ b/exporter/clickhouseexporter/internal/sum_metrics.go
@@ -17,7 +17,7 @@ import (
const (
// language=ClickHouse SQL
createSumTableSQL = `
-CREATE TABLE IF NOT EXISTS %s_sum (
+CREATE TABLE IF NOT EXISTS %s_sum %s (
ResourceAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)),
ResourceSchemaUrl String CODEC(ZSTD(1)),
ScopeName String CODEC(ZSTD(1)),
@@ -48,7 +48,7 @@ CREATE TABLE IF NOT EXISTS %s_sum (
INDEX idx_scope_attr_value mapValues(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_attr_key mapKeys(Attributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_attr_value mapValues(Attributes) TYPE bloom_filter(0.01) GRANULARITY 1
-) ENGINE MergeTree()
+) ENGINE = %s
%s
PARTITION BY toDate(TimeUnix)
ORDER BY (MetricName, Attributes, toUnixTimestamp64Nano(TimeUnix))
diff --git a/exporter/clickhouseexporter/internal/summary_metrics.go b/exporter/clickhouseexporter/internal/summary_metrics.go
index a62c9cdb697f9..e6c00f8d0229a 100644
--- a/exporter/clickhouseexporter/internal/summary_metrics.go
+++ b/exporter/clickhouseexporter/internal/summary_metrics.go
@@ -17,7 +17,7 @@ import (
const (
// language=ClickHouse SQL
createSummaryTableSQL = `
-CREATE TABLE IF NOT EXISTS %s_summary (
+CREATE TABLE IF NOT EXISTS %s_summary %s (
ResourceAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)),
ResourceSchemaUrl String CODEC(ZSTD(1)),
ScopeName String CODEC(ZSTD(1)),
@@ -44,7 +44,7 @@ CREATE TABLE IF NOT EXISTS %s_summary (
INDEX idx_scope_attr_value mapValues(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_attr_key mapKeys(Attributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_attr_value mapValues(Attributes) TYPE bloom_filter(0.01) GRANULARITY 1
-) ENGINE MergeTree()
+) ENGINE = %s
%s
PARTITION BY toDate(TimeUnix)
ORDER BY (MetricName, Attributes, toUnixTimestamp64Nano(TimeUnix))
diff --git a/exporter/clickhouseexporter/testdata/config-table-engine.yml b/exporter/clickhouseexporter/testdata/config-table-engine.yml
new file mode 100644
index 0000000000000..cd208d450d4e5
--- /dev/null
+++ b/exporter/clickhouseexporter/testdata/config-table-engine.yml
@@ -0,0 +1,15 @@
+clickhouse/table-engine-empty:
+ endpoint: clickhouse://127.0.0.1:9000
+clickhouse/table-engine-name-only:
+ endpoint: clickhouse://127.0.0.1:9000
+ table_engine:
+ name: ReplicatedReplacingMergeTree
+clickhouse/table-engine-full:
+ endpoint: clickhouse://127.0.0.1:9000
+ table_engine:
+ name: ReplicatedReplacingMergeTree
+ params: "'/clickhouse/tables/{shard}/table_name', '{replica}', ver"
+clickhouse/table-engine-invalid:
+ endpoint: clickhouse://127.0.0.1:9000
+ table_engine:
+ params: "whatever"