diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/utils/CatalogTableUtils.java b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/CatalogTableUtils.java index 9c56003476c..9b6f774e0b4 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/utils/CatalogTableUtils.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/CatalogTableUtils.java @@ -56,6 +56,13 @@ public final class CatalogTableUtils { private CatalogTableUtils() {} + /** + * Matches delta-spark's test-mode signal (see org.apache.spark.sql.delta.util.Utils.isTesting). + */ + private static boolean isTesting() { + return System.getenv("DELTA_TESTING") != null; + } + /** * Checks whether any catalog manages this table via CCv2 semantics. * @@ -65,6 +72,12 @@ private CatalogTableUtils() {} public static boolean isCatalogManaged(CatalogTable table) { requireNonNull(table, "table is null"); Map storageProperties = getStorageProperties(table); + // Test-only escape hatch used by delta-spark suites to simulate Unity Catalog semantics + // without requiring a real commit coordinator / CCv2 table feature wiring. + // This should never be set in production catalogs. + if (isTesting() && storageProperties.containsKey("test.simulateUC")) { + return true; + } return isCatalogManagedFeatureEnabled(storageProperties, FEATURE_CATALOG_MANAGED) || isCatalogManagedFeatureEnabled(storageProperties, FEATURE_CATALOG_OWNED_PREVIEW); } diff --git a/kernel-spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConfV2.scala b/kernel-spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConfV2.scala index f42f1a44048..7e947f4c417 100644 --- a/kernel-spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConfV2.scala +++ b/kernel-spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConfV2.scala @@ -26,6 +26,8 @@ object DeltaSQLConfV2 extends DeltaSQLConfUtils { * * Valid values: * - NONE: V2 connector is disabled, always use V1 connector (DeltaTableV2) - default + * - AUTO: Automatically use V2 connector (SparkTable) for Unity Catalog managed tables + * in streaming queries and V1 connector (DeltaTableV2) for all other tables * - STRICT: V2 connector is strictly enforced, always use V2 connector (Kernel SparkTable). * Intended for testing V2 connector capabilities * @@ -39,9 +41,10 @@ object DeltaSQLConfV2 extends DeltaSQLConfUtils { buildConf("v2.enableMode") .doc( "Controls the Delta V2 connector enable mode. " + - "Valid values: NONE (disabled, default), STRICT (should ONLY be enabled for testing).") + "Valid values: NONE (disabled, default), AUTO (use V2 for Unity Catalog managed tables), " + + "STRICT (should ONLY be enabled for testing).") .stringConf - .checkValues(Set("NONE", "STRICT")) + .checkValues(Set("AUTO", "NONE", "STRICT")) .createWithDefault("NONE") } diff --git a/spark-unified/src/main/scala/io/delta/sql/ApplyV2Streaming.scala b/spark-unified/src/main/scala/io/delta/sql/ApplyV2Streaming.scala new file mode 100644 index 00000000000..db470c0d941 --- /dev/null +++ b/spark-unified/src/main/scala/io/delta/sql/ApplyV2Streaming.scala @@ -0,0 +1,135 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.sql + +import scala.jdk.CollectionConverters._ + +import io.delta.kernel.spark.catalog.SparkTable +import io.delta.kernel.spark.utils.{CatalogTableUtils, ScalaUtils} +import org.apache.spark.sql.delta.sources.{DeltaSQLConfV2, DeltaSourceUtils} +import org.apache.spark.sql.delta.sources.DeltaV2StreamingUtils + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 +import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.execution.streaming.StreamingRelation +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +/** + * Rule that applies the V2 streaming path by rewriting a V1 StreamingRelation + * that uses the Delta DataSource into a StreamingRelationV2 backed by SparkTable. + * + * This rule handles the case where Spark's FindDataSourceTable rule has converted + * a StreamingRelationV2 (with DeltaTableV2) back to a StreamingRelation because + * DeltaTableV2 doesn't advertise STREAMING_READ capability. We convert it back to + * StreamingRelationV2 with SparkTable (from kernel-spark), which does support streaming. + * + * Behavior based on spark.databricks.delta.v2.enableMode: + * - AUTO: Only applies to Unity Catalog managed tables + * - STRICT: Applies to all Delta tables (for testing V2 streaming) + * - NONE (default): Rule is disabled, no conversion happens + * + * @param session The Spark session for configuration access + */ +class ApplyV2Streaming( + @transient private val session: SparkSession) + extends Rule[LogicalPlan] { + + private def isDeltaStreamingRelation(s: StreamingRelation): Boolean = { + // Check if this is a Delta streaming relation by examining: + // 1. The source name (e.g., "delta" from .format("delta")) + // 2. The catalog table's provider (e.g., "DELTA" from Unity Catalog) + // 3. 
Whether the table is a Unity Catalog managed table + s.dataSource.catalogTable match { + case Some(catalogTable) => + DeltaSourceUtils.isDeltaDataSourceName(s.sourceName) || + catalogTable.provider.exists(DeltaSourceUtils.isDeltaDataSourceName) || + CatalogTableUtils.isUnityCatalogManagedTable(catalogTable) + case None => false + } + } + + private def shouldApplyV2Streaming(s: StreamingRelation): Boolean = { + if (!isDeltaStreamingRelation(s)) { + return false + } + + val mode = session.conf.get( + DeltaSQLConfV2.V2_ENABLE_MODE.key, + DeltaSQLConfV2.V2_ENABLE_MODE.defaultValueString) + + // scalastyle:off caselocale + mode.toUpperCase match { + // scalastyle:on caselocale + case "STRICT" => + // Always apply V2 streaming for all Delta tables + true + case "AUTO" => + // Only apply for Unity Catalog managed tables + // catalogTable is guaranteed to be Some because isDeltaStreamingRelation checked it + s.dataSource.catalogTable.exists(CatalogTableUtils.isUnityCatalogManagedTable) + case "NONE" | _ => + // V2 streaming disabled + false + } + } + + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { + case s: StreamingRelation if shouldApplyV2Streaming(s) => + // catalogTable is guaranteed to be defined because shouldApplyV2Streaming checks it + // via isDeltaStreamingRelation, but we use pattern matching for safety + s.dataSource.catalogTable match { + case Some(catalogTable) => + val ident = + Identifier.of(catalogTable.identifier.database.toArray, catalogTable.identifier.table) + val table = + new SparkTable( + ident, + catalogTable, + ScalaUtils.toJavaMap(catalogTable.properties)) + + // Add a marker to indicate this schema comes from V2 streaming (SparkTable) + // This allows DeltaDataSource.sourceSchema to distinguish between: + // 1. Schema from SparkTable (validated by Kernel) - can be used directly + // 2. User-provided schema - must go through DeltaLog validation + val optionsWithMarker = new java.util.HashMap[String, String]( + s.dataSource.options.size + 1) + s.dataSource.options.asJava.forEach((k, v) => optionsWithMarker.put(k, v)) + optionsWithMarker.put( + DeltaV2StreamingUtils.V2_STREAMING_SCHEMA_SOURCE_KEY, + DeltaV2StreamingUtils.V2_STREAMING_SCHEMA_SOURCE_SPARK_TABLE) + + StreamingRelationV2( + source = None, + sourceName = "delta", + table = table, + extraOptions = new CaseInsensitiveStringMap(optionsWithMarker), + output = toAttributes(table.schema), + catalog = None, + identifier = Some(ident), + v1Relation = Some(s)) + + case None => + // This should never happen due to shouldApplyV2Streaming check, but be defensive + s + } + } +} + diff --git a/spark-unified/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala b/spark-unified/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala index dfd7028150b..ba13c28a786 100644 --- a/spark-unified/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala +++ b/spark-unified/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala @@ -16,6 +16,7 @@ package io.delta.sql +import org.apache.spark.sql.SparkSessionExtensions import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule @@ -69,6 +70,20 @@ import org.apache.spark.sql.catalyst.rules.Rule */ class DeltaSparkSessionExtension extends AbstractDeltaSparkSessionExtension { + override def apply(extensions: SparkSessionExtensions): Unit = { + // First register all the base Delta rules from the V1 implementation. 
+ super.apply(extensions) + + // Register a post-hoc resolution rule that rewrites V1 StreamingRelation plans that + // read Delta tables into V2 StreamingRelationV2 plans backed by SparkTable. + // + // NOTE: This rule is functional (not a placeholder). Binary compatibility concerns are + // handled separately via the nested NoOpRule class below (kept for MiMa). + extensions.injectResolutionRule { session => + new ApplyV2Streaming(session) + } + } + /** * NoOpRule for binary compatibility with Delta 3.3.0 * This class must remain here to satisfy MiMa checks diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala index 6ab2408042c..971a3d47c97 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala @@ -16,6 +16,8 @@ package org.apache.spark.sql.delta.sources +import java.util.Locale + import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.{Failure, Success, Try} @@ -111,6 +113,14 @@ class DeltaDataSource schema: Option[StructType], providerName: String, parameters: Map[String, String]): (String, StructType) = { + // Only bypass schema loading for catalog-managed tables in V2 streaming mode. + // When V2_ENABLE_MODE is AUTO or STRICT, the schema comes from SparkTable (Kernel) + // which has already validated the table. For NONE mode, we must go through the + // normal DeltaLog schema loading path to ensure proper validation. + if (schema.isDefined && + shouldUseProvidedSchemaForStreaming(sqlContext.sparkSession, parameters)) { + return (shortName(), schema.get) + } val path = parameters.getOrElse("path", { throw DeltaErrors.pathNotSpecifiedException }) @@ -159,6 +169,55 @@ class DeltaDataSource } } + /** + * Determines whether to use the provided schema for streaming queries. + * + * NOTE: This method is only called from `sourceSchema`, which is part of the + * StreamSourceProvider interface, so we are guaranteed to be in a streaming context. + * + * The check determines which streaming path we're in: + * - STRICT mode: DeltaCatalog always returns SparkTable for catalog tables. If we have a schema + * and catalogTableOpt is defined, it came from SparkTable/Kernel via DeltaCatalog, so trust it. + * - AUTO mode: Schema comes from SparkTable (Kernel) via ApplyV2Streaming rule for Unity Catalog + * tables. Check marker (in parameters) to ensure it's not user-provided. + * - NONE mode: V1 streaming path, must load schema via DeltaLog for validation. + * + * @param parameters The streaming options, which may contain a marker set by ApplyV2Streaming + * @return true if we should use the provided schema (V2 path with SparkTable), false if we + * should load it from DeltaLog (V1 path or user-provided schema) + */ + private def shouldUseProvidedSchemaForStreaming( + spark: SparkSession, + parameters: Map[String, String]): Boolean = { + // NOTE: DeltaSQLConfV2 lives in kernel-spark, which sparkV1 cannot depend on. Use the shared + // key constant from DeltaSQLConfUtils instead. 
+ val mode = spark.conf.get(DeltaSQLConf.V2_ENABLE_MODE_KEY, "NONE") + + // Check if this schema came from SparkTable (V2 streaming) via ApplyV2Streaming + val isV2StreamingSchema = + parameters + .get(DeltaV2StreamingUtils.V2_STREAMING_SCHEMA_SOURCE_KEY) + .contains(DeltaV2StreamingUtils.V2_STREAMING_SCHEMA_SOURCE_SPARK_TABLE) + + mode.toUpperCase(Locale.ROOT) match { + case "NONE" => + // V1 streaming: must load schema via DeltaLog + false + case "STRICT" => + // In STRICT mode, DeltaCatalog always returns SparkTable for catalog tables + // If we have a schema and a catalog table, it came from SparkTable, so trust it + // For path-based tables, be conservative and reload + catalogTableOpt.isDefined + case "AUTO" => + // V2 streaming for UC tables: only use provided schema if marker is set + // This distinguishes SparkTable schema from user-provided schema + isV2StreamingSchema + case _ => + // Unknown mode: be conservative, use DeltaLog + false + } + } + override def createSource( sqlContext: SQLContext, metadataPath: String, diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index 9cd31e74458..fe7c2e3aab6 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -39,6 +39,16 @@ trait DeltaSQLConfUtils { def buildConf(key: String): ConfigBuilder = SQLConf.buildConf(s"$SQL_CONF_PREFIX.$key") def buildStaticConf(key: String): ConfigBuilder = SQLConf.buildStaticConf(s"spark.databricks.delta.$key") + + /** + * Canonical SQLConf key for Delta V2 enable mode. + * + * This constant is shared across spark and kernel modules to avoid repeating the literal + * "spark.databricks.delta.v2.enableMode" in multiple places. + * + * NOTE: The ConfigEntry itself is defined in kernel-spark (DeltaSQLConfV2). + */ + final val V2_ENABLE_MODE_KEY: String = s"$SQL_CONF_PREFIX.v2.enableMode" } /** diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaV2StreamingUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaV2StreamingUtils.scala new file mode 100644 index 00000000000..9b48b3dabfe --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaV2StreamingUtils.scala @@ -0,0 +1,32 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.sources + +/** + * Utilities/constants for Delta's V2 streaming path integration. + * + * This is defined in sparkV1 so it can be referenced from both: + * - spark (DeltaDataSource / V1 streaming APIs) + * - spark-unified (ApplyV2Streaming rule) + */ +object DeltaV2StreamingUtils { + /** Marker option key injected by ApplyV2Streaming into streaming options. 
*/ + final val V2_STREAMING_SCHEMA_SOURCE_KEY: String = "__v2StreamingSchemaSource" + + /** Marker option value indicating the schema originated from kernel SparkTable. */ + final val V2_STREAMING_SCHEMA_SOURCE_SPARK_TABLE: String = "SparkTable" +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/V2StreamingConversionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/V2StreamingConversionSuite.scala new file mode 100644 index 00000000000..44e21f8557c --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/V2StreamingConversionSuite.scala @@ -0,0 +1,288 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSQLConfV2, DeltaV2StreamingUtils} +import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils} +import org.apache.spark.sql.delta.test.UCTableInjectingSessionCatalog + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 +import org.apache.spark.sql.execution.streaming.StreamingRelation +import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} +import org.scalatest.exceptions.TestFailedException + +/** + * Test suite for V2 streaming conversion logic via ApplyV2Streaming rule. + * Tests the interaction between V2_ENABLE_MODE configuration and streaming queries. + */ +class V2StreamingConversionSuite + extends StreamTest + with DeltaSQLTestUtils + with DeltaSQLCommandTest { + + import testImplicits._ + + /** + * Helper to check if a DataFrame's logical plan uses StreamingRelationV2 (V2 path). + */ + private def usesV2Streaming(df: org.apache.spark.sql.DataFrame): Boolean = { + df.queryExecution.analyzed.collectFirst { + case _: StreamingRelationV2 => true + }.isDefined + } + + /** + * Helper to check if a DataFrame's logical plan uses StreamingRelation (V1 path). + */ + private def usesV1Streaming(df: org.apache.spark.sql.DataFrame): Boolean = { + df.queryExecution.analyzed.collectFirst { + case _: StreamingRelation => true + }.isDefined + } + + // ============================================================================ + // AUTO Mode Tests + // ============================================================================ + + test("AUTO mode: UC-managed table via test catalog runs V2 streaming (expected to fail today)") { + // TODO: Enable this as a passing end-to-end test once the kernel-backed streaming source + // supports initialOffset with an initial snapshot (SparkMicroBatchStream limitation). + withTempDir { dir => + val path = dir.getCanonicalPath + // Create a real table in the metastore first, under the default catalog setup. + // We use LOCATION to avoid relying on managed-location behaviors of the test catalog. 
+ sql("DROP TABLE IF EXISTS uc_table") + sql(s"CREATE TABLE uc_table (id LONG) USING delta LOCATION '$path'") + sql("INSERT INTO uc_table VALUES (1)") + + // Reset catalog manager so the new `spark_catalog` implementation can apply. + spark.sessionState.catalogManager.reset() + try { + withSQLConf( + "spark.sql.catalog.spark_catalog" -> classOf[UCTableInjectingSessionCatalog].getName, + DeltaSQLConfV2.V2_ENABLE_MODE.key -> "AUTO") { + // Reset again to force catalog re-initialization with updated configs. + spark.sessionState.catalogManager.reset() + + val df = spark.readStream.table("uc_table") + assert(usesV2Streaming(df), + "AUTO mode should use V2 streaming for UC-managed tables (via test catalog)") + + val e = intercept[TestFailedException] { + testStream(df)(ProcessAllAvailable()) + } + assert( + e.getMessage.contains("initialOffset with initial snapshot is not supported yet"), + s"Unexpected failure; message was: ${e.getMessage}" + ) + } + } finally { + spark.sessionState.catalogManager.reset() + sql("DROP TABLE IF EXISTS uc_table") + } + } + } + + test("AUTO mode: non-UC catalog table uses V1 streaming (DeltaLog)") { + withSQLConf(DeltaSQLConfV2.V2_ENABLE_MODE.key -> "AUTO") { + withTable("regular_table") { + // Create regular catalog table (no UC properties) + sql("CREATE TABLE regular_table (id INT, value STRING) USING delta") + sql("INSERT INTO regular_table VALUES (0, 'init')") + + // Create streaming read + val df = spark.readStream.table("regular_table") + + // Verify V1 streaming is used (no UC table ID) + assert(usesV1Streaming(df), + "AUTO mode should use V1 streaming (StreamingRelation) for non-UC catalog tables") + + // Verify streaming works correctly + testStream(df)( + Execute { _ => + sql("INSERT INTO regular_table VALUES (1, 'a')") + }, + ProcessAllAvailable(), + CheckAnswer((0, "init"), (1, "a")) + ) + } + } + } + + test("AUTO mode: path-based table uses V1 streaming") { + withSQLConf(DeltaSQLConfV2.V2_ENABLE_MODE.key -> "AUTO") { + withTempDir { dir => + val path = dir.getCanonicalPath + + // Create path-based table + spark.range(3).selectExpr("id", "CAST(id AS STRING) as value") + .write.format("delta").save(path) + + // Create streaming read from path + val df = spark.readStream + .format("delta") + .load(path) + + // Verify V1 streaming is used (no catalog table) + assert(usesV1Streaming(df), + "AUTO mode should use V1 streaming for path-based tables") + + // Verify streaming works + testStream(df)( + Execute { _ => + spark.range(3, 5).selectExpr("id", "CAST(id AS STRING) as value") + .write.format("delta").mode("append").save(path) + }, + ProcessAllAvailable(), + CheckAnswer((0, "0"), (1, "1"), (2, "2"), (3, "3"), (4, "4")) + ) + } + } + } + + // ============================================================================ + // NONE Mode Tests (Default behavior) + // ============================================================================ + + test("NONE mode: all tables use V1 streaming") { + withSQLConf(DeltaSQLConfV2.V2_ENABLE_MODE.key -> "NONE") { + withTable("test_table") { + sql("CREATE TABLE test_table (id INT) USING delta") + sql("INSERT INTO test_table VALUES (1)") + + val df = spark.readStream.table("test_table") + + assert(usesV1Streaming(df), + "NONE mode should use V1 streaming for all tables") + + testStream(df)( + Execute { _ => + sql("INSERT INTO test_table VALUES (2)") + }, + ProcessAllAvailable(), + CheckAnswer(1, 2) + ) + } + } + } + + test("NONE mode: UC-managed table via test catalog is not rewritten to V2 streaming") { + withTempDir { 
dir => + val path = dir.getCanonicalPath + sql("DROP TABLE IF EXISTS uc_table") + sql(s"CREATE TABLE uc_table (id INT) USING delta LOCATION '$path'") + sql("INSERT INTO uc_table VALUES (1)") + + spark.sessionState.catalogManager.reset() + try { + withSQLConf( + "spark.sql.catalog.spark_catalog" -> classOf[UCTableInjectingSessionCatalog].getName, + DeltaSQLConfV2.V2_ENABLE_MODE.key -> "NONE") { + spark.sessionState.catalogManager.reset() + + val df = spark.readStream.table("uc_table") + assert(usesV1Streaming(df), + "NONE mode should not rewrite to V2 streaming even for UC-managed tables") + + // Still validate the source executes as V1 streaming. + testStream(df)( + Execute { _ => + sql("INSERT INTO uc_table VALUES (2)") + }, + ProcessAllAvailable(), + CheckAnswer(1, 2) + ) + } + } finally { + spark.sessionState.catalogManager.reset() + sql("DROP TABLE IF EXISTS uc_table") + } + } + } + + // ============================================================================ + // Schema Bypass Logic Tests + // ============================================================================ + + test("sourceSchema: AUTO mode without marker loads schema via DeltaLog") { + withSQLConf(DeltaSQLConfV2.V2_ENABLE_MODE.key -> "AUTO") { + withTable("test_table") { + sql("CREATE TABLE test_table (id INT, value STRING) USING delta") + + val catalogTable = spark.sessionState.catalog.getTableMetadata( + TableIdentifier("test_table")) + + val dataSource = new DeltaDataSource() + dataSource.setCatalogTableOpt(Some(catalogTable)) + + // Provide schema without V2 marker + val providedSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("value", StringType) + )) + + val (shortName, returnedSchema) = dataSource.sourceSchema( + spark.sqlContext, + Some(providedSchema), + "delta", + Map("path" -> catalogTable.location.toString) // No __v2StreamingSchemaSource marker + ) + + // Without marker, AUTO mode should load via DeltaLog + // The returned schema should match the actual table schema + assert(returnedSchema.fieldNames.contains("id")) + assert(returnedSchema.fieldNames.contains("value")) + } + } + } + + test("sourceSchema: AUTO mode with V2 marker uses provided schema") { + withSQLConf(DeltaSQLConfV2.V2_ENABLE_MODE.key -> "AUTO") { + withTable("uc_table") { + sql("CREATE TABLE uc_table (id INT, value STRING) USING delta") + + val catalogTable = spark.sessionState.catalog.getTableMetadata( + TableIdentifier("uc_table")) + + val dataSource = new DeltaDataSource() + dataSource.setCatalogTableOpt(Some(catalogTable)) + + val providedSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("value", StringType) + )) + + // Call sourceSchema WITH the V2 marker + val (shortName, returnedSchema) = dataSource.sourceSchema( + spark.sqlContext, + Some(providedSchema), + "delta", + Map( + DeltaV2StreamingUtils.V2_STREAMING_SCHEMA_SOURCE_KEY -> + DeltaV2StreamingUtils.V2_STREAMING_SCHEMA_SOURCE_SPARK_TABLE) // V2 marker present + ) + + // With marker, AUTO mode should use provided schema + assert(returnedSchema == providedSchema, + "AUTO mode with V2 marker should use provided schema") + } + } + } +} + diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/test/CustomCatalogs.scala b/spark/src/test/scala/org/apache/spark/sql/delta/test/CustomCatalogs.scala index 37b50458151..f4bc8d0a0d4 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/test/CustomCatalogs.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/test/CustomCatalogs.scala @@ -138,6 +138,76 @@ class 
DummySessionCatalogInner extends DelegatingCatalogExtension { } } +/** + * A session-catalog wrapper used in tests to simulate Unity Catalog managed tables by injecting + * UC-related properties into the returned [[CatalogTable.storage.properties]]. + * + * This allows end-to-end plan construction (and streaming query execution attempts) to observe + * UC-managed markers through Spark's normal catalog resolution paths, without manually patching + * CatalogTable objects in tests. + */ +class UCTableInjectingSessionCatalogInner extends DelegatingCatalogExtension { + private val UC_TABLE_NAME = "uc_table" + + override def loadTable(ident: Identifier): Table = { + val t = super.loadTable(ident).asInstanceOf[V1Table] + if (!ident.name().equalsIgnoreCase(UC_TABLE_NAME)) { + return t + } + + V1Table(t.v1Table.copy( + storage = t.v1Table.storage.copy( + properties = t.v1Table.storage.properties ++ Map( + "test.simulateUC" -> "true", + "io.unitycatalog.tableId" -> "test-uc-table-id" + ) + ) + )) + } +} + +/** + * A session catalog implementation that delegates to DeltaCatalog but injects UC markers for + * selected tables (see [[UCTableInjectingSessionCatalogInner]]). + */ +class UCTableInjectingSessionCatalog extends TableCatalog { + private var deltaCatalog: DeltaCatalog = null + + override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = { + val inner = new UCTableInjectingSessionCatalogInner() + inner.setDelegateCatalog(new V2SessionCatalog( + SparkSession.active.sessionState.catalogManager.v1SessionCatalog)) + deltaCatalog = new DeltaCatalog() + deltaCatalog.setDelegateCatalog(inner) + } + + override def name(): String = deltaCatalog.name() + + override def listTables(namespace: Array[String]): Array[Identifier] = { + deltaCatalog.listTables(namespace) + } + + override def loadTable(ident: Identifier): Table = deltaCatalog.loadTable(ident) + + override def createTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: java.util.Map[String, String]): Table = { + deltaCatalog.createTable(ident, schema, partitions, properties) + } + + override def alterTable(ident: Identifier, changes: TableChange*): Table = { + deltaCatalog.alterTable(ident, changes: _*) + } + + override def dropTable(ident: Identifier): Boolean = deltaCatalog.dropTable(ident) + + override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = { + deltaCatalog.renameTable(oldIdent, newIdent) + } +} + // A dummy catalog that adds a layer between DeltaCatalog and the Spark SessionCatalog, // to attach additional table storage properties after the table is loaded, and generates location // for managed tables. 
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala b/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala index cb28a4f7123..366c8d672e1 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.delta.test import org.apache.spark.sql.delta.catalog.DeltaCatalog +import org.apache.spark.sql.delta.sources.DeltaSQLConfV2 import io.delta.sql.DeltaSparkSessionExtension import org.apache.spark.SparkConf @@ -35,5 +36,12 @@ trait DeltaSQLCommandTest extends SharedSparkSession { classOf[DeltaSparkSessionExtension].getName) .set(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key, classOf[DeltaCatalog].getName) + .set(DeltaSQLConfV2.V2_ENABLE_MODE.key, v2EnableMode) } + + /** + * Override this method in test suites to change the V2 connector enable mode. + * Defaults to AUTO for these tests (use the V2 connector for catalog-managed tables); + * note that the production conf default is NONE. + */ + protected def v2EnableMode: String = "AUTO" }
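A minimal usage sketch of the behavior this change enables (illustrative only, not part of the patch; the table name is hypothetical, AUTO only rewrites tables that the catalog reports as Unity Catalog managed, and the conf default remains NONE):

// Sketch: route a streaming read of a UC-managed Delta table through the V2
// (kernel SparkTable) path by setting the new enable mode to AUTO.
// The table name "main.default.uc_events" is an assumption for illustration.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("delta-v2-streaming-sketch")
  .getOrCreate()

// NONE (default) keeps the V1 StreamingRelation path; STRICT forces V2 for all
// Delta tables and is intended for testing only.
spark.conf.set("spark.databricks.delta.v2.enableMode", "AUTO")

val query = spark.readStream
  .table("main.default.uc_events") // hypothetical UC-managed Delta table
  .writeStream
  .format("console")
  .start()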