Skip to content

Commit

Permalink
Persist subscriptions (#69)
Browse files Browse the repository at this point in the history
Fixes #66
  • Loading branch information
jeromegn authored Sep 25, 2023
1 parent 6ff31e0 commit 403cb77
Show file tree
Hide file tree
Showing 19 changed files with 1,870 additions and 647 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

## Unreleased

- Support existing tables being added to the schema [#64](https://github.com/superfly/corrosion/pull/64)
- Support existing tables being added to the schema ([#64](https://github.com/superfly/corrosion/pull/64))
- (**BREAKING**) Persist subscriptions across reboots, including many reliability improvements ([#69](https://github.com/superfly/corrosion/pull/69))

## v0.1.0

Expand Down
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ trust-dns-resolver = "0.22.0"
uhlc = { version = "0.5.2", features = ["defmt"] }
uuid = { version = "1.3.1", features = ["v4", "serde"] }
webpki = { version = "0.22.0", features = ["std"] }
http = { version = "0.2.9" }

[profile.release]
debug = 1
Expand Down
79 changes: 74 additions & 5 deletions crates/corro-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ use crate::{
peer::{bidirectional_sync, gossip_client_endpoint, gossip_server_endpoint, SyncError},
public::{
api_v1_db_schema, api_v1_queries, api_v1_transactions,
pubsub::{api_v1_sub_by_id, api_v1_subs, MatcherBroadcastCache, MatcherIdCache},
pubsub::{
api_v1_sub_by_id, api_v1_subs, process_sub_channel, MatcherBroadcastCache,
MatcherIdCache,
},
},
},
broadcast::runtime_loop,
Expand All @@ -33,6 +36,7 @@ use corro_types::{
change::{Change, SqliteValue},
config::{AuthzConfig, Config, DEFAULT_GOSSIP_PORT},
members::{MemberEvent, Members, Rtt},
pubsub::{migrate_subs, Matcher},
schema::init_schema,
sqlite::{CrConn, Migration, SqlitePoolError},
sync::{generate_sync, SyncMessageDecodeError, SyncMessageEncodeError},
Expand Down Expand Up @@ -110,14 +114,25 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age

info!("Actor ID: {}", actor_id);

let pool = SplitPool::create(&conf.db.path, tripwire.clone()).await?;
let subscriptions_db_path = conf.db.subscriptions_db_path();

if let Some(parent) = subscriptions_db_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}

let pool = SplitPool::create(&conf.db.path, &subscriptions_db_path, tripwire.clone()).await?;

let schema = {
let mut conn = pool.write_priority().await?;
migrate(&mut conn)?;
init_schema(&conn)?
};

{
let mut conn = rusqlite::Connection::open(&subscriptions_db_path)?;
migrate_subs(&mut conn)?;
}

let (tx_apply, rx_apply) = channel(10240);

let mut bk: HashMap<ActorId, BookedVersions> = HashMap::new();
Expand Down Expand Up @@ -290,6 +305,59 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> {
rtt_rx,
} = opts;

let mut matcher_id_cache = MatcherIdCache::default();
let mut matcher_bcast_cache = MatcherBroadcastCache::default();

{
// open database and set its journal to WAL
let rows = {
let subscriptions_db_path = agent.config().db.subscriptions_db_path();
let mut conn = rusqlite::Connection::open(&subscriptions_db_path)?;
conn.execute_batch(
r#"
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
"#,
)?;

migrate_subs(&mut conn)?;

let res = conn
.prepare("SELECT id, sql FROM subs")?
.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
.collect::<rusqlite::Result<Vec<(uuid::Uuid, String)>>>()?;

// the let is required or else we get a lifetime error
#[allow(clippy::let_and_return)]
res
};

for (id, sql) in rows {
let conn = agent.pool().dedicated().await?;
let (evt_tx, evt_rx) = channel(512);
match Matcher::restore(id, &agent.schema().read(), conn, evt_tx, &sql) {
Ok(handle) => {
agent.matchers().write().insert(id, handle);
let (sub_tx, _) = tokio::sync::broadcast::channel(10240);
tokio::spawn(process_sub_channel(
agent.clone(),
id,
sub_tx.clone(),
evt_rx,
));
matcher_id_cache.insert(sql, id);
matcher_bcast_cache.insert(id, sub_tx);
}
Err(e) => {
error!("could not restore subscription {id}: {e}");
}
}
}
};

let matcher_id_cache = Arc::new(tokio::sync::RwLock::new(matcher_id_cache));
let matcher_bcast_cache = Arc::new(tokio::sync::RwLock::new(matcher_bcast_cache));

let (to_send_tx, to_send_rx) = channel(10240);
let (notifications_tx, notifications_rx) = channel(10240);

Expand Down Expand Up @@ -900,8 +968,8 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> {
tower::ServiceBuilder::new()
.layer(Extension(Arc::new(AtomicI64::new(0))))
.layer(Extension(agent.clone()))
.layer(Extension(MatcherIdCache::default()))
.layer(Extension(MatcherBroadcastCache::default()))
.layer(Extension(matcher_id_cache))
.layer(Extension(matcher_bcast_cache))
.layer(Extension(tripwire.clone())),
)
.layer(DefaultBodyLimit::disable())
Expand Down Expand Up @@ -2020,6 +2088,7 @@ pub fn process_subs(agent: &Agent, changeset: &[Change]) {
}
}
}

for id in matchers_to_delete {
agent.matchers().write().remove(&id);
}
Expand Down Expand Up @@ -2229,7 +2298,7 @@ async fn sync_loop(
}
}

pub fn migrate(conn: &mut CrConn) -> rusqlite::Result<()> {
pub fn migrate(conn: &mut Connection) -> rusqlite::Result<()> {
let migrations: Vec<Box<dyn Migration>> = vec![Box::new(
init_migration as fn(&Transaction) -> rusqlite::Result<()>,
)];
Expand Down
29 changes: 22 additions & 7 deletions crates/corro-agent/src/api/public/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,8 @@ async fn build_query_rows_response(
.collect::<rusqlite::Result<Vec<_>>>()
{
Ok(cells) => {
if let Err(e) = data_tx.blocking_send(QueryEvent::Row(rowid, cells))
if let Err(e) =
data_tx.blocking_send(QueryEvent::Row(rowid.into(), cells))
{
error!("could not send back row: {e}");
return;
Expand Down Expand Up @@ -691,7 +692,7 @@ pub async fn api_v1_db_schema(
mod tests {
use arc_swap::ArcSwap;
use bytes::Bytes;
use corro_types::{actor::ActorId, agent::SplitPool, config::Config, schema::SqliteType};
use corro_types::{actor::ActorId, agent::SplitPool, config::Config, schema::SqliteType, api::RowId};
use futures::Stream;
use http_body::{combinators::UnsyncBoxBody, Body};
use tokio::sync::mpsc::{channel, error::TryRecvError};
Expand Down Expand Up @@ -724,7 +725,12 @@ mod tests {

let dir = tempfile::tempdir()?;

let pool = SplitPool::create(dir.path().join("./test.sqlite"), tripwire.clone()).await?;
let pool = SplitPool::create(
dir.path().join("./test.sqlite"),
dir.path().join("./subs.sqlite"),
tripwire.clone(),
)
.await?;

{
let mut conn = pool.write_priority().await?;
Expand Down Expand Up @@ -824,7 +830,12 @@ mod tests {

let dir = tempfile::tempdir()?;

let pool = SplitPool::create(dir.path().join("./test.sqlite"), tripwire.clone()).await?;
let pool = SplitPool::create(
dir.path().join("./test.sqlite"),
dir.path().join("./subs.sqlite"),
tripwire.clone(),
)
.await?;

{
let mut conn = pool.write_priority().await?;
Expand Down Expand Up @@ -917,7 +928,7 @@ mod tests {

assert_eq!(
row,
QueryEvent::Row(1, vec!["service-id".into(), "service-name".into()])
QueryEvent::Row(RowId(1), vec!["service-id".into(), "service-name".into()])
);

buf.extend_from_slice(&body.data().await.unwrap()?);
Expand All @@ -928,7 +939,10 @@ mod tests {

assert_eq!(
row,
QueryEvent::Row(2, vec!["service-id-2".into(), "service-name-2".into()])
QueryEvent::Row(
RowId(2),
vec!["service-id-2".into(), "service-name-2".into()]
)
);

buf.extend_from_slice(&body.data().await.unwrap()?);
Expand All @@ -952,7 +966,8 @@ mod tests {
let dir = tempfile::tempdir()?;
let db_path = dir.path().join("./test.sqlite");

let pool = SplitPool::create(&db_path, tripwire.clone()).await?;
let pool =
SplitPool::create(&db_path, dir.path().join("./subs.sqlite"), tripwire.clone()).await?;

{
let mut conn = pool.write_priority().await?;
Expand Down
Loading

0 comments on commit 403cb77

Please sign in to comment.