您当前的位置:首页 > 电脑百科 > 程序开发 > 架构

从netty-example分析Netty组件

时间:2019-09-06 11:20:08  来源:  作者:

分析netty从源码开始

准备工作:

1.下载源代码:https://github.com/netty/netty.git

我下载的版本为4.1

2. eclipse导入maven工程。

netty提供了一个netty-example工程,

从netty-example分析Netty组件

 

分类如下:

Fundamental

  • Echo ‐ the very basic client and server
  • Discard ‐ see how to send an infinite data stream asynchronously without flooding the write buffer
  • Uptime ‐ implement automatic reconnection mechanism

Text protocols

  • Telnet ‐ a classic line-based network Application
  • Quote of the Moment ‐ broadcast a UDP/IP packet
  • SecureChat ‐ an TLS-based chat server, derived from the Telnet example

Binary protocols

  • ObjectEcho ‐ exchange serializable JAVA objects
  • Factorial ‐ write a stateful client and server with a custom binary protocol
  • WorldClock ‐ rapid protocol protyping with google Protocol Buffers integration

HTTP

  • Snoop ‐ build your own extremely light-weight HTTP client and server
  • File server ‐ asynchronous large file streaming in HTTP
  • Web Sockets (Client & Server) ‐ add a two-way full-duplex communication channel to HTTP using Web Sockets
  • SPDY (Client & Server) ‐ implement SPDY protocol
  • CORS demo ‐ implement cross-origin resource sharing

Advanced

  • Proxy server ‐ write a highly efficient tunneling proxy server
  • Port unification ‐ run services with different protocols on a single TCP/IP port

UDT

  • Byte streams ‐ use UDT in TCP-like byte streaming mode
  • Message flow ‐ use UDT in UDP-like message delivery mode
  • Byte streams in symmetric peer-to-peer rendezvous connect mode
  • Message flow in symmetric peer-to-peer rendezvous connect mode

我们的分析从这里开始,netty是client-server形式的,我们以最简单的discard示例开始,其服务器端代码如下:

/**
 * Discards any incoming data.
 */
public final class DiscardServer {
 static final boolean SSL = System.getProperty("ssl") != null;
 static final int PORT = Integer.parseInt(System.getProperty("port", "8009"));
 public static void main(String[] args) throws Exception {
 // Configure SSL.
 final SslContext sslCtx;
 if (SSL) {
 SelfSignedCertificate ssc = new SelfSignedCertificate();
 sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
 } else {
 sslCtx = null;
 }
 EventLoopGroup bossGroup = new NioEventLoopGroup(1);
 EventLoopGroup workerGroup = new NioEventLoopGroup();
 try {
 ServerBootstrap b = new ServerBootstrap();
 b.group(bossGroup, workerGroup)
 .channel(NIOServerSocketChannel.class)
 .handler(new LoggingHandler(LogLevel.INFO))
 .childHandler(new ChannelInitializer<SocketChannel>() {
 @Override
 public void initChannel(SocketChannel ch) {
 ChannelPipeline p = ch.pipeline();
 if (sslCtx != null) {
 p.addLast(sslCtx.newHandler(ch.alloc()));
 }
 p.addLast(new DiscardServerHandler());
 }
 });
 // Bind and start to accept incoming connections.
 ChannelFuture f = b.bind(PORT).sync();
 // Wait until the server socket is closed.
 // In this example, this does not happen, but you can do that to gracefully
 // shut down your server.
 f.channel().closeFuture().sync();
 } finally {
 workerGroup.shutdownGracefully();
 bossGroup.shutdownGracefully();
 }
 }
}

上面的代码中使用了下面几个类:

1. EventLoopGroup

实现类为NioEventLoopGroup,其层次结构为:

从netty-example分析Netty组件

 

EventExecutorGroup为所有类的父接口,它通过next()方法来提供EventExecutor供使用。除此以外,它还负责处理它们的生命周期,允许以优雅的方式关闭。

EventExecutor是一种特殊的EventExcutorGroup,它提供了一些便利的方法来查看一个线程是否在一个事件循环中执行过,除此以外,它也扩展了EventExecutorGroup,从而提供了一个通用的获取方法的方式。

EventLoopGroup是一种特殊的EventExcutorGroup,它提供了channel的注册功能,channel在事件循环中将被后面的selection来获取到。

NioEventLoopGroup继承自MultithreadEventLoopGroup,基于channel的NIO selector会使用该类。

2.ServerBootstrap使ServerChannel容易自举。

group(EventLoopGroup parentGroup, EventLoopGroup childGroup)方法设置父EventLoopGroup和子EventLoopGroup。这些EventLoopGroup用来处理所有的事件和ServerChannel和Channel的IO。

channel(Class<? extends C> channelClass)方法用来创建一个Channel实例。创建Channel实例要不使用此方法,如果channel实现是无参构造要么可以使用channelFactory来创建。

handler(ChannelHandler handler)方法,channelHandler用来处理请求的。

childHandler(ChannelHandler childHandler)方法,设置用来处理请求的channelHandler。

3. ChannelInitializer一种特殊的ChannelInboundHandler

当Channel注册到它的eventLoop中时,ChannelInitializer提供了一个方便初始化channel的方法。该类的实现通常用来设置ChannelPipeline的channel,通常使用在Bootstrap#handler(ChannelHandler),ServerBootstrap#handler(ChannelHandler)和ServerBootstrap#childHandler(ChannelHandler)三个场景中。示例:

 public class MyChannelInitializer extends ChannelInitializer{
 public void initChannel({@link Channel} channel) {
 channel.pipeline().addLast("myHandler", new MyHandler());
 }
 }

然后:

ServerBootstrap bootstrap = ...;
...
bootstrap.childHandler(new MyChannelInitializer());

注意:这个类标示为可共享的,因此实现类重用时需要时安全的。

4. ChannelPipeline相关

理解ChannelPipeline需要先理解ChannelHandler,

4.1 ChannelHandler

处理一个IO事件或者翻译一个IO操作,并且传递给ChannelPineline的下一个handler。

你可以使用ChannelHandlerAdapter来替代它

因为这个接口有太多接口需要实现,因此你只有实现ChannelHandlerAdapter就可以代替实现这个接口。

Context对象

ChannelHandlerContext封装了ChannelHandler。ChannelHandler应该通过context对象与它所属的ChannelPipeLine进行交互。通过使用context对象,ChannelHandler可以传递上行或者下行事件,或者动态修改pipeline,或者存储特定handler的信息(使用AttributeKey)。

状态管理

一个channelHandler通常需要存储一些状态信息。最简单最值得推荐的方法是使用member变量:

 public interface Message {
 // your methods here
 }
 
 public class DataServerHandler extends SimpleChannelInboundHandler<Message> {
 
 private boolean loggedIn;
 
 {@code @Override}
 protected void messageReceived( ChannelHandlerContext ctx, Message message) {
 Channel ch = e.getChannel();
 if (message instanceof LoginMessage) {
 authenticate((LoginMessage) message);
 loggedIn = true;
 } else (message instanceof GetDataMessage) {
 if (loggedIn) {
 ch.write(fetchSecret((GetDataMessage) message));
 } else {
 fail();
 }
 }
 }
 ...
 }

注意:handler的状态附在ChannePipelineContext上,因此可以增加相同的handler实例到不同的pipeline上:

 public class DataServerInitializer extends ChannelInitializer<Channel> {
 
 private static final DataServerHandler SHARED = new DataServerHandler();
 
 @Override
 public void initChannel(Channel channel) {
 channel.pipeline().addLast("handler", SHARED);
 }
 }

@Sharable注解

在上面的示例中,使用了一个AttributeKey,你可能注意到了@Sharable注解。

如果一个ChannelHandler使用@sharable进行注解,那就意味着你仅仅创建了一个handler一次,可以添加到一个或者多个ChannelPipeline多次而不会产生竞争。

如果没有指定该注解,你必须每次都创建一个新的handler实例,并且增加到一个ChannelPipeline,因为它没有像member变量一样,它有一个非共享的状态。

4.2 ChannelPipeline

ChanelPipeline是一组ChanelHandler的集合,它处理或者解析Channel的Inbound事件和OutBound操作。ChannelPipeline的实现是Intercepting Filter的一种高级形式,这样用户可以控制事件如何处理,一个pipeline内部ChannelHandler如何交互。

 pipeline事件流程

从netty-example分析Netty组件

 

上图描述了IO事件如何被一个ChannelPipeline的ChannelHandler处理的。一个IO事件被一个ChannelInBoundHandler处理或者ChannelOutboundHandler,然后通过调用在ChannelHandlerContext中定义的事件传播方法传递给最近的handler,传播方法有ChannelHandlerContext#filreChannelRead(Object)和ChannelHandlerContext#write(Object)。

一个Inbound事件通常由Inbound handler来处理,如上如左上。一个Inbound handler通常处理在上图底部IO线程产生的Inbound数据。Inbound数据通过真实的输入操作如SocketChannel#read(ByteBuffer)来获取。如果一个inbound事件越过了最上面的inbound handler,该事件将会被抛弃到而不会通知你或者如果你需要关注,打印出日志。

一个outbound事件由上图的右下的outbound handler来处理。一个outbound handler通常由outbound流量如写请求产生或者转变的。如果一个outbound事件越过了底部的outbound handler,它将由channel关联的IO线程处理。IO线程通常运行的是真实的输出操作如SocketChannel#write(byteBuffer).

示例,假设我们创建下面这样一个pipeline:

 ChannelPipeline} p = ...;
 p.addLast("1", new InboundHandlerA());
 p.addLast("2", new InboundHandlerB());
 p.addLast("3", new OutboundHandlerA());
 p.addLast("4", new OutboundHandlerB());
 p.addLast("5", new InboundOutboundHandlerX());

在上例中,inbound开头的handler意味着它是一个inbound handler。outbound开头的handler意味着它是一个outbound handler。上例的配置中当一个事件进入inbound时handler的顺序是1,2,3,4,5;当一个事件进入outbound时,handler的顺序是5,4,3,2,1.在这个最高准则下,ChannelPipeline跳过特定handler的处理来缩短stack的深度:

3,4没有实现ChannelInboundHandler,因而一个inbound事件的处理顺序是1,2,5.

1,2没有实现ChannelOutBoundhandler,因而一个outbound事件的处理顺序是5,4,3

若5同时实现了ChannelInboundHandler和channelOutBoundHandler,一个inbound和一个outbound事件的执行顺序分别是125和543.

一个事件跳向下一个handler

如上图所示,一个handler触发ChannelHandlerContext中的事件传播方法,然后传递到下一个handler。这些方法有:

inbound 事件传播方法:

 ChannelHandlerContext#fireChannelRegistered()
 ChannelHandlerContext#fireChannelActive()
 ChannelHandlerContext#fireChannelRead(Object)
 ChannelHandlerContext#fireChannelReadComplete()
 ChannelHandlerContext#fireExceptionCaught(Throwable)
 ChannelHandlerContext#fireUserEventTriggered(Object)
 ChannelHandlerContext#fireChannelWritabilityChanged()
 ChannelHandlerContext#fireChannelInactive()
 ChannelHandlerContext#fireChannelUnregistered()

outbound事件传播方法:

ChannelHandlerContext#bind(SocketAddress, ChannelPromise)
ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext#write(Object, ChannelPromise)
ChannelHandlerContext#flush()
ChannelHandlerContext#read()
ChannelHandlerContext#disconnect(ChannelPromise)
ChannelHandlerContext#close(ChannelPromise)
ChannelHandlerContext#deregister(ChannelPromise)

下面的示例展示了事件是如何传播的:

 public class MyInboundHandler extends ChannelInboundHandlerAdapter {
 @Override
 public void channelActive(ChannelHandlerContext} ctx) {
 System.out.println("Connected!");
 ctx.fireChannelActive();
 }
 }
 
 public clas MyOutboundHandler extends ChannelOutboundHandlerAdapter {
 @Override
 public void close(ChannelHandlerContext} ctx, ChannelPromise} promise) {
 System.out.println("Closing ..");
 ctx.close(promise);
 }
 }

创建一个pipeline

在pipeline中,一个用户一般由一个或者多个ChannelHandler来接收IO事件(例如读)和IO操作请求(如写或者close)。例如,一个典型的服务器pipeline通常具有以下几个handler,但最多有多少handler取决于协议和业务逻辑的复杂度:

Protocol Decoder--将二进制数据(如ByteBuffer)转换成一个java对象

Protocol Encoder--将一个java对象转换成二进制数据。

Business Logic Handler--处理真实的业务逻辑(如数据库访问)。

让我们用下面的示例展示:

 static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
 ...
 
 ChannelPipeline} pipeline = ch.pipeline();
 
 pipeline.addLast("decoder", new MyProtocolDecoder());
 pipeline.addLast("encoder", new MyProtocolEncoder());
 
 // Tell the pipeline to run MyBusinessLogicHandler's event handler methods
 // in a different thread than an I/O thread so that the I/O thread is not blocked by
 // a time-consuming task.
 // If your business logic is fully asynchronous or finished very quickly, you don't
 // need to specify a group.
 pipeline.addLast(group, "handler", new MyBusinessLogicHandler());

线程安全

因为ChannelPipeline是线程安全的,一个channelhandler可以在任意时间内增加或者删除。例如,当有敏感信息交换时,你可以插入一个加密handler,然后当信息交换结束后删除该handler。

4.3 Channel

Channel是网络socket的一个纽带或者一个处理IO操作如读、写、连接、绑定的组件。一个Channel提供如下信息:

当前channel的状态,如它是否开启?是否连接?

Channel的ChannelConfig的配置参数,如接受缓存大小;

channel支持的IO操作,如读、写、连接、绑定;

channel支持的ChannelPipeline,它处理所有的IO事件和channel关联的请求。

所有的IO操作都是异步的。

在Netty中所有的IO操作都是异步的。这意味着所有的IO调用将立即返回,但不保证在调用结束时请求的IO操作都已经执行完毕。而是在请求操作处理完成、失败或者取消时返回一个ChannelFuture来通知。

Channel是继承性的。

一个Channel可以它如何创建的来获取它的父Channel(#parent()方法)。例如:一个由ServerSocketChannel接受的SocketChannel调用parent()方法时返回ServerSocketChannel。

继承的结构依赖于Channel的所属transport实现。例如,你可以新写一个Channel实现,它创建了一个共享同一个socket连接的子channel,如BEEP和SSH

向下去获取特定transport操作。

一些transport会暴露一些该transport特定的操作。Channel向下转换到子类型可以触发这些操作。例如:老的IO datargram transport,DatagramChannel提供了多播的join和leave操作。

释放资源

当Channel处理完后,一定记得调用close()或者close(ChannelPromise)来释放资源。

5. channelFuture

channelFuture是异步IO操作的返回值。

在Netty中所有的IO操作都是异步的。这意味着所有的IO调用将立即返回,但不保证在调用结束时请求的IO操作都已经执行完毕。而是在请求操作处理完成、失败或者取消时返回一个ChannelFuture来通知。

当一个IO操作开始时,创建一个新的future。ChannelFuture要么是uncompleted,要么是completed。新的future开始时是uncompleted---既不是成功、失败,也不是取消,因为IO操作还没有开始呢。若IO操作结束时future要么成功,要么失败或者取消,标记为completed的future有更多特殊的意义,例如失败的原因。请注意:即使失败和取消也属于completed状态。

从netty-example分析Netty组件

 

有很多方法可以查询IO操作是否完成:等待完成,检索IO操作的结果。同样也允许你增加ChannelFutureListenner,这样你可以在IO操作完成后获得通知。

在尽可能的情况下,推荐addListenner()方法而不是await()方法,当IO操作完成后去完成接下来的其它任务时去获取通知。

6.ChannelHandlerContext

对ChannelHandler相关信息的包装。

小结

netty处理请求的总流程是经过ChannelPipeline中的多个ChannelHandler后,返回结果ChannelFuture。如下图所示:

从netty-example分析Netty组件

 

具体I/O操作调用的流程,

应用->Channel的I/O操作->调用Pipeline相应的I/O操作->调用ChannelHandlerContext的相应I/O操作->调用ChannelHandler的相应操作->Channel.UnSafe中相关的I/O操作。

应用为什么不直接调用Channel.UnSafe接口中的I/O操作呢,而要绕一个大圈呢?因为它是框架,要支持扩展。

执行者完成操作后,是如何通知命令者的呢?一般流程是这样的:

Channel.UnSafe中执行相关的I/O操作,根据操作结果->调用ChannelPipeline中相应发fireXXXX()接口->调用ChannelHandlerContext中相应的fireXXXX()接口->调用ChannelHandler中相应方法->调用应用中的相关逻辑。

参考文献:

【1】http://www.jiancool.com/article/71493268111/

【2】http://blog.csdn.net/pingnanlee/article/details/11973769



Tags:Netty   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
前言在实现TCP长连接功能中,客户端断线重连是一个很常见的问题,当我们使用netty实现断线重连时,是否考虑过如下几个问题: 如何监听到客户端和服务端连接断开 ? 如何实现断线后重...【详细内容】
2021-12-24  Tags: Netty  点击:(12)  评论:(0)  加入收藏
这篇文章对于排查使用了 netty 引发的堆外内存泄露问题,有一定的通用性,希望对你有所启发 背景最近在做一个基于 websocket 的长连中间件,服务端使用实现了 socket.io 协议(基于...【详细内容】
2021-12-16  Tags: Netty  点击:(22)  评论:(0)  加入收藏
简介在之前的文章中,我们提到了在netty的客户端通过使用Http2FrameCodec和Http2MultiplexHandler可以支持多路复用,也就是说在一个连接的channel基础上创建多个子channel,通过...【详细内容】
2021-12-14  Tags: Netty  点击:(8)  评论:(0)  加入收藏
本系列为 Netty 学习笔记,本篇介绍总结Java NIO 网络编程。Netty 作为一个异步的、事件驱动的网络应用程序框架,也是基于NIO的客户、服务器端的编程框架。其对 Java NIO 底层...【详细内容】
2021-12-07  Tags: Netty  点击:(17)  评论:(0)  加入收藏
想要阅读Netty源码的同学,建议从GitHub上把源码拉下来,方便写注释、Debug调试哦~点我去下载! 先来看一个简单的Echo服务端程序,监听本地的9999端口,有客户端接入时控制台输出一句...【详细内容】
2021-10-22  Tags: Netty  点击:(45)  评论:(0)  加入收藏
相信很多人知道石中剑这个典故,在此典故中,天命注定的亚瑟很容易的就拔出了这把石中剑,但是由于资历不被其他人认可,所以他颇费了一番周折才成为了真正意义上的英格兰全境之王,亚...【详细内容】
2021-07-22  Tags: Netty  点击:(102)  评论:(0)  加入收藏
家纯 阿里技术 一 什么是 Netty? 能做什么? Netty 是一个致力于创建高性能网络应用程序的成熟的 IO 框架。 相比较与直接使用底层的 Java IO API,你不需要先成为网络专家就...【详细内容】
2021-06-23  Tags: Netty  点击:(136)  评论:(0)  加入收藏
客户端代码package com.huanfeng.test;import java.io.BufferedReader;import java.io.InputStreamReader;import java.net.URL;import java.net.URLConnection;import java...【详细内容】
2021-05-26  Tags: Netty  点击:(146)  评论:(0)  加入收藏
前言:作为一个刚踏入职场的实习生,我很幸运参加了某个政府项目,并且在项目中负责一个核心模块功能的开发,而不是从头到尾对数据库的crud。虽然我一直心里抱怨我的工作范围根本...【详细内容】
2021-05-24  Tags: Netty  点击:(231)  评论:(0)  加入收藏
要求1.群聊系统可以实现服务器端和客户端之间的数据简单通讯(非阻塞)2.通过系统可以实现多人群聊3.服务器端:可以监控用户上线,离线,并实现消息转发功能4.客户端:通过channel可...【详细内容】
2021-05-24  Tags: Netty  点击:(147)  评论:(0)  加入收藏
▌简易百科推荐
为了构建高并发、高可用的系统架构,压测、容量预估必不可少,在发现系统瓶颈后,需要有针对性地扩容、优化。结合楼主的经验和知识,本文做一个简单的总结,欢迎探讨。1、QPS保障目标...【详细内容】
2021-12-27  大数据架构师    Tags:架构   点击:(5)  评论:(0)  加入收藏
前言 单片机开发中,我们往往首先接触裸机系统,然后到RTOS,那么它们的软件架构是什么?这是我们开发人员必须认真考虑的问题。在实际项目中,首先选择软件架构是非常重要的,接下来我...【详细内容】
2021-12-23  正点原子原子哥    Tags:架构   点击:(7)  评论:(0)  加入收藏
现有数据架构难以支撑现代化应用的实现。 随着云计算产业的快速崛起,带动着各行各业开始自己的基于云的业务创新和信息架构现代化,云计算的可靠性、灵活性、按需计费的高性价...【详细内容】
2021-12-22    CSDN  Tags:数据架构   点击:(10)  评论:(0)  加入收藏
▶ 企业级项目结构封装释义 如果你刚毕业,作为Java新手程序员进入一家企业,拿到代码之后,你有什么感觉呢?如果你没有听过多模块、分布式这类的概念,那么多半会傻眼。为什么一个项...【详细内容】
2021-12-20  蜗牛学苑    Tags:微服务   点击:(9)  评论:(0)  加入收藏
我是一名程序员关注我们吧,我们会多多分享技术和资源。进来的朋友,可以多了解下青锋的产品,已开源多个产品的架构版本。Thymeleaf版(开源)1、采用技术: springboot、layui、Thymel...【详细内容】
2021-12-14  青锋爱编程    Tags:后台架构   点击:(21)  评论:(0)  加入收藏
在了解连接池之前,我们需要对长、短链接建立初步认识。我们都知道,网络通信大部分都是基于TCP/IP协议,数据传输之前,双方通过“三次握手”建立连接,当数据传输完成之后,又通过“四次挥手”释放连接,以下是“三次握手”与“四...【详细内容】
2021-12-14  架构即人生    Tags:连接池   点击:(17)  评论:(0)  加入收藏
随着移动互联网技术的快速发展,在新业务、新领域、新场景的驱动下,基于传统大型机的服务部署方式,不仅难以适应快速增长的业务需求,而且持续耗费高昂的成本,从而使得各大生产厂商...【详细内容】
2021-12-08  架构驿站    Tags:分布式系统   点击:(23)  评论:(0)  加入收藏
本系列为 Netty 学习笔记,本篇介绍总结Java NIO 网络编程。Netty 作为一个异步的、事件驱动的网络应用程序框架,也是基于NIO的客户、服务器端的编程框架。其对 Java NIO 底层...【详细内容】
2021-12-07  大数据架构师    Tags:Netty   点击:(17)  评论:(0)  加入收藏
前面谈过很多关于数字化转型,云原生,微服务方面的文章。虽然自己一直做大集团的SOA集成平台咨询规划和建设项目,但是当前传统企业数字化转型,国产化和自主可控,云原生,微服务是不...【详细内容】
2021-12-06  人月聊IT    Tags:架构   点击:(23)  评论:(0)  加入收藏
微服务看似是完美的解决方案。从理论上来说,微服务提高了开发速度,而且还可以单独扩展应用的某个部分。但实际上,微服务带有一定的隐形成本。我认为,没有亲自动手构建微服务的经历,就无法真正了解其复杂性。...【详细内容】
2021-11-26  GreekDataGuy  CSDN  Tags:单体应用   点击:(35)  评论:(0)  加入收藏
最新更新
栏目热门
栏目头条