您当前的位置:首页 > 电脑百科 > 程序开发 > 架构

如何消息队列的数据积压问题

时间:2020-08-07 11:28:09  来源:  作者:

今天,就讲讲解决消息队列的数据积压的三个方案。

 

1 概述

最近生产环境的消息通知队列发生了大量的数据积压问题,从而影响到整个平台商户的交易无法正常进行,最后只能通过临时关闭交易量较大的商户来缓解消息队列积压的问题,经线上数据分析,我们的消息队列在面对交易突发洪峰的情况下无法快速的消费并处理队列中的数据,考虑到后续还会出现各种交易量突发状况,以下为针对消息队列(ActiveMQ)的优化过程。

 

2 消息队列通信图

三招!解决消息队列的数据积压问题

 

3 问题定位与分析

3.1 消息通知数据为什么会被积压?分析:平台中每个交易的发生可能会产生一到多条的消息通知数据,这些通知数据会通过消息队列(ActiveMQ)来中转消费并处理,那么在交易量突发洪峰的情况下会产生大量的消息通知数据,如果消息队列(ActiveMQ)的消费能力被阻塞的话会严重影响到数据的吞吐量,从而积压大量数据无法被快速处理!

3.2 配置了多个ActiveMQ的消费者为什么数据积压还是无法缓解?分析:经过分析消息队列的数据消费处理模块的代码,消息的消费处理是通过监听器SessionAwareMessageListener异步回调onMessage方法而接收消息的,但是在回调的方法onMessage上加了synchronized同步锁,问题就在这里,由于整个onMessage方法被锁,导致程序只能通过串行(一次只能消费一条数据)处理数据,而无法通过多线程并发处理数据,从而影响了整个队列的数据消费能力。

public synchronized void onMessage(Message message, Session session)

3.3 去掉synchronized同步锁会产生多线程并发的安全性问题吗?分析:首先多个消费者并发处理的数据是不同的,而且多个消费者线程并发回调onMessage方法的时候并未使用到共享的变量,全部在各自线程的方法栈中,所以理论上不会出现多线程并发产生的安全性问题。

3.4 消息会被重复多次消费吗?

分析:

(1)通过分析ActiveMQ的消费者消息接收处理的源代码发现,一条消息是否已经消费是通过ack确认机制来保证的,如果是通过异步回调的方式接收消息的话,在onMessage回调函数返回之后会立即进行ack确认提交,那么只要保证onMessage函数内部不抛出异常,及需要内部捕获异常,那么消息就不会被重复消息。

(2)因为我们的系统在接收到消息后会首先存入db中进行持久化,而且每条消息在存入数据库的时候都做了唯一性约束,那么即使有重复的消息也不会被正常处理。

 

4 阶段一优化方案

4.1 准备测试数据启动多个线程分别往MQ消息队列中发送数据,共发送15000个消息,然后启动消费者模块消费消息,设定每个消息处理耗时为10ms,配置ActiveMQ的消费者数量为concurrency = 5-100

4.2 优化前性能测试

测试次数 是否并发处理 消息数量

queuePrefetch

consumers 耗时

1 否 15000 1000 15 151s
2 否 15000 1000 16 151s
3 否 15000 1000 15 151s

优化前通过测试数据发现,虽然配置了concurrency = 5-100 (消费者动态伸缩),但是只有15个消费者在忙碌,而且消息都是串行化执行的,15000条消息共需要151s的时间,效率非常差,ps:哈哈,不知道是哪位开发的大神加的同步锁!

注:queuePrefetch 为MQ的消费者一次从Queue中拉取的数量,默认为1000,consumers为处理消息的消费者数量

4.3 优化后性能测试

4.3.1 取消同步锁取消在监听器的回调方法onMessage上的synchronized同步锁

4.3.2 取消同步锁后的性能测试

测试次数 是否并发处理 消息数量

queuePrefetch

consumers 耗时

1 是 15000 1000 14 13s
2 是 15000 1000 15 13s
3 是 15000 1000 15 13s

通过以上数据发现取消同步锁,15000条消息只需要13s就可以处理完,相比之前快了近12倍,虽然速度提升了不少,但是发现配置了5-100的消费者,确只有15个消费者在忙碌,其他消费者都没有消息可处理,及造成了数据倾斜,那么接下来就要通过优化queuePrefetch 参数了。

4.3.3 优化ActiveMQ的queuePrefetch 参数预获取消息数量是MQ中重要的调优参数之一,为了提高网络的传输效率,ActiveMQ默认给Consumer批量push 1000条消息,可以从ActiveMQ源码中的ActiveMQPrefetchPolicy类的DEFAULT_QUEUE_PREFETCH字段得知,考虑到我们的通知消息的消费处理中涉及到数据库的操作,以及综合网络传输效率,这里将queuePrefetch的值设置为100,具体需配置到ActiveMQ的连接地址后,如:

tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=100

4.3.4 优化queuePrefetch参数后的性能测试

测试次数 是否并发处理 消息数量

queuePrefetch

consumers 耗时

1 是 15000 100 40 7s
2 是 15000 100 47 5s
3 是 15000 100 41 6s

将ActiveMQ的queuePrefetch参数修改为100,那么发现有近一半的消费者在处理数据,最后15000条消息需要6s中就可以处理完成。

4.3.5 结论通过以上两步的优化后的测试结果可以得出,取消同步锁之后队列的消费能力提升了近11倍,在取消同步锁的基础上再优化ActiveMQ批处理参数后性能又提升了近1倍,综合以上两步的优化处理,队列整体的消费能力提高了30多倍。

三招!解决消息队列的数据积压问题

 

5 阶段二优化方案

阶段二的优化方案是在阶段一的基础上进行的优化处理

5.1 单队列处理

三招!解决消息队列的数据积压问题

由于我们的消息通知业务属于幂等性操作,会按照设定的通知次数来反复通知处理,直到通知成功为止,我们系统现在的做法是将接收到MQ的消息暂存于延时队列(DelayQueue)中,然后通过多线程轮训取出,然后通过HTTP通知到其他模块处理,如果通知失败,则重新放入同一个延时队列等待下次执行,如上图:消息1通知失败后会重新放入延时队列。

注:单队列处理的不足

由于使用了单队列处理,使得可以一次通知成功的消息与通知多次失败的消息混合在了一起,这样在队列中失败通知的消息就会阻塞到后续可以正常通知的消息,最终导致消息整体的一个吞吐量下降

5.2 双队列处理

三招!解决消息队列的数据积压问题

针对5.1单队列的不足,我们可以重新设计,将单队列设计为双队列处理,双队列的核心思想为如果队列1中的消息通知失败,则不再重新放入队列1,而是放入队列2去通知,这样可以起到消息数据分离的作用,及失败通知的数据不再会影响到后续可以成功通知的消息,从而提高队列消息通知的整体性能!

 

6 阶段三优化方案

6.1 MQ组件重选型

ActiveMQ是一个老牌的消息队列组件,吞吐量方面表现不是很理想,适合在业务量不大的场景中使用,现在有非常多比较成熟及高性能高吞吐的消息队列组件可供我们选择,如:RabbitMQ、RocketMQ、Kafka,后续可根据实际情况考虑替换掉ActiveMQ组件。

 

7 总结

针对消息队列的数据积压问题,我们主要做了三个方面的优化处理,取消同步锁、ActiveMQ参数优化、本地双队列优化,通过这三个方面的优化基本解决了队列数据积压的问题。

文章来源:http://JAVAjgs.com/archives/5572



Tags:消息队列   点击:()  评论:()
声明:本站部分内容来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除,谢谢。
▌相关评论
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表
▌相关推荐
消息队列是在两个进程之间传递二进制数据的一种简单有效的方式。每个数据块都有一个特定的类型,接收方可以根据类型来有选择地接受数据,而不一定像管道和命名管道那样必须以先...【详细内容】
2020-08-20   消息队列  点击:(4)  评论:(0)  加入收藏
今天,就讲讲解决消息队列的数据积压的三个方案。 1 概述最近生产环境的消息通知队列发生了大量的数据积压问题,从而影响到整个平台商户的交易无法正常进行,最后只能通过临时关...【详细内容】
2020-08-07   消息队列  点击:(3)  评论:(0)  加入收藏
作者:趁你还年轻转发链接:https://segmentfault.com/a/1190000022950333前言看到这些词仿佛比较让人摸不着头脑,其实在我们的日常开发中,早就和它们打过交道了。我来举几个常见...【详细内容】
2020-06-18   消息队列  点击:(3)  评论:(0)  加入收藏
熟悉Apache 大数据开发技术栈的朋友都知道 Kafka 在大数据开发中的作用,所以面试中会遇到相关问题。这个问题问的非常好,所以有必要记录一下。对从事大数据开发的工程师来...【详细内容】
2020-06-16   消息队列  点击:(44)  评论:(0)  加入收藏
一、简单的发送与接收消息 HelloWorld1. 发送消息发送消息首先要获取与rabbitmq-server的连接,然后从渠道(chann)中指定的queue发送消息 , 不能定义两个queue名字相同,但属性...【详细内容】
2020-04-03   消息队列  点击:(16)  评论:(0)  加入收藏
索引: 基于list的实现方式 基于publish/subscribe 实战消息队列简介消息队列:是消息的顺序集合。 比如网站的PV统计和查看,传统方式就是每个页面发一个AJAX然后mysql给PV+1。用...【详细内容】
2020-01-02   消息队列  点击:(28)  评论:(0)  加入收藏
最近小L会听到很多学员说,在面试大型互联网公司的时候,很可能会被问到消息队列的问题: 在何种场景下使用了消息中间件? 为什么要在系统里引入消息中间件? 如何实现幂等?链式调...【详细内容】
2019-11-20   消息队列  点击:(23)  评论:(0)  加入收藏
Redis以内存数据库而闻名。但是,某些系统将它用作消息队列管理工具。Pub/Sub 和 RPOPLPUSH 是用于实现这样一个系统的两组命令。在这篇文章中,我将分享一些关于这两个命令集的...【详细内容】
2019-11-14   消息队列  点击:(31)  评论:(0)  加入收藏
在互联网中,我们常常讲究着唯快不破,相信不少人都有这样的经历,明明一个非常简单的需求,但却要修改非常多的系统,导致开发效率低下,从而不停的加班。一个好的程序员,一定要学会设计...【详细内容】
2019-09-10   消息队列  点击:(36)  评论:(0)  加入收藏
来源:https://dwz.cn/y8GDcqOh在项目中为什么要使用消息队列消息队列使用场景主要有三个:解耦,异步,削峰1、解耦 如上图所示,可能存在某一个系统产生关键数据,所有系统都需要其进...【详细内容】
2019-09-02   消息队列  点击:(41)  评论:(0)  加入收藏
作者:yanglbme 来源:https://github.com/doocs/advanced-java/blob/master/docs/high-concurrency/mq-design.md # 面试官心理分析 其实聊到这个问题,一般面试官要考察两块: 1、...【详细内容】
2019-08-20   消息队列  点击:(70)  评论:(0)  加入收藏
背景最近项目有个需求需要动态更新规则,当时脑中想到的第一个方案是利用zk的监听机制,管理人员更新完规则将状态写入zk,集群中的机器监听zk的状态,当有状态变更后,集群中的机器开...【详细内容】
2019-08-12   消息队列  点击:(81)  评论:(0)  加入收藏
最新更新
栏目热门
栏目头条