rocketMQ消息队列简介及其实例
  ijmlJDf00eQp 2023年11月01日 53 0

 RocketMQ优点:

单机吞吐量:十万级

可用性:非常高,分布式架构

消息可靠性:经过参数优化配置,消息可以做到0丢失

功能支持:MQ功能较为完善,还是分布式的,扩展性好

支持10亿级别的消息堆积,不会因为堆积导致性能下降

缺点:兼容性差点

一、RocketMQ 核心的四大组件:

Producer:就是消息生产者,可以集群部署。它会先和 NameServer 集群中的随机一台建立长连接,得知当前要发送的 Topic 存在哪台 Broker Master上,然后再与其建立长连接,支持多种负载平衡模式发送消息。

Consumer:消息消费者,也可以集群部署。它也会先和 NameServer 集群中的随机一台建立长连接,得知当前要消息的 Topic 存在哪台 Broker Master、Slave上,然后它们建立长连接,支持集群消费和广播消费消息。

Broker:主要负责消息的存储、查询消费,支持主从部署,一个 Master 可以对应多个 Slave,Master 支持读写,Slave 只支持读。Broker 会向集群中的每一台 NameServer 注册自己的路由信息。

做集群的时候只需要保证配置文件中nameserver的地址指向相同;且brokerid=0为master,>0为slave即可;

NameServer:类似Zookeeper,是一个很简单的 Topic 路由注册中心,支持 Broker 的动态注册和发现,保存 Topic 和 Borker 之间的关系。通常也是集群部署,但是各 NameServer 之间不会互相通信, 各 NameServer 都有完整的路由信息,即无状态。

二、rocketmq基本工作流程:

1、先启动 NameServer 集群,各 NameServer 之间无任何数据交互,Broker在启动的时候会注册自己配置的Topic信息到NameServer集群的每一台机器中。即每一个NameServer均有该broker的Topic路由配置信息,并向所有 NameServer 定期(每 30s)发送心跳包,包括:IP、Port、TopicInfo;NameServer 也会定期扫描 Broker 存活列表,如果超过 120s 没有心跳则移除此 Broker 相关信息,代表下线。

2、这样每个 NameServer 就知道集群所有 Broker 的相关信息,此时 Producer 上线会根据配置文件中的NameServer 地址自动连接一个NameServer ;每 30s 会从连接的 NameServer 获取 Topic 和 Broker 的映射关系存在本地内存中,从 NameServer 就可以得知它要发送的某 Topic 消息在哪个 Broker 上,和对应的 Broker (Master 角色的)建立长连接,发送消息。

3、Consumer 上线也可以从 NameServer 得知它所要接收的 Topic 是哪个 Broker ,和对应的 Master、Slave 建立连接,接收消息。

可以理解为如下:

name server:注册中心

broker:消息处理

procucer:生成消息

consumer:消费消息

每个组件都可以部署成集群模式进行水平扩展。
消息由topic区分消息类型(一级分类):如订单消息,物流消息等
tag为二级分类
message queue为消息类型下的消息队列。
用于并行发送和接受消息。

四、基础
分布式事务:
对于分布式事务,通俗地说就是,一次操作由若干分支操作组成,这些分支操作分属不同应用,分布在不同服务器上。分布式事务需要保证这些分支操作要么全部成功,要么全部失败。分布式事务与普通事务一样,就是为了保证操作结果的一致性。

事务消息:
RocketMQ提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务解决方案,一种分布式事务处理模式。

半事务消息:
暂不能投递的消息,发送方已经成功地将消息发送到了Broker,但Broker未收到最终确认指令,此时该消息被标记成“暂不能投递”状态,即不能被消费者看到。处于该种状态下的消息即半事务消息。

本地事务状态:
Producer回调操作执行的结果为本地事务状态,其会发送给TC,而TC会再发送给TM。TM会根据TC发送来的本地事务状态来决定全局事务确认令。

// 描述本地事务执行状态 public enum LocalTransactionState {
COMMIT_MESSAGE, // 本地事务执行成功
ROLLBACK_MESSAGE, // 本地事务执行失败
UNKNOW, // 不确定,表示需要进行回查以确定本地事务的执行结果
}

RocketMQ中的消息回查设置:
关于消息回查,有三个常见的属性设置。它们都在broker加载的配置文件中设置,例如:

transactionTimeout=20,指定TM在 20 秒内应将最终确认状态发送给TC,否则引发消息回查。默认为 60 秒
transactionCheckMax=5,指定最多回查 5 次,超过后将丢弃消息并记录错误日志。默认 15 次。
transactionCheckInterval=10,指定设置的多次消息回查的时间间隔为 10 秒。默认为 60 秒。
五、Topic与Broker的关系:

  • Borker中有一个或多个Topic
  • Topic中有一个或多个MessageQueue

Topic可以自动创建和手动创建;

1、手动创建也叫预先创建,就是在使用Topic之前就创建,可以通过命令行或者通过RocketMQ的管理界面(可视化控制台)创建Topic。

方法:DefaultMQProducer producer = rocketMQTemplate.getProducer();

producer.createTopic(String key, String newTopic, int queueNum, int topicSysFlag)

key:这个参数是系统已经存在的一个topic的名称,新建的topic会跟它在相同的broker上创建;key或者是broker的名称也行
newTopic:新建的topic的名称
queueNum:指定topic中queue的数量
topicSysFlag:topic的标记位设置,没有特殊要求就填0就可以了。可选值在TopicSysFlag中定义

根据源码可以分析出大致创建分为如下几步:

第1步,根据提供的key代表的topic去获取该topic所在的broker的路由,如果想在所有broker创建,一般使用DefaultTopic,因为这个topic是在所有broker上都存在的。
第2步,轮询所有的broker,在master上创建topic,中间有一个broker失败,则中止创建,返回失败。因为master和slave的配置数据也会自动同步,所以只需要在master上创建。
第3,4步,设置参数
第5步,调用MQClientAPIImpl接口创建,失败会重试4次。

2、自动创建就是在broker.conf中设置了autoCreateTopicEnable =true,都在设置false;

TBW102 是啥用的?

TBW102是Broker启动时,当autoCreateTopicEnable的配置为true时,会自动创建该默认(TBW102)topic。

就是一个接受自动创建topic的 Broker上的topic, 启动会把这个默认Topic(主题)的Broker登记到 NameServer,这样当 Producer 发送新 Topic 的消息时候会根据"TBW102 "这个topic得知哪个 Broker 可以自动创建主题,然后发往那个 Broker。

而 Broker 接受到这个消息的时候发现没找到对应的主题,但是它接受创建新主题,这样就会创建对应的 Topic 路由信息。

假设此时发送方还在连续快速的发送消息,那 NameServer 上其实还没有关于这个 Topic 的路由信息,所以有机会让别的允许自动创建的 Broker 也创建对应的 Topic 路由信息,这样集群里的 Broker 就能接受这个 Topic 的信息,达到负载均衡的目的,但也有个别 Broker 可能,没收到。

如果发送方这一次发了之后 30s 内一个都不发,之前的那个 Broker 随着心跳把这个路由信息更新到 NameServer 了,那么之后发送该 Topic 消息的 Producer 从 NameServer 只能得知该 Topic 消息只能发往之前的那台 Broker ,这就不均衡了,如果这个新主题消息很多,那台 Broker 负载就很高了。

所以不建议线上开启允许自动创建主题,即 autoCreateTopicEnable 参数。

Tags的使用

tag(标签): 标签可以被认为是对topic的进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息。区分相同topic下不同种类的消息。生产到哪个topic的哪个tag下,消费者也是从topic的哪个tag进行消费,即实现消息的过滤。

建议一个应用一个 Topic,利用 tages 来标记不同业务,因为 tages 设置比较灵活,且一个应用一个 Topic 很清晰,能直观的辨别。

Keys的使用

如果有消息业务上的唯一标识,请填写到 keys 字段中,方便日后的定位查找。

queue(队列): queue是消息的物理管理单位,而topic是逻辑管理单位。一个topic下可以有多个queue,默认自动创建是4个,手动创建是8个

 

六、下面以windows服务器为例演示使用rocketmq如下:

1、下载rocketmq的安装包:https://rocketmq.apache.org/zh/download

2、下载rocketmq仪表盘(也就是可视化操作界面,是一个完整的java项目可以用idea运行)

3、修改conf/broker.conf配置在末尾添加如下配置(IP使用自己的),并保存。

brokerIP1=192.168.31.199

namesrvAddr=192.168.31.199:9876

4、配置ROCKET_HOME环境变量,路径使用下载路径;path中配置%ROCKET_HOME%\bin即可

5、启动Namesrv

在rocketmq文件的bin目录下,进入cmd使用如下命令:start mqnamesrv.cmd

关闭mqshutdown namesrv

6、启动Broker:start mqbroker.cmd -n 127.0.0.1:9876  autoCreateTopicEnable=true  (也就是说,producer使用RocketMQTemplate发送的消息,就算Booker上的topic之前不存在,rocket也会帮我们创建好)

关闭mqshutdown broker

7、将仪表盘项目导入idea,然后打开application.properties文件修改rocket.config.namesrvAddr=localhost:9876;

8、启动仪表盘项目:浏览器输入http://localhost:8080/#/即可看到可视化界面;

9、java代码创建生产者和消费者:

创建普通springboot项目,添加依赖

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.1</version>
</dependency>

10、修改配置文件

# 应用名称
spring:
application:
name: rocket-producer
# 应用服务 WEB 访问端口
server:
port: 8002
rocketmq:
name-server: localhost:9876
producer:
group: my-group

11、创建测试代码

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class SendMessage {
@Resource
private RocketMQTemplate rocketMQTemplate;


@Scheduled(fixedRate = 5000)
public void run(){
//发送消息

DefaultMQProducer producer = rocketMQTemplate.getProducer();
// key值为brokerName手动创建topic
//producer.createTopic("USER-20210820NG","99999",4);
// key值为已经存在的topic,根据此topic寻找到对应的broker在其上创建新的topic
producer.createTopic("1414","6666",4);
rocketMQTemplate.convertAndSend("6666", "Hello, World!");

}
}

12、创建消费者项目(同上)

消费端测试代码:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
class MyConsumer1 implements RocketMQListener<String> {

/**
*需要注意的是,onMessage()封装了ACK机制,消费者往外抛异常时,RocketMQ认为消费失败,重新发送该条消息,否则默认消费成功
*/

@Override
public void onMessage(String s) {
System.out.println(s);
}
}

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
  2Vtxr3XfwhHq   2024年05月17日   55   0   0 Java
  Tnh5bgG19sRf   2024年05月20日   110   0   0 Java
  8s1LUHPryisj   2024年05月17日   46   0   0 Java
  aRSRdgycpgWt   2024年05月17日   47   0   0 Java
ijmlJDf00eQp