diff --git a/BASHO_RELEASES b/BASHO_RELEASES index 1e438fcd..077012d3 100644 --- a/BASHO_RELEASES +++ b/BASHO_RELEASES @@ -1,3 +1,45 @@ +github.com tag 2.0.34 - February 15, 2017 +----------------------------------------- +mv-hot-backup2: - correct MakeTieredDbname() within db/filename.cc + for case where dbname input is blank and fast/slow + already populated in options. Corrects issue + with hot backup in non-tiered storage situations + +github.com tag 2.0.33 - November 21, 2016 +----------------------------------------- +mv-bucket-expiry: - partial branch to enable X-Riak-Meta-Expiry-Base-Seconds + property within enterprise edition + +github.com tag 2.0.32 - November 8, 2016 +---------------------------------------- + - version shipped with Riak 2.2 +** additional race condition hardening when faced with two threads on same iterator +** (one iterating async_iterator_move() and one closing async_iterator_close()) + - wrap async_iterator_move operations with locked CloseMutex + - create and then use new manual SpinLock within RetrieveItrObject + - adapt origin spin lock code for Solaris / SmartOS compiling + +github.com tag 2.0.31 - November 1, 2016 +---------------------------------------- + - includes leveldb 2.0.31 (mv-no-md-expiry & mv-tuning8) +mv-ref-hardening: - series of thread hardening changes + related to AAE use of iterators. Biggest + fix was isolating AAE using one thread to + close while another was still moving, then + defending against it + +github.com tag 2.0.30 - October 11, 2016 +---------------------------------------- + - includes leveldb 2.0.30 (mv-delayed-bloom) + +github.com tag 2.0.28 - September 7, 2016 +----------------------------------------- +Clarify which compression algorithm used for default: + 1. leveldb open source users: lz4 default + 2. eleveldb open source users: snappy default + 3. riak.conf / app.config users of older generation: snappy default + 4. riak.conf from Riak 2.2: lz4 default + github.com tag 2.0.27 - August 22, 2016 --------------------------------------- mv-mem-fences: fix iterator double delete bug in eleveldb and diff --git a/c_src/build_deps.sh b/c_src/build_deps.sh index c488f2c5..b3b123d1 100755 --- a/c_src/build_deps.sh +++ b/c_src/build_deps.sh @@ -8,7 +8,7 @@ if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then fi unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as well -LEVELDB_VSN="" +LEVELDB_VSN="mv-nonblocking-write3" SNAPPY_VSN="1.0.4" @@ -65,9 +65,11 @@ case "$1" in ;; *) + export MACOSX_DEPLOYMENT_TARGET=10.8 + if [ ! -d snappy-$SNAPPY_VSN ]; then tar -xzf snappy-$SNAPPY_VSN.tar.gz - (cd snappy-$SNAPPY_VSN && ./configure --prefix=$BASEDIR/system --libdir=$BASEDIR/system/lib --with-pic) + (cd snappy-$SNAPPY_VSN && ./configure --disable-shared --prefix=$BASEDIR/system --libdir=$BASEDIR/system/lib --with-pic) fi if [ ! -f system/lib/libsnappy.a ]; then diff --git a/c_src/eleveldb.cc b/c_src/eleveldb.cc index c8c7cf3b..5f85c7b1 100644 --- a/c_src/eleveldb.cc +++ b/c_src/eleveldb.cc @@ -43,7 +43,7 @@ #include "leveldb/perf_count.h" #define LEVELDB_PLATFORM_POSIX #include "util/hot_threads.h" -#include "leveldb_os/expiry_os.h" +#include "util/expiry_os.h" #ifndef INCL_WORKITEMS_H #include "workitems.h" @@ -250,7 +250,7 @@ ERL_NIF_TERM parse_init_option(ErlNifEnv* env, ERL_NIF_TERM item, EleveldbOption { if (option[0] == eleveldb::ATOM_TOTAL_LEVELDB_MEM) { - size_t memory_sz; + unsigned long memory_sz; if (enif_get_ulong(env, option[1], &memory_sz)) { if (memory_sz != 0) @@ -347,7 +347,7 @@ ERL_NIF_TERM parse_open_option(ErlNifEnv* env, ERL_NIF_TERM item, leveldb::Optio } else if (option[0] == eleveldb::ATOM_BLOCK_CACHE_THRESHOLD) { - size_t memory_sz; + unsigned long memory_sz; if (enif_get_ulong(env, option[1], &memory_sz)) { if (memory_sz != 0) @@ -488,7 +488,7 @@ ERL_NIF_TERM parse_open_option(ErlNifEnv* env, ERL_NIF_TERM item, leveldb::Optio if (option[1] == eleveldb::ATOM_TRUE) { if (NULL==opts.expiry_module.get()) - opts.expiry_module.assign(new leveldb::ExpiryModuleOS); + opts.expiry_module.assign(leveldb::ExpiryModule::CreateExpiryModule()); ((leveldb::ExpiryModuleOS *)opts.expiry_module.get())->expiry_enabled = true; } // if else @@ -503,7 +503,7 @@ ERL_NIF_TERM parse_open_option(ErlNifEnv* env, ERL_NIF_TERM item, leveldb::Optio if (enif_get_ulong(env, option[1], &minutes)) { if (NULL==opts.expiry_module.get()) - opts.expiry_module.assign(new leveldb::ExpiryModuleOS); + opts.expiry_module.assign(leveldb::ExpiryModule::CreateExpiryModule()); ((leveldb::ExpiryModuleOS *)opts.expiry_module.get())->expiry_minutes = minutes; } // if } // else if @@ -512,7 +512,7 @@ ERL_NIF_TERM parse_open_option(ErlNifEnv* env, ERL_NIF_TERM item, leveldb::Optio if (option[1] == eleveldb::ATOM_TRUE) { if (NULL==opts.expiry_module.get()) - opts.expiry_module.assign(new leveldb::ExpiryModuleOS); + opts.expiry_module.assign(leveldb::ExpiryModule::CreateExpiryModule()); ((leveldb::ExpiryModuleOS *)opts.expiry_module.get())->whole_file_expiry = true; } // if else @@ -612,6 +612,21 @@ ERL_NIF_TERM send_reply(ErlNifEnv *env, ERL_NIF_TERM ref, ERL_NIF_TERM reply) return ATOM_OK; } +// Boilerplate for submitting to the thread queue. +// Takes ownership of the item. assumes allocated through new + +ERL_NIF_TERM +submit_to_thread_queue(eleveldb::WorkTask *work_item, ErlNifEnv* env, ERL_NIF_TERM caller_ref){ + eleveldb_priv_data& data = *static_cast(enif_priv_data(env)); + if(false == data.thread_pool.Submit(work_item)) + { + delete work_item; + return send_reply(env, caller_ref, + enif_make_tuple2(env, eleveldb::ATOM_ERROR, caller_ref)); + } // if + return eleveldb::ATOM_OK; +} + ERL_NIF_TERM async_open( ErlNifEnv* env, @@ -666,19 +681,23 @@ async_open( eleveldb::WorkTask *work_item = new eleveldb::OpenTask(env, caller_ref, db_name, opts); - - if(false == priv.thread_pool.Submit(work_item)) - { - delete work_item; - return send_reply(env, caller_ref, - enif_make_tuple2(env, eleveldb::ATOM_ERROR, caller_ref)); - } - - return eleveldb::ATOM_OK; + return submit_to_thread_queue(work_item, env, caller_ref); } // async_open +/** + * async_write: originally designed to always send write requests to + * leveldb via a worker thread (to avoid scheduler collapse). + * Updated Nov. 2016 to write directly to leveldb if blocking + * conditions known to be minimized. + * + * returns: atom "ok" if write completed successfully on caller's thread + * "caller_ref" generated by eleveldb.erl if write request posted + * to worker pool. caller should await message with "ok" or error + * error atom if write failed on caller's thread or posting to worker pool + */ + ERL_NIF_TERM async_write( ErlNifEnv* env, @@ -690,6 +709,8 @@ async_write( const ERL_NIF_TERM& action_ref = argv[2]; const ERL_NIF_TERM& opts_ref = argv[3]; + bool non_blocking; + ReferencePtr db_ptr; db_ptr.assign(DbObject::RetrieveDbObject(env, handle_ref)); @@ -703,40 +724,67 @@ async_write( // is this even possible? if(NULL == db_ptr->m_Db) - return send_reply(env, caller_ref, error_einval(env)); + return error_einval(env); - eleveldb_priv_data& priv = *static_cast(enif_priv_data(env)); + non_blocking=db_ptr->m_Db->RequestNonBlockTicket(); + if (non_blocking) + { + leveldb::gPerfCounters->Inc(leveldb::ePerfDebug2); + // use stack if calling direct :-) + leveldb::WriteOptions opts; + leveldb::WriteBatch batch; - // Construct a write batch: - leveldb::WriteBatch* batch = new leveldb::WriteBatch; + fold(env, argv[3], parse_write_option, opts); + opts.non_blocking=true; - // Seed the batch's data: - ERL_NIF_TERM result = fold(env, argv[2], write_batch_item, *batch); - if(eleveldb::ATOM_OK != result) - { - // must manually delete batch on failure at this point, - // later WriteTask object will own and delete - delete batch; - return send_reply(env, caller_ref, - enif_make_tuple3(env, eleveldb::ATOM_ERROR, caller_ref, - enif_make_tuple2(env, eleveldb::ATOM_BAD_WRITE_ACTION, - result))); + // Seed the batch's data: + ERL_NIF_TERM result = fold(env, argv[2], write_batch_item, batch); + if(eleveldb::ATOM_OK != result) + { + return enif_make_tuple3(env, eleveldb::ATOM_ERROR, caller_ref, + enif_make_tuple2(env, eleveldb::ATOM_BAD_WRITE_ACTION, + result)); + } // if + + leveldb::Status status = db_ptr->m_Db->Write(opts, &batch); + + return (status.ok() ? ATOM_OK : error_tuple(env, ATOM_ERROR_DB_WRITE, status)); } // if + else + { + // use heap if sending message :-( + leveldb::gPerfCounters->Inc(leveldb::ePerfDebug3); - leveldb::WriteOptions* opts = new leveldb::WriteOptions; - fold(env, argv[3], parse_write_option, *opts); + // Construct a write batch: + leveldb::WriteBatch* batch = new leveldb::WriteBatch; - eleveldb::WorkTask* work_item = new eleveldb::WriteTask(env, caller_ref, - db_ptr.get(), batch, opts); + // Seed the batch's data: + ERL_NIF_TERM result = fold(env, argv[2], write_batch_item, *batch); + if(eleveldb::ATOM_OK != result) + { + delete batch; + return enif_make_tuple3(env, eleveldb::ATOM_ERROR, caller_ref, + enif_make_tuple2(env, eleveldb::ATOM_BAD_WRITE_ACTION, + result)); + } // if - if(false == priv.thread_pool.Submit(work_item)) - { - // work_item contains "batch" and the delete below gets both memory allocations - delete work_item; - return send_reply(env, caller_ref, - enif_make_tuple2(env, eleveldb::ATOM_ERROR, caller_ref)); - } // if + leveldb::WriteOptions* opts = new leveldb::WriteOptions; + fold(env, argv[3], parse_write_option, *opts); + + eleveldb::WorkTask* work_item = new eleveldb::WriteTask(env, caller_ref, + db_ptr, batch, opts); + eleveldb_priv_data& data = *static_cast(enif_priv_data(env)); + if(false == data.thread_pool.Submit(work_item)) + { + // work_item contains "batch" and the delete below gets both memory allocations + delete work_item; + return enif_make_tuple2(env, eleveldb::ATOM_ERROR, caller_ref); + } // if + return(caller_ref); + } // else + + // not going to reach this return eleveldb::ATOM_OK; } @@ -770,18 +818,8 @@ async_get( fold(env, opts_ref, parse_read_option, opts); eleveldb::WorkTask *work_item = new eleveldb::GetTask(env, caller_ref, - db_ptr.get(), key_ref, opts); - - eleveldb_priv_data& priv = *static_cast(enif_priv_data(env)); - - if(false == priv.thread_pool.Submit(work_item)) - { - delete work_item; - return send_reply(env, caller_ref, - enif_make_tuple2(env, eleveldb::ATOM_ERROR, caller_ref)); - } // if - - return eleveldb::ATOM_OK; + db_ptr, key_ref, opts); + return submit_to_thread_queue(work_item, env, caller_ref); } // async_get @@ -817,19 +855,8 @@ async_iterator( fold(env, options_ref, parse_read_option, opts); eleveldb::WorkTask *work_item = new eleveldb::IterTask(env, caller_ref, - db_ptr.get(), keys_only, opts); - - // Now-boilerplate setup (we'll consolidate this pattern soon, I hope): - eleveldb_priv_data& priv = *static_cast(enif_priv_data(env)); - - if(false == priv.thread_pool.Submit(work_item)) - { - delete work_item; - return send_reply(env, caller_ref, enif_make_tuple2(env, ATOM_ERROR, caller_ref)); - } // if - - return ATOM_OK; - + db_ptr, keys_only, opts); + return submit_to_thread_queue(work_item, env, caller_ref); } // async_iterator @@ -849,11 +876,14 @@ async_iterator_move( ReferencePtr itr_ptr; - itr_ptr.assign(ItrObject::RetrieveItrObject(env, itr_handle_ref)); + ItrObject::RetrieveItrObject(env, itr_handle_ref, false, itr_ptr); if(NULL==itr_ptr.get() || 0!=itr_ptr->GetCloseRequested()) return enif_make_badarg(env); + // Nov 2, 2016: Hack against AAE using iterator on two threads + leveldb::MutexLock lock(&itr_ptr->m_CloseMutex); + // Reuse ref from iterator creation const ERL_NIF_TERM& caller_ref = itr_ptr->itr_ref; @@ -873,12 +903,12 @@ async_iterator_move( } // if // debug syslog(LOG_ERR, "move state: %d, %d, %d", - // action, itr_ptr->m_Iter->m_PrefetchStarted, itr_ptr->m_Iter->m_HandoffAtomic); + // action, itr_ptr->m_Wrap.m_PrefetchStarted, itr_ptr->m_Wrap.m_HandoffAtomic); // must set this BEFORE call to compare_and_swap ... or have potential // for an "extra" message coming out of prefetch - prefetch_state = itr_ptr->m_Iter->m_PrefetchStarted; - itr_ptr->m_Iter->m_PrefetchStarted = prefetch_state && (eleveldb::MoveTask::PREFETCH_STOP != action ); + prefetch_state = itr_ptr->m_Wrap.m_PrefetchStarted; + itr_ptr->m_Wrap.m_PrefetchStarted = prefetch_state && (eleveldb::MoveTask::PREFETCH_STOP != action ); // // Three situations: @@ -899,14 +929,14 @@ async_iterator_move( ret_term = enif_make_copy(env, itr_ptr->itr_ref); // force reply to be a message - itr_ptr->m_Iter->m_HandoffAtomic=1; - itr_ptr->m_Iter->m_PrefetchStarted=false; + itr_ptr->m_Wrap.m_HandoffAtomic=1; + itr_ptr->m_Wrap.m_PrefetchStarted=false; } // if // case #2 // before we launch a background job for "next iteration", see if there is a // prefetch waiting for us - else if (leveldb::compare_and_swap(&itr_ptr->m_Iter->m_HandoffAtomic, 0, 1)) + else if (leveldb::compare_and_swap(&itr_ptr->m_Wrap.m_HandoffAtomic, 0, 1)) { // nope, no prefetch ... await a message to erlang queue // NOTE: "else" clause of MoveTask::DoWork() could be running simultaneously @@ -931,8 +961,8 @@ async_iterator_move( // (this is an absolute must since worker thread could change to false if // hits end of key space and its execution overlaps this block's execution) int cas_temp((eleveldb::MoveTask::PREFETCH_STOP != action ) // needed for Solaris CAS - && itr_ptr->m_Iter->Valid()); - leveldb::compare_and_swap(&itr_ptr->m_Iter->m_PrefetchStarted, + && itr_ptr->m_Wrap.Valid()); + leveldb::compare_and_swap(&itr_ptr->m_Wrap.m_PrefetchStarted, prefetch_state, cas_temp); } // else if @@ -943,34 +973,34 @@ async_iterator_move( // why yes there is. copy the key/value info into a return tuple before // we launch the iterator for "next" again // NOTE: worker thread is inactive at this time - if(!itr_ptr->m_Iter->Valid()) + if(!itr_ptr->m_Wrap.Valid()) ret_term=enif_make_tuple2(env, ATOM_ERROR, ATOM_INVALID_ITERATOR); - else if (itr_ptr->m_Iter->m_KeysOnly) - ret_term=enif_make_tuple2(env, ATOM_OK, slice_to_binary(env, itr_ptr->m_Iter->key())); + else if (itr_ptr->keys_only) + ret_term=enif_make_tuple2(env, ATOM_OK, slice_to_binary(env, itr_ptr->m_Wrap.key())); else ret_term=enif_make_tuple3(env, ATOM_OK, - slice_to_binary(env, itr_ptr->m_Iter->key()), - slice_to_binary(env, itr_ptr->m_Iter->value())); + slice_to_binary(env, itr_ptr->m_Wrap.key()), + slice_to_binary(env, itr_ptr->m_Wrap.value())); // reset for next race - itr_ptr->m_Iter->m_HandoffAtomic=0; + itr_ptr->m_Wrap.m_HandoffAtomic=0; // old MoveItem could still be active on its thread, cannot // reuse ... but the current Iterator is good itr_ptr->ReleaseReuseMove(); if (eleveldb::MoveTask::PREFETCH_STOP != action - && itr_ptr->m_Iter->Valid()) + && itr_ptr->m_Wrap.Valid()) { submit_new_request=true; } // if else { submit_new_request=false; - itr_ptr->m_Iter->m_HandoffAtomic=0; - itr_ptr->m_Iter->m_PrefetchStarted=false; + itr_ptr->m_Wrap.m_HandoffAtomic=0; + itr_ptr->m_Wrap.m_PrefetchStarted=false; } // else @@ -983,7 +1013,7 @@ async_iterator_move( eleveldb::MoveTask * move_item; move_item = new eleveldb::MoveTask(env, caller_ref, - itr_ptr->m_Iter.get(), action); + itr_ptr, action); // prevent deletes during worker loop move_item->RefInc(); @@ -1046,16 +1076,9 @@ async_close( && db_ptr->ClaimCloseFromCThread()) { eleveldb::WorkTask *work_item = new eleveldb::CloseTask(env, caller_ref, - db_ptr.get()); - - // Now-boilerplate setup (we'll consolidate this pattern soon, I hope): - eleveldb_priv_data& priv = *static_cast(enif_priv_data(env)); + db_ptr); + return submit_to_thread_queue(work_item, env, caller_ref); - if(false == priv.thread_pool.Submit(work_item)) - { - delete work_item; - return send_reply(env, caller_ref, enif_make_tuple2(env, ATOM_ERROR, caller_ref)); - } // if } // if else if (!term_ok) { @@ -1078,7 +1101,7 @@ async_iterator_close( ReferencePtr itr_ptr; - itr_ptr.assign(ItrObject::RetrieveItrObject(env, itr_ref)); + ItrObject::RetrieveItrObject(env, itr_ref, false, itr_ptr); if(NULL==itr_ptr.get() || 0!=itr_ptr->GetCloseRequested()) { @@ -1086,21 +1109,16 @@ async_iterator_close( return enif_make_badarg(env); } + // Nov 2, 2016: Hack against AAE using iterator on two threads + leveldb::MutexLock lock(&itr_ptr->m_CloseMutex); + // verify that Erlang has not called ItrObjectResourceCleanup AND // that a database close has not already started death proceedings if (itr_ptr->ClaimCloseFromCThread()) { eleveldb::WorkTask *work_item = new eleveldb::ItrCloseTask(env, caller_ref, - itr_ptr.get()); - - // Now-boilerplate setup (we'll consolidate this pattern soon, I hope): - eleveldb_priv_data& priv = *static_cast(enif_priv_data(env)); - - if(false == priv.thread_pool.Submit(work_item)) - { - delete work_item; - return send_reply(env, caller_ref, enif_make_tuple2(env, ATOM_ERROR, caller_ref)); - } // if + itr_ptr); + return submit_to_thread_queue(work_item, env, caller_ref); } // if // this close/cleanup call is way late ... bad programmer! @@ -1130,25 +1148,15 @@ async_destroy( ERL_NIF_TERM caller_ref = argv[0]; - eleveldb_priv_data& priv = *static_cast(enif_priv_data(env)); - leveldb::Options *opts = new leveldb::Options; fold(env, argv[2], parse_open_option, *opts); eleveldb::WorkTask *work_item = new eleveldb::DestroyTask(env, caller_ref, db_name, opts); - - if(false == priv.thread_pool.Submit(work_item)) - { - delete work_item; - return send_reply(env, caller_ref, - enif_make_tuple2(env, eleveldb::ATOM_ERROR, caller_ref)); - } - - return eleveldb::ATOM_OK; - + return submit_to_thread_queue(work_item, env, caller_ref); } // async_destroy + } // namespace eleveldb diff --git a/c_src/refobjects.cc b/c_src/refobjects.cc index 05f14031..1f7ee9e7 100644 --- a/c_src/refobjects.cc +++ b/c_src/refobjects.cc @@ -165,7 +165,7 @@ ErlRefObject::RefDec() return(cur_count); -} // DbObject::RefDec +} // ErlRefObject::RefDec /** @@ -341,7 +341,6 @@ DbObject::Shutdown() // if (leveldb::compare_and_swap(itr_ptr->m_ErlangThisPtr, itr_ptr, (ItrObject *)NULL)) if (itr_ptr->ClaimCloseFromCThread()) { - itr_ptr->m_Iter->LogIterator(); itr_ptr->ItrObject::InitiateCloseRequest(); } // if } // if @@ -388,51 +387,23 @@ DbObject::RemoveReference( */ LevelIteratorWrapper::LevelIteratorWrapper( - ItrObject * ItrPtr, - bool KeysOnly, - leveldb::ReadOptions & Options, - ERL_NIF_TERM itr_ref) - : m_DbPtr(ItrPtr->m_DbPtr.get()), m_ItrPtr(ItrPtr), m_Snapshot(NULL), m_Iterator(NULL), - m_HandoffAtomic(0), m_KeysOnly(KeysOnly), m_PrefetchStarted(false), - m_Options(Options), itr_ref(itr_ref), + DbObjectPtr_t & DbPtr, //!< db access for local iterator rebuild + leveldb::ReadOptions & Options) //!< options to use in iterator rebuild + : m_DbPtr(DbPtr), m_Options(Options), + m_Snapshot(NULL), m_Iterator(NULL), + m_HandoffAtomic(0), m_PrefetchStarted(false), m_IteratorStale(0), m_StillUse(true), - m_IteratorCreated(0), m_LastLogReport(0), m_MoveCount(0), m_IsValid(false) + m_IsValid(false) { - struct timeval tv; - - gettimeofday(&tv, NULL); - m_IteratorCreated=tv.tv_sec; - m_LastLogReport=tv.tv_sec; RebuildIterator(); } // LevelIteratorWrapper::LevelIteratorWrapper -/** - * put info about this iterator into leveldb LOG - */ - -void -LevelIteratorWrapper::LogIterator() -{ -#if 0 // available in different branch - struct tm created; - - localtime_r(&m_IteratorCreated, &created); - - leveldb::Log(m_DbPtr->m_Db->GetLogger(), - "Iterator created %d/%d/%d %d:%d:%d, move operations %zd (%p)", - created.tm_mon, created.tm_mday, created.tm_year-100, - created.tm_hour, created.tm_min, created.tm_sec, - m_MoveCount, m_Iterator); -#endif -} // LevelIteratorWrapper::LogIterator() - /** * Iterator management object (Erlang memory) */ - ErlNifResourceType * ItrObject::m_Itr_RESOURCE(NULL); @@ -453,18 +424,21 @@ ItrObject::CreateItrObjectType( void * ItrObject::CreateItrObject( - DbObject * DbPtr, + DbObjectPtr_t & DbPtr, bool KeysOnly, leveldb::ReadOptions & Options) { + ItrObjErlang * erl_ptr; ItrObject * ret_ptr; void * alloc_ptr; // the alloc call initializes the reference count to "one" - alloc_ptr=enif_alloc_resource(m_Itr_RESOURCE, sizeof(ItrObject *)); + alloc_ptr=enif_alloc_resource(m_Itr_RESOURCE, sizeof(ItrObjErlang)); + erl_ptr=(ItrObjErlang *)alloc_ptr; ret_ptr=new ItrObject(DbPtr, KeysOnly, Options); - *(ItrObject **)alloc_ptr=ret_ptr; + erl_ptr->m_ItrPtr=ret_ptr; + erl_ptr->m_SpinLock=0; // manual reference increase to keep active until "eleveldb_iterator_close" called ret_ptr->RefInc(); @@ -478,18 +452,28 @@ ItrObject::CreateItrObject( ItrObject * ItrObject::RetrieveItrObject( ErlNifEnv * Env, - const ERL_NIF_TERM & ItrTerm, bool ItrClosing) + const ERL_NIF_TERM & ItrTerm, + bool ItrClosing, + ItrObjectPtr_t & counted_ptr) { - ItrObject ** itr_ptr_ptr, * ret_ptr; + ItrObjErlang * erl_ptr; + ItrObject * ret_ptr; ret_ptr=NULL; - if (enif_get_resource(Env, ItrTerm, m_Itr_RESOURCE, (void **)&itr_ptr_ptr)) + if (enif_get_resource(Env, ItrTerm, m_Itr_RESOURCE, (void **)&erl_ptr)) { - ret_ptr=*itr_ptr_ptr; + ret_ptr=erl_ptr->m_ItrPtr; + // only continue if close sequence not started if (NULL!=ret_ptr) { + // need to use "const int" instead of literals for + // solaris and smartos compare_and_swap to compile + const int zero(0), one(1); + // lock access ... spin + while(!leveldb::compare_and_swap(&erl_ptr->m_SpinLock, zero, one)) ; + // has close been requested? if (ret_ptr->GetCloseRequested() || (!ItrClosing && ret_ptr->m_DbPtr->GetCloseRequested())) @@ -497,6 +481,12 @@ ItrObject::RetrieveItrObject( // object already closing ret_ptr=NULL; } // if + + // set during spin lock + counted_ptr.assign(ret_ptr); + + // use cas for memory fencing, we own the lock + leveldb::compare_and_swap(&erl_ptr->m_SpinLock, one, zero); } // if } // if @@ -510,14 +500,15 @@ ItrObject::ItrObjectResourceCleanup( ErlNifEnv * Env, void * Arg) { - ItrObject * volatile * erl_ptr; + + ItrObjErlang * erl_ptr; ItrObject * itr_ptr; - erl_ptr=(ItrObject * volatile *)Arg; - itr_ptr=*erl_ptr; + erl_ptr=(ItrObjErlang *)Arg; + itr_ptr=erl_ptr->m_ItrPtr; // is Erlang first to initiate close? - if (leveldb::compare_and_swap(erl_ptr, itr_ptr, (ItrObject *)NULL) + if (leveldb::compare_and_swap(&erl_ptr->m_ItrPtr, itr_ptr, (ItrObject *)NULL) && NULL!=itr_ptr) { leveldb::gPerfCounters->Inc(leveldb::ePerfDebug3); @@ -530,13 +521,15 @@ ItrObject::ItrObjectResourceCleanup( ItrObject::ItrObject( - DbObject * DbPtr, + DbObjectPtr_t & DbPtr, bool KeysOnly, leveldb::ReadOptions & Options) - : keys_only(KeysOnly), m_ReadOptions(Options), reuse_move(NULL), + : keys_only(KeysOnly), m_ReadOptions(Options), + m_Wrap(DbPtr, m_ReadOptions), + reuse_move(NULL), m_DbPtr(DbPtr), itr_ref_env(NULL) { - if (NULL!=DbPtr) + if (NULL!=DbPtr.get()) DbPtr->AddReference(this); } // ItrObject::ItrObject @@ -564,6 +557,37 @@ ItrObject::~ItrObject() } // ItrObject::~ItrObject +/** + * matthewv - This is a hack to compensate for Riak AAE + * having two active processes using the same iterator. + * One process attempts a close while the other iterates along. + * This is to help the close succeed. (October 2016) + */ +uint32_t +ItrObject::RefDec() +{ + uint32_t cur_count; + + // Race condition: + // Thread trying to close gets into InitiateCloseRequest() and + // finishes call to Shutdown(). Thread iterating gets far enough + // into async_iterator_move() to not see GetCloseRequest() set, but + // is able to create a new MoveItem within reuse_move. + // This hack knows that async_iterator_move() uses ItrObjectPtr_t that + // holds "this" until the end of the function. ItrObjectPtr_t will + // call RefDec in its destructor. Gives a chance to cleanup a tad. + if (1==GetCloseRequested()) + ReleaseReuseMove(); + + // WARNING: the following call could delete this object. + // make no references to object members afterward + cur_count=ErlRefObject::RefDec(); + + return(cur_count); + +} // ItrObject::RefDec + + void ItrObject::Shutdown() { @@ -572,9 +596,6 @@ ItrObject::Shutdown() // release when move object destructs) ReleaseReuseMove(); - // ItrObject and m_Iter each hold pointers to other, release ours - m_Iter.assign(NULL); - return; } // ItrObject::Shutdown diff --git a/c_src/refobjects.h b/c_src/refobjects.h index d020136b..142b2021 100644 --- a/c_src/refobjects.h +++ b/c_src/refobjects.h @@ -123,7 +123,7 @@ class ReferencePtr : t(NULL) {}; - ReferencePtr(TargetT *_t) + explicit ReferencePtr(TargetT *_t) : t(_t) { if (NULL!=t) @@ -207,43 +207,42 @@ class DbObject : public ErlRefObject DbObject& operator=(const DbObject&); // nocopyassign }; // class DbObject +typedef ReferencePtr DbObjectPtr_t; + /** * A self deleting wrapper to contain leveldb iterator. * Used when an ItrObject needs to skip around and might * have a background MoveItem performing a prefetch on existing * iterator. + * + * Oct 17, 2016: new usage model does not require the Wrapper + * be replaced for reuse after Seeks. Converting to static object */ -class LevelIteratorWrapper : public RefObject +class LevelIteratorWrapper { public: - ReferencePtr m_DbPtr; //!< need to keep db open for delete of this object - ReferencePtr m_ItrPtr; //!< shared itr_ref requires we hold ItrObject + DbObjectPtr_t m_DbPtr; //!< access to db for iterator rebuild + leveldb::ReadOptions & m_Options; //!< ItrObject's ReadOptions struct + // (updates "snapshot" member + const leveldb::Snapshot * m_Snapshot; leveldb::Iterator * m_Iterator; volatile uint32_t m_HandoffAtomic; //!< matthew's atomic foreground/background prefetch flag. - bool m_KeysOnly; //!< only return key values + // m_PrefetchStarted must use uint32_t instead of bool for Solaris CAS operations volatile uint32_t m_PrefetchStarted; //!< true after first prefetch command - leveldb::ReadOptions m_Options; //!< local copy of ItrObject::options - ERL_NIF_TERM itr_ref; //!< shared copy of ItrObject::itr_ref // only used if m_Options.iterator_refresh == true std::string m_RecentKey; //!< Most recent key returned time_t m_IteratorStale; //!< time iterator should refresh bool m_StillUse; //!< true if no error or key end seen - // debug data for hung iteratos - time_t m_IteratorCreated; //!< time constructor called - time_t m_LastLogReport; //!< LOG message was last written - size_t m_MoveCount; //!< number of calls to MoveItem - // read by Erlang thread, maintained by eleveldb MoveItem::DoWork volatile bool m_IsValid; //!< iterator state after last operation - LevelIteratorWrapper(ItrObject * ItrPtr, bool KeysOnly, - leveldb::ReadOptions & Options, ERL_NIF_TERM itr_ref); + LevelIteratorWrapper(DbObjectPtr_t & DbPtr, leveldb::ReadOptions & Options); virtual ~LevelIteratorWrapper() { @@ -294,15 +293,13 @@ class LevelIteratorWrapper : public RefObject m_Iterator = m_DbPtr->m_Db->NewIterator(m_Options); } // RebuildIterator - // hung iterator debug - void LogIterator(); - private: LevelIteratorWrapper(const LevelIteratorWrapper &); // no copy LevelIteratorWrapper& operator=(const LevelIteratorWrapper &); // no assignment }; // LevelIteratorWrapper +typedef ReferencePtr LevelIteratorWrapperPtr_t; /** @@ -311,10 +308,9 @@ class LevelIteratorWrapper : public RefObject class ItrObject : public ErlRefObject { public: - ReferencePtr m_Iter; - bool keys_only; leveldb::ReadOptions m_ReadOptions; //!< local copy, pass to LevelIteratorWrapper only + LevelIteratorWrapper m_Wrap; volatile class MoveTask * reuse_move; //!< iterator work object that is reused instead of lots malloc/free @@ -328,18 +324,21 @@ class ItrObject : public ErlRefObject static ErlNifResourceType* m_Itr_RESOURCE; public: - ItrObject(DbObject *, bool, leveldb::ReadOptions &); + ItrObject(DbObjectPtr_t &, bool, leveldb::ReadOptions &); virtual ~ItrObject(); // needs to perform free_itr + virtual uint32_t RefDec(); + virtual void Shutdown(); static void CreateItrObjectType(ErlNifEnv * Env); - static void * CreateItrObject(DbObject * Db, bool KeysOnly, leveldb::ReadOptions & Options); + static void * CreateItrObject(DbObjectPtr_t & Db, bool KeysOnly, leveldb::ReadOptions & Options); static ItrObject * RetrieveItrObject(ErlNifEnv * Env, const ERL_NIF_TERM & DbTerm, - bool ItrClosing=false); + bool ItrClosing, + ReferencePtr & CountedPtr); static void ItrObjectResourceCleanup(ErlNifEnv *Env, void * Arg); @@ -352,6 +351,21 @@ class ItrObject : public ErlRefObject }; // class ItrObject + +typedef ReferencePtr ItrObjectPtr_t; + + +/** + * Container stored in Erlang heap. Used + * to allow erlang heap to destroy iterator if process(s) holding + * iterator go away. + */ +struct ItrObjErlang +{ + ItrObject * m_ItrPtr; + volatile uint32_t m_SpinLock; +}; + } // namespace eleveldb diff --git a/c_src/workitems.cc b/c_src/workitems.cc index 4ef4d11e..911a45cd 100644 --- a/c_src/workitems.cc +++ b/c_src/workitems.cc @@ -79,7 +79,7 @@ WorkTask::WorkTask(ErlNifEnv *caller_env, ERL_NIF_TERM& caller_ref) } // WorkTask::WorkTask -WorkTask::WorkTask(ErlNifEnv *caller_env, ERL_NIF_TERM& caller_ref, DbObject * DbPtr) +WorkTask::WorkTask(ErlNifEnv *caller_env, ERL_NIF_TERM& caller_ref, DbObjectPtr_t & DbPtr) : m_DbPtr(DbPtr), terms_set(false) { if (NULL!=caller_env) @@ -178,67 +178,181 @@ OpenTask::DoWork() } // OpenTask::DoWork() +/** + * WriteTask functions + */ + +WriteTask::WriteTask(ErlNifEnv* _owner_env, ERL_NIF_TERM _caller_ref, + DbObjectPtr_t & _db_handle, + leveldb::WriteBatch* _batch, + leveldb::WriteOptions* _options) + : WorkTask(_owner_env, _caller_ref, _db_handle), + batch(_batch), + options(_options) +{} + +WriteTask::~WriteTask() +{ + delete batch; + delete options; +} + +work_result +WriteTask::DoWork() +{ + leveldb::Status status = m_DbPtr->m_Db->Write(*options, batch); + + return (status.ok() ? work_result(ATOM_OK) : work_result(local_env(), ATOM_ERROR_DB_WRITE, status)); +} + +/** + * GetTask functions + */ + +GetTask::GetTask(ErlNifEnv *_caller_env, + ERL_NIF_TERM _caller_ref, + DbObjectPtr_t & _db_handle, + ERL_NIF_TERM _key_term, + leveldb::ReadOptions &_options) + : WorkTask(_caller_env, _caller_ref, _db_handle), + options(_options) +{ + ErlNifBinary key; + + enif_inspect_binary(_caller_env, _key_term, &key); + m_Key.assign((const char *)key.data, key.size); +} + +GetTask::~GetTask() {} + +work_result +GetTask::DoWork() +{ + ERL_NIF_TERM value_bin; + BinaryValue value(local_env(), value_bin); + leveldb::Slice key_slice(m_Key); + + leveldb::Status status = m_DbPtr->m_Db->Get(options, key_slice, &value); + + if(!status.ok()){ + if ( status.IsNotFound() ) + return work_result(ATOM_NOT_FOUND); + else + return work_result(local_env(), ATOM_ERROR, status); + } + return work_result(local_env(), ATOM_OK, value_bin); +} + +/** + * IterTask functions + */ + +IterTask::IterTask(ErlNifEnv *_caller_env, + ERL_NIF_TERM _caller_ref, + DbObjectPtr_t & _db_handle, + const bool _keys_only, + leveldb::ReadOptions &_options) + : WorkTask(_caller_env, _caller_ref, _db_handle), + keys_only(_keys_only), options(_options) +{} + +IterTask::~IterTask() {} + +work_result +IterTask::DoWork() +{ + ItrObject * itr_ptr=0; + void * itr_ptr_ptr=0; + + // NOTE: transferring ownership of options to ItrObject + itr_ptr_ptr=ItrObject::CreateItrObject(m_DbPtr, keys_only, options); + + // Copy caller_ref to reuse in future iterator_move calls + itr_ptr=((ItrObjErlang*)itr_ptr_ptr)->m_ItrPtr; + itr_ptr->itr_ref_env = enif_alloc_env(); + itr_ptr->itr_ref = enif_make_copy(itr_ptr->itr_ref_env, caller_ref()); + + ERL_NIF_TERM result = enif_make_resource(local_env(), itr_ptr_ptr); + + // release reference created during CreateItrObject() + enif_release_resource(itr_ptr_ptr); + + return work_result(local_env(), ATOM_OK, result); +} // operator() /** * MoveTask functions */ +// Constructor with no seek target: + +MoveTask::MoveTask(ErlNifEnv *_caller_env, ERL_NIF_TERM _caller_ref, + ItrObjectPtr_t & Iter, action_t& _action) + : WorkTask(NULL, _caller_ref, Iter->m_DbPtr), + m_Itr(Iter), action(_action) +{ + // special case construction + local_env_=NULL; + enif_self(_caller_env, &local_pid); +} + +// Constructor with seek target: + +MoveTask::MoveTask(ErlNifEnv *_caller_env, ERL_NIF_TERM _caller_ref, + ItrObjectPtr_t & Iter, action_t& _action, + std::string& _seek_target) + : WorkTask(NULL, _caller_ref, Iter->m_DbPtr), + m_Itr(Iter), action(_action), + seek_target(_seek_target) +{ + // special case construction + local_env_=NULL; + enif_self(_caller_env, &local_pid); +} + +MoveTask::~MoveTask() {}; + work_result MoveTask::DoWork() { leveldb::Iterator* itr; - itr=m_ItrWrap->get(); + itr=m_Itr->m_Wrap.get(); - ++m_ItrWrap->m_MoveCount; // // race condition of prefetch clearing db iterator while // async_iterator_move looking at it. // // iterator_refresh operation - if (m_ItrWrap->m_Options.iterator_refresh && m_ItrWrap->m_StillUse) + if (m_Itr->m_Wrap.m_Options.iterator_refresh && m_Itr->m_Wrap.m_StillUse) { struct timeval tv; gettimeofday(&tv, NULL); - if (m_ItrWrap->m_IteratorStale < tv.tv_sec || NULL==itr) + if (m_Itr->m_Wrap.m_IteratorStale < tv.tv_sec || NULL==itr) { - m_ItrWrap->RebuildIterator(); - itr=m_ItrWrap->get(); + m_Itr->m_Wrap.RebuildIterator(); + itr=m_Itr->m_Wrap.get(); // recover position - if (NULL!=itr && 0!=m_ItrWrap->m_RecentKey.size()) + if (NULL!=itr && 0!=m_Itr->m_Wrap.m_RecentKey.size()) { - leveldb::Slice key_slice(m_ItrWrap->m_RecentKey); + leveldb::Slice key_slice(m_Itr->m_Wrap.m_RecentKey); itr->Seek(key_slice); - m_ItrWrap->m_StillUse=itr->Valid(); - if (!m_ItrWrap->m_StillUse) + m_Itr->m_Wrap.m_StillUse=itr->Valid(); + if (!m_Itr->m_Wrap.m_StillUse) { itr=NULL; - m_ItrWrap->PurgeIterator(); + m_Itr->m_Wrap.PurgeIterator(); } // if } // if } // if } // if - // hung iterator debug - { - struct timeval tv; - - gettimeofday(&tv, NULL); - - // 14400 is 4 hours in seconds ... 60*60*4 - if ((m_ItrWrap->m_LastLogReport + 14400) < tv.tv_sec && NULL!=m_ItrWrap->get()) - { - m_ItrWrap->LogIterator(); - m_ItrWrap->m_LastLogReport=tv.tv_sec; - } // if - } - // back to normal operation if(NULL == itr) return work_result(local_env(), ATOM_ERROR, ATOM_ITERATOR_CLOSED); @@ -273,30 +387,30 @@ MoveTask::DoWork() } // switch // set state for Erlang side to read - m_ItrWrap->SetValid(itr->Valid()); + m_Itr->m_Wrap.SetValid(itr->Valid()); // Post processing before telling the world the results // (while only one thread might be looking at objects) - if (m_ItrWrap->m_Options.iterator_refresh) + if (m_Itr->m_Wrap.m_Options.iterator_refresh) { if (itr->Valid()) { - m_ItrWrap->m_RecentKey.assign(itr->key().data(), itr->key().size()); + m_Itr->m_Wrap.m_RecentKey.assign(itr->key().data(), itr->key().size()); } // if else if (PREFETCH_STOP!=action) { // release iterator now, not later - m_ItrWrap->m_StillUse=false; - m_ItrWrap->PurgeIterator(); + m_Itr->m_Wrap.m_StillUse=false; + m_Itr->m_Wrap.PurgeIterator(); itr=NULL; } // else } // if // debug syslog(LOG_ERR, " MoveItem::DoWork() %d, %d, %d", - // action, m_ItrWrap->m_StillUse, m_ItrWrap->m_HandoffAtomic); + // action, m_Itr->m_Wrap.m_StillUse, m_Itr->m_Wrap.m_HandoffAtomic); // who got back first, us or the erlang loop - if (leveldb::compare_and_swap(&m_ItrWrap->m_HandoffAtomic, 0, 1)) + if (leveldb::compare_and_swap(&m_Itr->m_Wrap.m_HandoffAtomic, 0, 1)) { // this is prefetch of next iteration. It returned faster than actual // request to retrieve it. Stop and wait for erlang to catch up. @@ -305,15 +419,15 @@ MoveTask::DoWork() else { // setup next race for the response - m_ItrWrap->m_HandoffAtomic=0; + m_Itr->m_Wrap.m_HandoffAtomic=0; if(NULL!=itr && itr->Valid()) { - if (PREFETCH==action && m_ItrWrap->m_PrefetchStarted) + if (PREFETCH==action && m_Itr->m_Wrap.m_PrefetchStarted) m_ResubmitWork=true; // erlang is waiting, send message - if(m_ItrWrap->m_KeysOnly) + if(m_Itr->keys_only) return work_result(local_env(), ATOM_OK, slice_to_binary(local_env(), itr->key())); return work_result(local_env(), ATOM_OK, @@ -324,7 +438,7 @@ MoveTask::DoWork() { // using compare_and_swap as a hardware locking "set to false" // (a little heavy handed, but not executed often) - leveldb::compare_and_swap(&m_ItrWrap->m_PrefetchStarted, (int)true, (int)false); + leveldb::compare_and_swap(&m_Itr->m_Wrap.m_PrefetchStarted, (int)true, (int)false); return work_result(local_env(), ATOM_ERROR, ATOM_INVALID_ITERATOR); } // else @@ -342,7 +456,7 @@ MoveTask::local_env() if (!terms_set) { - caller_ref_term = enif_make_copy(local_env_, m_ItrWrap->itr_ref); + caller_ref_term = enif_make_copy(local_env_, m_Itr->itr_ref); caller_pid_term = enif_make_pid(local_env_, &local_pid); terms_set=true; } // if @@ -374,6 +488,83 @@ MoveTask::recycle() } // MoveTask::recycle +/** + * CloseTask functions + */ + +CloseTask::CloseTask(ErlNifEnv* _owner_env, ERL_NIF_TERM _caller_ref, + DbObjectPtr_t & _db_handle) + : WorkTask(_owner_env, _caller_ref, _db_handle) +{} + +CloseTask::~CloseTask() +{ +} + +work_result +CloseTask::DoWork() +{ + DbObject * db_ptr; + + // get db pointer then clear reference count to it + db_ptr=m_DbPtr.get(); + m_DbPtr.assign(NULL); + + if (NULL!=db_ptr) + { + // set closing flag, this is blocking + db_ptr->InitiateCloseRequest(); + + // db_ptr no longer valid + db_ptr=NULL; + + return(work_result(ATOM_OK)); + } // if + else + { + return work_result(local_env(), ATOM_ERROR, ATOM_BADARG); + } // else +} + +/** + * ItrCloseTask functions + */ + +ItrCloseTask::ItrCloseTask(ErlNifEnv* _owner_env, ERL_NIF_TERM _caller_ref, + ItrObjectPtr_t & _itr_handle) + : WorkTask(_owner_env, _caller_ref), + m_ItrPtr(_itr_handle) +{} + +ItrCloseTask::~ItrCloseTask() +{ +} + +work_result +ItrCloseTask::DoWork() +{ + ItrObject * itr_ptr; + + // get iterator pointer then clear reference count to it + itr_ptr=m_ItrPtr.get(); + m_ItrPtr.assign(NULL); + + if (NULL!=itr_ptr) + { + // set closing flag, this is blocking + itr_ptr->InitiateCloseRequest(); + + // itr_ptr no longer valid + itr_ptr=NULL; + + return(work_result(ATOM_OK)); + } // if + else + { + return work_result(local_env(), ATOM_ERROR, ATOM_BADARG); + } // else +} + /** * DestroyTask functions */ diff --git a/c_src/workitems.h b/c_src/workitems.h index fb84284a..475b7512 100644 --- a/c_src/workitems.h +++ b/c_src/workitems.h @@ -69,7 +69,7 @@ class WorkTask : public leveldb::ThreadTask public: WorkTask(ErlNifEnv *caller_env, ERL_NIF_TERM& caller_ref); - WorkTask(ErlNifEnv *caller_env, ERL_NIF_TERM& caller_ref, DbObject * DbPtr); + WorkTask(ErlNifEnv *caller_env, ERL_NIF_TERM& caller_ref, DbObjectPtr_t & DbPtr); virtual ~WorkTask(); @@ -132,32 +132,23 @@ class WriteTask : public WorkTask { protected: leveldb::WriteBatch* batch; - leveldb::WriteOptions* options; + leveldb::WriteOptions* options; public: - WriteTask(ErlNifEnv* _owner_env, ERL_NIF_TERM _caller_ref, - DbObject * _db_handle, + DbObjectPtr_t & _db_handle, leveldb::WriteBatch* _batch, - leveldb::WriteOptions* _options) - : WorkTask(_owner_env, _caller_ref, _db_handle), - batch(_batch), - options(_options) - {} + leveldb::WriteOptions* _options); - virtual ~WriteTask() - { - delete batch; - delete options; - } + virtual ~WriteTask(); protected: - virtual work_result DoWork() - { - leveldb::Status status = m_DbPtr->m_Db->Write(*options, batch); + virtual work_result DoWork(); - return (status.ok() ? work_result(ATOM_OK) : work_result(local_env(), ATOM_ERROR_DB_WRITE, status)); - } +private: + WriteTask(); + WriteTask(const WriteTask &); + WriteTask & operator=(const WriteTask &); }; // class WriteTask @@ -207,36 +198,13 @@ class GetTask : public WorkTask public: GetTask(ErlNifEnv *_caller_env, ERL_NIF_TERM _caller_ref, - DbObject *_db_handle, + DbObjectPtr_t & _db_handle, ERL_NIF_TERM _key_term, - leveldb::ReadOptions &_options) - : WorkTask(_caller_env, _caller_ref, _db_handle), - options(_options) - { - ErlNifBinary key; + leveldb::ReadOptions &_options); - enif_inspect_binary(_caller_env, _key_term, &key); - m_Key.assign((const char *)key.data, key.size); - } - - virtual ~GetTask() - { - } - -protected: - virtual work_result DoWork() - { - ERL_NIF_TERM value_bin; - BinaryValue value(local_env(), value_bin); - leveldb::Slice key_slice(m_Key); + virtual ~GetTask(); - leveldb::Status status = m_DbPtr->m_Db->Get(options, key_slice, &value); - - if(!status.ok()) - return work_result(ATOM_NOT_FOUND); - - return work_result(local_env(), ATOM_OK, value_bin); - } + virtual work_result DoWork(); }; // class GetTask @@ -256,41 +224,13 @@ class IterTask : public WorkTask public: IterTask(ErlNifEnv *_caller_env, ERL_NIF_TERM _caller_ref, - DbObject *_db_handle, + DbObjectPtr_t & _db_handle, const bool _keys_only, - leveldb::ReadOptions &_options) - : WorkTask(_caller_env, _caller_ref, _db_handle), - keys_only(_keys_only), options(_options) - {} - - virtual ~IterTask() - { - } - -protected: - virtual work_result DoWork() - { - ItrObject * itr_ptr; - void * itr_ptr_ptr; - - // NOTE: transfering ownership of options to ItrObject - itr_ptr_ptr=ItrObject::CreateItrObject(m_DbPtr.get(), keys_only, options); + leveldb::ReadOptions &_options); - // Copy caller_ref to reuse in future iterator_move calls - itr_ptr=*(ItrObject**)itr_ptr_ptr; - itr_ptr->itr_ref_env = enif_alloc_env(); - itr_ptr->itr_ref = enif_make_copy(itr_ptr->itr_ref_env, caller_ref()); + virtual ~IterTask(); - itr_ptr->m_Iter.assign(new LevelIteratorWrapper(itr_ptr, keys_only, - options, itr_ptr->itr_ref)); - - ERL_NIF_TERM result = enif_make_resource(local_env(), itr_ptr_ptr); - - // release reference created during CreateItrObject() - enif_release_resource(itr_ptr_ptr); - - return work_result(local_env(), ATOM_OK, result); - } + virtual work_result DoWork(); }; // class IterTask @@ -301,7 +241,7 @@ class MoveTask : public WorkTask typedef enum { FIRST, LAST, NEXT, PREV, SEEK, PREFETCH, PREFETCH_STOP } action_t; protected: - ReferencePtr m_ItrWrap; //!< access to database, and holds reference + ItrObjectPtr_t m_Itr; public: action_t action; @@ -311,28 +251,14 @@ class MoveTask : public WorkTask // No seek target: MoveTask(ErlNifEnv *_caller_env, ERL_NIF_TERM _caller_ref, - LevelIteratorWrapper * IterWrap, action_t& _action) - : WorkTask(NULL, _caller_ref, IterWrap->m_DbPtr.get()), - m_ItrWrap(IterWrap), action(_action) - { - // special case construction - local_env_=NULL; - enif_self(_caller_env, &local_pid); - } + ItrObjectPtr_t & Iter, action_t& _action); // With seek target: MoveTask(ErlNifEnv *_caller_env, ERL_NIF_TERM _caller_ref, - LevelIteratorWrapper * IterWrap, action_t& _action, - std::string& _seek_target) - : WorkTask(NULL, _caller_ref, IterWrap->m_DbPtr.get()), - m_ItrWrap(IterWrap), action(_action), - seek_target(_seek_target) - { - // special case construction - local_env_=NULL; - enif_self(_caller_env, &local_pid); - } - virtual ~MoveTask() {}; + ItrObjectPtr_t & Iter, action_t& _action, + std::string& _seek_target); + + virtual ~MoveTask(); virtual ErlNifEnv *local_env(); @@ -355,38 +281,11 @@ class CloseTask : public WorkTask public: CloseTask(ErlNifEnv* _owner_env, ERL_NIF_TERM _caller_ref, - DbObject * _db_handle) - : WorkTask(_owner_env, _caller_ref, _db_handle) - {} - - virtual ~CloseTask() - { - } + DbObjectPtr_t & _db_handle); -protected: - virtual work_result DoWork() - { - DbObject * db_ptr; + virtual ~CloseTask(); - // get db pointer then clear reference count to it - db_ptr=m_DbPtr.get(); - m_DbPtr.assign(NULL); - - if (NULL!=db_ptr) - { - // set closing flag, this is blocking - db_ptr->InitiateCloseRequest(); - - // db_ptr no longer valid - db_ptr=NULL; - - return(work_result(ATOM_OK)); - } // if - else - { - return work_result(local_env(), ATOM_ERROR, ATOM_BADARG); - } // else - } + virtual work_result DoWork(); }; // class CloseTask @@ -401,41 +300,12 @@ class ItrCloseTask : public WorkTask ReferencePtr m_ItrPtr; public: - ItrCloseTask(ErlNifEnv* _owner_env, ERL_NIF_TERM _caller_ref, - ItrObject * _itr_handle) - : WorkTask(_owner_env, _caller_ref), - m_ItrPtr(_itr_handle) - {} + ItrObjectPtr_t & _itr_handle); - virtual ~ItrCloseTask() - { - } + virtual ~ItrCloseTask(); -protected: - virtual work_result DoWork() - { - ItrObject * itr_ptr; - - // get iterator pointer then clear reference count to it - itr_ptr=m_ItrPtr.get(); - m_ItrPtr.assign(NULL); - - if (NULL!=itr_ptr) - { - // set closing flag, this is blocking - itr_ptr->InitiateCloseRequest(); - - // itr_ptr no longer valid - itr_ptr=NULL; - - return(work_result(ATOM_OK)); - } // if - else - { - return work_result(local_env(), ATOM_ERROR, ATOM_BADARG); - } // else - } + virtual work_result DoWork(); }; // class ItrCloseTask diff --git a/priv/eleveldb_multi.schema b/priv/eleveldb_multi.schema index 6c6af943..2e388f53 100644 --- a/priv/eleveldb_multi.schema +++ b/priv/eleveldb_multi.schema @@ -117,13 +117,14 @@ "multi_backend.$name.leveldb.compression", "riak_kv.multi_backend", [ {default, on}, + {commented, on}, {datatype, flag} ]}. {mapping, "multi_backend.$name.leveldb.compression.algorithm", "riak_kv.multi_backend", [ - {new_conf_value, lz4}, + {commented, lz4}, {datatype, {enum, [snappy, lz4]}} ]}. diff --git a/src/eleveldb.erl b/src/eleveldb.erl index a21edd6c..a14928b2 100644 --- a/src/eleveldb.erl +++ b/src/eleveldb.erl @@ -1,8 +1,6 @@ %% ------------------------------------------------------------------- %% -%% eleveldb: Erlang Wrapper for LevelDB (http://code.google.com/p/leveldb/) -%% -%% Copyright (c) 2010-2012 Basho Technologies, Inc. All Rights Reserved. +%% Copyright (c) 2010-2017 Basho Technologies, Inc. %% %% This file is provided to you under the Apache License, %% Version 2.0 (the "License"); you may not use this file @@ -19,6 +17,8 @@ %% under the License. %% %% ------------------------------------------------------------------- + +%% @doc Erlang NIF wrapper for LevelDB -module(eleveldb). -export([open/2, @@ -26,6 +26,7 @@ get/3, put/4, async_put/5, + async_put2/5, delete/3, write/3, fold/4, @@ -52,11 +53,18 @@ -compile(export_all). -ifdef(EQC). -include_lib("eqc/include/eqc.hrl"). --define(QC_OUT(P), - eqc:on_output(fun(Str, Args) -> io:format(user, Str, Args) end, P)). --endif. +-define(QC_OUT(P), eqc:on_output(fun terminal_format/2, P)). +-endif. % EQC -include_lib("eunit/include/eunit.hrl"). --endif. + +%% Maximum number of distinct database instances to create in any test. +%% The highest runtime limit used is the lower of this value or +%% ((num-schedulers x 4) + 1). +%% This limit is driven by filesystem size constraints on builders - MvM's +%% trials have shown this value to work on the majority of builders the +%% majority of the time. +-define(MAX_TEST_OPEN, 21). +-endif. % TEST %% This cannot be a separate function. Code must be inline to trigger %% Erlang compiler's use of optimized selective receive. @@ -172,16 +180,44 @@ delete(Ref, Key, Opts) -> write(Ref, [{delete, Key}], Opts). -spec write(db_ref(), write_actions(), write_options()) -> ok | {error, any()}. write(Ref, Updates, Opts) -> CallerRef = make_ref(), - async_write(CallerRef, Ref, Updates, Opts), - ?WAIT_FOR_REPLY(CallerRef). + case async_write(CallerRef, Ref, Updates, Opts) of + CallerRef -> + ?WAIT_FOR_REPLY(CallerRef); + Anything -> Anything + end. +%% Legacy interface. +%% +%% `async_write/4' formerly always returned `ok' and sent a message to +%% the caller. +%% +%% Now `async_write' returns the context if a message will be sent; if +%% it returns `ok' then we must generate that message ourselves for +%% code that does not know about the new `async_put2/5' interface. -spec async_put(db_ref(), reference(), binary(), binary(), write_options()) -> ok. async_put(Ref, Context, Key, Value, Opts) -> Updates = [{put, Key, Value}], - async_write(Context, Ref, Updates, Opts), - ok. + case async_write(Context, Ref, Updates, Opts) of + Context -> + ok; + ok -> + self() ! {Context, ok}, + ok + end. + +%% New (as of November 2016) interface to `async_write/4'. +%% +%% `async_write/4' intelligently decides whether to write the data +%% immediately. If it does so, it returns `ok'. If it will write +%% asynchronously, it returns the context so that the caller can wait +%% for a message indicating completion or error. +-spec async_put2(db_ref(), reference(), binary(), binary(), write_options()) -> ok. +async_put2(Ref, Context, Key, Value, Opts) -> + Updates = [{put, Key, Value}], + async_write(Context, Ref, Updates, Opts). --spec async_write(reference(), db_ref(), write_actions(), write_options()) -> ok. +-spec async_write(term(), db_ref(), write_actions(), write_options()) -> + ok | {error, any()} | term(). async_write(_CallerRef, _Ref, _Updates, _Opts) -> erlang:nif_error({error, not_loaded}). @@ -370,7 +406,12 @@ do_fold(Itr, Fun, Acc0, Opts) -> true = is_binary(Start) or (Start == first), fold_loop(iterator_move(Itr, Start), Itr, Fun, Acc0) after - iterator_close(Itr) + %% This clause shouldn't change the operation's result. + %% If the iterator has been invalidated by it or the db being closed, + %% the try clause above will raise an exception, and that's the one we + %% want to propagate. Catch the exception this raises in that case and + %% ignore it so we don't obscure the original. + catch iterator_close(Itr) end. fold_loop({error, iterator_closed}, _Itr, _Fun, Acc0) -> @@ -395,105 +436,280 @@ validate_type(_, _) -> false. %% =================================================================== -%% EUnit tests +%% Tests %% =================================================================== -ifdef(TEST). -open_test() -> [{open_test_Z(), l} || l <- lists:seq(1, 20)]. -open_test_Z() -> - os:cmd("rm -rf /tmp/eleveldb.open.test"), - {ok, Ref} = open("/tmp/eleveldb.open.test", [{create_if_missing, true}]), - ok = ?MODULE:put(Ref, <<"abc">>, <<"123">>, []), - {ok, <<"123">>} = ?MODULE:get(Ref, <<"abc">>, []), - not_found = ?MODULE:get(Ref, <<"def">>, []). - -fold_test() -> [{fold_test_Z(), l} || l <- lists:seq(1, 20)]. -fold_test_Z() -> - os:cmd("rm -rf /tmp/eleveldb.fold.test"), - {ok, Ref} = open("/tmp/eleveldb.fold.test", [{create_if_missing, true}]), - ok = ?MODULE:put(Ref, <<"def">>, <<"456">>, []), - ok = ?MODULE:put(Ref, <<"abc">>, <<"123">>, []), - ok = ?MODULE:put(Ref, <<"hij">>, <<"789">>, []), - [{<<"abc">>, <<"123">>}, - {<<"def">>, <<"456">>}, - {<<"hij">>, <<"789">>}] = lists:reverse(fold(Ref, fun({K, V}, Acc) -> [{K, V} | Acc] end, - [], [])). - -fold_keys_test() -> [{fold_keys_test_Z(), l} || l <- lists:seq(1, 20)]. -fold_keys_test_Z() -> - os:cmd("rm -rf /tmp/eleveldb.fold.keys.test"), - {ok, Ref} = open("/tmp/eleveldb.fold.keys.test", [{create_if_missing, true}]), - ok = ?MODULE:put(Ref, <<"def">>, <<"456">>, []), - ok = ?MODULE:put(Ref, <<"abc">>, <<"123">>, []), - ok = ?MODULE:put(Ref, <<"hij">>, <<"789">>, []), - [<<"abc">>, <<"def">>, <<"hij">>] = lists:reverse(fold_keys(Ref, - fun(K, Acc) -> [K | Acc] end, - [], [])). - -fold_from_key_test() -> [{fold_from_key_test_Z(), l} || l <- lists:seq(1, 20)]. -fold_from_key_test_Z() -> - os:cmd("rm -rf /tmp/eleveldb.fold.fromkeys.test"), - {ok, Ref} = open("/tmp/eleveldb.fromfold.keys.test", [{create_if_missing, true}]), - ok = ?MODULE:put(Ref, <<"def">>, <<"456">>, []), - ok = ?MODULE:put(Ref, <<"abc">>, <<"123">>, []), - ok = ?MODULE:put(Ref, <<"hij">>, <<"789">>, []), - [<<"def">>, <<"hij">>] = lists:reverse(fold_keys(Ref, - fun(K, Acc) -> [K | Acc] end, - [], [{first_key, <<"d">>}])). - -destroy_test() -> [{destroy_test_Z(), l} || l <- lists:seq(1, 20)]. -destroy_test_Z() -> - os:cmd("rm -rf /tmp/eleveldb.destroy.test"), - {ok, Ref} = open("/tmp/eleveldb.destroy.test", [{create_if_missing, true}]), - ok = ?MODULE:put(Ref, <<"def">>, <<"456">>, []), - {ok, <<"456">>} = ?MODULE:get(Ref, <<"def">>, []), - close(Ref), - ok = ?MODULE:destroy("/tmp/eleveldb.destroy.test", []), - {error, {db_open, _}} = open("/tmp/eleveldb.destroy.test", [{error_if_exists, true}]). - -compression_test() -> [{compression_test_Z(), l} || l <- lists:seq(1, 20)]. -compression_test_Z() -> - CompressibleData = list_to_binary([0 || _X <- lists:seq(1,20)]), - os:cmd("rm -rf /tmp/eleveldb.compress.0 /tmp/eleveldb.compress.1"), - {ok, Ref0} = open("/tmp/eleveldb.compress.0", [{write_buffer_size, 5}, - {create_if_missing, true}, - {compression, false}]), - [ok = ?MODULE:put(Ref0, <>, CompressibleData, [{sync, true}]) || - I <- lists:seq(1,10)], - {ok, Ref1} = open("/tmp/eleveldb.compress.1", [{write_buffer_size, 5}, - {create_if_missing, true}, - {compression, true}]), - [ok = ?MODULE:put(Ref1, <>, CompressibleData, [{sync, true}]) || - I <- lists:seq(1,10)], - %% Check both of the LOG files created to see if the compression option was correctly - %% passed down - MatchCompressOption = - fun(File, Expected) -> - {ok, Contents} = file:read_file(File), - case re:run(Contents, "Options.compression: " ++ Expected) of - {match, _} -> match; - nomatch -> nomatch - end - end, - Log0Option = MatchCompressOption("/tmp/eleveldb.compress.0/LOG", "0"), - Log1Option = MatchCompressOption("/tmp/eleveldb.compress.1/LOG", "1"), - ?assert(Log0Option =:= match andalso Log1Option =:= match). - - -close_test() -> [{close_test_Z(), l} || l <- lists:seq(1, 20)]. -close_test_Z() -> - os:cmd("rm -rf /tmp/eleveldb.close.test"), - {ok, Ref} = open("/tmp/eleveldb.close.test", [{create_if_missing, true}]), - ?assertEqual(ok, close(Ref)), - ?assertEqual({error, einval}, close(Ref)). - -close_fold_test() -> [{close_fold_test_Z(), l} || l <- lists:seq(1, 20)]. -close_fold_test_Z() -> - os:cmd("rm -rf /tmp/eleveldb.close_fold.test"), - {ok, Ref} = open("/tmp/eleveldb.close_fold.test", [{create_if_missing, true}]), - ok = eleveldb:put(Ref, <<"k">>,<<"v">>,[]), - ?assertException(throw, {iterator_closed, ok}, % ok is returned by close as the acc - eleveldb:fold(Ref, fun(_,_A) -> eleveldb:close(Ref) end, undefined, [])). +%% =================================================================== +%% Exported Test Helpers +%% =================================================================== + +-spec assert_close(DbRef :: db_ref()) -> ok | no_return(). +%% +%% Closes DbRef inside an ?assert... macro. +%% +assert_close(DbRef) -> + ?assertEqual(ok, ?MODULE:close(DbRef)). + +-spec assert_open(DbPath :: string()) -> db_ref() | no_return(). +%% +%% Opens Path inside an ?assert... macro, creating the database directory if needed. +%% +assert_open(DbPath) -> + assert_open(DbPath, [{create_if_missing, true}]). + +-spec assert_open(DbPath :: string(), OpenOpts :: open_options()) + -> db_ref() | no_return(). +%% +%% Opens DbPath, with OpenOpts, inside an ?assert... macro. +%% +assert_open(DbPath, OpenOpts) -> + OpenRet = ?MODULE:open(DbPath, OpenOpts), + ?assertMatch({ok, _}, OpenRet), + {_, DbRef} = OpenRet, + DbRef. + +-spec assert_open_small(DbPath :: string()) -> db_ref() | no_return(). +%% +%% Opens Path inside an ?assert... macro, using a limited storage footprint +%% and creating the database directory if needed. +%% +assert_open_small(DbPath) -> + assert_open(DbPath, [{create_if_missing, true}, {limited_developer_mem, true}]). + +-spec create_test_dir() -> string() | no_return(). +%% +%% Creates a new, empty, uniquely-named directory for testing and returns +%% its full path. This operation *should* never fail, but would raise an +%% ?assert...-ish exception if it did. +%% +create_test_dir() -> + string:strip(?cmd("mktemp -d /tmp/" ?MODULE_STRING ".XXXXXXX"), both, $\n). + +-spec delete_test_dir(Dir :: string()) -> ok | no_return(). +%% +%% Deletes a test directory fully, whether or not it exists. +%% This operation *should* never fail, but would raise an ?assert...-ish +%% exception if it did. +%% +delete_test_dir(Dir) -> + ?assertCmd("rm -rf " ++ Dir). + +-spec terminal_format(Fmt :: io:format(), Args :: list()) -> ok. +%% +%% Writes directly to the terminal, bypassing EUnit hooks. +%% +terminal_format(Fmt, Args) -> + io:format(user, Fmt, Args). + +%% =================================================================== +%% EUnit Tests +%% =================================================================== + +-define(local_test(Timeout, TestFunc), + fun(TestRoot) -> + Title = erlang:atom_to_list(TestFunc), + TestDir = filename:join(TestRoot, TestFunc), + {Title, {timeout, Timeout, fun() -> TestFunc(TestDir) end}} + end +). +-define(local_test(TestFunc), ?local_test(10, TestFunc)). +-define(max_test_open(Calc), erlang:min(?MAX_TEST_OPEN, Calc)). + +eleveldb_test_() -> + {foreach, + fun create_test_dir/0, + fun delete_test_dir/1, + [ + ?local_test(test_open), + ?local_test(test_close), + ?local_test(test_destroy), + ?local_test(test_fold), + ?local_test(test_fold_keys), + ?local_test(test_fold_from_key), + ?local_test(test_close_fold), + % On weak machines the following can take a while, so we tweak + % them a bit to avoid timeouts. On anything resembling a competent + % computer, these should complete in a small fraction of a second, + % but on some lightweight VMs used for validation, that can be + % extended by orders of magnitude. + ?local_test(15, test_compression), + fun(TestRoot) -> + TestName = "test_open_many", + TestDir = filename:join(TestRoot, TestName), + Count = ?max_test_open(erlang:system_info(schedulers) * 4 + 1), + Title = lists:flatten(io_lib:format("~s(~b)", [TestName, Count])), + {Title, {timeout, 30, fun() -> test_open_many(TestDir, Count) end}} + end + ] + }. + +%% fold accumulator used in a few tests +accumulate(Val, Acc) -> + [Val | Acc]. + +%% +%% Individual tests +%% + +test_open(TestDir) -> + Ref = assert_open(TestDir), + ?assertEqual(ok, ?MODULE:put(Ref, <<"abc">>, <<"123">>, [])), + ?assertEqual({ok, <<"123">>}, ?MODULE:get(Ref, <<"abc">>, [])), + ?assertEqual(not_found, ?MODULE:get(Ref, <<"def">>, [])), + assert_close(Ref). + +test_open_many(TestDir, HowMany) -> + Insts = lists:seq(1, HowMany), + KNonce = erlang:make_ref(), + VNonce = erlang:self(), + WorkSet = [ + begin + D = lists:flatten(io_lib:format("~s.~b", [TestDir, N])), + T = os:timestamp(), + K = erlang:phash2([T, N, KNonce], 1 bsl 32), + V = erlang:phash2([N, T, VNonce], 1 bsl 32), + {assert_open_small(D), + <>, <>} + end || N <- Insts], + lists:foreach( + fun({Ref, Key, Val}) -> + ?assertEqual(ok, ?MODULE:put(Ref, Key, Val, [])) + end, WorkSet), + lists:foreach( + fun({Ref, Key, Val}) -> + ?assertEqual({ok, Val}, ?MODULE:get(Ref, Key, [])) + end, WorkSet), + lists:foreach(fun assert_close/1, [R || {R, _, _} <- WorkSet]). + +test_close(TestDir) -> + Ref = assert_open(TestDir, [{create_if_missing, true}]), + assert_close(Ref), + ?assertError(badarg, ?MODULE:close(Ref)). + +test_fold(TestDir) -> + Ref = assert_open(TestDir), + ?assertEqual(ok, ?MODULE:put(Ref, <<"def">>, <<"456">>, [])), + ?assertEqual(ok, ?MODULE:put(Ref, <<"abc">>, <<"123">>, [])), + ?assertEqual(ok, ?MODULE:put(Ref, <<"hij">>, <<"789">>, [])), + ?assertEqual( + [{<<"abc">>, <<"123">>}, {<<"def">>, <<"456">>}, {<<"hij">>, <<"789">>}], + lists:reverse(?MODULE:fold(Ref, fun accumulate/2, [], []))), + assert_close(Ref). + +test_fold_keys(TestDir) -> + Ref = assert_open(TestDir), + ?assertEqual(ok, ?MODULE:put(Ref, <<"def">>, <<"456">>, [])), + ?assertEqual(ok, ?MODULE:put(Ref, <<"abc">>, <<"123">>, [])), + ?assertEqual(ok, ?MODULE:put(Ref, <<"hij">>, <<"789">>, [])), + ?assertEqual( + [<<"abc">>, <<"def">>, <<"hij">>], + lists:reverse(?MODULE:fold_keys(Ref, fun accumulate/2, [], []))), + assert_close(Ref). + +test_fold_from_key(TestDir) -> + Ref = assert_open(TestDir), + ?assertEqual(ok, ?MODULE:put(Ref, <<"def">>, <<"456">>, [])), + ?assertEqual(ok, ?MODULE:put(Ref, <<"abc">>, <<"123">>, [])), + ?assertEqual(ok, ?MODULE:put(Ref, <<"hij">>, <<"789">>, [])), + ?assertEqual([<<"def">>, <<"hij">>], lists:reverse( + ?MODULE:fold_keys(Ref, fun accumulate/2, [], [{first_key, <<"d">>}]))), + assert_close(Ref). + +test_destroy(TestDir) -> + Ref = assert_open(TestDir), + ?assertEqual(ok, ?MODULE:put(Ref, <<"def">>, <<"456">>, [])), + ?assertEqual({ok, <<"456">>}, ?MODULE:get(Ref, <<"def">>, [])), + assert_close(Ref), + ?assertEqual(ok, ?MODULE:destroy(TestDir, [])), + ?assertMatch({error, {db_open, _}}, ?MODULE:open(TestDir, [{error_if_exists, true}])). + +test_compression(TestDir) -> + IntSeq = lists:seq(1, 10), + CompressibleData = list_to_binary(lists:duplicate(20, 0)), + + Ref0 = assert_open(TestDir ++ ".0", [ + {write_buffer_size, 5}, {create_if_missing, true}, {compression, false}]), + lists:foreach( + fun(I) -> + ?assertEqual(ok, + ?MODULE:put(Ref0, <>, CompressibleData, [{sync, true}])) + end, IntSeq), + + Ref1 = assert_open(TestDir ++ ".1", [ + {write_buffer_size, 5}, {create_if_missing, true}, {compression, true}]), + lists:foreach( + fun(I) -> + ?assertEqual(ok, + ?MODULE:put(Ref1, <>, CompressibleData, [{sync, true}])) + end, IntSeq), + + %% Check both of the LOG files created to see if the compression option was + %% passed down correctly + lists:foreach( + fun(Val) -> + File = filename:join(TestDir ++ [$. | Val], "LOG"), + RRet = file:read_file(File), + ?assertMatch({ok, _}, RRet), + {_, Data} = RRet, + Pattern = "Options.compression: " ++ Val, + ?assertMatch({match, _}, re:run(Data, Pattern)) + end, ["0", "1"]), + assert_close(Ref0), + assert_close(Ref1). + +test_close_fold(TestDir) -> + Ref = assert_open(TestDir), + ?assertEqual(ok, ?MODULE:put(Ref, <<"k">>,<<"v">>,[])), + ?assertError(badarg, + ?MODULE:fold(Ref, fun(_,_) -> assert_close(Ref) end, undefined, [])). + +%% +%% Parallel tests +%% + +parallel_test_() -> + ParaCnt = ?max_test_open(erlang:system_info(schedulers) * 2 + 1), + LoadCnt = 99, + TestSeq = lists:seq(1, ParaCnt), + {foreach, + fun create_test_dir/0, + fun delete_test_dir/1, + [fun(TestRoot) -> + {inparallel, [begin + T = lists:flatten(io_lib:format("load proc ~b", [N])), + D = filename:join(TestRoot, io_lib:format("parallel_test.~b", [N])), + S = lists:seq(N, (N + LoadCnt - 1)), + {T, fun() -> run_load(D, S) end} + end || N <- TestSeq]} + end] + }. + +run_load(TestDir, IntSeq) -> + KNonce = [os:timestamp(), erlang:self()], + Ref = assert_open_small(TestDir), + VNonce = [erlang:make_ref(), os:timestamp()], + KVIn = [ + begin + K = erlang:phash2([N | KNonce], 1 bsl 32), + V = erlang:phash2([N | VNonce], 1 bsl 32), + {<>, <>} + end || N <- IntSeq], + lists:foreach( + fun({Key, Val}) -> + ?assertEqual(ok, ?MODULE:put(Ref, Key, Val, [])) + end, KVIn), + {L, R} = lists:split(erlang:hd(IntSeq), KVIn), + KVOut = R ++ L, + lists:foreach( + fun({Key, Val}) -> + ?assertEqual({ok, Val}, ?MODULE:get(Ref, Key, [])) + end, KVOut), + assert_close(Ref). + +%% =================================================================== +%% QuickCheck Tests +%% =================================================================== -ifdef(EQC). @@ -512,56 +728,81 @@ ops(Keys, Values) -> apply_kv_ops([], _Ref, Acc0) -> Acc0; apply_kv_ops([{put, K, V} | Rest], Ref, Acc0) -> - ok = eleveldb:put(Ref, K, V, []), + ?assertEqual(ok, ?MODULE:put(Ref, K, V, [])), apply_kv_ops(Rest, Ref, orddict:store(K, V, Acc0)); apply_kv_ops([{async_put, K, V} | Rest], Ref, Acc0) -> MyRef = make_ref(), Context = {my_context, MyRef}, - ok = eleveldb:async_put(Ref, Context, K, V, []), + ?assertEqual(ok, ?MODULE:async_put(Ref, Context, K, V, [])), receive {Context, ok} -> apply_kv_ops(Rest, Ref, orddict:store(K, V, Acc0)); Msg -> - error({unexpected_msg, Msg}) + erlang:error({unexpected_msg, Msg}) end; apply_kv_ops([{delete, K, _} | Rest], Ref, Acc0) -> - ok = eleveldb:delete(Ref, K, []), + ?assertEqual(ok, ?MODULE:delete(Ref, K, [])), apply_kv_ops(Rest, Ref, orddict:store(K, deleted, Acc0)). -prop_put_delete() -> +prop_put_delete(TestDir) -> ?LET({Keys, Values}, {keys(), values()}, - ?FORALL(Ops, eqc_gen:non_empty(list(ops(Keys, Values))), - begin - ?cmd("rm -rf /tmp/eleveldb.putdelete.qc"), - {ok, Ref} = eleveldb:open("/tmp/eleveldb.putdelete.qc", - [{create_if_missing, true}]), - Model = apply_kv_ops(Ops, Ref, []), - - %% Valdiate that all deleted values return not_found - F = fun({K, deleted}) -> - ?assertEqual(not_found, eleveldb:get(Ref, K, [])); - ({K, V}) -> - ?assertEqual({ok, V}, eleveldb:get(Ref, K, [])) - end, - lists:map(F, Model), - - %% Validate that a fold returns sorted values - Actual = lists:reverse(fold(Ref, fun({K, V}, Acc) -> [{K, V} | Acc] end, - [], [])), - ?assertEqual([{K, V} || {K, V} <- Model, V /= deleted], - Actual), - ok = eleveldb:close(Ref), - true - end)). + ?FORALL(Ops, eqc_gen:non_empty(list(ops(Keys, Values))), + begin + delete_test_dir(TestDir), + Ref = assert_open(TestDir, [{create_if_missing, true}]), + Model = apply_kv_ops(Ops, Ref, []), + + %% Validate that all deleted values return not_found + lists:foreach( + fun({K, deleted}) -> + ?assertEqual(not_found, ?MODULE:get(Ref, K, [])); + ({K, V}) -> + ?assertEqual({ok, V}, ?MODULE:get(Ref, K, [])) + end, Model), + + %% Validate that a fold returns sorted values + Actual = lists:reverse( + ?MODULE:fold(Ref, fun({K, V}, Acc) -> [{K, V} | Acc] end, [], [])), + ?assertEqual([{K, V} || {K, V} <- Model, V /= deleted], Actual), + assert_close(Ref), + true + end)). prop_put_delete_test_() -> Timeout1 = 10, Timeout2 = 15, - %% We use the ?ALWAYS(300, ...) wrapper around the second test as a - %% regression test. - [{timeout, 3*Timeout1, {"No ?ALWAYS()", fun() -> qc(eqc:testing_time(Timeout1,prop_put_delete())) end}}, - {timeout, 10*Timeout2, {"With ?ALWAYS()", fun() -> qc(eqc:testing_time(Timeout2,?ALWAYS(150,prop_put_delete()))) end}}]. - --endif. - --endif. + {foreach, + fun create_test_dir/0, + fun delete_test_dir/1, + [ + fun(TestRoot) -> + TestDir = filename:join(TestRoot, "putdelete.qc"), + InnerTO = Timeout1, + OuterTO = (InnerTO * 3), + Title = "Without ?ALWAYS()", + TestFun = fun() -> + qc(eqc:testing_time(InnerTO, prop_put_delete(TestDir))) + end, + {timeout, OuterTO, {Title, TestFun}} + end, + fun(TestRoot) -> + TestDir = filename:join(TestRoot, "putdelete.qc"), + InnerTO = Timeout2, + OuterTO = (InnerTO * 10), + AwCount = (InnerTO * 9), + %% We use the ?ALWAYS(AwCount, ...) wrapper as a regression test. + %% It's not clear how this is effectively different than the first + %% fixture, but I'm leaving it here in case I'm missing something. + Title = lists:flatten(io_lib:format("With ?ALWAYS(~b)", [AwCount])), + TestFun = fun() -> + qc(eqc:testing_time(InnerTO, + ?ALWAYS(AwCount, prop_put_delete(TestDir)))) + end, + {timeout, OuterTO, {Title, TestFun}} + end + ] + }. + +-endif. % EQC + +-endif. % TEST diff --git a/test/cacheleak.erl b/test/cacheleak.erl index 19b3bce1..7b24675a 100644 --- a/test/cacheleak.erl +++ b/test/cacheleak.erl @@ -1,8 +1,6 @@ %% ------------------------------------------------------------------- %% -%% eleveldb: Erlang Wrapper for LevelDB (http://code.google.com/p/leveldb/) -%% -%% Copyright (c) 2010-2013 Basho Technologies, Inc. All Rights Reserved. +%% Copyright (c) 2012-2017 Basho Technologies, Inc. %% %% This file is provided to you under the Apache License, %% Version 2.0 (the "License"); you may not use this file @@ -19,60 +17,80 @@ %% under the License. %% %% ------------------------------------------------------------------- --module(cacheleak). --compile(export_all). +-module(cacheleak). +-ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). +-define(KV_PAIRS, (1000 * 10)). +-define(VAL_SIZE, (1024 * 10)). +-define(MAX_RSS, (1000 * 500)). % driven by ?KV_PAIRS and ?VAL_SIZE ? + +-define(TEST_LOOPS, 10). +-define(TIMEOUT, (?TEST_LOOPS * 60)). + cacheleak_test_() -> - {timeout, 10*60, fun() -> - [] = os:cmd("rm -rf /tmp/eleveldb.cacheleak.test"), - Blobs = [{<>, compressible_bytes(10240)} || - I <- lists:seq(1, 10000)], - cacheleak_loop(10, Blobs, 500000) - end}. + TestRoot = eleveldb:create_test_dir(), + TestDir = filename:join(TestRoot, ?MODULE), + {setup, + fun() -> TestRoot end, + fun eleveldb:delete_test_dir/1, + {timeout, ?TIMEOUT, fun() -> + Bytes = compressible_bytes(?VAL_SIZE), + Blobs = [{<>, Bytes} || I <- lists:seq(1, ?KV_PAIRS)], + eleveldb:terminal_format("RSS limit: ~b\n", [?MAX_RSS]), + cacheleak_loop(0, Blobs, ?MAX_RSS, TestDir) + end}}. %% It's very important for this test that the data is compressible. Otherwise, -%% the file will be mmaped, and nothing will fill up the cache. +%% the file will be mmapped, and nothing will fill up the cache. compressible_bytes(Count) -> - list_to_binary([0 || _I <- lists:seq(1, Count)]). + erlang:list_to_binary(lists:duplicate(Count, 0)). -cacheleak_loop(0, _Blobs, _MaxFinalRSS) -> - ok; -cacheleak_loop(Count, Blobs, MaxFinalRSS) -> +cacheleak_loop(Count, Blobs, MaxFinalRSS, TestDir) when Count < ?TEST_LOOPS -> %% We spawn a process to open a LevelDB instance and do a series of %% reads/writes to fill up the cache. When the process exits, the LevelDB %% ref will get GC'd and we can re-evaluate the memory footprint of the %% process to make sure everything got cleaned up as expected. F = fun() -> - - {ok, Ref} = eleveldb:open("/tmp/eleveldb.cacheleak.test", - [{create_if_missing, true}, - {limited_developer_mem, true}]), - [ok = eleveldb:put(Ref, I, B, []) || {I, B} <- Blobs], - eleveldb:fold(Ref, fun({_K, _V}, A) -> A end, [], [{fill_cache, true}]), - [{ok, B} = eleveldb:get(Ref, I, []) || {I, B} <- Blobs], - ok = eleveldb:close(Ref), - erlang:garbage_collect(), - io:format(user, "RSS1: ~p\n", [rssmem()]) - end, - {_Pid, Mref} = spawn_monitor(F), + Ref = eleveldb:assert_open_small(TestDir), + lists:foreach( + fun({Key, Val}) -> + ?assertEqual(ok, eleveldb:put(Ref, Key, Val, [])) + end, Blobs), + ?assertEqual([], eleveldb:fold(Ref, + fun({_K, _V}, A) -> A end, [], [{fill_cache, true}])), + lists:foreach( + fun({Key, Val}) -> + ?assertEqual({ok, Val}, eleveldb:get(Ref, Key, [])) + end, Blobs), + eleveldb:assert_close(Ref), + erlang:garbage_collect(), + eleveldb:terminal_format("RSS ~2b: ~p\n", [Count, rssmem()]) + end, + {_Pid, Mon} = erlang:spawn_monitor(F), receive - {'DOWN', Mref, process, _, _} -> + {'DOWN', Mon, process, _, _} -> ok end, RSS = rssmem(), ?assert(MaxFinalRSS > RSS), - cacheleak_loop(Count-1, Blobs, MaxFinalRSS). + cacheleak_loop((Count + 1), Blobs, MaxFinalRSS, TestDir); + +cacheleak_loop(_Count, _Blobs, _MaxFinalRSS, _TestDir) -> + ok. rssmem() -> Cmd = io_lib:format("ps -o rss= -p ~s", [os:getpid()]), - S = string:strip(os:cmd(Cmd), both), + % Don't try to use eunit's ?cmd macro here, it won't do the right thing. + S = string:strip(os:cmd(Cmd), left), % only matters that the 1st character is $0-$9 case string:to_integer(S) of {error, _} -> - io:format(user, "Error parsing integer in: ~s\n", [S]), + eleveldb:terminal_format("Error parsing integer in: ~s\n", [S]), error; {I, _} -> I end. + +-endif. % TEST diff --git a/test/cleanup.erl b/test/cleanup.erl index 9a1c6af3..abb3f8b4 100644 --- a/test/cleanup.erl +++ b/test/cleanup.erl @@ -1,8 +1,6 @@ %% ------------------------------------------------------------------- %% -%% eleveldb: Erlang Wrapper for LevelDB (http://code.google.com/p/leveldb/) -%% -%% Copyright (c) 2010 Basho Technologies, Inc. All Rights Reserved. +%% Copyright (c) 2013-2017 Basho Technologies, Inc. %% %% This file is provided to you under the Apache License, %% Version 2.0 (the "License"); you may not use this file @@ -20,148 +18,141 @@ %% %% ------------------------------------------------------------------- -%% Test various scenarios that properly and improperly close LevelDB DB/iterator -%% handles and ensure everything cleans up properly. - +%% Test various scenarios that properly and improperly close LevelDB +%% DB/iterator handles and ensure everything cleans up properly. -module(cleanup). --compile(export_all). - +-ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). --define(COMMON_INSTANCE_DIR, "/tmp/eleveldb.cleanup.test"). +-define(local_test(Timeout, TestFunc), + fun(TestRoot) -> + Title = erlang:atom_to_list(TestFunc), + TestDir = filename:join(TestRoot, TestFunc), + {Title, {timeout, Timeout, fun() -> TestFunc(TestDir) end}} + end +). +-define(local_test(TestFunc), ?local_test(10, TestFunc)). + +cleanup_test_() -> + {foreach, + fun eleveldb:create_test_dir/0, + fun eleveldb:delete_test_dir/1, + [ + ?local_test(test_open_twice), + ?local_test(test_open_close), + ?local_test(test_open_exit), + ?local_test(test_iterator), + ?local_test(15, test_iterator_db_close), + ?local_test(15, test_iterator_exit) + ] + }. %% Purposely reopen an already opened database to test failure assumption -assumption_test() -> - DB = open(), - try - io:format(user, "assumption_test: top\n", []), - ok = failed_open(), - io:format(user, "assumption_test: bottom\n", []), - ok - after - eleveldb:close(DB), - timer:sleep(500) - end. +test_open_twice(TestDir) -> + DB = eleveldb:assert_open(TestDir), + ?assertMatch({error, {db_open, _}}, + eleveldb:open(TestDir, [{create_if_missing, true}])), + eleveldb:assert_close(DB). %% Open/close -open_close_test() -> - DB = open(), - eleveldb:close(DB), - check(). +test_open_close(TestDir) -> + check_open_close(TestDir), + check_open_close(TestDir). %% Open w/o close -open_exit_test() -> - spawn_wait(fun() -> - _DB = open() - end), - timer:sleep(500), - check(). +test_open_exit(TestDir) -> + spawn_wait(fun() -> eleveldb:assert_open(TestDir) end), + check_open_close(TestDir). %% Iterator open/close -iterator_test() -> - DB = open(), - try - write(100, DB), - {ok, Itr} = eleveldb:iterator(DB, []), - iterate(Itr), - eleveldb:iterator_close(Itr), - eleveldb:close(DB), - check(), - ok - after - catch eleveldb:close(DB), - timer:sleep(500) - end. +test_iterator(TestDir) -> + DB = eleveldb:assert_open(TestDir), + ?assertEqual(ok, write(100, DB)), + ItrRet = eleveldb:iterator(DB, []), + ?assertMatch({ok, _}, ItrRet), + {_, Itr} = ItrRet, + ?assertEqual(ok, iterate(Itr)), + ?assertEqual(ok, eleveldb:iterator_close(Itr)), + eleveldb:assert_close(DB), + check_open_close(TestDir). %% Close DB while iterator running %% Expected: reopen should fail while iterator reference alive %% however, iterator should fail after DB is closed %% once iterator process exits, open should succeed -iterator_db_close_test() -> - DB = open(), - try - write(100, DB), - Parent = self(), - spawn_monitor(fun() -> - {ok, Itr} = eleveldb:iterator(DB, []), - Parent ! continue, - try - iterate(Itr, 10) - catch - error:badarg -> - ok - end, - try - eleveldb:iterator_close(Itr) - catch - error:badarg -> - ok - end - end), - receive continue -> ok end, - eleveldb:close(DB), - %%failed_open(), - wait_down(), - erlang:garbage_collect(), - timer:sleep(500), - check(), - ok - after - catch eleveldb:close(DB), - timer:sleep(500) - end. +test_iterator_db_close(TestDir) -> + DB = eleveldb:assert_open(TestDir), + ?assertEqual(ok, write(100, DB)), + Parent = self(), + {Pid, Mon} = Proc = erlang:spawn_monitor( + fun() -> + {ok, Itr} = eleveldb:iterator(DB, []), + Parent ! continue, + try + iterate(Itr, 10) + catch + error:badarg -> + ok + end, + try + eleveldb:iterator_close(Itr) + catch + error:badarg -> + ok + end + end), + ?assertEqual(ok, receive + continue -> + ok; + {'DOWN', Mon, process, Pid, Info} -> + Info + end), + eleveldb:assert_close(DB), + ?assertEqual(ok, wait_down(Proc)), + check_open_close(TestDir). %% Iterate open, iterator process exit w/o close -iterator_exit_test() -> - DB = open(), - try - write(100, DB), - spawn_wait(fun() -> - {ok, Itr} = eleveldb:iterator(DB, []), - iterate(Itr) - end), - eleveldb:close(DB), - check(), - ok - after - catch eleveldb:close(DB), - timer:sleep(500) - end. +test_iterator_exit(TestDir) -> + DB = eleveldb:assert_open(TestDir), + ?assertEqual(ok, write(100, DB)), + spawn_wait(fun() -> + {ok, Itr} = eleveldb:iterator(DB, []), + iterate(Itr) + end), + eleveldb:assert_close(DB), + check_open_close(TestDir). spawn_wait(F) -> - spawn_monitor(F), - wait_down(). + wait_down(erlang:spawn_monitor(F)). -wait_down() -> - receive {'DOWN', _, process, _, _} -> +wait_down({Pid, Mon}) when erlang:is_pid(Pid) andalso erlang:is_reference(Mon) -> + receive + {'DOWN', Mon, process, Pid, _} -> + ok + end; +wait_down(Mon) when erlang:is_reference(Mon) -> + receive + {'DOWN', Mon, process, _, _} -> + ok + end; +wait_down(Pid) when erlang:is_pid(Pid) -> + receive + {'DOWN', _, process, Pid, _} -> ok end. -check() -> - timer:sleep(500), - DB = open(), - eleveldb:close(DB), - timer:sleep(500), - ok. - -open() -> - {ok, Ref} = eleveldb:open(?COMMON_INSTANCE_DIR, - [{create_if_missing, true}]), - Ref. - -failed_open() -> - {error, {db_open, _}} = eleveldb:open(?COMMON_INSTANCE_DIR, - [{create_if_missing, true}]), - ok. +check_open_close(TestDir) -> + eleveldb:assert_close(eleveldb:assert_open(TestDir)). write(N, DB) -> write(0, N, DB). write(Same, Same, _DB) -> ok; write(N, End, DB) -> - eleveldb:put(DB, <>, <>, []), - write(N+1, End, DB). + KV = <>, + ?assertEqual(ok, eleveldb:put(DB, KV, KV, [])), + write((N + 1), End, DB). iterate(Itr) -> iterate(Itr, 0). @@ -174,5 +165,6 @@ do_iterate({ok, K, _V}, {Itr, Expected, Delay}) -> <> = K, ?assertEqual(Expected, N), (Delay == 0) orelse timer:sleep(Delay), - do_iterate(eleveldb:iterator_move(Itr, next), - {Itr, Expected + 1, Delay}). + do_iterate(eleveldb:iterator_move(Itr, next), {Itr, (Expected + 1), Delay}). + +-endif. % TEST diff --git a/test/iterators.erl b/test/iterators.erl index 438f77b4..ce303308 100644 --- a/test/iterators.erl +++ b/test/iterators.erl @@ -1,8 +1,6 @@ %% ------------------------------------------------------------------- %% -%% eleveldb: Erlang Wrapper for LevelDB (http://code.google.com/p/leveldb/) -%% -%% Copyright (c) 2010-2013 Basho Technologies, Inc. All Rights Reserved. +%% Copyright (c) 2013-2017 Basho Technologies, Inc. %% %% This file is provided to you under the Apache License, %% Version 2.0 (the "License"); you may not use this file @@ -19,128 +17,131 @@ %% under the License. %% %% ------------------------------------------------------------------- --module(iterators). --compile(export_all). +-module(iterators). -ifdef(TEST). - -include_lib("eunit/include/eunit.hrl"). iterator_test_() -> - {spawn, - [{setup, - fun setup/0, - fun cleanup/1, - fun(Ref) -> - [ - prev_test_case(Ref), - seek_and_next_test_case(Ref), - basic_prefetch_test_case(Ref), - seek_and_prefetch_test_case(Ref), - aae_prefetch1(Ref), - aae_prefetch2(Ref), - aae_prefetch3(Ref) - ] - end}] - }. + {spawn, [ + {setup, + fun setup/0, + fun cleanup/1, + fun({_, Ref}) -> [ + prev_test_case(Ref), + seek_and_next_test_case(Ref), + basic_prefetch_test_case(Ref), + seek_and_prefetch_test_case(Ref), + aae_prefetch1(Ref), + aae_prefetch2(Ref), + aae_prefetch3(Ref) + ] end + }]}. setup() -> - os:cmd("rm -rf ltest"), % NOTE - {ok, Ref} = eleveldb:open("ltest", [{create_if_missing, true}]), - eleveldb:put(Ref, <<"a">>, <<"w">>, []), - eleveldb:put(Ref, <<"b">>, <<"x">>, []), - eleveldb:put(Ref, <<"c">>, <<"y">>, []), - eleveldb:put(Ref, <<"d">>, <<"z">>, []), - Ref. - -cleanup(Ref) -> - eleveldb:close(Ref). + Dir = eleveldb:create_test_dir(), + Ref = eleveldb:assert_open(Dir), + ?assertEqual(ok, eleveldb:put(Ref, <<"a">>, <<"w">>, [])), + ?assertEqual(ok, eleveldb:put(Ref, <<"b">>, <<"x">>, [])), + ?assertEqual(ok, eleveldb:put(Ref, <<"c">>, <<"y">>, [])), + ?assertEqual(ok, eleveldb:put(Ref, <<"d">>, <<"z">>, [])), + {Dir, Ref}. + +cleanup({Dir, Ref}) -> + eleveldb:assert_close(Ref), + eleveldb:delete_test_dir(Dir). + +assert_iterator(DbRef, ItrOpts) -> + ItrRet = eleveldb:iterator(DbRef, ItrOpts), + ?assertMatch({ok, _}, ItrRet), + {_, Itr} = ItrRet, + Itr. prev_test_case(Ref) -> fun() -> - {ok, I} = eleveldb:iterator(Ref, []), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, <<>>)), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, next)), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, prev)) + I = assert_iterator(Ref, []), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, <<>>)), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, next)), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, prev)) end. seek_and_next_test_case(Ref) -> fun() -> - {ok, I} = eleveldb:iterator(Ref, []), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, <<"b">>)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, next)), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, <<"a">>)), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, next)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, next)) + I = assert_iterator(Ref, []), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, <<"b">>)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, next)), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, <<"a">>)), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, next)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, next)) end. basic_prefetch_test_case(Ref) -> fun() -> - {ok, I} = eleveldb:iterator(Ref, []), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, <<>>)), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({ok, <<"d">>, <<"z">>},eleveldb:iterator_move(I, prefetch)) + I = assert_iterator(Ref, []), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, <<>>)), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({ok, <<"d">>, <<"z">>}, eleveldb:iterator_move(I, prefetch)) end. seek_and_prefetch_test_case(Ref) -> fun() -> - {ok, I} = eleveldb:iterator(Ref, []), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, <<"b">>)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({ok, <<"d">>, <<"z">>},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, <<"a">>)), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, <<"a">>)), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({ok, <<"d">>, <<"z">>},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, <<"a">>)), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({ok, <<"d">>, <<"z">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({error,invalid_iterator},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, <<"a">>)), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({ok, <<"d">>, <<"z">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({error,invalid_iterator},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({error,invalid_iterator},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({error,invalid_iterator},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({error,invalid_iterator},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, <<"a">>)) + I = assert_iterator(Ref, []), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, <<"b">>)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({ok, <<"d">>, <<"z">>}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, <<"a">>)), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, <<"a">>)), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({ok, <<"d">>, <<"z">>}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, <<"a">>)), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({ok, <<"d">>, <<"z">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, <<"a">>)), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({ok, <<"d">>, <<"z">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, <<"a">>)) end. aae_prefetch1(Ref) -> fun() -> - {ok, I} = eleveldb:iterator(Ref, []), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, <<"b">>)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, prefetch_stop)), + I = assert_iterator(Ref, []), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, <<"b">>)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, prefetch_stop)), - {ok, J} = eleveldb:iterator(Ref, []), - ?assertEqual({error, invalid_iterator},eleveldb:iterator_move(J, <<"z">>)), - ?assertEqual({error, invalid_iterator},eleveldb:iterator_move(J, prefetch_stop)) + J = assert_iterator(Ref, []), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(J, <<"z">>)), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(J, prefetch_stop)) end. aae_prefetch2(Ref) -> fun() -> - {ok, I} = eleveldb:iterator(Ref, []), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, <<"b">>)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({ok, <<"d">>, <<"z">>},eleveldb:iterator_move(I, prefetch_stop)), - - {ok, J} = eleveldb:iterator(Ref, []), - ?assertEqual({error, invalid_iterator},eleveldb:iterator_move(J, <<"z">>)), - ?assertEqual({error, invalid_iterator},eleveldb:iterator_move(J, prefetch)), - ?assertEqual({error, invalid_iterator},eleveldb:iterator_move(J, prefetch_stop)) + I = assert_iterator(Ref, []), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, <<"b">>)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({ok, <<"d">>, <<"z">>}, eleveldb:iterator_move(I, prefetch_stop)), + + J = assert_iterator(Ref, []), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(J, <<"z">>)), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(J, prefetch)), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(J, prefetch_stop)) end. aae_prefetch3(Ref) -> fun() -> - {ok, I} = eleveldb:iterator(Ref, []), - ?assertEqual({error,invalid_iterator},eleveldb:iterator_move(I, prefetch_stop)) + I = assert_iterator(Ref, []), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(I, prefetch_stop)) end. --endif. +-endif. % TEST diff --git a/test/rand_gen_1.erl b/test/rand_gen_1.erl index 704f0e88..37410601 100644 --- a/test/rand_gen_1.erl +++ b/test/rand_gen_1.erl @@ -24,13 +24,13 @@ random_bin(_Id, Size) -> %% Make keys that look like this: <<"001328681207_012345">> %% The suffix part (after the underscore) will be assigned either -%% erlang:now/0's milliseconds or an integer between 0 and MaxSuffix. +%% os:timestamp/0's milliseconds or an integer between 0 and MaxSuffix. %% The integer between 0 & MaxSuffix will be chosen PercentAlmostSeq %% percent of the time. almost_completely_sequential(_Id, MaxSuffix, PercentAlmostSeq) -> fun() -> - {A, B, C} = now(), + {A, B, C} = os:timestamp(), TimeT = (A*1000000) + B, End = case random:uniform(100) of N when N < PercentAlmostSeq -> @@ -45,8 +45,8 @@ almost_completely_sequential(_Id, MaxSuffix, PercentAlmostSeq) -> %% Make keys that look like this: <<"001328681207_012345">>. %% %% With probability of 1 - (MillionNotSequential/1000000), the keys -%% will be generated using erlang:now/0, where the suffix is exactly -%% equal to the microseconds portion of erlang:now/0's return value. +%% will be generated using os:timestamp/0, where the suffix is exactly +%% equal to the microseconds portion of os:timestamp/0's return value. %% Such keys will be perfectly sorted for time series-style keys: each %% key will be "greater than" any previous key. %% @@ -61,7 +61,7 @@ almost_completely_sequential(_Id, MaxSuffix, PercentAlmostSeq) -> mostly_sequential(_Id, MillionNotSequential) -> fun() -> - {A, B, C} = now(), + {A, B, C} = os:timestamp(), {X, Y, Z} = case random:uniform(1000*1000) of N when N < MillionNotSequential -> {A - random:uniform(3),