RingBuffer分析

简介

环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件) 进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。

一、RingBuffer接口实现

RingBuffer实现了一系列接口,Cursored、EventSequencer和EventSink。Cursored接口在Sequencer分析具体说明。

1.EventSequencer接口

2.EventSink接口说明

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
public interface EventSink<E>
{
    /**
     * Publishes an event to the ring buffer.  It handles
     * claiming the next sequence, getting the current (uninitialised)
     * event from the ring buffer and publishing the claimed sequence
     * after translation.
     * 
     * 将事件发布到环形缓冲区。它处理声明下一个序列,从环形缓冲区获取当前(未初始化)的事件,并在translation后发布声明的序列
     * @param translator The user specified translation for the event
     */
    void publishEvent(EventTranslator<E> translator);

    /**
     * Attempts to publish an event to the ring buffer.  It handles
     * claiming the next sequence, getting the current (uninitialised)
     * event from the ring buffer and publishing the claimed sequence
     * after translation.  Will return false if specified capacity
     * was not available.
     *
     * 尝试将事件发布到循环缓冲区。它处理声明的下一个序列,从环形缓冲区获取当前(未初始化)事件,并在translator后发布声明的序列。如果没有指定的容量,将返回false。
     * 
     * @param translator The user specified translation for the event
     * @return true if the value was published, false if there was insufficient
     * capacity.
     */
    boolean tryPublishEvent(EventTranslator<E> translator);

    /**
     * Allows one user supplied argument.
     *
     * 允许一个用户提供参数。
     * @param <A> Class of the user supplied argument
     * @param translator The user specified translation for the event
     * @param arg0       A user supplied argument.
     * @see #publishEvent(EventTranslator)
     */
    <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0);

    /**
     * Allows one user supplied argument.
     * 允许一个用户提供参数。
     * @param <A> Class of the user supplied argument
     * @param translator The user specified translation for the event
     * @param arg0       A user supplied argument.
     * @return true if the value was published, false if there was insufficient
     * capacity.
     * @see #tryPublishEvent(EventTranslator)
     */
    <A> boolean tryPublishEvent(EventTranslatorOneArg<E, A> translator, A arg0);

    /**
     * Allows two user supplied arguments.
     *  允许两个用户提供参数。
     * @param <A> Class of the user supplied argument
     * @param <B> Class of the user supplied argument
     * @param translator The user specified translation for the event
     * @param arg0       A user supplied argument.
     * @param arg1       A user supplied argument.
     * @see #publishEvent(EventTranslator)
     */
    <A, B> void publishEvent(EventTranslatorTwoArg<E, A, B> translator, A arg0, B arg1);

    /**
     * Allows two user supplied arguments.
     * 允许两个用户提供参数。
     * @param <A> Class of the user supplied argument
     * @param <B> Class of the user supplied argument
     * @param translator The user specified translation for the event
     * @param arg0       A user supplied argument.
     * @param arg1       A user supplied argument.
     * @return true if the value was published, false if there was insufficient
     * capacity.
     * @see #tryPublishEvent(EventTranslator)
     */
    <A, B> boolean tryPublishEvent(EventTranslatorTwoArg<E, A, B> translator, A arg0, B arg1);

    /**
     * Allows three user supplied arguments
     * 允许三个用户提供参数
     * @param <A> Class of the user supplied argument
     * @param <B> Class of the user supplied argument
     * @param <C> Class of the user supplied argument
     * @param translator The user specified translation for the event
     * @param arg0       A user supplied argument.
     * @param arg1       A user supplied argument.
     * @param arg2       A user supplied argument.
     * @see #publishEvent(EventTranslator)
     */
    <A, B, C> void publishEvent(EventTranslatorThreeArg<E, A, B, C> translator, A arg0, B arg1, C arg2);

    /**
     * Allows three user supplied arguments
     * 允许三个用户提供参数
     * @param <A> Class of the user supplied argument
     * @param <B> Class of the user supplied argument
     * @param <C> Class of the user supplied argument
     * @param translator The user specified translation for the event
     * @param arg0       A user supplied argument.
     * @param arg1       A user supplied argument.
     * @param arg2       A user supplied argument.
     * @return true if the value was published, false if there was insufficient
     * capacity.
     * @see #publishEvent(EventTranslator)
     */
    <A, B, C> boolean tryPublishEvent(EventTranslatorThreeArg<E, A, B, C> translator, A arg0, B arg1, C arg2);

    /**
     * Allows a variable number of user supplied arguments
     * 允许可变数量的用户提供参数
     * @param translator The user specified translation for the event
     * @param args       User supplied arguments.
     * @see #publishEvent(EventTranslator)
     */
    void publishEvent(EventTranslatorVararg<E> translator, Object... args);

    /**
     * Allows a variable number of user supplied arguments
     * 允许可变数量的用户提供参数
     * @param translator The user specified translation for the event
     * @param args       User supplied arguments.
     * @return true if the value was published, false if there was insufficient
     * capacity.
     * @see #publishEvent(EventTranslator)
     */
    boolean tryPublishEvent(EventTranslatorVararg<E> translator, Object... args);

    /**
     * <p>Publishes multiple events to the ring buffer.  It handles
     * claiming the next sequence, getting the current (uninitialised)
     * event from the ring buffer and publishing the claimed sequence
     * after translation.</p>
     * 
     * 将多个事件发布到循环缓冲区。它处理声明的下一个序列,从环形缓冲区获取当前(未初始化)事件,并在translators后发布声明的序列。
     * 
     * <p>With this call the data that is to be inserted into the ring
     * buffer will be a field (either explicitly or captured anonymously),
     * therefore this call will require an instance of the translator
     * for each value that is to be inserted into the ring buffer.</p>
     *
     * @param translators The user specified translation for each event
     */
    void publishEvents(EventTranslator<E>[] translators);

    /**
     * <p>Publishes multiple events to the ring buffer.  It handles
     * claiming the next sequence, getting the current (uninitialised)
     * event from the ring buffer and publishing the claimed sequence
     * after translation.</p>
     * 将多个事件发布到循环缓冲区。它处理声明的下一个序列,从环形缓冲区获取当前(未初始化)事件,并在translators后发布声明的序列。
     * <p>With this call the data that is to be inserted into the ring
     * buffer will be a field (either explicitly or captured anonymously),
     * therefore this call will require an instance of the translator
     * for each value that is to be inserted into the ring buffer.</p>
     *
     * @param translators   The user specified translation for each event
     * @param batchStartsAt The first element of the array which is within the batch.
     * @param batchSize     The actual size of the batch
     */
    void publishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize);

    /**
     * Attempts to publish multiple events to the ring buffer.  It handles
     * claiming the next sequence, getting the current (uninitialised)
     * event from the ring buffer and publishing the claimed sequence
     * after translation.  Will return false if specified capacity
     * was not available.
     * 尝试将多个事件发布到环形缓冲区。它处理声明的下一个序列,从环形缓冲区获取当前(未初始化)事件,并在translators后发布声明的序列。如果没有指定的容量,将返回false。
     * @param translators The user specified translation for the event
     * @return true if the value was published, false if there was insufficient
     * capacity.
     */
    boolean tryPublishEvents(EventTranslator<E>[] translators);

    /**
     * Attempts to publish multiple events to the ring buffer.  It handles
     * claiming the next sequence, getting the current (uninitialised)
     * event from the ring buffer and publishing the claimed sequence
     * after translation.  Will return false if specified capacity
     * was not available.
     * 尝试将多个事件发布到环形缓冲区。它处理声明的下一个序列,从环形缓冲区获取当前(未初始化)事件,并在translators后发布声明的序列。如果没有指定的容量,将返回false。
     * @param translators   The user specified translation for the event
     * @param batchStartsAt The first element of the array which is within the batch.
     * @param batchSize     The actual size of the batch
     * @return true if all the values were published, false if there was insufficient
     * capacity.
     */
    boolean tryPublishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize);

    /**
     * Allows one user supplied argument per event.
     * 允许一个用户为每个事件提供参数。
     * @param <A> Class of the user supplied argument
     * @param translator The user specified translation for the event
     * @param arg0       A user supplied argument.
     * @see #publishEvents(com.lmax.disruptor.EventTranslator[])
     */
    <A> void publishEvents(EventTranslatorOneArg<E, A> translator, A[] arg0);

    /**
     * Allows one user supplied argument per event.
     * 允许一个用户为每个事件提供参数。
     * @param <A> Class of the user supplied argument
     * @param translator    The user specified translation for each event
     * @param batchStartsAt The first element of the array which is within the batch.
     * @param batchSize     The actual size of the batch
     * @param arg0          An array of user supplied arguments, one element per event.
     * @see #publishEvents(EventTranslator[])
     */
    <A> void publishEvents(EventTranslatorOneArg<E, A> translator, int batchStartsAt, int batchSize, A[] arg0);

    /**
     * Allows one user supplied argument.
     * 允许一个用户提供参数。
     * @param <A> Class of the user supplied argument
     * @param translator The user specified translation for each event
     * @param arg0       An array of user supplied arguments, one element per event.
     * @return true if the value was published, false if there was insufficient
     * capacity.
     * @see #tryPublishEvents(com.lmax.disruptor.EventTranslator[])
     */
    <A> boolean tryPublishEvents(EventTranslatorOneArg<E, A> translator, A[] arg0);

    /**
     * Allows one user supplied argument.
     *  允许一个用户提供参数。
     * @param <A> Class of the user supplied argument
     * @param translator    The user specified translation for each event
     * @param batchStartsAt The first element of the array which is within the batch.
     * @param batchSize     The actual size of the batch
     * @param arg0          An array of user supplied arguments, one element per event.
     * @return true if the value was published, false if there was insufficient
     * capacity.
     * @see #tryPublishEvents(EventTranslator[])
     */
    <A> boolean tryPublishEvents(EventTranslatorOneArg<E, A> translator, int batchStartsAt, int batchSize, A[] arg0);

    /**
     * Allows two user supplied arguments per event.
     * 允许两个用户为每个事件提供参数。
     * @param <A> Class of the user supplied argument
     * @param <B> Class of the user supplied argument
     * @param translator The user specified translation for the event
     * @param arg0       An array of user supplied arguments, one element per event.
     * @param arg1       An array of user supplied arguments, one element per event.
     * @see #publishEvents(com.lmax.disruptor.EventTranslator[])
     */
    <A, B> void publishEvents(EventTranslatorTwoArg<E, A, B> translator, A[] arg0, B[] arg1);

    /**
     * Allows two user supplied arguments per event.
     * 允许两个用户为每个事件提供参数。
     * @param <A> Class of the user supplied argument
     * @param <B> Class of the user supplied argument
     * @param translator    The user specified translation for the event
     * @param batchStartsAt The first element of the array which is within the batch.
     * @param batchSize     The actual size of the batch.
     * @param arg0          An array of user supplied arguments, one element per event.
     * @param arg1          An array of user supplied arguments, one element per event.
     * @see #publishEvents(EventTranslator[])
     */
    <A, B> void publishEvents(
        EventTranslatorTwoArg<E, A, B> translator, int batchStartsAt, int batchSize, A[] arg0,
        B[] arg1);

    /**
     * Allows two user supplied arguments per event.
     * 允许两个用户为每个事件提供参数。
     * @param <A> Class of the user supplied argument
     * @param <B> Class of the user supplied argument
     * @param translator The user specified translation for the event
     * @param arg0       An array of user supplied arguments, one element per event.
     * @param arg1       An array of user supplied arguments, one element per event.
     * @return true if the value was published, false if there was insufficient
     * capacity.
     * @see #tryPublishEvents(com.lmax.disruptor.EventTranslator[])
     */
    <A, B> boolean tryPublishEvents(EventTranslatorTwoArg<E, A, B> translator, A[] arg0, B[] arg1);

    /**
     * Allows two user supplied arguments per event.
     * 允许两个用户为每个事件提供参数。
     * @param <A> Class of the user supplied argument
     * @param <B> Class of the user supplied argument
     * @param translator    The user specified translation for the event
     * @param batchStartsAt The first element of the array which is within the batch.
     * @param batchSize     The actual size of the batch.
     * @param arg0          An array of user supplied arguments, one element per event.
     * @param arg1          An array of user supplied arguments, one element per event.
     * @return true if the value was published, false if there was insufficient
     * capacity.
     * @see #tryPublishEvents(EventTranslator[])
     */
    <A, B> boolean tryPublishEvents(
        EventTranslatorTwoArg<E, A, B> translator, int batchStartsAt, int batchSize,
        A[] arg0, B[] arg1);

    /**
     * Allows three user supplied arguments per event.
     * 允许三个用户为每个事件提供参数。
     * @param <A> Class of the user supplied argument
     * @param <B> Class of the user supplied argument
     * @param <C> Class of the user supplied argument
     * @param translator The user specified translation for the event
     * @param arg0       An array of user supplied arguments, one element per event.
     * @param arg1       An array of user supplied arguments, one element per event.
     * @param arg2       An array of user supplied arguments, one element per event.
     * @see #publishEvents(com.lmax.disruptor.EventTranslator[])
     */
    <A, B, C> void publishEvents(EventTranslatorThreeArg<E, A, B, C> translator, A[] arg0, B[] arg1, C[] arg2);

    /**
     * Allows three user supplied arguments per event.
     * 允许三个用户为每个事件提供参数。
     * @param <A> Class of the user supplied argument
     * @param <B> Class of the user supplied argument
     * @param <C> Class of the user supplied argument
     * @param translator    The user specified translation for the event
     * @param batchStartsAt The first element of the array which is within the batch.
     * @param batchSize     The number of elements in the batch.
     * @param arg0          An array of user supplied arguments, one element per event.
     * @param arg1          An array of user supplied arguments, one element per event.
     * @param arg2          An array of user supplied arguments, one element per event.
     * @see #publishEvents(EventTranslator[])
     */
    <A, B, C> void publishEvents(
        EventTranslatorThreeArg<E, A, B, C> translator, int batchStartsAt, int batchSize,
        A[] arg0, B[] arg1, C[] arg2);

    /**
     * Allows three user supplied arguments per event.
     * 允许三个用户为每个事件提供参数。
     * @param <A> Class of the user supplied argument
     * @param <B> Class of the user supplied argument
     * @param <C> Class of the user supplied argument
     * @param translator The user specified translation for the event
     * @param arg0       An array of user supplied arguments, one element per event.
     * @param arg1       An array of user supplied arguments, one element per event.
     * @param arg2       An array of user supplied arguments, one element per event.
     * @return true if the value was published, false if there was insufficient
     * capacity.
     * @see #publishEvents(com.lmax.disruptor.EventTranslator[])
     */
    <A, B, C> boolean tryPublishEvents(EventTranslatorThreeArg<E, A, B, C> translator, A[] arg0, B[] arg1, C[] arg2);

    /**
     * Allows three user supplied arguments per event.
     * 允许三个用户为每个事件提供参数。
     * @param <A> Class of the user supplied argument
     * @param <B> Class of the user supplied argument
     * @param <C> Class of the user supplied argument
     * @param translator    The user specified translation for the event
     * @param batchStartsAt The first element of the array which is within the batch.
     * @param batchSize     The actual size of the batch.
     * @param arg0          An array of user supplied arguments, one element per event.
     * @param arg1          An array of user supplied arguments, one element per event.
     * @param arg2          An array of user supplied arguments, one element per event.
     * @return true if the value was published, false if there was insufficient
     * capacity.
     * @see #publishEvents(EventTranslator[])
     */
    <A, B, C> boolean tryPublishEvents(
        EventTranslatorThreeArg<E, A, B, C> translator, int batchStartsAt,
        int batchSize, A[] arg0, B[] arg1, C[] arg2);

    /**
     * Allows a variable number of user supplied arguments per event.
     * 允许变量数量的用户为每个事件提供参数。
     * @param translator The user specified translation for the event
     * @param args       User supplied arguments, one Object[] per event.
     * @see #publishEvents(com.lmax.disruptor.EventTranslator[])
     */
    void publishEvents(EventTranslatorVararg<E> translator, Object[]... args);

    /**
     * Allows a variable number of user supplied arguments per event.
     * 允许变量数量的用户为每个事件提供参数。
     * @param translator    The user specified translation for the event
     * @param batchStartsAt The first element of the array which is within the batch.
     * @param batchSize     The actual size of the batch
     * @param args          User supplied arguments, one Object[] per event.
     * @see #publishEvents(EventTranslator[])
     */
    void publishEvents(EventTranslatorVararg<E> translator, int batchStartsAt, int batchSize, Object[]... args);

    /**
     * Allows a variable number of user supplied arguments per event.
     * 允许变量数量的用户为每个事件提供参数。
     * @param translator The user specified translation for the event
     * @param args       User supplied arguments, one Object[] per event.
     * @return true if the value was published, false if there was insufficient
     * capacity.
     * @see #publishEvents(com.lmax.disruptor.EventTranslator[])
     */
    boolean tryPublishEvents(EventTranslatorVararg<E> translator, Object[]... args);

    /**
     * Allows a variable number of user supplied arguments per event.
     * 允许变量数量的用户为每个事件提供参数。
     * @param translator    The user specified translation for the event
     * @param batchStartsAt The first element of the array which is within the batch.
     * @param batchSize     The actual size of the batch.
     * @param args          User supplied arguments, one Object[] per event.
     * @return true if the value was published, false if there was insufficient
     * capacity.
     * @see #publishEvents(EventTranslator[])
     */
    boolean tryPublishEvents(EventTranslatorVararg<E> translator, int batchStartsAt, int batchSize, Object[]... args);

}

二、RingBuffer结构实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
//缓存行填充56字节
abstract class RingBufferPad
{
   protected long p1, p2, p3, p4, p5, p6, p7;
}

abstract class RingBufferFields<E> extends RingBufferPad
{
   private static final int BUFFER_PAD;      //缓冲填充
   private static final long REF_ARRAY_BASE; //数组偏移量
   private static final int REF_ELEMENT_SHIFT; //数组元素下标位移值
   private static final Unsafe UNSAFE = Util.getUnsafe();

   static
   {
       final int scale = UNSAFE.arrayIndexScale(Object[].class);
       if (4 == scale)
       {
           REF_ELEMENT_SHIFT = 2;
       }
       else if (8 == scale)
       {
           REF_ELEMENT_SHIFT = 3;
       }
       else
       {
           throw new IllegalStateException("Unknown pointer size");
       }
       BUFFER_PAD = 128 / scale;
       // Including the buffer pad in the array base offset 包括数组中的缓冲填充偏移
       // (BUFFER_PAD << REF_ELEMENT_SHIFT)=BUFFER_PAD*scale 从数组下标为BUFFER_PAD计算
       REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);
   }

   private final long indexMask;   
   private final Object[] entries; // 保存数组
   protected final int bufferSize; //大小
   protected final Sequencer sequencer; // 序列

   RingBufferFields(
       EventFactory<E> eventFactory,
       Sequencer sequencer)
   {
       this.sequencer = sequencer;
       this.bufferSize = sequencer.getBufferSize();

       if (bufferSize < 1)
       {
           throw new IllegalArgumentException("bufferSize must not be less than 1");
       }
       if (Integer.bitCount(bufferSize) != 1) //缓冲区大小必须是2的幂
       {
           throw new IllegalArgumentException("bufferSize must be a power of 2");
       }

       this.indexMask = bufferSize - 1;
       this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
       fill(eventFactory);
   }
   // 初始化数组
   private void fill(EventFactory<E> eventFactory)
   {
       for (int i = 0; i < bufferSize; i++)
       {
           entries[BUFFER_PAD + i] = eventFactory.newInstance();
       }
   }

   @SuppressWarnings("unchecked")
   protected final E elementAt(long sequence)
   {   // (sequence & indexMask) 获取下标 ((sequence & indexMask) << REF_ELEMENT_SHIFT) 获取数组下标偏移量 
       return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
   }
}

/**
* Ring based store of reusable entries containing the data representing
* an event being exchanged between event producer and {@link EventProcessor}s.
*
* @param <E> implementation storing the data for sharing during exchange or parallel coordination of an event.
*/
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
{
   public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
   protected long p1, p2, p3, p4, p5, p6, p7;

   /**
    * Construct a RingBuffer with the full option set.
    *
    * @param eventFactory to newInstance entries for filling the RingBuffer
    * @param sequencer    sequencer to handle the ordering of events moving through the RingBuffer.
    * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
    */
   RingBuffer(
       EventFactory<E> eventFactory,
       Sequencer sequencer)
   {
       super(eventFactory, sequencer);
   }

   /**
    * Create a new multiple producer RingBuffer with the specified wait strategy.
    * 使用指定的等待策略创建一个新的多个生产者循环缓冲区。
    * @param <E> Class of the event stored in the ring buffer.
    * @param factory      used to create the events within the ring buffer.
    * @param bufferSize   number of elements to create within the ring buffer.
    * @param waitStrategy used to determine how to wait for new elements to become available.
    * @return a constructed ring buffer.
    * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
    * @see MultiProducerSequencer
    */
   public static <E> RingBuffer<E> createMultiProducer(
       EventFactory<E> factory,
       int bufferSize,
       WaitStrategy waitStrategy)
   {
       MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);

       return new RingBuffer<E>(factory, sequencer);
   }

   /**
    * Create a new multiple producer RingBuffer using the default wait strategy  {@link BlockingWaitStrategy}.
    * 使用默认的等待策略创建一个新的多个生产者循环缓冲区
    * @param <E> Class of the event stored in the ring buffer.
    * @param factory    used to create the events within the ring buffer.
    * @param bufferSize number of elements to create within the ring buffer.
    * @return a constructed ring buffer.
    * @throws IllegalArgumentException if <tt>bufferSize</tt> is less than 1 or not a power of 2
    * @see MultiProducerSequencer
    */
   public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize)
   {
       return createMultiProducer(factory, bufferSize, new BlockingWaitStrategy());
   }

   /**
    * Create a new single producer RingBuffer with the specified wait strategy.
    * 使用指定的等待策略创建新的单个生产者循环缓冲区。
    * @param <E> Class of the event stored in the ring buffer.
    * @param factory      used to create the events within the ring buffer.
    * @param bufferSize   number of elements to create within the ring buffer.
    * @param waitStrategy used to determine how to wait for new elements to become available.
    * @return a constructed ring buffer.
    * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
    * @see SingleProducerSequencer
    */
   public static <E> RingBuffer<E> createSingleProducer(
       EventFactory<E> factory,
       int bufferSize,
       WaitStrategy waitStrategy)
   {
       SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);

       return new RingBuffer<E>(factory, sequencer);
   }

   /**
    * Create a new single producer RingBuffer using the default wait strategy  {@link BlockingWaitStrategy}.
    * 使用默认的等待策略创建新的单个生产者循环缓冲区
    * @param <E> Class of the event stored in the ring buffer.
    * @param factory    used to create the events within the ring buffer.
    * @param bufferSize number of elements to create within the ring buffer.
    * @return a constructed ring buffer.
    * @throws IllegalArgumentException if <tt>bufferSize</tt> is less than 1 or not a power of 2
    * @see MultiProducerSequencer
    */
   public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize)
   {
       return createSingleProducer(factory, bufferSize, new BlockingWaitStrategy());
   }

   /**
    * Create a new Ring Buffer with the specified producer type (SINGLE or MULTI)
    * 使用指定的生产者类型(SINGLE或MULTI)创建一个新的环形缓冲区
    * @param <E> Class of the event stored in the ring buffer.
    * @param producerType producer type to use {@link ProducerType}.
    * @param factory      used to create events within the ring buffer.
    * @param bufferSize   number of elements to create within the ring buffer.
    * @param waitStrategy used to determine how to wait for new elements to become available.
    * @return a constructed ring buffer.
    * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
    */
   public static <E> RingBuffer<E> create(
       ProducerType producerType,
       EventFactory<E> factory,
       int bufferSize,
       WaitStrategy waitStrategy)
   {
       switch (producerType)
       {
           case SINGLE:
               return createSingleProducer(factory, bufferSize, waitStrategy);
           case MULTI:
               return createMultiProducer(factory, bufferSize, waitStrategy);
           default:
               throw new IllegalStateException(producerType.toString());
       }
   }
}

三、Cursored接口实现方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
    /**
     * Get the current cursor value for the ring buffer.  The actual value received
     * will depend on the type of {@link Sequencer} that is being used.
     *
     * @see MultiProducerSequencer
     * @see SingleProducerSequencer
     */
    @Override
    public long getCursor()
    {
        return sequencer.getCursor();
    }

四、EventSequencer接口实现方法

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
 /**
     * <p>Get the event for a given sequence in the RingBuffer.</p>
     *
     * <p>This call has 2 uses.  Firstly use this call when publishing to a ring buffer.
     * After calling {@link RingBuffer#next()} use this call to get hold of the
     * preallocated event to fill with data before calling {@link RingBuffer#publish(long)}.</p>
     *
     * <p>Secondly use this call when consuming data from the ring buffer.  After calling
     * {@link SequenceBarrier#waitFor(long)} call this method with any value greater than
     * that your current consumer sequence and less than or equal to the value returned from
     * the {@link SequenceBarrier#waitFor(long)} method.</p>
     *
     * @param sequence for the event
     * @return the event for the given sequence
     */
    @Override
    public E get(long sequence)
    {
        return elementAt(sequence);
    }

    /**
     * Increment and return the next sequence for the ring buffer.  Calls of this
     * method should ensure that they always publish the sequence afterward.  E.g.
     * <pre>
     * long sequence = ringBuffer.next();
     * try {
     *     Event e = ringBuffer.get(sequence);
     *     // Do some work with the event.
     * } finally {
     *     ringBuffer.publish(sequence);
     * }
     * </pre>
     *
     * @return The next sequence to publish to.
     * @see RingBuffer#publish(long)
     * @see RingBuffer#get(long)
     */
    @Override
    public long next()
    {
        return sequencer.next();
    }

    /**
     * The same functionality as {@link RingBuffer#next()}, but allows the caller to claim
     * the next n sequences.
     *
     * @param n number of slots to claim
     * @return sequence number of the highest slot claimed
     * @see Sequencer#next(int)
     */
    @Override
    public long next(int n)
    {
        return sequencer.next(n);
    }

    /**
     * <p>Increment and return the next sequence for the ring buffer.  Calls of this
     * method should ensure that they always publish the sequence afterward.  E.g.</p>
     * <pre>
     * long sequence = ringBuffer.next();
     * try {
     *     Event e = ringBuffer.get(sequence);
     *     // Do some work with the event.
     * } finally {
     *     ringBuffer.publish(sequence);
     * }
     * </pre>
     * <p>This method will not block if there is not space available in the ring
     * buffer, instead it will throw an {@link InsufficientCapacityException}.</p>
     *
     * @return The next sequence to publish to.
     * @throws InsufficientCapacityException if the necessary space in the ring buffer is not available
     * @see RingBuffer#publish(long)
     * @see RingBuffer#get(long)
     */
    @Override
    public long tryNext() throws InsufficientCapacityException
    {
        return sequencer.tryNext();
    }

    /**
     * The same functionality as {@link RingBuffer#tryNext()}, but allows the caller to attempt
     * to claim the next n sequences.
     *
     * @param n number of slots to claim
     * @return sequence number of the highest slot claimed
     * @throws InsufficientCapacityException if the necessary space in the ring buffer is not available
     */
    @Override
    public long tryNext(int n) throws InsufficientCapacityException
    {
        return sequencer.tryNext(n);
    }
     /**
        * Add the specified gating sequences to this instance of the Disruptor.  They will
        * safely and atomically added to the list of gating sequences.
        *
        * @param gatingSequences The sequences to add.
        */
       public void addGatingSequences(Sequence... gatingSequences)
       {
           sequencer.addGatingSequences(gatingSequences);
       }
   
       /**
        * Get the minimum sequence value from all of the gating sequences
        * added to this ringBuffer.
        *
        * @return The minimum gating sequence or the cursor sequence if
        * no sequences have been added.
        */
       public long getMinimumGatingSequence()
       {
           return sequencer.getMinimumSequence();
       }
   
       /**
        * Remove the specified sequence from this ringBuffer.
        *
        * @param sequence to be removed.
        * @return <tt>true</tt> if this sequence was found, <tt>false</tt> otherwise.
        */
       public boolean removeGatingSequence(Sequence sequence)
       {
           return sequencer.removeGatingSequence(sequence);
       }
   
       /**
        * Create a new SequenceBarrier to be used by an EventProcessor to track which messages
        * are available to be read from the ring buffer given a list of sequences to track.
        *
        * @param sequencesToTrack the additional sequences to track
        * @return A sequence barrier that will track the specified sequences.
        * @see SequenceBarrier
        */
       public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
       {
           return sequencer.newBarrier(sequencesToTrack);
       }
   
       /**
        * Creates an event poller for this ring buffer gated on the supplied sequences.
        *
        * @param gatingSequences to be gated on.
        * @return A poller that will gate on this ring buffer and the supplied sequences.
        */
       public EventPoller<E> newPoller(Sequence... gatingSequences)
       {
           return sequencer.newPoller(this, gatingSequences);
       }

四、EventSink接口实现方法

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664

  /**
   * @see com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslator)
   */
  @Override
  public void publishEvent(EventTranslator<E> translator)
  {
      final long sequence = sequencer.next();
      translateAndPublish(translator, sequence);
  }

  /**
   * @see com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslator)
   */
  @Override
  public boolean tryPublishEvent(EventTranslator<E> translator)
  {
      try
      {
          final long sequence = sequencer.tryNext();
          translateAndPublish(translator, sequence);
          return true;
      }
      catch (InsufficientCapacityException e)
      {
          return false;
      }
  }

  /**
   * @see com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorOneArg, Object)
   * com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorOneArg, A)
   */
  @Override
  public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0)
  {
      final long sequence = sequencer.next();
      translateAndPublish(translator, sequence, arg0);
  }

  /**
   * @see com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorOneArg, Object)
   * com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorOneArg, A)
   */
  @Override
  public <A> boolean tryPublishEvent(EventTranslatorOneArg<E, A> translator, A arg0)
  {
      try
      {
          final long sequence = sequencer.tryNext();
          translateAndPublish(translator, sequence, arg0);
          return true;
      }
      catch (InsufficientCapacityException e)
      {
          return false;
      }
  }

  /**
   * @see com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorTwoArg, Object, Object)
   * com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorTwoArg, A, B)
   */
  @Override
  public <A, B> void publishEvent(EventTranslatorTwoArg<E, A, B> translator, A arg0, B arg1)
  {
      final long sequence = sequencer.next();
      translateAndPublish(translator, sequence, arg0, arg1);
  }

  /**
   * @see com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorTwoArg, Object, Object)
   * com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorTwoArg, A, B)
   */
  @Override
  public <A, B> boolean tryPublishEvent(EventTranslatorTwoArg<E, A, B> translator, A arg0, B arg1)
  {
      try
      {
          final long sequence = sequencer.tryNext();
          translateAndPublish(translator, sequence, arg0, arg1);
          return true;
      }
      catch (InsufficientCapacityException e)
      {
          return false;
      }
  }

  /**
   * @see com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorThreeArg, Object, Object, Object)
   * com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorThreeArg, A, B, C)
   */
  @Override
  public <A, B, C> void publishEvent(EventTranslatorThreeArg<E, A, B, C> translator, A arg0, B arg1, C arg2)
  {
      final long sequence = sequencer.next();
      translateAndPublish(translator, sequence, arg0, arg1, arg2);
  }

  /**
   * @see com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorThreeArg, Object, Object, Object)
   * com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorThreeArg, A, B, C)
   */
  @Override
  public <A, B, C> boolean tryPublishEvent(EventTranslatorThreeArg<E, A, B, C> translator, A arg0, B arg1, C arg2)
  {
      try
      {
          final long sequence = sequencer.tryNext();
          translateAndPublish(translator, sequence, arg0, arg1, arg2);
          return true;
      }
      catch (InsufficientCapacityException e)
      {
          return false;
      }
  }

  /**
   * @see com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorVararg, java.lang.Object...)
   */
  @Override
  public void publishEvent(EventTranslatorVararg<E> translator, Object... args)
  {
      final long sequence = sequencer.next();
      translateAndPublish(translator, sequence, args);
  }

  /**
   * @see com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorVararg, java.lang.Object...)
   */
  @Override
  public boolean tryPublishEvent(EventTranslatorVararg<E> translator, Object... args)
  {
      try
      {
          final long sequence = sequencer.tryNext();
          translateAndPublish(translator, sequence, args);
          return true;
      }
      catch (InsufficientCapacityException e)
      {
          return false;
      }
  }


  /**
   * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslator[])
   */
  @Override
  public void publishEvents(EventTranslator<E>[] translators)
  {
      publishEvents(translators, 0, translators.length);
  }

  /**
   * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslator[], int, int)
   */
  @Override
  public void publishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize)
  {
      checkBounds(translators, batchStartsAt, batchSize);
      final long finalSequence = sequencer.next(batchSize);
      translateAndPublishBatch(translators, batchStartsAt, batchSize, finalSequence);
  }

  /**
   * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslator[])
   */
  @Override
  public boolean tryPublishEvents(EventTranslator<E>[] translators)
  {
      return tryPublishEvents(translators, 0, translators.length);
  }

  /**
   * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslator[], int, int)
   */
  @Override
  public boolean tryPublishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize)
  {
      checkBounds(translators, batchStartsAt, batchSize);
      try
      {
          final long finalSequence = sequencer.tryNext(batchSize);
          translateAndPublishBatch(translators, batchStartsAt, batchSize, finalSequence);
          return true;
      }
      catch (InsufficientCapacityException e)
      {
          return false;
      }
  }

  /**
   * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorOneArg, Object[])
   * com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorOneArg, A[])
   */
  @Override
  public <A> void publishEvents(EventTranslatorOneArg<E, A> translator, A[] arg0)
  {
      publishEvents(translator, 0, arg0.length, arg0);
  }

  /**
   * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorOneArg, int, int, Object[])
   * com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorOneArg, int, int, A[])
   */
  @Override
  public <A> void publishEvents(EventTranslatorOneArg<E, A> translator, int batchStartsAt, int batchSize, A[] arg0)
  {
      checkBounds(arg0, batchStartsAt, batchSize);
      final long finalSequence = sequencer.next(batchSize);
      translateAndPublishBatch(translator, arg0, batchStartsAt, batchSize, finalSequence);
  }

  /**
   * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorOneArg, Object[])
   * com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorOneArg, A[])
   */
  @Override
  public <A> boolean tryPublishEvents(EventTranslatorOneArg<E, A> translator, A[] arg0)
  {
      return tryPublishEvents(translator, 0, arg0.length, arg0);
  }

  /**
   * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorOneArg, int, int, Object[])
   * com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorOneArg, int, int, A[])
   */
  @Override
  public <A> boolean tryPublishEvents(
      EventTranslatorOneArg<E, A> translator, int batchStartsAt, int batchSize, A[] arg0)
  {
      checkBounds(arg0, batchStartsAt, batchSize);
      try
      {
          final long finalSequence = sequencer.tryNext(batchSize);
          translateAndPublishBatch(translator, arg0, batchStartsAt, batchSize, finalSequence);
          return true;
      }
      catch (InsufficientCapacityException e)
      {
          return false;
      }
  }

  /**
   * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorTwoArg, Object[], Object[])
   * com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorTwoArg, A[], B[])
   */
  @Override
  public <A, B> void publishEvents(EventTranslatorTwoArg<E, A, B> translator, A[] arg0, B[] arg1)
  {
      publishEvents(translator, 0, arg0.length, arg0, arg1);
  }

  /**
   * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorTwoArg, int, int, Object[], Object[])
   * com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorTwoArg, int, int, A[], B[])
   */
  @Override
  public <A, B> void publishEvents(
      EventTranslatorTwoArg<E, A, B> translator, int batchStartsAt, int batchSize, A[] arg0, B[] arg1)
  {
      checkBounds(arg0, arg1, batchStartsAt, batchSize);
      final long finalSequence = sequencer.next(batchSize);
      translateAndPublishBatch(translator, arg0, arg1, batchStartsAt, batchSize, finalSequence);
  }

  /**
   * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorTwoArg, Object[], Object[])
   * com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorTwoArg, A[], B[])
   */
  @Override
  public <A, B> boolean tryPublishEvents(EventTranslatorTwoArg<E, A, B> translator, A[] arg0, B[] arg1)
  {
      return tryPublishEvents(translator, 0, arg0.length, arg0, arg1);
  }

  /**
   * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorTwoArg, int, int, Object[], Object[])
   * com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorTwoArg, int, int, A[], B[])
   */
  @Override
  public <A, B> boolean tryPublishEvents(
      EventTranslatorTwoArg<E, A, B> translator, int batchStartsAt, int batchSize, A[] arg0, B[] arg1)
  {
      checkBounds(arg0, arg1, batchStartsAt, batchSize);
      try
      {
          final long finalSequence = sequencer.tryNext(batchSize);
          translateAndPublishBatch(translator, arg0, arg1, batchStartsAt, batchSize, finalSequence);
          return true;
      }
      catch (InsufficientCapacityException e)
      {
          return false;
      }
  }

  /**
   * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorThreeArg, Object[], Object[], Object[])
   * com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorThreeArg, A[], B[], C[])
   */
  @Override
  public <A, B, C> void publishEvents(EventTranslatorThreeArg<E, A, B, C> translator, A[] arg0, B[] arg1, C[] arg2)
  {
      publishEvents(translator, 0, arg0.length, arg0, arg1, arg2);
  }

  /**
   * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorThreeArg, int, int, Object[], Object[], Object[])
   * com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorThreeArg, int, int, A[], B[], C[])
   */
  @Override
  public <A, B, C> void publishEvents(
      EventTranslatorThreeArg<E, A, B, C> translator, int batchStartsAt, int batchSize, A[] arg0, B[] arg1, C[] arg2)
  {
      checkBounds(arg0, arg1, arg2, batchStartsAt, batchSize);
      final long finalSequence = sequencer.next(batchSize);
      translateAndPublishBatch(translator, arg0, arg1, arg2, batchStartsAt, batchSize, finalSequence);
  }

  /**
   * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorThreeArg, Object[], Object[], Object[])
   * com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorThreeArg, A[], B[], C[])
   */
  @Override
  public <A, B, C> boolean tryPublishEvents(
      EventTranslatorThreeArg<E, A, B, C> translator, A[] arg0, B[] arg1, C[] arg2)
  {
      return tryPublishEvents(translator, 0, arg0.length, arg0, arg1, arg2);
  }

  /**
   * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorThreeArg, int, int, Object[], Object[], Object[])
   * com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorThreeArg, int, int, A[], B[], C[])
   */
  @Override
  public <A, B, C> boolean tryPublishEvents(
      EventTranslatorThreeArg<E, A, B, C> translator, int batchStartsAt, int batchSize, A[] arg0, B[] arg1, C[] arg2)
  {
      checkBounds(arg0, arg1, arg2, batchStartsAt, batchSize);
      try
      {
          final long finalSequence = sequencer.tryNext(batchSize);
          translateAndPublishBatch(translator, arg0, arg1, arg2, batchStartsAt, batchSize, finalSequence);
          return true;
      }
      catch (InsufficientCapacityException e)
      {
          return false;
      }
  }

  /**
   * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorVararg, java.lang.Object[][])
   */
  @Override
  public void publishEvents(EventTranslatorVararg<E> translator, Object[]... args)
  {
      publishEvents(translator, 0, args.length, args);
  }

  /**
   * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorVararg, int, int, java.lang.Object[][])
   */
  @Override
  public void publishEvents(EventTranslatorVararg<E> translator, int batchStartsAt, int batchSize, Object[]... args)
  {
      checkBounds(batchStartsAt, batchSize, args);
      final long finalSequence = sequencer.next(batchSize);
      translateAndPublishBatch(translator, batchStartsAt, batchSize, finalSequence, args);
  }

  /**
   * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorVararg, java.lang.Object[][])
   */
  @Override
  public boolean tryPublishEvents(EventTranslatorVararg<E> translator, Object[]... args)
  {
      return tryPublishEvents(translator, 0, args.length, args);
  }

  /**
   * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorVararg, int, int, java.lang.Object[][])
   */
  @Override
  public boolean tryPublishEvents(
      EventTranslatorVararg<E> translator, int batchStartsAt, int batchSize, Object[]... args)
  {
      checkBounds(args, batchStartsAt, batchSize);
      try
      {
          final long finalSequence = sequencer.tryNext(batchSize);
          translateAndPublishBatch(translator, batchStartsAt, batchSize, finalSequence, args);
          return true;
      }
      catch (InsufficientCapacityException e)
      {
          return false;
      }
  }

  /**
   * Publish the specified sequence.  This action marks this particular
   * message as being available to be read.
   *
   * @param sequence the sequence to publish.
   */
  @Override
  public void publish(long sequence)
  {
      sequencer.publish(sequence);
  }

  /**
   * Publish the specified sequences.  This action marks these particular
   * messages as being available to be read.
   *
   * @param lo the lowest sequence number to be published
   * @param hi the highest sequence number to be published
   * @see Sequencer#next(int)
   */
  @Override
  public void publish(long lo, long hi)
  {
      sequencer.publish(lo, hi);
  }

  /**
   * Get the remaining capacity for this ringBuffer.
   *
   * @return The number of slots remaining.
   */
  public long remainingCapacity()
  {
      return sequencer.remainingCapacity();
  }

  private void checkBounds(final EventTranslator<E>[] translators, final int batchStartsAt, final int batchSize)
  {
      checkBatchSizing(batchStartsAt, batchSize);
      batchOverRuns(translators, batchStartsAt, batchSize);
  }

  private void checkBatchSizing(int batchStartsAt, int batchSize)
  {
      if (batchStartsAt < 0 || batchSize < 0)
      {
          throw new IllegalArgumentException("Both batchStartsAt and batchSize must be positive but got: batchStartsAt " + batchStartsAt + " and batchSize " + batchSize);
      }
      else if (batchSize > bufferSize)
      {
          throw new IllegalArgumentException("The ring buffer cannot accommodate " + batchSize + " it only has space for " + bufferSize + " entities.");
      }
  }

  private <A> void checkBounds(final A[] arg0, final int batchStartsAt, final int batchSize)
  {
      checkBatchSizing(batchStartsAt, batchSize);
      batchOverRuns(arg0, batchStartsAt, batchSize);
  }

  private <A, B> void checkBounds(final A[] arg0, final B[] arg1, final int batchStartsAt, final int batchSize)
  {
      checkBatchSizing(batchStartsAt, batchSize);
      batchOverRuns(arg0, batchStartsAt, batchSize);
      batchOverRuns(arg1, batchStartsAt, batchSize);
  }

  private <A, B, C> void checkBounds(
      final A[] arg0, final B[] arg1, final C[] arg2, final int batchStartsAt, final int batchSize)
  {
      checkBatchSizing(batchStartsAt, batchSize);
      batchOverRuns(arg0, batchStartsAt, batchSize);
      batchOverRuns(arg1, batchStartsAt, batchSize);
      batchOverRuns(arg2, batchStartsAt, batchSize);
  }

  private void checkBounds(final int batchStartsAt, final int batchSize, final Object[][] args)
  {
      checkBatchSizing(batchStartsAt, batchSize);
      batchOverRuns(args, batchStartsAt, batchSize);
  }

  private <A> void batchOverRuns(final A[] arg0, final int batchStartsAt, final int batchSize)
  {
      if (batchStartsAt + batchSize > arg0.length)
      {
          throw new IllegalArgumentException(
              "A batchSize of: " + batchSize +
                  " with batchStatsAt of: " + batchStartsAt +
                  " will overrun the available number of arguments: " + (arg0.length - batchStartsAt));
      }
  }

  private void translateAndPublish(EventTranslator<E> translator, long sequence)
  {
      try
      {
          translator.translateTo(get(sequence), sequence);
      }
      finally
      {
          sequencer.publish(sequence);
      }
  }

  private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0)
  {
      try
      {
          translator.translateTo(get(sequence), sequence, arg0);
      }
      finally
      {
          sequencer.publish(sequence);
      }
  }

  private <A, B> void translateAndPublish(EventTranslatorTwoArg<E, A, B> translator, long sequence, A arg0, B arg1)
  {
      try
      {
          translator.translateTo(get(sequence), sequence, arg0, arg1);
      }
      finally
      {
          sequencer.publish(sequence);
      }
  }

  private <A, B, C> void translateAndPublish(
      EventTranslatorThreeArg<E, A, B, C> translator, long sequence,
      A arg0, B arg1, C arg2)
  {
      try
      {
          translator.translateTo(get(sequence), sequence, arg0, arg1, arg2);
      }
      finally
      {
          sequencer.publish(sequence);
      }
  }

  private void translateAndPublish(EventTranslatorVararg<E> translator, long sequence, Object... args)
  {
      try
      {
          translator.translateTo(get(sequence), sequence, args);
      }
      finally
      {
          sequencer.publish(sequence);
      }
  }

  private void translateAndPublishBatch(
      final EventTranslator<E>[] translators, int batchStartsAt,
      final int batchSize, final long finalSequence)
  {
      final long initialSequence = finalSequence - (batchSize - 1);
      try
      {
          long sequence = initialSequence;
          final int batchEndsAt = batchStartsAt + batchSize;
          for (int i = batchStartsAt; i < batchEndsAt; i++)
          {
              final EventTranslator<E> translator = translators[i];
              translator.translateTo(get(sequence), sequence++);
          }
      }
      finally
      {
          sequencer.publish(initialSequence, finalSequence);
      }
  }

  private <A> void translateAndPublishBatch(
      final EventTranslatorOneArg<E, A> translator, final A[] arg0,
      int batchStartsAt, final int batchSize, final long finalSequence)
  {
      final long initialSequence = finalSequence - (batchSize - 1);
      try
      {
          long sequence = initialSequence;
          final int batchEndsAt = batchStartsAt + batchSize;
          for (int i = batchStartsAt; i < batchEndsAt; i++)
          {
              translator.translateTo(get(sequence), sequence++, arg0[i]);
          }
      }
      finally
      {
          sequencer.publish(initialSequence, finalSequence);
      }
  }

  private <A, B> void translateAndPublishBatch(
      final EventTranslatorTwoArg<E, A, B> translator, final A[] arg0,
      final B[] arg1, int batchStartsAt, int batchSize,
      final long finalSequence)
  {
      final long initialSequence = finalSequence - (batchSize - 1);
      try
      {
          long sequence = initialSequence;
          final int batchEndsAt = batchStartsAt + batchSize;
          for (int i = batchStartsAt; i < batchEndsAt; i++)
          {
              translator.translateTo(get(sequence), sequence++, arg0[i], arg1[i]);
          }
      }
      finally
      {
          sequencer.publish(initialSequence, finalSequence);
      }
  }

  private <A, B, C> void translateAndPublishBatch(
      final EventTranslatorThreeArg<E, A, B, C> translator,
      final A[] arg0, final B[] arg1, final C[] arg2, int batchStartsAt,
      final int batchSize, final long finalSequence)
  {
      final long initialSequence = finalSequence - (batchSize - 1);
      try
      {
          long sequence = initialSequence;
          final int batchEndsAt = batchStartsAt + batchSize;
          for (int i = batchStartsAt; i < batchEndsAt; i++)
          {
              translator.translateTo(get(sequence), sequence++, arg0[i], arg1[i], arg2[i]);
          }
      }
      finally
      {
          sequencer.publish(initialSequence, finalSequence);
      }
  }

  private void translateAndPublishBatch(
      final EventTranslatorVararg<E> translator, int batchStartsAt,
      final int batchSize, final long finalSequence, final Object[][] args)
  {
      final long initialSequence = finalSequence - (batchSize - 1);
      try
      {
          long sequence = initialSequence;
          final int batchEndsAt = batchStartsAt + batchSize;
          for (int i = batchStartsAt; i < batchEndsAt; i++)
          {
              translator.translateTo(get(sequence), sequence++, args[i]);
          }
      }
      finally
      {
          sequencer.publish(initialSequence, finalSequence);
      }
  }