消息队列学习
  SxTLKyfVla02 2023年11月02日 28 0
  1. 为什么使用消息队列

解藕、异步、削峰

  • 解藕:在多服务架构中(不只是分布式),服务与服务之间的通信,通常是发起服务之间的调用来完成,如果另一端服务出现延迟,则调用方就需要等待,这个场景中就出现了严重的耦合现象。例如订单等场景;
  • 削峰:应用于高并发场景下,例如一个服务器每秒能处理的最大请求数是 1000 个,但是实际业务请求高达1w,此时就会引起服务器卡顿甚至宕机。此时引入 MQ,所有的请求都放入到 MQ,服务端直接从 MQ 拉取,固定拉取 1000 个来进行处理,一旦峰值过去,系统会快速将积压的消息处理掉。
  • 异步处理:将一些非及时相应的任务,直接派发给 MQ,主任务不关心消费者什么时候处理或者处理结果,一般在短信、邮件等非及时相应的场景中使用。
  1. 消息队列的选型

RabbitMQ kafka RocketMQ

消息传递性上

对消息的有序性 RabbitMQ和 RocketMQ可以实现有序性,他们更多的关注并发性;kafka本身就具有有序性的特点,它的消费者是单线程消费的。

  1. 消息队列实现高吞吐

将大量的数据批量推送的下游服务。

  1. RabbitMQ 的架构设计

RabbitMQ 是一个使用 Erlang 语言开发,基于 AMQP 通信协议实现的,他是一种应用程序对应用程序通信的方法,应用程序通过读写出入队列的消息来通信,而无需专用的链接来链接他们。producer 发送消息到 rabbitMQ 服务端的 exchange 交换机,交换机通过路由key 路由到队列,然后消费者从队列获取消息,开始消费。rabbitMQ 主要由以下几部分组成:

  • Queue:他是 MQ 的一个内部对象,用于存储消息,多个消费者可以订阅同一个队列,队列中的消息只会被一个订阅者消费,队列会把消息均分到不同的订阅者。因此 rabbitMQ 不支持队列层面的广播消费,如果想要实现广播消费,可以采用一个交换机绑定多个路由的方式实现。
  • Exchange:交换机,生产者发送消息到交换机,交换机将消息路由到一个或多个队列中,如果路由不到,就返回给生产者,或者直接丢弃。在 rabbitMQ 中一共有四种类型的交换机:
  1. fanuot:类似于广播,群发,是一种广播机制,接收到的消息,会分发给所有绑定的队列,每个队列都会得到全部消息;
  2. direct:他在fanuot模式上增加了routingkey,fanuot 是将消息直接存储到所有的队列中,而 direct 添加了过滤条件,交换机只会将消息存储到对应的 routing key队列中;
  3. topic:收到消息后,交换机根据 routingkey进行通配符匹配,如果匹配成功,就将消息分发到对应的队列,有多个就分发多个,没有匹配成功,则消息丢失;
  4. simple:简单模式,一个生产者、一个消费者、一个队列组成,此时可以不定义交换机,使用默认的交换机处理。
  • RoutingKey:路由 key,生产者将消息发送给交换机,一般会指定一个路由 key,用来确认这个消息的路由规则。这个路由 key 需要与交换机通过 binding 来生效;
  • Binding:通过绑定将交换机和队列关联起来,在绑定的时候一般会指定一个绑定key,这样 rabbitMQ 就可以指定如何正确的路由到队列了;
  • Channel:通信信道,几乎所有的操作都在 channel 中进行,channel 是进行消息读写的通道,客户端建立多个 channel,每个 channel 代表一个会话任务;
  • Message:消息,服务与应用之间传送的数据,由 perperties 和 body 组成,properties 可以对消息进行修饰,比如消息的优先级、延迟等高级特性,body 则是消息的内容。
  1. RabbitMQ 的事务消息

看第 7 的消息持久化

  1. RabbitMQ 死信队列、延迟队列

死信队列的定义:

  1. 消息被消费方否定确认,使用 channel.basicNack或者channel.basickReject,并且 requeue属性被设置为 false;
  2. 消息队列的存活时间超过了设置的 TTL;
  3. 消息队列的消息数量已经超过最大队列的长度

死信消息会被 MQ 特殊处理,如果配置了死信队列,那么就会转移到死信队列中,如果没有,则直接丢弃。

延迟队列:将消息设置了最大存活时间,如果一条消息在存活时间内没有被消费,就会称为死信消息。

  1. RabbitMQ如何确保消息不丢失

首先说一下rabbitMQ的整体工作流程:

  • 生产者发送消息到RabbitMQ服务端;
  • RabbitMQ接收到消息,消息先是到达了exchange交换机,然后路由到对应的队列,同时将消息存入内存中,并将消息发送到消费端;
  • 消费者接收到消息,开始消费消息

在以上三个步骤中,每一个环节都可能导致消息的丢失,例如生产者发送消息,因为网络问题没有发送出去,或者生产者宕机;MQ服务端接收到消息以后,还没有来得及消费,MQ挂了;消费者接收到消息,还没有来得及消费,消费者挂了,这都会导致消息丢失,消息丢失不可怕,可怕的是消息丢失了都还不知道,因此为了确保消息的安全性,主要从以下几方面考虑:

  • 生产端消息可靠
  1. 使用RabbitMQ的事务消息机制

生产者在发送消息之前开启事务channel.txSelect,然后发送消息,如果消息没有成功发送到服务端,则生产者抛出异常,此时可以做回滚事务的处理channel.txRollback,然后重发消息或者消息入库处理,如果服务端接收到了消息,就提交事务channel.txCommit,但是由于RabbitMQ的事务机制是一个同步处理的过程,事务会造成阻塞,这就抵消了MQ带来的异步或解耦的功能,严重降低性能,因此一般不采用此种方式处理。

  1. confirm消息确认机制

生产者通过channel.confirmSelect()开启comfirm消息确认模式,每次发送的消息都会分配一个唯一的消息ID,生产者将消息发送到MQ服务端后,服务端发送一个ack确认消息给生产者,此时生产者就可以知道消息已经发送成功,否则会调用nack接口,告知消息失败,此时可以做消息重试或者消息入库处理。

  • 服务端消息持久化

由于服务端的消息是存储在内存中的,此时一旦出现服务端宕机,则消息就会丢失,此时必须开启RabbitMQ的消息持久化,这样即便是服务挂了,那么在服务恢复之后也会自动的读取到磁盘上的日志进行恢复。由于消息是先到达exhcange交换机,然后将消息路由到队列,最终将消息发送给消费者,因此这里需要将exchange、queue、message都持久化:

  1. 通过channel.exchangeDeclare来设置交换机持久化;
  2. 设置channel.queueDeclare设置队列持久化,也可以在创建队列的时候将某一个指定的队列持久化,但是单独设置某一个队列,他只会持久化元数据,并不会持久化队列的数据。
  3. 设置channel.basicPublish持久化message

通过以上三种方式设置,就可以完成服务端的消息持久化,即使服务端挂了,再重启后也会自行恢复,但是存在一种极端情况,就是服务端收到了消息还没有来得及持久化,此时宕机了,导致生产者无法接收到ack,此时结合生产者的comfirm机制,将消息入库,生产者通过消息重试或者定时任务进行补偿。

  • 消费端消息确保能正常消费

前面两道关口都能正常通过,最后一步失败,例如服务端到生产者网络故障、消费者挂了、消费者消费中宕机,这三种情况都会引起消费失败,而服务端消息发送出来了,此时就会造成消息丢失。由于RabbitMQ的自动ack机制,即默认消息发出后就立刻将这条消息删除,而不管消费端是否正常消费,就会导致消费端消费失败,而服务端也没有了这条消息,此时我们只需要改自动ack机制为手动ack即可,服务端没有收到消费者的确认信号,此时RabbitMQ就会将消息重新放入队列头部,等待下一个消费者消费或者待消费者恢复以后重新消费

  1. 如何保证消息不被重复消费

重复是必然发生的,在下游服务进行幂等操作

幂等:消息带有可以幂等的信息,下游服务器在处理的时候可以判断此消息有么有被处理过。

  1. 如何处理消息积压问题

1.增加消费能力 下游消费能力不够导致

2.增加分治负载能力 增加分区

  1. redis + mq + mysql的架构可行性

确保最终一致性即可。

  1. 延迟队列与定时消息

延迟:time后可见

定时:time内合建

  1. 可靠投递与事务消息

producer 可靠投递,是自己的一条消息能不能投递成功

producer 事务消息 N的操作事务性

kafka事务消息:提交前消息放入是放入一个queue文件中的,只不过消费不可见,通过提交commit-warkmark

rocket事务消息:提交前消息时放入一个事务队列,消费者不可见,通过commit转移数据到目标队列

差异:kafka就是一个追求自己投递的事务;rocketmq 他有一个回查机制

  1. rocketMQ 如何保证消息不丢失
  • 生产者:对于生产者来讲,只需要确认消息能够被正常发送即可,主要可以通过三种方式来确保消息发送成功:
  1. 同步发送,加上失败重试机制,可能 broker 存储失败,可以通过查询确认
  2. 异步发送消息,通过回调方法来确认发送结果
  3. 通过ack机制,存储成功后返回ack,通过ack判断是否成功接收消息
  • broker:同步刷盘,集群模式下采用同步复制、等待slave复制完成才返回结果
  • 消费者:手动提交offset,消息消费的幂等
  1. rocketMQ 事务消息原理

rocketMQ 事务消息依赖于一个 TransactionListener 接口,producer 需要实现此接口,其中有两个方法:

  • executeLocalTransaction: 在发送消息后调用,用于执行本地事务,如果本地事务提交成功,rocketMQ 再提交消息
  • checkLocalTransaction:对本地事务进行检查,rocketMQ 依赖此方法做补偿

同时通过两个内部的topic来实现对消息二阶段提交的支持:

  • prepare: 将带有事务标识的消息投递到一个名为RMS_SYS_TRANS_HALF_TOPIC的 topic 中,而不是真正的 topic;
  • commit/rollback:producer 执行本地事务,当 producer 的本地事务处理成功或者失败后,producer 向 broker 发送 commit 或者 rollback 命令,如果是 commit,则 broker 将投递到半 topic 中的消息发送到真正的 topic,然后投递一个标识删除的消息到RMS_SYS_TRANS_OP_HALF_TOPIC,标识当前事务执行完成;如果是 rollback,则没有投递到真实topic 的操作,只投递标识删除的消息。最后消费者消费事务消息。
  1. rocketMQ 如何实现顺序消息

默认情况下是无法保证的,需要程序确保发送和消费都是在同一个 queue,在多线程消费的情况下,也是无法保证消息的顺序性的。

  • 发送顺序:发送端自己保证顺序,可以在消息体上设置消息的顺序,发送到一个固定的 queue,生产者通过实现 messageQueueSelecter接口,选择一个 queue 发送,也可以使用 rocketMQ 提供的默认实现来发送消息:
  1. selectmessageQueueByhash:按参数的 hashcode 与可选队列进行取模选择
  2. selectMessageQueueByrandom:随机选择
  • 服务端的 queue 本来就是有序的,只需要保证一个队列在同一个时间只有一个消费者进行消费
  • 消费端:
  1. pull 模式:消费者自己维护拉取的 queue,一次拉取的 queue 都是有序的,消费者自己保证消费顺序;
  2. push 模式:消费者自己实现 MQPushConsumer 接口提供的注册消息监听方法 registerMessageListener,他里面包含了两个方法,并行消费和串行消费,串行消费,consumer 会把消息放入本地队列并加锁,定时任务保证锁的同步
  • kafka如何保证消息不丢失

kafka 是一个用来实现异步消息通信的中间件,他的架构由producer、consumer、brocker组成,因此这个问题也主要基于这三个方面来回答。

  • producer 需要确保消息能够到达brocker,并且实现消息的存储,在这个层面有可能网络问题导致消息发送失败,producer默认是异步发送消息的,那么这里有三种方式:

1. 可以把异步发送改为同步发送,这样producer就能实时直到消息发送的结果;

2. 另外一种是添加异步回调函数来监听消息发送的结果,如果发送失败,就在回调中重试;

3. producer本身提供了一个重试参数retries,因为网络问题或其他故障导致发送失败,producer会自动重试。

  • 只要确保producer的消息不会丢失,那么broker端只要将接受到的消息持久化到磁盘即可,但是kafka为了提供性能,采用了异步批量刷盘的机制,即按照一定的消息量和时间间隔去刷盘,而最终刷新到磁盘的这个动作,是由操作系统来调度的,所以这个时候受外部系统影响非常大,例如操作系统奔溃、断电等。而kafka并没有提供同步刷盘的机制,解决这个问题需要通过partition的副本机制和acks机制来解决。
  • 最后一步 只需要确保consumer能够正常的消费这个消息,正常情况下,只需要确保前面两步没有出现消息丢失的情况,消费者是不会出现消息丢失的情况,除非consumer没有消费完这个消息,就提交了offset,这个问题我们也可以重新调整offset来解决这个问题。
  1. kafka的零拷贝原理

在实际应用中,如果我们需要吧磁盘中的某个文件发送的远程服务器上,他必须要经过几个拷贝过程:

  • 从磁盘中读取目标文件内容copy到内核缓冲区
  • 从内核缓冲区copy到用户空间的缓冲区
  • 在应用程序中调用write方法,把用户空间缓冲区中的数据copy到内核空间的socketBuffer中
  • 把socketBuffer中的数据,赋值到网卡缓冲区NIC Buffer,最后网卡发送数据

在这个过程中,发送一个数据到远程服务器要经历4次拷贝,而在这4次拷贝中,有两次是多余的,就是从内核态copy到用户态,然后再从用户态拷贝的内核态。除此之外,空间的切换也会导致CPU的上下文切换,对CPU的性能也有有一定的浪费,而所谓的零拷贝,就是把这两次多余的操作给忽略掉,应用程序可以直接把磁盘中的数据从内核中直接传输到socket,而不需要再次经过应用程序所在的用户空间,零拷贝通过DMA技术(Direct Memory Access)把文件内容直接复制到内核空间中的ReadBuffer,接着把包含文件长度和位置信息的文件描述符加载到socketBuffer中,DMA引擎就可以直接把数组从内核空间传递到网卡中,在这个过程中,数据只经历了两次拷贝就把数据发送到网卡中,并且减少了两次CPU上线文的切换,对于效率有非常大的提高,因此所谓的零拷贝并不是完全没有数据的拷贝,只是相当于用户空间来说不需要在进行数据拷贝的过程,他只是减少了不必要的拷贝次数而已。

在linux系统中,零拷贝基于其底层的sendFile方法去实现,而在Java中,fileChannel.transeferTo方法的底层实现就是sendFile方法。除此之外,还有一个叫MMAP的文件映射机制,他的原理是把磁盘文件映射到内存,用户通过修改内存就可以修改磁盘文件,使用这种方式可以获得很大的IO提升,省去了用户空间到内核空间的复制开销。

  1. Kafka如何避免重复消费的问题

Kafka broker上存储的消息,都有一个offset的标记,Kafka的消费者是通过offset这个标记来维护当前已经消费的数据,消费者每消费一批数据,Kafka broker就会更新offset来避免重复消费的问题,默认情况下,消息消费完成后会自动提交一个offset,在Kafka消费端提交offset的逻辑里面,有一个默认5秒的一个间隔,因此每一次提交offset都会存在5秒的间隔,因此在consumer消费的过程中,应用程序被强制的kill掉或者宕机,可能会导致这部分offset没有提交,从而导致重复消费的问题,除此之外还有一个原因也会导致重复消费的问题,在Kafka里面有一个叫partition balance的机制,就是把partition均衡的分配给多个消费者,consumer端会从分配的partition里面去消费消息,如果consumer在默认的5分钟以内没有办法处理这一批消息,就会触发Kafka的rebalance机制,从而导致offset自动提交失败,而在重新balance之后,consume端还是会从之前的offset位置去消费,从而导致重复消费的问题,基于此,有几种解决办法:

  • 提高消费端的处理性能,避免触发rebalance,如通过异步的方式来处理消息,缩短消息的处理时长,调整消息超时时间、减少一次性获取数据的条数;
  • 针对每条消息生成MD5存在redis,在处理消息的时候都检查MD5,如果有就不处理了。


【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
SxTLKyfVla02
作者其他文章 更多

2023-11-02