forked from qdrvm/kagome
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkagome_db_editor.cpp
512 lines (445 loc) · 16.7 KB
/
kagome_db_editor.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/
#include "storage/buffer_map_types.hpp"
#include "storage/trie/trie_storage_backend.hpp"
#if defined(BACKWARD_HAS_BACKTRACE)
#include <backward.hpp>
#endif
#undef TRUE
#undef FALSE
#include <boost/di.hpp>
#include <soralog/impl/configurator_from_yaml.hpp>
#include "blockchain/block_storage_error.hpp"
#include "blockchain/impl/block_header_repository_impl.hpp"
#include "blockchain/impl/block_storage_impl.hpp"
#include "blockchain/impl/block_tree_impl.hpp"
#include "blockchain/impl/storage_util.hpp"
#include "common/outcome_throw.hpp"
#include "crypto/blake2/blake2b.h"
#include "crypto/hasher/hasher_impl.hpp"
#include "network/impl/extrinsic_observer_impl.hpp"
#include "runtime/common/runtime_upgrade_tracker_impl.hpp"
#include "storage/face/map_cursor.hpp"
#include "storage/predefined_keys.hpp"
#include "storage/rocksdb/rocksdb.hpp"
#include "storage/trie/impl/trie_storage_backend_impl.hpp"
#include "storage/trie/impl/trie_storage_impl.hpp"
#include "storage/trie/polkadot_trie/polkadot_trie_factory_impl.hpp"
#include "storage/trie/serialization/polkadot_codec.hpp"
#include "storage/trie/serialization/trie_serializer_impl.hpp"
#include "storage/trie_pruner/impl/trie_pruner_impl.hpp"
#include "utils/profiler.hpp"
namespace di = boost::di;
using namespace kagome; // NOLINT(google-build-using-namespace)
using namespace storage::trie; // NOLINT(google-build-using-namespace)
using common::BufferOrView;
using common::BufferView;
// NOLINTBEGIN(cppcoreguidelines-pro-bounds-pointer-arithmetic)
/**
 * Storage wrapper that remembers every key requested through get().
 *
 * Only get() is forwarded to the wrapped storage; every other storage
 * operation calls abort().
 */
struct TrieTracker : storage::BufferStorage {
  TrieTracker(storage::BufferStorage &inner) : inner{inner} {}

  /// Not supported: aborts.
  std::unique_ptr<Cursor> cursor() override {
    abort();
  }

  /// Records @p key, then forwards the lookup to the wrapped storage.
  outcome::result<BufferOrView> get(const BufferView &key) const override {
    track(key);
    return inner.get(key);
  }

  /// Not supported: aborts.
  outcome::result<std::optional<BufferOrView>> tryGet(
      const BufferView &key) const override {
    abort();
  }

  /// Not supported: aborts.
  outcome::result<bool> contains(const BufferView &key) const override {
    abort();
  }

  /// Not supported: aborts.
  outcome::result<void> put(const BufferView &key,
                            BufferOrView &&value) override {
    abort();
  }

  /// Not supported: aborts.
  outcome::result<void> remove(const common::BufferView &key) override {
    abort();
  }

  /// Adds @p key to the set of seen keys. The key has to be convertible to a
  /// Hash256, otherwise the `.value()` access on the conversion result fails.
  void track(BufferView key) const {
    auto hash = common::Hash256::fromSpan(key).value();
    keys.insert(hash);
  }

  /// Tells whether @p key has been passed to track() (directly or via get()).
  bool tracked(BufferView key) const {
    const auto hash = common::Hash256::fromSpan(key).value();
    return keys.find(hash) != keys.end();
  }

  // NOLINTNEXTLINE(cppcoreguidelines-avoid-const-or-ref-data-members)
  storage::BufferStorage &inner;
  // mutable because get() is const but still has to record the key
  mutable std::set<common::Hash256> keys;
};
/**
 * TrieStorageBackend wrapper that routes node access through a TrieTracker,
 * so that every node key read via nodes().get() is recorded.
 * values() and batch() are forwarded to the wrapped backend unchanged.
 */
struct TrieTrackerBackend : TrieStorageBackend {
  TrieTrackerBackend(std::shared_ptr<TrieStorageBackend> backend)
      : backend{std::move(backend)},
        // `this->` is essential here: inside the initializer list the plain
        // name `backend` denotes the constructor parameter, which has just
        // been moved from (and is therefore empty). The member is declared
        // before node_tracker, so it is already initialized at this point.
        node_tracker{this->backend->nodes()} {}

  /// Node storage wrapped by the tracker.
  storage::BufferStorage &nodes() override {
    return node_tracker;
  }

  /// Value storage of the wrapped backend (not tracked).
  storage::BufferStorage &values() override {
    return backend->values();
  }

  /// Batch of the wrapped backend (not tracked).
  std::unique_ptr<storage::BufferSpacedBatch> batch() override {
    return backend->batch();
  }

  // Declaration order matters: node_tracker is initialized from backend.
  std::shared_ptr<TrieStorageBackend> backend;
  TrieTracker node_tracker;
};
/// Shorthand for std::shared_ptr, keeps the injector lookups readable.
template <typename Pointee>
using sptr = std::shared_ptr<Pointee>;
/// Type trait: `value` is true only when the queried type is exactly a
/// (non-reference, non-cv-qualified) std::optional specialization.
template <typename Candidate>
struct is_optional : std::false_type {};

template <typename Payload>
struct is_optional<std::optional<Payload>> : std::true_type {};
/**
 * Verifies that @p res holds a value and hands it back to the caller.
 *
 * @param res a std::optional or a result-like object providing has_value()
 *            and error()
 * @return @p res itself (forwarded) when it holds a value
 * @throws std::runtime_error("No value") for an empty std::optional; for any
 *         other empty type the error is passed to kagome::common::raise()
 */
template <typename T>
inline auto check(T &&res) {
  if (not res.has_value()) {
    // T is a reference type when an lvalue is passed, so strip references and
    // cv-qualifiers before asking the trait; otherwise an lvalue optional
    // would be routed to the `.error()` branch, which it does not have.
    if constexpr (is_optional<std::decay_t<T>>::value) {
      throw std::runtime_error("No value");
    } else {
      kagome::common::raise(res.error());
    }
  }
  return std::forward<T>(res);
}
namespace {
// Logging configuration in YAML form; it is handed to
// soralog::ConfiguratorFromYAML by the Configurator class of this file.
// It declares a single sink named "console" and a fallback group "main" at
// level "trace" with the children listed under `children` ("trie" is set to
// level "debug").
// NOTE(review): YAML nesting is defined by the leading whitespace inside the
// raw string literal - confirm that the indentation of the entries is intact.
static const std::string embedded_config(R"(
# ----------------
sinks:
- name: console
type: console
thread: none
color: false
latency: 0
groups:
- name: main
sink: console
level: trace
is_fallback: true
children:
- name: kagome-db-editor
- name: trie
level: debug
- name: storage
- name: changes_trie
- name: blockchain
- name: profile
# ----------------
)");
}
/// Logging configurator that is always built from the YAML text stored in
/// `embedded_config`.
class Configurator : public soralog::ConfiguratorFromYAML {
 public:
  Configurator() : soralog::ConfiguratorFromYAML(embedded_config) {}
};
/// Positions of the command-line arguments inside argv.
enum ArgNum : uint8_t {
  DB_PATH = 1,  ///< argv[1]: path of the database to open
  STATE_HASH,   ///< argv[2]
  MODE,         ///< argv[3]: compared against "compact" / "dump"
};
/// Operation selected on the command line.
enum Command : uint8_t {
  COMPACT = 0,  ///< prune the database and compact it
  DUMP = 1,     ///< write state keys and values to a file
};
/**
 * Prints the command-line help to standard output.
 *
 * The text lists every argument the tool accepts: the database path, the
 * optional state hash and the optional mode ("compact" or "dump").
 */
void usage() {
  const std::string help(R"(
Kagome DB Editor - a storage pruner. Allows to reduce occupied disk space.
Usage:
    kagome db-editor <db-path> [<state-hash> [<mode>]]

    <db-path>     full or relative path to kagome database. It is usually path
                  polkadot/db inside base path set in kagome options.
    <state-hash>  optional 0x-prefixed hex root hash of the state to keep when
                  compacting. By default the state of the last finalized block
                  is used.
    <mode>        optional, can only be given after <state-hash>:
                    compact  prune and compact the database (default)
                    dump     write state keys and values to hex_full_state.yaml

Example:
    kagome-db-editor base-path/polkadot/db
)");
  std::cout << help;
}
/**
 * Opens a persistent batch at the given state root and walks over every key
 * of that state with a trie cursor.
 *
 * NOTE(review): the full walk is presumably what makes every trie node get
 * requested from the node storage (and thus recorded by TrieTracker) -
 * confirm against the cursor implementation.
 *
 * @param trie trie storage to open the batch on
 * @param hash root hash of the state
 * @return the opened batch, or the error of getPersistentBatchAt(); cursor
 *         failures are raised through check()
 */
outcome::result<std::unique_ptr<TrieBatch>> persistent_batch(
    const std::unique_ptr<TrieStorageImpl> &trie, const RootHash &hash) {
  OUTCOME_TRY(batch, trie->getPersistentBatchAt(hash, std::nullopt));
  auto cursor = batch->trieCursor();
  // the first next() moves the fresh cursor onto the first key
  auto res = check(cursor->next());
  int count = 0;
  auto log = log::createLogger("main", "kagome-db-editor");
  {
    TicToc t1("Process state.", log);
    while (cursor->key().has_value()) {
      ++count;
      res = check(cursor->next());
    }
  }
  // `count` already equals the number of visited keys; incrementing it once
  // more here would over-report by one (and report 1 for an empty state)
  log->trace("{} keys were processed at the state.", count);
  return batch;
}
/**
 * Collects the values stored under keys that start with
 * storage::kChildStorageDefaultPrefix and inserts them, converted to Hash256,
 * into @p hashes.
 *
 * @param batch state to scan
 * @param hashes output set, extended in place
 */
void child_storage_root_hashes(const std::unique_ptr<TrieBatch> &batch,
                               std::set<RootHash> &hashes) {
  auto log = log::createLogger("main", "kagome-db-editor");
  const auto &child_prefix = storage::kChildStorageDefaultPrefix;

  auto cursor = batch->trieCursor();
  auto step_res = cursor->seekUpperBound(child_prefix);
  if (not step_res.has_value()) {
    // nothing to scan when the cursor could not be positioned
    return;
  }

  for (auto key = cursor->key();
       key.has_value() && startsWith(key.value(), child_prefix);
       key = cursor->key()) {
    auto value_res = batch->tryGet(key.value());
    if (value_res.has_value() && value_res.value().has_value()) {
      auto &value_opt = value_res.value();
      log->trace("Found child root hash {}", *value_opt);
      hashes.insert(common::Hash256::fromSpan(*value_opt).value());
    }
    // advance; the loop header re-reads the key afterwards
    step_res = cursor->next();
  }
}
/**
 * Checks whether @p s looks like a 0x-prefixed hex representation of a
 * Hash256: the "0x" prefix followed by two characters per hash byte.
 * Only prefix and length are checked, not the individual hex digits.
 *
 * @param s NUL-terminated string, must not be null
 * @return true when prefix and length match
 */
auto is_hash(const char *s) {
  // Hash256::size() is the size in bytes; the textual form needs two hex
  // digits per byte plus the two characters of the "0x" prefix.
  const auto expected_length = 2 + 2 * common::Hash256::size();
  // The short-circuit evaluation guarantees s[1] is only read when s[0] is
  // '0', i.e. when the string has at least one character before its NUL.
  // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
  return s[0] == '0' and s[1] == 'x' and std::strlen(s) == expected_length;
}
int db_editor_main(int argc, const char **argv) {
#if defined(BACKWARD_HAS_BACKTRACE)
backward::SignalHandling sh;
#endif
libp2p::common::FinalAction flush_std_streams_at_exit([] {
std::cout.flush();
std::cerr.flush();
});
Command cmd; // NOLINT(cppcoreguidelines-init-variables)
if (argc == 2 or (argc == 3 && is_hash(argv[2]))
or (argc == 4 and std::strcmp(argv[MODE], "compact") == 0)) {
cmd = COMPACT;
} else if (argc == 4 and std::strcmp(argv[MODE], "dump") == 0) {
cmd = DUMP;
} else {
usage();
return 0;
}
std::optional<RootHash> target_state_param;
if (argc > 2) {
if (!is_hash(argv[2])) {
std::cout << "ERROR: Invalid state hash\n";
usage();
return -1;
}
target_state_param = RootHash::fromHexWithPrefix(argv[2]).value();
}
auto log = log::createLogger("main", "kagome-db-editor");
common::Buffer prefix{};
bool need_additional_compaction = false;
{
auto factory = std::make_shared<PolkadotTrieFactoryImpl>();
std::shared_ptr<storage::RocksDb> storage;
std::shared_ptr<storage::BufferBatchableStorage> buffer_storage;
try {
storage =
storage::RocksDb::create(argv[DB_PATH], rocksdb::Options()).value();
storage->dropColumn(storage::Space::kBlockBody).value();
buffer_storage = storage->getSpace(storage::Space::kDefault);
} catch (std::system_error &e) {
log->error("{}", e.what());
usage();
return 0;
}
auto trie_node_tracker = std::make_shared<TrieTrackerBackend>(
std::make_shared<TrieStorageBackendImpl>(storage));
auto injector = di::make_injector(
di::bind<TrieSerializer>.to([](const auto &injector) {
return std::make_shared<TrieSerializerImpl>(
injector.template create<sptr<PolkadotTrieFactory>>(),
injector.template create<sptr<Codec>>(),
injector.template create<sptr<TrieStorageBackend>>());
}),
di::bind<TrieStorageBackend>.to(trie_node_tracker),
di::bind<storage::trie_pruner::TriePruner>.to(
std::shared_ptr<storage::trie_pruner::TriePruner>(nullptr)),
di::bind<Codec>.to([](const auto &injector) {
return std::make_shared<PolkadotCodec>(kagome::crypto::blake2b<32>);
}),
di::bind<PolkadotTrieFactory>.to(factory),
di::bind<crypto::Hasher>.template to<crypto::HasherImpl>(),
di::bind<blockchain::BlockHeaderRepository>.template to<blockchain::BlockHeaderRepositoryImpl>(),
di::bind<network::ExtrinsicObserver>.template to<network::ExtrinsicObserverImpl>());
auto hasher = injector.template create<sptr<crypto::Hasher>>();
auto block_storage =
check(blockchain::BlockStorageImpl::create({}, storage, hasher))
.value();
auto block_tree_leaf_hashes =
check(block_storage->getBlockTreeLeaves()).value();
BOOST_ASSERT_MSG(not block_tree_leaf_hashes.empty(),
"Must be known or calculated at least one leaf");
// Find the least and best leaf
std::set<primitives::BlockInfo> leafs;
primitives::BlockInfo least_leaf(
std::numeric_limits<primitives::BlockNumber>::max(), {});
primitives::BlockInfo best_leaf(
std::numeric_limits<primitives::BlockNumber>::min(), {});
for (auto hash : block_tree_leaf_hashes) {
auto number = check(check(block_storage->getBlockHeader(hash)).value())
.value()
.number;
const auto &leaf = *leafs.emplace(number, hash).first;
SL_TRACE(log, "Leaf {} found", leaf);
if (leaf.number <= least_leaf.number) {
least_leaf = leaf;
}
if (leaf.number >= best_leaf.number) {
best_leaf = leaf;
}
}
primitives::BlockInfo last_finalized_block;
primitives::BlockHeader last_finalized_block_header;
storage::trie::RootHash last_finalized_block_state_root;
storage::trie::RootHash after_finalized_block_state_root;
std::set<primitives::BlockInfo> to_remove;
// Backward search of finalized block and connect blocks to remove
for (;;) {
auto it = leafs.rbegin();
auto node = leafs.extract((++it).base());
auto &block = node.value();
auto header =
check(check(block_storage->getBlockHeader(block.hash)).value())
.value();
if (header.number == 0) {
last_finalized_block = block;
last_finalized_block_header = header;
last_finalized_block_state_root = header.state_root;
break;
}
auto justifications =
check(block_storage->getJustification(block.hash)).value();
if (justifications.has_value()) {
last_finalized_block = block;
last_finalized_block_header = header;
last_finalized_block_state_root = header.state_root;
break;
}
after_finalized_block_state_root = header.state_root;
leafs.emplace(*header.parentInfo());
to_remove.insert(std::move(node));
}
RootHash target_state =
target_state_param.value_or(last_finalized_block_state_root);
log->trace("Autodetected finalized block is {}, state root is {:l}",
last_finalized_block,
last_finalized_block_state_root);
for (auto &block : std::ranges::reverse_view(to_remove)) {
check(block_storage->removeBlock(block.hash)).value();
}
SL_TRACE(log, "Save {} as single leaf", last_finalized_block);
check(block_storage->setBlockTreeLeaves({last_finalized_block.hash}))
.value();
// we place the only existing state hash at runtime look up key
// it won't work for code substitute
{
std::vector<runtime::RuntimeUpgradeTrackerImpl::RuntimeUpgradeData>
runtime_upgrade_data{};
runtime_upgrade_data.emplace_back(last_finalized_block,
last_finalized_block_header.state_root);
auto encoded_res = check(scale::encode(runtime_upgrade_data));
check(buffer_storage->put(storage::kRuntimeHashesLookupKey,
common::Buffer(encoded_res.value())))
.value();
}
auto trie =
TrieStorageImpl::createFromStorage(
injector.template create<sptr<Codec>>(),
injector.template create<sptr<TrieSerializer>>(),
injector.template create<sptr<storage::trie_pruner::TriePruner>>())
.value();
if (COMPACT == cmd) {
auto batch = check(persistent_batch(trie, target_state)).value();
auto finalized_batch =
check(persistent_batch(trie, target_state)).value();
std::vector<std::unique_ptr<TrieBatch>> child_batches;
{
std::set<RootHash> child_root_hashes;
child_storage_root_hashes(batch, child_root_hashes);
child_storage_root_hashes(finalized_batch, child_root_hashes);
for (const auto &child_root_hash : child_root_hashes) {
auto child_batch_res = persistent_batch(trie, child_root_hash);
if (child_batch_res.has_value()) {
child_batches.emplace_back(std::move(child_batch_res.value()));
} else {
log->error("Child batch {} not found in the storage",
child_root_hash);
}
}
}
auto trie_node_storage = storage->getSpace(storage::Space::kTrieNode);
auto trie_value_storage = storage->getSpace(storage::Space::kTrieValue);
auto track_trie_entries = [&log, &buffer_storage, &prefix](auto storage,
auto tracker) {
auto db_cursor = storage->cursor();
auto db_batch = storage->batch();
auto res = check(db_cursor->seekFirst());
int count = 0;
{
TicToc t2("Process DB.", log);
while (db_cursor->isValid() && db_cursor->key().has_value()) {
auto key = db_cursor->key().value();
if (tracker->node_tracker.tracked(key)) {
db_cursor->next().value();
continue;
}
auto res2 = check(db_batch->remove(key));
count++;
if (not(count % 10000000)) {
log->trace("{} keys were processed at the db.", count);
res2 = check(db_batch->commit());
dynamic_cast<storage::RocksDbSpace *>(buffer_storage.get())
->compact(prefix, check(db_cursor->key()).value());
db_cursor = buffer_storage->cursor();
db_batch = buffer_storage->batch();
res = check(db_cursor->seek(key));
}
res2 = check(db_cursor->next());
}
std::ignore = check(db_batch->commit());
}
log->trace("{} keys were processed at the db.", ++count);
};
track_trie_entries(trie_node_storage, trie_node_tracker);
{
TicToc t4("Compaction 1.", log);
dynamic_cast<storage::RocksDbSpace *>(buffer_storage.get())
->compact(common::Buffer(), common::Buffer());
}
need_additional_compaction = true;
} else if (DUMP == cmd) {
auto batch =
check(trie->getEphemeralBatchAt(last_finalized_block.hash)).value();
auto cursor = batch->trieCursor();
auto res = check(cursor->next());
{
TicToc t1("Dump full state.", log);
int count = 0;
std::ofstream ofs;
ofs.open("hex_full_state.yaml");
ofs << "keys:\n";
while (cursor->key().has_value()) {
ofs << " - " << cursor->key().value().toHex() << "\n";
if (not(++count % 10000)) {
log->trace("{} keys were dumped.", count);
}
res = cursor->next();
}
cursor = batch->trieCursor();
res = check(cursor->next());
ofs << "values:\n";
count = 0;
while (cursor->key().has_value()) {
ofs << " - "
<< check(batch->get(check(cursor->key()).value())).value().view()
<< "\n";
if (not(++count % 50000)) {
log->trace("{} values were dumped.", count);
}
res = check(cursor->next());
}
ofs.close();
}
}
}
if (need_additional_compaction) {
TicToc t5("Compaction 2.", log);
auto storage =
check(storage::RocksDb::create(argv[1], rocksdb::Options())).value();
auto buffer_storage = storage->getSpace(storage::Space::kDefault);
dynamic_cast<storage::RocksDbSpace *>(buffer_storage.get())
->compact(common::Buffer(), common::Buffer());
}
return 0;
}
// NOLINTEND(cppcoreguidelines-pro-bounds-pointer-arithmetic)