生产者发消息到队列,设置要求队列必须持久化、设置要求队列中的消息必须持久化
存在传送到队列中必须保存到磁盘上才能实现持久化,但是在保存之前已经宕机,无法实现持久化;
此时必须增加发布确认(保存到磁盘后进行确认)才能保证确实保存在磁盘上
开启发布确认方法
// 信道开启发布确认
channel.confirmSelect()
有三种方式:
单独发布消息: 同步等待确认,简单,吞吐量非常有限 ,发布速度特别慢
批量确认消息:合理吞吐量, 一旦出现问题,不知道是哪个消息出了问题
异步确认消息:(推荐使用)代码复杂,速度快,不会丢失消息,性价比高
/** 单个确认 发布 1000 个单独确认消息,耗时:300 ms */
fun publishMessageIndividually() {
val channel = RabbitMqUtils.getChannel()
// 队列声明 随机名称
val queueName = UUID.randomUUID().toString()
channel.queueDeclare(queueName, false, false, false, null)
// 开启发布确认
channel.confirmSelect()
// 开始时间
val begin = System.currentTimeMillis()
// 批量发消息
for (index in 0 until MESSAGE_COUNT) {
val message = index.toString()
channel.basicPublish("", queueName, null, message.toByteArray())
// 确认消息 单个消息马上进行确认
val flag = channel.waitForConfirms()
if (flag) println("消息发送成功")
}
// 结束时间
val end = System.currentTimeMillis()
println("发布 $MESSAGE_COUNT 个单独确认消息,耗时:${end - begin} ms")
}
/** 批量发布确认 发布 1000 个批量确认消息,耗时:44 ms */
fun publishMessageBatch() {
val channel = RabbitMqUtils.getChannel()
// 队列声明 随机名称
val queueName = UUID.randomUUID().toString()
channel.queueDeclare(queueName, false, false, false, null)
// 开启发布确认
channel.confirmSelect()
// 开始时间
val begin = System.currentTimeMillis()
// 批量确认消息大小
val batchSize = 100
// 批量发消息 批量确认
for (index in 0 until MESSAGE_COUNT) {
val message = index.toString()
channel.basicPublish("", queueName, null, message.toByteArray())
// 发送到100次的时候 发布批量确认一次
if (index % 100 == 0) channel.waitForConfirms()
}
// 结束时间
val end = System.currentTimeMillis()
println("发布 $MESSAGE_COUNT 个批量确认消息,耗时:${end - begin} ms")
}
/** 异步发布确认 发布 1000 个异步确认消息,耗时:22 ms */
fun publishMessageAsync() {
val channel = RabbitMqUtils.getChannel()
// 队列声明 随机名称
val queueName = UUID.randomUUID().toString()
channel.queueDeclare(queueName, false, false, false, null)
// 开启发布确认
channel.confirmSelect()
// 开始时间
val begin = System.currentTimeMillis()
// 消息确认成功回调函数
val ackCallback = ConfirmCallback { deliverTag, _ -> println("确认的消息:$deliverTag") }
/**
* 消息确认失败回调函数
* 1. 消息的标记
* 2. 是否为批量确认
*/
val nackCallback = ConfirmCallback { deliverTag, _ -> println("未确认的消息: $deliverTag") }
/** 消息监听器 监听哪些消息成功/失败
* 1. 消息确认成功
* 2. 消息确认失败
*/
channel.addConfirmListener(ackCallback, nackCallback)
// 批量发送消息 异步确认
for (index in 0 until MESSAGE_COUNT) {
val message = index.toString()
channel.basicPublish("", queueName, null, message.toByteArray())
}
// 结束时间
val end = System.currentTimeMillis()
println("发布 $MESSAGE_COUNT 个异步确认消息,耗时:${end - begin} ms")
}
处理异步未确认消息: 两个线程 - 并发链路队列 ConcurrentSkipListMap<Long, String>()
/** 异步发布确认: 处理未确认消息*/
fun publishMessageAsync() {
val channel = RabbitMqUtils.getChannel()
val queueName = UUID.randomUUID().toString()
channel.queueDeclare(queueName, false, false, false, null)
channel.confirmSelect()
/**
* 线程安全有序的哈希表 适用于高并发的情况下
* 1. 轻松的将序号与消息进行关联
* 2. 轻松的批量删除内容,只要有序号Key
* 3. 支持高并发(多线程)
*/
val outstandingConfirms = ConcurrentSkipListMap<Long, String>()
// 消息确认成功回调函数
val ackCallback = ConfirmCallback { deliverTag, multiple ->
if (multiple) {
val confirmed = outstandingConfirms.headMap(deliverTag)
confirmed.clear()
} else {
outstandingConfirms.remove(deliverTag)
}
println("确认的消息:$deliverTag")
}
// 消息确认失败回调函数
val nackCallback = ConfirmCallback { deliverTag, _ ->
val message = outstandingConfirms[deliverTag]
println("未确认的消息是:$message 未确认的消息编号:$deliverTag")
}
channel.addConfirmListener(ackCallback, nackCallback)
val begin = System.currentTimeMillis()
// 批量发送消息 异步确认
for (index in 0 until MESSAGE_COUNT) {
val message = index.toString()
channel.basicPublish("", queueName, null, message.toByteArray())
/** 此处记录所有发送的消息 消息总和
* 消息确认成功回调函数 : 删除确认的消息
* 消息确认失败回调函数
*/
outstandingConfirms[channel.nextPublishSeqNo] = message
}
val end = System.currentTimeMillis()
println("发布 $MESSAGE_COUNT 个异步确认消息,耗时:${end - begin} ms")
}