@@ -246,31 +246,24 @@ public CommitsBacklog<Snapshot> getCommitsBacklog(InstantsForIncrementalSync las
   public boolean isIncrementalSyncSafeFrom(Instant instant) {
     long timeInMillis = instant.toEpochMilli();
     Table iceTable = getSourceTable();
-    boolean doesInstantOfAgeExists = false;
-    Long targetSnapshotId = null;
-    for (Snapshot snapshot : iceTable.snapshots()) {
-      if (snapshot.timestampMillis() <= timeInMillis) {
-        doesInstantOfAgeExists = true;
-        targetSnapshotId = snapshot.snapshotId();
-      } else {
-        break;
+    Snapshot currentSnapshot = iceTable.currentSnapshot();
+
+    while (currentSnapshot != null && currentSnapshot.timestampMillis() > timeInMillis) {
+      Long parentSnapshotId = currentSnapshot.parentId();
+      if (parentSnapshotId == null) {
+        // no more snapshots in the chain and did not find targetSnapshot
+        return false;
       }
-    }
-    if (!doesInstantOfAgeExists) {
-      return false;
-    }
-    // Go from latest snapshot until targetSnapshotId through parent reference.
-    // nothing has to be null in this chain to guarantee safety of incremental sync.
-    Long currentSnapshotId = iceTable.currentSnapshot().snapshotId();
-    while (currentSnapshotId != null && currentSnapshotId != targetSnapshotId) {
-      Snapshot currentSnapshot = iceTable.snapshot(currentSnapshotId);
-      if (currentSnapshot == null) {
-        // The snapshot is expired.
+
+      Snapshot parentSnapshot = iceTable.snapshot(parentSnapshotId);
+      if (parentSnapshot == null) {
+        // chain is broken due to expired snapshot
+        log.info("Expired snapshot id: {}", parentSnapshotId);
         return false;
       }
-      currentSnapshotId = currentSnapshot.parentId();
+      currentSnapshot = parentSnapshot;
     }
-    return true;
+    return currentSnapshot != null;
   }
 
   @Override
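Side note for reviewers: the rewritten method no longer pre-scans iceTable.snapshots() for a target snapshot; it simply walks parent links from the current snapshot back in time. A minimal, self-contained sketch of that walk (the Snap record and the map are made-up stand-ins for the Iceberg Table, and a missing map entry plays the role of Table.snapshot(id) returning null for an expired snapshot):

    import java.util.HashMap;
    import java.util.Map;

    public class ChainWalkSketch {
      // snapshotId -> (timestampMillis, parentId); parentId is null for the first snapshot.
      record Snap(long timestampMillis, Long parentId) {}

      static boolean isSafeFrom(long timeInMillis, Long currentId, Map<Long, Snap> liveSnapshots) {
        Snap current = currentId == null ? null : liveSnapshots.get(currentId);
        while (current != null && current.timestampMillis() > timeInMillis) {
          Long parentId = current.parentId();
          if (parentId == null) {
            return false; // ran out of history before reaching the requested instant
          }
          Snap parent = liveSnapshots.get(parentId);
          if (parent == null) {
            return false; // parent snapshot expired, so the chain back to the instant is broken
          }
          current = parent;
        }
        return current != null; // landed on a snapshot at or before the instant
      }

      public static void main(String[] args) {
        Map<Long, Snap> liveSnapshots = new HashMap<>();
        liveSnapshots.put(1L, new Snap(100, null));
        liveSnapshots.put(2L, new Snap(200, 1L));
        liveSnapshots.put(3L, new Snap(300, 2L));
        System.out.println(isSafeFrom(150, 3L, liveSnapshots)); // true: 3 -> 2 -> 1 reaches t=100
        liveSnapshots.remove(2L);                               // expire the middle snapshot
        System.out.println(isSafeFrom(150, 3L, liveSnapshots)); // false: chain broken at snapshot 2
      }
    }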
@@ -307,7 +307,7 @@ public String getFilterQuery() {
     return String.format("%s > 'aaa'", icebergDataHelper.getRecordKeyField());
   }
 
-  public Long getLastCommitTimestamp() {
+  public long getLastCommitTimestamp() {
     return getLatestSnapshot().timestampMillis();
   }
 
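A likely motivation for the Long -> long change (not stated in the diff): Snapshot.timestampMillis() already returns a primitive long, so a boxed return type only adds boxing and invites accidental reference comparison of timestamps. A tiny stand-alone illustration of that pitfall:

    public class BoxedComparisonDemo {
      public static void main(String[] args) {
        Long boxedA = 128L;
        Long boxedB = 128L;
        // Outside the Long cache (-128..127), == on boxed values compares object references.
        System.out.println(boxedA == boxedB); // false
        long primitiveA = 128L;
        long primitiveB = 128L;
        System.out.println(primitiveA == primitiveB); // true
      }
    }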
@@ -146,7 +146,7 @@ public void testInsertsUpsertsAndDeletes(boolean isPartitioned) {
     List<TableChange> allTableChanges = new ArrayList<>();
 
     testIcebergTable.insertRows(50);
-    Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
+    long timestamp1 = testIcebergTable.getLastCommitTimestamp();
     allActiveFiles.add(testIcebergTable.getAllActiveFiles());
 
     List<Record> records1 = testIcebergTable.insertRows(50);
@@ -204,7 +204,7 @@ public void testDropPartition() {
     List<TableChange> allTableChanges = new ArrayList<>();
 
     List<Record> records1 = testIcebergTable.insertRows(50);
-    Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
+    long timestamp1 = testIcebergTable.getLastCommitTimestamp();
     allActiveFiles.add(testIcebergTable.getAllActiveFiles());
 
     List<Record> records2 = testIcebergTable.insertRows(50);
@@ -264,7 +264,7 @@ public void testDeleteAllRecordsInPartition() {
     List<TableChange> allTableChanges = new ArrayList<>();
 
     List<Record> records1 = testIcebergTable.insertRows(50);
-    Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
+    long timestamp1 = testIcebergTable.getLastCommitTimestamp();
     allActiveFiles.add(testIcebergTable.getAllActiveFiles());
 
     List<Record> records2 = testIcebergTable.insertRows(50);
@@ -325,7 +325,7 @@ public void testExpireSnapshots(boolean isPartitioned) throws InterruptedExcepti
     List<TableChange> allTableChanges = new ArrayList<>();
 
     List<Record> records1 = testIcebergTable.insertRows(50);
-    Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
+    long timestamp1 = testIcebergTable.getLastCommitTimestamp();
 
     testIcebergTable.upsertRows(records1.subList(0, 20));
     allActiveFiles.add(testIcebergTable.getAllActiveFiles());
@@ -383,8 +383,8 @@ public void testForIncrementalSyncSafetyCheck(boolean shouldExpireSnapshots) {
         TestIcebergTable.forStandardSchemaAndPartitioning(
             tableName, "level", tempDir, hadoopConf)) {
       // Insert 50 rows to INFO partition.
-      List<Record> commit1Rows = testIcebergTable.insertRecordsForPartition(50, "INFO");
-      Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
+      List<Record> firstCommitRows = testIcebergTable.insertRecordsForPartition(50, "INFO");
+      long timestampAfterFirstCommit = testIcebergTable.getLastCommitTimestamp();
       SourceTable tableConfig =
           SourceTable.builder()
               .name(testIcebergTable.getTableName())
@@ -393,23 +393,42 @@ public void testForIncrementalSyncSafetyCheck(boolean shouldExpireSnapshots) {
               .build();
 
       // Upsert all rows inserted before, so all files are replaced.
-      testIcebergTable.upsertRows(commit1Rows.subList(0, 50));
-      long snapshotIdAfterCommit2 = testIcebergTable.getLatestSnapshot().snapshotId();
+      testIcebergTable.upsertRows(firstCommitRows.subList(0, 50));
+      long timestampAfterSecondCommit = testIcebergTable.getLastCommitTimestamp();
+      long snapshotIdAfterSecondCommit = testIcebergTable.getLatestSnapshot().snapshotId();
 
       // Insert 50 rows to different (ERROR) partition.
       testIcebergTable.insertRecordsForPartition(50, "ERROR");
+      long timestampAfterThirdCommit = testIcebergTable.getLastCommitTimestamp();
 
       if (shouldExpireSnapshots) {
         // Expire snapshotAfterCommit2.
-        testIcebergTable.expireSnapshot(snapshotIdAfterCommit2);
+        testIcebergTable.expireSnapshot(snapshotIdAfterSecondCommit);
       }
       IcebergConversionSource conversionSource =
           sourceProvider.getConversionSourceInstance(tableConfig);
       if (shouldExpireSnapshots) {
-        assertFalse(conversionSource.isIncrementalSyncSafeFrom(Instant.ofEpochMilli(timestamp1)));
+        // Since the second snapshot is expired, we cannot safely perform incremental sync from the
+        // first two commits
+        assertFalse(
+            conversionSource.isIncrementalSyncSafeFrom(
+                Instant.ofEpochMilli(timestampAfterFirstCommit)));
+        assertFalse(
+            conversionSource.isIncrementalSyncSafeFrom(
+                Instant.ofEpochMilli(timestampAfterSecondCommit)));
       } else {
-        assertTrue(conversionSource.isIncrementalSyncSafeFrom(Instant.ofEpochMilli(timestamp1)));
+        // The full history is still present so incremental sync is safe from any of these commits
+        assertTrue(
+            conversionSource.isIncrementalSyncSafeFrom(
+                Instant.ofEpochMilli(timestampAfterFirstCommit)));
+        assertTrue(
+            conversionSource.isIncrementalSyncSafeFrom(
+                Instant.ofEpochMilli(timestampAfterSecondCommit)));
       }
+      // Table always has the last commit so incremental sync is safe
+      assertTrue(
+          conversionSource.isIncrementalSyncSafeFrom(
+              Instant.ofEpochMilli(timestampAfterThirdCommit)));
       // Table doesn't have instant of this older commit, hence it is not safe.
       Instant instantAsOfHourAgo = Instant.now().minus(1, ChronoUnit.HOURS);
       assertFalse(conversionSource.isIncrementalSyncSafeFrom(instantAsOfHourAgo));
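The expireSnapshot helper exercised above is presumably a thin wrapper over Iceberg's ExpireSnapshots API; the actual TestIcebergTable implementation is not shown in this diff. An assumed sketch of such a wrapper:

    import org.apache.iceberg.Table;

    class SnapshotExpiryHelper {
      // Assumed equivalent of testIcebergTable.expireSnapshot(snapshotId); the real helper may differ.
      static void expireSnapshot(Table table, long snapshotId) {
        table
            .expireSnapshots()             // org.apache.iceberg.ExpireSnapshots
            .expireSnapshotId(snapshotId)  // mark exactly this snapshot for expiration
            .cleanExpiredFiles(false)      // skip file cleanup; only the snapshot entry is removed
            .commit();
      }
    }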