避免立即执行资源密集型任务,生产者大量发消息到队列中,单个消费者线程无法及时处理,多个消费者线程轮询处理消息
注意:一个消息只能被处理一次,不可以被处理多次
提取公共代码:RabbitMQUtils
object RabbitMQUtils {
fun getChannel(): Channel{
// 创建连接工厂
val factory = ConnectionFactory()
factory.host = "166.166.166.92"
factory.username = "admin"
factory.password = "123456"
// 创建连接
val connection = factory.newConnection()
// 创建信道
return connection.createChannel()
}
}
工作线程(消费者):两个线程代码相同
import cn.soldat.utils.RabbitMqUtils
import com.rabbitmq.client.CancelCallback
import com.rabbitmq.client.DeliverCallback
object Worker01{
// 队列名称
const val QUEUE_NAME = "hello"
}
fun main() {
val channel = RabbitMqUtils.getChannel()
// 消息接收
val deliverCallback = DeliverCallback { _, message -> println("接收到的消息:${String(message.body)}") }
val cancelCallback = CancelCallback { tag -> println("$tag :: 消息已被取消") }
println("C2等待接收消息......")
channel.basicConsume(Worker01.QUEUE_NAME, true, deliverCallback, cancelCallback)
}
生产者:
import cn.soldat.utils.RabbitMqUtils
import java.util.Scanner
object Task01{
const val QUEUE_NAME = "hello"
}
fun main() {
val channel = RabbitMqUtils.getChannel()
// 发送大量消息
// 队列声明
channel.queueDeclare(Task01.QUEUE_NAME, false, false,false, null)
// 发送消息 从控制台中接收消息
val scanner = Scanner(System.`in`)
while (scanner.hasNext()){
val message = scanner.next()
channel.basicPublish("", Task01.QUEUE_NAME, null, message.toByteArray())
println("发送消息成功:: $message")
}
}
测试结果:
# 生产者
AA
发送消息成功:: AA
BB
发送消息成功:: BB
CC
发送消息成功:: CC
DD
发送消息成功:: DD
# C1消费者
C1等待接收消息......
接收到的消息:AA
接收到的消息:CC
# C2消费者
C2等待接收消息......
接收到的消息:BB
接收到的消息:DD
消息应答
消费者完成一个任务耗时过长,并且在过程中突然挂掉了,消息会丢失
为了保证消息在发送过程中不丢失,rabbitmq引入消息应答机制,只要不应答,队列就不会删除消息
- 自动应答: 需要在高吞吐量和数据传输安全性方面做权衡,没有对传递的消息数量进行限制,这种模式仅适用于在消费者可以高效并以某种速率能够处理这些消息的情况下使用;
自动应答相对不靠谱,只以接收到消息为准;
- 手动应答:(推荐使用)
Channel.basicAck() 用于肯定确认
Channel.basicNack() 用于否定确认
Channel.basicReject() 用于否定确认
批量应答:不推荐使用批量应答
消息自动重新入队:如果消费者由于某些原因断开连接,消息会重新入队,分配给其他的消费者进行处理
生产者:
import cn.soldat.utils.RabbitMqUtils
import java.util.*
object Producer{
// 队列名称
const val TASK_QUEUE_NAME = "ack_queue"
}
fun main() {
val channel = RabbitMqUtils.getChannel()
// 声明队列
channel.queueDeclare(Producer.TASK_QUEUE_NAME, false, false, false, null)
// 从控制台输入信息
val scanner = Scanner(System.`in`)
while (scanner.hasNext()){
val message = scanner.next()
channel.basicPublish("", Producer.TASK_QUEUE_NAME, null, message.toByteArray())
println("生产者发出消息:$message")
}
}
消费者01:
import cn.soldat.utils.RabbitMqUtils
import com.rabbitmq.client.CancelCallback
import com.rabbitmq.client.DeliverCallback
object Consumer {
const val TASK_QUEUE_NAME = "ack_queue"
}
fun main() {
// 接收消息
val channel = RabbitMqUtils.getChannel()
println("C1等待接收消息处理时间较短")
val deliverCallback = DeliverCallback{ _, message ->
// 休眠 1S
Thread.sleep(1 * 1000)
println("接收到的消息:${String(message.body)}")
/**
* 手动应答
* 1. Tag: 消息标记
* 2. 是否批量应答
*/
channel.basicAck(message.envelope.deliveryTag, false)
}
val cancelCallback = CancelCallback{ consumerTag -> println("$consumerTag ") }
channel.basicConsume(
Consumer.TASK_QUEUE_NAME, false, deliverCallback, cancelCallback
)
}
消费者02:
import cn.soldat.utils.RabbitMqUtils
import com.rabbitmq.client.CancelCallback
import com.rabbitmq.client.DeliverCallback
object Consumer02 {
const val TASK_QUEUE_NAME = "ack_queue"
}
fun main() {
// 接收消息
val channel = RabbitMqUtils.getChannel()
println("C2等待接收消息处理时间较长")
val deliverCallback = DeliverCallback{ _, message ->
// 休眠 30S
Thread.sleep(30 * 1000)
println("接收到的消息:${String(message.body)}")
/**
* 手动应答
* 1. Tag: 消息标记
* 2. 是否批量应答
*/
channel.basicAck(message.envelope.deliveryTag, false)
}
val cancelCallback = CancelCallback{ consumerTag -> println("$consumerTag ") }
channel.basicConsume(
Consumer02.TASK_QUEUE_NAME, false, deliverCallback, cancelCallback
)
}
RabbitMQ 消息持久化
需要将队列和消息都标记为持久化,重启RabbitMQ之后依然存在
队列实现持久化:channel.queueDeclare( ... durable = true ... )
注意: 如果之前声明的队列不是持久化的,需要先把原来的队列先删除,或者重新创建一个持久化的队列
消息持久化:(生产者发过来的消息)channel.basicPublish( ... props = MessageProperties.PERSISTENT_TEXT_PLAIN ... )
不公平分发
在某些场景下轮询分发策略并不是很好
不公平分发:(由消费者更改)channel.basicQos(1) // 1 为不公平分发,0 - 默认为轮询分发,其他为预取值
预取值(prefetch)
生产者发消息,指定消费者1分配多少消息,指定消费者2分配多少消息
主要是系统集群时使用,根据不同的机器性能分配
注意:预取值的数量是消息在信道中的堆积量,如果消费者在收到下一条之前已经处理完成,则当前信道中只存在一条消息,会继续由此消费者继续处理,不会分配到其他消费者处理