Skip to content

Commit 716bd4a

Browse files
authored
chore: Switch to AvroCoder for Reverse replication template (#3073)
* Switch to AvroCoder
* Spotless
1 parent cb16534 commit 716bd4a

File tree

3 files changed

+23
-19
lines changed

3 files changed

+23
-19
lines changed

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDb.java

Lines changed: 7 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -78,9 +78,9 @@
7878
import org.apache.beam.sdk.Pipeline;
7979
import org.apache.beam.sdk.PipelineResult;
8080
import org.apache.beam.sdk.coders.KvCoder;
81-
import org.apache.beam.sdk.coders.SerializableCoder;
8281
import org.apache.beam.sdk.coders.StringUtf8Coder;
8382
import org.apache.beam.sdk.coders.VarLongCoder;
83+
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
8484
import org.apache.beam.sdk.io.FileSystems;
8585
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
8686
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
@@ -686,11 +686,9 @@ public static PipelineResult run(Options options) {
686686
.setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
687687

688688
PCollection<TrimmedShardedDataChangeRecord> dlqRecords =
689-
dlqJsonStrRecords
690-
.apply(
691-
"Convert DLQ records to TrimmedShardedDataChangeRecord",
692-
ParDo.of(new ConvertDlqRecordToTrimmedShardedDataChangeRecordFn()))
693-
.setCoder(SerializableCoder.of(TrimmedShardedDataChangeRecord.class));
689+
dlqJsonStrRecords.apply(
690+
"Convert DLQ records to TrimmedShardedDataChangeRecord",
691+
ParDo.of(new ConvertDlqRecordToTrimmedShardedDataChangeRecordFn()));
694692
PCollection<TrimmedShardedDataChangeRecord> mergedRecords = null;
695693

696694
if (options.getFailureInjectionParameter() != null
@@ -711,13 +709,11 @@ public static PipelineResult run(Options options) {
711709
// stream data
712710
.apply("Reshuffle", Reshuffle.viaRandomKey())
713711
.apply("Filteration", ParDo.of(new FilterRecordsFn(options.getFiltrationMode())))
714-
.apply("Preprocess", ParDo.of(new PreprocessRecordsFn()))
715-
.setCoder(SerializableCoder.of(TrimmedShardedDataChangeRecord.class));
712+
.apply("Preprocess", ParDo.of(new PreprocessRecordsFn()));
716713
mergedRecords =
717714
PCollectionList.of(changeRecordsFromDB)
718715
.and(dlqRecords)
719-
.apply("Flatten", Flatten.pCollections())
720-
.setCoder(SerializableCoder.of(TrimmedShardedDataChangeRecord.class));
716+
.apply("Flatten", Flatten.pCollections());
721717
} else {
722718
mergedRecords = dlqRecords;
723719
}
@@ -759,8 +755,7 @@ public static PipelineResult run(Options options) {
759755
options.getSourceType()))) // currently assuming that all shards accept the
760756
// same
761757
.setCoder(
762-
KvCoder.of(
763-
VarLongCoder.of(), SerializableCoder.of(TrimmedShardedDataChangeRecord.class)))
758+
KvCoder.of(VarLongCoder.of(), AvroCoder.of(TrimmedShardedDataChangeRecord.class)))
764759
.apply("Reshuffle2", Reshuffle.of())
765760
.apply(
766761
"Write to source",

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/changestream/TrimmedShardedDataChangeRecord.java

Lines changed: 15 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -19,29 +19,42 @@
1919
import com.google.gson.annotations.SerializedName;
2020
import java.io.Serializable;
2121
import java.util.Objects;
22+
import org.apache.avro.reflect.AvroEncode;
23+
import org.apache.avro.reflect.Nullable;
24+
import org.apache.beam.sdk.coders.DefaultCoder;
25+
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
26+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.encoder.TimestampEncoding;
2227
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
2328
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
2429

2530
/**
2631
* Trimmed version of the Apache Beam DataChangeRecord class that only contains the field we need in
2732
* this pipeline.
2833
*/
29-
@SuppressWarnings("initialization.fields.uninitialized") // Avro requires the default constructor
34+
@SuppressWarnings("initialization.fields.uninitialized")
35+
@DefaultCoder(AvroCoder.class)
3036
public class TrimmedShardedDataChangeRecord extends java.lang.Object implements Serializable {
37+
38+
@AvroEncode(using = TimestampEncoding.class)
3139
private Timestamp commitTimestamp;
40+
3241
private String serverTransactionId;
3342
private String recordSequence;
3443
private String tableName;
3544
private Mod mod;
3645
private ModType modType;
3746
private long numberOfRecordsInTransaction;
3847
private String transactionTag;
39-
private String shard;
48+
49+
@Nullable private String shard;
4050
private boolean isRetryRecord;
4151

4252
@SerializedName("_metadata_retry_count")
4353
private long metadataRetryCount;
4454

55+
// AvroCoder requires the default constructor
56+
public TrimmedShardedDataChangeRecord() {}
57+
4558
public TrimmedShardedDataChangeRecord(
4659
com.google.cloud.Timestamp commitTimestamp,
4760
String serverTransactionId,

v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/transforms/PreprocessRecordsFnTest.java

Lines changed: 1 addition & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,6 @@
1818
import com.google.cloud.Timestamp;
1919
import com.google.cloud.teleport.v2.templates.changestream.TrimmedShardedDataChangeRecord;
2020
import java.util.Arrays;
21-
import org.apache.beam.sdk.coders.SerializableCoder;
2221
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
2322
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
2423
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
@@ -38,10 +37,7 @@ public class PreprocessRecordsFnTest {
3837
@Test
3938
public void basicTest() {
4039
PCollection<TrimmedShardedDataChangeRecord> output =
41-
pipeline
42-
.apply(Create.of(getDataChangeRecord()))
43-
.apply(ParDo.of(new PreprocessRecordsFn()))
44-
.setCoder(SerializableCoder.of(TrimmedShardedDataChangeRecord.class));
40+
pipeline.apply(Create.of(getDataChangeRecord())).apply(ParDo.of(new PreprocessRecordsFn()));
4541

4642
PAssert.that(output)
4743
.containsInAnyOrder(

0 commit comments

Comments (0)