我们在上一章安装、配置启动的基础上进行RocketMQ的使用。
一、添加依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.example</groupId>
<artifactId>cloud-demo</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>cloud-rocketmq</artifactId>
<packaging>jar</packaging>
<name>cloud-rocketmq</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
</dependencies>
</project>
二、修改application.yml配置
server:
port: 1004
spring:
application:
# 服务名
name: cloud-rocketmq
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: demo-producer-group
三、创建一个消息生产者
package org.example.rocketmq.service;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* @author qx
* @date 2023/9/28
* @des 消息生产者
*/
@Service
public class MessageProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
public void send() {
rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello World!I'm from spring message").build());
}
}
四、创建一个消息监听接收者
package org.example.rocketmq.service;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
* @author qx
* @date 2023/9/28
* @des 消息监听器
*/
@Service
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer-test-topic-1")
public class MessageListener implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("接收到数据:" + s);
}
}
五、创建控制器测试
package org.example.rocketmq.controller;
import org.example.rocketmq.service.MessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author qx
* @date 2023/9/28
* @des 控制器
*/
@RestController
@RequestMapping("/rocketmq")
public class RocketMqController {
@Autowired
private MessageProducer messageProducer;
@RequestMapping("/message")
public void message() {
messageProducer.send();
}
}
我们启动RocketMQ和运行当前的程序。在浏览器输入
http://localhost:1004/rocketmq/message
我们可以在控制台上看到接收到了消息。