13static const char *
const TAG =
"mqtt.idf";
45 mqtt_cfg_.broker.address.transport = MQTT_TRANSPORT_OVER_SSL;
52 mqtt_cfg_.broker.address.transport = MQTT_TRANSPORT_OVER_TCP;
55 auto *mqtt_client = esp_mqtt_client_init(&
mqtt_cfg_);
60#if defined(USE_MQTT_IDF_ENQUEUE)
66 ESP_LOGE(TAG,
"Failed to create MQTT task");
77 ESP_LOGE(TAG,
"Failed to init client");
91#if defined(USE_MQTT_IDF_ENQUEUE)
103 uint16_t dropped = this->
mqtt_queue_.get_and_reset_dropped_count();
113 ESP_LOGV(TAG,
"Event dispatched from event loop event_id=%d", event.event_id);
114 switch (event.event_id) {
115 case MQTT_EVENT_BEFORE_CONNECT:
116 ESP_LOGV(TAG,
"MQTT_EVENT_BEFORE_CONNECT");
119 case MQTT_EVENT_CONNECTED:
120 ESP_LOGV(TAG,
"MQTT_EVENT_CONNECTED");
122#if defined(USE_MQTT_IDF_ENQUEUE)
128 case MQTT_EVENT_DISCONNECTED:
129 ESP_LOGV(TAG,
"MQTT_EVENT_DISCONNECTED");
132#if defined(USE_MQTT_IDF_ENQUEUE)
139 case MQTT_EVENT_SUBSCRIBED:
140 ESP_LOGV(TAG,
"MQTT_EVENT_SUBSCRIBED, msg_id=%d", event.msg_id);
144 case MQTT_EVENT_UNSUBSCRIBED:
145 ESP_LOGV(TAG,
"MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event.msg_id);
148 case MQTT_EVENT_PUBLISHED:
149 ESP_LOGV(TAG,
"MQTT_EVENT_PUBLISHED, msg_id=%d", event.msg_id);
152 case MQTT_EVENT_DATA: {
153 static std::string topic;
154 if (!event.topic.empty()) {
161 ESP_LOGV(TAG,
"MQTT_EVENT_DATA %s", topic.c_str());
162 this->
on_message_.call(topic.c_str(), event.data.data(), event.data.size(), event.current_data_offset,
163 event.total_data_len);
165 case MQTT_EVENT_ERROR:
166 ESP_LOGE(TAG,
"MQTT_EVENT_ERROR");
167 if (event.error_handle.error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) {
169 "Last error code reported from esp-tls: 0x%x\n"
170 "Last tls stack error number: 0x%x\n"
171 "Last captured errno : %d (%s)",
172 event.error_handle.esp_tls_last_esp_err, event.error_handle.esp_tls_stack_err,
173 event.error_handle.esp_transport_sock_errno, strerror(event.error_handle.esp_transport_sock_errno));
174 }
else if (event.error_handle.error_type == MQTT_ERROR_TYPE_CONNECTION_REFUSED) {
175 ESP_LOGE(TAG,
"Connection refused error: 0x%x", event.error_handle.connect_return_code);
177 ESP_LOGE(TAG,
"Unknown error type: 0x%x", event.error_handle.error_type);
181 ESP_LOGV(TAG,
"Other event id:%d", event.event_id);
192 auto event = *
static_cast<esp_mqtt_event_t *
>(event_data);
196#if defined(USE_SOCKET_SELECT_SUPPORT) && defined(USE_WAKE_LOOP_THREADSAFE)
202#if defined(USE_MQTT_IDF_ENQUEUE)
208 ulTaskNotifyTake(pdTRUE, portMAX_DELAY);
212 while ((elem = this_mqtt->
mqtt_queue_.pop()) !=
nullptr) {
214 switch (elem->
type) {
216 esp_mqtt_client_subscribe(this_mqtt->
handler_.get(), elem->
topic, elem->
qos);
220 esp_mqtt_client_unsubscribe(this_mqtt->
handler_.get(), elem->
topic);
229 ESP_LOGE(TAG,
"Invalid operation type from MQTT queue");
255 elem->retain = retain;
258 if (!elem->set_data(topic, payload,
len)) {
void wake_loop_threadsafe()
Wake the main event loop from a FreeRTOS task Thread-safe, can be called from task context to immedia...
uint32_t IRAM_ATTR HOT get_loop_component_start_time() const
Get the cached time in milliseconds from when the current component started its loop execution.
CallbackManager< on_connect_callback_t > on_connect_
CallbackManager< on_disconnect_callback_t > on_disconnect_
static const size_t TASK_STACK_SIZE_TLS
std::queue< Event > mqtt_events_
CallbackManager< on_message_callback_t > on_message_
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
static - Dispatch event to instance method
optional< std::string > ca_certificate_
static const size_t TASK_STACK_SIZE
CallbackManager< on_subscribe_callback_t > on_subscribe_
static void esphome_mqtt_task(void *params)
bool enqueue_(MqttQueueTypeT type, const char *topic, int qos=0, bool retain=false, const char *payload=NULL, size_t len=0)
EventPool< struct QueueElement, MQTT_QUEUE_LENGTH > mqtt_event_pool_
CallbackManager< on_unsubscribe_callback_t > on_unsubscribe_
static const ssize_t TASK_PRIORITY
esp_mqtt_client_config_t mqtt_cfg_
TaskHandle_t task_handle_
CallbackManager< on_publish_user_callback_t > on_publish_
static constexpr uint32_t DROP_LOG_INTERVAL_MS
NotifyingLockFreeQueue< struct QueueElement, MQTT_QUEUE_LENGTH > mqtt_queue_
uint32_t last_dropped_log_time_
optional< std::string > cl_key_
void mqtt_event_handler_(const Event &event)
optional< std::string > cl_certificate_
value_type const & value() const
@ MQTT_QUEUE_TYPE_SUBSCRIBE
@ MQTT_QUEUE_TYPE_UNSUBSCRIBE
@ MQTT_QUEUE_TYPE_PUBLISH
Application App
Global storage of Application pointer - only one Application can exist.