无SpringBoot的基础使用见另一篇文章 。
引入依赖与配置 Maven添加Starter:
1 2 3 4 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency >
SpringBoot的配置文件中:
1 2 3 4 5 6 7 spring: rabbitmq: host: 192.168 .137 .16 port: 5672 username: admin password: admin
不需要在生产者或消费者声明队列和交换机,被配置类替代。
我将以我看的教程的顺序,即以例子来逐步记录。
延迟队列 延迟队列 Pro 只需要在配置类中配置好复杂的死信队列结构,就可以轻松使用。
配置类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 @Configuration public class MyMqConfig { private static final String NORMAL_EXCHANGE = "X" ; private static final String DEAD_EXCHANGE = "Y" ; private static final String NORMAL_QUEUE_1 = "QA" ; private static final String NORMAL_QUEUE_2 = "QB" ; private static final String DEAD_QUEUE = "QD" ; @Bean("xExchange") public DirectExchange xExchange () { return new DirectExchange (NORMAL_EXCHANGE); } @Bean("yExchange") public DirectExchange yExchange () { return new DirectExchange (DEAD_EXCHANGE); } @Bean("queueA") public Queue queueA () { return QueueBuilder .nonDurable(NORMAL_QUEUE_1) .ttl(10000 ) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey("YD" ) .build(); } @Bean("queueB") public Queue queueB () { return QueueBuilder .nonDurable(NORMAL_QUEUE_2) .ttl(40000 ) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey("YD" ) .build(); } @Bean("queueD") public Queue queueDead () { return QueueBuilder.nonDurable(DEAD_QUEUE).build(); } @Bean public Binding queueABindingX (@Qualifier("queueA") Queue qA, @Qualifier("xExchange") DirectExchange ex) { return BindingBuilder.bind(qA).to(ex).with("XA" ); } @Bean public Binding queueBBindingX (@Qualifier("queueB") Queue qB, @Qualifier("xExchange") DirectExchange ex) { return BindingBuilder.bind(qB).to(ex).with("XB" ); } @Bean public Binding queueDBindingX (@Qualifier("queueD") Queue qD, @Qualifier("yExchange") DirectExchange ex) { return BindingBuilder.bind(qD).to(ex).with("YD" ); } }
留意@Bean(“”)与@Qualifier(“”)的使用。
生产者 跟随教程使用Controller实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @RestController @RequestMapping("/test") public class TestController { @Resource private RabbitTemplate rabbitTemplate; @GetMapping("/send/{msg}") public void sendMessage (@PathVariable("msg") String message) { LoggerFactory.getLogger(TestController.class).info("{}: {}" , new Date (), message); rabbitTemplate.convertAndSend("X" , "XA" , "Hi" ); rabbitTemplate.convertAndSend("X" , "XB" , "Hi" ); } }
消费者(死信) 消费者使用使用注解实现监听某个队列。
1 2 3 4 5 6 7 8 @Slf4j @Component public class DeadLetterConsumer { @RabbitListener(queues = "QD") public void receiveD (Message message, Channel channel) throws Exception { log.info("Consumer: {},{}" , new String (message.getBody()), new Date ()); } }
是的没错,使用@RabbitListener
来声明一个方法作为Consumer,这里的参数列表自由发挥,一般来说需要一个Message。
延迟队列 Pro Plus 上面的延迟队列,你要新的延迟时间就要创建新的队列,太笨比。
应该由生产者指定过期时间,发送到一个没有设置过期时间的队列上(仍然没有消费者)。
配置类 只是减少了TTL设置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Configuration public class MyMqConfig { public static final String DELAY_EXCHANGE = "normal_exchange" ; public static final String DELAY_QUEUE = "delay_queue" ; public static final String DELAY_ROUTINGKEY = "delay_key" ; public static final String DEAD_EXCHANGE = "dead_exchange" ; public static final String DEAD_ROUTINGKEY = "dead_key" ; public static final String DEAD_QUEUE = "dead_queue" ; @Bean("nExchange") public DirectExchange nExchange () { return new DirectExchange (DELAY_EXCHANGE); } @Bean("dExchange") public DirectExchange dExchange () { return new DirectExchange (DEAD_EXCHANGE); } @Bean("delay_queue") public Queue delayQueue () { return QueueBuilder .nonDurable(DELAY_QUEUE) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey(DEAD_ROUTINGKEY) .build(); } @Bean("dead_queue") public Queue deadQueue () { return QueueBuilder.nonDurable(DEAD_QUEUE).build(); } @Bean public Binding queueBindingDelay (@Qualifier("delay_queue") Queue que, @Qualifier("nExchange") DirectExchange ex) { return BindingBuilder.bind(que).to(ex).with(DELAY_ROUTINGKEY); } @Bean public Binding queueBindingDead (@Qualifier("dead_queue") Queue que, @Qualifier("dExchange") DirectExchange ex) { return BindingBuilder.bind(que).to(ex).with(DEAD_ROUTINGKEY); } }
生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Slf4j @RestController @RequestMapping("/test") public class TestController { @Resource private RabbitTemplate rabbitTemplate; @GetMapping("/send/{msg}/{ttl}") public String sendMessage (@PathVariable String msg, @PathVariable String ttl) { log.info("Send in TTL {} : {}" , ttl, msg); rabbitTemplate.convertAndSend( MyMqConfig.DELAY_EXCHANGE, MyMqConfig.DELAY_ROUTINGKEY, msg.getBytes(StandardCharsets.UTF_8), message -> { message.getMessageProperties().setExpiration(ttl); return message; } ); return "OK" ; } }
关键在于MessagePostProcessor,其签名实际如下:
1 2 3 4 5 6 7 8 9 10 11 12 @FunctionalInterface public interface MessagePostProcessor { Message postProcessMessage (Message var1) throws AmqpException; default Message postProcessMessage (Message message, Correlation correlation) { return this .postProcessMessage(message); } default Message postProcessMessage (Message message, Correlation correlation, String exchange, String routingKey) { return this .postProcessMessage(message, correlation); } }
使用的是第一个,只有一个Message的“重载”,对消息进行操作原样返回即可。
问题 如果前面发的一条消息延迟很大,而后面的延迟很小,会堵在前面的消息等待TTL(队列先进先出特性)。
该问题不引入插件目前无解。
延迟队列 Pro Plus Max(基于插件) 要想解决上面的问题,需要插件来让RabbitMQ支持该功能。
插件安装 到RabbitMQ网站上下载,社区插件 。
也可以到Github,链接 。
藏得有点深。
找到插件目录:
1 2 3 4 5 rabbitmq-plugins directories -s# 通常输出 # Plugin archives directory: /usr/lib/rabbitmq/plugins:/usr/lib/rabbitmq/lib/rabbitmq_server-3.11.2/plugins # Plugin expansion directory: /var/lib/rabbitmq/mnesia/rabbit@debian-plugins-expand # Enabled plugins file: /etc/rabbitmq/enabled_plugins
把下载的插件安装包丢到Plugin Archives目录。
启用:
1 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
最后重启:
1 systemctl restart rabbitmq-server.service
安装成功后,可在管理界面创建交换机时看到新的类型,x-delayed-message。这也意味着,延迟队列实现从利用死信队列变为使用交换机。新的结构如下:
配置类 新的交换机类型需要额外的配置,没有那个类了,需要自定义交换机。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Configuration public class MyMqConfig { public static final String DELAY_EXCHANGE = "delay_exchange" ; public static final String DELAY_QUEUE = "delay_queue" ; public static final String DELAY_RK = "delay_rk" ; @Bean public CustomExchange delayExchange () { Map<String, Object> args = new HashMap <>(); args.put("x-delayed-type" , "direct" ); return new CustomExchange (DELAY_EXCHANGE, "x-delayed-message" , false , false , args); } @Bean public Queue delayQueue () { return QueueBuilder.nonDurable(DELAY_QUEUE).build(); } @Bean public Binding delayExchangeBinding (@Qualifier("delayExchange") CustomExchange ex, @Qualifier("delayQueue") Queue que) { return BindingBuilder.bind(que).to(ex).with(DELAY_RK).noargs(); } }
留意@Bean与@Qualifier(“”)的使用。
生产者 只变了设置延迟的一行。
生产者发送消息设置Expiration变为设置Delay。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Slf4j @RestController @RequestMapping("/test") public class TestController { @Resource private RabbitTemplate rabbitTemplate; @GetMapping("/send/{msg}/{ttl}") public String sendMessage (@PathVariable String msg, @PathVariable String ttl) { log.info("Send in TTL {} : {}" , ttl, msg); rabbitTemplate.convertAndSend( MyMqConfig.DELAY_EXCHANGE, MyMqConfig.DELAY_RK, msg.getBytes(StandardCharsets.UTF_8), message -> { message.getMessageProperties().setDelay(Integer.parseInt(ttl)); return message; } ); return "OK" ; } }
延迟队列 总结 利用RabbitMQ特性实现需要延迟的场景,可以简化并且消息不会丢失(开了持久化)。
在开发中要使用延迟队列,首选是使用基于插件的方法,但如果每个任务的延迟是固定的那么用死信队列也行。
发布确认 交换机确认 和基础篇中的情况基本一致,把消息缓存起来以防交换机寄了没机会重传。
配置与配置类 需要在配置文件中启用发布确认:
1 2 3 4 5 6 7 8 9 spring: rabbitmq: publisher-confirm-type: correlated
结构是,简单的一生产者一消费者与一直接交换机和消费者的队列,串联起来,硬要看配置类的话就:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Configuration public class MyMqConfig { public static final String EXCHANGE_NAME = "E" ; public static final String ROUTING_KEY = "RK" ; public static final String QUEUE_NAME = "Q" ; @Bean public DirectExchange theExchange () { return ExchangeBuilder.directExchange(EXCHANGE_NAME).autoDelete().build(); } @Bean public Queue theQueue () { return QueueBuilder.durable(QUEUE_NAME).autoDelete().build(); } @Bean public Binding theBinding (@Qualifier("theExchange") DirectExchange exchange, @Qualifier("theQueue") Queue queue) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); } }
留意ExchangeBuilder。
生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { String id = (null == correlationData) ? "" : correlationData.getId(); if (ack) log.info("Success {}" , id); else log.error("Fail {}, Reason: {}" , id, cause); }); rabbitTemplate.convertAndSend( MyMqConfig.EXCHANGE_NAME, MyMqConfig.ROUTING_KEY, msg.getBytes(StandardCharsets.UTF_8) );
记得填写CorrelationData。
如何重发消息请看基础篇,需要注意的是这里ID就不用什么deliverTag了,自己生成个填CorrelationData里吧。
可以尝试掐网线、写错交换机名称来模仿到交换机的丢包。
和基础篇中的一样,不可感知交换机后的如不可路由等失败的丢失。解决见下方。
回退消息(队列确认) 将交换机发送到队列失败的消息回退给生产者,好让生产者处理。
配置 需要配置:
1 2 3 spring: rabbitmq: publisher-returns: true
生产者 其他部分与上一个生产者的相同,设置回调如下所示:
1 2 3 4 5 6 7 8 9 rabbitTemplate.setReturnsCallback(returned -> { log.error("Returned: {}, Reason: {}, Exchange: {}, RoutingKey: {}" , new String (returned.getMessage().getBody(), StandardCharsets.UTF_8), returned.getReplyText(), returned.getExchange(), returned.getRoutingKey()); });
当遇到上述所谓无法路由的情况时,该回调会打印:
1 Returned: aaa, Reason: NO_ROUTE, Exchange: E, RoutingKey: RK114514ERROR
坑 消息序列化 RabbitMQ的MessageConvert接口处理消息的序列化,其实现有 SimpleMessageConverter(默认)、Jackson2JsonMessageConverter等。当调用了 convertAndSend 时会使用 MessageConvert 进行消息对象的序列化。
SimpleMessageConverter 对于要发送的消息体 body:
为 byte[] 时不进行处理
是 String 则转成字节数组
是 Java 对象,则用JDK自带序列化将消息转成字节数组,性能较差
Jackson2JsonMessageConverter顾名思义转成JSON。