SpringBoot 整合RabbitMQ 之:消费者消费消息确认
为了保证消息从队列可靠地达到消费者,RabbitMQ 提供了消息确认机制(message acknowledgement)。
- 消费者在订阅队列时,当指定 autoACK 为 true(默认);则 RabbitMQ 会自动把发送出去的消息设置为确认。
- 如果指定 autoACK 为 false,RabbitMQ 会等待消费者显式地回复确认信号才会将消息移除。
- 当 autoACK 为 false,RabbitMQ 会一直等待消息确认;未等到消息确认前,只有消费者连接断开才能使 RabbitMQ 将消息转给新的消费者。
一、SpringBoot 整合RabbitMQ 消费者
SpringBoot 服务整合 RabbitMQ 消费者,分为设置配置项和代码实现两个过程。
1.1、消费者手动显式确认配置
SpringBoot 服务消费者客户端订阅 RabbitMQ 服务队列时,设置为手动确认(manual)。需要注意的时,消息监听有两个实现容器,Simple 和 Direct。
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
concurrency: 1
max-concurrency: 10
prefetch: 10
1.2、消费者手动显式确认实现
RabbitListener 是Springboot RabbitMQ中经常用到的一个注解,将被 RabbitListener 注解的类和方法封装成 MessageListener 注入 MessageListenerContainer。
- 当RabbitListener注解在方法上时,对应的方式就是Rabbit消息的监听器。
- 当RabbitListener注解在类上时,和RabbitHandle注解配合使用,可以实现不同类型的消息的分发,类中被RabbitHandle注解的方法就是Rabbit消息的监听器。
使用 @RabbitHandler 注解标注的方法要增加 channel(信道)、message 两个参数。
@RabbitHandler
@RabbitListener(queues = "dingtalk-default")//监听的队列名称
public void processdefault(Message message, Channel channel) throws IOException {
logger.info("消费者收到消息:{}", (new String(message.getBody())));
Boolean sendSuccess = msgSend2.send((JSONObject) JSONObject.parse(new String(message.getBody())));
if(sendSuccess){
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else if (message.getMessageProperties().getRedelivered()){
logger.error("消息已重复处理失败,拒绝再次接收");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}else{
logger.error("消息即将再次返回队列处理");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
消息接收确认要比消息发送确认简单一点,因为只有一个消息回执(ack)的过程。