The supa_queue
extension is a simple but robust message queue system designed to work seamlessly with Postgres. It allows you to manage and process asynchronous jobs efficiently within your Postgres database.
You can read the blog post explaining this system here: Building a Queue System with Supabase and PostgreSQL
- Simple message queue system.
- Support for various HTTP methods (GET, POST, DELETE).
- Retry mechanism for failed jobs.
- Concurrent processing using multiple workers.
- Integration with external services via HTTP requests.
- Flexible scheduling with
pg_cron
for optimal job management.
To install the supa_queue
extension, follow these steps:
-
Make sure you have Postgres installed and running.
-
Install the required extensions
pg_cron
andpg_net
if not already installed. -
Install DB.DEV:
create extension if not exists http with schema extensions;
create extension if not exists pg_tle;
select pgtle.uninstall_extension_if_exists('supabase-dbdev');
drop extension if exists "supabase-dbdev";
select
pgtle.install_extension(
'supabase-dbdev',
resp.contents ->> 'version',
'PostgreSQL package manager',
resp.contents ->> 'sql'
)
from http(
(
'GET',
'https://api.database.dev/rest/v1/'
|| 'package_versions?select=sql,version'
|| '&package_name=eq.supabase-dbdev'
|| '&order=version.desc'
|| '&limit=1',
array[
('apiKey', 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6InhtdXB0cHBsZnZpaWZyYndtbXR2Iiwicm9sZSI6ImFub24iLCJpYXQiOjE2ODAxMDczNzIsImV4cCI6MTk5NTY4MzM3Mn0.z2CN0mvO2No8wSi46Gw59DFGCTJrzM0AQKsu_5k134s')::http_header
],
null,
null
)
) x,
lateral (
select
((row_to_json(x) -> 'content') #>> '{}')::json -> 0
) resp(contents);
create extension "supabase-dbdev";
select dbdev.install('supabase-dbdev');
drop extension if exists "supabase-dbdev";
create extension "supabase-dbdev";
- Install supa_queue with the following code:
select dbdev.install('mansueli-supa_queue');
create extension "mansueli-supa_queue"
version '1.0.4';
You can also install it in a different schema
with:
create schema supa_queue;
select dbdev.install('mansueli-supa_queue');
create extension "mansueli-supa_queue"
schema supa_queue
version '1.0.4';
To add a job to the queue, insert a new record into the job_queue
table. Specify the HTTP verb (GET, POST, DELETE), payload, and other relevant information. The job will be processed asynchronously.
INSERT INTO job_queue (http_verb, payload, url_path) VALUES ('GET', '{"key": "value"}', '/api/resource');
Jobs are processed automatically using the provided functions. The process_job()
trigger function processes newly inserted jobs. The process_current_jobs_if_unlocked()
function assigns jobs to available workers for execution.
Failed jobs are automatically retried with the retry_failed_jobs()
function, increasing job reliability. Jobs with a status of 'failed' and within the retry limit will be retried.
You can configure various aspects of the supa_queue
extension by modifying the provided SQL functions and cron schedules to suit your specific use case.
Note that you'll need to set these values in Vault:
service_role
key you can get this in the dashboard.consumer_function
this is the URL of the Edge Function that will consume the tasks.
The job_queue
table comes with several values by default, which you can tweak for more advanced cases or use the standard values.
Here's the definition of each column:
Usable params to be used when adding tasks:
http_verb
: HTTP verb to be used when consuming the functionpayload
: the payload of the task in the queuestatus
: the status of the request (⚠️ you should leave this empty⚠️ )retry_limit
: the amount of times a job can be retriedurl_path
: additional paths to be passed to the consuming edge function for routing cases
Internal parameters (should not be changed by you):
job_id
: the id for the tableretry_count
: another internal parametercontent
: the content received from the edge function (response)created_at
: timestamp of when the task entered the queue
CREATE TABLE job_queue (
job_id BIGINT GENERATED BY DEFAULT AS IDENTITY,
http_verb TEXT NOT NULL CHECK (http_verb IN ('GET', 'POST', 'DELETE')),
payload jsonb,
status TEXT NOT NULL DEFAULT '',
retry_count INTEGER DEFAULT 0,
retry_limit INTEGER DEFAULT 10,
url_path TEXT DEFAULT '',
content TEXT DEFAULT '',
created_at TIMESTAMPTZ DEFAULT NOW()
);
This extension is provided under the license included in the repository.
If you'd like to contribute to this project or report issues, please visit the GitHub repository for more information.