I. Introduction

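Traffic shaping (TS) is a measure that actively adjusts the output rate of traffic. The main difference between traffic shaping and traffic policing is that shaping buffers the packets that policing would drop, usually by placing them in a buffer or queue. When packets are sent too fast, they are first held in the buffer and then released at an even rate under the control of a traffic-metering algorithm. A second difference is that shaping may add latency, whereas policing introduces almost no extra latency.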
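
The contrast between policing and shaping can be illustrated with a minimal sketch. This is not Netty code: the class name, the per-tick byte budget, and the assumption that no single packet is larger than the budget are all made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical illustration only; not part of Netty. */
final class ShapingVsPolicingSketch {

    /** Policing: a packet that does not fit in the tick's budget is dropped. */
    static List<Integer> police(int[][] arrivalsPerTick, int budget) {
        List<Integer> sentPerTick = new ArrayList<>();
        for (int[] arrivals : arrivalsPerTick) {
            int used = 0;
            for (int size : arrivals) {
                if (used + size <= budget) {
                    used += size; // fits in the budget: send it
                }
                // otherwise the packet is dropped and never sent
            }
            sentPerTick.add(used);
        }
        return sentPerTick;
    }

    /** Shaping: a packet that does not fit waits in a queue for a later tick. */
    static List<Integer> shape(int[][] arrivalsPerTick, int budget) {
        Deque<Integer> queue = new ArrayDeque<>();
        List<Integer> sentPerTick = new ArrayList<>();
        for (int tick = 0; tick < arrivalsPerTick.length || !queue.isEmpty(); tick++) {
            if (tick < arrivalsPerTick.length) {
                for (int size : arrivalsPerTick[tick]) {
                    if (size > budget) {
                        // Assumption of this sketch: every packet fits in one tick.
                        throw new IllegalArgumentException("packet larger than budget");
                    }
                    queue.addLast(size);
                }
            }
            int used = 0;
            // Send queued packets in arrival order until the budget is used up.
            while (!queue.isEmpty() && used + queue.peekFirst() <= budget) {
                used += queue.pollFirst();
            }
            sentPerTick.add(used);
        }
        return sentPerTick;
    }
}
```

With a burst of three 100-byte packets in the first tick and a budget of 100 bytes per tick, police sends 100 bytes and drops the rest, while shape sends 100 bytes in each of three ticks: nothing is lost, but the last packet is delayed by two ticks.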

1. Netty implementation

Netty provides three classes for traffic shaping: GlobalTrafficShapingHandler, ChannelTrafficShapingHandler, and GlobalChannelTrafficShapingHandler. All three extend the abstract class AbstractTrafficShapingHandler and ultimately delegate the accounting to TrafficCounter.

II. TrafficCounter

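TrafficCounter counts the bytes read and written for rate-limited traffic. At each checkInterval it computes statistics for inbound and outbound traffic and calls back AbstractTrafficShapingHandler.doAccounting(TrafficCounter). If checkInterval is 0, no periodic accounting is done, and statistics are computed only at each receive or write operation.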

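1. readTimeToWait: given limitTraffic and a maximum wait time, returns the wait time for a message of the given length

  • sum * 1000 / limitTraffic - interval + pastDelay is the core of the calculation
    • sum * 1000 / limitTraffic: time needed to read the bytes counted so far, in seconds, multiplied by 1000 to give milliseconds
    • interval: time already elapsed in the current interval
    • pastDelay: delay carried over from the previous interval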
    public long readTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
        // Account for the bytes just read; internally this does:
        //   currentReadBytes.addAndGet(recv);
        //   cumulativeReadBytes.addAndGet(recv);
        bytesRecvFlowControl(size);
        if (size == 0 || limitTraffic == 0) {
            return 0;
        }
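        // Start time of the current accounting interval
        final long lastTimeCheck = lastTime.get();
        // Bytes read so far in the current interval
        long sum = currentReadBytes.get();
        // Time at which previously scheduled reads are expected to finish (now + wait)
        long localReadingTime = readingTime;
        // Bytes read during the previous interval
        long lastRB = lastReadBytes;
        // Time elapsed in the current interval (now - interval start)
        final long interval = now - lastTimeCheck;
        // Delay carried over from the previous interval
        long pastDelay = Math.max(lastReadingTime - lastTimeCheck, 0);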
        if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
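            // sum * 1000 / limitTraffic: time in milliseconds needed to read
            // sum bytes at limitTraffic bytes per second (seconds * 1000).
            // The wait time is what remains after subtracting the elapsed interval.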
            long time = sum * 1000 / limitTraffic - interval + pastDelay;
            if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
                ...
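                // Cap the wait only if the total delay (now + time - localReadingTime)
                // also exceeds maxTime. Note that localReadingTime may be later than now,
                // because readingTime is set to now + time on an earlier call.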
                if (time > maxTime && now + time - localReadingTime > maxTime) {
                    time = maxTime;
                }
                readingTime = Math.max(localReadingTime, now + time);
                return time;
            }
            readingTime = Math.max(localReadingTime, now);
            return 0;
        }
        // Interval too short: compute over the previous interval plus the current one
        long lastsum = sum + lastRB;
        long lastinterval = interval + checkInterval.get();
        long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
        if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
            ...
            if (time > maxTime && now + time - localReadingTime > maxTime) {
                time = maxTime;
            }
            readingTime = Math.max(localReadingTime, now + time);
            return time;
        }
        readingTime = Math.max(localReadingTime, now);
        return 0;
    }
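
To make the core formula concrete, here is a simplified sketch of the wait-time calculation. It is not Netty's method: the class name and the reduced parameter list are assumptions for illustration, it always caps the result at maxTime (the real method also checks now + time - localReadingTime), and it leaves out the short-interval branch that merges the previous interval.

```java
/** Hypothetical, simplified model of the wait-time formula; not Netty's actual method. */
final class WaitTimeSketch {

    /** Waits of this many milliseconds or less are ignored (Netty's MINIMAL_WAIT is 10). */
    static final long MINIMAL_WAIT = 10;

    /**
     * @param sum          bytes counted in the current interval
     * @param limitTraffic limit in bytes per second (0 means no limit)
     * @param interval     milliseconds elapsed in the current interval
     * @param pastDelay    milliseconds of delay carried over from the previous interval
     * @param maxTime      upper bound for the returned wait, in milliseconds
     */
    static long timeToWait(long sum, long limitTraffic, long interval, long pastDelay, long maxTime) {
        if (sum == 0 || limitTraffic == 0) {
            return 0; // nothing counted, or no limit configured
        }
        // Time the bytes "should" have taken at the limit, minus the time already spent,
        // plus whatever delay is still owed from the previous interval.
        long time = sum * 1000 / limitTraffic - interval + pastDelay;
        if (time <= MINIMAL_WAIT) {
            return 0; // too small to be worth waiting for
        }
        return Math.min(time, maxTime);
    }
}
```

For example, 2048 bytes read within 100 ms under a limit of 1024 bytes per second gives 2048 * 1000 / 1024 - 100 + 0 = 1900 ms of waiting, while 512 bytes read over 600 ms gives a negative value and therefore no wait.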

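2. writeTimeToWait: given limitTraffic and a maximum wait time, returns the wait time for a message of the given length

  • The logic mirrors readTimeToWait, so the code is not listed here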

3. resetAccounting

  • Resets the counters
  • Starts accounting for the next interval
   synchronized void resetAccounting(long newLastTime) {
       long interval = newLastTime - lastTime.getAndSet(newLastTime);
       if (interval == 0) {
           // nothing to do
           return;
       }
       ...
       lastReadBytes = currentReadBytes.getAndSet(0);
       lastWrittenBytes = currentWrittenBytes.getAndSet(0);
       lastReadThroughput = lastReadBytes * 1000 / interval;
       // nb byte / checkInterval in ms * 1000 (1s)
       lastWriteThroughput = lastWrittenBytes * 1000 / interval;
       // nb byte / checkInterval in ms * 1000 (1s)
       realWriteThroughput = realWrittenBytes.getAndSet(0) * 1000 / interval;
       lastWritingTime = Math.max(lastWritingTime, writingTime);
       lastReadingTime = Math.max(lastReadingTime, readingTime);
   } 
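
The reset step can be reduced to a small sketch that keeps only the read counter. The class and method names are hypothetical and timestamps are passed in explicitly; the point is the arithmetic: bytes counted in the interval, times 1000, divided by the interval length in milliseconds gives bytes per second.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical reduction of the reset logic to the read side only; not Netty code. */
final class IntervalCounterSketch {
    private final AtomicLong lastTime = new AtomicLong();
    private final AtomicLong currentReadBytes = new AtomicLong();
    private long lastReadBytes;
    private long lastReadThroughput;

    IntervalCounterSketch(long startMillis) {
        lastTime.set(startMillis);
    }

    /** Called for every read; only increments the counter of the current interval. */
    void bytesRead(long n) {
        currentReadBytes.addAndGet(n);
    }

    /** Closes the current interval at newLastTime and starts the next one. */
    synchronized void resetAccounting(long newLastTime) {
        long interval = newLastTime - lastTime.getAndSet(newLastTime);
        if (interval == 0) {
            return; // no time has passed, nothing to compute
        }
        lastReadBytes = currentReadBytes.getAndSet(0);
        // bytes / milliseconds * 1000 = bytes per second
        lastReadThroughput = lastReadBytes * 1000 / interval;
    }

    synchronized long lastReadThroughput() {
        return lastReadThroughput;
    }

    long currentReadBytes() {
        return currentReadBytes.get();
    }
}
```

Reading 5000 bytes and then resetting after 500 ms yields a throughput of 5000 * 1000 / 500 = 10000 bytes per second, and the current counter goes back to 0.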

4. start

  • Starts the monitoring process
  • Schedules the monitoring task to run after checkInterval
   public synchronized void start() {
       if (monitorActive) {
           return;
       }
       lastTime.set(milliSecondFromNano());
       long localCheckInterval = checkInterval.get();
       // if executor is null, it means it is piloted by a GlobalChannelTrafficCounter, so no executor
       if (localCheckInterval > 0 && executor != null) {
           monitorActive = true;
           monitor = new TrafficMonitoringTask();
           scheduledFuture =
               executor.schedule(monitor, localCheckInterval, TimeUnit.MILLISECONDS);
       }
   }

a. TrafficMonitoringTask: the monitoring task

   private final class TrafficMonitoringTask implements Runnable {
       @Override
       public void run() {
           if (!monitorActive) {
               return;
           }
           // Reset the counters and start the next interval
           resetAccounting(milliSecondFromNano());
           if (trafficShapingHandler != null) {
               // Call back trafficShapingHandler.doAccounting
               trafficShapingHandler.doAccounting(TrafficCounter.this);
           }
           scheduledFuture = executor.schedule(this, checkInterval.get(), TimeUnit.MILLISECONDS);
       }
   } 
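
The pattern used by start and TrafficMonitoringTask, a one-shot task that schedules itself again at the end of each run, can be sketched with the JDK alone. The class below is a hypothetical stand-in: the onTick callback plays the role of resetAccounting plus doAccounting, and none of the names are Netty's.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a self-rescheduling monitor; not Netty code. */
final class SelfReschedulingMonitor {
    private final ScheduledExecutorService executor;
    private final long checkIntervalMillis;
    private final Runnable onTick;
    private volatile boolean active;
    private volatile ScheduledFuture<?> future;

    SelfReschedulingMonitor(ScheduledExecutorService executor, long checkIntervalMillis, Runnable onTick) {
        this.executor = executor;
        this.checkIntervalMillis = checkIntervalMillis;
        this.onTick = onTick;
    }

    /** Starts monitoring; does nothing if already started or if the interval is not positive. */
    synchronized void start() {
        if (active || checkIntervalMillis <= 0) {
            return;
        }
        active = true;
        future = executor.schedule(this::tick, checkIntervalMillis, TimeUnit.MILLISECONDS);
    }

    /** Stops monitoring and cancels the pending run, if any. */
    synchronized void stop() {
        active = false;
        if (future != null) {
            future.cancel(false);
        }
    }

    private void tick() {
        if (!active) {
            return; // stopped in the meantime: do not reschedule
        }
        onTick.run();
        // One-shot scheduling repeated by hand, so every run uses the current interval.
        future = executor.schedule(this::tick, checkIntervalMillis, TimeUnit.MILLISECONDS);
    }
}
```

Rescheduling by hand, rather than using a fixed-rate schedule, means that an interval changed between runs takes effect on the next run; in the Netty code this is why checkInterval.get() is read again each time.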

III. AbstractTrafficShapingHandler

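AbstractTrafficShapingHandler uses the monitor in TrafficCounter to track bandwidth in near real time; the monitor calls back this handler's doAccounting method once every checkInterval.

  • If for any reason you want to stop the monitoring (accounting), or to change the read/write limits or the check interval, several methods are available:
    • configure lets you change the read limit, the write limit, or checkInterval
    • getTrafficCounter gives you access to the TrafficCounter, so you can stop or start the monitoring, change checkInterval directly, or read its values.
  • TrafficCounter counts the bytes read and written for rate-limited traffic.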

1. channelRead: reading data

  • config.setAutoRead(false) stops the Channel from reading data
  • A scheduled task later calls config.setAutoRead(true), so reading can resume
   @Override
   public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
       long size = calculateSize(msg);
       long now = TrafficCounter.milliSecondFromNano();
       if (size > 0) {
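           // Compute how many milliseconds to wait before reopening the channel
           long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
           // Hook method that subclasses may override to adjust the wait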
           wait = checkWaitReadTime(ctx, wait, now);
           if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
               Channel channel = ctx.channel();
               ChannelConfig config = channel.config();
               ...
               if (config.isAutoRead() && isHandlerActive(ctx)) {
                   // Turn off auto-read for this Channel
                   config.setAutoRead(false);
                   // Mark reading as suspended
                   channel.attr(READ_SUSPENDED).set(true);
                   // Create a Runnable to reactivate the read if needed. If one was created before, it will
                   // just be reused to limit object creation
                   Attribute<Runnable> attr = channel.attr(REOPEN_TASK);
                   Runnable reopenTask = attr.get();
                   // Cache the task so that ReopenReadTimerTask is not recreated next time
                   if (reopenTask == null) {
                       reopenTask = new ReopenReadTimerTask(ctx);
                       attr.set(reopenTask);
                   }
                   // Schedule the reopen task to run after wait milliseconds
                   ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
                   ...
               }
           }
       }
       informReadOperation(ctx, now);
       ctx.fireChannelRead(msg);
   } 

a. ReopenReadTimerTask

    @Override
       public void run() {
           Channel channel = ctx.channel();
           ChannelConfig config = channel.config();
           if (!config.isAutoRead() && isHandlerActive(ctx)) {
               ...
               channel.attr(READ_SUSPENDED).set(false);
           } else {
               ...
               channel.attr(READ_SUSPENDED).set(false);
               config.setAutoRead(true);
               channel.read();
           }
           ...
       } 
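
The suspend-and-reopen cycle of channelRead and ReopenReadTimerTask can be modeled without Netty. In the hypothetical sketch below, an AtomicBoolean stands in for the channel's auto-read flag and a single cached Runnable stands in for the reopen task; the real handler additionally tracks a READ_SUSPENDED attribute and distinguishes a user-initiated setAutoRead(false), which this sketch leaves out.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical model of read suspension; not Netty code. */
final class ReadSuspenderSketch {
    /** Waits shorter than this many milliseconds do not suspend reading. */
    static final long MINIMAL_WAIT = 10;

    private final ScheduledExecutorService executor;
    private final AtomicBoolean autoRead = new AtomicBoolean(true);
    // Created once and reused for every suspension to limit object creation.
    private final Runnable reopenTask = () -> autoRead.set(true);

    ReadSuspenderSketch(ScheduledExecutorService executor) {
        this.executor = executor;
    }

    /**
     * Called after a read with the computed wait time.
     * Returns true if reading was suspended by this call.
     */
    boolean onRead(long waitMillis) {
        if (waitMillis >= MINIMAL_WAIT && autoRead.compareAndSet(true, false)) {
            // Reading is now off; turn it back on once the wait has elapsed.
            executor.schedule(reopenTask, waitMillis, TimeUnit.MILLISECONDS);
            return true;
        }
        return false; // wait too short, or reading is already suspended
    }

    boolean isAutoRead() {
        return autoRead.get();
    }
}
```

Note that the message that triggered the suspension is still delivered; only subsequent reads are held back, which is why the Netty handler calls ctx.fireChannelRead(msg) regardless of the wait.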

2. write

   @Override
   public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
           throws Exception {
       // Compute the message size
       long size = calculateSize(msg);
       long now = TrafficCounter.milliSecondFromNano();
       if (size > 0) {
           // Compute how many milliseconds to wait before the channel can be used again
           long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
           if (wait >= MINIMAL_WAIT) {
               ...
               submitWrite(ctx, msg, size, wait, now, promise);
               return;
           }
       }
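       // Submit with zero delay so that write order is preserved;
       // submitWrite is an abstract method implemented by subclasses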
       submitWrite(ctx, msg, size, 0, now, promise);
   }

IV. ChannelTrafficShapingHandler

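The precision of traffic shaping depends on the period over which traffic is computed: the longer the check interval, the less precise the shaping. If a high value is wanted, something close to 5 or 10 minutes is suggested as the upper end. maxTimeToWait (15 s by default) sets an upper bound on the shaping delay.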

1. handlerAdded: creates a TrafficCounter for each Channel

  • One TrafficCounter is created per Channel when the handler is added
   @Override
   public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
       TrafficCounter trafficCounter = new TrafficCounter(this, ctx.executor(), "ChannelTC" +
               ctx.channel().hashCode(), checkInterval);
       setTrafficCounter(trafficCounter);
       trafficCounter.start();
       super.handlerAdded(ctx);
   } 

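2. submitWrite: submitting a write

  • Messages whose sending is delayed are buffered in a queue
  • A task is scheduled according to the delay value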
   @Override
   void submitWrite(final ChannelHandlerContext ctx, final Object msg,
           final long size, final long delay, final long now,
           final ChannelPromise promise) {
       final ToSend newToSend;
       // Write-order control
       synchronized (this) {
           if (delay == 0 && messagesQueue.isEmpty()) {
               trafficCounter.bytesRealWriteFlowControl(size);
               ctx.write(msg, promise);
               return;
           }
          
           newToSend = new ToSend(delay + now, msg, promise);
           messagesQueue.addLast(newToSend);
           queueSize += size;
           // Check writability against the channel's delay and queue size; sets the setUserDefinedWritability state if needed.
           checkWriteSuspend(ctx, delay, queueSize);
       }
       final long futureNow = newToSend.relativeTimeAction;
       // Schedule the delayed send
       ctx.executor().schedule(new Runnable() {
           @Override
           public void run() {
               sendAllValid(ctx, futureNow);
           }
       }, delay, TimeUnit.MILLISECONDS);
   }

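a. sendAllValid

  • Write-order control
  • Polls entries from the head of the queue and compares each entry's scheduled time with now; it stops at the first entry that is not yet due, even if later entries have no delay, because sending those first would break write ordering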
   private void sendAllValid(final ChannelHandlerContext ctx, final long now) {
       // Write-order control
       synchronized (this) {
           ToSend newToSend = messagesQueue.pollFirst();
           // messagesQueue.pollFirst() removes and returns the first element
           for (; newToSend != null; newToSend = messagesQueue.pollFirst()) {
               if (newToSend.relativeTimeAction <= now) {
                   long size = calculateSize(newToSend.toSend);
                   trafficCounter.bytesRealWriteFlowControl(size);
                   queueSize -= size;
                   ctx.write(newToSend.toSend, newToSend.promise);
               } else {
                   // Not yet due: put the element back at the head of the queue
                   // and stop immediately
                   messagesQueue.addFirst(newToSend);
                   break;
               }
           }
           if (messagesQueue.isEmpty()) {
               // Explicitly release the write-suspended state.
               releaseWriteSuspended(ctx);
           }
       }
       ctx.flush();
   } 
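
How submitWrite and sendAllValid cooperate to keep writes in order can be shown with a sketch that passes time in explicitly instead of using a scheduler. All names are hypothetical: the wire list stands in for ctx.write, and messages are plain strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of the ordered delay queue; not Netty code. */
final class OrderedDelayQueueSketch {
    private static final class ToSend {
        final long sendAt; // absolute time at which the message becomes due
        final String msg;

        ToSend(long sendAt, String msg) {
            this.sendAt = sendAt;
            this.msg = msg;
        }
    }

    private final Deque<ToSend> queue = new ArrayDeque<>();
    private final List<String> wire = new ArrayList<>(); // stands in for ctx.write

    /** Sends immediately only if there is no delay AND nothing is already waiting. */
    synchronized void submit(String msg, long delay, long now) {
        if (delay == 0 && queue.isEmpty()) {
            wire.add(msg);
            return;
        }
        // Something is queued (or a delay applies): queue behind it to keep order.
        queue.addLast(new ToSend(now + delay, msg));
    }

    /** Sends queued messages from the head until the first one that is not yet due. */
    synchronized void sendAllValid(long now) {
        ToSend head;
        while ((head = queue.peekFirst()) != null && head.sendAt <= now) {
            queue.pollFirst();
            wire.add(head.msg);
        }
    }

    synchronized List<String> sent() {
        return new ArrayList<>(wire);
    }
}
```

If "a" is submitted with no delay, "b" with a delay of 100 ms, and "c" with no delay shortly after, then "c" is queued behind "b" even though it has no delay of its own; a call to sendAllValid before "b" is due sends nothing, and a call at or after that time sends "b" and then "c".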

V. GlobalTrafficShapingHandler

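GlobalTrafficShapingHandler applies shaping globally: a single TrafficCounter is shared by all channels. To keep writes in order, pending messages are stored in a ConcurrentMap that holds one queue per Channel, so the channels do not affect each other's ordering.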

  /**
   * The key is Channel.hashCode()
   */
  private final ConcurrentMap<Integer, PerChannel> channelQueues = PlatformDependent.newConcurrentHashMap();

  /**
   * Global queues size
   */
  private final AtomicLong queuesSize = new AtomicLong();

  /**
   * Max size in the list before proposing to stop writing new objects from next handlers
   * for all channel (global)
   */
  long maxGlobalWriteSize = DEFAULT_MAX_SIZE * 100; // default 400MB

  private static final class PerChannel {
      ArrayDeque<ToSend> messagesQueue;
      long queueSize;
      long lastWriteTimestamp;
      long lastReadTimestamp;
  }  
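
The data layout above, one queue per channel plus one global size counter, can be sketched as follows. This is a hypothetical illustration: messages are strings, their length stands in for the byte size, and the method names are not Netty's.

```java
import java.util.ArrayDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical model of per-channel queues with a global size counter; not Netty code. */
final class PerChannelQueuesSketch {
    private static final class PerChannel {
        final ArrayDeque<String> messagesQueue = new ArrayDeque<>();
        long queueSize;
    }

    // Keyed by a per-channel integer, like Channel.hashCode() in the handler.
    private final ConcurrentMap<Integer, PerChannel> channelQueues = new ConcurrentHashMap<>();
    // Total size queued across all channels.
    private final AtomicLong queuesSize = new AtomicLong();

    void enqueue(int channelKey, String msg) {
        PerChannel pc = channelQueues.computeIfAbsent(channelKey, k -> new PerChannel());
        synchronized (pc) { // order is only maintained within one channel
            pc.messagesQueue.addLast(msg);
            pc.queueSize += msg.length();
        }
        queuesSize.addAndGet(msg.length());
    }

    /** Returns the oldest queued message of the channel, or null if none is queued. */
    String poll(int channelKey) {
        PerChannel pc = channelQueues.get(channelKey);
        if (pc == null) {
            return null;
        }
        String msg;
        synchronized (pc) {
            msg = pc.messagesQueue.pollFirst();
            if (msg != null) {
                pc.queueSize -= msg.length();
            }
        }
        if (msg != null) {
            queuesSize.addAndGet(-msg.length());
        }
        return msg;
    }

    long globalQueuedSize() {
        return queuesSize.get();
    }
}
```

Locking per channel rather than globally keeps one slow channel from blocking the others, while the atomic global counter still allows a handler to compare the total backlog with a limit such as maxGlobalWriteSize.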

VI. GlobalChannelTrafficShapingHandler

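Compared with GlobalTrafficShapingHandler, this handler adds a notion of deviation in order to balance read and write operations across channels, that is, to keep them as even as possible between channels. For example, it tries to avoid delaying large packets on different channels by nearly the same amount of time, and it reduces the delay of a small packet that is sent after a large one.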