Panda3D
 All Classes Functions Variables Enumerations
asyncTaskChain.cxx
1 // Filename: asyncTaskChain.cxx
2 // Created by: drose (23Aug06)
3 //
4 ////////////////////////////////////////////////////////////////////
5 //
6 // PANDA 3D SOFTWARE
7 // Copyright (c) Carnegie Mellon University. All rights reserved.
8 //
9 // All use of this software is subject to the terms of the revised BSD
10 // license. You should have received a copy of this license along
11 // with this source code in a file named "LICENSE."
12 //
13 ////////////////////////////////////////////////////////////////////
14 
15 #include "asyncTaskChain.h"
16 #include "asyncTaskManager.h"
17 #include "event.h"
18 #include "mutexHolder.h"
19 #include "indent.h"
20 #include "pStatClient.h"
21 #include "pStatTimer.h"
22 #include "clockObject.h"
23 #include "config_event.h"
24 #include <algorithm>
25 #include <stdio.h> // For sprintf/snprintf
26 
// Run-time type identification handle for this class.
TypeHandle AsyncTaskChain::_type_handle;

// PStatClient collectors: "Task" times actual task execution; "Wait"
// times threads blocked waiting for work or for the next frame.
PStatCollector AsyncTaskChain::_task_pcollector("Task");
PStatCollector AsyncTaskChain::_wait_pcollector("Wait");
31 
32 ////////////////////////////////////////////////////////////////////
33 // Function: AsyncTaskChain::Constructor
34 // Access: Published
35 // Description:
36 ////////////////////////////////////////////////////////////////////
AsyncTaskChain::
AsyncTaskChain(AsyncTaskManager *manager, const string &name) :
  Namable(name),
  _manager(manager),
  // The condition variable shares the manager's lock, so waiters are
  // woken under the same mutex that guards all chain state.
  _cvar(manager->_lock),
  _tick_clock(false),
  _timeslice_priority(false),
  _num_threads(0),
  _thread_priority(TP_normal),
  // A negative frame budget means "no per-frame time limit".
  _frame_budget(-1.0),
  _frame_sync(false),
  _num_busy_threads(0),
  _num_tasks(0),
  _state(S_initial),
  // Start below any possible sort value, so any task added before the
  // first epoch is eligible to run in the current frame.
  _current_sort(-INT_MAX),
  _pickup_mode(false),
  _needs_cleanup(false),
  _current_frame(0),
  _time_in_frame(0.0),
  _block_till_next_frame(false)
{
}
59 
60 ////////////////////////////////////////////////////////////////////
61 // Function: AsyncTaskChain::Destructor
62 // Access: Published, Virtual
63 // Description:
64 ////////////////////////////////////////////////////////////////////
65 AsyncTaskChain::
66 ~AsyncTaskChain() {
67  // We only grab the lock if _needs_cleanup is true. This way, the
68  // temporary AsyncTaskChain objects created (and destructed) within
69  // the task manager won't risk a double-lock.
70  if (_needs_cleanup) {
71  MutexHolder holder(_manager->_lock);
72  do_cleanup();
73  }
74 }
75 
76 ////////////////////////////////////////////////////////////////////
77 // Function: AsyncTaskChain::set_tick_clock
78 // Access: Published
79 // Description: Sets the tick_clock flag. When this is true,
80 // get_clock()->tick() will be called automatically at
81 // each task epoch. This is false by default.
82 ////////////////////////////////////////////////////////////////////
84 set_tick_clock(bool tick_clock) {
85  MutexHolder holder(_manager->_lock);
86  _tick_clock = tick_clock;
87 }
88 
89 ////////////////////////////////////////////////////////////////////
90 // Function: AsyncTaskChain::get_tick_clock
91 // Access: Published
92 // Description: Returns the tick_clock flag. See set_tick_clock().
93 ////////////////////////////////////////////////////////////////////
95 get_tick_clock() const {
96  MutexHolder holder(_manager->_lock);
97  return _tick_clock;
98 }
99 
100 ////////////////////////////////////////////////////////////////////
101 // Function: AsyncTaskChain::set_num_threads
102 // Access: Published
103 // Description: Changes the number of threads for this task chain.
104 // This may require stopping the threads if they are
105 // already running.
106 ////////////////////////////////////////////////////////////////////
107 void AsyncTaskChain::
108 set_num_threads(int num_threads) {
109  nassertv(num_threads >= 0);
110 
111  if (task_cat.is_debug()) {
112  do_output(task_cat.debug());
113  task_cat.debug(false)
114  << ": set_num_threads(" << num_threads << ")\n";
115  }
116 
118  num_threads = 0;
119  }
120 
121  MutexHolder holder(_manager->_lock);
122  if (_num_threads != num_threads) {
123  do_stop_threads();
124  _num_threads = num_threads;
125 
126  if (_num_tasks != 0) {
127  do_start_threads();
128  }
129  }
130 }
131 
132 ////////////////////////////////////////////////////////////////////
133 // Function: AsyncTaskChain::get_num_threads
134 // Access: Published
135 // Description: Returns the number of threads that will be servicing
136 // tasks for this chain. Also see
137 // get_num_running_threads().
138 ////////////////////////////////////////////////////////////////////
141  MutexHolder holder(_manager->_lock);
142  return _num_threads;
143 }
144 
145 ////////////////////////////////////////////////////////////////////
146 // Function: AsyncTaskChain::get_num_running_threads
147 // Access: Published
148 // Description: Returns the number of threads that have been created
149 // and are actively running. This will return 0 before
150 // the threads have been started; it will also return 0
151 // if thread support is not available.
152 ////////////////////////////////////////////////////////////////////
155  MutexHolder holder(_manager->_lock);
156  return _threads.size();
157 }
158 
159 ////////////////////////////////////////////////////////////////////
160 // Function: AsyncTaskChain::set_thread_priority
161 // Access: Published
162 // Description: Changes the priority associated with threads that
163 // serve this task chain. This may require stopping the
164 // threads if they are already running.
165 ////////////////////////////////////////////////////////////////////
166 void AsyncTaskChain::
167 set_thread_priority(ThreadPriority priority) {
168  MutexHolder holder(_manager->_lock);
169  if (_thread_priority != priority) {
170  do_stop_threads();
171  _thread_priority = priority;
172 
173  if (_num_tasks != 0) {
174  do_start_threads();
175  }
176  }
177 }
178 
179 ////////////////////////////////////////////////////////////////////
180 // Function: AsyncTaskChain::get_thread_priority
181 // Access: Published
182 // Description: Returns the priority associated with threads that
183 // serve this task chain.
184 ////////////////////////////////////////////////////////////////////
185 ThreadPriority AsyncTaskChain::
187  MutexHolder holder(_manager->_lock);
188  return _thread_priority;
189 }
190 
191 ////////////////////////////////////////////////////////////////////
192 // Function: AsyncTaskChain::set_frame_budget
193 // Access: Published
194 // Description: Sets the maximum amount of time per frame the tasks
195 // on this chain are granted for execution. If this is
196 // less than zero, there is no limit; if it is >= 0, it
197 // represents a maximum amount of time (in seconds) that
198 // will be used to execute tasks. If this time is
199 // exceeded in any one frame, the task chain will stop
200 // executing tasks until the next frame, as defined by
201 // the TaskManager's clock.
202 ////////////////////////////////////////////////////////////////////
void AsyncTaskChain::
set_frame_budget(double frame_budget) {
  // The manager's lock guards all chain configuration state.
  MutexHolder holder(_manager->_lock);
  _frame_budget = frame_budget;
}
208 
209 ////////////////////////////////////////////////////////////////////
210 // Function: AsyncTaskChain::get_frame_budget
211 // Access: Published
212 // Description: Returns the maximum amount of time per frame the
213 // tasks on this chain are granted for execution. See
214 // set_frame_budget().
215 ////////////////////////////////////////////////////////////////////
216 double AsyncTaskChain::
218  MutexHolder holder(_manager->_lock);
219  return _frame_budget;
220 }
221 
222 ////////////////////////////////////////////////////////////////////
223 // Function: AsyncTaskChain::set_frame_sync
224 // Access: Published
225 // Description: Sets the frame_sync flag. When this flag is true,
226 // this task chain will be forced to sync with the
227 // TaskManager's clock. It will run no faster than one
228 // epoch per clock frame.
229 //
230 // When this flag is false, the default, the task chain
231 // will finish all of its tasks and then immediately
232 // start from the first task again, regardless of the
233 // clock frame. When it is true, the task chain will
234 // finish all of its tasks and then wait for the clock
235 // to tick to the next frame before resuming the first
236 // task.
237 //
238 // This only makes sense for threaded task chains.
239 // Non-threaded task chains are automatically
240 // synchronous.
241 ////////////////////////////////////////////////////////////////////
void AsyncTaskChain::
set_frame_sync(bool frame_sync) {
  // The manager's lock guards all chain configuration state.
  MutexHolder holder(_manager->_lock);
  _frame_sync = frame_sync;
}
247 
248 ////////////////////////////////////////////////////////////////////
249 // Function: AsyncTaskChain::get_frame_sync
250 // Access: Published
251 // Description: Returns the frame_sync flag. See set_frame_sync().
252 ////////////////////////////////////////////////////////////////////
bool AsyncTaskChain::
get_frame_sync() const {
  // The manager's lock guards all chain configuration state.
  MutexHolder holder(_manager->_lock);
  return _frame_sync;
}
258 
259 ////////////////////////////////////////////////////////////////////
260 // Function: AsyncTaskChain::set_timeslice_priority
261 // Access: Published
262 // Description: Sets the timeslice_priority flag. This changes
263 // the interpretation of priority, and the number of
264 // times per epoch each task will run.
265 //
266 // When this flag is true, some tasks might not run in
267 // any given epoch. Instead, tasks with priority higher
268 // than 1 will be given precedence, in proportion to the
269 // amount of time they have already used. This gives
270 // higher-priority tasks more runtime than
271 // lower-priority tasks. Each task gets the amount of
272 // time proportional to its priority value, so a task
273 // with priority 100 will get five times as much
274 // processing time as a task with priority 20. For
275 // these purposes, priority values less than 1 are
276 // deemed to be equal to 1.
277 //
278 // When this flag is false (the default), all tasks are
279 // run exactly once each epoch, round-robin style.
280 // Priority is only used to determine which task runs
281 // first within tasks of the same sort value.
282 ////////////////////////////////////////////////////////////////////
283 void AsyncTaskChain::
284 set_timeslice_priority(bool timeslice_priority) {
285  MutexHolder holder(_manager->_lock);
286  _timeslice_priority = timeslice_priority;
287 }
288 
289 ////////////////////////////////////////////////////////////////////
290 // Function: AsyncTaskChain::get_timeslice_priority
291 // Access: Published
292 // Description: Returns the timeslice_priority flag. This changes
293 // the interpretation of priority, and the number of
294 // times per epoch each task will run. See
295 // set_timeslice_priority().
296 ////////////////////////////////////////////////////////////////////
297 bool AsyncTaskChain::
299  MutexHolder holder(_manager->_lock);
300  return _timeslice_priority;
301 }
302 
303 ////////////////////////////////////////////////////////////////////
304 // Function: AsyncTaskChain::stop_threads
305 // Access: Published
306 // Description: Stops any threads that are currently running. If any
307 // tasks are still pending and have not yet been picked
308 // up by a thread, they will not be serviced unless
309 // poll() or start_threads() is later called.
310 ////////////////////////////////////////////////////////////////////
311 void AsyncTaskChain::
313  if (_state == S_started || _state == S_interrupted) {
314  // Clean up all of the threads.
315  MutexHolder holder(_manager->_lock);
316  do_stop_threads();
317  }
318 }
319 
320 ////////////////////////////////////////////////////////////////////
321 // Function: AsyncTaskChain::start_threads
322 // Access: Published
323 // Description: Starts any requested threads to service the tasks on
324 // the queue. This is normally not necessary, since
325 // adding a task will start the threads automatically.
326 ////////////////////////////////////////////////////////////////////
327 void AsyncTaskChain::
329  if (_state == S_initial || _state == S_interrupted) {
330  MutexHolder holder(_manager->_lock);
331  do_start_threads();
332  }
333 }
334 
335 ////////////////////////////////////////////////////////////////////
336 // Function: AsyncTaskChain::has_task
337 // Access: Published
338 // Description: Returns true if the indicated task has been added to
339 // this AsyncTaskChain, false otherwise.
340 ////////////////////////////////////////////////////////////////////
341 bool AsyncTaskChain::
342 has_task(AsyncTask *task) const {
343  MutexHolder holder(_manager->_lock);
344 
345  if (task->_chain != this) {
346  nassertr(!do_has_task(task), false);
347  return false;
348  }
349 
350  if (task->_state == AsyncTask::S_servicing_removed) {
351  return false;
352  }
353 
354  // The task might not actually be in the active queue, since it
355  // might be being serviced right now. That's OK.
356  return true;
357 }
358 
359 ////////////////////////////////////////////////////////////////////
360 // Function: AsyncTaskChain::wait_for_tasks
361 // Access: Published
362 // Description: Blocks until the task list is empty.
363 ////////////////////////////////////////////////////////////////////
364 void AsyncTaskChain::
366  MutexHolder holder(_manager->_lock);
367  do_wait_for_tasks();
368 }
369 
370 ////////////////////////////////////////////////////////////////////
371 // Function: AsyncTaskChain::get_num_tasks
372 // Access: Published
373 // Description: Returns the number of tasks that are currently active
374 // or sleeping within the task chain.
375 ////////////////////////////////////////////////////////////////////
377 get_num_tasks() const {
378  MutexHolder holder(_manager->_lock);
379  return _num_tasks;
380 }
381 
382 ////////////////////////////////////////////////////////////////////
383 // Function: AsyncTaskChain::get_tasks
384 // Access: Published
385 // Description: Returns the set of tasks that are active or sleeping
386 // on the task chain, at the time of the call.
387 ////////////////////////////////////////////////////////////////////
389 get_tasks() const {
390  MutexHolder holder(_manager->_lock);
391  AsyncTaskCollection result = do_get_active_tasks();
392  result.add_tasks_from(do_get_sleeping_tasks());
393  return result;
394 }
395 
396 ////////////////////////////////////////////////////////////////////
397 // Function: AsyncTaskChain::get_active_tasks
398 // Access: Published
399 // Description: Returns the set of tasks that are active (and not
400 // sleeping) on the task chain, at the time of the
401 // call.
402 ////////////////////////////////////////////////////////////////////
405  MutexHolder holder(_manager->_lock);
406  return do_get_active_tasks();
407 }
408 
409 ////////////////////////////////////////////////////////////////////
410 // Function: AsyncTaskChain::get_sleeping_tasks
411 // Access: Published
412 // Description: Returns the set of tasks that are sleeping (and not
413 // active) on the task chain, at the time of the
414 // call.
415 ////////////////////////////////////////////////////////////////////
418  MutexHolder holder(_manager->_lock);
419  return do_get_sleeping_tasks();
420 }
421 
422 ////////////////////////////////////////////////////////////////////
423 // Function: AsyncTaskChain::poll
424 // Access: Published
425 // Description: Runs through all the tasks in the task list, once, if
426 // the task chain is running in single-threaded mode
427 // (no threads available). This method does nothing in
428 // threaded mode, so it may safely be called in either
429 // case.
430 //
431 // Normally, you would not call this function directly;
432 // instead, call AsyncTaskManager::poll(), which polls
433 // all of the task chains in sequence.
434 ////////////////////////////////////////////////////////////////////
435 void AsyncTaskChain::
436 poll() {
437  MutexHolder holder(_manager->_lock);
438  do_poll();
439 }
440 
441 ////////////////////////////////////////////////////////////////////
442 // Function: AsyncTaskChain::get_next_wake_time
443 // Access: Published
444 // Description: Returns the scheduled time (on the manager's clock)
445 // of the next sleeping task, on any task chain, to
446 // awaken. Returns -1 if there are no sleeping tasks.
447 ////////////////////////////////////////////////////////////////////
448 double AsyncTaskChain::
450  MutexHolder holder(_manager->_lock);
451  return do_get_next_wake_time();
452 }
453 
454 ////////////////////////////////////////////////////////////////////
455 // Function: AsyncTaskChain::output
456 // Access: Published, Virtual
457 // Description:
458 ////////////////////////////////////////////////////////////////////
void AsyncTaskChain::
output(ostream &out) const {
  // do_output() assumes the manager's lock is already held.
  MutexHolder holder(_manager->_lock);
  do_output(out);
}
464 
465 ////////////////////////////////////////////////////////////////////
466 // Function: AsyncTaskChain::write
467 // Access: Published, Virtual
468 // Description:
469 ////////////////////////////////////////////////////////////////////
void AsyncTaskChain::
write(ostream &out, int indent_level) const {
  // do_write() assumes the manager's lock is already held.
  MutexHolder holder(_manager->_lock);
  do_write(out, indent_level);
}
475 
476 ////////////////////////////////////////////////////////////////////
477 // Function: AsyncTaskChain::do_add
478 // Access: Protected
479 // Description: Adds the indicated task to the active queue. It is
480 // an error if the task is already added to this or any
481 // other active queue.
482 //
483 // This is normally called only by the AsyncTaskManager.
484 // Assumes the lock is already held.
485 ////////////////////////////////////////////////////////////////////
486 void AsyncTaskChain::
487 do_add(AsyncTask *task) {
488  nassertv(task->_chain == NULL &&
489  task->_manager == NULL &&
490  task->_chain_name == get_name() &&
491  task->_state == AsyncTask::S_inactive);
492  nassertv(!do_has_task(task));
493 
494  do_start_threads();
495 
496  task->_chain = this;
497  task->_manager = _manager;
498 
499  double now = _manager->_clock->get_frame_time();
500  task->_start_time = now;
501  task->_start_frame = _manager->_clock->get_frame_count();
502 
503  _manager->add_task_by_name(task);
504 
505  if (task->has_delay()) {
506  // This is a deferred task. Add it to the sleeping queue.
507  task->_wake_time = now + task->get_delay();
508  task->_start_time = task->_wake_time;
509  task->_state = AsyncTask::S_sleeping;
510  _sleeping.push_back(task);
511  push_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
512 
513  } else {
514  // This is an active task. Add it to the active set.
515  task->_state = AsyncTask::S_active;
516  if (task_cat.is_spam()) {
517  task_cat.spam()
518  << "Adding " << *task << " with sort " << task->get_sort()
519  << " to chain " << get_name() << " with current_sort "
520  << _current_sort << "\n";
521  }
522  if (task->get_sort() >= _current_sort) {
523  // It will run this frame.
524  _active.push_back(task);
525  push_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
526  } else {
527  // It will run next frame.
528  _next_active.push_back(task);
529  }
530  }
531  ++_num_tasks;
532  ++(_manager->_num_tasks);
533  _needs_cleanup = true;
534 
535  _cvar.notify_all();
536 }
537 
538 ////////////////////////////////////////////////////////////////////
539 // Function: AsyncTaskChain::do_remove
540 // Access: Protected
541 // Description: Removes the indicated task from this chain. Returns
542 // true if removed, false otherwise. Assumes the lock
543 // is already held. The task->upon_death() method is
544 // *not* called.
545 ////////////////////////////////////////////////////////////////////
546 bool AsyncTaskChain::
547 do_remove(AsyncTask *task) {
548  bool removed = false;
549 
550  nassertr(task->_chain == this, false);
551 
552  switch (task->_state) {
553  case AsyncTask::S_servicing:
554  // This task is being serviced.
555  task->_state = AsyncTask::S_servicing_removed;
556  removed = true;
557  break;
558 
559  case AsyncTask::S_servicing_removed:
560  // Being serviced, though it will be removed later.
561  break;
562 
563  case AsyncTask::S_sleeping:
564  // Sleeping, easy.
565  {
566  int index = find_task_on_heap(_sleeping, task);
567  nassertr(index != -1, false);
568  _sleeping.erase(_sleeping.begin() + index);
569  make_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
570  removed = true;
571  cleanup_task(task, false, false);
572  }
573  break;
574 
575  case AsyncTask::S_active:
576  {
577  // Active, but not being serviced, easy.
578  int index = find_task_on_heap(_active, task);
579  if (index != -1) {
580  _active.erase(_active.begin() + index);
581  make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
582  } else {
583  index = find_task_on_heap(_next_active, task);
584  if (index != -1) {
585  _next_active.erase(_next_active.begin() + index);
586  } else {
587  index = find_task_on_heap(_this_active, task);
588  nassertr(index != -1, false);
589  }
590  }
591  removed = true;
592  cleanup_task(task, false, false);
593  }
594 
595  default:
596  break;
597  }
598 
599  return removed;
600 }
601 
602 ////////////////////////////////////////////////////////////////////
603 // Function: AsyncTaskChain::do_wait_for_tasks
604 // Access: Protected
605 // Description: Blocks until the task list is empty. Assumes the
606 // lock is held.
607 ////////////////////////////////////////////////////////////////////
608 void AsyncTaskChain::
609 do_wait_for_tasks() {
610  do_start_threads();
611 
612  if (_threads.empty()) {
613  // Non-threaded case.
614  while (_num_tasks > 0) {
615  if (_state == S_shutdown || _state == S_interrupted) {
616  return;
617  }
618  do_poll();
619  }
620 
621  } else {
622  // Threaded case.
623  while (_num_tasks > 0) {
624  if (_state == S_shutdown || _state == S_interrupted) {
625  return;
626  }
627 
628  PStatTimer timer(_wait_pcollector);
629  _cvar.wait();
630  }
631  }
632 }
633 
634 ////////////////////////////////////////////////////////////////////
635 // Function: AsyncTaskChain::do_cleanup
636 // Access: Protected
637 // Description: Stops all threads and messily empties the task list.
638 // This is intended to be called on destruction only.
639 // Assumes the lock is already held.
640 ////////////////////////////////////////////////////////////////////
641 void AsyncTaskChain::
642 do_cleanup() {
643  if (task_cat.is_spam()) {
644  do_output(task_cat.spam());
645  task_cat.spam(false)
646  << ": do_cleanup()\n";
647  }
648  do_stop_threads();
649  _num_threads = 0;
650 
651  // Don't call the upon_death functions while we clean up the tasks.
652  // Instead, store all the tasks in a list as we clean them up, and
653  // then call the upon_death functions all at once. We do this
654  // because calling upon_death wil release the lock, allowing the
655  // iterators to become invalid.
656 
657  TaskHeap dead;
658  dead.reserve(_num_tasks);
659 
660  _needs_cleanup = false;
661 
662  TaskHeap::const_iterator ti;
663  for (ti = _active.begin(); ti != _active.end(); ++ti) {
664  AsyncTask *task = (*ti);
665  dead.push_back(task);
666  cleanup_task(task, false, false);
667  }
668  for (ti = _this_active.begin(); ti != _this_active.end(); ++ti) {
669  AsyncTask *task = (*ti);
670  dead.push_back(task);
671  cleanup_task(task, false, false);
672  }
673  for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) {
674  AsyncTask *task = (*ti);
675  dead.push_back(task);
676  cleanup_task(task, false, false);
677  }
678  for (ti = _sleeping.begin(); ti != _sleeping.end(); ++ti) {
679  AsyncTask *task = (*ti);
680  dead.push_back(task);
681  cleanup_task(task, false, false);
682  }
683 
684  // There might still be one task remaining: the currently-executing
685  // task.
686  nassertv(_num_tasks == 0 || _num_tasks == 1);
687 
688  // Now go back and call the upon_death functions.
689  _manager->_lock.release();
690  for (ti = dead.begin(); ti != dead.end(); ++ti) {
691  (*ti)->upon_death(_manager, false);
692  }
693  _manager->_lock.acquire();
694 
695  if (task_cat.is_spam()) {
696  do_output(task_cat.spam());
697  task_cat.spam(false)
698  << ": done do_cleanup()\n";
699  }
700 }
701 
702 ////////////////////////////////////////////////////////////////////
703 // Function: AsyncTaskChain::do_has_task
704 // Access: Protected
705 // Description: Returns true if the task is on one of the task lists,
706 // false if it is not (false may mean that the task is
707 // currently being serviced). Assumes the lock is
708 // currently held.
709 ////////////////////////////////////////////////////////////////////
710 bool AsyncTaskChain::
711 do_has_task(AsyncTask *task) const {
712  return (find_task_on_heap(_active, task) != -1 ||
713  find_task_on_heap(_next_active, task) != -1 ||
714  find_task_on_heap(_sleeping, task) != -1 ||
715  find_task_on_heap(_this_active, task) != -1);
716 }
717 
718 ////////////////////////////////////////////////////////////////////
719 // Function: AsyncTaskChain::find_task_on_heap
720 // Access: Protected
721 // Description: Returns the index number of the indicated task within
722 // the specified task list, or -1 if the task is not
723 // found in the list (this may mean that it is currently
724 // being serviced). Assumes that the lock is currently
725 // held.
726 ////////////////////////////////////////////////////////////////////
727 int AsyncTaskChain::
728 find_task_on_heap(const TaskHeap &heap, AsyncTask *task) const {
729  for (int i = 0; i < (int)heap.size(); ++i) {
730  if (heap[i] == task) {
731  return i;
732  }
733  }
734 
735  return -1;
736 }
737 
738 ////////////////////////////////////////////////////////////////////
739 // Function: AsyncTaskChain::service_one_task
740 // Access: Protected
741 // Description: Pops a single task off the active queue, services it,
742 // and restores it to the end of the queue. This is
743 // called internally only within one of the task
744 // threads. Assumes the lock is already held.
745 //
746 // Note that the lock may be temporarily released by
747 // this method.
748 ////////////////////////////////////////////////////////////////////
void AsyncTaskChain::
service_one_task(AsyncTaskChain::AsyncTaskChainThread *thread) {
  if (!_active.empty()) {
    // Pop the highest-priority task off the active heap.
    PT(AsyncTask) task = _active.front();
    pop_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
    _active.pop_back();

    if (thread != (AsyncTaskChain::AsyncTaskChainThread *)NULL) {
      thread->_servicing = task;
    }

    if (task_cat.is_spam()) {
      task_cat.spam()
        << "Servicing " << *task << " in "
        << *Thread::get_current_thread() << "\n";
    }

    nassertv(task->get_sort() == _current_sort);
    nassertv(task->_state == AsyncTask::S_active);
    task->_state = AsyncTask::S_servicing;
    task->_servicing_thread = thread;

    // NOTE: this releases the manager's lock while the task function
    // runs, and reacquires it before returning.  Other threads may
    // mutate the chain in the meantime.
    AsyncTask::DoneStatus ds = task->unlock_and_do_task();

    if (thread != (AsyncTaskChain::AsyncTaskChainThread *)NULL) {
      thread->_servicing = NULL;
    }
    task->_servicing_thread = NULL;

    if (task->_chain == this) {
      if (task->_state == AsyncTask::S_servicing_removed) {
        // This task wants to kill itself.
        cleanup_task(task, true, false);

      } else if (task->_chain_name != get_name()) {
        // The task wants to jump to a different chain.  Hold a local
        // reference so cleanup_task() can't drop the last one.
        PT(AsyncTask) hold_task = task;
        cleanup_task(task, false, false);
        task->jump_to_task_chain(_manager);

      } else {
        // Dispatch on the status the task function returned.
        switch (ds) {
        case AsyncTask::DS_cont:
          // The task is still alive; put it on the next frame's active
          // queue.
          task->_state = AsyncTask::S_active;
          _next_active.push_back(task);
          _cvar.notify_all();
          break;

        case AsyncTask::DS_again:
          // The task wants to sleep again, for its delay period.
          {
            double now = _manager->_clock->get_frame_time();
            task->_wake_time = now + task->get_delay();
            task->_start_time = task->_wake_time;
            task->_state = AsyncTask::S_sleeping;
            _sleeping.push_back(task);
            push_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
            if (task_cat.is_spam()) {
              task_cat.spam()
                << "Sleeping " << *task << ", wake time at "
                << task->_wake_time - now << "\n";
            }
            _cvar.notify_all();
          }
          break;

        case AsyncTask::DS_pickup:
          // The task wants to run again this frame if possible.
          task->_state = AsyncTask::S_active;
          _this_active.push_back(task);
          _cvar.notify_all();
          break;

        case AsyncTask::DS_interrupt:
          // The task had an exception and wants to raise a big flag.
          // Requeue it and flip the chain to the interrupted state.
          task->_state = AsyncTask::S_active;
          _next_active.push_back(task);
          if (_state == S_started) {
            _state = S_interrupted;
            _cvar.notify_all();
          }
          break;

        default:
          // The task has finished.
          cleanup_task(task, true, true);
        }
      }
    } else {
      // While the lock was released the task migrated off this chain;
      // there is nothing further for us to do with it.
      task_cat.error()
        << "Task is no longer on chain " << get_name()
        << ": " << *task << "\n";
    }

    if (task_cat.is_spam()) {
      task_cat.spam()
        << "Done servicing " << *task << " in "
        << *Thread::get_current_thread() << "\n";
    }
  }
  thread_consider_yield();
}
853 
854 ////////////////////////////////////////////////////////////////////
855 // Function: AsyncTaskChain::cleanup_task
856 // Access: Protected
857 // Description: Called internally when a task has completed (or been
858 // interrupted) and is about to be removed from the
859 // active queue. Assumes the lock is held.
860 //
861 // If upon_death is true, then task->upon_death() will
862 // also be called, with the indicated clean_exit
863 // parameter.
864 //
865 // Note that the lock may be temporarily released by
866 // this method.
867 ////////////////////////////////////////////////////////////////////
void AsyncTaskChain::
cleanup_task(AsyncTask *task, bool upon_death, bool clean_exit) {
  if (task_cat.is_spam()) {
    do_output(task_cat.spam());
    task_cat.spam(false)
      << ": cleanup_task(" << *task << ", " << upon_death << ", " << clean_exit
      << ")\n";
  }

  nassertv(task->_chain == this);
  // Hold a local reference so detaching the task below cannot drop its
  // last reference while we are still using it.
  PT(AsyncTask) hold_task = task;

  // Detach the task from this chain and the manager, and update the
  // bookkeeping counts.
  task->_state = AsyncTask::S_inactive;
  task->_chain = NULL;
  task->_manager = NULL;
  --_num_tasks;
  --(_manager->_num_tasks);

  _manager->remove_task_by_name(task);

  if (upon_death) {
    // upon_death() may run arbitrary user code, so the manager's lock
    // must be released around the call; callers must tolerate the lock
    // being temporarily dropped here.
    _manager->_lock.release();
    task->upon_death(_manager, clean_exit);
    _manager->_lock.acquire();
  }
}
894 
895 ////////////////////////////////////////////////////////////////////
896 // Function: AsyncTaskChain::finish_sort_group
897 // Access: Protected
898 // Description: Called internally when all tasks of a given sort
899 // value have been completed, and it is time to
900 // increment to the next sort value, or begin the next
901 // epoch. Assumes the lock is held.
902 //
903 // Returns true if there are more tasks on the queue
904 // after this operation, or false if the task list is
905 // empty and we need to wait.
906 ////////////////////////////////////////////////////////////////////
bool AsyncTaskChain::
finish_sort_group() {
  // Only the last thread to finish the current sort group may call this.
  nassertr(_num_busy_threads == 0, true);

  if (!_threads.empty()) {
    PStatClient::thread_tick(get_name());
  }

  if (!_active.empty()) {
    // There are more tasks; just set the next sort value.
    nassertr(_current_sort < _active.front()->get_sort(), true);
    _current_sort = _active.front()->get_sort();
    _cvar.notify_all();
    return true;
  }

  // There are no more tasks in this epoch; advance to the next epoch.

  if (!_this_active.empty() && _frame_budget >= 0.0) {
    // Enter pickup mode.  This is a special mode at the end of the
    // epoch in which we are just re-running the tasks that think they
    // can still run within the frame, in an attempt to use up our
    // frame budget.

    if (task_cat.is_spam()) {
      do_output(task_cat.spam());
      task_cat.spam(false)
        << ": next epoch (pickup mode)\n";
    }

    _pickup_mode = true;
    _active.swap(_this_active);

  } else {
    // Not in pickup mode.

    if (task_cat.is_spam()) {
      do_output(task_cat.spam());
      task_cat.spam(false)
        << ": next epoch\n";
    }

    _pickup_mode = false;

    // Here, there's no difference between _this_active and
    // _next_active.  Combine them.
    _next_active.insert(_next_active.end(), _this_active.begin(), _this_active.end());
    _this_active.clear();

    _active.swap(_next_active);

    // We only tick the clock and wake sleepers in normal mode, the
    // first time through the task list; not in pickup mode when we
    // are re-running the stragglers just to use up our frame budget.

    if (_tick_clock) {
      if (task_cat.is_spam()) {
        do_output(task_cat.spam());
        task_cat.spam(false)
          << ": tick clock\n";
      }
      _manager->_clock->tick();
      // Wake any chains (or threads) blocked waiting for the next frame.
      _manager->_frame_cvar.notify_all();

    } else if (_frame_sync) {
      // If we're a synced chain, we have to wait at the end of the
      // epoch for someone else to tick the clock.
      _block_till_next_frame = true;
    }

    // Check for any sleeping tasks that need to be woken.
    // _sleeping is a heap ordered by wake time, so the earliest waker
    // is always at the front.
    double now = _manager->_clock->get_frame_time();
    while (!_sleeping.empty() && _sleeping.front()->_wake_time <= now) {
      PT(AsyncTask) task = _sleeping.front();
      if (task_cat.is_spam()) {
        task_cat.spam()
          << "Waking " << *task << ", wake time at "
          << task->_wake_time - now << "\n";
      }
      pop_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
      _sleeping.pop_back();
      task->_state = AsyncTask::S_active;
      task->_start_frame = _manager->_clock->get_frame_count();
      _active.push_back(task);
    }

    if (task_cat.is_spam()) {
      if (_sleeping.empty()) {
        task_cat.spam()
          << "No more tasks on sleeping queue.\n";
      } else {
        task_cat.spam()
          << "Next sleeper: " << *_sleeping.front() << ", wake time at "
          << _sleeping.front()->_wake_time - now << "\n";
      }
    }

    // Any tasks that are on the active queue at the beginning of the
    // epoch are deemed to have run one frame (or to be about to).
    TaskHeap::const_iterator ti;
    for (ti = _active.begin(); ti != _active.end(); ++ti) {
      AsyncTask *task = (*ti);
      ++task->_num_frames;
    }
  }

  if (_timeslice_priority) {
    filter_timeslice_priority();
  }

  // Sanity check: every task must be accounted for in exactly one queue.
  nassertr((size_t)_num_tasks == _active.size() + _this_active.size() + _next_active.size() + _sleeping.size(), true);
  // Re-heapify the new active list by priority/sort order.
  make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());

  // Reset the sort value so the next task picked establishes it.
  _current_sort = -INT_MAX;

  if (!_active.empty()) {
    // Signal the threads to start executing the first task again.
    _cvar.notify_all();
    return true;
  }

  // There are no tasks to be had anywhere.  Chill.
  _pickup_mode = false;
  nassertr(_this_active.empty(), false);
  return false;
}
1033 
1034 ////////////////////////////////////////////////////////////////////
1035 // Function: AsyncTaskChain::filter_timeslice_priority
1036 // Access: Protected
1037 // Description: Called to filter the _active tasks list when we are
1038 // in the special timeslice_priority mode. In this
1039 // mode, go through and postpone any tasks that have
1040 // already exceeded their priority budget for this
1041 // epoch.
1042 //
1043 // Assumes the lock is already held.
1044 ////////////////////////////////////////////////////////////////////
1045 void AsyncTaskChain::
1046 filter_timeslice_priority() {
1047  if (_active.empty()) {
1048  return;
1049  }
1050  nassertv(_timeslice_priority);
1051 
1052  // We must first sum up the average per-epoch runtime of each task.
1053  double net_runtime = 0.0;
1054  int net_priority = 0;
1055 
1056  TaskHeap::iterator ti;
1057  for (ti = _active.begin(); ti != _active.end(); ++ti) {
1058  AsyncTask *task = (*ti);
1059  double runtime = max(task->get_average_dt(), 0.0);
1060  int priority = max(task->_priority, 1);
1061  net_runtime += runtime;
1062  net_priority += priority;
1063  }
1064 
1065  // That gives us a timeslice budget per priority value.
1066  double average_budget = net_runtime / (double)net_priority;
1067 
1068  TaskHeap keep, postpone;
1069  for (ti = _active.begin(); ti != _active.end(); ++ti) {
1070  AsyncTask *task = (*ti);
1071  double runtime = max(task->get_average_dt(), 0.0);
1072  int priority = max(task->_priority, 1);
1073  double consumed = runtime / (double)priority;
1074  // cerr << *task << " consumed " << consumed << " vs. " << average_budget << "\n";
1075  if (consumed > average_budget) {
1076  // Postpone. Run this task next epoch.
1077  postpone.push_back(task);
1078  } else {
1079  // Keep, and run this task this epoch.
1080  keep.push_back(task);
1081  }
1082  }
1083 
1084  if (keep.empty()) {
1085  // Hmm, nothing to keep. Grab the postponed task with the highest
1086  // priority and keep that instead.
1087  nassertv(!postpone.empty());
1088  ti = postpone.begin();
1089  TaskHeap::iterator max_ti = ti;
1090  ++ti;
1091  while (ti != postpone.end()) {
1092  if ((*ti)->_priority > (*max_ti)->_priority) {
1093  max_ti = ti;
1094  }
1095  }
1096 
1097  // cerr << "Nothing to keep, keeping " << *(*max_ti) << " instead\n";
1098 
1099  keep.push_back(*max_ti);
1100  postpone.erase(max_ti);
1101  }
1102 
1103  _active.swap(keep);
1104  if (_pickup_mode) {
1105  _this_active.insert(_this_active.end(), postpone.begin(), postpone.end());
1106  } else {
1107  _next_active.insert(_next_active.end(), postpone.begin(), postpone.end());
1108  }
1109 
1110  nassertv(!_active.empty());
1111 }
1112 
1113 ////////////////////////////////////////////////////////////////////
1114 // Function: AsyncTaskChain::do_stop_threads
1115 // Access: Protected
1116 // Description: The private implementation of stop_threads; assumes
1117 // the lock is already held.
1118 ////////////////////////////////////////////////////////////////////
void AsyncTaskChain::
do_stop_threads() {
  if (_state == S_started || _state == S_interrupted) {
    if (task_cat.is_debug() && !_threads.empty()) {
      task_cat.debug()
        << "Stopping " << _threads.size()
        << " threads for " << _manager->get_name()
        << " chain " << get_name()
        << " in " << *Thread::get_current_thread() << "\n";
    }

    // Announce the shutdown and wake every thread that may be blocked on
    // either the chain's cvar or the manager's frame cvar.
    _state = S_shutdown;
    _cvar.notify_all();
    _manager->_frame_cvar.notify_all();

    // Take ownership of the thread list locally, so _threads is empty
    // while the lock is released below.
    Threads wait_threads;
    wait_threads.swap(_threads);

    // We have to release the lock while we join, so the threads can
    // wake up and see that we're shutting down.
    _manager->_lock.release();
    Threads::iterator ti;
    for (ti = wait_threads.begin(); ti != wait_threads.end(); ++ti) {
      if (task_cat.is_debug()) {
        task_cat.debug()
          << "Waiting for " << *(*ti) << " in "
          << *Thread::get_current_thread() << "\n";
      }
      (*ti)->join();
      if (task_cat.is_spam()) {
        task_cat.spam()
          << "Done waiting for " << *(*ti) << " in "
          << *Thread::get_current_thread() << "\n";
      }
    }
    _manager->_lock.acquire();

    // All threads are gone; reset so do_start_threads() can run again.
    _state = S_initial;

    // There might be one busy "thread" still: the main thread.
    nassertv(_num_busy_threads == 0 || _num_busy_threads == 1);
    cleanup_pickup_mode();
  }
}
1163 
1164 ////////////////////////////////////////////////////////////////////
1165 // Function: AsyncTaskChain::do_start_threads
1166 // Access: Protected
1167 // Description: The private implementation of start_threads; assumes
1168 // the lock is already held.
1169 ////////////////////////////////////////////////////////////////////
1170 void AsyncTaskChain::
1171 do_start_threads() {
1172  if (_state == S_interrupted) {
1173  do_stop_threads();
1174  }
1175 
1176  if (_state == S_initial) {
1177  _state = S_started;
1178  if (Thread::is_threading_supported() && _num_threads > 0) {
1179  if (task_cat.is_debug()) {
1180  task_cat.debug()
1181  << "Starting " << _num_threads << " threads for "
1182  << _manager->get_name() << " chain " << get_name() << "\n";
1183  }
1184  _needs_cleanup = true;
1185  _threads.reserve(_num_threads);
1186  for (int i = 0; i < _num_threads; ++i) {
1187  ostringstream strm;
1188  strm << _manager->get_name() << "_" << get_name() << "_" << i;
1189  PT(AsyncTaskChainThread) thread = new AsyncTaskChainThread(strm.str(), this);
1190  if (thread->start(_thread_priority, true)) {
1191  _threads.push_back(thread);
1192  }
1193  }
1194  }
1195  }
1196 }
1197 
1198 ////////////////////////////////////////////////////////////////////
1199 // Function: AsyncTaskChain::do_get_active_tasks
1200 // Access: Protected
1201 // Description: Returns the set of tasks that are active (and not
1202 // sleeping) on the task chain, at the time of the
1203 // call. Assumes the lock is held.
1204 ////////////////////////////////////////////////////////////////////
1205 AsyncTaskCollection AsyncTaskChain::
1206 do_get_active_tasks() const {
1207  AsyncTaskCollection result;
1208 
1209  Threads::const_iterator thi;
1210  for (thi = _threads.begin(); thi != _threads.end(); ++thi) {
1211  AsyncTask *task = (*thi)->_servicing;
1212  if (task != (AsyncTask *)NULL) {
1213  result.add_task(task);
1214  }
1215  }
1216  TaskHeap::const_iterator ti;
1217  for (ti = _active.begin(); ti != _active.end(); ++ti) {
1218  AsyncTask *task = (*ti);
1219  result.add_task(task);
1220  }
1221  for (ti = _this_active.begin(); ti != _this_active.end(); ++ti) {
1222  AsyncTask *task = (*ti);
1223  result.add_task(task);
1224  }
1225  for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) {
1226  AsyncTask *task = (*ti);
1227  result.add_task(task);
1228  }
1229 
1230  return result;
1231 }
1232 
1233 ////////////////////////////////////////////////////////////////////
1234 // Function: AsyncTaskChain::do_get_sleeping_tasks
1235 // Access: Published
1236 // Description: Returns the set of tasks that are sleeping (and not
1237 // active) on the task chain, at the time of the
1238 // call. Assumes the lock is held.
1239 ////////////////////////////////////////////////////////////////////
1240 AsyncTaskCollection AsyncTaskChain::
1241 do_get_sleeping_tasks() const {
1242  AsyncTaskCollection result;
1243 
1244  TaskHeap::const_iterator ti;
1245  for (ti = _sleeping.begin(); ti != _sleeping.end(); ++ti) {
1246  AsyncTask *task = (*ti);
1247  result.add_task(task);
1248  }
1249 
1250  return result;
1251 }
1252 
1253 ////////////////////////////////////////////////////////////////////
1254 // Function: AsyncTaskChain::do_poll
1255 // Access: Protected
1256 // Description: The private implementation of poll(), this assumes
1257 // the lock is already held.
1258 ////////////////////////////////////////////////////////////////////
void AsyncTaskChain::
do_poll() {
  thread_consider_yield();
  if (_num_tasks == 0) {
    // Nothing scheduled anywhere; nothing to do.
    return;
  }

  do_start_threads();

  if (!_threads.empty()) {
    // Dedicated threads are servicing this chain; poll() is a no-op.
    return;
  }

  if (_num_busy_threads != 0) {
    // We are recursively nested within another task.  Return, with a warning.
    task_cat.warning()
      << "Ignoring recursive poll() within another task.\n";
    return;
  }

  nassertv(!_pickup_mode);

  do {
    while (!_active.empty()) {
      if (_state == S_shutdown || _state == S_interrupted) {
        return;
      }
      // Reset the per-frame time accounting when the clock frame advances.
      int frame = _manager->_clock->get_frame_count();
      if (_current_frame != frame) {
        _current_frame = frame;
        _time_in_frame = 0.0;
        _block_till_next_frame = false;
      }
      if (_block_till_next_frame ||
          (_frame_budget >= 0.0 && _time_in_frame >= _frame_budget)) {
        // If we've exceeded our budget, stop here.  We'll resume from
        // this point at the next call to poll().
        cleanup_pickup_mode();
        return;
      }

      _current_sort = _active.front()->get_sort();

      // Normally, there won't be any threads running at the same time
      // we're in poll().  But it's possible, if someone calls
      // set_num_threads() while we're processing.
      _num_busy_threads++;
      service_one_task(NULL);
      _num_busy_threads--;
      _cvar.notify_all();

      if (!_threads.empty()) {
        // Threads were started mid-poll; hand the chain over to them.
        return;
      }
    }

    // The active queue is drained; advance the sort group (and possibly
    // enter pickup mode, in which case we loop around once more).
    finish_sort_group();
  } while (_pickup_mode);
}
1318 
1319 ////////////////////////////////////////////////////////////////////
1320 // Function: AsyncTaskChain::cleanup_pickup_mode
1321 // Access: Protected
1322 // Description: Clean up the damage from setting pickup mode. This
1323 // means we restore the _active and _next_active lists
1324 // as they should have been without pickup mode, for
1325 // next frame. Assumes the lock is held.
1326 ////////////////////////////////////////////////////////////////////
1327 void AsyncTaskChain::
1328 cleanup_pickup_mode() {
1329  if (_pickup_mode) {
1330  _pickup_mode = false;
1331 
1332  // Move everything to the _next_active queue.
1333  _next_active.insert(_next_active.end(), _this_active.begin(), _this_active.end());
1334  _this_active.clear();
1335  _next_active.insert(_next_active.end(), _active.begin(), _active.end());
1336  _active.clear();
1337 
1338  // Now finish the epoch properly.
1339  finish_sort_group();
1340  }
1341 }
1342 
1343 ////////////////////////////////////////////////////////////////////
1344 // Function: AsyncTaskChain::do_output
1345 // Access: Protected
1346 // Description: The private implementation of output(), this assumes
1347 // the lock is already held.
1348 ////////////////////////////////////////////////////////////////////
1349 void AsyncTaskChain::
1350 do_output(ostream &out) const {
1351  if (_manager != (AsyncTaskManager *)NULL) {
1352  out << _manager->get_type() << " " << _manager->get_name();
1353  } else {
1354  out << "(no manager)";
1355  }
1356  out << " task chain " << get_name()
1357  << "; " << _num_tasks << " tasks";
1358 }
1359 
1360 ////////////////////////////////////////////////////////////////////
1361 // Function: AsyncTaskChain::do_write
1362 // Access: Protected
1363 // Description: The private implementation of write(), this assumes
1364 // the lock is already held.
1365 ////////////////////////////////////////////////////////////////////
1366 void AsyncTaskChain::
1367 do_write(ostream &out, int indent_level) const {
1368  indent(out, indent_level)
1369  << "Task chain \"" << get_name() << "\"\n";
1370  if (_num_threads > 0) {
1371  indent(out, indent_level + 2)
1372  << _num_threads << " threads, priority " << _thread_priority << "\n";
1373  }
1374  if (_frame_budget >= 0.0) {
1375  indent(out, indent_level + 2)
1376  << "frame budget " << _frame_budget << " s\n";
1377  }
1378  if (_timeslice_priority) {
1379  indent(out, indent_level + 2)
1380  << "timeslice priority\n";
1381  }
1382  if (_tick_clock) {
1383  indent(out, indent_level + 2)
1384  << "tick clock\n";
1385  }
1386 
1387  static const size_t buffer_size = 1024;
1388  char buffer[buffer_size];
1389  sprintf(buffer, " %-32s %8s %8s %8s %8s %6s",
1390  "Task",
1391  "sleep(s)",
1392  "dt(ms)", "avg", "max",
1393  "sort");
1394  nassertv(strlen(buffer) < buffer_size);
1395 
1396  indent(out, indent_level)
1397  << buffer << "\n";
1398 
1399  indent(out, indent_level);
1400  for (int i = 0; i < 32+8+8+8+8+6+7; ++i) {
1401  out << '-';
1402  }
1403  out << "\n";
1404 
1405  // Collect a list of all active tasks, then sort them into order for
1406  // output.
1407  TaskHeap tasks = _active;
1408  tasks.insert(tasks.end(), _this_active.begin(), _this_active.end());
1409  tasks.insert(tasks.end(), _next_active.begin(), _next_active.end());
1410 
1411  Threads::const_iterator thi;
1412  for (thi = _threads.begin(); thi != _threads.end(); ++thi) {
1413  AsyncTask *task = (*thi)->_servicing;
1414  if (task != (AsyncTask *)NULL) {
1415  tasks.push_back(task);
1416  }
1417  }
1418 
1419  double now = _manager->_clock->get_frame_time();
1420 
1421  if (!tasks.empty()) {
1422  sort(tasks.begin(), tasks.end(), AsyncTaskSortPriority());
1423 
1424  // Since AsyncTaskSortPriority() sorts backwards (because of STL's
1425  // push_heap semantics), we go through the task list in reverse
1426  // order to print them forwards.
1427  TaskHeap::reverse_iterator ti;
1428  for (ti = tasks.rbegin(); ti != tasks.rend(); ++ti) {
1429  AsyncTask *task = (*ti);
1430  write_task_line(out, indent_level, task, now);
1431  }
1432  }
1433 
1434  // Instead of iterating through the _sleeping list in heap order,
1435  // copy it and then use repeated pops to get it out in sorted
1436  // order, for the user's satisfaction.
1437  TaskHeap sleeping = _sleeping;
1438  while (!sleeping.empty()) {
1439  PT(AsyncTask) task = sleeping.front();
1440  pop_heap(sleeping.begin(), sleeping.end(), AsyncTaskSortWakeTime());
1441  sleeping.pop_back();
1442 
1443  write_task_line(out, indent_level, task, now);
1444  }
1445 }
1446 
1447 ////////////////////////////////////////////////////////////////////
1448 // Function: AsyncTaskChain::write_task_line
1449 // Access: Protected
1450 // Description: Writes a single line for a task in the report
1451 // generated by do_write(). Assumes the lock is already
1452 // held.
1453 ////////////////////////////////////////////////////////////////////
1454 void AsyncTaskChain::
1455 write_task_line(ostream &out, int indent_level, AsyncTask *task, double now) const {
1456  char servicing_flag = ' ';
1457  if (task->_state == AsyncTask::S_servicing) {
1458  servicing_flag = '*';
1459  } else if (task->_state == AsyncTask::S_servicing_removed) {
1460  servicing_flag = '-';
1461  }
1462 
1463  static const size_t buffer_size = 1024;
1464  char buffer[buffer_size];
1465 
1466  if (task->_state == AsyncTask::S_sleeping) {
1467  // For sleeping tasks, include the wake time, as an elapsed time
1468  // in seconds.
1469  string name = task->get_name().substr(0, 32);
1470  sprintf(buffer, "%c%-32s %8.1f",
1471  servicing_flag, name.c_str(),
1472  task->_wake_time - now);
1473  } else {
1474  // For active tasks, don't include a wake time. This means we
1475  // have more space for the name.
1476  string name = task->get_name().substr(0, 41);
1477  sprintf(buffer, "%c%-41s",
1478  servicing_flag, name.c_str());
1479  }
1480  nassertv(strlen(buffer) < buffer_size);
1481 
1482  indent(out, indent_level)
1483  << buffer;
1484 
1485  if (task->_num_frames > 0) {
1486  sprintf(buffer, " %8.1f %8.1f %8.1f %6d",
1487  task->_dt * 1000.0, task->get_average_dt() * 1000.0,
1488  task->_max_dt * 1000.0,
1489  task->_sort);
1490  } else {
1491  // No statistics for a task that hasn't run yet.
1492  sprintf(buffer, " %8s %8s %8s %6d",
1493  "", "", "",
1494  task->_sort);
1495  }
1496 
1497  nassertv(strlen(buffer) < buffer_size);
1498  out << buffer << "\n";
1499 }
1500 
1501 ////////////////////////////////////////////////////////////////////
1502 // Function: AsyncTaskChain::AsyncTaskChainThread::Constructor
1503 // Access: Public
1504 // Description:
1505 ////////////////////////////////////////////////////////////////////
AsyncTaskChain::AsyncTaskChainThread::
AsyncTaskChainThread(const string &name, AsyncTaskChain *chain) :
  // The chain's name doubles as the thread's sync-name (grouping threads
  // that serve the same chain).
  Thread(name, chain->get_name()),
  _chain(chain),
  _servicing(NULL)  // No task is being serviced until thread_main picks one up.
{
}
1513 
1514 ////////////////////////////////////////////////////////////////////
1515 // Function: AsyncTaskChain::AsyncTaskChainThread::thread_main
1516 // Access: Public, Virtual
1517 // Description:
1518 ////////////////////////////////////////////////////////////////////
void AsyncTaskChain::AsyncTaskChainThread::
thread_main() {
  // The manager lock is held for the lifetime of the loop; it is released
  // only implicitly inside the cvar waits (and inside service_one_task).
  MutexHolder holder(_chain->_manager->_lock);
  while (_chain->_state != S_shutdown && _chain->_state != S_interrupted) {
    thread_consider_yield();
    if (!_chain->_active.empty() &&
        _chain->_active.front()->get_sort() == _chain->_current_sort) {

      // There is a task available in the current sort group.
      // Reset per-frame accounting if the clock has advanced.
      int frame = _chain->_manager->_clock->get_frame_count();
      if (_chain->_current_frame != frame) {
        _chain->_current_frame = frame;
        _chain->_time_in_frame = 0.0;
        _chain->_block_till_next_frame = false;
      }

      // If we've exceeded our frame budget, sleep until the next
      // frame.
      if (_chain->_block_till_next_frame ||
          (_chain->_frame_budget >= 0.0 && _chain->_time_in_frame >= _chain->_frame_budget)) {
        while ((_chain->_block_till_next_frame ||
                (_chain->_frame_budget >= 0.0 && _chain->_time_in_frame >= _chain->_frame_budget)) &&
               _chain->_state != S_shutdown && _chain->_state != S_interrupted) {
          // Wait on the manager's frame cvar; re-check the frame count
          // after every wakeup (wakeups may be spurious or for shutdown).
          _chain->cleanup_pickup_mode();
          _chain->_manager->_frame_cvar.wait();
          frame = _chain->_manager->_clock->get_frame_count();
          if (_chain->_current_frame != frame) {
            _chain->_current_frame = frame;
            _chain->_time_in_frame = 0.0;
            _chain->_block_till_next_frame = false;
          }
        }
        // Now that it's the next frame, go back to the top of the loop.
        continue;
      }

      // Service one task, counting ourselves busy so finish_sort_group()
      // is not run while we hold a task.
      PStatTimer timer(_task_pcollector);
      _chain->_num_busy_threads++;
      _chain->service_one_task(this);
      _chain->_num_busy_threads--;
      _chain->_cvar.notify_all();

    } else {
      // We've finished all the available tasks of the current sort
      // value.  We can't pick up a new task until all of the threads
      // finish the tasks with the same sort value.
      if (_chain->_num_busy_threads == 0) {
        // We're the last thread to finish.  Update _current_sort.
        if (!_chain->finish_sort_group()) {
          // Nothing to do.  Wait for more tasks to be added.
          if (_chain->_sleeping.empty()) {
            PStatTimer timer(_wait_pcollector);
            _chain->_cvar.wait();
          } else {
            // There are sleeping tasks; wait only until the earliest
            // one is due, so we can wake it on time.
            double wake_time = _chain->do_get_next_wake_time();
            double now = _chain->_manager->_clock->get_frame_time();
            double timeout = max(wake_time - now, 0.0);
            PStatTimer timer(_wait_pcollector);
            _chain->_cvar.wait(timeout);
          }
        }

      } else {
        // Wait for the other threads to finish their current task
        // before we continue.
        PStatTimer timer(_wait_pcollector);
        _chain->_cvar.wait();
      }
    }
  }
}
1589 
bool get_frame_sync() const
Returns the frame_sync flag.
int get_num_tasks() const
Returns the number of tasks that are currently active or sleeping within the task chain...
void wait()
Waits on the condition.
bool has_delay() const
Returns true if a delay has been set for this task via set_delay(), or false otherwise.
Definition: asyncTask.I:112
A class to manage a loose queue of isolated tasks, which can be performed either synchronously (in th...
ThreadPriority get_thread_priority() const
Returns the priority associated with threads that serve this task chain.
void set_frame_sync(bool frame_sync)
Sets the frame_sync flag.
void add_task(AsyncTask *task)
Adds a new AsyncTask to the collection.
A list of tasks, for instance as returned by some of the AsyncTaskManager query functions.
void start_threads()
Starts any requested threads to service the tasks on the queue.
int get_num_threads() const
Returns the number of threads that will be servicing tasks for this chain.
A lightweight class that can be used to automatically start and stop a PStatCollector around a sectio...
Definition: pStatTimer.h:34
A lightweight C++ object whose constructor calls acquire() and whose destructor calls release() on a ...
Definition: mutexHolder.h:29
int get_sort() const
Returns the task's current sort value.
Definition: asyncTask.I:198
bool has_task(AsyncTask *task) const
Returns true if the indicated task has been added to this AsyncTaskChain, false otherwise.
void wait_for_tasks()
Blocks until the task list is empty.
static Thread * get_current_thread()
Returns a pointer to the currently-executing Thread object.
Definition: thread.I:145
void set_num_threads(int num_threads)
Changes the number of threads for this task chain.
void notify_all()
Informs all of the other threads who are currently blocked on wait() that the relevant condition has ...
void acquire() const
Grabs the mutex if it is available.
Definition: mutexDirect.I:70
static bool is_threading_supported()
Returns true if threading support has been compiled in and enabled, or false if no threading is avail...
Definition: thread.I:185
A lightweight class that represents a single element that may be timed and/or counted via stats...
void set_frame_budget(double frame_budget)
Sets the maximum amount of time per frame the tasks on this chain are granted for execution...
AsyncTaskCollection get_sleeping_tasks() const
Returns the set of tasks that are sleeping (and not active) on the task chain, at the time of the cal...
A base class for all things which can have a name.
Definition: namable.h:29
double get_delay() const
Returns the delay value that has been set via set_delay, if any.
Definition: asyncTask.I:123
AsyncTaskCollection get_active_tasks() const
Returns the set of tasks that are active (and not sleeping) on the task chain, at the time of the cal...
int get_num_running_threads() const
Returns the number of threads that have been created and are actively running.
void add_tasks_from(const AsyncTaskCollection &other)
Adds all the AsyncTasks indicated in the other collection to this task.
double get_frame_budget() const
Returns the maximum amount of time per frame the tasks on this chain are granted for execution...
The AsyncTaskChain is a subset of the AsyncTaskManager.
AsyncTaskCollection get_tasks() const
Returns the set of tasks that are active or sleeping on the task chain, at the time of the call...
double get_average_dt() const
Returns the average amount of time elapsed during each of the task's previous run cycles...
Definition: asyncTask.I:302
void poll()
Runs through all the tasks in the task list, once, if the task chain is running in single-threaded mo...
void release() const
Releases the mutex.
Definition: mutexDirect.I:99
This class represents a concrete task performed by an AsyncManager.
Definition: asyncTask.h:43
void join()
Blocks the calling process until the thread terminates.
Definition: thread.I:298
A thread; that is, a lightweight process.
Definition: thread.h:51
void set_tick_clock(bool tick_clock)
Sets the tick_clock flag.
void set_timeslice_priority(bool timeslice_priority)
Sets the timeslice_priority flag.
void stop_threads()
Stops any threads that are currently running.
TypeHandle is the identifier used to differentiate C++ class types.
Definition: typeHandle.h:85
void set_thread_priority(ThreadPriority priority)
Changes the priority associated with threads that serve this task chain.
bool get_tick_clock() const
Returns the tick_clock flag.
bool get_timeslice_priority() const
Returns the timeslice_priority flag.
double get_next_wake_time() const
Returns the scheduled time (on the manager&#39;s clock) of the next sleeping task, on any task chain...