diff --git a/core/startos/src/util/io.rs b/core/startos/src/util/io.rs index b8cab713f..464666e00 100644 --- a/core/startos/src/util/io.rs +++ b/core/startos/src/util/io.rs @@ -1532,18 +1532,36 @@ pub trait ReadWriter: AsyncRead + AsyncWrite {} impl ReadWriter for T {} #[instrument(skip_all)] -async fn wait_for_created(stream: &mut EventStream<[u8; 1024]>, path: &Path) -> Result<(), Error> { - let parent = stream - .watches() - .add(path.parent().unwrap_or("/".as_ref()), WatchMask::CREATE)?; - while let Some(e) = stream.try_next().await? { - if e.mask & EventMask::CREATE != EventMask::empty() && e.name.as_deref() == path.file_name() - { - break; +fn wait_for_created<'a>( + stream: &'a mut EventStream<[u8; 1024]>, + path: &'a Path, +) -> BoxFuture<'a, Result<(), Error>> { + async { + let parent_path = path.parent().unwrap_or("/".as_ref()); + let parent; + loop { + match stream.watches().add(parent_path, WatchMask::CREATE) { + Ok(desc) => { + parent = desc; + break; + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + wait_for_created(stream, parent_path).await?; + } + Err(e) => Err(e)?, + } } + while let Some(e) = stream.try_next().await? { + if e.mask & EventMask::CREATE != EventMask::empty() + && e.name.as_deref() == path.file_name() + { + break; + } + } + stream.watches().remove(parent)?; + Ok(()) } - stream.watches().remove(parent)?; - Ok(()) + .boxed() } #[instrument(skip_all)] @@ -1554,32 +1572,26 @@ pub fn file_string_stream( async_stream::try_stream! { let mut stream = Inotify::init()?.into_event_stream([0; 1024])?; loop { - stream.watches().add( - &path, - WatchMask::MODIFY | WatchMask::MOVE_SELF | WatchMask::MOVED_TO | WatchMask::DELETE_SELF, - )?; - if let Some(contents) = maybe_read_file_to_string(&path).await? { - yield Some(contents); - } else { - wait_for_created(&mut stream, &path).await?; + loop { + match stream.watches().add( + &path, + WatchMask::MODIFY | WatchMask::MOVE_SELF | WatchMask::MOVED_TO | WatchMask::DELETE_SELF, + ) { + Ok(_) => break, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + yield None; + wait_for_created(&mut stream, &path).await?; + } + Err(e) => Err(e)?, + } } + yield maybe_read_file_to_string(&path).await?; while let Some(e) = stream.try_next().await? { - yield maybe_read_file_to_string(&path).await?; if e.mask & EventMask::DELETE_SELF != EventMask::empty() { break; } + yield maybe_read_file_to_string(&path).await?; } } } } - -#[tokio::test] -async fn test_wait_for_created() { - let mut stream = Inotify::init() - .unwrap() - .into_event_stream([0; 1024]) - .unwrap(); - wait_for_created(&mut stream, Path::new("/tmp/wait-for-me")) - .await - .unwrap(); -}