Turn BlockCursor::{read_blob,read_blob_into_buf} async fn (neondatabase#4905)

## Problem

The `BlockCursor::read_blob` and `BlockCursor::read_blob_into_buf`
functions call `read_blk` internally, so if we want to make that
function an async fn, they need to become async themselves.

## Summary of changes

* We first turn `ValueRef::load` into an async fn.
* Then, we switch the `RwLock` implementation in `InMemoryLayer` to use
the one from `tokio`.
* Last, we convert the `read_blob` and `read_blob_into_buf` functions
into async fn.
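
A minimal sketch of the lock swap in `InMemoryLayer` (the guarded state is simplified to a bare counter here): `tokio`'s `RwLock` is acquired with `.await` and has no poisoning, so the `.unwrap()` calls on the guards go away.

```rust
use tokio::sync::RwLock;

// Simplified stand-in for InMemoryLayer: the real inner state holds a
// file and an index, but a counter is enough to show the locking pattern.
struct LayerSketch {
    inner: RwLock<u64>,
}

impl LayerSketch {
    // Before: `self.inner.read().unwrap()` on std::sync::RwLock, which
    // blocks the thread and can poison. After: `.read().await` on the
    // tokio lock, which suspends the task instead.
    async fn size(&self) -> u64 {
        *self.inner.read().await
    }
}
```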

In three instances we use `Handle::block_on`:

* one use is in compaction code, which currently isn't async. We put the
entire loop into an `async` block to prevent the potentially hot loop
from doing cross-thread operations.
* one use is in dumping code for `DeltaLayer`. The "proper" way to
address this would be to enable the visit function to take async
closures, but then we'd need to be generic over async vs. non-async,
which [isn't supported by Rust right
now](https://blog.rust-lang.org/inside-rust/2022/07/27/keyword-generics.html).
The other alternative would be to do a first pass where we cache the
data into memory, and only then to dump it.
* the third use is in writing code, inside a loop that copies from one
file to another. It is synchronous and we'd like to keep it that way
(for now?). A sketch of the bridging pattern follows this list.
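
As a rough illustration of `Handle::block_on` (a hedged sketch; `read_blob` below is a hypothetical stand-in for the real `BlockCursor` method), synchronous code drives an async fn to completion through a runtime `Handle`. The diff obtains the handle with `Handle::current()`; here it is passed explicitly to keep the example self-contained.

```rust
use tokio::runtime::Handle;

// Hypothetical async fn standing in for BlockCursor::read_blob.
async fn read_blob(offset: u64) -> std::io::Result<Vec<u8>> {
    // the real method performs buffered disk reads here
    Ok(offset.to_le_bytes().to_vec())
}

// Synchronous code (e.g. a dump or copy loop) blocks on the future.
// Caveat: `Handle::block_on` panics when called from within an async
// execution context, so this must run on a thread that is not itself
// driving the runtime.
fn dump_blob_sync(handle: &Handle, offset: u64) -> std::io::Result<Vec<u8>> {
    handle.block_on(read_blob(offset))
}

fn main() -> std::io::Result<()> {
    let rt = tokio::runtime::Runtime::new()?;
    let buf = dump_blob_sync(rt.handle(), 42)?;
    println!("read {} bytes", buf.len());
    Ok(())
}
```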

Part of neondatabase#4743
arpad-m committed Aug 14, 2023
1 parent ef4a76c commit ce7efbe
Showing 8 changed files with 152 additions and 125 deletions.
2 changes: 1 addition & 1 deletion pageserver/ctl/src/layers.rs
@@ -72,7 +72,7 @@ async fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
.await?;
let cursor = BlockCursor::new(&file);
for (k, v) in all {
- let value = cursor.read_blob(v.pos())?;
+ let value = cursor.read_blob(v.pos()).await?;
println!("key:{} value_len:{}", k, value.len());
}
// TODO(chi): special handling for last key?
6 changes: 3 additions & 3 deletions pageserver/src/tenant/blob_io.rs
@@ -21,14 +21,14 @@ where
R: BlockReader,
{
/// Read a blob into a new buffer.
- pub fn read_blob(&self, offset: u64) -> Result<Vec<u8>, std::io::Error> {
+ pub async fn read_blob(&self, offset: u64) -> Result<Vec<u8>, std::io::Error> {
let mut buf = Vec::new();
- self.read_blob_into_buf(offset, &mut buf)?;
+ self.read_blob_into_buf(offset, &mut buf).await?;
Ok(buf)
}
/// Read blob into the given buffer. Any previous contents in the buffer
/// are overwritten.
- pub fn read_blob_into_buf(
+ pub async fn read_blob_into_buf(
&self,
offset: u64,
dstbuf: &mut Vec<u8>,
23 changes: 16 additions & 7 deletions pageserver/src/tenant/ephemeral_file.rs
@@ -401,17 +401,26 @@ mod tests {
Ok(())
}

- #[test]
- fn test_ephemeral_blobs() -> Result<(), io::Error> {
+ #[tokio::test]
+ async fn test_ephemeral_blobs() -> Result<(), io::Error> {
let (conf, tenant_id, timeline_id) = harness("ephemeral_blobs")?;

let mut file = EphemeralFile::create(conf, tenant_id, timeline_id)?;

let pos_foo = file.write_blob(b"foo")?;
assert_eq!(b"foo", file.block_cursor().read_blob(pos_foo)?.as_slice());
assert_eq!(
b"foo",
file.block_cursor().read_blob(pos_foo).await?.as_slice()
);
let pos_bar = file.write_blob(b"bar")?;
assert_eq!(b"foo", file.block_cursor().read_blob(pos_foo)?.as_slice());
assert_eq!(b"bar", file.block_cursor().read_blob(pos_bar)?.as_slice());
assert_eq!(
b"foo",
file.block_cursor().read_blob(pos_foo).await?.as_slice()
);
assert_eq!(
b"bar",
file.block_cursor().read_blob(pos_bar).await?.as_slice()
);

let mut blobs = Vec::new();
for i in 0..10000 {
@@ -428,7 +437,7 @@

let cursor = BlockCursor::new(&file);
for (pos, expected) in blobs {
- let actual = cursor.read_blob(pos)?;
+ let actual = cursor.read_blob(pos).await?;
assert_eq!(actual, expected);
}

@@ -437,7 +446,7 @@
large_data.resize(20000, 0);
thread_rng().fill_bytes(&mut large_data);
let pos_large = file.write_blob(&large_data)?;
- let result = file.block_cursor().read_blob(pos_large)?;
+ let result = file.block_cursor().read_blob(pos_large).await?;
assert_eq!(result, large_data);

Ok(())
24 changes: 14 additions & 10 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -51,6 +51,7 @@ use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
+ use tokio::runtime::Handle;
use tokio::sync::OnceCell;
use tracing::*;

@@ -280,7 +281,8 @@ impl Layer for DeltaLayer {

// A subroutine to dump a single blob
let dump_blob = |blob_ref: BlobRef| -> anyhow::Result<String> {
- let buf = cursor.read_blob(blob_ref.pos())?;
+ // TODO this is not ideal, but on the other hand we are in dumping code...
+ let buf = Handle::current().block_on(cursor.read_blob(blob_ref.pos()))?;
let val = Value::des(&buf)?;
let desc = match val {
Value::Image(img) => {
@@ -335,7 +337,6 @@ impl Layer for DeltaLayer {
let inner = self
.load(LayerAccessKind::GetValueReconstructData, ctx)
.await?;

inner
.get_value_reconstruct_data(key, lsn_range, reconstruct_state)
.await
@@ -912,12 +913,15 @@ impl DeltaLayerInner {
let cursor = file.block_cursor();
let mut buf = Vec::new();
for (entry_lsn, pos) in offsets {
- cursor.read_blob_into_buf(pos, &mut buf).with_context(|| {
- format!(
- "Failed to read blob from virtual file {}",
- file.file.path.display()
- )
- })?;
+ cursor
+ .read_blob_into_buf(pos, &mut buf)
+ .await
+ .with_context(|| {
+ format!(
+ "Failed to read blob from virtual file {}",
+ file.file.path.display()
+ )
+ })?;
let val = Value::des(&buf).with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
@@ -1026,9 +1030,9 @@ pub struct ValueRef<T: AsRef<DeltaLayerInner>> {

impl<T: AsRef<DeltaLayerInner>> ValueRef<T> {
/// Loads the value from disk
- pub fn load(&self) -> Result<Value> {
+ pub async fn load(&self) -> Result<Value> {
// theoretically we *could* record an access time for each, but it does not really matter
- let buf = self.reader.read_blob(self.blob_ref.pos())?;
+ let buf = self.reader.read_blob(self.blob_ref.pos()).await?;
let val = Value::des(&buf)?;
Ok(val)
}
1 change: 1 addition & 0 deletions pageserver/src/tenant/storage_layer/image_layer.rs
@@ -470,6 +470,7 @@ impl ImageLayerInner {
let blob = file
.block_cursor()
.read_blob(offset)
+ .await
.with_context(|| format!("failed to read value from offset {}", offset))?;
let value = Bytes::from(blob);

28 changes: 14 additions & 14 deletions pageserver/src/tenant/storage_layer/inmemory_layer.rs
@@ -28,7 +28,7 @@ use utils::{
// while being able to use std::fmt::Write's methods
use std::fmt::Write as _;
use std::ops::Range;
- use std::sync::RwLock;
+ use tokio::sync::RwLock;

use super::{DeltaLayer, DeltaLayerWriter, Layer};

Expand Down Expand Up @@ -125,7 +125,7 @@ impl Layer for InMemoryLayer {

/// debugging function to print out the contents of the layer
async fn dump(&self, verbose: bool, _ctx: &RequestContext) -> Result<()> {
- let inner = self.inner.read().unwrap();
+ let inner = self.inner.read().await;

let end_str = self.end_lsn_or_max();

Expand All @@ -143,7 +143,7 @@ impl Layer for InMemoryLayer {
for (key, vec_map) in inner.index.iter() {
for (lsn, pos) in vec_map.as_slice() {
let mut desc = String::new();
- cursor.read_blob_into_buf(*pos, &mut buf)?;
+ cursor.read_blob_into_buf(*pos, &mut buf).await?;
let val = Value::des(&buf);
match val {
Ok(Value::Image(img)) => {
@@ -181,15 +181,15 @@ impl Layer for InMemoryLayer {
ensure!(lsn_range.start >= self.start_lsn);
let mut need_image = true;

- let inner = self.inner.read().unwrap();
+ let inner = self.inner.read().await;

let reader = inner.file.block_cursor();

// Scan the page versions backwards, starting from `lsn`.
if let Some(vec_map) = inner.index.get(&key) {
let slice = vec_map.slice_range(lsn_range);
for (entry_lsn, pos) in slice.iter().rev() {
- let buf = reader.read_blob(*pos)?;
+ let buf = reader.read_blob(*pos).await?;
let value = Value::des(&buf)?;
match value {
Value::Image(img) => {
Expand Down Expand Up @@ -232,8 +232,8 @@ impl InMemoryLayer {
///
/// Get layer size on the disk
///
- pub fn size(&self) -> Result<u64> {
- let inner = self.inner.read().unwrap();
+ pub async fn size(&self) -> Result<u64> {
+ let inner = self.inner.read().await;
Ok(inner.file.size)
}

Expand Down Expand Up @@ -267,9 +267,9 @@ impl InMemoryLayer {

/// Common subroutine of the public put_wal_record() and put_page_image() functions.
/// Adds the page version to the in-memory tree
- pub fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> {
+ pub async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> {
trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
- let mut inner = self.inner.write().unwrap();
+ let mut inner = self.inner.write().await;
self.assert_writable();

let off = {
Expand Down Expand Up @@ -301,8 +301,8 @@ impl InMemoryLayer {
/// Make the layer non-writeable. Only call once.
/// Records the end_lsn for non-dropped layers.
/// `end_lsn` is exclusive
- pub fn freeze(&self, end_lsn: Lsn) {
- let inner = self.inner.write().unwrap();
+ pub async fn freeze(&self, end_lsn: Lsn) {
+ let inner = self.inner.write().await;

assert!(self.start_lsn < end_lsn);
self.end_lsn.set(end_lsn).expect("end_lsn set only once");
@@ -317,7 +317,7 @@
/// Write this frozen in-memory layer to disk.
///
/// Returns a new delta layer with all the same data as this in-memory layer
- pub fn write_to_disk(&self) -> Result<DeltaLayer> {
+ pub async fn write_to_disk(&self) -> Result<DeltaLayer> {
// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to acquire the
// write lock on it, so we shouldn't block anyone. There's one exception
@@ -327,7 +327,7 @@
// lock, it will see that it's not writeable anymore and retry, but it
// would have to wait until we release it. That race condition is very
// rare though, so we just accept the potential latency hit for now.
- let inner = self.inner.read().unwrap();
+ let inner = self.inner.read().await;

let end_lsn = *self.end_lsn.get().unwrap();

Expand All @@ -350,7 +350,7 @@ impl InMemoryLayer {
let key = **key;
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
- cursor.read_blob_into_buf(*pos, &mut buf)?;
+ cursor.read_blob_into_buf(*pos, &mut buf).await?;
let will_init = Value::des(&buf)?.will_init();
delta_layer_writer.put_value_bytes(key, *lsn, &buf, will_init)?;
}