shared/cap/
utils.rs

1use std::{collections::VecDeque, path::PathBuf, sync::Arc};
2use tokio::sync::{RwLock, Semaphore};
3
4pub struct AsyncCapReadDir(
5    pub Option<cap_std::fs::ReadDir>,
6    pub Option<VecDeque<std::io::Result<(bool, String)>>>,
7);
8
9impl AsyncCapReadDir {
10    async fn next_entry(&mut self) -> Option<std::io::Result<(bool, String)>> {
11        if let Some(buffer) = self.1.as_mut()
12            && !buffer.is_empty()
13        {
14            return buffer.pop_front();
15        }
16
17        let mut read_dir = self.0.take()?;
18        let mut buffer = self.1.take()?;
19
20        match tokio::task::spawn_blocking(move || {
21            for _ in 0..32 {
22                if let Some(entry) = read_dir.next() {
23                    buffer.push_back(entry.map(|e| {
24                        (
25                            e.file_type().is_ok_and(|ft| ft.is_dir()),
26                            e.file_name().to_string_lossy().to_string(),
27                        )
28                    }));
29                } else {
30                    break;
31                }
32            }
33
34            (buffer, read_dir)
35        })
36        .await
37        {
38            Ok((buffer, read_dir)) => {
39                self.0 = Some(read_dir);
40                self.1 = Some(buffer);
41
42                self.1.as_mut()?.pop_front()
43            }
44            Err(_) => None,
45        }
46    }
47}
48
49pub struct AsyncTokioReadDir(pub tokio::fs::ReadDir);
50
51impl AsyncTokioReadDir {
52    async fn next_entry(&mut self) -> Option<std::io::Result<(bool, String)>> {
53        match self.0.next_entry().await {
54            Ok(Some(entry)) => Some(Ok((
55                entry.file_type().await.is_ok_and(|ft| ft.is_dir()),
56                entry.file_name().to_string_lossy().to_string(),
57            ))),
58            Ok(None) => None,
59            Err(err) => Some(Err(err)),
60        }
61    }
62}
63
64pub enum AsyncReadDir {
65    Cap(AsyncCapReadDir),
66    Tokio(AsyncTokioReadDir),
67}
68
69impl AsyncReadDir {
70    pub async fn next_entry(&mut self) -> Option<std::io::Result<(bool, String)>> {
71        match self {
72            AsyncReadDir::Cap(read_dir) => read_dir.next_entry().await,
73            AsyncReadDir::Tokio(read_dir) => read_dir.next_entry().await,
74        }
75    }
76}
77
78pub struct CapReadDir(pub cap_std::fs::ReadDir);
79
80impl CapReadDir {
81    pub fn next_entry(&mut self) -> Option<std::io::Result<(bool, String)>> {
82        match self.0.next() {
83            Some(Ok(entry)) => Some(Ok((
84                entry.file_type().is_ok_and(|ft| ft.is_dir()),
85                entry.file_name().to_string_lossy().to_string(),
86            ))),
87            Some(Err(err)) => Some(Err(err)),
88            None => None,
89        }
90    }
91}
92
/// Newtype over [`std::fs::ReadDir`] that yields entries as
/// `(is_dir, file_name)` pairs.
pub struct StdReadDir(pub std::fs::ReadDir);

impl StdReadDir {
    /// Returns the next entry, `None` at the end of the directory, or the I/O
    /// error produced while reading the entry.
    ///
    /// `is_dir` is `false` when the file type lookup fails; the name is
    /// converted lossily if it is not valid UTF-8.
    pub fn next_entry(&mut self) -> Option<std::io::Result<(bool, String)>> {
        self.0.next().map(|item| {
            item.map(|entry| {
                let is_dir = entry.file_type().is_ok_and(|kind| kind.is_dir());
                (is_dir, entry.file_name().to_string_lossy().into_owned())
            })
        })
    }
}
107
108pub enum ReadDir {
109    Cap(CapReadDir),
110    Std(StdReadDir),
111}
112
113impl ReadDir {
114    pub fn next_entry(&mut self) -> Option<std::io::Result<(bool, String)>> {
115        match self {
116            ReadDir::Cap(read_dir) => read_dir.next_entry(),
117            ReadDir::Std(read_dir) => read_dir.next_entry(),
118        }
119    }
120}
121
122pub struct AsyncWalkDir<'a> {
123    cap_filesystem: super::CapFilesystem,
124    stack: Vec<(PathBuf, AsyncReadDir)>,
125    ignored: &'a [ignore::gitignore::Gitignore],
126}
127
128impl<'a> AsyncWalkDir<'a> {
129    pub async fn new(
130        cap_filesystem: super::CapFilesystem,
131        path: PathBuf,
132    ) -> Result<Self, anyhow::Error> {
133        let read_dir = cap_filesystem.async_read_dir(&path).await?;
134
135        Ok(Self {
136            cap_filesystem,
137            stack: vec![(path, read_dir)],
138            ignored: &[],
139        })
140    }
141
142    pub fn with_ignored(mut self, ignored: &'a [ignore::gitignore::Gitignore]) -> Self {
143        self.ignored = ignored;
144        self
145    }
146
147    pub async fn next_entry(&mut self) -> Option<Result<(bool, PathBuf), anyhow::Error>> {
148        'stack: while let Some((parent_path, read_dir)) = self.stack.last_mut() {
149            match read_dir.next_entry().await {
150                Some(Ok((is_dir, name))) => {
151                    let full_path = parent_path.join(&name);
152
153                    let should_ignore = self
154                        .ignored
155                        .iter()
156                        .any(|ignored| ignored.matched(&full_path, is_dir).is_ignore());
157                    if crate::unlikely(should_ignore) {
158                        continue 'stack;
159                    }
160
161                    if is_dir {
162                        match self.cap_filesystem.async_read_dir(&full_path).await {
163                            Ok(dir) => self.stack.push((full_path.clone(), dir)),
164                            Err(e) => return Some(Err(e)),
165                        };
166                    }
167
168                    return Some(Ok((is_dir, full_path)));
169                }
170                Some(Err(err)) => return Some(Err(err.into())),
171                None => {
172                    self.stack.pop();
173                }
174            }
175        }
176
177        None
178    }
179
180    pub async fn run_multithreaded<F, Fut>(
181        &mut self,
182        threads: usize,
183        func: Arc<F>,
184    ) -> Result<(), anyhow::Error>
185    where
186        F: Fn(bool, PathBuf) -> Fut + Send + Sync + 'static,
187        Fut: futures_util::Future<Output = Result<(), anyhow::Error>> + Send + 'static,
188    {
189        let semaphore = Arc::new(Semaphore::new(threads));
190        let error = Arc::new(RwLock::new(None));
191
192        while let Some(entry) = self.next_entry().await {
193            match entry {
194                Ok((is_dir, path)) => {
195                    let semaphore = Arc::clone(&semaphore);
196                    let error = Arc::clone(&error);
197                    let func = Arc::clone(&func);
198
199                    if crate::unlikely(error.read().await.is_some()) {
200                        break;
201                    }
202
203                    let permit = match semaphore.acquire_owned().await {
204                        Ok(permit) => permit,
205                        Err(_) => break,
206                    };
207                    tokio::spawn(async move {
208                        let _permit = permit;
209                        match func(is_dir, path).await {
210                            Ok(_) => {}
211                            Err(err) => {
212                                *error.write().await = Some(err);
213                            }
214                        }
215                    });
216                }
217                Err(err) => return Err(err),
218            }
219        }
220
221        semaphore.acquire_many(threads as u32).await.ok();
222
223        if let Some(err) = error.write().await.take() {
224            return Err(err);
225        }
226
227        Ok(())
228    }
229}
230
231pub struct WalkDir<'a> {
232    cap_filesystem: super::CapFilesystem,
233    stack: Vec<(PathBuf, ReadDir)>,
234    ignored: &'a [ignore::gitignore::Gitignore],
235}
236
237impl<'a> WalkDir<'a> {
238    pub fn new(cap_filesystem: super::CapFilesystem, path: PathBuf) -> Result<Self, anyhow::Error> {
239        let read_dir = cap_filesystem.read_dir(&path)?;
240
241        Ok(Self {
242            cap_filesystem,
243            stack: vec![(path, read_dir)],
244            ignored: &[],
245        })
246    }
247
248    pub fn with_ignored(mut self, ignored: &'a [ignore::gitignore::Gitignore]) -> Self {
249        self.ignored = ignored;
250        self
251    }
252
253    pub fn next_entry(&mut self) -> Option<Result<(bool, PathBuf), anyhow::Error>> {
254        'stack: while let Some((parent_path, read_dir)) = self.stack.last_mut() {
255            match read_dir.next_entry() {
256                Some(Ok((is_dir, name))) => {
257                    let full_path = parent_path.join(&name);
258
259                    let should_ignore = self
260                        .ignored
261                        .iter()
262                        .any(|ignored| ignored.matched(&full_path, is_dir).is_ignore());
263                    if crate::unlikely(should_ignore) {
264                        continue 'stack;
265                    }
266
267                    if is_dir {
268                        match self.cap_filesystem.read_dir(&full_path) {
269                            Ok(dir) => self.stack.push((full_path.clone(), dir)),
270                            Err(e) => return Some(Err(e)),
271                        };
272                    }
273
274                    return Some(Ok((is_dir, full_path)));
275                }
276                Some(Err(err)) => {
277                    return Some(Err(err.into()));
278                }
279                None => {
280                    self.stack.pop();
281                }
282            }
283        }
284
285        None
286    }
287}