ESPHome 2025.5.0
Loading...
Searching...
No Matches
audio_pipeline.cpp
Go to the documentation of this file.
1#include "audio_pipeline.h"
2
3#ifdef USE_ESP_IDF
4
6#include "esphome/core/hal.h"
8#include "esphome/core/log.h"
9
10namespace esphome {
11namespace speaker {
12
13static const uint32_t INITIAL_BUFFER_MS = 1000; // Start playback after buffering this duration of the file
14
15static const uint32_t READ_TASK_STACK_SIZE = 5 * 1024;
16static const uint32_t DECODE_TASK_STACK_SIZE = 3 * 1024;
17
18static const uint32_t INFO_ERROR_QUEUE_COUNT = 5;
19
20static const char *const TAG = "speaker_media_player.pipeline";
21
22enum EventGroupBits : uint32_t {
23 // MESSAGE_* bits are only set by their respective tasks
24
25 // Stops all activity in the pipeline elements; cleared by process_state() and set by stop() or by each task
27
28 // Read audio from an HTTP source; cleared by reader task and set by start_url
30 // Read audio from an audio file from the flash; cleared by reader task and set by start_file
32
33 // Audio file type is read after checking it is supported; cleared by decoder task
35 // Reader is done (either through a failure or just end of the stream); cleared by reader task
37 // Error reading the file; cleared by process_state()
39
40 // Decoder is done (either through a faiilure or the end of the stream); cleared by decoder task
42 // Error decoding the file; cleared by process_state() by decoder task
44};
45
46AudioPipeline::AudioPipeline(speaker::Speaker *speaker, size_t buffer_size, bool task_stack_in_psram,
47 std::string base_name, UBaseType_t priority)
48 : base_name_(std::move(base_name)),
49 priority_(priority),
50 task_stack_in_psram_(task_stack_in_psram),
51 speaker_(speaker),
52 buffer_size_(buffer_size) {
54 this->transfer_buffer_size_ = std::min(buffer_size_ / 4, DEFAULT_TRANSFER_BUFFER_SIZE);
55}
56
57void AudioPipeline::start_url(const std::string &uri) {
58 if (this->is_playing_) {
59 xEventGroupSetBits(this->event_group_, PIPELINE_COMMAND_STOP);
60 }
61 this->current_uri_ = uri;
62 this->pending_url_ = true;
63}
64
66 if (this->is_playing_) {
67 xEventGroupSetBits(this->event_group_, PIPELINE_COMMAND_STOP);
68 }
69 this->current_audio_file_ = audio_file;
70 this->pending_file_ = true;
71}
72
74 xEventGroupSetBits(this->event_group_, EventGroupBits::PIPELINE_COMMAND_STOP);
75
76 return ESP_OK;
77}
78void AudioPipeline::set_pause_state(bool pause_state) {
79 this->speaker_->set_pause_state(pause_state);
80
81 this->pause_state_ = pause_state;
82}
83
85 if (this->read_task_handle_ != nullptr) {
86 vTaskSuspend(this->read_task_handle_);
87 }
88 if (this->decode_task_handle_ != nullptr) {
89 vTaskSuspend(this->decode_task_handle_);
90 }
91}
92
94 if (this->read_task_handle_ != nullptr) {
95 vTaskResume(this->read_task_handle_);
96 }
97 if (this->decode_task_handle_ != nullptr) {
98 vTaskResume(this->decode_task_handle_);
99 }
100}
101
103 /*
104 * Log items from info error queue
105 */
106 InfoErrorEvent event;
107 if (this->info_error_queue_ != nullptr) {
108 while (xQueueReceive(this->info_error_queue_, &event, 0)) {
109 switch (event.source) {
111 if (event.err.has_value()) {
112 ESP_LOGE(TAG, "Media reader encountered an error: %s", esp_err_to_name(event.err.value()));
113 } else if (event.file_type.has_value()) {
114 ESP_LOGD(TAG, "Reading %s file type", audio_file_type_to_string(event.file_type.value()));
115 }
116
117 break;
119 if (event.err.has_value()) {
120 ESP_LOGE(TAG, "Decoder encountered an error: %s", esp_err_to_name(event.err.value()));
121 }
122
123 if (event.audio_stream_info.has_value()) {
124 ESP_LOGD(TAG, "Decoded audio has %d channels, %" PRId32 " Hz sample rate, and %d bits per sample",
125 event.audio_stream_info.value().get_channels(), event.audio_stream_info.value().get_sample_rate(),
126 event.audio_stream_info.value().get_bits_per_sample());
127 }
128
129 if (event.decoding_err.has_value()) {
130 switch (event.decoding_err.value()) {
132 ESP_LOGE(TAG, "Failed to parse the file's header.");
133 break;
135 ESP_LOGE(TAG, "Incompatible bits per sample. Only 16 bits per sample is supported");
136 break;
138 ESP_LOGE(TAG, "Incompatible number of channels. Only 1 or 2 channel audio is supported.");
139 break;
140 }
141 }
142 break;
143 }
144 }
145 }
146
147 /*
148 * Determine the current state based on the event group bits and tasks' status
149 */
150
151 EventBits_t event_bits = xEventGroupGetBits(this->event_group_);
152
153 if (this->pending_url_ || this->pending_file_) {
154 // Init command pending
155 if (!(event_bits & EventGroupBits::PIPELINE_COMMAND_STOP)) {
156 // Only start if there is no pending stop command
157 if ((this->read_task_handle_ == nullptr) || (this->decode_task_handle_ == nullptr)) {
158 // At least one task isn't running
159 this->start_tasks_();
160 }
161
162 if (this->pending_url_) {
164 this->playback_ms_ = 0;
165 this->pending_url_ = false;
166 } else if (this->pending_file_) {
168 this->playback_ms_ = 0;
169 this->pending_file_ = false;
170 }
171
172 this->is_playing_ = true;
174 }
175 }
176
177 if ((event_bits & EventGroupBits::READER_MESSAGE_FINISHED) &&
180 // Tasks are finished and there's no media in between the reader and decoder
181
182 if (event_bits & EventGroupBits::PIPELINE_COMMAND_STOP) {
183 // Stop command is fully processed, so clear the command bit
184 xEventGroupClearBits(this->event_group_, EventGroupBits::PIPELINE_COMMAND_STOP);
185 this->hard_stop_ = true;
186 }
187
188 if (!this->is_playing_) {
189 // The tasks have been stopped for two ``process_state`` calls in a row, so delete the tasks
190 if ((this->read_task_handle_ != nullptr) || (this->decode_task_handle_ != nullptr)) {
191 this->delete_tasks_();
192 if (this->hard_stop_) {
193 // Stop command was sent, so immediately end of the playback
194 this->speaker_->stop();
195 this->hard_stop_ = false;
196 } else {
197 // Decoded all the audio, so let the speaker finish playing before stopping
198 this->speaker_->finish();
199 }
200 }
201 }
202 this->is_playing_ = false;
204 }
205
206 if ((event_bits & EventGroupBits::READER_MESSAGE_ERROR)) {
207 xEventGroupClearBits(this->event_group_, EventGroupBits::READER_MESSAGE_ERROR);
209 }
210
211 if ((event_bits & EventGroupBits::DECODER_MESSAGE_ERROR)) {
212 xEventGroupClearBits(this->event_group_, EventGroupBits::DECODER_MESSAGE_ERROR);
214 }
215
216 if (this->pause_state_) {
218 }
219
220 if ((this->read_task_handle_ == nullptr) && (this->decode_task_handle_ == nullptr)) {
221 // No tasks are running, so the pipeline is stopped.
222 xEventGroupClearBits(this->event_group_, EventGroupBits::PIPELINE_COMMAND_STOP);
224 }
225
226 this->is_playing_ = true;
228}
229
231 if (this->event_group_ == nullptr)
232 this->event_group_ = xEventGroupCreate();
233
234 if (this->event_group_ == nullptr) {
235 return ESP_ERR_NO_MEM;
236 }
237
238 if (this->info_error_queue_ == nullptr)
239 this->info_error_queue_ = xQueueCreate(INFO_ERROR_QUEUE_COUNT, sizeof(InfoErrorEvent));
240
241 if (this->info_error_queue_ == nullptr)
242 return ESP_ERR_NO_MEM;
243
244 return ESP_OK;
245}
246
248 if (this->read_task_handle_ == nullptr) {
249 if (this->read_task_stack_buffer_ == nullptr) {
250 if (this->task_stack_in_psram_) {
252 this->read_task_stack_buffer_ = stack_allocator.allocate(READ_TASK_STACK_SIZE);
253 } else {
255 this->read_task_stack_buffer_ = stack_allocator.allocate(READ_TASK_STACK_SIZE);
256 }
257 }
258
259 if (this->read_task_stack_buffer_ == nullptr) {
260 return ESP_ERR_NO_MEM;
261 }
262
263 if (this->read_task_handle_ == nullptr) {
264 this->read_task_handle_ =
265 xTaskCreateStatic(read_task, (this->base_name_ + "_read").c_str(), READ_TASK_STACK_SIZE, (void *) this,
267 }
268
269 if (this->read_task_handle_ == nullptr) {
270 return ESP_ERR_INVALID_STATE;
271 }
272 }
273
274 if (this->decode_task_handle_ == nullptr) {
275 if (this->decode_task_stack_buffer_ == nullptr) {
276 if (this->task_stack_in_psram_) {
278 this->decode_task_stack_buffer_ = stack_allocator.allocate(DECODE_TASK_STACK_SIZE);
279 } else {
281 this->decode_task_stack_buffer_ = stack_allocator.allocate(DECODE_TASK_STACK_SIZE);
282 }
283 }
284
285 if (this->decode_task_stack_buffer_ == nullptr) {
286 return ESP_ERR_NO_MEM;
287 }
288
289 if (this->decode_task_handle_ == nullptr) {
290 this->decode_task_handle_ =
291 xTaskCreateStatic(decode_task, (this->base_name_ + "_decode").c_str(), DECODE_TASK_STACK_SIZE, (void *) this,
293 }
294
295 if (this->decode_task_handle_ == nullptr) {
296 return ESP_ERR_INVALID_STATE;
297 }
298 }
299
300 return ESP_OK;
301}
302
304 if (this->read_task_handle_ != nullptr) {
305 vTaskDelete(this->read_task_handle_);
306
307 if (this->read_task_stack_buffer_ != nullptr) {
308 if (this->task_stack_in_psram_) {
310 stack_allocator.deallocate(this->read_task_stack_buffer_, READ_TASK_STACK_SIZE);
311 } else {
313 stack_allocator.deallocate(this->read_task_stack_buffer_, READ_TASK_STACK_SIZE);
314 }
315
316 this->read_task_stack_buffer_ = nullptr;
317 this->read_task_handle_ = nullptr;
318 }
319 }
320
321 if (this->decode_task_handle_ != nullptr) {
322 vTaskDelete(this->decode_task_handle_);
323
324 if (this->decode_task_stack_buffer_ != nullptr) {
325 if (this->task_stack_in_psram_) {
327 stack_allocator.deallocate(this->decode_task_stack_buffer_, DECODE_TASK_STACK_SIZE);
328 } else {
330 stack_allocator.deallocate(this->decode_task_stack_buffer_, DECODE_TASK_STACK_SIZE);
331 }
332
333 this->decode_task_stack_buffer_ = nullptr;
334 this->decode_task_handle_ = nullptr;
335 }
336 }
337}
338
339void AudioPipeline::read_task(void *params) {
340 AudioPipeline *this_pipeline = (AudioPipeline *) params;
341
342 while (true) {
343 xEventGroupSetBits(this_pipeline->event_group_, EventGroupBits::READER_MESSAGE_FINISHED);
344
345 // Wait until the pipeline notifies us the source of the media file
346 EventBits_t event_bits =
347 xEventGroupWaitBits(this_pipeline->event_group_,
349 EventGroupBits::PIPELINE_COMMAND_STOP, // Bit message to read
350 pdFALSE, // Clear the bit on exit
351 pdFALSE, // Wait for all the bits,
352 portMAX_DELAY); // Block indefinitely until bit is set
353
354 if (!(event_bits & EventGroupBits::PIPELINE_COMMAND_STOP)) {
355 xEventGroupClearBits(this_pipeline->event_group_, EventGroupBits::READER_MESSAGE_FINISHED |
358 InfoErrorEvent event;
360 esp_err_t err = ESP_OK;
361
362 std::unique_ptr<audio::AudioReader> reader =
364
366 err = reader->start(this_pipeline->current_audio_file_, this_pipeline->current_audio_file_type_);
367 } else {
368 err = reader->start(this_pipeline->current_uri_, this_pipeline->current_audio_file_type_);
369 }
370
371 if (err == ESP_OK) {
372 size_t file_ring_buffer_size = this_pipeline->buffer_size_;
373
374 std::shared_ptr<RingBuffer> temp_ring_buffer;
375
376 if (!this_pipeline->raw_file_ring_buffer_.use_count()) {
377 temp_ring_buffer = RingBuffer::create(file_ring_buffer_size);
378 this_pipeline->raw_file_ring_buffer_ = temp_ring_buffer;
379 }
380
381 if (!this_pipeline->raw_file_ring_buffer_.use_count()) {
382 err = ESP_ERR_NO_MEM;
383 } else {
384 reader->add_sink(this_pipeline->raw_file_ring_buffer_);
385 }
386 }
387
388 if (err != ESP_OK) {
389 // Send specific error message
390 event.err = err;
391 xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
392
393 // Setting up the reader failed, stop the pipeline
394 xEventGroupSetBits(this_pipeline->event_group_,
396 } else {
397 // Send the file type to the pipeline
398 event.file_type = this_pipeline->current_audio_file_type_;
399 xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
400 xEventGroupSetBits(this_pipeline->event_group_, EventGroupBits::READER_MESSAGE_LOADED_MEDIA_TYPE);
401 }
402
403 while (true) {
404 event_bits = xEventGroupGetBits(this_pipeline->event_group_);
405
406 if (event_bits & EventGroupBits::PIPELINE_COMMAND_STOP) {
407 break;
408 }
409
410 audio::AudioReaderState reader_state = reader->read();
411
412 if (reader_state == audio::AudioReaderState::FINISHED) {
413 break;
414 } else if (reader_state == audio::AudioReaderState::FAILED) {
415 xEventGroupSetBits(this_pipeline->event_group_,
417 break;
418 }
419 }
420 event_bits = xEventGroupGetBits(this_pipeline->event_group_);
422 (this_pipeline->raw_file_ring_buffer_.use_count() == 1)) {
423 // Decoder task hasn't started yet, so delay a bit before releasing ownership of the ring buffer
424 delay(10);
425 }
426 }
427 }
428}
429
430void AudioPipeline::decode_task(void *params) {
431 AudioPipeline *this_pipeline = (AudioPipeline *) params;
432
433 while (true) {
434 xEventGroupSetBits(this_pipeline->event_group_, EventGroupBits::DECODER_MESSAGE_FINISHED);
435
436 // Wait until the reader notifies us that the media type is available
437 EventBits_t event_bits = xEventGroupWaitBits(this_pipeline->event_group_,
439 EventGroupBits::PIPELINE_COMMAND_STOP, // Bit message to read
440 pdFALSE, // Clear the bit on exit
441 pdFALSE, // Wait for all the bits,
442 portMAX_DELAY); // Block indefinitely until bit is set
443
444 xEventGroupClearBits(this_pipeline->event_group_,
446
447 if (!(event_bits & EventGroupBits::PIPELINE_COMMAND_STOP)) {
448 InfoErrorEvent event;
450
451 std::unique_ptr<audio::AudioDecoder> decoder =
453
454 esp_err_t err = decoder->start(this_pipeline->current_audio_file_type_);
455 decoder->add_source(this_pipeline->raw_file_ring_buffer_);
456
457 if (err != ESP_OK) {
458 // Send specific error message
459 event.err = err;
460 xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
461
462 // Setting up the decoder failed, stop the pipeline
463 xEventGroupSetBits(this_pipeline->event_group_,
465 }
466
467 bool has_stream_info = false;
468 bool started_playback = false;
469
470 size_t initial_bytes_to_buffer = 0;
471
472 while (true) {
473 event_bits = xEventGroupGetBits(this_pipeline->event_group_);
474
475 if (event_bits & EventGroupBits::PIPELINE_COMMAND_STOP) {
476 break;
477 }
478
479 // Update pause state
480 if (!started_playback) {
481 if (!(event_bits & EventGroupBits::READER_MESSAGE_FINISHED)) {
482 decoder->set_pause_output_state(true);
483 } else {
484 started_playback = true;
485 }
486 } else {
487 decoder->set_pause_output_state(this_pipeline->pause_state_);
488 }
489
490 // Stop gracefully if the reader has finished
491 audio::AudioDecoderState decoder_state = decoder->decode(event_bits & EventGroupBits::READER_MESSAGE_FINISHED);
492
493 if ((decoder_state == audio::AudioDecoderState::DECODING) ||
494 (decoder_state == audio::AudioDecoderState::FINISHED)) {
495 this_pipeline->playback_ms_ = decoder->get_playback_ms();
496 }
497
498 if (decoder_state == audio::AudioDecoderState::FINISHED) {
499 break;
500 } else if (decoder_state == audio::AudioDecoderState::FAILED) {
501 if (!has_stream_info) {
502 event.decoding_err = DecodingError::FAILED_HEADER;
503 xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
504 }
505 xEventGroupSetBits(this_pipeline->event_group_,
507 break;
508 }
509
510 if (!has_stream_info && decoder->get_audio_stream_info().has_value()) {
511 has_stream_info = true;
512
513 this_pipeline->current_audio_stream_info_ = decoder->get_audio_stream_info().value();
514
515 // Send the stream information to the pipeline
516 event.audio_stream_info = this_pipeline->current_audio_stream_info_;
517
518 if (this_pipeline->current_audio_stream_info_.get_bits_per_sample() != 16) {
519 // Error state, incompatible bits per sample
521 xEventGroupSetBits(this_pipeline->event_group_,
523 } else if ((this_pipeline->current_audio_stream_info_.get_channels() > 2)) {
524 // Error state, incompatible number of channels
525 event.decoding_err = DecodingError::INCOMPATIBLE_CHANNELS;
526 xEventGroupSetBits(this_pipeline->event_group_,
528 } else {
529 // Send audio directly to the speaker
530 this_pipeline->speaker_->set_audio_stream_info(this_pipeline->current_audio_stream_info_);
531 decoder->add_sink(this_pipeline->speaker_);
532 }
533
534 initial_bytes_to_buffer = std::min(this_pipeline->current_audio_stream_info_.ms_to_bytes(INITIAL_BUFFER_MS),
535 this_pipeline->buffer_size_ * 3 / 4);
536
537 switch (this_pipeline->current_audio_file_type_) {
538#ifdef USE_AUDIO_MP3_SUPPORT
540 initial_bytes_to_buffer /= 8; // Estimate the MP3 compression factor is 8
541 break;
542#endif
543#ifdef USE_AUDIO_FLAC_SUPPORT
545 initial_bytes_to_buffer /= 2; // Estimate the FLAC compression factor is 2
546 break;
547#endif
548 default:
549 break;
550 }
551 xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
552 }
553
554 if (!started_playback && has_stream_info) {
555 // Verify enough data is available before starting playback
556 std::shared_ptr<RingBuffer> temp_ring_buffer = this_pipeline->raw_file_ring_buffer_.lock();
557 if (temp_ring_buffer->available() >= initial_bytes_to_buffer) {
558 started_playback = true;
559 }
560 }
561 }
562 }
563 }
564}
565
566} // namespace speaker
567} // namespace esphome
568
569#endif
An STL allocator that uses SPI or internal RAM.
Definition helpers.h:683
void deallocate(T *p, size_t n)
Definition helpers.h:741
T * allocate(size_t n)
Definition helpers.h:703
static std::unique_ptr< RingBuffer > create(size_t len)
size_t ms_to_bytes(uint32_t ms) const
Converts duration to bytes.
Definition audio.h:73
uint8_t get_bits_per_sample() const
Definition audio.h:28
uint8_t get_channels() const
Definition audio.h:29
bool has_value() const
Definition optional.h:87
value_type const & value() const
Definition optional.h:89
static void read_task(void *params)
void suspend_tasks()
Suspends any running tasks.
void set_pause_state(bool pause_state)
void delete_tasks_()
Resets the task related pointers and deallocates their stacks.
std::weak_ptr< RingBuffer > raw_file_ring_buffer_
void start_url(const std::string &uri)
Starts an audio pipeline given a media url.
esp_err_t allocate_communications_()
Allocates the event group and info error queue.
esp_err_t start_tasks_()
Common start code for the pipeline, regardless if the source is a file or url.
audio::AudioStreamInfo current_audio_stream_info_
void start_file(audio::AudioFile *audio_file)
Starts an audio pipeline given a AudioFile pointer.
esp_err_t stop()
Stops the pipeline.
static void decode_task(void *params)
AudioPipeline(speaker::Speaker *speaker, size_t buffer_size, bool task_stack_in_psram, std::string base_name, UBaseType_t priority)
void resume_tasks()
Resumes any running tasks.
audio::AudioFile * current_audio_file_
audio::AudioFileType current_audio_file_type_
AudioPipelineState process_state()
Processes the state of the audio pipeline based on the info_error_queue_ and event_group_.
virtual void set_pause_state(bool pause_state)
Definition speaker.h:61
void set_audio_stream_info(const audio::AudioStreamInfo &audio_stream_info)
Definition speaker.h:99
virtual void finish()
Definition speaker.h:58
virtual void stop()=0
uint8_t priority
Providing packet encoding functions for exchanging data with a remote host.
Definition a01nyub.cpp:7
std::unique_ptr< T > make_unique(Args &&...args)
Definition helpers.h:85
void IRAM_ATTR HOT delay(uint32_t ms)
Definition core.cpp:28
optional< DecodingError > decoding_err
optional< audio::AudioFileType > file_type
optional< audio::AudioStreamInfo > audio_stream_info