基于SpringBoot + Maven 整合MQTT
  6Kj1okubbK7m 2023年11月02日 46 0

依赖

<!-- MQTT -->    
<dependency>  
   <groupId>org.springframework.integration</groupId>  
   <artifactId>spring-integration-mqtt</artifactId>  
   <version>5.3.2.RELEASE</version>  
</dependency>

<!-- 上面的已整合下面 -->
<dependency>
   <groupId>org.eclipse.paho</groupId>
   <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

相关配置

mqtt:  
  #MQTT服务地址,端口号默认1883,如果有多个,用逗号隔开 
  # 请求使用 tcp 
  url: tcp://mqtt.XXX.com.cn:1883  
  #用户名  
  username: admin  
  #密码  
  password: public  
  #客户端id(不能重复)  
  client:  
    id: tester  
  #MQTT默认的消息推送主题,可在调用接口指定  
  default:  
    topic: test1  
  #连接超时时间 单位为秒  
  connectionTimeout: 100  
  #设置心跳时间 单位为秒,表示服务器每隔 1.5*?秒的时间向客户端发送心跳判断客户端是否在线  
  keepAliveInterval: 40  
  #发送超时时间  
  completionTimeout: 60  
  #是否自动重新连接  
  automaticReconnect: true

MQTT 客户端类

@Component  
public class MqttMd {  
  
    private static final Logger logger = LoggerFactory.getLogger(MqttMd.class);  
  
    /**  
     *  用户名  
     */  
    @Value("${spring.mqtt.username}")  
    private String username;  
  
    /**  
     *  密码  
     */  
    @Value("${spring.mqtt.password}")  
    private String password;  
  
    /**  
     *  服务器地址  
     */  
    @Value("${spring.mqtt.url}")  
    private String hostUrl;  
  
    /**  
     *  客户端ID  
     */    @Value("${spring.mqtt.client.id}")  
    private String clientId;  
  
    /**  
     *  默认主题  
     */  
    @Value("${spring.mqtt.default.topic}")  
    private String defaultTopic;  
  
    /**  
     *  设置心跳时间 单位为秒,表示服务器每隔 1.5*?秒的时间向客户端发送心跳判断客户端是否在线  
     */  
    @Value("${spring.mqtt.keepAliveInterval}")  
    private int keepAliveInterval;  
  
    /**  
     * 连接超时时间,单位为秒  
     */  
    @Value("${spring.mqtt.connectionTimeout}")  
    private int connectionTimeout;  
  
    /**  
     * 是否自动重新连接  
     */  
    @Value("${spring.mqtt.automaticReconnect}")  
    private Boolean automaticReconnect;  
  
    /**  
     * 发送超时时间  
     */  
    @Value("${spring.mqtt.completionTimeout}")  
    private int completionTimeout;  
  
    /**  
     * 客户端对象  
     */  
    private MqttClient client;  
  
    /**  
     * 连接初始化 
     */  
    //@PostConstruct(在 bean 创建后自动初始化)
    public void init(){  
        connect();  
    }  
  
    /**  
     * 客户端连接服务端  
     */  
    public void connect(){  
        try{  
            logger.info("========= MQTT连接 =============");  
            logger.info("开始连接服务端.....");  
            //创建MQTT客户端对象  
            client = new MqttClient(hostUrl,clientId,new MemoryPersistence());  
            logger.info("发送服务器地址--[{}]",hostUrl);  
            //连接设置  
            MqttConnectOptions options = new MqttConnectOptions();  
            logger.info("客户端ID--[{}]",clientId);  
            //是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息  
            //设置为true表示每次连接服务器都是以新的身份  
            options.setCleanSession(false);  
            //设置连接用户名  
            options.setUserName(username);  
            logger.info("客户端用户名--[{}]",username);  
            //设置连接密码  
            options.setPassword(password.toCharArray());  
            //设置超时时间,单位为秒  
            options.setConnectionTimeout(connectionTimeout);  
            //设置心跳时间 单位为秒,表示服务器每隔 1.5*?秒的时间向客户端发送心跳判断客户端是否在线  
            options.setKeepAliveInterval(keepAliveInterval);  
            //设置自动重新连接  
            options.setAutomaticReconnect(automaticReconnect);  
            //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息  
            options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false);  
            //设置回调  
            client.setCallback(new MqttCallBack());  
            client.connect(options);  
            logger.info("===============================");  
        } catch(MqttException e){  
            e.printStackTrace();  
        }  
    }  
    
    /**  
     * 断开连接  
     */  
    public void disConnectd(){  
        try {  
            logger.info("========= MQTT连接 =============");  
            logger.info("正在与服务端断开连接.....");  
            client.disconnect();  
            logger.info("已断开连接");  
            logger.info("===============================");  
        } catch (MqttException e) {  
            e.printStackTrace();  
        }  
    }  
  
    /**  
     * 发布  
     * @param qos  
     * @param retained  
     * @param topic  
     * @param message  
     */  
    public void publish(int qos,boolean retained,String topic,String message) throws MqttException {  
  
        if (BeanUtil.isEmpty(client)) {  
            //先让客户端和服务器建立连接,MemoryPersistence设置clientid的保存形式,默认为以内存保存  
            logger.info("客户端不存在,正在创建.....");  
            client = new MqttClient(hostUrl,clientId,new MemoryPersistence());  
        }  
  
        if (!client.isConnected()){  
            //重新连接  
            logger.info("客户端未连接!");  
            client.connect();  
        }else {  
  
        }  
        logger.info("开始发布主题.....");  
        MqttMessage mqttMessage = new MqttMessage();  
        mqttMessage.setQos(qos);  
        mqttMessage.setRetained(retained);  
        mqttMessage.setPayload(message.getBytes());  
        //主题的目的地,用于发布/订阅信息  
        MqttTopic mqttTopic = client.getTopic(topic);  
        //提供一种机制来跟踪消息的传递进度  
        //用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度  
        MqttDeliveryToken token;  
        try {  
            //将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态  
            //一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。  
            token = mqttTopic.publish(mqttMessage);  
            token.waitForCompletion();  
            logger.info("发布主题成功");  
        } catch (MqttException e) {  
            e.printStackTrace();  
        }  
    }  
    
    /**  
     * 订阅主题  
     * @param topic  
     * @param qos  
     */  
    public void subscribe(String topic,int qos) throws MqttException {  
  
        if (BeanUtil.isEmpty(client)) {  
            //先让客户端和服务器建立连接,MemoryPersistence设置clientid的保存形式,默认为以内存保存  
            logger.info("客户端不存在,正在创建.....");  
            client = new MqttClient(hostUrl,clientId,new MemoryPersistence());  
        }  
  
        if (!client.isConnected()){  
            //重新连接  
            logger.info("客户端未连接!");  
            client.connect();  
        }else {  
  
        }  
        logger.info("开始订阅主题.....");  
        try {  
            client.subscribeWithResponse(topic, qos);  
            client.setCallback(new MqttCallBack());  
            logger.info("订阅主题完成!");  
        } catch (MqttException e) {  
            e.printStackTrace();  
        }  
  
    }}

MQTT消息回调类

注意回调方法调用外部方法可能会导致掉线(大坑)!!!!

public class MqttCallBack implements MqttCallbackExtended {  

    @Autowired  
    private MqttMd client;  
  
    private static final Logger logger = LoggerFactory.getLogger(MqttCallBack.class);  
  
    /**  
     * 客户端连接成功的回调  
     * @param reconnect  
     * @param serverURI  
     */  
    @Override  
    public void connectComplete(boolean reconnect, String serverURI) {  
        logger.info("服务端连接成功!");  
    }  
  
    /**  
     * 客户端丢失连接的回调  
     * @param throwable  
     */  
    @Override  
    public void connectionLost(Throwable throwable) {  
        logger.info("与服务器断开连接....");  
        //重新连接  
        client.connect();  
    }  
  
    /**  
     * 消息到达的回调  
     * @param topic  
     * @param message  
     * @throws Exception  
     */    @Override  
    public void messageArrived(String topic, MqttMessage message) throws Exception {  
  
        logger.info("==========================");  
        logger.info("消息接收成功!");  
        logger.info(String.format("接收消息主题 : %s",topic));  
        logger.info(String.format("接收消息Qos : %d",message.getQos()));  
        logger.info(String.format("接收消息内容 : %s",new String(message.getPayload())));  
        logger.info(String.format("发送消息接收消息retained : %b",message.isRetained()));  
        logger.info("==========================");  
    }  
  
    /**  
     * 消息发布成功回调  
     * @param iMqttDeliveryToken  
     */  
    @SneakyThrows  
    @Override    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {  
  
        logger.info("==========================");  
        logger.info("消息发送成功!");  
        IMqttAsyncClient client = iMqttDeliveryToken.getClient();  
        logger.info("发送服务器地址--[{}]",client.getServerURI());  
        MqttMessage message = iMqttDeliveryToken.getMessage();  
        logger.info("发送消息Qos--[{}]",message.getQos());  
        logger.info("发送消息内容--[{}]",new String(message.getPayload()));  
        logger.info("发送消息接收消息retained--[{}]",message.isRetained());  
        logger.info("==========================");  
    }  
}

MQTT调用测试类

@RestController(value = "app.mqtt.MqttController")  
@RequestMapping(value = "/mqtt")  
public class MqttController {  
  
    @Autowired  
    private MqttMd client;  
  
    /**  
     * 连接服务器  
     */  
    @PostMapping("startMQTT")  
    public void startMQTT(){  
        client.init();  
    }  
  
    /**  
     * 断开服务器  
     */  
    @PostMapping("endMQTT")  
    public void endMQTT(){  
        client.disConnectd();  
    }  
  
    /**  
     * 发布消息  
     */  
    @PostMapping("sendMessage")  
    public void sendMessage() throws MqttException {  
        client.publish(0,true,"test","发布测试-payLoad");  
    }  
  
    /**  
     * 订阅消息  
     */  
    @PostMapping("receiveMessage")  
    public void receiveMessage() throws MqttException {  
        client.subscribe("test",1);  
    }  
}
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读