RoutingKey必须时一个单词列表,已点号分隔开,比其他交换机更完美 (info.base.xxx),最长不能超过255个字节
注意:* 可以代替一个单词;# 可以替代零个或多个单词,不匹配的任何绑定会被丢弃;
当一个队列RoutingKey是 # ,那么这个队列将接收所有数据,有点像fanout
如果队列RoutingKey中没有 # 或 * ,那么这个队列类型就是 direct
// 生产者
import cn.soldat.utils.RabbitMqUtils
object EmitLogTopic {
// 交换机名称
const val EXCHANGE_NAME = "topic_logs"
}
fun main() {
val channel = RabbitMqUtils.getChannel()
val bindingKeyMap = mapOf(
"quick.orange.rabbit" to "被队列Q1Q2收到",
"lazy.orange.elephant" to "被队列Q1Q2收到",
"quick.orange.fox" to "被队列Q1接收到",
"lazy.brown.fox" to "被队列Q2接收到",
"lazy.pink.rabbit" to "虽然满足两个绑定但只被队列Q2接收一次",
"quick.brown.fox" to "不匹配任何绑定 不回被任何队列接收到 会被丢弃",
"quick.orange.male.rabbit" to "是四个单词不匹配任何绑定 会被丢弃",
"lazy.orange.male.rabbit" to "是四个单词但匹配Q2",
)
bindingKeyMap.keys.forEach { routingKey ->
val message = bindingKeyMap[routingKey]!!
channel.basicPublish(EmitLogTopic.EXCHANGE_NAME, routingKey,null, message.toByteArray())
println("生产者发出的消息:$message")
}
}
// 消费者 1
import cn.soldat.utils.RabbitMqUtils
import com.rabbitmq.client.BuiltinExchangeType
import com.rabbitmq.client.CancelCallback
import com.rabbitmq.client.DeliverCallback
object ReceiveLogsTopic01 {
// 交换机名称
const val EXCHANGE_NAME = "topic_logs"
}
// 接收消息
fun main() {
val channel = RabbitMqUtils.getChannel()
// 声明交换机
channel.exchangeDeclare(ReceiveLogsTopic01.EXCHANGE_NAME, BuiltinExchangeType.TOPIC)
// 声明队列
val queueName = "Q1"
channel.queueDeclare(queueName, false, false, false, null)
channel.queueBind(queueName, ReceiveLogsTopic01.EXCHANGE_NAME, "*.orange.*")
println("等待接收消息......")
// 接收消息
val deliverCallback = DeliverCallback{ _, message ->
println("接收队列: $queueName 绑定键:${message.envelope.routingKey}( ${String(message.body)})")
}
channel.basicConsume(queueName, true, deliverCallback, CancelCallback { })
}
// 消费者 2
import cn.soldat.utils.RabbitMqUtils
import com.rabbitmq.client.BuiltinExchangeType
import com.rabbitmq.client.CancelCallback
import com.rabbitmq.client.DeliverCallback
object ReceiveLogsTopic02 {
// 交换机名称
const val EXCHANGE_NAME = "topic_logs"
}
// 接收消息
fun main() {
val channel = RabbitMqUtils.getChannel()
// 声明交换机
channel.exchangeDeclare(ReceiveLogsTopic02.EXCHANGE_NAME, BuiltinExchangeType.TOPIC)
// 声明队列
val queueName = "Q2"
channel.queueDeclare(queueName, false, false, false, null)
channel.queueBind(queueName, ReceiveLogsTopic02.EXCHANGE_NAME, "*.*.rabbit")
channel.queueBind(queueName, ReceiveLogsTopic02.EXCHANGE_NAME, "lazy.#")
println("等待接收消息......")
// 接收消息
val deliverCallback = DeliverCallback{ _, message ->
println("接收队列: $queueName 绑定键:${message.envelope.routingKey}( ${String(message.body)})")
}
channel.basicConsume(queueName, true, deliverCallback, CancelCallback { })
}