Panda3D
asyncTaskManager.cxx
Go to the documentation of this file.
1 /**
2  * PANDA 3D SOFTWARE
3  * Copyright (c) Carnegie Mellon University. All rights reserved.
4  *
5  * All use of this software is subject to the terms of the revised BSD
6  * license. You should have received a copy of this license along
7  * with this source code in a file named "LICENSE."
8  *
9  * @file asyncTaskManager.cxx
10  * @author drose
11  * @date 2006-08-23
12  */
13 
14 #include "asyncTaskManager.h"
15 #include "event.h"
16 #include "pt_Event.h"
17 #include "mutexHolder.h"
18 #include "indent.h"
19 #include "pStatClient.h"
20 #include "pStatTimer.h"
21 #include "clockObject.h"
22 #include "config_event.h"
23 #include <algorithm>
24 
25 using std::string;
26 
27 AsyncTaskManager *AsyncTaskManager::_global_ptr = nullptr;
28 
29 TypeHandle AsyncTaskManager::_type_handle;
30 
31 /**
32  *
33  */
34 AsyncTaskManager::
35 AsyncTaskManager(const string &name) :
36  Namable(name),
37  _lock("AsyncTaskManager::_lock"),
38  _num_tasks(0),
39  _clock(ClockObject::get_global_clock()),
40  _frame_cvar(_lock)
41 {
42  // Make a default task chain.
43  do_make_task_chain("default");
44 }
45 
46 /**
47  *
48  */
49 AsyncTaskManager::
50 ~AsyncTaskManager() {
51  cleanup();
52 }
53 
54 /**
55  * Stops all threads and messily empties the task list. This is intended to
56  * be called on destruction only.
57  */
60  MutexHolder holder(_lock);
61 
62  if (task_cat.is_debug()) {
63  do_output(task_cat.debug());
64  task_cat.debug(false)
65  << ": cleanup()\n";
66  }
67 
68  // Iterate carefully in case the tasks adjust the chain list within
69  // cleanup().
70  while (!_task_chains.empty()) {
71  PT(AsyncTaskChain) chain = _task_chains[_task_chains.size() - 1];
72  _task_chains.pop_back();
73  chain->do_cleanup();
74  }
75 
76  // There might be one remaining task, the current task. Especially if it
77  // wasn't running on a thread.
78  if (_num_tasks == 1) {
79  nassertv(_tasks_by_name.size() == 1);
80  TasksByName::const_iterator tbni = _tasks_by_name.begin();
81  AsyncTask *task = (*tbni);
82  nassertv(task->_state == AsyncTask::S_servicing ||
83  task->_state == AsyncTask::S_servicing_removed);
84  task->_state = AsyncTask::S_servicing_removed;
85 
86  } else {
87  // If there isn't exactly one remaining task, there should be none.
88 #ifndef NDEBUG
89  nassertd(_num_tasks == 0 && _tasks_by_name.empty()) {
90  task_cat.error()
91  << "_num_tasks = " << _num_tasks << " _tasks_by_name = " << _tasks_by_name.size() << "\n";
92  TasksByName::const_iterator tbni;
93  for (tbni = _tasks_by_name.begin();
94  tbni != _tasks_by_name.end();
95  ++tbni) {
96  task_cat.error()
97  << " " << *(*tbni) << "\n";
98  }
99  }
100 #endif // NDEBUG
101  }
102 }
103 
104 /**
105  * Returns the number of different task chains.
106  */
107 int AsyncTaskManager::
108 get_num_task_chains() const {
109  MutexHolder holder(_lock);
110  return _task_chains.size();
111 }
112 
113 /**
114  * Returns the nth task chain.
115  */
117 get_task_chain(int n) const {
118  MutexHolder holder(_lock);
119  nassertr(n >= 0 && n < (int)_task_chains.size(), nullptr);
120  return _task_chains[n];
121 }
122 
123 /**
124  * Creates a new AsyncTaskChain of the indicated name and stores it within the
125  * AsyncTaskManager. If a task chain with this name already exists, returns
126  * it instead.
127  */
129 make_task_chain(const string &name) {
130  MutexHolder holder(_lock);
131  return do_make_task_chain(name);
132 }
133 
134 /**
135  * Searches a new AsyncTaskChain of the indicated name and returns it if it
136  * exists, or NULL otherwise.
137  */
139 find_task_chain(const string &name) {
140  MutexHolder holder(_lock);
141  return do_find_task_chain(name);
142 }
143 
144 /**
145  * Removes the AsyncTaskChain of the indicated name. If the chain still has
146  * tasks, this will block until all tasks are finished.
147  *
148  * Returns true if successful, or false if the chain did not exist.
149  */
151 remove_task_chain(const string &name) {
152  MutexHolder holder(_lock);
153 
154  PT(AsyncTaskChain) chain = new AsyncTaskChain(this, name);
155  TaskChains::iterator tci = _task_chains.find(chain);
156  if (tci == _task_chains.end()) {
157  // No chain.
158  return false;
159  }
160 
161  chain = (*tci);
162 
163  while (chain->_num_tasks != 0) {
164  // Still has tasks.
165  task_cat.info()
166  << "Waiting for tasks on chain " << name << " to finish.\n";
167  chain->do_wait_for_tasks();
168  }
169 
170  // Safe to remove.
171  chain->do_cleanup();
172  _task_chains.erase(tci);
173  return true;
174 }
175 
176 /**
177  * Adds the indicated task to the active queue. It is an error if the task is
178  * already added to this or any other active queue.
179  */
181 add(AsyncTask *task) {
182  nassertv(task->is_runnable());
183 
184  {
185  MutexHolder holder(_lock);
186 
187  if (task_cat.is_debug()) {
188  task_cat.debug()
189  << "Adding " << *task << "\n";
190  }
191 
192  if (task->_state == AsyncTask::S_servicing_removed) {
193  if (task->_manager == this) {
194  // Re-adding a self-removed task; this just means clearing the removed
195  // flag.
196  task->_state = AsyncTask::S_servicing;
197  return;
198  }
199  }
200 
201  nassertv(task->_manager == nullptr &&
202  task->_state == AsyncTask::S_inactive);
203  nassertv(!do_has_task(task));
204 
205  _lock.unlock();
206  task->upon_birth(this);
207  _lock.lock();
208  nassertv(task->_manager == nullptr &&
209  task->_state == AsyncTask::S_inactive);
210  nassertv(!do_has_task(task));
211 
212  AsyncTaskChain *chain = do_find_task_chain(task->_chain_name);
213  if (chain == nullptr) {
214  task_cat.warning()
215  << "Creating implicit AsyncTaskChain " << task->_chain_name
216  << " for " << get_type() << " " << get_name() << "\n";
217  chain = do_make_task_chain(task->_chain_name);
218  }
219  chain->do_add(task);
220  }
221 }
222 
223 /**
224  * Returns true if the indicated task has been added to this AsyncTaskManager,
225  * false otherwise.
226  */
228 has_task(AsyncTask *task) const {
229  MutexHolder holder(_lock);
230 
231  if (task->_manager != this) {
232  nassertr(!do_has_task(task), false);
233  return false;
234  }
235 
236  if (task->_state == AsyncTask::S_servicing_removed) {
237  return false;
238  }
239 
240  // The task might not actually be in the active queue, since it might be
241  // being serviced right now. That's OK.
242  return true;
243 }
244 
245 /**
246  * Returns the first task found with the indicated name, or NULL if there is
247  * no task with the indicated name.
248  *
249  * If there are multiple tasks with the same name, returns one of them
250  * arbitrarily.
251  */
253 find_task(const string &name) const {
254  AsyncTask sample_task(name);
255  sample_task.local_object();
256 
257  TasksByName::const_iterator tbni = _tasks_by_name.lower_bound(&sample_task);
258  if (tbni != _tasks_by_name.end() && (*tbni)->get_name() == name) {
259  return (*tbni);
260  }
261 
262  return nullptr;
263 }
264 
265 /**
266  * Returns the list of tasks found with the indicated name.
267  */
269 find_tasks(const string &name) const {
270  AsyncTask sample_task(name);
271  sample_task.local_object();
272 
273  TasksByName::const_iterator tbni = _tasks_by_name.lower_bound(&sample_task);
274  AsyncTaskCollection result;
275  while (tbni != _tasks_by_name.end() && (*tbni)->get_name() == name) {
276  result.add_task(*tbni);
277  ++tbni;
278  }
279 
280  return result;
281 }
282 
283 /**
284  * Returns the list of tasks found whose name matches the indicated glob
285  * pattern, e.g. "my_task_*".
286  */
288 find_tasks_matching(const GlobPattern &pattern) const {
289  string prefix = pattern.get_const_prefix();
290  AsyncTask sample_task(prefix);
291  sample_task.local_object();
292 
293  TasksByName::const_iterator tbni = _tasks_by_name.lower_bound(&sample_task);
294  AsyncTaskCollection result;
295  while (tbni != _tasks_by_name.end() && (*tbni)->get_name().substr(0, prefix.size()) == prefix) {
296  AsyncTask *task = (*tbni);
297  if (pattern.matches(task->get_name())) {
298  result.add_task(task);
299  }
300  ++tbni;
301  }
302 
303  return result;
304 }
305 
306 /**
307  * Removes the indicated task from the active queue. Returns true if the task
308  * is successfully removed, or false if it wasn't there.
309  */
312  return task->remove();
313 }
314 
315 /**
316  * Removes all of the tasks in the AsyncTaskCollection. Returns the number of
317  * tasks removed.
318  */
319 size_t AsyncTaskManager::
321  MutexHolder holder(_lock);
322  size_t num_removed = 0;
323 
324  size_t num_tasks = tasks.get_num_tasks();
325  for (size_t i = 0; i < num_tasks; ++i) {
326  PT(AsyncTask) task = tasks.get_task(i);
327 
328  if (task->_manager != this) {
329  // Not a member of this manager, or already removed.
330  nassertr(!do_has_task(task), num_removed);
331  } else {
332  nassertr(task->_chain->_manager == this, num_removed);
333  if (task_cat.is_debug()) {
334  task_cat.debug()
335  << "Removing " << *task << "\n";
336  }
337  if (task->_chain->do_remove(task, true)) {
338  ++num_removed;
339  } else {
340  if (task_cat.is_debug()) {
341  task_cat.debug()
342  << " (unable to remove " << *task << ")\n";
343  }
344  }
345  }
346  }
347 
348  return num_removed;
349 }
350 
351 /**
352  * Blocks until the task list is empty.
353  */
356  MutexHolder holder(_lock);
357 
358  // Wait for each of our task chains to finish.
359  while (_num_tasks > 0) {
360  // We iterate through with an index, rather than with an iterator, because
361  // it's possible for a task to adjust the task_chain list during its
362  // execution.
363  for (unsigned int i = 0; i < _task_chains.size(); ++i) {
364  AsyncTaskChain *chain = _task_chains[i];
365  chain->do_wait_for_tasks();
366  }
367  }
368 }
369 
370 /**
371  * Stops any threads that are currently running. If any tasks are still
372  * pending and have not yet been picked up by a thread, they will not be
373  * serviced unless poll() or start_threads() is later called.
374  */
377  MutexHolder holder(_lock);
378 
379  // We iterate through with an index, rather than with an iterator, because
380  // it's possible for a task to adjust the task_chain list during its
381  // execution.
382  for (unsigned int i = 0; i < _task_chains.size(); ++i) {
383  AsyncTaskChain *chain = _task_chains[i];
384  chain->do_stop_threads();
385  }
386 }
387 
388 /**
389  * Starts any requested threads to service the tasks on the queue. This is
390  * normally not necessary, since adding a task will start the threads
391  * automatically.
392  */
395  MutexHolder holder(_lock);
396 
397  // We iterate through with an index, rather than with an iterator, because
398  // it's possible for a task to adjust the task_chain list during its
399  // execution.
400  for (unsigned int i = 0; i < _task_chains.size(); ++i) {
401  AsyncTaskChain *chain = _task_chains[i];
402 
403  chain->do_start_threads();
404  }
405 }
406 
407 /**
408  * Returns the set of tasks that are active or sleeping on the task manager,
409  * at the time of the call.
410  */
411 AsyncTaskCollection AsyncTaskManager::
412 get_tasks() const {
413  MutexHolder holder(_lock);
414 
415  AsyncTaskCollection result;
416  TaskChains::const_iterator tci;
417  for (tci = _task_chains.begin();
418  tci != _task_chains.end();
419  ++tci) {
420  AsyncTaskChain *chain = (*tci);
421  result.add_tasks_from(chain->do_get_active_tasks());
422  result.add_tasks_from(chain->do_get_sleeping_tasks());
423  }
424 
425  return result;
426 }
427 
428 /**
429  * Returns the set of tasks that are active (and not sleeping) on the task
430  * manager, at the time of the call.
431  */
432 AsyncTaskCollection AsyncTaskManager::
433 get_active_tasks() const {
434  MutexHolder holder(_lock);
435 
436  AsyncTaskCollection result;
437  TaskChains::const_iterator tci;
438  for (tci = _task_chains.begin();
439  tci != _task_chains.end();
440  ++tci) {
441  AsyncTaskChain *chain = (*tci);
442  result.add_tasks_from(chain->do_get_active_tasks());
443  }
444 
445  return result;
446 }
447 
448 /**
449  * Returns the set of tasks that are sleeping (and not active) on the task
450  * manager, at the time of the call.
451  */
452 AsyncTaskCollection AsyncTaskManager::
453 get_sleeping_tasks() const {
454  MutexHolder holder(_lock);
455 
456  AsyncTaskCollection result;
457  TaskChains::const_iterator tci;
458  for (tci = _task_chains.begin();
459  tci != _task_chains.end();
460  ++tci) {
461  AsyncTaskChain *chain = (*tci);
462  result.add_tasks_from(chain->do_get_sleeping_tasks());
463  }
464 
465  return result;
466 }
467 
468 /**
469  * Runs through all the tasks in the task list, once, if the task manager is
470  * running in single-threaded mode (no threads available). This method does
471  * nothing in threaded mode, so it may safely be called in either case.
472  */
474 poll() {
475  MutexHolder holder(_lock);
476 
477  // We iterate through with an index, rather than with an iterator, because
478  // it's possible for a task to adjust the task_chain list during its
479  // execution.
480  for (unsigned int i = 0; i < _task_chains.size(); ++i) {
481  AsyncTaskChain *chain = _task_chains[i];
482  chain->do_poll();
483  }
484 
485  // Just in case the clock was ticked explicitly by one of our polling
486  // chains.
487  _frame_cvar.notify_all();
488 }
489 
490 /**
491  * Returns the scheduled time (on the manager's clock) of the next sleeping
492  * task, on any task chain, to awaken. Returns -1 if there are no sleeping
493  * tasks.
494  */
495 double AsyncTaskManager::
496 get_next_wake_time() const {
497  MutexHolder holder(_lock);
498 
499  bool got_any = false;
500  double next_wake_time = -1.0;
501 
502  TaskChains::const_iterator tci;
503  for (tci = _task_chains.begin();
504  tci != _task_chains.end();
505  ++tci) {
506  AsyncTaskChain *chain = (*tci);
507  double time = chain->do_get_next_wake_time();
508  if (time >= 0.0) {
509  if (!got_any) {
510  got_any = true;
511  next_wake_time = time;
512  } else {
513  next_wake_time = std::min(time, next_wake_time);
514  }
515  }
516  }
517 
518  return next_wake_time;
519 }
520 
521 /**
522  *
523  */
524 void AsyncTaskManager::
525 output(std::ostream &out) const {
526  MutexHolder holder(_lock);
527  do_output(out);
528 }
529 
530 /**
531  *
532  */
533 void AsyncTaskManager::
534 write(std::ostream &out, int indent_level) const {
535  MutexHolder holder(_lock);
536  indent(out, indent_level)
537  << get_type() << " " << get_name() << "\n";
538 
539  TaskChains::const_iterator tci;
540  for (tci = _task_chains.begin();
541  tci != _task_chains.end();
542  ++tci) {
543  AsyncTaskChain *chain = (*tci);
544  if (chain->_num_tasks != 0) {
545  out << "\n";
546  chain->do_write(out, indent_level + 2);
547  }
548  }
549 }
550 
551 /**
552  * Creates a new AsyncTaskChain of the indicated name and stores it within the
553  * AsyncTaskManager. If a task chain with this name already exists, returns
554  * it instead.
555  *
556  * Assumes the lock is held.
557  */
558 AsyncTaskChain *AsyncTaskManager::
559 do_make_task_chain(const string &name) {
560  PT(AsyncTaskChain) chain = new AsyncTaskChain(this, name);
561 
562  TaskChains::const_iterator tci = _task_chains.insert(chain).first;
563  return (*tci);
564 }
565 
566 /**
567  * Searches a new AsyncTaskChain of the indicated name and returns it if it
568  * exists, or NULL otherwise.
569  *
570  * Assumes the lock is held.
571  */
572 AsyncTaskChain *AsyncTaskManager::
573 do_find_task_chain(const string &name) {
574  PT(AsyncTaskChain) chain = new AsyncTaskChain(this, name);
575 
576  TaskChains::const_iterator tci = _task_chains.find(chain);
577  if (tci != _task_chains.end()) {
578  return (*tci);
579  }
580 
581  return nullptr;
582 }
583 
584 /**
585  * Removes the task from the _tasks_by_name index, if it has a nonempty name.
586  */
587 void AsyncTaskManager::
588 remove_task_by_name(AsyncTask *task) {
589  if (!task->get_name().empty()) {
590  // We have to scan linearly through all of the tasks with the same name.
591  TasksByName::iterator tbni = _tasks_by_name.lower_bound(task);
592  while (tbni != _tasks_by_name.end()) {
593  if ((*tbni) == task) {
594  _tasks_by_name.erase(tbni);
595  return;
596  }
597  if ((*tbni)->get_name() != task->get_name()) {
598  // Too far.
599  break;
600  }
601 
602  ++tbni;
603  }
604 
605  // For some reason, the task wasn't on the index.
606  nassertv(false);
607  }
608 }
609 
610 /**
611  * Returns true if the task is on one of the task lists, false if it is not
612  * (false may mean that the task is currently being serviced). Assumes the
613  * lock is currently held.
614  */
615 bool AsyncTaskManager::
616 do_has_task(AsyncTask *task) const {
617  TaskChains::const_iterator tci;
618  for (tci = _task_chains.begin();
619  tci != _task_chains.end();
620  ++tci) {
621  AsyncTaskChain *chain = (*tci);
622  if (chain->do_has_task(task)) {
623  return true;
624  }
625  }
626 
627  return false;
628 }
629 
630 /**
631  *
632  */
633 void AsyncTaskManager::
634 do_output(std::ostream &out) const {
635  out << get_type() << " " << get_name()
636  << "; " << _num_tasks << " tasks";
637 }
638 
639 /**
640  * Called once per application to create the global task manager object.
641  */
642 void AsyncTaskManager::
643 make_global_ptr() {
644  nassertv(_global_ptr == nullptr);
645 
647  _global_ptr = new AsyncTaskManager("TaskManager");
648  _global_ptr->ref();
649 }
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
void pop_back()
Removes the last element at the end of the vector.
bool remove(AsyncTask *task)
Removes the indicated task from the active queue.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
bool remove_task_chain(const std::string &name)
Removes the AsyncTaskChain of the indicated name.
bool remove()
Removes the task from its active manager, if any, and makes the state S_inactive (or possible S_servi...
Definition: asyncTask.cxx:73
get_task
Returns the nth AsyncTask in the collection.
A class to manage a loose queue of isolated tasks, which can be performed either synchronously (in th...
size_type_0 size() const
Returns the number of elements in the ordered vector.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
void start_threads()
Starts any requested threads to service the tasks on the queue.
void cleanup()
Stops all threads and messily empties the task list.
iterator_0 begin()
Returns the iterator that marks the first element in the ordered vector.
void add_task(AsyncTask *task)
Adds a new AsyncTask to the collection.
A list of tasks, for instance as returned by some of the AsyncTaskManager query functions.
AsyncTaskCollection find_tasks_matching(const GlobPattern &pattern) const
Returns the list of tasks found whose name matches the indicated glob pattern, e.g.
void init_memory_hook()
Any code that might need to use PANDA_MALLOC or PANDA_FREE, or any methods of the global memory_hook ...
Definition: dtoolbase.cxx:38
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
A lightweight C++ object whose constructor calls acquire() and whose destructor calls release() on a ...
Definition: mutexHolder.h:25
iterator_0 end()
Returns the iterator that marks the end of the ordered vector.
get_task_chain
Returns the nth task chain.
void unlock()
Alias for release() to match C++11 semantics.
Definition: mutexDirect.I:39
bool empty() const
Returns true if the ordered vector is empty, false otherwise.
bool matches(const std::string &candidate) const
Returns true if the candidate string matches the pattern, false otherwise.
Definition: globPattern.I:122
void notify_all()
Informs all of the other threads who are currently blocked on wait() that the relevant condition has ...
AsyncTaskChain * find_task_chain(const std::string &name)
Searches a new AsyncTaskChain of the indicated name and returns it if it exists, or NULL otherwise.
bool has_task(AsyncTask *task) const
Returns true if the indicated task has been added to this AsyncTaskManager, false otherwise.
A base class for all things which can have a name.
Definition: namable.h:26
void wait_for_tasks()
Blocks until the task list is empty.
void stop_threads()
Stops any threads that are currently running.
void add(AsyncTask *task)
Adds the indicated task to the active queue.
std::ostream & indent(std::ostream &out, int indent_level)
A handy function for doing text formatting.
Definition: indent.cxx:20
AsyncTaskChain * make_task_chain(const std::string &name)
Creates a new AsyncTaskChain of the indicated name and stores it within the AsyncTaskManager.
A ClockObject keeps track of elapsed real time and discrete time.
Definition: clockObject.h:58
void add_tasks_from(const AsyncTaskCollection &other)
Adds all the AsyncTasks indicated in the other collection to this task.
std::string get_const_prefix() const
Returns the initial part of the pattern before the first glob character.
Definition: globPattern.cxx:53
AsyncTaskCollection find_tasks(const std::string &name) const
Returns the list of tasks found with the indicated name.
The AsyncTaskChain is a subset of the AsyncTaskManager.
void ref() const
Explicitly increments the reference count.
void local_object()
This function should be called, once, immediately after creating a new instance of some ReferenceCoun...
This class represents a concrete task performed by an AsyncManager.
Definition: asyncTask.h:32
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
TypeHandle is the identifier used to differentiate C++ class types.
Definition: typeHandle.h:81
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
void lock()
Alias for acquire() to match C++11 semantics.
Definition: mutexDirect.I:19
get_num_tasks
Returns the number of AsyncTasks in the collection.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
AsyncTask * find_task(const std::string &name) const
Returns the first task found with the indicated name, or NULL if there is no task with the indicated ...
This class can be used to test for string matches against standard Unix- shell filename globbing conv...
Definition: globPattern.h:32
void poll()
Runs through all the tasks in the task list, once, if the task manager is running in single-threaded ...