Panda3D
Loading...
Searching...
No Matches
asyncTaskManager.cxx
Go to the documentation of this file.
/**
 * PANDA 3D SOFTWARE
 * Copyright (c) Carnegie Mellon University. All rights reserved.
 *
 * All use of this software is subject to the terms of the revised BSD
 * license. You should have received a copy of this license along
 * with this source code in a file named "LICENSE."
 *
 * @file asyncTaskManager.cxx
 * @author drose
 * @date 2006-08-23
 */
13
14#include "asyncTaskManager.h"
15#include "event.h"
16#include "pt_Event.h"
17#include "mutexHolder.h"
18#include "indent.h"
19#include "pStatClient.h"
20#include "pStatTimer.h"
21#include "clockObject.h"
22#include "config_event.h"
23#include <algorithm>
24
25using std::string;
26
27AsyncTaskManager *AsyncTaskManager::_global_ptr = nullptr;
28
29TypeHandle AsyncTaskManager::_type_handle;
30
31/**
32 *
33 */
34AsyncTaskManager::
35AsyncTaskManager(const string &name) :
36 Namable(name),
37 _lock("AsyncTaskManager::_lock"),
38 _num_tasks(0),
39 _clock(ClockObject::get_global_clock()),
40 _frame_cvar(_lock)
41{
42 // Make a default task chain.
43 do_make_task_chain("default");
44}
45
46/**
47 *
48 */
49AsyncTaskManager::
50~AsyncTaskManager() {
51 cleanup();
52}
53
54/**
55 * Stops all threads and messily empties the task list. This is intended to
56 * be called on destruction only.
57 */
59cleanup() {
60 MutexHolder holder(_lock);
61
62 if (task_cat.is_debug()) {
63 do_output(task_cat.debug());
64 task_cat.debug(false)
65 << ": cleanup()\n";
66 }
67
68 // Iterate carefully in case the tasks adjust the chain list within
69 // cleanup().
70 while (!_task_chains.empty()) {
71 PT(AsyncTaskChain) chain = _task_chains[_task_chains.size() - 1];
72 _task_chains.pop_back();
73 chain->do_cleanup();
74 }
75
76 // There might be one remaining task, the current task. Especially if it
77 // wasn't running on a thread.
78 if (_num_tasks == 1) {
79 nassertv(_tasks_by_name.size() == 1);
80 TasksByName::const_iterator tbni = _tasks_by_name.begin();
81 AsyncTask *task = (*tbni);
82 nassertv(task->_state == AsyncTask::S_servicing ||
83 task->_state == AsyncTask::S_servicing_removed);
84 task->_state = AsyncTask::S_servicing_removed;
85
86 } else {
87 // If there isn't exactly one remaining task, there should be none.
88#ifndef NDEBUG
89 nassertd(_num_tasks == 0 && _tasks_by_name.empty()) {
90 task_cat.error()
91 << "_num_tasks = " << _num_tasks << " _tasks_by_name = " << _tasks_by_name.size() << "\n";
92 TasksByName::const_iterator tbni;
93 for (tbni = _tasks_by_name.begin();
94 tbni != _tasks_by_name.end();
95 ++tbni) {
96 task_cat.error()
97 << " " << *(*tbni) << "\n";
98 }
99 }
100#endif // NDEBUG
101 }
102}
103
104/**
105 * Returns the number of different task chains.
106 */
108get_num_task_chains() const {
109 MutexHolder holder(_lock);
110 return _task_chains.size();
111}
112
113/**
114 * Returns the nth task chain.
115 */
117get_task_chain(int n) const {
118 MutexHolder holder(_lock);
119 nassertr(n >= 0 && n < (int)_task_chains.size(), nullptr);
120 return _task_chains[n];
121}
122
123/**
124 * Creates a new AsyncTaskChain of the indicated name and stores it within the
125 * AsyncTaskManager. If a task chain with this name already exists, returns
126 * it instead.
127 */
129make_task_chain(const string &name) {
130 MutexHolder holder(_lock);
131 return do_make_task_chain(name);
132}
133
134/**
135 * Searches a new AsyncTaskChain of the indicated name and returns it if it
136 * exists, or NULL otherwise.
137 */
139find_task_chain(const string &name) {
140 MutexHolder holder(_lock);
141 return do_find_task_chain(name);
142}
143
144/**
145 * Removes the AsyncTaskChain of the indicated name. If the chain still has
146 * tasks, this will block until all tasks are finished.
147 *
148 * Returns true if successful, or false if the chain did not exist.
149 */
151remove_task_chain(const string &name) {
152 MutexHolder holder(_lock);
153
154 PT(AsyncTaskChain) chain = new AsyncTaskChain(this, name);
155 TaskChains::iterator tci = _task_chains.find(chain);
156 if (tci == _task_chains.end()) {
157 // No chain.
158 return false;
159 }
160
161 chain = (*tci);
162
163 while (chain->_num_tasks != 0) {
164 // Still has tasks.
165 task_cat.info()
166 << "Waiting for tasks on chain " << name << " to finish.\n";
167 chain->do_wait_for_tasks();
168 }
169
170 // Safe to remove.
171 chain->do_cleanup();
172 _task_chains.erase(tci);
173 return true;
174}
175
176/**
177 * Adds the indicated task to the active queue. It is an error if the task is
178 * already added to this or any other active queue.
179 */
181add(AsyncTask *task) {
182 nassertv(task->is_runnable());
183
184 {
185 MutexHolder holder(_lock);
186
187 if (task_cat.is_debug()) {
188 task_cat.debug()
189 << "Adding " << *task << "\n";
190 }
191
192 if (task->_state == AsyncTask::S_servicing_removed) {
193 if (task->_manager == this) {
194 // Re-adding a self-removed task; this just means clearing the removed
195 // flag.
196 task->_state = AsyncTask::S_servicing;
197 return;
198 }
199 }
200
201 nassertv(task->_manager == nullptr &&
202 task->_state == AsyncTask::S_inactive);
203 nassertv(!do_has_task(task));
204
205 _lock.unlock();
206 task->upon_birth(this);
207 _lock.lock();
208 nassertv(task->_manager == nullptr &&
209 task->_state == AsyncTask::S_inactive);
210 nassertv(!do_has_task(task));
211
212 AsyncTaskChain *chain = do_find_task_chain(task->_chain_name);
213 if (chain == nullptr) {
214 task_cat.warning()
215 << "Creating implicit AsyncTaskChain " << task->_chain_name
216 << " for " << get_type() << " " << get_name() << "\n";
217 chain = do_make_task_chain(task->_chain_name);
218 }
219 chain->do_add(task);
220 }
221}
222
223/**
224 * Returns true if the indicated task has been added to this AsyncTaskManager,
225 * false otherwise.
226 */
228has_task(AsyncTask *task) const {
229 MutexHolder holder(_lock);
230
231 if (task->_manager != this) {
232 nassertr(!do_has_task(task), false);
233 return false;
234 }
235
236 if (task->_state == AsyncTask::S_servicing_removed) {
237 return false;
238 }
239
240 // The task might not actually be in the active queue, since it might be
241 // being serviced right now. That's OK.
242 return true;
243}
244
245/**
246 * Returns the first task found with the indicated name, or NULL if there is
247 * no task with the indicated name.
248 *
249 * If there are multiple tasks with the same name, returns one of them
250 * arbitrarily.
251 */
253find_task(const string &name) const {
254 AsyncTask sample_task(name);
255 sample_task.local_object();
256
257 TasksByName::const_iterator tbni = _tasks_by_name.lower_bound(&sample_task);
258 if (tbni != _tasks_by_name.end() && (*tbni)->get_name() == name) {
259 return (*tbni);
260 }
261
262 return nullptr;
263}
264
265/**
266 * Returns the list of tasks found with the indicated name.
267 */
269find_tasks(const string &name) const {
270 AsyncTask sample_task(name);
271 sample_task.local_object();
272
273 TasksByName::const_iterator tbni = _tasks_by_name.lower_bound(&sample_task);
274 AsyncTaskCollection result;
275 while (tbni != _tasks_by_name.end() && (*tbni)->get_name() == name) {
276 result.add_task(*tbni);
277 ++tbni;
278 }
279
280 return result;
281}
282
283/**
284 * Returns the list of tasks found whose name matches the indicated glob
285 * pattern, e.g. "my_task_*".
286 */
288find_tasks_matching(const GlobPattern &pattern) const {
289 string prefix = pattern.get_const_prefix();
290 AsyncTask sample_task(prefix);
291 sample_task.local_object();
292
293 TasksByName::const_iterator tbni = _tasks_by_name.lower_bound(&sample_task);
294 AsyncTaskCollection result;
295 while (tbni != _tasks_by_name.end() && (*tbni)->get_name().substr(0, prefix.size()) == prefix) {
296 AsyncTask *task = (*tbni);
297 if (pattern.matches(task->get_name())) {
298 result.add_task(task);
299 }
300 ++tbni;
301 }
302
303 return result;
304}
305
306/**
307 * Removes the indicated task from the active queue. Returns true if the task
308 * is successfully removed, or false if it wasn't there.
309 */
311remove(AsyncTask *task) {
312 return task->remove();
313}
314
315/**
316 * Removes all of the tasks in the AsyncTaskCollection. Returns the number of
317 * tasks removed.
318 */
320remove(const AsyncTaskCollection &tasks) {
321 MutexHolder holder(_lock);
322 size_t num_removed = 0;
323
324 size_t num_tasks = tasks.get_num_tasks();
325 for (size_t i = 0; i < num_tasks; ++i) {
326 PT(AsyncTask) task = tasks.get_task(i);
327
328 if (task->_manager != this) {
329 // Not a member of this manager, or already removed.
330 nassertr(!do_has_task(task), num_removed);
331 } else {
332 nassertr(task->_chain->_manager == this, num_removed);
333 if (task_cat.is_debug()) {
334 task_cat.debug()
335 << "Removing " << *task << "\n";
336 }
337 if (task->_chain->do_remove(task, true)) {
338 ++num_removed;
339 } else {
340 if (task_cat.is_debug()) {
341 task_cat.debug()
342 << " (unable to remove " << *task << ")\n";
343 }
344 }
345 }
346 }
347
348 return num_removed;
349}
350
351/**
352 * Blocks until the task list is empty.
353 */
356 MutexHolder holder(_lock);
357
358 // Wait for each of our task chains to finish.
359 while (_num_tasks > 0) {
360 // We iterate through with an index, rather than with an iterator, because
361 // it's possible for a task to adjust the task_chain list during its
362 // execution.
363 for (unsigned int i = 0; i < _task_chains.size(); ++i) {
364 AsyncTaskChain *chain = _task_chains[i];
365 chain->do_wait_for_tasks();
366 }
367 }
368}
369
370/**
371 * Stops any threads that are currently running. If any tasks are still
372 * pending and have not yet been picked up by a thread, they will not be
373 * serviced unless poll() or start_threads() is later called.
374 */
376stop_threads() {
377 MutexHolder holder(_lock);
378
379 // We iterate through with an index, rather than with an iterator, because
380 // it's possible for a task to adjust the task_chain list during its
381 // execution.
382 for (unsigned int i = 0; i < _task_chains.size(); ++i) {
383 AsyncTaskChain *chain = _task_chains[i];
384 chain->do_stop_threads();
385 }
386}
387
388/**
389 * Starts any requested threads to service the tasks on the queue. This is
390 * normally not necessary, since adding a task will start the threads
391 * automatically.
392 */
395 MutexHolder holder(_lock);
396
397 // We iterate through with an index, rather than with an iterator, because
398 // it's possible for a task to adjust the task_chain list during its
399 // execution.
400 for (unsigned int i = 0; i < _task_chains.size(); ++i) {
401 AsyncTaskChain *chain = _task_chains[i];
402
403 chain->do_start_threads();
404 }
405}
406
407/**
408 * Returns the set of tasks that are active or sleeping on the task manager,
409 * at the time of the call.
410 */
412get_tasks() const {
413 MutexHolder holder(_lock);
414
415 AsyncTaskCollection result;
416 TaskChains::const_iterator tci;
417 for (tci = _task_chains.begin();
418 tci != _task_chains.end();
419 ++tci) {
420 AsyncTaskChain *chain = (*tci);
421 result.add_tasks_from(chain->do_get_active_tasks());
422 result.add_tasks_from(chain->do_get_sleeping_tasks());
423 }
424
425 return result;
426}
427
428/**
429 * Returns the set of tasks that are active (and not sleeping) on the task
430 * manager, at the time of the call.
431 */
433get_active_tasks() const {
434 MutexHolder holder(_lock);
435
436 AsyncTaskCollection result;
437 TaskChains::const_iterator tci;
438 for (tci = _task_chains.begin();
439 tci != _task_chains.end();
440 ++tci) {
441 AsyncTaskChain *chain = (*tci);
442 result.add_tasks_from(chain->do_get_active_tasks());
443 }
444
445 return result;
446}
447
448/**
449 * Returns the set of tasks that are sleeping (and not active) on the task
450 * manager, at the time of the call.
451 */
453get_sleeping_tasks() const {
454 MutexHolder holder(_lock);
455
456 AsyncTaskCollection result;
457 TaskChains::const_iterator tci;
458 for (tci = _task_chains.begin();
459 tci != _task_chains.end();
460 ++tci) {
461 AsyncTaskChain *chain = (*tci);
462 result.add_tasks_from(chain->do_get_sleeping_tasks());
463 }
464
465 return result;
466}
467
468/**
469 * Runs through all the tasks in the task list, once, if the task manager is
470 * running in single-threaded mode (no threads available). This method does
471 * nothing in threaded mode, so it may safely be called in either case.
472 */
474poll() {
475 MutexHolder holder(_lock);
476
477 // We iterate through with an index, rather than with an iterator, because
478 // it's possible for a task to adjust the task_chain list during its
479 // execution.
480 for (unsigned int i = 0; i < _task_chains.size(); ++i) {
481 AsyncTaskChain *chain = _task_chains[i];
482 chain->do_poll();
483
484 if (chain->_state == AsyncTaskChain::S_interrupted) {
485 // If a task returned DS_interrupt, we need to interrupt the entire
486 // manager, since an exception state may have been set.
487 break;
488 }
489 }
490
491 // Just in case the clock was ticked explicitly by one of our polling
492 // chains.
493 _frame_cvar.notify_all();
494}
495
496/**
497 * Returns the scheduled time (on the manager's clock) of the next sleeping
498 * task, on any task chain, to awaken. Returns -1 if there are no sleeping
499 * tasks.
500 */
502get_next_wake_time() const {
503 MutexHolder holder(_lock);
504
505 bool got_any = false;
506 double next_wake_time = -1.0;
507
508 TaskChains::const_iterator tci;
509 for (tci = _task_chains.begin();
510 tci != _task_chains.end();
511 ++tci) {
512 AsyncTaskChain *chain = (*tci);
513 double time = chain->do_get_next_wake_time();
514 if (time >= 0.0) {
515 if (!got_any) {
516 got_any = true;
517 next_wake_time = time;
518 } else {
519 next_wake_time = std::min(time, next_wake_time);
520 }
521 }
522 }
523
524 return next_wake_time;
525}
526
527/**
528 *
529 */
530void AsyncTaskManager::
531output(std::ostream &out) const {
532 MutexHolder holder(_lock);
533 do_output(out);
534}
535
536/**
537 *
538 */
539void AsyncTaskManager::
540write(std::ostream &out, int indent_level) const {
541 MutexHolder holder(_lock);
542 indent(out, indent_level)
543 << get_type() << " " << get_name() << "\n";
544
545 TaskChains::const_iterator tci;
546 for (tci = _task_chains.begin();
547 tci != _task_chains.end();
548 ++tci) {
549 AsyncTaskChain *chain = (*tci);
550 if (chain->_num_tasks != 0) {
551 out << "\n";
552 chain->do_write(out, indent_level + 2);
553 }
554 }
555}
556
557/**
558 * Creates a new AsyncTaskChain of the indicated name and stores it within the
559 * AsyncTaskManager. If a task chain with this name already exists, returns
560 * it instead.
561 *
562 * Assumes the lock is held.
563 */
564AsyncTaskChain *AsyncTaskManager::
565do_make_task_chain(const string &name) {
566 PT(AsyncTaskChain) chain = new AsyncTaskChain(this, name);
567
568 TaskChains::const_iterator tci = _task_chains.insert(chain).first;
569 return (*tci);
570}
571
572/**
573 * Searches a new AsyncTaskChain of the indicated name and returns it if it
574 * exists, or NULL otherwise.
575 *
576 * Assumes the lock is held.
577 */
578AsyncTaskChain *AsyncTaskManager::
579do_find_task_chain(const string &name) {
580 PT(AsyncTaskChain) chain = new AsyncTaskChain(this, name);
581
582 TaskChains::const_iterator tci = _task_chains.find(chain);
583 if (tci != _task_chains.end()) {
584 return (*tci);
585 }
586
587 return nullptr;
588}
589
590/**
591 * Removes the task from the _tasks_by_name index, if it has a nonempty name.
592 */
593void AsyncTaskManager::
594remove_task_by_name(AsyncTask *task) {
595 if (!task->get_name().empty()) {
596 // We have to scan linearly through all of the tasks with the same name.
597 TasksByName::iterator tbni = _tasks_by_name.lower_bound(task);
598 while (tbni != _tasks_by_name.end()) {
599 if ((*tbni) == task) {
600 _tasks_by_name.erase(tbni);
601 return;
602 }
603 if ((*tbni)->get_name() != task->get_name()) {
604 // Too far.
605 break;
606 }
607
608 ++tbni;
609 }
610
611 // For some reason, the task wasn't on the index.
612 nassertv(false);
613 }
614}
615
616/**
617 * Returns true if the task is on one of the task lists, false if it is not
618 * (false may mean that the task is currently being serviced). Assumes the
619 * lock is currently held.
620 */
621bool AsyncTaskManager::
622do_has_task(AsyncTask *task) const {
623 TaskChains::const_iterator tci;
624 for (tci = _task_chains.begin();
625 tci != _task_chains.end();
626 ++tci) {
627 AsyncTaskChain *chain = (*tci);
628 if (chain->do_has_task(task)) {
629 return true;
630 }
631 }
632
633 return false;
634}
635
636/**
637 *
638 */
639void AsyncTaskManager::
640do_output(std::ostream &out) const {
641 out << get_type() << " " << get_name()
642 << "; " << _num_tasks << " tasks";
643}
644
645/**
646 * Called once per application to create the global task manager object.
647 */
648void AsyncTaskManager::
649make_global_ptr() {
650 nassertv(_global_ptr == nullptr);
651
653 _global_ptr = new AsyncTaskManager("TaskManager");
654 _global_ptr->ref();
655}
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
The AsyncTaskChain is a subset of the AsyncTaskManager.
A list of tasks, for instance as returned by some of the AsyncTaskManager query functions.
get_num_tasks
Returns the number of AsyncTasks in the collection.
void add_task(AsyncTask *task)
Adds a new AsyncTask to the collection.
get_task
Returns the nth AsyncTask in the collection.
void add_tasks_from(const AsyncTaskCollection &other)
Adds all the AsyncTasks indicated in the other collection to this collection.
A class to manage a loose queue of isolated tasks, which can be performed either synchronously (in th...
get_num_task_chains
Returns the number of different task chains.
void wait_for_tasks()
Blocks until the task list is empty.
bool remove(AsyncTask *task)
Removes the indicated task from the active queue.
void add(AsyncTask *task)
Adds the indicated task to the active queue.
AsyncTaskCollection find_tasks_matching(const GlobPattern &pattern) const
Returns the list of tasks found whose name matches the indicated glob pattern, e.g.
void stop_threads()
Stops any threads that are currently running.
void poll()
Runs through all the tasks in the task list, once, if the task manager is running in single-threaded ...
AsyncTaskChain * find_task_chain(const std::string &name)
Searches for an AsyncTaskChain of the indicated name and returns it if it exists, or NULL otherwise.
void cleanup()
Stops all threads and messily empties the task list.
get_sleeping_tasks
Returns the set of tasks that are sleeping (and not active) on the task manager, at the time of the c...
get_next_wake_time
Returns the scheduled time (on the manager's clock) of the next sleeping task, on any task chain,...
AsyncTaskChain * make_task_chain(const std::string &name)
Creates a new AsyncTaskChain of the indicated name and stores it within the AsyncTaskManager.
get_tasks
Returns the set of tasks that are active or sleeping on the task manager, at the time of the call.
AsyncTask * find_task(const std::string &name) const
Returns the first task found with the indicated name, or NULL if there is no task with the indicated ...
get_active_tasks
Returns the set of tasks that are active (and not sleeping) on the task manager, at the time of the c...
get_task_chain
Returns the nth task chain.
bool has_task(AsyncTask *task) const
Returns true if the indicated task has been added to this AsyncTaskManager, false otherwise.
bool remove_task_chain(const std::string &name)
Removes the AsyncTaskChain of the indicated name.
void start_threads()
Starts any requested threads to service the tasks on the queue.
AsyncTaskCollection find_tasks(const std::string &name) const
Returns the list of tasks found with the indicated name.
This class represents a concrete task performed by an AsyncTaskManager.
Definition asyncTask.h:32
bool remove()
Removes the task from its active manager, if any, and makes the state S_inactive (or possible S_servi...
Definition asyncTask.cxx:73
A ClockObject keeps track of elapsed real time and discrete time.
Definition clockObject.h:58
void notify_all()
Informs all of the other threads who are currently blocked on wait() that the relevant condition has ...
This class can be used to test for string matches against standard Unix- shell filename globbing conv...
Definition globPattern.h:32
std::string get_const_prefix() const
Returns the initial part of the pattern before the first glob character.
bool matches(const std::string &candidate) const
Returns true if the candidate string matches the pattern, false otherwise.
void unlock()
Alias for release() to match C++11 semantics.
Definition mutexDirect.I:39
void lock()
Alias for acquire() to match C++11 semantics.
Definition mutexDirect.I:19
A lightweight C++ object whose constructor calls acquire() and whose destructor calls release() on a ...
Definition mutexHolder.h:25
A base class for all things which can have a name.
Definition namable.h:26
void ref() const
Explicitly increments the reference count.
void local_object()
This function should be called, once, immediately after creating a new instance of some ReferenceCoun...
TypeHandle is the identifier used to differentiate C++ class types.
Definition typeHandle.h:81
iterator_0 begin()
Returns the iterator that marks the first element in the ordered vector.
size_type_0 size() const
Returns the number of elements in the ordered vector.
bool empty() const
Returns true if the ordered vector is empty, false otherwise.
iterator_0 end()
Returns the iterator that marks the end of the ordered vector.
void pop_back()
Removes the last element at the end of the vector.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
void init_memory_hook()
Any code that might need to use PANDA_MALLOC or PANDA_FREE, or any methods of the global memory_hook ...
Definition dtoolbase.cxx:38
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
std::ostream & indent(std::ostream &out, int indent_level)
A handy function for doing text formatting.
Definition indent.cxx:20
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.