Netty中React线程模型到底是不是主从模型?
一个 Netty 程序启动时, 至少要指定一个 EventLoopGroup(如果使用到的是 NIO, 那么通常是NioEventLoopGroup), 那么这个 NioEventLoopGroup 在 Netty 中到底扮演着什么角色呢? 我们知道, Netty 是 Reactor 模型的一个实现, 那么首先从 Reactor 的线程模型开始吧.
关于 Reactor 的线程模型
首先我们来看一下 Reactor 的线程模型. Reactor 的线程模型有三种:
- 单线程模型
- 多线程模型
- 主从多线程模型
首先来看一下 单线程模型:
所谓单线程, 即 acceptor 处理和 handler 处理都在一个线程中处理. 这个模型的坏处显而易见: 当其中某个 handler 阻塞时, 会导致其他所有的 client 的 handler 都得不到执行, 并且更严重的是, handler 的阻塞也会导致整个服务不能接收新的 client 请求(因为 acceptor 也被阻塞了). 因为有这么多的缺陷, 因此单线程Reactor 模型用的比较少.
那么什么是 多线程模型 呢? Reactor 的多线程模型与单线程模型的区别就是 acceptor 是一个单独的线程处理, 并且有一组特定的 NIO 线程来负责各个客户端连接的 IO 操作. Reactor 多线程模型如下:
Reactor 多线程模型 有如下特点:
- 有专门一个线程, 即 Acceptor 线程用于监听客户端的TCP连接请求.
- 客户端连接的 IO 操作都是由一个特定的 NIO 线程池负责. 每个客户端连接都与一个特定的 NIO 线程绑定, 因此在这个客户端连接中的所有 IO 操作都是在同一个线程中完成的.
- 客户端连接有很多, 但是 NIO 线程数是比较少的, 因此一个 NIO 线程可以同时绑定到多个客户端连接中.
接下来我们再来看一下 Reactor 的主从多线程模型. 一般情况下, Reactor 的多线程模式已经可以很好的工作了, 但是我们考虑一下如下情况: 如果我们的服务器需要同时处理大量的客户端连接请求或我们需要在客户端连接时, 进行一些权限的检查, 那么单线程的 Acceptor 很有可能就处理不过来, 造成了大量的客户端不能连接到服务器. Reactor 的主从多线程模型就是在这样的情况下提出来的, 它的特点是: 服务器端接收客户端的连接请求不再是一个线程, 而是由一个独立的线程池组成. 它的线程模型如下:
可以看到, Reactor 的主从多线程模型和 Reactor 多线程模型很类似, 只不过 Reactor 的主从多线程模型的 acceptor 使用了线程池来处理大量的客户端请求.
NioEventLoopGroup 与 Reactor 线程模型的对应
在Netty中实现了Reactor模式,来看看Netty是如何实现这些线程模型的:注意:本文章Netty版本为4.1.x,不同版本源码会略有不同
单线程模型
来看一下下面的例子:
EventLoopGroup bossGroup = new NioEventLoopGroup(1); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup) .channel(NioServerSocketChannel.class) ...
注意, 我们实例化了一个 NioEventLoopGroup, 构造器参数是1, 表示 NioEventLoopGroup 的线程池大小是1. 然后接着我们调用 b.group(bossGroup) 设置了服务器端的 EventLoopGroup. 有些朋友可能会有疑惑: 我记得在启动服务器端的 Netty 程序时, 是需要设置 bossGroup 和 workerGroup 的, 为什么这里就只有一个 bossGroup? 其实很简单, ServerBootstrap 重写了 group 方法:
@Override public ServerBootstrap group(EventLoopGroup group) { return group(group, group); }
因此当传入一个 group 时, 那么 bossGroup 和 workerGroup 就是同一个 NioEventLoopGroup 了. 这时候呢, 因为 bossGroup 和 workerGroup 就是同一个 NioEventLoopGroup, 并且这个 NioEventLoopGroup 只有一个线程, 这样就会导致 Netty 中的 acceptor 和后续的所有客户端连接的 IO 操作都是在一个线程中处理的. 那么对应到 Reactor 的线程模型中, 我们这样设置 NioEventLoopGroup 时, 就相当于 Reactor 单线程模型.
多线程模型
同理, 再来看一下下面的例子:
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) ...
bossGroup 中只有一个线程, 而 workerGroup 中的线程是 CPU 核心数乘以2, 因此对应的到 Reactor 线程模型中, 我们知道, 这样设置的 NioEventLoopGroup 其实就是 Reactor 多线程模型.
主从多线程模型
根据上面的代码我们很容易想到主从多线程模式会是如下代码实现:
EventLoopGroup bossGroup = new NioEventLoopGroup(4); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) ...
但事实真的是这样的吗?我们来分析一下源码:
我们来看看ServerSocketChannel是如何注册到Selector上的:
我们跟着函数调用栈逐步来分析这其中的来龙去脉:
ServerBootstrap调用bind方法传入想要绑定的端口,ServerBootstrap没有重写bind方法因此调用父类AbstractBootstrap的bind方法,然后调用doBind方法,我们来看看这个方法:
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister();//channel实例化并初始化 final Channel channel = regFuture.channel(); ... doBind0(regFuture, channel, localAddress, promise); }
这个方法中先调用了initAndRegister方法,此方法是用来生成我们前面通过通过handler方法传入的Channel的class类来通过ReflectiveChannelFactory反射生成对应的channel实例并初始化:
final ChannelFuture initAndRegister() {
channel = channelFactory.newChannel();
init(channel);
...
ChannelFuture regFuture = config().group().register(channel);
}
我们进入init方法看看Netty是如何初始化Channel的:
void init(Channel channel) throws Exception {
synchronized (options) {
setChannelOptions(channel, options, logger);//设置channel的Options
}
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());设置channel的attributes
}
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);//在channel对应的pipeline中添加由前面handler()传入的handler对象,这种加在主线程上的handler
}
// 接着将在pipeline的末端添加ServerBootstrapAcceptor对象这个任务抛到线程池中处理
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
我们可以看到Netty将添加ServerBootstrapAcceptor这个InBoundHandler异步处理了,为什么这么做很显然这个过程可能会比较耗时,我们进入源码看一看:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
由于ServerBootstrapAcceptor继承ChannelInboundHandlerAdapter,它显然也是一个入站处理器,当有客户端连接时便会执行mainReactor中线程对应生成的pipeline维护的链表的handler各种回调方法,ServerBootstrapAcceptor的channelRead是将客户端对应的channel绑定到子NioEventLoopGroup上,(NioEventLoopGroup调用next方法获取一个EventLoop,next方法通过Round-robin算法来顺序获取线程池中中的线程返回),底层其实还是调用了JDK原生的NIO来讲channel注册到Selector上,到这里其实对应到我们以前写NIO代码:
ServerSocketChannel socketChannel = ServerSocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.socket().bind(new InetSocketAddress(8899));
Selector selector = Selector.open();
socketChannel.register(selector, SelectionKey.OP_ACCEPT);//对应这一步
但其实这是我们bind端口之后的流程了,初始化channel后调用config().group().register(channel),将channel注册到Selector上,这个Selector和上一个Selector不是同一个,分别为bossEventLoop和workEventLoop生成的,接着我们再回到之前的doBind方法来继续看下去,由于此时Netty已经做好了ServerSocketChannel的初始化工作,接下来就需要绑定端口了,来看看doBind0方法:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
如果channel初始化成功Netty就会调用channel.bind方法,这个方法里面其实也有不少东西,AbstractChannel调用bind方法调用的是DefaultChannelPipeline的bind方法,然后调用tailContext的bind方法,我们知道DefaultChannelPipeline内部维护了两个变量,分别是tail和head,这是一条由AbstractChannelHandlerContext组成的双向链表,我们知道每一个handler绑定一个ChannelhandlerContext,handlerContext中维护了InBound和OutBound两个变量来控制pipeline中handler处理顺序,InBoundHandler只会处理入站请求数据,OutBoundHandler只会处理出站数据,这种流式设计大大简化了用户写代码的复杂度,handler支持可插拔,当然在handler不建议处理较长时间的业务逻辑例如IO/cpu密集计算等等,最好把任务抛到线程池中即可;而tailConext,headContext既是handler也是handlerContext,tail是一个InBoundHandler,head既是Inboundhandler也是outBoundHandler,我们在回到源码部分:
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
这里有一个findContextOutBound方法我们跟进去看看:
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
这个方法是找到离tail最近的一个outboundHandler的context,而此时pipeline中只有三个handler:从下图调用栈我们可以很明显的看出此时链表中只有三个HandlerContext,而ServerBootstrap是入站处理器,因此这个方法最后返回的是headContext:
调用headContext的bind方法最终调用NioServerSocketChannel的doBind方法,这个方法最终调用了NIObind端口的代码完成端口绑定:
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
上述操作完成后,服务端就进入NioEventLoop的run方法自旋等待请求连接了。
经过这个过程的分析,我们大致了解了channel从初始化到如何注册到Selector以及监听的过程,显然我们会发现虽然可以在ServerBootstrap中配置多个线程的NioEventLoopGroup,但Netty只会调用next方法从中获取一个线程将Channel注册、绑定,最后进行Select()阻塞监听后续所有请求的connect,显然这不符合Reactor主从多线程模式,Netty仅仅使用一个线程来处理连接请求,文章末尾我引用creator of Netty 作者Norman Maurer所说:
multiple boss threads are useful if we share NioEventLoopGroup between different server bootstraps, but I don't see the reason for it.
如果我们用一个线程来处理两个bootstrap的绑定过程,显然这是不合理的,极端情况下一个bootstrap的所有连接请求会被饿死,永远得不到处理。