15 #include "threadSimpleManager.h"
17 #ifdef THREAD_SIMPLE_IMPL
19 #include "threadSimpleImpl.h"
20 #include "blockerSimple.h"
21 #include "mainThread.h"
24 #define WIN32_LEAN_AND_MEAN
28 bool ThreadSimpleManager::_pointers_initialized;
29 ThreadSimpleManager *ThreadSimpleManager::_global_ptr;
37 ThreadSimpleManager() :
38 _simple_thread_epoch_timeslice
39 (
"simple-thread-epoch-timeslice", 0.05,
40 PRC_DESC(
"When SIMPLE_THREADS is defined, this defines the amount of time, "
41 "in seconds, that should be considered the "
42 "typical timeslice for one epoch (to run all threads once).")),
43 _simple_thread_volunteer_delay
44 (
"simple-thread-volunteer-delay", 0.0,
45 PRC_DESC(
"When SIMPLE_THREADS is defined, this defines the amount of time, "
46 "in seconds, for which a task that voluntarily yields should "
48 _simple_thread_yield_sleep
49 (
"simple-thread-yield-sleep", 0.001,
50 PRC_DESC(
"When SIMPLE_THREADS is defined, this defines the amount of time, "
51 "in seconds, for which the process should be put to sleep when "
52 "yielding the timeslice to the system.")),
54 (
"simple-thread-window", 1.0,
55 PRC_DESC(
"When SIMPLE_THREADS is defined, this defines the amount of time, "
56 "in seconds, over which to average all the threads' runtimes, "
57 "for the purpose of scheduling threads.")),
58 _simple_thread_low_weight
59 (
"simple-thread-low-weight", 0.2,
60 PRC_DESC(
"When SIMPLE_THREADS is defined, this determines the relative "
61 "amount of time that is given to threads with priority TP_low.")),
62 _simple_thread_normal_weight
63 (
"simple-thread-normal-weight", 1.0,
64 PRC_DESC(
"When SIMPLE_THREADS is defined, this determines the relative "
65 "amount of time that is given to threads with priority TP_normal.")),
66 _simple_thread_high_weight
67 (
"simple-thread-high-weight", 5.0,
68 PRC_DESC(
"When SIMPLE_THREADS is defined, this determines the relative "
69 "amount of time that is given to threads with priority TP_high.")),
70 _simple_thread_urgent_weight
71 (
"simple-thread-urgent-weight", 10.0,
72 PRC_DESC(
"When SIMPLE_THREADS is defined, this determines the relative "
73 "amount of time that is given to threads with priority TP_urgent."))
75 _tick_scale = 1000000.0;
77 _current_thread = NULL;
79 _waiting_for_exit = NULL;
100 void ThreadSimpleManager::
101 enqueue_ready(ThreadSimpleImpl *thread,
bool volunteer) {
105 _next_ready.push_back(thread);
111 double now = get_current_time();
112 thread->_wake_time = now + _simple_thread_volunteer_delay;
113 _volunteers.push_back(thread);
114 push_heap(_volunteers.begin(), _volunteers.end(), CompareStartTime());
126 void ThreadSimpleManager::
127 enqueue_sleep(ThreadSimpleImpl *thread,
double seconds) {
128 if (thread_cat->is_debug()) {
130 << *_current_thread->_parent_obj <<
" sleeping for "
131 << seconds <<
" seconds\n";
134 double now = get_current_time();
135 thread->_wake_time = now + seconds;
136 _sleeping.push_back(thread);
137 push_heap(_sleeping.begin(), _sleeping.end(), CompareStartTime());
147 void ThreadSimpleManager::
148 enqueue_block(ThreadSimpleImpl *thread, BlockerSimple *blocker) {
149 _blocked[blocker].push_back(thread);
150 blocker->_flags |= BlockerSimple::F_has_waiters;
160 bool ThreadSimpleManager::
161 unblock_one(BlockerSimple *blocker) {
162 Blocked::iterator bi = _blocked.find(blocker);
163 if (bi != _blocked.end()) {
164 nassertr(blocker->_flags & BlockerSimple::F_has_waiters,
false);
166 FifoThreads &threads = (*bi).second;
167 nassertr(!threads.empty(),
false);
168 ThreadSimpleImpl *thread = threads.front();
170 _ready.push_back(thread);
171 if (threads.empty()) {
172 blocker->_flags &= ~BlockerSimple::F_has_waiters;
189 bool ThreadSimpleManager::
190 unblock_all(BlockerSimple *blocker) {
191 Blocked::iterator bi = _blocked.find(blocker);
192 if (bi != _blocked.end()) {
193 nassertr(blocker->_flags & BlockerSimple::F_has_waiters,
false);
195 FifoThreads &threads = (*bi).second;
196 nassertr(!threads.empty(),
false);
197 while (!threads.empty()) {
198 ThreadSimpleImpl *thread = threads.front();
200 _ready.push_back(thread);
202 blocker->_flags &= ~BlockerSimple::F_has_waiters;
218 void ThreadSimpleManager::
219 enqueue_finished(ThreadSimpleImpl *thread) {
220 _finished.push_back(thread);
230 void ThreadSimpleManager::
231 preempt(ThreadSimpleImpl *thread) {
232 FifoThreads::iterator ti;
233 ti = find(_ready.begin(), _ready.end(), thread);
234 if (ti != _ready.end()) {
236 _ready.push_front(thread);
253 void ThreadSimpleManager::
257 while (!_finished.empty() && _finished.front() != _current_thread) {
258 ThreadSimpleImpl *finished_thread = _finished.front();
259 _finished.pop_front();
260 unref_delete(finished_thread->_parent_obj);
267 _current_thread->_python_state = PyThreadState_Swap(NULL);
268 #endif // HAVE_PYTHON
272 if (pstats_callback != NULL) {
277 save_thread_context(_current_thread->_context, st_choose_next_context,
this);
282 if (pstats_callback != NULL) {
283 pstats_callback->
activate_hook(_current_thread->_parent_obj);
288 PyThreadState_Swap(_current_thread->_python_state);
289 #endif // HAVE_PYTHON
300 void ThreadSimpleManager::
302 if (!_current_thread->_parent_obj->is_exact_type(MainThread::get_class_type())) {
303 if (thread_cat->is_debug()) {
305 <<
"Ignoring prepare_for_exit called from "
306 << *(_current_thread->_parent_obj) <<
"\n";
311 if (thread_cat->is_debug()) {
313 <<
"prepare_for_exit\n";
316 nassertv(_waiting_for_exit == NULL);
317 _waiting_for_exit = _current_thread;
321 kill_non_joinable(_ready);
323 Blocked::iterator bi = _blocked.begin();
324 while (bi != _blocked.end()) {
325 Blocked::iterator bnext = bi;
327 BlockerSimple *blocker = (*bi).first;
328 FifoThreads &threads = (*bi).second;
329 kill_non_joinable(threads);
330 if (threads.empty()) {
331 blocker->_flags &= ~BlockerSimple::F_has_waiters;
337 kill_non_joinable(_sleeping);
338 kill_non_joinable(_volunteers);
343 while (!_finished.empty() && _finished.front() != _current_thread) {
344 ThreadSimpleImpl *finished_thread = _finished.front();
345 _finished.pop_front();
346 unref_delete(finished_thread->_parent_obj);
357 void ThreadSimpleManager::
358 set_current_thread(ThreadSimpleImpl *current_thread) {
359 nassertv(_current_thread == (ThreadSimpleImpl *)NULL);
360 _current_thread = current_thread;
369 void ThreadSimpleManager::
370 remove_thread(ThreadSimpleImpl *thread) {
371 TickRecords new_records;
372 TickRecords::iterator ri;
373 for (ri = _tick_records.begin(); ri != _tick_records.end(); ++ri) {
374 if ((*ri)._thread != thread) {
376 new_records.push_back(*ri);
379 nassertv(_total_ticks >= (*ri)._tick_count);
380 _total_ticks -= (*ri)._tick_count;
384 _tick_records.swap(new_records);
394 void ThreadSimpleManager::
395 system_sleep(
double seconds) {
397 Sleep((
int)(seconds * 1000 + 0.5));
410 tv.tv_sec = time_t(seconds);
411 tv.tv_usec = long((seconds - (
double)tv.tv_sec) * 1000000.0 + 0.5);
412 select(0, NULL, NULL, NULL, &tv);
421 void ThreadSimpleManager::
422 write_status(ostream &out)
const {
423 out <<
"Currently running: " << *_current_thread->_parent_obj <<
"\n";
426 FifoThreads::const_iterator ti;
427 Sleeping::const_iterator si;
428 for (ti = _ready.begin(); ti != _ready.end(); ++ti) {
429 out <<
" " << *(*ti)->_parent_obj;
431 for (ti = _next_ready.begin(); ti != _next_ready.end(); ++ti) {
432 out <<
" " << *(*ti)->_parent_obj;
434 for (si = _volunteers.begin(); si != _volunteers.end(); ++si) {
435 out <<
" " << *(*si)->_parent_obj;
439 double now = get_current_time();
443 Sleeping s2 = _sleeping;
444 sort(s2.begin(), s2.end(), CompareStartTime());
445 for (si = s2.begin(); si != s2.end(); ++si) {
446 out <<
" " << *(*si)->_parent_obj <<
"(" << (*si)->_wake_time - now
451 Blocked::const_iterator bi;
452 for (bi = _blocked.begin(); bi != _blocked.end(); ++bi) {
453 BlockerSimple *blocker = (*bi).first;
454 const FifoThreads &threads = (*bi).second;
455 out <<
"On blocker " << blocker <<
":\n";
456 FifoThreads::const_iterator ti;
457 for (ti = threads.begin(); ti != threads.end(); ++ti) {
458 ThreadSimpleImpl *thread = (*ti);
459 out <<
" " << *thread->_parent_obj;
462 thread->_parent_obj->output_blocker(out);
464 #endif // DEBUG_THREADS
476 void ThreadSimpleManager::
478 if (!_pointers_initialized) {
483 if (thread_cat->is_debug()) {
495 system_sleep(_global_ptr->_simple_thread_yield_sleep);
505 double ThreadSimpleManager::
506 get_current_time()
const {
507 return _clock->get_short_raw_time();
516 void ThreadSimpleManager::
518 if (!_pointers_initialized) {
519 _pointers_initialized =
true;
520 _global_ptr =
new ThreadSimpleManager;
527 #if PY_VERSION_HEX >= 0x03020000
531 PyEval_InitThreads();
542 void ThreadSimpleManager::
543 st_choose_next_context(
struct ThreadContext *from_context,
void *data) {
544 ThreadSimpleManager *
self = (ThreadSimpleManager *)data;
545 self->choose_next_context(from_context);
554 void ThreadSimpleManager::
555 choose_next_context(
struct ThreadContext *from_context) {
556 double now = get_current_time();
558 do_timeslice_accounting(_current_thread, now);
559 _current_thread = NULL;
561 if (!_sleeping.empty() || !_volunteers.empty()) {
562 if (_ready.empty() && _next_ready.empty()) {
565 wake_all_sleepers(_volunteers);
570 now = get_current_time();
572 wake_sleepers(_sleeping, now);
573 wake_sleepers(_volunteers, now);
576 bool new_epoch = !_ready.empty() && _next_ready.empty();
581 while (_ready.empty()) {
582 if (!_next_ready.empty()) {
584 _ready.swap(_next_ready);
586 if (new_epoch && !_tick_records.empty()) {
590 if (thread_cat->is_debug()) {
592 <<
"All threads exceeded budget.\n";
594 TickRecord &record = _tick_records.front();
595 _total_ticks -= record._tick_count;
597 if (record._thread->_run_ticks >= record._tick_count) {
599 record._thread->_run_ticks -= record._tick_count;
603 record._thread->_run_ticks = 0;
605 _tick_records.pop_front();
609 }
else if (!_volunteers.empty()) {
612 if (thread_cat->is_debug()) {
614 <<
"Waking volunteers.\n";
619 now = get_current_time();
620 wake_all_sleepers(_volunteers);
621 wake_sleepers(_sleeping, now);
623 }
else if (!_sleeping.empty()) {
625 double wait = _sleeping.front()->_wake_time - now;
627 if (thread_cat->is_debug()) {
629 <<
"Sleeping all threads " << wait <<
" seconds\n";
633 now = get_current_time();
634 wake_sleepers(_sleeping, now);
635 wake_sleepers(_volunteers, now);
639 if (_waiting_for_exit != NULL) {
643 _ready.push_back(_waiting_for_exit);
644 _waiting_for_exit = NULL;
651 if (!_blocked.empty()) {
653 <<
"Deadlock! All threads blocked.\n";
662 <<
"All threads disappeared!\n";
667 ThreadSimpleImpl *chosen_thread = _ready.front();
670 double timeslice = determine_timeslice(chosen_thread);
671 if (timeslice > 0.0) {
673 chosen_thread->_start_time = now;
674 chosen_thread->_stop_time = now + timeslice;
675 _current_thread = chosen_thread;
682 _next_ready.push_back(chosen_thread);
686 if (thread_cat->is_debug()) {
687 size_t blocked_count = 0;
688 Blocked::const_iterator bi;
689 for (bi = _blocked.begin(); bi != _blocked.end(); ++bi) {
690 const FifoThreads &threads = (*bi).second;
691 blocked_count += threads.size();
694 double timeslice = _current_thread->_stop_time - _current_thread->_start_time;
696 <<
"Switching to " << *_current_thread->_parent_obj
697 <<
" for " << timeslice <<
" s ("
698 << _ready.size() <<
" + " << _next_ready.size()
699 <<
" + " << _volunteers.size()
700 <<
" other threads ready, " << blocked_count
701 <<
" blocked, " << _sleeping.size() <<
" sleeping)\n";
704 switch_to_thread_context(from_context, _current_thread->_context);
717 void ThreadSimpleManager::
718 do_timeslice_accounting(ThreadSimpleImpl *thread,
double now) {
719 double elapsed = now - thread->_start_time;
720 if (thread_cat.is_debug()) {
722 << *thread->_parent_obj <<
" ran for " << elapsed <<
" s of "
723 << thread->_stop_time - thread->_start_time <<
" requested.\n";
728 elapsed = max(elapsed, 0.0);
730 unsigned int ticks = (
unsigned int)(elapsed * _tick_scale + 0.5);
731 thread->_run_ticks += ticks;
734 unsigned int ticks_window = (
unsigned int)(_simple_thread_window * _tick_scale + 0.5);
735 while (_total_ticks > ticks_window) {
736 nassertv(!_tick_records.empty());
737 TickRecord &record = _tick_records.front();
738 _total_ticks -= record._tick_count;
739 if (record._thread->_run_ticks >= record._tick_count) {
741 record._thread->_run_ticks -= record._tick_count;
745 record._thread->_run_ticks = 0;
747 _tick_records.pop_front();
752 record._tick_count = ticks;
753 record._thread = thread;
754 _tick_records.push_back(record);
755 _total_ticks += ticks;
765 void ThreadSimpleManager::
766 wake_sleepers(ThreadSimpleManager::Sleeping &sleepers,
double now) {
767 while (!sleepers.empty() && sleepers.front()->_wake_time <= now) {
768 ThreadSimpleImpl *thread = sleepers.front();
769 pop_heap(sleepers.begin(), sleepers.end(), CompareStartTime());
771 _ready.push_back(thread);
781 void ThreadSimpleManager::
782 wake_all_sleepers(ThreadSimpleManager::Sleeping &sleepers) {
783 while (!sleepers.empty()) {
784 ThreadSimpleImpl *thread = sleepers.front();
785 pop_heap(sleepers.begin(), sleepers.end(), CompareStartTime());
787 _ready.push_back(thread);
796 void ThreadSimpleManager::
798 Blocked::const_iterator bi;
799 for (bi = _blocked.begin(); bi != _blocked.end(); ++bi) {
800 BlockerSimple *blocker = (*bi).first;
801 const FifoThreads &threads = (*bi).second;
803 <<
"On blocker " << blocker <<
":\n";
804 FifoThreads::const_iterator ti;
805 for (ti = threads.begin(); ti != threads.end(); ++ti) {
806 ThreadSimpleImpl *thread = (*ti);
808 <<
" " << *thread->_parent_obj;
810 thread_cat.info(
false) <<
" (";
811 thread->_parent_obj->output_blocker(thread_cat.info(
false));
812 thread_cat.info(
false) <<
")";
813 #endif // DEBUG_THREADS
814 thread_cat.info(
false) <<
"\n";
827 double ThreadSimpleManager::
828 determine_timeslice(ThreadSimpleImpl *chosen_thread) {
829 if (_ready.empty() && _next_ready.empty()) {
831 return _simple_thread_epoch_timeslice;
835 unsigned int total_ticks = chosen_thread->_run_ticks;
836 double total_weight = chosen_thread->_priority_weight;
838 FifoThreads::const_iterator ti;
839 for (ti = _ready.begin(); ti != _ready.end(); ++ti) {
840 total_ticks += (*ti)->_run_ticks;
841 total_weight += (*ti)->_priority_weight;
843 for (ti = _next_ready.begin(); ti != _next_ready.end(); ++ti) {
844 total_ticks += (*ti)->_run_ticks;
845 total_weight += (*ti)->_priority_weight;
848 nassertr(total_weight != 0.0, 0.0);
849 double budget_ratio = chosen_thread->_priority_weight / total_weight;
851 if (total_ticks == 0) {
853 return budget_ratio * _simple_thread_epoch_timeslice;
856 double run_ratio = (double)chosen_thread->_run_ticks / (
double)total_ticks;
857 double remaining_ratio = budget_ratio - run_ratio;
859 if (thread_cat->is_debug()) {
861 << *chosen_thread->_parent_obj <<
" accrued "
862 << chosen_thread->_run_ticks / _tick_scale <<
" s of "
863 << total_ticks / _tick_scale <<
"; budget is "
864 << budget_ratio * total_ticks / _tick_scale <<
".\n";
865 if (remaining_ratio <= 0.0) {
867 <<
"Exceeded budget.\n";
871 return remaining_ratio * _simple_thread_epoch_timeslice;
880 void ThreadSimpleManager::
881 kill_non_joinable(ThreadSimpleManager::FifoThreads &threads) {
882 FifoThreads new_threads;
883 FifoThreads::iterator ti;
884 for (ti = threads.begin(); ti != threads.end(); ++ti) {
885 ThreadSimpleImpl *thread = (*ti);
886 if (thread->_joinable) {
887 new_threads.push_back(thread);
889 if (thread_cat->is_debug()) {
891 <<
"Killing " << *thread->_parent_obj <<
"\n";
893 thread->_status = ThreadSimpleImpl::TS_killed;
894 enqueue_finished(thread);
898 threads.swap(new_threads);
907 void ThreadSimpleManager::
908 kill_non_joinable(ThreadSimpleManager::Sleeping &threads) {
909 Sleeping new_threads;
910 Sleeping::iterator ti;
911 for (ti = threads.begin(); ti != threads.end(); ++ti) {
912 ThreadSimpleImpl *thread = (*ti);
913 if (thread->_joinable) {
914 new_threads.push_back(thread);
916 if (thread_cat->is_debug()) {
918 <<
"Killing " << *thread->_parent_obj <<
"\n";
920 thread->_status = ThreadSimpleImpl::TS_killed;
921 enqueue_finished(thread);
924 make_heap(new_threads.begin(), new_threads.end(), CompareStartTime());
925 threads.swap(new_threads);
928 #endif // THREAD_SIMPLE_IMPL
static TrueClock * get_global_ptr()
Returns a pointer to the one TrueClock object in the world.
static void consider_yield()
Possibly suspends the current thread for the rest of the current epoch, if it has run for enough this...
static void force_yield()
Suspends the current thread for the rest of the current epoch.
static Thread * get_main_thread()
Returns a pointer to the "main" Thread object–this is the Thread that started the whole process...
virtual void activate_hook(Thread *thread)
Called when the thread is activated (resumes execution).
virtual void deactivate_hook(Thread *thread)
Called when the thread is deactivated (swapped for another running thread).