Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions src/main/java/org/tikv/common/region/RegionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,13 @@ public TiRegion getRegionByKey(ByteString key, BackOffer backOffer) {
Pair<Metapb.Region, Metapb.Peer> regionAndLeader = pdClient.getRegionByKey(backOffer, key);
region =
cache.putRegion(createRegion(regionAndLeader.first, regionAndLeader.second, backOffer));
logger.debug(
String.format(
"get region id: %d with leader: %d",
region.getId(), region.getLeader().getStoreId()));
}
} catch (Exception e) {
logger.warn("Get region failed: ", e);
return null;
} finally {
requestTimer.observeDuration();
Expand Down Expand Up @@ -240,18 +245,19 @@ public Pair<TiRegion, TiStore> getRegionStorePairByKey(
}
logger.info("Store {} is unreachable, try to get the next replica", peer.getStoreId());
}
// Does not set unreachable store to null in case it is incompatible with GrpcForward
if (store == null || !store.isReachable()) {
logger.warn("No TiKV store available for region: " + region);
}
} else {
List<TiStore> tiflashStores = new ArrayList<>();
for (Peer peer : region.getLearnerList()) {
TiStore s = getStoreById(peer.getStoreId(), backOffer);
if (!s.isReachable()) {
continue;
}
for (Metapb.StoreLabel label : s.getStore().getLabelsList()) {
if (label.getKey().equals(storeType.getLabelKey())
&& label.getValue().equals(storeType.getLabelValue())) {
tiflashStores.add(s);
}
if (s.isTiFlash()) {
tiflashStores.add(s);
}
}
// select a tiflash with Round-Robin strategy
Expand Down
30 changes: 30 additions & 0 deletions src/main/java/org/tikv/common/region/StoreHealthyChecker.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,22 @@
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthGrpc;
import io.grpc.stub.ClientCalls;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.ReadOnlyPDClient;
import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Mpp;
import org.tikv.kvproto.Mpp.IsAliveRequest;
import org.tikv.kvproto.TikvGrpc;

public class StoreHealthyChecker implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(StoreHealthyChecker.class);
Expand Down Expand Up @@ -75,6 +80,30 @@ private List<TiStore> getValidStores() {

private boolean checkStoreHealth(TiStore store) {
String addressStr = store.getStore().getAddress();
if (store.isTiFlash()) {
return checkTiFlashHealth(addressStr);
}
return checkTiKVHealth(addressStr);
}

private boolean checkTiFlashHealth(String addressStr) {
try {
ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
TikvGrpc.TikvBlockingStub stub =
TikvGrpc.newBlockingStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS);
Supplier<IsAliveRequest> factory = () -> Mpp.IsAliveRequest.newBuilder().build();
Mpp.IsAliveResponse resp =
ClientCalls.blockingUnaryCall(
stub.getChannel(), TikvGrpc.getIsAliveMethod(), stub.getCallOptions(), factory.get());
return resp != null && resp.getAvailable();
} catch (Exception e) {
logger.info(
"fail to check TiFlash health, regard as unhealthy. TiFlash address: " + addressStr, e);
return false;
}
}

private boolean checkTiKVHealth(String addressStr) {
try {
ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
HealthGrpc.HealthBlockingStub stub =
Expand All @@ -83,6 +112,7 @@ private boolean checkStoreHealth(TiStore store) {
HealthCheckResponse resp = stub.check(req);
return resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING;
} catch (Exception e) {
logger.info("fail to check TiKV health, regard as unhealthy. TiKV address: " + addressStr, e);
return false;
}
}
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/org/tikv/common/region/TiStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,14 @@ public Metapb.Store getProxyStore() {
public long getId() {
return this.store.getId();
}

public boolean isTiFlash() {
for (Metapb.StoreLabel label : store.getLabelsList()) {
if (label.getKey().equals(TiStoreType.TiFlash.getLabelKey())
&& label.getValue().equals(TiStoreType.TiFlash.getLabelValue())) {
return true;
}
}
return false;
}
}