
[Bug] InterruptedException when using Spark Compact Procedure paimon table with snapshot.expire.execution-mode=async #5681

Open
@lizc9

Description


Search before asking

  • I searched in the issues and found nothing similar.

Paimon version

Paimon 1.0.1

Compute Engine

Spark 3.4
Hive catalog with hive lock enable

Minimal reproduce step

  1. Create the Paimon table
create table paimon_hive.paimon.test_table (
  `id` STRING,
  `value` STRING
) WITH (
  'changelog-producer' = 'none',
  'bucket' = '10',
  'file.format' = 'parquet',
  'write-only' = 'true',
  'snapshot.expire.execution-mode' = 'async'
);
  1. Execute the SQL on Spark
CALL sys.compact(table => 'paimon.test_table', compact_strategy => '%s')
  1. Exception
2025-05-30 14:18:20 ERROR [mainexpire-main-thread-thread-1] TableCommitImpl:326 - Executing expire encountered an error.
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: Interrupted while waiting for the previous batch to be consumed
	at org.apache.paimon.utils.ObjectsCache.readSegments(ObjectsCache.java:143) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.utils.ObjectsCache.read(ObjectsCache.java:93) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.utils.ObjectsFile.readWithIOException(ObjectsFile.java:149) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.utils.ObjectsFile.read(ObjectsFile.java:134) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.utils.ObjectsFile.read(ObjectsFile.java:105) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.utils.ObjectsFile.read(ObjectsFile.java:101) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.manifest.ManifestList.readDataManifests(ManifestList.java:86) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.operation.FileDeletionBase.manifestSkippingSet(FileDeletionBase.java:420) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.table.ExpireSnapshotsImpl.expireUntil(ExpireSnapshotsImpl.java:231) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.table.ExpireSnapshotsImpl.expire(ExpireSnapshotsImpl.java:125) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.table.AbstractFileStoreTable.lambda$newCommit$2(AbstractFileStoreTable.java:443) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.table.sink.TableCommitImpl.expireSnapshots(TableCommitImpl.java:351) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.table.sink.TableCommitImpl.expire(TableCommitImpl.java:338) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.table.sink.TableCommitImpl.lambda$expire$5(TableCommitImpl.java:324) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_432]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_432]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_432]
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: Interrupted while waiting for the previous batch to be consumed
	at org.apache.paimon.reader.RecordReaderIterator.<init>(RecordReaderIterator.java:40) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.reader.RecordReader.toCloseableIterator(RecordReader.java:210) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.utils.ObjectsFile.createIterator(ObjectsFile.java:181) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.utils.ObjectsCache.readSegments(ObjectsCache.java:129) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	... 16 more
Caused by: java.lang.RuntimeException: Interrupted while waiting for the previous batch to be consumed
	at org.apache.paimon.format.avro.AvroBulkFormat$AvroReader.readBatch(AvroBulkFormat.java:99) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.format.avro.AvroBulkFormat$AvroReader.readBatch(AvroBulkFormat.java:57) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.reader.RecordReaderIterator.<init>(RecordReaderIterator.java:37) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.reader.RecordReader.toCloseableIterator(RecordReader.java:210) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.utils.ObjectsFile.createIterator(ObjectsFile.java:181) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.utils.ObjectsCache.readSegments(ObjectsCache.java:129) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	... 16 more
Caused by: java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1220) ~[?:1.8.0_432]
	at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335) ~[?:1.8.0_432]
	at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:400) ~[?:1.8.0_432]
	at org.apache.paimon.utils.Pool.pollEntry(Pool.java:70) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.format.avro.AvroBulkFormat$AvroReader.readBatch(AvroBulkFormat.java:96) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.format.avro.AvroBulkFormat$AvroReader.readBatch(AvroBulkFormat.java:57) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.reader.RecordReaderIterator.<init>(RecordReaderIterator.java:37) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.reader.RecordReader.toCloseableIterator(RecordReader.java:210) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.utils.ObjectsFile.createIterator(ObjectsFile.java:181) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	at org.apache.paimon.utils.ObjectsCache.readSegments(ObjectsCache.java:129) ~[paimon-ali-emr-spark-3.4-1.0-ali-SNAPSHOT-34263962-cd7d9cbf-20250103.jar:1.0-ali-SNAPSHOT]
	... 16 more
  1. Guessed cause
    The asynchronous thread pool is shut down via shutdownNow(), which interrupts the asynchronous expire thread mid-execution.
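The interruption path in the stack trace (ArrayBlockingQueue.take interrupted inside Pool.pollEntry) can be reproduced with plain JDK primitives. This is a minimal stdlib-only sketch, not Paimon code: a worker blocks on a queue the way the expire thread waits for the previous batch, and shutdownNow() interrupts it:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownNowDemo {
    // Returns true if the worker blocked on take() was interrupted.
    static boolean interruptedByShutdownNow() throws Exception {
        BlockingQueue<String> pool = new ArrayBlockingQueue<>(1);
        ExecutorService expireMainExecutor = Executors.newSingleThreadExecutor();
        CountDownLatch interrupted = new CountDownLatch(1);

        expireMainExecutor.submit(() -> {
            try {
                pool.take(); // blocks, like Pool.pollEntry() waiting for a batch
            } catch (InterruptedException e) {
                // shutdownNow() lands here; Paimon wraps this as
                // "Interrupted while waiting for the previous batch to be consumed"
                interrupted.countDown();
            }
        });

        Thread.sleep(200);                // let the task reach take()
        expireMainExecutor.shutdownNow(); // interrupts the blocked worker
        return interrupted.await(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("interrupted = " + interruptedByShutdownNow());
    }
}
```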

What doesn't meet your expectations?

The Compact Procedure should skip the expire operation entirely; expiration should be left to the dedicated Expire Procedure.

Plan A: reset the table's snapshot.expire.execution-mode option in the Spark CompactProcedure (simple but blunt)

Drawback: synchronous expire is a single-threaded file deletion, which blocks the driver for a long time and prevents other tasks from being submitted.

Plan B: use expireMainExecutor.shutdown() instead of expireMainExecutor.shutdownNow() in TableCommitImpl

This lets in-flight expire work run to completion, but the change may affect other call sites, so it is not as good as Plan A.
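For contrast, a stdlib-only sketch of Plan B's behaviour (again, not Paimon code): shutdown() stops accepting new tasks but never interrupts the one already running, so the expire work would finish, assuming it is not blocked indefinitely:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class GracefulShutdownDemo {
    // Returns true if the in-flight task ran to completion.
    static boolean completesUnderShutdown() throws Exception {
        ExecutorService expireMainExecutor = Executors.newSingleThreadExecutor();
        AtomicBoolean finished = new AtomicBoolean(false);

        expireMainExecutor.submit(() -> {
            try {
                Thread.sleep(300);  // stands in for the expire work
                finished.set(true); // reached only if not interrupted
            } catch (InterruptedException ignored) {
            }
        });

        expireMainExecutor.shutdown(); // no interrupt, unlike shutdownNow()
        expireMainExecutor.awaitTermination(5, TimeUnit.SECONDS);
        return finished.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("finished = " + completesUnderShutdown());
    }
}
```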

Plan C: add an internally used IGNORE value to ExpireExecutionMode (recommended)

Whenever the Compact Procedure runs, it changes the table's snapshot.expire.execution-mode option to IGNORE. If TableCommitImpl sees IGNORE in the commit phase, it skips the expire operation.
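A rough sketch of Plan C. The names mirror Paimon's ExpireExecutionMode and the commit-phase check in TableCommitImpl, but the IGNORE value and the shouldExpire helper are hypothetical, proposed here rather than existing API:

```java
public class ExpireModeSketch {
    // Paimon's ExpireExecutionMode currently has SYNC and ASYNC;
    // IGNORE is the proposed internal-only value.
    enum ExpireExecutionMode { SYNC, ASYNC, IGNORE }

    // Stand-in for the commit-phase check in TableCommitImpl:
    // skip expiration entirely when the mode is IGNORE.
    static boolean shouldExpire(ExpireExecutionMode mode) {
        return mode != ExpireExecutionMode.IGNORE;
    }

    public static void main(String[] args) {
        System.out.println(shouldExpire(ExpireExecutionMode.IGNORE)); // false
        System.out.println(shouldExpire(ExpireExecutionMode.ASYNC));  // true
    }
}
```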


Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Labels: bug (Something isn't working)
