@@ -122,23 +122,47 @@ func TestExporter_pushLogsData(t *testing.T) {
122
122
})
123
123
}
124
124
125
- func TestLogsTableCreationOnCluster (t * testing.T ) {
126
- dbName := "test_db_" + time .Now ().Format ("20060102150405" )
125
+ func TestLogsClusterConfigOn (t * testing.T ) {
126
+ testClusterConfigOn (t , func (t * testing.T , dsn string , fns ... func (* Config )) {
127
+ exporter := newTestLogsExporter (t , dsn , fns ... )
128
+ require .NotEmpty (t , exporter .cfg .ClusterClause ())
129
+ })
130
+ }
131
+
132
+ func TestLogsClusterConfigOff (t * testing.T ) {
133
+ testClusterConfigOff (t , func (t * testing.T , dsn string , fns ... func (* Config )) {
134
+ exporter := newTestLogsExporter (t , dsn , fns ... )
135
+ require .Empty (t , exporter .cfg .ClusterClause ())
136
+ })
137
+ }
138
+
139
+ func testClusterConfigOn (t * testing.T , completion exporterValuesProvider ) {
140
+ initClickhouseTestServer (t , func (query string , values []driver.Value ) error {
141
+ require .NoError (t , checkClusterQueryStatememt (query , defaultCluster ))
142
+ return nil
143
+ })
144
+
127
145
var configMods []func (* Config )
128
146
configMods = append (configMods , func (cfg * Config ) {
129
- cfg .ClusterName = replicationCluster
130
- cfg .Database = dbName
131
- cfg .TableEngine = TableEngine {Name : "ReplicatedMergeTree" , Params : "" }
147
+ cfg .ClusterName = defaultCluster
148
+ cfg .Database = "test_db_" + time .Now ().Format ("20060102150405" )
132
149
})
133
150
134
- t .Run ("Check database and table creation on cluster" , func (t * testing.T ) {
135
- exporter := newTestLogsExporter (t , replicationEndpoint , configMods ... )
136
- require .NotEmpty (t , exporter .cfg .ClusterClause ())
151
+ completion (t , defaultEndpoint , configMods ... )
152
+ }
153
+
154
+ func testClusterConfigOff (t * testing.T , completion exporterValuesProvider ) {
155
+ initClickhouseTestServer (t , func (query string , values []driver.Value ) error {
156
+ require .Error (t , checkClusterQueryStatememt (query , defaultCluster ))
157
+ return nil
158
+ })
137
159
138
- samplesCount := 5
139
- mustPushLogsData ( t , exporter , simpleLogs ( samplesCount ))
140
- checkReplicatedTableCount ( t , dbName , getTableNames ( dbName , exporter . client ), samplesCount )
160
+ var configMods [] func ( * Config )
161
+ configMods = append ( configMods , func ( cfg * Config ) {
162
+ cfg . Database = "test_db_" + time . Now (). Format ( "20060102150405" )
141
163
})
164
+
165
+ completion (t , defaultEndpoint , configMods ... )
142
166
}
143
167
144
168
func newTestLogsExporter (t * testing.T , dsn string , fns ... func (* Config )) * logsExporter {
@@ -161,45 +185,22 @@ func withTestExporterConfig(fns ...func(*Config)) func(string) *Config {
161
185
}
162
186
}
163
187
164
- // Opens Clickhouse client to `replicationEndpoint2` to check if table data was replicated.
165
- func checkReplicatedTableCount (t * testing.T , dbName string , tableNames []string , expectedCount int ) {
166
- // wait for replication
167
- require .NotEmpty (t , tableNames )
168
- time .Sleep (1 * time .Second )
169
-
170
- config := withTestExporterConfig ()(replicationEndpoint2 )
171
- client , err := newClickhouseClient (config )
172
- require .NoError (t , err )
173
- defer client .Close ()
174
-
175
- println ("replication in db" , dbName , "; url:" , replicationEndpoint2 )
176
- for _ , tableName := range tableNames {
177
- println ("check count in replicated table" , tableName )
178
- query := fmt .Sprintf ("SELECT count(*) FROM %s.%s" , dbName , tableName )
179
- row := client .QueryRowContext (context .TODO (), query )
180
- var count int
181
- row .Scan (& count )
182
- require .Equal (t , expectedCount , count )
183
- }
184
- }
185
-
186
- // Get table names from database.
187
- func getTableNames (dbName string , client * sql.DB ) []string {
188
- rows , err := client .QueryContext (context .TODO (), "show tables from " + dbName )
189
- defer rows .Close ()
190
-
191
- if err != nil {
192
- return []string {}
193
- }
194
-
195
- var tableNames []string
196
- for rows .Next () {
197
- var tableName string
198
- rows .Scan (& tableName )
199
- tableNames = append (tableNames , tableName )
188
+ func checkClusterQueryStatememt (query string , clusterName string ) error {
189
+ trimmed := strings .Trim (query , "\n " )
190
+ line := strings .Split (trimmed , "\n " )[0 ]
191
+ line = strings .Trim (line , " (" )
192
+ lowercasedLine := strings .ToLower (line )
193
+ suffix := fmt .Sprintf ("ON CLUSTER %s" , clusterName )
194
+ prefixes := []string {"create database" , "create table" , "create materialized view" }
195
+ for _ , prefix := range prefixes {
196
+ if strings .HasPrefix (lowercasedLine , prefix ) {
197
+ if strings .HasSuffix (line , suffix ) {
198
+ return nil
199
+ }
200
+ }
200
201
}
201
202
202
- return tableNames
203
+ return errors . New ( fmt . Sprintf ( "Does not contain cluster clause: %s" , line ))
203
204
}
204
205
205
206
func simpleLogs (count int ) plog.Logs {
@@ -233,6 +234,7 @@ func initClickhouseTestServer(t *testing.T, recorder recorder) {
233
234
}
234
235
235
236
type recorder func (query string , values []driver.Value ) error
237
+ type exporterValuesProvider func (t * testing.T , dsn string , fns ... func (* Config ))
236
238
237
239
type testClickhouseDriver struct {
238
240
recorder recorder
0 commit comments