ESPHome 2026.3.0
Loading...
Searching...
No Matches
mqtt_backend_esp32.cpp
Go to the documentation of this file.
2
3#ifdef USE_MQTT
4#ifdef USE_ESP32
5
6#include <string>
8#include "esphome/core/log.h"
10
11namespace esphome::mqtt {
12
13static const char *const TAG = "mqtt.idf";
14
16 mqtt_cfg_.broker.address.hostname = this->host_.c_str();
17 mqtt_cfg_.broker.address.port = this->port_;
18 mqtt_cfg_.session.keepalive = this->keep_alive_;
19 mqtt_cfg_.session.disable_clean_session = !this->clean_session_;
20
21 if (!this->username_.empty()) {
22 mqtt_cfg_.credentials.username = this->username_.c_str();
23 if (!this->password_.empty()) {
24 mqtt_cfg_.credentials.authentication.password = this->password_.c_str();
25 }
26 }
27
28 if (!this->lwt_topic_.empty()) {
29 mqtt_cfg_.session.last_will.topic = this->lwt_topic_.c_str();
30 this->mqtt_cfg_.session.last_will.qos = this->lwt_qos_;
31 this->mqtt_cfg_.session.last_will.retain = this->lwt_retain_;
32
33 if (!this->lwt_message_.empty()) {
34 mqtt_cfg_.session.last_will.msg = this->lwt_message_.c_str();
35 mqtt_cfg_.session.last_will.msg_len = this->lwt_message_.size();
36 }
37 }
38
39 if (!this->client_id_.empty()) {
40 mqtt_cfg_.credentials.client_id = this->client_id_.c_str();
41 }
42 if (ca_certificate_.has_value()) {
43 mqtt_cfg_.broker.verification.certificate = ca_certificate_.value().c_str();
44 mqtt_cfg_.broker.verification.skip_cert_common_name_check = skip_cert_cn_check_;
45 mqtt_cfg_.broker.address.transport = MQTT_TRANSPORT_OVER_SSL;
46
47 if (this->cl_certificate_.has_value() && this->cl_key_.has_value()) {
48 mqtt_cfg_.credentials.authentication.certificate = this->cl_certificate_.value().c_str();
49 mqtt_cfg_.credentials.authentication.key = this->cl_key_.value().c_str();
50 }
51 } else {
52 mqtt_cfg_.broker.address.transport = MQTT_TRANSPORT_OVER_TCP;
53 }
54
55 auto *mqtt_client = esp_mqtt_client_init(&mqtt_cfg_);
56 if (mqtt_client) {
57 handler_.reset(mqtt_client);
58 is_initalized_ = true;
59 esp_mqtt_client_register_event(mqtt_client, MQTT_EVENT_ANY, mqtt_event_handler, this);
60#if defined(USE_MQTT_IDF_ENQUEUE)
61 // Create the task only after MQTT client is initialized successfully
62 // Use larger stack size when TLS is enabled
63 size_t stack_size = this->ca_certificate_.has_value() ? TASK_STACK_SIZE_TLS : TASK_STACK_SIZE;
64 xTaskCreate(esphome_mqtt_task, "esphome_mqtt", stack_size, (void *) this, TASK_PRIORITY, &this->task_handle_);
65 if (this->task_handle_ == nullptr) {
66 ESP_LOGE(TAG, "Failed to create MQTT task");
67 // Clean up MQTT client since we can't start the async task
68 handler_.reset();
69 is_initalized_ = false;
70 return false;
71 }
72 // Set the task handle so the queue can notify it
73 this->mqtt_queue_.set_task_to_notify(this->task_handle_);
74#endif
75 return true;
76 } else {
77 ESP_LOGE(TAG, "Failed to init client");
78 return false;
79 }
80}
81
83 // process new events
84 // handle only 1 message per loop iteration
85 Event *event = this->mqtt_event_queue_.pop();
86 if (event != nullptr) {
87 this->mqtt_event_handler_(*event);
88 this->mqtt_event_pool_.release(event);
89 }
90
91 // Log dropped inbound events (check is cheap - single atomic load in common case)
92 uint16_t inbound_dropped = this->mqtt_event_queue_.get_and_reset_dropped_count();
93 if (inbound_dropped > 0) {
94 ESP_LOGW(TAG, "Dropped %u inbound MQTT events", inbound_dropped);
95 }
96
97#if defined(USE_MQTT_IDF_ENQUEUE)
98 // Periodically log dropped messages to avoid blocking during spikes.
99 // During high load, many messages can be dropped in quick succession.
100 // Logging each drop immediately would flood the logs and potentially
101 // cause more drops if MQTT logging is enabled (cascade effect).
102 // Instead, we accumulate the count and log a summary periodically.
103 // IMPORTANT: Don't move this to the scheduler - if drops are due to memory
104 // pressure, the scheduler's heap allocations would make things worse.
106 // Handle rollover: (now - last_time) works correctly with unsigned arithmetic
107 // even when now < last_time due to rollover
108 if ((now - this->last_dropped_log_time_) >= DROP_LOG_INTERVAL_MS) {
109 uint16_t dropped = this->mqtt_queue_.get_and_reset_dropped_count();
110 if (dropped > 0) {
111 ESP_LOGW(TAG, "Dropped %u messages (%us)", dropped, DROP_LOG_INTERVAL_MS / 1000);
112 }
113 this->last_dropped_log_time_ = now;
114 }
115#endif
116}
117
119 ESP_LOGV(TAG, "Event dispatched from event loop event_id=%d", event.event_id);
120 switch (event.event_id) {
121 case MQTT_EVENT_BEFORE_CONNECT:
122 ESP_LOGV(TAG, "MQTT_EVENT_BEFORE_CONNECT");
123 break;
124
125 case MQTT_EVENT_CONNECTED:
126 ESP_LOGV(TAG, "MQTT_EVENT_CONNECTED");
127 this->is_connected_ = true;
128#if defined(USE_MQTT_IDF_ENQUEUE)
129 this->last_dropped_log_time_ = 0;
130 xTaskNotifyGive(this->task_handle_);
131#endif
132 this->on_connect_.call(event.session_present);
133 break;
134 case MQTT_EVENT_DISCONNECTED:
135 ESP_LOGV(TAG, "MQTT_EVENT_DISCONNECTED");
136 // TODO is there a way to get the disconnect reason?
137 this->is_connected_ = false;
138#if defined(USE_MQTT_IDF_ENQUEUE)
139 this->last_dropped_log_time_ = 0;
140 xTaskNotifyGive(this->task_handle_);
141#endif
143 break;
144
145 case MQTT_EVENT_SUBSCRIBED:
146 ESP_LOGV(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event.msg_id);
147 // hardcode QoS to 0. QoS is not used in this context but required to mirror the AsyncMqtt interface
148 this->on_subscribe_.call((int) event.msg_id, 0);
149 break;
150 case MQTT_EVENT_UNSUBSCRIBED:
151 ESP_LOGV(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event.msg_id);
152 this->on_unsubscribe_.call((int) event.msg_id);
153 break;
154 case MQTT_EVENT_PUBLISHED:
155 ESP_LOGV(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event.msg_id);
156 this->on_publish_.call((int) event.msg_id);
157 break;
158 case MQTT_EVENT_DATA: {
159 if (!event.topic.empty()) {
160 // When a single message arrives as multiple chunks, the topic will be empty
161 // on any but the first message, leading to event.topic being an empty string.
162 // To ensure handlers get the correct topic, cache the last seen topic to
163 // simulate always receiving the topic from underlying library
164 this->cached_topic_ = event.topic;
165 }
166 ESP_LOGV(TAG, "MQTT_EVENT_DATA %s", this->cached_topic_.c_str());
167 this->on_message_.call(this->cached_topic_.c_str(), event.data.data(), event.data.size(),
168 event.current_data_offset, event.total_data_len);
169 } break;
170 case MQTT_EVENT_ERROR:
171 ESP_LOGE(TAG, "MQTT_EVENT_ERROR");
172 if (event.error_handle.error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) {
173 ESP_LOGE(TAG, "Last esp-tls error: 0x%x, tls stack error: 0x%x, socket errno: %d (%s)",
174 event.error_handle.esp_tls_last_esp_err, event.error_handle.esp_tls_stack_err,
175 event.error_handle.esp_transport_sock_errno, strerror(event.error_handle.esp_transport_sock_errno));
176 } else if (event.error_handle.error_type == MQTT_ERROR_TYPE_CONNECTION_REFUSED) {
177 ESP_LOGE(TAG, "Connection refused error: 0x%x", event.error_handle.connect_return_code);
178 } else {
179 ESP_LOGE(TAG, "Unknown error type: 0x%x", event.error_handle.error_type);
180 }
181 break;
182 default:
183 ESP_LOGV(TAG, "Other event id:%d", event.event_id);
184 break;
185 }
186}
187
189void MQTTBackendESP32::mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id,
190 void *event_data) {
191 MQTTBackendESP32 *instance = static_cast<MQTTBackendESP32 *>(handler_args);
192 // queue event to decouple processing from ESP-IDF MQTT task to main loop
193 if (instance) {
194 auto *event = instance->mqtt_event_pool_.allocate();
195 if (event == nullptr) {
196 // Pool exhausted, drop event (counted via queue's dropped counter)
197 instance->mqtt_event_queue_.increment_dropped_count();
198 return;
199 }
200 event->populate(*static_cast<esp_mqtt_event_t *>(event_data));
201 // Push always succeeds: pool is sized to queue capacity (SIZE-1), so if
202 // allocate() returned non-null, the queue cannot be full.
203 instance->mqtt_event_queue_.push(event);
204
205 // Wake main loop immediately to process MQTT event instead of waiting for select() timeout
206#if defined(USE_SOCKET_SELECT_SUPPORT) && defined(USE_WAKE_LOOP_THREADSAFE)
208#endif
209 }
210}
211
212#if defined(USE_MQTT_IDF_ENQUEUE)
214 MQTTBackendESP32 *this_mqtt = (MQTTBackendESP32 *) params;
215
216 while (true) {
217 // Wait for notification indefinitely
218 ulTaskNotifyTake(pdTRUE, portMAX_DELAY);
219
220 // Process all queued items
221 struct QueueElement *elem;
222 while ((elem = this_mqtt->mqtt_queue_.pop()) != nullptr) {
223 if (this_mqtt->is_connected_) {
224 switch (elem->type) {
226 esp_mqtt_client_subscribe(this_mqtt->handler_.get(), elem->topic, elem->qos);
227 break;
228
230 esp_mqtt_client_unsubscribe(this_mqtt->handler_.get(), elem->topic);
231 break;
232
234 esp_mqtt_client_publish(this_mqtt->handler_.get(), elem->topic, elem->payload, elem->payload_len, elem->qos,
235 elem->retain);
236 break;
237
238 default:
239 ESP_LOGE(TAG, "Invalid operation type from MQTT queue");
240 break;
241 }
242 }
243 this_mqtt->mqtt_outbound_pool_.release(elem);
244 }
245 }
246}
247
248bool MQTTBackendESP32::enqueue_(MqttQueueTypeT type, const char *topic, int qos, bool retain, const char *payload,
249 size_t len) {
250 auto *elem = this->mqtt_outbound_pool_.allocate();
251
252 if (!elem) {
253 // Queue is full - increment counter but don't log immediately.
254 // Logging here can cause a cascade effect: if MQTT logging is enabled,
255 // each dropped message would generate a log message, which could itself
256 // be sent via MQTT, causing more drops and more logs in a feedback loop
257 // that eventually triggers a watchdog reset. Instead, we log periodically
258 // in loop() to prevent blocking the event loop during spikes.
259 this->mqtt_queue_.increment_dropped_count();
260 return false;
261 }
262
263 elem->type = type;
264 elem->qos = qos;
265 elem->retain = retain;
266
267 // Use the helper to allocate and copy data
268 if (!elem->set_data(topic, payload, len)) {
269 // Allocation failed, return elem to pool
270 this->mqtt_outbound_pool_.release(elem);
271 // Increment counter without logging to avoid cascade effect during memory pressure
272 this->mqtt_queue_.increment_dropped_count();
273 return false;
274 }
275
276 // Push to queue - always succeeds since we allocated from the pool
277 this->mqtt_queue_.push(elem);
278 return true;
279}
280#endif // USE_MQTT_IDF_ENQUEUE
281
282} // namespace esphome::mqtt
283#endif // USE_ESP32
284#endif
void wake_loop_threadsafe()
Wake the main event loop from another FreeRTOS task.
uint32_t IRAM_ATTR HOT get_loop_component_start_time() const
Get the cached time in milliseconds from when the current component started its loop execution.
EventPool< Event, MQTT_EVENT_QUEUE_LENGTH - 1 > mqtt_event_pool_
EventPool< struct QueueElement, MQTT_QUEUE_LENGTH - 1 > mqtt_outbound_pool_
CallbackManager< on_connect_callback_t > on_connect_
CallbackManager< on_disconnect_callback_t > on_disconnect_
CallbackManager< on_message_callback_t > on_message_
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
static - Dispatch event to instance method
optional< std::string > ca_certificate_
CallbackManager< on_subscribe_callback_t > on_subscribe_
static void esphome_mqtt_task(void *params)
bool enqueue_(MqttQueueTypeT type, const char *topic, int qos=0, bool retain=false, const char *payload=NULL, size_t len=0)
static constexpr size_t TASK_STACK_SIZE_TLS
CallbackManager< on_unsubscribe_callback_t > on_unsubscribe_
esp_mqtt_client_config_t mqtt_cfg_
static constexpr size_t TASK_STACK_SIZE
CallbackManager< on_publish_user_callback_t > on_publish_
static constexpr uint32_t DROP_LOG_INTERVAL_MS
NotifyingLockFreeQueue< struct QueueElement, MQTT_QUEUE_LENGTH > mqtt_queue_
void mqtt_event_handler_(const Event &event)
optional< std::string > cl_certificate_
static constexpr ssize_t TASK_PRIORITY
LockFreeQueue< Event, MQTT_EVENT_QUEUE_LENGTH > mqtt_event_queue_
uint16_t type
std::string size_t len
Definition helpers.h:892
Application App
Global storage of Application pointer - only one Application can exist.
uint8_t event_id
Definition tt21100.cpp:3