跳过正文

RabbitMQ使用 SpringBoot整合篇

·2823 字·6 分钟

无SpringBoot的基础使用见另一篇文章

引入依赖与配置
#

Maven添加Starter:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

SpringBoot的配置文件中:

spring:
  rabbitmq:
    host: 192.168.137.16
    port: 5672
    username: admin
    password: admin
# 不会有笨蛋直接COPY吧

不需要在生产者或消费者声明队列和交换机,被配置类替代。

我将以我看的教程的顺序,即以例子来逐步记录。

延迟队列
#

延迟队列 Pro
#

只需要在配置类中配置好复杂的死信队列结构,就可以轻松使用。

配置类
#

@Configuration //配置类
public class MyMqConfig {
    //结构与上图中的基本相同
    private static final String NORMAL_EXCHANGE = "X";
    private static final String DEAD_EXCHANGE = "Y";
    private static final String NORMAL_QUEUE_1 = "QA";
    private static final String NORMAL_QUEUE_2 = "QB";
    private static final String DEAD_QUEUE = "QD";
    //声明普通交换机
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(NORMAL_EXCHANGE);
    }
    //声明死信交换机
    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(DEAD_EXCHANGE);
    }
    //声明两个普通队列
    @Bean("queueA")
    public Queue queueA() {
        return QueueBuilder
                .nonDurable(NORMAL_QUEUE_1)
                .ttl(10000)
                .deadLetterExchange(DEAD_EXCHANGE) //死信交换机与RoutingKey
                .deadLetterRoutingKey("YD")
                .build();
    }
    @Bean("queueB")
    public Queue queueB() {
        return QueueBuilder
                .nonDurable(NORMAL_QUEUE_2)
                .ttl(40000)
                .deadLetterExchange(DEAD_EXCHANGE)
                .deadLetterRoutingKey("YD")
                .build();
    }
    //声明死信队列
    @Bean("queueD")
    public Queue queueDead() {
        return QueueBuilder.nonDurable(DEAD_QUEUE).build();
    }
    //队列绑定
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue qA, @Qualifier("xExchange") DirectExchange ex) {
        return BindingBuilder.bind(qA).to(ex).with("XA");
    }
    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue qB, @Qualifier("xExchange") DirectExchange ex) {
        return BindingBuilder.bind(qB).to(ex).with("XB");
    }
    @Bean
    public Binding queueDBindingX(@Qualifier("queueD") Queue qD, @Qualifier("yExchange") DirectExchange ex) {
        return BindingBuilder.bind(qD).to(ex).with("YD");
    }
}

留意@Bean("")与@Qualifier("")的使用。

生产者
#

跟随教程使用Controller实现。

@RestController
@RequestMapping("/test")
public class TestController {
    //发送消息的关键
    @Resource
    private RabbitTemplate rabbitTemplate;

    //测试发送的接口
    @GetMapping("/send/{msg}")
    public void sendMessage(@PathVariable("msg") String message) {
        LoggerFactory.getLogger(TestController.class).info("{}: {}", new Date(), message);
        rabbitTemplate.convertAndSend("X", "XA", "Hi");
        rabbitTemplate.convertAndSend("X", "XB", "Hi");
    }
}

消费者(死信)
#

消费者使用使用注解实现监听某个队列。

@Slf4j
@Component
public class DeadLetterConsumer {
    @RabbitListener(queues = "QD") //监听QD
    public void receiveD(Message message, Channel channel) throws Exception {
        log.info("Consumer: {},{}", new String(message.getBody()), new Date());
    }
}

是的没错,使用@RabbitListener来声明一个方法作为Consumer,这里的参数列表自由发挥,一般来说需要一个Message。

延迟队列 Pro Plus
#

上面的延迟队列,你要新的延迟时间就要创建新的队列,太笨比。

应该由生产者指定过期时间,发送到一个没有设置过期时间的队列上(仍然没有消费者)。

配置类
#

只是减少了TTL设置:

@Configuration
public class MyMqConfig {
    public static final String DELAY_EXCHANGE = "normal_exchange";
    public static final String DELAY_QUEUE = "delay_queue";
    public static final String DELAY_ROUTINGKEY = "delay_key";
    public static final String DEAD_EXCHANGE = "dead_exchange";
    public static final String DEAD_ROUTINGKEY = "dead_key";
    public static final String DEAD_QUEUE = "dead_queue";
    @Bean("nExchange")
    public DirectExchange nExchange() {
        return new DirectExchange(DELAY_EXCHANGE);
    }
    @Bean("dExchange")
    public DirectExchange dExchange() {
        return new DirectExchange(DEAD_EXCHANGE);
    }
    @Bean("delay_queue")
    public Queue delayQueue() {
        return QueueBuilder
                .nonDurable(DELAY_QUEUE)
                .deadLetterExchange(DEAD_EXCHANGE)
                .deadLetterRoutingKey(DEAD_ROUTINGKEY)
                .build();
    }
    @Bean("dead_queue")
    public Queue deadQueue() {
        return QueueBuilder.nonDurable(DEAD_QUEUE).build();
    }
    @Bean
    public Binding queueBindingDelay(@Qualifier("delay_queue") Queue que, @Qualifier("nExchange") DirectExchange ex) {
        return BindingBuilder.bind(que).to(ex).with(DELAY_ROUTINGKEY);
    }
    @Bean
    public Binding queueBindingDead(@Qualifier("dead_queue") Queue que, @Qualifier("dExchange") DirectExchange ex) {
        return BindingBuilder.bind(que).to(ex).with(DEAD_ROUTINGKEY);
    }
}

生产者
#

@Slf4j
@RestController
@RequestMapping("/test")
public class TestController {
    @Resource
    private RabbitTemplate rabbitTemplate;
    //测试发送的接口
    @GetMapping("/send/{msg}/{ttl}")
    public String sendMessage(@PathVariable String msg, @PathVariable String ttl) {
        log.info("Send in TTL {} : {}", ttl, msg);
        rabbitTemplate.convertAndSend(
                MyMqConfig.DELAY_EXCHANGE,
                MyMqConfig.DELAY_ROUTINGKEY,
                msg.getBytes(StandardCharsets.UTF_8),
                message -> {
                    //设置TTL
                    message.getMessageProperties().setExpiration(ttl);//ms
                    return message;
                }
        );
        return "OK";
    }
}

关键在于MessagePostProcessor,其签名实际如下:

@FunctionalInterface
public interface MessagePostProcessor {
    Message postProcessMessage(Message var1) throws AmqpException;

    default Message postProcessMessage(Message message, Correlation correlation) {
        return this.postProcessMessage(message);
    }

    default Message postProcessMessage(Message message, Correlation correlation, String exchange, String routingKey) {
        return this.postProcessMessage(message, correlation);
    }
}

使用的是第一个,只有一个Message的“重载”,对消息进行操作原样返回即可。

问题
#

如果前面发的一条消息延迟很大,而后面的延迟很小,会堵在前面的消息等待TTL(队列先进先出特性)。

该问题不引入插件目前无解。

延迟队列 Pro Plus Max(基于插件)
#

要想解决上面的问题,需要插件来让RabbitMQ支持该功能。

插件安装
#

到RabbitMQ网站上下载,社区插件

也可以到Github,链接

藏得有点深。

找到插件目录:

rabbitmq-plugins directories -s
# 通常输出
#Plugin archives directory: /usr/lib/rabbitmq/plugins:/usr/lib/rabbitmq/lib/rabbitmq_server-3.11.2/plugins
#Plugin expansion directory: /var/lib/rabbitmq/mnesia/rabbit@debian-plugins-expand
#Enabled plugins file: /etc/rabbitmq/enabled_plugins

把下载的插件安装包丢到Plugin Archives目录。

启用:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

最后重启:

systemctl restart rabbitmq-server.service 

安装成功后,可在管理界面创建交换机时看到新的类型,x-delayed-message。这也意味着,延迟队列实现从利用死信队列变为使用交换机。新的结构如下:

配置类
#

新的交换机类型需要额外的配置,没有那个类了,需要自定义交换机。

@Configuration
public class MyMqConfig {

    public static final String DELAY_EXCHANGE = "delay_exchange";
    public static final String DELAY_QUEUE = "delay_queue";
    public static final String DELAY_RK = "delay_rk";

    @Bean
    public CustomExchange delayExchange() {
        //交换机参数填上类型,其实就是延迟+什么基础类型的交换机,可以是直接、扇出等
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");//一般用Direct
        //CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)
        //在第二个参数——交换机类型填上我们的新交换机类型
        return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", false, false, args);
    }

    @Bean
    public Queue delayQueue() {
        return QueueBuilder.nonDurable(DELAY_QUEUE).build();
    }

    @Bean
    public Binding delayExchangeBinding(@Qualifier("delayExchange") CustomExchange ex, @Qualifier("delayQueue") Queue que) {
        return BindingBuilder.bind(que).to(ex).with(DELAY_RK).noargs();
    }
}

留意@Bean与@Qualifier("")的使用。

生产者
#

只变了设置延迟的一行。

生产者发送消息设置Expiration变为设置Delay。

@Slf4j
@RestController
@RequestMapping("/test")
public class TestController {
    @Resource
    private RabbitTemplate rabbitTemplate;
    //测试发送的接口
    @GetMapping("/send/{msg}/{ttl}")
    public String sendMessage(@PathVariable String msg, @PathVariable String ttl) {
        log.info("Send in TTL {} : {}", ttl, msg);
        rabbitTemplate.convertAndSend(
                MyMqConfig.DELAY_EXCHANGE,
                MyMqConfig.DELAY_RK,
                msg.getBytes(StandardCharsets.UTF_8),
                message -> {
                    //从setExpiration变成了setDelay
                    message.getMessageProperties().setDelay(Integer.parseInt(ttl));//ms
                    return message;
                }
        );
        return "OK";
    }
}

延迟队列 总结
#

利用RabbitMQ特性实现需要延迟的场景,可以简化并且消息不会丢失(开了持久化)。

在开发中要使用延迟队列,首选是使用基于插件的方法,但如果每个任务的延迟是固定的那么用死信队列也行。

发布确认
#

交换机确认
#

和基础篇中的情况基本一致,把消息缓存起来以防交换机寄了没机会重传。

配置与配置类
#

需要在配置文件中启用发布确认:

spring:
  rabbitmq:
    publisher-confirm-type: correlated
# none:
# 关闭发布确认,默认值
# simple:
# "Use waitForConfirms() or waitForConfirmsOrDie() within scoped operations.",用在同步模式,这两个方法返回false会关闭channel无法再发送消息
# correlated:
# "Use with CorrelationData to correlate confirmations with sent messsages.",用在异步回调模式

结构是,简单的一生产者一消费者与一直接交换机和消费者的队列,串联起来,硬要看配置类的话就:

@Configuration
public class MyMqConfig {
    public static final String EXCHANGE_NAME = "E";
    public static final String ROUTING_KEY = "RK";
    public static final String QUEUE_NAME = "Q";
    @Bean
    public DirectExchange theExchange() {
        return ExchangeBuilder.directExchange(EXCHANGE_NAME).autoDelete().build();
    }
    @Bean
    public Queue theQueue() {
        return QueueBuilder.durable(QUEUE_NAME).autoDelete().build();
    }
    @Bean
    public Binding theBinding(@Qualifier("theExchange") DirectExchange exchange, @Qualifier("theQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
}

留意ExchangeBuilder。

生产者
#

//设置确认回调方法,等同于异步回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    //correlationData可能为null,这玩意是在生产者自己填写的,没填写就没有
    String id = (null == correlationData) ? "" : correlationData.getId();
    if (ack) log.info("Success {}", id);
    else log.error("Fail {}, Reason: {}", id, cause);
});
/*
 * Callback原型:
 * public interface ConfirmCallback {
 * void confirm (@Nullable CorrelationData correlationData,boolean ack,@Nullable String cause);
 * }
 *  correlationData     消息的ID及相关信息
 *  ack                 交换机是否收到消息
 *  cause               失败的原因,没失败为null
 */

rabbitTemplate.convertAndSend(
        MyMqConfig.EXCHANGE_NAME,
        MyMqConfig.ROUTING_KEY,
        msg.getBytes(StandardCharsets.UTF_8)
//      ,new CorrelationData(....)
);

记得填写CorrelationData。

如何重发消息请看基础篇,需要注意的是这里ID就不用什么deliverTag了,自己生成个填CorrelationData里吧。

可以尝试掐网线、写错交换机名称来模仿到交换机的丢包。

和基础篇中的一样,不可感知交换机后的如不可路由等失败的丢失。解决见下方。

回退消息(队列确认)
#

将交换机发送到队列失败的消息回退给生产者,好让生产者处理。

配置
#

需要配置:

spring:
  rabbitmq:
    publisher-returns: true

生产者
#

其他部分与上一个生产者的相同,设置回调如下所示:

//设置回退回调方法
rabbitTemplate.setReturnsCallback(returned -> {
    log.error("Returned: {}, Reason: {}, Exchange: {}, RoutingKey: {}",
            new String(returned.getMessage().getBody(), StandardCharsets.UTF_8),
            returned.getReplyText(),
            returned.getExchange(),
            returned.getRoutingKey());
});
//returned中储存的是被退回的消息的交换机路由键消息体(自己send的对象)原因(replyText)

当遇到上述所谓无法路由的情况时,该回调会打印:

Returned: aaa, Reason: NO_ROUTE, Exchange: E, RoutingKey: RK114514ERROR

#

消息序列化
#

RabbitMQ的MessageConvert接口处理消息的序列化,其实现有 SimpleMessageConverter(默认)、Jackson2JsonMessageConverter等。当调用了 convertAndSend 时会使用 MessageConvert 进行消息对象的序列化。

SimpleMessageConverter 对于要发送的消息体 body:

  • 为 byte[] 时不进行处理
  • 是 String 则转成字节数组
  • 是 Java 对象,则用JDK自带序列化将消息转成字节数组,性能较差

Jackson2JsonMessageConverter顾名思义转成JSON。

Sodacooky
作者
Sodacooky
奶盐味