@@ -103,17 +103,17 @@ class AsyncFileWriteChecker : public IAsyncFile, public ReferenceCounted<AsyncFi
103
103
struct WriteInfo {
104
104
WriteInfo () : checksum(0 ), timestamp(0 ) {}
105
105
uint32_t checksum;
106
- uint32_t timestamp; // keep a precision of ms
106
+ uint64_t timestamp; // keep a precision of ms
107
107
};
108
108
109
- static uint32_t transformTime (double unixTime) { return (uint32_t )(unixTime * millisecondsPerSecond); }
109
+ static uint64_t transformTime (double unixTime) { return (uint64_t )(unixTime * millisecondsPerSecond); }
110
110
111
111
class LRU {
112
112
private:
113
- int64_t step;
113
+ uint64_t step;
114
114
std::string fileName;
115
- std::map<int , uint32_t > stepToKey;
116
- std::map<uint32_t , int > keyToStep; // std::map is to support ::truncate
115
+ std::map<uint64_t , uint32_t > stepToKey;
116
+ std::map<uint32_t , uint64_t > keyToStep; // std::map is to support ::truncate
117
117
std::unordered_map<uint32_t , AsyncFileWriteChecker::WriteInfo> pageContents;
118
118
119
119
public:
@@ -137,7 +137,7 @@ class AsyncFileWriteChecker : public IAsyncFile, public ReferenceCounted<AsyncFi
137
137
auto it = keyToStep.lower_bound (page);
138
138
// iterate through keyToStep, to find corresponding entries in stepToKey
139
139
while (it != keyToStep.end ()) {
140
- int step = it->second ;
140
+ uint64_t step = it->second ;
141
141
auto next = it;
142
142
next++;
143
143
keyToStep.erase (it);
@@ -148,7 +148,7 @@ class AsyncFileWriteChecker : public IAsyncFile, public ReferenceCounted<AsyncFi
148
148
149
149
uint32_t randomPage () {
150
150
if (keyToStep.size () == 0 ) {
151
- return - 1 ;
151
+ return 0 ;
152
152
}
153
153
auto it = keyToStep.begin ();
154
154
std::advance (it, deterministicRandom ()->randomInt (0 , (int )keyToStep.size ()));
@@ -173,7 +173,7 @@ class AsyncFileWriteChecker : public IAsyncFile, public ReferenceCounted<AsyncFi
173
173
174
174
uint32_t leastRecentlyUsedPage () {
175
175
if (stepToKey.size () == 0 ) {
176
- return - 1 ;
176
+ return 0 ;
177
177
}
178
178
return stepToKey.begin ()->second ;
179
179
}
@@ -207,14 +207,14 @@ class AsyncFileWriteChecker : public IAsyncFile, public ReferenceCounted<AsyncFi
207
207
}
208
208
209
209
private:
210
- // transform from unixTime(double) to uint32_t, to retain ms precision.
211
210
Reference<IAsyncFile> m_f;
212
211
Future<Void> checksumWorker;
213
212
Future<Void> checksumLogger;
214
213
LRU lru;
215
214
void * pageBuffer;
216
215
uint64_t totalCheckedFail, totalCheckedSucceed;
217
- uint32_t syncedTime;
216
+ // transform from unixTime(double) to uint64_t, to retain ms precision.
217
+ uint64_t syncedTime;
218
218
// to avoid concurrent operation, so that the continuous reader will skip a page if it is being written
219
219
std::unordered_set<uint32_t > writing;
220
220
// This is the most page checksum history blocks we will use across all files.
@@ -226,7 +226,7 @@ class AsyncFileWriteChecker : public IAsyncFile, public ReferenceCounted<AsyncFi
226
226
// for each page, read and do checksum
227
227
// scan from the least recently used, thus it is safe to quit if data has not been synced
228
228
state uint32_t page = self->lru .leastRecentlyUsedPage ();
229
- while (self->writing .find (page) != self->writing .end () || page == - 1 ) {
229
+ while (self->writing .find (page) != self->writing .end () || page == 0 ) {
230
230
// avoid concurrent ops
231
231
wait (delay (FLOW_KNOBS->ASYNC_FILE_WRITE_CHEKCER_CHECKING_DELAY ));
232
232
continue ;
@@ -253,7 +253,7 @@ class AsyncFileWriteChecker : public IAsyncFile, public ReferenceCounted<AsyncFi
253
253
254
254
// return true if there are still remaining valid synced pages to check, otherwise false
255
255
// this method removes the page entry from checksum history upon a successful check
256
- bool verifyChecksum (int page, uint32_t checksum, uint8_t * start, bool sweep ) {
256
+ bool verifyChecksum (uint32_t page, uint32_t checksum, uint8_t * start) {
257
257
if (!lru.exist (page)) {
258
258
// it has already been verified succesfully and removed by checksumWorker
259
259
return true ;
@@ -265,7 +265,6 @@ class AsyncFileWriteChecker : public IAsyncFile, public ReferenceCounted<AsyncFi
265
265
TraceEvent (SevError, " AsyncFileLostWriteDetected" )
266
266
.error (checksum_failed ())
267
267
.detail (" Filename" , getFilename ())
268
- .detail (" Sweep" , sweep)
269
268
.detail (" PageNumber" , page)
270
269
.detail (" Size" , lru.size ())
271
270
.detail (" Start" , (long )start)
@@ -287,23 +286,20 @@ class AsyncFileWriteChecker : public IAsyncFile, public ReferenceCounted<AsyncFi
287
286
288
287
// Update or check checksum(s) in history for any full pages covered by this operation
289
288
// return the updated pages when updateChecksum is true
290
- std::vector<uint32_t > updateChecksumHistory (bool updateChecksum,
291
- int64_t offset,
292
- int len,
293
- uint8_t * buf,
294
- bool sweep = false ) {
289
+ std::vector<uint32_t > updateChecksumHistory (bool updateChecksum, int64_t offset, int len, uint8_t * buf) {
295
290
std::vector<uint32_t > pages;
296
291
// Check or set each full block in the the range
297
- int page = offset / checksumHistoryPageSize; // First page number
292
+ // page number starts at 1, as we use 0 to indicate invalid page
293
+ uint32_t page = offset / checksumHistoryPageSize + 1 ; // First page number
298
294
int slack = offset % checksumHistoryPageSize; // Bytes after most recent page boundary
299
295
uint8_t * start = buf; // Position in buffer to start checking from
300
296
// If offset is not page-aligned, move to next page and adjust start
301
297
if (slack != 0 ) {
302
298
++page;
303
299
start += (checksumHistoryPageSize - slack);
304
300
}
305
- int startPage = page;
306
- int pageEnd = (offset + len) / checksumHistoryPageSize; // Last page plus 1
301
+ uint32_t startPage = page;
302
+ uint32_t pageEnd = (offset + len) / checksumHistoryPageSize; // Last page plus 1
307
303
while (page < pageEnd) {
308
304
uint32_t checksum = crc32c_append (0xab12fd93 , start, checksumHistoryPageSize);
309
305
#if VALGRIND
@@ -332,7 +328,7 @@ class AsyncFileWriteChecker : public IAsyncFile, public ReferenceCounted<AsyncFi
332
328
history.checksum = checksum;
333
329
lru.update (page, history);
334
330
} else {
335
- if (!verifyChecksum (page, checksum, start, sweep )) {
331
+ if (!verifyChecksum (page, checksum, start)) {
336
332
break ;
337
333
}
338
334
}
0 commit comments