找回密码
 立即注册
首页 业界区 业界 RabbitMQ常见问题

RabbitMQ常见问题

寇油 2025-6-6 14:53:09
RabbitMQ

1、记一次线上RabbitMQ的堵塞问题

当时解决问题参考的文档:https://www.codenong.com/cs109484329/
1、背景

RabbitMQ同步外省市运单到本系统中
2、问题

某天早上上班,发现运维群里有很多企业反馈,在系统中查不到自己最新的运单了,当时我赶到公司,打开系统一看,昨天晚上10:30点之后,运单就没有新增了,当时我首先想到的是交通部那边没有将运单推送到MQ的队列中,我打开服务器log,发现没有MQ消费相关的log,最早的MQ消费log还是昨天晚上10:30左右的。当时我赶紧让运维同事看了一下,发现MQ中堆积了一两千个消息没有消费。
1.png

并且再10:30左右的log出现了很多报错log
经过分析错误log,发现昨天晚上10:30左右,MQ消费的消息报错了(业务报错)
3、解决

1、根据报错原因,解决报错
解决报错之后,发布线上,发现消费端又开始正常消费了,经过一天的观察,MQ消费正常
4、问题原因

后面有时间的时候,想了一下MQ不消费的原因:
MQ设置的是手动应答,由于消费端消费报错,没有正常ack,导致MQ中出现了很多unacked的消息。这个时候MQ会认为消费端已经没有能力去消费消息了,就不会再发送消息给消费者了,但是消息生产者继续将消息推送到MQ中,导致ready消息越来越多,但又不消费了,就导致了消息堵塞。
上面的其实是MQ的一种保护消费者的机制:QOS(服务质量保证)
5、QOS(服务质量保证)

在手动应答模式下启用, 在消费端出现大量报错,无法正常ack的情况下,MQ出现一定数量unacked,MQ为了保护消费端不在报错,MQ将不在发送消息给消费者,进而保护消费端服务的正常运行。
可以通过设置参数:PrefetchCount(spring.rabbitmq.listener.simple.prefetch),来设置MQ支持的最大未正常确认消息数量。
  1. spring:
  2.   # 消息队列
  3.   rabbitmq:
  4.     host: 1.1.1.1
  5.     port: 5672
  6.     username: 1
  7.     password: 1
  8.     #虚拟主机,用于隔离业务
  9.     virtual-host: 1
  10.     # 消息发送确认
  11.     publisher-confirm-type: correlated
  12.     # 开启发送失败退回
  13.     publisher-returns: true
  14.     listener:
  15.       simple:
  16.         # 消费端最小并发数
  17.         concurrency: 1
  18.         # 消费端最大并发数
  19.         max-concurrency: 5
  20.         # 一次请求中预处理的消息数量
  21.         prefetch: 2
  22.         # 手动应答
  23.         acknowledge-mode: manual
  24.         # 重试配置
  25.         retry:
  26.           enabled: true
  27.           max-attempts: 3
  28.           initial-interval: 5000ms
  29.           max-interval: 1200000ms
  30.           multiplier: 2
复制代码
如上面Prefetch=2,那么当有两个消息没有正常ack的时候,MQ就会不再发送消息了
6、为什么重启之后,消息又正常消费了呢

因为重启之后,unacked的消息,会重新会排到队列开头重新被消费,那么后面正常的消息就能继续被推送
7、如何判断是否又堵塞的风险

参考:https://www.codenong.com/cs109484329/
堵塞是因为unacked数量达到了限制
允许出现unacked的数量可以通过channelCount * prefetchCount * 节点数量 得出。
channlCount就是由concurrency,max-concurrency决定的。
所以
  1. min = concurrency * prefetch * 节点数量
  2. max = max-concurrency * prefetch * 节点数量
复制代码
结论
  1. unacked_msg_count < min 队列不会阻塞。但需要及时处理unacked的消息。
  2. unacked_msg_count >= min 可能会出现堵塞。
  3. unacked_msg_count >= max 队列一定阻塞
复制代码
消费者消费MQ消息,有一个缓冲池,会一下拉一批消息到缓冲池中,消费者从缓冲池中消费消息,缓冲池大小=max-concurrency(最大并发数) * prefetch(一次预处理消费的消息数)
消息再缓冲池中,属于待消费的消息,也就是unacked状态,所以缓冲池中的消息数量=unacked最大数量,如果unacked超过这个值,会触发QPS保护
如:max-concurrency=5,prefetch=20
2.png

3.png

max-concurrency:最大并发送,如设置5,那么MQ消费者就有5个
4.png

5.png

8、事故重现

1、环境

1、生产者
  1. @PostMapping(value = "/pushOkMsg")
  2. public R<String> pushOkMsg(@RequestParam(value = "num")Integer num,@RequestParam(value = "msg")String msg){
  3.     for (int integer = 0; integer < num; integer++) {
  4.         String msgId = UUID.randomUUID().toString().toLowerCase().replaceAll("-", "");
  5.         CorrelationData correlationData = new CorrelationData(msgId);
  6.         rabbitTemplate.convertAndSend(MqConfigV2.TEST_QUEUE_KEY_V1,msg.getBytes(StandardCharsets.UTF_8),correlationData);
  7.     }
  8.     return R.ok("success!!!");
  9. }
复制代码
2、生产者配置

6.png

3、队列

7.png

4、消费者
  1. @RabbitListener(queues = MqConfigV2.TEST_QUEUE_KEY_V1,containerFactory = "customContainerFactory")
  2. @RabbitHandlerpublic void test4(Message message, Channel channel, @Headers Map<String, Object> heads) throws Exception {
  3.     String data = new String(message.getBody(), StandardCharsets.UTF_8);
  4.     System.out.println(MqConfigV2.BASE_YD_QUEUE+" 消息接收=" + data);
  5.     if("error".equals(data)){
  6.         throw new RuntimeException("系统报错了!!!");
  7.     }
  8.     //模拟业务处理
  9.     Thread.sleep(1000);
  10.     channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  11. }
复制代码
2、重现

1、先发送10条正常消息

系统正常
8.png

9.png

10.png

2、发送一条error消息

系统报错,消费失败,未正常ack,mq中出现一条unacked
11.png

12.png

13.png

3、再发送10条正常消息

由于只有一条unacked消息,小于配置的prefetch=2
系统正常消费
14.png

15.png

16.png

4、再发送一条error

MQ出现两条unacked
17.png

18.png

19.png

5、发送10条正常的消息

消费者不消费了,MQ消息也堵塞了,因为unacked=2,大于等于prefetch=2
问题重现成功了
20.png

21.png

22.png

其他博客的描述

23.png


来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册