【注意】最后更新于 March 7, 2019,文中内容可能已过时,请谨慎使用。
一、简介
Netty内存池了解几个概念:
1.核心类介绍
- PoolArena是分配管理内存区域;
- PoolChunk是内存区域,默认大小是4M。以page(默认为4K)为单位进行管理;
- PoolSubpag是小于page内存管理;分Tiny(小于512内存)和Small(小于4K)两个类型;
- PoolChunkList是提高内存分配效率并减少内部碎片;
- PoolThreadCache是分配的线程缓存;
先从PoolChunk讲起
二、PoolChunk 源码分析
PoolChunk类主要负责内存块的分配与回收;以下术语对于理解代码很重要
- page - 分配内存块的最小单位。
- chunk - page的集合
- 代码中chunkSize = 2^{maxOrder(二叉树高度)} * pageSize(page大小)
首先我们分配一个size = chunkSize的字节数组。每当需要创建给定大小的ByteBuf时,我们搜索字节数组中的第一个位置,
该位置具有足够的空白空间来容纳请求的大小,并返回一个编码这个偏移信息(这个内存段被标记为保留,所以它总是被一个ByteBuf使用)
1.是怎么管理page的集合呢?
用数组实现满二叉树结构;运用满二叉树结构管理内存page是否分配完,以及该节点能分配多少内存;
数组的内容保存当前节点高度;节点高度区分该节点能分配多少内存;
树结构如图下:

- 根据节点高度值,就知道能分配多少,例如根节点高度为零,就能能分配chunkSize大小内存,高度为1,能分配chunkSize/2…高度为11只能分配pageSize大小;
- 判断该节点内存是否分配完,根据节点高度,高度等于总层数 + 1;已经分配完;
- 当我们分配和释放节点时,我们更新存储在memoryMap中的值,以便判定节点是否分配含义:
- 1)memoryMap数组,默认初始化该节点高度,例如:memoryMap[512] = 9,则表示其本身到下面所有的子节点都可以被分配;
- 2)memoryMap[512] = val (从10到11), 则表示512节点下有子节点已经分配过,则该节点不能直接被分配,而其子节点中的第val和val以下层还存在未分配的节点;
- 3)memoryMap[512] = 12 (即总层数 + 1), 可分配的深度已经大于总层数, 则该节点下的所有子节点都已经被分配
2.数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
final class PoolChunk<T> implements PoolChunkMetric {
private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1;
final PoolArena<T> arena; //PoolArena 管理PoolChunk
final T memory; //实际分配内存容量为chunkSize的byte[](heap方式)或ByteBuffer(direct方式)
final boolean unpooled; //是否启动内存池状态
final int offset;
private final byte[] memoryMap; //分配内存状态满二叉树数组
private final byte[] depthMap; //保存满二叉树高度值数组,构造时memoryMap和depthMap都相同数据
private final PoolSubpage<T>[] subpages;//分配少于pageSize内存空间
/** Used to determine if the requested capacity is equal to or greater than pageSize. */
private final int subpageOverflowMask; //确定请求的容量是否等于或大于pageSize
private final int pageSize; //page大小
private final int pageShifts; // log2(pageSize)
private final int maxOrder; //满二叉树的高度
private final int chunkSize; //2^{maxOrder} * pageSize
private final int log2ChunkSize;//log2(chunkSize)
private final int maxSubpageAllocs;//1 << maxOrder=2^{maxOrder} 满二叉树的叶节点数量
/** Used to mark memory as unusable */
//用于分配内存标示,unusable=maxOrder + 1 这块内存已分配完成
private final byte unusable; //maxOrder + 1
private int freeBytes; //分配字节数量
//PoolChunkList
PoolChunkList<T> parent;
PoolChunk<T> prev;
PoolChunk<T> next;
}
|
- 毋庸置疑管理内存核心就是memoryMap数组,这个数组展示满二叉树结构,具备二叉树的结构;
3.构造
- pageSize 默认4k
- chunkSize 16MiB
- maxOrder 11
- pageShifts log2(pageShifts) 11
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize, int offset) {
unpooled = false;
this.arena = arena;
this.memory = memory;
this.pageSize = pageSize;
this.pageShifts = pageShifts;
this.maxOrder = maxOrder;
this.chunkSize = chunkSize;
this.offset = offset;
unusable = (byte) (maxOrder + 1);
log2ChunkSize = log2(chunkSize);
subpageOverflowMask = ~(pageSize - 1);
freeBytes = chunkSize;
assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder;
maxSubpageAllocs = 1 << maxOrder;//2^{maxOrder} 满二叉树叶节点数量
// Generate the memory map.
memoryMap = new byte[maxSubpageAllocs << 1];//满二叉树2^{maxOrder+1} -1个结点,数组下标0不为二叉树根节点,
// 下标1才是树根节点,节点计算2^{maxOrder+1}
depthMap = new byte[memoryMap.length];
int memoryMapIndex = 1; //数组从下标1开始
//赋值满二叉树数组高度值 满二叉树的第d层有2 ^d 个结点
for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a time
int depth = 1 << d; //2 ^d
for (int p = 0; p < depth; ++ p) {
// in each level traverse left to right and set value to the depth of subtree
memoryMap[memoryMapIndex] = (byte) d;
depthMap[memoryMapIndex] = (byte) d;
memoryMapIndex ++;
}
}
//PoolSubpage作用就是分配小于4k内存,具体下面做分析
subpages = newSubpageArray(maxSubpageAllocs);//创建PoolSubpage数组,大小为叶子节点数量
}
@SuppressWarnings("unchecked")
private PoolSubpage<T>[] newSubpageArray(int size) {
return new PoolSubpage[size];
}
|
4.分配(allocate)
1
2
3
4
5
6
7
8
9
10
|
//normCapacity在PoolArena类中已经处理过,例如:申请分配9k内存,实际分配内存16K
//返回long类型数组下标值,-1说明没找分配内存,
//数组下标值int就可以满足,主要原因在于申请内存容量小于pageSize,分配机制PoolSubpage管理,所以PoolSubpage返回long一个状态标示,具体在PoolSubpage说明;
long allocate(int normCapacity) {
if ((normCapacity & subpageOverflowMask) != 0) { // normCapacity>= pageSize
return allocateRun(normCapacity); //chunk 正常分配
} else {
return allocateSubpage(normCapacity);//小于pageSize,PoolSubpage进行分配
}
}
|
a.allocateRun分配大于等于pageSize内存
1
2
3
4
5
6
7
8
9
10
|
private long allocateRun(int normCapacity) {
//获取normCapacity所在高度,log2(normCapacity) - pageShifts(log2(pageSize))=log2(normCapacity-pageSize),获取normCapacity高度
int d = maxOrder - (log2(normCapacity) - pageShifts);//maxOrder-normCapacity高度,WHY??分配内存二叉树高度越低,分配内存越大;跟正常高度相反;
int id = allocateNode(d);
if (id < 0) {
return id;
}
freeBytes -= runLength(id);
return id;
}
|
b.allocateNode获取节点下标值allocateNode
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
private int allocateNode(int d) {
//从根节点遍历查询分配节点数组下标id
int id = 1;
int initial = - (1 << d); // has last d bits = 0 and rest all = 1
// 例如d=3,1<<3=8,
// 8=0000 0000 0000 0000 0000 0000 0000 1000
// -8=1111 1111 1111 1111 1111 1111 1111 1000
//
byte val = value(id);
if (val > d) { // unusable 根节点高度大于申请分配内存高度,说明根节点值unusable状态或者分配内存不够,不可以在分配;
return -1;
}
//当高度d时所有id相等这个等式id & initial == 1 << d,然而小于高度d的节点下标的id时相等这个等式(id & initial) == 0
//这里为什么这么做,主要原因时分配内存时,更新父类值,获取子类最小值;查找父类节点高度值,看是否能获取相应分配内存
while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
id <<= 1; //找子类左节点
val = value(id);
if (val > d) {
id ^= 1; //子类右节点
val = value(id);
}
}
byte value = value(id);
...
setValue(id, unusable); // mark as unusable 把当前节点设置不可以在分配
updateParentsAlloc(id); //更新父节点值
return id;
}
private byte value(int id) {
return memoryMap[id];
}
private void setValue(int id, byte val) {
memoryMap[id] = val;
}
|
c.更新父节点以及追加到根节点的高度值
1
2
3
4
5
6
7
8
9
10
11
12
13
|
/**
* 更新父节点的高度值,往上更新
*/
private void updateParentsAlloc(int id) {
while (id > 1) {
int parentId = id >>> 1; //id/2获取父节点
byte val1 = value(id);
byte val2 = value(id ^ 1);//id ^ 1设计不错,可以获取左节点,又可以获取右节点
byte val = val1 < val2 ? val1 : val2;//获取最小值
setValue(parentId, val); //把子节点最小值赋给父节点
id = parentId; //往上更新,递归树结构
}
}
|
5.free释放内存分两种大于等page的大小以及小于page;分配逻辑也不同,释放也分两种;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
void free(long handle) {
int memoryMapIdx = memoryMapIdx(handle);//获取memoryMap数组下标值
int bitmapIdx = bitmapIdx(handle); //获取PoolSubpage中bitmap下标值
//这块PoolSubpage做具体分析,主要作用就是释放subpage
if (bitmapIdx != 0) { // free a subpage
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
assert subpage != null && subpage.doNotDestroy;
// Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
// This is need as we may add it back and so alter the linked-list structure.
PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);
synchronized (head) {
//0x3FFFFFFF=1<<30-1;
//bitmapIdx & 0x3FFFFFFF为什么这么操作?
//在PoolSubpage分配内存时,组装返回值中有个标示状态(PoolSubpage分配具体说明)占最高2位;所以这样排除最高2位处理,获取真正bitmapIdx值;
if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) {//
return;
}
}
}
freeBytes += runLength(memoryMapIdx);//释放内存大小
setValue(memoryMapIdx, depth(memoryMapIdx));//设置高度
updateParentsFree(memoryMapIdx);
}
private static int memoryMapIdx(long handle) {
return (int) handle;
}
private static int bitmapIdx(long handle) {
return (int) (handle >>> Integer.SIZE);
}
private byte depth(int id) {
return depthMap[id];
}
private int runLength(int id) {
// represents the size in #bytes supported by node 'id' in the tree
// (1 << (maxOrder - depth(id)))*pageSize ==> (1 << (maxOrder - depth(id)))* 1<< log2(pageSize) ==>
// log2ChunkSize 高度包括pageSize高度
return 1 << log2ChunkSize - depth(id);
}
|
a.释放内存更新父节点以及追加到根节点的高度值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
private void updateParentsFree(int id) {
int logChild = depth(id) + 1;
while (id > 1) {
int parentId = id >>> 1;
byte val1 = value(id);
byte val2 = value(id ^ 1);
logChild -= 1; // in first iteration equals log, subsequently reduce 1 from logChild as we traverse up
//logChild判断孩子节点是否分配;都没分配把父节点修改成高度值,有个孩子分配,父节点修改成孩子中最小值
if (val1 == logChild && val2 == logChild) {
//logChild - 1 修改成以前高度节点,子类都可以分配内存
setValue(parentId, (byte) (logChild - 1));
} else {
byte val = val1 < val2 ? val1 : val2;
setValue(parentId, val);
}
id = parentId;//往上迭代
}
}
|
- logChild - 1??前置条件左节点和右节点高度相同,且根释放节点高度相同;
- 例如:该叶节点的释放(高度11)和该节点右节点也释放(高度11),按左右节点大小判断,父节点的高度11,这里错;赋值为高度-1;
6.initBuf初始化PooledByteBuf
调用PooledByteBuf.init进行初始化操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
int memoryMapIdx = (int)handle;
int bitmapIdx = (int)(handle >>> 32);
if (bitmapIdx == 0) {
byte val = this.value(memoryMapIdx);
assert val == this.unusable : String.valueOf(val);
//runOffset(memoryMapIdx) 获取偏移量
//this.runLength(memoryMapIdx)实际分配内存
buf.init(this, handle, this.runOffset(memoryMapIdx), reqCapacity, this.runLength(memoryMapIdx), this.arena.parent.threadCache());
} else {
this.initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
}
}
private byte depth(int id) {
return depthMap[id];
}
private int runLength(int id) {
// represents the size in #bytes supported by node 'id' in the tree
return 1 << log2ChunkSize - depth(id);
}
//获取数组下标偏移量
private int runOffset(int id) {
// represents the 0-based offset in #bytes from start of the byte-array chunk
//表示从字节数组块开始的以字节为单位的基于0的偏移量
int shift = id ^ 1 << depth(id); //获取d层所有节点从左到右排列顺序(0标示开始);先执行1 << depth(id),获取节点数量,也是该层开始下标位置;获取该层节点从左到右下标位置;
return shift * runLength(id); //获取实际内存地址标偏移量,runLength(id)获取该id的层分配大小;
}
|
三、PoolSubpage 源码分析
PoolSubpage管理小于page的大小集合;分Tiny(小于512内存)和Small(小于4K)两个类型;基于page大小分配相同类型集合;
1.数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
final class PoolSubpage<T> implements PoolSubpageMetric {
final PoolChunk<T> chunk; //属于chunk内存区域
private final int memoryMapIdx;//PoolChunk中memoryMap数组下标
private final int runOffset; //PoolChunk中内存偏移量
private final int pageSize; //PoolChunk中page大小
private final long[] bitmap; //
//PoolArena管理关联,后面具体说明
PoolSubpage<T> prev;
PoolSubpage<T> next;
boolean doNotDestroy; //是否需要释放整个Page
int elemSize; //最小分配大小
private int maxNumElems; //该page包含的段数量
private int bitmapLength; //bitmap长度
private int nextAvail; //下一个可用的位置
private int numAvail; //可用段数量
}
|
2.构造
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
//创建链接列表头的特殊构造函数。
PoolSubpage(int pageSize) {
chunk = null;
memoryMapIdx = -1;
runOffset = -1;
elemSize = -1;
this.pageSize = pageSize;
bitmap = null;
}
PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
this.chunk = chunk;
this.memoryMapIdx = memoryMapIdx;
this.runOffset = runOffset;
this.pageSize = pageSize;
//分配最小值16,long是64位代表64个块段区状态,所以pageSize / 16 / 64=pageSize >>> 10得到分配最大数组容量
//为什么这里创建最大容量数组?PoolSubpage可以重复利用,只是elemSize大小可能不同,导致容量大小不同,就设置最大容量数组,不用每次重新创建内存大小;
bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64
init(head, elemSize);//分配elemSize大小
}
|
3.init 重新构建elemSize大小块
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
void init(PoolSubpage<T> head, int elemSize) {
doNotDestroy = true;
this.elemSize = elemSize;
if (elemSize != 0) {
maxNumElems = numAvail = pageSize / elemSize;//分多个elemSize大小块段区
nextAvail = 0;
bitmapLength = maxNumElems >>> 6; // long是64位代表64个块段区状态
if ((maxNumElems & 63) != 0) { //就是bitmapLength>>6=0情况,bitmapLength=1;
bitmapLength ++;
}
for (int i = 0; i < bitmapLength; i ++) { //初始化bitmap值
bitmap[i] = 0;
}
}
addToPool(head);
}
//往头链表拼接形成环绕
private void addToPool(PoolSubpage<T> head) {
assert prev == null && next == null;
prev = head;
next = head.next;
next.prev = this;
head.next = this;
}
|
4.PoolChunk#allocateSubpage分配小于pageSize内存创建或者重构PoolSubpage
PoolChunk类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
private long allocateSubpage(int normCapacity) {
// Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
// This is need as we may add it back and so alter the linked-list structure.
// 获取PoolArena拥有的PoolSubPage池的中获取头节点;进行head加锁
PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
synchronized (head) {
int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
int id = allocateNode(d);//PoolChunk中分配可用内存id
if (id < 0) {
return id;
}
final PoolSubpage<T>[] subpages = this.subpages;
final int pageSize = this.pageSize;
freeBytes -= pageSize;
int subpageIdx = subpageIdx(id);//获取subpages数组下标值;
PoolSubpage<T> subpage = subpages[subpageIdx];
if (subpage == null) {
//创建PoolSubpage
subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
subpages[subpageIdx] = subpage;
} else {
//重新构建normCapacity大小块内存
subpage.init(head, normCapacity);
}
//PoolSubpage分配normCapacity大小内存
return subpage.allocate();
}
}
private int subpageIdx(int memoryMapIdx) {
return memoryMapIdx ^ maxSubpageAllocs; // remove highest set bit, to get offset 获取节点下标从左到右,从零开始
}
//获取数组下标偏移量
private int runOffset(int id) {
// represents the 0-based offset in #bytes from start of the byte-array chunk
//表示从字节数组块开始的以字节为单位的基于0的偏移量
int shift = id ^ 1 << depth(id); //获取d层所有节点从左到右排列顺序(0标示开始);
return shift * runLength(id); //获取实际内存地址标偏移量,runLength(id)获取该id的层分配大小;
}
|
5.allocate分配内存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
/**
* Returns the bitmap index of the subpage allocation.
*/
long allocate() {
if (elemSize == 0) {
return toHandle(0);
}
if (numAvail == 0 || !doNotDestroy) {
return -1;
}
final int bitmapIdx = getNextAvail();//获取下个可用块内存
int q = bitmapIdx >>> 6; //bitmapIdx/64,获取bitmap数组下标值
int r = bitmapIdx & 63; //获取63内下标值
assert (bitmap[q] >>> r & 1) == 0;
bitmap[q] |= 1L << r; //在long类型64位相应下标值赋值位1;
if (-- numAvail == 0) {
removeFromPool();
}
return toHandle(bitmapIdx);
}
// |<-- 64-63 -->|<-- 62-32 -->| <-- 32 --> |
// | 01(状态标示) | bitmapIdx | memoryMapIdx |
//0x4000000000000000L=1<<62
//bitmapIdx(最大值512)30位足够保存相应值
//状态标示有个作用,bitmapIdx位零时,没有这个状态不能区分分配内存大于pageSize
private long toHandle(int bitmapIdx) {
return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
}
//删除该链表
private void removeFromPool() {
assert prev != null && next != null;
prev.next = next;
next.prev = prev;
next = null;
prev = null;
}
|
a.查找可以分配内存getNextAvail
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
private int getNextAvail() {
int nextAvail = this.nextAvail;
if (nextAvail >= 0) { //为什么这里判断nextAvail >= 0成立时又将this.nextAvail = -1呢?
//nextAvail只有两个地方赋值为nextAvail >= 0;在初始化赋值为零和在释放内存块时赋值相应值;
//nextAvail = -1重新迭代查询中间是否有释放内存块;
this.nextAvail = -1;
return nextAvail;
}
return findNextAvail();
}
private int findNextAvail() {
final long[] bitmap = this.bitmap;
final int bitmapLength = this.bitmapLength;
for (int i = 0; i < bitmapLength; i ++) {
long bits = bitmap[i];
if (~bits != 0) { //有分配内存
return findNextAvail0(i, bits);
}
}
return -1;
}
//获取nextAvail值
private int findNextAvail0(int i, long bits) {
final int maxNumElems = this.maxNumElems;
final int baseVal = i << 6;//i*64
for (int j = 0; j < 64; j ++) {
if ((bits & 1) == 0) {
int val = baseVal | j;
if (val < maxNumElems) {
return val;
} else {
break;
}
}
bits >>>= 1;
}
return -1;
}
|
6.free释放内存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
boolean free(PoolSubpage<T> head, int bitmapIdx) {
if (elemSize == 0) {
return true;
}
int q = bitmapIdx >>> 6;
int r = bitmapIdx & 63;
assert (bitmap[q] >>> r & 1) != 0;
bitmap[q] ^= 1L << r;
setNextAvail(bitmapIdx);
if (numAvail ++ == 0) { //不可分配内存中正好释放一个块内存,添加到缓存链表中
addToPool(head);
return true;
}
if (numAvail != maxNumElems) {
return true;
} else {
// Subpage not in use (numAvail == maxNumElems)
if (prev == next) {
// Do not remove if this subpage is the only one left in the pool.
//如果subpage池中唯一的一个,请不要删除。
return true;
}
// Remove this subpage from the pool if there are other subpages left in the pool.
//如果池中还有其他subpage,则从池中删除该子页面。
doNotDestroy = false;
removeFromPool();
return false;
}
}
private void setNextAvail(int bitmapIdx) {
nextAvail = bitmapIdx;
}
|
7.initBufWithSubpage
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int reqCapacity) {
this.initBufWithSubpage(buf, handle, (int)(handle >>> 32), reqCapacity);
}
private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) {
assert bitmapIdx != 0;
int memoryMapIdx = (int)handle;
PoolSubpage<T> subpage = this.subpages[this.subpageIdx(memoryMapIdx)];
assert subpage.doNotDestroy;
assert reqCapacity <= subpage.elemSize;
//1073741823=0x3FFFFFFF free做具体分析这里不做分析
//his.runOffset(memoryMapIdx) + (bitmapIdx & 1073741823) * subpage.elemSize 获取偏移量
//subpage.elemSize实际分配内存
buf.init(this, handle, this.runOffset(memoryMapIdx) + (bitmapIdx & 1073741823) * subpage.elemSize, reqCapacity, subpage.elemSize, this.arena.parent.threadCache());
}
|
四、PoolChunkList 源码分析
为了提高内存分配效率并减少内部碎片,jemalloc算法将Arena切分为小块Chunk,根据每块的内存使用率又将小块组合为以下几种状态:
QINIT,Q0,Q25,Q50,Q75,Q100。Chunk块可以在这几种状态间随着内存使用率的变化进行转移,内存使用率和状态转移可参见下图:

PoolChunkList管理chunk状态;
1.数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
|
final class PoolChunkList<T> implements PoolChunkListMetric {
private static final Iterator<PoolChunkMetric> EMPTY_METRICS = Collections.<PoolChunkMetric>emptyList().iterator();
private final PoolArena<T> arena; //所属的Arena
private final PoolChunkList<T> nextList;//next状态
private final int minUsage; //最小内存使用率
private final int maxUsage; //最大内存使用率
private final int maxCapacity;//Chunk可分配的最大字节数
private PoolChunk<T> head; //Chunk头节点
// This is only update once when create the linked like list of PoolChunkList in PoolArena constructor.
//这只是在PoolArena构造函数中创建PoolChunkList的链接列表时更新一次
private PoolChunkList<T> prevList;//prev状态
}
|
2.构造
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
PoolChunkList(PoolArena<T> arena, PoolChunkList<T> nextList, int minUsage, int maxUsage, int chunkSize) {
assert minUsage <= maxUsage;
this.arena = arena;
this.nextList = nextList;
this.minUsage = minUsage;
this.maxUsage = maxUsage;
maxCapacity = calculateMaxCapacity(minUsage, chunkSize);
}
private static int calculateMaxCapacity(int minUsage, int chunkSize) {
minUsage = minUsage0(minUsage);
if (minUsage == 100) {
// If the minUsage is 100 we can not allocate anything out of this list.
//如果minUsage是100,我们不能从这个列表中分配任何东西。
return 0;
}
// Calculate the maximum amount of bytes that can be allocated from a PoolChunk in this PoolChunkList.
// 计算PoolChunkList中可以从PoolChunk分配的最大字节数。
// As an example: 例如
// - If a PoolChunkList has minUsage == 25 we are allowed to allocate at most 75% of the chunkSize because
// this is the maximum amount available in any PoolChunk in this PoolChunkList.
//如果PoolChunkList的minUsage为25,我们最多可以分配块大小的75%,因为这是PoolChunkList中任何PoolChunk的最大可用量。
return (int) (chunkSize * (100L - minUsage) / 100L);
}
private static int minUsage0(int value) {
return max(1, value); //Math.max
}
//设置prevList
void prevList(PoolChunkList<T> prevList) {
assert this.prevList == null;
this.prevList = prevList;
}
|
3.allocate分配内存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
if (head == null || normCapacity > maxCapacity) {
// Either this PoolChunkList is empty or the requested capacity is larger then the capacity which can
// be handled by the PoolChunks that are contained in this PoolChunkList.
return false;
}
for (PoolChunk<T> cur = head;;) {
long handle = cur.allocate(normCapacity); //申请分配内存
if (handle < 0) { //小于零,往下PoolChunk申请分配内存
cur = cur.next;
if (cur == null) {
return false;
}
} else {
cur.initBuf(buf, handle, reqCapacity); //初始化PooledByteBuf
if (cur.usage() >= maxUsage) { //chunk 当前使用的百分比大于等于PoolChunkList最大值,删除该chunk,往PoolChunkList下个值添加chunk;
remove(cur);
nextList.add(cur);
}
return true;
}
}
}
|
a.remove
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
private void remove(PoolChunk<T> cur) {
if (cur == head) {
head = cur.next;
if (head != null) {
head.prev = null;
}
} else {
PoolChunk<T> next = cur.next;
cur.prev.next = next;
if (next != null) {
next.prev = cur.prev;
}
}
}
|
4.free释放内存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
boolean free(PoolChunk<T> chunk, long handle) {
chunk.free(handle);
if (chunk.usage() < minUsage) {//chunk 当前使用的百分比小于PoolChunkList最小值,删除该chunk,往PoolChunkList上个值添加chunk;
remove(chunk);
// Move the PoolChunk down the PoolChunkList linked-list.
return move0(chunk);
}
return true;
}
private boolean move0(PoolChunk<T> chunk) {
if (prevList == null) {
// There is no previous PoolChunkList so return false which result in having the PoolChunk destroyed and
// all memory associated with the PoolChunk will be released.
//没有先前的PoolChunkList返回false,导致PoolChunk被销毁,并且所有与PoolChunk关联的内存都将被释放
assert chunk.usage() == 0;
return false;
}
return prevList.move(chunk);
}
|
5.add 添加chunk
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
void add(PoolChunk<T> chunk) {
if (chunk.usage() >= maxUsage) {
nextList.add(chunk);
return;
}
add0(chunk);
}
void add0(PoolChunk<T> chunk) {
chunk.parent = this;
if (head == null) {
head = chunk;
chunk.prev = null;
chunk.next = null;
} else {
chunk.prev = null;
chunk.next = head;
head.prev = chunk;
head = chunk;
}
}
|
6.move 添加chunk
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
private boolean move(PoolChunk<T> chunk) {
assert chunk.usage() < maxUsage;
if (chunk.usage() < minUsage) {
// Move the PoolChunk down the PoolChunkList linked-list.
return move0(chunk);
}
// PoolChunk fits into this PoolChunkList, adding it here.
add0(chunk);
return true;
}
private boolean move0(PoolChunk<T> chunk) {
if (prevList == null) {
// There is no previous PoolChunkList so return false which result in having the PoolChunk destroyed and
// all memory associated with the PoolChunk will be released.
assert chunk.usage() == 0;
return false;
}
return prevList.move(chunk);
}
|
五、PoolArena 源码分析
PoolArena是抽象类,两个实现类HeapArena(堆内存)和DirectArena(直接内存);
1.数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
abstract class PoolArena<T> implements PoolArenaMetric {
static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();
enum SizeClass {
Tiny,
Small,
Normal
// 除此之外的请求为Huge
}
//Tiny分配固定值数量
static final int numTinySubpagePools = 512 >>> 4;//32
final PooledByteBufAllocator parent;
private final int maxOrder; //chunk相关满二叉树的高度
final int pageSize; //page大小
final int pageShifts; //pageSize是2幂次方,pageShifts=log2(pageSize),位移大小
final int chunkSize; //chunk大小
final int subpageOverflowMask; //用于判断请求是否为Small/Tiny
final int numSmallSubpagePools;//small请求的双向链表头个数
final int directMemoryCacheAlignment;//对齐基准
final int directMemoryCacheAlignmentMask;//用于对齐内存
private final PoolSubpage<T>[] tinySubpagePools;//tiny Subpage双向链表
private final PoolSubpage<T>[] smallSubpagePools;//small Subpage双向链表
//chunk状态 jemalloc博客有具体说明
private final PoolChunkList<T> q050;
private final PoolChunkList<T> q025;
private final PoolChunkList<T> q000;
private final PoolChunkList<T> qInit;
private final PoolChunkList<T> q075;
private final PoolChunkList<T> q100;
// 线程缓存数量
final AtomicInteger numThreadCaches = new AtomicInteger();
}
|
2.构造
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
protected PoolArena(PooledByteBufAllocator parent, int pageSize,
int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) {
this.parent = parent;
this.pageSize = pageSize;
this.maxOrder = maxOrder;
this.pageShifts = pageShifts;
this.chunkSize = chunkSize;
directMemoryCacheAlignment = cacheAlignment;
directMemoryCacheAlignmentMask = cacheAlignment - 1;
subpageOverflowMask = ~(pageSize - 1);
tinySubpagePools = newSubpagePoolArray(numTinySubpagePools); // 512 >>> 4
for (int i = 0; i < tinySubpagePools.length; i ++) {
tinySubpagePools[i] = newSubpagePoolHead(pageSize); //初始化PoolSubpage的头节点
}
//Small分配固定值数量
numSmallSubpagePools = pageShifts - 9;// 2^9=512
smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
for (int i = 0; i < smallSubpagePools.length; i ++) {
smallSubpagePools[i] = newSubpagePoolHead(pageSize); //初始化PoolSubpage的头节点
}
//PoolChunkList 状态赋值,状态关联初始化
q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);
q100.prevList(q075);
q075.prevList(q050);
q050.prevList(q025);
q025.prevList(q000);
q000.prevList(null);
qInit.prevList(qInit);
}
private PoolSubpage<T> newSubpagePoolHead(int pageSize) {
PoolSubpage<T> head = new PoolSubpage<T>(pageSize);
head.prev = head;
head.next = head;
return head;
}
@SuppressWarnings("unchecked")
private PoolSubpage<T>[] newSubpagePoolArray(int size) {
return new PoolSubpage[size];
}
private PoolSubpage<T> newSubpagePoolHead(int pageSize) {
PoolSubpage<T> head = new PoolSubpage<T>(pageSize);
head.prev = head;
head.next = head;
return head;
}
|
2.allocate 分配内存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
PooledByteBuf<T> buf = newByteBuf(maxCapacity); //newByteBufy由子类实现
allocate(cache, buf, reqCapacity);
return buf;
}
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
final int normCapacity = normalizeCapacity(reqCapacity);//获取计算需要分配内存值
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { //PoolThreadCache缓存中获取,具体后面介绍
// was able to allocate out of the cache so move on
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {//PoolThreadCache缓存中获取,具体后面介绍
// was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
final PoolSubpage<T> head = table[tableIdx];
//从tinySubpagePools和smallSubpagePools中获取要分配内存
synchronized (head) {
final PoolSubpage<T> s = head.next;
if (s != head) { //
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
incTinySmallAllocation(tiny); //统计创建Tiny或者Small数量
return;
}
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);//从chunk分配内存
}
incTinySmallAllocation(tiny);
return;
}
if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
++allocationsNormal;
}
} else {
// Huge allocations are never served via the cache so just call allocateHuge
//巨大的分配永远不会通过内存池服务,所以只需调用allocateHuge
allocateHuge(buf, reqCapacity);
}
}
// capacity < pageSize subpageOverflowMask=~(pageSize-1)
boolean isTinyOrSmall(int normCapacity) {
return (normCapacity & subpageOverflowMask) == 0;
}
// normCapacity < 512 0xFFFFFE00=-512
static boolean isTiny(int normCapacity) {
return (normCapacity & 0xFFFFFE00) == 0;
}
//获取tinySubpagePools数组下标值
static int tinyIdx(int normCapacity) {
return normCapacity >>> 4;
}
//获取smallSubpagePools数组下标值
static int smallIdx(int normCapacity) {
int tableIdx = 0;
int i = normCapacity >>> 10; //normCapacity/512
while (i != 0) {
i >>>= 1; //i/2
tableIdx ++;
}
return tableIdx;
}
private void incTinySmallAllocation(boolean tiny) {
if (tiny) {
allocationsTiny.increment();
} else {
allocationsSmall.increment();
}
}
|
3.normalizeCapacity 获取计算需要分配内存值;
两种情况大于等于512情况和小于512情况
大于等于512:大于reqCapacity值最近2幂次方值
小于512情况:reqCapacity的值在(1-(521»4)=32)«4范围内,一共32个
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
int normalizeCapacity(int reqCapacity) {
if (reqCapacity < 0) {
throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
}
if (reqCapacity >= chunkSize) {
return directMemoryCacheAlignment == 0 ? reqCapacity : alignCapacity(reqCapacity);
}
if (!isTiny(reqCapacity)) { // >= 512
// Doubled
int normalizedCapacity = reqCapacity;
normalizedCapacity --;
//这块代码 最高位1后面都要补1
//例如 normalizedCapacity=6 0110 转化normalizedCapacity=7 0111
normalizedCapacity |= normalizedCapacity >>> 1;
normalizedCapacity |= normalizedCapacity >>> 2;
normalizedCapacity |= normalizedCapacity >>> 4;
normalizedCapacity |= normalizedCapacity >>> 8;
normalizedCapacity |= normalizedCapacity >>> 16;
//最后加1,能获取normalizedCapacity最近2幂次方
normalizedCapacity ++;
if (normalizedCapacity < 0) { //normalizedCapacity达到最大
normalizedCapacity >>>= 1;
}
assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0;
return normalizedCapacity;
}
if (directMemoryCacheAlignment > 0) {
return alignCapacity(reqCapacity);
}
// Quantum-spacedb
if ((reqCapacity & 15) == 0) {//reqCapacity的值在(1-(512>>4)=32)<<4范围内,说白就是16倍数而已,一共31个不包括512
return reqCapacity;
}
return (reqCapacity & ~15) + 16;//把reqCapacity最后四位补零在加16,在上面范围内
}
//0xFFFFFE00=-512 判断是否小于512
static boolean isTiny(int normCapacity) {
return (normCapacity & 0xFFFFFE00) == 0;
}
int alignCapacity(int reqCapacity) {
int delta = reqCapacity & directMemoryCacheAlignmentMask;
return delta == 0 ? reqCapacity : reqCapacity + directMemoryCacheAlignment - delta;
}
|
4.allocateNormal 从chunk中分配内存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
//为什么是从q050开始尝试分配呢?
// q050是内存占用50%~100%的chunk,猜测是希望能够提高整个应用的内存使用率,因为这样大部分情况下会使用q050的内存,
//这样在内存使用不是很多的情况下一些利用率低(<50%)的chunk慢慢就会淘汰出去,最终被回收。
if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
q075.allocate(buf, reqCapacity, normCapacity)) {
return;
}
// Add a new chunk. 创建新chunk
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);//抽象方法子类实现
long handle = c.allocate(normCapacity);
assert handle > 0;
c.initBuf(buf, handle, reqCapacity);
qInit.add(c); //第一次创建添加到qInit状态中
}
|
5.allocateHuge 分配大小大于chunkSize不通过内存池创建
1
2
3
4
5
6
|
private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
PoolChunk<T> chunk = newUnpooledChunk(reqCapacity);//子类实现
activeBytesHuge.add(chunk.chunkSize()); //统计
buf.initUnpooled(chunk, reqCapacity);
allocationsHuge.increment(); //统计
}
|
6.free 释放内存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
if (chunk.unpooled) {
int size = chunk.chunkSize();
destroyChunk(chunk); //子类去实现
activeBytesHuge.add(-size);
deallocationsHuge.increment();
} else {
SizeClass sizeClass = sizeClass(normCapacity);
if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) { //PoolThreadCache操作
// cached so not free it.
//缓存,所以不释放它。
return;
}
freeChunk(chunk, handle, sizeClass);
}
}
private SizeClass sizeClass(int normCapacity) {
if (!isTinyOrSmall(normCapacity)) {
return SizeClass.Normal;
}
return isTiny(normCapacity) ? SizeClass.Tiny : SizeClass.Small;
}
void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass) {
final boolean destroyChunk;
synchronized (this) {
switch (sizeClass) {
case Normal:
++deallocationsNormal;
break;
case Small:
++deallocationsSmall;
break;
case Tiny:
++deallocationsTiny;
break;
default:
throw new Error();
}
destroyChunk = !chunk.parent.free(chunk, handle);//chunk.parent ChunkList操作free
}
if (destroyChunk) {//ChunkList操作free返回false才调用destroyChunk
// destroyChunk not need to be called while holding the synchronized lock.
destroyChunk(chunk); //子类去实现
}
}
|
7.findSubpagePoolHead 查询subpagePool的头节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
PoolSubpage<T> findSubpagePoolHead(int elemSize) {
int tableIdx;
PoolSubpage<T>[] table;
if (isTiny(elemSize)) { // < 512
tableIdx = elemSize >>> 4;
table = tinySubpagePools;
} else {
tableIdx = 0;
elemSize >>>= 10;
while (elemSize != 0) {
elemSize >>>= 1;
tableIdx ++;
}
table = smallSubpagePools;
}
return table[tableIdx];
}
|
8.reallocate 重新分配内存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
if (newCapacity < 0 || newCapacity > buf.maxCapacity()) {
throw new IllegalArgumentException("newCapacity: " + newCapacity);
}
int oldCapacity = buf.length;
if (oldCapacity == newCapacity) {
return;
}
PoolChunk<T> oldChunk = buf.chunk;
long oldHandle = buf.handle;
T oldMemory = buf.memory;
int oldOffset = buf.offset;
int oldMaxLength = buf.maxLength;
int readerIndex = buf.readerIndex();
int writerIndex = buf.writerIndex();
allocate(parent.threadCache(), buf, newCapacity); //分配新内存大小
if (newCapacity > oldCapacity) {
memoryCopy(
oldMemory, oldOffset,
buf.memory, buf.offset, oldCapacity); //抽象方法,子类去实现
} else if (newCapacity < oldCapacity) {
if (readerIndex < newCapacity) {
if (writerIndex > newCapacity) {
writerIndex = newCapacity;
}
memoryCopy(
oldMemory, oldOffset + readerIndex,
buf.memory, buf.offset + readerIndex, writerIndex - readerIndex);
} else {
readerIndex = writerIndex = newCapacity;
}
}
buf.setIndex(readerIndex, writerIndex);
if (freeOldMemory) {
free(oldChunk, oldHandle, oldMaxLength, buf.cache);//释放旧内存
}
}
|
9.子类实现HeapArena和DirectArena
不做具体分析;
1
2
3
4
5
6
|
//子类实现抽象方法
protected abstract PoolChunk<T> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize);
protected abstract PoolChunk<T> newUnpooledChunk(int capacity);
protected abstract PooledByteBuf<T> newByteBuf(int maxCapacity);
protected abstract void memoryCopy(T src, int srcOffset, T dst, int dstOffset, int length);
protected abstract void destroyChunk(PoolChunk<T> chunk);
|
六、PoolThreadCache 源码分析
1.数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
final class PoolThreadCache {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);
//类型不同PoolArena
final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;
// Hold the caches for the different size classes, which are tiny, small and normal.
//大小不同的缓存
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
// Used for bitshifting when calculate the index of normal caches later
//稍后计算正常高速缓存的索引时用于位移
private final int numShiftsNormalDirect;
private final int numShiftsNormalHeap;
//释放队列缓存门槛次数
private final int freeSweepAllocationThreshold;
//缓存分配次数
private int allocations;
}
|
2.MemoryRegionCache源码分析
MemoryRegionCache是抽象类,有SubPageMemoryRegionCache和NormalMemoryRegionCache实现
(1).数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
private abstract static class MemoryRegionCache<T> {
private final int size; //申请队列大小
private final Queue<Entry<T>> queue;//调用jctools中队列
private final SizeClass sizeClass; //分配类型,Tiny,Small,Normal
private int allocations; //分配数量统计
//运用对象池
static final class Entry<T> {
final Handle<Entry<?>> recyclerHandle;
PoolChunk<T> chunk;
long handle = -1;
Entry(Handle<Entry<?>> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
void recycle() {
chunk = null;
handle = -1;
recyclerHandle.recycle(this);
}
}
}
|
(2).构造
1
2
3
4
5
|
MemoryRegionCache(int size, SizeClass sizeClass) {
this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);//最终结果1<<log2(size-1)
queue = PlatformDependent.newFixedMpscQueue(this.size); //hasUnsafe() ? new MpscArrayQueue<T>(capacity) : new MpscAtomicArrayQueue<T>(capacity);
this.sizeClass = sizeClass;
}
|
(3).add 添加缓存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
public final boolean add(PoolChunk<T> chunk, long handle) {
Entry<T> entry = newEntry(chunk, handle);
boolean queued = queue.offer(entry);
if (!queued) {
//如果不可能缓存该块,则立即回收该entry。
entry.recycle();
}
return queued;
}
private static Entry newEntry(PoolChunk<?> chunk, long handle) {
Entry entry = RECYCLER.get();
entry.chunk = chunk;
entry.handle = handle;
return entry;
}
//Recycler后面主体做介绍
@SuppressWarnings("rawtypes")
private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
@SuppressWarnings("unchecked")
@Override
protected Entry newObject(Handle<Entry> handle) {
return new Entry(handle);
}
};
|
(4).从缓存中分配内存
1
2
3
4
5
6
7
8
9
10
11
12
|
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
Entry<T> entry = queue.poll();
if (entry == null) {
return false;
}
initBuf(entry.chunk, entry.handle, buf, reqCapacity);//抽象类
entry.recycle(); //回收
//allocations不是线程安全的,同一个线程始终调用。
++ allocations;
return true;
}
|
(5).从缓存队列中释放所有内存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
public final int free() {
return free(Integer.MAX_VALUE);
}
private int free(int max) {
int numFreed = 0;
for (; numFreed < max; numFreed++) {
Entry<T> entry = queue.poll();
if (entry != null) {
freeEntry(entry);
} else {
// all cleared
return numFreed;
}
}
return numFreed;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private void freeEntry(Entry entry) {
PoolChunk chunk = entry.chunk;
long handle = entry.handle;
//现在回收所以PoolChunk可以被GC化
entry.recycle();
//freeChunk
chunk.arena.freeChunk(chunk, handle, sizeClass);
}
|
(5).trim如果分配不够频繁,则释放缓存的PoolChunk。
1
2
3
4
5
6
7
8
9
10
11
12
|
/**
* 如果分配不够频繁,则释放缓存的PoolChunk。
*/
public final void trim() {
int free = size - allocations;
allocations = 0;
// We not even allocated all the number that are
if (free > 0) {
free(free);
}
}
|
(6).子类实现SubPageMemoryRegionCache和NormalMemoryRegionCache
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
super(size, sizeClass);
}
@Override
protected void initBuf(
PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
chunk.initBufWithSubpage(buf, handle, reqCapacity);
}
}
private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
NormalMemoryRegionCache(int size) {
super(size, SizeClass.Normal);
}
@Override
protected void initBuf(
PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
chunk.initBuf(buf, handle, reqCapacity);
}
}
|
3.PoolThreadCache构造
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
...
this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
this.heapArena = heapArena;
this.directArena = directArena;
if (directArena != null) { //直接内存初始化
tinySubPageDirectCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny); //创建Tiny缓存
smallSubPageDirectCaches = createSubPageCaches(
smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);//创建small缓存
numShiftsNormalDirect = log2(directArena.pageSize);
normalDirectCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, directArena); //创建normal缓存
directArena.numThreadCaches.getAndIncrement();
} else {
// No directArea is configured so just null out all caches
tinySubPageDirectCaches = null;
smallSubPageDirectCaches = null;
normalDirectCaches = null;
numShiftsNormalDirect = -1;
}
if (heapArena != null) { //堆内存初始化缓存数据
// Create the caches for the heap allocations
tinySubPageHeapCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageHeapCaches = createSubPageCaches(
smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
numShiftsNormalHeap = log2(heapArena.pageSize);
normalHeapCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, heapArena);
heapArena.numThreadCaches.getAndIncrement();
} else {
// No heapArea is configured so just null out all caches
tinySubPageHeapCaches = null;
smallSubPageHeapCaches = null;
normalHeapCaches = null;
numShiftsNormalHeap = -1;
}
...
}
private static <T> MemoryRegionCache<T>[] createSubPageCaches(
int cacheSize, int numCaches, SizeClass sizeClass) {
if (cacheSize > 0 && numCaches > 0) {
@SuppressWarnings("unchecked")
MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
for (int i = 0; i < cache.length; i++) {
// TODO: maybe use cacheSize / cache.length
cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
}
return cache;
} else {
return null;
}
}
private static <T> MemoryRegionCache<T>[] createNormalCaches(
int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
int arraySize = Math.max(1, log2(max / area.pageSize) + 1);
@SuppressWarnings("unchecked")
MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
for (int i = 0; i < cache.length; i++) {
cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
}
return cache;
} else {
return null;
}
}
private static int log2(int val) {
int res = 0;
while (val > 1) {
val >>= 1;
res++;
}
return res;
}
|
说白其次都是操作MemoryRegionCache类进行缓存;下面所用操作都是MemoryRegionCache
4.添加缓存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
/**
* Add {@link PoolChunk} and {@code handle} to the cache if there is enough room.
* Returns {@code true} if it fit into the cache {@code false} otherwise.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
if (cache == null) {
return false;
}
return cache.add(chunk, handle);
}
private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
switch (sizeClass) {
case Normal:
return cacheForNormal(area, normCapacity);
case Small:
return cacheForSmall(area, normCapacity);
case Tiny:
return cacheForTiny(area, normCapacity);
default:
throw new Error();
}
}
|
5.从缓存中分获取分配内存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
/**
* Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
*/
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}
/**
* Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
*/
boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity);
}
/**
* Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
*/
boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
if (cache == null) {
// no cache found so just return false here
return false;
}
boolean allocated = cache.allocate(buf, reqCapacity);
if (++ allocations >= freeSweepAllocationThreshold) { //门槛次数做判断,释放内存
allocations = 0;
trim();
}
return allocated;
}
|
5.free释放缓存内存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
void free() {
int numFreed = free(tinySubPageDirectCaches) +
free(smallSubPageDirectCaches) +
free(normalDirectCaches) +
free(tinySubPageHeapCaches) +
free(smallSubPageHeapCaches) +
free(normalHeapCaches);
if (directArena != null) {
directArena.numThreadCaches.getAndDecrement();
}
if (heapArena != null) {
heapArena.numThreadCaches.getAndDecrement();
}
}
private static int free(MemoryRegionCache<?>[] caches) {
if (caches == null) {
return 0;
}
int numFreed = 0;
for (MemoryRegionCache<?> c: caches) {
numFreed += free(c);
}
return numFreed;
}
private static int free(MemoryRegionCache<?> cache) {
if (cache == null) {
return 0;
}
return cache.free();
}
|
6.trim释放缓存内存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
void trim() {
trim(tinySubPageDirectCaches);
trim(smallSubPageDirectCaches);
trim(normalDirectCaches);
trim(tinySubPageHeapCaches);
trim(smallSubPageHeapCaches);
trim(normalHeapCaches);
}
private static void trim(MemoryRegionCache<?>[] caches) {
if (caches == null) {
return;
}
for (MemoryRegionCache<?> c: caches) {
trim(c);
}
}
private static void trim(MemoryRegionCache<?> cache) {
if (cache == null) {
return;
}
cache.trim();
}
|
七、PooledByteBuf 源码分析
PooledByteBuf是抽象类
1.数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
private final Recycler.Handle<PooledByteBuf<T>> recyclerHandle;
protected PoolChunk<T> chunk; //chunk
protected long handle; //chunk 定位值
protected T memory; //实际内存区域
protected int offset; //偏移量
protected int length; //内存大小
int maxLength; //最大长度
PoolThreadCache cache; //内存池缓存
private ByteBuffer tmpNioBuf;
private ByteBufAllocator allocator;
}
|
2.构造
1
2
3
4
|
protected PooledByteBuf(Recycler.Handle<? extends PooledByteBuf<T>> recyclerHandle, int maxCapacity) {
super(maxCapacity);
this.recyclerHandle = (Handle<PooledByteBuf<T>>) recyclerHandle;
}
|
3.init 初始化,在内存池中分配内存调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
init0(chunk, handle, offset, length, maxLength, cache);
}
//没用到内存池初始化
void initUnpooled(PoolChunk<T> chunk, int length) {
init0(chunk, 0, chunk.offset, length, length, null);
}
private void init0(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
assert handle >= 0;
assert chunk != null;
this.chunk = chunk;
memory = chunk.memory;
allocator = chunk.arena.parent;
this.cache = cache;
this.handle = handle;
this.offset = offset;
this.length = length;
this.maxLength = maxLength;
tmpNioBuf = null;
}
|
4.capacity 重新分配新容量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
@Override
public final ByteBuf capacity(int newCapacity) {
checkNewCapacity(newCapacity);
// If the request capacity does not require reallocation, just update the length of the memory.
if (chunk.unpooled) {
if (newCapacity == length) {
return this;
}
} else {
if (newCapacity > length) { //扩容
if (newCapacity <= maxLength) { //小于maxLength 不用在重新分配容量
length = newCapacity;
return this;
}
} else if (newCapacity < length) {//缩小容量
if (newCapacity > maxLength >>> 1) {//小于maxLength/2,
if (maxLength <= 512) { //判断maxLength是否小于等于512
if (newCapacity > maxLength - 16) {
length = newCapacity;
setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
return this;
}
} else { // > 512 (i.e. >= 1024)
length = newCapacity;
setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
return this;
}
}
} else {
return this;
}
}
// Reallocation required. 重新分配内存大小
chunk.arena.reallocate(this, newCapacity, true);
return this;
}
|