axtask/wait_queue.rs
use alloc::collections::VecDeque;
use alloc::sync::Arc;
use alloc::vec::Vec;

use kernel_guard::{NoOp, NoPreemptIrqSave};
use kspin::{SpinNoIrq, SpinNoIrqGuard};

use crate::{AxTaskRef, CurrentTask, current_run_queue, select_run_queue};

/// A queue to store sleeping tasks.
///
/// # Examples
///
/// ```
/// use axtask::WaitQueue;
/// use core::sync::atomic::{AtomicU32, Ordering};
///
/// static VALUE: AtomicU32 = AtomicU32::new(0);
/// static WQ: WaitQueue = WaitQueue::new();
///
/// axtask::init_scheduler();
/// // spawn a new task that updates `VALUE` and notifies the main task
/// axtask::spawn(|| {
///     assert_eq!(VALUE.load(Ordering::Relaxed), 0);
///     VALUE.fetch_add(1, Ordering::Relaxed);
///     WQ.notify_one(true); // wake up the main task
/// });
///
/// WQ.wait(); // block until `notify_one()` is called
/// assert_eq!(VALUE.load(Ordering::Relaxed), 1);
/// ```
pub struct WaitQueue {
    queue: SpinNoIrq<VecDeque<AxTaskRef>>,
}

pub(crate) type WaitQueueGuard<'a> = SpinNoIrqGuard<'a, VecDeque<AxTaskRef>>;

impl WaitQueue {
    /// Creates an empty wait queue.
    pub const fn new() -> Self {
        Self {
            queue: SpinNoIrq::new(VecDeque::new()),
        }
    }

    /// Creates an empty wait queue with space for at least `capacity` elements.
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            queue: SpinNoIrq::new(VecDeque::with_capacity(capacity)),
        }
    }

    /// Cancels events by removing the task from the wait queue.
    /// If `from_timer_list` is true, try to remove the task from the timer list.
    fn cancel_events(&self, curr: CurrentTask, _from_timer_list: bool) {
        // A task can be woken up by only one event (timer or `notify()`), so
        // remove the event from the other queue.
        if curr.in_wait_queue() {
            // The task was woken up by the timer (timeout).
            self.queue.lock().retain(|t| !curr.ptr_eq(t));
            curr.set_in_wait_queue(false);
        }

        // Try to cancel a timer event in the timer lists by just marking the
        // task's current timer ticket ID as expired.
        #[cfg(feature = "irq")]
        if _from_timer_list {
            curr.timer_ticket_expired();
            // Note: the task is still not removed from the timer list of the
            // target CPU, which may cause some redundant timer events, because
            // the event still has to go through the process of being expired
            // from the timer list and invoking the callback. (This can be seen
            // as a lazy-removal strategy: the expired event is simply ignored
            // when it is about to take effect.)
        }
    }

    /// Blocks the current task and puts it into the wait queue, until another
    /// task notifies it.
    pub fn wait(&self) {
        current_run_queue::<NoPreemptIrqSave>().blocked_resched(self.queue.lock());
        self.cancel_events(crate::current(), false);
    }

    /// Blocks the current task and puts it into the wait queue, until the
    /// given `condition` becomes true.
    ///
    /// Note that even if other tasks notify this task, it will not wake up
    /// until the condition becomes true.
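    ///
    /// # Examples
    ///
    /// A minimal sketch of waiting for a flag, mirroring the queue-level
    /// example above (the `READY` flag and the spawned task are illustrative):
    ///
    /// ```no_run
    /// use axtask::WaitQueue;
    /// use core::sync::atomic::{AtomicBool, Ordering};
    ///
    /// static READY: AtomicBool = AtomicBool::new(false);
    /// static WQ: WaitQueue = WaitQueue::new();
    ///
    /// axtask::init_scheduler();
    /// // Another task sets the flag and then wakes the waiter so that it can
    /// // re-check the condition.
    /// axtask::spawn(|| {
    ///     READY.store(true, Ordering::Release);
    ///     WQ.notify_one(true);
    /// });
    ///
    /// // Sleeps until the closure observes `READY == true`; notifications that
    /// // arrive while the condition is still false just trigger a re-check.
    /// WQ.wait_until(|| READY.load(Ordering::Acquire));
    /// ```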
    pub fn wait_until<F>(&self, condition: F)
    where
        F: Fn() -> bool,
    {
        let curr = crate::current();
        loop {
            let mut rq = current_run_queue::<NoPreemptIrqSave>();
            let wq = self.queue.lock();
            if condition() {
                break;
            }
            rq.blocked_resched(wq);
            // Preemption may occur here.
        }
        self.cancel_events(curr, false);
    }

    /// Blocks the current task and puts it into the wait queue, until other
    /// tasks notify it, or the given duration has elapsed.
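    ///
    /// # Examples
    ///
    /// A sketch of a bounded wait (requires the "irq" feature; the one-second
    /// duration is illustrative):
    ///
    /// ```ignore
    /// use axtask::WaitQueue;
    /// use core::time::Duration;
    ///
    /// static WQ: WaitQueue = WaitQueue::new();
    ///
    /// // Returns `true` if the duration elapsed before any notification.
    /// if WQ.wait_timeout(Duration::from_secs(1)) {
    ///     // Timed out: no task called `notify_one()`/`notify_all()` in time.
    /// }
    /// ```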
    #[cfg(feature = "irq")]
    pub fn wait_timeout(&self, dur: core::time::Duration) -> bool {
        let mut rq = current_run_queue::<NoPreemptIrqSave>();
        let curr = crate::current();
        let deadline = axhal::time::wall_time() + dur;
        debug!(
            "task wait_timeout: {} deadline={:?}",
            curr.id_name(),
            deadline
        );
        crate::timers::set_alarm_wakeup(deadline, curr.clone());

        rq.blocked_resched(self.queue.lock());

        let timeout = curr.in_wait_queue(); // still in the wait queue, must have timed out

        // Always try to remove the task from the timer list.
        self.cancel_events(curr, true);
        timeout
    }

    /// Blocks the current task and puts it into the wait queue, until the
    /// given `condition` becomes true, or the given duration has elapsed.
    ///
    /// Note that even if other tasks notify this task, it will not wake up
    /// until one of the above conditions is met.
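    ///
    /// # Examples
    ///
    /// A sketch of a condition wait with a deadline (requires the "irq"
    /// feature; the `READY` flag and the 10 ms bound are illustrative):
    ///
    /// ```ignore
    /// use axtask::WaitQueue;
    /// use core::sync::atomic::{AtomicBool, Ordering};
    /// use core::time::Duration;
    ///
    /// static READY: AtomicBool = AtomicBool::new(false);
    /// static WQ: WaitQueue = WaitQueue::new();
    ///
    /// // Waits at most 10 ms for `READY` to become true; `timed_out` is
    /// // `true` if the deadline passed while the condition was still false.
    /// let timed_out =
    ///     WQ.wait_timeout_until(Duration::from_millis(10), || READY.load(Ordering::Acquire));
    /// ```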
    #[cfg(feature = "irq")]
    pub fn wait_timeout_until<F>(&self, dur: core::time::Duration, condition: F) -> bool
    where
        F: Fn() -> bool,
    {
        let curr = crate::current();
        let deadline = axhal::time::wall_time() + dur;
        debug!(
            "task wait_timeout_until: {} deadline={:?}",
            curr.id_name(),
            deadline
        );
        crate::timers::set_alarm_wakeup(deadline, curr.clone());

        let mut timeout = true;
        loop {
            let mut rq = current_run_queue::<NoPreemptIrqSave>();
            if axhal::time::wall_time() >= deadline {
                break;
            }
            let wq = self.queue.lock();
            if condition() {
                timeout = false;
                break;
            }

            rq.blocked_resched(wq);
            // Preemption may occur here.
        }
        // Always try to remove the task from the timer list.
        self.cancel_events(curr, true);
        timeout
    }

    /// Wakes up one task in the wait queue, usually the first one.
    ///
    /// If `resched` is true, the current task will be preempted when
    /// preemption is enabled.
    pub fn notify_one(&self, resched: bool) -> bool {
        let mut wq = self.queue.lock();
        if let Some(task) = wq.pop_front() {
            unblock_one_task(task, resched);
            true
        } else {
            false
        }
    }

    /// Wakes up all tasks in the wait queue.
    ///
    /// If `resched` is true, the current task will be preempted when
    /// preemption is enabled.
    pub fn notify_all(&self, resched: bool) {
        while self.notify_one(resched) {
            // loop until the wait queue is empty
        }
    }

    /// Wakes up the given task in the wait queue.
    ///
    /// Returns `true` if the task was found in this queue and woken up.
    ///
    /// If `resched` is true, the current task will be preempted when
    /// preemption is enabled.
    pub fn notify_task(&mut self, resched: bool, task: &AxTaskRef) -> bool {
        let mut wq = self.queue.lock();
        if let Some(index) = wq.iter().position(|t| Arc::ptr_eq(t, task)) {
            unblock_one_task(wq.remove(index).unwrap(), resched);
            true
        } else {
            false
        }
    }

    /// Requeues at most `count` tasks in the wait queue to the target wait queue.
    ///
    /// Returns the number of tasks requeued.
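    ///
    /// # Examples
    ///
    /// A sketch of a "wake one, requeue the rest" pattern (both queues and the
    /// count of 8 are illustrative):
    ///
    /// ```no_run
    /// use axtask::WaitQueue;
    ///
    /// static WQ_A: WaitQueue = WaitQueue::new();
    /// static WQ_B: WaitQueue = WaitQueue::new();
    ///
    /// // Wake a single waiter on `WQ_A`, then move up to 8 of the remaining
    /// // waiters onto `WQ_B` without waking them.
    /// WQ_A.notify_one(true);
    /// let _moved = WQ_A.requeue(8, &WQ_B);
    /// ```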
    pub fn requeue(&self, mut count: usize, target: &WaitQueue) -> usize {
        let tasks: Vec<_> = {
            let mut wq = self.queue.lock();
            count = count.min(wq.len());
            wq.drain(..count).collect()
        };
        if !tasks.is_empty() {
            let mut wq = target.queue.lock();
            wq.extend(tasks);
        }
        count
    }

    /// Returns the number of tasks in the wait queue.
    pub fn len(&self) -> usize {
        self.queue.lock().len()
    }

    /// Returns true if the wait queue is empty.
    pub fn is_empty(&self) -> bool {
        self.queue.lock().is_empty()
    }
}

fn unblock_one_task(task: AxTaskRef, resched: bool) {
    // Mark the task as no longer in the wait queue.
    task.set_in_wait_queue(false);
    // Select the run queue by the CPU set of the task.
    // A `NoOp` kernel guard is enough here because this function is called
    // while holding the wait queue lock, where IRQs and preemption are
    // already disabled.
    select_run_queue::<NoOp>(&task).unblock_task(task, resched)
}