一、介绍

Bootstrap和ServerBootstrap的类图

Bootstrap是客户端实现,ServerBootstrap是服务端实现

二、AbstractBootstrap 源码实现

1.属性

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
/**
 * Base class extended by both bootstraps shown later in this document; this excerpt lists only its
 * configuration fields.
 */
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    volatile EventLoopGroup group;// event loop group (thread pool)
    @SuppressWarnings("deprecation")
    // Channel factory, mainly used to create the initial Channel: e.g. the server channel that first calls
    // bind() on the server side, or the Channel that first calls connect() on the client side.
    private volatile ChannelFactory<? extends C> channelFactory;
    private volatile SocketAddress localAddress;
    // Configuration: channel options and attributes, kept in insertion order.
    private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
    // Business-logic handler
    private volatile ChannelHandler handler;

2.构造

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
  /**
   * Package-private no-argument constructor.
   */
  AbstractBootstrap() {
        // Disallow extending from a different package.
    }

    /**
     * Copy constructor: takes the group, channel factory, handler and local address from {@code bootstrap}
     * and copies its options and attributes into this instance's own maps.
     *
     * @param bootstrap the bootstrap whose configuration is copied
     */
    AbstractBootstrap(AbstractBootstrap<B, C> bootstrap) {
        group = bootstrap.group;
        channelFactory = bootstrap.channelFactory;
        handler = bootstrap.handler;
        localAddress = bootstrap.localAddress;
        // Each source map is copied while holding that map's own monitor.
        synchronized (bootstrap.options) {
            options.putAll(bootstrap.options);
        }
        synchronized (bootstrap.attrs) {
            attrs.putAll(bootstrap.attrs);
        }
    }

3.bind 接口

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
 /**
     * Create a new {@link Channel} and bind it.
     *
     * @param localAddress the address to bind to; must not be {@code null}
     * @return the future produced by {@code doBind(localAddress)}
     * @throws NullPointerException if {@code localAddress} is {@code null}
     */
    public ChannelFuture bind(SocketAddress localAddress) {
        // validate() runs before the null check; its definition is not part of this excerpt.
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }

    /**
     * Initializes and registers a channel via {@code initAndRegister()}, then binds it to
     * {@code localAddress} through {@code doBind0}.
     *
     * @param localAddress the address to bind to
     * @return the registration future itself if it already carries a failure cause; otherwise a promise that
     *         is handed to {@code doBind0} (immediately, or from a listener once registration completes)
     */
    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        // Registration already failed: return the failed future without attempting to bind.
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                       promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

1).initAndRegister 初始化和注册channel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
/**
 * Creates a channel from the channel factory, initializes it through {@code init(channel)} and registers it
 * with the configured event loop group.
 *
 * @return the future of the registration; if it already reports a failure, the channel has been closed
 *         (or force-closed when it was never registered) before this method returns
 */
final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();// create the channel
            init(channel);       // hook method, implemented by the subclasses
        } catch (Throwable t) {
          // (failure handling is elided in this excerpt)
          ...
        }
        // Register the channel with the event loop group.
        ChannelFuture regFuture = config().group().register(channel);// config() is abstract (declaration not shown in this excerpt)
        if (regFuture.cause() != null) {
            // Registration failed: close normally if the channel got registered, otherwise force-close it.
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

2).doBind0 EventLoop提交channel.bind任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
  /**
   * Submits the actual bind as a task on the channel's event loop.
   *
   * @param regFuture    the registration future; the bind is attempted only if it succeeded
   * @param channel      the channel to bind
   * @param localAddress the address to bind to
   * @param promise      completed by the bind, or failed with the registration cause
   */
  private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    // CLOSE_ON_FAILURE is attached to the future returned by bind().
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

三、ServerBootstrap

1.属性

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
/**
 * Server-side bootstrap: an {@link AbstractBootstrap} for {@link ServerChannel}s that additionally holds the
 * settings for child channels. This excerpt lists only the fields.
 */
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);
    // Options and attributes for the child ("sub") side
    private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
    private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
    private volatile EventLoopGroup childGroup;// child ("sub") event loop group
    private volatile ChannelHandler childHandler;// ChannelHandler for the child ("sub") side
}    

2. 构造

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
    /** Creates an empty server bootstrap; all settings keep their field defaults. */
    public ServerBootstrap() { }

    /**
     * Copy constructor: delegates the shared settings to the superclass copy constructor, then takes the
     * child group and child handler and copies the child options and attributes.
     *
     * @param bootstrap the server bootstrap whose configuration is copied
     */
    private ServerBootstrap(ServerBootstrap bootstrap) {
        super(bootstrap);
        childGroup = bootstrap.childGroup;
        childHandler = bootstrap.childHandler;
        // Each source map is copied while holding that map's own monitor.
        synchronized (bootstrap.childOptions) {
            childOptions.putAll(bootstrap.childOptions);
        }
        synchronized (bootstrap.childAttrs) {
            childAttrs.putAll(bootstrap.childAttrs);
        }
    }

3.重写init

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

   /**
    * Initializes the newly created server channel: applies the parent options and attributes, then installs a
    * {@link ChannelInitializer} that adds the configured handler (if any) and, from an event-loop task, a
    * {@code ServerBootstrapAcceptor} carrying a snapshot of the child settings.
    *
    * @param channel the channel to initialize
    * @throws Exception as declared by the signature
    */
   @Override
   void init(Channel channel) throws Exception {
       final Map<ChannelOption<?>, Object> options = options0();// options for the parent ("main") channel
       synchronized (options) {
           setChannelOptions(channel, options, logger);
       }

       final Map<AttributeKey<?>, Object> attrs = attrs0();
       synchronized (attrs) {
           for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
               @SuppressWarnings("unchecked")
               AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
               channel.attr(key).set(e.getValue());
           }
       }

       ChannelPipeline p = channel.pipeline();

       // Snapshot the child settings now; the arrays are taken while holding each map's monitor.
       final EventLoopGroup currentChildGroup = childGroup;
       final ChannelHandler currentChildHandler = childHandler;
       final Entry<ChannelOption<?>, Object>[] currentChildOptions;
       final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
       synchronized (childOptions) {
           currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
       }
       synchronized (childAttrs) {
           currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
       }

       p.addLast(new ChannelInitializer<Channel>() {
           @Override
           public void initChannel(final Channel ch) throws Exception {
               final ChannelPipeline pipeline = ch.pipeline();
               ChannelHandler handler = config.handler();
               if (handler != null) {
                   pipeline.addLast(handler);
               }

               // The acceptor is added from an event-loop task rather than inline.
               // NOTE(review): upstream Netty documents this as keeping user handlers in front of the
               // acceptor when the user handler is itself a ChannelInitializer - confirm against upstream.
               ch.eventLoop().execute(new Runnable() {
                   @Override
                   public void run() {
                       // The acceptor hands accepted channels over to the child ("sub") group.
                       pipeline.addLast(new ServerBootstrapAcceptor(
                               ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                   }
               });
           }
       });
   }

4.ServerBootstrapAcceptor 源码分析

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
    /**
     * Inbound handler that finishes the setup of each channel it reads (handler, options, attributes) and
     * registers that channel with the child event loop group.
     */
    private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
        
      
        private final EventLoopGroup childGroup;
        private final ChannelHandler childHandler;
        private final Entry<ChannelOption<?>, Object>[] childOptions;
        private final Entry<AttributeKey<?>, Object>[] childAttrs;
        private final Runnable enableAutoReadTask;

        /**
         * Stores the child settings and prepares the task that re-enables auto-read on {@code channel}.
         *
         * @param channel      the channel whose auto-read flag the prepared task turns back on
         * @param childGroup   the group that read channels are registered with
         * @param childHandler the handler appended to each read channel's pipeline
         * @param childOptions the options applied to each read channel
         * @param childAttrs   the attributes set on each read channel
         */
        ServerBootstrapAcceptor(
                final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
                Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
            this.childGroup = childGroup;
            this.childHandler = childHandler;
            this.childOptions = childOptions;
            this.childAttrs = childAttrs;

            // Task which is scheduled to re-enable auto-read.
            // It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may
            // not be able to load the class because of the file limit it already reached.
            // See https://github.com/netty/netty/issues/1328
            enableAutoReadTask = new Runnable() {
                @Override
                public void run() {
                    channel.config().setAutoRead(true);
                }
            };
        }

        /**
         * Treats {@code msg} as a {@link Channel}: appends the child handler, applies the child options and
         * attributes, then registers it with the child group.
         */
        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler);

            setChannelOptions(child, childOptions, logger);

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            try {
                // Register the client channel with the child (sub-reactor) group; if registration fails or
                // throws, the channel is force-closed.
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }

        /** Force-closes {@code child} through its unsafe handle and logs the registration failure. */
        private static void forceClose(Channel child, Throwable t) {
            child.unsafe().closeForcibly();
            logger.warn("Failed to register an accepted channel: {}", child, t);
        }

        /**
         * If auto-read is on, turns it off and schedules the re-enable task one second later; the exception
         * is then always forwarded down the pipeline.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            final ChannelConfig config = ctx.channel().config();
            if (config.isAutoRead()) {
                // stop accept new connections for 1 second to allow the channel to recover
                // See https://github.com/netty/netty/issues/1328
                config.setAutoRead(false);
                ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
            }
            // still let the exceptionCaught event flow through the pipeline to give the user
            // a chance to do something with it
            ctx.fireExceptionCaught(cause);
        }
    }

四、Bootstrap

1.属性

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
/**
 * Client-side bootstrap: an {@link AbstractBootstrap} for plain {@link Channel}s that additionally holds the
 * address resolver group and the remote address. This excerpt lists only the fields.
 */
public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(Bootstrap.class);

    private static final AddressResolverGroup<?> DEFAULT_RESOLVER = DefaultAddressResolverGroup.INSTANCE;

    private final BootstrapConfig config = new BootstrapConfig(this);// configuration

    // Address resolver group instance.
    // Must not be final: the copy constructor assigns it, and a final field that already has an initializer
    // cannot be assigned again. The unchecked cast is needed because DEFAULT_RESOLVER is declared with a
    // wildcard type argument.
    @SuppressWarnings("unchecked")
    private volatile AddressResolverGroup<SocketAddress> resolver =
            (AddressResolverGroup<SocketAddress>) DEFAULT_RESOLVER;
    private volatile SocketAddress remoteAddress;// remote address
}    

2.构造

1
2
3
4
5
6
7
    /** Creates an empty client bootstrap; all settings keep their field defaults. */
    public Bootstrap() { }

    /**
     * Copy constructor: delegates the shared settings to the superclass copy constructor, then takes the
     * resolver and remote address from {@code bootstrap}.
     *
     * @param bootstrap the bootstrap whose configuration is copied
     */
    private Bootstrap(Bootstrap bootstrap) {
        super(bootstrap);
        resolver = bootstrap.resolver;
        remoteAddress = bootstrap.remoteAddress;
    }

3.init

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19

   /**
    * Initializes the newly created client channel: appends the configured handler to its pipeline, then
    * applies the options and attributes.
    *
    * @param channel the channel to initialize
    * @throws Exception as declared by the signature
    */
   @Override
   @SuppressWarnings("unchecked")
   void init(Channel channel) throws Exception {
       ChannelPipeline p = channel.pipeline();
       p.addLast(config.handler());// add the configured ChannelHandler

       final Map<ChannelOption<?>, Object> options = options0();
       synchronized (options) {
           setChannelOptions(channel, options, logger);
       }

       final Map<AttributeKey<?>, Object> attrs = attrs0();
       synchronized (attrs) {
           for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
               channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
           }
       }
   }

4.connect 连接服务地址

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
   /**
     * Connect a {@link Channel} to the remote peer.
     *
     * @param remoteAddress the address to connect to; must not be {@code null}
     * @param localAddress  the local address passed on to {@code doResolveAndConnect}
     * @return the future produced by {@code doResolveAndConnect}
     * @throws NullPointerException if {@code remoteAddress} is {@code null}
     */
    public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        }
        // validate() is defined outside this excerpt.
        validate();
        return doResolveAndConnect(remoteAddress, localAddress);
    }

    /**
     * Initializes and registers a channel, then continues with {@code doResolveAndConnect0} once the
     * registration has succeeded.
     *
     * @param remoteAddress the address to resolve and connect to
     * @param localAddress  the local address passed on to {@code doResolveAndConnect0}
     * @return the registration future itself if it is already done and failed; otherwise the promise that is
     *         passed to {@code doResolveAndConnect0}
     * @see #connect()
     */
    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();// initialize (init) and register the channel
        final Channel channel = regFuture.channel();

        if (regFuture.isDone()) {
            if (!regFuture.isSuccess()) {
                return regFuture;
            }
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                    // failure.
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
                        doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

1). doResolveAndConnect0

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
 /**
  * Resolves {@code remoteAddress} with the resolver obtained for the channel's event loop and then calls
  * {@code doConnect}; if resolution fails, the channel is closed and the promise is failed.
  *
  * @param channel       the registered channel
  * @param remoteAddress the address to resolve and connect to
  * @param localAddress  the local address passed on to {@code doConnect}
  * @param promise       the promise handed to {@code doConnect} or failed here
  * @return {@code promise}
  */
 private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
                                               final SocketAddress localAddress, final ChannelPromise promise) {
        try {
            final EventLoop eventLoop = channel.eventLoop();
            final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);

            if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
                // Resolver has no idea about what to do with the specified remote address or it's resolved already.
                doConnect(remoteAddress, localAddress, promise);
                return promise;
            }

            final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);

            if (resolveFuture.isDone()) {
                final Throwable resolveFailureCause = resolveFuture.cause();

                if (resolveFailureCause != null) {
                    // Failed to resolve immediately
                    channel.close();
                    promise.setFailure(resolveFailureCause);
                } else {
                    // Succeeded to resolve immediately; cached? (or did a blocking lookup)
                    doConnect(resolveFuture.getNow(), localAddress, promise);
                }
                return promise;
            }

            // Wait until the name resolution is finished.
            resolveFuture.addListener(new FutureListener<SocketAddress>() {
                @Override
                public void operationComplete(Future<SocketAddress> future) throws Exception {
                    if (future.cause() != null) {
                        channel.close();
                        promise.setFailure(future.cause());
                    } else {
                        doConnect(future.getNow(), localAddress, promise);
                    }
                }
            });
        } catch (Throwable cause) {
            // Anything thrown above fails the promise via tryFailure.
            promise.tryFailure(cause);
        }
        return promise;
    }

2). doConnect

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
     /**
      * Submits the actual connect as a task on the event loop of the promise's channel.
      *
      * @param remoteAddress  the address to connect to
      * @param localAddress   the local address to connect from, or {@code null} to use the connect call
      *                       without a local address
      * @param connectPromise the promise passed to {@code connect}; {@code CLOSE_ON_FAILURE} is attached to it
      */
     private static void doConnect(
             final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
 
         // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
         // the pipeline in its channelRegistered() implementation.
         final Channel channel = connectPromise.channel();
         channel.eventLoop().execute(new Runnable() {
             @Override
             public void run() {
                 if (localAddress == null) {
                     channel.connect(remoteAddress, connectPromise);
                 } else {
                     channel.connect(remoteAddress, localAddress, connectPromise);
                 }
                 connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
             }
         });
     }