Commit 83ccf06

asyncfetch and awsfetch
1 parent 16907c6 commit 83ccf06

13 files changed: +426 -33 lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -3,6 +3,7 @@
2. batchfetch supports a custom upload Host setting
3. Added awsfetch to fetch data from an Amazon S3 bucket to a Qiniu bucket
4. Added awslist to list the files in an Amazon S3 bucket
+ 5. Added the asynchronous fetch command abfetch

# 2.3.7
1. Added the forbidden command, which can forbid or unforbid a file
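
A minimal usage sketch of the new commands, based on the Use strings and flags registered in cmd/asyncfetch.go below; the bucket name, file names, and job ID are placeholders:

	# submit asynchronous fetch jobs for every URL listed in urls.txt
	# (each input line is "<url>" or "<url><TAB><size-in-bytes>")
	qshell abfetch myBucket -i urls.txt -c 20 -s success.txt -e failure.txt

	# check the status of a single asynchronous fetch job by its ID
	qshell acheck myBucket <fetch-job-id>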

cmd/asyncfetch.go

Lines changed: 267 additions & 0 deletions
@@ -0,0 +1,267 @@
package cmd

import (
	"fmt"
	"os"
	"strconv"
	"time"

	"github.com/qiniu/api.v7/storage"
	"github.com/qiniu/qshell/iqshell"
	"github.com/spf13/cobra"
)

// NewCmdAsyncFetch returns a *cobra.Command.
// The command uses Qiniu's asynchronous fetch API.
func NewCmdAsyncFetch() *cobra.Command {
	options := asyncFetchOptions{}

	asyncFetch := &cobra.Command{
		Use:   "abfetch <Bucket> [-i <urlList>]",
		Short: "Async Batch fetch network resources to qiniu Bucket",
		Args:  cobra.ExactArgs(1),
		Run:   options.Run,
	}

	asyncFetch.Flags().StringVarP(&options.host, "host", "t", "", "download HOST header")
	asyncFetch.Flags().StringVarP(&options.callbackUrl, "callback-url", "a", "", "callback url")
	asyncFetch.Flags().StringVarP(&options.callbackBody, "callback-body", "b", "", "callback body")
	asyncFetch.Flags().StringVarP(&options.callbackHost, "callback-host", "T", "", "callback HOST")
	asyncFetch.Flags().IntVarP(&options.fileType, "storage-type", "g", 0, "storage type")
	asyncFetch.Flags().StringVarP(&options.inputFile, "input-file", "i", "", "input file with urls")
	asyncFetch.Flags().IntVarP(&options.threadCount, "thread-count", "c", 20, "thread count")
	asyncFetch.Flags().StringVarP(&options.successFname, "success-list", "s", "", "success fetch list")
	asyncFetch.Flags().StringVarP(&options.failureFname, "failure-list", "e", "", "error fetch list")

	return asyncFetch
}

// NewCmdAsyncCheck queries the result of an asynchronous fetch.
func NewCmdAsyncCheck() *cobra.Command {

	asyncCheck := &cobra.Command{
		Use:   "acheck <Bucket> <ID>",
		Short: "Check Async fetch status",
		Args:  cobra.ExactArgs(2),
		Run: func(cmd *cobra.Command, positionalArgs []string) {
			bm := iqshell.GetBucketManager()

			// let the API choose the APIHOST
			bm.Cfg.ApiHost = ""

			ret, err := bm.CheckAsyncFetchStatus(positionalArgs[0], positionalArgs[1])
			if err != nil {
				fmt.Fprintf(os.Stderr, "CheckAsyncFetchStatus: %v\n", err)
				os.Exit(1)
			}
			fmt.Println(ret)
		},
	}
	return asyncCheck
}

type asyncFetchOptions struct {
	// HOST header used when downloading from the source URL
	host string

	// if set, the fetched content is verified against this md5 value; on mismatch the file is not saved to the Qiniu bucket
	md5 string

	// if set, the fetched content is verified against this etag; on mismatch the file is not saved to the bucket
	etag string

	// callback URL invoked after a successful fetch
	callbackUrl string

	callbackBody string

	callbackBodyType string

	// HOST header used for the callback
	callbackHost string

	// storage type of the file: 0 standard storage, 1 infrequent-access storage
	fileType int

	// input file with the list of URLs to fetch
	inputFile string

	fetchConfig
}

type asyncItem struct {
	id       string
	url      string
	key      string
	size     uint64
	bucket   string
	duration int
	waiter   int

	start time.Time
}

// degrade halves the wait threshold used by timeEnough, with a floor of 3 seconds.
func (i *asyncItem) degrade() {
	i.duration = i.duration / 2
	if i.duration <= 0 {
		i.duration = 3
	}
}

// estimatDuration picks the initial wait (in seconds) before the first status
// check, based on the declared file size.
func (i *asyncItem) estimatDuration() {
	if i.duration == 0 {
		switch {
		case i.size >= 500*MB:
			i.duration = 40
		case i.size > 200*MB && i.size < 500*MB:
			i.duration = 30
		case i.size > 100*MB && i.size <= 200*MB:
			i.duration = 20
		case i.size <= 10*MB:
			i.duration = 3
		case i.size <= 50*MB: // assumed mid-size tier between the 10MB and 100MB cases
			i.duration = 6
		case i.size <= 100*MB:
			i.duration = 10
		default:
			i.duration = 3
		}
	}
}

// timeEnough reports whether enough time has passed since the fetch was
// submitted for a status check to be worthwhile.
func (i *asyncItem) timeEnough() bool {

	now := time.Now()

	i.estimatDuration()
	if now.Sub(i.start) > time.Duration(i.duration)*time.Second {
		return true
	}
	return false
}

func (ao *asyncFetchOptions) Run(cmd *cobra.Command, positionalArgs []string) {
	bucket := positionalArgs[0]

	var lc chan string
	var err error

	if ao.inputFile != "" {
		lc, err = getLines(ao.inputFile)
		if err != nil {
			fmt.Fprintf(os.Stderr, "get lines from file: %s: %v\n", ao.inputFile, err)
			os.Exit(1)
		}
	} else {
		lc = getLinesFromReader(os.Stdin)
	}
	ao.initFileExporter()
	ao.initBucketManager()

	// limitc is a counting semaphore that bounds the number of concurrent AsyncFetch submissions.
	limitc := make(chan struct{}, ao.threadCount)
	queuec := make(chan *asyncItem, 1000)
	donec := make(chan struct{})

	// Checker goroutine: poll each submitted fetch until it succeeds or three
	// status checks have been made.
	go func() {

		for item := range queuec {
			counter := 0
			for counter < 3 {
				if item.timeEnough() {

					ret, cErr := ao.bm.CheckAsyncFetchStatus(item.bucket, item.id)
					if cErr != nil {
						fmt.Fprintf(os.Stderr, "CheckAsyncFetchStatus: %v\n", cErr)
					} else if ret.Wait == -1 { // the resource has been fetched once; it may have succeeded or failed
						counter += 1
						_, err := ao.bm.Stat(item.bucket, item.key)
						if err != nil {
							fmt.Fprintf(os.Stderr, "Stat: %s: %v\n", item.key, err)
						} else {
							ao.fileExporter.WriteToSuccessWriter(fmt.Sprintf("%s\t%s\n", item.url, item.key))
							fmt.Printf("fetch %s => %s:%s success\n", item.url, item.bucket, item.key)
							break
						}
					}
					item.degrade()
				}
				time.Sleep(3 * time.Second)
			}
			if counter >= 3 {
				ao.fileExporter.WriteToFailedWriter(fmt.Sprintf("%s\t%d\t%s\n", item.url, item.size, item.key))
				fmt.Fprintf(os.Stderr, "fetch %s => %s:%s failed\n", item.url, item.bucket, item.key)
			}
		}

		donec <- struct{}{}
	}()

	for line := range lc {
		limitc <- struct{}{} // acquire a semaphore slot for this line

		fields := ParseLine(line, "")
		if len(fields) <= 0 {
			<-limitc // release the slot before skipping the line
			continue
		}
		url := fields[0]
		var size uint64
		if len(fields) >= 2 {
			var pErr error
			size, pErr = strconv.ParseUint(fields[1], 10, 64)
			if pErr != nil {
				ao.fileExporter.WriteToFailedWriter(fmt.Sprintf("%s: %v\n", line, pErr))
				<-limitc
				continue
			}
		}
		saveKey, pError := iqshell.KeyFromUrl(url)
		if pError != nil {
			ao.fileExporter.WriteToFailedWriter(fmt.Sprintf("%s: %v\n", line, pError))
			<-limitc
			continue
		}
		params := storage.AsyncFetchParam{
			Url:              url,
			Host:             ao.host,
			Bucket:           bucket,
			Key:              saveKey,
			CallbackURL:      ao.callbackUrl,
			CallbackBody:     ao.callbackBody,
			CallbackBodyType: ao.callbackBodyType,
			FileType:         ao.fileType,
		}
		go func(params storage.AsyncFetchParam) {

			ret, aerr := ao.bm.AsyncFetch(params)
			if aerr != nil {
				ao.fileExporter.WriteToFailedWriter(fmt.Sprintf("%s: %v\n", params.Url, aerr))
				<-limitc
				return
			}
			queuec <- &asyncItem{
				id:     ret.Id,
				size:   size,
				waiter: ret.Wait,
				key:    params.Key,
				url:    params.Url,
				bucket: params.Bucket,
				start:  time.Now(),
			}

			<-limitc
		}(params)
	}

	// Wait for all submitter goroutines by refilling the semaphore, then close
	// the queue so the checker goroutine can finish, and wait for it.
	for i := 0; i < ao.threadCount; i++ {
		limitc <- struct{}{}
	}
	close(queuec)

	<-donec
}

func init() {
	RootCmd.AddCommand(NewCmdAsyncFetch())
	RootCmd.AddCommand(NewCmdAsyncCheck())
}
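
The Run method above bounds the number of concurrent AsyncFetch submissions with a buffered channel used as a counting semaphore, then waits for all submitters by refilling that channel before closing the status queue. A minimal standalone sketch of the same pattern, with a hypothetical doWork function standing in for the AsyncFetch call and an illustrative threadCount of 3:

package main

import (
	"fmt"
	"time"
)

// doWork is a stand-in for the real submission call (e.g. AsyncFetch).
func doWork(id int) {
	time.Sleep(100 * time.Millisecond)
	fmt.Printf("job %d done\n", id)
}

func main() {
	const threadCount = 3

	// Buffered channel used as a counting semaphore:
	// at most threadCount goroutines run doWork at once.
	limitc := make(chan struct{}, threadCount)

	for i := 0; i < 10; i++ {
		limitc <- struct{}{} // acquire a slot; blocks while the semaphore is full
		go func(id int) {
			doWork(id)
			<-limitc // release the slot
		}(i)
	}

	// Refill the semaphore: once every slot can be re-acquired,
	// no worker goroutine still holds one, so all jobs are done.
	for i := 0; i < threadCount; i++ {
		limitc <- struct{}{}
	}
	fmt.Println("all jobs finished")
}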

cmd/list.go

Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
package cmd

import (
	"bufio"
	"io"
	"os"
)

// getLines opens the file filename and sends each of its lines to the returned channel.
func getLines(filename string) (c chan string, err error) {

	f, oErr := os.Open(filename)
	if oErr != nil {
		err = oErr
		return
	}
	c = getLinesFromReader(f)
	return
}

// getLinesFromReader streams the lines read from r through the returned channel
// and closes the channel when the input is exhausted.
func getLinesFromReader(r io.Reader) (c chan string) {

	c = make(chan string)
	scanner := bufio.NewScanner(r)

	go func() {
		for scanner.Scan() {
			c <- scanner.Text()
		}

		close(c)
	}()

	return c
}
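
A short sketch of how getLines might be consumed from another command in package cmd; urls.txt and the enclosing function are placeholders, and the fmt and os imports are assumed:

	lines, err := getLines("urls.txt")
	if err != nil {
		fmt.Fprintf(os.Stderr, "getLines: %v\n", err)
		os.Exit(1)
	}
	for line := range lines {
		// lines arrive in file order; the channel is closed when the file is exhausted
		fmt.Println(line)
	}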
