aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--macros/src/codegen/async_dispatchers.rs11
-rw-r--r--macros/src/codegen/module.rs8
-rw-r--r--src/export/executor.rs45
3 files changed, 34 insertions, 30 deletions
diff --git a/macros/src/codegen/async_dispatchers.rs b/macros/src/codegen/async_dispatchers.rs
index 012bd61a..a12ad325 100644
--- a/macros/src/codegen/async_dispatchers.rs
+++ b/macros/src/codegen/async_dispatchers.rs
@@ -44,16 +44,15 @@ pub fn codegen(app: &App, analysis: &Analysis) -> TokenStream2 {
for name in channel.tasks.iter() {
let exec_name = util::internal_task_ident(name, "EXEC");
+ // TODO: Fix cfg
// let task = &app.software_tasks[name];
// let cfgs = &task.cfgs;
stmts.push(quote!(
- if #exec_name.check_and_clear_pending() {
- #exec_name.poll(|| {
- #exec_name.set_pending();
- #pend_interrupt
- });
- }
+ #exec_name.poll(|| {
+ #exec_name.set_pending();
+ #pend_interrupt
+ });
));
}
diff --git a/macros/src/codegen/module.rs b/macros/src/codegen/module.rs
index 666bd042..f4c188a4 100644
--- a/macros/src/codegen/module.rs
+++ b/macros/src/codegen/module.rs
@@ -156,11 +156,8 @@ pub fn codegen(ctxt: Context, app: &App, analysis: &Analysis) -> TokenStream2 {
#[allow(non_snake_case)]
#[doc(hidden)]
pub fn #internal_spawn_ident(#(#input_args,)*) -> Result<(), #input_ty> {
- if #exec_name.try_reserve() {
- // This unsafe is protected by `try_reserve`, see its documentation for details
- unsafe {
- #exec_name.spawn_unchecked(#name(#name::Context::new() #(,#input_untupled)*));
- }
+
+ if #exec_name.spawn(|| #name(unsafe { #name::Context::new() } #(,#input_untupled)*) ) {
#pend_interrupt
@@ -168,6 +165,7 @@ pub fn codegen(ctxt: Context, app: &App, analysis: &Analysis) -> TokenStream2 {
} else {
Err(#input_tupled)
}
+
}
));
diff --git a/src/export/executor.rs b/src/export/executor.rs
index 2f88eff9..e64cc43e 100644
--- a/src/export/executor.rs
+++ b/src/export/executor.rs
@@ -30,7 +30,7 @@ unsafe fn waker_drop(_: *const ()) {
/// Executor for an async task.
pub struct AsyncTaskExecutor<F: Future> {
- // `task` is proteced by the `running` flag.
+ // `task` is protected by the `running` flag.
task: UnsafeCell<MaybeUninit<F>>,
running: AtomicBool,
pending: AtomicBool,
@@ -40,6 +40,7 @@ unsafe impl<F: Future> Sync for AsyncTaskExecutor<F> {}
impl<F: Future> AsyncTaskExecutor<F> {
/// Create a new executor.
+ #[inline(always)]
pub const fn new() -> Self {
Self {
task: UnsafeCell::new(MaybeUninit::uninit()),
@@ -49,45 +50,51 @@ impl<F: Future> AsyncTaskExecutor<F> {
}
/// Check if there is an active task in the executor.
+ #[inline(always)]
pub fn is_running(&self) -> bool {
self.running.load(Ordering::Relaxed)
}
/// Checks if a waker has pended the executor and simultaneously clears the flag.
- pub fn check_and_clear_pending(&self) -> bool {
+ #[inline(always)]
+ fn check_and_clear_pending(&self) -> bool {
+ // Ordering::Acquire to enforce that update of task is visible to poll
self.pending
- .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
+ .compare_exchange(true, false, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
}
// Used by wakers to indicate that the executor needs to run.
+ #[inline(always)]
pub fn set_pending(&self) {
self.pending.store(true, Ordering::Release);
}
- /// Try to reserve the executor for a future.
- /// Used in conjunction with `spawn_unchecked` to reserve the executor before spawning.
- ///
- /// This could have been joined with `spawn_unchecked` for a complete safe API, however the
- /// codegen needs to see if the reserve fails so it can give back input parameters. If spawning
- /// was done within the same call the input parameters would be lost and could not be returned.
- pub fn try_reserve(&self) -> bool {
- self.running
+ /// Spawn a future
+ #[inline(always)]
+ pub fn spawn(&self, future: impl Fn() -> F) -> bool {
+ // Try to reserve the executor for a future.
+ if self
+ .running
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
- }
-
- /// Spawn a future, only valid to do after `try_reserve` succeeds.
- pub unsafe fn spawn_unchecked(&self, future: F) {
- debug_assert!(self.running.load(Ordering::Relaxed));
+ {
+ // This unsafe is protected by `running` being false and the atomic setting it to true.
+ unsafe {
+ self.task.get().write(MaybeUninit::new(future()));
+ }
+ self.set_pending();
- self.task.get().write(MaybeUninit::new(future));
- self.set_pending();
+ true
+ } else {
+ false
+ }
}
/// Poll the future in the executor.
+ #[inline(always)]
pub fn poll(&self, wake: fn()) {
- if self.is_running() {
+ if self.is_running() && self.check_and_clear_pending() {
let waker = unsafe { Waker::from_raw(RawWaker::new(wake as *const (), &WAKER_VTABLE)) };
let mut cx = Context::from_waker(&waker);
let future = unsafe { Pin::new_unchecked(&mut *(self.task.get() as *mut F)) };