00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "asyncTaskManager.h"
00016 #include "event.h"
00017 #include "pt_Event.h"
00018 #include "mutexHolder.h"
00019 #include "indent.h"
00020 #include "pStatClient.h"
00021 #include "pStatTimer.h"
00022 #include "clockObject.h"
00023 #include "config_event.h"
00024 #include <algorithm>
00025
// Storage for the process-wide manager instance; assigned by
// make_global_ptr().
00026 PT(AsyncTaskManager) AsyncTaskManager::_global_ptr;
00027
// Storage for this class's TypeHandle (presumably filled in by the type
// registration code declared in the header -- confirm).
00028 TypeHandle AsyncTaskManager::_type_handle;
00029
00030
00031
00032
00033
00034
00035 AsyncTaskManager::
00036 AsyncTaskManager(const string &name) :
00037 Namable(name),
00038 _lock("AsyncTaskManager::_lock"),
00039 _num_tasks(0),
00040 _clock(ClockObject::get_global_clock()),
00041 _frame_cvar(_lock)
00042 {
00043
00044 do_make_task_chain("default");
00045 }
00046
00047
00048
00049
00050
00051
00052 AsyncTaskManager::
00053 ~AsyncTaskManager() {
00054 cleanup();
00055 }
00056
00057
00058
00059
00060
00061
00062
00063 void AsyncTaskManager::
00064 cleanup() {
00065 MutexHolder holder(_lock);
00066
00067 if (task_cat.is_debug()) {
00068 do_output(task_cat.debug());
00069 task_cat.debug(false)
00070 << ": cleanup()\n";
00071 }
00072
00073
00074
00075 while (!_task_chains.empty()) {
00076 PT(AsyncTaskChain) chain = _task_chains[_task_chains.size() - 1];
00077 _task_chains.pop_back();
00078 chain->do_cleanup();
00079 }
00080
00081
00082
00083 if (_num_tasks == 1) {
00084 nassertv(_tasks_by_name.size() == 1);
00085 TasksByName::const_iterator tbni = _tasks_by_name.begin();
00086 AsyncTask *task = (*tbni);
00087 nassertv(task->_state == AsyncTask::S_servicing ||
00088 task->_state == AsyncTask::S_servicing_removed);
00089 task->_state = AsyncTask::S_servicing_removed;
00090
00091 } else {
00092
00093
00094 #ifndef NDEBUG
00095 nassertd(_num_tasks == 0 && _tasks_by_name.empty()) {
00096 task_cat.error()
00097 << "_num_tasks = " << _num_tasks << " _tasks_by_name = " << _tasks_by_name.size() << "\n";
00098 TasksByName::const_iterator tbni;
00099 for (tbni = _tasks_by_name.begin();
00100 tbni != _tasks_by_name.end();
00101 ++tbni) {
00102 task_cat.error()
00103 << " " << *(*tbni) << "\n";
00104 }
00105 }
00106 #endif // NDEBUG
00107 }
00108 }
00109
00110
00111
00112
00113
00114
00115 int AsyncTaskManager::
00116 get_num_task_chains() const {
00117 MutexHolder holder(_lock);
00118 return _task_chains.size();
00119 }
00120
00121
00122
00123
00124
00125
00126 AsyncTaskChain *AsyncTaskManager::
00127 get_task_chain(int n) const {
00128 MutexHolder holder(_lock);
00129 nassertr(n >= 0 && n < (int)_task_chains.size(), NULL);
00130 return _task_chains[n];
00131 }
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141 AsyncTaskChain *AsyncTaskManager::
00142 make_task_chain(const string &name) {
00143 MutexHolder holder(_lock);
00144 return do_make_task_chain(name);
00145 }
00146
00147
00148
00149
00150
00151
00152
00153 AsyncTaskChain *AsyncTaskManager::
00154 find_task_chain(const string &name) {
00155 MutexHolder holder(_lock);
00156 return do_find_task_chain(name);
00157 }
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168
00169 bool AsyncTaskManager::
00170 remove_task_chain(const string &name) {
00171 MutexHolder holder(_lock);
00172
00173 PT(AsyncTaskChain) chain = new AsyncTaskChain(this, name);
00174 TaskChains::iterator tci = _task_chains.find(chain);
00175 if (tci == _task_chains.end()) {
00176
00177 return false;
00178 }
00179
00180 chain = (*tci);
00181
00182 while (chain->_num_tasks != 0) {
00183
00184 task_cat.info()
00185 << "Waiting for tasks on chain " << name << " to finish.\n";
00186 chain->do_wait_for_tasks();
00187 }
00188
00189
00190 chain->do_cleanup();
00191 _task_chains.erase(tci);
00192 return true;
00193 }
00194
00195
00196
00197
00198
00199
00200
00201
00202 void AsyncTaskManager::
00203 add(AsyncTask *task) {
00204 nassertv(task->is_runnable());
00205
00206 {
00207 MutexHolder holder(_lock);
00208
00209 if (task_cat.is_debug()) {
00210 task_cat.debug()
00211 << "Adding " << *task << "\n";
00212 }
00213
00214 if (task->_state == AsyncTask::S_servicing_removed) {
00215 if (task->_manager == this) {
00216
00217
00218 task->_state = AsyncTask::S_servicing;
00219 return;
00220 }
00221 }
00222
00223 nassertv(task->_manager == NULL &&
00224 task->_state == AsyncTask::S_inactive);
00225 nassertv(!do_has_task(task));
00226
00227 _lock.release();
00228 task->upon_birth(this);
00229 _lock.acquire();
00230 nassertv(task->_manager == NULL &&
00231 task->_state == AsyncTask::S_inactive);
00232 nassertv(!do_has_task(task));
00233
00234 AsyncTaskChain *chain = do_find_task_chain(task->_chain_name);
00235 if (chain == (AsyncTaskChain *)NULL) {
00236 task_cat.warning()
00237 << "Creating implicit AsyncTaskChain " << task->_chain_name
00238 << " for " << get_type() << " " << get_name() << "\n";
00239 chain = do_make_task_chain(task->_chain_name);
00240 }
00241 chain->do_add(task);
00242 }
00243 }
00244
00245
00246
00247
00248
00249
00250
00251 bool AsyncTaskManager::
00252 has_task(AsyncTask *task) const {
00253 MutexHolder holder(_lock);
00254
00255 if (task->_manager != this) {
00256 nassertr(!do_has_task(task), false);
00257 return false;
00258 }
00259
00260 if (task->_state == AsyncTask::S_servicing_removed) {
00261 return false;
00262 }
00263
00264
00265
00266 return true;
00267 }
00268
00269
00270
00271
00272
00273
00274
00275
00276
00277
00278 AsyncTask *AsyncTaskManager::
00279 find_task(const string &name) const {
00280 AsyncTask sample_task(name);
00281 sample_task.local_object();
00282
00283 TasksByName::const_iterator tbni = _tasks_by_name.lower_bound(&sample_task);
00284 if (tbni != _tasks_by_name.end() && (*tbni)->get_name() == name) {
00285 return (*tbni);
00286 }
00287
00288 return NULL;
00289 }
00290
00291
00292
00293
00294
00295
00296
00297 AsyncTaskCollection AsyncTaskManager::
00298 find_tasks(const string &name) const {
00299 AsyncTask sample_task(name);
00300 sample_task.local_object();
00301
00302 TasksByName::const_iterator tbni = _tasks_by_name.lower_bound(&sample_task);
00303 AsyncTaskCollection result;
00304 while (tbni != _tasks_by_name.end() && (*tbni)->get_name() == name) {
00305 result.add_task(*tbni);
00306 ++tbni;
00307 }
00308
00309 return result;
00310 }
00311
00312
00313
00314
00315
00316
00317
00318 AsyncTaskCollection AsyncTaskManager::
00319 find_tasks_matching(const GlobPattern &pattern) const {
00320 string prefix = pattern.get_const_prefix();
00321 AsyncTask sample_task(prefix);
00322 sample_task.local_object();
00323
00324 TasksByName::const_iterator tbni = _tasks_by_name.lower_bound(&sample_task);
00325 AsyncTaskCollection result;
00326 while (tbni != _tasks_by_name.end() && (*tbni)->get_name().substr(0, prefix.size()) == prefix) {
00327 AsyncTask *task = (*tbni);
00328 if (pattern.matches(task->get_name())) {
00329 result.add_task(task);
00330 }
00331 ++tbni;
00332 }
00333
00334 return result;
00335 }
00336
00337
00338
00339
00340
00341
00342
00343
00344 bool AsyncTaskManager::
00345 remove(AsyncTask *task) {
00346
00347
00348
00349 AsyncTaskCollection tasks;
00350 tasks.add_task(task);
00351 return remove(tasks) != 0;
00352 }
00353
00354
00355
00356
00357
00358
00359
00360 int AsyncTaskManager::
00361 remove(const AsyncTaskCollection &tasks) {
00362 MutexHolder holder(_lock);
00363 int num_removed = 0;
00364
00365 int num_tasks = tasks.get_num_tasks();
00366 int i;
00367 for (i = 0; i < num_tasks; ++i) {
00368 PT(AsyncTask) task = tasks.get_task(i);
00369
00370 if (task->_manager != this) {
00371
00372 nassertr(!do_has_task(task), num_removed);
00373 } else {
00374 nassertr(task->_chain->_manager == this, num_removed);
00375 if (task_cat.is_debug()) {
00376 task_cat.debug()
00377 << "Removing " << *task << "\n";
00378 }
00379 if (task->_chain->do_remove(task)) {
00380 _lock.release();
00381 task->upon_death(this, false);
00382 _lock.acquire();
00383 ++num_removed;
00384 } else {
00385 if (task_cat.is_debug()) {
00386 task_cat.debug()
00387 << " (unable to remove " << *task << ")\n";
00388 }
00389 }
00390 }
00391 }
00392
00393 return num_removed;
00394 }
00395
00396
00397
00398
00399
00400
00401 void AsyncTaskManager::
00402 wait_for_tasks() {
00403 MutexHolder holder(_lock);
00404
00405
00406 while (_num_tasks > 0) {
00407
00408
00409
00410 for (unsigned int i = 0; i < _task_chains.size(); ++i) {
00411 AsyncTaskChain *chain = _task_chains[i];
00412 chain->do_wait_for_tasks();
00413 }
00414 }
00415 }
00416
00417
00418
00419
00420
00421
00422
00423
00424
00425 void AsyncTaskManager::
00426 stop_threads() {
00427 MutexHolder holder(_lock);
00428
00429
00430
00431
00432 for (unsigned int i = 0; i < _task_chains.size(); ++i) {
00433 AsyncTaskChain *chain = _task_chains[i];
00434 chain->do_stop_threads();
00435 }
00436 }
00437
00438
00439
00440
00441
00442
00443
00444
00445 void AsyncTaskManager::
00446 start_threads() {
00447 MutexHolder holder(_lock);
00448
00449
00450
00451
00452 for (unsigned int i = 0; i < _task_chains.size(); ++i) {
00453 AsyncTaskChain *chain = _task_chains[i];
00454
00455 chain->do_start_threads();
00456 }
00457 }
00458
00459
00460
00461
00462
00463
00464
00465 AsyncTaskCollection AsyncTaskManager::
00466 get_tasks() const {
00467 MutexHolder holder(_lock);
00468
00469 AsyncTaskCollection result;
00470 TaskChains::const_iterator tci;
00471 for (tci = _task_chains.begin();
00472 tci != _task_chains.end();
00473 ++tci) {
00474 AsyncTaskChain *chain = (*tci);
00475 result.add_tasks_from(chain->do_get_active_tasks());
00476 result.add_tasks_from(chain->do_get_sleeping_tasks());
00477 }
00478
00479 return result;
00480 }
00481
00482
00483
00484
00485
00486
00487
00488
00489 AsyncTaskCollection AsyncTaskManager::
00490 get_active_tasks() const {
00491 MutexHolder holder(_lock);
00492
00493 AsyncTaskCollection result;
00494 TaskChains::const_iterator tci;
00495 for (tci = _task_chains.begin();
00496 tci != _task_chains.end();
00497 ++tci) {
00498 AsyncTaskChain *chain = (*tci);
00499 result.add_tasks_from(chain->do_get_active_tasks());
00500 }
00501
00502 return result;
00503 }
00504
00505
00506
00507
00508
00509
00510
00511
00512 AsyncTaskCollection AsyncTaskManager::
00513 get_sleeping_tasks() const {
00514 MutexHolder holder(_lock);
00515
00516 AsyncTaskCollection result;
00517 TaskChains::const_iterator tci;
00518 for (tci = _task_chains.begin();
00519 tci != _task_chains.end();
00520 ++tci) {
00521 AsyncTaskChain *chain = (*tci);
00522 result.add_tasks_from(chain->do_get_sleeping_tasks());
00523 }
00524
00525 return result;
00526 }
00527
00528
00529
00530
00531
00532
00533
00534
00535
00536
00537 void AsyncTaskManager::
00538 poll() {
00539 MutexHolder holder(_lock);
00540
00541
00542
00543
00544 for (unsigned int i = 0; i < _task_chains.size(); ++i) {
00545 AsyncTaskChain *chain = _task_chains[i];
00546 chain->do_poll();
00547 }
00548
00549
00550
00551 _frame_cvar.notify_all();
00552 }
00553
00554
00555
00556
00557
00558
00559
00560
00561 double AsyncTaskManager::
00562 get_next_wake_time() const {
00563 MutexHolder holder(_lock);
00564
00565 bool got_any = false;
00566 double next_wake_time = -1.0;
00567
00568 TaskChains::const_iterator tci;
00569 for (tci = _task_chains.begin();
00570 tci != _task_chains.end();
00571 ++tci) {
00572 AsyncTaskChain *chain = (*tci);
00573 double time = chain->do_get_next_wake_time();
00574 if (time >= 0.0) {
00575 if (!got_any) {
00576 got_any = true;
00577 next_wake_time = time;
00578 } else {
00579 next_wake_time = min(time, next_wake_time);
00580 }
00581 }
00582 }
00583
00584 return next_wake_time;
00585 }
00586
00587
00588
00589
00590
00591
00592 void AsyncTaskManager::
00593 output(ostream &out) const {
00594 MutexHolder holder(_lock);
00595 do_output(out);
00596 }
00597
00598
00599
00600
00601
00602
00603 void AsyncTaskManager::
00604 write(ostream &out, int indent_level) const {
00605 MutexHolder holder(_lock);
00606 indent(out, indent_level)
00607 << get_type() << " " << get_name() << "\n";
00608
00609 TaskChains::const_iterator tci;
00610 for (tci = _task_chains.begin();
00611 tci != _task_chains.end();
00612 ++tci) {
00613 AsyncTaskChain *chain = (*tci);
00614 if (chain->_num_tasks != 0) {
00615 out << "\n";
00616 chain->do_write(out, indent_level + 2);
00617 }
00618 }
00619 }
00620
00621
00622
00623
00624
00625
00626
00627
00628
00629
00630
00631 AsyncTaskChain *AsyncTaskManager::
00632 do_make_task_chain(const string &name) {
00633 PT(AsyncTaskChain) chain = new AsyncTaskChain(this, name);
00634
00635 TaskChains::const_iterator tci = _task_chains.insert(chain).first;
00636 return (*tci);
00637 }
00638
00639
00640
00641
00642
00643
00644
00645
00646
00647 AsyncTaskChain *AsyncTaskManager::
00648 do_find_task_chain(const string &name) {
00649 PT(AsyncTaskChain) chain = new AsyncTaskChain(this, name);
00650
00651 TaskChains::const_iterator tci = _task_chains.find(chain);
00652 if (tci != _task_chains.end()) {
00653 return (*tci);
00654 }
00655
00656 return NULL;
00657 }
00658
00659
00660
00661
00662
00663
00664
00665 void AsyncTaskManager::
00666 remove_task_by_name(AsyncTask *task) {
00667 if (!task->get_name().empty()) {
00668
00669
00670 TasksByName::iterator tbni = _tasks_by_name.lower_bound(task);
00671 while (tbni != _tasks_by_name.end()) {
00672 if ((*tbni) == task) {
00673 _tasks_by_name.erase(tbni);
00674 return;
00675 }
00676 if ((*tbni)->get_name() != task->get_name()) {
00677
00678 break;
00679 }
00680
00681 ++tbni;
00682 }
00683
00684
00685 nassertv(false);
00686 }
00687 }
00688
00689
00690
00691
00692
00693
00694
00695
00696
00697 bool AsyncTaskManager::
00698 do_has_task(AsyncTask *task) const {
00699 TaskChains::const_iterator tci;
00700 for (tci = _task_chains.begin();
00701 tci != _task_chains.end();
00702 ++tci) {
00703 AsyncTaskChain *chain = (*tci);
00704 if (chain->do_has_task(task)) {
00705 return true;
00706 }
00707 }
00708
00709 return false;
00710 }
00711
00712
00713
00714
00715
00716
00717 void AsyncTaskManager::
00718 do_output(ostream &out) const {
00719 out << get_type() << " " << get_name()
00720 << "; " << _num_tasks << " tasks";
00721 }
00722
00723
00724
00725
00726
00727
00728
00729 void AsyncTaskManager::
00730 make_global_ptr() {
00731 nassertv(_global_ptr == (AsyncTaskManager *)NULL);
00732
00733 _global_ptr = new AsyncTaskManager("TaskManager");
00734 }