Skip to content

Commit 95add12

Browse files
authored
[Spark] Support UnityCatalog OAuth authentication. (#5683)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description The related issue is #5509, since we've already publish the unitycatalog-0.3.1 to the central maven repo: https://central.sonatype.com/artifact/io.unitycatalog/unitycatalog-client And we've already introduced the abstracted [TokenProvider](https://github.com/unitycatalog/unitycatalog/blob/main/clients/java/src/main/java/io/unitycatalog/client/auth/TokenProvider.java) API in unitycatalog-client 0.3.1, then in this PR, we just use the TokenProvider from unitycatalog-client 0.3.1 directly, and remove all the hard-coded `static` token authentication in the oss-delta, so that delta will have the consistent and abstracted authentication to unitycatalog client, and with this approach, it also helps us to extend to other custom TokenProvider implementation. ## How was this patch tested? We use the locatly designed unit test. ## Does this PR introduce _any_ user-facing changes? No. --------- Signed-off-by: openinx <[email protected]>
1 parent 9984f28 commit 95add12

File tree

9 files changed

+435
-162
lines changed

9 files changed

+435
-162
lines changed

build.sbt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1023,6 +1023,13 @@ lazy val storage = (project in file("storage"))
10231023
// Note that the org.apache.hadoop.fs.s3a.Listing::createFileStatusListingIterator 3.3.1 API
10241024
// is not compatible with 3.3.2.
10251025
"org.apache.hadoop" % "hadoop-aws" % hadoopVersion % "provided",
1026+
"io.unitycatalog" % "unitycatalog-client" % unityCatalogVersion excludeAll(
1027+
ExclusionRule(organization = "org.openapitools"),
1028+
ExclusionRule(organization = "com.fasterxml.jackson.core"),
1029+
ExclusionRule(organization = "com.fasterxml.jackson.module"),
1030+
ExclusionRule(organization = "com.fasterxml.jackson.datatype"),
1031+
ExclusionRule(organization = "com.fasterxml.jackson.dataformat")
1032+
),
10261033

10271034
// Test Deps
10281035
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",

kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCTableInfo.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,28 @@
1717

1818
import static java.util.Objects.requireNonNull;
1919

20+
import java.util.Collections;
21+
import java.util.Map;
22+
2023
/**
2124
* Table information for Unity Catalog managed tables.
2225
*
2326
* <p>This POJO encapsulates all the information needed to interact with a Unity Catalog table
2427
* without requiring Spark dependencies.
2528
*/
2629
public final class UCTableInfo {
30+
2731
private final String tableId;
2832
private final String tablePath;
2933
private final String ucUri;
30-
private final String ucToken;
34+
private final Map<String, String> authConfig;
3135

32-
public UCTableInfo(String tableId, String tablePath, String ucUri, String ucToken) {
36+
public UCTableInfo(
37+
String tableId, String tablePath, String ucUri, Map<String, String> authConfig) {
3338
this.tableId = requireNonNull(tableId, "tableId is null");
3439
this.tablePath = requireNonNull(tablePath, "tablePath is null");
3540
this.ucUri = requireNonNull(ucUri, "ucUri is null");
36-
this.ucToken = requireNonNull(ucToken, "ucToken is null");
41+
this.authConfig = Collections.unmodifiableMap(requireNonNull(authConfig, "authConfig is null"));
3742
}
3843

3944
public String getTableId() {
@@ -48,7 +53,7 @@ public String getUcUri() {
4853
return ucUri;
4954
}
5055

51-
public String getUcToken() {
52-
return ucToken;
56+
public Map<String, String> getAuthConfig() {
57+
return authConfig;
5358
}
5459
}

kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.delta.kernel.spark.snapshot.unitycatalog;
1717

1818
import static java.util.Objects.requireNonNull;
19+
import static scala.jdk.javaapi.CollectionConverters.asJava;
1920

2021
import io.delta.kernel.spark.utils.CatalogTableUtils;
2122
import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient;
@@ -85,9 +86,8 @@ public static Optional<UCTableInfo> extractTableInfo(
8586

8687
UCCatalogConfig config = configOpt.get();
8788
String ucUri = config.uri();
88-
String ucToken = config.token();
8989

90-
return Optional.of(new UCTableInfo(tableId, tablePath, ucUri, ucToken));
90+
return Optional.of(new UCTableInfo(tableId, tablePath, ucUri, asJava(config.authConfig())));
9191
}
9292

9393
private static String extractUCTableId(CatalogTable catalogTable) {

kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/unitycatalog/UCTableInfoTest.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,33 @@
1717

1818
import static org.junit.jupiter.api.Assertions.assertEquals;
1919

20+
import java.util.HashMap;
21+
import java.util.Map;
2022
import org.junit.jupiter.api.Test;
2123

2224
/** Tests for {@link UCTableInfo}. */
2325
class UCTableInfoTest {
2426

2527
@Test
26-
void testConstructor_ValidInputs_StoresAllFields() {
28+
void testConstructor() {
2729
// Use distinctive values that would fail if implementation had hardcoded defaults
2830
String tableId = "uc_tbl_7f3a9b2c-e8d1-4f6a";
2931
String tablePath = "abfss://[email protected]/delta/v2";
3032
String ucUri = "https://uc-server.example.net/api/2.1/uc";
3133
String ucToken = "dapi_Kx9mN$2pQr#7vWz";
3234

33-
UCTableInfo info = new UCTableInfo(tableId, tablePath, ucUri, ucToken);
35+
Map<String, String> authConfig = new HashMap<>();
36+
authConfig.put("type", "static");
37+
authConfig.put("token", ucToken);
38+
39+
UCTableInfo info = new UCTableInfo(tableId, tablePath, ucUri, authConfig);
3440

3541
assertEquals(tableId, info.getTableId(), "Table ID should be stored correctly");
3642
assertEquals(tablePath, info.getTablePath(), "Table path should be stored correctly");
3743
assertEquals(ucUri, info.getUcUri(), "UC URI should be stored correctly");
38-
assertEquals(ucToken, info.getUcToken(), "UC token should be stored correctly");
44+
45+
Map<String, String> ret = info.getAuthConfig();
46+
assertEquals("static", ret.get("type"), "Type should be static");
47+
assertEquals(ucToken, ret.get("token"), "UC token should be stored correctly in configMap");
3948
}
4049
}

kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCUtilsSuite.scala

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,13 @@ class UCUtilsSuite extends SparkFunSuite with SharedSparkSession {
164164
info.getTablePath == TABLE_PATH_ALPHA,
165165
s"Table path mismatch: got ${info.getTablePath}")
166166
assert(info.getUcUri == UC_URI_ALPHA, s"UC URI mismatch: got ${info.getUcUri}")
167-
assert(info.getUcToken == UC_TOKEN_ALPHA, s"UC token mismatch: got ${info.getUcToken}")
167+
val configMap = info.getAuthConfig
168+
assert(
169+
configMap.get("type") == "static",
170+
s"Type should be static: got ${configMap.get("type")}")
171+
assert(
172+
configMap.get("token") == UC_TOKEN_ALPHA,
173+
s"UC token mismatch: got ${configMap.get("token")}")
168174
}
169175
}
170176

@@ -190,7 +196,7 @@ class UCUtilsSuite extends SparkFunSuite with SharedSparkSession {
190196
// catalogGamma config (should NOT be used)
191197
s"spark.sql.catalog.$catalogGamma" -> UC_CATALOG_CONNECTOR,
192198
s"spark.sql.catalog.$catalogGamma.uri" -> ucUriGamma,
193-
s"spark.sql.catalog.$catalogGamma.token" -> ucTokenGamma,
199+
s"spark.sql.catalog.$catalogGamma.token" -> ucTokenBeta,
194200
// catalogBeta config (should be used)
195201
s"spark.sql.catalog.$catalogBeta" -> UC_CATALOG_CONNECTOR,
196202
s"spark.sql.catalog.$catalogBeta.uri" -> ucUriBeta,
@@ -208,7 +214,11 @@ class UCUtilsSuite extends SparkFunSuite with SharedSparkSession {
208214
assert(
209215
info.getUcUri == ucUriBeta,
210216
s"Should use catalogBeta's URI, got: ${info.getUcUri}")
211-
assert(info.getUcToken == ucTokenBeta, s"Should use catalogBeta's token, got: ${info.getUcToken}")
217+
val configMap = info.getAuthConfig
218+
assert(configMap.get("type") == "static", s"Type should be static")
219+
assert(
220+
configMap.get("token") == ucTokenBeta,
221+
s"Should use catalogBeta's token, got: ${configMap.get("token")}")
212222
assert(info.getTableId == tableIdBeta, s"Should extract tableIdBeta, got: ${info.getTableId}")
213223
assert(
214224
info.getTablePath == tablePathBeta,

0 commit comments

Comments
 (0)