// starry_api/file/pipe.rs

1use core::any::Any;
2
3use alloc::sync::Arc;
4use axerrno::{LinuxError, LinuxResult};
5use axio::PollState;
6use axsync::Mutex;
7use linux_raw_sys::general::S_IFIFO;
8
9use super::{FileLike, Kstat};
10
/// Occupancy state of a [`PipeRingBuffer`].
///
/// `head == tail` holds both when the buffer is empty and when it is
/// completely full, so the state is tracked explicitly to tell them apart.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum RingBufferStatus {
    /// Every slot holds unread data; nothing more can be written.
    Full,
    /// There is no unread data; nothing can be read.
    Empty,
    /// There is both unread data and free space.
    Normal,
}

/// Capacity of a pipe's ring buffer, in bytes.
const RING_BUFFER_SIZE: usize = 256;

/// Fixed-capacity byte FIFO used as the storage of a pipe.
struct PipeRingBuffer {
    /// Backing storage.
    arr: [u8; RING_BUFFER_SIZE],
    /// Index of the next byte to be read.
    head: usize,
    /// Index of the next slot to be written.
    tail: usize,
    /// Disambiguates the `head == tail` case (empty vs. full).
    status: RingBufferStatus,
}

impl PipeRingBuffer {
    /// Creates an empty, zero-filled ring buffer.
    const fn new() -> Self {
        Self {
            arr: [0; RING_BUFFER_SIZE],
            head: 0,
            tail: 0,
            status: RingBufferStatus::Empty,
        }
    }

    /// Appends one byte at the tail.
    ///
    /// The caller must make sure there is room first (see
    /// [`available_write`](Self::available_write)); writing into a full
    /// buffer would overwrite unread data and desynchronise the bookkeeping.
    /// That precondition is checked in debug builds.
    fn write_byte(&mut self, byte: u8) {
        debug_assert!(
            self.status != RingBufferStatus::Full,
            "write_byte called on a full pipe buffer"
        );
        self.arr[self.tail] = byte;
        self.tail = (self.tail + 1) % RING_BUFFER_SIZE;
        // The tail catching up with the head means the last free slot was used.
        self.status = if self.tail == self.head {
            RingBufferStatus::Full
        } else {
            RingBufferStatus::Normal
        };
    }

    /// Removes and returns the byte at the head.
    ///
    /// The caller must make sure there is data first (see
    /// [`available_read`](Self::available_read)); reading from an empty
    /// buffer would return stale data and desynchronise the bookkeeping.
    /// That precondition is checked in debug builds.
    fn read_byte(&mut self) -> u8 {
        debug_assert!(
            self.status != RingBufferStatus::Empty,
            "read_byte called on an empty pipe buffer"
        );
        let byte = self.arr[self.head];
        self.head = (self.head + 1) % RING_BUFFER_SIZE;
        // The head catching up with the tail means the last unread byte was taken.
        self.status = if self.head == self.tail {
            RingBufferStatus::Empty
        } else {
            RingBufferStatus::Normal
        };
        byte
    }

    /// Get the length of remaining data in the buffer
    const fn available_read(&self) -> usize {
        match self.status {
            RingBufferStatus::Empty => 0,
            RingBufferStatus::Full => RING_BUFFER_SIZE,
            RingBufferStatus::Normal => {
                if self.tail > self.head {
                    self.tail - self.head
                } else {
                    // The unread region wraps around the end of the array.
                    self.tail + RING_BUFFER_SIZE - self.head
                }
            }
        }
    }

    /// Get the length of remaining space in the buffer
    const fn available_write(&self) -> usize {
        RING_BUFFER_SIZE - self.available_read()
    }
}
76
77pub struct Pipe {
78    readable: bool,
79    buffer: Arc<Mutex<PipeRingBuffer>>,
80}
81
82impl Pipe {
83    pub fn new() -> (Pipe, Pipe) {
84        let buffer = Arc::new(Mutex::new(PipeRingBuffer::new()));
85        let read_end = Pipe {
86            readable: true,
87            buffer: buffer.clone(),
88        };
89        let write_end = Pipe {
90            readable: false,
91            buffer,
92        };
93        (read_end, write_end)
94    }
95
96    pub const fn readable(&self) -> bool {
97        self.readable
98    }
99
100    pub const fn writable(&self) -> bool {
101        !self.readable
102    }
103
104    pub fn closed(&self) -> bool {
105        Arc::strong_count(&self.buffer) == 1
106    }
107}
108
109impl FileLike for Pipe {
110    fn read(&self, buf: &mut [u8]) -> LinuxResult<usize> {
111        if !self.readable() {
112            return Err(LinuxError::EPERM);
113        }
114        if buf.is_empty() {
115            return Ok(0);
116        }
117
118        loop {
119            let mut ring_buffer = self.buffer.lock();
120            let read_size = ring_buffer.available_read().min(buf.len());
121            if read_size == 0 {
122                if self.closed() {
123                    return Ok(0);
124                }
125                drop(ring_buffer);
126                // Data not ready, wait for write end
127                axtask::yield_now(); // TODO: use synconize primitive
128                continue;
129            }
130            for c in buf.iter_mut().take(read_size) {
131                *c = ring_buffer.read_byte();
132            }
133            return Ok(read_size);
134        }
135    }
136
137    fn write(&self, buf: &[u8]) -> LinuxResult<usize> {
138        if !self.writable() {
139            return Err(LinuxError::EPERM);
140        }
141        if self.closed() {
142            return Err(LinuxError::EPIPE);
143        }
144        if buf.is_empty() {
145            return Ok(0);
146        }
147
148        let mut write_size = 0usize;
149        let total_len = buf.len();
150        loop {
151            let mut ring_buffer = self.buffer.lock();
152            let loop_write = ring_buffer.available_write();
153            if loop_write == 0 {
154                if self.closed() {
155                    return Ok(write_size);
156                }
157                drop(ring_buffer);
158                // Buffer is full, wait for read end to consume
159                axtask::yield_now(); // TODO: use synconize primitive
160                continue;
161            }
162            for _ in 0..loop_write {
163                if write_size == total_len {
164                    return Ok(write_size);
165                }
166                ring_buffer.write_byte(buf[write_size]);
167                write_size += 1;
168            }
169        }
170    }
171
172    fn stat(&self) -> LinuxResult<Kstat> {
173        Ok(Kstat {
174            mode: S_IFIFO | 0o600u32, // rw-------
175            ..Default::default()
176        })
177    }
178
179    fn into_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
180        self
181    }
182
183    fn poll(&self) -> LinuxResult<PollState> {
184        let buf = self.buffer.lock();
185        Ok(PollState {
186            readable: self.readable() && buf.available_read() > 0,
187            writable: self.writable() && buf.available_write() > 0,
188        })
189    }
190
191    fn set_nonblocking(&self, _nonblocking: bool) -> LinuxResult {
192        Ok(())
193    }
194}