【注意】最后更新于 June 15, 2017,文中内容可能已过时,请谨慎使用。
DelayQueue(延时队列)源码分析
简介
DelayQueue是一种无界的阻塞队列,队列里只允许放入可以"延期"的元素,队列中列头的元素是最先"到期"的元素。
如果队列中没有任何元素"到期”,尽管队列中有元素,也不能从队列头获取到任何元素。
一、DelayQueue 内部实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
//重入锁
private transient final ReentrantLock lock = new ReentrantLock();
//优先级队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
/**
* Thread designated to wait for the element at the head of
* the queue. This variant of the Leader-Follower pattern
* (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
* minimize unnecessary timed waiting. When a thread becomes
* the leader, it waits only for the next delay to elapse, but
* other threads await indefinitely. The leader thread must
* signal some other thread before returning from take() or
* poll(...), unless some other thread becomes leader in the
* interim. Whenever the head of the queue is replaced with
* an element with an earlier expiration time, the leader
* field is invalidated by being reset to null, and some
* waiting thread, but not necessarily the current leader, is
* signalled. So waiting threads must be prepared to acquire
* and lose leadership while waiting.
* 指定在队列头部等待元素的线程。Leader-Follower模式的这种变体用于最小化不必要的定时等待.
* (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/)
* 当一个线程成为领导者时,它只等待下一个延迟,但其他线程则无限期地等待。
* 领导者线程必须在从take()或poll(...)返回之前发出其他线程的信号,
* 除非其他线程在过渡期间成为领导者。每当队列的头部被一个具有较早到期时间的元素替换时,
* 领导者字段被重置为空而失效,并且一些等待线程(但不一定是当前领导者)被发信号通知。
* 所以等待的线程必须准备好在获得和失去领导的同时等待。
*/
private Thread leader = null;
/**
* Condition signalled when a newer element becomes available
* at the head of the queue or a new thread may need to
* become leader.
*/
//当队列前端有更新的元素或新线程可能需要成为leader时,条件就会发出信号。
private final Condition available = lock.newCondition();
/**
* Creates a new <tt>DelayQueue</tt> that is initially empty.
*/
public DelayQueue() {}
/**
* Creates a <tt>DelayQueue</tt> initially containing the elements of the
* given collection of {@link Delayed} instances.
*
* @param c the collection of elements to initially contain
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
}
|
二、入队
- offer(E e)
- offer(E e, long timeout, TimeUnit unit) 将指定元素插入该队列的尾部,如果队列已满,DelayQueue不支持等待时间,主要数据元素保存到优先队列中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
/**
* Inserts the specified element into this delay queue.
* 将指定的元素插入到这个延迟队列中。
* @param e the element to add
* @return <tt>true</tt>
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
/**
* Inserts the specified element into this delay queue. As the queue is
* unbounded this method will never block.
* 将指定的元素插入到这个延迟队列中。由于队列是无限的,此方法将永不阻塞。
* @param e the element to add
* @param timeout This parameter is ignored as the method never blocks
* @param unit This parameter is ignored as the method never blocks
* @return <tt>true</tt>
* @throws NullPointerException {@inheritDoc}
*/
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}
|
2. add方法
add(E e) 插入此优先队列,队列满了,DelayQueue 不支持throw IllegalStateException异常
1
2
3
4
5
6
7
8
9
10
|
/**
* Inserts the specified element into this delay queue.
* 将指定的元素插入到这个延迟队列中。
* @param e the element to add
* @return <tt>true</tt> (as specified by {@link Collection#add})
* @throws NullPointerException if the specified element is null
*/
public boolean add(E e) {
return offer(e);
}
|
3. put方法
- 将指定的元素插入到这个延迟队列中。由于队列是无限的,此方法将永不阻塞。
1
2
3
4
5
6
7
8
9
10
|
/**
* Inserts the specified element into this delay queue. As the queue is
* unbounded this method will never block.
* 将指定的元素插入到这个延迟队列中。由于队列是无限的,此方法将永不阻塞。
* @param e the element to add
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) {
offer(e);
}
|
三、出队
1. poll方法
- poll() 获取并移除此队列的头,如果此队列为空,则返回 null
- E poll(long timeout, TimeUnit unit) 获取并移除此队列的头部,队列为空时,在指定的等待时间前等待可用的元素。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
/**
* Retrieves and removes the head of this queue, or returns <tt>null</tt>
* if this queue has no elements with an expired delay.
* 检索并删除该队列的头部,或者返回< tt > null </ tt >,如果此队列没有过期延迟的元素。
* @return the head of this queue, or <tt>null</tt> if this
* queue has no elements with an expired delay
*/
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek(); //获取第一个元素,优先队列peek()方法不删除元素
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) //获取元素不为空时,判断元素没有过期延迟。
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue,
* or the specified wait time expires.
* 检索并删除该队列的头部,如果需要,则等待,直到该队列上有过期延迟的元素,或者指定的等待时间过期。
* @return the head of this queue, or <tt>null</tt> if the
* specified waiting time elapses before an element with
* an expired delay becomes available
* @throws InterruptedException {@inheritDoc}
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) { // 获取元素为null时候,条件队列等待
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0) //过期延迟
return q.poll();
if (nanos <= 0) //超过等待,返回null
return null;
if (nanos < delay || leader != null) //
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft; //算出剩余等待时间
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
|
2. take方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue.
* 检索并删除此队列的头部,如果需要,则等待,直到该队列上有过期延迟的元素为止。
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)
return q.poll();
else if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
|
3. peek方法
peek方法是java.util.Queue接口方法。获取队列的头部,不删除队列头部元素。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
/**
* Retrieves, but does not remove, the head of this queue, or
* returns <tt>null</tt> if this queue is empty. Unlike
* <tt>poll</tt>, if no expired elements are available in the queue,
* this method returns the element that will expire next,
* if one exists.
* 如果此队列为空,则检索该队列的头部,但不删除该队列的头,或返回< tt > null </ tt >。与< tt >poll< / tt >不同,
* 如果队列中没有过期的元素,该方法将返回将在下一次过期的元素,如果存在的话。
* @return the head of this queue, or <tt>null</tt> if this
* queue is empty.
*/
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.peek();
} finally {
lock.unlock();
}
}
|
三、移除队列
1. remove方法
1
2
3
4
5
6
7
8
9
10
11
12
13
|
/**
* Removes a single instance of the specified element from this
* queue, if it is present, whether or not it has expired.
*/
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.remove(o);
} finally {
lock.unlock();
}
}
|
2. clear方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
/**
* Atomically removes all of the elements from this delay queue.
* The queue will be empty after this call returns.
* Elements with an unexpired delay are not waited for; they are
* simply discarded from the queue.
* 原子地从这个延迟队列中删除所有的元素。此调用返回后,队列将为空。未过期的元素没有等待;它们被简单地从队列中丢弃。
*/
public void clear() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.clear();
} finally {
lock.unlock();
}
}
|
3. drainTo方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
/**
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public int drainTo(Collection<? super E> c) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = 0;
for (;;) {
E first = q.peek();
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) //未过期延迟,终止
break;
c.add(q.poll());
++n;
}
return n;
} finally {
lock.unlock();
}
}
/**
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = 0;
while (n < maxElements) {
E first = q.peek();
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) //未过期延迟,终止
break;
c.add(q.poll());
++n;
}
return n;
} finally {
lock.unlock();
}
}
|
五、Iterator 迭代器实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
private class Itr implements Iterator<E> {
final Object[] array; // Array of all elements 数组的所有元素
int cursor; // index of next element to return; 下一个元素的返回索引;
int lastRet; // index of last element, or -1 if no such 最后一个元素的索引,或者- 1,如果没有
Itr(Object[] array) {
lastRet = -1;
this.array = array;
}
public boolean hasNext() {
return cursor < array.length;
}
@SuppressWarnings("unchecked")
public E next() {
if (cursor >= array.length)
throw new NoSuchElementException();
lastRet = cursor;
return (E)array[cursor++];
}
public void remove() {
if (lastRet < 0)
throw new IllegalStateException();
Object x = array[lastRet];
lastRet = -1;
// Traverse underlying queue to find == element,
// not just a .equals element.
lock.lock();
try {
for (Iterator it = q.iterator(); it.hasNext(); ) { //迭代优先队列
if (it.next() == x) {
it.remove();
return;
}
}
} finally {
lock.unlock();
}
}
}
|