// Log tag passed as the first argument to the ESP_LOG* calls in this file.
15static const char *
const TAG =
"scheduler";
// Threshold compared against to_remove_ in Scheduler::call(); once exceeded, items_ is rebuilt
// from scratch and re-heapified.
17static const uint32_t MAX_LOGICALLY_DELETED_ITEMS = 10;
// Half of the uint32_t range. millis_64_() treats a backwards jump larger than this as a
// 32-bit rollover of the millisecond counter.
19static constexpr uint32_t HALF_MAX_UINT32 = std::numeric_limits<uint32_t>::max() / 2;
// NOTE(review): not referenced in the lines visible here; presumably an upper bound (ms) for the
// initial offset applied to intervals in set_timer_common_() - confirm against the full source.
21static constexpr uint32_t MAX_INTERVAL_DELAY = 5000;
26#ifdef ESPHOME_DEBUG_SCHEDULER
// Debug-only heuristic (compiled under ESPHOME_DEBUG_SCHEDULER) that inspects the address of a
// scheduler name which the caller claims is a static string.
//
// @param name Name pointer to inspect; its address is compared against reference addresses and
//             its text is printed in the warnings.
//
// Two address checks are visible here, and both only produce log warnings:
//  - within 0x2000 bytes of a local variable's address -> "appears to be on the stack"
//  - more than 0x100000 bytes away from a string literal's address -> "might be on heap"
// NOTE(review): the declaration of stack_var, the first log call's opening line and the closing
// braces are not visible in this excerpt.
28static void validate_static_string(
const char *name) {
34 uintptr_t addr =
reinterpret_cast<uintptr_t
>(name);
// Address of a local, used as a reference point for the current stack.
38 uintptr_t stack_addr =
reinterpret_cast<uintptr_t
>(&stack_var);
42 if (addr > (stack_addr - 0x2000) && addr < (stack_addr + 0x2000)) {
44 "WARNING: Scheduler name '%s' at %p appears to be on the stack - this is unsafe!\n"
45 " Stack reference at %p",
46 name, name, &stack_var);
// Address of a string literal, used as a reference point for static storage.
51 static const char *static_str =
"test";
52 uintptr_t static_addr =
reinterpret_cast<uintptr_t
>(static_str);
// Flag addresses more than 0x100000 bytes above or below the literal. The
// `static_addr > 0x100000` guard keeps the subtraction from wrapping around.
55 if (addr > static_addr + 0x100000 || (static_addr > 0x100000 && addr < static_addr - 0x100000)) {
56 ESP_LOGW(TAG,
"WARNING: Scheduler name '%s' at %p might be on heap (static ref at %p)", name, name, static_str);
// Shared implementation behind set_timeout(), set_interval() and the retry machinery.
//
// @param component        Component the timer belongs to; stored on the item and used for matching.
// @param type             SchedulerItem::TIMEOUT or SchedulerItem::INTERVAL.
// @param is_static_string Selects how name_ptr is interpreted by get_name_cstr_(); the public
//                         wrappers pass true for `const char *` names and false for `std::string` names.
// @param name_ptr         Opaque pointer to the name (see is_static_string).
// @param delay            Timeout or interval in ms; SCHEDULER_DONT_RUN only cancels an existing item.
// @param func             Callback moved into the new item.
// @param is_retry         Stored on the item; also enables the cancelled-retry check below.
//
// NOTE(review): several lines (early returns, the interval `offset` computation, #else/#endif and
// closing braces) are not visible in this excerpt; comments below describe only what is shown.
67void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type
type,
bool is_static_string,
68 const void *name_ptr, uint32_t
delay, std::function<
void()> func,
bool is_retry) {
70 const char *name_cstr = this->get_name_cstr_(is_static_string, name_ptr);
// SCHEDULER_DONT_RUN: cancel any existing item with this name under the lock.
// NOTE(review): presumably returns afterwards without scheduling - the return is not visible.
72 if (
delay == SCHEDULER_DONT_RUN) {
74 LockGuard guard{this->lock_};
75 this->cancel_item_locked_(component, name_cstr,
type);
// Build the new item.
80 auto item = make_unique<SchedulerItem>();
81 item->component = component;
// NOTE(review): second argument is !is_static_string; presumably asks the item to copy
// non-static names - confirm against SchedulerItem::set_name.
82 item->set_name(name_cstr, !is_static_string);
84 item->callback = std::move(func);
87#ifdef ESPHOME_THREAD_MULTI_ATOMICS
88 item->remove.store(
false, std::memory_order_relaxed);
92 item->is_retry = is_retry;
94#ifndef ESPHOME_THREAD_SINGLE
// Multi-threaded builds: zero-delay timeouts replace any same-named item and go to defer_queue_,
// which Scheduler::call() drains before the timed items.
97 if (
delay == 0 &&
type == SchedulerItem::TIMEOUT) {
99 LockGuard guard{this->lock_};
100 this->cancel_item_locked_(component, name_cstr,
type);
101 this->defer_queue_.push_back(std::move(item));
// 64-bit "now" used as the base for the first execution time.
107 const auto now = this->millis_64_(
millis());
110 if (
type == SchedulerItem::INTERVAL) {
111 item->interval =
delay;
// First run of an interval is shifted by `offset`, which is computed in lines not shown here.
115 item->next_execution_ = now + offset;
116 ESP_LOGV(TAG,
"Scheduler interval for %s is %" PRIu32
"ms, offset %" PRIu32
"ms", name_cstr ? name_cstr :
"",
delay,
// Non-interval path: run `delay` ms from now.
120 item->next_execution_ = now +
delay;
123#ifdef ESPHOME_DEBUG_SCHEDULER
// Debug builds: sanity-check names claimed to be static, then log what is being scheduled.
125 if (is_static_string && name_cstr !=
nullptr) {
126 validate_static_string(name_cstr);
130 const char *type_str = (
type == SchedulerItem::TIMEOUT) ?
"timeout" :
"interval";
131 if (
type == SchedulerItem::TIMEOUT) {
132 ESP_LOGD(TAG,
"set_%s(name='%s/%s', %s=%" PRIu32
")", type_str, item->get_source(),
133 name_cstr ? name_cstr :
"(null)", type_str,
delay);
135 ESP_LOGD(TAG,
"set_%s(name='%s/%s', %s=%" PRIu32
", offset=%" PRIu32
")", type_str, item->get_source(),
136 name_cstr ? name_cstr :
"(null)", type_str,
delay,
static_cast<uint32_t>(item->next_execution_ - now));
// Everything below touches items_/to_add_ and runs under the scheduler lock.
140 LockGuard guard{this->lock_};
// For a named retry timeout, check both containers via has_cancelled_timeout_in_container_().
// NOTE(review): per the debug message the retry is skipped when a cancelled item is found; the
// statement that actually skips is not visible here.
143 if (is_retry && name_cstr !=
nullptr &&
type == SchedulerItem::TIMEOUT &&
144 (has_cancelled_timeout_in_container_(this->items_, component, name_cstr,
true) ||
145 has_cancelled_timeout_in_container_(this->to_add_, component, name_cstr,
true))) {
147#ifdef ESPHOME_DEBUG_SCHEDULER
148 ESP_LOGD(TAG,
"Skipping retry '%s' - found cancelled item", name_cstr);
// Replace semantics: cancel any existing item with the same component/name/type, then queue the
// new one in to_add_ (merged into the heap by process_to_add()).
155 this->cancel_item_locked_(component, name_cstr,
type);
158 this->to_add_.push_back(std::move(item));
161void HOT Scheduler::set_timeout(Component *component,
const char *name, uint32_t timeout, std::function<
void()> func) {
162 this->set_timer_common_(component, SchedulerItem::TIMEOUT,
true, name, timeout, std::move(func));
165void HOT Scheduler::set_timeout(Component *component,
const std::string &name, uint32_t timeout,
166 std::function<
void()> func) {
167 this->set_timer_common_(component, SchedulerItem::TIMEOUT,
false, &name, timeout, std::move(func));
169bool HOT Scheduler::cancel_timeout(Component *component,
const std::string &name) {
170 return this->cancel_item_(component,
false, &name, SchedulerItem::TIMEOUT);
172bool HOT Scheduler::cancel_timeout(Component *component,
const char *name) {
173 return this->cancel_item_(component,
true, name, SchedulerItem::TIMEOUT);
175void HOT Scheduler::set_interval(Component *component,
const std::string &name, uint32_t interval,
176 std::function<
void()> func) {
177 this->set_timer_common_(component, SchedulerItem::INTERVAL,
false, &name, interval, std::move(func));
180void HOT Scheduler::set_interval(Component *component,
const char *name, uint32_t interval,
181 std::function<
void()> func) {
182 this->set_timer_common_(component, SchedulerItem::INTERVAL,
true, name, interval, std::move(func));
184bool HOT Scheduler::cancel_interval(Component *component,
const std::string &name) {
185 return this->cancel_item_(component,
false, &name, SchedulerItem::INTERVAL);
187bool HOT Scheduler::cancel_interval(Component *component,
const char *name) {
188 return this->cancel_item_(component,
true, name, SchedulerItem::INTERVAL);
193 uint8_t retry_countdown;
195 Component *component;
197 float backoff_increase_factor;
198 Scheduler *scheduler;
202 RetryResult const retry_result = args->func(--args->retry_countdown);
206 args->scheduler->set_timer_common_(
207 args->component, Scheduler::SchedulerItem::TIMEOUT,
false, &args->name, args->current_interval,
208 [args]() { retry_handler(args); },
true);
210 args->current_interval *= args->backoff_increase_factor;
// Shared implementation behind both set_retry() overloads.
//
// @param component               Component the retry belongs to.
// @param is_static_string        Selects how name_ptr is interpreted by get_name_cstr_().
// @param name_ptr                Opaque pointer to the retry name.
// @param initial_wait_time       Stored as the retry state's current_interval (ms).
// @param max_attempts            Stored as the retry state's retry_countdown.
// @param func                    Callback returning a RetryResult; moved into the retry state.
// @param backoff_increase_factor Stored on the retry state; values below 0.0001 are replaced by 1.
//
// NOTE(review): the statement following the SCHEDULER_DONT_RUN check and the final argument /
// closing of the set_timer_common_() call are not visible in this excerpt.
213void HOT Scheduler::set_retry_common_(Component *component,
bool is_static_string,
const void *name_ptr,
214 uint32_t initial_wait_time, uint8_t max_attempts,
215 std::function<
RetryResult(uint8_t)> func,
float backoff_increase_factor) {
216 const char *name_cstr = this->get_name_cstr_(is_static_string, name_ptr);
// A named retry replaces any previous retry with the same name.
218 if (name_cstr !=
nullptr)
219 this->cancel_retry(component, name_cstr);
// NOTE(review): presumably returns here so SCHEDULER_DONT_RUN acts as cancel-only - not visible.
221 if (initial_wait_time == SCHEDULER_DONT_RUN)
224 ESP_LOGVV(TAG,
"set_retry(name='%s', initial_wait_time=%" PRIu32
", max_attempts=%u, backoff_factor=%0.1f)",
225 name_cstr ? name_cstr :
"", initial_wait_time, max_attempts, backoff_increase_factor);
// Reject near-zero (or negative) factors: log an error and fall back to 1.
227 if (backoff_increase_factor < 0.0001) {
228 ESP_LOGE(TAG,
"backoff_factor %0.1f too small, using 1.0: %s", backoff_increase_factor, name_cstr ? name_cstr :
"");
229 backoff_increase_factor = 1;
// Retry state is shared (shared_ptr) with the scheduled lambda below, which captures it by value.
232 auto args = std::make_shared<RetryArgs>();
233 args->func = std::move(func);
234 args->retry_countdown = max_attempts;
235 args->current_interval = initial_wait_time;
236 args->component = component;
// The name is stored on the retry state (empty string when no name was given); the timer below
// is registered against that stored copy rather than the caller's pointer.
237 args->name = name_cstr ? name_cstr :
"";
238 args->backoff_increase_factor = backoff_increase_factor;
239 args->scheduler =
this;
// First attempt is scheduled as a zero-delay timeout that invokes retry_handler(args).
242 this->set_timer_common_(
243 component, SchedulerItem::TIMEOUT,
false, &args->name, 0, [args]() { retry_handler(args); },
247void HOT Scheduler::set_retry(Component *component,
const std::string &name, uint32_t initial_wait_time,
248 uint8_t max_attempts, std::function<
RetryResult(uint8_t)> func,
249 float backoff_increase_factor) {
250 this->set_retry_common_(component,
false, &name, initial_wait_time, max_attempts, std::move(func),
251 backoff_increase_factor);
254void HOT Scheduler::set_retry(Component *component,
const char *name, uint32_t initial_wait_time, uint8_t max_attempts,
255 std::function<
RetryResult(uint8_t)> func,
float backoff_increase_factor) {
256 this->set_retry_common_(component,
true, name, initial_wait_time, max_attempts, std::move(func),
257 backoff_increase_factor);
259bool HOT Scheduler::cancel_retry(Component *component,
const std::string &name) {
260 return this->cancel_retry(component, name.c_str());
263bool HOT Scheduler::cancel_retry(Component *component,
const char *name) {
265 LockGuard guard{this->lock_};
266 return this->cancel_item_locked_(component, name, SchedulerItem::TIMEOUT,
true);
// Reports how long until the earliest scheduled item (items_[0]) is due.
//
// @param now Current 32-bit millisecond timestamp; widened with millis_64_() before comparing.
// @return On the visible path, next_execution_ - now for the front item.
// NOTE(review): the statements returned by the two `if` branches are not visible in this excerpt;
// presumably an empty optional when cleanup_() reports no items and 0 when the item is already
// overdue - confirm against the full source.
// NOTE(review): items_[0] is read here without taking lock_ - confirm the intended calling thread.
269optional<uint32_t> HOT Scheduler::next_schedule_in(uint32_t now) {
275 if (this->cleanup_() == 0)
278 auto &item = this->items_[0];
280 const auto now_64 = this->millis_64_(now);
281 if (item->next_execution_ < now_64)
283 return item->next_execution_ - now_64;
// Runs one scheduler pass: executes deferred items, merges newly added items, then executes
// timed items from the heap and re-queues intervals.
//
// @param now Current 32-bit millisecond timestamp; passed to execute_item_() and widened with
//            millis_64_() for comparisons against next_execution_.
//
// NOTE(review): many lines (break/continue statements, pop calls, #else/#endif, closing braces)
// are not visible in this excerpt; comments below describe only what is shown.
285void HOT Scheduler::call(uint32_t now) {
286#ifndef ESPHOME_THREAD_SINGLE
// Multi-threaded builds: drain defer_queue_ first. The lock is held only while popping the front
// item; the visible execute_item_() call is not preceded by a lock acquisition.
299 while (!this->defer_queue_.empty()) {
303 std::unique_ptr<SchedulerItem> item;
305 LockGuard lock(this->lock_);
306 item = std::move(this->defer_queue_.front());
307 this->defer_queue_.pop_front();
312 if (!this->should_skip_item_(item.get())) {
313 this->execute_item_(item.get(), now);
319 const auto now_64 = this->millis_64_(now);
// Merge items queued in to_add_ into the heap before processing.
320 this->process_to_add();
322#ifdef ESPHOME_DEBUG_SCHEDULER
// Debug builds: when more than 2000 ms have passed since last_print, dump all items.
// NOTE(review): the update of last_print is not visible here.
323 static uint64_t last_print = 0;
325 if (now_64 - last_print > 2000) {
327 std::vector<std::unique_ptr<SchedulerItem>> old_items;
328#ifdef ESPHOME_THREAD_MULTI_ATOMICS
329 const auto last_dbg = this->last_millis_.load(std::memory_order_relaxed);
330 const auto major_dbg = this->millis_major_.load(std::memory_order_relaxed);
331 ESP_LOGD(TAG,
"Items: count=%zu, now=%" PRIu64
" (%" PRIu16
", %" PRIu32
")", this->items_.size(), now_64,
332 major_dbg, last_dbg);
334 ESP_LOGD(TAG,
"Items: count=%zu, now=%" PRIu64
" (%" PRIu16
", %" PRIu32
")", this->items_.size(), now_64,
335 this->millis_major_, this->last_millis_);
// Items are moved out of the heap front one at a time, logged, and collected in old_items.
// NOTE(review): the call that removes the moved-from front entry is not visible here.
339 while (!this->items_.empty()) {
340 std::unique_ptr<SchedulerItem> item;
342 LockGuard guard{this->lock_};
343 item = std::move(this->items_[0]);
347 const char *name = item->get_name();
348 ESP_LOGD(TAG,
" %s '%s/%s' interval=%" PRIu32
" next_execution in %" PRIu64
"ms at %" PRIu64,
349 item->get_type_str(), item->get_source(), name ? name :
"(null)", item->interval,
350 item->next_execution_ - now_64, item->next_execution_);
352 old_items.push_back(std::move(item));
// Restore the drained items and rebuild the heap ordering.
357 LockGuard guard{this->lock_};
358 this->items_ = std::move(old_items);
360 std::make_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
// Too many logically deleted items: rebuild items_ under the lock and re-heapify.
// NOTE(review): the condition that filters which items are kept is not visible here.
366 if (this->to_remove_ > MAX_LOGICALLY_DELETED_ITEMS) {
372 LockGuard guard{this->lock_};
374 std::vector<std::unique_ptr<SchedulerItem>> valid_items;
377 for (
auto &item : this->items_) {
379 valid_items.push_back(std::move(item));
384 this->items_ = std::move(valid_items);
386 std::make_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
387 this->to_remove_ = 0;
// Main loop over the heap front (items_[0]).
392 while (!this->items_.empty()) {
396 auto &item = this->items_[0];
// Front item is not due yet. NOTE(review): presumably leaves the loop - statement not visible.
397 if (item->next_execution_ > now_64) {
// Items whose component reports is_failed() are handled under the lock.
// NOTE(review): the handling itself is not visible; presumably the item is dropped.
402 if (item->component !=
nullptr && item->component->is_failed()) {
403 LockGuard guard{this->lock_};
// Removed-item check: the no-atomics build takes the lock before reading the flag; the other
// visible variant checks first and locks only inside the branch.
412#ifdef ESPHOME_THREAD_MULTI_NO_ATOMICS
415 LockGuard guard{this->lock_};
416 if (is_item_removed_(item.get())) {
424 if (is_item_removed_(item.get())) {
425 LockGuard guard{this->lock_};
432#ifdef ESPHOME_DEBUG_SCHEDULER
433 const char *item_name = item->get_name();
434 ESP_LOGV(TAG,
"Running %s '%s/%s' with interval=%" PRIu32
" next_execution=%" PRIu64
" (now=%" PRIu64
")",
435 item->get_type_str(), item->get_source(), item_name ? item_name :
"(null)", item->interval,
436 item->next_execution_, now_64);
442 this->execute_item_(item.get(), now);
// After running: take ownership of the front item under the lock.
// NOTE(review): the pop of the moved-from entry and any removed/cancelled checks are not visible.
446 LockGuard guard{this->lock_};
449 auto item = std::move(this->items_[0]);
// Intervals are rescheduled relative to this pass's timestamp (now_64), then re-queued via to_add_.
460 if (item->type == SchedulerItem::INTERVAL) {
461 item->next_execution_ = now_64 + item->interval;
464 this->to_add_.push_back(std::move(item));
// Merge anything queued during this pass (re-armed intervals, items added by callbacks).
469 this->process_to_add();
// Moves pending items from to_add_ into the items_ heap, under the scheduler lock.
//
// Each moved item is appended and then sifted into place with std::push_heap using
// SchedulerItem::cmp; to_add_ is cleared afterwards.
// NOTE(review): some lines at the top of the loop body are not visible in this excerpt;
// presumably they skip items already marked as removed - confirm against the full source.
471void HOT Scheduler::process_to_add() {
472 LockGuard guard{this->lock_};
473 for (
auto &it : this->to_add_) {
478 this->items_.push_back(std::move(it));
479 std::push_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
481 this->to_add_.clear();
// Returns the number of entries in items_ after cleanup.
//
// Fast path: when to_remove_ is 0 the size is returned immediately, before the lock is taken.
// Otherwise the lock is taken and the heap front (items_[0]) is examined in a loop.
// NOTE(review): the loop body is not visible in this excerpt; presumably it pops front items that
// are marked removed and stops at the first live one - confirm against the full source.
483size_t HOT Scheduler::cleanup_() {
491 if (this->to_remove_ == 0)
492 return this->items_.size();
502 LockGuard guard{this->lock_};
503 while (!this->items_.empty()) {
504 auto &item = this->items_[0];
510 return this->items_.size();
512void HOT Scheduler::pop_raw_() {
513 std::pop_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
514 this->items_.pop_back();
// Executes a single scheduler item.
//
// @param item Item to run; its component is handed to the blocking guard.
// @param now  Timestamp handed to the blocking guard as the start time.
// NOTE(review): only the guard construction is visible in this excerpt; the callback invocation
// and the rest of the body are elided.
518void HOT Scheduler::execute_item_(SchedulerItem *item, uint32_t now) {
520 WarnIfComponentBlockingGuard guard{item->component, now};
526bool HOT Scheduler::cancel_item_(Component *component,
bool is_static_string,
const void *name_ptr,
527 SchedulerItem::Type
type) {
529 const char *name_cstr = this->get_name_cstr_(is_static_string, name_ptr);
532 LockGuard guard{this->lock_};
533 return this->cancel_item_locked_(component, name_cstr,
type);
// Marks every item matching (component, name, type, match_retry) as removed.
//
// Every caller visible in this file acquires this->lock_ before calling; no lock is taken here.
//
// @param component Component the item must belong to (checked by matches_item_()).
// @param name_cstr Name to match; a null name is handled by the first branch below.
// @param type      Item type to match. TIMEOUT additionally searches defer_queue_ in
//                  multi-threaded builds.
// @return true when total_cancelled is greater than zero.
// NOTE(review): the end of the signature (the match_retry parameter), the body of the null-name
// branch and the statements that update total_cancelled are not visible in this excerpt.
537bool HOT Scheduler::cancel_item_locked_(Component *component,
const char *name_cstr, SchedulerItem::Type
type,
// NOTE(review): presumably returns false for a null name - the statement is not visible.
540 if (name_cstr ==
nullptr) {
544 size_t total_cancelled = 0;
547#ifndef ESPHOME_THREAD_SINGLE
// Deferred (zero-delay) timeouts live in defer_queue_, so it is searched only for TIMEOUT.
549 if (
type == SchedulerItem::TIMEOUT) {
550 for (
auto &item : this->defer_queue_) {
551 if (this->matches_item_(item, component, name_cstr,
type, match_retry)) {
552 this->mark_item_removed_(item.get());
// Items already in the heap.
560 for (
auto &item : this->items_) {
561 if (this->matches_item_(item, component, name_cstr,
type, match_retry)) {
562 this->mark_item_removed_(item.get());
// Items queued but not yet merged into the heap.
569 for (
auto &item : this->to_add_) {
570 if (this->matches_item_(item, component, name_cstr,
type, match_retry)) {
571 this->mark_item_removed_(item.get());
577 return total_cancelled > 0;
// Widens a 32-bit millisecond timestamp to 64 bits using a rollover counter.
//
// @param now Current 32-bit millisecond timestamp.
// @return now + (major << 32), where major is read from millis_major_.
//
// A rollover is detected when `now` is behind the last recorded timestamp by more than
// HALF_MAX_UINT32. One of three implementations is selected at compile time by the threading
// model macro; if none is defined, the message at the end of the function is emitted.
// NOTE(review): several lines (declarations of `last`, adjustments to `major`, loop headers,
// #else/#endif, closing braces) are not visible in this excerpt.
580uint64_t Scheduler::millis_64_(uint32_t now) {
594#ifdef ESPHOME_THREAD_SINGLE
// Single-threaded variant: plain reads and writes, no lock.
600 uint16_t major = this->millis_major_;
604 if (now < last && (last - now) > HALF_MAX_UINT32) {
605 this->millis_major_++;
607#ifdef ESPHOME_DEBUG_SCHEDULER
608 ESP_LOGD(TAG,
"Detected true 32-bit rollover at %" PRIu32
"ms (was %" PRIu32
")", now, last);
614 this->last_millis_ = now;
618 return now + (
static_cast<uint64_t
>(major) << 32);
620#elif defined(ESPHOME_THREAD_MULTI_NO_ATOMICS)
// Multi-threaded variant without atomics: the lock is taken only near a rollover.
628 uint16_t major = this->millis_major_;
// Width (ms) of the window on either side of the 32-bit wrap that counts as "near rollover".
633 static const uint32_t ROLLOVER_WINDOW = 10000;
636 bool near_rollover = (last > (std::numeric_limits<uint32_t>::max() - ROLLOVER_WINDOW)) || (now < ROLLOVER_WINDOW);
638 if (near_rollover || (now < last && (last - now) > HALF_MAX_UINT32)) {
640 LockGuard guard{this->lock_};
// Re-read under the lock and re-check, since another thread may have handled the rollover.
642 last = this->last_millis_;
644 if (now < last && (last - now) > HALF_MAX_UINT32) {
646 this->millis_major_++;
648#ifdef ESPHOME_DEBUG_SCHEDULER
649 ESP_LOGD(TAG,
"Detected true 32-bit rollover at %" PRIu32
"ms (was %" PRIu32
")", now, last);
653 this->last_millis_ = now;
654 }
else if (now > last) {
// Only move last_millis_ forward in this branch.
661 this->last_millis_ = now;
667 return now + (
static_cast<uint64_t
>(major) << 32);
669#elif defined(ESPHOME_THREAD_MULTI_ATOMICS)
// Multi-threaded variant with atomics: millis_major_ and last_millis_ are atomic objects.
680 uint16_t major = this->millis_major_.load(std::memory_order_acquire);
688 uint32_t last = this->last_millis_.load(std::memory_order_acquire);
692 if (now < last && (last - now) > HALF_MAX_UINT32) {
// Rollover candidates are serialized through the lock and re-checked after reloading `last`.
694 LockGuard guard{this->lock_};
696 last = this->last_millis_.load(std::memory_order_relaxed);
698 if (now < last && (last - now) > HALF_MAX_UINT32) {
700 this->millis_major_.fetch_add(1, std::memory_order_relaxed);
702#ifdef ESPHOME_DEBUG_SCHEDULER
703 ESP_LOGD(TAG,
"Detected true 32-bit rollover at %" PRIu32
"ms (was %" PRIu32
")", now, last);
711 this->last_millis_.store(now, std::memory_order_release);
// No rollover: advance last_millis_ with a CAS loop. compare_exchange_weak refreshes `last` on
// failure, so the loop condition is re-evaluated against the latest stored value.
715 while (now > last && (now - last) < HALF_MAX_UINT32) {
716 if (this->last_millis_.compare_exchange_weak(last, now,
717 std::memory_order_release,
718 std::memory_order_relaxed)) {
// The result is returned only if the rollover counter did not change while this call ran.
// NOTE(review): the retry path taken when it did change is not visible here.
725 uint16_t major_end = this->millis_major_.load(std::memory_order_relaxed);
726 if (major_end == major)
727 return now + (
static_cast<uint64_t
>(major) << 32);
730 __builtin_unreachable();
734 "No platform threading model defined. One of ESPHOME_THREAD_SINGLE, ESPHOME_THREAD_MULTI_NO_ATOMICS, or ESPHOME_THREAD_MULTI_ATOMICS must be defined."
738bool HOT Scheduler::SchedulerItem::cmp(
const std::unique_ptr<SchedulerItem> &a,
739 const std::unique_ptr<SchedulerItem> &b) {
740 return a->next_execution_ > b->next_execution_;
void set_current_component(Component *component)
Providing packet encoding functions for exchanging data with a remote host.
float random_float()
Return a random float between 0 and 1.
void retry_handler(const std::shared_ptr< RetryArgs > &args)
void IRAM_ATTR HOT delay(uint32_t ms)
uint32_t IRAM_ATTR HOT millis()
Application App
Global storage of Application pointer - only one Application can exist.