SpringBoot 整合RabbitMQ 之:生产者发送消息确认
在 RabbitMQ 消息队列使用过程中,为了保证生产者发送消息到 RabbitMQ 服务过程中消息不丢失,需要使用【生产者确认】机制。
生产者确认机制有两种方式:事务机制和发送方确认机制。事务机制会在 AMQP 协议中增加确认的数据包,效率较低,这里不做考虑。
一、发送方确认机制
默认情况下,生产者和 RabbitMQ 服务间创建的信道 Channel 不是 Confirm (确认)模式;即生产者将消息发送给 RabbitMQ 服务后,就不再关心消息是否真正的到达了 RabbitMQ(如突然宕机等)。
为了避免生产者向 RabbitMQ 服务发送消息过程中的数据丢失,我们需要将他们创建的信道设置为 Confirm (确认)模式。
1.1、信道的 Confirm 确认模式
一旦信道进入 confirm 模式,所有在信道上发布的消息都会被指派一个唯一的 ID(从 1 开始)。
- 一旦消息被投递给所有匹配的队列后,RabbitMQ 会发送一个确认(Basic.ACK)给生产者(包含消息的唯一 ID)。
- 如果消息和队列是持久化的,那么确认消息只会在队列将消息写入磁盘后才会发出。
- 如果 RabbitMQ 发生内部错误从而导致消息丢失,会发送一条 nack(not acknowledged,未确认)消息给生产者。
- RabbitMQ 传回给生产者的确认消息中的 deliveryTag 包含了确认消息的序号;通过这个需要可以实现批量发送和确认消息。
1.2、生产者确认方式:串行确认
生产者每发送一条消息,调用waitForConfirms()方法等待服务端confirm;这实际上是一种串行的confirm。
每 publish 一条消息之后就等待服务端 confirm,如果服务端返回false或者超时时间内未返回,客户端进行消息重传。
for(int i = 0;i < count;i++)
{
channel.basicPublish(exchangeName, routingKey,null, ("第"+(i+1)+"条消息").getBytes());
if(channel.waitForConfirms())
{
System.out.println("发送成功");
}
}
1.3、生产者确认方式:批量确认
生产者每发送一批消息之后,调用waitForConfirms()方法,等待服务端confirm,这种批量确认的模式极大的提高了confirm 效率。
但是如果一旦出现confirm返回false或者超时的情况,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息,如果这种情况频繁发生的话,效率也会不升反降。
RabbitMQ 服务默认对生产者的确认方式就是批量确认;即消息传输中附件参数 Multiple:true。
for(int i = 0;i < count;i++)
{
channel.basicPublish(exchangeName, routingKey,MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes());
}
long start = System.currentTimeMillis();
channel.waitForConfirmsOrDie();
1.4、生产者确认方式:异步确认
发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。
二、SpringBoot 整合生产者发送消息确认
SpringBoot 框架整合 RabbitMQ 消息队列开发过程中,主要关注的是 RabbitMQ 服务对象特定设置的配置文件和对象类封装 rabbitTemplate 的使用方法。
2.1、SpringBoot 配置文件 publisherConfirm
spring.rabbitmq.publisher-confirm-type
发布确认属性有三种确认类型:None,Correlated,Simple
。
配置项含义:生产者将消息发送到 RabbitMQ 服务的 Exchange 交换器。
2.1.1、None 值是禁用发布确认模式,是默认值。
Publisher confirms are disabled (default).
2.1.2、Correlated 值是发布消息成功到交换器后会触发回调方法 。
Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()} within scoped operations.
2.1.3、Simple 值经测试有两种效果
Use with {@code CorrelationData} to correlate confirmations with sent messsages.
其一:效果和CORRELATED值一样会触发回调方法,
其二:在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待 broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker。
2.2、SpringBoot 配置文件 publisherReturns
spring.rabbitmq.publisher-returns: true
配置含义:生产者发送的消息,经 Exchange 交换器路由到某个Queue队列中。
如果消息没有到达Queue,则将消息返回给生产者(需要和Mandatory 参数一起用)。
2.3、SpringBoot 设置RabbitMQ 服务属性
spring.rabbitmq.template.mandatory:true
当 mandatory 参数设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件对队列,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。
当 mandatory 参数设置为 false 时,出现上述情况,则消息直接被丢弃。
2.4、rabbitTemplate 生产者发送消息确认
2.4.1、简单的实现方法如下:
boolean sendFlag = rabbitTemplate.invoke(operations -> {
rabbitTemplate.convertAndSend("dingtalkExchange2", jsonMessage.getString("target"), jsonMessage);
return rabbitTemplate.waitForConfirms(2*1000);
});
if (sendFlag){
logger.info("Sent message : ok ");
} else {
logger.info("Sent message : fail ");
}
2.4.2、更加合理的实现方式如下:
/**
* 设置生产者的生产消息的ack信息回调(公共处理)
*/
@Bean
public RabbitTemplate.ConfirmCallback confirmCallback(){
return (correlationData, ack, cause)->{
//我们可以通过correlationData原始数据 来对消息进行后续处理,但是这是有个要求在于发送必须使用CorrelationData类
if(ack){
logger.info("消息发送成功!!!!!,消息data:{},时间:{}",correlationData,System.currentTimeMillis());
}else {
logger.error("消息发送失败!!!!,原因是:{}",cause);
}
};
}
/**
* 发送者失败通知
*/
@Bean
public RabbitTemplate.ReturnCallback returnCallback(){
//构建一个
return (Message message, int replyCode, String replyText, String exchange, String routingKey)->{
logger.error("发送者路由失败,请检查路由 Returned replyCode:{} Returned replyText:{} Returned routingKey:{} Returned message:{}"
, replyCode,replyText,routingKey,new String(message.getBody()));
};
}
@Bean
public RabbitTemplate rabbitTemplate() {
/* // 可以在配置项中设置,或者在实例中设置。
//若使用confirm-callback ,必须要配置publisherConfirms 为true
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
//若使用return-callback,必须要配置publisherReturns为true
connectionFactory.setPublisherReturns(true);*/
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
rabbitTemplate.setConfirmCallback(confirmCallback());
rabbitTemplate.setReturnCallback(returnCallback());
rabbitTemplate.setMandatory(true);
return rabbitTemplate;
}
@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter(){
return new Jackson2JsonMessageConverter();
}