Реализация «предлагаемой коллекции» сервера и клиента MQTT на основе MQTTnet 3.0.12.
Реализация «предлагаемой коллекции» сервера и клиента MQTT на основе MQTTnet 3.0.12.

Всем привет, мы снова встретились, я ваш друг Цюаньчжаньцзюнь.

Реализация сервера и клиента MQTT на базе MQTTnet 3.0.12)

Обзор - Ерунда смотреть или не смотреть

Концепция Интернета вещей сейчас очень популярна. Латиао уже семь лет работает в отрасли как «инженер по автоматизации», но на самом деле до сих пор его отрасль все еще относительно ограничивается преобразованием автоматизированных производственных линий. Что касается промышленного интеллектуального соединения, у нас больше опыта в том, как соединить островные машины конвейерного типа в горизонтальном направлении. Однако в вертикальном направлении, как осуществлять интеллектуальный анализ данных или как выполнять интеллектуальный анализ данных эффективно и гибко. требует дополнительных занятий. Есть еще много вещей. MQTT — это протокол, предложенный IBM уже давно, но, к сожалению, он никогда не использовался. Новый проект компании цитировал библиотеку с открытым исходным кодом MQTTnet, но автор GitHub слишком сильно поторопился с новой версией. Обновление изменилось. много чего в старой версии, так что топовые статьи на Baidu нельзя напрямую использовать для справки. В отчаянии я сошел с ума по Baidu + прочитал исходный код на Git, и наконец придумал небольшую демо, которая есть. записано ниже.

Что такое MQTT

MQTT — это стандартный протокол обмена сообщениями OASIS для Интернета вещей (IoT). Он спроектирован как чрезвычайно легкий транспорт сообщений для публикации/подписки, идеально подходящий для подключения удаленных устройств с небольшим объемом кода и минимальной пропускной способностью сети. Сегодня MQTT широко используется в таких отраслях, как автомобилестроение, производство, телекоммуникации, нефть и газ и других.

Хорошо, я скопировал этот абзац с официального сайта MQTT, оригинальный английский текст, дословный перевод браузера, Aojiao. 官网指路:https://mqtt.org/

MQTTnet

MQTTnet — это высокопроизводительная библиотека .NET, основанная на связи MQTT, которая предоставляет клиент MQTT и сервер MQTT (брокер). На данный момент последней версией является 3.0.12.0, которая поддерживает ядро ​​.net и версии MQTT 3.X и 5.0.

https://github.com/chkr1011/MQTTnet Путь Git для MQTTnet.

текст

Эта демонстрация разработана как программа формы Winform. На основе MQTTnet реализовано создание MQTT-сервера или брокера, а в форме представлена ​​функция создания MQTT-клиента. После подключения MQTT-клиента к серверу реализуются основные функции подписки и публикации тем. нажав кнопку.

Реализация MQTT-сервера/брокера

Идея создания MQTT Server вполне понятна, в основном реализация различных событий MqttServer поначалу очень смущала, потому что написание изменений в 3.0.12 полностью отличается от 2.X. Вот фрагмент кода MQTTnet для облегчения понимания реализации класса:

Создание MQTT-сервера/брокера

  • Сначала определите MqttServerOptions — параметры, передаваемые при запуске службы Mqtt. Определите различные параметры при запуске службы: IP, порт, пароль учетной записи и т. д.
Язык кода:javascript
копировать
var optionsBuilder = new MqttServerOptionsBuilder();
  • Создайте экземпляр MqttServer и делегируйте различные события MqttServer.
Язык кода:javascript
копировать
mqttServer = new MqttFactory().CreateMqttServer() as MqttServer;
mqttServer.StartedHandler = new MqttServerStartedHandlerDelegate(OnMqttServerStarted);
mqttServer.StoppedHandler = new MqttServerStoppedHandlerDelegate(OnMqttServerStopped);

Вот готовый код реализации:

Язык кода:javascript
копировать
public MqttServer mqttServer = null;
public async void StartMqttServer()
{ 
   
    try
    { 
   
        if (mqttServer == null)
        { 
   
            var config = ReadConfiguration();
            var optionsBuilder = new MqttServerOptionsBuilder()
            .WithDefaultEndpoint().WithDefaultEndpointPort(int.Parse(config["Port"].ToString())).WithConnectionValidator(
                c =>
                        { 
   
                            var currentUser = config["Users"][0]["UserName"].ToString();
                            var currentPWD = config["Users"][0]["Password"].ToString();

                            if (currentUser == null || currentPWD == null)
                            { 
   
                                c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                                return;
                            }

                            if (c.Username != currentUser)
                            { 
   
                                c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                                return;
                            }

                            if (c.Password != currentPWD)
                            { 
   
                                c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                                return;
                            }

                            c.ReasonCode = MqttConnectReasonCode.Success;
                        }).WithSubscriptionInterceptor(
                c =>
                { 
   
                    c.AcceptSubscription = true;
                }).WithApplicationMessageInterceptor(
                c =>
                { 
   
                    c.AcceptPublish = true;
                });

            mqttServer = new MqttFactory().CreateMqttServer() as MqttServer;
            mqttServer.StartedHandler = new MqttServerStartedHandlerDelegate(OnMqttServerStarted);
            mqttServer.StoppedHandler = new MqttServerStoppedHandlerDelegate(OnMqttServerStopped);

            mqttServer.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(OnMqttServerClientConnected);
            mqttServer.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(OnMqttServerClientDisconnected);
            mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(OnMqttServerClientSubscribedTopic);
            mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(OnMqttServerClientUnsubscribedTopic);
            mqttServer.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnMqttServer_ApplicationMessageReceived);


            await mqttServer.StartAsync(optionsBuilder.Build());
            lbxMonitor.BeginInvoke(_updateMonitorAction,
                Logger.TraceLog(Logger.Level.Info, "MQTT Server is started."));

        }
    }
    catch (Exception ex)
    { 
   
        lbxMonitor.BeginInvoke(_updateMonitorAction,
                    Logger.TraceLog(Logger.Level.Fatal, $"MQTT Server start fail.>{ex.Message}"));
    }
}

public async void StopMqttServer()
{ 
   
    if (mqttServer == null) return;
    try
    { 
   
        await mqttServer?.StopAsync();
        mqttServer = null;
        lbxMonitor.BeginInvoke(_updateMonitorAction,
                Logger.TraceLog(Logger.Level.Info, "MQTT Server is stopped."));
    }
    catch (Exception ex)
    { 
   
        lbxMonitor.BeginInvoke(_updateMonitorAction,
                    Logger.TraceLog(Logger.Level.Fatal, $"MQTT Server stop fail.>{ex.Message}"));
    }
}

Сервер/брокер MQTT публикует сообщения

-С точки зрения конструкции MQTT сервер играет роль агента, а подписчики и издатели — клиентов. Поэтому, вообще говоря, подписка и публикация сообщений должны выполняться клиентом. Однако сервер, естественно, также может выступать в роли издателя. Я не думал о реальном сценарии использования этой функции. Давайте сначала реализуем эту функцию.

  • Это очень грубо. Сначала создайте экземпляр объекта MqttApplicationMessage, а затем вызовите MqttServer.PublishAsync в качестве параметра для публикации сообщения.
Язык кода:javascript
копировать
public async void ServerPublishMqttTopic(string topic, string payload)
{ 
   
    var message = new MqttApplicationMessage()
    { 
   
        Topic = topic,
        Payload = Encoding.UTF8.GetBytes(payload)
    };
    await mqttServer.PublishAsync(message);
    lbxMonitor.BeginInvoke(_updateMonitorAction,
            Logger.TraceLog(Logger.Level.Info, string.Format("MQTT Брокер успешно опубликовал тему [{0}]! ", topic)));
}

Конечно, MqttApplicationMessage также имеет множество значений атрибутов, которые можно передавать, например QoS (MqttQualityOfServiceLevel), Retain и т. д. Здесь, чтобы сначала реализовать функцию, передаются только самые простые Topic и Payload.

Реализация MQTT-клиента

Создание MQTT-клиента

Общая идея реализации такая же, как и на стороне сервера. Объявите MqttClientOptions, назначьте различные параметры, необходимые для подключения к серверу, и, наконец, используйте его в качестве параметра MqttClient.ConnectAsync для подключения к серверу. Здесь я попробовал WillMessage в MqttClientOptions, который на самом деле является объектом MqttApplicationMessage. WillMessage используется в качестве механизма последнего слова для широковещательного уведомления, когда сторона клиента и сервера зависает. Следующим шагом будет реализация различных событий MqttClient, которые используются для настройки различных логик Connected, Disconnected и MessageReceived. Отличий от реализации на стороне сервера нет, поэтому я не буду вдаваться в подробности. Код вверх——

Язык кода:javascript
копировать
private MqttClient mqttClient = null;
private async Task ClientStart()
{ 
   
    try
    { 
   
        var tcpServer = txtIPAddr.Text;
        var tcpPort = int.Parse(txtPort.Text.Trim());
        var mqttUser = txtUserName.Text.Trim();
        var mqttPassword = txtPWD.Text.Trim();

        var mqttFactory = new MqttFactory();



        var options = new MqttClientOptions
        { 
   
            ClientId = txtClientID.Text.Trim(),
            ProtocolVersion = MQTTnet.Formatter.MqttProtocolVersion.V311,
            ChannelOptions = new MqttClientTcpOptions
                    { 
   
                        Server = tcpServer,
                        Port = tcpPort
                    },
            WillDelayInterval = 10,
            WillMessage = new MqttApplicationMessage()
            { 
   
                Topic = $"LastWill/{txtClientID.Text.Trim()}",
                Payload= Encoding.UTF8.GetBytes("I Lost the connection!"),
                QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce
            }
            
        };
        if (options.ChannelOptions == null)
        { 
   
            throw new InvalidOperationException();
        }

        if (!string.IsNullOrEmpty(mqttUser))
        { 
   
            options.Credentials = new MqttClientCredentials
            { 
   
                Username = mqttUser,
                Password = Encoding.UTF8.GetBytes(mqttPassword)
            };
        }

        options.CleanSession = true;
        options.KeepAlivePeriod = TimeSpan.FromSeconds(5);

        mqttClient = mqttFactory.CreateMqttClient() as MqttClient;
        mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnMqttClientConnected);
        mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnMqttClientDisConnected);
        mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnSubscriberMessageReceived);
        await mqttClient.ConnectAsync(options);
        lbxMonitor.BeginInvoke(_updateMonitorAction,
                Logger.TraceLog(Logger.Level.Info, $"Клиент[{options.ClientId}] пытается подключиться..."));
    }
    catch (Exception ex)
    { 
   
        lbxMonitor.BeginInvoke(_updateMonitorAction,
                Logger.TraceLog(Logger.Level.Fatal, $"Произошла ошибка при попытке клиента подключиться.>{ex.Message}"));
    }
}

private async Task ClientStop()
{ 
   
    try
    { 
   
        if (mqttClient == null) return;
        await mqttClient.DisconnectAsync();
        mqttClient = null;
    }
    catch (Exception ex)
    { 
   
        lbxMonitor.BeginInvoke(_updateMonitorAction,
                Logger.TraceLog(Logger.Level.Fatal, $"Клиент пытается отключитьсяServerОшибка.>{ex.Message}"));
    }
}

Клиент MQTT публикует сообщения

Логика реализации здесь такая же, как и метод записи и публикация на стороне сервера. Я добавил здесь настройки QoS и Retain в MqttApplicationMessage, а параметры передаются элементом управления на странице формы. Несколько слов об использовании параметра «Сохранить»: «Сохранить» означает «сохранить». Установка значения «Истина» означает, что если на момент публикации этого сообщения нет подписчиков, сообщение будет храниться на стороне сервера до тех пор, пока оно не будет опубликовано, и немедленно удалено после его публикации. подписан. Установлено значение False. Такого эффекта нет.

Язык кода:javascript
копировать
public async void ClientPublishMqttTopic(string topic, string payload)
{ 
   
    try
    { 
   
        var message = new MqttApplicationMessage()
        { 
   
            Topic = topic,
            Payload = Encoding.UTF8.GetBytes(payload),
            QualityOfServiceLevel = (MqttQualityOfServiceLevel)cmbQos.SelectedIndex,
            Retain = bool.Parse(cmbRetain.SelectedItem.ToString())
        };
        await mqttClient.PublishAsync(message);
        lbxMonitor.BeginInvoke(_updateMonitorAction,
                Logger.TraceLog(Logger.Level.Info, string.Format("Клиент [{0}] успешно опубликовал тему [{1}]!", mqttClient.Options.ClientId, topic)));
    }
    catch (Exception ex)
    { 
   
        lbxMonitor.BeginInvoke(_updateMonitorAction,
                    Logger.TraceLog(Logger.Level.Fatal, string.Format("клиент[{0}]Опубликовать тему[{1}]аномальный!>{2}", mqttClient.Options.ClientId, topic,ex.Message)));
    }
}

Клиент MQTT подписывается на сообщения

Вызовите MqttClient.SubscribeAsync и передайте тему сообщения.

Язык кода:javascript
копировать
public async void ClientSubscribeTopic(string topic)
{ 
   
    await mqttClient.SubscribeAsync(topic);
    lbxMonitor.BeginInvoke(_updateMonitorAction,
            Logger.TraceLog(Logger.Level.Info, string.Format("Клиент [{0}] успешно подписался на тему [{1}]!", mqttClient.Options.ClientId, topic)));
}

Сообщение об отказе от подписки MQTT-клиента

Вызовите MqttClient.UnsubscribeAsync, чтобы отменить подписку на сообщения.

Язык кода:javascript
копировать
public async void ClientUnSubscribeTopic(string topic)
{ 
   
    await mqttClient.UnsubscribeAsync(topic);
    lbxMonitor.BeginInvoke(_updateMonitorAction,
            Logger.TraceLog(Logger.Level.Info, string.Format("Клиент [{0}] успешно отменил тему [{1}]!", mqttClient.Options.ClientId, topic)));
}

До сих пор,Реализованы простые функции на основе MQTTnet на стороне сервера и на стороне клиента.,Полный код отправлен наGitHub,Приглашаем всех дать совет и обсудить,Добивайтесь прогресса вместе,Растите вместе!

Издатель: Лидер стека программистов полного стека, укажите источник для перепечатки: https://javaforall.cn/152881.html Исходная ссылка: https://javaforall.cn

boy illustration
Неразрушающее увеличение изображений одним щелчком мыши, чтобы сделать их более четкими артефактами искусственного интеллекта, включая руководства по установке и использованию.
boy illustration
Копикодер: этот инструмент отлично работает с Cursor, Bolt и V0! Предоставьте более качественные подсказки для разработки интерфейса (создание навигационного веб-сайта с использованием искусственного интеллекта).
boy illustration
Новый бесплатный RooCline превосходит Cline v3.1? ! Быстрее, умнее и лучше вилка Cline! (Независимое программирование AI, порог 0)
boy illustration
Разработав более 10 проектов с помощью Cursor, я собрал 10 примеров и 60 подсказок.
boy illustration
Я потратил 72 часа на изучение курсорных агентов, и вот неоспоримые факты, которыми я должен поделиться!
boy illustration
Идеальная интеграция Cursor и DeepSeek API
boy illustration
DeepSeek V3 снижает затраты на обучение больших моделей
boy illustration
Артефакт, увеличивающий количество очков: на основе улучшения характеристик препятствия малым целям Yolov8 (SEAM, MultiSEAM).
boy illustration
DeepSeek V3 раскручивался уже три дня. Сегодня я попробовал самопровозглашенную модель «ChatGPT».
boy illustration
Open Devin — инженер-программист искусственного интеллекта с открытым исходным кодом, который меньше программирует и больше создает.
boy illustration
Эксклюзивное оригинальное улучшение YOLOv8: собственная разработка SPPF | SPPF сочетается с воспринимаемой большой сверткой ядра UniRepLK, а свертка с большим ядром + без расширения улучшает восприимчивое поле
boy illustration
Популярное и подробное объяснение DeepSeek-V3: от его появления до преимуществ и сравнения с GPT-4o.
boy illustration
9 основных словесных инструкций по доработке академических работ с помощью ChatGPT, эффективных и практичных, которые стоит собрать
boy illustration
Вызовите deepseek в vscode для реализации программирования с помощью искусственного интеллекта.
boy illustration
Познакомьтесь с принципами сверточных нейронных сетей (CNN) в одной статье (суперподробно)
boy illustration
50,3 тыс. звезд! Immich: автономное решение для резервного копирования фотографий и видео, которое экономит деньги и избавляет от беспокойства.
boy illustration
Cloud Native|Практика: установка Dashbaord для K8s, графика неплохая
boy illustration
Краткий обзор статьи — использование синтетических данных при обучении больших моделей и оптимизации производительности
boy illustration
MiniPerplx: новая поисковая система искусственного интеллекта с открытым исходным кодом, спонсируемая xAI и Vercel.
boy illustration
Конструкция сервиса Synology Drive сочетает проникновение в интрасеть и синхронизацию папок заметок Obsidian в облаке.
boy illustration
Центр конфигурации————Накос
boy illustration
Начинаем с нуля при разработке в облаке Copilot: начать разработку с минимальным использованием кода стало проще
boy illustration
[Серия Docker] Docker создает мультиплатформенные образы: практика архитектуры Arm64
boy illustration
Обновление новых возможностей coze | Я использовал coze для создания апплета помощника по исправлению домашних заданий по математике
boy illustration
Советы по развертыванию Nginx: практическое создание статических веб-сайтов на облачных серверах
boy illustration
Feiniu fnos использует Docker для развертывания личного блокнота Notepad
boy illustration
Сверточная нейронная сеть VGG реализует классификацию изображений Cifar10 — практический опыт Pytorch
boy illustration
Начало работы с EdgeonePages — новым недорогим решением для хостинга веб-сайтов
boy illustration
[Зона легкого облачного игрового сервера] Управление игровыми архивами
boy illustration
Развертывание SpringCloud-проекта на базе Docker и Docker-Compose