1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
/**
* Base class for the various sequencer types (single/multi). Provides
* common functionality like the management of gating sequences (add/remove) and
* ownership of the current cursor.
* 各种sequencer类型的基类(single / multi)。提供常见的功能,如管理控制序列(添加/删除)和当前游标的所有权。
*/
public abstract class AbstractSequencer implements Sequencer
{
private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences"); //原子操作更新gatingSequences属性
protected final int bufferSize; // 大小
protected final WaitStrategy waitStrategy; //等待策略
// 当作生产序列
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); //当前序列
// 当作消费序列
protected volatile Sequence[] gatingSequences = new Sequence[0]; //控制序列数组
/**
* Create with the specified buffer size and wait strategy.
* 使用指定的缓冲区大小和等待策略创建
* @param bufferSize The total number of entries, must be a positive power of 2.
* @param waitStrategy The wait strategy used by this sequencer
*/
public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy)
{
if (bufferSize < 1)
{
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
if (Integer.bitCount(bufferSize) != 1)
{
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
this.bufferSize = bufferSize;
this.waitStrategy = waitStrategy;
}
/**
* @see Sequencer#getCursor()
*/
@Override
public final long getCursor()
{
return cursor.get();
}
/**
* @see Sequencer#getBufferSize()
*/
@Override
public final int getBufferSize()
{
return bufferSize;
}
/**
* @see Sequencer#addGatingSequences(Sequence...)
*/
@Override
public final void addGatingSequences(Sequence... gatingSequences)
{
SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}
/**
* @see Sequencer#removeGatingSequence(Sequence)
*/
@Override
public boolean removeGatingSequence(Sequence sequence)
{
return SequenceGroups.removeSequence(this, SEQUENCE_UPDATER, sequence);
}
/**
* @see Sequencer#getMinimumSequence()
*/
@Override
public long getMinimumSequence()
{
return Util.getMinimumSequence(gatingSequences, cursor.get());
}
/**
* @see Sequencer#newBarrier(Sequence...)
*/
@Override
public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
{
return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
}
/**
* 为此序列创建一个事件轮询器,它将使用提供的数据提供程序和要门控的序列
*
* @param dataProvider 此事件轮询器用户的数据源
* @param gatingSequences 要门控的序列
* @return A poller that will gate on this ring buffer and the supplied sequences.
*/
@Override
public <T> EventPoller<T> newPoller(DataProvider<T> dataProvider, Sequence... gatingSequences)
{
return EventPoller.newInstance(dataProvider, this, new Sequence(), cursor, gatingSequences);
}
|