@@ -56,6 +56,13 @@ public final class CatalogTableUtils {

private CatalogTableUtils() {}

/**
* Matches delta-spark's test-mode signal (see org.apache.spark.sql.delta.util.Utils.isTesting).
*/
private static boolean isTesting() {
return System.getenv("DELTA_TESTING") != null;
}

/**
* Checks whether any catalog manages this table via CCv2 semantics.
*
@@ -65,6 +72,12 @@ private CatalogTableUtils() {}
public static boolean isCatalogManaged(CatalogTable table) {
requireNonNull(table, "table is null");
Map<String, String> storageProperties = getStorageProperties(table);
// Test-only escape hatch used by delta-spark suites to simulate Unity Catalog semantics
// without requiring a real commit coordinator / CCv2 table feature wiring.
// This should never be set in production catalogs.
if (isTesting() && storageProperties.containsKey("test.simulateUC")) {
return true;
}
return isCatalogManagedFeatureEnabled(storageProperties, FEATURE_CATALOG_MANAGED)
|| isCatalogManagedFeatureEnabled(storageProperties, FEATURE_CATALOG_OWNED_PREVIEW);
}
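
For context, a minimal Scala sketch of how a suite might exercise this escape hatch. This is illustrative only: `buildCatalogTableWithStorageProps` is a hypothetical helper, and the JVM is assumed to run with DELTA_TESTING set (as delta-spark's test harness does).

import org.apache.spark.sql.catalyst.catalog.CatalogTable
import io.delta.kernel.spark.utils.CatalogTableUtils

// Hypothetical helper standing in for however a suite builds a CatalogTable
// with the given storage properties.
val table: CatalogTable =
  buildCatalogTableWithStorageProps(Map("test.simulateUC" -> "true"))

// The escape hatch fires on key presence alone (any value), but only when
// isTesting() is true; production catalogs never set this property.
assert(CatalogTableUtils.isCatalogManaged(table))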
@@ -26,6 +26,8 @@ object DeltaSQLConfV2 extends DeltaSQLConfUtils {
*
* Valid values:
* - NONE: V2 connector is disabled, always use V1 connector (DeltaTableV2) - default
* - AUTO: Automatically use V2 connector (SparkTable) for Unity Catalog managed tables
* in streaming queries and V1 connector (DeltaTableV2) for all other tables
* - STRICT: V2 connector is strictly enforced, always use V2 connector (Kernel SparkTable).
* Intended for testing V2 connector capabilities
*
@@ -39,9 +41,10 @@ object DeltaSQLConfV2 extends DeltaSQLConfUtils {
buildConf("v2.enableMode")
.doc(
"Controls the Delta V2 connector enable mode. " +
"Valid values: NONE (disabled, default), STRICT (should ONLY be enabled for testing).")
"Valid values: NONE (disabled, default), AUTO (use V2 for Unity Catalog managed tables), " +
"STRICT (should ONLY be enabled for testing).")
.stringConf
.checkValues(Set("NONE", "STRICT"))
.checkValues(Set("AUTO", "NONE", "STRICT"))
.createWithDefault("NONE")
}
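
For reference, a minimal sketch of toggling the mode at session level (assuming an active `spark` session); values outside the set above are rejected by `checkValues`.

// Enable AUTO mode: streaming reads of Unity Catalog managed tables take the
// V2 (SparkTable) path; all other tables stay on V1 (DeltaTableV2).
spark.conf.set("spark.databricks.delta.v2.enableMode", "AUTO")

// Revert to the default (V2 connector disabled everywhere).
spark.conf.set("spark.databricks.delta.v2.enableMode", "NONE")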

135 changes: 135 additions & 0 deletions spark-unified/src/main/scala/io/delta/sql/ApplyV2Streaming.scala
@@ -0,0 +1,135 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.sql

import scala.jdk.CollectionConverters._

import io.delta.kernel.spark.catalog.SparkTable
import io.delta.kernel.spark.utils.{CatalogTableUtils, ScalaUtils}
import org.apache.spark.sql.delta.sources.{DeltaSQLConfV2, DeltaSourceUtils}
import org.apache.spark.sql.delta.sources.DeltaV2StreamingUtils

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.execution.streaming.StreamingRelation
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* Rule for applying the V2 streaming path by rewriting V1 StreamingRelation
* with Delta DataSource to StreamingRelationV2 with SparkTable.
*
* This rule handles the case where Spark's FindDataSourceTable rule has converted
* a StreamingRelationV2 (with DeltaTableV2) back to a StreamingRelation because
* DeltaTableV2 doesn't advertise STREAMING_READ capability. We convert it back to
* StreamingRelationV2 with SparkTable (from kernel-spark) which does support streaming.
*
* Behavior based on spark.databricks.delta.v2.enableMode (default: NONE):
* - AUTO: Only applies to Unity Catalog managed tables
* - STRICT: Applies to all Delta tables (for testing V2 streaming)
* - NONE: Rule is disabled, no conversion happens
*
* @param session The Spark session for configuration access
*/
class ApplyV2Streaming(
@transient private val session: SparkSession)
extends Rule[LogicalPlan] {

private def isDeltaStreamingRelation(s: StreamingRelation): Boolean = {
// Check if this is a Delta streaming relation by examining:
// 1. The source name (e.g., "delta" from .format("delta"))
// 2. The catalog table's provider (e.g., "DELTA" from Unity Catalog)
// 3. Whether the table is a Unity Catalog managed table
s.dataSource.catalogTable match {
case Some(catalogTable) =>
DeltaSourceUtils.isDeltaDataSourceName(s.sourceName) ||
catalogTable.provider.exists(DeltaSourceUtils.isDeltaDataSourceName) ||
CatalogTableUtils.isUnityCatalogManagedTable(catalogTable)
Collaborator: shall we skip checking CatalogTableUtils.isUnityCatalogManagedTable(catalogTable) here?

Contributor Author: the scope of the change is to handle only uc owned tables?

case None => false
}
}

private def shouldApplyV2Streaming(s: StreamingRelation): Boolean = {
if (!isDeltaStreamingRelation(s)) {
return false
}

val mode = session.conf.get(
DeltaSQLConfV2.V2_ENABLE_MODE.key,
DeltaSQLConfV2.V2_ENABLE_MODE.defaultValueString)

// scalastyle:off caselocale
mode.toUpperCase match {
// scalastyle:on caselocale
case "STRICT" =>
Collaborator: I think we can just return V2 table in DeltaCatalog in STRICT mode. The new analyzer rule only happens on AUTO mode

// Always apply V2 streaming for all Delta tables
true
case "AUTO" =>
// Only apply for Unity Catalog managed tables
// catalogTable is guaranteed to be Some because isDeltaStreamingRelation checked it
s.dataSource.catalogTable.exists(CatalogTableUtils.isUnityCatalogManagedTable)
case "NONE" | _ =>
// V2 streaming disabled
false
}
}

override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
case s: StreamingRelation if shouldApplyV2Streaming(s) =>
// catalogTable is guaranteed to be defined because shouldApplyV2Streaming checks it
// via isDeltaStreamingRelation, but we use pattern matching for safety
s.dataSource.catalogTable match {
Collaborator: if shouldApplyV2Streaming returns yes, s.dataSource.catalogTable should be defined

case Some(catalogTable) =>
val ident =
Identifier.of(catalogTable.identifier.database.toArray, catalogTable.identifier.table)
val table =
new SparkTable(
ident,
catalogTable,
ScalaUtils.toJavaMap(catalogTable.properties))

// Add a marker to indicate this schema comes from V2 streaming (SparkTable)
// This allows DeltaDataSource.sourceSchema to distinguish between:
Collaborator: DeltaDataSource.sourceSchema is called before this rule

// 1. Schema from SparkTable (validated by Kernel) - can be used directly
// 2. User-provided schema - must go through DeltaLog validation
val optionsWithMarker = new java.util.HashMap[String, String](
s.dataSource.options.size + 1)
s.dataSource.options.asJava.forEach((k, v) => optionsWithMarker.put(k, v))
optionsWithMarker.put(
DeltaV2StreamingUtils.V2_STREAMING_SCHEMA_SOURCE_KEY,
DeltaV2StreamingUtils.V2_STREAMING_SCHEMA_SOURCE_SPARK_TABLE)

StreamingRelationV2(
source = None,
sourceName = "delta",
table = table,
extraOptions = new CaseInsensitiveStringMap(optionsWithMarker),
output = toAttributes(table.schema),
catalog = None,
identifier = Some(ident),
v1Relation = Some(s))

case None =>
// This should never happen due to shouldApplyV2Streaming check, but be defensive
s
}
}
}
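
To make the rewrite concrete, a hedged sketch of what the rule does to the analyzed plan in AUTO mode (the table name is illustrative):

// Illustrative: with AUTO mode on and a Unity Catalog managed table,
// FindDataSourceTable first downgrades the read to a V1 StreamingRelation
// (DeltaTableV2 lacks STREAMING_READ); this rule then upgrades it again.
spark.conf.set("spark.databricks.delta.v2.enableMode", "AUTO")
val stream = spark.readStream.table("main.default.events") // hypothetical name

// Plan transition performed by ApplyV2Streaming:
//   StreamingRelation(DataSource("delta"), catalogTable = Some(events))
//     ==> StreamingRelationV2(sourceName = "delta",
//           table = SparkTable(events),
//           extraOptions + (__v2StreamingSchemaSource -> "SparkTable"),
//           v1Relation = Some(<original StreamingRelation>))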

@@ -16,6 +16,7 @@

package io.delta.sql

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

@@ -69,6 +70,20 @@ import org.apache.spark.sql.catalyst.rules.Rule
*/
class DeltaSparkSessionExtension extends AbstractDeltaSparkSessionExtension {

override def apply(extensions: SparkSessionExtensions): Unit = {
// First register all the base Delta rules from the V1 implementation.
super.apply(extensions)

// Register a post-hoc resolution rule that rewrites V1 StreamingRelation plans that
// read Delta tables into V2 StreamingRelationV2 plans backed by SparkTable.
//
// NOTE: This rule is functional (not a placeholder). Binary compatibility concerns are
// handled separately via the nested NoOpRule class below (kept for MiMa).
extensions.injectResolutionRule { session =>
new ApplyV2Streaming(session)
}
}
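
For completeness, the injected rule only runs when the extension is installed; the standard Delta session wiring (unchanged by this PR) is a sketch like:

import org.apache.spark.sql.SparkSession

// Standard Delta extension wiring; once installed, the injected resolution
// rule participates in analysis of every query in the session.
val spark = SparkSession.builder()
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config(
    "spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()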

/**
* NoOpRule for binary compatibility with Delta 3.3.0
* This class must remain here to satisfy MiMa checks
@@ -16,6 +16,8 @@

package org.apache.spark.sql.delta.sources

import java.util.Locale

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.{Failure, Success, Try}
@@ -111,6 +113,14 @@ class DeltaDataSource
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): (String, StructType) = {
// Only bypass schema loading for catalog-managed tables in V2 streaming mode.
// When V2_ENABLE_MODE is AUTO or STRICT, the schema comes from SparkTable (Kernel)
// which has already validated the table. For NONE mode, we must go through the
// normal DeltaLog schema loading path to ensure proper validation.
if (schema.isDefined &&
shouldUseProvidedSchemaForStreaming(sqlContext.sparkSession, parameters)) {
return (shortName(), schema.get)
}
val path = parameters.getOrElse("path", {
throw DeltaErrors.pathNotSpecifiedException
})
@@ -159,6 +169,55 @@ class DeltaDataSource
}
}

/**
* Determines whether to use the provided schema for streaming queries.
*
* NOTE: This method is only called from `sourceSchema`, which is part of the
* StreamSourceProvider interface, so we are guaranteed to be in a streaming context.
*
* The check determines which streaming path we're in:
* - STRICT mode: DeltaCatalog always returns SparkTable for catalog tables. If we have a schema
* and catalogTableOpt is defined, it came from SparkTable/Kernel via DeltaCatalog, so trust it.
* - AUTO mode: Schema comes from SparkTable (Kernel) via ApplyV2Streaming rule for Unity Catalog
* tables. Check marker (in parameters) to ensure it's not user-provided.
* - NONE mode: V1 streaming path, must load schema via DeltaLog for validation.
*
* @param spark The active Spark session, used to read the V2 enable mode conf
* @param parameters The streaming options, which may contain a marker set by ApplyV2Streaming
* @return true if we should use the provided schema (V2 path with SparkTable), false if we
* should load it from DeltaLog (V1 path or user-provided schema)
*/
private def shouldUseProvidedSchemaForStreaming(
spark: SparkSession,
parameters: Map[String, String]): Boolean = {
// NOTE: DeltaSQLConfV2 lives in kernel-spark, which sparkV1 cannot depend on. Use the shared
// key constant from DeltaSQLConfUtils instead.
val mode = spark.conf.get(DeltaSQLConf.V2_ENABLE_MODE_KEY, "NONE")

// Check if this schema came from SparkTable (V2 streaming) via ApplyV2Streaming
val isV2StreamingSchema =
parameters
.get(DeltaV2StreamingUtils.V2_STREAMING_SCHEMA_SOURCE_KEY)
.contains(DeltaV2StreamingUtils.V2_STREAMING_SCHEMA_SOURCE_SPARK_TABLE)

mode.toUpperCase(Locale.ROOT) match {
case "NONE" =>
// V1 streaming: must load schema via DeltaLog
false
case "STRICT" =>
// In STRICT mode, DeltaCatalog always returns SparkTable for catalog tables
// If we have a schema and a catalog table, it came from SparkTable, so trust it
// For path-based tables, be conservative and reload
catalogTableOpt.isDefined
case "AUTO" =>
// V2 streaming for UC tables: only use provided schema if marker is set
// This distinguishes SparkTable schema from user-provided schema
isV2StreamingSchema
case _ =>
// Unknown mode: be conservative, use DeltaLog
false
}
}
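
The mode handling above collapses to a small decision table (illustrative; "marker" means the __v2StreamingSchemaSource=SparkTable option is present, and a schema was provided in all rows):

// Expected outcomes of shouldUseProvidedSchemaForStreaming, per the match above:
//
//   mode     catalogTableOpt    marker     result
//   NONE     any                any        false  (reload via DeltaLog)
//   STRICT   Some(_)            any        true   (schema came from SparkTable)
//   STRICT   None (path read)   any        false  (be conservative, reload)
//   AUTO     any                present    true   (set by ApplyV2Streaming)
//   AUTO     any                absent     false  (user schema, reload)
//   other    any                any        false  (unknown mode, reload)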

override def createSource(
sqlContext: SQLContext,
metadataPath: String,
@@ -39,6 +39,16 @@ trait DeltaSQLConfUtils {
def buildConf(key: String): ConfigBuilder = SQLConf.buildConf(s"$SQL_CONF_PREFIX.$key")
def buildStaticConf(key: String): ConfigBuilder =
SQLConf.buildStaticConf(s"spark.databricks.delta.$key")

/**
* Canonical SQLConf key for Delta V2 enable mode.
*
* This constant is shared across spark and kernel modules to avoid repeating the literal
* "spark.databricks.delta.v2.enableMode" in multiple places.
*
* NOTE: The ConfigEntry itself is defined in kernel-spark (DeltaSQLConfV2).
*/
final val V2_ENABLE_MODE_KEY: String = s"$SQL_CONF_PREFIX.v2.enableMode"
}

/**
@@ -0,0 +1,32 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.sources

/**
* Utilities/constants for Delta's V2 streaming path integration.
*
* This is defined in sparkV1 so it can be referenced from both:
* - spark (DeltaDataSource / V1 streaming APIs)
* - spark-unified (ApplyV2Streaming rule)
*/
object DeltaV2StreamingUtils {
/** Marker option key injected by ApplyV2Streaming into streaming options. */
final val V2_STREAMING_SCHEMA_SOURCE_KEY: String = "__v2StreamingSchemaSource"

/** Marker option value indicating the schema originated from kernel SparkTable. */
final val V2_STREAMING_SCHEMA_SOURCE_SPARK_TABLE: String = "SparkTable"
}
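
A tiny sanity sketch of the marker handshake the two modules rely on, using only the constants above (assumed in scope via the org.apache.spark.sql.delta.sources package):

// Writer side (ApplyV2Streaming) injects the marker into the streaming
// options; reader side (DeltaDataSource.sourceSchema) checks for it.
val options = Map(
  DeltaV2StreamingUtils.V2_STREAMING_SCHEMA_SOURCE_KEY ->
    DeltaV2StreamingUtils.V2_STREAMING_SCHEMA_SOURCE_SPARK_TABLE)

val schemaFromSparkTable = options
  .get(DeltaV2StreamingUtils.V2_STREAMING_SCHEMA_SOURCE_KEY)
  .contains(DeltaV2StreamingUtils.V2_STREAMING_SCHEMA_SOURCE_SPARK_TABLE)

assert(schemaFromSparkTable) // the V2 path uses the provided schema directly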