您当前的位置:首页 > 电脑百科 > 程序开发 > 编程百科

RabbitMQ——最常用的三大模式

时间:2021-04-13 09:59:21  来源:博客园  作者:海向

Direct 模式#

  • 所有发送到 Direct Exchange 的消息被转发到 RouteKey 中指定的 Queue。
  • Direct 模式可以使用 RabbitMQ 自带的 Exchange: default Exchange,所以不需要将 Exchange 进行任何绑定(binding)操作。
  • 消息传递时,RouteKey 必须完全匹配才会被队列接收,否则该消息会被抛弃,
RabbitMQ——最常用的三大模式

 

Copyimport com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class DirectProducer {
    public static void main(String[] args) throws Exception {
        //1. 创建一个 ConnectionFactory 并进行设置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        //2. 通过连接工厂来创建连接
        Connection connection = factory.newConnection();

        //3. 通过 Connection 来创建 Channel
        Channel channel = connection.createChannel();

        //4. 声明
        String exchangeName = "test_direct_exchange";
        String routingKey = "item.direct";

        //5. 发送
        String msg = "this is direct msg";
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
        System.out.println("Send message : " + msg);

        //6. 关闭连接
        channel.close();
        connection.close();
    }
}

Copyimport com.rabbitmq.client.*;
import JAVA.io.IOException;

public class DirectConsumer {

    public static void main(String[] args) throws Exception {
        //1. 创建一个 ConnectionFactory 并进行设置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
       	factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);
      
        //2. 通过连接工厂来创建连接
        Connection connection = factory.newConnection();

        //3. 通过 Connection 来创建 Channel
        Channel channel = connection.createChannel();

        //4. 声明
        String exchangeName = "test_direct_exchange";
        String queueName = "test_direct_queue";
        String routingKey = "item.direct";
        channel.exchangeDeclare(exchangeName, "direct", true, false, null);
        channel.queueDeclare(queueName, false, false, false, null);

        //一般不用代码绑定,在管理界面手动绑定
        channel.queueBind(queueName, exchangeName, routingKey);

        //5. 创建消费者并接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };

        //6. 设置 Channel 消费者绑定队列
        channel.basicConsume(queueName, true, consumer);

    }
}

Copy Send message : this is direct msg
 
 [x] Received 'this is direct msg'

Topic 模式#

可以使用通配符进行模糊匹配

  • 符号'#" 匹配一个或多个词
  • 符号"*”匹配不多不少一个词

例如:

  • 'log.#"能够匹配到'log.info.oa"
  • "log.*"只会匹配到"log.erro“
RabbitMQ——最常用的三大模式

 

Copyimport com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class TopicProducer {

    public static void main(String[] args) throws Exception {
        //1. 创建一个 ConnectionFactory 并进行设置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        //2. 通过连接工厂来创建连接
        Connection connection = factory.newConnection();

        //3. 通过 Connection 来创建 Channel
        Channel channel = connection.createChannel();

        //4. 声明
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "item.update";
        String routingKey2 = "item.delete";
        String routingKey3 = "user.add";

        //5. 发送
        String msg = "this is topic msg";
        channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
        channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
        channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
        System.out.println("Send message : " + msg);

        //6. 关闭连接
        channel.close();
        connection.close();
    }
}

Copyimport com.rabbitmq.client.*;
import java.io.IOException;

public class TopicConsumer {

    public static void main(String[] args) throws Exception {
        //1. 创建一个 ConnectionFactory 并进行设置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        //2. 通过连接工厂来创建连接
        Connection connection = factory.newConnection();

        //3. 通过 Connection 来创建 Channel
        Channel channel = connection.createChannel();

        //4. 声明
        String exchangeName = "test_topic_exchange";
        String queueName = "test_topic_queue";
        String routingKey = "item.#";
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, false, false, false, null);

        //一般不用代码绑定,在管理界面手动绑定
        channel.queueBind(queueName, exchangeName, routingKey);

        //5. 创建消费者并接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        //6. 设置 Channel 消费者绑定队列
        channel.basicConsume(queueName, true, consumer);

    }
}

CopySend message : this is topc msg

[x] Received 'this is topc msg'
[x] Received 'this is topc msg'

Fanout 模式#

不处理路由键,只需要简单的将队列绑定到交换机上发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。Fanout交换机转发消息是最快的。

RabbitMQ——最常用的三大模式

 

Copyimport com.rabbitmq.client.*;
import java.io.IOException;

public class FanoutConsumer {
    public static void main(String[] args) throws Exception {
        //1. 创建一个 ConnectionFactory 并进行设置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        //2. 通过连接工厂来创建连接
        Connection connection = factory.newConnection();

        //3. 通过 Connection 来创建 Channel
        Channel channel = connection.createChannel();

        //4. 声明
        String exchangeName = "test_fanout_exchange";
        String queueName = "test_fanout_queue";
        String routingKey = "item.#";
        channel.exchangeDeclare(exchangeName, "fanout", true, false, null);
        channel.queueDeclare(queueName, false, false, false, null);

        //一般不用代码绑定,在管理界面手动绑定
        channel.queueBind(queueName, exchangeName, routingKey);

        //5. 创建消费者并接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };

        //6. 设置 Channel 消费者绑定队列
        channel.basicConsume(queueName, true, consumer);
    }
}

Copyimport com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class FanoutProducer {

    public static void main(String[] args) throws Exception {
        //1. 创建一个 ConnectionFactory 并进行设置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        //2. 通过连接工厂来创建连接
        Connection connection = factory.newConnection();

        //3. 通过 Connection 来创建 Channel
        Channel channel = connection.createChannel();

        //4. 声明
        String exchangeName = "test_fanout_exchange";
        String routingKey1 = "item.update";
        String routingKey2 = "";
        String routingKey3 = "ookjkjjkhjhk";//任意routingkey

        //5. 发送
        String msg = "this is fanout msg";
        channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
        channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
        channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
        System.out.println("Send message : " + msg);

        //6. 关闭连接
        channel.close();
        connection.close();
    }
}

CopySend message : this is fanout msg

[x] Received 'this is fanout msg'
[x] Received 'this is fanout msg'
[x] Received 'this is fanout msg'


作者: 海向

出处:
https://www.cnblogs.com/haixiang/p/10864339.html

本站使用「CC BY 4.0」创作共享协议,转载请在文章明显位置注明作者及出处。



Tags:RabbitMQ   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
RabbitMQ 介绍RabbitMQ 是一个由erlang语言编写的、开源的、在AMQP基础上完整的、可复用的企业消息系统。支持多种语言,包括java、Python、ruby、PHP、C/C++等。1.1.AMQP模型...【详细内容】
2021-11-17  Tags: RabbitMQ  点击:(16)  评论:(0)  加入收藏
下载Erlang和RabbitMQ官网下载地址Erlang下载地址: http://www.erlang.org/downloadsRabbitMQ下载地址: http://www.rabbitmq.com/download.html版本:( Erlang23+RabbitMQ3.8.4...【详细内容】
2021-08-31  Tags: RabbitMQ  点击:(49)  评论:(0)  加入收藏
环境:Spring Boot2.3.10 + RabbitMQ 3.8.12 + Erlang 23.2.51.1 RabbitMQ介绍RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务...【详细内容】
2021-04-22  Tags: RabbitMQ  点击:(336)  评论:(0)  加入收藏
RabbitMQ环境搭建erlang和RabbitMQ版本对应关系:https://www.rabbitmq.com/which-erlang.htmlerlang环境安装yum方式安装 yum源配置[root@iyeed RabbitMQ]# curl -s https://...【详细内容】
2021-04-14  Tags: RabbitMQ  点击:(281)  评论:(0)  加入收藏
Direct 模式# 所有发送到 Direct Exchange 的消息被转发到 RouteKey 中指定的 Queue。 Direct 模式可以使用 RabbitMQ 自带的 Exchange: default Exchange,所以不需要将 Exch...【详细内容】
2021-04-13  Tags: RabbitMQ  点击:(220)  评论:(0)  加入收藏
一、关于 RabbitMQ说到 RabbitMQ,相信大家都不会陌生,微服务开发中必不可少的中间件。 在上篇关于消息队列的文章中,我们了解到 RabbitMQ 本质其实是用 Erlang 开发的 AMQP(Adva...【详细内容】
2021-03-11  Tags: RabbitMQ  点击:(229)  评论:(0)  加入收藏
说明:想要理解RabbitMQ,需要先理解MQ是什么?能做什么?然后根据基础知识去理解RabbitMQ是什么、提供了什么功能。一、MQ的简单理解1. 什么是MQ? 消息队列(Message Queue),是基础数据...【详细内容】
2021-02-07  Tags: RabbitMQ  点击:(155)  评论:(0)  加入收藏
1、 查找Docker容器中的RabbitMQ镜像docker ps -a[root@linux ~]# docker ps -aCONTAINER ID IMAGE COMMAND CREATED...【详细内容】
2020-11-27  Tags: RabbitMQ  点击:(221)  评论:(0)  加入收藏
一、Maven依赖添加 <!-- rabbitmq相关依赖 --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId>...【详细内容】
2020-08-01  Tags: RabbitMQ  点击:(53)  评论:(0)  加入收藏
一、简单的发送与接收消息 HelloWorld1. 发送消息发送消息首先要获取与rabbitmq-server的连接,然后从渠道(chann)中指定的queue发送消息 , 不能定义两个queue名字相同,但属性...【详细内容】
2020-04-03  Tags: RabbitMQ  点击:(61)  评论:(0)  加入收藏
▌简易百科推荐
摘 要 (OF作品展示)OF之前介绍了用python实现数据可视化、数据分析及一些小项目,但基本都是后端的知识。想要做一个好看的可视化大屏,我们还要学一些前端的知识(vue),网上有很多比...【详细内容】
2021-12-27  项目与数据管理    Tags:Vue   点击:(1)  评论:(0)  加入收藏
程序是如何被执行的&emsp;&emsp;程序是如何被执行的?许多开发者可能也没法回答这个问题,大多数人更注重的是如何编写程序,却不会太注意编写好的程序是如何被运行,这并不是一个好...【详细内容】
2021-12-23  IT学习日记    Tags:程序   点击:(9)  评论:(0)  加入收藏
阅读收获✔️1. 了解单点登录实现原理✔️2. 掌握快速使用xxl-sso接入单点登录功能一、早期的多系统登录解决方案 单系统登录解决方案的核心是cookie,cookie携带会话id在浏览器...【详细内容】
2021-12-23  程序yuan    Tags:单点登录(   点击:(8)  评论:(0)  加入收藏
下载Eclipse RCP IDE如果你电脑上还没有安装Eclipse,那么请到这里下载对应版本的软件进行安装。具体的安装步骤就不在这赘述了。创建第一个标准Eclipse RCP应用(总共分为六步)1...【详细内容】
2021-12-22  阿福ChrisYuan    Tags:RCP应用   点击:(7)  评论:(0)  加入收藏
今天想简单聊一聊 Token 的 Value Capture,就是币的价值问题。首先说明啊,这个话题包含的内容非常之光,Token 的经济学设计也可以包含诸多问题,所以几乎不可能把这个问题说的清...【详细内容】
2021-12-21  唐少华TSH    Tags:Token   点击:(9)  评论:(0)  加入收藏
实现效果:假如有10条数据,分组展示,默认在当前页面展示4个,点击换一批,从第5个开始继续展示,到最后一组,再重新返回到第一组 data() { return { qList: [], //处理后...【详细内容】
2021-12-17  Mason程    Tags:VUE   点击:(14)  评论:(0)  加入收藏
什么是性能调优?(what) 为什么需要性能调优?(why) 什么时候需要性能调优?(when) 什么地方需要性能调优?(where) 什么时候来进行性能调优?(who) 怎么样进行性能调优?(How) 硬件配...【详细内容】
2021-12-16  软件测试小p    Tags:性能调优   点击:(19)  评论:(0)  加入收藏
Tasker 是一款适用于 Android 设备的高级自动化应用,它可以通过脚本让重复性的操作自动运行,提高效率。 不知道从哪里听说的抖音 app 会导致 OLED 屏幕烧屏。于是就现学现卖,自...【详细内容】
2021-12-15  ITBang    Tags:抖音防烧屏   点击:(23)  评论:(0)  加入收藏
11 月 23 日,Rust Moderation Team(审核团队)在 GitHub 上发布了辞职公告,即刻生效。根据公告,审核团队集体辞职是为了抗议 Rust 核心团队(Core team)在执行社区行为准则和标准上...【详细内容】
2021-12-15  InfoQ    Tags:Rust   点击:(24)  评论:(0)  加入收藏
一个项目的大部分API,测试用例在参数和参数值等信息会有很多相似的地方。我们可以复制API,复制用例来快速生成,然后做细微调整既可以满足我们的测试需求1.复制API:在菜单发布单...【详细内容】
2021-12-14  AutoMeter    Tags:AutoMeter   点击:(20)  评论:(0)  加入收藏
最新更新
栏目热门
栏目头条