diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
index 13fb5d07b3..9744978623 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
@@ -17,15 +17,21 @@
 package org.apache.gobblin.data.management.copy.iceberg;
 
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.util.SerializationUtil;
@@ -41,6 +47,8 @@
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.retry.RetryerFactory;
 
 import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS;
@@ -60,7 +68,7 @@ public class IcebergOverwritePartitionsStep implements CommitStep {
   private final String destTableIdStr;
   private final Properties properties;
   // Data files are kept as a list of base64 encoded strings for optimised de-serialization.
-  private final List<String> base64EncodedDataFiles;
+  private final Path base64EncodedDataFilesPath;
   private final String partitionColName;
   private final String partitionValue;
   public static final String OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX +
@@ -73,15 +81,15 @@ public class IcebergOverwritePartitionsStep implements CommitStep {
   /**
    * Constructs an {@code IcebergReplacePartitionsStep} with the specified parameters.
    *
-   * @param destTableIdStr the identifier of the destination table as a string
-   * @param base64EncodedDataFiles the serialized data files to be used for replacing partitions
-   * @param properties the properties containing configuration
+   * @param destTableIdStr the identifier of the destination table as a string
+   * @param base64EncodedDataFilesPath base path under which all serialized data files are written
+   * @param properties the properties containing configuration
    */
-  public IcebergOverwritePartitionsStep(String destTableIdStr, String partitionColName, String partitionValue, List<String> base64EncodedDataFiles, Properties properties) {
+  public IcebergOverwritePartitionsStep(String destTableIdStr, String partitionColName, String partitionValue, Path base64EncodedDataFilesPath, Properties properties) {
     this.destTableIdStr = destTableIdStr;
     this.partitionColName = partitionColName;
     this.partitionValue = partitionValue;
-    this.base64EncodedDataFiles = base64EncodedDataFiles;
+    this.base64EncodedDataFilesPath = base64EncodedDataFilesPath;
     this.properties = properties;
   }
 
@@ -140,11 +148,29 @@ public void execute() throws IOException {
     }
   }
 
-  private List<DataFile> getDataFiles() {
-    List<DataFile> dataFiles = new ArrayList<>(base64EncodedDataFiles.size());
-    for (String base64EncodedDataFile : base64EncodedDataFiles) {
-      dataFiles.add(SerializationUtil.deserializeFromBase64(base64EncodedDataFile));
+  List<DataFile> getDataFiles() throws IOException {
+    JobState jobState = new JobState(this.properties);
+    FileSystem fs = HadoopUtils.getWriterFileSystem(jobState, 1, 0);
+
+    List<DataFile> dataFiles = Collections.synchronizedList(new ArrayList<>());
+    log.info("Reading base64 encoded DataFiles from HDFS path: {}", base64EncodedDataFilesPath);
+    RemoteIterator<LocatedFileStatus> fileIterator = fs.listFiles(base64EncodedDataFilesPath, false);
+
+    List<Path> filePaths = new ArrayList<>();
+    while (fileIterator.hasNext()) {
+      filePaths.add(fileIterator.next().getPath());
     }
+    log.info("Read {} data file paths", filePaths.size());
+    filePaths.parallelStream().forEach(filePath -> {
+      try (DataInputStream in = fs.open(filePath)) {
+        String encodedContent = in.readUTF();
+        DataFile dataFile = SerializationUtil.deserializeFromBase64(encodedContent);
+        dataFiles.add(dataFile);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+    log.info("Total DataFiles read: {}", dataFiles.size());
     return dataFiles;
   }
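Note on the read path above: getDataFiles() first materializes the listing into a list of paths, then fans deserialization out with parallelStream(), collecting results into a Collections.synchronizedList because multiple worker threads append concurrently. A minimal self-contained sketch of the same pattern outside Gobblin follows; the class name EncodedObjectReader and the conf parameter are illustrative, not part of this patch.

import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class EncodedObjectReader {
  /** Reads every file under {@code dir}, each holding one writeUTF()-framed base64 payload. */
  public static List<String> readEncodedObjects(Path dir, Configuration conf) throws IOException {
    FileSystem fs = dir.getFileSystem(conf);
    // Collect paths first so the (potentially slow) reads can fan out via parallelStream().
    List<Path> filePaths = new ArrayList<>();
    RemoteIterator<LocatedFileStatus> it = fs.listFiles(dir, false /* non-recursive */);
    while (it.hasNext()) {
      filePaths.add(it.next().getPath());
    }
    // synchronizedList: parallelStream() appends from multiple worker threads.
    List<String> contents = Collections.synchronizedList(new ArrayList<>());
    filePaths.parallelStream().forEach(p -> {
      try (DataInputStream in = fs.open(p)) {
        contents.add(in.readUTF()); // must match the writer's writeUTF() framing
      } catch (IOException e) {
        throw new RuntimeException("Failed to read " + p, e);
      }
    });
    return contents;
  }
}

One caveat worth keeping in mind: readUTF()/writeUTF() frame each payload as modified UTF-8 with a two-byte length prefix, so a single serialized DataFile is capped at 65,535 encoded bytes. That is ample for typical DataFile metadata, but it is a hard bound the chosen framing imposes.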
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
index 658f4a265d..0814dd3307 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
@@ -17,6 +17,7 @@
 package org.apache.gobblin.data.management.copy.iceberg;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -51,11 +52,13 @@
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.data.management.copy.CopyConfiguration;
 import org.apache.gobblin.data.management.copy.CopyEntity;
 import org.apache.gobblin.data.management.copy.CopyableFile;
 import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
 import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.util.function.CheckedExceptionFunction;
 import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergMatchesAnyPropNamePartitionFilterPredicate;
 import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateUtil;
@@ -69,6 +72,7 @@
  */
 @Slf4j
 public class IcebergPartitionDataset extends IcebergDataset {
+  private static final String ICEBERG_DATAFILES_PATH = "iceberg_data_files";
   // Currently hardcoded these transforms here but eventually it will depend on filter predicate implementation and can
   // be moved to a common place or inside each filter predicate.
   private static final List<String> supportedTransforms = ImmutableList.of("identity", "truncate");
@@ -127,7 +131,7 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration copyConfig)
     // Adding this check to avoid adding post publish step when there are no files to copy.
     List<DataFile> destDataFiles = new ArrayList<>(destDataFileBySrcPath.values());
     if (CollectionUtils.isNotEmpty(destDataFiles)) {
-      copyEntities.add(createOverwritePostPublishStep(destDataFiles));
+      copyEntities.add(createOverwritePostPublishStep(destDataFiles, targetFs));
     }
 
     log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size());
@@ -201,26 +205,40 @@ private Map<Path, FileStatus> calcSrcFileStatusByDestFilePath(Map<Path, DataFile> destDataFiles)
 
-  private PostPublishStep createOverwritePostPublishStep(List<DataFile> destDataFiles) {
-    List<String> serializedDataFiles = getBase64EncodedDataFiles(destDataFiles);
+  private PostPublishStep createOverwritePostPublishStep(List<DataFile> destDataFiles, FileSystem targetFs) {
+    Path serializedDataFilesDirectory = getBase64EncodedDataFilesPath(destDataFiles, targetFs);
     IcebergOverwritePartitionsStep icebergOverwritePartitionStep = new IcebergOverwritePartitionsStep(
         this.getDestIcebergTable().getTableId().toString(),
         this.partitionColumnName,
         this.partitionColValue,
-        serializedDataFiles,
+        serializedDataFilesDirectory,
         this.properties
     );
     return new PostPublishStep(this.getFileSetId(), Maps.newHashMap(), icebergOverwritePartitionStep, 0);
   }
 
-  private List<String> getBase64EncodedDataFiles(List<DataFile> destDataFiles) {
-    List<String> base64EncodedDataFiles = new ArrayList<>(destDataFiles.size());
-    for (DataFile dataFile : destDataFiles) {
-      base64EncodedDataFiles.add(SerializationUtil.serializeToBase64(dataFile));
-    }
-    return base64EncodedDataFiles;
+  public static Path getWorkDirRoot(JobState jobState) {
+    return new Path(
+        new Path(jobState.getProp(ConfigurationKeys.MR_JOB_ROOT_DIR_KEY), jobState.getJobName()),
+        jobState.getJobId());
+  }
+
+  private Path getBase64EncodedDataFilesPath(List<DataFile> destDataFiles, FileSystem targetFs) {
+    JobState jobState = new JobState(this.properties);
+    Path dataFilesDirectory = new Path(getWorkDirRoot(jobState), ICEBERG_DATAFILES_PATH);
+    log.info("Creating data files directory at: {}", dataFilesDirectory);
+    destDataFiles.parallelStream().forEach(dataFile -> {
+      Path dataFilePath = new Path(dataFilesDirectory, UUID.randomUUID().toString());
+      String encodedDataFile = SerializationUtil.serializeToBase64(dataFile);
+      try (DataOutputStream out = targetFs.create(dataFilePath)) {
+        out.writeUTF(encodedDataFile);
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to write serialized data file: " + dataFile.path(), e);
+      }
+    });
+    return dataFilesDirectory;
   }
 
   private Predicate<StructLike> createPartitionFilterPredicate() throws IOException {
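The write side mirrors the read side: each DataFile is serialized to base64 and written to its own UUID-named file under the job work directory, so writes also parallelize and the commit step's serialized payload shrinks to a single Path. A standalone sketch of that counterpart under the same assumptions; EncodedObjectWriter and writeEncodedObjects are illustrative names, not part of this patch.

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.UUID;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class EncodedObjectWriter {
  /** Writes each encoded payload to its own file under {@code dir}, framed with writeUTF(). */
  public static Path writeEncodedObjects(List<String> encodedObjects, Path dir, FileSystem fs) {
    encodedObjects.parallelStream().forEach(encoded -> {
      // One file per object; a random UUID avoids name collisions across threads.
      Path filePath = new Path(dir, UUID.randomUUID().toString());
      try (DataOutputStream out = fs.create(filePath)) {
        out.writeUTF(encoded); // 2-byte length prefix + modified UTF-8 payload
      } catch (IOException e) {
        throw new RuntimeException("Failed to write " + filePath, e);
      }
    });
    return dir;
  }
}

The design trade-off: previously the base64 strings travelled inside the serialized CommitStep itself, so the step grew with the number of copied files; now the step carries only a directory pointer, at the cost of one small HDFS file (and the corresponding namenode operation) per DataFile.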
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
index e6a626fca7..8c2ff1f33d 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
@@ -22,6 +22,7 @@
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.PartitionSpec;
@@ -44,7 +45,7 @@ public class IcebergOverwritePartitionsStepTest {
   private IcebergTable mockIcebergTable;
   private IcebergCatalog mockIcebergCatalog;
   private Properties mockProperties;
-  private List<String> base64EncodedDataFiles;
+  private Path base64EncodedDataFilesPath;
   private IcebergOverwritePartitionsStep spyIcebergOverwritePartitionsStep;
 
   @BeforeMethod
@@ -53,10 +54,10 @@ public void setUp() throws IOException {
     mockIcebergCatalog = Mockito.mock(IcebergCatalog.class);
     mockProperties = new Properties();
 
-    base64EncodedDataFiles = getEncodedDummyDataFiles();
-
+    base64EncodedDataFilesPath = new Path("hdfs://dummy/path/to/encoded/datafiles");
     spyIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr,
-        testPartitionColName, testPartitionColValue, base64EncodedDataFiles, mockProperties));
+        testPartitionColName, testPartitionColValue, base64EncodedDataFilesPath, mockProperties));
+    Mockito.doReturn(getEncodedDummyDataFiles()).when(spyIcebergOverwritePartitionsStep).getDataFiles();
 
     Mockito.when(mockIcebergCatalog.openTable(Mockito.any(TableIdentifier.class))).thenReturn(mockIcebergTable);
     Mockito.doReturn(mockIcebergCatalog).when(spyIcebergOverwritePartitionsStep).createDestinationCatalog();
@@ -125,7 +126,7 @@ public void testExecuteWithCustomRetryConfig() throws IOException {
     mockProperties.setProperty(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX + "." +
         RETRY_TIMES, Integer.toString(retryCount));
     spyIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr,
-        testPartitionColName, testPartitionColValue, base64EncodedDataFiles, mockProperties));
+        testPartitionColName, testPartitionColValue, base64EncodedDataFilesPath, mockProperties));
     Mockito.when(mockIcebergCatalog.openTable(Mockito.any(TableIdentifier.class))).thenReturn(mockIcebergTable);
     Mockito.doReturn(mockIcebergCatalog).when(spyIcebergOverwritePartitionsStep).createDestinationCatalog();
     try {
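The test change explains why getDataFiles() was widened from private to package-private in the main class: setUp() now stubs it on the spy instead of laying down real encoded files, so the unit tests stay filesystem-free while the retry behaviour is still exercised through properties. For completeness, a hedged sketch of tuning the step's retryer through those same properties; the values are arbitrary, and RETRY_TIMES / RETRY_INTERVAL_MS are the RetryerFactory constants this test and class already import.

// Illustrative only: configure the overwrite step's retryer via job properties.
// Assumes: import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIMES;
//          import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS;
Properties props = new Properties();
props.setProperty(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX + "." + RETRY_TIMES, "5");            // up to 5 attempts
props.setProperty(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX + "." + RETRY_INTERVAL_MS, "10000");  // 10s between attempts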