Java 的源代码学习(6)——Netty源代码学习:EventLoop 的实现

当客户端的数量暴增时,给每一个客户端都单独分配一个线程的代价实在是太高,所以“单线程”的设计(比如 Redis、Node.JS 等)对提供系统的并发度至关重要(不过在 Netty 中,并不一定只有一个线程)。另一方面,类似于 UI 线程的单线程模型,在单线程模型下就不需要过多地考虑并发问题,这样会简化相当多的开发工作并能减少很多 bug。老夫写这篇博文来重点研究一下 Netty 中 EventLoop 的具体实现。

其实要自己实现一个简单的 EventLoop 也相当简单,用一个死循环不断地执行队列里的任务,在没有任务的时候把自己 block 掉直到下一个任务的到来时即可。但作为一个通用的库,要考虑的应用场景就非常多了,所以就有了下面的类图(PS:这个类图是用 StarUML 画的,我觉得这个软件还行,不过老夫还在 Evaluate,还在纠结是否购买,毕竟太穷)。


相关类图

Netty 中与 EventLoop 相关的类和接口的示意图。

其实仔细看,这个设计并不复杂,只是 Netty 把它抽象的层级太多了(相比于后面博文要学习的 Channel,现在要看的几个类关系实在是太简单,个人不太赞同这种太细化的抽象设计)。图中标注 leaf 的是标准库中最底层的类(没有给出具体实现),乃如果有需要的话得自己实现,更有可能的是作者准备在今后版本再进行实现。最上层的AbstractExecutorService 类、ExecutorService 和 ScheduledExecutorService 接口是 JDK 的标准库,老夫原本应该在上一篇博文学习这些东西(包括各种 ThreadPool 的实现),但是由于太懒所以只好先烂尾一段时间。


定时任务的实现类

AbstractEventExecutor 是 Netty 中除了 JDK 标准库以外的最顶层的 EventLoop 抽象类。主要提供了一个 parent 字段记录它输入哪个 EventExecutorGroup,并提供了一些包装方法,没有什么好看的。

AbstractScheduledEventExecutor 提供了一个计划任务的机制,说白了就是可以指定这个任务是在多久以后开始或是设置一个定时任务。在实现上,它提供了一个优先队列来存放这些任务:

PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;

Netty 没有使用 JDK 的标准 PriorityQueue 而是自己写了一个 DefaultPriorityQueue 的实现,由于与满大街的二叉堆实现区别不大所以这里就忽略掉。这个优先队列的元素是 ScheduledFutureTask<V> 类型,实现的 compareTo 方法就是谁的计划执行时间越近谁的优先级就越高。这就可以看出它这个执行的时间也是不精确的,由于在一个线程中处理,前一个线程耗的时间太多有可能导致后续任务推迟执行。不过这个在通常的场景中是可以忍受的,如果乃需要非常精确的时间控制,那么乃就需要继承 AbstractEventExecutor 自己来实现啦。

ScheduledFutureTask 类的 run 方法用来具体执行这个任务:

    @Override
    public void run() {
        assert executor().inEventLoop();
        try {
            if (periodNanos == 0) {
                if (setUncancellableInternal()) {
                    V result = task.call();
                    setSuccessInternal(result);
                }
            } else {
                // check if is done as it may was cancelled
                if (!isCancelled()) {
                    task.call();
                    if (!executor().isShutdown()) {
                        long p = periodNanos;
                        if (p > 0) {
                            deadlineNanos += p;
                        } else {
                            deadlineNanos = nanoTime() - p;
                        }
                        if (!isCancelled()) {
                            // scheduledTaskQueue can never be null as we lazy init it before submit the task!
                            Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
                                    ((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;
                            assert scheduledTaskQueue != null;
                            scheduledTaskQueue.add(this);
                        }
                    }
                }
            }
        } catch (Throwable cause) {
            setFailureInternal(cause);
        }
    }
  • 5-9 行:periodNanos 用来记录这个任务的定期执行的周期。为零时表示执行一次,此时先标记这个任务不能被取消了(已经准备开始执行了),执行完后设置这个 Promise 对象的执行结果,并通知相关的监听者(代码省略)。
  • deadlineNanos 记录这个任务在什么时候到期,periodNanos 不为零的情况表示其有执行周期,具体代表什么意思我们不管它。总之思路就是,执行了周期任务之后,再次算出一个新的执行时间并且把它插入到优先队列里,如果这个任务被取消了,那么就不放到队列里去。这个实现算是非常简单且明了。此外由于是定时任务,所以这里不会去设置其 result,call 完就不管了。

此外,AbstractScheduledEventExecutor 提供了一些方法来查看或者取出最近要执行的任务以便子类调用,基本上就是在优先队列里找最高优先级(最近要执行)的任务,这个就不需要看了。需要注意的是这个类的 schedule 方法:

    <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
        if (inEventLoop()) {
            scheduledTaskQueue().add(task);
        } else {
            execute(new Runnable() {
                @Override
                public void run() {
                    scheduledTaskQueue().add(task);
                }
            });
        }

        return task;
    }

当不在 event loop 线程时,会调用 Executor 接口定义的 execute 方法,这个方法在子类 SingleThreadEventExecutor 才给出实现,那么我们接下来就看 SingleThreadEventExecutor 类,这个类就是“单线程模型”的核心了。


SingleThreadEventExecutor 和 SingleThreadEventLoop 类

与传统的线程类相似,本类定义了 5 种线程状态:ST_NOT_STARTED、ST_STARTED、ST_SHUTTING_DOWN、ST_SHUTDOWN、ST_TERMINATED,但是没有 BLOCKED 和 WAITING 状态,毕竟单线程 Loop 不应该被其他线程阻塞。与之相关的,定义了 Thread 类型的成员变量来包装真正的线程:

    private volatile Thread thread;
    @SuppressWarnings("unused")
    private volatile ThreadProperties threadProperties;
    private final Executor executor;
    private volatile boolean interrupted;

具体的任务则是放在了一个 Queue<Runnable> 类型的 taskQueue里。其用到的具体类型时 LinkedBlockingQueue,这个非常符合常理,要老夫来实现也会用这个。

还有一些其他成员变量,在接下来的代码中我们会见到:

    private final Semaphore threadLock = new Semaphore(0);
    private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
    private final boolean addTaskWakesUp;
    private final int maxPendingTasks;
    private final RejectedExecutionHandler rejectedExecutionHandler;

    private long lastExecutionTime;

    @SuppressWarnings({ "FieldMayBeFinal", "unused" })
    private volatile int state = ST_NOT_STARTED;

    private volatile long gracefulShutdownQuietPeriod;
    private volatile long gracefulShutdownTimeout;
    private long gracefulShutdownStartTime;

    private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);

子类使用此类时,比如 DefaultEventLoop,会使用一个 for (;;) 的循环来不断调用 takeTask 方法去执行,如果此时没有任务,会把自己给 block 掉:

    protected Runnable takeTask() {
    ......
        for (;;) {
            ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
            if (scheduledTask == null) {
                Runnable task = null;
                try {
                    task = taskQueue.take();
                    if (task == WAKEUP_TASK) {
                        task = null;
                    }
                } catch (InterruptedException e) {
                    // Ignore
                }
                return task;

先看一下父类 AbstractScheduledEventExecutor 里有没有要执行的定时任务,如果没有则取 taskQueue 里要执行的任务(此时如果没有任务,take 方法会把当前线程 block 掉)。这里有一个 WAKEUP_TASK,是用在线程关闭的时候的(要合理地关闭掉线程很麻烦,这里不做深究)。

            } else {
                long delayNanos = scheduledTask.delayNanos();
                Runnable task = null;
                if (delayNanos > 0) {
                    try {
                        task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
                    } catch (InterruptedException e) {
                        // Waken up.
                        return null;
                    }
                }
                if (task == null) {
                    // We need to fetch the scheduled tasks now as otherwise there may be a chance that
                    // scheduled tasks are never executed if there is always one task in the taskQueue.
                    // This is for example true for the read task of OIO Transport
                    // See https://github.com/netty/netty/issues/1614
                    fetchFromScheduledTaskQueue();
                    task = taskQueue.poll();
                }

                if (task != null) {
                    return task;
                }
            }
        }
    }
  • 第 6 行:在下一个定时任务到来之前,我们尝试往 taskQueue 里取任务,如果没有任务则一直 block 到下一个定时任务来临(poll 方法超时返回)。
  • 第 11 行:这个老夫暂时还没理解,以后有时间再研究。

NioEventLoop 类

DefaultEventLoop 类就是一个 for(;;) 循环来不断地执行 takeTask,更高级一点的就是这个 NioEventLoop,它支持了 Selector 与 channel 绑定,并且它并不使用上面的 takeTask 方法。

从 Java 1.5 开始,Java NIO 的底层实现从 select/poll 替换成了 epoll,可随之而来的也带来一个很严重的 bug(准确地说是 Linux 实现 poll/epoll 的 bug),那就是 select 方法可能在没有任何事件发生的时候返回(想到了 Object.wait 的虚假唤醒),这就导致 for(;;) 不断循环致使 CPU 升至 100%(而且貌似仍然修复这个 bug)。那么 Netty 就要解决(准确地说是绕过)这个 bug,不然如果程序碰到了这个问题那就没法用啦。

略过 bind 和 register 的相关代码(为了减少篇幅以及偷懒),我们直接看 select 方法:

    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

            for (;;) {
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

这里的 selectCnt 是一个计数器,用来记录有多少次“疑似”假唤醒的 select。第 6 行的 delayNanos 方法的访问父类 SingleThreadEventExecutor 的相关方法,实际上是调用的 AbstractScheduledEventExecutor 的 peek 方法来看一下计划任务里最近一次任务要执行的时间(之前分析过了)。如果已经超时了或者还差 0.5 毫秒,则执行 selectNow 语句让 selector 立即返回,酱紫就可以去处理即将执行的计划任务了(因为是单线程)。

                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

这里的 hasTask 是父类 SingleThreadEventLoop 的方法,这个类定义了一个 tailTasks 的成员变量。这个队列用来在 event loop 迭代完后执行(暂时还没有用,而且貌似会被 remove)。

                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;

                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    break;
                }

......(省略部分代码)

在确认没有定时任务以后,就可以放心的调用 select 方法把自己阻塞掉直到事件发生。在 timeoutMillis 以后由于有定时任务,如果此前没有事件发生的话,select 就需要超时返回。接着,我们就判断一下是否真的有事件发生(注意上面的虚假唤醒的情况),如果有,或者有定时任务需要执行的话(或者被 wake ),就 break 掉这个 for 循环回到上层的 run 方法去执行具体任务(这个方法将在后面分析)。

                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // The code exists in an extra method to ensure the method is not too big to inline as this
                    // branch is not very likely to get hit very frequently.
                    selector = selectRebuildSelector(selectCnt);
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }

            ......(省略部分代码)
        }
    }

这里就是处理 epoll bug 的逻辑。当 select 超过 SELECTOR_AUTO_REBUILD_THRESHOLD(默认为 512 次)次没有选到东西时(此时的判断条件是在下次定时任务来临之前被唤醒 512 次,如果没有定时任务则为 0.5 毫秒以后),Netty 认为极有可能出现了 epoll 的 bug,就会强行把 selector 重新建立,强制所有 channel 与新的 selector 绑定,简单粗暴地抛弃掉出现 bug 的 selector 重新来一次。

这个 select 方法在 run 方法里面调用的:

    @Override
    protected void run() {
        for (;;) {
            try {
                try {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    handleLoopException(e);
                    continue;
                }

这里的 wakenUp 相关的判断是为了绕过一些 bug,具体乃自己看代码吧。而 catch 到的 IOException 则是因为 selectNow 方法可能抛出的异常(这个在接口里定义了),由于 Netty 不知道也不关心这个 Selector 的具体实现,所以出问题时只好 rebuild 一下来尝试解决。

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }

    ......(省略其他代码)
    }

NioEventLoop 提供了一个 IoRatio 来设置 I/O 操作在整个事件循环中所大致占用的事件(默认为 50%)。如果不是破罐破摔的 100%,就计算一个非 IO 的时间来 runAllTasks。这个 runAllTasks 实际上是 SingleThreadEventExecutor 定义的,我们当时没有看这个方法的代码,因为是在这里用到了:

    protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }

        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            safeExecute(task);

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }
  • 13 行:safeExecute 就是包装了一个 try-catch 的 task.run 方法,无视掉发生的错误。
  • 19 行:每执行 64 个任务时才检查一下时间,频繁调用 System.nanoTime 相较于执行简单的 task 会浪费掉更多事件。

最后来看一下这个类时如何处理 selector 事件的。在 selector 创建的时候,NioEventLoop 类会用反射查找 Selector 存放感兴趣的事件的 HashSet 保存到 SelectedSelectionKeySet 里面,这个实际上是一个长度最少为 1024 的数组。如果失败了(因为 Selectors 的实现方法不可知),就只能调用迭代器去遍历了,效率会差一些,这里我们只看已经优化过的情况。

    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.keys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.reset(i + 1);

                selectAgain();
                i = -1;
            }
        }
    }

可以看出,每个 key 都在 attachment 字段记录了相关的 AbstractNioChannel 或者 NioTask(在 Channel 注册到 Selector 上时记录),在事件发生时,这个 attachment 就是发生事件时的 Channel(因为一个 Selector 可能绑定成千上万的 Channel)。我们接下来看 processSelectedKey:

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        ......(省略处理 key 已经失效的代码)

        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

这里直接拿到了 Channel 对应的 Unsafe 对象进行操作,主要是为了方便处理各种奇奇怪怪的意外情况给 NIO 和 JDK 填坑。注释已经说的很清除了。


到这里 EventLoop 最主要的代码就已经看完了,从下一篇博客开始就要进入 Netty 的核心:Channel 的世界了。

发表评论

Fill in your details below or click an icon to log in:

WordPress.com 徽标

You are commenting using your WordPress.com account. Log Out /  更改 )

Google photo

You are commenting using your Google account. Log Out /  更改 )

Twitter picture

You are commenting using your Twitter account. Log Out /  更改 )

Facebook photo

You are commenting using your Facebook account. Log Out /  更改 )

Connecting to %s