您当前的位置:首页 > 电脑百科 > 程序开发 > 语言 > Rust

异步Rust:构建实时消息代理服务器

时间:2024-02-01 13:16:46  来源:  作者:

在本文中,我们将深入研究使用Rust构建实时消息代理服务器,展示其强大的并发特性。我们将使用Warp作为web服务器,并使用Tokio来管理异步任务。此外,我们将创建一个WebSocket客户端来测试代理服务器的功能。

设计图如下:

图片图片

构建消息代理服务器

消息代理服务器允许客户端为主题生成事件并订阅它们。它使用Warp作为HTTP和WebSocket服务器,使用Tokio作为异步运行时。

使用以下命令创建一个Rust项目:

cargo new real-ime-message

在Cargo.toml文件中加入以下依赖项:

[dependencies]
futures-util = "0.3.30"
tokio = {version = "1.35.1", features = ["full"]}
tokio-tungstenite = "0.21.0"
url = "2.5.0"
warp = "0.3.6"

在src/mAIn.rs文件中定义一个Broker结构体:

use std::{
    collections::{HashMap, VecDeque},
    sync::Arc,
};

use futures_util::{SinkExt, StreamExt};
use tokio::sync::{
    mpsc::{self, UnboundedSender},
    RwLock,
};
use warp::{filters::ws::Message, Filter};

type Topic = String;
type Event = String;
type WsSender = UnboundedSender<warp::ws::Message>;

struct Broker {
    events: Arc<RwLock<HashMap<Topic, VecDeque<Event>>>>,
    subscribers: Arc<RwLock<HashMap<Topic, Vec<WsSender>>>>,
}
  • events:存储每个主题的事件。
  • subscribers:跟踪每个主题的订阅者。

创建一个新的Broker实例:

impl Broker {
    fn new() -> Self {
        Broker {
            events: Arc::new(RwLock::new(HashMap::new())),
            subscribers: Arc::new(RwLock::new(HashMap::new())),
        }
    }
}

定义发布事件的方法produce:

impl Broker {
    ......

    async fn produce(&self, topic: Topic, event: Event) {
        let mut events = self.events.write().await;
        events
            .entry(topic.clone())
            .or_default()
            .push_back(event.clone());

        // 异步通知所有订阅者
        let subscribers_list;
        {
            let subscribers = self.subscribers.read().await;
            subscribers_list = subscribers.get(&topic).cloned().unwrap_or_default();
        }

        for ws_sender in subscribers_list {
            // 将事件发送到WebSocket客户端
            let _ = ws_sender.send(warp::ws::Message::text(event.clone()));
        }
    }
}

这个方法主要是将事件添加到相应的主题,然后将新事件通知所有订阅者。

定义subscribe方法,来管理新的订阅:

impl Broker {
    ......

    pub async fn subscribe(&self, topic: Topic, socket: warp::ws::WebSocket) {
        let (ws_sender, mut ws_receiver) = socket.split();

        let (tx, mut rx) = mpsc::unbounded_channel::<Message>();

        {
            let mut subs = self.subscribers.write().await;
            subs.entry(topic).or_default().push(tx);
        }

        tokio::task::spawn(async move {
            while let Some(result) = ws_receiver.next().await {
                match result {
                    Ok(message) => {
                        // 处理有效的消息
                        if message.is_text() {
                            println!(
                                "Received message from client: {}",
                                message.to_str().unwrap()
                            );
                        }
                    }
                    Err(e) => {
                        // 处理错误
                        eprintln!("WebSocket error: {:?}", e);
                        break;
                    }
                }
            }
            println!("WebSocket connection closed");
        });

        tokio::task::spawn(async move {
            let mut sender = ws_sender;

            while let Some(msg) = rx.recv().await {
                let _ = sender.send(msg).await;
            }
        });
    }
}

这个方法主要是将WebSocket拆分为发送方和接收方,将订阅者添加到订阅者列表中,处理传入的WebSocket消息。

main函数代码如下:

#[tokio::main]
async fn main() {
    let broker = Arc::new(Broker::new());
    let broker_clone1 = Arc::clone(&broker);
    let broker_clone2 = Arc::clone(&broker);

    let produce = warp::path!("produce" / String)
        .and(warp::post())
        .and(warp::body::json())
        .and(warp::any().map(move || Arc::clone(&broker_clone1)))
        .and_then(
            move |topic: String, event: Event, broker_clone2: Arc<Broker>| async move {
                broker_clone2.produce(topic, event).await;
                Ok::<_, warp::Rejection>(warp::reply())
            },
        );

    let subscribe = warp::path!("subscribe" / String).and(warp::ws()).map(
        move |topic: String, ws: warp::ws::Ws| {
            let broker_clone3 = Arc::clone(&broker_clone2);
            ws.on_upgrade(move |socket| async move {
                broker_clone3.subscribe(topic.clone(), socket).await;
            })
        },
    );

    let routes = produce.or(subscribe);

    println!("Broker server running at http://127.0.0.1:3030");
    warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
}

实现WebSocket客户端

WebSocket客户端将模拟一个订阅主题和接收消息的真实用户。

在src/bin目录下,创建一个ws_cli.rs文件。在文件中定义websocket_client函数,建立WebSocket连接并管理消息:

use futures_util::{sink::SinkExt, stream::StreamExt};
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::time::{sleep, Duration};
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
use url::Url;

async fn websocket_client(topic_url: &str) {
    // 解析要连接WebSocket服务器的URL
    let url = Url::parse(topic_url).expect("Invalid URL");

    // 连接到WebSocket服务器
    let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");

    println!("WebSocket client connected");

    let (mut write, mut read) = ws_stream.split();
    let message = Arc::new(RwLock::new(String::new()));
    let message_1 = message.clone();
    // 生成一个任务来处理传入的消息
    tokio::spawn(async move {
        let msg_lock = message_1.clone();
        while let Some(message) = read.next().await {
            match message {
                Ok(msg) => {
                    let mut ms = msg_lock.write().await;
                    *ms = msg.to_text().unwrap().to_string();
                    println!("Received message: {}", msg.to_text().unwrap());
                }
                Err(e) => {
                    eprintln!("Error receiving message: {:?}", e);
                    break;
                }
            }
        }
    });

    // 发送消息
    loop {
        let msg_lock = message.clone();
        let ms = msg_lock.read().await;
        if let Err(e) = write.send(Message::Text(ms.to_string())).await {
            eprintln!("Error sending message: {:?}", e);
            break;
        }
        sleep(Duration::from_secs(5)).await;
    }
}

main函数代码如下:

#[tokio::main]
async fn main() {
    websocket_client("ws://127.0.0.1:3030/subscribe/newtopic").await;
}

测试

执行如下命令运行消息代理服务器:

cargo run --bin real-ime-message

执行结果:

Broker server running at http://127.0.0.1:3030

然后打开一个新的命令行,执行如下命令运行WebSocket客户端:

cargo run --bin ws_cli

执行结果:

WebSocket client connected

向http://127.0.0.1:3030/produce/newtopic接口发送post请求,如图:

图片图片

客户端接收到消息:

WebSocket client connected
Received message: This is a new event

总结

我们已经探索了在Rust中创建一个简单的消息代理,并使用WebSocket客户端对其进行测试。这个例子突出了Rust在构建高效、并发的网络应用程序方面的能力。



Tags:Rust   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
在Rust中使用Serde的详细指南
在处理HTTP请求时,我们总是需要在一种数据结构(可以是enum、struct等)和一种可以存储或传输并稍后重建的格式(例如JSON)之间来回转换。Serde是一个库(crate),用于高效、通用地...【详细内容】
2024-03-26  Search: Rust  点击:(13)  评论:(0)  加入收藏
Rust 写脚手架,Clap你应该知道的二三事
有感而发最近,在和前端小伙伴聊天发现,在2024年,她们都有打算入局Rust学习的行列。毕竟前端现在太卷了,框架算是走到「穷途末路」了,无非就是在原有基础上修修补补。所有他们想在...【详细内容】
2024-03-11  Search: Rust  点击:(20)  评论:(0)  加入收藏
前端开始“锈化”?Vue团队开源JS打包工具:基于Rust、速度极快、尤雨溪主导
Vue 团队已正式开源Rolldown &mdash;&mdash; 基于 Rust 的 JavaScrip 打包工具。Rolldown 是使用 Rust 开发的 Rollup 替代品,它提供与 Rollup 兼容的应用程序接口和插件接口...【详细内容】
2024-03-09  Search: Rust  点击:(11)  评论:(0)  加入收藏
Rust中的数据可视化指南
可视化是数据分析和解释的一个关键方面。虽然Rust主要以其性能和安全特性而闻名,但它也为数据可视化提供了强大的工具。在这个全面的指南中,我们将深入研究Rust中的数据可视化...【详细内容】
2024-03-07  Search: Rust  点击:(29)  评论:(0)  加入收藏
如何在Rust中操作JSON,你学会了吗?
sonic-rs ​还具有一些额外的方法来进行惰性评估和提高速度。例如,如果我们想要一个 JSON​ 字符串文字,我们可以在反序列化时使用 LazyValue​ 类型将其转换为一个仍然带有斜...【详细内容】
2024-02-27  Search: Rust  点击:(47)  评论:(0)  加入收藏
记一次Rust内存泄漏排查之旅
在某次持续压测过程中,我们发现 GreptimeDB 的 Frontend 节点内存即使在请求量平稳的阶段也在持续上涨,直至被 OOM kill。我们判断 Frontend 应该是有内存泄漏了,于是开启了排...【详细内容】
2024-02-27  Search: Rust  点击:(12)  评论:(0)  加入收藏
Rust 最受欢迎的这些库
今天分享主题是,关于一些值得注意的 Rust 库,这些库可以根据它们的功能和在编码中的受欢迎程度进行选择。什么是 Rust 库?在 Rust 中,常被称为 “crate” 的库,是一个打包的单元...【详细内容】
2024-02-19  Search: Rust  点击:(50)  评论:(0)  加入收藏
异步Rust:构建实时消息代理服务器
在本文中,我们将深入研究使用Rust构建实时消息代理服务器,展示其强大的并发特性。我们将使用Warp作为web服务器,并使用Tokio来管理异步任务。此外,我们将创建一个WebSocket客户...【详细内容】
2024-02-01  Search: Rust  点击:(57)  评论:(0)  加入收藏
在 Rust 编程中使用泛型
本文的内容将涉及泛型定义函数、结构体、枚举和方法, 还将讨论泛型如何影响代码性能。1.摘要Rust中的泛型可以让我们为像函数签名或结构体这样的项创建定义, 这样它们就可以...【详细内容】
2024-01-09  Search: Rust  点击:(89)  评论:(0)  加入收藏
什么是Rust语言 ,特点是什么,跟其它语言对比有什么优势
什么是RustRust是一种系统编程语言,旨在提供高性能和安全性。它是由Mozilla和其开发社区创建的开源语言,设计目标是在C++的应用场景中提供一种现代、可靠和高效的选择。Rust的...【详细内容】
2024-01-09  Search: Rust  点击:(203)  评论:(0)  加入收藏
▌简易百科推荐
在Rust中使用Serde的详细指南
在处理HTTP请求时,我们总是需要在一种数据结构(可以是enum、struct等)和一种可以存储或传输并稍后重建的格式(例如JSON)之间来回转换。Serde是一个库(crate),用于高效、通用地...【详细内容】
2024-03-26  coding到灯火阑珊  微信公众号  Tags:Rust   点击:(13)  评论:(0)  加入收藏
Rust 写脚手架,Clap你应该知道的二三事
有感而发最近,在和前端小伙伴聊天发现,在2024年,她们都有打算入局Rust学习的行列。毕竟前端现在太卷了,框架算是走到「穷途末路」了,无非就是在原有基础上修修补补。所有他们想在...【详细内容】
2024-03-11  前端柒八九  微信公众号  Tags:Rust   点击:(20)  评论:(0)  加入收藏
Rust中的数据可视化指南
可视化是数据分析和解释的一个关键方面。虽然Rust主要以其性能和安全特性而闻名,但它也为数据可视化提供了强大的工具。在这个全面的指南中,我们将深入研究Rust中的数据可视化...【详细内容】
2024-03-07  coding到灯火阑珊  微信公众号  Tags:Rust   点击:(29)  评论:(0)  加入收藏
如何在Rust中操作JSON,你学会了吗?
sonic-rs ​还具有一些额外的方法来进行惰性评估和提高速度。例如,如果我们想要一个 JSON​ 字符串文字,我们可以在反序列化时使用 LazyValue​ 类型将其转换为一个仍然带有斜...【详细内容】
2024-02-27  前端柒八九  微信公众号  Tags:Rust   点击:(47)  评论:(0)  加入收藏
记一次Rust内存泄漏排查之旅
在某次持续压测过程中,我们发现 GreptimeDB 的 Frontend 节点内存即使在请求量平稳的阶段也在持续上涨,直至被 OOM kill。我们判断 Frontend 应该是有内存泄漏了,于是开启了排...【详细内容】
2024-02-27  OSC开源社区    Tags:Rust   点击:(12)  评论:(0)  加入收藏
Rust 最受欢迎的这些库
今天分享主题是,关于一些值得注意的 Rust 库,这些库可以根据它们的功能和在编码中的受欢迎程度进行选择。什么是 Rust 库?在 Rust 中,常被称为 “crate” 的库,是一个打包的单元...【详细内容】
2024-02-19  码农渔夫  微信公众号  Tags:Rust   点击:(50)  评论:(0)  加入收藏
异步Rust:构建实时消息代理服务器
在本文中,我们将深入研究使用Rust构建实时消息代理服务器,展示其强大的并发特性。我们将使用Warp作为web服务器,并使用Tokio来管理异步任务。此外,我们将创建一个WebSocket客户...【详细内容】
2024-02-01      Tags:Rust   点击:(57)  评论:(0)  加入收藏
在 Rust 编程中使用泛型
本文的内容将涉及泛型定义函数、结构体、枚举和方法, 还将讨论泛型如何影响代码性能。1.摘要Rust中的泛型可以让我们为像函数签名或结构体这样的项创建定义, 这样它们就可以...【详细内容】
2024-01-09  二进制空间安全  微信公众号  Tags:Rust   点击:(89)  评论:(0)  加入收藏
什么是Rust语言 ,特点是什么,跟其它语言对比有什么优势
什么是RustRust是一种系统编程语言,旨在提供高性能和安全性。它是由Mozilla和其开发社区创建的开源语言,设计目标是在C++的应用场景中提供一种现代、可靠和高效的选择。Rust的...【详细内容】
2024-01-09    简易百科  Tags:Rust语言   点击:(203)  评论:(0)  加入收藏
在 Rust 编程中使用多线程
编程语言有一些不同的方法来实现线程,而且很多操作系统提供了创建新线程的 API。Rust 标准库使用 1:1 线程实现,这代表程序的每一个语言级线程使用一个系统线程。1. Rust线程...【详细内容】
2024-01-07  二进制空间安全  微信公众号  Tags:Rust 编程   点击:(77)  评论:(0)  加入收藏
站内最新
站内热门
站内头条