forked from kikinteractive/go-bqstreamer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathasync_worker.go
94 lines (78 loc) · 2.35 KB
/
async_worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package bqstreamer
import "time"
// AsyncWorker implements an asynchronous streamer,
// by wrapping around SyncWorker.
type asyncWorker struct {
worker *SyncWorker
// Upon invoking Start(), the worker will fetch rows from this channel,
// and enqueue them into an internal rows queue (provided by SyncWorker).
rowChan chan Row
// Errors are reported to this channel.
errorChan chan *InsertErrors
// Max amount of rows to enqueue before executing an insert operation to BigQuery.
maxRows int
// Max delay between insert operations to BigQuery.
maxDelay time.Duration
// Shutdown channel to stop Start() execution.
done chan struct{}
// Used to notify the Start() loop has stopped and returned.
closedChan chan struct{}
}
// Start reads rows from rowChan and enqueues them internally.
// It performs an insert operation to BigQuery when the queue has been
// filled or a timer has expired, according to configuration.
//
// NOTE This function is executed in a goroutine,
// and is stopped via calling Close().
func (w *asyncWorker) Start() {
go func(w *asyncWorker) {
// Notify on return.
defer func(stopped chan<- struct{}) {
close(stopped)
}(w.closedChan)
for {
// Perform an insert operation and reset timer
// when one of the following signals is triggered:
select {
case <-w.done:
// Worker should close.
w.insert()
return
case <-time.After(w.maxDelay):
// Time delay between previous insert operation
// has passed
w.insert()
case r := <-w.rowChan:
// A row has been enqueued.
// Insert row to queue.
w.worker.Enqueue(r)
// Don't insert yet if not enough rows have been enqueued.
if len(w.worker.rows) < w.maxRows {
continue
}
w.insert()
}
}
}(w)
}
// Close closes the "done" channel, causing Start()'s infinite loop to stop.
// It returns a channel, which will be closed once the Start()
// loop has returned.
func (w *asyncWorker) Close() <-chan struct{} {
// Notify Start() loop to return.
close(w.done)
return w.closedChan
}
// insert performs an insert operation to BigQuery
// using the internal SyncWorker.
func (w *asyncWorker) insert() {
// No-op if no lines have been enqueued.
if len(w.worker.rows) == 0 {
return
}
insertErrs := w.worker.InsertWithRetry()
// Report errors to error channel if set.
if w.errorChan != nil {
w.errorChan <- insertErrs
}
}