Netty 4 线程模型

(本文尚未完工! 本文尚未完工! )

何为线程模型?

说到线程模型,可能是指操作系统如何实现用户级线程,比如many to many,many to one,one to one;也可能是指编程语言对多线程的支持方式,或者直接提供对操作系统线程API的访问,或者在其之上提供一层抽象以屏蔽平台间的差异,或者干脆不提供并发和线程,等等。

而这里讨论的线程模型,是另外一种情况,指的是应用程序或者框架对线程的使用情况:

  • 有哪些线程
  • 如何管理这些线程
  • 线程的生命周期(何时创建何时销毁)
  • 各线程的职责

为什么要了解线程模型?

借用Netty in Action中的一段话:

How and when threads are created obviously has a significant impact on the execution of application code, so developers need to understand the trade-offs associated with different models. This is true whether they choose the model themselves or acquire it implicitly via the adoption of a language or framework.

而我之所以想去了解Netty的线程模型正是因为黑体部分,因为我现在所参与的项目不仅仅是把Netty当做一个网络框架,更把它当做一个业务容器来使用,可以说Netty的线程模型就是我们业务应用的线程模型,90%的业务逻辑都是由Netty的IO线程来驱动的(这种方式是好是坏这里不做讨论)。

还有一段有点“墨菲定律”的意思:

As we pointed out at the start of the chapter, a threading model specifies how code is going to be executed. Because we must always guard against the possible side effects of concurrent execution, it’s important to understand the implications of the model being applied (there are single-thread models as well). Ignoring these matters and merely hoping for the best is tantamount to gambling—with the odds definitely against you.

分析Netty 4 的线程模型

下面通过调试以及阅读相关源码的方式,以一个实际的Netty应用为例,来分析Netty 4的线程模型。

示例所使用的Netty版本为4.1.37.Final。因为我目前所参与的项目的原因,示例代码没有选择使用nio,而是选择了Linux专用的epoll,两者在线程模型上应该是没区别的。

从一个最简单的Netty应用开始:

package com.wallenwang.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.socket.SocketChannel;

public final class Server {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new EpollEventLoopGroup(2);
        EventLoopGroup workerGroup = new EpollEventLoopGroup(4);
        ServerBootstrap b = new ServerBootstrap();

        b.group(bossGroup, workerGroup)
         .channel(EpollServerSocketChannel.class)
         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) throws Exception {
             }
         });

        // Start the server.
        ChannelFuture f = b.bind(9).sync();

        // Wait until the server socket is closed.
        f.channel().closeFuture().sync();

        // Shut down all event loops to terminate all threads.
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

监听9号端口,建立连接和收到消息后什么都不做,没错,这是一个Discard服务

bossGroup和workerGroup的线程数分别设置为2个和4个,那么服务启动后是否会有6个EventLoop线程呢?验证一下:

只有一个EventLoop线程,其余5个线程并没有创建,验证错误。这里引出我们第一个关注点:

EventLoop线程的创建:Lazy Fashion

本文采用EventLoop线程这种叫法,而不是IO线程,因为EventLoop除了IO操作之外,确实也做了些其他事情。

EventLoop的Nio实现和Epoll实现都是每个EventLoop固定对应一个线程,两者是一对一的关系,从实现类的继承关系可以看的出来:

public final class NioEventLoop extends SingleThreadEventLoop {/*...*/}
class EpollEventLoop extends SingleThreadEventLoop {/*...*/}

回到正题,EventLoop对应的线程以Lazy Fashion的方式创建,即:直到EventLoop真正需要开始工作时,其对应的线程才会被创建并启动起来。

EventLoop什么时候开始工作呢?当然是第一次调用EventLoop.execute()执行第一个任务时。一般情况下,第一个任务是向EventLoop注册第一个channel,调用堆栈为证:

当然在执行注册channel的任务之前我们也可以自行调用EventLoop.execute()执行任意其他任务,这同样会导致线程的创建和启动。

看下源码中对应的实现,EpollEventLoopexecute()继承自SingleThreadEventExecutor

    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            startThread();
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                }
                if (reject) {
                    reject();
                }
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

每次execute()都会检查是否inEventLoop()(当前线程是否是该EventLoop线程),第一次execute()时EventLoop线程还没创建,所以当前线程不可能是EventLoop线程,因此调用startThread()创建并启动线程(其中有状态检查,不会重复启动)。

继续跟踪startThread()可以发现,线程最终是由ThreadPerTaskExecutor.execute()通过DefaultThreadFactory创建的:

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }

而创建的线程用来执行这个方法:SingleThreadEventExecutor.this.run()

这样我们只看到一个EventLoop线程就解释的通了,服务启动时只创建了server channel并注册到bossGroup的某个EventLoop中,此时并没有客户端连接上来,因此workerGroup中的EventLoop线程暂时都不会创建。

继续分析,现在通过telnet连接一个客户端上来:

$ telnet localhost 9

可以看到workerGroup的线程也创建出来了,跟上面同样的道理。

继续用telnet创建多个连接,workerGroup的4个线程被陆续创建出来,每创建一个连接就会启动一个线程,直到达到配置的线程数4:

这里就有疑问了:workerGroup中只有4个EventLoop线程,但客户端Channel的数量是没有限制的,那么EventLoop跟Channel的对应关系是怎样的呢?Channel又是如何与EventLoop建立联系的呢?这是我们第二个关注点:

EventLoop的分配策略:Round Robin之后自始而终

新连接的channel会被注册到workerGroup(源码中叫childGroup)中的某个EventLoop上,相关代码ServerBootstrap.channelRead()

        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler);

            setChannelOptions(child, childOptions, logger);

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            try {
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }

EpollEventLoopGroupregister()方法继承自MultithreadEventLoopGroup,因此childGroup.register(child)实现如下( MultithreadEventLoopGroup.register()):

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

重点在于next()方法,一路追踪下去可以发现,最终的选择策略由DefaultEventExecutorChooserFactory根据childGroup中EventLoop的数量来选择使用PowerOfTwoEventExecutorChooser或者GenericEventExecutorChooser,两者的选择策略其实都是round-robin,只是前者的按位与操作相比后者的取模效率高些罢了。

当然这只是Netty的缺省策略,用户可以提供自己的EventExecutorChooserFactory实现来自定义选择策略。

一旦Channel被注册到某个EventLoop上,除非手动反注册(deregister),这种绑定关系就是自始而终固定不变的。

因此,EventLoop与Channel的关系是一对多的关系,一个EventLoop可以服务多个Channel,但一个Channel同时只能被一个EventLoop服务,除非重新注册新的EventLoop。

现在我们知道,一个EventLoop管理了好多Channel,那么问题又来了,EventLoop线程具体干了什么呢?在Channel上执行IO操作应该少不了,那ChannelHandler的执行也是由EventLoop线程负责的吗?除此之外,EventLoop线程还会做其他事情吗?这是我们第三个关注点:

EventLoop线程的职责:不仅仅是IO

还记得EventLoop线程被创建后执行的SingleThreadEventExecutor.this.run()吗?答案全在这个run()方法里,对于Epoll实现来说就是EpollEventLoop.run()

@Override
    protected void run() {
        for (;;) {
            try {
                int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        strategy = epollBusyWait();
                        break;

                    case SelectStrategy.SELECT:
                        if (wakenUp == 1) {
                            wakenUp = 0;
                        }
                        if (!hasTasks()) {
                            strategy = epollWait();
                        }
                        // fallthrough
                    default:
                }

                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            processReady(events, strategy);
                        }
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();

                    try {
                        if (strategy > 0) {
                            processReady(events, strategy);
                        }
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
                if (allowGrowing && strategy == events.length()) {
                    //increase the size of the array as we needed the whole space for the events
                    events.increase();
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        break;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

简单来说,EventLoop线程不断的重复做三件事:

  • 监控其所管理的channel的IO是否就绪(epollWait()
  • 对就绪的channel进行IO(processReady()
  • 执行任务队列中的任务(runAllTasks()

这里有几处需要注意的地方:

  • epoll_wait()是被多线程调用的,每个EventLoop线程都会执行。
  • EventLoop可以用来执行其他任务,毕竟EventLoop是一个ScheduledExecutorService

谁来执行ChannelHandler?

优雅的退出

Netty 4线程模型总结

单线程避免同步

为handler指定其他线程

了解线程模型后我们可以做什么?

说点什么

avatar
  订阅  
提醒