Skip to content

Commit

Permalink
Prover Limit Concurrency (#53)
Browse files Browse the repository at this point in the history
* Limit Concurrent Proof Gen

* refactor
  • Loading branch information
ec2 authored Jan 9, 2024
1 parent 2102a86 commit 1697d63
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 9 deletions.
2 changes: 1 addition & 1 deletion preprocessor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ pub async fn get_block_header<C: ClientTypes>(
}

pub async fn light_client_update_to_args<S: Spec>(
update: &mut LightClientUpdateCapella<
update: &LightClientUpdateCapella<
{ S::SYNC_COMMITTEE_SIZE },
{ S::SYNC_COMMITTEE_ROOT_INDEX },
{ S::SYNC_COMMITTEE_DEPTH },
Expand Down
4 changes: 2 additions & 2 deletions preprocessor/src/rotation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ where
slot, period
);

let mut update = get_light_client_update_at_period(client, period).await?;
rotation_args_from_update(&mut update).await
let update = get_light_client_update_at_period(client, period).await?;
rotation_args_from_update(&update).await
}

/// Converts a [`LightClientUpdateCapella`] to a [`CommitteeUpdateArgs`] witness.
Expand Down
4 changes: 4 additions & 0 deletions prover/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ pub enum BaseCmd {
/// Path to directory with circuit artifacts
#[clap(long, short, default_value = "./build")]
build_dir: PathBuf,

/// How many proofs can be run at the same tome
#[clap(long, short, default_value = "1")]
concurrency: usize,
},
/// Circuit related commands.
Circuit {
Expand Down
3 changes: 3 additions & 0 deletions prover/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ async fn app(options: Cli) -> eyre::Result<()> {
port,
spec,
build_dir,
concurrency,
} => {
match spec {
args::Spec::Testnet => {
run_rpc::<eth_types::Testnet>(
port.parse().unwrap(),
options.args.config_dir,
build_dir,
concurrency,
)
.await
}
Expand All @@ -39,6 +41,7 @@ async fn app(options: Cli) -> eyre::Result<()> {
port.parse().unwrap(),
options.args.config_dir,
build_dir,
concurrency,
)
.await
}
Expand Down
6 changes: 5 additions & 1 deletion prover/src/prover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use lightclient_circuits::util::AppCircuit;
use snark_verifier_sdk::halo2::aggregation::AggregationCircuit;
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::Semaphore;

#[derive(Clone, Debug, Getters)]
pub struct CircuitContext {
Expand All @@ -35,10 +37,11 @@ pub struct ProverState {
pub step_verifier: CircuitContext,
pub committee_update: CircuitContext,
pub committee_update_verifier: CircuitContext,
pub concurrency: Arc<Semaphore>,
}

impl ProverState {
pub fn new<S: Spec>(config_dir: &Path, build_dir: &Path) -> Self {
pub fn new<S: Spec>(config_dir: &Path, build_dir: &Path, concurrency: usize) -> Self {
let mut params_map = BTreeMap::new();

fn load_ctx<Circuit: AppCircuit>(
Expand Down Expand Up @@ -109,6 +112,7 @@ impl ProverState {
vec![committee_update_snark],
),
params: params_map,
concurrency: Arc::new(Semaphore::new(concurrency)),
}
}
}
24 changes: 19 additions & 5 deletions prover/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;

pub type JsonRpcServerState = Arc<JsonRpcServer<JsonRpcMapRouter>>;

use crate::rpc_api::{
CommitteeUpdateEvmProofResult, GenProofCommitteeUpdateParams, GenProofStepParams,
SyncStepCompressedEvmProofResult, RPC_EVM_PROOF_COMMITTEE_UPDATE_CIRCUIT_COMPRESSED,
Expand Down Expand Up @@ -52,6 +51,7 @@ where
)
.finish_unwrapped()
}

pub(crate) async fn gen_evm_proof_committee_update_handler<S: eth_types::Spec>(
Data(state): Data<ProverState>,
Params(params): Params<GenProofCommitteeUpdateParams>,
Expand All @@ -65,12 +65,19 @@ where
[(); S::SYNC_COMMITTEE_DEPTH]:,
[(); S::FINALIZED_HEADER_INDEX]:,
{
if let Err(e) = state.concurrency.clone().acquire_owned().await {
return Err(JsonRpcError::internal(format!(
"Failed to acquire concurrency lock: {}",
e
)));
};

let GenProofCommitteeUpdateParams {
light_client_update,
} = params;

let mut update = ssz_rs::deserialize(&light_client_update)?;
let witness = rotation_args_from_update(&mut update).await?;
let update = ssz_rs::deserialize(&light_client_update)?;
let witness = rotation_args_from_update(&update).await?;
let params = state.params.get(state.committee_update.degree()).unwrap();

let snark = gen_uncompressed_snark::<CommitteeUpdateCircuit<S, Fr>>(
Expand Down Expand Up @@ -122,6 +129,12 @@ where
[(); S::BYTES_PER_LOGS_BLOOM]:,
[(); S::MAX_EXTRA_DATA_BYTES]:,
{
if let Err(e) = state.concurrency.clone().acquire_owned().await {
return Err(JsonRpcError::internal(format!(
"Failed to acquire concurrency lock: {}",
e
)));
};
let GenProofStepParams {
light_client_finality_update,
domain,
Expand Down Expand Up @@ -186,6 +199,7 @@ pub async fn run_rpc<S: eth_types::Spec>(
port: usize,
config_dir: impl AsRef<Path>,
build_dir: impl AsRef<Path>,
concurrency: usize,
) -> Result<(), eyre::Error>
where
[(); S::SYNC_COMMITTEE_SIZE]:,
Expand All @@ -198,10 +212,9 @@ where
{
let tcp_listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", port)).await?;
let timer = start_timer!(|| "Load proving keys");
let state = ProverState::new::<S>(config_dir.as_ref(), build_dir.as_ref());
let state = ProverState::new::<S>(config_dir.as_ref(), build_dir.as_ref(), concurrency);
end_timer!(timer);
let rpc_server = Arc::new(jsonrpc_server::<S>(state));

let router = Router::new()
.route("/rpc", post(handler))
.with_state(rpc_server);
Expand All @@ -218,6 +231,7 @@ async fn handler(
axum::Json(rpc_call): axum::Json<JsonRpcRequestObject>,
) -> impl IntoResponse {
let response_headers = [("content-type", "application/json-rpc;charset=utf-8")];

log::debug!("RPC request with method: {}", rpc_call.method_ref());

let response = rpc_server.handle(rpc_call).await;
Expand Down

0 comments on commit 1697d63

Please sign in to comment.