ESPHome 2025.7.1
Loading...
Searching...
No Matches
lock_free_queue.h
Go to the documentation of this file.
1#pragma once
2
3#if defined(USE_ESP32) || defined(USE_LIBRETINY)
4
#include <atomic>
#include <cstddef>
#include <cstdint>
7
8#if defined(USE_ESP32)
9#include <freertos/FreeRTOS.h>
10#include <freertos/task.h>
11#elif defined(USE_LIBRETINY)
12#include <FreeRTOS.h>
13#include <task.h>
14#endif
15
16/*
17 * Lock-free queue for single-producer single-consumer scenarios.
18 * This allows one thread to push items and another to pop them without
19 * blocking each other.
20 *
21 * This is a Single-Producer Single-Consumer (SPSC) lock-free ring buffer.
22 * Available on platforms with FreeRTOS support (ESP32, LibreTiny).
23 *
24 * Common use cases:
25 * - BLE events: BLE task produces, main loop consumes
26 * - MQTT messages: main task produces, MQTT thread consumes
27 *
28 * @tparam T The type of elements stored in the queue (must be a pointer type)
29 * @tparam SIZE The maximum number of elements (1-255, limited by uint8_t indices)
30 */
31
32namespace esphome {
33
// Base lock-free queue without task notification.
//
// Thread-safety contract: exactly one producer thread may call push(), and
// exactly one consumer thread may call pop(). size()/empty()/full() give a
// point-in-time snapshot and may be stale when read concurrently with the
// other thread. One buffer slot is always left unused so that "full"
// (next_tail == head) can be distinguished from "empty" (tail == head);
// effective capacity is therefore SIZE - 1.
template<class T, uint8_t SIZE> class LockFreeQueue {
 public:
  // Push a pointer into the queue (producer side only).
  // Returns false when element is nullptr, or when the queue is full
  // (in which case the drop counter is incremented).
  bool push(T *element) {
    bool was_empty;
    uint8_t old_tail;
    return push_internal_(element, was_empty, old_tail);
  }

 protected:
  // Internal push that reports queue state - for use by derived classes.
  // On success:
  //   was_empty - queue was empty immediately before this push
  //   old_tail  - index the element was written to
  bool push_internal_(T *element, bool &was_empty, uint8_t &old_tail) {
    if (element == nullptr)
      return false;

    uint8_t current_tail = tail_.load(std::memory_order_relaxed);
    uint8_t next_tail = (current_tail + 1) % SIZE;

    // Read head before incrementing tail
    uint8_t head_before = head_.load(std::memory_order_acquire);

    if (next_tail == head_before) {
      // Buffer full - count the drop so the consumer can report it later
      dropped_count_.fetch_add(1, std::memory_order_relaxed);
      return false;
    }

    was_empty = (current_tail == head_before);
    old_tail = current_tail;

    buffer_[current_tail] = element;
    // Release store pairs with the acquire load in pop(): the buffer_ write
    // above is guaranteed visible to the consumer before the new tail is.
    tail_.store(next_tail, std::memory_order_release);

    return true;
  }

 public:
  // Pop the oldest element (consumer side only), or nullptr if empty.
  T *pop() {
    uint8_t current_head = head_.load(std::memory_order_relaxed);

    if (current_head == tail_.load(std::memory_order_acquire)) {
      return nullptr;  // Empty
    }

    T *element = buffer_[current_head];
    head_.store((current_head + 1) % SIZE, std::memory_order_release);
    return element;
  }

  // Number of queued elements (0 .. SIZE-1); approximate under concurrency.
  size_t size() const {
    uint8_t tail = tail_.load(std::memory_order_acquire);
    uint8_t head = head_.load(std::memory_order_acquire);
    return (tail - head + SIZE) % SIZE;
  }

  // Return the accumulated drop count and atomically reset it to zero.
  uint16_t get_and_reset_dropped_count() { return dropped_count_.exchange(0, std::memory_order_relaxed); }

  // Count a drop that happened outside of push() (e.g. allocation failure).
  void increment_dropped_count() { dropped_count_.fetch_add(1, std::memory_order_relaxed); }

  bool empty() const { return head_.load(std::memory_order_acquire) == tail_.load(std::memory_order_acquire); }

  bool full() const {
    uint8_t next_tail = (tail_.load(std::memory_order_relaxed) + 1) % SIZE;
    return next_tail == head_.load(std::memory_order_acquire);
  }

 protected:
  T *buffer_[SIZE];
  // Atomic: written by producer (push/increment), read+reset by consumer (get_and_reset).
  // Brace-initialized so a default-constructed queue starts in a valid (empty)
  // state - reading an uninitialized atomic would be undefined behavior.
  std::atomic<uint16_t> dropped_count_{0};  // 65535 max - more than enough for drop tracking
  // Atomic: written by consumer (pop), read by producer (push) to check if full
  // Using uint8_t limits queue size to 255 elements but saves memory and ensures
  // atomic operations are efficient on all platforms
  std::atomic<uint8_t> head_{0};
  // Atomic: written by producer (push), read by consumer (pop) to check if empty
  std::atomic<uint8_t> tail_{0};
};
113
114// Extended queue with task notification support
115template<class T, uint8_t SIZE> class NotifyingLockFreeQueue : public LockFreeQueue<T, SIZE> {
116 public:
117 NotifyingLockFreeQueue() : LockFreeQueue<T, SIZE>(), task_to_notify_(nullptr) {}
118
119 bool push(T *element) {
120 bool was_empty;
121 uint8_t old_tail;
122 bool result = this->push_internal_(element, was_empty, old_tail);
123
124 // Notify optimization: only notify if we need to
125 if (result && task_to_notify_ != nullptr &&
126 (was_empty || this->head_.load(std::memory_order_acquire) == old_tail)) {
127 // Notify in two cases:
128 // 1. Queue was empty - consumer might be going to sleep
129 // 2. Consumer just caught up to where tail was - might go to sleep
130 // Note: There's a benign race in case 2 - between reading head and calling
131 // xTaskNotifyGive(), the consumer could advance further. This would result
132 // in an unnecessary wake-up, but is harmless and extremely rare in practice.
133 xTaskNotifyGive(task_to_notify_);
134 }
135 // Otherwise: consumer is still behind, no need to notify
136
137 return result;
138 }
139
140 // Set the FreeRTOS task handle to notify when items are pushed to the queue
141 // This enables efficient wake-up of a consumer task that's waiting for data
142 // @param task The FreeRTOS task handle to notify, or nullptr to disable notifications
143 void set_task_to_notify(TaskHandle_t task) { task_to_notify_ = task; }
144
145 private:
146 TaskHandle_t task_to_notify_;
147};
148
149} // namespace esphome
150
151#endif // defined(USE_ESP32) || defined(USE_LIBRETINY)
uint16_t get_and_reset_dropped_count()
bool push(T *element)
std::atomic< uint16_t > dropped_count_
bool push_internal_(T *element, bool &was_empty, uint8_t &old_tail)
std::atomic< uint8_t > tail_
std::atomic< uint8_t > head_
void set_task_to_notify(TaskHandle_t task)
Providing packet encoding functions for exchanging data with a remote host.
Definition a01nyub.cpp:7