32 nassertv(_waiting.empty());
40 if (result_ref !=
nullptr) {
41 _result_ref.cheat() =
nullptr;
42 if (!result_ref->
unref()) {
58 if (set_future_state(FS_cancelled)) {
72 output(std::ostream &out)
const {
77 case FS_locked_pending:
84 out <<
" (cancelled)";
87 out <<
" (**INVALID**)";
101 PStatTimer timer(AsyncTaskChain::_wait_pcollector);
102 if (task_cat.is_debug()) {
104 <<
"Waiting for future " << *
this <<
"\n";
124 PStatTimer timer(AsyncTaskChain::_wait_pcollector);
125 if (task_cat.is_debug()) {
127 <<
"Waiting up to " << timeout <<
" seconds for future " << *
this <<
"\n";
152 Futures::iterator it;
153 for (it = _waiting.begin(); it != _waiting.end(); ++it) {
155 if (fut->is_task()) {
163 if (
gather->set_future_state(FS_finished)) {
173 if (clean_exit && !_done_event.empty()) {
174 PT_Event
event =
new Event(_done_event);
176 throw_event(std::move(event));
194 compare_and_exchange(_future_state, (AtomicAdjust::Integer)FS_pending,
195 (AtomicAdjust::Integer)FS_locked_pending);
197 while (orig_state == FS_locked_pending) {
200 compare_and_exchange(_future_state, (AtomicAdjust::Integer)FS_pending,
201 (AtomicAdjust::Integer)FS_locked_pending);
204 if (orig_state == FS_pending) {
206 _result_ref = ref_ptr;
212 }
else if (orig_state == FS_cancelled) {
217 <<
"Ignoring set_result() called on cancelled " << *
this <<
"\n";
221 <<
"set_result() was called on finished " << *
this <<
"\n";
234 nassertr(task->is_runnable(),
false);
238 if (try_lock_pending()) {
239 if (_manager ==
nullptr) {
240 _manager = task->_manager;
243 _waiting.push_back(task);
247 nassertr(task->_manager ==
nullptr || task->_manager == _manager,
true);
262 if (manager ==
nullptr) {
266 if (manager ==
nullptr) {
272 switch (task->_state) {
273 case AsyncTask::S_servicing_removed:
274 nassertv(task->_manager == _manager);
277 task->_state = AsyncTask::S_servicing;
280 case AsyncTask::S_inactive:
282 nassertv(task->_manager ==
nullptr);
284 if (task_cat.is_debug()) {
286 <<
"Adding " << *task <<
" (woken by future " << *
this <<
")\n";
291 task->upon_birth(manager);
292 manager->_lock.
lock();
293 nassertv(task->_manager ==
nullptr &&
294 task->_state == AsyncTask::S_inactive);
296 AsyncTaskChain *chain = manager->do_find_task_chain(task->_chain_name);
297 if (chain ==
nullptr) {
299 <<
"Creating implicit AsyncTaskChain " << task->_chain_name
300 <<
" for " << manager->get_type() <<
" " << manager->get_name() <<
"\n";
301 chain = manager->do_make_task_chain(task->_chain_name);
307 case AsyncTask::S_awaiting:
308 nassertv(task->_manager == _manager);
309 task->_state = AsyncTask::S_active;
310 task->_chain->_active.push_back(task);
311 --task->_chain->_num_awaiting_tasks;
315 nassert_raise(
"unexpected task state");
323 AsyncGatheringFuture::
325 _futures(std::move(futures)),
328 bool any_pending =
false;
330 AsyncFuture::Futures::const_iterator it;
331 for (it = _futures.begin(); it != _futures.end(); ++it) {
336 if (fut->try_lock_pending()) {
337 if (_manager ==
nullptr) {
338 _manager = fut->_manager;
350 _future_state = (AtomicAdjust::Integer)FS_finished;
367 bool any_cancelled =
false;
368 AsyncFuture::Futures::const_iterator it;
369 for (it = _futures.begin(); it != _futures.end(); ++it) {
372 any_cancelled =
true;
378 if (set_future_state(FS_cancelled)) {
385 return any_cancelled;