Skip to content

Commit efb574a

Browse files
[spanner-to-sourcedb] DELETEs: Error fetching shard id column (#3024)
[b/458068271](https://b.corp.google.com/issues/458068271) [b/458070941](https://b.corp.google.com/issues/458070941) ## Problem: Whenever a row has a column with a NULL value, the reverse replication fails with the following error messages: 1. AssignShardId step in dataflow: ```Error fetching shard Id column: Illegal call to getter of null value``` 2. DLQ entry: ```"error_message":"No shard identified for the record"``` ## Fix: marshalSpannerValues calls getter functions to get the value of the column, but these functions throw a NullPointerException when the value is NULL. This is handled incorrectly and causes the above error. So the fix is to check for a NULL value at the beginning of the function itself. ## Tests: 1. Unit test updated - failed without the fix with the expected error and passed with it. 2. Integration test updated to include NULL values in the row. ### Dataflow job with container built on fixed code. Template Container built in gs://ea-functional-tests/templates/flex/Spanner_to_SourceDb Ran [dataflow job](https://pantheon.corp.google.com/dataflow/jobs/us-central1/2025-11-26_02_33_03-2924712538454216688;graphView=0?project=span-cloud-ck-testing-external&e=PangolinKitchenLaunch::PangolinKitchenEnabled&mods=logs_tg_staging&pageState=(%22dfTime%22:(%22l%22:%22dfJobMaxTime%22))) with the above container. #### 1. double datatype SQL Query: column skill of type double is kept NULL, column variance of type double is non-NULL ``` INSERT INTO ut_scl_squad (ddrKey, gameSpaceId, ownerPersId, teammateIndex, teammatePersId, `variance`, lastUpdateTime) VALUES (3002, 17613, 172377253, 202, 1269841562, 1234.56, 2720290009); DELETE FROM ut_scl_squad WHERE ddrKey = 3002 AND gameSpaceId = 17613 AND ownerPersId = 172377253; ``` #### 2. 
timestamp datatype SQL Query: - column createdTime of type timestamp is kept non-NULL, column squadName of type varchar is NULL ``` INSERT INTO ut_showoff (ddrkey, showoffId, userId, createdTime, `count`) VALUES (2002, 2002, -285444577, TIMESTAMP('2025-01-01T12:00:00Z'), 2444035295); DELETE FROM ut_showoff WHERE ddrkey = 2002 AND showoffId = 2002; ``` - column createdTime of type timestamp is kept NULL ``` INSERT INTO ut_showoff (ddrkey, showoffId, userId) VALUES (202, 202, -28577); DELETE FROM ut_showoff WHERE ddrkey = 202 AND showoffId = 202; ``` #### 3. Reserved Keywords SQL Query: column content of type varbinary is kept NULL ``` INSERT INTO sedges (ddrkey, created, `from`, `to`, `type`, `value`, flags) VALUES (2001, 2001, 'nodeA', 'nodeB', 'edgeType1', 'edgeValue1', -1326272220); DELETE FROM sedges WHERE ddrkey = 2001 AND created = 2001; ``` #### 4. INTERLEAVING - INTERLEAVE IN PARENT ON DELETE CASCADE: on deleting the parent on Spanner, both the parent and child records were also deleted from SQL. - INTERLEAVE IN: on deleting the parent on Spanner, the parent row was also deleted from SQL. #### 5. Regression testing: Custom sharding logic - The flow where there's no migration_shard_id column and the pipeline uses the ddrkey value to compute the shard is not affected by this change.
1 parent a73e0f0 commit efb574a

File tree

6 files changed

+378
-16
lines changed

6 files changed

+378
-16
lines changed

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/transforms/AssignShardIdFn.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,11 @@ private Map<String, Object> fetchSpannerRecord(
337337
*/
338338
private void marshalSpannerValues(
339339
ObjectNode newValuesJsonNode, String tableName, String colName, Struct row) {
340+
if (row.isNull(colName)) {
341+
newValuesJsonNode.putNull(colName);
342+
return;
343+
}
344+
340345
// TODO(b/430495490): Add support for string arrays on Spanner side.
341346
switch (ddl.table(tableName).column(colName).type().getCode()) {
342347
case FLOAT32:

v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbDatatypeIT.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,24 @@ private void writeRowsInSpanner() {
260260
.to(Value.bytes(ByteArray.copyFrom("varbinary")))
261261
.set("bit_column")
262262
.to(Value.bytes(ByteArray.copyFrom("a")))
263+
.set("null_string_column")
264+
.to((String) null)
265+
.set("null_int_column")
266+
.to((Long) null)
267+
.set("null_date_column")
268+
.to((Date) null)
269+
.set("null_float_64_column")
270+
.to((Double) null)
271+
.set("null_float_32_column")
272+
.to((Float) null)
273+
.set("null_numeric_column")
274+
.to((BigDecimal) null)
275+
.set("null_timestamp_column")
276+
.to((Timestamp) null)
277+
.set("null_blob_column")
278+
.to((ByteArray) null)
279+
.set("null_bool_column")
280+
.to((Boolean) null)
263281
.build();
264282
spannerResourceManager.write(m);
265283

@@ -423,6 +441,15 @@ private void assertReadRowInMySQL(Map<String, Object> row)
423441
() ->
424442
assertThat(row.get("varbinary_column"))
425443
.isEqualTo("varbinary".getBytes(StandardCharsets.UTF_8)),
426-
() -> assertThat(row.get("bit_column")).isEqualTo("a".getBytes(StandardCharsets.UTF_8)));
444+
() -> assertThat(row.get("bit_column")).isEqualTo("a".getBytes(StandardCharsets.UTF_8)),
445+
() -> assertThat(row.get("null_string_column")).isNull(),
446+
() -> assertThat(row.get("null_int_column")).isNull(),
447+
() -> assertThat(row.get("null_date_column")).isNull(),
448+
() -> assertThat(row.get("null_float_64_column")).isNull(),
449+
() -> assertThat(row.get("null_float_32_column")).isNull(),
450+
() -> assertThat(row.get("null_numeric_column")).isNull(),
451+
() -> assertThat(row.get("null_timestamp_column")).isNull(),
452+
() -> assertThat(row.get("null_blob_column")).isNull(),
453+
() -> assertThat(row.get("null_bool_column")).isNull());
427454
}
428455
}

v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/transforms/AssignShardIdFnTest.java

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,12 @@ private void mockSpannerReadRow() {
106106
when(mockRow.getValue("accountId")).thenReturn(Value.string("Id1"));
107107
when(mockRow.getValue("accountName")).thenReturn(Value.string("xyz"));
108108
when(mockRow.getValue("migration_shard_id")).thenReturn(Value.string("shard1"));
109+
when(mockRow.isNull("string_col_null")).thenReturn(true);
109110
when(mockRow.getValue("accountNumber")).thenReturn(Value.int64(1));
111+
when(mockRow.isNull("int_64_col_null")).thenReturn(true);
110112
when(mockRow.getValue("bytesCol"))
111113
.thenReturn(Value.bytes(ByteArray.copyFrom("GOOGLE".getBytes())));
114+
when(mockRow.isNull("bytes_col_null")).thenReturn(true);
112115
when(mockRow.getDouble("float_64_col")).thenReturn(0.5);
113116
when(mockRow.getValue("float_64_col")).thenReturn(Value.float64(0.5));
114117
when(mockRow.getDouble("float_64_col_nan")).thenReturn(Double.NaN);
@@ -119,6 +122,7 @@ private void mockSpannerReadRow() {
119122
when(mockRow.getDouble("float_64_col_neg_infinity")).thenReturn(Double.NEGATIVE_INFINITY);
120123
when(mockRow.getValue("float_64_col_neg_infinity"))
121124
.thenReturn(Value.float64(Double.NEGATIVE_INFINITY));
125+
when(mockRow.isNull("float_64_col_null")).thenReturn(true);
122126
when(mockRow.getValue("float_32_col")).thenReturn(Value.float32(0.5f));
123127
when(mockRow.getFloat("float_32_col")).thenReturn(0.5f);
124128
when(mockRow.getFloat("float_32_col_nan")).thenReturn(Float.NaN);
@@ -129,8 +133,11 @@ private void mockSpannerReadRow() {
129133
when(mockRow.getFloat("float_32_col_neg_infinity")).thenReturn(Float.NEGATIVE_INFINITY);
130134
when(mockRow.getValue("float_32_col_neg_infinity"))
131135
.thenReturn(Value.float32(Float.NEGATIVE_INFINITY));
136+
when(mockRow.isNull("float_32_col_null")).thenReturn(true);
132137
when(mockRow.getBoolean("bool_col")).thenReturn(true);
133138
when(mockRow.getValue("bool_col")).thenReturn(Value.bool(true));
139+
when(mockRow.isNull("bool_col_null")).thenReturn(true);
140+
when(mockRow.isNull("timestamp_col_null")).thenReturn(true);
134141

135142
// Mock readRow
136143
when(mockReadOnlyTransaction.readRow(eq("tableName"), any(Key.class), any(Iterable.class)))
@@ -224,7 +231,7 @@ public void testProcessElementInsertModForMultiShard() throws Exception {
224231
assignShardIdFn.processElement(processContext);
225232

226233
String newValuesJson =
227-
"{\"float_32_col_nan\":\"NaN\",\"bool_col\":true,\"accountName\":\"xyz\",\"float_64_col_neg_infinity\":\"-Infinity\",\"float_32_col_neg_infinity\":\"-Infinity\",\"accountNumber\":\"1\",\"bytesCol\":\"R09PR0xF\",\"accountId\":\"Id1\",\"migration_shard_id\":\"shard1\",\"float_64_col\":0.5,\"float_32_col_infinity\":\"Infinity\",\"float_32_col\":0.5,\"float_64_col_infinity\":\"Infinity\",\"float_64_col_nan\":\"NaN\"}";
234+
"{\"bool_col_null\":null,\"float_32_col_nan\":\"NaN\",\"bool_col\":true,\"timestamp_col_null\":null,\"accountName\":\"xyz\",\"float_64_col_neg_infinity\":\"-Infinity\",\"float_32_col_neg_infinity\":\"-Infinity\",\"bytes_col_null\":null,\"string_col_null\":null,\"accountNumber\":\"1\",\"bytesCol\":\"R09PR0xF\",\"accountId\":\"Id1\",\"migration_shard_id\":\"shard1\",\"int_64_col_null\":null,\"float_64_col\":0.5,\"float_32_col_infinity\":\"Infinity\",\"float_32_col\":0.5,\"float_64_col_infinity\":\"Infinity\",\"float_64_col_null\":null,\"float_64_col_nan\":\"NaN\",\"float_32_col_null\":null}";
228235

229236
record.setMod(
230237
new Mod(record.getMod().getKeysJson(), record.getMod().getOldValuesJson(), newValuesJson));
@@ -264,7 +271,7 @@ public void testProcessElementDeleteModForMultiShard() throws Exception {
264271
Long key = keyStr.hashCode() % 10000L;
265272

266273
String newValuesJson =
267-
"{\"float_32_col_nan\":\"NaN\",\"bool_col\":true,\"accountName\":\"xyz\",\"float_64_col_neg_infinity\":\"-Infinity\",\"float_32_col_neg_infinity\":\"-Infinity\",\"accountNumber\":\"1\",\"bytesCol\":\"R09PR0xF\",\"accountId\":\"Id1\",\"migration_shard_id\":\"shard1\",\"float_64_col\":0.5,\"float_32_col_infinity\":\"Infinity\",\"float_32_col\":0.5,\"float_64_col_infinity\":\"Infinity\",\"float_64_col_nan\":\"NaN\"}";
274+
"{\"bool_col_null\":null,\"float_32_col_nan\":\"NaN\",\"bool_col\":true,\"timestamp_col_null\":null,\"accountName\":\"xyz\",\"float_64_col_neg_infinity\":\"-Infinity\",\"float_32_col_neg_infinity\":\"-Infinity\",\"bytes_col_null\":null,\"string_col_null\":null,\"accountNumber\":\"1\",\"bytesCol\":\"R09PR0xF\",\"accountId\":\"Id1\",\"migration_shard_id\":\"shard1\",\"int_64_col_null\":null,\"float_64_col\":0.5,\"float_32_col_infinity\":\"Infinity\",\"float_32_col\":0.5,\"float_64_col_infinity\":\"Infinity\",\"float_64_col_null\":null,\"float_64_col_nan\":\"NaN\",\"float_32_col_null\":null}";
268275

269276
record.setMod(
270277
new Mod(record.getMod().getKeysJson(), record.getMod().getOldValuesJson(), newValuesJson));
@@ -806,9 +813,16 @@ private static Ddl getTestDdl() {
806813
.string()
807814
.max()
808815
.endColumn()
816+
.column("string_col_null")
817+
.string()
818+
.max()
819+
.endColumn()
809820
.column("accountNumber")
810821
.int64()
811822
.endColumn()
823+
.column("int_64_col_null")
824+
.int64()
825+
.endColumn()
812826
.column("bytesCol")
813827
.bytes()
814828
.endColumn()
@@ -824,16 +838,7 @@ private static Ddl getTestDdl() {
824838
.column("float_64_col_neg_infinity")
825839
.float64()
826840
.endColumn()
827-
.column("float_64_col")
828-
.float64()
829-
.endColumn()
830-
.column("float_64_col_nan")
831-
.float64()
832-
.endColumn()
833-
.column("float_64_col_infinity")
834-
.float64()
835-
.endColumn()
836-
.column("float_64_col_neg_infinity")
841+
.column("float_64_col_null")
837842
.float64()
838843
.endColumn()
839844
.column("float_32_col")
@@ -848,9 +853,21 @@ private static Ddl getTestDdl() {
848853
.column("float_32_col_neg_infinity")
849854
.float32()
850855
.endColumn()
856+
.column("float_32_col_null")
857+
.float32()
858+
.endColumn()
851859
.column("bool_col")
852860
.bool()
853861
.endColumn()
862+
.column("bool_col_null")
863+
.bool()
864+
.endColumn()
865+
.column("bytes_col_null")
866+
.bytes()
867+
.endColumn()
868+
.column("timestamp_col_null")
869+
.timestamp()
870+
.endColumn()
854871
// Non-stored Generated Column should not affect any flows
855872
.column("non_stored_gen_column")
856873
.int64()

v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbDatatypeIT/mysql-schema.sql

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,16 @@ CREATE TABLE `AllDatatypeColumns` (
2727
`other_bool_column` tinyint(1),
2828
`binary_column` binary(10),
2929
`varbinary_column` varbinary(20),
30-
`bit_column` bit(7),
30+
`bit_column` bit(7),
31+
`null_string_column` varchar(128),
32+
`null_int_column` bigint,
33+
`null_date_column` date,
34+
`null_float_64_column` double,
35+
`null_float_32_column` float(10,2),
36+
`null_numeric_column` decimal(10,2),
37+
`null_timestamp_column` timestamp,
38+
`null_blob_column` varbinary(20),
39+
`null_bool_column` tinyint(1),
3140
PRIMARY KEY (`varchar_column`)
3241
);
3342

0 commit comments

Comments
 (0)