您当前的位置:首页 > 电脑百科 > 站长技术 > 服务器

Netty客户端断线重连实现及问题思考

时间:2021-12-24 14:25:41  来源:CSDN  作者:程序猿阿嘴

前言

在实现TCP长连接功能中,客户端断线重连是一个很常见的问题,当我们使用netty实现断线重连时,是否考虑过如下几个问题:

  • 如何监听到客户端和服务端连接断开 ?
  • 如何实现断线后重新连接 ?
  • netty客户端线程给多大比较合理 ?

其实上面都是笔者在做断线重连时所遇到的问题,而 “netty客户端线程给多大比较合理?” 这个问题更是笔者在做断线重连时因一个异常引发的思考。下面讲讲整个过程:

因为本节讲解内容主要涉及在客户端,但是为了读者能够运行整个程序,所以这里先给出服务端及公共的依赖和实体类。

服务端及common代码

maven依赖:

<dependencies>
    <!--只是用到了spring-boot的日志框架-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <version>2.4.1</version>
    </dependency>

    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.56.Final</version>
    </dependency>

    <dependency>
        <groupId>org.jboss.marshalling</groupId>
        <artifactId>jboss-marshalling-serial</artifactId>
        <version>2.0.10.Final</version>
    </dependency>
</dependencies>

服务端业务处理代码

主要用于记录打印当前客户端连接数,当接收到客户端信息后返回“hello netty”字符串

@ChannelHandler.Sharable
public class SimpleServerHandler extends ChannelInboundHandlerAdapter {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleServerHandler.class);
    public static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        channels.add(ctx.channel());
        log.info("客户端连接成功: client address :{}", ctx.channel().remoteAddress());
        log.info("当前共有{}个客户端连接", channels.size());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("server channelRead:{}", msg);
        ctx.channel().writeAndFlush("hello netty");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelInactive: client close");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof JAVA.io.IOException) {
            log.warn("exceptionCaught: client close");
        } else {
            cause.printStackTrace();
        }
    }
}

服务端心跳检查代码

当接收心跳"ping"信息后,返回客户端’'pong"信息。如果客户端在指定时间内没有发送任何信息则关闭客户端。

public class ServerHeartbeatHandler extends ChannelInboundHandlerAdapter {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(ServerHeartbeatHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("server channelRead:{}", msg);
        if (msg.equals("ping")) {
            ctx.channel().writeAndFlush("pong");
        } else {
            //由下一个handler处理,示例中则为SimpleServerHandler
            ctx.fireChannelRead(msg);
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            //该事件需要配合 io.netty.handler.timeout.IdleStateHandler使用
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            if (idleStateEvent.state() == IdleState.READER_IDLE) {
                //超过指定时间没有读事件,关闭连接
                log.info("超过心跳时间,关闭和服务端的连接:{}", ctx.channel().remoteAddress());
                //ctx.channel().close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

编解码工具类

主要使用jboss-marshalling-serial编解码工具,可自行查询其优缺点,这里只是示例使用。

public final class MarshallingCodeFactory {
    /** 创建Jboss marshalling 解码器 */
    public static MarshallingDecoder buildMarshallingDecoder() {
        //参数serial表示创建的是Java序列化工厂对象,由jboss-marshalling-serial提供
        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");
        MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        DefaultUnmarshallerProvider provider = new DefaultUnmarshallerProvider(factory, configuration);
        return new MarshallingDecoder(provider, 1024);
    }

    /** 创建Jboss marshalling 编码器 */
    public static MarshallingEncoder buildMarshallingEncoder() {
        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");
        MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        DefaultMarshallerProvider provider = new DefaultMarshallerProvider(factory, configuration);
        return new MarshallingEncoder(provider);
    }
}

公共实体类

public class UserInfo implements Serializable {
    private static final long serialVersionUID = 6271330872494117382L;
 
    private String username;
    private int age;

    public UserInfo() {
    }

    public UserInfo(String username, int age) {
        this.username = username;
        this.age = age;
    }
   //省略getter/setter/toString
}

下面开始本文的重点,客户端断线重连以及问题思考。

客户端实现

  • 刚开始启动时需要进行同步连接,指定连接次数内没用通过则抛出异常,进程退出。
  • 客户端启动后,开启定时任务,模拟客户端数据发送。

客户端业务处理handler,接收到数据后,通过日志打印。

public class SimpleClientHandler extends ChannelInboundHandlerAdapter {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);
    private NettyClient client;

    public SimpleClientHandler(NettyClient client) {
        this.client = client;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("client receive:{}", msg);
    }
}

封装连接方法、断开连接方法、getChannel()返回io.netty.channel.Channel用于向服务端发送数据。boolean connect()是一个同步连接方法,如果连接成功返回true,连接失败返回false。

public class NettyClient {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClient.class);

    private EventLoopGroup workerGroup;
    private Bootstrap bootstrap;
    private volatile Channel clientChannel;

    public NettyClient() {
        this(-1);
    }

    public NettyClient(int threads) {
        workerGroup = threads > 0 ? new NioEventLoopGroup(threads) : new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(workerGroup)
                .channel(NIOSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30000)
                .handler(new ClientHandlerInitializer(this));
    }

    public boolean connect() {
        log.info("尝试连接到服务端: 127.0.0.1:8088");
        try {
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8088);

            boolean notTimeout = channelFuture.awaitUninterruptibly(30, TimeUnit.SECONDS);
            clientChannel = channelFuture.channel();
            if (notTimeout) {
                if (clientChannel != null && clientChannel.isActive()) {
                    log.info("netty client started !!! {} connect to server", clientChannel.localAddress());
                    return true;
                }
                Throwable cause = channelFuture.cause();
                if (cause != null) {
                    exceptionHandler(cause);
                }
            } else {
                log.warn("connect remote host[{}] timeout {}s", clientChannel.remoteAddress(), 30);
            }
        } catch (Exception e) {
            exceptionHandler(e);
        }
        clientChannel.close();
        return false;
    }

    private void exceptionHandler(Throwable cause) {
        if (cause instanceof ConnectException) {
            log.error("连接异常:{}", cause.getMessage());
        } else if (cause instanceof ClosedChannelException) {
            log.error("connect error:{}", "client has destroy");
        } else {
            log.error("connect error:", cause);
        }
    }

    public void close() {
        if (clientChannel != null) {
            clientChannel.close();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }

    public Channel getChannel() {
        return clientChannel;
    }

    static class ClientHandlerInitializer extends ChannelInitializer<SocketChannel> {
        private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClient.class);
        private NettyClient client;

        public ClientHandlerInitializer(NettyClient client) {
            this.client = client;
        }

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());
            pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());
            //pipeline.addLast(new IdleStateHandler(25, 0, 10));
            //pipeline.addLast(new ClientHeartbeatHandler());
            pipeline.addLast(new SimpleClientHandler(client));
        }
    }
}

客户端启动类

public class NettyClientMain {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClientMain.class);
    private static final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();

    public static void main(String[] args) {
        NettyClient nettyClient = new NettyClient();
        boolean connect = false;
        //刚启动时尝试连接10次,都无法建立连接则不在尝试
        //如果想在刚启动后,一直尝试连接,需要放在线程中,异步执行,防止阻塞程序
        for (int i = 0; i < 10; i++) {
            connect = nettyClient.connect();
            if (connect) {
                break;
            }
            //连接不成功,隔5s之后重新尝试连接
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        if (connect) {
            log.info("定时发送数据");
            send(nettyClient);
        } else {
            nettyClient.close();
            log.info("进程退出");
        }
    }

    /** 定时发送数据 */
    static void send(NettyClient client) {
        scheduledExecutor.schedule(new SendTask(client,scheduledExecutor), 2, TimeUnit.SECONDS);
    }
}

客户端断线重连

断线重连需求:

  • 服务端和客户端之间网络异常,或响应超时(例如有个很长时间的fullGC),客户端需要主动重连其他节点。
  • 服务端宕机时或者和客户端之间发生任何异常时,客户端需要主动重连其他节点。
  • 服务端主动向客户端发送(服务端)下线通知时,客户端需要主动重连其他节点。

如何监听到客户端和服务端连接断开 ?

netty的io.netty.channel.ChannelInboundHandler接口中给我们提供了许多重要的接口方法。为了避免实现全部的接口方法,可以通过继承io.netty.channel.ChannelInboundHandlerAdapter来重写相应的方法即可。

1.void channelInactive(ChannelHandlerContext ctx);在客户端关闭时被调用,表示客户端断开连接。当有以下几种情况发生时会触发:

  • 客户端在正常active状态下,主动调用channel或者ctx的close方法。
  • 服务端主动调用channel或者ctx的close方法关闭客户端的连接 。
  • 发生java.io.IOException(一般情况下是双方连接断开)或者java.lang.OutOfMemoryError(4.1.52版本中新增)时

2.void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;则是在入栈发生任何异常时被调用。如果异常是java.io.IOException或者java.lang.OutOfMemoryError(4.1.52版本新增)时,还会触发channelInactive方法,也就是上面channelInactive被触发的第3条情况。

3.心跳检查也是检查客户端与服务端之间连接状态的必要方式,因为在一些状态下,两端实际上已经断开连接,但客户端无法感知,这时候就需要通过心跳来判断两端的连接状态。心跳可以是客户端心跳和服务端心跳。

  • 客户端信跳:即为客户端发送心跳ping信息,服务端回复pong信息。这样在指定时间内,双方有数据交互则认为是正常连接状态。
  • 服务端信息:则是服务端向客户端发送ping信息,客户端回复pong信息。在指定时间内没有收到回复,则认为对方下线。

netty给我们提供了非常简单的心跳检查方式,只需要在channel的handler链上,添加io.netty.handler.timeout.IdleStateHandler即可实现。

IdleStateHandler有如下几个重要的参数:

  • readerIdleTimeSeconds, 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个READER_IDLE的IdleStateEvent 事件.
  • writerIdleTimeSeconds, 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个WRITER_IDLE的IdleStateEvent 事件.
  • allIdleTimeSeconds, 读/写超时. 当在指定的时间间隔内没有读或写操作时, 会触发一个ALL_IDLE的IdleStateEvent 事件.

为了能够监听到这些事件的触发,还需要重写ChannelInboundHandler#userEventTriggered(ChannelHandlerContext ctx, Object evt)方法,通过参数evt判断事件类型。在指定的时间内如果没有读写则发送一条心跳的ping请求,在指定时间内没有收到读操作则任务已经和服务端断开连接。则调用channel或者ctx的close方法,使客户端Handler执行channelInactive方法。

到这里看来我们只要在channelInactive和exceptionCaught两个方法中实现自己的重连逻辑即可,但是笔者遇到了第一个坑,重连方法执行了两次。

先看示例代码和结果,在com.bruce.netty.rpc.client.SimpleClientHandler中添加如下代码:

public class SimpleClientHandler extends ChannelInboundHandlerAdapter {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);
    //省略部分代码......
    /** 客户端正常下线时执行该方法 */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.warn("channelInactive:{}", ctx.channel().localAddress());
        reconnection(ctx);
    }

    /** 入栈发生异常时执行exceptionCaught */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof IOException) {
            log.warn("exceptionCaught:客户端[{}]和远程断开连接", ctx.channel().localAddress());
        } else {
            log.error(cause);
        }
        reconnection(ctx);
    }

    private void reconnection(ChannelHandlerContext ctx) {
        log.info("5s之后重新建立连接");
        //暂时为空实现
    }
}

ClientHandlerInitializer 中添加io.netty.handler.timeout.IdleStateHandler用于心跳检查,ClientHeartbeatHandler用于监听心跳事件,接收心跳pong回复。

static class ClientHandlerInitializer extends ChannelInitializer<SocketChannel> {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClient.class);
    private NettyClient client;

    public ClientHandlerInitializer(NettyClient client) {
        this.client = client;
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());
        pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());
        //25s内没有read操作则触发READER_IDLE事件
        //10s内既没有read又没有write操作则触发ALL_IDLE事件
        pipeline.addLast(new IdleStateHandler(25, 0, 10));
        pipeline.addLast(new ClientHeartbeatHandler());
        pipeline.addLast(new SimpleClientHandler(client));
    }
}

com.bruce.netty.rpc.client.ClientHeartbeatHandler

public class ClientHeartbeatHandler extends ChannelInboundHandlerAdapter {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(ClientHeartbeatHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg.equals("pong")) {
            log.info("收到心跳回复");
        } else {
            super.channelRead(ctx, msg);
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            //该事件需要配合 io.netty.handler.timeout.IdleStateHandler使用
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            if (idleStateEvent.state() == IdleState.ALL_IDLE) {
                //向服务端发送心跳检测
                ctx.writeAndFlush("ping");
                log.info("发送心跳数据");
            } else if (idleStateEvent.state() == IdleState.READER_IDLE) {
                //超过指定时间没有读事件,关闭连接
                log.info("超过心跳时间,关闭和服务端的连接:{}", ctx.channel().remoteAddress());
                ctx.channel().close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

先启动server端,再启动client端,待连接成功之后kill掉 server端进程。

Netty客户端断线重连实现及问题思考

 

通过客户端日志可以看出,先是执行了exceptionCaught方法然后执行了channelInactive方法,但是这两个方法中都调用了reconnection方法,导致同时执行了两次重连。

为什么执行了exceptionCaught方法又执行了channelInactive方法呢?

我们可以在exceptionCaught和channelInactive方法添加断点一步步查看源码

Netty客户端断线重连实现及问题思考

 

当NioEventLoop执行select操作之后,处理相应的SelectionKey,发生异常后,会调用AbstractNioByteChannel.NioByteUnsafe#handleReadException方法进行处理,并触发pipeline.fireExceptionCaught(cause),最终调用到用户handler的fireExceptionCaught方法。

private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
  RecvByteBufAllocator.Handle allocHandle) {
 if (byteBuf != null) {
  if (byteBuf.isReadable()) {
   readPending = false;
   pipeline.fireChannelRead(byteBuf);
  } else {
   byteBuf.release();
  }
 }
 allocHandle.readComplete();
 pipeline.fireChannelReadComplete();
 pipeline.fireExceptionCaught(cause);

 // If oom will close the read event, release connection.
 // See https://github.com/netty/netty/issues/10434
 if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
  closeonRead(pipeline);
 }
}

该方法最后会判断异常类型,执行close连接的方法。在连接断线的场景中,这里即为java.io.IOException,所以执行了close方法,当debug到AbstractChannel.AbstractUnsafe#close(ChannelPromise, Throwable, ClosedChannelException, notify)方法中会发现最后又调用了AbstractUnsafe#fireChannelInactiveAndDeregister方法,继续debug最后则会执行自定义的fireChannelInactive方法。

到这里可以总结一个知识点:netty中当执行到handler地fireExceptionCaught方法时,可能会继续触发到fireChannelInactive,也可能不会触发fireChannelInactive。

除了netty根据异常类型判断是否执行close方法外,其实开发人员也可以自己通过ctx或者channel去调用close方法,代码如下:

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    if (cause instanceof IOException) {
        log.warn("exceptionCaught:客户端[{}]和远程断开连接", ctx.channel().localAddress());
    } else {
        log.error(cause);
    }
    //ctx.close();
    ctx.channel().close();
}

但这种显示调用close方法,是否一定会触发调用fireChannelInactive呢?

如果是,那么只需要在exceptionCaught中调用close方法,fireChannelInactive中做重连的逻辑即可!!

在笔者通过日志观察到,在exceptionCaught中调用close方法每次都会调用fireChannelInactive方法。但是查看源码,笔者认为这是不一定的,因为在AbstractChannel.AbstractUnsafe#close(ChannelPromise,Throwable, ClosedChannelException, notify)中会调用io.netty.channel.Channel#isActive进行判断,只有为true,才会执行fireChannelInactive方法。

//io.netty.channel.socket.nio.NioSocketChannel#isActive
@Override
public boolean isActive() {
    SocketChannel ch = javaChannel();
    return ch.isOpen() && ch.isConnected();
}

如何解决同时执行两次问题呢?

在netty初始化时,我们都会添加一系列的handler处理器,这些handler实际上会在netty创建Channel对象(NioSocketChannel)时,被封装在DefaultChannelPipeline中,而DefaultChannelPipeline实际上是一个双向链表,头节点为TailContext,尾节点为TailContext,而中间的节点则是我们添加的一个个handler(被封装成DefaultChannelHandlerContext),当执行Pipeline上的方法时,会从链表上遍历handler执行,因此当执行exceptionCaught方法时,我们只需要提前移除链表上自定义的Handler则无法执行fireChannelInactive方法。

Netty客户端断线重连实现及问题思考

 

最后实现代码如下:

public class SimpleClientHandler extends ChannelInboundHandlerAdapter {

    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.warn("channelInactive:{}", ctx.channel().localAddress());
  ctx.pipeline().remove(this);
        ctx.channel().close();
        reconnection(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof IOException) {
            log.warn("exceptionCaught:客户端[{}]和远程断开连接", ctx.channel().localAddress());
        } else {
            log.error(cause);
        }
        ctx.pipeline().remove(this);
        //ctx.close();
        ctx.channel().close();
        reconnection(ctx);
    }
}

执行效果如下,可以看到当发生异常时,只是执行了exceptionCaught方法,并且通过channel关闭了上一次连接资源,也没有执行当前handler的fireChannelInactive方法。

Netty客户端断线重连实现及问题思考

 

如何实现断线后重新连接 ?

通过上面分析,我们已经知道在什么方法中实现自己的重连逻辑,但是具体该怎么实现呢,怀着好奇的心态搜索了一下各大码友的实现方案。大多做法是通过ctx.channel().eventLoop().schedule添加一个定时任务调用客户端的连接方法。笔者也参考该方式实现代码如下:

private void reconnection(ChannelHandlerContext ctx) {
 log.info("5s之后重新建立连接");
 ctx.channel().eventLoop().schedule(new Runnable() {
  @Override
  public void run() {
   boolean connect = client.connect();
   if (connect) {
    log.info("重新连接成功");
   } else {
    reconnection(ctx);
   }
  }
 }, 5, TimeUnit.SECONDS);
}

测试:先启动server端,再启动client端,待连接成功之后kill掉 server端进程。客户端如期定时执行重连,但也就去茶水间倒杯水的时间,回来后发现了如下异常。

......省略14条相同的重试日志
[2021-01-17 18:46:45.032] INFO   [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.SimpleClientHandler] : 5s之后重新建立连接
[2021-01-17 18:46:48.032] INFO   [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : 尝试连接到服务端: 127.0.0.1:8088
[2021-01-17 18:46:50.038] ERROR   [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : 连接异常:Connection refused: no further information: /127.0.0.1:8088
[2021-01-17 18:46:50.038] INFO   [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.SimpleClientHandler] : 5s之后重新建立连接
[2021-01-17 18:46:53.040] INFO   [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : 尝试连接到服务端: 127.0.0.1:8088
[2021-01-17 18:46:53.048] ERROR   [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : connect error:
io.netty.util.concurrent.BlockingOperationException: DefaultChannelPromise@10122121(incomplete)
 at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:462)
 at io.netty.channel.DefaultChannelPromise.checkDeadLock(DefaultChannelPromise.java:159)
 at io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:667)
 at io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:305)
 at com.bruce.netty.rpc.client.NettyClient.connect(NettyClient.java:49)
 at com.bruce.netty.rpc.client.SimpleClientHandler$1.run(SimpleClientHandler.java:65)
 at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
 at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
 at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:164)
 at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
 at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
 at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)

根据异常栈,可以发现是com.bruce.netty.rpc.client.NettyClient#connect方法中调用了等待方法

boolean notTimeout = channelFuture.awaitUninterruptibly(20, TimeUnit.SECONDS);

而该方法内部会进行检测,是否在io线程上执行了同步等待,这会导致抛出异常BlockingOperationException。

@Override
protected void checkDeadLock() {
    if (channel().isRegistered()) {
        super.checkDeadLock();
    }
}
protected void checkDeadLock() {
    EventExecutor e = executor();
    if (e != null && e.inEventLoop()) {
        throw new BlockingOperationException(toString());
    }
}

奇怪的是为什么不是每次尝试重连都抛出该异常,而是每隔16次抛出一次呢?

这让我联想到自己的笔记本是8核处理器,而netty默认线程池是2 * c,就是16条线程,这之间似乎有些关联。

实际上在调用ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8088);,netty首先会创建一个io.netty.channel.Channel(示例中是NioSocketChannel),然后通过io.netty.util.concurrent.EventExecutorChooserFactory.EventExecutorChooser依次选择一个NioEventLoop,将Channel绑定到NioEventLoop上。

Netty客户端断线重连实现及问题思考

 

io.netty.util.concurrent.SingleThreadEventExecutor#inEventLoop

//Return true if the given Thread is executed in the event loop, false otherwise.
@Override
public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}

重连的方法是在一个NioEventLoop(也就是io线程)上被调用,第1次重连实际上是选择了第2个NioEventLoop,第2次重连实际上是选择了第3个NioEventLoop,以此类推,当一轮选择过后,重新选到第一个NioEventLoop时,boolean inEventLoop()返回true,则抛出了BlockingOperationException。

方案1

不要在netty的io线程上执行同步连接,使用单独的线程池定时执行重试,该线程还可以执行自己重连的业务逻辑操作,不阻塞io线程。(如果不需要业务操作之后销毁线程池)。

com.bruce.netty.rpc.client.SimpleClientHandler 修改reconnection方法

private static ScheduledExecutorService SCHEDULED_EXECUTOR;

private void initScheduledExecutor() {
 if (SCHEDULED_EXECUTOR == null) {
  synchronized (SimpleClientHandler.class) {
   if (SCHEDULED_EXECUTOR == null) {
    SCHEDULED_EXECUTOR = Executors.newSingleThreadScheduledExecutor(r -> {
     Thread t = new Thread(r, "Client-Reconnect-1");
     t.setDaemon(true);
     return t;
    });
   }
  }
 }
}

private void reconnection(ChannelHandlerContext ctx) {
 log.info("5s之后重新建立连接");
 initScheduledExecutor();

 SCHEDULED_EXECUTOR.schedule(() -> {
  boolean connect = client.connect();
  if (connect) {
   //连接成功,关闭线程池
   SCHEDULED_EXECUTOR.shutdown();
   log.info("重新连接成功");
  } else {
   reconnection(ctx);
  }
 }, 3, TimeUnit.SECONDS);
}

方案2

可以在io线程上使用异步重连:

com.bruce.netty.rpc.client.NettyClient添加方法connectAsync方法,两者的区别在于connectAsync方法中没有调用channelFuture的同步等待方法。而是改成监听器(ChannelFutureListener)的方式,实际上这个监听器是运行在io线程上。

 public void connectAsync() {
    log.info("尝试连接到服务端: 127.0.0.1:8088");
    ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8088);
    channelFuture.addListener((ChannelFutureListener) future -> {
        Throwable cause = future.cause();
        if (cause != null) {
            exceptionHandler(cause);
            log.info("等待下一次重连");
            channelFuture.channel().eventLoop().schedule(this::connectAsync, 5, TimeUnit.SECONDS);
        } else {
            clientChannel = channelFuture.channel();
            if (clientChannel != null && clientChannel.isActive()) {
                log.info("Netty client started !!! {} connect to server", clientChannel.localAddress());
            }
        }
    });
}

com.bruce.netty.rpc.client.SimpleClientHandler

public class SimpleClientHandler extends ChannelInboundHandlerAdapter {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);
    private NettyClient client;

    public SimpleClientHandler(NettyClient client) {
        this.client = client;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("client receive:{}", msg);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.warn("channelInactive:{}", ctx.channel().localAddress());
        ctx.pipeline().remove(this);
        ctx.channel().close();
        reconnectionAsync(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof IOException) {
            log.warn("exceptionCaught:客户端[{}]和远程断开连接", ctx.channel().localAddress());
        } else {
            log.error(cause);
        }
        ctx.pipeline().remove(this);
        ctx.close();
        reconnectionAsync(ctx);
    }

    private void reconnectionAsync(ChannelHandlerContext ctx) {
        log.info("5s之后重新建立连接");
        ctx.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                client.connectAsync();
            }
        }, 5, TimeUnit.SECONDS);
    }
}

netty客户端线程给多大比较合理 ?

netty中一个NioEventLoopGroup默认创建的线程数是cpu核心数 * 2 ,这些线程都是用于io操作,那么对于客户端应用程序来说真的需要这么多io线程么?

通过上面分析BlockingOperationException异常时我们分析到,实际上netty在创建一个Channel对象后只会从NioEventLoopGroup中选择一个NioEventLoop来绑定,只有创建多个Channel才会依次选择下一个NioEventLoop,也就是说一个Channel只会对应一个NioEventLoop,而NioEventLoop可以绑定多个Channel。

1.对于客户端来说,如果只是连接的一个server节点,那么只要设置1条线程即可。即使出现了断线重连,在连接断开之后,之前的Channel会从NioEventLoop移除。重连之后,仍然只会在仅有的一个NioEventLoop注册一个新的Channel。

2.如果客户端同时如下方式多次调用io.netty.bootstrap.Bootstrap#connect(String inetHost, int inetPort)连接多个Server节点,那么线程可以设置大一点,但不要超过2*c,而且只要出现断线重连,同样不能保证每个NioEventLoop都会绑定一个客户端Channel。

 public boolean connect() {
      try {
          ChannelFuture channelFuture1 = bootstrap.connect("127.0.0.1", 8088);
          ChannelFuture channelFuture2 = bootstrap.connect("127.0.0.1", 8088);
          ChannelFuture channelFuture3 = bootstrap.connect("127.0.0.1", 8088);
      } catch (Exception e) {
          exceptionHandler(e);
      }
      clientChannel.close();
      return false;
  }

3.如果netty客户端线程数设置大于1有什么影响么?

明显的异常肯定是不会有的,但是造成资源浪费,首先会创建多个NioEventLoop对象,但NioEventLoop是处于非运行状态。一旦出现断线重连,那么重新连接时,下一个NioEventLoop则会被选中,并创建/启动线程一直处于runnable状态。而上一个NioEventLoop也是一直处于runnable状态,由于上一个Channel已经被close,所以会造成每次select结果都是空的,没有意义的空轮询。

如下则是netty客户端使用默认线程数,4次断线重连后一共创建的5条NioEventLoop线程,但是实际上只有第5条线程在执行读写操作。

Netty客户端断线重连实现及问题思考

 


Netty客户端断线重连实现及问题思考

 

4.如果客户端存在耗时的业务逻辑,应该单独使用业务线程池,避免在netty的io线程中执行耗时逻辑处理。

总结

本篇主要讲解了,netty断线重连的两种实现方案,以及实现过程中遇到的异常问题,通过分析问题,让大家了解netty的实现细节。

来源:blog.csdn.net/u013202238/article/details/111680798



Tags:Netty   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
前言在实现TCP长连接功能中,客户端断线重连是一个很常见的问题,当我们使用netty实现断线重连时,是否考虑过如下几个问题: 如何监听到客户端和服务端连接断开 ? 如何实现断线后重...【详细内容】
2021-12-24  Tags: Netty  点击:(12)  评论:(0)  加入收藏
这篇文章对于排查使用了 netty 引发的堆外内存泄露问题,有一定的通用性,希望对你有所启发 背景最近在做一个基于 websocket 的长连中间件,服务端使用实现了 socket.io 协议(基于...【详细内容】
2021-12-16  Tags: Netty  点击:(22)  评论:(0)  加入收藏
简介在之前的文章中,我们提到了在netty的客户端通过使用Http2FrameCodec和Http2MultiplexHandler可以支持多路复用,也就是说在一个连接的channel基础上创建多个子channel,通过...【详细内容】
2021-12-14  Tags: Netty  点击:(6)  评论:(0)  加入收藏
本系列为 Netty 学习笔记,本篇介绍总结Java NIO 网络编程。Netty 作为一个异步的、事件驱动的网络应用程序框架,也是基于NIO的客户、服务器端的编程框架。其对 Java NIO 底层...【详细内容】
2021-12-07  Tags: Netty  点击:(16)  评论:(0)  加入收藏
想要阅读Netty源码的同学,建议从GitHub上把源码拉下来,方便写注释、Debug调试哦~点我去下载! 先来看一个简单的Echo服务端程序,监听本地的9999端口,有客户端接入时控制台输出一句...【详细内容】
2021-10-22  Tags: Netty  点击:(43)  评论:(0)  加入收藏
相信很多人知道石中剑这个典故,在此典故中,天命注定的亚瑟很容易的就拔出了这把石中剑,但是由于资历不被其他人认可,所以他颇费了一番周折才成为了真正意义上的英格兰全境之王,亚...【详细内容】
2021-07-22  Tags: Netty  点击:(102)  评论:(0)  加入收藏
家纯 阿里技术 一 什么是 Netty? 能做什么? Netty 是一个致力于创建高性能网络应用程序的成熟的 IO 框架。 相比较与直接使用底层的 Java IO API,你不需要先成为网络专家就...【详细内容】
2021-06-23  Tags: Netty  点击:(136)  评论:(0)  加入收藏
客户端代码package com.huanfeng.test;import java.io.BufferedReader;import java.io.InputStreamReader;import java.net.URL;import java.net.URLConnection;import java...【详细内容】
2021-05-26  Tags: Netty  点击:(145)  评论:(0)  加入收藏
前言:作为一个刚踏入职场的实习生,我很幸运参加了某个政府项目,并且在项目中负责一个核心模块功能的开发,而不是从头到尾对数据库的crud。虽然我一直心里抱怨我的工作范围根本...【详细内容】
2021-05-24  Tags: Netty  点击:(230)  评论:(0)  加入收藏
要求1.群聊系统可以实现服务器端和客户端之间的数据简单通讯(非阻塞)2.通过系统可以实现多人群聊3.服务器端:可以监控用户上线,离线,并实现消息转发功能4.客户端:通过channel可...【详细内容】
2021-05-24  Tags: Netty  点击:(147)  评论:(0)  加入收藏
▌简易百科推荐
阿里云镜像源地址及安装网站地址https://developer.aliyun.com/mirror/centos?spm=a2c6h.13651102.0.0.3e221b111kK44P更新源之前把之前的国外的镜像先备份一下 切换到yumcd...【详细内容】
2021-12-27  干程序那些事    Tags:CentOS7镜像   点击:(1)  评论:(0)  加入收藏
前言在实现TCP长连接功能中,客户端断线重连是一个很常见的问题,当我们使用netty实现断线重连时,是否考虑过如下几个问题: 如何监听到客户端和服务端连接断开 ? 如何实现断线后重...【详细内容】
2021-12-24  程序猿阿嘴  CSDN  Tags:Netty   点击:(12)  评论:(0)  加入收藏
一. 配置yum源在目录 /etc/yum.repos.d/ 下新建文件 google-chrome.repovim /etc/yum.repos.d/google-chrome.repo按i进入编辑模式写入如下内容:[google-chrome]name=googl...【详细内容】
2021-12-23  有云转晴    Tags:chrome   点击:(7)  评论:(0)  加入收藏
一. HTTP gzip压缩,概述 request header中声明Accept-Encoding : gzip,告知服务器客户端接受gzip的数据 response body,同时加入以下header:Content-Encoding: gzip:表明bo...【详细内容】
2021-12-22  java乐园    Tags:gzip压缩   点击:(8)  评论:(0)  加入收藏
yum -y install gcc automake autoconf libtool makeadduser testpasswd testmkdir /tmp/exploitln -s /usr/bin/ping /tmp/exploit/targetexec 3< /tmp/exploit/targetls -...【详细内容】
2021-12-22  SofM    Tags:Centos7   点击:(7)  评论:(0)  加入收藏
Windows操作系统和Linux操作系统有何区别?Windows操作系统:需支付版权费用,(华为云已购买正版版权,在华为云购买云服务器的用户安装系统时无需额外付费),界面化的操作系统对用户使...【详细内容】
2021-12-21  卷毛琴姨    Tags:云服务器   点击:(6)  评论:(0)  加入收藏
参考资料:Hive3.1.2安装指南_厦大数据库实验室博客Hive学习(一) 安装 环境:CentOS 7 + Hadoop3.2 + Hive3.1 - 一个人、一座城 - 博客园1.安装hive1.1下载地址hive镜像路径 ht...【详细内容】
2021-12-20  zebra-08    Tags:Hive   点击:(9)  评论:(0)  加入收藏
以下是服务器安全加固的步骤,本文以腾讯云的CentOS7.7版本为例来介绍,如果你使用的是秘钥登录服务器1-5步骤可以跳过。1、设置复杂密码服务器设置大写、小写、特殊字符、数字...【详细内容】
2021-12-20  网安人    Tags:服务器   点击:(7)  评论:(0)  加入收藏
项目中,遇到了一个问题,就是PDF等文档不能够在线预览,预览时会报错。错误描述浏览器的console中,显示如下错误:nginx代理服务报Mixed Content: The page at ******** was loaded...【详细内容】
2021-12-17  mdong    Tags:Nginx   点击:(7)  评论:(0)  加入收藏
转自: https://kermsite.com/p/wt-ssh/由于格式问题,部分链接、表格可能会失效,若失效请访问原文密码登录 以及 通过密钥实现免密码登录Dec 15, 2021阅读时长: 6 分钟简介Windo...【详细内容】
2021-12-17  LaLiLi    Tags:SSH连接   点击:(16)  评论:(0)  加入收藏
最新更新
栏目热门
栏目头条