RabbitMQ延迟队列,死信队列配置
  wR16XKHtuTxh 2023年11月01日 81 0
延迟和死信队列的配置
  • 延迟队列有效期一分钟,后进入死信队列,如果异常就进入异常队列
@Configuration
@Data
public class RabbitMQConfig {


    /**
     * 交换机
     */
    private String orderEventExchange="order.event.exchange";


    /**
     * 延迟队列, 不能被监听消费
     */
    private String orderCloseDelayQueue="order.close.delay.queue";

    /**
     * 关单队列, 延迟队列的消息过期后转发的队列,被消费者监听
     */
    private String orderCloseQueue="order.close.queue";


    /**
     * 进入延迟队列的路由key
     */
    private String orderCloseDelayRoutingKey="order.close.delay.routing.key";


    /**
     * 进入死信队列的路由key,消息过期进入死信队列的key
     */
    private String orderCloseRoutingKey="order.close.routing.key";

    /**
     * 过期时间 毫秒,临时改为1分钟定时关单
     */
    private Integer ttl=1000*60;

    /**
     * 消息转换器
     * @return
     */
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }


    /**
     * 创建交换机 Topic类型,也可以用dirct路由
     * 一般一个微服务一个交换机
     * @return
     */
    @Bean
    public Exchange orderEventExchange(){
        return new TopicExchange(orderEventExchange,true,false);
    }


    /**
     * 延迟队列
     */
    @Bean
    public Queue orderCloseDelayQueue(){

        Map<String,Object> args = new HashMap<>(3);
        args.put("x-dead-letter-exchange",orderEventExchange);
        args.put("x-dead-letter-routing-key",orderCloseRoutingKey);
        args.put("x-message-ttl",ttl);

        return new Queue(orderCloseDelayQueue,true,false,false,args);
    }


    /**
     * 死信队列,普通队列,用于被监听
     */
    @Bean
    public Queue orderCloseQueue(){

        return new Queue(orderCloseQueue,true,false,false);

    }

    /**
     * 第一个队列,即延迟队列的绑定关系建立
     * @return
     */
    @Bean
    public Binding orderCloseDelayBinding(){

        return new Binding(orderCloseDelayQueue,Binding.DestinationType.QUEUE,orderEventExchange,orderCloseDelayRoutingKey,null);
    }

    /**
     * 死信队列绑定关系建立
     * @return
     */
    @Bean
    public Binding orderCloseBinding(){

        return new Binding(orderCloseQueue,Binding.DestinationType.QUEUE,orderEventExchange,orderCloseRoutingKey,null);
    }



}
异常队列配置类
public class RabbitMQErrorConfig {
    @Autowired
    RabbitTemplate rabbitTemplate;
    /**
     * 异常交换机
     */
    private String orderErrorExchange = "order.error.exchange";
    /**
     * 异常队列
     */
    private String orderErrorQueue = "order.error.queue";
    /**
     * 异常routing.key
     */
    private String orderErrorRoutingKey = "order.error.routing.key";

    /**
     * 异常交换机
     * @return
     */
    @Bean
    public TopicExchange errorTopicExchange(){
        return new TopicExchange(orderErrorExchange,true,false);
    }

    /**
     * 异常队列
     * @return
     */
    @Bean
    public Queue errorQueue(){
        return new Queue(orderErrorQueue,true);
    }

    /**
     * 队列交换机进行绑定
     * @param errorQueue
     * @return
     */
    @Bean
    public Binding BindingErrorQueueAndExchange(Queue errorQueue,TopicExchange errorTopicExchange){
        return BindingBuilder.bind(errorQueue).to(errorTopicExchange).with(orderErrorRoutingKey);
    }

    /**
     * 配置 RepublishMessageRecoverer
     * 用途:消息重试一定次数后,用特定的routingKey转发到指定的交换机中,方便后续排查和告警
     *
     * 顶层是 MessageRecoverer接口,多个实现类
     *
     * @return
     */
    @Bean
    public MessageRecoverer messageRecoverer(){
        return new RepublishMessageRecoverer(rabbitTemplate,orderErrorExchange,orderErrorRoutingKey);
    }
}
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
  2Vtxr3XfwhHq   2024年05月17日   55   0   0 Java
  Tnh5bgG19sRf   2024年05月20日   110   0   0 Java
  8s1LUHPryisj   2024年05月17日   46   0   0 Java
  aRSRdgycpgWt   2024年05月17日   47   0   0 Java
wR16XKHtuTxh