diff options
Diffstat (limited to 'macros/src/codegen/dispatchers.rs')
-rw-r--r-- | macros/src/codegen/dispatchers.rs | 128 |
1 files changed, 109 insertions, 19 deletions
diff --git a/macros/src/codegen/dispatchers.rs b/macros/src/codegen/dispatchers.rs index a90a97c7..e6caa781 100644 --- a/macros/src/codegen/dispatchers.rs +++ b/macros/src/codegen/dispatchers.rs @@ -5,7 +5,7 @@ use rtic_syntax::ast::App; use crate::{analyze::Analysis, check::Extra, codegen::util}; /// Generates task dispatchers -pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStream2> { +pub fn codegen(app: &App, analysis: &Analysis, extra: &Extra) -> Vec<TokenStream2> { let mut items = vec![]; let interrupts = &analysis.interrupts; @@ -64,6 +64,9 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStrea static #rq: rtic::RacyCell<#rq_ty> = rtic::RacyCell::new(#rq_expr); )); + let device = &extra.device; + let enum_ = util::interrupt_ident(); + let interrupt = util::suffixed(&interrupts[&level].0.to_string()); let arms = channel .tasks .iter() @@ -73,37 +76,124 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStrea let fq = util::fq_ident(name); let inputs = util::inputs_ident(name); let (_, tupled, pats, _) = util::regroup_inputs(&task.inputs); + let exec_name = util::internal_task_ident(name, "EXEC"); - quote!( - #(#cfgs)* - #t::#name => { - let #tupled = - (&*#inputs - .get()) - .get_unchecked(usize::from(index)) - .as_ptr() - .read(); - (&mut *#fq.get_mut()).split().0.enqueue_unchecked(index); - let priority = &rtic::export::Priority::new(PRIORITY); - #name( - #name::Context::new(priority) - #(,#pats)* - ) - } - ) + if task.is_async { + let executor_run_ident = util::executor_run_ident(name); + + quote!( + #(#cfgs)* + #t::#name => { + if !(&mut *#exec_name.get_mut()).is_running() { + let #tupled = + (&*#inputs + .get()) + .get_unchecked(usize::from(index)) + .as_ptr() + .read(); + (&mut *#fq.get_mut()).split().0.enqueue_unchecked(index); + + let priority = &rtic::export::Priority::new(PRIORITY); + (&mut *#exec_name.get_mut()).spawn(#name(#name::Context::new(priority), #(,#pats)*)); + #executor_run_ident.store(true, core::sync::atomic::Ordering::Relaxed); + } else { + retry_queue.push_unchecked((#t::#name, index)); + } + } + ) + } else { + quote!( + #(#cfgs)* + #t::#name => { + let #tupled = + (&*#inputs + .get()) + .get_unchecked(usize::from(index)) + .as_ptr() + .read(); + (&mut *#fq.get_mut()).split().0.enqueue_unchecked(index); + let priority = &rtic::export::Priority::new(PRIORITY); + #name( + #name::Context::new(priority) + #(,#pats)* + ) + } + ) + } }) .collect::<Vec<_>>(); + for (name, task) in app.software_tasks.iter() { + if task.is_async { + let type_name = util::internal_task_ident(name, "F"); + let exec_name = util::internal_task_ident(name, "EXEC"); + + stmts.push(quote!( + type #type_name = impl core::future::Future + 'static; + static #exec_name: + rtic::RacyCell<rtic::export::executor::AsyncTaskExecutor<#type_name>> = + rtic::RacyCell::new(rtic::export::executor::AsyncTaskExecutor::new()); + )); + } + } + + let n_executors: usize = app + .software_tasks + .iter() + .map(|(_, task)| if task.is_async { 1 } else { 0 }) + .sum(); + + // TODO: This `retry_queue` comes from the current design of the dispatcher queue handling. + // To remove this we would need to redesign how the dispatcher handles queues, and this can + // be done as an optimization later. + // + // The core issue is that we should only dequeue the ready queue if the exexutor associated + // to the task is not running. As it is today this queue is blindly dequeued, see the + // `while let Some(...) = (&mut *#rq.get_mut())...` a few lines down. The current "hack" is + // to just requeue the executor run if it should not have been dequeued. This needs however + // to be done after the ready queue has been exhausted. + if n_executors > 0 { + stmts.push(quote!( + let mut retry_queue: rtic::export::Vec<_, #n_executors> = rtic::export::Vec::new(); + )); + } + stmts.push(quote!( while let Some((task, index)) = (&mut *#rq.get_mut()).split().1.dequeue() { match task { #(#arms)* } } + + while let Some((task, index)) = retry_queue.pop() { + rtic::export::interrupt::free(|_| { + (&mut *#rq.get_mut()).enqueue_unchecked((task, index)); + }); + } )); + for (name, _task) in app.software_tasks.iter().filter_map(|(name, task)| { + if task.is_async { + Some((name, task)) + } else { + None + } + }) { + let exec_name = util::internal_task_ident(name, "EXEC"); + + let executor_run_ident = util::executor_run_ident(name); + stmts.push(quote!( + if #executor_run_ident.load(core::sync::atomic::Ordering::Relaxed) { + #executor_run_ident.store(false, core::sync::atomic::Ordering::Relaxed); + (&mut *#exec_name.get_mut()).poll(|| { + #executor_run_ident.store(true, core::sync::atomic::Ordering::Release); + rtic::pend(#device::#enum_::#interrupt); + }); + } + )); + } + let doc = format!("Interrupt handler to dispatch tasks at priority {}", level); - let interrupt = util::suffixed(&interrupts[&level].0.to_string()); let attribute = &interrupts[&level].1.attrs; items.push(quote!( #[allow(non_snake_case)] |