PriorityBlockingQueue源码分析

简介

  PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序升序排序,当然我们也可以通过构造函数来指定Comparator来对元素进行排序。需要注意的是PriorityBlockingQueue不能保证同优先级元素的顺序。

一、PriorityBlockingQueue数据结构实现

  • MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8 为什么减8? 解决:https://stackoverflow.com/questions/35756277/why-the-maximum-array-size-of-arraylist-is-integer-max-value-8
  • 使用堆排序
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = 5595510919245408276L;

    /*
     * The implementation uses an array-based binary heap, with public
     * operations protected with a single lock. However, allocation
     * during resizing uses a simple spinlock (used only while not
     * holding main lock) in order to allow takes to operate
     * concurrently with allocation.  This avoids repeated
     * postponement of waiting consumers and consequent element
     * build-up. The need to back away from lock during allocation
     * makes it impossible to simply wrap delegated
     * java.util.PriorityQueue operations within a lock, as was done
     * in a previous version of this class. To maintain
     * interoperability, a plain PriorityQueue is still used during
     * serialization, which maintains compatibility at the expense of
     * transiently doubling overhead.
     */

    /**
     * Default array capacity.
     * 默认的数组容量。
     */
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    /**
     * The maximum size of array to allocate.
     * Some VMs reserve some header words in an array.
     * Attempts to allocate larger arrays may result in
     * OutOfMemoryError: Requested array size exceeds VM limit
     * 要分配的数组的最大大小。一些VM在阵列中保留一些标题字。尝试分配更大的数组可能会导致内存错误:请求的数组大小超过VM限制
     */
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /**
     * Priority queue represented as a balanced binary heap: the two
     * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
     * priority queue is ordered by comparator, or by the elements'
     * natural ordering, if comparator is null: For each node n in the
     * heap and each descendant d of n, n <= d.  The element with the
     * lowest value is in queue[0], assuming the queue is nonempty.
     * 优先队列表示为一个平衡的二进制堆
     */
    private transient Object[] queue;

    /**
     * The number of elements in the priority queue.
     * 优先队列中的元素个数
     */
    private transient int size;

    /**
     * The comparator, or null if priority queue uses elements'
     * natural ordering.
     * 如果优先队列使用元素的自然排序,则比较器或null。
     */
    private transient Comparator<? super E> comparator;

    /**
     * Lock used for all public operations
     * 重入锁
     */
    private final ReentrantLock lock;

    /**
     * Condition for blocking when empty
     * 当空时阻塞的条件
     */
    private final Condition notEmpty;

    /**
     * Spinlock for allocation, acquired via CAS.
     * 用于分配的自旋锁,通过CAS获得。
     */
    private transient volatile int allocationSpinLock;

    /**
     * A plain PriorityQueue used only for serialization,
     * to maintain compatibility with previous versions
     * of this class. Non-null only during serialization/deserialization.
     * 一个简单的优先队列只用于序列化,以保持与以前版本的这个类的兼容性。非空仅在序列化/反序列化期间。
     */
    private PriorityQueue q;

    /**
     * Creates a {@code PriorityBlockingQueue} with the default
     * initial capacity (11) that orders its elements according to
     * their {@linkplain Comparable natural ordering}.
     */
    public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }

    /**
     * Creates a {@code PriorityBlockingQueue} with the specified
     * initial capacity that orders its elements according to their
     * {@linkplain Comparable natural ordering}.
     *
     * @param initialCapacity the initial capacity for this priority queue
     * @throws IllegalArgumentException if {@code initialCapacity} is less
     *         than 1
     */
    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }

    /**
     * Creates a {@code PriorityBlockingQueue} with the specified initial
     * capacity that orders its elements according to the specified
     * comparator.
     *
     * @param initialCapacity the initial capacity for this priority queue
     * @param  comparator the comparator that will be used to order this
     *         priority queue.  If {@code null}, the {@linkplain Comparable
     *         natural ordering} of the elements will be used.
     * @throws IllegalArgumentException if {@code initialCapacity} is less
     *         than 1
     */
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }

    /**
     * Creates a {@code PriorityBlockingQueue} containing the elements
     * in the specified collection.  If the specified collection is a
     * {@link SortedSet} or a {@link PriorityQueue},  this
     * priority queue will be ordered according to the same ordering.
     * Otherwise, this priority queue will be ordered according to the
     * {@linkplain Comparable natural ordering} of its elements.
     *
     * @param  c the collection whose elements are to be placed
     *         into this priority queue
     * @throws ClassCastException if elements of the specified collection
     *         cannot be compared to one another according to the priority
     *         queue's ordering
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public PriorityBlockingQueue(Collection<? extends E> c) {
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        boolean heapify = true; // true if not known to be in heap order 如果不知道为堆顺序,则为真
        boolean screen = true;  // true if must screen for nulls  如果必须筛选为空
        if (c instanceof SortedSet<?>) {
            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
            this.comparator = (Comparator<? super E>) ss.comparator();
            heapify = false;
        }
        else if (c instanceof PriorityBlockingQueue<?>) {
            PriorityBlockingQueue<? extends E> pq =
                (PriorityBlockingQueue<? extends E>) c;
            this.comparator = (Comparator<? super E>) pq.comparator();
            screen = false;
            if (pq.getClass() == PriorityBlockingQueue.class) // exact match
                heapify = false;
        }
        Object[] a = c.toArray();
        int n = a.length;
        // If c.toArray incorrectly doesn't return Object[], copy it.
        if (a.getClass() != Object[].class)
            a = Arrays.copyOf(a, n, Object[].class);
        if (screen && (n == 1 || this.comparator != null)) {
            for (int i = 0; i < n; ++i)
                if (a[i] == null)
                    throw new NullPointerException();
        }
        this.queue = a;
        this.size = n;
        if (heapify)
            heapify();
    }
    /**
     * Establishes the heap invariant (described above) in the entire tree,
     * assuming nothing about the order of the elements prior to the call.
     * 在整棵树中建立堆不变(如上所述),在调用之前不考虑元素的顺序。
     */
    private void heapify() {
        Object[] array = queue;
        int n = size;
        int half = (n >>> 1) - 1;
        Comparator<? super E> cmp = comparator;
        if (cmp == null) {
            //构建小顶堆
            for (int i = half; i >= 0; i--) 
                siftDownComparable(i, (E) array[i], array, n); //建立堆
        }
        else {
            for (int i = half; i >= 0; i--)
                siftDownUsingComparator(i, (E) array[i], array, n, cmp); //建立堆
        }
    }    

 /**
     * Inserts item x at position k, maintaining heap invariant by
     * demoting x down the tree repeatedly until it is less than or
     * equal to its children or is a leaf.
     *
     * @param k the position to fill
     * @param x the item to insert
     * @param array the heap array
     * @param n heap size
     */
    private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
        if (n > 0) {
            Comparable<? super T> key = (Comparable<? super T>)x;
            int half = n >>> 1;           // loop while a non-leaf
            while (k < half) {
                int child = (k << 1) + 1; // assume left child is least 左孩子节点下标
                Object c = array[child];
                int right = child + 1;
                if (right < n &&
                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                    c = array[child = right];
                if (key.compareTo((T) c) <= 0) //如果父节点小于子节点,不用进行交换
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = key;
        }
    }

    private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
                                                    int n,
                                                    Comparator<? super T> cmp) {
        if (n > 0) {
            int half = n >>> 1;
            while (k < half) {
                int child = (k << 1) + 1;
                Object c = array[child];
                int right = child + 1;
                if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
                    c = array[child = right];
                if (cmp.compare(x, (T) c) <= 0)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = x;
        }
    }
}

二、入队

  • offer(E e)
  • offer(E e, long timeout, TimeUnit unit) 将指定元素插入该队列的尾部,如果队列已满,PriorityBlockingQueue不支持等待时间
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
 /** 
     * Inserts the specified element into this priority queue.
     * As the queue is unbounded, this method will never return {@code false}.
     * 将指定元素插入此优先队列。因为队列是无界的,所以这个方法永远不会返回{@ code false}。
     * @param e the element to add
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap); //扩容
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);  //往上构建堆结构 添加数据
            else
                siftUpUsingComparator(n, e, array, cmp); //往上构建堆结构
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

 /**
  * Tries to grow array to accommodate at least one more element
  * (but normally expand by about 50%), giving up (allowing retry)
  * on contention (which we expect to be rare). Call only while
  * holding lock.
  * 尝试增加数组以容纳至少一个元素(但通常会扩展约50%),放弃(允许重试)争用(我们期望很少)。只在持有锁时调用。
  * @param array the heap array
  * @param oldCap the length of the array
  */
 private void tryGrow(Object[] array, int oldCap) {
     lock.unlock(); // must release and then re-acquire main lock 必须释放,然后重新获得主锁
     Object[] newArray = null;
     if (allocationSpinLock == 0 &&
         UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                  0, 1)) { // 用CAS 扩容判断
         try {
             int newCap = oldCap + ((oldCap < 64) ?
                                    (oldCap + 2) : // grow faster if small
                                    (oldCap >> 1));  // 旧容量小于64,扩容2倍+2 大于64 扩容1.5
             if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow 可能的溢出
                 int minCap = oldCap + 1;
                 if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                     throw new OutOfMemoryError();
                 newCap = MAX_ARRAY_SIZE;
             }
             if (newCap > oldCap && queue == array)
                 newArray = new Object[newCap];
         } finally {
             allocationSpinLock = 0;
         }
     }
     if (newArray == null) // back off if another thread is allocating
         Thread.yield();
     lock.lock();
     if (newArray != null && queue == array) {
         queue = newArray;
         System.arraycopy(array, 0, newArray, 0, oldCap); //扩容数组
     }
 }
    /**
      * Inserts item x at position k, maintaining heap invariant by
      * promoting x up the tree until it is greater than or equal to
      * its parent, or is the root.
      *
      * To simplify and speed up coercions and comparisons. the
      * Comparable and Comparator versions are separated into different
      * methods that are otherwise identical. (Similarly for siftDown.)
      * These methods are static, with heap state as arguments, to
      * simplify use in light of possible comparator exceptions.
      *
      * @param k the position to fill
      * @param x the item to insert
      * @param array the heap array
      * @param n heap size
      */
     private static <T> void siftUpComparable(int k, T x, Object[] array) {
         Comparable<? super T> key = (Comparable<? super T>) x;
         while (k > 0) {
             int parent = (k - 1) >>> 1;
             Object e = array[parent];
             if (key.compareTo((T) e) >= 0) // 子节点大于父节点 不用做替换
                 break;
             array[k] = e;
             k = parent;
         }
         array[k] = key;
     }
 
     private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                        Comparator<? super T> cmp) {
         while (k > 0) {
             int parent = (k - 1) >>> 1;
             Object e = array[parent];
             if (cmp.compare(x, (T) e) >= 0)
                 break;
             array[k] = e;
             k = parent;
         }
         array[k] = x;
     }
    /**
     * Inserts the specified element into this priority queue.
     * As the queue is unbounded, this method will never block or
     * return {@code false}.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return {@code true} (as specified by
     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e); // never need to block
    }  

2. add方法

add(E e) 插入此优先队列,队列满了,PriorityBlockingQueue 不支持throw IllegalStateException异常

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
   /**
     * Inserts the specified element into this priority queue.
     * 将指定元素插入此优先队列。
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return offer(e);
    }

3. put方法

  • 将指定元素插入此优先队列。因为队列是无界的,所以这个方法永远不会阻塞。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
    /**
     * Inserts the specified element into this priority queue.
     * As the queue is unbounded, this method will never block.
     * 将指定元素插入此优先队列。因为队列是无界的,所以这个方法永远不会阻塞。
     * @param e the element to add
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public void put(E e) {
        offer(e); // never need to block
    }

二、出队

1. poll方法

  • poll() 获取并移除此队列的头,如果此队列为空,则返回 null
  • E poll(long timeout, TimeUnit unit) 获取并移除此队列的头部,队列为空时,在指定的等待时间前等待可用的元素。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
 public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    /**
     * Mechanics for poll().  Call only while holding lock.
     */
    private E dequeue() {
        int n = size - 1;
        if (n < 0)
            return null;
        else {
            Object[] array = queue;
            E result = (E) array[0];
            E x = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftDownComparable(0, x, array, n); //重新构建堆结构 把最后一位赋给第一位,重新重构堆结构
            else
                siftDownUsingComparator(0, x, array, n, cmp);//重新构建堆结构
            size = n;
            return result;
        }
    }
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null && nanos > 0)
                nanos = notEmpty.awaitNanos(nanos);
        } finally {
            lock.unlock();
        }
        return result;
    }    

2. take方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }

3. peek方法

peek方法是java.util.Queue接口方法。获取队列的头部,不删除队列头部元素。

1
2
3
4
5
6
7
8
9
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (size == 0) ? null : (E) queue[0];
        } finally {
            lock.unlock();
        }
    }

三、移除队列

1. remove方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.  Returns {@code true} if and only if this queue contained
     * the specified element (or equivalently, if this queue changed as a
     * result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = indexOf(o);
            if (i == -1)
                return false;
            removeAt(i);
            return true;
        } finally {
            lock.unlock();
        }
    }
    private int indexOf(Object o) {
        if (o != null) {
            Object[] array = queue;
            int n = size;
            for (int i = 0; i < n; i++)
                if (o.equals(array[i]))
                    return i;
        }
        return -1;
    }    
  /**
     * Removes the ith element from queue.
     * 从队列中移除第i个元素。
     */
    private void removeAt(int i) {
        Object[] array = queue;
        int n = size - 1;
        if (n == i) // removed last element 移除最后一个元素
            array[i] = null;
        else {
            E moved = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftDownComparable(i, moved, array, n); // 往下重构堆结构
            else
                siftDownUsingComparator(i, moved, array, n, cmp);
            if (array[i] == moved) {  // 相等没有往下没有重构堆结构 往上重新重构堆结构
                if (cmp == null)
                    siftUpComparable(i, moved, array);  
                else
                    siftUpUsingComparator(i, moved, array, cmp);
            }
        }
        size = n;
    }    

2. clear方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] array = queue;
            int n = size;
            size = 0;
            for (int i = 0; i < n; i++)
                array[i] = null;
        } finally {
            lock.unlock();
        }
    }

3. drainTo方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
   /**
      * @throws UnsupportedOperationException {@inheritDoc}
      * @throws ClassCastException            {@inheritDoc}
      * @throws NullPointerException          {@inheritDoc}
      * @throws IllegalArgumentException      {@inheritDoc}
      */
     public int drainTo(Collection<? super E> c) {
         return drainTo(c, Integer.MAX_VALUE);
     }
 
     /**
      * @throws UnsupportedOperationException {@inheritDoc}
      * @throws ClassCastException            {@inheritDoc}
      * @throws NullPointerException          {@inheritDoc}
      * @throws IllegalArgumentException      {@inheritDoc}
      */
     public int drainTo(Collection<? super E> c, int maxElements) {
         if (c == null)
             throw new NullPointerException();
         if (c == this)
             throw new IllegalArgumentException();
         if (maxElements <= 0)
             return 0;
         final ReentrantLock lock = this.lock;
         lock.lock();
         try {
             int n = Math.min(size, maxElements);
             for (int i = 0; i < n; i++) {
                 c.add((E) queue[0]); // In this order, in case add() throws.
                 dequeue();          //循环执行poll操作
             }
             return n;
         } finally {
             lock.unlock();
         }
     }

五、Iterator 迭代器实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
  /**
     * Snapshot iterator that works off copy of underlying q array.
     */
    final class Itr implements Iterator<E> {
        final Object[] array; // Array of all elements 数组的所有元素
        int cursor;           // index of next element to return 下一个元素的索引返回
        int lastRet;          // index of last element, or -1 if no such 最后一个元素的索引,或者- 1,如果没有

        Itr(Object[] array) {
            lastRet = -1;
            this.array = array;
        }

        public boolean hasNext() {
            return cursor < array.length;
        }

        public E next() {
            if (cursor >= array.length)
                throw new NoSuchElementException();
            lastRet = cursor;
            return (E)array[cursor++];
        }

        public void remove() {
            if (lastRet < 0)
                throw new IllegalStateException();
            removeEQ(array[lastRet]);//移除其中值
            lastRet = -1;
        }
    }
     /**
      * Identity-based version for use in Itr.remove
      */
     void removeEQ(Object o) {
         final ReentrantLock lock = this.lock;
         lock.lock();
         try {
             Object[] array = queue;
             for (int i = 0, n = size; i < n; i++) {
                 if (o == array[i]) {
                     removeAt(i);
                     break;
                 }
             }
         } finally {
             lock.unlock();
         }
     }