Skip to content

Commit bc3b3a1

Browse files
authored
Add support of parsing job body format according to the job id version (#225)
Currently, we store the job payload in Redis without any encoding, so it's possible to extend more fields for a job like attributes, etc. To mitigate this issue, we introduce the version prefix for the job id to identify different job payload formats. Use the length to tell if it's a legacy id or not since the ulid's id is a fixed-length string(26 chars). That said, we will return the value as the job body directly if the length is 26. Otherwise, decode it in JSON format. To avoid introducing breaking changes during the upgrade stage, we add a new HTTP header: Enable-Job-Version to enable this feature. The new job format would be enabled only if the header `Enable-Job-Version: yes` was explicitly passed in the publish request. So that we can smoothly support the new job payload format by upgrading the server first, and enabling it on the client side since then.
1 parent dd55a0a commit bc3b3a1

File tree

8 files changed

+184
-128
lines changed

8 files changed

+184
-128
lines changed

engine/job.go

+1-69
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@ package engine
22

33
import (
44
"encoding"
5-
"encoding/binary"
65
"encoding/json"
7-
"errors"
86

97
"github.com/bitleak/lmstfy/uuid"
108
)
@@ -20,8 +18,6 @@ type Job interface {
2018
ElapsedMS() int64
2119
Attributes() map[string]string
2220

23-
encoding.BinaryMarshaler
24-
encoding.BinaryUnmarshaler
2521
encoding.TextMarshaler
2622
}
2723

@@ -43,7 +39,7 @@ type jobImpl struct {
4339
// a tombstone record in that AOF.
4440
func NewJob(namespace, queue string, body []byte, ttl, delay uint32, tries uint16, jobID string) Job {
4541
if jobID == "" {
46-
jobID = uuid.GenUniqueJobIDWithDelay(delay)
42+
jobID = uuid.GenJobIDWithVersion(0, delay)
4743
}
4844
return &jobImpl{
4945
namespace: namespace,
@@ -110,70 +106,6 @@ func (j *jobImpl) Attributes() map[string]string {
110106
return j.attributes
111107
}
112108

113-
// Marshal into binary of the format:
114-
// {total len: 4 bytes}{ns len: 1 byte}{ns}{queue len: 1 byte}{queue}{id: 16 bytes}{ttl: 4 bytes}{tries: 2 byte}{job data}
115-
func (j *jobImpl) MarshalBinary() (data []byte, err error) {
116-
nsLen := len(j.namespace)
117-
qLen := len(j.queue)
118-
bodyLen := len(j.body)
119-
totalSize := 1 + nsLen + 1 + qLen + 16 + 4 + 2 + bodyLen
120-
buf := make([]byte, totalSize+4)
121-
binary.LittleEndian.PutUint32(buf, uint32(totalSize))
122-
123-
nsOffset := 4 + 1
124-
qOffset := nsOffset + nsLen + 1
125-
idOffset := qOffset + qLen
126-
ttlOffset := idOffset + 16
127-
triesOffset := ttlOffset + 4
128-
jobOffset := triesOffset + 2
129-
130-
buf[4] = uint8(nsLen)
131-
copy(buf[nsOffset:], j.namespace)
132-
buf[qOffset-1] = uint8(qLen)
133-
copy(buf[qOffset:], j.queue)
134-
binID := uuid.UniqueIDToBinary(j.id)
135-
copy(buf[idOffset:], binID[:]) // binary ID is 16 byte-long
136-
binary.LittleEndian.PutUint32(buf[ttlOffset:], j.ttl)
137-
binary.LittleEndian.PutUint16(buf[triesOffset:], j.tries)
138-
copy(buf[jobOffset:], j.body)
139-
return buf, nil
140-
}
141-
142-
func (j *jobImpl) UnmarshalBinary(data []byte) error {
143-
if len(data) <= 4 {
144-
return errors.New("data too small")
145-
}
146-
totalSize := binary.LittleEndian.Uint32(data[0:])
147-
if len(data) != int(totalSize)+4 {
148-
return errors.New("corrupted data")
149-
}
150-
151-
nsLen := int(data[4])
152-
nsOffset := 4 + 1
153-
j.namespace = string(data[nsOffset : nsOffset+nsLen])
154-
qOffset := nsOffset + nsLen + 1
155-
qLen := int(data[qOffset-1])
156-
j.queue = string(data[qOffset : qOffset+qLen])
157-
idOffset := qOffset + qLen
158-
var binaryID [16]byte
159-
copy(binaryID[:], data[idOffset:idOffset+16])
160-
j.id = uuid.BinaryToUniqueID(binaryID)
161-
ttlOffset := idOffset + 16
162-
j.ttl = binary.LittleEndian.Uint32(data[ttlOffset:])
163-
triesOffset := ttlOffset + 4
164-
j.tries = binary.LittleEndian.Uint16(data[triesOffset:])
165-
jobOffset := triesOffset + 2
166-
j.body = make([]byte, len(data)-jobOffset)
167-
copy(j.body, data[jobOffset:])
168-
169-
delay, err := uuid.ExtractDelaySecondFromUniqueID(j.id)
170-
if err != nil {
171-
return err
172-
}
173-
j.delay = delay
174-
return nil
175-
}
176-
177109
func (j *jobImpl) MarshalText() (text []byte, err error) {
178110
var job struct {
179111
Namespace string `json:"namespace"`

engine/job_test.go

-27
This file was deleted.

engine/redis/pool.go

+46-19
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,20 @@
11
package redis
22

33
import (
4+
"encoding/json"
45
"errors"
56
"time"
67

78
go_redis "github.com/go-redis/redis/v8"
89

910
"github.com/bitleak/lmstfy/engine"
11+
"github.com/bitleak/lmstfy/uuid"
1012
)
1113

14+
type JobPayload struct {
15+
Body []byte `json:"body"`
16+
}
17+
1218
// Pool stores all the jobs' data. this is a global singleton per engine
1319
// note: this `Pool` is NOT the same terminology as the EnginePool
1420
type Pool struct {
@@ -33,14 +39,24 @@ func PoolJobKeyPrefix(namespace, queue string) string {
3339
return join(PoolPrefix, namespace, queue)
3440
}
3541

36-
func (p *Pool) Add(j engine.Job) error {
37-
body := j.Body()
42+
func (p *Pool) Add(j engine.Job) (err error) {
3843
metrics.poolAddJobs.WithLabelValues(p.redis.Name).Inc()
44+
45+
// For the version 0(legacy) jobID, the payload is the body directly,
46+
// for the version 1 jobID, the payload is a JSON string contains the body.
47+
payload := j.Body()
48+
if uuid.ExtractJobIDVersion(j.ID()) != 0 {
49+
payload, err = json.Marshal(JobPayload{Body: j.Body()})
50+
if err != nil {
51+
return err
52+
}
53+
}
54+
3955
// SetNX return OK(true) if key didn't exist before.
40-
ok, err := p.redis.Conn.SetNX(dummyCtx, PoolJobKey(j), body, time.Duration(j.TTL())*time.Second).Result()
56+
ok, err := p.redis.Conn.SetNX(dummyCtx, PoolJobKey(j), payload, time.Duration(j.TTL())*time.Second).Result()
4157
if err != nil {
4258
// Just retry once.
43-
ok, err = p.redis.Conn.SetNX(dummyCtx, PoolJobKey(j), body, time.Duration(j.TTL())*time.Second).Result()
59+
ok, err = p.redis.Conn.SetNX(dummyCtx, PoolJobKey(j), payload, time.Duration(j.TTL())*time.Second).Result()
4460
}
4561
if err != nil {
4662
return err
@@ -57,24 +73,35 @@ func (p *Pool) Get(namespace, queue, jobID string) (body []byte, ttlSecond uint3
5773
getCmd := pipeline.Get(dummyCtx, jobKey)
5874
ttlCmd := pipeline.TTL(dummyCtx, jobKey)
5975
_, err = pipeline.Exec(dummyCtx)
60-
switch err {
61-
case nil:
62-
val := getCmd.Val()
63-
ttl := int64(ttlCmd.Val().Seconds())
64-
if ttl < 0 {
65-
// Use `0` to identify indefinite TTL, NOTE: in redis ttl=0 is possible when
66-
// the key is not recycled fast enough. but here is okay we use `0` to identify
67-
// indefinite TTL, because we issue GET cmd before TTL cmd, so the ttl must be > 0,
68-
// OR GET cmd would fail.
69-
ttl = 0
76+
if err != nil {
77+
if errors.Is(err, go_redis.Nil) {
78+
return nil, 0, engine.ErrNotFound
7079
}
71-
metrics.poolGetJobs.WithLabelValues(p.redis.Name).Inc()
72-
return []byte(val), uint32(ttl), nil
73-
case go_redis.Nil:
74-
return nil, 0, engine.ErrNotFound
75-
default:
7680
return nil, 0, err
7781
}
82+
83+
val := []byte(getCmd.Val())
84+
ttl := int64(ttlCmd.Val().Seconds())
85+
if ttl < 0 {
86+
// Use `0` to identify indefinite TTL, NOTE: in redis ttl=0 is possible when
87+
// the key is not recycled fast enough. but here is okay we use `0` to identify
88+
// indefinite TTL, because we issue GET cmd before TTL cmd, so the ttl must be > 0,
89+
// OR GET cmd would fail.
90+
ttl = 0
91+
}
92+
metrics.poolGetJobs.WithLabelValues(p.redis.Name).Inc()
93+
if uuid.ExtractJobIDVersion(jobID) == 0 {
94+
// For the version 0(legacy) jobID, the val only contains the body,
95+
// so we need to return the val as body directly.
96+
return val, uint32(ttl), nil
97+
}
98+
// For the version 1 jobID, the value is encoded as a JSON string,
99+
// need to unmarshal it before return.
100+
var payload JobPayload
101+
if err := json.Unmarshal(val, &payload); err != nil {
102+
return nil, 0, err
103+
}
104+
return payload.Body, uint32(ttl), nil
78105
}
79106

80107
func (p *Pool) Delete(namespace, queue, jobID string) error {

engine/redis/pool_test.go

+19
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ import (
66
"time"
77

88
go_redis "github.com/go-redis/redis/v8"
9+
"github.com/stretchr/testify/require"
910

1011
"github.com/bitleak/lmstfy/engine"
12+
"github.com/bitleak/lmstfy/uuid"
1113
)
1214

1315
func TestPool_Add(t *testing.T) {
@@ -55,3 +57,20 @@ func TestPool_Get(t *testing.T) {
5557
t.Fatalf("Expected TTL is around 50 seconds")
5658
}
5759
}
60+
61+
func TestPool_GetCompatibility(t *testing.T) {
62+
p := NewPool(R)
63+
64+
t.Run("test job with different versions should get correct body", func(t *testing.T) {
65+
for i := 0; i <= uuid.JobIDV1; i++ {
66+
jobID := uuid.GenJobIDWithVersion(i, 123)
67+
job := engine.NewJob("ns-pool", "q5", []byte("hello msg 5"), 50, 0, 1, jobID)
68+
p.Add(job)
69+
body, ttl, err := p.Get(job.Namespace(), job.Queue(), job.ID())
70+
require.NoError(t, err)
71+
require.Equal(t, []byte("hello msg 5"), body)
72+
require.InDelta(t, 50, ttl, 5)
73+
require.Equal(t, i, uuid.ExtractJobIDVersion(job.ID()))
74+
}
75+
})
76+
}

server/handlers/queue.go

+21-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/sirupsen/logrus"
1111

1212
"github.com/bitleak/lmstfy/engine"
13+
"github.com/bitleak/lmstfy/uuid"
1314
)
1415

1516
const (
@@ -31,6 +32,8 @@ func Publish(c *gin.Context) {
3132
queue := c.Param("queue")
3233
jobID := c.Param("job_id")
3334

35+
enabledJobVersion := strings.ToUpper(c.GetHeader("Enable-Job-Version")) == "YES"
36+
3437
if jobID != "" {
3538
// delete job whatever other publish parameters
3639
if err := e.Delete(namespace, queue, jobID); err != nil {
@@ -85,7 +88,14 @@ func Publish(c *gin.Context) {
8588
c.JSON(http.StatusRequestEntityTooLarge, gin.H{"error": "body too large"})
8689
return
8790
}
88-
job := engine.NewJob(namespace, queue, body, uint32(ttlSecond), uint32(delaySecond), uint16(tries), "")
91+
92+
if enabledJobVersion {
93+
jobID = uuid.GenJobIDWithVersion(uuid.JobIDV1, uint32(delaySecond))
94+
} else {
95+
// use the legacy jobID if the version is not enabled
96+
jobID = uuid.GenJobIDWithVersion(0, uint32(delaySecond))
97+
}
98+
job := engine.NewJob(namespace, queue, body, uint32(ttlSecond), uint32(delaySecond), uint16(tries), jobID)
8999
jobID, err = e.Publish(job)
90100
if err != nil {
91101
logger.WithFields(logrus.Fields{
@@ -122,6 +132,8 @@ func PublishBulk(c *gin.Context) {
122132
namespace := c.Param("namespace")
123133
queue := c.Param("queue")
124134

135+
enabledJobVersion := strings.ToUpper(c.GetHeader("Enable-Job-Version")) == "YES"
136+
125137
delaySecondStr := c.DefaultQuery("delay", DefaultDelay)
126138
delaySecond, err := strconv.ParseUint(delaySecondStr, 10, 32)
127139
if err != nil {
@@ -180,7 +192,14 @@ func PublishBulk(c *gin.Context) {
180192

181193
jobIDs := make([]string, 0)
182194
for _, job := range jobs {
183-
j := engine.NewJob(namespace, queue, job, uint32(ttlSecond), uint32(delaySecond), uint16(tries), "")
195+
var jobID string
196+
if enabledJobVersion {
197+
jobID = uuid.GenJobIDWithVersion(uuid.JobIDV1, uint32(delaySecond))
198+
} else {
199+
// use the legacy jobID if the version is not enabled
200+
jobID = uuid.GenJobIDWithVersion(0, uint32(delaySecond))
201+
}
202+
j := engine.NewJob(namespace, queue, job, uint32(ttlSecond), uint32(delaySecond), uint16(tries), jobID)
184203
jobID, err := e.Publish(j)
185204
if err != nil {
186205
logger.WithFields(logrus.Fields{

server/handlers/queue_test.go

+37
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@ import (
1212
"time"
1313

1414
"github.com/magiconair/properties/assert"
15+
"github.com/stretchr/testify/require"
1516

1617
"github.com/bitleak/lmstfy/engine"
1718
"github.com/bitleak/lmstfy/server/handlers"
19+
"github.com/bitleak/lmstfy/uuid"
1820
)
1921

2022
func TestPublish(t *testing.T) {
@@ -543,6 +545,41 @@ func TestPublishBulk(t *testing.T) {
543545
}
544546
}
545547

548+
func TestPublish_WithJobVersion(t *testing.T) {
549+
for _, enable := range []string{"YES", "NO"} {
550+
query := url.Values{}
551+
query.Add("delay", "0")
552+
query.Add("ttl", "10")
553+
query.Add("tries", "1")
554+
targetUrl := fmt.Sprintf("http://localhost/api/ns/q18?%s", query.Encode())
555+
body := strings.NewReader("hello job version")
556+
req, err := http.NewRequest("PUT", targetUrl, body)
557+
req.Header.Add("Enable-Job-Version", enable)
558+
require.NoError(t, err, "Failed to create request")
559+
560+
c, e, resp := ginTest(req)
561+
e.Use(handlers.ValidateParams, handlers.SetupQueueEngine)
562+
e.PUT("/api/:namespace/:queue", handlers.Publish)
563+
e.HandleContext(c)
564+
565+
require.Equal(t, http.StatusCreated, resp.Code, "Failed to publish")
566+
var payload struct {
567+
JobID string `json:"job_id"`
568+
}
569+
require.NoError(t, json.Unmarshal(resp.Body.Bytes(), &payload))
570+
expectedVersion := 0
571+
if enable == "YES" {
572+
expectedVersion = uuid.JobIDV1
573+
}
574+
require.Equal(t, expectedVersion, uuid.ExtractJobIDVersion(payload.JobID))
575+
576+
// Consume should also return the correct version and job body
577+
bytes, jobID := consumeTestJob("ns", "q18", 10, 3)
578+
require.Equal(t, expectedVersion, uuid.ExtractJobIDVersion(jobID))
579+
require.Equal(t, "hello job version", string(bytes))
580+
}
581+
}
582+
546583
func publishTestJob(ns, q string, delay, ttl uint32) (body []byte, jobID string) {
547584
e := engine.GetEngine("")
548585
body = make([]byte, 10)

0 commit comments

Comments
 (0)