您当前的位置:首页 > 电脑百科 > 程序开发 > 架构

RocketMq 消息重试机制、死信队列

时间:2022-10-07 20:07:03  来源:稀土掘金  作者:索码理

消息队列中的消息消费时并不能保证总是成功的,那失败的消息该怎么进行消息补偿呢?这就用到今天的主角消息重试和死信队列了。

生产者消息重试

有时因为网路等原因生产者也可能发送消息失败,也会进行消息重试,生产者消息重试比较简单,在springboot中只要在配置文件中配置一下就可以了。

# 异步消息发送失败重试次数,默认为2
rocketmq.producer.retry-times-when-send-async-fAIled=2
# 消息发送失败重试次数,默认为2
rocketmq.producer.retry-times-when-send-failed=2

也可以通过下面这种方式配置

DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
defaultMQProducer.setRetryTimesWhenSendFailed(2);
defaultMQProducer.setRetryTimesWhenSendAsyncFailed(2);

消费者消息重试

Apache RocketMQ 有两种消费模式:集群消费模式和广播消费模式。消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

同时RocketMq Push消费提供了两种消费方式:并发消费和顺序消费。

并发消费

在并发消费中,可能会有多个线程同时消费一个队列的消息,因此即使发送端通过发送顺序消息保证消息在同一个队列中按照FIFO的顺序,也无法保证消息实际被顺序消费,所有并发消费也可以称之为无序消费。

顺序消费

顺序消费是消息生产者发送过来的消息会遵循FIFO队列的思想,先进先出有顺序的消费消息。 对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。

并发消费和顺序消费区别

顺序消费和并发消费的重试机制并不相同,顺序消费消费失败后会先在客户端本地重试直到最大重试次数,这样可以避免消费失败的消息被跳过,消费下一条消息而打乱顺序消费的顺序,而并发消费消费失败后会将消费失败的消息重新投递回服务端,再等待服务端重新投递回来,在这期间会正常消费队列后面的消息。

并发消费失败后并不是投递回原Topic,而是投递到一个特殊Topic,其命名为%RETRY%ConsumerGroupName,集群模式下并发消费每一个ConsumerGroup会对应一个特殊Topic,并会订阅该Topic。

两者参数差别如下

消费类型

重试间隔

最大重试次数

顺序消费

间隔时间可通过自定义设置,
SuspendCurrentQueueTimeMillis

最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。该参数取值无最大限制。若未设置参数值,默认最大重试次数为Integer.MAX

并发消费

间隔时间根据重试次数阶梯变化,取值范围:1秒~2小时。不支持自定义配置

最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。默认值为16次,该参数取值无最大限制,建议使用默认值

并发消费重试间隔如下:

第几次重试

与上次重试的间隔时间

第几次重试

与上次重试的间隔时间

1

10s

9

7min

2

30s

10

8min

3

1min

11

9min

4

2min

12

10min

5

3min

13

20min

6

4min

14

30min

7

5min

15

1h

8

6min

16

2h

死信队列

当一条消息初次消费失败,RocketMQ会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时,该消息不会立刻被丢弃,而是将其发送到该消费者对应的特殊队列中,这类消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue),死信队列是死信Topic下分区数唯一的单独队列。如果产生了死信消息,那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,死信队列的消息将不会再被消费。可以利用RocketMQ Admin工具或者RocketMQ Dashboard上查询到对应死信消息的信息。

实践出真知

Talk is cheap,show you the code.

公共部分创建

1.配置文件

rocketmq.name-server=localhost:9876
# 消费者组
rocketmq.producer.group=producer_group

rocketmq.consumer.topic=consumer_topic
rocketmq.consumer.group=consumer_group

2.创建消费者RetryConsumerDemo

@Component
public class RetryConsumerDemo {

    @Value("${rocketmq.name-server}")
    private String namesrvAddr;

    @Value("${rocketmq.consumer.topic}")
    private String topic;

    @Value("${rocketmq.consumer.group}")
    private String consumerGroup;

    private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");

    @PostConstruct
    public void start() {
        try {
            consumer.setNamesrvAddr(namesrvAddr);
            //设置集群消费模式
            consumer.setMessageModel(MessageModel.CLUSTERING);

            //设置消费超时时间(分钟)
            consumer.setConsumeTimeout(1);

            //订阅主题
            consumer.subscribe(topic , "*");

            //注册消息监听器
            consumer.registerMessageListener(new MessageListenerConcurrentlyImpl());

            //最大重试次数
            consumer.setMaxReconsumeTimes(2);

            //启动消费端
            consumer.start();
            System.out.println("Retry Consumer Start...");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
}

测试并发消费

1.创建并发消费监听类 并发消费监听类要实现
MessageListenerConcurrently类

public class MessageListenerConcurrentlyImpl implements MessageListenerConcurrently {

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

        if (CollectionUtils.isEmpty(msgs)) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

        MessageExt message = msgs.get(0);
        try {
            final LocalDateTime now = LocalDateTime.now();
            //逐条消费
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("当前时间:"+now+", messageId: " + message.getMsgId() + ",topic: " +
                    message.getTopic()  + ",messageBody: " + messageBody);

            //模拟消费失败
            if ("Concurrently_test".equals(messageBody)) {
                int a = 1 / 0;
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
}

2.注册监听类 在消费者类RetryConsumerDemo中注册监听类

//注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrentlyImpl());

3.测试

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RocketmqApplication.class)
class RocketmqApplicationTests {

    @Value("${rocketmq.consumer.topic}")
    private String topic;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void testProducer(){
        String msg = "Concurrently_test";
        rocketMQTemplate.convertAndSend(topic , msg);
    }
}

测试结果:

 

后面重试时间太长就不做测试了,可以看到并发消费的消息时间都是按照上面那张时间间隔表来。

然后通过RocketMq Dashboard Topic一栏可以看到有一个重试消费者组%RETRY%consumer_group,这个消费者组内存放的就是consumer_group消费者组消费失败重试的消息。

 

并发消费的重试次数是可以修改的,重试次数对应参数DefaultMQPushConsumer类的maxReconsumeTimes属性,maxReconsumeTimes默认是-1,也就是默认会重试16次;0代表不重试,只要失败就会放入死信队列;1-16重试次数对应着上面时间间隔表中对应次数。配置的最大重试次数超过16就按16处理。

并发消费状态

并发消费有两个状态CONSUME_SUCCESS和RECONSUME_LATER。返回CONSUME_SUCCESS代表着消费成功,返回RECONSUME_LATER代表进行消息重试。

public enum ConsumeConcurrentlyStatus {
    /**
     * Success consumption
     */
    CONSUME_SUCCESS,
    /**
     * Failure consumption,later try to consume
     */
    RECONSUME_LATER;
}


MessageListenerConcurrently接口的consumeMessage方法返回ConsumeConcurrentlyStatus#RECONSUME_LATER、null或者方法抛异常了,都会进行消息重试。当然还是推荐返回ConsumeConcurrentlyStatus#RECONSUME_LATER。

测试顺序消费

顺序消费和并行消费其实都差不多的,只不过顺序消费实现的是MessageListenerOrderly 接口

1.创建顺序消费监听类

public class MessageListenerOrderlyImpl implements MessageListenerOrderly {

    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        if (CollectionUtils.isEmpty(msgs)) {
            return ConsumeOrderlyStatus.SUCCESS;
        }

        MessageExt message = msgs.get(0);
        try {
            final LocalDateTime now = LocalDateTime.now();
            //逐条消费
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("当前时间:"+now+", messageId: " + message.getMsgId() + ",topic: " +
                    message.getTopic()  + ",messageBody: " + messageBody);

            //模拟消费失败
            if ("Orderly_test".equals(messageBody)) {
                int a = 1 / 0;
            }

            return ConsumeOrderlyStatus.SUCCESS;
        } catch (Exception e) {
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }
}

2.注册监听类

//最大重试次数
consumer.setMaxReconsumeTimes(2);
//顺序消费 重试时间间隔
consumer.setSuspendCurrentQueueTimeMillis(2000);


SuspendCurrentQueueTimeMillis表示重试的时间间隔,默认是1s,这里修改成2s

3.测试

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RocketmqApplication.class)
class RocketmqApplicationTests {

    @Value("${rocketmq.consumer.topic}")
    private String topic;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void testProducer(){
        String msg = "Orderly_test";
        rocketMQTemplate.convertAndSend(topic , msg);
    }
}

测试结果:

 

可以看到三条结果,第一条是第一次消费的,其余两条是隔了2s重试的。重试2次之后这条数据就进入了死信队列。

顺序消费状态

顺序消费目前也是两个状态:SUCCESS和
SUSPEND_CURRENT_QUEUE_A_MOMENT。SUSPEND_CURRENT_QUEUE_A_MOMENT意思是先暂停消费一下,过SuspendCurrentQueueTimeMillis时间间隔后再重试一下,而不是放到重试队列里。

public enum ConsumeOrderlyStatus {
    /**
     * Success consumption
     */
    SUCCESS,
    /**
     * Rollback consumption(only for binlog consumption)
     */
    @Deprecated
    ROLLBACK,
    /**
     * Commit offset(only for binlog consumption)
     */
    @Deprecated
    COMMIT,
    /**
     * Suspend current queue a moment
     */
    SUSPEND_CURRENT_QUEUE_A_MOMENT;
}

测试死信队列

并发消费和顺序消费达到了最大重试次数之后就会放到死信队列。死信队列在一开始是不会被创建的,只有需要的时候才会被创建。就拿上面测试结果来看,进入到的死信队列就是%DLQ%consumer_group,进入死信队列的消息要收到处理。

 

死信队列特性

  • 不会再被消费者正常消费。
  • 一个死信队列对应一个分组, 而不是对应单个消费者实例。
  • 如果一个消费者组未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
  • 一个死信队列包含了对应 分组产生的所有死信消息,不论该消息属于哪个 Topic。
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理

作者:索码理
链接:
https://juejin.cn/post/7151571345199874084
来源:稀土掘金



Tags:RocketMq   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
如何应对 RocketMQ 消息堆积
这篇文章,我们聊聊如何应对 RocketMQ 消息堆积。图片1 基础概念消费者在消费的过程中,消费的速度跟不上服务端的发送速度,未处理的消息会越来越多,消息出现堆积进而会造成消息消...【详细内容】
2023-12-21  Search: RocketMq  点击:(71)  评论:(0)  加入收藏
Apache RocketMQ 5.0腾讯云落地实践
Apache RocketMQ 发展历程回顾RocketMQ 最早诞生于淘宝的在线电商交易场景,经过了历年双十一大促流量洪峰的打磨,2016年捐献给 Apache 社区,成为 Apache 社区的顶级项目,并在国...【详细内容】
2023-12-13  Search: RocketMq  点击:(134)  评论:(0)  加入收藏
聊聊 RocketMQ 5.0 的 POP 消费模式!
大家都知道,RocketMQ 消费模式有 PULL 模式和 PUSH 模式,不过本质上都是 PULL 模式,而在实际使用时,一般使用 PUSH 模式。不过,RocketMQ 的 PUSH 模式有明显的不足,主要体现在以下...【详细内容】
2023-05-16  Search: RocketMq  点击:(303)  评论:(0)  加入收藏
40个定时任务!这次带你彻底理解 RocketMQ 设计精髓!
今天来分享 RocketMQ 的定时任务。通过这些定时任务,能让我们更加理解 RocketMQ 的消息处理机制和设计理念。从 RocketMQ 4.9.4 的源代码上看,RocketMQ 的定时任务有很多,今天...【详细内容】
2023-01-29  Search: RocketMq  点击:(207)  评论:(0)  加入收藏
RocketMQ 存储设计到底强在哪?
引言对于一款消息中间件来说,优良的数据存储设计,是实现高性能消息吞吐以及消息查询的关键所在。因为消息中间件对于外部来说就是发消息消费消息的一个平台基础设施,但是从其本...【详细内容】
2022-10-27  Search: RocketMq  点击:(384)  评论:(0)  加入收藏
RocketMq 消息重试机制、死信队列
消息队列中的消息消费时并不能保证总是成功的,那失败的消息该怎么进行消息补偿呢?这就用到今天的主角消息重试和死信队列了。生产者消息重试有时因为网路等原因生产者也可能发...【详细内容】
2022-10-07  Search: RocketMq  点击:(288)  评论:(0)  加入收藏
5张图带你理解 RocketMQ 顺序消息实现机制
作者:君哥聊技术来源: https://mp.weixin.qq.com/s/n9QlZ73SQyCGIyPLvHMy0A大家好,我是君哥。今天聊一聊 RocketMQ 的顺序消息实现机制。在有些场景下,使用 MQ 需要保证消息的顺...【详细内容】
2022-06-29  Search: RocketMq  点击:(302)  评论:(0)  加入收藏
▌简易百科推荐
对于微服务架构监控应该遵守的原则
随着软件交付方式的变革,微服务架构的兴起使得软件开发变得更加快速和灵活。在这种情况下,监控系统成为了微服务控制系统的核心组成部分。随着软件的复杂性不断增加,了解系统的...【详细内容】
2024-04-03  步步运维步步坑    Tags:架构   点击:(5)  评论:(0)  加入收藏
大模型应用的 10 种架构模式
作者 | 曹洪伟在塑造新领域的过程中,我们往往依赖于一些经过实践验证的策略、方法和模式。这种观念对于软件工程领域的专业人士来说,已经司空见惯,设计模式已成为程序员们的重...【详细内容】
2024-03-27    InfoQ  Tags:架构模式   点击:(13)  评论:(0)  加入收藏
哈啰云原生架构落地实践
一、弹性伸缩技术实践1.全网容器化后一线研发的使用问题全网容器化后一线研发会面临一系列使用问题,包括时机、容量、效率和成本问题,弹性伸缩是云原生容器化后的必然技术选择...【详细内容】
2024-03-27  哈啰技术  微信公众号  Tags:架构   点击:(10)  评论:(0)  加入收藏
DDD 与 CQRS 才是黄金组合
在日常工作中,你是否也遇到过下面几种情况: 使用一个已有接口进行业务开发,上线后出现严重的性能问题,被老板当众质疑:“你为什么不使用缓存接口,这个接口全部走数据库,这怎么能扛...【详细内容】
2024-03-27  dbaplus社群    Tags:DDD   点击:(12)  评论:(0)  加入收藏
高并发架构设计(三大利器:缓存、限流和降级)
软件系统有三个追求:高性能、高并发、高可用,俗称三高。本篇讨论高并发,从高并发是什么到高并发应对的策略、缓存、限流、降级等。引言1.高并发背景互联网行业迅速发展,用户量剧...【详细内容】
2024-03-13    阿里云开发者  Tags:高并发   点击:(6)  评论:(0)  加入收藏
如何判断架构设计的优劣?
架构设计的基本准则是非常重要的,它们指导着我们如何构建可靠、可维护、可测试的系统。下面是这些准则的转换表达方式:简单即美(KISS):KISS原则的核心思想是保持简单。在设计系统...【详细内容】
2024-02-20  二进制跳动  微信公众号  Tags:架构设计   点击:(36)  评论:(0)  加入收藏
详解基于SpringBoot的WebSocket应用开发
在现代Web应用中,实时交互和数据推送的需求日益增长。WebSocket协议作为一种全双工通信协议,允许服务端与客户端之间建立持久性的连接,实现实时、双向的数据传输,极大地提升了用...【详细内容】
2024-01-30  ijunfu  今日头条  Tags:SpringBoot   点击:(19)  评论:(0)  加入收藏
PHP+Go 开发仿简书,实战高并发高可用微服务架构
来百度APP畅享高清图片//下栽のke:chaoxingit.com/2105/PHP和Go语言结合,可以开发出高效且稳定的仿简书应用。在实现高并发和高可用微服务架构时,我们可以采用一些关键技术。首...【详细内容】
2024-01-14  547蓝色星球    Tags:架构   点击:(115)  评论:(0)  加入收藏
GraalVM与Spring Boot 3.0:加速应用性能的完美融合
在2023年,SpringBoot3.0的发布标志着Spring框架对GraalVM的全面支持,这一支持是对Spring技术栈的重要补充。GraalVM是一个高性能的多语言虚拟机,它提供了Ahead-of-Time(AOT)编...【详细内容】
2024-01-11    王建立  Tags:Spring Boot   点击:(124)  评论:(0)  加入收藏
Spring Boot虚拟线程的性能还不如Webflux?
早上看到一篇关于Spring Boot虚拟线程和Webflux性能对比的文章,觉得还不错。内容较长,抓重点给大家介绍一下这篇文章的核心内容,方便大家快速阅读。测试场景作者采用了一个尽可...【详细内容】
2024-01-10  互联网架构小马哥    Tags:Spring Boot   点击:(118)  评论:(0)  加入收藏
站内最新
站内热门
站内头条