fix file_stream and remove non-terminating test

This commit is contained in:
Aiden McClelland
2025-08-29 11:48:30 -06:00
parent 8163db7ac3
commit 369e559518

View File

@@ -1532,18 +1532,36 @@ pub trait ReadWriter: AsyncRead + AsyncWrite {}
impl<T: AsyncRead + AsyncWrite> ReadWriter for T {}
#[instrument(skip_all)]
async fn wait_for_created(stream: &mut EventStream<[u8; 1024]>, path: &Path) -> Result<(), Error> {
let parent = stream
.watches()
.add(path.parent().unwrap_or("/".as_ref()), WatchMask::CREATE)?;
while let Some(e) = stream.try_next().await? {
if e.mask & EventMask::CREATE != EventMask::empty() && e.name.as_deref() == path.file_name()
{
break;
fn wait_for_created<'a>(
stream: &'a mut EventStream<[u8; 1024]>,
path: &'a Path,
) -> BoxFuture<'a, Result<(), Error>> {
async {
let parent_path = path.parent().unwrap_or("/".as_ref());
let parent;
loop {
match stream.watches().add(parent_path, WatchMask::CREATE) {
Ok(desc) => {
parent = desc;
break;
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
wait_for_created(stream, parent_path).await?;
}
Err(e) => Err(e)?,
}
}
while let Some(e) = stream.try_next().await? {
if e.mask & EventMask::CREATE != EventMask::empty()
&& e.name.as_deref() == path.file_name()
{
break;
}
}
stream.watches().remove(parent)?;
Ok(())
}
stream.watches().remove(parent)?;
Ok(())
.boxed()
}
#[instrument(skip_all)]
@@ -1554,32 +1572,26 @@ pub fn file_string_stream(
async_stream::try_stream! {
let mut stream = Inotify::init()?.into_event_stream([0; 1024])?;
loop {
stream.watches().add(
&path,
WatchMask::MODIFY | WatchMask::MOVE_SELF | WatchMask::MOVED_TO | WatchMask::DELETE_SELF,
)?;
if let Some(contents) = maybe_read_file_to_string(&path).await? {
yield Some(contents);
} else {
wait_for_created(&mut stream, &path).await?;
loop {
match stream.watches().add(
&path,
WatchMask::MODIFY | WatchMask::MOVE_SELF | WatchMask::MOVED_TO | WatchMask::DELETE_SELF,
) {
Ok(_) => break,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
yield None;
wait_for_created(&mut stream, &path).await?;
}
Err(e) => Err(e)?,
}
}
yield maybe_read_file_to_string(&path).await?;
while let Some(e) = stream.try_next().await? {
yield maybe_read_file_to_string(&path).await?;
if e.mask & EventMask::DELETE_SELF != EventMask::empty() {
break;
}
yield maybe_read_file_to_string(&path).await?;
}
}
}
}
#[tokio::test]
async fn test_wait_for_created() {
let mut stream = Inotify::init()
.unwrap()
.into_event_stream([0; 1024])
.unwrap();
wait_for_created(&mut stream, Path::new("/tmp/wait-for-me"))
.await
.unwrap();
}