Job queues as refreshing as taking a swig 🍺
Swig is a robust, PostgreSQL-backed job queue system for Go applications, designed for developers who need reliable background job processing with minimal setup.
go get github.com/glamboyosa/[email protected]
import it like:
import "github.com/glamboyosa/swig"
In distributed systems, especially job queues, transactional integrity is crucial. Swig offers three levels of transaction control:
- Bring Your Own Transaction (Recommended): Use your existing database transaction:
tx, _ := pool.Begin(ctx)
defer tx.Rollback(ctx)
// Create user
userID := createUser(tx)
// Enqueue welcome email in same transaction
err := swigClient.AddJobWithTx(ctx, tx, &EmailWorker{
To: email,
Subject: "Welcome!",
})
if err != nil {
return err
}
return tx.Commit(ctx)
- Use Swig's Transaction Helper: Let Swig manage the transaction:
err := swigClient.driver.WithTx(ctx, func(tx Transaction) error {
// Create user
if err := createUser(tx); err != nil {
return err // Triggers rollback
}
// Add job (auto rollback if this fails)
return tx.Exec(ctx, insertJobSQL, ...)
})
- No Transaction (Simple): Just enqueue a job:
err := swigClient.AddJob(ctx, &EmailWorker{
To: email,
Subject: "Welcome!",
})
Choose the approach that best fits your needs:
- Use
AddJobWithTx
when you need to coordinate jobs with your application's data - Use
WithTx
when you want automatic transaction management - Use
AddJob
for simple, non-transactional job enqueueing
Benefits of transactional job enqueueing:
- No Lost Jobs: Jobs are either fully committed or not at all
- Atomic Processing: Jobs are processed exactly once using PostgreSQL's SKIP LOCK
- Data Consistency: Jobs can be part of your application's transactions
- PostgreSQL-Powered: Leverages PostgreSQL's SKIP LOCK for efficient job distribution
- Transactional Integrity: Jobs are processed exactly once with transactional guarantees
- Leader Election: Built-in leader election using advisory locks
- Multiple Queue Support: Priority and default queues out of the box
- Type-Safe Job Arguments: Strongly typed job arguments with Go generics
- Simple API: Intuitive API for enqueueing and processing jobs
go get github.com/glamboyosa/swig
Swig supports two PostgreSQL driver implementations:
pgx
(Recommended) - Using the high-performance pgx driver- Better performance
- Native LISTEN/NOTIFY support
- Real-time job notifications
database/sql
- Using Go's standarddatabase/sql
interface- Important: Requires
github.com/lib/pq
driver for LISTEN/NOTIFY support - Must import with:
import _ "github.com/lib/pq"
- Must use
postgres://
(notpostgresql://
) in connection strings
- Important: Requires
Workers in Swig are structs that implement two key requirements:
- The
JobName() string
method to identify the worker type - The
Process(context.Context) error
method to execute the job - Have JSON-serializable fields for job arguments
Example worker:
type EmailWorker struct {
To string `json:"to"`
Subject string `json:"subject"`
Body string `json:"body"`
}
func (w *EmailWorker) JobName() string {
return "send_email"
}
func (w *EmailWorker) Process(ctx context.Context) error {
return sendEmail(w.To, w.Subject, w.Body)
}
package main
import (
"context"
"github.com/jackc/pgx/v5/pgxpool"
"database/sql"
_ "github.com/lib/pq"
"github.com/swig/swig-go/drivers"
import swig "github.com/glamboyosa/swig/swig-go"
)
// 1. Define your worker (as shown above in Understanding Workers)
type EmailWorker struct {
To string `json:"to"`
Subject string `json:"subject"`
Body string `json:"body"`
}
func (w *EmailWorker) JobName() string {
return "send_email"
}
func (w *EmailWorker) Process(ctx context.Context) error {
return sendEmail(w.To, w.Subject, w.Body)
}
func main() {
ctx := context.Background()
// Setup database connection (choose one)
// Option A: Using pgx
pgxConfig, _ := pgxpool.ParseConfig("postgres://localhost:5432/myapp")
pgxPool, _ := pgxpool.NewWithConfig(ctx, pgxConfig)
driver, _ := drivers.NewPgxDriver(pgxPool)
// Option B: Using database/sql
// db, _ := sql.Open("postgres", "postgres://localhost:5432/myapp")
// driver, _ := drivers.NewSQLDriver(db, connString)
// Create a worker registry and register your workers
workers := swig.NewWorkerRegistry()
workers.RegisterWorker(&EmailWorker{})
// Configure queues (default setup)
configs := []swig.SwigQueueConfig{
{QueueType: swig.Default, MaxWorkers: 5},
}
// Create and start Swig with worker registry
swigClient := swig.NewSwig(driver, configs, workers)
swigClient.Start(ctx)
// Add a job (uses default queue)
err := swigClient.AddJob(ctx, &EmailWorker{
To: "[email protected]",
Subject: "Welcome!",
Body: "Hello from Swig",
})
}
Swig supports multiple queues with different worker pools. While the default queue is sufficient for many applications, you can configure multiple queues for more complex scenarios:
// Configure multiple queues
configs := []swig.SwigQueueConfig{
{QueueType: swig.Default, MaxWorkers: 5}, // Default queue
{QueueType: swig.Priority, MaxWorkers: 3}, // Priority queue
}
swigClient := swig.NewSwig(driver, configs)
Once you have multiple queues, you can specify which queue to use with JobOptions:
// High priority email
err := swigClient.AddJob(ctx, &EmailWorker{
To: "[email protected]",
Subject: "Urgent Notice!",
Body: "Priority message",
}, swig.JobOptions{
Queue: swig.Priority,
Priority: 5,
})
// Scheduled email in default queue
err = swigClient.AddJob(ctx, &EmailWorker{
To: "[email protected]",
Subject: "Reminder",
Body: "Don't forget!",
}, swig.JobOptions{
Queue: swig.Default,
RunAt: time.Now().Add(24 * time.Hour),
})
Each queue operates independently with its own worker pool, allowing you to:
- Process priority jobs faster with dedicated workers
- Prevent low-priority jobs from blocking important tasks
- Scale worker pools based on queue requirements
Workers must be registered with Swig before they can process jobs:
// Create workers registry
workers := swig.NewWorkerRegistry()
// Register workers
workers.RegisterWorker(&EmailWorker{})
workers.RegisterWorker(&ImageResizeWorker{})
// Pass workers to Swig
swigClient := swig.NewSwig(driver, configs, workers)
The registry ensures that:
- Only registered worker types can be processed
- Worker implementations are validated at startup
- Job payloads can be properly deserialized
Swig handles job processing with:
- Automatic retries
- Error tracking
- Job status management
- Scheduled jobs
- Priority queues
Swig provides methods for both graceful shutdown and complete cleanup:
// Graceful shutdown: Wait for jobs to complete
err := swigClient.Stop(ctx)
// Complete cleanup: Drop all Swig tables
err := swigClient.Close(ctx)
The Close
method is particularly useful in:
- Testing environments to clean up after tests
- Development scenarios to reset state
- CI/CD pipelines needing clean slate between runs
Example usage in tests:
func TestJobProcessing(t *testing.T) {
// Setup Swig
swigClient := swig.NewSwig(driver, configs, workers)
defer swigClient.Close(ctx) // Clean up after test
// Run your tests...
}
Swig uses PostgreSQL's SKIP LOCK for efficient job distribution across multiple processes. This, combined with advisory locks for leader election, ensures:
- No duplicate job processing
- Fair job distribution
- High availability
- Transactional integrity
We welcome contributions! Please see our Contributing Guide for details.
MIT License - see LICENSE for details.