Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/core/src/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ pub struct AccountDeletion {
/// - `slot`: The slot number in which the transaction was recorded.
/// - `block_time`: The Unix timestamp of when the transaction was processed.
/// - `block_hash`: Block hash that can be used to detect a fork.
/// - `created_at`: The Unix timestamp of when the update was created (currently useful for Yellowstone GRPC TX updates)
///
/// Note: The `block_time` field may not be returned in all scenarios.
#[derive(Debug, Clone)]
Expand All @@ -312,4 +313,5 @@ pub struct TransactionUpdate {
pub slot: u64,
pub block_time: Option<i64>,
pub block_hash: Option<Hash>,
pub created_at: Option<i64>,
}
3 changes: 3 additions & 0 deletions crates/core/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ use {
/// - `message`: The versioned message containing the transaction instructions
/// and account keys
/// - `block_time`: The Unix timestamp of when the transaction was processed.
/// - `update_received_at`: The Unix timestamp of when the update for this transaction was received.
///
/// Note: The `block_time` field may not be returned in all scenarios.
#[derive(Debug, Clone, Default)]
Expand All @@ -74,6 +75,7 @@ pub struct TransactionMetadata {
pub message: solana_program::message::VersionedMessage,
pub block_time: Option<i64>,
pub block_hash: Option<Hash>,
pub update_received_at: Option<i64>,
}

/// Tries convert transaction update into the metadata.
Expand Down Expand Up @@ -113,6 +115,7 @@ impl TryFrom<crate::datasource::TransactionUpdate> for TransactionMetadata {
message: value.transaction.message.clone(),
block_time: value.block_time,
block_hash: value.block_hash,
update_received_at: value.created_at,
})
}
}
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/transformers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,7 @@ mod tests {
slot: 123,
block_time: Some(123),
block_hash: Hash::from_str("9bit9vXNX9HyHwL89aGDNmk3vbyAM96nvb6F4SaoM1CU").ok(),
created_at: Some(123),
};
let transaction_metadata = transaction_update
.clone()
Expand Down Expand Up @@ -1170,6 +1171,7 @@ mod tests {
slot: 123,
block_time: Some(123),
block_hash: None,
created_at: Some(123),
};
let transaction_metadata = transaction_update
.clone()
Expand Down
1 change: 1 addition & 0 deletions datasources/helius-atlas-ws-datasource/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ impl Datasource for HeliusWebsocket {
slot: tx_event.slot,
block_time: None,
block_hash: None,
created_at: None,
}));

metrics
Expand Down
1 change: 1 addition & 0 deletions datasources/helius-laserstream-datasource/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ async fn send_subscribe_update_transaction_info(
slot,
block_time,
block_hash: None,
created_at: None,
}));
if let Err(e) = sender.try_send((update, id)) {
log::error!(
Expand Down
1 change: 1 addition & 0 deletions datasources/jito-shredstream-grpc-datasource/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ impl Datasource for JitoShredstreamGrpcClient {
slot: message.slot,
block_time,
block_hash: None,
created_at: None,
}));

if let Err(e) = sender.try_send((update, id_for_closure.clone())) {
Expand Down
1 change: 1 addition & 0 deletions datasources/rpc-block-crawler-datasource/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ fn task_processor(
slot,
block_time: block.block_time,
block_hash,
created_at: None,
}));

metrics
Expand Down
1 change: 1 addition & 0 deletions datasources/rpc-block-subscribe-datasource/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ impl Datasource for RpcBlockSubscribe {
slot,
block_time: block.block_time,
block_hash,
created_at: None,
}));

metrics
Expand Down
1 change: 1 addition & 0 deletions datasources/rpc-transaction-crawler-datasource/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ fn task_processor(
slot: fetched_transaction.slot,
block_time: fetched_transaction.block_time,
block_hash: None,
created_at: None,
}));


Expand Down
6 changes: 4 additions & 2 deletions datasources/yellowstone-grpc-datasource/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,14 +227,14 @@ impl Datasource for YellowstoneGrpcGeyserClient {
}

Some(UpdateOneof::Transaction(transaction_update)) => {
send_subscribe_update_transaction_info(transaction_update.transaction, &metrics, &sender, id_for_loop.clone(), transaction_update.slot, None).await
send_subscribe_update_transaction_info(transaction_update.transaction, &metrics, &sender, id_for_loop.clone(), transaction_update.slot, None, msg.created_at.map(|ts| ts.seconds)).await
}
Some(UpdateOneof::Block(block_update)) => {
let block_time = block_update.block_time.map(|ts| ts.timestamp);

for transaction_update in block_update.transactions {
if retain_block_failed_transactions || transaction_update.meta.as_ref().map(|meta| meta.err.is_none()).unwrap_or(false) {
send_subscribe_update_transaction_info(Some(transaction_update), &metrics, &sender, id_for_loop.clone(), block_update.slot, block_time).await
send_subscribe_update_transaction_info(Some(transaction_update), &metrics, &sender, id_for_loop.clone(), block_update.slot, block_time, msg.created_at.map(|ts| ts.seconds)).await
}
}

Expand Down Expand Up @@ -389,6 +389,7 @@ async fn send_subscribe_update_transaction_info(
id: DatasourceId,
slot: u64,
block_time: Option<i64>,
created_at: Option<i64>,
) {
let start_time = std::time::Instant::now();

Expand Down Expand Up @@ -420,6 +421,7 @@ async fn send_subscribe_update_transaction_info(
slot,
block_time,
block_hash: None,
created_at,
}));
if let Err(e) = sender.try_send((update, id)) {
log::error!(
Expand Down
Loading