SpringBoot 整合RabbitMQ 之:消费者动态监听队列

2020-12-02 0 By admin

之前的文档中,已经介绍过 RabbitMQ 消费者通过 @RabbitListener 和 @RabbitHandler 注解,监听消息队列并消费消息的操作。不过使用这种方式,绑定消息队列是要通过代码层设置写死实现。
如果在工作中,需要动态的监听某个 RabbitMQ 的消息队列,或者通过配置文件获取要绑定的消息队列;这样就需要我们另外想办法解决;@RabbitListener 注解应该不能解决。

1.1、情况梳理

  1. @RabbitListener 注解只能固定(代码中写死)监听某个队列,不能解决动态绑定的需求。(好像也可以通过@Value 方式,动态获取;但不是想要的方式)。
  2. SpringBoot 整合RabbitMQ 服务过程中,只有SimpleMessageListenerContainer 等类具有 setQueueNames 方法,用于设置监听的消息队列。
  3. SimpleMessageListenerContainer 类实例对象有 setMessageListener 方法,用于设置监听消息对象。
  4. 监听消息对象应该从ChannelAwareMessageListener 类中继承,需要重写onMessage()方法;设置两个Message, Channel 形参。

注意:因为在消费者代码中,我们通过配置类创建了 SimpleMessageListenerContainer 对象并设置了属性。所以在配置文件中设置的 spring.rabbitmq.listener.simple.acknowledge-mode 参数,应该会失效。

一、消费者动态监听队列

这里举例实现的消费者动态监听消息队列,是从配置文件中获取队列列表;创建 SimpleMessageListenerContainer 对象,使其监听这些队列,设置 setMessageListener 消息监听类,来处理接受到的消息。

1.1、要监听的队列名存储变量

将监听的队列名,存储在一个字符串中,通过逗号分隔。

public String getKeys(){
    String keysString = "";
    if (urls.size() >= 1){
        for(Map.Entry<String,String> entry : urls.entrySet()){
            keysString = keysString + "dingtalk-" + entry.getKey() + ",";
        }
        return keysString.substring(0,keysString.length() -1 );
    }else {
        return "default";
    }
}

1.2、创建 SimpleMessageListenerContainer Bean

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(HandleServer handleServer){
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setConcurrentConsumers(1);       //初始化消费者
    container.setMaxConcurrentConsumers(10);   //最多消费者
    container.setQueueNames(mapConfig.getKeys().split(","));  // 指定监听的队列s
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);  //消费者主动确认
    container.setExposeListenerChannel(true);      
    container.setMessageListener(handleServer);    //指定消息监听对象
    return container;
}

1.3、编写 消息监听对象

@Service
public class HandleServer implements ChannelAwareMessageListener {
    @Autowired
    private MsgSend2 msgSend2;
    private Boolean sendSuccess;
    private static final Logger logger = LoggerFactory.getLogger(HandleServer.class);
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        logger.info("消费者收到消息:{}", (new String(message.getBody())));
        sendSuccess = msgSend2.send((JSONObject) JSONObject.parse(new String(message.getBody())));
        if(sendSuccess){
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }else if (message.getMessageProperties().getRedelivered()){
            logger.error("消息已重复处理失败,拒绝再次接收");
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        }else{
            logger.error("消息即将再次返回队列处理");
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}