Panda3D
asyncTaskManager.cxx
Go to the documentation of this file.
1 /**
2  * PANDA 3D SOFTWARE
3  * Copyright (c) Carnegie Mellon University. All rights reserved.
4  *
5  * All use of this software is subject to the terms of the revised BSD
6  * license. You should have received a copy of this license along
7  * with this source code in a file named "LICENSE."
8  *
9  * @file asyncTaskManager.cxx
10  * @author drose
11  * @date 2006-08-23
12  */
13 
14 #include "asyncTaskManager.h"
15 #include "event.h"
16 #include "pt_Event.h"
17 #include "mutexHolder.h"
18 #include "indent.h"
19 #include "pStatClient.h"
20 #include "pStatTimer.h"
21 #include "clockObject.h"
22 #include "config_event.h"
23 #include <algorithm>
24 
25 using std::string;
26 
// The application-wide task manager; stays null until make_global_ptr()
// creates it.
27 AsyncTaskManager *AsyncTaskManager::_global_ptr = nullptr;
28 
// Type handle definition for AsyncTaskManager.
29 TypeHandle AsyncTaskManager::_type_handle;
30 
31 /**
32  *
33  */
34 AsyncTaskManager::
35 AsyncTaskManager(const string &name) :
36  Namable(name),
37  _lock("AsyncTaskManager::_lock"),
38  _num_tasks(0),
39  _clock(ClockObject::get_global_clock()),
40  _frame_cvar(_lock)
41 {
42  // Make a default task chain.
43  do_make_task_chain("default");
44 }
45 
46 /**
47  *
48  */
49 AsyncTaskManager::
50 ~AsyncTaskManager() {
51  cleanup();
52 }
53 
54 /**
55  * Stops all threads and messily empties the task list. This is intended to
56  * be called on destruction only.
57  */
59 cleanup() {
60  MutexHolder holder(_lock);
61 
62  if (task_cat.is_debug()) {
63  do_output(task_cat.debug());
64  task_cat.debug(false)
65  << ": cleanup()\n";
66  }
67 
68  // Iterate carefully in case the tasks adjust the chain list within
69  // cleanup().
70  while (!_task_chains.empty()) {
71  PT(AsyncTaskChain) chain = _task_chains[_task_chains.size() - 1];
72  _task_chains.pop_back();
73  chain->do_cleanup();
74  }
75 
76  // There might be one remaining task, the current task. Especially if it
77  // wasn't running on a thread.
78  if (_num_tasks == 1) {
79  nassertv(_tasks_by_name.size() == 1);
80  TasksByName::const_iterator tbni = _tasks_by_name.begin();
81  AsyncTask *task = (*tbni);
82  nassertv(task->_state == AsyncTask::S_servicing ||
83  task->_state == AsyncTask::S_servicing_removed);
84  task->_state = AsyncTask::S_servicing_removed;
85 
86  } else {
87  // If there isn't exactly one remaining task, there should be none.
88 #ifndef NDEBUG
89  nassertd(_num_tasks == 0 && _tasks_by_name.empty()) {
90  task_cat.error()
91  << "_num_tasks = " << _num_tasks << " _tasks_by_name = " << _tasks_by_name.size() << "\n";
92  TasksByName::const_iterator tbni;
93  for (tbni = _tasks_by_name.begin();
94  tbni != _tasks_by_name.end();
95  ++tbni) {
96  task_cat.error()
97  << " " << *(*tbni) << "\n";
98  }
99  }
100 #endif // NDEBUG
101  }
102 }
103 
104 /**
105  * Returns the number of different task chains.
106  */
108 get_num_task_chains() const {
109  MutexHolder holder(_lock);
110  return _task_chains.size();
111 }
112 
113 /**
114  * Returns the nth task chain.
115  */
117 get_task_chain(int n) const {
118  MutexHolder holder(_lock);
119  nassertr(n >= 0 && n < (int)_task_chains.size(), nullptr);
120  return _task_chains[n];
121 }
122 
123 /**
124  * Creates a new AsyncTaskChain of the indicated name and stores it within the
125  * AsyncTaskManager. If a task chain with this name already exists, returns
126  * it instead.
127  */
129 make_task_chain(const string &name) {
130  MutexHolder holder(_lock);
131  return do_make_task_chain(name);
132 }
133 
134 /**
135  * Searches a new AsyncTaskChain of the indicated name and returns it if it
136  * exists, or NULL otherwise.
137  */
139 find_task_chain(const string &name) {
140  MutexHolder holder(_lock);
141  return do_find_task_chain(name);
142 }
143 
144 /**
145  * Removes the AsyncTaskChain of the indicated name. If the chain still has
146  * tasks, this will block until all tasks are finished.
147  *
148  * Returns true if successful, or false if the chain did not exist.
149  */
151 remove_task_chain(const string &name) {
152  MutexHolder holder(_lock);
153 
154  PT(AsyncTaskChain) chain = new AsyncTaskChain(this, name);
155  TaskChains::iterator tci = _task_chains.find(chain);
156  if (tci == _task_chains.end()) {
157  // No chain.
158  return false;
159  }
160 
161  chain = (*tci);
162 
163  while (chain->_num_tasks != 0) {
164  // Still has tasks.
165  task_cat.info()
166  << "Waiting for tasks on chain " << name << " to finish.\n";
167  chain->do_wait_for_tasks();
168  }
169 
170  // Safe to remove.
171  chain->do_cleanup();
172  _task_chains.erase(tci);
173  return true;
174 }
175 
176 /**
177  * Adds the indicated task to the active queue. It is an error if the task is
178  * already added to this or any other active queue.
179  */
181 add(AsyncTask *task) {
182  nassertv(task->is_runnable());
183 
184  {
185  MutexHolder holder(_lock);
186 
187  if (task_cat.is_debug()) {
188  task_cat.debug()
189  << "Adding " << *task << "\n";
190  }
191 
192  if (task->_state == AsyncTask::S_servicing_removed) {
193  if (task->_manager == this) {
194  // Re-adding a self-removed task; this just means clearing the removed
195  // flag.
196  task->_state = AsyncTask::S_servicing;
197  return;
198  }
199  }
200 
201  nassertv(task->_manager == nullptr &&
202  task->_state == AsyncTask::S_inactive);
203  nassertv(!do_has_task(task));
204 
205  _lock.unlock();
206  task->upon_birth(this);
207  _lock.lock();
208  nassertv(task->_manager == nullptr &&
209  task->_state == AsyncTask::S_inactive);
210  nassertv(!do_has_task(task));
211 
212  AsyncTaskChain *chain = do_find_task_chain(task->_chain_name);
213  if (chain == nullptr) {
214  task_cat.warning()
215  << "Creating implicit AsyncTaskChain " << task->_chain_name
216  << " for " << get_type() << " " << get_name() << "\n";
217  chain = do_make_task_chain(task->_chain_name);
218  }
219  chain->do_add(task);
220  }
221 }
222 
223 /**
224  * Returns true if the indicated task has been added to this AsyncTaskManager,
225  * false otherwise.
226  */
228 has_task(AsyncTask *task) const {
229  MutexHolder holder(_lock);
230 
231  if (task->_manager != this) {
232  nassertr(!do_has_task(task), false);
233  return false;
234  }
235 
236  if (task->_state == AsyncTask::S_servicing_removed) {
237  return false;
238  }
239 
240  // The task might not actually be in the active queue, since it might be
241  // being serviced right now. That's OK.
242  return true;
243 }
244 
245 /**
246  * Returns the first task found with the indicated name, or NULL if there is
247  * no task with the indicated name.
248  *
249  * If there are multiple tasks with the same name, returns one of them
250  * arbitrarily.
251  */
253 find_task(const string &name) const {
254  AsyncTask sample_task(name);
255  sample_task.local_object();
256 
257  TasksByName::const_iterator tbni = _tasks_by_name.lower_bound(&sample_task);
258  if (tbni != _tasks_by_name.end() && (*tbni)->get_name() == name) {
259  return (*tbni);
260  }
261 
262  return nullptr;
263 }
264 
265 /**
266  * Returns the list of tasks found with the indicated name.
267  */
269 find_tasks(const string &name) const {
270  AsyncTask sample_task(name);
271  sample_task.local_object();
272 
273  TasksByName::const_iterator tbni = _tasks_by_name.lower_bound(&sample_task);
274  AsyncTaskCollection result;
275  while (tbni != _tasks_by_name.end() && (*tbni)->get_name() == name) {
276  result.add_task(*tbni);
277  ++tbni;
278  }
279 
280  return result;
281 }
282 
283 /**
284  * Returns the list of tasks found whose name matches the indicated glob
285  * pattern, e.g. "my_task_*".
286  */
288 find_tasks_matching(const GlobPattern &pattern) const {
289  string prefix = pattern.get_const_prefix();
290  AsyncTask sample_task(prefix);
291  sample_task.local_object();
292 
293  TasksByName::const_iterator tbni = _tasks_by_name.lower_bound(&sample_task);
294  AsyncTaskCollection result;
295  while (tbni != _tasks_by_name.end() && (*tbni)->get_name().substr(0, prefix.size()) == prefix) {
296  AsyncTask *task = (*tbni);
297  if (pattern.matches(task->get_name())) {
298  result.add_task(task);
299  }
300  ++tbni;
301  }
302 
303  return result;
304 }
305 
306 /**
307  * Removes the indicated task from the active queue. Returns true if the task
308  * is successfully removed, or false if it wasn't there.
309  */
311 remove(AsyncTask *task) {
312  return task->remove();
313 }
314 
315 /**
316  * Removes all of the tasks in the AsyncTaskCollection. Returns the number of
317  * tasks removed.
318  */
320 remove(const AsyncTaskCollection &tasks) {
321  MutexHolder holder(_lock);
322  size_t num_removed = 0;
323 
324  size_t num_tasks = tasks.get_num_tasks();
325  for (size_t i = 0; i < num_tasks; ++i) {
326  PT(AsyncTask) task = tasks.get_task(i);
327 
328  if (task->_manager != this) {
329  // Not a member of this manager, or already removed.
330  nassertr(!do_has_task(task), num_removed);
331  } else {
332  nassertr(task->_chain->_manager == this, num_removed);
333  if (task_cat.is_debug()) {
334  task_cat.debug()
335  << "Removing " << *task << "\n";
336  }
337  if (task->_chain->do_remove(task, true)) {
338  ++num_removed;
339  } else {
340  if (task_cat.is_debug()) {
341  task_cat.debug()
342  << " (unable to remove " << *task << ")\n";
343  }
344  }
345  }
346  }
347 
348  return num_removed;
349 }
350 
351 /**
352  * Blocks until the task list is empty.
353  */
355 wait_for_tasks() {
356  MutexHolder holder(_lock);
357 
358  // Wait for each of our task chains to finish.
359  while (_num_tasks > 0) {
360  // We iterate through with an index, rather than with an iterator, because
361  // it's possible for a task to adjust the task_chain list during its
362  // execution.
363  for (unsigned int i = 0; i < _task_chains.size(); ++i) {
364  AsyncTaskChain *chain = _task_chains[i];
365  chain->do_wait_for_tasks();
366  }
367  }
368 }
369 
370 /**
371  * Stops any threads that are currently running. If any tasks are still
372  * pending and have not yet been picked up by a thread, they will not be
373  * serviced unless poll() or start_threads() is later called.
374  */
376 stop_threads() {
377  MutexHolder holder(_lock);
378 
379  // We iterate through with an index, rather than with an iterator, because
380  // it's possible for a task to adjust the task_chain list during its
381  // execution.
382  for (unsigned int i = 0; i < _task_chains.size(); ++i) {
383  AsyncTaskChain *chain = _task_chains[i];
384  chain->do_stop_threads();
385  }
386 }
387 
388 /**
389  * Starts any requested threads to service the tasks on the queue. This is
390  * normally not necessary, since adding a task will start the threads
391  * automatically.
392  */
394 start_threads() {
395  MutexHolder holder(_lock);
396 
397  // We iterate through with an index, rather than with an iterator, because
398  // it's possible for a task to adjust the task_chain list during its
399  // execution.
400  for (unsigned int i = 0; i < _task_chains.size(); ++i) {
401  AsyncTaskChain *chain = _task_chains[i];
402 
403  chain->do_start_threads();
404  }
405 }
406 
407 /**
408  * Returns the set of tasks that are active or sleeping on the task manager,
409  * at the time of the call.
410  */
412 get_tasks() const {
413  MutexHolder holder(_lock);
414 
415  AsyncTaskCollection result;
416  TaskChains::const_iterator tci;
417  for (tci = _task_chains.begin();
418  tci != _task_chains.end();
419  ++tci) {
420  AsyncTaskChain *chain = (*tci);
421  result.add_tasks_from(chain->do_get_active_tasks());
422  result.add_tasks_from(chain->do_get_sleeping_tasks());
423  }
424 
425  return result;
426 }
427 
428 /**
429  * Returns the set of tasks that are active (and not sleeping) on the task
430  * manager, at the time of the call.
431  */
433 get_active_tasks() const {
434  MutexHolder holder(_lock);
435 
436  AsyncTaskCollection result;
437  TaskChains::const_iterator tci;
438  for (tci = _task_chains.begin();
439  tci != _task_chains.end();
440  ++tci) {
441  AsyncTaskChain *chain = (*tci);
442  result.add_tasks_from(chain->do_get_active_tasks());
443  }
444 
445  return result;
446 }
447 
448 /**
449  * Returns the set of tasks that are sleeping (and not active) on the task
450  * manager, at the time of the call.
451  */
453 get_sleeping_tasks() const {
454  MutexHolder holder(_lock);
455 
456  AsyncTaskCollection result;
457  TaskChains::const_iterator tci;
458  for (tci = _task_chains.begin();
459  tci != _task_chains.end();
460  ++tci) {
461  AsyncTaskChain *chain = (*tci);
462  result.add_tasks_from(chain->do_get_sleeping_tasks());
463  }
464 
465  return result;
466 }
467 
468 /**
469  * Runs through all the tasks in the task list, once, if the task manager is
470  * running in single-threaded mode (no threads available). This method does
471  * nothing in threaded mode, so it may safely be called in either case.
472  */
474 poll() {
475  MutexHolder holder(_lock);
476 
477  // We iterate through with an index, rather than with an iterator, because
478  // it's possible for a task to adjust the task_chain list during its
479  // execution.
480  for (unsigned int i = 0; i < _task_chains.size(); ++i) {
481  AsyncTaskChain *chain = _task_chains[i];
482  chain->do_poll();
483 
484  if (chain->_state == AsyncTaskChain::S_interrupted) {
485  // If a task returned DS_interrupt, we need to interrupt the entire
486  // manager, since an exception state may have been set.
487  break;
488  }
489  }
490 
491  // Just in case the clock was ticked explicitly by one of our polling
492  // chains.
493  _frame_cvar.notify_all();
494 }
495 
496 /**
497  * Returns the scheduled time (on the manager's clock) of the next sleeping
498  * task, on any task chain, to awaken. Returns -1 if there are no sleeping
499  * tasks.
500  */
501 double AsyncTaskManager::
502 get_next_wake_time() const {
503  MutexHolder holder(_lock);
504 
505  bool got_any = false;
506  double next_wake_time = -1.0;
507 
508  TaskChains::const_iterator tci;
509  for (tci = _task_chains.begin();
510  tci != _task_chains.end();
511  ++tci) {
512  AsyncTaskChain *chain = (*tci);
513  double time = chain->do_get_next_wake_time();
514  if (time >= 0.0) {
515  if (!got_any) {
516  got_any = true;
517  next_wake_time = time;
518  } else {
519  next_wake_time = std::min(time, next_wake_time);
520  }
521  }
522  }
523 
524  return next_wake_time;
525 }
526 
527 /**
528  *
529  */
530 void AsyncTaskManager::
531 output(std::ostream &out) const {
532  MutexHolder holder(_lock);
533  do_output(out);
534 }
535 
536 /**
537  *
538  */
539 void AsyncTaskManager::
540 write(std::ostream &out, int indent_level) const {
541  MutexHolder holder(_lock);
542  indent(out, indent_level)
543  << get_type() << " " << get_name() << "\n";
544 
545  TaskChains::const_iterator tci;
546  for (tci = _task_chains.begin();
547  tci != _task_chains.end();
548  ++tci) {
549  AsyncTaskChain *chain = (*tci);
550  if (chain->_num_tasks != 0) {
551  out << "\n";
552  chain->do_write(out, indent_level + 2);
553  }
554  }
555 }
556 
557 /**
558  * Creates a new AsyncTaskChain of the indicated name and stores it within the
559  * AsyncTaskManager. If a task chain with this name already exists, returns
560  * it instead.
561  *
562  * Assumes the lock is held.
563  */
564 AsyncTaskChain *AsyncTaskManager::
565 do_make_task_chain(const string &name) {
566  PT(AsyncTaskChain) chain = new AsyncTaskChain(this, name);
567 
568  TaskChains::const_iterator tci = _task_chains.insert(chain).first;
569  return (*tci);
570 }
571 
572 /**
573  * Searches a new AsyncTaskChain of the indicated name and returns it if it
574  * exists, or NULL otherwise.
575  *
576  * Assumes the lock is held.
577  */
578 AsyncTaskChain *AsyncTaskManager::
579 do_find_task_chain(const string &name) {
580  PT(AsyncTaskChain) chain = new AsyncTaskChain(this, name);
581 
582  TaskChains::const_iterator tci = _task_chains.find(chain);
583  if (tci != _task_chains.end()) {
584  return (*tci);
585  }
586 
587  return nullptr;
588 }
589 
590 /**
591  * Removes the task from the _tasks_by_name index, if it has a nonempty name.
592  */
593 void AsyncTaskManager::
594 remove_task_by_name(AsyncTask *task) {
595  if (!task->get_name().empty()) {
596  // We have to scan linearly through all of the tasks with the same name.
597  TasksByName::iterator tbni = _tasks_by_name.lower_bound(task);
598  while (tbni != _tasks_by_name.end()) {
599  if ((*tbni) == task) {
600  _tasks_by_name.erase(tbni);
601  return;
602  }
603  if ((*tbni)->get_name() != task->get_name()) {
604  // Too far.
605  break;
606  }
607 
608  ++tbni;
609  }
610 
611  // For some reason, the task wasn't on the index.
612  nassertv(false);
613  }
614 }
615 
616 /**
617  * Returns true if the task is on one of the task lists, false if it is not
618  * (false may mean that the task is currently being serviced). Assumes the
619  * lock is currently held.
620  */
621 bool AsyncTaskManager::
622 do_has_task(AsyncTask *task) const {
623  TaskChains::const_iterator tci;
624  for (tci = _task_chains.begin();
625  tci != _task_chains.end();
626  ++tci) {
627  AsyncTaskChain *chain = (*tci);
628  if (chain->do_has_task(task)) {
629  return true;
630  }
631  }
632 
633  return false;
634 }
635 
636 /**
637  *
638  */
639 void AsyncTaskManager::
640 do_output(std::ostream &out) const {
641  out << get_type() << " " << get_name()
642  << "; " << _num_tasks << " tasks";
643 }
644 
645 /**
646  * Called once per application to create the global task manager object.
647  */
648 void AsyncTaskManager::
649 make_global_ptr() {
650  nassertv(_global_ptr == nullptr);
651 
653  _global_ptr = new AsyncTaskManager("TaskManager");
654  _global_ptr->ref();
655 }
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
The AsyncTaskChain is a subset of the AsyncTaskManager.
A list of tasks, for instance as returned by some of the AsyncTaskManager query functions.
get_num_tasks
Returns the number of AsyncTasks in the collection.
void add_task(AsyncTask *task)
Adds a new AsyncTask to the collection.
get_task
Returns the nth AsyncTask in the collection.
void add_tasks_from(const AsyncTaskCollection &other)
Adds all the AsyncTasks indicated in the other collection to this collection.
A class to manage a loose queue of isolated tasks, which can be performed either synchronously (in th...
get_num_task_chains
Returns the number of different task chains.
void wait_for_tasks()
Blocks until the task list is empty.
bool remove(AsyncTask *task)
Removes the indicated task from the active queue.
void add(AsyncTask *task)
Adds the indicated task to the active queue.
AsyncTaskCollection find_tasks_matching(const GlobPattern &pattern) const
Returns the list of tasks found whose name matches the indicated glob pattern, e.g. "my_task_*".
void stop_threads()
Stops any threads that are currently running.
void poll()
Runs through all the tasks in the task list, once, if the task manager is running in single-threaded mode (no threads available).
AsyncTaskChain * find_task_chain(const std::string &name)
Searches for an AsyncTaskChain of the indicated name and returns it if it exists, or NULL otherwise.
void cleanup()
Stops all threads and messily empties the task list.
get_sleeping_tasks
Returns the set of tasks that are sleeping (and not active) on the task manager, at the time of the call.
get_next_wake_time
Returns the scheduled time (on the manager's clock) of the next sleeping task, on any task chain, to awaken.
AsyncTaskChain * make_task_chain(const std::string &name)
Creates a new AsyncTaskChain of the indicated name and stores it within the AsyncTaskManager.
get_tasks
Returns the set of tasks that are active or sleeping on the task manager, at the time of the call.
AsyncTask * find_task(const std::string &name) const
Returns the first task found with the indicated name, or NULL if there is no task with the indicated name.
get_active_tasks
Returns the set of tasks that are active (and not sleeping) on the task manager, at the time of the call.
get_task_chain
Returns the nth task chain.
bool has_task(AsyncTask *task) const
Returns true if the indicated task has been added to this AsyncTaskManager, false otherwise.
bool remove_task_chain(const std::string &name)
Removes the AsyncTaskChain of the indicated name.
void start_threads()
Starts any requested threads to service the tasks on the queue.
AsyncTaskCollection find_tasks(const std::string &name) const
Returns the list of tasks found with the indicated name.
This class represents a concrete task performed by an AsyncTaskManager.
Definition: asyncTask.h:32
bool remove()
Removes the task from its active manager, if any, and makes the state S_inactive (or possibly S_servicing_removed).
Definition: asyncTask.cxx:73
A ClockObject keeps track of elapsed real time and discrete time.
Definition: clockObject.h:58
void notify_all()
Informs all of the other threads who are currently blocked on wait() that the relevant condition has ...
This class can be used to test for string matches against standard Unix- shell filename globbing conv...
Definition: globPattern.h:32
std::string get_const_prefix() const
Returns the initial part of the pattern before the first glob character.
Definition: globPattern.cxx:53
bool matches(const std::string &candidate) const
Returns true if the candidate string matches the pattern, false otherwise.
Definition: globPattern.I:122
void unlock()
Alias for release() to match C++11 semantics.
Definition: mutexDirect.I:39
void lock()
Alias for acquire() to match C++11 semantics.
Definition: mutexDirect.I:19
A lightweight C++ object whose constructor calls acquire() and whose destructor calls release() on a ...
Definition: mutexHolder.h:25
A base class for all things which can have a name.
Definition: namable.h:26
void ref() const
Explicitly increments the reference count.
void local_object()
This function should be called, once, immediately after creating a new instance of some ReferenceCoun...
TypeHandle is the identifier used to differentiate C++ class types.
Definition: typeHandle.h:81
iterator_0 begin()
Returns the iterator that marks the first element in the ordered vector.
size_type_0 size() const
Returns the number of elements in the ordered vector.
bool empty() const
Returns true if the ordered vector is empty, false otherwise.
iterator_0 end()
Returns the iterator that marks the end of the ordered vector.
void pop_back()
Removes the last element at the end of the vector.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
void init_memory_hook()
Any code that might need to use PANDA_MALLOC or PANDA_FREE, or any methods of the global memory_hook ...
Definition: dtoolbase.cxx:38
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
std::ostream & indent(std::ostream &out, int indent_level)
A handy function for doing text formatting.
Definition: indent.cxx:20
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.