Skip to content

Commit b87255c

Browse files
committed
Split code into modules
1 parent e8659a7 commit b87255c

File tree

3 files changed

+266
-256
lines changed

3 files changed

+266
-256
lines changed

src/insert.rs

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
use std::ops::Bound;
2+
use std::sync::atomic::Ordering;
3+
4+
use crate::hashing::{PartedHash, USER_NAMESPACE};
5+
use crate::shard::{InsertStatus, Shard};
6+
use crate::store::VickyStore;
7+
use crate::{Result, VickyError};
8+
9+
impl VickyStore {
10+
    /// Rewrites the shard ending at `shard_end` into a fresh file, dropping stale
    /// entries, and swaps it back into the shard map.
    ///
    /// Returns `Ok(false)` if another thread compacted this shard first (detected
    /// by the shard's write offset being below the `write_offset` that triggered
    /// this call), `Ok(true)` once the compacted shard is installed.
    fn compact(&self, shard_end: u32, write_offset: u32) -> Result<bool> {
        // Hold the write lock for the entire compaction: no inserts can race with
        // the rewrite, at the cost of blocking writers to all shards meanwhile.
        let mut guard = self.shards.write().unwrap();
        // it's possible that another thread already compacted this shard -- in that
        // case the freshly-opened shard's write offset is lower than the offset that
        // triggered this call, so there's nothing left to do
        if guard
            .get(&shard_end)
            .unwrap()
            .header
            .write_offset
            .load(Ordering::Relaxed)
            < write_offset
        {
            return Ok(false);
        }

        let removed_shard = guard.remove(&shard_end).unwrap();
        let orig_filename = self.dir_path.join(format!(
            "shard_{:04x}-{:04x}",
            removed_shard.span.start, removed_shard.span.end
        ));
        // Build the compacted copy under a temporary "compact_*" name and rename it
        // over the original at the end, so a crash mid-compaction leaves the
        // original shard file intact.
        let tmpfile = self.dir_path.join(format!(
            "compact_{:04x}-{:04x}",
            removed_shard.span.start, removed_shard.span.end
        ));
        let compacted_shard = Shard::open(
            tmpfile.clone(),
            removed_shard.span.clone(),
            true,
            self.config.clone(),
        )?;

        self.num_compactions.fetch_add(1, Ordering::SeqCst);

        for res in removed_shard.unlocked_iter() {
            let (k, v) = res?;
            // XXX: this will not work with namespaces -- the hash is recomputed with
            // USER_NAMESPACE for every key, regardless of the namespace the entry
            // was originally inserted under.
            let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.secret_key, &k);

            // Each insert into the fresh shard is expected to be a pure addition
            // (asserted below); a Replace/other status would indicate a bug.
            let status = compacted_shard.insert(ph, &k, &v)?;
            assert!(matches!(status, InsertStatus::Added), "{status:?}");
        }

        // Replace the old shard file with the compacted one; rename within the same
        // directory, so the canonical "shard_*" name never points at a partial file.
        std::fs::rename(tmpfile, &orig_filename)?;
        guard.insert(shard_end, compacted_shard);
        Ok(true)
    }
55+
56+
    /// Splits the shard spanning `[shard_start, shard_end)` into two shards at the
    /// midpoint of the span, redistributing entries by their shard selector.
    ///
    /// Returns `Ok(false)` if another thread already performed this split,
    /// `Ok(true)` once both halves are installed.
    fn split(&self, shard_start: u32, shard_end: u32) -> Result<bool> {
        let mut guard = self.shards.write().unwrap();
        // it's possible that another thread already split this range - check if midpoint exists, and if so, bail out
        // NOTE(review): `start/2 + end/2` avoids u32 overflow, but rounds down twice
        // when both endpoints are odd; presumably span endpoints are always even --
        // TODO confirm.
        let midpoint = shard_start / 2 + shard_end / 2;
        if guard.contains_key(&midpoint) {
            return Ok(false);
        }

        let removed_shard = guard.remove(&shard_end).unwrap();

        // Build both halves under temporary "bottom_*"/"top_*" names so a crash
        // mid-split never leaves a partially-written file under a canonical
        // "shard_*" name.
        let bottomfile = self
            .dir_path
            .join(format!("bottom_{:04x}-{:04x}", shard_start, midpoint));
        let topfile = self
            .dir_path
            .join(format!("top_{:04x}-{:04x}", midpoint, shard_end));

        let bottom_shard = Shard::open(
            bottomfile.clone(),
            shard_start..midpoint,
            true,
            self.config.clone(),
        )?;
        let top_shard = Shard::open(
            topfile.clone(),
            midpoint..shard_end,
            true,
            self.config.clone(),
        )?;

        // Redistribute: selectors below the midpoint go to the bottom shard, the
        // rest to the top shard. Every insert is expected to be a pure addition.
        for res in removed_shard.unlocked_iter() {
            let (k, v) = res?;

            let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.secret_key, &k);
            let status = if (ph.shard_selector as u32) < midpoint {
                bottom_shard.insert(ph, &k, &v)?
            } else {
                top_shard.insert(ph, &k, &v)?
            };
            assert!(matches!(status, InsertStatus::Added), "{status:?}");
        }

        self.num_splits.fetch_add(1, Ordering::SeqCst);

        // this is not atomic, so when loading, we need to take the larger span if it exists and
        // delete the partial ones
        std::fs::rename(
            bottomfile,
            self.dir_path
                .join(format!("shard_{:04x}-{:04x}", shard_start, midpoint)),
        )?;
        std::fs::rename(
            topfile,
            self.dir_path
                .join(format!("shard_{:04x}-{:04x}", midpoint, shard_end)),
        )?;
        std::fs::remove_file(
            self.dir_path
                .join(format!("shard_{:04x}-{:04x}", shard_start, shard_end)),
        )
        .unwrap();

        guard.insert(midpoint, bottom_shard);
        guard.insert(shard_end, top_shard);

        Ok(true)
    }
123+
124+
fn try_insert(
125+
&self,
126+
ph: PartedHash,
127+
key: &[u8],
128+
val: &[u8],
129+
) -> Result<(InsertStatus, u32, u32)> {
130+
let guard = self.shards.read().unwrap();
131+
let cursor = guard.lower_bound(Bound::Excluded(&(ph.shard_selector as u32)));
132+
let shard_start = cursor
133+
.peek_prev()
134+
.map(|(&shard_start, _)| shard_start)
135+
.unwrap_or(0);
136+
let (shard_end, shard) = cursor.peek_next().unwrap();
137+
let status = shard.insert(ph, key, val)?;
138+
139+
Ok((status, shard_start, *shard_end))
140+
}
141+
142+
pub(crate) fn insert_internal(&self, ph: PartedHash, key: &[u8], val: &[u8]) -> Result<()> {
143+
if key.len() > u16::MAX as usize {
144+
return Err(Box::new(VickyError::KeyTooLong));
145+
}
146+
if val.len() > u16::MAX as usize {
147+
return Err(Box::new(VickyError::ValueTooLong));
148+
}
149+
150+
loop {
151+
let (status, shard_start, shard_end) = self.try_insert(ph, key, val)?;
152+
153+
match status {
154+
InsertStatus::Added => {
155+
self.num_entries.fetch_add(1, Ordering::SeqCst);
156+
return Ok(());
157+
}
158+
InsertStatus::Replaced => {
159+
return Ok(());
160+
}
161+
InsertStatus::CompactionNeeded(write_offset) => {
162+
self.compact(shard_end, write_offset)?;
163+
// retry
164+
}
165+
InsertStatus::SplitNeeded => {
166+
self.split(shard_start, shard_end)?;
167+
// retry
168+
}
169+
}
170+
}
171+
}
172+
173+
/// Inserts a key-value pair, creating it or replacing an existing pair. Note that if the program crashed
174+
/// while or "right after" this operation, or if the operating system is unable to flush the page cache,
175+
/// you may lose some data. However, you will still be in a consistent state, where you will get a previous
176+
/// version of the state.
177+
///
178+
/// While this method is O(1) amortized, every so often it will trigger either a shard compaction or a
179+
/// shard split, which requires rewriting the whole shard.
180+
pub fn insert<B1: AsRef<[u8]> + ?Sized, B2: AsRef<[u8]> + ?Sized>(
181+
&self,
182+
key: &B1,
183+
val: &B2,
184+
) -> Result<()> {
185+
let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.secret_key, key.as_ref());
186+
self.insert_internal(ph, key.as_ref(), val.as_ref())
187+
}
188+
189+
/// Modifies an existing entry in-place, instead of creating a version. Note that the key must exist
190+
/// and `patch.len() + patch_offset` must be less than or equal to the current value's length.
191+
///
192+
/// This is operation is NOT crash-safe as it overwrites existing data, and thus may produce inconsistent
193+
/// results on crashes (reading part old data, part new data).
194+
///
195+
/// This method will never trigger a shard split or a shard compaction.
196+
pub fn modify_inplace<B1: AsRef<[u8]> + ?Sized, B2: AsRef<[u8]> + ?Sized>(
197+
&self,
198+
key: &B1,
199+
patch: &B2,
200+
patch_offset: usize,
201+
) -> Result<()> {
202+
let key = key.as_ref();
203+
let patch = patch.as_ref();
204+
let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.secret_key, key);
205+
self.shards
206+
.read()
207+
.unwrap()
208+
.lower_bound(Bound::Excluded(&(ph.shard_selector as u32)))
209+
.peek_next()
210+
.unwrap()
211+
.1
212+
.modify_inplace(ph, key, patch, patch_offset)
213+
}
214+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Nightly-only feature: BTreeMap cursors (`lower_bound`/`peek_prev`/`peek_next`),
// used by the insert path to locate the shard whose span covers a shard selector.
#![feature(btree_cursors)]

mod hashing;
mod insert;
mod shard;
mod store;
mod typed;

0 commit comments

Comments
 (0)