RocketMQ使用
  TEZNKK3IfmPf 2024年04月26日 19 0

说明:本文介绍RocketMQ的消费模式&消息类型,RocketMQ的安装参考及简单使用,参考:RocketMQ的安装参考及简单使用

消费模式

RocketMQ与RabbitMQ最大的区别在于,RocketMQ是根据消息的Topic锁定消费者的,Topic属性设置为相同的消费者,可以看做是一个消费者集群。消息模式分为以下三种:

(1)一对一

最简单的一种方式,消息的Topic只被一个消费者消费,如下:

(生产者)

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void simpleTest(){
     
       
        rocketMQTemplate.syncSend("simple","hello rocketmq!");
    }

(消费者)

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple")
public class ConsumerListener implements RocketMQListener<String> {
     
       
    @Override
    public void onMessage(String s) {
     
       
        System.out.println("s = " + s);
    }
}

执行结果

RocketMQ使用

(2)一对多

当存在多个Topic相同的消费者时,这些消费者共同消费消息,如下:

(开启两个消费者,Topic相同)

RocketMQ使用

(生产者)

    @Test
    public void oneToMany(){
     
       
        for (int i = 0; i < 10; i++) {
     
       
            rocketMQTemplate.syncSend("simple","one to many" + i);
        }
    }

执行结果,可以看到负载均衡策略是随机;

RocketMQ使用

RocketMQ使用

(3)多对多

参考一对多方式,发送多个Topic的消息,让多种Topic的消费者接收消息;

消息类型

根据消息的类型和对消息的处理,可以分为以下几种:

(1)同步消息

同步消息,消息发送到MQ,MQ保存成功后才会返回结果,在API中是以"sync"(synchronous,同步)开头的一些方法,可以看到这些方法都有返回值,可以通过返回结果判断是否发送成功;
RocketMQ使用

(消费者)

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple")
public class ConsumerListener implements RocketMQListener<String> {
     
       
    @Override
    public void onMessage(String s) {
     
       
        System.out.println("接收到同步消息 = " + s);
    }
}

(生产者,可以通过返回结果判断发送是否成功)

	@Autowired
	private RocketMQTemplate rocketMQTemplate;
	
	@Test
	public void simpleTest1(){
     
       
	    SendResult sendResult = rocketMQTemplate.syncSend("simple", "这是一个异步消息");
	    System.out.println("sendResult.getSendStatus() = " + sendResult.getSendStatus());
	}

RocketMQ使用

(2)异步消息

异步消息,消息发送给MQ后代码就会立即向下执行,在API中是以“asyn”(asynchronous,异步),可以手动设置发送消息成功与否执行的方法;

(生产者)

    @Test
    public void simpleTest2() throws InterruptedException {
     
       
        rocketMQTemplate.asyncSend("simple", "这是一个异步消息", new SendCallback() {
     
       
            @Override
            public void onSuccess(SendResult sendResult) {
     
       
                System.out.println("成功信息" + sendResult.toString());
            }

            @Override
            public void onException(Throwable throwable) {
     
       
                System.out.println("异常信息" + throwable.getMessage());
            }
        });
        TimeUnit.SECONDS.sleep(2);
    }

(发送消息成功,执行成功的方法)

RocketMQ使用

需要注意,这里是指发送消息成功与否,与消费者是否成功消费无关;

(3)单向消息

单向消息,是指只管发送消息,不关系MQ是否成功接收,没有返回值;

    @Test
    public void simpleTest3() {
     
       
        rocketMQTemplate.sendOneWay("simple", "这是一个单向消息");
    }

(4)延迟消息

延迟消息,指给消息设置一个延迟级别,达到指定时间后,消费者才能收到这个消息,延迟级别如下:

# 延迟级别,从1开始
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

(生产者)

    @Test
    public void simpleTest4() {
     
       
        // 设置超时为1秒,延迟等级为3,即10秒
        rocketMQTemplate.syncSend("simple", MessageBuilder.withPayload("这是一个延迟消息").build(),1000,3);
    }

(消费者,10秒后才收到消息)

RocketMQ使用

延迟消息相较于RabbitMQ,使用起来更方便,但是只能设置时间等级,不能设置准确时间,非常难受;

(5)批量消息

RocketMQ可以发送一个集合,如下:

(消费者)

    @Test
    public void simpleTest5(){
     
       

        ArrayList<Message> list = new ArrayList<>();
        list.add(MessageBuilder.withPayload("aaa").build());
        list.add(MessageBuilder.withPayload("bbb").build());
        list.add(MessageBuilder.withPayload("ccc").build());

        rocketMQTemplate.syncSend("simple", list, 3000);
    }

(执行结果)

RocketMQ使用

(6)消息过滤

消息过滤,是RocketMQ较与RabbitMQ独有的功能,指对发送的消息进行过滤,指接收限定条件的消息,对消息进行限制接收。有两种方式,如下:

a. 标签过滤

在发送消息时,指定topic的同时,加上一个标签,表示只发给有这个标签的消费者;

(生产者)

    @Test
    public void simpleTest6(){
     
       
        rocketMQTemplate.syncSend("simple:tag", "Tag Message");
    }

(消费者)

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple", selectorExpression = "tag1")
public class ConsumerListener implements RocketMQListener<String> {
     
       
    @Override
    public void onMessage(String s) {
     
       
        System.out.println("接收到标签过滤消息 = " + s);
    }
}

(执行结果)

RocketMQ使用

b. SQL过滤

另一种是SQL过滤的方式,在消费者这边,写SQL语句对消息进行过滤消息;

(生产者,设置name = SQL)

    @Test
    public void simpleTest6(){
     
       
        // 标签方式
        rocketMQTemplate.syncSend("simple:tag", "Tag Message");

        // SQL语句方式
        rocketMQTemplate.syncSend("simple",
                MessageBuilder.withPayload("SQL Message")
                        .setHeader("name","SQL")
                        .build());
    }

(消费者,只接受name = SQL的消息)

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple",
 selectorType = SelectorType.SQL92, selectorExpression = "name = 'SQL'")
public class ConsumerListener implements RocketMQListener<String> {
     
       
    @Override
    public void onMessage(String s) {
     
       
        System.out.println("接收到SQL语句过滤消息 = " + s);
    }
}

(执行结果)

RocketMQ使用

(7)对象消息

RocketMQ当然也可以发送对象作为消息,该对象应该要实现Serializable接口,如下:

import java.io.Serializable;

public class User implements Serializable {
     
       

    private String username;

    private String password;

    public User() {
     
       
    }

    public User(String username, String password) {
     
       
        this.username = username;
        this.password = password;
    }

    public String getUsername() {
     
       
        return username;
    }

    public void setUsername(String username) {
     
       
        this.username = username;
    }

    public String getPassword() {
     
       
        return password;
    }

    public void setPassword(String password) {
     
       
        this.password = password;
    }

    @Override
    public String toString() {
     
       
        return "User{" +
                "username='" + username + '\'' +
                ", password='" + password + '\'' +
                '}';
    }
}

(生产者)

    @Test
    public void simpleTest7(){
     
       
        User user = new User();
        user.setUsername("zhangsan");
        user.setPassword("123456");
        rocketMQTemplate.syncSend("simple", user);
    }

(消费者)

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple")
public class ConsumerListener implements RocketMQListener<User> {
     
       
    @Override
    public void onMessage(User user) {
     
       
        System.out.println("user = " + user);
    }
}

(执行结果)

RocketMQ使用

(8)顺序消息

顺序消息,是指消息从发送到被消费,需要始终保持前后顺序。如下,发送15次消息,可以看到消费者那边的消费顺序并不是一直的;

    @Test
    public void simpleTest1() {
     
       
        for (int i = 0; i < 15; i++) {
     
       
            rocketMQTemplate.syncSend("simple", "这是一个同步消息===>" + i);
        }
    }

RocketMQ使用

顺序消息,需要保证以下两方面:

  • 所有的消息存入到MQ中的同一个队列中,因为RocketMQ默认有四个队列,消息会被负载均衡存储在这些队列里;

  • 该队列只能被一个线程消费,因为一个队列的消息在消费时会有多个线程同时进行消费;

前者可以通过,XxxOrderly()方法实现消息在队列中的顺序存储,如下:

(生产者:给对象设置一个ID,让它们按照ID顺序存储在MQ中)

    @Test
    public void simpleTest8(){
     
       
        ArrayList<User> users = new ArrayList<>();

        User user1 = new User("1","zhangsan","zs");
        User user2 = new User("2","lisi","ls");
        User user3 = new User("3","wangwu","ww");

        users.add(user1);
        users.add(user2);
        users.add(user3);


        for (User user : users) {
     
       
            rocketMQTemplate.syncSendOrderly("simple",user,user.getId());
        }
    }

后者,可以通过在消费者这边添加这个配置,保证消息被顺序消费,如下:

(消费者,设置消费模式 consumeMode = ConsumeMode.ORDERLY

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple",consumeMode = ConsumeMode.ORDERLY)
public class ConsumerListener implements RocketMQListener<User> {
     
       

    @Override
    public void onMessage(User user) {
     
       
        System.out.println(user);
    }
}

执行结果,可以看到消息时顺序进行的

RocketMQ使用

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2024年04月26日 0

暂无评论

推荐阅读
  TEZNKK3IfmPf   4天前   20   0   0 java