Panda3D
asyncTaskChain.cxx
Go to the documentation of this file.
1 /**
2  * PANDA 3D SOFTWARE
3  * Copyright (c) Carnegie Mellon University. All rights reserved.
4  *
5  * All use of this software is subject to the terms of the revised BSD
6  * license. You should have received a copy of this license along
7  * with this source code in a file named "LICENSE."
8  *
9  * @file asyncTaskChain.cxx
10  * @author drose
11  * @date 2006-08-23
12  */
13 
14 #include "asyncTaskChain.h"
15 #include "asyncTaskManager.h"
16 #include "event.h"
17 #include "mutexHolder.h"
18 #include "indent.h"
19 #include "pStatClient.h"
20 #include "pStatTimer.h"
21 #include "clockObject.h"
22 #include "config_event.h"
23 #include <algorithm>
24 #include <stdio.h> // For sprintf/snprintf
25 
26 using std::max;
27 using std::ostream;
28 using std::ostringstream;
29 using std::string;
30 
31 TypeHandle AsyncTaskChain::_type_handle;
32 
33 PStatCollector AsyncTaskChain::_task_pcollector("Task");
34 PStatCollector AsyncTaskChain::_wait_pcollector("Wait");
35 
36 /**
37  *
38  */
39 AsyncTaskChain::
40 AsyncTaskChain(AsyncTaskManager *manager, const string &name) :
41  Namable(name),
42  _manager(manager),
43  _cvar(manager->_lock),
44  _tick_clock(false),
45  _timeslice_priority(false),
46  _num_threads(0),
47  _thread_priority(TP_normal),
48  _frame_budget(-1.0),
49  _frame_sync(false),
50  _num_busy_threads(0),
51  _num_tasks(0),
52  _num_awaiting_tasks(0),
53  _state(S_initial),
54  _current_sort(-INT_MAX),
55  _pickup_mode(false),
56  _needs_cleanup(false),
57  _current_frame(0),
58  _time_in_frame(0.0),
59  _block_till_next_frame(false),
60  _next_implicit_sort(0)
61 {
62 }
63 
64 /**
65  *
66  */
67 AsyncTaskChain::
68 ~AsyncTaskChain() {
69  // We only grab the lock if _needs_cleanup is true. This way, the temporary
70  // AsyncTaskChain objects created (and destructed) within the task manager
71  // won't risk a double-lock.
72  if (_needs_cleanup) {
73  MutexHolder holder(_manager->_lock);
74  do_cleanup();
75  }
76 }
77 
78 /**
79  * Sets the tick_clock flag. When this is true, get_clock()->tick() will be
80  * called automatically at each task epoch. This is false by default.
81  */
83 set_tick_clock(bool tick_clock) {
84  MutexHolder holder(_manager->_lock);
85  _tick_clock = tick_clock;
86 }
87 
88 /**
89  * Returns the tick_clock flag. See set_tick_clock().
90  */
92 get_tick_clock() const {
93  MutexHolder holder(_manager->_lock);
94  return _tick_clock;
95 }
96 
97 /**
98  * Changes the number of threads for this task chain. This may require
99  * stopping the threads if they are already running.
100  */
101 void AsyncTaskChain::
102 set_num_threads(int num_threads) {
103  nassertv(num_threads >= 0);
104 
105  if (task_cat.is_debug()) {
106  do_output(task_cat.debug());
107  task_cat.debug(false)
108  << ": set_num_threads(" << num_threads << ")\n";
109  }
110 
111  if (!Thread::is_threading_supported()) {
112  num_threads = 0;
113  }
114 
115  MutexHolder holder(_manager->_lock);
116  if (_num_threads != num_threads) {
117  do_stop_threads();
118  _num_threads = num_threads;
119 
120  if (_num_tasks != 0) {
121  do_start_threads();
122  }
123  }
124 }
125 
126 /**
127  * Returns the number of threads that will be servicing tasks for this chain.
128  * Also see get_num_running_threads().
129  */
132  MutexHolder holder(_manager->_lock);
133  return _num_threads;
134 }
135 
136 /**
137  * Returns the number of threads that have been created and are actively
138  * running. This will return 0 before the threads have been started; it will
139  * also return 0 if thread support is not available.
140  */
143  MutexHolder holder(_manager->_lock);
144  return _threads.size();
145 }
146 
147 /**
148  * Changes the priority associated with threads that serve this task chain.
149  * This may require stopping the threads if they are already running.
150  */
151 void AsyncTaskChain::
152 set_thread_priority(ThreadPriority priority) {
153  MutexHolder holder(_manager->_lock);
154  if (_thread_priority != priority) {
155  do_stop_threads();
156  _thread_priority = priority;
157 
158  if (_num_tasks != 0) {
159  do_start_threads();
160  }
161  }
162 }
163 
164 /**
165  * Returns the priority associated with threads that serve this task chain.
166  */
167 ThreadPriority AsyncTaskChain::
169  MutexHolder holder(_manager->_lock);
170  return _thread_priority;
171 }
172 
173 /**
174  * Sets the maximum amount of time per frame the tasks on this chain are
175  * granted for execution. If this is less than zero, there is no limit; if it
176  * is >= 0, it represents a maximum amount of time (in seconds) that will be
177  * used to execute tasks. If this time is exceeded in any one frame, the task
178  * chain will stop executing tasks until the next frame, as defined by the
179  * TaskManager's clock.
180  */
181 void AsyncTaskChain::
182 set_frame_budget(double frame_budget) {
183  MutexHolder holder(_manager->_lock);
184  _frame_budget = frame_budget;
185 }
186 
187 /**
188  * Returns the maximum amount of time per frame the tasks on this chain are
189  * granted for execution. See set_frame_budget().
190  */
191 double AsyncTaskChain::
193  MutexHolder holder(_manager->_lock);
194  return _frame_budget;
195 }
196 
197 /**
198  * Sets the frame_sync flag. When this flag is true, this task chain will be
199  * forced to sync with the TaskManager's clock. It will run no faster than
200  * one epoch per clock frame.
201  *
202  * When this flag is false, the default, the task chain will finish all of its
203  * tasks and then immediately start from the first task again, regardless of
204  * the clock frame. When it is true, the task chain will finish all of its
205  * tasks and then wait for the clock to tick to the next frame before resuming
206  * the first task.
207  *
208  * This only makes sense for threaded task chains. Non-threaded task chains
209  * are automatically synchronous.
210  */
211 void AsyncTaskChain::
212 set_frame_sync(bool frame_sync) {
213  MutexHolder holder(_manager->_lock);
214  _frame_sync = frame_sync;
215 }
216 
217 /**
218  * Returns the frame_sync flag. See set_frame_sync().
219  */
220 bool AsyncTaskChain::
221 get_frame_sync() const {
222  MutexHolder holder(_manager->_lock);
223  return _frame_sync;
224 }
225 
226 /**
227  * Sets the timeslice_priority flag. This changes the interpretation of
228  * priority, and the number of times per epoch each task will run.
229  *
230  * When this flag is true, some tasks might not run in any given epoch.
231  * Instead, tasks with priority higher than 1 will be given precedence, in
232  * proportion to the amount of time they have already used. This gives
233  * higher-priority tasks more runtime than lower-priority tasks. Each task
234  * gets the amount of time proportional to its priority value, so a task with
235  * priority 100 will get five times as much processing time as a task with
236  * priority 20. For these purposes, priority values less than 1 are deemed to
237  * be equal to 1.
238  *
239  * When this flag is false (the default), all tasks are run exactly once each
240  * epoch, round-robin style. Priority is only used to determine which task
241  * runs first within tasks of the same sort value.
242  */
243 void AsyncTaskChain::
244 set_timeslice_priority(bool timeslice_priority) {
245  MutexHolder holder(_manager->_lock);
246  _timeslice_priority = timeslice_priority;
247 }
248 
249 /**
250  * Returns the timeslice_priority flag. This changes the interpretation of
251  * priority, and the number of times per epoch each task will run. See
252  * set_timeslice_priority().
253  */
254 bool AsyncTaskChain::
256  MutexHolder holder(_manager->_lock);
257  return _timeslice_priority;
258 }
259 
260 /**
261  * Stops any threads that are currently running. If any tasks are still
262  * pending and have not yet been picked up by a thread, they will not be
263  * serviced unless poll() or start_threads() is later called.
264  */
265 void AsyncTaskChain::
267  if (_state == S_started || _state == S_interrupted) {
268  // Clean up all of the threads.
269  MutexHolder holder(_manager->_lock);
270  do_stop_threads();
271  }
272 }
273 
274 /**
275  * Starts any requested threads to service the tasks on the queue. This is
276  * normally not necessary, since adding a task will start the threads
277  * automatically.
278  */
279 void AsyncTaskChain::
281  if (_state == S_initial || _state == S_interrupted) {
282  MutexHolder holder(_manager->_lock);
283  do_start_threads();
284  }
285 }
286 
287 /**
288  * Returns true if the indicated task has been added to this AsyncTaskChain,
289  * false otherwise.
290  */
291 bool AsyncTaskChain::
292 has_task(AsyncTask *task) const {
293  MutexHolder holder(_manager->_lock);
294 
295  if (task->_chain != this) {
296  nassertr(!do_has_task(task), false);
297  return false;
298  }
299 
300  if (task->_state == AsyncTask::S_servicing_removed) {
301  return false;
302  }
303 
304  // The task might not actually be in the active queue, since it might be
305  // being serviced right now. That's OK.
306  return true;
307 }
308 
309 /**
310  * Blocks until the task list is empty.
311  */
312 void AsyncTaskChain::
314  MutexHolder holder(_manager->_lock);
315  do_wait_for_tasks();
316 }
317 
318 /**
319  * Returns the number of tasks that are currently active or sleeping within
320  * the task chain.
321  */
323 get_num_tasks() const {
324  MutexHolder holder(_manager->_lock);
325  return _num_tasks;
326 }
327 
328 /**
329  * Returns the set of tasks that are active or sleeping on the task chain, at
330  * the time of the call.
331  */
333 get_tasks() const {
334  MutexHolder holder(_manager->_lock);
335  AsyncTaskCollection result = do_get_active_tasks();
336  result.add_tasks_from(do_get_sleeping_tasks());
337  return result;
338 }
339 
340 /**
341  * Returns the set of tasks that are active (and not sleeping) on the task
342  * chain, at the time of the call.
343  */
346  MutexHolder holder(_manager->_lock);
347  return do_get_active_tasks();
348 }
349 
350 /**
351  * Returns the set of tasks that are sleeping (and not active) on the task
352  * chain, at the time of the call.
353  */
356  MutexHolder holder(_manager->_lock);
357  return do_get_sleeping_tasks();
358 }
359 
360 /**
361  * Runs through all the tasks in the task list, once, if the task chain is
362  * running in single-threaded mode (no threads available). This method does
363  * nothing in threaded mode, so it may safely be called in either case.
364  *
365  * Normally, you would not call this function directly; instead, call
366  * AsyncTaskManager::poll(), which polls all of the task chains in sequence.
367  */
368 void AsyncTaskChain::
369 poll() {
370  MutexHolder holder(_manager->_lock);
371  do_poll();
372 }
373 
374 /**
375  * Returns the scheduled time (on the manager's clock) of the next sleeping
376  * task, on any task chain, to awaken. Returns -1 if there are no sleeping
377  * tasks.
378  */
379 double AsyncTaskChain::
381  MutexHolder holder(_manager->_lock);
382  return do_get_next_wake_time();
383 }
384 
385 /**
386  *
387  */
388 void AsyncTaskChain::
389 output(ostream &out) const {
390  MutexHolder holder(_manager->_lock);
391  do_output(out);
392 }
393 
394 /**
395  *
396  */
397 void AsyncTaskChain::
398 write(ostream &out, int indent_level) const {
399  MutexHolder holder(_manager->_lock);
400  do_write(out, indent_level);
401 }
402 
403 /**
404  * Adds the indicated task to the active queue. It is an error if the task is
405  * already added to this or any other active queue.
406  *
407  * This is normally called only by the AsyncTaskManager. Assumes the lock is
408  * already held.
409  */
410 void AsyncTaskChain::
411 do_add(AsyncTask *task) {
412  nassertv(task->_chain == nullptr &&
413  task->_manager == nullptr &&
414  task->_chain_name == get_name() &&
415  task->_state == AsyncTask::S_inactive);
416  nassertv(!do_has_task(task));
417 
418  do_start_threads();
419 
420  task->_chain = this;
421  task->_manager = _manager;
422 
423  double now = _manager->_clock->get_frame_time();
424  task->_start_time = now;
425  task->_start_frame = _manager->_clock->get_frame_count();
426 
427  // Remember the order in which tasks were added to the chain.
428  task->_implicit_sort = _next_implicit_sort++;
429 
430  _manager->add_task_by_name(task);
431 
432  if (task->has_delay()) {
433  // This is a deferred task. Add it to the sleeping queue.
434  task->_wake_time = now + task->get_delay();
435  task->_start_time = task->_wake_time;
436  task->_state = AsyncTask::S_sleeping;
437  _sleeping.push_back(task);
438  push_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
439 
440  } else {
441  // This is an active task. Add it to the active set.
442  task->_state = AsyncTask::S_active;
443  if (task_cat.is_spam()) {
444  task_cat.spam()
445  << "Adding " << *task << " with sort " << task->get_sort()
446  << " to chain " << get_name() << " with current_sort "
447  << _current_sort << "\n";
448  }
449  if (task->get_sort() >= _current_sort) {
450  // It will run this frame.
451  _active.push_back(task);
452  push_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
453  } else {
454  // It will run next frame.
455  _next_active.push_back(task);
456  }
457  }
458  ++_num_tasks;
459  ++(_manager->_num_tasks);
460  _needs_cleanup = true;
461 
462  _cvar.notify_all();
463 }
464 
465 /**
466  * Removes the indicated task from this chain. Returns true if removed, false
467  * otherwise. Assumes the lock is already held. The task->upon_death()
468  * method is called with clean_exit=false if upon_death is given.
469  */
470 bool AsyncTaskChain::
471 do_remove(AsyncTask *task, bool upon_death) {
472  nassertr(task->_chain == this, false);
473 
474  switch (task->_state) {
475  case AsyncTask::S_servicing:
476  // This task is being serviced. upon_death will be called afterwards.
477  task->_state = AsyncTask::S_servicing_removed;
478  return true;
479 
480  case AsyncTask::S_servicing_removed:
481  // Being serviced, though it is already marked to be removed afterwards.
482  return false;
483 
484  case AsyncTask::S_sleeping:
485  // Sleeping, easy.
486  {
487  int index = find_task_on_heap(_sleeping, task);
488  nassertr(index != -1, false);
489  PT(AsyncTask) hold_task = task;
490  _sleeping.erase(_sleeping.begin() + index);
491  make_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
492  cleanup_task(task, upon_death, false);
493  }
494  return true;
495 
496  case AsyncTask::S_active:
497  {
498  // Active, but not being serviced, easy.
499  PT(AsyncTask) hold_task = task;
500  int index = find_task_on_heap(_active, task);
501  if (index != -1) {
502  _active.erase(_active.begin() + index);
503  make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
504  } else {
505  index = find_task_on_heap(_next_active, task);
506  if (index != -1) {
507  _next_active.erase(_next_active.begin() + index);
508  } else {
509  index = find_task_on_heap(_this_active, task);
510  nassertr(index != -1, false);
511  }
512  }
513  cleanup_task(task, upon_death, false);
514  return true;
515  }
516 
517  default:
518  break;
519  }
520 
521  return false;
522 }
523 
524 /**
525  * Blocks until the task list is empty. Assumes the lock is held.
526  */
527 void AsyncTaskChain::
528 do_wait_for_tasks() {
529  do_start_threads();
530 
531  if (_threads.empty()) {
532  // Non-threaded case.
533  while (_num_tasks > 0) {
534  if (_state == S_shutdown || _state == S_interrupted) {
535  return;
536  }
537  do_poll();
538  }
539 
540  } else {
541  // Threaded case.
542  while (_num_tasks > 0) {
543  if (_state == S_shutdown || _state == S_interrupted) {
544  return;
545  }
546 
547  PStatTimer timer(_wait_pcollector);
548  _cvar.wait();
549  }
550  }
551 }
552 
553 /**
554  * Stops all threads and messily empties the task list. This is intended to
555  * be called on destruction only. Assumes the lock is already held.
556  */
557 void AsyncTaskChain::
558 do_cleanup() {
559  if (task_cat.is_spam()) {
560  do_output(task_cat.spam());
561  task_cat.spam(false)
562  << ": do_cleanup()\n";
563  }
564  do_stop_threads();
565  _num_threads = 0;
566 
567  // Don't call the upon_death functions while we clean up the tasks.
568  // Instead, store all the tasks in a list as we clean them up, and then call
569  // the upon_death functions all at once. We do this because calling
570  // upon_death wil release the lock, allowing the iterators to become
571  // invalid.
572 
573  TaskHeap dead;
574  dead.reserve(_num_tasks);
575 
576  _needs_cleanup = false;
577 
578  TaskHeap::const_iterator ti;
579  for (ti = _active.begin(); ti != _active.end(); ++ti) {
580  AsyncTask *task = (*ti);
581  dead.push_back(task);
582  cleanup_task(task, false, false);
583  }
584  for (ti = _this_active.begin(); ti != _this_active.end(); ++ti) {
585  AsyncTask *task = (*ti);
586  dead.push_back(task);
587  cleanup_task(task, false, false);
588  }
589  for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) {
590  AsyncTask *task = (*ti);
591  dead.push_back(task);
592  cleanup_task(task, false, false);
593  }
594  for (ti = _sleeping.begin(); ti != _sleeping.end(); ++ti) {
595  AsyncTask *task = (*ti);
596  dead.push_back(task);
597  cleanup_task(task, false, false);
598  }
599 
600  // There might still be one task remaining: the currently-executing task.
601  nassertv(_num_tasks == 0 || _num_tasks == 1);
602 
603  // Now go back and call the upon_death functions.
604  _manager->_lock.unlock();
605  for (ti = dead.begin(); ti != dead.end(); ++ti) {
606  (*ti)->upon_death(_manager, false);
607  }
608  _manager->_lock.lock();
609 
610  if (task_cat.is_spam()) {
611  do_output(task_cat.spam());
612  task_cat.spam(false)
613  << ": done do_cleanup()\n";
614  }
615 }
616 
617 /**
618  * Returns true if the task is on one of the task lists, false if it is not
619  * (false may mean that the task is currently being serviced). Assumes the
620  * lock is currently held.
621  */
622 bool AsyncTaskChain::
623 do_has_task(AsyncTask *task) const {
624  return (find_task_on_heap(_active, task) != -1 ||
625  find_task_on_heap(_next_active, task) != -1 ||
626  find_task_on_heap(_sleeping, task) != -1 ||
627  find_task_on_heap(_this_active, task) != -1);
628 }
629 
630 /**
631  * Returns the index number of the indicated task within the specified task
632  * list, or -1 if the task is not found in the list (this may mean that it is
633  * currently being serviced). Assumes that the lock is currently held.
634  */
635 int AsyncTaskChain::
636 find_task_on_heap(const TaskHeap &heap, AsyncTask *task) const {
637  for (int i = 0; i < (int)heap.size(); ++i) {
638  if (heap[i] == task) {
639  return i;
640  }
641  }
642 
643  return -1;
644 }
645 
646 /**
647  * Pops a single task off the active queue, services it, and restores it to
648  * the end of the queue. This is called internally only within one of the
649  * task threads. Assumes the lock is already held.
650  *
651  * Note that the lock may be temporarily released by this method.
652  */
653 void AsyncTaskChain::
654 service_one_task(AsyncTaskChain::AsyncTaskChainThread *thread) {
655  if (!_active.empty()) {
656  PT(AsyncTask) task = _active.front();
657  pop_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
658  _active.pop_back();
659 
660  if (thread != nullptr) {
661  thread->_servicing = task;
662  }
663 
664  if (task_cat.is_spam()) {
665  task_cat.spam()
666  << "Servicing " << *task << " in "
667  << *Thread::get_current_thread() << "\n";
668  }
669 
670  nassertv(task->get_sort() == _current_sort);
671  nassertv(task->_state == AsyncTask::S_active);
672  task->_state = AsyncTask::S_servicing;
673  task->_servicing_thread = thread;
674 
675  AsyncTask::DoneStatus ds = task->unlock_and_do_task();
676 
677  if (thread != nullptr) {
678  thread->_servicing = nullptr;
679  }
680  task->_servicing_thread = nullptr;
681 
682  if (task->_chain == this) {
683  if (task->_state == AsyncTask::S_servicing_removed) {
684  // This task wants to kill itself.
685  cleanup_task(task, true, false);
686 
687  } else if (task->_chain_name != get_name()) {
688  // The task wants to jump to a different chain.
689  PT(AsyncTask) hold_task = task;
690  cleanup_task(task, false, false);
691  task->jump_to_task_chain(_manager);
692 
693  } else {
694  switch (ds) {
695  case AsyncTask::DS_cont:
696  // The task is still alive; put it on the next frame's active queue.
697  task->_state = AsyncTask::S_active;
698  _next_active.push_back(task);
699  _cvar.notify_all();
700  break;
701 
702  case AsyncTask::DS_again:
703  // The task wants to sleep again.
704  {
705  double now = _manager->_clock->get_frame_time();
706  task->_wake_time = now + task->get_delay();
707  task->_start_time = task->_wake_time;
708  task->_state = AsyncTask::S_sleeping;
709  _sleeping.push_back(task);
710  push_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
711  if (task_cat.is_spam()) {
712  task_cat.spam()
713  << "Sleeping " << *task << ", wake time at "
714  << task->_wake_time - now << "\n";
715  }
716  _cvar.notify_all();
717  }
718  break;
719 
720  case AsyncTask::DS_pickup:
721  // The task wants to run again this frame if possible.
722  task->_state = AsyncTask::S_active;
723  _this_active.push_back(task);
724  _cvar.notify_all();
725  break;
726 
727  case AsyncTask::DS_interrupt:
728  // The task had an exception and wants to raise a big flag.
729  task->_state = AsyncTask::S_active;
730  _next_active.push_back(task);
731  if (_state == S_started) {
732  _state = S_interrupted;
733  _cvar.notify_all();
734  }
735  break;
736 
737  case AsyncTask::DS_await:
738  // The task wants to wait for another one to finish.
739  task->_state = AsyncTask::S_awaiting;
740  _cvar.notify_all();
741  ++_num_awaiting_tasks;
742  break;
743 
744  default:
745  // The task has finished.
746  cleanup_task(task, true, true);
747  }
748  }
749  } else {
750  task_cat.error()
751  << "Task is no longer on chain " << get_name()
752  << ": " << *task << "\n";
753  }
754 
755  if (task_cat.is_spam()) {
756  task_cat.spam()
757  << "Done servicing " << *task << " in "
758  << *Thread::get_current_thread() << "\n";
759  }
760  }
761  thread_consider_yield();
762 }
763 
764 /**
765  * Called internally when a task has completed (or been interrupted) and is
766  * about to be removed from the active queue. Assumes the lock is held.
767  *
768  * If upon_death is true, then task->upon_death() will also be called, with
769  * the indicated clean_exit parameter.
770  *
771  * Note that the lock may be temporarily released by this method.
772  */
773 void AsyncTaskChain::
774 cleanup_task(AsyncTask *task, bool upon_death, bool clean_exit) {
775  if (task_cat.is_spam()) {
776  do_output(task_cat.spam());
777  task_cat.spam(false)
778  << ": cleanup_task(" << *task << ", " << upon_death << ", " << clean_exit
779  << ")\n";
780  }
781 
782  nassertv(task->_chain == this);
783 
784  task->_state = AsyncTask::S_inactive;
785  task->_chain = nullptr;
786  --_num_tasks;
787  --(_manager->_num_tasks);
788 
789  _manager->remove_task_by_name(task);
790 
791  if (upon_death) {
792  _manager->_lock.unlock();
793  if (task->set_future_state(clean_exit ? AsyncFuture::FS_finished
794  : AsyncFuture::FS_cancelled)) {
795  task->notify_done(clean_exit);
796  }
797  task->upon_death(_manager, clean_exit);
798  _manager->_lock.lock();
799  }
800 
801  task->_manager = nullptr;
802 }
803 
804 /**
805  * Called internally when all tasks of a given sort value have been completed,
806  * and it is time to increment to the next sort value, or begin the next
807  * epoch. Assumes the lock is held.
808  *
809  * Returns true if there are more tasks on the queue after this operation, or
810  * false if the task list is empty and we need to wait.
811  */
812 bool AsyncTaskChain::
813 finish_sort_group() {
814  nassertr(_num_busy_threads == 0, true);
815 
816  if (!_threads.empty()) {
817  PStatClient::thread_tick(get_name());
818  }
819 
820  if (!_active.empty()) {
821  // There are more tasks; just set the next sort value.
822  nassertr(_current_sort < _active.front()->get_sort(), true);
823  _current_sort = _active.front()->get_sort();
824  _cvar.notify_all();
825  return true;
826  }
827 
828  // There are no more tasks in this epoch; advance to the next epoch.
829 
830  if (!_this_active.empty() && _frame_budget >= 0.0) {
831  // Enter pickup mode. This is a special mode at the end of the epoch in
832  // which we are just re-running the tasks that think they can still run
833  // within the frame, in an attempt to use up our frame budget.
834 
835  if (task_cat.is_spam()) {
836  do_output(task_cat.spam());
837  task_cat.spam(false)
838  << ": next epoch (pickup mode)\n";
839  }
840 
841  _pickup_mode = true;
842  _active.swap(_this_active);
843 
844  } else {
845  // Not in pickup mode.
846 
847  if (task_cat.is_spam()) {
848  do_output(task_cat.spam());
849  task_cat.spam(false)
850  << ": next epoch\n";
851  }
852 
853  _pickup_mode = false;
854 
855  // Here, there's no difference between _this_active and _next_active.
856  // Combine them.
857  _next_active.insert(_next_active.end(), _this_active.begin(), _this_active.end());
858  _this_active.clear();
859 
860  _active.swap(_next_active);
861 
862  // We only tick the clock and wake sleepers in normal mode, the first time
863  // through the task list; not in pickup mode when we are re-running the
864  // stragglers just to use up our frame budget.
865 
866  if (_tick_clock) {
867  if (task_cat.is_spam()) {
868  do_output(task_cat.spam());
869  task_cat.spam(false)
870  << ": tick clock\n";
871  }
872  _manager->_clock->tick();
873  _manager->_frame_cvar.notify_all();
874 
875  } else if (_frame_sync) {
876  // If we're a synced chain, we have to wait at the end of the epoch for
877  // someone else to tick the clock.
878  _block_till_next_frame = true;
879  }
880 
881  // Check for any sleeping tasks that need to be woken.
882  double now = _manager->_clock->get_frame_time();
883  while (!_sleeping.empty() && _sleeping.front()->_wake_time <= now) {
884  PT(AsyncTask) task = _sleeping.front();
885  if (task_cat.is_spam()) {
886  task_cat.spam()
887  << "Waking " << *task << ", wake time at "
888  << task->_wake_time - now << "\n";
889  }
890  pop_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
891  _sleeping.pop_back();
892  task->_state = AsyncTask::S_active;
893  task->_start_frame = _manager->_clock->get_frame_count();
894  _active.push_back(task);
895  }
896 
897  if (task_cat.is_spam()) {
898  if (_sleeping.empty()) {
899  task_cat.spam()
900  << "No more tasks on sleeping queue.\n";
901  } else {
902  task_cat.spam()
903  << "Next sleeper: " << *_sleeping.front() << ", wake time at "
904  << _sleeping.front()->_wake_time - now << "\n";
905  }
906  }
907 
908  // Any tasks that are on the active queue at the beginning of the epoch
909  // are deemed to have run one frame (or to be about to).
910  TaskHeap::const_iterator ti;
911  for (ti = _active.begin(); ti != _active.end(); ++ti) {
912  AsyncTask *task = (*ti);
913  ++task->_num_frames;
914  }
915  }
916 
917  if (_timeslice_priority) {
918  filter_timeslice_priority();
919  }
920 
921  nassertr((size_t)_num_tasks == _active.size() + _this_active.size() + _next_active.size() + _sleeping.size() + (size_t)_num_awaiting_tasks, true);
922  make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
923 
924  _current_sort = -INT_MAX;
925 
926  if (!_active.empty()) {
927  // Signal the threads to start executing the first task again.
928  _cvar.notify_all();
929  return true;
930  }
931 
932  // There are no tasks to be had anywhere. Chill.
933  _pickup_mode = false;
934  nassertr(_this_active.empty(), false);
935  return false;
936 }
937 
938 /**
939  * Called to filter the _active tasks list when we are in the special
940  * timeslice_priority mode. In this mode, go through and postpone any tasks
941  * that have already exceeded their priority budget for this epoch.
942  *
943  * Assumes the lock is already held.
944  */
945 void AsyncTaskChain::
946 filter_timeslice_priority() {
947  if (_active.empty()) {
948  return;
949  }
950  nassertv(_timeslice_priority);
951 
952  // We must first sum up the average per-epoch runtime of each task.
953  double net_runtime = 0.0;
954  int net_priority = 0;
955 
956  TaskHeap::iterator ti;
957  for (ti = _active.begin(); ti != _active.end(); ++ti) {
958  AsyncTask *task = (*ti);
959  double runtime = max(task->get_average_dt(), 0.0);
960  int priority = max(task->_priority, 1);
961  net_runtime += runtime;
962  net_priority += priority;
963  }
964 
965  // That gives us a timeslice budget per priority value.
966  double average_budget = net_runtime / (double)net_priority;
967 
968  TaskHeap keep, postpone;
969  for (ti = _active.begin(); ti != _active.end(); ++ti) {
970  AsyncTask *task = (*ti);
971  double runtime = max(task->get_average_dt(), 0.0);
972  int priority = max(task->_priority, 1);
973  double consumed = runtime / (double)priority;
974  // cerr << *task << " consumed " << consumed << " vs. " << average_budget
975  // << "\n";
976  if (consumed > average_budget) {
977  // Postpone. Run this task next epoch.
978  postpone.push_back(task);
979  } else {
980  // Keep, and run this task this epoch.
981  keep.push_back(task);
982  }
983  }
984 
985  if (keep.empty()) {
986  // Hmm, nothing to keep. Grab the postponed task with the highest
987  // priority and keep that instead.
988  nassertv(!postpone.empty());
989  ti = postpone.begin();
990  TaskHeap::iterator max_ti = ti;
991  ++ti;
992  while (ti != postpone.end()) {
993  if ((*ti)->_priority > (*max_ti)->_priority) {
994  max_ti = ti;
995  }
996  }
997 
998  // cerr << "Nothing to keep, keeping " << *(*max_ti) << " instead\n";
999 
1000  keep.push_back(*max_ti);
1001  postpone.erase(max_ti);
1002  }
1003 
1004  _active.swap(keep);
1005  if (_pickup_mode) {
1006  _this_active.insert(_this_active.end(), postpone.begin(), postpone.end());
1007  } else {
1008  _next_active.insert(_next_active.end(), postpone.begin(), postpone.end());
1009  }
1010 
1011  nassertv(!_active.empty());
1012 }
1013 
1014 /**
1015  * The private implementation of stop_threads; assumes the lock is already
1016  * held.
1017  */
1018 void AsyncTaskChain::
1019 do_stop_threads() {
1020  if (_state == S_started || _state == S_interrupted) {
1021  if (task_cat.is_debug() && !_threads.empty()) {
1022  task_cat.debug()
1023  << "Stopping " << _threads.size()
1024  << " threads for " << _manager->get_name()
1025  << " chain " << get_name()
1026  << " in " << *Thread::get_current_thread() << "\n";
1027  }
1028 
1029  _state = S_shutdown;
1030  _cvar.notify_all();
1031  _manager->_frame_cvar.notify_all();
1032 
1033  Threads wait_threads;
1034  wait_threads.swap(_threads);
1035 
1036  // We have to release the lock while we join, so the threads can wake up
1037  // and see that we're shutting down.
1038  _manager->_lock.unlock();
1039  Threads::iterator ti;
1040  for (ti = wait_threads.begin(); ti != wait_threads.end(); ++ti) {
1041  if (task_cat.is_debug()) {
1042  task_cat.debug()
1043  << "Waiting for " << *(*ti) << " in "
1044  << *Thread::get_current_thread() << "\n";
1045  }
1046  (*ti)->join();
1047  if (task_cat.is_spam()) {
1048  task_cat.spam()
1049  << "Done waiting for " << *(*ti) << " in "
1050  << *Thread::get_current_thread() << "\n";
1051  }
1052  }
1053  _manager->_lock.lock();
1054 
1055  _state = S_initial;
1056 
1057  // There might be one busy "thread" still: the main thread.
1058  nassertv(_num_busy_threads == 0 || _num_busy_threads == 1);
1059  cleanup_pickup_mode();
1060  }
1061 }
1062 
1063 /**
1064  * The private implementation of start_threads; assumes the lock is already
1065  * held.
1066  */
1067 void AsyncTaskChain::
1068 do_start_threads() {
1069  if (_state == S_interrupted) {
1070  do_stop_threads();
1071  }
1072 
1073  if (_state == S_initial) {
1074  _state = S_started;
1075  if (Thread::is_threading_supported() && _num_threads > 0) {
1076  if (task_cat.is_debug()) {
1077  task_cat.debug()
1078  << "Starting " << _num_threads << " threads for "
1079  << _manager->get_name() << " chain " << get_name() << "\n";
1080  }
1081  _needs_cleanup = true;
1082  _threads.reserve(_num_threads);
1083  for (int i = 0; i < _num_threads; ++i) {
1084  ostringstream strm;
1085  strm << _manager->get_name() << "_" << get_name() << "_" << i;
1086  PT(AsyncTaskChainThread) thread = new AsyncTaskChainThread(strm.str(), this);
1087  if (thread->start(_thread_priority, true)) {
1088  _threads.push_back(thread);
1089  }
1090  }
1091  }
1092  }
1093 }
1094 
1095 /**
1096  * Returns the set of tasks that are active (and not sleeping) on the task
1097  * chain, at the time of the call. Assumes the lock is held.
1098  */
1099 AsyncTaskCollection AsyncTaskChain::
1100 do_get_active_tasks() const {
1101  AsyncTaskCollection result;
1102 
1103  Threads::const_iterator thi;
1104  for (thi = _threads.begin(); thi != _threads.end(); ++thi) {
1105  AsyncTask *task = (*thi)->_servicing;
1106  if (task != nullptr) {
1107  result.add_task(task);
1108  }
1109  }
1110  TaskHeap::const_iterator ti;
1111  for (ti = _active.begin(); ti != _active.end(); ++ti) {
1112  AsyncTask *task = (*ti);
1113  result.add_task(task);
1114  }
1115  for (ti = _this_active.begin(); ti != _this_active.end(); ++ti) {
1116  AsyncTask *task = (*ti);
1117  result.add_task(task);
1118  }
1119  for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) {
1120  AsyncTask *task = (*ti);
1121  result.add_task(task);
1122  }
1123 
1124  return result;
1125 }
1126 
1127 /**
1128  * Returns the set of tasks that are sleeping (and not active) on the task
1129  * chain, at the time of the call. Assumes the lock is held.
1130  */
1131 AsyncTaskCollection AsyncTaskChain::
1132 do_get_sleeping_tasks() const {
1133  AsyncTaskCollection result;
1134 
1135  TaskHeap::const_iterator ti;
1136  for (ti = _sleeping.begin(); ti != _sleeping.end(); ++ti) {
1137  AsyncTask *task = (*ti);
1138  result.add_task(task);
1139  }
1140 
1141  return result;
1142 }
1143 
1144 /**
1145  * The private implementation of poll(), this assumes the lock is already
1146  * held.
1147  */
1148 void AsyncTaskChain::
1149 do_poll() {
1150  thread_consider_yield();
1151  if (_num_tasks == 0) {
1152  return;
1153  }
1154 
1155  do_start_threads();
1156 
1157  if (!_threads.empty()) {
1158  return;
1159  }
1160 
1161  if (_num_busy_threads != 0) {
1162  // We are recursively nested within another task. Return, with a warning.
1163  task_cat.warning()
1164  << "Ignoring recursive poll() within another task.\n";
1165  return;
1166  }
1167 
1168  nassertv(!_pickup_mode);
1169 
1170  do {
1171  while (!_active.empty()) {
1172  if (_state == S_shutdown || _state == S_interrupted) {
1173  return;
1174  }
1175  int frame = _manager->_clock->get_frame_count();
1176  if (_current_frame != frame) {
1177  _current_frame = frame;
1178  _time_in_frame = 0.0;
1179  _block_till_next_frame = false;
1180  }
1181  if (_block_till_next_frame ||
1182  (_frame_budget >= 0.0 && _time_in_frame >= _frame_budget)) {
1183  // If we've exceeded our budget, stop here. We'll resume from this
1184  // point at the next call to poll().
1185  cleanup_pickup_mode();
1186  return;
1187  }
1188 
1189  _current_sort = _active.front()->get_sort();
1190 
1191  // Normally, there won't be any threads running at the same time we're
1192  // in poll(). But it's possible, if someone calls set_num_threads()
1193  // while we're processing.
1194  _num_busy_threads++;
1195  service_one_task(nullptr);
1196  _num_busy_threads--;
1197  _cvar.notify_all();
1198 
1199  if (!_threads.empty()) {
1200  return;
1201  }
1202  }
1203 
1204  finish_sort_group();
1205  } while (_pickup_mode);
1206 }
1207 
1208 /**
1209  * Clean up the damage from setting pickup mode. This means we restore the
1210  * _active and _next_active lists as they should have been without pickup
1211  * mode, for next frame. Assumes the lock is held.
1212  */
1213 void AsyncTaskChain::
1214 cleanup_pickup_mode() {
1215  if (_pickup_mode) {
1216  _pickup_mode = false;
1217 
1218  // Move everything to the _next_active queue.
1219  _next_active.insert(_next_active.end(), _this_active.begin(), _this_active.end());
1220  _this_active.clear();
1221  _next_active.insert(_next_active.end(), _active.begin(), _active.end());
1222  _active.clear();
1223 
1224  // Now finish the epoch properly.
1225  finish_sort_group();
1226  }
1227 }
1228 
1229 /**
1230  * The private implementation of output(), this assumes the lock is already
1231  * held.
1232  */
1233 void AsyncTaskChain::
1234 do_output(ostream &out) const {
1235  if (_manager != nullptr) {
1236  out << _manager->get_type() << " " << _manager->get_name();
1237  } else {
1238  out << "(no manager)";
1239  }
1240  out << " task chain " << get_name()
1241  << "; " << _num_tasks << " tasks";
1242 }
1243 
1244 /**
1245  * The private implementation of write(), this assumes the lock is already
1246  * held.
1247  */
1248 void AsyncTaskChain::
1249 do_write(ostream &out, int indent_level) const {
1250  indent(out, indent_level)
1251  << "Task chain \"" << get_name() << "\"\n";
1252  if (_num_threads > 0) {
1253  indent(out, indent_level + 2)
1254  << _num_threads << " threads, priority " << _thread_priority << "\n";
1255  }
1256  if (_frame_budget >= 0.0) {
1257  indent(out, indent_level + 2)
1258  << "frame budget " << _frame_budget << " s\n";
1259  }
1260  if (_timeslice_priority) {
1261  indent(out, indent_level + 2)
1262  << "timeslice priority\n";
1263  }
1264  if (_tick_clock) {
1265  indent(out, indent_level + 2)
1266  << "tick clock\n";
1267  }
1268 
1269  static const size_t buffer_size = 1024;
1270  char buffer[buffer_size];
1271  sprintf(buffer, " %-32s %8s %8s %8s %8s %6s",
1272  "Task",
1273  "sleep(s)",
1274  "dt(ms)", "avg", "max",
1275  "sort");
1276  nassertv(strlen(buffer) < buffer_size);
1277 
1278  indent(out, indent_level)
1279  << buffer << "\n";
1280 
1281  indent(out, indent_level);
1282  for (int i = 0; i < 32+8+8+8+8+6+7; ++i) {
1283  out << '-';
1284  }
1285  out << "\n";
1286 
1287  // Collect a list of all active tasks, then sort them into order for output.
1288  TaskHeap tasks = _active;
1289  tasks.insert(tasks.end(), _this_active.begin(), _this_active.end());
1290  tasks.insert(tasks.end(), _next_active.begin(), _next_active.end());
1291 
1292  Threads::const_iterator thi;
1293  for (thi = _threads.begin(); thi != _threads.end(); ++thi) {
1294  AsyncTask *task = (*thi)->_servicing;
1295  if (task != nullptr) {
1296  tasks.push_back(task);
1297  }
1298  }
1299 
1300  double now = _manager->_clock->get_frame_time();
1301 
1302  if (!tasks.empty()) {
1303  sort(tasks.begin(), tasks.end(), AsyncTaskSortPriority());
1304 
1305  // Since AsyncTaskSortPriority() sorts backwards (because of STL's
1306  // push_heap semantics), we go through the task list in reverse order to
1307  // print them forwards.
1308  TaskHeap::reverse_iterator ti;
1309  for (ti = tasks.rbegin(); ti != tasks.rend(); ++ti) {
1310  AsyncTask *task = (*ti);
1311  write_task_line(out, indent_level, task, now);
1312  }
1313  }
1314 
1315  // Instead of iterating through the _sleeping list in heap order, copy it
1316  // and then use repeated pops to get it out in sorted order, for the user's
1317  // satisfaction.
1318  TaskHeap sleeping = _sleeping;
1319  while (!sleeping.empty()) {
1320  PT(AsyncTask) task = sleeping.front();
1321  pop_heap(sleeping.begin(), sleeping.end(), AsyncTaskSortWakeTime());
1322  sleeping.pop_back();
1323 
1324  write_task_line(out, indent_level, task, now);
1325  }
1326 }
1327 
1328 /**
1329  * Writes a single line for a task in the report generated by do_write().
1330  * Assumes the lock is already held.
1331  */
1332 void AsyncTaskChain::
1333 write_task_line(ostream &out, int indent_level, AsyncTask *task, double now) const {
1334  char servicing_flag = ' ';
1335  if (task->_state == AsyncTask::S_servicing) {
1336  servicing_flag = '*';
1337  } else if (task->_state == AsyncTask::S_servicing_removed) {
1338  servicing_flag = '-';
1339  }
1340 
1341  static const size_t buffer_size = 1024;
1342  char buffer[buffer_size];
1343 
1344  if (task->_state == AsyncTask::S_sleeping) {
1345  // For sleeping tasks, include the wake time, as an elapsed time in
1346  // seconds.
1347  string name = task->get_name().substr(0, 32);
1348  sprintf(buffer, "%c%-32s %8.1f",
1349  servicing_flag, name.c_str(),
1350  task->_wake_time - now);
1351  } else {
1352  // For active tasks, don't include a wake time. This means we have more
1353  // space for the name.
1354  string name = task->get_name().substr(0, 41);
1355  sprintf(buffer, "%c%-41s",
1356  servicing_flag, name.c_str());
1357  }
1358  nassertv(strlen(buffer) < buffer_size);
1359 
1360  indent(out, indent_level)
1361  << buffer;
1362 
1363  if (task->_num_frames > 0) {
1364  sprintf(buffer, " %8.1f %8.1f %8.1f %6d",
1365  task->_dt * 1000.0, task->get_average_dt() * 1000.0,
1366  task->_max_dt * 1000.0,
1367  task->_sort);
1368  } else {
1369  // No statistics for a task that hasn't run yet.
1370  sprintf(buffer, " %8s %8s %8s %6d",
1371  "", "", "",
1372  task->_sort);
1373  }
1374 
1375  nassertv(strlen(buffer) < buffer_size);
1376  out << buffer << "\n";
1377 }
1378 
1379 /**
1380  *
1381  */
1382 AsyncTaskChain::AsyncTaskChainThread::
1383 AsyncTaskChainThread(const string &name, AsyncTaskChain *chain) :
1384  Thread(name, chain->get_name()),
1385  _chain(chain),
1386  _servicing(nullptr)
1387 {
1388 }
1389 
1390 /**
1391  *
1392  */
1393 void AsyncTaskChain::AsyncTaskChainThread::
1394 thread_main() {
1395  MutexHolder holder(_chain->_manager->_lock);
1396  while (_chain->_state != S_shutdown && _chain->_state != S_interrupted) {
1397  thread_consider_yield();
1398  if (!_chain->_active.empty() &&
1399  _chain->_active.front()->get_sort() == _chain->_current_sort) {
1400 
1401  int frame = _chain->_manager->_clock->get_frame_count();
1402  if (_chain->_current_frame != frame) {
1403  _chain->_current_frame = frame;
1404  _chain->_time_in_frame = 0.0;
1405  _chain->_block_till_next_frame = false;
1406  }
1407 
1408  // If we've exceeded our frame budget, sleep until the next frame.
1409  if (_chain->_block_till_next_frame ||
1410  (_chain->_frame_budget >= 0.0 && _chain->_time_in_frame >= _chain->_frame_budget)) {
1411  while ((_chain->_block_till_next_frame ||
1412  (_chain->_frame_budget >= 0.0 && _chain->_time_in_frame >= _chain->_frame_budget)) &&
1413  _chain->_state != S_shutdown && _chain->_state != S_interrupted) {
1414  _chain->cleanup_pickup_mode();
1415  _chain->_manager->_frame_cvar.wait();
1416  frame = _chain->_manager->_clock->get_frame_count();
1417  if (_chain->_current_frame != frame) {
1418  _chain->_current_frame = frame;
1419  _chain->_time_in_frame = 0.0;
1420  _chain->_block_till_next_frame = false;
1421  }
1422  }
1423  // Now that it's the next frame, go back to the top of the loop.
1424  continue;
1425  }
1426 
1427  PStatTimer timer(_task_pcollector);
1428  _chain->_num_busy_threads++;
1429  _chain->service_one_task(this);
1430  _chain->_num_busy_threads--;
1431  _chain->_cvar.notify_all();
1432 
1433  } else {
1434  // We've finished all the available tasks of the current sort value. We
1435  // can't pick up a new task until all of the threads finish the tasks
1436  // with the same sort value.
1437  if (_chain->_num_busy_threads == 0) {
1438  // We're the last thread to finish. Update _current_sort.
1439  if (!_chain->finish_sort_group()) {
1440  // Nothing to do. Wait for more tasks to be added.
1441  if (_chain->_sleeping.empty()) {
1442  PStatTimer timer(_wait_pcollector);
1443  _chain->_cvar.wait();
1444  } else {
1445  double wake_time = _chain->do_get_next_wake_time();
1446  double now = _chain->_manager->_clock->get_frame_time();
1447  double timeout = max(wake_time - now, 0.0);
1448  PStatTimer timer(_wait_pcollector);
1449  _chain->_cvar.wait(timeout);
1450  }
1451  }
1452 
1453  } else {
1454  // Wait for the other threads to finish their current task before we
1455  // continue.
1456  PStatTimer timer(_wait_pcollector);
1457  _chain->_cvar.wait();
1458  }
1459  }
1460  }
1461 }
bool get_timeslice_priority() const
Returns the timeslice_priority flag.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
AsyncTaskCollection get_tasks() const
Returns the set of tasks that are active or sleeping on the task chain, at the time of the call.
void wait()
Waits on the condition.
void notify_done(bool clean_exit)
Schedules the done callbacks.
A class to manage a loose queue of isolated tasks, which can be performed either synchronously (in th...
void set_frame_sync(bool frame_sync)
Sets the frame_sync flag.
bool get_tick_clock() const
Returns the tick_clock flag.
double get_frame_budget() const
Returns the maximum amount of time per frame the tasks on this chain are granted for execution.
void add_task(AsyncTask *task)
Adds a new AsyncTask to the collection.
A list of tasks, for instance as returned by some of the AsyncTaskManager query functions.
bool has_delay() const
Returns true if a delay has been set for this task via set_delay(), or false otherwise.
Definition: asyncTask.I:93
void start_threads()
Starts any requested threads to service the tasks on the queue.
get_sort
Returns the task's current sort value.
Definition: asyncTask.h:115
A lightweight class that can be used to automatically start and stop a PStatCollector around a sectio...
Definition: pStatTimer.h:30
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
A lightweight C++ object whose constructor calls acquire() and whose destructor calls release() on a ...
Definition: mutexHolder.h:25
void unlock()
Alias for release() to match C++11 semantics.
Definition: mutexDirect.I:39
void wait_for_tasks()
Blocks until the task list is empty.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
void set_num_threads(int num_threads)
Changes the number of threads for this task chain.
void notify_all()
Informs all of the other threads who are currently blocked on wait() that the relevant condition has ...
A lightweight class that represents a single element that may be timed and/or counted via stats.
void set_frame_budget(double frame_budget)
Sets the maximum amount of time per frame the tasks on this chain are granted for execution.
get_current_thread
Returns a pointer to the currently-executing Thread object.
Definition: thread.h:109
A base class for all things which can have a name.
Definition: namable.h:26
bool get_frame_sync() const
Returns the frame_sync flag.
std::ostream & indent(std::ostream &out, int indent_level)
A handy function for doing text formatting.
Definition: indent.cxx:20
int get_num_running_threads() const
Returns the number of threads that have been created and are actively running.
void add_tasks_from(const AsyncTaskCollection &other)
Adds all the AsyncTasks indicated in the other collection to this task.
ThreadPriority get_thread_priority() const
Returns the priority associated with threads that serve this task chain.
int get_num_tasks() const
Returns the number of tasks that are currently active or sleeping within the task chain.
The AsyncTaskChain is a subset of the AsyncTaskManager.
AsyncTaskCollection get_active_tasks() const
Returns the set of tasks that are active (and not sleeping) on the task chain, at the time of the cal...
int get_num_threads() const
Returns the number of threads that will be servicing tasks for this chain.
get_average_dt
Returns the average amount of time elapsed during each of the task's previous run cycles,...
Definition: asyncTask.h:121
void poll()
Runs through all the tasks in the task list, once, if the task chain is running in single-threaded mo...
This class represents a concrete task performed by an AsyncManager.
Definition: asyncTask.h:32
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
A thread; that is, a lightweight process.
Definition: thread.h:46
void set_tick_clock(bool tick_clock)
Sets the tick_clock flag.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
void set_timeslice_priority(bool timeslice_priority)
Sets the timeslice_priority flag.
void stop_threads()
Stops any threads that are currently running.
TypeHandle is the identifier used to differentiate C++ class types.
Definition: typeHandle.h:81
void set_thread_priority(ThreadPriority priority)
Changes the priority associated with threads that serve this task chain.
double get_next_wake_time() const
Returns the scheduled time (on the manager's clock) of the next sleeping task, on any task chain,...
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
void lock()
Alias for acquire() to match C++11 semantics.
Definition: mutexDirect.I:19
bool has_task(AsyncTask *task) const
Returns true if the indicated task has been added to this AsyncTaskChain, false otherwise.
double get_delay() const
Returns the delay value that has been set via set_delay, if any.
Definition: asyncTask.I:101
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
AsyncTaskCollection get_sleeping_tasks() const
Returns the set of tasks that are sleeping (and not active) on the task chain, at the time of the cal...