RabbitMQ -- 发布确认高级功能
  f45gpqkY1FKu 2023年11月24日 19 0

RabbitMQ服务器因不明原因重启,导致生产者消息投递失败,如何进行可靠投递?

生产者发消息到交换机,交换机/队列不存在或者无法接受消息,要对缓存进行处理

对发送的消息进行备份,用定时任务对未成功的消息进行重新投递

1)SpringBoot版本

配置文件:application.properties

spring.rabbitmq.publisher-confirm-type=correlated

添加配置类

import org.springframework.amqp.core.Binding
import org.springframework.amqp.core.BindingBuilder
import org.springframework.amqp.core.DirectExchange
import org.springframework.amqp.core.Queue
import org.springframework.amqp.core.QueueBuilder
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class ConfirmConfig {
    companion object{
        const val CONFIRM_EXCHANGE_NAME = "confirm_exchange"     // 交换机
        const val CONFIRM_QUEUE_NAME = "confirm_queue"           // 队列
        const val CONFIRM_ROUTING_KEY = "key1"                   // RoutingKey
    }

    // 交换机声明
    @Bean("confirmExchange")
    fun confirmExchange(): DirectExchange = DirectExchange(CONFIRM_EXCHANGE_NAME)

    @Bean("confirmQueue")
    fun confirmQueue(): Queue = QueueBuilder.durable(CONFIRM_QUEUE_NAME).build()

    @Bean
    fun queueBindingExchange(@Qualifier("confirmQueue") queue: Queue, @Qualifier("confirmExchange") exchange: DirectExchange): Binding{
        return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY);
    }
}

生产者

@RestController
@RequestMapping("confirm")
class ProducerController {

    @Autowired
    private lateinit var rabbitTemplate: RabbitTemplate

    @GetMapping("sendMsg/{message}")
    fun sendMessage(@PathVariable message: String){

        val correlationData = CorrelationData("1")

        rabbitTemplate.convertAndSend(
            ConfirmConfig.CONFIRM_EXCHANGE_NAME,
            ConfirmConfig.CONFIRM_ROUTING_KEY,
            message, correlationData
        )
        println("发消息内容为:$message")
    }
}

消费者

import cn.soldat.springbootrabbitmq.config.ConfirmConfig
import org.springframework.amqp.core.Message
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class Consumer {

    @RabbitListener(queues = [ConfirmConfig.CONFIRM_QUEUE_NAME])
    fun receiveConfirmMessage(message: Message){
        println("接受到的队列confirm.queue消息:${String(message.body)}")
    }
}
回退消息

在仅开启了生产者确认机制情况下,交换机接收到消息后,会直接给生产者发送确认消息,如果发现该消息不可路由,则会丢弃消息,不会通知生产者。通过设置mandatory参数,可以在当消息传递过程中不可达目的地时将消息返回给生产者。

配置文件

spring.rabbitmq.publisher-returns=true

回调接口

@Component
class MyCallBack: RabbitTemplate.ReturnsCallback {

    @Autowired
    private lateinit var rabbitTemplate: RabbitTemplate
    // 注入到 RabbitTemplate

    @PostConstruct
    fun init() {
        rabbitTemplate.setReturnsCallback(this)
    }

    override fun returnedMessage(returned: ReturnedMessage) {
        println("消息:${returned.message} 被交换机:${returned.exchange} 退回,退回原因:${returned} 路由Key:${returned.routingKey}")
    }
}
备份交换机

无法投递的消息将发送给备份交换机,可以设置报警队列,用独立的消费者进行监测和报警。

备份交换机的优先级高于消息回退

配置类添加备份交换机、备份队列、报警队列

@Configuration
class ConfirmConfig {
    companion object{
        const val CONFIRM_EXCHANGE_NAME = "confirm_exchange"     // 交换机
        const val CONFIRM_QUEUE_NAME = "confirm_queue"           // 队列
        const val CONFIRM_ROUTING_KEY = "key1"                   // RoutingKey

        const val BACKUP_EXCHANGE = "backup_exchange"   // 备份交换机
        const val BACKUP_QUEUE_NAME = "backup_queue"    // 备份队列
        const val WARNING_QUEUE_NAME = "warning_queue"  // 报警队列

    }

    // 交换机声明
    @Bean("confirmExchange")
    fun confirmExchange(): DirectExchange{
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).withArgument("alternate-exchange", BACKUP_EXCHANGE).build()
    }

    @Bean("confirmQueue")
    fun confirmQueue(): Queue = QueueBuilder.durable(CONFIRM_QUEUE_NAME).build()

    @Bean
    fun queueBindingExchange(@Qualifier("confirmQueue") queue: Queue, @Qualifier("confirmExchange") exchange: DirectExchange): Binding{
        return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY);
    }

    @Bean("backupExchange")
    fun backupExchange(): FanoutExchange = FanoutExchange(BACKUP_EXCHANGE)

    @Bean("backupQueue")
    fun backupQueue(): Queue = Queue(BACKUP_QUEUE_NAME)

    @Bean("warningQueue")
    fun warningQueue(): Queue = Queue(WARNING_QUEUE_NAME)

    @Bean
    fun backupQueueBindingBackupExchange(@Qualifier("backupQueue") queue: Queue, @Qualifier("backupExchange") exchange: FanoutExchange): Binding{
        return BindingBuilder.bind(queue).to(exchange)
    }

    @Bean
    fun warningQueueBindingBackupExchange(@Qualifier("warningQueue") queue: Queue, @Qualifier("backupExchange") exchange: FanoutExchange): Binding{
        return BindingBuilder.bind(queue).to(exchange)
    }
}

消费者

@Component
class WarningConsumer{
    @RabbitListener(queues = [ConfirmConfig.WARNING_QUEUE_NAME])
    fun receiveConfirmMessage(message: Message){
        val msg = String(message.body)
        println("报警发现不可路由消息:$msg")
    }
}
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月24日 0

暂无评论

推荐阅读
f45gpqkY1FKu