RabbitMQ六大核心(六大模式)-- 2. WorkQueues (工作队列模式)
  f45gpqkY1FKu 2023年11月09日 18 0

避免立即执行资源密集型任务,生产者大量发消息到队列中,单个消费者线程无法及时处理,多个消费者线程轮询处理消息

注意:一个消息只能被处理一次,不可以被处理多次

提取公共代码:RabbitMQUtils

object RabbitMQUtils {
    fun getChannel(): Channel{
        // 创建连接工厂
        val factory = ConnectionFactory()
        factory.host = "166.166.166.92"
        factory.username = "admin"
        factory.password = "123456"

        // 创建连接
        val connection = factory.newConnection()

        // 创建信道
        return connection.createChannel()
    }
}

工作线程(消费者):两个线程代码相同

import cn.soldat.utils.RabbitMqUtils
import com.rabbitmq.client.CancelCallback
import com.rabbitmq.client.DeliverCallback

object Worker01{
    // 队列名称
    const val QUEUE_NAME = "hello"
}

fun main() {
    val channel = RabbitMqUtils.getChannel()
    // 消息接收
    val deliverCallback = DeliverCallback { _, message -> println("接收到的消息:${String(message.body)}") }
    val cancelCallback = CancelCallback { tag -> println("$tag :: 消息已被取消") }
    println("C2等待接收消息......")
    channel.basicConsume(Worker01.QUEUE_NAME, true, deliverCallback, cancelCallback)
}

生产者:

import cn.soldat.utils.RabbitMqUtils
import java.util.Scanner

object Task01{
    const val QUEUE_NAME = "hello"
}

fun main() {
    val channel = RabbitMqUtils.getChannel()
    // 发送大量消息
    // 队列声明
    channel.queueDeclare(Task01.QUEUE_NAME, false, false,false, null)
    // 发送消息 从控制台中接收消息
    val scanner = Scanner(System.`in`)
    while (scanner.hasNext()){
        val message = scanner.next()
        channel.basicPublish("", Task01.QUEUE_NAME, null, message.toByteArray())
        println("发送消息成功:: $message")
    }
}

测试结果:

# 生产者
    AA
    发送消息成功:: AA
    BB
    发送消息成功:: BB
    CC
    发送消息成功:: CC
    DD
    发送消息成功:: DD
# C1消费者
    C1等待接收消息......
    接收到的消息:AA
    接收到的消息:CC
# C2消费者
    C2等待接收消息......
    接收到的消息:BB
    接收到的消息:DD
消息应答

消费者完成一个任务耗时过长,并且在过程中突然挂掉了,消息会丢失

为了保证消息在发送过程中不丢失,rabbitmq引入消息应答机制,只要不应答,队列就不会删除消息

  1. 自动应答: 需要在高吞吐量和数据传输安全性方面做权衡,没有对传递的消息数量进行限制,这种模式仅适用于在消费者可以高效并以某种速率能够处理这些消息的情况下使用;

自动应答相对不靠谱,只以接收到消息为准;

  1. 手动应答:(推荐使用)

Channel.basicAck() 用于肯定确认

Channel.basicNack() 用于否定确认

Channel.basicReject() 用于否定确认

批量应答:不推荐使用批量应答

消息自动重新入队:如果消费者由于某些原因断开连接,消息会重新入队,分配给其他的消费者进行处理

生产者:

import cn.soldat.utils.RabbitMqUtils
import java.util.*


object Producer{
   // 队列名称
   const val TASK_QUEUE_NAME = "ack_queue"
}

fun main() {
       val channel = RabbitMqUtils.getChannel()
       // 声明队列
       channel.queueDeclare(Producer.TASK_QUEUE_NAME, false, false, false, null)
       // 从控制台输入信息

       val scanner = Scanner(System.`in`)
       while (scanner.hasNext()){
           val message = scanner.next()
           channel.basicPublish("", Producer.TASK_QUEUE_NAME, null, message.toByteArray())
           println("生产者发出消息:$message")
       }
}

消费者01:

import cn.soldat.utils.RabbitMqUtils
import com.rabbitmq.client.CancelCallback
import com.rabbitmq.client.DeliverCallback

object Consumer {
   const val TASK_QUEUE_NAME = "ack_queue"
}

fun main() {
   // 接收消息
   val channel = RabbitMqUtils.getChannel()
   println("C1等待接收消息处理时间较短")

   val deliverCallback = DeliverCallback{ _, message ->
       // 休眠 1S
       Thread.sleep(1 * 1000)
       println("接收到的消息:${String(message.body)}")
       /**
        * 手动应答
        * 1. Tag: 消息标记
        * 2. 是否批量应答
        */
       channel.basicAck(message.envelope.deliveryTag, false)
   } 

   val cancelCallback = CancelCallback{ consumerTag ->  println("$consumerTag ") }

   channel.basicConsume(
       Consumer.TASK_QUEUE_NAME, false, deliverCallback, cancelCallback
   )
}

消费者02:

import cn.soldat.utils.RabbitMqUtils
import com.rabbitmq.client.CancelCallback
import com.rabbitmq.client.DeliverCallback

object Consumer02 {
   const val TASK_QUEUE_NAME = "ack_queue"
}

fun main() {
   // 接收消息
   val channel = RabbitMqUtils.getChannel()
   println("C2等待接收消息处理时间较长")

   val deliverCallback = DeliverCallback{ _, message ->
       // 休眠 30S
       Thread.sleep(30 * 1000)
       println("接收到的消息:${String(message.body)}")
       /**
        * 手动应答
        * 1. Tag: 消息标记
        * 2. 是否批量应答
        */
       channel.basicAck(message.envelope.deliveryTag, false)
   }

   val cancelCallback = CancelCallback{ consumerTag ->  println("$consumerTag ") }

   channel.basicConsume(
       Consumer02.TASK_QUEUE_NAME, false, deliverCallback, cancelCallback
   )
}
RabbitMQ 消息持久化

需要将队列和消息都标记为持久化,重启RabbitMQ之后依然存在

队列实现持久化:channel.queueDeclare( ... durable = true ... )

注意: 如果之前声明的队列不是持久化的,需要先把原来的队列先删除,或者重新创建一个持久化的队列

消息持久化:(生产者发过来的消息)channel.basicPublish( ... props = MessageProperties.PERSISTENT_TEXT_PLAIN ... )

不公平分发

在某些场景下轮询分发策略并不是很好

不公平分发:(由消费者更改)channel.basicQos(1) // 1 为不公平分发,0 - 默认为轮询分发,其他为预取值

预取值(prefetch)

生产者发消息,指定消费者1分配多少消息,指定消费者2分配多少消息

主要是系统集群时使用,根据不同的机器性能分配

注意:预取值的数量是消息在信道中的堆积量,如果消费者在收到下一条之前已经处理完成,则当前信道中只存在一条消息,会继续由此消费者继续处理,不会分配到其他消费者处理

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月09日 0

暂无评论

推荐阅读
f45gpqkY1FKu