16#ifdef THREAD_SIMPLE_IMPL
23#ifndef WIN32_LEAN_AND_MEAN
24#define WIN32_LEAN_AND_MEAN 1
// Guard flag: becomes true the first time the global manager is allocated
// (the only visible writer sets it right before `new ThreadSimpleManager`).
29bool ThreadSimpleManager::_pointers_initialized;
// The process-wide manager instance; assigned together with the flag above.
30ThreadSimpleManager *ThreadSimpleManager::_global_ptr;
/**
 * Constructor.  Binds each scheduling tunable to a named config variable
 * (key, default value, description) and then resets the scheduler state.
 *
 * NOTE(review): this listing is missing some original lines (for example
 * the end of the volunteer-delay description and the member name in front
 * of the "simple-thread-window" initializer); they are not reconstructed
 * here.
 */
36ThreadSimpleManager() :
37 _simple_thread_epoch_timeslice
38 (
"simple-thread-epoch-timeslice", 0.05,
39 PRC_DESC(
"When SIMPLE_THREADS is defined, this defines the amount of time, "
40 "in seconds, that should be considered the "
41 "typical timeslice for one epoch (to run all threads once).")),
42 _simple_thread_volunteer_delay
43 (
"simple-thread-volunteer-delay", 0.0,
44 PRC_DESC(
"When SIMPLE_THREADS is defined, this defines the amount of time, "
45 "in seconds, for which a task that voluntarily yields should "
// (remainder of the description above is not visible in this listing)
47 _simple_thread_yield_sleep
48 (
"simple-thread-yield-sleep", 0.001,
49 PRC_DESC(
"When SIMPLE_THREADS is defined, this defines the amount of time, "
50 "in seconds, for which the process should be put to sleep when "
51 "yielding the timeslice to the system.")),
// (member name for the following initializer is not visible in this listing)
53 (
"simple-thread-window", 1.0,
54 PRC_DESC(
"When SIMPLE_THREADS is defined, this defines the amount of time, "
55 "in seconds, over which to average all the threads' runtimes, "
56 "for the purpose of scheduling threads.")),
// Relative scheduling weights, one per thread priority level.
57 _simple_thread_low_weight
58 (
"simple-thread-low-weight", 0.2,
59 PRC_DESC(
"When SIMPLE_THREADS is defined, this determines the relative "
60 "amount of time that is given to threads with priority TP_low.")),
61 _simple_thread_normal_weight
62 (
"simple-thread-normal-weight", 1.0,
63 PRC_DESC(
"When SIMPLE_THREADS is defined, this determines the relative "
64 "amount of time that is given to threads with priority TP_normal.")),
65 _simple_thread_high_weight
66 (
"simple-thread-high-weight", 5.0,
67 PRC_DESC(
"When SIMPLE_THREADS is defined, this determines the relative "
68 "amount of time that is given to threads with priority TP_high.")),
69 _simple_thread_urgent_weight
70 (
"simple-thread-urgent-weight", 10.0,
71 PRC_DESC(
"When SIMPLE_THREADS is defined, this determines the relative "
72 "amount of time that is given to threads with priority TP_urgent."))
// Multiplier used when converting elapsed seconds into integer ticks
// (see do_timeslice_accounting), i.e. one tick per microsecond.
74 _tick_scale = 1000000.0;
// No thread is running yet, and nobody is waiting for process exit.
76 _current_thread =
nullptr;
78 _waiting_for_exit =
nullptr;
/**
 * Queues a thread that is able to run again.
 *
 * Two paths are visible: appending the thread to _next_ready, or stamping it
 * with a wake time of now + simple-thread-volunteer-delay and pushing it
 * onto the _volunteers heap.
 *
 * NOTE(review): the condition selecting between the two paths (presumably
 * the `volunteer` flag) is not visible in this listing -- confirm.
 */
95void ThreadSimpleManager::
96enqueue_ready(ThreadSimpleImpl *thread,
bool volunteer) {
100 _next_ready.push_back(thread);
106 double now = get_current_time();
107 thread->_wake_time = now + _simple_thread_volunteer_delay;
// _volunteers is kept as a heap ordered by CompareStartTime.
108 _volunteers.push_back(thread);
109 push_heap(_volunteers.begin(), _volunteers.end(), CompareStartTime());
118void ThreadSimpleManager::
119enqueue_sleep(ThreadSimpleImpl *thread,
double seconds) {
120 if (thread_cat->is_debug()) {
122 << *_current_thread->_parent_obj <<
" sleeping for "
123 << seconds <<
" seconds\n";
126 double now = get_current_time();
127 thread->_wake_time = now + seconds;
128 _sleeping.push_back(thread);
129 push_heap(_sleeping.begin(), _sleeping.end(), CompareStartTime());
137void ThreadSimpleManager::
138enqueue_block(ThreadSimpleImpl *thread, BlockerSimple *blocker) {
139 _blocked[blocker].push_back(thread);
140 blocker->_flags |= BlockerSimple::F_has_waiters;
/**
 * Moves the first thread waiting on the given blocker onto the ready queue.
 *
 * When the blocker has an entry in _blocked, that entry must be non-empty
 * and the blocker must carry F_has_waiters; otherwise the assertions return
 * false.  Once the waiter list is empty the F_has_waiters flag is cleared.
 *
 * NOTE(review): the statements that dequeue the thread from `threads` and
 * the return statements are not visible in this listing.
 */
147bool ThreadSimpleManager::
148unblock_one(BlockerSimple *blocker) {
149 Blocked::iterator bi = _blocked.find(blocker);
150 if (bi != _blocked.end()) {
151 nassertr(blocker->_flags & BlockerSimple::F_has_waiters,
false);
153 FifoThreads &threads = (*bi).second;
154 nassertr(!threads.empty(),
false);
// Waiters are served in FIFO order.
155 ThreadSimpleImpl *thread = threads.front();
157 _ready.push_back(thread);
158 if (threads.empty()) {
159 blocker->_flags &= ~BlockerSimple::F_has_waiters;
/**
 * Moves every thread waiting on the given blocker onto the ready queue, in
 * FIFO order, and clears the blocker's F_has_waiters flag.
 *
 * NOTE(review): the statement that removes each thread from `threads`
 * inside the loop, and the return statements, are not visible in this
 * listing.
 */
173bool ThreadSimpleManager::
174unblock_all(BlockerSimple *blocker) {
175 Blocked::iterator bi = _blocked.find(blocker);
176 if (bi != _blocked.end()) {
177 nassertr(blocker->_flags & BlockerSimple::F_has_waiters,
false);
179 FifoThreads &threads = (*bi).second;
180 nassertr(!threads.empty(),
false);
181 while (!threads.empty()) {
182 ThreadSimpleImpl *thread = threads.front();
184 _ready.push_back(thread);
186 blocker->_flags &= ~BlockerSimple::F_has_waiters;
199void ThreadSimpleManager::
200enqueue_finished(ThreadSimpleImpl *thread) {
201 _finished.push_back(thread);
/**
 * If the indicated thread is currently in the ready queue, puts it at the
 * front of that queue so that it is chosen next.
 *
 * NOTE(review): the statement removing the thread from its old position
 * (between the find and the push_front) is not visible in this listing.
 */
208void ThreadSimpleManager::
209preempt(ThreadSimpleImpl *thread) {
210 FifoThreads::iterator ti;
// Linear search; threads that are not in _ready are left untouched.
211 ti = find(_ready.begin(), _ready.end(), thread);
212 if (ti != _ready.end()) {
214 _ready.push_front(thread);
// NOTE(review): the line carrying this method's name is not visible in this
// listing.  From the visible statements it: drains finished threads, saves
// the current thread's context (handing st_choose_next_context as the
// continuation), and, once this thread is resumed, re-activates its PStats
// hook and restores its Python thread state.
226void ThreadSimpleManager::
// Pop finished threads, stopping at the current thread if it is at the front.
230 while (!_finished.empty() && _finished.front() != _current_thread) {
231 ThreadSimpleImpl *finished_thread = _finished.front();
232 _finished.pop_front();
// Detach the Python thread state and remember it on the outgoing thread.
240 _current_thread->_python_state = thread_state_swap(
nullptr);
245 if (pstats_callback !=
nullptr) {
// Save our context; st_choose_next_context(this) runs to pick a successor.
250 save_thread_context(_current_thread->_context, st_choose_next_context,
this);
// Execution resumes here when this context is switched back in.
255 if (pstats_callback !=
nullptr) {
256 pstats_callback->
activate_hook(_current_thread->_parent_obj);
261 thread_state_swap(_current_thread->_python_state);
// Prepares the scheduler for process exit (the method name is taken from
// the log strings below; the name line itself is not visible here).
//
// Only honoured when called from the main thread.  The calling thread is
// recorded in _waiting_for_exit, every non-joinable thread in the ready,
// blocked, sleeping and volunteer collections is killed, and the finished
// queue is drained.
//
// NOTE(review): several lines (early return, iterator advance, erasure of
// emptied blocker entries, handling of popped finished threads) are not
// visible in this listing.
270void ThreadSimpleManager::
272 if (!_current_thread->_parent_obj->is_exact_type(MainThread::get_class_type())) {
273 if (thread_cat->is_debug()) {
275 <<
"Ignoring prepare_for_exit called from "
276 << *(_current_thread->_parent_obj) <<
"\n";
281 if (thread_cat->is_debug()) {
283 <<
"prepare_for_exit\n";
// Only one thread may be waiting for exit at a time.
286 nassertv(_waiting_for_exit ==
nullptr);
287 _waiting_for_exit = _current_thread;
291 kill_non_joinable(_ready);
// Walk the blocked map; bnext is taken up front, presumably so the current
// entry can be erased safely -- confirm against the full source.
293 Blocked::iterator bi = _blocked.begin();
294 while (bi != _blocked.end()) {
295 Blocked::iterator bnext = bi;
297 BlockerSimple *blocker = (*bi).first;
298 FifoThreads &threads = (*bi).second;
299 kill_non_joinable(threads);
300 if (threads.empty()) {
301 blocker->_flags &= ~BlockerSimple::F_has_waiters;
307 kill_non_joinable(_sleeping);
308 kill_non_joinable(_volunteers);
// Drain finished threads, stopping at the current thread.
313 while (!_finished.empty() && _finished.front() != _current_thread) {
314 ThreadSimpleImpl *finished_thread = _finished.front();
315 _finished.pop_front();
324void ThreadSimpleManager::
325set_current_thread(ThreadSimpleImpl *current_thread) {
326 nassertv(_current_thread ==
nullptr);
327 _current_thread = current_thread;
334void ThreadSimpleManager::
335remove_thread(ThreadSimpleImpl *thread) {
336 TickRecords new_records;
337 TickRecords::iterator ri;
338 for (ri = _tick_records.begin(); ri != _tick_records.end(); ++ri) {
339 if ((*ri)._thread != thread) {
341 new_records.push_back(*ri);
344 nassertv(_total_ticks >= (*ri)._tick_count);
345 _total_ticks -= (*ri)._tick_count;
349 _tick_records.swap(new_records);
356void ThreadSimpleManager::
357system_sleep(
double seconds) {
359 Sleep((
int)(seconds * 1000 + 0.5));
372 tv.tv_sec = time_t(seconds);
373 tv.tv_usec = long((seconds - (
double)tv.tv_sec) * 1000000.0 + 0.5);
374 select(0,
nullptr,
nullptr,
nullptr, &tv);
/**
 * Writes a human-readable summary of the scheduler state to the given
 * stream: the current thread, the ready / next-ready / volunteer threads,
 * the sleeping threads ordered by CompareStartTime with their remaining
 * time, and the threads waiting on each blocker.
 *
 * NOTE(review): section headings and line terminators emitted between the
 * visible statements are not shown in this listing.
 */
381void ThreadSimpleManager::
382write_status(std::ostream &out)
const {
383 out <<
"Currently running: " << *_current_thread->_parent_obj <<
"\n";
386 FifoThreads::const_iterator ti;
387 Sleeping::const_iterator si;
388 for (ti = _ready.begin(); ti != _ready.end(); ++ti) {
389 out <<
" " << *(*ti)->_parent_obj;
391 for (ti = _next_ready.begin(); ti != _next_ready.end(); ++ti) {
392 out <<
" " << *(*ti)->_parent_obj;
394 for (si = _volunteers.begin(); si != _volunteers.end(); ++si) {
395 out <<
" " << *(*si)->_parent_obj;
399 double now = get_current_time();
// Work on a sorted copy so the live _sleeping heap is not disturbed (this
// method is const).
403 Sleeping s2 = _sleeping;
404 sort(s2.begin(), s2.end(), CompareStartTime());
405 for (si = s2.begin(); si != s2.end(); ++si) {
// Printed value is the time remaining until the thread's wake time.
406 out <<
" " << *(*si)->_parent_obj <<
"(" << (*si)->_wake_time - now
411 Blocked::const_iterator bi;
412 for (bi = _blocked.begin(); bi != _blocked.end(); ++bi) {
413 BlockerSimple *blocker = (*bi).first;
414 const FifoThreads &threads = (*bi).second;
415 out <<
"On blocker " << blocker <<
":\n";
416 FifoThreads::const_iterator ti;
417 for (ti = threads.begin(); ti != threads.end(); ++ti) {
418 ThreadSimpleImpl *thread = (*ti);
419 out <<
" " << *thread->_parent_obj;
// Lets the thread object describe what it is blocked on.
422 thread->_parent_obj->output_blocker(out);
// NOTE(review): the line carrying this method's name is not visible in this
// listing.  From the visible statements it checks that the global pointers
// are initialized and then sleeps the process for the configured
// simple-thread-yield-sleep interval via system_sleep().
434void ThreadSimpleManager::
436 if (!_pointers_initialized) {
441 if (thread_cat->is_debug()) {
// Goes through _global_ptr, so this code does not rely on a `this` pointer.
452 system_sleep(_global_ptr->_simple_thread_yield_sleep);
459double ThreadSimpleManager::
460get_current_time()
const {
461 return _clock->get_short_raw_time();
// NOTE(review): the line carrying this method's name is not visible in this
// listing.  On first call it marks the pointers as initialized and
// allocates the global ThreadSimpleManager; later calls do nothing.
467void ThreadSimpleManager::
469 if (!_pointers_initialized) {
// The flag is set before the manager is constructed.
470 _pointers_initialized =
true;
471 _global_ptr =
new ThreadSimpleManager;
479void ThreadSimpleManager::
480st_choose_next_context(
struct ThreadContext *from_context,
void *data) {
481 ThreadSimpleManager *self = (ThreadSimpleManager *)data;
482 self->choose_next_context(from_context);
/**
 * Picks the next thread to run and switches to its context.
 *
 * Visible steps: account the outgoing thread's run time, wake any sleepers
 * or volunteers that are due, refill _ready from _next_ready (or from
 * volunteers / sleepers / the thread waiting for exit) when it is empty,
 * then pop candidates from _ready until one has a positive timeslice.
 *
 * NOTE(review): many lines of the original (loop headers, breaks, abort
 * paths, the system_sleep call) are not visible in this listing, so the
 * comments below describe only what the shown statements do.
 */
488void ThreadSimpleManager::
489choose_next_context(
struct ThreadContext *from_context) {
490 double now = get_current_time();
// Charge the outgoing thread for the time it just used.
492 do_timeslice_accounting(_current_thread, now);
493 _current_thread =
nullptr;
495 if (!_sleeping.empty() || !_volunteers.empty()) {
496 if (_ready.empty() && _next_ready.empty()) {
// Nothing else is runnable: volunteers do not need to wait out their delay.
499 wake_all_sleepers(_volunteers);
504 now = get_current_time();
506 wake_sleepers(_sleeping, now);
507 wake_sleepers(_volunteers, now);
// A new epoch begins if _ready still has entries while _next_ready is empty.
510 bool new_epoch = !_ready.empty() && _next_ready.empty();
515 while (_ready.empty()) {
516 if (!_next_ready.empty()) {
// Promote the threads queued for the next epoch.
518 _ready.swap(_next_ready);
520 if (new_epoch && !_tick_records.empty()) {
524 if (thread_cat->is_debug()) {
526 <<
"All threads exceeded budget.\n";
// Retire the oldest tick record so budgets free up.
528 TickRecord &record = _tick_records.front();
529 _total_ticks -= record._tick_count;
// Subtract from the owning thread's total, clamping at zero.
531 if (record._thread->_run_ticks >= record._tick_count) {
533 record._thread->_run_ticks -= record._tick_count;
537 record._thread->_run_ticks = 0;
539 _tick_records.pop_front();
543 }
else if (!_volunteers.empty()) {
546 if (thread_cat->is_debug()) {
548 <<
"Waking volunteers.\n";
553 now = get_current_time();
554 wake_all_sleepers(_volunteers);
555 wake_sleepers(_sleeping, now);
557 }
else if (!_sleeping.empty()) {
// Time until the earliest sleeper (heap front) is due.
559 double wait = _sleeping.front()->_wake_time - now;
561 if (thread_cat->is_debug()) {
563 <<
"Sleeping all threads " << wait <<
" seconds\n";
567 now = get_current_time();
568 wake_sleepers(_sleeping, now);
569 wake_sleepers(_volunteers, now);
// Nothing left to run: resume the thread that is waiting for exit, if any.
573 if (_waiting_for_exit !=
nullptr) {
577 _ready.push_back(_waiting_for_exit);
578 _waiting_for_exit =
nullptr;
584 if (!_blocked.empty()) {
586 <<
"Deadlock! All threads blocked.\n";
595 <<
"All threads disappeared!\n";
600 ThreadSimpleImpl *chosen_thread = _ready.front();
603 double timeslice = determine_timeslice(chosen_thread);
604 if (timeslice > 0.0) {
// The candidate has budget left: record its run window and select it.
606 chosen_thread->_start_time = now;
607 chosen_thread->_stop_time = now + timeslice;
608 _current_thread = chosen_thread;
// Otherwise it is deferred to the next epoch.
614 _next_ready.push_back(chosen_thread);
618 if (thread_cat->is_debug()) {
// Count blocked threads across all blockers for the debug message.
619 size_t blocked_count = 0;
620 Blocked::const_iterator bi;
621 for (bi = _blocked.begin(); bi != _blocked.end(); ++bi) {
622 const FifoThreads &threads = (*bi).second;
623 blocked_count += threads.size();
626 double timeslice = _current_thread->_stop_time - _current_thread->_start_time;
628 <<
"Switching to " << *_current_thread->_parent_obj
629 <<
" for " << timeslice <<
" s ("
630 << _ready.size() <<
" + " << _next_ready.size()
631 <<
" + " << _volunteers.size()
632 <<
" other threads ready, " << blocked_count
633 <<
" blocked, " << _sleeping.size() <<
" sleeping)\n";
// Hand control to the chosen thread.
636 switch_to_thread_context(from_context, _current_thread->_context);
/**
 * Charges the given thread for the time it ran since its _start_time.
 *
 * The elapsed time (clamped to be non-negative) is converted to ticks with
 * _tick_scale and added to the thread's _run_ticks.  Tick records are then
 * retired from the front of _tick_records until _total_ticks fits within
 * the simple-thread-window, and a new record for this run is appended.
 *
 * NOTE(review): this block calls `thread_cat.is_debug()` where the rest of
 * the file calls `thread_cat->is_debug()`.  The declaration of the `record`
 * that is filled in after the loop is not visible in this listing.
 */
647void ThreadSimpleManager::
648do_timeslice_accounting(ThreadSimpleImpl *thread,
double now) {
649 double elapsed = now - thread->_start_time;
650 if (thread_cat.is_debug()) {
652 << *thread->_parent_obj <<
" ran for " << elapsed <<
" s of "
653 << thread->_stop_time - thread->_start_time <<
" requested.\n";
// Never charge negative time.
658 elapsed = std::max(elapsed, 0.0);
// Round to the nearest tick.
660 unsigned int ticks = (
unsigned int)(elapsed * _tick_scale + 0.5);
661 thread->_run_ticks += ticks;
// Size of the averaging window, in ticks.
664 unsigned int ticks_window = (
unsigned int)(_simple_thread_window * _tick_scale + 0.5);
665 while (_total_ticks > ticks_window) {
666 nassertv(!_tick_records.empty());
667 TickRecord &record = _tick_records.front();
668 _total_ticks -= record._tick_count;
// Remove the expired ticks from the owning thread, clamping at zero.
669 if (record._thread->_run_ticks >= record._tick_count) {
671 record._thread->_run_ticks -= record._tick_count;
675 record._thread->_run_ticks = 0;
677 _tick_records.pop_front();
// Append a record describing the run that just ended.
682 record._tick_count = ticks;
683 record._thread = thread;
684 _tick_records.push_back(record);
685 _total_ticks += ticks;
/**
 * Moves every thread at the front of the given heap whose wake time is at
 * or before `now` onto the ready queue.
 *
 * NOTE(review): pop_heap() only moves the front element to the back of the
 * container; the statement that removes it (presumably a pop_back()) is not
 * visible in this listing -- confirm.
 */
693void ThreadSimpleManager::
694wake_sleepers(ThreadSimpleManager::Sleeping &sleepers,
double now) {
695 while (!sleepers.empty() && sleepers.front()->_wake_time <= now) {
696 ThreadSimpleImpl *thread = sleepers.front();
697 pop_heap(sleepers.begin(), sleepers.end(), CompareStartTime());
699 _ready.push_back(thread);
/**
 * Moves every thread in the given heap onto the ready queue, in heap order,
 * regardless of wake time.
 *
 * NOTE(review): pop_heap() only moves the front element to the back of the
 * container; the statement that removes it (presumably a pop_back()) is not
 * visible in this listing -- confirm.
 */
707void ThreadSimpleManager::
708wake_all_sleepers(ThreadSimpleManager::Sleeping &sleepers) {
709 while (!sleepers.empty()) {
710 ThreadSimpleImpl *thread = sleepers.front();
711 pop_heap(sleepers.begin(), sleepers.end(), CompareStartTime());
713 _ready.push_back(thread);
// NOTE(review): the line carrying this method's name is not visible in this
// listing.  From the visible statements it walks the _blocked map and
// reports, via thread_cat, each blocker together with the threads waiting
// on it and what each thread says it is blocked on.
720void ThreadSimpleManager::
722 Blocked::const_iterator bi;
723 for (bi = _blocked.begin(); bi != _blocked.end(); ++bi) {
724 BlockerSimple *blocker = (*bi).first;
725 const FifoThreads &threads = (*bi).second;
727 <<
"On blocker " << blocker <<
":\n";
728 FifoThreads::const_iterator ti;
729 for (ti = threads.begin(); ti != threads.end(); ++ti) {
730 ThreadSimpleImpl *thread = (*ti);
732 <<
" " << *thread->_parent_obj;
// info(false) is used for the pieces that continue the same output line.
734 thread_cat.info(
false) <<
" (";
735 thread->_parent_obj->output_blocker(thread_cat.info(
false));
736 thread_cat.info(
false) <<
")";
738 thread_cat.info(
false) <<
"\n";
748double ThreadSimpleManager::
749determine_timeslice(ThreadSimpleImpl *chosen_thread) {
750 if (_ready.empty() && _next_ready.empty()) {
752 return _simple_thread_epoch_timeslice;
756 unsigned int total_ticks = chosen_thread->_run_ticks;
757 double total_weight = chosen_thread->_priority_weight;
759 FifoThreads::const_iterator ti;
760 for (ti = _ready.begin(); ti != _ready.end(); ++ti) {
761 total_ticks += (*ti)->_run_ticks;
762 total_weight += (*ti)->_priority_weight;
764 for (ti = _next_ready.begin(); ti != _next_ready.end(); ++ti) {
765 total_ticks += (*ti)->_run_ticks;
766 total_weight += (*ti)->_priority_weight;
769 nassertr(total_weight != 0.0, 0.0);
770 double budget_ratio = chosen_thread->_priority_weight / total_weight;
772 if (total_ticks == 0) {
774 return budget_ratio * _simple_thread_epoch_timeslice;
777 double run_ratio = (double)chosen_thread->_run_ticks / (
double)total_ticks;
778 double remaining_ratio = budget_ratio - run_ratio;
780 if (thread_cat->is_debug()) {
782 << *chosen_thread->_parent_obj <<
" accrued "
783 << chosen_thread->_run_ticks / _tick_scale <<
" s of "
784 << total_ticks / _tick_scale <<
"; budget is "
785 << budget_ratio * total_ticks / _tick_scale <<
".\n";
786 if (remaining_ratio <= 0.0) {
788 <<
"Exceeded budget.\n";
792 return remaining_ratio * _simple_thread_epoch_timeslice;
799void ThreadSimpleManager::
800kill_non_joinable(ThreadSimpleManager::FifoThreads &threads) {
801 FifoThreads new_threads;
802 FifoThreads::iterator ti;
803 for (ti = threads.begin(); ti != threads.end(); ++ti) {
804 ThreadSimpleImpl *thread = (*ti);
805 if (thread->_joinable) {
806 new_threads.push_back(thread);
808 if (thread_cat->is_debug()) {
810 <<
"Killing " << *thread->_parent_obj <<
"\n";
812 thread->_status = ThreadSimpleImpl::TS_killed;
813 enqueue_finished(thread);
817 threads.swap(new_threads);
824void ThreadSimpleManager::
825kill_non_joinable(ThreadSimpleManager::Sleeping &threads) {
826 Sleeping new_threads;
827 Sleeping::iterator ti;
828 for (ti = threads.begin(); ti != threads.end(); ++ti) {
829 ThreadSimpleImpl *thread = (*ti);
830 if (thread->_joinable) {
831 new_threads.push_back(thread);
833 if (thread_cat->is_debug()) {
835 <<
"Killing " << *thread->_parent_obj <<
"\n";
837 thread->_status = ThreadSimpleImpl::TS_killed;
838 enqueue_finished(thread);
841 make_heap(new_threads.begin(), new_threads.end(), CompareStartTime());
842 threads.swap(new_threads);
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
virtual void deactivate_hook(Thread *thread)
Called when the thread is deactivated (swapped for another running thread).
virtual void activate_hook(Thread *thread)
Called when the thread is activated (resumes execution).
get_main_thread
Returns a pointer to the "main" Thread object—that is, the Thread that started the whole process.
static void consider_yield()
Possibly suspends the current thread for the rest of the current epoch, if it has already run for long enough this epoch.
static void force_yield()
Suspends the current thread for the rest of the current epoch.
static TrueClock * get_global_ptr()
Returns a pointer to the one TrueClock object in the world.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
void unref_delete(RefCountType *ptr)
This global helper function will unref the given ReferenceCount object, and if the reference count reaches zero, delete it.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.