Skip to content

Commit

Permalink
Ready to run wal one by one
Browse files Browse the repository at this point in the history
  • Loading branch information
namse committed Nov 6, 2024
1 parent dbbaf6f commit 716d6f7
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 117 deletions.
18 changes: 8 additions & 10 deletions luda-editor/new-server/bptree/src/id_set/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,18 @@ impl Backend {
) -> Result<()> {
let path = path.as_ref();

let mut wal = Wal::open(path.with_extension("wal"))?;

let mut file = std::fs::OpenOptions::new()
.write(true)
.read(true)
.create(true)
.truncate(false)
.open(path)?;

wal.execute(&mut file, true)?;
let mut wal = Wal::open(path.with_extension("wal"), &mut file)?;

if file.metadata()?.len() == 0 {
wal.write_init()?;
wal.execute(&mut file, false)?;
wal.execute_one(&mut file)?;
}

let this = Self { file, wal, cache };
Expand Down Expand Up @@ -143,24 +141,24 @@ impl Backend {
let mut sleep_time = Duration::from_millis(100);
let mut retrial = 0;

while let Err(flush_error) = self.wal.execute(&mut self.file, false) {
let is_corrupted = flush_error.is_corrupted();
while let Err(execute_error) = self.wal.execute_one(&mut self.file) {
let is_corrupted = execute_error.is_corrupted();
if is_corrupted {
unreachable!(
"WAL File did fsync on write but corrupted! error: {:?}",
flush_error
execute_error
);
}
retrial += 1;

if retrial > 10 {
unreachable!(
"Too many retrial on wal flush. last error: {:?}",
flush_error
"Too many retrial on wal execute. last error: {:?}",
execute_error
);
}

eprintln!("Error on wal flush: {:?}", flush_error);
eprintln!("Error on wal execute: {:?}", execute_error);
tokio::time::sleep(sleep_time).await;
sleep_time = (sleep_time * 2).max(Duration::from_secs(4));
}
Expand Down
246 changes: 139 additions & 107 deletions luda-editor/new-server/bptree/src/id_set/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,138 +12,157 @@ use std::{
fs::{File, OpenOptions},
io::{BufReader, BufWriter, ErrorKind, Read, Seek, SeekFrom, Write},
mem::MaybeUninit,
sync::Arc,
};

pub struct Wal {
buf_writer: BufWriter<File>,
dirty: bool,
writer: BufWriter<Arc<File>>,
reader: BufReader<Arc<File>>,
read_offset: u64,
write_offset: u64,
written: u64,
}

impl Wal {
pub(crate) fn open(path: std::path::PathBuf) -> Result<Self> {
let file = OpenOptions::new()
.create(true)
.read(true)
.write(true)
.truncate(false)
.open(path)?;

Ok(Self {
dirty: file.metadata()?.len() != 0,
buf_writer: BufWriter::new(file),
})
}
pub(crate) fn open(path: std::path::PathBuf, file: &mut File) -> Result<Self, ExecuteError> {
let wal_file = Arc::new(
OpenOptions::new()
.create(true)
.read(true)
.write(true)
.truncate(false)
.open(path)?,
);

let wal_file_len = wal_file.metadata()?.len();

let mut this = Self {
read_offset: 0,
write_offset: 0,
written: 0,
writer: BufWriter::new(wal_file.clone()),
reader: BufReader::new(wal_file),
};

fn file(&self) -> &File {
self.buf_writer.get_ref()
}
if wal_file_len == 0 {
return Ok(this);
}

pub(crate) fn execute(
&mut self,
file: &mut File,
ignore_corruption: bool,
) -> Result<(), FlushError> {
if !self.dirty {
return Ok(());
this.write_offset = wal_file_len;
while this.write_offset > 0 {
match this.execute_one(file) {
Ok(_) => continue,
Err(err) => {
if err.is_corrupted() {
this.reset()?;
break;
} else {
return Err(err);
}
}
};
}

let wal_file_len = self.file().metadata()?.len();
if wal_file_len == 0 {
Ok(this)
}
pub(crate) fn execute_one(&mut self, file: &mut File) -> Result<(), ExecuteError> {
self.reset_if_need()?;

if self.read_offset == self.write_offset {
return Ok(());
}

let mut reader = BufReader::new(self.file());
reader.seek(SeekFrom::Start(0))?;
self.reader.seek(SeekFrom::Start(self.read_offset))?;

let result: Result<(), FlushError> = (|| {
while reader.stream_position()? < wal_file_len {
let header = unsafe {
let mut header = MaybeUninit::<WalHeader>::uninit();
reader.read_exact(std::slice::from_raw_parts_mut(
header.as_mut_ptr() as *mut u8,
size_of::<WalHeader>(),
))?;
let header = unsafe {
let mut header = MaybeUninit::<WalHeader>::uninit();
self.reader.read_exact(std::slice::from_raw_parts_mut(
header.as_mut_ptr() as *mut u8,
size_of::<WalHeader>(),
))?;

header.assume_init()
};
header.assume_init()
};
match header.body_types {
// Init
0 => {
let root_node_offset = PageOffset::new(1);

match header.body_types {
// Init
0 => {
let root_node_offset = PageOffset::new(1);
let header = Header::new(PageOffset::NULL, root_node_offset, PageOffset::new(2));

let header =
Header::new(PageOffset::NULL, root_node_offset, PageOffset::new(2));
let root_node = LeafNode::new();

let root_node = LeafNode::new();
let mut bytes = Vec::with_capacity(size_of::<Header>() + size_of::<LeafNode>());
bytes.put_slice(header.as_slice());
bytes.put_slice(root_node.as_slice());

let mut bytes =
Vec::with_capacity(size_of::<Header>() + size_of::<LeafNode>());
bytes.put_slice(header.as_slice());
bytes.put_slice(root_node.as_slice());
file.set_len(0)?;
file.write_all(&bytes)?;
file.sync_all()?;
}
// PutPage
1 => {
let body = unsafe {
let mut body = MaybeUninit::<PutPage>::uninit();
self.reader.read_exact(std::slice::from_raw_parts_mut(
body.as_mut_ptr() as *mut u8,
header.body_length as usize,
))?;
body.assume_init()
};

file.set_len(0)?;
file.write_all(&bytes)?;
file.sync_all()?;
}
// PutPage
1 => {
let body = unsafe {
let mut body = MaybeUninit::<PutPage>::uninit();
reader.read_exact(std::slice::from_raw_parts_mut(
body.as_mut_ptr() as *mut u8,
header.body_length as usize,
))?;
body.assume_init()
};

let body_checksum = checksum(body.as_slice());
let bad_checksum = body_checksum != header.checksum;
if bad_checksum {
return Err(FlushError::Checksum {
expected: header.checksum,
actual: body_checksum,
});
}

file.seek(body.page_offset.file_pos())?;
file.write_all(body.page.as_slice())?;
}
body_type => {
return Err(FlushError::WrongBodyType { body_type });
}
let body_checksum = checksum(body.as_slice());
let bad_checksum = body_checksum != header.checksum;
if bad_checksum {
return Err(ExecuteError::Checksum {
expected: header.checksum,
actual: body_checksum,
});
}

file.seek(body.page_offset.file_pos())?;
file.write_all(body.page.as_slice())?;
}
body_type => {
return Err(ExecuteError::WrongBodyType { body_type });
}
}

Ok(())
})();
self.read_offset += size_of::<WalHeader>() as u64 + header.body_length as u64;

if let Err(err) = result {
let corrupted = match &err {
FlushError::Io(error) => error.kind() != ErrorKind::UnexpectedEof,
FlushError::Checksum { .. } => true,
FlushError::WrongBodyType { .. } => true,
};
if corrupted && !ignore_corruption {
return Err(err);
}
};
if self.read_offset == self.write_offset {
self.reset()?;
}

self.buf_writer.seek(SeekFrom::Start(0))?;
Ok(())
}

fn reset_if_need(&mut self) -> std::io::Result<()> {
if self.read_offset == self.write_offset && self.read_offset != 0 {
self.reset()?;
}
Ok(())
}

fn reset(&mut self) -> std::io::Result<()> {
self.file().set_len(0)?;
self.file().sync_all()?;
self.dirty = false;

self.read_offset = 0;
self.write_offset = 0;
Ok(())
}

pub(crate) fn write_init(&mut self) -> Result<()> {
self.start_write()?;
self.write_wal(Init)?;
self.sync_all()?;
Ok(())
}

pub(crate) fn update_pages(&mut self, pages: &BTreeMap<PageOffset, Page>) -> Result<()> {
self.start_write()?;

if pages.is_empty() {
return Ok(());
}
Expand All @@ -161,32 +180,45 @@ impl Wal {

Ok(())
}
fn write_wal<Body: WalBody>(&mut self, body: Body) -> Result<()> {
self.dirty = true;

fn file(&self) -> &File {
self.writer.get_ref()
}
fn write_wal<Body: WalBody>(&mut self, body: Body) -> Result<()> {
let body_bytes = body.as_slice();
let header = WalHeader {
checksum: checksum(body_bytes),
body_length: body_bytes.len() as u32,
body_types: Body::body_types(),
};

self.buf_writer.write_all(header.as_slice())?;
self.buf_writer.write_all(body_bytes)?;
self.writer.write_all(header.as_slice())?;
self.writer.write_all(body_bytes)?;

self.written += size_of::<WalHeader>() as u64 + body_bytes.len() as u64;

Ok(())
}

fn sync_all(&mut self) -> Result<()> {
self.buf_writer.flush()?;
self.writer.flush()?;
self.file().sync_all()?;
self.write_offset += self.written;

Ok(())
}

/// This must be called before writing to the file,
/// because the wal file would be corrupted on previous write with error.
fn start_write(&mut self) -> Result<()> {
self.writer.seek(SeekFrom::Start(self.write_offset))?;

Ok(())
}
}

#[derive(Debug)]
pub(crate) enum FlushError {
pub(crate) enum ExecuteError {
Io(std::io::Error),
#[allow(dead_code)]
Checksum {
Expand All @@ -198,26 +230,26 @@ pub(crate) enum FlushError {
body_type: u8,
},
}
impl FlushError {
impl ExecuteError {
pub(crate) fn is_corrupted(&self) -> bool {
match self {
FlushError::Io(error) => error.kind() == std::io::ErrorKind::UnexpectedEof,
FlushError::Checksum { .. } => true,
FlushError::WrongBodyType { .. } => true,
ExecuteError::Io(error) => error.kind() == ErrorKind::UnexpectedEof,
ExecuteError::Checksum { .. } => true,
ExecuteError::WrongBodyType { .. } => true,
}
}
}
impl From<std::io::Error> for FlushError {
impl From<std::io::Error> for ExecuteError {
fn from(err: std::io::Error) -> Self {
Self::Io(err)
}
}
impl Display for FlushError {
impl Display for ExecuteError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
impl std::error::Error for FlushError {}
impl std::error::Error for ExecuteError {}

#[repr(C)]
struct WalHeader {
Expand Down

0 comments on commit 716d6f7

Please sign in to comment.