axtask/
wait_queue.rs

1use alloc::collections::VecDeque;
2use alloc::sync::Arc;
3use alloc::vec::Vec;
4
5use kernel_guard::{NoOp, NoPreemptIrqSave};
6use kspin::{SpinNoIrq, SpinNoIrqGuard};
7
8use crate::{AxTaskRef, CurrentTask, current_run_queue, select_run_queue};
9
10/// A queue to store sleeping tasks.
11///
12/// # Examples
13///
14/// ```
15/// use axtask::WaitQueue;
16/// use core::sync::atomic::{AtomicU32, Ordering};
17///
18/// static VALUE: AtomicU32 = AtomicU32::new(0);
19/// static WQ: WaitQueue = WaitQueue::new();
20///
21/// axtask::init_scheduler();
22/// // spawn a new task that updates `VALUE` and notifies the main task
23/// axtask::spawn(|| {
24///     assert_eq!(VALUE.load(Ordering::Relaxed), 0);
25///     VALUE.fetch_add(1, Ordering::Relaxed);
26///     WQ.notify_one(true); // wake up the main task
27/// });
28///
29/// WQ.wait(); // block until `notify()` is called
30/// assert_eq!(VALUE.load(Ordering::Relaxed), 1);
31/// ```
32pub struct WaitQueue {
33    queue: SpinNoIrq<VecDeque<AxTaskRef>>,
34}
35
36pub(crate) type WaitQueueGuard<'a> = SpinNoIrqGuard<'a, VecDeque<AxTaskRef>>;
37
38impl WaitQueue {
39    /// Creates an empty wait queue.
40    pub const fn new() -> Self {
41        Self {
42            queue: SpinNoIrq::new(VecDeque::new()),
43        }
44    }
45
46    /// Creates an empty wait queue with space for at least `capacity` elements.
47    pub fn with_capacity(capacity: usize) -> Self {
48        Self {
49            queue: SpinNoIrq::new(VecDeque::with_capacity(capacity)),
50        }
51    }
52
53    /// Cancel events by removing the task from the wait queue.
54    /// If `from_timer_list` is true, try to remove the task from the timer list.
55    fn cancel_events(&self, curr: CurrentTask, _from_timer_list: bool) {
56        // A task can be wake up only one events (timer or `notify()`), remove
57        // the event from another queue.
58        if curr.in_wait_queue() {
59            // wake up by timer (timeout).
60            self.queue.lock().retain(|t| !curr.ptr_eq(t));
61            curr.set_in_wait_queue(false);
62        }
63
64        // Try to cancel a timer event from timer lists.
65        // Just mark task's current timer ticket ID as expired.
66        #[cfg(feature = "irq")]
67        if _from_timer_list {
68            curr.timer_ticket_expired();
69            // Note:
70            //  this task is still not removed from timer list of target CPU,
71            //  which may cause some redundant timer events because it still needs to
72            //  go through the process of expiring an event from the timer list and invoking the callback.
73            //  (it can be considered a lazy-removal strategy, it will be ignored when it is about to take effect.)
74        }
75    }
76
77    /// Blocks the current task and put it into the wait queue, until other task
78    /// notifies it.
79    pub fn wait(&self) {
80        current_run_queue::<NoPreemptIrqSave>().blocked_resched(self.queue.lock());
81        self.cancel_events(crate::current(), false);
82    }
83
84    /// Blocks the current task and put it into the wait queue, until the given
85    /// `condition` becomes true.
86    ///
87    /// Note that even other tasks notify this task, it will not wake up until
88    /// the condition becomes true.
89    pub fn wait_until<F>(&self, condition: F)
90    where
91        F: Fn() -> bool,
92    {
93        let curr = crate::current();
94        loop {
95            let mut rq = current_run_queue::<NoPreemptIrqSave>();
96            let wq = self.queue.lock();
97            if condition() {
98                break;
99            }
100            rq.blocked_resched(wq);
101            // Preemption may occur here.
102        }
103        self.cancel_events(curr, false);
104    }
105
106    /// Blocks the current task and put it into the wait queue, until other tasks
107    /// notify it, or the given duration has elapsed.
108    #[cfg(feature = "irq")]
109    pub fn wait_timeout(&self, dur: core::time::Duration) -> bool {
110        let mut rq = current_run_queue::<NoPreemptIrqSave>();
111        let curr = crate::current();
112        let deadline = axhal::time::wall_time() + dur;
113        debug!(
114            "task wait_timeout: {} deadline={:?}",
115            curr.id_name(),
116            deadline
117        );
118        crate::timers::set_alarm_wakeup(deadline, curr.clone());
119
120        rq.blocked_resched(self.queue.lock());
121
122        let timeout = curr.in_wait_queue(); // still in the wait queue, must have timed out
123
124        // Always try to remove the task from the timer list.
125        self.cancel_events(curr, true);
126        timeout
127    }
128
129    /// Blocks the current task and put it into the wait queue, until the given
130    /// `condition` becomes true, or the given duration has elapsed.
131    ///
132    /// Note that even other tasks notify this task, it will not wake up until
133    /// the above conditions are met.
134    #[cfg(feature = "irq")]
135    pub fn wait_timeout_until<F>(&self, dur: core::time::Duration, condition: F) -> bool
136    where
137        F: Fn() -> bool,
138    {
139        let curr = crate::current();
140        let deadline = axhal::time::wall_time() + dur;
141        debug!(
142            "task wait_timeout: {}, deadline={:?}",
143            curr.id_name(),
144            deadline
145        );
146        crate::timers::set_alarm_wakeup(deadline, curr.clone());
147
148        let mut timeout = true;
149        loop {
150            let mut rq = current_run_queue::<NoPreemptIrqSave>();
151            if axhal::time::wall_time() >= deadline {
152                break;
153            }
154            let wq = self.queue.lock();
155            if condition() {
156                timeout = false;
157                break;
158            }
159
160            rq.blocked_resched(wq);
161            // Preemption may occur here.
162        }
163        // Always try to remove the task from the timer list.
164        self.cancel_events(curr, true);
165        timeout
166    }
167
168    /// Wakes up one task in the wait queue, usually the first one.
169    ///
170    /// If `resched` is true, the current task will be preempted when the
171    /// preemption is enabled.
172    pub fn notify_one(&self, resched: bool) -> bool {
173        let mut wq = self.queue.lock();
174        if let Some(task) = wq.pop_front() {
175            unblock_one_task(task, resched);
176            true
177        } else {
178            false
179        }
180    }
181
182    /// Wakes all tasks in the wait queue.
183    ///
184    /// If `resched` is true, the current task will be preempted when the
185    /// preemption is enabled.
186    pub fn notify_all(&self, resched: bool) {
187        while self.notify_one(resched) {
188            // loop until the wait queue is empty
189        }
190    }
191
192    /// Wake up the given task in the wait queue.
193    ///
194    /// If `resched` is true, the current task will be preempted when the
195    /// preemption is enabled.
196    pub fn notify_task(&mut self, resched: bool, task: &AxTaskRef) -> bool {
197        let mut wq = self.queue.lock();
198        if let Some(index) = wq.iter().position(|t| Arc::ptr_eq(t, task)) {
199            unblock_one_task(wq.remove(index).unwrap(), resched);
200            true
201        } else {
202            false
203        }
204    }
205
206    /// Requeues at most `count` tasks in the wait queue to the target wait queue.
207    ///
208    /// Returns the number of tasks requeued.
209    pub fn requeue(&self, mut count: usize, target: &WaitQueue) -> usize {
210        let tasks: Vec<_> = {
211            let mut wq = self.queue.lock();
212            count = count.min(wq.len());
213            wq.drain(..count).collect()
214        };
215        if !tasks.is_empty() {
216            let mut wq = target.queue.lock();
217            wq.extend(tasks);
218        }
219        count
220    }
221
222    /// Returns the number of tasks in the wait queue.
223    pub fn len(&self) -> usize {
224        self.queue.lock().len()
225    }
226
227    /// Returns true if the wait queue is empty.
228    pub fn is_empty(&self) -> bool {
229        self.queue.lock().is_empty()
230    }
231}
232
233fn unblock_one_task(task: AxTaskRef, resched: bool) {
234    // Mark task as not in wait queue.
235    task.set_in_wait_queue(false);
236    // Select run queue by the CPU set of the task.
237    // Use `NoOp` kernel guard here because the function is called with holding the
238    // lock of wait queue, where the irq and preemption are disabled.
239    select_run_queue::<NoOp>(&task).unblock_task(task, resched)
240}