Skip to content

Commit 5ccfcf8

Browse files
Prashant Pal authored and meta-codesync[bot] committed
Added wait while shutdown to move repos
Summary: As titled
Reviewed By: RajivTS
Differential Revision: D85244548
fbshipit-source-id: d98772982275370199a2c14e14e32132e9176ee3
1 parent 3c7eeb4 commit 5ccfcf8

File tree

5 files changed

+54
-21
lines changed

5 files changed

+54
-21
lines changed

eden/mononoke/cmdlib/sharding/src/facebook.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use stats::prelude::*;
3333
use tokio::runtime::Handle;
3434
use tokio::sync::RwLock;
3535
use tokio::sync::oneshot::Receiver;
36+
use tokio::sync::oneshot::Sender;
3637
use tokio::task::JoinHandle;
3738
use tokio::time;
3839

@@ -898,8 +899,8 @@ impl ShardedProcessHandler {
898899
}
899900
}
900901

901-
pub async fn repo_map_empty(&self) -> bool {
902-
self.repo_map.read().await.is_empty()
902+
pub async fn repo_map_len(&self) -> usize {
903+
self.repo_map.read().await.len()
903904
}
904905
}
905906

@@ -1191,7 +1192,7 @@ impl ShardedProcessExecutor {
11911192
logger: &Logger,
11921193
terminate_signal_receiver: Receiver<bool>,
11931194
) -> Result<()> {
1194-
self.block_and_execute_with_quiesce_timeout(logger, terminate_signal_receiver, None)
1195+
self.block_and_execute_with_quiesce_timeout(logger, terminate_signal_receiver, None, None)
11951196
.await
11961197
}
11971198

@@ -1200,6 +1201,7 @@ impl ShardedProcessExecutor {
12001201
logger: &Logger,
12011202
terminate_signal_receiver: Receiver<bool>,
12021203
quiesce_timeout: Option<Duration>,
1204+
quiesce_completion_sender: Option<Sender<bool>>,
12031205
) -> Result<()> {
12041206
info!(logger, "Initiating sharded execution for service");
12051207
let shards = self.client.get_my_shards()?;
@@ -1226,20 +1228,24 @@ impl ShardedProcessExecutor {
12261228
STATS::manual_shard_eviction_by_timeout.add_value(1);
12271229
break;
12281230
}
1229-
repo_empty = self.handler.repo_map_empty() => {
1230-
if repo_empty {
1231+
repo_count = self.handler.repo_map_len() => {
1232+
if repo_count == 0 {
12311233
info!(logger, "All repos moved, evicting...");
12321234
STATS::manual_shard_eviction_by_repomap.add_value(1);
12331235
break;
12341236
} else {
1235-
info!(logger, "repos present, not evicting...");
1237+
info!(logger, "repos present count={}, not evicting...", repo_count);
12361238
// Sleep a bit before next check to avoid busy loop
12371239
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
12381240
}
12391241
}
12401242
}
12411243
}
12421244
}
1245+
1246+
if let Some(quiesce_completion_sender) = quiesce_completion_sender {
1247+
let _ = quiesce_completion_sender.send(true);
1248+
}
12431249
Ok(())
12441250
}
12451251
}

eden/mononoke/cmdlib/sharding/src/oss.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use fbinit::FacebookInit;
1414
use slog::Logger;
1515
use tokio::runtime::Handle;
1616
use tokio::sync::oneshot::Receiver;
17+
use tokio::sync::oneshot::Sender;
1718

1819
use crate::RepoShardedProcess;
1920

@@ -46,6 +47,7 @@ impl ShardedProcessExecutor {
4647
_logger: &Logger,
4748
_terminate_signal_receiver: Receiver<bool>,
4849
_quiesce_timeout: Option<Duration>,
50+
_quiesce_completion_sender: Option<Sender<bool>>,
4951
) -> Result<()> {
5052
unimplemented!("ShardedProcessExecutor is supported only for fbcode build")
5153
}

eden/mononoke/git_server/src/main.rs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ use mononoke_app::monitoring::MonitoringAppExtension;
6767
use ods_counters::OdsCounterManager;
6868
use rate_limiting::RateLimitEnvironment;
6969
use slog::info;
70+
use slog::warn;
7071
use tokio::net::TcpListener;
7172

7273
use crate::middleware::Ods3Middleware;
@@ -220,6 +221,7 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
220221
};
221222
let will_exit = Arc::new(AtomicBool::new(false));
222223
let (sender, receiver) = tokio::sync::oneshot::channel::<bool>();
224+
let (quiesce_sender, quiesce_receiver) = tokio::sync::oneshot::channel::<bool>();
223225
let runtime = app.runtime().clone();
224226
// Service name is used for shallow or deep sharding. If sharding itself is disabled, provide
225227
// service name as None while opening repos.
@@ -252,6 +254,13 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
252254
})
253255
};
254256

257+
let quiesce_timeout_secs = justknobs::get_as::<u64>(
258+
"scm/mononoke:shardmanager_shutdown_timeout_secs",
259+
Some("git_server"),
260+
)
261+
.unwrap();
262+
let quiesce_timeout = std::time::Duration::from_secs(quiesce_timeout_secs);
263+
255264
let requests_counter = Arc::new(AtomicI64::new(0));
256265
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
257266
let server = {
@@ -352,17 +361,12 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
352361
let logger = app.logger().clone();
353362
{
354363
async move {
355-
let timeout_secs = justknobs::get_as::<u64>(
356-
"scm/mononoke:shardmanager_shutdown_timeout_secs",
357-
Some("git_server"),
358-
)
359-
.unwrap();
360-
let quiesce_timeout = std::time::Duration::from_secs(timeout_secs);
361364
executor
362365
.block_and_execute_with_quiesce_timeout(
363366
&logger,
364367
receiver,
365368
Some(quiesce_timeout),
369+
Some(quiesce_sender),
366370
)
367371
.await
368372
}
@@ -398,6 +402,7 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
398402
Ok(())
399403
}
400404
};
405+
let cloned_logger = app.logger().clone();
401406
app.run_until_terminated(
402407
server,
403408
move || {
@@ -406,14 +411,21 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
406411
},
407412
args.shutdown_timeout_args.shutdown_grace_period,
408413
async move {
414+
match quiesce_receiver.await {
415+
Ok(_) => info!(
416+
&cloned_logger.clone(),
417+
"received signal from quiesce sender"
418+
),
419+
Err(_) => warn!(&cloned_logger, "quiesce sender dropped"),
420+
};
409421
let _ = shutdown_tx.send(());
410422
// Currently we kill off in-flight requests as soon as we've closed the listener.
411423
// If this is a problem in prod, this would be the point at which to wait
412424
// for all connections to shut down.
413425
// To do this properly, we'd need to track the `Connection` futures that Gotham
414426
// gets from Hyper, tell them to gracefully shutdown, then wait for them to complete
415427
},
416-
args.shutdown_timeout_args.shutdown_timeout,
428+
args.shutdown_timeout_args.shutdown_timeout + quiesce_timeout,
417429
// TODO
418430
Some(requests_counter),
419431
)?;

eden/mononoke/mononoke_macros/just_knobs_defaults/just_knobs.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@
105105
"scm/mononoke:retry_query_from_replica_with_consistency_check_jitter": 50,
106106
"scm/mononoke:retry_query_from_replica_with_consistency_check_interval_ms": 200,
107107
"scm/mononoke:retry_query_from_replica_with_consistency_check_hlc_drift_tolerance_ns": 0,
108-
"scm/mononoke:cgdm_reloading_interval_secs": 5
108+
"scm/mononoke:cgdm_reloading_interval_secs": 5,
109+
"scm/mononoke:shardmanager_shutdown_timeout_secs": 10
109110
}
110111
}

eden/mononoke/scs/scs_server/src/main.rs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ use panichandler::Fate;
5151
use scs_methods::source_control_impl::SourceControlServiceImpl;
5252
use sharding_ext::RepoShard;
5353
use slog::info;
54+
use slog::warn;
5455
use source_control_services::make_SourceControlService_server;
5556
use sql_construct::SqlConstruct;
5657
use sql_storage::Destination;
@@ -285,6 +286,7 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
285286

286287
let will_exit = Arc::new(AtomicBool::new(false));
287288
let (sm_shutdown_sender, sm_shutdown_receiver) = tokio::sync::oneshot::channel::<bool>();
289+
let (quiesce_sender, quiesce_receiver) = tokio::sync::oneshot::channel::<bool>();
288290

289291
if let Some(max_memory) = args.max_memory {
290292
memory::set_max_memory(max_memory);
@@ -398,6 +400,13 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
398400
writer.write_all(b"\n")?;
399401
}
400402

403+
let timeout_secs = justknobs::get_as::<u64>(
404+
"scm/mononoke:shardmanager_shutdown_timeout_secs",
405+
Some("scs_server"),
406+
)
407+
.unwrap();
408+
let quiesce_timeout = std::time::Duration::from_secs(timeout_secs);
409+
401410
if let Some(executor) = args.sharded_executor_args.build_executor(
402411
fb,
403412
runtime.clone(),
@@ -411,18 +420,13 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
411420
runtime.spawn({
412421
let logger = logger.clone();
413422
{
414-
let timeout_secs = justknobs::get_as::<u64>(
415-
"scm/mononoke:shardmanager_shutdown_timeout_secs",
416-
Some("scs_server"),
417-
)
418-
.unwrap();
419-
let quiesce_timeout = std::time::Duration::from_secs(timeout_secs);
420423
async move {
421424
executor
422425
.block_and_execute_with_quiesce_timeout(
423426
&logger,
424427
sm_shutdown_receiver,
425428
Some(quiesce_timeout),
429+
Some(quiesce_sender),
426430
)
427431
.await
428432
}
@@ -434,6 +438,7 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
434438
// stats aggregation.
435439
app.start_stats_aggregation()?;
436440

441+
let cloned_logger = logger.clone();
437442
app.wait_until_terminated(
438443
move || {
439444
let _ = sm_shutdown_sender.send(true);
@@ -442,13 +447,20 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
442447
args.shutdown_timeout_args.shutdown_grace_period,
443448
async {
444449
// Note that async blocks are lazy, so this isn't called until first poll
450+
match quiesce_receiver.await {
451+
Ok(_) => info!(
452+
&cloned_logger.clone(),
453+
"received signal from quiesce sender"
454+
),
455+
Err(_) => warn!(&cloned_logger, "quiesce sender dropped"),
456+
};
445457
let _ = task::spawn_blocking(move || {
446458
// Calling `stop` blocks until the service has completed all requests.
447459
service_framework.stop();
448460
})
449461
.await;
450462
},
451-
args.shutdown_timeout_args.shutdown_timeout,
463+
args.shutdown_timeout_args.shutdown_timeout + quiesce_timeout,
452464
None,
453465
)?;
454466

0 commit comments

Comments
 (0)