From ac1085ff9b8e72b824010e42529f61bc4060517d Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Thu, 6 Nov 2025 17:53:56 -0700 Subject: [PATCH] fix raspi build --- core/startos/src/util/squashfs.rs | 1138 ++++++++++++++++++++++++++--- image-recipe/build.sh | 39 +- 2 files changed, 1038 insertions(+), 139 deletions(-) diff --git a/core/startos/src/util/squashfs.rs b/core/startos/src/util/squashfs.rs index f6dfba414..4f63e846a 100644 --- a/core/startos/src/util/squashfs.rs +++ b/core/startos/src/util/squashfs.rs @@ -1,5 +1,4 @@ -use std::io::{Seek, Write}; -use std::path::Path; +use std::io::{Seek, SeekFrom, Write}; use std::task::Poll; use async_compression::codecs::{Encode, ZstdEncoder}; @@ -9,7 +8,6 @@ use tokio::io::{AsyncSeek, AsyncWrite}; use visit_rs::{Visit, VisitAsync, VisitFields, VisitFieldsAsync, Visitor}; use crate::prelude::*; -use crate::registry::os::asset::add; struct SquashfsSerializer { writer: W, @@ -99,88 +97,235 @@ impl Visit> for Superblock { } #[pin_project::pin_project] -pub struct MetadataBlocks { +pub struct MetadataBlocksWriter { input: [u8; 8192], - input_flushed: usize, size: usize, size_addr: Option, - end_addr: Option, - zstd: Option, - output: PartialBuffer<[u8; 4096]>, + output: PartialBuffer<[u8; 8192]>, output_flushed: usize, + zstd: Option, + seek_state: SeekState, + write_state: WriteState, #[pin] writer: W, } -impl Write for MetadataBlocks { + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum SeekState { + Idle, + GettingPosition, + Seeking(u64), +} + +#[derive(Debug, Clone, PartialEq, Eq)] +enum WriteState { + Idle, + WritingSizeHeader(u16), + WritingOutput(Box), + EncodingInput, + FinishingCompression, + WritingFinalSizeHeader(u64, u64), + SeekingToEnd(u64), +} + +fn poll_seek_helper( + writer: std::pin::Pin<&mut W>, + seek_state: &mut SeekState, + cx: &mut std::task::Context<'_>, + pos: u64, +) -> std::task::Poll> { + match *seek_state { + SeekState::Idle => { + writer.start_seek(std::io::SeekFrom::Start(pos))?; + *seek_state = SeekState::Seeking(pos); + Poll::Pending + } + SeekState::Seeking(target) if target == pos => { + let result = ready!(writer.poll_complete(cx))?; + *seek_state = SeekState::Idle; + Poll::Ready(Ok(result)) + } + SeekState::Seeking(old_target) => { + tracing::warn!( + "poll_seek({}) called while seeking to {}, canceling previous seek", + pos, + old_target + ); + writer.start_seek(std::io::SeekFrom::Start(pos))?; + *seek_state = SeekState::Seeking(pos); + Poll::Pending + } + SeekState::GettingPosition => { + tracing::warn!( + "poll_seek({}) called while getting stream position, canceling", + pos + ); + writer.start_seek(std::io::SeekFrom::Start(pos))?; + *seek_state = SeekState::Seeking(pos); + Poll::Pending + } + } +} + +fn poll_stream_position_helper( + writer: std::pin::Pin<&mut W>, + seek_state: &mut SeekState, + cx: &mut std::task::Context<'_>, +) -> std::task::Poll> { + match *seek_state { + SeekState::Idle => { + writer.start_seek(std::io::SeekFrom::Current(0))?; + *seek_state = SeekState::GettingPosition; + Poll::Pending + } + SeekState::GettingPosition => { + let result = ready!(writer.poll_complete(cx))?; + *seek_state = SeekState::Idle; + Poll::Ready(Ok(result)) + } + SeekState::Seeking(target) => { + tracing::warn!( + "poll_stream_position called while seeking to {}, canceling", + target + ); + writer.start_seek(std::io::SeekFrom::Current(0))?; + *seek_state = SeekState::GettingPosition; + Poll::Pending + } + } +} + +impl Write for MetadataBlocksWriter { fn write(&mut self, buf: &[u8]) -> std::io::Result { let n = buf.len().min(self.input.len() - self.size); self.input[self.size..self.size + n].copy_from_slice(&buf[..n]); + self.size += n; if n < buf.len() { self.flush()?; } Ok(n) } fn flush(&mut self) -> std::io::Result<()> { - if self.size > 0 { - if self.size_addr.is_none() { - self.size_addr = Some(self.writer.stream_position()?); - self.output.unwritten_mut()[..2].copy_from_slice(&[0; 2]); - self.output.advance(2); - } - if self.output.written().len() > self.output_flushed { - let n = self - .writer - .write(&self.output.written()[self.output_flushed..])?; - self.output_flushed += n; - } - if self.output.written().len() == self.output_flushed { - self.output_flushed = 0; - self.output.reset(); - } - if self.input_flushed < self.size { - if !self.output.unwritten().is_empty() { - let mut input = PartialBuffer::new(&self.input[self.input_flushed..self.size]); - self.zstd - .get_or_insert_with(|| ZstdEncoder::new(22)) - .encode(&mut input, &mut self.output)?; - self.input_flushed += input.written().len(); - } - } else { - if !self.output.unwritten().is_empty() { - if self.zstd.as_mut().unwrap().finish(&mut self.output)? { - self.zstd = None; + loop { + match self.write_state { + WriteState::Idle => { + if self.size == 0 { + return Ok(()); } + self.write_state = WriteState::WritingSizeHeader(0); } - if self.zstd.is_none() && self.output.written().len() == self.output_flushed { - self.output_flushed = 0; - self.output.reset(); - if let Some(addr) = self.size_addr { - let end_addr = if let Some(end_addr) = self.end_addr { - end_addr + + WriteState::WritingSizeHeader(size) => { + let done = if let Some(size_addr) = self.size_addr { + self.writer.seek(SeekFrom::Start(size_addr))?; + Some(size_addr + size as u64) + } else { + self.size_addr = Some(self.writer.stream_position()?); + None + }; + self.output.unwritten_mut()[..2].copy_from_slice(&u16::to_le_bytes(size)[..]); + self.output.advance(2); + self.write_state = + WriteState::WritingOutput(Box::new(if let Some(end) = done { + WriteState::SeekingToEnd(end) } else { - let end_addr = self.writer.stream_position()?; - self.end_addr = Some(end_addr); - end_addr - }; - self.writer.seek(std::io::SeekFrom::Start(addr))?; - self.output.unwritten_mut()[..2] - .copy_from_slice(&((end_addr - addr - 2) as u16).to_le_bytes()); - self.output.advance(2); - self.size_addr = None; + WriteState::EncodingInput + })); + } + + WriteState::WritingOutput(next) => { + if self.output.written().len() > self.output_flushed { + let n = self + .writer + .write(&self.output.written()[self.output_flushed..])?; + self.output_flushed += n; + } else { + self.output.reset(); + self.output_flushed = 0; + self.write_state = *next; } - if let Some(end_addr) = self.end_addr { - self.writer.seek(std::io::SeekFrom::Start(end_addr))?; - self.end_addr = None; - self.input_flushed = 0; - self.size = 0; + } + + WriteState::EncodingInput => { + let encoder = self.zstd.get_or_insert_with(|| ZstdEncoder::new(22)); + let mut input = PartialBuffer::new(&self.input[..self.size]); + while !self.output.unwritten().is_empty() && !input.unwritten().is_empty() { + encoder.encode(&mut input, &mut self.output)?; } + while !encoder.flush(&mut self.output)? {} + while !encoder.finish(&mut self.output)? {} + if !self.output.unwritten().is_empty() { + let mut input = + PartialBuffer::new(&self.input[self.input_flushed..self.size]); + encoder.encode(&mut input, &mut self.output)?; + self.input_flushed += input.written().len(); + } + self.write_state = WriteState::WritingOutput(Box::new()); + continue; + } + + WriteState::FinishingCompression => { + if !self.output.unwritten().is_empty() { + if self.zstd.as_mut().unwrap().finish(&mut self.output)? { + self.zstd = None; + } + } + if self.output.written().len() > self.output_flushed { + self.write_state = WriteState::WritingOutput; + continue; + } + if self.zstd.is_none() && self.output.written().len() == self.output_flushed { + self.output_flushed = 0; + self.output.reset(); + let end_addr = self.writer.stream_position()?; + let size_addr = self.size_addr.ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::InvalidData, + "size_addr not set when finishing compression", + ) + })?; + self.write_state = WriteState::WritingFinalSizeHeader(size_addr, end_addr); + continue; + } + return Ok(()); + } + + WriteState::WritingFinalSizeHeader(size_addr, end_addr) => { + if self.output.written().len() > self.output_flushed { + let n = self + .writer + .write(&self.output.written()[self.output_flushed..])?; + self.output_flushed += n; + continue; + } + self.writer.seek(std::io::SeekFrom::Start(size_addr))?; + self.output.unwritten_mut()[..2] + .copy_from_slice(&((end_addr - size_addr - 2) as u16).to_le_bytes()); + self.output.advance(2); + let n = self.writer.write(&self.output.written())?; + self.output_flushed = n; + if n == 2 { + self.output_flushed = 0; + self.output.reset(); + self.write_state = WriteState::SeekingToEnd(end_addr); + } + continue; + } + + WriteState::SeekingToEnd(end_addr) => { + self.writer.seek(std::io::SeekFrom::Start(end_addr))?; + self.input_flushed = 0; + self.size = 0; + self.size_addr = None; + self.write_state = WriteState::Idle; + return Ok(()); } } } - Ok(()) } } -impl AsyncWrite for MetadataBlocks { + +impl AsyncWrite for MetadataBlocksWriter { fn poll_write( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, @@ -189,6 +334,7 @@ impl AsyncWrite for MetadataBlocks { let this = self.as_mut().project(); let n = buf.len().min(this.input.len() - *this.size); this.input[*this.size..*this.size + n].copy_from_slice(&buf[..n]); + *this.size += n; if n < buf.len() { ready!(self.poll_flush(cx)?); } @@ -199,70 +345,826 @@ impl AsyncWrite for MetadataBlocks { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - // let this = self.as_mut(); - // if self.size > 0 { - // if self.size_addr.is_none() { - // self.size_addr = Some(self.writer.stream_position()?); - // self.output.unwritten_mut()[..2].copy_from_slice(&[0; 2]); - // self.output.advance(2); - // } - // if self.output.written().len() > self.output_flushed { - // let n = self - // .writer - // .write(&self.output.written()[self.output_flushed..])?; - // self.output_flushed += n; - // } - // if self.output.written().len() == self.output_flushed { - // self.output_flushed = 0; - // self.output.reset(); - // } - // if self.input_flushed < self.size { - // if !self.output.unwritten().is_empty() { - // let mut input = PartialBuffer::new(&self.input[self.input_flushed..self.size]); - // self.zstd - // .get_or_insert_with(|| ZstdEncoder::new(22)) - // .encode(&mut input, &mut self.output)?; - // self.input_flushed += input.written().len(); - // } - // } else { - // if !self.output.unwritten().is_empty() { - // if self.zstd.as_mut().unwrap().finish(&mut self.output)? { - // self.zstd = None; - // } - // } - // if self.zstd.is_none() && self.output.written().len() == self.output_flushed { - // self.output_flushed = 0; - // self.output.reset(); - // if let Some(addr) = self.size_addr { - // let end_addr = if let Some(end_addr) = self.end_addr { - // end_addr - // } else { - // let end_addr = self.writer.stream_position()?; - // self.end_addr = Some(end_addr); - // end_addr - // }; - // self.writer.seek(std::io::SeekFrom::Start(addr))?; - // self.output.unwritten_mut()[..2] - // .copy_from_slice(&((end_addr - addr - 2) as u16).to_le_bytes()); - // self.output.advance(2); - // self.size_addr = None; - // } - // if let Some(end_addr) = self.end_addr { - // self.writer.seek(std::io::SeekFrom::Start(end_addr))?; - // self.end_addr = None; - // self.input_flushed = 0; - // self.size = 0; - // } - // } - // } - // } - Poll::Ready(Ok(())) + loop { + let mut this = self.as_mut().project(); + match *this.write_state { + WriteState::Idle => { + if *this.size == 0 { + return Poll::Ready(Ok(())); + } + if this.size_addr.is_none() { + let pos = ready!(poll_stream_position_helper( + this.writer.as_mut(), + this.seek_state, + cx + ))?; + *this.size_addr = Some(pos); + this.output.unwritten_mut()[..2].copy_from_slice(&[0; 2]); + this.output.advance(2); + } + *this.write_state = WriteState::WritingOutput; + continue; + } + + WriteState::WritingOutput => { + if this.output.written().len() > *this.output_flushed { + let n = ready!( + this.writer + .as_mut() + .poll_write(cx, &this.output.written()[*this.output_flushed..]) + )?; + *this.output_flushed += n; + continue; + } + if this.output.written().len() == *this.output_flushed { + *this.output_flushed = 0; + this.output.reset(); + } + if *this.input_flushed < *this.size { + if !this.output.unwritten().is_empty() { + let mut input = + PartialBuffer::new(&this.input[*this.input_flushed..*this.size]); + this.zstd + .get_or_insert_with(|| ZstdEncoder::new(22)) + .encode(&mut input, this.output)?; + *this.input_flushed += input.written().len(); + } + continue; + } else { + if !this.output.unwritten().is_empty() { + if this.zstd.as_mut().unwrap().finish(this.output)? { + *this.zstd = None; + } + continue; + } + if this.zstd.is_none() + && this.output.written().len() == *this.output_flushed + { + *this.output_flushed = 0; + this.output.reset(); + if let Some(size_addr) = *this.size_addr { + let end_addr = ready!(poll_stream_position_helper( + this.writer.as_mut(), + this.seek_state, + cx + ))?; + *this.write_state = + WriteState::WritingFinalSizeHeader(size_addr, end_addr); + ready!(poll_seek_helper( + this.writer.as_mut(), + this.seek_state, + cx, + size_addr + ))?; + this.output.unwritten_mut()[..2].copy_from_slice( + &((end_addr - size_addr - 2) as u16).to_le_bytes(), + ); + this.output.advance(2); + continue; + } + } + } + return Poll::Ready(Ok(())); + } + + WriteState::WritingSizeHeader(_size_addr) => { + *this.write_state = WriteState::WritingOutput; + continue; + } + + WriteState::EncodingInput => { + *this.write_state = WriteState::WritingOutput; + continue; + } + + WriteState::FinishingCompression => { + *this.write_state = WriteState::WritingOutput; + continue; + } + + WriteState::WritingFinalSizeHeader(_size_addr, end_addr) => { + if this.output.written().len() > *this.output_flushed { + let n = ready!( + this.writer + .as_mut() + .poll_write(cx, &this.output.written()[*this.output_flushed..]) + )?; + *this.output_flushed += n; + continue; + } + *this.output_flushed = 0; + this.output.reset(); + *this.write_state = WriteState::SeekingToEnd(end_addr); + continue; + } + + WriteState::SeekingToEnd(end_addr) => { + ready!(poll_seek_helper( + this.writer.as_mut(), + this.seek_state, + cx, + end_addr + ))?; + *this.size_addr = None; + *this.input_flushed = 0; + *this.size = 0; + *this.write_state = WriteState::Idle; + return Poll::Ready(Ok(())); + } + } + } } fn poll_shutdown( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - std::task::Poll::Ready(Ok(())) + self.poll_flush(cx) + } +} + +impl MetadataBlocksWriter { + pub fn new(writer: W) -> Self { + Self { + input: [0; 8192], + input_flushed: 0, + size: 0, + size_addr: None, + output: PartialBuffer::new([0; 4096]), + output_flushed: 0, + zstd: None, + seek_state: SeekState::Idle, + write_state: WriteState::Idle, + writer, + } + } +} + +use async_compression::codecs::{Decode, ZstdDecoder}; +use tokio::io::AsyncRead; + +#[pin_project::pin_project] +pub struct MetadataBlocksReader { + #[pin] + reader: R, + size_buf: [u8; 2], + size_bytes_read: usize, + compressed: [u8; 8192], + compressed_size: usize, + compressed_pos: usize, + output: PartialBuffer<[u8; 8192]>, + output_pos: usize, + zstd: Option, + state: ReadState, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum ReadState { + ReadingSize, + ReadingData, + Decompressing, + Outputting, + Eof, +} + +impl MetadataBlocksReader { + pub fn new(reader: R) -> Self { + Self { + reader, + size_buf: [0; 2], + size_bytes_read: 0, + compressed: [0; 8192], + compressed_size: 0, + compressed_pos: 0, + output: PartialBuffer::new([0; 8192]), + output_pos: 0, + zstd: None, + state: ReadState::ReadingSize, + } + } +} + +use std::io::Read; + +impl Read for MetadataBlocksReader { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + loop { + match self.state { + ReadState::ReadingSize => { + let n = self + .reader + .read(&mut self.size_buf[self.size_bytes_read..])?; + if n == 0 { + if self.size_bytes_read == 0 { + self.state = ReadState::Eof; + return Ok(0); + } else { + return Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "Unexpected EOF reading size header", + )); + } + } + + self.size_bytes_read += n; + if self.size_bytes_read < 2 { + continue; + } + + let size_header = u16::from_le_bytes(self.size_buf); + let is_compressed = (size_header & 0x8000) == 0; + let size = (size_header & 0x7FFF) as usize; + + if !is_compressed { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Uncompressed metadata blocks not supported", + )); + } + + if size == 0 || size > 8192 { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("Invalid metadata block size: {}", size), + )); + } + + self.compressed_size = size; + self.compressed_pos = 0; + self.size_bytes_read = 0; + self.state = ReadState::ReadingData; + continue; + } + + ReadState::ReadingData => { + let n = self + .reader + .read(&mut self.compressed[self.compressed_pos..self.compressed_size])?; + if n == 0 { + return Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "Unexpected EOF reading compressed data", + )); + } + + self.compressed_pos += n; + if self.compressed_pos < self.compressed_size { + continue; + } + + self.zstd = Some(ZstdDecoder::new()); + self.output_pos = 0; + self.output.reset(); + self.state = ReadState::Decompressing; + continue; + } + + ReadState::Decompressing => { + if self.output.unwritten().is_empty() { + self.state = ReadState::Outputting; + continue; + } + + let mut input = PartialBuffer::new(&self.compressed[..self.compressed_size]); + let decoder = self.zstd.as_mut().unwrap(); + + if decoder.decode(&mut input, &mut self.output)? { + self.zstd = None; + self.state = ReadState::Outputting; + } + continue; + } + + ReadState::Outputting => { + let available = self.output.written().len() - self.output_pos; + if available == 0 { + if self.zstd.is_none() { + self.state = ReadState::ReadingSize; + continue; + } else { + self.output.reset(); + self.output_pos = 0; + self.state = ReadState::Decompressing; + continue; + } + } + + let to_copy = available.min(buf.len()); + buf[..to_copy].copy_from_slice( + &self.output.written()[self.output_pos..self.output_pos + to_copy], + ); + self.output_pos += to_copy; + return Ok(to_copy); + } + + ReadState::Eof => { + return Ok(0); + } + } + } + } +} + +impl AsyncRead for MetadataBlocksReader { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + loop { + let mut this = self.as_mut().project(); + + match *this.state { + ReadState::ReadingSize => { + let mut read_buf = + tokio::io::ReadBuf::new(&mut this.size_buf[*this.size_bytes_read..]); + ready!(this.reader.as_mut().poll_read(cx, &mut read_buf))?; + + let n = read_buf.filled().len(); + if n == 0 { + if *this.size_bytes_read == 0 { + *this.state = ReadState::Eof; + return Poll::Ready(Ok(())); + } else { + return Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "Unexpected EOF reading size header", + ))); + } + } + + *this.size_bytes_read += n; + if *this.size_bytes_read < 2 { + continue; + } + + let size_header = u16::from_le_bytes(*this.size_buf); + let is_compressed = (size_header & 0x8000) == 0; + let size = (size_header & 0x7FFF) as usize; + + if !is_compressed { + return Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Uncompressed metadata blocks not supported", + ))); + } + + if size == 0 || size > 8192 { + return Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("Invalid metadata block size: {}", size), + ))); + } + + *this.compressed_size = size; + *this.compressed_pos = 0; + *this.size_bytes_read = 0; + *this.state = ReadState::ReadingData; + continue; + } + + ReadState::ReadingData => { + let mut read_buf = tokio::io::ReadBuf::new( + &mut this.compressed[*this.compressed_pos..*this.compressed_size], + ); + ready!(this.reader.as_mut().poll_read(cx, &mut read_buf))?; + + let n = read_buf.filled().len(); + if n == 0 { + return Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "Unexpected EOF reading compressed data", + ))); + } + + *this.compressed_pos += n; + if *this.compressed_pos < *this.compressed_size { + continue; + } + + *this.zstd = Some(ZstdDecoder::new()); + *this.output_pos = 0; + this.output.reset(); + *this.state = ReadState::Decompressing; + continue; + } + + ReadState::Decompressing => { + if this.output.unwritten().is_empty() { + *this.state = ReadState::Outputting; + continue; + } + + let mut input = PartialBuffer::new(&this.compressed[..*this.compressed_size]); + let decoder = this.zstd.as_mut().unwrap(); + + if decoder.decode(&mut input, this.output)? { + *this.zstd = None; + *this.state = ReadState::Outputting; + } + continue; + } + + ReadState::Outputting => { + let available = this.output.written().len() - *this.output_pos; + if available == 0 { + if this.zstd.is_none() { + *this.state = ReadState::ReadingSize; + continue; + } else { + this.output.reset(); + *this.output_pos = 0; + *this.state = ReadState::Decompressing; + continue; + } + } + + let to_copy = available.min(buf.remaining()); + buf.put_slice( + &this.output.written()[*this.output_pos..*this.output_pos + to_copy], + ); + *this.output_pos += to_copy; + return Poll::Ready(Ok(())); + } + + ReadState::Eof => { + return Poll::Ready(Ok(())); + } + } + } + } +} + +#[cfg(test)] +mod tests { + use std::io::{Cursor, Seek, SeekFrom}; + + use proptest::prelude::*; + + use super::*; + + #[test] + fn test_sync_roundtrip_empty() { + use std::io::{Read, Write}; + let mut buffer = Cursor::new(Vec::new()); + let mut writer = MetadataBlocksWriter::new(&mut buffer); + writer.flush().unwrap(); + + buffer.seek(SeekFrom::Start(0)).unwrap(); + let mut reader = MetadataBlocksReader::new(buffer); + let mut output = Vec::new(); + reader.read_to_end(&mut output).unwrap(); + + assert_eq!(output, Vec::::new()); + } + + #[test] + fn test_sync_roundtrip_small() { + use std::io::{Read, Write}; + let input = b"Hello, World!"; + let mut buffer = Cursor::new(Vec::new()); + let mut writer = MetadataBlocksWriter::new(&mut buffer); + writer.write_all(input).unwrap(); + writer.flush().unwrap(); + + buffer.seek(SeekFrom::Start(0)).unwrap(); + let mut reader = MetadataBlocksReader::new(buffer); + let mut output = Vec::new(); + reader.read_to_end(&mut output).unwrap(); + + assert_eq!(output, input); + } + + #[test] + fn test_sync_roundtrip_exact_block_size() { + use std::io::{Read, Write}; + let input = vec![0x42u8; 8192]; + let mut buffer = Cursor::new(Vec::new()); + let mut writer = MetadataBlocksWriter::new(&mut buffer); + writer.write_all(&input).unwrap(); + writer.flush().unwrap(); + + buffer.seek(SeekFrom::Start(0)).unwrap(); + let mut reader = MetadataBlocksReader::new(buffer); + let mut output = Vec::new(); + reader.read_to_end(&mut output).unwrap(); + + assert_eq!(output, input); + } + + #[test] + fn test_sync_roundtrip_larger_than_block() { + use std::io::{Read, Write}; + let input = vec![0x55u8; 16384]; + let mut buffer = Cursor::new(Vec::new()); + let mut writer = MetadataBlocksWriter::new(&mut buffer); + writer.write_all(&input).unwrap(); + writer.flush().unwrap(); + + buffer.seek(SeekFrom::Start(0)).unwrap(); + let mut reader = MetadataBlocksReader::new(buffer); + let mut output = Vec::new(); + reader.read_to_end(&mut output).unwrap(); + + assert_eq!(output, input); + } + + #[test] + fn test_sync_roundtrip_multiple_blocks() { + use std::io::{Read, Write}; + let input = vec![0xAAu8; 24576]; + let mut buffer = Cursor::new(Vec::new()); + let mut writer = MetadataBlocksWriter::new(&mut buffer); + writer.write_all(&input).unwrap(); + writer.flush().unwrap(); + + buffer.seek(SeekFrom::Start(0)).unwrap(); + let mut reader = MetadataBlocksReader::new(buffer); + let mut output = Vec::new(); + reader.read_to_end(&mut output).unwrap(); + + assert_eq!(output, input); + } + + #[test] + fn test_sync_roundtrip_incremental_writes() { + use std::io::{Read, Write}; + let input = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + let mut buffer = Cursor::new(Vec::new()); + let mut writer = MetadataBlocksWriter::new(&mut buffer); + + for chunk in input.chunks(5) { + writer.write_all(chunk).unwrap(); + } + writer.flush().unwrap(); + + buffer.seek(SeekFrom::Start(0)).unwrap(); + let mut reader = MetadataBlocksReader::new(buffer); + let mut output = Vec::new(); + reader.read_to_end(&mut output).unwrap(); + + assert_eq!(output, input); + } + + #[tokio::test] + async fn test_async_roundtrip_empty() { + use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; + let mut buffer = Cursor::new(Vec::new()); + let mut writer = MetadataBlocksWriter::new(&mut buffer); + AsyncWriteExt::flush(&mut writer).await.unwrap(); + + AsyncSeekExt::seek(&mut buffer, SeekFrom::Start(0)) + .await + .unwrap(); + let mut reader = MetadataBlocksReader::new(buffer); + let mut output = Vec::new(); + AsyncReadExt::read_to_end(&mut reader, &mut output) + .await + .unwrap(); + + assert_eq!(output, Vec::::new()); + } + + #[tokio::test] + async fn test_async_roundtrip_small() { + use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; + let input = b"Hello, World!"; + let mut buffer = Cursor::new(Vec::new()); + let mut writer = MetadataBlocksWriter::new(&mut buffer); + AsyncWriteExt::write_all(&mut writer, input).await.unwrap(); + AsyncWriteExt::flush(&mut writer).await.unwrap(); + + AsyncSeekExt::seek(&mut buffer, SeekFrom::Start(0)) + .await + .unwrap(); + let mut reader = MetadataBlocksReader::new(buffer); + let mut output = Vec::new(); + AsyncReadExt::read_to_end(&mut reader, &mut output) + .await + .unwrap(); + + assert_eq!(output, input); + } + + #[tokio::test] + async fn test_async_roundtrip_exact_block_size() { + use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; + let input = vec![0x42u8; 8192]; + let mut buffer = Cursor::new(Vec::new()); + let mut writer = MetadataBlocksWriter::new(&mut buffer); + AsyncWriteExt::write_all(&mut writer, &input).await.unwrap(); + AsyncWriteExt::flush(&mut writer).await.unwrap(); + + AsyncSeekExt::seek(&mut buffer, SeekFrom::Start(0)) + .await + .unwrap(); + let mut reader = MetadataBlocksReader::new(buffer); + let mut output = Vec::new(); + AsyncReadExt::read_to_end(&mut reader, &mut output) + .await + .unwrap(); + + assert_eq!(output, input); + } + + #[tokio::test] + async fn test_async_roundtrip_larger_than_block() { + use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; + let input = vec![0x55u8; 16384]; + let mut buffer = Cursor::new(Vec::new()); + let mut writer = MetadataBlocksWriter::new(&mut buffer); + AsyncWriteExt::write_all(&mut writer, &input).await.unwrap(); + AsyncWriteExt::flush(&mut writer).await.unwrap(); + + AsyncSeekExt::seek(&mut buffer, SeekFrom::Start(0)) + .await + .unwrap(); + let mut reader = MetadataBlocksReader::new(buffer); + let mut output = Vec::new(); + AsyncReadExt::read_to_end(&mut reader, &mut output) + .await + .unwrap(); + + assert_eq!(output, input); + } + + #[tokio::test] + async fn test_async_roundtrip_multiple_blocks() { + use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; + let input = vec![0xAAu8; 24576]; + let mut buffer = Cursor::new(Vec::new()); + let mut writer = MetadataBlocksWriter::new(&mut buffer); + AsyncWriteExt::write_all(&mut writer, &input).await.unwrap(); + AsyncWriteExt::flush(&mut writer).await.unwrap(); + + AsyncSeekExt::seek(&mut buffer, SeekFrom::Start(0)) + .await + .unwrap(); + let mut reader = MetadataBlocksReader::new(buffer); + let mut output = Vec::new(); + AsyncReadExt::read_to_end(&mut reader, &mut output) + .await + .unwrap(); + + assert_eq!(output, input); + } + + proptest! { + #[test] + fn test_sync_roundtrip_proptest(input in prop::collection::vec(any::(), 0..50000)) { + use std::io::{Read, Write}; + let mut buffer = Cursor::new(Vec::new()); + let mut writer = MetadataBlocksWriter::new(&mut buffer); + writer.write_all(&input).unwrap(); + writer.flush().unwrap(); + + buffer.seek(SeekFrom::Start(0)).unwrap(); + let mut reader = MetadataBlocksReader::new(buffer); + let mut output = Vec::new(); + reader.read_to_end(&mut output).unwrap(); + + prop_assert_eq!(output, input); + } + + #[test] + fn test_sync_roundtrip_chunked_writes( + input in prop::collection::vec(any::(), 0..50000), + chunk_size in 1usize..1000 + ) { + use std::io::{Read, Write}; + let mut buffer = Cursor::new(Vec::new()); + let mut writer = MetadataBlocksWriter::new(&mut buffer); + + for chunk in input.chunks(chunk_size) { + writer.write_all(chunk).unwrap(); + } + writer.flush().unwrap(); + + buffer.seek(SeekFrom::Start(0)).unwrap(); + let mut reader = MetadataBlocksReader::new(buffer); + let mut output = Vec::new(); + reader.read_to_end(&mut output).unwrap(); + + prop_assert_eq!(output, input); + } + + #[test] + fn test_sync_roundtrip_chunked_reads( + input in prop::collection::vec(any::(), 0..50000), + chunk_size in 1usize..1000 + ) { + use std::io::{Read, Write}; + let mut buffer = Cursor::new(Vec::new()); + let mut writer = MetadataBlocksWriter::new(&mut buffer); + writer.write_all(&input).unwrap(); + writer.flush().unwrap(); + + buffer.seek(SeekFrom::Start(0)).unwrap(); + let mut reader = MetadataBlocksReader::new(buffer); + let mut output = Vec::new(); + let mut chunk = vec![0u8; chunk_size]; + + loop { + let n = reader.read(&mut chunk).unwrap(); + if n == 0 { + break; + } + output.extend_from_slice(&chunk[..n]); + } + + prop_assert_eq!(output, input); + } + } + + #[test] + fn test_sync_multiple_flush_cycles() { + use std::io::{Read, Write}; + let input1 = b"First block of data"; + let input2 = b"Second block of data"; + let input3 = b"Third block of data"; + + let mut buffer = Cursor::new(Vec::new()); + let mut writer = MetadataBlocksWriter::new(&mut buffer); + + writer.write_all(input1).unwrap(); + writer.flush().unwrap(); + + writer.write_all(input2).unwrap(); + writer.flush().unwrap(); + + writer.write_all(input3).unwrap(); + writer.flush().unwrap(); + + buffer.seek(SeekFrom::Start(0)).unwrap(); + let mut reader = MetadataBlocksReader::new(buffer); + let mut output = Vec::new(); + reader.read_to_end(&mut output).unwrap(); + + let expected: Vec = input1 + .iter() + .chain(input2.iter()) + .chain(input3.iter()) + .copied() + .collect(); + assert_eq!(output, expected); + } + + #[tokio::test] + async fn test_async_multiple_flush_cycles() { + use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; + let input1 = b"First block of data"; + let input2 = b"Second block of data"; + let input3 = b"Third block of data"; + + let mut buffer = Cursor::new(Vec::new()); + let mut writer = MetadataBlocksWriter::new(&mut buffer); + + AsyncWriteExt::write_all(&mut writer, input1).await.unwrap(); + AsyncWriteExt::flush(&mut writer).await.unwrap(); + + AsyncWriteExt::write_all(&mut writer, input2).await.unwrap(); + AsyncWriteExt::flush(&mut writer).await.unwrap(); + + AsyncWriteExt::write_all(&mut writer, input3).await.unwrap(); + AsyncWriteExt::flush(&mut writer).await.unwrap(); + + AsyncSeekExt::seek(&mut buffer, SeekFrom::Start(0)) + .await + .unwrap(); + let mut reader = MetadataBlocksReader::new(buffer); + let mut output = Vec::new(); + AsyncReadExt::read_to_end(&mut reader, &mut output) + .await + .unwrap(); + + let expected: Vec = input1 + .iter() + .chain(input2.iter()) + .chain(input3.iter()) + .copied() + .collect(); + assert_eq!(output, expected); + } + + #[test] + fn test_sync_size_header_format() { + use std::io::{Read, Write}; + let input = b"test"; + let mut buffer = Cursor::new(Vec::new()); + let mut writer = MetadataBlocksWriter::new(&mut buffer); + writer.write_all(input).unwrap(); + writer.flush().unwrap(); + + buffer.seek(SeekFrom::Start(0)).unwrap(); + let mut size_header = [0u8; 2]; + buffer.read_exact(&mut size_header).unwrap(); + + let size_value = u16::from_le_bytes(size_header); + let is_compressed = (size_value & 0x8000) == 0; + let size = size_value & 0x7FFF; + + assert!(is_compressed, "Data should be compressed"); + assert!(size > 0, "Compressed size should be greater than 0"); + assert!(size <= 8192, "Compressed size should not exceed 8192"); } } diff --git a/image-recipe/build.sh b/image-recipe/build.sh index 4b7c129c7..179ca779f 100755 --- a/image-recipe/build.sh +++ b/image-recipe/build.sh @@ -1,7 +1,7 @@ #!/bin/bash set -e -MAX_IMG_SECTORS=7217792 # 4GB +MAX_IMG_LEN=$((4 * 1024 * 1024 * 1024)) # 4GB echo "==== StartOS Image Build ====" @@ -129,6 +129,7 @@ EOT if [ "${IB_TARGET_PLATFORM}" = "raspberrypi" ]; then mkdir -p config/includes.chroot git clone --depth=1 --branch=stable https://github.com/raspberrypi/rpi-firmware.git config/includes.chroot/boot + rm -rf config/includes.chroot/boot/.git config/includes.chroot/boot/modules rsync -rLp $SOURCE_DIR/raspberrypi/squashfs/ config/includes.chroot/ fi @@ -260,14 +261,16 @@ if [ "${IMAGE_TYPE}" = iso ]; then elif [ "${IMAGE_TYPE}" = img ]; then - BOOT_START=2048 - BOOT_END=526335 - ROOT_START=526336 - ROOT_PART_END=$MAX_IMG_SECTORS + SECTOR_LEN=512 + BOOT_START=$((1024 * 1024)) # 1MiB + BOOT_LEN=$((512 * 1024 * 1024)) # 512MiB + BOOT_END=$((BOOT_START + BOOT_LEN - 1)) + ROOT_START=$((BOOT_END + 1)) + ROOT_LEN=$((MAX_IMG_LEN - ROOT_START)) + ROOT_END=$((MAX_IMG_LEN - 1)) TARGET_NAME=$prep_results_dir/${IMAGE_BASENAME}.img - TARGET_SIZE=$[($ROOT_PART_END+1)*512] - truncate -s $TARGET_SIZE $TARGET_NAME + truncate -s $MAX_IMG_LEN $TARGET_NAME sfdisk $TARGET_NAME <<-EOF label: dos @@ -275,17 +278,12 @@ elif [ "${IMAGE_TYPE}" = img ]; then unit: sectors sector-size: 512 - ${TARGET_NAME}1 : start=$BOOT_START, size=$((BOOT_END-BOOT_START+1)), type=c, bootable - ${TARGET_NAME}2 : start=$ROOT_START, size=$((ROOT_PART_END-ROOT_START+1)), type=83 + ${TARGET_NAME}1 : start=$((BOOT_START / SECTOR_LEN)), size=$((BOOT_LEN / SECTOR_LEN)), type=c, bootable + ${TARGET_NAME}2 : start=$((ROOT_START / SECTOR_LEN)), size=$((ROOT_LEN / SECTOR_LEN)), type=83 EOF - BOOT_OFFSET=$((BOOT_START * 512)) - BOOT_SIZE=$(((BOOT_END - BOOT_START + 1) * 512)) - ROOT_OFFSET=$((ROOT_START * 512)) - ROOT_SIZE=$(((ROOT_PART_END - ROOT_START + 1) * 512)) - - BOOT_DEV=$(losetup --show -f --offset $BOOT_OFFSET --sizelimit $BOOT_SIZE $TARGET_NAME) - ROOT_DEV=$(losetup --show -f --offset $ROOT_OFFSET --sizelimit $ROOT_SIZE $TARGET_NAME) + BOOT_DEV=$(losetup --show -f --offset $BOOT_START --sizelimit $BOOT_LEN $TARGET_NAME) + ROOT_DEV=$(losetup --show -f --offset $ROOT_START --sizelimit $ROOT_LEN $TARGET_NAME) mkfs.vfat -F32 $BOOT_DEV mkfs.ext4 $ROOT_DEV @@ -324,7 +322,7 @@ elif [ "${IMAGE_TYPE}" = img ]; then BLOCK_COUNT=$(dumpe2fs -h $ROOT_DEV | awk '/^Block count:/ { print $3 }') BLOCK_SIZE=$(dumpe2fs -h $ROOT_DEV | awk '/^Block size:/ { print $3 }') - SECTOR_LEN=$[$BLOCK_COUNT*$BLOCK_SIZE/512] + ROOT_LEN=$((BLOCK_COUNT * BLOCK_SIZE)) losetup -d $ROOT_DEV losetup -d $BOOT_DEV @@ -336,12 +334,11 @@ elif [ "${IMAGE_TYPE}" = img ]; then unit: sectors sector-size: 512 - ${TARGET_NAME}1 : start=$BOOT_START, size=$((BOOT_END-BOOT_START+1)), type=c, bootable - ${TARGET_NAME}2 : start=$ROOT_START, size=$SECTOR_LEN, type=83 + ${TARGET_NAME}1 : start=$((BOOT_START / SECTOR_LEN)), size=$((BOOT_LEN / SECTOR_LEN)), type=c, bootable + ${TARGET_NAME}2 : start=$((ROOT_START / SECTOR_LEN)), size=$((ROOT_LEN / SECTOR_LEN)), type=83 EOF - ROOT_PART_END=$[$ROOT_START+$SECTOR_LEN] - TARGET_SIZE=$[($ROOT_PART_END+1)*512] + TARGET_SIZE=$((ROOT_START + ROOT_LEN)) truncate -s $TARGET_SIZE $TARGET_NAME mv $TARGET_NAME $RESULTS_DIR/$IMAGE_BASENAME.img