Skip to content

Commit 1700b4e

Browse files
committed
[Flink] Support withShard read of Flink
1 parent 9e362c8 commit 1700b4e

File tree

16 files changed

+400
-22
lines changed

16 files changed

+400
-22
lines changed

docs/layouts/shortcodes/generated/flink_connector_configuration.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@
162162
<td><h5>scan.split-enumerator.mode</h5></td>
163163
<td style="word-wrap: break-word;">fair</td>
164164
<td><p>Enum</p></td>
165-
<td>The mode used by StaticFileStoreSplitEnumerator to assign splits.<br /><br />Possible values:<ul><li>"fair": Distribute splits evenly when batch reading to prevent a few tasks from reading all.</li><li>"preemptive": Distribute splits preemptively according to the consumption speed of the task.</li></ul></td>
165+
<td>The mode used by StaticFileStoreSplitEnumerator to assign splits.<br /><br />Possible values:<ul><li>"fair": Distribute splits evenly when batch reading to prevent a few tasks from reading all.</li><li>"preemptive": Distribute splits preemptively according to the consumption speed of the task.</li><li>"empty": Splits will be generated on the TaskManager.</li></ul></td>
166166
</tr>
167167
<tr>
168168
<td><h5>scan.watermark.alignment.group</h5></td>
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.table.source;
20+
21+
/** An empty placeholder {@link Split} reporting zero rows; used by the "empty" split assign mode, where the actual splits are generated on the TaskManager. */
22+
public class EmptyDataSplit implements Split {
23+
24+
@Override
25+
public long rowCount() {
26+
return 0;
27+
}
28+
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -547,7 +547,8 @@ public enum SplitAssignMode implements DescribedEnum {
547547
"Distribute splits evenly when batch reading to prevent a few tasks from reading all."),
548548
PREEMPTIVE(
549549
"preemptive",
550-
"Distribute splits preemptively according to the consumption speed of the task.");
550+
"Distribute splits preemptively according to the consumption speed of the task."),
551+
EMPTY("empty", "Splits will be generated on the TaskManager.");
551552

552553
private final String value;
553554
private final String description;

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,20 @@
2222
import org.apache.paimon.flink.NestedProjectedRowData;
2323
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
2424
import org.apache.paimon.flink.utils.TableScanUtils;
25-
import org.apache.paimon.table.source.TableRead;
2625

2726
import org.apache.flink.api.connector.source.SourceReader;
2827
import org.apache.flink.api.connector.source.SourceReaderContext;
2928
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
29+
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
30+
import org.apache.flink.connector.file.src.reader.BulkFormat;
3031
import org.apache.flink.connector.file.src.reader.BulkFormat.RecordIterator;
3132
import org.apache.flink.table.data.RowData;
3233

3334
import javax.annotation.Nullable;
3435

3536
import java.util.Map;
3637
import java.util.Optional;
38+
import java.util.function.Supplier;
3739

3840
/** A {@link SourceReader} that read records from {@link FileStoreSourceSplit}. */
3941
public class FileStoreSourceReader
@@ -45,19 +47,14 @@ public class FileStoreSourceReader
4547
private long lastConsumeSnapshotId = Long.MIN_VALUE;
4648

4749
public FileStoreSourceReader(
50+
Supplier<SplitReader<BulkFormat.RecordIterator<RowData>, FileStoreSourceSplit>>
51+
splitReaderSupplier,
4852
SourceReaderContext readerContext,
49-
TableRead tableRead,
5053
FileStoreSourceReaderMetrics metrics,
5154
IOManager ioManager,
52-
@Nullable Long limit,
5355
@Nullable NestedProjectedRowData rowData) {
54-
// limiter is created in SourceReader, it can be shared in all split readers
5556
super(
56-
() ->
57-
new FileStoreSourceSplitReader(
58-
tableRead.withIOManager(ioManager),
59-
RecordLimiter.create(limit),
60-
metrics),
57+
splitReaderSupplier,
6158
(element, output, state) ->
6259
FlinkRecordsWithSplitIds.emitRecord(
6360
readerContext, element, output, state, metrics, rowData),

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class FileStoreSourceSplitReader
6161

6262
@Nullable private final RecordLimiter limiter;
6363

64-
private final Queue<FileStoreSourceSplit> splits;
64+
protected final Queue<FileStoreSourceSplit> splits;
6565

6666
private final Pool<FileStoreRecordIterator> pool;
6767

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,16 @@ public SourceReader<RowData, FileStoreSourceSplit> createReader(SourceReaderCont
6767
new FileStoreSourceReaderMetrics(metricGroup);
6868
TableRead tableRead =
6969
readBuilder.newRead().withMetricRegistry(new FlinkMetricRegistry(metricGroup));
70+
// limiter is created in SourceReader, it can be shared in all split readers
7071
return new FileStoreSourceReader(
72+
() ->
73+
new FileStoreSourceSplitReader(
74+
tableRead.withIOManager(ioManager),
75+
RecordLimiter.create(limit),
76+
sourceReaderMetrics),
7177
context,
72-
tableRead,
7378
sourceReaderMetrics,
7479
ioManager,
75-
limit,
7680
NestedProjectedRowData.copy(rowData));
7781
}
7882

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.paimon.flink.sink.FlinkSink;
2929
import org.apache.paimon.flink.source.align.AlignedContinuousFileStoreSource;
3030
import org.apache.paimon.flink.source.operator.MonitorSource;
31+
import org.apache.paimon.flink.source.shardread.ShardStaticFileStoreSource;
3132
import org.apache.paimon.flink.utils.TableScanUtils;
3233
import org.apache.paimon.options.Options;
3334
import org.apache.paimon.predicate.Predicate;
@@ -66,6 +67,7 @@
6667
import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
6768
import static org.apache.paimon.CoreOptions.StreamingReadMode.FILE;
6869
import static org.apache.paimon.flink.FlinkConnectorOptions.SOURCE_OPERATOR_UID_SUFFIX;
70+
import static org.apache.paimon.flink.FlinkConnectorOptions.SplitAssignMode.EMPTY;
6971
import static org.apache.paimon.flink.FlinkConnectorOptions.generateCustomUid;
7072
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
7173
import static org.apache.paimon.flink.utils.ParallelismUtils.forwardParallelism;
@@ -198,6 +200,18 @@ private DataStream<RowData> buildStaticFileSource() {
198200
outerProject()));
199201
}
200202

203+
private DataStream<RowData> buildShardStaticFileSource() {
204+
Options options = Options.fromMap(table.options());
205+
return toDataStream(
206+
new ShardStaticFileStoreSource(
207+
createReadBuilder(projectedRowType()),
208+
limit,
209+
options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
210+
options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE),
211+
dynamicPartitionFilteringInfo,
212+
outerProject()));
213+
}
214+
201215
private DataStream<RowData> buildContinuousFileSource() {
202216
return toDataStream(
203217
new ContinuousFileStoreSource(
@@ -295,6 +309,11 @@ public DataStream<RowData> build() {
295309
}
296310

297311
if (sourceBounded) {
312+
Options options = Options.fromMap(table.options());
313+
if (EMPTY.equals(
314+
options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE))) {
315+
return buildShardStaticFileSource();
316+
}
298317
return buildStaticFileSource();
299318
}
300319
TableScanUtils.streamingReadingValidate(table);

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner;
2424
import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
2525
import org.apache.paimon.flink.source.assigners.SplitAssigner;
26+
import org.apache.paimon.flink.source.shardread.EmptySplitAssigner;
2627
import org.apache.paimon.table.source.InnerTableScan;
2728
import org.apache.paimon.table.source.ReadBuilder;
2829
import org.apache.paimon.table.source.TableScan;
@@ -87,7 +88,7 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
8788
context, null, splitAssigner, dynamicPartitionFilteringInfo);
8889
}
8990

90-
private List<FileStoreSourceSplit> getSplits(SplitEnumeratorContext context) {
91+
public List<FileStoreSourceSplit> getSplits(SplitEnumeratorContext context) {
9192
FileStoreSourceSplitGenerator splitGenerator = new FileStoreSourceSplitGenerator();
9293
TableScan scan = readBuilder.newScan();
9394
// register scan metrics
@@ -108,6 +109,8 @@ public static SplitAssigner createSplitAssigner(
108109
return new PreAssignSplitAssigner(splitBatchSize, context, splits);
109110
case PREEMPTIVE:
110111
return new FIFOSplitAssigner(splits);
112+
case EMPTY:
113+
return new EmptySplitAssigner(splitBatchSize, context, splits);
111114
default:
112115
throw new UnsupportedOperationException(
113116
"Unsupported assign mode " + splitAssignMode);

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.paimon.flink.NestedProjectedRowData;
2323
import org.apache.paimon.flink.PaimonDataStreamScanProvider;
2424
import org.apache.paimon.flink.Projection;
25+
import org.apache.paimon.flink.source.shardread.ShardStaticFileStoreSource;
2526
import org.apache.paimon.options.Options;
2627
import org.apache.paimon.predicate.Predicate;
2728
import org.apache.paimon.table.BucketMode;
@@ -39,6 +40,8 @@
3940

4041
import javax.annotation.Nullable;
4142

43+
import static org.apache.paimon.flink.FlinkConnectorOptions.SplitAssignMode.EMPTY;
44+
4245
/** A {@link FlinkTableSource} for system table. */
4346
public class SystemTableSource extends FlinkTableSource {
4447

@@ -100,9 +103,15 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
100103
new ContinuousFileStoreSource(
101104
readBuilder, table.options(), limit, BucketMode.HASH_FIXED, rowData);
102105
} else {
103-
source =
104-
new StaticFileStoreSource(
105-
readBuilder, limit, splitBatchSize, splitAssignMode, null, rowData);
106+
if (EMPTY.equals(splitAssignMode)) {
107+
source =
108+
new ShardStaticFileStoreSource(
109+
readBuilder, limit, splitBatchSize, splitAssignMode, null, rowData);
110+
} else {
111+
source =
112+
new StaticFileStoreSource(
113+
readBuilder, limit, splitBatchSize, splitAssignMode, null, rowData);
114+
}
106115
}
107116
return new PaimonDataStreamScanProvider(
108117
source.getBoundedness() == Boundedness.BOUNDED,

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
import org.apache.paimon.flink.NestedProjectedRowData;
2323
import org.apache.paimon.flink.source.FileStoreSourceReader;
2424
import org.apache.paimon.flink.source.FileStoreSourceSplit;
25+
import org.apache.paimon.flink.source.FileStoreSourceSplitReader;
2526
import org.apache.paimon.flink.source.FileStoreSourceSplitState;
27+
import org.apache.paimon.flink.source.RecordLimiter;
2628
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
2729
import org.apache.paimon.table.source.TableRead;
2830
import org.apache.paimon.utils.ReflectionUtils;
@@ -61,7 +63,16 @@ public AlignedSourceReader(
6163
IOManager ioManager,
6264
@Nullable Long limit,
6365
@Nullable NestedProjectedRowData rowData) {
64-
super(readerContext, tableRead, metrics, ioManager, limit, rowData);
66+
super(
67+
() ->
68+
new FileStoreSourceSplitReader(
69+
tableRead.withIOManager(ioManager),
70+
RecordLimiter.create(limit),
71+
metrics),
72+
readerContext,
73+
metrics,
74+
ioManager,
75+
rowData);
6576
this.nextCheckpointId = null;
6677
try {
6778
// In lower versions of Flink, the SplitFetcherManager does not provide the getQueue

0 commit comments

Comments
 (0)