@@ -57,7 +57,6 @@ enum SizeClass {
5757
5858 final int numSmallSubpagePools ;
5959 final int directMemoryCacheAlignment ;
60- final int directMemoryCacheAlignmentMask ;
6160 private final PoolSubpage <T >[] smallSubpagePools ;
6261
6362 private final PoolChunkList <T > q050 ;
@@ -97,7 +96,6 @@ protected PoolArena(
9796 super (pageSize , pageShifts , chunkSize , cacheAlignment );
9897 this .parent = parent ;
9998 directMemoryCacheAlignment = cacheAlignment ;
100- directMemoryCacheAlignmentMask = cacheAlignment - 1 ;
10199
102100 numSmallSubpagePools = nSubpages ;
103101 smallSubpagePools = newSubpagePoolArray (numSmallSubpagePools );
@@ -183,17 +181,23 @@ private void tcacheAllocateSmall(
183181 return ;
184182 }
185183
186- /**
187- * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and {@link
188- * PoolChunk#free(long)} may modify the doubly linked list as well.
184+ /*
185+ * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
186+ * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
189187 */
190188 final PoolSubpage <T > head = smallSubpagePools [sizeIdx ];
191189 final boolean needsNormalAllocation ;
192190 synchronized (head ) {
193191 final PoolSubpage <T > s = head .next ;
194192 needsNormalAllocation = s == head ;
195193 if (!needsNormalAllocation ) {
196- assert s .doNotDestroy && s .elemSize == sizeIdx2size (sizeIdx );
194+ assert s .doNotDestroy && s .elemSize == sizeIdx2size (sizeIdx )
195+ : "doNotDestroy="
196+ + s .doNotDestroy
197+ + ", elemSize="
198+ + s .elemSize
199+ + ", sizeIdx="
200+ + sizeIdx ;
197201 long handle = s .allocate ();
198202 assert handle >= 0 ;
199203 s .chunk .initBufWithSubpage (buf , null , handle , reqCapacity , cache );
@@ -221,7 +225,7 @@ private void tcacheAllocateNormal(
221225 }
222226 }
223227
224- // Method must be called inside synchronized(this) { ... } block
228+ // Method must be called inside synchronized(this) { ... } block
225229 private void allocateNormal (
226230 PooledByteBuf <T > buf , int reqCapacity , int sizeIdx , PoolThreadCache threadCache ) {
227231 if (q050 .allocate (buf , reqCapacity , sizeIdx , threadCache )
@@ -272,7 +276,7 @@ void free(
272276 }
273277 }
274278
275- private SizeClass sizeClass (long handle ) {
279+ private static SizeClass sizeClass (long handle ) {
276280 return isSubpage (handle ) ? SizeClass .Small : SizeClass .Normal ;
277281 }
278282
@@ -499,6 +503,25 @@ public long numActiveBytes() {
499503 return max (0 , val );
500504 }
501505
506+ /**
507+ * Return the number of bytes that are currently pinned to buffer instances, by the arena. The
508+ * pinned memory is not accessible for use by any other allocation, until the buffers using have
509+ * all been released.
510+ */
511+ public long numPinnedBytes () {
512+ long val =
513+ activeBytesHuge
514+ .value (); // Huge chunks are exact-sized for the buffers they were allocated to.
515+ synchronized (this ) {
516+ for (int i = 0 ; i < chunkListMetrics .size (); i ++) {
517+ for (PoolChunkMetric m : chunkListMetrics .get (i )) {
518+ val += ((PoolChunk <?>) m ).pinnedBytes ();
519+ }
520+ }
521+ }
522+ return max (0 , val );
523+ }
524+
502525 protected abstract PoolChunk <T > newChunk (
503526 int pageSize , int maxPageIdx , int pageShifts , int chunkSize );
504527
@@ -588,13 +611,8 @@ private void destroyPoolChunkLists(PoolChunkList<T>... chunkLists) {
588611
589612 static final class HeapArena extends PoolArena <byte []> {
590613
591- HeapArena (
592- PooledByteBufAllocator parent ,
593- int pageSize ,
594- int pageShifts ,
595- int chunkSize ,
596- int directMemoryCacheAlignment ) {
597- super (parent , pageSize , pageShifts , chunkSize , directMemoryCacheAlignment );
614+ HeapArena (PooledByteBufAllocator parent , int pageSize , int pageShifts , int chunkSize ) {
615+ super (parent , pageSize , pageShifts , chunkSize , 0 );
598616 }
599617
600618 private static byte [] newByteArray (int size ) {
@@ -610,12 +628,12 @@ boolean isDirect() {
610628 protected PoolChunk <byte []> newChunk (
611629 int pageSize , int maxPageIdx , int pageShifts , int chunkSize ) {
612630 return new PoolChunk <byte []>(
613- this , newByteArray (chunkSize ), pageSize , pageShifts , chunkSize , maxPageIdx , 0 );
631+ this , null , newByteArray (chunkSize ), pageSize , pageShifts , chunkSize , maxPageIdx );
614632 }
615633
616634 @ Override
617635 protected PoolChunk <byte []> newUnpooledChunk (int capacity ) {
618- return new PoolChunk <byte []>(this , newByteArray (capacity ), capacity , 0 );
636+ return new PoolChunk <byte []>(this , null , newByteArray (capacity ), capacity );
619637 }
620638
621639 @ Override
@@ -656,40 +674,33 @@ boolean isDirect() {
656674 return true ;
657675 }
658676
659- // mark as package-private, only for unit test
660- int offsetCacheLine (ByteBuffer memory ) {
661- // We can only calculate the offset if Unsafe is present as otherwise directBufferAddress(...)
662- // will
663- // throw an NPE.
664- int remainder =
665- HAS_UNSAFE
666- ? (int )
667- (PlatformDependent .directBufferAddress (memory ) & directMemoryCacheAlignmentMask )
668- : 0 ;
669-
670- // offset = alignment - address & (alignment - 1)
671- return directMemoryCacheAlignment - remainder ;
672- }
673-
674677 @ Override
675678 protected PoolChunk <ByteBuffer > newChunk (
676679 int pageSize , int maxPageIdx , int pageShifts , int chunkSize ) {
677680 if (directMemoryCacheAlignment == 0 ) {
681+ ByteBuffer memory = allocateDirect (chunkSize );
678682 return new PoolChunk <ByteBuffer >(
679- this , allocateDirect ( chunkSize ), pageSize , pageShifts , chunkSize , maxPageIdx , 0 );
683+ this , memory , memory , pageSize , pageShifts , chunkSize , maxPageIdx );
680684 }
681- final ByteBuffer memory = allocateDirect (chunkSize + directMemoryCacheAlignment );
685+
686+ final ByteBuffer base = allocateDirect (chunkSize + directMemoryCacheAlignment );
687+ final ByteBuffer memory =
688+ PlatformDependent .alignDirectBuffer (base , directMemoryCacheAlignment );
682689 return new PoolChunk <ByteBuffer >(
683- this , memory , pageSize , pageShifts , chunkSize , maxPageIdx , offsetCacheLine ( memory ) );
690+ this , base , memory , pageSize , pageShifts , chunkSize , maxPageIdx );
684691 }
685692
686693 @ Override
687694 protected PoolChunk <ByteBuffer > newUnpooledChunk (int capacity ) {
688695 if (directMemoryCacheAlignment == 0 ) {
689- return new PoolChunk <ByteBuffer >(this , allocateDirect (capacity ), capacity , 0 );
696+ ByteBuffer memory = allocateDirect (capacity );
697+ return new PoolChunk <ByteBuffer >(this , memory , memory , capacity );
690698 }
691- final ByteBuffer memory = allocateDirect (capacity + directMemoryCacheAlignment );
692- return new PoolChunk <ByteBuffer >(this , memory , capacity , offsetCacheLine (memory ));
699+
700+ final ByteBuffer base = allocateDirect (capacity + directMemoryCacheAlignment );
701+ final ByteBuffer memory =
702+ PlatformDependent .alignDirectBuffer (base , directMemoryCacheAlignment );
703+ return new PoolChunk <ByteBuffer >(this , base , memory , capacity );
693704 }
694705
695706 private static ByteBuffer allocateDirect (int capacity ) {
@@ -701,9 +712,9 @@ private static ByteBuffer allocateDirect(int capacity) {
701712 @ Override
702713 protected void destroyChunk (PoolChunk <ByteBuffer > chunk ) {
703714 if (PlatformDependent .useDirectBufferNoCleaner ()) {
704- PlatformDependent .freeDirectNoCleaner (chunk .memory );
715+ PlatformDependent .freeDirectNoCleaner (( ByteBuffer ) chunk .base );
705716 } else {
706- PlatformDependent .freeDirectBuffer (chunk .memory );
717+ PlatformDependent .freeDirectBuffer (( ByteBuffer ) chunk .base );
707718 }
708719 }
709720
0 commit comments