Skip to content

Commit

Permalink
Fix write_version sanity checking for 1.9
Browse files Browse the repository at this point in the history
In 1.9, the write_version will no longer be strictly monotonic across
accounts.
  • Loading branch information
ckamm committed Mar 28, 2022
1 parent 9e64377 commit c2f0c0a
Showing 1 changed file with 23 additions and 16 deletions.
39 changes: 23 additions & 16 deletions lib/src/grpc_plugin_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,21 @@ async fn feed_data_geyser(
// Highest slot that an account write came in for.
let mut newest_write_slot: u64 = 0;

// map slot -> (pubkey -> write count)
struct WriteVersion {
// Write version seen on-chain
global: u64,
// The per-pubkey per-slot write version
slot: u32,
}

// map slot -> (pubkey -> WriteVersion)
//
// Since the write_version is a private indentifier per node it can't be used
// to deduplicate events from multiple nodes. Here we rewrite it such that each
// pubkey and each slot has a consecutive numbering of writes starting at 1.
//
// That number will be consistent for each node.
let mut slot_pubkey_writes = HashMap::<u64, HashMap<[u8; 32], u32>>::new();

// Keep track of write version from RPC node, to check the assumption that it
// increases monotonically
let mut last_write_version: u64 = 0;
let mut slot_pubkey_writes = HashMap::<u64, HashMap<[u8; 32], WriteVersion>>::new();

loop {
tokio::select! {
Expand Down Expand Up @@ -173,20 +176,24 @@ async fn feed_data_geyser(
anyhow::bail!("received write {} slots back from max rooted slot {}", max_rooted_slot - write.slot, max_rooted_slot);
}

// We assume we will receive write versions in sequence.
let pubkey_writes = slot_pubkey_writes.entry(write.slot).or_default();

let pubkey_bytes = Pubkey::new(&write.pubkey).to_bytes();
let write_version_mapping = pubkey_writes.entry(pubkey_bytes).or_insert(WriteVersion {
global: write.write_version,
slot: 1, // write version 0 is reserved for snapshots
});

// We assume we will receive write versions for each pubkey in sequence.
// If this is not the case, logic here does not work correctly because
// a later write could arrive first.
if write.write_version <= last_write_version {
anyhow::bail!("unexpected write version: {} expected > {}", write.write_version, last_write_version);
if write.write_version < write_version_mapping.global {
anyhow::bail!("unexpected write version: got {}, expected >= {}", write.write_version, write_version_mapping.global);
}
last_write_version = write.write_version;

let pubkey_writes = slot_pubkey_writes.entry(write.slot).or_default();

let pubkey_bytes = Pubkey::new(&write.pubkey).to_bytes();
let writes = pubkey_writes.entry(pubkey_bytes).or_insert(1); // write version 0 is reserved for snapshots
write.write_version = *writes as u64;
*writes += 1;
// Rewrite the update to use the local write version and bump it
write.write_version = write_version_mapping.slot as u64;
write_version_mapping.slot += 1;
},
geyser_proto::update::UpdateOneof::Ping(_) => {},
}
Expand Down

0 comments on commit c2f0c0a

Please sign in to comment.