ESPHome 2026.3.3
Loading...
Searching...
No Matches
lwip_raw_tcp_impl.cpp
Go to the documentation of this file.
1#include "socket.h"
3
4#ifdef USE_SOCKET_IMPL_LWIP_TCP
5
6#include <cerrno>
7#include <cstring>
8#include <sys/time.h>
9
11#include "esphome/core/log.h"
12
13#ifdef USE_ESP8266
14#include <coredecls.h> // For esp_schedule()
15#elif defined(USE_RP2040)
16#include <hardware/sync.h> // For __sev(), __wfe()
17#include <pico/time.h> // For add_alarm_in_ms(), cancel_alarm()
18#endif
19
20namespace esphome::socket {
21
22#ifdef USE_ESP8266
23// Flag to signal socket activity - checked by socket_delay() to exit early
24// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
25static volatile bool s_socket_woke = false;
26
28 // Use esp_delay with a callback that checks if socket data arrived.
29 // This allows the delay to exit early when socket_wake() is called by
30 // lwip recv_fn/accept_fn callbacks, reducing socket latency.
31 //
32 // When ms is 0, we must use delay(0) because esp_delay(0, callback)
33 // exits immediately without yielding, which can cause watchdog timeouts
34 // when the main loop runs in high-frequency mode (e.g., during light effects).
35 if (ms == 0) {
36 delay(0);
37 return;
38 }
39 s_socket_woke = false;
40 esp_delay(ms, []() { return !s_socket_woke; });
41}
42
43void IRAM_ATTR socket_wake() {
44 s_socket_woke = true;
45 esp_schedule();
46}
47#elif defined(USE_RP2040)
48// RP2040 (non-FreeRTOS) socket wake using hardware WFE/SEV instructions.
49//
50// Same pattern as ESP8266's esp_delay()/esp_schedule(): set a one-shot timer,
51// then sleep with __wfe(). Wake on either:
52// - Timer alarm fires → callback calls __sev() → __wfe() returns → timeout
53// - Socket data arrives → LWIP callback calls socket_wake() → __sev() → __wfe() returns → early wake
54//
55// CYW43 WiFi chip communicates via SPI interrupts on core 0. When data arrives,
56// the GPIO interrupt fires → async_context pendsv processes CYW43/LWIP → recv/accept
57// callbacks call socket_wake() → __sev() wakes the main loop from __wfe() sleep.
58// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
59static volatile bool s_socket_woke = false;
60// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
61static volatile bool s_delay_expired = false;
62
63static int64_t alarm_callback(alarm_id_t id, void *user_data) {
64 (void) id;
65 (void) user_data;
66 s_delay_expired = true;
67 // Wake the main loop from __wfe() sleep — timeout expired.
68 __sev();
69 // Return 0 = don't reschedule (one-shot)
70 return 0;
71}
72
73void socket_delay(uint32_t ms) {
74 if (ms == 0) {
75 yield();
76 return;
77 }
78 // If a wake was already signalled, consume it and return immediately
79 // instead of going to sleep. This avoids losing a wake that arrived
80 // between loop iterations.
81 if (s_socket_woke) {
82 s_socket_woke = false;
83 return;
84 }
85 // Don't clear s_socket_woke here — if an IRQ fires between the check above
86 // and the while loop below, the while condition sees it immediately. Clearing
87 // here would lose that wake and sleep until the timer fires.
88 s_delay_expired = false;
89 // Set a one-shot timer to wake us after the timeout.
90 // add_alarm_in_ms returns >0 on success, 0 if time already passed, <0 on error.
91 alarm_id_t alarm = add_alarm_in_ms(ms, alarm_callback, nullptr, true);
92 if (alarm <= 0) {
93 delay(ms);
94 return;
95 }
96 // Sleep until woken by either the timer alarm or socket_wake().
97 // __wfe() may return spuriously (stale event register, other interrupts),
98 // so we loop checking both flags.
99 while (!s_socket_woke && !s_delay_expired) {
100 __wfe();
101 }
102 // Cancel timer if we woke early (socket data arrived before timeout)
103 if (!s_delay_expired)
104 cancel_alarm(alarm);
105 s_socket_woke = false; // consume the wake for next call
106}
107
108// No IRAM_ATTR equivalent needed: on RP2040, CYW43 async_context runs LWIP
109// callbacks via pendsv (not hard IRQ), so they execute from flash safely.
110void socket_wake() {
111 s_socket_woke = true;
112 // Wake the main loop from __wfe() sleep. __sev() is a global event that
113 // wakes any core sleeping in __wfe(). This is ISR-safe.
114 __sev();
115}
116#endif
117
118// ---- LWIP thread safety ----
119//
120// On RP2040 (Pico W), arduino-pico sets PICO_CYW43_ARCH_THREADSAFE_BACKGROUND=1.
121// This means lwip callbacks (recv_fn, accept_fn, err_fn) run from a low-priority
122// user IRQ context, not the main loop (see low_priority_irq_handler() in pico-sdk
123// async_context_threadsafe_background.c). They can preempt main-loop code at any point.
124//
125// Without locking, this causes race conditions between recv_fn and read() on the
126// shared rx_buf_ pbuf chain — recv_fn calls pbuf_cat() while read() is freeing
127// nodes, leading to use-after-free and infinite-loop crashes. See esphome#10681.
128//
129// On ESP8266, lwip callbacks run from the SYS context which cooperates with user
130// code (CONT context) — they never preempt each other, so no locking is needed.
131//
132// esphome::LwIPLock is the platform-provided RAII guard (see helpers.h/helpers.cpp).
133// On RP2040, it acquires cyw43_arch_lwip_begin/end. On ESP8266, it's a no-op.
134#define LWIP_LOCK() esphome::LwIPLock lwip_lock_guard // NOLINT
135
136static const char *const TAG = "socket.lwip";
137
138// set to 1 to enable verbose lwip logging
139#if 0 // NOLINT(readability-avoid-unconditional-preprocessor-if)
140#define LWIP_LOG(msg, ...) ESP_LOGVV(TAG, "socket %p: " msg, this, ##__VA_ARGS__)
141#else
142#define LWIP_LOG(msg, ...)
143#endif
144
145// Clear arg, recv, and err callbacks, then abort a connected PCB.
146// Only valid for full tcp_pcb (not tcp_pcb_listen).
147// Must be called before destroying the object that tcp_arg points to —
148// tcp_abort() triggers the err callback synchronously, which would
149// otherwise call back into a partially-destroyed object.
150// tcp_sent/tcp_poll are not cleared because this implementation
151// never registers them.
152static void pcb_detach_abort(struct tcp_pcb *pcb) {
153 tcp_arg(pcb, nullptr);
154 tcp_recv(pcb, nullptr);
155 tcp_err(pcb, nullptr);
156 tcp_abort(pcb);
157}
158
159// Clear arg, recv, and err callbacks, then gracefully close a connected PCB.
160// Only valid for full tcp_pcb (not tcp_pcb_listen).
161// After tcp_close(), the PCB remains alive during the TCP close handshake
162// (FIN_WAIT, TIME_WAIT states). Without clearing callbacks first, LWIP
163// would call recv/err on a destroyed socket object, corrupting the heap.
164// tcp_sent/tcp_poll are not cleared because this implementation
165// never registers them.
166// Returns ERR_OK on success; on failure the PCB is aborted instead.
167static err_t pcb_detach_close(struct tcp_pcb *pcb) {
168 tcp_arg(pcb, nullptr);
169 tcp_recv(pcb, nullptr);
170 tcp_err(pcb, nullptr);
171 err_t err = tcp_close(pcb);
172 if (err != ERR_OK) {
173 tcp_abort(pcb);
174 }
175 return err;
176}
177
178// ---- LWIPRawCommon methods ----
179
181 LWIP_LOCK();
182 if (this->pcb_ != nullptr) {
183 LWIP_LOG("tcp_abort(%p)", this->pcb_);
184 pcb_detach_abort(this->pcb_);
185 this->pcb_ = nullptr;
186 }
187}
188
189int LWIPRawCommon::bind(const struct sockaddr *name, socklen_t addrlen) {
190 LWIP_LOCK();
191 if (this->pcb_ == nullptr) {
192 errno = EBADF;
193 return -1;
194 }
195 if (name == nullptr) {
196 errno = EINVAL;
197 return -1;
198 }
199 ip_addr_t ip;
200 in_port_t port;
201#if LWIP_IPV6
202 if (this->family_ == AF_INET) {
203 if (addrlen < sizeof(sockaddr_in)) {
204 errno = EINVAL;
205 return -1;
206 }
207 auto *addr4 = reinterpret_cast<const sockaddr_in *>(name);
208 port = ntohs(addr4->sin_port);
209 ip.type = IPADDR_TYPE_V4;
210 ip.u_addr.ip4.addr = addr4->sin_addr.s_addr;
211 LWIP_LOG("tcp_bind(%p ip=%s port=%u)", this->pcb_, ip4addr_ntoa(&ip.u_addr.ip4), port);
212 } else if (this->family_ == AF_INET6) {
213 if (addrlen < sizeof(sockaddr_in6)) {
214 errno = EINVAL;
215 return -1;
216 }
217 auto *addr6 = reinterpret_cast<const sockaddr_in6 *>(name);
218 port = ntohs(addr6->sin6_port);
219 ip.type = IPADDR_TYPE_ANY;
220 memcpy(&ip.u_addr.ip6.addr, &addr6->sin6_addr.un.u8_addr, 16);
221 LWIP_LOG("tcp_bind(%p ip=%s port=%u)", this->pcb_, ip6addr_ntoa(&ip.u_addr.ip6), port);
222 } else {
223 errno = EINVAL;
224 return -1;
225 }
226#else
227 if (this->family_ != AF_INET) {
228 errno = EINVAL;
229 return -1;
230 }
231 auto *addr4 = reinterpret_cast<const sockaddr_in *>(name);
232 port = ntohs(addr4->sin_port);
233 ip.addr = addr4->sin_addr.s_addr;
234 LWIP_LOG("tcp_bind(%p ip=%u port=%u)", this->pcb_, ip.addr, port);
235#endif
236 err_t err = tcp_bind(this->pcb_, &ip, port);
237 if (err == ERR_USE) {
238 LWIP_LOG(" -> err ERR_USE");
239 errno = EADDRINUSE;
240 return -1;
241 }
242 if (err == ERR_VAL) {
243 LWIP_LOG(" -> err ERR_VAL");
244 errno = EINVAL;
245 return -1;
246 }
247 if (err != ERR_OK) {
248 LWIP_LOG(" -> err %d", err);
249 errno = EIO;
250 return -1;
251 }
252 return 0;
253}
254
256 LWIP_LOCK();
257 if (this->pcb_ == nullptr) {
258 errno = ECONNRESET;
259 return -1;
260 }
261 LWIP_LOG("tcp_close(%p)", this->pcb_);
262 err_t err = pcb_detach_close(this->pcb_);
263 this->pcb_ = nullptr;
264 if (err != ERR_OK) {
265 LWIP_LOG(" -> err %d", err);
266 errno = err == ERR_MEM ? ENOMEM : EIO;
267 return -1;
268 }
269 return 0;
270}
271
273 LWIP_LOCK();
274 if (this->pcb_ == nullptr) {
275 errno = ECONNRESET;
276 return -1;
277 }
278 bool shut_rx = false, shut_tx = false;
279 if (how == SHUT_RD) {
280 shut_rx = true;
281 } else if (how == SHUT_WR) {
282 shut_tx = true;
283 } else if (how == SHUT_RDWR) {
284 shut_rx = shut_tx = true;
285 } else {
286 errno = EINVAL;
287 return -1;
288 }
289 LWIP_LOG("tcp_shutdown(%p shut_rx=%d shut_tx=%d)", this->pcb_, shut_rx ? 1 : 0, shut_tx ? 1 : 0);
290 err_t err = tcp_shutdown(this->pcb_, shut_rx, shut_tx);
291 if (err != ERR_OK) {
292 LWIP_LOG(" -> err %d", err);
293 errno = err == ERR_MEM ? ENOMEM : EIO;
294 return -1;
295 }
296 return 0;
297}
298
299int LWIPRawCommon::getpeername(struct sockaddr *name, socklen_t *addrlen) {
300 LWIP_LOCK();
301 if (this->pcb_ == nullptr) {
302 errno = ECONNRESET;
303 return -1;
304 }
305 if (name == nullptr || addrlen == nullptr) {
306 errno = EINVAL;
307 return -1;
308 }
309 return this->ip2sockaddr_(&this->pcb_->remote_ip, this->pcb_->remote_port, name, addrlen);
310}
311
312int LWIPRawCommon::getsockname(struct sockaddr *name, socklen_t *addrlen) {
313 LWIP_LOCK();
314 if (this->pcb_ == nullptr) {
315 errno = ECONNRESET;
316 return -1;
317 }
318 if (name == nullptr || addrlen == nullptr) {
319 errno = EINVAL;
320 return -1;
321 }
322 return this->ip2sockaddr_(&this->pcb_->local_ip, this->pcb_->local_port, name, addrlen);
323}
324
325size_t LWIPRawCommon::getpeername_to(std::span<char, SOCKADDR_STR_LEN> buf) {
326 struct sockaddr_storage storage;
327 socklen_t len = sizeof(storage);
328 if (this->getpeername(reinterpret_cast<struct sockaddr *>(&storage), &len) != 0) {
329 buf[0] = '\0';
330 return 0;
331 }
332 return format_sockaddr_to(reinterpret_cast<struct sockaddr *>(&storage), len, buf);
333}
334
335size_t LWIPRawCommon::getsockname_to(std::span<char, SOCKADDR_STR_LEN> buf) {
336 struct sockaddr_storage storage;
337 socklen_t len = sizeof(storage);
338 if (this->getsockname(reinterpret_cast<struct sockaddr *>(&storage), &len) != 0) {
339 buf[0] = '\0';
340 return 0;
341 }
342 return format_sockaddr_to(reinterpret_cast<struct sockaddr *>(&storage), len, buf);
343}
344
345int LWIPRawCommon::getsockopt(int level, int optname, void *optval, socklen_t *optlen) {
346 LWIP_LOCK();
347 if (this->pcb_ == nullptr) {
348 errno = ECONNRESET;
349 return -1;
350 }
351 if (optlen == nullptr || optval == nullptr) {
352 errno = EINVAL;
353 return -1;
354 }
355 if (level == SOL_SOCKET && optname == SO_REUSEADDR) {
356 if (*optlen < 4) {
357 errno = EINVAL;
358 return -1;
359 }
360 // lwip doesn't seem to have this feature. Don't send an error
361 // to prevent warnings
362 *reinterpret_cast<int *>(optval) = 1;
363 *optlen = 4;
364 return 0;
365 }
366 if (level == SOL_SOCKET && optname == SO_RCVTIMEO) {
367 if (*optlen < sizeof(struct timeval)) {
368 errno = EINVAL;
369 return -1;
370 }
371 uint32_t ms = this->recv_timeout_cs_ * 10;
372 auto *tv = reinterpret_cast<struct timeval *>(optval);
373 tv->tv_sec = ms / 1000;
374 tv->tv_usec = (ms % 1000) * 1000;
375 *optlen = sizeof(struct timeval);
376 return 0;
377 }
378 if (level == IPPROTO_TCP && optname == TCP_NODELAY) {
379 if (*optlen < 4) {
380 errno = EINVAL;
381 return -1;
382 }
383 *reinterpret_cast<int *>(optval) = this->nodelay_;
384 *optlen = 4;
385 return 0;
386 }
387
388 errno = EINVAL;
389 return -1;
390}
391
392int LWIPRawCommon::setsockopt(int level, int optname, const void *optval, socklen_t optlen) {
393 LWIP_LOCK();
394 if (this->pcb_ == nullptr) {
395 errno = ECONNRESET;
396 return -1;
397 }
398 if (level == SOL_SOCKET && optname == SO_REUSEADDR) {
399 if (optlen != 4) {
400 errno = EINVAL;
401 return -1;
402 }
403 // lwip doesn't seem to have this feature. Don't send an error
404 // to prevent warnings
405 return 0;
406 }
407 if (level == SOL_SOCKET && optname == SO_RCVTIMEO) {
408 if (optlen < sizeof(struct timeval)) {
409 errno = EINVAL;
410 return -1;
411 }
412 const auto *tv = reinterpret_cast<const struct timeval *>(optval);
413 uint32_t ms = tv->tv_sec * 1000 + tv->tv_usec / 1000;
414 uint32_t cs = (ms + 9) / 10; // round up to nearest centisecond
415 this->recv_timeout_cs_ = cs > 255 ? 255 : static_cast<uint8_t>(cs);
416 return 0;
417 }
418 if (level == SOL_SOCKET && optname == SO_SNDTIMEO) {
419 // Raw TCP writes are non-blocking (tcp_write), so send timeout is a no-op.
420 return 0;
421 }
422 if (level == IPPROTO_TCP && optname == TCP_NODELAY) {
423 if (optlen != 4) {
424 errno = EINVAL;
425 return -1;
426 }
427 int val = *reinterpret_cast<const int *>(optval);
428 this->nodelay_ = val;
429 return 0;
430 }
431
432 errno = EINVAL;
433 return -1;
434}
435
436int LWIPRawCommon::ip2sockaddr_(ip_addr_t *ip, uint16_t port, struct sockaddr *name, socklen_t *addrlen) {
437 if (this->family_ == AF_INET) {
438 if (*addrlen < sizeof(struct sockaddr_in)) {
439 errno = EINVAL;
440 return -1;
441 }
442
443 struct sockaddr_in *addr = reinterpret_cast<struct sockaddr_in *>(name);
444 addr->sin_family = AF_INET;
445 *addrlen = addr->sin_len = sizeof(struct sockaddr_in);
446 addr->sin_port = port;
447 inet_addr_from_ip4addr(&addr->sin_addr, ip_2_ip4(ip));
448 return 0;
449 }
450#if LWIP_IPV6
451 else if (this->family_ == AF_INET6) {
452 if (*addrlen < sizeof(struct sockaddr_in6)) {
453 errno = EINVAL;
454 return -1;
455 }
456
457 struct sockaddr_in6 *addr = reinterpret_cast<struct sockaddr_in6 *>(name);
458 addr->sin6_family = AF_INET6;
459 *addrlen = addr->sin6_len = sizeof(struct sockaddr_in6);
460 addr->sin6_port = port;
461
462 // AF_INET6 sockets are bound to IPv4 as well, so we may encounter IPv4 addresses that must be converted to IPv6.
463 if (IP_IS_V4(ip)) {
464 ip_addr_t mapped;
465 ip4_2_ipv4_mapped_ipv6(ip_2_ip6(&mapped), ip_2_ip4(ip));
466 inet6_addr_from_ip6addr(&addr->sin6_addr, ip_2_ip6(&mapped));
467 } else {
468 inet6_addr_from_ip6addr(&addr->sin6_addr, ip_2_ip6(ip));
469 }
470 return 0;
471 }
472#endif
473 return -1;
474}
475
476// ---- LWIPRawImpl methods ----
477
479 LWIP_LOCK();
480 // Free any received pbufs that LWIP transferred ownership of via recv_fn.
481 // tcp_abort() in the base destructor won't free these since LWIP considers
482 // ownership transferred once the recv callback accepts them.
483 if (this->rx_buf_ != nullptr) {
484 pbuf_free(this->rx_buf_);
485 this->rx_buf_ = nullptr;
486 }
487 // Base class destructor handles pcb_ cleanup via tcp_abort
488}
489
490void LWIPRawImpl::init(struct pbuf *initial_rx, bool initial_rx_closed) {
491 LWIP_LOCK();
492 LWIP_LOG("init(%p)", this->pcb_);
493 tcp_arg(this->pcb_, this);
494 tcp_recv(this->pcb_, LWIPRawImpl::s_recv_fn);
495 tcp_err(this->pcb_, LWIPRawImpl::s_err_fn);
496 if (initial_rx != nullptr) {
497 this->rx_buf_ = initial_rx;
498 this->rx_buf_offset_ = 0;
499 }
500 this->rx_closed_ = initial_rx_closed;
501}
502
503void LWIPRawImpl::s_err_fn(void *arg, err_t err) {
504 // LWIP CALLBACK — runs from IRQ context on RP2040 (low-priority user IRQ).
505 // No heap allocation allowed — malloc is not IRQ-safe (see #14687).
506 // No LWIP_LOCK() needed — lwip core already holds the async_context lock.
507 //
508 // pcb is already freed when this callback is called
509 // ERR_RST: connection was reset by remote host
510 // ERR_ABRT: aborted through tcp_abort or TCP timer
511 auto *arg_this = reinterpret_cast<LWIPRawImpl *>(arg);
512 ESP_LOGVV(TAG, "socket %p: err(err=%d)", arg_this, err);
513 arg_this->pcb_ = nullptr;
514}
515
516err_t LWIPRawImpl::s_recv_fn(void *arg, struct tcp_pcb *pcb, struct pbuf *pb, err_t err) {
517 auto *arg_this = reinterpret_cast<LWIPRawImpl *>(arg);
518 return arg_this->recv_fn(pb, err);
519}
520
521err_t LWIPRawImpl::recv_fn(struct pbuf *pb, err_t err) {
522 // LWIP CALLBACK — runs from IRQ context on RP2040 (low-priority user IRQ).
523 // No heap allocation allowed — malloc is not IRQ-safe (see #14687).
524 LWIP_LOG("recv(pb=%p err=%d)", pb, err);
525 if (err != 0) {
526 // "An error code if there has been an error receiving Only return ERR_ABRT if you have
527 // called tcp_abort from within the callback function!"
528 if (pb != nullptr) {
529 pbuf_free(pb);
530 }
531 this->rx_closed_ = true;
532 return ERR_OK;
533 }
534 if (pb == nullptr) {
535 this->rx_closed_ = true;
536 return ERR_OK;
537 }
538 if (this->rx_buf_ == nullptr) {
539 // no need to copy because lwIP gave control of it to us
540 this->rx_buf_ = pb;
541 this->rx_buf_offset_ = 0;
542 } else {
543 pbuf_cat(this->rx_buf_, pb);
544 }
545#if (defined(USE_ESP8266) || defined(USE_RP2040))
546 // Wake the main loop immediately so it can process the received data.
547 socket_wake();
548#endif
549 return ERR_OK;
550}
551
553 // Wait for data without holding LWIP_LOCK so recv_fn() can run on RP2040
554 // (needs async_context lock).
555 //
556 // Loop until data arrives, connection closes, or the full timeout elapses.
557 // socket_delay() may return early due to other sockets waking the global
558 // socket_wake() flag, so we re-enter for the remaining time.
559 uint32_t timeout_ms = this->recv_timeout_cs_ * 10;
560 uint32_t start = millis();
561 while (this->waiting_for_data_()) {
562 uint32_t elapsed = millis() - start;
563 if (elapsed >= timeout_ms)
564 break;
565 socket_delay(timeout_ms - elapsed);
566 }
567}
568
570 // Caller must hold LWIP_LOCK. Copies available data from rx_buf_ into buf.
571 if (this->pcb_ == nullptr) {
572 errno = ECONNRESET;
573 return -1;
574 }
575 if (this->rx_closed_ && this->rx_buf_ == nullptr) {
576 return 0;
577 }
578 if (len == 0) {
579 return 0;
580 }
581 if (this->rx_buf_ == nullptr) {
582 errno = EWOULDBLOCK;
583 return -1;
584 }
585
586 size_t read = 0;
587 uint8_t *buf8 = reinterpret_cast<uint8_t *>(buf);
588 while (len && this->rx_buf_ != nullptr) {
589 size_t pb_len = this->rx_buf_->len;
590 size_t pb_left = pb_len - this->rx_buf_offset_;
591 if (pb_left == 0)
592 break;
593 size_t copysize = std::min(len, pb_left);
594 memcpy(buf8, reinterpret_cast<uint8_t *>(this->rx_buf_->payload) + this->rx_buf_offset_, copysize);
595
596 if (pb_left == copysize) {
597 // full pb copied, free it
598 if (this->rx_buf_->next == nullptr) {
599 // last buffer in chain
600 pbuf_free(this->rx_buf_);
601 this->rx_buf_ = nullptr;
602 this->rx_buf_offset_ = 0;
603 } else {
604 auto *old_buf = this->rx_buf_;
605 this->rx_buf_ = this->rx_buf_->next;
606 pbuf_ref(this->rx_buf_);
607 pbuf_free(old_buf);
608 this->rx_buf_offset_ = 0;
609 }
610 } else {
611 this->rx_buf_offset_ += copysize;
612 }
613 LWIP_LOG("tcp_recved(%p %u)", this->pcb_, copysize);
614 tcp_recved(this->pcb_, copysize);
615
616 buf8 += copysize;
617 len -= copysize;
618 read += copysize;
619 }
620
621 if (read == 0) {
622 errno = EWOULDBLOCK;
623 return -1;
624 }
625
626 return read;
627}
628
629ssize_t LWIPRawImpl::read(void *buf, size_t len) {
630 // See waiting_for_data_() for safety of unlocked reads.
631 if (this->recv_timeout_cs_ > 0 && this->waiting_for_data_()) {
632 this->wait_for_data_();
633 }
634
635 LWIP_LOCK();
636 return this->read_locked_(buf, len);
637}
638
639ssize_t LWIPRawImpl::readv(const struct iovec *iov, int iovcnt) {
640 // See waiting_for_data_() for safety of unlocked reads.
641 if (this->recv_timeout_cs_ > 0 && this->waiting_for_data_()) {
642 this->wait_for_data_();
643 }
644
645 LWIP_LOCK(); // Hold for entire scatter-gather operation
646 ssize_t ret = 0;
647 for (int i = 0; i < iovcnt; i++) {
648 ssize_t err = this->read_locked_(reinterpret_cast<uint8_t *>(iov[i].iov_base), iov[i].iov_len);
649 if (err == -1) {
650 if (ret != 0) {
651 // if we already read some don't return an error
652 break;
653 }
654 return err;
655 }
656 ret += err;
657 if ((size_t) err != iov[i].iov_len)
658 break;
659 }
660 return ret;
661}
662
663ssize_t LWIPRawImpl::internal_write_(const void *buf, size_t len) {
664 LWIP_LOCK();
665 if (this->pcb_ == nullptr) {
666 errno = ECONNRESET;
667 return -1;
668 }
669 if (len == 0)
670 return 0;
671 if (buf == nullptr) {
672 errno = EINVAL;
673 return 0;
674 }
675 auto space = tcp_sndbuf(this->pcb_);
676 if (space == 0) {
677 errno = EWOULDBLOCK;
678 return -1;
679 }
680 size_t to_send = std::min((size_t) space, len);
681 LWIP_LOG("tcp_write(%p buf=%p %u)", this->pcb_, buf, to_send);
682 err_t err = tcp_write(this->pcb_, buf, to_send, TCP_WRITE_FLAG_COPY);
683 if (err == ERR_MEM) {
684 LWIP_LOG(" -> err ERR_MEM");
685 errno = EWOULDBLOCK;
686 return -1;
687 }
688 if (err != ERR_OK) {
689 LWIP_LOG(" -> err %d", err);
690 errno = ECONNRESET;
691 return -1;
692 }
693 return to_send;
694}
695
697 LWIP_LOCK();
698 if (this->pcb_ == nullptr) {
699 errno = ECONNRESET;
700 return -1;
701 }
702 LWIP_LOG("tcp_output(%p)", this->pcb_);
703 err_t err = tcp_output(this->pcb_);
704 if (err == ERR_ABRT) {
705 // sometimes lwip returns ERR_ABRT for no apparent reason
706 // the connection works fine afterwards, and back with ESPAsyncTCP we
707 // indirectly also ignored this error
708 // FIXME: figure out where this is returned and what it means in this context
709 LWIP_LOG(" -> err ERR_ABRT");
710 return 0;
711 }
712 if (err != ERR_OK) {
713 LWIP_LOG(" -> err %d", err);
714 errno = ECONNRESET;
715 return -1;
716 }
717 return 0;
718}
719
720ssize_t LWIPRawImpl::write(const void *buf, size_t len) {
721 LWIP_LOCK(); // Hold for write + optional output
722 ssize_t written = this->internal_write_(buf, len);
723 if (written == -1)
724 return -1;
725 if (written == 0) {
726 // no need to output if nothing written
727 return 0;
728 }
729 if (this->nodelay_) {
730 int err = this->internal_output_();
731 if (err == -1)
732 return -1;
733 }
734 return written;
735}
736
737ssize_t LWIPRawImpl::writev(const struct iovec *iov, int iovcnt) {
738 LWIP_LOCK(); // Hold for entire scatter-gather operation
739 ssize_t written = 0;
740 for (int i = 0; i < iovcnt; i++) {
741 ssize_t err = this->internal_write_(reinterpret_cast<uint8_t *>(iov[i].iov_base), iov[i].iov_len);
742 if (err == -1) {
743 if (written != 0) {
744 // if we already read some don't return an error
745 break;
746 }
747 return err;
748 }
749 written += err;
750 if ((size_t) err != iov[i].iov_len)
751 break;
752 }
753 if (written == 0) {
754 // no need to output if nothing written
755 return 0;
756 }
757 if (this->nodelay_) {
758 int err = this->internal_output_();
759 if (err == -1)
760 return -1;
761 }
762 return written;
763}
764
765// ---- LWIPRawListenImpl methods ----
766
768 LWIP_LOCK();
769 // Abort any queued PCBs that were never accepted by the main loop.
770 for (uint8_t i = 0; i < this->accepted_socket_count_; i++) {
771 auto &entry = this->accepted_pcbs_[i];
772 if (entry.pcb != nullptr) {
773 pcb_detach_abort(entry.pcb);
774 entry.pcb = nullptr;
775 }
776 if (entry.rx_buf != nullptr) {
777 pbuf_free(entry.rx_buf);
778 entry.rx_buf = nullptr;
779 }
780 }
781 this->accepted_socket_count_ = 0;
782 // Listen PCBs must use tcp_close(), not tcp_abort().
783 // tcp_abandon() asserts pcb->state != LISTEN and would access
784 // fields that don't exist in the smaller tcp_pcb_listen struct.
785 // Don't use pcb_detach_close() here — tcp_recv()/tcp_err() also access
786 // fields that only exist in the full tcp_pcb, not tcp_pcb_listen.
787 // tcp_close() on a listen PCB is synchronous (frees immediately),
788 // so there are no async callbacks to worry about.
789 // Close here and null pcb_ so the base destructor skips tcp_abort.
790 if (this->pcb_ != nullptr) {
791 tcp_close(this->pcb_);
792 this->pcb_ = nullptr;
793 }
794}
795
797 LWIP_LOCK();
798 LWIP_LOG("init(%p)", this->pcb_);
799 tcp_arg(this->pcb_, this);
800 tcp_accept(this->pcb_, LWIPRawListenImpl::s_accept_fn);
801 tcp_err(this->pcb_, LWIPRawListenImpl::s_err_fn);
802}
803
804void LWIPRawListenImpl::s_err_fn(void *arg, err_t err) {
805 // LWIP CALLBACK — runs from IRQ context on RP2040 (low-priority user IRQ).
806 // No heap allocation allowed — malloc is not IRQ-safe (see #14687).
807 auto *arg_this = reinterpret_cast<LWIPRawListenImpl *>(arg);
808 ESP_LOGVV(TAG, "socket %p: err(err=%d)", arg_this, err);
809 arg_this->pcb_ = nullptr;
810}
811
812void LWIPRawListenImpl::s_queued_err_fn(void *arg, err_t err) {
813 // LWIP CALLBACK — runs from IRQ context on RP2040 (low-priority user IRQ).
814 // No heap allocation allowed — malloc is not IRQ-safe (see #14687).
815 // Called when a queued (not yet accepted) PCB errors — e.g., remote sent RST.
816 // The PCB is already freed by lwip. Null our pointer so accept() skips it.
817 (void) err;
818 auto *entry = reinterpret_cast<QueuedPcb *>(arg);
819 entry->pcb = nullptr;
820 // Don't free rx_buf here — accept() will clean it up when it sees pcb==nullptr
821}
822
823err_t LWIPRawListenImpl::s_queued_recv_fn(void *arg, struct tcp_pcb *pcb, struct pbuf *pb, err_t err) {
824 // LWIP CALLBACK — runs from IRQ context on RP2040 (low-priority user IRQ).
825 // No heap allocation allowed — malloc is not IRQ-safe (see #14687).
826 // Temporary recv callback for PCBs queued between accept_fn_ and accept().
827 // Without this, lwip's default tcp_recv_null handler would ACK and drop the data,
828 // causing the API handshake to silently fail (client sends Hello, server never sees it).
829 (void) pcb;
830 auto *entry = reinterpret_cast<QueuedPcb *>(arg);
831 if (pb == nullptr || err != ERR_OK) {
832 // Remote closed or error
833 if (pb != nullptr) {
834 pbuf_free(pb);
835 }
836 entry->rx_closed = true;
837 return ERR_OK;
838 }
839 // Buffer the data — tcp_recved() is deferred to read() after accept() creates the socket.
840 if (entry->rx_buf == nullptr) {
841 entry->rx_buf = pb;
842 } else {
843 pbuf_cat(entry->rx_buf, pb);
844 }
845 return ERR_OK;
846}
847
848err_t LWIPRawListenImpl::s_accept_fn(void *arg, struct tcp_pcb *newpcb, err_t err) {
849 auto *arg_this = reinterpret_cast<LWIPRawListenImpl *>(arg);
850 return arg_this->accept_fn_(newpcb, err);
851}
852
853std::unique_ptr<LWIPRawImpl> LWIPRawListenImpl::accept(struct sockaddr *addr, socklen_t *addrlen) {
854 LWIP_LOCK();
855 if (this->pcb_ == nullptr) {
856 errno = EBADF;
857 return nullptr;
858 }
859 // Dequeue front entry, skipping any null entries (PCBs freed by lwip while queued).
860 // The error callback nulled their pcb pointers; clean up buffered data and discard.
861 while (this->accepted_socket_count_ > 0) {
862 QueuedPcb entry = this->accepted_pcbs_[0];
863 // Shift remaining entries forward, updating tcp_arg pointers as we go.
864 // Safe because we hold LWIP_LOCK, so err/recv callbacks can't fire during the update.
865 for (uint8_t i = 1; i < this->accepted_socket_count_; i++) {
866 this->accepted_pcbs_[i - 1] = this->accepted_pcbs_[i];
867 if (this->accepted_pcbs_[i - 1].pcb != nullptr) {
868 tcp_arg(this->accepted_pcbs_[i - 1].pcb, &this->accepted_pcbs_[i - 1]);
869 }
870 }
871 this->accepted_pcbs_[this->accepted_socket_count_ - 1] = {};
872 this->accepted_socket_count_--;
873 if (entry.pcb == nullptr) {
874 // PCB was freed by lwip (RST/timeout) while queued — discard and try next
875 if (entry.rx_buf != nullptr) {
876 pbuf_free(entry.rx_buf);
877 }
878 continue;
879 }
880 LWIP_LOG("Connection accepted by application, queue size: %d", this->accepted_socket_count_);
881 // Create socket wrapper on the main loop (not in accept callback) to avoid
882 // heap allocation in IRQ context on RP2040. Transfer any data received while queued.
883 auto sock = make_unique<LWIPRawImpl>(this->family_, entry.pcb);
884 sock->init(entry.rx_buf, entry.rx_closed);
885 if (addr != nullptr) {
886 sock->getpeername(addr, addrlen);
887 }
888 LWIP_LOG("accept(%p)", sock.get());
889 return sock;
890 }
891 errno = EWOULDBLOCK;
892 return nullptr;
893}
894
896 LWIP_LOCK();
897 if (this->pcb_ == nullptr) {
898 errno = EBADF;
899 return -1;
900 }
901 LWIP_LOG("tcp_listen_with_backlog(%p backlog=%d)", this->pcb_, backlog);
902 struct tcp_pcb *listen_pcb = tcp_listen_with_backlog(this->pcb_, backlog);
903 if (listen_pcb == nullptr) {
904 tcp_abort(this->pcb_);
905 this->pcb_ = nullptr;
906 errno = EOPNOTSUPP;
907 return -1;
908 }
909 // tcp_listen reallocates the pcb, replace ours
910 this->pcb_ = listen_pcb;
911 // set callbacks on new pcb
912 LWIP_LOG("tcp_arg(%p)", this->pcb_);
913 tcp_arg(this->pcb_, this);
914 tcp_accept(this->pcb_, LWIPRawListenImpl::s_accept_fn);
915 // Note: tcp_err() is NOT re-registered here. tcp_listen_with_backlog() converts the
916 // full tcp_pcb to a smaller tcp_pcb_listen struct that lacks the errf field.
917 // Calling tcp_err() on a listen PCB writes past the struct boundary (undefined behavior).
918 return 0;
919}
920
921err_t LWIPRawListenImpl::accept_fn_(struct tcp_pcb *newpcb, err_t err) {
922 // LWIP CALLBACK — runs from IRQ context on RP2040 (low-priority user IRQ).
923 // No heap allocation allowed — malloc is not IRQ-safe (see #14687).
924 LWIP_LOG("accept(newpcb=%p err=%d)", newpcb, err);
925 if (err != ERR_OK || newpcb == nullptr) {
926 // "An error code if there has been an error accepting. Only return ERR_ABRT if you have
927 // called tcp_abort from within the callback function!"
928 // https://www.nongnu.org/lwip/2_1_x/tcp_8h.html#a00517abce6856d6c82f0efebdafb734d
929 // nothing to do here, we just don't push it to the queue
930 return ERR_OK;
931 }
932 // Check if we've reached the maximum accept queue size
933 if (this->accepted_socket_count_ >= MAX_ACCEPTED_SOCKETS) {
934 LWIP_LOG("Rejecting connection, queue full (%d)", this->accepted_socket_count_);
935 // Abort the connection when queue is full
936 tcp_abort(newpcb);
937 // Must return ERR_ABRT since we called tcp_abort()
938 return ERR_ABRT;
939 }
940 // Store the raw PCB — LWIPRawImpl creation is deferred to the main-loop accept().
941 // This avoids heap allocation in this callback, which is unsafe from IRQ context on RP2040.
942 uint8_t idx = this->accepted_socket_count_++;
943 this->accepted_pcbs_[idx] = {newpcb, nullptr, false};
944 // Register temporary callbacks so that while the PCB is queued:
945 // - err: nulls our pointer if the connection errors (RST, timeout)
946 // - recv: buffers any data that arrives before accept() creates the LWIPRawImpl
947 // (without this, lwip's default tcp_recv_null would ACK and drop the data)
948 // tcp_arg points to our queue entry; accept() updates these pointers after shifting.
949 tcp_arg(newpcb, &this->accepted_pcbs_[idx]);
950 tcp_err(newpcb, LWIPRawListenImpl::s_queued_err_fn);
951 tcp_recv(newpcb, LWIPRawListenImpl::s_queued_recv_fn);
952 LWIP_LOG("Accepted connection, queue size: %d", this->accepted_socket_count_);
953#if (defined(USE_ESP8266) || defined(USE_RP2040))
954 // Wake the main loop immediately so it can accept the new connection.
955 socket_wake();
956#endif
957 return ERR_OK;
958}
959
960// ---- Factory functions ----
961
962std::unique_ptr<Socket> socket(int domain, int type, int protocol) {
963 if (type != SOCK_STREAM) {
964 ESP_LOGE(TAG, "UDP sockets not supported on this platform, use WiFiUDP");
965 errno = EPROTOTYPE;
966 return nullptr;
967 }
968 LWIP_LOCK();
969 auto *pcb = tcp_new();
970 if (pcb == nullptr)
971 return nullptr;
972 auto *sock = new LWIPRawImpl((sa_family_t) domain, pcb); // NOLINT(cppcoreguidelines-owning-memory)
973 sock->init();
974 return std::unique_ptr<Socket>{sock};
975}
976
977std::unique_ptr<Socket> socket_loop_monitored(int domain, int type, int protocol) {
978 // LWIPRawImpl doesn't use file descriptors, so monitoring is not applicable
979 return socket(domain, type, protocol);
980}
981
982std::unique_ptr<ListenSocket> socket_listen(int domain, int type, int protocol) {
983 if (type != SOCK_STREAM) {
984 ESP_LOGE(TAG, "UDP sockets not supported on this platform, use WiFiUDP");
985 errno = EPROTOTYPE;
986 return nullptr;
987 }
988 LWIP_LOCK();
989 auto *pcb = tcp_new();
990 if (pcb == nullptr)
991 return nullptr;
992 auto *sock = new LWIPRawListenImpl((sa_family_t) domain, pcb); // NOLINT(cppcoreguidelines-owning-memory)
993 sock->init();
994 return std::unique_ptr<ListenSocket>{sock};
995}
996
997std::unique_ptr<ListenSocket> socket_listen_loop_monitored(int domain, int type, int protocol) {
998 // LWIPRawImpl doesn't use file descriptors, so monitoring is not applicable
999 return socket_listen(domain, type, protocol);
1000}
1001
1002#undef LWIP_LOCK
1003
1004} // namespace esphome::socket
1005
1006#endif // USE_SOCKET_IMPL_LWIP_TCP
int getsockname(struct sockaddr *name, socklen_t *addrlen)
size_t getsockname_to(std::span< char, SOCKADDR_STR_LEN > buf)
Format local address into a fixed-size buffer (no heap allocation)
int bind(const struct sockaddr *name, socklen_t addrlen)
int ip2sockaddr_(ip_addr_t *ip, uint16_t port, struct sockaddr *name, socklen_t *addrlen)
int setsockopt(int level, int optname, const void *optval, socklen_t optlen)
int getsockopt(int level, int optname, void *optval, socklen_t *optlen)
int getpeername(struct sockaddr *name, socklen_t *addrlen)
size_t getpeername_to(std::span< char, SOCKADDR_STR_LEN > buf)
Format peer address into a fixed-size buffer (no heap allocation)
Connected socket implementation for LWIP raw TCP.
ssize_t read_locked_(void *buf, size_t len)
static err_t s_recv_fn(void *arg, struct tcp_pcb *pcb, struct pbuf *pb, err_t err)
void init(struct pbuf *initial_rx=nullptr, bool initial_rx_closed=false)
ssize_t readv(const struct iovec *iov, int iovcnt)
static void s_err_fn(void *arg, err_t err)
err_t recv_fn(struct pbuf *pb, err_t err)
ssize_t internal_write_(const void *buf, size_t len)
ssize_t write(const void *buf, size_t len)
ssize_t read(void *buf, size_t len)
ssize_t writev(const struct iovec *iov, int iovcnt)
Listening socket implementation for LWIP raw TCP.
static void s_err_fn(void *arg, err_t err)
std::unique_ptr< LWIPRawImpl > accept(struct sockaddr *addr, socklen_t *addrlen)
uint16_t type
uint16_t in_port_t
Definition headers.h:60
uint32_t socklen_t
Definition headers.h:99
uint8_t sa_family_t
Definition headers.h:59
__int64 ssize_t
Definition httplib.h:178
in_addr ip_addr_t
Definition ip_address.h:22
mopeka_std_values val[3]
size_t format_sockaddr_to(const struct sockaddr *addr_ptr, socklen_t len, std::span< char, SOCKADDR_STR_LEN > buf)
Format sockaddr into caller-provided buffer, returns length written (excluding null)
Definition socket.cpp:53
std::unique_ptr< ListenSocket > socket_listen(int domain, int type, int protocol)
Create a listening socket of the given domain, type and protocol.
std::unique_ptr< ListenSocket > socket_listen_loop_monitored(int domain, int type, int protocol)
void IRAM_ATTR socket_wake()
Signal socket/IO activity and wake the main loop early.
std::unique_ptr< Socket > socket(int domain, int type, int protocol)
Create a socket of the given domain, type and protocol.
std::unique_ptr< Socket > socket_loop_monitored(int domain, int type, int protocol)
Create a socket and monitor it for data in the main loop.
void socket_delay(uint32_t ms)
Delay that can be woken early by socket activity.
std::string size_t len
Definition helpers.h:892
void HOT yield()
Definition core.cpp:25
void HOT delay(uint32_t ms)
Definition core.cpp:28
uint32_t IRAM_ATTR HOT millis()
Definition core.cpp:26
int written
Definition helpers.h:936
static void uint32_t
uint8_t sin6_len
Definition headers.h:75
in_port_t sin6_port
Definition headers.h:77
struct in6_addr sin6_addr
Definition headers.h:79
sa_family_t sin6_family
Definition headers.h:76
struct in_addr sin_addr
Definition headers.h:67
uint8_t sin_len
Definition headers.h:64
sa_family_t sin_family
Definition headers.h:65
in_port_t sin_port
Definition headers.h:66