I. Introduction

A few core concepts of the Netty memory pool:

1. Core classes

  • PoolArena manages and allocates memory regions;
  • PoolChunk is one contiguous memory region, 16 MiB by default, managed in units of pages (8 KiB by default);
  • PoolSubpage manages allocations smaller than a page, split into Tiny (< 512 bytes) and Small (< pageSize);
  • PoolChunkList groups chunks by usage to improve allocation efficiency and reduce internal fragmentation;
  • PoolThreadCache is the per-thread allocation cache;

Let's start with PoolChunk.

II. PoolChunk Source Code Analysis

The PoolChunk class is responsible for allocating and freeing blocks of memory. The following terms are important for understanding the code:

  • page - the smallest unit in which the chunk's memory is allocated.
  • chunk - a collection of pages.
  • In the code, chunkSize = 2^{maxOrder} * pageSize, where maxOrder is the height of the binary tree.

We first allocate a byte array of size = chunkSize. Whenever a ByteBuf of a given size needs to be created, we search for the first position in the byte array that has enough free space to accommodate the requested size, and return a (long) handle that encodes this offset information (that memory segment is then marked as reserved, so it is always used by exactly one ByteBuf).

1. How is the collection of pages managed?

With a full binary tree laid out in an array. The tree tracks whether a node's memory has been handed out and how much memory the node can still allocate; each array slot stores the node's current depth value, and that depth determines how much memory the node can allocate.

The tree structure (figure omitted here) has the following key properties:

  • From a node's depth value you immediately know how much it can allocate: the root (depth 0) can allocate chunkSize, a depth-1 node chunkSize/2, ..., and a depth-11 node exactly pageSize (see the sketch below);
  • A node is fully allocated once its stored value reaches maxOrder + 1;
  • When we allocate and free nodes we update the values stored in memoryMap, so a node's allocation state can be read off the array:
    • 1) memoryMap[id] equals the node's own depth (the initial value, e.g. memoryMap[512] = 9): the node itself and all of its children are still free and can be allocated;
    • 2) memoryMap[512] = val with 9 < val <= 11: some descendants of node 512 have already been allocated, so node 512 can no longer be handed out as a whole, but free nodes still exist at depth val and below;
    • 3) memoryMap[512] = 12 (i.e. maxOrder + 1): every node in this subtree has been allocated.
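
A minimal sketch (not Netty code) of the depth-to-size relationship, assuming the default pageSize = 8192 and maxOrder = 11:

    // Size managed by a node at depth d is chunkSize / 2^d.
    public final class DepthSizeSketch {
        static final int PAGE_SIZE = 8192;
        static final int MAX_ORDER = 11;
        static final int CHUNK_SIZE = PAGE_SIZE << MAX_ORDER; // 16 MiB

        static int runSize(int depth) {
            return CHUNK_SIZE >>> depth;
        }

        public static void main(String[] args) {
            System.out.println(runSize(0));         // 16777216 -> the whole chunk
            System.out.println(runSize(1));         // 8388608  -> half the chunk
            System.out.println(runSize(MAX_ORDER)); // 8192     -> exactly one page
        }
    }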

2. Data structure

final class PoolChunk<T> implements PoolChunkMetric {

    private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1;

    final PoolArena<T> arena;  //PoolArena 管理PoolChunk
    final T memory;            //实际分配内存容量为chunkSize的byte[](heap方式)或ByteBuffer(direct方式)
    final boolean unpooled;    //是否启动内存池状态 
    final int offset;          

    private final byte[] memoryMap;  //分配内存状态满二叉树数组
    private final byte[] depthMap;   //保存满二叉树高度值数组,构造时memoryMap和depthMap都相同数据
    private final PoolSubpage<T>[] subpages;//分配少于pageSize内存空间
    /** Used to determine if the requested capacity is equal to or greater than pageSize. */
    private final int subpageOverflowMask;   //确定请求的容量是否等于或大于pageSize
    private final int pageSize;  //page大小
    private final int pageShifts; // log2(pageSize)
    private final int maxOrder;   //满二叉树的高度    
    private final int chunkSize;  //2^{maxOrder} * pageSize
    private final int log2ChunkSize;//log2(chunkSize)
    private final int maxSubpageAllocs;//1 << maxOrder=2^{maxOrder}   满二叉树的叶节点数量
    /** Used to mark memory as unusable */
    //用于分配内存标示,unusable=maxOrder + 1 这块内存已分配完成
    private final byte unusable; //maxOrder + 1

    private int freeBytes; //分配字节数量
   
    //PoolChunkList 
    PoolChunkList<T> parent;
    PoolChunk<T> prev;
    PoolChunk<T> next;
}
  • Unsurprisingly, the heart of the bookkeeping is the memoryMap array: it is the array form of the full binary tree described above.

3. Constructor

  • pageSize: 8 KiB by default
  • chunkSize: 16 MiB (pageSize * 2^maxOrder)
  • maxOrder: 11
  • pageShifts: log2(pageSize) = 13 (see the sketch below)
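
A minimal sketch, under the default configuration above, of how these parameters relate (the local variable names are illustrative, not Netty's):

    int pageSize   = 8192;
    int pageShifts = Integer.numberOfTrailingZeros(pageSize); // log2(pageSize) = 13
    int maxOrder   = 11;
    int chunkSize  = pageSize << maxOrder;                    // 2^maxOrder * pageSize = 16 MiB
    int leaves     = 1 << maxOrder;                           // 2048 pages per chunk
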
   PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize, int offset) {
        unpooled = false;
        this.arena = arena;
        this.memory = memory;
        this.pageSize = pageSize;
        this.pageShifts = pageShifts;
        this.maxOrder = maxOrder;
        this.chunkSize = chunkSize;
        this.offset = offset;
        unusable = (byte) (maxOrder + 1);
        log2ChunkSize = log2(chunkSize);
        subpageOverflowMask = ~(pageSize - 1);
        freeBytes = chunkSize;

        assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder;
        maxSubpageAllocs = 1 << maxOrder;//2^{maxOrder} 满二叉树叶节点数量

        // Generate the memory map.
        memoryMap = new byte[maxSubpageAllocs << 1];//满二叉树2^{maxOrder+1} -1个结点,数组下标0不为二叉树根节点,
                                                    // 下标1才是树根节点,节点计算2^{maxOrder+1}

        depthMap = new byte[memoryMap.length];
        int memoryMapIndex = 1; //数组从下标1开始
        //赋值满二叉树数组高度值   满二叉树的第d层有2 ^d 个结点
        for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a time
            int depth = 1 << d; //2 ^d
            for (int p = 0; p < depth; ++ p) {
                // in each level traverse left to right and set value to the depth of subtree
                memoryMap[memoryMapIndex] = (byte) d;
                depthMap[memoryMapIndex] = (byte) d;
                memoryMapIndex ++;
            }
        }
        //PoolSubpage作用就是分配小于4k内存,具体下面做分析
        subpages = newSubpageArray(maxSubpageAllocs);//创建PoolSubpage数组,大小为叶子节点数量
    }
    
    
   @SuppressWarnings("unchecked")
   private PoolSubpage<T>[] newSubpageArray(int size) {
       return new PoolSubpage[size];
   }

4. Allocation (allocate)

    //normCapacity在PoolArena类中已经处理过,例如:申请分配9k内存,实际分配内存16K
   //返回long类型数组下标值,-1说明没找分配内存,
   //数组下标值int就可以满足,主要原因在于申请内存容量小于pageSize,分配机制PoolSubpage管理,所以PoolSubpage返回long一个状态标示,具体在PoolSubpage说明;
    long allocate(int normCapacity) {
        if ((normCapacity & subpageOverflowMask) != 0) { // normCapacity>= pageSize
            return allocateRun(normCapacity); //chunk 正常分配
        } else {
            return allocateSubpage(normCapacity);//小于pageSize,PoolSubpage进行分配
        }
    }

a. allocateRun: allocate memory of at least pageSize

    private long allocateRun(int normCapacity) {
        //获取normCapacity所在高度,log2(normCapacity) - pageShifts(log2(pageSize))=log2(normCapacity-pageSize),获取normCapacity高度
        int d = maxOrder - (log2(normCapacity) - pageShifts);//maxOrder-normCapacity高度,WHY??分配内存二叉树高度越低,分配内存越大;跟正常高度相反;
        int id = allocateNode(d);
        if (id < 0) {
            return id;
        }
        freeBytes -= runLength(id);
        return id;
    }
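
A hedged worked example of the depth computation above, assuming pageSize = 8192 (pageShifts = 13) and maxOrder = 11:

    // Request normCapacity = 16384 (two pages):
    //   log2(16384) = 14
    //   d = maxOrder - (log2(normCapacity) - pageShifts) = 11 - (14 - 13) = 10
    // A depth-10 node manages exactly chunkSize >>> 10 = 16384 bytes, so the request
    // is served there; a single-page request would give d = 11, i.e. a leaf.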

b. allocateNode: find the index of a node that can satisfy depth d

    private int allocateNode(int d) {
        //从根节点遍历查询分配节点数组下标id
        int id = 1;
        int initial = - (1 << d); // has last d bits = 0 and rest all = 1  
                                  // 例如d=3,1<<3=8,
                                 //  8=0000 0000 0000 0000 0000 0000 0000 1000
                                 //  -8=1111 1111 1111 1111 1111 1111 1111 1000
                                 //
        byte val = value(id);
        if (val > d) { // unusable  根节点高度大于申请分配内存高度,说明根节点值unusable状态或者分配内存不够,不可以在分配;
            return -1;
        }
        //当高度d时所有id相等这个等式id & initial == 1 << d,然而小于高度d的节点下标的id时相等这个等式(id & initial) == 0
        //这里为什么这么做,主要原因时分配内存时,更新父类值,获取子类最小值;查找父类节点高度值,看是否能获取相应分配内存
        while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
            id <<= 1; //找子类左节点
            val = value(id);
            if (val > d) {  
                id ^= 1;    //子类右节点
                val = value(id);
            }
        }
        byte value = value(id);
        ...
        setValue(id, unusable); // mark as unusable 把当前节点设置不可以在分配
        updateParentsAlloc(id); //更新父节点值
        return id;
    }
   private byte value(int id) {
        return memoryMap[id];
    } 
    private void setValue(int id, byte val) {
        memoryMap[id] = val;
    }     

c. Update the ancestors' depth values all the way up to the root

    /**
      * 更新父节点的高度值,往上更新 
      */
     private void updateParentsAlloc(int id) {
         while (id > 1) {
             int parentId = id >>> 1;  //id/2获取父节点
             byte val1 = value(id);
             byte val2 = value(id ^ 1);//id ^ 1设计不错,可以获取左节点,又可以获取右节点
             byte val = val1 < val2 ? val1 : val2;//获取最小值
             setValue(parentId, val);  //把子节点最小值赋给父节点
             id = parentId; //往上更新,递归树结构
         }
     } 
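
To make the interplay of allocateNode and updateParentsAlloc concrete, here is a small self-contained sketch of the same idea. It is a simplification, not Netty's code: depthMap, the `initial` mask trick and error handling are omitted.

    final class BuddySketch {
        final int maxOrder;
        final byte unusable;
        final byte[] memoryMap; // index 0 unused, root at index 1

        BuddySketch(int maxOrder) {
            this.maxOrder = maxOrder;
            this.unusable = (byte) (maxOrder + 1);
            this.memoryMap = new byte[1 << (maxOrder + 1)];
            for (int d = 0, idx = 1; d <= maxOrder; d++) {
                for (int p = 0; p < (1 << d); p++) {
                    memoryMap[idx++] = (byte) d; // initially every node stores its own depth
                }
            }
        }

        /** Returns the memoryMap index of a free node at depth d, or -1 if none exists. */
        int allocate(int d) {
            if (memoryMap[1] > d) {
                return -1; // not even the root subtree can serve a block this large
            }
            int id = 1;
            for (int depth = 0; depth < d; depth++) {
                id <<= 1;                  // try the left child first
                if (memoryMap[id] > d) {
                    id ^= 1;               // left child cannot serve it, take its buddy
                }
            }
            memoryMap[id] = unusable;      // mark the chosen node as fully allocated
            for (int p = id; p > 1; p >>>= 1) {
                // propagate the minimum of each buddy pair up to the root
                memoryMap[p >>> 1] = (byte) Math.min(memoryMap[p], memoryMap[p ^ 1]);
            }
            return id;
        }
    }

For example, new BuddySketch(11).allocate(11) returns 2048 (the left-most leaf, one page) and a second call returns 2049, just as the real allocateNode would.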

5. free: releasing memory. There are two cases, blocks of at least one page and blocks smaller than a page; since they are allocated differently, they are also released differently.

    void free(long handle) {
        int memoryMapIdx = memoryMapIdx(handle);//获取memoryMap数组下标值
        int bitmapIdx = bitmapIdx(handle);      //获取PoolSubpage中bitmap下标值

        //这块PoolSubpage做具体分析,主要作用就是释放subpage
        if (bitmapIdx != 0) { // free a subpage
            PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
            assert subpage != null && subpage.doNotDestroy;

            // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
            // This is need as we may add it back and so alter the linked-list structure.
            PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);
            synchronized (head) {
                //0x3FFFFFFF=1<<30-1;
                //bitmapIdx & 0x3FFFFFFF为什么这么操作?
                //在PoolSubpage分配内存时,组装返回值中有个标示状态(PoolSubpage分配具体说明)占最高2位;所以这样排除最高2位处理,获取真正bitmapIdx值;
                if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) {//
                    return;
                }
            }
        }
        
        freeBytes += runLength(memoryMapIdx);//释放内存大小
        setValue(memoryMapIdx, depth(memoryMapIdx));//设置高度
        updateParentsFree(memoryMapIdx);
    }
    private static int memoryMapIdx(long handle) {
        return (int) handle;
    }

    private static int bitmapIdx(long handle) {
        return (int) (handle >>> Integer.SIZE);
    }
    private byte depth(int id) {
        return depthMap[id];
    }
    private int runLength(int id) {
        // represents the size in #bytes supported by node 'id' in the tree
        // (1 << (maxOrder - depth(id)))*pageSize ==> (1 << (maxOrder - depth(id)))* 1<< log2(pageSize)  ==>
        // log2ChunkSize 高度包括pageSize高度
        return 1 << log2ChunkSize - depth(id);
    }
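
A hedged worked example of runLength, assuming pageSize = 8192 and maxOrder = 11 (so log2ChunkSize = 24):

    // runLength(1)    = 1 << (24 - 0)  = 16 MiB -> freeing the root returns the whole chunk
    // runLength(2)    = 1 << (24 - 1)  = 8 MiB
    // runLength(2048) = 1 << (24 - 11) = 8 KiB  -> freeing a leaf returns exactly one page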

a. On release, update the ancestors' depth values up to the root

    private void updateParentsFree(int id) {
        int logChild = depth(id) + 1;
        while (id > 1) {
            int parentId = id >>> 1;
            byte val1 = value(id);   
            byte val2 = value(id ^ 1);
            logChild -= 1; // in first iteration equals log, subsequently reduce 1 from logChild as we traverse up
            //logChild判断孩子节点是否分配;都没分配把父节点修改成高度值,有个孩子分配,父节点修改成孩子中最小值
            if (val1 == logChild && val2 == logChild) {
                //logChild - 1 修改成以前高度节点,子类都可以分配内存
                setValue(parentId, (byte) (logChild - 1));
            } else {
                byte val = val1 < val2 ? val1 : val2;
                setValue(parentId, val);
            }

            id = parentId;//往上迭代
        }
    } 
  • Why logChild - 1? Precondition: the left and right child hold the same value, and that value equals the depth of the node being freed.
    • Example: a leaf at depth 11 is freed while its buddy leaf is also free (both values are 11). Simply taking the minimum of the two children would leave the parent at 11, which is wrong; the two buddies must merge, so the parent is set back to logChild - 1 = 10, its own depth, meaning it can once again hand out both pages as one block.

6. initBuf: initialize the PooledByteBuf

It calls PooledByteBuf.init to perform the initialization.

  void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
        int memoryMapIdx = (int)handle;
        int bitmapIdx = (int)(handle >>> 32);
        if (bitmapIdx == 0) {
            byte val = this.value(memoryMapIdx);

            assert val == this.unusable : String.valueOf(val);
             //runOffset(memoryMapIdx) 获取偏移量
             //this.runLength(memoryMapIdx)实际分配内存
            buf.init(this, handle, this.runOffset(memoryMapIdx), reqCapacity, this.runLength(memoryMapIdx), this.arena.parent.threadCache());
        } else {
            this.initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
        }

    }
  
     private byte depth(int id) {
              return depthMap[id];
          }
          private int runLength(int id) {
              // represents the size in #bytes supported by node 'id' in the tree
              return 1 << log2ChunkSize - depth(id);
          }
      //获取数组下标偏移量
      private int runOffset(int id) {
          // represents the 0-based offset in #bytes from start of the byte-array chunk
          //表示从字节数组块开始的以字节为单位的基于0的偏移量
          int shift = id ^ 1 << depth(id); //获取d层所有节点从左到右排列顺序(0标示开始);先执行1 << depth(id),获取节点数量,也是该层开始下标位置;获取该层节点从左到右下标位置;
          return shift * runLength(id);    //获取实际内存地址标偏移量,runLength(id)获取该id的层分配大小;
      }    
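
A hedged worked example of runOffset, under the same default assumptions (pageSize = 8192, maxOrder = 11):

    // id = 2048 (left-most leaf):          shift = 2048 ^ (1 << 11) = 0 -> offset = 0
    // id = 2049 (its right buddy):         shift = 2049 ^ (1 << 11) = 1 -> offset = 1 * 8192 = 8192
    // id = 1024 (left-most depth-10 node): shift = 1024 ^ (1 << 10) = 0 -> offset = 0,
    //                                      and it spans runLength(1024) = 16384 bytes (two pages)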

III. PoolSubpage Source Code Analysis

PoolSubpage manages allocations smaller than a page. They are split into Tiny (< 512 bytes) and Small (< pageSize); a given PoolSubpage slices one page into equal-sized elements of a single size class.

1. Data structure

final class PoolSubpage<T> implements PoolSubpageMetric {

    final PoolChunk<T> chunk;     //属于chunk内存区域
    private final int memoryMapIdx;//PoolChunk中memoryMap数组下标
    private final int runOffset;   //PoolChunk中内存偏移量
    private final int pageSize;    //PoolChunk中page大小
    private final long[] bitmap;   //
   
    //PoolArena管理关联,后面具体说明
    PoolSubpage<T> prev;
    PoolSubpage<T> next;

    boolean doNotDestroy;           //是否需要释放整个Page
    int elemSize;                  //最小分配大小
    private int maxNumElems;       //该page包含的段数量
    private int bitmapLength;      //bitmap长度
    private int nextAvail;         //下一个可用的位置
    private int numAvail;          //可用段数量
}

2. Constructors

 //创建链接列表头的特殊构造函数。
    PoolSubpage(int pageSize) {
        chunk = null;
        memoryMapIdx = -1;
        runOffset = -1;
        elemSize = -1;
        this.pageSize = pageSize;
        bitmap = null;
    }

    PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
        this.chunk = chunk;
        this.memoryMapIdx = memoryMapIdx;
        this.runOffset = runOffset;
        this.pageSize = pageSize;
        //分配最小值16,long是64位代表64个块段区状态,所以pageSize / 16 / 64=pageSize >>> 10得到分配最大数组容量
        //为什么这里创建最大容量数组?PoolSubpage可以重复利用,只是elemSize大小可能不同,导致容量大小不同,就设置最大容量数组,不用每次重新创建内存大小;
        bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64
        init(head, elemSize);//分配elemSize大小
    }

3. init: (re)slice the page into elemSize-sized elements

  void init(PoolSubpage<T> head, int elemSize) {
        doNotDestroy = true;
        this.elemSize = elemSize;
        if (elemSize != 0) {
            maxNumElems = numAvail = pageSize / elemSize;//分多个elemSize大小块段区
            nextAvail = 0;
            bitmapLength = maxNumElems >>> 6; // long是64位代表64个块段区状态
            if ((maxNumElems & 63) != 0) { //就是bitmapLength>>6=0情况,bitmapLength=1;
                bitmapLength ++;
            }

            for (int i = 0; i < bitmapLength; i ++) { //初始化bitmap值
                bitmap[i] = 0;
            }
        }
        addToPool(head);
    }
     
    //往头链表拼接形成环绕
    private void addToPool(PoolSubpage<T> head) {
        assert prev == null && next == null;
        prev = head;
        next = head.next;
        next.prev = this;
        head.next = this;
    }    
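
A hedged worked example of the bitmap sizing in init, assuming pageSize = 8192:

    // The bitmap array is sized for the worst case: 8192 / 16 / 64 = 8 longs (pageSize >>> 10).
    // elemSize = 32: maxNumElems = 8192 / 32 = 256, bitmapLength = 256 >>> 6 = 4 longs used
    // elemSize = 48: maxNumElems = 8192 / 48 = 170, bitmapLength = 170 >>> 6 = 2, then +1
    //                because (170 & 63) != 0, so 3 longs are actually used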

4. PoolChunk#allocateSubpage: allocate less than pageSize by creating or re-initializing a PoolSubpage

In the PoolChunk class:

    private long allocateSubpage(int normCapacity) {
        // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
        // This is need as we may add it back and so alter the linked-list structure.
        // 获取PoolArena拥有的PoolSubPage池的中获取头节点;进行head加锁
        PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
        synchronized (head) {
            int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
            int id = allocateNode(d);//PoolChunk中分配可用内存id
            if (id < 0) {
                return id;
            }

            final PoolSubpage<T>[] subpages = this.subpages;
            final int pageSize = this.pageSize;

            freeBytes -= pageSize;

            int subpageIdx = subpageIdx(id);//获取subpages数组下标值;
            PoolSubpage<T> subpage = subpages[subpageIdx];
            if (subpage == null) {
                //创建PoolSubpage
                subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
                subpages[subpageIdx] = subpage;
            } else { 
                //重新构建normCapacity大小块内存
                subpage.init(head, normCapacity);
            }
            //PoolSubpage分配normCapacity大小内存
            return subpage.allocate();
        }
    }
    private int subpageIdx(int memoryMapIdx) {
        return memoryMapIdx ^ maxSubpageAllocs; // remove highest set bit, to get offset 获取节点下标从左到右,从零开始
    }    
    //获取数组下标偏移量
    private int runOffset(int id) {
        // represents the 0-based offset in #bytes from start of the byte-array chunk
        //表示从字节数组块开始的以字节为单位的基于0的偏移量
        int shift = id ^ 1 << depth(id); //获取d层所有节点从左到右排列顺序(0标示开始);
        return shift * runLength(id);    //获取实际内存地址标偏移量,runLength(id)获取该id的层分配大小;
    } 
    

5. allocate: allocate one element

  /**
     * Returns the bitmap index of the subpage allocation.
     */
    long allocate() {
        if (elemSize == 0) {
            return toHandle(0);
        }

        if (numAvail == 0 || !doNotDestroy) {
            return -1;
        }

        final int bitmapIdx = getNextAvail();//获取下个可用块内存
        int q = bitmapIdx >>> 6; //bitmapIdx/64,获取bitmap数组下标值
        int r = bitmapIdx & 63;  //获取63内下标值
        assert (bitmap[q] >>> r & 1) == 0;
        bitmap[q] |= 1L << r;    //在long类型64位相应下标值赋值位1;

        if (-- numAvail == 0) {
            removeFromPool();
        }

        return toHandle(bitmapIdx);
    }
    
   // |<--     64-63    -->|<--   62-32   -->| <--   32      --> | 
   // |      01(状态标示)   |   bitmapIdx     |  memoryMapIdx     | 
   //0x4000000000000000L=1<<62
   //bitmapIdx(最大值512)30位足够保存相应值
   //状态标示有个作用,bitmapIdx位零时,没有这个状态不能区分分配内存大于pageSize
    private long toHandle(int bitmapIdx) {
        return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
    }
    
    //删除该链表
    private void removeFromPool() {
        assert prev != null && next != null;
        prev.next = next;
        next.prev = prev;
        next = null;
        prev = null;
    }
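
The handle layout described in the comments above can be checked with a small sketch; the bit positions follow toHandle, but the helper names are only illustrative:

    // Layout: bit 62 = subpage marker, bits 32..61 = bitmapIdx, bits 0..31 = memoryMapIdx.
    static long toHandle(int bitmapIdx, int memoryMapIdx) {
        return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
    }

    static int memoryMapIdx(long handle) {
        return (int) handle;                    // low 32 bits
    }

    static int bitmapIdx(long handle) {
        return (int) (handle >>> Integer.SIZE); // high 32 bits, marker bit included
    }

    // toHandle(0, 2048) still has a non-zero upper half because of the marker bit, which is
    // how PoolChunk.free can tell a subpage allocation from a normal run even when bitmapIdx
    // is 0; the real bitmap index is recovered with (bitmapIdx & 0x3FFFFFFF).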
   

a. getNextAvail: find the next free element

  private int getNextAvail() {
        int nextAvail = this.nextAvail;
        if (nextAvail >= 0) {    //为什么这里判断nextAvail >= 0成立时又将this.nextAvail = -1呢?
                                 //nextAvail只有两个地方赋值为nextAvail >= 0;在初始化赋值为零和在释放内存块时赋值相应值;
                                 //nextAvail = -1重新迭代查询中间是否有释放内存块;
            this.nextAvail = -1;  
            return nextAvail;
        }
        return findNextAvail();
    }

    private int findNextAvail() {
        final long[] bitmap = this.bitmap;
        final int bitmapLength = this.bitmapLength;
        for (int i = 0; i < bitmapLength; i ++) {
            long bits = bitmap[i];
            if (~bits != 0) {  //有分配内存
                return findNextAvail0(i, bits);
            }
        }
        return -1;
    }

    //获取nextAvail值
    private int findNextAvail0(int i, long bits) {
        final int maxNumElems = this.maxNumElems;
        final int baseVal = i << 6;//i*64

        for (int j = 0; j < 64; j ++) {
            if ((bits & 1) == 0) {
                int val = baseVal | j;
                if (val < maxNumElems) {
                    return val;
                } else {
                    break;
                }
            }
            bits >>>= 1;
        }
        return -1;
    } 
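
findNextAvail0 scans one bitmap word bit by bit looking for a zero; a hedged cross-check using the JDK bit utilities computes the same answer (this is not how Netty implements it):

    // The first free element inside word i of the bitmap is the lowest 0 bit of bits.
    static int firstFree(int i, long bits, int maxNumElems) {
        int j = Long.numberOfTrailingZeros(~bits); // position of the lowest 0 bit in bits
        int val = (i << 6) | j;                    // baseVal | j
        return val < maxNumElems ? val : -1;
    }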

6. free: release one element

    boolean free(PoolSubpage<T> head, int bitmapIdx) {
        if (elemSize == 0) {
            return true;
        }
        int q = bitmapIdx >>> 6;
        int r = bitmapIdx & 63;
        assert (bitmap[q] >>> r & 1) != 0;
        bitmap[q] ^= 1L << r;

        setNextAvail(bitmapIdx);

        if (numAvail ++ == 0) { //不可分配内存中正好释放一个块内存,添加到缓存链表中
            addToPool(head);
            return true;
        }

        if (numAvail != maxNumElems) {
            return true;
        } else {
            // Subpage not in use (numAvail == maxNumElems)
            if (prev == next) {
                // Do not remove if this subpage is the only one left in the pool.
                //如果subpage池中唯一的一个,请不要删除。
                return true;
            }

            // Remove this subpage from the pool if there are other subpages left in the pool.
            //如果池中还有其他subpage,则从池中删除该子页面。
            doNotDestroy = false;
            removeFromPool();
            return false;
        }
    }
    private void setNextAvail(int bitmapIdx) {
        nextAvail = bitmapIdx;
    }    

7. initBufWithSubpage

   void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int reqCapacity) {
        this.initBufWithSubpage(buf, handle, (int)(handle >>> 32), reqCapacity);
    }

    private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) {
        assert bitmapIdx != 0;

        int memoryMapIdx = (int)handle;
        PoolSubpage<T> subpage = this.subpages[this.subpageIdx(memoryMapIdx)];

        assert subpage.doNotDestroy;

        assert reqCapacity <= subpage.elemSize;
         //1073741823=0x3FFFFFFF free做具体分析这里不做分析
         //his.runOffset(memoryMapIdx) + (bitmapIdx & 1073741823) * subpage.elemSize 获取偏移量
         //subpage.elemSize实际分配内存
        buf.init(this, handle, this.runOffset(memoryMapIdx) + (bitmapIdx & 1073741823) * subpage.elemSize, reqCapacity, subpage.elemSize, this.arena.parent.threadCache());
    } 

IV. PoolChunkList Source Code Analysis

To improve allocation efficiency and reduce internal fragmentation, the jemalloc-inspired design splits an Arena into Chunks and groups the chunks by memory usage into the states QINIT, Q0, Q25, Q50, Q75 and Q100. A chunk moves between these states as its usage changes (the usage ranges and transitions were shown in a figure in the original post; the concrete minUsage/maxUsage bounds appear in the PoolArena constructor below).

Each PoolChunkList manages the chunks in one such state.

1. Data structure

final class PoolChunkList<T> implements PoolChunkListMetric {
    private static final Iterator<PoolChunkMetric> EMPTY_METRICS = Collections.<PoolChunkMetric>emptyList().iterator();
    private final PoolArena<T> arena; //所属的Arena
    private final PoolChunkList<T> nextList;//next状态
    private final int minUsage;  //最小内存使用率
    private final int maxUsage;  //最大内存使用率
    private final int maxCapacity;//Chunk可分配的最大字节数
    private PoolChunk<T> head; //Chunk头节点

    // This is only update once when create the linked like list of PoolChunkList in PoolArena constructor.
    //这只是在PoolArena构造函数中创建PoolChunkList的链接列表时更新一次
    private PoolChunkList<T> prevList;//prev状态
 }   

2. Constructor

 PoolChunkList(PoolArena<T> arena, PoolChunkList<T> nextList, int minUsage, int maxUsage, int chunkSize) {
        assert minUsage <= maxUsage;
        this.arena = arena;
        this.nextList = nextList;
        this.minUsage = minUsage;
        this.maxUsage = maxUsage;
        maxCapacity = calculateMaxCapacity(minUsage, chunkSize);
    }

    private static int calculateMaxCapacity(int minUsage, int chunkSize) {
        minUsage = minUsage0(minUsage);

        if (minUsage == 100) {
            // If the minUsage is 100 we can not allocate anything out of this list.
            //如果minUsage是100,我们不能从这个列表中分配任何东西。
            return 0;
        }

        // Calculate the maximum amount of bytes that can be allocated from a PoolChunk in this PoolChunkList.
        // 计算PoolChunkList中可以从PoolChunk分配的最大字节数。
        // As an example: 例如
        // - If a PoolChunkList has minUsage == 25 we are allowed to allocate at most 75% of the chunkSize because
        //   this is the maximum amount available in any PoolChunk in this PoolChunkList.
        //如果PoolChunkList的minUsage为25,我们最多可以分配块大小的75%,因为这是PoolChunkList中任何PoolChunk的最大可用量。
        return  (int) (chunkSize * (100L - minUsage) / 100L);
    }
    private static int minUsage0(int value) {
        return max(1, value); //Math.max
    } 
   //设置prevList 
  void prevList(PoolChunkList<T> prevList) {
          assert this.prevList == null;
          this.prevList = prevList;
      }     
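
A hedged worked example of calculateMaxCapacity, assuming chunkSize = 16 MiB:

    // q025 has minUsage = 25, so calculateMaxCapacity returns 16 MiB * 75 / 100 = 12 MiB:
    // every chunk in that list is at least 25% used, so no single allocation larger than
    // 75% of the chunk can ever be satisfied from it.
    // For q100 (minUsage = 100) the method returns 0, i.e. nothing can be allocated there.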

3. allocate

    boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
        if (head == null || normCapacity > maxCapacity) {
            // Either this PoolChunkList is empty or the requested capacity is larger then the capacity which can
            // be handled by the PoolChunks that are contained in this PoolChunkList.
            return false;
        }

        for (PoolChunk<T> cur = head;;) {
            long handle = cur.allocate(normCapacity); //申请分配内存
            if (handle < 0) {  //小于零,往下PoolChunk申请分配内存
                cur = cur.next;
                if (cur == null) {
                    return false;
                }
            } else {
                cur.initBuf(buf, handle, reqCapacity); //初始化PooledByteBuf
                if (cur.usage() >= maxUsage) { //chunk 当前使用的百分比大于等于PoolChunkList最大值,删除该chunk,往PoolChunkList下个值添加chunk;
                    remove(cur);
                    nextList.add(cur);
                }
                return true;
            }
        }
    }   

a. remove

     private void remove(PoolChunk<T> cur) {
         if (cur == head) {
             head = cur.next;
             if (head != null) {
                 head.prev = null;
             }
         } else {
             PoolChunk<T> next = cur.next;
             cur.prev.next = next;
             if (next != null) {
                 next.prev = cur.prev;
             }
         }
     } 

4. free

    boolean free(PoolChunk<T> chunk, long handle) {
        chunk.free(handle);
        if (chunk.usage() < minUsage) {//chunk 当前使用的百分比小于PoolChunkList最小值,删除该chunk,往PoolChunkList上个值添加chunk;
            remove(chunk);
            // Move the PoolChunk down the PoolChunkList linked-list.
            return move0(chunk);
        }
        return true;
    }

    private boolean move0(PoolChunk<T> chunk) {
        if (prevList == null) {
            // There is no previous PoolChunkList so return false which result in having the PoolChunk destroyed and
            // all memory associated with the PoolChunk will be released.
            //没有先前的PoolChunkList返回false,导致PoolChunk被销毁,并且所有与PoolChunk关联的内存都将被释放
            assert chunk.usage() == 0;
            return false;
        }
        return prevList.move(chunk);
    }    

5. add: add a chunk to this list

  void add(PoolChunk<T> chunk) {
        if (chunk.usage() >= maxUsage) {
            nextList.add(chunk);
            return;
        }
        add0(chunk);
    }

    void add0(PoolChunk<T> chunk) {
        chunk.parent = this;
        if (head == null) {
            head = chunk;
            chunk.prev = null;
            chunk.next = null;
        } else {
            chunk.prev = null;
            chunk.next = head;
            head.prev = chunk;
            head = chunk;
        }
    }

6. move: move a chunk into this list (or further down)

   private boolean move(PoolChunk<T> chunk) {
          assert chunk.usage() < maxUsage;
  
          if (chunk.usage() < minUsage) {
              // Move the PoolChunk down the PoolChunkList linked-list.
              return move0(chunk);
          }
  
          // PoolChunk fits into this PoolChunkList, adding it here.
          add0(chunk);
          return true;
      }

      private boolean move0(PoolChunk<T> chunk) {
          if (prevList == null) {
              // There is no previous PoolChunkList so return false which result in having the PoolChunk destroyed and
              // all memory associated with the PoolChunk will be released.
              assert chunk.usage() == 0;
              return false;
          }
          return prevList.move(chunk);
      }

V. PoolArena Source Code Analysis

PoolArena is an abstract class with two implementations: HeapArena (heap memory) and DirectArena (direct memory).

1. Data structure

abstract class PoolArena<T> implements PoolArenaMetric {
    static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();

    enum SizeClass {
        Tiny,
        Small,
        Normal
        // 除此之外的请求为Huge
    }
    //Tiny分配固定值数量
    static final int numTinySubpagePools = 512 >>> 4;//32

    final PooledByteBufAllocator parent;

    private final int maxOrder; //chunk相关满二叉树的高度
    final int pageSize;         //page大小
    final int pageShifts;       //pageSize是2幂次方,pageShifts=log2(pageSize),位移大小
    final int chunkSize;        //chunk大小
    final int subpageOverflowMask; //用于判断请求是否为Small/Tiny
    final int numSmallSubpagePools;//small请求的双向链表头个数
    final int directMemoryCacheAlignment;//对齐基准
    final int directMemoryCacheAlignmentMask;//用于对齐内存
    private final PoolSubpage<T>[] tinySubpagePools;//tiny Subpage双向链表
    private final PoolSubpage<T>[] smallSubpagePools;//small Subpage双向链表

    //chunk状态 jemalloc博客有具体说明
    private final PoolChunkList<T> q050;
    private final PoolChunkList<T> q025;
    private final PoolChunkList<T> q000;
    private final PoolChunkList<T> qInit;
    private final PoolChunkList<T> q075;
    private final PoolChunkList<T> q100;

    // 线程缓存数量
    final AtomicInteger numThreadCaches = new AtomicInteger();
 }   

2. Constructor

 protected PoolArena(PooledByteBufAllocator parent, int pageSize,
          int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) {
        this.parent = parent;
        this.pageSize = pageSize;
        this.maxOrder = maxOrder;
        this.pageShifts = pageShifts;
        this.chunkSize = chunkSize;
        directMemoryCacheAlignment = cacheAlignment;
        directMemoryCacheAlignmentMask = cacheAlignment - 1;
        subpageOverflowMask = ~(pageSize - 1);
        tinySubpagePools = newSubpagePoolArray(numTinySubpagePools); // 512 >>> 4
        for (int i = 0; i < tinySubpagePools.length; i ++) {
            tinySubpagePools[i] = newSubpagePoolHead(pageSize); //初始化PoolSubpage的头节点
        }
        //Small分配固定值数量 
        numSmallSubpagePools = pageShifts - 9;// 2^9=512
        smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
        for (int i = 0; i < smallSubpagePools.length; i ++) {
            smallSubpagePools[i] = newSubpagePoolHead(pageSize); //初始化PoolSubpage的头节点
        }

        //PoolChunkList 状态赋值,状态关联初始化
        q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
        q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
        q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
        q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
        q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
        qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);

        q100.prevList(q075);
        q075.prevList(q050);
        q050.prevList(q025);
        q025.prevList(q000);
        q000.prevList(null);
        qInit.prevList(qInit);

    }
   private PoolSubpage<T> newSubpagePoolHead(int pageSize) {
        PoolSubpage<T> head = new PoolSubpage<T>(pageSize);
        head.prev = head;
        head.next = head;
        return head;
    }
    @SuppressWarnings("unchecked")
    private PoolSubpage<T>[] newSubpagePoolArray(int size) {
        return new PoolSubpage[size];
    } 
   

3. allocate

    PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
        PooledByteBuf<T> buf = newByteBuf(maxCapacity); //newByteBufy由子类实现
        allocate(cache, buf, reqCapacity);
        return buf;
    }
  private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
        final int normCapacity = normalizeCapacity(reqCapacity);//获取计算需要分配内存值
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
            int tableIdx;
            PoolSubpage<T>[] table;
            boolean tiny = isTiny(normCapacity);
            if (tiny) { // < 512
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { //PoolThreadCache缓存中获取,具体后面介绍
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {//PoolThreadCache缓存中获取,具体后面介绍
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }

            final PoolSubpage<T> head = table[tableIdx];

            //从tinySubpagePools和smallSubpagePools中获取要分配内存
            synchronized (head) {
                final PoolSubpage<T> s = head.next;
                if (s != head) {   //
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    long handle = s.allocate();
                    assert handle >= 0;
                    s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
                    incTinySmallAllocation(tiny); //统计创建Tiny或者Small数量
                    return;
                }
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);//从chunk分配内存
            }

            incTinySmallAllocation(tiny);
            return;
        }
        if (normCapacity <= chunkSize) {
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
                ++allocationsNormal;
            }
        } else {
            // Huge allocations are never served via the cache so just call allocateHuge
            //巨大的分配永远不会通过内存池服务,所以只需调用allocateHuge
            allocateHuge(buf, reqCapacity);
        }
    } 

   // capacity < pageSize  subpageOverflowMask=~(pageSize-1)
   boolean isTinyOrSmall(int normCapacity) {
       return (normCapacity & subpageOverflowMask) == 0;
   }
   
   // normCapacity < 512  0xFFFFFE00=-512
   static boolean isTiny(int normCapacity) {
       return (normCapacity & 0xFFFFFE00) == 0;
   }
   
   //获取tinySubpagePools数组下标值
   static int tinyIdx(int normCapacity) {
       return normCapacity >>> 4;
   }

   //获取smallSubpagePools数组下标值
   static int smallIdx(int normCapacity) {
       int tableIdx = 0;
       int i = normCapacity >>> 10; //normCapacity/512
       while (i != 0) {
           i >>>= 1;   //i/2
           tableIdx ++;
       }
       return tableIdx;
   }    

    private void incTinySmallAllocation(boolean tiny) {
        if (tiny) {
            allocationsTiny.increment();
        } else {
            allocationsSmall.increment();
        }
    }   
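
A few hedged worked examples of the size-class routing above, assuming pageSize = 8192:

    // normCapacity = 48        -> tiny  (< 512):  tableIdx = 48 >>> 4 = 3 in tinySubpagePools
    // normCapacity = 1024      -> small (< 8192): smallIdx: i = 1024 >>> 10 = 1 -> tableIdx = 1
    // normCapacity = 16384     -> normal:         served by allocateNormal from a PoolChunk
    // normCapacity > chunkSize -> huge:           allocated outside the pool via allocateHuge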

4. normalizeCapacity: compute the normalized allocation size

Two cases. For requests of at least 512 bytes: round reqCapacity up to the nearest power of two. For requests below 512 bytes (tiny): round up to a multiple of 16; there are 512 >>> 4 = 32 such tiny size classes.

  int normalizeCapacity(int reqCapacity) {
        if (reqCapacity < 0) {
            throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
        }

        if (reqCapacity >= chunkSize) {
            return directMemoryCacheAlignment == 0 ? reqCapacity : alignCapacity(reqCapacity);
        }

        if (!isTiny(reqCapacity)) { // >= 512
            // Doubled

            int normalizedCapacity = reqCapacity;
            normalizedCapacity --;
            //这块代码 最高位1后面都要补1
            //例如 normalizedCapacity=6 0110 转化normalizedCapacity=7 0111
            normalizedCapacity |= normalizedCapacity >>>  1;
            normalizedCapacity |= normalizedCapacity >>>  2;
            normalizedCapacity |= normalizedCapacity >>>  4;
            normalizedCapacity |= normalizedCapacity >>>  8;
            normalizedCapacity |= normalizedCapacity >>> 16;
            //最后加1,能获取normalizedCapacity最近2幂次方
            normalizedCapacity ++;

            if (normalizedCapacity < 0) { //normalizedCapacity达到最大
                normalizedCapacity >>>= 1;
            }
            assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0;

            return normalizedCapacity;
        }

        if (directMemoryCacheAlignment > 0) {
            return alignCapacity(reqCapacity);
        }

        // Quantum-spacedb
        if ((reqCapacity & 15) == 0) {//reqCapacity的值在(1-(512>>4)=32)<<4范围内,说白就是16倍数而已,一共31个不包括512
            return reqCapacity;
        }

        return (reqCapacity & ~15) + 16;//把reqCapacity最后四位补零在加16,在上面范围内
    }
    //0xFFFFFE00=-512 判断是否小于512
   static boolean isTiny(int normCapacity) {
       return (normCapacity & 0xFFFFFE00) == 0;
   }
   int alignCapacity(int reqCapacity) {
       int delta = reqCapacity & directMemoryCacheAlignmentMask;
       return delta == 0 ? reqCapacity : reqCapacity + directMemoryCacheAlignment - delta;
   }    
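
Some hedged worked examples of normalizeCapacity (assuming directMemoryCacheAlignment == 0):

    // reqCapacity = 9 * 1024 -> 16384 (next power of two)
    // reqCapacity = 1000     -> 1024
    // reqCapacity = 512      -> 512   (already a power of two)
    // reqCapacity = 100      -> (100 & ~15) + 16 = 112  (tiny: rounded up to a multiple of 16)
    // reqCapacity = 96       -> 96    (tiny and already a multiple of 16)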

5. allocateNormal: allocate from a chunk

    private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
        //为什么是从q050开始尝试分配呢?
        // q050是内存占用50%~100%的chunk,猜测是希望能够提高整个应用的内存使用率,因为这样大部分情况下会使用q050的内存,
        //这样在内存使用不是很多的情况下一些利用率低(<50%)的chunk慢慢就会淘汰出去,最终被回收。

        if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
            q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
            q075.allocate(buf, reqCapacity, normCapacity)) {
            return;
        }

        // Add a new chunk. 创建新chunk
        PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);//抽象方法子类实现
        long handle = c.allocate(normCapacity);
        assert handle > 0;
        c.initBuf(buf, handle, reqCapacity);
        qInit.add(c); //第一次创建添加到qInit状态中
    }

6. allocateHuge: requests larger than chunkSize bypass the pool

    private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
        PoolChunk<T> chunk = newUnpooledChunk(reqCapacity);//子类实现
        activeBytesHuge.add(chunk.chunkSize()); //统计
        buf.initUnpooled(chunk, reqCapacity);
        allocationsHuge.increment();            //统计
    }

7. free

    void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
        if (chunk.unpooled) {
            int size = chunk.chunkSize();
            destroyChunk(chunk);   //子类去实现
            activeBytesHuge.add(-size);
            deallocationsHuge.increment();
        } else {
            SizeClass sizeClass = sizeClass(normCapacity);
            if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) { //PoolThreadCache操作
                // cached so not free it.
                //缓存,所以不释放它。
                return;
            }

            freeChunk(chunk, handle, sizeClass);
        }
    }

   private SizeClass sizeClass(int normCapacity) {
       if (!isTinyOrSmall(normCapacity)) {
           return SizeClass.Normal;
       }
       return isTiny(normCapacity) ? SizeClass.Tiny : SizeClass.Small;
   }

   void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass) {
       final boolean destroyChunk;
       synchronized (this) {
           switch (sizeClass) {
           case Normal:
               ++deallocationsNormal;
               break;
           case Small:
               ++deallocationsSmall;
               break;
           case Tiny:
               ++deallocationsTiny;
               break;
           default:
               throw new Error();
           }
           destroyChunk = !chunk.parent.free(chunk, handle);//chunk.parent ChunkList操作free
       }
       if (destroyChunk) {//ChunkList操作free返回false才调用destroyChunk
           // destroyChunk not need to be called while holding the synchronized lock.
           destroyChunk(chunk); //子类去实现
       }
   }

8. findSubpagePoolHead: look up the head of the matching subpage pool

    PoolSubpage<T> findSubpagePoolHead(int elemSize) {
        int tableIdx;
        PoolSubpage<T>[] table;
        if (isTiny(elemSize)) { // < 512
            tableIdx = elemSize >>> 4;
            table = tinySubpagePools;
        } else {
            tableIdx = 0;
            elemSize >>>= 10;
            while (elemSize != 0) {
                elemSize >>>= 1;
                tableIdx ++;
            }
            table = smallSubpagePools;
        }

        return table[tableIdx];
    }

9. reallocate

    void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
        if (newCapacity < 0 || newCapacity > buf.maxCapacity()) {
            throw new IllegalArgumentException("newCapacity: " + newCapacity);
        }

        int oldCapacity = buf.length;
        if (oldCapacity == newCapacity) {
            return;
        }

        PoolChunk<T> oldChunk = buf.chunk;
        long oldHandle = buf.handle;
        T oldMemory = buf.memory;
        int oldOffset = buf.offset;
        int oldMaxLength = buf.maxLength;
        int readerIndex = buf.readerIndex();
        int writerIndex = buf.writerIndex();

        allocate(parent.threadCache(), buf, newCapacity); //分配新内存大小
        if (newCapacity > oldCapacity) {
            memoryCopy(
                    oldMemory, oldOffset,
                    buf.memory, buf.offset, oldCapacity); //抽象方法,子类去实现
        } else if (newCapacity < oldCapacity) {
            if (readerIndex < newCapacity) {
                if (writerIndex > newCapacity) {
                    writerIndex = newCapacity;
                }
                memoryCopy(
                        oldMemory, oldOffset + readerIndex,
                        buf.memory, buf.offset + readerIndex, writerIndex - readerIndex);
            } else {
                readerIndex = writerIndex = newCapacity;
            }
        }

        buf.setIndex(readerIndex, writerIndex);

        if (freeOldMemory) {
            free(oldChunk, oldHandle, oldMaxLength, buf.cache);//释放旧内存
        }
    }

10. Subclass implementations: HeapArena and DirectArena

They are not analyzed in detail here.

    //子类实现抽象方法
   protected abstract PoolChunk<T> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize);
   protected abstract PoolChunk<T> newUnpooledChunk(int capacity);
   protected abstract PooledByteBuf<T> newByteBuf(int maxCapacity);
   protected abstract void memoryCopy(T src, int srcOffset, T dst, int dstOffset, int length);
   protected abstract void destroyChunk(PoolChunk<T> chunk);   

VI. PoolThreadCache Source Code Analysis

1. Data structure

final class PoolThreadCache {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);

    //类型不同PoolArena
    final PoolArena<byte[]> heapArena;
    final PoolArena<ByteBuffer> directArena;

    // Hold the caches for the different size classes, which are tiny, small and normal.
    //大小不同的缓存
    private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
    private final MemoryRegionCache<byte[]>[] normalHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

    // Used for bitshifting when calculate the index of normal caches later
    //稍后计算正常高速缓存的索引时用于位移
    private final int numShiftsNormalDirect;
    private final int numShiftsNormalHeap;
    //释放队列缓存门槛次数
    private final int freeSweepAllocationThreshold;
    //缓存分配次数
    private int allocations;
}     

2. MemoryRegionCache

MemoryRegionCache is an abstract class with two implementations, SubPageMemoryRegionCache and NormalMemoryRegionCache.

(1) Data structure

  private abstract static class MemoryRegionCache<T> {
         private final int size;   //申请队列大小
         private final Queue<Entry<T>> queue;//调用jctools中队列
         private final SizeClass sizeClass;  //分配类型,Tiny,Small,Normal
         private int allocations;   //分配数量统计
         
       //运用对象池
       static final class Entry<T> {
           final Handle<Entry<?>> recyclerHandle;
           PoolChunk<T> chunk;
           long handle = -1;

           Entry(Handle<Entry<?>> recyclerHandle) {
               this.recyclerHandle = recyclerHandle;
           }

           void recycle() {
               chunk = null;
               handle = -1;
               recyclerHandle.recycle(this);
           }
       }          
}         

(2) Constructor

     MemoryRegionCache(int size, SizeClass sizeClass) {
            this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);//最终结果1<<log2(size-1)
            queue = PlatformDependent.newFixedMpscQueue(this.size);   //hasUnsafe() ? new MpscArrayQueue<T>(capacity) : new MpscAtomicArrayQueue<T>(capacity);
            this.sizeClass = sizeClass;
        }

(3) add: cache a freed block

        public final boolean add(PoolChunk<T> chunk, long handle) {
            Entry<T> entry = newEntry(chunk, handle);
            boolean queued = queue.offer(entry);
            if (!queued) {
                //如果不可能缓存该块,则立即回收该entry。
                entry.recycle();
            }

            return queued;
        }
  private static Entry newEntry(PoolChunk<?> chunk, long handle) {
           Entry entry = RECYCLER.get();
           entry.chunk = chunk;
           entry.handle = handle;
           return entry;
       }

       //Recycler后面主体做介绍
       @SuppressWarnings("rawtypes")
       private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
           @SuppressWarnings("unchecked")
           @Override
           protected Entry newObject(Handle<Entry> handle) {
               return new Entry(handle);
           }
       };         

(4) allocate: serve an allocation from the cache

        public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
            Entry<T> entry = queue.poll();
            if (entry == null) {
                return false;
            }
            initBuf(entry.chunk, entry.handle, buf, reqCapacity);//抽象类
            entry.recycle(); //回收
            //allocations不是线程安全的,同一个线程始终调用。
            ++ allocations;
            return true;
        }

(5) free: release everything held in the cache queue

        public final int free() {
            return free(Integer.MAX_VALUE);
        }

        private int free(int max) {
            int numFreed = 0;
            for (; numFreed < max; numFreed++) {
                Entry<T> entry = queue.poll();
                if (entry != null) {
                    freeEntry(entry);
                } else {
                    // all cleared
                    return numFreed;
                }
            }
            return numFreed;
        }
   @SuppressWarnings({ "unchecked", "rawtypes" })
       private  void freeEntry(Entry entry) {
           PoolChunk chunk = entry.chunk;
           long handle = entry.handle;

           //现在回收所以PoolChunk可以被GC化
           entry.recycle();
           //freeChunk
           chunk.arena.freeChunk(chunk, handle, sizeClass);
       }         

(6) trim: free cached PoolChunk space if allocations are not frequent enough

        /**
         * 如果分配不够频繁,则释放缓存的PoolChunk。
         */
        public final void trim() {
            int free = size - allocations;
            allocations = 0;

            // We not even allocated all the number that are
            if (free > 0) {
                free(free);
            }
        }

(7) Subclasses: SubPageMemoryRegionCache and NormalMemoryRegionCache

    private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
        SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
            super(size, sizeClass);
        }

        @Override
        protected void initBuf(
                PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
            chunk.initBufWithSubpage(buf, handle, reqCapacity);
        }
    }

    private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
        NormalMemoryRegionCache(int size) {
            super(size, SizeClass.Normal);
        }

        @Override
        protected void initBuf(
                PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
            chunk.initBuf(buf, handle, reqCapacity);
        }
    }

3. PoolThreadCache constructor


   PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                   int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                   int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
       ...
       this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
       this.heapArena = heapArena;
       this.directArena = directArena;
       if (directArena != null) { //直接内存初始化
           tinySubPageDirectCaches = createSubPageCaches(
                   tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny); //创建Tiny缓存
           smallSubPageDirectCaches = createSubPageCaches(
                   smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);//创建small缓存

           numShiftsNormalDirect = log2(directArena.pageSize);
           normalDirectCaches = createNormalCaches(
                   normalCacheSize, maxCachedBufferCapacity, directArena);               //创建normal缓存

           directArena.numThreadCaches.getAndIncrement();
       } else {
           // No directArea is configured so just null out all caches
           tinySubPageDirectCaches = null;
           smallSubPageDirectCaches = null;
           normalDirectCaches = null;
           numShiftsNormalDirect = -1;
       }
       if (heapArena != null) {  //堆内存初始化缓存数据
           // Create the caches for the heap allocations
           tinySubPageHeapCaches = createSubPageCaches(
                   tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
           smallSubPageHeapCaches = createSubPageCaches(
                   smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);

           numShiftsNormalHeap = log2(heapArena.pageSize);
           normalHeapCaches = createNormalCaches(
                   normalCacheSize, maxCachedBufferCapacity, heapArena);

           heapArena.numThreadCaches.getAndIncrement();
       } else {
           // No heapArea is configured so just null out all caches
           tinySubPageHeapCaches = null;
           smallSubPageHeapCaches = null;
           normalHeapCaches = null;
           numShiftsNormalHeap = -1;
       }
    ...
   }

   private static <T> MemoryRegionCache<T>[] createSubPageCaches(
           int cacheSize, int numCaches, SizeClass sizeClass) {
       if (cacheSize > 0 && numCaches > 0) {
           @SuppressWarnings("unchecked")
           MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
           for (int i = 0; i < cache.length; i++) {
               // TODO: maybe use cacheSize / cache.length
               cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
           }
           return cache;
       } else {
           return null;
       }
   }

   private static <T> MemoryRegionCache<T>[] createNormalCaches(
           int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
       if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
           int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
           int arraySize = Math.max(1, log2(max / area.pageSize) + 1);

           @SuppressWarnings("unchecked")
           MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
           for (int i = 0; i < cache.length; i++) {
               cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
           }
           return cache;
       } else {
           return null;
       }
   }

   private static int log2(int val) {
       int res = 0;
       while (val > 1) {
           val >>= 1;
           res++;
       }
       return res;
   }
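
A hedged worked example of createNormalCaches, assuming chunkSize = 16 MiB, pageSize = 8 KiB and a maxCachedBufferCapacity of 32 KiB (the usual default):

    // max       = min(16 MiB, 32 KiB) = 32 KiB
    // arraySize = max(1, log2(32 KiB / 8 KiB) + 1) = log2(4) + 1 = 3
    // i.e. normal runs of 8 KiB, 16 KiB and 32 KiB each get their own cache slot.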

To put it plainly, all the caching work is done by MemoryRegionCache instances; every operation below just picks the right MemoryRegionCache and delegates to it.

4. add: cache a freed block


   /**
    * Add {@link PoolChunk} and {@code handle} to the cache if there is enough room.
    * Returns {@code true} if it fit into the cache {@code false} otherwise.
    */
   @SuppressWarnings({ "unchecked", "rawtypes" })
   boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
       MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
       if (cache == null) {
           return false;
       }
       return cache.add(chunk, handle);
   }

   private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
       switch (sizeClass) {
       case Normal:
           return cacheForNormal(area, normCapacity);
       case Small:
           return cacheForSmall(area, normCapacity);
       case Tiny:
           return cacheForTiny(area, normCapacity);
       default:
           throw new Error();
       }
   }

5. allocate: serve an allocation from the cache

 /**
     * Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
     */
    boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
        return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
    }

    /**
     * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
     */
    boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
        return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity);
    }

    /**
     * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
     */
    boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
        return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
        if (cache == null) {
            // no cache found so just return false here
            return false;
        }
        boolean allocated = cache.allocate(buf, reqCapacity);
        if (++ allocations >= freeSweepAllocationThreshold) { //门槛次数做判断,释放内存
            allocations = 0;
            trim();
        }
        return allocated;
    }
    

6. free: release all cached memory

    void free() {
        int numFreed = free(tinySubPageDirectCaches) +
                free(smallSubPageDirectCaches) +
                free(normalDirectCaches) +
                free(tinySubPageHeapCaches) +
                free(smallSubPageHeapCaches) +
                free(normalHeapCaches);

        if (directArena != null) {
            directArena.numThreadCaches.getAndDecrement();
        }

        if (heapArena != null) {
            heapArena.numThreadCaches.getAndDecrement();
        }
    }

    private static int free(MemoryRegionCache<?>[] caches) {
        if (caches == null) {
            return 0;
        }

        int numFreed = 0;
        for (MemoryRegionCache<?> c: caches) {
            numFreed += free(c);
        }
        return numFreed;
    }

    private static int free(MemoryRegionCache<?> cache) {
        if (cache == null) {
            return 0;
        }
        return cache.free();
    }

7. trim: release cached memory that is not being used

    void trim() {
        trim(tinySubPageDirectCaches);
        trim(smallSubPageDirectCaches);
        trim(normalDirectCaches);
        trim(tinySubPageHeapCaches);
        trim(smallSubPageHeapCaches);
        trim(normalHeapCaches);
    }

    private static void trim(MemoryRegionCache<?>[] caches) {
        if (caches == null) {
            return;
        }
        for (MemoryRegionCache<?> c: caches) {
            trim(c);
        }
    }

    private static void trim(MemoryRegionCache<?> cache) {
        if (cache == null) {
            return;
        }
        cache.trim();
    }

VII. PooledByteBuf Source Code Analysis

PooledByteBuf is an abstract class.

1. Data structure

abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {

    private final Recycler.Handle<PooledByteBuf<T>> recyclerHandle;

    protected PoolChunk<T> chunk; //chunk
    protected long handle;        //chunk 定位值
    protected T memory;           //实际内存区域
    protected int offset;         //偏移量
    protected int length;         //内存大小
    int maxLength;                //最大长度
    PoolThreadCache cache;        //内存池缓存
    private ByteBuffer tmpNioBuf;
    private ByteBufAllocator allocator; 
}

2. Constructor

    protected PooledByteBuf(Recycler.Handle<? extends PooledByteBuf<T>> recyclerHandle, int maxCapacity) {
        super(maxCapacity);
        this.recyclerHandle = (Handle<PooledByteBuf<T>>) recyclerHandle;
    }

3. init: called when memory for the buffer is allocated from the pool

 void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
        init0(chunk, handle, offset, length, maxLength, cache);
    }
    //没用到内存池初始化
    void initUnpooled(PoolChunk<T> chunk, int length) {
        init0(chunk, 0, chunk.offset, length, length, null);
    }

    private void init0(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
        assert handle >= 0;
        assert chunk != null;

        this.chunk = chunk;
        memory = chunk.memory;
        allocator = chunk.arena.parent;
        this.cache = cache;
        this.handle = handle;
        this.offset = offset;
        this.length = length;
        this.maxLength = maxLength;
        tmpNioBuf = null;
    }

4. capacity: resize the buffer

    @Override
    public final ByteBuf capacity(int newCapacity) {
        checkNewCapacity(newCapacity);

        // If the request capacity does not require reallocation, just update the length of the memory.
        if (chunk.unpooled) {
            if (newCapacity == length) {
                return this;
            }
        } else {
            if (newCapacity > length) {      //扩容
                if (newCapacity <= maxLength) { //小于maxLength 不用在重新分配容量
                    length = newCapacity;
                    return this;
                }
            } else if (newCapacity < length) {//缩小容量
                if (newCapacity > maxLength >>> 1) {//小于maxLength/2,
                    if (maxLength <= 512) {         //判断maxLength是否小于等于512
                        if (newCapacity > maxLength - 16) {
                            length = newCapacity;
                            setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
                            return this;
                        }
                    } else { // > 512 (i.e. >= 1024)
                        length = newCapacity;
                        setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
                        return this;
                    }
                }
            } else {
                return this;
            }
        }

        // Reallocation required. 重新分配内存大小
        chunk.arena.reallocate(this, newCapacity, true);
        return this;
    }
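
A hedged worked example of the capacity() decision logic for a pooled buffer with length = 1000 and maxLength = 1024:

    // capacity(1024): grows, but 1024 <= maxLength           -> just set length = 1024, no reallocation
    // capacity(600):  shrinks, 600 > maxLength >>> 1 (= 512)
    //                 and maxLength > 512                    -> just set length = 600, no reallocation
    // capacity(4096): larger than maxLength                  -> chunk.arena.reallocate copies the data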