消费端一直在循环 消费消>报错->消费. 问题点也能想的来,因为默认是自动应答,异常了相当于是没有应答,然后就一直异常重新抛回队列进行投递.
解决方案: 第一种方法: 对可能发生异常的部分try、catch;只要事先将问题catch住,就证明消费端已经将该问题消费掉,然后该消息就不存在于队列中,不会造成无限报错的情况。这里,你可以在catch中写一些业务,把这个出现异常的“消息”记录到数据库或者怎么怎么处理,反正是相当于被消费掉了。 第二种方法: "消费者重试"模式: 在配置文件中配置如下:消费者会尝试消费3次,之后丢弃这个消息,不会进入死循环
listener:
simple:
default-requeue-rejected: true #意思是,消息被拒后(即未消费),重新(true)放入队列
retry:
enabled: true #是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)
max-attempts: 3
initial-interval: 5000ms
第三种方法: 只设置 default-requeue-rejected: false #意思是,消息被拒后(即未消费),重新(true)放入队列,当是false 时候不会出现死循环,就抛出一次异常(如果设置了死信队列,就被送到了死信队列;否则直接扔掉)。 第四种方式: 设置手动ack ,模式选择manual
listener:
simple:
acknowledge-mode: auto
#默认是auto,
#none模式下,消息投递是不可靠的,可能丢失
#auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
#manual:自己根据业务情况,判断什么时候该ack
推荐: 在项目中选择第二种方式,消费者重试方案,并且设置消费者确认机制为auto,开启重试模式后,重试次数耗尽之后,会被息会被丢弃,这是由Spring内部机制决定的。 在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
@Configuration
public class RabbitmqConfig {
@Bean
public DirectExchange errorExchange(){
return new DirectExchange(ERROR_EXCHANGE);
}
@Bean
public Queue errorQueue(){
return new Queue(ERROR_QUEUE);
}
@Bean
public Binding bindErrorQueue(){
return BindingBuilder.bind(errorQueue()).to(errorExchange()).with(ERROR_ROUTE_KEY);
}
/**
* 自定义失败消息处理策略,将消息投递到新的交换机和路由KEY
*/
@Bean
public MessageRecoverer republishMessageRecover(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, ERROR_EXCHANGE, ERROR_ROUTE_KEY);
}
}
/**
* 监听错误的队列
* @param msg
*/
@RabbitListener(queues = RabbitmqConstant.ERROR_QUEUE)
public void listenErrorMessage(String msg) {
System.out.println("接收到失败的消息:【" + msg + "】");
}
测试结果如下: 当3次重试之后,投递到指定的交换机中,由指定的队列进行处理