Skip to content
This repository was archived by the owner on Apr 17, 2019. It is now read-only.

Commit 9b98ea7

Browse files
committed
MovedBatchPtr
Signed-off-by: Mikhail Boldyrev <[email protected]>
1 parent 3db6a12 commit 9b98ea7

File tree

19 files changed

+218
-142
lines changed

19 files changed

+218
-142
lines changed

irohad/multi_sig_transactions/impl/mst_processor.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ namespace iroha {
1919
return this->onStateUpdateImpl();
2020
}
2121

22-
rxcpp::observable<DataType> MstProcessor::onPreparedBatches() const {
22+
rxcpp::observable<std::shared_ptr<MovedBatchPtr>>
23+
MstProcessor::onPreparedBatches() const {
2324
return this->onPreparedBatchesImpl();
2425
}
2526

irohad/multi_sig_transactions/impl/mst_processor_impl.cpp

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ namespace iroha {
3535
auto FairMstProcessor::propagateBatchImpl(const iroha::DataType &batch)
3636
-> decltype(propagateBatch(batch)) {
3737
auto state_update = storage_->updateOwnState(batch);
38-
completedBatchesNotify(*state_update.completed_state_);
38+
completedBatchesNotify(state_update.completed_state_);
3939
updatedBatchesNotify(*state_update.updated_state_);
4040
expiredBatchesNotify(
4141
storage_->extractExpiredTransactions(time_provider_->getCurrentTime()));
@@ -57,14 +57,13 @@ namespace iroha {
5757
}
5858

5959
// TODO [IR-1687] Akvinikym 10.09.18: three methods below should be one
60-
void FairMstProcessor::completedBatchesNotify(ConstRefState state) const {
61-
if (not state.isEmpty()) {
62-
auto completed_batches = state.getBatches();
63-
std::for_each(completed_batches.begin(),
64-
completed_batches.end(),
65-
[this](const auto &batch) {
66-
batches_subject_.get_subscriber().on_next(batch);
67-
});
60+
void FairMstProcessor::completedBatchesNotify(
61+
std::vector<std::shared_ptr<MovedBatchPtr>> completed) const {
62+
if (not completed.empty()) {
63+
std::for_each(
64+
completed.begin(), completed.end(), [this](const auto &batch) {
65+
batches_subject_.get_subscriber().on_next(batch);
66+
});
6867
}
6968
}
7069

@@ -106,7 +105,7 @@ namespace iroha {
106105
state_update.updated_state_->transactionsQuantity());
107106

108107
// completed batches
109-
completedBatchesNotify(*state_update.completed_state_);
108+
completedBatchesNotify(state_update.completed_state_);
110109

111110
// expired batches
112111
expiredBatchesNotify(storage_->getDiffState(from, current_time));

irohad/multi_sig_transactions/impl/propagation_to_pcs.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ MstToPcsPropagation::MstToPcsPropagation(
2323
std::make_unique<InternalStorage>())),
2424
propagation_available_subscription_(
2525
propagation_available.subscribe([this, pcs](size_t available_txs) {
26-
pending_batches_.extract(
26+
pending_batches_.extractMultiple(
2727
[pcs, &available_txs](InternalStorage &storage) {
2828
std::vector<BatchPtr> extracted;
2929
extracted.reserve(storage.pending_batches.size());
@@ -47,12 +47,13 @@ MstToPcsPropagation::~MstToPcsPropagation() {
4747
propagation_available_subscription_.unsubscribe();
4848
}
4949

50-
void MstToPcsPropagation::notifyCompletedBatch(BatchPtr batch) {
51-
if (not pcs_->propagate_batch(batch)) {
52-
if (not pending_batches_.insert(batch)) {
50+
void MstToPcsPropagation::notifyCompletedBatch(
51+
std::shared_ptr<MovedBatchPtr> moved_batch) {
52+
if (not pcs_->propagate_batch(moved_batch->get())) {
53+
if (not pending_batches_.insert(std::move(moved_batch))) {
5354
log_->critical(
5455
"Dropped a completed MST batch because no place left in storage: {}",
55-
batch);
56+
moved_batch->get());
5657
assert(false);
5758
}
5859
}

irohad/multi_sig_transactions/mst_processor.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ namespace iroha {
4747
* Observable emit batches which are prepared for further processing in
4848
* system
4949
*/
50-
rxcpp::observable<DataType> onPreparedBatches() const;
50+
rxcpp::observable<std::shared_ptr<MovedBatchPtr>> onPreparedBatches() const;
5151

5252
/**
5353
* Observable emit expired by time transactions

irohad/multi_sig_transactions/mst_processor_impl.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ namespace iroha {
7575
* signatures and ready to move forward
7676
* @param state with those batches
7777
*/
78-
void completedBatchesNotify(ConstRefState state) const;
78+
void completedBatchesNotify(
79+
std::vector<std::shared_ptr<MovedBatchPtr>> completed) const;
7980

8081
/**
8182
* Notify subscribers when some of the batches received new signatures, but
@@ -102,7 +103,7 @@ namespace iroha {
102103
rxcpp::subjects::subject<std::shared_ptr<MstState>> state_subject_;
103104

104105
/// use for share completed batches
105-
rxcpp::subjects::subject<DataType> batches_subject_;
106+
rxcpp::subjects::subject<std::shared_ptr<MovedBatchPtr>> batches_subject_;
106107

107108
/// use for share expired batches
108109
rxcpp::subjects::subject<DataType> expired_subject_;

irohad/multi_sig_transactions/mst_types.hpp

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,20 +38,6 @@ namespace iroha {
3838
using ConstRefState = ConstRefT<MstState>;
3939

4040
using DataType = BatchPtr;
41-
42-
/**
43-
* Contains result of updating local state:
44-
* - state with completed batches
45-
* - state with updated (still not enough signatures) batches
46-
*/
47-
struct StateUpdateResult {
48-
StateUpdateResult(std::shared_ptr<MstState> completed_state,
49-
std::shared_ptr<MstState> updated_state)
50-
: completed_state_{std::move(completed_state)},
51-
updated_state_{std::move(updated_state)} {}
52-
std::shared_ptr<MstState> completed_state_;
53-
std::shared_ptr<MstState> updated_state_;
54-
};
5541
} // namespace iroha
5642

5743
#endif // IROHA_MST_TYPES_HPP

irohad/multi_sig_transactions/propagation_to_pcs.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ namespace iroha {
2626

2727
virtual ~MstToPcsPropagation();
2828

29-
void notifyCompletedBatch(BatchPtr batch);
29+
void notifyCompletedBatch(std::shared_ptr<MovedBatchPtr> batch);
3030

3131
size_t pendingBatchesQuantity() const;
3232

irohad/multi_sig_transactions/state/impl/mst_state.cpp

Lines changed: 43 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -81,21 +81,13 @@ namespace iroha {
8181
}
8282

8383
StateUpdateResult MstState::operator+=(const DataType &rhs) {
84-
auto state_update = StateUpdateResult{
85-
std::make_shared<MstState>(empty(
86-
completer_, std::make_shared<iroha::StorageLimitDummy>(), log_)),
87-
std::make_shared<MstState>(empty(
88-
completer_, std::make_shared<iroha::StorageLimitDummy>(), log_))};
84+
auto state_update = StateUpdateResult{completer_, log_};
8985
insertOne(state_update, rhs);
9086
return state_update;
9187
}
9288

9389
StateUpdateResult MstState::operator+=(const MstState &rhs) {
94-
auto state_update = StateUpdateResult{
95-
std::make_shared<MstState>(empty(
96-
completer_, std::make_shared<iroha::StorageLimitDummy>(), log_)),
97-
std::make_shared<MstState>(empty(
98-
completer_, std::make_shared<iroha::StorageLimitDummy>(), log_))};
90+
auto state_update = StateUpdateResult{completer_, log_};
9991
return rhs.batches_.access([this, &state_update](const auto &storage) {
10092
for (auto &&rhs_tx : storage.batches.right | boost::adaptors::map_keys) {
10193
this->insertOne(state_update, rhs_tx);
@@ -211,42 +203,48 @@ namespace iroha {
211203
void MstState::insertOne(StateUpdateResult &state_update,
212204
const DataType &rhs_batch) {
213205
log_->info("batch: {}", *rhs_batch);
214-
batches_.extract([this, &state_update, &rhs_batch](
215-
auto &storage) -> std::vector<BatchPtr> {
216-
auto corresponding = storage.batches.right.find(rhs_batch);
217-
if (corresponding == storage.batches.right.end()) {
218-
// when state does not contain transaction
219-
if (this->rawInsert(rhs_batch)) {
220-
// there is enough room for the new batch
221-
BOOST_VERIFY_MSG(state_update.updated_state_->rawInsert(rhs_batch),
222-
"Could not insert new MST batch to state update.");
223-
} else {
224-
// there is not enough room for the new batch
225-
log_->info("Dropped a batch because it did not fit into storage: {}",
226-
*rhs_batch);
227-
}
228-
return {};
229-
}
206+
if (auto opt_completed_batch =
207+
batches_.move([this, &state_update, &rhs_batch](
208+
auto &storage) -> boost::optional<BatchPtr> {
209+
auto corresponding = storage.batches.right.find(rhs_batch);
210+
if (corresponding == storage.batches.right.end()) {
211+
// when state does not contain transaction
212+
if (this->rawInsert(rhs_batch)) {
213+
// there is enough room for the new batch
214+
BOOST_VERIFY_MSG(
215+
state_update.updated_state_->rawInsert(rhs_batch),
216+
"Could not insert new MST batch to state update.");
217+
} else {
218+
// there is not enough room for the new batch
219+
log_->info(
220+
"Dropped a batch because it did not fit into storage: {}",
221+
*rhs_batch);
222+
}
223+
return boost::none;
224+
}
230225

231-
DataType found = corresponding->first;
232-
// Append new signatures to the existing state
233-
auto inserted_new_signatures = mergeSignaturesInBatch(found, rhs_batch);
226+
DataType found = corresponding->first;
227+
// Append new signatures to the existing state
228+
auto inserted_new_signatures =
229+
mergeSignaturesInBatch(found, rhs_batch);
234230

235-
if (completer_->isCompleted(found)) {
236-
// state already has completed transaction,
237-
// remove from state and return it
238-
storage.batches.right.erase(found);
239-
state_update.completed_state_->rawInsert(found);
240-
return {found};
241-
}
231+
if (completer_->isCompleted(found)) {
232+
// state already has completed transaction,
233+
// remove from state and return it
234+
storage.batches.right.erase(found);
235+
return found;
236+
}
242237

243-
// if batch still isn't completed, return it, if new signatures were
244-
// inserted
245-
if (inserted_new_signatures) {
246-
state_update.updated_state_->rawInsert(found);
247-
}
248-
return {};
249-
});
238+
// if batch still isn't completed, return it, if new signatures
239+
// were inserted
240+
if (inserted_new_signatures) {
241+
state_update.updated_state_->rawInsert(found);
242+
}
243+
return boost::none;
244+
})) {
245+
state_update.completed_state_.emplace_back(
246+
std::move(*opt_completed_batch));
247+
}
250248
}
251249

252250
bool MstState::rawInsert(const DataType &rhs_batch) {
@@ -261,7 +259,8 @@ namespace iroha {
261259

262260
void MstState::extractExpiredImpl(const TimeType &current_time,
263261
boost::optional<MstState &> opt_extracted) {
264-
auto extracted = batches_.extract([this, &current_time](auto &storage) {
262+
auto extracted = batches_.extractMultiple([this,
263+
&current_time](auto &storage) {
265264
std::vector<BatchPtr> extracted;
266265
for (auto it = storage.batches.left.begin();
267266
it != storage.batches.left.end()

irohad/multi_sig_transactions/state/mst_state.hpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ namespace iroha {
8484

8585
using CompleterType = std::shared_ptr<const Completer>;
8686

87+
struct StateUpdateResult;
88+
8789
class MstState {
8890
public:
8991
// -----------------------------| public api |------------------------------
@@ -222,6 +224,22 @@ namespace iroha {
222224
logger::LoggerPtr log_;
223225
};
224226

227+
/**
228+
* Contains result of updating local state:
229+
* - state with completed batches
230+
* - state with updated (still not enough signatures) batches
231+
*/
232+
struct StateUpdateResult {
233+
StateUpdateResult(std::shared_ptr<const Completer> completer,
234+
logger::LoggerPtr log)
235+
: updated_state_(std::make_shared<MstState>(
236+
MstState::empty(std::move(completer),
237+
std::make_shared<iroha::StorageLimitDummy>(),
238+
std::move(log)))) {}
239+
std::vector<std::shared_ptr<MovedBatchPtr>> completed_state_;
240+
std::shared_ptr<MstState> updated_state_;
241+
};
242+
225243
} // namespace iroha
226244

227245
#endif // IROHA_MST_STATE_HPP

0 commit comments

Comments
 (0)