[异常处理]rabbitMQ 消费端异常进入死循环-消费消息时候抛出错误,控制台一直刷
  fwg2HHOh1sKG 2023年11月02日 86 0

消费端一直在循环 消费消>报错->消费. 问题点也能想的来,因为默认是自动应答,异常了相当于是没有应答,然后就一直异常重新抛回队列进行投递.

解决方案: 第一种方法: 对可能发生异常的部分try、catch;只要事先将问题catch住,就证明消费端已经将该问题消费掉,然后该消息就不存在于队列中,不会造成无限报错的情况。这里,你可以在catch中写一些业务,把这个出现异常的“消息”记录到数据库或者怎么怎么处理,反正是相当于被消费掉了。 第二种方法: "消费者重试"模式: 在配置文件中配置如下:消费者会尝试消费3次,之后丢弃这个消息,不会进入死循环

    listener:
      simple:
        default-requeue-rejected: true #意思是,消息被拒后(即未消费),重新(true)放入队列
        retry:
          enabled: true #是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)
          max-attempts: 3
          initial-interval: 5000ms

第三种方法: 只设置 default-requeue-rejected: false #意思是,消息被拒后(即未消费),重新(true)放入队列,当是false 时候不会出现死循环,就抛出一次异常(如果设置了死信队列,就被送到了死信队列;否则直接扔掉)。 image.png 第四种方式: 设置手动ack ,模式选择manual

    listener:
      simple:
        acknowledge-mode: auto
        #默认是auto,
        #none模式下,消息投递是不可靠的,可能丢失
        #auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
        #manual:自己根据业务情况,判断什么时候该ack

推荐: 在项目中选择第二种方式,消费者重试方案,并且设置消费者确认机制为auto,开启重试模式后,重试次数耗尽之后,会被息会被丢弃,这是由Spring内部机制决定的。 在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:

RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

@Configuration
public class RabbitmqConfig {

    @Bean
    public DirectExchange errorExchange(){
        return new DirectExchange(ERROR_EXCHANGE);
    }
    @Bean
    public Queue errorQueue(){
        return new Queue(ERROR_QUEUE);
    }
    @Bean
    public Binding bindErrorQueue(){
        return BindingBuilder.bind(errorQueue()).to(errorExchange()).with(ERROR_ROUTE_KEY);
    }
    
    /**
     * 自定义失败消息处理策略,将消息投递到新的交换机和路由KEY
     */
    @Bean
    public MessageRecoverer republishMessageRecover(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, ERROR_EXCHANGE, ERROR_ROUTE_KEY);
    }
}

    /**
     * 监听错误的队列
     * @param msg
     */
    @RabbitListener(queues = RabbitmqConstant.ERROR_QUEUE)
    public void listenErrorMessage(String msg) {
        System.out.println("接收到失败的消息:【" + msg + "】");
    }

测试结果如下: 当3次重试之后,投递到指定的交换机中,由指定的队列进行处理

image.png

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
fwg2HHOh1sKG