@@ -77,6 +77,9 @@ case class DeltaOptimizedWriterExec(
getConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_MAX_SHUFFLE_PARTITIONS))
}

private lazy val useShuffleManager: Boolean =
getConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_USE_SHUFFLE_MANAGER)

@transient private var cachedShuffleRDD: ShuffledRowRDD = _

@transient private lazy val mapTracker = SparkEnv.get.mapOutputTracker
@@ -112,14 +115,61 @@ case class DeltaOptimizedWriterExec(
val maxBinSize =
ByteUnit.BYTE.convertFrom(getConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_BIN_SIZE), ByteUnit.MiB)

val bins = shuffleStats.toSeq.flatMap(_._2).groupBy(_._1.asInstanceOf[ShuffleBlockId].reduceId)
.flatMap { case (_, blocks) =>
// Group blocks by reducer and calculate total size per reducer
val reducerGroups = shuffleStats.toSeq.flatMap(_._2)
.groupBy(_._1.asInstanceOf[ShuffleBlockId].reduceId)
.map { case (reducerId, blocks) =>
(reducerId, blocks, blocks.map(_._2).sum)
}.toSeq

val bins = if (useShuffleManager) {
// Remote shuffle mode: Never split reducers across bins to avoid duplicate reads.
// ShuffleManager.getReader() reads entire partitions, so if a reducer is split
// across bins, each bin would read the full partition causing data duplication.
val (largeReducers, smallReducers) = reducerGroups.partition(_._3 >= maxBinSize)

// Large reducers: each gets its own bin (may exceed maxBinSize, but that's acceptable)
val largeBins = largeReducers.map { case (_, blocks, _) =>
blocks.map(_._1)
}

// Small reducers: bin-pack together, keeping each reducer's blocks atomic
val smallBins = if (smallReducers.nonEmpty) {
BinPackingUtils.binPackBySize[(Int, Seq[(BlockId, Long, Int)], Long), Seq[BlockId]](
smallReducers,
_._3, // total size of reducer
_._2.map(_._1).toSeq, // all block IDs for this reducer
maxBinSize
).map(_.flatten)
} else {
Seq.empty
}

val result = largeBins ++ smallBins

// Verify no reducer is split across multiple bins (would cause duplicate data)
val reducerToBinCount = result.zipWithIndex.flatMap { case (bin, binIdx) =>
bin.map(_.asInstanceOf[ShuffleBlockId].reduceId).distinct.map(_ -> binIdx)
}.groupBy(_._1).mapValues(_.map(_._2).distinct.size)

val duplicates = reducerToBinCount.filter(_._2 > 1)
assert(duplicates.isEmpty,
s"Reducer(s) split across multiple bins in remote shuffle mode, " +
s"which would cause data duplication: ${duplicates.keys.mkString(", ")}")

result
} else {
// Local shuffle mode: Bin-pack individual blocks for optimal bin sizes.
// ShuffleBlockFetcherIterator can fetch specific blocks, so splitting
// a reducer across bins is safe and allows for better bin packing.
reducerGroups.flatMap { case (_, blocks, _) =>
BinPackingUtils.binPackBySize[(BlockId, Long, Int), BlockId](
blocks,
_._2, // size
_._1, // blockId
maxBinSize)
}
}

bins
.map { bin =>
@@ -191,7 +241,8 @@ case class DeltaOptimizedWriterExec(
sparkContext,
shuffledRDD.dependency,
readMetrics,
new OptimizedWriterBlocks(partitions))
new OptimizedWriterBlocks(partitions),
useShuffleManager)
}

private def getConf[T](entry: ConfigEntry[T]): T = {
@@ -219,7 +270,8 @@ private class DeltaOptimizedWriterRDD(
@transient sparkContext: SparkContext,
var dep: ShuffleDependency[Int, _, InternalRow],
metrics: Map[String, SQLMetric],
@transient blocks: OptimizedWriterBlocks)
@transient blocks: OptimizedWriterBlocks,
useShuffleManager: Boolean)
extends RDD[InternalRow](sparkContext, Seq(dep)) with DeltaLogging {

override def getPartitions: Array[Partition] = Array.tabulate(blocks.bins.length) { i =>
@@ -264,7 +316,8 @@ private class DeltaOptimizedWriterRDD(
dep,
context,
blocks,
sqlMetricsReporter)
sqlMetricsReporter,
useShuffleManager)
reader.read().map(_._2)
}

@@ -284,40 +337,75 @@ private class OptimizedWriterShuffleReader(
dep: ShuffleDependency[Int, _, InternalRow],
context: TaskContext,
blocks: Iterator[(BlockManagerId, ArrayBuffer[(BlockId, Long, Int)])],
readMetrics: ShuffleReadMetricsReporter) extends ShuffleReader[Int, InternalRow] {
readMetrics: ShuffleReadMetricsReporter,
useShuffleManager: Boolean) extends ShuffleReader[Int, InternalRow] {

/** Read the combined key-values for this reduce task */
override def read(): Iterator[Product2[Int, InternalRow]] = {
val wrappedStreams = new ShuffleBlockFetcherIterator(
context,
SparkEnv.get.blockManager.blockStoreClient,
SparkEnv.get.blockManager,
SparkEnv.get.mapOutputTracker,
blocks,
SparkEnv.get.serializerManager.wrapStream,
// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
SparkEnv.get.conf.get(config.SHUFFLE_MAX_ATTEMPTS_ON_NETTY_OOM),
SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true),
SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt.useExtraMemory", false),
SparkEnv.get.conf.getBoolean("spark.shuffle.checksum.enabled", true),
SparkEnv.get.conf.get("spark.shuffle.checksum.algorithm", "ADLER32"),
readMetrics,
false)

val serializerInstance = dep.serializer.newInstance()
if (useShuffleManager) {
// Remote shuffle service mode: Use ShuffleManager.getReader() API
// This is compatible with remote shuffle services like Apache Celeborn and Uniffle.
// Trade-off: getReader() always reads whole partitions, so the bin computation in
// DeltaOptimizedWriterExec never splits a partition across bins; the cost is coarser
// bins and potentially larger output files.

// Collect all reducer IDs (partition IDs) from our blocks
val reducerIds = blocks.flatMap { case (_, blockList) =>
blockList.map(_._1.asInstanceOf[ShuffleBlockId].reduceId)
}.toSet

// Read from all the partitions we need using ShuffleManager API
val iterators = reducerIds.toSeq.sorted.map { reducerId =>
val reader = SparkEnv.get.shuffleManager.getReader(
dep.shuffleHandle,
reducerId,
reducerId + 1,
context,
readMetrics)
reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]]
}

// Create a key/value iterator for each stream
val recordIter = wrappedStreams.flatMap { case (_, wrappedStream) =>
// Note: the asKeyValueIterator below wraps a key/value iterator inside of a
// NextIterator. The NextIterator makes sure that close() is called on the
// underlying InputStream when all records have been read.
serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
}.asInstanceOf[Iterator[Product2[Int, InternalRow]]]
// Combine all iterators into one
val combinedIterator = iterators.foldLeft(
Iterator.empty: Iterator[Product2[Int, InternalRow]])(_ ++ _)

new InterruptibleIterator[Product2[Int, InternalRow]](context, recordIter)
new InterruptibleIterator[Product2[Int, InternalRow]](context, combinedIterator)
} else {
// Default mode: Use ShuffleBlockFetcherIterator for optimal performance
// This reads only the specific blocks assigned to this bin.
// Only works with local shuffle (BlockManager-based).

val wrappedStreams = new ShuffleBlockFetcherIterator(
context,
SparkEnv.get.blockManager.blockStoreClient,
SparkEnv.get.blockManager,
SparkEnv.get.mapOutputTracker,
blocks,
SparkEnv.get.serializerManager.wrapStream,
// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
SparkEnv.get.conf.get(config.SHUFFLE_MAX_ATTEMPTS_ON_NETTY_OOM),
SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true),
SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt.useExtraMemory", false),
SparkEnv.get.conf.getBoolean("spark.shuffle.checksum.enabled", true),
SparkEnv.get.conf.get("spark.shuffle.checksum.algorithm", "ADLER32"),
readMetrics,
false)

val serializerInstance = dep.serializer.newInstance()

// Create a key/value iterator for each stream
val recordIter = wrappedStreams.flatMap { case (_, wrappedStream) =>
// Note: the asKeyValueIterator below wraps a key/value iterator inside of a
// NextIterator. The NextIterator makes sure that close() is called on the
// underlying InputStream when all records have been read.
serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
}.asInstanceOf[Iterator[Product2[Int, InternalRow]]]

new InterruptibleIterator[Product2[Int, InternalRow]](context, recordIter)
}
}
}
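
The split between the two read paths above hinges on how blocks were binned earlier in DeltaOptimizedWriterExec. The sketch below illustrates that difference with a toy greedy packer standing in for BinPackingUtils.binPackBySize (same idea, simplified signature); the Block class, sizes, reducer IDs, and the helper itself are made up for illustration and are not part of this PR.

```scala
object BinPackingSketch {
  // Toy shuffle block: which reducer (shuffle partition) it belongs to and its size in bytes.
  final case class Block(reducerId: Int, size: Long)

  // Greedy next-fit packer (stand-in for Delta's BinPackingUtils.binPackBySize):
  // items are never split, and an item larger than maxBinSize gets a bin of its own.
  def binPack[T](items: Seq[T], sizeOf: T => Long, maxBinSize: Long): Seq[Seq[T]] = {
    val packed = items.foldLeft(List.empty[(List[T], Long)]) { (bins, item) =>
      val sz = sizeOf(item)
      bins match {
        case (bin, used) :: rest if used + sz <= maxBinSize => (item :: bin, used + sz) :: rest
        case _ => (List(item), sz) :: bins
      }
    }
    packed.map(_._1.reverse).reverse
  }

  def main(args: Array[String]): Unit = {
    val maxBinSize = 100L
    val blocks = Seq(Block(0, 60), Block(0, 60), Block(1, 30), Block(2, 40))

    // Local shuffle mode: pack individual blocks. Reducer 0 ends up split across two bins,
    // which is safe because ShuffleBlockFetcherIterator can fetch exactly those blocks.
    val localBins = binPack[Block](blocks, _.size, maxBinSize)
    println(localBins)  // List(List(Block(0,60)), List(Block(0,60), Block(1,30)), List(Block(2,40)))

    // Remote shuffle mode: pack whole reducers. Reducer 0 (120 bytes) exceeds the cap but
    // still stays in one bin, because ShuffleManager.getReader reads entire partitions.
    val byReducer = blocks.groupBy(_.reducerId).toSeq.sortBy(_._1)
    val remoteBins = binPack[(Int, Seq[Block])](byReducer, r => r._2.map(_.size).sum, maxBinSize)
      .map(bin => bin.flatMap(_._2))
    println(remoteBins) // List(List(Block(0,60), Block(0,60)), List(Block(1,30), Block(2,40)))
  }
}
```

The same inputs produce three tight bins in local mode but only two reducer-atomic bins in remote mode, which matches the trade-off described in the config doc below: precise file sizes versus no duplicate partition reads.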
@@ -2727,6 +2727,18 @@ trait DeltaSQLConfBase extends DeltaSQLConfUtils {
.bytesConf(ByteUnit.MiB)
.createWithDefault(512)

val DELTA_OPTIMIZE_WRITE_USE_SHUFFLE_MANAGER =
buildConf("optimizeWrite.useShuffleManager")
.doc("When true, uses ShuffleManager.getReader() API for reading shuffle data, " +
"which is compatible with remote shuffle services like Apache Celeborn and Uniffle. " +
"In this mode, shuffle partitions are never split across bins to avoid reading " +
"duplicate data - small partitions are bin-packed together, while large partitions " +
"(exceeding binSize) each get their own bin and may produce larger output files. " +
"When false (default), uses ShuffleBlockFetcherIterator for optimal performance with " +
"local shuffle, allowing individual blocks to be bin-packed for precise file sizes.")
.booleanConf
.createWithDefault(false)

val DELTA_OPTIMIZE_CLUSTERING_MIN_CUBE_SIZE =
buildConf("optimize.clustering.mergeStrategy.minCubeSize.threshold")
.internal()
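
For completeness, a hedged sketch of how the new flag could be toggled from a Spark session, assuming the conf key receives the usual spark.databricks.delta. prefix that DeltaSQLConf's buildConf applies; the session setup itself is illustrative and not part of this PR.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: enable the new reader path for Delta's optimized writes.
// A remote shuffle service (e.g. Apache Celeborn) would additionally be configured
// through its own ShuffleManager settings, which are out of scope here.
val spark = SparkSession.builder()
  .appName("delta-optimized-write-remote-shuffle")
  .config("spark.databricks.delta.optimizeWrite.useShuffleManager", "true")
  .getOrCreate()

// The default (false) keeps the ShuffleBlockFetcherIterator path for local shuffle:
spark.conf.set("spark.databricks.delta.optimizeWrite.useShuffleManager", "false")
```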