您当前的位置:首页 > 电脑百科 > 网络技术 > 网络知识

Reactor网络模型核心思想探秘

时间:2023-12-06 14:56:16  来源:微信公众号  作者:程序员班吉

在网络编程系列文章中,我们实现了一个基于epoll的网络框架,并在此基础上开发了一个简单的HTTP服务,在那个系列文章中我们使用了读、写两个buffer将网络IO和数据的读写进行了分离,它们之间的扭转完全通过epoll事件通知,如果你认真研究过源码,会发现,所有针对网络IO的操作都是由事件触发的。这种基于事件触发的网络模型通常我们叫做Reactor网络模型。

由于网络编程系列文章中代码实现相对比较复杂,不太好讲清楚。所以,我决定单独出几篇文章对那个系列文章进行一些拓展,主要涉及到网络编程思想和性能测试。

这篇文章我们通过实现一个简单的网络框架,来说明Reactor网络模型实现的一般思路,其本质思想和x.NET项目基本上是一样的,只是在代码上做了非常大的精简,理解起来会轻松很多。

首先,我们来看一段代码
#include <sys/socket.h>#include <errno.h>#include <netinet/in.h>#include <stdio.h>#include <string.h>#include <unistd.h>#include <sys/epoll.h>

int mAIn() {    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in servaddr;    memset(&servaddr, 0, sizeof(struct sockaddr_in));
    servaddr.sin_family = AF_INET;    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);    servaddr.sin_port = htons(2048);
    if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) {        perror("bind fail");        return -1;    }
    listen(sockfd, 10);
    printf("sock-fd:%dn", sockfd);
    int epfd = epoll_create(1);
    struct epoll_event ev;    ev.events = EPOLLIN;    ev.data.fd = sockfd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);
    struct epoll_event events[1024] = {0};
    while(1) {        int nready = epoll_wait(epfd, events, 1024, -1);
        int i = 0;        for (i = 0; i < nready; i++) {            int connfd = events[i].data.fd;            if (events[i].events & EPOLLIN && sockfd == connfd) {                struct sockaddr_in clientaddr;                socklen_t len = sizeof(clientaddr);
                int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
                ev.events = EPOLLIN | EPOLLET;                ev.data.fd = clientfd;                epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);
                printf("clientfd: %dn", clientfd);            } else if (events[i].events & EPOLLIN) {
                char buffer[10] = {0};
                int count = recv(connfd, buffer, 10, 0);                if (count == 0) {                    printf("discounnectn");
                    epoll_ctl(epfd, EPOLL_CTL_DEL, connfd, NULL);                    close(i);
                    continue;                }
                send(connfd, buffer, count, 0);                printf("clientfd: %d, count: %d, buffer: %sn", connfd, count, buffer);            }        }    }}

 

熟悉epoll的人应该对上面的代码比较熟悉,这段代码的核心在下面的while主循环,如果是当前Server的Socket说明有新的连接进来,调用accept拿到客户端的fd,将其放在epoll的events中,并注册EPOLLIN事件,一般我们理解为可读事件。

如果不是sockfd,说明是客户端的fd可读,我们将数据读出来再原样发送回去。

上面的代码存在的主要问题在于,套接字的accept和读写操作我们是直接写在主循环里了,这将会让代码的逻辑变得难以琢磨。

对于一个套接字,最直接的操作就是读和写。所以,最容易想到的就是将读和写分离开。为了实现读和写分离我们封装两个回调函数,如下:
int recv_callback(int fd, char *buffer, int size);int send_callback(int fd, char *buffer, int size);
你可以想一下,这两个函数应该怎么写?下面是根据原有的逻辑将读和写封装在了recv_callback和send_callback两个函数中,代码如下:
int recv_callback(int fd, char *buffer, int size) {    int count = recv(fd, buffer, size, 0);
    send_callback(fd, buffer, count, 0);
    return count;}int send_callback(int fd, char *buffer, int size) {    int count = send(fd, buffer, size, 0);
    return count;}
然后,在主循环中就可以这样使用
int main() {
    ...
    while(1) {        int nready = epoll_wait(epfd, events, 1024, -1);
        int i = 0;        for (i = 0; i < nready; i++) {            int connfd = events[i].data.fd;
            if (events[i].events & EPOLLIN && sockfd == connfd) {                ...            } else if (events[i].events & EPOLLIN) {                char buffer[10] = {0};
                int count = recv_callback(fd, buffer, 10);                if (count == 0) {                    printf("disconnect\n");                    epoll_ctl(epfd, EPOLL_CTL_DEL, connfd, NULL);                    clise(i);                    continue;                }            }        }    } }
虽然我们将读和写拆成了两个方法,但读和写并没有分离开,我们在recv_callback中每次收到数据之后调用send_callback将数据原样又发回去,在这里我们希望recv_callback和send_callback各管各的互不干扰,比如像下面这样
int recv_callback(int fd, char *buffer, int size) {    int count = recv(fd, buffer, size, 0);
    return count;}int send_callback(int fd, char *buffer, int size) {    int count = send(fd, buffer, size, 0);
    return count;}

但这样明显也是有问题的,在recv_callback中读完了之后,如何发送数据呢?这里,我们可以想一下,围绕着一个套接字都有哪些部分呢?是不是可以设计出一个类似字典的结构,这个字典的key对应的就是套接字,而value对应的就是围绕套接字相关的各个组件。

我们将recv_callback和send_callback放在了一个conn_channel结构体中,并且设计了两个buffer,一个用来读数据,另一个用来发数据,conn_channel便是这个字典对应的value,代码如下:
#define BUF_LEN   1024
typedef int(*callback)(int fd);
struct conn_channel {    int fd;
    callback recv_call;    callback send_call;
    char wbuf[BUF_LEN];    int wlen;    char rbuf[BUF_LEN];    int rlen;};
其中,fd表示的是当前客户端套接字。然后我们定义一个数组来表示套接字到套接字value的映射关系,代码如下:
struct conn_channel conn_map[1024] = {0};
这样,我们在主循环中,就可以像下面这样,往conn_map中添加对应的套接字了,代码如下:
int main() {    ...
    while(1) {        int nready = epoll_wait(epfd, events, 1024, -1);
        int i = 0;        for (i = 0; i < nready; i++) {            int connfd = events[i].data.fd;
            if (events[i].events & EPOLLIN && sockfd == connfd) {                struct sockaddr_in clientaddr;                socklen_t len = sizeof(clientaddr);
                int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
                ev.events = EPOLLIN;                ev.data.fd = clientaddr;
                epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);
                conn_map[clientfd].fd = clientfd;                conn_map[clientfd].rlen = 0;                conn_map[clientfd].wlen = 0;                conn_map[clientfd].recv_call = recv_callback;                conn_map[clientfd].send_call = send_callback;                memset(conn_map[clientfd].rbuf, 0, BUF_LEN);                memset(conn_map[clientfd].wbuf, 0, BUF_LEN);
                printf("clientfd:%d\n", clientfd);            } else if (events[i].events & EPOLLIN) {                ...            }        }    } }
在上面的代码中,每当accept出来一个客户端的套接字,我们就将它放到conn_map中,设置好读写buffer和回调函数。但如果你细心点会发现,recv_callback、send_callback和conn_channel中的回调函数签名是不一样的。所以,我们要调整一下这两个函数的实现,调整之后代码如下:
int recv_callback(int fd) {    int count = recv(fd, conn_map[fd].rbuf + conn_map[fd].rlen, BUF_LEN - conn_map[fd].rlen, 0);    // do something
    memcpy(conn_map[fd].wbuf, conn_map[fd].rbuf, conn_map[fd].rlen);    conn_map[fd].wlen = conn_map[fd].rlen;    conn_map[fd].rlen = 0;
    return count;}int send_callback(int fd) {    int count = send(fd, conn_map[fd].wbuffer, conn_map[fd].wlen, 0);
    return count;}

因为有了conn_map,所以原来传进来的buffer和size都不需要了,在conn_channel中已经有记录了。所以只需要一个fd参数就可以了。我们在recv_callback中模拟了回复消息,强行将读到的数据写到了wbuffer中。这里补充一下,conn_channel中的rbuffer是用来从套接字中读数据的,wbuffer表示的是将要发送到套接字的数据。

你可以试着把上面的代码跑起来,然后你会发现,并没有按我们的预期执行,send_callback中的send似乎没有起作用。这是因为我们只是将数据从rbuffer写到了wbuffer中,而send_callback并没有机会调用。你可以想一想send_callback放在哪里调用比较合适呢?

在上面的例子中,显然放在主循环中执行比较合适,在epoll中,EPOLLOUT表示可写事件,我们可以利用这个事件。在recv_callback执行完之后我们注册一个EPOLLOUT事件,然后在主循环中我们去监听EPOLLOUT事件。这样,当recv_callback将rbuffer的数据复制到wbuffer中之后,send_callback通过EPOLLOUT事件就可以在主循环中得以执行。

为了实现上面的效果我们要修改两个地方,一个是recv_callback中我们要注册一下EPOLLOUT事件,代码如下:
int recv_callback(int fd) {    int count = recv(fd, conn_map[fd].rbuf + conn_map[fd].rlen, BUF_LEN - conn_map[fd].rlen, 0);    // do something
    memcpy(conn_map[fd].wbuf, conn_map[fd].rbuf, conn_map[fd].rlen);    conn_map[fd].wlen = conn_map[fd].rlen;    conn_map[fd].rlen = 0;
    struct epoll_event ev;    ev.events = EPOLLOUT;    ev.data.fd = fd;
    epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
    return count;}
我们在rbuf拷贝到wbuf之后,给当前fd注册了EPOLLOUT事件,然后我们在主循环中要处理EPOLLOUT事件,代码如下:
int main() {    ...
    while(1) {        int nready = epoll_wait(epfd, events, 1024, -1);
        int i = 0;        for (i = 0; i < nready; i++) {            int connfd = events[i].data.fd;
            if (events[i].events & EPOLLIN && sockfd == connfd) {                struct sockaddr_in clientaddr;                socklen_t len = sizeof(clientaddr);
                int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
                ev.events = EPOLLIN;                ev.data.fd = clientaddr;
                epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);
                conn_map[clientfd].fd = clientfd;                conn_map[clientfd].rlen = 0;                conn_map[clientfd].wlen = 0;                conn_map[clientfd].recv_call = recv_callback;                conn_map[clientfd].send_call = send_callback;                memset(conn_map[clientfd].rbuf, 0, BUF_LEN);                memset(conn_map[clientfd].wbuf, 0, BUF_LEN);
                printf("clientfd:%d\n", clientfd);            } else if (events[i].events & EPOLLIN) {                int count = conn_map[connfd].recv_call(connfd);                printf("recv-count:%d\n", count);            } else if (events[i].events & EPOLLOUT) { // 处理EPOLLOUT事件                int count  = conn_map[connfd].send_call(connfd);                printf("send-count:%d\n", count);            }        }    } }

要注意的是,epfd是在main函数中定义的,而我们在recv_callback中有使用,所以我们可以暂时将epfd声明成一个全局变量,放在外面。

上面的代码有一个问题,EPOLLOUT事件触发之后你会发现再向当前fd发送数据,就没响应了,这是因为epoll事件被我们修改了,为了解决这个问题我们可以在send_callback执行完之后再设置回去,如下:
int send_callback(int fd) {    int count = send(fd, conn_map[fd].wbuffer, conn_map[fd].wlen, 0);
    struct epoll_event ev;    ev.events = EPOLLIN;    ev.data.fd = fd;
    epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
    return count;}

这样,我们就将IO操作给屏蔽了,在主循环中我们只关注事件,不同的事件调用不同的回调函数。在对应的回调函数中只做自己该做的,做完之后注册事件通知其它的回调函数。

但是,上面的代码还不够优雅,对于accept和读事件来讲在epoll中都是EPOLLIN事件,这两个是不是可以合并在一起处理呢?答案是可以的,首先,我们要将accept相关的逻辑给拆出来,拆解之后的代码如下:
int accept_callback(int fd) {    struct sockaddr_in clientaddr;    socklen_t len = sizeof(clientaddr);
    int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
    ev.events = EPOLLIN;    ev.data.fd = clientaddr;
    epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);
    conn_map[clientfd].fd = clientfd;    conn_map[clientfd].rlen = 0;    conn_map[clientfd].wlen = 0;    conn_map[clientfd].recv_call = recv_callback;    conn_map[clientfd].send_call = send_callback;    memset(conn_map[clientfd].rbuf, 0, BUF_LEN);    memset(conn_map[clientfd].wbuf, 0, BUF_LEN);
    return clientfd;}
我们发现,accept_callback和recv_callback以及send_callback的签名是一样的,这样我们可以在conn_channel用一个union,将accept_callback也放到conn_channel中来。如下:
struct conn_channel {    int fd;
    union {        callback accept_call;        callback recv_call;    } call_t;    callback send_call;
    char wbuf[BUF_LEN];    int wlen;    char rbuf[BUF_LEN];    int rlen;};
在主循环中,我们就可以先给sockfd注册好accept回调函数,然后我们只需要在主循环中保留两个逻辑就可以了,代码如下:
int main() {    int sockfd = create_serv(9000);    if (sockfd == -1) {        perror("create-server-fail");        return -1;    }
    make_nonblocking(sockfd);
    epfd = epoll_create1(1);
    struct epoll_event ev;    ev.events = EPOLLIN;    ev.data.fd = sockfd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);
    struct epoll_event events[1024] = {0}; 
    conn_map[sockfd].rlen = 0;    conn_map[sockfd].wlen = 0;    conn_map[sockfd].fd = sockfd;    conn_map[sockfd].call_t.accept_call = accept_callback;    conn_map[sodkfd].send_call = send_callback;    memset(conn_map[sockfd].rbuf, 0, BUF_LEN);    memset(conn_map[sockfd].wbuf, 0, BUF_LEN);
    while(1) {        int nready = epoll_wait(epfd, events, 1024, -1);
        int i = 0;        for (i = 0; i < nready; i++) {            int connfd = events[i].data.fd;            if (events[i].events & EPOLLIN) {                int count = conn_map[connfd].call_t.recv_call(connfd);                printf("recv-count:%d\n", count);            } else if (events[i].events & EPOLLOUT) {                int count  = conn_map[connfd].send_call(connfd);                printf("send-count:%d\n", count);            }        }    } }

你可以想一下,我们注册的是call_t.accept_call,但在调用的时候确是call_t.recv_call,为什么这样可行?

我们在网络编程系列文章中,单独为accept抽象出了一个对象,你可以对比一下这两种实现方式,看看它们有什么区别?在系列文件中我们为什么要单独抽象出一个accepter对象呢?

可以看到,最后主循环中的逻辑,只有两个分支,这两个分支代表了两种事件,这种通过事件驱动的网络模型便是Reactor网络模型。本文为了容易理解,将代码进行了精简。在实际的工程中我们还要考虑诸多情况。比如,上面的代码只支持epoll,我们是不是可以将事件驱动相关的代码抽象成单独的组件,让其可以支持其它的事件模型。

本文虽然代码简单,但Reactor网络模型的实现基本上都逃脱不了这个套路,只是在此基础上可能会将各个部分进行单独的封装,比如我们在网络编程系列文章中就将channel和map进行了抽象,让它能适配各种场景。

总结

reactor网络模型是网络编程中非常重要的一种编程思想,本文通过一个简短的示例试图讲明白reactor网络编程模型的核心思想。当然,本文的实现还不是很完善,比如在调用回调函数的时候还是传入了fd,我们是否可以不需要这个参数,彻彻底底地和IO进行分离呢?



Tags:Reactor   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
Reactor网络模型核心思想探秘
在网络编程系列文章中,我们实现了一个基于epoll的网络框架,并在此基础上开发了一个简单的HTTP服务,在那个系列文章中我们使用了读、写两个buffer将网络IO和数据的读写进行了分...【详细内容】
2023-12-06  Search: Reactor  点击:(81)  评论:(0)  加入收藏
新一代WebFlux框架核心技术Reactor响应式编程基本用法
环境:projectreactor2020.0.14 1. 前言在响应式编程中,Project Reactor提供了两个核心的概念:Mono和Flux。Mono和Flux都是Reactor中的Publisher,它们可以产生并发布数据,然后可以...【详细内容】
2023-11-27  Search: Reactor  点击:(181)  评论:(0)  加入收藏
一文聊透 Netty 核心引擎 Reactor 的运转架构
本文笔者来为大家介绍下Netty的核心引擎Reactor的运转架构,希望通过本文的介绍能够让大家对Reactor是如何驱动着整个Netty框架的运转有一个全面的认识。也为我们后续进一步...【详细内容】
2022-07-05  Search: Reactor  点击:(325)  评论:(0)  加入收藏
单Reactor多线程
public class SReactorSThread { private Selector selector; private ServerSocketChannel serverSocketChannel; private int PORT = 6666; public SReactorS...【详细内容】
2021-05-25  Search: Reactor  点击:(1054)  评论:(0)  加入收藏
Linux两种处理模式reactor模式proactor模式
前言 同步I/O模型通常用于实现Reactor模式 异步I/O模型则用于实现Proactor模式 最后我们会使用同步I/O方式模拟出Proactor模式一、Reactor模式 Reactor 释义“反应堆”,是一...【详细内容】
2020-12-24  Search: Reactor  点击:(398)  评论:(0)  加入收藏
浅谈 Netty三种 I/O 模式和Reactor 编程模式
Netty 怎么切换三种 I/O 模式什么是经典的三种 I/O 模式?生活场景:当我们去饭店吃饭时:&bull; 食堂排队打饭模式:排队在窗口,打好才走;&bull; 点单、等待被叫模式:等待被叫,好了自己...【详细内容】
2020-10-13  Search: Reactor  点击:(232)  评论:(0)  加入收藏
Reactor线程模型
前言Reactor模型是基于事件驱动的线程模型,可以分为Reactor单线程模型、Reactor多线程模型、主从Reactor多线程模型,通常基于在I/O多路复用实现。不同的角色职责有:Dispatcher...【详细内容】
2020-06-13  Search: Reactor  点击:(1260)  评论:(0)  加入收藏
Java NIO的三种Reactor线程模型分析
概述 在使用Java NIO和多线程来进行高并发Java服务端应用程序设计时,通常是基于Reactor线程模型来设计的。Reactor,即包含一个Java NIO的多路复用选择器Selector的反应堆,当有...【详细内容】
2019-08-28  Search: Reactor  点击:(1017)  评论:(0)  加入收藏
彻底搞懂Reactor模型和Proactor模型
在高性能的I/O设计中,有两个著名的模型:Reactor模型和Proactor模型,其中Reactor模型用于同步I/O,而Proactor模型运用于异步I/O操作。想要了解两种模型,需要了解一些IO、同步异步...【详细内容】
2019-08-13  Search: Reactor  点击:(1244)  评论:(0)  加入收藏
▌简易百科推荐
学生偷看“不良网站”,手机上3个痕迹无法清除,网友:咋不早说
众所周知,中国的常规教育中,总是“谈性色变”,但在这个信息爆炸的互联网时代,即便是一些年纪很小的孩子,也能轻易接触到一些所谓的不良网站,因此这一方面的教育缺失,其实是很可怕的...【详细内容】
2024-03-28    叶姐生活指南  Tags:不良网站   点击:(11)  评论:(0)  加入收藏
什么是网络中的路由器?核心功能解释
路由器是互联网连接的核心元素,是一种允许多个设备连接到互联网,并促进将数据包转发到各自的目标地址的设备。使用动态路由技术,路由器检查数据并在各种可用路径中选择最有效的...【详细内容】
2024-03-07    千家网  Tags:路由器   点击:(31)  评论:(0)  加入收藏
过年该不该升级Wi-Fi 7路由?看完就知道
打开电商网站不难发现,从2023年第三季度到现在,Wi-Fi 7路由器新品越来越多。而且价格不再是高高在上,已经基本和Wi-Fi 6路由价格差不多了。看到这些Wi-Fi 7新品路由,不少朋友就...【详细内容】
2024-02-27    中关村在线  Tags:Wi-Fi   点击:(34)  评论:(0)  加入收藏
聊聊 Kubernetes 网络模型综合指南
这篇详细的博文探讨了 Kubernetes 网络的复杂性,提供了关于如何在容器化环境中确保高效和安全通信的见解。译自Navigating the Network: A Comprehensive Guide to Kubernete...【详细内容】
2024-02-19  云云众生s  微信公众号  Tags:Kubernetes   点击:(37)  评论:(0)  加入收藏
SSL协议是什么?关于SSL和TLS的常见问题解答
SSL(安全套接字层)及其后继者TLS(传输层安全)是用于在联网计算机之间建立经过身份验证和加密的链接的协议。尽管SSL协议在 1999年已经随着TLS 1.0的发布而被弃用,但我们仍将这些...【详细内容】
2024-02-06  IDC点评网    Tags:SSL协议   点击:(69)  评论:(0)  加入收藏
从零开始了解网络协议:TCP/IP详解
从零开始了解网络协议:TCP/IP详解 在当今数字化的时代,网络协议已经成为我们生活中不可或缺的一部分。作为互联网的基础,网络协议规定了数据如何在不同的网络设备之间传输。TC...【详细内容】
2024-02-01    简易百科  Tags:TCP/IP   点击:(59)  评论:(0)  加入收藏
BGP路由属性:互联网路由的灵活控制器
在互联网的庞大网络中,边界网关协议(BGP)是确保不同自治系统(AS)间路由信息有效交换的关键协议。然而,BGP的功能远不止于此。其核心组成部分,即BGP路由属性,赋予了BGP强大的灵活性,使...【详细内容】
2024-01-26  诺诺爱生活    Tags:互联网路由   点击:(40)  评论:(0)  加入收藏
简易百科之什么是网络延迟?
简易百科之什么是网络延迟?随着互联网的普及和发展,网络已经成为我们生活中不可或缺的一部分。然而,我们在使用网络时可能会遇到一种情况,那就是网络延迟。那么,什么是网络延迟呢...【详细内容】
2024-01-24    简易百科  Tags:网络延迟   点击:(143)  评论:(0)  加入收藏
网络延迟与网络速度有什么区别?分享具体的答案
通常,许多人抱怨网速测试。速度还是不错的,但是他们玩游戏的时候怎么会卡住,还是断开连接等等问题,这一系列问题始终困扰着大家。那么,网络延迟与网络速度有什么区别呢?请不要担心...【详细内容】
2024-01-24  萌新小郭    Tags:网络延迟   点击:(48)  评论:(0)  加入收藏
揭秘IP地址的网络威胁与攻击类型
在当今数字化时代,网络攻击已经成为网络安全的一大挑战。IP地址,作为互联网通信的基础,也成为网络威胁和攻击的焦点之一。本文将深入探讨不同类型的网络威胁和攻击,以及如何防范...【详细内容】
2024-01-22  IP数据云    Tags:IP地址   点击:(74)  评论:(0)  加入收藏
站内最新
站内热门
站内头条