Spring boot 整合 RabbitMQ 消息队列介绍

2020-05-15 0 By admin

Spring boot 整合 RabbitMQ,本示例采用 RabbitTemplate 发送消息,采用 @RabbitListener 接受消息。

一、依赖包和配置项

1.1、在 pom 文件中添加依赖信息

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1.2、添加配置项(自动装配)

需要在配置文件中设置 RabbitMQ 的服务器、端口号、用户名和密码等信息。
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

二、自定义注入配置 Bean 相关组件

在 Spring boot 整合 RabbitMQ 的项目中,为了方便使用 RabbitMQ 的相关操作组件和跟踪消息在发送过程中的状态,可以在项目中自定义注入和配置 Bean 相关组件。下面设置自定义配置的 Bean 组件放到 RabbitmqConfig 配置类中。

package com.example.mgt.examplemgtservicek8stest.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

/**
 * RabbitMQ 自定义注入配置 Bean 相关组件
 * @since 2020/5/15 13:52
 */
@Configuration
public class RabbitmqConfig {
  //定义日志
  private static final Logger log = LoggerFactory.getLogger(RabbitmqConfig.class);

  //自动装配 RabbitMQ 的链接工厂实例
  @Autowired
  private CachingConnectionFactory connectionFactory;
  //自动装配消息监听器所在的容器工厂配置类实例
  @Autowired
  private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;


//下面为单一消费者实例的配置
  @Bean(name="singleListenerContainer")
  public SimpleRabbitListenerContainerFactory listenerContainer(){
  //定义消息监听器所在的容器工厂
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  //设置容器工厂所用的实例
    factory.setConnectionFactory(connectionFactory);
  //设置消息在传输中的格式,这里采用 JSON 的格式进行传输
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
  //设置并发消费者实例的初始数量为1
    factory.setConcurrentConsumers(1);
  //设置并发消费者实例中最大数量为1
    factory.setMaxConcurrentConsumers(1);
  //设置并发消费者实例中每个实例拉取的消息数量为1个
    factory.setPrefetchCount(1);
    return factory;
  }

//自定义配置 RabbitMQ 发送消息的操作组件 RabbitTemplate
  @Bean
  public RabbitTemplate rabbitTemplate(){
  //设置:发送消息后进行确认
    connectionFactory.setPublisherConfirms(true);
  //设置:发送消息后返回确认信息
    connectionFactory.setPublisherReturns(true);
  //构造发送消息组件实例对象
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    @Override
      public void confirm(
          CorrelationData correlationData, boolean b, String s) {
        if(b){
          log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,b,s);
        }else{
          System.out.println("消息确认失败");
        }

      }
    });

// 设置消息收到确认
rabbitTemplate.setMandatory(true);
/*    rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
    @Override
      public void returnedMessage(
          Message message, int i, String s, String s1, String s2) {
        log.info("消息发送失败");
      }
    });*/

    return rabbitTemplate;
  }
}

1、当mandatory标志位设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;
2、当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;

三、RabbitMQ 发送、接受消息实战

Spring boot 整合 RabbitMQ 的正式代码编写前,需要创建队列、交换机、路由及绑定操作。以生产者发送一个简单字符串为例。

3.1、在 RabbitmqConfig 类中创建需要对象

最先在 RabbitmqConfig 类中创建队列、交换机、路由和绑定操作,如下:

// 定义读取配置文件的环境变量的实例
  @Autowired
  private Environment env;
//创建队列
  @Bean(name = "basicQueue")
  public Queue basicQueue(){
    return new Queue(env.getProperty("mq.basic.info.queue.name"),true);
  }
//创建交换机:以 DirectExchange 为例
  @Bean
  public DirectExchange basicExchange(){
    return new DirectExchange(env.getProperty("mq.basic.info.exchange.name"),true,false);
  }
//绑定
  @Bean
  public Binding basicBinding(){
    return BindingBuilder.bind(basicQueue()).to(basicExchange()).
        with(env.getProperty("mq.basic.info.routing.key.name"));
  }

补充:上面 env 读取到的变量,都需要在配置文件 application.yml 中设置好。

mq:
  basic:
    info:
      queue:
        name: lyz.queue
      exchange:
        name: lyz.exchange
      routing:
        key:
          name: lyz.key

3.2、生产者发送消息

package com.example.mgt.examplemgtservicek8stest.rabbitmq.publisher;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

/**
 * 生产者
 * @since 2020/5/15 14:33
 */
@Component
public class BasicPublisher {
  private static final Logger log = LoggerFactory.getLogger(BasicPublisher.class);
  //定义 JSON 序列化和反序列化实例
  @Autowired
  private ObjectMapper objectMapper;
  //定义 RabbitMQ 消息操作组件
  @Autowired
  private RabbitTemplate rabbitTemplate;

  @Autowired
  private Environment env;

  public void sendMsg(String message) {
    if(!Strings.isNullOrEmpty(message)){
      try{
	  //定义传输格式为 JSON 字符串
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
	  //指定消息模型中的交换机
        rabbitTemplate.setExchange(env.getProperty("mq.basic.info.exchange.name"));
	  //指定消息模型中的路由
        rabbitTemplate.setRoutingKey(env.getProperty("mq.basic.info.routing.key.name"));
	  //将字符串转化为待发送的消息,即一串二进制的数据流
        Message msg = MessageBuilder.withBody(message.getBytes("utf-8")).build();
	  //转化并发送消息
        rabbitTemplate.convertAndSend(msg);
        log.info("基本消息模型-生产者-发送消息:{}",message);
      } catch (Exception e){
        log.error("基本消息模型-生产者-发送消息异常:{}",message,e.fillInStackTrace());
      }
    }
  }
}

3.3、消费者监听并接受消费消息

package com.example.mgt.examplemgtservicek8stest.rabbitmq.consumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

/**
 * 消费者
 * @since 2020/5/15 14:57
 */
@Component
public class BasicConsumer {
  private static final Logger log = LoggerFactory.getLogger(BasicConsumer.class);
  @Autowired
  public ObjectMapper objectMapper;
//监听并接受消费队列中的消息-容器工厂 singleListenerContainer 在 RabbitmqConfig 类中定义
  @RabbitListener(queues="${mq.basic.info.queue.name}",containerFactory = "singleListenerContainer")
//由于消息本质是一串二进制数据流,因而监听接受的消息采用字节数组接收
  public void consumeMsg1(@Payload byte[] msg) {
    try{
      String message = new String(msg,"utf-8");
      log.info("基本信息模型-消费者-监听到的消息: {}",message);
    } catch (Exception e){
      log.error("基本信息获取失败:",e.fillInStackTrace());
    }
  }
}

3.4、编写测试单元

package com.example.mgt.examplemgtservicek8stest;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.example.mgt.examplemgtservicek8stest.rabbitmq.publisher.BasicPublisher;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * RabbitMQ 的 JAVA 单元测试类
 * @since 2020/5/15 15:13
 */
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class RabbitmqTest {
  private static final Logger log = LoggerFactory.getLogger(RabbitmqTest.class);
  @Autowired
  private ObjectMapper objectMapper;
  @Autowired
  private BasicPublisher basicPublisher;

  @Test
  public void test1() throws Exception{
    String msg = "~~~~~~~~~~测试";
    basicPublisher.sendMsg(msg);
  }
}