MQTT — это легкий протокол связи, подходящий для Интернета вещей (IoT) и сетевых сред с низкой пропускной способностью. Он основан на модели «публикация/подписка», при которой устройства отправляют данные (также называемые «публикацией») брокеру (называемому MQTT-брокером), который сохраняется и при необходимости пересылается подписчикам. Этот подход упрощает управление сетью, позволяет нескольким устройствам обмениваться данными в различных сетевых условиях (включая ограничения по задержке и пропускной способности) и поддерживает обновление данных в реальном времени. Он открыт, бесплатен в использовании и прост в реализации.
Сообщения протокола MQTT состоят из двух частей: фиксированного заголовка и переменного заголовка.
Формат фиксированного заголовка единый и включает два поля: тип сообщения и оставшаяся длина.
Формат заголовка переменной зависит от типа сообщения.
Ниже приводится описание переменных полей заголовка каждого типа сообщения в протоколе MQTT.
(1) CONNECT: сообщение запроса на соединение MQTT.
Сообщение CONNECT состоит из двух частей: фиксированного заголовка и переменного заголовка. Среди них первый байт фиксированного заголовка (то есть комбинация типа сообщения и бита флага) имеет значение 0x10, что указывает на то, что это сообщение CONNECT.
Заголовок переменной включает в себя следующие поля:
(2) CONNACK: ответное сообщение о соединении MQTT.
Сообщение CONNACK включает в себя фиксированный заголовок и переменный заголовок. Среди них первый байт фиксированного заголовка — 0x20, что указывает на то, что это сообщение CONNACK.
Заголовок переменной включает в себя следующие поля:
(3) ПУБЛИКАЦИЯ: MQTT публикует сообщения.
Сообщение PUBLISH включает фиксированный заголовок и переменный заголовок, а также тело сообщения. Среди них первый байт фиксированного заголовка состоит из типа сообщения и уровня QoS. Уровень QoS может быть 0, 1 или 2.
Заголовок переменной включает в себя следующие поля:
Тело сообщения содержит содержимое публикуемого сообщения.
(4) PUBACK: сообщение с подтверждением выпуска MQTT.
Сообщение PUBACK включает фиксированный заголовок и переменный заголовок. Среди них первый байт фиксированного заголовка — 0x40, что указывает на то, что это сообщение PUBACK.
Заголовок переменной включает только поле идентификатора пакета (Packet Identifier), которое используется для подтверждения сообщения о выпуске с уровнем QoS 1.
(5) PUBREC: MQTT публикует и получает сообщения.
Сообщение PUBREC включает фиксированный заголовок и переменный заголовок. Среди них первый байт фиксированного заголовка — 0x50, что указывает на то, что это сообщение PUBREC.
Заголовок переменной включает только поле идентификатора пакета (Packet Identifier), которое используется для подтверждения сообщений публикации QoS уровня 2.
(6) PUBREL: MQTT публикует сообщения о выпуске.
Сообщения PUBREL включают фиксированный заголовок и переменный заголовок. Среди них первый байт фиксированного заголовка — 0x62, что указывает на то, что это сообщение PUBREL.
Заголовок переменной включает только поле идентификатора пакета (Packet Identifier), которое используется для подтверждения сообщений публикации QoS уровня 2.
(7) PUBCOMP: сообщение о завершении выпуска MQTT.
Сообщение PUBCOMP состоит из двух частей: фиксированного заголовка и переменного заголовка. Среди них первый байт фиксированного заголовка — 0x70, что указывает на то, что это сообщение PUBCOMP.
Заголовок переменной включает только поле идентификатора пакета (Packet Identifier), которое используется для подтверждения сообщений публикации QoS уровня 2.
(8) ПОДПИСАТЬСЯ: сообщение с запросом на подписку MQTT.
Сообщение SUBSCRIBE включает фиксированный заголовок и переменный заголовок. Среди них первый байт фиксированного заголовка — 0x82, что указывает на то, что это сообщение SUBSCRIBE.
Заголовок переменной включает в себя следующие поля:
(9) SUBACK: сообщение подтверждения подписки MQTT.
Сообщение SUBACK состоит из двух частей: фиксированного заголовка и переменного заголовка. Среди них первый байт фиксированного заголовка — 0x90, что указывает на то, что это сообщение SUBACK.
Заголовок переменной включает в себя следующие поля:
(10) ОТПИСАТЬСЯ: сообщение MQTT об отмене подписки.
Сообщение UNSUBSCRIBE состоит из двух частей: фиксированного заголовка и переменного заголовка. Среди них первый байт фиксированного заголовка — 0xA2, указывающий, что это сообщение UNSUBSCRIBE.
Заголовок переменной включает в себя следующие поля:
(11) UNSUBACK: сообщение подтверждения отказа от подписки MQTT.
Сообщение UNSUBACK состоит из двух частей: фиксированного заголовка и переменного заголовка. Среди них первый байт фиксированного заголовка — 0xB0, что указывает на то, что это сообщение UNSUBACK.
Заголовок переменной содержит только поле «Идентификатор пакета», которое используется для подтверждения запроса на отмену подписки.
(12) PINGREQ: сообщение с запросом контрольного сигнала MQTT.
Сообщение PINGREQ включает фиксированный заголовок и переменный заголовок. Среди них первый байт фиксированного заголовка — 0xC0, указывающий, что это сообщение PINGREQ.
Сообщения PINGREQ не содержат переменных полей заголовка.
(13) PINGRESP: ответное сообщение контрольного сигнала MQTT.
Сообщения PINGRESP включают фиксированный заголовок и переменный заголовок. Среди них первый байт фиксированного заголовка — 0xD0, что указывает на то, что это сообщение PINGRESP.
Сообщения PINGRESP не содержат переменных полей заголовка.
(14) DISCONNECT: сообщение об отключении MQTT.
Сообщения DISCONNECT включают фиксированный заголовок и переменный заголовок. Среди них первый байт фиксированного заголовка — 0xE0, указывающий, что это сообщение DISCONNECT.
Сообщения DISCONNECT не содержат переменных полей заголовка.
Это пример использования языка C для установления TCP-связи и отправки сообщений MQTT в Linux. Инкапсулируйте протокол самостоятельно согласно сообщению MQTT.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
// Определить тип сообщения MQTT
#define MQTT_CONNECT 0x10
#define MQTT_CONNACK 0x20
#define MQTT_PUBLISH 0x30
#define MQTT_PUBACK 0x40
#define MQTT_SUBSCRIBE 0x80
#define MQTT_SUBACK 0x90
#define MQTT_UNSUBSCRIBE 0xA0
#define MQTT_UNSUBACK 0xB0
#define MQTT_PINGREQ 0xC0
#define MQTT_PINGRESP 0xD0
#define MQTT_DISCONNECT 0xE0
// Определить флаг MQTTсоединять
#define MQTT_CONNECT_FLAG_CLEAN 0x02
#define MQTT_CONNECT_FLAG_WILL 0x04
#define MQTT_CONNECT_FLAG_WILL_QOS0 0x00
#define MQTT_CONNECT_FLAG_WILL_QOS1 0x08
#define MQTT_CONNECT_FLAG_WILL_QOS2 0x10
#define MQTT_CONNECT_FLAG_WILL_RETAIN 0x20
#define MQTT_CONNECT_FLAG_PASSWORD 0x40
#define MQTT_CONNECT_FLAG_USERNAME 0x80
// Определить структуру сообщения MQTT
typedef struct mqtt_packet
{
unsigned char *data;
unsigned int length;
}
mqtt_packet_t;
// Создайте сокетсоединять и верните дескриптор файла сокета.
int socket_connect(char *address, int port)
{
struct sockaddr_in server_address;
int socket_fd = socket(AF_INET, SOCK_STREAM, 0);
if (socket_fd == -1)
{
printf("Failed to create socket!\n");
return -1;
}
server_address.sin_family = AF_INET;
server_address.sin_port = htons(port);
if ((inet_pton(AF_INET, address, &server_address.sin_addr)) <= 0)
{
printf("Invalid address/ Address not supported\n");
return -1;
}
if (connect(socket_fd, (struct sockaddr *)&server_address, sizeof(server_address)) < 0)
{
printf("Connection Failed!\n");
return -1;
}
return socket_fd;
}
// Упаковка сообщения MQTTсоединять
mqtt_packet_t *mqtt_connect(char *client_id, char *username, char *password)
{
mqtt_packet_t *packet = (mqtt_packet_t *)malloc(sizeof(mqtt_packet_t));
unsigned char *data = (unsigned char *)malloc(256);
unsigned int length = 0;
// фиксированный заголовок
data[length++] = MQTT_CONNECT;
// переменный заголовок
data[length++] = 0x0C;
// Очистите флаги сеанса и номера версий протокола.
data[length++] = 'M';
data[length++] = 'Q';
data[length++] = 'T';
data[length++] = 'T';
data[length++] = 0x04;
// номер версии // соединятьлоготип
unsigned char flags = MQTT_CONNECT_FLAG_CLEAN;
if (username != NULL)
{
flags |= MQTT_CONNECT_FLAG_USERNAME;
}
if (password != NULL)
{
flags |= MQTT_CONNECT_FLAG_PASSWORD;
}
data[length++] = flags;
data[length++] = 0xFF;
// Держите время соединения низким 8 цифр
data[length++] = 0xFF;
// Держите время связи высоким 8 цифр // оставшаяся длина
unsigned char remaining_length = length - 1;
data[remaining_length++] = (unsigned char)(length - 2);
packet->data = data;
packet->length = length;
return packet;
}
// Отправить MQTT-сообщение
void mqtt_send(int socket_fd, mqtt_packet_t *packet)
{
if (send(socket_fd, packet->data, packet->length, 0) < 0)
{
printf("Failed to send message!\n");
}
}
// Получать сообщения MQTT
int mqtt_recv(int socket_fd, mqtt_packet_t *packet)
{
unsigned char header[2];
if (recv(socket_fd, header, 2, 0) != 2)
{
printf("Failed to receive message header!\n");
return -1
}
unsigned int remaining_length = 0;
unsigned int multiplier = 1;
int i = 1;
do
{
if (recv(socket_fd, &header[i], 1, 0) != 1)
{
printf("Failed to receive remaining_length byte %d!\n", i);
return -1;
}
remaining_length += (header[i] & 127) * multiplier;
multiplier *= 128;
i++;
}
while ((header[i - 1] & 128) != 0);
packet->length = remaining_length + i;
packet->data = (unsigned char *)malloc(packet->length);
memcpy(packet->data, header, 2);
if (recv(socket_fd, packet->data + 2, packet->length - 2, 0) != packet->length - 2)
{
printf("Failed to receive full message!\n");
return -1;
}
return 0;
}
int main(int argc, char *argv[])
{
// Учреждать TCP соединять
int socket_fd = socket_connect("test.mosquitto.org", 1883);
if (socket_fd == -1)
{
printf("Failed to connect to MQTT server!\n");
return -1;
}
printf("Connected to MQTT server!\n");
// Упаковать и отправить MQTT соединятьсообщение
mqtt_packet_t *connect_packet = mqtt_connect("test_client", NULL, NULL);
mqtt_send(socket_fd, connect_packet);
printf("Sent MQTT CONNECT packet!\n");
free(connect_packet->data);
free(connect_packet);
// перенимать MQTT CONNACK сообщение
mqtt_packet_t *connack_packet = (mqtt_packet_t *)malloc(sizeof(mqtt_packet_t));
if (mqtt_recv(socket_fd, connack_packet) != 0)
{
printf("Failed to receive MQTT CONNACK packet!\n");
return -1;
}
if (connack_packet->data[1] != 0x00)
{
printf("MQTT server rejected connection!\n");
return -1;
}
printf("Received MQTT CONNACK packet!\n");
free(connack_packet->data);
free(connack_packet);
// отключиться TCP соединять close(socket_fd); return 0;
}