Skip to content

Commit

Permalink
fix: ensure WAL is atomic for each write batch
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi <[email protected]>
  • Loading branch information
skyzh committed Jul 3, 2024
1 parent 6d16ae2 commit 3121289
Show file tree
Hide file tree
Showing 9 changed files with 124 additions and 38 deletions.
2 changes: 1 addition & 1 deletion mini-lsm-book/src/week2-07-snacks.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ We will do a per-record checksum in the write-ahead log. To do this, you have tw
* Generate a buffer of the key-value record, and use `crc32fast::hash` to compute the checksum at once.
* Write one field at a time (e.g., key length, key slice), and use a `crc32fast::Hasher` to compute the checksum incrementally on each field.

This is up to your choice and you will need to *choose your own adventure*. The new WAL encoding should be like:
This is up to your choice and you will need to *choose your own adventure*. Both method should produce exactly the same result, as long as you handle little endian / big endian correctly. The new WAL encoding should be like:

```
| key_len | key | value_len | value | checksum |
Expand Down
6 changes: 6 additions & 0 deletions mini-lsm-book/src/week3-02-snapshot-read-part-1.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ It should now store `(KeyBytes, Bytes)` and the return key type should be `KeySl

Write-ahead log should now accept a key slice instead of a user key slice. When serializing and deserializing the WAL record, you should put timestamp into the WAL file and do checksum over the timestamp and all other fields you had before.

The WAL format is as follows:

```
| key_len (exclude ts len) (u16) | key | ts (u64) | value_len (u16) | value | checksum (u32) |
```

**LsmStorageInner::get**

Previously, we implement `get` as first probe the memtables and then scan the SSTs. Now that we change the memtable to use the new key-ts APIs, we will need to re-implement the `get` interface. The easiest way to do this is to create a merge iterator over everything we have -- memtables, immutable memtables, L0 SSTs, and other level SSTs, the same as what you have done in `scan`, except that we do a bloom filter filtering over the SSTs.
Expand Down
29 changes: 29 additions & 0 deletions mini-lsm-book/src/week3-05-txn-occ.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,40 @@ We assume that a transaction will only be used on a single thread. Once your tra

Your commit implementation should simply collect all key-value pairs from the local storage and submit a write batch to the storage engine.

## Task 4: Atomic WAL

In this task, you will need to modify:

```
src/wal.rs
src/mem_table.rs
```

Note that `commit` involves producing a write batch, and for now, the write batch does not guarantee atomicity. You will need to change the WAL implementation to produce a header and a footer for the write batch.

The new WAL encoding is as follows:

```
| HEADER | BODY | FOOTER |
| u32 | u16 | var | u64 | u16 | var | ... | u32 |
| batch_size | key_len | key | ts | value_len | value | more key-value pairs ... | checksum |
```

`batch_size` is the size of the `BODY` section. `checksum` is the checksum for the `BODY` section.

There are no test cases to verify your implementation. As long as you pass all existing test cases and implement the above WAL format, everything should be fine.

You should implement `Wal::put_batch` and `MemTable::put_batch`. The original `put` function should treat the
single key-value pair as a batch. That is to say, at this point, your `put` function should call `put_batch`.

A batch should be handled in the same mem table and the same WAL, even if it exceeds the mem table size limit.

## Test Your Understanding

* With all the things we have implemented up to this point, does the system satisfy snapshot isolation? If not, what else do we need to do to support snapshot isolation? (Note: snapshot isolation is different from serializable snapshot isolation we will talk about in the next chapter)
* What if the user wants to batch import data (i.e., 1TB?) If they use the transaction API to do that, will you give them some advice? Is there any opportunity to optimize for this case?
* What is optimistic concurrency control? What would the system be like if we implement pessimistic concurrency control instead in Mini-LSM?
* What happens if your system crashes and leave a corrupted WAL on the disk? How do you handle this situation?

## Bonus Tasks

Expand Down
21 changes: 15 additions & 6 deletions mini-lsm-mvcc/src/mem_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,25 @@ impl MemTable {
///
/// In week 1, day 1, simply put the key-value pair into the skipmap.
/// In week 2, day 6, also flush the data to WAL.
/// In week 3, day 5, modify the function to use the batch API.
pub fn put(&self, key: KeySlice, value: &[u8]) -> Result<()> {
let estimated_size = key.raw_len() + value.len();
self.map.insert(
key.to_key_vec().into_key_bytes(),
Bytes::copy_from_slice(value),
);
self.put_batch(&[(key, value)])
}

/// Implement this in week 3, day 5.
pub fn put_batch(&self, data: &[(KeySlice, &[u8])]) -> Result<()> {
let mut estimated_size = 0;
for (key, value) in data {
estimated_size += key.raw_len() + value.len();
self.map.insert(
key.to_key_vec().into_key_bytes(),
Bytes::copy_from_slice(value),
);
}
self.approximate_size
.fetch_add(estimated_size, std::sync::atomic::Ordering::Relaxed);
if let Some(ref wal) = self.wal {
wal.put(key, value)?;
wal.put_batch(data)?;
}
Ok(())
}
Expand Down
82 changes: 51 additions & 31 deletions mini-lsm-mvcc/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,51 +40,71 @@ impl Wal {
file.read_to_end(&mut buf)?;
let mut rbuf: &[u8] = buf.as_slice();
while rbuf.has_remaining() {
let batch_size = rbuf.get_u32() as usize;
if rbuf.remaining() < batch_size {
bail!("incomplete WAL");
}
let mut batch_buf = &rbuf[..batch_size];
let mut kv_pairs = Vec::new();
let mut hasher = crc32fast::Hasher::new();
let key_len = rbuf.get_u16() as usize;
hasher.write_u16(key_len as u16);
let key = Bytes::copy_from_slice(&rbuf[..key_len]);
hasher.write(&key);
rbuf.advance(key_len);
let ts = rbuf.get_u64();
hasher.write_u64(ts);
let value_len = rbuf.get_u16() as usize;
hasher.write_u16(value_len as u16);
let value = Bytes::copy_from_slice(&rbuf[..value_len]);
hasher.write(&value);
rbuf.advance(value_len);
let checksum = rbuf.get_u32();
if hasher.finalize() != checksum {
// The checksum computed from the individual components should be the same as a direct checksum on the buffer.
// Students' implementation only needs to do a single checksum on the buffer. We compute both for verification purpose.
let single_checksum = crc32fast::hash(batch_buf);
while batch_buf.has_remaining() {
let key_len = batch_buf.get_u16() as usize;
hasher.write(&(key_len as u16).to_be_bytes());
let key = Bytes::copy_from_slice(&batch_buf[..key_len]);
hasher.write(&key);
batch_buf.advance(key_len);
let ts = batch_buf.get_u64();
hasher.write(&ts.to_be_bytes());
let value_len = batch_buf.get_u16() as usize;
hasher.write(&(value_len as u16).to_be_bytes());
let value = Bytes::copy_from_slice(&batch_buf[..value_len]);
hasher.write(&value);
kv_pairs.push((key, ts, value));
batch_buf.advance(value_len);
}
let expected_checksum = rbuf.get_u32();
let component_checksum = hasher.finalize();
assert_eq!(component_checksum, single_checksum);
if single_checksum != expected_checksum {
bail!("checksum mismatch");
}
skiplist.insert(KeyBytes::from_bytes_with_ts(key, ts), value);
for (key, ts, value) in kv_pairs {
skiplist.insert(KeyBytes::from_bytes_with_ts(key, ts), value);
}
rbuf.advance(batch_size);
}
Ok(Self {
file: Arc::new(Mutex::new(BufWriter::new(file))),
})
}

pub fn put(&self, key: KeySlice, value: &[u8]) -> Result<()> {
/// Implement this in week 3, day 5.
pub fn put_batch(&self, data: &[(KeySlice, &[u8])]) -> Result<()> {
let mut file = self.file.lock();
let mut buf: Vec<u8> =
Vec::with_capacity(key.raw_len() + value.len() + std::mem::size_of::<u16>());
let mut hasher = crc32fast::Hasher::new();
hasher.write_u16(key.key_len() as u16);
buf.put_u16(key.key_len() as u16);
hasher.write(key.key_ref());
buf.put_slice(key.key_ref());
hasher.write_u64(key.ts());
buf.put_u64(key.ts());
hasher.write_u16(value.len() as u16);
buf.put_u16(value.len() as u16);
buf.put_slice(value);
hasher.write(value);
// add checksum: week 2 day 7
buf.put_u32(hasher.finalize());
let mut buf = Vec::<u8>::new();
for (key, value) in data {
buf.put_u16(key.key_len() as u16);
buf.put_slice(key.key_ref());
buf.put_u64(key.ts());
buf.put_u16(value.len() as u16);
buf.put_slice(value);
}
// write batch_size header (u32)
file.write_all(&(buf.len() as u32).to_be_bytes())?;
// write key-value pairs body
file.write_all(&buf)?;
// write checksum (u32)
file.write_all(&crc32fast::hash(&buf).to_be_bytes())?;
Ok(())
}

pub fn put(&self, key: KeySlice, value: &[u8]) -> Result<()> {
self.put_batch(&[(key, value)])
}

pub fn sync(&self) -> Result<()> {
let mut file = self.file.lock();
file.flush()?;
Expand Down
6 changes: 6 additions & 0 deletions mini-lsm-starter/src/mem_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,16 @@ impl MemTable {
///
/// In week 1, day 1, simply put the key-value pair into the skipmap.
/// In week 2, day 6, also flush the data to WAL.
/// In week 3, day 5, modify the function to use the batch API.
pub fn put(&self, _key: &[u8], _value: &[u8]) -> Result<()> {
unimplemented!()
}

/// Implement this in week 3, day 5.
pub fn put_batch(&self, _data: &[(KeySlice, &[u8])]) -> Result<()> {
unimplemented!()
}

pub fn sync_wal(&self) -> Result<()> {
if let Some(ref wal) = self.wal {
wal.sync()?;
Expand Down
5 changes: 5 additions & 0 deletions mini-lsm-starter/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ impl Wal {
unimplemented!()
}

/// Implement this in week 3, day 5.
pub fn put_batch(&self, _data: &[(&[u8], &[u8])]) -> Result<()> {
unimplemented!()
}

pub fn sync(&self) -> Result<()> {
unimplemented!()
}
Expand Down
6 changes: 6 additions & 0 deletions mini-lsm/src/mem_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ impl MemTable {
///
/// In week 1, day 1, simply put the key-value pair into the skipmap.
/// In week 2, day 6, also flush the data to WAL.
/// In week 3, day 5, modify the function to use the batch API.
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
let estimated_size = key.len() + value.len();
self.map
Expand All @@ -103,6 +104,11 @@ impl MemTable {
Ok(())
}

/// Implement this in week 3, day 5.
pub fn put_batch(&self, _data: &[(KeySlice, &[u8])]) -> Result<()> {
unimplemented!()
}

pub fn sync_wal(&self) -> Result<()> {
if let Some(ref wal) = self.wal {
wal.sync()?;
Expand Down
5 changes: 5 additions & 0 deletions mini-lsm/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ impl Wal {
Ok(())
}

/// Implement this in week 3, day 5.
pub fn put_batch(&self, _data: &[(&[u8], &[u8])]) -> Result<()> {
unimplemented!()
}

pub fn sync(&self) -> Result<()> {
let mut file = self.file.lock();
file.flush()?;
Expand Down

0 comments on commit 3121289

Please sign in to comment.