Перейти к содержимому

Очереди FreeRTOS

Добрый день, уважаемый читатель! Тема сегодняшней статьи – очереди. Но не всякие подряд очереди (потому что могут быть и другие очереди, которые мы обсуждали в одной из предыдущих статей), а конкретно – очереди FreeRTOS. Про очереди написано довольно много статей, например: тыц, тыц, и тыц.  Однако это не помешало мне наломать немного дров и набить себе чуть-чуть виртуальных шишек несколько лет назад, когда я пытался написать свою первую прошивку для FreeRTOS. Особенно при попытках передачи через очередь динамических строк и массивов данных. Поэтому я решил поделиться своей поленницей с вами, возможно это поможет кому-то освоить этот полезный механизм проще и быстрее.

Очередь может выполнять в вашей программе несколько функций:

  • передача произвольных данных между разными задачами / потоками, то есть очередь позволяет максимально просто и безопасно получить какие-либо данные “извне” выполняющейся задачи
  • одновременно с этим очередь может использоваться для приостановки выполнения (блокировки) задачи на определенное время или до появления новых данных, то есть для формирования временных интервалов, наподобие vTaskDelay(), которую мы уже обсуждали.

То есть очереди, если очень кратко, выполняют функции обмена информацией и синхронизации задач между собой.

 


Зачем нужны какие-то очереди?

На однопоточных системах Arduino не использовались никакие очереди – они там просто не требуются, от слова “совсем” (разве что можно иногда использовать связные списки или массивы). Так как в случае с ESP8266 без FreeRTOS ни одна из функций никогда не получит управления, пока другая функция не завершит свою работу. В многопоточных системах всё сложнее.

Самый примитивный способ организовать обмен информацией между задачами – использовать общие глобальные переменные. Доступ к таким переменным может осуществляться одновременно из нескольких задач – почему нет???. Однако при этом может возникнуть очень неприятная ситуация, когда выполнение одной из задач прерывается планировщиком FreeRTOS именно в тот момент, когда она записывает новое значение в эту самую общую переменную. В результате эта самая общая переменная будет содержать искаженное и непредсказуемое значение. Если в этот момент любая из других задач получит управление и обратится к этой переменной с искаженными данными,  результат её работы может быть  также непредсказуемым. И чем больше переменная занимает в памяти байт, тем выше шанс такой ситуации. Например если вы попытаетесь отправить длинное сообщение из одной задачи через общий буфер в памяти, а другая задача начнет “не вовремя” его считывание из буфера и отправку вам в мессенджер, то очень даже вероятно, что вы получите только произвольный кусочек исходного сообщения. Очевидно, что это хоть и самый простой, но не самый лучший путь.

Для решения подобных проблем и были придуманы очереди. Во FreeRTOS очереди представляют собой базовый механизм взаимодействия задач друг с другом. Они могут быть использованы для передачи информации не только между задачами, так и между прерываниями и задачами. Кстати, передавать информацию между задачами можно и с помощью других объектов – например через циклы событий, но и циклы событий, в свою очередь, построены на базе очереди и задачи. Как и многие другие объекты синхронизации задач – во FreeRTOS они представляют собой всего-лишь макросы препроцессора вокруг базового механизма очередей.


Принципы функционирования очереди

Очередь может хранить в себе конечное число элементов фиксированного размера. Максимальное число элементов, которое может хранить очередь, называется размером очереди. Как размер элемента, так и размер очереди задаются при создании очереди и остаются неизменными до ее удаления. Нужно отметить, что память выделяется сразу под все элементы очереди при её создании, то есть пустая и заполненная очередь не отличаются друг от друга по объему занимаемой памяти. При записи элементов в очередь динамического выделения или перераспределения памяти не происходит. То есть при работе с очередью фрагментация кучи не увеличивается (если только вы не передаете через очередь указатель на строку, только что размещенную в куче). Из этого же следует, что создавать очередь под большое количество элементов огромного размера не самая лучшая идея – во-первых менеджер памяти может просто отказать в выделении такого большого объема памяти, а во-вторых это просто варварский способ разбрасываться оперативной памятью.

Кроме того, ESP-FreeRTOS предоставляет нам отличную возможность  – статическое выделение памяти под очередь. Про статическое создание задач я уже рассказывал, очередь так же имеет такую же возможность, и лично я часто её использую. Что дает статическое создание очереди? В этом случае буфер под размещение элементов очереди выделяется не в общей куче уже после запуска FreeRTOS и вашей программы, а ещё при компиляции в сегменте BSS (кто ещё не знаком с распределением памяти в ESP32, рекомендую почитать эту статейку). А это значит, что буфер очереди не сможет случайно “придавить” небольшие временные динамические переменные (что приведет к дополнительной фрагментации памяти).

Обычно элементы записываются в конец («хвост») очереди и считываются с начала («головы») очереди,  то есть очередь функционирует по принципу «первым вошел — первым вышел» (First In First Out, FIFO). Но это не обязательно, можно легко вести запись и в начало очереди, тогда очередь превращается в стек, работающий по принципу «последним вошел — первым вышел» (Last In First Out, LIFO). Запись в очередь вызывает побайтовое копирование данных, которые передаются очереди. Чтение данных из очереди вызывает также копирование данных из очереди, при этом данные из очереди удаляются (хотя имеется и функция чтения без удаления данных из очереди – xQueuePeek). 

Совсем не обязательно создавать очередь на 100500 элементов. Вы вполне можете создать очередь длиной = 1, например просто чтобы отправить задаче какое-либо уведомление. В этом случае можно использовать функцию “перезапиcь” сообщений в очереди (если оно уже там имеется) – xQueueOverwrite.

Очередь — это самостоятельный объект, она не принадлежит ни одной конкретной задаче, любое количество задач могут как читать, так и записывать данные в одну и ту же очередь. На практике чаще всего встречается ситуация, когда некоторое количество задач могут писать в очередь, а вот считывать данные может только одна задача, которая логически связана с данной задачей и создается специально для её обслуживания. Пример: задача отправки сообщений в telegram. Отправлять сообщения и данные могут совершенно разные задачи, а вот забирать отправляемые данные из входящей очереди и отправлять их адресату может только задача отправки. Записывать данные в очередь могут не только задачи, но и прерывания – для этого существуют специальные функции.

 

Блокировка задачи при чтении из очереди

Когда какая-либо задача пытается прочитать данные из очереди, которая не содержит ни одного элемента, то задача приостанавливается – то есть переходит в блокированное состояние. Приостановленная задача вернется в состояние готовности к выполнению, когда другая задача (или прерывание) поместит данные в очередь. После этого планировщик сможет передать управление ожидающей задаче (но если нет задач с более высоким приоритетом).

Выход из блокированного состояния возможен также при истечении заданного тайм-аута, даже если очередь на протяжении всего этого времени оставалась пуста. Данные из очереди могут читать сразу несколько задач. Когда очередь пуста, то все они находятся в блокированном состоянии. Когда в очереди появляется элемент данных, начнет выполняться та задача, которая имеет наибольший приоритет. Возможна ситуация, когда равноприоритетные задачи ожидают появления данных в очереди. В этом случае при поступлении
данных в очередь управление получит та задача, время ожидания которой было наибольшим. Остальные же останутся в блокированном состоянии.

Таймаут чтения из очереди позволяет реализовать механизм формирования интервалов вместо использования vTaskDelay(). В этом случае разблокировка задачи произойдет в двух случаях – либо появятся новые данные в очереди, либо истечет заданный таймаут, в зависимости от того, что наступит ранее. Таймаут можно задать равным нулю, тогда задача чтения вообще не будет блокироваться.

 

Блокировка при записи в очередь

Как и при чтении из очереди, “записывающая” задача может быть заблокирована, ожидая возможность записи в очередь. Это происходит, когда очередь полностью заполнена и в ней нет свободного места для записи нового элемента данных. До тех пор пока какая-либо другая задача не прочитает данные из очереди, задача, которая пишет в очередь, будет «ожидать», находясь в блокированном состоянии. В одну очередь могут писать сразу несколько задач, поэтому возможна ситуация, когда несколько задач находятся в блокированном состоянии, ожидая завершения операции записи в одну очередь. Когда в очереди появится свободное место, получит управление задача с наивысшим приоритетом. В случае если запись в очередь ожидали равноприоритетные задачи, управление получит та, которая находилась в блокированном состоянии дольше остальных.

Однажды я столкнулся с весьма непонятным “глюком” прошивки: при резком увеличении пинга “до интернета” или обрыве связи постепенно “зависали” прикладные задачи, которые никак с интернетом связаны не были. И никаких сообщений в лог. Тихо и постепенно “умирали” большинство задач. С со свободной памятью – все в порядке, WDT не срабатывает, но … Ничего не понятно! Причина оказалась в том, что задача, которая занимается отправкой данных с сенсоров на open monitoring, при прерывании доступа к интернету переставала отправлять данные на хост, и её входящая очередь постепенно заполнялась. А прикладные задачи, как обычно, продолжали исправно отправлять данные в нее. Из-за неверно выбранного таймаута отправки задачи постепенно блокировались без каких-либо сообщений об ошибках. Решение проблемы довольно простое: если не удалось поместить данные в очередь мгновенно (или за пару секунд, если хотите надежности) то просто отбрасываем их. Глюки прекратились.

 


Основные функции для работы с очередью

Давайте кратенько рассмотрим основные функции для работы с очередью. 

Создание очереди

Прежде чем начать пользоваться очередью, её нужно создать. Делается это с помощью следующей функции:

QueueHandle_t xQueueCreate(const UBaseType_t uxQueueLength, const UBaseType_t uxItemSize)

где:

  • uxQueueLength – размер очереди (количество элементов в очереди)
  • uxItemSize – размер одного элемента в очереди

Функция вернет хендл (указатель) на созданную очередь, после этого с ней можно начинать работать. Примечание: на самом деле это не функция, а макрос, который в свою очередь вызывает xQueueGenericCreate(), но сути процесса это никак не меняет

// Создание очереди для управления светодиодом
ESP_LOGV(logTAG, "Creating message queue to control LED on GPIO %d", ledGPIO);
QueueHandle_t ledQueue = xQueueCreate(10, sizeof(ledQueueData_t));
if (ledQueue == NULL) {
  ESP_LOGE(logTAG, "Error creating message queue to control LED on GPIO %d", ledGPIO);
  return NULL;
};

 

Статическое создание очереди

Так как очереди чаще всего создаются один раз только при запуске программы (а после этого не удаляются из памяти до её завершения / перезапуска MCU), имеет смысл создать очередь статически. В этом случае размер занятой оперативной памяти под очередь никак не изменится, но выделенный для этого блок памяти будет перемещен в секцию BSS и не будет оказывать никакого влияния на работу кучи, а следовательно – и на её фрагментацию. Для этого воспользуйтесь похожей функцией:

QueueHandle_t xQueueCreateStatic(const UBaseType_t uxQueueLength, const UBaseType_t uxItemSize, uint8_t *pucQueueStorageStaticQueue_t *pxStaticQueue)

где:

  • uxQueueLength – размер очереди (количество элементов в очереди)
  • uxItemSize – размер одного элемента в очереди
  • pucQueueStorage – указатель на статическую переменную под хранение служебных данных очереди
  • pxStaticQueue – указатель на статически выделенный блок памяти под собственно саму очередь

Я чаще всего пользуюсь именно этим методом. И вам советую.

typedef struct {
  ext_data_service_t kind;
  uint32_t uid;
  char* data;
  time_t timestamp;
} dataSendQueueItem_t;  

#define CONFIG_DATASEND_QUEUE_SIZE 16
#define DATASEND_QUEUE_ITEM_SIZE sizeof(dataSendQueueItem_t)

StaticQueue_t _dataSendQueueBuffer;
uint8_t _dataSendQueueStorage[CONFIG_DATASEND_QUEUE_SIZE * DATASEND_QUEUE_ITEM_SIZE];

QueueHandle_t _dataSendQueue = xQueueCreateStatic(CONFIG_DATASEND_QUEUE_SIZE, DATASEND_QUEUE_ITEM_SIZE, _dataSendQueueStorage[0], &_dataSendQueueBuffer);

 

Удаление очереди

Не могу придумать такую ситуацию в реальном устройстве, когда понадобиться удалить очередь, но такая функция таки присутствует в API:

void vQueueDelete(QueueHandle_t xQueue)

где:

  • xQueue – хендл очереди, который мы получили при её создании

 

Чтение данных из очереди

Для получения первого элемента из очереди, вызовите следующую функцию:

BaseType_t xQueueReceive(QueueHandle_t xQueue, void * const pvBuffer, TickType_t xTicksToWait)

где:

  • xQueue – хендл очереди, который мы получили при её создании
  • pvBuffer – указатель на буфер, размер которого должен соответствовать размеру одного элемента очереди
  • xTicksToWait – количество тиков FreeRTOS, которое мы должны ждать для появления данных. Если передать 0 (ноль) – то данная задача не будет блокироваться вовсе. Если передать макрос portMAX_DELAY – то задача будет ожидать данные бесконечно. Если необходимо периодически выводить задачу из состояния задумчивости, используйте макросы pdMS_TO_TICKS ( время в мс ) или время в мс / portTICK_PERIOD_MS

Если в очереди есть какие-либо данные, функция скопирует данные в предоставленный буфер, удалит первый элемент из очереди и вернет pdTRUE; если данных нет и функция была завершена по истечении таймаута – pdFALSE. Вот примерно так задача отправки уведомлений в Telegram получает “входящие” сообщения от других задач прошивки:

typedef struct {
  char* message;
  time_t timestamp;
} tgMessage_t;

tgMessage_t inMsg;

if (xQueueReceive(_tgQueue, &inMsg, portMAX_DELAY) == pdPASS) {
  ESP_LOGV(logTAG, "New message received: %s", inMsg->message);
  ...
};

 

А что, если нужно только проверить – а есть ли что-нибудь в очереди, но не удалять данные из нее? Просто приоткрыть дверку и посмотреть – есть ли кто в очереди на прием или нет. В этом случае следует воспользоваться похожей функцией:

BaseType_t xQueuePeek(QueueHandle_t xQueue, void * const pvBuffer, TickType_t xTicksToWait)

Параметры у неё точно такие же, как и в предыдущем случае. Отличается она от xQueueReceive только тем, что не удаляет данные из очереди.

 

Помещение данных (запись) в очередь

Для записи в очередь есть свой набор функций:

BaseType_t xQueueSendToBack(QueueHandle_t xQueue, const void * const pvItemToQueue, TickType_t xTicksToWait) 
BaseType_t xQueueSendToFront(QueueHandle_t xQueue, const void * const pvItemToQueue, TickType_t xTicksToWait)
BaseType_t xQueueSend(QueueHandle_t xQueue, const void * const pvItemToQueue, TickType_t xTicksToWait)

Собственно, это также не функции, а макросы, которые “смотрят” на базовую функцию xQueueGeneticSend(). Соответственно, xQueueSendToBack помещает данные в хвост очереди, xQueueSendToFront – в начало, а xQueueSend полностью аналогичен xQueueSendToFront и оставлен только для целей обратной совместимости.

Параметры:

  • xQueue – хендл очереди, который мы получили при её создании
  • pvItemToQueue – указатель на элемент, размер которого должен соответствовать размеру одного элемента очереди. При помещении данных в очередь с него снимается копия, поэтому если это была динамическая переменная (вручную размещенная в куче), после помещения в очередь её можно спокойно удалить из памяти. Но если пересылаемые данные очень большого размера, имеет смысл пересылать указатель на эти данные, а не сами данные.
  • xTicksToWait – количество тиков FreeRTOS, которое мы должны ждать для помещения данных в очередь, если очередь заполнена полностью. Если передать 0 (ноль) – то данная задача не будет блокироваться вовсе – “ну не шмогла я, не шмогла…“. Если передать макрос portMAX_DELAY – то задача, отправляющая данные будет ожидать появления свободного места бесконечно. Если необходимо ограничить время ожидания, используйте макросы pdMS_TO_TICKS ( время в мс ) или время в мс / portTICK_PERIOD_MS

Если удалось поместить данные в очередь, то эти функции вернут pdTRUE; если очередь заполнена и функция была завершена по истечении таймаута – pdFALSE.

С отправкой в очередь чисел обычно проблем не возникает, да и в стандартных примерах всё довольно подробно описано. А вот при пересылка через очередь каких-либо динамических строк поначалу может вызывать вопросы. Так как размер каждого элемента очереди строго фиксирован при её создании, необходимо пересылать не саму строку, а указатель на неё в памяти. Либо включить эту строку в состав структуры, а затем передавать уже саму структуру, как в примере ниже:

typedef struct {
  char* message;
  time_t timestamp;
} tgMessage_t;

static tgMessage_t tgMsg;

tgMsg.message = malloc_stringf("Код ошибки: %d", 101);
tgMsg.timestamp = time(NULL);

if (xQueueSend(_tgQueue, &tgMsg, pdMS_TO_TICKS(1000)) == pdPASS) {
  return true;
} else {
  ESP_LOGE(logTAG, "Failed to adding message to queue [ %s ]!", tgTaskName);
  return false;
};

Но не забудьте “на принимающей стороне” освободить память, если вы выделяли её перед отправкой, иначе будет так называемая утечка памяти.

 

Сервисные функции

Существует ещё несколько служебных функция, которые вы можете использовать по необходимости:

  • uxQueueMessagesWaiting(hQueue) – вернет количество элементов, которые ожидают обработки в очереди
  • uxQueueSpacesAvailable(hQueue) – вернет количество свободных мест в очереди
  • xQueueReset(hQueue) – сброс очереди в исходное состояние

Их можно использовать, например, чтобы понять, если ли какие-либо данные в очереди или можно ли писать в очередь без попытки записи.


А что с прерываниями?

Обработчики прерываний – весьма специфичные функции, поэтому для работы с очередями из обработчиков прерываний существуют отдельные варианты функций и макросов:

BaseType_t xQueueReceiveFromISR(QueueHandle_t xQueue, void *const pvBuffer, BaseType_t *const pxHigherPriorityTaskWoken)
BaseType_t xQueuePeekFromISR(QueueHandle_t xQueue, void *const pvBuffer)
BaseType_t xQueueSendToBackFromISR(xQueue, pvItemToQueue, pxHigherPriorityTaskWoken)
BaseType_t xQueueSendToFrontFromISR(xQueue, pvItemToQueue, pxHigherPriorityTaskWoken)
BaseType_t xQueueSendFromISR(xQueue, pvItemToQueue, pxHigherPriorityTaskWoken)
BaseType_t xQueueIsQueueEmptyFromISR(const QueueHandle_t xQueue)
BaseType_t xQueueIsQueueFullFromISR(const QueueHandle_t xQueue)
UBaseType_t uxQueueMessagesWaitingFromISR(const QueueHandle_t xQueue)

Я не буду повторно описывать все эти функции, так как они очень похожи на обычные версии, но обратите особое внимание на отличия:

  • отсутствует  аргумент xTicksToWait  – поскольку выполнение обработчиков прерываний должно занимать как можно меньше времени, поэтому ни о каких ожиданиях внутри прерываний и речи быть не может
  • появился новый аргумент pxHigherPriorityTaskWoken. А вот про него стоит рассказать чуть-чуть поподробнее, для чего он нужен.  Функция может вернуть в данном аргументе два состояния – pdTRUE или pdFALSE. Если вернулось pdTRUE, то это указывает на необходимо досрочно отдать управление планировщику. Допустим, в текущий момент выполняется задача с низким приоритетом, а высокоприоритетная ожидает сообщения из очереди и приостановлена. Далее происходит прерывание, из которого отправляется какое-либо сообщение в очередь. Но по окончании работы обработчика прерываний выполнение возвращается к текущей низкоприоритетной задаче, а высокоприоритетная все ещё ожидает, пока закончится текущий квант времени, и очередь всё ещё не обрабатывается. Однако если после этого передать управление планировщику досрочно через portYIELD_FROM_ISR, то он передаст управление ожидающей высокоприоритетной задаче. Это позволяет значительно сократить время реакции системы на прерывание.

 


Наборы очередей

Несколько задач могут одновременно писать в одну и ту же очередь – без вопросов, так и было задумано. Несколько задач могут ждать событий из одной и той же очереди – тоже не проблема. А как сделать так, чтобы одна и та же задача могла ждать событий сразу из нескольких очередей? Честно говоря, я пока не могу себе представить такую задачу, но API такое уже имеется – наборы очередей. Последовательность работы с ними такова:

  1. Создаем набор очередей с помощью xQueueCreateSet(uxEventQueueLength). Причем здесь uxEventQueueLength указывает максимальное количество событий, которые могут быть поставлены в очередь одновременно. Чтобы быть абсолютно уверенным, что события не будут потеряны, uxEventQueueLength должен быть установлен равным общей сумме длин очередей, добавленных в набор.
  2. Создаем очереди с помощью xQueueCreate(), а затем добавляем их в набор с помощью xQueueAddToSet().
  3. Ожидаем события в очереди с помощью xQueueSelectFromSet(). Если в любой из очередей имеются события, то эта функция вернет дескриптор очереди, которая содержит эти данные.

См. документацию на https://www.FreeRTOS.org/RTOS-queue-sets.html, чтобы узнать, почему наборы очередей очень редко нужны на практике, поскольку существуют более простые методы блокировки нескольких объектов.


На этом пока всё, до встречи на сайте и на dzen-канале!

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *