-
Notifications
You must be signed in to change notification settings - Fork 24
/
table_builder.cc
executable file
·309 lines (272 loc) · 11.1 KB
/
table_builder.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "leveldb/table_builder.h"
#include <cassert>
#include "leveldb/comparator.h"
#include "leveldb/env.h"
#include "leveldb/filter_policy.h"
#include "leveldb/options.h"
#include "table/block_builder.h"
#include "table/filter_block.h"
#include "table/format.h"
#include "util/coding.h"
#include "util/crc32c.h"
namespace leveldb {
// Internal state of a TableBuilder, reached through TableBuilder::rep_.
//
// NOTE: members are initialized in declaration order. data_block and
// index_block are handed pointers to `options` / `index_block_options`, so
// those two Options members must stay declared before the BlockBuilders.
struct TableBuilder::Rep {
  // opt: options copied into this Rep (a second copy is kept for the index
  //      block).
  // f:   destination file for the table; stored as a raw pointer and not
  //      deleted anywhere in this file.
  Rep(const Options& opt, WritableFile* f)
      : options(opt),
        index_block_options(opt),
        file(f),
        data_block(&options),
        index_block(&index_block_options),
        filter_block(opt.filter_policy == nullptr
                         ? nullptr
                         : new FilterBlockBuilder(opt.filter_policy)) {
    // The index block always uses a restart interval of 1, regardless of what
    // the caller configured for data blocks.
    index_block_options.block_restart_interval = 1;
  }

  Options options;              // Options used for data blocks
  Options index_block_options;  // Same options, restart interval forced to 1
  WritableFile* file;           // The sstable file being written

  // Number of bytes appended to `file` so far, i.e. the file offset at which
  // the next block will be written. Also reported by FileSize().
  uint64_t offset = 0;

  Status status;            // Result of the most recent file operation
  BlockBuilder data_block;  // Data block currently being built
  BlockBuilder index_block; // Index block of the table being built

  // Most recent key passed to Add(). When an index entry is emitted it is
  // rewritten in place into the (possibly shortened) index key.
  std::string last_key;

  // Total number of entries added to the table so far. It is never reset
  // when a data block is flushed; NumEntries() returns it.
  int64_t num_entries = 0;

  bool closed = false;  // Either Finish() or Abandon() has been called.

  // Null when no filter policy is configured. Deleted by ~TableBuilder().
  FilterBlockBuilder* filter_block;

  // We do not emit the index entry for a block until we have seen the
  // first key for the next data block. This allows us to use shorter
  // keys in the index block. For example, consider a block boundary
  // between the keys "the quick brown fox" and "the who". We can use
  // "the r" as the key for the index block entry since it is >= all
  // entries in the first block and < all entries in subsequent
  // blocks.
  //
  // Invariant: r->pending_index_entry is true only if data_block is empty.
  //
  // In other words: pending_index_entry is true when a data block has been
  // written out but its index entry has not been added yet. While it is
  // true, pending_handle holds the handle of that data block.
  bool pending_index_entry = false;
  BlockHandle pending_handle;  // Handle to add to index block

  std::string compressed_output;  // Scratch output buffer used by compression
};
// Creates a builder that writes a table to *file using the given options.
// When a filter block builder exists (a filter policy was configured), it is
// told that the first data block starts at offset 0.
TableBuilder::TableBuilder(const Options& options, WritableFile* file)
    : rep_(new Rep(options, file)) {
  auto* const filter = rep_->filter_block;
  if (filter == nullptr) {
    return;
  }
  filter->StartBlock(0);
}
// Releases the builder's internal state. The caller must have called
// Finish() or Abandon() first; this is only checked in assert-enabled builds.
TableBuilder::~TableBuilder() {
  Rep* const rep = rep_;
  assert(rep->closed);  // Catch errors where caller forgot to call Finish()
  // The filter block builder is deleted here, before the Rep itself.
  delete rep->filter_block;
  delete rep;
}
// Replaces the options used by this builder while a table is being built.
//
// Returns InvalidArgument (and changes nothing) if the comparator differs
// from the current one; otherwise stores the new options and returns OK.
//
// Note: if more fields are added to Options, update this function to catch
// changes that should not be allowed to change in the middle of building a
// Table.
Status TableBuilder::ChangeOptions(const Options& options) {
  Rep* const rep = rep_;
  const bool same_comparator =
      (options.comparator == rep->options.comparator);
  if (!same_comparator) {
    return Status::InvalidArgument("changing comparator while building table");
  }

  // Any live BlockBuilders point to rep->options / rep->index_block_options
  // and therefore automatically pick up the updated options.
  rep->options = options;
  rep->index_block_options = options;
  // The index block keeps its restart interval of 1.
  rep->index_block_options.block_restart_interval = 1;
  return Status::OK();
}
void TableBuilder::Add(const Slice& key, const Slice& value) {
Rep* r = rep_;
assert(!r->closed); // 没有调用过 Finish() 或 Abanbon()
if (!ok()) return; // 状态正常
if (r->num_entries > 0) { // key 大于上一个 key
assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
}
if (r->pending_index_entry) {
// 上一个 DataBlock 写入完成但还未写入 index entry, 在写入新的键值对之前先把 index entry 补上
// 可以参考 Rep::pending_index_entry 字段的注释或 06-SSTableBuiler.md 了解具体机制
// 寻找大于 last key 且小于 key 的最短字符串,作为指向上一个 DataBlock 的索引 key
// 比如 last_key:"the quick brown fox" 和 key:"the who" 之间的一个 ShortestSeparator 是 "the r"
// last_key:"the quick brown fox" 所在的 DataBlock 的 index entry 为 "the r" -> BlockHandle to the DataBlock
// 在进行查询时,可以通过 IndexBlock 中的 index entry 二分查找快速找到某个 key 可能存在的 DataBlock
assert(r->data_block.empty());
// FindShortestSeparator 负责找到介于两个 Block 中间的字符串作为索引值, 并将它写入r->last_key
r->options.comparator->FindShortestSeparator(&r->last_key, key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->pending_index_entry = false;
}
if (r->filter_block != nullptr) {
// 将 key 加入到 filter block 中
r->filter_block->AddKey(key);
}
r->last_key.assign(key.data(), key.size());
r->num_entries++;
r->data_block.Add(key, value);
const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
if (estimated_block_size >= r->options.block_size) {
Flush();
}
}
// 将缓存中的 DataBlock 刷新到磁盘, 此后的加入的键值对会存入新的 DataBlock 中
void TableBuilder::Flush() {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->data_block.empty()) return;
assert(!r->pending_index_entry);
// 将 data_block 写入文件,data_block 的指针会被存入 pending_handle 中
WriteBlock(&r->data_block, &r->pending_handle);
if (ok()) {
// pending_index_entry = true 说明 data_block 已经写入
// 但是 index_entry 的 key 要在添加下个 key 的时候才能确定
r->pending_index_entry = true;
r->status = r->file->Flush();
}
if (r->filter_block != nullptr) {
// 将下一个 DataBlock 的开头偏移量传给 filter builder,同时构建 filter block
// 第一个 DataBlock 的 FilterBuilder::StartBlock 在 TableBuilder 构造时调用
r->filter_block->StartBlock(r->offset);
}
}
// Finishes *block, optionally compresses it, writes it through
// WriteRawBlock(), stores its location in *handle and resets *block.
//
// File format contains a sequence of blocks where each block has:
//    block_data: uint8[n]
//    type: uint8
//    crc: uint32
//
// Requires ok() on entry (assert only).
void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
  assert(ok());
  Rep* const rep = rep_;
  const Slice raw = block->Finish();

  Slice block_contents;
  CompressionType type = rep->options.compression;
  std::string* const scratch = &rep->compressed_output;

  // TODO(postrelease): Support more compression options: zlib?
  bool compression_requested = false;
  bool compressor_succeeded = false;
  switch (type) {
    case kNoCompression:
      block_contents = raw;
      break;

    case kSnappyCompression:
      compression_requested = true;
      compressor_succeeded =
          port::Snappy_Compress(raw.data(), raw.size(), scratch);
      break;

    case kZstdCompression:
      compression_requested = true;
      compressor_succeeded =
          port::Zstd_Compress(rep->options.zstd_compression_level, raw.data(),
                              raw.size(), scratch);
      break;
  }

  if (compression_requested) {
    // Keep the compressed form only if the compressor succeeded and saved
    // more than 12.5% of the raw size.
    if (compressor_succeeded &&
        scratch->size() < raw.size() - (raw.size() / 8u)) {
      block_contents = *scratch;
    } else {
      // Compressor not supported, or compressed less than 12.5%, so just
      // store uncompressed form
      block_contents = raw;
      type = kNoCompression;
    }
  }

  WriteRawBlock(block_contents, type, handle);
  rep->compressed_output.clear();
  block->Reset();
}
// 实际写入 Block 的代码
void TableBuilder::WriteRawBlock(const Slice& block_contents,
CompressionType type, BlockHandle* handle) {
Rep* r = rep_;
// 将 block 指针存入 handle 中
handle->set_offset(r->offset);
handle->set_size(block_contents.size());
r->status = r->file->Append(block_contents);
if (r->status.ok()) {
// 添加 block data 之后的 type 和 crc 字段
char trailer[kBlockTrailerSize];
trailer[0] = type;
uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type
EncodeFixed32(trailer + 1, crc32c::Mask(crc));
r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
if (r->status.ok()) {
r->offset += block_contents.size() + kBlockTrailerSize;
}
}
}
// Returns a copy of the builder's current status.
Status TableBuilder::status() const {
  const Rep* const rep = rep_;
  return rep->status;
}
// Finishes building the table: flushes the last data block, then writes the
// filter block (if any), the metaindex block, the index block and the
// footer, in that order. Each stage runs only if the builder is still ok().
//
// Marks the builder closed and returns the final status. Must not be called
// after Finish() or Abandon() (assert only).
Status TableBuilder::Finish() {
  Rep* const rep = rep_;
  Flush();
  assert(!rep->closed);
  rep->closed = true;

  BlockHandle filter_handle, metaindex_handle, index_handle;

  // Write filter block (always stored uncompressed).
  if (!ok()) return rep->status;
  if (rep->filter_block != nullptr) {
    WriteRawBlock(rep->filter_block->Finish(), kNoCompression, &filter_handle);
  }

  // Write metaindex block; currently it holds at most one record, pointing
  // at the filter block.
  if (!ok()) return rep->status;
  {
    BlockBuilder meta_index_block(&rep->options);
    if (rep->filter_block != nullptr) {
      // Add mapping from "filter.Name" to location of filter data
      std::string filter_key = "filter.";
      filter_key.append(rep->options.filter_policy->Name());
      std::string encoded_handle;
      filter_handle.EncodeTo(&encoded_handle);
      meta_index_block.Add(filter_key, encoded_handle);
    }

    // TODO(postrelease): Add stats and other meta blocks
    WriteBlock(&meta_index_block, &metaindex_handle);
  }

  // Write index block
  if (!ok()) return rep->status;
  if (rep->pending_index_entry) {
    // No further key will arrive, so the last data block's index key is a
    // short successor of the last key rather than a separator.
    rep->options.comparator->FindShortSuccessor(&rep->last_key);
    std::string encoded_handle;
    rep->pending_handle.EncodeTo(&encoded_handle);
    rep->index_block.Add(rep->last_key, Slice(encoded_handle));
    rep->pending_index_entry = false;
  }
  WriteBlock(&rep->index_block, &index_handle);

  // Write footer
  if (!ok()) return rep->status;
  Footer footer;
  footer.set_metaindex_handle(metaindex_handle);
  footer.set_index_handle(index_handle);
  std::string footer_encoding;
  footer.EncodeTo(&footer_encoding);
  rep->status = rep->file->Append(footer_encoding);
  if (rep->status.ok()) {
    rep->offset += footer_encoding.size();
  }
  return rep->status;
}
// Marks the builder as closed without writing anything further, so that the
// destructor's "closed" assertion holds. Must not be called after Finish()
// or Abandon() (assert only).
void TableBuilder::Abandon() {
  assert(!rep_->closed);
  rep_->closed = true;
}
uint64_t TableBuilder::NumEntries() const { return rep_->num_entries; }
uint64_t TableBuilder::FileSize() const { return rep_->offset; }
} // namespace leveldb