您当前的位置:首页 > 电脑百科 > 程序开发 > 编程百科

消息队列如何保证不重复消费消息

时间:2023-08-23 14:03:48  来源:今日头条  作者:java小悠

问题背景

当使用消息队列时,客户端重复消费可能会成为一个严重的问题。 这是因为消息队列具有持久性和可靠性的特性,确保消息能够被成功传递给消费者。然而,这也会导致客户端在某些情况下重复消费消息,例如网络故障、客户端崩溃、消息处理失败等情况。

为了避免这种情况发生,需要在客户端实现一些机制来确保消息不会被重复消费,例如记录消费者已经处理的消息 ID、使用分布式锁来控制消费进程的唯一性等。这些机制能够保证消息被成功处理,同时也能够提高系统的可靠性和稳定性。

今天的文章我们将探讨如何确保消息队列中的消息不会被重复消费,下文将以 RocketMQ 为例说明。

消息幂等性

消息中间件是分布式系统中常用的组件,它具有广泛的应用价值,例如实现异步化、解耦、削峰等功能。通常情况下,我们认为消息中间件是一个可靠的组件。这里的可靠性指的是,只要消息被成功投递到了消息中间件,它就不会丢失,至少能够被消费者成功消费一次。这是消息中间件最基本的特性之一,也就是我们通常所说的 “AT LEAST ONCE”,即消息至少会被成功消费一遍。

举个例子,假设一个消息M被发送到消息中间件并被消费程序A接收到,A开始消费这个消息,但是在消费过程中程序重启了。由于这个消息没有被标记为已经被消费成功,消息中间件会持续地将这个消息投递给消费者,直到消息被成功消费为止。

然而,这种可靠性特性也会导致消息被多次投递的情况。举个例子,仍然以之前的例子为例,如果消费程序A接收并完成消息M的消费逻辑后,正准备通知消息中间件“我已经消费成功了”,但在此之前程序A又重启了,那么对于消息中间件来说,这个消息M并没有被成功消费过,因此消息中间件会继续投递这个消息。而对于消费程序A来说,尽管它已经成功消费了这个消息,但由于程序重启导致消息中间件继续投递,看起来就好像这个消息还没有被成功消费过一样。

在 RockectMQ 的场景中,这意味着同一个 messageId 的消息会被重复投递。由于消息的可靠投递是更重要的,所以避免消息重复投递的任务转移给了应用程序自身来实现。这也是 RocketMQ 文档强调消费逻辑需要自行实现幂等性的原因。实际上,这背后的逻辑是:在分布式场景下,保证消息不丢和避免消息重复投递是矛盾的,但是消息重复投递是可以解决的,而消息丢失则非常麻烦。

幂等设计

让我们先来了解一下邮件消息的发送流程,以便更好了解消息队列幂等工作原理。

 

正如我们在之前提到的,RocketMQ 遵循 "AT LEAST ONCE" 语义,这意味着消息可能会被重复消费。在发送邮件消息的情况下,由于消息可能被重复消费,我们需要保证幂等性,以确保邮件不会被重复发送。

1. 消息发送逻辑

下面这块代码是 12306 支付结果回调订单逻辑实现,通过 RocketMQMessageListener 监听并消费 RocketMQ 消息。

JAVA复制代码@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
        topic = OrderRocketMQConstant.PAY_GLOBAL_TOPIC_KEY,
        selectorExpression = OrderRocketMQConstant.PAY_RESULT_CALLBACK_ORDER_TAG_KEY,
        consumerGroup = OrderRocketMQConstant.PAY_RESULT_CALLBACK_ORDER_CG_KEY
)
public class PayResultCallbackOrderConsumer implements RocketMQListener<MessageWrApper<PayResultCallbackOrderEvent>> {

    private final OrderService orderService;

    
    @Transactional(rollbackFor = Exception.class)
    @Override
    public void onMessage(MessageWrapper<PayResultCallbackOrderEvent> message) {
        PayResultCallbackOrderEvent payResultCallbackOrderEvent = message.getMessage();
        OrderStatusReversalDTO orderStatusReversalDTO = OrderStatusReversalDTO.builder()
                .orderSn(payResultCallbackOrderEvent.getOrderSn())
                .orderStatus(OrderStatusEnum.ALREADY_PAID.getStatus())
                .build();
        orderService.statusReversal(orderStatusReversalDTO);
        orderService.payCallbackOrder(payResultCallbackOrderEvent);
    }
}

2. 幂等处理逻辑

下述方案的优点在于,使用 redis 消息去重表,不依赖事务,针对消息表本身做了状态的区分:消费中、消费完成。

如果消息已经在消费中,抛出异常,消息会触发延迟消费,在 RocketMQ 的场景下即发送到 RETRY TOPIC。

 

通过该方案可以解决什么问题?

  1. 消息已经消费成功了,第二条消息将被直接幂等处理掉(消费成功)。
  2. 并发场景下的消息,依旧能满足不会出现消息重复,即穿透幂等挡板的问题。
  3. 支持上游业务生产者重发的业务重复的消息幂等问题。

为什么要给初始化的幂等标识新增 10 分钟过期时间?

在并发场景下,我们使用消息状态来实现并发控制,以使第二条消息被不断延迟消费(即重试)。但如果在此期间第一条消息也因某些异常原因(例如机器重启或外部异常)未成功消费,该怎么办呢?因为每次查询时都会显示消费中的状态,所以延迟消费会一直进行下去,直到最终被视为消费失败并被投递到死信 Topic 中(RocketMQ 默认最多可以重复消费 16 次)。

针对这个问题,我们采取了一种解决方案:在插入消息表时,必须为每条消息设置一个最长消费过期时间,例如 10 分钟。这意味着,如果某个消息在消费过程中超过了 10 分钟,就会被视为消费失败并从消息表中删除。

抽象幂等通用组件

为了解决消息队列中的重复消费问题,我们可以设计一套通用的消息队列幂等组件。这个组件可以被各个应用程序使用,以确保它们的消费逻辑是幂等的。这种通用的幂等组件可以使应用程序不必为了解决重复消费问题而浪费精力和时间,从而更专注于业务逻辑的实现。

在企业项目中,使用 MySQL 作为幂等去重表的情况比较少见,因此在代码中只提供了 Redis 实现方案。

1. 定义幂等注解

我们提供了一种通用的幂等注解,该注解可用于 RestAPI 和消息队列消息防重复场景。

java复制代码@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Idempotent {
    
    /**
     * 幂等Key,只有在 {@link Idempotent#type()} 为 {@link IdempotentTypeEnum#SPEL} 时生效
     */
    String key() default "";
    
    /**
     * 触发幂等失败逻辑时,返回的错误提示信息
     */
    String message() default "您操作太快,请稍后再试";
    
    /**
     * 验证幂等类型,支持多种幂等方式
     * RestAPI 建议使用 {@link IdempotentTypeEnum#TOKEN} 或 {@link IdempotentTypeEnum#PARAM}
     * 其它类型幂等验证,使用 {@link IdempotentTypeEnum#SPEL}
     */
    IdempotentTypeEnum type() default IdempotentTypeEnum.PARAM;
    
    /**
     * 验证幂等场景,支持多种 {@link IdempotentSceneEnum}
     */
    IdempotentSceneEnum scene() default IdempotentSceneEnum.RESTAPI;
    
    /**
     * 设置防重令牌 Key 前缀,MQ 幂等去重可选设置
     * {@link IdempotentSceneEnum#MQ} and {@link IdempotentTypeEnum#SPEL} 时生效
     */
    String uniqueKeyPrefix() default "";
    
    /**
     * 设置防重令牌 Key 过期时间,单位秒,默认 1 小时,MQ 幂等去重可选设置
     * {@link IdempotentSceneEnum#MQ} and {@link IdempotentTypeEnum#SPEL} 时生效
     */
    long keyTimeout() default 3600L;
}

为了方便理解,整理成思维导图方便记忆。

 

2. 定义 AOP 增强

我们使用 AOP 技术为方法增强提供了通用的幂等性保证,只需要在需要保证幂等性的方法上添加 @Idempotent 注解,Aspect 就会对该方法进行增强。

这种技术不仅适用于 RestAPI 场景,还适用于消息队列的防重复消费场景。

java复制代码package org.opengoofy.index12306.framework.starter.idempotent.core;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.opengoofy.index12306.framework.starter.idempotent.annotation.Idempotent;

import java.lang.reflect.Method;

/**
 * 幂等注解 AOP 拦截器
 *
 * @公众号:马丁玩编程,回复:加群,添加马哥微信(备注:12306)获取项目资料
 */
@Aspect
public final class IdempotentAspect {

    /**
     * 增强方法标记 {@link Idempotent} 注解逻辑
     */
    @Around("@annotation(org.opengoofy.index12306.framework.starter.idempotent.annotation.Idempotent)")
    public Object idempotentHandler(ProceedingJoinPoint joinPoint) throws Throwable {
        Idempotent idempotent = getIdempotent(joinPoint);
        IdempotentExecuteHandler instance = IdempotentExecuteHandlerFactory.getInstance(idempotent.scene(), idempotent.type());
        Object resultObj;
        try {
            instance.execute(joinPoint, idempotent);
            resultObj = joinPoint.proceed();
            instance.postProcessing();
        } catch (RepeatConsumptionException ex) {
            /**
             * 触发幂等逻辑时可能有两种情况:
             *    * 1. 消息还在处理,但是不确定是否执行成功,那么需要返回错误,方便 RocketMQ 再次通过重试队列投递
             *    * 2. 消息处理成功了,该消息直接返回成功即可
             */
            if (!ex.getError()) {
                return null;
            }
            throw ex;
        } catch (Throwable ex) {
            // 客户端消费存在异常,需要删除幂等标识方便下次 RocketMQ 再次通过重试队列投递
            instance.exceptionProcessing();
            throw ex;
        } finally {
            IdempotentContext.clean();
        }
        return resultObj;
    }

    public static Idempotent getIdempotent(ProceedingJoinPoint joinPoint) throws NoSuchMethodException {
        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
        Method targetMethod = joinPoint.getTarget().getClass().getDeclaredMethod(methodSignature.getName(), methodSignature.getMethod().getParameterTypes());
        return targetMethod.getAnnotation(Idempotent.class);
    }
}

这个方法的执行逻辑与设计部分相同,因此在此处不再贴出具体的代码。大家可以跟着设计阅读幂等源码。

为了提高通用性和抽象性,该组件采用了模板方法和简单工厂等设计模式,这有助于隔离复杂性和提高可扩展性。如果您在学习过程中遇到问题,欢迎在知识星球 APP 上向我提问。

3. 实际场景使用

以实现支付结果回调订单为例,我们可以将通用组件引入到消息消费的逻辑中,具体流程如下:

java复制代码@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
        topic = OrderRocketMQConstant.PAY_GLOBAL_TOPIC_KEY,
        selectorExpression = OrderRocketMQConstant.PAY_RESULT_CALLBACK_ORDER_TAG_KEY,
        consumerGroup = OrderRocketMQConstant.PAY_RESULT_CALLBACK_ORDER_CG_KEY
)
public class PayResultCallbackOrderConsumer implements RocketMQListener<MessageWrapper<PayResultCallbackOrderEvent>> {

    private final OrderService orderService;

    @Idempotent(
            uniqueKeyPrefix = "index12306-order:pay_result_callback:",
            key = "#message.getKeys()+'_'+#message.hashCode()",
            type = IdempotentTypeEnum.SPEL,
            scene = IdempotentSceneEnum.MQ,
            keyTimeout = 7200L
    )
    @Transactional(rollbackFor = Exception.class)
    @Override
    public void onMessage(MessageWrapper<PayResultCallbackOrderEvent> message) {
        PayResultCallbackOrderEvent payResultCallbackOrderEvent = message.getMessage();
        OrderStatusReversalDTO orderStatusReversalDTO = OrderStatusReversalDTO.builder()
                .orderSn(payResultCallbackOrderEvent.getOrderSn())
                .orderStatus(OrderStatusEnum.ALREADY_PAID.getStatus())
                .build();
        orderService.statusReversal(orderStatusReversalDTO);
        orderService.payCallbackOrder(payResultCallbackOrderEvent);
    }
}

支持通过 SpEL 表达式来充当幂等去重表唯一键,通过一个简单的注解,完美解决消息队列重复消费问题。

更复杂的幂等场景

到这里,方案看起来非常完美,所有的消息都可以快速接入去重,而且与具体业务实现完全解耦。但是,是否这样就可以完美地完成去重的所有任务呢? 很遗憾,实际上并非如此。因为需要确保消息至少成功消费一次,因此消息在消费过程中有可能失败并触发重试。

还是以上面的例子,假设消息消费的流程包含:

  1. 检查库存(RPC)
  2. 锁库存(RPC)
  3. 开启事务,插入订单表(MySQL)
  4. 调用某些其他下游服务(RPC)
  5. 更新订单状态
  6. commit 事务(MySQL)

当消息消费到第三步的时候假设 MySQL 异常导致失败了,触发消息重试。在重试前我们会删除幂等表的记录,所以消息重试的时候就会重新进入消费代码,那么步骤 1 和步骤 2 就会重新再执行一遍。

如果步骤 2 本身不是幂等的,那么这个业务消息消费依旧没有做好完整的幂等处理。

1. 通用方法实现价值

尽管这种方式并不能完全解决消息幂等问题(事实上,软件工程领域里很少有银弹可以完全解决问题),但它仍然具有很大的价值。通过这种简便的方式,我们能够解决以下问题:

  1. 各种由于Broker、负载均衡等原因导致的消息重投递的重复问题。
  2. 各种上游生产者导致的业务级别消息重复问题。
  3. 重复消息并发消费的控制窗口问题,就算重复,重复也不可能同一时间进入消费逻辑。

2. 消息去重的建议

使用这种方法可以确保在正常的消费逻辑场景下(无异常,无异常退出),消息的幂等性全部得到解决,无论是业务重复还是 RocketMQ 特性带来的重复。虽然它不是解决消息幂等性的银弹,但它以一种简单和便捷的方式提供了解决方案。

实际上,这种方法已经可以解决 99% 的消息重复问题了,因为异常情况通常是少数情况。但是,如果希望在异常情况下也能处理好幂等问题,可以采取以下措施来降低问题发生的概率:

  1. 消息消费失败时,应该及时回滚处理。如果消息消费失败本身具备回滚机制,则消息重试也就没有副作用了。
  2. 为了尽可能避免程序异常退出导致的消息重试,需要在消费者代码中做好优雅退出处理。
  3. 针对一些无法做到完全幂等的操作,至少要做到终止消息的消费并进行告警。比如锁定库存的操作,如果通过业务流水号已经成功锁定了库存,再次触发锁库存操作的话,如果无法做到幂等性处理,那么至少要在消息消费过程中触发异常(如因主键冲突导致消费异常等),并终止消息的消费,以避免重复消费产生的副作用。
  4. 在 #3 做好的前提下,做好消息的消费监控,发现消息重试不断失败的时候,手动做好 #1 的回滚,使得下次重试消费成功。

文末总结

当我们在使用 RocketMQ 进行消息处理时,消息的幂等性是一个非常重要的问题。本文通过抽象出通用组件的方式,实现了 RestAPI 和 RocketMQ 的幂等处理。 同时,我们也发现,幂等性并不是一个银弹,不同的业务场景需要不同的幂等处理策略。

但是,通过一些基本的处理策略,如优雅退出、回滚处理、消费监控等,我们能够大大减少消息重复的问题,提高消息消费的稳定性和可靠性。 在实际开发中,需要结合具体业务场景,选择合适的幂等处理策略,并且在每次新的场景出现时,都需要仔细考虑是否需要重新审视幂等性的处理方式。

上文中的代码以及实现已在基础架构模块中定义,详情查看。

 


原文链接:
https://juejin.cn/post/7270139990696722484



Tags:消息队列   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
如何使用 Redis 实现消息队列
Redis不仅是一个强大的内存数据存储系统,它还可以用作一个高效的消息队列。消息队列是应用程序间或应用程序内部进行异步通信的一种方式,它允许数据生产者将消息放入队列中,然...【详细内容】
2024-03-22  Search: 消息队列  点击:(17)  评论:(0)  加入收藏
消息队列备选架构选择,你选择哪个?
中间件团队的研发人员认为这个方案比较简单,实现成本低,但测试代表认为这个方案测试人力投入较大。运维团队认为这个方案的硬件成本比较高,一个数据分组就需要4台机器(2台服务器...【详细内容】
2023-11-30  Search: 消息队列  点击:(180)  评论:(0)  加入收藏
四种消息队列,如何选型?
最近发现很多号主发消息队列的文章,质量参差不齐,相关文章我之前也写过,建议直接看这篇。这篇文章,主要讲述 Kafka、RabbitMQ、RocketMQ 和 ActiveMQ 这 4 种消息队列的异同,无论...【详细内容】
2023-11-27  Search: 消息队列  点击:(193)  评论:(0)  加入收藏
几款主流消息队列之间的差异,我们应该如何选择
为什么需要消息队列消息队列是历史最悠久的中间件之一,它可以和不同的进程进行通信,从而实现上下游之间的消息传递。基于此特性,我们可以在以下三个场景中使用消息队列。 解耦; ...【详细内容】
2023-11-17  Search: 消息队列  点击:(124)  评论:(0)  加入收藏
常用消息队列框架与技术选型
又是一年双11季,土豪们买买买,程序员看看热闹,聊聊技术。海量的订单、支付请求以及库存更新等任务,离不开分布式架构(SOFAStack)、分布式数据库(OceanBase)、分布式缓存(Tair)、数据处...【详细内容】
2023-11-13  Search: 消息队列  点击:(204)  评论:(0)  加入收藏
Java中的消息队列实战,构建高效异步系统
随着互联网应用的发展,高效的异步系统变得越来越重要。在这样的系统中,消息队列起到了关键的作用。通过消息队列,可以将不同组件之间的耦合度降低,实现解耦和异步处理,提高系统的...【详细内容】
2023-11-07  Search: 消息队列  点击:(324)  评论:(0)  加入收藏
热门的消息队列框架比较、使用方法、优缺点,提供示例代码
消息队列(Message Queue)是一种在分布式系统中用于消息传递的通信模式。它可以将消息发送者和接收者解耦,提高系统的可靠性、可扩展性和可维护性。下面将详细介绍3-5个常用的...【详细内容】
2023-10-11  Search: 消息队列  点击:(151)  评论:(0)  加入收藏
为什么我们需要消息队列?
消息队列有着悠久的历史,它们经常用于不同系统之间的通信。图1通过将其与星巴克的工作方式进行比较,阐述了消息队列的概念。在星巴克,收银员接受订单并收取款项,然后在咖啡杯上...【详细内容】
2023-09-05  Search: 消息队列  点击:(353)  评论:(0)  加入收藏
使用 SQL 的方式查询消息队列数据以及踩坑指南
Pulsar-SQL 是一个非常有用的功能,只是我们使用过程中确实发现了一些问题,大部分都已经修复了;希望对后续使用该功能的朋友有所帮助。背景为了让业务团队可以更好的跟踪自己消...【详细内容】
2023-08-31  Search: 消息队列  点击:(275)  评论:(0)  加入收藏
Java消息队列开发实战,打造高效异步处理
随着互联网应用的发展,高效的异步系统变得越来越重要。在这样的系统中,消息队列起到了关键的作用。通过消息队列,可以将不同组件之间的耦合度降低,实现解耦和异步处理,提高系统的...【详细内容】
2023-08-30  Search: 消息队列  点击:(357)  评论:(0)  加入收藏
▌简易百科推荐
即将过时的 5 种软件开发技能!
作者 | Eran Yahav编译 | 言征出品 | 51CTO技术栈(微信号:blog51cto) 时至今日,AI编码工具已经进化到足够强大了吗?这未必好回答,但从2023 年 Stack Overflow 上的调查数据来看,44%...【详细内容】
2024-04-03    51CTO  Tags:软件开发   点击:(5)  评论:(0)  加入收藏
跳转链接代码怎么写?
在网页开发中,跳转链接是一项常见的功能。然而,对于非技术人员来说,编写跳转链接代码可能会显得有些困难。不用担心!我们可以借助外链平台来简化操作,即使没有编程经验,也能轻松实...【详细内容】
2024-03-27  蓝色天纪    Tags:跳转链接   点击:(12)  评论:(0)  加入收藏
中台亡了,问题到底出在哪里?
曾几何时,中台一度被当做“变革灵药”,嫁接在“前台作战单元”和“后台资源部门”之间,实现企业各业务线的“打通”和全域业务能力集成,提高开发和服务效率。但在中台如火如荼之...【详细内容】
2024-03-27  dbaplus社群    Tags:中台   点击:(8)  评论:(0)  加入收藏
员工写了个比删库更可怕的Bug!
想必大家都听说过删库跑路吧,我之前一直把它当一个段子来看。可万万没想到,就在昨天,我们公司的某位员工,竟然写了一个比删库更可怕的 Bug!给大家分享一下(不是公开处刑),希望朋友们...【详细内容】
2024-03-26  dbaplus社群    Tags:Bug   点击:(5)  评论:(0)  加入收藏
我们一起聊聊什么是正向代理和反向代理
从字面意思上看,代理就是代替处理的意思,一个对象有能力代替另一个对象处理某一件事。代理,这个词在我们的日常生活中也不陌生,比如在购物、旅游等场景中,我们经常会委托别人代替...【详细内容】
2024-03-26  萤火架构  微信公众号  Tags:正向代理   点击:(10)  评论:(0)  加入收藏
看一遍就理解:IO模型详解
前言大家好,我是程序员田螺。今天我们一起来学习IO模型。在本文开始前呢,先问问大家几个问题哈~什么是IO呢?什么是阻塞非阻塞IO?什么是同步异步IO?什么是IO多路复用?select/epoll...【详细内容】
2024-03-26  捡田螺的小男孩  微信公众号  Tags:IO模型   点击:(8)  评论:(0)  加入收藏
为什么都说 HashMap 是线程不安全的?
做Java开发的人,应该都用过 HashMap 这种集合。今天就和大家来聊聊,为什么 HashMap 是线程不安全的。1.HashMap 数据结构简单来说,HashMap 基于哈希表实现。它使用键的哈希码来...【详细内容】
2024-03-22  Java技术指北  微信公众号  Tags:HashMap   点击:(11)  评论:(0)  加入收藏
如何从头开始编写LoRA代码,这有一份教程
选自 lightning.ai作者:Sebastian Raschka机器之心编译编辑:陈萍作者表示:在各种有效的 LLM 微调方法中,LoRA 仍然是他的首选。LoRA(Low-Rank Adaptation)作为一种用于微调 LLM(大...【详细内容】
2024-03-21  机器之心Pro    Tags:LoRA   点击:(12)  评论:(0)  加入收藏
这样搭建日志中心,传统的ELK就扔了吧!
最近客户有个新需求,就是想查看网站的访问情况。由于网站没有做google的统计和百度的统计,所以访问情况,只能通过日志查看,通过脚本的形式给客户导出也不太实际,给客户写个简单的...【详细内容】
2024-03-20  dbaplus社群    Tags:日志   点击:(4)  评论:(0)  加入收藏
Kubernetes 究竟有没有 LTS?
从一个有趣的问题引出很多人都在关注的 Kubernetes LTS 的问题。有趣的问题2019 年,一个名为 apiserver LoopbackClient Server cert expired after 1 year[1] 的 issue 中提...【详细内容】
2024-03-15  云原生散修  微信公众号  Tags:Kubernetes   点击:(6)  评论:(0)  加入收藏
站内最新
站内热门
站内头条