Skip to content

Commit 03ee71e

Browse files
committed
Fixes in acquire phase and threadpool.h
1 parent a9c6792 commit 03ee71e

File tree

2 files changed

+92
-161
lines changed

2 files changed

+92
-161
lines changed

src/rt/sched/behaviourcore.h

Lines changed: 90 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -454,10 +454,8 @@ namespace verona::rt
454454

455455
/**
456456
* Remove `n` from the exec_count_down.
457-
*
458-
* Returns true if this call makes the count_down_zero
459457
*/
460-
bool resolve(size_t n = 1, bool fifo = true)
458+
void resolve(size_t n = 1, bool fifo = true)
461459
{
462460
Logging::cout() << "Behaviour::resolve " << n << " for behaviour "
463461
<< *this << Logging::endl;
@@ -469,10 +467,7 @@ namespace verona::rt
469467
{
470468
Logging::cout() << "Scheduling Behaviour " << *this << Logging::endl;
471469
Scheduler::schedule(as_work(), fifo);
472-
return true;
473470
}
474-
475-
return false;
476471
}
477472

478473
// TODO: When C++ 20 move to span.
@@ -488,6 +483,32 @@ namespace verona::rt
488483
return pointer_offset<T>(slots, sizeof(Slot) * count);
489484
}
490485

486+
static void acquire_with_transfer(Cown* cown, size_t transfer, size_t required)
487+
{
488+
if (transfer == required)
489+
return;
490+
491+
if (transfer > required)
492+
{
493+
Logging::cout() << "Releasing references as more transferred than "
494+
"required: transfer: "
495+
<< transfer << " required: " << required << " on cown "
496+
<< cown << Logging::endl;
497+
// Release transfer - required times, we needed one as we woke up
498+
// the cown, but the rest were not required.
499+
for (int j = 0; j < transfer - required; j++)
500+
Cown::release(ThreadAlloc::get(), cown);
501+
return;
502+
}
503+
504+
Logging::cout() << "Acquiring addition reference count: transfer: "
505+
<< transfer << " required: " << required << " on cown "
506+
<< cown << Logging::endl;
507+
// We didn't have any RCs passed in, so we need to acquire one.
508+
for (int j = 0; j < required - transfer; j++)
509+
Cown::acquire(cown);
510+
}
511+
491512
/**
492513
* @brief Constructs a behaviour. Leaves space for the closure.
493514
*
@@ -742,15 +763,15 @@ namespace verona::rt
742763
// Mark the slot as ready for scheduling
743764
curr_slot->reset_status();
744765
yield();
745-
746-
if (curr_slot->is_read_only())
747-
{
766+
if(curr_slot->is_read_only())
748767
curr_slot->set_behaviour(body);
749-
yield();
750-
auto prev_slot =
751-
cown->last_slot.exchange(curr_slot, std::memory_order_acq_rel);
752-
yield();
753-
if (prev_slot == nullptr)
768+
yield();
769+
auto prev_slot =
770+
cown->last_slot.exchange(curr_slot, std::memory_order_acq_rel);
771+
772+
if(prev_slot == nullptr)
773+
{
774+
if(curr_slot->is_read_only())
754775
{
755776
yield();
756777
bool first_reader = cown->read_ref_count.add_read();
@@ -760,136 +781,76 @@ namespace verona::rt
760781
curr_slot->set_active();
761782
ec[std::get<0>(indexes[first_chain_index])]++;
762783
yield();
763-
if (transfer_count)
764-
{
765-
Logging::cout()
766-
<< "Releasing reader transferred count " << transfer_count
767-
<< " -1 for first in queue " << *curr_slot << Logging::endl;
768-
// Release transfer_count - 1 times, we needed one as we woke up
769-
// the cown, but the rest were not required.
770-
for (int j = 0; j < transfer_count - 1; j++)
771-
Cown::release(ThreadAlloc::get(), cown);
772-
}
773-
else
774-
{
775-
Logging::cout() << "Acquiring reader reference count for first "
776-
"in queue on cown "
777-
<< *curr_slot << Logging::endl;
778-
// We didn't have any RCs passed in, so we need to acquire one.
779-
Cown::acquire(cown);
780-
}
781-
782-
if (first_reader)
783-
{
784-
Logging::cout()
785-
<< "Acquiring reference count for the first reader on cown "
786-
<< *curr_slot << Logging::endl;
787-
Cown::acquire(cown);
788-
}
789-
784+
acquire_with_transfer(cown, transfer_count, 1 + first_reader);
790785
continue;
791786
}
792-
else
787+
788+
cown->next_writer.store(body, std::memory_order_release);
789+
yield();
790+
acquire_with_transfer(cown, transfer_count, 1);
791+
792+
if (
793+
!cown->read_ref_count.any_reader() &&
794+
cown->next_writer.exchange(nullptr, std::memory_order_acq_rel) ==
795+
body)
793796
{
794797
yield();
795-
Logging::cout() << " Someone in queue cown " << *curr_slot
796-
<< " previous " << *prev_slot << Logging::endl;
798+
Logging::cout() << " Writer at head of queue and got the cown "
799+
<< *curr_slot << Logging::endl;
800+
ec[std::get<0>(indexes[first_chain_index])]++;
801+
yield();
802+
continue;
803+
}
804+
Logging::cout() << " Writer waiting for previous readers cown "
805+
<< *curr_slot << Logging::endl;
806+
continue;
807+
}
797808

798-
while (prev_slot->is_wait_2pl())
799-
{
800-
Systematic::yield_until(
801-
[prev_slot]() { return !prev_slot->is_wait_2pl(); });
802-
Aal::pause();
803-
}
809+
yield();
810+
Logging::cout() << " Someone in queue cown " << *curr_slot
811+
<< " previous " << *prev_slot << Logging::endl;
804812

805-
if (prev_slot->set_next_slot_reader(curr_slot))
806-
{
807-
Logging::cout()
808-
<< " Previous slot is a writer or blocked reader cown "
809-
<< *curr_slot << " previous " << *prev_slot << Logging::endl;
810-
yield();
811-
continue;
812-
}
813-
else
814-
{
815-
yield();
816-
bool first_reader = cown->read_ref_count.add_read();
817-
Logging::cout() << " Reader got the cown " << *curr_slot
818-
<< " previous " << *prev_slot << Logging::endl;
819-
yield();
820-
curr_slot->set_active();
821-
ec[std::get<0>(indexes[first_chain_index])]++;
822-
if (first_reader)
823-
{
824-
Logging::cout()
825-
<< "Acquiring reference count for first reader on cown "
826-
<< *curr_slot << Logging::endl;
827-
Cown::acquire(cown);
828-
}
829-
continue;
830-
}
831-
}
813+
while (prev_slot->is_wait_2pl())
814+
{
815+
Systematic::yield_until(
816+
[prev_slot]() { return !prev_slot->is_wait_2pl(); });
817+
Aal::pause();
832818
}
833-
else
819+
820+
if (curr_slot->is_read_only())
834821
{
835-
auto prev_slot =
836-
cown->last_slot.exchange(curr_slot, std::memory_order_acq_rel);
837-
yield();
838-
if (prev_slot == nullptr)
822+
if (prev_slot->set_next_slot_reader(curr_slot))
839823
{
840-
cown->next_writer.store(body, std::memory_order_release);
824+
Logging::cout()
825+
<< " Previous slot is a writer or blocked reader cown "
826+
<< *curr_slot << " previous " << *prev_slot << Logging::endl;
841827
yield();
842-
if (transfer_count)
843-
{
844-
Logging::cout()
845-
<< "Releasing writer transferred count " << transfer_count
846-
<< " -1 for first in queue cown " << *curr_slot
847-
<< Logging::endl;
848-
// Release transfer_count - 1 times, we needed one as we woke up
849-
// the cown, but the rest were not required.
850-
for (int j = 0; j < transfer_count - 1; j++)
851-
Cown::release(ThreadAlloc::get(), cown);
852-
}
853-
else
854-
{
855-
Logging::cout() << "Acquiring writer reference count on cown "
856-
<< *curr_slot << Logging::endl;
857-
// We didn't have any RCs passed in, so we need to acquire one.
858-
Cown::acquire(cown);
859-
}
860-
861-
if (
862-
!cown->read_ref_count.any_reader() &&
863-
cown->next_writer.exchange(nullptr, std::memory_order_acq_rel) ==
864-
body)
865-
{
866-
yield();
867-
Logging::cout() << " Writer at head of queue and got the cown "
868-
<< *curr_slot << Logging::endl;
869-
ec[std::get<0>(indexes[first_chain_index])]++;
870-
yield();
871-
continue;
872-
}
873-
Logging::cout() << " Writer waiting for previous readers cown "
874-
<< *curr_slot << Logging::endl;
828+
continue;
875829
}
876-
else
830+
831+
yield();
832+
bool first_reader = cown->read_ref_count.add_read();
833+
Logging::cout() << " Reader got the cown " << *curr_slot
834+
<< " previous " << *prev_slot << Logging::endl;
835+
yield();
836+
curr_slot->set_active();
837+
ec[std::get<0>(indexes[first_chain_index])]++;
838+
if (first_reader)
877839
{
878-
while (prev_slot->is_wait_2pl())
879-
{
880-
Systematic::yield_until(
881-
[prev_slot]() { return !prev_slot->is_wait_2pl(); });
882-
Aal::pause();
883-
}
884840
Logging::cout()
885-
<< " Writer waiting for cown " << *curr_slot << Logging::endl;
886-
prev_slot->set_next_slot_writer(body);
887-
Logging::cout()
888-
<< " Writer Set next of previous slot cown " << *curr_slot
889-
<< " previous " << *prev_slot << Logging::endl;
841+
<< "Acquiring reference count for first reader on cown "
842+
<< *curr_slot << Logging::endl;
843+
Cown::acquire(cown);
890844
}
845+
continue;
891846
}
892-
847+
848+
Logging::cout()
849+
<< " Writer waiting for cown " << *curr_slot << Logging::endl;
850+
prev_slot->set_next_slot_writer(body);
851+
Logging::cout()
852+
<< " Writer Set next of previous slot cown " << *curr_slot
853+
<< " previous " << *prev_slot << Logging::endl;
893854
yield();
894855
}
895856

src/rt/sched/threadpool.h

Lines changed: 2 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -199,38 +199,7 @@ namespace verona::rt
199199
T::schedule_lifo(core, w);
200200
}
201201

202-
void init(size_t count)
203-
{
204-
Logging::cout() << "Init runtime" << Logging::endl;
205-
206-
if ((thread_count != 0) || (count == 0))
207-
abort();
208-
209-
thread_count = count;
210-
teardown_in_progress = false;
211-
212-
// Initialize the corepool.
213-
core_pool.init(count);
214-
215-
// For future ids.
216-
systematic_ids = count + 1;
217-
218-
for (; count > 0; count--)
219-
{
220-
T* t = new T;
221-
t->systematic_id = count;
222-
#ifdef USE_SYSTEMATIC_TESTING
223-
t->local_systematic =
224-
Systematic::create_systematic_thread(t->systematic_id);
225-
#endif
226-
threads.add_free(t);
227-
}
228-
Logging::cout() << "Runtime initialised" << Logging::endl;
229-
init_barrier();
230-
}
231-
232-
template<typename RunAtTermination>
233-
void init(size_t count, RunAtTermination run_at_termination)
202+
void init(size_t count, void (*run_at_termination)(void) = nullptr)
234203
{
235204
Logging::cout() << "Init runtime" << Logging::endl;
236205

@@ -260,6 +229,7 @@ namespace verona::rt
260229
Logging::cout() << "Runtime initialised" << Logging::endl;
261230
init_barrier();
262231
}
232+
263233
void run()
264234
{
265235
run_with_startup<>(&nop);

0 commit comments

Comments
 (0)