diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 31a1a5b0..df0281e3 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -44,9 +44,7 @@ - - - + @@ -877,12 +872,16 @@ + - - - @@ -955,7 +948,9 @@ - diff --git a/Consensusd.cpp b/Consensusd.cpp index 98741401..78b4afbb 100644 --- a/Consensusd.cpp +++ b/Consensusd.cpp @@ -65,9 +65,10 @@ int main( int argc, char** argv ) { engine.slowStartBootStrapTest(); - sleep( 20 ); + while ( engine.getStatus() != CONSENSUS_EXITED ) { + usleep( 100 * 1000 ); + } - engine.exitGracefullyBlocking(); cerr << "Exited" << endl; #ifdef GOOGLE_PROFILE diff --git a/Consensust.cpp b/Consensust.cpp index 06ffc907..39d8f67c 100644 --- a/Consensust.cpp +++ b/Consensust.cpp @@ -23,6 +23,11 @@ #define CATCH_CONFIG_MAIN +#include +#include +#include + + #include #include #include @@ -121,6 +126,11 @@ void testLog( const char* message ) { printf( "TEST_LOG: %s\n", message ); } +void abort_handler( int ) { + printf( "cought SIGABRT, exiting.\n" ); + exit( 0 ); +} + block_id basicRun( int64_t _lastId = 0 ) { try { REQUIRE( ConsensusEngine::getEngineVersion().size() > 0 ); @@ -154,7 +164,13 @@ block_id basicRun( int64_t _lastId = 0 ) { REQUIRE( timestampS > 0 ); cerr << price << ":" << stateRoot << endl; - engine->exitGracefullyBlocking(); + signal( SIGABRT, abort_handler ); + engine->exitGracefully(); + + while ( engine->getStatus() != CONSENSUS_EXITED ) { + usleep( 100 * 1000 ); + } + delete engine; return lastId; } catch ( SkaleException& e ) { @@ -168,7 +184,11 @@ bool success = false; void exit_check() { sleep( STUCK_TEST_TIME ); - engine->exitGracefullyBlocking(); + engine->exitGracefully(); + + while ( engine->getStatus() != CONSENSUS_EXITED ) { + usleep( 100 * 1000 ); + } } diff --git a/chains/Schain.cpp b/chains/Schain.cpp index 0c9bb6d1..f7d3e871 100644 --- a/chains/Schain.cpp +++ b/chains/Schain.cpp @@ -732,11 +732,22 @@ void Schain::pushBlockToExtFace( const ptr< CommittedBlock >& _block ) { auto currentPrice = this->pricingAgent->readPrice( _block->getBlockID() - 1 ); + // block boundary is the safesf place for exit + // exit immediately if exit has been requested + // this will initiate immediate exit and throw ExitRequestedException + getSchain()->getNode()->checkForExitOnBlockBoundaryAndExitIfNeeded(); if ( extFace ) { - extFace->createBlock( *tv, _block->getTimeStampS(), _block->getTimeStampMs(), - ( __uint64_t ) _block->getBlockID(), currentPrice, _block->getStateRoot(), - ( uint64_t ) _block->getProposerIndex() ); + try { + inCreateBlock = true; + extFace->createBlock( *tv, _block->getTimeStampS(), _block->getTimeStampMs(), + ( __uint64_t ) _block->getBlockID(), currentPrice, _block->getStateRoot(), + ( uint64_t ) _block->getProposerIndex() ); + inCreateBlock = false; + } catch ( ... ) { + inCreateBlock = false; + throw; + } } // block boundary is the safesf place for exit diff --git a/chains/Schain.h b/chains/Schain.h index 2acec381..1405e03f 100644 --- a/chains/Schain.h +++ b/chains/Schain.h @@ -154,6 +154,7 @@ class Schain : public Agent { TimeStamp lastCommittedBlockTimeStamp; mutex lastCommittedBlockInfoMutex; atomic< uint64_t > proposalReceiptTime = 0; + atomic< bool > inCreateBlock = false; atomic< uint64_t > bootstrapBlockID = 0; @@ -347,6 +348,9 @@ class Schain : public Agent { uint64_t getVerifyDaSigsPatchTimestampS() const; + bool isInCreateBlock() const; + + void finalizeDecidedAndSignedBlock( block_id _blockId, schain_index _proposerIndex, const ptr< ThresholdSignature >& _thresholdSig ); diff --git a/chains/SchainGettersSetters.cpp b/chains/SchainGettersSetters.cpp index 157876bf..9acd8dec 100644 --- a/chains/SchainGettersSetters.cpp +++ b/chains/SchainGettersSetters.cpp @@ -332,3 +332,7 @@ void Schain::setLastCommittedBlockId( uint64_t _lastCommittedBlockId ) { const ptr< OracleClient > Schain::getOracleClient() const { return oracleClient; } + +bool Schain::isInCreateBlock() const { + return inCreateBlock; +} \ No newline at end of file diff --git a/network/ZMQNetwork.cpp b/network/ZMQNetwork.cpp index 01f4e918..3eb0ae6f 100644 --- a/network/ZMQNetwork.cpp +++ b/network/ZMQNetwork.cpp @@ -68,21 +68,8 @@ bool ZMQNetwork::sendMessage( uint64_t ZMQNetwork::interruptableRecv( void* _socket, void* _buf, size_t _len ) { int rc; - - /* - - zmq_pollitem_t items[1]; - items[0].socket = _socket; - items[0].events = ZMQ_POLLIN; - - */ - - for ( ;; ) { if ( this->getNode()->isExitRequested() ) { - zmq_close( _socket ); - LOG( debug, - getThreadName() + "zmq debug: closing = " + to_string( ( uint64_t ) _socket ) ); BOOST_THROW_EXCEPTION( ExitRequestedException( __CLASS_NAME__ ) ); } @@ -90,9 +77,6 @@ uint64_t ZMQNetwork::interruptableRecv( void* _socket, void* _buf, size_t _len ) rc = zmq_recv( _socket, _buf, _len, 0 ); if ( this->getNode()->isExitRequested() ) { - // zmq_close(_socket); - LOG( debug, - getThreadName() + " zmq debug: closing = " + to_string( ( uint64_t ) _socket ) ); BOOST_THROW_EXCEPTION( ExitRequestedException( __CLASS_NAME__ ) ); } @@ -119,8 +103,6 @@ bool ZMQNetwork::interruptableSend( void* _socket, void* _buf, size_t _len ) { int flags = ZMQ_DONTWAIT; if ( this->getNode()->isExitRequested() ) { - zmq_close( _socket ); - LOG( debug, getThreadName() + "zmq debug: closing = " + to_string( ( uint64_t ) _socket ) ); BOOST_THROW_EXCEPTION( ExitRequestedException( __CLASS_NAME__ ) ); } @@ -128,8 +110,6 @@ bool ZMQNetwork::interruptableSend( void* _socket, void* _buf, size_t _len ) { rc = zmq_send( _socket, _buf, _len, flags ); if ( this->getNode()->isExitRequested() ) { - zmq_close( _socket ); - LOG( debug, getThreadName() + "zmq debug: closing = " + to_string( ( uint64_t ) _socket ) ); BOOST_THROW_EXCEPTION( ExitRequestedException( __CLASS_NAME__ ) ); } diff --git a/network/ZMQSockets.cpp b/network/ZMQSockets.cpp index 06fb8c70..bc66a649 100644 --- a/network/ZMQSockets.cpp +++ b/network/ZMQSockets.cpp @@ -59,6 +59,8 @@ void* ZMQSockets::getDestinationSocket( const ptr< NodeInfo >& _remoteNodeInfo ) void* requester = zmq_socket( context, ZMQ_CLIENT ); + CHECK_STATE2( requester, "Could not create ZMQ send socket" ); + int val = CONSENSUS_ZMQ_HWM; auto rc = zmq_setsockopt( requester, ZMQ_RCVHWM, &val, sizeof( val ) ); CHECK_STATE( rc == 0 ); @@ -97,10 +99,13 @@ void* ZMQSockets::getReceiveSocket() { if ( !receiveSocket ) { receiveSocket = zmq_socket( context, ZMQ_SERVER ); + CHECK_STATE2( receiveSocket, "Could not create ZMQ receive socket" ); + int val = CONSENSUS_ZMQ_HWM; auto rc = zmq_setsockopt( receiveSocket, ZMQ_RCVHWM, &val, sizeof( val ) ); CHECK_STATE( rc == 0 ); val = CONSENSUS_ZMQ_HWM; + val = CONSENSUS_ZMQ_HWM; rc = zmq_setsockopt( receiveSocket, ZMQ_SNDHWM, &val, sizeof( val ) ); CHECK_STATE( rc == 0 ); @@ -138,7 +143,10 @@ void ZMQSockets::closeReceive() { LOG( info, "consensus engine exiting: closing receive sockets" ); if ( receiveSocket ) { - zmq_close( receiveSocket ); + if ( zmq_close( receiveSocket ) != 0 ) { + LOG( err, "zmq_close returned an error on receiveSocket;" ); + } + receiveSocket = nullptr; } } @@ -152,7 +160,9 @@ void ZMQSockets::closeSend() { if ( item.second ) { LOG( debug, getThreadName() + " zmq debug in closeSend(): closing " + to_string( ( uint64_t ) item.second ) ); - zmq_close( item.second ); + if ( zmq_close( item.second ) != 0 ) { + LOG( err, "zmq_close returned an error on sendSocket;" ); + } } } } @@ -185,11 +195,8 @@ void ZMQSockets::closeAndCleanupAll() { LOG( err, "Unknown exception in zmq_ctx_term" ); } - LOG( info, "Closed ZMQ" ); + LOG( info, "Closed ZMQ context" ); } -ZMQSockets::~ZMQSockets() { - // last resort - closeAndCleanupAll(); -} +ZMQSockets::~ZMQSockets() {} diff --git a/node/ConsensusEngine.cpp b/node/ConsensusEngine.cpp index 4894b075..0ec2c0bd 100644 --- a/node/ConsensusEngine.cpp +++ b/node/ConsensusEngine.cpp @@ -735,61 +735,46 @@ ConsensusExtFace* ConsensusEngine::getExtFace() const { return extFace; } +void ConsensusEngine::exitGracefully() { + // guaranteed to be called only once + RETURN_IF_PREVIOUSLY_CALLED( exitGracefullyCalled ) -void ConsensusEngine::exitGracefullyBlocking() { - LOG( info, "Consensus engine exiting: exitGracefullyBlocking called by skaled" ); - - cout << "Here is exitGracefullyBlocking() stack trace for your information:" << endl; + Node::exitOnBlockBoundaryRequested = true; + LOG( info, "Consensus exiting: exitGracefully called by skaled" ); + cerr << "Here is stack trace for your info:" << endl; cerr << boost::stacktrace::stacktrace() << endl; + // run and forget + thread( [this]() { exitGracefullyAsync(); } ).detach(); +} - // !! if we don't check this - exitGracefullyAsync() - // will try to exit on deleted object! - - - if ( getStatus() == CONSENSUS_EXITED ) - return; - +// used in tests only +void ConsensusEngine::testExitGracefullyBlocking() { exitGracefully(); - while ( getStatus() != CONSENSUS_EXITED ) { usleep( 100 * 1000 ); } } - - -void ConsensusEngine::exitGracefully() { - LOG( info, "Consensus engine exiting: blocking exit exitGracefully called by skaled" ); - cerr << "Here is exitGracefullyBlocking() stack trace for your information:" << endl; - cerr << boost::stacktrace::stacktrace() << endl; - - if ( getStatus() == CONSENSUS_EXITED ) - return; - - // guaranteedd to be called once - RETURN_IF_PREVIOUSLY_CALLED( exitGracefullyCalled ) - - - // run and forget - thread( [this]() { exitGracefullyAsync(); } ).detach(); -} - consensus_engine_status ConsensusEngine::getStatus() const { return status; } void ConsensusEngine::exitGracefullyAsync() { - LOG( info, "Consensus engine exiting: exitGracefullyAsync called by skaled" ); - - // guaranteed to be executed once RETURN_IF_PREVIOUSLY_CALLED( exitGracefullyAsyncCalled ) + LOG( info, "Consensus engine exiting: exitGracefullyAsync called by skaled" ); try { LOG( info, "exitGracefullyAsync running" ); + // if there is onlu one node we can all + // doSoftAndHardExit in the same + // thread for tests, we have many node objects and + // need many threads + + uint64_t counter = 0; for ( auto&& it : nodes ) { // run and forget @@ -800,7 +785,10 @@ void ConsensusEngine::exitGracefullyAsync() { // run and forget - thread( [node]() { + counter++; + + if ( counter == nodes.size() ) { + // run exit for the last node in the same thread try { LOG( info, "Node exit called" ); node->doSoftAndThenHardExit(); @@ -809,7 +797,18 @@ void ConsensusEngine::exitGracefullyAsync() { SkaleException::logNested( e ); } catch ( ... ) { }; - } ).detach(); + } else { + thread( [node]() { + try { + LOG( info, "Node exit called" ); + node->doSoftAndThenHardExit(); + LOG( info, "Node exit completed" ); + } catch ( exception& e ) { + SkaleException::logNested( e ); + } catch ( ... ) { + }; + } ).detach(); + } } CHECK_STATE( threadRegistry ); @@ -822,18 +821,24 @@ void ConsensusEngine::exitGracefullyAsync() { } catch ( exception& e ) { SkaleException::logNested( e ); status = CONSENSUS_EXITED; + } catch ( ... ) { + status = CONSENSUS_EXITED; } status = CONSENSUS_EXITED; } ConsensusEngine::~ConsensusEngine() { - exitGracefullyBlocking(); + /* the time ConsensusEngine destructor is called, exitGracefully must have been + called. This is just precaution to make sure we do not destroy Consensus engine + and all of its fields before all consensus threads exited. */ - nodes.clear(); + exitGracefully(); - curl_global_cleanup(); + while ( getStatus() != CONSENSUS_EXITED ) { + usleep( 100 * 1000 ); + } - std::cerr << "ConsensusEngine terminated." << std::endl; + curl_global_cleanup(); } @@ -904,16 +909,21 @@ map< string, uint64_t > ConsensusEngine::getConsensusDbUsage() const { // map< string, uint64_t > ret; // ret["blocks.db_disk_usage"] = node->getBlockDB()->getActiveDBSize(); // ret["block_proposal.db_disk_usage"] = - // node->getBlockProposalDB()->getBlockProposalDBSize(); ret["block_sigshare.db_disk_usage"] - // = node->getBlockSigShareDB()->getActiveDBSize(); ret["consensus_state.db_disk_usage"] = - // node->getConsensusStateDB()->getActiveDBSize(); ret["da_proof.db_disk_usage"] = - // node->getDaProofDB()->getDaProofDBSize(); ret["da_sigshare.db_disk_usage"] = - // node->getDaSigShareDB()->getActiveDBSize(); ret["incoming_msg.db_disk_usage"] = - // node->getIncomingMsgDB()->getActiveDBSize(); ret["interna_info.db_disk_usage"] = - // node->getInternalInfoDB()->getActiveDBSize(); ret["outgoing_msg.db_disk_usage"] = + // node->getBlockProposalDB()->getBlockProposalDBSize(); + // ret["block_sigshare.db_disk_usage"] = + // node->getBlockSigShareDB()->getActiveDBSize(); + // ret["consensus_state.db_disk_usage"] = + // node->getConsensusStateDB()->getActiveDBSize(); ret["da_proof.db_disk_usage"] + // = node->getDaProofDB()->getDaProofDBSize(); ret["da_sigshare.db_disk_usage"] = + // node->getDaSigShareDB()->getActiveDBSize(); ret["incoming_msg.db_disk_usage"] + // = node->getIncomingMsgDB()->getActiveDBSize(); + // ret["interna_info.db_disk_usage"] = + // node->getInternalInfoDB()->getActiveDBSize(); + // ret["outgoing_msg.db_disk_usage"] = // node->getOutgoingMsgDB()->getActiveDBSize(); ret["price.db_disk_usage"] = // node->getPriceDB()->getActiveDBSize(); ret["proposal_hash.db_disk_usage"] = - // node->getProposalHashDB()->getProposalHashDBSize(); ret["proposal_vector.db_disk_usage"] = + // node->getProposalHashDB()->getProposalHashDBSize(); + // ret["proposal_vector.db_disk_usage"] = // node->getProposalVectorDB()->getActiveDBSize(); ret["random.db_disk_usage"] = // node->getRandomDB()->getActiveDBSize(); diff --git a/node/ConsensusEngine.h b/node/ConsensusEngine.h index 65fbe4dc..a1a17b99 100644 --- a/node/ConsensusEngine.h +++ b/node/ConsensusEngine.h @@ -114,15 +114,15 @@ class ConsensusEngine : public ConsensusInterface { atomic< bool > exitGracefullyCalled = false; -public: - const map< string, uint64_t >& getPatchTimestamps() const; - -private: ptr< StorageLimits > storageLimits = nullptr; map< string, uint64_t > patchTimestamps; + void exitGracefullyAsync(); + public: + const map< string, uint64_t >& getPatchTimestamps() const; + // used for testing only ptr< map< uint64_t, ptr< NodeInfo > > > testNodeInfosByIndex; ptr< map< uint64_t, ptr< NodeInfo > > > testNodeInfosById; @@ -238,12 +238,13 @@ class ConsensusEngine : public ConsensusInterface { void parseTestConfigsAndCreateAllNodes( const fs_path& dirname, bool _useBlockIDFromConsensus = false ); - void exitGracefullyBlocking(); - - void exitGracefullyAsync(); virtual void exitGracefully() override; + + // used in tests + void testExitGracefullyBlocking(); + /* consensus status for now can be CONSENSUS_ACTIVE and CONSENSUS_EXITED */ virtual consensus_engine_status getStatus() const override; diff --git a/node/Node.cpp b/node/Node.cpp index 488f6370..ee0caca5 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -506,6 +506,8 @@ void Node::doSoftAndThenHardExit() { // guaranteed to execute only once RETURN_IF_PREVIOUSLY_CALLED( exitCalled ) + exitOnBlockBoundaryRequested = true; + // this handles the case when exit is called very early // so that the start barriers were not released yet // then they have to be released for system to start working @@ -522,11 +524,15 @@ void Node::doSoftAndThenHardExit() { return; } - exitOnBlockBoundaryRequested = true; + + // wait until block completes EVM processing + while ( getSchain()->isInCreateBlock() ) { + usleep( 100 * 1000 ); + } auto startTimeMs = Time::getCurrentTimeMs(); - LOG( info, "Node::exit() will to exit on block boundary for " + + LOG( info, "Node::exit() will try to exit on block boundary for " + to_string( CONSENSUS_WAIT_TIME_BEFORE_HARD_EXIT_MS / 1000 ) + " seconds" ); while ( Time::getCurrentTimeMs() < startTimeMs + CONSENSUS_WAIT_TIME_BEFORE_HARD_EXIT_MS ) { @@ -568,7 +574,7 @@ void Node::exitImmediately() { void Node::checkForExitOnBlockBoundaryAndExitIfNeeded() { if ( getSchain()->getNode()->isExitOnBlockBoundaryRequested() ) { // do immediate exit since we are at the safe point - auto msg = "Exiting on block boundary after processing block " + + auto msg = "Exiting on block boundary for block " + to_string( getSchain()->getLastCommittedBlockID() ); LOG( info, msg ); getSchain()->getNode()->exitImmediately(); @@ -714,3 +720,5 @@ bool Node::verifyRealSignatures() const { const map< string, uint64_t >& Node::getPatchTimestamps() const { return patchTimestamps; } + +atomic_bool Node::exitOnBlockBoundaryRequested( false ); diff --git a/node/Node.h b/node/Node.h index f77ccf06..eb4fbf29 100644 --- a/node/Node.h +++ b/node/Node.h @@ -92,17 +92,12 @@ class Node { atomic_bool fatalErrorOccured = false; - atomic_bool exitOnBlockBoundaryRequested = false; - atomic_bool closeAllSocketsCalled = false; - void exitImmediately(); - bool isExitOnBlockBoundaryRequested() const; - ptr< SkaleLog > log = nullptr; string name = ""; @@ -238,6 +233,8 @@ class Node { void closeAllSocketsAndNotifyAllAgentsAndThreads(); public: + static atomic< bool > exitOnBlockBoundaryRequested; + void checkForExitOnBlockBoundaryAndExitIfNeeded(); diff --git a/pendingqueue/PendingTransactionsAgent.cpp b/pendingqueue/PendingTransactionsAgent.cpp index f775140f..b42fe78d 100644 --- a/pendingqueue/PendingTransactionsAgent.cpp +++ b/pendingqueue/PendingTransactionsAgent.cpp @@ -110,12 +110,15 @@ PendingTransactionsAgent::createTransactionsListForProposal() { getSchain()->getNode()->exitCheck(); if ( sChain->getExtFace() ) { + getSchain()->getNode()->checkForExitOnBlockBoundaryAndExitIfNeeded(); txVector = sChain->getExtFace()->pendingTransactions( needMax, stateRoot ); - getSchain()->getNode()->exitCheck(); + // block boundary is the safest place for exit + // exit immediately if exit has been requested + // this will initiate immediate exit and throw ExitRequestedException + getSchain()->getNode()->checkForExitOnBlockBoundaryAndExitIfNeeded(); } else { stateRootSample++; stateRoot = 7; - txVector = sChain->getTestMessageGeneratorAgent()->pendingTransactions( needMax ); } diff --git a/unittests/consensus_tests.cpp b/unittests/consensus_tests.cpp index 13e81f7a..5bcbae39 100644 --- a/unittests/consensus_tests.cpp +++ b/unittests/consensus_tests.cpp @@ -46,7 +46,7 @@ TEST_CASE_METHOD( REQUIRE( engine->nodesCount() > 0 ); REQUIRE( engine->getLargestCommittedBlockID() > 0 ); - engine->exitGracefullyBlocking(); + engine->testExitGracefullyBlocking(); delete engine; SUCCEED(); } @@ -68,7 +68,7 @@ TEST_CASE_METHOD( StartFromScratch, "Get consensus to stuck", "[consensus-stuck] } catch ( ... ) { timer.join(); } - engine->exitGracefullyBlocking(); + engine->testExitGracefullyBlocking(); delete engine; SUCCEED(); } @@ -85,7 +85,7 @@ TEST_CASE_METHOD( REQUIRE( engine->nodesCount() > 0 ); REQUIRE( engine->getLargestCommittedBlockID() == 0 ); - engine->exitGracefullyBlocking(); + engine->testExitGracefullyBlocking(); delete engine; } catch ( SkaleException& e ) { SkaleException::logNested( e );