SpringBoot 整合RabbitMQ:SimpleMessageListenerContainer原理分析

2020-12-01 0 By admin

SimpleMessageListenerContainer 是 spring 在 RabbitMQ 原生api基础上封装实现的一个消费工具类。
该类非常强大,可以实现:监听单个或多个队列、自动启动、自动声明,它还支持动态配置,如动态添加监听队列、动态调整并发数等等;基本上对RabbitMQ消费场景这个类都能满足。
如@RabbitListener、cloud-stream中StreamListener中底层实现都是基于该类,所以,理解SimpleMessageListenerContainer原理对理解spring rabbitmq中消费模型非常关键。

一、基本使用

1.1、添加/移除队列

SimpleMessageListenerContainer#addQueueNames()方法可以运行时添加监听队列,removeQueueNames()方法可以运行时移除监听队列;

1.2、后置处理器

SpringBoot 框架整合 RabbitMQ 服务,设置后置处理器:setAfterReceivePostProcessors()。

//后置处理器,接收到的消息都添加了Header请求头
container.setAfterReceivePostProcessors(message -> {
    message.getMessageProperties().getHeaders().put("desc",10);
    return message;
});
container.setMessageListener((MessageListener) message -> {
    System.out.println("====接收到消息=====");
    System.out.println(message.getMessageProperties());
    System.out.println(new String(message.getBody()));
});

1.3、消费者对象设置

对消费者对象设置Consumer_tag 和 Arguments。
container.setConsumerTagStrategy 可以设置消费者的 Consumer_tag, container.setConsumerArguments可以设置消费者的 Arguments。

container.setConsumerTagStrategy(queue -> "order_queue_"+(++count));
//设置消费者的Arguments
Map<String, Object> args = new HashMap<>();
args.put("module","订单模块");
args.put("fun","发送消息");
container.setConsumerArguments(args);

1.4、setConcurrentConsumers设置并发消费者

方法:setConcurrentConsumers 设置多个并发消费者一起消费,并支持运行时动态修改。
方法:setMaxConcurrentConsumers设置最多的并发消费者。

@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("dingtalk-test");
    container.setConcurrentConsumers(5);
    container.setMaxConcurrentConsumers(10);
    container.setMessageListener((MessageListener) message -> {
        System.out.println("====接收到消息=====");
        System.out.println(message.getMessageProperties());
        System.out.println(new String(message.getBody()));
    });
    return container;
}

二、核心原理

2.1、API结构

SimpleMessageListenerContainer类结构如下:

simpleMessageListenerContainer 类
simpleMessageListenerContainer 类

2.2、源码分析:方法入口

SimpleMessageListenerContainer类启动的入口是start()方法,该方法位于AbstractMessageListenerContainer类中:

public void start() {
    //如果已启动,则什么也不执行,直接退出
 if (isRunning()) {
  return;
 }
    //initialized是否执行初始化,没有则执行afterPropertiesSet()方法进行初始化,执行完成后initialized设置成true
 if (!this.initialized) {
  synchronized (this.lifecycleMonitor) {
   if (!this.initialized) {
    afterPropertiesSet();
   }
  }
 }
 try {
  logger.debug("Starting Rabbit listener container.");
        //验证RabbitAdmin,mismatchedQueuesFatal=true时,spring context中RabbitAdmin数量不能大于1
  configureAdminIfNeeded();
        //执行RabbitAdmin#initialize方法,spring context中注入的exchanges, queues and bindings执行声明式创建
        /*
        总结一下,我们发现,要想自动创建队列,SimpleMessageListenerContainer需要满足这么两点:
         mismatchedQueuesFatal属性设置为true
         autoDeclare属性也设置为true
        */
  checkMismatchedQueues();
        //启动核心
  doStart();
 }
 catch (Exception ex) {
  throw convertRabbitAccessException(ex);
 }
 finally {
  this.lazyLoad = false;
 }
}

SimpleMessageListenerContainer#doStart方法:

protected void doStart() {
 Assert.state(!this.consumerBatchEnabled || getMessageListener() instanceof BatchMessageListener
   || getMessageListener() instanceof ChannelAwareBatchMessageListener,
   "When setting 'consumerBatchEnabled' to true, the listener must support batching");
 //如果MessageListener是ListenerContainerAware,则进行expectedQueueNames校验
    checkListenerContainerAware();
    //调用父类doStart()方法,主要是active和running都设置成true
 super.doStart();
 synchronized (this.consumersMonitor) {
  if (this.consumers != null) {
   throw new IllegalStateException("A stopped container should not have consumers");
  }
        /*
        创建BlockingQueueConsumer类型consumer,每个concurrentConsumers并发对应创建一个对象,并存储到Set<BlockingQueueConsumer> consumers集合中,
        返回值就是创建consumer对象个数,具体创建逻辑见:SimpleMessageListenerContainer#createBlockingQueueConsumer,主要注意下prefetchCount计算:
        int actualPrefetchCount = getPrefetchCount() > this.batchSize ? getPrefetchCount() : this.batchSize;即如果prefetchCount大于batchSize,则其就是实际值,否则prefetchCount等于batchSize值
        */
  int newConsumers = initializeConsumers();
  if (this.consumers == null) {
   logger.info("Consumers were initialized and then cleared " +
     "(presumably the container was stopped concurrently)");
   return;
  }
  if (newConsumers <= 0) {
   if (logger.isInfoEnabled()) {
    logger.info("Consumers are already running");
   }
   return;
  }
        /*
        每个并发对应一个BlockingQueueConsumer对象,这里将每个BlockingQueueConsumer对象封装成AsyncMessageProcessingConsumer,这样可以丢到线程池中异步执行
        */
  Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
  for (BlockingQueueConsumer consumer : this.consumers) {
            //将BlockingQueueConsumer对象封装成AsyncMessageProcessingConsumer进行异步执行
   AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
            //存储到processors集合中
   processors.add(processor);
            //将AsyncMessageProcessingConsumer丢到线程池中执行
   getTaskExecutor().execute(processor);
   if (getApplicationEventPublisher() != null) {
                //事件发送
    getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
   }
  }
        //判断启动过程中是否存在异常
  waitForConsumersToStart(processors);
 }
}

上面代码大致逻辑:BlockingQueueConsumer对象可以看成consumer,然后将其包装成AsyncMessageProcessingConsumer异步任务丢入到线程池中运行。

2.3、源码分析:异步任务

上面分析了BlockingQueueConsumer类型的consumer会被封装成AsyncMessageProcessingConsumer异步任务丢入到线程池中运行。
下面主要就来分析下异步任务执行时做了些什么,该逻辑在SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run方法,它主要做如下几个事情:

1、监听队列判断

//BlockingQueueConsumer.getQueueCount() < 1,表示当前consumer没有设置任何监听队列,则没必要启动
if (this.consumer.getQueueCount() < 1) {
  if (logger.isDebugEnabled()) {
   logger.debug("Consumer stopping; no queues for " + this.consumer);
  }
  SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
  if (getApplicationEventPublisher() != null) {
   getApplicationEventPublisher().publishEvent(
     new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
  }
  this.start.countDown();
  return;

2、核心逻辑

try {
 initialize();
 while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
  mainLoop();
 }
}
catch (InterruptedException e) {
    省略。。。
}
catch (QueuesNotAvailableException ex) {
 省略。。。
}
省略。。。

AsyncMessageProcessingConsumer#run执行最核心逻辑就是上面try语句中,首先执行initialize()初始化方法,然后开始无限循环执行mainLoop()方法。

2.4、源码分析:初始化

上面分析的AsyncMessageProcessingConsumer运行时执行了两个关键操作:initialize()初始化操作和mainLoop()无限循环。
首先,来看下initialize()初始化操作主要干了些什么:

  1. 调用attemptDeclarations()方法进行声明式exchange、queue、bindings创建,主要通过执行RabbitAdmin#initialize方法实现;
  2. 调用BlockingQueueConsumer#start方法,该方法主要完成与Rabbit Broker指令交互

转发地址

simpleMessageListenerContainer 过程
simpleMessageListenerContainer 过程