
Commit 9467313

e2e: fix race between partition by tests (#3386)
The non-distributed partitioned table test was reading the row for the distributed partitioned table, which has no PARTITION BY, so the table was read as not having a partition by expression.
1 parent d13563c
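The race, spelled out: system.tables is server-wide, and concurrently running variants of these tests (distributed and non-distributed) create same-named destination tables in different databases. A lookup filtered only by table name can therefore scan the other variant's row, and because a Distributed-engine table carries no PARTITION BY, the non-distributed test read an empty partition key. Below is a minimal sketch of the database-scoped lookup the fix moves to; it assumes the clickhouse-go v2 driver, and quoteLiteral is a simplified stand-in for the QuoteLiteral helper the test uses.

package chtest

import (
	"context"
	"fmt"
	"strings"

	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

// quoteLiteral is a simplified stand-in for QuoteLiteral: it wraps s in
// single quotes, escaping backslashes and quotes.
func quoteLiteral(s string) string {
	return "'" + strings.NewReplacer(`\`, `\\`, `'`, `\'`).Replace(s) + "'"
}

// lookupPartitionKey reads a table's PARTITION BY expression from
// system.tables. Filtering by name alone can match a same-named table owned
// by a concurrent test in another database; adding the database filter
// removes that race.
func lookupPartitionKey(ctx context.Context, conn driver.Conn, database, table string) (string, error) {
	var partitionKey string
	err := conn.QueryRow(ctx, fmt.Sprintf(
		"select partition_key from system.tables where database=%s and name=%s",
		quoteLiteral(database), quoteLiteral(table),
	)).Scan(&partitionKey)
	return partitionKey, err
}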

File tree: 5 files changed (+36 −32 lines)


flow/connectors/clickhouse/avro_sync.go

Lines changed: 1 addition & 1 deletion
@@ -359,7 +359,7 @@ func (s *ClickHouseAvroSyncMethod) pushS3DataToClickHouse(
 			slog.Uint64("numParts", numParts),
 			slog.Int("chunkIdx", chunkIdx),
 			slog.Any("error", err))
-		return exceptions.NewQRepSyncError(err, config.DestinationTableIdentifier, s.ClickHouseConnector.config.Database)
+		return exceptions.NewQRepSyncError(err, config.DestinationTableIdentifier, s.ClickHouseConnector.Config.Database)
 	}
 	s.logger.Info("inserted part",
 		slog.Uint64("part", i),

flow/connectors/clickhouse/cdc.go

Lines changed: 7 additions & 7 deletions
@@ -55,7 +55,7 @@ func (c *ClickHouseConnector) CreateRawTable(ctx context.Context, req *protos.Cr
 	var rawDistributedName string
 	rawTableName := c.GetRawTableName(req.FlowJobName)
 	engine := "MergeTree()"
-	if c.config.Replicated {
+	if c.Config.Replicated {
 		engine = fmt.Sprintf(
 			"ReplicatedMergeTree('%s%s','{replica}')",
 			zooPathPrefix,
@@ -80,8 +80,8 @@ func (c *ClickHouseConnector) CreateRawTable(ctx context.Context, req *protos.Cr
 	if err := c.execWithLogging(ctx,
 		fmt.Sprintf(createRawDistributedSQL, peerdb_clickhouse.QuoteIdentifier(rawDistributedName), onCluster,
 			rawColumns,
-			peerdb_clickhouse.QuoteIdentifier(c.config.Cluster),
-			peerdb_clickhouse.QuoteIdentifier(c.config.Database),
+			peerdb_clickhouse.QuoteIdentifier(c.Config.Cluster),
+			peerdb_clickhouse.QuoteIdentifier(c.Config.Database),
 			peerdb_clickhouse.QuoteIdentifier(rawTableName)),
 	); err != nil {
 		return nil, fmt.Errorf("unable to create raw table: %w", err)
@@ -195,7 +195,7 @@ func (c *ClickHouseConnector) ReplayTableSchemaDeltas(
 	}

 	// Distributed table isn't created for null tables, no need to alter shard tables that don't exist
-	if c.config.Cluster != "" && (tm == nil || tm.Engine != protos.TableEngine_CH_ENGINE_NULL) {
+	if c.Config.Cluster != "" && (tm == nil || tm.Engine != protos.TableEngine_CH_ENGINE_NULL) {
 		if err := c.execWithLogging(ctx,
 			fmt.Sprintf("ALTER TABLE %s%s ADD COLUMN IF NOT EXISTS %s %s",
 				peerdb_clickhouse.QuoteIdentifier(schemaDelta.DstTableName+"_shard"), onCluster,
@@ -236,7 +236,7 @@ func (c *ClickHouseConnector) RenameTables(
 		continue
 	}

-	resyncTableExists, err := c.checkIfTableExists(ctx, c.config.Database, renameRequest.CurrentName)
+	resyncTableExists, err := c.checkIfTableExists(ctx, c.Config.Database, renameRequest.CurrentName)
 	if err != nil {
 		return nil, fmt.Errorf("unable to check if resync table %s exists: %w", renameRequest.CurrentName, err)
 	}
@@ -246,7 +246,7 @@ func (c *ClickHouseConnector) RenameTables(
 		continue
 	}

-	originalTableExists, err := c.checkIfTableExists(ctx, c.config.Database, renameRequest.NewName)
+	originalTableExists, err := c.checkIfTableExists(ctx, c.Config.Database, renameRequest.NewName)
 	if err != nil {
 		return nil, fmt.Errorf("unable to check if table %s exists: %w", renameRequest.NewName, err)
 	}
@@ -323,7 +323,7 @@ func (c *ClickHouseConnector) RemoveTableEntriesFromRawTable(
 	ctx context.Context,
 	req *protos.RemoveTablesFromRawTableInput,
 ) error {
-	if c.config.Cluster != "" {
+	if c.Config.Cluster != "" {
 		// this operation isn't crucial, okay to skip
 		c.logger.Info("skipping raw table cleanup of tables, DELETE not supported on Distributed table engine",
 			slog.Any("tables", req.DestinationTableNames))
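For orientation, the Sprintf call in the CreateRawTable hunk passes the distributed table name, the ON CLUSTER clause, the raw column list, then cluster, database, and the per-shard raw table name. Combined with ClickHouse's Distributed(cluster, database, table) engine signature, that suggests the elided createRawDistributedSQL template has roughly this shape; a reconstruction, not the actual constant:

// Hypothetical reconstruction of the template, inferred from the argument
// order above; the real constant is defined outside this diff.
const createRawDistributedSQL = "CREATE TABLE IF NOT EXISTS %s%s (%s) " +
	"ENGINE = Distributed(%s,%s,%s)"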

flow/connectors/clickhouse/clickhouse.go

Lines changed: 5 additions & 5 deletions
@@ -31,7 +31,7 @@ type ClickHouseConnector struct {
 	*metadataStore.PostgresMetadata
 	database clickhouse.Conn
 	logger log.Logger
-	config *protos.ClickhouseConfig
+	Config *protos.ClickhouseConfig
 	credsProvider *utils.ClickHouseS3Credentials
 	chVersion *chproto.Version
 }
@@ -103,7 +103,7 @@ func NewClickHouseConnector(
 	connector := &ClickHouseConnector{
 		database: database,
 		PostgresMetadata: pgMetadata,
-		config: config,
+		Config: config,
 		logger: logger,
 		credsProvider: &utils.ClickHouseS3Credentials{
 			Provider: credentialsProvider,
@@ -163,7 +163,7 @@ func ValidateClickHouseHost(ctx context.Context, chHost string, allowedDomainStr
 func (c *ClickHouseConnector) ValidateCheck(ctx context.Context) error {
 	// validate clickhouse host
 	allowedDomains := internal.PeerDBClickHouseAllowedDomains()
-	if err := ValidateClickHouseHost(ctx, c.config.Host, allowedDomains); err != nil {
+	if err := ValidateClickHouseHost(ctx, c.Config.Host, allowedDomains); err != nil {
 		return err
 	}
 	validateDummyTableName := "peerdb_validation_" + shared.RandomString(4)
@@ -490,8 +490,8 @@ func (c *ClickHouseConnector) GetTableSchema(
 }

 func (c *ClickHouseConnector) onCluster() string {
-	if c.config.Cluster != "" {
-		return " ON CLUSTER " + peerdb_clickhouse.QuoteIdentifier(c.config.Cluster)
+	if c.Config.Cluster != "" {
+		return " ON CLUSTER " + peerdb_clickhouse.QuoteIdentifier(c.Config.Cluster)
 	}
 	return ""
 }
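This struct change is what drives every other hunk in the commit: Go exports only identifiers that start with an upper-case letter, so while the field was named config it was invisible outside the connector package, and the e2e suite could not read the connector's database. A minimal illustration with placeholder names:

package connector

// Config stands in for protos.ClickhouseConfig in this sketch.
type Config struct{ Database string }

// Renaming the field from "config" (lower-case, package-private) to "Config"
// (upper-case, exported) is what lets code in other packages, such as the
// e2e suite further below, read c.Config.Database.
type Connector struct {
	Config *Config
}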

flow/connectors/clickhouse/normalize.go

Lines changed: 13 additions & 13 deletions
@@ -50,7 +50,7 @@ func (c *ClickHouseConnector) SetupNormalizedTable(
 	destinationTableIdentifier string,
 	sourceTableSchema *protos.TableSchema,
 ) (bool, error) {
-	tableAlreadyExists, err := c.checkIfTableExists(ctx, c.config.Database, destinationTableIdentifier)
+	tableAlreadyExists, err := c.checkIfTableExists(ctx, c.Config.Database, destinationTableIdentifier)
 	if err != nil {
 		return false, fmt.Errorf("error occurred while checking if destination ClickHouse table exists: %w", err)
 	}
@@ -99,7 +99,7 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(

 	switch tmEngine {
 	case protos.TableEngine_CH_ENGINE_REPLACING_MERGE_TREE, protos.TableEngine_CH_ENGINE_REPLICATED_REPLACING_MERGE_TREE:
-		if c.config.Replicated {
+		if c.Config.Replicated {
 			engine = fmt.Sprintf(
 				"ReplicatedReplacingMergeTree('%s%s','{replica}',%s)",
 				zooPathPrefix,
@@ -110,7 +110,7 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
 			engine = fmt.Sprintf("ReplacingMergeTree(%s)", peerdb_clickhouse.QuoteIdentifier(versionColName))
 		}
 	case protos.TableEngine_CH_ENGINE_MERGE_TREE, protos.TableEngine_CH_ENGINE_REPLICATED_MERGE_TREE:
-		if c.config.Replicated {
+		if c.Config.Replicated {
 			engine = fmt.Sprintf(
 				"ReplicatedMergeTree('%s%s','{replica}')",
 				zooPathPrefix,
@@ -120,7 +120,7 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
 			engine = "MergeTree()"
 		}
 	case protos.TableEngine_CH_ENGINE_COALESCING_MERGE_TREE:
-		if c.config.Replicated {
+		if c.Config.Replicated {
 			engine = fmt.Sprintf(
 				"ReplicatedCoalescingMergeTree('%s%s','{replica}')",
 				zooPathPrefix,
@@ -141,7 +141,7 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
 	var stmtBuilder strings.Builder
 	var stmtBuilderDistributed strings.Builder
 	var builders []*strings.Builder
-	if c.config.Cluster != "" && tmEngine != protos.TableEngine_CH_ENGINE_NULL {
+	if c.Config.Cluster != "" && tmEngine != protos.TableEngine_CH_ENGINE_NULL {
 		builders = []*strings.Builder{&stmtBuilder, &stmtBuilderDistributed}
 	} else {
 		builders = []*strings.Builder{&stmtBuilder}
@@ -154,14 +154,14 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
 		} else {
 			builder.WriteString("CREATE TABLE IF NOT EXISTS ")
 		}
-		if c.config.Cluster != "" && tmEngine != protos.TableEngine_CH_ENGINE_NULL && idx == 0 {
+		if c.Config.Cluster != "" && tmEngine != protos.TableEngine_CH_ENGINE_NULL && idx == 0 {
 			// distributed table gets destination name, avoid naming conflict
 			builder.WriteString(peerdb_clickhouse.QuoteIdentifier(tableIdentifier + "_shard"))
 		} else {
 			builder.WriteString(peerdb_clickhouse.QuoteIdentifier(tableIdentifier))
 		}
-		if c.config.Cluster != "" {
-			fmt.Fprintf(builder, " ON CLUSTER %s", peerdb_clickhouse.QuoteIdentifier(c.config.Cluster))
+		if c.Config.Cluster != "" {
+			fmt.Fprintf(builder, " ON CLUSTER %s", peerdb_clickhouse.QuoteIdentifier(c.Config.Cluster))
 		}
 		builder.WriteString(" (")

@@ -253,10 +253,10 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
 		stmtBuilder.WriteString(" SETTINGS allow_nullable_key = 1")
 	}

-	if c.config.Cluster != "" {
+	if c.Config.Cluster != "" {
 		fmt.Fprintf(&stmtBuilderDistributed, " ENGINE = Distributed(%s,%s,%s",
-			peerdb_clickhouse.QuoteIdentifier(c.config.Cluster),
-			peerdb_clickhouse.QuoteIdentifier(c.config.Database),
+			peerdb_clickhouse.QuoteIdentifier(c.Config.Cluster),
+			peerdb_clickhouse.QuoteIdentifier(c.Config.Database),
 			peerdb_clickhouse.QuoteIdentifier(tableIdentifier+"_shard"),
 		)
 		if tableMapping.ShardingKey != "" {
@@ -483,7 +483,7 @@ func (c *ClickHouseConnector) NormalizeRecords(
 		chConn = c.database
 	} else {
 		var err error
-		chConn, err = Connect(errCtx, req.Env, c.config)
+		chConn, err = Connect(errCtx, req.Env, c.Config)
 		if err != nil {
 			return err
 		}
@@ -560,7 +560,7 @@ func (c *ClickHouseConnector) NormalizeRecords(
 		req.Env,
 		rawTbl,
 		c.chVersion,
-		c.config.Cluster != "",
+		c.Config.Cluster != "",
 	)
 	insertIntoSelectQuery, err := queryGenerator.BuildQuery(ctx)
 	if err != nil {
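These dual-builder hunks encode the naming scheme the tests must account for: on a cluster, the generator emits two statements, with the "_shard" table carrying the MergeTree-family engine (and hence any PARTITION BY) while the Distributed table keeps the destination name and routes to it. A condensed sketch with illustrative identifiers; quoteIdentifier stands in for peerdb_clickhouse.QuoteIdentifier, and column lists are elided:

package ddlsketch

import "fmt"

// quoteIdentifier is a simplified stand-in for QuoteIdentifier (no escaping).
func quoteIdentifier(s string) string { return "`" + s + "`" }

// normalizedTableDDL condenses the two-statement pattern: only the "_shard"
// table carries the storage engine, ordering, and any PARTITION BY; the
// Distributed table routes queries to it across the cluster.
func normalizedTableDDL(cluster, database, table string) (shardDDL, distributedDDL string) {
	shardDDL = fmt.Sprintf(
		"CREATE TABLE IF NOT EXISTS %s ON CLUSTER %s (/* columns */) ENGINE = MergeTree() ORDER BY tuple()",
		quoteIdentifier(table+"_shard"), quoteIdentifier(cluster))
	distributedDDL = fmt.Sprintf(
		"CREATE TABLE IF NOT EXISTS %s ON CLUSTER %s (/* columns */) ENGINE = Distributed(%s,%s,%s)",
		quoteIdentifier(table), quoteIdentifier(cluster),
		quoteIdentifier(cluster), quoteIdentifier(database), quoteIdentifier(table+"_shard"))
	return
}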

flow/e2e/clickhouse/peer_flow_ch_test.go

Lines changed: 10 additions & 6 deletions
@@ -2423,9 +2423,11 @@ func (s ClickHouseSuite) Test_PartitionBy() {
 		dstTableSuffix = "_shard"
 	}
 	require.NoError(s.t,
-		ch.QueryRow(s.t.Context(),
-			"select partition_key,sorting_key from system.tables where name="+clickhouse.QuoteLiteral(dstTableName+dstTableSuffix),
-		).Scan(&partitionKey, &sortingKey),
+		ch.QueryRow(s.t.Context(), fmt.Sprintf(
+			"select partition_key,sorting_key from system.tables where database=%s and name=%s",
+			clickhouse.QuoteLiteral(s.connector.Config.Database),
+			clickhouse.QuoteLiteral(dstTableName+dstTableSuffix),
+		)).Scan(&partitionKey, &sortingKey),
 	)
 	require.NoError(s.t, ch.Close())
 	require.Equal(s.t, "num", partitionKey)
@@ -2473,9 +2475,11 @@ func (s ClickHouseSuite) Test_PartitionByExpr() {
 		dstTableSuffix = "_shard"
 	}
 	require.NoError(s.t,
-		ch.QueryRow(s.t.Context(),
-			"select partition_key from system.tables where name="+clickhouse.QuoteLiteral(dstTableName+dstTableSuffix),
-		).Scan(&partitionKey),
+		ch.QueryRow(s.t.Context(), fmt.Sprintf(
+			"select partition_key from system.tables where database=%s and name=%s",
+			clickhouse.QuoteLiteral(s.connector.Config.Database),
+			clickhouse.QuoteLiteral(dstTableName+dstTableSuffix),
+		)).Scan(&partitionKey),
 	)
 	require.NoError(s.t, ch.Close())
 	require.Equal(s.t, "(num % 2, val)", partitionKey)
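Why the assertions target the "_shard" suffix in the cluster case: two rows exist for the destination there, and only the shard table carries the partition key. Roughly what the scoped query now distinguishes, with an illustrative table name:

//   name                  engine       partition_key
//   <dstTableName>        Distributed  ""
//   <dstTableName>_shard  MergeTree    "num"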
