aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--examples/async-task.rs168
-rw-r--r--examples/async.rs287
-rw-r--r--examples/async2.rs361
-rw-r--r--macros/src/codegen.rs56
-rw-r--r--macros/src/codegen/dispatchers.rs128
-rw-r--r--macros/src/codegen/module.rs34
-rw-r--r--macros/src/codegen/monotonic.rs251
-rw-r--r--macros/src/codegen/software_tasks.rs30
-rw-r--r--macros/src/codegen/timer_queue.rs36
-rw-r--r--macros/src/codegen/util.rs9
-rw-r--r--src/export.rs68
-rw-r--r--src/tq.rs262
12 files changed, 720 insertions, 970 deletions
diff --git a/examples/async-task.rs b/examples/async-task.rs
index a91dfc67..00fe581c 100644
--- a/examples/async-task.rs
+++ b/examples/async-task.rs
@@ -2,10 +2,6 @@
#![no_std]
#![feature(type_alias_impl_trait)]
-use core::future::Future;
-use core::pin::Pin;
-use core::task::{Context, Poll, Waker};
-
use cortex_m_semihosting::{debug, hprintln};
use panic_semihosting as _;
use systick_monotonic::*;
@@ -25,9 +21,7 @@ mod app {
pub type AppDuration = <Systick<100> as rtic::Monotonic>::Duration;
#[shared]
- struct Shared {
- s: u32,
- }
+ struct Shared {}
#[local]
struct Local {}
@@ -39,8 +33,11 @@ mod app {
fn init(cx: init::Context) -> (Shared, Local, init::Monotonics) {
hprintln!("init").unwrap();
+ normal_task::spawn().ok();
+ async_task::spawn().ok();
+
(
- Shared { s: 0 },
+ Shared {},
Local {},
init::Monotonics(Systick::new(cx.core.SYST, 12_000_000)),
)
@@ -55,156 +52,15 @@ mod app {
}
}
- #[task(priority = 2)]
- async fn task(cx: task::Context) {
- hprintln!("delay long time").ok();
-
- let fut = Delay::spawn(2500.millis());
-
- hprintln!("we have just created the future").ok();
- fut.await;
- hprintln!("long delay done").ok();
-
- hprintln!("delay short time").ok();
- delay(500.millis()).await;
- hprintln!("short delay done").ok();
-
- hprintln!("test timeout").ok();
- let res = timeout(NeverEndingFuture {}, 1.secs()).await;
- hprintln!("timeout done: {:?}", res).ok();
-
- hprintln!("test timeout 2").ok();
- let res = timeout(delay(500.millis()), 1.secs()).await;
- hprintln!("timeout done 2: {:?}", res).ok();
-
- debug::exit(debug::EXIT_SUCCESS);
- }
-
- #[task(capacity = 12)]
- fn delay_handler(_: delay_handler::Context, waker: Waker) {
- waker.wake();
- }
-}
-
-// Delay
-
-pub struct Delay {
- until: crate::app::AppInstant,
-}
-
-impl Delay {
- pub fn spawn(duration: crate::app::AppDuration) -> Self {
- let until = crate::app::monotonics::now() + duration;
-
- Delay { until }
- }
-}
-
-#[inline(always)]
-pub fn delay(duration: crate::app::AppDuration) -> Delay {
- Delay::spawn(duration)
-}
-
-impl Future for Delay {
- type Output = ();
-
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let s = self.as_mut();
- let now = crate::app::monotonics::now();
-
- hprintln!(" poll Delay").ok();
-
- if now >= s.until {
- Poll::Ready(())
- } else {
- let waker = cx.waker().clone();
- crate::app::delay_handler::spawn_after(s.until - now, waker).ok();
-
- Poll::Pending
- }
- }
-}
-
-//=============
-// Timeout future
-
-#[derive(Copy, Clone, Debug)]
-pub struct TimeoutError;
-
-pub struct Timeout<F: Future> {
- future: F,
- until: crate::app::AppInstant,
- cancel_handle: Option<crate::app::delay_handler::SpawnHandle>,
-}
-
-impl<F> Timeout<F>
-where
- F: Future,
-{
- pub fn timeout(future: F, duration: crate::app::AppDuration) -> Self {
- let until = crate::app::monotonics::now() + duration;
- Self {
- future,
- until,
- cancel_handle: None,
- }
- }
-}
-
-#[inline(always)]
-pub fn timeout<F: Future>(future: F, duration: crate::app::AppDuration) -> Timeout<F> {
- Timeout::timeout(future, duration)
-}
-
-impl<F> Future for Timeout<F>
-where
- F: Future,
-{
- type Output = Result<F::Output, TimeoutError>;
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let now = crate::app::monotonics::now();
-
- // SAFETY: We don't move the underlying pinned value.
- let mut s = unsafe { self.get_unchecked_mut() };
- let future = unsafe { Pin::new_unchecked(&mut s.future) };
-
- hprintln!(" poll Timeout").ok();
-
- match future.poll(cx) {
- Poll::Ready(r) => {
- if let Some(ch) = s.cancel_handle.take() {
- ch.cancel().ok();
- }
-
- Poll::Ready(Ok(r))
- }
- Poll::Pending => {
- if now >= s.until {
- Poll::Ready(Err(TimeoutError))
- } else if s.cancel_handle.is_none() {
- let waker = cx.waker().clone();
- let sh = crate::app::delay_handler::spawn_after(s.until - now, waker)
- .expect("Internal RTIC bug, this should never fail");
- s.cancel_handle = Some(sh);
-
- Poll::Pending
- } else {
- Poll::Pending
- }
- }
- }
+ #[task]
+ fn normal_task(_cx: normal_task::Context) {
+ hprintln!("hello from normal").ok();
}
-}
-
-pub struct NeverEndingFuture {}
-impl Future for NeverEndingFuture {
- type Output = ();
+ #[task]
+ async fn async_task(_cx: async_task::Context) {
+ hprintln!("hello from async").ok();
- fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
- // Never finish
- hprintln!(" polling NeverEndingFuture").ok();
- Poll::Pending
+ debug::exit(debug::EXIT_SUCCESS);
}
}
diff --git a/examples/async.rs b/examples/async.rs
deleted file mode 100644
index 6abbbad8..00000000
--- a/examples/async.rs
+++ /dev/null
@@ -1,287 +0,0 @@
-#![no_main]
-#![no_std]
-#![feature(type_alias_impl_trait)]
-
-use core::future::Future;
-use core::mem;
-use core::pin::Pin;
-use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
-
-use cortex_m_semihosting::{debug, hprintln};
-use panic_semihosting as _;
-
-#[rtic::app(device = lm3s6965, dispatchers = [SSI0], peripherals = true)]
-mod app {
- use crate::Timer;
- use crate::*;
-
- #[shared]
- struct Shared {
- syst: cortex_m::peripheral::SYST,
- }
-
- #[local]
- struct Local {}
-
- #[init]
- fn init(cx: init::Context) -> (Shared, Local, init::Monotonics) {
- hprintln!("init").unwrap();
- foo::spawn().unwrap();
- foo2::spawn().unwrap();
- (Shared { syst: cx.core.SYST }, Local {}, init::Monotonics())
- }
-
- #[idle]
- fn idle(_: idle::Context) -> ! {
- // debug::exit(debug::EXIT_SUCCESS);
- loop {
- hprintln!("idle");
- cortex_m::asm::wfi(); // put the MCU in sleep mode until interrupt occurs
- }
- }
-
- type F = impl Future + 'static;
- static mut TASK: Task<F> = Task::new();
-
- #[task(shared = [syst])]
- fn foo(mut cx: foo::Context) {
- // BEGIN BOILERPLATE
- fn create(cx: foo::Context<'static>) -> F {
- task(cx)
- }
-
- hprintln!("foo trampoline").ok();
- unsafe {
- match TASK {
- Task::Idle | Task::Done(_) => {
- hprintln!("foo spawn task").ok();
- TASK.spawn(|| create(mem::transmute(cx)));
- // Per:
- // I think transmute could be removed as in:
- // TASK.spawn(|| create(cx));
- //
- // This could be done if spawn for async tasks would be passed
- // a 'static reference by the generated code.
- //
- // Soundness:
- // Check if lifetime for async context is correct.
- }
- _ => {}
- };
-
- foo_poll::spawn();
- }
- // END BOILERPLATE
-
- async fn task(mut cx: foo::Context<'static>) {
- hprintln!("foo task").ok();
-
- hprintln!("delay long time").ok();
- let fut = cx.shared.syst.lock(|syst| timer_delay(syst, 5000000));
-
- hprintln!("we have just created the future");
- fut.await; // this calls poll on the timer future
- hprintln!("foo task resumed").ok();
-
- hprintln!("delay short time").ok();
- cx.shared.syst.lock(|syst| timer_delay(syst, 1000000)).await;
- hprintln!("foo task resumed").ok();
- debug::exit(debug::EXIT_SUCCESS);
- }
- }
-
- #[task(shared = [syst])]
- fn foo_poll(mut cx: foo_poll::Context) {
- // BEGIN BOILERPLATE
-
- hprintln!("foo poll trampoline").ok();
- unsafe {
- hprintln!("foo trampoline poll").ok();
- TASK.poll(|| {
- hprintln!("foo poll closure").ok();
- });
-
- match TASK {
- Task::Done(ref r) => {
- hprintln!("foo trampoline done").ok();
- // hprintln!("r = {:?}", mem::transmute::<_, &u32>(r)).ok();
- }
- _ => {
- hprintln!("foo trampoline running").ok();
- }
- }
- }
- // END BOILERPLATE
- }
-
- type F2 = impl Future + 'static;
- static mut TASK2: Task<F2> = Task::new();
-
- #[task(shared = [syst])]
- fn foo2(mut cx: foo2::Context) {
- // BEGIN BOILERPLATE
- fn create(cx: foo2::Context<'static>) -> F2 {
- task(cx)
- }
-
- hprintln!("foo2 trampoline").ok();
- unsafe {
- match TASK2 {
- Task::Idle | Task::Done(_) => {
- hprintln!("foo2 spawn task").ok();
- TASK2.spawn(|| create(mem::transmute(cx)));
- // Per:
- // I think transmute could be removed as in:
- // TASK.spawn(|| create(cx));
- //
- // This could be done if spawn for async tasks would be passed
- // a 'static reference by the generated code.
- //
- // Soundness:
- // Check if lifetime for async context is correct.
- }
- _ => {}
- };
-
- foo2_poll::spawn();
- }
- // END BOILERPLATE
-
- async fn task(mut cx: foo2::Context<'static>) {
- hprintln!("foo2 task").ok();
-
- hprintln!("foo2 delay long time").ok();
- let fut = cx.shared.syst.lock(|syst| timer_delay(syst, 10_000_000));
-
- hprintln!("we have just created the future");
- fut.await; // this calls poll on the timer future
- hprintln!("foo task resumed").ok();
- }
- }
-
- #[task(shared = [syst])]
- fn foo2_poll(mut cx: foo2_poll::Context) {
- // BEGIN BOILERPLATE
-
- hprintln!("foo2 poll trampoline").ok();
- unsafe {
- hprintln!("foo2 trampoline poll").ok();
- TASK2.poll(|| {
- hprintln!("foo2 poll closure").ok();
- });
-
- match TASK2 {
- Task::Done(ref r) => {
- hprintln!("foo2 trampoline done").ok();
- // hprintln!("r = {:?}", mem::transmute::<_, &u32>(r)).ok();
- }
- _ => {
- hprintln!("foo2 trampoline running").ok();
- }
- }
- }
- // END BOILERPLATE
- }
-
- // This the actual RTIC task, binds to systic.
- #[task(binds = SysTick, shared = [syst], priority = 2)]
- fn systic(mut cx: systic::Context) {
- hprintln!("systic interrupt").ok();
- cx.shared.syst.lock(|syst| syst.disable_interrupt());
- crate::app::foo_poll::spawn(); // this should be from a Queue later
- crate::app::foo2_poll::spawn(); // this should be from a Queue later
- }
-}
-
-//=============
-// Waker
-
-static WAKER_VTABLE: RawWakerVTable =
- RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop);
-
-unsafe fn waker_clone(p: *const ()) -> RawWaker {
- RawWaker::new(p, &WAKER_VTABLE)
-}
-
-unsafe fn waker_wake(p: *const ()) {
- let f: fn() = mem::transmute(p);
- f();
-}
-
-unsafe fn waker_drop(_: *const ()) {
- // nop
-}
-
-//============
-// Task
-
-enum Task<F: Future + 'static> {
- Idle,
- Running(F),
- Done(F::Output),
-}
-
-impl<F: Future + 'static> Task<F> {
- const fn new() -> Self {
- Self::Idle
- }
-
- fn spawn(&mut self, future: impl FnOnce() -> F) {
- *self = Task::Running(future());
- }
-
- unsafe fn poll(&mut self, wake: fn()) {
- match self {
- Task::Idle => {}
- Task::Running(future) => {
- let future = Pin::new_unchecked(future);
- let waker_data: *const () = mem::transmute(wake);
- let waker = Waker::from_raw(RawWaker::new(waker_data, &WAKER_VTABLE));
- let mut cx = Context::from_waker(&waker);
-
- match future.poll(&mut cx) {
- Poll::Ready(r) => *self = Task::Done(r),
- Poll::Pending => {}
- };
- }
- Task::Done(_) => {}
- }
- }
-}
-
-//=============
-// Timer
-// Later we want a proper queue
-
-use heapless;
-pub struct Timer {
- pub done: bool,
- // pub waker_task: Option<fn() -> Result<(), ()>>,
-}
-
-impl Future for Timer {
- type Output = ();
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- if self.done {
- Poll::Ready(())
- } else {
- hprintln!("timer polled");
- cx.waker().wake_by_ref();
- hprintln!("after wake_by_ref");
- self.done = true;
- Poll::Pending
- }
- }
-}
-
-fn timer_delay(syst: &mut cortex_m::peripheral::SYST, t: u32) -> Timer {
- hprintln!("timer_delay {}", t);
-
- syst.set_reload(t);
- syst.enable_counter();
- syst.enable_interrupt();
- Timer {
- done: false,
- // waker_task: Some(app::foo::spawn), // we should add waker field to async task context i RTIC
- }
-}
diff --git a/examples/async2.rs b/examples/async2.rs
deleted file mode 100644
index 2d82ed3e..00000000
--- a/examples/async2.rs
+++ /dev/null
@@ -1,361 +0,0 @@
-#![no_main]
-#![no_std]
-#![feature(type_alias_impl_trait)]
-
-use core::future::Future;
-use core::mem;
-use core::pin::Pin;
-use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
-
-use cortex_m_semihosting::{debug, hprintln};
-use panic_semihosting as _;
-use systick_monotonic::*;
-
-// NOTES:
-//
-// - Async tasks cannot have `#[lock_free]` resources, as they can interleve and each async
-// task can have a mutable reference stored.
-// - Spawning an async task equates to it being polled at least once.
-// - ...
-
-#[rtic::app(device = lm3s6965, dispatchers = [SSI0], peripherals = true)]
-mod app {
- use crate::*;
-
- pub type AppInstant = <Systick<100> as rtic::Monotonic>::Instant;
- pub type AppDuration = <Systick<100> as rtic::Monotonic>::Duration;
-
- #[shared]
- struct Shared {
- s: u32,
- }
-
- #[local]
- struct Local {}
-
- #[monotonic(binds = SysTick, default = true)]
- type MyMono = Systick<100>;
-
- #[init]
- fn init(cx: init::Context) -> (Shared, Local, init::Monotonics) {
- hprintln!("init").unwrap();
- task_executor::spawn().unwrap();
-
- (
- Shared { s: 0 },
- Local {},
- init::Monotonics(Systick::new(cx.core.SYST, 12_000_000)),
- )
- }
-
- #[idle]
- fn idle(_: idle::Context) -> ! {
- // debug::exit(debug::EXIT_SUCCESS);
- loop {
- // hprintln!("idle");
- cortex_m::asm::wfi(); // put the MCU in sleep mode until interrupt occurs
- }
- }
-
- // TODO: This should be the task, that is understood by the `syntax` proc-macro
- // #[task(priority = 2)]
- async fn task(cx: task_executor::Context<'_>) {
- #[allow(unused_imports)]
- use rtic::mutex_prelude::*;
-
- hprintln!("delay long time").ok();
-
- let fut = Delay::spawn(2500.millis());
-
- hprintln!("we have just created the future").ok();
- fut.await;
- hprintln!("long delay done").ok();
-
- hprintln!("delay short time").ok();
- sleep(500.millis()).await;
- hprintln!("short delay done").ok();
-
- hprintln!("test timeout").ok();
- let res = timeout(NeverEndingFuture {}, 1.secs()).await;
- hprintln!("timeout done: {:?}", res).ok();
-
- hprintln!("test timeout 2").ok();
- let res = timeout(Delay::spawn(500.millis()), 1.secs()).await;
- hprintln!("timeout done 2: {:?}", res).ok();
-
- debug::exit(debug::EXIT_SUCCESS);
- }
-
- //////////////////////////////////////////////
- // BEGIN BOILERPLATE
- //////////////////////////////////////////////
- type F = impl Future + 'static;
- static mut TASK: AsyncTaskExecutor<F> = AsyncTaskExecutor::new();
-
- // TODO: This should be a special case codegen for the `dispatcher`, which runs
- // in the dispatcher. Not as its own task, this is just to make it work
- // in this example.
- #[task(shared = [s])]
- fn task_executor(cx: task_executor::Context) {
- let task_storage = unsafe { &mut TASK };
- match task_storage {
- AsyncTaskExecutor::Idle => {
- // TODO: The context generated for async tasks need 'static lifetime,
- // use `mem::transmute` for now until codegen is fixed
- //
- // TODO: Check if there is some way to not need 'static lifetime
- hprintln!(" task_executor spawn").ok();
- task_storage.spawn(|| task(unsafe { mem::transmute(cx) }));
- task_executor::spawn().ok();
- }
- _ => {
- hprintln!(" task_executor run").ok();
- task_storage.poll(|| {
- task_executor::spawn().ok();
- });
- }
- };
- }
- //////////////////////////////////////////////
- // END BOILERPLATE
- //////////////////////////////////////////////
-
- // TODO: This is generated by the `delay` impl, it needs a capacity equal or grater
- // than the number of async tasks in the system. Should more likely be a part
- // of the monotonic codegen, not its own task.
- #[task(capacity = 12)]
- fn delay_handler(_: delay_handler::Context, waker: Waker) {
- waker.wake();
- }
-}
-
-//=============
-// Waker
-
-static WAKER_VTABLE: RawWakerVTable =
- RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop);
-
-unsafe fn waker_clone(p: *const ()) -> RawWaker {
- RawWaker::new(p, &WAKER_VTABLE)
-}
-
-unsafe fn waker_wake(p: *const ()) {
- // The only thing we need from a waker is the function to call to pend the async
- // dispatcher.
- let f: fn() = mem::transmute(p);
- f();
-}
-
-unsafe fn waker_drop(_: *const ()) {
- // nop
-}
-
-//============
-// AsyncTaskExecutor
-
-enum AsyncTaskExecutor<F: Future + 'static> {
- Idle,
- Running(F),
-}
-
-impl<F: Future + 'static> AsyncTaskExecutor<F> {
- const fn new() -> Self {
- Self::Idle
- }
-
- fn spawn(&mut self, future: impl FnOnce() -> F) {
- *self = AsyncTaskExecutor::Running(future());
- }
-
- fn poll(&mut self, wake: fn()) {
- match self {
- AsyncTaskExecutor::Idle => {}
- AsyncTaskExecutor::Running(future) => unsafe {
- let waker_data: *const () = mem::transmute(wake);
- let waker = Waker::from_raw(RawWaker::new(waker_data, &WAKER_VTABLE));
- let mut cx = Context::from_waker(&waker);
- let future = Pin::new_unchecked(future);
-
- match future.poll(&mut cx) {
- Poll::Ready(_) => {
- *self = AsyncTaskExecutor::Idle;
- hprintln!(" task_executor idle").ok();
- }
- Poll::Pending => {}
- };
- },
- }
- }
-}
-
-//=============
-// Delay
-
-pub struct Delay {
- until: crate::app::AppInstant,
-}
-
-impl Delay {
- pub fn spawn(duration: crate::app::AppDuration) -> Self {
- let until = crate::app::monotonics::now() + duration;
-
- Delay { until }
- }
-}
-
-#[inline(always)]
-pub fn sleep(duration: crate::app::AppDuration) -> Delay {
- Delay::spawn(duration)
-}
-
-impl Future for Delay {
- type Output = ();
-
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let s = self.as_mut();
- let now = crate::app::monotonics::now();
-
- hprintln!(" poll Delay").ok();
-
- if now >= s.until {
- Poll::Ready(())
- } else {
- let waker = cx.waker().clone();
- crate::app::delay_handler::spawn_after(s.until - now, waker).ok();
-
- Poll::Pending
- }
- }
-}
-
-//=============
-// Timeout future
-
-#[derive(Copy, Clone, Debug)]
-pub struct TimeoutError;
-
-pub struct Timeout<F: Future> {
- future: F,
- until: crate::app::AppInstant,
- cancel_handle: Option<crate::app::delay_handler::SpawnHandle>,
-}
-
-impl<F> Timeout<F>
-where
- F: Future,
-{
- pub fn timeout(future: F, duration: crate::app::AppDuration) -> Self {
- let until = crate::app::monotonics::now() + duration;
- Self {
- future,
- until,
- cancel_handle: None,
- }
- }
-}
-
-#[inline(always)]
-pub fn timeout<F: Future>(future: F, duration: crate::app::AppDuration) -> Timeout<F> {
- Timeout::timeout(future, duration)
-}
-
-impl<F> Future for Timeout<F>
-where
- F: Future,
-{
- type Output = Result<F::Output, TimeoutError>;
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let now = crate::app::monotonics::now();
-
- // SAFETY: We don't move the underlying pinned value.
- let mut s = unsafe { self.get_unchecked_mut() };
- let future = unsafe { Pin::new_unchecked(&mut s.future) };
-
- hprintln!(" poll Timeout").ok();
-
- match future.poll(cx) {
- Poll::Ready(r) => {
- if let Some(ch) = s.cancel_handle.take() {
- ch.cancel().ok();
- }
-
- Poll::Ready(Ok(r))
- }
- Poll::Pending => {
- if now >= s.until {
- Poll::Ready(Err(TimeoutError))
- } else if s.cancel_handle.is_none() {
- let waker = cx.waker().clone();
- let sh = crate::app::delay_handler::spawn_after(s.until - now, waker)
- .expect("Internal RTIC bug, this should never fail");
- s.cancel_handle = Some(sh);
-
- Poll::Pending
- } else {
- Poll::Pending
- }
- }
- }
- }
-}
-
-pub struct NeverEndingFuture {}
-
-impl Future for NeverEndingFuture {
- type Output = ();
-
- fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
- // Never finish
- hprintln!(" polling NeverEndingFuture").ok();
- Poll::Pending
- }
-}
-
-//=============
-// Async SPI driver
-
-// #[task]
-async fn test_spi(async_spi_driver: &mut AsyncSpi) {
- let transfer = Transaction {
- buf: [0; 16],
- n_write: 1,
- n_read: 5,
- };
-
- let ret = async_spi_driver.transfer(transfer).await;
-
- // do_something(ret);
-}
-
-/// A DMA transaction.
-///
-/// NOTE: Don't leak this `Future`, if you do there is immediate UB!
-struct Transaction {
- pub buf: [u8; 16],
- pub n_write: usize,
- pub n_read: usize,
-}
-
-struct AsyncSpi {
- transaction: Option<Transaction>,
- queue: heapless::spsc::Queue<Waker, 8>,
-}
-
-impl AsyncSpi {
- pub fn transfer(&mut self, transfer: Transaction) -> AsyncSpiTransaction {
- todo!()
- }
-}
-
-struct AsyncSpiTransaction {
- // ...
-}
-
-impl Future for AsyncSpiTransaction {
- type Output = ();
-
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- todo!()
- }
-}
diff --git a/macros/src/codegen.rs b/macros/src/codegen.rs
index 01be1d57..d7711b63 100644
--- a/macros/src/codegen.rs
+++ b/macros/src/codegen.rs
@@ -12,6 +12,7 @@ mod init;
mod local_resources;
mod local_resources_struct;
mod module;
+mod monotonic;
mod post_init;
mod pre_init;
mod shared_resources;
@@ -95,6 +96,8 @@ pub fn app(app: &App, analysis: &Analysis, extra: &Extra) -> TokenStream2 {
let (mod_app_software_tasks, root_software_tasks, user_software_tasks) =
software_tasks::codegen(app, analysis, extra);
+ let monotonics = monotonic::codegen(app, analysis, extra);
+
let mod_app_dispatchers = dispatchers::codegen(app, analysis, extra);
let mod_app_timer_queue = timer_queue::codegen(app, analysis, extra);
let user_imports = &app.user_imports;
@@ -102,59 +105,6 @@ pub fn app(app: &App, analysis: &Analysis, extra: &Extra) -> TokenStream2 {
let name = &app.name;
let device = &extra.device;
- let monotonic_parts: Vec<_> = app
- .monotonics
- .iter()
- .map(|(_, monotonic)| {
- let name = &monotonic.ident;
- let name_str = &name.to_string();
- let ident = util::monotonic_ident(name_str);
- let doc = &format!(
- "This module holds the static implementation for `{}::now()`",
- name_str
- );
-
- let default_monotonic = if monotonic.args.default {
- quote!(pub use #name::now;)
- } else {
- quote!()
- };
-
- quote! {
- #default_monotonic
-
- #[doc = #doc]
- #[allow(non_snake_case)]
- pub mod #name {
-
- /// Read the current time from this monotonic
- pub fn now() -> <super::super::#name as rtic::Monotonic>::Instant {
- rtic::export::interrupt::free(|_| {
- use rtic::Monotonic as _;
- if let Some(m) = unsafe{ &mut *super::super::#ident.get_mut() } {
- m.now()
- } else {
- <super::super::#name as rtic::Monotonic>::zero()
- }
- })
- }
- }
- }
- })
- .collect();
-
- let monotonics = if monotonic_parts.is_empty() {
- quote!()
- } else {
- quote!(
- pub use rtic::Monotonic as _;
-
- /// Holds static methods for each monotonic.
- pub mod monotonics {
- #(#monotonic_parts)*
- }
- )
- };
let rt_err = util::rt_err_ident();
quote!(
diff --git a/macros/src/codegen/dispatchers.rs b/macros/src/codegen/dispatchers.rs
index a90a97c7..e6caa781 100644
--- a/macros/src/codegen/dispatchers.rs
+++ b/macros/src/codegen/dispatchers.rs
@@ -5,7 +5,7 @@ use rtic_syntax::ast::App;
use crate::{analyze::Analysis, check::Extra, codegen::util};
/// Generates task dispatchers
-pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStream2> {
+pub fn codegen(app: &App, analysis: &Analysis, extra: &Extra) -> Vec<TokenStream2> {
let mut items = vec![];
let interrupts = &analysis.interrupts;
@@ -64,6 +64,9 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStrea
static #rq: rtic::RacyCell<#rq_ty> = rtic::RacyCell::new(#rq_expr);
));
+ let device = &extra.device;
+ let enum_ = util::interrupt_ident();
+ let interrupt = util::suffixed(&interrupts[&level].0.to_string());
let arms = channel
.tasks
.iter()
@@ -73,37 +76,124 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStrea
let fq = util::fq_ident(name);
let inputs = util::inputs_ident(name);
let (_, tupled, pats, _) = util::regroup_inputs(&task.inputs);
+ let exec_name = util::internal_task_ident(name, "EXEC");
- quote!(
- #(#cfgs)*
- #t::#name => {
- let #tupled =
- (&*#inputs
- .get())
- .get_unchecked(usize::from(index))
- .as_ptr()
- .read();
- (&mut *#fq.get_mut()).split().0.enqueue_unchecked(index);
- let priority = &rtic::export::Priority::new(PRIORITY);
- #name(
- #name::Context::new(priority)
- #(,#pats)*
- )
- }
- )
+ if task.is_async {
+ let executor_run_ident = util::executor_run_ident(name);
+
+ quote!(
+ #(#cfgs)*
+ #t::#name => {
+ if !(&mut *#exec_name.get_mut()).is_running() {
+ let #tupled =
+ (&*#inputs
+ .get())
+ .get_unchecked(usize::from(index))
+ .as_ptr()
+ .read();
+ (&mut *#fq.get_mut()).split().0.enqueue_unchecked(index);
+
+ let priority = &rtic::export::Priority::new(PRIORITY);
+ (&mut *#exec_name.get_mut()).spawn(#name(#name::Context::new(priority), #(,#pats)*));
+ #executor_run_ident.store(true, core::sync::atomic::Ordering::Relaxed);
+ } else {
+ retry_queue.push_unchecked((#t::#name, index));
+ }
+ }
+ )
+ } else {
+ quote!(
+ #(#cfgs)*
+ #t::#name => {
+ let #tupled =
+ (&*#inputs
+ .get())
+ .get_unchecked(usize::from(index))
+ .as_ptr()
+ .read();
+ (&mut *#fq.get_mut()).split().0.enqueue_unchecked(index);
+ let priority = &rtic::export::Priority::new(PRIORITY);
+ #name(
+ #name::Context::new(priority)
+ #(,#pats)*
+ )
+ }
+ )
+ }
})
.collect::<Vec<_>>();
+ for (name, task) in app.software_tasks.iter() {
+ if task.is_async {
+ let type_name = util::internal_task_ident(name, "F");
+ let exec_name = util::internal_task_ident(name, "EXEC");
+
+ stmts.push(quote!(
+ type #type_name = impl core::future::Future + 'static;
+ static #exec_name:
+ rtic::RacyCell<rtic::export::executor::AsyncTaskExecutor<#type_name>> =
+ rtic::RacyCell::new(rtic::export::executor::AsyncTaskExecutor::new());
+ ));
+ }
+ }
+
+ let n_executors: usize = app
+ .software_tasks
+ .iter()
+ .map(|(_, task)| if task.is_async { 1 } else { 0 })
+ .sum();
+
+ // TODO: This `retry_queue` comes from the current design of the dispatcher queue handling.
+ // To remove this we would need to redesign how the dispatcher handles queues, and this can
+ // be done as an optimization later.
+ //
+ // The core issue is that we should only dequeue the ready queue if the exexutor associated
+ // to the task is not running. As it is today this queue is blindly dequeued, see the
+ // `while let Some(...) = (&mut *#rq.get_mut())...` a few lines down. The current "hack" is
+ // to just requeue the executor run if it should not have been dequeued. This needs however
+ // to be done after the ready queue has been exhausted.
+ if n_executors > 0 {
+ stmts.push(quote!(
+ let mut retry_queue: rtic::export::Vec<_, #n_executors> = rtic::export::Vec::new();
+ ));
+ }
+
stmts.push(quote!(
while let Some((task, index)) = (&mut *#rq.get_mut()).split().1.dequeue() {
match task {
#(#arms)*
}
}
+
+ while let Some((task, index)) = retry_queue.pop() {
+ rtic::export::interrupt::free(|_| {
+ (&mut *#rq.get_mut()).enqueue_unchecked((task, index));
+ });
+ }
));
+ for (name, _task) in app.software_tasks.iter().filter_map(|(name, task)| {
+ if task.is_async {
+ Some((name, task))
+ } else {
+ None
+ }
+ }) {
+ let exec_name = util::internal_task_ident(name, "EXEC");
+
+ let executor_run_ident = util::executor_run_ident(name);
+ stmts.push(quote!(
+ if #executor_run_ident.load(core::sync::atomic::Ordering::Relaxed) {
+ #executor_run_ident.store(false, core::sync::atomic::Ordering::Relaxed);
+ (&mut *#exec_name.get_mut()).poll(|| {
+ #executor_run_ident.store(true, core::sync::atomic::Ordering::Release);
+ rtic::pend(#device::#enum_::#interrupt);
+ });
+ }
+ ));
+ }
+
let doc = format!("Interrupt handler to dispatch tasks at priority {}", level);
- let interrupt = util::suffixed(&interrupts[&level].0.to_string());
let attribute = &interrupts[&level].1.attrs;
items.push(quote!(
#[allow(non_snake_case)]
diff --git a/macros/src/codegen/module.rs b/macros/src/codegen/module.rs
index fd8137fa..29f27662 100644
--- a/macros/src/codegen/module.rs
+++ b/macros/src/codegen/module.rs
@@ -54,14 +54,6 @@ pub fn codegen(
Context::Idle | Context::HardwareTask(_) | Context::SoftwareTask(_) => {}
}
- // if ctxt.has_locals(app) {
- // let ident = util::locals_ident(ctxt, app);
- // module_items.push(quote!(
- // #[doc(inline)]
- // pub use super::#ident as Locals;
- // ));
- // }
-
if ctxt.has_local_resources(app) {
let ident = util::local_resources_ident(ctxt, app);
let lt = if local_resources_tick {
@@ -133,6 +125,7 @@ pub fn codegen(
));
module_items.push(quote!(
+ #[doc(inline)]
pub use super::#internal_monotonics_ident as Monotonics;
));
}
@@ -193,6 +186,7 @@ pub fn codegen(
module_items.push(quote!(
#(#cfgs)*
+ #[doc(inline)]
pub use super::#internal_context_name as Context;
));
@@ -225,6 +219,8 @@ pub fn codegen(
#(#cfgs)*
/// Spawns the task directly
+ #[allow(non_snake_case)]
+ #[doc(hidden)]
pub fn #internal_spawn_ident(#(#args,)*) -> Result<(), #ty> {
let input = #tupled;
@@ -239,7 +235,6 @@ pub fn codegen(
rtic::export::interrupt::free(|_| {
(&mut *#rq.get_mut()).enqueue_unchecked((#t::#name, index));
});
-
rtic::pend(#device::#enum_::#interrupt);
Ok(())
@@ -252,6 +247,7 @@ pub fn codegen(
module_items.push(quote!(
#(#cfgs)*
+ #[doc(inline)]
pub use super::#internal_spawn_ident as spawn;
));
@@ -294,15 +290,21 @@ pub fn codegen(
if monotonic.args.default {
module_items.push(quote!(
+ #[doc(inline)]
pub use #m::spawn_after;
+ #[doc(inline)]
pub use #m::spawn_at;
+ #[doc(inline)]
pub use #m::SpawnHandle;
));
}
module_items.push(quote!(
pub mod #m {
+ #[doc(inline)]
pub use super::super::#internal_spawn_after_ident as spawn_after;
+ #[doc(inline)]
pub use super::super::#internal_spawn_at_ident as spawn_at;
+ #[doc(inline)]
pub use super::super::#internal_spawn_handle_ident as SpawnHandle;
}
));
@@ -316,6 +318,7 @@ pub fn codegen(
marker: u32,
}
+ #(#cfgs)*
impl core::fmt::Debug for #internal_spawn_handle_ident {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct(#spawn_handle_string).finish()
@@ -327,7 +330,7 @@ pub fn codegen(
pub fn cancel(self) -> Result<#ty, ()> {
rtic::export::interrupt::free(|_| unsafe {
let tq = &mut *#tq.get_mut();
- if let Some((_task, index)) = tq.cancel_marker(self.marker) {
+ if let Some((_task, index)) = tq.cancel_task_marker(self.marker) {
// Get the message
let msg = (&*#inputs
.get())
@@ -362,11 +365,12 @@ pub fn codegen(
let tq = (&mut *#tq.get_mut());
- tq.update_marker(self.marker, marker, instant, || #pend).map(|_| #name::#m::SpawnHandle { marker })
+ tq.update_task_marker(self.marker, marker, instant, || #pend).map(|_| #name::#m::SpawnHandle { marker })
})
}
}
+
#(#cfgs)*
/// Spawns the task after a set duration relative to the current time
///
@@ -407,10 +411,10 @@ pub fn codegen(
rtic::export::interrupt::free(|_| {
let marker = #tq_marker.get().read();
- let nr = rtic::export::NotReady {
- instant,
- index,
+ let nr = rtic::export::TaskNotReady {
task: #t::#name,
+ index,
+ instant,
marker,
};
@@ -418,7 +422,7 @@ pub fn codegen(
let tq = &mut *#tq.get_mut();
- tq.enqueue_unchecked(
+ tq.enqueue_task_unchecked(
nr,
|| #enable_interrupt,
|| #pend,
diff --git a/macros/src/codegen/monotonic.rs b/macros/src/codegen/monotonic.rs
new file mode 100644
index 00000000..685502ed
--- /dev/null
+++ b/macros/src/codegen/monotonic.rs
@@ -0,0 +1,251 @@
+use proc_macro2::TokenStream as TokenStream2;
+use quote::quote;
+use rtic_syntax::ast::App;
+
+use crate::{analyze::Analysis, check::Extra, codegen::util};
+
+/// Generates monotonic module dispatchers
+pub fn codegen(app: &App, _analysis: &Analysis, _extra: &Extra) -> TokenStream2 {
+ let mut monotonic_parts: Vec<_> = Vec::new();
+
+ let tq_marker = util::timer_queue_marker_ident();
+
+ for (_, monotonic) in &app.monotonics {
+ // let instants = util::monotonic_instants_ident(name, &monotonic.ident);
+ let monotonic_name = monotonic.ident.to_string();
+
+ let tq = util::tq_ident(&monotonic_name);
+ let m = &monotonic.ident;
+ let m_ident = util::monotonic_ident(&monotonic_name);
+ let m_isr = &monotonic.args.binds;
+ let enum_ = util::interrupt_ident();
+ let name_str = &m.to_string();
+ let ident = util::monotonic_ident(name_str);
+ let doc = &format!(
+ "This module holds the static implementation for `{}::now()`",
+ name_str
+ );
+
+ let (enable_interrupt, pend) = if &*m_isr.to_string() == "SysTick" {
+ (
+ quote!(core::mem::transmute::<_, rtic::export::SYST>(()).enable_interrupt()),
+ quote!(rtic::export::SCB::set_pendst()),
+ )
+ } else {
+ let rt_err = util::rt_err_ident();
+ (
+ quote!(rtic::export::NVIC::unmask(#rt_err::#enum_::#m_isr)),
+ quote!(rtic::pend(#rt_err::#enum_::#m_isr)),
+ )
+ };
+
+ let default_monotonic = if monotonic.args.default {
+ quote!(
+ #[doc(inline)]
+ pub use #m::now;
+ #[doc(inline)]
+ pub use #m::delay;
+ #[doc(inline)]
+ pub use #m::timeout_at;
+ #[doc(inline)]
+ pub use #m::timeout_after;
+ )
+ } else {
+ quote!()
+ };
+
+ monotonic_parts.push(quote! {
+ #default_monotonic
+
+ #[doc = #doc]
+ #[allow(non_snake_case)]
+ pub mod #m {
+
+ /// Read the current time from this monotonic
+ pub fn now() -> <super::super::#m as rtic::Monotonic>::Instant {
+ rtic::export::interrupt::free(|_| {
+ use rtic::Monotonic as _;
+ if let Some(m) = unsafe{ &mut *super::super::#ident.get_mut() } {
+ m.now()
+ } else {
+ <super::super::#m as rtic::Monotonic>::zero()
+ }
+ })
+ }
+
+ fn enqueue_waker(
+ instant: <super::super::#m as rtic::Monotonic>::Instant,
+ waker: core::task::Waker
+ ) -> Result<u32, ()> {
+ unsafe {
+ rtic::export::interrupt::free(|_| {
+ let marker = super::super::#tq_marker.get().read();
+ super::super::#tq_marker.get_mut().write(marker.wrapping_add(1));
+
+ let nr = rtic::export::WakerNotReady {
+ waker,
+ instant,
+ marker,
+ };
+
+ let tq = &mut *super::super::#tq.get_mut();
+
+ tq.enqueue_waker(
+ nr,
+ || #enable_interrupt,
+ || #pend,
+ (&mut *super::super::#m_ident.get_mut()).as_mut()).map(|_| marker)
+ })
+ }
+ }
+
+ /// Delay
+ #[inline(always)]
+ #[allow(non_snake_case)]
+ pub fn delay(duration: <super::super::#m as rtic::Monotonic>::Duration)
+ -> DelayFuture {
+ let until = now() + duration;
+ DelayFuture { until, tq_marker: None }
+ }
+
+ /// Delay future.
+ #[allow(non_snake_case)]
+ #[allow(non_camel_case_types)]
+ pub struct DelayFuture {
+ until: <super::super::#m as rtic::Monotonic>::Instant,
+ tq_marker: Option<u32>,
+ }
+
+ impl core::future::Future for DelayFuture {
+ type Output = Result<(), ()>;
+
+ fn poll(
+ mut self: core::pin::Pin<&mut Self>,
+ cx: &mut core::task::Context<'_>
+ ) -> core::task::Poll<Self::Output> {
+ let mut s = self.as_mut();
+ let now = now();
+
+ if now >= s.until {
+ core::task::Poll::Ready(Ok(()))
+ } else {
+ if s.tq_marker.is_some() {
+ core::task::Poll::Pending
+ } else {
+ match enqueue_waker(s.until, cx.waker().clone()) {
+ Ok(marker) => {
+ s.tq_marker = Some(marker);
+ core::task::Poll::Pending
+ },
+ Err(()) => core::task::Poll::Ready(Err(())),
+ }
+ }
+ }
+ }
+ }
+
+ /// Timeout future.
+ #[allow(non_snake_case)]
+ #[allow(non_camel_case_types)]
+ pub struct TimeoutFuture<F: core::future::Future> {
+ future: F,
+ until: <super::super::#m as rtic::Monotonic>::Instant,
+ tq_marker: Option<u32>,
+ }
+
+ /// Timeout after
+ #[allow(non_snake_case)]
+ #[inline(always)]
+ pub fn timeout_after<F: core::future::Future>(
+ future: F,
+ duration: <super::super::#m as rtic::Monotonic>::Duration
+ ) -> TimeoutFuture<F> {
+ let until = now() + duration;
+ TimeoutFuture {
+ future,
+ until,
+ tq_marker: None,
+ }
+ }
+
+ /// Timeout at
+ #[allow(non_snake_case)]
+ #[inline(always)]
+ pub fn timeout_at<F: core::future::Future>(
+ future: F,
+ instant: <super::super::#m as rtic::Monotonic>::Instant
+ ) -> TimeoutFuture<F> {
+ TimeoutFuture {
+ future,
+ until: instant,
+ tq_marker: None,
+ }
+ }
+
+ impl<F> core::future::Future for TimeoutFuture<F>
+ where
+ F: core::future::Future,
+ {
+ type Output = Result<Result<F::Output, super::TimeoutError>, ()>;
+
+ fn poll(
+ self: core::pin::Pin<&mut Self>,
+ cx: &mut core::task::Context<'_>
+ ) -> core::task::Poll<Self::Output> {
+ let now = now();
+
+ // SAFETY: We don't move the underlying pinned value.
+ let mut s = unsafe { self.get_unchecked_mut() };
+ let future = unsafe { core::pin::Pin::new_unchecked(&mut s.future) };
+
+ match future.poll(cx) {
+ core::task::Poll::Ready(r) => {
+ if let Some(marker) = s.tq_marker {
+ rtic::export::interrupt::free(|_| unsafe {
+ let tq = &mut *super::super::#tq.get_mut();
+ tq.cancel_waker_marker(marker);
+ });
+ }
+
+ core::task::Poll::Ready(Ok(Ok(r)))
+ }
+ core::task::Poll::Pending => {
+ if now >= s.until {
+ // Timeout
+ core::task::Poll::Ready(Ok(Err(super::TimeoutError)))
+ } else if s.tq_marker.is_none() {
+ match enqueue_waker(s.until, cx.waker().clone()) {
+ Ok(marker) => {
+ s.tq_marker = Some(marker);
+ core::task::Poll::Pending
+ },
+ Err(()) => core::task::Poll::Ready(Err(())), // TQ full
+ }
+ } else {
+ core::task::Poll::Pending
+ }
+ }
+ }
+ }
+ }
+ }
+ });
+ }
+
+ if monotonic_parts.is_empty() {
+ quote!()
+ } else {
+ quote!(
+ pub use rtic::Monotonic as _;
+
+ /// Holds static methods for each monotonic.
+ pub mod monotonics {
+ /// A timeout error.
+ #[derive(Debug)]
+ pub struct TimeoutError;
+
+ #(#monotonic_parts)*
+ }
+ )
+ }
+}
diff --git a/macros/src/codegen/software_tasks.rs b/macros/src/codegen/software_tasks.rs
index 78f6c961..6d08a221 100644
--- a/macros/src/codegen/software_tasks.rs
+++ b/macros/src/codegen/software_tasks.rs
@@ -27,13 +27,8 @@ pub fn codegen(
let mut root = vec![];
let mut user_tasks = vec![];
- // Async tasks
- for (name, task) in app.software_tasks.iter().filter(|(_, task)| task.is_async) {
- // todo
- }
-
- // Non-async tasks
- for (name, task) in app.software_tasks.iter().filter(|(_, task)| !task.is_async) {
+ // Any task
+ for (name, task) in app.software_tasks.iter() {
let inputs = &task.inputs;
let (_, _, _, input_ty) = util::regroup_inputs(inputs);
@@ -87,6 +82,7 @@ pub fn codegen(
let uninit = mk_uninit();
let inputs_ident = util::inputs_ident(name);
+
mod_app.push(quote!(
#uninit
// /// Buffer that holds the inputs of a task
@@ -96,6 +92,18 @@ pub fn codegen(
static #inputs_ident: rtic::RacyCell<[core::mem::MaybeUninit<#input_ty>; #cap_lit]> =
rtic::RacyCell::new([#(#elems,)*]);
));
+ if task.is_async {
+ let executor_ident = util::executor_run_ident(name);
+ mod_app.push(quote!(
+ #[allow(non_camel_case_types)]
+ #[allow(non_upper_case_globals)]
+ #[doc(hidden)]
+ static #executor_ident: core::sync::atomic::AtomicBool =
+ core::sync::atomic::AtomicBool::new(false);
+ ));
+ }
+
+ let inputs = &task.inputs;
// `${task}Resources`
let mut shared_needs_lt = false;
@@ -131,11 +139,17 @@ pub fn codegen(
let attrs = &task.attrs;
let cfgs = &task.cfgs;
let stmts = &task.stmts;
+ let async_marker = if task.is_async {
+ quote!(async)
+ } else {
+ quote!()
+ };
+
user_tasks.push(quote!(
#(#attrs)*
#(#cfgs)*
#[allow(non_snake_case)]
- fn #name(#context: #name::Context #(,#inputs)*) {
+ #async_marker fn #name(#context: #name::Context #(,#inputs)*) {
use rtic::Mutex as _;
use rtic::mutex::prelude::*;
diff --git a/macros/src/codegen/timer_queue.rs b/macros/src/codegen/timer_queue.rs
index 32e288c5..513f78af 100644
--- a/macros/src/codegen/timer_queue.rs
+++ b/macros/src/codegen/timer_queue.rs
@@ -1,9 +1,8 @@
+use crate::{analyze::Analysis, check::Extra, codegen::util};
use proc_macro2::TokenStream as TokenStream2;
use quote::quote;
use rtic_syntax::ast::App;
-use crate::{analyze::Analysis, check::Extra, codegen::util};
-
/// Generates timer queues and timer queue handlers
#[allow(clippy::too_many_lines)]
pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStream2> {
@@ -67,8 +66,14 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStrea
.iter()
.map(|(_name, task)| task.args.capacity as usize)
.sum();
- let n = util::capacity_literal(cap);
- let tq_ty = quote!(rtic::export::TimerQueue<#mono_type, #t, #n>);
+ let n_task = util::capacity_literal(cap);
+ let n_worker: usize = app
+ .software_tasks
+ .iter()
+ .map(|(_name, task)| task.is_async as usize)
+ .sum();
+ let n_worker = util::capacity_literal(n_worker);
+ let tq_ty = quote!(rtic::export::TimerQueue<#mono_type, #t, #n_task, #n_worker>);
// For future use
// let doc = format!(" RTIC internal: {}:{}", file!(), line!());
@@ -76,8 +81,12 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStrea
#[doc(hidden)]
#[allow(non_camel_case_types)]
#[allow(non_upper_case_globals)]
- static #tq: rtic::RacyCell<#tq_ty> =
- rtic::RacyCell::new(rtic::export::TimerQueue(rtic::export::SortedLinkedList::new_u16()));
+ static #tq: rtic::RacyCell<#tq_ty> = rtic::RacyCell::new(
+ rtic::export::TimerQueue {
+ task_queue: rtic::export::SortedLinkedList::new_u16(),
+ waker_queue: rtic::export::SortedLinkedList::new_u16(),
+ }
+ );
));
let mono = util::monotonic_ident(&monotonic_name);
@@ -118,7 +127,9 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStrea
quote!(
#(#cfgs)*
#t::#name => {
- rtic::export::interrupt::free(|_| (&mut *#rq.get_mut()).split().0.enqueue_unchecked((#rqt::#name, index)));
+ rtic::export::interrupt::free(|_|
+ (&mut *#rq.get_mut()).split().0.enqueue_unchecked((#rqt::#name, index))
+ );
#pend
}
@@ -137,7 +148,7 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStrea
#[no_mangle]
#[allow(non_snake_case)]
unsafe fn #bound_interrupt() {
- while let Some((task, index)) = rtic::export::interrupt::free(|_|
+ while let Some(task_or_waker) = rtic::export::interrupt::free(|_|
if let Some(mono) = (&mut *#m_ident.get_mut()).as_mut() {
(&mut *#tq.get_mut()).dequeue(|| #disable_isr, mono)
} else {
@@ -146,8 +157,13 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStrea
core::hint::unreachable_unchecked()
})
{
- match task {
- #(#arms)*
+ match task_or_waker {
+ rtic::export::TaskOrWaker::Waker(waker) => waker.wake(),
+ rtic::export::TaskOrWaker::Task((task, index)) => {
+ match task {
+ #(#arms)*
+ }
+ }
}
}
diff --git a/macros/src/codegen/util.rs b/macros/src/codegen/util.rs
index 0a3edc20..a0a59090 100644
--- a/macros/src/codegen/util.rs
+++ b/macros/src/codegen/util.rs
@@ -67,6 +67,11 @@ pub fn inputs_ident(task: &Ident) -> Ident {
mark_internal_name(&format!("{}_INPUTS", task))
}
+/// Generates an identifier for the `EXECUTOR_RUN` atomics (`async` API)
+pub fn executor_run_ident(task: &Ident) -> Ident {
+ mark_internal_name(&format!("{}_EXECUTOR_RUN", task))
+}
+
/// Generates an identifier for the `INSTANTS` buffer (`schedule` API)
pub fn monotonic_instants_ident(task: &Ident, monotonic: &Ident) -> Ident {
mark_internal_name(&format!("{}_{}_INSTANTS", task, monotonic))
@@ -222,7 +227,7 @@ pub fn rq_ident(priority: u8) -> Ident {
/// Generates an identifier for the `enum` of `schedule`-able tasks
pub fn schedule_t_ident() -> Ident {
- Ident::new("SCHED_T", Span::call_site())
+ mark_internal_name("SCHED_T")
}
/// Generates an identifier for the `enum` of `spawn`-able tasks
@@ -230,7 +235,7 @@ pub fn schedule_t_ident() -> Ident {
/// This identifier needs the same structure as the `RQ` identifier because there's one ready queue
/// for each of these `T` enums
pub fn spawn_t_ident(priority: u8) -> Ident {
- Ident::new(&format!("P{}_T", priority), Span::call_site())
+ mark_internal_name(&format!("P{}_T", priority))
}
/// Suffixed identifier
diff --git a/src/export.rs b/src/export.rs
index 6f2a1b63..703c0e04 100644
--- a/src/export.rs
+++ b/src/export.rs
@@ -4,7 +4,7 @@ use core::{
sync::atomic::{AtomicBool, Ordering},
};
-pub use crate::tq::{NotReady, TimerQueue};
+pub use crate::tq::{TaskNotReady, TaskOrWaker, TimerQueue, WakerNotReady};
pub use bare_metal::CriticalSection;
pub use cortex_m::{
asm::nop,
@@ -16,8 +16,74 @@ pub use cortex_m::{
pub use heapless::sorted_linked_list::SortedLinkedList;
pub use heapless::spsc::Queue;
pub use heapless::BinaryHeap;
+pub use heapless::Vec;
pub use rtic_monotonic as monotonic;
+pub mod executor {
+ use core::{
+ future::Future,
+ mem,
+ pin::Pin,
+ task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
+ };
+
+ static WAKER_VTABLE: RawWakerVTable =
+ RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop);
+
+ unsafe fn waker_clone(p: *const ()) -> RawWaker {
+ RawWaker::new(p, &WAKER_VTABLE)
+ }
+
+ unsafe fn waker_wake(p: *const ()) {
+ // The only thing we need from a waker is the function to call to pend the async
+ // dispatcher.
+ let f: fn() = mem::transmute(p);
+ f();
+ }
+
+ unsafe fn waker_drop(_: *const ()) {
+ // nop
+ }
+
+ //============
+ // AsyncTaskExecutor
+
+ pub struct AsyncTaskExecutor<F: Future + 'static> {
+ task: Option<F>,
+ }
+
+ impl<F: Future + 'static> AsyncTaskExecutor<F> {
+ pub const fn new() -> Self {
+ Self { task: None }
+ }
+
+ pub fn is_running(&self) -> bool {
+ self.task.is_some()
+ }
+
+ pub fn spawn(&mut self, future: F) {
+ self.task = Some(future);
+ }
+
+ pub fn poll(&mut self, wake: fn()) {
+ if let Some(future) = &mut self.task {
+ unsafe {
+ let waker = Waker::from_raw(RawWaker::new(wake as *const (), &WAKER_VTABLE));
+ let mut cx = Context::from_waker(&waker);
+ let future = Pin::new_unchecked(future);
+
+ match future.poll(&mut cx) {
+ Poll::Ready(_) => {
+ self.task = None;
+ }
+ Poll::Pending => {}
+ };
+ }
+ }
+ }
+ }
+}
+
pub type SCFQ<const N: usize> = Queue<u8, N>;
pub type SCRQ<T, const N: usize> = Queue<(T, u8), N>;
diff --git a/src/tq.rs b/src/tq.rs
index 0f585ba4..90542e73 100644
--- a/src/tq.rs
+++ b/src/tq.rs
@@ -1,29 +1,25 @@
use crate::Monotonic;
use core::cmp::Ordering;
+use core::task::Waker;
use heapless::sorted_linked_list::{LinkedIndexU16, Min, SortedLinkedList};
-pub struct TimerQueue<Mono, Task, const N: usize>(
- pub SortedLinkedList<NotReady<Mono, Task>, LinkedIndexU16, Min, N>,
-)
+pub struct TimerQueue<Mono, Task, const N_TASK: usize, const N_WAKER: usize>
where
Mono: Monotonic,
- Task: Copy;
+ Task: Copy,
+{
+ pub task_queue: SortedLinkedList<TaskNotReady<Mono, Task>, LinkedIndexU16, Min, N_TASK>,
+ pub waker_queue: SortedLinkedList<WakerNotReady<Mono>, LinkedIndexU16, Min, N_WAKER>,
+}
-impl<Mono, Task, const N: usize> TimerQueue<Mono, Task, N>
+impl<Mono, Task, const N_TASK: usize, const N_WAKER: usize> TimerQueue<Mono, Task, N_TASK, N_WAKER>
where
Mono: Monotonic,
Task: Copy,
{
- /// # Safety
- ///
- /// Writing to memory with a transmute in order to enable
- /// interrupts of the ``SysTick`` timer
- ///
- /// Enqueue a task without checking if it is full
- #[inline]
- pub unsafe fn enqueue_unchecked<F1, F2>(
- &mut self,
- nr: NotReady<Mono, Task>,
+ fn check_if_enable<F1, F2>(
+ &self,
+ instant: Mono::Instant,
enable_interrupt: F1,
pend_handler: F2,
mono: Option<&mut Mono>,
@@ -33,11 +29,17 @@ where
{
// Check if the top contains a non-empty element and if that element is
// greater than nr
- let if_heap_max_greater_than_nr =
- self.0.peek().map_or(true, |head| nr.instant < head.instant);
+ let if_task_heap_max_greater_than_nr = self
+ .task_queue
+ .peek()
+ .map_or(true, |head| instant < head.instant);
+ let if_waker_heap_max_greater_than_nr = self
+ .waker_queue
+ .peek()
+ .map_or(true, |head| instant < head.instant);
- if if_heap_max_greater_than_nr {
- if Mono::DISABLE_INTERRUPT_ON_EMPTY_QUEUE && self.0.is_empty() {
+ if if_task_heap_max_greater_than_nr || if_waker_heap_max_greater_than_nr {
+ if Mono::DISABLE_INTERRUPT_ON_EMPTY_QUEUE && self.is_empty() {
if let Some(mono) = mono {
mono.enable_timer();
}
@@ -46,19 +48,50 @@ where
pend_handler();
}
+ }
+
+ /// Enqueue a task without checking if it is full
+ #[inline]
+ pub unsafe fn enqueue_task_unchecked<F1, F2>(
+ &mut self,
+ nr: TaskNotReady<Mono, Task>,
+ enable_interrupt: F1,
+ pend_handler: F2,
+ mono: Option<&mut Mono>,
+ ) where
+ F1: FnOnce(),
+ F2: FnOnce(),
+ {
+ self.check_if_enable(nr.instant, enable_interrupt, pend_handler, mono);
+ self.task_queue.push_unchecked(nr);
+ }
- self.0.push_unchecked(nr);
+ /// Enqueue a waker
+ #[inline]
+ pub fn enqueue_waker<F1, F2>(
+ &mut self,
+ nr: WakerNotReady<Mono>,
+ enable_interrupt: F1,
+ pend_handler: F2,
+ mono: Option<&mut Mono>,
+ ) -> Result<(), ()>
+ where
+ F1: FnOnce(),
+ F2: FnOnce(),
+ {
+ self.check_if_enable(nr.instant, enable_interrupt, pend_handler, mono);
+ self.waker_queue.push(nr).map_err(|_| ())
}
- /// Check if the timer queue is empty.
+ /// Check if all the timer queue is empty.
#[inline]
pub fn is_empty(&self) -> bool {
- self.0.is_empty()
+ self.task_queue.is_empty() && self.waker_queue.is_empty()
}
- /// Cancel the marker value
- pub fn cancel_marker(&mut self, marker: u32) -> Option<(Task, u8)> {
- if let Some(val) = self.0.find_mut(|nr| nr.marker == marker) {
+ /// Cancel the marker value for a task
+ pub fn cancel_task_marker(&mut self, marker: u32) -> Option<(Task, u8)> {
+ if let Some(val) = self.task_queue.find_mut(|nr| nr.marker == marker) {
let nr = val.pop();
Some((nr.task, nr.index))
@@ -67,16 +100,23 @@ where
}
}
- /// Update the instant at an marker value to a new instant
+ /// Cancel the marker value for a waker
+ pub fn cancel_waker_marker(&mut self, marker: u32) {
+ if let Some(val) = self.waker_queue.find_mut(|nr| nr.marker == marker) {
+ let _ = val.pop();
+ }
+ }
+
+ /// Update the instant at an marker value for a task to a new instant
#[allow(clippy::result_unit_err)]
- pub fn update_marker<F: FnOnce()>(
+ pub fn update_task_marker<F: FnOnce()>(
&mut self,
marker: u32,
new_marker: u32,
instant: Mono::Instant,
pend_handler: F,
) -> Result<(), ()> {
- if let Some(mut val) = self.0.find_mut(|nr| nr.marker == marker) {
+ if let Some(mut val) = self.task_queue.find_mut(|nr| nr.marker == marker) {
val.instant = instant;
val.marker = new_marker;
@@ -89,66 +129,134 @@ where
}
}
+ fn dequeue_task_queue(
+ &mut self,
+ instant: Mono::Instant,
+ mono: &mut Mono,
+ ) -> Option<TaskOrWaker<Task>> {
+ let now = mono.now();
+ if instant <= now {
+ // task became ready
+ let nr = unsafe { self.task_queue.pop_unchecked() };
+ Some(TaskOrWaker::Task((nr.task, nr.index)))
+ } else {
+ // Set compare
+ mono.set_compare(instant);
+
+ // Double check that the instant we set is really in the future, else
+ // dequeue. If the monotonic is fast enough it can happen that from the
+ // read of now to the set of the compare, the time can overflow. This is to
+ // guard against this.
+ if instant <= now {
+ let nr = unsafe { self.task_queue.pop_unchecked() };
+ Some(TaskOrWaker::Task((nr.task, nr.index)))
+ } else {
+ None
+ }
+ }
+ }
+
+ fn dequeue_waker_queue(
+ &mut self,
+ instant: Mono::Instant,
+ mono: &mut Mono,
+ ) -> Option<TaskOrWaker<Task>> {
+ let now = mono.now();
+ if instant <= now {
+ // task became ready
+ let nr = unsafe { self.waker_queue.pop_unchecked() };
+ Some(TaskOrWaker::Waker(nr.waker))
+ } else {
+ // Set compare
+ mono.set_compare(instant);
+
+ // Double check that the instant we set is really in the future, else
+ // dequeue. If the monotonic is fast enough it can happen that from the
+ // read of now to the set of the compare, the time can overflow. This is to
+ // guard against this.
+ if instant <= now {
+ let nr = unsafe { self.waker_queue.pop_unchecked() };
+ Some(TaskOrWaker::Waker(nr.waker))
+ } else {
+ None
+ }
+ }
+ }
+
/// Dequeue a task from the ``TimerQueue``
- pub fn dequeue<F>(&mut self, disable_interrupt: F, mono: &mut Mono) -> Option<(Task, u8)>
+ pub fn dequeue<F>(&mut self, disable_interrupt: F, mono: &mut Mono) -> Option<TaskOrWaker<Task>>
where
F: FnOnce(),
{
mono.clear_compare_flag();
- if let Some(instant) = self.0.peek().map(|p| p.instant) {
- if instant <= mono.now() {
- // task became ready
- let nr = unsafe { self.0.pop_unchecked() };
-
- Some((nr.task, nr.index))
- } else {
- // Set compare
- mono.set_compare(instant);
+ let tq = self.task_queue.peek().map(|p| p.instant);
+ let wq = self.waker_queue.peek().map(|p| p.instant);
- // Double check that the instant we set is really in the future, else
- // dequeue. If the monotonic is fast enough it can happen that from the
- // read of now to the set of the compare, the time can overflow. This is to
- // guard against this.
- if instant <= mono.now() {
- let nr = unsafe { self.0.pop_unchecked() };
+ let dequeue_task;
+ let instant;
- Some((nr.task, nr.index))
+ match (tq, wq) {
+ (Some(tq_instant), Some(wq_instant)) => {
+ if tq_instant <= wq_instant {
+ dequeue_task = true;
+ instant = tq_instant;
} else {
- None
+ dequeue_task = false;
+ instant = wq_instant;
}
}
- } else {
- // The queue is empty, disable the interrupt.
- if Mono::DISABLE_INTERRUPT_ON_EMPTY_QUEUE {
- disable_interrupt();
- mono.disable_timer();
+ (Some(tq_instant), None) => {
+ dequeue_task = true;
+ instant = tq_instant;
+ }
+ (None, Some(wq_instant)) => {
+ dequeue_task = false;
+ instant = wq_instant;
}
+ (None, None) => {
+ // The queue is empty, disable the interrupt.
+ if Mono::DISABLE_INTERRUPT_ON_EMPTY_QUEUE {
+ disable_interrupt();
+ mono.disable_timer();
+ }
- None
+ return None;
+ }
+ }
+
+ if dequeue_task {
+ self.dequeue_task_queue(instant, mono)
+ } else {
+ self.dequeue_waker_queue(instant, mono)
}
}
}
-pub struct NotReady<Mono, Task>
+pub enum TaskOrWaker<Task> {
+ Task((Task, u8)),
+ Waker(Waker),
+}
+
+pub struct TaskNotReady<Mono, Task>
where
Task: Copy,
Mono: Monotonic,
{
+ pub task: Task,
pub index: u8,
pub instant: Mono::Instant,
- pub task: Task,
pub marker: u32,
}
-impl<Mono, Task> Eq for NotReady<Mono, Task>
+impl<Mono, Task> Eq for TaskNotReady<Mono, Task>
where
Task: Copy,
Mono: Monotonic,
{
}
-impl<Mono, Task> Ord for NotReady<Mono, Task>
+impl<Mono, Task> Ord for TaskNotReady<Mono, Task>
where
Task: Copy,
Mono: Monotonic,
@@ -158,7 +266,7 @@ where
}
}
-impl<Mono, Task> PartialEq for NotReady<Mono, Task>
+impl<Mono, Task> PartialEq for TaskNotReady<Mono, Task>
where
Task: Copy,
Mono: Monotonic,
@@ -168,7 +276,7 @@ where
}
}
-impl<Mono, Task> PartialOrd for NotReady<Mono, Task>
+impl<Mono, Task> PartialOrd for TaskNotReady<Mono, Task>
where
Task: Copy,
Mono: Monotonic,
@@ -177,3 +285,41 @@ where
Some(self.cmp(other))
}
}
+
+pub struct WakerNotReady<Mono>
+where
+ Mono: Monotonic,
+{
+ pub waker: Waker,
+ pub instant: Mono::Instant,
+ pub marker: u32,
+}
+
+impl<Mono> Eq for WakerNotReady<Mono> where Mono: Monotonic {}
+
+impl<Mono> Ord for WakerNotReady<Mono>
+where
+ Mono: Monotonic,
+{
+ fn cmp(&self, other: &Self) -> Ordering {
+ self.instant.cmp(&other.instant)
+ }
+}
+
+impl<Mono> PartialEq for WakerNotReady<Mono>
+where
+ Mono: Monotonic,
+{
+ fn eq(&self, other: &Self) -> bool {
+ self.instant == other.instant
+ }
+}
+
+impl<Mono> PartialOrd for WakerNotReady<Mono>
+where
+ Mono: Monotonic,
+{
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ Some(self.cmp(other))
+ }
+}