SpringBoot 整合RabbitMQ 之:生产者发送消息确认

2020-11-28 0 By admin

在 RabbitMQ 消息队列使用过程中,为了保证生产者发送消息到 RabbitMQ 服务过程中消息不丢失,需要使用【生产者确认】机制。
生产者确认机制有两种方式:事务机制和发送方确认机制。事务机制会在 AMQP 协议中增加确认的数据包,效率较低,这里不做考虑。

一、发送方确认机制

默认情况下,生产者和 RabbitMQ 服务间创建的信道 Channel 不是 Confirm (确认)模式;即生产者将消息发送给 RabbitMQ 服务后,就不再关心消息是否真正的到达了 RabbitMQ(如突然宕机等)。
为了避免生产者向 RabbitMQ 服务发送消息过程中的数据丢失,我们需要将他们创建的信道设置为 Confirm (确认)模式。

1.1、信道的 Confirm 确认模式

一旦信道进入 confirm 模式,所有在信道上发布的消息都会被指派一个唯一的 ID(从 1 开始)。

  1. 一旦消息被投递给所有匹配的队列后,RabbitMQ 会发送一个确认(Basic.ACK)给生产者(包含消息的唯一 ID)。
  2. 如果消息和队列是持久化的,那么确认消息只会在队列将消息写入磁盘后才会发出。
  3. 如果 RabbitMQ 发生内部错误从而导致消息丢失,会发送一条 nack(not acknowledged,未确认)消息给生产者。
  4. RabbitMQ 传回给生产者的确认消息中的 deliveryTag 包含了确认消息的序号;通过这个需要可以实现批量发送和确认消息。

1.2、生产者确认方式:串行确认

生产者每发送一条消息,调用waitForConfirms()方法等待服务端confirm;这实际上是一种串行的confirm。
每 publish 一条消息之后就等待服务端 confirm,如果服务端返回false或者超时时间内未返回,客户端进行消息重传。

for(int i = 0;i < count;i++)
{
	channel.basicPublish(exchangeName, routingKey,null, ("第"+(i+1)+"条消息").getBytes());
	if(channel.waitForConfirms())
	{
		System.out.println("发送成功");
	}
}

1.3、生产者确认方式:批量确认

生产者每发送一批消息之后,调用waitForConfirms()方法,等待服务端confirm,这种批量确认的模式极大的提高了confirm 效率。
但是如果一旦出现confirm返回false或者超时的情况,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息,如果这种情况频繁发生的话,效率也会不升反降。
RabbitMQ 服务默认对生产者的确认方式就是批量确认;即消息传输中附件参数 Multiple:true。

for(int i = 0;i < count;i++)
{
	channel.basicPublish(exchangeName, routingKey,MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes());
}
long start = System.currentTimeMillis();
channel.waitForConfirmsOrDie();

1.4、生产者确认方式:异步确认

发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。

二、SpringBoot 整合生产者发送消息确认

SpringBoot 框架整合 RabbitMQ 消息队列开发过程中,主要关注的是 RabbitMQ 服务对象特定设置的配置文件和对象类封装 rabbitTemplate 的使用方法。

2.1、SpringBoot 配置文件 publisherConfirm

spring.rabbitmq.publisher-confirm-type 发布确认属性有三种确认类型:None,Correlated,Simple

配置项含义:生产者将消息发送到 RabbitMQ 服务的 Exchange 交换器。

2.1.1、None 值是禁用发布确认模式,是默认值。

Publisher confirms are disabled (default).

2.1.2、Correlated 值是发布消息成功到交换器后会触发回调方法 。

Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()} within scoped operations.

2.1.3、Simple 值经测试有两种效果

Use with {@code CorrelationData} to correlate confirmations with sent messsages.
其一:效果和CORRELATED值一样会触发回调方法,
其二:在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待 broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker。

2.2、SpringBoot 配置文件 publisherReturns

spring.rabbitmq.publisher-returns: true
配置含义:生产者发送的消息,经 Exchange 交换器路由到某个Queue队列中。
如果消息没有到达Queue,则将消息返回给生产者(需要和Mandatory 参数一起用)。

2.3、SpringBoot 设置RabbitMQ 服务属性

spring.rabbitmq.template.mandatory:true
当 mandatory 参数设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件对队列,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。
当 mandatory 参数设置为 false 时,出现上述情况,则消息直接被丢弃。

2.4、rabbitTemplate 生产者发送消息确认

2.4.1、简单的实现方法如下:
boolean sendFlag = rabbitTemplate.invoke(operations -> {
    rabbitTemplate.convertAndSend("dingtalkExchange2", jsonMessage.getString("target"), jsonMessage);
    return rabbitTemplate.waitForConfirms(2*1000);
});
if (sendFlag){
    logger.info("Sent message : ok ");
} else {
    logger.info("Sent message : fail ");
}
2.4.2、更加合理的实现方式如下:
/**
 * 设置生产者的生产消息的ack信息回调(公共处理)
 */
@Bean
public RabbitTemplate.ConfirmCallback confirmCallback(){
    return (correlationData, ack, cause)->{
        //我们可以通过correlationData原始数据 来对消息进行后续处理,但是这是有个要求在于发送必须使用CorrelationData类
        if(ack){
            logger.info("消息发送成功!!!!!,消息data:{},时间:{}",correlationData,System.currentTimeMillis());
        }else {
            logger.error("消息发送失败!!!!,原因是:{}",cause);
        }
    };
}
/**
* 发送者失败通知
*/
    @Bean
    public RabbitTemplate.ReturnCallback returnCallback(){
        //构建一个
        return (Message message, int replyCode, String replyText, String exchange, String routingKey)->{
            logger.error("发送者路由失败,请检查路由 Returned replyCode:{} Returned replyText:{} Returned routingKey:{} Returned message:{}"
                    ,  replyCode,replyText,routingKey,new String(message.getBody()));
        };
    }

@Bean
    public RabbitTemplate rabbitTemplate() {
/*      // 可以在配置项中设置,或者在实例中设置。
        //若使用confirm-callback ,必须要配置publisherConfirms 为true
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        //若使用return-callback,必须要配置publisherReturns为true
        connectionFactory.setPublisherReturns(true);*/
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
        rabbitTemplate.setConfirmCallback(confirmCallback());
        rabbitTemplate.setReturnCallback(returnCallback());
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }

@Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter(){
        return new Jackson2JsonMessageConverter();
    }