Всем привет, мы снова встретились, я ваш друг Цюаньчжаньцзюнь.
Концепция Интернета вещей сейчас очень популярна. Латиао уже семь лет работает в отрасли как «инженер по автоматизации», но на самом деле до сих пор его отрасль все еще относительно ограничивается преобразованием автоматизированных производственных линий. Что касается промышленного интеллектуального соединения, у нас больше опыта в том, как соединить островные машины конвейерного типа в горизонтальном направлении. Однако в вертикальном направлении, как осуществлять интеллектуальный анализ данных или как выполнять интеллектуальный анализ данных эффективно и гибко. требует дополнительных занятий. Есть еще много вещей. MQTT — это протокол, предложенный IBM уже давно, но, к сожалению, он никогда не использовался. Новый проект компании цитировал библиотеку с открытым исходным кодом MQTTnet, но автор GitHub слишком сильно поторопился с новой версией. Обновление изменилось. много чего в старой версии, так что топовые статьи на Baidu нельзя напрямую использовать для справки. В отчаянии я сошел с ума по Baidu + прочитал исходный код на Git, и наконец придумал небольшую демо, которая есть. записано ниже.
MQTT — это стандартный протокол обмена сообщениями OASIS для Интернета вещей (IoT). Он спроектирован как чрезвычайно легкий транспорт сообщений для публикации/подписки, идеально подходящий для подключения удаленных устройств с небольшим объемом кода и минимальной пропускной способностью сети. Сегодня MQTT широко используется в таких отраслях, как автомобилестроение, производство, телекоммуникации, нефть и газ и других.
Хорошо, я скопировал этот абзац с официального сайта MQTT, оригинальный английский текст, дословный перевод браузера, Aojiao. 官网指路:https://mqtt.org/
MQTTnet — это высокопроизводительная библиотека .NET, основанная на связи MQTT, которая предоставляет клиент MQTT и сервер MQTT (брокер). На данный момент последней версией является 3.0.12.0, которая поддерживает ядро .net и версии MQTT 3.X и 5.0.
https://github.com/chkr1011/MQTTnet Путь Git для MQTTnet.
Эта демонстрация разработана как программа формы Winform. На основе MQTTnet реализовано создание MQTT-сервера или брокера, а в форме представлена функция создания MQTT-клиента. После подключения MQTT-клиента к серверу реализуются основные функции подписки и публикации тем. нажав кнопку.
Идея создания MQTT Server вполне понятна, в основном реализация различных событий MqttServer поначалу очень смущала, потому что написание изменений в 3.0.12 полностью отличается от 2.X. Вот фрагмент кода MQTTnet для облегчения понимания реализации класса:
var optionsBuilder = new MqttServerOptionsBuilder();
mqttServer = new MqttFactory().CreateMqttServer() as MqttServer;
mqttServer.StartedHandler = new MqttServerStartedHandlerDelegate(OnMqttServerStarted);
mqttServer.StoppedHandler = new MqttServerStoppedHandlerDelegate(OnMqttServerStopped);
Вот готовый код реализации:
public MqttServer mqttServer = null;
public async void StartMqttServer()
{
try
{
if (mqttServer == null)
{
var config = ReadConfiguration();
var optionsBuilder = new MqttServerOptionsBuilder()
.WithDefaultEndpoint().WithDefaultEndpointPort(int.Parse(config["Port"].ToString())).WithConnectionValidator(
c =>
{
var currentUser = config["Users"][0]["UserName"].ToString();
var currentPWD = config["Users"][0]["Password"].ToString();
if (currentUser == null || currentPWD == null)
{
c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return;
}
if (c.Username != currentUser)
{
c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return;
}
if (c.Password != currentPWD)
{
c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return;
}
c.ReasonCode = MqttConnectReasonCode.Success;
}).WithSubscriptionInterceptor(
c =>
{
c.AcceptSubscription = true;
}).WithApplicationMessageInterceptor(
c =>
{
c.AcceptPublish = true;
});
mqttServer = new MqttFactory().CreateMqttServer() as MqttServer;
mqttServer.StartedHandler = new MqttServerStartedHandlerDelegate(OnMqttServerStarted);
mqttServer.StoppedHandler = new MqttServerStoppedHandlerDelegate(OnMqttServerStopped);
mqttServer.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(OnMqttServerClientConnected);
mqttServer.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(OnMqttServerClientDisconnected);
mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(OnMqttServerClientSubscribedTopic);
mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(OnMqttServerClientUnsubscribedTopic);
mqttServer.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnMqttServer_ApplicationMessageReceived);
await mqttServer.StartAsync(optionsBuilder.Build());
lbxMonitor.BeginInvoke(_updateMonitorAction,
Logger.TraceLog(Logger.Level.Info, "MQTT Server is started."));
}
}
catch (Exception ex)
{
lbxMonitor.BeginInvoke(_updateMonitorAction,
Logger.TraceLog(Logger.Level.Fatal, $"MQTT Server start fail.>{ex.Message}"));
}
}
public async void StopMqttServer()
{
if (mqttServer == null) return;
try
{
await mqttServer?.StopAsync();
mqttServer = null;
lbxMonitor.BeginInvoke(_updateMonitorAction,
Logger.TraceLog(Logger.Level.Info, "MQTT Server is stopped."));
}
catch (Exception ex)
{
lbxMonitor.BeginInvoke(_updateMonitorAction,
Logger.TraceLog(Logger.Level.Fatal, $"MQTT Server stop fail.>{ex.Message}"));
}
}
-С точки зрения конструкции MQTT сервер играет роль агента, а подписчики и издатели — клиентов. Поэтому, вообще говоря, подписка и публикация сообщений должны выполняться клиентом. Однако сервер, естественно, также может выступать в роли издателя. Я не думал о реальном сценарии использования этой функции. Давайте сначала реализуем эту функцию.
public async void ServerPublishMqttTopic(string topic, string payload)
{
var message = new MqttApplicationMessage()
{
Topic = topic,
Payload = Encoding.UTF8.GetBytes(payload)
};
await mqttServer.PublishAsync(message);
lbxMonitor.BeginInvoke(_updateMonitorAction,
Logger.TraceLog(Logger.Level.Info, string.Format("MQTT Брокер успешно опубликовал тему [{0}]! ", topic)));
}
Конечно, MqttApplicationMessage также имеет множество значений атрибутов, которые можно передавать, например QoS (MqttQualityOfServiceLevel), Retain и т. д. Здесь, чтобы сначала реализовать функцию, передаются только самые простые Topic и Payload.
Общая идея реализации такая же, как и на стороне сервера. Объявите MqttClientOptions, назначьте различные параметры, необходимые для подключения к серверу, и, наконец, используйте его в качестве параметра MqttClient.ConnectAsync для подключения к серверу. Здесь я попробовал WillMessage в MqttClientOptions, который на самом деле является объектом MqttApplicationMessage. WillMessage используется в качестве механизма последнего слова для широковещательного уведомления, когда сторона клиента и сервера зависает. Следующим шагом будет реализация различных событий MqttClient, которые используются для настройки различных логик Connected, Disconnected и MessageReceived. Отличий от реализации на стороне сервера нет, поэтому я не буду вдаваться в подробности. Код вверх——
private MqttClient mqttClient = null;
private async Task ClientStart()
{
try
{
var tcpServer = txtIPAddr.Text;
var tcpPort = int.Parse(txtPort.Text.Trim());
var mqttUser = txtUserName.Text.Trim();
var mqttPassword = txtPWD.Text.Trim();
var mqttFactory = new MqttFactory();
var options = new MqttClientOptions
{
ClientId = txtClientID.Text.Trim(),
ProtocolVersion = MQTTnet.Formatter.MqttProtocolVersion.V311,
ChannelOptions = new MqttClientTcpOptions
{
Server = tcpServer,
Port = tcpPort
},
WillDelayInterval = 10,
WillMessage = new MqttApplicationMessage()
{
Topic = $"LastWill/{txtClientID.Text.Trim()}",
Payload= Encoding.UTF8.GetBytes("I Lost the connection!"),
QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce
}
};
if (options.ChannelOptions == null)
{
throw new InvalidOperationException();
}
if (!string.IsNullOrEmpty(mqttUser))
{
options.Credentials = new MqttClientCredentials
{
Username = mqttUser,
Password = Encoding.UTF8.GetBytes(mqttPassword)
};
}
options.CleanSession = true;
options.KeepAlivePeriod = TimeSpan.FromSeconds(5);
mqttClient = mqttFactory.CreateMqttClient() as MqttClient;
mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnMqttClientConnected);
mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnMqttClientDisConnected);
mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnSubscriberMessageReceived);
await mqttClient.ConnectAsync(options);
lbxMonitor.BeginInvoke(_updateMonitorAction,
Logger.TraceLog(Logger.Level.Info, $"Клиент[{options.ClientId}] пытается подключиться..."));
}
catch (Exception ex)
{
lbxMonitor.BeginInvoke(_updateMonitorAction,
Logger.TraceLog(Logger.Level.Fatal, $"Произошла ошибка при попытке клиента подключиться.>{ex.Message}"));
}
}
private async Task ClientStop()
{
try
{
if (mqttClient == null) return;
await mqttClient.DisconnectAsync();
mqttClient = null;
}
catch (Exception ex)
{
lbxMonitor.BeginInvoke(_updateMonitorAction,
Logger.TraceLog(Logger.Level.Fatal, $"Клиент пытается отключитьсяServerОшибка.>{ex.Message}"));
}
}
Логика реализации здесь такая же, как и метод записи и публикация на стороне сервера. Я добавил здесь настройки QoS и Retain в MqttApplicationMessage, а параметры передаются элементом управления на странице формы. Несколько слов об использовании параметра «Сохранить»: «Сохранить» означает «сохранить». Установка значения «Истина» означает, что если на момент публикации этого сообщения нет подписчиков, сообщение будет храниться на стороне сервера до тех пор, пока оно не будет опубликовано, и немедленно удалено после его публикации. подписан. Установлено значение False. Такого эффекта нет.
public async void ClientPublishMqttTopic(string topic, string payload)
{
try
{
var message = new MqttApplicationMessage()
{
Topic = topic,
Payload = Encoding.UTF8.GetBytes(payload),
QualityOfServiceLevel = (MqttQualityOfServiceLevel)cmbQos.SelectedIndex,
Retain = bool.Parse(cmbRetain.SelectedItem.ToString())
};
await mqttClient.PublishAsync(message);
lbxMonitor.BeginInvoke(_updateMonitorAction,
Logger.TraceLog(Logger.Level.Info, string.Format("Клиент [{0}] успешно опубликовал тему [{1}]!", mqttClient.Options.ClientId, topic)));
}
catch (Exception ex)
{
lbxMonitor.BeginInvoke(_updateMonitorAction,
Logger.TraceLog(Logger.Level.Fatal, string.Format("клиент[{0}]Опубликовать тему[{1}]аномальный!>{2}", mqttClient.Options.ClientId, topic,ex.Message)));
}
}
Вызовите MqttClient.SubscribeAsync и передайте тему сообщения.
public async void ClientSubscribeTopic(string topic)
{
await mqttClient.SubscribeAsync(topic);
lbxMonitor.BeginInvoke(_updateMonitorAction,
Logger.TraceLog(Logger.Level.Info, string.Format("Клиент [{0}] успешно подписался на тему [{1}]!", mqttClient.Options.ClientId, topic)));
}
Вызовите MqttClient.UnsubscribeAsync, чтобы отменить подписку на сообщения.
public async void ClientUnSubscribeTopic(string topic)
{
await mqttClient.UnsubscribeAsync(topic);
lbxMonitor.BeginInvoke(_updateMonitorAction,
Logger.TraceLog(Logger.Level.Info, string.Format("Клиент [{0}] успешно отменил тему [{1}]!", mqttClient.Options.ClientId, topic)));
}
До сих пор,Реализованы простые функции на основе MQTTnet на стороне сервера и на стороне клиента.,Полный код отправлен наGitHub,Приглашаем всех дать совет и обсудить,Добивайтесь прогресса вместе,Растите вместе!
Издатель: Лидер стека программистов полного стека, укажите источник для перепечатки: https://javaforall.cn/152881.html Исходная ссылка: https://javaforall.cn