您当前的位置:首页 > 电脑百科 > 程序开发 > 框架

微服务与中间件系列——RabbitMQ,SpringAMQP使用

时间:2022-08-26 15:17:57  来源:  作者:Java互联网技术栈

同步通讯

发送方发出数据后,等接收方发回响应以后才发下一个数据包的通讯方式

同步调用的时效性强,可以立即获取结果

同步调用的问题

我们以前在使用Feign或OpenFeign时,就是使用的同步调用

  1. 代码耦合度高:每次加入新的需求,都要修改原来的代码
  2. 性能低:调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的时间之和。
  3. 资源浪费:调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源
  4. 级联失败:如果服务提供者出现问题,所有调用方都会跟着出问题,如同多米诺骨牌一样,迅速导致整个微服务群故障

异步通讯

发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式

其常见的实线就是事件驱动模式

可以实现服务解耦的问题,性能得到提升,吞吐量提高,服务没有强依赖性,不必担心级联失败问题,实现服务削峰

异步通信的问题

  1. 依赖于Broker的可靠性、安全性、吞吐能力
  2. 架构复杂了,业务没有明显的流程线,不好追踪管理

什么是MQ

N (Message Quene):翻译为j消息队列,通过典型的生产者和消费者模型生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入轻松地实现系统间解耦。别名为消息中间件,通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。

1.ActiveMQ

ActiveM 是A4pache出品,最流行的,能力强劲的开源消息总线。它是一个完全支持/8规范的的消息中间件。丰富的API,多种集群架构模式让认kctiveMA在业界成为老牌的消息中间件,在中小型企业颇受欢迎!

2.Kafka

Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pu11的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。8.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。

3.RocketMQ

RocketNQ是阿里开源的消息中间件,它是纯JAVA开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketNO思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。

4.RabbitMQ

RabbitNQ是使用Erlang语言开发的开源消息队列系统,基于ANQP协议来实现。ANQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。ANQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

Docker安装RabbitMQ

docker pull rabbitmq
//启动
docker run 
-e RABBITMQ_DEFAULT_USER=syf20020816 
-e RABBITMQ_DEFAULT_PASS=20020816 
--name mq 
--hostname mq1 
-p 15672:15672 
-p 5672:5672 
-d 
rabbitmq:3.10-management

 

访问15672端口

 

 

RabbitMq架构

 

  • channel:操作MQ的工具
  • exchange:路由消息到队列中
  • queue:缓存消息
  • virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组

QuickStart

简单项目中(点对点的简单模型)

 

1.引入依赖

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.15.0</version>
</dependency>

2.建立虚拟主机

 

添加完成后如图:

 

3.建立生产者

/**
 * 直连式连接
 */
public class Publisher {
 
    public static void mAIn(String[] args) throws IOException, TimeoutException {
 
        //创建连接mq的连接工厂对象
        final ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置主机
        connectionFactory.setHost("192.168.112.101");
        //设置端口号
        connectionFactory.setPort(5672);
        //设置连接虚拟主机
        connectionFactory.setVirtualHost("/test");
        //设置虚拟主机的用户名密码
        connectionFactory.setUsername("ssf2asdas6");
        connectionFactory.setPassword("204545454");
        //获取连接对象
        final Connection connection = connectionFactory.newConnection();
        //获取连接中的通道
        final Channel channel = connection.createChannel();
        //通道绑定对应的消息队列
        //参数1:队列名称,不存在则创建
        //参数2:定义队列特性是否需要持久化,true:持久化,false:非持久化
        //参数3:是否独占队列
        //参数4:是否在消费完成后删除队列
        //参数5:拓展附加参数
        channel.queueDeclare("demo1",false,false,false,null);
        //发布消息
        //参数1:交换机名称(exchange)
        //参数2:队列名称
        //参数3:传递消息额外设置
        //参数4:消息的具体内容
        channel.basicPublish("","demo1",null,"hello world".getBytes());
        //关闭资源
        channel.close();
        connection.close();

    }
}

你可以看到在队列里就多了一条消息了

 

4.建立消费者

public class Consumer {
 
    public static void main(String[] args) throws IOException, TimeoutException {
 
        //创建链接工厂
        final ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.112.101");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test");
        connectionFactory.setUsername("ssf2asdas6");
        connectionFactory.setPassword("204545454");
        //创建连接对象
        final Connection connection = connectionFactory.newConnection();
        //创建通道
        final Channel channel = connection.createChannel();
        channel.queueDeclare("demo1",false,false,false,null);
        //消费消息
        //参数1:消费队列的名称
        //参数2:开启消息的自动确认机制
        //参数3:消费时的回调接口
        channel.basicConsume("demo1",true, new DefaultConsumer(channel){
 
            //最后一个参数:消息队列中取出的消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 
                System.out.println(new String(body));
            }
        });
        //关闭资源,若不关闭则一直进行监听
        channel.close();
        connection.close();
    }
}

 

消费后就看到这里队列里的消息就清零了

 

SpringAMQP

Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

官方网址

https://spring.io/projects/spring-amqp

QuickStart

1. 初始化一个简单微服务项目

 

结构如下

 

2.编写yaml配置

无论你的消息发布者还是消息消费者都需要使用以下yaml配置

spring:
  rabbitmq:
    host: 192.168.112.101
    port: 5672
    virtual-host: /test
    username: sysdaa6
    password: 20asdsa16

3.发送消息

这里注意的是,你的队列一定要是存在的

@SpringBootTest
class PublisherApplicationTests {
 
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() {
 
        rabbitTemplate.convertAndSend("demo1","hello springAMQP!");

    }
}

4.接收消息(消费)

@Component
public class SpringRabbitListener {
 
    @RabbitListener(queues = "demo1")
    public void listenQueue(String msg){
 
        System.out.println(msg);
    }
}

5.启动测试

 

WorkQueue模型

使用工作队列模型

工作队列,可以提高消息处理速度,避免队列消息堆积

 

消息预取机制

由于消息预取机制的存在,消息会平均地分配给每一个消费者

 

修改消费者的yaml修改预取机制

spring:
  rabbitmq:
    host: 192.168.112.101
    port: 5672
    virtual-host: /test
    username: your username
    password: your password
    listener:
      simple:
        prefetch: 1 #表示每次只能获取一条消息,处理完才能获取下一条

消息发送者

@SpringBootTest
class PublisherApplicationTests {
 
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void testWorkQueue() throws InterruptedException {
 
        String queueName = "demo1";
        String msg = "this is the msg:";
        for (int i = 0; i <= 50; i++) {
 
            rabbitTemplate.convertAndSend(queueName,msg+i);
            Thread.sleep(50);
        }
    }
}

消息消费者

@Component
public class SpringRabbitListener {
 

    @RabbitListener(queues = "demo1")
    public void listenWorkQueue(String msg) throws InterruptedException {
 
        System.out.println("=====consumer 1:=====|"+ LocalDateTime.now());
        System.out.println(msg);
        Thread.sleep(50);
    }

    @RabbitListener(queues = "demo1")
    public void listenWorkQueue2(String msg) throws InterruptedException {
 
        System.err.println("=====consumer 2:=====|"+ LocalDateTime.now());
        System.err.println(msg);
        Thread.sleep(500);
    }
}

 

发布订阅模式

发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)

exchange负责消息路由,而不是存储,路由失败则消息丢失

 

常见exchange类型包括

  1. Fanout:广播
  2. Direct:路由
  3. Topic:话题

交换机的作用

  1. 接收publisher发送的消息
  2. 将消息按照规则路由到与之绑定的队列
  3. 不能缓存消息,路由失败,消息丢失
  4. FanoutExchange的会将消息路由到每个绑定的队列

消息消费者

定义一个 FanoutConfig 配置类进行交换机和队列的绑定

@Configuration
public class FanoutConfig {
 
    //交换机
    @Bean
    public FanoutExchange fanoutExchange() {
 
        return new FanoutExchange("test.fanout");
    }

    //队列1
    @Bean
    public Queue fanoutQueue1() {
 
        return new Queue("fanout.queue1");
    }

    //绑定队列1到交换机
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
 
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    //队列2
    @Bean
    public Queue fanoutQueue2() {
 
        return new Queue("fanout.queue2");
    }

    //绑定队列2到交换机
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
 
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

消息发送者

@Test
    void testSendFanoutExchange(){
 
        //交换机名称
        String exchangeName = "test.fanout";
        //消息
        String msg = "test for send FanoutExchange";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"",msg);
    }

 


 

 

路由模式

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

 

消息发送者

消息发送者中指明routerKey

@Test
    void testSendDirectExchange(){
 
        //交换机名称
        String exchangeName = "test.direct";
        //消息
        String msg = "test for router to black queue";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"black",msg);
    }
    @Test
    void testSendDirectExchange2(){
 
        //交换机名称
        String exchangeName = "test.direct";
        //消息
        String msg = "test for router to white queue";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"white",msg);
    }

消息消费者

定义一个监听组件使用注解形式指定队列名称,交换机名称和类型(默认direct),以及路由通道

@Component
public class SpringRabbitListener {
 

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "test.direct",type = ExchangeTypes.DIRECT),
            key = {
 "black","white"}
    ))
    public void listenDirectQueue1(String msg){
 
        System.out.println("direct queue1:"+msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "test.direct",type = ExchangeTypes.DIRECT),
            key = {
 "black","green"}
    ))
    public void listenDirectQueue2(String msg){
 
        System.out.println("direct queue2:"+msg);
    }
}

 


 


 

话题模式

TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以 . 分割。

如: person.zhangsan

Queue与Exchange指定BindingKey时可以使用通配符:

  • #:代指0个或多个单词
  • *:代指一个单词

 

消息发送者

@Test
    public void  testTopicExchange(){
 
        //交换机名称
        String exchangeName = "test.topic";
        //消息
        String msg = "test for topic in china shanghai";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"china.shanghai",msg);
    }

消息消费者

@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name ="test.topic",type = ExchangeTypes.TOPIC),
            key = "china.shanghai"
    ))
    public void listenTopicQueue1(String msg){
 
        System.out.println("topic:china.shanghai:"+msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name ="test.topic",type = ExchangeTypes.TOPIC),
            key = "american.newyork"
    ))
    public void listenTopicQueue2(String msg){
 
        System.out.println("topic:american.newyork:"+msg);
    }
结果

 


 



Tags:微服务   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
对于微服务架构监控应该遵守的原则
随着软件交付方式的变革,微服务架构的兴起使得软件开发变得更加快速和灵活。在这种情况下,监控系统成为了微服务控制系统的核心组成部分。随着软件的复杂性不断增加,了解系统的...【详细内容】
2024-04-03  Search: 微服务  点击:(5)  评论:(0)  加入收藏
PHP+Go 开发仿简书,实战高并发高可用微服务架构
来百度APP畅享高清图片//下栽のke:chaoxingit.com/2105/PHP和Go语言结合,可以开发出高效且稳定的仿简书应用。在实现高并发和高可用微服务架构时,我们可以采用一些关键技术。首...【详细内容】
2024-01-14  Search: 微服务  点击:(115)  评论:(0)  加入收藏
九条微服务最佳实践,你学会了哪条?
微服务之间连贯一致的代码库对于可维护性至关重要。保持代码成熟度相似,可确保系统统一演进,防止服务间出现性能、安全性和功能差异。在开发微服务时,我们需要遵循哪些最佳实践...【详细内容】
2024-01-05  Search: 微服务  点击:(99)  评论:(0)  加入收藏
Go微服务入门到容器化实践
Go微服务入门到容器化实践Go 是一门高效、现代化、快速增长的编程语言,非常适合构建 Web 应用程序。而 Docker 是一种轻量级的容器化技术,能够使得您的应用程序在任何地方运行...【详细内容】
2024-01-01  Search: 微服务  点击:(63)  评论:(0)  加入收藏
微服务全做错了!谷歌提出新方法,成本直接降为1/9!
2023,微服务“水逆”之年。长期以来,不管大厂还是小厂,微服务都被认为是云原生服务应用程序架构的事实标准,然而2023,不止那位37signals的DHH决心下云,放弃微服务,就连亚马逊和谷歌...【详细内容】
2023-12-29  Search: 微服务  点击:(121)  评论:(0)  加入收藏
微服务架构中的数据一致性
在微服务中,一个逻辑上原子操作可以经常跨越多个微服务。即使是单片系统也可能使用多个数据库或消息传递解决方案。使用多个独立的数据存储解决方案,如果其中一个分布式流程参...【详细内容】
2023-12-27  Search: 微服务  点击:(143)  评论:(0)  加入收藏
监控 Spring Cloud 微服务的实践方案
一、简介Spring Cloud是一个基于Spring Boot实现的微服务框架,它提供了丰富的微服务功能,如分布式配置、服务注册与发现、服务熔断、负载均衡等。为了更好地管理和监控这样复...【详细内容】
2023-12-19  Search: 微服务  点击:(145)  评论:(0)  加入收藏
聊聊微服务链路服务
微服务架构图片如果有用户反馈某个页面很慢,我们知道这个页面的请求调用链是 A -----> C -----> B -----> D(图片有误),怎么来定位是由哪个服务引起的问题呢? 更进一步,如果...【详细内容】
2023-12-15  Search: 微服务  点击:(127)  评论:(0)  加入收藏
选择适合微服务的编程语言,让你的工作事半功倍!
讨论编程语言就像是一场政治辩论。每个开发者都会过分捍卫他/她所使用的编程语言。然而,编程语言应该被看作是它们真正是的东西,即一种工作工具。每种编程语言都有特定的目的...【详细内容】
2023-12-14  Search: 微服务  点击:(178)  评论:(0)  加入收藏
Eureka: 微服务架构中不可或缺的服务治理工具
Eureka是Netflix开源的一款用于服务治理的工具,它是NetflixOSS(OpenSourceSoftware)项目的一部分,主要用于实现微服务架构中的服务注册与发现。在当今庞大而复杂的微服务系统中,E...【详细内容】
2023-12-14  Search: 微服务  点击:(193)  评论:(0)  加入收藏
▌简易百科推荐
Qt与Flutter:在跨平台UI框架中哪个更受欢迎?
在跨平台UI框架领域,Qt和Flutter是两个备受瞩目的选择。它们各自具有独特的优势,也各自有着广泛的应用场景。本文将对Qt和Flutter进行详细的比较,以探讨在跨平台UI框架中哪个更...【详细内容】
2024-04-12  刘长伟    Tags:UI框架   点击:(1)  评论:(0)  加入收藏
Web Components实践:如何搭建一个框架无关的AI组件库
一、让人又爱又恨的Web ComponentsWeb Components是一种用于构建可重用的Web元素的技术。它允许开发者创建自定义的HTML元素,这些元素可以在不同的Web应用程序中重复使用,并且...【详细内容】
2024-04-03  京东云开发者    Tags:Web Components   点击:(8)  评论:(0)  加入收藏
Kubernetes 集群 CPU 使用率只有 13% :这下大家该知道如何省钱了
作者 | THE STACK译者 | 刘雅梦策划 | Tina根据 CAST AI 对 4000 个 Kubernetes 集群的分析,Kubernetes 集群通常只使用 13% 的 CPU 和平均 20% 的内存,这表明存在严重的过度...【详细内容】
2024-03-08  InfoQ    Tags:Kubernetes   点击:(17)  评论:(0)  加入收藏
Spring Security:保障应用安全的利器
SpringSecurity作为一个功能强大的安全框架,为Java应用程序提供了全面的安全保障,包括认证、授权、防护和集成等方面。本文将介绍SpringSecurity在这些方面的特性和优势,以及它...【详细内容】
2024-02-27  风舞凋零叶    Tags:Spring Security   点击:(54)  评论:(0)  加入收藏
五大跨平台桌面应用开发框架:Electron、Tauri、Flutter等
一、什么是跨平台桌面应用开发框架跨平台桌面应用开发框架是一种工具或框架,它允许开发者使用一种统一的代码库或语言来创建能够在多个操作系统上运行的桌面应用程序。传统上...【详细内容】
2024-02-26  贝格前端工场    Tags:框架   点击:(47)  评论:(0)  加入收藏
Spring Security权限控制框架使用指南
在常用的后台管理系统中,通常都会有访问权限控制的需求,用于限制不同人员对于接口的访问能力,如果用户不具备指定的权限,则不能访问某些接口。本文将用 waynboot-mall 项目举例...【详细内容】
2024-02-19  程序员wayn  微信公众号  Tags:Spring   点击:(39)  评论:(0)  加入收藏
开发者的Kubernetes懒人指南
你可以将本文作为开发者快速了解 Kubernetes 的指南。从基础知识到更高级的主题,如 Helm Chart,以及所有这些如何影响你作为开发者。译自Kubernetes for Lazy Developers。作...【详细内容】
2024-02-01  云云众生s  微信公众号  Tags:Kubernetes   点击:(51)  评论:(0)  加入收藏
链世界:一种简单而有效的人类行为Agent模型强化学习框架
强化学习是一种机器学习的方法,它通过让智能体(Agent)与环境交互,从而学习如何选择最优的行动来最大化累积的奖励。强化学习在许多领域都有广泛的应用,例如游戏、机器人、自动驾...【详细内容】
2024-01-30  大噬元兽  微信公众号  Tags:框架   点击:(68)  评论:(0)  加入收藏
Spring实现Kafka重试Topic,真的太香了
概述Kafka的强大功能之一是每个分区都有一个Consumer的偏移值。该偏移值是消费者将读取的下一条消息的值。可以自动或手动增加该值。如果我们由于错误而无法处理消息并想重...【详细内容】
2024-01-26  HELLO程序员  微信公众号  Tags:Spring   点击:(88)  评论:(0)  加入收藏
SpringBoot如何实现缓存预热?
缓存预热是指在 Spring Boot 项目启动时,预先将数据加载到缓存系统(如 Redis)中的一种机制。那么问题来了,在 Spring Boot 项目启动之后,在什么时候?在哪里可以将数据加载到缓存系...【详细内容】
2024-01-19   Java中文社群  微信公众号  Tags:SpringBoot   点击:(86)  评论:(0)  加入收藏
站内最新
站内热门
站内头条