Panda3D
asyncTaskChain.cxx
1 // Filename: asyncTaskChain.cxx
2 // Created by: drose (23Aug06)
3 //
4 ////////////////////////////////////////////////////////////////////
5 //
6 // PANDA 3D SOFTWARE
7 // Copyright (c) Carnegie Mellon University. All rights reserved.
8 //
9 // All use of this software is subject to the terms of the revised BSD
10 // license. You should have received a copy of this license along
11 // with this source code in a file named "LICENSE."
12 //
13 ////////////////////////////////////////////////////////////////////
14 
15 #include "asyncTaskChain.h"
16 #include "asyncTaskManager.h"
17 #include "event.h"
18 #include "mutexHolder.h"
19 #include "indent.h"
20 #include "pStatClient.h"
21 #include "pStatTimer.h"
22 #include "clockObject.h"
23 #include "config_event.h"
24 #include <algorithm>
25 #include <stdio.h> // For sprintf/snprintf
26 
27 TypeHandle AsyncTaskChain::_type_handle;
28 
29 PStatCollector AsyncTaskChain::_task_pcollector("Task");
30 PStatCollector AsyncTaskChain::_wait_pcollector("Wait");
31 
32 ////////////////////////////////////////////////////////////////////
33 // Function: AsyncTaskChain::Constructor
34 // Access: Published
35 // Description:
36 ////////////////////////////////////////////////////////////////////
37 AsyncTaskChain::
38 AsyncTaskChain(AsyncTaskManager *manager, const string &name) :
39  Namable(name),
40  _manager(manager),
41  _cvar(manager->_lock),
42  _tick_clock(false),
43  _timeslice_priority(false),
44  _num_threads(0),
45  _thread_priority(TP_normal),
46  _frame_budget(-1.0),
47  _frame_sync(false),
48  _num_busy_threads(0),
49  _num_tasks(0),
50  _state(S_initial),
51  _current_sort(-INT_MAX),
52  _pickup_mode(false),
53  _needs_cleanup(false),
54  _current_frame(0),
55  _time_in_frame(0.0),
56  _block_till_next_frame(false)
57 {
58 }
59 
60 ////////////////////////////////////////////////////////////////////
61 // Function: AsyncTaskChain::Destructor
62 // Access: Published, Virtual
63 // Description:
64 ////////////////////////////////////////////////////////////////////
65 AsyncTaskChain::
66 ~AsyncTaskChain() {
67  // We only grab the lock if _needs_cleanup is true. This way, the
68  // temporary AsyncTaskChain objects created (and destructed) within
69  // the task manager won't risk a double-lock.
70  if (_needs_cleanup) {
71  MutexHolder holder(_manager->_lock);
72  do_cleanup();
73  }
74 }
75 
76 ////////////////////////////////////////////////////////////////////
77 // Function: AsyncTaskChain::set_tick_clock
78 // Access: Published
79 // Description: Sets the tick_clock flag. When this is true,
80 // get_clock()->tick() will be called automatically at
81 // each task epoch. This is false by default.
82 ////////////////////////////////////////////////////////////////////
84 set_tick_clock(bool tick_clock) {
85  MutexHolder holder(_manager->_lock);
86  _tick_clock = tick_clock;
87 }
88 
89 ////////////////////////////////////////////////////////////////////
90 // Function: AsyncTaskChain::get_tick_clock
91 // Access: Published
92 // Description: Returns the tick_clock flag. See set_tick_clock().
93 ////////////////////////////////////////////////////////////////////
95 get_tick_clock() const {
96  MutexHolder holder(_manager->_lock);
97  return _tick_clock;
98 }
99 
100 ////////////////////////////////////////////////////////////////////
101 // Function: AsyncTaskChain::set_num_threads
102 // Access: Published
103 // Description: Changes the number of threads for this task chain.
104 // This may require stopping the threads if they are
105 // already running.
106 ////////////////////////////////////////////////////////////////////
107 void AsyncTaskChain::
108 set_num_threads(int num_threads) {
109  nassertv(num_threads >= 0);
110 
111  if (task_cat.is_debug()) {
112  do_output(task_cat.debug());
113  task_cat.debug(false)
114  << ": set_num_threads(" << num_threads << ")\n";
115  }
116 
118  num_threads = 0;
119  }
120 
121  MutexHolder holder(_manager->_lock);
122  if (_num_threads != num_threads) {
123  do_stop_threads();
124  _num_threads = num_threads;
125 
126  if (_num_tasks != 0) {
127  do_start_threads();
128  }
129  }
130 }
131 
132 ////////////////////////////////////////////////////////////////////
133 // Function: AsyncTaskChain::get_num_threads
134 // Access: Published
135 // Description: Returns the number of threads that will be servicing
136 // tasks for this chain. Also see
137 // get_num_running_threads().
138 ////////////////////////////////////////////////////////////////////
141  MutexHolder holder(_manager->_lock);
142  return _num_threads;
143 }
144 
145 ////////////////////////////////////////////////////////////////////
146 // Function: AsyncTaskChain::get_num_running_threads
147 // Access: Published
148 // Description: Returns the number of threads that have been created
149 // and are actively running. This will return 0 before
150 // the threads have been started; it will also return 0
151 // if thread support is not available.
152 ////////////////////////////////////////////////////////////////////
155  MutexHolder holder(_manager->_lock);
156  return _threads.size();
157 }
158 
159 ////////////////////////////////////////////////////////////////////
160 // Function: AsyncTaskChain::set_thread_priority
161 // Access: Published
162 // Description: Changes the priority associated with threads that
163 // serve this task chain. This may require stopping the
164 // threads if they are already running.
165 ////////////////////////////////////////////////////////////////////
166 void AsyncTaskChain::
167 set_thread_priority(ThreadPriority priority) {
168  MutexHolder holder(_manager->_lock);
169  if (_thread_priority != priority) {
170  do_stop_threads();
171  _thread_priority = priority;
172 
173  if (_num_tasks != 0) {
174  do_start_threads();
175  }
176  }
177 }
178 
179 ////////////////////////////////////////////////////////////////////
180 // Function: AsyncTaskChain::get_thread_priority
181 // Access: Published
182 // Description: Returns the priority associated with threads that
183 // serve this task chain.
184 ////////////////////////////////////////////////////////////////////
185 ThreadPriority AsyncTaskChain::
187  MutexHolder holder(_manager->_lock);
188  return _thread_priority;
189 }
190 
191 ////////////////////////////////////////////////////////////////////
192 // Function: AsyncTaskChain::set_frame_budget
193 // Access: Published
194 // Description: Sets the maximum amount of time per frame the tasks
195 // on this chain are granted for execution. If this is
196 // less than zero, there is no limit; if it is >= 0, it
197 // represents a maximum amount of time (in seconds) that
198 // will be used to execute tasks. If this time is
199 // exceeded in any one frame, the task chain will stop
200 // executing tasks until the next frame, as defined by
201 // the TaskManager's clock.
202 ////////////////////////////////////////////////////////////////////
203 void AsyncTaskChain::
204 set_frame_budget(double frame_budget) {
205  MutexHolder holder(_manager->_lock);
206  _frame_budget = frame_budget;
207 }
208 
209 ////////////////////////////////////////////////////////////////////
210 // Function: AsyncTaskChain::get_frame_budget
211 // Access: Published
212 // Description: Returns the maximum amount of time per frame the
213 // tasks on this chain are granted for execution. See
214 // set_frame_budget().
215 ////////////////////////////////////////////////////////////////////
216 double AsyncTaskChain::
218  MutexHolder holder(_manager->_lock);
219  return _frame_budget;
220 }
221 
222 ////////////////////////////////////////////////////////////////////
223 // Function: AsyncTaskChain::set_frame_sync
224 // Access: Published
225 // Description: Sets the frame_sync flag. When this flag is true,
226 // this task chain will be forced to sync with the
227 // TaskManager's clock. It will run no faster than one
228 // epoch per clock frame.
229 //
230 // When this flag is false, the default, the task chain
231 // will finish all of its tasks and then immediately
232 // start from the first task again, regardless of the
233 // clock frame. When it is true, the task chain will
234 // finish all of its tasks and then wait for the clock
235 // to tick to the next frame before resuming the first
236 // task.
237 //
238 // This only makes sense for threaded task chains.
239 // Non-threaded task chains are automatically
240 // synchronous.
241 ////////////////////////////////////////////////////////////////////
242 void AsyncTaskChain::
243 set_frame_sync(bool frame_sync) {
244  MutexHolder holder(_manager->_lock);
245  _frame_sync = frame_sync;
246 }
247 
248 ////////////////////////////////////////////////////////////////////
249 // Function: AsyncTaskChain::get_frame_sync
250 // Access: Published
251 // Description: Returns the frame_sync flag. See set_frame_sync().
252 ////////////////////////////////////////////////////////////////////
253 bool AsyncTaskChain::
254 get_frame_sync() const {
255  MutexHolder holder(_manager->_lock);
256  return _frame_sync;
257 }
258 
259 ////////////////////////////////////////////////////////////////////
260 // Function: AsyncTaskChain::set_timeslice_priority
261 // Access: Published
262 // Description: Sets the timeslice_priority flag. This changes
263 // the interpretation of priority, and the number of
264 // times per epoch each task will run.
265 //
266 // When this flag is true, some tasks might not run in
267 // any given epoch. Instead, tasks with priority higher
268 // than 1 will be given precedence, in proportion to the
269 // amount of time they have already used. This gives
270 // higher-priority tasks more runtime than
271 // lower-priority tasks. Each task gets the amount of
272 // time proportional to its priority value, so a task
273 // with priority 100 will get five times as much
274 // processing time as a task with priority 20. For
275 // these purposes, priority values less than 1 are
276 // deemed to be equal to 1.
277 //
278 // When this flag is false (the default), all tasks are
279 // run exactly once each epoch, round-robin style.
280 // Priority is only used to determine which task runs
281 // first within tasks of the same sort value.
282 ////////////////////////////////////////////////////////////////////
283 void AsyncTaskChain::
284 set_timeslice_priority(bool timeslice_priority) {
285  MutexHolder holder(_manager->_lock);
286  _timeslice_priority = timeslice_priority;
287 }
288 
289 ////////////////////////////////////////////////////////////////////
290 // Function: AsyncTaskChain::get_timeslice_priority
291 // Access: Published
292 // Description: Returns the timeslice_priority flag. This changes
293 // the interpretation of priority, and the number of
294 // times per epoch each task will run. See
295 // set_timeslice_priority().
296 ////////////////////////////////////////////////////////////////////
297 bool AsyncTaskChain::
299  MutexHolder holder(_manager->_lock);
300  return _timeslice_priority;
301 }
302 
303 ////////////////////////////////////////////////////////////////////
304 // Function: AsyncTaskChain::stop_threads
305 // Access: Published
306 // Description: Stops any threads that are currently running. If any
307 // tasks are still pending and have not yet been picked
308 // up by a thread, they will not be serviced unless
309 // poll() or start_threads() is later called.
310 ////////////////////////////////////////////////////////////////////
311 void AsyncTaskChain::
313  if (_state == S_started || _state == S_interrupted) {
314  // Clean up all of the threads.
315  MutexHolder holder(_manager->_lock);
316  do_stop_threads();
317  }
318 }
319 
320 ////////////////////////////////////////////////////////////////////
321 // Function: AsyncTaskChain::start_threads
322 // Access: Published
323 // Description: Starts any requested threads to service the tasks on
324 // the queue. This is normally not necessary, since
325 // adding a task will start the threads automatically.
326 ////////////////////////////////////////////////////////////////////
327 void AsyncTaskChain::
329  if (_state == S_initial || _state == S_interrupted) {
330  MutexHolder holder(_manager->_lock);
331  do_start_threads();
332  }
333 }
334 
335 ////////////////////////////////////////////////////////////////////
336 // Function: AsyncTaskChain::has_task
337 // Access: Published
338 // Description: Returns true if the indicated task has been added to
339 // this AsyncTaskChain, false otherwise.
340 ////////////////////////////////////////////////////////////////////
341 bool AsyncTaskChain::
342 has_task(AsyncTask *task) const {
343  MutexHolder holder(_manager->_lock);
344 
345  if (task->_chain != this) {
346  nassertr(!do_has_task(task), false);
347  return false;
348  }
349 
350  if (task->_state == AsyncTask::S_servicing_removed) {
351  return false;
352  }
353 
354  // The task might not actually be in the active queue, since it
355  // might be being serviced right now. That's OK.
356  return true;
357 }
358 
359 ////////////////////////////////////////////////////////////////////
360 // Function: AsyncTaskChain::wait_for_tasks
361 // Access: Published
362 // Description: Blocks until the task list is empty.
363 ////////////////////////////////////////////////////////////////////
364 void AsyncTaskChain::
366  MutexHolder holder(_manager->_lock);
367  do_wait_for_tasks();
368 }
369 
370 ////////////////////////////////////////////////////////////////////
371 // Function: AsyncTaskChain::get_num_tasks
372 // Access: Published
373 // Description: Returns the number of tasks that are currently active
374 // or sleeping within the task chain.
375 ////////////////////////////////////////////////////////////////////
377 get_num_tasks() const {
378  MutexHolder holder(_manager->_lock);
379  return _num_tasks;
380 }
381 
382 ////////////////////////////////////////////////////////////////////
383 // Function: AsyncTaskChain::get_tasks
384 // Access: Published
385 // Description: Returns the set of tasks that are active or sleeping
386 // on the task chain, at the time of the call.
387 ////////////////////////////////////////////////////////////////////
389 get_tasks() const {
390  MutexHolder holder(_manager->_lock);
391  AsyncTaskCollection result = do_get_active_tasks();
392  result.add_tasks_from(do_get_sleeping_tasks());
393  return result;
394 }
395 
396 ////////////////////////////////////////////////////////////////////
397 // Function: AsyncTaskChain::get_active_tasks
398 // Access: Published
399 // Description: Returns the set of tasks that are active (and not
400 // sleeping) on the task chain, at the time of the
401 // call.
402 ////////////////////////////////////////////////////////////////////
405  MutexHolder holder(_manager->_lock);
406  return do_get_active_tasks();
407 }
408 
409 ////////////////////////////////////////////////////////////////////
410 // Function: AsyncTaskChain::get_sleeping_tasks
411 // Access: Published
412 // Description: Returns the set of tasks that are sleeping (and not
413 // active) on the task chain, at the time of the
414 // call.
415 ////////////////////////////////////////////////////////////////////
418  MutexHolder holder(_manager->_lock);
419  return do_get_sleeping_tasks();
420 }
421 
422 ////////////////////////////////////////////////////////////////////
423 // Function: AsyncTaskChain::poll
424 // Access: Published
425 // Description: Runs through all the tasks in the task list, once, if
426 // the task chain is running in single-threaded mode
427 // (no threads available). This method does nothing in
428 // threaded mode, so it may safely be called in either
429 // case.
430 //
431 // Normally, you would not call this function directly;
432 // instead, call AsyncTaskManager::poll(), which polls
433 // all of the task chains in sequence.
434 ////////////////////////////////////////////////////////////////////
435 void AsyncTaskChain::
436 poll() {
437  MutexHolder holder(_manager->_lock);
438  do_poll();
439 }
440 
441 ////////////////////////////////////////////////////////////////////
442 // Function: AsyncTaskChain::get_next_wake_time
443 // Access: Published
444 // Description: Returns the scheduled time (on the manager's clock)
445 // of the next sleeping task, on any task chain, to
446 // awaken. Returns -1 if there are no sleeping tasks.
447 ////////////////////////////////////////////////////////////////////
448 double AsyncTaskChain::
450  MutexHolder holder(_manager->_lock);
451  return do_get_next_wake_time();
452 }
453 
454 ////////////////////////////////////////////////////////////////////
455 // Function: AsyncTaskChain::output
456 // Access: Published, Virtual
457 // Description:
458 ////////////////////////////////////////////////////////////////////
459 void AsyncTaskChain::
460 output(ostream &out) const {
461  MutexHolder holder(_manager->_lock);
462  do_output(out);
463 }
464 
465 ////////////////////////////////////////////////////////////////////
466 // Function: AsyncTaskChain::write
467 // Access: Published, Virtual
468 // Description:
469 ////////////////////////////////////////////////////////////////////
470 void AsyncTaskChain::
471 write(ostream &out, int indent_level) const {
472  MutexHolder holder(_manager->_lock);
473  do_write(out, indent_level);
474 }
475 
476 ////////////////////////////////////////////////////////////////////
477 // Function: AsyncTaskChain::do_add
478 // Access: Protected
479 // Description: Adds the indicated task to the active queue. It is
480 // an error if the task is already added to this or any
481 // other active queue.
482 //
483 // This is normally called only by the AsyncTaskManager.
484 // Assumes the lock is already held.
485 ////////////////////////////////////////////////////////////////////
486 void AsyncTaskChain::
487 do_add(AsyncTask *task) {
488  nassertv(task->_chain == NULL &&
489  task->_manager == NULL &&
490  task->_chain_name == get_name() &&
491  task->_state == AsyncTask::S_inactive);
492  nassertv(!do_has_task(task));
493 
494  do_start_threads();
495 
496  task->_chain = this;
497  task->_manager = _manager;
498 
499  double now = _manager->_clock->get_frame_time();
500  task->_start_time = now;
501  task->_start_frame = _manager->_clock->get_frame_count();
502 
503  _manager->add_task_by_name(task);
504 
505  if (task->has_delay()) {
506  // This is a deferred task. Add it to the sleeping queue.
507  task->_wake_time = now + task->get_delay();
508  task->_start_time = task->_wake_time;
509  task->_state = AsyncTask::S_sleeping;
510  _sleeping.push_back(task);
511  push_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
512 
513  } else {
514  // This is an active task. Add it to the active set.
515  task->_state = AsyncTask::S_active;
516  if (task_cat.is_spam()) {
517  task_cat.spam()
518  << "Adding " << *task << " with sort " << task->get_sort()
519  << " to chain " << get_name() << " with current_sort "
520  << _current_sort << "\n";
521  }
522  if (task->get_sort() >= _current_sort) {
523  // It will run this frame.
524  _active.push_back(task);
525  push_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
526  } else {
527  // It will run next frame.
528  _next_active.push_back(task);
529  }
530  }
531  ++_num_tasks;
532  ++(_manager->_num_tasks);
533  _needs_cleanup = true;
534 
535  _cvar.notify_all();
536 }
537 
538 ////////////////////////////////////////////////////////////////////
539 // Function: AsyncTaskChain::do_remove
540 // Access: Protected
541 // Description: Removes the indicated task from this chain. Returns
542 // true if removed, false otherwise. Assumes the lock
543 // is already held. The task->upon_death() method is
544 // *not* called.
545 ////////////////////////////////////////////////////////////////////
546 bool AsyncTaskChain::
547 do_remove(AsyncTask *task) {
548  bool removed = false;
549 
550  nassertr(task->_chain == this, false);
551 
552  switch (task->_state) {
553  case AsyncTask::S_servicing:
554  // This task is being serviced.
555  task->_state = AsyncTask::S_servicing_removed;
556  removed = true;
557  break;
558 
559  case AsyncTask::S_servicing_removed:
560  // Being serviced, though it will be removed later.
561  break;
562 
563  case AsyncTask::S_sleeping:
564  // Sleeping, easy.
565  {
566  int index = find_task_on_heap(_sleeping, task);
567  nassertr(index != -1, false);
568  _sleeping.erase(_sleeping.begin() + index);
569  make_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
570  removed = true;
571  cleanup_task(task, false, false);
572  }
573  break;
574 
575  case AsyncTask::S_active:
576  {
577  // Active, but not being serviced, easy.
578  int index = find_task_on_heap(_active, task);
579  if (index != -1) {
580  _active.erase(_active.begin() + index);
581  make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
582  } else {
583  index = find_task_on_heap(_next_active, task);
584  if (index != -1) {
585  _next_active.erase(_next_active.begin() + index);
586  } else {
587  index = find_task_on_heap(_this_active, task);
588  nassertr(index != -1, false);
589  }
590  }
591  removed = true;
592  cleanup_task(task, false, false);
593  }
594 
595  default:
596  break;
597  }
598 
599  return removed;
600 }
601 
602 ////////////////////////////////////////////////////////////////////
603 // Function: AsyncTaskChain::do_wait_for_tasks
604 // Access: Protected
605 // Description: Blocks until the task list is empty. Assumes the
606 // lock is held.
607 ////////////////////////////////////////////////////////////////////
608 void AsyncTaskChain::
609 do_wait_for_tasks() {
610  do_start_threads();
611 
612  if (_threads.empty()) {
613  // Non-threaded case.
614  while (_num_tasks > 0) {
615  if (_state == S_shutdown || _state == S_interrupted) {
616  return;
617  }
618  do_poll();
619  }
620 
621  } else {
622  // Threaded case.
623  while (_num_tasks > 0) {
624  if (_state == S_shutdown || _state == S_interrupted) {
625  return;
626  }
627 
628  PStatTimer timer(_wait_pcollector);
629  _cvar.wait();
630  }
631  }
632 }
633 
634 ////////////////////////////////////////////////////////////////////
635 // Function: AsyncTaskChain::do_cleanup
636 // Access: Protected
637 // Description: Stops all threads and messily empties the task list.
638 // This is intended to be called on destruction only.
639 // Assumes the lock is already held.
640 ////////////////////////////////////////////////////////////////////
641 void AsyncTaskChain::
642 do_cleanup() {
643  if (task_cat.is_spam()) {
644  do_output(task_cat.spam());
645  task_cat.spam(false)
646  << ": do_cleanup()\n";
647  }
648  do_stop_threads();
649  _num_threads = 0;
650 
651  // Don't call the upon_death functions while we clean up the tasks.
652  // Instead, store all the tasks in a list as we clean them up, and
653  // then call the upon_death functions all at once. We do this
654  // because calling upon_death wil release the lock, allowing the
655  // iterators to become invalid.
656 
657  TaskHeap dead;
658  dead.reserve(_num_tasks);
659 
660  _needs_cleanup = false;
661 
662  TaskHeap::const_iterator ti;
663  for (ti = _active.begin(); ti != _active.end(); ++ti) {
664  AsyncTask *task = (*ti);
665  dead.push_back(task);
666  cleanup_task(task, false, false);
667  }
668  for (ti = _this_active.begin(); ti != _this_active.end(); ++ti) {
669  AsyncTask *task = (*ti);
670  dead.push_back(task);
671  cleanup_task(task, false, false);
672  }
673  for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) {
674  AsyncTask *task = (*ti);
675  dead.push_back(task);
676  cleanup_task(task, false, false);
677  }
678  for (ti = _sleeping.begin(); ti != _sleeping.end(); ++ti) {
679  AsyncTask *task = (*ti);
680  dead.push_back(task);
681  cleanup_task(task, false, false);
682  }
683 
684  // There might still be one task remaining: the currently-executing
685  // task.
686  nassertv(_num_tasks == 0 || _num_tasks == 1);
687 
688  // Now go back and call the upon_death functions.
689  _manager->_lock.release();
690  for (ti = dead.begin(); ti != dead.end(); ++ti) {
691  (*ti)->upon_death(_manager, false);
692  }
693  _manager->_lock.acquire();
694 
695  if (task_cat.is_spam()) {
696  do_output(task_cat.spam());
697  task_cat.spam(false)
698  << ": done do_cleanup()\n";
699  }
700 }
701 
702 ////////////////////////////////////////////////////////////////////
703 // Function: AsyncTaskChain::do_has_task
704 // Access: Protected
705 // Description: Returns true if the task is on one of the task lists,
706 // false if it is not (false may mean that the task is
707 // currently being serviced). Assumes the lock is
708 // currently held.
709 ////////////////////////////////////////////////////////////////////
710 bool AsyncTaskChain::
711 do_has_task(AsyncTask *task) const {
712  return (find_task_on_heap(_active, task) != -1 ||
713  find_task_on_heap(_next_active, task) != -1 ||
714  find_task_on_heap(_sleeping, task) != -1 ||
715  find_task_on_heap(_this_active, task) != -1);
716 }
717 
718 ////////////////////////////////////////////////////////////////////
719 // Function: AsyncTaskChain::find_task_on_heap
720 // Access: Protected
721 // Description: Returns the index number of the indicated task within
722 // the specified task list, or -1 if the task is not
723 // found in the list (this may mean that it is currently
724 // being serviced). Assumes that the lock is currently
725 // held.
726 ////////////////////////////////////////////////////////////////////
727 int AsyncTaskChain::
728 find_task_on_heap(const TaskHeap &heap, AsyncTask *task) const {
729  for (int i = 0; i < (int)heap.size(); ++i) {
730  if (heap[i] == task) {
731  return i;
732  }
733  }
734 
735  return -1;
736 }
737 
738 ////////////////////////////////////////////////////////////////////
739 // Function: AsyncTaskChain::service_one_task
740 // Access: Protected
741 // Description: Pops a single task off the active queue, services it,
742 // and restores it to the end of the queue. This is
743 // called internally only within one of the task
744 // threads. Assumes the lock is already held.
745 //
746 // Note that the lock may be temporarily released by
747 // this method.
748 ////////////////////////////////////////////////////////////////////
749 void AsyncTaskChain::
750 service_one_task(AsyncTaskChain::AsyncTaskChainThread *thread) {
751  if (!_active.empty()) {
752  PT(AsyncTask) task = _active.front();
753  pop_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
754  _active.pop_back();
755 
756  if (thread != (AsyncTaskChain::AsyncTaskChainThread *)NULL) {
757  thread->_servicing = task;
758  }
759 
760  if (task_cat.is_spam()) {
761  task_cat.spam()
762  << "Servicing " << *task << " in "
763  << *Thread::get_current_thread() << "\n";
764  }
765 
766  nassertv(task->get_sort() == _current_sort);
767  nassertv(task->_state == AsyncTask::S_active);
768  task->_state = AsyncTask::S_servicing;
769  task->_servicing_thread = thread;
770 
771  AsyncTask::DoneStatus ds = task->unlock_and_do_task();
772 
773  if (thread != (AsyncTaskChain::AsyncTaskChainThread *)NULL) {
774  thread->_servicing = NULL;
775  }
776  task->_servicing_thread = NULL;
777 
778  if (task->_chain == this) {
779  if (task->_state == AsyncTask::S_servicing_removed) {
780  // This task wants to kill itself.
781  cleanup_task(task, true, false);
782 
783  } else if (task->_chain_name != get_name()) {
784  // The task wants to jump to a different chain.
785  PT(AsyncTask) hold_task = task;
786  cleanup_task(task, false, false);
787  task->jump_to_task_chain(_manager);
788 
789  } else {
790  switch (ds) {
791  case AsyncTask::DS_cont:
792  // The task is still alive; put it on the next frame's active
793  // queue.
794  task->_state = AsyncTask::S_active;
795  _next_active.push_back(task);
796  _cvar.notify_all();
797  break;
798 
799  case AsyncTask::DS_again:
800  // The task wants to sleep again.
801  {
802  double now = _manager->_clock->get_frame_time();
803  task->_wake_time = now + task->get_delay();
804  task->_start_time = task->_wake_time;
805  task->_state = AsyncTask::S_sleeping;
806  _sleeping.push_back(task);
807  push_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
808  if (task_cat.is_spam()) {
809  task_cat.spam()
810  << "Sleeping " << *task << ", wake time at "
811  << task->_wake_time - now << "\n";
812  }
813  _cvar.notify_all();
814  }
815  break;
816 
817  case AsyncTask::DS_pickup:
818  // The task wants to run again this frame if possible.
819  task->_state = AsyncTask::S_active;
820  _this_active.push_back(task);
821  _cvar.notify_all();
822  break;
823 
824  case AsyncTask::DS_interrupt:
825  // The task had an exception and wants to raise a big flag.
826  task->_state = AsyncTask::S_active;
827  _next_active.push_back(task);
828  if (_state == S_started) {
829  _state = S_interrupted;
830  _cvar.notify_all();
831  }
832  break;
833 
834  default:
835  // The task has finished.
836  cleanup_task(task, true, true);
837  }
838  }
839  } else {
840  task_cat.error()
841  << "Task is no longer on chain " << get_name()
842  << ": " << *task << "\n";
843  }
844 
845  if (task_cat.is_spam()) {
846  task_cat.spam()
847  << "Done servicing " << *task << " in "
848  << *Thread::get_current_thread() << "\n";
849  }
850  }
851  thread_consider_yield();
852 }
853 
854 ////////////////////////////////////////////////////////////////////
855 // Function: AsyncTaskChain::cleanup_task
856 // Access: Protected
857 // Description: Called internally when a task has completed (or been
858 // interrupted) and is about to be removed from the
859 // active queue. Assumes the lock is held.
860 //
861 // If upon_death is true, then task->upon_death() will
862 // also be called, with the indicated clean_exit
863 // parameter.
864 //
865 // Note that the lock may be temporarily released by
866 // this method.
867 ////////////////////////////////////////////////////////////////////
868 void AsyncTaskChain::
869 cleanup_task(AsyncTask *task, bool upon_death, bool clean_exit) {
870  if (task_cat.is_spam()) {
871  do_output(task_cat.spam());
872  task_cat.spam(false)
873  << ": cleanup_task(" << *task << ", " << upon_death << ", " << clean_exit
874  << ")\n";
875  }
876 
877  nassertv(task->_chain == this);
878  PT(AsyncTask) hold_task = task;
879 
880  task->_state = AsyncTask::S_inactive;
881  task->_chain = NULL;
882  task->_manager = NULL;
883  --_num_tasks;
884  --(_manager->_num_tasks);
885 
886  _manager->remove_task_by_name(task);
887 
888  if (upon_death) {
889  _manager->_lock.release();
890  task->upon_death(_manager, clean_exit);
891  _manager->_lock.acquire();
892  }
893 }
894 
895 ////////////////////////////////////////////////////////////////////
896 // Function: AsyncTaskChain::finish_sort_group
897 // Access: Protected
898 // Description: Called internally when all tasks of a given sort
899 // value have been completed, and it is time to
900 // increment to the next sort value, or begin the next
901 // epoch. Assumes the lock is held.
902 //
903 // Returns true if there are more tasks on the queue
904 // after this operation, or false if the task list is
905 // empty and we need to wait.
906 ////////////////////////////////////////////////////////////////////
907 bool AsyncTaskChain::
908 finish_sort_group() {
// This may only be called while no thread is in the middle of
// servicing a task.
909  nassertr(_num_busy_threads == 0, true);
910 
// Only a chain that has worker threads of its own reports a thread
// tick to PStats, under the chain's name.
911  if (!_threads.empty()) {
912  PStatClient::thread_tick(get_name());
913  }
914 
915  if (!_active.empty()) {
916  // There are more tasks; just set the next sort value.
// The sort value must strictly increase within an epoch.
917  nassertr(_current_sort < _active.front()->get_sort(), true);
918  _current_sort = _active.front()->get_sort();
// Wake any threads waiting on the chain's condition variable so they
// can pick up tasks of the new sort value.
919  _cvar.notify_all();
920  return true;
921  }
922 
923  // There are no more tasks in this epoch; advance to the next epoch.
924 
925  if (!_this_active.empty() && _frame_budget >= 0.0) {
926  // Enter pickup mode. This is a special mode at the end of the
927  // epoch in which we are just re-running the tasks that think they
928  // can still run within the frame, in an attempt to use up our
929  // frame budget.
930 
931  if (task_cat.is_spam()) {
932  do_output(task_cat.spam());
933  task_cat.spam(false)
934  << ": next epoch (pickup mode)\n";
935  }
936 
937  _pickup_mode = true;
// _active is empty at this point, so the swap makes the tasks from
// _this_active the new active list and leaves _this_active empty.
938  _active.swap(_this_active);
939 
940  } else {
941  // Not in pickup mode.
942 
943  if (task_cat.is_spam()) {
944  do_output(task_cat.spam());
945  task_cat.spam(false)
946  << ": next epoch\n";
947  }
948 
949  _pickup_mode = false;
950 
951  // Here, there's no difference between _this_active and
952  // _next_active. Combine them.
953  _next_active.insert(_next_active.end(), _this_active.begin(), _this_active.end());
954  _this_active.clear();
955 
// _active is empty here, so after the swap _active holds the combined
// list and _next_active is empty.
956  _active.swap(_next_active);
957 
958  // We only tick the clock and wake sleepers in normal mode, the
959  // first time through the task list; not in pickup mode when we
960  // are re-running the stragglers just to use up our frame budget.
961 
962  if (_tick_clock) {
963  if (task_cat.is_spam()) {
964  do_output(task_cat.spam());
965  task_cat.spam(false)
966  << ": tick clock\n";
967  }
968  _manager->_clock->tick();
// Wake anything blocked on the manager's frame condition variable
// (thread_main waits on it when blocked until the next frame).
969  _manager->_frame_cvar.notify_all();
970 
971  } else if (_frame_sync) {
972  // If we're a synced chain, we have to wait at the end of the
973  // epoch for someone else to tick the clock.
974  _block_till_next_frame = true;
975  }
976 
977  // Check for any sleeping tasks that need to be woken.
978  double now = _manager->_clock->get_frame_time();
// _sleeping is a heap ordered by AsyncTaskSortWakeTime; its front is
// compared against the current frame time, and tasks whose wake time
// has arrived are popped one at a time and appended to _active.
979  while (!_sleeping.empty() && _sleeping.front()->_wake_time <= now) {
980  PT(AsyncTask) task = _sleeping.front();
981  if (task_cat.is_spam()) {
982  task_cat.spam()
983  << "Waking " << *task << ", wake time at "
984  << task->_wake_time - now << "\n";
985  }
986  pop_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
987  _sleeping.pop_back();
988  task->_state = AsyncTask::S_active;
989  task->_start_frame = _manager->_clock->get_frame_count();
990  _active.push_back(task);
991  }
992 
993  if (task_cat.is_spam()) {
994  if (_sleeping.empty()) {
995  task_cat.spam()
996  << "No more tasks on sleeping queue.\n";
997  } else {
998  task_cat.spam()
999  << "Next sleeper: " << *_sleeping.front() << ", wake time at "
1000  << _sleeping.front()->_wake_time - now << "\n";
1001  }
1002  }
1003 
1004  // Any tasks that are on the active queue at the beginning of the
1005  // epoch are deemed to have run one frame (or to be about to).
1006  TaskHeap::const_iterator ti;
1007  for (ti = _active.begin(); ti != _active.end(); ++ti) {
1008  AsyncTask *task = (*ti);
1009  ++task->_num_frames;
1010  }
1011  }
1012 
// In timeslice-priority mode, move the tasks that exceeded their
// budget off of _active before the epoch starts.
1013  if (_timeslice_priority) {
1014  filter_timeslice_priority();
1015  }
1016 
// Sanity check: every task counted in _num_tasks must be on one of
// the four lists at this point.
1017  nassertr((size_t)_num_tasks == _active.size() + _this_active.size() + _next_active.size() + _sleeping.size(), true);
// _active was assembled by swap()/push_back() above, so re-establish
// heap order before anything is taken from it.
1018  make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
1019 
// Reset to the same sentinel value the constructor uses; the next
// call to this method will then advance _current_sort to the sort
// value of the task at the front of _active.
1020  _current_sort = -INT_MAX;
1021 
1022  if (!_active.empty()) {
1023  // Signal the threads to start executing the first task again.
1024  _cvar.notify_all();
1025  return true;
1026  }
1027 
1028  // There are no tasks to be had anywhere. Chill.
1029  _pickup_mode = false;
1030  nassertr(_this_active.empty(), false);
1031  return false;
1032 }
1033 
1034 ////////////////////////////////////////////////////////////////////
1035 // Function: AsyncTaskChain::filter_timeslice_priority
1036 // Access: Protected
1037 // Description: Called to filter the _active tasks list when we are
1038 // in the special timeslice_priority mode. In this
1039 // mode, go through and postpone any tasks that have
1040 // already exceeded their priority budget for this
1041 // epoch.
1042 //
1043 // Assumes the lock is already held.
1044 ////////////////////////////////////////////////////////////////////
1045 void AsyncTaskChain::
1046 filter_timeslice_priority() {
1047  if (_active.empty()) {
1048  return;
1049  }
1050  nassertv(_timeslice_priority);
1051 
1052  // We must first sum up the average per-epoch runtime of each task.
1053  double net_runtime = 0.0;
1054  int net_priority = 0;
1055 
1056  TaskHeap::iterator ti;
1057  for (ti = _active.begin(); ti != _active.end(); ++ti) {
1058  AsyncTask *task = (*ti);
1059  double runtime = max(task->get_average_dt(), 0.0);
1060  int priority = max(task->_priority, 1);
1061  net_runtime += runtime;
1062  net_priority += priority;
1063  }
1064 
1065  // That gives us a timeslice budget per priority value.
1066  double average_budget = net_runtime / (double)net_priority;
1067 
1068  TaskHeap keep, postpone;
1069  for (ti = _active.begin(); ti != _active.end(); ++ti) {
1070  AsyncTask *task = (*ti);
1071  double runtime = max(task->get_average_dt(), 0.0);
1072  int priority = max(task->_priority, 1);
1073  double consumed = runtime / (double)priority;
1074  // cerr << *task << " consumed " << consumed << " vs. " << average_budget << "\n";
1075  if (consumed > average_budget) {
1076  // Postpone. Run this task next epoch.
1077  postpone.push_back(task);
1078  } else {
1079  // Keep, and run this task this epoch.
1080  keep.push_back(task);
1081  }
1082  }
1083 
1084  if (keep.empty()) {
1085  // Hmm, nothing to keep. Grab the postponed task with the highest
1086  // priority and keep that instead.
1087  nassertv(!postpone.empty());
1088  ti = postpone.begin();
1089  TaskHeap::iterator max_ti = ti;
1090  ++ti;
1091  while (ti != postpone.end()) {
1092  if ((*ti)->_priority > (*max_ti)->_priority) {
1093  max_ti = ti;
1094  }
1095  }
1096 
1097  // cerr << "Nothing to keep, keeping " << *(*max_ti) << " instead\n";
1098 
1099  keep.push_back(*max_ti);
1100  postpone.erase(max_ti);
1101  }
1102 
1103  _active.swap(keep);
1104  if (_pickup_mode) {
1105  _this_active.insert(_this_active.end(), postpone.begin(), postpone.end());
1106  } else {
1107  _next_active.insert(_next_active.end(), postpone.begin(), postpone.end());
1108  }
1109 
1110  nassertv(!_active.empty());
1111 }
1112 
1113 ////////////////////////////////////////////////////////////////////
1114 // Function: AsyncTaskChain::do_stop_threads
1115 // Access: Protected
1116 // Description: The private implementation of stop_threads; assumes
1117 // the lock is already held.
1118 ////////////////////////////////////////////////////////////////////
1119 void AsyncTaskChain::
1120 do_stop_threads() {
1121  if (_state == S_started || _state == S_interrupted) {
1122  if (task_cat.is_debug() && !_threads.empty()) {
1123  task_cat.debug()
1124  << "Stopping " << _threads.size()
1125  << " threads for " << _manager->get_name()
1126  << " chain " << get_name()
1127  << " in " << *Thread::get_current_thread() << "\n";
1128  }
1129 
1130  _state = S_shutdown;
1131  _cvar.notify_all();
1132  _manager->_frame_cvar.notify_all();
1133 
1134  Threads wait_threads;
1135  wait_threads.swap(_threads);
1136 
1137  // We have to release the lock while we join, so the threads can
1138  // wake up and see that we're shutting down.
1139  _manager->_lock.release();
1140  Threads::iterator ti;
1141  for (ti = wait_threads.begin(); ti != wait_threads.end(); ++ti) {
1142  if (task_cat.is_debug()) {
1143  task_cat.debug()
1144  << "Waiting for " << *(*ti) << " in "
1145  << *Thread::get_current_thread() << "\n";
1146  }
1147  (*ti)->join();
1148  if (task_cat.is_spam()) {
1149  task_cat.spam()
1150  << "Done waiting for " << *(*ti) << " in "
1151  << *Thread::get_current_thread() << "\n";
1152  }
1153  }
1154  _manager->_lock.acquire();
1155 
1156  _state = S_initial;
1157 
1158  // There might be one busy "thread" still: the main thread.
1159  nassertv(_num_busy_threads == 0 || _num_busy_threads == 1);
1160  cleanup_pickup_mode();
1161  }
1162 }
1163 
1164 ////////////////////////////////////////////////////////////////////
1165 // Function: AsyncTaskChain::do_start_threads
1166 // Access: Protected
1167 // Description: The private implementation of start_threads; assumes
1168 // the lock is already held.
1169 ////////////////////////////////////////////////////////////////////
1170 void AsyncTaskChain::
1171 do_start_threads() {
1172  if (_state == S_interrupted) {
1173  do_stop_threads();
1174  }
1175 
1176  if (_state == S_initial) {
1177  _state = S_started;
1178  if (Thread::is_threading_supported() && _num_threads > 0) {
1179  if (task_cat.is_debug()) {
1180  task_cat.debug()
1181  << "Starting " << _num_threads << " threads for "
1182  << _manager->get_name() << " chain " << get_name() << "\n";
1183  }
1184  _needs_cleanup = true;
1185  _threads.reserve(_num_threads);
1186  for (int i = 0; i < _num_threads; ++i) {
1187  ostringstream strm;
1188  strm << _manager->get_name() << "_" << get_name() << "_" << i;
1189  PT(AsyncTaskChainThread) thread = new AsyncTaskChainThread(strm.str(), this);
1190  if (thread->start(_thread_priority, true)) {
1191  _threads.push_back(thread);
1192  }
1193  }
1194  }
1195  }
1196 }
1197 
1198 ////////////////////////////////////////////////////////////////////
1199 // Function: AsyncTaskChain::do_get_active_tasks
1200 // Access: Protected
1201 // Description: Returns the set of tasks that are active (and not
1202 // sleeping) on the task chain, at the time of the
1203 // call. Assumes the lock is held.
1204 ////////////////////////////////////////////////////////////////////
1205 AsyncTaskCollection AsyncTaskChain::
1206 do_get_active_tasks() const {
1207  AsyncTaskCollection result;
1208 
1209  Threads::const_iterator thi;
1210  for (thi = _threads.begin(); thi != _threads.end(); ++thi) {
1211  AsyncTask *task = (*thi)->_servicing;
1212  if (task != (AsyncTask *)NULL) {
1213  result.add_task(task);
1214  }
1215  }
1216  TaskHeap::const_iterator ti;
1217  for (ti = _active.begin(); ti != _active.end(); ++ti) {
1218  AsyncTask *task = (*ti);
1219  result.add_task(task);
1220  }
1221  for (ti = _this_active.begin(); ti != _this_active.end(); ++ti) {
1222  AsyncTask *task = (*ti);
1223  result.add_task(task);
1224  }
1225  for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) {
1226  AsyncTask *task = (*ti);
1227  result.add_task(task);
1228  }
1229 
1230  return result;
1231 }
1232 
1233 ////////////////////////////////////////////////////////////////////
1234 // Function: AsyncTaskChain::do_get_sleeping_tasks
1235 // Access: Published
1236 // Description: Returns the set of tasks that are sleeping (and not
1237 // active) on the task chain, at the time of the
1238 // call. Assumes the lock is held.
1239 ////////////////////////////////////////////////////////////////////
1240 AsyncTaskCollection AsyncTaskChain::
1241 do_get_sleeping_tasks() const {
1242  AsyncTaskCollection result;
1243 
1244  TaskHeap::const_iterator ti;
1245  for (ti = _sleeping.begin(); ti != _sleeping.end(); ++ti) {
1246  AsyncTask *task = (*ti);
1247  result.add_task(task);
1248  }
1249 
1250  return result;
1251 }
1252 
1253 ////////////////////////////////////////////////////////////////////
1254 // Function: AsyncTaskChain::do_poll
1255 // Access: Protected
1256 // Description: The private implementation of poll(), this assumes
1257 // the lock is already held.
1258 ////////////////////////////////////////////////////////////////////
1259 void AsyncTaskChain::
1260 do_poll() {
1261  thread_consider_yield();
// Nothing to do if the chain holds no tasks at all.
1262  if (_num_tasks == 0) {
1263  return;
1264  }
1265 
// Make sure the chain has been started; this may create worker
// threads (see do_start_threads).
1266  do_start_threads();
1267 
// If the chain has worker threads, return here: thread_main services
// the tasks in that case.
1268  if (!_threads.empty()) {
1269  return;
1270  }
1271 
1272  if (_num_busy_threads != 0) {
1273  // We are recursively nested within another task. Return, with a warning.
1274  task_cat.warning()
1275  << "Ignoring recursive poll() within another task.\n";
1276  return;
1277  }
1278 
1279  nassertv(!_pickup_mode);
1280 
// Each pass of the outer loop drains _active and then finishes the
// sort group; the loop repeats only while finish_sort_group() leaves
// the chain in pickup mode.
1281  do {
1282  while (!_active.empty()) {
1283  if (_state == S_shutdown || _state == S_interrupted) {
1284  return;
1285  }
// On the first look at a new frame, reset the per-frame time
// accounting and clear the block-till-next-frame flag.
1286  int frame = _manager->_clock->get_frame_count();
1287  if (_current_frame != frame) {
1288  _current_frame = frame;
1289  _time_in_frame = 0.0;
1290  _block_till_next_frame = false;
1291  }
1292  if (_block_till_next_frame ||
1293  (_frame_budget >= 0.0 && _time_in_frame >= _frame_budget)) {
1294  // If we've exceeded our budget, stop here. We'll resume from
1295  // this point at the next call to poll().
1296  cleanup_pickup_mode();
1297  return;
1298  }
1299 
1300  _current_sort = _active.front()->get_sort();
1301 
1302  // Normally, there won't be any threads running at the same time
1303  // we're in poll(). But it's possible, if someone calls
1304  // set_num_threads() while we're processing.
// The count is raised around the call so that a nested poll() is
// detected by the _num_busy_threads check above.  NULL is passed
// because no AsyncTaskChainThread is involved here (thread_main
// passes itself instead).
1305  _num_busy_threads++;
1306  service_one_task(NULL);
1307  _num_busy_threads--;
1308  _cvar.notify_all();
1309 
// Worker threads appeared while the task was running; return and
// leave the remaining tasks to them.
1310  if (!_threads.empty()) {
1311  return;
1312  }
1313  }
1314 
// _active has been drained: advance to the next epoch.  The return
// value is not needed here; only _pickup_mode is examined.
1315  finish_sort_group();
1316  } while (_pickup_mode);
1317 }
1318 
1319 ////////////////////////////////////////////////////////////////////
1320 // Function: AsyncTaskChain::cleanup_pickup_mode
1321 // Access: Protected
1322 // Description: Clean up the damage from setting pickup mode. This
1323 // means we restore the _active and _next_active lists
1324 // as they should have been without pickup mode, for
1325 // next frame. Assumes the lock is held.
1326 ////////////////////////////////////////////////////////////////////
1327 void AsyncTaskChain::
1328 cleanup_pickup_mode() {
1329  if (_pickup_mode) {
1330  _pickup_mode = false;
1331 
1332  // Move everything to the _next_active queue.
1333  _next_active.insert(_next_active.end(), _this_active.begin(), _this_active.end());
1334  _this_active.clear();
1335  _next_active.insert(_next_active.end(), _active.begin(), _active.end());
1336  _active.clear();
1337 
1338  // Now finish the epoch properly.
1339  finish_sort_group();
1340  }
1341 }
1342 
1343 ////////////////////////////////////////////////////////////////////
1344 // Function: AsyncTaskChain::do_output
1345 // Access: Protected
1346 // Description: The private implementation of output(), this assumes
1347 // the lock is already held.
1348 ////////////////////////////////////////////////////////////////////
1349 void AsyncTaskChain::
1350 do_output(ostream &out) const {
1351  if (_manager != (AsyncTaskManager *)NULL) {
1352  out << _manager->get_type() << " " << _manager->get_name();
1353  } else {
1354  out << "(no manager)";
1355  }
1356  out << " task chain " << get_name()
1357  << "; " << _num_tasks << " tasks";
1358 }
1359 
1360 ////////////////////////////////////////////////////////////////////
1361 // Function: AsyncTaskChain::do_write
1362 // Access: Protected
1363 // Description: The private implementation of write(), this assumes
1364 // the lock is already held.
1365 ////////////////////////////////////////////////////////////////////
1366 void AsyncTaskChain::
1367 do_write(ostream &out, int indent_level) const {
1368  indent(out, indent_level)
1369  << "Task chain \"" << get_name() << "\"\n";
1370  if (_num_threads > 0) {
1371  indent(out, indent_level + 2)
1372  << _num_threads << " threads, priority " << _thread_priority << "\n";
1373  }
1374  if (_frame_budget >= 0.0) {
1375  indent(out, indent_level + 2)
1376  << "frame budget " << _frame_budget << " s\n";
1377  }
1378  if (_timeslice_priority) {
1379  indent(out, indent_level + 2)
1380  << "timeslice priority\n";
1381  }
1382  if (_tick_clock) {
1383  indent(out, indent_level + 2)
1384  << "tick clock\n";
1385  }
1386 
1387  static const size_t buffer_size = 1024;
1388  char buffer[buffer_size];
1389  sprintf(buffer, " %-32s %8s %8s %8s %8s %6s",
1390  "Task",
1391  "sleep(s)",
1392  "dt(ms)", "avg", "max",
1393  "sort");
1394  nassertv(strlen(buffer) < buffer_size);
1395 
1396  indent(out, indent_level)
1397  << buffer << "\n";
1398 
1399  indent(out, indent_level);
1400  for (int i = 0; i < 32+8+8+8+8+6+7; ++i) {
1401  out << '-';
1402  }
1403  out << "\n";
1404 
1405  // Collect a list of all active tasks, then sort them into order for
1406  // output.
1407  TaskHeap tasks = _active;
1408  tasks.insert(tasks.end(), _this_active.begin(), _this_active.end());
1409  tasks.insert(tasks.end(), _next_active.begin(), _next_active.end());
1410 
1411  Threads::const_iterator thi;
1412  for (thi = _threads.begin(); thi != _threads.end(); ++thi) {
1413  AsyncTask *task = (*thi)->_servicing;
1414  if (task != (AsyncTask *)NULL) {
1415  tasks.push_back(task);
1416  }
1417  }
1418 
1419  double now = _manager->_clock->get_frame_time();
1420 
1421  if (!tasks.empty()) {
1422  sort(tasks.begin(), tasks.end(), AsyncTaskSortPriority());
1423 
1424  // Since AsyncTaskSortPriority() sorts backwards (because of STL's
1425  // push_heap semantics), we go through the task list in reverse
1426  // order to print them forwards.
1427  TaskHeap::reverse_iterator ti;
1428  for (ti = tasks.rbegin(); ti != tasks.rend(); ++ti) {
1429  AsyncTask *task = (*ti);
1430  write_task_line(out, indent_level, task, now);
1431  }
1432  }
1433 
1434  // Instead of iterating through the _sleeping list in heap order,
1435  // copy it and then use repeated pops to get it out in sorted
1436  // order, for the user's satisfaction.
1437  TaskHeap sleeping = _sleeping;
1438  while (!sleeping.empty()) {
1439  PT(AsyncTask) task = sleeping.front();
1440  pop_heap(sleeping.begin(), sleeping.end(), AsyncTaskSortWakeTime());
1441  sleeping.pop_back();
1442 
1443  write_task_line(out, indent_level, task, now);
1444  }
1445 }
1446 
1447 ////////////////////////////////////////////////////////////////////
1448 // Function: AsyncTaskChain::write_task_line
1449 // Access: Protected
1450 // Description: Writes a single line for a task in the report
1451 // generated by do_write(). Assumes the lock is already
1452 // held.
1453 ////////////////////////////////////////////////////////////////////
1454 void AsyncTaskChain::
1455 write_task_line(ostream &out, int indent_level, AsyncTask *task, double now) const {
1456  char servicing_flag = ' ';
1457  if (task->_state == AsyncTask::S_servicing) {
1458  servicing_flag = '*';
1459  } else if (task->_state == AsyncTask::S_servicing_removed) {
1460  servicing_flag = '-';
1461  }
1462 
1463  static const size_t buffer_size = 1024;
1464  char buffer[buffer_size];
1465 
1466  if (task->_state == AsyncTask::S_sleeping) {
1467  // For sleeping tasks, include the wake time, as an elapsed time
1468  // in seconds.
1469  string name = task->get_name().substr(0, 32);
1470  sprintf(buffer, "%c%-32s %8.1f",
1471  servicing_flag, name.c_str(),
1472  task->_wake_time - now);
1473  } else {
1474  // For active tasks, don't include a wake time. This means we
1475  // have more space for the name.
1476  string name = task->get_name().substr(0, 41);
1477  sprintf(buffer, "%c%-41s",
1478  servicing_flag, name.c_str());
1479  }
1480  nassertv(strlen(buffer) < buffer_size);
1481 
1482  indent(out, indent_level)
1483  << buffer;
1484 
1485  if (task->_num_frames > 0) {
1486  sprintf(buffer, " %8.1f %8.1f %8.1f %6d",
1487  task->_dt * 1000.0, task->get_average_dt() * 1000.0,
1488  task->_max_dt * 1000.0,
1489  task->_sort);
1490  } else {
1491  // No statistics for a task that hasn't run yet.
1492  sprintf(buffer, " %8s %8s %8s %6d",
1493  "", "", "",
1494  task->_sort);
1495  }
1496 
1497  nassertv(strlen(buffer) < buffer_size);
1498  out << buffer << "\n";
1499 }
1500 
1501 ////////////////////////////////////////////////////////////////////
1502 // Function: AsyncTaskChain::AsyncTaskChainThread::Constructor
1503 // Access: Public
1504 // Description:
1505 ////////////////////////////////////////////////////////////////////
1506 AsyncTaskChain::AsyncTaskChainThread::
1507 AsyncTaskChainThread(const string &name, AsyncTaskChain *chain) :
1508  Thread(name, chain->get_name()),
1509  _chain(chain),
1510  _servicing(NULL)
1511 {
1512 }
1513 
1514 ////////////////////////////////////////////////////////////////////
1515 // Function: AsyncTaskChain::AsyncTaskChainThread::thread_main
1516 // Access: Public, Virtual
1517 // Description: The main loop of a worker thread.  Holding the
// manager's lock, it repeatedly services tasks of the
// chain's current sort value until the chain's state
// becomes S_shutdown or S_interrupted.  When no such
// task is available it either advances the chain with
// finish_sort_group() or waits on the chain's
// condition variable; when the frame budget has been
// used up it waits on the manager's frame condition
// variable instead.
1518 ////////////////////////////////////////////////////////////////////
1519 void AsyncTaskChain::AsyncTaskChainThread::
1520 thread_main() {
// The manager's lock is acquired here and released when this
// function returns.
1521  MutexHolder holder(_chain->_manager->_lock);
1522  while (_chain->_state != S_shutdown && _chain->_state != S_interrupted) {
1523  thread_consider_yield();
// There is work for this thread only if the task at the front of
// _active has the sort value that is currently being serviced.
1524  if (!_chain->_active.empty() &&
1525  _chain->_active.front()->get_sort() == _chain->_current_sort) {
1526 
// On the first look at a new frame, reset the per-frame time
// accounting and clear the block-till-next-frame flag.
1527  int frame = _chain->_manager->_clock->get_frame_count();
1528  if (_chain->_current_frame != frame) {
1529  _chain->_current_frame = frame;
1530  _chain->_time_in_frame = 0.0;
1531  _chain->_block_till_next_frame = false;
1532  }
1533 
1534  // If we've exceeded our frame budget, sleep until the next
1535  // frame.
1536  if (_chain->_block_till_next_frame ||
1537  (_chain->_frame_budget >= 0.0 && _chain->_time_in_frame >= _chain->_frame_budget)) {
// The condition is re-tested after every wake-up, and the wait is
// also abandoned if the chain is shut down or interrupted.
1538  while ((_chain->_block_till_next_frame ||
1539  (_chain->_frame_budget >= 0.0 && _chain->_time_in_frame >= _chain->_frame_budget)) &&
1540  _chain->_state != S_shutdown && _chain->_state != S_interrupted) {
1541  _chain->cleanup_pickup_mode();
1542  _chain->_manager->_frame_cvar.wait();
1543  frame = _chain->_manager->_clock->get_frame_count();
1544  if (_chain->_current_frame != frame) {
1545  _chain->_current_frame = frame;
1546  _chain->_time_in_frame = 0.0;
1547  _chain->_block_till_next_frame = false;
1548  }
1549  }
1550  // Now that it's the next frame, go back to the top of the loop.
1551  continue;
1552  }
1553 
// Service one task, counting this thread as busy for the duration,
// and then wake the other threads waiting on the chain's condition
// variable.
1554  PStatTimer timer(_task_pcollector);
1555  _chain->_num_busy_threads++;
1556  _chain->service_one_task(this);
1557  _chain->_num_busy_threads--;
1558  _chain->_cvar.notify_all();
1559 
1560  } else {
1561  // We've finished all the available tasks of the current sort
1562  // value. We can't pick up a new task until all of the threads
1563  // finish the tasks with the same sort value.
1564  if (_chain->_num_busy_threads == 0) {
1565  // We're the last thread to finish. Update _current_sort.
1566  if (!_chain->finish_sort_group()) {
1567  // Nothing to do. Wait for more tasks to be added.
1568  if (_chain->_sleeping.empty()) {
1569  PStatTimer timer(_wait_pcollector);
1570  _chain->_cvar.wait();
1571  } else {
// There are sleeping tasks: wait no longer than the time remaining
// until the next wake time, clamped to zero if it has already
// passed.
1572  double wake_time = _chain->do_get_next_wake_time();
1573  double now = _chain->_manager->_clock->get_frame_time();
1574  double timeout = max(wake_time - now, 0.0);
1575  PStatTimer timer(_wait_pcollector);
1576  _chain->_cvar.wait(timeout);
1577  }
1578  }
1579 
1580  } else {
1581  // Wait for the other threads to finish their current task
1582  // before we continue.
1583  PStatTimer timer(_wait_pcollector);
1584  _chain->_cvar.wait();
1585  }
1586  }
1587  }
1588 }
1589 
bool get_timeslice_priority() const
Returns the timeslice_priority flag.
AsyncTaskCollection get_tasks() const
Returns the set of tasks that are active or sleeping on the task chain, at the time of the call...
void wait()
Waits on the condition.
A class to manage a loose queue of isolated tasks, which can be performed either synchronously (in th...
void set_frame_sync(bool frame_sync)
Sets the frame_sync flag.
double get_average_dt() const
Returns the average amount of time elapsed during each of the task's previous run cycles...
Definition: asyncTask.I:302
bool get_tick_clock() const
Returns the tick_clock flag.
double get_frame_budget() const
Returns the maximum amount of time per frame the tasks on this chain are granted for execution...
void add_task(AsyncTask *task)
Adds a new AsyncTask to the collection.
A list of tasks, for instance as returned by some of the AsyncTaskManager query functions.
bool has_delay() const
Returns true if a delay has been set for this task via set_delay(), or false otherwise.
Definition: asyncTask.I:112
void start_threads()
Starts any requested threads to service the tasks on the queue.
A lightweight class that can be used to automatically start and stop a PStatCollector around a sectio...
Definition: pStatTimer.h:34
A lightweight C++ object whose constructor calls acquire() and whose destructor calls release() on a ...
Definition: mutexHolder.h:29
void wait_for_tasks()
Blocks until the task list is empty.
static Thread * get_current_thread()
Returns a pointer to the currently-executing Thread object.
Definition: thread.I:145
void set_num_threads(int num_threads)
Changes the number of threads for this task chain.
void notify_all()
Informs all of the other threads who are currently blocked on wait() that the relevant condition has ...
static bool is_threading_supported()
Returns true if threading support has been compiled in and enabled, or false if no threading is avail...
Definition: thread.I:185
A lightweight class that represents a single element that may be timed and/or counted via stats...
void set_frame_budget(double frame_budget)
Sets the maximum amount of time per frame the tasks on this chain are granted for execution...
A base class for all things which can have a name.
Definition: namable.h:29
void acquire() const
Grabs the mutex if it is available.
Definition: mutexDirect.I:70
bool get_frame_sync() const
Returns the frame_sync flag.
int get_sort() const
Returns the task's current sort value.
Definition: asyncTask.I:198
int get_num_running_threads() const
Returns the number of threads that have been created and are actively running.
void add_tasks_from(const AsyncTaskCollection &other)
Adds all the AsyncTasks indicated in the other collection to this task.
ThreadPriority get_thread_priority() const
Returns the priority associated with threads that serve this task chain.
int get_num_tasks() const
Returns the number of tasks that are currently active or sleeping within the task chain...
The AsyncTaskChain is a subset of the AsyncTaskManager.
AsyncTaskCollection get_active_tasks() const
Returns the set of tasks that are active (and not sleeping) on the task chain, at the time of the cal...
int get_num_threads() const
Returns the number of threads that will be servicing tasks for this chain.
void poll()
Runs through all the tasks in the task list, once, if the task chain is running in single-threaded mo...
This class represents a concrete task performed by an AsyncManager.
Definition: asyncTask.h:43
void join()
Blocks the calling process until the thread terminates.
Definition: thread.I:298
A thread; that is, a lightweight process.
Definition: thread.h:51
void set_tick_clock(bool tick_clock)
Sets the tick_clock flag.
void set_timeslice_priority(bool timeslice_priority)
Sets the timeslice_priority flag.
void stop_threads()
Stops any threads that are currently running.
void release() const
Releases the mutex.
Definition: mutexDirect.I:99
TypeHandle is the identifier used to differentiate C++ class types.
Definition: typeHandle.h:85
void set_thread_priority(ThreadPriority priority)
Changes the priority associated with threads that serve this task chain.
double get_next_wake_time() const
Returns the scheduled time (on the manager's clock) of the next sleeping task, on any task chain...
bool has_task(AsyncTask *task) const
Returns true if the indicated task has been added to this AsyncTaskChain, false otherwise.
double get_delay() const
Returns the delay value that has been set via set_delay, if any.
Definition: asyncTask.I:123
AsyncTaskCollection get_sleeping_tasks() const
Returns the set of tasks that are sleeping (and not active) on the task chain, at the time of the cal...