您当前的位置:首页 > 电脑百科 > 站长技术 > 服务器

Netty入门实践-模拟IM聊天

时间:2023-12-05 15:13:46  来源:  作者:不焦躁的程序员

我们使用的框架几乎都有网络通信的模块,比如常见的Dubbo、RocketMQ、ElasticSearch等。它们的网络通信模块使.NETty实现,之所以选择Netty,有2个主要原因:

  • Netty封装了复杂的JDK 的 NIO操作,还封装了各种复杂的异常场景,丰富的API使得在使用上也非常方便,几行代码就可以实现高性能的网络通信功能。
  • Netty已经经历各种大型中间件的生产环境的验证,高可用性和健壮性都得到了全方位验证,用起来更放心。

本文以入门实践为主,通过原理+代码的方式,实现一个简易IM聊天功能。分为2个部分:Netty的核心概念、IM聊天简易实现。

一、Netty核心概念

1、通信流程

既然是网络通信,那肯定有服务端和客户端。在客户端-A和客户端-B通信的过程中,实际上是利用服务端作为消息中转站,来实现A-B通信的。

不管是点-点通信,还是群通信,都可以认为是客户端-服务端之间的通信,有了这一点,许多设计方案都可以轻松理解。

Netty入门实践-模拟IM聊天

 

2、服务端核心概念

Boss线程

Boss线程负责监听端口,接受新的连接,监听连接的数据读写变化。

Worker线程

Worker线程负责处理具体的业务逻辑,Boss线程接收到连接的读写变化后,然后交给Worker处理具体业务逻辑。

服务端的IO模型

Netty支持使用NIO和BIO进行通信,可以自行设置。一般使用NIOServerSocketChannel来指定NIO模型。

服务端引导类

服务端通过引导类 ServerBootstrap来启动一系列的工作。

3、客户端核心概念

Worker线程

客户端只有工作线程的概念,负责连接到服务端,监听数据读写变化。

客户端的IO模型

一般使用NioSocketChannel指定客户端的NIO模型

客户端引导类

客户端通过引导类Bootstrap来启动一些列工作。

4、通用核心概念

Handler

负责处理接受到的消息,大部分的业务逻辑都是放在Handler里处理。自定义的Handler一般继承于
SimpleChannelInboundHandler或者ChannelInboundHandlerAdapter。

ByteBuf和编码、解码

数据的载体,JAVA对象编码成字节码,存放于ByteBuf,然后发送出去。服务端接收到消息后,从ByteBuf中取出数据,解码成Java对象。

通讯协议

许多框架都会自定义一套自己的协议,这样比较符合业务。比如dubbo协议、hessian协议。

一般的协议包括如下部分:魔数、版本号、序列化算法、指令、数据长度、数据内容,其余的都是为了适配自身业务而定的。

Netty入门实践-模拟IM聊天

 

  • 魔数:一般是固定数字,用来快速判断是否符合本协议,如果不符合本协议,则快速失败。
  • 版本号:一般无需改动,如果早期设置的协议到了后续不适用了,在升级版本号。
  • 序列化算法:Java对象转序列化的方式,比如JSON。
  • 指令:操作大类。比如说登录指令、单点发送消息指令、建群指令等。这样服务端接收到对应指令就用对应的Handler去处理业务逻辑。指令占用的字节数可以根据自身业务适当调大。
  • 数据长度:用来记录本次数据的长度。
  • 数据内容:具体消息内容,比如聊天时的消息、登录时的用户名密码等。

粘包拆包

Netty属于上层应用,在发送消息时,还是通过底层操作系统将数据发送出去,操作系统在发送数据时,不会按照我们设想的消息长度去发送内容。这就需要我们在接收到内容时,自行做好内容的分割和等待。

比如有一条消息1024字节,如果接受的内容没这么长就需要继续等待,等这条消息的内容完整后,在处理。如果接受的内容包含了1条完整消息和1条不完整的消息,那么就需要拆分内容,将完整的消息先传递到后面处理,剩下不完整的消息则继续等待下一个内容。

Netty自带了几种拆包器:固定长度的拆包器 FixedLengthFrameDecoder、行拆包器 LineBasedFrameDecoder、分隔符拆包器
DelimiterBasedFrameDecoder、长度域拆包器LengthFieldBasedFrameDecoder。

一般在使用自定义协议时,会使用:长度域拆包器
LengthFieldBasedFrameDecoder。

空闲检测和定时心跳

在服务端和客户端的通信过程中,有时候会出现假死连接,或者长时间没有消息传递需要释放连接。对于这些连接,我们需要及时释放,毕竟每条连接都占用着CPU和内存资源。大量这种连接如果不及时释放,服务器资源迟早会耗尽,最终崩溃。

应对这种问题的解决方式是:Netty提供了IdleStateHandler做空闲检测,用来检测连接是否活跃,如果再指定的时间内,没有活跃,那么就关闭连接。然后就是客户端定时发送心跳请求,服务器响应心跳请求。

二、IM聊天简易实现

介绍完Netty的核心概念,接下来以一个简易的点对点IM聊天,将核心概念融入到案例中。IM聊天的核心模块大致是如下几个:

1、通信主体流程

通信主体流程就是搭建好:服务端、客户端、两端正常建立连接进行通信。

服务端代码:

public static void mAIn(String[] args) {
    ServerBootstrap serverBootstrap = new ServerBootstrap();

    NioEventLoopGroup boss = new NioEventLoopGroup();
    NioEventLoopGroup worker = new NioEventLoopGroup();
    serverBootstrap
            .group(boss, worker)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                protected void initChannel(NioSocketChannel ch) {
                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                            System.out.println("server accept: " + msg);
                        }
                    });
                }
            });
    serverBootstrap.bind(9000)
            .addListener(future -> {
                if (future.isSuccess()) {
                    System.out.println("端口9000绑定成功");
                } else {
                    System.err.println("端口9000绑定失败");
                }
            });
}

客户端代码:

public static void main(String[] args) throws InterruptedException {
    Bootstrap bootstrap = new Bootstrap();
    NioEventLoopGroup group = new NioEventLoopGroup();

    bootstrap.group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel ch) {
                    ch.pipeline().addLast(new StringEncoder());
                }
            });

    bootstrap.connect("127.0.0.1", 9000)
            .addListener(future -> {
                if (future.isSuccess()) {
                    System.out.println("链接服务端成功");
                    Channel channel = ((ChannelFuture) future).channel();
                    channel.writeAndFlush("我是客户端A");
                } else {
                    System.err.println("连接服务端失败");
                }
            });
}

2、数据包—包含通讯协议

定义数据包的抽象类,后续的各种类型的数据包都继承此类。数据包中定义通讯协议的各种字段。

@Data
public abstract class Packet {
    /**
     * 协议版本
     */
    private Byte version = 1;

    /**
     * 指令,此处有多种实现:比如登录、登出、单聊、建群等等
     *
     * @return
     */
    public abstract Byte getCommand();

    /**
     * 获取算法,默认使用JSON,如果使用其余算法,子类重写此方法
     *
     * @return
     */
    public Byte getSerializeAlgorithm() {
        return SerializerAlgorithm.JSON;
    }
}

public class LoginRequestPacket extends Packet {
    private String userName;

    private String password;

    @Override
    public Byte getCommand() {
        return Command.LOGIN_REQUEST;
    }
}

3、序列化器

定义序列化器,功能包括:序列化、反序列化。可以定义多种序列化算法,文中以JSON为例。

public interface Serializer {
    /**
     * 序列化算法
     *
     * @return
     */
    byte getSerializerAlgorithm();

    /**
     * java 对象转换成二进制
     */
    byte[] serialize(Object object);

    /**
     * 二进制转换成 java 对象
     */
    <T> T deserialize(Class<T> clazz, byte[] bytes);
}

public class JSONSerializer implements Serializer {

    @Override
    public byte getSerializerAlgorithm() {
        return SerializerAlgorithm.JSON;
    }

    @Override
    public byte[] serialize(Object object) {
        return JSON.toJSONBytes(object);
    }

    @Override
    public <T> T deserialize(Class<T> clazz, byte[] bytes) {
        return JSON.parseobject(bytes, clazz);
    }
}

4、编解码器

有了通讯协议、有了序列化协议,接下来就是对数据的编码和解码了。

public void encode(ByteBuf byteBuf, Packet packet) {
    Serializer serializer = getSerializer(packet.getSerializeAlgorithm());

    // 1. 序列化 java 对象
    byte[] bytes = serializer.serialize(packet);

    // 2. 实际编码过程
    byteBuf.writeInt(MAGIC_NUMBER);
    byteBuf.writeByte(packet.getVersion());
    byteBuf.writeByte(packet.getSerializeAlgorithm());
    byteBuf.writeByte(packet.getCommand());
    byteBuf.writeInt(bytes.length);
    byteBuf.writeBytes(bytes);
}


public Packet decode(ByteBuf byteBuf) {
    // 跳过 magic number
    byteBuf.skipBytes(4);
    // 跳过版本号
    byteBuf.skipBytes(1);
    // 读取序列化算法
    byte serializeAlgorithm = byteBuf.readByte();
    // 读取指令
    byte command = byteBuf.readByte();
    // 读取数据包长度
    int length = byteBuf.readInt();
    // 读取数据
    byte[] bytes = new byte[length];
    byteBuf.readBytes(bytes);

    Class<? extends Packet> requestType = getRequestType(command);
    Serializer serializer = getSerializer(serializeAlgorithm);

    if (requestType != null && serializer != null) {
        return serializer.deserialize(requestType, bytes);
    }

    return null;
}

5、消息处理器Handler

以上把通讯的基本架子和收发消息的数据包、协议、编解码器等基础工具已经做完,接下来就是编写Handler实现具体的业务逻辑了。

这里以客户端发起登录功能为例,分3步,消息收发也是类似:

  1. 先在客户端发送登录请求数据包。
  2. 服务端接收到登录请求数据包后,在服务端的Handler里做业务逻辑处理,然后发送响应给客户端。
  3. 客户端接收到登录响应数据包后,在客户端的Handler里做业务逻辑处理。

效果如下

Netty入门实践-模拟IM聊天

核心代码如下

  • 客户端发送请求
bootstrap.connect("127.0.0.1", 9000)
                .addListener(future -> {
                    if (future.isSuccess()) {
                        System.out.println("连接服务端成功");
                        Channel channel = ((ChannelFuture) future).channel();
                        // 连接之后,假设再这里发起各种操作指令,采用异步线程开始发送各种指令,发送数据用到的的channel是必不可少的
                        sendActionCommand(channel);
                    } else {
                        System.err.println("连接服务端失败");
                    }
                });

private static void sendActionCommand(Channel channel) {
        // 直接采用控制台输入的方式,模拟操作指令
        Scanner scanner = new Scanner(System.in);
        LoginActionCommand loginActionCommand = new LoginActionCommand();
        new Thread(() -> {
            loginActionCommand.exec(scanner, channel);
        }).start();
    }

  • 服务端接受请求,并且处理
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) {
    LoginResponsePacket loginResponsePacket = new LoginResponsePacket();
    loginResponsePacket.setVersion(loginRequestPacket.getVersion());
    loginResponsePacket.setUserName(loginRequestPacket.getUserName());

    if (valid(loginRequestPacket)) {
        loginResponsePacket.setSuccess(true);
        String userId = IDUtil.randomId();
        loginResponsePacket.setUserId(userId);
        System.out.println("[" + loginRequestPacket.getUserName() + "]登录成功");
        SessionUtil.bindSession(new Session(userId, loginRequestPacket.getUserName()), ctx.channel());
    } else {
        loginResponsePacket.setReason("校验失败");
        loginResponsePacket.setSuccess(false);
        System.out.println("登录失败!");
    }

    // 登录响应
    ctx.writeAndFlush(loginResponsePacket);
}

private boolean valid(LoginRequestPacket loginRequestPacket) {
    System.out.println("服务端LoginRequestHandler,正在校验客户端登录请求");
    return true;
}
  • 客户端接受响应,并且处理
public class LoginResponseHandler extends SimpleChannelInboundHandler<LoginResponsePacket> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LoginResponsePacket loginResponsePacket) {
        String userId = loginResponsePacket.getUserId();
        String userName = loginResponsePacket.getUserName();

        if (loginResponsePacket.isSuccess()) {
            System.out.println("[" + userName + "]登录成功,userId为: " + loginResponsePacket.getUserId());
            SessionUtil.bindSession(new Session(userId, userName), ctx.channel());
        } else {
            System.out.println("[" + userName + "]登录失败,原因为:" + loginResponsePacket.getReason());
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        System.out.println("客户端连接被关闭!");
    }
}

6、空闲检测和定时心跳

主流程和主要功能已经实现,还剩最后一个空闲检测和定时心跳。

实现步骤:

  1. 客户端和服务端都先定义好空闲检测。如果再规定的时间内没有数据传输,则关闭通道。
  2. 客户端定时发送心跳
  3. 服务端处理心跳请求,发送响应给客户端

核心代码

空闲检测代码:

/**
 * IM聊天空闲检测器
 * 比如:20秒内没有数据,则关闭通道
 */
public class ImIdleStateHandler extends IdleStateHandler {

    private static final int READER_IDLE_TIME = 20;

    public ImIdleStateHandler() {
        super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
    }

    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
        System.out.println(READER_IDLE_TIME + "秒内未读到数据,关闭连接!");
        ctx.channel().close();
    }
}

客户端定时心跳代码:

public void channelActive(ChannelHandlerContext ctx) throws Exception {
        scheduleSendHeartBeat(ctx);

        super.channelActive(ctx);
    }

    private void scheduleSendHeartBeat(ChannelHandlerContext ctx) {
        // 此处无需使用scheduleAtFixedRate,因为如果通道失效后,就无需在发起心跳了,按照目前的方式是最好的:成功一次安排一次
        ctx.executor().schedule(() -> {

            if (ctx.channel().isActive()) {
                System.out.println("定时任务发送心跳!");
                ctx.writeAndFlush(new HeartBeatRequestPacket());
                scheduleSendHeartBeat(ctx);
            }

        }, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
    }

服务端响应心跳代码:

public class ImIdleStateHandler extends IdleStateHandler {

    private static final int READER_IDLE_TIME = 20;

    public ImIdleStateHandler() {
        super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
    }

    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
        System.out.println(READER_IDLE_TIME + "秒内未读到数据,关闭连接!");
        ctx.channel().close();
    }
}

三、总结

本文介绍了Netty的核心概念,以及基本使用方法,希望能够帮到你。本文核心词:

  • 通信流程
  • Boss线程、Worker线程
  • 处理消息的Handler
  • 通讯协议、序列化协议、编解码器
  • 空闲检测、定时心跳

本文完整代码:
https://Github.com/yclxiao/netty-demo.git



Tags:Netty   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
Netty入门实践:模拟IM聊天
我们使用的框架几乎都有网络通信的模块,比如常见的Dubbo、RocketMQ、ElasticSearch等。它们的网络通信模块使用Netty实现,之所以选择Netty,有两个主要原因: Netty封装了复杂的JD...【详细内容】
2023-12-08  Search: Netty  点击:(241)  评论:(0)  加入收藏
Netty入门实践-模拟IM聊天
我们使用的框架几乎都有网络通信的模块,比如常见的Dubbo、RocketMQ、ElasticSearch等。它们的网络通信模块使用Netty实现,之所以选择Netty,有2个主要原因: Netty封装了复杂的JDK...【详细内容】
2023-12-05  Search: Netty  点击:(108)  评论:(0)  加入收藏
如何使用Netty模拟一个Web服务端
Netty作为Web服务端具有以下好处:高性能Netty是一个基于事件驱动和异步非阻塞的网络编程框架,它使用了高效的NIO(非阻塞输入输出)模型。这使得Netty在处理大量并发连接时表现出...【详细内容】
2023-09-11  Search: Netty  点击:(271)  评论:(0)  加入收藏
玩转Netty,从“Hello World”开始!
为什么要用Netty?首先当然是NIO的使用,本身比较复杂,而且还存在一些问题。除此之外,如果在项目的开发中,要实现稳定的网络通信,就得考虑网络的闪断、客户端的重复接入、客户端的...【详细内容】
2023-05-16  Search: Netty  点击:(417)  评论:(0)  加入收藏
Netty和原生Java的性能比较
Netty是一种基于异步事件循环的网络应用编程方法。本文对比Netty与Java的本地服务器。虽然目前本地服务器使用的人不多,但我仍要找出netty比本地服务器的好处有多少。让我们...【详细内容】
2023-05-16  Search: Netty  点击:(87)  评论:(0)  加入收藏
基于Netty搭建WebSocket,模仿微信聊天页面
作者:小傅哥 博客:https://bugstack.cn 沉淀、分享、成长,让自己和他人都能有所收获!前言介绍 本章节我们模仿微信聊天页面,开发一个基于Netty搭建WebSocket通信案例。Netty的应...【详细内容】
2023-03-17  Search: Netty  点击:(237)  评论:(0)  加入收藏
Netty:遇到TCP发送缓冲区满了 写半包操作该如何处理
什么是写半包写半包:一份数据,一次发送没有把他全部发送,需要循环发送,那么第一次的操作称为写半包什么情况下会出现写半包:发送方发送200byte,但是接收方只能接受100byte,因此发送...【详细内容】
2023-03-13  Search: Netty  点击:(207)  评论:(0)  加入收藏
Java使用Netty框架自建DNS代理服务器教程
前言DNS协议作为着互联网客户端-服务器通信模式得第一关,在当下每天都有成千上亿上网记录产生得当今社会,其重要性自然不可言喻。在国内比较有名得DNS服务器有电信得114.114.1...【详细内容】
2023-02-09  Search: Netty  点击:(148)  评论:(0)  加入收藏
Netty基础介绍(使用场景、组件、模型、代码示例等)
Netty简介Netty 对 JDK 自带的 NIO 的 API 进行了良好的封装,解决了如客户端面临断线重连、 网络闪断、心跳处理、半包读写、 网络拥塞和异常流的处理等等问题。且Netty拥有...【详细内容】
2023-01-10  Search: Netty  点击:(145)  评论:(0)  加入收藏
从Redis、HTTP协议,看Nett协议设计,我发现了个惊天大秘密
1. 协议的作用TCP/IP 中消息传输基于流的方式,没有边界协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则2. Redis 协议如果我们要向 Redis 服务器发送一条 set...【详细内容】
2023-01-03  Search: Netty  点击:(302)  评论:(0)  加入收藏
▌简易百科推荐
为什么Nginx被称为“反向”代理呢?
Nginx(发音为"engine-x")是一款高性能、轻量级的开源Web服务器软件,也可用作反向代理服务器、负载均衡器和HTTP缓存。Nginx之所以有被称为“反向”代理,是因为它充当客户端设备...【详细内容】
2024-02-01  coderidea  微信公众号  Tags:Nginx   点击:(59)  评论:(0)  加入收藏
哪种服务器操作系统更好呢?
在当今的IT世界中,服务器操作系统扮演着至关重要的角色。它们是确保服务器能够高效、安全地运行的关键因素。然而,对于许多人来说,服务器操作系统的种类和特点可能是一个复杂的...【详细内容】
2024-01-30    简易百科  Tags:操作系统   点击:(75)  评论:(0)  加入收藏
什么是VPS服务器
VPS服务器是一种虚拟化技术,它将一台物理服务器划分为多个虚拟的独立服务器,每个虚拟服务器都可以拥有自己的操作系统、运行环境、应用程序等。这种技术使得每个虚拟服务器可...【详细内容】
2024-01-30    简易百科  Tags:VPS服务器   点击:(67)  评论:(0)  加入收藏
VPS服务器下载速度慢?这五招帮你提速
VPS服务器下载速度慢可能会让用户感到沮丧,尤其是对于需要大量下载和上传数据的用户。幸运的是,有一些方法可以帮助您提高VPS服务器的下载速度,使您的在线体验更加顺畅。在本文...【详细内容】
2024-01-30  IDC行业观察者    Tags:VPS服务器   点击:(56)  评论:(0)  加入收藏
美国VPS和英国VPS:地理位置对服务器性能的影响
在今天的数字时代,VPS已成为在线业务和网站托管的关键组成部分。然而,选择合适的VPS主机服务时,地理位置通常被忽视,尽管它对服务器性能有着重要的影响。本文将探讨美国VPS和英...【详细内容】
2024-01-26  IDC行业观察者    Tags:服务器   点击:(50)  评论:(0)  加入收藏
如何判断服务器所需带宽:基于业务需求和流量模式的关键考量
在选择服务器时,带宽是一个重要的考虑因素。带宽的大小直接影响到网站的加载速度和用户的访问体验。那么,如何判断服务器需要多大的带宽呢?本文将为你揭示这一关键问题的答案...【详细内容】
2024-01-26  源库科技    Tags:服务器   点击:(71)  评论:(0)  加入收藏
服务器内存空间及IO操作原理解析
服务器的内存空间分为内核空间和用户空间,而我们编写的程序通常在用户空间中运行。在进行读写操作时,我们直接操作的是用户缓冲区,而用户缓冲区的内容来自于内核缓冲区。这种内...【详细内容】
2024-01-23  王建立    Tags:服务器   点击:(42)  评论:(0)  加入收藏
如何在Java环境中安装Nginx?
1. 下载Nginx:首先,前往Nginx官方网站(https://nginx.org/en/download.html)下载新版本的Nginx。选择适合您操作系统的版本,通常有Windows、Linux和Mac等不同操作系统的版本可供...【详细内容】
2024-01-22  敲代码的小动    Tags:Nginx   点击:(58)  评论:(0)  加入收藏
服务器证书和SSL证书有啥区别?
在互联网经济时代,随着越来越多的信息以及合作都是从企业官网开始的,因此绝大多数企业都会为自己的网站配置SSL证书,以提高安全性。在接触SSL证书时,也有很多人称之为服务器证书...【详细内容】
2024-01-10  安信SSL证书    Tags:服务器证书   点击:(65)  评论:(0)  加入收藏
宝塔面板怎样部署java项目?
宝塔面板怎样部署java项目?在使用宝塔面板部署Java项目之前,需要确保已经安装了Java Development Kit (JDK)。接下来,将介绍如何使用宝塔面板来部署Java项目的步骤。步骤一:安装...【详细内容】
2024-01-09  西部数码    Tags:宝塔面板   点击:(103)  评论:(0)  加入收藏
站内最新
站内热门
站内头条