Simple, reliable, and scalable pub/sub system based on FastAPI and PostgreSQL
fastpubsub is a lightweight publish-subscribe messaging system built with FastAPI and PostgreSQL. It provides a simple HTTP API for message publishing and subscription management with powerful features like message filtering, delivery guarantees, dead-letter queues, and automatic retries with exponential backoff. The system is built with asyncio for efficient concurrent operations and uses SQLAlchemy's async engine with psycopg's native async support.
fastpubsub is not intended to replace dedicated high-throughput messaging systems like Google Cloud Pub/Sub, NATS, or Apache Kafka. Instead, it brings key pub/sub features to simple architectures where you already have PostgreSQL available. If you're running a small to medium-sized application with PostgreSQL as your primary database, fastpubsub lets you add reliable messaging capabilities without introducing additional infrastructure complexity.
Use fastpubsub when you:
- Already use PostgreSQL and want to avoid managing separate message brokers
- Want to keep your stack simple with fewer moving parts
- Need pub/sub functionality for small to medium workloads
- Prefer simplicity over maximum throughput
- Want a single database for both application data and messaging
- Need reliable message delivery with retries and dead-letter queues
Consider dedicated message brokers when you:
- Need to handle millions of messages per second
- Require horizontal scalability across multiple datacenters
- Need advanced features like message streaming or complex routing
- Want to decouple your messaging infrastructure from your database
- 🎯 Topic-based messaging - Organize messages by topics
- 🔒 Secure - Built-in authentication with JWT and scope-based permissions
- 🔍 Message filtering - Subscribe to specific messages using JSON-based filters
- 🔄 Automatic retries - Configurable retry logic with exponential backoff
- 💀 Dead Letter Queue (DLQ) - Handle failed messages gracefully
- 📊 Metrics & Monitoring - Built-in subscription metrics and Prometheus support
- 🐳 Docker-ready - Easy deployment with Docker
- 🛡️ Reliable delivery - Acknowledgment and negative-acknowledgment support
- 🧹 Automatic cleanup - Background jobs for message maintenance
fastpubsub uses PostgreSQL as its backend, leveraging stored procedures and JSONB capabilities for high-performance message routing and filtering. The architecture consists of:
- API Server: Asynchronous RESTful HTTP API for all operations
- Database: PostgreSQL with custom functions for message management, accessed via async SQLAlchemy
- Cleanup Workers: Background jobs for message maintenance
- Publish: Messages are published to a topic
- Route: Messages are routed to all subscriptions for that topic
- Filter: Subscriptions with filters only receive matching messages
- Consume: Consumers fetch messages in batches
- Process: Consumer processes the message
- ACK/NACK: Consumer acknowledges success or requests retry
- Retry: Failed messages are retried with exponential backoff
- DLQ: Messages exceeding max attempts move to the dead letter queue
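A minimal sketch of this lifecycle through the HTTP API, assuming a server on http://localhost:8000 with authentication disabled and a topic/subscription pair already created (the endpoints used here are documented in the API reference below):

```python
# Minimal sketch of the publish/consume/ack lifecycle over the HTTP API.
# Assumes a fastpubsub server on localhost:8000 with authentication disabled
# and an existing topic "user-events" with subscription "user-processor".
import httpx

BASE_URL = "http://localhost:8000"

with httpx.Client(base_url=BASE_URL) as client:
    # Publish: send a batch of messages to a topic
    client.post("/topics/user-events/messages", json=[{"event": "user.created", "country": "BR"}])

    # Consume: fetch a batch of messages routed to a subscription
    response = client.get(
        "/subscriptions/user-processor/messages",
        params={"consumer_id": "worker-1", "batch_size": 10},
    )
    messages = response.json()["data"]

    # Process, then ACK successes (or NACK to trigger a retry with backoff)
    if messages:
        client.post("/subscriptions/user-processor/acks", json=[m["id"] for m in messages])
```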
All commands use the official Docker image from Docker Hub.
- Docker installed
- PostgreSQL database (can also run in Docker)
First, you need to run database migrations:
docker run --rm \
-e FASTPUBSUB_DATABASE_URL='postgresql+psycopg://YOUR_USER:YOUR_PASSWORD@YOUR_HOST:5432/YOUR_DATABASE' \
allisson/fastpubsub db-migrate

Run the API server:
docker run -p 8000:8000 \
-e FASTPUBSUB_DATABASE_URL='postgresql+psycopg://YOUR_USER:YOUR_PASSWORD@YOUR_HOST:5432/YOUR_DATABASE' \
allisson/fastpubsub server

The API will be available at http://localhost:8000. You can access the interactive API documentation at http://localhost:8000/docs.
Apply database migrations to set up or upgrade the schema:
docker run --rm \
-e FASTPUBSUB_DATABASE_URL='postgresql+psycopg://YOUR_USER:YOUR_PASSWORD@YOUR_HOST:5432/YOUR_DATABASE' \
allisson/fastpubsub db-migrate

This command creates all necessary tables, indexes, and stored procedures.
Start the HTTP API server:
docker run -p 8000:8000 \
-e FASTPUBSUB_DATABASE_URL='postgresql+psycopg://YOUR_USER:YOUR_PASSWORD@YOUR_HOST:5432/YOUR_DATABASE' \
allisson/fastpubsub server

The server runs with Gunicorn and Uvicorn workers for production-grade performance.
Remove acknowledged messages older than a specified threshold:
docker run --rm \
-e FASTPUBSUB_DATABASE_URL='postgresql+psycopg://YOUR_USER:YOUR_PASSWORD@YOUR_HOST:5432/YOUR_DATABASE' \
allisson/fastpubsub cleanup_acked_messages

This removes acked messages to prevent database bloat. By default, messages older than 1 hour (3600 seconds) are deleted.
Release messages that are stuck in "delivered" state (locked but not acked/nacked):
docker run --rm \
-e FASTPUBSUB_DATABASE_URL='postgresql+psycopg://YOUR_USER:YOUR_PASSWORD@YOUR_HOST:5432/YOUR_DATABASE' \
allisson/fastpubsub cleanup_stuck_messages

This handles cases where a consumer crashed without acknowledging messages. By default, messages locked for more than 60 seconds are released.
It's recommended to run cleanup commands periodically using cron or a scheduler like Kubernetes CronJob:
# Example: Run cleanup_acked_messages every hour
0 * * * * docker run --rm -e FASTPUBSUB_DATABASE_URL='postgresql+psycopg://YOUR_USER:YOUR_PASSWORD@YOUR_HOST:5432/YOUR_DATABASE' allisson/fastpubsub cleanup_acked_messages
# Example: Run cleanup_stuck_messages every 5 minutes
*/5 * * * * docker run --rm -e FASTPUBSUB_DATABASE_URL='postgresql+psycopg://YOUR_USER:YOUR_PASSWORD@YOUR_HOST:5432/YOUR_DATABASE' allisson/fastpubsub cleanup_stuck_messages

Generate a secure random secret key for authentication:
docker run --rm allisson/fastpubsub generate_secret_key

Output:
new_secret=a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6
Use this secret key to set the FASTPUBSUB_AUTH_SECRET_KEY environment variable.
Create a new client with API credentials:
docker run --rm \
-e FASTPUBSUB_DATABASE_URL='postgresql+psycopg://YOUR_USER:YOUR_PASSWORD@YOUR_HOST:5432/YOUR_DATABASE' \
-e FASTPUBSUB_AUTH_ENABLED='true' \
-e FASTPUBSUB_AUTH_SECRET_KEY='your-secret-key' \
allisson/fastpubsub create_client "My Application" "*" true

Arguments:
- Client name (e.g., "My Application")
- Scopes (e.g., "*" for admin, or "topics:create topics:read")
- Is active flag (true or false)
Output:
client_id=550e8400-e29b-41d4-a716-446655440000
client_secret=a1b2c3d4e5f6g7h8
Save the client_id and client_secret securely - the secret cannot be retrieved later.
Configure fastpubsub using environment variables. All variables are prefixed with FASTPUBSUB_.
| Variable | Description | Default |
|---|---|---|
| `FASTPUBSUB_DATABASE_URL` | PostgreSQL connection URL (required) | - |
| `FASTPUBSUB_DATABASE_ECHO` | Enable SQLAlchemy query logging | `false` |
| `FASTPUBSUB_DATABASE_POOL_SIZE` | Connection pool size | `5` |
| `FASTPUBSUB_DATABASE_MAX_OVERFLOW` | Max overflow connections | `10` |
| `FASTPUBSUB_DATABASE_POOL_PRE_PING` | Test connections before use | `true` |
| Variable | Description | Default |
|---|---|---|
| `FASTPUBSUB_LOG_LEVEL` | Log level (debug, info, warning, error) | `info` |
| `FASTPUBSUB_LOG_FORMATTER` | Log format string | See below |
Default log format:
asctime=%(asctime)s level=%(levelname)s pathname=%(pathname)s line=%(lineno)s message=%(message)s
| Variable | Description | Default |
|---|---|---|
| `FASTPUBSUB_SUBSCRIPTION_MAX_ATTEMPTS` | Maximum delivery attempts before DLQ | `5` |
| `FASTPUBSUB_SUBSCRIPTION_BACKOFF_MIN_SECONDS` | Minimum retry delay (seconds) | `5` |
| `FASTPUBSUB_SUBSCRIPTION_BACKOFF_MAX_SECONDS` | Maximum retry delay (seconds) | `300` |
| Variable | Description | Default |
|---|---|---|
| `FASTPUBSUB_API_DEBUG` | Enable debug mode | `false` |
| `FASTPUBSUB_API_HOST` | Server bind host | `0.0.0.0` |
| `FASTPUBSUB_API_PORT` | Server port | `8000` |
| `FASTPUBSUB_API_NUM_WORKERS` | Number of Gunicorn workers | `1` |
| Variable | Description | Default |
|---|---|---|
| `FASTPUBSUB_AUTH_ENABLED` | Enable authentication | `false` |
| `FASTPUBSUB_AUTH_SECRET_KEY` | Secret key for JWT signing (required if auth is enabled) | `None` |
| `FASTPUBSUB_AUTH_ALGORITHM` | JWT signing algorithm | `HS256` |
| `FASTPUBSUB_AUTH_ACCESS_TOKEN_EXPIRE_MINUTES` | Access token expiration time in minutes | `30` |
| Variable | Description | Default |
|---|---|---|
| `FASTPUBSUB_CLEANUP_ACKED_MESSAGES_OLDER_THAN_SECONDS` | Delete acked messages older than this many seconds | `3600` |
| `FASTPUBSUB_CLEANUP_STUCK_MESSAGES_LOCK_TIMEOUT_SECONDS` | Release messages locked for longer than this many seconds | `60` |
docker run -p 8000:8000 \
-e FASTPUBSUB_DATABASE_URL='postgresql+psycopg://YOUR_USER:YOUR_PASSWORD@YOUR_HOST:5432/YOUR_DATABASE' \
-e FASTPUBSUB_LOG_LEVEL='info' \
-e FASTPUBSUB_API_NUM_WORKERS='4' \
-e FASTPUBSUB_SUBSCRIPTION_MAX_ATTEMPTS='10' \
allisson/fastpubsub server

fastpubsub supports optional JWT-based authentication to secure API access. When authentication is disabled (default), all API endpoints are accessible without credentials. When enabled, clients must authenticate using the OAuth2 client credentials flow.
Authentication uses a scope-based permission system. Scopes can be global or object-specific:
Global Scopes:
- `*` - Admin mode, full access to all resources and operations
- `topics:create` - Can create new topics
- `topics:read` - Can list or get topics
- `topics:delete` - Can delete topics
- `topics:publish` - Can publish messages to topics
- `subscriptions:create` - Can create new subscriptions
- `subscriptions:read` - Can list or get subscriptions
- `subscriptions:delete` - Can delete subscriptions
- `subscriptions:consume` - Can consume messages from subscriptions
- `clients:create` - Can create new clients
- `clients:update` - Can update clients
- `clients:read` - Can list or get clients
- `clients:delete` - Can delete clients
Object-Specific Scopes:
You can restrict access to specific resources by appending the resource ID to the scope:
- `topics:publish:my-topic-id` - Can only publish to the topic with ID "my-topic-id"
- `subscriptions:consume:my-subscription` - Can only consume from the subscription with ID "my-subscription"
Multiple scopes can be combined, separated by spaces: topics:create topics:read subscriptions:read
Request:
POST /oauth/token
Content-Type: application/json
{
"client_id": "550e8400-e29b-41d4-a716-446655440000",
"client_secret": "a1b2c3d4e5f6g7h8"
}

Response: 201 Created
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"token_type": "Bearer",
"expires_in": 1800,
"scope": "topics:create topics:read"
}

Include the access token in the Authorization header for authenticated requests:
curl -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..." \
http://localhost:8000/topics

Clients represent applications or services that access the API. Each client has credentials (client_id and client_secret) and a set of scopes that define their permissions.
POST /clients
Authorization: Bearer <token>

Request Body:
{
"name": "My Application",
"scopes": "topics:create topics:read subscriptions:consume",
"is_active": true
}

Response: 201 Created
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"secret": "a1b2c3d4e5f6g7h8"
}

Note: The client secret is only returned once during creation. Store it securely.
GET /clients/{id}
Authorization: Bearer <token>

Response: 200 OK
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"name": "My Application",
"scopes": "topics:create topics:read",
"is_active": true,
"token_version": 1,
"created_at": "2025-12-29T15:30:00Z",
"updated_at": "2025-12-29T15:30:00Z"
}

GET /clients?offset=0&limit=10
Authorization: Bearer <token>

Response: 200 OK
{
"data": [
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"name": "My Application",
"scopes": "topics:create topics:read",
"is_active": true,
"token_version": 1,
"created_at": "2025-12-29T15:30:00Z",
"updated_at": "2025-12-29T15:30:00Z"
}
]
}

PUT /clients/{id}
Authorization: Bearer <token>

Request Body:
{
"name": "Updated Application Name",
"scopes": "topics:read subscriptions:read",
"is_active": true
}

Response: 200 OK
Note: Updating a client increments its token_version, which invalidates all existing access tokens for that client.
DELETE /clients/{id}
Authorization: Bearer <token>

Response: 204 No Content
Topics are channels where messages are published.
POST /topics

Request Body:
{
"id": "user-events"
}

Response: 201 Created
{
"id": "user-events",
"created_at": "2025-12-29T15:30:00Z"
}

GET /topics/{id}

Response: 200 OK
GET /topics?offset=0&limit=10

Response: 200 OK
{
"data": [
{
"id": "user-events",
"created_at": "2025-12-29T15:30:00Z"
}
]
}

DELETE /topics/{id}

Response: 204 No Content
POST /topics/{id}/messages

Request Body:
[
{
"event": "user.created",
"user_id": "123",
"country": "BR"
},
{
"event": "user.updated",
"user_id": "456",
"country": "US"
}
]

Response: 204 No Content
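For reference, the same publish call from Python using httpx (a sketch; it assumes the topic "user-events" already exists and authentication is disabled):

```python
# Sketch: publishing a batch of messages from Python.
# Assumes the topic "user-events" exists and authentication is disabled.
import httpx

messages = [
    {"event": "user.created", "user_id": "123", "country": "BR"},
    {"event": "user.updated", "user_id": "456", "country": "US"},
]

response = httpx.post("http://localhost:8000/topics/user-events/messages", json=messages)
response.raise_for_status()  # a successful publish returns 204 No Content
```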
Subscriptions receive messages from topics, optionally filtered.
POST /subscriptions

Request Body:
{
"id": "user-processor",
"topic_id": "user-events",
"filter": {"country": ["BR", "US"]},
"max_delivery_attempts": 5,
"backoff_min_seconds": 5,
"backoff_max_seconds": 300
}

Response: 201 Created
Filter Examples:
{"country": ["BR", "US"]}- Messages where country is BR or US{"event": ["user.created"]}- Only user.created events{"premium": [true]}- Only premium users{}ornull- No filtering, receive all messages
GET /subscriptions/{id}

Response: 200 OK
GET /subscriptions?offset=0&limit=10

Response: 200 OK
DELETE /subscriptions/{id}

Response: 204 No Content
Retrieve messages from a subscription:
GET /subscriptions/{id}/messages?consumer_id=worker-1&batch_size=10

Response: 200 OK
{
"data": [
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"subscription_id": "user-processor",
"payload": {
"event": "user.created",
"user_id": "123",
"country": "BR"
},
"delivery_attempts": 1,
"created_at": "2025-12-29T15:30:00Z"
}
]
}

Parameters:
- `consumer_id`: Unique identifier for the consumer (required)
- `batch_size`: Number of messages to retrieve (default: 10, max: 100)
Mark messages as successfully processed:
POST /subscriptions/{id}/acks

Request Body:
[
"550e8400-e29b-41d4-a716-446655440000",
"660e8400-e29b-41d4-a716-446655440001"
]

Response: 204 No Content
Mark messages for retry:
POST /subscriptions/{id}/nacks

Request Body:
[
"550e8400-e29b-41d4-a716-446655440000"
]

Response: 204 No Content
NACKed messages will be retried with exponential backoff until max_delivery_attempts is reached.
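The retry delay is computed server-side from the subscription's backoff settings. A capped exponential schedule built from backoff_min_seconds and backoff_max_seconds might look like the sketch below (illustrative only; fastpubsub's exact formula may differ):

```python
# Illustrative sketch of a capped exponential backoff schedule based on the
# subscription settings backoff_min_seconds / backoff_max_seconds.
# The real delay calculation happens server-side and may differ.
def retry_delay(attempt: int, backoff_min: int = 5, backoff_max: int = 300) -> int:
    """Delay (seconds) before redelivering a message after its Nth failed attempt."""
    return min(backoff_max, backoff_min * 2 ** (attempt - 1))

# With the defaults (min=5, max=300): 5, 10, 20, 40, 80 seconds for attempts 1-5
print([retry_delay(n) for n in range(1, 6)])
```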
Messages that exceed max_delivery_attempts are moved to the DLQ.
GET /subscriptions/{id}/dlq?offset=0&limit=10

Response: 200 OK
Move messages back to the subscription queue for reprocessing:
POST /subscriptions/{id}/dlq/reprocess

Request Body:
[
"550e8400-e29b-41d4-a716-446655440000"
]

Response: 204 No Content
GET /subscriptions/{id}/metrics

Response: 200 OK
{
"subscription_id": "user-processor",
"available": 150,
"delivered": 25,
"acked": 1000,
"dlq": 5
}

Metrics:
- `available`: Messages ready to be consumed
- `delivered`: Messages currently locked by consumers
- `acked`: Total acknowledged messages
- `dlq`: Messages in the dead letter queue
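These counters are convenient for lightweight queue monitoring. For example, a small polling script (a sketch assuming authentication is disabled and an "user-processor" subscription) could warn when messages pile up or land in the DLQ:

```python
# Sketch: poll subscription metrics and warn on DLQ growth or backlog.
# Assumes a server on localhost:8000 with authentication disabled.
import httpx

metrics = httpx.get("http://localhost:8000/subscriptions/user-processor/metrics").json()

if metrics["dlq"] > 0:
    print(f"warning: {metrics['dlq']} messages in the dead letter queue")
if metrics["available"] > 1000:
    print(f"warning: {metrics['available']} messages waiting to be consumed")
```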
Health check endpoints are useful for monitoring and orchestration systems like Kubernetes.
Check if the application is alive:
GET /liveness

Response: 200 OK
{
"status": "alive"
}

The liveness endpoint always returns a successful response if the application is running. Use this endpoint to determine if the application needs to be restarted.
Check if the application is ready to handle requests:
GET /readiness

Response: 200 OK
{
"status": "ready"
}

Error Response: 503 Service Unavailable
{
"detail": "database is down"
}

The readiness endpoint checks if the database connection is healthy. Use this endpoint to determine if the application should receive traffic.
Get Prometheus-compatible metrics for monitoring:
GET /metrics

Response: 200 OK (Prometheus text format)
# HELP http_requests_total Total number of requests by method, path and status
# TYPE http_requests_total counter
http_requests_total{method="GET",path="/topics",status="200"} 42.0
...
The metrics endpoint exposes application metrics in Prometheus format, including:
- HTTP request counts and latencies
- Request duration histograms
- Active requests gauge
- And other standard FastAPI metrics
You can configure Prometheus to scrape this endpoint for monitoring and alerting.
# 1. Generate a secret key
docker run --rm allisson/fastpubsub generate_secret_key
# Output: new_secret=a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6
# 2. Start the server with authentication enabled
docker run -p 8000:8000 \
-e FASTPUBSUB_DATABASE_URL='postgresql+psycopg://YOUR_USER:YOUR_PASSWORD@YOUR_HOST:5432/YOUR_DATABASE' \
-e FASTPUBSUB_AUTH_ENABLED='true' \
-e FASTPUBSUB_AUTH_SECRET_KEY='a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6' \
allisson/fastpubsub server
# 3. Create an admin client (requires initial client creation via CLI or direct DB access)
docker run --rm \
-e FASTPUBSUB_DATABASE_URL='postgresql+psycopg://YOUR_USER:YOUR_PASSWORD@YOUR_HOST:5432/YOUR_DATABASE' \
-e FASTPUBSUB_AUTH_ENABLED='true' \
-e FASTPUBSUB_AUTH_SECRET_KEY='a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6' \
allisson/fastpubsub create_client "Admin Client" "*" true
# Output:
# client_id=550e8400-e29b-41d4-a716-446655440000
# client_secret=a1b2c3d4e5f6g7h8
# 4. Get an access token
curl -X POST http://localhost:8000/oauth/token \
-H "Content-Type: application/json" \
-d '{
"client_id": "550e8400-e29b-41d4-a716-446655440000",
"client_secret": "a1b2c3d4e5f6g7h8"
}'
# Output: {"access_token": "eyJhbGc...", "token_type": "Bearer", "expires_in": 1800, "scope": "*"}
# 5. Use the token to access protected endpoints
TOKEN="eyJhbGc..."
curl -H "Authorization: Bearer $TOKEN" http://localhost:8000/topics# 1. Create a topic
curl -X POST http://localhost:8000/topics \
-H "Content-Type: application/json" \
-d '{"id": "notifications"}'
# 2. Create a subscription
curl -X POST http://localhost:8000/subscriptions \
-H "Content-Type: application/json" \
-d '{
"id": "email-sender",
"topic_id": "notifications"
}'
# 3. Publish messages
curl -X POST http://localhost:8000/topics/notifications/messages \
-H "Content-Type: application/json" \
-d '[
{"type": "email", "to": "[email protected]", "subject": "Welcome!"}
]'
# 4. Consume messages
curl "http://localhost:8000/subscriptions/email-sender/messages?consumer_id=worker-1&batch_size=10"
# 5. Acknowledge messages
curl -X POST http://localhost:8000/subscriptions/email-sender/acks \
-H "Content-Type: application/json" \
-d '["550e8400-e29b-41d4-a716-446655440000"]'# Assuming you have an admin token
ADMIN_TOKEN="eyJhbGc..."
# 1. Create a client that can only publish to a specific topic
curl -X POST http://localhost:8000/clients \
-H "Authorization: Bearer $ADMIN_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"name": "Publisher Service",
"scopes": "topics:publish:notifications",
"is_active": true
}'
# 2. Create a client that can only consume from a specific subscription
curl -X POST http://localhost:8000/clients \
-H "Authorization: Bearer $ADMIN_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"name": "Consumer Service",
"scopes": "subscriptions:consume:email-sender",
"is_active": true
}'
# 3. Create a client with multiple permissions
curl -X POST http://localhost:8000/clients \
-H "Authorization: Bearer $ADMIN_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"name": "Multi-Purpose Service",
"scopes": "topics:create topics:read topics:publish subscriptions:create subscriptions:read",
"is_active": true
}'

# Create a subscription that only receives messages from BR and US
curl -X POST http://localhost:8000/subscriptions \
-H "Content-Type: application/json" \
-d '{
"id": "regional-processor",
"topic_id": "user-events",
"filter": {"country": ["BR", "US"]}
}'
# Publish messages - only BR/US messages will be routed to this subscription
curl -X POST http://localhost:8000/topics/user-events/messages \
-H "Content-Type: application/json" \
-d '[
{"event": "user.login", "user_id": "1", "country": "BR"},
{"event": "user.login", "user_id": "2", "country": "JP"},
{"event": "user.login", "user_id": "3", "country": "US"}
]'

The subscription will only receive messages with country set to "BR" or "US" (messages for user 1 and 3, not user 2).
The filter feature uses a simple JSON style where keys are field names and values are arrays of acceptable values:
{
"filter": {
"event_type": ["order.created", "order.updated"],
"priority": ["high", "critical"],
"region": ["us-east", "us-west"]
}
}

This filter matches messages that have:
- `event_type` equal to "order.created" OR "order.updated", AND
- `priority` equal to "high" OR "critical", AND
- `region` equal to "us-east" OR "us-west"
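In other words, filter keys are ANDed together and the values inside each array are ORed. A small Python sketch of this matching logic, for intuition only (the actual matching happens inside PostgreSQL):

```python
# Sketch of the filter semantics: every filter key must be present in the
# message with one of the allowed values (OR within a key, AND across keys).
# The real matching is done in the database, not in Python.
def matches(filter_: dict | None, payload: dict) -> bool:
    if not filter_:  # {} or None means no filtering
        return True
    return all(payload.get(key) in allowed for key, allowed in filter_.items())

assert matches({"country": ["BR", "US"]}, {"event": "user.login", "country": "BR"})
assert not matches({"country": ["BR", "US"]}, {"event": "user.login", "country": "JP"})
```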
# Check metrics to see if there are DLQ messages
curl "http://localhost:8000/subscriptions/email-sender/metrics"
# List DLQ messages
curl "http://localhost:8000/subscriptions/email-sender/dlq"
# Reprocess DLQ messages after fixing the issue
curl -X POST http://localhost:8000/subscriptions/email-sender/dlq/reprocess \
-H "Content-Type: application/json" \
-d '["550e8400-e29b-41d4-a716-446655440000"]'# Check if the application is alive (for restart decisions)
curl "http://localhost:8000/liveness"
# Check if the application is ready to serve traffic
curl "http://localhost:8000/readiness"Kubernetes example configuration:
livenessProbe:
  httpGet:
    path: /liveness
    port: 8000
  initialDelaySeconds: 30
  periodSeconds: 10
readinessProbe:
  httpGet:
    path: /readiness
    port: 8000
  initialDelaySeconds: 10
  periodSeconds: 5

# Access Prometheus metrics
curl "http://localhost:8000/metrics"Prometheus scrape configuration:
scrape_configs:
  - job_name: 'fastpubsub'
    static_configs:
      - targets: ['localhost:8000']
    metrics_path: '/metrics'
    scrape_interval: 15s

Running multiple consumers in parallel for the same subscription helps process messages faster:
# Terminal 1: Start consumer worker 1
while true; do
  MESSAGES=$(curl -s "http://localhost:8000/subscriptions/email-sender/messages?consumer_id=worker-1&batch_size=10")
  echo "$MESSAGES" | jq -r '.data[].id' | while read -r msg_id; do
    # Process message here
    echo "Worker 1 processing: $msg_id"
    # Acknowledge after processing
    curl -X POST http://localhost:8000/subscriptions/email-sender/acks \
      -H "Content-Type: application/json" \
      -d "[\"$msg_id\"]"
  done
  sleep 1
done

# Terminal 2: Start consumer worker 2
while true; do
  MESSAGES=$(curl -s "http://localhost:8000/subscriptions/email-sender/messages?consumer_id=worker-2&batch_size=10")
  echo "$MESSAGES" | jq -r '.data[].id' | while read -r msg_id; do
    echo "Worker 2 processing: $msg_id"
    curl -X POST http://localhost:8000/subscriptions/email-sender/acks \
      -H "Content-Type: application/json" \
      -d "[\"$msg_id\"]"
  done
  sleep 1
done

Each consumer uses a unique consumer_id to identify itself. Messages are locked to prevent duplicate processing across consumers.
Proper error handling ensures reliable message processing:
import asyncio
import httpx
import logging
from typing import Dict, Any

logger = logging.getLogger(__name__)


# Define custom exceptions (implement based on your business logic)
class RetriableError(Exception):
    """Temporary error that should be retried"""

    pass


class PermanentError(Exception):
    """Permanent error that should not be retried"""

    pass


async def process_single_message(payload: Dict[str, Any]):
    """
    Process a single message - implement your business logic here.

    Raise RetriableError for temporary failures (network issues, service unavailable).
    Raise PermanentError for permanent failures (invalid data, business rule violation).
    """
    # Example implementation
    if not payload.get("email"):
        raise PermanentError("Missing required field: email")

    try:
        # Your actual processing logic here
        # For example: send email, update database, call external API, etc.
        logger.info(f"Processing message: {payload}")
    except ConnectionError:
        # Temporary network issue - retry later
        raise RetriableError("Network connection failed")


async def process_messages(subscription_id: str, consumer_id: str):
    """Consumer implementation with proper error handling"""
    base_url = "http://localhost:8000"

    async with httpx.AsyncClient() as client:
        while True:
            try:
                # Fetch messages
                response = await client.get(
                    f"{base_url}/subscriptions/{subscription_id}/messages",
                    params={"consumer_id": consumer_id, "batch_size": 10},
                    timeout=30.0,
                )
                response.raise_for_status()
                messages = response.json()["data"]

                if not messages:
                    await asyncio.sleep(1)
                    continue

                # Process each message
                ack_ids = []
                nack_ids = []

                for message in messages:
                    msg_id = message["id"]
                    try:
                        await process_single_message(message["payload"])
                        ack_ids.append(msg_id)
                        logger.info(f"Successfully processed message {msg_id}")
                    except RetriableError as e:
                        # Temporary error - retry later
                        nack_ids.append(msg_id)
                        logger.warning(f"Retriable error for {msg_id}: {e}")
                    except PermanentError as e:
                        # Permanent error - ack to prevent retries
                        ack_ids.append(msg_id)
                        logger.error(f"Permanent error for {msg_id}: {e}")

                # Acknowledge successful/permanent-error messages
                if ack_ids:
                    await client.post(
                        f"{base_url}/subscriptions/{subscription_id}/acks",
                        json=ack_ids,
                        timeout=10.0,
                    )

                # NACK retriable errors for retry with backoff
                if nack_ids:
                    await client.post(
                        f"{base_url}/subscriptions/{subscription_id}/nacks",
                        json=nack_ids,
                        timeout=10.0,
                    )

            except httpx.HTTPError as e:
                logger.error(f"HTTP error: {e}")
                await asyncio.sleep(5)
            except Exception as e:
                logger.error(f"Unexpected error: {e}")
                await asyncio.sleep(5)


if __name__ == "__main__":
    # Run the consumer
    asyncio.run(process_messages("email-sender", "worker-1"))

Best practices shown:
- Distinguish between retriable and permanent errors
- ACK permanent errors to prevent infinite retries
- NACK retriable errors to trigger exponential backoff
- Use timeouts to prevent hanging
- Log processing status for debugging
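To increase throughput, the same coroutine can also be run several times concurrently in a single process, each consumer with its own consumer_id (a sketch reusing the process_messages function defined above):

```python
# Sketch: run several consumers of the same subscription concurrently.
# Reuses the process_messages coroutine from the example above; each task
# gets its own consumer_id so locked messages are not processed twice.
import asyncio


async def main():
    await asyncio.gather(
        process_messages("email-sender", "worker-1"),
        process_messages("email-sender", "worker-2"),
        process_messages("email-sender", "worker-3"),
    )


if __name__ == "__main__":
    asyncio.run(main())
```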
Set up monitoring to track system health:
# Create a monitoring script (monitor.sh)
#!/bin/bash
SUBSCRIPTION_ID="email-sender"
API_URL="http://localhost:8000"
# Get metrics
METRICS=$(curl -s "$API_URL/subscriptions/$SUBSCRIPTION_ID/metrics")
AVAILABLE=$(echo "$METRICS" | jq -r '.available')
DELIVERED=$(echo "$METRICS" | jq -r '.delivered')
DLQ=$(echo "$METRICS" | jq -r '.dlq')
echo "Subscription: $SUBSCRIPTION_ID"
echo "Available messages: $AVAILABLE"
echo "Delivered messages: $DELIVERED"
echo "DLQ messages: $DLQ"
# Alert if DLQ has messages
if [ "$DLQ" -gt 0 ]; then
echo "⚠️ WARNING: $DLQ messages in dead letter queue!"
# Send alert (e.g., to Slack, PagerDuty, etc.)
# curl -X POST YOUR_WEBHOOK_URL -d "DLQ has $DLQ messages"
fi
# Alert if messages are piling up
if [ "$AVAILABLE" -gt 1000 ]; then
echo "⚠️ WARNING: $AVAILABLE messages waiting (consumer may be slow)"
fi
# Alert if too many messages are stuck in delivered state
if [ "$DELIVERED" -gt 100 ]; then
echo "⚠️ WARNING: $DELIVERED messages in delivered state (possible consumer crash)"
fi

One message published to multiple subscriptions for different purposes:
# 1. Create a topic for order events
curl -X POST http://localhost:8000/topics \
-H "Content-Type: application/json" \
-d '{"id": "orders"}'
# 2. Create multiple subscriptions for different purposes
# Subscription for sending emails
curl -X POST http://localhost:8000/subscriptions \
-H "Content-Type: application/json" \
-d '{
"id": "order-emails",
"topic_id": "orders"
}'
# Subscription for updating inventory
curl -X POST http://localhost:8000/subscriptions \
-H "Content-Type: application/json" \
-d '{
"id": "order-inventory",
"topic_id": "orders"
}'
# Subscription for analytics (only completed orders)
curl -X POST http://localhost:8000/subscriptions \
-H "Content-Type: application/json" \
-d '{
"id": "order-analytics",
"topic_id": "orders",
"filter": {"status": ["completed"]}
}'
# 3. Publish an order event
curl -X POST http://localhost:8000/topics/orders/messages \
-H "Content-Type: application/json" \
-d '[
{
"order_id": "12345",
"customer_email": "[email protected]",
"status": "completed",
"total": 99.99
}
]'
# Result: All three subscriptions receive the message
# - order-emails: Sends confirmation email
# - order-inventory: Updates stock levels
# - order-analytics: Records completed order for analytics

This pattern allows you to decouple different parts of your system while maintaining a single source of truth for events.
- Always acknowledge messages: Use ACK for success, NACK for retriable failures
- Use unique consumer IDs: Helps with debugging and metrics
- Handle idempotency: Messages may be delivered more than once
- Implement timeouts: Don't let message processing hang indefinitely
- Monitor DLQ: Regularly check and handle dead-letter messages
- Batch consumption: Use appropriate
`batch_size` for your workload
- Multiple workers: Run multiple consumers with different
consumer_id - Optimize filters: More specific filters reduce unnecessary message delivery
- Regular cleanup: Schedule cleanup jobs to maintain database performance
- Connection pooling: Configure appropriate pool sizes for your load
- Run cleanup workers: Essential for production deployments
- Monitor metrics: Track available, delivered, acked, and DLQ counts
- Set appropriate timeouts: Configure backoff settings based on your use case
- Database backups: Regular PostgreSQL backups are crucial
- Enable authentication: Set
`FASTPUBSUB_AUTH_ENABLED=true` for production deployments
- Secure secret keys: Generate strong secret keys using the
generate_secret_keycommand - Principle of least privilege: Grant clients only the scopes they need
- Rotate credentials: Regularly update client secrets by recreating clients
- Token management: Access tokens expire after 30 minutes by default (configurable)
- Revoke access: Update a client to increment its
`token_version` and invalidate all existing tokens
Once the server is running, you can access the interactive API documentation:
- Swagger UI:
http://localhost:8000/docs
- ReDoc:
http://localhost:8000/redoc
This project is licensed under the MIT License - see the LICENSE file for details.
Contributions are welcome! Please feel free to submit issues and pull requests.
If you're having trouble connecting to the database from Docker:
- Use
host.docker.internalinstead oflocalhostwhen running on Docker Desktop - Ensure your PostgreSQL allows connections from Docker networks
- Check firewall rules if using a remote database
- Verify the subscription exists and is properly configured
- Check if filters are too restrictive
- Look at metrics to see message counts
- Ensure cleanup_stuck_messages is running if consumers crashed
- Increase the number of API workers (
`FASTPUBSUB_API_NUM_WORKERS`)
- Run multiple consumer instances
- Check database connection pool settings
- Review and optimize your subscription filters
401 Unauthorized / Invalid Token:
- Verify that
`FASTPUBSUB_AUTH_ENABLED=true` is set on the server
- Ensure you're using a valid access token obtained from
/oauth/token - Check that the token hasn't expired (default: 30 minutes)
- Verify the client is still active and hasn't been deleted or disabled
- If the client was updated, old tokens are invalidated - request a new token
403 Forbidden / Insufficient Scope:
- Check that the client has the required scope for the operation
- For object-specific operations, ensure the scope includes the resource ID
- Use
`*` scope for admin/testing purposes (not recommended for production)
- Example: To publish to topic "events", client needs
`topics:publish` or `topics:publish:events` scope
Missing FASTPUBSUB_AUTH_SECRET_KEY:
- Generate a secret key using
docker run --rm allisson/fastpubsub generate_secret_key - Set it as an environment variable:
FASTPUBSUB_AUTH_SECRET_KEY=your-generated-key - The same secret key must be used across all server instances
This section is for developers who want to contribute to fastpubsub or run it locally without Docker.
- Python 3.14+: The project requires Python 3.14 or later
- uv: Fast Python package installer and resolver (installation guide)
- PostgreSQL 14+: Local PostgreSQL instance for development
- make: For running Makefile commands (usually pre-installed on Unix-like systems)
- Clone the repository:
git clone https://github.com/allisson/fastpubsub.git
cd fastpubsub

- Start a local PostgreSQL instance (optional):
If you don't have PostgreSQL running, you can use the provided Makefile command:
make start-postgresql

This starts a PostgreSQL container with default credentials:
- User: `fastpubsub`
- Password: `fastpubsub`
- Database: `fastpubsub`
- Port: `5432`
To stop and remove the PostgreSQL container later:
make remove-postgresql

- Set up environment variables:
Copy the sample environment file and adjust as needed:
cp env.sample .env

Edit .env to configure your local database connection and other settings.
- Install dependencies:
# Install uv if you haven't already
pip install uv
# Install project dependencies (including development dependencies)
uv sync

This creates a virtual environment at .venv and installs all required packages.
- Run database migrations:
make run-db-migrate

Or manually:
PYTHONPATH=./ uv run python fastpubsub/main.py db-migrate

Run the full test suite:
make test

Or manually with pytest:
uv run pytest -v

For coverage reporting:
uv run pytest -v --cov=fastpubsub --cov-report=term-missing

The project uses ruff for linting and formatting, along with pre-commit hooks.
Run linting:
make lint

This runs all pre-commit hooks including:
- Ruff linting and formatting
- Various file checks (trailing whitespace, YAML/JSON validation, etc.)
- MyPy type checking
Install pre-commit hooks (recommended):
uv run pre-commit install

After installation, the hooks will run automatically on every commit.
Manual formatting:
# Format code with ruff
uv run ruff format .
# Run ruff checks with auto-fix
uv run ruff check --fix .

Start the development server:
make run-server

Or manually:
PYTHONPATH=./ uv run python fastpubsub/main.py server

The API will be available at:
- Server: http://localhost:8000
- Swagger UI: http://localhost:8000/docs
- ReDoc: http://localhost:8000/redoc
Create a new migration:
make create-migration

This generates a new migration file in migrations/versions/. Edit the file to define your schema changes.
Apply migrations:
make run-db-migrate

Build the Docker image:
make docker-build

Or manually:
docker build --rm -t fastpubsub .

- Create a feature branch:
git checkout -b feature/your-feature-name

- Make your changes and test locally:
# Run linting
make lint
# Run tests
make test
# Start the server to manually test
make run-server

- Commit your changes:
The pre-commit hooks will automatically run linting and checks. Ensure all checks pass.
git add .
git commit -m "Your commit message"- Push and create a pull request:
git push origin feature/your-feature-name

fastpubsub/
├── fastpubsub/ # Main application package
│ ├── api/ # FastAPI routes and API logic
│ ├── services/ # Business logic and services
│ ├── config.py # Configuration management
│ ├── database.py # Database connection and migrations
│ ├── models.py # Pydantic models
│ ├── main.py # CLI entry point
│ └── ...
├── migrations/ # Alembic database migrations
│ └── versions/ # Migration files
├── tests/ # Test suite
│ ├── api/ # API tests
│ ├── services/ # Service tests
│ └── ...
├── Dockerfile # Production Docker image
├── Makefile # Development commands
├── pyproject.toml # Project metadata and dependencies
├── ruff.toml # Ruff linter configuration
├── .pre-commit-config.yaml # Pre-commit hooks configuration
└── README.md # This file
| Command | Description |
|---|---|
| `make test` | Run the test suite with pytest |
| `make lint` | Run pre-commit hooks (linting, formatting, checks) |
| `make start-postgresql` | Start a local PostgreSQL Docker container |
| `make remove-postgresql` | Stop and remove the PostgreSQL container |
| `make create-migration` | Create a new Alembic migration file |
| `make run-db-migrate` | Apply database migrations |
| `make run-server` | Start the development server |
| `make docker-build` | Build the Docker image locally |
- Virtual Environment: The project uses `uv`, which automatically manages a virtual environment in `.venv/`
- Python Version: Ensure you're using Python 3.14+ as specified in `pyproject.toml`
- Environment Variables: All configuration is done via environment variables prefixed with `FASTPUBSUB_`
- IDE Setup: Consider configuring your IDE to use the `.venv/bin/python` interpreter
- Database: The test suite uses the same database configured in your `.env` file
Made with ❤️ using FastAPI and PostgreSQL