
Conversation

Collaborator

@zikangh zikangh commented Dec 8, 2025

🥞 Stacked PR

Use this link to review incremental changes.


Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

Add integration tests for kernel-spark DSv2 streaming and fix edge cases in offset handling.

Changes:

  1. Fixed offset capping in `getBatch()`: cap `endVersion` to the latest available snapshot version, to handle offsets that point to a not-yet-existent future version (which can occur when `buildOffsetFromIndexedFile` advances to the next version).

  2. Fixed starting-version caching: made `getStartingVersion()` synchronized and added caching to ensure idempotent behavior; `startingVersion: "latest"` must resolve to the same version across multiple calls.

  3. Created `DeltaSourceDSv2Suite`: runs a subset of the existing DSv1 streaming tests using DSv2 to verify API compatibility for `startingVersion` options.
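
The first two fixes can be sketched in a few lines. This is an illustrative stand-in, not the actual kernel-spark code; the class, field, and parameter names here are hypothetical.

```java
// Illustrative sketch of the two offset-handling fixes; names are hypothetical.
public class OffsetHandlingSketch {
  private Long cachedStartingVersion; // cached so "latest" resolves only once

  // Fix 1: cap the requested end version to the latest available snapshot,
  // since a stored offset may point one version past the current table state.
  long capEndVersion(long requestedEndVersion, long latestSnapshotVersion) {
    return Math.min(requestedEndVersion, latestSnapshotVersion);
  }

  // Fix 2: synchronized + cached, so repeated calls return the same version
  // even if new commits land on the table between calls.
  synchronized long getStartingVersion(long latestSnapshotVersion) {
    if (cachedStartingVersion == null) {
      cachedStartingVersion = latestSnapshotVersion; // "latest" resolved once
    }
    return cachedStartingVersion;
  }
}
```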

How was this patch tested?

End-to-end streaming tests.

Does this PR introduce any user-facing changes?

No.

@zikangh zikangh changed the title integration2 [kernel-spark] Enable a few E2E dsv2 streaming tests and fix offset management edge cases Dec 8, 2025
@zikangh zikangh marked this pull request as ready for review December 8, 2025 19:44
@zikangh zikangh force-pushed the stack/integration2 branch from 403325d to e513d9b Compare December 8, 2025 19:56
@zikangh zikangh force-pushed the stack/integration2 branch 3 times, most recently from 07b5d05 to dee64c4 Compare December 9, 2025 00:20
gengliangwang pushed a commit that referenced this pull request Dec 9, 2025
…for dsv2 streaming (#5499)

## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/5499/files) to
review incremental changes.
- [**stack/plan1**](#5499) [[Files
changed](https://github.com/delta-io/delta/pull/5499/files)]
- [stack/integration](#5572)
[[Files
changed](https://github.com/delta-io/delta/pull/5572/files/a0512bb563ff00a31461c2a188e11d59f19146e1..321432605fc1efe3253b30116de2d389f6f66977)]
- [stack/integration2](#5652)
[[Files
changed](https://github.com/delta-io/delta/pull/5652/files/321432605fc1efe3253b30116de2d389f6f66977..dee64c4ca3abbdd530c232e42325e702e9d61a0b)]
- [stack/reader](#5638) [[Files
changed](https://github.com/delta-io/delta/pull/5638/files/dee64c4ca3abbdd530c232e42325e702e9d61a0b..c8f25572585682e9d540bce0c4e982dc8dc1079c)]
- [stack/lazy](#5650) [[Files
changed](https://github.com/delta-io/delta/pull/5650/files/c8f25572585682e9d540bce0c4e982dc8dc1079c..0703f5750fa765d45c5b8c04b286d05f87e0ac6c)]
- [stack/snapshot](#5651) [[Files
changed](https://github.com/delta-io/delta/pull/5651/files/0703f5750fa765d45c5b8c04b286d05f87e0ac6c..cb6efb41f4cf62ec1501f6b7dce5e8d6e926eaf1)]

---------

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

In this PR, we implement the DSv2 methods `planInputPartitions()` and
`createReaderFactory()`. These are DSv2-only APIs that replace DSv1's
`getBatch()`:

- `planInputPartitions()`: returns physical partitions describing how to read
  the data; called once per micro-batch during planning.
- `createReaderFactory()`: returns a factory that creates readers for those
  partitions; each executor uses the factory to read its assigned partitions.
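
The planning/reading split can be illustrated with a self-contained sketch. The tiny local types below are stand-ins for Spark's `InputPartition` and `PartitionReaderFactory` interfaces, and the file names are made up:

```java
import java.util.ArrayList;
import java.util.List;

// Self-contained sketch of the DSv2 planning/reading split.
public class PlanningSketch {
  // Stand-in for InputPartition: a serializable description of one unit of
  // work (here, one data file) shipped from the driver to an executor.
  record FilePartition(String filePath) {}

  // Stand-in for PartitionReaderFactory: executors call this to turn a
  // partition description into an actual reader (here, a fake reader id).
  interface ReaderFactory {
    String createReader(FilePartition p);
  }

  // planInputPartitions(start, end): on the driver, enumerate the work
  // between the two offsets (versions here) and wrap each piece as a partition.
  static List<FilePartition> planInputPartitions(long startVersion, long endVersion) {
    List<FilePartition> parts = new ArrayList<>();
    for (long v = startVersion; v <= endVersion; v++) {
      parts.add(new FilePartition("part-" + v + ".parquet"));
    }
    return parts;
  }

  // createReaderFactory(): returns one factory; each executor invokes it
  // once per assigned partition.
  static ReaderFactory createReaderFactory() {
    return p -> "reader-for-" + p.filePath();
  }
}
```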


## How was this patch tested?


## Does this PR introduce _any_ user-facing changes?

@zikangh zikangh force-pushed the stack/integration2 branch 2 times, most recently from 53ee34c to e7853cf Compare December 10, 2025 03:02
TimothyW553 pushed a commit to TimothyW553/delta that referenced this pull request Dec 10, 2025
…for dsv2 streaming (delta-io#5499)
@zikangh zikangh requested a review from tdas December 10, 2025 23:48
@zikangh zikangh force-pushed the stack/integration2 branch 2 times, most recently from 6364515 to 86dac51 Compare December 12, 2025 18:46
huan233usc pushed a commit that referenced this pull request Dec 12, 2025
…r dsv2 streaming (#5572)

## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/5572/files) to
review incremental changes.
- [**stack/integration**](#5572)
[[Files changed](https://github.com/delta-io/delta/pull/5572/files)]
- [stack/integration2](#5652)
[[Files
changed](https://github.com/delta-io/delta/pull/5652/files/6bae6e4ad461ef8ec4d5849c6ab70190b5c8e3c4..86dac519632a5acb21624b24b205d28d5bfb8e40)]
- [stack/reader](#5638) [[Files
changed](https://github.com/delta-io/delta/pull/5638/files/86dac519632a5acb21624b24b205d28d5bfb8e40..236b9425cddde4bf1a6155ff69666f3957b9bed2)]
- [stack/lazy](#5650) [[Files
changed](https://github.com/delta-io/delta/pull/5650/files/236b9425cddde4bf1a6155ff69666f3957b9bed2..08f8fe0046d45ebd955d78fa4c6ea8042248d46d)]
- [stack/lazy2](#5686) [[Files
changed](https://github.com/delta-io/delta/pull/5686/files/08f8fe0046d45ebd955d78fa4c6ea8042248d46d..45cb03377364de0d170969cff445bb86068e8557)]
- [stack/snapshot](#5651) [[Files
changed](https://github.com/delta-io/delta/pull/5651/files/45cb03377364de0d170969cff445bb86068e8557..bf0efa8312401fcee2e5fcc5301cf14df264a5c9)]

---------

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description


Implement `deserializeOffset`, `commit`, and `stop` methods in
`SparkMicroBatchStream` to complete the Spark DSv2 streaming API
contract.

1. **`deserializeOffset(json)`** - Called when resuming from a checkpoint.
2. **`commit(end)`** - Called after each micro-batch completes successfully.
3. **`stop()`** - Called during query shutdown.
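
To make the lifecycle concrete, here is a self-contained sketch. `SimpleOffset` and its JSON shape are illustrative stand-ins, not the connector's actual offset class:

```java
// Minimal sketch of the streaming offset lifecycle; names are hypothetical.
public class OffsetLifecycleSketch {
  record SimpleOffset(long version, long index) {
    // What Spark persists to the checkpoint's offset log.
    String json() {
      return "{\"version\":" + version + ",\"index\":" + index + "}";
    }
    // deserializeOffset: invoked when a query restarts from a checkpoint,
    // turning the persisted JSON back into a typed offset.
    static SimpleOffset fromJson(String json) {
      long v = Long.parseLong(json.replaceAll(".*\"version\":(\\d+).*", "$1"));
      long i = Long.parseLong(json.replaceAll(".*\"index\":(\\d+).*", "$1"));
      return new SimpleOffset(v, i);
    }
  }

  // commit(end): called after a micro-batch completes; a source may use it
  // to clean up state at or below the committed offset (no-op here).
  static void commit(SimpleOffset end) {}

  // stop(): called on query shutdown to release resources (no-op here).
  static void stop() {}
}
```

The round-trip property is the important part: `fromJson(o.json())` must reconstruct an offset equal to `o`, otherwise checkpoint recovery resumes from the wrong position.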


## How was this patch tested?

Unit tests


## Does this PR introduce _any_ user-facing changes?

No.

}
}

test("new commits arrive after stream initialization") {
Contributor:

why do you need a new test? did the existing tests that have been around for years not cover this particular case?

Collaborator Author:

You are right! This is mostly similar to "basic", so I removed it. The other test, "new commits arrive after stream initialization - with explicit startingVersion", is not redundant and provides coverage for:

  1. the startingVersion being a numeric value
  2. multi-batch reads
  3. commits arriving after stream initialization

}
}

test("new commits arrive after stream initialization") {
Contributor @tdas (Dec 13, 2025):

What's the difference between this test and the next one? Initialization with the latest version vs. an explicit startingVersion? If so, please make it clear in the name.

Collaborator Author:

Yes, default behavior (reads an initial snapshot) vs an explicit startingVersion. I renamed one of the test cases to disambiguate.

/**
* Test suite that runs DeltaSourceSuite using the V2 connector (V2_ENABLE_MODE=STRICT).
*/
class DeltaSourceDSv2Suite extends DeltaSourceSuite with V2ForceTest {
Contributor:

@huan233usc how long are we going to continue this pattern of overriding individual tests like this?

if we want to add such tests in the short term, what is the long term plan of these v2 only tests when we will be running everything in v2 mode?

Collaborator:

For streaming development, I think we will have this pattern for the DeltaSource*Suite tests.

Meanwhile, I was thinking about replacing this allowlisting model with a denylisting model (setting up a GitHub workflow with "auto" v2 selection). The resource cost might be a concern.

@@ -0,0 +1,56 @@
/*
Contributor:

Can we put all the DSv2-specific tests in a separate directory so that we know which ones are DSv2-only tests?

Collaborator:

+1 to a `spark-unified/tests/v2` folder, for example.

Collaborator Author:

Moved it to spark-unified/src/test/ so it sits next to V2ForceTest.scala. Not sure if I should create the v2/ subfolder yet; perhaps @huan233usc can decide once the tests are consolidated.

* Override disableLogCleanup to use DeltaLog API instead of SQL ALTER TABLE.
* Path-based ALTER TABLE doesn't work properly with V2_ENABLE_MODE=STRICT.
*/
override protected def disableLogCleanup(tablePath: String): Unit = {
Contributor:

@huan233usc how are we tracking these gaps ?

Collaborator:

Is this the general lack of ALTER TABLE DDL support? We could track it via a GitHub issue with a tag describing it as a v2 connector FR.

Collaborator Author:

Created #5731 and added a TODO.
@huan233usc -- could you help add details?

"new commits arrive after stream initialization with startingVersion"
)

override protected def shouldSkipTest(testName: String): Boolean = {
Contributor:

@huan233usc can we change this from "should skip" to "expected to fail"? Basically, if the status of some tests changes for some reason from fail --> pass, but it's fully skipped, how will we catch those changes?

Collaborator:

Good point, "expected to fail" is a better pattern than "should skip".

Collaborator Author:

Done.

Contributor @tdas left a review:

lots of comments about test organization

"startingVersion latest defined before started",
"startingVersion latest works on defined but empty table",
"new commits arrive after stream initialization with startingVersion"
)
Contributor:

I think we should start by making a list of the tests that should be skipped, so that we know exactly what is broken. It's hard to understand otherwise how much is broken and needs more work.

Collaborator Author:

Done. I also made a list of shouldPass tests and added validation (a test case must be in either shouldPass or shouldFail).
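
The validation described here could be sketched as follows. The real suite is Scala; this is an illustrative Java version with hypothetical names:

```java
import java.util.Set;

public class TestListValidation {
  // Every test must be classified as exactly one of shouldPass / shouldFail;
  // an unclassified (or double-classified) test fails the validation, so a
  // test that silently flips from fail to pass gets noticed.
  static boolean validate(Set<String> allTests, Set<String> shouldPass, Set<String> shouldFail) {
    for (String t : allTests) {
      boolean pass = shouldPass.contains(t);
      boolean fail = shouldFail.contains(t);
      if (pass == fail) return false; // in neither list, or in both
    }
    return true;
  }
}
```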


@Test
public void testStreamingRead(@TempDir File deltaTablePath) {
public void testStreamingReadMultipleVersions(@TempDir File deltaTablePath) throws Exception {
Contributor:

@huan233usc what is the scope of the Dsv2BasicTest class? Are all operations supposed to be added there? I want some structure and scope for each test suite, so that we don't end up with randomness in which test lives where.

Why not make a separate test suite for streaming?

Collaborator:

Yes, I think we should separate out a test suite for streaming, and probably for writes and DML in the future.

However, for the tests under kernel-spark/src/test/java/io/delta/kernel/spark/*, I was thinking about consolidating them with the existing v1 tests (which can now run with both the v1 and v2 connectors) in the end.

Dsv2BasicTest was added before we made the module change that enables running the v1 and v2 connectors with DeltaCatalog. Its setup writes with the v1 connector but reads with the v2 connector (via a special test catalog impl, io.delta.kernel.spark.catalog.TestCatalog), so we could cover some simple e2e scenarios from the very beginning.

Collaborator Author:

+1, we should consolidate eventually. Currently DSv2BasicTest uses a custom TestCatalog, independent of DeltaCatalog's V2_ENABLE_MODE routing; IMO it does provide value right now, as we can test DSv2 without having to worry about integration/routing bugs.

@zikangh zikangh force-pushed the stack/integration2 branch 2 times, most recently from 7674172 to 428116e Compare December 16, 2025 00:11
@zikangh zikangh requested a review from tdas December 18, 2025 10:17