您当前的位置:首页 > 电脑百科 > 程序开发 > 编程百科

RabbitMQ如何实现延迟队列?

时间:2024-01-26 13:24:25  来源:微信公众号  作者: Java中文社群

延迟队列是指当消息被发送以后,并不是立即执行,而是等待特定的时间后,消费者才会执行该消息。

延迟队列的使用场景有以下几种:

  1. 未按时支付的订单,30 分钟过期之后取消订单。
  2. 给活跃度比较低的用户间隔 N 天之后推送消息,提高活跃度。
  3. 新注册会员的用户,等待几分钟之后发送欢迎邮件等。

一、如何实现延迟队列?

延迟队列有以下两种实现方式:

  1. 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能;
  2. 使用官方提供的延迟插件实现延迟功能。

早期,大部分公司都会采用第一种方式,而随着 RabbitMQ 3.5.7(2015 年底发布)的延迟插件的发布,因为其使用更简单、更方便,所以它现在才是大家普通会采用的,实现延迟队列的方式,所以本文也只讲第二种方式。

二、实现延迟队列

1、安装并启动延迟队列

(1)下载延迟插件

https://Github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases.

注意:需要根据你自己的 RabbitMQ 服务器端版本选择相同版本的延迟插件,可以在 RabbitMQ 控制台查看:

(2)将插件放到插件目录

接下来,将上一步下载的插件放到 RabbitMQ 服务器安装目录,如果是 Docker,使用一下命令复制:

docker cp 宿主机文件 容器名称或ID:容器目录

如下图所示:

之后,进入 docker 容器,查看插件中是否包含延迟队列:

docker exec -it 容器名称或ID /bin/bash rabbitmq-plugins list

如下图所示:

(3)启动插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

如下图所示:

(4)重启RabbitMQ服务

安装完 RabbitMQ 插件之后,需要重启 RabbitMQ 服务才能生效。如果使用的是 Docker,只需要重启 Docker 容器即可:

docker restart 容器名称或ID

如下图所示:

(5)验收结果

在 RabbitMQ 控制台查看,新建交换机时是否有延迟消息选项,如果有就说明延迟消息插件已经正常运行了,如下图所示:

(6)手动创建延迟交换器(可选)

此步骤可选(非必须),因为某些版本下通过程序创建延迟交换器可能会出错,如果出错了,手动创建延迟队列即可,如下图所示:

2、编写延迟消息实现代码

(1)配置交换器和队列

import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;

/**
 * 延迟交换器和队列
 */
@Configuration
public class DelayedExchangeConfig {
    public static final String EXCHANGE_NAME = "myDelayedExchange";
    public static final String QUEUE_NAME = "delayed.queue";
    public static final String ROUTING_KEY = "delayed.routing.key";

    @Bean
    public CustomExchange delayedExchange() {
        return new CustomExchange(EXCHANGE_NAME,
                "x-delayed-message", // 消息类型
                true, // 是否持久化
                false); // 是否自动删除
    }

    @Bean
    public Queue delayedQueue() {
        return QueueBuilder.durable(QUEUE_NAME)
                .withArgument("x-delayed-type", "direct")
                .build();
    }

    @Bean
    public Binding delayedBinding(Queue delayedQueue,CustomExchange delayedExchange) {
        return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();
    }
}

(2)定义消息发送方法

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class DelayedMessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Scheduled(fixedDelay = 5000)
    public void sendDelayedMessage(String message) {
        rabbitTemplate.convertAndSend(DelayedExchangeConfig.EXCHANGE_NAME,
                DelayedExchangeConfig.ROUTING_KEY,
                message,
                messagePostProcessor -> {
                    messagePostProcessor.getMessageProperties().setDelay(10000); // 设置延迟时间,单位毫秒
                    return messagePostProcessor;
                });
    }
}

(3)发送延迟消息

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMApping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/delayed")
public class DelayedMessageController {
    @Autowired
    private DelayedMessageProducer delayedMessageProducer;

    @GetMapping("/send")
    public String sendDirectMessage(@RequestParam String message) {
        delayedMessageProducer.sendDelayedMessage(message);
        return "Delayed message sent to Exchange: " + message;
    }
}(4)接收延迟消息
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class DelayedMessageConsumer {

    @RabbitListener(queues = DelayedExchangeConfig.QUEUE_NAME)
    public void receiveDelayedMessage(String message) {
        System.out.println("Received delayed message: " + message);
    }
}

小结

实现 RabbitMQ 延迟队列目前主流的实现方式,是采用官方提供的延迟插件来实现。而延迟插件需要先下载插件、然后配置并重启 RabbitMQ 服务,之后就可以通过编写代码的方式实现延迟队列了。



Tags:RabbitMQ   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除。
▌相关推荐
RabbitMQ如何实现延迟队列?
延迟队列是指当消息被发送以后,并不是立即执行,而是等待特定的时间后,消费者才会执行该消息。延迟队列的使用场景有以下几种: 未按时支付的订单,30 分钟过期之后取消订单。 给活...【详细内容】
2024-01-26  Tags: RabbitMQ  点击:(2)  评论:(0)  加入收藏
RabbitMQ消息顺序性解密:保证消息的正确顺序
在分布式系统中,保证消息的正确顺序对于一些应用场景至关重要。而RabbitMQ作为一种流行的消息队列系统,本身并不提供严格的消息顺序保证。下面将探讨如何在使用RabbitMQ时实现...【详细内容】
2023-12-04  Tags: RabbitMQ  点击:(60)  评论:(0)  加入收藏
RabbitMQ与消息限流策略的完美结合
在当今互联网时代,高并发访问已成为许多应用系统面临的常见挑战之一。对于需要处理大量请求的系统来说,如何保证系统的稳定性和可靠性是一个关键问题。RabbitMQ作为一种可靠的...【详细内容】
2023-11-27  Tags: RabbitMQ  点击:(97)  评论:(0)  加入收藏
实时协作的秘诀:RabbitMQ与WebSockets的结合
实时协作是现代软件开发中非常重要的一个方面。为了实现实时协作,一种常见的做法是将消息队列与WebSocket技术相结合。其中,RabbitMQ是一个功能强大的消息队列系统,它能够有效...【详细内容】
2023-11-21  Tags: RabbitMQ  点击:(117)  评论:(0)  加入收藏
RabbitMQ中的消息持久化策略与存储优化实践
本文将介绍RabbitMQ中的消息持久化策略,并提供一些存储优化的实践方法,帮助您确保消息的可靠性和系统的性能。在RabbitMQ消息队列中,消息的可靠性传输和持久化是非常重要的。下...【详细内容】
2023-11-15  Tags: RabbitMQ  点击:(126)  评论:(0)  加入收藏
Centos7下安装部署RabbitMQ,看这篇就够了
前言RabbitMQ是一个开源的强大的企业消息系统,支持主流的操作系统,支持多种开发语言。我们项目中使用RabbitMQ作为消息队列,解耦业务,构建高可靠的消息队列系统。RabbitMQ可以...【详细内容】
2023-11-09  Tags: RabbitMQ  点击:(201)  评论:(0)  加入收藏
RabbitMQ发送和接收消息的几种方式
channel.basicQos(0, 1, false):0表示对消息的大小无限制,1表示每次只允许消费一条,false表示该限制不作用于channel。同时,我们采用手工ACK的方式,因为我们配置文件配置了 spri...【详细内容】
2023-11-08  Tags: RabbitMQ  点击:(177)  评论:(0)  加入收藏
RabbitMQ的四种交换机详解
交换机主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:Direct, topic, Headers and Fanout。图片一、to...【详细内容】
2023-11-06  Tags: RabbitMQ  点击:(192)  评论:(0)  加入收藏
深入浅出RabbitMQ:顺序消费、死信队列和延时队列
1. 引言在今天的文章中,我们来聊一聊 RabbitMQ,这是小 ❤ 在工作中用的最早的消息中间件,主要用于大量数据的异步消费。2. RabbitMQ2.1 核心组件RabbitMQ 是一个开源的消息中间...【详细内容】
2023-11-03  Tags: RabbitMQ  点击:(114)  评论:(0)  加入收藏
在Linux系统中实现容器化的消息中间件:RabbitMQ和Kafka
消息中间件在现代分布式系统中起着至关重要的作用。它们可以在不同的应用程序之间实现可靠的异步通信,提供高吞吐量、低延迟和可扩展性。下面将介绍如何在Linux系统中使用容...【详细内容】
2023-09-08  Tags: RabbitMQ  点击:(305)  评论:(0)  加入收藏
▌简易百科推荐
手把手教你为开源项目贡献代码
背景前段时间无意间看到一篇公众号 招贤令:一起来搞一个新开源项目,作者介绍他想要做一个开源项目:cprobe 用于整合目前市面上散落在各地的 Exporter,统一进行管理。比如我们常...【详细内容】
2024-01-26  crossoverJie  微信公众号  Tags:开源项目   点击:(2)  评论:(0)  加入收藏
RabbitMQ如何实现延迟队列?
延迟队列是指当消息被发送以后,并不是立即执行,而是等待特定的时间后,消费者才会执行该消息。延迟队列的使用场景有以下几种: 未按时支付的订单,30 分钟过期之后取消订单。 给活...【详细内容】
2024-01-26   Java中文社群  微信公众号  Tags:RabbitMQ   点击:(2)  评论:(0)  加入收藏
低代码开发:Nacos配置详解,如何确保平台跳转正常运作
在当今低代码开发的时代,平台的易用性和灵活性非常重要。右上角平台跳转作为用户界面中常见的交互元素,对于提高用户体验具有举足轻重的地位。然而,有时候我们会遇到跳转失效的...【详细内容】
2024-01-25  软件部长    Tags:Nacos   点击:(1)  评论:(0)  加入收藏
不容错过的4款宝藏GPTs:程序员新宠,让编程不再枯燥!
原文来源:硬 AI自从GPT Store“开业”以来,我们就一直在使用不同的GPTs,尝试自动化一些重复繁琐的日常工作。在浩如烟海的GPTs中,虽然真正有用的并不多,很多GPTs的功能都比较局限...【详细内容】
2024-01-24  AI新智界    Tags:GPTs   点击:(4)  评论:(0)  加入收藏
开发者如何使用Postgres扩展,包括AI应用?
作者 | Richard MacManus编译 | 言征一家名为Tembo的公司鼓励开发人员在Postgres之上进行构建,使用越来越多的Postgres扩展。PostgreSQL(通常称为Postgres)是一个流行的免费开...【详细内容】
2024-01-24    51CTO  Tags:Postgres扩展   点击:(6)  评论:(0)  加入收藏
从0实现React18
要从零开始实现React 18,需要理解React的核心概念和一些主要特性。以下是一个简要的步骤:1. 了解React的基本概念: 组件: React应用的基本构建块。组件可以是函数组件(Functional...【详细内容】
2024-01-22  阿小白    Tags:React   点击:(7)  评论:(0)  加入收藏
让你的Pandas代码快得离谱的两个技巧
如果你曾经使用过Pandas处理表格数据,你可能会熟悉导入数据、清洗和转换的过程,然后将其用作模型的输入。然而,当你需要扩展和将代码投入生产时,你的Pandas管道很可能开始崩溃并...【详细内容】
2024-01-19  郭小喵玩AI  微信公众号  Tags:Pandas   点击:(10)  评论:(0)  加入收藏
未来世界的12个软件开发预测
译者 | 李睿审校 | 重楼预测软件开发的未来趋势通常是一件困难的事情。因为人们总是期望软件开发领域中的新兴趋势和频繁的变化能够满足市场不断增长的期望。这样的趋势也将...【详细内容】
2024-01-15    51CTO  Tags:软件开发   点击:(11)  评论:(0)  加入收藏
什么是 DevOps?解读 IT 文化革命的目的和重要性
DevOps 将运维和开发相结合以提供持续的软件改进,可以降低复杂性并提高应用程序输出。 什么是 DevOps?DevOps 是组织用来创建和交付应用程序和服务的灵活实践和流程的集合,通过...【详细内容】
2024-01-12  35岁职场危机  今日头条  Tags:DevOps   点击:(9)  评论:(0)  加入收藏
深入理解与应用多线程技术
如果synchronized​作用于代码块,反编译可以看到两个指令:monitorenter、monitorexit,JVM​使用monitorenter和monitorexit​两个指令实现同步;如果作用synchronized​作用于方...【详细内容】
2024-01-09  一安未来  微信公众号  Tags:多线程技术   点击:(18)  评论:(0)  加入收藏
站内最新
站内热门
站内头条