一、介绍

Netty为了提高性能,Java的ThreadLocal不用,自己创建FastThreadLocal,屌爆ThreadLocal; ThreadLocal的内存泄露的风险。

当从FastThreadLocalThread访问时,具有更高的访问性能。内部结构用数组保存,FastThreadLocal在数组中使用常量索引来查找变量, 而不是使用哈希码和哈希表。尽管看似非常微妙,但与使用哈希表相比,它在性能上却有一点优势,并且在经常访问时很有用。

二、FastThreadLocal

1.数据结构

1
2
3
4
5
   private final int index;

   public FastThreadLocal() {
       index = InternalThreadLocalMap.nextVariableIndex();
   } 
  • 常量index
  • InternalThreadLocalMap.nextVariableIndex()赋值给index

a.InternalThreadLocalMap.nextVariableIndex()

  • nextIndex是静态常量AtomicInteger
  • 获取index是自动增长,原子操作;多线程访问存在竞争问题,CAS控制;
1
2
3
4
5
6
7
8
   public static int nextVariableIndex() {
       int index = nextIndex.getAndIncrement();
       if (index < 0) {
           nextIndex.decrementAndGet();
           throw new IllegalStateException(...);
       }
       return index;
   } 

2.get 返回当前线程的当前值

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
   public final V get() {
       InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
       Object v = threadLocalMap.indexedVariable(index);
       if (v != InternalThreadLocalMap.UNSET) {
           return (V) v;
       }

       V value = initialize(threadLocalMap);
       registerCleaner(threadLocalMap);
       return value;
   }

a.InternalThreadLocalMap.get() 当线程中获取InternalThreadLocalMap

  • fastGet比slowGet快,前提条件创建FastThreadLocalThread线程;
1
2
3
4
5
6
7
8
   public static InternalThreadLocalMap get() {
       Thread thread = Thread.currentThread();
       if (thread instanceof FastThreadLocalThread) {
           return fastGet((FastThreadLocalThread) thread);
       } else {
           return slowGet();
       }
   } 

b.InternalThreadLocalMap.fastGet()

FastThreadLocalThread变量threadLocalMap查找InternalThreadLocalMap,为空创建InternalThreadLocalMap

1
2
3
4
5
6
7
  private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
        InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
        if (threadLocalMap == null) {
            thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
        }
        return threadLocalMap;
    }

b.InternalThreadLocalMap.slowGet()

从JAVA的ThreadLocal获取InternalThreadLocalMap,不存在创建InternalThreadLocalMap

1
2
3
4
5
6
7
8
9
    private static InternalThreadLocalMap slowGet() {
        ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
        InternalThreadLocalMap ret = slowThreadLocalMap.get();
        if (ret == null) {
            ret = new InternalThreadLocalMap();
            slowThreadLocalMap.set(ret);
        }
        return ret;
    }

c.InternalThreadLocalMap.indexedVariable获取当前线程的当前值;

  • 数组保存变量值
  • 数组下标访问,速度很快
1
2
3
4
   public Object indexedVariable(int index) {
       Object[] lookup = indexedVariables;
       return index < lookup.length? lookup[index] : UNSET;
   } 

3.initialize 未获取到值,进行初始化操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
   private V initialize(InternalThreadLocalMap threadLocalMap) {
       V v = null;
       try {
           //空方法,具体创建对象时,自行重写方法;
           v = initialValue();
       } catch (Exception e) {
           PlatformDependent.throwException(e);
       }

       threadLocalMap.setIndexedVariable(index, v);
       addToVariablesToRemove(threadLocalMap, this);
       return v;
   } 

a.InternalThreadLocalMap.setIndexedVariable 设置本地线程变量

  • 往数组中指定下标赋值本地变量
  • 下标索引大于数组大小,继续扩容
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
   public boolean setIndexedVariable(int index, Object value) {
       Object[] lookup = indexedVariables;
       if (index < lookup.length) {
           Object oldValue = lookup[index];
           lookup[index] = value;
           return oldValue == UNSET;
       } else {
           expandIndexedVariableTableAndSet(index, value);
           return true;
       }
   }

b.InternalThreadLocalMap.expandIndexedVariableTableAndSet 进行扩容

  • 进行扩容,2次方值大于下标索引最近值;
  • 在进行下标赋值
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
    private void expandIndexedVariableTableAndSet(int index, Object value) {
        Object[] oldArray = indexedVariables;
        final int oldCapacity = oldArray.length;
        int newCapacity = index;
        newCapacity |= newCapacity >>>  1;
        newCapacity |= newCapacity >>>  2;
        newCapacity |= newCapacity >>>  4;
        newCapacity |= newCapacity >>>  8;
        newCapacity |= newCapacity >>> 16;
        newCapacity ++;

        Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
        Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
        newArray[index] = value;
        indexedVariables = newArray;
    }

4.addToVariablesToRemove 为本地线程添加删除索引的变量

  • variablesToRemoveIndex默认为零,静态常量变量;类创建第一次就调用InternalThreadLocalMap.nextVariableIndex;
  • 删除索引的变量的值是Set
  • 作用后面说
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
    private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
    
    private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
        Set<FastThreadLocal<?>> variablesToRemove;
        if (v == InternalThreadLocalMap.UNSET || v == null) {
            variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
            threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
        } else {
            variablesToRemove = (Set<FastThreadLocal<?>>) v;
        }
        variablesToRemove.add(variable);
    }

5.registerCleaner 对象清理注册ObjectCleaner.register

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
   private void registerCleaner(final InternalThreadLocalMap threadLocalMap) {
       Thread current = Thread.currentThread();
       if (!FastThreadLocalThread.willCleanupFastThreadLocals(current)) {
           //我们将需要确保我们将触发remove(InternalThreadLocalMap),
           // 以便将所有内容释放并调用FastThreadLocal.onRemoval(...)。
           ObjectCleaner.register(current, new Runnable() {
               @Override
               public void run() {
                   remove(threadLocalMap);
                   //最好不要在此处调用InternalThreadLocalMap.remove(),
                   //因为只有在GC收集了线程后才会触发此操作。在这种情况下,ThreadLocal已经消失了。
               }
           });
       }
   } 

6.remove GC清理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
   public final void remove(InternalThreadLocalMap threadLocalMap) {
       if (threadLocalMap == null) {
           return;
       }
       //把当前下标设置成UNSET 
       Object v = threadLocalMap.removeIndexedVariable(index);
       //variablesToRemoveIndex下标值中Set移除当前下标索引
       removeFromVariablesToRemove(threadLocalMap, this);

       if (v != InternalThreadLocalMap.UNSET) {
           try {
               //空方法,自行实现处理相应逻辑
               onRemoval((V) v);
           } catch (Exception e) {
               PlatformDependent.throwException(e);
           }
       }
   } 

7.set 设置变量值

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
   public final void set(V value) {
       if (value != InternalThreadLocalMap.UNSET) {
           InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
           if (setKnownNotUnset(threadLocalMap, value)) {
               registerCleaner(threadLocalMap);
           }
       } else {
           remove();
       }
   } 

   private boolean setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
       if (threadLocalMap.setIndexedVariable(index, value)) {
           addToVariablesToRemove(threadLocalMap, this);
           return true;
       }
       return false;
   }
  • 具体方法上面都说

8.removeAll

  • 移除本地线程中所有FastThreadLocal变量值
  • variablesToRemoveIndex下标保存所有FastThreadLocal,可以快速触发remove
  • 为什么variablesToRemoveIndex下标保存所有FastThreadLocal索引呢?
    • 不用遍历数组InternalThreadLocalMap.indexedVariables,数组很大,反而影响性能
  • FastThreadLocalRunnable执行run的finally执行FastThreadLocal.removeAll();
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
   public static void removeAll() {
       InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
       if (threadLocalMap == null) {
           return;
       }

       try {
           Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
           if (v != null && v != InternalThreadLocalMap.UNSET) {
               @SuppressWarnings("unchecked")
               Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
               FastThreadLocal<?>[] variablesToRemoveArray =
                       variablesToRemove.toArray(new FastThreadLocal[variablesToRemove.size()]);
               for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
                   tlv.remove(threadLocalMap);
               }
           }
       } finally {
           InternalThreadLocalMap.remove();
       }
   } 

a.InternalThreadLocalMap#remove

1
2
3
4
5
6
7
8
   public static void remove() {
       Thread thread = Thread.currentThread();
       if (thread instanceof FastThreadLocalThread) {
           ((FastThreadLocalThread) thread).setThreadLocalMap(null);
       } else {
           slowThreadLocalMap.remove();
       }
   } 

9.InternalThreadLocalMap

1.内容数据结构,列出几个重要

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
class UnpaddedInternalThreadLocalMap {

   static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
   static final AtomicInteger nextIndex = new AtomicInteger();

   /** Used by {@link FastThreadLocal} */
   Object[] indexedVariables;

   // String-related thread-locals
   StringBuilder stringBuilder;
} 

2.有意思构造方法

  • 默认创建32位数组
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
   // 缓存行填充(必须是公共的)在启用CompressedOops的情况下,此类的实例至少应占用128个字节。
   public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;

   private InternalThreadLocalMap() {
       // this.indexedVariables = indexedVariables;
       super(newIndexedVariableTable());
   }

   private static Object[] newIndexedVariableTable() {
       Object[] array = new Object[32];
       Arrays.fill(array, UNSET);
       return array;
   } 

3.indexedVariables数组很大问题?

  • nextIndex是静态常量的原子int,不停创建FastThreadLocalThread线程,以及使用FastThreadLocal, 导致nextIndex,最后创建indexedVariables数组很大?浪费空间?

  • bug https://github.com/netty/netty/issues/9328

  • 设计时考虑一下

4.本地缓存的字符串缓冲区,不用频繁创建StringBuilder,提高性能

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
  public StringBuilder stringBuilder() {
       StringBuilder sb = stringBuilder;
       if (sb == null) {
           return stringBuilder = new StringBuilder(STRING_BUILDER_INITIAL_SIZE);
       }
       if (sb.capacity() > STRING_BUILDER_MAX_SIZE) {
           sb.setLength(STRING_BUILDER_INITIAL_SIZE);
           sb.trimToSize();
       }
       sb.setLength(0);
       return sb;
   } 

10.ObjectCleaner 对象回收

原理:注册对象创建弱引用的类,等对象没有其他引用,等GC回收,放入ReferenceQueue队列中,启动一个线程读取ReferenceQueue的数据进行cleanup操作;

1.AutomaticCleanerReference是弱引用的类,继承AutomaticCleanerReference

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
   private static final class AutomaticCleanerReference extends WeakReference<Object> {
       private final Runnable cleanupTask;

       AutomaticCleanerReference(Object referent, Runnable cleanupTask) {
           super(referent, REFERENCE_QUEUE);
           this.cleanupTask = cleanupTask;
       }
       //清理方法
       void cleanup() {
           cleanupTask.run();
       }

       @Override
       public Thread get() {
           return null;
       }

       @Override
       public void clear() {
           LIVE_SET.remove(this);
           super.clear();
       }
   }

2.类变量

1
2
    private static final Set<AutomaticCleanerReference> LIVE_SET = new ConcurrentSet<AutomaticCleanerReference>();
    private static final ReferenceQueue<Object> REFERENCE_QUEUE = new ReferenceQueue<Object>();

3.register 注册对象回收机制

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
 public static void register(Object object, Runnable cleanupTask) {
     //创建弱引用对象
      AutomaticCleanerReference reference = new AutomaticCleanerReference(object,
              ObjectUtil.checkNotNull(cleanupTask, "cleanupTask"));
      //缓存
      LIVE_SET.add(reference);

      // 检查是否已运行清洁线程。
      if (CLEANER_RUNNING.compareAndSet(false, true)) {
          Thread cleanupThread = new FastThreadLocalThread(CLEANER_TASK);
          cleanupThread.setPriority(Thread.MIN_PRIORITY);
          // Set to null to ensure we not create classloader leaks by holding a strong reference to the inherited
          // classloader.
          // 设置为null可以确保我们不对继承的类加载器进行强引用,从而避免创建类加载器泄漏。
          // See:
          // - https://github.com/netty/netty/issues/7290
          // - https://bugs.openjdk.java.net/browse/JDK-7008595
          cleanupThread.setContextClassLoader(null);
          cleanupThread.setName(CLEANER_THREAD_NAME);

          // 该线程不是守护进程,因为一旦所有对已注册对象的引用都将消失,它将死掉,
          // 并且始终调用所有清除任务非常重要,因为这些清除任务可能会释放直接内存等。
          cleanupThread.setDaemon(false);
          cleanupThread.start();
      }
  }  

4.CLEANER_TASK 清除任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
   private static final Runnable CLEANER_TASK = new Runnable() {
       @Override
       public void run() {
           boolean interrupted = false;
           for (;;) {
               // 只要LIVE_SET不为空,就继续进行处理,看看是否可以让此线程完成。
               while (!LIVE_SET.isEmpty()) {
                   final AutomaticCleanerReference reference;
                   try {
                       //获取回收对象,等待时间,默认10s
                       reference = (AutomaticCleanerReference) REFERENCE_QUEUE.remove(REFERENCE_QUEUE_POLL_TIMEOUT_MS);
                   } catch (InterruptedException ex) {
                       // Just consume and move on
                       interrupted = true;
                       continue;
                   }
                   if (reference != null) {
                       try {
                           //清除操作 
                           reference.cleanup();
                       } catch (Throwable ignored) {
                       }
                       LIVE_SET.remove(reference);
                   }
               }
               CLEANER_RUNNING.set(false);

               // LIVE_SET为空线程退出
               if (LIVE_SET.isEmpty() || !CLEANER_RUNNING.compareAndSet(false, true)) {
                   break;
               }
           }
           if (interrupted) {
               Thread.currentThread().interrupt();
           }
       }
   };