Skip to content

Commit

Permalink
feat(state-sync): save state sync header to external storage (#10458)
Browse files Browse the repository at this point in the history
  • Loading branch information
VanBarbascu authored Jan 24, 2024
1 parent b0df7f6 commit 1367881
Show file tree
Hide file tree
Showing 7 changed files with 274 additions and 65 deletions.
2 changes: 1 addition & 1 deletion chain/client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ pub(crate) static STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED: Lazy<HistogramVec> = Lazy:
try_create_histogram_vec(
"near_state_sync_dump_put_object_elapsed_sec",
"Latency of writes to external storage",
&["shard_id", "result"],
&["shard_id", "result", "type"],
Some(exponential_buckets(0.001, 1.6, 25).unwrap()),
)
.unwrap()
Expand Down
102 changes: 70 additions & 32 deletions chain/client/src/sync/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,32 @@ use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

#[derive(Debug)]
pub enum StateFileType {
StatePart { part_id: u64, num_parts: u64 },
StateHeader,
}

impl ToString for StateFileType {
fn to_string(&self) -> String {
match self {
StateFileType::StatePart { .. } => String::from("part"),
StateFileType::StateHeader => String::from("header"),
}
}
}

impl StateFileType {
pub fn filename(&self) -> String {
match self {
StateFileType::StatePart { part_id, num_parts } => {
format!("state_part_{:06}_of_{:06}", part_id, num_parts)
}
StateFileType::StateHeader => "header".to_string(),
}
}
}

/// Connection to the external storage.
#[derive(Clone)]
pub enum ExternalConnection {
Expand Down Expand Up @@ -78,35 +104,37 @@ impl ExternalConnection {
}
}

/// Uploads the given state part to external storage.
// Wrapper for adding is_ok to the metric labels.
pub async fn put_state_part(
/// Uploads the given state part or header to external storage.
/// Wrapper for adding is_ok to the metric labels.
pub async fn put_file(
&self,
state_part: &[u8],
file_type: StateFileType,
data: &[u8],
shard_id: ShardId,
location: &str,
) -> Result<(), anyhow::Error> {
let instant = Instant::now();
let res = self.put_state_part_impl(state_part, shard_id, location).await;
let res = self.put_file_impl(&file_type, data, shard_id, location).await;
let is_ok = if res.is_ok() { "ok" } else { "error" };
let elapsed = instant.elapsed();
metrics::STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED
.with_label_values(&[&shard_id.to_string(), is_ok])
.with_label_values(&[&shard_id.to_string(), is_ok, &file_type.to_string()])
.observe(elapsed.as_secs_f64());
res
}

// Actual implementation.
async fn put_state_part_impl(
/// Actual implementation.
async fn put_file_impl(
&self,
state_part: &[u8],
file_type: &StateFileType,
data: &[u8],
shard_id: ShardId,
location: &str,
) -> Result<(), anyhow::Error> {
match self {
ExternalConnection::S3 { bucket } => {
bucket.put_object(&location, state_part).await?;
tracing::debug!(target: "state_sync_dump", shard_id, part_length = state_part.len(), ?location, "Wrote a state part to S3");
bucket.put_object(&location, data).await?;
tracing::debug!(target: "state_sync_dump", shard_id, part_length = data.len(), ?location, ?file_type, "Wrote a state part to S3");
Ok(())
}
ExternalConnection::Filesystem { root_dir } => {
Expand All @@ -115,16 +143,16 @@ impl ExternalConnection {
std::fs::create_dir_all(parent_dir)?;
}
let mut file = std::fs::OpenOptions::new().write(true).create(true).open(&path)?;
file.write_all(state_part)?;
tracing::debug!(target: "state_sync_dump", shard_id, part_length = state_part.len(), ?location, "Wrote a state part to a file");
file.write_all(data)?;
tracing::debug!(target: "state_sync_dump", shard_id, part_length = data.len(), ?location, ?file_type, "Wrote a state part to a file");
Ok(())
}
ExternalConnection::GCS { gcs_client, bucket, .. } => {
gcs_client
.object()
.create(bucket, state_part.to_vec(), location, "application/octet-stream")
.create(bucket, data.to_vec(), location, "application/octet-stream")
.await?;
tracing::debug!(target: "state_sync_dump", shard_id, part_length = state_part.len(), ?location, "Wrote a state part to GCS");
tracing::debug!(target: "state_sync_dump", shard_id, part_length = data.len(), ?location, ?file_type, "Wrote a state part to GCS");
Ok(())
}
}
Expand All @@ -141,7 +169,7 @@ impl ExternalConnection {
/// When using GCS external connection, this function requires credentials.
/// Thus, this function shouldn't be used for sync node that is expected to operate anonymously.
/// Only dump nodes should use this function.
pub async fn list_state_parts(
pub async fn list_objects(
&self,
shard_id: ShardId,
directory_path: &str,
Expand Down Expand Up @@ -201,19 +229,18 @@ impl ExternalConnection {
}
}

/// Construct a location on the external storage.
/// Construct the state file location on the external storage.
pub fn external_storage_location(
chain_id: &str,
epoch_id: &EpochId,
epoch_height: u64,
shard_id: u64,
part_id: u64,
num_parts: u64,
file_type: &StateFileType,
) -> String {
format!(
"{}/{}",
location_prefix(chain_id, epoch_height, epoch_id, shard_id),
part_filename(part_id, num_parts)
location_prefix(chain_id, epoch_height, epoch_id, shard_id, file_type),
file_type.filename()
)
}

Expand All @@ -222,20 +249,28 @@ pub fn external_storage_location_directory(
epoch_id: &EpochId,
epoch_height: u64,
shard_id: u64,
obj_type: &StateFileType,
) -> String {
location_prefix(chain_id, epoch_height, epoch_id, shard_id)
location_prefix(chain_id, epoch_height, epoch_id, shard_id, obj_type)
}

pub fn location_prefix(
chain_id: &str,
epoch_height: u64,
epoch_id: &EpochId,
shard_id: u64,
obj_type: &StateFileType,
) -> String {
format!(
"chain_id={}/epoch_height={}/epoch_id={}/shard_id={}",
chain_id, epoch_height, epoch_id.0, shard_id
)
match obj_type {
StateFileType::StatePart { .. } => format!(
"chain_id={}/epoch_height={}/epoch_id={}/shard_id={}",
chain_id, epoch_height, epoch_id.0, shard_id
),
StateFileType::StateHeader => format!(
"chain_id={}/epoch_height={}/epoch_id={}/headers/shard_id={}",
chain_id, epoch_height, epoch_id.0, shard_id
),
}
}

pub fn part_filename(part_id: u64, num_parts: u64) -> String {
Expand Down Expand Up @@ -328,8 +363,8 @@ fn create_bucket(
#[cfg(test)]
mod test {
use crate::sync::external::{
get_num_parts_from_filename, get_part_id_from_filename, is_part_filename, part_filename,
ExternalConnection,
get_num_parts_from_filename, get_part_id_from_filename, is_part_filename,
ExternalConnection, StateFileType,
};
use near_o11y::testonly::init_test_logger;
use rand::distributions::{Alphanumeric, DistString};
Expand All @@ -340,7 +375,7 @@ mod test {

#[test]
fn test_match_filename() {
let filename = part_filename(5, 15);
let filename = StateFileType::StatePart { part_id: 5, num_parts: 15 }.filename();
assert!(is_part_filename(&filename));
assert!(!is_part_filename("123123"));

Expand Down Expand Up @@ -379,17 +414,20 @@ mod test {
// Directory resembles real usecase.
let dir = "test_folder/chain_id=test/epoch_height=1/epoch_id=test/shard_id=0".to_string();
let full_filename = format!("{}/{}", dir, filename);
let file_type = StateFileType::StatePart { part_id: 1, num_parts: 1 };

// Before uploading we shouldn't see filename in the list of files.
let files = rt.block_on(async { connection.list_state_parts(0, &dir).await.unwrap() });
let files = rt.block_on(async { connection.list_objects(0, &dir).await.unwrap() });
tracing::debug!("Files before upload: {:?}", files);
assert_eq!(files.into_iter().filter(|x| *x == filename).collect::<Vec<String>>().len(), 0);

// Uploading the file.
rt.block_on(async { connection.put_state_part(&data, 0, &full_filename).await.unwrap() });
rt.block_on(async {
connection.put_file(file_type, &data, 0, &full_filename).await.unwrap()
});

// After uploading we should see filename in the list of files.
let files = rt.block_on(async { connection.list_state_parts(0, &dir).await.unwrap() });
let files = rt.block_on(async { connection.list_objects(0, &dir).await.unwrap() });
tracing::debug!("Files after upload: {:?}", files);
assert_eq!(files.into_iter().filter(|x| *x == filename).collect::<Vec<String>>().len(), 1);

Expand Down
11 changes: 9 additions & 2 deletions chain/client/src/sync/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ use std::time::Duration as TimeDuration;
use tokio::sync::{Semaphore, TryAcquireError};
use tracing::info;

use super::external::StateFileType;

/// Maximum number of state parts to request per peer on each round when node is trying to download the state.
pub const MAX_STATE_PART_REQUEST: u64 = 16;
/// Number of state parts already requested stored as pending.
Expand Down Expand Up @@ -1056,8 +1058,13 @@ fn request_part_from_external_storage(
download.state_requests_count += 1;
download.last_target = None;

let location =
external_storage_location(chain_id, epoch_id, epoch_height, shard_id, part_id, num_parts);
let location = external_storage_location(
chain_id,
epoch_id,
epoch_height,
shard_id,
&StateFileType::StatePart { part_id, num_parts },
);

match semaphore.try_acquire_owned() {
Ok(permit) => {
Expand Down
8 changes: 3 additions & 5 deletions integration-tests/src/tests/client/state_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use near_chain::near_chain_primitives::error::QueryError;
use near_chain::{ChainGenesis, ChainStoreAccess, Provenance};
use near_chain_configs::ExternalStorageLocation::Filesystem;
use near_chain_configs::{DumpConfig, Genesis};
use near_client::sync::external::external_storage_location;
use near_client::sync::external::{external_storage_location, StateFileType};
use near_client::test_utils::TestEnv;
use near_client::ProcessTxResponse;
use near_crypto::{InMemorySigner, KeyType, Signer};
Expand Down Expand Up @@ -95,8 +95,7 @@ fn test_state_dump() {
&epoch_id,
epoch_height,
shard_id,
part_id,
num_parts,
&StateFileType::StatePart { part_id, num_parts },
));
if std::fs::read(&path).is_err() {
tracing::info!("Missing {:?}", path);
Expand Down Expand Up @@ -268,8 +267,7 @@ fn run_state_sync_with_dumped_parts(
&epoch_id,
epoch_height,
shard_id,
part_id,
num_parts,
&StateFileType::StatePart { part_id, num_parts },
));
if std::fs::read(&path).is_err() {
tracing::info!("dumping node: Missing {:?}", path);
Expand Down
Loading

0 comments on commit 1367881

Please sign in to comment.