您当前的位置:首页 > 电脑百科 > 程序开发 > 语言 > JAVA

Netty启动流程剖析

时间:2019-11-13 13:26:31  来源:  作者:

编者注:Netty是JAVA领域有名的开源网络库,特点是高性能和高扩展性,因此很多流行的框架都是基于它来构建的,比如我们熟知的Dubbo、Rocketmq、Hadoop等,针对高性能RPC,一般都是基于Netty来构建,比如soft-bolt。总之一句话,Java小伙伴们需要且有必要学会使用Netty并理解其实现原理。
关于Netty的入门讲解可参考:Netty 入门,这一篇文章就够了

Netty的启动流程(ServerBootstrap),就是创建NioEventLoopGroup(内部可能包含多个NioEventLoop,每个eventLoop是一个线程,内部包含一个FIFO的taskQueue和Selector)和ServerBootstrap实例,并进行bind的过程(bind流程涉及到channel的创建和注册),之后就可以对外提供服务了。

Netty的启动流程中,涉及到多个操作,比如register、bind、注册对应事件等,为了不影响main线程执行,这些工作以task的形式提交给NioEventLoop,由NioEventLoop来执行这些task,也就是register、bind、注册事件等操作。

NioEventLoop(准确来说是SingleThreadEventExecutor)中包含了private volatile Thread thread,该thread变量的初始化是在new的线程第一次执行run方式时才赋值的,这种形式挺新颖的。

Netty启动流程剖析

 

Netty启动流程图如下所示:

Netty启动流程剖析

 

大致了解了Netty启动流程之后,下面就按照Netty启动流程中涉及到的源码来进行分析。

netty启动流程分为server端和client端,不同之处就是前者监听端口,对外提供服务(socket->bind->listen操作),对应类ServerBootstrap;后者主动去连接远端端口(socket->connect),对应类Bootstrap。

server端启动流程

server端启动流程可以理解成创建ServerBootstrap实例的过程,就以下面代码为例进行分析(echo服务):

public final class EchoServer { static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // bossGroup处理connect事件 // workerGroup处理read/write事件 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); EchoServerHandler serverHandler = new EchoServerHandler(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NIOServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() {
 @Override
 public void initChannel(SocketChannel ch) throws Exception {
 // 当连接建立后(register到childWorkerGroup前)初始化channel.pipeline
 ch.pipeline().addLast(serverHandler);
 }
 });

 // Start the server.
 ChannelFuture f = b.bind(PORT).sync();
 // Wait until the server socket is closed.
 f.channel().closeFuture().sync();
 } finally {
 // Shut down all event loops to terminate all threads.
 bossGroup.shutdownGracefully();
 workerGroup.shutdownGracefully();
 }
 }
}

public class EchoServerHandler extends ChannelInboundHandlerAdapter {
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) {
 ctx.write(msg);
 }

 @Override
 public void channelReadComplete(ChannelHandlerContext ctx) {
 ctx.flush();
 }

 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
 // Close the connection when an exception is raised.
 cause.printStackTrace();
 ctx.close();
 }
}

EventLoopGroup创建

EventLoopGroup中可能包含了多个EventLoop,EventLoop是一个Reactor模型的事件处理器,一个EventLoop对应一个线程,其内部会维护一个selector和taskQueue,负责处理客户端请求和内部任务,内部任务如ServerSocketChannel注册和ServerSocket绑定操作等。关于NioEventLoop,后续专门写一篇文章分析,这里就不再展开,只需知道个大概即可,其架构图如下:

Netty启动流程剖析

 

EventLoopGroup创建本质就是创建多个NioEventLoop,这里创建NioEventLoop就是初始化一个Reactor,包括selector和taskQueue。主要逻辑如下:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { // 创建NioEventLoop实例 children = new EventExecutor[nThreads]; // 初始化NioEventLoop,实际调用的是NioEventLoopGroup.newChild方法 for (int i = 0; i < nThreads; i ++) {
 children[i] = newChild(executor, args);
 }

 // 多个NioEventLoop中选择策略
 chooser = chooserFactory.newChooser(children);
}

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
 // 创建taskQueue
 super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
 // 是不是很熟悉,java nio selector操作
 provider = selectorProvider;
 final SelectorTuple selectorTuple = openSelector();
 selector = selectorTuple.selector;
 unwrAppedSelector = selectorTuple.unwrappedSelector;
 selectStrategy = strategy;
}

EventLoopGroup创建OK后,启动的第一步就算完成了,接下来该进行bind、listen操作了。

ServerBootstrap流程

bind操作

bind操作是ServerBootstrap流程重要的一环,bind流程涉及到NioChannel的创建、初始化和注册(到Selector),启动NioEventLoop,之后就可以对外提供服务了。

public ChannelFuture bind(SocketAddress localAddress) {
 validate(); // 参数校验
 return doBind(localAddress);
}
private ChannelFuture doBind(final SocketAddress localAddress) {
 // 1. 初始化注册操作
 final ChannelFuture regFuture = initAndRegister();
 final Channel channel = regFuture.channel();
 if (regFuture.cause() != null) {
 return regFuture;
 }
 
 // 2. doBind0操作
 if (regFuture.isDone()) {
 // register已完成,这里直接调用doBind0
 ChannelPromise promise = channel.newPromise();
 doBind0(regFuture, channel, localAddress, promise);
 return promise;
 } else {
 // register还未完成,注册listener回调,在回调中调用doBind0
 final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
 regFuture.addListener(new ChannelFutureListener() {
 /**
 * channel register完成(注册到Selector并且调用了invokeHandlerAddedIfNeeded)之后,
 * 会调用safeSetSuccess,触发各个ChannelFutureListener,最终会调用到这里的operationComplete方法
 */
 @Override
 public void operationComplete(ChannelFuture future) throws Exception {
 Throwable cause = future.cause();
 if (cause != null) {
 promise.setFailure(cause);
 } else {
 promise.registered();
 doBind0(regFuture, channel, localAddress, promise);
 }
 }
 });
 return promise;
 }
}

这里涉及到2个操作,一个是channel的创建、初始化、注册操作,另一个是bind操作,下面兵分两路,分别来讲。

注意,这里如果main线程执行到regFuture.isDone()时,register还未完成,那么main线程是不会直接调用bind操作的,而是往regFuture上注册一个Listenner,这样channel register完成(注册到Selector并且调用了invokeHandlerAddedIfNeeded)之后,会调用safeSetSuccess,触发各个ChannelFutureListener,最终会调用到这里的operationComplete方法,进而在执行bind操作。

channel初始化、注册操作

final ChannelFuture initAndRegister() {
 Channel channel = null;
 try {
 // 1.创建(netty自定义)Channel实例,并初始化
 // channel为 NioServerSocketChannel 实例,NioServerSocketChannel的父类AbstractNioChannel保存有nio的ServerSocketChannel
 channel = channelFactory.newChannel();
 // 2.初始化channel()
 init(channel);
 } catch (Throwable t) {
 }
 
 // 3.向Selector注册channel
 ChannelFuture regFuture = config().group().register(channel);
 if (regFuture.cause() != null) {
 if (channel.isRegistered()) {
 channel.close();
 } else {
 channel.unsafe().closeForcibly();
 }
 }
 
 return regFuture;
}

这里重点关注下初始化channel流程,主要操作是设置channel属性、设置channel.pipeline的ChannelInitializer,注意,ChannelInitializer是在channel注册到selector之后被回调的。

/** * 初始channel属性,也就是ChannelOption对应socket的各种属性。 * 比如 SO_KEEPALIVE SO_RCVBUF ... 可以与linux中的setsockopt函数对应起来。 * 最后将ServerBootstrapAcceptor添加到对应channel的ChannelPipeline中。 */@Overridevoid init(Channel channel) throws Exception { final Map, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); }  ChannelPipeline p = channel.pipeline(); // 获取childGroup和childHandler,传递给ServerBootstrapAcceptor final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry, Object>[] currentChildOptions; final Entry, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); }  p.addLast(new ChannelInitializer() {
 /**
 * 在register0中,将channel注册到Selector之后,会调用invokeHandlerAddedIfNeeded,
 * 进而调用到这里的initChannel方法
 */
 @Override
 public void initChannel(final Channel ch) throws Exception {
 final ChannelPipeline pipeline = ch.pipeline();
 ChannelHandler handler = config.handler();
 if (handler != null) {
 pipeline.addLast(handler);
 }
 
 // 这里注册一个添加ServerBootstrapAcceptor的任务
 ch.eventLoop().execute(new Runnable() {
 @Override
 public void run() {
 // 添加ServerBootstrapAcceptor
 pipeline.addLast(new ServerBootstrapAcceptor(
 ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
 }
 });
 }
 });
}

channel初始化之后就该将其注册到selector,即下面的register流程:

public ChannelFuture register(Channel channel) {
 // next()挑选一个EventLoop,默认轮询选择某个NioEventLoop
 return next().register(channel);
}
public ChannelFuture register(final ChannelPromise promise) {
 promise.channel().unsafe().register(this, promise);
 return promise;
}
// AbstractChannel
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
 AbstractChannel.this.eventLoop = eventLoop;
 
 // 直接执行register0或者以任务方式提交执行
 // 启动时,首先执行到这里的是main线程,所以是以任务的方式来提交执行的。
 // 也就是说,该任务是NioEventLoop第一次执行的任务,即调用register0
 if (eventLoop.inEventLoop()) {
 register0(promise);
 } else {
 // 往NioEventLoop中(任务队列)添加任务时,如果NioEventLoop线程还未启动,则启动该线程
 eventLoop.execute(new Runnable() {
 @Override
 public void run() {
 register0(promise);
 }
 });
 }
}

register操作

register操作之后伴随着多个回调及listener的触发:

// AbstractChannel$AbstractUnsafe
private void register0(ChannelPromise promise) {
 boolean firstRegistration = neverRegistered;
 // 这里调用的是AbstractNioChannel.doRegister
 // 这里将channel注册上去,并没有关注对应的事件(read/write事件)
 doRegister();
 neverRegistered = false;
 registered = true;
 
 // 调用handlerAdd事件,这里就会调用initChannel方法,设置channel.pipeline,也就是添加 ServerBootstrapAcceptor
 pipeline.invokeHandlerAddedIfNeeded();
 
 // 调用operationComplete回调
 safeSetSuccess(promise);
 // 回调fireChannelRegistered
 pipeline.fireChannelRegistered();
 // Only fire a channelActive if the channel has never been registered. This prevents firing
 // multiple channel actives if the channel is deregistered and re-registered.
 if (isActive()) {
 if (firstRegistration) {
 // 回调fireChannelActive
 pipeline.fireChannelActive();
 } else if (config().isAutoRead()) {
 beginRead();
 }
 }
}

上面代码中的initChannel回调也就是设置对外监听channel的channelHanlder为ServerBootstrapAcceptor;operationComplete回调也就是触发ChannelFutureListener.operationComplete,这里会进行后续的doBind操作。

// AbstractBootstrap
private static void doBind0(
 final ChannelFuture regFuture, final Channel channel,
 final SocketAddress localAddress, final ChannelPromise promise) {
 // doBind0向EventLoop任务队列中添加一个bind任务来完成后续操作。
 channel.eventLoop().execute(new Runnable() {
 @Override
 public void run() {
 if (regFuture.isSuccess()) {
 // bind操作
 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
 }
 }
 });
}

bind操作

在回顾上面的bind操作代码,bind操作是在register之后进行的,因为register0是由NioEventLoop执行的,所以main线程需要先判断下future是否完成,如果完成直接进行doBind即可,否则添加listener回调进行doBind。

Netty启动流程剖析

 

bind操作及后续初始化操作(channelActive回调、设置监听事件)

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
 boolean wasActive = isActive();
 try {
 // 调用底层bind操作
 doBind(localAddress);
 } catch (Throwable t) {
 safeSetFailure(promise, t);
 closeIfClosed();
 return;
 }

 if (!wasActive && isActive()) {
 invokeLater(new Runnable() {
 @Override
 public void run() {
 pipeline.fireChannelActive();
 }
 });
 }
 safeSetSuccess(promise);
}

// 最后底层bind逻辑bind入参包括了backlog,也就是底层会进行listen操作
// DefaultChannelPipeline.headContext -> NioMessageUnsafe -> NioServerSocketChannel
protected void doBind(SocketAddress localAddress) throws Exception {
 if (PlatformDependent.javaVersion() >= 7) {
 javaChannel().bind(localAddress, config.getBacklog());
 } else {
 javaChannel().socket().bind(localAddress, config.getBacklog());
 }
}

public void channelActive(ChannelHandlerContext ctx) throws Exception {
 // 回调fireChannelActive
 ctx.fireChannelActive();
 
 // 设置selectKey监听事件,对于监听端口就是SelectionKey.OP_ACCEPT,对于新建连接就是SelectionKey.OP_READ
 readIfIsAutoRead();
}

到这里为止整个netty启动流程就基本接近尾声,可以对外提供服务了。



Tags:Netty   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
前言在实现TCP长连接功能中,客户端断线重连是一个很常见的问题,当我们使用netty实现断线重连时,是否考虑过如下几个问题: 如何监听到客户端和服务端连接断开 ? 如何实现断线后重...【详细内容】
2021-12-24  Tags: Netty  点击:(12)  评论:(0)  加入收藏
这篇文章对于排查使用了 netty 引发的堆外内存泄露问题,有一定的通用性,希望对你有所启发 背景最近在做一个基于 websocket 的长连中间件,服务端使用实现了 socket.io 协议(基于...【详细内容】
2021-12-16  Tags: Netty  点击:(22)  评论:(0)  加入收藏
简介在之前的文章中,我们提到了在netty的客户端通过使用Http2FrameCodec和Http2MultiplexHandler可以支持多路复用,也就是说在一个连接的channel基础上创建多个子channel,通过...【详细内容】
2021-12-14  Tags: Netty  点击:(8)  评论:(0)  加入收藏
本系列为 Netty 学习笔记,本篇介绍总结Java NIO 网络编程。Netty 作为一个异步的、事件驱动的网络应用程序框架,也是基于NIO的客户、服务器端的编程框架。其对 Java NIO 底层...【详细内容】
2021-12-07  Tags: Netty  点击:(17)  评论:(0)  加入收藏
想要阅读Netty源码的同学,建议从GitHub上把源码拉下来,方便写注释、Debug调试哦~点我去下载! 先来看一个简单的Echo服务端程序,监听本地的9999端口,有客户端接入时控制台输出一句...【详细内容】
2021-10-22  Tags: Netty  点击:(45)  评论:(0)  加入收藏
相信很多人知道石中剑这个典故,在此典故中,天命注定的亚瑟很容易的就拔出了这把石中剑,但是由于资历不被其他人认可,所以他颇费了一番周折才成为了真正意义上的英格兰全境之王,亚...【详细内容】
2021-07-22  Tags: Netty  点击:(102)  评论:(0)  加入收藏
家纯 阿里技术 一 什么是 Netty? 能做什么? Netty 是一个致力于创建高性能网络应用程序的成熟的 IO 框架。 相比较与直接使用底层的 Java IO API,你不需要先成为网络专家就...【详细内容】
2021-06-23  Tags: Netty  点击:(136)  评论:(0)  加入收藏
客户端代码package com.huanfeng.test;import java.io.BufferedReader;import java.io.InputStreamReader;import java.net.URL;import java.net.URLConnection;import java...【详细内容】
2021-05-26  Tags: Netty  点击:(146)  评论:(0)  加入收藏
前言:作为一个刚踏入职场的实习生,我很幸运参加了某个政府项目,并且在项目中负责一个核心模块功能的开发,而不是从头到尾对数据库的crud。虽然我一直心里抱怨我的工作范围根本...【详细内容】
2021-05-24  Tags: Netty  点击:(231)  评论:(0)  加入收藏
要求1.群聊系统可以实现服务器端和客户端之间的数据简单通讯(非阻塞)2.通过系统可以实现多人群聊3.服务器端:可以监控用户上线,离线,并实现消息转发功能4.客户端:通过channel可...【详细内容】
2021-05-24  Tags: Netty  点击:(147)  评论:(0)  加入收藏
▌简易百科推荐
面向对象的特征之一封装 面向对象的特征之二继承 方法重写(override/overWrite) 方法的重载(overload)和重写(override)的区别: 面向对象特征之三:多态 Instanceof关键字...【详细内容】
2021-12-28  顶顶架构师    Tags:面向对象   点击:(2)  评论:(0)  加入收藏
一、Redis使用过程中一些小的注意点1、不要把Redis当成数据库来使用二、Arrays.asList常见失误需求:把数组转成list集合去处理。方法:Arrays.asList 或者 Java8的stream流式处...【详细内容】
2021-12-27  CF07    Tags:Java   点击:(3)  评论:(0)  加入收藏
文章目录 如何理解面向对象编程? JDK 和 JRE 有什么区别? 如何理解Java中封装,继承、多态特性? 如何理解Java中的字节码对象? 你是如何理解Java中的泛型的? 说说泛型应用...【详细内容】
2021-12-24  Java架构师之路    Tags:JAVA   点击:(5)  评论:(0)  加入收藏
大家好!我是老码农,一个喜欢技术、爱分享的同学,从今天开始和大家持续分享JVM调优方面的经验。JVM调优是个大话题,涉及的知识点很庞大 Java内存模型 垃圾回收机制 各种工具使用 ...【详细内容】
2021-12-23  小码匠和老码农    Tags:JVM调优   点击:(12)  评论:(0)  加入收藏
前言JDBC访问Postgresql的jsonb类型字段当然可以使用Postgresql jdbc驱动中提供的PGobject,但是这样在需要兼容多种数据库的系统开发中显得不那么通用,需要特殊处理。本文介绍...【详细内容】
2021-12-23  dingle    Tags:JDBC   点击:(13)  评论:(0)  加入收藏
Java与Lua相互调用案例比较少,因此项目使用需要做详细的性能测试,本内容只做粗略测试。目前已完成初版Lua-Java调用框架开发,后期有时间准备把框架进行抽象,并开源出来,感兴趣的...【详细内容】
2021-12-23  JAVA小白    Tags:Java   点击:(11)  评论:(0)  加入收藏
Java从版本5开始,在 java.util.concurrent.locks包内给我们提供了除了synchronized关键字以外的几个新的锁功能的实现,ReentrantLock就是其中的一个。但是这并不意味着我们可...【详细内容】
2021-12-17  小西学JAVA    Tags:JAVA并发   点击:(11)  评论:(0)  加入收藏
一、概述final是Java关键字中最常见之一,表示“最终的,不可更改”之意,在Java中也正是这个意思。有final修饰的内容,就会变得与众不同,它们会变成终极存在,其内容成为固定的存在。...【详细内容】
2021-12-15  唯一浩哥    Tags:Java基础   点击:(17)  评论:(0)  加入收藏
1、问题描述关于java中的日志管理logback,去年写过关于logback介绍的文章,这次项目中又优化了下,记录下,希望能帮到需要的朋友。2、解决方案这次其实是碰到了一个问题,一般的情况...【详细内容】
2021-12-15  软件老王    Tags:logback   点击:(19)  评论:(0)  加入收藏
本篇文章我们以AtomicInteger为例子,主要讲解下CAS(Compare And Swap)功能是如何在AtomicInteger中使用的,以及提供CAS功能的Unsafe对象。我们先从一个例子开始吧。假设现在我们...【详细内容】
2021-12-14  小西学JAVA    Tags:JAVA   点击:(22)  评论:(0)  加入收藏
最新更新
栏目热门
栏目头条