Skip to content

Commit

Permalink
feat: Add Meta TX support
Browse files Browse the repository at this point in the history
  • Loading branch information
WuTao18 committed Apr 18, 2023
1 parent d522a98 commit 5d59b93
Show file tree
Hide file tree
Showing 11 changed files with 238 additions and 245 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 0.10.35

* Upgrade Indexer Framework to be based on [`nearcore` version `1.32.2`](https://github.com/near/nearcore/releases/tag/1.32.2)
* Add support of [Meta Transactions](https://github.com/near/NEPs/pull/366)

## 0.10.34

Expand Down
12 changes: 12 additions & 0 deletions migrations/2023-02-28-160000_meta_tx/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- We do not drop `DELEGATE_ACTION` here because we can't remove items from enum in Postgres.
-- To be honest, it does not sound as a big problem. `IF_NOT_EXISTS` will prevent re-adding it.

ALTER TABLE transaction_actions
DROP COLUMN is_delegate_action,
DROP COLUMN delegate_parameters,
DROP COLUMN delegate_parent_index_in_transaction;

ALTER TABLE action_receipt_actions
DROP COLUMN is_delegate_action,
DROP COLUMN delegate_parameters,
DROP COLUMN delegate_parent_index_in_action_receipt;
17 changes: 17 additions & 0 deletions migrations/2023-02-28-160000_meta_tx/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- If your DB is not empty, you need to turn off the Indexer, then apply the migration, then switch to 0.12.0

ALTER TYPE action_kind ADD VALUE IF NOT EXISTS 'DELEGATE_ACTION';

-- For all happy users of Postgres 11+, this should run fast
ALTER TABLE transaction_actions
ADD COLUMN is_delegate_action BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN delegate_parameters JSONB,
ADD COLUMN delegate_parent_index_in_transaction INTEGER;

ALTER TABLE action_receipt_actions
ADD COLUMN is_delegate_action BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN delegate_parameters JSONB,
ADD COLUMN delegate_parent_index_in_action_receipt INTEGER;

ALTER TABLE transaction_actions ALTER COLUMN is_delegate_action DROP DEFAULT;
ALTER TABLE action_receipt_actions ALTER COLUMN is_delegate_action DROP DEFAULT;
6 changes: 3 additions & 3 deletions src/db_adapters/assets/fungible_token_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ fn compose_ft_db_events(
event_memo: mint_event
.memo
.clone()
.unwrap_or_else(|| "".to_string())
.unwrap_or_default()
.escape_default()
.to_string(),
});
Expand All @@ -102,7 +102,7 @@ fn compose_ft_db_events(
event_memo: transfer_event
.memo
.clone()
.unwrap_or_else(|| "".to_string())
.unwrap_or_default()
.escape_default()
.to_string(),
});
Expand All @@ -126,7 +126,7 @@ fn compose_ft_db_events(
event_memo: burn_event
.memo
.clone()
.unwrap_or_else(|| "".to_string())
.unwrap_or_default()
.escape_default()
.to_string(),
});
Expand Down
19 changes: 5 additions & 14 deletions src/db_adapters/assets/non_fungible_token_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ fn compose_nft_db_events(
match &event.event_kind {
event_types::Nep171EventKind::NftMint(mint_events) => {
for mint_event in mint_events {
let memo = mint_event.memo.clone().unwrap_or_else(|| "".to_string());
let memo = mint_event.memo.clone().unwrap_or_default();
for token_id in &mint_event.token_ids {
nft_events.push(
models::assets::non_fungible_token_events::NonFungibleTokenEvent {
Expand All @@ -84,14 +84,8 @@ fn compose_nft_db_events(
}
event_types::Nep171EventKind::NftTransfer(transfer_events) => {
for transfer_event in transfer_events {
let authorized_id = transfer_event
.authorized_id
.clone()
.unwrap_or_else(|| "".to_string());
let memo = transfer_event
.memo
.clone()
.unwrap_or_else(|| "".to_string());
let authorized_id = transfer_event.authorized_id.clone().unwrap_or_default();
let memo = transfer_event.memo.clone().unwrap_or_default();
for token_id in &transfer_event.token_ids {
nft_events.push(
models::assets::non_fungible_token_events::NonFungibleTokenEvent {
Expand Down Expand Up @@ -121,11 +115,8 @@ fn compose_nft_db_events(
}
event_types::Nep171EventKind::NftBurn(burn_events) => {
for burn_event in burn_events {
let authorized_id = &burn_event
.authorized_id
.clone()
.unwrap_or_else(|| "".to_string());
let memo = burn_event.memo.clone().unwrap_or_else(|| "".to_string());
let authorized_id = &burn_event.authorized_id.clone().unwrap_or_default();
let memo = burn_event.memo.clone().unwrap_or_default();
for token_id in &burn_event.token_ids {
nft_events.push(
models::assets::non_fungible_token_events::NonFungibleTokenEvent {
Expand Down
105 changes: 80 additions & 25 deletions src/db_adapters/receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use diesel::pg::expression::array_comparison::any;
use diesel::{ExpressionMethods, JoinOnDsl, PgConnection, QueryDsl};
use futures::future::try_join_all;
use futures::try_join;
use num_traits::cast::FromPrimitive;
use near_primitives::transaction::Action;
use near_primitives::views::ActionView;
use tracing::{error, warn};

use crate::schema;
Expand Down Expand Up @@ -526,36 +527,90 @@ async fn store_action_receipt_actions(
receipts: &[&near_indexer::near_primitives::views::ReceiptView],
block_timestamp: u64,
) -> anyhow::Result<()> {
let receipt_action_actions: Vec<models::ActionReceiptAction> = receipts
.iter()
.filter_map(|receipt| {
if let near_indexer::near_primitives::views::ReceiptEnumView::Action { actions, .. } =
&receipt.receipt
{
Some(actions.iter().enumerate().map(move |(index, action)| {
models::ActionReceiptAction::from_action_view(
receipt.receipt_id.to_string(),
i32::from_usize(index).expect("We expect usize to not overflow i32 here"),
action,
receipt.predecessor_id.to_string(),
receipt.receiver_id.to_string(),
block_timestamp,
)
}))
} else {
None
let mut action_receipt_actions: Vec<models::ActionReceiptAction> = vec![];
for receipt in receipts {
if let near_primitives::views::ReceiptEnumView::Action { actions, .. } =
&receipt.receipt
{
let mut index = 0;
for action in actions {
let (action_kind, args) =
models::extract_action_type_and_value_from_action_view(action);
match action {
ActionView::Delegate {
delegate_action,
signature,
} => {
let parent_index = index;
let delegate_parameters = serde_json::json!({
"signature": signature,
"sender_id": delegate_action.sender_id,
"receiver_id": delegate_action.receiver_id,
"nonce": delegate_action.nonce,
"max_block_height": delegate_action.max_block_height,
"public_key": delegate_action.public_key,
});
action_receipt_actions.push(models::ActionReceiptAction {
receipt_id: receipt.receipt_id.to_string(),
index_in_action_receipt: index,
action_kind,
args,
receipt_predecessor_account_id: receipt.predecessor_id.to_string(),
receipt_receiver_account_id: receipt.receiver_id.to_string(),
receipt_included_in_block_timestamp: block_timestamp.into(),
is_delegate_action: true,
delegate_parameters: Some(delegate_parameters.clone()),
delegate_parent_index_in_action_receipt: None,
});
index += 1;
for non_delegate_action in &delegate_action.actions {
let (action_kind, args) =
models::extract_action_type_and_value_from_action_view(
&ActionView::from(Action::from(non_delegate_action.clone())),
);
action_receipt_actions.push(models::ActionReceiptAction {
receipt_id: receipt.receipt_id.to_string(),
index_in_action_receipt: index,
action_kind,
args,
receipt_predecessor_account_id: receipt.predecessor_id.to_string(),
receipt_receiver_account_id: receipt.receiver_id.to_string(),
receipt_included_in_block_timestamp: block_timestamp.into(),
is_delegate_action: true,
delegate_parameters: Some(delegate_parameters.clone()),
delegate_parent_index_in_action_receipt: Some(parent_index),
});
index += 1;
}
}
_ => {
action_receipt_actions.push(models::ActionReceiptAction {
receipt_id: receipt.receipt_id.to_string(),
index_in_action_receipt: index,
action_kind,
args,
receipt_predecessor_account_id: receipt.predecessor_id.to_string(),
receipt_receiver_account_id: receipt.receiver_id.to_string(),
receipt_included_in_block_timestamp: block_timestamp.into(),
is_delegate_action: false,
delegate_parameters: None,
delegate_parent_index_in_action_receipt: None,
});
index += 1;
}
}
}
})
.flatten()
.collect();
}
}

crate::await_retry_or_panic!(
diesel::insert_into(schema::action_receipt_actions::table)
.values(receipt_action_actions.clone())
.values(action_receipt_actions.clone())
.on_conflict_do_nothing()
.execute_async(pool),
10,
"ReceiptActionActions were stored in database".to_string(),
&receipt_action_actions
"ActionReceiptActions were stored in database".to_string(),
&action_receipt_actions
);
Ok(())
}
Expand Down
85 changes: 66 additions & 19 deletions src/db_adapters/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use diesel::{ExpressionMethods, PgConnection, QueryDsl};
use futures::future::try_join_all;

use near_indexer::near_primitives;
use near_primitives::transaction::Action;
use near_primitives::views::ActionView;

use crate::schema;
use crate::{metrics, models};
Expand Down Expand Up @@ -133,7 +135,7 @@ async fn store(

async fn store_chunk_transactions(
pool: &actix_diesel::Database<PgConnection>,
transactions: Vec<(usize, &near_indexer::IndexerTransactionWithOutcome)>,
enumerated_transactions: Vec<(usize, &near_indexer::IndexerTransactionWithOutcome)>,
chunk_hash: &near_indexer::near_primitives::hash::CryptoHash,
block_hash: &near_indexer::near_primitives::hash::CryptoHash,
block_timestamp: u64,
Expand All @@ -143,7 +145,7 @@ async fn store_chunk_transactions(
) -> anyhow::Result<()> {
let mut receipts_cache_lock = receipts_cache_arc.lock().await;

let transaction_models: Vec<models::transactions::Transaction> = transactions
let transaction_models: Vec<models::transactions::Transaction> = enumerated_transactions
.iter()
.map(|(index, tx)| {
let transaction_hash = tx.transaction.hash.to_string() + transaction_hash_suffix;
Expand Down Expand Up @@ -199,23 +201,69 @@ async fn store_chunk_transaction_actions(
// hack for supporting duplicated transaction hashes. Empty for most of transactions
transaction_hash_suffix: &str,
) -> anyhow::Result<()> {
let transaction_action_models: Vec<models::TransactionAction> = transactions
.into_iter()
.flat_map(|tx| {
let mut transaction_action_models: Vec<models::TransactionAction> = vec![];
for tx in transactions {
let mut index = 0;
for action in &tx.transaction.actions {
let transaction_hash = tx.transaction.hash.to_string() + transaction_hash_suffix;
tx.transaction
.actions
.iter()
.enumerate()
.map(move |(index, action)| {
models::transactions::TransactionAction::from_action_view(
transaction_hash.clone(),
index as i32,
action,
)
})
})
.collect();
let (action_kind, args) =
models::extract_action_type_and_value_from_action_view(action);
match action {
ActionView::Delegate {
delegate_action,
signature,
} => {
let parent_index = index;
let delegate_parameters = serde_json::json!({
"signature": signature,
"sender_id": delegate_action.sender_id,
"receiver_id": delegate_action.receiver_id,
"nonce": delegate_action.nonce,
"max_block_height": delegate_action.max_block_height,
"public_key": delegate_action.public_key,
});
transaction_action_models.push(models::transactions::TransactionAction {
transaction_hash: transaction_hash.clone(),
index_in_transaction: index,
args,
action_kind,
is_delegate_action: true,
delegate_parameters: Some(delegate_parameters.clone()),
delegate_parent_index_in_transaction: None,
});
index += 1;
for non_delegate_action in &delegate_action.actions {
let (action_kind, args) =
models::extract_action_type_and_value_from_action_view(
&ActionView::from(Action::from(non_delegate_action.clone())),
);
transaction_action_models.push(models::transactions::TransactionAction {
transaction_hash: transaction_hash.clone(),
index_in_transaction: index,
args,
action_kind,
is_delegate_action: true,
delegate_parameters: Some(delegate_parameters.clone()),
delegate_parent_index_in_transaction: Some(parent_index),
});
index += 1;
}
}
_ => {
transaction_action_models.push(models::transactions::TransactionAction {
transaction_hash: transaction_hash.clone(),
index_in_transaction: index,
args,
action_kind,
is_delegate_action: false,
delegate_parameters: None,
delegate_parent_index_in_transaction: None,
});
index += 1;
}
}
}
}

crate::await_retry_or_panic!(
diesel::insert_into(schema::transaction_actions::table)
Expand All @@ -226,6 +274,5 @@ async fn store_chunk_transaction_actions(
"TransactionActions were stored in database".to_string(),
&transaction_action_models
);

Ok(())
}
27 changes: 3 additions & 24 deletions src/models/receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,30 +121,9 @@ pub struct ActionReceiptAction {
pub receipt_predecessor_account_id: String,
pub receipt_receiver_account_id: String,
pub receipt_included_in_block_timestamp: BigDecimal,
}

impl ActionReceiptAction {
pub fn from_action_view(
receipt_id: String,
index: i32,
action_view: &near_indexer::near_primitives::views::ActionView,
predecessor_account_id: String,
receiver_account_id: String,
block_timestamp: u64,
) -> Self {
let (action_kind, args) =
crate::models::extract_action_type_and_value_from_action_view(action_view);

Self {
receipt_id,
index_in_action_receipt: index,
args,
action_kind,
receipt_predecessor_account_id: predecessor_account_id,
receipt_receiver_account_id: receiver_account_id,
receipt_included_in_block_timestamp: block_timestamp.into(),
}
}
pub is_delegate_action: bool,
pub delegate_parameters: Option<serde_json::Value>,
pub delegate_parent_index_in_action_receipt: Option<i32>,
}

#[derive(Insertable, Clone, Debug)]
Expand Down
Loading

0 comments on commit 5d59b93

Please sign in to comment.