Skip to content

Commit b419052

Browse files
committed
特性:支持 commit 读取进度及 close 优化
1 parent 66bca74 commit b419052

5 files changed

Lines changed: 145 additions & 76 deletions

File tree

README.md

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,34 +14,55 @@ Disk-based FIFO queue
1414
## Getting Started
1515

1616
```
17-
go get -u github.com/4x99/diskqueue
17+
go get -u github.com/yoonper/diskqueue
1818
```
1919

2020
```
2121
package main
2222
2323
import (
2424
"fmt"
25-
"github.com/4x99/diskqueue"
25+
"github.com/yoonper/diskqueue"
2626
"log"
27+
"time"
2728
)
2829
2930
func main() {
31+
var err error
32+
var queue *diskqueue.Diskqueue
33+
34+
// config
35+
diskqueue.Config.Path = "/tmp/diskqueue"
36+
diskqueue.Config.BatchSize = 1
37+
3038
// start
31-
diskqueue.Config.Path = "/tmp"
32-
queue, err := diskqueue.Start()
33-
if err != nil {
39+
if queue, err = diskqueue.Start(); err != nil {
3440
log.Fatalln(err)
3541
}
3642
37-
// write data
38-
err = queue.Write([]byte("data"))
39-
fmt.Println(err)
43+
// write
44+
go func() {
45+
for {
46+
time.Sleep(time.Second)
47+
data := []byte(time.Now().Format("2006-01-02 15:04:05"))
48+
if err := queue.Write(data); err != nil {
49+
fmt.Println(err)
50+
}
51+
}
52+
}()
4053
41-
// read data
42-
if data, err := queue.Read(); err != nil {
43-
fmt.Println(data)
44-
}
54+
// read
55+
go func() {
56+
for {
57+
time.Sleep(time.Second)
58+
if index, offset, data, err := queue.Read(); err == nil {
59+
fmt.Println(index, offset, string(data))
60+
queue.Commit(index, offset) // commit
61+
}
62+
}
63+
}()
64+
65+
select {}
4566
}
4667
```
4768

config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,6 @@ type config struct {
1212
BatchTime time.Duration // interval per sync
1313
SegmentSize int64 // size of each segment (in bytes)
1414
SegmentLimit int64 // max number of segment
15-
CheckpointFile string // record read offset
15+
CheckpointFile string // read index and offset
1616
MinRequiredSpace int64 // minimum required free space (in bytes)
1717
}

diskqueue.go

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package diskqueue
22

33
import (
4+
"context"
45
"errors"
56
"io"
67
"os"
@@ -10,9 +11,11 @@ import (
1011

1112
type Diskqueue struct {
1213
sync.RWMutex
13-
close bool
14-
closeChan chan bool
15-
ticker *time.Ticker
14+
close bool
15+
ticker *time.Ticker
16+
wg *sync.WaitGroup
17+
ctx context.Context
18+
cancel context.CancelFunc
1619
}
1720

1821
var (
@@ -36,10 +39,10 @@ func Start() (*Diskqueue, error) {
3639
if _, err := os.Stat(Config.Path); err != nil {
3740
return nil, err
3841
}
39-
40-
queue := &Diskqueue{close: false, closeChan: make(chan bool)}
42+
queue := &Diskqueue{close: false, wg: &sync.WaitGroup{}}
4143
queue.ticker = time.NewTicker(Config.BatchTime)
42-
Reader.restore()
44+
queue.ctx, queue.cancel = context.WithCancel(context.TODO())
45+
_ = Reader.restore()
4346
go queue.sync()
4447
return queue, nil
4548
}
@@ -57,19 +60,30 @@ func (queue *Diskqueue) Write(data []byte) error {
5760
}
5861

5962
// Read data
60-
func (queue *Diskqueue) Read() ([]byte, error) {
63+
func (queue *Diskqueue) Read() (int64, int64, []byte, error) {
6164
if queue.close {
62-
return nil, errors.New("closed")
65+
return 0, 0, nil, errors.New("closed")
6366
}
6467

6568
queue.RLock()
6669
defer queue.RUnlock()
6770

68-
data, err := Reader.read()
71+
index, offset, data, err := Reader.read()
6972
if err == io.EOF && (Writer.file == nil || Reader.file.Name() != Writer.file.Name()) {
7073
_ = Reader.rotate()
7174
}
72-
return data, err
75+
return index, offset, data, err
76+
}
77+
78+
// Commit index and offset
79+
func (queue *Diskqueue) Commit(index int64, offset int64) {
80+
if queue.close {
81+
return
82+
}
83+
84+
ck := &Reader.checkpoint
85+
ck.Index, ck.Offset = index, offset
86+
Reader.sync()
7387
}
7488

7589
// Close diskqueue
@@ -79,21 +93,23 @@ func (queue *Diskqueue) Close() {
7993
}
8094

8195
queue.close = true
82-
queue.closeChan <- true
96+
queue.cancel()
97+
queue.wg.Wait()
8398
Writer.close()
8499
Reader.close()
85100
}
86101

87102
// sync data
88103
func (queue *Diskqueue) sync() {
104+
queue.wg.Add(1)
105+
defer queue.wg.Done()
89106
for {
90107
select {
91108
case <-queue.ticker.C:
92109
queue.Lock()
93110
Writer.sync()
94-
Reader.sync()
95111
queue.Unlock()
96-
case <-queue.closeChan:
112+
case <-queue.ctx.Done():
97113
return
98114
}
99115
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
module github.com/4x99/diskqueue
22

3-
go 1.17
3+
go 1.18

reader.go

Lines changed: 81 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package diskqueue
22

33
import (
44
"bufio"
5+
"encoding/json"
56
"errors"
7+
"fmt"
68
"io/ioutil"
79
"os"
810
"path"
@@ -12,51 +14,63 @@ import (
1214
)
1315

1416
type reader struct {
15-
file *os.File
16-
offset int64
17-
reader *bufio.Reader
17+
file *os.File
18+
index int64
19+
offset int64
20+
reader *bufio.Reader
21+
checkpoint checkpoint
22+
}
23+
24+
type checkpoint struct {
25+
Index int64 `json:"index"`
26+
Offset int64 `json:"offset"`
1827
}
1928

2029
// read data
21-
func (r *reader) read() ([]byte, error) {
22-
// open a new segment
23-
if err := r.open(); err != nil {
24-
return nil, err
30+
func (r *reader) read() (int64, int64, []byte, error) {
31+
if err := r.check(); err != nil {
32+
return r.index, r.offset, nil, err
2533
}
2634

2735
// read a line
2836
data, _, err := r.reader.ReadLine()
2937
if err != nil {
30-
return nil, err
38+
return r.index, r.offset, nil, err
3139
}
3240

3341
r.offset += int64(len(data)) + 1
34-
return data, err
42+
return r.index, r.offset, data, err
3543
}
3644

37-
// open a new segment
38-
func (r *reader) open() error {
45+
// check a new segment
46+
func (r *reader) check() error {
3947
if r.file != nil {
4048
return nil
4149
}
4250

43-
files, err := r.list()
51+
file, err := r.next()
4452
if err != nil {
4553
return err
4654
}
4755

48-
// open the earliest segment
49-
if r.file, err = os.OpenFile(files[0], os.O_RDONLY, Config.FilePerm); err != nil {
56+
return r.open(file)
57+
}
58+
59+
func (r *reader) open(file string) (err error) {
60+
if r.file, err = os.OpenFile(file, os.O_RDONLY, Config.FilePerm); err != nil {
5061
return err
5162
}
5263

64+
// get file index
65+
r.index = r.getIndex(file)
66+
5367
// seek read offset
5468
if _, err = r.file.Seek(r.offset, 0); err != nil {
5569
return err
5670
}
5771

5872
r.reader = bufio.NewReader(r.file)
59-
return err
73+
return nil
6074
}
6175

6276
// rotate to next segment
@@ -66,58 +80,76 @@ func (r *reader) rotate() error {
6680
}
6781

6882
// close segment
69-
r.file.Close()
70-
71-
// remove segment
72-
if err := os.Remove(r.file.Name()); err != nil {
73-
return err
74-
}
75-
83+
_ = r.file.Close()
7684
r.file, r.offset, r.reader = nil, 0, nil
77-
r.sync()
78-
7985
return nil
8086
}
8187

82-
// list all segments
83-
func (r *reader) list() ([]string, error) {
84-
files, err := filepath.Glob(filepath.Join(Config.Path, "*.data"))
85-
if err != nil {
86-
return nil, err
88+
// close reader
89+
func (r *reader) close() {
90+
if r.file == nil {
91+
return
8792
}
8893

89-
if len(files) == 0 {
90-
return nil, errors.New("empty")
94+
if err := r.file.Close(); err != nil {
95+
return
9196
}
9297

93-
sort.Strings(files)
94-
return files, err
98+
r.file, r.reader, r.index, r.offset = nil, nil, 0, 0
9599
}
96100

97-
// sync read offset
101+
// sync index and offset
98102
func (r *reader) sync() {
99103
name := path.Join(Config.Path, Config.CheckpointFile)
100-
offset := []byte(strconv.FormatInt(r.offset, 10))
101-
_ = ioutil.WriteFile(name, offset, Config.FilePerm)
104+
data, _ := json.Marshal(&r.checkpoint)
105+
_ = ioutil.WriteFile(name, data, Config.FilePerm)
102106
}
103107

104-
// close reader
105-
func (r *reader) close() {
106-
if r.file == nil {
107-
return
108+
// restore index and offset
109+
func (r *reader) restore() (err error) {
110+
name := path.Join(Config.Path, Config.CheckpointFile)
111+
data, _ := ioutil.ReadFile(name)
112+
_ = json.Unmarshal(data, &r.checkpoint)
113+
r.index, r.offset = r.checkpoint.Index, r.checkpoint.Offset
114+
115+
if r.index == 0 {
116+
return nil
108117
}
109118

110-
r.sync()
111-
if err := r.file.Close(); err != nil {
112-
return
119+
if err = r.open(fmt.Sprintf("%s/%d.data", Config.Path, r.index)); err != nil {
120+
r.offset = 0
113121
}
114122

115-
r.file, r.offset, r.reader = nil, 0, nil
123+
return err
116124
}
117125

118-
// restore read offset
119-
func (r *reader) restore() {
120-
name := path.Join(Config.Path, Config.CheckpointFile)
121-
offset, _ := ioutil.ReadFile(name)
122-
r.offset, _ = strconv.ParseInt(string(offset), 10, 64)
126+
// next segment
127+
func (r *reader) next() (string, error) {
128+
files, err := filepath.Glob(filepath.Join(Config.Path, "*.data"))
129+
if err != nil {
130+
return "", err
131+
}
132+
133+
sort.Strings(files)
134+
135+
for _, file := range files {
136+
index := r.getIndex(file)
137+
if index < r.checkpoint.Index {
138+
_ = os.Remove(file) // remove expired segment
139+
}
140+
141+
if index > r.index {
142+
return file, nil
143+
}
144+
}
145+
146+
return "", errors.New("queue is empty")
147+
}
148+
149+
// get segment index
150+
func (r *reader) getIndex(filename string) int64 {
151+
base := path.Base(filename)
152+
name := base[0 : len(base)-len(path.Ext(filename))]
153+
index, _ := strconv.ParseInt(name, 10, 64)
154+
return index
123155
}

0 commit comments

Comments
 (0)