RabbitMQ高级特性中消息确认机制的深入探究

2个月前发布 gsjqwyl
10 0 0

文章标题:

深入剖析RabbitMQ高级特性之消息确认机制

文章内容:

目录

消息确认机制

RabbitMQ的消息确认机制

当生产者发送消息后,消息到达消费者端时,可能出现以下状况:

a. 消息成功被处理
b. 消息处理出现异常

RabbitMQ在向消费者发送消息后,会将该消息删除,若出现第二种情况便会导致消息丢失。那么如何确保消费端成功接收并正确处理消息呢?为保障消息从队列可靠传递至消费者,RabbitMQ提供了消息确认机制(message acknowledgement)。

消费者在订阅队列时,可指定autoAck参数,据此参数设置,消息确认机制分为两类:
自动确认:当autoAck为true时,RabbitMQ会自动将发送的消息标记为已确认,随后从内存(或磁盘)中删除,而不关心消费者是否真正消费到这些消息。此模式适用于对消息可靠性要求不高的场景。
手动确认:当autoAck为false时,RabbitMQ会等待消费者显式调用Basic.Ack命令回复确认信号后,才从内存(或磁盘)中移除消息。该模式适用于对消息可靠性要求较高的场景。

自动确认

源代码:

/**
 * Start a non-nolocal, non-exclusive consumer, with
 * a server-generated consumerTag.
 * @param queue the name of the queue
 * @param autoAck true if the server should consider messages
 * acknowledged once delivered; false if the server should expect
 * explicit acknowledgements
 * @param callback an interface to the consumer object
 * @return the consumerTag generated by the server
 * @throws java.io.IOException if an error is encountered
 * @see com.rabbitmq.client.AMQP.Basic.Consume
 * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
 * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
 */
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

代码示例:

DefaultConsumer consumer = new DefaultConsumer(channel) {
 @Override
 public void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {
 System.out.println("接收到消息: " + new String(body));
 }
};
channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);

当autoAck参数设为false时,RabbitMQ服务端将队列中的消息分为两部分:
– 一是等待投递给消费者的消息。
– 二是已投递给消费者,但尚未收到消费者确认信号的消息。

若RabbitMQ始终未收到消费者的确认信号,且消费该消息的消费者已断开连接,RabbitMQ会将该消息重新放入队列,等待投递给下一个消费者,也有可能是原消费者。

从RabbitMQ的Web管理平台,可看到当前队列中Ready状态(等待投递给消费者的消息数)和Unacked状态(已投递给消费者但未收到确认信号的消息数)的消息数量。

手动确认

消费者收到消息后,可选择确认、直接拒绝或跳过。RabbitMQ提供了不同的确认应答方式,消费者客户端可调用对应channel的相关方法,共有以下三种:
1. 肯定确认:Channel.basicAck(long deliveryTag, boolean multiple)

RabbitMQ已知晓该消息且成功处理该消息,可将其丢弃。
参数说明:
– deliveryTag:消息的唯一标识,是单调递增的64位长整型值。deliveryTag在每个通道(Channel)上独立维护,因此在每个通道上唯一。消费者确认(ack)消息时,必须使用对应通道进行确认。
– multiple:是否批量确认。某些情况下,为减少网络流量,可对一系列连续的deliveryTag进行批量确认。值为true时,会一次性ack所有小于或等于指定deliveryTag的消息;值为false时,仅确认当前指定deliveryTag的消息。

deliveryTag是RabbitMQ消息确认机制的重要组成部分,确保消息传递的可靠性和顺序性。
2. 否定确认:Channel.basicReject(long deliveryTag, boolean requeue)

RabbitMQ在2.0.0版本引入Basic.Reject命令,消费者客户端可调用channel.basicReject方法告知RabbitMQ拒绝该消息。
参数说明:
– deliveryTag:参考channel.basicAck
– requeue:表示拒绝后该消息的处理方式。若requeue参数设为true,RabbitMQ会将该消息重新存入队列,以便发送给下一个订阅的消费者;若requeue参数设为false,RabbitMQ会将消息从队列移除,不发送给新消费者。
3. 否定确认:Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)

Basic.Reject命令一次只能拒绝一条消息,若要批量拒绝消息,可使用Basic.Nack命令。消费者客户端可调用channel.basicNack方法实现。
multiple参数设为true时,表示拒绝deliveryTag编号之前所有未被当前消费者确认的消息。

Spring-AMQP的消息确认机制

接下来基于SpringBoot演示消息确认机制,与RabbitMQ Java Client库有一定差异。Spring-AMQP对消息确认机制提供三种策略。

消息确认机制的三种策略解读:
1. AcknowledgeMode.NONE
此模式下,消息一旦投递给消费者,无论消费者是否成功处理消息,RabbitMQ都会自动确认消息并从队列移除。若消费者处理消息失败,消息可能丢失。
2. AcknowledgeMode.AUTO(默认)
此模式下,消费者在成功处理消息时会自动确认消息,若处理过程中抛出异常,则不确认消息。
3. AcknowledgeMode.MANUAL
手动确认模式下,消费者必须在成功处理消息后显式调用basicAck方法确认消息。若消息未被确认,RabbitMQ会认为消息未成功处理,且在消费者可用时重新投递该消息。此模式提高了消息处理的可靠性,即使消费者处理消息后失败,消息也不会丢失,而是可重新处理。

代码演示
常量类
public class Constants {
    public static final String ACK_QUEUE = "ack.queue";
    public static final String ACK_EXCHANGE = "ack.exchange";
}
声明队列和交换机并绑定二者关系
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import rabbitextensionsdemo.constant.Constants;

@Configuration
public class RabbitMQConfig {
    //消息确认
    @Bean("ackQueue")
    public Queue ackQueue() {
        return QueueBuilder.durable(Constants.ACK_QUEUE).build();
    }

    @Bean("directExchange")
    public DirectExchange directExchange() {
        return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build();
    }

    @Bean("ackBinding")
    public Binding ackBinding(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("ackQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(directExchange).with("ack");
    }
}
声明RabbitTemplate
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitTemplateConfig {
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }
}
编写生产消息代码
import jakarta.annotation.Resource;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import rabbitextensionsdemo.constant.Constants;


@RequestMapping("/producer")
@RestController
public class ProducerController {
    @Resource(name = "rabbitTemplate")
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/ack")
    public String ack() {
        rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, "ack", "consumer ack mode test...");
        return "消息发送成功";
    }
}
AcknowledgeMode.NONE(演示)

添加配置:

spring:
  application:
    name: rabbit-extensions-demo
  rabbitmq:
    addresses: amqp://study:study@47.98.109.138:5672/extension
    listener:
      simple:
        acknowledge-mode: none
编写消费消息代码1
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import rabbitextensionsdemo.constant.Constants;

@Component
public class AckListener {

    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void handMessage(Message message, Channel channel) throws UnsupportedEncodingException {

        //消费者逻辑
        System.out.printf("接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());

        //进行业务逻辑处理
        System.out.println("业务逻辑处理");
        System.out.println("业务处理完成");
    }
}

发送消息:

消费消息:

编写消费消息代码2
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import rabbitextensionsdemo.constant.Constants;

@Component
public class AckListener {

    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void handMessage(Message message, Channel channel)throws UnsupportedEncodingException {

        //消费者逻辑
        System.out.printf("接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());

        //进行业务逻辑处理
        System.out.println("业务逻辑处理");
        int num = 3/0;
        System.out.println("业务处理完成");
    }
}

发送消息:

消费消息:

此时可见,消息虽被消费者接收,但因消费时异常导致未正常消费,队列中消息也消失。

AcknowledgeMode.AUTO(演示)

添加配置:

spring:
  application:
    name: rabbit-extensions-demo
  rabbitmq:
    addresses: amqp://study:study@47.98.109.138:5672/extension
    listener:
      simple:
        acknowledge-mode: auto
编写消费消息代码1
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import rabbitextensionsdemo.constant.Constants;

@Component
public class AckListener {

    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void handMessage(Message message, Channel channel) throws UnsupportedEncodingException {

        //消费者逻辑
        System.out.printf("接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());

        //进行业务逻辑处理
        System.out.println("业务逻辑处理");
        System.out.println("业务处理完成");
    }
}

生产消息:

消费消息:

编写消费消息代码2
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import rabbitextensionsdemo.constant.Constants;

@Component
public class AckListener {

    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void handMessage(Message message, Channel channel)throws UnsupportedEncodingException {

        //消费者逻辑
        System.out.printf("接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());

        //进行业务逻辑处理
        System.out.println("业务逻辑处理");
        int num = 3/0;
        System.out.println("业务处理完成");
    }
}

生产消息:

消费消息:

此时可见,消费消息时异常导致未正常消费,控制台不断打印日志,deleverTag值递增,队列中始终有一条消息,因AUTO机制下消费异常时不确认消息,消息会重新入队,直到消费方正常消费。

AcknowledgeMode.MANUAL(演示)

添加配置:

spring:
  application:
    name: rabbit-extensions-demo
  rabbitmq:
    addresses: amqp://study:study@47.98.109.138:5672/extension
    listener:
      simple:
        acknowledge-mode: manual
编写消费消息代码1
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import rabbitextensionsdemo.constant.Constants;


@Component
public class AckListener {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void handMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        //消费者逻辑
        System.out.printf("接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());

        //进行业务逻辑处理
        System.out.println("业务逻辑处理");
        System.out.println("业务处理完成");

    }
}

生产消息:

消费消息:

![](https://i-blog.csdnimg.cn/direct/b8e276

© 版权声明

相关文章

没有相关内容!

暂无评论

none
暂无评论...