一、死信交换机 当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter
):
消费者使用basic.reject
或 basic.nack
声明消费失败,并且消息的requeue
参数设置为false
消息是一个过期消息,超时无人消费
要投递的队列消息堆积满了,最早的消息可能成为死信
如果该队列配置了dead-letter-exchange
属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而 这个交换机称为死信交换机(Dead Letter Exchange
,简称DLX
)。
二、TTL
如果message
和queue
都有ttl
,采用更小的一方。
1. Queue指定死信交换机并设置TTL language-java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Configuration public class CommonConfig { @Bean public DirectExchange ttlExchange(){ return new DirectExchange("ttl.direct"); } @Bean public Queue ttlQueue(){ return QueueBuilder .durable("ttl.queue") .ttl(10000) .deadLetterExchange("dl.direct") .deadLetterRoutingKey("dl") .build(); } @Bean public Binding ttlBinding(){ return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl"); } }
2. 消息设置TTL language-java 1 2 3 4 5 6 7 8 9 10 @Test public void testTTLMessage(){ Message message = MessageBuilder.withBody("hello ttl".getBytes(StandardCharsets.UTF_8)) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .setExpiration("5000") .build(); rabbitTemplate.convertAndSend("ttl.direct", "ttl", message); log.info("ttl消息已发送"); }
借助TTL
机制可以用死信交换机模拟延迟队列,但是设计上比较牵强,性能不好。
三、延迟队列 这是官方提供的一些额外插件https://www.rabbitmq.com/community-plugins.html
下载其中的DelayExchange
插件,把.ez
文件挂载到RabbitMQ
容器的/plugins
目录下,然后进入容器,执行
language-bash 1 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
language-txt 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 root@7c4ba266e5bc:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange Enabling plugins on node rabbit@7c4ba266e5bc: rabbitmq_delayed_message_exchange The following plugins have been configured: rabbitmq_delayed_message_exchange rabbitmq_management rabbitmq_management_agent rabbitmq_prometheus rabbitmq_web_dispatch Applying plugin configuration to rabbit@7c4ba266e5bc... The following plugins have been enabled: rabbitmq_delayed_message_exchange started 1 plugins.
1. SpringAMQP创建延迟队列 基于@RabbitListener
或者基于@Bean
都可以。
language-java 1 2 3 4 5 6 7 8 9 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "delay.queue"), exchange = @Exchange(name = "delay.direct", delayed = "true"), key = "delay" )) public void listenDelayExchange(String msg){ log.info("消费者接收到delay.queue的延迟消息:【" + msg + "】"); }
2. 设置消息延迟 这个插件只能在消息上设置延迟时间,没有队列设置延迟时间的概念,不过都是一样的。message
要在Header
上添加一个x-delay
。
language-java 1 2 3 4 5 6 7 8 9 10 11 12 @Test public void testDelayMessage(){ Message message = MessageBuilder.withBody("hello delay".getBytes(StandardCharsets.UTF_8)) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .setHeader("x-delay", 5000) .build(); // confirm callback CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData); log.info("发送消息成功"); }
3. 测试 直接运行测试,可能会报错,因为rabbitmq
意识到消息到了exchange
却没有立即到queue
,被认为错误,回调returnback
,所以我们在ReturnCallBack
中绕过这个限制。
language-java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Slf4j @Configuration public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{ //check if is delay message if (message.getMessageProperties().getReceivedDelay() != null && message.getMessageProperties().getReceivedDelay() > 0) { return; } log.error("消息发送到queue失败,replyCode={}, reason={}, exchange={}, routeKey={}, message={}", replyCode, replyText, exchange, routingKey, message.toString()); }); } }
运行Test
测试,可以看到Test
方面,消息发送的时间为21:09:13
language-txt 1 2 21:09:13:516 INFO 25468 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#2063c53e:0/SimpleConnection@6415f61e [delegate=amqp://rabbitmq@127.0.0.1:5672/, localPort= 62470] 21:09:13:557 INFO 25468 --- [ main] cn.itcast.mq.spring.SpringAmqpTest : 发送消息成功
listener
方面消息消费的时间为21:09:18
,刚好5s。
language-txt 1 2 21:08:31:952 INFO 19532 --- [ main] cn.itcast.mq.ConsumerApplication : Started ConsumerApplication in 1.735 seconds (JVM running for 2.357) 21:09:18:583 INFO 19532 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener : 消费者接收到delay.queue的延迟消息:【hello delay】