Skip to content

Commit

Permalink
Upgrade to cr-sqlite 0.16.0 + a few performance improvements (#75)
Browse files Browse the repository at this point in the history
Upgrading to cr-sqlite 0.16.0 (unreleased, for now) gives a significant disk space reduction by adding a "lookaside" table for primary keys. Instead of repeating a whole primary key for each row in the clock table (there's 1 row per column + 1 sentinel row), this assigns an integer that can be looked up.

For consul services and checks, the savings are significant (about 30% in our case).

This PR also adds a few performance improvements:
- Process all "empty" (cleared) db versions out of band in bigger transactions
- Buffer more data when synchronizing changes before sending them
  • Loading branch information
jeromegn authored Sep 29, 2023
1 parent b4dac80 commit 4b723ea
Show file tree
Hide file tree
Showing 14 changed files with 351 additions and 175 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- Upgraded to cr-sqlite 0.16.0 (unreleased) ([#75](../../pull/75))
- Rewrite compaction logic to be more correct and efficient ([#74](../../pull/74))
- `corrosion consul sync` will now bundle services and checks in a single transaction (changeset) ([#73](../../pull/73))
- (**BREAKING**) Persist subscriptions across reboots, including many reliability improvements ([#69](../../pull/69))
- Support existing tables being added to the schema ([#64](../../pull/64))
Expand Down
207 changes: 170 additions & 37 deletions crates/corro-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,15 @@ const COMPACT_BOOKED_INTERVAL: Duration = Duration::from_secs(300);
const ANNOUNCE_INTERVAL: Duration = Duration::from_secs(300);

pub struct AgentOptions {
actor_id: ActorId,
gossip_server_endpoint: quinn::Endpoint,
transport: Transport,
api_listener: TcpListener,
rx_bcast: Receiver<BroadcastInput>,
rx_apply: Receiver<(ActorId, i64)>,
rtt_rx: Receiver<(SocketAddr, Duration)>,
tripwire: Tripwire,
pub actor_id: ActorId,
pub gossip_server_endpoint: quinn::Endpoint,
pub transport: Transport,
pub api_listener: TcpListener,
pub rx_bcast: Receiver<BroadcastInput>,
pub rx_apply: Receiver<(ActorId, i64)>,
pub rx_empty: Receiver<(ActorId, RangeInclusive<i64>)>,
pub rtt_rx: Receiver<(SocketAddr, Duration)>,
pub tripwire: Tripwire,
}

pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, AgentOptions)> {
Expand Down Expand Up @@ -261,13 +262,16 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age

let (tx_bcast, rx_bcast) = channel(10240);

let (tx_empty, rx_empty) = channel(10240);

let opts = AgentOptions {
actor_id,
gossip_server_endpoint,
transport,
api_listener,
rx_bcast,
rx_apply,
rx_empty,
rtt_rx,
tripwire: tripwire.clone(),
};
Expand All @@ -283,6 +287,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
bookie,
tx_bcast,
tx_apply,
tx_empty,
schema: RwLock::new(schema),
tripwire,
});
Expand All @@ -307,6 +312,7 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> {
mut tripwire,
rx_bcast,
rx_apply,
rx_empty,
rtt_rx,
} = opts;

Expand Down Expand Up @@ -893,8 +899,14 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> {
);

spawn_counted(
sync_loop(agent.clone(), transport.clone(), rx_apply, tripwire.clone())
.inspect(|_| info!("corrosion agent sync loop is done")),
sync_loop(
agent.clone(),
transport.clone(),
rx_apply,
rx_empty,
tripwire.clone(),
)
.inspect(|_| info!("corrosion agent sync loop is done")),
);

let mut db_cleanup_interval = tokio::time::interval(Duration::from_secs(60 * 15));
Expand Down Expand Up @@ -987,7 +999,15 @@ async fn clear_overwritten_versions(agent: Agent) {
{
let booked = bookie.read().await;
for (actor_id, booked) in booked.iter() {
let versions = booked.read().await.current_versions();
let versions = {
match timeout(Duration::from_secs(1), booked.read()).await {
Ok(booked) => booked.current_versions(),
Err(_) => {
info!(%actor_id, "timed out acquiring read lock on bookkeeping, skipping for now");
continue;
}
}
};
if versions.is_empty() {
continue;
}
Expand Down Expand Up @@ -1081,7 +1101,7 @@ async fn clear_overwritten_versions(agent: Agent) {
db_version = NULL,
last_seq = NULL,
ts = NULL
WHERE end_version != excluded.end_version
WHERE end_version < excluded.end_version
",
)?
.execute(params![actor_id, range.start(), range.end()])?;
Expand Down Expand Up @@ -1271,12 +1291,16 @@ fn find_cleared_db_versions(tx: &Transaction) -> rusqlite::Result<BTreeSet<i64>>
.query_map([], |row| row.get(0))?
.collect::<Result<BTreeSet<String>, _>>()?;

if tables.is_empty() {
return Ok(BTreeSet::new());
}

let to_clear_query = format!(
"SELECT DISTINCT(db_version) FROM __corro_bookkeeping WHERE db_version IS NOT NULL
EXCEPT SELECT db_version FROM ({});",
tables
.iter()
.map(|table| format!("SELECT DISTINCT(__crsql_db_version) AS db_version FROM {table}"))
.map(|table| format!("SELECT DISTINCT(db_version) FROM {table}"))
.collect::<Vec<_>>()
.join(" UNION ")
);
Expand Down Expand Up @@ -1585,7 +1609,12 @@ fn store_empty_changeset(
"
INSERT INTO __corro_bookkeeping (actor_id, start_version, end_version, db_version, ts)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (actor_id, start_version) DO NOTHING;
ON CONFLICT (actor_id, start_version) DO UPDATE SET
end_version = excluded.end_version,
db_version = NULL,
last_seq = NULL,
ts = NULL
WHERE end_version < excluded.end_version;
",
)?
.execute(params![
Expand Down Expand Up @@ -1830,17 +1859,36 @@ pub async fn process_multiple_changes(
continue;
}

let tx = conn.transaction()?;

let (known, changeset) = match process_single_version(&tx, change) {
Ok(res) => res,
Err(e) => {
error!(%actor_id, ?versions, "could not process single change: {e}");
continue;
// optimizing this, insert later!
let (known, changeset) = if change.is_complete() && change.is_empty() {
if let Err(e) = agent
.tx_empty()
.blocking_send((actor_id, change.versions()))
{
error!("could not send empty changed versions into channel: {e}");
}
};
// insert into in-memory bookkeeping right away
booked_write.insert_many(change.versions(), KnownDbVersion::Cleared);
(
KnownDbVersion::Cleared,
Changeset::Empty {
versions: change.versions(),
},
)
} else {
let tx = conn.transaction()?;

tx.commit()?;
let (known, changeset) = match process_single_version(&tx, change) {
Ok(res) => res,
Err(e) => {
error!(%actor_id, ?versions, "could not process single change: {e}");
continue;
}
};

tx.commit()?;
(known, changeset)
};

seen.insert(versions.clone(), known.clone());

Expand Down Expand Up @@ -1976,23 +2024,15 @@ fn process_complete_version(
tx: &Transaction,
actor_id: ActorId,
versions: RangeInclusive<i64>,
parts: Option<ChangesetParts>,
parts: ChangesetParts,
) -> rusqlite::Result<(KnownDbVersion, Changeset)> {
let ChangesetParts {
version,
changes,
seqs,
last_seq,
ts,
} = match parts {
None => {
store_empty_changeset(tx, actor_id, versions.clone())?;
info!(%actor_id, ?versions, "cleared empty versions range");
// booked_write.insert_many(versions.clone(), KnownDbVersion::Cleared);
return Ok((KnownDbVersion::Cleared, Changeset::Empty { versions }));
}
Some(parts) => parts,
};
} = parts;

info!(%actor_id, version, "complete change, applying right away! seqs: {seqs:?}, last_seq: {last_seq}");

Expand Down Expand Up @@ -2099,7 +2139,14 @@ fn process_single_version(
let versions = changeset.versions();

let (known, changeset) = if changeset.is_complete() {
process_complete_version(tx, actor_id, versions, changeset.into_parts())?
process_complete_version(
tx,
actor_id,
versions,
changeset
.into_parts()
.expect("no changeset parts, this shouldn't be happening!"),
)?
} else {
let parts = changeset.into_parts().unwrap();
let known = process_incomplete_version(tx, actor_id, &parts)?;
Expand Down Expand Up @@ -2285,10 +2332,13 @@ async fn handle_sync(agent: &Agent, transport: &Transport) -> Result<(), SyncCli
Ok(())
}

const CHECK_EMPTIES_TO_INSERT_AFTER: Duration = Duration::from_secs(120);

async fn sync_loop(
agent: Agent,
transport: Transport,
mut rx_apply: Receiver<(ActorId, i64)>,
mut rx_empty: Receiver<(ActorId, RangeInclusive<i64>)>,
mut tripwire: Tripwire,
) {
let mut sync_backoff = backoff::Backoff::new(0)
Expand All @@ -2297,6 +2347,62 @@ async fn sync_loop(
let next_sync_at = tokio::time::sleep(sync_backoff.next().unwrap());
tokio::pin!(next_sync_at);

spawn_counted({
let mut tripwire = tripwire.clone();
let agent = agent.clone();
async move {
let mut inserted_empties = 0;
let mut empties: BTreeMap<ActorId, Vec<RangeInclusive<i64>>> = BTreeMap::new();

let next_empties_check = tokio::time::sleep(CHECK_EMPTIES_TO_INSERT_AFTER);
tokio::pin!(next_empties_check);

loop {
tokio::select! {
maybe_empty = rx_empty.recv() => match maybe_empty {
Some((actor_id, versions)) => {
empties.entry(actor_id).or_default().push(versions);
inserted_empties += 1;

if inserted_empties < 1000 {
continue;
}
},
None => {
debug!("empties queue is done");
break;
}
},
_ = &mut next_empties_check => {
next_empties_check.as_mut().reset(tokio::time::Instant::now() + CHECK_EMPTIES_TO_INSERT_AFTER);
if empties.is_empty() {
continue;
}
},
_ = &mut tripwire => break
}

inserted_empties = 0;

if let Err(e) = process_completed_empties(&agent, &mut empties).await {
error!("could not process empties: {e}");
}
}
info!("Draining empty versions to process...");
// drain empties channel
while let Ok((actor_id, versions)) = rx_empty.try_recv() {
empties.entry(actor_id).or_default().push(versions);
}

if !empties.is_empty() {
info!("inserting last unprocessed empties before shut down");
if let Err(e) = process_completed_empties(&agent, &mut empties).await {
error!("could not process empties: {e}");
}
}
}
});

loop {
enum Branch {
Tick,
Expand Down Expand Up @@ -2362,6 +2468,33 @@ async fn sync_loop(
}
}

/// Flushes buffered "empty" (cleared) version ranges into the
/// `__corro_bookkeeping` table in a single write transaction.
///
/// `empties` maps an actor to the version ranges that were observed as
/// cleared; the map is drained (`pop_first`) as it is processed, so on a
/// successful return it is left empty. Called both periodically and on
/// shutdown drain by the task spawned in `sync_loop`.
///
/// # Errors
/// Propagates pool, transaction, and `store_empty_changeset` failures via
/// `eyre::Result`.
async fn process_completed_empties(
    agent: &Agent,
    empties: &mut BTreeMap<ActorId, Vec<RangeInclusive<i64>>>,
) -> eyre::Result<()> {
    // Acquire a writable connection up front; the actual SQL work below is
    // synchronous, so it is wrapped in `block_in_place` to avoid stalling
    // the async executor thread.
    let mut conn = agent.pool().write_normal().await?;

    block_in_place(|| {
        // One transaction for all actors/ranges — batching is the point of
        // buffering empties instead of writing them one at a time.
        let tx = conn.transaction()?;
        while let Some((actor_id, empties)) = empties.pop_first() {
            let booked = agent.bookie().for_actor_blocking(actor_id);
            // NOTE(review): a blocking write lock on the per-actor
            // bookkeeping is held for the duration of this actor's inserts,
            // presumably to keep in-memory state and the DB consistent —
            // confirm against the lock's intended contract.
            let bookedw = booked.blocking_write();

            // Re-resolve each buffered range against current bookkeeping:
            // only ranges whose start is still tracked are stored, and
            // `dedup()` collapses consecutive duplicates (ranges that
            // resolved to the same bookkeeping entry).
            for (range, _) in empties
                .iter()
                .filter_map(|range| bookedw.get_key_value(range.start()))
                .dedup()
            {
                store_empty_changeset(&tx, actor_id, range.clone())?;
            }
        }

        tx.commit()?;

        Ok(())
    })
}

pub fn migrate(conn: &mut Connection) -> rusqlite::Result<()> {
let migrations: Vec<Box<dyn Migration>> = vec![
Box::new(init_migration as fn(&Transaction) -> rusqlite::Result<()>),
Expand Down Expand Up @@ -2916,10 +3049,10 @@ pub mod tests {

conn.execute_batch(
"
CREATE TABLE foo (a INTEGER PRIMARY KEY, b INTEGER);
CREATE TABLE foo (a INTEGER NOT NULL PRIMARY KEY, b INTEGER);
SELECT crsql_as_crr('foo');
CREATE TABLE foo2 (a INTEGER PRIMARY KEY, b INTEGER);
CREATE TABLE foo2 (a INTEGER NOT NULL PRIMARY KEY, b INTEGER);
SELECT crsql_as_crr('foo2');
",
)?;
Expand Down Expand Up @@ -2950,7 +3083,7 @@ pub mod tests {
}

{
let mut prepped = conn.prepare("SELECT DISTINCT(__crsql_db_version) AS db_version FROM foo2__crsql_clock UNION SELECT DISTINCT(__crsql_db_version) AS db_version FROM foo__crsql_clock;")?;
let mut prepped = conn.prepare("SELECT DISTINCT(db_version) FROM foo2__crsql_clock UNION SELECT DISTINCT(db_version) FROM foo__crsql_clock;")?;
let mut rows = prepped.query([])?;

while let Ok(Some(row)) = rows.next() {
Expand Down
Loading

0 comments on commit 4b723ea

Please sign in to comment.