【注意】最后更新于 May 15, 2018,文中内容可能已过时,请谨慎使用。
SequenceBarrier分析
简介
序列屏障(SequenceBarrier)由序列器(Sequencer)创建,它持有序列器的主发布序列(游标)以及所依赖的消费者序列的引用,并包含判断是否有事件可供消费者处理的逻辑。
一、SequenceBarrier 接口说明
SequenceBarrier 接口源码如下:
/**
 * Coordination barrier for tracking the cursor for publishers and sequence of
 * dependent {@link EventProcessor}s for processing a data structure.
 */
public interface SequenceBarrier
{
/**
 * Wait for the given sequence to be available for consumption.
 *
 * @param sequence to wait for
 * @return the sequence up to which is available
 * @throws AlertException if a status change has occurred for the Disruptor
 * @throws InterruptedException if the thread needs awaking on a condition variable.
 * @throws TimeoutException if a timeout occurs while waiting for the supplied sequence.
 */
long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException;
/**
 * Get the current cursor value that can be read.
 *
 * @return value of the cursor for entries that have been published.
 */
long getCursor();
/**
 * The current alert status for the barrier, i.e. whether an alert has been
 * raised and not yet cleared.
 *
 * @return true if in alert otherwise false.
 */
boolean isAlerted();
/**
 * Alert the {@link EventProcessor}s of a status change and stay in this status until cleared.
 */
void alert();
/**
 * Clear the current alert status.
 */
void clearAlert();
/**
 * Check if an alert has been raised and throw an {@link AlertException} if it has.
 *
 * @throws AlertException if alert has been raised.
 */
void checkAlert() throws AlertException;
}
|
二、ProcessingSequenceBarrier实现SequenceBarrier接口
ProcessingSequenceBarrier 源码如下:
/**
 * {@link SequenceBarrier} implementation that gates {@link EventProcessor}s on a cursor
 * sequence and, optionally, on a set of dependent sequences, delegating the actual
 * waiting to the supplied {@link WaitStrategy}.
 */
final class ProcessingSequenceBarrier implements SequenceBarrier
{
    private final Sequencer sequencer;
    private final WaitStrategy waitStrategy;
    private final Sequence cursorSequence;
    private final Sequence dependentSequence;

    // Written by alert()/clearAlert() and read by checkAlert()/isAlerted().
    private volatile boolean alerted = false;

    /**
     * Creates a barrier over the given cursor and dependent sequences.
     *
     * @param sequencer          consulted in {@link #waitFor(long)} for the highest published sequence
     * @param waitStrategy       strategy that performs the wait and is signalled on {@link #alert()}
     * @param cursorSequence     the cursor sequence passed to the wait strategy
     * @param dependentSequences sequences this barrier depends on; when the array is empty the
     *                           cursor sequence itself is used as the dependent sequence
     */
    ProcessingSequenceBarrier(
        final Sequencer sequencer,
        final WaitStrategy waitStrategy,
        final Sequence cursorSequence,
        final Sequence[] dependentSequences)
    {
        this.sequencer = sequencer;
        this.waitStrategy = waitStrategy;
        this.cursorSequence = cursorSequence;
        // Several dependencies are wrapped in one group; with none, the cursor is used directly.
        this.dependentSequence = dependentSequences.length == 0
            ? cursorSequence
            : new FixedSequenceGroup(dependentSequences);
    }

    /**
     * Checks the alert flag, then waits via the wait strategy for {@code sequence}.
     *
     * @param sequence the sequence to wait for
     * @return the value reported by the wait strategy when it is below {@code sequence};
     *         otherwise the result of
     *         {@code sequencer.getHighestPublishedSequence(sequence, available)}
     * @throws AlertException       if the barrier is in the alerted state
     * @throws InterruptedException declared for the wait strategy call
     * @throws TimeoutException     declared for the wait strategy call
     */
    @Override
    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
    {
        checkAlert();

        final long available =
            waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

        // A value below the request is handed back unchanged, without consulting the sequencer.
        return available < sequence
            ? available
            : sequencer.getHighestPublishedSequence(sequence, available);
    }

    /**
     * Returns the current value of the dependent sequence (the cursor sequence itself
     * when no dependent sequences were supplied at construction).
     *
     * @return the value read from the dependent sequence
     */
    @Override
    public long getCursor()
    {
        return dependentSequence.get();
    }

    /**
     * Reports whether the barrier is currently in the alerted state.
     *
     * @return {@code true} if an alert has been raised and not cleared
     */
    @Override
    public boolean isAlerted()
    {
        return alerted;
    }

    /**
     * Raises the alert flag and then calls {@code signalAllWhenBlocking()} on the wait strategy.
     */
    @Override
    public void alert()
    {
        alerted = true;
        waitStrategy.signalAllWhenBlocking();
    }

    /**
     * Resets the alert flag.
     */
    @Override
    public void clearAlert()
    {
        alerted = false;
    }

    /**
     * Throws the shared {@link AlertException} instance if the alert flag is set.
     *
     * @throws AlertException if the barrier is in the alerted state
     */
    @Override
    public void checkAlert() throws AlertException
    {
        if (!alerted)
        {
            return;
        }
        throw AlertException.INSTANCE;
    }
}
|
三、FixedSequenceGroup:在单个序列背后隐藏一组序列
FixedSequenceGroup 源码如下:
/**
 * Presents a fixed group of {@link Sequence}s as a single {@link Sequence}.
 * Only reading is supported; every mutating operation throws
 * {@link UnsupportedOperationException}.
 */
public final class FixedSequenceGroup extends Sequence
{
    private final Sequence[] sequences;

    /**
     * Creates a group over a copy of the supplied array, so later changes to the
     * caller's array are not seen by this group.
     *
     * @param sequences the sequences to be tracked under this sequence group
     */
    public FixedSequenceGroup(Sequence[] sequences)
    {
        this.sequences = sequences.clone();
    }

    /**
     * Get the minimum sequence value for the group, as computed by
     * {@code Util.getMinimumSequence}.
     *
     * @return the minimum sequence value for the group.
     */
    @Override
    public long get()
    {
        return Util.getMinimumSequence(sequences);
    }

    /**
     * Renders the tracked sequences using {@link Arrays#toString(Object[])}.
     *
     * @return string form of the underlying sequence array
     */
    @Override
    public String toString()
    {
        return Arrays.toString(sequences);
    }

    // ---- Mutating operations: none are supported on a group. ----

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void set(long value)
    {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public boolean compareAndSet(long expectedValue, long newValue)
    {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public long incrementAndGet()
    {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public long addAndGet(long increment)
    {
        throw new UnsupportedOperationException();
    }
}
|