新连接接入处理逻辑
1、检测新连接:由boss
线程组中的那一个reactor
线程上绑定的Selector
对OP_ACCEPT
事件进行轮询;
2、创建NioSocketChannel
:创建一个JDK的NioChannel
,Netty封装成NioSocketChannel
即客户端Channel
;
3、给新创建的客户端Channel
分配NioEventLoop
并注册到绑定的Selector
上;
4、向Selector
注册读事件
下面主要按上面的逻辑进行分析
检测新连接
我们先来到服务端的reactor
线程的run方法中,这个方法是一个for循环,其中主要进行三件事情,轮询注册在selector
上的IO事件、处理IO事件、执行异步task。而新连接的处理逻辑就是这个处理IO事件。
# NioEventLoop.java
@Override protected void run() { for (;;) { try { // 1、轮询IO事件 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: // fallthrough } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { // Ensure we always run tasks. runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { //2、处理IO事件 processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; // 3、处理异步任务 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } }
跟进处理IO事件代码:
private void processSelectedKeys() { if (selectedKeys != null) { processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
这里判断了一下selectedKeys
是否是优化后的,关于优化就是这个selectedKeys
是数组还是一个Set
集合,这里我们已经做了优化,就跟进到processSelectedKeysOptimized
:
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { for (int i = 0;; i ++) { final SelectionKey k = selectedKeys[i]; if (k == null) { break; } selectedKeys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } ... ... } }
这个方法就是一个一个取轮询到的连接事件,然后进行处理,继续跟进到连接处理的方法:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); // 取出服务端Channel的unsafe类实例保存 if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { return; } if (eventLoop != this || eventLoop == null) { return; } unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { return; } } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
根据感兴趣的事件集:
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { // Connection already closed - no need to handle write. return; } }
客户端的Channel的Unsafe类是上面的NioByteUnsafe
, 服务端Channel
的Unsafe
类就是这个NioMessageUnsafe
:
# NioMessageUnsafe
private final class NioMessageUnsafe extends AbstractNioUnsafe { private final List<Object> readBuf = new ArrayList<Object>(); // 用来保存新连接 @Override public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); // 获取服务端Channel的Config final ChannelPipeline pipeline = pipeline(); // 获取服务端的pipeline final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); // 用来处理接入数据,读取channel allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (exception != null) { closed = closeOnReadError(exception); pipeline.fireExceptionCaught(exception); } ... ... } finally { ... ... } } }
这个read()
方法只要做了几个事情:
1、获取服务端Channel
的config
和pipeline
;
2、allocHandle
处理接入数据读取Channel
;
3、allocHandle
调用continueReading()
判断总连接数是否超过每次最大读入的连接数。
进入到doReadMessages()
方法:
# NioServerSocketChannel.java
@Override protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = javaChannel().accept(); // 调用JDK底层的accept方法来获取一个SocketChannel,这个是阻塞的 try { if (ch != null) { buf.add(new NioSocketChannel(this, ch)); // 封装成NioSocketChannel return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }
这个方法主要就是调用JDK底层原生的Channel
的accept()
方法获取SocketChannel
,再封装成NioSocketChannel
方法放到传入的buf
中。
NioSocketChannel的创建
# NioSocketChannel.java
public NioSocketChannel(Channel parent, SocketChannel socket) { super(parent, socket); config = new NioSocketChannelConfig(this, socket.socket()); }
主要就是逐层调用父类函数并且创建这个channel
的config
。
先跟进父类构造函数:
# AbstractNioByteChannel.java
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { super(parent, ch, SelectionKey.OP_READ); }
这里传入了这个Channel
感兴趣的事件SelectionKey.OP_READ
,后续会将这个事件绑定到Selector
上去,即如果后续有读事件发生,就会被激活。
# AbstractNioChannel.java
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; // 保存jdk创建的客户端channel this.readInterestOp = readInterestOp; // 保存兴趣事件 try { ch.configureBlocking(false); // 初始化客户端channel为非阻塞 } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } }
主要就是设置了客户端channel
为非阻塞。
# AbstractChannel.java
protected AbstractChannel(Channel parent) { this.parent = parent; // 即服务端的NioServerSocketChannel id = newId(); // 生成id unsafe = newUnsafe(); // 生成当前Channel的Unsafe类 pipeline = newChannelPipeline(); // 生成当前channel的pipeline }
其实整个父类构造函数的调用链主要就是配置了一下几大组件并且设置channel
为非阻塞。
再创建channel
的config
,跟到底:
# DefaultSocketChannelConfig.java
public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) { super(channel); if (javaSocket == null) { throw new NullPointerException("javaSocket"); } this.javaSocket = javaSocket; // Enable TCP_NODELAY by default if possible. if (PlatformDependent.canEnableTcpNoDelayByDefault()) { try { setTcpNoDelay(true); } catch (Exception e) { // Ignore. } } }
上述代码主要就是保存传入的一个jdk底层的socket
对象,并且禁用Nagle
算法,Nagle
算法其实就是如果数据包比较小,会将多个打包发送,禁用后会降低延迟。
新连接NioEventLoop分配
在服务端启动的过程中,服务端channel
中pipeline
中添加了一个用户代码自定义参数的ServerBootstrapAcceptor
连接器。
在读取完每一条NioSocketChannel
后,执行for循环,调用fireChannelRead
方法添加到连接器中,即调用ServerBootstrap
的channelRead
方法,这个方法主要做了这么几件事:
1、添加用户自定义的childHandler
到新连接到pipeline
中
2、设置options
和attrs
3、选择NioEventLoop
并注册selector
# ServerBootstrap.java
@Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; //1、添加用户自定义的childHandler到新连接到pipeline中 child.pipeline().addLast(childHandler); //2、设置options和attrs for (Entry<ChannelOption<?>, Object> e: childOptions) { try { if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e); } } catch (Throwable t) { logger.warn("Failed to set a channel option: " + child, t); } } for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } //3、选择NioEventLoop并注册selector try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
主要看第三步,跟进register()
方法,这里的childGroup
是workGroup
:
# MultithreadEventLoopGroup.java
@Override public ChannelFuture register(ChannelPromise promise) { return next().register(promise); }
@Override public EventLoop next() { return (EventLoop) super.next(); }
# MultithreadEventExecutorGroup.java
@Override public EventExecutor next() { return chooser.next(); }
这里就是调用线程选择器分配一个NioEventLoop
,然后调用register
方法:
# SingleThreadEventLoop.java
@Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }
这里的unsafe
就是对应客户端的NioByteUnsafe
类的实例,继续进入register
方法:
# AbstractChannel.java
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } // 把分配的NioEventLoop线程保存下来 AbstractChannel.this.eventLoop = eventLoop; // 这里的发起线程是服务端的NioEventLoop因此肯定不在客户端的NioEventLoop中,所以返回false if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { // 在这里启动客户端的eventLoop @Override public void run() { register0(promise); // 进行注册 } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
进行了一些注释,继续跟进register0
方法,下面只截取跟register有关的:
private void register0(ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; }
跟进doRegister
方法:
# AbstractNioChannel.java
@Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().selector, 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
恍然大悟,就是先拿到底层的javaChannel
然后把eventLoop上
绑定的selector
注册上去,然后对于感兴趣的事件参数为0,也就是对什么都不感兴趣,然后把当前的客户端channel
作为attachment
注册到selector
上。
NioSocketChannel读事件的注册
上一步将客户端的channel
注册到selector
上后,需要继续注册读事件,回到register0
方法:
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); // 调用每个handler的ChannelHandlerAdded方法处理回调 if (isActive()) { // 返回true if (firstRegistration) { // 第一次注册,返回true pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
可见会调用pipeline.fireChannelActive()
,调用链路如下:
# HeadContext.java
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); readIfIsAutoRead(); // }
private void readIfIsAutoRead() { if (channel.config().isAutoRead()) { // 默认自动读,返回true channel.read(); } }
@Override public Channel read() { pipeline.read(); return this; }
接着继续调用,知道调用到:
@Override public void read(ChannelHandlerContext ctx) { unsafe.beginRead(); }
因此最终就是调用了客户端的NioByteUnsafe
类实例的beginRead()
:
# AbstractChannel.java
@Override public final void beginRead() { assertEventLoop(); if (!isActive()) { return; } try { doBeginRead(); } catch (final Exception e) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireExceptionCaught(e); } }); close(voidPromise()); } }
跟进doBeginRead()
:
# AbstractNioChannel.java
@Override protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); } }
之前注册上去的时候ops是0,因此(interestOps & readInterestOp) == 0
判断为true;
这里的readInterestOp
是之前newChannel
的时候传入的SelectionKey.OP_ACCEPT
,因此通过或操作变成事件集设置为当前selectionKey
的interestOps
,即完成了读事件的注册。
面试相关
问:Netty在哪里检测新连接接入的?
- Boss线程通过服务端Channel绑定的Selector轮询OP_ACCEPT事件,检测到后再通过底层的accept()方法获取JDK底层的SocketChannel(创建客户端jdk的channel),然后封装成NioSocketChannel加入到检测集中。
问:新连接如何注册到NioEventLoop线程上的?
- woker线程调用选择Chooser的next()方法获取一个NioEventLoop绑定到客户端Channel上,使用doRegister()方法将新连接注册到NioEventLoop上面的Selector。
问:新连接接入的流程是怎么样的?
- 服务端Channel绑定的NioEventLoop即Boss线程轮询OP_ACCEPT事件,调用服务端Channel的accept()方法获取客户端Channel封装成NioSocketChannel,封装创建组件Unsafe类用来读写和Pipeline负责数据处理业务逻辑链,服务端Channel通过连接接入器ServerBootstrapAcceptor给客户端Channel分配NioEventLoop,将客户端Channel绑定到Selector上面,通过传播ChannelActive方法将客户端Channel读事件注册到Selector。
全部评论
(1) 回帖