Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prover Limit Concurrency #53

Merged
merged 2 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion preprocessor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ pub async fn get_block_header<C: ClientTypes>(
}

pub async fn light_client_update_to_args<S: Spec>(
update: &mut LightClientUpdateCapella<
update: &LightClientUpdateCapella<
{ S::SYNC_COMMITTEE_SIZE },
{ S::SYNC_COMMITTEE_ROOT_INDEX },
{ S::SYNC_COMMITTEE_DEPTH },
Expand Down
4 changes: 2 additions & 2 deletions preprocessor/src/rotation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ where
slot, period
);

let mut update = get_light_client_update_at_period(client, period).await?;
rotation_args_from_update(&mut update).await
let update = get_light_client_update_at_period(client, period).await?;
rotation_args_from_update(&update).await
}

/// Converts a [`LightClientUpdateCapella`] to a [`CommitteeUpdateArgs`] witness.
Expand Down
4 changes: 4 additions & 0 deletions prover/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ pub enum BaseCmd {
/// Path to directory with circuit artifacts
#[clap(long, short, default_value = "./build")]
build_dir: PathBuf,

/// How many proofs can be run at the same tome
#[clap(long, short, default_value = "1")]
concurrency: usize,
},
/// Circuit related commands.
Circuit {
Expand Down
3 changes: 3 additions & 0 deletions prover/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ async fn app(options: Cli) -> eyre::Result<()> {
port,
spec,
build_dir,
concurrency,
} => {
match spec {
args::Spec::Testnet => {
run_rpc::<eth_types::Testnet>(
port.parse().unwrap(),
options.args.config_dir,
build_dir,
concurrency,
)
.await
}
Expand All @@ -39,6 +41,7 @@ async fn app(options: Cli) -> eyre::Result<()> {
port.parse().unwrap(),
options.args.config_dir,
build_dir,
concurrency,
)
.await
}
Expand Down
6 changes: 5 additions & 1 deletion prover/src/prover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use lightclient_circuits::util::AppCircuit;
use snark_verifier_sdk::halo2::aggregation::AggregationCircuit;
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::Semaphore;

#[derive(Clone, Debug, Getters)]
pub struct CircuitContext {
Expand All @@ -35,10 +37,11 @@ pub struct ProverState {
pub step_verifier: CircuitContext,
pub committee_update: CircuitContext,
pub committee_update_verifier: CircuitContext,
pub concurrency: Arc<Semaphore>,
}

impl ProverState {
pub fn new<S: Spec>(config_dir: &Path, build_dir: &Path) -> Self {
pub fn new<S: Spec>(config_dir: &Path, build_dir: &Path, concurrency: usize) -> Self {
let mut params_map = BTreeMap::new();

fn load_ctx<Circuit: AppCircuit>(
Expand Down Expand Up @@ -109,6 +112,7 @@ impl ProverState {
vec![committee_update_snark],
),
params: params_map,
concurrency: Arc::new(Semaphore::new(concurrency)),
}
}
}
24 changes: 19 additions & 5 deletions prover/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;

pub type JsonRpcServerState = Arc<JsonRpcServer<JsonRpcMapRouter>>;

use crate::rpc_api::{
CommitteeUpdateEvmProofResult, GenProofCommitteeUpdateParams, GenProofStepParams,
SyncStepCompressedEvmProofResult, RPC_EVM_PROOF_COMMITTEE_UPDATE_CIRCUIT_COMPRESSED,
Expand Down Expand Up @@ -52,6 +51,7 @@ where
)
.finish_unwrapped()
}

pub(crate) async fn gen_evm_proof_committee_update_handler<S: eth_types::Spec>(
Data(state): Data<ProverState>,
Params(params): Params<GenProofCommitteeUpdateParams>,
Expand All @@ -65,12 +65,19 @@ where
[(); S::SYNC_COMMITTEE_DEPTH]:,
[(); S::FINALIZED_HEADER_INDEX]:,
{
if let Err(e) = state.concurrency.clone().acquire_owned().await {
return Err(JsonRpcError::internal(format!(
"Failed to acquire concurrency lock: {}",
e
)));
};

let GenProofCommitteeUpdateParams {
light_client_update,
} = params;

let mut update = ssz_rs::deserialize(&light_client_update)?;
let witness = rotation_args_from_update(&mut update).await?;
let update = ssz_rs::deserialize(&light_client_update)?;
let witness = rotation_args_from_update(&update).await?;
let params = state.params.get(state.committee_update.degree()).unwrap();

let snark = gen_uncompressed_snark::<CommitteeUpdateCircuit<S, Fr>>(
Expand Down Expand Up @@ -122,6 +129,12 @@ where
[(); S::BYTES_PER_LOGS_BLOOM]:,
[(); S::MAX_EXTRA_DATA_BYTES]:,
{
if let Err(e) = state.concurrency.clone().acquire_owned().await {
return Err(JsonRpcError::internal(format!(
"Failed to acquire concurrency lock: {}",
e
)));
};
let GenProofStepParams {
light_client_finality_update,
domain,
Expand Down Expand Up @@ -186,6 +199,7 @@ pub async fn run_rpc<S: eth_types::Spec>(
port: usize,
config_dir: impl AsRef<Path>,
build_dir: impl AsRef<Path>,
concurrency: usize,
) -> Result<(), eyre::Error>
where
[(); S::SYNC_COMMITTEE_SIZE]:,
Expand All @@ -198,10 +212,9 @@ where
{
let tcp_listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", port)).await?;
let timer = start_timer!(|| "Load proving keys");
let state = ProverState::new::<S>(config_dir.as_ref(), build_dir.as_ref());
let state = ProverState::new::<S>(config_dir.as_ref(), build_dir.as_ref(), concurrency);
end_timer!(timer);
let rpc_server = Arc::new(jsonrpc_server::<S>(state));

let router = Router::new()
.route("/rpc", post(handler))
.with_state(rpc_server);
Expand All @@ -218,6 +231,7 @@ async fn handler(
axum::Json(rpc_call): axum::Json<JsonRpcRequestObject>,
) -> impl IntoResponse {
let response_headers = [("content-type", "application/json-rpc;charset=utf-8")];

log::debug!("RPC request with method: {}", rpc_call.method_ref());

let response = rpc_server.handle(rpc_call).await;
Expand Down
Loading