@@ -21,9 +21,9 @@ use tycho_collator::internal_queue::types::{
21
21
DiffStatistics , DiffZone , EnqueuedMessage , InternalMessageValue , PartitionRouter ,
22
22
QueueDiffWithMessages , QueueShardRange ,
23
23
} ;
24
- use tycho_storage:: snapshot:: AccountStatistics ;
24
+ use tycho_storage:: snapshot:: { AccountStatistics , InternalQueueSnapshot } ;
25
25
use tycho_storage:: Storage ;
26
- use tycho_util:: { FastHashMap , FastHashSet } ;
26
+ use tycho_util:: FastHashSet ;
27
27
28
28
#[ derive( Clone , Debug , PartialEq , Eq ) ]
29
29
struct StoredObject {
@@ -1785,25 +1785,8 @@ async fn prepare_data_from_prepared_persistent_state(
1785
1785
block_id_str : & str ,
1786
1786
diff_hash_str : & str ,
1787
1787
tail_len : u32 ,
1788
- ) -> anyhow:: Result < (
1789
- Storage ,
1790
- QueueImpl < QueueStateStdImpl , EnqueuedMessage > ,
1791
- BlockId ,
1792
- OutMsgQueueUpdates ,
1793
- ) > {
1794
- let ( storage, _tmp_dir) = Storage :: new_temp ( ) . await ?;
1795
-
1796
- let queue_factory = QueueFactoryStdImpl {
1797
- state : QueueStateImplFactory {
1798
- storage : storage. clone ( ) ,
1799
- } ,
1800
- config : QueueConfig {
1801
- gc_interval : Duration :: from_secs ( 1 ) ,
1802
- } ,
1803
- } ;
1804
-
1805
- let queue: QueueImpl < QueueStateStdImpl , EnqueuedMessage > = queue_factory. create ( ) ;
1806
-
1788
+ storage : & Storage ,
1789
+ ) -> anyhow:: Result < ( BlockId , OutMsgQueueUpdates ) > {
1807
1790
let diff_hash = HashBytes :: from_str ( diff_hash_str) ?;
1808
1791
let top_update = OutMsgQueueUpdates {
1809
1792
diff_hash,
@@ -1812,36 +1795,87 @@ async fn prepare_data_from_prepared_persistent_state(
1812
1795
1813
1796
let block_id = BlockId :: from_str ( block_id_str) ?;
1814
1797
1815
- let file = std:: fs:: File :: open ( & file_path) ?;
1798
+ let file = std:: fs:: File :: open ( file_path) ?;
1816
1799
1817
1800
let internal_queue = storage. internal_queue_storage ( ) ;
1818
1801
internal_queue
1819
1802
. import_from_file ( & top_update, file, block_id)
1820
1803
. await ?;
1821
1804
1822
- Ok ( ( storage , queue , block_id, top_update) )
1805
+ Ok ( ( block_id, top_update) )
1823
1806
}
1824
1807
1825
1808
#[ tokio:: test]
1826
- async fn test_import_persistent_state_sc ( ) -> anyhow:: Result < ( ) > {
1827
- let ( storage, queue, block_id, top_update) = prepare_data_from_prepared_persistent_state (
1828
- "tests/test_data/sb.bin" ,
1809
+ async fn test_import_persistent_state ( ) -> anyhow:: Result < ( ) > {
1810
+ // init storage
1811
+ let ( storage, _tmp_dir) = Storage :: new_temp ( ) . await ?;
1812
+
1813
+ // init queue
1814
+ let queue_factory = QueueFactoryStdImpl {
1815
+ state : QueueStateImplFactory {
1816
+ storage : storage. clone ( ) ,
1817
+ } ,
1818
+ config : QueueConfig {
1819
+ gc_interval : Duration :: from_secs ( 1 ) ,
1820
+ } ,
1821
+ } ;
1822
+
1823
+ let queue: QueueImpl < QueueStateStdImpl , EnqueuedMessage > = queue_factory. create ( ) ;
1824
+
1825
+ // import shard persistent state
1826
+ let ( shard_block_id, shard_top_update) = prepare_data_from_prepared_persistent_state (
1827
+ "../test/data/internals/persistent_state/shard.bin" ,
1829
1828
// block_id_str
1830
1829
"0:8000000000000000:160:f54b2c09be8c873670374271571ea70aeacbfbdbef9ac45252451cb035f11a33:0097ee79d8551854a353dd80991c17dafdaaa933abfcb012f93437f3a25c0365" ,
1831
1830
// diff_hash_str
1832
1831
"794fe9ce31b7919d2b6ca9ae0f97501f4017382a273b8f3ee512dcfcb5b14482" ,
1833
1832
// tail_len
1834
1833
3 ,
1834
+ & storage
1835
+ ) . await ?;
1836
+
1837
+ // should be none because it mc block state is not imported
1838
+ let last_committed_block = queue. get_last_committed_mc_block_id ( ) ?;
1839
+ assert ! ( last_committed_block. is_none( ) ) ;
1840
+
1841
+ // import mc persistent state
1842
+ let ( mc_block_id, mc_top_update) = prepare_data_from_prepared_persistent_state (
1843
+ "../test/data/internals/persistent_state/master.bin" ,
1844
+ // block_id_str
1845
+ "-1:8000000000000000:90:9e6e3bdbefda3a64a9dab50387f64a8c30b33a04b6fd638a8058e1344bf8a9d1:60ac6a64e15fb70a1c5629f0bb38890f3fd45aa7ba77e4b847cbf0f48d5d5818" ,
1846
+ // diff_hash_str
1847
+ "5795ca5d1b8d0c1ce96394eef851bde718ff2e14fd2840098ed2bb7640f278c5" ,
1848
+ // tail_len
1849
+ 1 ,
1850
+ & storage,
1835
1851
) . await ?;
1836
1852
1837
1853
let snapshot = storage. internal_queue_storage ( ) . make_snapshot ( ) ;
1838
1854
1855
+ // check shard queue
1856
+ check_imported_queue ( & queue, & shard_block_id, & shard_top_update, & snapshot) ?;
1857
+ // check mc queue
1858
+ check_imported_queue ( & queue, & mc_block_id, & mc_top_update, & snapshot) ?;
1859
+
1860
+ // common checks
1861
+ let last_committed_pointer = snapshot. read_commit_pointers ( ) ?;
1862
+ assert_eq ! ( last_committed_pointer. len( ) , 2 ) ;
1863
+
1864
+ let last_committed_block = queue. get_last_committed_mc_block_id ( ) ?. unwrap ( ) ;
1865
+ assert_eq ! ( last_committed_block, mc_block_id) ;
1866
+
1867
+ Ok ( ( ) )
1868
+ }
1869
+
1870
+ fn check_imported_queue (
1871
+ queue : & QueueImpl < QueueStateStdImpl , EnqueuedMessage > ,
1872
+ block_id : & BlockId ,
1873
+ top_update : & OutMsgQueueUpdates ,
1874
+ snapshot : & InternalQueueSnapshot ,
1875
+ ) -> anyhow:: Result < ( ) > {
1839
1876
let diff_tail = queue. get_diffs_tail_len ( & block_id. shard , & QueueKey :: MIN ) ;
1840
1877
assert_eq ! ( diff_tail, top_update. tail_len) ;
1841
1878
1842
- let last_committed_block = queue. get_last_committed_mc_block_id ( ) ?;
1843
- assert ! ( last_committed_block. is_none( ) ) ;
1844
-
1845
1879
let last_applied_block_seqno = snapshot
1846
1880
. get_last_applied_diff_seqno ( & block_id. shard ) ?
1847
1881
. unwrap ( ) ;
@@ -1866,15 +1900,6 @@ async fn test_import_persistent_state_sc() -> anyhow::Result<()> {
1866
1900
}
1867
1901
}
1868
1902
1869
- let mut stats = FastHashMap :: default ( ) ;
1870
- snapshot. collect_stats_in_range (
1871
- & block_id. shard ,
1872
- 0 ,
1873
- & QueueKey :: MIN ,
1874
- & QueueKey :: MAX ,
1875
- & mut stats,
1876
- ) ?;
1877
-
1878
1903
let mut stat = AccountStatistics :: default ( ) ;
1879
1904
snapshot. collect_stats_in_range (
1880
1905
& block_id. shard ,
@@ -1884,65 +1909,40 @@ async fn test_import_persistent_state_sc() -> anyhow::Result<()> {
1884
1909
& mut stat,
1885
1910
) ?;
1886
1911
1912
+ assert_eq ! ( stat, total_stats) ;
1913
+
1887
1914
let mut total_messages_by_stat = 0 ;
1888
1915
for s in stat. values ( ) {
1889
1916
total_messages_by_stat += s;
1890
1917
}
1891
1918
1892
- let iter_range_1 = QueueShardRange {
1893
- shard_ident : ShardIdent :: BASECHAIN ,
1919
+ // if tail_len > 1, then we should have at least one message when tail len == 1 its not guaranteed that diff has messages
1920
+ if top_update. tail_len > 1 {
1921
+ assert ! ( total_messages_by_stat > 0 ) ;
1922
+ }
1923
+
1924
+ let iter_range = vec ! [ QueueShardRange {
1925
+ shard_ident: block_id. shard,
1894
1926
from: QueueKey :: MIN . next_value( ) ,
1895
1927
to: QueueKey :: MAX . next_value( ) ,
1896
- } ;
1897
-
1898
- let mut iterator = queue. iterator ( 0 , & vec ! [ iter_range_1] , block_id. shard ) ?;
1928
+ } ] ;
1899
1929
1900
1930
let mut read_messages = 0 ;
1901
- while let Some ( _) = iterator. next ( ) ? {
1931
+
1932
+ let mut iterator = queue. iterator ( 0 , & iter_range, ShardIdent :: MASTERCHAIN ) ?;
1933
+ while iterator. next ( ) ?. is_some ( ) {
1902
1934
read_messages += 1 ;
1903
1935
}
1904
1936
1905
- assert_eq ! ( read_messages, total_messages_by_stat) ;
1906
-
1907
- let last_committed_pointer = snapshot. read_commit_pointers ( ) ?;
1908
-
1909
- assert_eq ! ( last_committed_pointer. len( ) , 1 ) ;
1910
-
1911
- let pointer = last_committed_pointer. get ( & block_id. shard ) . unwrap ( ) ;
1912
- assert_eq ! ( pointer. seqno, block_id. seqno) ;
1913
-
1914
- Ok ( ( ) )
1915
- }
1916
-
1917
- #[ tokio:: test]
1918
- async fn test_import_persistent_state_mc ( ) -> anyhow:: Result < ( ) > {
1919
- let ( storage, queue, block_id, top_update) = prepare_data_from_prepared_persistent_state (
1920
- "tests/test_data/mb.bin" ,
1921
- // block_id_str
1922
- "-1:8000000000000000:90:9e6e3bdbefda3a64a9dab50387f64a8c30b33a04b6fd638a8058e1344bf8a9d1:60ac6a64e15fb70a1c5629f0bb38890f3fd45aa7ba77e4b847cbf0f48d5d5818" ,
1923
- // diff_hash_str
1924
- "5795ca5d1b8d0c1ce96394eef851bde718ff2e14fd2840098ed2bb7640f278c5" ,
1925
- // tail_len
1926
- 1 ,
1927
- ) . await ?;
1928
-
1929
- let snapshot = storage. internal_queue_storage ( ) . make_snapshot ( ) ;
1930
-
1931
- let diff_tail = queue. get_diffs_tail_len ( & block_id. shard , & QueueKey :: MIN ) ;
1932
- assert_eq ! ( diff_tail, top_update. tail_len) ;
1937
+ let mut iterator = queue. iterator ( 0 , & iter_range, ShardIdent :: BASECHAIN ) ?;
1933
1938
1934
- let last_committed_block = queue. get_last_committed_mc_block_id ( ) ?. unwrap ( ) ;
1935
- assert_eq ! ( last_committed_block, block_id) ;
1939
+ while iterator. next ( ) ?. is_some ( ) {
1940
+ read_messages += 1 ;
1941
+ }
1936
1942
1937
- let last_applied_block_seqno = snapshot
1938
- . get_last_applied_diff_seqno ( & block_id. shard ) ?
1939
- . unwrap ( ) ;
1940
- assert_eq ! ( last_applied_block_seqno, block_id. seqno) ;
1943
+ assert_eq ! ( read_messages, total_messages_by_stat) ;
1941
1944
1942
1945
let last_committed_pointer = snapshot. read_commit_pointers ( ) ?;
1943
-
1944
- assert_eq ! ( last_committed_pointer. len( ) , 1 ) ;
1945
-
1946
1946
let pointer = last_committed_pointer. get ( & block_id. shard ) . unwrap ( ) ;
1947
1947
assert_eq ! ( pointer. seqno, block_id. seqno) ;
1948
1948
0 commit comments