Skip to content

Commit ffab365

Browse files
committed
feature: support quick merge
1 parent ef9a11c commit ffab365

File tree

7 files changed

+234
-22
lines changed

7 files changed

+234
-22
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ Feature
1212
- [x] Min Heap for efficient sorting
1313
- [x] Support gzip format
1414
- [x] Support processing each line
15+
- [x] Quick merge
1516

1617

1718
Usage

logmerge.go

Lines changed: 187 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@ import (
2323
"bufio"
2424
"compress/gzip"
2525
"container/heap"
26+
"context"
2627
"errors"
2728
"io"
2829
"os"
30+
"sync"
2931
)
3032

3133
// Action defined the read log behaviour.
@@ -43,17 +45,22 @@ const (
4345
var (
4446
// NEED_TIMEHANDLER returned when the getTime function is nil.
4547
NEED_TIMEHANDLER = errors.New("need time handler")
48+
// NEED_ERRCHAN returned when using quick merge without err channel.
49+
NEED_ERRCHAN = errors.New("need error channel")
4650
)
4751

4852
/*
4953
TimeHandler defined handlers for getting timestamp from each line.
5054
*/
5155
type TimeHandler = func([]byte) (int64, Action, error)
5256

57+
/*
58+
FilterHandler defined handlers for modifying each line.
59+
*/
60+
5361
type FilterHandler = func([]byte) ([]byte, Action, error)
5462

5563
type fileReader struct {
56-
filename string
5764
scanner *bufio.Scanner
5865
timestamp int64
5966
line []byte
@@ -66,15 +73,26 @@ type fileReader struct {
6673
// Option defined some option can set for merging.
type Option struct {
	SrcPath   []string        // Merge src File Path
	DstPath   string          // The filePath merge to
	SrcReader []io.Reader     // Src files' io.Reader
	DstWriter io.Writer       // Destination file's io.Writer
	SrcGzip   bool            // Whether src file is in gzip format
	DstGzip   bool            // Merge file in gzip format
	DeleteSrc bool            // Delete src file
	GetTime   TimeHandler     // The function to getTime from each line
	Filter    FilterHandler   // The function to process each line
	Goroutine int             // Quick merge's worker number
	ErrChan   chan error      // Quick merge's error return
	CTX       context.Context // Quick merge's context
}
89+
90+
// quickMergeJob bundles everything one quick-merge worker needs to
// stream a single source file: its scanner, the shared output channel,
// the optional per-line filter, the shared error channel, and the
// cancellation context.
type quickMergeJob struct {
	scanner *bufio.Scanner
	writer  chan *[]byte  // merged output; consumed by QuickMerge's writer loop
	filter  FilterHandler // optional per-line transform; may be nil
	errChan chan error    // read/filter errors are reported here
	ctx     context.Context
}
7997

8098
type fileHeap struct {
@@ -84,7 +102,9 @@ type fileHeap struct {
84102

85103
func (fh fileHeap) Len() int { return len(fh.readers) }
86104

87-
func (fh fileHeap) Less(i, j int) bool { return fh.readers[i].timestamp < fh.readers[j].timestamp }
105+
func (fh fileHeap) Less(i, j int) bool {
106+
return fh.readers[i].timestamp < fh.readers[j].timestamp
107+
}
88108

89109
func (fh fileHeap) Swap(i, j int) {
90110
fh.readers[i], fh.readers[j] = fh.readers[j], fh.readers[i]
@@ -170,17 +190,6 @@ func (fh *fileHeap) merge() error {
170190
return nil
171191
}
172192

173-
// Merge files to output file, and use getTime function to get timestamp.
174-
func Merge(srcPath []string, dstPath string, getTime TimeHandler) error {
175-
option := Option{
176-
SrcPath: srcPath,
177-
DstPath: dstPath,
178-
GetTime: getTime,
179-
}
180-
181-
return MergeByOption(option)
182-
}
183-
184193
func merge(readers []*bufio.Scanner, writer *bufio.Writer, getTime TimeHandler, filter FilterHandler) error {
185194
fHeap := new(fileHeap)
186195

@@ -208,6 +217,55 @@ func merge(readers []*bufio.Scanner, writer *bufio.Writer, getTime TimeHandler,
208217
return fHeap.merge()
209218
}
210219

220+
func quickMerge(job *quickMergeJob) {
221+
scanner := job.scanner
222+
writer := job.writer
223+
filter := job.filter
224+
errChan := job.errChan
225+
226+
for {
227+
select {
228+
case <-job.ctx.Done():
229+
return
230+
default:
231+
if ok := scanner.Scan(); !ok {
232+
if err := scanner.Err(); err != nil {
233+
errChan <- err
234+
}
235+
236+
// EOF
237+
return
238+
}
239+
240+
line := scanner.Bytes()
241+
if filter != nil {
242+
newline, action, err := job.filter(line)
243+
if action == SKIP {
244+
continue
245+
} else if action == STOP {
246+
errChan <- err
247+
return
248+
}
249+
250+
line = newline
251+
}
252+
253+
writer <- &line
254+
}
255+
}
256+
}
257+
258+
// Merge files to output file, and use getTime function to get timestamp.
259+
func Merge(srcPath []string, dstPath string, getTime TimeHandler) error {
260+
option := Option{
261+
SrcPath: srcPath,
262+
DstPath: dstPath,
263+
GetTime: getTime,
264+
}
265+
266+
return MergeByOption(option)
267+
}
268+
211269
// Use option to control merge behaviour.
212270
func MergeByOption(option Option) error {
213271
if option.GetTime == nil {
@@ -286,3 +344,110 @@ func MergeByOption(option Option) error {
286344

287345
return nil
288346
}
347+
348+
// Quick merge used for without sorting
349+
func QuickMerge(option Option) error {
350+
var wg sync.WaitGroup
351+
jobChan := make(chan *quickMergeJob, len(option.SrcPath))
352+
writerChan := make(chan *[]byte, len(option.SrcPath)*100)
353+
354+
if option.ErrChan == nil {
355+
return NEED_ERRCHAN
356+
}
357+
358+
if option.CTX == nil {
359+
option.CTX = context.Background()
360+
}
361+
362+
finishedCount := 0
363+
var mutex sync.Mutex
364+
for i := 0; i < option.Goroutine; i++ {
365+
wg.Add(1)
366+
go func() {
367+
for job := range jobChan {
368+
quickMerge(job)
369+
}
370+
wg.Done()
371+
372+
mutex.Lock()
373+
finishedCount++
374+
if finishedCount == option.Goroutine {
375+
close(writerChan)
376+
}
377+
mutex.Unlock()
378+
}()
379+
}
380+
381+
for _, fp := range option.SrcPath {
382+
fd, err := os.Open(fp)
383+
if err != nil {
384+
option.ErrChan <- err
385+
}
386+
387+
defer fd.Close()
388+
389+
var scanner *bufio.Scanner
390+
if option.SrcGzip {
391+
gzReader, err := gzip.NewReader(fd)
392+
if err != nil {
393+
option.ErrChan <- err
394+
}
395+
396+
defer gzReader.Close()
397+
398+
scanner = bufio.NewScanner(gzReader)
399+
} else {
400+
scanner = bufio.NewScanner(fd)
401+
}
402+
403+
jobChan <- &quickMergeJob{
404+
scanner: scanner,
405+
writer: writerChan,
406+
filter: option.Filter,
407+
errChan: option.ErrChan,
408+
ctx: option.CTX,
409+
}
410+
}
411+
close(jobChan)
412+
413+
fd, err := os.Create(option.DstPath)
414+
if err != nil {
415+
option.ErrChan <- err
416+
return nil
417+
}
418+
419+
defer fd.Close()
420+
421+
var writer *bufio.Writer
422+
if option.DstGzip {
423+
gzWriter := gzip.NewWriter(fd)
424+
defer gzWriter.Close()
425+
426+
writer = bufio.NewWriter(gzWriter)
427+
} else {
428+
writer = bufio.NewWriter(fd)
429+
}
430+
431+
loop:
432+
for {
433+
select {
434+
case <-option.CTX.Done():
435+
return nil
436+
case line, ok := <-writerChan:
437+
// chan closed
438+
if !ok {
439+
break loop
440+
}
441+
442+
if _, err := writer.Write(append(*line, '\n')); err != nil {
443+
option.ErrChan <- err
444+
continue
445+
}
446+
447+
writer.Flush()
448+
}
449+
}
450+
451+
wg.Wait()
452+
return nil
453+
}

logmerge_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package logmerge
22

33
import (
44
"compress/gzip"
5+
"context"
56
"errors"
7+
"fmt"
68
"io"
79
"io/ioutil"
810
"os"
@@ -32,6 +34,11 @@ const (
3234
hello world
3335
hello world
3436
hello world
37+
`
38+
39+
EXPECTED4 = `2020/01/18 12:20:30 [error] 177003#0: *1004128358 recv() failed (104: Connection reset by peer)
40+
2020/01/18 12:20:30 [error] 177003#0: *1004128358 recv() failed (104: Connection reset by peer)
41+
2020/01/18 12:20:30 [error] 177003#0: *1004128358 recv() failed (104: Connection reset by peer)
3542
`
3643
)
3744

@@ -371,3 +378,36 @@ func TestFilter(t *testing.T) {
371378
t.Errorf("Different content, merge failed")
372379
}
373380
}
381+
382+
func TestQuickMerge(t *testing.T) {
383+
filePath := []string{"./testdata/quick1.log", "./testdata/quick2.log", "./testdata/quick3.log"}
384+
dstPath := "./testdata/output.log"
385+
errChan := make(chan error, len(filePath))
386+
387+
option := Option{
388+
SrcPath: filePath,
389+
DstPath: dstPath,
390+
ErrChan: errChan,
391+
Goroutine: 3,
392+
CTX: context.Background(),
393+
}
394+
395+
go func() {
396+
for err := range errChan {
397+
fmt.Printf("err: %s", err.Error())
398+
}
399+
}()
400+
err := QuickMerge(option)
401+
if err != nil {
402+
t.Errorf("quick merge error: %s", err.Error())
403+
}
404+
405+
res, err := ioutil.ReadFile(dstPath)
406+
if err != nil {
407+
t.Errorf("read file %s error: %s", dstPath, err.Error())
408+
}
409+
410+
if string(res) != EXPECTED4 {
411+
t.Errorf("Different content, quick merge failed")
412+
}
413+
}

testdata/output.log

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
2020/01/18 12:20:30 [error] 177003#0: *1004128358 recv() failed (104: Connection reset by peer)
2+
2020/01/18 12:20:30 [error] 177003#0: *1004128358 recv() failed (104: Connection reset by peer)
3+
2020/01/18 12:20:30 [error] 177003#0: *1004128358 recv() failed (104: Connection reset by peer)

testdata/quick1.log

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
2020/01/18 12:20:30 [error] 177003#0: *1004128358 recv() failed (104: Connection reset by peer)

testdata/quick2.log

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
2020/01/18 12:20:30 [error] 177003#0: *1004128358 recv() failed (104: Connection reset by peer)

testdata/quick3.log

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
2020/01/18 12:20:30 [error] 177003#0: *1004128358 recv() failed (104: Connection reset by peer)

0 commit comments

Comments
 (0)