SpringBoot 整合RabbitMQ 之:消费者消费消息确认

2020-12-01 0 By admin

为了保证消息从队列可靠地达到消费者,RabbitMQ 提供了消息确认机制(message acknowledgement)。

  1. 消费者在订阅队列时,当指定 autoACK 为 true(默认);则 RabbitMQ 会自动把发送出去的消息设置为确认。
  2. 如果指定 autoACK 为 false,RabbitMQ 会等待消费者显式地回复确认信号才会将消息移除。
  3. 当 autoACK 为 false,RabbitMQ 会一直等待消息确认;未等到消息确认前,只有消费者连接断开才能使 RabbitMQ 将消息转给新的消费者。

一、SpringBoot 整合RabbitMQ 消费者

SpringBoot 服务整合 RabbitMQ 消费者,分为设置配置项和代码实现两个过程。

1.1、消费者手动显式确认配置

SpringBoot 服务消费者客户端订阅 RabbitMQ 服务队列时,设置为手动确认(manual)。需要注意的时,消息监听有两个实现容器,Simple 和 Direct。

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 1
        max-concurrency: 10
        prefetch: 10

1.2、消费者手动显式确认实现

RabbitListener 是Springboot RabbitMQ中经常用到的一个注解,将被 RabbitListener 注解的类和方法封装成 MessageListener 注入 MessageListenerContainer。

  1. 当RabbitListener注解在方法上时,对应的方式就是Rabbit消息的监听器。
  2. 当RabbitListener注解在类上时,和RabbitHandle注解配合使用,可以实现不同类型的消息的分发,类中被RabbitHandle注解的方法就是Rabbit消息的监听器。

使用 @RabbitHandler 注解标注的方法要增加 channel(信道)、message 两个参数。

@RabbitHandler
@RabbitListener(queues = "dingtalk-default")//监听的队列名称
public void processdefault(Message message, Channel channel) throws IOException {
    logger.info("消费者收到消息:{}", (new String(message.getBody())));
    Boolean sendSuccess = msgSend2.send((JSONObject) JSONObject.parse(new String(message.getBody())));
    if(sendSuccess){
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }else if (message.getMessageProperties().getRedelivered()){
        logger.error("消息已重复处理失败,拒绝再次接收");
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
    }else{
        logger.error("消息即将再次返回队列处理");
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }
}

消息接收确认要比消息发送确认简单一点,因为只有一个消息回执(ack)的过程。