@@ -17,15 +17,21 @@

package org.apache.gobblin.data.management.copy.iceberg;

import java.io.DataInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.util.SerializationUtil;
@@ -41,6 +47,8 @@
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.retry.RetryerFactory;

import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS;
@@ -60,7 +68,7 @@ public class IcebergOverwritePartitionsStep implements CommitStep {
private final String destTableIdStr;
private final Properties properties;
// Data files are kept as a list of base64 encoded strings for optimised de-serialization.
private final List<String> base64EncodedDataFiles;
private final Path base64EncodedDataFilesPath;
private final String partitionColName;
private final String partitionValue;
public static final String OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX +
@@ -73,15 +81,15 @@ public class IcebergOverwritePartitionsStep implements CommitStep {
/**
* Constructs an {@code IcebergOverwritePartitionsStep} with the specified parameters.
*
* @param destTableIdStr the identifier of the destination table as a string
* @param base64EncodedDataFiles [from List<DataFiles>] the serialized data files to be used for replacing partitions
* @param properties the properties containing configuration
@param destTableIdStr the identifier of the destination table as a string
@param partitionColName the name of the partition column whose partition is overwritten
@param partitionValue the partition column value identifying the partition to overwrite
@param base64EncodedDataFilesPath base path under which the base64 encoded data files are written
@param properties the properties containing configuration
*/
public IcebergOverwritePartitionsStep(String destTableIdStr, String partitionColName, String partitionValue, List<String> base64EncodedDataFiles, Properties properties) {
public IcebergOverwritePartitionsStep(String destTableIdStr, String partitionColName, String partitionValue, Path base64EncodedDataFilesPath, Properties properties) {
this.destTableIdStr = destTableIdStr;
this.partitionColName = partitionColName;
this.partitionValue = partitionValue;
this.base64EncodedDataFiles = base64EncodedDataFiles;
this.base64EncodedDataFilesPath = base64EncodedDataFilesPath;
this.properties = properties;
}

@@ -140,11 +148,29 @@ public void execute() throws IOException {
}
}

private List<DataFile> getDataFiles() {
List<DataFile> dataFiles = new ArrayList<>(base64EncodedDataFiles.size());
for (String base64EncodedDataFile : base64EncodedDataFiles) {
dataFiles.add(SerializationUtil.deserializeFromBase64(base64EncodedDataFile));
List<DataFile> getDataFiles() throws IOException {
JobState jobState = new JobState(this.properties);
FileSystem fs = HadoopUtils.getWriterFileSystem(jobState, 1, 0);

List<DataFile> dataFiles = Collections.synchronizedList(new ArrayList<>());
log.info("Reading base64 encoded DataFiles from HDFS path: {}", base64EncodedDataFilesPath);
RemoteIterator<LocatedFileStatus> fileIterator = fs.listFiles(base64EncodedDataFilesPath, false);

List<Path> filePaths = new ArrayList<>();
while (fileIterator.hasNext()) {
filePaths.add(fileIterator.next().getPath());
}
log.info("Read {} data files path", filePaths.size());
filePaths.parallelStream().forEach(filePath -> {
try (DataInputStream in = fs.open(filePath)) {
String encodedContent = in.readUTF();
DataFile dataFile = SerializationUtil.deserializeFromBase64(encodedContent);
dataFiles.add(dataFile);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
log.info("Total Data files read: {}", dataFiles.size());
return dataFiles;
}

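For context, a minimal, self-contained sketch (not part of the PR) of the round trip this change introduces: each DataFile is serialized to a base64 string with Iceberg's SerializationUtil, written as a single writeUTF entry into its own HDFS file, and later read back and deserialized by getDataFiles(). The path, file size, and record count below are made-up placeholder values.

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.util.SerializationUtil;

public class DataFileRoundTripSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical DataFile; in the PR these describe the files copied to the destination table.
    DataFile dataFile = DataFiles.builder(PartitionSpec.unpartitioned())
        .withPath("/data/part-00000.parquet")
        .withFileSizeInBytes(1024L)
        .withRecordCount(100L)
        .build();

    FileSystem fs = FileSystem.get(new Configuration());
    Path encodedFilePath = new Path("/tmp/iceberg_data_files/" + UUID.randomUUID());

    // Writer side (IcebergPartitionDataset): one base64 string per DataFile, written with writeUTF.
    try (DataOutputStream out = fs.create(encodedFilePath)) {
      out.writeUTF(SerializationUtil.serializeToBase64(dataFile));
    }

    // Reader side (IcebergOverwritePartitionsStep): readUTF and deserialize back into a DataFile.
    try (DataInputStream in = fs.open(encodedFilePath)) {
      DataFile roundTripped = SerializationUtil.deserializeFromBase64(in.readUTF());
      System.out.println(roundTripped.path());
    }
  }
}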
@@ -17,6 +17,7 @@

package org.apache.gobblin.data.management.copy.iceberg;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -51,11 +52,13 @@

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.CopyableFile;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import org.apache.gobblin.data.management.copy.CopyableDataset;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.util.function.CheckedExceptionFunction;
import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergMatchesAnyPropNamePartitionFilterPredicate;
import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateUtil;
@@ -69,6 +72,7 @@
*/
@Slf4j
public class IcebergPartitionDataset extends IcebergDataset {
private static final String ICEBERG_DATAFILES_PATH = "iceberg_data_files";
// Currently hardcoded these transforms here but eventually it will depend on filter predicate implementation and can
// be moved to a common place or inside each filter predicate.
private static final List<String> supportedTransforms = ImmutableList.of("identity", "truncate");
@@ -127,7 +131,7 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
// Adding this check to avoid adding post publish step when there are no files to copy.
List<DataFile> destDataFiles = new ArrayList<>(destDataFileBySrcPath.values());
if (CollectionUtils.isNotEmpty(destDataFiles)) {
copyEntities.add(createOverwritePostPublishStep(destDataFiles));
copyEntities.add(createOverwritePostPublishStep(destDataFiles, targetFs));
}

log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size());
@@ -201,26 +205,40 @@ private Map<Path, FileStatus> calcSrcFileStatusByDestFilePath(Map<Path, DataFile
return srcFileStatusByDestFilePath;
}

private PostPublishStep createOverwritePostPublishStep(List<DataFile> destDataFiles) {
List<String> serializedDataFiles = getBase64EncodedDataFiles(destDataFiles);
private PostPublishStep createOverwritePostPublishStep(List<DataFile> destDataFiles, FileSystem targetFs) {
Path serializedDataFilesDirectory = getBase64EncodedDataFilesPath(destDataFiles, targetFs);

IcebergOverwritePartitionsStep icebergOverwritePartitionStep = new IcebergOverwritePartitionsStep(
this.getDestIcebergTable().getTableId().toString(),
this.partitionColumnName,
this.partitionColValue,
serializedDataFiles,
serializedDataFilesDirectory,
this.properties
);

return new PostPublishStep(this.getFileSetId(), Maps.newHashMap(), icebergOverwritePartitionStep, 0);
}

private List<String> getBase64EncodedDataFiles(List<DataFile> destDataFiles) {
List<String> base64EncodedDataFiles = new ArrayList<>(destDataFiles.size());
for (DataFile dataFile : destDataFiles) {
base64EncodedDataFiles.add(SerializationUtil.serializeToBase64(dataFile));
}
return base64EncodedDataFiles;
public static Path getWorkDirRoot(JobState jobState) {
return new Path(
new Path(jobState.getProp(ConfigurationKeys.MR_JOB_ROOT_DIR_KEY), jobState.getJobName()),
jobState.getJobId());
}

private Path getBase64EncodedDataFilesPath(List<DataFile> destDataFiles, FileSystem targetFs) {
JobState jobState = new JobState(this.properties);
Path dataFilesDirectory = new Path(getWorkDirRoot(jobState), ICEBERG_DATAFILES_PATH);
log.info("Creating data files directory at : {}", dataFilesDirectory);
destDataFiles.parallelStream().forEach(dataFile -> {
Path dataFilePath = new Path(dataFilesDirectory, UUID.randomUUID().toString());
String encodedDataFile = SerializationUtil.serializeToBase64(dataFile);
try (DataOutputStream out = targetFs.create(dataFilePath)) {
out.writeUTF(encodedDataFile);
} catch (IOException e) {
throw new RuntimeException("Failed to serialize data file : " + dataFile.path(), e);
}
});
return dataFilesDirectory;
}

private Predicate<StructLike> createPartitionFilterPredicate() throws IOException {
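As a rough illustration of the new staging layout (assumed example values, not from the PR), the directory returned by getBase64EncodedDataFilesPath resolves to <mr.job.root.dir>/<jobName>/<jobId>/iceberg_data_files:

import org.apache.hadoop.fs.Path;

public class WorkDirLayoutSketch {
  public static void main(String[] args) {
    // Assumed example values; the real ones are read from the JobState properties
    // (ConfigurationKeys.MR_JOB_ROOT_DIR_KEY, job name, job id).
    String mrJobRootDir = "/jobs/gobblin-working-dir";
    String jobName = "IcebergPartitionCopyJob";
    String jobId = "job_IcebergPartitionCopyJob_1700000000000";

    // Mirrors getWorkDirRoot(jobState) plus the ICEBERG_DATAFILES_PATH suffix.
    Path workDirRoot = new Path(new Path(new Path(mrJobRootDir), jobName), jobId);
    Path dataFilesDirectory = new Path(workDirRoot, "iceberg_data_files");

    // Prints: /jobs/gobblin-working-dir/IcebergPartitionCopyJob/job_IcebergPartitionCopyJob_1700000000000/iceberg_data_files
    System.out.println(dataFilesDirectory);
  }
}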
@@ -22,6 +22,7 @@
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.fs.Path;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.PartitionSpec;
@@ -44,7 +45,7 @@ public class IcebergOverwritePartitionsStepTest {
private IcebergTable mockIcebergTable;
private IcebergCatalog mockIcebergCatalog;
private Properties mockProperties;
private List<String> base64EncodedDataFiles;
private Path base64EncodedDataFilesPath;
private IcebergOverwritePartitionsStep spyIcebergOverwritePartitionsStep;

@BeforeMethod
Expand All @@ -53,10 +54,10 @@ public void setUp() throws IOException {
mockIcebergCatalog = Mockito.mock(IcebergCatalog.class);
mockProperties = new Properties();

base64EncodedDataFiles = getEncodedDummyDataFiles();

base64EncodedDataFilesPath = new Path("hdfs://dummy/path/to/encoded/datafiles");
spyIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr,
testPartitionColName, testPartitionColValue, base64EncodedDataFiles, mockProperties));
testPartitionColName, testPartitionColValue, base64EncodedDataFilesPath, mockProperties));
Mockito.doReturn(getEncodedDummyDataFiles()).when(spyIcebergOverwritePartitionsStep).getDataFiles();

Mockito.when(mockIcebergCatalog.openTable(Mockito.any(TableIdentifier.class))).thenReturn(mockIcebergTable);
Mockito.doReturn(mockIcebergCatalog).when(spyIcebergOverwritePartitionsStep).createDestinationCatalog();
@@ -125,7 +126,7 @@ public void testExecuteWithCustomRetryConfig() throws IOException {
mockProperties.setProperty(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX + "." + RETRY_TIMES,
Integer.toString(retryCount));
spyIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr,
testPartitionColName, testPartitionColValue, base64EncodedDataFiles, mockProperties));
testPartitionColName, testPartitionColValue, base64EncodedDataFilesPath, mockProperties));
Mockito.when(mockIcebergCatalog.openTable(Mockito.any(TableIdentifier.class))).thenReturn(mockIcebergTable);
Mockito.doReturn(mockIcebergCatalog).when(spyIcebergOverwritePartitionsStep).createDestinationCatalog();
try {
Expand Down