@@ -23,6 +23,7 @@
import java.util.List;

import lombok.Builder;
import lombok.NonNull;
import lombok.Value;

import org.apache.xtable.model.storage.PartitionFileGroup;
@@ -47,4 +48,6 @@ public class InternalSnapshot {
List<PartitionFileGroup> partitionedDataFiles;
// pending commits before latest commit on the table.
@Builder.Default List<Instant> pendingCommits = Collections.emptyList();
// commit identifier in source table
@NonNull String sourceIdentifier;
}
@@ -19,6 +19,7 @@
package org.apache.xtable.model;

import lombok.Builder;
import lombok.NonNull;
import lombok.Value;

import org.apache.xtable.model.storage.DataFilesDiff;
@@ -36,4 +37,7 @@ public class TableChange {

/** The {@link InternalTable} at the commit time to which this table change belongs. */
InternalTable tableAsOfChange;

// Commit identifier in source table
@NonNull String sourceIdentifier;
}
@@ -54,10 +54,30 @@ public class TableSyncMetadata {
Instant lastInstantSynced;
List<Instant> instantsToConsiderForNextSync;
int version;
String sourceTableFormat;
String sourceIdentifier;

/**
* @deprecated Use {@link #of(Instant, List, String, String)} instead. This method exists for
* backward compatibility and will be removed in a future version.
*/
@Deprecated
public static TableSyncMetadata of(
Instant lastInstantSynced, List<Instant> instantsToConsiderForNextSync) {
return new TableSyncMetadata(lastInstantSynced, instantsToConsiderForNextSync, CURRENT_VERSION);
return TableSyncMetadata.of(lastInstantSynced, instantsToConsiderForNextSync, null, null);
}

public static TableSyncMetadata of(
Instant lastInstantSynced,
List<Instant> instantsToConsiderForNextSync,
String sourceTableFormat,
String sourceIdentifier) {
return new TableSyncMetadata(
lastInstantSynced,
instantsToConsiderForNextSync,
CURRENT_VERSION,
sourceTableFormat,
sourceIdentifier);
}

public String toJson() {
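A minimal migration sketch for callers of the deprecated two-argument factory; none of this code is part of the PR, and the "ICEBERG" / "42" values are placeholders for illustration only:

import java.time.Instant;
import java.util.Collections;
import java.util.List;

// Illustration only: the old factory still works but records no source lineage,
// while the new factory also captures the source table format and commit identifier.
// TableSyncMetadata is the class changed above; its import is omitted here.
class TableSyncMetadataMigrationSketch {
  static void example() {
    Instant lastInstantSynced = Instant.parse("2020-07-04T10:15:30.00Z");
    List<Instant> pendingInstants = Collections.emptyList();

    // Before (deprecated): no source table format or source identifier recorded
    TableSyncMetadata legacy = TableSyncMetadata.of(lastInstantSynced, pendingInstants);

    // After: also record which source format and source commit produced this sync
    TableSyncMetadata withLineage =
        TableSyncMetadata.of(lastInstantSynced, pendingInstants, "ICEBERG", "42");
  }
}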
@@ -87,4 +87,18 @@ public interface ConversionSource<COMMIT> extends Closeable {
* false.
*/
boolean isIncrementalSyncSafeFrom(Instant instant);

/**
* Extract the identifier of the provided commit. The identifier is defined as:
*
* <ul>
* <li>Snapshot ID in Iceberg
* <li>Version ID in Delta
* <li>Timestamp in Hudi
* </ul>
*
* @param commit The provided commit
* @return the string version of the commit identifier
*/
String getCommitIdentifier(COMMIT commit);
}
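As a rough illustration of this contract, a hypothetical source whose COMMIT type is a Long version number (Delta-style); the class name is invented, and the class is left abstract so the remaining ConversionSource methods can be omitted:

// Hypothetical sketch, not part of the PR. The ConversionSource import is omitted;
// it is the interface shown above.
abstract class VersionedConversionSource implements ConversionSource<Long> {
  @Override
  public String getCommitIdentifier(Long commit) {
    // For a version-based format the identifier is simply the stringified version,
    // mirroring "Version ID in Delta" from the javadoc above.
    return String.valueOf(commit);
  }
}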
@@ -90,4 +90,14 @@ public interface ConversionTarget {

/** Initializes the client with provided configuration */
void init(TargetTable targetTable, Configuration configuration);

/**
* Retrieves the commit identifier from the target table that corresponds to a given source table
* commit identifier.
Comment on lines +95 to +96

Contributor:
Will these identifiers always be the same? If so, is it simpler to make this a boolean method?

Contributor Author:
Yes, the source identifier stored in the target table remains unchanged. However, this API is designed to retrieve the corresponding target COMMIT based on an input source identifier (in this feature, that would be the rollback source COMMIT). This allows us to initiate a rollback to the retrieved target COMMIT on the target table (see the usage sketch after this file's diff).

*
* @param sourceIdentifier the unique identifier of the source table commit
* @return an {@link Optional} containing the target commit identifier if a corresponding commit
* exists, or an empty {@link Optional} if no match is found
*/
Optional<String> getTargetCommitIdentifier(String sourceIdentifier);
}
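Building on the discussion above, a caller-side usage sketch of the new method; the class name, the rollbackAction parameter, and the error message are invented for illustration and are not part of the PR:

import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical sketch: look up the target-table commit that was produced for a given
// source commit identifier and hand it to a caller-supplied rollback action
// (e.g. a table-format-specific "roll back to commit X" operation).
// The ConversionTarget import is omitted; it is the interface shown above.
final class TargetRollbackSketch {
  static void rollbackTargetToSourceCommit(
      ConversionTarget target, String sourceIdentifier, Consumer<String> rollbackAction) {
    Optional<String> targetCommit = target.getTargetCommitIdentifier(sourceIdentifier);
    targetCommit.ifPresentOrElse(
        rollbackAction,
        () -> {
          throw new IllegalStateException(
              "No target commit recorded for source identifier " + sourceIdentifier);
        });
  }
}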
@@ -73,7 +73,8 @@ public Map<String, SyncResult> syncSnapshot(
internalTable,
target -> target.syncFilesForSnapshot(snapshot.getPartitionedDataFiles()),
startTime,
snapshot.getPendingCommits()));
snapshot.getPendingCommits(),
snapshot.getSourceIdentifier()));
} catch (Exception e) {
log.error("Failed to sync snapshot", e);
results.put(
@@ -121,7 +122,8 @@ public Map<String, List<SyncResult>> syncChanges(
change.getTableAsOfChange(),
target -> target.syncFilesForDiff(change.getFilesDiff()),
startTime,
changes.getPendingCommits()));
changes.getPendingCommits(),
change.getSourceIdentifier()));
} catch (Exception e) {
log.error("Failed to sync table changes", e);
resultsForFormat.add(buildResultForError(SyncMode.INCREMENTAL, startTime, e));
@@ -149,19 +151,26 @@ private SyncResult getSyncResult(
InternalTable tableState,
SyncFiles fileSyncMethod,
Instant startTime,
List<Instant> pendingCommits) {
List<Instant> pendingCommits,
String sourceIdentifier) {
// initialize the sync
conversionTarget.beginSync(tableState);
// Persist the latest commit time in table properties for incremental syncs
Contributor Author:
We need to move the metadata set operation earlier because it will be required during the sync() operation in Iceberg (Delta and Hudi only need it in completeSync()). See the target-side sketch after this file's diff.

// Syncing metadata must precede the following steps to ensure that the metadata is available
// before committing
TableSyncMetadata latestState =
TableSyncMetadata.of(
tableState.getLatestCommitTime(),
pendingCommits,
tableState.getTableFormat(),
sourceIdentifier);
conversionTarget.syncMetadata(latestState);
// sync schema updates
conversionTarget.syncSchema(tableState.getReadSchema());
// sync partition updates
conversionTarget.syncPartitionSpec(tableState.getPartitioningFields());
// Update the files in the target table
fileSyncMethod.sync(conversionTarget);
// Persist the latest commit time in table properties for incremental syncs.
TableSyncMetadata latestState =
TableSyncMetadata.of(tableState.getLatestCommitTime(), pendingCommits);
conversionTarget.syncMetadata(latestState);
conversionTarget.completeSync();

return SyncResult.builder()
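To make the ordering constraint from the review discussion concrete, a hypothetical in-memory target that captures the source identifier handed to syncMetadata(...) and records it when the commit completes, so getTargetCommitIdentifier(...) can serve it back later. The class, its fields, and the commit-id counter are invented for this sketch; real targets would persist this through their own table metadata:

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch, not part of the PR. Imports of ConversionTarget and
// TableSyncMetadata are omitted; both appear earlier in this diff.
abstract class RecordingConversionTarget implements ConversionTarget {
  private final Map<String, String> sourceToTargetCommit = new HashMap<>();
  private String pendingSourceIdentifier;
  private long nextTargetCommitId = 0L;

  @Override
  public void syncMetadata(TableSyncMetadata metadata) {
    // Must run before completeSync() so the identifier is available at commit time,
    // which is why the sync flow above now calls syncMetadata(...) before the other steps.
    this.pendingSourceIdentifier = metadata.getSourceIdentifier();
  }

  @Override
  public void completeSync() {
    // Record which target commit corresponds to the source commit that drove this sync.
    String targetCommitId = String.valueOf(nextTargetCommitId++);
    if (pendingSourceIdentifier != null) {
      sourceToTargetCommit.put(pendingSourceIdentifier, targetCommitId);
    }
  }

  @Override
  public Optional<String> getTargetCommitIdentifier(String sourceIdentifier) {
    return Optional.ofNullable(sourceToTargetCommit.get(sourceIdentifier));
  }
}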
Contributor Author:
New data and test case added for backward compatibility.

@@ -44,6 +44,7 @@ void jsonRoundTrip(TableSyncMetadata metadata, String expectedJson) {

private static Stream<Arguments> provideMetadataAndJson() {
return Stream.of(
// Old version of metadata and JSON
Arguments.of(
TableSyncMetadata.of(
Instant.parse("2020-07-04T10:15:30.00Z"),
@@ -56,7 +57,24 @@ private static Stream<Arguments> provideMetadataAndJson() {
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[],\"version\":0}"),
Arguments.of(
TableSyncMetadata.of(Instant.parse("2020-07-04T10:15:30.00Z"), null),
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"version\":0}"));
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"version\":0}"),
// New version of metadata and JSON with `sourceTableFormat` and `sourceIdentifier` fields
Arguments.of(
TableSyncMetadata.of(
Instant.parse("2020-07-04T10:15:30.00Z"),
Arrays.asList(
Instant.parse("2020-08-21T11:15:30.00Z"),
Instant.parse("2024-01-21T12:15:30.00Z")),
"TEST",
"0"),
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[\"2020-08-21T11:15:30Z\",\"2024-01-21T12:15:30Z\"],\"version\":0,\"sourceTableFormat\":\"TEST\",\"sourceIdentifier\":\"0\"}"),
Arguments.of(
TableSyncMetadata.of(
Instant.parse("2020-07-04T10:15:30.00Z"), Collections.emptyList(), "TEST", "0"),
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[],\"version\":0,\"sourceTableFormat\":\"TEST\",\"sourceIdentifier\":\"0\"}"),
Arguments.of(
TableSyncMetadata.of(Instant.parse("2020-07-04T10:15:30.00Z"), null, "TEST", "0"),
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"version\":0,\"sourceTableFormat\":\"TEST\",\"sourceIdentifier\":\"0\"}"));
}

@Test
@@ -53,7 +53,11 @@ public void extractSnapshot() {
InternalTable table = InternalTable.builder().latestCommitTime(Instant.now()).build();
List<PartitionFileGroup> dataFiles = Collections.emptyList();
InternalSnapshot internalSnapshot =
InternalSnapshot.builder().table(table).partitionedDataFiles(dataFiles).build();
InternalSnapshot.builder()
.table(table)
.partitionedDataFiles(dataFiles)
.sourceIdentifier("0")
.build();
when(mockConversionSource.getCurrentSnapshot()).thenReturn(internalSnapshot);
assertEquals(internalSnapshot, ExtractFromSource.of(mockConversionSource).extractSnapshot());
}
Expand Down Expand Up @@ -86,6 +90,7 @@ public void extractTableChanges() {
.tableAsOfChange(tableAtFirstInstant)
.filesDiff(
DataFilesDiff.builder().fileAdded(newFile1).fileRemoved(initialFile2).build())
.sourceIdentifier("0")
.build();
when(mockConversionSource.getTableChangeForCommit(firstCommitToSync))
.thenReturn(tableChangeToReturnAtFirstInstant);
@@ -94,6 +99,7 @@ public void extractTableChanges() {
.tableAsOfChange(tableAtFirstInstant)
.filesDiff(
DataFilesDiff.builder().fileAdded(newFile1).fileRemoved(initialFile2).build())
.sourceIdentifier("0")
.build();

// add 2 new files, remove 2 files
@@ -110,6 +116,7 @@ public void extractTableChanges() {
.filesAdded(Arrays.asList(newFile2, newFile3))
.filesRemoved(Arrays.asList(initialFile3, newFile1))
.build())
.sourceIdentifier("1")
.build();
when(mockConversionSource.getTableChangeForCommit(secondCommitToSync))
.thenReturn(tableChangeToReturnAtSecondInstant);
@@ -121,6 +128,7 @@ public void extractTableChanges() {
.filesAdded(Arrays.asList(newFile2, newFile3))
.filesRemoved(Arrays.asList(initialFile3, newFile1))
.build())
.sourceIdentifier("1")
.build();

IncrementalTableChanges actual =