一、简介
在netty中,事件循环EventLoop
是一个很重要的组件,用于处理已注册Channel
的各种IO事件
,而EventLoopGroup对应了一个或多个EventLoop,可以看做EvenLoopGroup就是EventLoop的集合
。下面是EventLoop和EventLoopGroup相关类图:
从上面类图可以看到,netty在jdk原生接口ScheduledExecutorService上衍生了EventExecutorGroup接口
,其通过next()方法
来获取EventExecutor事件执行器,并在ScheduledExecutorService的基础上添加了优雅关闭、是否正在关闭
等操作,如下图
而EventLoopGroup继承了EventExecutorGroup接口,重写next()方法
并添加注册Channel
的操作,如下图
EventLoop接口本身比较简单,继承于EventExecutor及EventLoopGroup接口,如下
最常使用的 NioEventLoopGroup 和 NioEventLoop,分别继承于抽象类 MultithreadEventLoopGroup 和 SingleThreadEventLoop
,而这两个抽象类本身实现不难,其主要是继承了 MultithreadEventExecutorGroup 和 SingleThreadEventExecutor
,所以下面来看下 MultithreadEventExecutorGroup 和 SingleThreadEventExecutor 的主要代码逻辑
二、MultithreadEventExecutorGroup和SingleThreadEventExecutor
2.1 MultithreadEventExecutorGroup
MultithreadEventExecutorGroup表示通过多个EventExecutor来处理所提交的任务
2.1.1 重要属性
有两个较为重要的属性children和chooser
,children对应EventExecutor数组,而chooser选取器的作用是从children选取EventExecutor来执行任务。如下
// 对应的EventExecutor数组
private final EventExecutor[] children;
// 选取器,作用是从children里选取EventExecutor来执行任务
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
2.1.2 初始化
MultithreadEventExecutorGroup的构造函数会对children和chooser进行初始化,大致步骤如下:
- 根据传进来的nThreads线程数来初始化children数组
children = new EventExecutor[nThreads]
- 通过
newChild()方法
来实例化children的每个EventExecutor,newChild()为抽象方法,需要子类(如NioEventLoopGroup)具体实现。如果没有成功的实例化children数组,则逐一优雅关闭EventExecutor - 初始化chooser选取器
- 给children中的每个EventExecutor添加
terminationListener终止监听器
,每有一个EventExecutor终止了,就会将terminatedChildren加1,等到terminatedChildren==children总数时,说明所有的EventExecutor已经全部终止
源码如下:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
checkPositive(nThreads, "nThreads");
if (executor == null) {
// 如果传进的executor执行器为空,设置为ThreadPerTaskExecutor执行器,该执行器会单独创建一个线程来处理每个任务
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 根据传进来的nThreads线程数来实例化children
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// newChild作用是生成具体的EventExecutor,其为抽象方法,需要子类(如NioEventLoopGroup)去具体实现
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 如果没有成功的实例化children数组,则逐一优雅关闭EventExecutor
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
// 等待终止所有的EventExecutor
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// 初始化chooser选取器
chooser = chooserFactory.newChooser(children);
// 给children中的每个EventExecutor添加终止监听器
// 每有一个EventExecutor终止了,就会将terminatedChildren加1
// 等到terminatedChildren==children总数时,说明所有的EventExecutor已经全部终止
final FutureListener
2.1.3 提交任务
MultithreadEventExecutorGroup提交任务的大致流程如下图:
提交任务时,MultithreadEventExecutorGroup是直接使用父类AbstractEventExecutorGroup的submit方法来提交,而该submit方法中是通过调用next()方法来选取到某个EventExecutor,再调用EventExecutor的submit()方法来提交的
,如下
@Override
public Future> submit(Runnable task) {
return next().submit(task);
}
而next()方法则是通过chooser选取器来选取到某个EventExecutor的,如下
@Override
public EventExecutor next() {
return chooser.next();
}
2.2 SingleThreadEventExecutor
从上面我们可以得知MultithreadEventExecutorGroup提交任务时,实质上是选取到某个EventExecutor,再由该EventExecutor来进行提交
。
由于我们常用的NioEventLoop的大多数操作其实是由SingleThreadEventExecutor提供了默认实现
(当然NioEventLoop也有其具体的一些操作,后续会详解),所以在深入NioEventLoop之前,有必要先了解一下SingleThreadEventExecutor
2.2.1 重要属性
SingleThreadEventExecutor中有一个存放任务的taskQueue任务队列
,还有一个与之绑定的thread线程
,还有一些优雅关闭相关属性,如下
// 存放任务的队列
private final Queue taskQueue;
// 与该SingleThreadEventExecutor绑定的thread
private volatile Thread thread;
// 执行器,首次启动时通过该执行器来启动线程,再由该线程来消费taskQueue的任务
private final Executor executor;
// 该属性很重要,表示addTask添加任务时,是否自动唤醒线程,如果不能自动唤醒,需要主动调用wakeup方法来唤醒
// 如:DefaultEventExecutor的addTaskWakesUp为true,而NioEventLoop为false
private final boolean addTaskWakesUp;
// 队列的最大容量
private final int maxPendingTasks;
// 优雅关闭的静默时间
private volatile long gracefulShutdownQuietPeriod;
// 优雅关闭的超时时间
private volatile long gracefulShutdownTimeout;
// 优雅关闭的开始时间
private long gracefulShutdownStartTime;
2.2.2 状态管理
SingleThreadEventExecutor总共有5种状态,如下
- ST_NOT_STARTED (未启动)
- ST_STARTED (启动)
- ST_SHUTTING_DOWN (关闭中)
- ST_SHUTDOWN (已关闭)
- ST_TERMINATED (已终止)