1use std::{collections::VecDeque, path::PathBuf, sync::Arc};
2use tokio::sync::{RwLock, Semaphore};
3
4pub struct AsyncCapReadDir(
5 pub Option<cap_std::fs::ReadDir>,
6 pub Option<VecDeque<std::io::Result<(bool, String)>>>,
7);
8
9impl AsyncCapReadDir {
10 async fn next_entry(&mut self) -> Option<std::io::Result<(bool, String)>> {
11 if let Some(buffer) = self.1.as_mut()
12 && !buffer.is_empty()
13 {
14 return buffer.pop_front();
15 }
16
17 let mut read_dir = self.0.take()?;
18 let mut buffer = self.1.take()?;
19
20 match tokio::task::spawn_blocking(move || {
21 for _ in 0..32 {
22 if let Some(entry) = read_dir.next() {
23 buffer.push_back(entry.map(|e| {
24 (
25 e.file_type().is_ok_and(|ft| ft.is_dir()),
26 e.file_name().to_string_lossy().to_string(),
27 )
28 }));
29 } else {
30 break;
31 }
32 }
33
34 (buffer, read_dir)
35 })
36 .await
37 {
38 Ok((buffer, read_dir)) => {
39 self.0 = Some(read_dir);
40 self.1 = Some(buffer);
41
42 self.1.as_mut()?.pop_front()
43 }
44 Err(_) => None,
45 }
46 }
47}
48
49pub struct AsyncTokioReadDir(pub tokio::fs::ReadDir);
50
51impl AsyncTokioReadDir {
52 async fn next_entry(&mut self) -> Option<std::io::Result<(bool, String)>> {
53 match self.0.next_entry().await {
54 Ok(Some(entry)) => Some(Ok((
55 entry.file_type().await.is_ok_and(|ft| ft.is_dir()),
56 entry.file_name().to_string_lossy().to_string(),
57 ))),
58 Ok(None) => None,
59 Err(err) => Some(Err(err)),
60 }
61 }
62}
63
64pub enum AsyncReadDir {
65 Cap(AsyncCapReadDir),
66 Tokio(AsyncTokioReadDir),
67}
68
69impl AsyncReadDir {
70 pub async fn next_entry(&mut self) -> Option<std::io::Result<(bool, String)>> {
71 match self {
72 AsyncReadDir::Cap(read_dir) => read_dir.next_entry().await,
73 AsyncReadDir::Tokio(read_dir) => read_dir.next_entry().await,
74 }
75 }
76}
77
78pub struct CapReadDir(pub cap_std::fs::ReadDir);
79
80impl CapReadDir {
81 pub fn next_entry(&mut self) -> Option<std::io::Result<(bool, String)>> {
82 match self.0.next() {
83 Some(Ok(entry)) => Some(Ok((
84 entry.file_type().is_ok_and(|ft| ft.is_dir()),
85 entry.file_name().to_string_lossy().to_string(),
86 ))),
87 Some(Err(err)) => Some(Err(err)),
88 None => None,
89 }
90 }
91}
92
/// Blocking directory reader backed by [`std::fs::ReadDir`].
pub struct StdReadDir(pub std::fs::ReadDir);

impl StdReadDir {
    /// Returns the next directory entry as `(is_dir, file_name)`.
    ///
    /// `None` marks the end of the directory and `Some(Err(_))` forwards an error from the
    /// underlying iterator. `is_dir` is `false` when the file type cannot be determined, and file
    /// names that are not valid Unicode are converted lossily.
    pub fn next_entry(&mut self) -> Option<std::io::Result<(bool, String)>> {
        let entry = match self.0.next()? {
            Ok(entry) => entry,
            Err(err) => return Some(Err(err)),
        };

        let is_dir = matches!(entry.file_type(), Ok(kind) if kind.is_dir());
        let name = entry.file_name().to_string_lossy().into_owned();

        Some(Ok((is_dir, name)))
    }
}
107
108pub enum ReadDir {
109 Cap(CapReadDir),
110 Std(StdReadDir),
111}
112
113impl ReadDir {
114 pub fn next_entry(&mut self) -> Option<std::io::Result<(bool, String)>> {
115 match self {
116 ReadDir::Cap(read_dir) => read_dir.next_entry(),
117 ReadDir::Std(read_dir) => read_dir.next_entry(),
118 }
119 }
120}
121
/// Asynchronous, depth-first walker over a directory tree.
///
/// Every directory that is encountered is opened and pushed onto an explicit stack, so its
/// contents are produced before the remaining entries of its parent.
pub struct AsyncWalkDir<'a> {
    /// Filesystem handle used to open every directory that is visited.
    cap_filesystem: super::CapFilesystem,
    /// Directories currently being read, each paired with its own path. The last element is the
    /// directory whose entries are produced next.
    stack: Vec<(PathBuf, AsyncReadDir)>,
    /// Gitignore matchers; an entry reported as ignored by any of them is skipped.
    ignored: &'a [ignore::gitignore::Gitignore],
}
127
128impl<'a> AsyncWalkDir<'a> {
129 pub async fn new(
130 cap_filesystem: super::CapFilesystem,
131 path: PathBuf,
132 ) -> Result<Self, anyhow::Error> {
133 let read_dir = cap_filesystem.async_read_dir(&path).await?;
134
135 Ok(Self {
136 cap_filesystem,
137 stack: vec![(path, read_dir)],
138 ignored: &[],
139 })
140 }
141
142 pub fn with_ignored(mut self, ignored: &'a [ignore::gitignore::Gitignore]) -> Self {
143 self.ignored = ignored;
144 self
145 }
146
147 pub async fn next_entry(&mut self) -> Option<Result<(bool, PathBuf), anyhow::Error>> {
148 'stack: while let Some((parent_path, read_dir)) = self.stack.last_mut() {
149 match read_dir.next_entry().await {
150 Some(Ok((is_dir, name))) => {
151 let full_path = parent_path.join(&name);
152
153 let should_ignore = self
154 .ignored
155 .iter()
156 .any(|ignored| ignored.matched(&full_path, is_dir).is_ignore());
157 if crate::unlikely(should_ignore) {
158 continue 'stack;
159 }
160
161 if is_dir {
162 match self.cap_filesystem.async_read_dir(&full_path).await {
163 Ok(dir) => self.stack.push((full_path.clone(), dir)),
164 Err(e) => return Some(Err(e)),
165 };
166 }
167
168 return Some(Ok((is_dir, full_path)));
169 }
170 Some(Err(err)) => return Some(Err(err.into())),
171 None => {
172 self.stack.pop();
173 }
174 }
175 }
176
177 None
178 }
179
180 pub async fn run_multithreaded<F, Fut>(
181 &mut self,
182 threads: usize,
183 func: Arc<F>,
184 ) -> Result<(), anyhow::Error>
185 where
186 F: Fn(bool, PathBuf) -> Fut + Send + Sync + 'static,
187 Fut: futures_util::Future<Output = Result<(), anyhow::Error>> + Send + 'static,
188 {
189 let semaphore = Arc::new(Semaphore::new(threads));
190 let error = Arc::new(RwLock::new(None));
191
192 while let Some(entry) = self.next_entry().await {
193 match entry {
194 Ok((is_dir, path)) => {
195 let semaphore = Arc::clone(&semaphore);
196 let error = Arc::clone(&error);
197 let func = Arc::clone(&func);
198
199 if crate::unlikely(error.read().await.is_some()) {
200 break;
201 }
202
203 let permit = match semaphore.acquire_owned().await {
204 Ok(permit) => permit,
205 Err(_) => break,
206 };
207 tokio::spawn(async move {
208 let _permit = permit;
209 match func(is_dir, path).await {
210 Ok(_) => {}
211 Err(err) => {
212 *error.write().await = Some(err);
213 }
214 }
215 });
216 }
217 Err(err) => return Err(err),
218 }
219 }
220
221 semaphore.acquire_many(threads as u32).await.ok();
222
223 if let Some(err) = error.write().await.take() {
224 return Err(err);
225 }
226
227 Ok(())
228 }
229}
230
/// Blocking, depth-first walker over a directory tree.
///
/// Every directory that is encountered is opened and pushed onto an explicit stack, so its
/// contents are produced before the remaining entries of its parent.
pub struct WalkDir<'a> {
    /// Filesystem handle used to open every directory that is visited.
    cap_filesystem: super::CapFilesystem,
    /// Directories currently being read, each paired with its own path. The last element is the
    /// directory whose entries are produced next.
    stack: Vec<(PathBuf, ReadDir)>,
    /// Gitignore matchers; an entry reported as ignored by any of them is skipped.
    ignored: &'a [ignore::gitignore::Gitignore],
}
236
237impl<'a> WalkDir<'a> {
238 pub fn new(cap_filesystem: super::CapFilesystem, path: PathBuf) -> Result<Self, anyhow::Error> {
239 let read_dir = cap_filesystem.read_dir(&path)?;
240
241 Ok(Self {
242 cap_filesystem,
243 stack: vec![(path, read_dir)],
244 ignored: &[],
245 })
246 }
247
248 pub fn with_ignored(mut self, ignored: &'a [ignore::gitignore::Gitignore]) -> Self {
249 self.ignored = ignored;
250 self
251 }
252
253 pub fn next_entry(&mut self) -> Option<Result<(bool, PathBuf), anyhow::Error>> {
254 'stack: while let Some((parent_path, read_dir)) = self.stack.last_mut() {
255 match read_dir.next_entry() {
256 Some(Ok((is_dir, name))) => {
257 let full_path = parent_path.join(&name);
258
259 let should_ignore = self
260 .ignored
261 .iter()
262 .any(|ignored| ignored.matched(&full_path, is_dir).is_ignore());
263 if crate::unlikely(should_ignore) {
264 continue 'stack;
265 }
266
267 if is_dir {
268 match self.cap_filesystem.read_dir(&full_path) {
269 Ok(dir) => self.stack.push((full_path.clone(), dir)),
270 Err(e) => return Some(Err(e)),
271 };
272 }
273
274 return Some(Ok((is_dir, full_path)));
275 }
276 Some(Err(err)) => {
277 return Some(Err(err.into()));
278 }
279 None => {
280 self.stack.pop();
281 }
282 }
283 }
284
285 None
286 }
287}