Skip to content

Commit ac059ed

Browse files
Merge pull request #27 from cardano-scaling/sample-transactions
Endpoint to fetch the next N transactions from a given node
2 parents 556c989 + 7bf8ae6 commit ac059ed

File tree

8 files changed

+135
-7
lines changed

8 files changed

+135
-7
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@ utxo.json
1212
.pre-commit-config.yaml
1313
admin.sk
1414
run.sh
15-
/keys
15+
crates/rpc/keys

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/rpc/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ k8s-openapi = { version = "0.23.0", features = ["latest"] }
4343
kube = { version = "0.96.0", features = ["client", "derive", "runtime"] }
4444
schemars = "0.8.21"
4545
rocket-errors = "0.1.0"
46+
rand = "0.8.5"
4647

4748
[profile.release]
4849
debug = true

crates/rpc/src/main.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
21
use anyhow::{Context, Result};
32
use model::cluster::ClusterState;
43
use rocket::{http::Method, routes};
54
use rocket_cors::{AllowedOrigins, CorsOptions};
65
use routes::{
76
add_player::add_player, cleanup::cleanup, head::head, heads::heads, new_game::new_game,
7+
sample_transactions::sample_transactions,
88
};
99
use serde::Deserialize;
1010

@@ -41,7 +41,17 @@ async fn main() -> Result<()> {
4141

4242
let _rocket = rocket::build()
4343
.manage(cluster)
44-
.mount("/", routes![new_game, heads, head, add_player, cleanup])
44+
.mount(
45+
"/",
46+
routes![
47+
new_game,
48+
heads,
49+
head,
50+
add_player,
51+
cleanup,
52+
sample_transactions
53+
],
54+
)
4555
.attach(cors.to_cors().unwrap())
4656
.launch()
4757
.await?;

crates/rpc/src/model/cluster/node.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ use tracing::debug;
1313

1414
use crate::model::{
1515
game::player::Player,
16-
hydra::{hydra_socket, messages::new_tx::NewTx},
16+
hydra::{
17+
hydra_socket,
18+
messages::{new_tx::NewTx, tx_valid::TxValid},
19+
},
1720
tx_builder::TxBuilder,
1821
};
1922

@@ -207,6 +210,16 @@ impl NodeClient {
207210

208211
Ok(utxos)
209212
}
213+
214+
pub async fn sample_txs(&self, count: usize) -> Result<Vec<TxValid>> {
215+
//TODO: make duration configurable
216+
hydra_socket::sample_txs(
217+
&format!("{}/?history=no", &self.connection.to_websocket_url()),
218+
count,
219+
Duration::from_secs(10),
220+
)
221+
.await
222+
}
210223
}
211224

212225
impl ConnectionInfo {

crates/rpc/src/model/hydra/hydra_socket.rs

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use futures_util::{
2020
use tokio::{
2121
net::TcpStream,
2222
sync::{mpsc::UnboundedSender, Mutex},
23-
task::yield_now,
23+
task::{yield_now, JoinHandle},
2424
};
2525
use tokio_native_tls::TlsStream;
2626
use tracing::{debug, info, warn};
@@ -29,7 +29,7 @@ use crate::model::hydra::hydra_message::HydraEventMessage;
2929

3030
use super::{
3131
hydra_message::{HydraData, HydraMessage},
32-
messages::new_tx::NewTx,
32+
messages::{new_tx::NewTx, tx_valid::TxValid},
3333
};
3434

3535
#[allow(dead_code)]
@@ -159,6 +159,49 @@ impl HydraSender {
159159
}
160160
}
161161

162+
pub async fn sample_txs(url: &str, count: usize, timeout: Duration) -> Result<Vec<TxValid>> {
163+
let request = url.into_client_request().unwrap();
164+
info!("attempting to connect to {}", &url);
165+
let (ws_stream, _) = connect_async(request).await.context("failed to connect")?;
166+
info!("connected to {}", &url);
167+
168+
let (_, mut receiver) = ws_stream.split();
169+
let fetch_transactions: JoinHandle<Result<Vec<TxValid>>> = tokio::spawn(async move {
170+
let mut transactions: Vec<TxValid> = Vec::with_capacity(count);
171+
loop {
172+
let next = receiver.next().await.context("failed to receive")??;
173+
let msg = HydraMessage::try_from(next).context("failed to parse hydra message")?;
174+
175+
match msg {
176+
HydraMessage::HydraEvent(x) => match x {
177+
HydraEventMessage::TxValid(tx) => {
178+
transactions.push(tx);
179+
if transactions.len() == count {
180+
break;
181+
}
182+
}
183+
_ => {}
184+
},
185+
_ => {}
186+
}
187+
}
188+
189+
Ok(transactions)
190+
});
191+
192+
tokio::select! {
193+
join = fetch_transactions => {
194+
match join {
195+
Ok(result) => result,
196+
Err(e) => Err(e.into()),
197+
}
198+
}
199+
_ = tokio::time::sleep(timeout) => {
200+
Err(anyhow!("failed to fetch {} transactions within timeout", count))
201+
}
202+
}
203+
}
204+
162205
pub async fn submit_tx_roundtrip(url: &str, tx: NewTx, timeout: Duration) -> Result<()> {
163206
let request = url.into_client_request().unwrap();
164207
let (ws_stream, _) = connect_async(request).await.context("failed to connect")?;
@@ -188,7 +231,10 @@ pub async fn submit_tx_roundtrip(url: &str, tx: NewTx, timeout: Duration) -> Res
188231
}
189232
});
190233

191-
sender.send(Message::Text(tx.into())).await.context("failed to send transaction")?;
234+
sender
235+
.send(Message::Text(tx.into()))
236+
.await
237+
.context("failed to send transaction")?;
192238

193239
tokio::select! {
194240
// TODO: result.flatten https://github.com/rust-lang/rust/issues/70142

crates/rpc/src/routes/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ pub mod cleanup;
33
pub mod head;
44
pub mod heads;
55
pub mod new_game;
6+
pub mod sample_transactions;
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
use rand::seq::SliceRandom;
2+
3+
use crate::model::{
4+
cluster::{ClusterState, NodeClient},
5+
hydra::messages::tx_valid::TxValid,
6+
};
7+
use rand::thread_rng;
8+
use rocket::{get, http::Status, serde::json::Json, State};
9+
use rocket_errors::anyhow::Result;
10+
use serde::Serialize;
11+
use tracing::error;
12+
13+
#[derive(Serialize)]
14+
pub struct SampleTransaction {
15+
cbor: String,
16+
tx_id: String,
17+
}
18+
19+
#[get("/sample_transactions?<count>&<id>")]
20+
pub async fn sample_transactions(
21+
count: usize,
22+
id: Option<&str>,
23+
state: &State<ClusterState>,
24+
) -> Result<Json<Vec<SampleTransaction>>, Status> {
25+
let node = match id {
26+
Some(id) => state.get_node_by_id(id).ok_or(Status::NotFound)?,
27+
None => state
28+
.get_all_nodes()
29+
.choose(&mut thread_rng())
30+
.ok_or(Status::NotFound)?
31+
.to_owned(),
32+
};
33+
let client = NodeClient::new(node, state.admin_sk.clone(), true)
34+
.inspect_err(|err| error!("error connecting to node: {}", err))
35+
.map_err(|_| Status::InternalServerError)?;
36+
37+
let transactions = client
38+
.sample_txs(count)
39+
.await
40+
.inspect_err(|err| error!("error sampling transactions: {}", err))
41+
.map_err(|_| Status::InternalServerError)?
42+
.into_iter()
43+
.map(|x| x.into())
44+
.collect::<Vec<SampleTransaction>>();
45+
46+
Ok(Json(transactions))
47+
}
48+
49+
impl From<TxValid> for SampleTransaction {
50+
fn from(value: TxValid) -> Self {
51+
Self {
52+
cbor: hex::encode(value.cbor),
53+
tx_id: value.tx_id,
54+
}
55+
}
56+
}

0 commit comments

Comments
 (0)