This repository has been archived by the owner on Aug 3, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
buffering_manager.go
226 lines (195 loc) · 6.53 KB
/
buffering_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package apidAnalytics
import (
"bufio"
"compress/gzip"
"crypto/rand"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"
)
const fileExtension = ".txt.gz"
// Channel where analytics records are buffered before being dumped to a
// file as write to file should not performed in the Http Thread
var internalBuffer chan axRecords
// channel to indicate that internalBuffer channel is closed
var doneInternalBufferChan chan bool
// Channel where close bucket event is published i.e. when a bucket
// is ready to be closed based on collection interval
var closeBucketEvent chan bucket
// channel to indicate that closeBucketEvent channel is closed
var doneClosebucketChan chan bool
// Map from timestampt to bucket
var bucketMap map[int64]bucket
// RW lock for bucketMap since the cache can be
// read while its being written to and vice versa
var bucketMaplock = sync.RWMutex{}
type bucket struct {
keyTS int64
DirName string
// We need file handle and writer to close the file
FileWriter fileWriter
}
// This struct will store open file handle and writer to close the file
type fileWriter struct {
file *os.File
gw *gzip.Writer
bw *bufio.Writer
}
func initBufferingManager() {
internalBuffer = make(chan axRecords,
config.GetInt(analyticsBufferChannelSize))
closeBucketEvent = make(chan bucket)
doneInternalBufferChan = make(chan bool)
doneClosebucketChan = make(chan bool)
bucketMaplock.Lock()
bucketMap = make(map[int64]bucket)
bucketMaplock.Unlock()
// Keep polling the internal buffer for new messages
go func() {
for records := range internalBuffer {
err := save(records)
if err != nil {
log.Errorf("Could not save %d messages to file"+
" due to: %v", len(records.Records), err)
}
}
// indicates a close signal was sent on the channel
log.Debugf("Closing channel internal buffer")
doneInternalBufferChan <- true
}()
// Keep polling the closeEvent channel to see if bucket is ready to be closed
go func() {
for bucket := range closeBucketEvent {
log.Debugf("Close Event received for bucket: %s",
bucket.DirName)
// close open file
closeGzipFile(bucket.FileWriter)
dirToBeClosed := filepath.Join(localAnalyticsTempDir, bucket.DirName)
stagingPath := filepath.Join(localAnalyticsStagingDir, bucket.DirName)
// close files in tmp folder and move directory to
// staging to indicate its ready for upload
err := os.Rename(dirToBeClosed, stagingPath)
if err != nil {
log.Errorf("Cannot move directory '%s' from"+
" tmp to staging folder due to '%s", bucket.DirName, err)
} else {
// Remove bucket from bucket map once its closed successfully
bucketMaplock.Lock()
delete(bucketMap, bucket.keyTS)
bucketMaplock.Unlock()
}
}
// indicates a close signal was sent on the channel
log.Debugf("Closing channel close bucketevent")
doneClosebucketChan <- true
}()
}
// Save records to correct file based on what timestamp data is being collected for
func save(records axRecords) error {
bucket, err := getBucketForTimestamp(time.Now().UTC(), records.Tenant)
if err != nil {
return err
}
writeGzipFile(bucket.FileWriter, records.Records)
return nil
}
func getBucketForTimestamp(now time.Time, tenant tenant) (bucket, error) {
// first based on current timestamp and collection interval,
// determine the timestamp of the bucket
ts := now.Unix() / int64(config.GetInt(analyticsCollectionInterval)) * int64(config.GetInt(analyticsCollectionInterval))
bucketMaplock.RLock()
b, exists := bucketMap[ts]
bucketMaplock.RUnlock()
if exists {
return b, nil
} else {
timestamp := time.Unix(ts, 0).UTC().Format(timestampLayout)
// endtimestamp of bucket = starttimestamp + collectionInterval
endTime := time.Unix(ts+int64(config.GetInt(analyticsCollectionInterval)), 0)
endtimestamp := endTime.UTC().Format(timestampLayout)
dirName := tenant.Org + "~" + tenant.Env + "~" + timestamp
newPath := filepath.Join(localAnalyticsTempDir, dirName)
// create dir
err := os.Mkdir(newPath, os.ModePerm)
if err != nil {
return bucket{}, fmt.Errorf("Cannot create directory "+
"'%s' to buffer messages '%v'", dirName, err)
}
// create file for writing
// Format: <4DigitRandomHex>_<TSStart>.<TSEnd>_<APIDINSTANCEUUID>_writer_0.txt.gz
fileName := getRandomHex() + "_" + timestamp + "." +
endtimestamp + "_" +
config.GetString("apigeesync_apid_instance_id") +
"_writer_0" + fileExtension
completeFilePath := filepath.Join(newPath, fileName)
fw, err := createGzipFile(completeFilePath)
if err != nil {
return bucket{}, err
}
newBucket := bucket{keyTS: ts, DirName: dirName, FileWriter: fw}
bucketMaplock.Lock()
bucketMap[ts] = newBucket
bucketMaplock.Unlock()
//Send event to close directory after endTime + 5
// seconds to make sure all buffers are flushed to file
timer := time.After(endTime.Sub(time.Now().UTC()) + time.Second*5)
go func() {
<-timer
closeBucketEvent <- newBucket
}()
return newBucket, nil
}
}
// 4 digit Hex is prefixed to each filename to improve
// how s3 partitions the files being uploaded
func getRandomHex() string {
buff := make([]byte, 2)
rand.Read(buff)
return fmt.Sprintf("%x", buff)
}
func createGzipFile(s string) (fileWriter, error) {
file, err := os.OpenFile(s, os.O_WRONLY|os.O_CREATE, os.ModePerm)
if err != nil {
return fileWriter{},
fmt.Errorf("Cannot create file '%s' "+
"to buffer messages '%v'", s, err)
}
gw := gzip.NewWriter(file)
bw := bufio.NewWriter(gw)
return fileWriter{file, gw, bw}, nil
}
func writeGzipFile(fw fileWriter, records []interface{}) {
// write each record as a new line to the bufferedWriter
for _, eachRecord := range records {
s, _ := json.Marshal(eachRecord)
_, err := (fw.bw).WriteString(string(s))
if err != nil {
log.Errorf("Write to file failed '%v'", err)
}
(fw.bw).WriteString("\n")
}
// Flush entire batch of records to file vs each message
fw.bw.Flush()
fw.gw.Flush()
}
func closeGzipFile(fw fileWriter) {
fw.bw.Flush()
fw.gw.Close()
fw.file.Close()
}