ConcurrentSkipListMap源码分析

简介

  • ConcurrentSkipListMap是一种线程安全的有序的Map;
  • ConcurrentSkipListMap内部的数据结构是SkipList(跳表)。

一、ConcurrentSkipListMap 数据结构实现

ConcurrentSkipListMap实现ConcurrentNavigableMap接口,ConcurrentNavigableMap依赖关系图

  ConcurrentSkipListMap结构如图下面

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
        * Head nodes          Index nodes
        * +-+    right        +-+                      +-+
        * |2|---------------->| |--------------------->| |->null
        * +-+                 +-+                      +-+
        *  | down              |                        |
        *  v                   v                        v
        * +-+            +-+  +-+       +-+            +-+       +-+
        * |1|----------->| |->| |------>| |----------->| |------>| |->null
        * +-+            +-+  +-+       +-+            +-+       +-+
        *  v              |    |         |              |         |
        * Nodes  next     v    v         v              v         v
        * +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+
        * | |->|A|->|B|->|C|->|D|->|E|->|F|->|G|->|H|->|I|->|J|->|K|->null
        * +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+
        *
  • 三个节点
    • Head nodes 头节点
    • Index nodes 索引节点
    • 真实数据节点Node

1.Node节点

a.数据结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
       static final class Node<K,V> {
           final K key;
           volatile Object value;
           volatile Node<K,V> next;
   
           /**
            * Creates a new regular node.
            */
           Node(K key, Object value, Node<K,V> next) {
               this.key = key;
               this.value = value;
               this.next = next;
           }
   
          /**
            * 
            * 创建一个标记节点。用于删除操作
            */
           Node(Node<K,V> next) {
               this.key = null;
               this.value = this;
               this.next = next;
           }
   
           /**
            *
            * 判断节点是否为标记节点。
            */
           boolean isMarker() {
               return value == this;
           }
   
           /**
            * 判断节点是否是base-level链表的头节点。
            */
           boolean isBaseHeader() {
               return value == BASE_HEADER;
           }

a.helpDelete 先建立软删除(next添加删除标记节点),后面在删除

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
     void helpDelete(Node<K,V> b, Node<K,V> f) {
              if (f == next && this == b.next) {
                  if (f == null || f.value != f)   //添加删除标记节点
                      //casNext(f, new Node<K,V>(f)); 
                      appendMarker(f);
                  else
                      b.casNext(this, f.next);
              }
          }
      } 

(2).Index node(索引节点)

a.Index数据结构

  • 添加down(下)和right(右)指向索引;
  • down索引创建不能更改;
  • 索引节点删除,node赋值null
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
       static class Index<K,V> {
           final Node<K,V> node;
           final Index<K,V> down;
           volatile Index<K,V> right;
   
           /**
            * Creates index node with given values.
            */
           Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
               this.node = node;
               this.down = down;
               this.right = right;
           }

          //删除节点
          final boolean indexesDeletedNode() {
               return node.value == null;
           }

a.link或者unlink,CAS控制right

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
        final boolean link(Index<K,V> succ, Index<K,V> newSucc) {
              Node<K,V> n = node;
              newSucc.right = succ;
              return n.value != null && casRight(succ, newSucc);
          }
  
          final boolean unlink(Index<K,V> succ) {
              return !indexesDeletedNode() && casRight(succ, succ.right);
          } 

(3).HeadIndex(头索引节点)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
    /**
        * Nodes heading each level keep track of their level.
        * 头节点,每个头节点都包含一个表示层级的域。
        */
       static final class HeadIndex<K,V> extends Index<K,V> {
           final int level;
           HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
               super(node, down, right);
               this.level = level;
           }    

       }

(4).ConcurrentSkipListMap内部结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class ConcurrentSkipListMap<K,V> ...{
    
   
       /**
        * 为randomLevel中使用的便宜实例随机数生成器生成初始随机种子。
        */
       private static final Random seedGenerator = new Random();
   
       /**
        * 用来定义base-level的头结点。
        */
       private static final Object BASE_HEADER = new Object();
   
       /**
        * The topmost head index of the skiplist.
        * 
        */
       private transient volatile HeadIndex<K,V> head;
   
       /**
        * 比较器
        * @serial
        */
       private final Comparator<? super K> comparator;
   
       /**
        * 随机种子,这里没有用volatile修饰,多个线程看到不同的值也没关系。
        */
       private transient int randomSeed;
  
   
    public ConcurrentSkipListMap() {
        this.comparator = null;
        initialize();
    }

      /**
       * 初始化或重置内部状态。
       */
      final void initialize() {
          keySet = null;
          entrySet = null;
          values = null;
          descendingMap = null;
          //生成随机种子,这个种子用来生成随机的Level。
          randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero 确保非零 
          //生成头节点,该节点value是BASE_HEADER,level是1。
          head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null), null, null, 1);
      }
  
  }

二、put 添加元素

  • value 不能为空
  • Key必须实现Comparable
  • 设置comparator
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
      public V put(K key, V value) {
          if (value == null)
              throw new NullPointerException();
          return doPut(key, value, false);
      }

      private V doPut(K kkey, V value, boolean onlyIfAbsent) {
          Comparable<? super K> key = comparable(kkey);
          for (;;) {
              Node<K,V> b = findPredecessor(key); //查询key 节点前node
              Node<K,V> n = b.next;
              for (;;) {
                  if (n != null) {
                      Node<K,V> f = n.next;
                      if (n != b.next)               // inconsistent read 不一致,发生竞争重试
                          break;
                      Object v = n.value;
                      if (v == null) {               // n is deleted 节点n已经被删除了
                          n.helpDelete(b, f);   
                          break;
                      }
                      if (v == n || b.value == null) // b is deleted  节点b已经被删除了
                          break;
                      int c = key.compareTo(n.key);
                      if (c > 0) {
                          b = n;
                          n = f;
                          continue;
                      }
                      if (c == 0) {
                          if (onlyIfAbsent || n.casValue(v, value))
                              return (V)v;
                          else
                              break; // restart if lost race to replace value
                      }
                      // else c < 0; fall through
                  }
  
                  Node<K,V> z = new Node<K,V>(kkey, value, n); //创建一个节点
                  if (!b.casNext(n, z)) 
                      break;         // restart if lost race to append to b
                  int level = randomLevel(); //随机生成层级 层级最大31
                  if (level > 0)
                      insertIndex(z, level); //添加索引层次
                  return null;
              }
          }
      }

1.findPredecessor 按key查询前节点值,索引结束该节点值

  • 先从head节点查询开始;遍历到最底层节点;
    • 大于key往right遍历
    • 小于等于往down遍历
  • 处理索引节点为空情况,处理unlink
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
     private Node<K,V> findPredecessor(Comparable<? super K> key) {
         if (key == null)
             throw new NullPointerException(); // don't postpone errors
         for (;;) {
             Index<K,V> q = head;
             Index<K,V> r = q.right;
             for (;;) {
                 if (r != null) {
                     Node<K,V> n = r.node;
                     K k = n.key;
                     if (n.value == null) {//节点为空,删除改索引节点。
                         if (!q.unlink(r))
                             break;           // restart
                         r = q.right;         // reread r
                         continue;
                     }
                     if (key.compareTo(k) > 0) {
                         q = r;
                         r = r.right;
                         continue;
                     }
                 }
                 Index<K,V> d = q.down;
                 if (d != null) {
                     q = d;
                     r = d.right;
                 } else
                     return q.node;
             }
         }
     }     

2.randomLevel 随机获取层次

  • 利用Xorshift随机算法
  • 根据随机值,左移一位&1计算层次
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
        private int randomLevel() {
            int x = randomSeed;
            x ^= x << 13;
            x ^= x >>> 17;
            randomSeed = x ^= x << 5;  // 运用 Xorshift随机算法, https://en.wikipedia.org/wiki/Xorshift
            if ((x & 0x80000001) != 0) // test highest and lowest bits  最高(31)和最低位
                return 0;
            int level = 1;
            while (((x >>>= 1) & 1) != 0) ++level;
            return level;
        }

2.insertIndex 插入索引节点

  • 分两种处理
  • 当前创建层次小于等于现在层次高度
  • 当前创建层次大于现在层次高度
    • CAS的head头节点
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
    private void insertIndex(Node<K,V> z, int level) {
        HeadIndex<K,V> h = head;
        int max = h.level;

        if (level <= max) {
            Index<K,V> idx = null;
            for (int i = 1; i <= level; ++i) // 建立该Node层级关系
                idx = new Index<K,V>(z, idx, null);
            addIndex(idx, h, level);

        } else { // Add a new level
            level = max + 1;
            Index<K,V>[] idxs = (Index<K,V>[])new Index[level+1];
            Index<K,V> idx = null;
            for (int i = 1; i <= level; ++i)
                idxs[i] = idx = new Index<K,V>(z, idx, null);  // 建立该Node层级关系

            HeadIndex<K,V> oldh; //头节点
            int k;
            for (;;) {
                oldh = head;
                int oldLevel = oldh.level;
                if (level <= oldLevel) { // lost race to add level  竞争条件
                    k = level;
                    break;
                }
                HeadIndex<K,V> newh = oldh;
                Node<K,V> oldbase = oldh.node;
                for (int j = oldLevel+1; j <= level; ++j)
                    newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j); // 建立头节点层级关系
                if (casHead(oldh, newh)) { // CAS 更新头结点
                    k = oldLevel;
                    break;
                }
            }
            addIndex(idxs[k], oldh, k);
        }
    }

3.addIndex 建立索引关系

  • 查找类似findPredecessor
  • 当查找到建立跟索引层次相同,进行link链接操作
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
    private void addIndex(Index<K,V> idx, HeadIndex<K,V> h, int indexLevel) {
        // Track next level to insert in case of retries 跟踪下一个级别,以便在重试时插入
        int insertionLevel = indexLevel;
        Comparable<? super K> key = comparable(idx.node.key);
        if (key == null) throw new NullPointerException();

        // 类似于findPredecessor,但添加索引节点
        // path to key.
        for (;;) {
            int j = h.level;
            Index<K,V> q = h;
            Index<K,V> r = q.right;
            Index<K,V> t = idx;
            for (;;) {
                if (r != null) {
                    Node<K,V> n = r.node;
                    int c = key.compareTo(n.key);
                    //索引删除操作
                    if (n.value == null) {
                        if (!q.unlink(r))
                            break;
                        r = q.right;
                        continue;
                    }
                    if (c > 0) { // 判断key 大于right key 值 
                        q = r;
                        r = r.right;
                        continue;
                    }
                }

                if (j == insertionLevel) {
                    // 如果节点已经被删除,不要插入索引
                    if (t.indexesDeletedNode()) {
                        findNode(key); // cleans up
                        return;
                    }
                    if (!q.link(r, t))
                        break; // restart
                    if (--insertionLevel == 0) {
                        // 在返回之前需要最终的删除检查
                        if (t.indexesDeletedNode())
                            findNode(key);
                        return;
                    }
                }

                if (--j >= insertionLevel && j < indexLevel)
                    t = t.down;
                q = q.down;
                r = q.right;
            }
        }
    }

4.findNode 查找节点

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
    private Node<K,V> findNode(Comparable<? super K> key) {
        for (;;) {
            Node<K,V> b = findPredecessor(key);
            Node<K,V> n = b.next;
            for (;;) {
                if (n == null)
                    return null;
                Node<K,V> f = n.next;
                if (n != b.next)                // inconsistent read
                    break;
                Object v = n.value;
                if (v == null) {                // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (v == n || b.value == null)  // b is deleted
                    break;
                int c = key.compareTo(n.key);
                if (c == 0)
                    return n;
                if (c < 0)
                    return null;
                b = n;
                n = f;
            }
        }
    }    

二、get 获取元素

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
    public V get(Object key) {
        return doGet(key);
    }
    /**
     * Gets value for key using findNode.
     * @param okey the key
     * @return the value, or null if absent
     */
    private V doGet(Object okey) {
        Comparable<? super K> key = comparable(okey);
        /*
         * 在这里和其他地方需要循环的情况下值字段变空,就像它即将返回一样,在这种情况下,
         * 我们失去了一个删除的竞争,所以必须重试。
         */
        for (;;) {
            Node<K,V> n = findNode(key);
            if (n == null)
                return null;
            Object v = n.value;
            if (v != null)
                return (V)v;
        }
    } 

三、remove 删除元素

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
    public V remove(Object key) {
        return doRemove(key, null);
    }

    final V doRemove(Object okey, Object value) {
        Comparable<? super K> key = comparable(okey);
        for (;;) {
            Node<K,V> b = findPredecessor(key);
            Node<K,V> n = b.next;
            for (;;) {
                if (n == null)
                    return null;
                Node<K,V> f = n.next;
                if (n != b.next)                    // inconsistent read
                    break;
                Object v = n.value;
                if (v == null) {                    // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (v == n || b.value == null)      // b is deleted
                    break;
                int c = key.compareTo(n.key);
                if (c < 0)
                    return null;
                if (c > 0) {
                    b = n;
                    n = f;
                    continue;
                }
                if (value != null && !value.equals(v))
                    return null;
                if (!n.casValue(v, null))                   // cas value设置为空
                    break;
                if (!n.appendMarker(f) || !b.casNext(n, f)) // 标志删除节点,
                    findNode(key);                  // Retry via findNode
                else {
                    findPredecessor(key);           // Clean index 清楚节点
                    if (head.right == null)
                        tryReduceLevel();
                }
                return (V)v;
            }
        }
    }

a.tryReduceLevel 尝试降低高度head

如果没有节点,可能会降低head层级。这种方法(很少)会出错,在这种情况下,即使它们将要包含索引节点,层级也会消失。这会影响性能,而不是正确性。 为了最大限度地减少错误并减少滞后现象,只有最顶层的三个层次看起来是空的时,层次才会减少一个。另外,如果在CAS之后删除的级别看起来不是空的,我们会尝试在任何人注意到我们的错误之前快速更改它。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
       private void tryReduceLevel() {
           HeadIndex<K,V> h = head;
           HeadIndex<K,V> d;
           HeadIndex<K,V> e;
           if (h.level > 3 &&
               (d = (HeadIndex<K,V>)h.down) != null &&
               (e = (HeadIndex<K,V>)d.down) != null &&
               e.right == null &&
               d.right == null &&
               h.right == null &&
               casHead(h, d) && // try to set
               h.right != null) // recheck 再检查
               casHead(d, h);   // 尝试回滚操作
       }    

四,size

  • 链表遍历,时间复杂度O(n)
1
2
3
4
5
6
   public int size() {
       long c;
       return ((baseHead() == null) ? 0 :
               ((c = getAdderCount()) >= Integer.MAX_VALUE) ?
               Integer.MAX_VALUE : (int) c);
   } 

五、总结

  • 为什么删除,先插入标记节点?
    • 以避免与并发插入发生冲突;last节点为例子,last删除节点,cas是last的上个节点,问题来, 先并发插入last后面,一个对last的上个节点操作,一个对last节点操作;这样问题导致数据丢失;