Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new feature: (optional) Queue priorities #74

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,32 @@ goworker.Enqueue(&goworker.Job{
})
```

### Priority queues

There is an optional way to define priorities for the queues, meaning that jobs from a queue with priority *n* would be pulled before jobs from a queue with priority *m* (*n* < *m*).

```go
settings := goworker.WorkerSettings{
URI: "redis://localhost:6379/",
Connections: 100,
Queues: []string{"med-priority", "low-priority", "high-priority"},
QueuesPriority: map[string]int{
"high-priority": 10,
"med-priority": 100,
"low-priority": 200,
},
UseNumber: true,
ExitOnComplete: false,
Concurrency: 2,
Namespace: "resque:",
Interval: 5.0,
}

goworker.SetSettings(settings)
```

Priority will be set as zero (higher) if it is not provided or in case that a negative value would be provided.

## Flags

There are several flags which control the operation of the goworker client.
Expand Down
5 changes: 3 additions & 2 deletions goworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var workerSettings WorkerSettings
type WorkerSettings struct {
QueuesString string
Queues queuesFlag
QueuesPriority map[string]int
IntervalFloat float64
Interval intervalFlag
Concurrency int
Expand Down Expand Up @@ -125,7 +126,7 @@ func Work() error {

quit := signals()

poller, err := newPoller(workerSettings.Queues, workerSettings.IsStrict)
poller, err := newPoller(workerSettings.Queues, workerSettings.QueuesPriority, workerSettings.IsStrict)
if err != nil {
return err
}
Expand All @@ -137,7 +138,7 @@ func Work() error {
var monitor sync.WaitGroup

for id := 0; id < workerSettings.Concurrency; id++ {
worker, err := newWorker(strconv.Itoa(id), workerSettings.Queues)
worker, err := newWorker(strconv.Itoa(id), workerSettings.Queues, workerSettings.QueuesPriority)
if err != nil {
return err
}
Expand Down
16 changes: 13 additions & 3 deletions poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ type poller struct {
isStrict bool
}

func newPoller(queues []string, isStrict bool) (*poller, error) {
process, err := newProcess("poller", queues)
func newPoller(queues []string, queuesPriority map[string]int, isStrict bool) (*poller, error) {
process, err := newProcess("poller", queues, queuesPriority)
if err != nil {
return nil, err
}
Expand All @@ -23,8 +23,18 @@ func newPoller(queues []string, isStrict bool) (*poller, error) {
}, nil
}

// getJob returns a job from the non-empty highest priority queue.
// If no priorities were provided, all queues have the same priority: 0 (the highest).
func (p *poller) getJob(conn *RedisConn) (*Job, error) {
for _, queue := range p.queues(p.isStrict) {
var (
queues = p.queues(p.isStrict)
queue string
)

// iterate the queues in the exact order they were sorted
for i := 0; i < len(queues); i++ {
queue = queues[i]

logger.Debugf("Checking %s", queue)

reply, err := conn.Do("LPOP", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, queue))
Expand Down
103 changes: 85 additions & 18 deletions process.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,36 @@ import (
"fmt"
"math/rand"
"os"
"sort"
"strings"
"time"
)

type process struct {
Hostname string
Pid int
ID string
Queues []string
Hostname string
Pid int
ID string
Queues []string
QueuesPriority map[string]int
queuesSortedByPriority []string
}

func newProcess(id string, queues []string) (*process, error) {
func newProcess(id string, queues []string, queuesPriority map[string]int) (*process, error) {
hostname, err := os.Hostname()
if err != nil {
return nil, err
}

return &process{
Hostname: hostname,
Pid: os.Getpid(),
ID: id,
Queues: queues,
}, nil
p := &process{
Hostname: hostname,
Pid: os.Getpid(),
ID: id,
Queues: queues,
QueuesPriority: queuesPriority,
}
p.queuesSortedByPriority = p.getQueuesSortedByPriority()

return p, nil
}

func (p *process) String() string {
Expand Down Expand Up @@ -75,16 +82,76 @@ func (p *process) fail(conn *RedisConn) error {
return nil
}

// queues returns a slice of queues.
// If priorities were provided, this list will be sorted by increasing priorities.
// Priority will be automatically set to zero for those queues without priority.
func (p *process) queues(strict bool) []string {
// If the queues order is strict then just return them.
if strict {
if p.QueuesPriority == nil {
// If the queues order is strict then just return them.
if strict {
return p.Queues
}

// If not then we want to to shuffle the queues before returning them.
queues := make([]string, len(p.Queues))
for i, v := range rand.Perm(len(p.Queues)) {
queues[i] = p.Queues[v]
}
return queues
}

return p.queuesSortedByPriority
}

// getQueuesSortedByPriority returns the original queue list sorted by increasing priorities (based on QueuesPriority).
// The original queue list will be unaltered if QueuesPriority is not provided.
func (p *process) getQueuesSortedByPriority() []string {
// priorities are the same for all queues if no priorities were defined
if p.QueuesPriority == nil {
return p.Queues
}

// If not then we want to to shuffle the queues before returning them.
queues := make([]string, len(p.Queues))
for i, v := range rand.Perm(len(p.Queues)) {
queues[i] = p.Queues[v]
// [priority] => []queue
pQueueMap := make(map[int][]string)

for _, queue := range p.Queues {
// get queue's priority
if priority, ok := p.QueuesPriority[queue]; ok {
// not allowed a highest priority than zero
if priority < 0 {
priority = 0
}
// insert queue into priority list
p.insertQueueIntoPriorityMap(queue, priority, pQueueMap)
} else {
// insert queue into priority 0 list
p.insertQueueIntoPriorityMap(queue, 0, pQueueMap)
}
}

// get the priorities
priorities := make([]int, 0)
for k := range pQueueMap {
priorities = append(priorities, k)
}
// sort the priorities in increasing order
sort.Ints(priorities)

// generate the queue list: sorted by increasing priorities
sortedPriorities := make([]string, 0)
for i := 0; i < len(priorities); i++ {
sortedPriorities = append(sortedPriorities, pQueueMap[priorities[i]]...)
}

return sortedPriorities
}

// insertQueueIntoPriorityMap inserts a queue into a given priority's []string. This function is intended
// to be called ONLY by *process.getQueuesSortedByPriority
func (p *process) insertQueueIntoPriorityMap(queue string, priority int, priorityMap map[int][]string) {
if qList, ok := priorityMap[priority]; ok {
priorityMap[priority] = append(qList, queue)
} else {
priorityMap[priority] = []string{queue}
}
return queues
}
126 changes: 109 additions & 17 deletions process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,85 @@ import (
"testing"
)

var processStringTests = []struct {
p process
expected string
}{
{
process{},
":0-:",
},
{
process{
Hostname: "hostname",
Pid: 12345,
ID: "123",
Queues: []string{"high", "low"},
var (
processStringTests = []struct {
p process
expected string
}{
{
process{},
":0-:",
},
"hostname:12345-123:high,low",
},
}
{
process{
Hostname: "hostname",
Pid: 12345,
ID: "123",
Queues: []string{"high", "low"},
},
"hostname:12345-123:high,low",
},
}

processPriorityQueueTests = []struct {
p *process
expectedQueues []string
}{
// no queues, no priorities
{
&process{
Pid: 1,
Queues: []string{},
},
[]string{},
},
// queues + priorities for all queues
{
&process{
Pid: 2,
Queues: []string{"low-priority", "high-priority", "med-priority"},
QueuesPriority: map[string]int{
"high-priority": 5,
"med-priority": 10,
"low-priority": 15,
},
},
[]string{"high-priority", "med-priority", "low-priority"},
},
// queues + priorities for some queues
{
&process{
Pid: 3,
Queues: []string{"low-priority", "high-priority", "med-priority"},
QueuesPriority: map[string]int{
"med-priority": 10,
"low-priority": 15,
},
},
[]string{"high-priority", "med-priority", "low-priority"},
},
// queues + no priorities (all queues get the same priority: zero)
{
&process{
Pid: 4,
Queues: []string{"low-priority", "high-priority", "med-priority"},
},
[]string{"low-priority", "high-priority", "med-priority"},
},
// queues + negative priority (automatically set to zero: the highest priority)
{
&process{
Pid: 5,
Queues: []string{"low-priority", "med-priority", "high-priority"},
QueuesPriority: map[string]int{
"high-priority": -10,
"low-priority": 15,
},
},
[]string{"med-priority", "high-priority", "low-priority"},
},
}
)

func TestProcessString(t *testing.T) {
for _, tt := range processStringTests {
Expand All @@ -31,3 +92,34 @@ func TestProcessString(t *testing.T) {
}
}
}

// TestProcessPriorityQueues verifies that queues get sorted by incremental priority (if priorities were provided)
func TestProcessPriorityQueues(t *testing.T) {
for _, ppqt := range processPriorityQueueTests {
ppqt.p.queuesSortedByPriority = ppqt.p.getQueuesSortedByPriority()
actual := ppqt.p.queues(true)
if !testStringSliceEq(ppqt.expectedQueues, actual) {
t.Errorf("Process(%#v): expected %s, actual %s", ppqt.p.Pid, ppqt.expectedQueues, actual)
}
}
}

// testStringSliceEq returns true if a == b (a, b []string)
func testStringSliceEq(a, b []string) bool {
// If one is nil, the other must also be nil.
if (a == nil) != (b == nil) {
return false
}

if len(a) != len(b) {
return false
}

for i := range a {
if a[i] != b[i] {
return false
}
}

return true
}
2 changes: 1 addition & 1 deletion redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func newRedisFactory(uri string) pools.Factory {
}

func newRedisPool(uri string, capacity int, maxCapacity int, idleTimout time.Duration) *pools.ResourcePool {
return pools.NewResourcePool(newRedisFactory(uri), capacity, maxCapacity, idleTimout)
return pools.NewResourcePool(newRedisFactory(uri), capacity, maxCapacity, idleTimout, 0)
}

func redisConnFromURI(uriString string) (*RedisConn, error) {
Expand Down
4 changes: 2 additions & 2 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ type worker struct {
process
}

func newWorker(id string, queues []string) (*worker, error) {
process, err := newProcess(id, queues)
func newWorker(id string, queues []string, queuesPriority map[string]int) (*worker, error) {
process, err := newProcess(id, queues, queuesPriority)
if err != nil {
return nil, err
}
Expand Down