【Netty】四、事件循环EventLoop与EventLoopGroup

发布时间:2022-09-16 19:00

一、简介

在netty中,事件循环EventLoop是一个很重要的组件,用于处理已注册Channel的各种IO事件,而EventLoopGroup对应了一个或多个EventLoop,可以看做EvenLoopGroup就是EventLoop的集合。下面是EventLoop和EventLoopGroup相关类图:

【Netty】四、事件循环EventLoop与EventLoopGroup_第1张图片

从上面类图可以看到,netty在jdk原生接口ScheduledExecutorService上衍生了EventExecutorGroup接口,其通过next()方法来获取EventExecutor事件执行器,并在ScheduledExecutorService的基础上添加了优雅关闭、是否正在关闭等操作,如下图

clipboard.png

【Netty】四、事件循环EventLoop与EventLoopGroup_第2张图片

而EventLoopGroup继承了EventExecutorGroup接口,重写next()方法并添加注册Channel的操作,如下图

【Netty】四、事件循环EventLoop与EventLoopGroup_第3张图片

EventLoop接口本身比较简单,继承于EventExecutor及EventLoopGroup接口,如下

【Netty】四、事件循环EventLoop与EventLoopGroup_第4张图片

最常使用的 NioEventLoopGroup 和 NioEventLoop,分别继承于抽象类 MultithreadEventLoopGroup 和 SingleThreadEventLoop,而这两个抽象类本身实现不难,其主要是继承了 MultithreadEventExecutorGroup 和 SingleThreadEventExecutor,所以下面来看下 MultithreadEventExecutorGroup 和 SingleThreadEventExecutor 的主要代码逻辑

二、MultithreadEventExecutorGroup和SingleThreadEventExecutor

2.1 MultithreadEventExecutorGroup

MultithreadEventExecutorGroup表示通过多个EventExecutor来处理所提交的任务

2.1.1 重要属性

有两个较为重要的属性children和chooser,children对应EventExecutor数组,而chooser选取器的作用是从children选取EventExecutor来执行任务。如下

// 对应的EventExecutor数组
private final EventExecutor[] children;

// 选取器,作用是从children里选取EventExecutor来执行任务
private final EventExecutorChooserFactory.EventExecutorChooser chooser;

2.1.2 初始化

MultithreadEventExecutorGroup的构造函数会对children和chooser进行初始化,大致步骤如下:

  1. 根据传进来的nThreads线程数来初始化children数组 children = new EventExecutor[nThreads]
  2. 通过newChild()方法来实例化children的每个EventExecutor,newChild()为抽象方法,需要子类(如NioEventLoopGroup)具体实现。如果没有成功的实例化children数组,则逐一优雅关闭EventExecutor
  3. 初始化chooser选取器
  4. 给children中的每个EventExecutor添加terminationListener终止监听器,每有一个EventExecutor终止了,就会将terminatedChildren加1,等到terminatedChildren==children总数时,说明所有的EventExecutor已经全部终止

源码如下:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    checkPositive(nThreads, "nThreads");

    if (executor == null) {
        // 如果传进的executor执行器为空,设置为ThreadPerTaskExecutor执行器,该执行器会单独创建一个线程来处理每个任务
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    // 根据传进来的nThreads线程数来实例化children
    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // newChild作用是生成具体的EventExecutor,其为抽象方法,需要子类(如NioEventLoopGroup)去具体实现
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            // 如果没有成功的实例化children数组,则逐一优雅关闭EventExecutor
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                // 等待终止所有的EventExecutor
                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    // 初始化chooser选取器
    chooser = chooserFactory.newChooser(children);

    // 给children中的每个EventExecutor添加终止监听器
    // 每有一个EventExecutor终止了,就会将terminatedChildren加1
    // 等到terminatedChildren==children总数时,说明所有的EventExecutor已经全部终止
    final FutureListener terminationListener = new FutureListener() {
        @Override
        public void operationComplete(Future future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };
    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }
} 
 

2.1.3 提交任务

MultithreadEventExecutorGroup提交任务的大致流程如下图:
【Netty】四、事件循环EventLoop与EventLoopGroup_第5张图片

提交任务时,MultithreadEventExecutorGroup是直接使用父类AbstractEventExecutorGroup的submit方法来提交,而该submit方法中是通过调用next()方法来选取到某个EventExecutor,再调用EventExecutor的submit()方法来提交的,如下

@Override
public Future submit(Runnable task) {
    return next().submit(task);
}

而next()方法则是通过chooser选取器来选取到某个EventExecutor的,如下

@Override
public EventExecutor next() {
    return chooser.next();
}

2.2 SingleThreadEventExecutor

从上面我们可以得知MultithreadEventExecutorGroup提交任务时,实质上是选取到某个EventExecutor,再由该EventExecutor来进行提交

由于我们常用的NioEventLoop的大多数操作其实是由SingleThreadEventExecutor提供了默认实现(当然NioEventLoop也有其具体的一些操作,后续会详解),所以在深入NioEventLoop之前,有必要先了解一下SingleThreadEventExecutor

2.2.1 重要属性

SingleThreadEventExecutor中有一个存放任务的taskQueue任务队列,还有一个与之绑定的thread线程,还有一些优雅关闭相关属性,如下

// 存放任务的队列
private final Queue taskQueue;

// 与该SingleThreadEventExecutor绑定的thread
private volatile Thread thread;

// 执行器,首次启动时通过该执行器来启动线程,再由该线程来消费taskQueue的任务
private final Executor executor;

// 该属性很重要,表示addTask添加任务时,是否自动唤醒线程,如果不能自动唤醒,需要主动调用wakeup方法来唤醒
// 如:DefaultEventExecutor的addTaskWakesUp为true,而NioEventLoop为false
private final boolean addTaskWakesUp;

// 队列的最大容量
private final int maxPendingTasks;

// 优雅关闭的静默时间
private volatile long gracefulShutdownQuietPeriod;

// 优雅关闭的超时时间
private volatile long gracefulShutdownTimeout;

// 优雅关闭的开始时间
private long gracefulShutdownStartTime;

2.2.2 状态管理

SingleThreadEventExecutor总共有5种状态,如下