Skip to content

Commit 80ba916

Browse files
authored
fix retry scenario for query to table materialization (#36912)
* fix retry scenario for query to table materialization * fix retry scenario for query to table materialization
1 parent 2ffd443 commit 80ba916

File tree

2 files changed

+24
-0
lines changed

2 files changed

+24
-0
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -979,6 +979,8 @@ public void processElement(
979979
getParseFn(),
980980
getOutputCoder(),
981981
getBigQueryServices());
982+
// due to retry, table may already exist, remove it to ensure correctness
983+
querySource.removeDestinationIfExists(options.as(BigQueryOptions.class));
982984
Table queryResultTable = querySource.getTargetTable(options.as(BigQueryOptions.class));
983985

984986
BigQueryStorageTableSource<T> output =

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.beam.sdk.io.gcp.bigquery;
1919

20+
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference;
2021
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
2122

2223
import com.google.api.services.bigquery.model.JobStatistics;
@@ -25,6 +26,7 @@
2526
import com.google.cloud.bigquery.storage.v1.DataFormat;
2627
import java.io.IOException;
2728
import java.io.ObjectInputStream;
29+
import java.util.Optional;
2830
import java.util.concurrent.atomic.AtomicReference;
2931
import org.apache.beam.sdk.coders.Coder;
3032
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.QueryPriority;
@@ -188,4 +190,24 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
188190
protected @Nullable String getTargetTableId(BigQueryOptions options) throws Exception {
189191
return null;
190192
}
193+
194+
void removeDestinationIfExists(BigQueryOptions options) throws Exception {
195+
DatasetService datasetService = bqServices.getDatasetService(options.as(BigQueryOptions.class));
196+
String project = queryTempProject;
197+
if (project == null) {
198+
project =
199+
options.as(BigQueryOptions.class).getBigQueryProject() == null
200+
? options.as(BigQueryOptions.class).getProject()
201+
: options.as(BigQueryOptions.class).getBigQueryProject();
202+
}
203+
String tempTableID =
204+
BigQueryResourceNaming.createJobIdPrefix(
205+
options.getJobName(), stepUuid, BigQueryResourceNaming.JobType.QUERY);
206+
TableReference tempTableReference =
207+
createTempTableReference(project, tempTableID, Optional.ofNullable(queryTempDataset));
208+
Table destTable = datasetService.getTable(tempTableReference);
209+
if (destTable != null) {
210+
datasetService.deleteTable(tempTableReference);
211+
}
212+
}
191213
}

0 commit comments

Comments
 (0)