Panda3D
|
00001 // Filename: asyncTaskChain.cxx 00002 // Created by: drose (23Aug06) 00003 // 00004 //////////////////////////////////////////////////////////////////// 00005 // 00006 // PANDA 3D SOFTWARE 00007 // Copyright (c) Carnegie Mellon University. All rights reserved. 00008 // 00009 // All use of this software is subject to the terms of the revised BSD 00010 // license. You should have received a copy of this license along 00011 // with this source code in a file named "LICENSE." 00012 // 00013 //////////////////////////////////////////////////////////////////// 00014 00015 #include "asyncTaskChain.h" 00016 #include "asyncTaskManager.h" 00017 #include "event.h" 00018 #include "mutexHolder.h" 00019 #include "indent.h" 00020 #include "pStatClient.h" 00021 #include "pStatTimer.h" 00022 #include "clockObject.h" 00023 #include <algorithm> 00024 #include <stdio.h> // For sprintf/snprintf 00025 00026 TypeHandle AsyncTaskChain::_type_handle; 00027 00028 PStatCollector AsyncTaskChain::_task_pcollector("Task"); 00029 PStatCollector AsyncTaskChain::_wait_pcollector("Wait"); 00030 00031 //////////////////////////////////////////////////////////////////// 00032 // Function: AsyncTaskChain::Constructor 00033 // Access: Published 00034 // Description: 00035 //////////////////////////////////////////////////////////////////// 00036 AsyncTaskChain:: 00037 AsyncTaskChain(AsyncTaskManager *manager, const string &name) : 00038 Namable(name), 00039 _manager(manager), 00040 _cvar(manager->_lock), 00041 _tick_clock(false), 00042 _timeslice_priority(false), 00043 _num_threads(0), 00044 _thread_priority(TP_normal), 00045 _frame_budget(-1.0), 00046 _frame_sync(false), 00047 _num_busy_threads(0), 00048 _num_tasks(0), 00049 _state(S_initial), 00050 _current_sort(-INT_MAX), 00051 _pickup_mode(false), 00052 _needs_cleanup(false), 00053 _current_frame(0), 00054 _time_in_frame(0.0), 00055 _block_till_next_frame(false) 00056 { 00057 } 00058 00059 //////////////////////////////////////////////////////////////////// 00060 // Function: AsyncTaskChain::Destructor 00061 // Access: Published, Virtual 00062 // Description: 00063 //////////////////////////////////////////////////////////////////// 00064 AsyncTaskChain:: 00065 ~AsyncTaskChain() { 00066 // We only grab the lock if _needs_cleanup is true. This way, the 00067 // temporary AsyncTaskChain objects created (and destructed) within 00068 // the task manager won't risk a double-lock. 00069 if (_needs_cleanup) { 00070 MutexHolder holder(_manager->_lock); 00071 do_cleanup(); 00072 } 00073 } 00074 00075 //////////////////////////////////////////////////////////////////// 00076 // Function: AsyncTaskChain::set_tick_clock 00077 // Access: Published 00078 // Description: Sets the tick_clock flag. When this is true, 00079 // get_clock()->tick() will be called automatically at 00080 // each task epoch. This is false by default. 00081 //////////////////////////////////////////////////////////////////// 00082 void AsyncTaskChain:: 00083 set_tick_clock(bool tick_clock) { 00084 MutexHolder holder(_manager->_lock); 00085 _tick_clock = tick_clock; 00086 } 00087 00088 //////////////////////////////////////////////////////////////////// 00089 // Function: AsyncTaskChain::get_tick_clock 00090 // Access: Published 00091 // Description: Returns the tick_clock flag. See set_tick_clock(). 00092 //////////////////////////////////////////////////////////////////// 00093 bool AsyncTaskChain:: 00094 get_tick_clock() const { 00095 MutexHolder holder(_manager->_lock); 00096 return _tick_clock; 00097 } 00098 00099 //////////////////////////////////////////////////////////////////// 00100 // Function: AsyncTaskChain::set_num_threads 00101 // Access: Published 00102 // Description: Changes the number of threads for this task chain. 00103 // This may require stopping the threads if they are 00104 // already running. 00105 //////////////////////////////////////////////////////////////////// 00106 void AsyncTaskChain:: 00107 set_num_threads(int num_threads) { 00108 nassertv(num_threads >= 0); 00109 00110 if (task_cat.is_debug()) { 00111 do_output(task_cat.debug()); 00112 task_cat.debug(false) 00113 << ": set_num_threads(" << num_threads << ")\n"; 00114 } 00115 00116 if (!Thread::is_threading_supported()) { 00117 num_threads = 0; 00118 } 00119 00120 MutexHolder holder(_manager->_lock); 00121 if (_num_threads != num_threads) { 00122 do_stop_threads(); 00123 _num_threads = num_threads; 00124 00125 if (_num_tasks != 0) { 00126 do_start_threads(); 00127 } 00128 } 00129 } 00130 00131 //////////////////////////////////////////////////////////////////// 00132 // Function: AsyncTaskChain::get_num_threads 00133 // Access: Published 00134 // Description: Returns the number of threads that will be servicing 00135 // tasks for this chain. Also see 00136 // get_num_running_threads(). 00137 //////////////////////////////////////////////////////////////////// 00138 int AsyncTaskChain:: 00139 get_num_threads() const { 00140 MutexHolder holder(_manager->_lock); 00141 return _num_threads; 00142 } 00143 00144 //////////////////////////////////////////////////////////////////// 00145 // Function: AsyncTaskChain::get_num_running_threads 00146 // Access: Published 00147 // Description: Returns the number of threads that have been created 00148 // and are actively running. This will return 0 before 00149 // the threads have been started; it will also return 0 00150 // if thread support is not available. 00151 //////////////////////////////////////////////////////////////////// 00152 int AsyncTaskChain:: 00153 get_num_running_threads() const { 00154 MutexHolder holder(_manager->_lock); 00155 return _threads.size(); 00156 } 00157 00158 //////////////////////////////////////////////////////////////////// 00159 // Function: AsyncTaskChain::set_thread_priority 00160 // Access: Published 00161 // Description: Changes the priority associated with threads that 00162 // serve this task chain. This may require stopping the 00163 // threads if they are already running. 00164 //////////////////////////////////////////////////////////////////// 00165 void AsyncTaskChain:: 00166 set_thread_priority(ThreadPriority priority) { 00167 MutexHolder holder(_manager->_lock); 00168 if (_thread_priority != priority) { 00169 do_stop_threads(); 00170 _thread_priority = priority; 00171 00172 if (_num_tasks != 0) { 00173 do_start_threads(); 00174 } 00175 } 00176 } 00177 00178 //////////////////////////////////////////////////////////////////// 00179 // Function: AsyncTaskChain::get_thread_priority 00180 // Access: Published 00181 // Description: Returns the priority associated with threads that 00182 // serve this task chain. 00183 //////////////////////////////////////////////////////////////////// 00184 ThreadPriority AsyncTaskChain:: 00185 get_thread_priority() const { 00186 MutexHolder holder(_manager->_lock); 00187 return _thread_priority; 00188 } 00189 00190 //////////////////////////////////////////////////////////////////// 00191 // Function: AsyncTaskChain::set_frame_budget 00192 // Access: Published 00193 // Description: Sets the maximum amount of time per frame the tasks 00194 // on this chain are granted for execution. If this is 00195 // less than zero, there is no limit; if it is >= 0, it 00196 // represents a maximum amount of time (in seconds) that 00197 // will be used to execute tasks. If this time is 00198 // exceeded in any one frame, the task chain will stop 00199 // executing tasks until the next frame, as defined by 00200 // the TaskManager's clock. 00201 //////////////////////////////////////////////////////////////////// 00202 void AsyncTaskChain:: 00203 set_frame_budget(double frame_budget) { 00204 MutexHolder holder(_manager->_lock); 00205 _frame_budget = frame_budget; 00206 } 00207 00208 //////////////////////////////////////////////////////////////////// 00209 // Function: AsyncTaskChain::get_frame_budget 00210 // Access: Published 00211 // Description: Returns the maximum amount of time per frame the 00212 // tasks on this chain are granted for execution. See 00213 // set_frame_budget(). 00214 //////////////////////////////////////////////////////////////////// 00215 double AsyncTaskChain:: 00216 get_frame_budget() const { 00217 MutexHolder holder(_manager->_lock); 00218 return _frame_budget; 00219 } 00220 00221 //////////////////////////////////////////////////////////////////// 00222 // Function: AsyncTaskChain::set_frame_sync 00223 // Access: Published 00224 // Description: Sets the frame_sync flag. When this flag is true, 00225 // this task chain will be forced to sync with the 00226 // TaskManager's clock. It will run no faster than one 00227 // epoch per clock frame. 00228 // 00229 // When this flag is false, the default, the task chain 00230 // will finish all of its tasks and then immediately 00231 // start from the first task again, regardless of the 00232 // clock frame. When it is true, the task chain will 00233 // finish all of its tasks and then wait for the clock 00234 // to tick to the next frame before resuming the first 00235 // task. 00236 // 00237 // This only makes sense for threaded task chains. 00238 // Non-threaded task chains are automatically 00239 // synchronous. 00240 //////////////////////////////////////////////////////////////////// 00241 void AsyncTaskChain:: 00242 set_frame_sync(bool frame_sync) { 00243 MutexHolder holder(_manager->_lock); 00244 _frame_sync = frame_sync; 00245 } 00246 00247 //////////////////////////////////////////////////////////////////// 00248 // Function: AsyncTaskChain::get_frame_sync 00249 // Access: Published 00250 // Description: Returns the frame_sync flag. See set_frame_sync(). 00251 //////////////////////////////////////////////////////////////////// 00252 bool AsyncTaskChain:: 00253 get_frame_sync() const { 00254 MutexHolder holder(_manager->_lock); 00255 return _frame_sync; 00256 } 00257 00258 //////////////////////////////////////////////////////////////////// 00259 // Function: AsyncTaskChain::set_timeslice_priority 00260 // Access: Published 00261 // Description: Sets the timeslice_priority flag. This changes 00262 // the interpretation of priority, and the number of 00263 // times per epoch each task will run. 00264 // 00265 // When this flag is true, some tasks might not run in 00266 // any given epoch. Instead, tasks with priority higher 00267 // than 1 will be given precedence, in proportion to the 00268 // amount of time they have already used. This gives 00269 // higher-priority tasks more runtime than 00270 // lower-priority tasks. Each task gets the amount of 00271 // time proportional to its priority value, so a task 00272 // with priority 100 will get five times as much 00273 // processing time as a task with priority 20. For 00274 // these purposes, priority values less than 1 are 00275 // deemed to be equal to 1. 00276 // 00277 // When this flag is false (the default), all tasks are 00278 // run exactly once each epoch, round-robin style. 00279 // Priority is only used to determine which task runs 00280 // first within tasks of the same sort value. 00281 //////////////////////////////////////////////////////////////////// 00282 void AsyncTaskChain:: 00283 set_timeslice_priority(bool timeslice_priority) { 00284 MutexHolder holder(_manager->_lock); 00285 _timeslice_priority = timeslice_priority; 00286 } 00287 00288 //////////////////////////////////////////////////////////////////// 00289 // Function: AsyncTaskChain::get_timeslice_priority 00290 // Access: Published 00291 // Description: Returns the timeslice_priority flag. This changes 00292 // the interpretation of priority, and the number of 00293 // times per epoch each task will run. See 00294 // set_timeslice_priority(). 00295 //////////////////////////////////////////////////////////////////// 00296 bool AsyncTaskChain:: 00297 get_timeslice_priority() const { 00298 MutexHolder holder(_manager->_lock); 00299 return _timeslice_priority; 00300 } 00301 00302 //////////////////////////////////////////////////////////////////// 00303 // Function: AsyncTaskChain::stop_threads 00304 // Access: Published 00305 // Description: Stops any threads that are currently running. If any 00306 // tasks are still pending and have not yet been picked 00307 // up by a thread, they will not be serviced unless 00308 // poll() or start_threads() is later called. 00309 //////////////////////////////////////////////////////////////////// 00310 void AsyncTaskChain:: 00311 stop_threads() { 00312 if (_state == S_started || _state == S_interrupted) { 00313 // Clean up all of the threads. 00314 MutexHolder holder(_manager->_lock); 00315 do_stop_threads(); 00316 } 00317 } 00318 00319 //////////////////////////////////////////////////////////////////// 00320 // Function: AsyncTaskChain::start_threads 00321 // Access: Published 00322 // Description: Starts any requested threads to service the tasks on 00323 // the queue. This is normally not necessary, since 00324 // adding a task will start the threads automatically. 00325 //////////////////////////////////////////////////////////////////// 00326 void AsyncTaskChain:: 00327 start_threads() { 00328 if (_state == S_initial || _state == S_interrupted) { 00329 MutexHolder holder(_manager->_lock); 00330 do_start_threads(); 00331 } 00332 } 00333 00334 //////////////////////////////////////////////////////////////////// 00335 // Function: AsyncTaskChain::has_task 00336 // Access: Published 00337 // Description: Returns true if the indicated task has been added to 00338 // this AsyncTaskChain, false otherwise. 00339 //////////////////////////////////////////////////////////////////// 00340 bool AsyncTaskChain:: 00341 has_task(AsyncTask *task) const { 00342 MutexHolder holder(_manager->_lock); 00343 00344 if (task->_chain != this) { 00345 nassertr(!do_has_task(task), false); 00346 return false; 00347 } 00348 00349 if (task->_state == AsyncTask::S_servicing_removed) { 00350 return false; 00351 } 00352 00353 // The task might not actually be in the active queue, since it 00354 // might be being serviced right now. That's OK. 00355 return true; 00356 } 00357 00358 //////////////////////////////////////////////////////////////////// 00359 // Function: AsyncTaskChain::wait_for_tasks 00360 // Access: Published 00361 // Description: Blocks until the task list is empty. 00362 //////////////////////////////////////////////////////////////////// 00363 void AsyncTaskChain:: 00364 wait_for_tasks() { 00365 MutexHolder holder(_manager->_lock); 00366 do_wait_for_tasks(); 00367 } 00368 00369 //////////////////////////////////////////////////////////////////// 00370 // Function: AsyncTaskChain::get_num_tasks 00371 // Access: Published 00372 // Description: Returns the number of tasks that are currently active 00373 // or sleeping within the task chain. 00374 //////////////////////////////////////////////////////////////////// 00375 int AsyncTaskChain:: 00376 get_num_tasks() const { 00377 MutexHolder holder(_manager->_lock); 00378 return _num_tasks; 00379 } 00380 00381 //////////////////////////////////////////////////////////////////// 00382 // Function: AsyncTaskChain::get_tasks 00383 // Access: Published 00384 // Description: Returns the set of tasks that are active or sleeping 00385 // on the task chain, at the time of the call. 00386 //////////////////////////////////////////////////////////////////// 00387 AsyncTaskCollection AsyncTaskChain:: 00388 get_tasks() const { 00389 MutexHolder holder(_manager->_lock); 00390 AsyncTaskCollection result = do_get_active_tasks(); 00391 result.add_tasks_from(do_get_sleeping_tasks()); 00392 return result; 00393 } 00394 00395 //////////////////////////////////////////////////////////////////// 00396 // Function: AsyncTaskChain::get_active_tasks 00397 // Access: Published 00398 // Description: Returns the set of tasks that are active (and not 00399 // sleeping) on the task chain, at the time of the 00400 // call. 00401 //////////////////////////////////////////////////////////////////// 00402 AsyncTaskCollection AsyncTaskChain:: 00403 get_active_tasks() const { 00404 MutexHolder holder(_manager->_lock); 00405 return do_get_active_tasks(); 00406 } 00407 00408 //////////////////////////////////////////////////////////////////// 00409 // Function: AsyncTaskChain::get_sleeping_tasks 00410 // Access: Published 00411 // Description: Returns the set of tasks that are sleeping (and not 00412 // active) on the task chain, at the time of the 00413 // call. 00414 //////////////////////////////////////////////////////////////////// 00415 AsyncTaskCollection AsyncTaskChain:: 00416 get_sleeping_tasks() const { 00417 MutexHolder holder(_manager->_lock); 00418 return do_get_sleeping_tasks(); 00419 } 00420 00421 //////////////////////////////////////////////////////////////////// 00422 // Function: AsyncTaskChain::poll 00423 // Access: Published 00424 // Description: Runs through all the tasks in the task list, once, if 00425 // the task chain is running in single-threaded mode 00426 // (no threads available). This method does nothing in 00427 // threaded mode, so it may safely be called in either 00428 // case. 00429 // 00430 // Normally, you would not call this function directly; 00431 // instead, call AsyncTaskManager::poll(), which polls 00432 // all of the task chains in sequence. 00433 //////////////////////////////////////////////////////////////////// 00434 void AsyncTaskChain:: 00435 poll() { 00436 MutexHolder holder(_manager->_lock); 00437 do_poll(); 00438 } 00439 00440 //////////////////////////////////////////////////////////////////// 00441 // Function: AsyncTaskChain::get_next_wake_time 00442 // Access: Published 00443 // Description: Returns the scheduled time (on the manager's clock) 00444 // of the next sleeping task, on any task chain, to 00445 // awaken. Returns -1 if there are no sleeping tasks. 00446 //////////////////////////////////////////////////////////////////// 00447 double AsyncTaskChain:: 00448 get_next_wake_time() const { 00449 MutexHolder holder(_manager->_lock); 00450 return do_get_next_wake_time(); 00451 } 00452 00453 //////////////////////////////////////////////////////////////////// 00454 // Function: AsyncTaskChain::output 00455 // Access: Published, Virtual 00456 // Description: 00457 //////////////////////////////////////////////////////////////////// 00458 void AsyncTaskChain:: 00459 output(ostream &out) const { 00460 MutexHolder holder(_manager->_lock); 00461 do_output(out); 00462 } 00463 00464 //////////////////////////////////////////////////////////////////// 00465 // Function: AsyncTaskChain::write 00466 // Access: Published, Virtual 00467 // Description: 00468 //////////////////////////////////////////////////////////////////// 00469 void AsyncTaskChain:: 00470 write(ostream &out, int indent_level) const { 00471 MutexHolder holder(_manager->_lock); 00472 do_write(out, indent_level); 00473 } 00474 00475 //////////////////////////////////////////////////////////////////// 00476 // Function: AsyncTaskChain::do_add 00477 // Access: Protected 00478 // Description: Adds the indicated task to the active queue. It is 00479 // an error if the task is already added to this or any 00480 // other active queue. 00481 // 00482 // This is normally called only by the AsyncTaskManager. 00483 // Assumes the lock is already held. 00484 //////////////////////////////////////////////////////////////////// 00485 void AsyncTaskChain:: 00486 do_add(AsyncTask *task) { 00487 nassertv(task->_chain == NULL && 00488 task->_manager == NULL && 00489 task->_chain_name == get_name() && 00490 task->_state == AsyncTask::S_inactive); 00491 nassertv(!do_has_task(task)); 00492 00493 do_start_threads(); 00494 00495 task->_chain = this; 00496 task->_manager = _manager; 00497 00498 double now = _manager->_clock->get_frame_time(); 00499 task->_start_time = now; 00500 task->_start_frame = _manager->_clock->get_frame_count(); 00501 00502 _manager->add_task_by_name(task); 00503 00504 if (task->has_delay()) { 00505 // This is a deferred task. Add it to the sleeping queue. 00506 task->_wake_time = now + task->get_delay(); 00507 task->_start_time = task->_wake_time; 00508 task->_state = AsyncTask::S_sleeping; 00509 _sleeping.push_back(task); 00510 push_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime()); 00511 00512 } else { 00513 // This is an active task. Add it to the active set. 00514 task->_state = AsyncTask::S_active; 00515 if (task_cat.is_spam()) { 00516 task_cat.spam() 00517 << "Adding " << *task << " with sort " << task->get_sort() 00518 << " to chain " << get_name() << " with current_sort " 00519 << _current_sort << "\n"; 00520 } 00521 if (task->get_sort() >= _current_sort) { 00522 // It will run this frame. 00523 _active.push_back(task); 00524 push_heap(_active.begin(), _active.end(), AsyncTaskSortPriority()); 00525 } else { 00526 // It will run next frame. 00527 _next_active.push_back(task); 00528 } 00529 } 00530 ++_num_tasks; 00531 ++(_manager->_num_tasks); 00532 _needs_cleanup = true; 00533 00534 _cvar.notify_all(); 00535 } 00536 00537 //////////////////////////////////////////////////////////////////// 00538 // Function: AsyncTaskChain::do_remove 00539 // Access: Protected 00540 // Description: Removes the indicated task from this chain. Returns 00541 // true if removed, false otherwise. Assumes the lock 00542 // is already held. The task->upon_death() method is 00543 // *not* called. 00544 //////////////////////////////////////////////////////////////////// 00545 bool AsyncTaskChain:: 00546 do_remove(AsyncTask *task) { 00547 bool removed = false; 00548 00549 nassertr(task->_chain == this, false); 00550 00551 switch (task->_state) { 00552 case AsyncTask::S_servicing: 00553 // This task is being serviced. 00554 task->_state = AsyncTask::S_servicing_removed; 00555 removed = true; 00556 break; 00557 00558 case AsyncTask::S_servicing_removed: 00559 // Being serviced, though it will be removed later. 00560 break; 00561 00562 case AsyncTask::S_sleeping: 00563 // Sleeping, easy. 00564 { 00565 int index = find_task_on_heap(_sleeping, task); 00566 nassertr(index != -1, false); 00567 _sleeping.erase(_sleeping.begin() + index); 00568 make_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime()); 00569 removed = true; 00570 cleanup_task(task, false, false); 00571 } 00572 break; 00573 00574 case AsyncTask::S_active: 00575 { 00576 // Active, but not being serviced, easy. 00577 int index = find_task_on_heap(_active, task); 00578 if (index != -1) { 00579 _active.erase(_active.begin() + index); 00580 make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority()); 00581 } else { 00582 index = find_task_on_heap(_next_active, task); 00583 if (index != -1) { 00584 _next_active.erase(_next_active.begin() + index); 00585 } else { 00586 index = find_task_on_heap(_this_active, task); 00587 nassertr(index != -1, false); 00588 } 00589 } 00590 removed = true; 00591 cleanup_task(task, false, false); 00592 } 00593 00594 default: 00595 break; 00596 } 00597 00598 return removed; 00599 } 00600 00601 //////////////////////////////////////////////////////////////////// 00602 // Function: AsyncTaskChain::do_wait_for_tasks 00603 // Access: Protected 00604 // Description: Blocks until the task list is empty. Assumes the 00605 // lock is held. 00606 //////////////////////////////////////////////////////////////////// 00607 void AsyncTaskChain:: 00608 do_wait_for_tasks() { 00609 do_start_threads(); 00610 00611 if (_threads.empty()) { 00612 // Non-threaded case. 00613 while (_num_tasks > 0) { 00614 if (_state == S_shutdown || _state == S_interrupted) { 00615 return; 00616 } 00617 do_poll(); 00618 } 00619 00620 } else { 00621 // Threaded case. 00622 while (_num_tasks > 0) { 00623 if (_state == S_shutdown || _state == S_interrupted) { 00624 return; 00625 } 00626 00627 PStatTimer timer(_wait_pcollector); 00628 _cvar.wait(); 00629 } 00630 } 00631 } 00632 00633 //////////////////////////////////////////////////////////////////// 00634 // Function: AsyncTaskChain::do_cleanup 00635 // Access: Protected 00636 // Description: Stops all threads and messily empties the task list. 00637 // This is intended to be called on destruction only. 00638 // Assumes the lock is already held. 00639 //////////////////////////////////////////////////////////////////// 00640 void AsyncTaskChain:: 00641 do_cleanup() { 00642 if (task_cat.is_spam()) { 00643 do_output(task_cat.spam()); 00644 task_cat.spam(false) 00645 << ": do_cleanup()\n"; 00646 } 00647 do_stop_threads(); 00648 _num_threads = 0; 00649 00650 // Don't call the upon_death functions while we clean up the tasks. 00651 // Instead, store all the tasks in a list as we clean them up, and 00652 // then call the upon_death functions all at once. We do this 00653 // because calling upon_death wil release the lock, allowing the 00654 // iterators to become invalid. 00655 00656 TaskHeap dead; 00657 dead.reserve(_num_tasks); 00658 00659 _needs_cleanup = false; 00660 00661 TaskHeap::const_iterator ti; 00662 for (ti = _active.begin(); ti != _active.end(); ++ti) { 00663 AsyncTask *task = (*ti); 00664 dead.push_back(task); 00665 cleanup_task(task, false, false); 00666 } 00667 for (ti = _this_active.begin(); ti != _this_active.end(); ++ti) { 00668 AsyncTask *task = (*ti); 00669 dead.push_back(task); 00670 cleanup_task(task, false, false); 00671 } 00672 for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) { 00673 AsyncTask *task = (*ti); 00674 dead.push_back(task); 00675 cleanup_task(task, false, false); 00676 } 00677 for (ti = _sleeping.begin(); ti != _sleeping.end(); ++ti) { 00678 AsyncTask *task = (*ti); 00679 dead.push_back(task); 00680 cleanup_task(task, false, false); 00681 } 00682 00683 // There might still be one task remaining: the currently-executing 00684 // task. 00685 nassertv(_num_tasks == 0 || _num_tasks == 1); 00686 00687 // Now go back and call the upon_death functions. 00688 _manager->_lock.release(); 00689 for (ti = dead.begin(); ti != dead.end(); ++ti) { 00690 (*ti)->upon_death(_manager, false); 00691 } 00692 _manager->_lock.acquire(); 00693 00694 if (task_cat.is_spam()) { 00695 do_output(task_cat.spam()); 00696 task_cat.spam(false) 00697 << ": done do_cleanup()\n"; 00698 } 00699 } 00700 00701 //////////////////////////////////////////////////////////////////// 00702 // Function: AsyncTaskChain::do_has_task 00703 // Access: Protected 00704 // Description: Returns true if the task is on one of the task lists, 00705 // false if it is not (false may mean that the task is 00706 // currently being serviced). Assumes the lock is 00707 // currently held. 00708 //////////////////////////////////////////////////////////////////// 00709 bool AsyncTaskChain:: 00710 do_has_task(AsyncTask *task) const { 00711 return (find_task_on_heap(_active, task) != -1 || 00712 find_task_on_heap(_next_active, task) != -1 || 00713 find_task_on_heap(_sleeping, task) != -1 || 00714 find_task_on_heap(_this_active, task) != -1); 00715 } 00716 00717 //////////////////////////////////////////////////////////////////// 00718 // Function: AsyncTaskChain::find_task_on_heap 00719 // Access: Protected 00720 // Description: Returns the index number of the indicated task within 00721 // the specified task list, or -1 if the task is not 00722 // found in the list (this may mean that it is currently 00723 // being serviced). Assumes that the lock is currently 00724 // held. 00725 //////////////////////////////////////////////////////////////////// 00726 int AsyncTaskChain:: 00727 find_task_on_heap(const TaskHeap &heap, AsyncTask *task) const { 00728 for (int i = 0; i < (int)heap.size(); ++i) { 00729 if (heap[i] == task) { 00730 return i; 00731 } 00732 } 00733 00734 return -1; 00735 } 00736 00737 //////////////////////////////////////////////////////////////////// 00738 // Function: AsyncTaskChain::service_one_task 00739 // Access: Protected 00740 // Description: Pops a single task off the active queue, services it, 00741 // and restores it to the end of the queue. This is 00742 // called internally only within one of the task 00743 // threads. Assumes the lock is already held. 00744 // 00745 // Note that the lock may be temporarily released by 00746 // this method. 00747 //////////////////////////////////////////////////////////////////// 00748 void AsyncTaskChain:: 00749 service_one_task(AsyncTaskChain::AsyncTaskChainThread *thread) { 00750 if (!_active.empty()) { 00751 PT(AsyncTask) task = _active.front(); 00752 pop_heap(_active.begin(), _active.end(), AsyncTaskSortPriority()); 00753 _active.pop_back(); 00754 00755 if (thread != (AsyncTaskChain::AsyncTaskChainThread *)NULL) { 00756 thread->_servicing = task; 00757 } 00758 00759 if (task_cat.is_spam()) { 00760 task_cat.spam() 00761 << "Servicing " << *task << " in " 00762 << *Thread::get_current_thread() << "\n"; 00763 } 00764 00765 nassertv(task->get_sort() == _current_sort); 00766 nassertv(task->_state == AsyncTask::S_active); 00767 task->_state = AsyncTask::S_servicing; 00768 task->_servicing_thread = thread; 00769 00770 AsyncTask::DoneStatus ds = task->unlock_and_do_task(); 00771 00772 if (thread != (AsyncTaskChain::AsyncTaskChainThread *)NULL) { 00773 thread->_servicing = NULL; 00774 } 00775 task->_servicing_thread = NULL; 00776 00777 if (task->_chain == this) { 00778 if (task->_state == AsyncTask::S_servicing_removed) { 00779 // This task wants to kill itself. 00780 cleanup_task(task, true, false); 00781 00782 } else if (task->_chain_name != get_name()) { 00783 // The task wants to jump to a different chain. 00784 PT(AsyncTask) hold_task = task; 00785 cleanup_task(task, false, false); 00786 task->jump_to_task_chain(_manager); 00787 00788 } else { 00789 switch (ds) { 00790 case AsyncTask::DS_cont: 00791 // The task is still alive; put it on the next frame's active 00792 // queue. 00793 task->_state = AsyncTask::S_active; 00794 _next_active.push_back(task); 00795 _cvar.notify_all(); 00796 break; 00797 00798 case AsyncTask::DS_again: 00799 // The task wants to sleep again. 00800 { 00801 double now = _manager->_clock->get_frame_time(); 00802 task->_wake_time = now + task->get_delay(); 00803 task->_start_time = task->_wake_time; 00804 task->_state = AsyncTask::S_sleeping; 00805 _sleeping.push_back(task); 00806 push_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime()); 00807 if (task_cat.is_spam()) { 00808 task_cat.spam() 00809 << "Sleeping " << *task << ", wake time at " 00810 << task->_wake_time - now << "\n"; 00811 } 00812 _cvar.notify_all(); 00813 } 00814 break; 00815 00816 case AsyncTask::DS_pickup: 00817 // The task wants to run again this frame if possible. 00818 task->_state = AsyncTask::S_active; 00819 _this_active.push_back(task); 00820 _cvar.notify_all(); 00821 break; 00822 00823 case AsyncTask::DS_interrupt: 00824 // The task had an exception and wants to raise a big flag. 00825 task->_state = AsyncTask::S_active; 00826 _next_active.push_back(task); 00827 if (_state == S_started) { 00828 _state = S_interrupted; 00829 _cvar.notify_all(); 00830 } 00831 break; 00832 00833 default: 00834 // The task has finished. 00835 cleanup_task(task, true, true); 00836 } 00837 } 00838 } else { 00839 task_cat.error() 00840 << "Task is no longer on chain " << get_name() 00841 << ": " << *task << "\n"; 00842 } 00843 00844 if (task_cat.is_spam()) { 00845 task_cat.spam() 00846 << "Done servicing " << *task << " in " 00847 << *Thread::get_current_thread() << "\n"; 00848 } 00849 } 00850 thread_consider_yield(); 00851 } 00852 00853 //////////////////////////////////////////////////////////////////// 00854 // Function: AsyncTaskChain::cleanup_task 00855 // Access: Protected 00856 // Description: Called internally when a task has completed (or been 00857 // interrupted) and is about to be removed from the 00858 // active queue. Assumes the lock is held. 00859 // 00860 // If upon_death is true, then task->upon_death() will 00861 // also be called, with the indicated clean_exit 00862 // parameter. 00863 // 00864 // Note that the lock may be temporarily released by 00865 // this method. 00866 //////////////////////////////////////////////////////////////////// 00867 void AsyncTaskChain:: 00868 cleanup_task(AsyncTask *task, bool upon_death, bool clean_exit) { 00869 if (task_cat.is_spam()) { 00870 do_output(task_cat.spam()); 00871 task_cat.spam(false) 00872 << ": cleanup_task(" << *task << ", " << upon_death << ", " << clean_exit 00873 << ")\n"; 00874 } 00875 00876 nassertv(task->_chain == this); 00877 PT(AsyncTask) hold_task = task; 00878 00879 task->_state = AsyncTask::S_inactive; 00880 task->_chain = NULL; 00881 task->_manager = NULL; 00882 --_num_tasks; 00883 --(_manager->_num_tasks); 00884 00885 _manager->remove_task_by_name(task); 00886 00887 if (upon_death) { 00888 _manager->_lock.release(); 00889 task->upon_death(_manager, clean_exit); 00890 _manager->_lock.acquire(); 00891 } 00892 } 00893 00894 //////////////////////////////////////////////////////////////////// 00895 // Function: AsyncTaskChain::finish_sort_group 00896 // Access: Protected 00897 // Description: Called internally when all tasks of a given sort 00898 // value have been completed, and it is time to 00899 // increment to the next sort value, or begin the next 00900 // epoch. Assumes the lock is held. 00901 // 00902 // Returns true if there are more tasks on the queue 00903 // after this operation, or false if the task list is 00904 // empty and we need to wait. 00905 //////////////////////////////////////////////////////////////////// 00906 bool AsyncTaskChain:: 00907 finish_sort_group() { 00908 nassertr(_num_busy_threads == 0, true); 00909 00910 if (!_threads.empty()) { 00911 PStatClient::thread_tick(get_name()); 00912 } 00913 00914 if (!_active.empty()) { 00915 // There are more tasks; just set the next sort value. 00916 nassertr(_current_sort < _active.front()->get_sort(), true); 00917 _current_sort = _active.front()->get_sort(); 00918 _cvar.notify_all(); 00919 return true; 00920 } 00921 00922 // There are no more tasks in this epoch; advance to the next epoch. 00923 00924 if (!_this_active.empty() && _frame_budget >= 0.0) { 00925 // Enter pickup mode. This is a special mode at the end of the 00926 // epoch in which we are just re-running the tasks that think they 00927 // can still run within the frame, in an attempt to use up our 00928 // frame budget. 00929 00930 if (task_cat.is_spam()) { 00931 do_output(task_cat.spam()); 00932 task_cat.spam(false) 00933 << ": next epoch (pickup mode)\n"; 00934 } 00935 00936 _pickup_mode = true; 00937 _active.swap(_this_active); 00938 00939 } else { 00940 // Not in pickup mode. 00941 00942 if (task_cat.is_spam()) { 00943 do_output(task_cat.spam()); 00944 task_cat.spam(false) 00945 << ": next epoch\n"; 00946 } 00947 00948 _pickup_mode = false; 00949 00950 // Here, there's no difference between _this_active and 00951 // _next_active. Combine them. 00952 _next_active.insert(_next_active.end(), _this_active.begin(), _this_active.end()); 00953 _this_active.clear(); 00954 00955 _active.swap(_next_active); 00956 00957 // We only tick the clock and wake sleepers in normal mode, the 00958 // first time through the task list; not in pickup mode when we 00959 // are re-running the stragglers just to use up our frame budget. 00960 00961 if (_tick_clock) { 00962 if (task_cat.is_spam()) { 00963 do_output(task_cat.spam()); 00964 task_cat.spam(false) 00965 << ": tick clock\n"; 00966 } 00967 _manager->_clock->tick(); 00968 _manager->_frame_cvar.notify_all(); 00969 00970 } else if (_frame_sync) { 00971 // If we're a synced chain, we have to wait at the end of the 00972 // epoch for someone else to tick the clock. 00973 _block_till_next_frame = true; 00974 } 00975 00976 // Check for any sleeping tasks that need to be woken. 00977 double now = _manager->_clock->get_frame_time(); 00978 while (!_sleeping.empty() && _sleeping.front()->_wake_time <= now) { 00979 PT(AsyncTask) task = _sleeping.front(); 00980 if (task_cat.is_spam()) { 00981 task_cat.spam() 00982 << "Waking " << *task << ", wake time at " 00983 << task->_wake_time - now << "\n"; 00984 } 00985 pop_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime()); 00986 _sleeping.pop_back(); 00987 task->_state = AsyncTask::S_active; 00988 task->_start_frame = _manager->_clock->get_frame_count(); 00989 _active.push_back(task); 00990 } 00991 00992 if (task_cat.is_spam()) { 00993 if (_sleeping.empty()) { 00994 task_cat.spam() 00995 << "No more tasks on sleeping queue.\n"; 00996 } else { 00997 task_cat.spam() 00998 << "Next sleeper: " << *_sleeping.front() << ", wake time at " 00999 << _sleeping.front()->_wake_time - now << "\n"; 01000 } 01001 } 01002 01003 // Any tasks that are on the active queue at the beginning of the 01004 // epoch are deemed to have run one frame (or to be about to). 01005 TaskHeap::const_iterator ti; 01006 for (ti = _active.begin(); ti != _active.end(); ++ti) { 01007 AsyncTask *task = (*ti); 01008 ++task->_num_frames; 01009 } 01010 } 01011 01012 if (_timeslice_priority) { 01013 filter_timeslice_priority(); 01014 } 01015 01016 nassertr((size_t)_num_tasks == _active.size() + _this_active.size() + _next_active.size() + _sleeping.size(), true); 01017 make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority()); 01018 01019 _current_sort = -INT_MAX; 01020 01021 if (!_active.empty()) { 01022 // Signal the threads to start executing the first task again. 01023 _cvar.notify_all(); 01024 return true; 01025 } 01026 01027 // There are no tasks to be had anywhere. Chill. 01028 _pickup_mode = false; 01029 nassertr(_this_active.empty(), false); 01030 return false; 01031 } 01032 01033 //////////////////////////////////////////////////////////////////// 01034 // Function: AsyncTaskChain::filter_timeslice_priority 01035 // Access: Protected 01036 // Description: Called to filter the _active tasks list when we are 01037 // in the special timeslice_priority mode. In this 01038 // mode, go through and postpone any tasks that have 01039 // already exceeded their priority budget for this 01040 // epoch. 01041 // 01042 // Assumes the lock is already held. 01043 //////////////////////////////////////////////////////////////////// 01044 void AsyncTaskChain:: 01045 filter_timeslice_priority() { 01046 if (_active.empty()) { 01047 return; 01048 } 01049 nassertv(_timeslice_priority); 01050 01051 // We must first sum up the average per-epoch runtime of each task. 01052 double net_runtime = 0.0; 01053 int net_priority = 0; 01054 01055 TaskHeap::iterator ti; 01056 for (ti = _active.begin(); ti != _active.end(); ++ti) { 01057 AsyncTask *task = (*ti); 01058 double runtime = max(task->get_average_dt(), 0.0); 01059 int priority = max(task->_priority, 1); 01060 net_runtime += runtime; 01061 net_priority += priority; 01062 } 01063 01064 // That gives us a timeslice budget per priority value. 01065 double average_budget = net_runtime / (double)net_priority; 01066 01067 TaskHeap keep, postpone; 01068 for (ti = _active.begin(); ti != _active.end(); ++ti) { 01069 AsyncTask *task = (*ti); 01070 double runtime = max(task->get_average_dt(), 0.0); 01071 int priority = max(task->_priority, 1); 01072 double consumed = runtime / (double)priority; 01073 // cerr << *task << " consumed " << consumed << " vs. " << average_budget << "\n"; 01074 if (consumed > average_budget) { 01075 // Postpone. Run this task next epoch. 01076 postpone.push_back(task); 01077 } else { 01078 // Keep, and run this task this epoch. 01079 keep.push_back(task); 01080 } 01081 } 01082 01083 if (keep.empty()) { 01084 // Hmm, nothing to keep. Grab the postponed task with the highest 01085 // priority and keep that instead. 01086 nassertv(!postpone.empty()); 01087 ti = postpone.begin(); 01088 TaskHeap::iterator max_ti = ti; 01089 ++ti; 01090 while (ti != postpone.end()) { 01091 if ((*ti)->_priority > (*max_ti)->_priority) { 01092 max_ti = ti; 01093 } 01094 } 01095 01096 // cerr << "Nothing to keep, keeping " << *(*max_ti) << " instead\n"; 01097 01098 keep.push_back(*max_ti); 01099 postpone.erase(max_ti); 01100 } 01101 01102 _active.swap(keep); 01103 if (_pickup_mode) { 01104 _this_active.insert(_this_active.end(), postpone.begin(), postpone.end()); 01105 } else { 01106 _next_active.insert(_next_active.end(), postpone.begin(), postpone.end()); 01107 } 01108 01109 nassertv(!_active.empty()); 01110 } 01111 01112 //////////////////////////////////////////////////////////////////// 01113 // Function: AsyncTaskChain::do_stop_threads 01114 // Access: Protected 01115 // Description: The private implementation of stop_threads; assumes 01116 // the lock is already held. 01117 //////////////////////////////////////////////////////////////////// 01118 void AsyncTaskChain:: 01119 do_stop_threads() { 01120 if (_state == S_started || _state == S_interrupted) { 01121 if (task_cat.is_debug() && !_threads.empty()) { 01122 task_cat.debug() 01123 << "Stopping " << _threads.size() 01124 << " threads for " << _manager->get_name() 01125 << " chain " << get_name() 01126 << " in " << *Thread::get_current_thread() << "\n"; 01127 } 01128 01129 _state = S_shutdown; 01130 _cvar.notify_all(); 01131 _manager->_frame_cvar.notify_all(); 01132 01133 Threads wait_threads; 01134 wait_threads.swap(_threads); 01135 01136 // We have to release the lock while we join, so the threads can 01137 // wake up and see that we're shutting down. 01138 _manager->_lock.release(); 01139 Threads::iterator ti; 01140 for (ti = wait_threads.begin(); ti != wait_threads.end(); ++ti) { 01141 if (task_cat.is_debug()) { 01142 task_cat.debug() 01143 << "Waiting for " << *(*ti) << " in " 01144 << *Thread::get_current_thread() << "\n"; 01145 } 01146 (*ti)->join(); 01147 if (task_cat.is_spam()) { 01148 task_cat.spam() 01149 << "Done waiting for " << *(*ti) << " in " 01150 << *Thread::get_current_thread() << "\n"; 01151 } 01152 } 01153 _manager->_lock.acquire(); 01154 01155 _state = S_initial; 01156 01157 // There might be one busy "thread" still: the main thread. 01158 nassertv(_num_busy_threads == 0 || _num_busy_threads == 1); 01159 cleanup_pickup_mode(); 01160 } 01161 } 01162 01163 //////////////////////////////////////////////////////////////////// 01164 // Function: AsyncTaskChain::do_start_threads 01165 // Access: Protected 01166 // Description: The private implementation of start_threads; assumes 01167 // the lock is already held. 01168 //////////////////////////////////////////////////////////////////// 01169 void AsyncTaskChain:: 01170 do_start_threads() { 01171 if (_state == S_interrupted) { 01172 do_stop_threads(); 01173 } 01174 01175 if (_state == S_initial) { 01176 _state = S_started; 01177 if (Thread::is_threading_supported() && _num_threads > 0) { 01178 if (task_cat.is_debug()) { 01179 task_cat.debug() 01180 << "Starting " << _num_threads << " threads for " 01181 << _manager->get_name() << " chain " << get_name() << "\n"; 01182 } 01183 _needs_cleanup = true; 01184 _threads.reserve(_num_threads); 01185 for (int i = 0; i < _num_threads; ++i) { 01186 ostringstream strm; 01187 strm << _manager->get_name() << "_" << get_name() << "_" << i; 01188 PT(AsyncTaskChainThread) thread = new AsyncTaskChainThread(strm.str(), this); 01189 if (thread->start(_thread_priority, true)) { 01190 _threads.push_back(thread); 01191 } 01192 } 01193 } 01194 } 01195 } 01196 01197 //////////////////////////////////////////////////////////////////// 01198 // Function: AsyncTaskChain::do_get_active_tasks 01199 // Access: Protected 01200 // Description: Returns the set of tasks that are active (and not 01201 // sleeping) on the task chain, at the time of the 01202 // call. Assumes the lock is held. 01203 //////////////////////////////////////////////////////////////////// 01204 AsyncTaskCollection AsyncTaskChain:: 01205 do_get_active_tasks() const { 01206 AsyncTaskCollection result; 01207 01208 Threads::const_iterator thi; 01209 for (thi = _threads.begin(); thi != _threads.end(); ++thi) { 01210 AsyncTask *task = (*thi)->_servicing; 01211 if (task != (AsyncTask *)NULL) { 01212 result.add_task(task); 01213 } 01214 } 01215 TaskHeap::const_iterator ti; 01216 for (ti = _active.begin(); ti != _active.end(); ++ti) { 01217 AsyncTask *task = (*ti); 01218 result.add_task(task); 01219 } 01220 for (ti = _this_active.begin(); ti != _this_active.end(); ++ti) { 01221 AsyncTask *task = (*ti); 01222 result.add_task(task); 01223 } 01224 for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) { 01225 AsyncTask *task = (*ti); 01226 result.add_task(task); 01227 } 01228 01229 return result; 01230 } 01231 01232 //////////////////////////////////////////////////////////////////// 01233 // Function: AsyncTaskChain::do_get_sleeping_tasks 01234 // Access: Published 01235 // Description: Returns the set of tasks that are sleeping (and not 01236 // active) on the task chain, at the time of the 01237 // call. Assumes the lock is held. 01238 //////////////////////////////////////////////////////////////////// 01239 AsyncTaskCollection AsyncTaskChain:: 01240 do_get_sleeping_tasks() const { 01241 AsyncTaskCollection result; 01242 01243 TaskHeap::const_iterator ti; 01244 for (ti = _sleeping.begin(); ti != _sleeping.end(); ++ti) { 01245 AsyncTask *task = (*ti); 01246 result.add_task(task); 01247 } 01248 01249 return result; 01250 } 01251 01252 //////////////////////////////////////////////////////////////////// 01253 // Function: AsyncTaskChain::do_poll 01254 // Access: Protected 01255 // Description: The private implementation of poll(), this assumes 01256 // the lock is already held. 01257 //////////////////////////////////////////////////////////////////// 01258 void AsyncTaskChain:: 01259 do_poll() { 01260 thread_consider_yield(); 01261 if (_num_tasks == 0) { 01262 return; 01263 } 01264 01265 do_start_threads(); 01266 01267 if (!_threads.empty()) { 01268 return; 01269 } 01270 01271 if (_num_busy_threads != 0) { 01272 // We are recursively nested within another task. Return, with a warning. 01273 task_cat.warning() 01274 << "Ignoring recursive poll() within another task.\n"; 01275 return; 01276 } 01277 01278 nassertv(!_pickup_mode); 01279 01280 do { 01281 while (!_active.empty()) { 01282 if (_state == S_shutdown || _state == S_interrupted) { 01283 return; 01284 } 01285 int frame = _manager->_clock->get_frame_count(); 01286 if (_current_frame != frame) { 01287 _current_frame = frame; 01288 _time_in_frame = 0.0; 01289 _block_till_next_frame = false; 01290 } 01291 if (_block_till_next_frame || 01292 (_frame_budget >= 0.0 && _time_in_frame >= _frame_budget)) { 01293 // If we've exceeded our budget, stop here. We'll resume from 01294 // this point at the next call to poll(). 01295 cleanup_pickup_mode(); 01296 return; 01297 } 01298 01299 _current_sort = _active.front()->get_sort(); 01300 01301 // Normally, there won't be any threads running at the same time 01302 // we're in poll(). But it's possible, if someone calls 01303 // set_num_threads() while we're processing. 01304 _num_busy_threads++; 01305 service_one_task(NULL); 01306 _num_busy_threads--; 01307 _cvar.notify_all(); 01308 01309 if (!_threads.empty()) { 01310 return; 01311 } 01312 } 01313 01314 finish_sort_group(); 01315 } while (_pickup_mode); 01316 } 01317 01318 //////////////////////////////////////////////////////////////////// 01319 // Function: AsyncTaskChain::cleanup_pickup_mode 01320 // Access: Protected 01321 // Description: Clean up the damage from setting pickup mode. This 01322 // means we restore the _active and _next_active lists 01323 // as they should have been without pickup mode, for 01324 // next frame. Assumes the lock is held. 01325 //////////////////////////////////////////////////////////////////// 01326 void AsyncTaskChain:: 01327 cleanup_pickup_mode() { 01328 if (_pickup_mode) { 01329 _pickup_mode = false; 01330 01331 // Move everything to the _next_active queue. 01332 _next_active.insert(_next_active.end(), _this_active.begin(), _this_active.end()); 01333 _this_active.clear(); 01334 _next_active.insert(_next_active.end(), _active.begin(), _active.end()); 01335 _active.clear(); 01336 01337 // Now finish the epoch properly. 01338 finish_sort_group(); 01339 } 01340 } 01341 01342 //////////////////////////////////////////////////////////////////// 01343 // Function: AsyncTaskChain::do_output 01344 // Access: Protected 01345 // Description: The private implementation of output(), this assumes 01346 // the lock is already held. 01347 //////////////////////////////////////////////////////////////////// 01348 void AsyncTaskChain:: 01349 do_output(ostream &out) const { 01350 if (_manager != (AsyncTaskManager *)NULL) { 01351 out << _manager->get_type() << " " << _manager->get_name(); 01352 } else { 01353 out << "(no manager)"; 01354 } 01355 out << " task chain " << get_name() 01356 << "; " << _num_tasks << " tasks"; 01357 } 01358 01359 //////////////////////////////////////////////////////////////////// 01360 // Function: AsyncTaskChain::do_write 01361 // Access: Protected 01362 // Description: The private implementation of write(), this assumes 01363 // the lock is already held. 01364 //////////////////////////////////////////////////////////////////// 01365 void AsyncTaskChain:: 01366 do_write(ostream &out, int indent_level) const { 01367 indent(out, indent_level) 01368 << "Task chain \"" << get_name() << "\"\n"; 01369 if (_num_threads > 0) { 01370 indent(out, indent_level + 2) 01371 << _num_threads << " threads, priority " << _thread_priority << "\n"; 01372 } 01373 if (_frame_budget >= 0.0) { 01374 indent(out, indent_level + 2) 01375 << "frame budget " << _frame_budget << " s\n"; 01376 } 01377 if (_timeslice_priority) { 01378 indent(out, indent_level + 2) 01379 << "timeslice priority\n"; 01380 } 01381 if (_tick_clock) { 01382 indent(out, indent_level + 2) 01383 << "tick clock\n"; 01384 } 01385 01386 static const size_t buffer_size = 1024; 01387 char buffer[buffer_size]; 01388 sprintf(buffer, " %-32s %8s %8s %8s %8s %6s", 01389 "Task", 01390 "sleep(s)", 01391 "dt(ms)", "avg", "max", 01392 "sort"); 01393 nassertv(strlen(buffer) < buffer_size); 01394 01395 indent(out, indent_level) 01396 << buffer << "\n"; 01397 01398 indent(out, indent_level); 01399 for (int i = 0; i < 32+8+8+8+8+6+7; ++i) { 01400 out << '-'; 01401 } 01402 out << "\n"; 01403 01404 // Collect a list of all active tasks, then sort them into order for 01405 // output. 01406 TaskHeap tasks = _active; 01407 tasks.insert(tasks.end(), _this_active.begin(), _this_active.end()); 01408 tasks.insert(tasks.end(), _next_active.begin(), _next_active.end()); 01409 01410 Threads::const_iterator thi; 01411 for (thi = _threads.begin(); thi != _threads.end(); ++thi) { 01412 AsyncTask *task = (*thi)->_servicing; 01413 if (task != (AsyncTask *)NULL) { 01414 tasks.push_back(task); 01415 } 01416 } 01417 01418 double now = _manager->_clock->get_frame_time(); 01419 01420 if (!tasks.empty()) { 01421 sort(tasks.begin(), tasks.end(), AsyncTaskSortPriority()); 01422 01423 // Since AsyncTaskSortPriority() sorts backwards (because of STL's 01424 // push_heap semantics), we go through the task list in reverse 01425 // order to print them forwards. 01426 TaskHeap::reverse_iterator ti; 01427 for (ti = tasks.rbegin(); ti != tasks.rend(); ++ti) { 01428 AsyncTask *task = (*ti); 01429 write_task_line(out, indent_level, task, now); 01430 } 01431 } 01432 01433 // Instead of iterating through the _sleeping list in heap order, 01434 // copy it and then use repeated pops to get it out in sorted 01435 // order, for the user's satisfaction. 01436 TaskHeap sleeping = _sleeping; 01437 while (!sleeping.empty()) { 01438 PT(AsyncTask) task = sleeping.front(); 01439 pop_heap(sleeping.begin(), sleeping.end(), AsyncTaskSortWakeTime()); 01440 sleeping.pop_back(); 01441 01442 write_task_line(out, indent_level, task, now); 01443 } 01444 } 01445 01446 //////////////////////////////////////////////////////////////////// 01447 // Function: AsyncTaskChain::write_task_line 01448 // Access: Protected 01449 // Description: Writes a single line for a task in the report 01450 // generated by do_write(). Assumes the lock is already 01451 // held. 01452 //////////////////////////////////////////////////////////////////// 01453 void AsyncTaskChain:: 01454 write_task_line(ostream &out, int indent_level, AsyncTask *task, double now) const { 01455 char servicing_flag = ' '; 01456 if (task->_state == AsyncTask::S_servicing) { 01457 servicing_flag = '*'; 01458 } else if (task->_state == AsyncTask::S_servicing_removed) { 01459 servicing_flag = '-'; 01460 } 01461 01462 static const size_t buffer_size = 1024; 01463 char buffer[buffer_size]; 01464 01465 if (task->_state == AsyncTask::S_sleeping) { 01466 // For sleeping tasks, include the wake time, as an elapsed time 01467 // in seconds. 01468 string name = task->get_name().substr(0, 32); 01469 sprintf(buffer, "%c%-32s %8.1f", 01470 servicing_flag, name.c_str(), 01471 task->_wake_time - now); 01472 } else { 01473 // For active tasks, don't include a wake time. This means we 01474 // have more space for the name. 01475 string name = task->get_name().substr(0, 41); 01476 sprintf(buffer, "%c%-41s", 01477 servicing_flag, name.c_str()); 01478 } 01479 nassertv(strlen(buffer) < buffer_size); 01480 01481 indent(out, indent_level) 01482 << buffer; 01483 01484 if (task->_num_frames > 0) { 01485 sprintf(buffer, " %8.1f %8.1f %8.1f %6d", 01486 task->_dt * 1000.0, task->get_average_dt() * 1000.0, 01487 task->_max_dt * 1000.0, 01488 task->_sort); 01489 } else { 01490 // No statistics for a task that hasn't run yet. 01491 sprintf(buffer, " %8s %8s %8s %6d", 01492 "", "", "", 01493 task->_sort); 01494 } 01495 01496 nassertv(strlen(buffer) < buffer_size); 01497 out << buffer << "\n"; 01498 } 01499 01500 //////////////////////////////////////////////////////////////////// 01501 // Function: AsyncTaskChain::AsyncTaskChainThread::Constructor 01502 // Access: Public 01503 // Description: 01504 //////////////////////////////////////////////////////////////////// 01505 AsyncTaskChain::AsyncTaskChainThread:: 01506 AsyncTaskChainThread(const string &name, AsyncTaskChain *chain) : 01507 Thread(name, chain->get_name()), 01508 _chain(chain), 01509 _servicing(NULL) 01510 { 01511 } 01512 01513 //////////////////////////////////////////////////////////////////// 01514 // Function: AsyncTaskChain::AsyncTaskChainThread::thread_main 01515 // Access: Public, Virtual 01516 // Description: 01517 //////////////////////////////////////////////////////////////////// 01518 void AsyncTaskChain::AsyncTaskChainThread:: 01519 thread_main() { 01520 MutexHolder holder(_chain->_manager->_lock); 01521 while (_chain->_state != S_shutdown && _chain->_state != S_interrupted) { 01522 thread_consider_yield(); 01523 if (!_chain->_active.empty() && 01524 _chain->_active.front()->get_sort() == _chain->_current_sort) { 01525 01526 int frame = _chain->_manager->_clock->get_frame_count(); 01527 if (_chain->_current_frame != frame) { 01528 _chain->_current_frame = frame; 01529 _chain->_time_in_frame = 0.0; 01530 _chain->_block_till_next_frame = false; 01531 } 01532 01533 // If we've exceeded our frame budget, sleep until the next 01534 // frame. 01535 if (_chain->_block_till_next_frame || 01536 (_chain->_frame_budget >= 0.0 && _chain->_time_in_frame >= _chain->_frame_budget)) { 01537 while ((_chain->_block_till_next_frame || 01538 (_chain->_frame_budget >= 0.0 && _chain->_time_in_frame >= _chain->_frame_budget)) && 01539 _chain->_state != S_shutdown && _chain->_state != S_interrupted) { 01540 _chain->cleanup_pickup_mode(); 01541 _chain->_manager->_frame_cvar.wait(); 01542 frame = _chain->_manager->_clock->get_frame_count(); 01543 if (_chain->_current_frame != frame) { 01544 _chain->_current_frame = frame; 01545 _chain->_time_in_frame = 0.0; 01546 _chain->_block_till_next_frame = false; 01547 } 01548 } 01549 // Now that it's the next frame, go back to the top of the loop. 01550 continue; 01551 } 01552 01553 PStatTimer timer(_task_pcollector); 01554 _chain->_num_busy_threads++; 01555 _chain->service_one_task(this); 01556 _chain->_num_busy_threads--; 01557 _chain->_cvar.notify_all(); 01558 01559 } else { 01560 // We've finished all the available tasks of the current sort 01561 // value. We can't pick up a new task until all of the threads 01562 // finish the tasks with the same sort value. 01563 if (_chain->_num_busy_threads == 0) { 01564 // We're the last thread to finish. Update _current_sort. 01565 if (!_chain->finish_sort_group()) { 01566 // Nothing to do. Wait for more tasks to be added. 01567 if (_chain->_sleeping.empty()) { 01568 PStatTimer timer(_wait_pcollector); 01569 _chain->_cvar.wait(); 01570 } else { 01571 double wake_time = _chain->do_get_next_wake_time(); 01572 double now = _chain->_manager->_clock->get_frame_time(); 01573 double timeout = max(wake_time - now, 0.0); 01574 PStatTimer timer(_wait_pcollector); 01575 _chain->_cvar.wait(timeout); 01576 } 01577 } 01578 01579 } else { 01580 // Wait for the other threads to finish their current task 01581 // before we continue. 01582 PStatTimer timer(_wait_pcollector); 01583 _chain->_cvar.wait(); 01584 } 01585 } 01586 } 01587 } 01588