Panda3D
threadSimpleManager.cxx
1 // Filename: threadSimpleManager.cxx
2 // Created by: drose (19Jun07)
3 //
4 ////////////////////////////////////////////////////////////////////
5 //
6 // PANDA 3D SOFTWARE
7 // Copyright (c) Carnegie Mellon University. All rights reserved.
8 //
9 // All use of this software is subject to the terms of the revised BSD
10 // license. You should have received a copy of this license along
11 // with this source code in a file named "LICENSE."
12 //
13 ////////////////////////////////////////////////////////////////////
14 
15 #include "threadSimpleManager.h"
16 
17 #ifdef THREAD_SIMPLE_IMPL
18 
19 #include "threadSimpleImpl.h"
20 #include "blockerSimple.h"
21 #include "mainThread.h"
22 
23 #ifdef WIN32
24 #define WIN32_LEAN_AND_MEAN
25 #include <windows.h>
26 #endif
27 
28 bool ThreadSimpleManager::_pointers_initialized;
29 ThreadSimpleManager *ThreadSimpleManager::_global_ptr;
30 
31 ////////////////////////////////////////////////////////////////////
32 // Function: ThreadSimpleManager::Constructor
33 // Access: Private
34 // Description:
35 ////////////////////////////////////////////////////////////////////
36 ThreadSimpleManager::
37 ThreadSimpleManager() :
38  _simple_thread_epoch_timeslice
39  ("simple-thread-epoch-timeslice", 0.05,
40  PRC_DESC("When SIMPLE_THREADS is defined, this defines the amount of time, "
41  "in seconds, that should be considered the "
42  "typical timeslice for one epoch (to run all threads once).")),
43  _simple_thread_volunteer_delay
44  ("simple-thread-volunteer-delay", 0.0,
45  PRC_DESC("When SIMPLE_THREADS is defined, this defines the amount of time, "
46  "in seconds, for which a task that voluntarily yields should "
47  "be delayed.")),
48  _simple_thread_yield_sleep
49  ("simple-thread-yield-sleep", 0.001,
50  PRC_DESC("When SIMPLE_THREADS is defined, this defines the amount of time, "
51  "in seconds, for which the process should be put to sleep when "
52  "yielding the timeslice to the system.")),
53  _simple_thread_window
54  ("simple-thread-window", 1.0,
55  PRC_DESC("When SIMPLE_THREADS is defined, this defines the amount of time, "
56  "in seconds, over which to average all the threads' runtimes, "
57  "for the purpose of scheduling threads.")),
58  _simple_thread_low_weight
59  ("simple-thread-low-weight", 0.2,
60  PRC_DESC("When SIMPLE_THREADS is defined, this determines the relative "
61  "amount of time that is given to threads with priority TP_low.")),
62  _simple_thread_normal_weight
63  ("simple-thread-normal-weight", 1.0,
64  PRC_DESC("When SIMPLE_THREADS is defined, this determines the relative "
65  "amount of time that is given to threads with priority TP_normal.")),
66  _simple_thread_high_weight
67  ("simple-thread-high-weight", 5.0,
68  PRC_DESC("When SIMPLE_THREADS is defined, this determines the relative "
69  "amount of time that is given to threads with priority TP_high.")),
70  _simple_thread_urgent_weight
71  ("simple-thread-urgent-weight", 10.0,
72  PRC_DESC("When SIMPLE_THREADS is defined, this determines the relative "
73  "amount of time that is given to threads with priority TP_urgent."))
74 {
75  _tick_scale = 1000000.0;
76  _total_ticks = 0;
77  _current_thread = NULL;
78  _clock = TrueClock::get_global_ptr();
79  _waiting_for_exit = NULL;
80 
81  // Install these global pointers so very low-level code (code
82  // defined before the pipeline directory) can yield when necessary.
83  global_thread_yield = &Thread::force_yield;
84  global_thread_consider_yield = &Thread::consider_yield;
85 }
86 
87 ////////////////////////////////////////////////////////////////////
88 // Function: ThreadSimpleManager::enqueue_ready
89 // Access: Public
90 // Description: Adds the indicated thread to the ready queue. The
91 // thread will be executed when its turn comes. If the
92 // thread is not the currently executing thread, its
93 // _jmp_context should be filled appropriately.
94 //
95 // If volunteer is true, the thread is volunteering to
96 // sleep before its timeslice has been used up. If
97 // volunteer is false, the thread would still be running
98 // if it could.
99 ////////////////////////////////////////////////////////////////////
100 void ThreadSimpleManager::
101 enqueue_ready(ThreadSimpleImpl *thread, bool volunteer) {
102  // We actually add it to _next_ready, so that we can tell when we
103  // have processed every thread in a given epoch.
104  if (!volunteer) {
105  _next_ready.push_back(thread);
106 
107  } else {
108  // Unless it's a volunteer, in which case we actually put it to
109  // sleep for the duration of the timeslice, so it won't interfere
110  // with timeslice accounting for the remaining ready threads.
111  double now = get_current_time();
112  thread->_wake_time = now + _simple_thread_volunteer_delay;
113  _volunteers.push_back(thread);
114  push_heap(_volunteers.begin(), _volunteers.end(), CompareStartTime());
115  }
116 }
117 
118 ////////////////////////////////////////////////////////////////////
119 // Function: ThreadSimpleManager::enqueue_sleep
120 // Access: Public
121 // Description: Adds the indicated thread to the sleep queue, until
122 // the indicated number of seconds have elapsed. Then
123 // the thread will be automatically moved to the ready
124 // queue.
125 ////////////////////////////////////////////////////////////////////
126 void ThreadSimpleManager::
127 enqueue_sleep(ThreadSimpleImpl *thread, double seconds) {
128  if (thread_cat->is_debug()) {
129  thread_cat.debug()
130  << *_current_thread->_parent_obj << " sleeping for "
131  << seconds << " seconds\n";
132  }
133 
134  double now = get_current_time();
135  thread->_wake_time = now + seconds;
136  _sleeping.push_back(thread);
137  push_heap(_sleeping.begin(), _sleeping.end(), CompareStartTime());
138 }
139 
140 ////////////////////////////////////////////////////////////////////
141 // Function: ThreadSimpleManager::enqueue_block
142 // Access: Public
143 // Description: Adds the indicated thread to the blocked queue for
144 // the indicated blocker. The thread will be awoken by
145 // a later call to unblock_one() or unblock_all().
146 ////////////////////////////////////////////////////////////////////
147 void ThreadSimpleManager::
148 enqueue_block(ThreadSimpleImpl *thread, BlockerSimple *blocker) {
149  _blocked[blocker].push_back(thread);
150  blocker->_flags |= BlockerSimple::F_has_waiters;
151 }
152 
153 ////////////////////////////////////////////////////////////////////
154 // Function: ThreadSimpleManager::unblock_one
155 // Access: Public
156 // Description: Unblocks one thread waiting on the indicated blocker,
157 // if any. Returns true if anything was unblocked,
158 // false otherwise.
159 ////////////////////////////////////////////////////////////////////
160 bool ThreadSimpleManager::
161 unblock_one(BlockerSimple *blocker) {
162  Blocked::iterator bi = _blocked.find(blocker);
163  if (bi != _blocked.end()) {
164  nassertr(blocker->_flags & BlockerSimple::F_has_waiters, false);
165 
166  FifoThreads &threads = (*bi).second;
167  nassertr(!threads.empty(), false);
168  ThreadSimpleImpl *thread = threads.front();
169  threads.pop_front();
170  _ready.push_back(thread);
171  if (threads.empty()) {
172  blocker->_flags &= ~BlockerSimple::F_has_waiters;
173  _blocked.erase(bi);
174  }
175 
176  return true;
177  }
178 
179  return false;
180 }
181 
182 ////////////////////////////////////////////////////////////////////
183 // Function: ThreadSimpleManager::unblock_all
184 // Access: Public
185 // Description: Unblocks all threads waiting on the indicated
186 // blocker. Returns true if anything was unblocked,
187 // false otherwise.
188 ////////////////////////////////////////////////////////////////////
189 bool ThreadSimpleManager::
190 unblock_all(BlockerSimple *blocker) {
191  Blocked::iterator bi = _blocked.find(blocker);
192  if (bi != _blocked.end()) {
193  nassertr(blocker->_flags & BlockerSimple::F_has_waiters, false);
194 
195  FifoThreads &threads = (*bi).second;
196  nassertr(!threads.empty(), false);
197  while (!threads.empty()) {
198  ThreadSimpleImpl *thread = threads.front();
199  threads.pop_front();
200  _ready.push_back(thread);
201  }
202  blocker->_flags &= ~BlockerSimple::F_has_waiters;
203  _blocked.erase(bi);
204  return true;
205  }
206  return false;
207 }
208 
209 ////////////////////////////////////////////////////////////////////
210 // Function: ThreadSimpleManager::enqueue_finished
211 // Access: Public
212 // Description: Adds the indicated thread to the finished queue.
213 // The manager will drop the reference count on the
214 // indicated thread at the next epoch. (A thread can't
215 // drop its own reference count while it is running,
216 // since that might deallocate its own stack.)
217 ////////////////////////////////////////////////////////////////////
218 void ThreadSimpleManager::
219 enqueue_finished(ThreadSimpleImpl *thread) {
220  _finished.push_back(thread);
221 }
222 
223 ////////////////////////////////////////////////////////////////////
224 // Function: ThreadSimpleManager::preempt
225 // Access: Public
226 // Description: Moves the indicated thread to the head of the ready
227 // queue. If it is not already on the ready queue, does
228 // nothing.
229 ////////////////////////////////////////////////////////////////////
230 void ThreadSimpleManager::
231 preempt(ThreadSimpleImpl *thread) {
232  FifoThreads::iterator ti;
233  ti = find(_ready.begin(), _ready.end(), thread);
234  if (ti != _ready.end()) {
235  _ready.erase(ti);
236  _ready.push_front(thread);
237  }
238 }
239 
240 ////////////////////////////////////////////////////////////////////
241 // Function: ThreadSimpleManager::next_context
242 // Access: Public
243 // Description: Switches out the currently executing thread and
244 // chooses a new thread for execution. Before calling
245 // this, the current thread should have already
246 // re-enqueued itself with a call to enqueue(), if it
247 // intends to run again.
248 //
249 // This will fill in the current thread's _jmp_context
250 // member appropriately, and then change the global
251 // current_thread pointer.
252 ////////////////////////////////////////////////////////////////////
253 void ThreadSimpleManager::
254 next_context() {
255  // Delete any threads that need it. We can't delete the current
256  // thread, though.
257  while (!_finished.empty() && _finished.front() != _current_thread) {
258  ThreadSimpleImpl *finished_thread = _finished.front();
259  _finished.pop_front();
260  unref_delete(finished_thread->_parent_obj);
261  }
262 
263  // Mark the current thread's resume point.
264 
265 #ifdef HAVE_PYTHON
266  // Save the current Python thread state.
267  _current_thread->_python_state = PyThreadState_Swap(NULL);
268 #endif // HAVE_PYTHON
269 
270 #ifdef DO_PSTATS
271  Thread::PStatsCallback *pstats_callback = _current_thread->_parent_obj->get_pstats_callback();
272  if (pstats_callback != NULL) {
273  pstats_callback->deactivate_hook(_current_thread->_parent_obj);
274  }
275 #endif // DO_PSTATS
276 
277  save_thread_context(_current_thread->_context, st_choose_next_context, this);
278  // Pass 2: we have returned into the context, and are now resuming
279  // the current thread.
280 
281 #ifdef DO_PSTATS
282  if (pstats_callback != NULL) {
283  pstats_callback->activate_hook(_current_thread->_parent_obj);
284  }
285 #endif // DO_PSTATS
286 
287 #ifdef HAVE_PYTHON
288  PyThreadState_Swap(_current_thread->_python_state);
289 #endif // HAVE_PYTHON
290 }
291 
292 ////////////////////////////////////////////////////////////////////
293 // Function: ThreadSimpleManager::prepare_for_exit
294 // Access: Public
295 // Description: Blocks until all running threads (other than the
296 // current thread) have finished. This only works when
297 // called from the main thread; if called on any other
298 // thread, nothing will happen.
299 ////////////////////////////////////////////////////////////////////
300 void ThreadSimpleManager::
301 prepare_for_exit() {
302  if (!_current_thread->_parent_obj->is_exact_type(MainThread::get_class_type())) {
303  if (thread_cat->is_debug()) {
304  thread_cat.debug()
305  << "Ignoring prepare_for_exit called from "
306  << *(_current_thread->_parent_obj) << "\n";
307  }
308  return;
309  }
310 
311  if (thread_cat->is_debug()) {
312  thread_cat.debug()
313  << "prepare_for_exit\n";
314  }
315 
316  nassertv(_waiting_for_exit == NULL);
317  _waiting_for_exit = _current_thread;
318 
319  // At this point, any non-joinable threads on any of the queues are
320  // automatically killed.
321  kill_non_joinable(_ready);
322 
323  Blocked::iterator bi = _blocked.begin();
324  while (bi != _blocked.end()) {
325  Blocked::iterator bnext = bi;
326  ++bnext;
327  BlockerSimple *blocker = (*bi).first;
328  FifoThreads &threads = (*bi).second;
329  kill_non_joinable(threads);
330  if (threads.empty()) {
331  blocker->_flags &= ~BlockerSimple::F_has_waiters;
332  _blocked.erase(bi);
333  }
334  bi = bnext;
335  }
336 
337  kill_non_joinable(_sleeping);
338  kill_non_joinable(_volunteers);
339 
340  next_context();
341 
342  // Delete any remaining threads.
343  while (!_finished.empty() && _finished.front() != _current_thread) {
344  ThreadSimpleImpl *finished_thread = _finished.front();
345  _finished.pop_front();
346  unref_delete(finished_thread->_parent_obj);
347  }
348 }
349 
350 ////////////////////////////////////////////////////////////////////
351 // Function: ThreadSimpleManager::set_current_thread
352 // Access: Public
353 // Description: Sets the initial value of the current_thread pointer,
354 // i.e. the main thread. It is valid to call this
355 // method only exactly once.
356 ////////////////////////////////////////////////////////////////////
357 void ThreadSimpleManager::
358 set_current_thread(ThreadSimpleImpl *current_thread) {
359  nassertv(_current_thread == (ThreadSimpleImpl *)NULL);
360  _current_thread = current_thread;
361 }
362 
363 ////////////////////////////////////////////////////////////////////
364 // Function: ThreadSimpleManager::remove_thread
365 // Access: Public
366 // Description: Removes the indicated thread from the accounting, for
367 // instance just before the thread destructs.
368 ////////////////////////////////////////////////////////////////////
369 void ThreadSimpleManager::
370 remove_thread(ThreadSimpleImpl *thread) {
371  TickRecords new_records;
372  TickRecords::iterator ri;
373  for (ri = _tick_records.begin(); ri != _tick_records.end(); ++ri) {
374  if ((*ri)._thread != thread) {
375  // Keep this record.
376  new_records.push_back(*ri);
377  } else {
378  // Lose this record.
379  nassertv(_total_ticks >= (*ri)._tick_count);
380  _total_ticks -= (*ri)._tick_count;
381  }
382  }
383 
384  _tick_records.swap(new_records);
385 }
386 
387 ////////////////////////////////////////////////////////////////////
388 // Function: ThreadSimpleManager::system_sleep
389 // Access: Public, Static
390 // Description: Calls the appropriate system sleep function to sleep
391 // the whole process for the indicated number of
392 // seconds.
393 ////////////////////////////////////////////////////////////////////
394 void ThreadSimpleManager::
395 system_sleep(double seconds) {
396 #ifdef WIN32
397  Sleep((int)(seconds * 1000 + 0.5));
398 
399 #else
400  /*
401  struct timespec rqtp;
402  rqtp.tv_sec = time_t(seconds);
403  rqtp.tv_nsec = long((seconds - (double)rqtp.tv_sec) * 1000000000.0 + 0.5);
404  nanosleep(&rqtp, NULL);
405  */
406 
407  // We use select() as the only way that seems to actually yield the
408  // timeslice. sleep() and nanosleep() don't appear to do the trick.
409  struct timeval tv;
410  tv.tv_sec = time_t(seconds);
411  tv.tv_usec = long((seconds - (double)tv.tv_sec) * 1000000.0 + 0.5);
412  select(0, NULL, NULL, NULL, &tv);
413 #endif // WIN32
414 }
415 
416 ////////////////////////////////////////////////////////////////////
417 // Function: ThreadSimpleManager::write_status
418 // Access: Public
419 // Description: Writes a list of threads running and threads blocked.
420 ////////////////////////////////////////////////////////////////////
421 void ThreadSimpleManager::
422 write_status(ostream &out) const {
423  out << "Currently running: " << *_current_thread->_parent_obj << "\n";
424 
425  out << "Ready:";
426  FifoThreads::const_iterator ti;
427  Sleeping::const_iterator si;
428  for (ti = _ready.begin(); ti != _ready.end(); ++ti) {
429  out << " " << *(*ti)->_parent_obj;
430  }
431  for (ti = _next_ready.begin(); ti != _next_ready.end(); ++ti) {
432  out << " " << *(*ti)->_parent_obj;
433  }
434  for (si = _volunteers.begin(); si != _volunteers.end(); ++si) {
435  out << " " << *(*si)->_parent_obj;
436  }
437  out << "\n";
438 
439  double now = get_current_time();
440 
441  out << "Sleeping:";
442  // Copy and sort for convenience.
443  Sleeping s2 = _sleeping;
444  sort(s2.begin(), s2.end(), CompareStartTime());
445  for (si = s2.begin(); si != s2.end(); ++si) {
446  out << " " << *(*si)->_parent_obj << "(" << (*si)->_wake_time - now
447  << "s)";
448  }
449  out << "\n";
450 
451  Blocked::const_iterator bi;
452  for (bi = _blocked.begin(); bi != _blocked.end(); ++bi) {
453  BlockerSimple *blocker = (*bi).first;
454  const FifoThreads &threads = (*bi).second;
455  out << "On blocker " << blocker << ":\n";
456  FifoThreads::const_iterator ti;
457  for (ti = threads.begin(); ti != threads.end(); ++ti) {
458  ThreadSimpleImpl *thread = (*ti);
459  out << " " << *thread->_parent_obj;
460 #ifdef DEBUG_THREADS
461  out << " (";
462  thread->_parent_obj->output_blocker(out);
463  out << ")";
464 #endif // DEBUG_THREADS
465  }
466  out << "\n";
467  }
468 }
469 
470 ////////////////////////////////////////////////////////////////////
471 // Function: ThreadSimpleManager::system_yield
472 // Access: Public, Static
473 // Description: Calls the appropriate system function to yield
474 // the whole process to any other system processes.
475 ////////////////////////////////////////////////////////////////////
476 void ThreadSimpleManager::
477 system_yield() {
478  if (!_pointers_initialized) {
479  // Ignore this call before we construct the global ThreadSimpleManager.
480  return;
481  }
482 
483  if (thread_cat->is_debug()) {
484  thread_cat.debug()
485  << "system_yield\n";
486  }
487 
488  // There seem to be some issues with modern operating systems not
489  // wanting to actually yield the timeslice in response to sleep(0).
490  // In particular, Windows and OSX both seemed to do nothing in that
491  // call. Whatever. We'll force the point by explicitly sleeping
492  // for 1 ms in both cases. This is user-configurable in case 1 ms
493  // is too much (though on Windows that's all the resolution you
494  // have).
495  system_sleep(_global_ptr->_simple_thread_yield_sleep);
496 }
497 
498 ////////////////////////////////////////////////////////////////////
499 // Function: ThreadSimpleManager::get_current_time
500 // Access: Public
501 // Description: Returns elapsed time in seconds from some undefined
502 // epoch, via whatever clock the manager is using for
503 // all thread timing.
504 ////////////////////////////////////////////////////////////////////
505 double ThreadSimpleManager::
506 get_current_time() const {
507  return _clock->get_short_raw_time();
508 }
509 
510 ////////////////////////////////////////////////////////////////////
511 // Function: ThreadSimpleManager::init_pointers
512 // Access: Private, Static
513 // Description: Should be called at startup to initialize the
514 // simple threading system.
515 ////////////////////////////////////////////////////////////////////
516 void ThreadSimpleManager::
517 init_pointers() {
518  if (!_pointers_initialized) {
519  _pointers_initialized = true;
520  _global_ptr = new ThreadSimpleManager;
522 
523 #ifdef HAVE_PYTHON
524  // Ensure that the Python threading system is initialized and ready
525  // to go.
526 
527 #if PY_VERSION_HEX >= 0x03020000
528  Py_Initialize();
529 #endif
530 
531  PyEval_InitThreads();
532 #endif
533  }
534 }
535 
536 ////////////////////////////////////////////////////////////////////
537 // Function: ThreadSimpleManager::st_choose_next_context
538 // Access: Private, Static
539 // Description: Select the next context to run. Continuing the work
540 // of next_context().
541 ////////////////////////////////////////////////////////////////////
542 void ThreadSimpleManager::
543 st_choose_next_context(struct ThreadContext *from_context, void *data) {
544  ThreadSimpleManager *self = (ThreadSimpleManager *)data;
545  self->choose_next_context(from_context);
546 }
547 
548 ////////////////////////////////////////////////////////////////////
549 // Function: ThreadSimpleManager::choose_next_context
550 // Access: Private
551 // Description: Select the next context to run. Continuing the work
552 // of next_context().
553 ////////////////////////////////////////////////////////////////////
554 void ThreadSimpleManager::
555 choose_next_context(struct ThreadContext *from_context) {
556  double now = get_current_time();
557 
558  do_timeslice_accounting(_current_thread, now);
559  _current_thread = NULL;
560 
561  if (!_sleeping.empty() || !_volunteers.empty()) {
562  if (_ready.empty() && _next_ready.empty()) {
563  // All of our threads are currently sleeping. Therefore, wake
564  // the volunteer(s) immediately.
565  wake_all_sleepers(_volunteers);
566 
567  // We should also yield the whole process now, to be polite to
568  // the rest of the system.
569  system_yield();
570  now = get_current_time();
571  }
572  wake_sleepers(_sleeping, now);
573  wake_sleepers(_volunteers, now);
574  }
575 
576  bool new_epoch = !_ready.empty() && _next_ready.empty();
577 
578  // Choose a new thread to execute.
579  while (true) {
580  // If there are no threads, sleep.
581  while (_ready.empty()) {
582  if (!_next_ready.empty()) {
583  // We've finished an epoch.
584  _ready.swap(_next_ready);
585 
586  if (new_epoch && !_tick_records.empty()) {
587  // Pop the oldest timeslice record off when we finish an
588  // epoch without executing any threads, to ensure we don't
589  // get caught in an "all threads reached budget" loop.
590  if (thread_cat->is_debug()) {
591  thread_cat.debug()
592  << "All threads exceeded budget.\n";
593  }
594  TickRecord &record = _tick_records.front();
595  _total_ticks -= record._tick_count;
596 
597  if (record._thread->_run_ticks >= record._tick_count) {
598  // Ensure we don't go negative.
599  record._thread->_run_ticks -= record._tick_count;
600  } else {
601  // It is possible for this to happen if the application has been
602  // paused for more than 2^31 ticks.
603  record._thread->_run_ticks = 0;
604  }
605  _tick_records.pop_front();
606  }
607  new_epoch = true;
608 
609  } else if (!_volunteers.empty()) {
610  // There are some volunteers. Wake them. Also wake any
611  // sleepers that need it.
612  if (thread_cat->is_debug()) {
613  thread_cat.debug()
614  << "Waking volunteers.\n";
615  }
616  // We should yield the whole process now, to be polite to the
617  // rest of the system.
618  system_yield();
619  now = get_current_time();
620  wake_all_sleepers(_volunteers);
621  wake_sleepers(_sleeping, now);
622 
623  } else if (!_sleeping.empty()) {
624  // All threads are sleeping.
625  double wait = _sleeping.front()->_wake_time - now;
626  if (wait > 0.0) {
627  if (thread_cat->is_debug()) {
628  thread_cat.debug()
629  << "Sleeping all threads " << wait << " seconds\n";
630  }
631  system_sleep(wait);
632  }
633  now = get_current_time();
634  wake_sleepers(_sleeping, now);
635  wake_sleepers(_volunteers, now);
636 
637  } else {
638  // No threads are ready!
639  if (_waiting_for_exit != NULL) {
640  // This is a shutdown situation. In this case, we quietly
641  // abandoned the remaining blocked threads, if any, and
642  // switch back to the main thread to finish shutting down.
643  _ready.push_back(_waiting_for_exit);
644  _waiting_for_exit = NULL;
645  break;
646  }
647 
648  // No threads are ready to run, but we're not explicitly
649  // shutting down. This is an error condition, an
650  // unintentional deadlock.
651  if (!_blocked.empty()) {
652  thread_cat->error()
653  << "Deadlock! All threads blocked.\n";
654  report_deadlock();
655  abort();
656  }
657 
658  // No threads are queued anywhere. This is some kind of
659  // internal error, since normally the main thread, at least,
660  // should be queued somewhere.
661  thread_cat->error()
662  << "All threads disappeared!\n";
663  exit(0);
664  }
665  }
666 
667  ThreadSimpleImpl *chosen_thread = _ready.front();
668  _ready.pop_front();
669 
670  double timeslice = determine_timeslice(chosen_thread);
671  if (timeslice > 0.0) {
672  // This thread is ready to roll. Break out of the loop.
673  chosen_thread->_start_time = now;
674  chosen_thread->_stop_time = now + timeslice;
675  _current_thread = chosen_thread;
676  break;
677  }
678 
679  // This thread is not ready to wake up yet. Put it back for next
680  // epoch. It doesn't count as a volunteer, though--its timeslice
681  // was used up.
682  _next_ready.push_back(chosen_thread);
683  }
684 
685  // All right, the thread is ready to roll. Begin.
686  if (thread_cat->is_debug()) {
687  size_t blocked_count = 0;
688  Blocked::const_iterator bi;
689  for (bi = _blocked.begin(); bi != _blocked.end(); ++bi) {
690  const FifoThreads &threads = (*bi).second;
691  blocked_count += threads.size();
692  }
693 
694  double timeslice = _current_thread->_stop_time - _current_thread->_start_time;
695  thread_cat.debug()
696  << "Switching to " << *_current_thread->_parent_obj
697  << " for " << timeslice << " s ("
698  << _ready.size() << " + " << _next_ready.size()
699  << " + " << _volunteers.size()
700  << " other threads ready, " << blocked_count
701  << " blocked, " << _sleeping.size() << " sleeping)\n";
702  }
703 
704  switch_to_thread_context(from_context, _current_thread->_context);
705 
706  // Shouldn't get here.
707  nassertv(false);
708  abort();
709 }
710 
711 ////////////////////////////////////////////////////////////////////
712 // Function: ThreadSimpleManager::do_timeslice_accounting
713 // Access: Private
714 // Description: Records the amount of time the indicated thread has
715 // run, and updates the moving average.
716 ////////////////////////////////////////////////////////////////////
717 void ThreadSimpleManager::
718 do_timeslice_accounting(ThreadSimpleImpl *thread, double now) {
719  double elapsed = now - thread->_start_time;
720  if (thread_cat.is_debug()) {
721  thread_cat.debug()
722  << *thread->_parent_obj << " ran for " << elapsed << " s of "
723  << thread->_stop_time - thread->_start_time << " requested.\n";
724  }
725 
726  // Clamp the elapsed time at 0. (If it's less than 0, the clock is
727  // running backwards, ick.)
728  elapsed = max(elapsed, 0.0);
729 
730  unsigned int ticks = (unsigned int)(elapsed * _tick_scale + 0.5);
731  thread->_run_ticks += ticks;
732 
733  // Now remove any old records.
734  unsigned int ticks_window = (unsigned int)(_simple_thread_window * _tick_scale + 0.5);
735  while (_total_ticks > ticks_window) {
736  nassertv(!_tick_records.empty());
737  TickRecord &record = _tick_records.front();
738  _total_ticks -= record._tick_count;
739  if (record._thread->_run_ticks >= record._tick_count) {
740  // Ensure we don't go negative.
741  record._thread->_run_ticks -= record._tick_count;
742  } else {
743  // It is possible for this to happen if the application has been
744  // paused for more than 2^31 ticks.
745  record._thread->_run_ticks = 0;
746  }
747  _tick_records.pop_front();
748  }
749 
750  // Finally, record the new record.
751  TickRecord record;
752  record._tick_count = ticks;
753  record._thread = thread;
754  _tick_records.push_back(record);
755  _total_ticks += ticks;
756 }
757 
758 
759 ////////////////////////////////////////////////////////////////////
760 // Function: ThreadSimpleManager::wake_sleepers
761 // Access: Private
762 // Description: Moves any threads due to wake up from the sleeping
763 // queue to the ready queue.
764 ////////////////////////////////////////////////////////////////////
765 void ThreadSimpleManager::
766 wake_sleepers(ThreadSimpleManager::Sleeping &sleepers, double now) {
767  while (!sleepers.empty() && sleepers.front()->_wake_time <= now) {
768  ThreadSimpleImpl *thread = sleepers.front();
769  pop_heap(sleepers.begin(), sleepers.end(), CompareStartTime());
770  sleepers.pop_back();
771  _ready.push_back(thread);
772  }
773 }
774 
775 ////////////////////////////////////////////////////////////////////
776 // Function: ThreadSimpleManager::wake_all_sleepers
777 // Access: Private
778 // Description: Moves all threads from the indicated sleeping queue
779 // to the ready queue, regardless of wake time.
780 ////////////////////////////////////////////////////////////////////
781 void ThreadSimpleManager::
782 wake_all_sleepers(ThreadSimpleManager::Sleeping &sleepers) {
783  while (!sleepers.empty()) {
784  ThreadSimpleImpl *thread = sleepers.front();
785  pop_heap(sleepers.begin(), sleepers.end(), CompareStartTime());
786  sleepers.pop_back();
787  _ready.push_back(thread);
788  }
789 }
790 
791 ////////////////////////////////////////////////////////////////////
792 // Function: ThreadSimpleManager::report_deadlock
793 // Access: Private
794 // Description:
795 ////////////////////////////////////////////////////////////////////
796 void ThreadSimpleManager::
797 report_deadlock() {
798  Blocked::const_iterator bi;
799  for (bi = _blocked.begin(); bi != _blocked.end(); ++bi) {
800  BlockerSimple *blocker = (*bi).first;
801  const FifoThreads &threads = (*bi).second;
802  thread_cat.info()
803  << "On blocker " << blocker << ":\n";
804  FifoThreads::const_iterator ti;
805  for (ti = threads.begin(); ti != threads.end(); ++ti) {
806  ThreadSimpleImpl *thread = (*ti);
807  thread_cat.info()
808  << " " << *thread->_parent_obj;
809 #ifdef DEBUG_THREADS
810  thread_cat.info(false) << " (";
811  thread->_parent_obj->output_blocker(thread_cat.info(false));
812  thread_cat.info(false) << ")";
813 #endif // DEBUG_THREADS
814  thread_cat.info(false) << "\n";
815  }
816  }
817 }
818 
819 ////////////////////////////////////////////////////////////////////
820 // Function: ThreadSimpleManager::determine_timeslice
821 // Access: Private
822 // Description: Determines the amount of time that should be
823 // allocated to the next timeslice of this thread, based
824 // on its priority weight and the amount of time it has
825 // run recently relative to other threads.
826 ////////////////////////////////////////////////////////////////////
827 double ThreadSimpleManager::
828 determine_timeslice(ThreadSimpleImpl *chosen_thread) {
829  if (_ready.empty() && _next_ready.empty()) {
830  // This is the only ready thread. It gets the full timeslice.
831  return _simple_thread_epoch_timeslice;
832  }
833 
834  // Count up the total runtime and weight of all ready threads.
835  unsigned int total_ticks = chosen_thread->_run_ticks;
836  double total_weight = chosen_thread->_priority_weight;
837 
838  FifoThreads::const_iterator ti;
839  for (ti = _ready.begin(); ti != _ready.end(); ++ti) {
840  total_ticks += (*ti)->_run_ticks;
841  total_weight += (*ti)->_priority_weight;
842  }
843  for (ti = _next_ready.begin(); ti != _next_ready.end(); ++ti) {
844  total_ticks += (*ti)->_run_ticks;
845  total_weight += (*ti)->_priority_weight;
846  }
847 
848  nassertr(total_weight != 0.0, 0.0);
849  double budget_ratio = chosen_thread->_priority_weight / total_weight;
850 
851  if (total_ticks == 0) {
852  // This must be the first thread. Special case.
853  return budget_ratio * _simple_thread_epoch_timeslice;
854  }
855 
856  double run_ratio = (double)chosen_thread->_run_ticks / (double)total_ticks;
857  double remaining_ratio = budget_ratio - run_ratio;
858 
859  if (thread_cat->is_debug()) {
860  thread_cat.debug()
861  << *chosen_thread->_parent_obj << " accrued "
862  << chosen_thread->_run_ticks / _tick_scale << " s of "
863  << total_ticks / _tick_scale << "; budget is "
864  << budget_ratio * total_ticks / _tick_scale << ".\n";
865  if (remaining_ratio <= 0.0) {
866  thread_cat.debug()
867  << "Exceeded budget.\n";
868  }
869  }
870 
871  return remaining_ratio * _simple_thread_epoch_timeslice;
872 }
873 
874 ////////////////////////////////////////////////////////////////////
875 // Function: ThreadSimpleManager::kill_non_joinable
876 // Access: Private
877 // Description: Removes any non-joinable threads from the indicated
878 // queue and marks them killed.
879 ////////////////////////////////////////////////////////////////////
880 void ThreadSimpleManager::
881 kill_non_joinable(ThreadSimpleManager::FifoThreads &threads) {
882  FifoThreads new_threads;
883  FifoThreads::iterator ti;
884  for (ti = threads.begin(); ti != threads.end(); ++ti) {
885  ThreadSimpleImpl *thread = (*ti);
886  if (thread->_joinable) {
887  new_threads.push_back(thread);
888  } else {
889  if (thread_cat->is_debug()) {
890  thread_cat.debug()
891  << "Killing " << *thread->_parent_obj << "\n";
892  }
893  thread->_status = ThreadSimpleImpl::TS_killed;
894  enqueue_finished(thread);
895  }
896  }
897 
898  threads.swap(new_threads);
899 }
900 
901 ////////////////////////////////////////////////////////////////////
902 // Function: ThreadSimpleManager::kill_non_joinable
903 // Access: Private
904 // Description: Removes any non-joinable threads from the indicated
905 // queue and marks them killed.
906 ////////////////////////////////////////////////////////////////////
907 void ThreadSimpleManager::
908 kill_non_joinable(ThreadSimpleManager::Sleeping &threads) {
909  Sleeping new_threads;
910  Sleeping::iterator ti;
911  for (ti = threads.begin(); ti != threads.end(); ++ti) {
912  ThreadSimpleImpl *thread = (*ti);
913  if (thread->_joinable) {
914  new_threads.push_back(thread);
915  } else {
916  if (thread_cat->is_debug()) {
917  thread_cat.debug()
918  << "Killing " << *thread->_parent_obj << "\n";
919  }
920  thread->_status = ThreadSimpleImpl::TS_killed;
921  enqueue_finished(thread);
922  }
923  }
924  make_heap(new_threads.begin(), new_threads.end(), CompareStartTime());
925  threads.swap(new_threads);
926 }
927 
928 #endif // THREAD_SIMPLE_IMPL
static TrueClock * get_global_ptr()
Returns a pointer to the one TrueClock object in the world.
Definition: trueClock.I:81
static void consider_yield()
Possibly suspends the current thread for the rest of the current epoch, if it has run for enough this...
Definition: thread.I:263
static void force_yield()
Suspends the current thread for the rest of the current epoch.
Definition: thread.I:248
static Thread * get_main_thread()
Returns a pointer to the "main" Thread object–this is the Thread that started the whole process...
Definition: thread.I:107
virtual void activate_hook(Thread *thread)
Called when the thread is activated (resumes execution).
Definition: thread.cxx:575
virtual void deactivate_hook(Thread *thread)
Called when the thread is deactivated (swapped for another running thread).
Definition: thread.cxx:563