Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
13c598b
update methods and testS
TimothyW553 Nov 12, 2025
40f3581
delete workspace
TimothyW553 Nov 12, 2025
b2ba4e4
delete
TimothyW553 Nov 12, 2025
8dcb742
implementation
TimothyW553 Dec 11, 2025
1fdadc7
Add catalogCommits to getActiveCommitAtTime
TimothyW553 Dec 11, 2025
e1ba896
fix import
TimothyW553 Dec 11, 2025
f85961a
integration tests
TimothyW553 Dec 11, 2025
32fb39e
integration tests
TimothyW553 Dec 11, 2025
e5a2447
Update tests
TimothyW553 Dec 11, 2025
9fce780
Update tests
TimothyW553 Dec 11, 2025
d847da2
revert accidental fmt
TimothyW553 Dec 11, 2025
1e6f351
fix earliest version from delta
TimothyW553 Dec 11, 2025
1df1a08
fix earliest version from delta
TimothyW553 Dec 11, 2025
a3d9b6b
Address comments regarding naming, early exit, and consolidating test…
TimothyW553 Dec 16, 2025
dc511d6
add test
TimothyW553 Dec 16, 2025
e993b6c
Address changes: rename from CCv2 to catalogManaged, consolidate test…
TimothyW553 Dec 16, 2025
398b807
address comment about naming and docs
TimothyW553 Dec 18, 2025
fffe2a0
Merge remote-tracking branch 'upstream/master' into stack/snapshotman…
TimothyW553 Dec 18, 2025
d383547
update docs to follow delta, kernel, etc semantics
TimothyW553 Dec 18, 2025
f6eb928
update ratified to catalog and update docs
TimothyW553 Dec 19, 2025
da30e83
algin comment
TimothyW553 Dec 19, 2025
16dd88c
Create factory and routing
TimothyW553 Dec 11, 2025
2410591
extract engine
TimothyW553 Dec 16, 2025
cbb134e
clarify fallback
TimothyW553 Dec 19, 2025
19b8915
streaming test
TimothyW553 Dec 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import static java.util.Objects.requireNonNull;

import io.delta.kernel.Snapshot;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.spark.read.SparkScanBuilder;
import io.delta.kernel.spark.snapshot.DeltaSnapshotManager;
import io.delta.kernel.spark.snapshot.PathBasedSnapshotManager;
import io.delta.kernel.spark.snapshot.SnapshotManagerFactory;
import io.delta.kernel.spark.utils.SchemaUtils;
import java.util.*;
import java.util.function.Supplier;
Expand Down Expand Up @@ -52,8 +54,6 @@ public class SparkTable implements Table, SupportsRead {
/** Snapshot created during connector setup */
private final Snapshot initialSnapshot;

private final Configuration hadoopConf;

private final SchemaProvider schemaProvider;
private final Optional<CatalogTable> catalogTable;

Expand Down Expand Up @@ -135,9 +135,10 @@ private SparkTable(
merged.putAll(userOptions);
this.options = Collections.unmodifiableMap(merged);

this.hadoopConf =
Configuration hadoopConf =
SparkSession.active().sessionState().newHadoopConfWithOptions(toScalaMap(options));
this.snapshotManager = new PathBasedSnapshotManager(tablePath, hadoopConf);
Engine kernelEngine = DefaultEngine.create(hadoopConf);
this.snapshotManager = SnapshotManagerFactory.create(tablePath, kernelEngine, catalogTable);
// Load the initial snapshot through the manager
this.initialSnapshot = snapshotManager.loadLatestSnapshot();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,12 @@ public class PathBasedSnapshotManager implements DeltaSnapshotManager {
private final Engine kernelEngine;

public PathBasedSnapshotManager(String tablePath, Configuration hadoopConf) {
this(tablePath, DefaultEngine.create(requireNonNull(hadoopConf, "hadoopConf is null")));
}

public PathBasedSnapshotManager(String tablePath, Engine kernelEngine) {
this.tablePath = requireNonNull(tablePath, "tablePath is null");
this.kernelEngine = DefaultEngine.create(requireNonNull(hadoopConf, "hadoopConf is null"));
this.kernelEngine = requireNonNull(kernelEngine, "kernelEngine is null");
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.spark.snapshot;

import io.delta.kernel.engine.Engine;
import io.delta.kernel.spark.snapshot.unitycatalog.UCManagedTableSnapshotManager;
import io.delta.kernel.spark.snapshot.unitycatalog.UCTableInfo;
import io.delta.kernel.spark.snapshot.unitycatalog.UCUtils;
import io.delta.kernel.unitycatalog.UCCatalogManagedClient;
import io.delta.storage.commit.uccommitcoordinator.UCClient;
import io.delta.storage.commit.uccommitcoordinator.UCTokenBasedRestClient;
import java.util.Optional;
import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;

/**
 * Static factory that selects the {@link DeltaSnapshotManager} implementation for a table.
 *
 * <p>Routing rules:
 *
 * <ul>
 *   <li>A catalog table from which {@link UCUtils#extractTableInfo} yields Unity Catalog table
 *       info is served by a {@link UCManagedTableSnapshotManager}.
 *   <li>Everything else (no catalog table, or a catalog table without UC info) is served by a
 *       {@link PathBasedSnapshotManager}.
 * </ul>
 */
@Experimental
public final class SnapshotManagerFactory {

  /** Not instantiable: this class only exposes static factory methods. */
  private SnapshotManagerFactory() {}

  /**
   * Builds the snapshot manager that matches the given table.
   *
   * @param tablePath the filesystem path to the Delta table; used by the path-based manager
   * @param kernelEngine the pre-configured Kernel {@link Engine} handed to the created manager
   * @param catalogTable optional Spark catalog table metadata; when present it is inspected for
   *     Unity Catalog table info using the active {@link SparkSession}
   * @return a {@link UCManagedTableSnapshotManager} when UC table info is found, otherwise a
   *     {@link PathBasedSnapshotManager}
   */
  public static DeltaSnapshotManager create(
      String tablePath, Engine kernelEngine, Optional<CatalogTable> catalogTable) {

    // The active session is only looked up when a catalog table is actually present.
    Optional<UCTableInfo> ucInfo =
        catalogTable.flatMap(table -> UCUtils.extractTableInfo(table, SparkSession.active()));

    if (!ucInfo.isPresent()) {
      // No catalog table, or a catalog table without UC metadata: use path-based handling.
      return new PathBasedSnapshotManager(tablePath, kernelEngine);
    }

    return newUCManagedSnapshotManager(ucInfo.get(), kernelEngine);
  }

  /**
   * Wires a UC REST client (built from the URI and token carried by {@code ucInfo}) into a
   * catalog-managed client and wraps it in a {@link UCManagedTableSnapshotManager}.
   */
  private static UCManagedTableSnapshotManager newUCManagedSnapshotManager(
      UCTableInfo ucInfo, Engine kernelEngine) {
    UCClient restClient = new UCTokenBasedRestClient(ucInfo.getUcUri(), ucInfo.getUcToken());
    return new UCManagedTableSnapshotManager(
        new UCCatalogManagedClient(restClient), ucInfo, kernelEngine);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
import io.delta.kernel.Snapshot;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.DeltaHistoryManager;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.files.ParsedCatalogCommitData;
import io.delta.kernel.spark.exception.VersionNotFoundException;
import io.delta.kernel.spark.snapshot.DeltaSnapshotManager;
import io.delta.kernel.unitycatalog.UCCatalogManagedClient;
import java.util.List;
import java.util.Optional;

/**
Expand Down Expand Up @@ -55,37 +59,122 @@ public UCManagedTableSnapshotManager(
this.engine = requireNonNull(engine, "engine is null");
}

/**
* Loads and returns the latest snapshot of the UC-managed Delta table.
*
* @return the latest snapshot of the table
*/
@Override
public Snapshot loadLatestSnapshot() {
throw new UnsupportedOperationException(
"UCManagedTableSnapshotManager.loadLatestSnapshot is not yet implemented");
return ucCatalogManagedClient.loadSnapshot(
engine,
tableId,
tablePath,
Optional.empty() /* versionOpt */,
Optional.empty() /* timestampOpt */);
}

@Override
public Snapshot loadSnapshotAt(long version) {
throw new UnsupportedOperationException(
"UCManagedTableSnapshotManager.loadSnapshotAt is not yet implemented");
return ucCatalogManagedClient.loadSnapshot(
engine, tableId, tablePath, Optional.of(version), Optional.empty() /* timestampOpt */);
}

/**
* Finds the active commit at a specific timestamp.
*
* <p>For UC-managed tables, this loads the latest snapshot and uses {@link
* DeltaHistoryManager#getActiveCommitAtTimestamp} to resolve the timestamp to a commit.
*
* @param timestampMillis the timestamp to find the version for in milliseconds since the unix
* epoch
* @param canReturnLastCommit whether we can return the latest version of the table if the
* provided timestamp is after the latest commit
* @param mustBeRecreatable whether the state at the returned commit should be recreatable
* @param canReturnEarliestCommit whether we can return the earliest version of the table if the
* provided timestamp is before the earliest commit
* @return the commit that was active at the specified timestamp
*/
@Override
public DeltaHistoryManager.Commit getActiveCommitAtTime(
long timestampMillis,
boolean canReturnLastCommit,
boolean mustBeRecreatable,
boolean canReturnEarliestCommit) {
throw new UnsupportedOperationException(
"UCManagedTableSnapshotManager.getActiveCommitAtTime is not yet implemented");
SnapshotImpl snapshot = (SnapshotImpl) loadLatestSnapshot();
List<ParsedCatalogCommitData> catalogCommits = snapshot.getLogSegment().getAllCatalogCommits();
return DeltaHistoryManager.getActiveCommitAtTimestamp(
engine,
snapshot,
snapshot.getLogPath(),
timestampMillis,
mustBeRecreatable,
canReturnLastCommit,
canReturnEarliestCommit,
catalogCommits);
}

/**
* Checks if a specific version exists and is accessible.
*
* <p>For UC-managed tables with catalogManaged, log files may be cleaned up, so we need to use
* DeltaHistoryManager to find the earliest available version based on filesystem state.
*
* @param version the version to check
* @param mustBeRecreatable whether the state at this version should be recreatable
* @param allowOutOfRange whether versions greater than the latest version are allowed without
* throwing an exception
* @throws VersionNotFoundException if the version is not available or does not meet the specified
* criteria
*/
@Override
public void checkVersionExists(long version, boolean mustBeRecreatable, boolean allowOutOfRange) {
throw new UnsupportedOperationException(
"UCManagedTableSnapshotManager.checkVersionExists is not yet implemented");
public void checkVersionExists(long version, boolean mustBeRecreatable, boolean allowOutOfRange)
throws VersionNotFoundException {
// Load latest to get the current version bounds
SnapshotImpl snapshot = (SnapshotImpl) loadLatestSnapshot();
// Latest version visible in this UC-managed snapshot.
long latestSnapshotVersion = snapshot.getVersion();

// Fast path: check upper bound before expensive filesystem operations
if ((version > latestSnapshotVersion) && !allowOutOfRange) {
throw new VersionNotFoundException(version, 0 /* earliest */, latestSnapshotVersion);
}

// Get the earliest version among catalog commits. This bounds the Kernel's filesystem search
// for the earliest available version (e.g., if catalog has v0, no filesystem search is needed).
List<ParsedCatalogCommitData> catalogCommits = snapshot.getLogSegment().getAllCatalogCommits();
Optional<Long> earliestCatalogCommitVersion =
catalogCommits.stream().map(ParsedCatalogCommitData::getVersion).min(Long::compare);

long earliestVersion =
mustBeRecreatable
? DeltaHistoryManager.getEarliestRecreatableCommit(
engine, snapshot.getLogPath(), earliestCatalogCommitVersion)
: DeltaHistoryManager.getEarliestDeltaFile(
engine, snapshot.getLogPath(), earliestCatalogCommitVersion);

if (version < earliestVersion) {
throw new VersionNotFoundException(version, earliestVersion, latestSnapshotVersion);
}
}

/**
* Gets a range of table changes (commits) between start and end versions.
*
* @param engine the engine implementation for executing operations
* @param startVersion the starting version (inclusive)
* @param endVersion optional ending version (inclusive); if not provided, extends to latest
* @return a CommitRange representing the specified range of commits
*/
@Override
public CommitRange getTableChanges(Engine engine, long startVersion, Optional<Long> endVersion) {
throw new UnsupportedOperationException(
"UCManagedTableSnapshotManager.getTableChanges is not yet implemented");
return ucCatalogManagedClient.loadCommitRange(
engine,
tableId,
tablePath,
Optional.of(startVersion) /* startVersionOpt */,
Optional.empty() /* startTimestampOpt */,
endVersion /* endVersionOpt */,
Optional.empty() /* endTimestampOpt */);
}
}
Loading
Loading