11 changes: 8 additions & 3 deletions pom.xml
@@ -57,7 +57,7 @@
<protobuf.version>3.5.1</protobuf.version>
<log4j.version>1.2.17</log4j.version>
<slf4j.version>1.7.16</slf4j.version>
<grpc.version>1.38.0</grpc.version>
<grpc.version>1.48.0</grpc.version>
<netty.tcnative.version>2.0.34.Final</netty.tcnative.version>
<gson.version>2.8.9</gson.version>
<powermock.version>1.6.6</powermock.version>
@@ -75,12 +75,12 @@
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.16.1</version>
<version>3.19.6</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.16.1</version>
<version>3.19.6</version>
</dependency>
<dependency>
<groupId>io.perfmark</groupId>
@@ -232,6 +232,11 @@
<version>3.9</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.15</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
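The bumps above take gRPC to 1.48.0 and protobuf-java to 3.19.6, and pin commons-codec 1.15 explicitly rather than inheriting whatever httpclient pulls in transitively (the intent is my assumption). A minimal smoke test for the pinned codec; the class name is illustrative, not part of this PR:

import java.nio.charset.StandardCharsets;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.codec.digest.DigestUtils;

// Illustrative only: exercises the explicitly pinned commons-codec 1.15.
public class CodecSmoke {
  public static void main(String[] args) {
    byte[] raw = "tikv".getBytes(StandardCharsets.UTF_8);
    System.out.println(Hex.encodeHexString(raw));   // 74696b76
    System.out.println(DigestUtils.sha256Hex(raw)); // hex-encoded SHA-256 digest
  }
}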
22 changes: 14 additions & 8 deletions src/main/java/io/grpc/internal/ClientCallImpl.java
@@ -24,6 +24,7 @@
import static io.grpc.Status.DEADLINE_EXCEEDED;
import static io.grpc.internal.GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
import static java.lang.Math.max;
@@ -33,6 +34,7 @@
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.ClientStreamTracer;
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.CompressorRegistry;
@@ -166,6 +168,7 @@ static void prepareHeaders(
DecompressorRegistry decompressorRegistry,
Compressor compressor,
boolean fullStreamDecompression) {
headers.discardAll(CONTENT_LENGTH_KEY);
headers.discardAll(MESSAGE_ENCODING_KEY);
if (compressor != Codec.Identity.NONE) {
headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
@@ -260,10 +263,13 @@ public void runInContext() {
effectiveDeadline, context.getDeadline(), callOptions.getDeadline());
stream = clientStreamProvider.newStream(method, callOptions, headers, context);
} else {
ClientStreamTracer[] tracers =
GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false);
stream =
new FailingClientStream(
DEADLINE_EXCEEDED.withDescription(
"ClientCall started after deadline exceeded: " + effectiveDeadline));
"ClientCall started after deadline exceeded: " + effectiveDeadline),
tracers);
}

if (callExecutorIsDirect) {
@@ -363,12 +369,14 @@ private static void logIfContextNarrowedTimeout(
StringBuilder builder =
new StringBuilder(
String.format(
"Call timeout set to '%d' ns, due to context deadline.", effectiveTimeout));
Locale.US,
"Call timeout set to '%d' ns, due to context deadline.",
effectiveTimeout));
if (callDeadline == null) {
builder.append(" Explicit call timeout was not set.");
} else {
long callTimeout = callDeadline.timeRemaining(TimeUnit.NANOSECONDS);
builder.append(String.format(" Explicit call timeout was '%d' ns.", callTimeout));
builder.append(String.format(Locale.US, " Explicit call timeout was '%d' ns.", callTimeout));
}

log.fine(builder.toString());
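The Locale.US arguments added above matter because %d is locale-sensitive: on CLDR-based JDKs some default locales format integers with non-ASCII digits, which would break anything parsing these timeout logs. A small illustration, not code from this PR:

import java.util.Locale;

// %d follows the locale's digit symbols; Locale.US guarantees ASCII digits.
String us = String.format(Locale.US, "%d", 1234567);              // "1234567"
String ar = String.format(new Locale("ar", "EG"), "%d", 1234567); // may render Arabic-Indic digits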
@@ -562,6 +570,9 @@ public void setMessageCompression(boolean enabled) {

@Override
public boolean isReady() {
if (halfCloseCalled) {
return false;
}
return stream.isReady();
}
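Returning false once halfClose() has been called keeps onReady-driven senders from writing into a stream that no longer accepts messages. A hedged sketch of the caller pattern this protects; the method and names are illustrative:

import io.grpc.ClientCall;
import java.util.Iterator;

// Illustrative drain loop: after halfClose(), isReady() now returns false,
// so the loop stops instead of racing a half-closed stream.
static <ReqT> void drain(ClientCall<ReqT, ?> call, Iterator<ReqT> msgs) {
  while (call.isReady() && msgs.hasNext()) {
    call.sendMessage(msgs.next());
  }
  if (!msgs.hasNext()) {
    call.halfClose();
  }
}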

@@ -711,11 +722,6 @@ private void runInternal() {
}
}

@Override
public void closed(Status status, Metadata trailers) {
closed(status, RpcProgress.PROCESSED, trailers);
}

@Override
public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
PerfMark.startTask("ClientStreamListener.closed", tag);
91 changes: 51 additions & 40 deletions src/main/java/io/netty/buffer/PoolArena.java
@@ -57,7 +57,6 @@ enum SizeClass {

final int numSmallSubpagePools;
final int directMemoryCacheAlignment;
final int directMemoryCacheAlignmentMask;
private final PoolSubpage<T>[] smallSubpagePools;

private final PoolChunkList<T> q050;
@@ -97,7 +96,6 @@ protected PoolArena(
super(pageSize, pageShifts, chunkSize, cacheAlignment);
this.parent = parent;
directMemoryCacheAlignment = cacheAlignment;
directMemoryCacheAlignmentMask = cacheAlignment - 1;

numSmallSubpagePools = nSubpages;
smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
@@ -183,17 +181,23 @@ private void tcacheAllocateSmall(
return;
}

/**
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and {@link
* PoolChunk#free(long)} may modify the doubly linked list as well.
/*
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
final PoolSubpage<T> head = smallSubpagePools[sizeIdx];
final boolean needsNormalAllocation;
synchronized (head) {
final PoolSubpage<T> s = head.next;
needsNormalAllocation = s == head;
if (!needsNormalAllocation) {
assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx);
assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx)
: "doNotDestroy="
+ s.doNotDestroy
+ ", elemSize="
+ s.elemSize
+ ", sizeIdx="
+ sizeIdx;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache);
@@ -221,7 +225,7 @@ private void tcacheAllocateNormal(
}
}

// Method must be called inside synchronized(this) { ... } block
private void allocateNormal(
PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) {
if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache)
@@ -272,7 +276,7 @@ void free(
}
}

private SizeClass sizeClass(long handle) {
private static SizeClass sizeClass(long handle) {
return isSubpage(handle) ? SizeClass.Small : SizeClass.Normal;
}

@@ -499,6 +503,25 @@ public long numActiveBytes() {
return max(0, val);
}

/**
 * Returns the number of bytes currently pinned to buffer instances by this arena. Pinned
 * memory is not accessible to any other allocation until the buffers using it have all been
 * released.
 */
public long numPinnedBytes() {
// Huge chunks are exact-sized for the buffers they were allocated to.
long val = activeBytesHuge.value();
synchronized (this) {
for (int i = 0; i < chunkListMetrics.size(); i++) {
for (PoolChunkMetric m : chunkListMetrics.get(i)) {
val += ((PoolChunk<?>) m).pinnedBytes();
}
}
}
return max(0, val);
}
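numPinnedBytes() counts huge allocations at their exact size, then adds each pooled chunk's pinned bytes under the arena lock. A hedged usage sketch; upstream Netty exposes comparable allocator-level totals (pinnedDirectMemory()/pinnedHeapMemory() in 4.1.72+), though this shaded copy may differ:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

// Assumption: a live pooled buffer pins part of its chunk until released.
PooledByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;
ByteBuf buf = alloc.directBuffer(64 * 1024);
// While 'buf' is live, the owning arena's numPinnedBytes() includes it;
// after release, the bytes are unpinned and reusable by other allocations.
buf.release();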

protected abstract PoolChunk<T> newChunk(
int pageSize, int maxPageIdx, int pageShifts, int chunkSize);

@@ -588,13 +611,8 @@ private void destroyPoolChunkLists(PoolChunkList<T>... chunkLists) {

static final class HeapArena extends PoolArena<byte[]> {

HeapArena(
PooledByteBufAllocator parent,
int pageSize,
int pageShifts,
int chunkSize,
int directMemoryCacheAlignment) {
super(parent, pageSize, pageShifts, chunkSize, directMemoryCacheAlignment);
HeapArena(PooledByteBufAllocator parent, int pageSize, int pageShifts, int chunkSize) {
super(parent, pageSize, pageShifts, chunkSize, 0);
}

private static byte[] newByteArray(int size) {
@@ -610,12 +628,12 @@ boolean isDirect() {
protected PoolChunk<byte[]> newChunk(
int pageSize, int maxPageIdx, int pageShifts, int chunkSize) {
return new PoolChunk<byte[]>(
this, newByteArray(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx, 0);
this, null, newByteArray(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx);
}

@Override
protected PoolChunk<byte[]> newUnpooledChunk(int capacity) {
return new PoolChunk<byte[]>(this, newByteArray(capacity), capacity, 0);
return new PoolChunk<byte[]>(this, null, newByteArray(capacity), capacity);
}

@Override
@@ -656,40 +674,33 @@ boolean isDirect() {
return true;
}

// mark as package-private, only for unit test
int offsetCacheLine(ByteBuffer memory) {
// We can only calculate the offset if Unsafe is present as otherwise
// directBufferAddress(...) will throw an NPE.
int remainder =
HAS_UNSAFE
? (int)
(PlatformDependent.directBufferAddress(memory) & directMemoryCacheAlignmentMask)
: 0;

// offset = alignment - address & (alignment - 1)
return directMemoryCacheAlignment - remainder;
}

@Override
protected PoolChunk<ByteBuffer> newChunk(
int pageSize, int maxPageIdx, int pageShifts, int chunkSize) {
if (directMemoryCacheAlignment == 0) {
ByteBuffer memory = allocateDirect(chunkSize);
return new PoolChunk<ByteBuffer>(
this, allocateDirect(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx, 0);
this, memory, memory, pageSize, pageShifts, chunkSize, maxPageIdx);
}
final ByteBuffer memory = allocateDirect(chunkSize + directMemoryCacheAlignment);

final ByteBuffer base = allocateDirect(chunkSize + directMemoryCacheAlignment);
final ByteBuffer memory =
PlatformDependent.alignDirectBuffer(base, directMemoryCacheAlignment);
return new PoolChunk<ByteBuffer>(
this, memory, pageSize, pageShifts, chunkSize, maxPageIdx, offsetCacheLine(memory));
this, base, memory, pageSize, pageShifts, chunkSize, maxPageIdx);
}

@Override
protected PoolChunk<ByteBuffer> newUnpooledChunk(int capacity) {
if (directMemoryCacheAlignment == 0) {
return new PoolChunk<ByteBuffer>(this, allocateDirect(capacity), capacity, 0);
ByteBuffer memory = allocateDirect(capacity);
return new PoolChunk<ByteBuffer>(this, memory, memory, capacity);
}
final ByteBuffer memory = allocateDirect(capacity + directMemoryCacheAlignment);
return new PoolChunk<ByteBuffer>(this, memory, capacity, offsetCacheLine(memory));

final ByteBuffer base = allocateDirect(capacity + directMemoryCacheAlignment);
final ByteBuffer memory =
PlatformDependent.alignDirectBuffer(base, directMemoryCacheAlignment);
return new PoolChunk<ByteBuffer>(this, base, memory, capacity);
}

private static ByteBuffer allocateDirect(int capacity) {
@@ -701,9 +712,9 @@ private static ByteBuffer allocateDirect(int capacity) {
@Override
protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
if (PlatformDependent.useDirectBufferNoCleaner()) {
PlatformDependent.freeDirectNoCleaner(chunk.memory);
PlatformDependent.freeDirectNoCleaner((ByteBuffer) chunk.base);
} else {
PlatformDependent.freeDirectBuffer(chunk.memory);
PlatformDependent.freeDirectBuffer((ByteBuffer) chunk.base);
}
}

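The alignment rework replaces the offsetCacheLine() arithmetic with a base/memory pair: over-allocate a base buffer, let PlatformDependent.alignDirectBuffer carve out an aligned view, and free the base (never the view) in destroyChunk. A standalone analogue using only JDK APIs (Java 9+), offered as a sketch of the idea rather than the PR's code:

import java.nio.ByteBuffer;

final class AlignedDirect {
  // Over-allocate by 'alignment', then slice an aligned window. Callers keep
  // 'base' because only the base buffer may be freed; 'memory' is a view.
  static ByteBuffer[] allocate(int capacity, int alignment) {
    ByteBuffer base = ByteBuffer.allocateDirect(capacity + alignment);
    int misalign = base.alignmentOffset(0, alignment); // address mod alignment
    int skip = misalign == 0 ? 0 : alignment - misalign;
    base.position(skip).limit(skip + capacity);
    ByteBuffer memory = base.slice(); // aligned view of exactly 'capacity' bytes
    return new ByteBuffer[] {base, memory};
  }
}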
8 changes: 6 additions & 2 deletions src/main/java/org/tikv/common/AbstractGRPCClient.java
@@ -193,13 +193,17 @@ private boolean doCheckHealth(BackOffer backOffer, String addressStr, HostMapping
HealthCheckResponse resp = stub.check(req);
return resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING;
} catch (Exception e) {
logger.warn("check health failed.", e);
logger.warn("check health failed, addr: {}, caused by: {}", addressStr, e.getMessage());
backOffer.doBackOff(BackOffFuncType.BoCheckHealth, e);
}
}
}

protected boolean checkHealth(BackOffer backOffer, String addressStr, HostMapping hostMapping) {
return doCheckHealth(backOffer, addressStr, hostMapping);
try {
return doCheckHealth(backOffer, addressStr, hostMapping);
} catch (Exception e) {
return false;
}
}
}
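For context, doCheckHealth drives the standard gRPC health-checking protocol, and the new wrapper converts a probe that exhausts its back-off (which surfaces as an exception) into a plain false. A hedged sketch of a single probe against the stock io.grpc.health.v1 API; the wrapper class is illustrative:

import io.grpc.ManagedChannel;
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthGrpc;
import java.util.concurrent.TimeUnit;

final class HealthProbe {
  // One bounded probe; persistent failure is reported as "not serving"
  // rather than as an exception, mirroring checkHealth above.
  static boolean probe(ManagedChannel channel, long timeoutMs) {
    HealthGrpc.HealthBlockingStub stub =
        HealthGrpc.newBlockingStub(channel)
            .withDeadlineAfter(timeoutMs, TimeUnit.MILLISECONDS);
    try {
      HealthCheckResponse resp = stub.check(HealthCheckRequest.newBuilder().build());
      return resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING;
    } catch (Exception e) {
      return false;
    }
  }
}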
28 changes: 22 additions & 6 deletions src/main/java/org/tikv/common/PDClient.java
@@ -462,14 +462,19 @@ private GetMembersResponse doGetMembers(BackOffer backOffer, URI uri) {
}
return resp;
} catch (Exception e) {
logger.warn("failed to get member from pd server.", e);
logger.warn(
"failed to get member from pd server from {}, caused by: {}", uri, e.getMessage());
backOffer.doBackOff(BackOffFuncType.BoPDRPC, e);
}
}
}

private GetMembersResponse getMembers(BackOffer backOffer, URI uri) {
return doGetMembers(backOffer, uri);
try {
return doGetMembers(backOffer, uri);
} catch (Exception e) {
return null;
}
}

// return whether the leader has changed to target address `leaderUrlStr`.
@@ -524,13 +529,16 @@ synchronized boolean createFollowerClientWrapper(
public void tryUpdateLeaderOrForwardFollower() {
if (updateLeaderNotify.compareAndSet(false, true)) {
try {
BackOffer backOffer = defaultBackOffer();
updateLeaderService.submit(
() -> {
try {
updateLeaderOrForwardFollower(backOffer);
updateLeaderOrForwardFollower();
} catch (Exception e) {
logger.info("update leader or forward follower failed", e);
throw e;
} finally {
updateLeaderNotify.set(false);
logger.info("updating leader finish");
}
});
} catch (RejectedExecutionException e) {
@@ -540,11 +548,13 @@
}
}
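The compareAndSet guard above makes the leader refresh single-flight: one submission at a time, with the finally block re-arming the gate even if the refresh throws. A minimal sketch of the pattern; refreshLeader is a stand-in name:

// Single-flight gate (sketch): admit one refresh, always reset the flag.
if (updateLeaderNotify.compareAndSet(false, true)) {
  updateLeaderService.submit(
      () -> {
        try {
          refreshLeader(); // stand-in for updateLeaderOrForwardFollower()
        } finally {
          updateLeaderNotify.set(false);
        }
      });
}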

private synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) {
private synchronized void updateLeaderOrForwardFollower() {
logger.warn("updating leader or forward follower");
if (System.currentTimeMillis() - lastUpdateLeaderTime < MIN_TRY_UPDATE_DURATION) {
return;
}
for (URI url : this.pdAddrs) {
BackOffer backOffer = this.probeBackOffer();
// since resp may be null, we need to update the leader's address by walking through all PD servers.
GetMembersResponse resp = getMembers(backOffer, url);
if (resp == null) {
@@ -602,8 +612,9 @@ && createFollowerClientWrapper(backOffer, followerUrlStr, leaderUrlStr)) {
}

public void tryUpdateLeader() {
logger.info("try update leader");
for (URI url : this.pdAddrs) {
BackOffer backOffer = defaultBackOffer();
BackOffer backOffer = this.probeBackOffer();
// since resp may be null, we need to update the leader's address by walking through all PD servers.
GetMembersResponse resp = getMembers(backOffer, url);
if (resp == null) {
@@ -856,4 +867,9 @@ public RequestKeyCodec getCodec() {
private static BackOffer defaultBackOffer() {
return ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF);
}

private BackOffer probeBackOffer() {
int maxSleep = (int) getTimeout() * 2;
return ConcreteBackOffer.newCustomBackOff(maxSleep);
}
}
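probeBackOffer() bounds each probe at twice the client timeout instead of the fixed PD_INFO_BACKOFF budget used by defaultBackOffer(), so one dead PD member cannot stall leader switching for long. A hedged sketch of how the probe loop composes; trySwitchLeader is a hypothetical helper:

// Sketch: each PD address gets its own bounded back-off; a null response
// just moves on to the next candidate member.
for (URI url : pdAddrs) {
  BackOffer backOffer = ConcreteBackOffer.newCustomBackOff((int) getTimeout() * 2);
  GetMembersResponse resp = getMembers(backOffer, url);
  if (resp == null) {
    continue; // member unreachable within the probe budget
  }
  trySwitchLeader(resp); // hypothetical: update leader or forward follower
  break;
}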