DelayQueue(延时队列)源码分析

简介

  DelayQueue是一种无界的阻塞队列,队列里只允许放入可以"延期"的元素,队列中列头的元素是最先"到期"的元素。 如果队列中没有任何元素"到期”,尽管队列中有元素,也不能从队列头获取到任何元素。

一、DelayQueue 内部实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    //重入锁
    private transient final ReentrantLock lock = new ReentrantLock();
    //优先级队列
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Thread designated to wait for the element at the head of
     * the queue.  This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting.  When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely.  The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim.  Whenever the head of the queue is replaced with
     * an element with an earlier expiration time, the leader
     * field is invalidated by being reset to null, and some
     * waiting thread, but not necessarily the current leader, is
     * signalled.  So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     * 指定在队列头部等待元素的线程。Leader-Follower模式的这种变体用于最小化不必要的定时等待.
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/)
     * 当一个线程成为领导者时,它只等待下一个延迟,但其他线程则无限期地等待。
     * 领导者线程必须在从take()或poll(...)返回之前发出其他线程的信号,
     * 除非其他线程在过渡期间成为领导者。每当队列的头部被一个具有较早到期时间的元素替换时,
     * 领导者字段被重置为空而失效,并且一些等待线程(但不一定是当前领导者)被发信号通知。
     * 所以等待的线程必须准备好在获得和失去领导的同时等待。
     */
    
    private Thread leader = null;

    /**
     * Condition signalled when a newer element becomes available
     * at the head of the queue or a new thread may need to
     * become leader.
     */
    //当队列前端有更新的元素或新线程可能需要成为leader时,条件就会发出信号。
    private final Condition available = lock.newCondition();

    /**
     * Creates a new <tt>DelayQueue</tt> that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a <tt>DelayQueue</tt> initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }
 }
  • DelayQueue 泛型实现Delayed接口
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    
    public interface Delayed extends Comparable<Delayed> {
        
        /**
         * Returns the remaining delay associated with this object, in the
         * given time unit.
         * 在给定的时间单元中返回与该对象关联的剩余延迟。
         * @param unit the time unit
         * @return the remaining delay; zero or negative values indicate
         * that the delay has already elapsed
         */
        long getDelay(TimeUnit unit);
    }
    
    

    DelayQueue中元素都要继承Delayed接口

二、入队

  • offer(E e)
  • offer(E e, long timeout, TimeUnit unit) 将指定元素插入该队列的尾部,如果队列已满,DelayQueue不支持等待时间,主要数据元素保存到优先队列中
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
    /**
     * Inserts the specified element into this delay queue.
     * 将指定的元素插入到这个延迟队列中。
     * @param e the element to add
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

 /**
  * Inserts the specified element into this delay queue. As the queue is
  * unbounded this method will never block.
  * 将指定的元素插入到这个延迟队列中。由于队列是无限的,此方法将永不阻塞。
  * @param e the element to add
  * @param timeout This parameter is ignored as the method never blocks
  * @param unit This parameter is ignored as the method never blocks
  * @return <tt>true</tt>
  * @throws NullPointerException {@inheritDoc}
  */
 public boolean offer(E e, long timeout, TimeUnit unit) {
     return offer(e);
 }    

2. add方法

add(E e) 插入此优先队列,队列满了,DelayQueue 不支持throw IllegalStateException异常

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
    /**
     * Inserts the specified element into this delay queue.
     * 将指定的元素插入到这个延迟队列中。
     * @param e the element to add
     * @return <tt>true</tt> (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return offer(e);
    }

3. put方法

  • 将指定的元素插入到这个延迟队列中。由于队列是无限的,此方法将永不阻塞。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     * 将指定的元素插入到这个延迟队列中。由于队列是无限的,此方法将永不阻塞。
     * @param e the element to add
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) {
        offer(e);
    }

三、出队

1. poll方法

  • poll() 获取并移除此队列的头,如果此队列为空,则返回 null
  • E poll(long timeout, TimeUnit unit) 获取并移除此队列的头部,队列为空时,在指定的等待时间前等待可用的元素。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
 /**
     * Retrieves and removes the head of this queue, or returns <tt>null</tt>
     * if this queue has no elements with an expired delay.
     * 检索并删除该队列的头部,或者返回< tt > null </ tt >,如果此队列没有过期延迟的元素。
     * @return the head of this queue, or <tt>null</tt> if this
     *         queue has no elements with an expired delay
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();  //获取第一个元素,优先队列peek()方法不删除元素
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) //获取元素不为空时,判断元素没有过期延迟。
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }
 /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue,
     * or the specified wait time expires.
     * 检索并删除该队列的头部,如果需要,则等待,直到该队列上有过期延迟的元素,或者指定的等待时间过期。
     * @return the head of this queue, or <tt>null</tt> if the
     *         specified waiting time elapses before an element with
     *         an expired delay becomes available
     * @throws InterruptedException {@inheritDoc}
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) { // 获取元素为null时候,条件队列等待
                    if (nanos <= 0) 
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS); 
                    if (delay <= 0) //过期延迟
                        return q.poll();
                    if (nanos <= 0) //超过等待,返回null
                        return null;
                    if (nanos < delay || leader != null) // 
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            nanos -= delay - timeLeft; //算出剩余等待时间
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

2. take方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     * 检索并删除此队列的头部,如果需要,则等待,直到该队列上有过期延迟的元素为止。
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    else if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

3. peek方法

peek方法是java.util.Queue接口方法。获取队列的头部,不删除队列头部元素。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
    /**
     * Retrieves, but does not remove, the head of this queue, or
     * returns <tt>null</tt> if this queue is empty.  Unlike
     * <tt>poll</tt>, if no expired elements are available in the queue,
     * this method returns the element that will expire next,
     * if one exists.
     * 如果此队列为空,则检索该队列的头部,但不删除该队列的头,或返回< tt > null </ tt >。与< tt >poll< / tt >不同,
     * 如果队列中没有过期的元素,该方法将返回将在下一次过期的元素,如果存在的话。
     * @return the head of this queue, or <tt>null</tt> if this
     *         queue is empty.
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

三、移除队列

1. remove方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

2. clear方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     * Elements with an unexpired delay are not waited for; they are
     * simply discarded from the queue.
     * 原子地从这个延迟队列中删除所有的元素。此调用返回后,队列将为空。未过期的元素没有等待;它们被简单地从队列中丢弃。
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

3. drainTo方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
  /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            for (;;) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) //未过期延迟,终止
                    break;
                c.add(q.poll());
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) //未过期延迟,终止
                    break;
                c.add(q.poll());
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

五、Iterator 迭代器实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
    private class Itr implements Iterator<E> {
        final Object[] array; // Array of all elements 数组的所有元素
        int cursor;           // index of next element to return; 下一个元素的返回索引;
        int lastRet;          // index of last element, or -1 if no such 最后一个元素的索引,或者- 1,如果没有

        Itr(Object[] array) {
            lastRet = -1;
            this.array = array;
        }

        public boolean hasNext() {
            return cursor < array.length;
        }

        @SuppressWarnings("unchecked")
        public E next() {
            if (cursor >= array.length)
                throw new NoSuchElementException();
            lastRet = cursor;
            return (E)array[cursor++];
        }

        public void remove() {
            if (lastRet < 0)
                throw new IllegalStateException();
            Object x = array[lastRet];
            lastRet = -1;
            // Traverse underlying queue to find == element,
            // not just a .equals element.
            lock.lock();
            try {
                for (Iterator it = q.iterator(); it.hasNext(); ) { //迭代优先队列
                    if (it.next() == x) {
                        it.remove();
                        return;
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    }