ESPHome 2026.3.3
Loading...
Searching...
No Matches
lock_free_queue.h
Go to the documentation of this file.
#pragma once

#include <atomic>
#include <cstddef>
#include <cstdint>

#ifdef USE_ESP32
#include <freertos/FreeRTOS.h>
#include <freertos/task.h>
#endif
10
/*
 * Lock-free queue for single-producer single-consumer scenarios.
 * This allows one thread to push items and another to pop them without
 * blocking each other.
 *
 * This is a Single-Producer Single-Consumer (SPSC) lock-free ring buffer.
 * Available on platforms with FreeRTOS support (ESP32, LibreTiny).
 *
 * Common use cases:
 * - BLE events: BLE task produces, main loop consumes
 * - MQTT messages: main task produces, MQTT thread consumes
 *
 * @tparam T The type of elements stored in the queue (must be a pointer type)
 * @tparam SIZE The ring buffer size (2-255, limited by uint8_t indices); one
 *         slot is kept empty to distinguish full from empty, so the queue
 *         holds at most SIZE - 1 elements at a time
 */
26
27namespace esphome {
28
29// Base lock-free queue without task notification
// Base lock-free queue without task notification.
//
// SPSC ring buffer: exactly one thread may call push() while exactly one
// other thread calls pop(). The producer only writes tail_ and the consumer
// only writes head_, so no locks are needed. One slot is always kept empty
// to distinguish "full" from "empty", so at most SIZE - 1 elements are held.
template<class T, uint8_t SIZE> class LockFreeQueue {
 public:
  // Producer API: enqueue a pointer.
  // Returns false (and counts a drop) when the queue is full; returns false
  // without counting a drop when element is nullptr.
  bool push(T *element) {
    bool was_empty;
    uint8_t old_tail;
    return push_internal_(element, was_empty, old_tail);
  }

 protected:
  // Advance ring buffer index by one, wrapping at SIZE.
  // Power-of-2 sizes use modulo (compiler emits single mask instruction).
  // Non-power-of-2 sizes use comparison to avoid expensive multiply-shift sequences.
  static constexpr uint8_t next_index(uint8_t index) {
    if constexpr ((SIZE & (SIZE - 1)) == 0) {
      return (index + 1) % SIZE;
    } else {
      uint8_t next = index + 1;
      if (next >= SIZE) [[unlikely]]
        next = 0;
      return next;
    }
  }

  // Internal push that reports queue state - for use by derived classes.
  // On success, was_empty reports whether the queue was empty before this
  // push and old_tail is the slot index that was written; both outputs are
  // only meaningful when true is returned.
  bool push_internal_(T *element, bool &was_empty, uint8_t &old_tail) {
    if (element == nullptr)
      return false;

    uint8_t current_tail = tail_.load(std::memory_order_relaxed);
    uint8_t next_tail = next_index(current_tail);

    // Read head before incrementing tail
    uint8_t head_before = head_.load(std::memory_order_acquire);

    if (next_tail == head_before) {
      // Buffer full
      dropped_count_.fetch_add(1, std::memory_order_relaxed);
      return false;
    }

    was_empty = (current_tail == head_before);
    old_tail = current_tail;

    buffer_[current_tail] = element;
    // Release store pairs with the consumer's acquire load of tail_ in pop():
    // the element store above must be visible before the new tail is observed.
    tail_.store(next_tail, std::memory_order_release);

    return true;
  }

 public:
  // Consumer API: dequeue the oldest element, or nullptr if the queue is empty.
  T *pop() {
    uint8_t current_head = head_.load(std::memory_order_relaxed);

    if (current_head == tail_.load(std::memory_order_acquire)) {
      return nullptr;  // Empty
    }

    T *element = buffer_[current_head];
    head_.store(next_index(current_head), std::memory_order_release);
    return element;
  }

  // Number of queued elements. The two indices are loaded separately, so the
  // result is a snapshot that may lag concurrent pushes/pops.
  size_t size() const {
    uint8_t tail = tail_.load(std::memory_order_acquire);
    uint8_t head = head_.load(std::memory_order_acquire);
    if constexpr ((SIZE & (SIZE - 1)) == 0) {
      return (tail - head + SIZE) % SIZE;
    } else {
      int diff = static_cast<int>(tail) - static_cast<int>(head);
      if (diff < 0)
        diff += SIZE;
      return static_cast<size_t>(diff);
    }
  }

  // Return the number of dropped pushes since the last call and reset the
  // counter to zero.
  uint16_t get_and_reset_dropped_count() {
    // Fast path: relaxed load is a single instruction on all platforms.
    // The atomic exchange (especially for uint16_t on Xtensa) compiles to
    // an expensive sub-word CAS retry loop (~25 instructions + memory barriers).
    // Since drops are rare, avoid the exchange in the common case.
    if (dropped_count_.load(std::memory_order_relaxed) == 0)
      return 0;
    return dropped_count_.exchange(0, std::memory_order_relaxed);
  }

  // Record a drop that occurred outside push() (caller failed before enqueueing).
  void increment_dropped_count() { dropped_count_.fetch_add(1, std::memory_order_relaxed); }

  bool empty() const { return head_.load(std::memory_order_acquire) == tail_.load(std::memory_order_acquire); }

  bool full() const {
    uint8_t next_tail = next_index(tail_.load(std::memory_order_relaxed));
    return next_tail == head_.load(std::memory_order_acquire);
  }

 protected:
  T *buffer_[SIZE]{};
  // Atomic: written by producer (push/increment), read+reset by consumer (get_and_reset).
  // Zero-initialized in-class; a std::atomic without an initializer would
  // otherwise start with an indeterminate value.
  std::atomic<uint16_t> dropped_count_{0};  // 65535 max - more than enough for drop tracking
  // Atomic: written by consumer (pop), read by producer (push) to check if full
  // Using uint8_t limits queue size to 255 elements but saves memory and ensures
  // atomic operations are efficient on all platforms
  std::atomic<uint8_t> head_{0};
  // Atomic: written by producer (push), read by consumer (pop) to check if empty
  std::atomic<uint8_t> tail_{0};
};
137
#ifdef USE_ESP32
// Extended queue with task notification support.
//
// Same SPSC contract as LockFreeQueue, but the producer wakes a registered
// FreeRTOS task whenever the consumer might be about to block waiting for data.
template<class T, uint8_t SIZE> class NotifyingLockFreeQueue : public LockFreeQueue<T, SIZE> {
 public:
  NotifyingLockFreeQueue() : LockFreeQueue<T, SIZE>(), task_to_notify_(nullptr) {}

  bool push(T *element) {
    bool queue_was_empty;
    uint8_t tail_before;
    const bool pushed = this->push_internal_(element, queue_was_empty, tail_before);

    // Nothing to wake up if the push failed or no task is registered.
    if (!pushed || task_to_notify_ == nullptr)
      return pushed;

    // Wake the consumer only when it might be about to sleep:
    // 1. the queue was empty before this push, or
    // 2. the consumer has just caught up to where the tail was.
    // Note: there is a benign race in case 2 - between reading head and calling
    // xTaskNotifyGive(), the consumer could advance further. That only causes an
    // unnecessary wake-up, which is harmless and extremely rare in practice.
    if (queue_was_empty || this->head_.load(std::memory_order_acquire) == tail_before)
      xTaskNotifyGive(task_to_notify_);
    // Otherwise the consumer is still behind and needs no wake-up.

    return pushed;
  }

  // Set the FreeRTOS task handle to notify when items are pushed to the queue.
  // This enables efficient wake-up of a consumer task that's waiting for data.
  // @param task The FreeRTOS task handle to notify, or nullptr to disable notifications
  void set_task_to_notify(TaskHandle_t task) { task_to_notify_ = task; }

 private:
  TaskHandle_t task_to_notify_;
};
#endif
174
175} // namespace esphome
uint16_t get_and_reset_dropped_count()
bool push(T *element)
static constexpr uint8_t next_index(uint8_t index)
std::atomic< uint16_t > dropped_count_
bool push_internal_(T *element, bool &was_empty, uint8_t &old_tail)
std::atomic< uint8_t > tail_
std::atomic< uint8_t > head_
void set_task_to_notify(TaskHandle_t task)
Providing packet encoding functions for exchanging data with a remote host.
Definition a01nyub.cpp:7