博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Netty NioEventLoop 创建过程源码分析
阅读量:6605 次
发布时间:2019-06-24

本文共 21958 字,大约阅读时间需要 73 分钟。

原文:

,我们分析了Netty中的Channel组件,本篇我们来介绍一下与Channel关联的另一个核心的组件 —— EventLoop

Netty版本:4.1.30

概述

EventLoop定义了Netty的核心抽象,用于处理网络连接生命周期中所有发生的事件。

我们先来从一个比较高的视角来了解一下Channels、Thread、EventLoops、EventLoopGroups之间的关系。

上图是表示了拥有4个EventLoop的EventLoopGroup处理IO的流程图。它们之间的关系如下:

  • 一个 EventLoopGroup包含一个或多个EventLoop
  • 一个 EventLoop在它的生命周期内只和一个Thread绑定
  • 所有由EventLoop处理的I/O事件都将在它专有的Thread上被处理
  • 一个Channel在它的生命周期内只注册于一个EventLoop
  • 一个EventLoop可能会被分配给一个或多个Channel

EventLoop 原理

下图是Netty EventLoop相关类的UML图。从中我们可以看到EventLoop相关的类都是实现了 java.util.concurrent 包中的 ExecutorService 接口。我们可以直接将任务(Runable 或 Callable) 提交给EventLoop去立即执行或定时执行。

例如,使用EventLoop去执行定时任务,样例代码:

public static void scheduleViaEventLoop() {    Channel ch = new NioSocketChannel();    ScheduledFuture
future = ch.eventLoop().schedule( () -> System.out.println("60 seconds later"), 60, TimeUnit.SECONDS);}复制代码

Thread 管理

Netty线程模型的高性能主要取决于当前所执行线程的身份的确定。一个线程提交到EventLoop执行的流程如下:

  • 将Task任务提交给EventLoop执行
  • 在Task传递到execute方法之后,检查当前要执行的Task的线程是否是分配给EventLoop的那个线程
  • 如果是,则该线程会立即执行
  • 如果不是,则将线程放入任务队列中,等待下一次执行

其中,Netty中的每一个EventLoop都有它自己的任务队列,并且和其他的EventLoop的任务队列独立开来。

Thread 分配

服务于Channel的I/O和事件的EventLoop包含在EventLoopGroup中。根据不同的传输实现,EventLoop的创建和分配方式也不同。

NIO传输

在NIO传输方式中,使用尽可能少的EventLoop就可以服务多个Channel。如图所示,EventLoopGroup采用顺序循环的方式负责为每一个新创建的Channel分配EventLoop,每一个EventLoop会被分配给多个Channels。

一旦一个Channel被分配给了一个EventLoop,则这个Channel的生命周期内,只会绑定这个EventLoop。这就让我们在ChannelHandler的实现省去了对线程安全和同步问题的担心。

OIO传输

与NIO方式的不同在于,一个EventLoop只会服务于一个Channel。

NioEventLoop & NioEventLoopGroup 创建

初步了解了 EventLoop 以及 EventLoopGroup 的工作机制,接下来我们以 NioEventLoopGroup 为例,来深入分析 NioEventLoopGroup 是如何创建的,又是如何启动的,它的内部执行逻辑又是怎样的等等问题。

MultithreadEventExecutorGroup 构造器

我们从 NioEventLoopGroup 的构造函数开始分析:

EventLoopGroup acceptorEventLoopGroup = new NioEventLoopGroup(1);复制代码

NioEventLoopGroup构造函数会调用到父类 MultithreadEventLoopGroup 的构造函数,默认情况下,EventLoop的数量 = 处理器数量 x 2:

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {    private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);    private static final int DEFAULT_EVENT_LOOP_THREADS;    // 默认情况下,EventLoop的数量 = 处理器数量 x 2    static {        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));        if (logger.isDebugEnabled()) {            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);        }    }    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args)    {        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);    }    ...}复制代码

继续调用父类,会调用到 MultithreadEventExecutorGroup 的构造器,主要做三件事情:

  • 创建线程任务执行器 ThreadPerTaskExecutor
  • 通过for循环创建数量为 nThreads 个的 EventLoop
  • 创建 EventLoop 选择器 EventExecutorChooser
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,                                        EventExecutorChooserFactory chooserFactory, Object... args) {    if (nThreads <= 0) {        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));    }    // 创建任务执行器 ThreadPerTaskExecutor    if (executor == null) {        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());    }    // 创建 EventExecutor 数组    children = new EventExecutor[nThreads];    // 通过for循环创建数量为 nThreads 个的 EventLoop    for (int i = 0; i < nThreads; i ++) {        boolean success = false;        try {            // 调用 newChild 接口            children[i] = newChild(executor, args);            success = true;        } catch (Exception e) {            // TODO: Think about if this is a good exception type            throw new IllegalStateException("failed to create a child event loop", e);        } finally {            if (!success) {                for (int j = 0; j < i; j ++) {                    children[j].shutdownGracefully();                }                for (int j = 0; j < i; j ++) {                    EventExecutor e = children[j];                    try {                        while (!e.isTerminated()) {                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);                        }                    } catch (InterruptedException interrupted) {                        // Let the caller handle the interruption.                        Thread.currentThread().interrupt();                        break;                    }                }            }        }    }	    // 创建选择器    chooser = chooserFactory.newChooser(children);    final FutureListener terminationListener = new FutureListener() {        @Override        public void operationComplete(Future future) throws Exception {            if (terminatedChildren.incrementAndGet() == children.length) {                terminationFuture.setSuccess(null);            }        }    };    for (EventExecutor e: children) {        e.terminationFuture().addListener(terminationListener);    }    Set
childrenSet = new LinkedHashSet
(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet);}复制代码

创建线程任务执行器 ThreadPerTaskExecutor

if (executor == null) {    executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());}复制代码

线程任务执行器 ThreadPerTaskExecutor 源码如下,具体的任务都由 ThreadFactory 去执行:

public final class ThreadPerTaskExecutor implements Executor {    private final ThreadFactory threadFactory;    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {        if (threadFactory == null) {            throw new NullPointerException("threadFactory");        }        this.threadFactory = threadFactory;    }	    // 使用 threadFactory 执行任务    @Override    public void execute(Runnable command) {        threadFactory.newThread(command).start();    }}复制代码

来看看 newDefaultThreadFactory 方法:

protected ThreadFactory newDefaultThreadFactory() {    return new DefaultThreadFactory(getClass());}复制代码

DefaultThreadFactory

接下来看看 DefaultThreadFactory 这个类,实现了 ThreadFactory 接口,我们可以了解到:

  • EventLoopGroup的命名规则
  • 具体的线程为 FastThreadLocalThread
public class DefaultThreadFactory implements ThreadFactory {    	// 线程池ID编号自增器    private static final AtomicInteger poolId = new AtomicInteger();	// 线程ID自增器    private final AtomicInteger nextId = new AtomicInteger();    // 线程名称前缀    private final String prefix;    // 是否为守护进程    private final boolean daemon;    // 线程优先级    private final int priority;    // 线程组    protected final ThreadGroup threadGroup;    public DefaultThreadFactory(Class
poolType) { this(poolType, false, Thread.NORM_PRIORITY); } ... // 获取线程名,返回结果:nioEventLoopGroup public static String toPoolName(Class
poolType) { if (poolType == null) { throw new NullPointerException("poolType"); } String poolName = StringUtil.simpleClassName(poolType); switch (poolName.length()) { case 0: return "unknown"; case 1: return poolName.toLowerCase(Locale.US); default: if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) { return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1); } else { return poolName; } } } public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) { if (poolName == null) { throw new NullPointerException("poolName"); } if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) { throw new IllegalArgumentException( "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)"); } // nioEventLoopGroup-2- prefix = poolName + '-' + poolId.incrementAndGet() + '-'; this.daemon = daemon; this.priority = priority; this.threadGroup = threadGroup; } public DefaultThreadFactory(String poolName, boolean daemon, int priority) { this(poolName, daemon, priority, System.getSecurityManager() == null ? Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup()); } @Override public Thread newThread(Runnable r) { // 创建新线程 nioEventLoopGroup-2-1 Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet()); try { if (t.isDaemon() != daemon) { t.setDaemon(daemon); } if (t.getPriority() != priority) { t.setPriority(priority); } } catch (Exception ignored) { // Doesn't matter even if failed to set. } return t; } // 创建新线程 FastThreadLocalThread protected Thread newThread(Runnable r, String name) { return new FastThreadLocalThread(threadGroup, r, name); } }复制代码

创建NioEventLoop

继续从 MultithreadEventExecutorGroup 构造器开始,创建完任务执行器 ThreadPerTaskExecutor 之后,进入for循环,开始创建 NioEventLoop:

for (int i = 0; i < nThreads; i ++) {    boolean success = false;    try {        // 创建 nioEventLoop        children[i] = newChild(executor, args);        success = true;    } catch (Exception e) {        // TODO: Think about if this is a good exception type        throw new IllegalStateException("failed to create a child event loop", e);    }        ... 	}    复制代码

NioEventLoopGroup类中的 newChild() 方法:

@Overrideprotected EventLoop newChild(Executor executor, Object... args) throws Exception {    return new NioEventLoop(this, executor, (SelectorProvider) args[0],        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);}复制代码

NioEventLoop 构造器:

public final class NioEventLoop extends SingleThreadEventLoop{        ...        NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {        // 调用父类 SingleThreadEventLoop 构造器        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);        if (selectorProvider == null) {            throw new NullPointerException("selectorProvider");        }        if (strategy == null) {            throw new NullPointerException("selectStrategy");        }        // 设置 selectorProvider        provider = selectorProvider;        // 获取 SelectorTuple 对象,里面封装了原生的selector和优化过的selector        final SelectorTuple selectorTuple = openSelector();        // 设置优化过的selector        selector = selectorTuple.selector;        // 设置原生的selector        unwrappedSelector = selectorTuple.unwrappedSelector;        // 设置select策略        selectStrategy = strategy;    }		...    }复制代码

接下来我们看看 获取多路复用选择器 方法—— openSelector() ,

// selectKey 优化选项flagprivate static final boolean DISABLE_KEYSET_OPTIMIZATION =    SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);private SelectorTuple openSelector() {    // JDK原生的selector    final Selector unwrappedSelector;    try {        // 通过 SelectorProvider 创建获得selector        unwrappedSelector = provider.openSelector();    } catch (IOException e) {        throw new ChannelException("failed to open a new selector", e);    }    // 如果不优化,则直接返回    if (DISABLE_KEYSET_OPTIMIZATION) {        return new SelectorTuple(unwrappedSelector);    }    // 通过反射创建 sun.nio.ch.SelectorImpl 对象    Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction() {        @Override        public Object run() {            try {                return Class.forName(                        "sun.nio.ch.SelectorImpl",                        false,                        PlatformDependent.getSystemClassLoader());            } catch (Throwable cause) {                return cause;            }        }    });        // 如果 maybeSelectorImplClass 不是 selector 的一个实现,则直接返回原生的Selector     if (!(maybeSelectorImplClass instanceof Class) ||        // ensure the current selector implementation is what we can instrument.        // 确保当前的选择器实现是我们可以检测的        !((Class
) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) { if (maybeSelectorImplClass instanceof Throwable) { Throwable t = (Throwable) maybeSelectorImplClass; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t); } return new SelectorTuple(unwrappedSelector); } // maybeSelectorImplClass 是selector的实现,则转化为 selector 实现类 final Class
selectorImplClass = (Class
) maybeSelectorImplClass; // 创建新的 SelectionKey 集合 SelectedSelectionKeySet,内部采用的是 SelectionKey 数组的形 // 式,而非 set 集合 final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); Object maybeException = AccessController.doPrivileged(new PrivilegedAction() { @Override public Object run() { try { // 通过反射的方式获取 sun.nio.ch.SelectorImpl 的成员变量 selectedKeys Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); // 通过反射的方式获取 sun.nio.ch.SelectorImpl 的成员变量 publicSelectedKeys Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) { // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet. // This allows us to also do this in Java9+ without any extra flags. long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField); long publicSelectedKeysFieldOffset = PlatformDependent.objectFieldOffset(publicSelectedKeysField); if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) { PlatformDependent.putObject( unwrappedSelector, selectedKeysFieldOffset, selectedKeySet); PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet); return null; } // We could not retrieve the offset, lets try reflection as last-resort. } // 设置字段 selectedKeys Accessible 为true Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true); if (cause != null) { return cause; } // 设置字段 publicSelectedKeys Accessible 为true cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true); if (cause != null) { return cause; } selectedKeysField.set(unwrappedSelector, selectedKeySet); publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); return null; } catch (NoSuchFieldException e) { return e; } catch (IllegalAccessException e) { return e; } } }); if (maybeException instanceof Exception) { selectedKeys = null; Exception e = (Exception) maybeException; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e); return new SelectorTuple(unwrappedSelector); } // 设置 SelectedSelectionKeySet selectedKeys = selectedKeySet; logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector); // 返回包含了原生selector和优化过的selector的SelectorTuple return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));}复制代码

优化后的 SelectedSelectionKeySet 对象,内部采用 SelectionKey 数组的形式:

final class SelectedSelectionKeySet extends AbstractSet
{ SelectionKey[] keys; int size; SelectedSelectionKeySet() { keys = new SelectionKey[1024]; } // 使用数组,来替代HashSet,可以降低时间复杂度为O(1) @Override public boolean add(SelectionKey o) { if (o == null) { return false; } keys[size++] = o; if (size == keys.length) { increaseCapacity(); } return true; } @Override public boolean remove(Object o) { return false; } @Override public boolean contains(Object o) { return false; } @Override public int size() { return size; } @Override public Iterator
iterator() { return new Iterator
() { private int idx; @Override public boolean hasNext() { return idx < size; } @Override public SelectionKey next() { if (!hasNext()) { throw new NoSuchElementException(); } return keys[idx++]; } @Override public void remove() { throw new UnsupportedOperationException(); } }; } void reset() { reset(0); } void reset(int start) { Arrays.fill(keys, start, size, null); size = 0; } // 扩容 private void increaseCapacity() { SelectionKey[] newKeys = new SelectionKey[keys.length << 1]; System.arraycopy(keys, 0, newKeys, 0, size); keys = newKeys; }}复制代码

SingleThreadEventLoop 构造器

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {        ...    	protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,                                boolean addTaskWakesUp, int maxPendingTasks,                                RejectedExecutionHandler rejectedExecutionHandler) {        // 调用 SingleThreadEventExecutor 构造器        super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);        tailTasks = newTaskQueue(maxPendingTasks);	}        ...}复制代码

SingleThreadEventExecutor 构造器,主要做两件事情:

  • 设置线程任务执行器。
  • 设置任务队列。前面讲到EventLoop对于不能立即执行的Task会放入一个队列中,就是这里设置的。
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {	    ...        protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,                                    boolean addTaskWakesUp, int maxPendingTasks,                                    RejectedExecutionHandler rejectedHandler) {        super(parent);        this.addTaskWakesUp = addTaskWakesUp;        this.maxPendingTasks = Math.max(16, maxPendingTasks);        // 设置线程任务执行器        this.executor = ObjectUtil.checkNotNull(executor, "executor");        // 设置任务队列        taskQueue = newTaskQueue(this.maxPendingTasks);        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");    	}            ...    }复制代码

NioEventLoop 中对 newTaskQueue 接口的实现,返回的是 工具包 Mpsc 队列。后面我们写文章单独介绍 JCTools 中的相关队列。

Mpsc:Multi Producer Single Consumer (Lock less, bounded and unbounded)

多个生产者对单个消费者(无锁、有界和无界都有实现)

public final class NioEventLoop extends SingleThreadEventLoop {    ...    @Override    protected Queue
newTaskQueue(int maxPendingTasks) { // This event loop never calls takeTask() return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.
newMpscQueue() : PlatformDependent.
newMpscQueue(maxPendingTasks); } ...}复制代码

创建线程执行选择器chooser

接下来,我们看看 MultithreadEventExecutorGroup 构造器的最后一个部分内容,创建线程执行选择器chooser,它的主要作用就是 EventLoopGroup 用于从 EventLoop 数组中选择一个 EventLoop 去执行任务。

// 创建选择器chooser = chooserFactory.newChooser(children);复制代码

EventLoopGroup 中定义的 next() 接口:

public interface EventLoopGroup extends EventExecutorGroup {		...		// 选择下一个 EventLoop 用于执行任务    @Override    EventLoop next();		...}复制代码

MultithreadEventExecutorGroup 中对 next() 的实现:

@Overridepublic EventExecutor next() {    // 调用 DefaultEventExecutorChooserFactory 中的next()    return chooser.next();}复制代码

DefaultEventExecutorChooserFactory 对于如何从数组中选择任务执行器,也做了巧妙的优化。

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();    private DefaultEventExecutorChooserFactory() { }    @SuppressWarnings("unchecked")    @Override    public EventExecutorChooser newChooser(EventExecutor[] executors) {        if (isPowerOfTwo(executors.length)) {            return new PowerOfTwoEventExecutorChooser(executors);        } else {            return new GenericEventExecutorChooser(executors);        }    }	    // 判断线程任务执行的个数是否为 2 的幂次方。e.g: 2、4、8、16    private static boolean isPowerOfTwo(int val) {        return (val & -val) == val;    }	    // 幂次方选择器    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {        private final AtomicInteger idx = new AtomicInteger();        private final EventExecutor[] executors;                PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {            this.executors = executors;        }                @Override        public EventExecutor next() {            // 通过二级制进行 & 运算,效率更高            return executors[idx.getAndIncrement() & executors.length - 1];        }    }    // 普通选择器    private static final class GenericEventExecutorChooser implements EventExecutorChooser {        private final AtomicInteger idx = new AtomicInteger();        private final EventExecutor[] executors;        GenericEventExecutorChooser(EventExecutor[] executors) {            this.executors = executors;        }                @Override        public EventExecutor next() {            // 按照最普通的取模的方式从index=0开始向后开始选择            return executors[Math.abs(idx.getAndIncrement() % executors.length)];        }    }}复制代码

小结

通过本节内容,我们了解到了EventLoop与EventLoopGroup的基本原理,EventLoopGroup与EventLoop的创建过程:

  • 创建线程任务执行器 ThreadPerTaskExecutor
  • 创建EventLoop
  • 创建任务选择器 EventExecutorChooser

参考资料

转载地址:http://fbwso.baihongyu.com/

你可能感兴趣的文章
Mysql 数据库密码管理
查看>>
bean 作用 域
查看>>
我的友情链接
查看>>
Python 编程中常用的12种基础知识总结
查看>>
关于/.svn/源代码泄漏的问题
查看>>
Winetricks download
查看>>
编译型语言VS解释型语言
查看>>
Event handling for iOS - how hitTest:withEvent: and pointInside:withEvent: are related?
查看>>
Mybatis Generator 不识别主键
查看>>
Nginx 400 Bad Request | The plain HTTP request was sent to HTTPS port
查看>>
ftp指定本地用户访问的目录
查看>>
python 自定义模块的引用
查看>>
Zabbix 监控ESXi服务器【非虚拟机】CPU、内存、硬盘、网络带宽
查看>>
HTML5之SessionStorage本地存储
查看>>
error: cannot find javah找不到javah解决办法
查看>>
我的友情链接
查看>>
三种问题可能导致无线路由间歇断开网络
查看>>
MySQL的安装和使用
查看>>
lduan SCDPM 保护组与系统状态(五)
查看>>
我的友情链接
查看>>