Skip to content

Commit b7fc04e

Browse files
pashinovRexagon
authored andcommitted
refactor(control): move ManualCompaction to control
1 parent ec961fe commit b7fc04e

File tree

9 files changed

+119
-167
lines changed

9 files changed

+119
-167
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cli/src/cmd/node/control.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -288,14 +288,16 @@ pub struct CmdCompact {
288288
#[clap(flatten)]
289289
args: ControlArgs,
290290

291-
#[clap(short, long)]
292-
database: tycho_control::proto::TriggerCompactionRequest,
291+
#[clap(short, long, value_parser = ["base", "mempool", "rpc"])]
292+
database: String,
293293
}
294294

295295
impl CmdCompact {
296296
pub fn run(self, args: BaseArgs) -> Result<()> {
297297
self.args.rt(args, |client| async move {
298-
client.trigger_compaction(self.database).await?;
298+
client
299+
.trigger_compaction(self.database.as_str().try_into()?)
300+
.await?;
299301
print_json(Empty {})
300302
})
301303
}

cli/src/node/mod.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ use tycho_control::{ControlEndpoint, ControlServer, ControlServerConfig, Control
2525
use tycho_core::block_strider::{
2626
ArchiveBlockProvider, ArchiveBlockProviderConfig, BlockProvider, BlockProviderExt,
2727
BlockStrider, BlockSubscriberExt, BlockchainBlockProvider, BlockchainBlockProviderConfig,
28-
ColdBootType, FileZerostateProvider, GcSubscriber, ManualCompaction, MetricsSubscriber,
29-
OptionalBlockStuff, PersistentBlockStriderState, PsSubscriber, ShardStateApplier, Starter,
30-
StarterConfig, StateSubscriber, StateSubscriberContext, StorageBlockProvider,
28+
ColdBootType, FileZerostateProvider, GcSubscriber, MetricsSubscriber, OptionalBlockStuff,
29+
PersistentBlockStriderState, PsSubscriber, ShardStateApplier, Starter, StarterConfig,
30+
StateSubscriber, StateSubscriberContext, StorageBlockProvider,
3131
};
3232
use tycho_core::blockchain_rpc::{
3333
BlockchainRpcClient, BlockchainRpcService, BroadcastListener, SelfBroadcastListener,
@@ -368,7 +368,6 @@ impl Node {
368368

369369
let gc_subscriber = GcSubscriber::new(self.storage.clone());
370370
let ps_subscriber = PsSubscriber::new(self.storage.clone());
371-
let manual_compaction = ManualCompaction::new(self.storage.clone());
372371

373372
// Create control server
374373
let control_server = {
@@ -377,7 +376,6 @@ impl Node {
377376
.with_gc_subscriber(gc_subscriber.clone())
378377
.with_storage(self.storage.clone())
379378
.with_blockchain_rpc_client(self.blockchain_rpc_client.clone())
380-
.with_manual_compaction(manual_compaction)
381379
.with_validator_keypair(self.keypair.clone())
382380
.with_collator(Arc::new(CollatorControl {
383381
config: self.collator_config.clone(),

control/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ anyhow = { workspace = true }
1717
arc-swap = { workspace = true, optional = true }
1818
async-trait = { workspace = true }
1919
bytes = { workspace = true, features = ["serde"] }
20-
clap = { workspace = true }
2120
everscale-crypto = { workspace = true, optional = true }
2221
everscale-types = { workspace = true }
2322
futures-util = { workspace = true }
2423
parking_lot = { workspace = true, optional = true }
24+
scopeguard = { workspace = true }
2525
serde = { workspace = true }
2626
tarpc = { workspace = true }
2727
thiserror = { workspace = true }

control/src/proto.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ pub enum TriggerGcRequest {
122122
Distance(u32),
123123
}
124124

125-
#[derive(Debug, Clone, Serialize, Deserialize, clap::ValueEnum)]
125+
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
126126
pub enum TriggerCompactionRequest {
127127
/// Trigger compaction for `BaseDb`.
128128
Base,
@@ -132,6 +132,21 @@ pub enum TriggerCompactionRequest {
132132
Rpc,
133133
}
134134

135+
impl TryFrom<&str> for TriggerCompactionRequest {
136+
type Error = anyhow::Error;
137+
138+
fn try_from(s: &str) -> Result<Self, Self::Error> {
139+
let res = match s {
140+
"base" => Self::Base,
141+
"mempool" => Self::Mempool,
142+
"rpc" => Self::Rpc,
143+
_ => anyhow::bail!("invalid database type"),
144+
};
145+
146+
Ok(res)
147+
}
148+
}
149+
135150
#[derive(Debug, Clone, Serialize, Deserialize)]
136151
pub struct BroadcastExtMsgRequest {
137152
/// A BOC with a [`Message`].

control/src/server.rs

Lines changed: 92 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@ use everscale_types::prelude::*;
1616
use futures_util::future::BoxFuture;
1717
use futures_util::{FutureExt, StreamExt};
1818
use parking_lot::RwLock;
19+
use scopeguard::defer;
1920
use serde::{Deserialize, Serialize};
2021
use tarpc::server::Channel;
22+
use tokio::sync::watch;
23+
use tokio::task::AbortHandle;
2124
use tycho_block_util::config::build_elections_data_to_sign;
2225
use tycho_block_util::state::RefMcStateHandle;
2326
use tycho_core::block_strider::{
24-
GcSubscriber, ManualCompaction, ManualCompactionTrigger, ManualGcTrigger, StateSubscriber,
25-
StateSubscriberContext,
27+
GcSubscriber, ManualGcTrigger, StateSubscriber, StateSubscriberContext,
2628
};
2729
use tycho_core::blockchain_rpc::BlockchainRpcClient;
2830
use tycho_network::Network;
@@ -150,13 +152,7 @@ impl Drop for ControlEndpoint {
150152
}
151153

152154
pub struct ControlServerBuilder<
153-
MandatoryFields = (
154-
Network,
155-
Storage,
156-
GcSubscriber,
157-
BlockchainRpcClient,
158-
ManualCompaction,
159-
),
155+
MandatoryFields = (Network, Storage, GcSubscriber, BlockchainRpcClient),
160156
> {
161157
mandatory_fields: MandatoryFields,
162158
memory_profiler: Option<Arc<dyn MemoryProfiler>>,
@@ -166,8 +162,7 @@ pub struct ControlServerBuilder<
166162

167163
impl ControlServerBuilder {
168164
pub async fn build(self, version: ControlServerVersion) -> Result<ControlServer> {
169-
let (network, storage, gc_subscriber, blockchain_rpc_client, manual_compaction) =
170-
self.mandatory_fields;
165+
let (network, storage, gc_subscriber, blockchain_rpc_client) = self.mandatory_fields;
171166
let memory_profiler = self
172167
.memory_profiler
173168
.unwrap_or_else(|| Arc::new(StubMemoryProfiler));
@@ -207,6 +202,8 @@ impl ControlServerBuilder {
207202
},
208203
};
209204

205+
let manual_compaction = ManualCompaction::new(storage.clone());
206+
210207
Ok(ControlServer {
211208
inner: Arc::new(Inner {
212209
node_info,
@@ -224,71 +221,53 @@ impl ControlServerBuilder {
224221
}
225222
}
226223

227-
impl<T2, T3, T4, T5> ControlServerBuilder<((), T2, T3, T4, T5)> {
228-
pub fn with_network(
229-
self,
230-
network: &Network,
231-
) -> ControlServerBuilder<(Network, T2, T3, T4, T5)> {
232-
let (_, t2, t3, t4, t5) = self.mandatory_fields;
224+
impl<T2, T3, T4> ControlServerBuilder<((), T2, T3, T4)> {
225+
pub fn with_network(self, network: &Network) -> ControlServerBuilder<(Network, T2, T3, T4)> {
226+
let (_, t2, t3, t4) = self.mandatory_fields;
233227
ControlServerBuilder {
234-
mandatory_fields: (network.clone(), t2, t3, t4, t5),
228+
mandatory_fields: (network.clone(), t2, t3, t4),
235229
memory_profiler: self.memory_profiler,
236230
validator_keypair: self.validator_keypair,
237231
collator: self.collator,
238232
}
239233
}
240234
}
241235

242-
impl<T1, T3, T4, T5> ControlServerBuilder<(T1, (), T3, T4, T5)> {
243-
pub fn with_storage(self, storage: Storage) -> ControlServerBuilder<(T1, Storage, T3, T4, T5)> {
244-
let (t1, _, t3, t4, t5) = self.mandatory_fields;
236+
impl<T1, T3, T4> ControlServerBuilder<(T1, (), T3, T4)> {
237+
pub fn with_storage(self, storage: Storage) -> ControlServerBuilder<(T1, Storage, T3, T4)> {
238+
let (t1, _, t3, t4) = self.mandatory_fields;
245239
ControlServerBuilder {
246-
mandatory_fields: (t1, storage, t3, t4, t5),
240+
mandatory_fields: (t1, storage, t3, t4),
247241
memory_profiler: self.memory_profiler,
248242
validator_keypair: self.validator_keypair,
249243
collator: self.collator,
250244
}
251245
}
252246
}
253247

254-
impl<T1, T2, T4, T5> ControlServerBuilder<(T1, T2, (), T4, T5)> {
248+
impl<T1, T2, T4> ControlServerBuilder<(T1, T2, (), T4)> {
255249
pub fn with_gc_subscriber(
256250
self,
257251
gc_subscriber: GcSubscriber,
258-
) -> ControlServerBuilder<(T1, T2, GcSubscriber, T4, T5)> {
259-
let (t1, t2, _, t4, t5) = self.mandatory_fields;
252+
) -> ControlServerBuilder<(T1, T2, GcSubscriber, T4)> {
253+
let (t1, t2, _, t4) = self.mandatory_fields;
260254
ControlServerBuilder {
261-
mandatory_fields: (t1, t2, gc_subscriber, t4, t5),
255+
mandatory_fields: (t1, t2, gc_subscriber, t4),
262256
memory_profiler: self.memory_profiler,
263257
validator_keypair: self.validator_keypair,
264258
collator: self.collator,
265259
}
266260
}
267261
}
268262

269-
impl<T1, T2, T3, T5> ControlServerBuilder<(T1, T2, T3, (), T5)> {
263+
impl<T1, T2, T3> ControlServerBuilder<(T1, T2, T3, ())> {
270264
pub fn with_blockchain_rpc_client(
271265
self,
272266
client: BlockchainRpcClient,
273-
) -> ControlServerBuilder<(T1, T2, T3, BlockchainRpcClient, T5)> {
274-
let (t1, t2, t3, _, t5) = self.mandatory_fields;
275-
ControlServerBuilder {
276-
mandatory_fields: (t1, t2, t3, client, t5),
277-
memory_profiler: self.memory_profiler,
278-
validator_keypair: self.validator_keypair,
279-
collator: self.collator,
280-
}
281-
}
282-
}
283-
284-
impl<T1, T2, T3, T4> ControlServerBuilder<(T1, T2, T3, T4, ())> {
285-
pub fn with_manual_compaction(
286-
self,
287-
manual_compaction: ManualCompaction,
288-
) -> ControlServerBuilder<(T1, T2, T3, T4, ManualCompaction)> {
289-
let (t1, t2, t3, t4, _) = self.mandatory_fields;
267+
) -> ControlServerBuilder<(T1, T2, T3, BlockchainRpcClient)> {
268+
let (t1, t2, t3, _) = self.mandatory_fields;
290269
ControlServerBuilder {
291-
mandatory_fields: (t1, t2, t3, t4, manual_compaction),
270+
mandatory_fields: (t1, t2, t3, client),
292271
memory_profiler: self.memory_profiler,
293272
validator_keypair: self.validator_keypair,
294273
collator: self.collator,
@@ -320,9 +299,9 @@ pub struct ControlServer {
320299
}
321300

322301
impl ControlServer {
323-
pub fn builder() -> ControlServerBuilder<((), (), (), (), ())> {
302+
pub fn builder() -> ControlServerBuilder<((), (), (), ())> {
324303
ControlServerBuilder {
325-
mandatory_fields: ((), (), (), (), ()),
304+
mandatory_fields: ((), (), (), ()),
326305
memory_profiler: None,
327306
validator_keypair: None,
328307
collator: None,
@@ -468,7 +447,7 @@ impl proto::ControlServer for ControlServer {
468447
}
469448

470449
async fn trigger_compaction(self, _: Context, req: proto::TriggerCompactionRequest) {
471-
self.inner.manual_compaction.trigger_compaction(req.into());
450+
self.inner.manual_compaction.trigger_compaction(req);
472451
}
473452

474453
async fn set_memory_profiler_enabled(self, _: Context, enabled: bool) -> bool {
@@ -854,16 +833,6 @@ impl From<proto::TriggerGcRequest> for ManualGcTrigger {
854833
}
855834
}
856835

857-
impl From<proto::TriggerCompactionRequest> for ManualCompactionTrigger {
858-
fn from(value: proto::TriggerCompactionRequest) -> Self {
859-
match value {
860-
proto::TriggerCompactionRequest::Base => Self::Base,
861-
proto::TriggerCompactionRequest::Mempool => Self::Mempool,
862-
proto::TriggerCompactionRequest::Rpc => Self::Rpc,
863-
}
864-
}
865-
}
866-
867836
/// A bit more weak version of `CachedAccounts` from the `tycho-rpc`.
868837
struct CachedAccounts {
869838
block_handle: BlockHandle,
@@ -927,3 +896,68 @@ fn extend_signature_with_id(data: &[u8], signature_id: Option<i32>) -> Cow<'_, [
927896
None => Cow::Borrowed(data),
928897
}
929898
}
899+
900+
#[derive(Clone)]
901+
struct ManualCompaction {
902+
trigger: ManualTriggerTx,
903+
handle: AbortHandle,
904+
}
905+
906+
impl ManualCompaction {
907+
pub fn new(storage: Storage) -> Self {
908+
let (compaction_trigger, manual_compaction_rx) =
909+
watch::channel(None::<proto::TriggerCompactionRequest>);
910+
911+
let watcher = tokio::spawn(Self::watcher(manual_compaction_rx, storage.clone()));
912+
913+
Self {
914+
trigger: compaction_trigger,
915+
handle: watcher.abort_handle(),
916+
}
917+
}
918+
919+
pub fn trigger_compaction(&self, trigger: proto::TriggerCompactionRequest) {
920+
self.trigger.send_replace(Some(trigger));
921+
}
922+
923+
#[tracing::instrument(skip_all)]
924+
async fn watcher(mut manual_rx: ManualTriggerRx, storage: Storage) {
925+
tracing::info!("manager started");
926+
defer! {
927+
tracing::info!("manager stopped");
928+
}
929+
930+
loop {
931+
if manual_rx.changed().await.is_err() {
932+
break;
933+
}
934+
935+
let Some(trigger) = *manual_rx.borrow_and_update() else {
936+
continue;
937+
};
938+
939+
match trigger {
940+
proto::TriggerCompactionRequest::Base => {
941+
storage.base_db().trigger_compaction().await;
942+
}
943+
proto::TriggerCompactionRequest::Mempool => {
944+
storage.mempool_db().trigger_compaction().await;
945+
}
946+
proto::TriggerCompactionRequest::Rpc => {
947+
if let Some(rpc_db) = storage.rpc_db() {
948+
rpc_db.trigger_compaction().await;
949+
}
950+
}
951+
}
952+
}
953+
}
954+
}
955+
956+
impl Drop for ManualCompaction {
957+
fn drop(&mut self) {
958+
self.handle.abort();
959+
}
960+
}
961+
962+
type ManualTriggerTx = watch::Sender<Option<proto::TriggerCompactionRequest>>;
963+
type ManualTriggerRx = watch::Receiver<Option<proto::TriggerCompactionRequest>>;

0 commit comments

Comments
 (0)