Skip to content

Commit b7ce96c

Browse files
committed
uc client factory
1 parent 36dbdda commit b7ce96c

File tree

3 files changed

+482
-0
lines changed

3 files changed

+482
-0
lines changed

kernel-spark/src/main/java/io/delta/kernel/spark/catalog/SparkTable.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.delta.kernel.spark.read.SparkScanBuilder;
2323
import io.delta.kernel.spark.snapshot.DeltaSnapshotManager;
2424
import io.delta.kernel.spark.snapshot.PathBasedSnapshotManager;
25+
import io.delta.kernel.spark.unity.UnityCatalogClientFactory;
2526
import io.delta.kernel.spark.utils.SchemaUtils;
2627
import java.util.*;
2728
import org.apache.hadoop.conf.Configuration;
@@ -58,6 +59,7 @@ public class SparkTable implements Table, SupportsRead {
5859
private final Column[] columns;
5960
private final Transform[] partitionTransforms;
6061
private final Optional<CatalogTable> catalogTable;
62+
private final Optional<UnityCatalogClientFactory.UnityCatalogClient> unityCatalogClient;
6163

6264
/**
6365
* Creates a SparkTable from a filesystem path without a catalog table.
@@ -118,6 +120,9 @@ private SparkTable(
118120
Optional<CatalogTable> catalogTable) {
119121
this.identifier = requireNonNull(identifier, "identifier is null");
120122
this.catalogTable = catalogTable;
123+
this.unityCatalogClient =
124+
catalogTable.flatMap(
125+
table -> UnityCatalogClientFactory.create(SparkSession.active(), identifier, table));
121126
// Merge options: file system options from catalog + user options (user takes precedence)
122127
// This follows the same pattern as DeltaTableV2 in delta-spark
123128
Map<String, String> merged = new HashMap<>();
@@ -197,6 +202,10 @@ public Optional<CatalogTable> getCatalogTable() {
197202
return catalogTable;
198203
}
199204

205+
public Optional<UnityCatalogClientFactory.UnityCatalogClient> getUnityCatalogClient() {
206+
return unityCatalogClient;
207+
}
208+
200209
@Override
201210
public String name() {
202211
return identifier.name();
Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
package io.delta.kernel.spark.unity;

import static java.util.Objects.requireNonNull;

import io.delta.kernel.spark.utils.CatalogTableUtils;
import io.delta.kernel.spark.utils.ScalaUtils;
import io.delta.storage.commit.uccommitcoordinator.UCClient;
import io.delta.storage.commit.uccommitcoordinator.UCTokenBasedRestClient;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
import org.apache.spark.sql.connector.catalog.Identifier;

/**
 * Builds Unity Catalog clients by resolving catalog connector settings out of the Spark session
 * configuration.
 *
 * <p>Resolution follows the same rules as {@code
 * org.apache.spark.sql.delta.coordinatedcommits.UCCommitCoordinatorBuilder}, so the V1 and
 * Kernel-backed code paths behave identically.
 */
public final class UnityCatalogClientFactory {

  private static final String SPARK_SQL_CATALOG_PREFIX = "spark.sql.catalog.";
  private static final String URI_SUFFIX = "uri";
  private static final String TOKEN_SUFFIX = "token";
  private static final String WAREHOUSE_SUFFIX = "warehouse";
  private static final String UNITY_CATALOG_CONNECTOR_CLASS =
      "io.unitycatalog.spark.UCSingleCatalog";

  private UnityCatalogClientFactory() {}

  // (uri, token) -> UCClient constructor; volatile so test overrides are visible across threads.
  private static volatile BiFunction<String, String, UCClient> clientBuilder =
      UCTokenBasedRestClient::new;

  /**
   * Creates a Unity Catalog client for the provided catalog table if it is Unity Catalog managed.
   *
   * @param spark active Spark session
   * @param identifier table identifier supplied by Spark DSv2
   * @param catalogTable catalog metadata for the table
   * @return optional Unity Catalog client details; empty when the table is not UC managed
   * @throws IllegalStateException when the table is UC managed but no (or no unambiguous) Unity
   *     Catalog connector configuration can be resolved from the session
   */
  public static Optional<UnityCatalogClient> create(
      SparkSession spark, Identifier identifier, CatalogTable catalogTable) {
    requireNonNull(spark, "spark session is null");
    requireNonNull(identifier, "identifier is null");
    requireNonNull(catalogTable, "catalogTable is null");

    if (!CatalogTableUtils.isUnityCatalogManagedTable(catalogTable)) {
      return Optional.empty();
    }

    Map<String, String> sessionConf = ScalaUtils.toJavaMap(spark.conf().getAll());
    List<CatalogConfig> candidates = collectUnityCatalogConfigs(sessionConf);
    if (candidates.isEmpty()) {
      throw new IllegalStateException(
          "Unity Catalog table detected but no Unity Catalog connectors are configured.");
    }

    CatalogConfig chosen =
        selectCatalogConfig(extractCatalogName(identifier), candidates, identifier.name());
    UCClient restClient = clientBuilder.apply(chosen.uri, chosen.token);
    return Optional.of(new UnityCatalogClient(chosen.name, restClient, chosen.warehouse));
  }

  /**
   * Picks the connector config matching the requested catalog name (case-insensitively). Falls
   * back to the sole configured connector when the name does not match or is absent; otherwise
   * fails with a descriptive error.
   */
  private static CatalogConfig selectCatalogConfig(
      Optional<String> requestedCatalogName, List<CatalogConfig> unityCatalogs, String tableName) {
    if (!requestedCatalogName.isPresent()) {
      if (unityCatalogs.size() == 1) {
        return unityCatalogs.get(0);
      }
      throw new IllegalStateException(
          String.format(
              Locale.ROOT,
              "Multiple Unity Catalog connectors configured (%s) but table '%s' does not carry a "
                  + "catalog-qualified identifier.",
              listCatalogNames(unityCatalogs),
              tableName));
    }

    String wanted = requestedCatalogName.get();
    for (CatalogConfig candidate : unityCatalogs) {
      if (candidate.name.equalsIgnoreCase(wanted)) {
        return candidate;
      }
    }
    // No exact match: a single configured connector is treated as authoritative.
    if (unityCatalogs.size() == 1) {
      return unityCatalogs.get(0);
    }
    throw new IllegalStateException(
        String.format(
            Locale.ROOT,
            "Unable to locate Unity Catalog connector '%s' for table '%s'.",
            wanted,
            tableName));
  }

  /**
   * Scans the session conf for {@code spark.sql.catalog.*} entries, groups them per catalog name,
   * and returns validated configs for catalogs whose connector class is the UC connector.
   */
  private static List<CatalogConfig> collectUnityCatalogConfigs(Map<String, String> sparkConf) {
    Map<String, CatalogProperties> byCatalog = new HashMap<>();

    sparkConf.forEach(
        (key, value) -> {
          if (!key.startsWith(SPARK_SQL_CATALOG_PREFIX)) {
            return;
          }
          String suffix = key.substring(SPARK_SQL_CATALOG_PREFIX.length());
          int dot = suffix.indexOf('.');
          // "spark.sql.catalog.<name>" carries the connector class itself;
          // "spark.sql.catalog.<name>.<prop>" carries a per-catalog property.
          String catalogName = (dot == -1) ? suffix : suffix.substring(0, dot);
          CatalogProperties props =
              byCatalog.computeIfAbsent(catalogName, CatalogProperties::new);
          if (dot == -1) {
            props.connectorClass = value;
            return;
          }
          String propertyKey = suffix.substring(dot + 1);
          switch (propertyKey) {
            case URI_SUFFIX:
              props.uri = value;
              break;
            case TOKEN_SUFFIX:
              props.token = value;
              break;
            case WAREHOUSE_SUFFIX:
              props.warehouse = Optional.ofNullable(value).filter(v -> !v.isEmpty());
              break;
              // Any other per-catalog property is irrelevant here and ignored.
          }
        });

    List<CatalogConfig> unityCatalogs = new ArrayList<>();
    for (CatalogProperties props : byCatalog.values()) {
      if (!UNITY_CATALOG_CONNECTOR_CLASS.equals(props.connectorClass)) {
        continue;
      }
      String uri = requireTrimmed(props.uri, props.name, URI_SUFFIX);
      validateUri(uri, props.name);
      String token = requireTrimmed(props.token, props.name, TOKEN_SUFFIX);
      unityCatalogs.add(new CatalogConfig(props.name, uri, token, props.warehouse));
    }
    return unityCatalogs;
  }

  /** Fails fast when the configured endpoint is not a syntactically valid URI. */
  private static void validateUri(String uri, String catalogName) {
    try {
      new URI(uri);
    } catch (URISyntaxException e) {
      throw new IllegalStateException(
          String.format(
              Locale.ROOT,
              "Invalid Unity Catalog URI '%s' configured for catalog '%s'.",
              uri,
              catalogName),
          e);
    }
  }

  /** Renders the configured UC catalog names as a sorted, comma-separated list for errors. */
  private static String listCatalogNames(List<CatalogConfig> configs) {
    List<String> names = new ArrayList<>(configs.size());
    configs.forEach(config -> names.add(config.name));
    Collections.sort(names, String.CASE_INSENSITIVE_ORDER);
    return String.join(",", names);
  }

  /**
   * Returns the trimmed value, or throws {@link IllegalStateException} naming the missing or
   * blank {@code spark.sql.catalog.<catalog>.<suffix>} entry.
   */
  private static String requireTrimmed(String value, String catalogName, String propertySuffix) {
    if (value == null) {
      throw new IllegalStateException(
          String.format(
              Locale.ROOT,
              "Missing Unity Catalog configuration '%s%s%s'.",
              SPARK_SQL_CATALOG_PREFIX,
              catalogName,
              propertySuffix.isEmpty() ? "" : "." + propertySuffix));
    }
    String normalized = value.trim();
    if (normalized.isEmpty()) {
      throw new IllegalStateException(
          String.format(
              Locale.ROOT,
              "Unity Catalog configuration '%s%s.%s' cannot be empty.",
              SPARK_SQL_CATALOG_PREFIX,
              catalogName,
              propertySuffix));
    }
    return normalized;
  }

  /**
   * Derives the catalog name from the identifier's leading namespace element, if any.
   *
   * <p>NOTE(review): this assumes the first namespace element names the catalog; DSv2 identifiers
   * handed to a catalog are often catalog-relative — confirm callers supply a qualified namespace.
   */
  private static Optional<String> extractCatalogName(Identifier identifier) {
    String[] namespace = identifier.namespace();
    boolean qualified = namespace != null && namespace.length > 0;
    return qualified ? Optional.of(namespace[0]) : Optional.empty();
  }

  /** Unity Catalog client handle containing additional metadata required by the connector. */
  public static final class UnityCatalogClient implements AutoCloseable {
    private final String catalogName;
    private final UCClient ucClient;
    private final Optional<String> warehouse;

    UnityCatalogClient(String catalogName, UCClient ucClient, Optional<String> warehouse) {
      this.catalogName = requireNonNull(catalogName, "catalogName is null");
      this.ucClient = requireNonNull(ucClient, "ucClient is null");
      // A null Optional is normalised to empty rather than rejected.
      this.warehouse = Objects.requireNonNullElseGet(warehouse, Optional::empty);
    }

    public String getCatalogName() {
      return catalogName;
    }

    public UCClient getUcClient() {
      return ucClient;
    }

    public Optional<String> getWarehouse() {
      return warehouse;
    }

    @Override
    public void close() throws Exception {
      ucClient.close();
    }
  }

  /** Mutable accumulator for the raw per-catalog conf entries, keyed by catalog name. */
  private static final class CatalogProperties {
    private final String name;
    private String connectorClass;
    private String uri;
    private String token;
    private Optional<String> warehouse = Optional.empty();

    private CatalogProperties(String name) {
      this.name = name;
    }
  }

  /** Immutable, validated connector configuration for one Unity Catalog. */
  private static final class CatalogConfig {
    private final String name;
    private final String uri;
    private final String token;
    private final Optional<String> warehouse;

    private CatalogConfig(String name, String uri, String token, Optional<String> warehouse) {
      this.name = name;
      this.uri = uri;
      this.token = token;
      this.warehouse = warehouse;
    }
  }

  /** Visible for testing: override the UC client builder. */
  public static void setClientBuilderForTesting(BiFunction<String, String, UCClient> builder) {
    clientBuilder = requireNonNull(builder, "builder is null");
  }

  /** Visible for testing: reset the UC client builder to the default implementation. */
  public static void resetClientBuilderForTesting() {
    clientBuilder = UCTokenBasedRestClient::new;
  }
}

0 commit comments

Comments
 (0)