Panda3D
asyncFuture.cxx
Go to the documentation of this file.
1 /**
2  * PANDA 3D SOFTWARE
3  * Copyright (c) Carnegie Mellon University. All rights reserved.
4  *
5  * All use of this software is subject to the terms of the revised BSD
6  * license. You should have received a copy of this license along
7  * with this source code in a file named "LICENSE."
8  *
9  * @file asyncFuture.cxx
10  * @author rdb
11  * @date 2017-11-28
12  */
13 
14 #include "asyncFuture.h"
15 #include "asyncTask.h"
16 #include "asyncTaskManager.h"
17 #include "conditionVarFull.h"
18 #include "config_event.h"
19 #include "pStatTimer.h"
20 #include "throw_event.h"
21 
22 TypeHandle AsyncFuture::_type_handle;
23 TypeHandle AsyncGatheringFuture::_type_handle;
24 
25 /**
26  * Destroys the future. Assumes notify_done() has already been called.
27  */
29 ~AsyncFuture() {
30  // If this triggers, the future destroyed before it was cancelled, which is
31  // not valid. Unless we should simply call cancel() here?
32  nassertv(_waiting.empty());
33 
34  // This is an attempt to work around what appears to be a compiler bug in
35  // MSVC when compiling with optimizations and having an EventStoreInt stored
36  // in this field. It crashes when we delete via the ReferenceCount base
37  // instead of via the TypedObject. I haven't been able to find out why;
38  // just that it doesn't happen with ParamString. ~rdb
39  ReferenceCount *result_ref = _result_ref.p();
40  if (result_ref != nullptr) {
41  _result_ref.cheat() = nullptr;
42  if (!result_ref->unref()) {
43  delete _result;
44  }
45  _result = nullptr;
46  }
47 }
48 
49 /**
50  * Cancels the future. Returns true if it was cancelled, or false if the
51  * future was already done. Either way, done() will return true after this
52  * call returns.
53  *
54  * In the case of a task, this is equivalent to remove().
55  */
57 cancel() {
58  if (set_future_state(FS_cancelled)) {
59  // The compare-swap operation succeeded, so schedule the callbacks.
60  notify_done(false);
61  return true;
62  } else {
63  // It's already done.
64  return false;
65  }
66 }
67 
68 /**
69  *
70  */
71 void AsyncFuture::
72 output(std::ostream &out) const {
73  out << get_type();
74  FutureState state = (FutureState)AtomicAdjust::get(_future_state);
75  switch (state) {
76  case FS_pending:
77  case FS_locked_pending:
78  out << " (pending)";
79  break;
80  case FS_finished:
81  out << " (finished)";
82  break;
83  case FS_cancelled:
84  out << " (cancelled)";
85  break;
86  default:
87  out << " (**INVALID**)";
88  break;
89  }
90 }
91 
92 /**
93  * Waits until the future is done.
94  */
96 wait() {
97  if (done()) {
98  return;
99  }
100 
101  PStatTimer timer(AsyncTaskChain::_wait_pcollector);
102  if (task_cat.is_debug()) {
103  task_cat.debug()
104  << "Waiting for future " << *this << "\n";
105  }
106 
107  // Continue to yield while the future isn't done. It may be more efficient
108  // to use a condition variable, but let's not add the extra complexity
109  // unless we're sure that we need it.
110  do {
112  } while (!done());
113 }
114 
115 /**
116  * Waits until the future is done, or until the timeout is reached.
117  */
119 wait(double timeout) {
120  if (done()) {
121  return;
122  }
123 
124  PStatTimer timer(AsyncTaskChain::_wait_pcollector);
125  if (task_cat.is_debug()) {
126  task_cat.debug()
127  << "Waiting up to " << timeout << " seconds for future " << *this << "\n";
128  }
129 
130  // Continue to yield while the future isn't done. It may be more efficient
131  // to use a condition variable, but let's not add the extra complexity
132  // unless we're sure that we need it.
134  double end = clock->get_real_time() + timeout;
135  do {
137  } while (!done() && clock->get_real_time() < end);
138 }
139 
140 /**
141  * Schedules the done callbacks. Called after the future has just entered the
142  * 'done' state.
143  * @param clean_exit true if finished successfully, false if cancelled.
144  */
146 notify_done(bool clean_exit) {
147  nassertv(done());
148 
149  // This will only be called by the thread that managed to set the
150  // _future_state away from the "pending" state, so this is thread safe.
151 
152  Futures::iterator it;
153  for (it = _waiting.begin(); it != _waiting.end(); ++it) {
154  AsyncFuture *fut = *it;
155  if (fut->is_task()) {
156  // It's a task. Make it active again.
157  wake_task((AsyncTask *)fut);
158  } else {
159  // It's a gathering future. Decrease the pending count on it, and if
160  // we're the last one, call notify_done() on it.
162  if (!AtomicAdjust::dec(gather->_num_pending)) {
163  if (gather->set_future_state(FS_finished)) {
164  gather->notify_done(true);
165  }
166  }
167  }
168  }
169  _waiting.clear();
170 
171  // For historical reasons, we don't send the "done event" if the future was
172  // cancelled.
173  if (clean_exit && !_done_event.empty()) {
174  PT_Event event = new Event(_done_event);
175  event->add_parameter(EventParameter(this));
176  throw_event(std::move(event));
177  }
178 }
179 
180 /**
181  * Sets this future's result. Can only be done while the future is not done.
182  * Calling this marks the future as done and schedules the done callbacks.
183  *
184  * This variant takes two pointers; the second one is only set if this object
185  * inherits from ReferenceCount, so that a reference can be held.
186  *
187  * Assumes the manager's lock is *not* held.
188  */
190 set_result(TypedObject *ptr, ReferenceCount *ref_ptr) {
191  // We don't strictly need to lock the future since only one thread is
192  // allowed to call set_result(), but we might as well.
193  FutureState orig_state = (FutureState)AtomicAdjust::
194  compare_and_exchange(_future_state, (AtomicAdjust::Integer)FS_pending,
195  (AtomicAdjust::Integer)FS_locked_pending);
196 
197  while (orig_state == FS_locked_pending) {
199  orig_state = (FutureState)AtomicAdjust::
200  compare_and_exchange(_future_state, (AtomicAdjust::Integer)FS_pending,
201  (AtomicAdjust::Integer)FS_locked_pending);
202  }
203 
204  if (orig_state == FS_pending) {
205  _result = ptr;
206  _result_ref = ref_ptr;
207  unlock(FS_finished);
208 
209  // OK, now our thread owns the _waiting vector et al.
210  notify_done(true);
211 
212  } else if (orig_state == FS_cancelled) {
213  // This was originally illegal, but there is a chance that the future was
214  // cancelled while another thread was setting the result. So, we drop
215  // this, but we can issue a warning.
216  task_cat.warning()
217  << "Ignoring set_result() called on cancelled " << *this << "\n";
218 
219  } else {
220  task_cat.error()
221  << "set_result() was called on finished " << *this << "\n";
222  }
223 }
224 
225 /**
226  * Indicates that the given task is waiting for this future to complete. When
227  * the future is done, it will reactivate the given task. If this is called
228  * while the future is already done, schedules the task immediately.
229  * Assumes the manager's lock is not held.
230  * @returns true if the future was pending, false if it was already done.
231  */
234  nassertr(task->is_runnable(), false);
235 
236  // We have to make sure we're not going to change state while we're in the
237  // process of adding the task.
238  if (try_lock_pending()) {
239  if (_manager == nullptr) {
240  _manager = task->_manager;
241  }
242 
243  _waiting.push_back(task);
244 
245  // Unlock the state.
246  unlock();
247  nassertr(task->_manager == nullptr || task->_manager == _manager, true);
248  return true;
249  } else {
250  // It's already done. Wake the task immediately.
251  wake_task(task);
252  return false;
253  }
254 }
255 
256 /**
257  * Reactivates the given task. Assumes the manager lock is not held.
258  */
259 void AsyncFuture::
260 wake_task(AsyncTask *task) {
261  AsyncTaskManager *manager = task->_manager;
262  if (manager == nullptr) {
263  // If it's an unscheduled task, schedule it on the same manager as the
264  // rest of the waiting tasks.
265  manager = _manager;
266  if (manager == nullptr) {
268  }
269  }
270 
271  MutexHolder holder(manager->_lock);
272  switch (task->_state) {
273  case AsyncTask::S_servicing_removed:
274  nassertv(task->_manager == _manager);
275  // Re-adding a self-removed task; this just means clearing the removed
276  // flag.
277  task->_state = AsyncTask::S_servicing;
278  return;
279 
280  case AsyncTask::S_inactive:
281  // Schedule it immediately.
282  nassertv(task->_manager == nullptr);
283 
284  if (task_cat.is_debug()) {
285  task_cat.debug()
286  << "Adding " << *task << " (woken by future " << *this << ")\n";
287  }
288 
289  {
290  manager->_lock.unlock();
291  task->upon_birth(manager);
292  manager->_lock.lock();
293  nassertv(task->_manager == nullptr &&
294  task->_state == AsyncTask::S_inactive);
295 
296  AsyncTaskChain *chain = manager->do_find_task_chain(task->_chain_name);
297  if (chain == nullptr) {
298  task_cat.warning()
299  << "Creating implicit AsyncTaskChain " << task->_chain_name
300  << " for " << manager->get_type() << " " << manager->get_name() << "\n";
301  chain = manager->do_make_task_chain(task->_chain_name);
302  }
303  chain->do_add(task);
304  }
305  return;
306 
307  case AsyncTask::S_awaiting:
308  nassertv(task->_manager == _manager);
309  task->_state = AsyncTask::S_active;
310  task->_chain->_active.push_back(task);
311  --task->_chain->_num_awaiting_tasks;
312  return;
313 
314  default:
315  nassert_raise("unexpected task state");
316  return;
317  }
318 }
319 
320 /**
321  * @see AsyncFuture::gather
322  */
323 AsyncGatheringFuture::
324 AsyncGatheringFuture(AsyncFuture::Futures futures) :
325  _futures(std::move(futures)),
326  _num_pending(0) {
327 
328  bool any_pending = false;
329 
330  AsyncFuture::Futures::const_iterator it;
331  for (it = _futures.begin(); it != _futures.end(); ++it) {
332  AsyncFuture *fut = *it;
333  // If this returns true, the future is not yet done and we need to
334  // register ourselves with it. This creates a circular reference, but it
335  // is resolved when the future is completed or cancelled.
336  if (fut->try_lock_pending()) {
337  if (_manager == nullptr) {
338  _manager = fut->_manager;
339  }
340  fut->_waiting.push_back((AsyncFuture *)this);
341  AtomicAdjust::inc(_num_pending);
342  fut->unlock();
343  any_pending = true;
344  }
345  }
346  if (!any_pending) {
347  // Start in the done state if all the futures we were passed are done.
348  // Note that it is only safe to set this member in this manner if indeed
349  // no other future holds a reference to us.
350  _future_state = (AtomicAdjust::Integer)FS_finished;
351  }
352 }
353 
354 /**
355  * Cancels all the futures. Returns true if any futures were cancelled.
356  * Makes sure that all the futures finish before this one is marked done, in
357  * order to maintain the guarantee that calling result() is safe when done()
358  * returns true.
359  */
361 cancel() {
362  if (!done()) {
363  // Temporarily increase the pending count so that the notify_done()
364  // callbacks won't end up causing it to be set to "finished".
365  AtomicAdjust::inc(_num_pending);
366 
367  bool any_cancelled = false;
368  AsyncFuture::Futures::const_iterator it;
369  for (it = _futures.begin(); it != _futures.end(); ++it) {
370  AsyncFuture *fut = *it;
371  if (fut->cancel()) {
372  any_cancelled = true;
373  }
374  }
375 
376  // Now change state to "cancelled" and call the notify_done() callbacks.
377  // Don't call notify_done() if another thread has beaten us to it.
378  if (set_future_state(FS_cancelled)) {
379  notify_done(false);
380  }
381 
382  // Decreasing the pending count is kind of pointless now, so we do it only
383  // in a debug build.
384  nassertr(!AtomicAdjust::dec(_num_pending), any_cancelled);
385  return any_cancelled;
386  } else {
387  return false;
388  }
389 }
MutexHolder
A lightweight C++ object whose constructor calls acquire() and whose destructor calls release() on a ...
Definition: mutexHolder.h:25
AsyncTaskChain
The AsyncTaskChain is a subset of the AsyncTaskManager.
Definition: asyncTaskChain.h:50
asyncTaskManager.h
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
throw_event.h
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
AsyncTaskManager::get_global_ptr
static AsyncTaskManager * get_global_ptr()
Returns a pointer to the global AsyncTaskManager.
Definition: asyncTaskManager.I:50
AtomicAdjustDummyImpl::get
static Integer get(const Integer &var)
Atomically retrieves the snapshot value of the indicated variable.
Definition: atomicAdjustDummyImpl.I:59
pvector< PT(AsyncFuture)>
AsyncFuture::notify_done
void notify_done(bool clean_exit)
Schedules the done callbacks.
Definition: asyncFuture.cxx:146
ClockObject
A ClockObject keeps track of elapsed real time and discrete time.
Definition: clockObject.h:58
conditionVarFull.h
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
ReferenceCount
A base class for all things that want to be reference-counted.
Definition: referenceCount.h:38
MutexDirect::lock
void lock()
Alias for acquire() to match C++11 semantics.
Definition: mutexDirect.I:19
AsyncGatheringFuture
Specific future that collects the results of several futures.
Definition: asyncFuture.h:165
pStatTimer.h
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
ClockObject::get_global_clock
static ClockObject * get_global_clock()
Returns a pointer to the global ClockObject.
Definition: clockObject.I:215
AsyncGatheringFuture::cancel
virtual bool cancel() override
Cancels all the futures.
Definition: asyncFuture.cxx:361
AsyncFuture::add_waiting_task
bool add_waiting_task(AsyncTask *task)
Indicates that the given task is waiting for this future to complete.
Definition: asyncFuture.cxx:233
AsyncFuture
This class represents a thread-safe handle to a promised future result of an asynchronous operation,...
Definition: asyncFuture.h:61
config_event.h
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
AsyncFuture::~AsyncFuture
virtual ~AsyncFuture()
Destroys the future.
Definition: asyncFuture.cxx:29
ClockObject::get_real_time
get_real_time
Returns the actual number of seconds elapsed since the ClockObject was created, or since it was last ...
Definition: clockObject.h:92
AsyncFuture::gather
static AsyncFuture * gather(Futures futures)
Creates a new future that returns `done()` when all of the contained futures are done.
Definition: asyncFuture.I:124
PStatTimer
A lightweight class that can be used to automatically start and stop a PStatCollector around a sectio...
Definition: pStatTimer.h:30
TypeHandle
TypeHandle is the identifier used to differentiate C++ class types.
Definition: typeHandle.h:81
Event
A named event, possibly with parameters.
Definition: event.h:33
AsyncFuture::set_result
void set_result(std::nullptr_t)
Sets this future's result.
Definition: asyncFuture.I:92
AtomicAdjustDummyImpl::dec
static bool dec(Integer &var)
Atomically decrements the indicated variable and returns true if the new value is nonzero,...
Definition: atomicAdjustDummyImpl.I:27
AsyncFuture::done
bool done() const
Returns true if the future is done or has been cancelled.
Definition: asyncFuture.I:29
AsyncTask
This class represents a concrete task performed by an AsyncManager.
Definition: asyncTask.h:32
MutexDirect::unlock
void unlock()
Alias for release() to match C++11 semantics.
Definition: mutexDirect.I:39
AsyncTaskManager
A class to manage a loose queue of isolated tasks, which can be performed either synchronously (in th...
Definition: asyncTaskManager.h:48
AsyncFuture::wait
void wait()
Waits until the future is done.
Definition: asyncFuture.cxx:96
EventParameter
An optional parameter associated with an event.
Definition: eventParameter.h:35
AtomicAdjustDummyImpl
A trivial implementation for atomic adjustments for systems that don't require multiprogramming,...
Definition: atomicAdjustDummyImpl.h:27
AsyncFuture::cancel
virtual bool cancel()
Cancels the future.
Definition: asyncFuture.cxx:57
asyncTask.h
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
ReferenceCount::unref
virtual bool unref() const
Explicitly decrements the reference count.
Definition: referenceCount.I:179
asyncFuture.h
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
Thread::force_yield
static void force_yield()
Suspends the current thread for the rest of the current epoch.
Definition: thread.I:201
TypedObject
This is an abstract class that all classes which use TypeHandle, and also provide virtual functions t...
Definition: typedObject.h:88
AtomicAdjustDummyImpl::inc
static void inc(Integer &var)
Atomically increments the indicated variable.
Definition: atomicAdjustDummyImpl.I:18