Skip to content

Commit

Permalink
refactor: move away from inner-join, count up unique receipts for wor…
Browse files Browse the repository at this point in the history
…kflow info (#188)

## Description

This PR essentially makes progress count monotonic by changing up the
inner join and storing receipt cids when grabbing info over libp2p.

## Type of change

- [X] Refactor (non-breaking change that updates existing functionality)
  • Loading branch information
Zeeshan Lakhani committed Jul 11, 2023
1 parent b1c76d2 commit 7376c05
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 27 deletions.
12 changes: 6 additions & 6 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 19 additions & 3 deletions homestar-runtime/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,23 @@ pub trait Database: Send + Sync + Clone {
.map_err(Into::into)
}

/// Store series of receipts for a workflow [Cid] in the
/// [schema::workflows_receipts] table.
///
/// NOTE: We cannot do batch inserts with `on_conflict`, so we add
/// each one 1-by-1:
/// <https://github.com/diesel-rs/diesel/issues/3114>
fn store_workflow_receipts(
workflow_cid: Cid,
receipts: &[Cid],
conn: &mut Connection,
) -> Result<usize> {
receipts.iter().try_fold(0, |acc, receipt| {
let res = Self::store_workflow_receipt(workflow_cid, *receipt, conn)?;
Ok::<_, anyhow::Error>(acc + res)
})
}

/// Select workflow given a [Cid] to the workflow.
fn select_workflow(cid: Cid, conn: &mut Connection) -> Result<workflow::Stored> {
let wf = schema::workflows::dsl::workflows
Expand All @@ -154,12 +171,11 @@ pub trait Database: Send + Sync + Clone {
Ok(wf)
}

/// Join workflow information with number of receipts emitted.
/// Return workflow information with number of receipts emitted.
fn get_workflow_info(workflow_cid: Cid, conn: &mut Connection) -> Result<workflow::Info> {
let wf = Self::select_workflow(workflow_cid, conn)?;
let associated_receipts = workflow::StoredReceipt::belonging_to(&wf)
.inner_join(schema::receipts::dsl::receipts)
.select(schema::receipts::dsl::cid)
.select(schema::workflows_receipts::receipt_cid)
.load(conn)?;

let cids = associated_receipts
Expand Down
25 changes: 11 additions & 14 deletions homestar-runtime/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ use tokio::{
use tracing::info;

/// Type alias for a [DashMap] containing running task information.
pub type RunningSet = DashMap<Cid, Vec<AbortHandle>>;
pub type RunningTaskSet = DashMap<Cid, Vec<AbortHandle>>;

/// Trait for managing a [DashMap] of running task information.
pub trait ModifiedSet {
/// Append or insert a new [AbortHandle] into the [RunningSet].
/// Append or insert a new [AbortHandle] into the [RunningTaskSet].
fn append_or_insert(&mut self, cid: Cid, handles: Vec<AbortHandle>);
}

impl ModifiedSet for RunningSet {
impl ModifiedSet for RunningTaskSet {
fn append_or_insert(&mut self, cid: Cid, mut handles: Vec<AbortHandle>) {
self.entry(cid)
.and_modify(|prev_handles| {
Expand All @@ -55,7 +55,7 @@ pub struct Runner {
command_sender: oneshot::Sender<Event>,
command_receiver: oneshot::Receiver<Event>,
event_sender: Arc<mpsc::Sender<Event>>,
running_set: RunningSet,
running_set: RunningTaskSet,
#[allow(dead_code)]
ws_sender: ws::WsSender,
ws_receiver: BoundedChannelReceiver<ws::WsMessage>,
Expand All @@ -71,7 +71,7 @@ pub struct Runner {
command_sender: oneshot::Sender<Event>,
command_receiver: oneshot::Receiver<Event>,
event_sender: Arc<mpsc::Sender<Event>>,
running_set: RunningSet,
running_set: RunningTaskSet,
}

impl Runner {
Expand Down Expand Up @@ -162,17 +162,16 @@ impl Runner {
Ok(())
}

/// Garbage-collect task [AbortHandle]s in the [RunningSet].
/// Garbage-collect task [AbortHandle]s in the [RunningTaskSet].
pub fn gc(&mut self) {
self.running_set.retain(|_cid, handles| {
handles.retain(|handle| !handle.is_finished());
!handles.is_empty()
});
}

/// Garbage-collect task [AbortHandle]s in the [RunningSet] for a specific
/// Garbage-collect task [AbortHandle]s in the [RunningTaskSet] for a specific
/// workflow [Cid], running on a worker.
///
pub fn gc_worker(&mut self, cid: Cid) {
if let Some(mut handles) = self.running_set.get_mut(&cid) {
handles.retain(|handle| !handle.is_finished());
Expand Down Expand Up @@ -242,15 +241,13 @@ mod test {
time::Duration,
};

static ATOMIC_PORT: AtomicUsize = AtomicUsize::new(1338);
static ATOMIC_PORT: AtomicUsize = AtomicUsize::new(444);

async fn setup() -> Runner {
let mut settings = Settings::load().unwrap();
settings.node.network.websocket_port = ATOMIC_PORT.fetch_add(1, Ordering::SeqCst) as u16;
let db = crate::test_utils::db::MemoryDb::setup_connection_pool(
Settings::load().unwrap().node(),
)
.unwrap();
let _ = ATOMIC_PORT.fetch_add(1, Ordering::SeqCst) as u16;
settings.node.network.websocket_port = ATOMIC_PORT.load(Ordering::SeqCst) as u16;
let db = crate::test_utils::db::MemoryDb::setup_connection_pool(settings.node()).unwrap();

Runner::start(settings.into(), db).await.unwrap()
}
Expand Down
6 changes: 3 additions & 3 deletions homestar-runtime/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
Event,
},
network::swarm::CapsuleTag,
runner::{ModifiedSet, RunningSet},
runner::{ModifiedSet, RunningTaskSet},
scheduler::TaskScheduler,
tasks::{RegisteredTasks, WasmContext},
workflow::{self, Resource},
Expand Down Expand Up @@ -138,15 +138,15 @@ impl<'a> Worker<'a> {
pub(crate) async fn run(
self,
db: impl Database + Sync,
running_set: &mut RunningSet,
running_set: &mut RunningTaskSet,
) -> Result<()> {
self.run_queue(db, running_set).await
}

async fn run_queue(
mut self,
db: impl Database + Sync,
running_set: &mut RunningSet,
running_set: &mut RunningTaskSet,
) -> Result<()> {
fn insert_into_map<T>(mut map: Arc<LinkMap<T>>, key: Cid, value: T)
where
Expand Down
7 changes: 6 additions & 1 deletion homestar-runtime/src/workflow/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ impl Stored {
///
/// [Workflow]: homestar_core::Workflow
/// [receipts]: crate::Receipt
#[derive(Debug, Clone, PartialEq, Queryable, Insertable, Identifiable, Associations, Hash)]
#[derive(
Debug, Clone, PartialEq, Queryable, Insertable, Identifiable, Selectable, Associations, Hash,
)]
#[diesel(belongs_to(Receipt, foreign_key = receipt_cid))]
#[diesel(belongs_to(Stored, foreign_key = workflow_cid))]
#[diesel(table_name = crate::db::schema::workflows_receipts, primary_key(workflow_cid, receipt_cid))]
Expand Down Expand Up @@ -171,6 +173,7 @@ impl Info {
) -> Result<Self> {
let workflow_len = workflow.len();
let workflow_cid = workflow.to_cid()?;

let handle_timeout_fn = |workflow_cid, reused_conn: Option<&'a mut Connection>| {
let workflow_info = Self::default(workflow_cid, workflow_len);
// store workflow from info
Expand Down Expand Up @@ -234,6 +237,8 @@ impl Info {
),
conn,
)?;

Db::store_workflow_receipts(workflow_cid, &workflow_info.progress, conn)?;
}

Ok(workflow_info)
Expand Down

0 comments on commit 7376c05

Please sign in to comment.