From f9e75ce9511d15ad7ef5f7ed1b23da1529638d0e Mon Sep 17 00:00:00 2001 From: namse Date: Sun, 3 Nov 2024 05:13:22 +0000 Subject: [PATCH] Write into leaf node if not full --- .../src/document_store/nfs/bp_id_tree.rs | 207 ----------- .../src/document_store/nfs/bp_id_tree/mod.rs | 321 ++++++++++++++++++ .../src/document_store/nfs/bp_id_tree/wal.rs | 172 ++++++++++ .../src/document_store/nfs/db_thread.rs | 5 +- .../database/src/document_store/nfs/mod.rs | 16 +- 5 files changed, 505 insertions(+), 216 deletions(-) delete mode 100644 luda-editor/new-server/database/database/src/document_store/nfs/bp_id_tree.rs create mode 100644 luda-editor/new-server/database/database/src/document_store/nfs/bp_id_tree/mod.rs create mode 100644 luda-editor/new-server/database/database/src/document_store/nfs/bp_id_tree/wal.rs diff --git a/luda-editor/new-server/database/database/src/document_store/nfs/bp_id_tree.rs b/luda-editor/new-server/database/database/src/document_store/nfs/bp_id_tree.rs deleted file mode 100644 index fad366913..000000000 --- a/luda-editor/new-server/database/database/src/document_store/nfs/bp_id_tree.rs +++ /dev/null @@ -1,207 +0,0 @@ -//! # B+IdTree -//! -//! B+IdTree is a B+Tree implementation for storing 128bit Ids. -//! All node size is 4KB, which will be called a page. -//! Offset size is 4Byte, so the maximum item count is 268,435,456. -//! -//! u32::MAX will be used as a null. -//! Endian is little. -//! -//! ## File Structure -//! -//! ### Free Page Stack -//! -//! Linked List, storing free page's offset in the file. -//! - Next Node Offset: u32 -//! - Length in this page: u32 -//! - Free Page Offsets: [u32; 1022] -//! -//! ### Header -//! - Free Internal Node Stack Top Offset: u32 -//! - Root Node Offset: u32 - -use super::crc; -use bytes::{Buf, BufMut, Bytes}; -use std::{ - fs::File, - io::{Read, Seek, SeekFrom, Write}, - path::Path, -}; - -pub struct BpIdTree { - file: std::fs::File, - wal_file: std::fs::File, -} - -impl BpIdTree { - pub fn open(path: impl AsRef) -> std::io::Result { - let path = path.as_ref(); - - let mut wal_file = std::fs::OpenOptions::new() - .write(true) - .create(true) - .open(path.with_extension("wal"))?; - - let mut file = std::fs::OpenOptions::new() - .write(true) - .create(true) - .open(path)?; - - if file.metadata()?.len() == 0 { - init(&mut file, &mut wal_file)? - }; - - flush_wal(&mut file, &mut wal_file); - - Ok(Self { file, wal_file }) - } -} - -/// # Wal File -/// - Header -/// - Body Checksum: u64 -/// - Body Length: u32 -/// - Body types: u8 -/// - Body -/// - Init: type 0 -/// - nothing -/// -#[repr(C)] -enum WalBody { - Init, -} - -const WAL_HEADER_SIZE: usize = - std::mem::size_of::() + std::mem::size_of::() + std::mem::size_of::(); - -impl WalBody { - fn write(self, wal_file: &mut std::fs::File) -> std::io::Result<()> { - let (body_types, body) = match self { - Self::Init => (0u8, vec![]), - }; - let body_checksum = crc().checksum(&body); - let body_length = body.len() as u32; - - let wal_bytes = { - let mut wal_bytes = Vec::with_capacity(WAL_HEADER_SIZE + body.len()); - wal_bytes.put_u64_le(body_checksum); - wal_bytes.put_u32_le(body_length); - wal_bytes.put_u8(body_types); - wal_bytes.put_slice(&body); - wal_bytes - }; - - wal_file.write_all(&wal_bytes)?; - wal_file.sync_all()?; - Ok(()) - } - fn read(wal_file: &mut std::fs::File) -> std::io::Result> { - let mut bytes = { - let mut bytes = vec![]; - wal_file.read_to_end(&mut bytes)?; - Bytes::from(bytes) - }; - - if bytes.len() < WAL_HEADER_SIZE { - return Ok(None); - } - - let checksum = bytes.get_u64_le(); - let body_length = bytes.get_u32_le() as usize; - let body_types = bytes.get_u8(); - - if bytes.len() < body_length { - return Ok(None); - } - - let body = bytes.split_to(body_length); - if crc().checksum(&body) != checksum { - return Ok(None); - } - - match body_types { - 0 => Ok(Some(Self::Init)), - _ => unreachable!(), - } - } -} - -fn init(file: &mut File, wal_file: &mut File) -> std::io::Result<()> { - WalBody::Init.write(wal_file)?; - - let header = Header { - free_page_stack_top_offset: Offset::NULL, - root_node_offset: Offset::NULL, - }; - file.write_all(unsafe { - std::slice::from_raw_parts( - &header as *const _ as *const u8, - std::mem::size_of::
(), - ) - })?; - file.sync_all()?; - - Ok(()) -} - -fn flush_wal(file: &mut File, wal_file: &mut File) -> std::io::Result<()> { - let Some(wal_body) = WalBody::read(wal_file)? else { - return Ok(()); - }; - match wal_body { - WalBody::Init => init(file, wal_file)?, - } - wal_file.seek(SeekFrom::Start(0))?; - wal_file.set_len(0)?; - wal_file.sync_all()?; - Ok(()) -} - -#[repr(C)] -struct Offset { - value: u32, -} -impl Offset { - const NULL: Self = Self { value: u32::MAX }; -} - -#[repr(C)] -struct Header { - free_page_stack_top_offset: Offset, - root_node_offset: Offset, -} - -#[repr(C)] -struct FreePageStackNode { - next_node_offset: Offset, - length: u32, - free_page_offsets: [u32; 1022], -} - -#[repr(C)] -struct InternalNode { - parent_offset: Offset, - key_count: u32, - ids: [u128; 203], - child_offsets: [u32; 204], - _padding: u32, -} - -#[repr(C)] -struct LeafNode { - parent_offset: Offset, - id_count: u32, - ids: [u128; 255], -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn node_size() { - assert_eq!(std::mem::size_of::(), 4096); - assert_eq!(std::mem::size_of::(), 4096); - assert_eq!(std::mem::size_of::(), 4096); - } -} diff --git a/luda-editor/new-server/database/database/src/document_store/nfs/bp_id_tree/mod.rs b/luda-editor/new-server/database/database/src/document_store/nfs/bp_id_tree/mod.rs new file mode 100644 index 000000000..c8fb30fa3 --- /dev/null +++ b/luda-editor/new-server/database/database/src/document_store/nfs/bp_id_tree/mod.rs @@ -0,0 +1,321 @@ +//! # B+IdTree +//! +//! B+IdTree is a B+Tree implementation for storing 128bit Ids. +//! All node size is 4KB, which will be called a page. +//! Page Index size is 31 bit. +//! B+IdTree can store 2^31 pages, total Ids are 2^31 * 255 = 2^38. +//! +//! Page Index '0' will be used as a null. +//! Endian is little. +//! +//! ## File Structure +//! +//! ### Free Page Stack +//! +//! Linked List, storing free page's offset in the file. +//! - Next Free Page Stack Node Index: u32 +//! - Length in this page: u32 +//! - Free Page Indexes: [u32; 1022] +//! +//! ### Header +//! Header has one page size. +//! - Free Page Stack Top Node Index: u32 +//! - Root Node Index: u32 +//! - Root Node would be an Internal Node or a Leaf Node. +//! +//! ### Internal Node +//! - Leaf Type Bit and Parent Node Index: u32 +//! - 31 bit: Parent Node Index +//! - MSB 1 bit: 0 for Internal Node +//! +//! ### Leaf Node +//! - Leaf Type Bit and Parent Node Index: u32 +//! - 31 bit: Parent Node Index +//! - MSB 1 bit: 1 for Leaf Node + +mod wal; + +use std::{ + collections::HashMap, + fs::File, + io::{Read, Result, Seek, SeekFrom, Write}, + mem::MaybeUninit, + num::NonZeroU32, + path::Path, +}; +use wal::*; + +pub struct BpIdTree { + file: File, + wal: Wal, + header: Header, + // TODO: Remove nodes cache for memory usage. + nodes: HashMap, +} + +impl BpIdTree { + pub fn open(path: impl AsRef) -> Result { + let path = path.as_ref(); + + let mut wal = Wal::open(path.with_extension("wal"))?; + + let mut file = std::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(false) + .open(path)?; + + wal.flush(&mut file)?; + + if file.metadata()?.len() == 0 { + wal.write_init()?; + wal.flush(&mut file)?; + } + + Self::read_from_file(file, wal) + } + pub fn insert(&mut self, id: u128) -> Result<()> { + let (node_index, mut leaf_node) = self.find_leaf_node_for_insertion(id)?; + if !leaf_node.is_full() { + self.wal.write_insert_to_leaf_node(node_index, id)?; + + leaf_node.insert(id); + self.nodes.insert(node_index, leaf_node.into_node()); + + return Ok(()); + } + // - Otherwise, before inserting the new record + // - Split the node. + // - original node has ⌈(K+1)/2⌉ items + // - new node has ⌊(K+1)/2⌋ items + // - Copy ⌈(K+1)/2⌉-th key to the parent, and insert the new node to the parent. + // - Repeat until a parent is found that need not split. + // - Insert the new record into the new node. + // - If the root splits, treat it as if it has an empty parent and split as outline above. + todo!() + } + pub fn delete(&mut self, id: u128) -> Result<()> { + todo!() + } + pub fn iter(&self) -> Result> { + // TODO + Ok(std::iter::empty()) + } + fn find_leaf_node_for_insertion(&mut self, id: u128) -> Result<(PageIndex, LeafNode)> { + let mut node_index = self.header.root_node_index; + + loop { + let node = self.node(node_index)?; + if node.is_leaf() { + return Ok((node_index, node.into_leaf_node())); + } + let internal_node = node.into_internal_node(); + node_index = internal_node.find_child_node_index_for(id); + } + } + fn node(&mut self, node_index: PageIndex) -> Result { + if let Some(node) = self.nodes.get(&node_index) { + return Ok(*node); + } + + self.read_node_from_file(node_index) + } + fn read_node_from_file(&mut self, node_index: PageIndex) -> Result { + self.wal.flush(&mut self.file)?; + + let node = read_node_from_file(&mut self.file, node_index)?; + + self.nodes.insert(node_index, node); + + Ok(node) + } + fn read_from_file(mut file: File, wal: Wal) -> Result { + let header = unsafe { + let mut header = MaybeUninit::
::uninit(); + file.seek(SeekFrom::Start(0))?; + file.read_exact(std::slice::from_raw_parts_mut( + header.as_mut_ptr() as *mut u8, + std::mem::size_of::
(), + ))?; + header.assume_init() + }; + + Ok(Self { + file, + wal, + header, + nodes: HashMap::new(), + }) + } +} + +fn read_node_from_file(file: &mut File, node_index: PageIndex) -> Result { + file.seek(node_index.file_pos())?; + + let node = unsafe { + let mut node = MaybeUninit::::uninit(); + file.read_exact(std::slice::from_raw_parts_mut( + node.as_mut_ptr() as *mut u8, + std::mem::size_of::(), + ))?; + node.assume_init() + }; + Ok(node) +} + +#[repr(C)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +struct PageIndex { + value: u32, +} +impl PageIndex { + const NULL: Self = Self { value: 0 }; + fn with_node_type_msb(&self, is_leaf: bool) -> u32 { + self.value | if is_leaf { 0x80000000 } else { 0 } + } + + fn without_node_type_msb(value: NonZeroU32) -> PageIndex { + Self { value: value.get() } + } + + fn file_pos(&self) -> SeekFrom { + SeekFrom::Start((self.value & 0x7FFFFFFF) as u64 * 4096) + } +} + +trait AsSlice: Sized { + fn as_slice(&self) -> &[u8] { + unsafe { + std::slice::from_raw_parts(self as *const _ as *const u8, std::mem::size_of::()) + } + } +} + +#[repr(C)] +struct Header { + free_page_stack_top_page_index: PageIndex, + root_node_index: PageIndex, + padding: [u32; 1022], +} +impl AsSlice for Header {} + +#[repr(C)] +struct FreePageStackNode { + next_node_index: PageIndex, + length: u32, + free_page_indexes: [u32; 1022], +} +impl AsSlice for FreePageStackNode {} + +#[repr(C)] +struct InternalNode { + leaf_type_bit_and_parent_index: u32, + id_count: u32, + ids: [u128; 203], + child_indexes: [PageIndex; 204], + _padding: u32, +} +impl AsSlice for InternalNode {} + +impl InternalNode { + fn offset(&self) -> PageIndex { + PageIndex { + value: self.leaf_type_bit_and_parent_index & 0x7FFFFFFF, + } + } + + fn find_child_node_index_for(&self, id: u128) -> PageIndex { + self.ids + .iter() + .take(self.id_count as usize) + .enumerate() + .find(|(_, &key_id)| id < key_id) + .map(|(i, _)| self.child_indexes[i]) + .unwrap_or(self.child_indexes[self.id_count as usize]) + } +} + +#[repr(C)] +struct LeafNode { + leaf_type_bit_and_parent_node_index: u32, + id_count: u32, + ids: [u128; 255], +} +impl AsSlice for LeafNode {} + +impl LeafNode { + fn parent_node_index(&self) -> PageIndex { + PageIndex { + value: self.leaf_type_bit_and_parent_node_index & 0x7FFFFFFF, + } + } + + fn new(parent_node_index: PageIndex) -> Self { + Self { + leaf_type_bit_and_parent_node_index: parent_node_index.with_node_type_msb(true), + id_count: 0, + ids: [0; 255], + } + } + + fn is_full(&self) -> bool { + self.id_count == self.ids.len() as u32 + } + + fn insert(&mut self, id: u128) { + let index = self + .ids + .iter() + .take(self.id_count as usize) + .enumerate() + .find(|(_, &key_id)| id < key_id) + .map(|(i, _)| i) + .unwrap_or(self.id_count as usize); + self.ids.copy_within(index.., index + 1); + self.ids[index] = id; + self.id_count += 1; + } + + fn into_node(self) -> Node { + unsafe { std::mem::transmute(self) } + } +} + +#[repr(C)] +#[derive(Clone, Copy)] +struct Node { + leaf_type_bit_and_parent_node_index: u32, + _padding: [u8; 4092], +} +impl AsSlice for Node {} +impl Node { + fn is_leaf(&self) -> bool { + self.leaf_type_bit_and_parent_node_index & 0x80000000 != 0 + } + fn into_internal_node(self) -> InternalNode { + unsafe { std::mem::transmute(self) } + } + fn into_leaf_node(self) -> LeafNode { + unsafe { std::mem::transmute(self) } + } + fn as_internal_node_mut(&mut self) -> &mut InternalNode { + unsafe { std::mem::transmute(self) } + } + fn as_leaf_node_mut(&mut self) -> &mut LeafNode { + unsafe { std::mem::transmute(self) } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn node_size() { + assert_eq!(std::mem::size_of::
(), 4096); + assert_eq!(std::mem::size_of::(), 4096); + assert_eq!(std::mem::size_of::(), 4096); + assert_eq!(std::mem::size_of::(), 4096); + assert_eq!(std::mem::size_of::(), 4096); + } +} diff --git a/luda-editor/new-server/database/database/src/document_store/nfs/bp_id_tree/wal.rs b/luda-editor/new-server/database/database/src/document_store/nfs/bp_id_tree/wal.rs new file mode 100644 index 000000000..47be37306 --- /dev/null +++ b/luda-editor/new-server/database/database/src/document_store/nfs/bp_id_tree/wal.rs @@ -0,0 +1,172 @@ +//! # Wal File +//! +//! [Header][Body][Header][Body]... +//! +//! - Header +//! - Body Checksum: u64 +//! - Body Length: u32 +//! - Body types: u8 +//! +//! - Init(0) Body +//! - nothing +//! +//! - InsertToLeafNode(1) Body +//! - NodeIndex: PageIndex +//! - Id: u128 + +use super::{super::crc, *}; +use bytes::BufMut; +use std::{ + fs::File, + io::{BufReader, Read, Result, Seek, Write}, +}; + +pub struct Wal { + file: File, + dirty: bool, +} + +impl Wal { + pub(crate) fn open(path: std::path::PathBuf) -> Result { + let file = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(path)?; + + Ok(Self { file, dirty: true }) + } + + pub(crate) fn flush(&mut self, file: &mut File) -> Result<()> { + if !self.dirty { + return Ok(()); + } + + let wal_file_len = self.file.metadata()?.len(); + let mut reader = BufReader::new(&mut self.file); + + while wal_file_len > reader.stream_position()? { + let header = unsafe { + let mut header = MaybeUninit::::uninit(); + reader.read_exact(std::slice::from_raw_parts_mut( + header.as_mut_ptr() as *mut u8, + size_of::(), + ))?; + header.assume_init() + }; + + match header.body_types { + // Init + 0 => { + let root_node_index = + PageIndex::without_node_type_msb(NonZeroU32::new(1).unwrap()); + + let header = Header { + free_page_stack_top_page_index: PageIndex::NULL, + root_node_index, + padding: [0; 1022], + }; + + let root_node = LeafNode::new(PageIndex::NULL); + + let mut bytes = Vec::with_capacity(size_of::
() + size_of::()); + bytes.put_slice(header.as_slice()); + bytes.put_slice(root_node.as_slice()); + + file.set_len(0)?; + file.write_all(&bytes)?; + file.sync_all()?; + } + // InsertToLeafNode + 1 => { + let body = unsafe { + let mut body = MaybeUninit::::uninit(); + reader.read_exact(std::slice::from_raw_parts_mut( + body.as_mut_ptr() as *mut u8, + header.body_length as usize, + ))?; + body.assume_init() + }; + + let mut node = read_node_from_file(file, body.node_index)?.into_leaf_node(); + node.insert(body.id); + write_node_to_file(file, body.node_index, node.into_node())?; + } + _ => unreachable!(), + } + } + + if wal_file_len > 0 { + self.file.set_len(0)?; + self.file.sync_all()?; + } + self.dirty = false; + + Ok(()) + } + + pub(crate) fn write_init(&mut self) -> Result<()> { + self.dirty = true; + + let body = []; + let header = WalHeader { + checksum: crc().checksum(&body), + body_length: body.len() as u32, + body_types: 0u8, + }; + + self.file.write_all(header.as_slice())?; + self.file.sync_all()?; + Ok(()) + } + + pub(crate) fn write_insert_to_leaf_node( + &mut self, + node_index: PageIndex, + id: u128, + ) -> Result<()> { + self.dirty = true; + + let body = InsertToLeafNodeBody { node_index, id }; + let body_bytes = body.as_slice(); + let header = WalHeader { + checksum: crc().checksum(body_bytes), + body_length: body_bytes.len() as u32, + body_types: 1u8, + }; + + let mut bytes = [0u8; size_of::() + size_of::()]; + { + let mut bytes = bytes.as_mut(); + bytes.put_slice(header.as_slice()); + bytes.put_slice(body_bytes); + } + + self.file.write_all(&bytes)?; + self.file.sync_all()?; + Ok(()) + } +} + +fn write_node_to_file(file: &mut File, node_index: PageIndex, node: Node) -> Result<()> { + file.seek(node_index.file_pos())?; + + file.write_all(node.as_slice())?; + file.sync_all()?; + + Ok(()) +} + +#[repr(C)] +struct WalHeader { + checksum: u64, + body_length: u32, + body_types: u8, +} +impl AsSlice for WalHeader {} + +#[repr(C)] +struct InsertToLeafNodeBody { + node_index: PageIndex, + id: u128, +} +impl AsSlice for InsertToLeafNodeBody {} diff --git a/luda-editor/new-server/database/database/src/document_store/nfs/db_thread.rs b/luda-editor/new-server/database/database/src/document_store/nfs/db_thread.rs index fb95f7cad..d38b1971f 100644 --- a/luda-editor/new-server/database/database/src/document_store/nfs/db_thread.rs +++ b/luda-editor/new-server/database/database/src/document_store/nfs/db_thread.rs @@ -20,7 +20,7 @@ pub(crate) fn db_thread( while let Ok(request) = db_request_rx.recv() { match request { - DbThreadRequest::Read { pk, sk, tx } => { + DbThreadRequest::Read { key, tx } => { if let Some(cached) = cache.get(&key) { _ = tx.send(Ok(cached.get())); continue; @@ -211,8 +211,7 @@ fn get_last_accessed_secs() -> u64 { pub(crate) enum DbThreadRequest { Read { - pk: String, - sk: Option, + key: String, tx: DataTx, }, ReadResult { diff --git a/luda-editor/new-server/database/database/src/document_store/nfs/mod.rs b/luda-editor/new-server/database/database/src/document_store/nfs/mod.rs index 768d40184..c79bda461 100644 --- a/luda-editor/new-server/database/database/src/document_store/nfs/mod.rs +++ b/luda-editor/new-server/database/database/src/document_store/nfs/mod.rs @@ -25,6 +25,14 @@ pub struct NfsV4DocStore { impl NfsV4DocStore { pub fn new(mount_point: impl AsRef) -> Self { + // remove below. + { + let mut tree = bp_id_tree::BpIdTree::open(&mount_point).unwrap(); + _ = tree.insert(5); + _ = tree.delete(5); + _ = tree.iter(); + } + let mount_point = mount_point.as_ref().to_path_buf(); let (db_request_tx, db_request_rx) = std::sync::mpsc::channel(); @@ -39,14 +47,10 @@ impl NfsV4DocStore { Self { db_request_tx } } /// Consistency: Read-After-Write - async fn read(&self, pk: &str, sk: Option<&str>) -> Result> { + async fn read(&self, key: String) -> Result> { let (tx, rx) = oneshot::channel(); self.db_request_tx - .send(DbThreadRequest::Read { - pk: pk.to_string(), - sk: sk.map(|s| s.to_string()), - tx, - }) + .send(DbThreadRequest::Read { key, tx }) .map_err(|_| TransactionError::DbThreadDown)?; rx.await.map_err(|_| TransactionError::DbThreadDown)? }