main.go
// Package main implements the main functionality of Gofetch.
package main

import (
	"sync"
	"time"

	"github.com/op/go-logging"
)

// testGofetch must be true when testing to use the appropriate S3 folders.
var testGofetch = false

var log = logging.MustGetLogger("gofetch")
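
// main wires up the fetch pipeline: URLs read from the S3 configuration file
// are queued on fetchChan, Fetcher goroutines download them and hand the
// results to the S3 writers over s3chan, and the per-URL logs and errors are
// collected on logChan and errChan until LogFetches writes the completion log
// back to S3.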
func main() {
	mainStart := time.Now()

	ConfigureLogger()
	CheckEnvVars()
	log.Info("Starting gofetch.")

	config := ConfigFromS3()
	if len(config.Urls) == 0 {
		panic("No URLs found in the configuration file.")
	}

	throttleMap := ThrottleMap(config.Throttlers)
	throttled := len(throttleMap)
	concWriters := ConcurrentS3Writes()
	concFetches := ConcurrentFetches(throttled)
	fetchOffset := FetchOffset()
	fetchLimit := FetchLimit()

	// Clamp the fetch limit to the number of configured URLs to avoid over-allocating memory.
	actualLimit := fetchOffset + fetchLimit
	if actualLimit > len(config.Urls) {
		log.Notice("Forcing fetching limit to %d (instead of %d).", len(config.Urls), actualLimit)
		actualLimit = len(config.Urls)
	}
	fetchRange := actualLimit - fetchOffset
	log.Notice("Fetching %d URLs of %d in the configuration file.", fetchRange, len(config.Urls))

	// s3chan buffers up to 100 fetched HTTP responses waiting to be written to S3.
	s3chan := make(chan *HTTPFetch, 100)
	// fetchChan holds the URLs currently being fetched; its capacity bounds the
	// number of concurrent fetches, so senders block once that capacity is reached.
	fetchChan := make(chan *URLInfo, concFetches)
	// logChan collects the fetch logs produced by the overall run.
	logChan := make(chan *Fetch, fetchRange)
	// errChan collects the fetch errors. It is as large as logChan in case every fetch fails.
	errChan := make(chan *FetchError, fetchRange)

	// The wait group keeps main alive until every URL has been fetched and written.
	var wg sync.WaitGroup

	ConfigureRuntime()

	// Start as many concurrent fetchers as requested.
	for i := 0; i < concFetches; i++ {
		go Fetcher(fetchChan, s3chan, errChan, throttleMap, &wg)
	}

	// Queue every URL in the window selected by the environment onto the fetch
	// channel. Each send runs in its own goroutine so that main does not block
	// once fetchChan is at capacity.
	for _, urlI := range config.Urls[fetchOffset:actualLimit] {
		wg.Add(1)
		go func(urlI *URLInfo) {
			fetchChan <- urlI
		}(urlI)
	}

	// fetchChan is deliberately not closed yet: a fetch that fails to be
	// processed is put back onto the channel, so it is only closed once
	// everything has been processed.

	// Start the S3 response processors.
	for i := 0; i < concWriters; i++ {
		go ProcessResponses(s3chan, logChan, config.Indexes, &wg)
	}

	// Wait for completion of both fetching and writing content to S3.
	wg.Wait()
	close(fetchChan)
	close(s3chan)
	close(logChan)
	close(errChan)
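	// Closing the channels does not discard values already buffered in them, so
	// the collected logs and errors remain available to LogFetches below.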

	fetchDuration := time.Since(mainStart)
	// Write the fetch completion log file to S3.
	LogFetches(logChan, errChan, &fetchDuration)
	log.Info("Successfully completed gofetch in %s.", fetchDuration)
}