RabbitMQ六大核心(六大模式)-- 6. Publisher Confirms(发布确认模式)
  f45gpqkY1FKu 2023年11月21日 38 0

生产者发消息到队列,设置要求队列必须持久化、设置要求队列中的消息必须持久化

存在传送到队列中必须保存到磁盘上才能实现持久化,但是在保存之前已经宕机,无法实现持久化;

此时必须增加发布确认(保存到磁盘后进行确认)才能保证确实保存在磁盘上

开启发布确认方法

// 信道开启发布确认
channel.confirmSelect()

有三种方式:

单独发布消息: 同步等待确认,简单,吞吐量非常有限 ,发布速度特别慢

批量确认消息:合理吞吐量, 一旦出现问题,不知道是哪个消息出了问题

异步确认消息:(推荐使用)代码复杂,速度快,不会丢失消息,性价比高

/** 单个确认 发布 1000 个单独确认消息,耗时:300 ms */
fun publishMessageIndividually() {
   val channel = RabbitMqUtils.getChannel()
   // 队列声明 随机名称
   val queueName = UUID.randomUUID().toString()
   channel.queueDeclare(queueName, false, false, false, null)
   // 开启发布确认
   channel.confirmSelect()
   // 开始时间
   val begin = System.currentTimeMillis()
   // 批量发消息
   for (index in 0 until MESSAGE_COUNT) {
       val message = index.toString()
       channel.basicPublish("", queueName, null, message.toByteArray())
       // 确认消息 单个消息马上进行确认
       val flag = channel.waitForConfirms()
       if (flag) println("消息发送成功")
   }
   // 结束时间
   val end = System.currentTimeMillis()
   println("发布 $MESSAGE_COUNT 个单独确认消息,耗时:${end - begin} ms")
}

/** 批量发布确认 发布 1000 个批量确认消息,耗时:44 ms */
fun publishMessageBatch() {
   val channel = RabbitMqUtils.getChannel()
   // 队列声明 随机名称
   val queueName = UUID.randomUUID().toString()
   channel.queueDeclare(queueName, false, false, false, null)
   // 开启发布确认
   channel.confirmSelect()
   // 开始时间
   val begin = System.currentTimeMillis()
   // 批量确认消息大小
   val batchSize = 100

   // 批量发消息 批量确认
   for (index in 0 until MESSAGE_COUNT) {
       val message = index.toString()
       channel.basicPublish("", queueName, null, message.toByteArray())
       // 发送到100次的时候 发布批量确认一次
       if (index % 100 == 0) channel.waitForConfirms()
   }

   // 结束时间
   val end = System.currentTimeMillis()
   println("发布 $MESSAGE_COUNT 个批量确认消息,耗时:${end - begin} ms")
}

/** 异步发布确认 发布 1000 个异步确认消息,耗时:22 ms */
fun publishMessageAsync() {
   val channel = RabbitMqUtils.getChannel()
   // 队列声明 随机名称
   val queueName = UUID.randomUUID().toString()
   channel.queueDeclare(queueName, false, false, false, null)
   // 开启发布确认
   channel.confirmSelect()
   // 开始时间
   val begin = System.currentTimeMillis()

   // 消息确认成功回调函数
   val ackCallback = ConfirmCallback { deliverTag, _ -> println("确认的消息:$deliverTag") }

   /**
        * 消息确认失败回调函数
        * 1. 消息的标记
        * 2. 是否为批量确认
        */
   val nackCallback = ConfirmCallback { deliverTag, _ -> println("未确认的消息: $deliverTag") }
   /** 消息监听器 监听哪些消息成功/失败
        *  1. 消息确认成功
        *  2. 消息确认失败
        */
   channel.addConfirmListener(ackCallback, nackCallback)

   // 批量发送消息 异步确认
   for (index in 0 until MESSAGE_COUNT) {
       val message = index.toString()
       channel.basicPublish("", queueName, null, message.toByteArray())
   }

   // 结束时间
   val end = System.currentTimeMillis()
   println("发布 $MESSAGE_COUNT 个异步确认消息,耗时:${end - begin} ms")
}

处理异步未确认消息: 两个线程 - 并发链路队列 ConcurrentSkipListMap<Long, String>()

/** 异步发布确认: 处理未确认消息*/
fun publishMessageAsync() {
    val channel = RabbitMqUtils.getChannel()
    val queueName = UUID.randomUUID().toString()
    channel.queueDeclare(queueName, false, false, false, null)
    channel.confirmSelect()
    /**
     * 线程安全有序的哈希表 适用于高并发的情况下
     * 1. 轻松的将序号与消息进行关联
     * 2. 轻松的批量删除内容,只要有序号Key
     * 3. 支持高并发(多线程)
     */
    val outstandingConfirms = ConcurrentSkipListMap<Long, String>()

    // 消息确认成功回调函数
    val ackCallback = ConfirmCallback { deliverTag, multiple ->
        if (multiple) {
            val confirmed = outstandingConfirms.headMap(deliverTag)
            confirmed.clear()
        } else {
            outstandingConfirms.remove(deliverTag)
        }
        println("确认的消息:$deliverTag")
    }

       // 消息确认失败回调函数
    val nackCallback = ConfirmCallback { deliverTag, _ ->
        val message = outstandingConfirms[deliverTag]
        println("未确认的消息是:$message 未确认的消息编号:$deliverTag")
    }                                    

    channel.addConfirmListener(ackCallback, nackCallback)

    val begin = System.currentTimeMillis()

    // 批量发送消息 异步确认
    for (index in 0 until MESSAGE_COUNT) {
        val message = index.toString()
        channel.basicPublish("", queueName, null, message.toByteArray())
        /** 此处记录所有发送的消息 消息总和
         * 消息确认成功回调函数 : 删除确认的消息
         * 消息确认失败回调函数
         */
        outstandingConfirms[channel.nextPublishSeqNo] = message
    }

    val end = System.currentTimeMillis()
    println("发布 $MESSAGE_COUNT 个异步确认消息,耗时:${end - begin} ms")
}
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月21日 0

暂无评论

推荐阅读
f45gpqkY1FKu