您当前的位置:首页 > 电脑百科 > 程序开发 > 架构

好奇延时消息如何实现吗?来看RocketMQ是怎么做的-源码解读

时间:2022-08-03 17:08:55  来源:  作者:杂文论

一、背景

之前在做电商相关业务的时候,有一个常见的需求场景是:用户下单之后,超过半小时不支付,就取消订单。现在我们在淘宝京东买东西,或者通过美团点外卖,下单之后,如果不在指定时间内支付,订单也会取消。 那么,如何实现这样的超时取消逻辑呢,通过消息队列的延时消息,是一个非常稳定的实现方案。

RocketMQ 就提供了这样的延时消息功能,producer 端在发送消息时,设置延迟级别,从秒级到分钟小时等等。消息在发送之后,会在消息队列的服务器进行存储。等过了设定的延迟时间之后,消息才会被consumer端消费到。

 

如果我们在下单的时候,发送一条设置延时30分钟的消息,这条消息会在30分钟之后被下游系统消费,然后判断订单有没有支付,如果没有支付,则取消订单。那么这样,通过消息队列就完成了一个延迟取消的逻辑了。

二、原理

设置延时

先来看一下如何设置消息的延时 消息体可以通过setDelayTimeLevel方法来设置延时级别

public void produce() {
    Message msg = new Message("TopicTest",
        "TagA",
        "OrderID188",
        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
    msg.setDelayTimeLevel(1)
    SendResult sendResult = producer.send(msg);
}

public void consume() {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
    consumer.subscribe("TopicTest", "TagA");
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
}

其实是将延迟信息存到 Message 的 property 中(property是一个保存meta信息的hashmap)

public void setDelayTimeLevel(int level) {
    this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}

void putProperty(final String name, final String value) {
    if (null == this.properties) {
        this.properties = new HashMap<String, String>();
    }

    this.properties.put(name, value);
}

之后,broker收到 message之后,会根据 message 中设置的延时级别进行处理 可以看看延时级别的具体情况: 一共分为18个级别(1-18),对应时间从1s到2h

public class MessageStoreConfig {
    
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

}

那么整个系统是怎么做到让consumer在设定的延时之后,开始消费指定消息的呢?

不得不说,RocketMQ 的设计还是挺巧妙的,我们接着往下看。

消息预存

对于broker收到的延时消息,并不是和普通消息一样,进入发送端指定的topic中, 而是进入专门的延时topic中,延时topic有18条队列(queueId 编号0-17),queueId 和 delayLevel 的关系是 queueId + 1 = delayLevel,是一一对应的。所以计算延时消息的待执行时间deliverTimestamp之后,会将消息存入对应延时级别的队列中。

// 如果是延迟消息
if (msg.getDelayTimeLevel() > 0) {
    if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
        msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
    }
    
    // 重设延迟消息的topic和queueId,topic为指定的RMQ_SYS_SCHEDULE_TOPIC
    topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

    ...
    // 将实际的指定topic和queueId进行存入property,进行备份
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));        
    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

    msg.setTopic(topic);
    msg.setQueueId(queueId);
}

之后,会由ScheduleMessageService来进行任务处理。ScheduleMessageService是broker启动时就开始执行的,用来处理延迟队列中的消息,处理的逻辑如下所示。

public class ScheduleMessageService extends ConfigManager {

    // key: delayLevel | value: delayTimeMillis 
    private final ConcurrentMap<Integer, Long> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32);

    public void start() {
        // 创建一个Timer,用于执行定时任务
        this.timer = new Timer("ScheduleMessageTimerThread", true);
            
        // 这里对每个delayLevel的queue都创建一个DeliverDelayedMessageTimerTask,
        // 用来处理对应queue中的消息
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }

            if (timeDelay != null) {
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }
    }
    
}

ScheduleMessageService启动之后,会根据延时队列的数目创建一一对应的
DeliverDelayedMessageTimerTask,然后周期执行。该类继承自TimerTask,是JDK的工具类,用于执行定时任务,原理可以参考这篇文章 如何实现定时任务- JAVA Timer/TimerTask 源码原理解析

消息转投

可以看到
DeliverDelayedMessageTimerTask实现的 run 方法,主要逻辑都在executeOnTimeup方法中,从对应的延迟队列中取出时间已到的 message,发送到 message 对应原始topic的队列中。只要队列没有发生消费积压,message 就会马上被消费了。(这部分的代码实现比较复杂,感兴趣可以去看对应的源码)

class DeliverDelayedMessageTimerTask extends TimerTask {
    private final int delayLevel;
    private final long offset;

    public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
        this.delayLevel = delayLevel;
        this.offset = offset;
    }

    @Override
    public void run() {
        try {
            if (isStarted()) {
                this.executeOnTimeup();
            }
        } catch (Exception e) {
            // XXX: warn and notify me
            log.error("ScheduleMessageService, executeOnTimeup exception", e);
            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
        }
    }

    public void executeOnTimeup() {
        ConsumeQueue cq =
            ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                delayLevel2QueueId(delayLevel));

        long failScheduleOffset = offset;

        if (cq != null) {
            
            // 这部分是核心逻辑,实现的是 从延时消息队列中取出 deliverTimestamp - now <= 0 的消息,
            // 将消息从延时queue移到原本指定Topic的queue中,这些消息就马上会被consumer消费。
        } 

        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
            failScheduleOffset), DELAY_FOR_A_WHILE);
    }
}

总体的原理示意图,如下所示:

 

broker 在接收到延时消息的时候,会将延时消息存入到延时TOPIC的队列中,然后ScheduleMessageService中,每个 queue 对应的定时任务会不停地被执行,检查 queue 中哪些消息已到设定时间,然后转发到消息的原始TOPIC,这些消息就会被各自的 producer 消费了。

三、拓展-消费重试

平常在使用RocketMQ的时候,一般会依赖consumer的消费重试功能。 而consumer端的消费重试,其实也是通过这个和延时队列差不多的原理来实现的。

public void consume() {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
    consumer.subscribe("TopicTest", "TagA");
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            // 这里如果返回RECONSUME_LATER,就会重试消费
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
}

RocketMQ规定,以下三种情况统一按照消费失败处理并会发起重试。

  • 业务消费方返回ConsumeConcurrentlyStatus.RECONSUME_LATER
  • 业务消费方返回null
  • 业务消费方主动/被动抛出异常

业务代码中,一般会利用重试功能去做下游逻辑的重试。而RocketMQ的重试并不是固定时间间隔重复进行,二是采取的退避式重试,重试的时间间隔会不断变长。 这个时间间隔,和设置delayLevel的延时类似。

Consumer客户端会通过processConsumeResult方法处理每一条消息的消费结果,如果判断需要进行重试,则会通过sendMessageBack方法将消息发送到broker,重试消息会带上已重试次数的信息。

broker收到消息之后,SendMessageProcessor会对重试消息进行处理,设置topic为RETRY_TOPIC,具体逻辑如下:

public class SendMessageProcessor
    extends AbstractSendMessageProcessor
    implements.NETtyRequestProcessor {

    private RemotingCommand asyncConsumerSendMsgBack(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {

        // 给重试消息设置新的topic
        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
        int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();

        // 根据已经发生重试的次数确定delayLevel
        if (0 == delayLevel) {
            delayLevel = 3 + msgExt.getReconsumeTimes();
        }
        msgExt.setDelayTimeLevel(delayLevel);

        // 重试次数+1
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

        // 存储消息
        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

        // ...
    }   
}

存储消息的时候,CommitLog.putMessage方法内部判断如果设置了delayLevel,就会重设topic为SCHEDULE_TOPIC,然后将消息存储到延时队列中,后续就是和ScheduleMessageService的逻辑相同。

整个消息重试的逻辑示意图如下所示:

 

如图所示

  1. Consumer在消费的时候,都会订阅指定的TOPIC-NORMAL_TOPIC和该ConsumerGroup对应的重试TOPIC-RETRY_GROUP1_TOPIC,同时消费来自这两个topic中的消息。
  2. 当发生消费失败后,Client端会调用sendMessageBack方法将失败消息发送回broker。
  3. broker端的SendMessageProcessor会根据当前的重试次数确定延时级别,将消息存入延时队列-SCHEDULE_TOPIC中。
  4. ScheduleMessageService会将到期的消息重新发送到重试TOPIC-RETRY_GROUP1_TOPIC中,这个时候消息被Consumer消费,就完成了整个重试过程。

可以对比之前的延时消息流程去看,其实重试消息就是将失败的消息当作延时消息进行处理,只不过最后投入的是专门的重试消息队列中。

四、总结

延时消息都是非常日常业务使用中很重要的功能,而RocketMQ通过时间片分级+多队列+定时任务,就实现了这样的功能,设计上是很巧妙的。并且消费重试采用退避式的策略,重试时间的梯度刚好与延时消息策略一致,这样就可以直接利用延时队列去完成消息重试的功能,从策略上来说非常合理(消息消费重复失败,在短时间内重试成功的可能性比较低),并且复用了底层代码,这些是值得去学习和借鉴的。



Tags:RocketMQ   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
消息队列中的消息消费时并不能保证总是成功的,那失败的消息该怎么进行消息补偿呢?这就用到今天的主角消息重试和死信队列了。生产者消息重试有时因为网路等原因生产者也可能发...【详细内容】
2022-10-07  Tags: RocketMQ  点击:(20)  评论:(0)  加入收藏
一、背景之前在做电商相关业务的时候,有一个常见的需求场景是:用户下单之后,超过半小时不支付,就取消订单。现在我们在淘宝京东买东西,或者通过美团点外卖,下单之后,如果不在指定时...【详细内容】
2022-08-03  Tags: RocketMQ  点击:(50)  评论:(0)  加入收藏
大量业务使用消息中间件进行系统间的解耦、异步化、削峰填谷设计实现。公司内部前期基于RabbitMQ实现了一套高可用的消息中间件平台。随着业务的持续增长,消息体量随之增大,对...【详细内容】
2022-07-28  Tags: RocketMQ  点击:(69)  评论:(0)  加入收藏
刷盘策略CommitLog的asyncPutMessage方法中可以看到在写入消息之后,调用了submitFlushRequest方法执行刷盘策略:public class CommitLog { public CompletableFuture<PutMe...【详细内容】
2022-07-06  Tags: RocketMQ  点击:(68)  评论:(0)  加入收藏
作者:君哥聊技术来源: https://mp.weixin.qq.com/s/n9QlZ73SQyCGIyPLvHMy0A大家好,我是君哥。今天聊一聊 RocketMQ 的顺序消息实现机制。在有些场景下,使用 MQ 需要保证消息的顺...【详细内容】
2022-06-29  Tags: RocketMQ  点击:(73)  评论:(0)  加入收藏
前置掌握:SpringBoot基础使用、RocketMQ和SpringBoot的整合使用源码地址:gitee.com/tianxincode&hellip; 文章只会说明核心代码,其他的基础整合配置和多环境自动隔离参考源码即...【详细内容】
2022-06-22  Tags: RocketMQ  点击:(122)  评论:(0)  加入收藏
@TOCApache 上开源官方地址: https://rocketmq.apache.org/GitHub 托管地址: https://github.com/apache/rocketmq阿里官方的介绍文档: http://jm.taobao.org/2017/01/12/rocke...【详细内容】
2021-12-07  Tags: RocketMQ  点击:(312)  评论:(0)  加入收藏
消息中间件的应用场景 异步解耦 削峰填谷 顺序收发 分布式事务一致性腾讯应用案例: 主流 MQ 框架及对比 说明 Kafka:整个行业应用广泛 RocketMQ:阿里,从 apache 孵化 Pulsar:雅...【详细内容】
2021-01-26  Tags: RocketMQ  点击:(261)  评论:(0)  加入收藏
今年双十一大促中,消息中间件 RocketMQ 发生了以下几个方面的变化: 云原生化实践。完成运维层面的云原生化改造,实现 Kubernetes 化。 性能优化。消息过滤优化交易集群性能提升...【详细内容】
2020-12-25  Tags: RocketMQ  点击:(225)  评论:(0)  加入收藏
本文主要分如下几个部分展开:- Linux服务器安装RocketMQ、RocketMQ-Console- IDEA中搭建可调试环境 1、Linux安装RocketMQ、RocketMQ-Console 1.1安装RocketMQStep1:从如下地...【详细内容】
2020-10-12  Tags: RocketMQ  点击:(114)  评论:(0)  加入收藏
▌简易百科推荐
数据一致性前面总结了微服务的9个痛点,有些痛点没有好的解决方案,而有些痛点是有对策的,从本章开始,就来讲解某些痛点对应的解决方案。这一章先解决数据一致性的问题,先来看一个...【详细内容】
2022-10-23  互联共商     Tags:微服务   点击:(1)  评论:(0)  加入收藏
前导近期有个同事跟我说遇到一件很奇怪的事情,时不时收到售后反馈说 部分用户无法接收到聊天室(WebSocket 服务)消息,然而在测试服以各种方式测试都无法复现这种现象。于是陷...【详细内容】
2022-10-21  raylin666  今日头条  Tags:WebSocket   点击:(11)  评论:(0)  加入收藏
引言笔者在经历的的一些项目中都使用了 DDD 领域驱动设计进行架构设计,尤其是在业务梳理、中台规划以及微服务划分等方面,DDD 是重要的架构设计方法论,对业务领域建模、微服务...【详细内容】
2022-10-20  宫心职场攻略   网易号  Tags:DDD   点击:(5)  评论:(0)  加入收藏
微服务的痛点在产品研发过程中,引入一种技术来解决一个业务问题并不难,难的是能否合理评估技术风险,这个观点对微服务同样适用。因此,本节将专门讨论微服务会带来哪些问题,这部分...【详细内容】
2022-10-17  大数据架构师  今日头条  Tags:微服务   点击:(2)  评论:(0)  加入收藏
译者 | 布加迪数据在急剧增多。全球每天生成的数据量三年后将达到463 EB。相比之下,人类迄今生成的所有单词量估计总共也就5 EB。为了在当今的数字经济下取得成功,许多企业在...【详细内容】
2022-10-08    51CTO  Tags:架构   点击:(14)  评论:(0)  加入收藏
大家好!在本手册中,您将了解软件架构这一广阔而复杂的领域。当我第一次开始编码之旅时,我发现这是一个既令人困惑又令人生畏的领域。所以我会尽量避免你的困惑。在这本手册中,我...【详细内容】
2022-10-08  一个即将被退役的码农  今日头条  Tags:软件架构   点击:(5)  评论:(0)  加入收藏
消息队列中的消息消费时并不能保证总是成功的,那失败的消息该怎么进行消息补偿呢?这就用到今天的主角消息重试和死信队列了。生产者消息重试有时因为网路等原因生产者也可能发...【详细内容】
2022-10-07  索码理  稀土掘金  Tags:RocketMq   点击:(20)  评论:(0)  加入收藏
访问者模式:从介绍到实践百万级高并发WebRTC流媒体服务器设计与开发download:https://www.zxit666.com/1305/01什么是访客模式?访问者模式的定义如下,是指在不改变数据结构的情...【详细内容】
2022-10-07  石哥学长  网易号  Tags:WebRTC   点击:(19)  评论:(0)  加入收藏
在云原生架构出现之前,大家谈论最多的是微服务架构。有的企业可能只有一种架构,有的企业经历过多种架构的演变。架构的选择与企业当前所处的阶段有很大关系,好的架构都是为了解...【详细内容】
2022-10-06  大数据推荐杂谈  今日头条  Tags:架构   点击:(18)  评论:(0)  加入收藏
本期推荐的优质项目是基于 Java开发的微服务聚合网关,是拥有自主知识产权的应用网关国产化替代方案,能够实现热服务编排聚合、自动授权选择、线上服务脚本编码、在线测试、高...【详细内容】
2022-10-06  硕宇精选开源  今日头条  Tags:微服务   点击:(34)  评论:(0)  加入收藏
站内最新
站内热门
站内头条