Skip to content

Commit d1b79e5

Browse files
TaskManager spawn helpers (#1065)
* Add `spawn` and `run` helpers These functions are added in the hopes of providing easy methods to ensure our tasks are spawned onto their own threads when starting them with TaskManager. `run()` is provided for tasks that can not/do not implement `Send`. However, effort should be put into allowing these tasks to use `spawn()`. Exporting the type alias `TaskLocalBoxFuture` is a nicety. It allows us to more easily target the return type if we ever want to refactor it into something else. And prevents the proliferation of Managed Tasks importing LocalBoxFuture from wherever they please. - `futures::future::LocalBoxFuture` - `futures::prelude::future::LocalBoxFuture` - `futures_util::future::LocalBoxFuture` * Update impl ManagedTask to use helpers Everywhere, we replace the return type with the type alias. Where we can, use `task_manager::spawn()` to ensure tasks are spawned in their own tasks. Where we can’t, use `task_manager::run()` to wrap tasks in a pin until they can implement `Send` and use `task_manager::spawn()`. * Update chain-ingest to use task_manager helper
1 parent e8ec23d commit d1b79e5

File tree

39 files changed

+125
-235
lines changed

39 files changed

+125
-235
lines changed

file_store/src/file_info_poller.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::{traits::MsgDecode, Error, FileInfo, Result};
22
use aws_sdk_s3::primitives::ByteStream;
33
use chrono::{DateTime, Utc};
44
use derive_builder::Builder;
5-
use futures::{future::LocalBoxFuture, stream::BoxStream, StreamExt};
5+
use futures::{stream::BoxStream, StreamExt};
66
use futures_util::TryFutureExt;
77
use retainer::Cache;
88
use std::{collections::VecDeque, marker::PhantomData, sync::Arc, time::Duration};
@@ -235,14 +235,8 @@ where
235235
fn start_task(
236236
self: Box<Self>,
237237
shutdown: triggered::Listener,
238-
) -> LocalBoxFuture<'static, anyhow::Result<()>> {
239-
let handle = tokio::spawn(self.run(shutdown));
240-
241-
Box::pin(
242-
handle
243-
.map_err(anyhow::Error::from)
244-
.and_then(|result| async move { result.map_err(anyhow::Error::from) }),
245-
)
238+
) -> task_manager::TaskLocalBoxFuture {
239+
task_manager::spawn(self.run(shutdown).err_into())
246240
}
247241
}
248242

file_store/src/file_sink.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::{file_upload::FileUpload, traits::MsgBytes, Error, Result};
22
use async_compression::tokio::write::GzipEncoder;
33
use bytes::Bytes;
44
use chrono::{DateTime, Utc};
5-
use futures::{future::LocalBoxFuture, SinkExt, TryFutureExt};
5+
use futures::{SinkExt, TryFutureExt};
66
use metrics::Label;
77
use std::time::Duration;
88
use std::{
@@ -290,14 +290,8 @@ impl<T: MsgBytes + Send + Sync + 'static> ManagedTask for FileSink<T> {
290290
fn start_task(
291291
self: Box<Self>,
292292
shutdown: triggered::Listener,
293-
) -> LocalBoxFuture<'static, anyhow::Result<()>> {
294-
let handle = tokio::spawn(self.run(shutdown));
295-
296-
Box::pin(
297-
handle
298-
.map_err(anyhow::Error::from)
299-
.and_then(|result| async move { result.map_err(anyhow::Error::from) }),
300-
)
293+
) -> task_manager::TaskLocalBoxFuture {
294+
task_manager::spawn(self.run(shutdown).err_into())
301295
}
302296
}
303297

file_store/src/file_upload.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::{Error, Result};
2-
use futures::{future::LocalBoxFuture, StreamExt, TryFutureExt};
2+
use futures::{StreamExt, TryFutureExt};
33
use std::{
44
path::{Path, PathBuf},
55
time::Duration,
@@ -54,14 +54,8 @@ impl ManagedTask for FileUploadServer {
5454
fn start_task(
5555
self: Box<Self>,
5656
shutdown: triggered::Listener,
57-
) -> LocalBoxFuture<'static, anyhow::Result<()>> {
58-
let handle = tokio::spawn(self.run(shutdown));
59-
60-
Box::pin(
61-
handle
62-
.map_err(anyhow::Error::from)
63-
.and_then(|result| async move { result.map_err(anyhow::Error::from) }),
64-
)
57+
) -> task_manager::TaskLocalBoxFuture {
58+
task_manager::spawn(self.run(shutdown).err_into())
6559
}
6660
}
6761

ingest/src/server_chain.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ impl ManagedTask for GrpcServer {
118118
self: Box<Self>,
119119
shutdown: triggered::Listener,
120120
) -> LocalBoxFuture<'static, anyhow::Result<()>> {
121-
Box::pin(self.run(shutdown))
121+
task_manager::spawn(self.run(shutdown))
122122
}
123123
}
124124

ingest/src/server_iot.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,7 @@ use file_store::{
66
file_upload,
77
traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, MsgVerify},
88
};
9-
use futures::{
10-
future::{LocalBoxFuture, TryFutureExt},
11-
Stream, StreamExt,
12-
};
9+
use futures::{future::TryFutureExt, Stream, StreamExt};
1310
use helium_crypto::{Network, PublicKey};
1411
use helium_proto::services::poc_lora::{
1512
self, lora_stream_request_v1::Request as StreamRequest,
@@ -181,9 +178,9 @@ impl ManagedTask for GrpcServer {
181178
fn start_task(
182179
self: Box<Self>,
183180
shutdown: triggered::Listener,
184-
) -> LocalBoxFuture<'static, anyhow::Result<()>> {
181+
) -> task_manager::TaskLocalBoxFuture {
185182
let address = self.address;
186-
Box::pin(async move {
183+
task_manager::spawn(async move {
187184
let grpc_server = transport::Server::builder()
188185
.layer(custom_tracing::grpc_layer::new_with_span(make_span))
189186
.layer(poc_metrics::request_layer!("ingest_server_iot_connection"))

ingest/src/server_mobile.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use file_store::{
66
file_upload,
77
traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, MsgVerify},
88
};
9-
use futures::future::LocalBoxFuture;
109
use futures_util::TryFutureExt;
1110
use helium_crypto::{Network, PublicKey, PublicKeyBinary};
1211
use helium_proto::services::poc_mobile::{
@@ -73,8 +72,8 @@ where
7372
fn start_task(
7473
self: Box<Self>,
7574
shutdown: triggered::Listener,
76-
) -> LocalBoxFuture<'static, anyhow::Result<()>> {
77-
Box::pin(self.run(shutdown))
75+
) -> task_manager::TaskLocalBoxFuture {
76+
task_manager::spawn(self.run(shutdown))
7877
}
7978
}
8079

iot_config/src/db_cleaner.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use chrono::{DateTime, Utc};
2-
use futures::TryFutureExt;
32
use sqlx::{PgConnection, Pool, Postgres, Transaction};
43
use std::time::Duration;
54
use task_manager::ManagedTask;
@@ -15,12 +14,8 @@ impl ManagedTask for DbCleaner {
1514
fn start_task(
1615
self: Box<Self>,
1716
shutdown: triggered::Listener,
18-
) -> futures::future::LocalBoxFuture<'static, anyhow::Result<()>> {
19-
Box::pin(
20-
tokio::spawn(self.run(shutdown))
21-
.map_err(anyhow::Error::from)
22-
.and_then(|result| async move { result }),
23-
)
17+
) -> task_manager::TaskLocalBoxFuture {
18+
task_manager::spawn(self.run(shutdown))
2419
}
2520
}
2621

iot_config/src/main.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use anyhow::{Error, Result};
22
use clap::Parser;
3-
use futures::future::LocalBoxFuture;
43
use futures_util::TryFutureExt;
54
use helium_proto::services::{
65
iot_config::{AdminServer, GatewayServer, OrgServer, RouteServer},
@@ -144,8 +143,8 @@ impl ManagedTask for GrpcServer {
144143
fn start_task(
145144
self: Box<Self>,
146145
shutdown: triggered::Listener,
147-
) -> LocalBoxFuture<'static, anyhow::Result<()>> {
148-
Box::pin(async move {
146+
) -> task_manager::TaskLocalBoxFuture {
147+
task_manager::spawn(async move {
149148
let grpc_server = transport::Server::builder()
150149
.http2_keepalive_interval(Some(Duration::from_secs(250)))
151150
.http2_keepalive_timeout(Some(Duration::from_secs(60)))

iot_packet_verifier/src/burner.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::{
44
confirm_pending_txns, Burn, ConfirmPendingError, PendingTables, PendingTablesTransaction,
55
},
66
};
7-
use futures::{future::LocalBoxFuture, TryFutureExt};
7+
use futures::TryFutureExt;
88
use solana::{burn::SolanaNetwork, GetSignature, SolanaRpcError};
99
use std::time::Duration;
1010
use task_manager::ManagedTask;
@@ -25,14 +25,8 @@ where
2525
fn start_task(
2626
self: Box<Self>,
2727
shutdown: triggered::Listener,
28-
) -> LocalBoxFuture<'static, anyhow::Result<()>> {
29-
let handle = tokio::spawn(self.run(shutdown));
30-
31-
Box::pin(
32-
handle
33-
.map_err(anyhow::Error::from)
34-
.and_then(|result| async move { result.map_err(anyhow::Error::from) }),
35-
)
28+
) -> task_manager::TaskLocalBoxFuture {
29+
task_manager::spawn(self.run(shutdown).err_into())
3630
}
3731
}
3832

iot_packet_verifier/src/daemon.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ where
4141
fn start_task(
4242
self: Box<Self>,
4343
shutdown: triggered::Listener,
44-
) -> futures::future::LocalBoxFuture<'static, anyhow::Result<()>> {
45-
Box::pin(self.run(shutdown))
44+
) -> task_manager::TaskLocalBoxFuture {
45+
task_manager::spawn(self.run(shutdown))
4646
}
4747
}
4848

0 commit comments

Comments
 (0)