Skip to content

Commit

Permalink
feat(grpc): apply predicate on WatchTx (#384)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolasLuduena authored Dec 6, 2024
1 parent e0f42cf commit dfe233e
Showing 1 changed file with 158 additions and 6 deletions.
164 changes: 158 additions & 6 deletions src/serve/grpc/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,171 @@ use futures_core::Stream;
use futures_util::StreamExt;
use pallas::interop::utxorpc as interop;
use pallas::interop::utxorpc::spec as u5c;
use pallas::ledger::traverse::MultiEraBlock;
use pallas::{
interop::utxorpc::spec::watch::any_chain_tx_pattern::Chain,
ledger::{addresses::Address, traverse::MultiEraBlock},
};
use std::pin::Pin;
use tonic::{Request, Response, Status};

fn outputs_match_address(
pattern: &u5c::cardano::AddressPattern,
outputs: &Vec<u5c::cardano::TxOutput>,

Check failure on line 18 in src/serve/grpc/watch.rs

View workflow job for this annotation

GitHub Actions / Lint Rust

writing `&Vec` instead of `&[_]` involves a new object where a slice will do
) -> bool {
let exact_matches = pattern.exact_address.is_empty()
|| outputs.iter().any(|o| o.address == &pattern.exact_address);

Check failure on line 21 in src/serve/grpc/watch.rs

View workflow job for this annotation

GitHub Actions / Lint Rust

taken reference of right operand

let delegation_matches = pattern.delegation_part.is_empty()
|| outputs.iter().any(|o| {
let addr = Address::from_bytes(&o.address).unwrap();
match addr {
Address::Shelley(s) => s.delegation().to_vec().eq(&pattern.delegation_part),
_ => false,
}
});
let payment_matches = pattern.payment_part.is_empty()
|| outputs.iter().any(|o| {
let addr = Address::from_bytes(&o.address).unwrap();
match addr {
Address::Shelley(s) => s.payment().to_vec().eq(&pattern.payment_part),
_ => false,
}
});

exact_matches && delegation_matches && payment_matches
}

fn outputs_match_asset(
asset_pattern: &u5c::cardano::AssetPattern,
outputs: &Vec<u5c::cardano::TxOutput>,

Check failure on line 45 in src/serve/grpc/watch.rs

View workflow job for this annotation

GitHub Actions / Lint Rust

writing `&Vec` instead of `&[_]` involves a new object where a slice will do
) -> bool {
(asset_pattern.asset_name.is_empty() && asset_pattern.policy_id.is_empty())
|| outputs.iter().any(|o| {
o.assets.iter().any(|ma| {
ma.policy_id.eq(&asset_pattern.policy_id)
&& ma
.assets
.iter()
.any(|a| a.name.eq(&asset_pattern.asset_name))
})
})
}

fn matches_output(
pattern: &u5c::cardano::TxOutputPattern,
outputs: &Vec<u5c::cardano::TxOutput>,
) -> bool {
let address_match = pattern.address.as_ref().map_or(true, |addr_pattern| {
outputs_match_address(addr_pattern, outputs)
});

let asset_match = pattern.asset.as_ref().map_or(true, |asset_pattern| {
outputs_match_asset(asset_pattern, outputs)
});

address_match && asset_match
}

fn matches_cardano_pattern(tx_pattern: &u5c::cardano::TxPattern, tx: &u5c::cardano::Tx) -> bool {
let has_address_match = tx_pattern
.has_address
.as_ref()
.map_or(true, |addr_pattern| {
let outputs: Vec<_> = tx.outputs.iter().cloned().collect();

Check failure on line 79 in src/serve/grpc/watch.rs

View workflow job for this annotation

GitHub Actions / Lint Rust

called `iter().cloned().collect()` on a slice to create a `Vec`. Calling `to_vec()` is both faster and more readable
let inputs: Vec<_> = tx
.inputs
.iter()
.filter_map(|x| x.as_output.as_ref().cloned())
.collect();

outputs_match_address(addr_pattern, &inputs)
|| outputs_match_address(addr_pattern, &outputs)
});

let consumes_match = tx_pattern.consumes.as_ref().map_or(true, |out_pattern| {
let inputs: Vec<_> = tx
.inputs
.iter()
.filter_map(|x| x.as_output.as_ref().cloned())
.collect();
matches_output(out_pattern, &inputs)
});

let mints_asset_match = tx_pattern
.mints_asset
.as_ref()
.map_or(true, |asset_pattern| {
(asset_pattern.asset_name.is_empty() && asset_pattern.policy_id.is_empty())
|| tx.mint.iter().any(|ma| {
ma.policy_id.eq(&asset_pattern.policy_id)
&& ma
.assets
.iter()
.any(|a| a.name.eq(&asset_pattern.asset_name))
})
});

let moves_asset_match = tx_pattern
.moves_asset
.as_ref()
.map_or(true, |asset_pattern| {
let inputs: Vec<_> = tx
.inputs
.iter()
.filter_map(|x| x.as_output.as_ref().cloned())
.collect();
outputs_match_asset(asset_pattern, &inputs)
|| outputs_match_asset(asset_pattern, &tx.outputs)
});

let produces_match = tx_pattern
.produces
.as_ref()
.map_or(true, |out_pattern| matches_output(out_pattern, &tx.outputs));

has_address_match && consumes_match && mints_asset_match && moves_asset_match && produces_match
}

fn matches_chain(chain: &Chain, tx: &u5c::cardano::Tx) -> bool {
match chain {
Chain::Cardano(tx_pattern) => matches_cardano_pattern(tx_pattern, tx),
}
}

fn apply_predicate(predicate: &u5c::watch::TxPredicate, tx: &u5c::cardano::Tx) -> bool {
let tx_matches = predicate
.r#match
.as_ref()
.and_then(|pattern| pattern.chain.as_ref())
.map_or(true, |chain| matches_chain(chain, tx));

let not_clause = predicate.not.iter().any(|p| apply_predicate(p, tx));

let and_clause = predicate.all_of.iter().all(|p| apply_predicate(p, tx));

let or_clause =
predicate.any_of.is_empty() || predicate.any_of.iter().any(|p| apply_predicate(p, tx));

tx_matches && !not_clause && and_clause && or_clause
}

fn block_to_txs(
block: &wal::RawBlock,
mapper: &interop::Mapper<LedgerStore>,
request: &u5c::watch::WatchTxRequest,
) -> Vec<u5c::watch::AnyChainTx> {
let wal::RawBlock { body, .. } = block;
let block = MultiEraBlock::decode(body).unwrap();
let txs = block.txs();

txs.iter()
.map(|x| mapper.map_tx(x))
.map(|x: &pallas::ledger::traverse::MultiEraTx<'_>| mapper.map_tx(x))
.filter(|tx| {
request
.predicate
.as_ref()
.map_or(true, |predicate| apply_predicate(predicate, tx))
})
.map(|x| u5c::watch::AnyChainTx {
chain: Some(u5c::watch::any_chain_tx::Chain::Cardano(x)),
})
Expand All @@ -29,14 +180,15 @@ fn block_to_txs(
fn roll_to_watch_response(
mapper: &interop::Mapper<LedgerStore>,
log: &wal::LogValue,
request: &u5c::watch::WatchTxRequest,
) -> impl Stream<Item = u5c::watch::WatchTxResponse> {
let txs: Vec<_> = match log {
wal::LogValue::Apply(block) => block_to_txs(block, mapper)
wal::LogValue::Apply(block) => block_to_txs(block, mapper, request)
.into_iter()
.map(u5c::watch::watch_tx_response::Action::Apply)
.map(|x| u5c::watch::WatchTxResponse { action: Some(x) })
.collect(),
wal::LogValue::Undo(block) => block_to_txs(block, mapper)
wal::LogValue::Undo(block) => block_to_txs(block, mapper, request)
.into_iter()
.map(u5c::watch::watch_tx_response::Action::Undo)
.map(|x| u5c::watch::WatchTxResponse { action: Some(x) })
Expand Down Expand Up @@ -72,7 +224,7 @@ impl u5c::watch::watch_service_server::WatchService for WatchServiceImpl {
&self,
request: Request<u5c::watch::WatchTxRequest>,
) -> Result<Response<Self::WatchTxStream>, Status> {
let _ = request.into_inner();
let inner_req = request.into_inner();

let from_seq = self
.wal
Expand All @@ -84,7 +236,7 @@ impl u5c::watch::watch_service_server::WatchService for WatchServiceImpl {
let mapper = self.mapper.clone();

let stream = wal::WalStream::start(self.wal.clone(), from_seq)
.flat_map(move |(_, log)| roll_to_watch_response(&mapper, &log))
.flat_map(move |(_, log)| roll_to_watch_response(&mapper, &log, &inner_req))
.map(Ok);

Ok(Response::new(Box::pin(stream)))
Expand Down

0 comments on commit dfe233e

Please sign in to comment.