SpringBoot 整合RabbitMQ:SimpleMessageListenerContainer原理分析
SimpleMessageListenerContainer 是 spring 在 RabbitMQ 原生api基础上封装实现的一个消费工具类。
该类非常强大,可以实现:监听单个或多个队列、自动启动、自动声明,它还支持动态配置,如动态添加监听队列、动态调整并发数等等;基本上对RabbitMQ消费场景这个类都能满足。
如@RabbitListener、cloud-stream中StreamListener中底层实现都是基于该类,所以,理解SimpleMessageListenerContainer原理对理解spring rabbitmq中消费模型非常关键。
一、基本使用
1.1、添加/移除队列
SimpleMessageListenerContainer#addQueueNames()方法可以运行时添加监听队列,removeQueueNames()方法可以运行时移除监听队列;
1.2、后置处理器
SpringBoot 框架整合 RabbitMQ 服务,设置后置处理器:setAfterReceivePostProcessors()。
//后置处理器,接收到的消息都添加了Header请求头
container.setAfterReceivePostProcessors(message -> {
message.getMessageProperties().getHeaders().put("desc",10);
return message;
});
container.setMessageListener((MessageListener) message -> {
System.out.println("====接收到消息=====");
System.out.println(message.getMessageProperties());
System.out.println(new String(message.getBody()));
});
1.3、消费者对象设置
对消费者对象设置Consumer_tag 和 Arguments。
container.setConsumerTagStrategy 可以设置消费者的 Consumer_tag, container.setConsumerArguments可以设置消费者的 Arguments。
container.setConsumerTagStrategy(queue -> "order_queue_"+(++count));
//设置消费者的Arguments
Map<String, Object> args = new HashMap<>();
args.put("module","订单模块");
args.put("fun","发送消息");
container.setConsumerArguments(args);
1.4、setConcurrentConsumers设置并发消费者
方法:setConcurrentConsumers 设置多个并发消费者一起消费,并支持运行时动态修改。
方法:setMaxConcurrentConsumers设置最多的并发消费者。
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("dingtalk-test");
container.setConcurrentConsumers(5);
container.setMaxConcurrentConsumers(10);
container.setMessageListener((MessageListener) message -> {
System.out.println("====接收到消息=====");
System.out.println(message.getMessageProperties());
System.out.println(new String(message.getBody()));
});
return container;
}
二、核心原理
2.1、API结构
SimpleMessageListenerContainer类结构如下:

2.2、源码分析:方法入口
SimpleMessageListenerContainer类启动的入口是start()方法,该方法位于AbstractMessageListenerContainer类中:
public void start() {
//如果已启动,则什么也不执行,直接退出
if (isRunning()) {
return;
}
//initialized是否执行初始化,没有则执行afterPropertiesSet()方法进行初始化,执行完成后initialized设置成true
if (!this.initialized) {
synchronized (this.lifecycleMonitor) {
if (!this.initialized) {
afterPropertiesSet();
}
}
}
try {
logger.debug("Starting Rabbit listener container.");
//验证RabbitAdmin,mismatchedQueuesFatal=true时,spring context中RabbitAdmin数量不能大于1
configureAdminIfNeeded();
//执行RabbitAdmin#initialize方法,spring context中注入的exchanges, queues and bindings执行声明式创建
/*
总结一下,我们发现,要想自动创建队列,SimpleMessageListenerContainer需要满足这么两点:
mismatchedQueuesFatal属性设置为true
autoDeclare属性也设置为true
*/
checkMismatchedQueues();
//启动核心
doStart();
}
catch (Exception ex) {
throw convertRabbitAccessException(ex);
}
finally {
this.lazyLoad = false;
}
}
SimpleMessageListenerContainer#doStart方法:
protected void doStart() {
Assert.state(!this.consumerBatchEnabled || getMessageListener() instanceof BatchMessageListener
|| getMessageListener() instanceof ChannelAwareBatchMessageListener,
"When setting 'consumerBatchEnabled' to true, the listener must support batching");
//如果MessageListener是ListenerContainerAware,则进行expectedQueueNames校验
checkListenerContainerAware();
//调用父类doStart()方法,主要是active和running都设置成true
super.doStart();
synchronized (this.consumersMonitor) {
if (this.consumers != null) {
throw new IllegalStateException("A stopped container should not have consumers");
}
/*
创建BlockingQueueConsumer类型consumer,每个concurrentConsumers并发对应创建一个对象,并存储到Set<BlockingQueueConsumer> consumers集合中,
返回值就是创建consumer对象个数,具体创建逻辑见:SimpleMessageListenerContainer#createBlockingQueueConsumer,主要注意下prefetchCount计算:
int actualPrefetchCount = getPrefetchCount() > this.batchSize ? getPrefetchCount() : this.batchSize;即如果prefetchCount大于batchSize,则其就是实际值,否则prefetchCount等于batchSize值
*/
int newConsumers = initializeConsumers();
if (this.consumers == null) {
logger.info("Consumers were initialized and then cleared " +
"(presumably the container was stopped concurrently)");
return;
}
if (newConsumers <= 0) {
if (logger.isInfoEnabled()) {
logger.info("Consumers are already running");
}
return;
}
/*
每个并发对应一个BlockingQueueConsumer对象,这里将每个BlockingQueueConsumer对象封装成AsyncMessageProcessingConsumer,这样可以丢到线程池中异步执行
*/
Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
for (BlockingQueueConsumer consumer : this.consumers) {
//将BlockingQueueConsumer对象封装成AsyncMessageProcessingConsumer进行异步执行
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
//存储到processors集合中
processors.add(processor);
//将AsyncMessageProcessingConsumer丢到线程池中执行
getTaskExecutor().execute(processor);
if (getApplicationEventPublisher() != null) {
//事件发送
getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
}
}
//判断启动过程中是否存在异常
waitForConsumersToStart(processors);
}
}
上面代码大致逻辑:BlockingQueueConsumer对象可以看成consumer,然后将其包装成AsyncMessageProcessingConsumer异步任务丢入到线程池中运行。
2.3、源码分析:异步任务
上面分析了BlockingQueueConsumer类型的consumer会被封装成AsyncMessageProcessingConsumer异步任务丢入到线程池中运行。
下面主要就来分析下异步任务执行时做了些什么,该逻辑在SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run方法,它主要做如下几个事情:
1、监听队列判断
//BlockingQueueConsumer.getQueueCount() < 1,表示当前consumer没有设置任何监听队列,则没必要启动
if (this.consumer.getQueueCount() < 1) {
if (logger.isDebugEnabled()) {
logger.debug("Consumer stopping; no queues for " + this.consumer);
}
SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(
new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
}
this.start.countDown();
return;
2、核心逻辑
try {
initialize();
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
mainLoop();
}
}
catch (InterruptedException e) {
省略。。。
}
catch (QueuesNotAvailableException ex) {
省略。。。
}
省略。。。
AsyncMessageProcessingConsumer#run执行最核心逻辑就是上面try语句中,首先执行initialize()初始化方法,然后开始无限循环执行mainLoop()方法。
2.4、源码分析:初始化
上面分析的AsyncMessageProcessingConsumer运行时执行了两个关键操作:initialize()初始化操作和mainLoop()无限循环。
首先,来看下initialize()初始化操作主要干了些什么:
- 调用attemptDeclarations()方法进行声明式exchange、queue、bindings创建,主要通过执行RabbitAdmin#initialize方法实现;
- 调用BlockingQueueConsumer#start方法,该方法主要完成与Rabbit Broker指令交互
