RabbitMQ高级特性
  vxNQtvtQlfbi 2023年11月02日 53 0

消息如何保障100%的投递成功?

什么是生产端的可靠性投递?

  • 1、保障消息的成功发出
  • 2、保障MQ节点的成功接收
  • 3、发送端收到MQ节点(Broker)确认应答
  • 4、完善的消息进行补偿机制 前三步不一定能保障消息能够100%投递成功,因此要加上第四步。

一线互联网大厂的解决方案: 1、消息落库,对消息状态进行打标 image.png 图解:

  • 蓝色部分表示:生产者负责发送消息发送至Broker端
  • Biz DB:订单数据库
  • MSG DB: 消息数据
  • 面对小规模的应用可以采用加事务的方式,保证事务的一致性。但在大厂中面对高并发,并没有加事务,事务的性能拼接非常严重,而是做补偿。 比如:如下发一条订单消息。
  • step1:存储订单消息(创建订单),业务数据入库,消息也入库。缺点:需要持久化两次。(status:0)
  • step2:在step1成功的前提下,发送消息
  • step3:Broker收到消息后,confirm给我们的生产端。Confirm Listener异步监听Broker回送的消息。
  • step4:抓取出指定的消息,更新(status=1),表示消息已经投递成功。
  • step5:分布式定时任务获取消息状态,如果等于0则抓取数据出来。
  • step6:重新发送消息
  • step7:重试限制设置3次。如果消息重试了3次还是失败,那么(status=2),认为这个消息就是失败的。
  • 查询这些消息为什么失败,可能需要人工去查询。
  • 假设step2执行成功,step3由于网络闪断。那么confirm将永远收不到消息,那么我们需要设定一个规则:
  • 例如:在消息入库的时候,设置一个临界值 timeout=5min,当超过5min之后,就将这条数据抓取出来。
  • 或者写一个定时任务每隔5分钟就将status=0的消息抓取出来。可能存在小问题:消息发送出去,定时任务又正好刚执行,Confirm还未收到,定时任务就会执行,会导致消息执行两次。
  • 更精细化操作:消息超时容忍限制。confirm在2-3分钟内未收到消息,则重新发送。
  • 总体概括上图:首先业务持久化入库之后,在发送消息之前,需要将消息持久化到数据库中,并给这个消息设置一个状态(未发送、发送中、到达)。发送之后Producer端需要有一个ConfirmListener监听Broker的返回confirm,当消息状态发生了变化,需要对消息做一个变更。针对没有到达的消息做一个轮训操作,重新发送。对轮训次数也需要做一个限制3-5次。确保消息能够成功的发送。
  • 第一种方案对数据有两次入库,一次业务数据入库,一次消息入库。这样对数据的入库是一个瓶颈。

2、消息的延迟投递,做二次确认,回调检查 这种方式并不一定能保证100%成功,但是也能保证99.99%的消息成功。如果遇到特别极端的情况,那么就只能需要人工去补偿,或者定时任务去做。 第二种方式主要是为了减少对数据库的操作。 image.png 图解:

  • Upstream service:生产端
  • DownStream service:消费端
  • Callback service:回调服务
  • step1:业务消息入库成功后,第一次消息发送。
  • step2:同样在消息入库成功后,发送第二次消息,这两条消息是同时发送的。第二条消息是延迟检查,可以设置2min、5min 延迟发送。
  • step3:消费端监听指定队列。
  • step4:消费端处理完消息后,内部生成新的消息send confirm。投递到MQ Broker。
  • step5: Callback Service 回调服务监听MQ Broker,如果收到Downstream service发送的消息,则可以确定消息发送成功,执行消息存储到MSG DB。
  • step6:Check Detail检查监听step2延迟投递的消息。此时两个监听的队列不是同一个,5分钟后,Callback service收到消息,检查MSG DB。如果发现之前的消息已经投递成功,则不需要做其他事情。如果检查发现失败,则Callback 进行补偿,主动发送RPC 通信。通知上游生产端重新发送消息。
  • 首先正常逻辑:业务持久化入库之后,一次性生成2条消息,第一条消息为Consumer监听消费,第2条消息为延迟消息,由Callback Service监听并消息;Consumer消费消息成功后给Callback Service发送确认消费消息,Callback Service做消息消费成功的入库持久化,Callback Service收到延迟消息不做任何处理。
  • 异常逻辑:Consumer消费消息失败,或者没有收到消息,那么Callback Service收到延迟消息之后,通过RPC调用Producer查询Producer的业务表进行消息的重新发送。极端情况下需要人工补偿处理。
  • 这样做的目的就是为了在Producer端少做一次消息的持久化入库处理,异步的进行补偿操作,提升性能。

具体采用哪种方案,还需要根据业务与消息的并发量而定。

幂等性概念详解

幂等性是什么?

幂等(idempotent、idempotence)是一个数学与计算机学概念,常见于抽象代数中,即f(f(x)) = f(x)。简单的来说就是一个操作多次执行产生的结果与一次执行产生的结果一致

  • 我们可以借鉴数据库的乐观锁机制:
  • 比如我们执行一条更新库存的SQL语句:
  • UPDATE T_REPS SET COUNT = COUNT - 1,VERSION = VERSION + 1 WHERE VERSION = 1
  • 利用加版本号Version的方式来保证幂等性。

消费端-幂等性保障

在海量订单产生的业务高峰期,如何避免消息的重复消费问题? 在高并发的情况下,会有大量的消息到达MQ,消费端需要监听大量的消息。这样的情况下,难免会出现消息的重复投递,网络闪断等等。如果不去做幂等,则会出现消息的重复消费。 消费端实现幂等性,就意味着,我们的消息永远不会被消费多次,即使我们收到了多条一样的消息,也只会执行一次。 看下互联网主流的幂等性操作:

唯一ID+指纹码机制

  • 唯一ID + 指纹码机制,利用数据库主键去重。保证唯一性
  • SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID + 指纹码 如果查询没有,则添加。有则不需要做任何操作,消费端不需要消费消息。
  • 好处:实现简单
  • 坏处:高并发下有数据库写入的性能瓶颈
  • 解决方案:跟进ID进行分库分表进行算法路由分摊流量压力。

Redis 原子特性实现 最简单使用Redis的自增。 使用Redis进行幂等,需要考虑的问题。

  • 第一:我们是否需要数据落库,如果落库的话,关键解决的问题是数据库和缓存如何做到原子性?加事务不行,Redis和数据库的事务不是同一个,无法保证同时成功同时失败。
  • 第二:如果不进行落库,那么都存储到缓存中,如何设置定时同步的策略?

Confirm确认消息、Return返回消息

理解Confirm 消息确认机制:

  • 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。
  • 生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障! Confirm确认消息流程: image.png 蓝色:producer 生产者 红色:MQ Broker 服务器 生产者把消息发送到Broker端,Broker收到消息之后回送给producer。Confirm Listener 监听应答。 操作是异步操作,当生产者发送完消息之后,就不需要管了。Confirm Listener 监听MQ Broker的应答。

如何实现Confirm确认消息?

  • 第一步:在channel上开启确认模式:channel.confirmSelect()
  • 第二步;在chanel上 添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理!

Confirm确认模式的生产端:

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2、通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3、通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        //4、指定消息的投递模式:消息的确认模式
        channel.confirmSelect();

        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.save";

        //5、发送消息
        String msg = "hello rabbitmq send confirm message!";
        channel.basicPublish(exchangeName,routingKey,null,msg.getBytes("UTF-8"));

        //6、添加一个确认监听
        channel.addConfirmListener(new ConfirmListener() {
            /**
             * 返回成功
             * @param deliveryTag
             * @param multiple
             * @throws IOException
             */
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("----------ack----------");
            }

            /**
             * 返回失败
             * @param deliveryTag
             * @param multiple
             * @throws IOException
             */
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("----------no ack----------");
            }
        });
    }
}

Confirm确认模式的消费端:

public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
//1、创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2、通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3、通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        String exchangeName = "test_confirm_exchange";
        String queueName = "test_confirm_queue";
        String routingKey = "confirm.#";

        //4、声明交换机和队列,然后进行绑定设置,指定路由key
        channel.exchangeDeclare(exchangeName,"topic",true);
        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueBind(queueName,exchangeName,routingKey);

        //5、创建消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(envelope.getRoutingKey() + ":" + message);
            }
        };

        //6、设置channel
        channel.basicConsume(queueName,true,consumer);
    }
}

Return消息机制

  • Return Listener用于处理一些不可路由的消息!
  • 我们的消息生产者,通过指定一个Exchange和Routingkey,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理操作!
  • 但是在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener!

Return消息机制基础API中有一个关键的配置项:

  • Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!

Return消息机制流程 image.png

Return消息机制生产端:

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2、通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3、通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        //4、指定消息的投递模式:消息的确认模式
        channel.confirmSelect();

        String exchangeName = "test_return_exchange";
        String routingKey = "return.save";
        String routingKeyError = "error.save";

        //5、发送消息
        String msg = "hello rabbitmq send confirm message!";
        channel.basicPublish(exchangeName,routingKey,true,null,msg.getBytes("UTF-8"));

        //第三个参数mandatory=true,意味着路由不到的话mq也不会删除消息,false则会自动删除
        channel.basicPublish(exchangeName,routingKeyError,true,null,msg.getBytes("UTF-8"));
        channel.basicPublish(exchangeName,routingKeyError,false,null,msg.getBytes("UTF-8"));

        //6、添加一个return监听
        channel.addReturnListener(new ReturnListener() {

            @Override
            public void handleReturn(int replyCode, String replyText, String exchange,
                                     String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("----------handle return----------");
                System.out.println("replyCode:" + replyCode);
                System.out.println("replyText:" + replyText);
                System.out.println("exchange:" + exchange);
                System.out.println("routingKey:" + routingKey);
                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body,"UTF-8"));
            }
        });
    }
}

Return消息机制消费端:

public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2、通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3、通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        String exchangeName = "test_return_exchange";
        String queueName = "test_return_queue";
        String routingKey = "return.#";

        //4、声明交换机和队列,然后进行绑定设置,指定路由key
        channel.exchangeDeclare(exchangeName,"topic",true);
        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueBind(queueName,exchangeName,routingKey);

        //5、创建消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(envelope.getRoutingKey() + ":" + message);
            }
        };

        //6、设置channel
        channel.basicConsume(queueName,true,consumer);
    }
}

关于Confirm确认消息、Return返回消息:rabbitmq生产者的消息确认

消息的限流(防止占用内存过多,节点宕机)

什么是消费端的限流?

  • 假设一个场景,首先,我们Rabbitmq服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面情况:
  • 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据!这个时候很容易导致服务器崩溃,出现故障。

为什么不在生产端进行限流呢?

因为在高并发的情况下,流量就是非常大,所以很难在生产端做限制。因此我们可以用MQ在消费端做限流。

  • RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息。 在限流的情况下,千万不要设置自动签收,要设置为手动签收。
  • void BasicQos(uint prfetchSize,ushort prefetchCount,bool global); 参数解释:
  • prefetchSize:0
  • prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack。
  • global: true\false 是否将上面设置应用于channel,简单点说,就是上面限制是channel级别还是consumer级别。
  • prefetchSize和global这两项,rabbitmq没有实现,暂且不研究prefetch_count在no_ask = false的情况下生效,即在自动应答的情况下这两个值是不生效的。
  • 第一个参数:消息的限制大小,消息多少兆。一般不做限制,设置为0
  • 第二个参数:一次最多处理多少条,实际工作中设置为1就好
  • 第三个参数:限流策略在什么上应用。在RabbitMQ一般有两个应用级别:1.通道 2.Consumer级别。一般设置为false,true 表示channel级别,false表示在consumer级别

限流机制生产者端:

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2、通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3、通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        //4、指定消息的投递模式:消息的确认模式
        channel.confirmSelect();

        String exchangeName = "test_qos_exchange";
        String routingKey = "qos.save";

        //5、发送消息
        String msg = "hello rabbitmq send confirm message!";
        channel.basicPublish(exchangeName,routingKey,true,null,msg.getBytes("UTF-8"));
    }
}

限流机制消费者端:

public class MyConsumer extends DefaultConsumer {

    private Channel channel;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(envelope.getRoutingKey() + ":" + message);

        //手动签收
        channel.basicAck(envelope.getDeliveryTag(),false);
    }
}


public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2、通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3、通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        String exchangeName = "test_qos_exchange";
        String queueName = "test_qos_queue";
        String routingKey = "qos.#";

        //4、声明交换机和队列,然后进行绑定设置,指定路由key
        channel.exchangeDeclare(exchangeName,"topic",true);
        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueBind(queueName,exchangeName,routingKey);


        //5、创建消费者
        DefaultConsumer consumer = new MyConsumer(channel);

        //6、限流方式 AutoAck设置为false
        channel.basicConsume(queueName,false,consumer);
        //
        channel.basicQos(0,1,false);
    }
}

消息的ACK与重回队列

消费端的手工ACK和NACK

  • 消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿!
  • 如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功!

消费端的重回队列

  • 消费端重回队列是为了对没有处理成功的消息,把消息重新传递给Broker!
  • 一般我们在实际应用中,都会关闭重回队列,也就是设置为False. 生产者端代码:
public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2、通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3、通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        //4、指定消息的投递模式:消息的确认模式
        channel.confirmSelect();

        String exchangeName = "test_ack_exchange";
        String routingKey = "ack.save";

        Map<String,Object> headers = new HashMap<>();
        headers.put("num",0);
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                .deliveryMode(2)
                .contentEncoding("UTF-8")
                .headers(headers)
                .build();
        for (int i = 0; i < 5; i++) {

            //5、发送消息
            String msg = "hello rabbitmq send ack message!"+i;
            channel.basicPublish(exchangeName,routingKey,true,properties,msg.getBytes("UTF-8"));
        }
    }
}

消费者端:

public class MyConsumer extends DefaultConsumer {

    private Channel channel;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(envelope.getRoutingKey() + ":" + message);

        Map<String, Object> headers = properties.getHeaders();
        Integer num = (Integer) headers.get("num");

        if(num == 0){
            //第三个参数,true:重回队列,false:不重回队列
            //重回队列,会将消息重新添加到消息的尾部
            channel.basicNack(envelope.getDeliveryTag(),false,true);
            return;
        }

        //手动签收
        channel.basicAck(envelope.getDeliveryTag(),false);
    }
}

public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2、通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3、通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        String exchangeName = "test_ack_exchange";
        String queueName = "test_ack_queue";
        String routingKey = "ack.save";

        //4、声明交换机和队列,然后进行绑定设置,指定路由key
        channel.exchangeDeclare(exchangeName,"topic",true);
        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueBind(queueName,exchangeName,routingKey);


        //5、创建消费者
        DefaultConsumer consumer = new MyConsumer(channel);

        //6、手动签收 AutoAck设置为false
        channel.basicConsume(queueName,false,consumer);

    }
}

TTL消息

TTL

  • TTL是Time To Live的缩写,也就是生存时间
  • RabbitMQ支持消息的过期时间,在消息发送时可以进行指定
  • RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除

死信队列

死信队列:DLX,Dead-Letter-Exchange

RabbitMQ的死信队里与Exchange息息相关

  • 利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX

消息变成死信有以下几种情况:

  • 消息被拒绝(basic.reject/basic.nack)并且requeue=false
  • 消息TTL过期
  • 队列达到最大长度

DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。

当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。

可以监听这个队列中消息做相应的处理,这个特征可以弥补RabbitMQ3.0以前支持的immediate参数的功能。

死信队列设置:

  • 首先需要设置死信队列的exchange和queue,然后进行绑定:
    • Exchange:dlx.exchange
    • Queue:dlx.queue
    • RoutingKey:#
  • 然后我们进行正常声明交换机、队列、绑定,只不过我们需要在队列加上一个参数即可:arguments.put("x-dead-letter-exchange","dlx.exchange");
  • 这样消息在过期、requeue、队列在达到最大长度时,消息就可以直接路由到死信队列!

Producer端:

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2、通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3、通过Connection创建一个Channel
        Channel channel = connection.createChannel();


        String exchangeName = "test_dlx_exchange";
        String routingKey = "dlx.save";

        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                .deliveryMode(2)
                .contentEncoding("UTF-8")
                .expiration("20000")
                .build();

        //5、发送消息
        String msg = "hello rabbitmq send dlx message!";
        channel.basicPublish(exchangeName,routingKey,true,properties,msg.getBytes("UTF-8"));

    }
}

消费者端:

public class MyConsumer extends DefaultConsumer {

    private Channel channel;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(envelope.getRoutingKey() + ":" + message);

        //手动签收
        channel.basicAck(envelope.getDeliveryTag(),false);
    }
}

public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2、通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3、通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        //这是一个普通的交换机、队列、路由
        String exchangeName = "test_dlx_exchange";
        String queueName = "test_dlx_queue";
        String routingKey = "dlx.#";

        //4、声明交换机和队列,然后进行绑定设置,指定路由key
        channel.exchangeDeclare(exchangeName,"topic",true);
        Map<String,Object> agruments = new HashMap<>();
        agruments.put("x-dead-letter-exchange", "dlx.exchange");
        //这个agruments属性要声明到队列上
        channel.queueDeclare(queueName,true,false,false,agruments);
        channel.queueBind(queueName,exchangeName,routingKey);

        //死信队列的声明
        channel.exchangeDeclare("dlx.exchange","topic",true,false,null);
        channel.queueDeclare("dlx.queue",true,false,false,null);
        channel.queueBind("dlx.queue","dlx.exchange","#");

        //5、创建消费者
        DefaultConsumer consumer = new MyConsumer(channel);

        //6、手动签收 AutoAck设置为false
        channel.basicConsume("dlx.queue",false,consumer);
    }
}

在实际工作中,死信队列非常重要,用于消息没有消费者,处于死信状态。我们可以才用补偿机制。

 

参考: https://www.cnblogs.com/coder-programming/p/11412048.html

https://www.cnblogs.com/coder-programming/p/11424152.html

https://my.oschina.net/genghz/blog/1840262

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
vxNQtvtQlfbi