Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add start_newPendingTransactions_loop #2690

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions client/src/rpc/impls/eth_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,12 @@ pub struct PubSubClient {
handler: Arc<ChainNotificationHandler>,
heads_subscribers: Arc<RwLock<Subscribers<Client>>>,
logs_subscribers: Arc<RwLock<Subscribers<(Client, LogFilter)>>>,
pending_transactions_subscribers: Arc<RwLock<Subscribers<Client>>>,
epochs_ordered: Arc<Channel<(u64, Vec<H256>)>>,
consensus: SharedConsensusGraph,
heads_loop_started: Arc<RwLock<bool>>,
pending_transactions_loop_started: Arc<RwLock<bool>>,
new_pending_transactions: Arc<Channel<H256>>,
}

impl PubSubClient {
Expand All @@ -64,21 +67,26 @@ impl PubSubClient {
{
let heads_subscribers = Arc::new(RwLock::new(Subscribers::default()));
let logs_subscribers = Arc::new(RwLock::new(Subscribers::default()));
let pending_transactions_subscribers = Arc::new(RwLock::new(Subscribers::default()));

let handler = Arc::new(ChainNotificationHandler {
executor,
consensus: consensus.clone(),
data_man: consensus.get_data_manager().clone(),
heads_subscribers: heads_subscribers.clone(),
pending_transactions_subscribers: pending_transactions_subscribers.clone()
});

PubSubClient {
handler,
heads_subscribers,
logs_subscribers,
pending_transactions_subscribers,
epochs_ordered: notifications.epochs_ordered.clone(),
consensus: consensus.clone(),
new_pending_transactions: notifications.new_pending_transactions.clone(),
heads_loop_started: Arc::new(RwLock::new(false)),
pending_transactions_loop_started: Arc::new(RwLock::new(false)),
}
}

Expand All @@ -91,6 +99,31 @@ impl PubSubClient {
self.epochs_ordered.clone()
}

fn start_new_pending_transactions_loop(&self) {
let mut loop_started = self.pending_transactions_loop_started.write();
if *loop_started {
return;
}

debug!("start_pending_transactions_loop");
*loop_started = true;

let mut receiver = self.new_pending_transactions.subscribe();

let handler_clone = self.handler.clone();

let fut = async move {
while let Some(new_tx) = receiver.recv().await {
trace!("new_pending_transactions_loop: {:?}", new_tx);

handler_clone.notify_new_pending_transaction(new_tx);
}
};

let fut = fut.unit_error().boxed().compat();
self.handler.executor.spawn(fut);
}

fn start_heads_loop(&self) {
let mut loop_started = self.heads_loop_started.write();
if *loop_started {
Expand Down Expand Up @@ -232,6 +265,7 @@ pub struct ChainNotificationHandler {
consensus: SharedConsensusGraph,
data_man: Arc<BlockDataManager>,
heads_subscribers: Arc<RwLock<Subscribers<Client>>>,
pending_transactions_subscribers: Arc<RwLock<Subscribers<Client>>>,
}

impl ChainNotificationHandler {
Expand All @@ -254,6 +288,28 @@ impl ChainNotificationHandler {
let _ = fut.compat().await;
}

// notify each subscriber about new pending transaction `hash` concurrently
fn notify_new_pending_transaction(&self, hash: H256) {
trace!("notify_new_pending_transaction({:?})", hash);

let subscribers = self.pending_transactions_subscribers.read();

// do not retrieve anything unnecessarily
if subscribers.is_empty() {
trace!("No subscribers for pending transactions");
return;
}

debug!("Notify {}", hash);
for subscriber in subscribers.values() {
Self::notify(
&self.executor,
subscriber,
pubsub::Result::TransactionHash(hash),
);
}
}

// notify each subscriber about header `hash` concurrently
// NOTE: multiple calls to this method will result in concurrent
// notifications, so the headers published might be reordered.
Expand Down Expand Up @@ -542,6 +598,13 @@ impl PubSub for PubSubClient {
)
{
let error = match (kind, params) {
// --------- newPendingTransactions ---------
(pubsub::Kind::NewPendingTransactions, None) => {
info!("eth pubsub newPendingTransactions");
self.pending_transactions_subscribers.write().push(subscriber);
self.start_new_pending_transactions_loop();
return;
}
// --------- newHeads ---------
(pubsub::Kind::NewHeads, None) => {
info!("eth pubsub newheads");
Expand Down
2 changes: 2 additions & 0 deletions core/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ pub struct Notifications {
pub new_block_hashes: Arc<Channel<H256>>,
pub epochs_ordered: Arc<Channel<(u64, Vec<H256>)>>,
pub blame_verification_results: Arc<Channel<(u64, Option<u64>)>>, /* <height, witness> */
pub new_pending_transactions: Arc<Channel<H256>>,
}

impl Notifications {
Expand All @@ -123,6 +124,7 @@ impl Notifications {
blame_verification_results: Arc::new(Channel::new(
"blame-verification-results",
)),
new_pending_transactions: Arc::new(Channel::new("new-pending-transactions")),
})
}
}
Expand Down