Spring boot 整合 RabbitMQ 消息队列介绍
Spring boot 整合 RabbitMQ,本示例采用 RabbitTemplate 发送消息,采用 @RabbitListener 接受消息。
一、依赖包和配置项
1.1、在 pom 文件中添加依赖信息
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1.2、添加配置项(自动装配)
需要在配置文件中设置 RabbitMQ 的服务器、端口号、用户名和密码等信息。
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
二、自定义注入配置 Bean 相关组件
在 Spring boot 整合 RabbitMQ 的项目中,为了方便使用 RabbitMQ 的相关操作组件和跟踪消息在发送过程中的状态,可以在项目中自定义注入和配置 Bean 相关组件。下面设置自定义配置的 Bean 组件放到 RabbitmqConfig 配置类中。
package com.example.mgt.examplemgtservicek8stest.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
/**
* RabbitMQ 自定义注入配置 Bean 相关组件
* @since 2020/5/15 13:52
*/
@Configuration
public class RabbitmqConfig {
//定义日志
private static final Logger log = LoggerFactory.getLogger(RabbitmqConfig.class);
//自动装配 RabbitMQ 的链接工厂实例
@Autowired
private CachingConnectionFactory connectionFactory;
//自动装配消息监听器所在的容器工厂配置类实例
@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
//下面为单一消费者实例的配置
@Bean(name="singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer(){
//定义消息监听器所在的容器工厂
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//设置容器工厂所用的实例
factory.setConnectionFactory(connectionFactory);
//设置消息在传输中的格式,这里采用 JSON 的格式进行传输
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//设置并发消费者实例的初始数量为1
factory.setConcurrentConsumers(1);
//设置并发消费者实例中最大数量为1
factory.setMaxConcurrentConsumers(1);
//设置并发消费者实例中每个实例拉取的消息数量为1个
factory.setPrefetchCount(1);
return factory;
}
//自定义配置 RabbitMQ 发送消息的操作组件 RabbitTemplate
@Bean
public RabbitTemplate rabbitTemplate(){
//设置:发送消息后进行确认
connectionFactory.setPublisherConfirms(true);
//设置:发送消息后返回确认信息
connectionFactory.setPublisherReturns(true);
//构造发送消息组件实例对象
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(
CorrelationData correlationData, boolean b, String s) {
if(b){
log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,b,s);
}else{
System.out.println("消息确认失败");
}
}
});
// 设置消息收到确认
rabbitTemplate.setMandatory(true);
/* rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(
Message message, int i, String s, String s1, String s2) {
log.info("消息发送失败");
}
});*/
return rabbitTemplate;
}
}
1、当mandatory标志位设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;
2、当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;
三、RabbitMQ 发送、接受消息实战
Spring boot 整合 RabbitMQ 的正式代码编写前,需要创建队列、交换机、路由及绑定操作。以生产者发送一个简单字符串为例。
3.1、在 RabbitmqConfig 类中创建需要对象
最先在 RabbitmqConfig 类中创建队列、交换机、路由和绑定操作,如下:
// 定义读取配置文件的环境变量的实例
@Autowired
private Environment env;
//创建队列
@Bean(name = "basicQueue")
public Queue basicQueue(){
return new Queue(env.getProperty("mq.basic.info.queue.name"),true);
}
//创建交换机:以 DirectExchange 为例
@Bean
public DirectExchange basicExchange(){
return new DirectExchange(env.getProperty("mq.basic.info.exchange.name"),true,false);
}
//绑定
@Bean
public Binding basicBinding(){
return BindingBuilder.bind(basicQueue()).to(basicExchange()).
with(env.getProperty("mq.basic.info.routing.key.name"));
}
补充:上面 env 读取到的变量,都需要在配置文件 application.yml 中设置好。
mq:
basic:
info:
queue:
name: lyz.queue
exchange:
name: lyz.exchange
routing:
key:
name: lyz.key
3.2、生产者发送消息
package com.example.mgt.examplemgtservicek8stest.rabbitmq.publisher;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
/**
* 生产者
* @since 2020/5/15 14:33
*/
@Component
public class BasicPublisher {
private static final Logger log = LoggerFactory.getLogger(BasicPublisher.class);
//定义 JSON 序列化和反序列化实例
@Autowired
private ObjectMapper objectMapper;
//定义 RabbitMQ 消息操作组件
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Environment env;
public void sendMsg(String message) {
if(!Strings.isNullOrEmpty(message)){
try{
//定义传输格式为 JSON 字符串
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//指定消息模型中的交换机
rabbitTemplate.setExchange(env.getProperty("mq.basic.info.exchange.name"));
//指定消息模型中的路由
rabbitTemplate.setRoutingKey(env.getProperty("mq.basic.info.routing.key.name"));
//将字符串转化为待发送的消息,即一串二进制的数据流
Message msg = MessageBuilder.withBody(message.getBytes("utf-8")).build();
//转化并发送消息
rabbitTemplate.convertAndSend(msg);
log.info("基本消息模型-生产者-发送消息:{}",message);
} catch (Exception e){
log.error("基本消息模型-生产者-发送消息异常:{}",message,e.fillInStackTrace());
}
}
}
}
3.3、消费者监听并接受消费消息
package com.example.mgt.examplemgtservicek8stest.rabbitmq.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
/**
* 消费者
* @since 2020/5/15 14:57
*/
@Component
public class BasicConsumer {
private static final Logger log = LoggerFactory.getLogger(BasicConsumer.class);
@Autowired
public ObjectMapper objectMapper;
//监听并接受消费队列中的消息-容器工厂 singleListenerContainer 在 RabbitmqConfig 类中定义
@RabbitListener(queues="${mq.basic.info.queue.name}",containerFactory = "singleListenerContainer")
//由于消息本质是一串二进制数据流,因而监听接受的消息采用字节数组接收
public void consumeMsg1(@Payload byte[] msg) {
try{
String message = new String(msg,"utf-8");
log.info("基本信息模型-消费者-监听到的消息: {}",message);
} catch (Exception e){
log.error("基本信息获取失败:",e.fillInStackTrace());
}
}
}
3.4、编写测试单元
package com.example.mgt.examplemgtservicek8stest;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.example.mgt.examplemgtservicek8stest.rabbitmq.publisher.BasicPublisher;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* RabbitMQ 的 JAVA 单元测试类
* @since 2020/5/15 15:13
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class RabbitmqTest {
private static final Logger log = LoggerFactory.getLogger(RabbitmqTest.class);
@Autowired
private ObjectMapper objectMapper;
@Autowired
private BasicPublisher basicPublisher;
@Test
public void test1() throws Exception{
String msg = "~~~~~~~~~~测试";
basicPublisher.sendMsg(msg);
}
}