Sequencer分析

简介

Sequencer是Disruptor的真正核心。这个接口的2个实现(单生产者(SingleProducerSequencer),多生产者(MultiProducerSequencer))实现了所有的并发算法,用于在生产者和消费者之间快速正确地传递数据。

一、Sequencer接口说明

Sequencer接口UML图:

1.Cursored接口说明

获取当前的游标值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
/**
 * Implementors of this interface must provide a single long value
 * that represents their current cursor value.  Used during dynamic
 * add/remove of Sequences from a
 * {@link SequenceGroups#addSequences(Object, java.util.concurrent.atomic.AtomicReferenceFieldUpdater, Cursored, Sequence...)}.
 */
public interface Cursored
{
    /**
     * Get the current cursor value.
     *
     * @return current cursor value
     */
    long getCursor();
}

2.Sequenced接口说明

序列基本操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/**
 * Basic sequence operations: capacity queries, claiming sequences for
 * publishing, and publishing the claimed sequences.
 */
public interface Sequenced
{
    /**
     * The capacity of the data structure to hold entries.
     *
     * @return the size of the RingBuffer.
     */
    int getBufferSize();

    /**
     * Has the buffer got capacity to allocate another sequence.  This is a concurrent
     * method so the response should only be taken as an indication of available capacity.
     *
     * @param requiredCapacity in the buffer
     * @return true if the buffer has the capacity to allocate the next sequence otherwise false.
     */
    boolean hasAvailableCapacity(int requiredCapacity);

    /**
     * Get the remaining capacity for this sequencer.
     *
     * @return The number of slots remaining.
     */
    long remainingCapacity();

    /**
     * Claim the next event in sequence for publishing.
     *
     * @return the claimed sequence value
     */
    long next();

    /**
     * Claim the next n events in sequence for publishing.  This is for batch event producing.  Using batch producing
     * requires a little care and some math.
     * <pre>
     * int n = 10;
     * long hi = sequencer.next(n);
     * long lo = hi - (n - 1);
     * for (long sequence = lo; sequence &lt;= hi; sequence++) {
     *     // Do work.
     * }
     * sequencer.publish(lo, hi);
     * </pre>
     *
     * @param n the number of sequences to claim
     * @return the highest claimed sequence value
     */
    long next(int n);

    /**
     * Attempt to claim the next event in sequence for publishing.  Will return the
     * number of the slot if there is at least one slot available.  This is the
     * non-blocking counterpart of {@link #next()}.
     *
     * @return the claimed sequence value
     * @throws InsufficientCapacityException thrown if there is no space available in the ring buffer.
     */
    long tryNext() throws InsufficientCapacityException;

    /**
     * Attempt to claim the next n events in sequence for publishing.  Will return the
     * highest numbered slot if there is at least <code>n</code> slots
     * available.  Have a look at {@link #next(int)} for a description on how to
     * use this method.  This is the non-blocking counterpart of {@link #next(int)}.
     *
     * @param n the number of sequences to claim
     * @return the highest claimed sequence value
     * @throws InsufficientCapacityException thrown if there is no space available in the ring buffer.
     */
    long tryNext(int n) throws InsufficientCapacityException;

    /**
     * Publishes a sequence. Call when the event has been filled.
     *
     * @param sequence the sequence to be published.
     */
    void publish(long sequence);

    /**
     * Batch publish sequences.  Called when all of the events have been filled.
     *
     * @param lo first sequence number to publish
     * @param hi last sequence number to publish
     */
    void publish(long lo, long hi);
}

3.Sequencer接口说明

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95

/**
 * Coordinates claiming sequences for access to a data structure while tracking dependent {@link Sequence}s
 */
public interface Sequencer extends Cursored, Sequenced
{
  /**
   * Set to -1 as sequence starting point
   */
  long INITIAL_CURSOR_VALUE = -1L;

  /**
   * Claim a specific sequence.  Only used if initialising the ring buffer to
   * a specific value.
   *
   * @param sequence The sequence to initialise to.
   */
  void claim(long sequence);

  /**
   * Confirms if a sequence is published and the event is available for use; non-blocking.
   *
   * @param sequence of the buffer to check
   * @return true if the sequence is available for use, false if not
   */
  boolean isAvailable(long sequence);

  /**
   * Add the specified gating sequences to this instance of the Disruptor.  They will
   * safely and atomically added to the list of gating sequences.
   * The gating sequences typically belong to other components; this instance reads
   * them to observe how far those components have progressed.
   *
   * @param gatingSequences The sequences to add.
   */
  void addGatingSequences(Sequence... gatingSequences);

  /**
   * Remove the specified sequence from this sequencer.
   *
   * @param sequence to be removed.
   * @return <tt>true</tt> if this sequence was found, <tt>false</tt> otherwise.
   */
  boolean removeGatingSequence(Sequence sequence);

  /**
   * Create a new SequenceBarrier to be used by an EventProcessor to track which messages
   * are available to be read from the ring buffer given a list of sequences to track.
   *
   * @param sequencesToTrack All of the sequences that the newly constructed barrier will wait on.
   * @return A sequence barrier that will track the specified sequences.
   * @see SequenceBarrier
   */
  SequenceBarrier newBarrier(Sequence... sequencesToTrack);

  /**
   * Get the minimum sequence value from all of the gating sequences
   * added to this ringBuffer.
   *
   * @return The minimum gating sequence or the cursor sequence if
   * no sequences have been added.
   */
  long getMinimumSequence();

  /**
   * Get the highest sequence number that can be safely read from the ring buffer.  Depending
   * on the implementation of the Sequencer this call may need to scan a number of values
   * in the Sequencer.  The scan will range from nextSequence to availableSequence.  If
   * there are no available values <code>&gt;= nextSequence</code> the return value will be
   * <code>nextSequence - 1</code>.  To work correctly a consumer should pass a value that
   * is 1 higher than the last sequence that was successfully processed.
   *
   * @param nextSequence      The sequence to start scanning from.
   * @param availableSequence The sequence to scan to.
   * @return The highest value that can be safely read, will be at least <code>nextSequence - 1</code>.
   */
  long getHighestPublishedSequence(long nextSequence, long availableSequence);

  /**
   * Creates an {@link EventPoller} from the given data provider and gating sequences.
   *
   * @param provider        the data provider the poller reads events from
   * @param <T>             the event type
   * @param gatingSequences the sequences the poller gates on
   * @return the newly created poller
   */
  <T> EventPoller<T> newPoller(DataProvider<T> provider, Sequence... gatingSequences);
}

二、AbstractSequencer 抽象类

AbstractSequencer实现Sequencer接口

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
/**
 * Base class for the various sequencer types (single/multi).  Provides
 * common functionality like the management of gating sequences (add/remove) and
 * ownership of the current cursor.
 */
public abstract class AbstractSequencer implements Sequencer
{
    // Updater used to replace the gatingSequences array field atomically.
    private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");

    protected final int bufferSize;              // number of entries; the constructor requires a power of 2
    protected final WaitStrategy waitStrategy;   // wait strategy passed on to barriers created by newBarrier
    // The current cursor, starting at INITIAL_CURSOR_VALUE; plays the role of the producer-side sequence.
    protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    // The gating sequences, initially empty; these play the role of the consumer-side sequences.
    protected volatile Sequence[] gatingSequences = new Sequence[0];

    /**
     * Create with the specified buffer size and wait strategy.
     *
     * @param bufferSize   The total number of entries, must be a positive power of 2.
     * @param waitStrategy The wait strategy used by this sequencer
     * @throws IllegalArgumentException if bufferSize is less than 1 or is not a power of 2
     */
    public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy)
    {
        if (bufferSize < 1)
        {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        // Exactly one set bit means the value is a power of 2.
        if (Integer.bitCount(bufferSize) != 1)
        {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }

        this.bufferSize = bufferSize;
        this.waitStrategy = waitStrategy;
    }

    /**
     * @see Sequencer#getCursor()
     */
    @Override
    public final long getCursor()
    {
        return cursor.get();
    }

    /**
     * @see Sequencer#getBufferSize()
     */
    @Override
    public final int getBufferSize()
    {
        return bufferSize;
    }

    /**
     * @see Sequencer#addGatingSequences(Sequence...)
     */
    @Override
    public final void addGatingSequences(Sequence... gatingSequences)
    {
        // This object is passed twice: as the holder of the array field and as the Cursored source.
        SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
    }

    /**
     * @see Sequencer#removeGatingSequence(Sequence)
     */
    @Override
    public boolean removeGatingSequence(Sequence sequence)
    {
        return SequenceGroups.removeSequence(this, SEQUENCE_UPDATER, sequence);
    }

    /**
     * @see Sequencer#getMinimumSequence()
     */
    @Override
    public long getMinimumSequence()
    {
        return Util.getMinimumSequence(gatingSequences, cursor.get());
    }

    /**
     * @see Sequencer#newBarrier(Sequence...)
     */
    @Override
    public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
    {
        return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
    }

    /**
     * Creates an event poller for this sequencer that will use the supplied data provider
     * and gating sequences.
     *
     * @param dataProvider    The data source for users of this event poller
     * @param gatingSequences The sequences to be gated on
     * @return A poller that will gate on this ring buffer and the supplied sequences.
     */
    @Override
    public <T> EventPoller<T> newPoller(DataProvider<T> dataProvider, Sequence... gatingSequences)
    {
        return EventPoller.newInstance(dataProvider, this, new Sequence(), cursor, gatingSequences);
    }

三、SingleProducerSequencer 单个生产者,不存在并发情况

1.SingleProducerSequencer 构造

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
 // Cache-line padding: seven long fields, 56 bytes in total.
 // NOTE(review): presumably intended to keep the fields declared in the subclass
 // off the cache line used by the superclass fields - confirm against JVM field layout.
abstract class SingleProducerSequencerPad extends AbstractSequencer
{
    // Padding only; not read anywhere in the code shown here.
    protected long p1, p2, p3, p4, p5, p6, p7;

    SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy)
    {
        super(bufferSize, waitStrategy);
    }
}

/**
 * Holds the two mutable fields of the single-producer sequencer, declared
 * between the padding of the superclass and the padding of the subclass.
 */
abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad
{
    SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy)
    {
        super(bufferSize, waitStrategy);
    }

    /**
     * Set to -1 as sequence starting point
     */
    // Highest sequence claimed so far by the producer.
    long nextValue = Sequence.INITIAL_VALUE;
    // Last minimum gating sequence that was read, cached to avoid re-reading the gating sequences.
    long cachedValue = Sequence.INITIAL_VALUE;
}

/**
 * Sequencer for a single producer.  The methods of this class are shown in the
 * following sections; this excerpt contains only the padding fields and the constructor.
 */
public final class SingleProducerSequencer extends SingleProducerSequencerFields
{
    // Trailing padding (seven longs); not read anywhere in the code shown here.
    protected long p1, p2, p3, p4, p5, p6, p7;

    /**
     * Construct a Sequencer with the selected wait strategy and buffer size.
     *
     * @param bufferSize   the size of the buffer that this will sequence over.
     * @param waitStrategy for those waiting on sequences.
     */
    public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy)
    {
        super(bufferSize, waitStrategy);
    }
}

2.hasAvailableCapacity 判断是否还有给定的可用容量。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
  /**
     * Reports whether {@code requiredCapacity} more slots could be claimed right now.
     * This variant never writes the cursor.
     *
     * @param requiredCapacity number of slots the caller would like to claim
     * @return true if that many slots are free, false otherwise
     * @see Sequencer#hasAvailableCapacity(int)
     */
    @Override
    public boolean hasAvailableCapacity(int requiredCapacity)
    {
        final boolean doStore = false;
        return hasAvailableCapacity(requiredCapacity, doStore);
    }

    /**
     * Capacity check shared by {@code hasAvailableCapacity(int)} and {@code tryNext(int)}.
     * The cached minimum gating sequence is trusted when it shows enough room; otherwise
     * the gating sequences are re-read and the cache is refreshed.
     *
     * @param requiredCapacity number of slots the caller would like to claim
     * @param doStore          when true, the cursor is volatile-written with the current
     *                         claim position before the gating sequences are re-read
     * @return true if the slots are free, false if claiming them would wrap past the
     *         slowest gating sequence
     */
    private boolean hasAvailableCapacity(int requiredCapacity, boolean doStore)
    {
        final long nextValue = this.nextValue;
        final long cachedGatingSequence = this.cachedValue;
        final long wrapPoint = (nextValue + requiredCapacity) - bufferSize;

        // The cache is usable only while it still leaves room and has not overtaken the claim position.
        final boolean mustRefresh = wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue;
        if (!mustRefresh)
        {
            return true;
        }

        if (doStore)
        {
            cursor.setVolatile(nextValue);  // StoreLoad fence
        }

        final long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
        this.cachedValue = minSequence;

        return wrapPoint <= minSequence;
    }
  • 把nextValue看作生产索引,cachedGatingSequence看作消费索引,bufferSize是环形结构的大小,requiredCapacity是本次申请的容量;
  • 消费索引大于生产索引的情况正常不应出现:还没有生产,消费者就没有数据可以消费;代码中一旦发现cachedGatingSequence > nextValue,就会重新读取最小控制序列;
  • 生产索引减去消费索引,就是尚未消费的数据量;剩下的空间才是生产者可以写入的空间:可用空间 = bufferSize - (nextValue - cachedGatingSequence);
  • 申请的容量不能大于可用空间,也就是说"有空间可以生产"的条件是:
  • requiredCapacity <= bufferSize - (nextValue - cachedGatingSequence)
    • 移项得到 (nextValue + requiredCapacity - bufferSize) <= cachedGatingSequence,这和代码中的判断形式已经很接近了;
    • 取反就是"空间不足":(nextValue + requiredCapacity - bufferSize) > cachedGatingSequence,这正是代码中的 wrapPoint > cachedGatingSequence;
  • 理解了这一点,下面的next方法就简单多了;
  • nextValue相当于序列cursor的本地缓存值,表示已经申请到的位置:申请只是占位,数据还没有真正写入,写完之后才会更新cursor(这个过程叫发布)。

3.next

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
 /**
     * Claims the next single sequence for publishing; equivalent to {@code next(1)}.
     *
     * @return the claimed sequence value
     * @see Sequencer#next()
     */
    @Override
    public long next()
    {
        return next(1);
    }

    /**
     * Claims the next {@code n} sequences for publishing.  If claiming them would wrap
     * past the slowest gating sequence, this method loops (signalling the wait strategy
     * and parking for 1ns per iteration) until enough slots have been released.
     *
     * @param n the number of sequences to claim; must be between 1 and {@code bufferSize} inclusive
     * @return the highest claimed sequence value
     * @throws IllegalArgumentException if {@code n} is less than 1 or greater than {@code bufferSize}
     * @see Sequencer#next(int)
     */
    @Override
    public long next(int n)
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }
        if (n > bufferSize)
        {
            // A claim larger than the ring can never be satisfied: the wrap point would always lie
            // beyond the minimum gating sequence, so the wait loop below would never terminate.
            throw new IllegalArgumentException("n must not be greater than bufferSize");
        }

        long nextValue = this.nextValue;

        long nextSequence = nextValue + n;
        // Sequence that must already have been consumed for the claimed slots to be reusable.
        long wrapPoint = nextSequence - bufferSize;
        long cachedGatingSequence = this.cachedValue;

        // Only re-read the gating sequences when the cached minimum no longer proves there is room.
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
        {
            // Original author's note: publish() uses cursor.set (StoreStore ordering), so consumer
            // threads may not have observed the cursor yet; the volatile write below is intended to
            // make it visible before the gating sequences are read.
            cursor.setVolatile(nextValue);  // StoreLoad fence

            long minSequence;
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
            {
                // Wake any consumers blocked in the wait strategy so they can make progress.
                waitStrategy.signalAllWhenBlocking();
                // Give up the CPU briefly before checking again.
                LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
            }

            this.cachedValue = minSequence;
        }

        this.nextValue = nextSequence;

        return nextSequence;
    }

4.tryNext

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

   /**
    * Attempts to claim the next single sequence without waiting; equivalent to {@code tryNext(1)}.
    *
    * @return the claimed sequence value
    * @throws InsufficientCapacityException if no slot is currently free
    * @see Sequencer#tryNext()
    */
   @Override
   public long tryNext() throws InsufficientCapacityException
   {
       return tryNext(1);
   }

   /**
    * Attempts to claim the next {@code n} sequences without waiting.
    *
    * @param n the number of sequences to claim; must be at least 1
    * @return the highest claimed sequence value
    * @throws IllegalArgumentException      if {@code n} is less than 1
    * @throws InsufficientCapacityException if fewer than {@code n} slots are currently free
    * @see Sequencer#tryNext(int)
    */
   @Override
   public long tryNext(int n) throws InsufficientCapacityException
   {
       if (n < 1)
       {
           throw new IllegalArgumentException("n must be > 0");
       }

       // The capacity check also publishes the current claim position (doStore = true).
       if (hasAvailableCapacity(n, true))
       {
           this.nextValue += n;
           return this.nextValue;
       }

       throw InsufficientCapacityException.INSTANCE;
   }

5.其他实现方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
  /**
     * Computes how many slots are still free: the buffer size minus the distance between
     * the producer's claim position and the minimum gating sequence.
     *
     * @return the number of slots remaining
     * @see Sequencer#remainingCapacity()
     */
    @Override
    public long remainingCapacity()
    {
        final long produced = this.nextValue;
        final long consumed = Util.getMinimumSequence(gatingSequences, produced);
        final long unconsumed = produced - consumed;

        return getBufferSize() - unconsumed;
    }

    /**
     * Sets the producer's claim position to the given sequence.  This is a plain
     * assignment to {@code nextValue}; the cursor is not touched here.
     *
     * @param sequence the sequence to start from
     * @see Sequencer#claim(long)
     */
    @Override
    public void claim(long sequence)
    {
        this.nextValue = sequence;
    }

    /**
     * Publishes a sequence by moving the cursor to it, then signals the wait strategy.
     *
     * @param sequence the sequence to be published
     * @see Sequencer#publish(long)
     */
    @Override
    public void publish(long sequence)
    {
        cursor.set(sequence);
        waitStrategy.signalAllWhenBlocking();
    }

    /**
     * Publishes a batch.  Only the upper bound matters here: the cursor is moved to
     * {@code hi}, and {@code lo} is not used.
     *
     * @param lo first sequence number to publish (unused)
     * @param hi last sequence number to publish
     * @see Sequencer#publish(long, long)
     */
    @Override
    public void publish(long lo, long hi)
    {
        cursor.set(hi);
        waitStrategy.signalAllWhenBlocking();
    }

    /**
     * Tells a consumer whether the given sequence has been published, i.e. whether the
     * cursor has reached it.
     *
     * @param sequence the sequence to check
     * @return true if the sequence is not beyond the cursor
     * @see Sequencer#isAvailable(long)
     */
    @Override
    public boolean isAvailable(long sequence)
    {
        final long published = cursor.get();
        return published >= sequence;
    }

    /**
     * Returns {@code availableSequence} unchanged; no scan is performed and
     * {@code lowerBound} is not used.
     *
     * @param lowerBound        the sequence to start from (unused)
     * @param availableSequence the highest sequence the caller believes is available
     * @return {@code availableSequence}
     */
    @Override
    public long getHighestPublishedSequence(long lowerBound, long availableSequence)
    {
        return availableSequence;
    }

四、MultiProducerSequencer

1.MultiProducerSequencer 多个生产者

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71

/**
 * Sequencer for multiple producers.  This excerpt shows the fields, the constructor and
 * the helpers that write the per-slot availability flags; the remaining methods are
 * shown in the following sections.
 */
public final class MultiProducerSequencer extends AbstractSequencer
{
    private static final Unsafe UNSAFE = Util.getUnsafe();
    private static final long BASE = UNSAFE.arrayBaseOffset(int[].class);
    private static final long SCALE = UNSAFE.arrayIndexScale(int[].class);

    // Cached copy of the last minimum gating sequence that was read.
    private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);

    // availableBuffer tracks the state of each ringbuffer slot
    // see below for more details on the approach
    private final int[] availableBuffer;
    private final int indexMask;
    private final int indexShift;

    /**
     * Construct a Sequencer with the selected wait strategy and buffer size.
     *
     * @param bufferSize   the size of the buffer that this will sequence over.
     * @param waitStrategy for those waiting on sequences.
     */
    public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy)
    {
        super(bufferSize, waitStrategy);
        availableBuffer = new int[bufferSize];
        indexMask = bufferSize - 1;
        indexShift = Util.log2(bufferSize);
        initialiseAvailableBuffer();
    }

    /**
     * Marks every slot with the flag -1, walking from the last index down to index 0.
     */
    private void initialiseAvailableBuffer()
    {
        for (int slot = availableBuffer.length - 1; slot >= 0; slot--)
        {
            setAvailableBufferValue(slot, -1);
        }
    }

    /**
     * The below methods work on the availableBuffer flag.
     * <p>
     * The prime reason is to avoid a shared sequence object between publisher threads.
     * (Keeping single pointers tracking start and end would require coordination
     * between the threads).
     * <p>
     * --  Firstly we have the constraint that the delta between the cursor and minimum
     * gating sequence will never be larger than the buffer size (the code in
     * next/tryNext in the Sequencer takes care of that).
     * -- Given that; take the sequence value and mask off the lower portion of the
     * sequence as the index into the buffer (indexMask). (aka modulo operator)
     * -- The upper portion of the sequence becomes the value to check for availability.
     * ie: it tells us how many times around the ring buffer we've been (aka division)
     * -- Because we can't wrap without the gating sequences moving forward (i.e. the
     * minimum gating sequence is effectively our last available position in the
     * buffer), when we have new data and successfully claimed a slot we can simply
     * write over the top.
     */
    private void setAvailable(final long sequence)
    {
        final int slot = calculateIndex(sequence);
        final int lapFlag = calculateAvailabilityFlag(sequence);
        setAvailableBufferValue(slot, lapFlag);
    }

    /**
     * Writes {@code flag} into the availability slot at {@code index} using an ordered store.
     */
    private void setAvailableBufferValue(int index, int flag)
    {
        UNSAFE.putOrderedInt(availableBuffer, (index * SCALE) + BASE, flag);
    }
}

2.hasAvailableCapacity 判断是否还有给定的可用容量。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
 /**
     * Reports whether {@code requiredCapacity} more slots could be claimed, judged against
     * the current cursor value.
     *
     * @param requiredCapacity number of slots the caller would like to claim
     * @return true if that many slots are free, false otherwise
     * @see Sequencer#hasAvailableCapacity(int)
     */
    @Override
    public boolean hasAvailableCapacity(final int requiredCapacity)
    {
        final long cursorValue = cursor.get();
        return hasAvailableCapacity(gatingSequences, requiredCapacity, cursorValue);
    }

    /**
     * Capacity check against an explicit cursor value.  The cached minimum gating sequence
     * is trusted when it shows enough room; otherwise the gating sequences are re-read and
     * the cache is refreshed.
     *
     * @param gatingSequences  the gating sequences to inspect
     * @param requiredCapacity number of slots the caller would like to claim
     * @param cursorValue      the cursor value the claim would start from
     * @return true if the slots are free, false if claiming them would wrap past the
     *         slowest gating sequence
     */
    private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int requiredCapacity, long cursorValue)
    {
        final long wrapPoint = (cursorValue + requiredCapacity) - bufferSize;
        final long cachedGatingSequence = gatingSequenceCache.get();

        // Fast path: the cache still proves there is room.
        if (wrapPoint <= cachedGatingSequence && cachedGatingSequence <= cursorValue)
        {
            return true;
        }

        final long minSequence = Util.getMinimumSequence(gatingSequences, cursorValue);
        gatingSequenceCache.set(minSequence);

        return wrapPoint <= minSequence;
    }
  • cursor在这里当作"申请空间"的序列,通过Sequence的CAS操作来控制并发申请;那么发布的时候该怎么处理呢?
  • 例如:线程A申请了2个槽位,线程B随后也申请了2个槽位;如果线程B先发布,而消费者是按顺序读取的,就会读到线程A尚未发布的空槽位;
  • 通常的处理方式是:消费者拿到某个序列之后,再确认一次这个序列是否已经发布(是否可用),没有发布就等待;所以提供了isAvailable方法供消费者查询;
  • availableBuffer数组保存的就是这个"可用"标识。如果数组里只保存0/1这样的标志位,就会有一个问题:消费之后还要由消费端把标志位改回去,既带来线程可见性问题,又要额外更新消费端序列;
    • 所以availableBuffer是一个int数组,保存的是序列值无符号右移indexShift位之后的结果(sequence >>> indexShift,也就是绕环的圈数),用它来判断该槽位是否可用;
    • 为什么不直接保存序列本身的值呢?序列是long类型,占用的空间更大;
    • indexShift = Util.log2(bufferSize),例如bufferSize = 1024 = 2^10时,indexShift = 10。

3.next

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
  /**
     * Claims the next single sequence for publishing; equivalent to {@code next(1)}.
     *
     * @return the claimed sequence value
     * @see Sequencer#next()
     */
    @Override
    public long next()
    {
        return next(1);
    }

    /**
     * Claims the next {@code n} sequences for publishing by advancing the cursor with a
     * compare-and-set.  While claiming would wrap past the slowest gating sequence, the
     * loop signals the wait strategy, parks for 1ns and retries.
     *
     * @param n the number of sequences to claim; must be between 1 and {@code bufferSize} inclusive
     * @return the highest claimed sequence value
     * @throws IllegalArgumentException if {@code n} is less than 1 or greater than {@code bufferSize}
     * @see Sequencer#next(int)
     */
    @Override
    public long next(int n)
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }
        if (n > bufferSize)
        {
            // A claim larger than the ring can never be satisfied: the wrap point would always lie
            // beyond the minimum gating sequence, so the retry loop below would never terminate.
            throw new IllegalArgumentException("n must not be greater than bufferSize");
        }

        long current;
        long next;

        do
        {
            current = cursor.get();
            next = current + n;

            // Sequence that must already have been consumed for the claimed slots to be reusable.
            long wrapPoint = next - bufferSize;
            long cachedGatingSequence = gatingSequenceCache.get();

            // Only re-read the gating sequences when the cached minimum no longer proves there is room.
            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
            {
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);

                if (wrapPoint > gatingSequence)
                {
                    // No room yet: wake blocked consumers, back off briefly, then retry.
                    waitStrategy.signalAllWhenBlocking();
                    LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                    continue;
                }

                // Room confirmed: refresh the cache; the claim itself is attempted on the next pass.
                gatingSequenceCache.set(gatingSequence);
            }
            else if (cursor.compareAndSet(current, next))
            {
                // This thread won the race for [current + 1, next].
                break;
            }
        }
        while (true);

        return next;
    }

4.tryNext

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
  /**
     * Attempts to claim the next single sequence without waiting; equivalent to {@code tryNext(1)}.
     *
     * @return the claimed sequence value
     * @throws InsufficientCapacityException if no slot is currently free
     * @see Sequencer#tryNext()
     */
    @Override
    public long tryNext() throws InsufficientCapacityException
    {
        return tryNext(1);
    }

    /**
     * Attempts to claim the next {@code n} sequences without waiting for capacity.  The
     * claim is retried only when the compare-and-set on the cursor loses to another thread.
     *
     * @param n the number of sequences to claim; must be at least 1
     * @return the highest claimed sequence value
     * @throws IllegalArgumentException      if {@code n} is less than 1
     * @throws InsufficientCapacityException if fewer than {@code n} slots are currently free
     * @see Sequencer#tryNext(int)
     */
    @Override
    public long tryNext(int n) throws InsufficientCapacityException
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }

        while (true)
        {
            final long current = cursor.get();

            if (!hasAvailableCapacity(gatingSequences, n, current))
            {
                throw InsufficientCapacityException.INSTANCE;
            }

            final long next = current + n;
            if (cursor.compareAndSet(current, next))
            {
                return next;
            }
        }
    }

5.其他实现方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
    /**
     * Sets the cursor directly to the given sequence.
     *
     * @param sequence the sequence to start from
     * @see Sequencer#claim(long)
     */
    @Override
    public void claim(long sequence)
    {
        cursor.set(sequence);
    }
    /**
     * Computes how many slots are still free: the buffer size minus the distance between
     * the cursor and the minimum gating sequence.  The cursor is read twice, so under
     * concurrent claims the result is only an indication.
     *
     * @return the number of slots remaining
     * @see Sequencer#remainingCapacity()
     */
    @Override
    public long remainingCapacity()
    {
        final long consumed = Util.getMinimumSequence(gatingSequences, cursor.get());
        final long unconsumed = cursor.get() - consumed;

        return getBufferSize() - unconsumed;
    }
   /**
     * Publishes one sequence by writing its availability flag, then signals the wait strategy.
     *
     * @param sequence the sequence to be published
     * @see Sequencer#publish(long)
     */
    @Override
    public void publish(final long sequence)
    {
        setAvailable(sequence);
        waitStrategy.signalAllWhenBlocking();
    }

    /**
     * Publishes every sequence from {@code lo} to {@code hi} inclusive, in ascending order,
     * and signals the wait strategy once at the end.
     *
     * @param lo first sequence number to publish
     * @param hi last sequence number to publish
     * @see Sequencer#publish(long, long)
     */
    @Override
    public void publish(long lo, long hi)
    {
        long sequence = lo;
        while (sequence <= hi)
        {
            setAvailable(sequence);
            sequence++;
        }

        waitStrategy.signalAllWhenBlocking();
    }
 /**
     * Checks whether the given sequence has been published by comparing the flag stored
     * in its slot (volatile read) with the flag expected for that sequence.
     *
     * @param sequence the sequence to check
     * @return true if the stored flag matches the expected flag
     * @see Sequencer#isAvailable(long)
     */
    @Override
    public boolean isAvailable(long sequence)
    {
        final int slot = calculateIndex(sequence);
        final int expectedFlag = calculateAvailabilityFlag(sequence);
        final int storedFlag = UNSAFE.getIntVolatile(availableBuffer, (slot * SCALE) + BASE);

        return storedFlag == expectedFlag;
    }

    /**
     * Scans upwards from {@code lowerBound} and stops at the first sequence that has not
     * been published.
     *
     * @param lowerBound        the sequence to start scanning from
     * @param availableSequence the sequence to scan to
     * @return the sequence just before the first unpublished one, or
     *         {@code availableSequence} if the whole range has been published
     */
    @Override
    public long getHighestPublishedSequence(long lowerBound, long availableSequence)
    {
        long candidate = lowerBound;
        while (candidate <= availableSequence)
        {
            if (!isAvailable(candidate))
            {
                return candidate - 1;
            }
            candidate++;
        }

        return availableSequence;
    }

    /**
     * Derives the availability flag for a sequence: the sequence shifted right (unsigned)
     * by {@code indexShift}, truncated to an int.
     */
    private int calculateAvailabilityFlag(final long sequence)
    {
        final long upperPortion = sequence >>> indexShift;
        return (int) upperPortion;
    }

    /**
     * Derives the slot index for a sequence: its low 32 bits masked with {@code indexMask}.
     */
    private int calculateIndex(final long sequence)
    {
        final int lowBits = (int) sequence;
        return lowBits & indexMask;
    }
  • 发布时,更新标示putOrderedInt
  • 判断可用时,getIntVolatile

五、SequenceGroups

SequenceGroups是一个帮助类,提供了addSequences和removeSequence两个方法,二者都通过CAS(compareAndSet)保证更新的原子性。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/**
 * Provides static methods for managing a {@link SequenceGroup} object.
 */
class SequenceGroups
{
    /**
     * Appends sequences to the array held in a field of {@code holder}, retrying the
     * compare-and-set until it succeeds.  Each added sequence is set to the cursor value
     * both before the array is installed and once more afterwards.
     *
     * @param holder         the object that owns the array field
     * @param updater        the updater used to read and swap the array field
     * @param cursor         the source of the current cursor value
     * @param sequencesToAdd the sequences to append
     * @param <T>            the type of the holder
     */
    static <T> void addSequences(
        final T holder,
        final AtomicReferenceFieldUpdater<T, Sequence[]> updater,
        final Cursored cursor,
        final Sequence... sequencesToAdd)
    {
        Sequence[] before;
        Sequence[] after;

        do
        {
            before = updater.get(holder);
            after = copyOf(before, before.length + sequencesToAdd.length);
            final long cursorAtCopy = cursor.getCursor();

            for (int i = 0; i < sequencesToAdd.length; i++)
            {
                final Sequence added = sequencesToAdd[i];
                added.set(cursorAtCopy);
                after[before.length + i] = added;
            }
        }
        while (!updater.compareAndSet(holder, before, after));

        // Re-read the cursor after the swap and apply it to the added sequences again.
        final long cursorAfterInstall = cursor.getCursor();
        for (Sequence added : sequencesToAdd)
        {
            added.set(cursorAfterInstall);
        }
    }

    /**
     * Removes every occurrence (by identity) of {@code sequence} from the array held in a
     * field of {@code holder}, retrying the compare-and-set until it succeeds.
     *
     * @param holder          the object that owns the array field
     * @param sequenceUpdater the updater used to read and swap the array field
     * @param sequence        the sequence to remove
     * @param <T>             the type of the holder
     * @return true if at least one occurrence was removed, false if none was found
     */
    static <T> boolean removeSequence(
        final T holder,
        final AtomicReferenceFieldUpdater<T, Sequence[]> sequenceUpdater,
        final Sequence sequence)
    {
        while (true)
        {
            final Sequence[] before = sequenceUpdater.get(holder);
            final int matches = countMatching(before, sequence);

            if (matches == 0)
            {
                return false;
            }

            final Sequence[] after = new Sequence[before.length - matches];
            int pos = 0;
            for (Sequence candidate : before)
            {
                if (candidate != sequence)
                {
                    after[pos++] = candidate;
                }
            }

            if (sequenceUpdater.compareAndSet(holder, before, after))
            {
                return true;
            }
        }
    }

    /**
     * Counts the elements of {@code values} that are the same object as {@code toMatch}.
     */
    private static <T> int countMatching(T[] values, final T toMatch)
    {
        int matches = 0;
        for (int i = 0; i < values.length; i++)
        {
            if (values[i] == toMatch) // Specifically uses identity
            {
                matches++;
            }
        }
        return matches;
    }
}

六、EventPoller

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
/**
 * Experimental poll-based interface for the Disruptor.
 *
 * @param <T> the event type
 */
public class EventPoller<T>
{
    private final DataProvider<T> dataProvider;
    private final Sequencer sequencer;
    private final Sequence sequence;
    private final Sequence gatingSequence;

    /**
     * Callback invoked by {@link #poll(Handler)} for each available event.
     *
     * @param <T> the event type
     */
    public interface Handler<T>
    {
        /**
         * @param event      the event to handle
         * @param sequence   the sequence of the event
         * @param endOfBatch true if this is the last event available in this poll
         * @return true to continue with the next available event, false to stop
         * @throws Exception if handling fails
         */
        boolean onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
    }

    /**
     * Outcome of a call to {@link #poll(Handler)}.
     */
    public enum PollState
    {
        PROCESSING, GATING, IDLE
    }

    /**
     * @param dataProvider   the source of events
     * @param sequencer      the sequencer consulted for published sequences and the cursor
     * @param sequence       this poller's own progress sequence
     * @param gatingSequence the sequence this poller must not read beyond
     */
    public EventPoller(
        final DataProvider<T> dataProvider,
        final Sequencer sequencer,
        final Sequence sequence,
        final Sequence gatingSequence)
    {
        this.dataProvider = dataProvider;
        this.sequencer = sequencer;
        this.sequence = sequence;
        this.gatingSequence = gatingSequence;
    }

    /**
     * Hands available events to the handler, one at a time, until the handler returns
     * false or the available range is exhausted.  The poller's sequence is updated to the
     * last event passed to the handler, even if the handler throws.
     *
     * @param eventHandler the handler to invoke for each event
     * @return PROCESSING if at least one event was handed over; GATING if nothing was
     *         readable although the cursor has reached the next sequence; IDLE otherwise
     * @throws Exception whatever the handler throws
     */
    public PollState poll(final Handler<T> eventHandler) throws Exception
    {
        final long currentSequence = sequence.get();
        long nextSequence = currentSequence + 1;
        final long availableSequence = sequencer.getHighestPublishedSequence(nextSequence, gatingSequence.get());

        if (nextSequence > availableSequence)
        {
            // Nothing readable: tell apart "published but gated" from "nothing published".
            return sequencer.getCursor() >= nextSequence ? PollState.GATING : PollState.IDLE;
        }

        long processedSequence = currentSequence;
        try
        {
            boolean keepGoing;
            do
            {
                final T event = dataProvider.get(nextSequence);
                final boolean endOfBatch = nextSequence == availableSequence;
                keepGoing = eventHandler.onEvent(event, nextSequence, endOfBatch);
                processedSequence = nextSequence;
                nextSequence++;
            }
            while (keepGoing && nextSequence <= availableSequence);
        }
        finally
        {
            sequence.set(processedSequence);
        }

        return PollState.PROCESSING;
    }

    /**
     * Creates a poller, choosing its gating sequence from the supplied sequences: the
     * cursor when none are given, the single sequence when exactly one is given, and a
     * {@code FixedSequenceGroup} over all of them otherwise.
     *
     * @param dataProvider    the source of events
     * @param sequencer       the sequencer to consult
     * @param sequence        the poller's own progress sequence
     * @param cursorSequence  the cursor, used for gating when no gating sequences are given
     * @param gatingSequences the sequences to gate on
     * @param <T>             the event type
     * @return the new poller
     */
    public static <T> EventPoller<T> newInstance(
        final DataProvider<T> dataProvider,
        final Sequencer sequencer,
        final Sequence sequence,
        final Sequence cursorSequence,
        final Sequence... gatingSequences)
    {
        final Sequence gatingSequence;
        switch (gatingSequences.length)
        {
            case 0:
                gatingSequence = cursorSequence;
                break;
            case 1:
                gatingSequence = gatingSequences[0];
                break;
            default:
                gatingSequence = new FixedSequenceGroup(gatingSequences);
                break;
        }

        return new EventPoller<T>(dataProvider, sequencer, sequence, gatingSequence);
    }

    /**
     * @return this poller's progress sequence
     */
    public Sequence getSequence()
    {
        return sequence;
    }
}