Skip to content

Added redis connection pool support. Implemented io.ReadWriter. #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 94 additions & 14 deletions redisqueue/queue.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,74 @@
package redisqueue

import (
"bytes"
"fmt"
"github.com/gomodule/redigo/redis"
"strconv"
"time"

"github.com/gomodule/redigo/redis"
)

// Queue holds a reference to a redis connection and a queue name.
type Queue struct {
c redis.Conn
Name string
pool *redis.Pool
c redis.Conn
readBuf bytes.Buffer
usePool bool
Name string
}

// New defines a new Queue with redis.Pool or single redis.Conn
func New(queueName string, conn interface{}) *Queue {
q := &Queue{
Name: queueName,
readBuf: bytes.Buffer{},
}

switch conn.(type) {
case *redis.Pool:
q.pool = conn.(*redis.Pool)
q.usePool = true
case redis.Conn:
q.c = conn.(redis.Conn)
default:
return nil
}

return q
}

// Implements io.Writer
func (q *Queue) Write(p []byte) (n int, err error) {
_, err = q.Schedule(string(p), time.Now())
if err != nil {
return
}

n = len(p)

return
}

// New defines a new Queue
func New(queueName string, c redis.Conn) *Queue {
return &Queue{
c: c,
Name: queueName,
// Implements io.Reader
func (q *Queue) Read(p []byte) (n int, err error) {
var available int64
available, err = q.Pending()
if err != nil {
return
}
if available > 0 {
var data []string
data, err = q.PopJobs(int(available))
if err != nil {
return
}

for i := range data {
q.readBuf.WriteString(data[i])
}
}

return q.readBuf.Read(p)
}

// Push pushes a single job on to the queue. The job string can be any format, as the queue doesn't really care.
Expand All @@ -29,21 +78,44 @@ func (q *Queue) Push(job string) (bool, error) {

// Schedule schedule a job at some point in the future, or some point in the past. Scheduling a job far in the past is the same as giving it a high priority, as jobs are popped in order of due date.
func (q *Queue) Schedule(job string, when time.Time) (bool, error) {
var c redis.Conn
if q.usePool {
c = q.pool.Get()
defer c.Close()
} else {
c = q.c
}

score := when.UnixNano()
added, err := redis.Bool(q.c.Do("ZADD", q.Name, score, job))
added, err := redis.Bool(c.Do("ZADD", q.Name, score, job))
// _, err := addTaskScript.Do(q.c, job)
return added, err

}

// Pending returns the count of jobs pending, including scheduled jobs that are not due yet.
func (q *Queue) Pending() (int64, error) {
return redis.Int64(q.c.Do("ZCARD", q.Name))
var c redis.Conn
if q.usePool {
c = q.pool.Get()
defer c.Close()
} else {
c = q.c
}

return redis.Int64(c.Do("ZCARD", q.Name))
}

// FlushQueue removes everything from the queue. Useful for testing.
func (q *Queue) FlushQueue() error {
_, err := q.c.Do("DEL", q.Name)
var c redis.Conn
if q.usePool {
c = q.pool.Get()
defer c.Close()
} else {
c = q.c
}

_, err := c.Do("DEL", q.Name)
return err
}

Expand All @@ -61,7 +133,15 @@ func (q *Queue) Pop() (string, error) {

// PopJobs returns multiple jobs from the queue. Safe for concurrent use (multiple goroutines must use their own Queue objects and redis connections)
func (q *Queue) PopJobs(limit int) ([]string, error) {
return redis.Strings(popJobsScript.Do(q.c, q.Name, fmt.Sprintf("%d", time.Now().UnixNano()), strconv.Itoa(limit)))
var c redis.Conn
if q.usePool {
c = q.pool.Get()
defer c.Close()
} else {
c = q.c
}

return redis.Strings(popJobsScript.Do(c, q.Name, fmt.Sprintf("%d", time.Now().UnixNano()), strconv.Itoa(limit)))
}

func quoteArgs(args []string) string {
Expand Down
2 changes: 1 addition & 1 deletion redisqueue/scripts.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ func init() {
popJobsScript = redis.NewScript(3, `
local name = KEYS[1]
local timestamp = KEYS[2]
local limit = KEYS[3]
local limit = KEYS[3]
local results = redis.call('zrangebyscore', name, '-inf', timestamp, 'LIMIT', 0, limit)
if table.getn(results) > 0 then
redis.call('zrem', name, unpack(results))
Expand Down