@zemin-piao zemin-piao commented Dec 11, 2025

Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

Description

Resolves #4343

How was this patch tested?

This was tested by running Apache Spark + Delta Lake + Celeborn in client mode. No row duplicates and no shuffle fetch failures occurred.

Test Environment

- Platform: Kind (Kubernetes in Docker) cluster
- Shuffle Service: Apache Celeborn 0.6.2
- Spark: 4.0.1 with Delta Lake 4.1.0-SNAPSHOT
- Configuration:
  - spark.databricks.delta.optimizeWrite.enabled=true
  - spark.databricks.delta.optimizeWrite.useShuffleManager=true
  - spark.databricks.delta.optimizeWrite.binSize=512
  - spark.databricks.delta.optimizeWrite.numShuffleBlocks=2000
  - spark.shuffle.manager=org.apache.spark.shuffle.celeborn.SparkShuffleManager

Test Results

Test 1 - Basic Write (600K records)
- No FetchFailedException
- No data duplication (600K total = 600K distinct)
- Files properly consolidated

Test 2 - Partitioned Write (1M records)
- Partitioned by date + region
- No duplicates across partitions
- All partition data intact

Does this PR introduce any user-facing changes?

Yes. Previously, when the optimizeWrite feature was enabled and Spark ran with a remote shuffle service, a FetchFailedException occurred. After this PR, setting the useShuffleManager flag to true prevents the FetchFailedException even when a remote shuffle service (e.g. Apache Celeborn) is used.

@zemin-piao zemin-piao changed the title Prevent FetchFailedException when enabling optimize write feature when running remote shuffle service (e.g. celeborn) [Spark] Prevent FetchFailedException when enabling optimize write feature when running remote shuffle service (e.g. celeborn) Dec 12, 2025
zemin-piao and others added 4 commits December 19, 2025 17:07
…ss bins

When using remote shuffle services (useShuffleManager=true), the
ShuffleManager.getReader() API reads entire partitions. If bin-packing
splits a reducer's blocks across multiple bins, each bin reads the full
partition, causing data duplication in output files.
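To see why this duplicates data, consider a toy model (a pure-Python illustration, not Spark's actual API): if one reducer's blocks are split across two bins, and each bin's reader can only return the whole partition, every row is emitted once per bin.

```python
# Toy illustration of the duplication bug: a "remote" reader can only
# return whole partitions, so two bins covering the same reducer both
# emit all of that reducer's rows.
partition_rows = {0: ["a", "b", "c"]}  # reducer 0's full output

def read_whole_partition(reducer_id):
    # Models ShuffleManager.getReader(), which reads entire partitions
    # when a remote shuffle service is in use (no sub-partition ranges).
    return partition_rows[reducer_id]

# Bin-packing that split reducer 0's blocks across two bins:
bins = [[0], [0]]  # both bins reference reducer 0

output = [row for b in bins for rid in b for row in read_whole_partition(rid)]
# Every row now appears twice in the written files.
```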

This fix changes bin-packing behavior for remote shuffle mode:
- Large reducers (>= binSize): each gets its own bin
- Small reducers (< binSize): bin-packed together as atomic units
- Reducers are never split across bins

Trade-off: Large reducers may produce oversized files, but this is
acceptable compared to data duplication.

Local shuffle mode (default) is unchanged and continues to bin-pack
individual blocks for optimal file sizes.
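The new packing rule for remote shuffle mode (a large reducer gets its own bin; small reducers are packed together as atomic units, never split) can be sketched as follows. This is a minimal sketch: `pack_bins_remote` and its input layout are hypothetical names for illustration, not the actual Delta implementation.

```python
def pack_bins_remote(reducer_sizes, bin_size):
    """Pack whole reducers into bins without ever splitting one.

    reducer_sizes: list of (reducer_id, size) pairs
    Returns a list of bins, each a list of reducer ids.
    """
    bins = []
    current, current_size = [], 0
    for rid, size in reducer_sizes:
        if size >= bin_size:
            # Large reducer: its own bin. The file may exceed binSize,
            # which is accepted to avoid re-reading the full partition
            # from multiple bins (the source of the duplication).
            bins.append([rid])
        elif current and current_size + size > bin_size:
            # Current bin is full: close it and start a new one.
            bins.append(current)
            current, current_size = [rid], size
        else:
            # Small reducer fits: add it atomically to the current bin.
            current.append(rid)
            current_size += size
    if current:
        bins.append(current)
    return bins
```

Because each reducer id lands in exactly one bin, each partition is read exactly once, which is what prevents both the duplicates and the oversized re-fetches.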
@zemin-piao force-pushed the optimize_write_for_remote_shuffle_service branch from eb9cf29 to 2f4b8ea on December 19, 2025 16:07

Development

Successfully merging this pull request may close these issues.

[Feature Request] Support for remote shuffle services in OptimizedWrite