Conversation

@vitaliili-db
Contributor

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

This PR improves the Delta V2 streaming POC by:

  • Adding AUTO as a valid spark.databricks.delta.v2.enableMode value, keeping the production default as NONE while enabling AUTO by default in SQL tests.
  • Introducing ApplyV2Streaming (extracted into its own rule) to rewrite the V1 StreamingRelation into a V2 StreamingRelationV2 backed by a Kernel SparkTable when enabled; see the sketch after this list.
  • Making DeltaDataSource.sourceSchema avoid redundant schema loading when the schema originated from the V2 streaming path, using a dedicated marker option.
  • Adding focused tests for the V2 streaming rewrite + schema marker behavior, including a test session catalog that injects UC markers.
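
A minimal sketch of the extracted rule's shape, assuming hypothetical helpers; the real PR builds a Kernel-backed SparkTable and wraps it in StreamingRelationV2, which the rewriteToV2 stub elides here:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.streaming.StreamingRelation

object ApplyV2Streaming extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
    case s: StreamingRelation if shouldApplyV2Streaming(s) =>
      // Replace the V1 node with a StreamingRelationV2 backed by SparkTable.
      rewriteToV2(s)
  }

  // Predicate discussed in the first review thread below (reconstructed there).
  private def shouldApplyV2Streaming(s: StreamingRelation): Boolean = ???

  // Hypothetical helper: builds the Kernel SparkTable and the V2 relation.
  private def rewriteToV2(s: StreamingRelation): LogicalPlan = ???
}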

How was this patch tested?

  • Added/updated tests:
    • ./build/sbt "spark/testOnly org.apache.spark.sql.delta.V2StreamingConversionSuite"
  • The AUTO end-to-end streaming test currently asserts the expected Kernel limitation:
    • initialOffset with initial snapshot is not supported yet (a TODO is left in the test to flip it to a passing test once supported).

Does this PR introduce any user-facing changes?

No.

case Some(catalogTable) =>
  DeltaSourceUtils.isDeltaDataSourceName(s.sourceName) ||
    catalogTable.provider.exists(DeltaSourceUtils.isDeltaDataSourceName) ||
    CatalogTableUtils.isUnityCatalogManagedTable(catalogTable)
Collaborator

Shall we skip checking CatalogTableUtils.isUnityCatalogManagedTable(catalogTable) here?

Contributor Author

Isn't the scope of the change to handle only UC-owned tables?
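
For context in this thread, a hedged reconstruction of the full predicate around the excerpt above; the None fallback is an assumption not shown in the diff:

private def shouldApplyV2Streaming(s: StreamingRelation): Boolean = {
  s.dataSource.catalogTable match {
    case Some(catalogTable) =>
      // Delta by source name, Delta by table provider, or a UC-managed table.
      DeltaSourceUtils.isDeltaDataSourceName(s.sourceName) ||
        catalogTable.provider.exists(DeltaSourceUtils.isDeltaDataSourceName) ||
        CatalogTableUtils.isUnityCatalogManagedTable(catalogTable)
    case None => false // no catalog table: leave the V1 relation alone
  }
}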

// scalastyle:off caselocale
mode.toUpperCase match {
  // scalastyle:on caselocale
  case "STRICT" =>
Collaborator

I think we can just return the V2 table from DeltaCatalog in STRICT mode; the new analyzer rule only needs to run in AUTO mode.
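
As a side note on the three modes this PR names (NONE, AUTO, STRICT), a hedged sketch of how the config string might be parsed; the enum and error handling here are assumptions, not the PR's code:

sealed trait V2EnableMode
object V2EnableMode {
  case object NONE extends V2EnableMode
  case object AUTO extends V2EnableMode
  case object STRICT extends V2EnableMode

  def fromString(mode: String): V2EnableMode = {
    // scalastyle:off caselocale
    val normalized = mode.toUpperCase
    // scalastyle:on caselocale
    normalized match {
      case "NONE" => NONE
      case "AUTO" => AUTO
      case "STRICT" => STRICT
      case other =>
        throw new IllegalArgumentException(s"Unknown v2 enable mode: $other")
    }
  }
}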

case s: StreamingRelation if shouldApplyV2Streaming(s) =>
  // catalogTable is guaranteed to be defined because shouldApplyV2Streaming
  // checks it via isDeltaStreamingRelation, but we use pattern matching for safety
  s.dataSource.catalogTable match {
Collaborator

If shouldApplyV2Streaming returns true, s.dataSource.catalogTable should be defined.
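
Both spellings of that invariant, as a minimal sketch; rewriteToV2 is a hypothetical helper taking the resolved table:

// Defensive style used in the PR: fall back to the V1 relation on None.
def rewriteGuarded(s: StreamingRelation): LogicalPlan =
  s.dataSource.catalogTable match {
    case Some(catalogTable) => rewriteToV2(s, catalogTable)
    case None => s // unreachable if the guard held, but harmless
  }

// The reviewer's observation: the guard already guarantees Some, so .get
// cannot throw here, making this equally safe if less defensive.
def rewriteUnguarded(s: StreamingRelation): LogicalPlan =
  rewriteToV2(s, s.dataSource.catalogTable.get)

def rewriteToV2(s: StreamingRelation, t: CatalogTable): LogicalPlan = ???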

ScalaUtils.toJavaMap(catalogTable.properties))

// Add a marker to indicate this schema comes from V2 streaming (SparkTable)
// This allows DeltaDataSource.sourceSchema to distinguish between:
Collaborator

DeltaDataSource.sourceSchema is called before this rule runs.
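
For concreteness, a hedged sketch of the marker short-circuit the description refers to; the option key and method shape are assumptions, and the ordering concern raised here applies regardless:

import org.apache.spark.sql.types.StructType

// Hypothetical marker key standing in for the PR's dedicated option.
val V2StreamingSchemaMarker = "__v2StreamingSchemaOrigin"

// Sketch: when the marker is present, reuse the schema handed down from the
// V2 path (SparkTable) instead of loading it again from a Delta snapshot.
def resolveSourceSchema(
    providedSchema: StructType,
    parameters: Map[String, String]): StructType = {
  if (parameters.get(V2StreamingSchemaMarker).contains("true")) {
    providedSchema // schema originated from the V2 streaming rewrite
  } else {
    loadSchemaFromSnapshot() // hypothetical: the existing, heavier path
  }
}

def loadSchemaFromSnapshot(): StructType = ???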

* UC-managed markers through Spark's normal catalog resolution paths, without manually patching
* CatalogTable objects in tests.
*/
class UCTableInjectingSessionCatalogInner extends DelegatingCatalogExtension {
Collaborator

Why extend DelegatingCatalogExtension? Customizing the loadTable method of UCTableInjectingSessionCatalog seems sufficient.
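
A hedged sketch of that suggestion: override loadTable alone and wrap the delegate's table so its properties carry the UC marker. The property key is hypothetical, and only the abstract Table methods are forwarded here:

import java.util
import org.apache.spark.sql.connector.catalog.{DelegatingCatalogExtension, Identifier, Table, TableCapability}
import org.apache.spark.sql.types.StructType

class UCTableInjectingSessionCatalog extends DelegatingCatalogExtension {
  override def loadTable(ident: Identifier): Table = {
    val delegate = super.loadTable(ident)
    // Wrap rather than copy: forward everything, add the marker property.
    new Table {
      override def name(): String = delegate.name()
      override def schema(): StructType = delegate.schema()
      override def capabilities(): util.Set[TableCapability] = delegate.capabilities()
      override def properties(): util.Map[String, String] = {
        val props = new util.HashMap[String, String](delegate.properties())
        props.put("isUnityCatalogManaged", "true") // hypothetical marker key
        props
      }
    }
  }
}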
