feat(postgres): Change the postgres adapter to be partition aware#534
Open
This adds a Postgres storage adapter for the taskbroker, along with a configuration option for choosing between the adapters. This adapter will also work with AlloyDB. In Postgres, the keyword `offset` is reserved, so that column is called `kafka_offset` in the PG tables and converted to `offset`. The tests were updated to run with both the SQLite and Postgres adapters using the rstest crate. The `create_test_store` function was updated to be the standard for all tests, and to allow choosing between a SQLite and a Postgres DB. A `remove_db` function was added to the trait and the existing adapters, since the tests create a unique PG database on every run that should be cleaned up.
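For readers skimming the description, the `kafka_offset` to `offset` conversion could look roughly like the sketch below. The struct names and the `id` field are hypothetical and not taken from the PR; only the two column names come from the description.

```rust
/// Hypothetical row shape as read from the PG table, where the column
/// is named `kafka_offset` because `offset` is reserved in Postgres.
#[derive(Debug, Clone, PartialEq)]
pub struct PgActivationRow {
    pub id: String,
    pub kafka_offset: i64,
}

/// Hypothetical in-memory shape, which keeps the name `offset`.
#[derive(Debug, Clone, PartialEq)]
pub struct Activation {
    pub id: String,
    pub offset: i64,
}

impl From<PgActivationRow> for Activation {
    fn from(row: PgActivationRow) -> Self {
        Activation {
            id: row.id,
            // The rename happens only at the storage boundary.
            offset: row.kafka_offset,
        }
    }
}
```

Keeping the rename in a single `From` impl means the rest of the code never needs to know about the Postgres-specific column name.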
Have the Postgres adapter only fetch and do upkeep on activations that belong to the partitions assigned to the consumer. The broker can still update tasks outside its partitions, in case a worker is connected to a broker that is then rebalanced. Change the consumer to pass the partitions to the store whenever partitions are assigned. This was originally tested with PARTITION BY, but that requires manually keeping track of the partition tables, which isn't desired behaviour.
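As a rough illustration of the idea (not the code in this PR), the store can remember the assigned partitions and turn them into a filter for its fetch and upkeep queries. The type name, the method names, the `partition` column name, and the choice to return no filter when nothing is assigned are all assumptions made for this sketch; only the `RwLock<Vec<i32>>` field mirrors the diff.

```rust
use std::sync::RwLock;

/// Hypothetical stand-in for the Postgres store; only the
/// `partitions` field mirrors the struct shown in the diff.
pub struct PartitionAwareStore {
    partitions: RwLock<Vec<i32>>,
}

impl PartitionAwareStore {
    pub fn new() -> Self {
        Self { partitions: RwLock::new(Vec::new()) }
    }

    /// Called by the consumer whenever partitions are assigned.
    pub fn assign_partitions(&self, partitions: Vec<i32>) {
        *self.partitions.write().unwrap() = partitions;
    }

    /// Builds a WHERE fragment for fetch and upkeep queries.
    /// Returns None when no partitions are assigned (an assumption
    /// of this sketch; the real adapter may behave differently).
    pub fn partition_filter(&self) -> Option<String> {
        let guard = self.partitions.read().unwrap();
        if guard.is_empty() {
            return None;
        }
        // The values are i32, so formatting them into SQL is safe.
        let list = guard
            .iter()
            .map(|p| p.to_string())
            .collect::<Vec<_>>()
            .join(", ");
        Some(format!("partition IN ({list})"))
    }
}
```

Updates by task ID would skip this filter, which matches the note that a broker can still update tasks outside its partitions after a rebalance.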
8689312 to
05125fe
Compare
markstory
reviewed
Jan 19, 2026
state = match (state, event) {
    (ConsumerState::Ready, Event::Assign(tpl)) => {
        metrics::gauge!("arroyo.consumer.current_partitions").set(tpl.len() as f64);
        // Note: This assumes we only process one topic per consumer.
Member
So far that has always been true.
read_pool: PgPool,
write_pool: PgPool,
config: PostgresActivationStoreConfig,
partitions: RwLock<Vec<i32>>,
Member
Should you use a tokio RwLock to avoid locking up one of the tokio threads?
Member
Author
No, because updating the partitions isn't an asynchronous operation, so I want this to block.
Member
Ok, we'll need to be careful about .await calls inside the lock guards as we could deadlock more easily than with an async lock.
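One way to stay clear of that hazard is to copy the partitions out of the lock and drop the guard before reaching any `.await`. This is a minimal sketch, not the PR's code: `SnapshotStore`, `partitions_snapshot`, `count_assigned`, and `run_query` are hypothetical names, and `run_query` only stands in for a real database call.

```rust
use std::sync::RwLock;

/// Hypothetical store holding the assigned partitions behind a
/// blocking std RwLock, as in the diff.
pub struct SnapshotStore {
    partitions: RwLock<Vec<i32>>,
}

impl SnapshotStore {
    pub fn new(partitions: Vec<i32>) -> Self {
        Self { partitions: RwLock::new(partitions) }
    }

    /// Clones the partitions; the read guard is dropped on return,
    /// so no lock is held by the caller afterwards.
    pub fn partitions_snapshot(&self) -> Vec<i32> {
        let guard = self.partitions.read().unwrap();
        guard.clone()
    }

    /// Takes the snapshot first, then awaits. No guard is alive
    /// across the await point.
    pub async fn count_assigned(&self) -> usize {
        let partitions = self.partitions_snapshot();
        run_query(&partitions).await
    }
}

/// Placeholder for an async database query (assumption).
async fn run_query(partitions: &[i32]) -> usize {
    partitions.len()
}
```

Cloning a small `Vec<i32>` is cheap, and because the guard never lives across an await, a writer in `assign_partitions`-style code cannot end up blocked on a suspended task.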
markstory
approved these changes
Feb 3, 2026
Member
markstory
left a comment
Query changes look good to me.