Доброго здравия, уважаемые читатели!
В данной статье обсудим встроенный в ESP-IDF модуль MQTT-клиента – для чего он нужен, как использовать, настраивать и как использовать в своих проектах. Компонент MQTT-клиента называется esp-mqtt
и доступен не только из собственно фреймворка ESP-IDF, но и из фреймворка ArduinoEspressif32. То есть если вы используете в вашем проекте ESP32, то нет нужды использовать сторонние библиотеки (например PubSubClient или аналоги) для подключения к MQTT-серверу, даже если вы пишете скетч под Arduino – все уже есть и встроено, достаточно изучить и начать пользоваться. Чем мы, собственно, сейчас и займемся.
Если вы не знакомы с MQTT протоколом, рекомендую вам вначале ознакомиться с другой статьей “Что такое MQTT
и с чем его едят“ – там вы узнаете об основных компонентах MQTT-транспорта и принципах его работы. Здесь же я лишь в самых кратких словах остановлюсь на этом вопросе.
Осторожно – многа букав!
Обзор
ESP-MQTT — это реализация клиента протокола MQTT, который является облегченным протоколом обмена сообщениями на основе концепции «публикация/подписка». Теперь (теперь – а это с коих пор? – прим.авт.) ESP-MQTT поддерживает не только MQTT v3.1.1, но и MQTT v5.0.
ESP-MQTT является полнофункциональным клиентом и поддерживает следующие функции:
- Подключение к серверу посредством любого типа подключения: MQTT over TCP, SSL with Mbed TLS, MQTT over WebSocket, и MQTT over WebSocket Secure
- Возможность легкой настройки сервера через URI, но вообще имеется несколько вариантов
- Одновременная работа нескольких экземпляров (например для подключения к разным серверам)
- Поддерживаются все стандартные операции протокола: подписка, публикация, аутентификация, LWT-сообщения, keep alive pings и все три уровня QoS.
Отправка сообщений с разными уровнями QoS
В ESP-MQTT реализован механизм повторной отложенной отправки сообщений с использованием очереди исходящих сообщений (outbox).
Существует два способа создания нового сообщения MQTT:
- С помощью блокирующей функции
esp_mqtt_client_publish()
– эта функция пытается отправить сообщение немедленно. - С помощью её неблокирующего аналога
esp_mqtt_client_enqueue()
– эта функция всегда ставит сообщение в очередь отправки.
При этом поведение клиента зависит от уровня QoS, заданного при отправке:
- Сообщения с QoS 0 всегда отправляются только один раз.
- Сообщения с QoS 1 и 2 ведут себя по-разному, поскольку протокол требует дополнительных шагов для завершения процесса. Библиотека ESP-MQTT всегда повторно передает неподтвержденные сообщения публикации QoS 1 и 2, чтобы избежать потери сообщений при сбоях в соединениях (хотя спецификация MQTT требует повторной передачи только при повторном подключении, когда флаг «Clear Session» установлен на 0).
Таким образом, сообщения с QoS 1 и 2, которым может потребоваться повторная передача, всегда ставятся в очередь. Но если для создания сообщения была использована функция esp_mqtt_client_publish()
, то первая попытка передачи произойдет немедленно. Повторная попытка передачи для неподтвержденных сообщений произойдет после времени, заданного в message_retransmit_timeout
при создании подключения.
После истечения времени, заданного через menuconfig в макросе CONFIG_MQTT_OUTBOX_EXPIRED_TIMEOUT_MS, сообщения считаются устаревшими и удаляются, даже если они не были отправлены. При этом, если был включен (enabled) макрос CONFIG_MQTT_REPORT_DELETED_MESSAGES , то будет сгенерировано соответствующее событие для уведомления пользователя.
Использование ESP MQTT API
MQTT-клиент – это отдельная задача FreeRTOS, которая работает “в фоне”, а общение с ней происходит посредством вызова API-шных функций, а также событий и функций обратного вызова. ESP MQTT API имеет потоко-безопасность, и его можно использовать из разных задач одновременно.
Конфигурация ESP MQTT
Прежде чем переходить непосредственно к коду, стоит заглянуть в меню конфигурации ESP-IDF, например как описано здесь. Настроек ESP MQTT не очень много и все они находятся в разделе Component config → ESP-MQTT Configurations:
Здесь вы можете выбрать:
- Какие версии протокола MQTT вы собираетесь использовать в проекте. Конечно, можно оставить обе версии, но это может увеличить размер полученного бинарника, поэтому все что не нужно – смело “выпиливаем”.
- Enable MQTT over SSL – нужна ли вам поддержка защищенных соединений. Я советую так: если сервер “внешний” – то обязательно нужна, если локальный (внутри сети, например HA или на роутере) – то не обязательна.
- Enable MQTT over Websocket – нужна ли вам поддержка вебсокетов.
- Use Incremental Message Id – Установите
[*]
, чтобы идентификатор сообщения генерировался как инкрементное число, а не как случайное значение. По умолчанию используется случайное значение. - Skip publish if disconnected – Активация этой опции заставляет API отбрасывать любые сообщения, если клиент не подключен к серверу. Если эта опция выключена, то сообщения с QoS > 0 будут в любом случае добавлены во внутренний исходящий почтовый ящик для публикации позже, даже если клиент отключен. Это может привести к переполнению памяти в некоторых случаях. Параметр
MQTT_SKIP_PUBLISH_IF_DISCONNECTED
позволяет приложениям переопределять это поведение и не ставить пакеты публикации в очередь в отключенном состоянии. - Report deleted messages – Установите
[*]
, чтобы генерировались соответствующие события для сообщений, которые были удалены из папки «исходящие» до того, как они были корректно отправлены и подтверждено их получение. - MQTT Using custom configurations – разрешить пользователю самому настроить конфигурацию MQTT-брокера.
- Default MQTT over TCP port и Default MQTT over SSL port – здесь вы можете указать порты, используемые для подключения к брокеру “по умолчанию”. Лично я предпочитаю указать их явно при настройке подключения.
- Default MQTT Buffer Size – укажите здесь размер буфера, который используется как для передачи, так и для приема сообщений. Это не размер исходящего почтового ящика (очереди)!
- MQTT task stack size – размер стека для задачи MQTT-клиента. Тут все зависит от ваших потребностей. Например при настройках по умолчанию 6144 ( 6 кБ ) у меня стабильно остается чуть больше 2 кБ стека “не занято”. Поэтому, когда придется идти на режим “жестой экономии” памяти, можно смело уменьшить это значение до 5 кБ ( 5120 ).
- Disable API locks – настройки по умолчанию используют блокировки для защиты внутренних структур. Можно отключить эти блокировки, если пользовательский код не обращается к MQTT API из нескольких параллельных задач, чем ускорите работу и сэкономите память.
- MQTT task priority – приоритет для задачи MQTT-клиента. MQTT не такая важная задача, чтобы ставить ее важнее прикладной. Я обычно не изменяю значение по умолчанию. Если прикладная задача написана правильно, с периодическим уходом в спячку, проблем не будет.
- MQTT transport poll read timeut – таймаут в миллисекундах для операции чтения TCP/IP. Для нормальных сетей 1000 мс – более чем достаточно, но для каких-нибудь 2G – можно попробовать увеличить это значение.
- Number of queued events – количество событий в очереди. Для меня это на текущий момент загадка – help по этому поводу “молчит”, но это явно не размер исходящего ящика.
- Enable MQTT task core selection – разрешает выбор пользователю ядра, на котором будет работать задача MQTT. По умолчанию это CORE0 – системное ядро.
- Use external memory for outbox data – очень полезная опция, если у вас в модуле присутствует дополнительная “внешняя” память (SPIRAM). Позволяет организовать исходящий почтовый ящик не в основной куче, а на внешней памяти. Поскольку сетевые операции передачи сообщений все равно медленнее, чем чтение по QSPI, это практически не повлияет на производительность клиента.
- Enable custom outbox implementation – с помощью этой опции вы можете включить возможность самому написать свою реализацию очереди исходящих сообщений, например с лимитами и приоритетами. Не пользовался пока что, не знаю.
- Outbox message expired timeout[ms] – период времени в миллисекундах, после истечения которого не отправленные сообщения в очереди исходящих сообщений считаются устаревшими и подлежат немедленному уничтожению. По умолчанию это 30 секунд – если за 30 секунд не удалось отправить сообщение по любой причине, то все, оно потеряно.
Не забудьте сохранить настройки перед выходом из утилиты.
Настройка и запуск MQTT – клиента
Прежде чем начинать пользоваться данным API, необходимо включить в проект заголовочный файл:
#include "mqtt_client.h"
Далее необходимо настроить подключение к серверу. Самая сложная часть работы, далее вы поймете почему. Для этого используем функцию esp_mqtt_client_init():
esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *config)
Для этого ей необходимо передать ссылку на довольно сложную структуру esp_mqtt_client_config_t, посмотрите как она выглядит:
/** * *MQTT* client configuration structure * * - Default values can be set via menuconfig * - All certificates and key data could be passed in PEM or DER format. PEM format must have a terminating NULL * character and the related len field set to 0. DER format requires a related len field set to the correct length. */ typedef struct esp_mqtt_client_config_t { /** * Broker related configuration */ struct broker_t { /** * Broker address * * - uri have precedence over other fields * - If uri isn't set at least hostname, transport and port should. */ struct address_t { const char *uri; /*!< Complete *MQTT* broker URI */ const char *hostname; /*!< Hostname, to set ipv4 pass it as string) */ esp_mqtt_transport_t transport; /*!< Selects transport*/ const char *path; /*!< Path in the URI*/ uint32_t port; /*!< *MQTT* server port */ } address; /*!< Broker address configuration */ /** * Broker identity verification * * If fields are not set broker's identity isn't verified. it's recommended * to set the options in this struct for security reasons. */ struct verification_t { bool use_global_ca_store; /*!< Use a global ca_store, look esp-tls documentation for details. */ esp_err_t (*crt_bundle_attach)(void *conf); /*!< Pointer to ESP x509 Certificate Bundle attach function for the usage of certificate bundles. Client only attach the bundle, the clean up must be done by the user. */ const char *certificate; /*!< Certificate data, default is NULL. It's not copied nor freed by the client, user needs to clean up.*/ size_t certificate_len; /*!< Length of the buffer pointed to by certificate. */ const struct psk_key_hint *psk_hint_key; /*!< Pointer to PSK struct defined in esp_tls.h to enable PSK authentication (as alternative to certificate verification). PSK is enabled only if there are no other ways to verify broker. It's not copied nor freed by the client, user needs to clean up.*/ bool skip_cert_common_name_check; /*!< Skip any validation of server certificate CN field, this reduces the security of TLS and makes the *MQTT* client susceptible to MITM attacks */ const char **alpn_protos; /*!< NULL-terminated list of supported application protocols to be used for ALPN.*/ const char *common_name; /*!< Pointer to the string containing server certificate common name. If non-NULL, server certificate CN must match this name, If NULL, server certificate CN must match hostname. This is ignored if skip_cert_common_name_check=true. It's not copied nor freed by the client, user needs to clean up.*/ } verification; /*!< Security verification of the broker */ } broker; /*!< Broker address and security verification */ /** * Client related credentials for authentication. */ struct credentials_t { const char *username; /*!< *MQTT* username */ const char *client_id; /*!< Set *MQTT* client identifier. Ignored if set_null_client_id == true If NULL set the default client id. Default client id is ``ESP32_%CHIPID%`` where `%CHIPID%` are last 3 bytes of MAC address in hex format */ bool set_null_client_id; /*!< Selects a NULL client id */ /** * Client authentication * * Fields related to client authentication by broker * * For mutual authentication using TLS, user could select certificate and key, * secure element or digital signature peripheral if available. * */ struct authentication_t { const char *password; /*!< *MQTT* password */ const char *certificate; /*!< Certificate for ssl mutual authentication, not required if mutual authentication is not needed. Must be provided with `key`. It's not copied nor freed by the client, user needs to clean up.*/ size_t certificate_len; /*!< Length of the buffer pointed to by certificate.*/ const char *key; /*!< Private key for SSL mutual authentication, not required if mutual authentication is not needed. If it is not NULL, also `certificate` has to be provided. It's not copied nor freed by the client, user needs to clean up.*/ size_t key_len; /*!< Length of the buffer pointed to by key.*/ const char *key_password; /*!< Client key decryption password, not PEM nor DER, if provided `key_password_len` must be correctly set.*/ int key_password_len; /*!< Length of the password pointed to by `key_password` */ bool use_secure_element; /*!< Enable secure element, available in ESP32-ROOM-32SE, for SSL connection */ void *ds_data; /*!< Carrier of handle for digital signature parameters, digital signature peripheral is available in some Espressif devices. It's not copied nor freed by the client, user needs to clean up.*/ } authentication; /*!< Client authentication */ } credentials; /*!< User credentials for broker */ /** * *MQTT* Session related configuration */ struct session_t { /** * Last Will and Testament message configuration. */ struct last_will_t { const char *topic; /*!< LWT (Last Will and Testament) message topic */ const char *msg; /*!< LWT message, may be NULL terminated*/ int msg_len; /*!< LWT message length, if msg isn't NULL terminated must have the correct length */ int qos; /*!< LWT message QoS */ int retain; /*!< LWT retained message flag */ } last_will; /*!< Last will configuration */ bool disable_clean_session; /*!< *MQTT* clean session, default clean_session is true */ int keepalive; /*!< *MQTT* keepalive, default is 120 seconds When configuring this value, keep in mind that the client attempts to communicate with the broker at half the interval that is actually set. This conservative approach allows for more attempts before the broker's timeout occurs */ bool disable_keepalive; /*!< Set `disable_keepalive=true` to turn off keep-alive mechanism, keepalive is active by default. Note: setting the config value `keepalive` to `0` doesn't disable keepalive feature, but uses a default keepalive period */ esp_mqtt_protocol_ver_t protocol_ver; /*!< *MQTT* protocol version used for connection.*/ int message_retransmit_timeout; /*!< timeout for retransmitting of failed packet */ } session; /*!< *MQTT* session configuration. */ /** * Network related configuration */ struct network_t { int reconnect_timeout_ms; /*!< Reconnect to the broker after this value in miliseconds if auto reconnect is not disabled (defaults to 10s) */ int timeout_ms; /*!< Abort network operation if it is not completed after this value, in milliseconds (defaults to 10s). */ int refresh_connection_after_ms; /*!< Refresh connection after this value (in milliseconds) */ bool disable_auto_reconnect; /*!< Client will reconnect to server (when errors/disconnect). Set `disable_auto_reconnect=true` to disable */ esp_transport_handle_t transport; /*!< Custom transport handle to use. Warning: The transport should be valid during the client lifetime and is destroyed when esp_mqtt_client_destroy is called. */ struct ifreq * if_name; /*!< The name of interface for data to go through. Use the default interface without setting */ } network; /*!< Network configuration */ /** * Client task configuration */ struct task_t { int priority; /*!< *MQTT* task priority*/ int stack_size; /*!< *MQTT* task stack size*/ } task; /*!< FreeRTOS task configuration.*/ /** * Client buffer size configuration * * Client have two buffers for input and output respectivelly. */ struct buffer_t { int size; /*!< size of *MQTT* send/receive buffer*/ int out_size; /*!< size of *MQTT* output buffer. If not defined, defaults to the size defined by ``buffer_size`` */ } buffer; /*!< Buffer size configuration.*/ /** * Client outbox configuration options. */ struct outbox_config_t { uint64_t limit; /*!< Size limit for the outbox in bytes.*/ } outbox; /*!< Outbox configuration. */ } esp_mqtt_client_config_t;
Как говорится – “зацените масштаб проблемы”. Немножко громоздко??? Таки да. Что ж, придется со всем этим разбираться.
Указанная структура esp_mqtt_client_config_t
содержит внутри себя несколько вложенных структур поменьше, как в матрешке:
broker_t
– параметры брокера, в том числе его адрес и параметры SSLcredentials_t
– параметры учетной записи, с помощью которой вы собираетесь подключаться к серверуsession_t
– параметры сессииnetwork_t
– параметры транспортного уровня (сети передачи данных)task_t
– параметры задачи MQTT-клиентаbuffer_t
– параметры буферов приема / передачи транспортного уровняoutbox_config_t
– параметры исходящей очереди (“почтового ящика”)
Некоторые из параметров “перекрывают” параметры, заданные в menuconfig. Если их не заполнять, то значения будут подтянуты из настроек проекта.
Совет: чтобы не заполнять все подряд, в том числе и то, что вам абсолютно не нужно, можно сделать так (один из вариантов):
esp_mqtt_client_config_t mqttCfg; memset(&mqttCfg, 0, sizeof(esp_mqtt_client_config_t));
1. Параметры брокера
Структура broker_t
выглядит так:
struct broker_t { /** * Broker address * * - uri have precedence over other fields * - If uri isn't set at least hostname, transport and port should. */ struct address_t { const char *uri; /*!< Complete *MQTT* broker URI */ const char *hostname; /*!< Hostname, to set ipv4 pass it as string) */ esp_mqtt_transport_t transport; /*!< Selects transport*/ const char *path; /*!< Path in the URI*/ uint32_t port; /*!< *MQTT* server port */ } address; /*!< Broker address configuration */ /** * Broker identity verification * * If fields are not set broker's identity isn't verified. it's recommended * to set the options in this struct for security reasons. */ struct verification_t { bool use_global_ca_store; /*!< Use a global ca_store, look esp-tls documentation for details. */ esp_err_t (*crt_bundle_attach)(void *conf); /*!< Pointer to ESP x509 Certificate Bundle attach function for the usage of certificate bundles. Client only attach the bundle, the clean up must be done by the user. */ const char *certificate; /*!< Certificate data, default is NULL. It's not copied nor freed by the client, user needs to clean up.*/ size_t certificate_len; /*!< Length of the buffer pointed to by certificate. */ const struct psk_key_hint *psk_hint_key; /*!< Pointer to PSK struct defined in esp_tls.h to enable PSK authentication (as alternative to certificate verification). PSK is enabled only if there are no other ways to verify broker. It's not copied nor freed by the client, user needs to clean up.*/ bool skip_cert_common_name_check; /*!< Skip any validation of server certificate CN field, this reduces the security of TLS and makes the *MQTT* client susceptible to MITM attacks */ const char **alpn_protos; /*!< NULL-terminated list of supported application protocols to be used for ALPN.*/ const char *common_name; /*!< Pointer to the string containing server certificate common name. If non-NULL, server certificate CN must match this name, If NULL, server certificate CN must match hostname. This is ignored if skip_cert_common_name_check=true. It's not copied nor freed by the client, user needs to clean up.*/ } verification; /*!< Security verification of the broker */ } broker; /*!< Broker address and security verification */
В свою очередь, здесь две вложенных структуры – address_t
и verification_t
.
1. address_t
отвечает за настройку адреса и других параметров сервера:
struct address_t { const char *uri; /*!< Complete *MQTT* broker URI */ const char *hostname; /*!< Hostname, to set ipv4 pass it as string) */ esp_mqtt_transport_t transport; /*!< Selects transport*/ const char *path; /*!< Path in the URI*/ uint32_t port; /*!< *MQTT* server port */ } address; /*!< Broker address configuration */
где:
uri
– полный путь к брокеру с обязательным указанием префикса протокола и номера порта – напримерssl://server.com:8883
. Ну или если порт не указан, то конфигуратор возьмет его из настроек sdkconfig.h, которые мы рассматривали выше.
или есть альтернативный вариант:
hostname
– имя хоста или его адрес в локальной сети или сети интернетtransport
– тип транспортного протокола, который будет использоваться для передачи пакетов:MQTT_TRANSPORT_OVER_TCP
,MQTT_TRANSPORT_OVER_SSL
,MQTT_TRANSPORT_OVER_WS
,MQTT_TRANSPORT_OVER_WSS
path
– дополнительный путь к скрипту на сервере, если он вдруг используется (относительно хоста)port
– номер порта брокера, если указать 0, то будет взят в соответствии с параметрами конфигурации, которые мы рассматривали выше.
Чтобы не запутаться – у вас есть два пути:
- настроить только
uri
– в этом случае система сама распарситuri
и вытащит оттуда все остальные поля самостоятельно. - настроить все поля, кроме
uri
, по отдельности
Что вы предпочитаете – решать только вам.
Пример заполнения:
esp_mqtt_client_config_t mqttCfg; memset(&mqttCfg, 0, sizeof(esp_mqtt_client_config_t)); ... mqttCfg.broker.address.hostname = "192.168.1.1"; mqttCfg.broker.address.transport = MQTT_TRANSPORT_OVER_TCP; mqttCfg.broker.address.port = 1833;
2. verification_t
– параметры SSL, если оно используется
Что это такое, SSL и TLS, и зачем оно вообще хоть кому-то надо, я уже рассказывал в другой статье. Если вы впервые сталкиваетесь с этим – рекомендую ознакомиться.
Здесь же отмечу только два факта: а) вопреки расхожему мнению, что ESP32 “не тянет” TLS, то могу сказать , что таки “тянет”, и довольно успешно; и б) “никто этим все равно не пользуется” – а вот это вы определенно зря. Но – если сервер находится в локальной сети за NAT-ом и доступа “наружу” нет, то данную структуру действительно можно не заполнять.
struct verification_t { bool use_global_ca_store; /*!< Use a global ca_store, look esp-tls documentation for details. */ esp_err_t (*crt_bundle_attach)(void *conf); /*!< Pointer to ESP x509 Certificate Bundle attach function for the usage of certificate bundles. Client only attach the bundle, the clean up must be done by the user. */ const char *certificate; /*!< Certificate data, default is NULL. It's not copied nor freed by the client, user needs to clean up.*/ size_t certificate_len; /*!< Length of the buffer pointed to by certificate. */ const struct psk_key_hint *psk_hint_key; /*!< Pointer to PSK struct defined in esp_tls.h to enable PSK authentication (as alternative to certificate verification). PSK is enabled only if there are no other ways to verify broker. It's not copied nor freed by the client, user needs to clean up.*/ bool skip_cert_common_name_check; /*!< Skip any validation of server certificate CN field, this reduces the security of TLS and makes the *MQTT* client susceptible to MITM attacks */ const char **alpn_protos; /*!< NULL-terminated list of supported application protocols to be used for ALPN.*/ const char *common_name; /*!< Pointer to the string containing server certificate common name. If non-NULL, server certificate CN must match this name, If NULL, server certificate CN must match hostname. This is ignored if skip_cert_common_name_check=true. It's not copied nor freed by the client, user needs to clean up.*/ } verification; /*!< Security verification of the broker */
где:
use_global_ca_store
– указывает, следует ли использовать глобальное хранилище сертификатов, в противном случае следует заполнить полеcertificate
.(*crt_bundle_attach)(void *conf)
– указатель на функцию, подключающее глобальный сборник корневых сертификатов из хранилища корневых сертификатов Mozilla NSS – вам не нужно будет заботится о подключении корневого сертификата сервера, но взамен вы расплачиваетесь размером свободной памятиcertificate
– указатель на первый байт корневого сертификата сервераcertificate_len
– длина корневого сертификата сервераpsk_hint_key
– указатель на структуру PSK для включения аутентификации PSK (как альтернативы проверке сертификата). PSK используется только в том случае, если нет других способов проверки брокера. Он не копируется и не освобождается клиентом, пользователю необходимо удалить его из памяти при необходимости.skip_cert_common_name_check
– позволяет пропустить проверку сервера по сертификату. То есть как бы SSL, но не совсем. Не рекомендую этого делать без крайней необходимости и только тогда, когда вы понимаете для чего вы это делаете – это не способ исправления ваших ошибок.alpn_protos
– список поддерживаемых протоколов шифрования, завершающийся нулем, для использования в ALPN. Не заполняйте это, если не знаете для чего оно нужно. Я – не знаю 😉common_name
– Указатель на строку, содержащую общее имя сертификата сервера. Если не NULL, CN сертификата сервера должен соответствовать этому имени, Если NULL, CN сертификата сервера должен соответствовать имени хоста. Это игнорируется, если skip_cert_common_name_check=true
Я обычно заполняю только поля certificate
и certificate_len
, ну и use_global_ca_store
при необходимости.
esp_mqtt_client_config_t mqttCfg; memset(&mqttCfg, 0, sizeof(esp_mqtt_client_config_t)); ... mqttCfg.broker.address.hostname = "mqtt.ru"; mqttCfg.broker.address.port = 8883; mqttCfg.broker.address.transport = MQTT_TRANSPORT_OVER_SSL; mqttCfg.broker.verification.skip_cert_common_name_check = false; mqttCfg.broker.verification.use_global_ca_store = false; mqttCfg.broker.verification.certificate = (const char *)mqtt_broker_pem_start; mqttCfg.broker.verification.certificate_len = mqtt_broker_pem_end - mqtt_broker_pem_start;
Фух, с эти справились (вытираем пот со лба). Но не расслабляемся – спереди еще много работы.
2. Параметры учетной записи
Далее нам предстоит заполнить данные вашей учетной записи, с помощью которой будем подключаться к серверу. Все “серьезные” брокеры работают только через логин / пароль, ну или бывает токен безопасности. Конечно, есть варианты, на которых авторизация не требуется, но они только для целей тестирования.
Структура credentials_t
также, как и в предыдущем случае, содержит вложенную структуру authentication_t
:
struct credentials_t { const char *username; /*!< *MQTT* username */ const char *client_id; /*!< Set *MQTT* client identifier. Ignored if set_null_client_id == true If NULL set the default client id. Default client id is ``ESP32_%CHIPID%`` where `%CHIPID%` are last 3 bytes of MAC address in hex format */ bool set_null_client_id; /*!< Selects a NULL client id */ /** * Client authentication * * Fields related to client authentication by broker * * For mutual authentication using TLS, user could select certificate and key, * secure element or digital signature peripheral if available. * */ struct authentication_t { const char *password; /*!< *MQTT* password */ const char *certificate; /*!< Certificate for ssl mutual authentication, not required if mutual authentication is not needed. Must be provided with `key`. It's not copied nor freed by the client, user needs to clean up.*/ size_t certificate_len; /*!< Length of the buffer pointed to by certificate.*/ const char *key; /*!< Private key for SSL mutual authentication, not required if mutual authentication is not needed. If it is not NULL, also `certificate` has to be provided. It's not copied nor freed by the client, user needs to clean up.*/ size_t key_len; /*!< Length of the buffer pointed to by key.*/ const char *key_password; /*!< Client key decryption password, not PEM nor DER, if provided `key_password_len` must be correctly set.*/ int key_password_len; /*!< Length of the password pointed to by `key_password` */ bool use_secure_element; /*!< Enable secure element, available in ESP32-ROOM-32SE, for SSL connection */ void *ds_data; /*!< Carrier of handle for digital signature parameters, digital signature peripheral is available in some Espressif devices. It's not copied nor freed by the client, user needs to clean up.*/ } authentication; /*!< Client authentication */ } credentials; /*!< User credentials for broker */
где:
username
– имя пользователя MQTT-сервера должно быть указано в любом случаеclient_id
– уникальный идентификатор клиента. Если оставить NULL, то API сгенерирует его автоматически по шаблонуESP32_%CHIPID%
, где%CHIPID%
– последние три цифры MAC-адреса в HEX формате. Данный параметр игнорируется, еслиset_null_client_id
== true.set_null_client_id
– позволяет вообще оставить пустымclient_id
, если сервер его не требует.
и данные для дополнительной аутентификации authentication_t
, заполняемые опционально:
authentication.password
– пароль пользователя (если есть)authentication.certificate
– сертификат клиента для взаимной SSL-аутентификации, если таковая требуется. Должен быть указан вместе сkey
.authentication.certificate_len
– длина буфера клиентского сертификата, если он указанauthentication.key
– приватный ключ для для взаимной SSL-аутентификации, если таковая требуется.authentication.key_len
– длина буфера приватного ключаauthentication.key_password
– пароль для расшифровки клиентского ключа, это не PEM и не DER.authentication.key_password_len
– длина буфера пароль для расшифровки клиентского ключаauthentication.use_secure_element
– для ESP32-ROOM-32SE можно использовать встроенный аппаратный secure element для ускорения расчетовauthentication.ds_data
– хендл дескриптора для цифровой подписи, периферийное устройство цифровой подписи доступно в некоторых устройствах Espressif
В реальных проектах мне пока приходилось использовать всего три поля, этого вполне достаточно:
esp_mqtt_client_config_t mqttCfg; memset(&mqttCfg, 0, sizeof(esp_mqtt_client_config_t)); ... mqttCfg.credentials.username = "login"; mqttCfg.credentials.authentication.password = "password"; mqttCfg.credentials.client_id = "esp32_telemeter";
или так:
esp_mqtt_client_config_t mqttCfg; memset(&mqttCfg, 0, sizeof(esp_mqtt_client_config_t)); ... mqttCfg.credentials.username = "login"; mqttCfg.credentials.authentication.password = "password"; mqttCfg.credentials.client_id = NULL; // ESP32_%CHIPID%
3. Параметры сессии
Параметры сессии включают в себя и LWT-сообщение. То есть, несмотря на то, что стандартом MQTT предусмотрена возможность задать LWT-сообщение для нескольких топиков (тем) одновременно, в данной реализации клиента это возможно сделать только единожды:
struct session_t { /** * Last Will and Testament message configuration. */ struct last_will_t { const char *topic; /*!< LWT (Last Will and Testament) message topic */ const char *msg; /*!< LWT message, may be NULL terminated*/ int msg_len; /*!< LWT message length, if msg isn't NULL terminated must have the correct length */ int qos; /*!< LWT message QoS */ int retain; /*!< LWT retained message flag */ } last_will; /*!< Last will configuration */ bool disable_clean_session; /*!< *MQTT* clean session, default clean_session is true */ int keepalive; /*!< *MQTT* keepalive, default is 120 seconds When configuring this value, keep in mind that the client attempts to communicate with the broker at half the interval that is actually set. This conservative approach allows for more attempts before the broker's timeout occurs */ bool disable_keepalive; /*!< Set `disable_keepalive=true` to turn off keep-alive mechanism, keepalive is active by default. Note: setting the config value `keepalive` to `0` doesn't disable keepalive feature, but uses a default keepalive period */ esp_mqtt_protocol_ver_t protocol_ver; /*!< *MQTT* protocol version used for connection.*/ int message_retransmit_timeout; /*!< timeout for retransmitting of failed packet */ } session; /*!< *MQTT* session configuration. */
где:
disable_clean_session
– отключить “чистую сессию“. Флаг “чистый сеанс” намеренно инвертирован, чтобы при инициализации структуры нулями чистая сессия была по умолчанию.keepalive
– интервал пинга, с помощью которого клиент оповещает сервер, что “жив”, по умолчанию 120 секунд. Установка значения в 0 не отключает механизм keepalive, а использует значение по умолчанию.disable_keepalive
– отключает механизм keepalive совсемprotocol_ver
– задайте версию протокола, которую следует использовать для подключения к серверу:MQTT_PROTOCOL_UNDEFINED
,MQTT_PROTOCOL_V_3_1
,MQTT_PROTOCOL_V_3_1_1
илиMQTT_PROTOCOL_V_5
message_retransmit_timeout
– укажите интервал в миллисекундах, через который будет предпринята попытка повторной отправки сообщений с QoS > 0
и параметры LWT-сообщения:
last_will.topic
– топик ( тема )last_will.msg
– содержимое завещанияlast_will.msg_len
– длина сообщения завещанияlast_will.qos
– качество обслуживания: 0, 1 или 2last_will.retain
– сохранять это сообщение на сервередля будущих потомковили нет
Пример заполнения:
esp_mqtt_client_config_t mqttCfg; memset(&mqttCfg, 0, sizeof(esp_mqtt_client_config_t)); ... mqttCfg.session.disable_clean_session = false; mqttCfg.session.keepalive = 60; mqttCfg.session.disable_keepalive = false; mqttCfg.session.last_will.topic = "device1/status"; mqttCfg.session.last_will.msg = "offline"; mqttCfg.session.last_will.msg_len = 6; mqttCfg.session.last_will.qos = 1; mqttCfg.session.last_will.retain = false;
4. Параметры транспортного уровня (сети передачи данных)
Здесь все проще:
struct network_t { int reconnect_timeout_ms; /*!< Reconnect to the broker after this value in miliseconds if auto reconnect is not disabled (defaults to 10s) */ int timeout_ms; /*!< Abort network operation if it is not completed after this value, in milliseconds (defaults to 10s). */ int refresh_connection_after_ms; /*!< Refresh connection after this value (in milliseconds) */ bool disable_auto_reconnect; /*!< Client will reconnect to server (when errors/disconnect). Set `disable_auto_reconnect=true` to disable */ esp_transport_handle_t transport; /*!< Custom transport handle to use. Warning: The transport should be valid during the client lifetime and is destroyed when esp_mqtt_client_destroy is called. */ struct ifreq * if_name; /*!< The name of interface for data to go through. Use the default interface without setting */ } network; /*!< Network configuration */
где:
reconnect_timeout_ms
– интервал в миллисекундах, через который будет предпринята попытка повторного подключения к брокеру, если автоматическое повторное подключение не отключено (по умолчанию 10 с)timeout_ms
– таймаут для сетевых операций (чтение / запись сокета) (по умолчанию 10 с)refresh_connection_after_ms
– обновить соединение после этого значения (в миллисекундах)disable_auto_reconnect
– с помощью этого параметра можно отключить автоматическое переподключение к брокеру (например если вы сами его восстанавливаете)transport
– указатель на хендл транспортного сетевого протокола.if_name
– имя сетевого NETIF-интерфейса, если их несколько. Например, если у вас в системе два подключения – WiFi и Ethernet, то можно заставить работать MQTT только через один из них.
Пример заполнения:
esp_mqtt_client_config_t mqttCfg; memset(&mqttCfg, 0, sizeof(esp_mqtt_client_config_t)); ... mqttCfg.network.timeout_ms = 1000; mqttCfg.network.reconnect_timeout_ms = 3000; mqttCfg.network.disable_auto_reconnect = false;
5. Параметры задачи MQTT-клиента
При необходимости, здесь можно задать параметры задачи MQTT-клиента, отличные от того, что заданы через menuconfig:
struct task_t { int priority; /*!< *MQTT* task priority*/ int stack_size; /*!< *MQTT* task stack size*/ } task; /*!< FreeRTOS task configuration.*/
Я думаю эти параметры в особом пояснении не требуются.
Пример заполнения:
esp_mqtt_client_config_t mqttCfg; memset(&mqttCfg, 0, sizeof(esp_mqtt_client_config_t)); ... mqttCfg.task.priority = 10; mqttCfg.task.stack_size = 5 * 1024;
Можно оставить эту структуру не заполненной, тогда значения будут взяты из файла sdkconfig.h.
6. Параметры буферов приема / передачи транспортного уровня
Здесь, как я уже упоминал, можно задать размеры буферов передачи и приема данных по сети:
struct buffer_t { int size; /*!< size of *MQTT* send/receive buffer*/ int out_size; /*!< size of *MQTT* output buffer. If not defined, defaults to the size defined by ``buffer_size`` */ } buffer; /*!< Buffer size configuration.*/
Например:
esp_mqtt_client_config_t mqttCfg; memset(&mqttCfg, 0, sizeof(esp_mqtt_client_config_t)); ... mqttCfg.buffer.size = 1024; mqttCfg.buffer.out_size = 1024;
Можно оставить эту структуру не заполненной, тогда значения будут взяты из файла sdkconfig.h.
7. Параметры исходящей очереди (“почтового ящика”)
На текущий момент тут всего один параметр – uint64_t
limit
, который указывается в байтах:
struct outbox_config_t { uint64_t limit; /*!< Size limit for the outbox in bytes.*/ } outbox; /*!< Outbox configuration. */
Этот параметр отвечает за то, насколько большим может быть исходящий почтовый ящик. При превышении этого лимита все сообщения, даже с QoS > 0, будут отброшены. Это может быть полезно в условиях нестабильной связи с сервером, когда отправить сообщения зачастую не удается и свободная память “тает на глазах”. К слову, эту опцию разработчикам предложил я пару-тройку лет назад, когда “боролся со связью”, и мое issue было принято.
Просто не заполняйте это, если не желаете устанавливать никаких лимитов.
Итоговый вариант
Устали читать? Что ж, давайте сведем все вышеизложенное в один пример настройки MQTT-клиента.
Допустим мы хотим подключиться к публичному серверу где-то в этих ваших ыньтерьнетах, по протоколу TCP/IP с использованием SSL и авторизации. Тогда процедура инициализации может выглядеть так:
// Корневой сертификат сервера extern const uint8_t mqtt_broker_pem_start[] asm(CONFIG_MQTT_TLS_PEM_START); extern const uint8_t mqtt_broker_pem_end[] asm(CONFIG_MQTT_TLS_PEM_END); // Параметры MQTT клиента esp_mqtt_client_config_t mqttCfg; memset(&mqttCfg, 0, sizeof(esp_mqtt_client_config_t)); // Настраиваем сервер mqttCfg.broker.address.hostname = "mqtt.ru"; mqttCfg.broker.address.port = 8883; mqttCfg.broker.address.transport = MQTT_TRANSPORT_OVER_SSL; // Настраиваем SSL mqttCfg.broker.verification.skip_cert_common_name_check = false; mqttCfg.broker.verification.use_global_ca_store = false; mqttCfg.broker.verification.certificate = (const char *)mqtt_broker_pem_start; mqttCfg.broker.verification.certificate_len = mqtt_broker_pem_end - mqtt_broker_pem_start; // Логин / пароль mqttCfg.credentials.username = "login"; mqttCfg.credentials.authentication.password = "password"; // Автогенерация Client ID mqttCfg.credentials.client_id = NULL; // ESP32_%CHIPID% // Параметры сеанса mqttCfg.session.disable_clean_session = false; mqttCfg.session.keepalive = 60; mqttCfg.session.disable_keepalive = false; // Завещание mqttCfg.session.last_will.topic = "device1/status"; mqttCfg.session.last_will.msg = "offline"; mqttCfg.session.last_will.msg_len = 6; mqttCfg.session.last_will.qos = 1; mqttCfg.session.last_will.retain = false; // Все остальные параметры оставим "по умолчанию"... // Создаем задачу MQTT клиента esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqttCfg);
Ура! Мы сделали это! Это было непросто, но мы справились.
На самом деле, если говорить серьезно, все эти параметры нужны и важны, просто в некоторых популярных библиотеках для Arduino мы их “не видим” и не может как-то изменить. Здесь же API предоставляет вам возможность для более тонкой настройки.
Используя полученный хендл esp_mqtt_client_handle_t
mqtt
, мы потом сможем взаимодействовать с сервером – подписываться и отправлять сообщения. Как это сделать, мы и рассмотрим ниже. Но…
Мы только что создали, но еще не запустили, задачу MQTT-клиента, и это отнюдь не означает, что он уже подключился к серверу и уже можно начинать подписываться или отправлять сообщения. Задача пока ждет, чтобы её запустили с помощью esp_mqtt_client_start(client)
:
// Создаем задачу MQTT клиента esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqttCfg); // Запускаем клиент esp_mqtt_client_start(client);
Но делать это пока что несколько преждевременно.
Дело в том, что вся работа клиента происходит асинхронно (это же отдельная задача FreeRTOS), и сразу после выхода из esp_mqtt_client_start()
мы не можем быть уверены – есть подключение к серверу или нет. Например, он может не успеть подключиться, а может и того хуже – вообще нет подключения к сети.
Возникает извечный русский вопрос: а что делать? Можно взять муравья… А очень даже просто – необходимо создать обработчик событий MQTT-клиента, который и будет уведомлять нас о наступлении того или иного события в клиенте. Чем мы и займемся далее.
Создаем обработчик событий
Предупреждение! Attention! Aufmerksamkeit! Для того, чтобы вот это все работало, у вас должен быть предварительно уже запущен системный цикл событий. Я сильно подозреваю, что это уже должно быть сделано, так как WiFi и Ethernet требуют ровно того же самого. И если вы уже как-то подключились к любой сети – то системный цикл по любому должен работать. Но на всякий пожарный случай я вас предупредил – просто проверьте свой код и убедитесь в этом, если не уверены.
MQTT API имеет свой собственный класс событий – MQTT_EVENTS
. Он включает в себя несколько разных идентификаторов событий.
/** * @brief *MQTT* event types. * * User event handler receives context data in `esp_mqtt_event_t` structure with * - client - *MQTT* client handle * - various other data depending on event type * */ typedef enum esp_mqtt_event_id_t { MQTT_EVENT_ANY = -1, MQTT_EVENT_ERROR = 0, /*!< on error event, additional context: connection return code, error handle from esp_tls (if supported) */ MQTT_EVENT_CONNECTED, /*!< connected event, additional context: - session_present flag */ MQTT_EVENT_DISCONNECTED, /*!< disconnected event */ MQTT_EVENT_SUBSCRIBED, /*!< subscribed event, additional context: - msg_id message id - error_handle `error_type` in case subscribing failed - data pointer to broker response, check for errors. - data_len length of the data for this event */ MQTT_EVENT_UNSUBSCRIBED, /*!< unsubscribed event, additional context: msg_id */ MQTT_EVENT_PUBLISHED, /*!< published event, additional context: msg_id */ MQTT_EVENT_DATA, /*!< data event, additional context: - msg_id message id - topic pointer to the received topic - topic_len length of the topic - data pointer to the received data - data_len length of the data for this event - current_data_offset offset of the current data for this event - total_data_len total length of the data received - retain retain flag of the message - qos QoS level of the message - dup dup flag of the message Note: Multiple MQTT_EVENT_DATA could be fired for one message, if it is longer than internal buffer. In that case only first event contains topic pointer and length, other contain data only with current data length and current data offset updating. */ MQTT_EVENT_BEFORE_CONNECT, /*!< The event occurs before connecting */ MQTT_EVENT_DELETED, /*!< Notification on delete of one message from the internal outbox, if the message couldn't have been sent and acknowledged before expiring defined in OUTBOX_EXPIRED_TIMEOUT_MS. (events are not posted upon deletion of successfully acknowledged messages) - This event id is posted only if MQTT_REPORT_DELETED_MESSAGES==1 - Additional context: msg_id (id of the deleted message). */ MQTT_USER_EVENT, /*!< Custom event used to queue tasks into mqtt event handler All fields from the esp_mqtt_event_t type could be used to pass an additional context data to the handler. */ } esp_mqtt_event_id_t;
Мы имеем в своем распоряжении следующие события, на которые можем реагировать:
MQTT_EVENT_ERROR
– ошибка. Любая ошибка, произошедшая в MQTT-клиенте. Если эта ошибка связана с транспортным уровнем, то в дополнительных данных будет передан либо код возврата соединения, либо ошибка esp_tls (если поддерживается). Вы можете на это как-то реагировать, например сохранять записи в журнал или отправлять пользователям.MQTT_EVENT_CONNECTED
– подключение к серверу. Подключение к серверу успешно установлено, теперича уже таки можно начинать подписываться и отправлять сообщения.MQTT_EVENT_DISCONNECTED
– отключение от сервера. Что тот пошло не так и соединение было потеряно. Подписываться и отправлять сообщения в общем случае стало бесполезно. Если у вас отключен автореконнект соединения, можно попытаться подключиться заново. Если у вас несколько вариантов подключения – можно попытаться подключиться к резервному серверу.MQTT_EVENT_SUBSCRIBED
– подписка на тему. Вы отправили команду на “подписаться” и оно таки успешно её выполнило. Теперь ждем событияMQTT_EVENT_DATA
. Но можно как-то прореагировать, например вывести сообщение с лог.MQTT_EVENT_UNSUBSCRIBED
– отмена подписки. То же самое, но сразу после отмены подписки.MQTT_EVENT_PUBLISHED
– публикация сообщения. Вы отправили сообщение и оно было успешно отправлено серверу. Для QoS = 1 или 2 это означает, что сервер подтвердил получение. Но вот получили ли его подписчики “с той стороныЛуны”– еще большой вопрос.MQTT_EVENT_DATA
– входящие данные. Одно из самых “важных” событий, для которого нужно обязательно написать обработчик, если вы подписались на тему и хотите принимать сообщения от брокера.MQTT_EVENT_BEFORE_CONNECT
– перед подключением. Если вам нужно что-то сделать непосредственно перед подключением. Например у вас два варианта сервера: основной и резервный. Здесь вы можете выбрать, какой будете использовать и настроить.MQTT_EVENT_DELETED
– сообщение не было отправлено и было удалено по таймауту. Означает, что ваше отправленное сообщение потеряно и никогда уже не будет получено сервером. После истечения времени, заданного через menuconfig в макросе CONFIG_MQTT_OUTBOX_EXPIRED_TIMEOUT_MS, сообщения считаются устаревшими и удаляются, даже если они не были отправлены.MQTT_USER_EVENT
– пользовательское событие. Пользовательское событие, используемое для постановки задач в очередь обработчика событий MQTT. Все поля типа esp_mqtt_event_t могут использоваться для передачи дополнительных контекстных данных обработчику.
Каждое из этих событий может нести с собой дополнительные данные (additional context), которые могут быть важны. Я пока их здесь приводить не стал, но мы ими обязательно воспользуемся ниже. Описание см. выше или в справочной системе.
Регистрация обработчика событий
Для начала расскажу как создать пустой шаблон обработчика и как его зарегистрировать.
Обработчик событий MQTT-клиента – это не обработчик прерываний. Поэтому его не нужно пихать в IRAM, к нему не предъявляются такие же требования, как к обработчикам прерываний, например в нем можно выводить сообщения с помощью ESP_LOGx()
. Но нужно всегда помнить, что обработчик событий всегда вызывается в контексте задачи – цикла событий, а не в контексте MQTT-клиента. Поэтому попытка “достать” из обработчика внутренние ресурсы задачи MQTT-клиента может кончится плачевно.
Обработчик событий MQTT-клиента может быть один-одинёшенек, на все события сразу, а можно написать и несколько маленьких братьев-обработчиков, по каждому на интересующий вас тип событий – но тогда придется зарегистрировать их отдельно. Я предпочитаю первый путь – во первых так компактнее, во вторых “регистратор” вызывается только один раз, в третьих – циклу событий меньше работы; но есть и недостаток – обработчик будет вызываться для всего подряд, даже если данное событие вам совсем не интересно.
Шаблон обработчика выглядит следующим образом:
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%" PRIi32 "", base, event_id); esp_mqtt_event_handle_t event = event_data; esp_mqtt_client_handle_t client = event->client; }
здесь стоит немного поподробнее остановиться на его аргументах:
*handler_args
– здесь будет переданы данные, который мы передали регистратору при регистрации события, см. нижеbase
– класс события, всегдаMQTT_EVENTS
, а потому ничуть не интересноevent_id
– идентификатор события, один из перечисленных вышеesp_mqtt_event_id_t
.*event_data
– данные, связанные с событием, это всегда указатель на структуруesp_mqtt_event_handle_t
, её мы разберем чуть ниже.
Зарегистрировать наш обработчик можно с помощью функции :
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%" PRIi32 "", base, event_id); esp_mqtt_event_handle_t event = event_data; esp_mqtt_client_handle_t client = event->client; } ... ... ... // Создаем задачу MQTT клиента esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg); // Регистрируем созданный выше обработчик событий // Последний аргумент может использоваться для передачи данных обработчику событий, в данном примере mqtt_event_handler esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL); // Вот теперь можно запускать клиент esp_mqtt_client_start(client);
Хорошо, как зарегистрировать обработчик, мы выяснили. Теперь давайте доведем его до совершенства, ну почти…
Для этого нам придется выяснить, что такое это самое esp_mqtt_event_handle_t
:
/** * *MQTT* event configuration structure */ typedef struct esp_mqtt_event_t { esp_mqtt_event_id_t event_id; /*!< *MQTT* event type */ esp_mqtt_client_handle_t client; /*!< *MQTT* client handle for this event */ char *data; /*!< Data associated with this event */ int data_len; /*!< Length of the data for this event */ int total_data_len; /*!< Total length of the data (longer data are supplied with multiple events) */ int current_data_offset; /*!< Actual offset for the data associated with this event */ char *topic; /*!< Topic associated with this event */ int topic_len; /*!< Length of the topic for this event associated with this event */ int msg_id; /*!< *MQTT* messaged id of message */ int session_present; /*!< *MQTT* session_present flag for connection event */ esp_mqtt_error_codes_t *error_handle; /*!< esp-mqtt error handle including esp-tls errors as well as internal *MQTT* errors */ bool retain; /*!< Retained flag of the message associated with this event */ int qos; /*!< QoS of the messages associated with this event */ bool dup; /*!< dup flag of the message associated with this event */ esp_mqtt_protocol_ver_t protocol_ver; /*!< MQTT protocol version used for connection, defaults to value from menuconfig*/ #ifdef CONFIG_MQTT_PROTOCOL_5 esp_mqtt5_event_property_t *property; /*!< MQTT 5 property associated with this event */ #endif } esp_mqtt_event_t; typedef esp_mqtt_event_t *esp_mqtt_event_handle_t;
где можно найти много вкусного:
event_id
– идентификатор события (да, еще раз)client
– хендл MQTT-клиента, ведь таких клиентов можно иметь несколькоdata
– указатель на очередной входящий блок данных, полученный от сервера – это может быть все входящее сообщение целиком, если оно поместилось в буфер, а может быть и только отдельный кусочекdata_len
– длина указанного выше блока данныхtotal_data_len
– общая длина данных, которую мы должны будет получить в результатеcurrent_data_offset
– смещение текущего блока данных относительно начала сообщенияtopic
– указатель на топик (тему) сообщения, тема всегда представлена целикомtopic_len
– длина строки темыmsg_id
– уникальный идентификатор сообщения (либо случайный, либо последовательный)session_present
– сообщение передано в рамках сеанса (напомню, API позволяет не использовать функционал сеансов)error_handle
– указатель на структуру, содержащую данные об ошибке – передается только для ошибокretain
– флаг, предписывающий серверу хранить сообщениеqos
– уровень обслуживанияdup
– признак того, что это повторное сообщениеprotocol_ver
– версия используемого протокола MQTTproperty
– для MQTT 5.0 передаются дополнительные данные – только из-за этого стоит отключить поддержку MQTT 5.0, если вы не собираетесь его использовать.
Что можно сделать в обработчике событий как минимум:
- Для
MQTT_EVENT_DATA
необходимо написать код, который будет склеивать кусочки данных в одно большое целое – таким образом мы сможем обрабатывать очень большие сообщения, превосходящие размер буфера. - Для
MQTT_EVENT_CONNECTED
можно сразу же переоформить все подписки, если это чистая сессия. - В случае
MQTT_EVENT_ERROR
можно как-то попытаться понять, что произошло, хотя бы вывести сообщение в лог.
Остальные сообщения, в принципе, можно не обрабатывать, по желанию.
Мои “практические” обработчики слишком запутаны, поэтому я пока-что приведу код из примера, он достаточно прост и понятен, но не совсем корректен для практического применения:
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { // Отладка ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%" PRIi32 "", base, event_id); // Извлекаем нужные данные esp_mqtt_event_handle_t event = event_data; esp_mqtt_client_handle_t client = event->client; int msg_id; // В зависимости от идентификатора события действуем по разному... switch ((esp_mqtt_event_id_t)event_id) { // Подключились к серверу - публикуем некие данные и подписываемся case MQTT_EVENT_CONNECTED: ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); msg_id = esp_mqtt_client_publish(client, "/topic/qos1", "data_3", 0, 1, 0); ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id); msg_id = esp_mqtt_client_subscribe(client, "/topic/qos0", 0); ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); msg_id = esp_mqtt_client_subscribe(client, "/topic/qos1", 1); ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); msg_id = esp_mqtt_client_unsubscribe(client, "/topic/qos1"); ESP_LOGI(TAG, "sent unsubscribe successful, msg_id=%d", msg_id); break; // Отключились от сервера case MQTT_EVENT_DISCONNECTED: ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); break; // Подписка выполнена case MQTT_EVENT_SUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); msg_id = esp_mqtt_client_publish(client, "/topic/qos0", "data", 0, 0, 0); ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id); break; // Отписка выполнена case MQTT_EVENT_UNSUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); break; // Сообщение опубликовано case MQTT_EVENT_PUBLISHED: ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); break; // Поступил блок данных case MQTT_EVENT_DATA: ESP_LOGI(TAG, "MQTT_EVENT_DATA"); printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); printf("DATA=%.*s\r\n", event->data_len, event->data); break; // Ошибка case MQTT_EVENT_ERROR: ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); if (event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) { log_error_if_nonzero("reported from esp-tls", event->error_handle->esp_tls_last_esp_err); log_error_if_nonzero("reported from tls stack", event->error_handle->esp_tls_stack_err); log_error_if_nonzero("captured as transport's socket errno", event->error_handle->esp_transport_sock_errno); ESP_LOGI(TAG, "Last errno string (%s)", strerror(event->error_handle->esp_transport_sock_errno)); } break; // Все остальное default: ESP_LOGI(TAG, "Other event id:%d", event->event_id); break; } }
Внимание! В данном примере обработка входящих сообщений сделана не совсем корректно! Ведь если сообщение не уместилось в буфер, мы не получим сообщение целиком. Но подробнее мы это обсудим ниже, в разделе Обработка входящей корреспонденции.
Отправка сообщений
Отправка сообщений не особо сложна, просто и понятно описана в примерах, и не вызывает трудностей в практической реализации у новичков, если они разобрались с протоколом MQTT в целом. Как я уже вскользь упомянул, в MQTT API есть два способа отправить сообщение:
- С помощью блокирующей функции
esp_mqtt_client_publish()
– эта функция пытается отправить сообщение немедленно. - С помощью её неблокирующего аналога
esp_mqtt_client_enqueue()
– эта функция всегда ставит сообщение в очередь отправки.
Рассмотрим их поподробнее.
1. esp_mqtt_client_publish()
Клиент должен отправить брокеру сообщение о публикации каких-либо данных.
int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain)
где:
client
– хендл MQTT-клиента, который мы получили при его инициализацииtopic
– указатель на строку – топик (тему) сообщенияdata
– указатель на строку – полезные данные, которые необходимо отправить (можно NULL)len
– длина передаваемых данных – можно просто указать 0, чтобы клиент сам посчитал длину строки с помощьюstrlen()
qos
– уровень обслуживанияretain
– флаг, предписывающий серверу хранить сообщение
Возвращает:
- message_id опубликованного сообщения (для QoS=0 message_id всегда будет равен нулю) в случае успеха
- -1 при неудаче
- -2 в случае полного исходящего ящика
Примечания:
- Эта функция всегда пытается отправить сообщение сразу же, в контексте текущей задачи, которая её вызывала.
- Эта функция потокобезопасна.
- Эта функция может блокировать поток задачи на несколько секунд либо из-за таймаута сети (по умолчанию 10 с), либо из-за публикации полезных данных, превышающих внутренний буфер (из-за фрагментации сообщения).
- Для работы этой функции клиенту не обязательно быть подключенным, так как сообщения с QoS > 1 всегда ставятся в очередь. Но если QoS = 0 и соединения с сервером нет, то функция не отправит сообщение и вернет -1. Также, если включена опция
MQTT_SKIP_PUBLISH_IF_DISCONNECTED
, этот API не будет даже пытаться выполнить публикацию с любым QoS, когда клиент не подключен, и всегда будет возвращать -1.
Пример использования:
esp_mqtt_client_publish(client, "/device1/temp", "14.5", 0, 1, 0);
2. esp_mqtt_client_enqueue()
Второй вариант – сразу поставить сообщение в очередь и забыть о нем – об остальном позаботиться MQTT клиент. Это может быть полезно, когда ваша задача, которая отправляет данные, не может ждать 10 секунд.
int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain, bool store)
где почти те же самые параметры:
client
– хендл MQTT-клиента, который мы получили при его инициализацииtopic
– указатель на строку – топик (тему) сообщенияdata
– указатель на строку – полезные данные, которые необходимо отправить (можно NULL)len
– длина передаваемых данных – можно просто указать 0, чтобы клиент сам посчитал длину строки с помощьюstrlen()
qos
– уровень обслуживанияretain
– флаг, предписывающий серверу хранить сообщениеstore
– признак, заставляющий запись сообщение в очередь даже с QoS = 0
Возвращает:
- message_id сообщения в случае успеха
- -1 при неудаче
- -2 в случае полного исходящего ящика
Примечания:
- В очередь ставятся только сообщения с QoS > 0 или если store=true. Если QoS = 0 и store=false, сообщение будет отброшено.
- Эта функция только сохраняет сообщение во внутреннем почтовом ящике, а фактическая отправка в сеть выполняется в контексте задячи mqtt-task (в отличие от
esp_mqtt_client_publish()
, которая отправляет сообщение публикации немедленно в контексте пользовательской задачи). - Эта функция потокобезопасна.
- Для работы этой функции клиенту не обязательно быть подключенным. Если включена опция
MQTT_SKIP_PUBLISH_IF_DISCONNECTED
, этот API не будет даже пытаться выполнить публикацию с любым QoS, когда клиент не подключен, и всегда будет возвращать -1.
Пример использования:
esp_mqtt_client_enqueue(client, "/device1/temp", "14.5", 0, 1, 0, false);
Небольшое замечание
Постойте! – скажут некоторые внимательные читатели. Если мы отправляем в функцию строку, а она при этом “лежит” где-то в heap (куче), то в какой момент её нужно оттуда удалить? Если сразу после вызова esp_mqtt_client_publish()
, то не может ли получиться так, что сообщение будет поставлено в очередь, а само сообщение мы уже удалили из памяти??? Хороший вопрос!
На самом деле – можно безопасно удалять из памяти и топик, и сами данные, если они были размещены в оперативной памяти (heap), сразу после выхода из этой функции, равно как и esp_mqtt_client_enqueue()
. Ведь если esp_mqtt_client_publish()
выполнила отправку сообщения немедленно, то эти данные уже как бы и не нужны; а если поставила в очередь – в очередь будет помещена копия топика и данных. Так что смело вызывайте free(topic) или free(message), если это необходимо.
if (free_topic && (topic != NULL)) free(topic); if (free_payload && (payload != NULL)) free(payload);
Оформление подписок
Сразу после подключения целесообразно подписаться на темы, сообщения которых мы хоте ли бы получать. Мы это уже немного упоминали выше, когда рассматривали пример обработчика событий. Для этого существует несколько специально предусмотренных функций:
esp_mqtt_client_subscribe()
илиesp_mqtt_client_subscribe_single()
– подписаться на заданный фильтр темesp_mqtt_client_subscribe_multiple()
– подписаться на несколько заданных фильтров по спискуesp_mqtt_client_unsubscribe()
– отписаться от заданного фильтра тем
Подписка
Для оформления подписки воспользуйтесь следующими функцями:
int esp_mqtt_client_subscribe_single(esp_mqtt_client_handle_t client, const char *topic, int qos)
или используйте устаревший вариант – это просто макрос, указывающий на esp_mqtt_client_subscribe_single()
:
esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic, int qos)
или, если нужно подписаться сразу на целый список фильтров:
int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client, const esp_mqtt_topic_t *topic_list, int size)
где:
client
– хендл MQTT-клиента, который мы получили при его инициализацииtopic
– указатель на строку – фильтр тем, на который мы хотим подписатьсяqos
– уровень обслуживания
Примечания:
- Клиент должен быть подключен к серверу для отправки сообщения о подписке
- Эта функция может быть выполнена из пользовательской задачи или из функции обратного вызова события MQTT, т. е. внутренней задачи MQTT (API защищен внутренним мьютексом, поэтому он может быть заблокирован, если выполняется более длительная операция получения данных).
Пример использования:
esp_mqtt_client_subscribe_single(client, "/device1/relay", 2);
Отписка
Если необходимо прекратить получать сообщения из некоторой темы, то воспользуйтесь функцией esp_mqtt_client_unsubscribe()
:
int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *topic)
где:
client
– хендл MQTT-клиента, который мы получили при его инициализацииtopic
– указатель на строку – фильтр тем, от которого мы хотим отписаться
Примечания:
- Клиент должен быть подключен к серверу для отправки сообщения об отписке
- Эта функция может быть выполнена из пользовательской задачи или из функции обратного вызова события MQTT, т. е. внутренней задачи MQTT (API защищен внутренним мьютексом, поэтому он может быть заблокирован, если выполняется более длительная операция получения данных).
Пример использования:
esp_mqtt_client_unsubscribe(client, "/device1/relay");
Обработка входящей корреспонденции
А как же обрабатывать сообщения, которые мы получаем от сервера в результате подписки? Ведь они могут не поместиться в один буфер, приема!
Я реализовал это так. Во первых, создал статическую переменную, в которой я буду накапливать и хранить все принятое сообщение целиком:
typedef struct { char* topic; uint32_t topic_len; char* data; uint32_t data_len; } re_mqtt_incoming_data_t; static re_mqtt_incoming_data_t in_buffer;
Далее, в обработчике события я поместил следующий код (приведен только “кусок” из обработчика):
case MQTT_EVENT_DATA: if (event_data) { // Если это первый блок - очищаем буфер приема и выделяем память сразу вод весь ожидаемый объем данных if (data->current_data_offset == 0) { if (in_buffer.topic) free(in_buffer.topic); in_buffer.topic = nullptr; if (in_buffer.data) free(in_buffer.data); in_buffer.data = (char*)calloc(1, data->total_data_len+1); }; // Если память выделена, то копируем в нее кусок со смещением if (in_buffer.data) { memcpy(in_buffer.data+data->current_data_offset, data->data, data->data_len); // Если это последний блок сообщения... if (data->current_data_offset + data->data_len == data->total_data_len) { in_buffer.topic = (char*)calloc(1, data->topic_len + 1); if (in_buffer.topic) { strncpy(in_buffer.topic, data->topic, data->topic_len); in_buffer.topic_len = data->topic_len; in_buffer.data_len = data->total_data_len; // Сообщение в журнал ESP_LOGI(logTAG, "Incoming message \"%.*s\": [%s]", data->topic_len, data->topic, in_buffer.data); // Отправляем уведомление в цикл событий о том, что получены данные eventLoopPost(RE_MQTT_EVENTS, RE_MQTT_INCOMING_DATA, &in_buffer, sizeof(in_buffer), portMAX_DELAY); }; }; }; }; break;
В этом примере есть потенциальная проблема: если одновременно попытаются прийти два сообщения разом, и по крайней мере одно из них превышает размер буфера – большое сообщение может быть искажено. На практике я пока с этим не сталкивался, но в будущем постараюсь это исправить (todo). И вы подумайте – домашнее вам задание.
Дополнительные возможности управления MQTT-клиентом
В некоторых случаях могут пригодиться дополнительные функции, имеющиеся в MQTT API.
esp_mqtt_client_stop()
– остановить MQTT-клиент, например при разрыве WiFi соединенияesp_mqtt_set_config()
– перенастроить MQTT-клиентesp_mqtt_client_get_outbox_size()
– получить размер исходящего почтового ящикаesp_mqtt_client_register_event()
– зарегистрировать обработчик(и) событий
А где, #$%, пример?
А нету 🤷♂️ пока целого примера… Но эта статья написана в рамках подготовки к другим статьям, в которых будет обсуждаться создание полноценной прошивки на ESP32 – там вы примеры и найдете. Нужно только подождать.
Но, IMHO, по предоставленным в данной статье примерам, вполне возможно написать собственное приложение для MQTT. Дерзайте.
Засим откланяюсь, с вами был ваш Александр aka kotyara12.
Ссылки
Справочное руководство ESP-IDF -> раздел ESP MQTT
Пожалуйста, оцените статью:
-= Каталог статей (по разделам) =- -= Архив статей (подряд) =-