diff --git a/Cargo.lock b/Cargo.lock index 81cf8f028..a88c0a83c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3828,6 +3828,7 @@ dependencies = [ "near-sdk", "schemars", "serde", + "serde_json", ] [[package]] diff --git a/contract/Cargo.toml b/contract/Cargo.toml index be244230a..c0faa1b02 100644 --- a/contract/Cargo.toml +++ b/contract/Cargo.toml @@ -10,6 +10,7 @@ crate-type = ["cdylib", "lib"] borsh = "1.3.0" near-sdk = "5.0.0-alpha.1" serde = { version = "1", features = ["derive"] } +serde_json = "1" schemars = "0.8" [profile.release] diff --git a/contract/src/lib.rs b/contract/src/lib.rs index aed56a3aa..049f31c5b 100644 --- a/contract/src/lib.rs +++ b/contract/src/lib.rs @@ -1,8 +1,9 @@ pub mod primitives; use near_sdk::borsh::{self, BorshDeserialize, BorshSerialize}; +use near_sdk::collections::LookupMap; use near_sdk::serde::{Deserialize, Serialize}; -use near_sdk::{env, near_bindgen, AccountId, PanicOnDefault, PublicKey}; +use near_sdk::{env, near_bindgen, AccountId, PanicOnDefault, Promise, PromiseOrValue, PublicKey}; use primitives::{CandidateInfo, Candidates, ParticipantInfo, Participants, PkVotes, Votes}; use std::collections::{BTreeMap, HashSet}; @@ -47,6 +48,7 @@ pub enum ProtocolContractState { #[derive(BorshDeserialize, BorshSerialize, PanicOnDefault)] pub struct MpcContract { protocol_state: ProtocolContractState, + pending_requests: LookupMap<[u8; 32], Option<(String, String)>>, } #[near_bindgen] @@ -59,6 +61,7 @@ impl MpcContract { threshold, pk_votes: PkVotes::new(), }), + pending_requests: LookupMap::new(b"m"), } } @@ -257,12 +260,43 @@ impl MpcContract { } #[allow(unused_variables)] - pub fn sign(&mut self, payload: [u8; 32], path: String) -> [u8; 32] { - near_sdk::env::random_seed_array() + pub fn sign(&mut self, payload: [u8; 32], path: String) -> Promise { + match self.pending_requests.get(&payload) { + None => { + self.pending_requests.insert(&payload, &None); + env::log_str(&serde_json::to_string(&near_sdk::env::random_seed_array()).unwrap()); + Self::ext(env::current_account_id()).sign_helper(payload, 0) + } + Some(_) => env::panic_str("Signature for this payload already requested"), + } } - #[allow(unused_variables)] - pub fn respond(&mut self, receipt_id: [u8; 32], big_r: String, s: String) {} + #[private] + pub fn sign_helper( + &mut self, + payload: [u8; 32], + depth: usize, + ) -> PromiseOrValue<(String, String)> { + if let Some(signature) = self.pending_requests.get(&payload) { + match signature { + Some(signature) => { + self.pending_requests.remove(&payload); + PromiseOrValue::Value(signature) + } + None => { + env::log_str(&format!("not ready yet (depth={})", depth)); + let account_id = env::current_account_id(); + PromiseOrValue::Promise(Self::ext(account_id).sign_helper(payload, depth + 1)) + } + } + } else { + env::panic_str("unexpected request"); + } + } + + pub fn respond(&mut self, payload: [u8; 32], big_r: String, s: String) { + self.pending_requests.insert(&payload, &Some((big_r, s))); + } #[private] #[init(ignore_state)] @@ -272,6 +306,7 @@ impl MpcContract { } Self { protocol_state: ProtocolContractState::NotInitialized, + pending_requests: LookupMap::new(b"m"), } } diff --git a/integration-tests/src/indexer.rs b/integration-tests/src/indexer.rs deleted file mode 100644 index ad310c007..000000000 --- a/integration-tests/src/indexer.rs +++ /dev/null @@ -1,100 +0,0 @@ -use k256::{AffinePoint, Scalar}; -use near_lake_framework::{LakeBuilder, LakeContext}; -use near_lake_primitives::actions::ActionMetaDataExt; -use near_lake_primitives::{receipts::ExecutionStatus, AccountId}; -use near_primitives::hash::CryptoHash; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::sync::Arc; -use tokio::sync::RwLock; - -#[derive(Debug, Serialize, Deserialize)] -struct RespondPayload { - receipt_id: [u8; 32], - big_r: AffinePoint, - s: Scalar, -} - -pub struct FullSignature { - pub big_r: AffinePoint, - pub s: Scalar, -} - -#[derive(LakeContext)] -struct Context { - mpc_contract_id: AccountId, - responses: Arc>>, -} - -async fn handle_block( - mut block: near_lake_primitives::block::Block, - ctx: &Context, -) -> anyhow::Result<()> { - for action in block.actions().cloned().collect::>() { - if action.receiver_id() == ctx.mpc_contract_id { - let receipt = block.receipt_by_id(&action.receipt_id()).unwrap(); - if let Some(function_call) = action.as_function_call() { - if function_call.method_name() == "respond" { - let ExecutionStatus::SuccessValue(_) = receipt.status() else { - tracing::error!("indexed a failed `respond` function call"); - continue; - }; - if let Ok(respond_payload) = - serde_json::from_slice::<'_, RespondPayload>(function_call.args()) - { - let receipt_id = CryptoHash(respond_payload.receipt_id); - tracing::info!( - receipt_id = %receipt_id, - caller_id = receipt.predecessor_id().to_string(), - big_r = ?respond_payload.big_r, - s = ?respond_payload.s, - "indexed new `respond` function call" - ); - let mut responses = ctx.responses.write().await; - responses.insert( - receipt_id, - FullSignature { - big_r: respond_payload.big_r, - s: respond_payload.s, - }, - ); - drop(responses); - } - } - } - } - } - Ok(()) -} - -pub fn run( - s3_bucket: &str, - s3_region: &str, - start_block_height: u64, - s3_url: &str, - mpc_contract_id: AccountId, - responses: Arc>>, -) -> anyhow::Result<()> { - let mut lake_builder = LakeBuilder::default() - .s3_bucket_name(s3_bucket) - .s3_region_name(s3_region) - .start_block_height(start_block_height); - let lake = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap() - .block_on(async { - let aws_config = aws_config::from_env().load().await; - let s3_config = aws_sdk_s3::config::Builder::from(&aws_config) - .endpoint_url(s3_url) - .build(); - lake_builder = lake_builder.s3_config(s3_config); - lake_builder.build() - })?; - let context = Context { - mpc_contract_id, - responses, - }; - lake.run_with_context(handle_block, &context)?; - Ok(()) -} diff --git a/integration-tests/src/lib.rs b/integration-tests/src/lib.rs index dd56187dc..0440d4961 100644 --- a/integration-tests/src/lib.rs +++ b/integration-tests/src/lib.rs @@ -12,7 +12,6 @@ use near_workspaces::types::NearToken; use testcontainers::{Container, GenericImage}; pub mod env; -pub mod indexer; pub mod mpc; pub mod multichain; pub mod sandbox; diff --git a/integration-tests/tests/lib.rs b/integration-tests/tests/lib.rs index 49290050b..bfd02a5b4 100644 --- a/integration-tests/tests/lib.rs +++ b/integration-tests/tests/lib.rs @@ -10,15 +10,10 @@ use mpc_recovery::{ ClaimOidcResponse, MpcPkResponse, NewAccountResponse, SignResponse, UserCredentialsResponse, }, }; +use mpc_recovery_integration_tests::env; use mpc_recovery_integration_tests::env::containers::DockerClient; -use mpc_recovery_integration_tests::indexer::FullSignature; -use mpc_recovery_integration_tests::{env, indexer}; -use near_primitives::hash::CryptoHash; +use near_jsonrpc_client::JsonRpcClient; use near_workspaces::{network::Sandbox, Worker}; -use std::collections::HashMap; -use std::sync::Arc; -use std::thread; -use tokio::sync::RwLock; pub struct TestContext { env: String, @@ -68,8 +63,8 @@ where pub struct MultichainTestContext<'a> { nodes: mpc_recovery_integration_tests::multichain::Nodes<'a>, rpc_client: near_fetch::Client, + jsonrpc_client: JsonRpcClient, http_client: reqwest::Client, - responses: Arc>>, } async fn with_multichain_nodes(nodes: usize, f: F) -> anyhow::Result<()> @@ -79,30 +74,14 @@ where let docker_client = DockerClient::default(); let nodes = mpc_recovery_integration_tests::multichain::run(nodes, &docker_client).await?; - let s3_bucket = nodes.ctx().localstack.s3_bucket.clone(); - let s3_region = nodes.ctx().localstack.s3_region.clone(); - let s3_url = nodes.ctx().localstack.s3_host_address.clone(); - let mpc_contract_id = nodes.ctx().mpc_contract.id().clone(); - let responses = Arc::new(RwLock::new(HashMap::new())); - let responses_clone = responses.clone(); - thread::spawn(move || { - indexer::run( - &s3_bucket, - &s3_region, - 0, - &s3_url, - mpc_contract_id, - responses_clone, - ) - .unwrap(); - }); - - let rpc_client = near_fetch::Client::new(&nodes.ctx().lake_indexer.rpc_host_address); + let connector = JsonRpcClient::new_client(); + let jsonrpc_client = connector.connect(&nodes.ctx().lake_indexer.rpc_host_address); + let rpc_client = near_fetch::Client::from_client(jsonrpc_client.clone()); f(MultichainTestContext { nodes, rpc_client, + jsonrpc_client, http_client: reqwest::Client::default(), - responses, }) .await?; @@ -217,9 +196,7 @@ mod wait_for { use backon::Retryable; use mpc_contract::ProtocolContractState; use mpc_contract::RunningContractState; - use mpc_recovery_integration_tests::indexer::FullSignature; use mpc_recovery_node::web::StateView; - use near_primitives::hash::CryptoHash; pub async fn running_mpc<'a>( ctx: &MultichainTestContext<'a>, @@ -304,26 +281,6 @@ mod wait_for { .await .with_context(|| format!("mpc node '{id}' failed to generate '{expected_presignature_count}' presignatures before deadline")) } - - pub async fn has_response<'a>( - ctx: &MultichainTestContext<'a>, - receipt_id: CryptoHash, - ) -> anyhow::Result { - let is_enough_presignatures = || async { - let mut responses = ctx.responses.write().await; - if let Some(signature) = responses.remove(&receipt_id) { - return Ok(signature); - } - drop(responses); - anyhow::bail!("mpc has not responded yet") - }; - is_enough_presignatures - .retry(&ExponentialBuilder::default().with_max_times(8)) - .await - .with_context(|| { - format!("mpc failed to respond to receipt id '{receipt_id}' before deadline") - }) - } } trait MpcCheck { diff --git a/integration-tests/tests/multichain/mod.rs b/integration-tests/tests/multichain/mod.rs index d6d7c99ed..025b220e3 100644 --- a/integration-tests/tests/multichain/mod.rs +++ b/integration-tests/tests/multichain/mod.rs @@ -1,12 +1,18 @@ use crate::{wait_for, with_multichain_nodes}; +use anyhow::Context; +use backon::{ExponentialBuilder, Retryable}; use k256::elliptic_curve::sec1::FromEncodedPoint; use k256::{AffinePoint, EncodedPoint, Scalar, Secp256k1}; use mpc_recovery_node::kdf; use mpc_recovery_node::util::ScalarExt; -use near_crypto::InMemorySigner; -use near_primitives::transaction::{Action, FunctionCallAction}; -use near_primitives::views::ExecutionStatusView; +use near_crypto::{InMemorySigner, Signer}; +use near_fetch::signer::ExposeAccountId; +use near_jsonrpc_client::methods::broadcast_tx_async::RpcBroadcastTxAsyncRequest; +use near_jsonrpc_client::methods::tx::{RpcTransactionStatusRequest, TransactionInfo}; +use near_primitives::transaction::{Action, FunctionCallAction, Transaction}; +use near_primitives::views::FinalExecutionStatus; use rand::Rng; +use std::time::Duration; use test_log::test; #[test(tokio::test)] @@ -66,6 +72,13 @@ async fn test_signature() -> anyhow::Result<()> { let state_0 = wait_for::running_mpc(&ctx, 0).await?; assert_eq!(state_0.participants.len(), 3); + for i in 0..ctx.nodes.len() { + wait_for::has_at_least_triples(&ctx, i, 2).await?; + } + for i in 0..ctx.nodes.len() { + wait_for::has_at_least_presignatures(&ctx, i, 2).await?; + } + let worker = &ctx.nodes.ctx().worker; let (account_id, secret_key) = worker.dev_generate().await; worker @@ -73,45 +86,66 @@ async fn test_signature() -> anyhow::Result<()> { .await? .into_result()?; let payload: [u8; 32] = rand::thread_rng().gen(); - let outcome = ctx + let signer = InMemorySigner { + account_id: account_id.clone(), + public_key: secret_key.public_key().clone().into(), + secret_key: secret_key.to_string().parse()?, + }; + let (nonce, block_hash, _) = ctx .rpc_client - .send_tx( - &InMemorySigner { - account_id: account_id.clone(), - public_key: secret_key.public_key().clone().into(), - secret_key: secret_key.to_string().parse()?, - }, - ctx.nodes.ctx().mpc_contract.id(), - vec![Action::FunctionCall(FunctionCallAction { - method_name: "sign".to_string(), - args: serde_json::to_vec(&serde_json::json!({ - "payload": payload, - "path": "test", - }))?, - gas: 300_000_000_000_000, - deposit: 0, - })], - ) + .fetch_nonce(signer.account_id(), &signer.public_key()) .await?; - let ExecutionStatusView::SuccessReceiptId(receipt_id) = - outcome.transaction_outcome.outcome.status - else { - anyhow::bail!("missing receipt id"); - }; - - let signature = wait_for::has_response(&ctx, receipt_id).await?; - let signature_output = cait_sith::FullSignature:: { - big_r: signature.big_r, - s: signature.s, + let tx_hash = ctx + .jsonrpc_client + .call(&RpcBroadcastTxAsyncRequest { + signed_transaction: Transaction { + nonce, + block_hash, + signer_id: signer.account_id().clone(), + public_key: signer.public_key(), + receiver_id: ctx.nodes.ctx().mpc_contract.id().clone(), + actions: vec![Action::FunctionCall(FunctionCallAction { + method_name: "sign".to_string(), + args: serde_json::to_vec(&serde_json::json!({ + "payload": payload, + "path": "test", + }))?, + gas: 300_000_000_000_000, + deposit: 0, + })], + } + .sign(&signer), + }) + .await?; + tokio::time::sleep(Duration::from_secs(1)).await; + let is_tx_ready = || async { + let outcome_view = ctx + .jsonrpc_client + .call(RpcTransactionStatusRequest { + transaction_info: TransactionInfo::TransactionId { + hash: tx_hash, + account_id: ctx.nodes.ctx().mpc_contract.id().clone(), + }, + }) + .await?; + let FinalExecutionStatus::SuccessValue(payload) = outcome_view.status else { + anyhow::bail!("tx finished unsuccessfully: {:?}", outcome_view.status); + }; + let (big_r, s): (AffinePoint, Scalar) = serde_json::from_slice(&payload)?; + let signature = cait_sith::FullSignature:: { big_r, s }; + Ok(signature) }; - + let signature = is_tx_ready + .retry(&ExponentialBuilder::default().with_max_times(6)) + .await + .with_context(|| "failed to wait for signature response")?; let mut bytes = vec![0x04]; bytes.extend_from_slice(&state_0.public_key.as_bytes()[1..]); let point = EncodedPoint::from_bytes(bytes).unwrap(); let public_key = AffinePoint::from_encoded_point(&point).unwrap(); let epsilon = kdf::derive_epsilon(&account_id, "test"); - assert!(signature_output.verify( + assert!(signature.verify( &kdf::derive_key(public_key, epsilon), &Scalar::from_bytes(&payload), )); diff --git a/node/src/indexer.rs b/node/src/indexer.rs index bf0e97ec3..32d2cbcda 100644 --- a/node/src/indexer.rs +++ b/node/src/indexer.rs @@ -79,7 +79,7 @@ async fn handle_block( for action in block.actions().cloned().collect::>() { if action.receiver_id() == ctx.mpc_contract_id { let receipt = block.receipt_by_id(&action.receipt_id()).unwrap(); - let ExecutionStatus::SuccessValue(result) = receipt.status() else { + let ExecutionStatus::SuccessReceiptId(receipt_id) = receipt.status() else { continue; }; if let Some(function_call) = action.as_function_call() { @@ -87,14 +87,23 @@ async fn handle_block( if let Ok(sign_payload) = serde_json::from_slice::<'_, SignPayload>(function_call.args()) { - let Ok(entropy) = serde_json::from_slice::<'_, [u8; 32]>(&result) else { + if receipt.logs().is_empty() { + tracing::warn!("`sign` did not produce entropy"); + continue; + } + let Ok(entropy) = serde_json::from_str::<'_, [u8; 32]>(&receipt.logs()[0]) + else { + tracing::warn!( + "`sign` did not produce entropy correctly: {:?}", + receipt.logs()[0] + ); continue; }; let epsilon = kdf::derive_epsilon(&action.predecessor_id(), &sign_payload.path); - let delta = kdf::derive_delta(receipt.receipt_id(), entropy); + let delta = kdf::derive_delta(receipt_id, entropy); tracing::info!( - receipt_id = %receipt.receipt_id(), + receipt_id = %receipt_id, caller_id = receipt.predecessor_id().to_string(), payload = hex::encode(sign_payload.payload), entropy = hex::encode(entropy), @@ -102,7 +111,7 @@ async fn handle_block( ); let mut queue = ctx.queue.write().await; queue.add(SignRequest { - receipt_id: receipt.receipt_id(), + receipt_id, msg_hash: sign_payload.payload, epsilon, delta, diff --git a/node/src/protocol/signature.rs b/node/src/protocol/signature.rs index 8198aeae8..ca702605f 100644 --- a/node/src/protocol/signature.rs +++ b/node/src/protocol/signature.rs @@ -101,7 +101,7 @@ pub struct SignatureManager { /// Ongoing signature generation protocols. generators: HashMap, /// Generated signatures assigned to the current node that are yet to be published. - signatures: Vec<(CryptoHash, FullSignature)>, + signatures: Vec<(CryptoHash, [u8; 32], FullSignature)>, participants: Vec, me: Participant, @@ -290,7 +290,8 @@ impl SignatureManager { "completed signature generation" ); if generator.proposer == self.me { - self.signatures.push((*receipt_id, output)); + self.signatures + .push((*receipt_id, generator.msg_hash, output)); } // Do not retain the protocol return false; @@ -307,7 +308,7 @@ impl SignatureManager { signer: &T, mpc_contract_id: &AccountId, ) -> Result<(), near_fetch::Error> { - for (receipt_id, signature) in self.signatures.drain(..) { + for (receipt_id, payload, signature) in self.signatures.drain(..) { // TODO: Figure out how to properly serialize the signature // let r_s = signature.big_r.x().concat(signature.s.to_bytes()); // let tag = @@ -315,8 +316,7 @@ impl SignatureManager { // let signature = r_s.append(tag); // let signature = Secp256K1Signature::try_from(signature.as_slice()).unwrap(); // let signature = Signature::SECP256K1(signature); - tracing::info!(%receipt_id, big_r = signature.big_r.to_base58(), s = ?signature.s, "publishing signature response"); - rpc_client + let response = rpc_client .send_tx( signer, mpc_contract_id, @@ -324,7 +324,7 @@ impl SignatureManager { FunctionCallAction { method_name: "respond".to_string(), args: serde_json::to_vec(&serde_json::json!({ - "receipt_id": receipt_id.as_bytes(), + "payload": payload, "big_r": signature.big_r, "s": signature.s })) @@ -335,6 +335,7 @@ impl SignatureManager { )], ) .await?; + tracing::info!(%receipt_id, big_r = signature.big_r.to_base58(), s = ?signature.s, status = ?response.status, "published signature response"); } Ok(()) }