Skip to content

Commit 6694ab6

Browse files
authored
Add support of passing attributes in publish/consume (#226)
1 parent bc3b3a1 commit 6694ab6

14 files changed

+207
-143
lines changed

docker/docker-compose.yml

-16
This file was deleted.

engine/job.go

+26-22
Original file line numberDiff line numberDiff line change
@@ -37,31 +37,33 @@ type jobImpl struct {
3737
// NOTE: there is a trick in this factory, the delay is embedded in the jobID.
3838
// By doing this we can delete the job that's located in hourly AOF, by placing
3939
// a tombstone record in that AOF.
40-
func NewJob(namespace, queue string, body []byte, ttl, delay uint32, tries uint16, jobID string) Job {
40+
func NewJob(namespace, queue string, body []byte, attributes map[string]string, ttl, delay uint32, tries uint16, jobID string) Job {
4141
if jobID == "" {
4242
jobID = uuid.GenJobIDWithVersion(0, delay)
4343
}
4444
return &jobImpl{
45-
namespace: namespace,
46-
queue: queue,
47-
id: jobID,
48-
body: body,
49-
ttl: ttl,
50-
delay: delay,
51-
tries: tries,
45+
namespace: namespace,
46+
queue: queue,
47+
id: jobID,
48+
body: body,
49+
ttl: ttl,
50+
delay: delay,
51+
tries: tries,
52+
attributes: attributes,
5253
}
5354
}
5455

55-
func NewJobWithID(namespace, queue string, body []byte, ttl uint32, tries uint16, jobID string) Job {
56+
func NewJobWithID(namespace, queue string, body []byte, attributes map[string]string, ttl uint32, tries uint16, jobID string) Job {
5657
delay, _ := uuid.ExtractDelaySecondFromUniqueID(jobID)
5758
return &jobImpl{
58-
namespace: namespace,
59-
queue: queue,
60-
id: jobID,
61-
body: body,
62-
ttl: ttl,
63-
delay: delay,
64-
tries: tries,
59+
namespace: namespace,
60+
queue: queue,
61+
id: jobID,
62+
body: body,
63+
ttl: ttl,
64+
delay: delay,
65+
tries: tries,
66+
attributes: attributes,
6567
}
6668
}
6769

@@ -108,19 +110,21 @@ func (j *jobImpl) Attributes() map[string]string {
108110

109111
func (j *jobImpl) MarshalText() (text []byte, err error) {
110112
var job struct {
111-
Namespace string `json:"namespace"`
112-
Queue string `json:"queue"`
113-
ID string `json:"id"`
114-
TTL uint32 `json:"ttl"`
115-
ElapsedMS int64 `json:"elapsed_ms"`
116-
Body []byte `json:"body"`
113+
Namespace string `json:"namespace"`
114+
Queue string `json:"queue"`
115+
ID string `json:"id"`
116+
TTL uint32 `json:"ttl"`
117+
ElapsedMS int64 `json:"elapsed_ms"`
118+
Body []byte `json:"body"`
119+
Attributes map[string]string `json:"attributes,omitempty"`
117120
}
118121
job.Namespace = j.namespace
119122
job.Queue = j.queue
120123
job.ID = j.id
121124
job.TTL = j.ttl
122125
job.ElapsedMS = j._elapsedMS
123126
job.Body = j.body
127+
job.Attributes = j.attributes
124128
return json.Marshal(job)
125129
}
126130

engine/migration/engine_test.go

+15-15
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,15 @@ import (
1313
func TestEngine_Publish(t *testing.T) {
1414
e := NewEngine(OldRedisEngine, NewRedisEngine)
1515
body := []byte("hello msg 1")
16-
j := engine.NewJob("ns-engine", "q1", body, 10, 2, 1, "")
16+
j := engine.NewJob("ns-engine", "q1", body, nil, 10, 2, 1, "")
1717
jobID, err := e.Publish(j)
1818
t.Log(jobID)
1919
if err != nil {
2020
t.Fatalf("Failed to publish: %s", err)
2121
}
2222

2323
// Publish no-delay job
24-
j = engine.NewJob("ns-engine", "q1", body, 10, 0, 1, "")
24+
j = engine.NewJob("ns-engine", "q1", body, nil, 10, 0, 1, "")
2525
jobID, err = e.Publish(j)
2626
t.Log(jobID)
2727
if err != nil {
@@ -37,7 +37,7 @@ func TestEngine_Publish(t *testing.T) {
3737
func TestEngine_Consume(t *testing.T) {
3838
e := NewEngine(OldRedisEngine, NewRedisEngine)
3939
body := []byte("hello msg 2")
40-
j := engine.NewJob("ns-engine", "q2", body, 10, 2, 1, "")
40+
j := engine.NewJob("ns-engine", "q2", body, nil, 10, 2, 1, "")
4141
jobID, err := e.Publish(j)
4242
t.Log(jobID)
4343
if err != nil {
@@ -53,7 +53,7 @@ func TestEngine_Consume(t *testing.T) {
5353
}
5454

5555
// Consume job that's published in no-delay way
56-
j = engine.NewJob("ns-engine", "q2", body, 10, 0, 1, "")
56+
j = engine.NewJob("ns-engine", "q2", body, nil, 10, 0, 1, "")
5757
jobID, err = e.Publish(j)
5858
t.Log(jobID)
5959
if err != nil {
@@ -72,9 +72,9 @@ func TestEngine_Consume(t *testing.T) {
7272
func TestEngine_Consume2(t *testing.T) {
7373
e := NewEngine(OldRedisEngine, NewRedisEngine)
7474
body := []byte("hello msg 3")
75-
j1 := engine.NewJob("ns-engine", "q3", []byte("delay msg"), 10, 5, 1, "")
75+
j1 := engine.NewJob("ns-engine", "q3", []byte("delay msg"), nil, 10, 5, 1, "")
7676
_, err := e.Publish(j1)
77-
j2 := engine.NewJob("ns-engine", "q3", body, 10, 2, 1, "")
77+
j2 := engine.NewJob("ns-engine", "q3", body, nil, 10, 2, 1, "")
7878
jobID, err := e.Publish(j2)
7979
if err != nil {
8080
t.Fatalf("Failed to publish: %s", err)
@@ -91,12 +91,12 @@ func TestEngine_Consume2(t *testing.T) {
9191
func TestEngine_ConsumeMulti(t *testing.T) {
9292
e := NewEngine(OldRedisEngine, NewRedisEngine)
9393
body := []byte("hello msg 4")
94-
j1 := engine.NewJob("ns-engine", "q4", body, 10, 3, 1, "")
94+
j1 := engine.NewJob("ns-engine", "q4", body, nil, 10, 3, 1, "")
9595
jobID, err := e.Publish(j1)
9696
if err != nil {
9797
t.Fatalf("Failed to publish: %s", err)
9898
}
99-
j2 := engine.NewJob("ns-engine", "q5", body, 10, 1, 1, "")
99+
j2 := engine.NewJob("ns-engine", "q5", body, nil, 10, 1, 1, "")
100100
jobID2, err := e.Publish(j2)
101101
if err != nil {
102102
t.Fatalf("Failed to publish: %s", err)
@@ -122,7 +122,7 @@ func TestEngine_ConsumeMulti(t *testing.T) {
122122
func TestEngine_Peek(t *testing.T) {
123123
e := NewEngine(OldRedisEngine, NewRedisEngine)
124124
body := []byte("hello msg 6")
125-
j := engine.NewJob("ns-engine", "q6", body, 10, 0, 1, "")
125+
j := engine.NewJob("ns-engine", "q6", body, nil, 10, 0, 1, "")
126126
jobID, err := e.Publish(j)
127127
if err != nil {
128128
t.Fatalf("Failed to publish: %s", err)
@@ -136,7 +136,7 @@ func TestEngine_Peek(t *testing.T) {
136136
func TestEngine_DrainOld(t *testing.T) {
137137
e := NewEngine(OldRedisEngine, NewRedisEngine)
138138
body := []byte("hello msg 7")
139-
j := engine.NewJob("ns-engine", "q7", body, 10, 0, 1, "")
139+
j := engine.NewJob("ns-engine", "q7", body, nil, 10, 0, 1, "")
140140
jobID, err := OldRedisEngine.Publish(j)
141141
job, err := e.Consume("ns-engine", []string{"q7"}, 5, 0)
142142
if err != nil {
@@ -150,7 +150,7 @@ func TestEngine_DrainOld(t *testing.T) {
150150
func TestEngine_BatchConsume(t *testing.T) {
151151
e := NewEngine(OldRedisEngine, NewRedisEngine)
152152
body := []byte("hello msg 8")
153-
j := engine.NewJob("ns-engine", "q8", body, 10, 2, 1, "")
153+
j := engine.NewJob("ns-engine", "q8", body, nil, 10, 2, 1, "")
154154
jobID, err := e.Publish(j)
155155
if err != nil {
156156
t.Fatalf("Failed to publish: %s", err)
@@ -176,7 +176,7 @@ func TestEngine_BatchConsume(t *testing.T) {
176176
// Consume some jobs
177177
jobIDMap := map[string]bool{}
178178
for i := 0; i < 4; i++ {
179-
j := engine.NewJob("ns-engine", "q8", body, 10, 0, 1, "")
179+
j := engine.NewJob("ns-engine", "q8", body, nil, 10, 0, 1, "")
180180
jobID, err := e.Publish(j)
181181
t.Log(jobID)
182182
if err != nil {
@@ -223,7 +223,7 @@ func TestEngine_BatchConsume(t *testing.T) {
223223
func TestEngine_DeadLetter_Size(t *testing.T) {
224224
body := []byte("hello msg 9")
225225
queues := []string{"q9"}
226-
j := engine.NewJob("ns-engine", "q9", body, 10, 0, 1, "")
226+
j := engine.NewJob("ns-engine", "q9", body, nil, 10, 0, 1, "")
227227
jobID, err := OldRedisEngine.Publish(j)
228228
job, err := OldRedisEngine.Consume("ns-engine", queues, 0, 0)
229229
if err != nil {
@@ -232,7 +232,7 @@ func TestEngine_DeadLetter_Size(t *testing.T) {
232232
if job.ID() != jobID {
233233
t.Fatal("Mismatched job")
234234
}
235-
j = engine.NewJob("ns-engine", "q9", body, 10, 0, 1, "")
235+
j = engine.NewJob("ns-engine", "q9", body, nil, 10, 0, 1, "")
236236
jobID, err = NewRedisEngine.Publish(j)
237237
job, err = NewRedisEngine.Consume("ns-engine", queues, 0, 0)
238238
if job.ID() != jobID {
@@ -250,7 +250,7 @@ func TestEngine_PublishWithJobID(t *testing.T) {
250250
e := NewEngine(OldRedisEngine, NewRedisEngine)
251251
body := []byte("hello msg 1")
252252
// Publish no-delay job
253-
j := engine.NewJob("ns-engine", "q10", body, 10, 0, 1, "jobID1")
253+
j := engine.NewJob("ns-engine", "q10", body, nil, 10, 0, 1, "jobID1")
254254
jobID, err := e.Publish(j)
255255
t.Log(jobID)
256256
assert.Nil(t, err)

engine/redis/deadletter_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,9 @@ func TestDeadLetter_Delete(t *testing.T) {
5757

5858
func TestDeadLetter_Respawn(t *testing.T) {
5959
p := NewPool(R)
60-
job1 := engine.NewJob("ns-dead", "q3", []byte("1"), 60, 0, 1, "")
61-
job2 := engine.NewJob("ns-dead", "q3", []byte("2"), 60, 0, 1, "")
62-
job3 := engine.NewJob("ns-dead", "q3", []byte("3"), 60, 0, 1, "")
60+
job1 := engine.NewJob("ns-dead", "q3", []byte("1"), nil, 60, 0, 1, "")
61+
job2 := engine.NewJob("ns-dead", "q3", []byte("2"), nil, 60, 0, 1, "")
62+
job3 := engine.NewJob("ns-dead", "q3", []byte("3"), nil, 60, 0, 1, "")
6363
p.Add(job1)
6464
p.Add(job2)
6565
p.Add(job3)
@@ -120,7 +120,7 @@ func TestDeadLetter_Size(t *testing.T) {
120120
dl, _ := NewDeadLetter("ns-dead", "q3", R)
121121
cnt := 3
122122
for i := 0; i < cnt; i++ {
123-
job := engine.NewJob("ns-dead", "q3", []byte("1"), 60, 0, 1, "")
123+
job := engine.NewJob("ns-dead", "q3", []byte("1"), nil, 60, 0, 1, "")
124124
p.Add(job)
125125
dl.Add(job.ID())
126126
}

engine/redis/engine.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ func (e *Engine) consumeMulti(namespace string, queues []string, ttrSecond, time
156156
return nil, nil
157157
}
158158
endTime := time.Now().Unix()
159-
body, ttl, err := e.pool.Get(namespace, queueName.Queue, jobID)
159+
payload, ttl, err := e.pool.Get(namespace, queueName.Queue, jobID)
160160
switch err {
161161
case nil:
162162
// no-op
@@ -177,7 +177,7 @@ func (e *Engine) consumeMulti(namespace string, queues []string, ttrSecond, time
177177
default:
178178
return nil, fmt.Errorf("pool: %s", err)
179179
}
180-
job = engine.NewJobWithID(namespace, queueName.Queue, body, ttl, tries, jobID)
180+
job = engine.NewJobWithID(namespace, queueName.Queue, payload.Body, payload.Attributes, ttl, tries, jobID)
181181
metrics.jobElapsedMS.WithLabelValues(e.redis.Name, namespace, queueName.Queue).Observe(float64(job.ElapsedMS()))
182182
return job, nil
183183
}
@@ -207,19 +207,19 @@ func (e *Engine) Peek(namespace, queue, optionalJobID string) (job engine.Job, e
207207
return nil, fmt.Errorf("failed to peek queue: %s", err)
208208
}
209209
}
210-
body, ttl, err := e.pool.Get(namespace, queue, jobID)
210+
payload, ttl, err := e.pool.Get(namespace, queue, jobID)
211211
// Tricky: we shouldn't return the not found error when the job was not found,
212212
// since the job may expired(TTL was reached) and it would confuse the user, so
213213
// we return the nil job instead of the not found error here. But if the `optionalJobID`
214214
// was assigned we should return the not fond error.
215-
if optionalJobID == "" && err == engine.ErrNotFound {
215+
if optionalJobID == "" && errors.Is(err, engine.ErrNotFound) {
216216
// return jobID with nil body if the job is expired
217-
return engine.NewJobWithID(namespace, queue, nil, 0, 0, jobID), nil
217+
return engine.NewJobWithID(namespace, queue, nil, nil, 0, 0, jobID), nil
218218
}
219219
if err != nil {
220220
return nil, err
221221
}
222-
return engine.NewJobWithID(namespace, queue, body, ttl, tries, jobID), err
222+
return engine.NewJobWithID(namespace, queue, payload.Body, payload.Attributes, ttl, tries, jobID), err
223223
}
224224

225225
func (e *Engine) Size(namespace, queue string) (size int64, err error) {

engine/redis/engine_test.go

+12-12
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@ func TestEngine_Publish(t *testing.T) {
1717
}
1818
defer e.Shutdown()
1919
body := []byte("hello msg 1")
20-
j := engine.NewJob("ns-engine", "q0", body, 10, 2, 1, "")
20+
j := engine.NewJob("ns-engine", "q0", body, nil, 10, 2, 1, "")
2121
jobID, err := e.Publish(j)
2222
t.Log(jobID)
2323
if err != nil {
2424
t.Fatalf("Failed to publish: %s", err)
2525
}
2626

2727
// Publish no-delay job
28-
j = engine.NewJob("ns-engine", "q0", body, 10, 0, 1, "")
28+
j = engine.NewJob("ns-engine", "q0", body, nil, 10, 0, 1, "")
2929
jobID, err = e.Publish(j)
3030
t.Log(jobID)
3131
if err != nil {
@@ -40,7 +40,7 @@ func TestEngine_Consume(t *testing.T) {
4040
}
4141
defer e.Shutdown()
4242
body := []byte("hello msg 2")
43-
j := engine.NewJob("ns-engine", "q2", body, 10, 2, 1, "")
43+
j := engine.NewJob("ns-engine", "q2", body, nil, 10, 2, 1, "")
4444
jobID, err := e.Publish(j)
4545
t.Log(jobID)
4646
if err != nil {
@@ -58,7 +58,7 @@ func TestEngine_Consume(t *testing.T) {
5858
}
5959

6060
// Consume job that's published in no-delay way
61-
j = engine.NewJob("ns-engine", "q2", body, 10, 0, 1, "")
61+
j = engine.NewJob("ns-engine", "q2", body, nil, 10, 0, 1, "")
6262
jobID, err = e.Publish(j)
6363
t.Log(jobID)
6464
if err != nil {
@@ -81,9 +81,9 @@ func TestEngine_Consume2(t *testing.T) {
8181
}
8282
defer e.Shutdown()
8383
body := []byte("hello msg 3")
84-
j := engine.NewJob("ns-engine", "q3", []byte("delay msg"), 10, 5, 1, "")
84+
j := engine.NewJob("ns-engine", "q3", []byte("delay msg"), nil, 10, 5, 1, "")
8585
_, err = e.Publish(j)
86-
j = engine.NewJob("ns-engine", "q3", body, 10, 2, 1, "")
86+
j = engine.NewJob("ns-engine", "q3", body, nil, 10, 2, 1, "")
8787
jobID, err := e.Publish(j)
8888
if err != nil {
8989
t.Fatalf("Failed to publish: %s", err)
@@ -107,12 +107,12 @@ func TestEngine_ConsumeMulti(t *testing.T) {
107107
}
108108
defer e.Shutdown()
109109
body := []byte("hello msg 4")
110-
j := engine.NewJob("ns-engine", "q4", body, 10, 3, 1, "")
110+
j := engine.NewJob("ns-engine", "q4", body, nil, 10, 3, 1, "")
111111
jobID, err := e.Publish(j)
112112
if err != nil {
113113
t.Fatalf("Failed to publish: %s", err)
114114
}
115-
j2 := engine.NewJob("ns-engine", "q5", body, 10, 1, 1, "")
115+
j2 := engine.NewJob("ns-engine", "q5", body, nil, 10, 1, 1, "")
116116
jobID2, err := e.Publish(j2)
117117
if err != nil {
118118
t.Fatalf("Failed to publish: %s", err)
@@ -148,7 +148,7 @@ func TestEngine_Peek(t *testing.T) {
148148
}
149149
defer e.Shutdown()
150150
body := []byte("hello msg 6")
151-
j := engine.NewJob("ns-engine", "q6", body, 10, 0, 1, "")
151+
j := engine.NewJob("ns-engine", "q6", body, nil, 10, 0, 1, "")
152152
jobID, err := e.Publish(j)
153153
if err != nil {
154154
t.Fatalf("Failed to publish: %s", err)
@@ -173,7 +173,7 @@ func TestEngine_BatchConsume(t *testing.T) {
173173
}
174174
defer e.Shutdown()
175175
body := []byte("hello msg 7")
176-
j := engine.NewJob("ns-engine", "q7", body, 10, 3, 1, "")
176+
j := engine.NewJob("ns-engine", "q7", body, nil, 10, 3, 1, "")
177177
jobID, err := e.Publish(j)
178178
t.Log(jobID)
179179
if err != nil {
@@ -199,7 +199,7 @@ func TestEngine_BatchConsume(t *testing.T) {
199199
// Consume some jobs
200200
jobIDMap := map[string]bool{}
201201
for i := 0; i < 4; i++ {
202-
j := engine.NewJob("ns-engine", "q7", body, 10, 0, 1, "")
202+
j := engine.NewJob("ns-engine", "q7", body, nil, 10, 0, 1, "")
203203
jobID, err := e.Publish(j)
204204
t.Log(jobID)
205205
if err != nil {
@@ -251,7 +251,7 @@ func TestEngine_PublishWithJobID(t *testing.T) {
251251
}
252252
defer e.Shutdown()
253253
body := []byte("hello msg 1")
254-
j := engine.NewJob("ns-engine", "q8", body, 10, 0, 1, "jobID1")
254+
j := engine.NewJob("ns-engine", "q8", body, nil, 10, 0, 1, "jobID1")
255255
jobID, err := e.Publish(j)
256256
t.Log(jobID)
257257
assert.Nil(t, err)

0 commit comments

Comments
 (0)