【注意】最后更新于 May 1, 2019,文中内容可能已过时,请谨慎使用。
一、介绍
流量整形(Traffic Shaping)是一种主动调整流量输出速率的措施。流量整形与流量监管的主要区别在于,流量整形对流量监管中需要丢弃的报文进行缓存——通常是将它们放入缓冲区或队列内,
也称流量整形(Traffic Shaping,简称TS)。当报文的发送速度过快时,首先在缓冲区进行缓存;再通过流量计量算法的控制下“均匀”地发送这些被缓冲的报文。
流量整形与流量监管的另一区别是,整形可能会增加延迟,而监管几乎不引入额外的延迟。
1.Netty实现

Netty提供了GlobalTrafficShapingHandler、ChannelTrafficShapingHandler、GlobalChannelTrafficShapingHandler三个类来实现流量整形;
他们都继承AbstractTrafficShapingHandler抽象类的实现类,最终调用TrafficCounter处理;
二、TrafficCounter
计算速率限制流量的读取和写入字节数。
它以给定的checkInterval定期计算入站和出站流量的统计信息,并回调AbstractTrafficShapingHandler.doAccounting(TrafficCounter)方法。
如果checkInterval为0,将不进行任何记录,并且仅在每次接收或写入操作时计算统计信息。
1.readTimeToWait 给定的limitTraffic和最大等待时间,返回给定长度消息的等待时间
- sum * 1000 / limitTraffic - interval + pastDelay 计算核心
- sum * 1000 / limitTraffic 当前总读书字节花费多少时间(s)*1000
- interval 当前区间已过去间隔
- pastDelay 上个区间超出时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
public long readTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
//统计当前读字节数
//currentReadBytes.addAndGet(recv);
//cumulativeReadBytes.addAndGet(recv);
bytesRecvFlowControl(size);
if (size == 0 || limitTraffic == 0) {
return 0;
}
//统计区间开始时间
final long lastTimeCheck = lastTime.get();
//当前总字节
long sum = currentReadBytes.get();
//当前+等待时间
long localReadingTime = readingTime;
//最后区间读取总字节数
long lastRB = lastReadBytes;
//间隔(当前时间-区间开始时间)
final long interval = now - lastTimeCheck;
//最后区间超过延迟时间
long pastDelay = Math.max(lastReadingTime - lastTimeCheck, 0);
if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
// sum * 1000 / limitTraffic ==> 总读书字节花费多少时间(s)
// 转化为毫秒* 1000
// 下面计算要等待时间
long time = sum * 1000 / limitTraffic - interval + pastDelay;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
...
//now + time - localReadingTime
// localReadingTime比now大不可能??
if (time > maxTime && now + time - localReadingTime > maxTime) {
time = maxTime;
}
readingTime = Math.max(localReadingTime, now + time);
return time;
}
readingTime = Math.max(localReadingTime, now);
return 0;
}
// 进行前区间+当前区间计算
long lastsum = sum + lastRB;
long lastinterval = interval + checkInterval.get();
long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
...
if (time > maxTime && now + time - localReadingTime > maxTime) {
time = maxTime;
}
readingTime = Math.max(localReadingTime, now + time);
return time;
}
readingTime = Math.max(localReadingTime, now);
return 0;
}
|
2.writeTimeToWait 使用给定的limitTraffic和最大等待时间,返回给定长度消息的等待时间
- 计算逻辑跟readTimeToWait相似不列出代码
3.resetAccounting
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
synchronized void resetAccounting(long newLastTime) {
long interval = newLastTime - lastTime.getAndSet(newLastTime);
if (interval == 0) {
// nothing to do
return;
}
...
lastReadBytes = currentReadBytes.getAndSet(0);
lastWrittenBytes = currentWrittenBytes.getAndSet(0);
lastReadThroughput = lastReadBytes * 1000 / interval;
// nb byte / checkInterval in ms * 1000 (1s)
lastWriteThroughput = lastWrittenBytes * 1000 / interval;
// nb byte / checkInterval in ms * 1000 (1s)
realWriteThroughput = realWrittenBytes.getAndSet(0) * 1000 / interval;
lastWritingTime = Math.max(lastWritingTime, writingTime);
lastReadingTime = Math.max(lastReadingTime, readingTime);
}
|
4.start
- 开始监视过程
- 添加调用任务,checkInterval
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
public synchronized void start() {
if (monitorActive) {
return;
}
lastTime.set(milliSecondFromNano());
long localCheckInterval = checkInterval.get();
// if executor is null, it means it is piloted by a GlobalChannelTrafficCounter, so no executor
if (localCheckInterval > 0 && executor != null) {
monitorActive = true;
monitor = new TrafficMonitoringTask();
scheduledFuture =
executor.schedule(monitor, localCheckInterval, TimeUnit.MILLISECONDS);
}
}
|
a.TrafficMonitoringTask 监控任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
private final class TrafficMonitoringTask implements Runnable {
@Override
public void run() {
if (!monitorActive) {
return;
}
//重置操作
resetAccounting(milliSecondFromNano());
if (trafficShapingHandler != null) {
//调用trafficShapingHandler.doAccounting
trafficShapingHandler.doAccounting(TrafficCounter.this);
}
scheduledFuture = executor.schedule(this, checkInterval.get(), TimeUnit.MILLISECONDS);
}
}
|
三、AbstractTrafficShapingHandler
使用TrafficCounter中的监视器来实现对带宽的几乎实时的监视,该监视器将回调此处理程序的doAccounting方法的每个checkInterval。
- 如果出于任何特定原因想要停止监视(计数)或更改读/写限制或检查间隔,则有几种方法可以满足您的需要:
- configure允许您更改读取或写入限制,或checkInterval
- getTrafficCounter允许您访问TrafficCounter,从而可以停止或启动监视,直接更改checkInterval或访问其值。
- TrafficCounter是计算速率限制流量的读写字节数;
1.channelRead 读书数据
- config.setAutoRead(false) 设置Channel不能读取数据
- 等待调度任务,把config.setAutoRead(true),可以读取数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
long size = calculateSize(msg);
long now = TrafficCounter.milliSecondFromNano();
if (size > 0) {
// 计算重新打开通道之前要等待的毫秒数
long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
//钩子方法
wait = checkWaitReadTime(ctx, wait, now);
if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
Channel channel = ctx.channel();
ChannelConfig config = channel.config();
...
if (config.isAutoRead() && isHandlerActive(ctx)) {
//把当前Channel设置读取为false
config.setAutoRead(false);
//标示读取已暂停
channel.attr(READ_SUSPENDED).set(true);
// Create a Runnable to reactive the read if needed. If one was create before it will just be
// reused to limit object creation
Attribute<Runnable> attr = channel.attr(REOPEN_TASK);
Runnable reopenTask = attr.get();
//这里做个缓存处理,下次不用在重新创建ReopenReadTimerTask
if (reopenTask == null) {
reopenTask = new ReopenReadTimerTask(ctx);
attr.set(reopenTask);
}
//设置调度任务,时间设置wait
ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
...
}
}
}
informReadOperation(ctx, now);
ctx.fireChannelRead(msg);
}
|
a.ReopenReadTimerTask
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
@Override
public void run() {
Channel channel = ctx.channel();
ChannelConfig config = channel.config();
if (!config.isAutoRead() && isHandlerActive(ctx)) {
...
channel.attr(READ_SUSPENDED).set(false);
} else {
...
channel.attr(READ_SUSPENDED).set(false);
config.setAutoRead(true);
channel.read();
}
...
}
|
2.write 操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
throws Exception {
//计算大小
long size = calculateSize(msg);
long now = TrafficCounter.milliSecondFromNano();
if (size > 0) {
// 计算继续使用该通道之前要等待的毫秒数
long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
if (wait >= MINIMAL_WAIT) {
...
submitWrite(ctx, msg, size, wait, now, promise);
return;
}
}
// 保持写顺序
// 抽象方法
submitWrite(ctx, msg, size, 0, now, promise);
}
|
四、ChannelTrafficShapingHandler
因为流量整形的精度取决于计算流量的时间段。间隔越高,流量整形的精度越低。建议将其设置为接近5或10分钟的较高值。
maxTimeToWait(默认设置为15s)允许指定时间整形的上限。
1.handlerAdded 每个Channel创建TrafficCounter
- 为我们每个Channel创建TrafficCounter
1
2
3
4
5
6
7
8
|
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
TrafficCounter trafficCounter = new TrafficCounter(this, ctx.executor(), "ChannelTC" +
ctx.channel().hashCode(), checkInterval);
setTrafficCounter(trafficCounter);
trafficCounter.start();
super.handlerAdded(ctx);
}
|
2.submitWrite 提交写
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
@Override
void submitWrite(final ChannelHandlerContext ctx, final Object msg,
final long size, final long delay, final long now,
final ChannelPromise promise) {
final ToSend newToSend;
// 写入顺序控制
synchronized (this) {
if (delay == 0 && messagesQueue.isEmpty()) {
trafficCounter.bytesRealWriteFlowControl(size);
ctx.write(msg, promise);
return;
}
newToSend = new ToSend(delay + now, msg, promise);
messagesQueue.addLast(newToSend);
queueSize += size;
//根据通道的延迟和大小检查可写性。必要时设置setUserDefinedWritability状态。
checkWriteSuspend(ctx, delay, queueSize);
}
final long futureNow = newToSend.relativeTimeAction;
//调度
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
sendAllValid(ctx, futureNow);
}
}, delay, TimeUnit.MILLISECONDS);
}
|
a.sendAllValid
- 写入顺序控制
- 从队列读取数据,以及根据时间判断,判断时间大立马退出,可能后面没有延迟,这一来顺序不能保障
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
private void sendAllValid(final ChannelHandlerContext ctx, final long now) {
// 写入顺序控制
synchronized (this) {
ToSend newToSend = messagesQueue.pollFirst();
//messagesQueue.pollFirst() 读取第一个元素
for (; newToSend != null; newToSend = messagesQueue.pollFirst()) {
if (newToSend.relativeTimeAction <= now) {
long size = calculateSize(newToSend.toSend);
trafficCounter.bytesRealWriteFlowControl(size);
queueSize -= size;
ctx.write(newToSend.toSend, newToSend.promise);
} else {
//不满足,把元素添加到首尾
//立马退出
messagesQueue.addFirst(newToSend);
break;
}
}
if (messagesQueue.isEmpty()) {
//明确释放“写入暂停”状态。
releaseWriteSuspended(ctx);
}
}
ctx.flush();
}
|
五、ChannelTrafficShapingHandler
GlobalTrafficShapingHandler全局,全局就一个TrafficCounter,对写操作时保持顺序性,ConcurrentMap保持数据,Channel对应队列;相互不影响;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
/**
* key对象Channel.hashCode()
*/
private final ConcurrentMap<Integer, PerChannel> channelQueues = PlatformDependent.newConcurrentHashMap();
/**
* Global queues size
*/
private final AtomicLong queuesSize = new AtomicLong();
/**
* Max size in the list before proposing to stop writing new objects from next handlers
* for all channel (global)
*/
long maxGlobalWriteSize = DEFAULT_MAX_SIZE * 100; // default 400MB
private static final class PerChannel {
ArrayDeque<ToSend> messagesQueue;
long queueSize;
long lastWriteTimestamp;
long lastReadTimestamp;
}
|
六、GlobalChannelTrafficShapingHandler
相比于GlobalTrafficShapingHandler增加了一个误差概念,以平衡各个Channel间的读/写操作。也就是说,使得各个Channel间的读/写操作尽量均衡。
比如,尽量避免不同Channel的大数据包都延迟近乎一样的是时间再操作,以及如果小数据包在一个大数据包后才发送,则减少该小数据包的延迟发送时间等。。