RabbitMQ使用 SpringBoot整合篇

无SpringBoot的基础使用见另一篇文章

引入依赖与配置

Maven添加Starter:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

SpringBoot的配置文件中:

1
2
3
4
5
6
7
spring:
rabbitmq:
host: 192.168.137.16
port: 5672
username: admin
password: admin
# 不会有笨蛋直接COPY吧

不需要在生产者或消费者声明队列和交换机,被配置类替代。

我将以我看的教程的顺序,即以例子来逐步记录。

延迟队列

延迟队列 Pro

只需要在配置类中配置好复杂的死信队列结构,就可以轻松使用。

配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Configuration //配置类
public class MyMqConfig {
//结构与上图中的基本相同
private static final String NORMAL_EXCHANGE = "X";
private static final String DEAD_EXCHANGE = "Y";
private static final String NORMAL_QUEUE_1 = "QA";
private static final String NORMAL_QUEUE_2 = "QB";
private static final String DEAD_QUEUE = "QD";
//声明普通交换机
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(NORMAL_EXCHANGE);
}
//声明死信交换机
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(DEAD_EXCHANGE);
}
//声明两个普通队列
@Bean("queueA")
public Queue queueA() {
return QueueBuilder
.nonDurable(NORMAL_QUEUE_1)
.ttl(10000)
.deadLetterExchange(DEAD_EXCHANGE) //死信交换机与RoutingKey
.deadLetterRoutingKey("YD")
.build();
}
@Bean("queueB")
public Queue queueB() {
return QueueBuilder
.nonDurable(NORMAL_QUEUE_2)
.ttl(40000)
.deadLetterExchange(DEAD_EXCHANGE)
.deadLetterRoutingKey("YD")
.build();
}
//声明死信队列
@Bean("queueD")
public Queue queueDead() {
return QueueBuilder.nonDurable(DEAD_QUEUE).build();
}
//队列绑定
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue qA, @Qualifier("xExchange") DirectExchange ex) {
return BindingBuilder.bind(qA).to(ex).with("XA");
}
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue qB, @Qualifier("xExchange") DirectExchange ex) {
return BindingBuilder.bind(qB).to(ex).with("XB");
}
@Bean
public Binding queueDBindingX(@Qualifier("queueD") Queue qD, @Qualifier("yExchange") DirectExchange ex) {
return BindingBuilder.bind(qD).to(ex).with("YD");
}
}

留意@Bean(“”)与@Qualifier(“”)的使用。

生产者

跟随教程使用Controller实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@RestController
@RequestMapping("/test")
public class TestController {
//发送消息的关键
@Resource
private RabbitTemplate rabbitTemplate;

//测试发送的接口
@GetMapping("/send/{msg}")
public void sendMessage(@PathVariable("msg") String message) {
LoggerFactory.getLogger(TestController.class).info("{}: {}", new Date(), message);
rabbitTemplate.convertAndSend("X", "XA", "Hi");
rabbitTemplate.convertAndSend("X", "XB", "Hi");
}
}

消费者(死信)

消费者使用使用注解实现监听某个队列。

1
2
3
4
5
6
7
8
@Slf4j
@Component
public class DeadLetterConsumer {
@RabbitListener(queues = "QD") //监听QD
public void receiveD(Message message, Channel channel) throws Exception {
log.info("Consumer: {},{}", new String(message.getBody()), new Date());
}
}

是的没错,使用@RabbitListener来声明一个方法作为Consumer,这里的参数列表自由发挥,一般来说需要一个Message。

延迟队列 Pro Plus

上面的延迟队列,你要新的延迟时间就要创建新的队列,太笨比。

应该由生产者指定过期时间,发送到一个没有设置过期时间的队列上(仍然没有消费者)。

配置类

只是减少了TTL设置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Configuration
public class MyMqConfig {
public static final String DELAY_EXCHANGE = "normal_exchange";
public static final String DELAY_QUEUE = "delay_queue";
public static final String DELAY_ROUTINGKEY = "delay_key";
public static final String DEAD_EXCHANGE = "dead_exchange";
public static final String DEAD_ROUTINGKEY = "dead_key";
public static final String DEAD_QUEUE = "dead_queue";
@Bean("nExchange")
public DirectExchange nExchange() {
return new DirectExchange(DELAY_EXCHANGE);
}
@Bean("dExchange")
public DirectExchange dExchange() {
return new DirectExchange(DEAD_EXCHANGE);
}
@Bean("delay_queue")
public Queue delayQueue() {
return QueueBuilder
.nonDurable(DELAY_QUEUE)
.deadLetterExchange(DEAD_EXCHANGE)
.deadLetterRoutingKey(DEAD_ROUTINGKEY)
.build();
}
@Bean("dead_queue")
public Queue deadQueue() {
return QueueBuilder.nonDurable(DEAD_QUEUE).build();
}
@Bean
public Binding queueBindingDelay(@Qualifier("delay_queue") Queue que, @Qualifier("nExchange") DirectExchange ex) {
return BindingBuilder.bind(que).to(ex).with(DELAY_ROUTINGKEY);
}
@Bean
public Binding queueBindingDead(@Qualifier("dead_queue") Queue que, @Qualifier("dExchange") DirectExchange ex) {
return BindingBuilder.bind(que).to(ex).with(DEAD_ROUTINGKEY);
}
}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Slf4j
@RestController
@RequestMapping("/test")
public class TestController {
@Resource
private RabbitTemplate rabbitTemplate;
//测试发送的接口
@GetMapping("/send/{msg}/{ttl}")
public String sendMessage(@PathVariable String msg, @PathVariable String ttl) {
log.info("Send in TTL {} : {}", ttl, msg);
rabbitTemplate.convertAndSend(
MyMqConfig.DELAY_EXCHANGE,
MyMqConfig.DELAY_ROUTINGKEY,
msg.getBytes(StandardCharsets.UTF_8),
message -> {
//设置TTL
message.getMessageProperties().setExpiration(ttl);//ms
return message;
}
);
return "OK";
}
}

关键在于MessagePostProcessor,其签名实际如下:

1
2
3
4
5
6
7
8
9
10
11
12
@FunctionalInterface
public interface MessagePostProcessor {
Message postProcessMessage(Message var1) throws AmqpException;

default Message postProcessMessage(Message message, Correlation correlation) {
return this.postProcessMessage(message);
}

default Message postProcessMessage(Message message, Correlation correlation, String exchange, String routingKey) {
return this.postProcessMessage(message, correlation);
}
}

使用的是第一个,只有一个Message的“重载”,对消息进行操作原样返回即可。

问题

如果前面发的一条消息延迟很大,而后面的延迟很小,会堵在前面的消息等待TTL(队列先进先出特性)。

该问题不引入插件目前无解。

延迟队列 Pro Plus Max(基于插件)

要想解决上面的问题,需要插件来让RabbitMQ支持该功能。

插件安装

到RabbitMQ网站上下载,社区插件

也可以到Github,链接

藏得有点深。

找到插件目录:

1
2
3
4
5
rabbitmq-plugins directories -s
# 通常输出
#Plugin archives directory: /usr/lib/rabbitmq/plugins:/usr/lib/rabbitmq/lib/rabbitmq_server-3.11.2/plugins
#Plugin expansion directory: /var/lib/rabbitmq/mnesia/rabbit@debian-plugins-expand
#Enabled plugins file: /etc/rabbitmq/enabled_plugins

把下载的插件安装包丢到Plugin Archives目录。

启用:

1
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

最后重启:

1
systemctl restart rabbitmq-server.service 

安装成功后,可在管理界面创建交换机时看到新的类型,x-delayed-message。这也意味着,延迟队列实现从利用死信队列变为使用交换机。新的结构如下:

配置类

新的交换机类型需要额外的配置,没有那个类了,需要自定义交换机。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Configuration
public class MyMqConfig {

public static final String DELAY_EXCHANGE = "delay_exchange";
public static final String DELAY_QUEUE = "delay_queue";
public static final String DELAY_RK = "delay_rk";

@Bean
public CustomExchange delayExchange() {
//交换机参数填上类型,其实就是延迟+什么基础类型的交换机,可以是直接、扇出等
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");//一般用Direct
//CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)
//在第二个参数——交换机类型填上我们的新交换机类型
return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", false, false, args);
}

@Bean
public Queue delayQueue() {
return QueueBuilder.nonDurable(DELAY_QUEUE).build();
}

@Bean
public Binding delayExchangeBinding(@Qualifier("delayExchange") CustomExchange ex, @Qualifier("delayQueue") Queue que) {
return BindingBuilder.bind(que).to(ex).with(DELAY_RK).noargs();
}
}

留意@Bean与@Qualifier(“”)的使用。

生产者

只变了设置延迟的一行。

生产者发送消息设置Expiration变为设置Delay。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Slf4j
@RestController
@RequestMapping("/test")
public class TestController {
@Resource
private RabbitTemplate rabbitTemplate;
//测试发送的接口
@GetMapping("/send/{msg}/{ttl}")
public String sendMessage(@PathVariable String msg, @PathVariable String ttl) {
log.info("Send in TTL {} : {}", ttl, msg);
rabbitTemplate.convertAndSend(
MyMqConfig.DELAY_EXCHANGE,
MyMqConfig.DELAY_RK,
msg.getBytes(StandardCharsets.UTF_8),
message -> {
//从setExpiration变成了setDelay
message.getMessageProperties().setDelay(Integer.parseInt(ttl));//ms
return message;
}
);
return "OK";
}
}

延迟队列 总结

利用RabbitMQ特性实现需要延迟的场景,可以简化并且消息不会丢失(开了持久化)。

在开发中要使用延迟队列,首选是使用基于插件的方法,但如果每个任务的延迟是固定的那么用死信队列也行。

发布确认

交换机确认

和基础篇中的情况基本一致,把消息缓存起来以防交换机寄了没机会重传。

配置与配置类

需要在配置文件中启用发布确认:

1
2
3
4
5
6
7
8
9
spring:
rabbitmq:
publisher-confirm-type: correlated
# none:
# 关闭发布确认,默认值
# simple:
# "Use waitForConfirms() or waitForConfirmsOrDie() within scoped operations.",用在同步模式,这两个方法返回false会关闭channel无法再发送消息
# correlated:
# "Use with CorrelationData to correlate confirmations with sent messsages.",用在异步回调模式

结构是,简单的一生产者一消费者与一直接交换机和消费者的队列,串联起来,硬要看配置类的话就:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Configuration
public class MyMqConfig {
public static final String EXCHANGE_NAME = "E";
public static final String ROUTING_KEY = "RK";
public static final String QUEUE_NAME = "Q";
@Bean
public DirectExchange theExchange() {
return ExchangeBuilder.directExchange(EXCHANGE_NAME).autoDelete().build();
}
@Bean
public Queue theQueue() {
return QueueBuilder.durable(QUEUE_NAME).autoDelete().build();
}
@Bean
public Binding theBinding(@Qualifier("theExchange") DirectExchange exchange, @Qualifier("theQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
}
}

留意ExchangeBuilder。

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//设置确认回调方法,等同于异步回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
//correlationData可能为null,这玩意是在生产者自己填写的,没填写就没有
String id = (null == correlationData) ? "" : correlationData.getId();
if (ack) log.info("Success {}", id);
else log.error("Fail {}, Reason: {}", id, cause);
});
/*
* Callback原型:
* public interface ConfirmCallback {
* void confirm (@Nullable CorrelationData correlationData,boolean ack,@Nullable String cause);
* }
* correlationData 消息的ID及相关信息
* ack 交换机是否收到消息
* cause 失败的原因,没失败为null
*/

rabbitTemplate.convertAndSend(
MyMqConfig.EXCHANGE_NAME,
MyMqConfig.ROUTING_KEY,
msg.getBytes(StandardCharsets.UTF_8)
// ,new CorrelationData(....)
);

记得填写CorrelationData。

如何重发消息请看基础篇,需要注意的是这里ID就不用什么deliverTag了,自己生成个填CorrelationData里吧。

可以尝试掐网线、写错交换机名称来模仿到交换机的丢包。

和基础篇中的一样,不可感知交换机后的如不可路由等失败的丢失。解决见下方。

回退消息(队列确认)

将交换机发送到队列失败的消息回退给生产者,好让生产者处理。

配置

需要配置:

1
2
3
spring:
rabbitmq:
publisher-returns: true

生产者

其他部分与上一个生产者的相同,设置回调如下所示:

1
2
3
4
5
6
7
8
9
//设置回退回调方法
rabbitTemplate.setReturnsCallback(returned -> {
log.error("Returned: {}, Reason: {}, Exchange: {}, RoutingKey: {}",
new String(returned.getMessage().getBody(), StandardCharsets.UTF_8),
returned.getReplyText(),
returned.getExchange(),
returned.getRoutingKey());
});
//returned中,储存的是被退回的消息的,交换机、路由键、消息体(自己send的对象)、原因(replyText)

当遇到上述所谓无法路由的情况时,该回调会打印:

1
Returned: aaa, Reason: NO_ROUTE, Exchange: E, RoutingKey: RK114514ERROR

消息序列化

RabbitMQ的MessageConvert接口处理消息的序列化,其实现有 SimpleMessageConverter(默认)、Jackson2JsonMessageConverter等。当调用了 convertAndSend 时会使用 MessageConvert 进行消息对象的序列化。

SimpleMessageConverter 对于要发送的消息体 body:

  • 为 byte[] 时不进行处理
  • 是 String 则转成字节数组
  • 是 Java 对象,则用JDK自带序列化将消息转成字节数组,性能较差

Jackson2JsonMessageConverter顾名思义转成JSON。


RabbitMQ使用 SpringBoot整合篇
https://sodacooky.netlify.app/2022/RabbitMQ使用SpringBoot篇/
作者
Sodacooky
发布于
2022年11月11日
许可协议