-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcrawler.go
121 lines (104 loc) · 2.48 KB
/
crawler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package feedcrawler
import (
"time"
"github.com/mmcdole/gofeed"
)
// defaultNumWorkers is the number of crawler goroutines started when
// Crawler.NumWorkers is zero or negative.
var defaultNumWorkers = 3
// Result is a result of a feed crawling.
type Result struct {
	// Subscription is the subscription that was crawled.
	Subscription Subscription
	// Feed is the parsed feed; nil when Err is non-nil.
	Feed *gofeed.Feed
	// NewItems holds the items considered new relative to the crawler's
	// stored state; nil when Err is non-nil.
	NewItems []*gofeed.Item
	// Err is the error returned while fetching/parsing the feed, if any.
	Err error
}
// Crawler is a crawler for RSS and Atom feeds.
type Crawler struct {
	// Subscriptions is the list of feeds to crawl.
	Subscriptions []Subscription
	// States tracks per-subscription crawl state (keyed by Subscription.ID);
	// it is lazily initialized by CrawlFunc when nil and updated after each
	// result.
	States States
	// NumWorkers is the number of concurrent fetch goroutines; values <= 0
	// fall back to defaultNumWorkers.
	NumWorkers int
	// Parser, when non-nil, is used instead of a per-worker gofeed.NewParser()
	// (which is configured with NewEnhancedAtomTranslator).
	Parser *gofeed.Parser
}
// Crawl crawls subscribed feeds and returns all results once every feed
// has been processed. It returns a nil slice when there are no results.
func (fc *Crawler) Crawl() ([]Result, error) {
	var collected []Result
	if err := fc.CrawlFunc(func(r Result) {
		collected = append(collected, r)
	}); err != nil {
		return nil, err
	}
	return collected, nil
}
// CrawlFunc crawls subscribed feeds concurrently and calls f with each
// result as it arrives. Results are delivered in completion order, not
// subscription order. fc.States is lazily initialized when nil and updated
// with every result before f is invoked. It currently always returns nil.
func (fc *Crawler) CrawlFunc(f func(Result)) error {
	if fc.States == nil {
		fc.States = make(States)
	}
	// Buffer both channels to the full workload so neither producers nor
	// workers can block on a full channel.
	subscriptions := make(chan Subscription, len(fc.Subscriptions))
	results := make(chan Result, len(fc.Subscriptions))
	defer close(results)
	nw := fc.NumWorkers
	if nw <= 0 {
		nw = defaultNumWorkers
	}
	for w := 1; w <= nw; w++ {
		go fc.worker(w, subscriptions, results)
	}
	for _, s := range fc.Subscriptions {
		subscriptions <- s
	}
	// Closing the channel lets the worker range loops terminate once the
	// queue is drained.
	close(subscriptions)
	for i := 0; i < len(fc.Subscriptions); i++ {
		result := <-results
		// States is guaranteed non-nil by the initialization above, so no
		// nil check is needed here (the original re-checked redundantly).
		fc.States.UpdateState(result)
		f(result)
	}
	return nil
}
// worker consumes subscriptions from the channel and emits exactly one
// Result per subscription on results. The id parameter identifies the
// worker goroutine but is otherwise unused.
func (fc *Crawler) worker(id int, subscriptions <-chan Subscription, results chan<- Result) {
	parser := fc.Parser
	if parser == nil {
		// No shared parser configured: build a per-worker one with the
		// enhanced Atom translator.
		parser = gofeed.NewParser()
		parser.AtomTranslator = NewEnhancedAtomTranslator()
	}
	for sub := range subscriptions {
		res := Result{Subscription: sub}
		feed, err := parser.ParseURL(sub.URI())
		if err != nil {
			res.Err = err
		} else {
			res.Feed = feed
			res.NewItems = fc.selectNewItems(sub, feed)
		}
		results <- res
	}
}
// selectNewItems returns the items of feed that are strictly newer than the
// stored state for subscription s and that pass the subscription's filter.
// Item recency is the later of PublishedParsed and UpdatedParsed (via
// latestTime); items not strictly after the stored UpdatedAt are dropped.
func (fc *Crawler) selectNewItems(s Subscription, feed *gofeed.Feed) []*gofeed.Item {
	// len of a nil map is 0, so a single length check replaces the original
	// redundant "nil || len == 0" pair.
	if len(fc.States) == 0 {
		// NOTE(review): on this fast path s.Filter is never applied, unlike
		// the loop below — presumably intentional "return everything on
		// first crawl", but worth confirming.
		return feed.Items
	}
	// Zero time when this subscription has no recorded state yet.
	var updatedAt time.Time
	if state, ok := fc.States[s.ID()]; ok {
		updatedAt = state.UpdatedAt
	}
	var newItems []*gofeed.Item
	for _, item := range feed.Items {
		published := latestTime(item.PublishedParsed, item.UpdatedParsed)
		// !After(updatedAt) is equivalent to Before(updatedAt) || Equal(updatedAt).
		if !published.After(updatedAt) || !s.Filter(item) {
			continue
		}
		newItems = append(newItems, item)
	}
	return newItems
}