SpringBoot集成MQTT入门
  19qMgiCiiRfc 2023年11月02日 64 0

一、新建消息发布者服务mqtt_publish

SpringBoot集成MQTT入门_EMQX

1.项目的pom文件如下所示:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.13</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>mqtt_publish</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>mqtt_publish</name>
    <description>mqtt_publish</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2.在application.properties添加mqtt的相关配置

#端口
server.port=8091

#MQTT-服务端地址
publish.mqtt.host=tcp://localhost:1883
#MQTT-服务端用户名
publish.mqtt.username=admin
#MQTT-服务端密码
publish.mqtt.password=123456
#MQTT-是否清理session
publish.mqtt.cleansession=false
#MQTT-当前客户端的唯一标识
publish.mqtt.clientid=mqtt_publish
#当前客户端的主题
publish.mqtt.defaultTopic=topic
#发送超时时间
publish.mqtt.timeout=1000
#心跳时间
publish.mqtt.keepalive=10
#连接超时时间
publish.mqtt.connectionTimeout=3000

3.创建MQTT服务端连接工具类,用于加载配置参数

package com.example.mqtt_publish.config;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * @author qx
 * @date 2023/07/18
 * @desc MQTT配置类
 */
@Configuration
@ConfigurationProperties(prefix = "publish.mqtt")
@Getter
@Setter
public class MQTTConfig {

    /**
     * 服务端地址
     */
    private String host;
    /**
     * 客户端的唯一标识
     */
    private String clientid;
    /**
     * 服务端用户名
     */
    private String username;
    /**
     * 服务端密码
     */
    private String password;
    /**
     * 是否清理session
     */
    private boolean cleansession;
    /**
     * 客户端的主题
     */
    private String defaultTopic;
    /**
     * 超时时间
     */
    private int timeout;
    /**
     * 心跳时间
     */
    private int keepalive;
    /**
     * 超时时间
     */
    private int connectionTimeout;

}

4.建立MQTT服务端连接类MqttConnect

package com.example.mqtt_publish.util;

import com.example.mqtt_publish.config.MQTTConfig;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author qx
 * @date 2023/07/18
 * @desc MQTT服务端连接类
 */
@Component
public class MqttConnect {

    @Autowired
    private MQTTConfig config;

    public MqttConnect(MQTTConfig config) {
        this.config = config;
    }

    public MqttConnectOptions getOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(config.isCleansession());
        options.setUserName(config.getUsername());
        options.setPassword(config.getPassword().toCharArray());
        options.setConnectionTimeout(config.getConnectionTimeout());
        //设置心跳
        options.setKeepAliveInterval(config.getKeepalive());
        return options;
    }

    public MqttConnectOptions getOptions(MqttConnectOptions options) {

        options.setCleanSession(options.isCleanSession());
        options.setUserName(options.getUserName());
        options.setPassword(options.getPassword());
        options.setConnectionTimeout(options.getConnectionTimeout());
        options.setKeepAliveInterval(options.getKeepAliveInterval());
        return options;
    }

}

5.编写消息接收回调类PushCallback

package com.example.mqtt_publish.service;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.nio.charset.StandardCharsets;

/**
 * @author qx
 * @date 2023/07/18
 * @desc 消息接收回调类
 */
@Slf4j
public class PushCallback implements MqttCallback {

    private final MQTTServer mqttServer;

    public PushCallback(MQTTServer mqttServer) {
        this.mqttServer = mqttServer;
    }


    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        log.info("---------------------连接断开,可以做重连");
        mqttServer.subsribeConnect();

        while (true) {
            try {
                //如果没有发生异常说明连接成功,如果发生异常,则死循环
                Thread.sleep(1000);
                break;
            } catch (Exception e) {
                continue;
            }
        }

    }

    /**
     * 发送消息,消息到达后处理方法
     *
     * @param token
     */
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }

    /**
     * 接收所订阅的主题的消息并处理
     *
     * @param topic   主题
     * @param message 消息
     */
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息会执行到这里面
        String result = new String(message.getPayload(), StandardCharsets.UTF_8);
        System.out.println("接收消息主题 : " + topic);
        System.out.println("接收消息Qos : " + message.getQos());
        System.out.println("接收消息内容 : " + result);
        //这里可以针对收到的消息做处理,比如持久化
    }

}

6.建立消息发送类MQTTServer类,实现主题消息的发送,以及订阅主题、取消订阅主题

package com.example.mqtt_publish.service;

import com.example.mqtt_publish.config.MQTTConfig;
import com.example.mqtt_publish.util.MqttConnect;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @author qx
 * @date 2023/07/18
 * @desc MQTT消息发送
 */
@Service
@Slf4j
public class MQTTServer {

    /* 订阅者客户端对象 */
    private MqttClient subsribeClient;

    /**
     * 发布者客户端对象
     * 这里订阅者和发布者的MqttClient对象分别命名是为了让发布者和订阅者分开,
     * 如果订阅者和发布者都用一个MqttClient链接对象,则会出现两方都订阅了某个主题后,
     * 谁发送了消息,都会自己接收到自己发的消息,所以分开写,里面主要就是回调类的设置setCallback
     * */
    private MqttClient publishClient;

    /* 主题对象 */
    public MqttTopic topic;

    /* 消息内容对象 */
    public MqttMessage message;

    @Autowired
    private MqttConnect mqttConnect;

    @Autowired
    private MQTTConfig config;

    public MQTTServer() {
        log.info("8091上线了");
    }

    /**
     * 发布者客户端和服务端建立连接
     */
    public MqttClient publishConnect() {
        //防止重复创建MQTTClient实例
        try {
            if (publishClient==null) {
                //先让客户端和服务器建立连接,MemoryPersistence设置clientid的保存形式,默认为以内存保存
                publishClient = new MqttClient(config.getHost(), config.getClientid(), new MemoryPersistence());
                //发布消息不需要回调连接
                //client.setCallback(new PushCallback());
            }

            MqttConnectOptions options = mqttConnect.getOptions();
            //判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的
            if (!publishClient.isConnected()) {
                publishClient.connect(options);
                log.info("---------------------连接成功");
            }else {//这里的逻辑是如果连接成功就重新连接
                publishClient.disconnect();
                publishClient.connect(mqttConnect.getOptions(options));
                log.info("---------------------连接成功");
            }
        } catch (MqttException e) {
            log.info(e.toString());
        }
        return publishClient;
    }

    /**
     * 订阅端的链接方法,关键是回调类的设置,要对订阅的主题消息进行处理
     * 断线重连方法,如果是持久订阅,重连时不需要再次订阅
     * 如果是非持久订阅,重连是需要重新订阅主题 取决于options.setCleanSession(true);
     * true为非持久订阅
     */
    public void subsribeConnect() {
        try {
            //防止重复创建MQTTClient实例
            if (subsribeClient==null) {
                //clientId不能和其它的clientId一样,否则会出现频繁断开连接和重连的问题
                subsribeClient = new MqttClient(config.getHost(), config.getClientid(), new MemoryPersistence());// MemoryPersistence设置clientid的保存形式,默认为以内存保存
                //如果是订阅者则添加回调类,发布不需要,PushCallback类在后面,继续往下看
                subsribeClient.setCallback(new PushCallback(MQTTServer.this));
            }
            MqttConnectOptions options = mqttConnect.getOptions();
            //判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的
            if (!subsribeClient.isConnected()) {
                subsribeClient.connect(options);
            }else {//这里的逻辑是如果连接成功就重新连接
                subsribeClient.disconnect();
                subsribeClient.connect(mqttConnect.getOptions(options));
            }
            log.info("----------客户端连接成功");
        } catch (MqttException e) {
            log.info(e.getMessage(), e);
        }
    }

    /**
     * 把组装好的消息发出去
     * @param topic
     * @param message
     * @return
     */
    public boolean publish(MqttTopic topic , MqttMessage message) {

        MqttDeliveryToken token = null;
        try {
            //把消息发送给对应的主题
            token = topic.publish(message);
            token.waitForCompletion();
            //检查发送是否成功
            boolean flag = token.isComplete();

            StringBuffer sbf = new StringBuffer(200);
            sbf.append("给主题为'"+topic.getName());
            sbf.append("'发布消息:");
            if (flag) {
                sbf.append("成功!消息内容是:"+new String(message.getPayload()));
            } else {
                sbf.append("失败!");
            }
            log.info(sbf.toString());
        } catch (MqttException e) {
            log.info(e.toString());
        }
        return token.isComplete();
    }

    /**
     * MQTT发送指令:主要是组装消息体
     * @param topic 主题
     * @param data 消息内容
     * @param qos 消息级别
     */
    public void sendMQTTMessage(String topic, String data, int qos) {

        try {
            this.publishClient = publishConnect();
            this.topic = this.publishClient.getTopic(topic);
            message = new MqttMessage();
            //消息等级
            //level 0:消息最多传递一次,不再关心它有没有发送到对方,也不设置任何重发机制
            //level 1:包含了简单的重发机制,发送消息之后等待接收者的回复,如果没收到回复则重新发送消息。这种模式能保证消息至少能到达一次,但无法保证消息重复
            //level 2: 有了重发和重复消息发现机制,保证消息到达对方并且严格只到达一次
            message.setQos(qos);
            //如果重复消费,则把值改为true,然后发送一条空的消息,之前的消息就会覆盖,然后在改为false
            message.setRetained(false);

            message.setPayload(data.getBytes());

            //将组装好的消息发出去
            publish(this.topic, message);
        } catch (Exception e) {
            log.info(e.toString());
            e.printStackTrace();
        }

    }

    /**
     * 订阅端订阅消息
     * @param topic 要订阅的主题
     * @param qos 订阅消息的级别
     */
    public void init(String topic, int qos) {
        //建立连接
        subsribeConnect();
        //以某个消息级别订阅某个主题
        try {
            subsribeClient.subscribe(topic, qos);
        } catch (MqttException e) {
            log.info(e.getMessage(), e);
        }
    }

    /**
     * 订阅端取消订阅消息
     * @param topic 要订阅的主题
     */
    public void unionInit(String topic) {
        //建立连接
        subsribeConnect();
        //取消订阅某个主题
        try {
            //MQTT 协议中订阅关系是持久化的,因此如果不需要订阅某些 Topic,需要调用 unsubscribe 方法取消订阅关系。
            subsribeClient.unsubscribe(topic);
        } catch (MqttException e) {
            log.info(e.getMessage(), e);
        }
    }
}

7.创建控制层进行测试

package com.example.mqtt_publish.controller;

import com.example.mqtt_publish.service.MQTTServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author qx
 * @date 2023/07/18
 * @desc 测试
 */
@RestController
@RequestMapping("/publish")
public class PublishController {

    @Autowired
    private MQTTServer mqttServer;

    /**
     * 发送数据
     *
     * @param topic 主题
     * @param msg   消息内容
     * @param qos   qos消息级别
     * @return
     */
    @RequestMapping("/sendMsg")
    public String testSend(String topic, String msg, int qos) {
        mqttServer.sendMQTTMessage(topic, msg, qos);
        return "发送了一条主题是‘" + topic + "’,内容是:" + msg + ",消息级别 " + qos;
    }

    /**
     * 订阅主题
     *
     * @param topic 主题名称
     * @param qos   qos消息级别
     * @return
     */
    @RequestMapping("/subsribe")
    public String testSubsribe(String topic, int qos) {
        mqttServer.init(topic, qos);
        return "订阅主题" + topic + "成功";
    }
}

8.启动程序,在浏览器输入http://localhost:8091/publish/sendMsg?topic=test&msg=hello&qos=0

SpringBoot集成MQTT入门_发布_02

控制台显示

2023-07-19 14:30:55.023  INFO 3712 --- [nio-8091-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 5 ms
2023-07-19 14:30:55.438  INFO 3712 --- [nio-8091-exec-1] c.e.mqtt_publish.service.MQTTServer      : ---------------------连接成功
2023-07-19 14:30:55.441  INFO 3712 --- [nio-8091-exec-1] c.e.mqtt_publish.service.MQTTServer      : 给主题为'test'发布消息:成功!消息内容是:hello

现在EMQ X服务端监控信息如下:

SpringBoot集成MQTT入门_MQTT_03

二、新建消息订阅者服务mqtt_subsribe

SpringBoot集成MQTT入门_发布_04

pom文件和发布端的文件一样,参考一下就可以了

1.application.properties的文件配置和发布端的类似,修改一下端口号和客户端ID就可以了


#端口
server.port=8092

#MQTT-服务端地址
publish.mqtt.host=tcp://localhost:1883
#MQTT-服务端用户名
publish.mqtt.username=admin
#MQTT-服务端密码
publish.mqtt.password=123456
#MQTT-是否清理session
publish.mqtt.cleansession=false
#MQTT-当前客户端的唯一标识
publish.mqtt.clientid=mqtt_subsribe
#当前客户端的主题
publish.mqtt.defaultTopic=topic
#发送超时时间
publish.mqtt.timeout=1000
#心跳时间
publish.mqtt.keepalive=10
#连接超时时间
publish.mqtt.connectionTimeout=3000

2.把发布者端的PushCallback、MQTTConfig、MqttConnect复制过来

3.把发布者端的MQTTServer服务复制过来修改一下名称为MQTTSubsribe。

4.创建测试控制层

package com.example.mqtt_subsribe.controller;

import com.example.mqtt_subsribe.service.MQTTSubsribe;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author qx
 * @date 2023/07/19
 * @desc 测试
 */
@RestController
@RequestMapping("/subsribe")
public class SubsribeController {

    @Autowired
    private MQTTSubsribe mqttSubsribe;

    /**
     * 订阅主题
     *
     * @param topic 主题名称
     * @param qos   消息级别
     * @return
     */
    @RequestMapping("/topic")
    public String testTopic(String topic, int qos) {
        mqttSubsribe.init(topic, qos);
        return "订阅[" + topic + "]成功";
    }

    /**
     * 取消订阅
     *
     * @param topic 主题名称
     * @return
     */
    @RequestMapping("/cancel")
    public String cancelTopic(String topic) {
        mqttSubsribe.unionInit(topic);
        return "取消订阅[" + topic + "]成功";
    }

}

5.启动程序,先订阅消息。

SpringBoot集成MQTT入门_订阅_05

2023-07-19 15:16:58.222  INFO 5468 --- [nio-8092-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 5 ms
2023-07-19 15:16:58.647  INFO 5468 --- [nio-8092-exec-1] c.e.mqtt_subsribe.service.MQTTSubsribe   : ----------客户端连接成功

6.我们重新调用发布者端的服务,发布指定主题的消息

SpringBoot集成MQTT入门_发布_06

我们的订阅端因为订阅了test这个主题,所以接收到了发布者端发送过来的消息。

订阅者服务的控制台显示如下:

接收消息主题 : test
接收消息Qos : 0
接收消息内容 : haha

此时的MQTT服务器页面显示了两台客户端

SpringBoot集成MQTT入门_EMQX_07

7.我们在订阅者服务里面调用取消订阅的接口。

SpringBoot集成MQTT入门_EMQX_08

发现MQTT服务器页面里面的订阅者客户端已经没有了订阅的数量

SpringBoot集成MQTT入门_EMQX_09

我们再次调用发布者端发布订阅的消息接口

SpringBoot集成MQTT入门_发布_10

然后我们在订阅者服务的控制台里面并没有打印出获取消息的日志。我们取消订阅的测试成功了。

总而言之, 所有的客户端都可以是订阅端,或者是发布端。任何一个客户端发布一条某个主题的消息,都会通过服务端转发给每一个订阅了该主题的客户端。

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
19qMgiCiiRfc