环境如下
Version
SpringBoot
3.2.1
spring-amqp
3.1.1
RabbitMq
3-management
一、起因 老版本的spring-amqp
在CorrelationData
上设置ConfirmCallback
。但是今天却突然发现correlationData.getFuture()
没有addCallback
函数了。
查询文档和帖子后,发现ConfirmCallback
和ReturnsCallback
都需要在RabbitTemplate
中设置,同时ConfirmCallback
中默认无法得到消息内容,如果想在ConfirmCallback
中把消息内容存到数据库等地方进行记录,怎么办呢?
参考手册
二、代码 1. 定义exchange和queue language-java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @Slf4j @Configuration public class PayNotifyConfig{ //交换机 public static final String PAYNOTIFY_EXCHANGE_FANOUT = "paynotify_exchange_fanout"; //支付通知队列 public static final String PAYNOTIFY_QUEUE = "paynotify_queue"; //支付结果通知消息类型 public static final String MESSAGE_TYPE = "payresult_notify"; //声明交换机,且持久化 @Bean(PAYNOTIFY_EXCHANGE_FANOUT) public FanoutExchange paynotify_exchange_fanout() { // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 return new FanoutExchange(PAYNOTIFY_EXCHANGE_FANOUT, true, false); } //支付通知队列,且持久化 @Bean(PAYNOTIFY_QUEUE) public Queue paynotify_queue() { return QueueBuilder.durable(PAYNOTIFY_QUEUE).build(); } //交换机和支付通知队列绑定 @Bean public Binding binding_paynotify_queue(@Qualifier(PAYNOTIFY_QUEUE) Queue queue, @Qualifier(PAYNOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } }
2. RabbitTemplate 在上面的类中继续添加RabbitTemplate
,并设置ConfirmCallback
和ReturnsCallback
。
language-java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 @Bean public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) { final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); //设置confirm callback rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { String body = "1"; if (correlationData instanceof EnhancedCorrelationData) { body = ((EnhancedCorrelationData) correlationData).getBody(); } if (ack) { //消息投递到exchange log.debug("消息发送到exchange成功:correlationData={},message_id={} ", correlationData, body); System.out.println("消息发送到exchange成功:correlationData={},message_id={}"+correlationData+body); } else { log.debug("消息发送到exchange失败:cause={},message_id={}",cause, body); System.out.println("消息发送到exchange失败:cause={},message_id={}"+cause+body); } }); //设置return callback rabbitTemplate.setReturnsCallback(returned -> { Message message = returned.getMessage(); int replyCode = returned.getReplyCode(); String replyText = returned.getReplyText(); String exchange = returned.getExchange(); String routingKey = returned.getRoutingKey(); // 投递失败,记录日志 log.error("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}", replyCode, replyText, exchange, routingKey, message.toString()); }); return rabbitTemplate; }
3. EnhancedCorrelationData 原始的CorrelationData
,目前已经无法从中获取消息内容,也就是说现在的ConfirmCallback
无法获取到消息的内容,因为设计上只关注是否投递到exchange
成功。如果需要在ConfirmCallback
中获取消息的内容,需要扩展这个类,并在发消息的时候,放入自定义数据。
language-java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class EnhancedCorrelationData extends CorrelationData { private final String body; public EnhancedCorrelationData(String id, String body) { super(id); this.body = body; } public String getBody() { return body; } }
4. 发送消息 在EnhancedCorrelationData
把消息本身放进去,或者如果你有表记录消息,你可以只放入其id
。这样触发ConfirmCallback的时候,就可以获取消息内容。
language-java 1 2 3 4 5 6 7 8 9 public void notifyPayResult() { String message = "TEST Message"; Message message1 = MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8)) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .build(); CorrelationData correlationData = new EnhancedCorrelationData(UUID.randomUUID().toString(), message.toString()); rabbitTemplate.convertAndSend(PayNotifyConfig.PAYNOTIFY_EXCHANGE_FANOUT,"", message1, correlationData); }