Skip to content

Commit e9eecc4

Browse files
bharadwaj-adityaAditya Bharadwaj
authored andcommitted
fixed priority handling in assign shard id (GoogleCloudPlatform#3110)
* fixed priority handling in assign shrad id * fixed tests * fixed tests * fixed tests * moved result set to closeable block --------- Co-authored-by: Aditya Bharadwaj <[email protected]>
1 parent 342837d commit e9eecc4

File tree

4 files changed

+63
-91
lines changed

4 files changed

+63
-91
lines changed

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dao/spanner/SpannerDao.java

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -56,36 +56,10 @@ public SpannerDao(String projectId, String instanceId, String databaseId) {
5656

5757
// used for unit testing
5858
public SpannerDao(SpannerAccessor spannerAccessor) {
59+
this.spannerConfig = SpannerConfig.create();
5960
this.spannerAccessor = spannerAccessor;
6061
}
6162

62-
public ShadowTableRecord getShadowTableRecord(
63-
String tableName, com.google.cloud.spanner.Key primaryKey) {
64-
try {
65-
DatabaseClient databaseClient = spannerAccessor.getDatabaseClient();
66-
Struct row =
67-
databaseClient
68-
.singleUse()
69-
.readRow(
70-
tableName,
71-
primaryKey,
72-
Arrays.asList(
73-
Constants.PROCESSED_COMMIT_TS_COLUMN_NAME, Constants.RECORD_SEQ_COLUMN_NAME));
74-
75-
// This is the first event for the primary key and hence the latest event.
76-
if (row == null) {
77-
return null;
78-
}
79-
80-
return new ShadowTableRecord(row.getTimestamp(0), row.getLong(1));
81-
} catch (Exception e) {
82-
LOG.warn("The {} table could not be read. Exception: {}", tableName, e);
83-
// We need to throw the original exception such that the caller can
84-
// look at SpannerException class to take decision
85-
throw e;
86-
}
87-
}
88-
8963
public ShadowTableRecord readShadowTableRecordWithExclusiveLock(
9064
String shadowTableName,
9165
com.google.cloud.spanner.Key primaryKey,

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/transforms/AssignShardIdFn.java

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
import com.fasterxml.jackson.databind.JsonNode;
2020
import com.fasterxml.jackson.databind.ObjectMapper;
2121
import com.fasterxml.jackson.databind.node.ObjectNode;
22+
import com.google.cloud.spanner.KeySet;
23+
import com.google.cloud.spanner.Options;
24+
import com.google.cloud.spanner.ResultSet;
2225
import com.google.cloud.spanner.SpannerException;
2326
import com.google.cloud.spanner.Struct;
2427
import com.google.cloud.spanner.TimestampBound;
@@ -315,20 +318,9 @@ private Map<String, Object> fetchSpannerRecord(
315318
.map(Column::name)
316319
.collect(Collectors.toList());
317320

318-
Struct row =
319-
spannerAccessor
320-
.getDatabaseClient()
321-
.singleUse(TimestampBound.ofReadTimestamp(staleReadTs))
322-
.readRow(tableName, generateKey(tableName, keysJson, ddl), columns);
323-
if (row == null) {
324-
throw new Exception(
325-
"stale read on Spanner returned null for table: "
326-
+ tableName
327-
+ ", commitTimestamp: "
328-
+ commitTimestamp
329-
+ " and serverTxnId:"
330-
+ serverTxnId);
331-
}
321+
com.google.cloud.spanner.Key pk = generateKey(tableName, keysJson, ddl);
322+
323+
Struct row = readRowAsStruct(tableName, commitTimestamp, serverTxnId, staleReadTs, columns, pk);
332324
Map<String, Object> rowAsMap = getRowAsMap(row, columns, tableName, ddl);
333325
// TODO find a way to not make a special case from Cassandra.
334326
if (modType == ModType.DELETE && sourceType != Constants.SOURCE_CASSANDRA) {
@@ -350,6 +342,36 @@ private Map<String, Object> fetchSpannerRecord(
350342
return rowAsMap;
351343
}
352344

345+
private Struct readRowAsStruct(
346+
String tableName,
347+
com.google.cloud.Timestamp commitTimestamp,
348+
String serverTxnId,
349+
com.google.cloud.Timestamp staleReadTs,
350+
List<String> columns,
351+
com.google.cloud.spanner.Key pk)
352+
throws Exception {
353+
try (ResultSet rs =
354+
spannerAccessor
355+
.getDatabaseClient()
356+
.singleUse(TimestampBound.ofReadTimestamp(staleReadTs))
357+
.read(
358+
tableName,
359+
KeySet.singleKey(pk),
360+
columns,
361+
Options.priority(spannerConfig.getRpcPriority().get()))) {
362+
if (!rs.next()) {
363+
throw new Exception(
364+
"stale read on Spanner returned null for table: "
365+
+ tableName
366+
+ ", commitTimestamp: "
367+
+ commitTimestamp
368+
+ " and serverTxnId:"
369+
+ serverTxnId);
370+
}
371+
return rs.getCurrentRowAsStruct();
372+
}
373+
}
374+
353375
/*
354376
* Marshals Spanner's read row values to match CDC stream's representation.
355377
*/

v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/dao/SpannerDaoTest.java

Lines changed: 0 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,16 @@
1515
*/
1616
package com.google.cloud.teleport.v2.templates.dbutils.dao;
1717

18-
import static com.google.common.truth.Truth.assertThat;
19-
import static org.mockito.ArgumentMatchers.any;
2018
import static org.mockito.ArgumentMatchers.eq;
2119
import static org.mockito.Mockito.doNothing;
22-
import static org.mockito.Mockito.doThrow;
2320
import static org.mockito.Mockito.verify;
2421
import static org.mockito.Mockito.when;
2522

26-
import com.google.cloud.Timestamp;
2723
import com.google.cloud.spanner.DatabaseClient;
28-
import com.google.cloud.spanner.Key;
2924
import com.google.cloud.spanner.Mutation;
3025
import com.google.cloud.spanner.ReadOnlyTransaction;
31-
import com.google.cloud.spanner.Struct;
3226
import com.google.cloud.spanner.TransactionContext;
33-
import com.google.cloud.teleport.v2.templates.constants.Constants;
3427
import com.google.cloud.teleport.v2.templates.dbutils.dao.spanner.SpannerDao;
35-
import com.google.cloud.teleport.v2.templates.utils.ShadowTableRecord;
3628
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
3729
import org.junit.Before;
3830
import org.junit.FixMethodOrder;
@@ -62,46 +54,6 @@ public void doBeforeEachTest() throws Exception {
6254
doNothing().when(mockSpannerAccessor).close();
6355
}
6456

65-
@Test
66-
public void testGetShadowTableRecordReturnsNull() {
67-
SpannerDao spannerDao = new SpannerDao(mockSpannerAccessor);
68-
// Mock readRow
69-
when(mockReadOnlyTransaction.readRow(eq("tableName"), any(Key.class), any(Iterable.class)))
70-
.thenReturn(null);
71-
assertThat(spannerDao.getShadowTableRecord("tableName", null)).isNull();
72-
}
73-
74-
@Test
75-
public void testGetShadowTableRecordReturnsRecord() {
76-
SpannerDao spannerDao = new SpannerDao(mockSpannerAccessor);
77-
Struct row =
78-
Struct.newBuilder()
79-
.set(Constants.PROCESSED_COMMIT_TS_COLUMN_NAME)
80-
.to(Timestamp.parseTimestamp("2023-05-18T12:01:13.088397258Z"))
81-
.set(Constants.RECORD_SEQ_COLUMN_NAME)
82-
.to(1)
83-
.build();
84-
85-
when(mockReadOnlyTransaction.readRow(eq("junk"), any(), any(Iterable.class))).thenReturn(row);
86-
ShadowTableRecord response = spannerDao.getShadowTableRecord("junk", null);
87-
spannerDao.close();
88-
89-
ShadowTableRecord expectedResponse =
90-
new ShadowTableRecord(Timestamp.parseTimestamp("2023-05-18T12:01:13.088397258Z"), 1);
91-
assertThat(response.getProcessedCommitTimestamp())
92-
.isEqualTo(expectedResponse.getProcessedCommitTimestamp());
93-
assertThat(response.getRecordSequence()).isEqualTo(expectedResponse.getRecordSequence());
94-
}
95-
96-
@Test(expected = RuntimeException.class)
97-
public void testGetShadowTableRecordException() {
98-
SpannerDao spannerDao = new SpannerDao(mockSpannerAccessor);
99-
doThrow(new RuntimeException("generic exception"))
100-
.when(mockReadOnlyTransaction)
101-
.readRow(eq("error"), any(), any(Iterable.class));
102-
ShadowTableRecord response = spannerDao.getShadowTableRecord("error", null);
103-
}
104-
10557
@Test
10658
public void testUpdateShadowTable() {
10759
SpannerDao spannerDao = new SpannerDao(mockSpannerAccessor);

v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/transforms/AssignShardIdFnTest.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@
3131
import com.google.cloud.Timestamp;
3232
import com.google.cloud.spanner.DatabaseClient;
3333
import com.google.cloud.spanner.Key;
34+
import com.google.cloud.spanner.KeySet;
35+
import com.google.cloud.spanner.Options.ReadOption;
3436
import com.google.cloud.spanner.ReadOnlyTransaction;
37+
import com.google.cloud.spanner.ResultSet;
3538
import com.google.cloud.spanner.Struct;
3639
import com.google.cloud.spanner.TimestampBound;
3740
import com.google.cloud.spanner.Value;
@@ -227,6 +230,13 @@ public void testProcessElementInsertModForMultiShard() throws Exception {
227230
when(mockOptions.getColumnOverrides()).thenReturn("");
228231
when(mockOptions.getSchemaOverridesFilePath()).thenReturn("");
229232

233+
com.google.cloud.spanner.ResultSet resultSet = mock(ResultSet.class);
234+
when(mockReadOnlyTransaction.read(
235+
eq("tableName"), any(KeySet.class), any(Iterable.class), any(ReadOption.class)))
236+
.thenReturn(resultSet);
237+
when(resultSet.next()).thenReturn(true);
238+
when(resultSet.getCurrentRowAsStruct()).thenReturn(mockRow);
239+
230240
AssignShardIdFn assignShardIdFn =
231241
new AssignShardIdFn(
232242
SpannerConfig.create(),
@@ -275,6 +285,13 @@ public void testProcessElementDeleteModForMultiShard() throws Exception {
275285
// Prepare mock for c.sideInput(ddlView)
276286
when(processContext.sideInput(mockDdlView)).thenReturn(ddl);
277287

288+
com.google.cloud.spanner.ResultSet resultSet = mock(ResultSet.class);
289+
when(mockReadOnlyTransaction.read(
290+
eq("tableName"), any(KeySet.class), any(Iterable.class), any(ReadOption.class)))
291+
.thenReturn(resultSet);
292+
when(resultSet.next()).thenReturn(true);
293+
when(resultSet.getCurrentRowAsStruct()).thenReturn(mockRow);
294+
278295
AssignShardIdFn assignShardIdFn =
279296
new AssignShardIdFn(
280297
SpannerConfig.create(),
@@ -392,6 +409,8 @@ public void testProcessElementDeleteAllDatatypes() throws Exception {
392409
when(processContext.element()).thenReturn(record);
393410
// All datatypes row
394411
ByteArray bytesArray = ByteArray.copyFrom("abc");
412+
com.google.cloud.spanner.ResultSet resultSet = mock(ResultSet.class);
413+
395414
Struct allDatatypesRow =
396415
Struct.newBuilder()
397416
.set("first_name")
@@ -421,8 +440,13 @@ public void testProcessElementDeleteAllDatatypes() throws Exception {
421440
.set("date_field2")
422441
.to(Date.parseDate("2020-12-30"))
423442
.build();
424-
when(mockReadOnlyTransaction.readRow(eq("Users"), any(Key.class), any(Iterable.class)))
425-
.thenReturn(allDatatypesRow);
443+
444+
when(mockReadOnlyTransaction.read(
445+
eq("Users"), any(KeySet.class), any(Iterable.class), any(ReadOption.class)))
446+
.thenReturn(resultSet);
447+
448+
when(resultSet.next()).thenReturn(true);
449+
when(resultSet.getCurrentRowAsStruct()).thenReturn(allDatatypesRow);
426450

427451
Ddl ddl = SchemaUtils.buildSpannerDdlFromSessionFile(ALL_TYPES_SESSION_FILE_PATH);
428452
SourceSchema sourceSchema =

0 commit comments

Comments
 (0)