【注意】最后更新于 April 15, 2019,文中内容可能已过时,请谨慎使用。
一、介绍
Netty为了提高性能,没有使用Java自带的ThreadLocal,而是自己实现了FastThreadLocal,访问速度比ThreadLocal更快;
另外,ThreadLocal还存在内存泄露的风险。
当从FastThreadLocalThread访问时,具有更高的访问性能。内部结构用数组保存,FastThreadLocal在数组中使用常量索引来查找变量,
而不是使用哈希码和哈希表。尽管看似非常微妙,但与使用哈希表相比,它在性能上却有一点优势,并且在经常访问时很有用。
二、FastThreadLocal
1.数据结构
1
2
3
4
5
|
/** Index of this variable's slot in each thread's InternalThreadLocalMap table. */
private final int index;

/**
 * Creates a new variable and reserves its slot by taking the next index from
 * {@code InternalThreadLocalMap.nextVariableIndex()}.
 */
public FastThreadLocal() {
    this.index = InternalThreadLocalMap.nextVariableIndex();
}
|
- 常量index
- InternalThreadLocalMap.nextVariableIndex()赋值给index
a.InternalThreadLocalMap.nextVariableIndex()
- nextIndex是静态常量AtomicInteger
- 获取index是自动增长,原子操作;多线程访问存在竞争问题,CAS控制;
1
2
3
4
5
6
7
8
|
public static int nextVariableIndex() {
int index = nextIndex.getAndIncrement();
if (index < 0) {
nextIndex.decrementAndGet();
throw new IllegalStateException(...);
}
return index;
}
|
2.get 返回当前线程的当前值
1
2
3
4
5
6
7
8
9
10
11
|
/**
 * Returns the current thread's value for this variable, initializing it on first access.
 *
 * @return the stored value, or the freshly initialized one when the slot was still UNSET
 */
public final V get() {
    final InternalThreadLocalMap map = InternalThreadLocalMap.get();
    final Object current = map.indexedVariable(index);
    if (current == InternalThreadLocalMap.UNSET) {
        // Slot is still unset: create the initial value, then register the cleaner.
        final V created = initialize(map);
        registerCleaner(map);
        return created;
    }
    return (V) current;
}
|
a.InternalThreadLocalMap.get() 从当前线程中获取InternalThreadLocalMap
- fastGet比slowGet快,前提是当前线程是FastThreadLocalThread;
1
2
3
4
5
6
7
8
|
/**
 * Returns the InternalThreadLocalMap of the calling thread.
 * A FastThreadLocalThread is served by fastGet; any other thread goes through slowGet.
 */
public static InternalThreadLocalMap get() {
    final Thread current = Thread.currentThread();
    return current instanceof FastThreadLocalThread
            ? fastGet((FastThreadLocalThread) current)
            : slowGet();
}
|
b.InternalThreadLocalMap.fastGet()
FastThreadLocalThread变量threadLocalMap查找InternalThreadLocalMap,为空创建InternalThreadLocalMap
1
2
3
4
5
6
7
|
/**
 * Reads the map stored on the given FastThreadLocalThread, creating and attaching a new one
 * when the thread does not have a map yet.
 */
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
    final InternalThreadLocalMap existing = thread.threadLocalMap();
    if (existing != null) {
        return existing;
    }
    final InternalThreadLocalMap created = new InternalThreadLocalMap();
    thread.setThreadLocalMap(created);
    return created;
}
|
b.InternalThreadLocalMap.slowGet()
从JAVA的ThreadLocal获取InternalThreadLocalMap,不存在创建InternalThreadLocalMap
1
2
3
4
5
6
7
8
9
|
/**
 * Looks the map up in the JDK ThreadLocal held by UnpaddedInternalThreadLocalMap,
 * creating and storing a new map when none is present.
 */
private static InternalThreadLocalMap slowGet() {
    final ThreadLocal<InternalThreadLocalMap> holder = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
    final InternalThreadLocalMap existing = holder.get();
    if (existing != null) {
        return existing;
    }
    final InternalThreadLocalMap created = new InternalThreadLocalMap();
    holder.set(created);
    return created;
}
|
c.InternalThreadLocalMap.indexedVariable获取当前线程的当前值;
1
2
3
4
|
/**
 * Returns the value stored at the given slot.
 *
 * @param index slot index of the variable
 * @return the stored value, or UNSET when the index is at or beyond the end of the table
 */
public Object indexedVariable(int index) {
    final Object[] table = indexedVariables;
    if (index >= table.length) {
        return UNSET;
    }
    return table[index];
}
|
3.initialize 未获取到值,进行初始化操作
1
2
3
4
5
6
7
8
9
10
11
12
13
|
/**
 * Computes the initial value, stores it in this variable's slot and records this variable
 * in the per-thread "variables to remove" set.
 *
 * @param threadLocalMap the map of the current thread
 * @return the value produced by initialValue(), or null if none was assigned
 */
private V initialize(InternalThreadLocalMap threadLocalMap) {
    V initial = null;
    try {
        // initialValue() is the hook a concrete variable overrides to supply its starting value.
        initial = initialValue();
    } catch (Exception cause) {
        PlatformDependent.throwException(cause);
    }

    threadLocalMap.setIndexedVariable(index, initial);
    addToVariablesToRemove(threadLocalMap, this);
    return initial;
}
|
a.InternalThreadLocalMap.setIndexedVariable 设置本地线程变量
- 往数组中指定下标赋值本地变量
- 下标索引大于数组大小,继续扩容
1
2
3
4
5
6
7
8
9
10
11
12
|
/**
 * Stores a value at the given slot, growing the table when the index does not fit.
 *
 * @param index slot index of the variable
 * @param value value to store
 * @return true if the slot previously held UNSET or the table had to be expanded
 */
public boolean setIndexedVariable(int index, Object value) {
    final Object[] table = indexedVariables;
    if (index >= table.length) {
        // Index is outside the current table: grow it and store the value there.
        expandIndexedVariableTableAndSet(index, value);
        return true;
    }
    final Object previous = table[index];
    table[index] = value;
    return previous == UNSET;
}
|
b.InternalThreadLocalMap.expandIndexedVariableTableAndSet 进行扩容
- 进行扩容:新容量取大于下标索引的最小的2的幂;
- 再进行下标赋值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
/**
 * Replaces the table with a larger copy that can hold the given index, fills the new tail
 * with UNSET and stores the value at that index.
 *
 * @param index slot index that did not fit into the current table
 * @param value value to store at that index
 */
private void expandIndexedVariableTableAndSet(int index, Object value) {
    final Object[] previous = indexedVariables;
    final int previousLength = previous.length;

    // Smear the highest set bit of index into all lower bits, then add one:
    // the result is the smallest power of two strictly greater than index.
    // NOTE(review): for index >= 2^30 the final increment overflows to a negative
    // capacity and Arrays.copyOf would throw; behavior is kept as in the original.
    int capacity = index;
    capacity |= capacity >>> 1;
    capacity |= capacity >>> 2;
    capacity |= capacity >>> 4;
    capacity |= capacity >>> 8;
    capacity |= capacity >>> 16;
    capacity++;

    final Object[] grown = Arrays.copyOf(previous, capacity);
    // Arrays.copyOf pads with null; the new tail must read as UNSET instead.
    Arrays.fill(grown, previousLength, grown.length, UNSET);
    grown[index] = value;
    indexedVariables = grown;
}
|
4.addToVariablesToRemove 为本地线程添加删除索引的变量
- variablesToRemoveIndex是静态常量;FastThreadLocal类初始化时调用InternalThreadLocalMap.nextVariableIndex()取得,通常是第一次调用,因此一般为0;
- 删除索引的变量的值是Set
- 作用后面说
1
2
3
4
5
6
7
8
9
10
11
12
13
|
/** Slot that holds, per thread, the set of FastThreadLocal variables to remove later. */
private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();

/**
 * Adds the variable to the set kept at variablesToRemoveIndex, creating that set
 * (identity-based) when the slot is still UNSET or null.
 */
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
    final Object slot = threadLocalMap.indexedVariable(variablesToRemoveIndex);
    final Set<FastThreadLocal<?>> tracked;
    if (slot != InternalThreadLocalMap.UNSET && slot != null) {
        tracked = (Set<FastThreadLocal<?>>) slot;
    } else {
        tracked = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
        threadLocalMap.setIndexedVariable(variablesToRemoveIndex, tracked);
    }
    tracked.add(variable);
}
|
5.registerCleaner 对象清理注册ObjectCleaner.register
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
/**
 * Registers a cleanup task with ObjectCleaner for the current thread, unless
 * FastThreadLocalThread.willCleanupFastThreadLocals reports that the thread cleans up itself.
 *
 * @param threadLocalMap the map that the cleanup task will pass to remove(...)
 */
private void registerCleaner(final InternalThreadLocalMap threadLocalMap) {
    final Thread owner = Thread.currentThread();
    if (FastThreadLocalThread.willCleanupFastThreadLocals(owner)) {
        return;
    }

    // We need to make sure remove(InternalThreadLocalMap) is triggered so that
    // everything is released and FastThreadLocal.onRemoval(...) is called.
    ObjectCleaner.register(owner, new Runnable() {
        @Override
        public void run() {
            remove(threadLocalMap);
            // Better not to call InternalThreadLocalMap.remove() here: this only runs after
            // the thread has been collected by the GC, at which point the ThreadLocal is gone.
        }
    });
}
|
6.remove GC清理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
/**
 * Removes this variable's value from the given map and, if a value was present,
 * passes it to onRemoval(...).
 *
 * @param threadLocalMap the map to remove from; null is ignored
 */
public final void remove(InternalThreadLocalMap threadLocalMap) {
    if (threadLocalMap == null) {
        return;
    }

    // Take the value out of this variable's slot.
    final Object removed = threadLocalMap.removeIndexedVariable(index);
    // Drop this variable from the set stored at variablesToRemoveIndex.
    removeFromVariablesToRemove(threadLocalMap, this);

    if (removed == InternalThreadLocalMap.UNSET) {
        return;
    }
    try {
        // Hook for subclasses to react to the removal.
        onRemoval((V) removed);
    } catch (Exception cause) {
        PlatformDependent.throwException(cause);
    }
}
|
7.set 设置变量值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
/**
 * Sets the current thread's value for this variable.
 * Passing InternalThreadLocalMap.UNSET is treated as a call to remove().
 *
 * @param value the value to store
 */
public final void set(V value) {
    if (value == InternalThreadLocalMap.UNSET) {
        remove();
        return;
    }
    final InternalThreadLocalMap map = InternalThreadLocalMap.get();
    // Register the cleaner only when setKnownNotUnset reports true.
    if (setKnownNotUnset(map, value)) {
        registerCleaner(map);
    }
}
/**
 * Stores the value and, when setIndexedVariable reports true, records this variable
 * in the "variables to remove" set.
 *
 * @return the result of setIndexedVariable for this variable's slot
 */
private boolean setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
    final boolean newlySet = threadLocalMap.setIndexedVariable(index, value);
    if (newlySet) {
        addToVariablesToRemove(threadLocalMap, this);
    }
    return newlySet;
}
|
8.removeAll
- 移除本地线程中所有FastThreadLocal变量值
- variablesToRemoveIndex下标保存所有FastThreadLocal,可以快速触发remove
- 为什么variablesToRemoveIndex下标保存所有FastThreadLocal索引呢?
- 不用遍历数组InternalThreadLocalMap.indexedVariables,数组很大,反而影响性能
- FastThreadLocalRunnable执行run的finally执行FastThreadLocal.removeAll();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
/**
 * Removes every FastThreadLocal value recorded for the current thread and then discards
 * the thread's InternalThreadLocalMap. Does nothing when the thread has no map.
 */
public static void removeAll() {
    final InternalThreadLocalMap map = InternalThreadLocalMap.getIfSet();
    if (map == null) {
        return;
    }

    try {
        final Object slot = map.indexedVariable(variablesToRemoveIndex);
        if (slot == null || slot == InternalThreadLocalMap.UNSET) {
            // Nothing recorded; the finally block below still discards the map.
            return;
        }
        @SuppressWarnings("unchecked")
        final Set<FastThreadLocal<?>> tracked = (Set<FastThreadLocal<?>>) slot;
        // Iterate over a snapshot: remove(...) presumably takes each variable out of this set.
        final FastThreadLocal<?>[] snapshot = tracked.toArray(new FastThreadLocal[tracked.size()]);
        for (int i = 0; i < snapshot.length; i++) {
            snapshot[i].remove(map);
        }
    } finally {
        InternalThreadLocalMap.remove();
    }
}
|
a.InternalThreadLocalMap#remove
1
2
3
4
5
6
7
8
|
/**
 * Discards the calling thread's map: clears the field on a FastThreadLocalThread,
 * otherwise removes the entry from slowThreadLocalMap.
 */
public static void remove() {
    final Thread current = Thread.currentThread();
    if (!(current instanceof FastThreadLocalThread)) {
        slowThreadLocalMap.remove();
        return;
    }
    ((FastThreadLocalThread) current).setThreadLocalMap(null);
}
|
9.InternalThreadLocalMap
1.内部数据结构,列出几个重要的字段
1
2
3
4
5
6
7
8
9
10
11
|
// Abridged excerpt: only a few fields of the class are listed here.
class UnpaddedInternalThreadLocalMap {
// JDK ThreadLocal that slowGet() reads and populates with the thread's InternalThreadLocalMap.
static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
// Counter that nextVariableIndex() increments to hand out slot indexes.
static final AtomicInteger nextIndex = new AtomicInteger();
/** Used by {@link FastThreadLocal} */
Object[] indexedVariables;
// String-related thread-locals
// Instance reused by the stringBuilder() accessor.
StringBuilder stringBuilder;
}
|
2.有意思的构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// Cache line padding (must be public). With CompressedOops enabled, an instance of this
// class should occupy at least 128 bytes.
public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;
// Creates a map whose table comes from newIndexedVariableTable().
private InternalThreadLocalMap() {
// The superclass constructor (not shown in this excerpt) presumably does:
// this.indexedVariables = indexedVariables;
super(newIndexedVariableTable());
}
/**
 * Builds the initial slot table: 32 entries, each pre-filled with UNSET.
 *
 * @return the new table
 */
private static Object[] newIndexedVariableTable() {
    final int initialCapacity = 32;
    final Object[] table = new Object[initialCapacity];
    Arrays.fill(table, UNSET);
    return table;
}
|
3.indexedVariables数组很大问题?
4.本地缓存的字符串缓冲区,不用频繁创建StringBuilder,提高性能
1
2
3
4
5
6
7
8
9
10
11
12
|
/**
 * Returns the cached StringBuilder of this map, emptied and ready for reuse.
 * The builder is created on first use; when its capacity has grown beyond
 * STRING_BUILDER_MAX_SIZE it is shrunk back before being handed out.
 *
 * @return an empty StringBuilder
 */
public StringBuilder stringBuilder() {
    final StringBuilder cached = stringBuilder;
    if (cached == null) {
        final StringBuilder fresh = new StringBuilder(STRING_BUILDER_INITIAL_SIZE);
        stringBuilder = fresh;
        return fresh;
    }

    final boolean oversized = cached.capacity() > STRING_BUILDER_MAX_SIZE;
    if (oversized) {
        // Cut the content down to the initial size, then release the excess capacity.
        cached.setLength(STRING_BUILDER_INITIAL_SIZE);
        cached.trimToSize();
    }
    cached.setLength(0);
    return cached;
}
|
10.ObjectCleaner 对象回收
原理:为注册的对象创建一个弱引用;当对象不再有其他引用并被GC回收后,该弱引用会被放入ReferenceQueue;后台清理线程从ReferenceQueue中取出引用并执行cleanup操作;
1.AutomaticCleanerReference是弱引用类,继承WeakReference
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
/**
 * Weak reference to a registered object, bound to REFERENCE_QUEUE and paired with the
 * task to run for that object.
 */
private static final class AutomaticCleanerReference extends WeakReference<Object> {
    private final Runnable cleanupTask;

    AutomaticCleanerReference(Object referent, Runnable cleanupTask) {
        super(referent, REFERENCE_QUEUE);
        this.cleanupTask = cleanupTask;
    }

    /** Runs the cleanup task supplied at construction. */
    void cleanup() {
        this.cleanupTask.run();
    }

    /** Removes this reference from LIVE_SET, then clears it. */
    @Override
    public void clear() {
        LIVE_SET.remove(this);
        super.clear();
    }

    /** Never exposes the referent: always returns null. */
    @Override
    public Thread get() {
        return null;
    }
}
|
2.类变量
1
2
|
// References added by register(...); entries are removed by CLEANER_TASK after cleanup and by clear().
private static final Set<AutomaticCleanerReference> LIVE_SET = new ConcurrentSet<AutomaticCleanerReference>();
// Queue every AutomaticCleanerReference is constructed with; CLEANER_TASK takes references from it.
private static final ReferenceQueue<Object> REFERENCE_QUEUE = new ReferenceQueue<Object>();
|
3.register 注册对象回收机制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
/**
 * Registers a cleanup task for the given object and starts the cleaner thread
 * when it is not already running.
 *
 * @param object      the object that gets wrapped in a weak reference
 * @param cleanupTask the task to run for that object; must not be null
 */
public static void register(Object object, Runnable cleanupTask) {
    // Validate the task, then wrap the object in a weak reference.
    final Runnable task = ObjectUtil.checkNotNull(cleanupTask, "cleanupTask");
    final AutomaticCleanerReference ref = new AutomaticCleanerReference(object, task);
    // Keep the reference itself reachable until it has been processed.
    LIVE_SET.add(ref);

    // Only the caller that flips the flag from false to true starts the cleaner thread.
    if (!CLEANER_RUNNING.compareAndSet(false, true)) {
        return;
    }

    final Thread cleaner = new FastThreadLocalThread(CLEANER_TASK);
    cleaner.setPriority(Thread.MIN_PRIORITY);
    // Set to null to ensure we do not create classloader leaks by holding a strong
    // reference to the inherited classloader.
    // See:
    // - https://github.com/netty/netty/issues/7290
    // - https://bugs.openjdk.java.net/browse/JDK-7008595
    cleaner.setContextClassLoader(null);
    cleaner.setName(CLEANER_THREAD_NAME);
    // Not a daemon thread: it ends on its own once all references to the registered objects
    // are gone, and it is important that every cleanup task runs because these tasks may
    // free direct memory etc.
    cleaner.setDaemon(false);
    cleaner.start();
}
|
4.CLEANER_TASK 清除任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
// Body of the cleaner thread started by register(...): takes references from REFERENCE_QUEUE,
// runs their cleanup tasks, and exits once LIVE_SET is empty.
private static final Runnable CLEANER_TASK = new Runnable() {
@Override
public void run() {
// Remembers an interrupt consumed inside the loop so it can be restored before returning.
boolean interrupted = false;
for (;;) {
// Keep processing as long as LIVE_SET is not empty, then see whether this thread can finish.
while (!LIVE_SET.isEmpty()) {
final AutomaticCleanerReference reference;
try {
// Wait up to REFERENCE_QUEUE_POLL_TIMEOUT_MS for a collected reference
// (the article states the default is 10s; the constant is not shown in this excerpt).
reference = (AutomaticCleanerReference) REFERENCE_QUEUE.remove(REFERENCE_QUEUE_POLL_TIMEOUT_MS);
} catch (InterruptedException ex) {
// Just consume and move on
interrupted = true;
continue;
}
// null means the wait timed out without a reference becoming available.
if (reference != null) {
try {
// Run the cleanup task of this reference.
reference.cleanup();
} catch (Throwable ignored) {
// Failures of a cleanup task are swallowed; the loop keeps running.
}
LIVE_SET.remove(reference);
}
}
// Give up the running flag so that register(...) can start a new cleaner thread.
CLEANER_RUNNING.set(false);
// Exit when LIVE_SET is still empty, or when the flag could not be re-acquired
// (compareAndSet failed, i.e. the flag was set to true again in the meantime).
// Otherwise this thread holds the flag again and continues with the outer loop.
if (LIVE_SET.isEmpty() || !CLEANER_RUNNING.compareAndSet(false, true)) {
break;
}
}
if (interrupted) {
// Restore the interrupt status that was consumed above.
Thread.currentThread().interrupt();
}
}
};
|