Springboot 版本: 2.7.0
配置类:
@Slf4j@ConfigurationpublicclassRabbitConfiguration{publicfinalstaticString TOPIC_EXCHANGE="myExchange";publicfinalstaticString QUEUE_NAME="myQueue";@BeanpublicRabbitAdminamqpAdmin(ConnectionFactory connectionFactory){returnnewRabbitAdmin(connectionFactory);}@BeanpublicRabbitTemplaterabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate template=newRabbitTemplate(connectionFactory); template.setMessageConverter(jsonConverter()); template.setExchange(TOPIC_EXCHANGE); template.setConfirmCallback((correlationData, ack, cause)->{if(ack){ log.info("消息:{}发送成功", correlationData.getId());}else{ log.error("消息:{}发送失败,失败原因为:{}", correlationData.getId(), cause);}}); template.setMandatory(true); template.setReturnsCallback(returned->{ log.error("消息:{}路由失败, 失败原因为:{}", returned.getMessage().toString(), returned.getReplyText());});return template;}@BeanpublicTopicExchangetopicExchange(){returnnewTopicExchange(TOPIC_EXCHANGE,true,false);}@BeanpublicQueuequeue(){returnnewQueue(QUEUE_NAME);}@BeanpublicBindingbinding(){returnBindingBuilder.bind(queue()).to(topicExchange()).with("my.test.*");}@BeanpublicJackson2JsonMessageConverterjsonConverter(){returnnewJackson2JsonMessageConverter();}}
配置文件:
spring:rabbitmq:host: localhostport:5672username: adminpassword: adminvirtual-host: my_vhost# 消息确认(ACK)publisher-confirm-type: CORRELATED#correlated #确认消息已发送到交换机(Exchange)publisher-returns:true#确认消息已发送到队列(Queue)
生产者:
@ComponentpublicclassPublisherService{@AutowiredprivateRabbitTemplate rabbitTemplate;publicvoidsend(){CorrelationData correlationData=newCorrelationData(); rabbitTemplate.convertAndSend("my.test.message",newUser("Kleven",18), correlationData);}}
@Data@NoArgsConstructor@AllArgsConstructor@ToStringpublicclassUserimplementsSerializable{privatestaticfinallong serialVersionUID=-5079682733940745661L;privateString name;privateInteger age;}
当确认模式设置为NONE时,只要中间件投递了消息就认为成功并将消息从队列中移除。
@RabbitListener(queues="myQueue", messageConverter="jsonConverter", ackMode="NONE")publicvoidnoneAckListener(User user){ log.info("收到消息 -> {}", user);// 添加个错误用于测试int a=1/0;}
结果:
可以看到,即使消费者出错了,队列中的消息依然被删除了。
/** * @param deliveryTag 当前消息的投递标签,是一个自增的数字。 * @param multiple true:确认 deliveryTag <= 当前消息deliveryTag 的所有消息; false:只确认当前收到的消息。 */voidbasicAck(long deliveryTag,boolean multiple)throwsIOException;
@AutowiredprivateJackson2JsonMessageConverter jsonConverter;@RabbitListener(queues="myQueue", ackMode="MANUAL")publicvoidmanualAckListener(Message message,Channel channel)throwsIOException{long deliveryTag= message.getMessageProperties().getDeliveryTag(); log.info("成功消费消息 -> {}", jsonConverter.fromMessage(message)); channel.basicAck(deliveryTag,false);}
结果:
消息消费成功,且从队列中删除。
消息:aaa9b3b7-85b4-42fb-8a12-0aad488817f1发送成功 成功消费消息 -> User(name=Kleven,age=18)
/** * * @param multiple 拒绝 deliveryTag <= 当前消息deliveryTag 的所有消息; false:只拒绝当前收到的消息。 * @param requeue true 将拒绝对的消息重新加入队列。 */voidbasicNack(long deliveryTag,boolean multiple,boolean requeue)throwsIOException;
@AutowiredprivateJackson2JsonMessageConverter jsonConverter;@RabbitListener(queues="myQueue", ackMode="MANUAL")publicvoidmanualAckListener(Message message,Channel channel)throwsIOException{long deliveryTag= message.getMessageProperties().getDeliveryTag(); log.info("消费消息 -> {}", jsonConverter.fromMessage(message)); channel.basicNack(deliveryTag,false,true);}
结果:
当 requeue 为 true时,拒绝消息后消息从重新入队,可以看到队列中任然有一条数据。
当 requeue 为 false时,拒绝消息后消息也还是从队列中删除掉了。
默认值,消费者成功时认为成功并从队列中删除消息。消费者失败时认为失败,不会从队列中删除消息。
@RabbitListener(queues="myQueue", messageConverter="jsonConverter")publicvoidautoAckListener(User user){ log.info("收到消息 -> {}", user);// 添加个错误用于测试int a=1/0;}
结果:
可以看到,消费者出错后,消息依然在队列中。当移除消费者中的错误代码后,成功消费消息后,队列中的数据被删除。