diff --git a/pom.xml b/pom.xml
index 9d95527e976..1d707191145 100644
--- a/pom.xml
+++ b/pom.xml
@@ -251,6 +251,18 @@
simpleclient_pushgateway
0.10.0
+
+ org.powermock
+ powermock-module-junit4
+ 2.0.2
+ test
+
+
+ org.powermock
+ powermock-api-mockito2
+ 2.0.2
+ test
+
diff --git a/src/main/java/org/tikv/common/exception/KeyException.java b/src/main/java/org/tikv/common/exception/KeyException.java
index 22ddda982b9..048ac3b35fe 100644
--- a/src/main/java/org/tikv/common/exception/KeyException.java
+++ b/src/main/java/org/tikv/common/exception/KeyException.java
@@ -31,7 +31,7 @@ public KeyException(String errMsg) {
}
public KeyException(Kvrpcpb.KeyError keyErr) {
- super("Key exception occurred");
+ super("Key exception occurred " + keyErr.toString());
this.keyErr = keyErr;
}
diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java
index 9a4ed807503..1101aabd0bb 100644
--- a/src/main/java/org/tikv/common/region/RegionStoreClient.java
+++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java
@@ -519,7 +519,7 @@ private boolean isPrewriteSuccess(BackOffer backOffer, PrewriteResponse resp, lo
Lock lock = new Lock(err.getLocked(), codec);
locks.add(lock);
} else {
- throw new KeyException(err.toString());
+ throw new KeyException(err);
}
}
if (isSuccess) {
diff --git a/src/main/java/org/tikv/txn/TwoPhaseCommitter.java b/src/main/java/org/tikv/txn/TwoPhaseCommitter.java
index d602cd5261f..2e6aefa5aea 100644
--- a/src/main/java/org/tikv/txn/TwoPhaseCommitter.java
+++ b/src/main/java/org/tikv/txn/TwoPhaseCommitter.java
@@ -232,8 +232,9 @@ private void doCommitPrimaryKeyWithRetry(BackOffer backOffer, ByteString key, lo
this.doCommitPrimaryKeyWithRetry(backOffer, key, commitTs);
}
}
-
- LOG.info("commit primary key {} successfully", KeyUtils.formatBytes(key));
+ if (commitResult.isSuccess() && LOG.isDebugEnabled()) {
+ LOG.debug("commit primary key {} successfully", KeyUtils.formatBytes(key));
+ }
}
/**
@@ -402,11 +403,13 @@ private void doPrewriteSecondaryKeySingleBatchWithRetry(
BatchKeys batchKeys,
Map mutations)
throws TiBatchWriteException {
- LOG.debug(
- "start prewrite secondary key, row={}, size={}KB, regionId={}",
- batchKeys.getKeys().size(),
- batchKeys.getSizeInKB(),
- batchKeys.getRegion().getId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "start prewrite secondary key, row={}, size={}KB, regionId={}",
+ batchKeys.getKeys().size(),
+ batchKeys.getSizeInKB(),
+ batchKeys.getRegion().getId());
+ }
List keyList = batchKeys.getKeys();
int batchSize = keyList.size();
@@ -450,11 +453,13 @@ private void doPrewriteSecondaryKeySingleBatchWithRetry(
throw new TiBatchWriteException(errorMsg, e);
}
}
- LOG.debug(
- "prewrite secondary key successfully, row={}, size={}KB, regionId={}",
- batchKeys.getKeys().size(),
- batchKeys.getSizeInKB(),
- batchKeys.getRegion().getId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "prewrite secondary key successfully, row={}, size={}KB, regionId={}",
+ batchKeys.getKeys().size(),
+ batchKeys.getSizeInKB(),
+ batchKeys.getRegion().getId());
+ }
}
private void appendBatchBySize(
@@ -597,11 +602,13 @@ private void doCommitSecondaryKeysWithRetry(
private void doCommitSecondaryKeySingleBatchWithRetry(
BackOffer backOffer, BatchKeys batchKeys, long commitTs) throws TiBatchWriteException {
- LOG.info(
- "start commit secondary key, row={}, size={}KB, regionId={}",
- batchKeys.getKeys().size(),
- batchKeys.getSizeInKB(),
- batchKeys.getRegion().getId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "start commit secondary key, row={}, size={}KB, regionId={}",
+ batchKeys.getKeys().size(),
+ batchKeys.getSizeInKB(),
+ batchKeys.getRegion().getId());
+ }
List keysCommit = batchKeys.getKeys();
ByteString[] keys = new ByteString[keysCommit.size()];
keysCommit.toArray(keys);
@@ -617,11 +624,13 @@ private void doCommitSecondaryKeySingleBatchWithRetry(
LOG.warn(error);
throw new TiBatchWriteException("commit secondary key error", commitResult.getException());
}
- LOG.info(
- "commit {} rows successfully, size={}KB, regionId={}",
- batchKeys.getKeys().size(),
- batchKeys.getSizeInKB(),
- batchKeys.getRegion().getId());
+ if (commitResult.isSuccess() && LOG.isDebugEnabled()) {
+ LOG.debug(
+ "commit {} rows successfully, size={}KB, regionId={}",
+ batchKeys.getKeys().size(),
+ batchKeys.getSizeInKB(),
+ batchKeys.getRegion().getId());
+ }
}
private GroupKeyResult groupKeysByRegion(ByteString[] keys, int size, BackOffer backOffer)
diff --git a/src/main/java/org/tikv/txn/TxnKVClient.java b/src/main/java/org/tikv/txn/TxnKVClient.java
index 7806c56496e..9c5f4c9da3a 100644
--- a/src/main/java/org/tikv/txn/TxnKVClient.java
+++ b/src/main/java/org/tikv/txn/TxnKVClient.java
@@ -27,7 +27,6 @@
import org.tikv.common.ReadOnlyPDClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.GrpcException;
-import org.tikv.common.exception.KeyException;
import org.tikv.common.exception.RegionException;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.exception.TiKVException;
@@ -181,7 +180,6 @@ public ClientRPCResult commit(
// TODO: check this logic to see are we satisfied?
private boolean retryableException(Exception e) {
return e instanceof TiClientInternalException
- || e instanceof KeyException
|| e instanceof RegionException
|| e instanceof StatusRuntimeException;
}
diff --git a/src/test/java/org/tikv/txn/TwoPhaseCommitterTest.java b/src/test/java/org/tikv/txn/TwoPhaseCommitterTest.java
index 02a530ffbc6..ecb150b5673 100644
--- a/src/test/java/org/tikv/txn/TwoPhaseCommitterTest.java
+++ b/src/test/java/org/tikv/txn/TwoPhaseCommitterTest.java
@@ -20,17 +20,40 @@
import static org.junit.Assert.fail;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.RandomStringUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.tikv.BaseTxnKVTest;
+import org.tikv.common.BytePairWrapper;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
+import org.tikv.common.exception.KeyException;
+import org.tikv.common.util.BackOffer;
+import org.tikv.common.util.ConcreteBackOffer;
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({TiSession.class, TxnKVClient.class, TwoPhaseCommitter.class})
+@PowerMockIgnore({"javax.net.ssl.*"})
public class TwoPhaseCommitterTest extends BaseTxnKVTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(TwoPhaseCommitterTest.class);
private static final int WRITE_BUFFER_SIZE = 32 * 1024;
private static final int TXN_COMMIT_BATCH_SIZE = 768 * 1024;
private static final long DEFAULT_BATCH_WRITE_LOCK_TTL = 3600000;
@@ -76,4 +99,170 @@ public void autoClosableTest() throws Exception {
executorService)) {}
Assert.assertTrue(executorService.isShutdown());
}
+
+ @Test
+ public void prewriteWriteConflictFastFailTest() throws Exception {
+
+ int WRITE_BUFFER_SIZE = 32;
+ String primaryKey = RandomStringUtils.randomAlphabetic(3);
+ AtomicLong failCount = new AtomicLong();
+ ExecutorService executorService =
+ Executors.newFixedThreadPool(
+ WRITE_BUFFER_SIZE,
+ new ThreadFactoryBuilder().setNameFormat("2pc-pool-%d").setDaemon(true).build());
+ CountDownLatch latch = new CountDownLatch(2);
+ int DEFAULT_BATCH_WRITE_LOCK_TTL = 10000;
+ new Thread(
+ () -> {
+ long startTS = session.getTimestamp().getVersion();
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ try {
+ TwoPhaseCommitter twoPhaseCommitter =
+ new TwoPhaseCommitter(
+ session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService);
+ List pairs =
+ Arrays.asList(
+ new BytePairWrapper(
+ primaryKey.getBytes(StandardCharsets.UTF_8),
+ primaryKey.getBytes(StandardCharsets.UTF_8)));
+ twoPhaseCommitter.prewriteSecondaryKeys(
+ primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000);
+ BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000);
+
+ long commitTs = session.getTimestamp().getVersion();
+
+ twoPhaseCommitter.commitPrimaryKey(
+ backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs);
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ failCount.incrementAndGet();
+ } finally {
+ latch.countDown();
+ }
+ })
+ .start();
+
+ Thread.sleep(10);
+ new Thread(
+ () -> {
+ long startTS = session.getTimestamp().getVersion();
+ try {
+ TwoPhaseCommitter twoPhaseCommitter =
+ new TwoPhaseCommitter(
+ session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService);
+ List pairs =
+ Arrays.asList(
+ new BytePairWrapper(
+ primaryKey.getBytes(StandardCharsets.UTF_8),
+ primaryKey.getBytes(StandardCharsets.UTF_8)));
+ twoPhaseCommitter.prewriteSecondaryKeys(
+ primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000);
+ BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000);
+
+ long commitTs = session.getTimestamp().getVersion();
+
+ twoPhaseCommitter.commitPrimaryKey(
+ backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs);
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ failCount.incrementAndGet();
+ } finally {
+ latch.countDown();
+ }
+ })
+ .start();
+ latch.await();
+ Assert.assertEquals(1, failCount.get());
+ }
+
+ @Test
+ public void prewriteWriteConflictLongNoFailTest() throws Exception {
+
+ int WRITE_BUFFER_SIZE = 32;
+ String primaryKey = RandomStringUtils.randomAlphabetic(3);
+ AtomicLong failCount = new AtomicLong();
+ ExecutorService executorService =
+ Executors.newFixedThreadPool(
+ WRITE_BUFFER_SIZE,
+ new ThreadFactoryBuilder().setNameFormat("2pc-pool-%d").setDaemon(true).build());
+ CountDownLatch latch = new CountDownLatch(2);
+ int DEFAULT_BATCH_WRITE_LOCK_TTL = 10000;
+
+ new Thread(
+ () -> {
+ long startTS = session.getTimestamp().getVersion();
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ try {
+ session = PowerMockito.spy(session);
+ TxnKVClient kvClient = PowerMockito.spy(session.createTxnClient());
+ PowerMockito.when(kvClient, "retryableException", Mockito.any(KeyException.class))
+ .thenReturn(true);
+ PowerMockito.doReturn(kvClient).when(session).createTxnClient();
+
+ TwoPhaseCommitter twoPhaseCommitter =
+ new TwoPhaseCommitter(
+ session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService);
+ List pairs =
+ Arrays.asList(
+ new BytePairWrapper(
+ primaryKey.getBytes(StandardCharsets.UTF_8),
+ primaryKey.getBytes(StandardCharsets.UTF_8)));
+ twoPhaseCommitter.prewriteSecondaryKeys(
+ primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000);
+ BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000);
+
+ long commitTs = session.getTimestamp().getVersion();
+
+ twoPhaseCommitter.commitPrimaryKey(
+ backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs);
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ failCount.incrementAndGet();
+ } finally {
+ latch.countDown();
+ }
+ })
+ .start();
+
+ Thread.sleep(10);
+ new Thread(
+ () -> {
+ long startTS = session.getTimestamp().getVersion();
+ try {
+ TwoPhaseCommitter twoPhaseCommitter =
+ new TwoPhaseCommitter(
+ session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService);
+ List pairs =
+ Arrays.asList(
+ new BytePairWrapper(
+ primaryKey.getBytes(StandardCharsets.UTF_8),
+ primaryKey.getBytes(StandardCharsets.UTF_8)));
+ twoPhaseCommitter.prewriteSecondaryKeys(
+ primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000);
+ BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000);
+
+ long commitTs = session.getTimestamp().getVersion();
+
+ twoPhaseCommitter.commitPrimaryKey(
+ backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs);
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ failCount.incrementAndGet();
+ } finally {
+ latch.countDown();
+ }
+ })
+ .start();
+ latch.await();
+ Assert.assertEquals(1, failCount.get());
+ }
}