依赖
<!-- MQTT -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.3.2.RELEASE</version>
</dependency>
<!-- 上面的已整合下面 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
相关配置
mqtt:
#MQTT服务地址,端口号默认1883,如果有多个,用逗号隔开
# 请求使用 tcp
url: tcp://mqtt.XXX.com.cn:1883
#用户名
username: admin
#密码
password: public
#客户端id(不能重复)
client:
id: tester
#MQTT默认的消息推送主题,可在调用接口指定
default:
topic: test1
#连接超时时间 单位为秒
connectionTimeout: 100
#设置心跳时间 单位为秒,表示服务器每隔 1.5*?秒的时间向客户端发送心跳判断客户端是否在线
keepAliveInterval: 40
#发送超时时间
completionTimeout: 60
#是否自动重新连接
automaticReconnect: true
MQTT 客户端类
@Component
public class MqttMd {
private static final Logger logger = LoggerFactory.getLogger(MqttMd.class);
/**
* 用户名
*/
@Value("${spring.mqtt.username}")
private String username;
/**
* 密码
*/
@Value("${spring.mqtt.password}")
private String password;
/**
* 服务器地址
*/
@Value("${spring.mqtt.url}")
private String hostUrl;
/**
* 客户端ID
*/ @Value("${spring.mqtt.client.id}")
private String clientId;
/**
* 默认主题
*/
@Value("${spring.mqtt.default.topic}")
private String defaultTopic;
/**
* 设置心跳时间 单位为秒,表示服务器每隔 1.5*?秒的时间向客户端发送心跳判断客户端是否在线
*/
@Value("${spring.mqtt.keepAliveInterval}")
private int keepAliveInterval;
/**
* 连接超时时间,单位为秒
*/
@Value("${spring.mqtt.connectionTimeout}")
private int connectionTimeout;
/**
* 是否自动重新连接
*/
@Value("${spring.mqtt.automaticReconnect}")
private Boolean automaticReconnect;
/**
* 发送超时时间
*/
@Value("${spring.mqtt.completionTimeout}")
private int completionTimeout;
/**
* 客户端对象
*/
private MqttClient client;
/**
* 连接初始化
*/
//@PostConstruct(在 bean 创建后自动初始化)
public void init(){
connect();
}
/**
* 客户端连接服务端
*/
public void connect(){
try{
logger.info("========= MQTT连接 =============");
logger.info("开始连接服务端.....");
//创建MQTT客户端对象
client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
logger.info("发送服务器地址--[{}]",hostUrl);
//连接设置
MqttConnectOptions options = new MqttConnectOptions();
logger.info("客户端ID--[{}]",clientId);
//是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
//设置为true表示每次连接服务器都是以新的身份
options.setCleanSession(false);
//设置连接用户名
options.setUserName(username);
logger.info("客户端用户名--[{}]",username);
//设置连接密码
options.setPassword(password.toCharArray());
//设置超时时间,单位为秒
options.setConnectionTimeout(connectionTimeout);
//设置心跳时间 单位为秒,表示服务器每隔 1.5*?秒的时间向客户端发送心跳判断客户端是否在线
options.setKeepAliveInterval(keepAliveInterval);
//设置自动重新连接
options.setAutomaticReconnect(automaticReconnect);
//设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false);
//设置回调
client.setCallback(new MqttCallBack());
client.connect(options);
logger.info("===============================");
} catch(MqttException e){
e.printStackTrace();
}
}
/**
* 断开连接
*/
public void disConnectd(){
try {
logger.info("========= MQTT连接 =============");
logger.info("正在与服务端断开连接.....");
client.disconnect();
logger.info("已断开连接");
logger.info("===============================");
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 发布
* @param qos
* @param retained
* @param topic
* @param message
*/
public void publish(int qos,boolean retained,String topic,String message) throws MqttException {
if (BeanUtil.isEmpty(client)) {
//先让客户端和服务器建立连接,MemoryPersistence设置clientid的保存形式,默认为以内存保存
logger.info("客户端不存在,正在创建.....");
client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
}
if (!client.isConnected()){
//重新连接
logger.info("客户端未连接!");
client.connect();
}else {
}
logger.info("开始发布主题.....");
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
mqttMessage.setPayload(message.getBytes());
//主题的目的地,用于发布/订阅信息
MqttTopic mqttTopic = client.getTopic(topic);
//提供一种机制来跟踪消息的传递进度
//用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度
MqttDeliveryToken token;
try {
//将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态
//一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。
token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
logger.info("发布主题成功");
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 订阅主题
* @param topic
* @param qos
*/
public void subscribe(String topic,int qos) throws MqttException {
if (BeanUtil.isEmpty(client)) {
//先让客户端和服务器建立连接,MemoryPersistence设置clientid的保存形式,默认为以内存保存
logger.info("客户端不存在,正在创建.....");
client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
}
if (!client.isConnected()){
//重新连接
logger.info("客户端未连接!");
client.connect();
}else {
}
logger.info("开始订阅主题.....");
try {
client.subscribeWithResponse(topic, qos);
client.setCallback(new MqttCallBack());
logger.info("订阅主题完成!");
} catch (MqttException e) {
e.printStackTrace();
}
}}
MQTT消息回调类
注意回调方法调用外部方法可能会导致掉线(大坑)!!!!
public class MqttCallBack implements MqttCallbackExtended {
@Autowired
private MqttMd client;
private static final Logger logger = LoggerFactory.getLogger(MqttCallBack.class);
/**
* 客户端连接成功的回调
* @param reconnect
* @param serverURI
*/
@Override
public void connectComplete(boolean reconnect, String serverURI) {
logger.info("服务端连接成功!");
}
/**
* 客户端丢失连接的回调
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
logger.info("与服务器断开连接....");
//重新连接
client.connect();
}
/**
* 消息到达的回调
* @param topic
* @param message
* @throws Exception
*/ @Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
logger.info("==========================");
logger.info("消息接收成功!");
logger.info(String.format("接收消息主题 : %s",topic));
logger.info(String.format("接收消息Qos : %d",message.getQos()));
logger.info(String.format("接收消息内容 : %s",new String(message.getPayload())));
logger.info(String.format("发送消息接收消息retained : %b",message.isRetained()));
logger.info("==========================");
}
/**
* 消息发布成功回调
* @param iMqttDeliveryToken
*/
@SneakyThrows
@Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
logger.info("==========================");
logger.info("消息发送成功!");
IMqttAsyncClient client = iMqttDeliveryToken.getClient();
logger.info("发送服务器地址--[{}]",client.getServerURI());
MqttMessage message = iMqttDeliveryToken.getMessage();
logger.info("发送消息Qos--[{}]",message.getQos());
logger.info("发送消息内容--[{}]",new String(message.getPayload()));
logger.info("发送消息接收消息retained--[{}]",message.isRetained());
logger.info("==========================");
}
}
MQTT调用测试类
@RestController(value = "app.mqtt.MqttController")
@RequestMapping(value = "/mqtt")
public class MqttController {
@Autowired
private MqttMd client;
/**
* 连接服务器
*/
@PostMapping("startMQTT")
public void startMQTT(){
client.init();
}
/**
* 断开服务器
*/
@PostMapping("endMQTT")
public void endMQTT(){
client.disConnectd();
}
/**
* 发布消息
*/
@PostMapping("sendMessage")
public void sendMessage() throws MqttException {
client.publish(0,true,"test","发布测试-payLoad");
}
/**
* 订阅消息
*/
@PostMapping("receiveMessage")
public void receiveMessage() throws MqttException {
client.subscribe("test",1);
}
}