Panda3D
asyncTaskChain.cxx
Go to the documentation of this file.
1 /**
2  * PANDA 3D SOFTWARE
3  * Copyright (c) Carnegie Mellon University. All rights reserved.
4  *
5  * All use of this software is subject to the terms of the revised BSD
6  * license. You should have received a copy of this license along
7  * with this source code in a file named "LICENSE."
8  *
9  * @file asyncTaskChain.cxx
10  * @author drose
11  * @date 2006-08-23
12  */
13 
14 #include "asyncTaskChain.h"
15 #include "asyncTaskManager.h"
16 #include "event.h"
17 #include "mutexHolder.h"
18 #include "indent.h"
19 #include "pStatClient.h"
20 #include "pStatTimer.h"
21 #include "clockObject.h"
22 #include "config_event.h"
23 #include <algorithm>
24 #include <stdio.h> // For sprintf/snprintf
25 
26 using std::max;
27 using std::ostream;
28 using std::ostringstream;
29 using std::string;
30 
31 TypeHandle AsyncTaskChain::_type_handle;
32 
33 PStatCollector AsyncTaskChain::_task_pcollector("Task");
34 PStatCollector AsyncTaskChain::_wait_pcollector("Wait");
35 
36 /**
37  *
38  */
39 AsyncTaskChain::
40 AsyncTaskChain(AsyncTaskManager *manager, const string &name) :
41  Namable(name),
42  _manager(manager),
43  _cvar(manager->_lock),
44  _tick_clock(false),
45  _timeslice_priority(false),
46  _num_threads(0),
47  _thread_priority(TP_normal),
48  _frame_budget(-1.0),
49  _frame_sync(false),
50  _num_busy_threads(0),
51  _num_tasks(0),
52  _num_awaiting_tasks(0),
53  _state(S_initial),
54  _current_sort(-INT_MAX),
55  _pickup_mode(false),
56  _needs_cleanup(false),
57  _current_frame(0),
58  _time_in_frame(0.0),
59  _block_till_next_frame(false),
60  _next_implicit_sort(0)
61 {
62 }
63 
64 /**
65  *
66  */
67 AsyncTaskChain::
68 ~AsyncTaskChain() {
69  // We only grab the lock if _needs_cleanup is true. This way, the temporary
70  // AsyncTaskChain objects created (and destructed) within the task manager
71  // won't risk a double-lock.
72  if (_needs_cleanup) {
73  MutexHolder holder(_manager->_lock);
74  do_cleanup();
75  }
76 }
77 
78 /**
79  * Sets the tick_clock flag. When this is true, get_clock()->tick() will be
80  * called automatically at each task epoch. This is false by default.
81  */
83 set_tick_clock(bool tick_clock) {
84  MutexHolder holder(_manager->_lock);
85  _tick_clock = tick_clock;
86 }
87 
88 /**
89  * Returns the tick_clock flag. See set_tick_clock().
90  */
92 get_tick_clock() const {
93  MutexHolder holder(_manager->_lock);
94  return _tick_clock;
95 }
96 
97 /**
98  * Changes the number of threads for this task chain. This may require
99  * stopping the threads if they are already running.
100  */
101 void AsyncTaskChain::
102 set_num_threads(int num_threads) {
103  nassertv(num_threads >= 0);
104 
105  if (task_cat.is_debug()) {
106  do_output(task_cat.debug());
107  task_cat.debug(false)
108  << ": set_num_threads(" << num_threads << ")\n";
109  }
110 
111  if (!Thread::is_threading_supported()) {
112  num_threads = 0;
113  }
114 
115  MutexHolder holder(_manager->_lock);
116  if (_num_threads != num_threads) {
117  do_stop_threads();
118  _num_threads = num_threads;
119 
120  if (_num_tasks != 0) {
121  do_start_threads();
122  }
123  }
124 }
125 
126 /**
127  * Returns the number of threads that will be servicing tasks for this chain.
128  * Also see get_num_running_threads().
129  */
132  MutexHolder holder(_manager->_lock);
133  return _num_threads;
134 }
135 
136 /**
137  * Returns the number of threads that have been created and are actively
138  * running. This will return 0 before the threads have been started; it will
139  * also return 0 if thread support is not available.
140  */
143  MutexHolder holder(_manager->_lock);
144  return _threads.size();
145 }
146 
147 /**
148  * Changes the priority associated with threads that serve this task chain.
149  * This may require stopping the threads if they are already running.
150  */
151 void AsyncTaskChain::
152 set_thread_priority(ThreadPriority priority) {
153  MutexHolder holder(_manager->_lock);
154  if (_thread_priority != priority) {
155  do_stop_threads();
156  _thread_priority = priority;
157 
158  if (_num_tasks != 0) {
159  do_start_threads();
160  }
161  }
162 }
163 
164 /**
165  * Returns the priority associated with threads that serve this task chain.
166  */
167 ThreadPriority AsyncTaskChain::
169  MutexHolder holder(_manager->_lock);
170  return _thread_priority;
171 }
172 
173 /**
174  * Sets the maximum amount of time per frame the tasks on this chain are
175  * granted for execution. If this is less than zero, there is no limit; if it
176  * is >= 0, it represents a maximum amount of time (in seconds) that will be
177  * used to execute tasks. If this time is exceeded in any one frame, the task
178  * chain will stop executing tasks until the next frame, as defined by the
179  * TaskManager's clock.
180  */
181 void AsyncTaskChain::
182 set_frame_budget(double frame_budget) {
183  MutexHolder holder(_manager->_lock);
184  _frame_budget = frame_budget;
185 }
186 
187 /**
188  * Returns the maximum amount of time per frame the tasks on this chain are
189  * granted for execution. See set_frame_budget().
190  */
191 double AsyncTaskChain::
193  MutexHolder holder(_manager->_lock);
194  return _frame_budget;
195 }
196 
197 /**
198  * Sets the frame_sync flag. When this flag is true, this task chain will be
199  * forced to sync with the TaskManager's clock. It will run no faster than
200  * one epoch per clock frame.
201  *
202  * When this flag is false, the default, the task chain will finish all of its
203  * tasks and then immediately start from the first task again, regardless of
204  * the clock frame. When it is true, the task chain will finish all of its
205  * tasks and then wait for the clock to tick to the next frame before resuming
206  * the first task.
207  *
208  * This only makes sense for threaded task chains. Non-threaded task chains
209  * are automatically synchronous.
210  */
211 void AsyncTaskChain::
212 set_frame_sync(bool frame_sync) {
213  MutexHolder holder(_manager->_lock);
214  _frame_sync = frame_sync;
215 }
216 
217 /**
218  * Returns the frame_sync flag. See set_frame_sync().
219  */
220 bool AsyncTaskChain::
221 get_frame_sync() const {
222  MutexHolder holder(_manager->_lock);
223  return _frame_sync;
224 }
225 
226 /**
227  * Sets the timeslice_priority flag. This changes the interpretation of
228  * priority, and the number of times per epoch each task will run.
229  *
230  * When this flag is true, some tasks might not run in any given epoch.
231  * Instead, tasks with priority higher than 1 will be given precedence, in
232  * proportion to the amount of time they have already used. This gives
233  * higher-priority tasks more runtime than lower-priority tasks. Each task
234  * gets the amount of time proportional to its priority value, so a task with
235  * priority 100 will get five times as much processing time as a task with
236  * priority 20. For these purposes, priority values less than 1 are deemed to
237  * be equal to 1.
238  *
239  * When this flag is false (the default), all tasks are run exactly once each
240  * epoch, round-robin style. Priority is only used to determine which task
241  * runs first within tasks of the same sort value.
242  */
243 void AsyncTaskChain::
244 set_timeslice_priority(bool timeslice_priority) {
245  MutexHolder holder(_manager->_lock);
246  _timeslice_priority = timeslice_priority;
247 }
248 
249 /**
250  * Returns the timeslice_priority flag. This changes the interpretation of
251  * priority, and the number of times per epoch each task will run. See
252  * set_timeslice_priority().
253  */
254 bool AsyncTaskChain::
256  MutexHolder holder(_manager->_lock);
257  return _timeslice_priority;
258 }
259 
260 /**
261  * Stops any threads that are currently running. If any tasks are still
262  * pending and have not yet been picked up by a thread, they will not be
263  * serviced unless poll() or start_threads() is later called.
264  */
265 void AsyncTaskChain::
267  if (_state == S_started || _state == S_interrupted) {
268  // Clean up all of the threads.
269  MutexHolder holder(_manager->_lock);
270  do_stop_threads();
271  }
272 }
273 
274 /**
275  * Starts any requested threads to service the tasks on the queue. This is
276  * normally not necessary, since adding a task will start the threads
277  * automatically.
278  */
279 void AsyncTaskChain::
281  if (_state == S_initial || _state == S_interrupted) {
282  MutexHolder holder(_manager->_lock);
283  do_start_threads();
284  }
285 }
286 
287 /**
288  * Returns true if the indicated task has been added to this AsyncTaskChain,
289  * false otherwise.
290  */
291 bool AsyncTaskChain::
292 has_task(AsyncTask *task) const {
293  MutexHolder holder(_manager->_lock);
294 
295  if (task->_chain != this) {
296  nassertr(!do_has_task(task), false);
297  return false;
298  }
299 
300  if (task->_state == AsyncTask::S_servicing_removed) {
301  return false;
302  }
303 
304  // The task might not actually be in the active queue, since it might be
305  // being serviced right now. That's OK.
306  return true;
307 }
308 
309 /**
310  * Blocks until the task list is empty.
311  */
312 void AsyncTaskChain::
314  MutexHolder holder(_manager->_lock);
315  do_wait_for_tasks();
316 }
317 
318 /**
319  * Returns the number of tasks that are currently active or sleeping within
320  * the task chain.
321  */
323 get_num_tasks() const {
324  MutexHolder holder(_manager->_lock);
325  return _num_tasks;
326 }
327 
328 /**
329  * Returns the set of tasks that are active or sleeping on the task chain, at
330  * the time of the call.
331  */
333 get_tasks() const {
334  MutexHolder holder(_manager->_lock);
335  AsyncTaskCollection result = do_get_active_tasks();
336  result.add_tasks_from(do_get_sleeping_tasks());
337  return result;
338 }
339 
340 /**
341  * Returns the set of tasks that are active (and not sleeping) on the task
342  * chain, at the time of the call.
343  */
346  MutexHolder holder(_manager->_lock);
347  return do_get_active_tasks();
348 }
349 
350 /**
351  * Returns the set of tasks that are sleeping (and not active) on the task
352  * chain, at the time of the call.
353  */
356  MutexHolder holder(_manager->_lock);
357  return do_get_sleeping_tasks();
358 }
359 
360 /**
361  * Runs through all the tasks in the task list, once, if the task chain is
362  * running in single-threaded mode (no threads available). This method does
363  * nothing in threaded mode, so it may safely be called in either case.
364  *
365  * Normally, you would not call this function directly; instead, call
366  * AsyncTaskManager::poll(), which polls all of the task chains in sequence.
367  */
368 void AsyncTaskChain::
369 poll() {
370  MutexHolder holder(_manager->_lock);
371  do_poll();
372 }
373 
374 /**
375  * Returns the scheduled time (on the manager's clock) of the next sleeping
376  * task, on any task chain, to awaken. Returns -1 if there are no sleeping
377  * tasks.
378  */
379 double AsyncTaskChain::
381  MutexHolder holder(_manager->_lock);
382  return do_get_next_wake_time();
383 }
384 
385 /**
386  *
387  */
388 void AsyncTaskChain::
389 output(ostream &out) const {
390  MutexHolder holder(_manager->_lock);
391  do_output(out);
392 }
393 
394 /**
395  *
396  */
397 void AsyncTaskChain::
398 write(ostream &out, int indent_level) const {
399  MutexHolder holder(_manager->_lock);
400  do_write(out, indent_level);
401 }
402 
403 /**
404  * Adds the indicated task to the active queue. It is an error if the task is
405  * already added to this or any other active queue.
406  *
407  * This is normally called only by the AsyncTaskManager. Assumes the lock is
408  * already held.
409  */
410 void AsyncTaskChain::
411 do_add(AsyncTask *task) {
412  nassertv(task->_chain == nullptr &&
413  task->_manager == nullptr &&
414  task->_chain_name == get_name() &&
415  task->_state == AsyncTask::S_inactive);
416  nassertv(!do_has_task(task));
417 
418  do_start_threads();
419 
420  task->_chain = this;
421  task->_manager = _manager;
422 
423  double now = _manager->_clock->get_frame_time();
424  task->_start_time = now;
425  task->_start_frame = _manager->_clock->get_frame_count();
426 
427  // Remember the order in which tasks were added to the chain.
428  task->_implicit_sort = _next_implicit_sort++;
429 
430  _manager->add_task_by_name(task);
431 
432  if (task->has_delay()) {
433  // This is a deferred task. Add it to the sleeping queue.
434  task->_wake_time = now + task->get_delay();
435  task->_start_time = task->_wake_time;
436  task->_state = AsyncTask::S_sleeping;
437  _sleeping.push_back(task);
438  push_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
439 
440  } else {
441  // This is an active task. Add it to the active set.
442  task->_state = AsyncTask::S_active;
443  if (task_cat.is_spam()) {
444  task_cat.spam()
445  << "Adding " << *task << " with sort " << task->get_sort()
446  << " to chain " << get_name() << " with current_sort "
447  << _current_sort << "\n";
448  }
449  if (task->get_sort() >= _current_sort) {
450  // It will run this frame.
451  _active.push_back(task);
452  push_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
453  } else {
454  // It will run next frame.
455  _next_active.push_back(task);
456  }
457  }
458  ++_num_tasks;
459  ++(_manager->_num_tasks);
460  _needs_cleanup = true;
461 
462  _cvar.notify_all();
463 }
464 
465 /**
466  * Removes the indicated task from this chain. Returns true if removed, false
467  * otherwise. Assumes the lock is already held. The task->upon_death()
468  * method is called with clean_exit=false if upon_death is given.
469  */
470 bool AsyncTaskChain::
471 do_remove(AsyncTask *task, bool upon_death) {
472  nassertr(task->_chain == this, false);
473 
474  switch (task->_state) {
475  case AsyncTask::S_servicing:
476  // This task is being serviced. upon_death will be called afterwards.
477  task->_state = AsyncTask::S_servicing_removed;
478  return true;
479 
480  case AsyncTask::S_servicing_removed:
481  // Being serviced, though it is already marked to be removed afterwards.
482  return false;
483 
484  case AsyncTask::S_sleeping:
485  // Sleeping, easy.
486  {
487  int index = find_task_on_heap(_sleeping, task);
488  nassertr(index != -1, false);
489  PT(AsyncTask) hold_task = task;
490  _sleeping.erase(_sleeping.begin() + index);
491  make_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
492  cleanup_task(task, upon_death, false);
493  }
494  return true;
495 
496  case AsyncTask::S_active:
497  {
498  // Active, but not being serviced, easy.
499  PT(AsyncTask) hold_task = task;
500  int index = find_task_on_heap(_active, task);
501  if (index != -1) {
502  _active.erase(_active.begin() + index);
503  make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
504  } else {
505  index = find_task_on_heap(_next_active, task);
506  if (index != -1) {
507  _next_active.erase(_next_active.begin() + index);
508  } else {
509  index = find_task_on_heap(_this_active, task);
510  nassertr(index != -1, false);
511  }
512  }
513  cleanup_task(task, upon_death, false);
514  return true;
515  }
516 
517  default:
518  break;
519  }
520 
521  return false;
522 }
523 
524 /**
525  * Blocks until the task list is empty. Assumes the lock is held.
526  */
527 void AsyncTaskChain::
528 do_wait_for_tasks() {
529  do_start_threads();
530 
531  if (_threads.empty()) {
532  // Non-threaded case.
533  while (_num_tasks > 0) {
534  if (_state == S_shutdown || _state == S_interrupted) {
535  return;
536  }
537  do_poll();
538  }
539 
540  } else {
541  // Threaded case.
542  while (_num_tasks > 0) {
543  if (_state == S_shutdown || _state == S_interrupted) {
544  return;
545  }
546 
547  PStatTimer timer(_wait_pcollector);
548  _cvar.wait();
549  }
550  }
551 }
552 
553 /**
554  * Stops all threads and messily empties the task list. This is intended to
555  * be called on destruction only. Assumes the lock is already held.
556  */
557 void AsyncTaskChain::
558 do_cleanup() {
559  if (task_cat.is_spam()) {
560  do_output(task_cat.spam());
561  task_cat.spam(false)
562  << ": do_cleanup()\n";
563  }
564  do_stop_threads();
565  _num_threads = 0;
566 
567  // Don't call the upon_death functions while we clean up the tasks.
568  // Instead, store all the tasks in a list as we clean them up, and then call
569  // the upon_death functions all at once. We do this because calling
570  // upon_death wil release the lock, allowing the iterators to become
571  // invalid.
572 
573  TaskHeap dead;
574  dead.reserve(_num_tasks);
575 
576  _needs_cleanup = false;
577 
578  TaskHeap::const_iterator ti;
579  for (ti = _active.begin(); ti != _active.end(); ++ti) {
580  AsyncTask *task = (*ti);
581  dead.push_back(task);
582  cleanup_task(task, false, false);
583  }
584  for (ti = _this_active.begin(); ti != _this_active.end(); ++ti) {
585  AsyncTask *task = (*ti);
586  dead.push_back(task);
587  cleanup_task(task, false, false);
588  }
589  for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) {
590  AsyncTask *task = (*ti);
591  dead.push_back(task);
592  cleanup_task(task, false, false);
593  }
594  for (ti = _sleeping.begin(); ti != _sleeping.end(); ++ti) {
595  AsyncTask *task = (*ti);
596  dead.push_back(task);
597  cleanup_task(task, false, false);
598  }
599 
600  // There might still be one task remaining: the currently-executing task.
601  nassertv(_num_tasks == 0 || _num_tasks == 1);
602 
603  // Now go back and call the upon_death functions.
604  _manager->_lock.unlock();
605  for (ti = dead.begin(); ti != dead.end(); ++ti) {
606  (*ti)->upon_death(_manager, false);
607  }
608  _manager->_lock.lock();
609 
610  if (task_cat.is_spam()) {
611  do_output(task_cat.spam());
612  task_cat.spam(false)
613  << ": done do_cleanup()\n";
614  }
615 }
616 
617 /**
618  * Returns true if the task is on one of the task lists, false if it is not
619  * (false may mean that the task is currently being serviced). Assumes the
620  * lock is currently held.
621  */
622 bool AsyncTaskChain::
623 do_has_task(AsyncTask *task) const {
624  return (find_task_on_heap(_active, task) != -1 ||
625  find_task_on_heap(_next_active, task) != -1 ||
626  find_task_on_heap(_sleeping, task) != -1 ||
627  find_task_on_heap(_this_active, task) != -1);
628 }
629 
630 /**
631  * Returns the index number of the indicated task within the specified task
632  * list, or -1 if the task is not found in the list (this may mean that it is
633  * currently being serviced). Assumes that the lock is currently held.
634  */
635 int AsyncTaskChain::
636 find_task_on_heap(const TaskHeap &heap, AsyncTask *task) const {
637  for (int i = 0; i < (int)heap.size(); ++i) {
638  if (heap[i] == task) {
639  return i;
640  }
641  }
642 
643  return -1;
644 }
645 
646 /**
647  * Pops a single task off the active queue, services it, and restores it to
648  * the end of the queue. This is called internally only within one of the
649  * task threads. Assumes the lock is already held.
650  *
651  * Note that the lock may be temporarily released by this method.
652  */
653 void AsyncTaskChain::
654 service_one_task(AsyncTaskChain::AsyncTaskChainThread *thread) {
655  if (!_active.empty()) {
656  PT(AsyncTask) task = _active.front();
657  pop_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
658  _active.pop_back();
659 
660  if (thread != nullptr) {
661  thread->_servicing = task;
662  }
663 
664  if (task_cat.is_spam()) {
665  task_cat.spam()
666  << "Servicing " << *task << " in "
667  << *Thread::get_current_thread() << "\n";
668  }
669 
670  nassertv(task->get_sort() == _current_sort);
671  nassertv(task->_state == AsyncTask::S_active);
672  task->_state = AsyncTask::S_servicing;
673  task->_servicing_thread = thread;
674 
675  AsyncTask::DoneStatus ds = task->unlock_and_do_task();
676 
677  if (thread != nullptr) {
678  thread->_servicing = nullptr;
679  }
680  task->_servicing_thread = nullptr;
681 
682  if (task->_chain == this) {
683  if (task->_state == AsyncTask::S_servicing_removed) {
684  // This task wants to kill itself.
685  cleanup_task(task, true, false);
686 
687  } else if (task->_chain_name != get_name()) {
688  // The task wants to jump to a different chain.
689  PT(AsyncTask) hold_task = task;
690  cleanup_task(task, false, false);
691  task->jump_to_task_chain(_manager);
692 
693  } else {
694  switch (ds) {
695  case AsyncTask::DS_cont:
696  // The task is still alive; put it on the next frame's active queue.
697  task->_state = AsyncTask::S_active;
698  _next_active.push_back(task);
699  _cvar.notify_all();
700  break;
701 
702  case AsyncTask::DS_again:
703  // The task wants to sleep again.
704  {
705  double now = _manager->_clock->get_frame_time();
706  task->_wake_time = now + task->get_delay();
707  task->_start_time = task->_wake_time;
708  task->_state = AsyncTask::S_sleeping;
709  _sleeping.push_back(task);
710  push_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
711  if (task_cat.is_spam()) {
712  task_cat.spam()
713  << "Sleeping " << *task << ", wake time at "
714  << task->_wake_time - now << "\n";
715  }
716  _cvar.notify_all();
717  }
718  break;
719 
720  case AsyncTask::DS_pickup:
721  // The task wants to run again this frame if possible.
722  task->_state = AsyncTask::S_active;
723  _this_active.push_back(task);
724  _cvar.notify_all();
725  break;
726 
727  case AsyncTask::DS_interrupt:
728  // The task had an exception and wants to raise a big flag.
729  task->_state = AsyncTask::S_active;
730  _next_active.push_back(task);
731  if (_state == S_started) {
732  _state = S_interrupted;
733  _cvar.notify_all();
734  }
735  break;
736 
737  case AsyncTask::DS_await:
738  // The task wants to wait for another one to finish.
739  task->_state = AsyncTask::S_awaiting;
740  _cvar.notify_all();
741  ++_num_awaiting_tasks;
742  break;
743 
744  default:
745  // The task has finished.
746  cleanup_task(task, true, true);
747  }
748  }
749  } else {
750  task_cat.error()
751  << "Task is no longer on chain " << get_name()
752  << ": " << *task << "\n";
753  }
754 
755  if (task_cat.is_spam()) {
756  task_cat.spam()
757  << "Done servicing " << *task << " in "
758  << *Thread::get_current_thread() << "\n";
759  }
760  }
761  thread_consider_yield();
762 }
763 
764 /**
765  * Called internally when a task has completed (or been interrupted) and is
766  * about to be removed from the active queue. Assumes the lock is held.
767  *
768  * If upon_death is true, then task->upon_death() will also be called, with
769  * the indicated clean_exit parameter.
770  *
771  * Note that the lock may be temporarily released by this method.
772  */
773 void AsyncTaskChain::
774 cleanup_task(AsyncTask *task, bool upon_death, bool clean_exit) {
775  if (task_cat.is_spam()) {
776  do_output(task_cat.spam());
777  task_cat.spam(false)
778  << ": cleanup_task(" << *task << ", " << upon_death << ", " << clean_exit
779  << ")\n";
780  }
781 
782  nassertv(task->_chain == this);
783 
784  task->_state = AsyncTask::S_inactive;
785  task->_chain = nullptr;
786  --_num_tasks;
787  --(_manager->_num_tasks);
788 
789  _manager->remove_task_by_name(task);
790 
791  if (upon_death && task->set_future_state(clean_exit ? AsyncFuture::FS_finished
792  : AsyncFuture::FS_cancelled)) {
793  task->notify_done(clean_exit);
794  }
795 
796  task->_manager = nullptr;
797 
798  if (upon_death) {
799  _manager->_lock.unlock();
800  task->upon_death(_manager, clean_exit);
801  _manager->_lock.lock();
802  }
803 }
804 
805 /**
806  * Called internally when all tasks of a given sort value have been completed,
807  * and it is time to increment to the next sort value, or begin the next
808  * epoch. Assumes the lock is held.
809  *
810  * Returns true if there are more tasks on the queue after this operation, or
811  * false if the task list is empty and we need to wait.
812  */
813 bool AsyncTaskChain::
814 finish_sort_group() {
815  nassertr(_num_busy_threads == 0, true);
816 
817  if (!_threads.empty()) {
818  PStatClient::thread_tick(get_name());
819  }
820 
821  if (!_active.empty()) {
822  // There are more tasks; just set the next sort value.
823  nassertr(_current_sort < _active.front()->get_sort(), true);
824  _current_sort = _active.front()->get_sort();
825  _cvar.notify_all();
826  return true;
827  }
828 
829  // There are no more tasks in this epoch; advance to the next epoch.
830 
831  if (!_this_active.empty() && _frame_budget >= 0.0) {
832  // Enter pickup mode. This is a special mode at the end of the epoch in
833  // which we are just re-running the tasks that think they can still run
834  // within the frame, in an attempt to use up our frame budget.
835 
836  if (task_cat.is_spam()) {
837  do_output(task_cat.spam());
838  task_cat.spam(false)
839  << ": next epoch (pickup mode)\n";
840  }
841 
842  _pickup_mode = true;
843  _active.swap(_this_active);
844 
845  } else {
846  // Not in pickup mode.
847 
848  if (task_cat.is_spam()) {
849  do_output(task_cat.spam());
850  task_cat.spam(false)
851  << ": next epoch\n";
852  }
853 
854  _pickup_mode = false;
855 
856  // Here, there's no difference between _this_active and _next_active.
857  // Combine them.
858  _next_active.insert(_next_active.end(), _this_active.begin(), _this_active.end());
859  _this_active.clear();
860 
861  _active.swap(_next_active);
862 
863  // We only tick the clock and wake sleepers in normal mode, the first time
864  // through the task list; not in pickup mode when we are re-running the
865  // stragglers just to use up our frame budget.
866 
867  if (_tick_clock) {
868  if (task_cat.is_spam()) {
869  do_output(task_cat.spam());
870  task_cat.spam(false)
871  << ": tick clock\n";
872  }
873  _manager->_clock->tick();
874  _manager->_frame_cvar.notify_all();
875 
876  } else if (_frame_sync) {
877  // If we're a synced chain, we have to wait at the end of the epoch for
878  // someone else to tick the clock.
879  _block_till_next_frame = true;
880  }
881 
882  // Check for any sleeping tasks that need to be woken.
883  double now = _manager->_clock->get_frame_time();
884  while (!_sleeping.empty() && _sleeping.front()->_wake_time <= now) {
885  PT(AsyncTask) task = _sleeping.front();
886  if (task_cat.is_spam()) {
887  task_cat.spam()
888  << "Waking " << *task << ", wake time at "
889  << task->_wake_time - now << "\n";
890  }
891  pop_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
892  _sleeping.pop_back();
893  task->_state = AsyncTask::S_active;
894  task->_start_frame = _manager->_clock->get_frame_count();
895  _active.push_back(task);
896  }
897 
898  if (task_cat.is_spam()) {
899  if (_sleeping.empty()) {
900  task_cat.spam()
901  << "No more tasks on sleeping queue.\n";
902  } else {
903  task_cat.spam()
904  << "Next sleeper: " << *_sleeping.front() << ", wake time at "
905  << _sleeping.front()->_wake_time - now << "\n";
906  }
907  }
908 
909  // Any tasks that are on the active queue at the beginning of the epoch
910  // are deemed to have run one frame (or to be about to).
911  TaskHeap::const_iterator ti;
912  for (ti = _active.begin(); ti != _active.end(); ++ti) {
913  AsyncTask *task = (*ti);
914  ++task->_num_frames;
915  }
916  }
917 
918  if (_timeslice_priority) {
919  filter_timeslice_priority();
920  }
921 
922  nassertr((size_t)_num_tasks == _active.size() + _this_active.size() + _next_active.size() + _sleeping.size() + (size_t)_num_awaiting_tasks, true);
923  make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
924 
925  _current_sort = -INT_MAX;
926 
927  if (!_active.empty()) {
928  // Signal the threads to start executing the first task again.
929  _cvar.notify_all();
930  return true;
931  }
932 
933  // There are no tasks to be had anywhere. Chill.
934  _pickup_mode = false;
935  nassertr(_this_active.empty(), false);
936  return false;
937 }
938 
939 /**
940  * Called to filter the _active tasks list when we are in the special
941  * timeslice_priority mode. In this mode, go through and postpone any tasks
942  * that have already exceeded their priority budget for this epoch.
943  *
944  * Assumes the lock is already held.
945  */
946 void AsyncTaskChain::
947 filter_timeslice_priority() {
948  if (_active.empty()) {
949  return;
950  }
951  nassertv(_timeslice_priority);
952 
953  // We must first sum up the average per-epoch runtime of each task.
954  double net_runtime = 0.0;
955  int net_priority = 0;
956 
957  TaskHeap::iterator ti;
958  for (ti = _active.begin(); ti != _active.end(); ++ti) {
959  AsyncTask *task = (*ti);
960  double runtime = max(task->get_average_dt(), 0.0);
961  int priority = max(task->_priority, 1);
962  net_runtime += runtime;
963  net_priority += priority;
964  }
965 
966  // That gives us a timeslice budget per priority value.
967  double average_budget = net_runtime / (double)net_priority;
968 
969  TaskHeap keep, postpone;
970  for (ti = _active.begin(); ti != _active.end(); ++ti) {
971  AsyncTask *task = (*ti);
972  double runtime = max(task->get_average_dt(), 0.0);
973  int priority = max(task->_priority, 1);
974  double consumed = runtime / (double)priority;
975  // cerr << *task << " consumed " << consumed << " vs. " << average_budget
976  // << "\n";
977  if (consumed > average_budget) {
978  // Postpone. Run this task next epoch.
979  postpone.push_back(task);
980  } else {
981  // Keep, and run this task this epoch.
982  keep.push_back(task);
983  }
984  }
985 
986  if (keep.empty()) {
987  // Hmm, nothing to keep. Grab the postponed task with the highest
988  // priority and keep that instead.
989  nassertv(!postpone.empty());
990  ti = postpone.begin();
991  TaskHeap::iterator max_ti = ti;
992  ++ti;
993  while (ti != postpone.end()) {
994  if ((*ti)->_priority > (*max_ti)->_priority) {
995  max_ti = ti;
996  }
997  }
998 
999  // cerr << "Nothing to keep, keeping " << *(*max_ti) << " instead\n";
1000 
1001  keep.push_back(*max_ti);
1002  postpone.erase(max_ti);
1003  }
1004 
1005  _active.swap(keep);
1006  if (_pickup_mode) {
1007  _this_active.insert(_this_active.end(), postpone.begin(), postpone.end());
1008  } else {
1009  _next_active.insert(_next_active.end(), postpone.begin(), postpone.end());
1010  }
1011 
1012  nassertv(!_active.empty());
1013 }
1014 
1015 /**
1016  * The private implementation of stop_threads; assumes the lock is already
1017  * held.
1018  */
1019 void AsyncTaskChain::
1020 do_stop_threads() {
1021  if (_state == S_started || _state == S_interrupted) {
1022  if (task_cat.is_debug() && !_threads.empty()) {
1023  task_cat.debug()
1024  << "Stopping " << _threads.size()
1025  << " threads for " << _manager->get_name()
1026  << " chain " << get_name()
1027  << " in " << *Thread::get_current_thread() << "\n";
1028  }
1029 
1030  _state = S_shutdown;
1031  _cvar.notify_all();
1032  _manager->_frame_cvar.notify_all();
1033 
1034  Threads wait_threads;
1035  wait_threads.swap(_threads);
1036 
1037  // We have to release the lock while we join, so the threads can wake up
1038  // and see that we're shutting down.
1039  _manager->_lock.unlock();
1040  Threads::iterator ti;
1041  for (ti = wait_threads.begin(); ti != wait_threads.end(); ++ti) {
1042  if (task_cat.is_debug()) {
1043  task_cat.debug()
1044  << "Waiting for " << *(*ti) << " in "
1045  << *Thread::get_current_thread() << "\n";
1046  }
1047  (*ti)->join();
1048  if (task_cat.is_spam()) {
1049  task_cat.spam()
1050  << "Done waiting for " << *(*ti) << " in "
1051  << *Thread::get_current_thread() << "\n";
1052  }
1053  }
1054  _manager->_lock.lock();
1055 
1056  _state = S_initial;
1057 
1058  // There might be one busy "thread" still: the main thread.
1059  nassertv(_num_busy_threads == 0 || _num_busy_threads == 1);
1060  cleanup_pickup_mode();
1061  }
1062 }
1063 
1064 /**
1065  * The private implementation of start_threads; assumes the lock is already
1066  * held.
1067  */
1068 void AsyncTaskChain::
1069 do_start_threads() {
1070  if (_state == S_interrupted) {
1071  do_stop_threads();
1072  }
1073 
1074  if (_state == S_initial) {
1075  _state = S_started;
1076  if (Thread::is_threading_supported() && _num_threads > 0) {
1077  if (task_cat.is_debug()) {
1078  task_cat.debug()
1079  << "Starting " << _num_threads << " threads for "
1080  << _manager->get_name() << " chain " << get_name() << "\n";
1081  }
1082  _needs_cleanup = true;
1083  _threads.reserve(_num_threads);
1084  for (int i = 0; i < _num_threads; ++i) {
1085  ostringstream strm;
1086  strm << _manager->get_name() << "_" << get_name() << "_" << i;
1087  PT(AsyncTaskChainThread) thread = new AsyncTaskChainThread(strm.str(), this);
1088  if (thread->start(_thread_priority, true)) {
1089  _threads.push_back(thread);
1090  }
1091  }
1092  }
1093  }
1094 }
1095 
1096 /**
1097  * Returns the set of tasks that are active (and not sleeping) on the task
1098  * chain, at the time of the call. Assumes the lock is held.
1099  */
1100 AsyncTaskCollection AsyncTaskChain::
1101 do_get_active_tasks() const {
1102  AsyncTaskCollection result;
1103 
1104  Threads::const_iterator thi;
1105  for (thi = _threads.begin(); thi != _threads.end(); ++thi) {
1106  AsyncTask *task = (*thi)->_servicing;
1107  if (task != nullptr) {
1108  result.add_task(task);
1109  }
1110  }
1111  TaskHeap::const_iterator ti;
1112  for (ti = _active.begin(); ti != _active.end(); ++ti) {
1113  AsyncTask *task = (*ti);
1114  result.add_task(task);
1115  }
1116  for (ti = _this_active.begin(); ti != _this_active.end(); ++ti) {
1117  AsyncTask *task = (*ti);
1118  result.add_task(task);
1119  }
1120  for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) {
1121  AsyncTask *task = (*ti);
1122  result.add_task(task);
1123  }
1124 
1125  return result;
1126 }
1127 
1128 /**
1129  * Returns the set of tasks that are sleeping (and not active) on the task
1130  * chain, at the time of the call. Assumes the lock is held.
1131  */
1132 AsyncTaskCollection AsyncTaskChain::
1133 do_get_sleeping_tasks() const {
1134  AsyncTaskCollection result;
1135 
1136  TaskHeap::const_iterator ti;
1137  for (ti = _sleeping.begin(); ti != _sleeping.end(); ++ti) {
1138  AsyncTask *task = (*ti);
1139  result.add_task(task);
1140  }
1141 
1142  return result;
1143 }
1144 
/**
 * The private implementation of poll(), this assumes the lock is already
 * held.
 *
 * Services tasks in the calling thread, one sort group at a time, when the
 * chain has no worker threads.  Returns when the active queue is exhausted,
 * the frame budget is spent, or worker threads take over.
 */
void AsyncTaskChain::
do_poll() {
  thread_consider_yield();
  if (_num_tasks == 0) {
    // Nothing queued at all; nothing to do.
    return;
  }

  do_start_threads();

  // If threads exist, they service the tasks; poll() has nothing to do.
  if (!_threads.empty()) {
    return;
  }

  if (_num_busy_threads != 0) {
    // We are recursively nested within another task. Return, with a warning.
    task_cat.warning()
      << "Ignoring recursive poll() within another task.\n";
    return;
  }

  nassertv(!_pickup_mode);

  do {
    while (!_active.empty()) {
      if (_state == S_shutdown || _state == S_interrupted) {
        return;
      }
      // Reset the per-frame time accounting when the clock ticks over.
      int frame = _manager->_clock->get_frame_count();
      if (_current_frame != frame) {
        _current_frame = frame;
        _time_in_frame = 0.0;
        _block_till_next_frame = false;
      }
      if (_block_till_next_frame ||
          (_frame_budget >= 0.0 && _time_in_frame >= _frame_budget)) {
        // If we've exceeded our budget, stop here. We'll resume from this
        // point at the next call to poll().
        cleanup_pickup_mode();
        return;
      }

      // _active is a heap ordered by sort; the front holds the lowest sort.
      _current_sort = _active.front()->get_sort();

      // Normally, there won't be any threads running at the same time we're
      // in poll().  But it's possible, if someone calls set_num_threads()
      // while we're processing.
      _num_busy_threads++;
      service_one_task(nullptr);
      _num_busy_threads--;
      _cvar.notify_all();

      if (!_threads.empty()) {
        // Threads were started while we were servicing; hand off to them.
        return;
      }
    }

    finish_sort_group();
    // finish_sort_group() may have put the chain into pickup mode; if so,
    // keep servicing tasks picked up mid-frame until it clears.
  } while (_pickup_mode);
}
1208 
1209 /**
1210  * Clean up the damage from setting pickup mode. This means we restore the
1211  * _active and _next_active lists as they should have been without pickup
1212  * mode, for next frame. Assumes the lock is held.
1213  */
1214 void AsyncTaskChain::
1215 cleanup_pickup_mode() {
1216  if (_pickup_mode) {
1217  _pickup_mode = false;
1218 
1219  // Move everything to the _next_active queue.
1220  _next_active.insert(_next_active.end(), _this_active.begin(), _this_active.end());
1221  _this_active.clear();
1222  _next_active.insert(_next_active.end(), _active.begin(), _active.end());
1223  _active.clear();
1224 
1225  // Now finish the epoch properly.
1226  finish_sort_group();
1227  }
1228 }
1229 
1230 /**
1231  * The private implementation of output(), this assumes the lock is already
1232  * held.
1233  */
1234 void AsyncTaskChain::
1235 do_output(ostream &out) const {
1236  if (_manager != nullptr) {
1237  out << _manager->get_type() << " " << _manager->get_name();
1238  } else {
1239  out << "(no manager)";
1240  }
1241  out << " task chain " << get_name()
1242  << "; " << _num_tasks << " tasks";
1243 }
1244 
1245 /**
1246  * The private implementation of write(), this assumes the lock is already
1247  * held.
1248  */
1249 void AsyncTaskChain::
1250 do_write(ostream &out, int indent_level) const {
1251  indent(out, indent_level)
1252  << "Task chain \"" << get_name() << "\"\n";
1253  if (_num_threads > 0) {
1254  indent(out, indent_level + 2)
1255  << _num_threads << " threads, priority " << _thread_priority << "\n";
1256  }
1257  if (_frame_budget >= 0.0) {
1258  indent(out, indent_level + 2)
1259  << "frame budget " << _frame_budget << " s\n";
1260  }
1261  if (_timeslice_priority) {
1262  indent(out, indent_level + 2)
1263  << "timeslice priority\n";
1264  }
1265  if (_tick_clock) {
1266  indent(out, indent_level + 2)
1267  << "tick clock\n";
1268  }
1269 
1270  static const size_t buffer_size = 1024;
1271  char buffer[buffer_size];
1272  sprintf(buffer, " %-32s %8s %8s %8s %8s %6s",
1273  "Task",
1274  "sleep(s)",
1275  "dt(ms)", "avg", "max",
1276  "sort");
1277  nassertv(strlen(buffer) < buffer_size);
1278 
1279  indent(out, indent_level)
1280  << buffer << "\n";
1281 
1282  indent(out, indent_level);
1283  for (int i = 0; i < 32+8+8+8+8+6+7; ++i) {
1284  out << '-';
1285  }
1286  out << "\n";
1287 
1288  // Collect a list of all active tasks, then sort them into order for output.
1289  TaskHeap tasks = _active;
1290  tasks.insert(tasks.end(), _this_active.begin(), _this_active.end());
1291  tasks.insert(tasks.end(), _next_active.begin(), _next_active.end());
1292 
1293  Threads::const_iterator thi;
1294  for (thi = _threads.begin(); thi != _threads.end(); ++thi) {
1295  AsyncTask *task = (*thi)->_servicing;
1296  if (task != nullptr) {
1297  tasks.push_back(task);
1298  }
1299  }
1300 
1301  double now = _manager->_clock->get_frame_time();
1302 
1303  if (!tasks.empty()) {
1304  sort(tasks.begin(), tasks.end(), AsyncTaskSortPriority());
1305 
1306  // Since AsyncTaskSortPriority() sorts backwards (because of STL's
1307  // push_heap semantics), we go through the task list in reverse order to
1308  // print them forwards.
1309  TaskHeap::reverse_iterator ti;
1310  for (ti = tasks.rbegin(); ti != tasks.rend(); ++ti) {
1311  AsyncTask *task = (*ti);
1312  write_task_line(out, indent_level, task, now);
1313  }
1314  }
1315 
1316  // Instead of iterating through the _sleeping list in heap order, copy it
1317  // and then use repeated pops to get it out in sorted order, for the user's
1318  // satisfaction.
1319  TaskHeap sleeping = _sleeping;
1320  while (!sleeping.empty()) {
1321  PT(AsyncTask) task = sleeping.front();
1322  pop_heap(sleeping.begin(), sleeping.end(), AsyncTaskSortWakeTime());
1323  sleeping.pop_back();
1324 
1325  write_task_line(out, indent_level, task, now);
1326  }
1327 }
1328 
1329 /**
1330  * Writes a single line for a task in the report generated by do_write().
1331  * Assumes the lock is already held.
1332  */
1333 void AsyncTaskChain::
1334 write_task_line(ostream &out, int indent_level, AsyncTask *task, double now) const {
1335  char servicing_flag = ' ';
1336  if (task->_state == AsyncTask::S_servicing) {
1337  servicing_flag = '*';
1338  } else if (task->_state == AsyncTask::S_servicing_removed) {
1339  servicing_flag = '-';
1340  }
1341 
1342  static const size_t buffer_size = 1024;
1343  char buffer[buffer_size];
1344 
1345  if (task->_state == AsyncTask::S_sleeping) {
1346  // For sleeping tasks, include the wake time, as an elapsed time in
1347  // seconds.
1348  string name = task->get_name().substr(0, 32);
1349  sprintf(buffer, "%c%-32s %8.1f",
1350  servicing_flag, name.c_str(),
1351  task->_wake_time - now);
1352  } else {
1353  // For active tasks, don't include a wake time. This means we have more
1354  // space for the name.
1355  string name = task->get_name().substr(0, 41);
1356  sprintf(buffer, "%c%-41s",
1357  servicing_flag, name.c_str());
1358  }
1359  nassertv(strlen(buffer) < buffer_size);
1360 
1361  indent(out, indent_level)
1362  << buffer;
1363 
1364  if (task->_num_frames > 0) {
1365  sprintf(buffer, " %8.1f %8.1f %8.1f %6d",
1366  task->_dt * 1000.0, task->get_average_dt() * 1000.0,
1367  task->_max_dt * 1000.0,
1368  task->_sort);
1369  } else {
1370  // No statistics for a task that hasn't run yet.
1371  sprintf(buffer, " %8s %8s %8s %6d",
1372  "", "", "",
1373  task->_sort);
1374  }
1375 
1376  nassertv(strlen(buffer) < buffer_size);
1377  out << buffer << "\n";
1378 }
1379 
/**
 * Constructs a worker thread dedicated to servicing tasks on the given
 * chain.  The thread is not started here; the caller invokes start()
 * separately (see AsyncTaskChain::do_start_threads()).
 */
AsyncTaskChain::AsyncTaskChainThread::
AsyncTaskChainThread(const string &name, AsyncTaskChain *chain) :
  Thread(name, chain->get_name()),
  _chain(chain),
  _servicing(nullptr)  // no task is being serviced by this thread yet
{
}
1390 
/**
 * The main loop of a task-servicing worker thread.  Repeatedly services
 * tasks at the chain's current sort value until the chain is shut down or
 * interrupted.  Runs with the manager's lock held; the condition-variable
 * waits release it while blocked.
 */
void AsyncTaskChain::AsyncTaskChainThread::
thread_main() {
  MutexHolder holder(_chain->_manager->_lock);
  while (_chain->_state != S_shutdown && _chain->_state != S_interrupted) {
    thread_consider_yield();
    if (!_chain->_active.empty() &&
        _chain->_active.front()->get_sort() == _chain->_current_sort) {

      // A task is available at the current sort level.  First reset the
      // per-frame time accounting if the clock has ticked over.
      int frame = _chain->_manager->_clock->get_frame_count();
      if (_chain->_current_frame != frame) {
        _chain->_current_frame = frame;
        _chain->_time_in_frame = 0.0;
        _chain->_block_till_next_frame = false;
      }

      // If we've exceeded our frame budget, sleep until the next frame.
      if (_chain->_block_till_next_frame ||
          (_chain->_frame_budget >= 0.0 && _chain->_time_in_frame >= _chain->_frame_budget)) {
        while ((_chain->_block_till_next_frame ||
                (_chain->_frame_budget >= 0.0 && _chain->_time_in_frame >= _chain->_frame_budget)) &&
               _chain->_state != S_shutdown && _chain->_state != S_interrupted) {
          _chain->cleanup_pickup_mode();
          // _frame_cvar is also notified on shutdown (see do_stop_threads),
          // so this wait wakes promptly when the chain is stopped.
          _chain->_manager->_frame_cvar.wait();
          frame = _chain->_manager->_clock->get_frame_count();
          if (_chain->_current_frame != frame) {
            _chain->_current_frame = frame;
            _chain->_time_in_frame = 0.0;
            _chain->_block_till_next_frame = false;
          }
        }
        // Now that it's the next frame, go back to the top of the loop.
        continue;
      }

      PStatTimer timer(_task_pcollector);
      _chain->_num_busy_threads++;
      _chain->service_one_task(this);
      _chain->_num_busy_threads--;
      // Wake any threads waiting for this sort group to drain.
      _chain->_cvar.notify_all();

    } else {
      // We've finished all the available tasks of the current sort value. We
      // can't pick up a new task until all of the threads finish the tasks
      // with the same sort value.
      if (_chain->_num_busy_threads == 0) {
        // We're the last thread to finish. Update _current_sort.
        if (!_chain->finish_sort_group()) {
          // Nothing to do. Wait for more tasks to be added.
          if (_chain->_sleeping.empty()) {
            PStatTimer timer(_wait_pcollector);
            _chain->_cvar.wait();
          } else {
            // Wait no longer than the time until the next sleeping task is
            // due to wake up (clamped so the timeout is never negative).
            double wake_time = _chain->do_get_next_wake_time();
            double now = _chain->_manager->_clock->get_frame_time();
            double timeout = max(wake_time - now, 0.0);
            PStatTimer timer(_wait_pcollector);
            _chain->_cvar.wait(timeout);
          }
        }

      } else {
        // Wait for the other threads to finish their current task before we
        // continue.
        PStatTimer timer(_wait_pcollector);
        _chain->_cvar.wait();
      }
    }
  }
}
bool get_timeslice_priority() const
Returns the timeslice_priority flag.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
AsyncTaskCollection get_tasks() const
Returns the set of tasks that are active or sleeping on the task chain, at the time of the call...
void wait()
Waits on the condition.
void notify_done(bool clean_exit)
Schedules the done callbacks.
A class to manage a loose queue of isolated tasks, which can be performed either synchronously (in th...
void set_frame_sync(bool frame_sync)
Sets the frame_sync flag.
bool get_tick_clock() const
Returns the tick_clock flag.
double get_frame_budget() const
Returns the maximum amount of time per frame the tasks on this chain are granted for execution...
void add_task(AsyncTask *task)
Adds a new AsyncTask to the collection.
A list of tasks, for instance as returned by some of the AsyncTaskManager query functions.
bool has_delay() const
Returns true if a delay has been set for this task via set_delay(), or false otherwise.
Definition: asyncTask.I:93
void start_threads()
Starts any requested threads to service the tasks on the queue.
get_sort
Returns the task's current sort value.
Definition: asyncTask.h:115
A lightweight class that can be used to automatically start and stop a PStatCollector around a sectio...
Definition: pStatTimer.h:30
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
A lightweight C++ object whose constructor calls acquire() and whose destructor calls release() on a ...
Definition: mutexHolder.h:25
void unlock()
Alias for release() to match C++11 semantics.
Definition: mutexDirect.I:39
void wait_for_tasks()
Blocks until the task list is empty.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
void set_num_threads(int num_threads)
Changes the number of threads for this task chain.
void notify_all()
Informs all of the other threads who are currently blocked on wait() that the relevant condition has ...
A lightweight class that represents a single element that may be timed and/or counted via stats...
void set_frame_budget(double frame_budget)
Sets the maximum amount of time per frame the tasks on this chain are granted for execution...
get_current_thread
Returns a pointer to the currently-executing Thread object.
Definition: thread.h:109
A base class for all things which can have a name.
Definition: namable.h:26
bool get_frame_sync() const
Returns the frame_sync flag.
std::ostream & indent(std::ostream &out, int indent_level)
A handy function for doing text formatting.
Definition: indent.cxx:20
int get_num_running_threads() const
Returns the number of threads that have been created and are actively running.
void add_tasks_from(const AsyncTaskCollection &other)
Adds all the AsyncTasks indicated in the other collection to this task.
ThreadPriority get_thread_priority() const
Returns the priority associated with threads that serve this task chain.
int get_num_tasks() const
Returns the number of tasks that are currently active or sleeping within the task chain...
The AsyncTaskChain is a subset of the AsyncTaskManager.
AsyncTaskCollection get_active_tasks() const
Returns the set of tasks that are active (and not sleeping) on the task chain, at the time of the cal...
int get_num_threads() const
Returns the number of threads that will be servicing tasks for this chain.
get_average_dt
Returns the average amount of time elapsed during each of the task's previous run cycles...
Definition: asyncTask.h:121
void poll()
Runs through all the tasks in the task list, once, if the task chain is running in single-threaded mo...
This class represents a concrete task performed by an AsyncManager.
Definition: asyncTask.h:32
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
A thread; that is, a lightweight process.
Definition: thread.h:46
void set_tick_clock(bool tick_clock)
Sets the tick_clock flag.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
void set_timeslice_priority(bool timeslice_priority)
Sets the timeslice_priority flag.
void stop_threads()
Stops any threads that are currently running.
TypeHandle is the identifier used to differentiate C++ class types.
Definition: typeHandle.h:81
void set_thread_priority(ThreadPriority priority)
Changes the priority associated with threads that serve this task chain.
double get_next_wake_time() const
Returns the scheduled time (on the manager&#39;s clock) of the next sleeping task, on any task chain...
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
void lock()
Alias for acquire() to match C++11 semantics.
Definition: mutexDirect.I:19
bool has_task(AsyncTask *task) const
Returns true if the indicated task has been added to this AsyncTaskChain, false otherwise.
double get_delay() const
Returns the delay value that has been set via set_delay, if any.
Definition: asyncTask.I:101
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
AsyncTaskCollection get_sleeping_tasks() const
Returns the set of tasks that are sleeping (and not active) on the task chain, at the time of the cal...