13static const char *
const TAG =
"mqtt.idf";
45 mqtt_cfg_.broker.address.transport = MQTT_TRANSPORT_OVER_SSL;
52 mqtt_cfg_.broker.address.transport = MQTT_TRANSPORT_OVER_TCP;
55 auto *mqtt_client = esp_mqtt_client_init(&
mqtt_cfg_);
60#if defined(USE_MQTT_IDF_ENQUEUE)
66 ESP_LOGE(TAG,
"Failed to create MQTT task");
77 ESP_LOGE(TAG,
"Failed to init client");
86 if (event !=
nullptr) {
93 if (inbound_dropped > 0) {
94 ESP_LOGW(TAG,
"Dropped %u inbound MQTT events", inbound_dropped);
97#if defined(USE_MQTT_IDF_ENQUEUE)
109 uint16_t dropped = this->
mqtt_queue_.get_and_reset_dropped_count();
119 ESP_LOGV(TAG,
"Event dispatched from event loop event_id=%d", event.event_id);
120 switch (event.event_id) {
121 case MQTT_EVENT_BEFORE_CONNECT:
122 ESP_LOGV(TAG,
"MQTT_EVENT_BEFORE_CONNECT");
125 case MQTT_EVENT_CONNECTED:
126 ESP_LOGV(TAG,
"MQTT_EVENT_CONNECTED");
128#if defined(USE_MQTT_IDF_ENQUEUE)
134 case MQTT_EVENT_DISCONNECTED:
135 ESP_LOGV(TAG,
"MQTT_EVENT_DISCONNECTED");
138#if defined(USE_MQTT_IDF_ENQUEUE)
145 case MQTT_EVENT_SUBSCRIBED:
146 ESP_LOGV(TAG,
"MQTT_EVENT_SUBSCRIBED, msg_id=%d", event.msg_id);
150 case MQTT_EVENT_UNSUBSCRIBED:
151 ESP_LOGV(TAG,
"MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event.msg_id);
154 case MQTT_EVENT_PUBLISHED:
155 ESP_LOGV(TAG,
"MQTT_EVENT_PUBLISHED, msg_id=%d", event.msg_id);
158 case MQTT_EVENT_DATA: {
159 if (!event.topic.empty()) {
166 ESP_LOGV(TAG,
"MQTT_EVENT_DATA %s", this->
cached_topic_.c_str());
168 event.current_data_offset, event.total_data_len);
170 case MQTT_EVENT_ERROR:
171 ESP_LOGE(TAG,
"MQTT_EVENT_ERROR");
172 if (event.error_handle.error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) {
173 ESP_LOGE(TAG,
"Last esp-tls error: 0x%x, tls stack error: 0x%x, socket errno: %d (%s)",
174 event.error_handle.esp_tls_last_esp_err, event.error_handle.esp_tls_stack_err,
175 event.error_handle.esp_transport_sock_errno, strerror(event.error_handle.esp_transport_sock_errno));
176 }
else if (event.error_handle.error_type == MQTT_ERROR_TYPE_CONNECTION_REFUSED) {
177 ESP_LOGE(TAG,
"Connection refused error: 0x%x", event.error_handle.connect_return_code);
179 ESP_LOGE(TAG,
"Unknown error type: 0x%x", event.error_handle.error_type);
183 ESP_LOGV(TAG,
"Other event id:%d", event.event_id);
195 if (event ==
nullptr) {
200 event->populate(*
static_cast<esp_mqtt_event_t *
>(event_data));
206#if defined(USE_SOCKET_SELECT_SUPPORT) && defined(USE_WAKE_LOOP_THREADSAFE)
212#if defined(USE_MQTT_IDF_ENQUEUE)
218 ulTaskNotifyTake(pdTRUE, portMAX_DELAY);
222 while ((elem = this_mqtt->
mqtt_queue_.pop()) !=
nullptr) {
224 switch (elem->
type) {
226 esp_mqtt_client_subscribe(this_mqtt->
handler_.get(), elem->
topic, elem->
qos);
230 esp_mqtt_client_unsubscribe(this_mqtt->
handler_.get(), elem->
topic);
239 ESP_LOGE(TAG,
"Invalid operation type from MQTT queue");
265 elem->retain = retain;
268 if (!elem->set_data(topic, payload,
len)) {
void wake_loop_threadsafe()
Wake the main event loop from another FreeRTOS task.
uint32_t IRAM_ATTR HOT get_loop_component_start_time() const
Get the cached time in milliseconds from when the current component started its loop execution.
EventPool< Event, MQTT_EVENT_QUEUE_LENGTH - 1 > mqtt_event_pool_
EventPool< struct QueueElement, MQTT_QUEUE_LENGTH - 1 > mqtt_outbound_pool_
CallbackManager< on_connect_callback_t > on_connect_
CallbackManager< on_disconnect_callback_t > on_disconnect_
CallbackManager< on_message_callback_t > on_message_
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
Static handler that dispatches the event to the instance method.
optional< std::string > ca_certificate_
CallbackManager< on_subscribe_callback_t > on_subscribe_
static void esphome_mqtt_task(void *params)
bool enqueue_(MqttQueueTypeT type, const char *topic, int qos=0, bool retain=false, const char *payload=NULL, size_t len=0)
static constexpr size_t TASK_STACK_SIZE_TLS
CallbackManager< on_unsubscribe_callback_t > on_unsubscribe_
std::string cached_topic_
esp_mqtt_client_config_t mqtt_cfg_
TaskHandle_t task_handle_
static constexpr size_t TASK_STACK_SIZE
CallbackManager< on_publish_user_callback_t > on_publish_
static constexpr uint32_t DROP_LOG_INTERVAL_MS
NotifyingLockFreeQueue< struct QueueElement, MQTT_QUEUE_LENGTH > mqtt_queue_
uint32_t last_dropped_log_time_
optional< std::string > cl_key_
void mqtt_event_handler_(const Event &event)
optional< std::string > cl_certificate_
static constexpr ssize_t TASK_PRIORITY
LockFreeQueue< Event, MQTT_EVENT_QUEUE_LENGTH > mqtt_event_queue_
@ MQTT_QUEUE_TYPE_SUBSCRIBE
@ MQTT_QUEUE_TYPE_UNSUBSCRIBE
@ MQTT_QUEUE_TYPE_PUBLISH
Application App
Global storage of Application pointer - only one Application can exist.