准备赶趟上春招实习的车,刚开始学netty,感觉项目很简单,所以就想看看源码,非科班看这些是真吃力。。放点小笔记,都是站在巨人的肩膀上总结的(小抄),加油!
前置芝士:
需要知道Executor执行器的一些操作。
Demo
public class NettyServer { int port; public NettyServer(int port) { this.port = port; } public void start() { ServerBootstrap bootstrap = new ServerBootstrap(); EventLoopGroup boss = new NioEventLoopGroup(1); EventLoopGroup work = new NioEventLoopGroup(); try { bootstrap.group(boss, work) .handler(new LoggingHandler(LogLevel.DEBUG)) .channel(NioServerSocketChannel.class) .childHandler(new ChatRoomServerInitializer()); ChannelFuture f = bootstrap.bind(new InetSocketAddress(port)).sync(); System.out.println("http server started. port : " + port); f.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { boss.shutdownGracefully(); work.shutdownGracefully(); } } public static void main(String[] args) { NettyServer server = new NettyServer(8080);// 8080为启动端口 server.start(); } }
上面是一段比较简单的Netty服务端的代码,我们主要关注:
EventLoopGroup boss = new NioEventLoopGroup(1); // 用于新连接接入的Group,初始化为1 EventLoopGroup work = new NioEventLoopGroup(); // 用于处理channel中的io事件以及任务的group
NioEventLoopGroup初始化过程
跟进到上述构造函数中,最后会来到MultithreadEventLoopGroup
类中的构造函数:
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); }
传入的参数就是指定Group的大小,默认大小 DEFAULT_EVENT_LOOP_THREADS
是 Runtime.getRuntime().availableProcessors() * 2
也就是两倍的CPU数。
继续跟会来到:
# MultithreadEventExecutorGroup.java
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } // 这个executor是group所包含的executor,其将来会为其所包含的每个eventLoop创建一个线程 if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { // 创建eventLoop children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { // 有创建失败的eventLoop就关闭所有之前创建的 if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } // 创建选择器 chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }
从类名就可以知道这是多线程的线程组,这里主要完成几件事情:
new ThreadPerTaskExecutor()
[线程创建器]for(){ new Child() }
[构造NioEventLoop]chooserFactory.newChooser()
[线程选择器]
线程创建器
先看看名字,是给每个任务创建一个线程的线程创建器,其保存在NioEventGroup
中的executor
中。主要是为每一个NioEventLoop
创建一个对应的线程,1:1。
# ThreadPerTaskExecutor.java
public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; public ThreadPerTaskExecutor(ThreadFactory threadFactory) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } // 传入一个线程工厂 this.threadFactory = threadFactory; } @Override public void execute(Runnable command) { // 在执行exector的execute()方法时,调用线程工厂创建线程,并start() threadFactory.newThread(command).start(); } }
上述代码其实就是初始化NioEventGroup
中的executor
为一个线程工厂,通过之后调用execute()
方法为将来的NioEventLoop
创建线程来一一对应。
打住,先来看看NioEventLoop的继承关系:
这图挂了。。
可知NioEventLoop
本身就是一个单线程的EventExecutor
,因此有下面创建线程组数组
children = new EventExecutor[nThreads];
而实例化创建EventLoop
在函数newChild()
中。
构造NioEventLoop
我们跟进到构造NioEventLoop
的函数newChild()
:
# NioEventLoopGroup.java
@Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); }
继续跟来到NioEventLoop
的构造函数:
# NioEventLoop.java
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { // 父类单线程的构造方法,传入的参数executor是group中的executor super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } provider = selectorProvider; selector = openSelector(); // 创建selector事件轮询器到NioEventLoop上 selectStrategy = strategy; }
先跟进父类的构造方法:
# SingleThreadEventLoop.java
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler); tailTasks = newTaskQueue(maxPendingTasks); // 创建一个Mpsc任务队列,多生产者,单个消费者的任务队列 }
上述代码父类构造除了继续调用父类构造外,创建了一个Mpsc任务队列,外部线程的任务会被加入到这个任务中,保证只有一个线程去处理这些任务,保证线程安全。
# SingleThreadEventExecutor.java
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { super(parent); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = Math.max(16, maxPendingTasks); // 这是当前NioEventLoop所包含的executor this.executor = ObjectUtil.checkNotNull(executor, "executor"); taskQueue = newTaskQueue(this.maxPendingTasks); // 创建一个普通任务队列 rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); }
看博客说关于wakeup的代码比较旧,就不看了。😁
最终父类会创建一个单线程的线程创建器SingleThreadEventExecutor
,除此之外,还保存了executor
到了当前NioEventLoop
中,也就是前面一路下来的group中的executor
,帮你回忆一下:
children[i] = newChild(executor, args);
保存这个executor
主要是为了之后调用NioEventLoop
的execute()
方法时其实就是调用传入的这个执行器,也就是executor.execute()
。
然后是创建事件轮询器:
selector = openSelector();
# NioEventLoop.java
// 只截取了关心的一些代码 private Selector openSelector() { final Selector selector; try { selector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } if (DISABLE_KEYSET_OPTIMIZATION) { return selector; } ... ... }
其中这个provider
是一个java.nio中的SelectorProvider
,也就是调用jdk创建了一个selector
绑定到NioEventLoop
上。
线程选择器
# MultithreadEventExecutorGroup.java
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); }
在构造函数中,选择器参数传入的是一个默认选择器工厂的实例(单例模式)。
chooser = chooserFactory.newChooser(children);
将线程组交给线程选择器,跟进到chooserFactory.newChooser()
# DefaultEventExecutorChooserFactory.java
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory { // 单例 public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory(); private DefaultEventExecutorChooserFactory() { } @SuppressWarnings("unchecked") @Override public EventExecutorChooser newChooser(EventExecutor[] executors) { if (isPowerOfTwo(executors.length)) { return new PowerOfTowEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } } // 判断长度是否是2的幂 private static boolean isPowerOfTwo(int val) { return (val & -val) == val; } // 如果是就用这个分配 private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; PowerOfTowEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { return executors[idx.getAndIncrement() & executors.length - 1]; // 用与运算 } } // 否则就是普通的分配 private static final class GenericEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; GenericEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { return executors[Math.abs(idx.getAndIncrement() % executors.length)]; } } }
选择器策略是一个一个往后分配,循环遍历整个线程组给新连接绑定对应的NioEventLoop
,实际的调用在MultithreadEventExecutorGroup.next()
。
还做了以下的优化:
- 先判断线程组数组的长度是否是2的幂
- 如果是,则调用
PowerOfTowEventExecutorChooser()
,使用的是位运算替代%,效率比较高 - 否则就是
GenericEventExecutorChooser()
NioEventLoop启动过程
NioEventLoop
启动触发器:
- 服务端启动绑定端口
- 新连接接入通过chooser绑定一个NioEventLoop
以绑定端口为例跟一下启动过程,先说一下总的逻辑:
bind() -> execute(task) [入口]
- startThread() -> doStartThread() [创建线程]
- ThreadPerTaskExecutor.execute()
- thread = Thread.currentThread()
- NioEventLoop.run() [启动]
- ThreadPerTaskExecutor.execute()
先跟进到入口,bind方法:
- startThread() -> doStartThread() [创建线程]
# AbstractBootstrap.java
channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } });
将端口绑定作为一个Runnable
任务去调用NioEventLoop.execute()
方法,具体实现在父类中。
# SingleThreadEventExecutor.java
@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
先留意下inEventLoop()
这个方法,能够解决netty的异步串行无锁化。
直接跟到startThread()
中,还是在这个类中:
private void startThread() { if (STATE_UPDATER.get(this) == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { // 如果线程状态为未启动,就cas设置成启动状态,然后执行下面方法 doStartThread(); } } }
// 只贴关心的代码 private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); // 保留当前线程信息,绑定端口任务一开始的线程就是main线程 if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { SingleThreadEventExecutor.this.run(); // for循环执行任务队列的任务 success = true; } catch (Throwable t) { ... ... } ... ... } ... ... } }
上述代码中的executor
是NioEventLoop
所包含的那个,根据之前创建过程可以知道,这个execute
最后会调用NioEventLoopGroup
中的execute
方法。
NioEventLoop执行流程
先说一下总的执行逻辑:
- run() -> for(;;)
- select() [检查是否有io事件]
- processSelectedKeys() [处理io事件]
- runAllTasks() [处理异步任务队列]
run()
方法中有一个for循环,一共做三件事,select注册到轮询器上的channel中的io事件,然后调用processSelectedKeys()
处理轮询出来的io事件,runAllTasks()
处理外部线程扔到taskqueue中的任务。
跟进到SingleThreadEventExecutor.this.run()
:
# NioEventLoop.java
@Override protected void run() { for (;;) { try { // selector选择一个已经就绪的channel switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: // SelectStrategy.SELECT == -1,说明此时任务队列中没有任务 // 阻塞式选择 select(wakenUp.getAndSet(false)); ... ... if (wakenUp.get()) { selector.wakeup(); } default: // fallthrough } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; // 这是一个处理io和处理任务队列任务的比值 if (ioRatio == 100) { try { processSelectedKeys(); // 处理就绪channel的io } finally { // Ensure we always run tasks. runAllTasks(); // 执行任务队列中的任务 } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } // Always handle shutdown even if the loop processing threw an exception. try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } }
ioRatio
默认是50,说明处理io和任务处理时间是1:1的。
接下来主要讲一下之前提到的三个过程。
select()方法执行逻辑
这一段逻辑就是在以下代码:
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false));
跟进到主要方法select()
中,还是在这个类中:
private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { // 判断是否超时 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; // 这一段的逻辑就是如果当前超时了,说明有定时任务要执行,那么如果一次都没有选择过,就执行一次非阻塞的选择,并且将选择计数器加1,break if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } // If a task was submitted when wakenUp value was true, the task didn't get a chance to call // Selector#wakeup. So we need to check task queue again before executing select operation. // If we don't, the task might be pended until select operation was timed out. // It might be pended until idle timeout if IdleStateHandler existed in pipeline. // 异步任务队列中是否有任务,如果有任务的话,就直接执行,并+1返回 if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; } // 如果没有定时任务,没有超时,且任务队列中没有任务,就执行阻塞式select,超时时间1s int selectedKeys = selector.select(timeoutMillis); selectCnt ++; // 如果轮询到了时间 | select被外部线程唤醒 | 有任务 | 有定时任务,当前select操作就终止 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { // - Selected something, // - waken up by user, or // - the task queue has a pending task. // - a scheduled task is ready for processing break; } if (Thread.interrupted()) { // Thread was interrupted so reset selected keys and break so we not run into a busy loop. // As this is most likely a bug in the handler of the user or it's client library we will // also log it. // // See https://github.com/netty/netty/issues/2426 if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } selectCnt = 1; break; } // 执行到这里说明以及进行了一次阻塞式的select操作 long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { // timeoutMillis elapsed without anything selected. // 如果阻塞时间到了,就说明执行了一次阻塞式select,那么计数器就是1 selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // 否则没有到阻塞时间,那么说明没有进行阻塞就返回了,执行了空轮询,空轮询到一定阈值就会rebuildSelector() // The selector returned prematurely many times in a row. // Rebuild the selector to work around the problem. logger.warn( "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector); rebuildSelector(); selector = this.selector; // Select again to populate selectedKeys. selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; } if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } } } catch (CancelledKeyException e) { if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); } // Harmless exception - log anyway } }
注意:
delayNanos(currentTimeNanos)
:用来计算当前定时任务队列第一个定时任务还有多久执行。
selector.selectNow()
:非阻塞选择
看了博客补充一下:Selector
的阻塞选择和非阻塞选择的区别就是,非阻塞选择在当前 select
方法执行时判断循环判断所有的 channel
是否就绪并返回所有的就绪数量,而阻塞式选择则是阻塞指定时间直至阻塞时间内获取到就绪 channel
或者阻塞时间超时时立刻返回。
这个select()
方法主要干了几件事情:
第一件事情,deadline以及任务穿插逻辑处理:
首先计算deadline,也就是定时任务的执行时间,计算出一个超时时间timeoutMillis
,也就是距离最近一次定时任务开始的时间,如果小于0,说明要执行定时任务,则执行一次非阻塞的选择:
// 这一段的逻辑就是如果当前超时了,说明有定时任务要执行,那么如果一次都没有选择过,就执行一次非阻塞的选择,并且将选择计数器加1,break if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; }
第二件事,select阻塞式选择:
// 如果没有定时任务,没有超时,且任务队列中没有任务,就执行阻塞式select,超时时间1s int selectedKeys = selector.select(timeoutMillis); selectCnt ++; // 如果轮询到了时间 | select被外部线程唤醒 | 有任务 | 有定时任务,当前select操作就终止 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { // - Selected something, // - waken up by user, or // - the task queue has a pending task. // - a scheduled task is ready for processing break; }
第三件事,解决jdk空轮询bug:
// 执行到这里说明以及进行了一次阻塞式的select操作 long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { // timeoutMillis elapsed without anything selected. // 如果阻塞时间到了,就说明执行了一次阻塞式select,那么计数器就是1 selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // 否则没有到阻塞时间,那么说明没有进行阻塞就返回了,执行了空轮询,空轮询到一定阈值就会rebuildSelector() // The selector returned prematurely many times in a row. // Rebuild the selector to work around the problem. logger.warn( "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector); rebuildSelector(); selector = this.selector; // Select again to populate selectedKeys. selector.selectNow(); selectCnt = 1; break; }
其中阈值SELECTOR_AUTO_REBUILD_THRESHOLD
默认512,因此这个selectCnt
主要就是用来记录空轮询的次数。
这个解决空轮询的方法其实是有点乐观的,他并没有从根源上解决, 而是rebuildSelector()
,期盼着下一次能够不出现空轮询。
processSelectedKey()执行逻辑
NioEventLoop
的第二个过程就是处理检测到的io事件。
先来看看netty对于selectKey做了什么小动作。
select
操作每次会把就绪状态的io事件添加到底层的hashset
当中,而netty会通过反射把这个hashset
修改成数组,这样添加的操作就是O(1)的时间复杂度,具体过程如下:
# NioEventLoop.java
private Selector openSelector() { final Selector selector; try { selector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } // 如果不需要优化,就直接返回原生的selector,默认为false if (DISABLE_KEYSET_OPTIMIZATION) { return selector; } final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); ... ... }
在之前创建事件轮询器的代码中,做了这方面的优化,他用一个SelectedSelectionKeySet
来替换底层的keyset的数据结构:
# SelectedSelectionKeySet.java
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> { private SelectionKey[] keysA; private int keysASize; private SelectionKey[] keysB; private int keysBSize; private boolean isA = true; SelectedSelectionKeySet() { keysA = new SelectionKey[1024]; keysB = keysA.clone(); } @Override public boolean add(SelectionKey o) { if (o == null) { return false; } if (isA) { int size = keysASize; keysA[size ++] = o; keysASize = size; if (size == keysA.length) { doubleCapacityA(); } } else { int size = keysBSize; keysB[size ++] = o; keysBSize = size; if (size == keysB.length) { doubleCapacityB(); } } return true; } ... }
这个数据结构其实就一个方法有用就是add()
方法,是由一个数组和size的方法实现添加的,原来的HashSet
的添加在最坏情况下为O(n)。
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass; Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { // 获取到成员变量selectedKeys和publicSelectedKeys Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); selectedKeysField.setAccessible(true); // 允许修改 publicSelectedKeysField.setAccessible(true); selectedKeysField.set(selector, selectedKeySet); // 进行替换 publicSelectedKeysField.set(selector, selectedKeySet); return null; } catch (NoSuchFieldException e) { return e; } catch (IllegalAccessException e) { return e; } catch (RuntimeException e) { // JDK 9 can throw an inaccessible object exception here; since Netty compiles // against JDK 7 and this exception was only added in JDK 9, we have to weakly // check the type if ("java.lang.reflect.InaccessibleObjectException".equals(e.getClass().getName())) { return e; } else { throw e; } } } }); if (maybeException instanceof Exception) { selectedKeys = null; Exception e = (Exception) maybeException; logger.trace("failed to instrument a special java.util.Set into: {}", selector, e); } else { selectedKeys = selectedKeySet; logger.trace("instrumented a special java.util.Set into: {}", selector); } return selector;
还是在这个方法中,当我们构造好这个数组后,通过反射,修改原来selector中的属性selectedKeys
和publicSelectedKeys
为上面构造好的数组selectedKeySet
(一个披着set皮的array)。
在openSelector()
方法的最后也将selectedKeySet
保存成一个NioEventLoop
的成员变量。
对selected keySet优化完后,开始对这些就绪事件进行处理,调用processSelectedKeysOptimized()
,跟到run()
方法中的processSelectedKey()
:
# NioEventLoop.java
private void processSelectedKeys() { if (selectedKeys != null) { // 这个key是优化过的key processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
selectedKeys.flip()
:返回底层的数组,也就是selectedKeys
背后真正的keysA
数组,继续跟进 :
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { for (int i = 0;; i ++) { final SelectionKey k = selectedKeys[i]; if (k == null) { break; } // null out entry in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys[i] = null; // 便于GC final Object a = k.attachment(); // 拿到key的attachment,也就是一个经过netty封装的channel if (a instanceof AbstractNioChannel) { // 如果是netty的channel processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 for (;;) { i++; if (selectedKeys[i] == null) { break; } selectedKeys[i] = null; } selectAgain(); // Need to flip the optimized selectedKeys to get the right reference to the array // and reset the index to -1 which will then set to 0 on the for loop // to start over again. // // See https://github.com/netty/netty/issues/1523 selectedKeys = this.selectedKeys.flip(); i = -1; } } }
显然需要继续跟进到方法processSelectedKey(k, (AbstractNioChannel) a)
中:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); // unsafe也是和channel唯一绑定的 if (!k.isValid()) { // 如果key不合法,需要关闭channel final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { // If the channel implementation throws an exception because there is no event loop, we ignore this // because we are only trying to determine if ch is registered to this event loop and thus has authority // to close ch. return; } // Only close ch if ch is still registerd to this EventLoop. ch could have deregistered from the event loop // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is // still healthy and should not be closed. // See https://github.com/netty/netty/issues/5125 if (eventLoop != this || eventLoop == null) { return; } // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); // 判断就绪事件类型 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } // Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { // 主要关心read和accept事件 unsafe.read(); if (!ch.isOpen()) { // Connection already closed - no need to handle write. return; } } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
看了别人博客的总结,这段逻辑就是处理就绪 channel 的 IO 事件的逻辑:
判断当前
SelectionKey
是否有效。失效结束处理并关闭资源。判断当前
channel
的关注事件,针对处理:获取SelectionKey
的readyOps
。这里的判断逻辑都是使用高效的位运算。readyOps
为当前SelectionKey
的就绪的事件类型。(readyOps & SelectionKey.OP_CONNECT) != 0
:连接就绪事件这个事件在 server 端不会关注,只有 client 用来连接 server 时才会关注连接就绪事件。
连接就绪后,获取当前
SelectionKey
的interestOps
值,将当前interestOps
值修改后,调用底层unsafe
连接server(readyOps & SelectionKey.OP_WRITE) != 0
:写就绪事件当前 channel 关注的是写就绪事件,此时写操作已经就绪,所以直接调用unsafe将数据写入网卡缓存。
(readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0
:当前channel
关注的是读就绪事件,或者前面因为有新增任务而触发的就绪channel
处理逻辑,只有因为任务触发的情况下readyOps
才可能会是 0 ,readyOps = 0
意味着没有就绪channel
。直接调用 unsafe 继续读操作,将网卡缓存的数据读取到用户空间。如果是 readyOps = 0 的情况相当于网卡缓存并没有就绪数据,则时进行的读操作不会读取到数据。
unsafe是个啥玩意?现在只知道Unsafe类使Java拥有了像C语言的指针一样操作内存空间的能力,等我有点B树了再说。
runAllTasks()执行逻辑
三件事:对task进行分类和添加、对任务进行聚合、执行任务
第一件事,对task进行分类和添加:
在之前说execute方法时,代码如下:
# SingleThreadEventExecutor.java
@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); // 是否是外部线程 if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
先判断是否是外部线程调用的execute方法,如果是就先startThread();
,再将其加入到创建NioEventLoop时创建的任务队列MpscQueue中。
除了普通任务队列还有一个定时任务队列:
# AbstractScheduledEventExecutor.java
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { if (inEventLoop()) { scheduledTaskQueue().add(task); } else { execute(new Runnable() { @Override public void run() { scheduledTaskQueue().add(task); } }); } return task;
对于定时任务,这里同样进行了判断,是否是外部线程,如果是外部线程在,则调用execute()
方法进行线程安全的操作,即现startThread()
,再添加任务,保证只有一个线程进行处理,即都在NioEventLoop中处理。
这是为什么呢?这是因为scheduledTaskQueue的实现是非线程安全的,普通优先队列:
Queue<ScheduledFutureTask<?>> scheduledTaskQueue() { if (scheduledTaskQueue == null) { scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>(); } return scheduledTaskQueue; }
第二件事,任务的聚合:
# SingleThreadEventExecutor.java
protected boolean runAllTasks(long timeoutNanos) { fetchFromScheduledTaskQueue(); // 任务的聚合 Runnable task = pollTask(); if (task == null) { afterRunningAllTasks(); return false; } ... ... return true; }
private boolean fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); Runnable scheduledTask = pollScheduledTask(nanoTime); // 取定时任务队列中当前需要允许的任务 while (scheduledTask != null) { if (!taskQueue.offer(scheduledTask)) { // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again. scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); return false; } scheduledTask = pollScheduledTask(nanoTime); } return true; }
上述while代码中的逻辑,定时任务不为空,即有定时任务:
- 先讲定时任务添加到普通任务队列中;
- 如果添加失败,则添加回定时任务队列中,因为之前取的时候poll了,并返回false;
- 成功添加后,继续从定时任务中取定时任务;
- while循环结束,所有定时任务都被添加到了普通任务队列,完成任务的聚合。
第三件事,任务的执行:
protected boolean runAllTasks(long timeoutNanos) { fetchFromScheduledTaskQueue(); Runnable task = pollTask(); // 从普通任务队列中拿任务 if (task == null) { afterRunningAllTasks(); return false; } final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; // 根据超时时间计算截止时间 long runTasks = 0; long lastExecutionTime; for (;;) { safeExecute(task); // 执行任务,Runnable.run() runTasks ++; // Check timeout every 64 tasks because nanoTime() is relatively expensive. // XXX: Hard-coded value - will make it configurable if it is really a problem. if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } task = pollTask(); // 继续拿任务 if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
上述代码前半请看注释,关键在于对任务次数的判断:
// Check timeout every 64 tasks because nanoTime() is relatively expensive. // XXX: Hard-coded value - will make it configurable if it is really a problem. if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } }
这里也是与操作,即(&63)== (%64)
,每执行64次就判断一下是否超时(因为需要和io处理时间分配时间,所以有一个超时时间),如果超时就退出,可能去进行io处理了。
为什么不每次都判断一下呢?上面的英文注释说了,nanoTime()
老费时间了。
面试相关
问:Netty如何保证异步串行无锁化?
- 在NioEventLoop中封装了一个线程, 这个IO线程就是用来处理客户端的连接事件, 读写事件, 处理队列中的任务. 没错, 每个NioEventLoop都有一个队列, 这个队列是在创建NioEventLoop时被初始化的,netty比较重任务。这个任务队列是一个多生产者单消费者的队列,因此可以保证线程安全。根据inEventLoop()判断,如果是外部线程,也就不是我们自己的io线程,那么就把他的runnable任务放到我们的Mpsc队列中来保证线程安全,同理与定时任务队列中的任务,我们的io线程只处理普通任务中的任务,因此保证了线程之间不需要同步。
问:默认情况下,Netty服务端起多少线程?何时启动?
- 默认两倍CPU数,调用execute()方法判断是否在本线程内,如果是,那么就已经启动了,如果是在外部线程中,那么就需要执行startThread()方法判断线程是否启动,未启动就启动此线程。
问:Netty如何解决jdk空轮询bug?
- jdk空轮询是在阻塞式select中,没有阻塞timeoutMillis时间就结束了阻塞select操作,我们称之为一次空轮询,因此判断这种空轮询操作是否超过设定的阈值(512),如果超过,就调用rebuildSelector()方法重建Selector把之前的key都移到新的轮询器上,避免bug。
问:简单说说NioEventLoop?
- 用户创建Boss/Worker EventLoopGroup时创建,默认创建NioEventLoop个数为2*CPU核数;每个NioEventLoop都由线程选择器chooser分配,并且用与运算优化了选择方式;每个NioEventLoop构造过程中都创建了Selector、任务队列,创建Selector时,通过反射使用数组替换集合方式保存selectedKeys ;NioEventLoop执行时调用execute()方法启动/创建FastThreadLocalThread线程,保存到该NioEventLoop的成员变量中进行一对一绑定;NioEventLoop执行逻辑在run方法中,包括检测io事件、处理io事件、执行任务队列。
PS:看了两天了这个。。非科班实在是水平不够,希望大佬们别喷,只是留个纪念,提提学习netty的建议,虚心接受-;-
全部评论
(2) 回帖