28 using std::ostringstream;
43 _cvar(manager->_lock),
45 _timeslice_priority(false),
47 _thread_priority(TP_normal),
52 _num_awaiting_tasks(0),
54 _current_sort(-INT_MAX),
56 _needs_cleanup(false),
59 _block_till_next_frame(false),
60 _next_implicit_sort(0)
85 _tick_clock = tick_clock;
103 nassertv(num_threads >= 0);
105 if (task_cat.is_debug()) {
106 do_output(task_cat.debug());
107 task_cat.debug(
false)
108 <<
": set_num_threads(" << num_threads <<
")\n";
116 if (_num_threads != num_threads) {
118 _num_threads = num_threads;
120 if (_num_tasks != 0) {
144 return _threads.size();
154 if (_thread_priority != priority) {
156 _thread_priority = priority;
158 if (_num_tasks != 0) {
170 return _thread_priority;
184 _frame_budget = frame_budget;
194 return _frame_budget;
214 _frame_sync = frame_sync;
246 _timeslice_priority = timeslice_priority;
257 return _timeslice_priority;
267 if (_state == S_started || _state == S_interrupted) {
281 if (_state == S_initial || _state == S_interrupted) {
295 if (task->_chain !=
this) {
296 nassertr(!do_has_task(task),
false);
300 if (task->_state == AsyncTask::S_servicing_removed) {
347 return do_get_active_tasks();
357 return do_get_sleeping_tasks();
382 return do_get_next_wake_time();
// Public output(): only the signature is visible in this excerpt (the embedded
// line numbers jump from 389 to 397), so the body cannot be described here.
// NOTE(review): presumably forwards to do_output() in the same way write()
// forwards to do_write() -- TODO confirm against the full file.
388 void AsyncTaskChain::
389 output(ostream &out)
const {
// Public write(): hands both arguments straight to do_write().
//   out          - destination stream.
//   indent_level - indentation forwarded unchanged to do_write().
// NOTE(review): embedded line 399 is missing from this excerpt, so there may
// be a statement ahead of the do_write() call (possibly lock handling) that is
// not shown -- TODO confirm.
397 void AsyncTaskChain::
398 write(ostream &out,
int indent_level)
const {
400 do_write(out, indent_level);
// Attaches a task to this chain and queues it as either sleeping or active.
// NOTE(review): the line carrying the function name and parameter list is
// missing from this excerpt (embedded numbers jump 410 -> 412); presumably
// this is do_add(AsyncTask *task) -- TODO confirm. Several branch and closing
// lines are missing as well, so comments below cover visible statements only.
410 void AsyncTaskChain::
// Preconditions: the task has no chain and no manager, names this chain as its
// target, is in the inactive state, and is not already held by this chain.
412 nassertv(task->_chain ==
nullptr &&
413 task->_manager ==
nullptr &&
414 task->_chain_name == get_name() &&
415 task->_state == AsyncTask::S_inactive);
416 nassertv(!do_has_task(task));
421 task->_manager = _manager;
// Record the frame time and frame count of the manager clock at add time.
423 double now = _manager->_clock->get_frame_time();
424 task->_start_time = now;
425 task->_start_frame = _manager->_clock->get_frame_count();
// Each added task takes the next value of the per-chain counter.
428 task->_implicit_sort = _next_implicit_sort++;
430 _manager->add_task_by_name(task);
// Sleeping path (its guarding condition is not visible in this excerpt): the
// wake time is now + get_delay(), the start time is moved to the wake time,
// and the task is pushed onto the _sleeping heap ordered by
// AsyncTaskSortWakeTime.
434 task->_wake_time = now + task->
get_delay();
435 task->_start_time = task->_wake_time;
436 task->_state = AsyncTask::S_sleeping;
437 _sleeping.push_back(task);
438 push_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
// Active path.
442 task->_state = AsyncTask::S_active;
443 if (task_cat.is_spam()) {
445 <<
"Adding " << *task <<
" with sort " << task->
get_sort()
446 <<
" to chain " << get_name() <<
" with current_sort "
447 << _current_sort <<
"\n";
// A task whose sort is at or above _current_sort goes onto the _active heap.
// The other visible push targets _next_active; the else line that presumably
// separates the two is not visible here.
449 if (task->
get_sort() >= _current_sort) {
451 _active.push_back(task);
452 push_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
455 _next_active.push_back(task);
// Bump the task count kept on the manager and flag this chain for cleanup.
459 ++(_manager->_num_tasks);
460 _needs_cleanup =
true;
// Removes a task from this chain, dispatching on the current state of the task.
//   task       - the task to remove; asserted to belong to this chain (the
//                assertion yields false on failure).
//   upon_death - forwarded unchanged to cleanup_task().
// NOTE(review): the return statements, break statements and closing lines are
// missing from this excerpt, so the value returned per case cannot be
// confirmed from here.
470 bool AsyncTaskChain::
471 do_remove(
AsyncTask *task,
bool upon_death) {
472 nassertr(task->_chain ==
this,
false);
474 switch (task->_state) {
475 case AsyncTask::S_servicing:
// In the visible lines a task that is being serviced is only re-marked; no
// queue is modified in this case.
477 task->_state = AsyncTask::S_servicing_removed;
480 case AsyncTask::S_servicing_removed:
484 case AsyncTask::S_sleeping:
// Locate the task in the sleeping heap, erase it, then rebuild the heap
// because erasing an arbitrary element does not keep the heap ordering.
487 int index = find_task_on_heap(_sleeping, task);
488 nassertr(index != -1,
false);
490 _sleeping.erase(_sleeping.begin() + index);
491 make_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
492 cleanup_task(task, upon_death,
false);
496 case AsyncTask::S_active:
// The task is looked up in _active, then _next_active, then _this_active. The
// conditions that chain these lookups together are not visible in this
// excerpt; only the last lookup is followed by a visible not-found assertion.
500 int index = find_task_on_heap(_active, task);
502 _active.erase(_active.begin() + index);
503 make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
505 index = find_task_on_heap(_next_active, task);
507 _next_active.erase(_next_active.begin() + index);
509 index = find_task_on_heap(_this_active, task);
510 nassertr(index != -1,
false);
513 cleanup_task(task, upon_death,
false);
// Loops for as long as _num_tasks is greater than zero. Two such loops are
// visible, and the first visible statement tests whether this chain has any
// threads; each loop checks for the S_shutdown or S_interrupted state.
// NOTE(review): the loop bodies, the action taken on shutdown or interrupt,
// and the else line presumably separating the two loops are missing from this
// excerpt -- confirm against the full file.
527 void AsyncTaskChain::
528 do_wait_for_tasks() {
531 if (_threads.empty()) {
533 while (_num_tasks > 0) {
534 if (_state == S_shutdown || _state == S_interrupted) {
542 while (_num_tasks > 0) {
543 if (_state == S_shutdown || _state == S_interrupted) {
// Cleans up every task held in the four queues of this chain.
// NOTE(review): the line carrying the function name is missing from this
// excerpt; the spam log strings below identify it as do_cleanup(). The
// declaration of the dead container, the per-iteration task variable, and any
// unlock that pairs with the visible lock() call are also not shown.
557 void AsyncTaskChain::
559 if (task_cat.is_spam()) {
560 do_output(task_cat.spam());
562 <<
": do_cleanup()\n";
574 dead.reserve(_num_tasks);
576 _needs_cleanup =
false;
// Walk _active, _this_active, _next_active and _sleeping in turn. Each task is
// remembered in dead and passed to cleanup_task() with upon_death = false and
// clean_exit = false.
578 TaskHeap::const_iterator ti;
579 for (ti = _active.begin(); ti != _active.end(); ++ti) {
581 dead.push_back(task);
582 cleanup_task(task,
false,
false);
584 for (ti = _this_active.begin(); ti != _this_active.end(); ++ti) {
586 dead.push_back(task);
587 cleanup_task(task,
false,
false);
589 for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) {
591 dead.push_back(task);
592 cleanup_task(task,
false,
false);
594 for (ti = _sleeping.begin(); ti != _sleeping.end(); ++ti) {
596 dead.push_back(task);
597 cleanup_task(task,
false,
false);
// At most one task may still be counted at this point.
601 nassertv(_num_tasks == 0 || _num_tasks == 1);
// Invoke upon_death() on every remembered task, then take the manager lock.
605 for (ti = dead.begin(); ti != dead.end(); ++ti) {
606 (*ti)->upon_death(_manager,
false);
608 _manager->_lock.
lock();
610 if (task_cat.is_spam()) {
611 do_output(task_cat.spam());
613 <<
": done do_cleanup()\n";
// Returns true when find_task_on_heap() locates the task in any of _active,
// _next_active, _sleeping or _this_active (checked in that order, stopping at
// the first hit).
// NOTE(review): the line carrying the function name is missing from this
// excerpt; presumably this is do_has_task(AsyncTask *task), which other
// visible code calls -- TODO confirm.
622 bool AsyncTaskChain::
624 return (find_task_on_heap(_active, task) != -1 ||
625 find_task_on_heap(_next_active, task) != -1 ||
626 find_task_on_heap(_sleeping, task) != -1 ||
627 find_task_on_heap(_this_active, task) != -1);
// Scans heap from index 0 upward, comparing each element with task by pointer
// equality.
// NOTE(review): the return-type line and both return statements are missing
// from this excerpt. Callers visible in this file compare the result with -1
// and otherwise use it as an index into the heap, so presumably the matching
// index is returned, or -1 when there is no match -- TODO confirm.
636 find_task_on_heap(
const TaskHeap &heap,
AsyncTask *task)
const {
637 for (
int i = 0; i < (int)heap.size(); ++i) {
638 if (heap[i] == task) {
// Takes one task off the _active heap, runs it, and then requeues or cleans it
// up according to the DoneStatus it reports.
//   thread - the chain thread doing the servicing, or nullptr; when non-null
//            its _servicing member tracks the task for the duration.
// NOTE(review): many lines are missing from this excerpt (the declaration of
// the task local, the switch header, break statements, the default label and
// closing lines), so comments below describe visible statements only.
653 void AsyncTaskChain::
654 service_one_task(AsyncTaskChain::AsyncTaskChainThread *thread) {
655 if (!_active.empty()) {
657 pop_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
660 if (thread !=
nullptr) {
661 thread->_servicing = task;
664 if (task_cat.is_spam()) {
666 <<
"Servicing " << *task <<
" in "
// The task being run must carry the current sort value and be active.
670 nassertv(task->
get_sort() == _current_sort);
671 nassertv(task->_state == AsyncTask::S_active);
672 task->_state = AsyncTask::S_servicing;
673 task->_servicing_thread = thread;
// NOTE(review): judging by its name, unlock_and_do_task() releases the lock
// while the task body runs -- confirm in AsyncTask.
675 AsyncTask::DoneStatus ds = task->unlock_and_do_task();
677 if (thread !=
nullptr) {
678 thread->_servicing =
nullptr;
680 task->_servicing_thread =
nullptr;
// Only act on the task if it is still attached to this chain.
682 if (task->_chain ==
this) {
// The task was marked for removal while it was being serviced.
683 if (task->_state == AsyncTask::S_servicing_removed) {
685 cleanup_task(task,
true,
false);
687 }
else if (task->_chain_name != get_name()) {
// The chain name of the task no longer matches this chain: detach it here and
// let the task move itself via jump_to_task_chain().
690 cleanup_task(task,
false,
false);
691 task->jump_to_task_chain(_manager);
// DS_cont: stay active, queued on _next_active.
695 case AsyncTask::DS_cont:
697 task->_state = AsyncTask::S_active;
698 _next_active.push_back(task);
// DS_again: go back to sleep until now + get_delay().
702 case AsyncTask::DS_again:
705 double now = _manager->_clock->get_frame_time();
706 task->_wake_time = now + task->
get_delay();
707 task->_start_time = task->_wake_time;
708 task->_state = AsyncTask::S_sleeping;
709 _sleeping.push_back(task);
710 push_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
711 if (task_cat.is_spam()) {
713 <<
"Sleeping " << *task <<
", wake time at "
714 << task->_wake_time - now <<
"\n";
// DS_pickup: stay active, queued on _this_active.
720 case AsyncTask::DS_pickup:
722 task->_state = AsyncTask::S_active;
723 _this_active.push_back(task);
// DS_interrupt: requeue on _next_active and move a started chain to the
// interrupted state.
727 case AsyncTask::DS_interrupt:
729 task->_state = AsyncTask::S_active;
730 _next_active.push_back(task);
731 if (_state == S_started) {
732 _state = S_interrupted;
// DS_await: the task is placed on no queue here; only the awaiting counter
// grows.
737 case AsyncTask::DS_await:
739 task->_state = AsyncTask::S_awaiting;
741 ++_num_awaiting_tasks;
// Remaining visible case (its label is not shown): clean exit of the task.
746 cleanup_task(task,
true,
true);
751 <<
"Task is no longer on chain " << get_name()
752 <<
": " << *task <<
"\n";
755 if (task_cat.is_spam()) {
757 <<
"Done servicing " << *task <<
" in "
761 thread_consider_yield();
// Detaches a task from this chain and from the manager.
//   task       - the task to detach; asserted to belong to this chain.
//   upon_death - logged below; any other use is not visible in this excerpt.
//   clean_exit - selects FS_finished (true) or FS_cancelled (false) for the
//                future state and is forwarded to upon_death().
// NOTE(review): lines are missing between set_future_state() and the
// upon_death() call (embedded numbers jump 794 -> 797), so the exact
// conditions guarding upon_death() and any unlock that pairs with the visible
// lock() call cannot be confirmed from here.
773 void AsyncTaskChain::
774 cleanup_task(
AsyncTask *task,
bool upon_death,
bool clean_exit) {
775 if (task_cat.is_spam()) {
776 do_output(task_cat.spam());
778 <<
": cleanup_task(" << *task <<
", " << upon_death <<
", " << clean_exit
782 nassertv(task->_chain ==
this);
// Reset the task to inactive, drop its chain pointer, decrement the task count
// kept on the manager, and remove the by-name registration.
784 task->_state = AsyncTask::S_inactive;
785 task->_chain =
nullptr;
787 --(_manager->_num_tasks);
789 _manager->remove_task_by_name(task);
793 if (task->set_future_state(clean_exit ? AsyncFuture::FS_finished
794 : AsyncFuture::FS_cancelled)) {
797 task->upon_death(_manager, clean_exit);
798 _manager->_lock.
lock();
// The manager pointer is cleared last among the visible statements.
801 task->_manager =
nullptr;
// Runs once no thread is busy, to advance the chain past the sort group that
// was just serviced: either on to the next sort value, into a pickup pass, or
// into a new epoch.
// NOTE(review): the return statements, else lines and closing lines are
// missing from this excerpt, so which value is returned on each path cannot be
// confirmed from here. The assertions visible below yield true, except the
// last one, which yields false.
812 bool AsyncTaskChain::
813 finish_sort_group() {
814 nassertr(_num_busy_threads == 0,
true);
816 if (!_threads.empty()) {
817 PStatClient::thread_tick(get_name());
// Tasks remain on the active heap: move _current_sort up to the sort value at
// the front of the heap, which must be greater than the current one.
820 if (!_active.empty()) {
822 nassertr(_current_sort < _active.front()->get_sort(),
true);
823 _current_sort = _active.front()->get_sort();
// Pickup pass: with a frame budget in force (non-negative) and tasks waiting
// on _this_active, those tasks become the active set.
830 if (!_this_active.empty() && _frame_budget >= 0.0) {
835 if (task_cat.is_spam()) {
836 do_output(task_cat.spam());
838 <<
": next epoch (pickup mode)\n";
842 _active.swap(_this_active);
847 if (task_cat.is_spam()) {
848 do_output(task_cat.spam());
// Regular epoch: leave pickup mode, fold _this_active into _next_active, and
// make _next_active the new active set.
853 _pickup_mode =
false;
857 _next_active.insert(_next_active.end(), _this_active.begin(), _this_active.end());
858 _this_active.clear();
860 _active.swap(_next_active);
867 if (task_cat.is_spam()) {
868 do_output(task_cat.spam());
// The condition guarding the clock tick is not visible in this excerpt; the
// alternative branch applies when _frame_sync is set.
872 _manager->_clock->tick();
875 }
else if (_frame_sync) {
878 _block_till_next_frame =
true;
// Wake every sleeper whose wake time is not later than the current frame time
// and move it onto the active list.
882 double now = _manager->_clock->get_frame_time();
883 while (!_sleeping.empty() && _sleeping.front()->_wake_time <= now) {
885 if (task_cat.is_spam()) {
887 <<
"Waking " << *task <<
", wake time at "
888 << task->_wake_time - now <<
"\n";
890 pop_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
891 _sleeping.pop_back();
892 task->_state = AsyncTask::S_active;
893 task->_start_frame = _manager->_clock->get_frame_count();
894 _active.push_back(task);
897 if (task_cat.is_spam()) {
898 if (_sleeping.empty()) {
900 <<
"No more tasks on sleeping queue.\n";
903 <<
"Next sleeper: " << *_sleeping.front() <<
", wake time at "
904 << _sleeping.front()->_wake_time - now <<
"\n";
// The body of this loop over the active tasks is not visible in this excerpt.
910 TaskHeap::const_iterator ti;
911 for (ti = _active.begin(); ti != _active.end(); ++ti) {
917 if (_timeslice_priority) {
918 filter_timeslice_priority();
// Bookkeeping check: every counted task sits in exactly one of the four queues
// or is counted as awaiting.
921 nassertr((
size_t)_num_tasks == _active.size() + _this_active.size() + _next_active.size() + _sleeping.size() + (
size_t)_num_awaiting_tasks,
true);
// _active was filled by swap and push_back above, so heap order is rebuilt.
922 make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
// Reset to the lowest sort value, matching the constructor initializer.
924 _current_sort = -INT_MAX;
926 if (!_active.empty()) {
933 _pickup_mode =
false;
934 nassertr(_this_active.empty(),
false);
// Splits the active tasks into a keep set and a postpone set by comparing the
// runtime per unit of priority of each task against the average over all
// active tasks.
// NOTE(review): the definitions of the task and runtime locals, the early
// return for an empty active list, the else lines and the final assignment
// back to _active are missing from this excerpt; comments below cover visible
// statements only.
945 void AsyncTaskChain::
946 filter_timeslice_priority() {
947 if (_active.empty()) {
950 nassertv(_timeslice_priority);
// First pass: total the runtimes and the priorities. A priority below 1 is
// counted as 1, which also keeps the divisor below positive whenever at least
// one task is present.
953 double net_runtime = 0.0;
954 int net_priority = 0;
956 TaskHeap::iterator ti;
957 for (ti = _active.begin(); ti != _active.end(); ++ti) {
960 int priority = max(task->_priority, 1);
961 net_runtime += runtime;
962 net_priority += priority;
966 double average_budget = net_runtime / (double)net_priority;
// Second pass: a task whose runtime per unit of priority exceeds the average
// is postponed; the other visible push adds the task to keep.
968 TaskHeap keep, postpone;
969 for (ti = _active.begin(); ti != _active.end(); ++ti) {
972 int priority = max(task->_priority, 1);
973 double consumed = runtime / (double)priority;
976 if (consumed > average_budget) {
978 postpone.push_back(task);
981 keep.push_back(task);
// Move the postponed task with the highest priority into keep.
// NOTE(review): the condition guarding this section is not visible here;
// presumably it applies only when keep would otherwise be empty -- TODO
// confirm.
988 nassertv(!postpone.empty());
989 ti = postpone.begin();
990 TaskHeap::iterator max_ti = ti;
992 while (ti != postpone.end()) {
993 if ((*ti)->_priority > (*max_ti)->_priority) {
1000 keep.push_back(*max_ti);
1001 postpone.erase(max_ti);
// The postponed tasks are appended to _this_active or to _next_active; the
// condition choosing between the two is not visible in this excerpt.
1006 _this_active.insert(_this_active.end(), postpone.begin(), postpone.end());
1008 _next_active.insert(_next_active.end(), postpone.begin(), postpone.end());
1011 nassertv(!_active.empty());
// Moves a started or interrupted chain to the shutdown state and waits for its
// threads.
// NOTE(review): the line carrying the function name is missing from this
// excerpt; presumably this is do_stop_threads(), judging by the log text. The
// notification of waiting threads, the join call inside the loop and the
// closing lines are also not shown.
1018 void AsyncTaskChain::
1020 if (_state == S_started || _state == S_interrupted) {
1021 if (task_cat.is_debug() && !_threads.empty()) {
1023 <<
"Stopping " << _threads.size()
1024 <<
" threads for " << _manager->get_name()
1025 <<
" chain " << get_name()
1029 _state = S_shutdown;
// Take the thread list into a local so that _threads is empty from here on.
1033 Threads wait_threads;
1034 wait_threads.swap(_threads);
// The manager lock is released around the wait loop and re-taken afterwards.
1038 _manager->_lock.
unlock();
1039 Threads::iterator ti;
1040 for (ti = wait_threads.begin(); ti != wait_threads.end(); ++ti) {
1041 if (task_cat.is_debug()) {
1043 <<
"Waiting for " << *(*ti) <<
" in "
1047 if (task_cat.is_spam()) {
1049 <<
"Done waiting for " << *(*ti) <<
" in "
1053 _manager->_lock.
lock();
// At most one thread may still be counted as busy at this point.
1058 nassertv(_num_busy_threads == 0 || _num_busy_threads == 1);
1059 cleanup_pickup_mode();
// Creates and starts the worker threads of this chain when it is in the
// initial state.
// NOTE(review): the handling of the interrupted state, the state change after
// the S_initial test, the declaration of strm and the closing lines are
// missing from this excerpt.
1067 void AsyncTaskChain::
1068 do_start_threads() {
1069 if (_state == S_interrupted) {
1073 if (_state == S_initial) {
1076 if (task_cat.is_debug()) {
1078 <<
"Starting " << _num_threads <<
" threads for "
1079 << _manager->get_name() <<
" chain " << get_name() <<
"\n";
1081 _needs_cleanup =
true;
1082 _threads.reserve(_num_threads);
1083 for (
int i = 0; i < _num_threads; ++i) {
// Each thread name is the manager name, the chain name and the index of the
// thread, joined by underscores.
1085 strm << _manager->get_name() <<
"_" << get_name() <<
"_" << i;
1086 PT(AsyncTaskChainThread) thread =
new AsyncTaskChainThread(strm.str(),
this);
// A thread is recorded in _threads only when start() reports success.
1087 if (thread->start(_thread_priority,
true)) {
1088 _threads.push_back(thread);
// Iterates over the threads of this chain and then over _active, _this_active
// and _next_active, in that order.
// NOTE(review): the return-type line, the result container, the loop bodies
// and the return statement are missing from this excerpt. Presumably the task
// being serviced by each thread (when non-null) and every task in the three
// lists are collected and returned -- TODO confirm.
1100 do_get_active_tasks()
const {
1103 Threads::const_iterator thi;
1104 for (thi = _threads.begin(); thi != _threads.end(); ++thi) {
1106 if (task !=
nullptr) {
1110 TaskHeap::const_iterator ti;
1111 for (ti = _active.begin(); ti != _active.end(); ++ti) {
1115 for (ti = _this_active.begin(); ti != _this_active.end(); ++ti) {
1119 for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) {
// Iterates over every task in _sleeping.
// NOTE(review): the return-type line, the result container, the loop body and
// the return statement are missing from this excerpt; presumably each sleeping
// task is collected and returned -- TODO confirm.
1132 do_get_sleeping_tasks()
const {
1135 TaskHeap::const_iterator ti;
1136 for (ti = _sleeping.begin(); ti != _sleeping.end(); ++ti) {
// Services the tasks of this chain on the calling thread, passing nullptr as
// the thread to service_one_task().
// NOTE(review): the line carrying the function name is missing from this
// excerpt; presumably this is do_poll(), judging by the warning text. The
// early returns, the do keyword that pairs with the trailing while and the
// closing lines are also not shown, so comments cover visible statements only.
1148 void AsyncTaskChain::
1150 thread_consider_yield();
// The actions taken in the next three tests are not visible; the third one
// logs that a recursive poll is being ignored.
1151 if (_num_tasks == 0) {
1157 if (!_threads.empty()) {
1161 if (_num_busy_threads != 0) {
1164 <<
"Ignoring recursive poll() within another task.\n";
1168 nassertv(!_pickup_mode);
1171 while (!_active.empty()) {
1172 if (_state == S_shutdown || _state == S_interrupted) {
// When the frame count of the clock has changed, reset the per-frame
// accounting.
1175 int frame = _manager->_clock->get_frame_count();
1176 if (_current_frame != frame) {
1177 _current_frame = frame;
1178 _time_in_frame = 0.0;
1179 _block_till_next_frame =
false;
// Blocked until the next frame, or the frame budget has been used up.
1181 if (_block_till_next_frame ||
1182 (_frame_budget >= 0.0 && _time_in_frame >= _frame_budget)) {
1185 cleanup_pickup_mode();
1189 _current_sort = _active.front()->get_sort();
// The busy counter brackets the service call; it is the same counter tested
// near the top of this function to detect a recursive poll.
1194 _num_busy_threads++;
1195 service_one_task(
nullptr);
1196 _num_busy_threads--;
1199 if (!_threads.empty()) {
1204 finish_sort_group();
1205 }
while (_pickup_mode);
// Leaves pickup mode: clears the flag, appends the contents of _this_active
// and then of _active to _next_active, and calls finish_sort_group().
// NOTE(review): lines are missing from this excerpt (embedded numbers skip
// 1215, 1217-1218 and 1222-1224). Presumably the body is guarded by a test of
// _pickup_mode and _active is cleared before finish_sort_group() runs -- TODO
// confirm.
1213 void AsyncTaskChain::
1214 cleanup_pickup_mode() {
1216 _pickup_mode =
false;
1219 _next_active.insert(_next_active.end(), _this_active.begin(), _this_active.end());
1220 _this_active.clear();
1221 _next_active.insert(_next_active.end(), _active.begin(), _active.end());
1225 finish_sort_group();
// Writes a one-line summary of this chain to out: the type and name of the
// manager (or a placeholder text), then the chain name and the task count.
// NOTE(review): the else line between the two manager branches is missing from
// this excerpt; presumably the placeholder is written only when _manager is
// null.
1233 void AsyncTaskChain::
1234 do_output(ostream &out)
const {
1235 if (_manager !=
nullptr) {
1236 out << _manager->get_type() <<
" " << _manager->get_name();
1238 out <<
"(no manager)";
1240 out <<
" task chain " << get_name()
1241 <<
"; " << _num_tasks <<
" tasks";
// Writes a multi-line report on this chain to out: a heading, the optional
// settings, a column header, and then one line per task via write_task_line().
//   out          - destination stream.
//   indent_level - base indentation; the settings lines use indent_level + 2.
// NOTE(review): several lines are missing from this excerpt (some sprintf
// arguments, the statements that emit the buffer, the loop bodies that define
// the task local, and the closing lines).
1248 void AsyncTaskChain::
1249 do_write(ostream &out,
int indent_level)
const {
1250 indent(out, indent_level)
1251 <<
"Task chain \"" << get_name() <<
"\"\n";
// Optional settings, each written only when it is in effect.
1252 if (_num_threads > 0) {
1253 indent(out, indent_level + 2)
1254 << _num_threads <<
" threads, priority " << _thread_priority <<
"\n";
1256 if (_frame_budget >= 0.0) {
1257 indent(out, indent_level + 2)
1258 <<
"frame budget " << _frame_budget <<
" s\n";
1260 if (_timeslice_priority) {
1261 indent(out, indent_level + 2)
1262 <<
"timeslice priority\n";
1265 indent(out, indent_level + 2)
// Column header, formatted into a fixed 1024-byte stack buffer.
// NOTE(review): sprintf does not bound the write, and the length assertion
// below runs only after the write has happened; snprintf with buffer_size
// would bound it up front.
1269 static const size_t buffer_size = 1024;
1270 char buffer[buffer_size];
1271 sprintf(buffer,
" %-32s %8s %8s %8s %8s %6s",
1274 "dt(ms)",
"avg",
"max",
1276 nassertv(strlen(buffer) < buffer_size);
1278 indent(out, indent_level)
// The loop bound is the sum of the six column widths from the format string
// plus 7; the loop body is not visible in this excerpt.
1281 indent(out, indent_level);
1282 for (
int i = 0; i < 32+8+8+8+8+6+7; ++i) {
// Build a local list from _active, _this_active and _next_active, then add the
// non-null task found for each thread.
1288 TaskHeap tasks = _active;
1289 tasks.insert(tasks.end(), _this_active.begin(), _this_active.end());
1290 tasks.insert(tasks.end(), _next_active.begin(), _next_active.end());
1292 Threads::const_iterator thi;
1293 for (thi = _threads.begin(); thi != _threads.end(); ++thi) {
1295 if (task !=
nullptr) {
1296 tasks.push_back(task);
1300 double now = _manager->_clock->get_frame_time();
// Sort with AsyncTaskSortPriority, then write the lines walking the sorted
// list from back to front.
1302 if (!tasks.empty()) {
1303 sort(tasks.begin(), tasks.end(), AsyncTaskSortPriority());
1308 TaskHeap::reverse_iterator ti;
1309 for (ti = tasks.rbegin(); ti != tasks.rend(); ++ti) {
1311 write_task_line(out, indent_level, task, now);
// Sleeping tasks are written by popping a local copy of the heap one element
// at a time, so _sleeping itself is left unmodified.
1318 TaskHeap sleeping = _sleeping;
1319 while (!sleeping.empty()) {
1321 pop_heap(sleeping.begin(), sleeping.end(), AsyncTaskSortWakeTime());
1322 sleeping.pop_back();
1324 write_task_line(out, indent_level, task, now);
// Writes one report line for a task to out.
//   out          - destination stream.
//   indent_level - indentation applied before the line.
//   task         - the task to describe.
//   now          - reference time; a sleeping task shows _wake_time - now.
// NOTE(review): the else lines, some sprintf arguments and the statement that
// emits the first buffer are missing from this excerpt.
1332 void AsyncTaskChain::
1333 write_task_line(ostream &out,
int indent_level,
AsyncTask *task,
double now)
const {
// Flag character: an asterisk while the task is being serviced, a dash when it
// was removed while being serviced, and a blank otherwise.
1334 char servicing_flag =
' ';
1335 if (task->_state == AsyncTask::S_servicing) {
1336 servicing_flag =
'*';
1337 }
else if (task->_state == AsyncTask::S_servicing_removed) {
1338 servicing_flag =
'-';
// NOTE(review): the name is truncated with substr() before each sprintf call,
// which limits the text written into the fixed buffer; the length assertions
// run only after the write. snprintf with buffer_size would bound it up front.
1341 static const size_t buffer_size = 1024;
1342 char buffer[buffer_size];
// A sleeping task shows a name of at most 32 characters followed by the time
// remaining until its wake time.
1344 if (task->_state == AsyncTask::S_sleeping) {
1347 string name = task->get_name().substr(0, 32);
1348 sprintf(buffer,
"%c%-32s %8.1f",
1349 servicing_flag, name.c_str(),
1350 task->_wake_time - now);
// The other visible formatting shows a name of at most 41 characters.
1354 string name = task->get_name().substr(0, 41);
1355 sprintf(buffer,
"%c%-41s",
1356 servicing_flag, name.c_str());
1358 nassertv(strlen(buffer) < buffer_size);
1360 indent(out, indent_level)
// Timing columns: numeric when the task has a positive frame count; the other
// format prints three string columns instead. The visible timing argument is
// multiplied by 1000.0.
1363 if (task->_num_frames > 0) {
1364 sprintf(buffer,
" %8.1f %8.1f %8.1f %6d",
1366 task->_max_dt * 1000.0,
1370 sprintf(buffer,
" %8s %8s %8s %6d",
1375 nassertv(strlen(buffer) < buffer_size);
1376 out << buffer <<
"\n";
// Constructs a chain thread.
//   name  - passed to the Thread base class as its first argument.
//   chain - the owning chain; its name is passed to the Thread base class as
//           the second argument.
// NOTE(review): the remaining member initializers and the constructor body are
// missing from this excerpt.
1382 AsyncTaskChain::AsyncTaskChainThread::
1383 AsyncTaskChainThread(
const string &name,
AsyncTaskChain *chain) :
1384 Thread(name, chain->get_name()),
1393 void AsyncTaskChain::AsyncTaskChainThread::
1396 while (_chain->_state != S_shutdown && _chain->_state != S_interrupted) {
1397 thread_consider_yield();
1398 if (!_chain->_active.empty() &&
1399 _chain->_active.front()->get_sort() == _chain->_current_sort) {
1401 int frame = _chain->_manager->_clock->get_frame_count();
1402 if (_chain->_current_frame != frame) {
1403 _chain->_current_frame = frame;
1404 _chain->_time_in_frame = 0.0;
1405 _chain->_block_till_next_frame =
false;
1409 if (_chain->_block_till_next_frame ||
1410 (_chain->_frame_budget >= 0.0 && _chain->_time_in_frame >= _chain->_frame_budget)) {
1411 while ((_chain->_block_till_next_frame ||
1412 (_chain->_frame_budget >= 0.0 && _chain->_time_in_frame >= _chain->_frame_budget)) &&
1413 _chain->_state != S_shutdown && _chain->_state != S_interrupted) {
1414 _chain->cleanup_pickup_mode();
1415 _chain->_manager->_frame_cvar.wait();
1416 frame = _chain->_manager->_clock->get_frame_count();
1417 if (_chain->_current_frame != frame) {
1418 _chain->_current_frame = frame;
1419 _chain->_time_in_frame = 0.0;
1420 _chain->_block_till_next_frame =
false;
1428 _chain->_num_busy_threads++;
1429 _chain->service_one_task(
this);
1430 _chain->_num_busy_threads--;
1431 _chain->_cvar.notify_all();
1437 if (_chain->_num_busy_threads == 0) {
1439 if (!_chain->finish_sort_group()) {
1441 if (_chain->_sleeping.empty()) {
1443 _chain->_cvar.wait();
1445 double wake_time = _chain->do_get_next_wake_time();
1446 double now = _chain->_manager->_clock->get_frame_time();
1447 double timeout = max(wake_time - now, 0.0);
1449 _chain->_cvar.wait(timeout);
1457 _chain->_cvar.wait();