@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
2121import io .delta .kernel .spark .table .SparkTable
2222
2323import org .apache .spark .sql .SparkSession
24+ import org .apache .spark .sql .catalyst .catalog .CatalogTable
2425import org .apache .spark .sql .catalyst .plans .logical .LogicalPlan
2526import org .apache .spark .sql .catalyst .rules .Rule
2627import org .apache .spark .sql .catalyst .streaming .StreamingRelationV2
@@ -65,7 +66,8 @@ class UseV2ForStreamingRule(spark: SparkSession) extends Rule[LogicalPlan] {
6566 // Transform StreamingRelation (V1) to StreamingRelationV2 (V2) for catalog-managed tables
6667 plan.resolveOperatorsDown {
6768 case streamingRel @ StreamingRelation (dataSource, sourceName, output)
68- if isCatalogManagedDeltaTable(dataSource) =>
69+ if isCatalogManagedDeltaTable(dataSource) &&
70+ isCatalogOwnedTable(spark, dataSource.catalogTable.get) =>
6971
7072 val catalogTable = dataSource.catalogTable.get
7173 val tableIdent = catalogTable.identifier
@@ -122,4 +124,15 @@ class UseV2ForStreamingRule(spark: SparkSession) extends Rule[LogicalPlan] {
122124 catalogTable.provider.exists(_.equalsIgnoreCase(" delta" ))
123125 }
124126 }
127+
128+ /**
129+ * Check if the table is catalog-owned (CCV2).
130+ */
131+ private def isCatalogOwnedTable (
132+ spark : SparkSession ,
133+ catalogTable : CatalogTable ): Boolean = {
134+ // TODO: Implement actual check for catalog-owned tables
135+ // Currently returns true to enable V2 streaming for all catalog-managed tables
136+ true
137+ }
125138}
0 commit comments