RabbitMQ常见问题

3周前发布 gsjqwyl
17 0 0

RabbitMQ常见堵塞问题剖析

1、一次线上RabbitMQ阻塞事件的处理

当时解决问题所参考的文档:https://www.codenong.com/cs109484329/

1、背景情况

RabbitMQ被用于将外省市的运单数据同步到本系统当中。

2、问题呈现

某日清晨,运维群里有众多企业反馈称,在系统里无法查找到自己最新的运单。赶到公司后查看系统,发现自昨晚10:30之后,就再没有新的运单新增了。首先推测是交通部没有将运单推送到MQ的队列里,查看服务器日志,发现没有MQ消费相关的日志,最早的MQ消费日志停留在昨晚10:30左右。让运维同事查看后,发现MQ中堆积了一两千条还未消费的消息。

并且在10:30左右的日志里出现了大量报错日志。经过对错误日志的分析,发现彼时MQ消费消息时出现了业务报错。

3、问题解决

1、依据报错的原因,解决了报错问题。解决报错后将代码发布到线上,发现消费端又开始正常消费了,经过一天的观察,MQ消费状态恢复正常。

4、问题产生原因

后来有时间进行了思考,得出MQ不消费的原因:MQ设置的是手动应答模式,由于消费端消费出现报错,没有正常进行ack确认,导致MQ中出现了很多unacked状态的消息。这时MQ会认为消费端已经没有能力再去消费消息了,就不会再向消费者发送消息了,而消息生产者还在持续向MQ推送消息,使得ready状态的消息越来越多,却又得不到消费,从而引发了消息堵塞。

其实这是MQ的一种保护消费者的机制——QOS(服务质量保证)。

5、QOS(服务质量保证)

在手动应答模式下启用该机制,当消费端出现大量报错且无法正常ack确认时,MQ中会有一定数量的unacked消息,MQ为了保护消费端不再出现报错情况,就不会再向消费者发送消息,以此来保障消费端服务能够正常运行。可以通过设置参数PrefetchCount(spring.rabbitmq.listener.simple.prefetch)来设置MQ支持的最大未正常确认消息数量。

spring:
  # 消息队列
  rabbitmq:
    host: 1.1.1.1
    port: 5672
    username: 1
    password: 1
    #虚拟主机,用于隔离业务
    virtual-host: 1
    # 消息发送确认
    publisher-confirm-type: correlated
    # 开启发送失败退回
    publisher-returns: true
    listener:
      simple:
        # 消费端最小并发数
        concurrency: 1
        # 消费端最大并发数
        max-concurrency: 5
        # 一次请求中预处理的消息数量
        prefetch: 2
        # 手动应答
        acknowledge-mode: manual
        # 重试配置
        retry:
          enabled: true
          max-attempts: 3
          initial-interval: 5000ms
          max-interval: 1200000ms
          multiplier: 2

比如上面设置Prefetch=2,那么当有两个消息没有正常ack确认时,MQ就会停止发送消息。

6、为何重启后消息又能正常消费

因为重启之后,unacked状态的消息会重新排到队列的开头重新被消费,这样后面正常的消息就能够继续被推送过来。

7、如何判断是否存在堵塞风险

参考:https://www.codenong.com/cs109484329/

堵塞是因为unacked数量达到了限制。允许出现的unacked数量可以通过channelCount * prefetchCount * 节点数量来计算得出。channlCount由concurrency和max-concurrency决定。所以:

min = concurrency * prefetch * 节点数量
max = max-concurrency * prefetch * 节点数量

结论:
– unacked_msg_count < min时,队列不会阻塞,但需要及时处理unacked状态的消息。
– unacked_msg_count >= min时,可能会出现堵塞情况。
– unacked_msg_count >= max时,队列一定会阻塞。

消费者消费MQ消息时存在一个缓冲池,会一次性拉取一批消息到缓冲池中,消费者从缓冲池中消费消息,缓冲池的大小=max-concurrency(最大并发数)* prefetch(一次预处理消费的消息数)。消息在缓冲池中时,处于待消费状态,也就是unacked状态,所以缓冲池中的消息数量就是unacked的最大数量,如果unacked数量超过这个值,就会触发QPS保护。例如:max-concurrency=5,prefetch=20

8、事故重现
1、环境情况
1、生产者
@PostMapping(value = "/pushOkMsg")
public R<String> pushOkMsg(@RequestParam(value = "num")Integer num,@RequestParam(value = "msg")String msg){
    for (int integer = 0; integer < num; integer++) {
        String msgId = UUID.randomUUID().toString().toLowerCase().replaceAll("-", "");
        CorrelationData correlationData = new CorrelationData(msgId);
        rabbitTemplate.convertAndSend(MqConfigV2.TEST_QUEUE_KEY_V1,msg.getBytes(StandardCharsets.UTF_8),correlationData);
    }
    return R.ok("success!!!");
}
2、生产者配置
3、队列
4、消费者
@RabbitListener(queues = MqConfigV2.TEST_QUEUE_KEY_V1,containerFactory = "customContainerFactory")
@RabbitHandlerpublic void test4(Message message, Channel channel, @Headers Map<String, Object> heads) throws Exception {
    String data = new String(message.getBody(), StandardCharsets.UTF_8);
    System.out.println(MqConfigV2.BASE_YD_QUEUE+" 消息接收=" + data);
    if("error".equals(data)){
        throw new RuntimeException("系统报错了!!!");
    }
    //模拟业务处理
    Thread.sleep(1000);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
2、重现过程
1、先发送10条正常消息

系统运行正常。

2、发送一条error消息

系统报错,消费失败,未正常ack确认,MQ中出现一条unacked状态的消息。

3、再发送10条正常消息

由于只有一条unacked状态的消息,小于配置的prefetch=2,系统能够正常消费。

4、再发送一条error

MQ中出现两条unacked状态的消息。

5、发送10条正常的消息

消费者不再消费消息,MQ消息出现堵塞情况,因为unacked=2,大于等于prefetch=2,成功重现了堵塞问题。

其他博客的描述

© 版权声明

相关文章

暂无评论

暂无评论...