asyncTaskChain.cxx
1 /**
2  * PANDA 3D SOFTWARE
3  * Copyright (c) Carnegie Mellon University. All rights reserved.
4  *
5  * All use of this software is subject to the terms of the revised BSD
6  * license. You should have received a copy of this license along
7  * with this source code in a file named "LICENSE."
8  *
9  * @file asyncTaskChain.cxx
10  * @author drose
11  * @date 2006-08-23
12  */
13 
14 #include "asyncTaskChain.h"
15 #include "asyncTaskManager.h"
16 #include "event.h"
17 #include "mutexHolder.h"
18 #include "indent.h"
19 #include "pStatClient.h"
20 #include "pStatTimer.h"
21 #include "clockObject.h"
22 #include "config_event.h"
23 #include <algorithm>
24 #include <stdio.h> // For sprintf/snprintf
25 
26 using std::max;
27 using std::ostream;
28 using std::ostringstream;
29 using std::string;
30 
31 TypeHandle AsyncTaskChain::_type_handle;
32 
33 PStatCollector AsyncTaskChain::_task_pcollector("Task");
34 PStatCollector AsyncTaskChain::_wait_pcollector("Wait");
35 
36 /**
37  *
38  */
39 AsyncTaskChain::
40 AsyncTaskChain(AsyncTaskManager *manager, const string &name) :
41  Namable(name),
42  _manager(manager),
43  _cvar(manager->_lock),
44  _tick_clock(false),
45  _timeslice_priority(false),
46  _num_threads(0),
47  _thread_priority(TP_normal),
48  _frame_budget(-1.0),
49  _frame_sync(false),
50  _num_busy_threads(0),
51  _num_tasks(0),
52  _num_awaiting_tasks(0),
53  _state(S_initial),
54  _current_sort(-INT_MAX),
55  _pickup_mode(false),
56  _needs_cleanup(false),
57  _current_frame(0),
58  _time_in_frame(0.0),
59  _block_till_next_frame(false),
60  _next_implicit_sort(0)
61 {
62 }
63 
64 /**
65  *
66  */
67 AsyncTaskChain::
68 ~AsyncTaskChain() {
69  // We only grab the lock if _needs_cleanup is true. This way, the temporary
70  // AsyncTaskChain objects created (and destructed) within the task manager
71  // won't risk a double-lock.
72  if (_needs_cleanup) {
73  MutexHolder holder(_manager->_lock);
74  do_cleanup();
75  }
76 }
77 
78 /**
79  * Sets the tick_clock flag. When this is true, get_clock()->tick() will be
80  * called automatically at each task epoch. This is false by default.
81  */
82 void AsyncTaskChain::
83 set_tick_clock(bool tick_clock) {
84  MutexHolder holder(_manager->_lock);
85  _tick_clock = tick_clock;
86 }
87 
88 /**
89  * Returns the tick_clock flag. See set_tick_clock().
90  */
91 bool AsyncTaskChain::
92 get_tick_clock() const {
93  MutexHolder holder(_manager->_lock);
94  return _tick_clock;
95 }
96 
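// Illustrative example (editor's sketch, not part of the original file): a
// chain that drives the global clock. The chain name "physics" is assumed.
//
//   AsyncTaskManager *mgr = AsyncTaskManager::get_global_ptr();
//   AsyncTaskChain *chain = mgr->make_task_chain("physics");
//   chain->set_tick_clock(true);  // this chain ticks the clock once per epoch
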
97 /**
98  * Changes the number of threads for this task chain. This may require
99  * stopping the threads if they are already running.
100  */
101 void AsyncTaskChain::
102 set_num_threads(int num_threads) {
103  nassertv(num_threads >= 0);
104 
105  if (task_cat.is_debug()) {
106  do_output(task_cat.debug());
107  task_cat.debug(false)
108  << ": set_num_threads(" << num_threads << ")\n";
109  }
110 
111  if (!Thread::is_threading_supported()) {
112  num_threads = 0;
113  }
114 
115  MutexHolder holder(_manager->_lock);
116  if (_num_threads != num_threads) {
117  do_stop_threads();
118  _num_threads = num_threads;
119 
120  if (_num_tasks != 0) {
121  do_start_threads();
122  }
123  }
124 }
125 
126 /**
127  * Returns the number of threads that will be servicing tasks for this chain.
128  * Also see get_num_running_threads().
129  */
130 int AsyncTaskChain::
131 get_num_threads() const {
132  MutexHolder holder(_manager->_lock);
133  return _num_threads;
134 }
135 
136 /**
137  * Returns the number of threads that have been created and are actively
138  * running. This will return 0 before the threads have been started; it will
139  * also return 0 if thread support is not available.
140  */
141 int AsyncTaskChain::
142 get_num_running_threads() const {
143  MutexHolder holder(_manager->_lock);
144  return _threads.size();
145 }
146 
147 /**
148  * Changes the priority associated with threads that serve this task chain.
149  * This may require stopping the threads if they are already running.
150  */
151 void AsyncTaskChain::
152 set_thread_priority(ThreadPriority priority) {
153  MutexHolder holder(_manager->_lock);
154  if (_thread_priority != priority) {
155  do_stop_threads();
156  _thread_priority = priority;
157 
158  if (_num_tasks != 0) {
159  do_start_threads();
160  }
161  }
162 }
163 
164 /**
165  * Returns the priority associated with threads that serve this task chain.
166  */
167 ThreadPriority AsyncTaskChain::
168 get_thread_priority() const {
169  MutexHolder holder(_manager->_lock);
170  return _thread_priority;
171 }
172 
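// Illustrative example (editor's sketch, not part of the original file):
// running a chain on two low-priority worker threads. Changing either setting
// stops and restarts any running threads, as noted above. "loader" is an
// assumed chain name.
//
//   AsyncTaskChain *chain =
//     AsyncTaskManager::get_global_ptr()->make_task_chain("loader");
//   chain->set_num_threads(2);
//   chain->set_thread_priority(TP_low);
//   chain->start_threads();  // optional; adding a task also starts the threads
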
173 /**
174  * Sets the maximum amount of time per frame the tasks on this chain are
175  * granted for execution. If this is less than zero, there is no limit; if it
176  * is >= 0, it represents a maximum amount of time (in seconds) that will be
177  * used to execute tasks. If this time is exceeded in any one frame, the task
178  * chain will stop executing tasks until the next frame, as defined by the
179  * TaskManager's clock.
180  */
181 void AsyncTaskChain::
182 set_frame_budget(double frame_budget) {
183  MutexHolder holder(_manager->_lock);
184  _frame_budget = frame_budget;
185 }
186 
187 /**
188  * Returns the maximum amount of time per frame the tasks on this chain are
189  * granted for execution. See set_frame_budget().
190  */
191 double AsyncTaskChain::
192 get_frame_budget() const {
193  MutexHolder holder(_manager->_lock);
194  return _frame_budget;
195 }
196 
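// Illustrative example (editor's sketch, not part of the original file):
// capping this chain at roughly one millisecond of task time per frame.
//
//   chain->set_frame_budget(0.001);  // seconds of task execution per frame
//   chain->set_frame_budget(-1.0);   // restore the default: no per-frame limit
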
197 /**
198  * Sets the frame_sync flag. When this flag is true, this task chain will be
199  * forced to sync with the TaskManager's clock. It will run no faster than
200  * one epoch per clock frame.
201  *
202  * When this flag is false, the default, the task chain will finish all of its
203  * tasks and then immediately start from the first task again, regardless of
204  * the clock frame. When it is true, the task chain will finish all of its
205  * tasks and then wait for the clock to tick to the next frame before resuming
206  * the first task.
207  *
208  * This only makes sense for threaded task chains. Non-threaded task chains
209  * are automatically synchronous.
210  */
211 void AsyncTaskChain::
212 set_frame_sync(bool frame_sync) {
213  MutexHolder holder(_manager->_lock);
214  _frame_sync = frame_sync;
215 }
216 
217 /**
218  * Returns the frame_sync flag. See set_frame_sync().
219  */
220 bool AsyncTaskChain::
221 get_frame_sync() const {
222  MutexHolder holder(_manager->_lock);
223  return _frame_sync;
224 }
225 
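// Illustrative example (editor's sketch, not part of the original file): a
// threaded chain held to at most one epoch per clock frame.
//
//   chain->set_num_threads(1);
//   chain->set_frame_sync(true);  // has no effect on a non-threaded chain
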
226 /**
227  * Sets the timeslice_priority flag. This changes the interpretation of
228  * priority, and the number of times per epoch each task will run.
229  *
230  * When this flag is true, some tasks might not run in any given epoch.
231  * Instead, tasks with priority higher than 1 will be given precedence, in
232  * proportion to the amount of time they have already used. This gives
233  * higher-priority tasks more runtime than lower-priority tasks. Each task
234  * gets the amount of time proportional to its priority value, so a task with
235  * priority 100 will get five times as much processing time as a task with
236  * priority 20. For these purposes, priority values less than 1 are deemed to
237  * be equal to 1.
238  *
239  * When this flag is false (the default), all tasks are run exactly once each
240  * epoch, round-robin style. Priority is only used to determine which task
241  * runs first within tasks of the same sort value.
242  */
243 void AsyncTaskChain::
244 set_timeslice_priority(bool timeslice_priority) {
245  MutexHolder holder(_manager->_lock);
246  _timeslice_priority = timeslice_priority;
247 }
248 
249 /**
250  * Returns the timeslice_priority flag. This changes the interpretation of
251  * priority, and the number of times per epoch each task will run. See
252  * set_timeslice_priority().
253  */
254 bool AsyncTaskChain::
255 get_timeslice_priority() const {
256  MutexHolder holder(_manager->_lock);
257  return _timeslice_priority;
258 }
259 
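// Illustrative example (editor's sketch, not part of the original file): with
// timeslice_priority enabled, a priority-100 task is entitled to 100 / 20 = 5
// times the runtime of a priority-20 task; priorities below 1 count as 1.
// The task pointers are assumed to exist already.
//
//   chain->set_timeslice_priority(true);
//   heavy_task->set_priority(100);  // ~5x the time share of light_task
//   light_task->set_priority(20);
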
260 /**
261  * Stops any threads that are currently running. If any tasks are still
262  * pending and have not yet been picked up by a thread, they will not be
263  * serviced unless poll() or start_threads() is later called.
264  */
265 void AsyncTaskChain::
266 stop_threads() {
267  if (_state == S_started || _state == S_interrupted) {
268  // Clean up all of the threads.
269  MutexHolder holder(_manager->_lock);
270  do_stop_threads();
271  }
272 }
273 
274 /**
275  * Starts any requested threads to service the tasks on the queue. This is
276  * normally not necessary, since adding a task will start the threads
277  * automatically.
278  */
279 void AsyncTaskChain::
280 start_threads() {
281  if (_state == S_initial || _state == S_interrupted) {
282  MutexHolder holder(_manager->_lock);
283  do_start_threads();
284  }
285 }
286 
287 /**
288  * Returns true if the indicated task has been added to this AsyncTaskChain,
289  * false otherwise.
290  */
291 bool AsyncTaskChain::
292 has_task(AsyncTask *task) const {
293  MutexHolder holder(_manager->_lock);
294 
295  if (task->_chain != this) {
296  nassertr(!do_has_task(task), false);
297  return false;
298  }
299 
300  if (task->_state == AsyncTask::S_servicing_removed) {
301  return false;
302  }
303 
304  // The task might not actually be in the active queue, since it might be
305  // being serviced right now. That's OK.
306  return true;
307 }
308 
309 /**
310  * Blocks until the task list is empty.
311  */
312 void AsyncTaskChain::
313 wait_for_tasks() {
314  MutexHolder holder(_manager->_lock);
315  do_wait_for_tasks();
316 }
317 
318 /**
319  * Returns the number of tasks that are currently active or sleeping within
320  * the task chain.
321  */
322 int AsyncTaskChain::
323 get_num_tasks() const {
324  MutexHolder holder(_manager->_lock);
325  return _num_tasks;
326 }
327 
328 /**
329  * Returns the set of tasks that are active or sleeping on the task chain, at
330  * the time of the call.
331  */
332 AsyncTaskCollection AsyncTaskChain::
333 get_tasks() const {
334  MutexHolder holder(_manager->_lock);
335  AsyncTaskCollection result = do_get_active_tasks();
336  result.add_tasks_from(do_get_sleeping_tasks());
337  return result;
338 }
339 
340 /**
341  * Returns the set of tasks that are active (and not sleeping) on the task
342  * chain, at the time of the call.
343  */
344 AsyncTaskCollection AsyncTaskChain::
345 get_active_tasks() const {
346  MutexHolder holder(_manager->_lock);
347  return do_get_active_tasks();
348 }
349 
350 /**
351  * Returns the set of tasks that are sleeping (and not active) on the task
352  * chain, at the time of the call.
353  */
354 AsyncTaskCollection AsyncTaskChain::
355 get_sleeping_tasks() const {
356  MutexHolder holder(_manager->_lock);
357  return do_get_sleeping_tasks();
358 }
359 
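// Illustrative example (editor's sketch, not part of the original file):
// listing every task the chain currently knows about via the
// AsyncTaskCollection returned by the queries above.
//
//   AsyncTaskCollection tasks = chain->get_tasks();
//   for (size_t i = 0; i < tasks.get_num_tasks(); ++i) {
//     nout << *tasks.get_task(i) << "\n";
//   }
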
360 /**
361  * Runs through all the tasks in the task list, once, if the task chain is
362  * running in single-threaded mode (no threads available). This method does
363  * nothing in threaded mode, so it may safely be called in either case.
364  *
365  * Normally, you would not call this function directly; instead, call
366  * AsyncTaskManager::poll(), which polls all of the task chains in sequence.
367  */
368 void AsyncTaskChain::
369 poll() {
370  MutexHolder holder(_manager->_lock);
371  do_poll();
372 }
373 
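// Illustrative example (editor's sketch, not part of the original file):
// driving a non-threaded chain by hand from an application loop;
// "app_running" is a placeholder for the application's own loop condition.
//
//   while (app_running) {
//     chain->poll();  // services each task once; a no-op if threads are running
//   }
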
374 /**
375  * Returns the scheduled time (on the manager's clock) of the next sleeping
376  * task, on any task chain, to awaken. Returns -1 if there are no sleeping
377  * tasks.
378  */
379 double AsyncTaskChain::
380 get_next_wake_time() const {
381  MutexHolder holder(_manager->_lock);
382  return do_get_next_wake_time();
383 }
384 
385 /**
386  *
387  */
388 void AsyncTaskChain::
389 output(ostream &out) const {
390  MutexHolder holder(_manager->_lock);
391  do_output(out);
392 }
393 
394 /**
395  *
396  */
397 void AsyncTaskChain::
398 write(ostream &out, int indent_level) const {
399  MutexHolder holder(_manager->_lock);
400  do_write(out, indent_level);
401 }
402 
403 /**
404  * Adds the indicated task to the active queue. It is an error if the task is
405  * already added to this or any other active queue.
406  *
407  * This is normally called only by the AsyncTaskManager. Assumes the lock is
408  * already held.
409  */
410 void AsyncTaskChain::
411 do_add(AsyncTask *task) {
412  nassertv(task->_chain == nullptr &&
413  task->_manager == nullptr &&
414  task->_chain_name == get_name() &&
415  task->_state == AsyncTask::S_inactive);
416  nassertv(!do_has_task(task));
417 
418  do_start_threads();
419 
420  task->_chain = this;
421  task->_manager = _manager;
422 
423  double now = _manager->_clock->get_frame_time();
424  task->_start_time = now;
425  task->_start_frame = _manager->_clock->get_frame_count();
426 
427  // Remember the order in which tasks were added to the chain.
428  task->_implicit_sort = _next_implicit_sort++;
429 
430  _manager->add_task_by_name(task);
431 
432  if (task->has_delay()) {
433  // This is a deferred task. Add it to the sleeping queue.
434  task->_wake_time = now + task->get_delay();
435  task->_start_time = task->_wake_time;
436  task->_state = AsyncTask::S_sleeping;
437  _sleeping.push_back(task);
438  push_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
439 
440  } else {
441  // This is an active task. Add it to the active set.
442  task->_state = AsyncTask::S_active;
443  if (task_cat.is_spam()) {
444  task_cat.spam()
445  << "Adding " << *task << " with sort " << task->get_sort()
446  << " to chain " << get_name() << " with current_sort "
447  << _current_sort << "\n";
448  }
449  if (task->get_sort() >= _current_sort) {
450  // It will run this frame.
451  _active.push_back(task);
452  push_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
453  } else {
454  // It will run next frame.
455  _next_active.push_back(task);
456  }
457  }
458  ++_num_tasks;
459  ++(_manager->_num_tasks);
460  _needs_cleanup = true;
461 
462  _cvar.notify_all();
463 }
464 
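// Illustrative example (editor's sketch, not part of the original file): how a
// task typically arrives at do_add() with a delay and lands on the sleeping
// queue. The callback my_func and the chain name "loader" are assumed.
//
//   PT(GenericAsyncTask) task = new GenericAsyncTask("delayed", &my_func, nullptr);
//   task->set_delay(2.5);            // wake_time = now + 2.5 seconds
//   task->set_task_chain("loader");  // route the task to this chain by name
//   AsyncTaskManager::get_global_ptr()->add(task);
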
465 /**
466  * Removes the indicated task from this chain. Returns true if removed, false
467  * otherwise. Assumes the lock is already held. The task->upon_death()
468  * method is called with clean_exit=false if upon_death is true.
469  */
470 bool AsyncTaskChain::
471 do_remove(AsyncTask *task, bool upon_death) {
472  nassertr(task->_chain == this, false);
473 
474  switch (task->_state) {
475  case AsyncTask::S_servicing:
476  // This task is being serviced. upon_death will be called afterwards.
477  task->_state = AsyncTask::S_servicing_removed;
478  return true;
479 
480  case AsyncTask::S_servicing_removed:
481  // Being serviced, though it is already marked to be removed afterwards.
482  return false;
483 
484  case AsyncTask::S_sleeping:
485  // Sleeping, easy.
486  {
487  int index = find_task_on_heap(_sleeping, task);
488  nassertr(index != -1, false);
489  PT(AsyncTask) hold_task = task;
490  _sleeping.erase(_sleeping.begin() + index);
491  make_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
492  cleanup_task(task, upon_death, false);
493  }
494  return true;
495 
496  case AsyncTask::S_active:
497  {
498  // Active, but not being serviced, easy.
499  PT(AsyncTask) hold_task = task;
500  int index = find_task_on_heap(_active, task);
501  if (index != -1) {
502  _active.erase(_active.begin() + index);
503  make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
504  } else {
505  index = find_task_on_heap(_next_active, task);
506  if (index != -1) {
507  _next_active.erase(_next_active.begin() + index);
508  } else {
509  index = find_task_on_heap(_this_active, task);
510  nassertr(index != -1, false);
511  }
512  }
513  cleanup_task(task, upon_death, false);
514  return true;
515  }
516 
517  default:
518  break;
519  }
520 
521  return false;
522 }
523 
524 /**
525  * Blocks until the task list is empty. Assumes the lock is held.
526  */
527 void AsyncTaskChain::
528 do_wait_for_tasks() {
529  do_start_threads();
530 
531  if (_threads.empty()) {
532  // Non-threaded case.
533  while (_num_tasks > 0) {
534  if (_state == S_shutdown || _state == S_interrupted) {
535  return;
536  }
537  do_poll();
538  }
539 
540  } else {
541  // Threaded case.
542  while (_num_tasks > 0) {
543  if (_state == S_shutdown || _state == S_interrupted) {
544  return;
545  }
546 
547  PStatTimer timer(_wait_pcollector);
548  _cvar.wait();
549  }
550  }
551 }
552 
553 /**
554  * Stops all threads and messily empties the task list. This is intended to
555  * be called on destruction only. Assumes the lock is already held.
556  */
557 void AsyncTaskChain::
558 do_cleanup() {
559  if (task_cat.is_spam()) {
560  do_output(task_cat.spam());
561  task_cat.spam(false)
562  << ": do_cleanup()\n";
563  }
564  do_stop_threads();
565  _num_threads = 0;
566 
567  // Don't call the upon_death functions while we clean up the tasks.
568  // Instead, store all the tasks in a list as we clean them up, and then call
569  // the upon_death functions all at once. We do this because calling
570  // upon_death will release the lock, allowing the iterators to become
571  // invalid.
572 
573  TaskHeap dead;
574  dead.reserve(_num_tasks);
575 
576  _needs_cleanup = false;
577 
578  TaskHeap::const_iterator ti;
579  for (ti = _active.begin(); ti != _active.end(); ++ti) {
580  AsyncTask *task = (*ti);
581  dead.push_back(task);
582  cleanup_task(task, false, false);
583  }
584  for (ti = _this_active.begin(); ti != _this_active.end(); ++ti) {
585  AsyncTask *task = (*ti);
586  dead.push_back(task);
587  cleanup_task(task, false, false);
588  }
589  for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) {
590  AsyncTask *task = (*ti);
591  dead.push_back(task);
592  cleanup_task(task, false, false);
593  }
594  for (ti = _sleeping.begin(); ti != _sleeping.end(); ++ti) {
595  AsyncTask *task = (*ti);
596  dead.push_back(task);
597  cleanup_task(task, false, false);
598  }
599 
600  // There might still be one task remaining: the currently-executing task.
601  nassertv(_num_tasks == 0 || _num_tasks == 1);
602 
603  // Now go back and call the upon_death functions.
604  _manager->_lock.unlock();
605  for (ti = dead.begin(); ti != dead.end(); ++ti) {
606  (*ti)->upon_death(_manager, false);
607  }
608  _manager->_lock.lock();
609 
610  if (task_cat.is_spam()) {
611  do_output(task_cat.spam());
612  task_cat.spam(false)
613  << ": done do_cleanup()\n";
614  }
615 }
616 
617 /**
618  * Returns true if the task is on one of the task lists, false if it is not
619  * (false may mean that the task is currently being serviced). Assumes the
620  * lock is currently held.
621  */
622 bool AsyncTaskChain::
623 do_has_task(AsyncTask *task) const {
624  return (find_task_on_heap(_active, task) != -1 ||
625  find_task_on_heap(_next_active, task) != -1 ||
626  find_task_on_heap(_sleeping, task) != -1 ||
627  find_task_on_heap(_this_active, task) != -1);
628 }
629 
630 /**
631  * Returns the index number of the indicated task within the specified task
632  * list, or -1 if the task is not found in the list (this may mean that it is
633  * currently being serviced). Assumes that the lock is currently held.
634  */
635 int AsyncTaskChain::
636 find_task_on_heap(const TaskHeap &heap, AsyncTask *task) const {
637  for (int i = 0; i < (int)heap.size(); ++i) {
638  if (heap[i] == task) {
639  return i;
640  }
641  }
642 
643  return -1;
644 }
645 
646 /**
647  * Pops a single task off the active queue, services it, and restores it to
648  * the end of the queue. This is called internally only within one of the
649  * task threads. Assumes the lock is already held.
650  *
651  * Note that the lock may be temporarily released by this method.
652  */
653 void AsyncTaskChain::
654 service_one_task(AsyncTaskChain::AsyncTaskChainThread *thread) {
655  if (!_active.empty()) {
656  PT(AsyncTask) task = _active.front();
657  pop_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
658  _active.pop_back();
659 
660  if (thread != nullptr) {
661  thread->_servicing = task;
662  }
663 
664  if (task_cat.is_spam()) {
665  task_cat.spam()
666  << "Servicing " << *task << " in "
667  << *Thread::get_current_thread() << "\n";
668  }
669 
670  nassertv(task->get_sort() == _current_sort);
671  nassertv(task->_state == AsyncTask::S_active);
672  task->_state = AsyncTask::S_servicing;
673  task->_servicing_thread = thread;
674 
675  AsyncTask::DoneStatus ds = task->unlock_and_do_task();
676 
677  if (thread != nullptr) {
678  thread->_servicing = nullptr;
679  }
680  task->_servicing_thread = nullptr;
681 
682  if (task->_chain == this) {
683  if (task->_state == AsyncTask::S_servicing_removed) {
684  // This task wants to kill itself.
685  cleanup_task(task, true, false);
686 
687  } else if (task->_chain_name != get_name()) {
688  // The task wants to jump to a different chain.
689  PT(AsyncTask) hold_task = task;
690  cleanup_task(task, false, false);
691  task->jump_to_task_chain(_manager);
692 
693  } else {
694  switch (ds) {
695  case AsyncTask::DS_cont:
696  // The task is still alive; put it on the next frame's active queue.
697  task->_state = AsyncTask::S_active;
698  _next_active.push_back(task);
699  _cvar.notify_all();
700  break;
701 
702  case AsyncTask::DS_again:
703  // The task wants to sleep again.
704  {
705  double now = _manager->_clock->get_frame_time();
706  task->_wake_time = now + task->get_delay();
707  task->_start_time = task->_wake_time;
708  task->_state = AsyncTask::S_sleeping;
709  _sleeping.push_back(task);
710  push_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
711  if (task_cat.is_spam()) {
712  task_cat.spam()
713  << "Sleeping " << *task << ", wake time at "
714  << task->_wake_time - now << "\n";
715  }
716  _cvar.notify_all();
717  }
718  break;
719 
720  case AsyncTask::DS_pickup:
721  // The task wants to run again this frame if possible.
722  task->_state = AsyncTask::S_active;
723  _this_active.push_back(task);
724  _cvar.notify_all();
725  break;
726 
727  case AsyncTask::DS_interrupt:
728  // The task had an exception and wants to raise a big flag.
729  task->_state = AsyncTask::S_active;
730  _next_active.push_back(task);
731  if (_state == S_started) {
732  _state = S_interrupted;
733  _cvar.notify_all();
734  }
735  break;
736 
737  case AsyncTask::DS_await:
738  // The task wants to wait for another one to finish.
739  task->_state = AsyncTask::S_awaiting;
740  _cvar.notify_all();
741  ++_num_awaiting_tasks;
742  break;
743 
744  default:
745  // The task has finished.
746  cleanup_task(task, true, true);
747  }
748  }
749  } else {
750  task_cat.error()
751  << "Task is no longer on chain " << get_name()
752  << ": " << *task << "\n";
753  }
754 
755  if (task_cat.is_spam()) {
756  task_cat.spam()
757  << "Done servicing " << *task << " in "
758  << *Thread::get_current_thread() << "\n";
759  }
760  }
761  thread_consider_yield();
762 }
763 
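// Illustrative example (editor's sketch, not part of the original file): the
// DoneStatus values a task function can return, matching the branches handled
// above. The helper predicates are hypothetical.
//
//   AsyncTask::DoneStatus my_func(GenericAsyncTask *task, void *) {
//     if (work_remaining())   return AsyncTask::DS_cont;    // next frame's queue
//     if (need_a_nap())       return AsyncTask::DS_again;   // sleep for get_delay()
//     if (more_this_frame())  return AsyncTask::DS_pickup;  // retry this frame
//     return AsyncTask::DS_done;                            // finished; cleaned up
//   }
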
764 /**
765  * Called internally when a task has completed (or been interrupted) and is
766  * about to be removed from the active queue. Assumes the lock is held.
767  *
768  * If upon_death is true, then task->upon_death() will also be called, with
769  * the indicated clean_exit parameter.
770  *
771  * Note that the lock may be temporarily released by this method.
772  */
773 void AsyncTaskChain::
774 cleanup_task(AsyncTask *task, bool upon_death, bool clean_exit) {
775  if (task_cat.is_spam()) {
776  do_output(task_cat.spam());
777  task_cat.spam(false)
778  << ": cleanup_task(" << *task << ", " << upon_death << ", " << clean_exit
779  << ")\n";
780  }
781 
782  nassertv(task->_chain == this);
783 
784  task->_state = AsyncTask::S_inactive;
785  task->_chain = nullptr;
786  --_num_tasks;
787  --(_manager->_num_tasks);
788 
789  _manager->remove_task_by_name(task);
790 
791  if (upon_death) {
792  _manager->_lock.unlock();
793  if (task->set_future_state(clean_exit ? AsyncFuture::FS_finished
794  : AsyncFuture::FS_cancelled)) {
795  task->notify_done(clean_exit);
796  }
797  task->upon_death(_manager, clean_exit);
798  _manager->_lock.lock();
799  }
800 
801  task->_manager = nullptr;
802 }
803 
804 /**
805  * Called internally when all tasks of a given sort value have been completed,
806  * and it is time to increment to the next sort value, or begin the next
807  * epoch. Assumes the lock is held.
808  *
809  * Returns true if there are more tasks on the queue after this operation, or
810  * false if the task list is empty and we need to wait.
811  */
812 bool AsyncTaskChain::
813 finish_sort_group() {
814  nassertr(_num_busy_threads == 0, true);
815 
816  if (!_threads.empty()) {
817  PStatClient::thread_tick(get_name());
818  }
819 
820  if (!_active.empty()) {
821  // There are more tasks; just set the next sort value.
822  nassertr(_current_sort < _active.front()->get_sort(), true);
823  _current_sort = _active.front()->get_sort();
824  _cvar.notify_all();
825  return true;
826  }
827 
828  // There are no more tasks in this epoch; advance to the next epoch.
829 
830  if (!_this_active.empty() && _frame_budget >= 0.0) {
831  // Enter pickup mode. This is a special mode at the end of the epoch in
832  // which we are just re-running the tasks that think they can still run
833  // within the frame, in an attempt to use up our frame budget.
834 
835  if (task_cat.is_spam()) {
836  do_output(task_cat.spam());
837  task_cat.spam(false)
838  << ": next epoch (pickup mode)\n";
839  }
840 
841  _pickup_mode = true;
842  _active.swap(_this_active);
843 
844  } else {
845  // Not in pickup mode.
846 
847  if (task_cat.is_spam()) {
848  do_output(task_cat.spam());
849  task_cat.spam(false)
850  << ": next epoch\n";
851  }
852 
853  _pickup_mode = false;
854 
855  // Here, there's no difference between _this_active and _next_active.
856  // Combine them.
857  _next_active.insert(_next_active.end(), _this_active.begin(), _this_active.end());
858  _this_active.clear();
859 
860  _active.swap(_next_active);
861 
862  // We only tick the clock and wake sleepers in normal mode, the first time
863  // through the task list; not in pickup mode when we are re-running the
864  // stragglers just to use up our frame budget.
865 
866  if (_tick_clock) {
867  if (task_cat.is_spam()) {
868  do_output(task_cat.spam());
869  task_cat.spam(false)
870  << ": tick clock\n";
871  }
872  _manager->_clock->tick();
873  _manager->_frame_cvar.notify_all();
874 
875  } else if (_frame_sync) {
876  // If we're a synced chain, we have to wait at the end of the epoch for
877  // someone else to tick the clock.
878  _block_till_next_frame = true;
879  }
880 
881  // Check for any sleeping tasks that need to be woken.
882  double now = _manager->_clock->get_frame_time();
883  while (!_sleeping.empty() && _sleeping.front()->_wake_time <= now) {
884  PT(AsyncTask) task = _sleeping.front();
885  if (task_cat.is_spam()) {
886  task_cat.spam()
887  << "Waking " << *task << ", wake time at "
888  << task->_wake_time - now << "\n";
889  }
890  pop_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
891  _sleeping.pop_back();
892  task->_state = AsyncTask::S_active;
893  task->_start_frame = _manager->_clock->get_frame_count();
894  _active.push_back(task);
895  }
896 
897  if (task_cat.is_spam()) {
898  if (_sleeping.empty()) {
899  task_cat.spam()
900  << "No more tasks on sleeping queue.\n";
901  } else {
902  task_cat.spam()
903  << "Next sleeper: " << *_sleeping.front() << ", wake time at "
904  << _sleeping.front()->_wake_time - now << "\n";
905  }
906  }
907 
908  // Any tasks that are on the active queue at the beginning of the epoch
909  // are deemed to have run one frame (or to be about to).
910  TaskHeap::const_iterator ti;
911  for (ti = _active.begin(); ti != _active.end(); ++ti) {
912  AsyncTask *task = (*ti);
913  ++task->_num_frames;
914  }
915  }
916 
917  if (_timeslice_priority) {
918  filter_timeslice_priority();
919  }
920 
921  nassertr((size_t)_num_tasks == _active.size() + _this_active.size() + _next_active.size() + _sleeping.size() + (size_t)_num_awaiting_tasks, true);
922  make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
923 
924  _current_sort = -INT_MAX;
925 
926  if (!_active.empty()) {
927  // Signal the threads to start executing the first task again.
928  _cvar.notify_all();
929  return true;
930  }
931 
932  // There are no tasks to be had anywhere. Chill.
933  _pickup_mode = false;
934  nassertr(_this_active.empty(), false);
935  return false;
936 }
937 
938 /**
939  * Called to filter the _active tasks list when we are in the special
940  * timeslice_priority mode. In this mode, go through and postpone any tasks
941  * that have already exceeded their priority budget for this epoch.
942  *
943  * Assumes the lock is already held.
944  */
945 void AsyncTaskChain::
946 filter_timeslice_priority() {
947  if (_active.empty()) {
948  return;
949  }
950  nassertv(_timeslice_priority);
951 
952  // We must first sum up the average per-epoch runtime of each task.
953  double net_runtime = 0.0;
954  int net_priority = 0;
955 
956  TaskHeap::iterator ti;
957  for (ti = _active.begin(); ti != _active.end(); ++ti) {
958  AsyncTask *task = (*ti);
959  double runtime = max(task->get_average_dt(), 0.0);
960  int priority = max(task->_priority, 1);
961  net_runtime += runtime;
962  net_priority += priority;
963  }
964 
965  // That gives us a timeslice budget per priority value.
966  double average_budget = net_runtime / (double)net_priority;
967 
968  TaskHeap keep, postpone;
969  for (ti = _active.begin(); ti != _active.end(); ++ti) {
970  AsyncTask *task = (*ti);
971  double runtime = max(task->get_average_dt(), 0.0);
972  int priority = max(task->_priority, 1);
973  double consumed = runtime / (double)priority;
974  // cerr << *task << " consumed " << consumed << " vs. " << average_budget
975  // << "\n";
976  if (consumed > average_budget) {
977  // Postpone. Run this task next epoch.
978  postpone.push_back(task);
979  } else {
980  // Keep, and run this task this epoch.
981  keep.push_back(task);
982  }
983  }
984 
985  if (keep.empty()) {
986  // Hmm, nothing to keep. Grab the postponed task with the highest
987  // priority and keep that instead.
988  nassertv(!postpone.empty());
989  ti = postpone.begin();
990  TaskHeap::iterator max_ti = ti;
991  ++ti;
992  while (ti != postpone.end()) {
993  if ((*ti)->_priority > (*max_ti)->_priority) {
994  max_ti = ti;
 995  }
 ++ti;
 996  }
997 
998  // cerr << "Nothing to keep, keeping " << *(*max_ti) << " instead\n";
999 
1000  keep.push_back(*max_ti);
1001  postpone.erase(max_ti);
1002  }
1003 
1004  _active.swap(keep);
1005  if (_pickup_mode) {
1006  _this_active.insert(_this_active.end(), postpone.begin(), postpone.end());
1007  } else {
1008  _next_active.insert(_next_active.end(), postpone.begin(), postpone.end());
1009  }
1010 
1011  nassertv(!_active.empty());
1012 }
1013 
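// Worked example (editor's sketch, not part of the original file): two tasks
// with average runtimes of 4 ms and 1 ms and priorities 100 and 20.
//
//   net_runtime    = 0.004 + 0.001 = 0.005 s
//   net_priority   = 100 + 20      = 120
//   average_budget = 0.005 / 120   = ~41.7 us per priority point
//
//   task A: 0.004 / 100 = 40 us  <= budget  -> kept this epoch
//   task B: 0.001 / 20  = 50 us  >  budget  -> postponed to the next epoch
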
1014 /**
1015  * The private implementation of stop_threads; assumes the lock is already
1016  * held.
1017  */
1018 void AsyncTaskChain::
1019 do_stop_threads() {
1020  if (_state == S_started || _state == S_interrupted) {
1021  if (task_cat.is_debug() && !_threads.empty()) {
1022  task_cat.debug()
1023  << "Stopping " << _threads.size()
1024  << " threads for " << _manager->get_name()
1025  << " chain " << get_name()
1026  << " in " << *Thread::get_current_thread() << "\n";
1027  }
1028 
1029  _state = S_shutdown;
1030  _cvar.notify_all();
1031  _manager->_frame_cvar.notify_all();
1032 
1033  Threads wait_threads;
1034  wait_threads.swap(_threads);
1035 
1036  // We have to release the lock while we join, so the threads can wake up
1037  // and see that we're shutting down.
1038  _manager->_lock.unlock();
1039  Threads::iterator ti;
1040  for (ti = wait_threads.begin(); ti != wait_threads.end(); ++ti) {
1041  if (task_cat.is_debug()) {
1042  task_cat.debug()
1043  << "Waiting for " << *(*ti) << " in "
1044  << *Thread::get_current_thread() << "\n";
1045  }
1046  (*ti)->join();
1047  if (task_cat.is_spam()) {
1048  task_cat.spam()
1049  << "Done waiting for " << *(*ti) << " in "
1050  << *Thread::get_current_thread() << "\n";
1051  }
1052  }
1053  _manager->_lock.lock();
1054 
1055  _state = S_initial;
1056 
1057  // There might be one busy "thread" still: the main thread.
1058  nassertv(_num_busy_threads == 0 || _num_busy_threads == 1);
1059  cleanup_pickup_mode();
1060  }
1061 }
1062 
1063 /**
1064  * The private implementation of start_threads; assumes the lock is already
1065  * held.
1066  */
1067 void AsyncTaskChain::
1068 do_start_threads() {
1069  if (_state == S_interrupted) {
1070  do_stop_threads();
1071  }
1072 
1073  if (_state == S_initial) {
1074  _state = S_started;
1075  if (Thread::is_threading_supported() && _num_threads > 0) {
1076  if (task_cat.is_debug()) {
1077  task_cat.debug()
1078  << "Starting " << _num_threads << " threads for "
1079  << _manager->get_name() << " chain " << get_name() << "\n";
1080  }
1081  _needs_cleanup = true;
1082  _threads.reserve(_num_threads);
1083  for (int i = 0; i < _num_threads; ++i) {
1084  ostringstream strm;
1085  strm << _manager->get_name() << "_" << get_name() << "_" << i;
1086  PT(AsyncTaskChainThread) thread = new AsyncTaskChainThread(strm.str(), this);
1087  if (thread->start(_thread_priority, true)) {
1088  _threads.push_back(thread);
1089  }
1090  }
1091  }
1092  }
1093 }
1094 
1095 /**
1096  * Returns the set of tasks that are active (and not sleeping) on the task
1097  * chain, at the time of the call. Assumes the lock is held.
1098  */
1099 AsyncTaskCollection AsyncTaskChain::
1100 do_get_active_tasks() const {
1101  AsyncTaskCollection result;
1102 
1103  Threads::const_iterator thi;
1104  for (thi = _threads.begin(); thi != _threads.end(); ++thi) {
1105  AsyncTask *task = (*thi)->_servicing;
1106  if (task != nullptr) {
1107  result.add_task(task);
1108  }
1109  }
1110  TaskHeap::const_iterator ti;
1111  for (ti = _active.begin(); ti != _active.end(); ++ti) {
1112  AsyncTask *task = (*ti);
1113  result.add_task(task);
1114  }
1115  for (ti = _this_active.begin(); ti != _this_active.end(); ++ti) {
1116  AsyncTask *task = (*ti);
1117  result.add_task(task);
1118  }
1119  for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) {
1120  AsyncTask *task = (*ti);
1121  result.add_task(task);
1122  }
1123 
1124  return result;
1125 }
1126 
1127 /**
1128  * Returns the set of tasks that are sleeping (and not active) on the task
1129  * chain, at the time of the call. Assumes the lock is held.
1130  */
1131 AsyncTaskCollection AsyncTaskChain::
1132 do_get_sleeping_tasks() const {
1133  AsyncTaskCollection result;
1134 
1135  TaskHeap::const_iterator ti;
1136  for (ti = _sleeping.begin(); ti != _sleeping.end(); ++ti) {
1137  AsyncTask *task = (*ti);
1138  result.add_task(task);
1139  }
1140 
1141  return result;
1142 }
1143 
1144 /**
1145  * The private implementation of poll(), this assumes the lock is already
1146  * held.
1147  */
1148 void AsyncTaskChain::
1149 do_poll() {
1150  thread_consider_yield();
1151  if (_num_tasks == 0) {
1152  return;
1153  }
1154 
1155  do_start_threads();
1156 
1157  if (!_threads.empty()) {
1158  return;
1159  }
1160 
1161  if (_num_busy_threads != 0) {
1162  // We are recursively nested within another task. Return, with a warning.
1163  task_cat.warning()
1164  << "Ignoring recursive poll() within another task.\n";
1165  return;
1166  }
1167 
1168  nassertv(!_pickup_mode);
1169 
1170  do {
1171  while (!_active.empty()) {
1172  if (_state == S_shutdown || _state == S_interrupted) {
1173  return;
1174  }
1175  int frame = _manager->_clock->get_frame_count();
1176  if (_current_frame != frame) {
1177  _current_frame = frame;
1178  _time_in_frame = 0.0;
1179  _block_till_next_frame = false;
1180  }
1181  if (_block_till_next_frame ||
1182  (_frame_budget >= 0.0 && _time_in_frame >= _frame_budget)) {
1183  // If we've exceeded our budget, stop here. We'll resume from this
1184  // point at the next call to poll().
1185  cleanup_pickup_mode();
1186  return;
1187  }
1188 
1189  _current_sort = _active.front()->get_sort();
1190 
1191  // Normally, there won't be any threads running at the same time we're
1192  // in poll(). But it's possible, if someone calls set_num_threads()
1193  // while we're processing.
1194  _num_busy_threads++;
1195  service_one_task(nullptr);
1196  _num_busy_threads--;
1197  _cvar.notify_all();
1198 
1199  if (!_threads.empty()) {
1200  return;
1201  }
1202  }
1203 
1204  finish_sort_group();
1205  } while (_pickup_mode);
1206 }
1207 
1208 /**
1209  * Clean up the damage from setting pickup mode. This means we restore the
1210  * _active and _next_active lists as they should have been without pickup
1211  * mode, for next frame. Assumes the lock is held.
1212  */
1213 void AsyncTaskChain::
1214 cleanup_pickup_mode() {
1215  if (_pickup_mode) {
1216  _pickup_mode = false;
1217 
1218  // Move everything to the _next_active queue.
1219  _next_active.insert(_next_active.end(), _this_active.begin(), _this_active.end());
1220  _this_active.clear();
1221  _next_active.insert(_next_active.end(), _active.begin(), _active.end());
1222  _active.clear();
1223 
1224  // Now finish the epoch properly.
1225  finish_sort_group();
1226  }
1227 }
1228 
1229 /**
1230  * The private implementation of output(), this assumes the lock is already
1231  * held.
1232  */
1233 void AsyncTaskChain::
1234 do_output(ostream &out) const {
1235  if (_manager != nullptr) {
1236  out << _manager->get_type() << " " << _manager->get_name();
1237  } else {
1238  out << "(no manager)";
1239  }
1240  out << " task chain " << get_name()
1241  << "; " << _num_tasks << " tasks";
1242 }
1243 
1244 /**
1245  * The private implementation of write(), this assumes the lock is already
1246  * held.
1247  */
1248 void AsyncTaskChain::
1249 do_write(ostream &out, int indent_level) const {
1250  indent(out, indent_level)
1251  << "Task chain \"" << get_name() << "\"\n";
1252  if (_num_threads > 0) {
1253  indent(out, indent_level + 2)
1254  << _num_threads << " threads, priority " << _thread_priority << "\n";
1255  }
1256  if (_frame_budget >= 0.0) {
1257  indent(out, indent_level + 2)
1258  << "frame budget " << _frame_budget << " s\n";
1259  }
1260  if (_timeslice_priority) {
1261  indent(out, indent_level + 2)
1262  << "timeslice priority\n";
1263  }
1264  if (_tick_clock) {
1265  indent(out, indent_level + 2)
1266  << "tick clock\n";
1267  }
1268 
1269  static const size_t buffer_size = 1024;
1270  char buffer[buffer_size];
1271  sprintf(buffer, " %-32s %8s %8s %8s %8s %6s",
1272  "Task",
1273  "sleep(s)",
1274  "dt(ms)", "avg", "max",
1275  "sort");
1276  nassertv(strlen(buffer) < buffer_size);
1277 
1278  indent(out, indent_level)
1279  << buffer << "\n";
1280 
1281  indent(out, indent_level);
1282  for (int i = 0; i < 32+8+8+8+8+6+7; ++i) {
1283  out << '-';
1284  }
1285  out << "\n";
1286 
1287  // Collect a list of all active tasks, then sort them into order for output.
1288  TaskHeap tasks = _active;
1289  tasks.insert(tasks.end(), _this_active.begin(), _this_active.end());
1290  tasks.insert(tasks.end(), _next_active.begin(), _next_active.end());
1291 
1292  Threads::const_iterator thi;
1293  for (thi = _threads.begin(); thi != _threads.end(); ++thi) {
1294  AsyncTask *task = (*thi)->_servicing;
1295  if (task != nullptr) {
1296  tasks.push_back(task);
1297  }
1298  }
1299 
1300  double now = _manager->_clock->get_frame_time();
1301 
1302  if (!tasks.empty()) {
1303  sort(tasks.begin(), tasks.end(), AsyncTaskSortPriority());
1304 
1305  // Since AsyncTaskSortPriority() sorts backwards (because of STL's
1306  // push_heap semantics), we go through the task list in reverse order to
1307  // print them forwards.
1308  TaskHeap::reverse_iterator ti;
1309  for (ti = tasks.rbegin(); ti != tasks.rend(); ++ti) {
1310  AsyncTask *task = (*ti);
1311  write_task_line(out, indent_level, task, now);
1312  }
1313  }
1314 
1315  // Instead of iterating through the _sleeping list in heap order, copy it
1316  // and then use repeated pops to get it out in sorted order, for the user's
1317  // satisfaction.
1318  TaskHeap sleeping = _sleeping;
1319  while (!sleeping.empty()) {
1320  PT(AsyncTask) task = sleeping.front();
1321  pop_heap(sleeping.begin(), sleeping.end(), AsyncTaskSortWakeTime());
1322  sleeping.pop_back();
1323 
1324  write_task_line(out, indent_level, task, now);
1325  }
1326 }
1327 
1328 /**
1329  * Writes a single line for a task in the report generated by do_write().
1330  * Assumes the lock is already held.
1331  */
1332 void AsyncTaskChain::
1333 write_task_line(ostream &out, int indent_level, AsyncTask *task, double now) const {
1334  char servicing_flag = ' ';
1335  if (task->_state == AsyncTask::S_servicing) {
1336  servicing_flag = '*';
1337  } else if (task->_state == AsyncTask::S_servicing_removed) {
1338  servicing_flag = '-';
1339  }
1340 
1341  static const size_t buffer_size = 1024;
1342  char buffer[buffer_size];
1343 
1344  if (task->_state == AsyncTask::S_sleeping) {
1345  // For sleeping tasks, include the wake time, as an elapsed time in
1346  // seconds.
1347  string name = task->get_name().substr(0, 32);
1348  sprintf(buffer, "%c%-32s %8.1f",
1349  servicing_flag, name.c_str(),
1350  task->_wake_time - now);
1351  } else {
1352  // For active tasks, don't include a wake time. This means we have more
1353  // space for the name.
1354  string name = task->get_name().substr(0, 41);
1355  sprintf(buffer, "%c%-41s",
1356  servicing_flag, name.c_str());
1357  }
1358  nassertv(strlen(buffer) < buffer_size);
1359 
1360  indent(out, indent_level)
1361  << buffer;
1362 
1363  if (task->_num_frames > 0) {
1364  sprintf(buffer, " %8.1f %8.1f %8.1f %6d",
1365  task->_dt * 1000.0, task->get_average_dt() * 1000.0,
1366  task->_max_dt * 1000.0,
1367  task->_sort);
1368  } else {
1369  // No statistics for a task that hasn't run yet.
1370  sprintf(buffer, " %8s %8s %8s %6d",
1371  "", "", "",
1372  task->_sort);
1373  }
1374 
1375  nassertv(strlen(buffer) < buffer_size);
1376  out << buffer << "\n";
1377 }
1378 
1379 /**
1380  *
1381  */
1382 AsyncTaskChain::AsyncTaskChainThread::
1383 AsyncTaskChainThread(const string &name, AsyncTaskChain *chain) :
1384  Thread(name, chain->get_name()),
1385  _chain(chain),
1386  _servicing(nullptr)
1387 {
1388 }
1389 
1390 /**
1391  *
1392  */
1393 void AsyncTaskChain::AsyncTaskChainThread::
1394 thread_main() {
1395  MutexHolder holder(_chain->_manager->_lock);
1396  while (_chain->_state != S_shutdown && _chain->_state != S_interrupted) {
1397  thread_consider_yield();
1398  if (!_chain->_active.empty() &&
1399  _chain->_active.front()->get_sort() == _chain->_current_sort) {
1400 
1401  int frame = _chain->_manager->_clock->get_frame_count();
1402  if (_chain->_current_frame != frame) {
1403  _chain->_current_frame = frame;
1404  _chain->_time_in_frame = 0.0;
1405  _chain->_block_till_next_frame = false;
1406  }
1407 
1408  // If we've exceeded our frame budget, sleep until the next frame.
1409  if (_chain->_block_till_next_frame ||
1410  (_chain->_frame_budget >= 0.0 && _chain->_time_in_frame >= _chain->_frame_budget)) {
1411  while ((_chain->_block_till_next_frame ||
1412  (_chain->_frame_budget >= 0.0 && _chain->_time_in_frame >= _chain->_frame_budget)) &&
1413  _chain->_state != S_shutdown && _chain->_state != S_interrupted) {
1414  _chain->cleanup_pickup_mode();
1415  _chain->_manager->_frame_cvar.wait();
1416  frame = _chain->_manager->_clock->get_frame_count();
1417  if (_chain->_current_frame != frame) {
1418  _chain->_current_frame = frame;
1419  _chain->_time_in_frame = 0.0;
1420  _chain->_block_till_next_frame = false;
1421  }
1422  }
1423  // Now that it's the next frame, go back to the top of the loop.
1424  continue;
1425  }
1426 
1427  PStatTimer timer(_task_pcollector);
1428  _chain->_num_busy_threads++;
1429  _chain->service_one_task(this);
1430  _chain->_num_busy_threads--;
1431  _chain->_cvar.notify_all();
1432 
1433  } else {
1434  // We've finished all the available tasks of the current sort value. We
1435  // can't pick up a new task until all of the threads finish the tasks
1436  // with the same sort value.
1437  if (_chain->_num_busy_threads == 0) {
1438  // We're the last thread to finish. Update _current_sort.
1439  if (!_chain->finish_sort_group()) {
1440  // Nothing to do. Wait for more tasks to be added.
1441  if (_chain->_sleeping.empty()) {
1442  PStatTimer timer(_wait_pcollector);
1443  _chain->_cvar.wait();
1444  } else {
1445  double wake_time = _chain->do_get_next_wake_time();
1446  double now = _chain->_manager->_clock->get_frame_time();
1447  double timeout = max(wake_time - now, 0.0);
1448  PStatTimer timer(_wait_pcollector);
1449  _chain->_cvar.wait(timeout);
1450  }
1451  }
1452 
1453  } else {
1454  // Wait for the other threads to finish their current task before we
1455  // continue.
1456  PStatTimer timer(_wait_pcollector);
1457  _chain->_cvar.wait();
1458  }
1459  }
1460  }
1461 }