ESPHome 2026.3.3
Loading...
Searching...
No Matches
mqtt_backend_esp32.h
Go to the documentation of this file.
1#pragma once
2
3#include "mqtt_backend.h"
4#ifdef USE_MQTT
5#ifdef USE_ESP32
6
7#include <string>
8#include <cstring>
9#include <mqtt_client.h>
10#include <freertos/FreeRTOS.h>
11#include <freertos/task.h>
16
17namespace esphome::mqtt {
18
19struct Event {
20 esp_mqtt_event_id_t event_id{};
21 std::vector<char> data;
24 std::string topic;
25 int msg_id{0};
26 bool retain{false};
27 int qos{0};
28 bool dup{false};
29 bool session_present{false};
30 esp_mqtt_error_codes_t error_handle{};
31
32 // Populate from esp_mqtt_event_t
33 // Copies pointer-based data to owned storage for safe cross-thread transfer
34 void populate(const esp_mqtt_event_t &event) {
35 this->event_id = event.event_id;
36 this->data.assign(event.data, event.data + event.data_len);
37 this->total_data_len = event.total_data_len;
38 this->current_data_offset = event.current_data_offset;
39 this->topic.assign(event.topic, event.topic_len);
40 this->msg_id = event.msg_id;
41 this->retain = event.retain;
42 this->qos = event.qos;
43 this->dup = event.dup;
44 this->session_present = event.session_present;
45 this->error_handle = *event.error_handle;
46 }
47
48 // Release owned resources for pool reuse (keeps allocated capacity for efficiency)
49 void release() {
50 this->data.clear();
51 this->topic.clear();
52 }
53};
54
61
63 char *topic;
64 char *payload;
65 uint16_t payload_len; // MQTT max payload is 64KiB
66 uint8_t type : 2;
67 uint8_t qos : 2; // QoS only needs values 0-2
68 uint8_t retain : 1;
69 uint8_t reserved : 3; // Reserved for future use
70
71 QueueElement() : topic(nullptr), payload(nullptr), payload_len(0), qos(0), retain(0), reserved(0) {}
72
73 // Helper to set topic/payload (uses RAMAllocator)
74 bool set_data(const char *topic_str, const char *payload_data, size_t len) {
75 // Check payload size limit (MQTT max is 64KiB)
76 if (len > std::numeric_limits<uint16_t>::max()) {
77 return false;
78 }
79
80 // Use RAMAllocator with default flags (tries external RAM first, falls back to internal)
81 RAMAllocator<char> allocator;
82
83 // Allocate and copy topic
84 size_t topic_len = strlen(topic_str) + 1;
85 topic = allocator.allocate(topic_len);
86 if (!topic)
87 return false;
88 memcpy(topic, topic_str, topic_len);
89
90 if (payload_data && len) {
91 payload = allocator.allocate(len);
92 if (!payload) {
93 allocator.deallocate(topic, topic_len);
94 topic = nullptr;
95 return false;
96 }
97 memcpy(payload, payload_data, len);
98 payload_len = static_cast<uint16_t>(len);
99 } else {
100 payload = nullptr;
101 payload_len = 0;
102 }
103 return true;
104 }
105
106 // Helper to release (uses RAMAllocator)
107 void release() {
108 RAMAllocator<char> allocator;
109 if (topic) {
110 allocator.deallocate(topic, strlen(topic) + 1);
111 topic = nullptr;
112 }
113 if (payload) {
114 allocator.deallocate(payload, payload_len);
115 payload = nullptr;
116 }
117 payload_len = 0;
118 }
119};
120
121class MQTTBackendESP32 final : public MQTTBackend {
122 public:
123 static constexpr size_t MQTT_BUFFER_SIZE = 4096;
124 static constexpr size_t TASK_STACK_SIZE = 3072;
125 static constexpr size_t TASK_STACK_SIZE_TLS = 4096; // Larger stack for TLS operations
126 static constexpr ssize_t TASK_PRIORITY = 5;
127 static constexpr uint8_t MQTT_QUEUE_LENGTH = 30; // 30*12 bytes = 360
128 static constexpr uint8_t MQTT_EVENT_QUEUE_LENGTH = 32; // Inbound events from broker
129
130 void set_keep_alive(uint16_t keep_alive) final { this->keep_alive_ = keep_alive; }
131 void set_client_id(const char *client_id) final { this->client_id_ = client_id; }
132 void set_clean_session(bool clean_session) final { this->clean_session_ = clean_session; }
133
134 void set_credentials(const char *username, const char *password) final {
135 if (username)
136 this->username_ = username;
137 if (password)
138 this->password_ = password;
139 }
140 void set_will(const char *topic, uint8_t qos, bool retain, const char *payload) final {
141 if (topic)
142 this->lwt_topic_ = topic;
143 this->lwt_qos_ = qos;
144 if (payload)
145 this->lwt_message_ = payload;
146 this->lwt_retain_ = retain;
147 }
148 void set_server(network::IPAddress ip, uint16_t port) final {
149 char ip_buf[network::IP_ADDRESS_BUFFER_SIZE];
150 this->host_ = ip.str_to(ip_buf);
151 this->port_ = port;
152 }
153 void set_server(const char *host, uint16_t port) final {
154 this->host_ = host;
155 this->port_ = port;
156 }
157 void set_on_connect(std::function<on_connect_callback_t> &&callback) final {
158 this->on_connect_.add(std::move(callback));
159 }
160 void set_on_disconnect(std::function<on_disconnect_callback_t> &&callback) final {
161 this->on_disconnect_.add(std::move(callback));
162 }
163 void set_on_subscribe(std::function<on_subscribe_callback_t> &&callback) final {
164 this->on_subscribe_.add(std::move(callback));
165 }
166 void set_on_unsubscribe(std::function<on_unsubscribe_callback_t> &&callback) final {
167 this->on_unsubscribe_.add(std::move(callback));
168 }
169 void set_on_message(std::function<on_message_callback_t> &&callback) final {
170 this->on_message_.add(std::move(callback));
171 }
172 void set_on_publish(std::function<on_publish_user_callback_t> &&callback) final {
173 this->on_publish_.add(std::move(callback));
174 }
175 bool connected() const final { return this->is_connected_; }
176
177 void connect() final {
178 if (!is_initalized_) {
179 if (initialize_()) {
180 esp_mqtt_client_start(handler_.get());
181 }
182 }
183 }
184 void disconnect() final {
185 if (is_initalized_)
186 esp_mqtt_client_disconnect(handler_.get());
187 }
188
189 bool subscribe(const char *topic, uint8_t qos) final {
190#if defined(USE_MQTT_IDF_ENQUEUE)
191 return enqueue_(MQTT_QUEUE_TYPE_SUBSCRIBE, topic, qos);
192#else
193 return esp_mqtt_client_subscribe(handler_.get(), topic, qos) != -1;
194#endif
195 }
196 bool unsubscribe(const char *topic) final {
197#if defined(USE_MQTT_IDF_ENQUEUE)
199#else
200 return esp_mqtt_client_unsubscribe(handler_.get(), topic) != -1;
201#endif
202 }
203
204 bool publish(const char *topic, const char *payload, size_t length, uint8_t qos, bool retain) final {
205#if defined(USE_MQTT_IDF_ENQUEUE)
206 return enqueue_(MQTT_QUEUE_TYPE_PUBLISH, topic, qos, retain, payload, length);
207#else
208 // might block for several seconds, either due to network timeout (10s)
209 // or if publishing payloads longer than internal buffer (due to message fragmentation)
210 return esp_mqtt_client_publish(handler_.get(), topic, payload, length, qos, retain) != -1;
211#endif
212 }
214
215 void loop() final;
216
217 void set_ca_certificate(const std::string &cert) { ca_certificate_ = cert; }
218 void set_cl_certificate(const std::string &cert) { cl_certificate_ = cert; }
219 void set_cl_key(const std::string &key) { cl_key_ = key; }
220 void set_skip_cert_cn_check(bool skip_check) { skip_cert_cn_check_ = skip_check; }
221
222 // No destructor needed: ESPHome components live for the entire device runtime.
223 // The MQTT task and queue will run until the device reboots or loses power,
224 // at which point the entire process terminates and FreeRTOS cleans up all tasks.
225 // Implementing a destructor would add complexity and potential race conditions
226 // for a scenario that never occurs in practice.
227
228 protected:
229 bool initialize_();
230 void mqtt_event_handler_(const Event &event);
231 static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data);
232
234 void operator()(esp_mqtt_client *client_handler) { esp_mqtt_client_destroy(client_handler); }
235 };
236 using ClientHandler_ = std::unique_ptr<esp_mqtt_client, MqttClientDeleter>;
238
239 bool is_connected_{false};
240 bool is_initalized_{false};
241
242 esp_mqtt_client_config_t mqtt_cfg_{};
243
244 std::string host_;
245 uint16_t port_;
246 std::string username_;
247 std::string password_;
248 std::string lwt_topic_;
249 std::string lwt_message_;
250 uint8_t lwt_qos_;
252 std::string client_id_;
253 uint16_t keep_alive_;
255 optional<std::string> ca_certificate_;
256 optional<std::string> cl_certificate_;
257 optional<std::string> cl_key_;
259#if defined(USE_MQTT_IDF_ENQUEUE)
260 static void esphome_mqtt_task(void *params);
261 // Pool sized to queue capacity (SIZE-1) — see mqtt_event_pool_ comment.
264 TaskHandle_t task_handle_{nullptr};
265 bool enqueue_(MqttQueueTypeT type, const char *topic, int qos = 0, bool retain = false, const char *payload = NULL,
266 size_t len = 0);
267#endif
268
269 // callbacks
276 std::string cached_topic_;
277 // Pool sized to queue capacity (SIZE-1) because LockFreeQueue<T,N> is a ring
278 // buffer that holds N-1 elements (one slot distinguishes full from empty).
279 // This guarantees allocate() returns nullptr before push() can fail, which:
280 // 1. Prevents leaking a pool slot (the Nth allocate succeeds but push fails)
281 // 2. Avoids needing release() on the producer path after a failed push(),
282 // preserving the SPSC contract on the pool's internal free list
285
286#if defined(USE_MQTT_IDF_ENQUEUE)
288 static constexpr uint32_t DROP_LOG_INTERVAL_MS = 10000; // Log every 10 seconds
289#endif
290};
291
292} // namespace esphome::mqtt
293
294#endif
295#endif
An STL allocator that uses SPI or internal RAM.
Definition helpers.h:1899
void deallocate(T *p, size_t n)
Definition helpers.h:1954
T * allocate(size_t n)
Definition helpers.h:1916
EventPool< Event, MQTT_EVENT_QUEUE_LENGTH - 1 > mqtt_event_pool_
void set_keep_alive(uint16_t keep_alive) final
EventPool< struct QueueElement, MQTT_QUEUE_LENGTH - 1 > mqtt_outbound_pool_
void set_on_message(std::function< on_message_callback_t > &&callback) final
CallbackManager< on_connect_callback_t > on_connect_
CallbackManager< on_disconnect_callback_t > on_disconnect_
static constexpr size_t MQTT_BUFFER_SIZE
void set_ca_certificate(const std::string &cert)
CallbackManager< on_message_callback_t > on_message_
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
static - Dispatch event to instance method
void set_client_id(const char *client_id) final
optional< std::string > ca_certificate_
CallbackManager< on_subscribe_callback_t > on_subscribe_
static void esphome_mqtt_task(void *params)
void set_on_publish(std::function< on_publish_user_callback_t > &&callback) final
bool enqueue_(MqttQueueTypeT type, const char *topic, int qos=0, bool retain=false, const char *payload=NULL, size_t len=0)
void set_server(const char *host, uint16_t port) final
static constexpr uint8_t MQTT_EVENT_QUEUE_LENGTH
void set_cl_key(const std::string &key)
void set_on_connect(std::function< on_connect_callback_t > &&callback) final
void set_will(const char *topic, uint8_t qos, bool retain, const char *payload) final
bool subscribe(const char *topic, uint8_t qos) final
void set_server(network::IPAddress ip, uint16_t port) final
static constexpr size_t TASK_STACK_SIZE_TLS
void set_cl_certificate(const std::string &cert)
CallbackManager< on_unsubscribe_callback_t > on_unsubscribe_
esp_mqtt_client_config_t mqtt_cfg_
void set_skip_cert_cn_check(bool skip_check)
void set_on_unsubscribe(std::function< on_unsubscribe_callback_t > &&callback) final
bool publish(const char *topic, const char *payload, size_t length, uint8_t qos, bool retain) final
static constexpr size_t TASK_STACK_SIZE
CallbackManager< on_publish_user_callback_t > on_publish_
void set_clean_session(bool clean_session) final
bool unsubscribe(const char *topic) final
static constexpr uint32_t DROP_LOG_INTERVAL_MS
NotifyingLockFreeQueue< struct QueueElement, MQTT_QUEUE_LENGTH > mqtt_queue_
static constexpr uint8_t MQTT_QUEUE_LENGTH
void set_on_subscribe(std::function< on_subscribe_callback_t > &&callback) final
void mqtt_event_handler_(const Event &event)
optional< std::string > cl_certificate_
void set_on_disconnect(std::function< on_disconnect_callback_t > &&callback) final
void set_credentials(const char *username, const char *password) final
static constexpr ssize_t TASK_PRIORITY
std::unique_ptr< esp_mqtt_client, MqttClientDeleter > ClientHandler_
LockFreeQueue< Event, MQTT_EVENT_QUEUE_LENGTH > mqtt_event_queue_
virtual bool publish(const char *topic, const char *payload, size_t length, uint8_t qos, bool retain)=0
uint16_t type
__int64 ssize_t
Definition httplib.h:178
std::string size_t len
Definition helpers.h:892
std::vector< char > data
esp_mqtt_error_codes_t error_handle
void populate(const esp_mqtt_event_t &event)
void operator()(esp_mqtt_client *client_handler)
bool set_data(const char *topic_str, const char *payload_data, size_t len)
uint16_t length
Definition tt21100.cpp:0
uint8_t event_id
Definition tt21100.cpp:3