diff --git a/xtable-api/src/main/java/org/apache/xtable/model/InternalSnapshot.java b/xtable-api/src/main/java/org/apache/xtable/model/InternalSnapshot.java index 345fda78a..1963eba70 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/InternalSnapshot.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/InternalSnapshot.java @@ -23,6 +23,7 @@ import java.util.List; import lombok.Builder; +import lombok.NonNull; import lombok.Value; import org.apache.xtable.model.storage.PartitionFileGroup; @@ -47,4 +48,6 @@ public class InternalSnapshot { List partitionedDataFiles; // pending commits before latest commit on the table. @Builder.Default List pendingCommits = Collections.emptyList(); + // commit identifier in source table + @NonNull String sourceIdentifier; } diff --git a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java b/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java index 51f0ee0bb..fe3907eee 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java @@ -19,6 +19,7 @@ package org.apache.xtable.model; import lombok.Builder; +import lombok.NonNull; import lombok.Value; import org.apache.xtable.model.storage.DataFilesDiff; @@ -36,4 +37,7 @@ public class TableChange { /** The {@link InternalTable} at the commit time to which this table change belongs. 
*/ InternalTable tableAsOfChange; + + // Commit identifier in source table + @NonNull String sourceIdentifier; } diff --git a/xtable-api/src/main/java/org/apache/xtable/model/metadata/TableSyncMetadata.java b/xtable-api/src/main/java/org/apache/xtable/model/metadata/TableSyncMetadata.java index fa5eeaff8..d8c707916 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/metadata/TableSyncMetadata.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/metadata/TableSyncMetadata.java @@ -54,10 +54,30 @@ public class TableSyncMetadata { Instant lastInstantSynced; List instantsToConsiderForNextSync; int version; + String sourceTableFormat; + String sourceIdentifier; + /** + * @deprecated Use {@link #of(Instant, List, String, String)} instead. This method exists for + * backward compatibility and will be removed in a future version. + */ + @Deprecated public static TableSyncMetadata of( Instant lastInstantSynced, List instantsToConsiderForNextSync) { - return new TableSyncMetadata(lastInstantSynced, instantsToConsiderForNextSync, CURRENT_VERSION); + return TableSyncMetadata.of(lastInstantSynced, instantsToConsiderForNextSync, null, null); + } + + public static TableSyncMetadata of( + Instant lastInstantSynced, + List instantsToConsiderForNextSync, + String sourceTableFormat, + String sourceIdentifier) { + return new TableSyncMetadata( + lastInstantSynced, + instantsToConsiderForNextSync, + CURRENT_VERSION, + sourceTableFormat, + sourceIdentifier); } public String toJson() { diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java index 21f7f63f2..42d2b26b9 100644 --- a/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java +++ b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java @@ -87,4 +87,18 @@ public interface ConversionSource extends Closeable { * false. 
*/ boolean isIncrementalSyncSafeFrom(Instant instant); + + /** + * Extract the identifier of the provided commit. The identifier is defined as: + * + *
    + *
  • Snapshot ID in Iceberg + *
  • Version ID in Delta + *
  • Timestamp in Hudi + *
+ * + * @param commit The provided commit + * @return the string version of the commit identifier + */ + String getCommitIdentifier(COMMIT commit); } diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java b/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java index 736f49e42..476578cb6 100644 --- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java +++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java @@ -90,4 +90,14 @@ public interface ConversionTarget { /** Initializes the client with provided configuration */ void init(TargetTable targetTable, Configuration configuration); + + /** + * Retrieves the commit identifier from the target table that corresponds to a given source table + * commit identifier + * + * @param sourceIdentifier the unique identifier of the source table commit + * @return an {@link Optional} containing the target commit identifier if a corresponding commit + * exists, or an empty {@link Optional} if no match is found + */ + Optional getTargetCommitIdentifier(String sourceIdentifier); } diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java b/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java index bb3406696..78a22b764 100644 --- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java +++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java @@ -73,7 +73,8 @@ public Map syncSnapshot( internalTable, target -> target.syncFilesForSnapshot(snapshot.getPartitionedDataFiles()), startTime, - snapshot.getPendingCommits())); + snapshot.getPendingCommits(), + snapshot.getSourceIdentifier())); } catch (Exception e) { log.error("Failed to sync snapshot", e); results.put( @@ -121,7 +122,8 @@ public Map> syncChanges( change.getTableAsOfChange(), target -> target.syncFilesForDiff(change.getFilesDiff()), startTime, - changes.getPendingCommits())); + 
changes.getPendingCommits(), + change.getSourceIdentifier())); } catch (Exception e) { log.error("Failed to sync table changes", e); resultsForFormat.add(buildResultForError(SyncMode.INCREMENTAL, startTime, e)); @@ -149,19 +151,26 @@ private SyncResult getSyncResult( InternalTable tableState, SyncFiles fileSyncMethod, Instant startTime, - List pendingCommits) { + List pendingCommits, + String sourceIdentifier) { // initialize the sync conversionTarget.beginSync(tableState); + // Persist the latest commit time in table properties for incremental syncs + // Syncing metadata must precede the following steps to ensure that the metadata is available + // before committing + TableSyncMetadata latestState = + TableSyncMetadata.of( + tableState.getLatestCommitTime(), + pendingCommits, + tableState.getTableFormat(), + sourceIdentifier); + conversionTarget.syncMetadata(latestState); // sync schema updates conversionTarget.syncSchema(tableState.getReadSchema()); // sync partition updates conversionTarget.syncPartitionSpec(tableState.getPartitioningFields()); // Update the files in the target table fileSyncMethod.sync(conversionTarget); - // Persist the latest commit time in table properties for incremental syncs. 
- TableSyncMetadata latestState = - TableSyncMetadata.of(tableState.getLatestCommitTime(), pendingCommits); - conversionTarget.syncMetadata(latestState); conversionTarget.completeSync(); return SyncResult.builder() diff --git a/xtable-api/src/test/java/org/apache/xtable/model/metadata/TestTableSyncMetadata.java b/xtable-api/src/test/java/org/apache/xtable/model/metadata/TestTableSyncMetadata.java index 3dcc6fbe0..56cce262a 100644 --- a/xtable-api/src/test/java/org/apache/xtable/model/metadata/TestTableSyncMetadata.java +++ b/xtable-api/src/test/java/org/apache/xtable/model/metadata/TestTableSyncMetadata.java @@ -44,6 +44,7 @@ void jsonRoundTrip(TableSyncMetadata metadata, String expectedJson) { private static Stream provideMetadataAndJson() { return Stream.of( + // Old version of metadata and JSON Arguments.of( TableSyncMetadata.of( Instant.parse("2020-07-04T10:15:30.00Z"), @@ -56,7 +57,24 @@ private static Stream provideMetadataAndJson() { "{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[],\"version\":0}"), Arguments.of( TableSyncMetadata.of(Instant.parse("2020-07-04T10:15:30.00Z"), null), - "{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"version\":0}")); + "{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"version\":0}"), + // New version of metadata and JSON with `sourceTableFormat` and `sourceIdentifier` fields + Arguments.of( + TableSyncMetadata.of( + Instant.parse("2020-07-04T10:15:30.00Z"), + Arrays.asList( + Instant.parse("2020-08-21T11:15:30.00Z"), + Instant.parse("2024-01-21T12:15:30.00Z")), + "TEST", + "0"), + "{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[\"2020-08-21T11:15:30Z\",\"2024-01-21T12:15:30Z\"],\"version\":0,\"sourceTableFormat\":\"TEST\",\"sourceIdentifier\":\"0\"}"), + Arguments.of( + TableSyncMetadata.of( + Instant.parse("2020-07-04T10:15:30.00Z"), Collections.emptyList(), "TEST", "0"), + 
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[],\"version\":0,\"sourceTableFormat\":\"TEST\",\"sourceIdentifier\":\"0\"}"), + Arguments.of( + TableSyncMetadata.of(Instant.parse("2020-07-04T10:15:30.00Z"), null, "TEST", "0"), + "{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"version\":0,\"sourceTableFormat\":\"TEST\",\"sourceIdentifier\":\"0\"}")); } @Test diff --git a/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java b/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java index 8503057db..5c7f8cfcf 100644 --- a/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java +++ b/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java @@ -53,7 +53,11 @@ public void extractSnapshot() { InternalTable table = InternalTable.builder().latestCommitTime(Instant.now()).build(); List dataFiles = Collections.emptyList(); InternalSnapshot internalSnapshot = - InternalSnapshot.builder().table(table).partitionedDataFiles(dataFiles).build(); + InternalSnapshot.builder() + .table(table) + .partitionedDataFiles(dataFiles) + .sourceIdentifier("0") + .build(); when(mockConversionSource.getCurrentSnapshot()).thenReturn(internalSnapshot); assertEquals(internalSnapshot, ExtractFromSource.of(mockConversionSource).extractSnapshot()); } @@ -86,6 +90,7 @@ public void extractTableChanges() { .tableAsOfChange(tableAtFirstInstant) .filesDiff( DataFilesDiff.builder().fileAdded(newFile1).fileRemoved(initialFile2).build()) + .sourceIdentifier("0") .build(); when(mockConversionSource.getTableChangeForCommit(firstCommitToSync)) .thenReturn(tableChangeToReturnAtFirstInstant); @@ -94,6 +99,7 @@ public void extractTableChanges() { .tableAsOfChange(tableAtFirstInstant) .filesDiff( DataFilesDiff.builder().fileAdded(newFile1).fileRemoved(initialFile2).build()) + .sourceIdentifier("0") .build(); // add 2 new files, remove 2 files @@ -110,6 +116,7 @@ 
public void extractTableChanges() { .filesAdded(Arrays.asList(newFile2, newFile3)) .filesRemoved(Arrays.asList(initialFile3, newFile1)) .build()) + .sourceIdentifier("1") .build(); when(mockConversionSource.getTableChangeForCommit(secondCommitToSync)) .thenReturn(tableChangeToReturnAtSecondInstant); @@ -121,6 +128,7 @@ public void extractTableChanges() { .filesAdded(Arrays.asList(newFile2, newFile3)) .filesRemoved(Arrays.asList(initialFile3, newFile1)) .build()) + .sourceIdentifier("1") .build(); IncrementalTableChanges actual = diff --git a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java index 39480f8b0..852d4e13f 100644 --- a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java +++ b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java @@ -76,6 +76,7 @@ void syncSnapshotWithFailureForOneFormat() { .table(startingTableState) .partitionedDataFiles(fileGroups) .pendingCommits(pendingCommitInstants) + .sourceIdentifier("0") .build(); when(mockConversionTarget1.getTableFormat()).thenReturn(TableFormat.ICEBERG); when(mockConversionTarget2.getTableFormat()).thenReturn(TableFormat.DELTA); @@ -109,7 +110,10 @@ void syncSnapshotWithFailureForOneFormat() { failureResult.getTableFormatSyncStatus()); verifyBaseConversionTargetCalls( - mockConversionTarget2, startingTableState, pendingCommitInstants); + mockConversionTarget2, + startingTableState, + pendingCommitInstants, + snapshot.getSourceIdentifier()); verify(mockConversionTarget2).syncFilesForSnapshot(fileGroups); verify(mockConversionTarget2).completeSync(); verify(mockConversionTarget1, never()).completeSync(); @@ -127,15 +131,27 @@ void syncChangesWithFailureForOneFormat() { InternalTable tableState1 = getTableState(1); DataFilesDiff dataFilesDiff1 = getFilesDiff(1); TableChange tableChange1 = - 
TableChange.builder().tableAsOfChange(tableState1).filesDiff(dataFilesDiff1).build(); + TableChange.builder() + .tableAsOfChange(tableState1) + .filesDiff(dataFilesDiff1) + .sourceIdentifier("0") + .build(); InternalTable tableState2 = getTableState(2); DataFilesDiff dataFilesDiff2 = getFilesDiff(2); TableChange tableChange2 = - TableChange.builder().tableAsOfChange(tableState2).filesDiff(dataFilesDiff2).build(); + TableChange.builder() + .tableAsOfChange(tableState2) + .filesDiff(dataFilesDiff2) + .sourceIdentifier("1") + .build(); InternalTable tableState3 = getTableState(3); DataFilesDiff dataFilesDiff3 = getFilesDiff(3); TableChange tableChange3 = - TableChange.builder().tableAsOfChange(tableState3).filesDiff(dataFilesDiff3).build(); + TableChange.builder() + .tableAsOfChange(tableState3) + .filesDiff(dataFilesDiff3) + .sourceIdentifier("2") + .build(); List pendingCommitInstants = Collections.singletonList(Instant.now()); when(mockConversionTarget1.getTableFormat()).thenReturn(TableFormat.ICEBERG); @@ -154,10 +170,12 @@ void syncChangesWithFailureForOneFormat() { Map conversionTargetWithMetadata = new HashMap<>(); conversionTargetWithMetadata.put( mockConversionTarget1, - TableSyncMetadata.of(Instant.now().minus(1, ChronoUnit.HOURS), Collections.emptyList())); + TableSyncMetadata.of( + Instant.now().minus(1, ChronoUnit.HOURS), Collections.emptyList(), "TEST", "0")); conversionTargetWithMetadata.put( mockConversionTarget2, - TableSyncMetadata.of(Instant.now().minus(1, ChronoUnit.HOURS), Collections.emptyList())); + TableSyncMetadata.of( + Instant.now().minus(1, ChronoUnit.HOURS), Collections.emptyList(), "TEST", "1")); Map> result = TableFormatSync.getInstance() @@ -201,13 +219,29 @@ void syncChangesWithFailureForOneFormat() { assertSyncResultTimes(successResults.get(i), start); } - verifyBaseConversionTargetCalls(mockConversionTarget1, tableState1, pendingCommitInstants); + verifyBaseConversionTargetCalls( + mockConversionTarget1, + tableState1, + 
pendingCommitInstants, + tableChange1.getSourceIdentifier()); verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff1); - verifyBaseConversionTargetCalls(mockConversionTarget2, tableState1, pendingCommitInstants); + verifyBaseConversionTargetCalls( + mockConversionTarget2, + tableState1, + pendingCommitInstants, + tableChange1.getSourceIdentifier()); verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff1); - verifyBaseConversionTargetCalls(mockConversionTarget2, tableState2, pendingCommitInstants); + verifyBaseConversionTargetCalls( + mockConversionTarget2, + tableState2, + pendingCommitInstants, + tableChange2.getSourceIdentifier()); verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff2); - verifyBaseConversionTargetCalls(mockConversionTarget2, tableState3, pendingCommitInstants); + verifyBaseConversionTargetCalls( + mockConversionTarget2, + tableState3, + pendingCommitInstants, + tableChange3.getSourceIdentifier()); verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff3); verify(mockConversionTarget1, times(1)).completeSync(); verify(mockConversionTarget2, times(3)).completeSync(); @@ -219,15 +253,27 @@ void syncChangesWithDifferentFormatsAndMetadata() { InternalTable tableState1 = getTableState(1); DataFilesDiff dataFilesDiff1 = getFilesDiff(1); TableChange tableChange1 = - TableChange.builder().tableAsOfChange(tableState1).filesDiff(dataFilesDiff1).build(); + TableChange.builder() + .tableAsOfChange(tableState1) + .filesDiff(dataFilesDiff1) + .sourceIdentifier("0") + .build(); InternalTable tableState2 = getTableState(2); DataFilesDiff dataFilesDiff2 = getFilesDiff(2); TableChange tableChange2 = - TableChange.builder().tableAsOfChange(tableState2).filesDiff(dataFilesDiff2).build(); + TableChange.builder() + .tableAsOfChange(tableState2) + .filesDiff(dataFilesDiff2) + .sourceIdentifier("1") + .build(); InternalTable tableState3 = getTableState(3); DataFilesDiff dataFilesDiff3 = getFilesDiff(3); TableChange tableChange3 = - 
TableChange.builder().tableAsOfChange(tableState3).filesDiff(dataFilesDiff3).build(); + TableChange.builder() + .tableAsOfChange(tableState3) + .filesDiff(dataFilesDiff3) + .sourceIdentifier("2") + .build(); List pendingCommitInstants = Collections.singletonList(Instant.now()); when(mockConversionTarget1.getTableFormat()).thenReturn(TableFormat.ICEBERG); @@ -247,12 +293,17 @@ void syncChangesWithDifferentFormatsAndMetadata() { mockConversionTarget1, TableSyncMetadata.of( tableChange2.getTableAsOfChange().getLatestCommitTime(), - Collections.singletonList(tableChange1.getTableAsOfChange().getLatestCommitTime()))); + Collections.singletonList(tableChange1.getTableAsOfChange().getLatestCommitTime()), + "TEST", + tableChange2.getSourceIdentifier())); // mockConversionTarget2 will have synced the first table change previously conversionTargetWithMetadata.put( mockConversionTarget2, TableSyncMetadata.of( - tableChange1.getTableAsOfChange().getLatestCommitTime(), Collections.emptyList())); + tableChange1.getTableAsOfChange().getLatestCommitTime(), + Collections.emptyList(), + "TEST", + tableChange1.getSourceIdentifier())); Map> result = TableFormatSync.getInstance() @@ -290,15 +341,31 @@ void syncChangesWithDifferentFormatsAndMetadata() { } // conversionTarget1 syncs table changes 1 and 3 - verifyBaseConversionTargetCalls(mockConversionTarget1, tableState1, pendingCommitInstants); + verifyBaseConversionTargetCalls( + mockConversionTarget1, + tableState1, + pendingCommitInstants, + tableChange1.getSourceIdentifier()); verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff1); - verifyBaseConversionTargetCalls(mockConversionTarget1, tableState3, pendingCommitInstants); + verifyBaseConversionTargetCalls( + mockConversionTarget1, + tableState3, + pendingCommitInstants, + tableChange3.getSourceIdentifier()); verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff3); verify(mockConversionTarget1, times(2)).completeSync(); // conversionTarget2 syncs table changes 2 and 
3 - verifyBaseConversionTargetCalls(mockConversionTarget2, tableState2, pendingCommitInstants); + verifyBaseConversionTargetCalls( + mockConversionTarget2, + tableState2, + pendingCommitInstants, + tableChange2.getSourceIdentifier()); verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff2); - verifyBaseConversionTargetCalls(mockConversionTarget2, tableState3, pendingCommitInstants); + verifyBaseConversionTargetCalls( + mockConversionTarget2, + tableState3, + pendingCommitInstants, + tableChange3.getSourceIdentifier()); verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff3); verify(mockConversionTarget2, times(2)).completeSync(); } @@ -309,7 +376,11 @@ void syncChangesOneFormatWithNoRequiredChanges() { InternalTable tableState1 = getTableState(1); DataFilesDiff dataFilesDiff1 = getFilesDiff(1); TableChange tableChange1 = - TableChange.builder().tableAsOfChange(tableState1).filesDiff(dataFilesDiff1).build(); + TableChange.builder() + .tableAsOfChange(tableState1) + .filesDiff(dataFilesDiff1) + .sourceIdentifier("0") + .build(); List pendingCommitInstants = Collections.emptyList(); when(mockConversionTarget1.getTableFormat()).thenReturn(TableFormat.ICEBERG); @@ -325,11 +396,13 @@ void syncChangesOneFormatWithNoRequiredChanges() { Map conversionTargetWithMetadata = new HashMap<>(); // mockConversionTarget1 will have nothing to sync conversionTargetWithMetadata.put( - mockConversionTarget1, TableSyncMetadata.of(Instant.now(), Collections.emptyList())); + mockConversionTarget1, + TableSyncMetadata.of(Instant.now(), Collections.emptyList(), "TEST", "0")); // mockConversionTarget2 will have synced the first table change previously conversionTargetWithMetadata.put( mockConversionTarget2, - TableSyncMetadata.of(Instant.now().minus(1, ChronoUnit.HOURS), Collections.emptyList())); + TableSyncMetadata.of( + Instant.now().minus(1, ChronoUnit.HOURS), Collections.emptyList(), "TEST", "1")); Map> result = TableFormatSync.getInstance() @@ -348,7 +421,11 @@ void 
syncChangesOneFormatWithNoRequiredChanges() { verify(mockConversionTarget1, never()).syncFilesForDiff(any()); verify(mockConversionTarget1, never()).completeSync(); - verifyBaseConversionTargetCalls(mockConversionTarget2, tableState1, pendingCommitInstants); + verifyBaseConversionTargetCalls( + mockConversionTarget2, + tableState1, + pendingCommitInstants, + tableChange1.getSourceIdentifier()); verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff1); } @@ -389,12 +466,17 @@ private DataFilesDiff getFilesDiff(int id) { private void verifyBaseConversionTargetCalls( ConversionTarget mockConversionTarget, InternalTable startingTableState, - List pendingCommitInstants) { + List pendingCommitInstants, + String sourceIdentifier) { verify(mockConversionTarget).beginSync(startingTableState); verify(mockConversionTarget).syncSchema(startingTableState.getReadSchema()); verify(mockConversionTarget).syncPartitionSpec(startingTableState.getPartitioningFields()); verify(mockConversionTarget) .syncMetadata( - TableSyncMetadata.of(startingTableState.getLatestCommitTime(), pendingCommitInstants)); + TableSyncMetadata.of( + startingTableState.getLatestCommitTime(), + pendingCommitInstants, + startingTableState.getTableFormat(), + sourceIdentifier)); } } diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java index 140eb8adc..de4a2b696 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java @@ -98,6 +98,7 @@ public InternalSnapshot getCurrentSnapshot() { return InternalSnapshot.builder() .table(table) .partitionedDataFiles(getInternalDataFiles(snapshot, table.getReadSchema())) + .sourceIdentifier(getCommitIdentifier(snapshot.version())) .build(); } @@ -167,7 +168,11 @@ public TableChange getTableChangeForCommit(Long versionNumber) { 
.filesAdded(addedFiles.values()) .filesRemoved(removedFiles.values()) .build(); - return TableChange.builder().tableAsOfChange(tableAtVersion).filesDiff(dataFilesDiff).build(); + return TableChange.builder() + .tableAsOfChange(tableAtVersion) + .filesDiff(dataFilesDiff) + .sourceIdentifier(getCommitIdentifier(versionNumber)) + .build(); } @Override @@ -200,6 +205,11 @@ public boolean isIncrementalSyncSafeFrom(Instant instant) { return deltaCommitInstant.equals(instant) || deltaCommitInstant.isBefore(instant); } + @Override + public String getCommitIdentifier(Long commit) { + return String.valueOf(commit); + } + private DeltaIncrementalChangesState getChangesState() { return deltaIncrementalChangesState.orElseThrow( () -> new IllegalStateException("DeltaIncrementalChangesState is not initialized")); diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java index b34fa4491..d91941847 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -30,6 +31,7 @@ import lombok.Getter; import lombok.Setter; import lombok.ToString; +import lombok.extern.log4j.Log4j2; import org.apache.hadoop.conf.Configuration; import org.apache.spark.sql.SparkSession; @@ -41,14 +43,17 @@ import org.apache.spark.sql.delta.DeltaLog; import org.apache.spark.sql.delta.DeltaOperations; import org.apache.spark.sql.delta.OptimisticTransaction; +import org.apache.spark.sql.delta.Snapshot; import org.apache.spark.sql.delta.actions.Action; import org.apache.spark.sql.delta.actions.AddFile; +import org.apache.spark.sql.delta.actions.CommitInfo; import 
org.apache.spark.sql.delta.actions.Format; import org.apache.spark.sql.delta.actions.Metadata; import org.apache.spark.sql.delta.actions.RemoveFile; import scala.Option; import scala.Some; +import scala.Tuple2; import scala.collection.JavaConverters; import scala.collection.Seq; @@ -65,6 +70,7 @@ import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.spi.sync.ConversionTarget; +@Log4j2 public class DeltaConversionTarget implements ConversionTarget { private static final String MIN_READER_VERSION = String.valueOf(1); // gets access to generated columns. @@ -215,6 +221,58 @@ public String getTableFormat() { return TableFormat.DELTA; } + @Override + public Optional getTargetCommitIdentifier(String sourceIdentifier) { + Snapshot currentSnapshot = deltaLog.currentSnapshot().snapshot(); + + Iterator>> versionIterator = + JavaConverters.asJavaIteratorConverter( + deltaLog.getChanges(currentSnapshot.version(), false)) + .asJava(); + while (versionIterator.hasNext()) { + Tuple2> currentChange = versionIterator.next(); + Long targetVersion = currentSnapshot.version(); + List actions = JavaConverters.seqAsJavaListConverter(currentChange._2()).asJava(); + + // Find the CommitInfo in the changes belongs to certain version + Optional commitInfo = + actions.stream() + .filter(action -> action instanceof CommitInfo) + .map(action -> (CommitInfo) action) + .findFirst(); + if (!commitInfo.isPresent()) { + continue; + } + + Option> tags = commitInfo.get().tags(); + if (tags.isEmpty()) { + continue; + } + + Option sourceMetadataJson = tags.get().get(TableSyncMetadata.XTABLE_METADATA); + if (sourceMetadataJson.isEmpty()) { + continue; + } + + try { + Optional optionalMetadata = + TableSyncMetadata.fromJson(sourceMetadataJson.get()); + if (!optionalMetadata.isPresent()) { + continue; + } + + TableSyncMetadata metadata = optionalMetadata.get(); + if (sourceIdentifier.equals(metadata.getSourceIdentifier())) { + return Optional.of(String.valueOf(targetVersion)); + } + 
} catch (Exception e) { + log.warn("Failed to parse commit metadata for commit: {}", targetVersion, e); + } + } + + return Optional.empty(); + } + @EqualsAndHashCode @ToString private class TransactionState { @@ -264,7 +322,8 @@ private void commitTransaction() { transaction.updateMetadata(metadata, false); transaction.commit( actions, - new DeltaOperations.Update(Option.apply(Literal.fromObject("xtable-delta-sync")))); + new DeltaOperations.Update(Option.apply(Literal.fromObject("xtable-delta-sync"))), + ScalaUtils.convertJavaMapToScala(getCommitTags())); } private Map getConfigurationsForDeltaSync() { @@ -300,5 +359,9 @@ private Format getFileFormat() { // fallback to existing deltalog value return deltaLog.snapshot().metadata().format(); } + + private Map getCommitTags() { + return Collections.singletonMap(TableSyncMetadata.XTABLE_METADATA, metadata.toJson()); + } } } diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java index 02423c2d6..cb65c3411 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java @@ -114,6 +114,7 @@ public InternalSnapshot getCurrentSnapshot() { hoodieInstant -> HudiInstantUtils.parseFromInstantTime(hoodieInstant.getTimestamp())) .collect(CustomCollectors.toList(pendingInstants.size()))) + .sourceIdentifier(getCommitIdentifier(latestCommit)) .build(); } @@ -130,6 +131,7 @@ public TableChange getTableChangeForCommit(HoodieInstant hoodieInstantForDiff) { .filesDiff( dataFileExtractor.getDiffForCommit( hoodieInstantForDiff, table, hoodieInstantForDiff, visibleTimeline)) + .sourceIdentifier(getCommitIdentifier(hoodieInstantForDiff)) .build(); } @@ -161,6 +163,11 @@ public boolean isIncrementalSyncSafeFrom(Instant instant) { return doesCommitExistsAsOfInstant(instant) && !isAffectedByCleanupProcess(instant); } + @Override + 
public String getCommitIdentifier(HoodieInstant commit) { + return commit.getTimestamp(); + } + private boolean doesCommitExistsAsOfInstant(Instant instant) { HoodieInstant hoodieInstant = getCommitAtInstant(instant); return hoodieInstant != null; diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java index c3ef6f922..97c2f9e36 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java @@ -54,13 +54,13 @@ import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileGroup; -import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.table.timeline.TimelineUtils; import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.ExternalFilePathUtil; @@ -287,28 +287,7 @@ public Optional getTableMetadata() { .filterCompletedInstants() .lastInstant() .toJavaOptional() - .map( - instant -> { - try { - if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) { - return HoodieReplaceCommitMetadata.fromBytes( - client.getActiveTimeline().getInstantDetails(instant).get(), - HoodieReplaceCommitMetadata.class) - .getExtraMetadata(); - } else { - return HoodieCommitMetadata.fromBytes( - client.getActiveTimeline().getInstantDetails(instant).get(), - 
HoodieCommitMetadata.class) - .getExtraMetadata(); - } - } catch (IOException ex) { - throw new ReadException("Unable to read Hudi commit metadata", ex); - } - }) - .flatMap( - metadata -> - TableSyncMetadata.fromJson( - metadata.get(TableSyncMetadata.XTABLE_METADATA)))); + .flatMap(instant -> getMetadata(instant, client))); } @Override @@ -316,11 +295,56 @@ public String getTableFormat() { return TableFormat.HUDI; } + @Override + public Optional getTargetCommitIdentifier(String sourceIdentifier) { + if (!metaClient.isPresent()) { + return Optional.empty(); + } + return getTargetCommitIdentifier(sourceIdentifier, metaClient.get()); + } + + Optional getTargetCommitIdentifier( + String sourceIdentifier, HoodieTableMetaClient metaClient) { + + HoodieTimeline commitTimeline = metaClient.getCommitsTimeline(); + + for (HoodieInstant instant : commitTimeline.getInstants()) { + try { + Optional optionalMetadata = getMetadata(instant, metaClient); + if (!optionalMetadata.isPresent()) { + continue; + } + + TableSyncMetadata metadata = optionalMetadata.get(); + if (sourceIdentifier.equals(metadata.getSourceIdentifier())) { + return Optional.of(instant.getTimestamp()); + } + } catch (Exception e) { + log.warn("Failed to parse commit metadata for instant: {}", instant, e); + } + } + return Optional.empty(); + } + private HoodieTableMetaClient getMetaClient() { return metaClient.orElseThrow( () -> new IllegalStateException("beginSync must be called before calling this method")); } + private Optional getMetadata( + HoodieInstant instant, HoodieTableMetaClient metaClient) { + try { + HoodieCommitMetadata commitMetadata = + TimelineUtils.getCommitMetadata(instant, metaClient.getActiveTimeline()); + String sourceMetadataJson = + commitMetadata.getExtraMetadata().get(TableSyncMetadata.XTABLE_METADATA); + return TableSyncMetadata.fromJson(sourceMetadataJson); + } catch (Exception e) { + log.warn("Failed to parse commit metadata for instant: {}", instant, e); + return Optional.empty(); 
+ } + } + static class CommitState { private HoodieTableMetaClient metaClient; @Getter private final String instantTime; diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java index 0d400e28b..cb75b6d06 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java @@ -165,6 +165,7 @@ public InternalSnapshot getCurrentSnapshot() { .version(String.valueOf(currentSnapshot.snapshotId())) .table(irTable) .partitionedDataFiles(partitionedDataFiles) + .sourceIdentifier(getCommitIdentifier(currentSnapshot)) .build(); } @@ -196,7 +197,11 @@ public TableChange getTableChangeForCommit(Snapshot snapshot) { DataFilesDiff.builder().filesAdded(dataFilesAdded).filesRemoved(dataFilesRemoved).build(); InternalTable table = getTable(snapshot); - return TableChange.builder().tableAsOfChange(table).filesDiff(filesDiff).build(); + return TableChange.builder() + .tableAsOfChange(table) + .filesDiff(filesDiff) + .sourceIdentifier(getCommitIdentifier(snapshot)) + .build(); } @Override @@ -265,6 +270,11 @@ public boolean isIncrementalSyncSafeFrom(Instant instant) { return true; } + @Override + public String getCommitIdentifier(Snapshot commit) { + return String.valueOf(commit.snapshotId()); + } + @Override public void close() { getTableOps().close(); diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java index ecdbfa261..69faf1eaf 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java @@ -21,6 +21,7 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.List; +import java.util.Map; 
import java.util.Optional; import lombok.extern.log4j.Log4j2; @@ -66,6 +67,7 @@ public class IcebergConversionTarget implements ConversionTarget { private Transaction transaction; private Table table; private InternalTable internalTableState; + private TableSyncMetadata tableSyncMetadata; public IcebergConversionTarget() {} @@ -174,6 +176,8 @@ public void syncPartitionSpec(List partitionSpec) { @Override public void syncMetadata(TableSyncMetadata metadata) { + tableSyncMetadata = metadata; + UpdateProperties updateProperties = transaction.updateProperties(); updateProperties.set(TableSyncMetadata.XTABLE_METADATA, metadata.toJson()); if (!table.properties().containsKey(TableProperties.WRITE_DATA_LOCATION)) { @@ -200,13 +204,18 @@ public void syncFilesForSnapshot(List partitionedDataFiles) transaction, partitionedDataFiles, transaction.table().schema(), - transaction.table().spec()); + transaction.table().spec(), + tableSyncMetadata); } @Override public void syncFilesForDiff(DataFilesDiff dataFilesDiff) { dataFileUpdatesExtractor.applyDiff( - transaction, dataFilesDiff, transaction.table().schema(), transaction.table().spec()); + transaction, + dataFilesDiff, + transaction.table().schema(), + transaction.table().spec(), + tableSyncMetadata); } @Override @@ -221,6 +230,7 @@ public void completeSync() { transaction.commitTransaction(); transaction = null; internalTableState = null; + tableSyncMetadata = null; } private void safeDelete(String file) { @@ -242,6 +252,33 @@ public String getTableFormat() { return TableFormat.ICEBERG; } + @Override + public Optional getTargetCommitIdentifier(String sourceIdentifier) { + for (Snapshot snapshot : table.snapshots()) { + Map summary = snapshot.summary(); + String sourceMetadataJson = summary.get(TableSyncMetadata.XTABLE_METADATA); + if (sourceMetadataJson == null) { + continue; + } + + try { + Optional optionalMetadata = + TableSyncMetadata.fromJson(sourceMetadataJson); + if (!optionalMetadata.isPresent()) { + continue; + } + + 
TableSyncMetadata metadata = optionalMetadata.get(); + if (sourceIdentifier.equals(metadata.getSourceIdentifier())) { + return Optional.of(String.valueOf(snapshot.snapshotId())); + } + } catch (Exception e) { + log.warn("Failed to parse snapshot metadata for {}", snapshot.snapshotId(), e); + } + } + return Optional.empty(); + } + private void rollbackCorruptCommits() { if (table == null) { // there is no existing table so exit early diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java index 80e1559fb..0c8fa9a49 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java @@ -30,6 +30,7 @@ import org.apache.xtable.exception.NotSupportedException; import org.apache.xtable.exception.ReadException; import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.metadata.TableSyncMetadata; import org.apache.xtable.model.storage.DataFilesDiff; import org.apache.xtable.model.storage.FilesDiff; import org.apache.xtable.model.storage.InternalDataFile; @@ -46,7 +47,8 @@ public void applySnapshot( Transaction transaction, List partitionedDataFiles, Schema schema, - PartitionSpec partitionSpec) { + PartitionSpec partitionSpec, + TableSyncMetadata metadata) { Map previousFiles = new HashMap<>(); try (CloseableIterable iterator = table.newScan().planFiles()) { @@ -60,21 +62,24 @@ public void applySnapshot( FilesDiff diff = DataFilesDiff.findNewAndRemovedFiles(partitionedDataFiles, previousFiles); - applyDiff(transaction, diff.getFilesAdded(), diff.getFilesRemoved(), schema, partitionSpec); + applyDiff( + transaction, diff.getFilesAdded(), diff.getFilesRemoved(), schema, partitionSpec, metadata); } public void applyDiff( Transaction transaction, DataFilesDiff dataFilesDiff, Schema schema, - PartitionSpec 
partitionSpec) { + PartitionSpec partitionSpec, + TableSyncMetadata metadata) { Collection filesRemoved = dataFilesDiff.getFilesRemoved().stream() .map(file -> getDataFile(partitionSpec, schema, file)) .collect(Collectors.toList()); - applyDiff(transaction, dataFilesDiff.getFilesAdded(), filesRemoved, schema, partitionSpec); + applyDiff( + transaction, dataFilesDiff.getFilesAdded(), filesRemoved, schema, partitionSpec, metadata); } private void applyDiff( @@ -82,10 +87,12 @@ private void applyDiff( Collection filesAdded, Collection filesRemoved, Schema schema, - PartitionSpec partitionSpec) { + PartitionSpec partitionSpec, + TableSyncMetadata metadata) { OverwriteFiles overwriteFiles = transaction.newOverwrite(); filesAdded.forEach(f -> overwriteFiles.addFile(getDataFile(partitionSpec, schema, f))); filesRemoved.forEach(overwriteFiles::deleteFile); + overwriteFiles.set(TableSyncMetadata.XTABLE_METADATA, metadata.toJson()); overwriteFiles.commit(); } diff --git a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java index 0f34103e8..9bf8f0103 100644 --- a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java @@ -94,7 +94,7 @@ public class TestConversionController { void testAllSnapshotSyncAsPerConfig() { SyncMode syncMode = SyncMode.FULL; InternalTable internalTable = getInternalTable(); - InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1"); + InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1", "0"); Instant instantBeforeHour = Instant.now().minus(Duration.ofHours(1)); SyncResult syncResult = buildSyncResult(syncMode, instantBeforeHour); Map perTableResults = new HashMap<>(); @@ -172,16 +172,19 @@ void testAllIncrementalSyncAsPerConfigAndNoFallbackNecessary() { CommitsBacklog commitsBacklog = 
CommitsBacklog.builder().commitsToProcess(instantsToProcess).build(); Optional conversionTarget1Metadata = - Optional.of(TableSyncMetadata.of(icebergLastSyncInstant, pendingInstantsForIceberg)); + Optional.of( + TableSyncMetadata.of(icebergLastSyncInstant, pendingInstantsForIceberg, "TEST", "0")); when(mockConversionTarget1.getTableMetadata()).thenReturn(conversionTarget1Metadata); Optional conversionTarget2Metadata = - Optional.of(TableSyncMetadata.of(deltaLastSyncInstant, pendingInstantsForDelta)); + Optional.of( + TableSyncMetadata.of(deltaLastSyncInstant, pendingInstantsForDelta, "TEST", "0")); when(mockConversionTarget2.getTableMetadata()).thenReturn(conversionTarget2Metadata); when(mockConversionSource.getCommitsBacklog(instantsForIncrementalSync)) .thenReturn(commitsBacklog); List tableChanges = new ArrayList<>(); - for (Instant instant : instantsToProcess) { - TableChange tableChange = getTableChange(instant); + for (int i = 0; i < instantsToProcess.size(); i++) { + Instant instant = instantsToProcess.get(i); + TableChange tableChange = getTableChange(instant, String.valueOf(i)); tableChanges.add(tableChange); when(mockConversionSource.getTableChangeForCommit(instant)).thenReturn(tableChange); } @@ -224,7 +227,7 @@ void testIncrementalSyncFallBackToSnapshotForAllFormats() { SyncMode syncMode = SyncMode.INCREMENTAL; InternalTable internalTable = getInternalTable(); Instant instantBeforeHour = Instant.now().minus(Duration.ofHours(1)); - InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1"); + InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1", "0"); SyncResult syncResult = buildSyncResult(syncMode, instantBeforeHour); Map syncResults = new HashMap<>(); syncResults.put(TableFormat.ICEBERG, syncResult); @@ -247,9 +250,11 @@ void testIncrementalSyncFallBackToSnapshotForAllFormats() { // Both Iceberg and Delta last synced at instantAt5 and have no pending instants. 
when(mockConversionTarget1.getTableMetadata()) - .thenReturn(Optional.of(TableSyncMetadata.of(instantAt5, Collections.emptyList()))); + .thenReturn( + Optional.of(TableSyncMetadata.of(instantAt5, Collections.emptyList(), "TEST", "0"))); when(mockConversionTarget2.getTableMetadata()) - .thenReturn(Optional.of(TableSyncMetadata.of(instantAt5, Collections.emptyList()))); + .thenReturn( + Optional.of(TableSyncMetadata.of(instantAt5, Collections.emptyList(), "TEST", "0"))); when(mockConversionSource.getCurrentSnapshot()).thenReturn(internalSnapshot); when(tableFormatSync.syncSnapshot( @@ -309,22 +314,26 @@ void testIncrementalSyncFallbackToSnapshotForOnlySingleFormat() { CommitsBacklog.builder().commitsToProcess(instantsToProcess).build(); when(mockConversionTarget1.getTableMetadata()) .thenReturn( - Optional.of(TableSyncMetadata.of(icebergLastSyncInstant, pendingInstantsForIceberg))); + Optional.of( + TableSyncMetadata.of( + icebergLastSyncInstant, pendingInstantsForIceberg, "TEST", "0"))); Optional conversionTarget2Metadata = - Optional.of(TableSyncMetadata.of(deltaLastSyncInstant, pendingInstantsForDelta)); + Optional.of( + TableSyncMetadata.of(deltaLastSyncInstant, pendingInstantsForDelta, "TEST", "0")); when(mockConversionTarget2.getTableMetadata()).thenReturn(conversionTarget2Metadata); when(mockConversionSource.getCommitsBacklog(instantsForIncrementalSync)) .thenReturn(commitsBacklog); List tableChanges = new ArrayList<>(); - for (Instant instant : instantsToProcess) { - TableChange tableChange = getTableChange(instant); + for (int i = 0; i < instantsToProcess.size(); i++) { + Instant instant = instantsToProcess.get(i); + TableChange tableChange = getTableChange(instant, String.valueOf(i)); tableChanges.add(tableChange); when(mockConversionSource.getTableChangeForCommit(instant)).thenReturn(tableChange); } // Iceberg needs to sync by snapshot since instant15 is affected by table clean-up. 
InternalTable internalTable = getInternalTable(); Instant instantBeforeHour = Instant.now().minus(Duration.ofHours(1)); - InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1"); + InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1", "0"); SyncResult syncResult = buildSyncResult(syncMode, instantBeforeHour); Map snapshotResult = Collections.singletonMap(TableFormat.ICEBERG, syncResult); @@ -388,10 +397,12 @@ void incrementalSyncWithNoPendingInstantsForAllFormats() { CommitsBacklog commitsBacklog = CommitsBacklog.builder().commitsToProcess(instantsToProcess).build(); Optional conversionTarget1Metadata = - Optional.of(TableSyncMetadata.of(icebergLastSyncInstant, Collections.emptyList())); + Optional.of( + TableSyncMetadata.of(icebergLastSyncInstant, Collections.emptyList(), "TEST", "0")); when(mockConversionTarget1.getTableMetadata()).thenReturn(conversionTarget1Metadata); Optional conversionTarget2Metadata = - Optional.of(TableSyncMetadata.of(deltaLastSyncInstant, Collections.emptyList())); + Optional.of( + TableSyncMetadata.of(deltaLastSyncInstant, Collections.emptyList(), "TEST", "0")); when(mockConversionTarget2.getTableMetadata()).thenReturn(conversionTarget2Metadata); when(mockConversionSource.getCommitsBacklog(instantsForIncrementalSync)) .thenReturn(commitsBacklog); @@ -426,7 +437,7 @@ void testNoTableFormatConversionWithMultipleCatalogSync() { List targetCatalogs = Arrays.asList(getTargetCatalog("1"), getTargetCatalog("2")); InternalTable internalTable = getInternalTable(); - InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1"); + InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1", "0"); // Conversion source and target mocks. 
ConversionConfig conversionConfig = getTableSyncConfig( @@ -513,8 +524,11 @@ private List buildSyncResults(List instantList) { .collect(Collectors.toList()); } - private TableChange getTableChange(Instant instant) { - return TableChange.builder().tableAsOfChange(getInternalTable(instant)).build(); + private TableChange getTableChange(Instant instant, String sourceIdentifier) { + return TableChange.builder() + .tableAsOfChange(getInternalTable(instant)) + .sourceIdentifier(sourceIdentifier) + .build(); } private SyncResult buildSyncResult(SyncMode syncMode, Instant lastSyncedInstant) { @@ -536,8 +550,13 @@ private SyncResult buildSyncResult( .build(); } - private InternalSnapshot buildSnapshot(InternalTable internalTable, String version) { - return InternalSnapshot.builder().table(internalTable).version(version).build(); + private InternalSnapshot buildSnapshot( + InternalTable internalTable, String version, String sourceIdentifier) { + return InternalSnapshot.builder() + .table(internalTable) + .version(version) + .sourceIdentifier(sourceIdentifier) + .build(); } private InternalTable getInternalTable() { diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java index f0f889d25..7e921efe5 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java @@ -19,6 +19,7 @@ package org.apache.xtable.delta; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD; @@ -36,6 +37,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.Set; import 
java.util.UUID; @@ -81,6 +83,7 @@ import org.apache.xtable.conversion.TargetTable; import org.apache.xtable.model.InternalSnapshot; import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.metadata.TableSyncMetadata; import org.apache.xtable.model.schema.InternalField; import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.schema.InternalSchema; @@ -159,8 +162,8 @@ public void testCreateSnapshotControlFlow() throws Exception { InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList(), basePath); InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList(), basePath); - InternalSnapshot snapshot1 = buildSnapshot(table1, dataFile1, dataFile2); - InternalSnapshot snapshot2 = buildSnapshot(table2, dataFile2, dataFile3); + InternalSnapshot snapshot1 = buildSnapshot(table1, "0", dataFile1, dataFile2); + InternalSnapshot snapshot2 = buildSnapshot(table2, "1", dataFile2, dataFile3); TableFormatSync.getInstance() .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1); @@ -214,7 +217,7 @@ public void testPrimitiveFieldPartitioning() throws Exception { EqualTo equalToExpr = new EqualTo(new Column("string_field", new StringType()), Literal.of("warning")); - InternalSnapshot snapshot1 = buildSnapshot(table, dataFile1, dataFile2, dataFile3); + InternalSnapshot snapshot1 = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3); TableFormatSync.getInstance() .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1); @@ -294,7 +297,7 @@ public void testMultipleFieldPartitioning() throws Exception { EqualTo equalToExpr2 = new EqualTo(new Column("int_field", new IntegerType()), Literal.of(20)); And CombinedExpr = new And(equalToExpr1, equalToExpr2); - InternalSnapshot snapshot1 = buildSnapshot(table, dataFile1, dataFile2, dataFile3); + InternalSnapshot snapshot1 = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3); TableFormatSync.getInstance() 
.syncSnapshot(Collections.singletonList(conversionTarget), snapshot1); validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile2)), CombinedExpr); @@ -337,7 +340,7 @@ public void testTimestampPartitioning(PartitionTransformType transformType) thro InternalDataFile dataFile2 = getDataFile(2, partitionValues1, basePath); InternalDataFile dataFile3 = getDataFile(3, partitionValues2, basePath); - InternalSnapshot snapshot1 = buildSnapshot(table, dataFile1, dataFile2, dataFile3); + InternalSnapshot snapshot1 = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3); TableFormatSync.getInstance() .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1); @@ -367,6 +370,67 @@ public void testTimestampPartitioning(PartitionTransformType transformType) thro .contains(String.format("xtable_partition_col_%s_timestamp_field", transformType))); } + @Test + public void testSourceTargetIdMapping() throws Exception { + InternalSchema baseSchema = getInternalSchema(); + InternalTable sourceTable = + getInternalTable("source_table", basePath, baseSchema, null, LAST_COMMIT_TIME); + + InternalDataFile sourceDataFile1 = getDataFile(101, Collections.emptyList(), basePath); + InternalDataFile sourceDataFile2 = getDataFile(102, Collections.emptyList(), basePath); + InternalDataFile sourceDataFile3 = getDataFile(103, Collections.emptyList(), basePath); + + InternalSnapshot sourceSnapshot1 = + buildSnapshot(sourceTable, "0", sourceDataFile1, sourceDataFile2); + InternalSnapshot sourceSnapshot2 = + buildSnapshot(sourceTable, "1", sourceDataFile2, sourceDataFile3); + + TableFormatSync.getInstance() + .syncSnapshot(Collections.singletonList(conversionTarget), sourceSnapshot1); + Optional mappedTargetId1 = + conversionTarget.getTargetCommitIdentifier(sourceSnapshot1.getSourceIdentifier()); + validateDeltaTable( + basePath, new HashSet<>(Arrays.asList(sourceDataFile1, sourceDataFile2)), null); + assertTrue(mappedTargetId1.isPresent()); + assertEquals("0", 
mappedTargetId1.get()); + + TableFormatSync.getInstance() + .syncSnapshot(Collections.singletonList(conversionTarget), sourceSnapshot2); + Optional mappedTargetId2 = + conversionTarget.getTargetCommitIdentifier(sourceSnapshot2.getSourceIdentifier()); + validateDeltaTable( + basePath, new HashSet<>(Arrays.asList(sourceDataFile2, sourceDataFile3)), null); + assertTrue(mappedTargetId2.isPresent()); + assertEquals("1", mappedTargetId2.get()); + + Optional unmappedTargetId = conversionTarget.getTargetCommitIdentifier("s3"); + assertFalse(unmappedTargetId.isPresent()); + } + + @Test + public void testGetTargetCommitIdentifierWithNullSourceIdentifier() throws Exception { + InternalSchema baseSchema = getInternalSchema(); + InternalTable internalTable = + getInternalTable("source_table", basePath, baseSchema, null, LAST_COMMIT_TIME); + InternalDataFile sourceDataFile = getDataFile(101, Collections.emptyList(), basePath); + InternalSnapshot snapshot = buildSnapshot(internalTable, "0", sourceDataFile); + + // Mock the snapshot sync process like getSyncResult() + conversionTarget.beginSync(internalTable); + TableSyncMetadata tableSyncMetadata = + TableSyncMetadata.of(internalTable.getLatestCommitTime(), snapshot.getPendingCommits()); + conversionTarget.syncMetadata(tableSyncMetadata); + conversionTarget.syncSchema(internalTable.getReadSchema()); + conversionTarget.syncPartitionSpec(internalTable.getPartitioningFields()); + conversionTarget.syncFilesForSnapshot(snapshot.getPartitionedDataFiles()); + conversionTarget.completeSync(); + + // No crash should happen during the process + Optional unmappedTargetId = conversionTarget.getTargetCommitIdentifier("0"); + // The targetIdentifier is expected to not be found + assertFalse(unmappedTargetId.isPresent()); + } + private static Stream timestampPartitionTestingArgs() { return Stream.of( Arguments.of(PartitionTransformType.YEAR), @@ -408,10 +472,12 @@ private void validateDeltaTable( internalDataFiles.size(), count, "Number of 
files from DeltaScan don't match expectation"); } - private InternalSnapshot buildSnapshot(InternalTable table, InternalDataFile... dataFiles) { + private InternalSnapshot buildSnapshot( + InternalTable table, String sourceIdentifier, InternalDataFile... dataFiles) { return InternalSnapshot.builder() .table(table) .partitionedDataFiles(PartitionFileGroup.fromFiles(Arrays.asList(dataFiles))) + .sourceIdentifier(sourceIdentifier) .build(); } diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java index 03bb6e2ca..82125d25a 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java @@ -22,6 +22,8 @@ import static org.apache.xtable.hudi.HudiTestUtil.getHoodieWriteConfig; import static org.apache.xtable.hudi.HudiTestUtil.initTableAndGetMetaClient; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.nio.file.Path; import java.time.Duration; @@ -32,6 +34,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.TimeZone; import java.util.UUID; import java.util.stream.Collectors; @@ -204,7 +207,8 @@ void syncForExistingTable() { targetClient.syncFilesForDiff(dataFilesDiff); targetClient.syncSchema(SCHEMA); TableSyncMetadata latestState = - TableSyncMetadata.of(initialState.getLatestCommitTime(), Collections.emptyList()); + TableSyncMetadata.of( + initialState.getLatestCommitTime(), Collections.emptyList(), "TEST", "0"); targetClient.syncMetadata(latestState); targetClient.completeSync(); @@ -243,7 +247,8 @@ void syncForNewTable() { targetClient.beginSync(initialState); targetClient.syncFilesForSnapshot(snapshot); TableSyncMetadata latestState = - 
TableSyncMetadata.of(initialState.getLatestCommitTime(), Collections.emptyList()); + TableSyncMetadata.of( + initialState.getLatestCommitTime(), Collections.emptyList(), "TEST", "0"); targetClient.syncSchema(initialState.getReadSchema()); targetClient.syncMetadata(latestState); targetClient.completeSync(); @@ -288,7 +293,8 @@ void archiveTimelineAndCleanMetadataTableAfterMultipleCommits(String partitionPa targetClient.beginSync(initialState); targetClient.syncFilesForSnapshot(snapshot); TableSyncMetadata latestState = - TableSyncMetadata.of(initialState.getLatestCommitTime(), Collections.emptyList()); + TableSyncMetadata.of( + initialState.getLatestCommitTime(), Collections.emptyList(), "TEST", "0"); targetClient.syncMetadata(latestState); targetClient.syncSchema(initialState.getReadSchema()); targetClient.completeSync(); @@ -311,7 +317,8 @@ CONTEXT, getHoodieWriteConfig(metaClient).getMetadataConfig(), tableBasePath, tr targetClient, Collections.singletonList(getTestFile(partitionPath, fileName2)), Collections.singletonList(getTestFile(partitionPath, fileName1)), - Instant.now().minus(12, ChronoUnit.HOURS)); + Instant.now().minus(12, ChronoUnit.HOURS), + "1"); assertFileGroupCorrectness( metaClient, partitionPath, Arrays.asList(file0Pair, Pair.of(fileName2, filePath2))); @@ -331,7 +338,9 @@ CONTEXT, getHoodieWriteConfig(metaClient).getMetadataConfig(), tableBasePath, tr targetClient, Collections.singletonList(getTestFile(partitionPath, fileName3)), Collections.singletonList(getTestFile(partitionPath, fileName2)), - Instant.now().minus(8, ChronoUnit.HOURS)); + Instant.now().minus(8, ChronoUnit.HOURS), + "2"); + System.out.println(metaClient.getCommitsTimeline().lastInstant().get().getTimestamp()); // create a commit that just adds fileName4 String fileName4 = "file_4.parquet"; @@ -340,7 +349,9 @@ CONTEXT, getHoodieWriteConfig(metaClient).getMetadataConfig(), tableBasePath, tr targetClient, Collections.singletonList(getTestFile(partitionPath, fileName4)), 
Collections.emptyList(), - Instant.now()); + Instant.now(), + "3"); + System.out.println(metaClient.getCommitsTimeline().lastInstant().get().getTimestamp()); // create another commit that should trigger archival of the first two commits String fileName5 = "file_5.parquet"; @@ -349,7 +360,9 @@ CONTEXT, getHoodieWriteConfig(metaClient).getMetadataConfig(), tableBasePath, tr targetClient, Collections.singletonList(getTestFile(partitionPath, fileName5)), Collections.emptyList(), - Instant.now()); + Instant.now(), + "4"); + System.out.println(metaClient.getCommitsTimeline().lastInstant().get().getTimestamp()); assertFileGroupCorrectness( metaClient, @@ -372,18 +385,147 @@ CONTEXT, getHoodieWriteConfig(metaClient).getMetadataConfig(), tableBasePath, tr 2, metaClient.getArchivedTimeline().reload().filterCompletedInstants().countInstants()); } + @ParameterizedTest + @ValueSource(strings = {"partition_path", ""}) + void testSourceTargetMappingWithSnapshotAndIncrementalSync(String partitionPath) { + // Step 1: Initialize Test Files for Initial Snapshot + String fileName0 = "file_0.parquet"; + String fileName1 = "file_1.parquet"; + + List initialSnapshot = + Collections.singletonList( + PartitionFileGroup.builder() + .files( + Arrays.asList( + getTestFile(partitionPath, fileName0), + getTestFile(partitionPath, fileName1))) + .partitionValues( + Collections.singletonList( + PartitionValue.builder() + .partitionField(PARTITION_FIELD) + .range(Range.scalar("partitionPath")) + .build())) + .build()); + + // Step 2: Sync Initial Snapshot + InternalTable initialState = getState(Instant.now().minus(24, ChronoUnit.HOURS)); + HudiConversionTarget targetClient = getTargetClient(); + targetClient.beginSync(initialState); + targetClient.syncFilesForSnapshot(initialSnapshot); + TableSyncMetadata latestState = + TableSyncMetadata.of( + initialState.getLatestCommitTime(), Collections.emptyList(), "TEST", "0"); + targetClient.syncMetadata(latestState); + 
targetClient.syncSchema(initialState.getReadSchema()); + targetClient.completeSync(); + + // Step 3: Verify Source-Target Mapping for Initial Snapshot + HoodieTableMetaClient metaClient = + HoodieTableMetaClient.builder().setConf(CONFIGURATION).setBasePath(tableBasePath).build(); + + Optional initialTargetIdentifier = + targetClient.getTargetCommitIdentifier(latestState.getSourceIdentifier(), metaClient); + assertTrue(initialTargetIdentifier.isPresent()); + assertEquals( + initialTargetIdentifier.get(), + metaClient.getCommitsTimeline().lastInstant().get().getTimestamp()); + + // Step 4: Perform Incremental Sync (Remove file1, Add file2) + String fileName2 = "file_2.parquet"; + incrementalSync( + targetClient, + Collections.singletonList(getTestFile(partitionPath, fileName2)), // Adding file2 + Collections.singletonList(getTestFile(partitionPath, fileName1)), // Removing file1 + Instant.now().minus(12, ChronoUnit.HOURS), + "1"); // Incremental commit ID = "1" + + // Step 5: Verify Source-Target Mapping for Incremental Sync + metaClient.reloadActiveTimeline(); + Optional incrementalTargetIdentifier = + targetClient.getTargetCommitIdentifier("1", metaClient); + assertTrue(incrementalTargetIdentifier.isPresent()); + assertEquals( + incrementalTargetIdentifier.get(), + metaClient.getCommitsTimeline().lastInstant().get().getTimestamp()); + + // Step 6: Perform Another Incremental Sync (Remove file2, Add file3) + String fileName3 = "file_3.parquet"; + incrementalSync( + targetClient, + Collections.singletonList(getTestFile(partitionPath, fileName3)), // Adding file3 + Collections.singletonList(getTestFile(partitionPath, fileName2)), // Removing file2 + Instant.now().minus(8, ChronoUnit.HOURS), + "2"); // Incremental commit ID = "2" + + // Step 7: Verify Source-Target Mapping for Second Incremental Sync + metaClient.reloadActiveTimeline(); + Optional incrementalTargetIdentifier2 = + targetClient.getTargetCommitIdentifier("2", metaClient); + 
assertTrue(incrementalTargetIdentifier2.isPresent()); + assertEquals( + incrementalTargetIdentifier2.get(), + metaClient.getCommitsTimeline().lastInstant().get().getTimestamp()); + + // Step 8: Verify Non-Existent Source ID Returns Empty + Optional nonExistentTargetIdentifier = + targetClient.getTargetCommitIdentifier("3", metaClient); + assertFalse(nonExistentTargetIdentifier.isPresent()); + } + + @ParameterizedTest + @ValueSource(strings = {"partition_path", ""}) + void testGetTargetCommitIdentifierWithNullSourceIdentifier(String partitionPath) { + // Initialize Test Files and Snapshot + String fileName0 = "file_0.parquet"; + String fileName1 = "file_1.parquet"; + + List initialSnapshot = + Collections.singletonList( + PartitionFileGroup.builder() + .files( + Arrays.asList( + getTestFile(partitionPath, fileName0), + getTestFile(partitionPath, fileName1))) + .partitionValues( + Collections.singletonList( + PartitionValue.builder() + .partitionField(PARTITION_FIELD) + .range(Range.scalar("partitionPath")) + .build())) + .build()); + InternalTable internalTable = getState(Instant.now().minus(24, ChronoUnit.HOURS)); + HudiConversionTarget targetClient = getTargetClient(); + + targetClient.beginSync(internalTable); + targetClient.syncFilesForSnapshot(initialSnapshot); + TableSyncMetadata tableSyncMetadata = + TableSyncMetadata.of(internalTable.getLatestCommitTime(), Collections.emptyList()); + targetClient.syncMetadata(tableSyncMetadata); + targetClient.syncSchema(internalTable.getReadSchema()); + targetClient.completeSync(); + + HoodieTableMetaClient metaClient = + HoodieTableMetaClient.builder().setConf(CONFIGURATION).setBasePath(tableBasePath).build(); + // No crash should happen during the process + Optional targetIdentifier = targetClient.getTargetCommitIdentifier("0", metaClient); + // The targetIdentifier is expected to not be found + assertFalse(targetIdentifier.isPresent()); + } + private TableSyncMetadata incrementalSync( ConversionTarget conversionTarget, 
List filesToAdd, List filesToRemove, - Instant commitStart) { + Instant commitStart, + String sourceIdentifier) { DataFilesDiff dataFilesDiff2 = DataFilesDiff.builder().filesAdded(filesToAdd).filesRemoved(filesToRemove).build(); InternalTable state3 = getState(commitStart); conversionTarget.beginSync(state3); conversionTarget.syncFilesForDiff(dataFilesDiff2); TableSyncMetadata latestState = - TableSyncMetadata.of(state3.getLatestCommitTime(), Collections.emptyList()); + TableSyncMetadata.of( + state3.getLatestCommitTime(), Collections.emptyList(), "TEST", sourceIdentifier); conversionTarget.syncMetadata(latestState); conversionTarget.completeSync(); return latestState; diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java index fe20db054..da1ec0338 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java @@ -141,7 +141,8 @@ void syncMetadata() { HudiConversionTarget targetClient = getTargetClient(null); HudiConversionTarget.CommitState mockCommitState = initMocksForBeginSync(targetClient).getLeft(); - TableSyncMetadata metadata = TableSyncMetadata.of(COMMIT_TIME, Collections.emptyList()); + TableSyncMetadata metadata = + TableSyncMetadata.of(COMMIT_TIME, Collections.emptyList(), "TEST", "0"); targetClient.syncMetadata(metadata); // validate that metadata is set in commitState verify(mockCommitState).setTableSyncMetadata(metadata); diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java index bd36dde91..c02d7f268 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java @@ -227,8 +227,8 @@ public void 
testCreateSnapshotControlFlow() throws Exception { InternalDataFile dataFile1 = getDataFile(1, Collections.emptyList()); InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList()); InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList()); - InternalSnapshot snapshot1 = buildSnapshot(table1, dataFile1, dataFile2); - InternalSnapshot snapshot2 = buildSnapshot(table2, dataFile2, dataFile3); + InternalSnapshot snapshot1 = buildSnapshot(table1, "0", dataFile1, dataFile2); + InternalSnapshot snapshot2 = buildSnapshot(table2, "1", dataFile2, dataFile3); when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema); when(mockSchemaExtractor.toIceberg(schema2)).thenReturn(icebergSchema2); ArgumentCaptor partitionSpecSchemaArgumentCaptor = @@ -322,9 +322,9 @@ public void testIncompleteWriteRollback() throws Exception { InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList()); InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList()); InternalDataFile dataFile4 = getDataFile(4, Collections.emptyList()); - InternalSnapshot snapshot1 = buildSnapshot(table1, dataFile1, dataFile2); - InternalSnapshot snapshot2 = buildSnapshot(table2, dataFile2, dataFile3); - InternalSnapshot snapshot3 = buildSnapshot(table2, dataFile3, dataFile4); + InternalSnapshot snapshot1 = buildSnapshot(table1, "0", dataFile1, dataFile2); + InternalSnapshot snapshot2 = buildSnapshot(table2, "1", dataFile2, dataFile3); + InternalSnapshot snapshot3 = buildSnapshot(table2, "2", dataFile3, dataFile4); when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema); when(mockSchemaExtractor.toIceberg(schema2)).thenReturn(icebergSchema2); ArgumentCaptor partitionSpecSchemaArgumentCaptor = @@ -398,7 +398,7 @@ public void testTimestampPartitioning() throws Exception { InternalDataFile dataFile1 = getDataFile(1, partitionValues1); InternalDataFile dataFile2 = getDataFile(2, partitionValues1); InternalDataFile dataFile3 = getDataFile(3, 
partitionValues2); - InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2, dataFile3); + InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3); when(mockSchemaExtractor.toIceberg(internalSchema)) .thenReturn(icebergSchema) @@ -461,7 +461,7 @@ public void testDatePartitioning() throws Exception { InternalDataFile dataFile1 = getDataFile(1, partitionValues1); InternalDataFile dataFile2 = getDataFile(2, partitionValues1); InternalDataFile dataFile3 = getDataFile(3, partitionValues2); - InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2, dataFile3); + InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3); when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema); PartitionSpec partitionSpec = @@ -515,7 +515,7 @@ public void testNumericFieldPartitioning() throws Exception { InternalDataFile dataFile1 = getDataFile(1, partitionValues1); InternalDataFile dataFile2 = getDataFile(2, partitionValues1); InternalDataFile dataFile3 = getDataFile(3, partitionValues2); - InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2, dataFile3); + InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3); when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema); PartitionSpec partitionSpec = @@ -590,7 +590,7 @@ public void testMultipleFieldPartitioning() throws Exception { InternalDataFile dataFile1 = getDataFile(1, partitionValues1); InternalDataFile dataFile2 = getDataFile(2, partitionValues2); InternalDataFile dataFile3 = getDataFile(3, partitionValues3); - InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2, dataFile3); + InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3); when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema); PartitionSpec partitionSpec = @@ -656,7 +656,7 @@ public void testNestedFieldPartitioning() throws 
Exception { InternalDataFile dataFile1 = getDataFile(1, partitionValues1); InternalDataFile dataFile2 = getDataFile(2, partitionValues1); InternalDataFile dataFile3 = getDataFile(3, partitionValues2); - InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2, dataFile3); + InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3); when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema); PartitionSpec partitionSpec = @@ -681,10 +681,111 @@ public void testNestedFieldPartitioning() throws Exception { Expressions.equal(partitionField.getSourceField().getPath(), "value1")); } - private InternalSnapshot buildSnapshot(InternalTable table, InternalDataFile... dataFiles) { + @Test + public void testSourceTargetMapping() throws Exception { + // Prepare schemas + List fields2 = new ArrayList<>(internalSchema.getFields()); + fields2.add( + InternalField.builder() + .name("long_field") + .schema(InternalSchema.builder().name("long").dataType(InternalType.LONG).build()) + .build()); + InternalSchema schema2 = internalSchema.toBuilder().fields(fields2).build(); + List fields = new ArrayList<>(icebergSchema.columns()); + fields.add(Types.NestedField.of(6, false, "long_field", Types.LongType.get())); + Schema icebergSchema2 = new Schema(fields); + + InternalTable table1 = + getInternalTable(tableName, basePath, internalSchema, null, LAST_COMMIT_TIME); + InternalTable table2 = getInternalTable(tableName, basePath, schema2, null, LAST_COMMIT_TIME); + + // Create data files and snapshots + InternalDataFile dataFile1 = getDataFile(1, Collections.emptyList()); + InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList()); + InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList()); + InternalSnapshot snapshot1 = buildSnapshot(table1, "0", dataFile1, dataFile2); + InternalSnapshot snapshot2 = buildSnapshot(table2, "1", dataFile2, dataFile3); + + 
when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema); + when(mockSchemaExtractor.toIceberg(schema2)).thenReturn(icebergSchema2); + when(mockPartitionSpecExtractor.toIceberg(eq(null), any())) + .thenReturn(PartitionSpec.unpartitioned()); + + mockColStatsForFile(dataFile1, 2); + mockColStatsForFile(dataFile2, 2); + mockColStatsForFile(dataFile3, 1); + + // Sync first snapshot + TableFormatSync.getInstance() + .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1); + Optional targetIdentifier1 = + conversionTarget.getTargetCommitIdentifier(snapshot1.getSourceIdentifier()); + assertTrue(targetIdentifier1.isPresent()); + assertEquals( + targetIdentifier1.get(), String.valueOf(getTable(basePath).currentSnapshot().snapshotId())); + + // Sync second snapshot + TableFormatSync.getInstance() + .syncSnapshot(Collections.singletonList(conversionTarget), snapshot2); + Optional targetIdentifier2 = + conversionTarget.getTargetCommitIdentifier(snapshot2.getSourceIdentifier()); + assertTrue(targetIdentifier2.isPresent()); + assertEquals( + targetIdentifier2.get(), String.valueOf(getTable(basePath).currentSnapshot().snapshotId())); + + // Validate that an unknown source ID returns empty + Optional emptyTargetIdentifier = conversionTarget.getTargetCommitIdentifier("999"); + assertFalse(emptyTargetIdentifier.isPresent()); + } + + @Test + public void testGetTargetCommitIdentifierWithNullSourceIdentifier() { + // Prepare schemas + List fields2 = new ArrayList<>(internalSchema.getFields()); + fields2.add( + InternalField.builder() + .name("long_field") + .schema(InternalSchema.builder().name("long").dataType(InternalType.LONG).build()) + .build()); + InternalSchema schema2 = internalSchema.toBuilder().fields(fields2).build(); + List fields = new ArrayList<>(icebergSchema.columns()); + fields.add(Types.NestedField.of(6, false, "long_field", Types.LongType.get())); + Schema icebergSchema2 = new Schema(fields); + + InternalTable internalTable = + 
getInternalTable(tableName, basePath, internalSchema, null, LAST_COMMIT_TIME); + + // Create data files and snapshots + InternalDataFile dataFile1 = getDataFile(1, Collections.emptyList()); + InternalSnapshot snapshot = buildSnapshot(internalTable, "0", dataFile1); + when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema); + when(mockSchemaExtractor.toIceberg(schema2)).thenReturn(icebergSchema2); + when(mockPartitionSpecExtractor.toIceberg(eq(null), any())) + .thenReturn(PartitionSpec.unpartitioned()); + mockColStatsForFile(dataFile1, 2); + + // Mock the snapshot sync process like getSyncResult() + conversionTarget.beginSync(internalTable); + TableSyncMetadata tableSyncMetadata = + TableSyncMetadata.of(internalTable.getLatestCommitTime(), snapshot.getPendingCommits()); + conversionTarget.syncMetadata(tableSyncMetadata); + conversionTarget.syncSchema(internalTable.getReadSchema()); + conversionTarget.syncPartitionSpec(internalTable.getPartitioningFields()); + conversionTarget.syncFilesForSnapshot(snapshot.getPartitionedDataFiles()); + conversionTarget.completeSync(); + + // No crash should happen during the process + Optional targetIdentifier = conversionTarget.getTargetCommitIdentifier("0"); + // The targetIdentifier is expected to not be found + assertFalse(targetIdentifier.isPresent()); + } + + private InternalSnapshot buildSnapshot( + InternalTable table, String sourceIdentifier, InternalDataFile... dataFiles) { return InternalSnapshot.builder() .table(table) .partitionedDataFiles(PartitionFileGroup.fromFiles(Arrays.asList(dataFiles))) + .sourceIdentifier(sourceIdentifier) .build(); }