Skip to content

Commit

Permalink
Update event filters, skip update if the header exists, fix unreceived
Browse files Browse the repository at this point in the history
packets checking
  • Loading branch information
en committed Jan 16, 2024
1 parent 0c5b7b6 commit 60a6e6c
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 54 deletions.
52 changes: 52 additions & 0 deletions crates/relayer/src/chain/counterparty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,37 @@ pub fn packet_acknowledgements(
Ok(Some((acked_sequences, response_height)))
}

pub fn packet_acknowledgements1(
chain: &impl ChainHandle,
port_id: &PortId,
channel_id: &ChannelId,
mut commit_sequences: Vec<Sequence>,
) -> Result<Option<(Vec<Sequence>, Height)>, Error> {
// If there aren't any sequences to query for, return early.
// Otherwise we end up with the full list of acknowledgements on chain,
// which is potentially huge and extremely costly.
if commit_sequences.is_empty() {
return Ok(None);
}

// Get the packet acknowledgments on counterparty/source chain
let (acked_sequences, response_height) = chain
.query_packet_acknowledgements(QueryPacketAcknowledgementsRequest {
port_id: port_id.clone(),
channel_id: channel_id.clone(),
pagination: Some(PageRequest::all()),
packet_commitment_sequences: commit_sequences.clone(),
})
.map_err(Error::relayer)?;

let acked_sequences_set = acked_sequences.iter().cloned().collect::<HashSet<_>>();

commit_sequences.retain(|s| !acked_sequences_set.contains(s));
commit_sequences.sort_unstable();

Ok(Some((commit_sequences, response_height)))
}

/// Returns the sequences of the packets that were sent on the chain and for which:
/// - `MsgRecvPacket`-s have been received on the counterparty chain but
/// - `MsgAcknowledgement`-s have NOT been received by the chain
Expand Down Expand Up @@ -508,6 +539,27 @@ pub fn unreceived_packets(
Ok((packet_seq_nrs, h))
}

pub fn acknowledgements_on_chain1(
chain: &impl ChainHandle,
counterparty_chain: &impl ChainHandle,
path: &PathIdentifiers,
) -> Result<Option<(Vec<Sequence>, Height)>, Error> {
let (commitments_on_counterparty, _) = commitments_on_chain(
counterparty_chain,
&path.counterparty_port_id,
&path.counterparty_channel_id,
)?;

let sequences_and_height = packet_acknowledgements1(
chain,
&path.port_id,
&path.channel_id,
commitments_on_counterparty,
)?;

Ok(sequences_and_height)
}

pub fn acknowledgements_on_chain(
chain: &impl ChainHandle,
counterparty_chain: &impl ChainHandle,
Expand Down
10 changes: 0 additions & 10 deletions crates/relayer/src/chain/near/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1185,16 +1185,6 @@ impl ChainEndpoint for NearChain {
request,
std::panic::Location::caller()
);
let mut request = request;
request.height = request.height.map(|height| match height {
QueryHeight::Latest => QueryHeight::Latest,
QueryHeight::Specific(value) => QueryHeight::Specific(
// TODO(davirina): why +10?
Height::new(value.revision_number(), value.revision_height() + 10)
.expect("failed construct ibc height"),
),
// todo(davirain) can improve this error handling
});
let original_result = self
.get_packet_events(request)
.map_err(Error::near_chain_error)?;
Expand Down
8 changes: 5 additions & 3 deletions crates/relayer/src/event/near_source/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,10 +269,12 @@ impl EventSource {
.filter(|e| {
matches!(
e.event,
IbcEvent::CloseInitChannel(_)
| IbcEvent::TimeoutPacket(_)
| IbcEvent::SendPacket(_)
IbcEvent::SendPacket(_)
| IbcEvent::ReceivePacket(_)
| IbcEvent::WriteAcknowledgement(_)
| IbcEvent::AcknowledgePacket(_)
| IbcEvent::TimeoutPacket(_)
| IbcEvent::TimeoutOnClosePacket(_)
)
})
.collect();
Expand Down
6 changes: 3 additions & 3 deletions crates/relayer/src/event/source/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tokio::{
runtime::Runtime as TokioRuntime,
time::{sleep, Duration, Instant},
};
use tracing::{debug, error, error_span, trace};
use tracing::{debug, error, error_span, info, trace};

use tendermint::abci;
use tendermint::block::Height as BlockHeight;
Expand Down Expand Up @@ -296,10 +296,10 @@ async fn collect_events(
latest_block_height: BlockHeight,
) -> Result<Option<EventBatch>> {
let abci_events = fetch_all_events(rpc_client, latest_block_height).await?;
trace!("Found {} ABCI events before dedupe", abci_events.len());
info!("Found {} ABCI events before dedupe", abci_events.len());

let abci_events = dedupe(abci_events);
trace!("Found {} ABCI events after dedupe", abci_events.len());
info!("Found {} ABCI events after dedupe", abci_events.len());

let height = Height::from_tm(latest_block_height, chain_id);
let new_block_event =
Expand Down
83 changes: 45 additions & 38 deletions crates/relayer/src/link/relay_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use ibc_relayer_types::timestamp::Timestamp;
use ibc_relayer_types::tx_msg::Msg;
use ibc_relayer_types::Height;

use crate::chain::counterparty::acknowledgements_on_chain1;
use crate::chain::counterparty::unreceived_acknowledgements;
use crate::chain::counterparty::unreceived_packets;
use crate::chain::endpoint::ChainStatus;
Expand Down Expand Up @@ -1109,10 +1110,20 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
)
.entered();

// Pull the s.n. of all packets that the destination chain has not yet received.
let (sequences, src_response_height) =
let (sequences, src_response_height) = if self.ordered_channel() {
if let Some((seqs, height)) =
acknowledgements_on_chain1(self.dst_chain(), self.src_chain(), &self.path_id)
.map_err(LinkError::supervisor)?
{
(seqs, height)
} else {
return Ok(());
}
} else {
// Pull the s.n. of all packets that the destination chain has not yet received.
unreceived_packets(self.dst_chain(), self.src_chain(), &self.path_id)
.map_err(LinkError::supervisor)?;
.map_err(LinkError::supervisor)?
};

let query_height = opt_query_height.unwrap_or(src_response_height);

Expand Down Expand Up @@ -1211,33 +1222,30 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
packet: &Packet,
height: Height,
) -> Result<Option<Vec<Any>>, LinkError> {
if self
.src_chain()
.config()
.map_err(|e| {
LinkError::custom_error(format!("[in connection: build_recv_packet decode src_chain get config failed] -> Error({})", e))
})?
.r#type
== ChainType::Near
{
if self.src_chain().config().unwrap().r#type == ChainType::Near {
let mut msgs = self.build_update_client_on_dst(height)?;
assert!(!msgs.is_empty());
let msg_update_client = msgs.last().ok_or(LinkError::custom_error(
"[in connection: build_recv_packet msgs.last() is none]".into(),
))?;
let domain_msg = MsgUpdateClient::decode_vec(&msg_update_client.value).map_err(|e| {
LinkError::custom_error(format!(
let proof_height = if msgs.is_empty() {
warn!("consensus state already exists at height {height}, skipping update");
height
} else {
let msg_update_client = msgs.last().ok_or(LinkError::custom_error(
"[in connection: build_recv_packet msgs.last() is none]".into(),
))?;
let domain_msg =
MsgUpdateClient::decode_vec(&msg_update_client.value).map_err(|e| {
LinkError::custom_error(format!(
"[in packet: build_recv_packet decode MsgUpdateClient failed] -> Error({})",
e
))
})?;
let near_header = AnyHeader::try_from(domain_msg.header).map_err(|e| {
})?;
let near_header = AnyHeader::try_from(domain_msg.header).map_err(|e| {
LinkError::custom_error(format!(
"[in packet: build_recv_packet decode ClientMessage to AnyHeader failed] -> Error({})",
e
))
})?;
let proof_height = near_header.height();
near_header.height()
};
warn!("new header for recv_packet: {:?}", proof_height);

let proofs = self
Expand Down Expand Up @@ -1282,34 +1290,33 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
) -> Result<Option<Vec<Any>>, LinkError> {
let packet = event.packet.clone();

if self
.src_chain()
.config()
.map_err(|e| {
LinkError::custom_error(format!("[in connection: build_ack_from_recv_event decode src_chain get config failed] -> Error({})", e))
})?
.r#type
== ChainType::Near
{
if self.src_chain().config().unwrap().r#type == ChainType::Near {
let mut msgs = self.build_update_client_on_dst(height)?;
assert!(!msgs.is_empty());
let msg_update_client = msgs.last().ok_or(LinkError::custom_error(
"[in connection: build_recv_packet msgs.last() is none]".into(),
))?;
let domain_msg = MsgUpdateClient::decode_vec(&msg_update_client.value).map_err(|e| {
let proof_height = if msgs.is_empty() {
warn!("consensus state already exists at height {height}, skipping update");
height
} else {
let msg_update_client = msgs.last().ok_or(LinkError::custom_error(
"[in connection: build_recv_packet msgs.last() is none]".into(),
))?;
let domain_msg = MsgUpdateClient::decode_vec(&msg_update_client.value).map_err(|e| {
LinkError::custom_error(format!(
"[in packet: build_ack_from_recv_event decode MsgUpdateClient failed] -> Error({})",
e
))
})?;
let near_header = AnyHeader::try_from(domain_msg.header).map_err(|e| {
let near_header = AnyHeader::try_from(domain_msg.header).map_err(|e| {
LinkError::custom_error(format!(
"[in packet: build_ack_from_recv_event decode ClientMessage to AnyHeader failed] -> Error({})",
e
))
})?;
let proof_height = near_header.height();
warn!("new header for build_ack_from_recv_event: {:?}", proof_height);
near_header.height()
};
warn!(
"new header for build_ack_from_recv_event: {:?}",
proof_height
);

let proofs = self
.src_chain()
Expand Down

0 comments on commit 60a6e6c

Please sign in to comment.