Всем привет, мы снова встретились, я ваш друг Цюаньчжаньцзюнь.
Оглавление
2. В этой статье объясняется только, как подключить Java к серверу MQTT для обработки данных.
Данные, собранные аппаратным обеспечением, передаются на платформу EMQX (с использованием протокола MQTT), а Java подключается к серверу MQTT через код для получения, анализа, бизнес-процесса, хранения и отображения собранных данных.
MQTT основан на Публикация/подписка режим связи и обмена данными.
1. Создайте новый проект Springboot и напрямую добавьте следующую зависимость mqtt в файл pom.
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2. Напишите класс инструмента MQTT.
package com.siborui.dc.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* Операции с инструментом MQTT
*
* @author Mr.Qu
* @since v1.1.0 2020-01-10
*/
@Slf4j
@Component
public class MQTTConnect {
private String HOST = "tcp://127.0.0.1:1883"; //Адрес и номер порта сервера mqtt
private final String clientId = "DC" + (int) (Math.random() * 100000000);
private MqttClient mqttClient;
/**
* Клиентское соединение подключается к серверу mqtt
*
* @param userName имя пользователя
* @param passWord пароль
* @param mqttCallback функция обратного вызова
**/
public void setMqttClient(String userName, String passWord, MqttCallback mqttCallback) throws MqttException {
MqttConnectOptions options = mqttConnectOptions(userName, passWord);
if (mqttCallback == null) {
mqttClient.setCallback(new Callback());
} else {
mqttClient.setCallback(mqttCallback);
}
mqttClient.connect(options);
}
/**
* Настройки параметров соединения MQTT
*/
private MqttConnectOptions mqttConnectOptions(String userName, String passWord) throws MqttException {
mqttClient = new MqttClient(HOST, clientId, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
options.setConnectionTimeout(10); ///По умолчанию: 30;
options.setAutomaticReconnect(true);//По умолчанию: false
options.setCleanSession(false);//По умолчанию: true
//options.setKeepAliveInterval(20);//По умолчанию: 60
return options;
}
/**
* Закрыть соединение MQTT
*/
public void close() throws MqttException {
mqttClient.close();
mqttClient.disconnect();
}
/**
* Опубликовать сообщение в теме Качество обслуживания по умолчанию: 1
*
* @param тема:Опубликованная тема
* @param сообщение: опубликованное сообщение
*/
public void pub(String topic, String msg) throws MqttException {
MqttMessage mqttMessage = new MqttMessage();
//mqttMessage.setQos(2);
mqttMessage.setPayload(msg.getBytes());
MqttTopic mqttTopic = mqttClient.getTopic(topic);
MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
}
/**
* Опубликовать сообщение в теме
*
* @param topic: Размещенные темы
* @param msg: опубликованные новости
* @param qos: качество сообщения Qos:0、1、2
*/
public void pub(String topic, String msg, int qos) throws MqttException {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setPayload(msg.getBytes());
MqttTopic mqttTopic = mqttClient.getTopic(topic);
MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
}
/**
* Подписаться на тему , уровень Qos по умолчанию для этого метода: 1
*
* @param topic тема
*/
public void sub(String topic) throws MqttException {
mqttClient.subscribe(topic);
}
/**
* Подписаться на тему,ПортативныйQos
*
* @param topic тема на которую стоит подписаться
* @param qos качество сообщения:0、1、2
*/
public void sub(String topic, int qos) throws MqttException {
mqttClient.subscribe(topic, qos);
}
/**
* основная функция для самостоятельного тестирования
*/
public static void main(String[] args) throws MqttException {
MQTTConnect mqttConnect = new MQTTConnect();
mqttConnect.setMqttClient("admin", "public", new Callback());
mqttConnect.sub("com/iot/init");
mqttConnect.pub("com/iot/init", "Mr.Qu" + (int) (Math.random() * 100000000));
}
}
3. Напишите функцию обратного вызова MQTT.
package com.siborui.dc.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* Обычная функция MQTT обратного вызова
*
* @author Mr.Qu
* @since 2020/1/9 16:26
*/
@Slf4j
public class Callback implements MqttCallback {
/**
* MQTT Отключение выполнит этот метод
*/
@Override
public void connectionLost(Throwable throwable) {
log.info("Соединение MQTT отключено :{}", throwable.getMessage());
log.error(throwable.getMessage(), throwable);
}
/**
* После успешной публикации публикация будет выполнена здесь.
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("Сообщение опубликовано успешно");
}
/**
* Сообщение, полученное после подписки, будет выполнено здесь
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// TODO Здесь вы можете осуществлять бизнес-обработку и хранение данных о подписанных сообщениях.
log.info("Получено от " + topic + " Сообщение: {}", new String(message.getPayload()));
}
}
4. В связи с потребностями бизнес-сценария при запуске проекта мониторим тему MQTT и пишем прослушиватель MQTT.
package com.siborui.dc.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
import com.siborui.dc.mqtt.Callback;
/**
* Старт проекта монитортема
*
* @author Mr.Qu
* @since 2020/1/10
*/
@Slf4j
@Component
public class MQTTListener implements ApplicationListener<ContextRefreshedEvent> {
private final MQTTConnect server;
@Autowired
public MQTTListener(MQTTConnect server) {
this.server = server;
}
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
try {
server.setMqttClient("admin", "public", new Callback());
server.sub("com/iot/init");
} catch (MqttException e) {
log.error(e.getMessage(), e);
}
}
}
Издатель: Лидер стека программистов полного стека, укажите источник для перепечатки: https://javaforall.cn/140440.html Исходная ссылка: https://javaforall.cn