From f79616866043a0b1893b55ede286c0fafe5336df Mon Sep 17 00:00:00 2001 From: Oleksiy Likhosherstov Date: Tue, 9 Oct 2018 21:25:15 +0300 Subject: [PATCH] Added redis connection pool support. Implemented io.ReadWriter. --- redisqueue/queue.go | 108 ++++++++++++++++++++++++++++++++++++------ redisqueue/scripts.go | 2 +- 2 files changed, 95 insertions(+), 15 deletions(-) diff --git a/redisqueue/queue.go b/redisqueue/queue.go index 9d840d1..84835a9 100644 --- a/redisqueue/queue.go +++ b/redisqueue/queue.go @@ -1,25 +1,74 @@ package redisqueue import ( + "bytes" "fmt" + "github.com/gomodule/redigo/redis" "strconv" "time" - - "github.com/gomodule/redigo/redis" ) // Queue holds a reference to a redis connection and a queue name. type Queue struct { - c redis.Conn - Name string + pool *redis.Pool + c redis.Conn + readBuf bytes.Buffer + usePool bool + Name string +} + +// New defines a new Queue with redis.Pool or single redis.Conn +func New(queueName string, conn interface{}) *Queue { + q := &Queue{ + Name: queueName, + readBuf: bytes.Buffer{}, + } + + switch conn.(type) { + case *redis.Pool: + q.pool = conn.(*redis.Pool) + q.usePool = true + case redis.Conn: + q.c = conn.(redis.Conn) + default: + return nil + } + + return q +} + +// Implements io.Writer +func (q *Queue) Write(p []byte) (n int, err error) { + _, err = q.Schedule(string(p), time.Now()) + if err != nil { + return + } + + n = len(p) + + return } -// New defines a new Queue -func New(queueName string, c redis.Conn) *Queue { - return &Queue{ - c: c, - Name: queueName, +// Implements io.Reader +func (q *Queue) Read(p []byte) (n int, err error) { + var available int64 + available, err = q.Pending() + if err != nil { + return + } + if available > 0 { + var data []string + data, err = q.PopJobs(int(available)) + if err != nil { + return + } + + for i := range data { + q.readBuf.WriteString(data[i]) + } } + + return q.readBuf.Read(p) } // Push pushes a single job on to the queue. The job string can be any format, as the queue doesn't really care. @@ -29,21 +78,44 @@ func (q *Queue) Push(job string) (bool, error) { // Schedule schedule a job at some point in the future, or some point in the past. Scheduling a job far in the past is the same as giving it a high priority, as jobs are popped in order of due date. func (q *Queue) Schedule(job string, when time.Time) (bool, error) { + var c redis.Conn + if q.usePool { + c = q.pool.Get() + defer c.Close() + } else { + c = q.c + } + score := when.UnixNano() - added, err := redis.Bool(q.c.Do("ZADD", q.Name, score, job)) + added, err := redis.Bool(c.Do("ZADD", q.Name, score, job)) // _, err := addTaskScript.Do(q.c, job) return added, err - } // Pending returns the count of jobs pending, including scheduled jobs that are not due yet. func (q *Queue) Pending() (int64, error) { - return redis.Int64(q.c.Do("ZCARD", q.Name)) + var c redis.Conn + if q.usePool { + c = q.pool.Get() + defer c.Close() + } else { + c = q.c + } + + return redis.Int64(c.Do("ZCARD", q.Name)) } // FlushQueue removes everything from the queue. Useful for testing. func (q *Queue) FlushQueue() error { - _, err := q.c.Do("DEL", q.Name) + var c redis.Conn + if q.usePool { + c = q.pool.Get() + defer c.Close() + } else { + c = q.c + } + + _, err := c.Do("DEL", q.Name) return err } @@ -61,7 +133,15 @@ func (q *Queue) Pop() (string, error) { // PopJobs returns multiple jobs from the queue. Safe for concurrent use (multiple goroutines must use their own Queue objects and redis connections) func (q *Queue) PopJobs(limit int) ([]string, error) { - return redis.Strings(popJobsScript.Do(q.c, q.Name, fmt.Sprintf("%d", time.Now().UnixNano()), strconv.Itoa(limit))) + var c redis.Conn + if q.usePool { + c = q.pool.Get() + defer c.Close() + } else { + c = q.c + } + + return redis.Strings(popJobsScript.Do(c, q.Name, fmt.Sprintf("%d", time.Now().UnixNano()), strconv.Itoa(limit))) } func quoteArgs(args []string) string { diff --git a/redisqueue/scripts.go b/redisqueue/scripts.go index 6270c31..5f1c7e3 100644 --- a/redisqueue/scripts.go +++ b/redisqueue/scripts.go @@ -8,7 +8,7 @@ func init() { popJobsScript = redis.NewScript(3, ` local name = KEYS[1] local timestamp = KEYS[2] - local limit = KEYS[3] + local limit = KEYS[3] local results = redis.call('zrangebyscore', name, '-inf', timestamp, 'LIMIT', 0, limit) if table.getn(results) > 0 then redis.call('zrem', name, unpack(results))