Sequence源码分析

简介

Disruptor使用序列作为一种手段来确定特定组件的位置。每个消费者(EventProcessor)都像Disruptor本身一样维护一个Sequence。大部分并发代码依赖于这些Sequence值 的移动,因此Sequence支持AtomicLong的许多当前特性。事实上,2之间唯一真正的区别是序列包含额外的功能,以防止序列和其他值之间的错误共享(CPU缓存伪共享(Flase Sharing)问题)。

源码

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
/*
 * Copyright 2012 LMAX Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.lmax.disruptor;

import sun.misc.Unsafe;

import com.lmax.disruptor.util.Util;

// 左边缓存行填充
class LhsPadding
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}
 // 序列
class Value extends LhsPadding
{
    protected volatile long value;
}

// 右边缓存行填充
class RhsPadding extends Value
{
    protected long p9, p10, p11, p12, p13, p14, p15;
}

/**
 * <p>Concurrent sequence class used for tracking the progress of
 * the ring buffer and event processors.  Support a number
 * of concurrent operations including CAS and order writes.
 * 同步序列类用于跟踪环形缓冲区和事件处理器的进度。 支持一些并发操作,包括CAS和顺序写入。
 * <p>Also attempts to be more efficient with regards to false
 * sharing by adding padding around the volatile field.
 */
public class Sequence extends RhsPadding
{
    static final long INITIAL_VALUE = -1L;
    private static final Unsafe UNSAFE;
    private static final long VALUE_OFFSET;

    static
    {
        UNSAFE = Util.getUnsafe();
        try
        {
            VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
        }
        catch (final Exception e)
        {
            throw new RuntimeException(e);
        }
    }

    /**
     * Create a sequence initialised to -1.
     */
    public Sequence()
    {
        this(INITIAL_VALUE);
    }

    /**
     * Create a sequence with a specified initial value.
     * 创建具有指定初始值的序列。
     *
     * @param initialValue The initial value for this sequence.
     */
    public Sequence(final long initialValue)
    {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
    }

    /**
     * Perform a volatile read of this sequence's value.
     * 对这个序列的值执行一个不稳定的读取。
     * @return The current value of the sequence.
     */
    public long get()
    {
        return value;
    }

    /**
     * Perform an ordered write of this sequence.  The intent is
     * a Store/Store barrier between this write and any previous
     * store.
     * 执行这个序列的有序写。意图是在此写入和任何之前的存储之间的Store/Store屏障。
     * @param value The new value for the sequence.
     */
    public void set(final long value)
    {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
    }

    /**
     * Performs a volatile write of this sequence.  The intent is
     * a Store/Store barrier between this write and any previous
     * write and a Store/Load barrier between this write and any
     * subsequent volatile read.
     * 
     * 执行这个序列的volatile的写。意图是在此写入和之前的任何写入之间的Store/Store屏障,以及在此写入和随后的volatile读取之间的Store/Load屏障。
     *
     * @param value The new value for the sequence.
     */
    public void setVolatile(final long value)
    {
        UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
    }

    /**
     * Perform a compare and set operation on the sequence.
     * CAS修改序列
     * @param expectedValue The expected current value.
     * @param newValue The value to update to.
     * @return true if the operation succeeds, false otherwise.
     */
    public boolean compareAndSet(final long expectedValue, final long newValue)
    {
        return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
    }

    /**
     * Atomically increment the sequence by one.
     * 递增
     * @return The value after the increment
     */
    public long incrementAndGet()
    {
        return addAndGet(1L);
    }

    /**
     * Atomically add the supplied value.
     * 原子地添加所提供的值。
     * @param increment The value to add to the sequence.
     * @return The value after the increment.
     */
    public long addAndGet(final long increment)
    {
        long currentValue;
        long newValue;

        do
        {
            currentValue = get();
            newValue = currentValue + increment;
        }
        while (!compareAndSet(currentValue, newValue));

        return newValue;
    }

    @Override
    public String toString()
    {
        return Long.toString(get());
    }
}


实现缓存行填充

LhsPadding[56字节]+RhsPadding[56字节]+Value[8字节]=120字节,在加上对象Sequence[8字节]=128字节。现在处理器的缓存行最大128字节。因此没有伪共享, 就没有和其它任何变量的意外冲突,没有不必要的缓存未命中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
  // 左边缓存行填充
  class LhsPadding
  {
      protected long p1, p2, p3, p4, p5, p6, p7;
  }
   // 序列
  class Value extends LhsPadding
  {
      protected volatile long value;
  }
  
  // 右边缓存行填充
  class RhsPadding extends Value
  {
      protected long p9, p10, p11, p12, p13, p14, p15;
  }

Sequence的无限增长超过Long.MAX_VALUE=(2«63)-1怎么办?

从代码看,为了简化seqence的比较(头尾的判断逻辑),一直递增,总会超过Long.MAX_VALUE,后续会出现负数,进而出现死循环代码。可以靠定期重启系统来解决。

Martin Flower的解释:This does mean that if they process a billion transactionsper second the counter will wrap in 292 years, causing some hell to breakloose. They have decided that fixing this is not a high priority.