RabbitMQ -- 死信队列
  f45gpqkY1FKu 2023年11月22日 17 0

死信:无法被消费的消息

死信队列:当消息消费发生异常,将消息投入死信队列, 防止消息丢失。比如:用户在商城下单成功在指定时间未支付时自动失效

死信来源:消息TTL过期、队列达到最大长度、消息被拒(否定应答或拒绝应答,不自动放回队列,等后续有空再来处理)

消息TTL过期

// 生产者
import cn.soldat.utils.RabbitMqUtils
import com.rabbitmq.client.AMQP

object Producer {
    const val NORMAL_EXCHANGE = "normal_exchange" // 普通交换机名称
}

fun main() {
    val channel = RabbitMqUtils.getChannel()

    // 延迟消息 -- 死信消息 设置TTL时间 Time To Live 存活时间 10 * 1000 = 10s
    val props = AMQP.BasicProperties().builder().expiration("10000").build()
    for (index in 1 until 11){
        val message = "info $index"
        channel.basicPublish(Producer.NORMAL_EXCHANGE, "zhangsan", props, message.toByteArray())
    }
}
// 消费者 1 -- 将正常交换机与死信交换机连接
import cn.soldat.utils.RabbitMqUtils
import com.rabbitmq.client.BuiltinExchangeType
import com.rabbitmq.client.CancelCallback
import com.rabbitmq.client.DeliverCallback

object Consumer01 {
    const val NORMAL_EXCHANGE = "normal_exchange" // 普通交换机名称
    const val DEAD_EXCHANGE = "dead_exchange" // 死信交换机名称
    const val NORMAL_QUEUE = "normal_queue" // 普通队列名称
    const val DEAD_QUEUE = "dead_queue" // 死信队列名称
}

fun main() {
    val channel = RabbitMqUtils.getChannel()

    // 声明交换机 死信和普通类型为 direct
    channel.exchangeDeclare(Consumer01.NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT)
    channel.exchangeDeclare(Consumer01.DEAD_EXCHANGE, BuiltinExchangeType.DIRECT)
    // 声明队列 普通队列
    val arguments = mapOf(
        // 过期时间 -- 也可以在生产者发消息的时候设置
        // "x-message-ttl" to 10 * 1000,
        // 正常队列设置过期之后的死信交换机
        "x-dead-letter-exchange" to Consumer01.DEAD_EXCHANGE,
        // 设置死信RoutingKey
        "x-dead-letter-routing-key" to "lisi"
    )
    channel.queueDeclare(Consumer01.NORMAL_QUEUE, false, false, false, arguments)
    ///////////////////////////////////////////////////////////////
    // 声明队列 死信队列
    channel.queueDeclare(Consumer01.DEAD_QUEUE, false, false, false, null)

    // 绑定普通交换机与队列
    channel.queueBind(Consumer01.NORMAL_QUEUE, Consumer01.NORMAL_EXCHANGE, "zhangsan")
    // 绑定死信交换机与死信队列
    channel.queueBind(Consumer01.DEAD_QUEUE, Consumer01.DEAD_EXCHANGE, "lisi")
    println("等待接收消息......")

    val deliverCallback = DeliverCallback{ consumerTag,message ->
        println("Consumer01 接收的消息是: ${String(message.body)}")
    }
    channel.basicConsume(Consumer01.NORMAL_QUEUE, true, deliverCallback, CancelCallback {  })
}
// 消费者 2 -- 负责接收处理死信消息
import cn.soldat.utils.RabbitMqUtils
import com.rabbitmq.client.CancelCallback
import com.rabbitmq.client.DeliverCallback

object Consumer02 {
    const val DEAD_QUEUE = "dead_queue" // 死信队列名称
}

fun main() {
    val channel = RabbitMqUtils.getChannel()

    println("等待接收消息......")

    val deliverCallback = DeliverCallback{ consumerTag,message ->
        println("Consumer01 接收的消息是: ${String(message.body)}")
    }
    channel.basicConsume(Consumer02.DEAD_QUEUE, true, deliverCallback, CancelCallback {  })
}

队列达到最大长度: 超出的部分为死信消息

// 生产者
fun main() {
    val channel = RabbitMqUtils.getChannel()

    for (index in 1 until 11) {
        val message = "info $index"
        channel.basicPublish(Producer.NORMAL_EXCHANGE, "zhangsan", null, message.toByteArray())
    }
}
// 消费者 1
// 声明队列 普通队列
val arguments = mapOf(
    // 正常队列设置过期之后的死信交换机
    "x-dead-letter-exchange" to Consumer01.DEAD_EXCHANGE,
    // 设置死信RoutingKey
    "x-dead-letter-routing-key" to "lisi",
    // 设置正常队列的长度限制
    "x-max-length" to 6
)
channel.queueDeclare(Consumer01.NORMAL_QUEUE, false, false, false, arguments)

消息被拒绝

// 消费者 1
val deliverCallback = DeliverCallback{ _,message ->
        val msg = String(message.body)
        if (msg == "info 5"){
            println("Consumer01 接收的消息是: $msg : 此消息是被拒绝的")
            channel.basicReject(message.envelope.deliveryTag, false)
        }else{
            println("Consumer01 接收的消息是: $msg")
            channel.basicAck(message.envelope.deliveryTag, false)
        }
    }
    channel.basicConsume(Consumer01.NORMAL_QUEUE, false, deliverCallback, CancelCallback {})
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月22日 0

暂无评论

推荐阅读
f45gpqkY1FKu