@@ -78,9 +78,9 @@
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
@@ -686,11 +686,9 @@ public static PipelineResult run(Options options) {
             .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
 
     PCollection<TrimmedShardedDataChangeRecord> dlqRecords =
-        dlqJsonStrRecords
-            .apply(
-                "Convert DLQ records to TrimmedShardedDataChangeRecord",
-                ParDo.of(new ConvertDlqRecordToTrimmedShardedDataChangeRecordFn()))
-            .setCoder(SerializableCoder.of(TrimmedShardedDataChangeRecord.class));
+        dlqJsonStrRecords.apply(
+            "Convert DLQ records to TrimmedShardedDataChangeRecord",
+            ParDo.of(new ConvertDlqRecordToTrimmedShardedDataChangeRecordFn()));
     PCollection<TrimmedShardedDataChangeRecord> mergedRecords = null;
 
     if (options.getFailureInjectionParameter() != null
@@ -711,13 +709,11 @@ public static PipelineResult run(Options options) {
               // stream data
               .apply("Reshuffle", Reshuffle.viaRandomKey())
               .apply("Filteration", ParDo.of(new FilterRecordsFn(options.getFiltrationMode())))
-              .apply("Preprocess", ParDo.of(new PreprocessRecordsFn()))
-              .setCoder(SerializableCoder.of(TrimmedShardedDataChangeRecord.class));
+              .apply("Preprocess", ParDo.of(new PreprocessRecordsFn()));
       mergedRecords =
           PCollectionList.of(changeRecordsFromDB)
               .and(dlqRecords)
-              .apply("Flatten", Flatten.pCollections())
-              .setCoder(SerializableCoder.of(TrimmedShardedDataChangeRecord.class));
+              .apply("Flatten", Flatten.pCollections());
     } else {
       mergedRecords = dlqRecords;
     }
@@ -759,8 +755,7 @@ public static PipelineResult run(Options options) {
                           options.getSourceType()))) // currently assuming that all shards accept the
                       // same
           .setCoder(
-              KvCoder.of(
-                  VarLongCoder.of(), SerializableCoder.of(TrimmedShardedDataChangeRecord.class)))
+              KvCoder.of(VarLongCoder.of(), AvroCoder.of(TrimmedShardedDataChangeRecord.class)))
           .apply("Reshuffle2", Reshuffle.of())
           .apply(
               "Write to source",
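Note: the explicit .setCoder(SerializableCoder.of(...)) calls can be dropped because TrimmedShardedDataChangeRecord is now annotated with @DefaultCoder(AvroCoder.class) (next file), so Beam's coder inference resolves AvroCoder automatically; only the KV-shaped output still needs an explicit KvCoder wrapper. A minimal sketch of that inference, using a hypothetical Record class in place of the real model and assuming the DirectRunner is on the classpath:

import java.io.Serializable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

public class DefaultCoderInferenceSketch {

  // Hypothetical stand-in for TrimmedShardedDataChangeRecord.
  @DefaultCoder(AvroCoder.class)
  public static class Record implements Serializable {
    String tableName;
    long retryCount;

    public Record() {} // AvroCoder needs a no-arg constructor

    Record(String tableName, long retryCount) {
      this.tableName = tableName;
      this.retryCount = retryCount;
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    // No .setCoder(...) needed: the @DefaultCoder annotation makes the
    // coder registry resolve AvroCoder.of(Record.class) for this output.
    PCollection<Record> records = p.apply(Create.of(new Record("t1", 0L)));
    System.out.println(records.getCoder()); // prints the inferred AvroCoder
    p.run().waitUntilFinish();
  }
}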
TrimmedShardedDataChangeRecord.java
@@ -19,29 +19,42 @@
 import com.google.gson.annotations.SerializedName;
 import java.io.Serializable;
 import java.util.Objects;
+import org.apache.avro.reflect.AvroEncode;
+import org.apache.avro.reflect.Nullable;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.encoder.TimestampEncoding;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
 
 /**
  * Trimmed version of the Apache Beam DataChangeRecord class that only contains the field we need in
  * this pipeline.
  */
-@SuppressWarnings("initialization.fields.uninitialized") // Avro requires the default constructor
+@SuppressWarnings("initialization.fields.uninitialized")
+@DefaultCoder(AvroCoder.class)
 public class TrimmedShardedDataChangeRecord extends java.lang.Object implements Serializable {
+
+  @AvroEncode(using = TimestampEncoding.class)
   private Timestamp commitTimestamp;
+
   private String serverTransactionId;
   private String recordSequence;
   private String tableName;
   private Mod mod;
   private ModType modType;
   private long numberOfRecordsInTransaction;
   private String transactionTag;
-  private String shard;
+
+  @Nullable private String shard;
   private boolean isRetryRecord;
 
   @SerializedName("_metadata_retry_count")
   private long metadataRetryCount;
 
+  // AvroCoder requires the default constructor
+  public TrimmedShardedDataChangeRecord() {}
+
   public TrimmedShardedDataChangeRecord(
       com.google.cloud.Timestamp commitTimestamp,
       String serverTransactionId,
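Note: AvroCoder builds its schema by reflection, which explains the three additions here: a public no-arg constructor (reflection instantiates the object before filling fields), Avro's @Nullable on shard (Avro reflect schemas treat fields as non-null by default, so encoding a record with a null shard would otherwise fail), and @AvroEncode(using = TimestampEncoding.class) for the com.google.cloud.Timestamp field, which Avro cannot map on its own. A round-trip sketch under those assumptions, with a hypothetical MiniRecord mirroring the annotations:

import com.google.cloud.Timestamp;
import java.io.Serializable;
import org.apache.avro.reflect.AvroEncode;
import org.apache.avro.reflect.Nullable;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.encoder.TimestampEncoding;
import org.apache.beam.sdk.util.CoderUtils;

public class AvroReflectSketch {

  // Hypothetical stand-in mirroring the annotations this change adds.
  public static class MiniRecord implements Serializable {
    @AvroEncode(using = TimestampEncoding.class)
    Timestamp commitTimestamp;

    @Nullable String shard; // without @Nullable, a null shard breaks encoding

    public MiniRecord() {} // required by AvroCoder's reflect-based schema

    MiniRecord(Timestamp commitTimestamp, String shard) {
      this.commitTimestamp = commitTimestamp;
      this.shard = shard;
    }
  }

  public static void main(String[] args) throws Exception {
    AvroCoder<MiniRecord> coder = AvroCoder.of(MiniRecord.class);
    MiniRecord in = new MiniRecord(Timestamp.now(), null); // null shard is legal
    // Encode to bytes and back, as Beam would at a shuffle boundary.
    byte[] bytes = CoderUtils.encodeToByteArray(coder, in);
    MiniRecord out = CoderUtils.decodeFromByteArray(coder, bytes);
    System.out.println(in.commitTimestamp.equals(out.commitTimestamp)); // true
  }
}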
PreprocessRecordsFnTest.java
@@ -18,7 +18,6 @@
 import com.google.cloud.Timestamp;
 import com.google.cloud.teleport.v2.templates.changestream.TrimmedShardedDataChangeRecord;
 import java.util.Arrays;
-import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
@@ -38,10 +37,7 @@ public class PreprocessRecordsFnTest {
   @Test
   public void basicTest() {
     PCollection<TrimmedShardedDataChangeRecord> output =
-        pipeline
-            .apply(Create.of(getDataChangeRecord()))
-            .apply(ParDo.of(new PreprocessRecordsFn()))
-            .setCoder(SerializableCoder.of(TrimmedShardedDataChangeRecord.class));
+        pipeline.apply(Create.of(getDataChangeRecord())).apply(ParDo.of(new PreprocessRecordsFn()));
 
     PAssert.that(output)
         .containsInAnyOrder(
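Note: with the explicit .setCoder gone, this test now exercises the same coder-inference path as the pipeline. A hedged companion check one could add to such a test, using Beam's CoderProperties test utility; sampleTrimmedRecord() is a hypothetical helper returning a populated TrimmedShardedDataChangeRecord, and the round-trip assertion assumes the class defines equals():

import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.testing.CoderProperties;

// Inside the test class: verify the coder Beam will infer is usable.
@Test
public void coderRoundTrips() throws Exception {
  AvroCoder<TrimmedShardedDataChangeRecord> coder =
      AvroCoder.of(TrimmedShardedDataChangeRecord.class);
  // The coder itself must serialize (it is shipped to workers)...
  CoderProperties.coderSerializable(coder);
  // ...and a sample record must survive an encode/decode round trip.
  CoderProperties.coderDecodeEncodeEqual(coder, sampleTrimmedRecord());
}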