Panda3D
 All Classes Functions Variables Enumerations
asyncTaskChain.cxx
00001 // Filename: asyncTaskChain.cxx
00002 // Created by:  drose (23Aug06)
00003 //
00004 ////////////////////////////////////////////////////////////////////
00005 //
00006 // PANDA 3D SOFTWARE
00007 // Copyright (c) Carnegie Mellon University.  All rights reserved.
00008 //
00009 // All use of this software is subject to the terms of the revised BSD
00010 // license.  You should have received a copy of this license along
00011 // with this source code in a file named "LICENSE."
00012 //
00013 ////////////////////////////////////////////////////////////////////
00014 
00015 #include "asyncTaskChain.h"
00016 #include "asyncTaskManager.h"
00017 #include "event.h"
00018 #include "mutexHolder.h"
00019 #include "indent.h"
00020 #include "pStatClient.h"
00021 #include "pStatTimer.h"
00022 #include "clockObject.h"
00023 #include <algorithm>
00024 #include <stdio.h>  // For sprintf/snprintf
00025 
TypeHandle AsyncTaskChain::_type_handle;

// PStat collectors shared by every task chain: time spent actually running
// tasks, vs. time worker threads spend blocked waiting for work.
PStatCollector AsyncTaskChain::_task_pcollector("Task");
PStatCollector AsyncTaskChain::_wait_pcollector("Wait");
00030 
////////////////////////////////////////////////////////////////////
//     Function: AsyncTaskChain::Constructor
//       Access: Published
//  Description: Creates a new task chain owned by the indicated
//               AsyncTaskManager.  The chain's condition variable is
//               built on the manager's own lock, so waiters can be
//               signaled while the manager's state is protected.
////////////////////////////////////////////////////////////////////
AsyncTaskChain::
AsyncTaskChain(AsyncTaskManager *manager, const string &name) :
  Namable(name),
  _manager(manager),
  _cvar(manager->_lock),         // cvar shares the manager's mutex
  _tick_clock(false),            // don't tick the clock per epoch by default
  _timeslice_priority(false),    // round-robin scheduling by default
  _num_threads(0),               // single-threaded (polled) by default
  _thread_priority(TP_normal),
  _frame_budget(-1.0),           // < 0 means no per-frame time limit
  _frame_sync(false),            // don't sync epochs to the clock frame
  _num_busy_threads(0),
  _num_tasks(0),
  _state(S_initial),
  _current_sort(-INT_MAX),       // below any real sort until an epoch begins
  _pickup_mode(false),
  _needs_cleanup(false),         // becomes true once a task is added
  _current_frame(0),
  _time_in_frame(0.0),
  _block_till_next_frame(false)
{
}
00058 
////////////////////////////////////////////////////////////////////
//     Function: AsyncTaskChain::Destructor
//       Access: Published, Virtual
//  Description: Stops any threads and empties the task lists, but
//               only if any tasks were ever added to this chain.
////////////////////////////////////////////////////////////////////
AsyncTaskChain::
~AsyncTaskChain() {
  // We only grab the lock if _needs_cleanup is true.  This way, the
  // temporary AsyncTaskChain objects created (and destructed) within
  // the task manager won't risk a double-lock.
  if (_needs_cleanup) {
    MutexHolder holder(_manager->_lock);
    do_cleanup();
  }
}
00074 
00075 ////////////////////////////////////////////////////////////////////
00076 //     Function: AsyncTaskChain::set_tick_clock
00077 //       Access: Published
00078 //  Description: Sets the tick_clock flag.  When this is true,
00079 //               get_clock()->tick() will be called automatically at
00080 //               each task epoch.  This is false by default.
00081 ////////////////////////////////////////////////////////////////////
00082 void AsyncTaskChain::
00083 set_tick_clock(bool tick_clock) {
00084   MutexHolder holder(_manager->_lock);
00085   _tick_clock = tick_clock;
00086 }
00087 
00088 ////////////////////////////////////////////////////////////////////
00089 //     Function: AsyncTaskChain::get_tick_clock
00090 //       Access: Published
00091 //  Description: Returns the tick_clock flag.  See set_tick_clock().
00092 ////////////////////////////////////////////////////////////////////
00093 bool AsyncTaskChain::
00094 get_tick_clock() const {
00095   MutexHolder holder(_manager->_lock);
00096   return _tick_clock;
00097 }
00098 
00099 ////////////////////////////////////////////////////////////////////
00100 //     Function: AsyncTaskChain::set_num_threads
00101 //       Access: Published
00102 //  Description: Changes the number of threads for this task chain.
00103 //               This may require stopping the threads if they are
00104 //               already running.
00105 ////////////////////////////////////////////////////////////////////
00106 void AsyncTaskChain::
00107 set_num_threads(int num_threads) {
00108   nassertv(num_threads >= 0);
00109 
00110   if (task_cat.is_debug()) {
00111     do_output(task_cat.debug());
00112     task_cat.debug(false)
00113       << ": set_num_threads(" << num_threads << ")\n";
00114   }
00115 
00116   if (!Thread::is_threading_supported()) {
00117     num_threads = 0;
00118   }
00119 
00120   MutexHolder holder(_manager->_lock);
00121   if (_num_threads != num_threads) {
00122     do_stop_threads();
00123     _num_threads = num_threads;
00124 
00125     if (_num_tasks != 0) {
00126       do_start_threads();
00127     }
00128   }
00129 }
00130 
00131 ////////////////////////////////////////////////////////////////////
00132 //     Function: AsyncTaskChain::get_num_threads
00133 //       Access: Published
00134 //  Description: Returns the number of threads that will be servicing
00135 //               tasks for this chain.  Also see
00136 //               get_num_running_threads().
00137 ////////////////////////////////////////////////////////////////////
00138 int AsyncTaskChain::
00139 get_num_threads() const {
00140   MutexHolder holder(_manager->_lock);
00141   return _num_threads;
00142 }
00143 
00144 ////////////////////////////////////////////////////////////////////
00145 //     Function: AsyncTaskChain::get_num_running_threads
00146 //       Access: Published
00147 //  Description: Returns the number of threads that have been created
00148 //               and are actively running.  This will return 0 before
00149 //               the threads have been started; it will also return 0
00150 //               if thread support is not available.
00151 ////////////////////////////////////////////////////////////////////
00152 int AsyncTaskChain::
00153 get_num_running_threads() const {
00154   MutexHolder holder(_manager->_lock);
00155   return _threads.size();
00156 }
00157 
00158 ////////////////////////////////////////////////////////////////////
00159 //     Function: AsyncTaskChain::set_thread_priority
00160 //       Access: Published
00161 //  Description: Changes the priority associated with threads that
00162 //               serve this task chain.  This may require stopping the
00163 //               threads if they are already running.
00164 ////////////////////////////////////////////////////////////////////
00165 void AsyncTaskChain::
00166 set_thread_priority(ThreadPriority priority) {
00167   MutexHolder holder(_manager->_lock);
00168   if (_thread_priority != priority) {
00169     do_stop_threads();
00170     _thread_priority = priority;
00171 
00172     if (_num_tasks != 0) {
00173       do_start_threads();
00174     }
00175   }
00176 }
00177 
00178 ////////////////////////////////////////////////////////////////////
00179 //     Function: AsyncTaskChain::get_thread_priority
00180 //       Access: Published
00181 //  Description: Returns the priority associated with threads that
00182 //               serve this task chain.
00183 ////////////////////////////////////////////////////////////////////
00184 ThreadPriority AsyncTaskChain::
00185 get_thread_priority() const {
00186   MutexHolder holder(_manager->_lock);
00187   return _thread_priority;
00188 }
00189 
00190 ////////////////////////////////////////////////////////////////////
00191 //     Function: AsyncTaskChain::set_frame_budget
00192 //       Access: Published
00193 //  Description: Sets the maximum amount of time per frame the tasks
00194 //               on this chain are granted for execution.  If this is
00195 //               less than zero, there is no limit; if it is >= 0, it
00196 //               represents a maximum amount of time (in seconds) that
00197 //               will be used to execute tasks.  If this time is
00198 //               exceeded in any one frame, the task chain will stop
00199 //               executing tasks until the next frame, as defined by
00200 //               the TaskManager's clock.
00201 ////////////////////////////////////////////////////////////////////
00202 void AsyncTaskChain::
00203 set_frame_budget(double frame_budget) {
00204   MutexHolder holder(_manager->_lock);
00205   _frame_budget = frame_budget;
00206 }
00207 
00208 ////////////////////////////////////////////////////////////////////
00209 //     Function: AsyncTaskChain::get_frame_budget
00210 //       Access: Published
00211 //  Description: Returns the maximum amount of time per frame the
00212 //               tasks on this chain are granted for execution.  See
00213 //               set_frame_budget().
00214 ////////////////////////////////////////////////////////////////////
00215 double AsyncTaskChain::
00216 get_frame_budget() const {
00217   MutexHolder holder(_manager->_lock);
00218   return _frame_budget;
00219 }
00220 
00221 ////////////////////////////////////////////////////////////////////
00222 //     Function: AsyncTaskChain::set_frame_sync
00223 //       Access: Published
00224 //  Description: Sets the frame_sync flag.  When this flag is true,
00225 //               this task chain will be forced to sync with the
00226 //               TaskManager's clock.  It will run no faster than one
00227 //               epoch per clock frame.
00228 //
00229 //               When this flag is false, the default, the task chain
00230 //               will finish all of its tasks and then immediately
00231 //               start from the first task again, regardless of the
00232 //               clock frame.  When it is true, the task chain will
00233 //               finish all of its tasks and then wait for the clock
00234 //               to tick to the next frame before resuming the first
00235 //               task.
00236 //
00237 //               This only makes sense for threaded task chains.
00238 //               Non-threaded task chains are automatically
00239 //               synchronous.
00240 ////////////////////////////////////////////////////////////////////
00241 void AsyncTaskChain::
00242 set_frame_sync(bool frame_sync) {
00243   MutexHolder holder(_manager->_lock);
00244   _frame_sync = frame_sync;
00245 }
00246 
00247 ////////////////////////////////////////////////////////////////////
00248 //     Function: AsyncTaskChain::get_frame_sync
00249 //       Access: Published
00250 //  Description: Returns the frame_sync flag.  See set_frame_sync().
00251 ////////////////////////////////////////////////////////////////////
00252 bool AsyncTaskChain::
00253 get_frame_sync() const {
00254   MutexHolder holder(_manager->_lock);
00255   return _frame_sync;
00256 }
00257 
00258 ////////////////////////////////////////////////////////////////////
00259 //     Function: AsyncTaskChain::set_timeslice_priority
00260 //       Access: Published
00261 //  Description: Sets the timeslice_priority flag.  This changes
00262 //               the interpretation of priority, and the number of
00263 //               times per epoch each task will run.  
00264 //
00265 //               When this flag is true, some tasks might not run in
00266 //               any given epoch.  Instead, tasks with priority higher
00267 //               than 1 will be given precedence, in proportion to the
00268 //               amount of time they have already used.  This gives
00269 //               higher-priority tasks more runtime than
00270 //               lower-priority tasks.  Each task gets the amount of
00271 //               time proportional to its priority value, so a task
00272 //               with priority 100 will get five times as much
00273 //               processing time as a task with priority 20.  For
00274 //               these purposes, priority values less than 1 are
00275 //               deemed to be equal to 1.
00276 //
00277 //               When this flag is false (the default), all tasks are
00278 //               run exactly once each epoch, round-robin style.
00279 //               Priority is only used to determine which task runs
00280 //               first within tasks of the same sort value.
00281 ////////////////////////////////////////////////////////////////////
00282 void AsyncTaskChain::
00283 set_timeslice_priority(bool timeslice_priority) {
00284   MutexHolder holder(_manager->_lock);
00285   _timeslice_priority = timeslice_priority;
00286 }
00287 
00288 ////////////////////////////////////////////////////////////////////
00289 //     Function: AsyncTaskChain::get_timeslice_priority
00290 //       Access: Published
00291 //  Description: Returns the timeslice_priority flag.  This changes
00292 //               the interpretation of priority, and the number of
00293 //               times per epoch each task will run.  See
00294 //               set_timeslice_priority().
00295 ////////////////////////////////////////////////////////////////////
00296 bool AsyncTaskChain::
00297 get_timeslice_priority() const {
00298   MutexHolder holder(_manager->_lock);
00299   return _timeslice_priority;
00300 }
00301 
00302 ////////////////////////////////////////////////////////////////////
00303 //     Function: AsyncTaskChain::stop_threads
00304 //       Access: Published
00305 //  Description: Stops any threads that are currently running.  If any
00306 //               tasks are still pending and have not yet been picked
00307 //               up by a thread, they will not be serviced unless
00308 //               poll() or start_threads() is later called.
00309 ////////////////////////////////////////////////////////////////////
00310 void AsyncTaskChain::
00311 stop_threads() {
00312   if (_state == S_started || _state == S_interrupted) {
00313     // Clean up all of the threads.
00314     MutexHolder holder(_manager->_lock);
00315     do_stop_threads();
00316   }
00317 }
00318 
00319 ////////////////////////////////////////////////////////////////////
00320 //     Function: AsyncTaskChain::start_threads
00321 //       Access: Published
00322 //  Description: Starts any requested threads to service the tasks on
00323 //               the queue.  This is normally not necessary, since
00324 //               adding a task will start the threads automatically.
00325 ////////////////////////////////////////////////////////////////////
00326 void AsyncTaskChain::
00327 start_threads() {
00328   if (_state == S_initial || _state == S_interrupted) {
00329     MutexHolder holder(_manager->_lock);
00330     do_start_threads();
00331   }
00332 }
00333 
00334 ////////////////////////////////////////////////////////////////////
00335 //     Function: AsyncTaskChain::has_task
00336 //       Access: Published
00337 //  Description: Returns true if the indicated task has been added to
00338 //               this AsyncTaskChain, false otherwise.
00339 ////////////////////////////////////////////////////////////////////
00340 bool AsyncTaskChain::
00341 has_task(AsyncTask *task) const {
00342   MutexHolder holder(_manager->_lock);
00343 
00344   if (task->_chain != this) {
00345     nassertr(!do_has_task(task), false);
00346     return false;
00347   }
00348 
00349   if (task->_state == AsyncTask::S_servicing_removed) {
00350     return false;
00351   }
00352 
00353   // The task might not actually be in the active queue, since it
00354   // might be being serviced right now.  That's OK.
00355   return true;
00356 }
00357 
00358 ////////////////////////////////////////////////////////////////////
00359 //     Function: AsyncTaskChain::wait_for_tasks
00360 //       Access: Published
00361 //  Description: Blocks until the task list is empty.
00362 ////////////////////////////////////////////////////////////////////
00363 void AsyncTaskChain::
00364 wait_for_tasks() {
00365   MutexHolder holder(_manager->_lock);
00366   do_wait_for_tasks();
00367 }
00368 
00369 ////////////////////////////////////////////////////////////////////
00370 //     Function: AsyncTaskChain::get_num_tasks
00371 //       Access: Published
00372 //  Description: Returns the number of tasks that are currently active
00373 //               or sleeping within the task chain.
00374 ////////////////////////////////////////////////////////////////////
00375 int AsyncTaskChain::
00376 get_num_tasks() const {
00377   MutexHolder holder(_manager->_lock);
00378   return _num_tasks;
00379 }
00380 
00381 ////////////////////////////////////////////////////////////////////
00382 //     Function: AsyncTaskChain::get_tasks
00383 //       Access: Published
00384 //  Description: Returns the set of tasks that are active or sleeping
00385 //               on the task chain, at the time of the call.
00386 ////////////////////////////////////////////////////////////////////
00387 AsyncTaskCollection AsyncTaskChain::
00388 get_tasks() const {
00389   MutexHolder holder(_manager->_lock);
00390   AsyncTaskCollection result = do_get_active_tasks();
00391   result.add_tasks_from(do_get_sleeping_tasks());
00392   return result;
00393 }
00394 
00395 ////////////////////////////////////////////////////////////////////
00396 //     Function: AsyncTaskChain::get_active_tasks
00397 //       Access: Published
00398 //  Description: Returns the set of tasks that are active (and not
00399 //               sleeping) on the task chain, at the time of the
00400 //               call.
00401 ////////////////////////////////////////////////////////////////////
00402 AsyncTaskCollection AsyncTaskChain::
00403 get_active_tasks() const {
00404   MutexHolder holder(_manager->_lock);
00405   return do_get_active_tasks();
00406 }
00407 
00408 ////////////////////////////////////////////////////////////////////
00409 //     Function: AsyncTaskChain::get_sleeping_tasks
00410 //       Access: Published
00411 //  Description: Returns the set of tasks that are sleeping (and not
00412 //               active) on the task chain, at the time of the
00413 //               call.
00414 ////////////////////////////////////////////////////////////////////
00415 AsyncTaskCollection AsyncTaskChain::
00416 get_sleeping_tasks() const {
00417   MutexHolder holder(_manager->_lock);
00418   return do_get_sleeping_tasks();
00419 }
00420 
00421 ////////////////////////////////////////////////////////////////////
00422 //     Function: AsyncTaskChain::poll
00423 //       Access: Published
00424 //  Description: Runs through all the tasks in the task list, once, if
00425 //               the task chain is running in single-threaded mode
00426 //               (no threads available).  This method does nothing in
00427 //               threaded mode, so it may safely be called in either
00428 //               case.
00429 //
00430 //               Normally, you would not call this function directly;
00431 //               instead, call AsyncTaskManager::poll(), which polls
00432 //               all of the task chains in sequence.
00433 ////////////////////////////////////////////////////////////////////
00434 void AsyncTaskChain::
00435 poll() {
00436   MutexHolder holder(_manager->_lock);
00437   do_poll();
00438 }
00439 
00440 ////////////////////////////////////////////////////////////////////
00441 //     Function: AsyncTaskChain::get_next_wake_time
00442 //       Access: Published
00443 //  Description: Returns the scheduled time (on the manager's clock)
00444 //               of the next sleeping task, on any task chain, to
00445 //               awaken.  Returns -1 if there are no sleeping tasks.
00446 ////////////////////////////////////////////////////////////////////
00447 double AsyncTaskChain::
00448 get_next_wake_time() const {
00449   MutexHolder holder(_manager->_lock);
00450   return do_get_next_wake_time();
00451 }
00452 
00453 ////////////////////////////////////////////////////////////////////
00454 //     Function: AsyncTaskChain::output
00455 //       Access: Published, Virtual
00456 //  Description: 
00457 ////////////////////////////////////////////////////////////////////
00458 void AsyncTaskChain::
00459 output(ostream &out) const {
00460   MutexHolder holder(_manager->_lock);
00461   do_output(out);
00462 }
00463 
00464 ////////////////////////////////////////////////////////////////////
00465 //     Function: AsyncTaskChain::write
00466 //       Access: Published, Virtual
00467 //  Description: 
00468 ////////////////////////////////////////////////////////////////////
00469 void AsyncTaskChain::
00470 write(ostream &out, int indent_level) const {
00471   MutexHolder holder(_manager->_lock);
00472   do_write(out, indent_level);
00473 }
00474 
////////////////////////////////////////////////////////////////////
//     Function: AsyncTaskChain::do_add
//       Access: Protected
//  Description: Adds the indicated task to the active queue.  It is
//               an error if the task is already added to this or any
//               other active queue.
//
//               This is normally called only by the AsyncTaskManager.
//               Assumes the lock is already held.
////////////////////////////////////////////////////////////////////
void AsyncTaskChain::
do_add(AsyncTask *task) {
  // The task must be unowned, named for this chain, and inactive.
  nassertv(task->_chain == NULL &&
           task->_manager == NULL &&
           task->_chain_name == get_name() &&
           task->_state == AsyncTask::S_inactive);
  nassertv(!do_has_task(task));

  // Make sure the worker threads are running, since there is about to
  // be work for them.
  do_start_threads();

  task->_chain = this;
  task->_manager = _manager;

  // Stamp the task with the manager's current clock time and frame.
  double now = _manager->_clock->get_frame_time();
  task->_start_time = now;
  task->_start_frame = _manager->_clock->get_frame_count();

  _manager->add_task_by_name(task);

  if (task->has_delay()) {
    // This is a deferred task.  Add it to the sleeping queue.
    task->_wake_time = now + task->get_delay();
    task->_start_time = task->_wake_time;
    task->_state = AsyncTask::S_sleeping;
    _sleeping.push_back(task);
    push_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());

  } else {
    // This is an active task.  Add it to the active set.
    task->_state = AsyncTask::S_active;
    if (task_cat.is_spam()) {
      task_cat.spam()
        << "Adding " << *task << " with sort " << task->get_sort()
        << " to chain " << get_name() << " with current_sort "
        << _current_sort << "\n";
    }
    if (task->get_sort() >= _current_sort) {
      // It will run this frame.
      _active.push_back(task);
      push_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
    } else {
      // Its sort pass has already gone by; it will run next frame.
      _next_active.push_back(task);
    }
  }
  // Bookkeeping: both the chain and the manager track task counts.
  ++_num_tasks;
  ++(_manager->_num_tasks);
  _needs_cleanup = true;

  // Wake any threads blocked waiting for work.
  _cvar.notify_all();
}
00536 
00537 ////////////////////////////////////////////////////////////////////
00538 //     Function: AsyncTaskChain::do_remove
00539 //       Access: Protected
00540 //  Description: Removes the indicated task from this chain.  Returns
00541 //               true if removed, false otherwise.  Assumes the lock
00542 //               is already held.  The task->upon_death() method is
00543 //               *not* called.
00544 ////////////////////////////////////////////////////////////////////
00545 bool AsyncTaskChain::
00546 do_remove(AsyncTask *task) {
00547   bool removed = false;
00548 
00549   nassertr(task->_chain == this, false);
00550 
00551   switch (task->_state) {
00552   case AsyncTask::S_servicing:
00553     // This task is being serviced.
00554     task->_state = AsyncTask::S_servicing_removed;
00555     removed = true;
00556     break;
00557     
00558   case AsyncTask::S_servicing_removed:
00559     // Being serviced, though it will be removed later.
00560     break;
00561     
00562   case AsyncTask::S_sleeping:
00563     // Sleeping, easy.
00564     {
00565       int index = find_task_on_heap(_sleeping, task);
00566       nassertr(index != -1, false);
00567       _sleeping.erase(_sleeping.begin() + index);
00568       make_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
00569       removed = true;
00570       cleanup_task(task, false, false);
00571     }
00572     break;
00573     
00574   case AsyncTask::S_active:
00575     {
00576       // Active, but not being serviced, easy.
00577       int index = find_task_on_heap(_active, task);
00578       if (index != -1) {
00579         _active.erase(_active.begin() + index);
00580         make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
00581       } else {
00582         index = find_task_on_heap(_next_active, task);
00583         if (index != -1) {
00584           _next_active.erase(_next_active.begin() + index);
00585         } else {
00586           index = find_task_on_heap(_this_active, task);
00587           nassertr(index != -1, false);
00588         }
00589       }
00590       removed = true;
00591       cleanup_task(task, false, false);
00592     }
00593     
00594   default:
00595     break;
00596   }
00597 
00598   return removed;
00599 }
00600 
////////////////////////////////////////////////////////////////////
//     Function: AsyncTaskChain::do_wait_for_tasks
//       Access: Protected
//  Description: Blocks until the task list is empty.  Assumes the
//               lock is held.
////////////////////////////////////////////////////////////////////
void AsyncTaskChain::
do_wait_for_tasks() {
  do_start_threads();

  if (_threads.empty()) {
    // Non-threaded case: drive the tasks ourselves by polling until
    // none remain.
    while (_num_tasks > 0) {
      if (_state == S_shutdown || _state == S_interrupted) {
        // Stop waiting if the chain is being torn down.
        return;
      }
      do_poll();
    }

  } else {
    // Threaded case: sleep on the condition variable (which releases
    // the lock) until the worker threads drain the queue.
    while (_num_tasks > 0) {
      if (_state == S_shutdown || _state == S_interrupted) {
        return;
      }
      
      // Attribute the blocked time to the "Wait" PStat collector.
      PStatTimer timer(_wait_pcollector);
      _cvar.wait();
    }
  }
}
00632 
////////////////////////////////////////////////////////////////////
//     Function: AsyncTaskChain::do_cleanup
//       Access: Protected
//  Description: Stops all threads and messily empties the task list.
//               This is intended to be called on destruction only.
//               Assumes the lock is already held.
////////////////////////////////////////////////////////////////////
void AsyncTaskChain::
do_cleanup() {
  if (task_cat.is_spam()) {
    do_output(task_cat.spam());
    task_cat.spam(false)
      << ": do_cleanup()\n";
  }
  do_stop_threads();
  _num_threads = 0;

  // Don't call the upon_death functions while we clean up the tasks.
  // Instead, store all the tasks in a list as we clean them up, and
  // then call the upon_death functions all at once.  We do this
  // because calling upon_death wil release the lock, allowing the
  // iterators to become invalid.

  TaskHeap dead;
  dead.reserve(_num_tasks);

  _needs_cleanup = false;

  // Sweep every queue: active, mid-epoch, next-frame, and sleeping.
  TaskHeap::const_iterator ti;
  for (ti = _active.begin(); ti != _active.end(); ++ti) {
    AsyncTask *task = (*ti);
    dead.push_back(task);
    cleanup_task(task, false, false);
  }
  for (ti = _this_active.begin(); ti != _this_active.end(); ++ti) {
    AsyncTask *task = (*ti);
    dead.push_back(task);
    cleanup_task(task, false, false);
  }
  for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) {
    AsyncTask *task = (*ti);
    dead.push_back(task);
    cleanup_task(task, false, false);
  }
  for (ti = _sleeping.begin(); ti != _sleeping.end(); ++ti) {
    AsyncTask *task = (*ti);
    dead.push_back(task);
    cleanup_task(task, false, false);
  }

  // There might still be one task remaining: the currently-executing
  // task.
  nassertv(_num_tasks == 0 || _num_tasks == 1);
  
  // Now go back and call the upon_death functions.  The lock must be
  // released while user callbacks run; it is reacquired afterwards to
  // restore the caller's expectation that the lock is held.
  _manager->_lock.release();
  for (ti = dead.begin(); ti != dead.end(); ++ti) {
    (*ti)->upon_death(_manager, false);
  }
  _manager->_lock.acquire();

  if (task_cat.is_spam()) {
    do_output(task_cat.spam());
    task_cat.spam(false)
      << ": done do_cleanup()\n";
  }
}
00700 
00701 ////////////////////////////////////////////////////////////////////
00702 //     Function: AsyncTaskChain::do_has_task
00703 //       Access: Protected
00704 //  Description: Returns true if the task is on one of the task lists,
00705 //               false if it is not (false may mean that the task is
00706 //               currently being serviced).  Assumes the lock is
00707 //               currently held.
00708 ////////////////////////////////////////////////////////////////////
00709 bool AsyncTaskChain::
00710 do_has_task(AsyncTask *task) const {
00711   return (find_task_on_heap(_active, task) != -1 ||
00712           find_task_on_heap(_next_active, task) != -1 ||
00713           find_task_on_heap(_sleeping, task) != -1 ||
00714           find_task_on_heap(_this_active, task) != -1);
00715 }
00716 
00717 ////////////////////////////////////////////////////////////////////
00718 //     Function: AsyncTaskChain::find_task_on_heap
00719 //       Access: Protected
00720 //  Description: Returns the index number of the indicated task within
00721 //               the specified task list, or -1 if the task is not
00722 //               found in the list (this may mean that it is currently
00723 //               being serviced).  Assumes that the lock is currently
00724 //               held.
00725 ////////////////////////////////////////////////////////////////////
00726 int AsyncTaskChain::
00727 find_task_on_heap(const TaskHeap &heap, AsyncTask *task) const {
00728   for (int i = 0; i < (int)heap.size(); ++i) {
00729     if (heap[i] == task) {
00730       return i;
00731     }
00732   }
00733 
00734   return -1;
00735 }
00736 
////////////////////////////////////////////////////////////////////
//     Function: AsyncTaskChain::service_one_task
//       Access: Protected
//  Description: Pops a single task off the active queue, services it,
//               and restores it to the end of the queue.  This is
//               called internally only within one of the task
//               threads.  Assumes the lock is already held.
//
//               thread is the servicing AsyncTaskChainThread, or NULL
//               when called from poll() on the main thread.
//
//               Note that the lock may be temporarily released by
//               this method.
////////////////////////////////////////////////////////////////////
void AsyncTaskChain::
service_one_task(AsyncTaskChain::AsyncTaskChainThread *thread) {
  if (!_active.empty()) {
    // Pull the frontmost task off the _active heap.  The local
    // PT(AsyncTask) holds a reference so the task cannot be deleted
    // out from under us while the lock is released below.
    PT(AsyncTask) task = _active.front();
    pop_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
    _active.pop_back();

    // Record which task this thread is servicing, so the task can
    // still be reported (e.g. by do_get_active_tasks()) while it is
    // off the queues.
    if (thread != (AsyncTaskChain::AsyncTaskChainThread *)NULL) {
      thread->_servicing = task;
    }

    if (task_cat.is_spam()) {
      task_cat.spam()
        << "Servicing " << *task << " in "
        << *Thread::get_current_thread() << "\n";
    }

    nassertv(task->get_sort() == _current_sort);
    nassertv(task->_state == AsyncTask::S_active);
    task->_state = AsyncTask::S_servicing;
    task->_servicing_thread = thread;

    // This runs the task body; per its name, it releases the lock
    // while the task executes (hence the "lock may be temporarily
    // released" note above) and returns the task's done status.
    AsyncTask::DoneStatus ds = task->unlock_and_do_task();

    if (thread != (AsyncTaskChain::AsyncTaskChainThread *)NULL) {
      thread->_servicing = NULL;
    }
    task->_servicing_thread = NULL;

    // While the lock was released, another thread may have removed
    // the task or moved it to a different chain; only requeue it if
    // it still belongs to us.
    if (task->_chain == this) {
      if (task->_state == AsyncTask::S_servicing_removed) {
        // This task wants to kill itself.
        cleanup_task(task, true, false);

      } else if (task->_chain_name != get_name()) {
        // The task wants to jump to a different chain.  hold_task
        // keeps the task alive across cleanup_task(), which drops
        // this chain's ownership of it.
        PT(AsyncTask) hold_task = task;
        cleanup_task(task, false, false);
        task->jump_to_task_chain(_manager);

      } else {
        // Otherwise, dispatch on the status the task body returned.
        switch (ds) {
        case AsyncTask::DS_cont:
          // The task is still alive; put it on the next frame's active
          // queue.
          task->_state = AsyncTask::S_active;
          _next_active.push_back(task);
          _cvar.notify_all();
          break;
          
        case AsyncTask::DS_again:
          // The task wants to sleep again.  Compute its wake time
          // from its delay and push it on the _sleeping heap, which
          // is ordered by wake time.
          {
            double now = _manager->_clock->get_frame_time();
            task->_wake_time = now + task->get_delay();
            task->_start_time = task->_wake_time;
            task->_state = AsyncTask::S_sleeping;
            _sleeping.push_back(task);
            push_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
            if (task_cat.is_spam()) {
              task_cat.spam()
                << "Sleeping " << *task << ", wake time at " 
                << task->_wake_time - now << "\n";
            }
            _cvar.notify_all();
          }
          break;

        case AsyncTask::DS_pickup:
          // The task wants to run again this frame if possible.
          task->_state = AsyncTask::S_active;
          _this_active.push_back(task);
          _cvar.notify_all();
          break;

        case AsyncTask::DS_interrupt:
          // The task had an exception and wants to raise a big flag.
          // Requeue it for next frame, and flip the whole chain into
          // the interrupted state (if it was running normally).
          task->_state = AsyncTask::S_active;
          _next_active.push_back(task);
          if (_state == S_started) {
            _state = S_interrupted;
            _cvar.notify_all();
          }
          break;
          
        default:
          // The task has finished.
          cleanup_task(task, true, true);
        }
      }
    } else {
      task_cat.error()
        << "Task is no longer on chain " << get_name() 
        << ": " << *task << "\n";
    }

    if (task_cat.is_spam()) {
      task_cat.spam()
        << "Done servicing " << *task << " in "
        << *Thread::get_current_thread() << "\n";
    }
  }
  thread_consider_yield();
}
00852 
////////////////////////////////////////////////////////////////////
//     Function: AsyncTaskChain::cleanup_task
//       Access: Protected
//  Description: Called internally when a task has completed (or been
//               interrupted) and is about to be removed from the
//               active queue.  Assumes the lock is held.
//
//               If upon_death is true, then task->upon_death() will
//               also be called, with the indicated clean_exit
//               parameter.
//
//               Note that the lock may be temporarily released by
//               this method.
////////////////////////////////////////////////////////////////////
void AsyncTaskChain::
cleanup_task(AsyncTask *task, bool upon_death, bool clean_exit) {
  if (task_cat.is_spam()) {
    do_output(task_cat.spam());
    task_cat.spam(false)
      << ": cleanup_task(" << *task << ", " << upon_death << ", " << clean_exit
      << ")\n";
  }

  nassertv(task->_chain == this);
  // Hold a local reference so the task cannot be destructed while we
  // detach it (and while the lock is released for upon_death below).
  PT(AsyncTask) hold_task = task;

  // Detach the task from this chain and its manager, and fix up the
  // task counts accordingly.
  task->_state = AsyncTask::S_inactive;
  task->_chain = NULL;
  task->_manager = NULL;
  --_num_tasks;
  --(_manager->_num_tasks);

  _manager->remove_task_by_name(task);

  if (upon_death) {
    // Release the lock while invoking the user's death callback, so
    // the callback is free to call back into the task manager.
    _manager->_lock.release();
    task->upon_death(_manager, clean_exit);
    _manager->_lock.acquire();
  }
}
00893 
////////////////////////////////////////////////////////////////////
//     Function: AsyncTaskChain::finish_sort_group
//       Access: Protected
//  Description: Called internally when all tasks of a given sort
//               value have been completed, and it is time to
//               increment to the next sort value, or begin the next
//               epoch.  Assumes the lock is held.
//
//               Returns true if there are more tasks on the queue
//               after this operation, or false if the task list is
//               empty and we need to wait.
////////////////////////////////////////////////////////////////////
bool AsyncTaskChain::
finish_sort_group() {
  // All threads must be idle before the sort group can advance.
  nassertr(_num_busy_threads == 0, true);

  if (!_threads.empty()) {
    PStatClient::thread_tick(get_name());
  }
  
  if (!_active.empty()) {
    // There are more tasks; just set the next sort value.
    nassertr(_current_sort < _active.front()->get_sort(), true);
    _current_sort = _active.front()->get_sort();
    _cvar.notify_all();
    return true;
  }

  // There are no more tasks in this epoch; advance to the next epoch.

  if (!_this_active.empty() && _frame_budget >= 0.0) {
    // Enter pickup mode.  This is a special mode at the end of the
    // epoch in which we are just re-running the tasks that think they
    // can still run within the frame, in an attempt to use up our
    // frame budget.

    if (task_cat.is_spam()) {
      do_output(task_cat.spam());
      task_cat.spam(false)
        << ": next epoch (pickup mode)\n";
    }

    _pickup_mode = true;
    _active.swap(_this_active);

  } else {
    // Not in pickup mode.

    if (task_cat.is_spam()) {
      do_output(task_cat.spam());
      task_cat.spam(false)
        << ": next epoch\n";
    }

    _pickup_mode = false;

    // Here, there's no difference between _this_active and
    // _next_active.  Combine them.
    _next_active.insert(_next_active.end(), _this_active.begin(), _this_active.end());
    _this_active.clear();

    _active.swap(_next_active);

    // We only tick the clock and wake sleepers in normal mode, the
    // first time through the task list; not in pickup mode when we
    // are re-running the stragglers just to use up our frame budget.

    if (_tick_clock) {
      if (task_cat.is_spam()) {
        do_output(task_cat.spam());
        task_cat.spam(false)
          << ": tick clock\n";
      }
      _manager->_clock->tick();
      // Wake anyone blocked on _frame_cvar waiting for the frame to
      // advance (e.g. frame-synced chains; see thread_main()).
      _manager->_frame_cvar.notify_all();

    } else if (_frame_sync) {
      // If we're a synced chain, we have to wait at the end of the
      // epoch for someone else to tick the clock.
      _block_till_next_frame = true;
    }
    
    // Check for any sleeping tasks that need to be woken.  _sleeping
    // is a heap ordered by wake time, so the earliest waker is always
    // at the front.
    double now = _manager->_clock->get_frame_time();
    while (!_sleeping.empty() && _sleeping.front()->_wake_time <= now) {
      PT(AsyncTask) task = _sleeping.front();
      if (task_cat.is_spam()) {
        task_cat.spam()
          << "Waking " << *task << ", wake time at " 
          << task->_wake_time - now << "\n";
      }
      pop_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
      _sleeping.pop_back();
      task->_state = AsyncTask::S_active;
      task->_start_frame = _manager->_clock->get_frame_count();
      _active.push_back(task);
    }

    if (task_cat.is_spam()) {
      if (_sleeping.empty()) {
        task_cat.spam()
          << "No more tasks on sleeping queue.\n";
      } else {
        task_cat.spam()
          << "Next sleeper: " << *_sleeping.front() << ", wake time at " 
          << _sleeping.front()->_wake_time - now << "\n";
      }
    }

    // Any tasks that are on the active queue at the beginning of the
    // epoch are deemed to have run one frame (or to be about to).
    TaskHeap::const_iterator ti;
    for (ti = _active.begin(); ti != _active.end(); ++ti) {
      AsyncTask *task = (*ti);
      ++task->_num_frames;
    }
  }

  // In timeslice_priority mode, postpone any tasks that have already
  // used up their share of the epoch.
  if (_timeslice_priority) {
    filter_timeslice_priority();
  }

  // Sanity check: every task must be on exactly one of the queues.
  nassertr((size_t)_num_tasks == _active.size() + _this_active.size() + _next_active.size() + _sleeping.size(), true);
  make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());

  // Reset the sort value to the sentinel minimum; the first thread to
  // pick up a task will establish the real _current_sort.
  _current_sort = -INT_MAX;

  if (!_active.empty()) {
    // Signal the threads to start executing the first task again.
    _cvar.notify_all();
    return true;
  }

  // There are no tasks to be had anywhere.  Chill.
  _pickup_mode = false;
  nassertr(_this_active.empty(), false);
  return false;
}
01032 
01033 ////////////////////////////////////////////////////////////////////
01034 //     Function: AsyncTaskChain::filter_timeslice_priority
01035 //       Access: Protected
01036 //  Description: Called to filter the _active tasks list when we are
01037 //               in the special timeslice_priority mode.  In this
01038 //               mode, go through and postpone any tasks that have
01039 //               already exceeded their priority budget for this
01040 //               epoch.
01041 //
01042 //               Assumes the lock is already held.
01043 ////////////////////////////////////////////////////////////////////
01044 void AsyncTaskChain::
01045 filter_timeslice_priority() {
01046   if (_active.empty()) {
01047     return;
01048   }
01049   nassertv(_timeslice_priority);
01050 
01051   // We must first sum up the average per-epoch runtime of each task.
01052   double net_runtime = 0.0;
01053   int net_priority = 0;
01054   
01055   TaskHeap::iterator ti;
01056   for (ti = _active.begin(); ti != _active.end(); ++ti) {
01057     AsyncTask *task = (*ti);
01058     double runtime = max(task->get_average_dt(), 0.0);
01059     int priority = max(task->_priority, 1);
01060     net_runtime += runtime;
01061     net_priority += priority;
01062   }
01063   
01064   // That gives us a timeslice budget per priority value.
01065   double average_budget = net_runtime / (double)net_priority;
01066   
01067   TaskHeap keep, postpone;
01068   for (ti = _active.begin(); ti != _active.end(); ++ti) {
01069     AsyncTask *task = (*ti);
01070     double runtime = max(task->get_average_dt(), 0.0);
01071     int priority = max(task->_priority, 1);
01072     double consumed = runtime / (double)priority;
01073     //    cerr << *task << " consumed " << consumed << " vs. " << average_budget << "\n";
01074     if (consumed > average_budget) {
01075       // Postpone.  Run this task next epoch.
01076       postpone.push_back(task);
01077     } else {
01078       // Keep, and run this task this epoch.
01079       keep.push_back(task);
01080     }
01081   }
01082 
01083   if (keep.empty()) {
01084     // Hmm, nothing to keep.  Grab the postponed task with the highest
01085     // priority and keep that instead.
01086     nassertv(!postpone.empty());
01087     ti = postpone.begin();
01088     TaskHeap::iterator max_ti = ti;
01089     ++ti;
01090     while (ti != postpone.end()) {
01091       if ((*ti)->_priority > (*max_ti)->_priority) {
01092         max_ti = ti;
01093       }
01094     }
01095 
01096     //    cerr << "Nothing to keep, keeping " << *(*max_ti) << " instead\n";
01097 
01098     keep.push_back(*max_ti);
01099     postpone.erase(max_ti);
01100   }
01101    
01102   _active.swap(keep);
01103   if (_pickup_mode) {
01104     _this_active.insert(_this_active.end(), postpone.begin(), postpone.end());
01105   } else {
01106     _next_active.insert(_next_active.end(), postpone.begin(), postpone.end());
01107   }
01108 
01109   nassertv(!_active.empty());
01110 }
01111 
////////////////////////////////////////////////////////////////////
//     Function: AsyncTaskChain::do_stop_threads
//       Access: Protected
//  Description: The private implementation of stop_threads; assumes
//               the lock is already held.
//
//               Note that the lock is temporarily released while the
//               threads are joined.
////////////////////////////////////////////////////////////////////
void AsyncTaskChain::
do_stop_threads() {
  if (_state == S_started || _state == S_interrupted) {
    if (task_cat.is_debug() && !_threads.empty()) {
      task_cat.debug()
        << "Stopping " << _threads.size() 
        << " threads for " << _manager->get_name()
        << " chain " << get_name() 
        << " in " << *Thread::get_current_thread() << "\n";
    }

    // Flip to the shutdown state and wake every waiter so the threads
    // can observe the state change and exit their main loops.
    _state = S_shutdown;
    _cvar.notify_all();
    _manager->_frame_cvar.notify_all();
    
    // Move the thread list into a local so _threads is empty while
    // the lock is released below.
    Threads wait_threads;
    wait_threads.swap(_threads);
    
    // We have to release the lock while we join, so the threads can
    // wake up and see that we're shutting down.
    _manager->_lock.release();
    Threads::iterator ti;
    for (ti = wait_threads.begin(); ti != wait_threads.end(); ++ti) {
      if (task_cat.is_debug()) {
        task_cat.debug()
          << "Waiting for " << *(*ti) << " in " 
          << *Thread::get_current_thread() << "\n";
      }
      (*ti)->join();
      if (task_cat.is_spam()) {
        task_cat.spam()
          << "Done waiting for " << *(*ti) << " in " 
          << *Thread::get_current_thread() << "\n";
      }
    }
    _manager->_lock.acquire();
    
    _state = S_initial;

    // There might be one busy "thread" still: the main thread.
    nassertv(_num_busy_threads == 0 || _num_busy_threads == 1);
    cleanup_pickup_mode();
  }
}
01162 
01163 ////////////////////////////////////////////////////////////////////
01164 //     Function: AsyncTaskChain::do_start_threads
01165 //       Access: Protected
01166 //  Description: The private implementation of start_threads; assumes
01167 //               the lock is already held.
01168 ////////////////////////////////////////////////////////////////////
01169 void AsyncTaskChain::
01170 do_start_threads() {
01171   if (_state == S_interrupted) {
01172     do_stop_threads();
01173   }
01174 
01175   if (_state == S_initial) {
01176     _state = S_started;
01177     if (Thread::is_threading_supported() && _num_threads > 0) {
01178       if (task_cat.is_debug()) {
01179         task_cat.debug()
01180           << "Starting " << _num_threads << " threads for "
01181           << _manager->get_name() << " chain " << get_name() << "\n";
01182       }
01183       _needs_cleanup = true;
01184       _threads.reserve(_num_threads);
01185       for (int i = 0; i < _num_threads; ++i) {
01186         ostringstream strm;
01187         strm << _manager->get_name() << "_" << get_name() << "_" << i;
01188         PT(AsyncTaskChainThread) thread = new AsyncTaskChainThread(strm.str(), this);
01189         if (thread->start(_thread_priority, true)) {
01190           _threads.push_back(thread);
01191         }
01192       }
01193     }
01194   }
01195 }
01196 
01197 ////////////////////////////////////////////////////////////////////
01198 //     Function: AsyncTaskChain::do_get_active_tasks
01199 //       Access: Protected
01200 //  Description: Returns the set of tasks that are active (and not
01201 //               sleeping) on the task chain, at the time of the
01202 //               call.  Assumes the lock is held.
01203 ////////////////////////////////////////////////////////////////////
01204 AsyncTaskCollection AsyncTaskChain::
01205 do_get_active_tasks() const {
01206   AsyncTaskCollection result;
01207 
01208   Threads::const_iterator thi;
01209   for (thi = _threads.begin(); thi != _threads.end(); ++thi) {
01210     AsyncTask *task = (*thi)->_servicing;
01211     if (task != (AsyncTask *)NULL) {
01212       result.add_task(task);
01213     }
01214   }
01215   TaskHeap::const_iterator ti;
01216   for (ti = _active.begin(); ti != _active.end(); ++ti) {
01217     AsyncTask *task = (*ti);
01218     result.add_task(task);
01219   }
01220   for (ti = _this_active.begin(); ti != _this_active.end(); ++ti) {
01221     AsyncTask *task = (*ti);
01222     result.add_task(task);
01223   }
01224   for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) {
01225     AsyncTask *task = (*ti);
01226     result.add_task(task);
01227   }
01228 
01229   return result;
01230 }
01231 
01232 ////////////////////////////////////////////////////////////////////
01233 //     Function: AsyncTaskChain::do_get_sleeping_tasks
01234 //       Access: Published
01235 //  Description: Returns the set of tasks that are sleeping (and not
01236 //               active) on the task chain, at the time of the
01237 //               call.  Assumes the lock is held.
01238 ////////////////////////////////////////////////////////////////////
01239 AsyncTaskCollection AsyncTaskChain::
01240 do_get_sleeping_tasks() const {
01241   AsyncTaskCollection result;
01242 
01243   TaskHeap::const_iterator ti;
01244   for (ti = _sleeping.begin(); ti != _sleeping.end(); ++ti) {
01245     AsyncTask *task = (*ti);
01246     result.add_task(task);
01247   }
01248 
01249   return result;
01250 }
01251 
////////////////////////////////////////////////////////////////////
//     Function: AsyncTaskChain::do_poll
//       Access: Protected
//  Description: The private implementation of poll(), this assumes
//               the lock is already held.
//
//               Services tasks on the calling thread until the active
//               queue is exhausted (or the frame budget is used up).
//               Does nothing if this chain has its own threads.
////////////////////////////////////////////////////////////////////
void AsyncTaskChain::
do_poll() {
  thread_consider_yield();
  if (_num_tasks == 0) {
    // Nothing queued anywhere; done.
    return;
  }

  do_start_threads();

  if (!_threads.empty()) {
    // This chain is serviced by its own threads; poll() has nothing
    // to do.
    return;
  }

  if (_num_busy_threads != 0) {
    // We are recursively nested within another task.  Return, with a warning.
    task_cat.warning()
      << "Ignoring recursive poll() within another task.\n";
    return;
  }

  nassertv(!_pickup_mode);

  // Keep servicing sort groups; if finish_sort_group() puts us into
  // pickup mode, run the stragglers too before returning.
  do {
    while (!_active.empty()) {
      if (_state == S_shutdown || _state == S_interrupted) {
        return;
      }
      // Reset the per-frame time accounting whenever the clock's
      // frame counter has advanced.
      int frame = _manager->_clock->get_frame_count();
      if (_current_frame != frame) {
        _current_frame = frame;
        _time_in_frame = 0.0;
        _block_till_next_frame = false;
      }
      if (_block_till_next_frame ||
          (_frame_budget >= 0.0 && _time_in_frame >= _frame_budget)) {
        // If we've exceeded our budget, stop here.  We'll resume from
        // this point at the next call to poll().
        cleanup_pickup_mode();
        return;
      }
      
      _current_sort = _active.front()->get_sort();

      // Normally, there won't be any threads running at the same time
      // we're in poll().  But it's possible, if someone calls
      // set_num_threads() while we're processing.
      _num_busy_threads++;
      service_one_task(NULL);
      _num_busy_threads--;
      _cvar.notify_all();

      if (!_threads.empty()) {
        // Threads have appeared mid-poll (see comment above); let
        // them take over.
        return;
      }
    }
    
    finish_sort_group();
  } while (_pickup_mode);
}
01317 
01318 ////////////////////////////////////////////////////////////////////
01319 //     Function: AsyncTaskChain::cleanup_pickup_mode
01320 //       Access: Protected
01321 //  Description: Clean up the damage from setting pickup mode.  This
01322 //               means we restore the _active and _next_active lists
01323 //               as they should have been without pickup mode, for
01324 //               next frame.  Assumes the lock is held.
01325 ////////////////////////////////////////////////////////////////////
01326 void AsyncTaskChain::
01327 cleanup_pickup_mode() {
01328   if (_pickup_mode) {
01329     _pickup_mode = false;
01330 
01331     // Move everything to the _next_active queue.
01332     _next_active.insert(_next_active.end(), _this_active.begin(), _this_active.end());
01333     _this_active.clear();
01334     _next_active.insert(_next_active.end(), _active.begin(), _active.end());
01335     _active.clear();
01336 
01337     // Now finish the epoch properly.
01338     finish_sort_group();
01339   }
01340 }
01341 
01342 ////////////////////////////////////////////////////////////////////
01343 //     Function: AsyncTaskChain::do_output
01344 //       Access: Protected
01345 //  Description: The private implementation of output(), this assumes
01346 //               the lock is already held.
01347 ////////////////////////////////////////////////////////////////////
01348 void AsyncTaskChain::
01349 do_output(ostream &out) const {
01350   if (_manager != (AsyncTaskManager *)NULL) {
01351     out << _manager->get_type() << " " << _manager->get_name();
01352   } else {
01353     out << "(no manager)";
01354   }
01355   out << " task chain " << get_name()
01356       << "; " << _num_tasks << " tasks";
01357 }
01358 
01359 ////////////////////////////////////////////////////////////////////
01360 //     Function: AsyncTaskChain::do_write
01361 //       Access: Protected
01362 //  Description: The private implementation of write(), this assumes
01363 //               the lock is already held.
01364 ////////////////////////////////////////////////////////////////////
01365 void AsyncTaskChain::
01366 do_write(ostream &out, int indent_level) const {
01367   indent(out, indent_level)
01368     << "Task chain \"" << get_name() << "\"\n";
01369   if (_num_threads > 0) {
01370     indent(out, indent_level + 2) 
01371       << _num_threads << " threads, priority " << _thread_priority << "\n";
01372   }
01373   if (_frame_budget >= 0.0) {
01374     indent(out, indent_level + 2) 
01375       << "frame budget " << _frame_budget << " s\n";
01376   }
01377   if (_timeslice_priority) {
01378     indent(out, indent_level + 2) 
01379       << "timeslice priority\n";
01380   }
01381   if (_tick_clock) {
01382     indent(out, indent_level + 2) 
01383       << "tick clock\n";
01384   }
01385 
01386   static const size_t buffer_size = 1024;
01387   char buffer[buffer_size];
01388   sprintf(buffer, " %-32s %8s %8s %8s %8s %6s",
01389           "Task", 
01390           "sleep(s)",
01391           "dt(ms)", "avg", "max",
01392           "sort");
01393   nassertv(strlen(buffer) < buffer_size);
01394 
01395   indent(out, indent_level) 
01396     << buffer << "\n";
01397 
01398   indent(out, indent_level);
01399   for (int i = 0; i < 32+8+8+8+8+6+7; ++i) {
01400     out << '-';
01401   }
01402   out << "\n";
01403 
01404   // Collect a list of all active tasks, then sort them into order for
01405   // output.
01406   TaskHeap tasks = _active;
01407   tasks.insert(tasks.end(), _this_active.begin(), _this_active.end());
01408   tasks.insert(tasks.end(), _next_active.begin(), _next_active.end());
01409 
01410   Threads::const_iterator thi;
01411   for (thi = _threads.begin(); thi != _threads.end(); ++thi) {
01412     AsyncTask *task = (*thi)->_servicing;
01413     if (task != (AsyncTask *)NULL) {
01414       tasks.push_back(task);
01415     }
01416   }
01417 
01418   double now = _manager->_clock->get_frame_time();
01419 
01420   if (!tasks.empty()) {
01421     sort(tasks.begin(), tasks.end(), AsyncTaskSortPriority());
01422 
01423     // Since AsyncTaskSortPriority() sorts backwards (because of STL's
01424     // push_heap semantics), we go through the task list in reverse
01425     // order to print them forwards.
01426     TaskHeap::reverse_iterator ti;
01427     for (ti = tasks.rbegin(); ti != tasks.rend(); ++ti) {
01428       AsyncTask *task = (*ti);
01429       write_task_line(out, indent_level, task, now);
01430     }
01431   }
01432 
01433   // Instead of iterating through the _sleeping list in heap order,
01434   // copy it and then use repeated pops to get it out in sorted
01435   // order, for the user's satisfaction.
01436   TaskHeap sleeping = _sleeping;
01437   while (!sleeping.empty()) {
01438     PT(AsyncTask) task = sleeping.front();
01439     pop_heap(sleeping.begin(), sleeping.end(), AsyncTaskSortWakeTime());
01440     sleeping.pop_back();
01441     
01442     write_task_line(out, indent_level, task, now);
01443   }
01444 }
01445 
01446 ////////////////////////////////////////////////////////////////////
01447 //     Function: AsyncTaskChain::write_task_line
01448 //       Access: Protected
01449 //  Description: Writes a single line for a task in the report
01450 //               generated by do_write().  Assumes the lock is already
01451 //               held.
01452 ////////////////////////////////////////////////////////////////////
01453 void AsyncTaskChain::
01454 write_task_line(ostream &out, int indent_level, AsyncTask *task, double now) const {
01455   char servicing_flag = ' ';
01456   if (task->_state == AsyncTask::S_servicing) {
01457     servicing_flag = '*';
01458   } else if (task->_state == AsyncTask::S_servicing_removed) {
01459     servicing_flag = '-';
01460   }
01461 
01462   static const size_t buffer_size = 1024;
01463   char buffer[buffer_size];
01464 
01465   if (task->_state == AsyncTask::S_sleeping) {
01466     // For sleeping tasks, include the wake time, as an elapsed time
01467     // in seconds.
01468     string name = task->get_name().substr(0, 32);
01469     sprintf(buffer, "%c%-32s %8.1f",
01470             servicing_flag, name.c_str(),
01471             task->_wake_time - now);
01472   } else {
01473     // For active tasks, don't include a wake time.  This means we
01474     // have more space for the name.
01475     string name = task->get_name().substr(0, 41);
01476     sprintf(buffer, "%c%-41s",
01477             servicing_flag, name.c_str());
01478   }
01479   nassertv(strlen(buffer) < buffer_size);
01480 
01481   indent(out, indent_level) 
01482     << buffer;
01483 
01484   if (task->_num_frames > 0) {
01485     sprintf(buffer, " %8.1f %8.1f %8.1f %6d",
01486             task->_dt * 1000.0, task->get_average_dt() * 1000.0,
01487             task->_max_dt * 1000.0, 
01488             task->_sort);
01489   } else {
01490     // No statistics for a task that hasn't run yet.
01491     sprintf(buffer, " %8s %8s %8s %6d",
01492             "", "", "",
01493             task->_sort);
01494   }
01495 
01496   nassertv(strlen(buffer) < buffer_size);
01497   out << buffer << "\n";
01498 }
01499 
////////////////////////////////////////////////////////////////////
//     Function: AsyncTaskChain::AsyncTaskChainThread::Constructor
//       Access: Public
//  Description: Creates a service thread for the indicated chain.
//               The thread's sync name is the chain's name, and
//               _servicing starts out NULL (no task being run).
////////////////////////////////////////////////////////////////////
AsyncTaskChain::AsyncTaskChainThread::
AsyncTaskChainThread(const string &name, AsyncTaskChain *chain) :
  Thread(name, chain->get_name()),
  _chain(chain),
  _servicing(NULL)
{
}
01512 
////////////////////////////////////////////////////////////////////
//     Function: AsyncTaskChain::AsyncTaskChainThread::thread_main
//       Access: Public, Virtual
//  Description: The main loop of each task-servicing thread.  Holds
//               the manager lock for the lifetime of the loop (it is
//               released internally while tasks run and while waiting
//               on condition variables), and exits when the chain is
//               shut down or interrupted.
////////////////////////////////////////////////////////////////////
void AsyncTaskChain::AsyncTaskChainThread::
thread_main() {
  MutexHolder holder(_chain->_manager->_lock);
  while (_chain->_state != S_shutdown && _chain->_state != S_interrupted) {
    thread_consider_yield();
    // Only pick up a task if the front of the heap matches the sort
    // group currently being serviced.
    if (!_chain->_active.empty() &&
        _chain->_active.front()->get_sort() == _chain->_current_sort) {

      // Reset the per-frame time accounting whenever the clock's
      // frame counter has advanced.
      int frame = _chain->_manager->_clock->get_frame_count();
      if (_chain->_current_frame != frame) {
        _chain->_current_frame = frame;
        _chain->_time_in_frame = 0.0;
        _chain->_block_till_next_frame = false;
      }

      // If we've exceeded our frame budget, sleep until the next
      // frame.
      if (_chain->_block_till_next_frame ||
          (_chain->_frame_budget >= 0.0 && _chain->_time_in_frame >= _chain->_frame_budget)) {
        // Wait on _frame_cvar until the frame advances (or we are
        // asked to shut down), re-checking the budget each wakeup.
        while ((_chain->_block_till_next_frame ||
                (_chain->_frame_budget >= 0.0 && _chain->_time_in_frame >= _chain->_frame_budget)) &&
               _chain->_state != S_shutdown && _chain->_state != S_interrupted) {
          _chain->cleanup_pickup_mode();
          _chain->_manager->_frame_cvar.wait();
          frame = _chain->_manager->_clock->get_frame_count();
          if (_chain->_current_frame != frame) {
            _chain->_current_frame = frame;
            _chain->_time_in_frame = 0.0;
            _chain->_block_till_next_frame = false;
          }
        }
        // Now that it's the next frame, go back to the top of the loop.
        continue;
      }

      // Service one task; the busy count lets other threads (and
      // finish_sort_group) know work is in flight for this sort group.
      PStatTimer timer(_task_pcollector);
      _chain->_num_busy_threads++;
      _chain->service_one_task(this);
      _chain->_num_busy_threads--;
      _chain->_cvar.notify_all();

    } else {
      // We've finished all the available tasks of the current sort
      // value.  We can't pick up a new task until all of the threads
      // finish the tasks with the same sort value.
      if (_chain->_num_busy_threads == 0) {
        // We're the last thread to finish.  Update _current_sort.
        if (!_chain->finish_sort_group()) {
          // Nothing to do.  Wait for more tasks to be added.
          if (_chain->_sleeping.empty()) {
            PStatTimer timer(_wait_pcollector);
            _chain->_cvar.wait();
          } else {
            // Wake no later than the earliest sleeper's wake time, so
            // it can be moved to the active queue on time.
            double wake_time = _chain->do_get_next_wake_time();
            double now = _chain->_manager->_clock->get_frame_time();
            double timeout = max(wake_time - now, 0.0);
            PStatTimer timer(_wait_pcollector);
            _chain->_cvar.wait(timeout);
          }            
        }

      } else {
        // Wait for the other threads to finish their current task
        // before we continue.
        PStatTimer timer(_wait_pcollector);
        _chain->_cvar.wait();
      }
    }
  }
}
01588 
 All Classes Functions Variables Enumerations