@@ -9,14 +9,8 @@ use commonware_coding::ReedSolomon;
99use commonware_consensus:: {
1010 marshal:: {
1111 self ,
12- ingress:: {
13- coding:: {
14- application:: CodingAdapter ,
15- mailbox:: ShardMailbox ,
16- types:: { CodedBlock , Shard } ,
17- } ,
18- handler,
19- } ,
12+ coding:: { self , CodedBlock , CodingAdapter , Shard } ,
13+ ingress:: handler,
2014 } ,
2115 threshold_simplex:: { self , Engine as Consensus } ,
2216 types:: CodingCommitment ,
@@ -42,11 +36,8 @@ use std::{num::NonZero, time::Duration};
4236use tracing:: { error, warn} ;
4337
4438/// Reporter type for [threshold_simplex::Engine].
45- type Reporter < E , I > = Reporters <
46- Activity ,
47- marshal:: Mailbox < MinSig , Block , ReedSolomon < Sha256 > , PublicKey > ,
48- Option < indexer:: Pusher < E , I > > ,
49- > ;
39+ type Reporter < E , I > =
40+ Reporters < Activity , marshal:: Mailbox < MinSig , Block > , Option < indexer:: Pusher < E , I > > > ;
5041
5142/// To better support peers near tip during network instability, we multiply
5243/// the consensus activity timeout by this factor.
@@ -104,8 +95,9 @@ pub struct Engine<
10495
10596 application : Application < E > ,
10697 buffer : buffered:: Engine < E , PublicKey , Shard < ReedSolomon < Sha256 > , Sha256 > > ,
107- shard_mailbox : ShardMailbox < ReedSolomon < Sha256 > , Sha256 , Block , PublicKey > ,
108- marshal : marshal:: Actor < Block , E , MinSig , ReedSolomon < Sha256 > , PublicKey > ,
98+ shard_mailbox : coding:: Mailbox < Block , ReedSolomon < Sha256 > , PublicKey > ,
99+ coding : coding:: Actor < E , ReedSolomon < Sha256 > , Sha256 , Block , PublicKey > ,
100+ marshal : marshal:: Actor < Block , E , MinSig , ReedSolomon < Sha256 > > ,
109101
110102 pub supervisor : Supervisor ,
111103
@@ -135,43 +127,32 @@ impl<
135127 let buffer_pool = PoolRef :: new ( BUFFER_POOL_PAGE_SIZE , BUFFER_POOL_CAPACITY ) ;
136128
137129 // Create marshal
138- let ( marshal, marshal_mailbox) : (
139- _ ,
140- marshal:: Mailbox < MinSig , Block , ReedSolomon < Sha256 > , PublicKey > ,
141- ) = marshal:: Actor :: init (
142- context. with_label ( "marshal" ) ,
143- marshal:: Config {
144- identity,
145- partition_prefix : cfg. partition_prefix . clone ( ) ,
146- mailbox_size : cfg. mailbox_size ,
147- view_retention_timeout : cfg
148- . activity_timeout
149- . saturating_mul ( SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER ) ,
150- namespace : NAMESPACE . to_vec ( ) ,
151- prunable_items_per_section : PRUNABLE_ITEMS_PER_SECTION ,
152- immutable_items_per_section : IMMUTABLE_ITEMS_PER_SECTION ,
153- freezer_table_initial_size : cfg. blocks_freezer_table_initial_size ,
154- freezer_table_resize_frequency : FREEZER_TABLE_RESIZE_FREQUENCY ,
155- freezer_table_resize_chunk_size : FREEZER_TABLE_RESIZE_CHUNK_SIZE ,
156- freezer_journal_target_size : FREEZER_JOURNAL_TARGET_SIZE ,
157- freezer_journal_compression : FREEZER_JOURNAL_COMPRESSION ,
158- freezer_journal_buffer_pool : buffer_pool. clone ( ) ,
159- replay_buffer : REPLAY_BUFFER ,
160- write_buffer : WRITE_BUFFER ,
161- codec_config : ( ) ,
162- max_repair : MAX_REPAIR ,
163- } ,
164- )
165- . await ;
166-
167- let supervisor = Supervisor :: new ( cfg. polynomial , cfg. participants . clone ( ) , cfg. share ) ;
168- let application = CodingAdapter :: new (
169- context. with_label ( "app" ) ,
170- AltoApp :: default ( ) ,
171- marshal_mailbox. clone ( ) ,
172- cfg. signer . public_key ( ) ,
173- supervisor. clone ( ) ,
174- ) ;
130+ let ( marshal, marshal_mailbox) : ( _ , marshal:: Mailbox < MinSig , Block > ) =
131+ marshal:: Actor :: init (
132+ context. with_label ( "marshal" ) ,
133+ marshal:: Config {
134+ identity,
135+ partition_prefix : cfg. partition_prefix . clone ( ) ,
136+ mailbox_size : cfg. mailbox_size ,
137+ view_retention_timeout : cfg
138+ . activity_timeout
139+ . saturating_mul ( SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER ) ,
140+ namespace : NAMESPACE . to_vec ( ) ,
141+ prunable_items_per_section : PRUNABLE_ITEMS_PER_SECTION ,
142+ immutable_items_per_section : IMMUTABLE_ITEMS_PER_SECTION ,
143+ freezer_table_initial_size : cfg. blocks_freezer_table_initial_size ,
144+ freezer_table_resize_frequency : FREEZER_TABLE_RESIZE_FREQUENCY ,
145+ freezer_table_resize_chunk_size : FREEZER_TABLE_RESIZE_CHUNK_SIZE ,
146+ freezer_journal_target_size : FREEZER_JOURNAL_TARGET_SIZE ,
147+ freezer_journal_compression : FREEZER_JOURNAL_COMPRESSION ,
148+ freezer_journal_buffer_pool : buffer_pool. clone ( ) ,
149+ replay_buffer : REPLAY_BUFFER ,
150+ write_buffer : WRITE_BUFFER ,
151+ codec_config : ( ) ,
152+ max_repair : MAX_REPAIR ,
153+ } ,
154+ )
155+ . await ;
175156
176157 // Create the buffer
177158 let ( buffer, buffer_mailbox) = buffered:: Engine :: new (
@@ -184,10 +165,18 @@ impl<
184165 codec_config : ( usize:: MAX , usize:: MAX ) ,
185166 } ,
186167 ) ;
187- let shard_mailbox = ShardMailbox :: new (
188- context. with_label ( "shard_mailbox" ) ,
189- buffer_mailbox. clone ( ) ,
190- ( ) ,
168+
169+ let ( coding, shard_mailbox) =
170+ coding:: Actor :: new ( context. with_label ( "coding" ) , buffer_mailbox, ( ) ) ;
171+
172+ let supervisor = Supervisor :: new ( cfg. polynomial , cfg. participants . clone ( ) , cfg. share ) ;
173+ let application = CodingAdapter :: new (
174+ context. with_label ( "app" ) ,
175+ AltoApp :: default ( ) ,
176+ marshal_mailbox. clone ( ) ,
177+ shard_mailbox. clone ( ) ,
178+ cfg. signer . public_key ( ) ,
179+ supervisor. clone ( ) ,
191180 ) ;
192181
193182 // Create the reporter
@@ -235,6 +224,7 @@ impl<
235224 application,
236225 buffer,
237226 shard_mailbox,
227+ coding,
238228 marshal,
239229
240230 supervisor,
@@ -309,6 +299,9 @@ impl<
309299 // Start the buffer
310300 let buffer_handle = self . buffer . start ( broadcast_network) ;
311301
302+ // Start coding
303+ let coding_handle = self . coding . start ( ) ;
304+
312305 // Start marshal
313306 let marshal_handle =
314307 self . marshal
@@ -323,7 +316,14 @@ impl<
323316 . start ( pending_network, recovered_network, resolver_network) ;
324317
325318 // Wait for any actor to finish
326- if let Err ( e) = try_join_all ( vec ! [ buffer_handle, marshal_handle, consensus_handle] ) . await {
319+ if let Err ( e) = try_join_all ( vec ! [
320+ buffer_handle,
321+ coding_handle,
322+ marshal_handle,
323+ consensus_handle,
324+ ] )
325+ . await
326+ {
327327 error ! ( ?e, "engine failed" ) ;
328328 } else {
329329 warn ! ( "engine stopped" ) ;
0 commit comments