RabbitMQ六大核心(六大模式)-- 5. Topics(主题模式)主题交换机
  f45gpqkY1FKu 2023年11月19日 19 0

RoutingKey必须时一个单词列表,已点号分隔开,比其他交换机更完美 (info.base.xxx),最长不能超过255个字节

注意:* 可以代替一个单词;# 可以替代零个或多个单词,不匹配的任何绑定会被丢弃;

   当一个队列RoutingKey是 # ,那么这个队列将接收所有数据,有点像fanout

   如果队列RoutingKey中没有 # 或 * ,那么这个队列类型就是 direct

// 生产者
import cn.soldat.utils.RabbitMqUtils

object EmitLogTopic {
    // 交换机名称
    const val EXCHANGE_NAME = "topic_logs"
}

fun main() {
    val channel = RabbitMqUtils.getChannel()
    val bindingKeyMap = mapOf(
        "quick.orange.rabbit" to "被队列Q1Q2收到",
        "lazy.orange.elephant" to "被队列Q1Q2收到",
        "quick.orange.fox" to "被队列Q1接收到",
        "lazy.brown.fox" to "被队列Q2接收到",
        "lazy.pink.rabbit" to "虽然满足两个绑定但只被队列Q2接收一次",
        "quick.brown.fox" to "不匹配任何绑定 不回被任何队列接收到 会被丢弃",
        "quick.orange.male.rabbit" to "是四个单词不匹配任何绑定 会被丢弃",
        "lazy.orange.male.rabbit" to "是四个单词但匹配Q2",
    )

    bindingKeyMap.keys.forEach { routingKey ->
        val message = bindingKeyMap[routingKey]!!
        channel.basicPublish(EmitLogTopic.EXCHANGE_NAME, routingKey,null, message.toByteArray())
        println("生产者发出的消息:$message")
    }

}
// 消费者 1
import cn.soldat.utils.RabbitMqUtils
import com.rabbitmq.client.BuiltinExchangeType
import com.rabbitmq.client.CancelCallback
import com.rabbitmq.client.DeliverCallback

object ReceiveLogsTopic01 {
    // 交换机名称
    const val EXCHANGE_NAME = "topic_logs"
}

// 接收消息
fun main() {

    val channel = RabbitMqUtils.getChannel()
    // 声明交换机
    channel.exchangeDeclare(ReceiveLogsTopic01.EXCHANGE_NAME, BuiltinExchangeType.TOPIC)
    // 声明队列
    val queueName = "Q1"
    channel.queueDeclare(queueName, false, false, false, null)

    channel.queueBind(queueName, ReceiveLogsTopic01.EXCHANGE_NAME, "*.orange.*")
    println("等待接收消息......")

    // 接收消息
    val deliverCallback = DeliverCallback{ _, message ->
        println("接收队列: $queueName 绑定键:${message.envelope.routingKey}( ${String(message.body)})")
    }
    channel.basicConsume(queueName, true, deliverCallback, CancelCallback {  })
}
// 消费者 2
import cn.soldat.utils.RabbitMqUtils
import com.rabbitmq.client.BuiltinExchangeType
import com.rabbitmq.client.CancelCallback
import com.rabbitmq.client.DeliverCallback

object ReceiveLogsTopic02 {
    // 交换机名称
    const val EXCHANGE_NAME = "topic_logs"
}

// 接收消息
fun main() {

    val channel = RabbitMqUtils.getChannel()
    // 声明交换机
    channel.exchangeDeclare(ReceiveLogsTopic02.EXCHANGE_NAME, BuiltinExchangeType.TOPIC)
    // 声明队列
    val queueName = "Q2"
    channel.queueDeclare(queueName, false, false, false, null)

    channel.queueBind(queueName, ReceiveLogsTopic02.EXCHANGE_NAME, "*.*.rabbit")
    channel.queueBind(queueName, ReceiveLogsTopic02.EXCHANGE_NAME, "lazy.#")
    println("等待接收消息......")

    // 接收消息
    val deliverCallback = DeliverCallback{ _, message ->
        println("接收队列: $queueName 绑定键:${message.envelope.routingKey}( ${String(message.body)})")
    }
    channel.basicConsume(queueName, true, deliverCallback, CancelCallback {  })
}
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月19日 0

暂无评论

推荐阅读
f45gpqkY1FKu