Skip to content

Commit 43ee0f3

Browse files
authored
Pinot Server : Refresh message for Table Schema (#15956)
1 parent 0710db8 commit 43ee0f3

File tree

4 files changed

+128
-5
lines changed

4 files changed

+128
-5
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.common.messages;
20+
21+
import java.util.UUID;
22+
import org.apache.helix.model.Message;
23+
import org.apache.helix.zookeeper.datamodel.ZNRecord;
24+
25+
// TODO: To evaluate if this message should be send on any table-config updates as well
26+
public class TableConfigSchemaRefreshMessage extends Message {
27+
public static final String REFRESH_TABLE_CONFIG_AND_SCHEMA = "REFRESH_TABLE_CONFIG_AND_SCHEMA";
28+
private static final String TABLE_NAME_KEY = "tableName";
29+
30+
/**
31+
* Constructor for the sender.
32+
*/
33+
public TableConfigSchemaRefreshMessage(String tableNameWithType) {
34+
super(Message.MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
35+
setMsgSubType(REFRESH_TABLE_CONFIG_AND_SCHEMA);
36+
// Give it infinite time to process the message, as long as session is alive
37+
setExecutionTimeout(-1);
38+
ZNRecord znRecord = getRecord();
39+
znRecord.setSimpleField(TABLE_NAME_KEY, tableNameWithType);
40+
}
41+
42+
/**
43+
* Constructor for the receiver.
44+
*/
45+
public TableConfigSchemaRefreshMessage(Message message) {
46+
super(message.getRecord());
47+
if (!message.getMsgSubType().equals(REFRESH_TABLE_CONFIG_AND_SCHEMA)) {
48+
throw new IllegalArgumentException("Invalid message subtype:" + message.getMsgSubType());
49+
}
50+
}
51+
52+
public String getTableNameWithType() {
53+
return getRecord().getSimpleField(TABLE_NAME_KEY);
54+
}
55+
}

pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ public enum ServerMeter implements AbstractMetrics.Meter {
140140

141141
DIRECT_MEMORY_OOM("directMemoryOOMCount", true),
142142

143+
TABLE_CONFIG_AND_SCHEMA_REFRESH_FAILURES("tables", true, "Number of failures to refresh table config and schema"),
144+
143145
// Multi-stage
144146
/**
145147
* Number of times the max number of rows in the hash table has been reached.

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@
111111
import org.apache.pinot.common.messages.SegmentRefreshMessage;
112112
import org.apache.pinot.common.messages.SegmentReloadMessage;
113113
import org.apache.pinot.common.messages.TableConfigRefreshMessage;
114+
import org.apache.pinot.common.messages.TableConfigSchemaRefreshMessage;
114115
import org.apache.pinot.common.messages.TableDeletionMessage;
115116
import org.apache.pinot.common.metadata.ZKMetadataProvider;
116117
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
@@ -1585,13 +1586,27 @@ public void updateSchema(Schema schema, boolean reload, boolean forceTableSchema
15851586
}
15861587

15871588
updateSchema(schema, oldSchema, forceTableSchemaUpdate);
1588-
1589-
if (reload) {
1590-
LOGGER.info("Reloading tables with name: {}", schemaName);
1589+
try {
15911590
List<String> tableNamesWithType = getExistingTableNamesWithType(schemaName, null);
1592-
for (String tableNameWithType : tableNamesWithType) {
1593-
reloadAllSegments(tableNameWithType, false, null);
1591+
if (reload) {
1592+
LOGGER.info("Reloading tables with name: {}", schemaName);
1593+
for (String tableNameWithType : tableNamesWithType) {
1594+
reloadAllSegments(tableNameWithType, false, null);
1595+
}
1596+
} else {
1597+
// Send schema refresh message to all tables that use this schema
1598+
for (String tableNameWithType : tableNamesWithType) {
1599+
LOGGER.info("Sending updated schema message for table: {}", tableNameWithType);
1600+
sendTableConfigSchemaRefreshMessage(tableNameWithType, getServerInstancesForTable(tableNameWithType,
1601+
TableNameBuilder.getTableTypeFromTableName(tableNameWithType)));
1602+
}
15941603
}
1604+
} catch (TableNotFoundException e) {
1605+
if (reload) {
1606+
throw e;
1607+
}
1608+
// We don't throw exception if no tables found for schema when reload is false. Since this could be valid case
1609+
LOGGER.warn("No tables found for schema (refresh only): {}", schemaName, e);
15951610
}
15961611
}
15971612

@@ -3286,6 +3301,27 @@ private void sendRoutingTableRebuildMessage(String tableNameWithType) {
32863301
}
32873302
}
32883303

3304+
private void sendTableConfigSchemaRefreshMessage(String tableNameWithType, List<String> instances) {
3305+
TableConfigSchemaRefreshMessage refreshMessage = new TableConfigSchemaRefreshMessage(tableNameWithType);
3306+
for (String instance : instances) {
3307+
// Send refresh message to servers
3308+
Criteria recipientCriteria = new Criteria();
3309+
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
3310+
recipientCriteria.setInstanceName(instance);
3311+
recipientCriteria.setSessionSpecific(true);
3312+
ClusterMessagingService messagingService = _helixZkManager.getMessagingService();
3313+
// Send message with no callback and infinite timeout on the recipient
3314+
int numMessagesSent = messagingService.send(recipientCriteria, refreshMessage, null, -1);
3315+
if (numMessagesSent > 0) {
3316+
LOGGER.info("Sent {} schema refresh messages to servers for table: {} for instance: {}", numMessagesSent,
3317+
tableNameWithType, instance);
3318+
} else {
3319+
LOGGER.warn("No schema refresh message sent to servers for table: {} for instance: {}", tableNameWithType,
3320+
instance);
3321+
}
3322+
}
3323+
}
3324+
32893325
/**
32903326
* Update the instance config given the broker instance id
32913327
*/

pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.pinot.common.messages.IngestionMetricsRemoveMessage;
3434
import org.apache.pinot.common.messages.SegmentRefreshMessage;
3535
import org.apache.pinot.common.messages.SegmentReloadMessage;
36+
import org.apache.pinot.common.messages.TableConfigSchemaRefreshMessage;
3637
import org.apache.pinot.common.messages.TableDeletionMessage;
3738
import org.apache.pinot.common.metrics.ServerGauge;
3839
import org.apache.pinot.common.metrics.ServerMeter;
@@ -74,6 +75,8 @@ public MessageHandler createHandler(Message message, NotificationContext context
7475
return new ForceCommitMessageHandler(new ForceCommitMessage(message), _metrics, context);
7576
case IngestionMetricsRemoveMessage.INGESTION_METRICS_REMOVE_MSG_SUB_TYPE:
7677
return new IngestionMetricsRemoveMessageHandler(new IngestionMetricsRemoveMessage(message), _metrics, context);
78+
case TableConfigSchemaRefreshMessage.REFRESH_TABLE_CONFIG_AND_SCHEMA:
79+
return new TableSchemaRefreshMessageHandler(new TableConfigSchemaRefreshMessage(message), _metrics, context);
7780
default:
7881
LOGGER.warn("Unsupported user defined message sub type: {} for segment: {}", msgSubType,
7982
message.getPartitionName());
@@ -243,6 +246,33 @@ public HelixTaskResult handleMessage() {
243246
}
244247
}
245248

249+
private class TableSchemaRefreshMessageHandler extends DefaultMessageHandler {
250+
TableSchemaRefreshMessageHandler(TableConfigSchemaRefreshMessage message, ServerMetrics metrics,
251+
NotificationContext context) {
252+
super(message, metrics, context);
253+
}
254+
255+
@Override
256+
public HelixTaskResult handleMessage() {
257+
_logger.info("Handling table schema refresh message for table: {}", _tableNameWithType);
258+
try {
259+
TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(_tableNameWithType);
260+
if (tableDataManager != null) {
261+
// Update the table config and schema by fetching from ZK
262+
tableDataManager.fetchIndexLoadingConfig();
263+
} else {
264+
_logger.warn("No data manager found for table: {}", _tableNameWithType);
265+
}
266+
} catch (Exception e) {
267+
_metrics.addMeteredTableValue(_tableNameWithType, ServerMeter.TABLE_CONFIG_AND_SCHEMA_REFRESH_FAILURES, 1);
268+
Utils.rethrowException(e);
269+
}
270+
HelixTaskResult helixTaskResult = new HelixTaskResult();
271+
helixTaskResult.setSuccess(true);
272+
return helixTaskResult;
273+
}
274+
}
275+
246276
private static class DefaultMessageHandler extends MessageHandler {
247277
final String _segmentName;
248278
final String _tableNameWithType;

0 commit comments

Comments
 (0)