您当前的位置:首页 > 电脑百科 > 网络技术 > 网络技术

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

时间:2021-08-11 09:27:58  来源:熬夜码农  作者:

一、开篇

经过上次文章的铺垫,相信大家对 JAVA 的 NIO 有了一些感性的认识,也初步了解了它的 API 了,可以开始去阅读 Kafka Producer 端的发送消息的部分了。

突然想感叹一下,阅读 Kafka 这个全世界著名的开源项目,多多少少会让人赏心悦目

二、发送消息的八个主流程

先大致扫一眼,发送消息的八个主流程,然后再逐个击破。

发送消息的主流程主要是在 Sender 方法里的,Sender 是一个后台线程,在构造 Producer 的时候,就已经被启动在后台运行了。所以我们主要看它的 run 方法。

run 方法是一个 while 循环,我们看里面的 run 方法。(当前位置:Sender 类)

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

步骤一:获取集群的元数据。(当前位置:Sender 类)

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

在上一篇文章可以知道,我们已经在 KafkaProducer 类的 doSend 方法中,完成了元数据的拉取,所以这里是可以获取到元数据的了。

步骤二:判断哪些 partition 有消息可以发送。(当前位置:Sender 类)

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

步骤三:标识还没有拉取到元数据的 topic,这些 topic 需要再次拉取一次元数据。(当前位置:Sender 类)

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

这个是一些容错

步骤四:检查与要发送消息的主机的网络连接是否建立好了(当前类:Sender 类)

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

步骤五:把发往同一台机器的不同批次的消息合并成一个请求

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

步骤六:处理超时的批次

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

步骤七:创建请求

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

步骤八:真正的发送消息出去的网络请求,包括:发送请求,接收和处理响应,拉取元数据等

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

三、消息可以发送出去的条件

(1)首先我们来到这个 ready 方法里面(当前位置:RecordAccumulator)

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

(2)来看这一行:

boolean exhausted = this.free.queued() > 0;

free 是指 BufferPool,queued 方法:

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

waiters 里面是 Condition,表示是否有等待释放内存的线程,如果有,那么就是内存不足的意思。

也就是说,内存不足,exhausted 为 true,否则 为 false。

(3)遍历所有的分区和批次

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

拿出一个批次出来,下面开始判断是否可发送的条件:

(4)第一次发送为 false;下次重试时间到了,false;重试时间没到,true。

boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;

batch.attempts :表示是否尝试过了

batch.lastAttemptMs :表示分区的上次尝试时间,初始值为当前时间

retryBackOffMs :表示重试的时间间隔,默认为 100 ms

nowMs:表示当前时间

那么这句是什么意思?

  • 如果消息是第一次发送,那么这个 backingOff 就是 false;
  • 如果消息第一次发送失败,进入重试,并且还没到下次重试的时间,这个 backingOff 就是 true,如果到了重试的时间,那么 backingOff 就是 false。

这句话可能不好理解,可以假设,上次重试时间点是 10:00:00.000,重试的时间间隔是 100ms,下次重试时间是 10:00:00.100,而当前时间是 10:00:00.020,即还没到下次重试的时间。

那么 batch.lastAttemptMs + retryBackoffMs > nowMs 为 true,即还没到下次重试时间。

(5)计算出已经等待的时间

long waitedTimeMs = nowMs - batch.lastAttemptMs;

nowMs:表示当前时间

batch.lastAttemptMs:上次重试时间

waitedTimeMs:已经等待的时间

(6)等待的时间

long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;

retryBackoffMs :表示重试的时间间隔,默认是 100 ms

lingerMs:这个值默认是 0,即来一条发送一条。所以在生产上,一定要配置这个值,充分利用 batch 来缓存批次,避免过多和服务器的通信。

如果是第一次发送,backingOff 为 false,那么 timeToWaitMs 为 lingerMs。

(7)还需要等待多久

long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);

timeToWaitMs:一共需要等待的时间

waitedTimeMs:已经等待的时间

timeLeftMs:还需要等待的时间

(8)是否有批次满了

boolean full = deque.size() > 1 || batch.records.isFull();

如果队列里的批次数量大于 1,则表示已经有批次已经满了。

如果批次数量为 1,但是这个批次的消息已经满了

(9)是否超时,即已经等待的时长,是否大于一共需要等待的时长

boolean expired = waitedTimeMs >= timeToWaitMs;

(10)最后是发送条件,下面的五个条件是或的关系,任意一个满足,都可以发送

boolean sendable = full || expired || exhausted || closed || flushInProgress();
  • 如果批次已经满了
  • 等待的时间到了
  • 内存满了
  • 客户端关闭,但仍然有消息没发送

(11)如果达到了发送消息的条件,并且重试的时间到了(或者是第一次发送)

则把当前消息所在的分区的 Leader Partition 对应的主机,加到 readyNodes 数据结构中来

if (sendable && !backingOff) {
    readyNodes.add(leader);
}

至此,已经找到了需要发送消息的主机,那么接下来就是建立到这台主机的连接。

四、Kafka Producer 对于 Java NIO 的封装

到建立网络连接的时候,看到这段代码:

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

可以看到具体的实现是在 NetwordClient 里面

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 


Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

第一个条件就是发送消息不能是在更新元数据的时候;

第二个条件点进去:

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

发现这边有个核心的对象,selector,它是 NetworkClient 里的一个属性。(NetworkClient 是 Kafka 网络连接的一个很重要的对象!):

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

我们再点进去,找它的实现类,Selector:

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

可以看到有两个核心属性,第一个 nIOSelector 就是对于 Java 的 Nio 的封装。

第二个是一个 Map,Map 的 key 是 broker 的编号,value 是 KafkaChannel,KafkaChannel 可以理解为是 SocketChannel。

好,然后再继续看一下 KafkaChannel:

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 


Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

最终,如下图所示:

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

五、检查并建立网络连接

我们从第四步的代码开始看:

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

第一个条件,表示是否建立好了连接,如果建立好了,会在 nodeState 的结构中缓存起来的。

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

第二个条件:通道是否准备好了:

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

第三个条件:

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

max.in.flight.requests.per.connection

这个参数,是在初始化 NetworkClient 对象的时候,传递进来的,默认值是 5.

表示最多默认有多少次请求没有得到服务端的响应。

这里第三个条件,就是说,是否小于 5 个请求发送出去了,没有得到响应。

但现在我们是第一次判断与主机的网络是否连接好,网络肯定是没有建立好的,所以这个方法会返回 false。

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 


Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

然后就开始初始化网络连接了:

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 


Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

这里连接的代码和平时写的 Java NIO 的代码是一样的

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

socket.setTcpNoDelay(true);

注意,他这里有一句这个代码,这个默认值是 false,意思是它会把网络中的一些小的数据包收集起来,组合成一个大的数据包然后再发送出去。

它认为如果网络中有大量小的数据包,会影响网络拥塞。

所以这里,一定是要把它设置为 true 的。因为有时候,数据包就是比较小,这里不帮我们发送,明细是不合适的。

这里,建立网络连接,最终往 selector 上绑定了一个 OP_CONNECT 事件,和我们平时写的代码是一样的。

最终这个方法返回了 false:

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

那么回到主流程上,返回 false 之后,这些主机都会被移除。

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

然后是步骤七,创建一个请求。

最后执行到这里:

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

点进去看,核心代码在这里:

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

继续往里面看,核心代码在这里:

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

点进去:

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

再点进去,(当前位置:PlaintextTransportLayer)

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

这里,如果已经连接网络了,则移除 OP_CONNECT 事件,并且增加 OP_READ 事件,这样的话,就可以读取到 服务端发送回来的响应了。

到这里位置,第一遍就建立好了网络连接。

六、准备发送消息

刚刚我们第一遍执行,建立好了网络连接,现在开始第二次执行

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

这里网络已经准备好了,所以 if 的方法不执行,节点也不会被移除了

这个时候是可以合并批次的,因为这个 nodes 不为空

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

然后创建一个请求,并且发送这个请求:

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

点进去:

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

在点进去 send 方法里,这里有一个很重要的操作,绑定了 OP_WRITE 事件

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 


Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

绑定了 OP_WRITE 事件,才能把数据发送出去!!

现在我们再退回到 这个方法:

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

点到 poll 方法里来:

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

然后这里会从 selector 上拿到 SelectionKey,如果是写事件:

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 


Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

点到 send 方法里来:

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

把消息写出去,并且移除 OP_WRITE 事件。

到此为止,消息终于发送出去了。

七、获取服务端的响应,拆包和粘包处理

我们可以想到,客户端发送出去的肯定是多个请求,那么服务端返回的也是多个请求,那客户端如何从响应中解析出这多个请求呢?这就是拆包处理。

比如,服务端返回的响应是这样的:

响应成功响应失败

我们要拆分成:

响应成功

响应失败

但是,由于网络原因,返回的可能是这样的

响应成

功响应失败

也就是分两次发回给客户端

客户端该如何处理?

Kafka 是在响应消息的前面加上了每个响应的长度编码

40响应成功30响应失败

那这个长度会发生拆包吗?也很简单,申请一定长度的字节,比如2个字节来存长度,把这个2字节的长度满了,就是长度了。

等到读满了2字节,就转换成 int 类型,再申请这个 int 类型长度的内存,再去接收这么多长度的字节,一直到读满为止。

然后来看看 Kafka 的代码如何处理的,看到 poll 方法里处理 OP_READ 的方法的部分

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 


Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 


Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

最终,拆包和粘包的代码:

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

size.hasRemaining, size 是一个 4 字节的 ByteBuffer

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

然后开始读4个字节的数据

int bytesRead = channel.read(size);

读取完了之后,再看有没有剩余空间了,如果读满了,那么把这个4字节的数变成一个 int 值,并且继续分配这个 int 值大小的 ByteBuffer

if (!size.hasRemaining()) {
    size.rewind();
    int receiveSize = size.getInt();
    if (receiveSize < 0)
        throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
    if (maxSize != UNLIMITED && receiveSize > maxSize)
        throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");

    this.buffer = ByteBuffer.allocate(receiveSize);
}

然后一直读取内容:

if (buffer != null) {
    int bytesRead = channel.read(buffer);
    if (bytesRead < 0)
        throw new EOFException();
    read += bytesRead;
}

然后再来看:

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 


Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

 

这个 complete 方法,是判断 size 已经读满了,并且 内容也已经读满了,那么就表示读取到了一个完整的响应了。

那么这就是完整的拆包和粘包的处理了,大概也就是20行代码,也是很精彩的。

八、总结

本次我们完整的看了 Sender 线程发送消息的完整过程,里面包括了 Kafka 如何封装 Java NIO 代码,并且合理的建立连接,绑定 OP_READ,OP_WRITE 事件,并且读取服务端的响应,代码质量还是非常高的,看起来也是赏心悦目。

希望大家对着源码再好好看一遍,一定会有收货的。



Tags:Kafka   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
前言Kafka 中有很多延时操作,比如对于耗时的网络请求(比如 Produce 是等待 ISR 副本复制成功)会被封装成 DelayOperation 进行延迟处理操作,防止阻塞 Kafka请求处理线程。Kafka...【详细内容】
2021-12-27  Tags: Kafka  点击:(1)  评论:(0)  加入收藏
一、开篇经过上次文章的铺垫,相信大家对 java 的 NIO 有了一些感性的认识,也初步了解了它的 API 了,可以开始去阅读 Kafka Producer 端的发送消息的部分了。突然想感叹一下,阅读...【详细内容】
2021-08-11  Tags: Kafka  点击:(65)  评论:(0)  加入收藏
Kafka集群安装、配置和启动Kafka需要依赖zookeeper,并且自身集成了zookeeper,zookeeper至少需要3个节点保证集群高可用,下面是在单机linux下创建kafka3个节点伪集群模式。1、下...【详细内容】
2020-11-10  Tags: Kafka  点击:(111)  评论:(0)  加入收藏
以前我们讨论的消费组,都是 group 的形式,group 可以自动地帮助消费者分配分区,且在发生异常时,还能自定地进行重平衡(Rebalance)。正常来说,group 帮助用户实现自动监听分区消费,但...【详细内容】
2020-09-19  Tags: Kafka  点击:(105)  评论:(0)  加入收藏
MQTT与Kafka完全不同。MQTT是由OASIS技术委员会的成员(大多数是IBM和Microsoft的高级工程师)开发的协议和技术标准。Kafka是LinkedIn首次实现的开源流平台。2011年开放源码...【详细内容】
2020-07-17  Tags: Kafka  点击:(645)  评论:(0)  加入收藏
如上图所示,kafaka集群的 broker,和 Consumer 都需要连接 Zookeeper。 Producer 直接连接 Broker。Producer 把数据上传到 Broker,Producer可以指定数据有几个分区、几个备份...【详细内容】
2020-06-15  Tags: Kafka  点击:(124)  评论:(0)  加入收藏
前提近段时间,业务系统架构基本完备,数据层面的建设比较薄弱,因为笔者目前工作重心在于搭建一个小型的数据平台。优先级比较高的一个任务就是需要近实时同步业务系统的数据(包括...【详细内容】
2020-04-02  Tags: Kafka  点击:(162)  评论:(0)  加入收藏
零拷贝,从字面意思理解就是数据不需要来回的拷贝,大大提升了系统的性能。我们也经常在 Java NIO,Netty,Kafka,RocketMQ 等框架中听到零拷贝,它经常作为其提升性能的一大亮点下面从...【详细内容】
2020-03-27  Tags: Kafka  点击:(90)  评论:(0)  加入收藏
▌简易百科推荐
写一个shell获取本机ip地址、网关地址以及dns信息。经常会遇到取本机ip、网关、dns地址,windows一个命令ipconfig /all全部获取到,但linux系统却并非如此。linux系统都自带ifc...【详细内容】
2021-12-27  K佬食古    Tags:shell   点击:(0)  评论:(0)  加入收藏
步骤1、配置 /etc/sysconfig/network-scripts/ifcfg-eth0 里的文件。it动力的CentOS下的ifcfg-eth0的配置详情:[root@localhost ~]# vim /etc/sysconfig/network-scripts/ifc...【详细内容】
2021-12-24  忆梦如风    Tags:网卡   点击:(9)  评论:(0)  加入收藏
1、查找当前目录下所有以.tar结尾的文件然后移动到指定目录find . -name “*.tar” -execmv {}./backup/ ;注解:find &ndash;name 主要用于查找某个文件名字,-exec 、xargs可...【详细内容】
2021-12-17  郭主任    Tags:运维   点击:(18)  评论:(0)  加入收藏
对于经常上网的朋友来说,除了手机购物上网,pc端玩网页游戏还是很多小伙伴首选的,但是有时候明明宽带链接上了,打开浏览器却出现上不了网的现象,下面小编要来跟大家说说电脑有网络...【详细内容】
2021-12-16  小白系统    Tags:网页无法打开   点击:(28)  评论:(0)  加入收藏
在访问像github、gitlab这样的外国网站时,很有可能会出现页面加载不出来或找不到页面的错误。这时候有的朋友就会以为是网络的问题,于是把Wifi断掉连上自己手机的热点,结果却还...【详细内容】
2021-12-15  启施技术IT狼叔    Tags:外网   点击:(14)  评论:(0)  加入收藏
网络地址来源:获取公网IP地址 https://ipip.yy.com/get_ip_info.phphttp://pv.sohu.com/cityjson?ie=utf-8http://www.ip168.com/json.do?view=myipaddress...【详细内容】
2021-12-15  韦廷华12    Tags:外网ip   点击:(14)  评论:(0)  加入收藏
准备好软件IPOP、用ENSP模拟一下华为交换机 启动交换机 <Huawei>sysEnter system view, return user view with Ctrl+Z.[Huawei]sysname FTPClient[FTPClient]interface vla...【详细内容】
2021-12-15  思源Edward    Tags:交换机   点击:(22)  评论:(0)  加入收藏
我们经常用到netstat命令查看主机连接状况,包括连接ip、端口、状态等,今天就练习下shell分析netsat结果。描述假设netstat命令运行的结果我们存储在nowcoder.txt里,格式如下:Pro...【详细内容】
2021-12-14  K佬食古    Tags:netstat   点击:(19)  评论:(0)  加入收藏
什么是滑动窗口?窗口是操作系统开辟的一块缓存空间,发送方在收到接收方ACK应答之前,必须在缓冲区保留已发送的数据,如果按期收到确认应答,数据就可以从缓冲区移除。什么是滑动窗...【详细内容】
2021-12-14  DifferentJava    Tags:TCP   点击:(28)  评论:(0)  加入收藏
概述日常管理华为路由设备过程中,难为会忘记设备登录密码,那么该如何重置设备登录密码吗?本期文章将全面向各位小伙伴总结分享。重置华为设备登录密码思路先行 采用console登录...【详细内容】
2021-12-10  onme0    Tags:   点击:(26)  评论:(0)  加入收藏
最新更新
栏目热门
栏目头条