ESPHome 2026.1.5
Loading...
Searching...
No Matches
mqtt_backend_esp32.cpp
Go to the documentation of this file.
2
3#ifdef USE_MQTT
4#ifdef USE_ESP32
5
6#include <string>
8#include "esphome/core/log.h"
10
11namespace esphome::mqtt {
12
13static const char *const TAG = "mqtt.idf";
14
16 mqtt_cfg_.broker.address.hostname = this->host_.c_str();
17 mqtt_cfg_.broker.address.port = this->port_;
18 mqtt_cfg_.session.keepalive = this->keep_alive_;
19 mqtt_cfg_.session.disable_clean_session = !this->clean_session_;
20
21 if (!this->username_.empty()) {
22 mqtt_cfg_.credentials.username = this->username_.c_str();
23 if (!this->password_.empty()) {
24 mqtt_cfg_.credentials.authentication.password = this->password_.c_str();
25 }
26 }
27
28 if (!this->lwt_topic_.empty()) {
29 mqtt_cfg_.session.last_will.topic = this->lwt_topic_.c_str();
30 this->mqtt_cfg_.session.last_will.qos = this->lwt_qos_;
31 this->mqtt_cfg_.session.last_will.retain = this->lwt_retain_;
32
33 if (!this->lwt_message_.empty()) {
34 mqtt_cfg_.session.last_will.msg = this->lwt_message_.c_str();
35 mqtt_cfg_.session.last_will.msg_len = this->lwt_message_.size();
36 }
37 }
38
39 if (!this->client_id_.empty()) {
40 mqtt_cfg_.credentials.client_id = this->client_id_.c_str();
41 }
43 mqtt_cfg_.broker.verification.certificate = ca_certificate_.value().c_str();
44 mqtt_cfg_.broker.verification.skip_cert_common_name_check = skip_cert_cn_check_;
45 mqtt_cfg_.broker.address.transport = MQTT_TRANSPORT_OVER_SSL;
46
47 if (this->cl_certificate_.has_value() && this->cl_key_.has_value()) {
48 mqtt_cfg_.credentials.authentication.certificate = this->cl_certificate_.value().c_str();
49 mqtt_cfg_.credentials.authentication.key = this->cl_key_.value().c_str();
50 }
51 } else {
52 mqtt_cfg_.broker.address.transport = MQTT_TRANSPORT_OVER_TCP;
53 }
54
55 auto *mqtt_client = esp_mqtt_client_init(&mqtt_cfg_);
56 if (mqtt_client) {
57 handler_.reset(mqtt_client);
58 is_initalized_ = true;
59 esp_mqtt_client_register_event(mqtt_client, MQTT_EVENT_ANY, mqtt_event_handler, this);
60#if defined(USE_MQTT_IDF_ENQUEUE)
61 // Create the task only after MQTT client is initialized successfully
62 // Use larger stack size when TLS is enabled
63 size_t stack_size = this->ca_certificate_.has_value() ? TASK_STACK_SIZE_TLS : TASK_STACK_SIZE;
64 xTaskCreate(esphome_mqtt_task, "esphome_mqtt", stack_size, (void *) this, TASK_PRIORITY, &this->task_handle_);
65 if (this->task_handle_ == nullptr) {
66 ESP_LOGE(TAG, "Failed to create MQTT task");
67 // Clean up MQTT client since we can't start the async task
68 handler_.reset();
69 is_initalized_ = false;
70 return false;
71 }
72 // Set the task handle so the queue can notify it
73 this->mqtt_queue_.set_task_to_notify(this->task_handle_);
74#endif
75 return true;
76 } else {
77 ESP_LOGE(TAG, "Failed to init client");
78 return false;
79 }
80}
81
83 // process new events
84 // handle only 1 message per loop iteration
85 if (!mqtt_events_.empty()) {
86 auto &event = mqtt_events_.front();
88 mqtt_events_.pop();
89 }
90
91#if defined(USE_MQTT_IDF_ENQUEUE)
92 // Periodically log dropped messages to avoid blocking during spikes.
93 // During high load, many messages can be dropped in quick succession.
94 // Logging each drop immediately would flood the logs and potentially
95 // cause more drops if MQTT logging is enabled (cascade effect).
96 // Instead, we accumulate the count and log a summary periodically.
97 // IMPORTANT: Don't move this to the scheduler - if drops are due to memory
98 // pressure, the scheduler's heap allocations would make things worse.
100 // Handle rollover: (now - last_time) works correctly with unsigned arithmetic
101 // even when now < last_time due to rollover
102 if ((now - this->last_dropped_log_time_) >= DROP_LOG_INTERVAL_MS) {
103 uint16_t dropped = this->mqtt_queue_.get_and_reset_dropped_count();
104 if (dropped > 0) {
105 ESP_LOGW(TAG, "Dropped %u messages (%us)", dropped, DROP_LOG_INTERVAL_MS / 1000);
106 }
107 this->last_dropped_log_time_ = now;
108 }
109#endif
110}
111
113 ESP_LOGV(TAG, "Event dispatched from event loop event_id=%d", event.event_id);
114 switch (event.event_id) {
115 case MQTT_EVENT_BEFORE_CONNECT:
116 ESP_LOGV(TAG, "MQTT_EVENT_BEFORE_CONNECT");
117 break;
118
119 case MQTT_EVENT_CONNECTED:
120 ESP_LOGV(TAG, "MQTT_EVENT_CONNECTED");
121 this->is_connected_ = true;
122#if defined(USE_MQTT_IDF_ENQUEUE)
123 this->last_dropped_log_time_ = 0;
124 xTaskNotifyGive(this->task_handle_);
125#endif
126 this->on_connect_.call(event.session_present);
127 break;
128 case MQTT_EVENT_DISCONNECTED:
129 ESP_LOGV(TAG, "MQTT_EVENT_DISCONNECTED");
130 // TODO is there a way to get the disconnect reason?
131 this->is_connected_ = false;
132#if defined(USE_MQTT_IDF_ENQUEUE)
133 this->last_dropped_log_time_ = 0;
134 xTaskNotifyGive(this->task_handle_);
135#endif
137 break;
138
139 case MQTT_EVENT_SUBSCRIBED:
140 ESP_LOGV(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event.msg_id);
141 // hardcode QoS to 0. QoS is not used in this context but required to mirror the AsyncMqtt interface
142 this->on_subscribe_.call((int) event.msg_id, 0);
143 break;
144 case MQTT_EVENT_UNSUBSCRIBED:
145 ESP_LOGV(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event.msg_id);
146 this->on_unsubscribe_.call((int) event.msg_id);
147 break;
148 case MQTT_EVENT_PUBLISHED:
149 ESP_LOGV(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event.msg_id);
150 this->on_publish_.call((int) event.msg_id);
151 break;
152 case MQTT_EVENT_DATA: {
153 static std::string topic;
154 if (!event.topic.empty()) {
155 // When a single message arrives as multiple chunks, the topic will be empty
156 // on any but the first message, leading to event.topic being an empty string.
157 // To ensure handlers get the correct topic, cache the last seen topic to
158 // simulate always receiving the topic from underlying library
159 topic = event.topic;
160 }
161 ESP_LOGV(TAG, "MQTT_EVENT_DATA %s", topic.c_str());
162 this->on_message_.call(topic.c_str(), event.data.data(), event.data.size(), event.current_data_offset,
163 event.total_data_len);
164 } break;
165 case MQTT_EVENT_ERROR:
166 ESP_LOGE(TAG, "MQTT_EVENT_ERROR");
167 if (event.error_handle.error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) {
168 ESP_LOGE(TAG,
169 "Last error code reported from esp-tls: 0x%x\n"
170 "Last tls stack error number: 0x%x\n"
171 "Last captured errno : %d (%s)",
172 event.error_handle.esp_tls_last_esp_err, event.error_handle.esp_tls_stack_err,
173 event.error_handle.esp_transport_sock_errno, strerror(event.error_handle.esp_transport_sock_errno));
174 } else if (event.error_handle.error_type == MQTT_ERROR_TYPE_CONNECTION_REFUSED) {
175 ESP_LOGE(TAG, "Connection refused error: 0x%x", event.error_handle.connect_return_code);
176 } else {
177 ESP_LOGE(TAG, "Unknown error type: 0x%x", event.error_handle.error_type);
178 }
179 break;
180 default:
181 ESP_LOGV(TAG, "Other event id:%d", event.event_id);
182 break;
183 }
184}
185
187void MQTTBackendESP32::mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id,
188 void *event_data) {
189 MQTTBackendESP32 *instance = static_cast<MQTTBackendESP32 *>(handler_args);
190 // queue event to decouple processing
191 if (instance) {
192 auto event = *static_cast<esp_mqtt_event_t *>(event_data);
193 instance->mqtt_events_.emplace(event);
194
195 // Wake main loop immediately to process MQTT event instead of waiting for select() timeout
196#if defined(USE_SOCKET_SELECT_SUPPORT) && defined(USE_WAKE_LOOP_THREADSAFE)
198#endif
199 }
200}
201
202#if defined(USE_MQTT_IDF_ENQUEUE)
204 MQTTBackendESP32 *this_mqtt = (MQTTBackendESP32 *) params;
205
206 while (true) {
207 // Wait for notification indefinitely
208 ulTaskNotifyTake(pdTRUE, portMAX_DELAY);
209
210 // Process all queued items
211 struct QueueElement *elem;
212 while ((elem = this_mqtt->mqtt_queue_.pop()) != nullptr) {
213 if (this_mqtt->is_connected_) {
214 switch (elem->type) {
216 esp_mqtt_client_subscribe(this_mqtt->handler_.get(), elem->topic, elem->qos);
217 break;
218
220 esp_mqtt_client_unsubscribe(this_mqtt->handler_.get(), elem->topic);
221 break;
222
224 esp_mqtt_client_publish(this_mqtt->handler_.get(), elem->topic, elem->payload, elem->payload_len, elem->qos,
225 elem->retain);
226 break;
227
228 default:
229 ESP_LOGE(TAG, "Invalid operation type from MQTT queue");
230 break;
231 }
232 }
233 this_mqtt->mqtt_event_pool_.release(elem);
234 }
235 }
236}
237
238bool MQTTBackendESP32::enqueue_(MqttQueueTypeT type, const char *topic, int qos, bool retain, const char *payload,
239 size_t len) {
240 auto *elem = this->mqtt_event_pool_.allocate();
241
242 if (!elem) {
243 // Queue is full - increment counter but don't log immediately.
244 // Logging here can cause a cascade effect: if MQTT logging is enabled,
245 // each dropped message would generate a log message, which could itself
246 // be sent via MQTT, causing more drops and more logs in a feedback loop
247 // that eventually triggers a watchdog reset. Instead, we log periodically
248 // in loop() to prevent blocking the event loop during spikes.
249 this->mqtt_queue_.increment_dropped_count();
250 return false;
251 }
252
253 elem->type = type;
254 elem->qos = qos;
255 elem->retain = retain;
256
257 // Use the helper to allocate and copy data
258 if (!elem->set_data(topic, payload, len)) {
259 // Allocation failed, return elem to pool
260 this->mqtt_event_pool_.release(elem);
261 // Increment counter without logging to avoid cascade effect during memory pressure
262 this->mqtt_queue_.increment_dropped_count();
263 return false;
264 }
265
266 // Push to queue - always succeeds since we allocated from the pool
267 this->mqtt_queue_.push(elem);
268 return true;
269}
270#endif // USE_MQTT_IDF_ENQUEUE
271
272} // namespace esphome::mqtt
273#endif // USE_ESP32
274#endif
void wake_loop_threadsafe()
Wake the main event loop from a FreeRTOS task. Thread-safe; can be called from task context to immedia...
uint32_t IRAM_ATTR HOT get_loop_component_start_time() const
Get the cached time in milliseconds from when the current component started its loop execution.
CallbackManager< on_connect_callback_t > on_connect_
CallbackManager< on_disconnect_callback_t > on_disconnect_
CallbackManager< on_message_callback_t > on_message_
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
static - Dispatch event to instance method
optional< std::string > ca_certificate_
CallbackManager< on_subscribe_callback_t > on_subscribe_
static void esphome_mqtt_task(void *params)
bool enqueue_(MqttQueueTypeT type, const char *topic, int qos=0, bool retain=false, const char *payload=NULL, size_t len=0)
EventPool< struct QueueElement, MQTT_QUEUE_LENGTH > mqtt_event_pool_
CallbackManager< on_unsubscribe_callback_t > on_unsubscribe_
esp_mqtt_client_config_t mqtt_cfg_
CallbackManager< on_publish_user_callback_t > on_publish_
static constexpr uint32_t DROP_LOG_INTERVAL_MS
NotifyingLockFreeQueue< struct QueueElement, MQTT_QUEUE_LENGTH > mqtt_queue_
void mqtt_event_handler_(const Event &event)
optional< std::string > cl_certificate_
bool has_value() const
Definition optional.h:92
value_type const & value() const
Definition optional.h:94
uint16_t type
std::string size_t len
Definition helpers.h:595
Application App
Global storage of Application pointer - only one Application can exist.
uint8_t event_id
Definition tt21100.cpp:3