RabbitMQ使用 基础篇

概念见另一篇文章

引入依赖

引入RabbitMQ的连接“客户端”,Maven:

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version>
</dependency>

SpringBoot的话直接:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

简单模式(单收单发)

一个生产者在RabbitMQ上发送消息到指定队列,一个消费者在RabbitMQ上的指定队列接收消息。

生产者和消费者都连接到RabbitMQ,代码是相同的。

1
2
3
4
5
6
7
8
9
10
11
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ服务器的地址
factory.setHost("localhost");
//服务器的端口
factory.setPort(5672);
//连接的用户名与密码
factory.setUsername("admin");
factory.setPassword("admin");
//获取连接
connection = factory.newConnection();

RabbitMQ的默认端口是5672!

获得连接之后,可以“声明”信道了,代码是相同的。

需要注意的是,任意一方声明信道都可以,重复申请信道不会出现问题,但是没有创建信道一定翻车。

1
2
3
4
//通过连接创建信道
Channel channel = connection.createChannel();
//“声明”队列
channel.queueDeclare("queue-a", false, false, false, null);

queueDeclare()创建队列的参数说明:

参数 说明
queue 队列名称,收发两端的要一致
durable 是否持久化,就是是否把消息存硬盘上而不是只是内存里
exclusive 是否独占,是否可以有超过一个消费者
autoDelete 是否在队列无人使用后自动删除
arguments 其他参数

注意,如队列是否持久化、是否独占等参数需要更改,需要删除原先的队列(可以在管理页面操作)。

发送端

发送消息可以通过信道的方法basicPublish进行。

1
2
channel.basicPublish("", "queue-a", null, str.getBytes(StandardCharsets.UTF_8));
//basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)

参数说明:

  1. 第一个参数为交换机名称,当前模式下不需要,则留空。

  2. 路由Key名称,当前模式下写队列的名称。

    (按我个人理解,其实路由Key就是指走向哪个队列,在路由模式下,同一个交换机的不同队列就靠该参数区分)

  3. 其他参数

  4. 消息体

“其他参数”可以设置为MessageProperties.PERSISTENT_TEXT_PLAIN等实现消息持久化。

接收端

接受消息使用信道的方法basicConsume进行。

1
2
3
4
5
6
7
channel.basicConsume("queue-a", true,
(consumerTag, message) -> {/*消息处理回调方法*/},
consumerTag -> {/*“消息被取消”回调方法*/});
//参数1:队列名称
//参数2:是否自动应答
// 自动应答指消息未处理完,但直接响应处理完成,继而继续接受下一个消息。
// 当消息处理耗时时,自动应答将会导致OOM之类的。

如果需要手动应答,则把basicConsume的autoAck设置为false,并在消息处理回调函数中应答:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
(consumerTag, message) -> {
//手动应答同意
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
//第二个参数为是否批量应答,批量应答deliveryTag当前和之前的消息。
//相当于滑动窗口的累计确认。

//手动应答否定
channel.basicNack(message.getEnvelope().getDeliveryTag(), false, true);
//第三个参数为是否重新将消息放回队列。

//手动拒绝
channel.basicReject(message.getEnvelope().getDeliveryTag(), true);
//第二个参数为是否重新将消息放回队列。
//实际上为Nack的不可批量方法。
}

当存在未确认(Ack)的消息而断开了连接,消息也会重新入队。

(补充)发布确认

通过信道的confirmSelect()方法为整条信道开启发布确认。

配合消息和队列的持久化,可以保证消息能到Broker手上且不会丢失。

如果从交换机到队列失败了,生产者是无感知的!因为这是交换机之间的发布确认!

单个发布确认(同步+逐个确认)

发一条消息,发送端在waitForConfirms()等方法阻塞。

当消息成功发送到Broker后,该方法将返回。

发送端的示例代码为:

1
2
3
4
for (int count = 0; count != 10; count++){
channel.basicPublish(...);
if (channel.waitForConfirms()){/*Success*/}
}

批量发布确认(同步+批量确认)

发送端发送若干条消息后,再waitForConfirms()方法阻塞确认多条消息,性能比上一个略高,但缺点是遇到失败无法知道是哪一条。不推荐使用。

发送端示例代码为:

1
2
3
4
5
6
7
for (int count = 0; count != 999; count++){
channel.basicPublish(...);
if (count%16 == 0){
//一个批次
if (channel.waitForConfirms()){/*Success*/}
}
}

异步发布确认

相比批量发布确认,在效率高的同时可以定位每一条消息发送的成功与失败并处理。

首选

发送端代码发送消息与上述两种同步方法相同,通常使用批量发送,但是不要紧接使用channel.waitForConfirms()进行等待,而是注册回调函数(添加监听器)。

注意是在发送消息前注册监听器。

1
2
3
4
5
6
7
8
9
channel.addConfirmListener((deliveryTag, multiple) -> {
System.out.println("Confirmed.");
}, (deliveryTag, multiple) -> {
System.out.println("Not Confirmed.");
});
.....
for (;;){
channel.basicPublish("", "que", null, input.getBytes(StandardCharsets.UTF_8));
}

在异步发布确认中遇到失败时,(在监听器方法中)通常不能拿着消息重新发送一次,因此需要一些方法。

在发送消息时,我们需要使用并发容器(如ConcurrentXXX)来存放消息以在发送消息线程和监听器线程“共享”消息,如:

1
ConcurrentSkipListMap<Long, byte[]> messageMap = new ConcurrentSkipListMap<>();

Long类型的Key实际上为DeliveryTag,通过每条消息publish后的channel.getNextPublishSeqNo()获取,如:

1
2
3
4
5
6
Random random = new Random();
for (int i = 0; i != 1000; i++) {
byte[] messageBody = Integer.toString(random.nextInt()).getBytes(StandardCharsets.UTF_8);
channel.basicPublish("", "que", null, messageBody);
messageMap.put(channel.getNextPublishSeqNo(), messageBody);
}

完整发送端代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//获取信道
Channel channel = RabbitMqUtils.getDefaultChannel();
//储存消息的并发容器
ConcurrentSkipListMap<Long, byte[]> messageMap = new ConcurrentSkipListMap<>();
//监听器
channel.addConfirmListener(
(deliveryTag, multiple) ->
{
//批量确认
if (multiple) {
//获取小于指定deliveryTag的消息(即批量确认前面部分的)
ConcurrentNavigableMap<Long, byte[]> multi = messageMap.headMap(deliveryTag);
//清除它们
multi.clear();
} else {
//单个确认
messageMap.remove(deliveryTag);
}
},
(deliveryTag, multiple) -> {
//打印失败的消息,通常来说你应该想办法重新发送
System.out.println("Failed: " + deliveryTag + ", " + new String(messageMap.get(deliveryTag)));
}
);
//发送消息
Random random = new Random();
for (int i = 0; i != 1000; i++) {
byte[] messageBody = Integer.toString(random.nextInt()).getBytes(StandardCharsets.UTF_8);
channel.basicPublish("", "que", null, messageBody);
messageMap.put(channel.getNextPublishSeqNo(), messageBody);
}

工作队列模式

一个生产者产生“工作任务”,若干个消费者竞争(应该是轮询)获取这些“任务”。

接收端

接收端和简单模式中的相同,但是建议手动应答,否则可能会出现任务全往某一个Worker上怼而其他的闲着,达不到所谓“负载均衡”的效果。

发送端

发送端和简单模式中的完全相同。

不公平分发

默认轮询分发消息是每个人都拿一条,而当消费者速度不统一时,通常为消费者设置不同的“预取量”。

通过信道的basicQos(prefetchCount)方法设置“预取量”。

当数据堆积来不及处理时,最多积压的缓冲量为“预取值”。

发布订阅模式(FANOUT)

前面代码中,交换机Exchanges都使用的空字符串,实际上为AMQP Default默认交换机。并且,消息只能被一个消费者处理。

交换机

交换机类型有:直接(Direct)(路由模式),主题(Topic),标题(Headers),扇出(fanout)。扇出类型即发布订阅模式。

队列绑定到某个交换机,并有routingKey,交换机根据routingKey决定是否分发给某个队列。在当前模式中,通常生成随机名称的临时队列,按约定的routingKey绑定。

交换机与上面的队列一样,重复声明一样的没有问题但没声明会出事,而且声明不同配置的交换机也需要删除旧的重新声明。

接收端

在同一个交换机下且同一个routingKey(建议为空串)下,发送端指定交换机与routingKey后所有绑定的队列都能收到消息。

似乎有说法是,FANOUT模式下,routingKey实际上是无效的

接收端绑定代码如下:

1
2
3
4
5
6
7
8
//获取信道
Channel channel = RabbitMqUtils.getDefaultChannel();
//声明交换机
channel.exchangeDeclare("交换机名称", BuiltinExchangeType.FANOUT);
//声明临时队列并绑定
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "交换机名称", "routingKey,可以为空串");
//后面按正常流程接收

channel.queueDeclare()为声明一个名称随机的临时队列,断开后队列就会销毁。

channel.queueBind(…)为绑定队列到交换机。

发送端

发送时需要指定对应的交换机与routingKey:

1
2
3
4
5
//获取信道
Channel channel = RabbitMqUtils.getChannel();
//声明交换机
channel.exchangeDeclare("foe", BuiltinExchangeType.FANOUT);
//后面按正常流程发送

路由模式(DIRECT)

路由器使用Direct模式,与发布订阅模式类似,只需要在交换机声明处的Type改为BuiltinExchangeType.DIRECT。

区别只在于,不同消费者的routingKey不相同,发送者通过routingKey决定谁作为消费者(路由)。

可以一个队列绑定多个routingKey,也可以多个交换机用同一个routingKey。

主题模式(TOPIC)

主题模式的关键在与routingKey,而routingKey的规则是一个单词列表并以符号点分割,如:

1
msg.info.user

长度上限是255个字节。

接收端使用星号(*)可以代替一个单词,井号(#)可以代替零到若干个单词,实际上就是通配符,如:

1
2
3
4
5
6
7
8
9
10
11
12
13
//固定开头
abc.efg.*
//固定三个单词并且第二个单词为abc的
*.abc.*
//以hibin结尾的
#.hibin
//完整匹配
a.b.c
/*
当指定abc.efg.hibin时,会匹配:
abc.efg.*
#.hibin
*/

同路由模式,一样可以一队列多routingKey或多队列使用相同routingKey。

发送端

1
2
3
4
5
6
//声明主题模式交换机
channel.exchangeDeclare("top", BuiltinExchangeType.TOPIC);
...
//发送指定主题内容
channel.basicPublish("top", "a.b.c", null, content);
...

接收端

1
2
3
4
5
6
7
//同发送端声明交换机
channel.exchangeDeclare("top", BuiltinExchangeType.TOPIC);
//设置监听的主题(通配或者完全匹配)
channel.queueBind(channel.queueDeclare().getQueue(), "top", "#.c");
channel.queueBind(channel.queueDeclare().getQueue(), "top", "#.b.#");//绑定多个主题
...
//正常接收消息

死信队列

死信指无法被消费的消息,可能是消费者发生异常、消息TTL过期、队列已满、消息被否定或拒绝且不重新入队(Reject/Nack),遇到死信通过死信交换机放入到死信队列中等待死信消费者后续处理(备胎)。

普通队列消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Channel channel = RabbitMqUtils.getChannel();
//声明普通交换机与死信交换机
channel.exchangeDeclare("normal_exchange", BuiltinExchangeType.DIRECT);
channel.exchangeDeclare("dead_exchange", BuiltinExchangeType.DIRECT);
//声明普通队列与死信队列
//普通队列,参数指明消息如何成为死信,以及出现死信转发到死信交换机
Map<String, Object> arg = new HashMap<>();
//目标的私信交换机
arg.put("x-dead-letter-exchange", "dead_exchange");
//死信RoutingKey
arg.put("x-dead-letter-routing-key", "dead");
//过期时间,也可以在生产者设置
arg.put("x-message-ttl", 1000);//unit ms
//队列长度
arg.put("x-max-length",32);//超过长度的新消息将成为死信
//应用参数声明队列
channel.queueDeclare("normal_queue", false, false, false, arg);
//死信队列,处理死信的队列普普通通就好
channel.queueDeclare("dead_queue", false, false, false, null);
//队列绑定
channel.queueBind("normal_queue", "normal_exchange", "normal");
channel.queueBind("dead_queue", "dead_exchange", "dead");
//接收
channel.basicConsume("normal_queue", true,
(consumerTag, message) -> {
System.out.println("Normal: " + new String(message.getBody(), StandardCharsets.UTF_8));
}, consumerTag -> System.out.println("Cancelled."));

关键在于声明队列时,设置上参数x-dead-letter-exchangex-dead-letter-routing-key

为了模拟,我们应该在这个消费者成功启动并创建绑定死信队列后关闭,消息应该就会去到死信消费者。

生产者

生产者不需要在乎消费者什么死信,只管发就好。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Channel channel = RabbitMqUtils.getChannel();
//声明交换机与信道并绑定
//和接收端一样的重复声明不会报错,但是参数得完全一致,包括x-xxx那些,所以就不在生产者设置参数了
//channel.queueDeclare("normal_queue", false, false, false, null);
//channel.exchangeDeclare("normal_exchange", BuiltinExchangeType.DIRECT);
//channel.queueBind("normal_queue", "normal_exchange", "normal");
//发送消息
for (int count = 0; count != 8; count++) {
//发送端设置TTL的方法
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.expiration("1000")//unit ms
.build();
//发送
channel.basicPublish("normal_exchange", "normal", properties, "Hi".getBytes(StandardCharsets.UTF_8));
}

死信消费者

就是一般的消费者,消费死信交换机下的死信队列即可。

1
2
3
4
5
6
7
8
9
10
11
12
Channel channel = RabbitMqUtils.getChannel();
//声明死信交换机,自信的话可以跳过
//channel.exchangeDeclare("dead_exchange", BuiltinExchangeType.DIRECT);
//声明死信队列,自信的话可以跳过
//channel.queueDeclare("dead_queue", false, false, false, null);
//队列绑定,自信的话可以跳过
//channel.queueBind("dead_queue", "dead_exchange", "dead");
//消费
channel.basicConsume("dead_queue", true,
(consumerTag, message) -> {
System.out.println("Normal: " + new String(message.getBody(), StandardCharsets.UTF_8));
}, consumerTag -> System.out.println("Cancelled."));

延迟队列

其实就是利用了死信队列的消息TTL过期机制实现。

当正常的消费者永久消失,则总是等待设定的TTL时间后到达死信消费者。

换句话说,只有队列没有消费者,消息有TTL,那么一定超时进入到死信队列。

如图例子中就是可利用延时队列简单化问题的例子。


RabbitMQ使用 基础篇
https://sodacooky.netlify.app/2022/RabbitMQ使用基础篇/
作者
Sodacooky
发布于
2022年10月24日
许可协议