您当前的位置:首页 > 电脑百科 > 程序开发 > 编程百科

5张图带你理解 RocketMQ 顺序消息实现机制

时间:2022-06-29 09:58:33  来源:  作者:程序那点事

作者:君哥聊技术

来源:
https://mp.weixin.qq.com/s/n9QlZ73SQyCGIyPLvHMy0A

大家好,我是君哥。今天聊一聊 RocketMQ 的顺序消息实现机制。

在有些场景下,使用 MQ 需要保证消息的顺序性,比如在电商系统中,用户提交订单、支付订单、订单出库这 3 个消息应该保证顺序性,如下图:

5张图带你理解 RocketMQ 顺序消息实现机制

 

对于 RocketMQ 来说,主要是通过 Producer 和 Consumer 来保证消息顺序的。

1 Producer

下面代码是 Producer 发送顺序消息的官方示例:

public static void mAIn(String[] args) throws UnsupportedEncodingException {
 try {
  DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
  producer.start();

  String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
  for (int i = 0; i < 100; i++) {
   int orderId = i % 10;
   Message msg =
    new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
     ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
   SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
     Integer id = (Integer) arg;
     int index = id % mqs.size();
     return mqs.get(index);
    }
   }, orderId);

   System.out.printf("%s%n", sendResult);
  }

  producer.shutdown();
 } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
  e.printStackTrace();
 }
}

跟发送并发消息不一样的是,发送消息时传入了 MessageQueueSelector,这里可以指定消息发送到固定的 MessageQueue。

注意:上面的代码把 orderId 相同的消息都会发送到同一个 MessageQueue,这样同一个 orderId 的消息是有序的,这也叫做局部有序。对应的另一种是全局有序,这需要把所有的消息都发到同一个 MessageQueue。

下面再来看一下发送的代码:

private SendResult sendSelectImpl(
 Message msg,
 MessageQueueSelector selector,
 Object arg,
 final CommunicationMode communicationMode,
 final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
 //省略部分逻辑
 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
 if (topicPublishInfo != null && topicPublishInfo.ok()) {
  MessageQueue mq = null;
  try {
   List<MessageQueue> messageQueueList =
    mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
   Message userMessage = MessageAccessor.cloneMessage(msg);
   String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
   userMessage.setTopic(userTopic);

   mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
  } catch (Throwable e) {
   throw new MQClientException("select message queue threw exception.", e);
  }

  //省略部分逻辑
  if (mq != null) {
   return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
  } else {
   throw new MQClientException("select message queue return null.", null);
  }
 }
    //省略部分逻辑
}

可以看到,在发送的时候,使用 MessageQueueSelector 选择一个 MessageQueue,然后发送消息到这个 MessageQueue。对于并发消息,这里不传 MessageQueueSelector,如果发送方法没有指定 MessageQueue,就会按照默认的策略选择一个。

2 Consumer

以 RocketMQ 推模式为例,消费者会注册一个监听器,进行消息的拉取和消费处理,下面的 UML 类图显示了调用关系:

5张图带你理解 RocketMQ 顺序消息实现机制

 

上图中包含了对顺序消息和对并发消息的处理。其中 MessageListenerOrderly 和 ConsumeMessageOrderlyService 对顺序消息进行处理。跟并发消息不一样的是,顺序消息定义了一个 MessageQueueLock 类,这个类保存了每个 MessageQueue 对应的锁,代码如下:

private ConcurrentMap<MessageQueue, Object> mqLockTable = new ConcurrentHashMap<MessageQueue, Object>();

下面代码是顺序消费的官方示例:

public static void main(String[] args) throws MQClientException {
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");

 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

 consumer.subscribe("TopicTest", "TagA || TagC || TagD");

 consumer.registerMessageListener(new MessageListenerOrderly() {
  AtomicLong consumeTimes = new AtomicLong(0);

  @Override
  public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
   context.setAutoCommit(true);
   System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
   this.consumeTimes.incrementAndGet();
   if ((this.consumeTimes.get() % 2) == 0) {
    return ConsumeOrderlyStatus.SUCCESS;
   } else if ((this.consumeTimes.get() % 5) == 0) {
    context.setSuspendCurrentQueueTimeMillis(3000);
    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
   }

   return ConsumeOrderlyStatus.SUCCESS;
  }
 });

 consumer.start();
 System.out.printf("Consumer Started.%n");
}

下面看一下顺序消息的消费端处理逻辑。

2.1 注册监听

上面的代码定义了顺序消息监听器 MessageListenerOrderly,并且注册到 DefaultMQPushConsumer,这个注册同时也注册到了 DefaultMQPushConsumerImpl。

2.2 PushConsumer 初始化

在 DefaultMQPushConsumerImpl 类初始化的时候,会判断注册的 MessageListener 是不是 MessageListenerOrderly,如果是,就把 consumeOrderly 变量设置为 true,以此来标记是顺序消息拉取还是并发消息拉取。然后把 ConsumeMessageService 初始化为 ConsumeMessageOrderlyService。

2.3 锁定 mq

要保证消息的顺序性,就需要保证同一个 MessageQueue 只能被同一个 Consumer 消费。

ConsumeMessageOrderlyService 初始化的时候,会启动一个定时任务,周期性(默认 20s)地向 Broker 发送锁定消息(请求类型 LOCK_BATCH_MQ),Broker 收到后,就会把 MessageQueue、group 和 clientId 进行绑定,这样其他客户端就不能从这个 MessageQueue 拉取消息。

注意:Broker 锁定是有过期时间的,默认 60s,可以配置,锁定过期后,有可能被其他 Consumer 进行消费。

Broker 端锁结构如下图:

5张图带你理解 RocketMQ 顺序消息实现机制

 

2.4 拉取消息

消费者启动时,启动消费拉取线程 PullMessageService,里面死循环不停地从 Broker 拉取消息。这里调用了 DefaultMQPushConsumerImpl 类的 pullMessage 方法。这里拉取消息的逻辑跟并发消息的逻辑是一样的。

拉取到消息后,调用 PullCallback 的 onSuccess 方法处理结果,这里调用了 ConsumeMessageOrderlyService 的 submitConsumeRequest 方法,里面用线程池提交了 ConsumeRequest 线程。

PullCallback pullCallback = new PullCallback() {
 @Override
 public void onSuccess(PullResult pullResult) {
  if (pullResult != null) {
   pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrApper.processPullResult(pullRequest.getMessageQueue(), pullResult,
    subscriptionData);

   switch (pullResult.getPullStatus()) {
    case FOUND:
     //省略
     if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
      DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
     } else {
      //省略
      boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
      DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
       pullResult.getMsgFoundList(),
       processQueue,
       pullRequest.getMessageQueue(),
       dispatchToConsume);
                        //省略
     }
     //省略
     break;
    //省略
   }
  }
 }
    //省略
};

上面拉取到消息后,先把消息放到了 ProcessQueue,然后调用了 submitConsumeRequest 方法。跟并发消息处理方式不同的是,submitConsumeRequest 方法并没有处理拉取到的消息,而真正处理的时候是从 ProcessQueue 获取。

2.5 处理消息

处理消息的逻辑在 ConsumeMessageOrderlyService 的内部类 ConsumeRequest,这是一个线程类,run 方法如下:

public void run() {
 //省略部分逻辑
 //1.获取到 MessageQueueLock 对应的锁
 final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
 synchronized (objLock) {
  if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
   || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
   final long beginTime = System.currentTimeMillis();
   for (boolean continueConsume = true; continueConsume; ) {
    //省略延后执行的逻辑
    final int consumeBatchSize =
     ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
                //2.从 processQueue 拉取消息
    List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
    if (!msgs.isEmpty()) {
     final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

     ConsumeOrderlyStatus status = null;
                    //省略部分逻辑
     boolean hasException = false;
     try {
         //3.获取处理锁
      this.processQueue.getConsumeLock().lock();
      //4.执行消费处理逻辑
      status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
     } catch (Throwable e) {
      log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
       RemotingHelper.exceptionSimpleDesc(e),
       ConsumeMessageOrderlyService.this.consumerGroup,
       msgs,
       messageQueue), e);
      hasException = true;
     } finally {
         //5.释放处理锁
      this.processQueue.getConsumeLock().unlock();
     }
     //省略部分逻辑
     continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
    } else {
     continueConsume = false;
    }
   }
  } else {
   //省略部分逻辑
   ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
  }
 }
}

上面的代码总结一下,Consumer 消费消息的逻辑如下:

  1. 对 MessageQueueLock 进行加锁,这样就保证只有一个线程在处理当前 MessageQueue;
  2. 从 ProcessQueue 拉取一批消息;
  3. 获取 ProcessQueue 锁,这样保证了只有当前线程可以进行消息处理,同时也可以防止 Rebalance 线程把当前处理的 MessageQueue 移除掉;
  4. 执行消费处理逻辑;
  5. 释放 ProcessQueue 处理锁;6.processConsumeResult 方法更新消息偏移量。

注意:ProcessQueue 中的锁是 ReentrantLock。

3 重试

跟并发消息不一样的是,顺序消息消费失败后并不会把消息发送到 Broker,而是直接在 Consumer 端进行重试,如果重试次数超过了最大重试次数(16 次),则发送到 Broker,Broker 则将消息推入死信队列。如下图:

5张图带你理解 RocketMQ 顺序消息实现机制

 

4 总结

RocketMQ 顺序消息的原理是在 Producer 端把一批需要保证顺序的消息发送到同一个 MessageQueue,Consumer 端则通过加锁的机制来保证消息消费的顺序性,Broker 端通过对 MessageQueue 进行加锁,保证同一个 MessageQueue 只能被同一个 Consumer 进行消费。

根据实现原理可以看到,RocketMQ 的顺序消息可能存在两个坑:

  1. 有顺序性的消息需要发送到同一个 MessageQueue,可能导致单个 MessageQueue 消息量很大,而 Consumer 端消费的时候只能单线程消费,很可能导致当前 MessageQueue 消息积压;
  2. 如果顺序消息 MessageQueue 所在的 broker 挂了,这时 Producer 只能把消息发送到其他 Broker 的 MessageQueue 上,而如果新的 MessageQueue 被其他 Consumer 消费,这样两个 Consumer 消费的消息就不能保证顺序性了。如下图:
5张图带你理解 RocketMQ 顺序消息实现机制

 

Broker1 发生故障,把订单出库的消息发送到了 Broker2,由 Consumer2 来进行消费,消息顺序很可能会错乱。



Tags:RocketMQ   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
大白话设计RocketMQ延迟消息
延迟消息一般用于:提前发送消息,延迟一段时间后才需要被处理的场景。比如:下单半小时后还未支付,则取消订单 释放库存 等。RocketMQ的延迟消息使用上非常便捷,但是不支持任意时间...【详细内容】
2023-12-27  Search: RocketMQ  点击:(108)  评论:(0)  加入收藏
九个问答牢记RocketMQ架构
RocketMQ是Java兄弟们常用的消息中间件,虽说常用,但对于RocketMQ架构经常忘记。究其原因就2点:忙于业务开发然后长时间不看则忘了、不理解架构设计的根本原因记不牢。本文用大...【详细内容】
2023-12-27  Search: RocketMQ  点击:(114)  评论:(0)  加入收藏
如何应对 RocketMQ 消息堆积
这篇文章,我们聊聊如何应对 RocketMQ 消息堆积。图片1 基础概念消费者在消费的过程中,消费的速度跟不上服务端的发送速度,未处理的消息会越来越多,消息出现堆积进而会造成消息消...【详细内容】
2023-12-21  Search: RocketMQ  点击:(73)  评论:(0)  加入收藏
解锁RocketMQ秘籍:如何保障消息顺序性?
嗨,小伙伴们!小米在这里啦!今天我们要聊的话题是社招面试中一个经典而又百思不得其解的问题&mdash;&mdash;“RocketMQ如何保证顺序性?”不用担心,小米来给你揭秘RocketMQ的秘密武...【详细内容】
2023-12-15  Search: RocketMQ  点击:(101)  评论:(0)  加入收藏
Apache RocketMQ 5.0腾讯云落地实践
Apache RocketMQ 发展历程回顾RocketMQ 最早诞生于淘宝的在线电商交易场景,经过了历年双十一大促流量洪峰的打磨,2016年捐献给 Apache 社区,成为 Apache 社区的顶级项目,并在国...【详细内容】
2023-12-13  Search: RocketMQ  点击:(147)  评论:(0)  加入收藏
聊聊 RocketMQ 5.0 的 POP 消费模式!
大家都知道,RocketMQ 消费模式有 PULL 模式和 PUSH 模式,不过本质上都是 PULL 模式,而在实际使用时,一般使用 PUSH 模式。不过,RocketMQ 的 PUSH 模式有明显的不足,主要体现在以下...【详细内容】
2023-05-16  Search: RocketMQ  点击:(305)  评论:(0)  加入收藏
深扒RocketMQ源码之后,我找出了RocketMQ消息重复消费的7种原因
在众多关于MQ的面试八股文中有这么一道题,“如何保证MQ消息消费的幂等性”。为什么需要保证幂等性呢?是因为消息会重复消费。为什么消息会重复消费?明明已经消费了,为什么消息会...【详细内容】
2023-04-13  Search: RocketMQ  点击:(240)  评论:(0)  加入收藏
SpringBoot整合RocketMQ,老鸟们都是这么玩的!
今天我们来讨论如何在项目开发中优雅地使用RocketMQ。本文分为三部分,第一部分实现SpringBoot与RocketMQ的整合,第二部分解决在使用RocketMQ过程中可能遇到的一些问题并解决...【详细内容】
2023-04-12  Search: RocketMQ  点击:(431)  评论:(0)  加入收藏
SpringBoot 与RabbitMQ、RocketMQ高可靠、高性能、分布式应用实践
Spring Boot 是一个基于 Spring 框架的快速开发框架,而 RabbitMQ 和 RocketMQ 则是常用的消息队列中间件。下面是它们常用的一些用法和场景。 订单处理在电商等系统中,下单后...【详细内容】
2023-03-09  Search: RocketMQ  点击:(205)  评论:(0)  加入收藏
通过源码分析RocketMQ主从复制原理
作者:京东物流 宫丙来一、主从复制概述 RocketMQ Broker的主从复制主要包括两部分内容:CommitLog的消息复制和Broker元数据的复制。 CommitLog的消息复制是发生在消息写入时,当...【详细内容】
2023-03-02  Search: RocketMQ  点击:(69)  评论:(0)  加入收藏
▌简易百科推荐
Meta如何将缓存一致性提高到99.99999999%
介绍缓存是一种强大的技术,广泛应用于计算机系统的各个方面,从硬件缓存到操作系统、网络浏览器,尤其是后端开发。对于Meta这样的公司来说,缓存尤为重要,因为它有助于减少延迟、扩...【详细内容】
2024-04-15    dbaplus社群  Tags:Meta   点击:(3)  评论:(0)  加入收藏
SELECT COUNT(*) 会造成全表扫描?回去等通知吧
前言SELECT COUNT(*)会不会导致全表扫描引起慢查询呢?SELECT COUNT(*) FROM SomeTable网上有一种说法,针对无 where_clause 的 COUNT(*),MySQL 是有优化的,优化器会选择成本最小...【详细内容】
2024-04-11  dbaplus社群    Tags:SELECT   点击:(3)  评论:(0)  加入收藏
10年架构师感悟:从问题出发,而非技术
这些感悟并非来自于具体的技术实现,而是关于我在架构设计和实施过程中所体会到的一些软性经验和领悟。我希望通过这些分享,能够激发大家对于架构设计和技术实践的思考,帮助大家...【详细内容】
2024-04-11  dbaplus社群    Tags:架构师   点击:(2)  评论:(0)  加入收藏
Netflix 是如何管理 2.38 亿会员的
作者 | Surabhi Diwan译者 | 明知山策划 | TinaNetflix 高级软件工程师 Surabhi Diwan 在 2023 年旧金山 QCon 大会上发表了题为管理 Netflix 的 2.38 亿会员 的演讲。她在...【详细内容】
2024-04-08    InfoQ  Tags:Netflix   点击:(5)  评论:(0)  加入收藏
即将过时的 5 种软件开发技能!
作者 | Eran Yahav编译 | 言征出品 | 51CTO技术栈(微信号:blog51cto) 时至今日,AI编码工具已经进化到足够强大了吗?这未必好回答,但从2023 年 Stack Overflow 上的调查数据来看,44%...【详细内容】
2024-04-03    51CTO  Tags:软件开发   点击:(9)  评论:(0)  加入收藏
跳转链接代码怎么写?
在网页开发中,跳转链接是一项常见的功能。然而,对于非技术人员来说,编写跳转链接代码可能会显得有些困难。不用担心!我们可以借助外链平台来简化操作,即使没有编程经验,也能轻松实...【详细内容】
2024-03-27  蓝色天纪    Tags:跳转链接   点击:(16)  评论:(0)  加入收藏
中台亡了,问题到底出在哪里?
曾几何时,中台一度被当做“变革灵药”,嫁接在“前台作战单元”和“后台资源部门”之间,实现企业各业务线的“打通”和全域业务能力集成,提高开发和服务效率。但在中台如火如荼之...【详细内容】
2024-03-27  dbaplus社群    Tags:中台   点击:(14)  评论:(0)  加入收藏
员工写了个比删库更可怕的Bug!
想必大家都听说过删库跑路吧,我之前一直把它当一个段子来看。可万万没想到,就在昨天,我们公司的某位员工,竟然写了一个比删库更可怕的 Bug!给大家分享一下(不是公开处刑),希望朋友们...【详细内容】
2024-03-26  dbaplus社群    Tags:Bug   点击:(9)  评论:(0)  加入收藏
我们一起聊聊什么是正向代理和反向代理
从字面意思上看,代理就是代替处理的意思,一个对象有能力代替另一个对象处理某一件事。代理,这个词在我们的日常生活中也不陌生,比如在购物、旅游等场景中,我们经常会委托别人代替...【详细内容】
2024-03-26  萤火架构  微信公众号  Tags:正向代理   点击:(14)  评论:(0)  加入收藏
看一遍就理解:IO模型详解
前言大家好,我是程序员田螺。今天我们一起来学习IO模型。在本文开始前呢,先问问大家几个问题哈~什么是IO呢?什么是阻塞非阻塞IO?什么是同步异步IO?什么是IO多路复用?select/epoll...【详细内容】
2024-03-26  捡田螺的小男孩  微信公众号  Tags:IO模型   点击:(10)  评论:(0)  加入收藏
站内最新
站内热门
站内头条