Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: move away from inner-join, count up unique receipts for workflow info #188

Merged
merged 3 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 19 additions & 3 deletions homestar-runtime/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,23 @@ pub trait Database: Send + Sync + Clone {
.map_err(Into::into)
}

/// Store series of receipts for a workflow [Cid] in the
/// [schema::workflows_receipts] table.
///
/// NOTE: We cannot do batch inserts with `on_conflict`, so we add
/// each one 1-by-1:
/// <https://github.com/diesel-rs/diesel/issues/3114>
fn store_workflow_receipts(
workflow_cid: Cid,
receipts: &[Cid],
conn: &mut Connection,
) -> Result<usize> {
receipts.iter().try_fold(0, |acc, receipt| {
let res = Self::store_workflow_receipt(workflow_cid, *receipt, conn)?;
Ok::<_, anyhow::Error>(acc + res)
})
}

/// Select workflow given a [Cid] to the workflow.
fn select_workflow(cid: Cid, conn: &mut Connection) -> Result<workflow::Stored> {
let wf = schema::workflows::dsl::workflows
Expand All @@ -154,12 +171,11 @@ pub trait Database: Send + Sync + Clone {
Ok(wf)
}

/// Join workflow information with number of receipts emitted.
/// Return workflow information with number of receipts emitted.
fn get_workflow_info(workflow_cid: Cid, conn: &mut Connection) -> Result<workflow::Info> {
let wf = Self::select_workflow(workflow_cid, conn)?;
let associated_receipts = workflow::StoredReceipt::belonging_to(&wf)
.inner_join(schema::receipts::dsl::receipts)
.select(schema::receipts::dsl::cid)
.select(schema::workflows_receipts::receipt_cid)
.load(conn)?;

let cids = associated_receipts
Expand Down
25 changes: 11 additions & 14 deletions homestar-runtime/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ use tokio::{
use tracing::info;

/// Type alias for a [DashMap] containing running task information.
pub type RunningSet = DashMap<Cid, Vec<AbortHandle>>;
pub type RunningTaskSet = DashMap<Cid, Vec<AbortHandle>>;

/// Trait for managing a [DashMap] of running task information.
pub trait ModifiedSet {
/// Append or insert a new [AbortHandle] into the [RunningSet].
/// Append or insert a new [AbortHandle] into the [RunningTaskSet].
fn append_or_insert(&mut self, cid: Cid, handles: Vec<AbortHandle>);
}

impl ModifiedSet for RunningSet {
impl ModifiedSet for RunningTaskSet {
fn append_or_insert(&mut self, cid: Cid, mut handles: Vec<AbortHandle>) {
self.entry(cid)
.and_modify(|prev_handles| {
Expand All @@ -55,7 +55,7 @@ pub struct Runner {
command_sender: oneshot::Sender<Event>,
command_receiver: oneshot::Receiver<Event>,
event_sender: Arc<mpsc::Sender<Event>>,
running_set: RunningSet,
running_set: RunningTaskSet,
#[allow(dead_code)]
ws_sender: ws::WsSender,
ws_receiver: BoundedChannelReceiver<ws::WsMessage>,
Expand All @@ -71,7 +71,7 @@ pub struct Runner {
command_sender: oneshot::Sender<Event>,
command_receiver: oneshot::Receiver<Event>,
event_sender: Arc<mpsc::Sender<Event>>,
running_set: RunningSet,
running_set: RunningTaskSet,
}

impl Runner {
Expand Down Expand Up @@ -162,17 +162,16 @@ impl Runner {
Ok(())
}

/// Garbage-collect task [AbortHandle]s in the [RunningSet].
/// Garbage-collect task [AbortHandle]s in the [RunningTaskSet].
pub fn gc(&mut self) {
self.running_set.retain(|_cid, handles| {
handles.retain(|handle| !handle.is_finished());
!handles.is_empty()
});
}

/// Garbage-collect task [AbortHandle]s in the [RunningSet] for a specific
/// Garbage-collect task [AbortHandle]s in the [RunningTaskSet] for a specific
/// workflow [Cid], running on a worker.
///
pub fn gc_worker(&mut self, cid: Cid) {
if let Some(mut handles) = self.running_set.get_mut(&cid) {
handles.retain(|handle| !handle.is_finished());
Expand Down Expand Up @@ -242,15 +241,13 @@ mod test {
time::Duration,
};

static ATOMIC_PORT: AtomicUsize = AtomicUsize::new(1338);
static ATOMIC_PORT: AtomicUsize = AtomicUsize::new(444);

async fn setup() -> Runner {
let mut settings = Settings::load().unwrap();
settings.node.network.websocket_port = ATOMIC_PORT.fetch_add(1, Ordering::SeqCst) as u16;
let db = crate::test_utils::db::MemoryDb::setup_connection_pool(
Settings::load().unwrap().node(),
)
.unwrap();
let _ = ATOMIC_PORT.fetch_add(1, Ordering::SeqCst) as u16;
settings.node.network.websocket_port = ATOMIC_PORT.load(Ordering::SeqCst) as u16;
let db = crate::test_utils::db::MemoryDb::setup_connection_pool(settings.node()).unwrap();

Runner::start(settings.into(), db).await.unwrap()
}
Expand Down
6 changes: 3 additions & 3 deletions homestar-runtime/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
Event,
},
network::swarm::CapsuleTag,
runner::{ModifiedSet, RunningSet},
runner::{ModifiedSet, RunningTaskSet},
scheduler::TaskScheduler,
tasks::{RegisteredTasks, WasmContext},
workflow::{self, Resource},
Expand Down Expand Up @@ -138,15 +138,15 @@ impl<'a> Worker<'a> {
pub(crate) async fn run(
self,
db: impl Database + Sync,
running_set: &mut RunningSet,
running_set: &mut RunningTaskSet,
) -> Result<()> {
self.run_queue(db, running_set).await
}

async fn run_queue(
mut self,
db: impl Database + Sync,
running_set: &mut RunningSet,
running_set: &mut RunningTaskSet,
) -> Result<()> {
fn insert_into_map<T>(mut map: Arc<LinkMap<T>>, key: Cid, value: T)
where
Expand Down
7 changes: 6 additions & 1 deletion homestar-runtime/src/workflow/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ impl Stored {
///
/// [Workflow]: homestar_core::Workflow
/// [receipts]: crate::Receipt
#[derive(Debug, Clone, PartialEq, Queryable, Insertable, Identifiable, Associations, Hash)]
#[derive(
Debug, Clone, PartialEq, Queryable, Insertable, Identifiable, Selectable, Associations, Hash,
)]
#[diesel(belongs_to(Receipt, foreign_key = receipt_cid))]
#[diesel(belongs_to(Stored, foreign_key = workflow_cid))]
#[diesel(table_name = crate::db::schema::workflows_receipts, primary_key(workflow_cid, receipt_cid))]
Expand Down Expand Up @@ -171,6 +173,7 @@ impl Info {
) -> Result<Self> {
let workflow_len = workflow.len();
let workflow_cid = workflow.to_cid()?;

let handle_timeout_fn = |workflow_cid, reused_conn: Option<&'a mut Connection>| {
let workflow_info = Self::default(workflow_cid, workflow_len);
// store workflow from info
Expand Down Expand Up @@ -234,6 +237,8 @@ impl Info {
),
conn,
)?;

Db::store_workflow_receipts(workflow_cid, &workflow_info.progress, conn)?;
}

Ok(workflow_info)
Expand Down