-
Notifications
You must be signed in to change notification settings - Fork 51
/
Copy pathresult.go
108 lines (91 loc) · 1.99 KB
/
result.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package sdk
import (
"context"
"encoding/json"
grpc "github.com/crawlab-team/crawlab-grpc"
"github.com/crawlab-team/crawlab-sdk/entity"
"github.com/crawlab-team/crawlab-sdk/interfaces"
"github.com/crawlab-team/go-trace"
)
// RS is the package-level ResultService instance. GetResultService returns it
// when it is non-nil and assigns it after a successful initialization.
var RS *ResultService
// ResultService sends result items to the task service over a subscription
// stream.
type ResultService struct {
// internals
// sub is the stream assigned by init and used by _save to send messages.
sub grpc.TaskService_SubscribeClient
}
// SaveItem is the variadic form of SaveItems: the given results are collected
// into a slice and saved through the same batching path.
func (svc *ResultService) SaveItem(items ...entity.Result) {
	svc.SaveItems(items)
}
// SaveItems saves a slice of results by handing it to save, which forwards
// the results to _save in batches.
func (svc *ResultService) SaveItems(items []entity.Result) {
	svc.save(items)
}
// save forwards items to _save in consecutive batches, preserving order.
//
// Every batch holds at most batchSize items; only the last batch may be
// smaller. Nothing is sent when items is empty. Each batch is a sub-slice of
// items rather than a copy, so the same result values the caller passed in are
// handed to _save.
func (svc *ResultService) save(items []entity.Result) {
	// maximum number of results carried by a single stream message
	const batchSize = 50

	for start := 0; start < len(items); start += batchSize {
		end := start + batchSize
		if end > len(items) {
			end = len(items)
		}
		svc._save(items[start:end])
	}
}
// _save sends one batch of results over the subscription stream as a single
// INSERT_DATA message.
//
// Each non-nil item gets a "_tid" entry set to the current task id (the item
// is modified in place) and is then added to the message records. The batch is
// dropped without sending when the task id is zero. Marshalling and send
// failures are reported through trace.PrintError and are not returned to the
// caller.
func (svc *ResultService) _save(items []entity.Result) {
	// resolve the task id once so the guard, every record and the message
	// envelope all use the same value
	tid := GetTaskId()

	// skip if no task id specified
	if tid.IsZero() {
		return
	}

	var records []interface{}
	for _, item := range items {
		// assigning into a nil map panics; a nil result has no fields to
		// store, so leave it out of the batch
		if item == nil {
			continue
		}
		item["_tid"] = tid
		records = append(records, item)
	}

	data, err := json.Marshal(&entity.StreamMessageTaskData{
		TaskId:  tid,
		Records: records,
	})
	if err != nil {
		trace.PrintError(err)
		return
	}

	msg := &grpc.StreamMessage{
		Code: grpc.StreamMessageCode_INSERT_DATA,
		Data: data,
	}
	if err := svc.sub.Send(msg); err != nil {
		trace.PrintError(err)
	}
}
// init opens a subscription stream on the task client obtained from
// GetClient and stores it in svc.sub. The stream value returned by Subscribe
// is stored even when Subscribe fails; in that case the error is passed
// through trace.TraceError and returned.
func (svc *ResultService) init() (err error) {
	stream, err := GetClient().GetTaskClient().Subscribe(context.Background())
	svc.sub = stream
	if err == nil {
		return nil
	}
	return trace.TraceError(err)
}
// GetResultService returns the package-level ResultService stored in RS,
// building it on the first call.
//
// When RS is nil a new ResultService is created, every option in opts is
// applied to it in order, and init is run; if init fails the function panics
// with that error and RS stays nil. When RS is already set, opts are ignored.
// No synchronization is performed around RS here.
func GetResultService(opts ...ResultServiceOption) interfaces.ResultService {
	if RS == nil {
		created := new(ResultService)

		// apply options
		for i := range opts {
			opts[i](created)
		}

		// initialize before publishing the instance
		if err := created.init(); err != nil {
			panic(err)
		}

		RS = created
	}
	return RS
}
// SaveItem saves the given results through the service returned by
// GetResultService.
func SaveItem(items ...entity.Result) {
	rs := GetResultService()
	rs.SaveItem(items...)
}
// SaveItems saves a slice of results through the service returned by
// GetResultService.
func SaveItems(items []entity.Result) {
	rs := GetResultService()
	rs.SaveItems(items)
}