寇油 发表于 2025-6-6 14:53:09

RabbitMQ常见问题

RabbitMQ

1、记一次线上RabbitMQ的堵塞问题

当时解决问题参考的文档:https://www.codenong.com/cs109484329/
1、背景

RabbitMQ同步外省市运单到本系统中
2、问题

某天早上上班,发现运维群里有很多企业反馈,在系统中查不到自己最新的运单了,当时我赶到公司,打开系统一看,昨天晚上10:30点之后,运单就没有新增了,当时我首先想到的是交通部那边没有将运单推送到MQ的队列中,我打开服务器log,发现没有MQ消费相关的log,最早的MQ消费log还是昨天晚上10:30左右的。当时我赶紧让运维同事看了一下,发现MQ中堆积了一两千个消息没有消费。

并且再10:30左右的log出现了很多报错log
经过分析错误log,发现昨天晚上10:30左右,MQ消费的消息报错了(业务报错)
3、解决

1、根据报错原因,解决报错
解决报错之后,发布线上,发现消费端又开始正常消费了,经过一天的观察,MQ消费正常
4、问题原因

后面有时间的时候,想了一下MQ不消费的原因:
MQ设置的是手动应答,由于消费端消费报错,没有正常ack,导致MQ中出现了很多unacked的消息。这个时候MQ会认为消费端已经没有能力去消费消息了,就不会再发送消息给消费者了,但是消息生产者继续将消息推送到MQ中,导致ready消息越来越多,但又不消费了,就导致了消息堵塞。
上面的其实是MQ的一种保护消费者的机制:QOS(服务质量保证)
5、QOS(服务质量保证)

在手动应答模式下启用, 在消费端出现大量报错,无法正常ack的情况下,MQ出现一定数量unacked,MQ为了保护消费端不在报错,MQ将不在发送消息给消费者,进而保护消费端服务的正常运行。
可以通过设置参数:PrefetchCount(spring.rabbitmq.listener.simple.prefetch),来设置MQ支持的最大未正常确认消息数量。
spring:
# 消息队列
rabbitmq:
    host: 1.1.1.1
    port: 5672
    username: 1
    password: 1
    #虚拟主机,用于隔离业务
    virtual-host: 1
    # 消息发送确认
    publisher-confirm-type: correlated
    # 开启发送失败退回
    publisher-returns: true
    listener:
      simple:
      # 消费端最小并发数
      concurrency: 1
      # 消费端最大并发数
      max-concurrency: 5
      # 一次请求中预处理的消息数量
      prefetch: 2
      # 手动应答
      acknowledge-mode: manual
      # 重试配置
      retry:
          enabled: true
          max-attempts: 3
          initial-interval: 5000ms
          max-interval: 1200000ms
          multiplier: 2如上面Prefetch=2,那么当有两个消息没有正常ack的时候,MQ就会不再发送消息了
6、为什么重启之后,消息又正常消费了呢

因为重启之后,unacked的消息,会重新会排到队列开头重新被消费,那么后面正常的消息就能继续被推送
7、如何判断是否又堵塞的风险

参考:https://www.codenong.com/cs109484329/
堵塞是因为unacked数量达到了限制
允许出现unacked的数量可以通过channelCount * prefetchCount * 节点数量 得出。
channlCount就是由concurrency,max-concurrency决定的。
所以
min = concurrency * prefetch * 节点数量
max = max-concurrency * prefetch * 节点数量结论
unacked_msg_count < min 队列不会阻塞。但需要及时处理unacked的消息。
unacked_msg_count >= min 可能会出现堵塞。
unacked_msg_count >= max 队列一定阻塞消费者消费MQ消息,有一个缓冲池,会一下拉一批消息到缓冲池中,消费者从缓冲池中消费消息,缓冲池大小=max-concurrency(最大并发数) * prefetch(一次预处理消费的消息数)
消息再缓冲池中,属于待消费的消息,也就是unacked状态,所以缓冲池中的消息数量=unacked最大数量,如果unacked超过这个值,会触发QPS保护
如:max-concurrency=5,prefetch=20


max-concurrency:最大并发送,如设置5,那么MQ消费者就有5个


8、事故重现

1、环境

1、生产者

@PostMapping(value = "/pushOkMsg")
public R<String> pushOkMsg(@RequestParam(value = "num")Integer num,@RequestParam(value = "msg")String msg){
    for (int integer = 0; integer < num; integer++) {
      String msgId = UUID.randomUUID().toString().toLowerCase().replaceAll("-", "");
      CorrelationData correlationData = new CorrelationData(msgId);
      rabbitTemplate.convertAndSend(MqConfigV2.TEST_QUEUE_KEY_V1,msg.getBytes(StandardCharsets.UTF_8),correlationData);
    }
    return R.ok("success!!!");
}2、生产者配置


3、队列


4、消费者

@RabbitListener(queues = MqConfigV2.TEST_QUEUE_KEY_V1,containerFactory = "customContainerFactory")
@RabbitHandlerpublic void test4(Message message, Channel channel, @Headers Map<String, Object> heads) throws Exception {
    String data = new String(message.getBody(), StandardCharsets.UTF_8);
    System.out.println(MqConfigV2.BASE_YD_QUEUE+" 消息接收=" + data);
    if("error".equals(data)){
      throw new RuntimeException("系统报错了!!!");
    }
    //模拟业务处理
    Thread.sleep(1000);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}2、重现

1、先发送10条正常消息

系统正常



2、发送一条error消息

系统报错,消费失败,未正常ack,mq中出现一条unacked



3、再发送10条正常消息

由于只有一条unacked消息,小于配置的prefetch=2
系统正常消费



4、再发送一条error

MQ出现两条unacked



5、发送10条正常的消息

消费者不消费了,MQ消息也堵塞了,因为unacked=2,大于等于prefetch=2
问题重现成功了



其他博客的描述



来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: RabbitMQ常见问题