1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
use super::atomic::{AtomicBool, Ordering};
use core::{
cell::UnsafeCell,
future::Future,
mem::{self, MaybeUninit},
pin::Pin,
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
};
/// Vtable shared by every waker handed to a polled task.
///
/// The waker's data pointer is a plain `fn()` (see `waker_wake`), so waking by
/// value and waking by reference are the same operation and share one entry.
static WAKER_VTABLE: RawWakerVTable = {
    let clone_fn: unsafe fn(*const ()) -> RawWaker = waker_clone;
    let wake_fn: unsafe fn(*const ()) = waker_wake;
    let drop_fn: unsafe fn(*const ()) = waker_drop;
    // Slot order: clone, wake, wake_by_ref, drop.
    RawWakerVTable::new(clone_fn, wake_fn, wake_fn, drop_fn)
};
/// Clone entry of the waker vtable.
///
/// The data pointer carries no owned state, so a clone is simply the same
/// pointer paired with the same vtable.
unsafe fn waker_clone(data: *const ()) -> RawWaker {
    let vtable: &'static RawWakerVTable = &WAKER_VTABLE;
    RawWaker::new(data, vtable)
}
/// Wake entry of the waker vtable (used for both wake and wake-by-reference).
///
/// # Safety
///
/// `p` must have been obtained by casting a valid `fn()` to `*const ()`.
unsafe fn waker_wake(p: *const ()) {
    // A waker only has to carry the function that pends the async dispatcher,
    // so the data pointer *is* that function.
    // SAFETY: the caller guarantees `p` was created from a `fn()`.
    let pend_dispatcher = unsafe { mem::transmute::<*const (), fn()>(p) };
    pend_dispatcher();
}
/// Drop entry of the waker vtable.
///
/// Deliberately does nothing: the waker's data is a bare function pointer, so
/// there is no state to release.
unsafe fn waker_drop(_data: *const ()) {}
//============
// AsyncTaskExecutor
/// Executor for an async task.
///
/// Holds storage for a single future of type `F` together with two atomic
/// flags: `running` marks the storage as occupied by a live task, and
/// `pending` records that the task has been woken and should be polled.
pub struct AsyncTaskExecutor<F: Future> {
// Storage for the future. `task` is protected by the `running` flag: it is
// written only by the `spawn` call that flips `running` from false to true,
// and is otherwise accessed only by `poll`.
task: UnsafeCell<MaybeUninit<F>>,
// True from a successful `spawn` until `poll` sees the future complete.
running: AtomicBool,
// Set by `set_pending` (used by wakers) and consumed by `poll`.
pending: AtomicBool,
}
// SAFETY: access to the `UnsafeCell` is coordinated through the `running` and
// `pending` atomics rather than by the type system.
// NOTE(review): this presumably also relies on `poll` never running
// concurrently with itself for the same executor, and there is no `F: Send`
// bound -- confirm against the callers.
unsafe impl<F: Future> Sync for AsyncTaskExecutor<F> {}
impl<F: Future> AsyncTaskExecutor<F> {
    /// Create a new executor.
    ///
    /// The executor starts idle: the task storage is uninitialized and both
    /// the `running` and `pending` flags are cleared.
    #[inline(always)]
    pub const fn new() -> Self {
        Self {
            task: UnsafeCell::new(MaybeUninit::uninit()),
            running: AtomicBool::new(false),
            pending: AtomicBool::new(false),
        }
    }

    /// Check if there is an active task in the executor.
    ///
    /// Returns `true` from the moment [`spawn`](Self::spawn) reserves the
    /// executor until [`poll`](Self::poll) observes the future completing.
    #[inline(always)]
    pub fn is_running(&self) -> bool {
        self.running.load(Ordering::Relaxed)
    }

    /// Checks if a waker has pended the executor and simultaneously clears the flag.
    ///
    /// Returns `true` only when this call flipped `pending` from set to cleared.
    #[inline(always)]
    fn check_and_clear_pending(&self) -> bool {
        // Ordering::Acquire to enforce that update of task is visible to poll
        self.pending
            .compare_exchange(true, false, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
    }

    /// Used by wakers to indicate that the executor needs to run.
    ///
    /// This only sets the `pending` flag; it does not poll the future itself.
    #[inline(always)]
    pub fn set_pending(&self) {
        self.pending.store(true, Ordering::Release);
    }

    /// Spawn a future.
    ///
    /// `future` constructs the task. It is called exactly once, and only if
    /// the executor was idle and could be reserved; the resulting future is
    /// stored and the executor is marked pending so the next
    /// [`poll`](Self::poll) runs it.
    ///
    /// Returns `true` if the task was stored, or `false` if a task is already
    /// running (in which case `future` is never called).
    #[inline(always)]
    pub fn spawn(&self, future: impl FnOnce() -> F) -> bool {
        // Try to reserve the executor for a future.
        let reserved = self
            .running
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok();
        if !reserved {
            return false;
        }

        // SAFETY: this write is protected by `running` having been false and
        // this call being the one that set it to true. `write` does not drop
        // the previous contents, which is correct because `poll` drops a
        // completed future before it clears `running`.
        unsafe {
            self.task.get().write(MaybeUninit::new(future()));
        }
        self.set_pending();
        true
    }

    /// Poll the future in the executor.
    ///
    /// Does nothing unless a task is running and has been marked pending.
    /// Otherwise the pending flag is cleared and the future is polled once
    /// with a waker that calls `wake` when woken. When the future completes,
    /// its output is discarded, the future is dropped in place, and the
    /// executor becomes available to [`spawn`](Self::spawn) again.
    #[inline(always)]
    pub fn poll(&self, wake: fn()) {
        if !(self.is_running() && self.check_and_clear_pending()) {
            return;
        }

        // SAFETY: `WAKER_VTABLE` interprets the data pointer as a `fn()`,
        // which is exactly what `wake` is.
        let waker = unsafe { Waker::from_raw(RawWaker::new(wake as *const (), &WAKER_VTABLE)) };
        let mut cx = Context::from_waker(&waker);

        let slot = self.task.get();
        // SAFETY: `running` is set, so `spawn` has initialized the slot. The
        // future is only ever accessed through this slot and is never moved
        // out of it, so it may be treated as pinned.
        let future = unsafe { Pin::new_unchecked(&mut *slot.cast::<F>()) };

        if future.poll(&mut cx).is_ready() {
            // Run the future's destructor before the slot can be reused: the
            // next `spawn` overwrites the storage without dropping it, and
            // pinned data must be dropped before its memory is repurposed.
            // SAFETY: the slot is initialized (see above), the pinned borrow
            // ended with the `poll` call, and `running` is still set so no
            // `spawn` can be writing to the slot.
            unsafe { (&mut *slot).assume_init_drop() };
            self.running.store(false, Ordering::Release);
        }
    }
}
|