Panda3D
asyncFuture.cxx
Go to the documentation of this file.
1 /**
2  * PANDA 3D SOFTWARE
3  * Copyright (c) Carnegie Mellon University. All rights reserved.
4  *
5  * All use of this software is subject to the terms of the revised BSD
6  * license. You should have received a copy of this license along
7  * with this source code in a file named "LICENSE."
8  *
9  * @file asyncFuture.cxx
10  * @author rdb
11  * @date 2017-11-28
12  */
13 
14 #include "asyncFuture.h"
15 #include "asyncTask.h"
16 #include "asyncTaskManager.h"
17 #include "conditionVarFull.h"
18 #include "config_event.h"
19 #include "pStatTimer.h"
20 #include "throw_event.h"
21 
22 TypeHandle AsyncFuture::_type_handle;
23 TypeHandle AsyncGatheringFuture::_type_handle;
24 
25 /**
26  * Destroys the future. Assumes notify_done() has already been called.
27  */
29 ~AsyncFuture() {
30  // If this triggers, the future destroyed before it was cancelled, which is
31  // not valid. Unless we should simply call cancel() here?
32  nassertv(_waiting.empty());
33 
34  // This is an attempt to work around what appears to be a compiler bug in
35  // MSVC when compiling with optimizations and having an EventStoreInt stored
36  // in this field. It crashes when we delete via the ReferenceCount base
37  // instead of via the TypedObject. I haven't been able to find out why;
38  // just that it doesn't happen with ParamString. ~rdb
39  ReferenceCount *result_ref = _result_ref.p();
40  if (result_ref != nullptr) {
41  _result_ref.cheat() = nullptr;
42  if (!result_ref->unref()) {
43  delete _result;
44  }
45  _result = nullptr;
46  }
47 }
48 
49 /**
50  * Cancels the future. Returns true if it was cancelled, or false if the
51  * future was already done. Either way, done() will return true after this
52  * call returns.
53  *
54  * In the case of a task, this is equivalent to remove().
55  */
57 cancel() {
58  if (set_future_state(FS_cancelled)) {
59  // The compare-swap operation succeeded, so schedule the callbacks.
60  notify_done(false);
61  return true;
62  } else {
63  // It's already done.
64  return false;
65  }
66 }
67 
68 /**
69  *
70  */
71 void AsyncFuture::
72 output(std::ostream &out) const {
73  out << get_type();
74  FutureState state = (FutureState)AtomicAdjust::get(_future_state);
75  switch (state) {
76  case FS_pending:
77  case FS_locked_pending:
78  out << " (pending)";
79  break;
80  case FS_finished:
81  out << " (finished)";
82  break;
83  case FS_cancelled:
84  out << " (cancelled)";
85  break;
86  default:
87  out << " (**INVALID**)";
88  break;
89  }
90 }
91 
92 /**
93  * Waits until the future is done.
94  */
96 wait() {
97  if (done()) {
98  return;
99  }
100 
101  PStatTimer timer(AsyncTaskChain::_wait_pcollector);
102  if (task_cat.is_debug()) {
103  task_cat.debug()
104  << "Waiting for future " << *this << "\n";
105  }
106 
107  // Continue to yield while the future isn't done. It may be more efficient
108  // to use a condition variable, but let's not add the extra complexity
109  // unless we're sure that we need it.
110  do {
112  } while (!done());
113 }
114 
115 /**
116  * Waits until the future is done, or until the timeout is reached.
117  */
119 wait(double timeout) {
120  if (done()) {
121  return;
122  }
123 
124  PStatTimer timer(AsyncTaskChain::_wait_pcollector);
125  if (task_cat.is_debug()) {
126  task_cat.debug()
127  << "Waiting up to " << timeout << " seconds for future " << *this << "\n";
128  }
129 
130  // Continue to yield while the future isn't done. It may be more efficient
131  // to use a condition variable, but let's not add the extra complexity
132  // unless we're sure that we need it.
134  double end = clock->get_real_time() + timeout;
135  do {
137  } while (!done() && clock->get_real_time() < end);
138 }
139 
140 /**
141  * Schedules the done callbacks. Called after the future has just entered the
142  * 'done' state.
143  * @param clean_exit true if finished successfully, false if cancelled.
144  */
146 notify_done(bool clean_exit) {
147  nassertv(done());
148 
149  // This will only be called by the thread that managed to set the
150  // _future_state away from the "pending" state, so this is thread safe.
151 
152  Futures::iterator it;
153  for (it = _waiting.begin(); it != _waiting.end(); ++it) {
154  AsyncFuture *fut = *it;
155  if (fut->is_task()) {
156  // It's a task. Make it active again.
157  wake_task((AsyncTask *)fut);
158  } else {
159  // It's a gathering future. Decrease the pending count on it, and if
160  // we're the last one, call notify_done() on it.
162  if (!AtomicAdjust::dec(gather->_num_pending)) {
163  if (gather->set_future_state(FS_finished)) {
164  gather->notify_done(true);
165  }
166  }
167  }
168  }
169  _waiting.clear();
170 
171  // For historical reasons, we don't send the "done event" if the future was
172  // cancelled.
173  if (clean_exit && !_done_event.empty()) {
174  PT_Event event = new Event(_done_event);
175  event->add_parameter(EventParameter(this));
176  throw_event(std::move(event));
177  }
178 }
179 
180 /**
181  * Sets this future's result. Can only be done while the future is not done.
182  * Calling this marks the future as done and schedules the done callbacks.
183  *
184  * This variant takes two pointers; the second one is only set if this object
185  * inherits from ReferenceCount, so that a reference can be held.
186  *
187  * Assumes the manager's lock is *not* held.
188  */
190 set_result(TypedObject *ptr, ReferenceCount *ref_ptr) {
191  // We don't strictly need to lock the future since only one thread is
192  // allowed to call set_result(), but we might as well.
193  FutureState orig_state = (FutureState)AtomicAdjust::
194  compare_and_exchange(_future_state, (AtomicAdjust::Integer)FS_pending,
195  (AtomicAdjust::Integer)FS_locked_pending);
196 
197  while (orig_state == FS_locked_pending) {
199  orig_state = (FutureState)AtomicAdjust::
200  compare_and_exchange(_future_state, (AtomicAdjust::Integer)FS_pending,
201  (AtomicAdjust::Integer)FS_locked_pending);
202  }
203 
204  if (orig_state == FS_pending) {
205  _result = ptr;
206  _result_ref = ref_ptr;
207  unlock(FS_finished);
208 
209  // OK, now our thread owns the _waiting vector et al.
210  notify_done(true);
211 
212  } else if (orig_state == FS_cancelled) {
213  // This was originally illegal, but there is a chance that the future was
214  // cancelled while another thread was setting the result. So, we drop
215  // this, but we can issue a warning.
216  task_cat.warning()
217  << "Ignoring set_result() called on cancelled " << *this << "\n";
218 
219  } else {
220  task_cat.error()
221  << "set_result() was called on finished " << *this << "\n";
222  }
223 }
224 
225 /**
226  * Indicates that the given task is waiting for this future to complete. When
227  * the future is done, it will reactivate the given task. If this is called
228  * while the future is already done, schedules the task immediately.
229  * Assumes the manager's lock is not held.
230  * @returns true if the future was pending, false if it was already done.
231  */
234  nassertr(task->is_runnable(), false);
235 
236  // We have to make sure we're not going to change state while we're in the
237  // process of adding the task.
238  if (try_lock_pending()) {
239  if (_manager == nullptr) {
240  _manager = task->_manager;
241  }
242 
243  _waiting.push_back(task);
244 
245  // Unlock the state.
246  unlock();
247  nassertr(task->_manager == nullptr || task->_manager == _manager, true);
248  return true;
249  } else {
250  // It's already done. Wake the task immediately.
251  wake_task(task);
252  return false;
253  }
254 }
255 
256 /**
257  * Reactivates the given task. Assumes the manager lock is not held.
258  */
259 void AsyncFuture::
260 wake_task(AsyncTask *task) {
261  AsyncTaskManager *manager = task->_manager;
262  if (manager == nullptr) {
263  // If it's an unscheduled task, schedule it on the same manager as the
264  // rest of the waiting tasks.
265  manager = _manager;
266  if (manager == nullptr) {
268  }
269  }
270 
271  MutexHolder holder(manager->_lock);
272  switch (task->_state) {
273  case AsyncTask::S_servicing_removed:
274  nassertv(task->_manager == _manager);
275  // Re-adding a self-removed task; this just means clearing the removed
276  // flag.
277  task->_state = AsyncTask::S_servicing;
278  return;
279 
280  case AsyncTask::S_inactive:
281  // Schedule it immediately.
282  nassertv(task->_manager == nullptr);
283 
284  if (task_cat.is_debug()) {
285  task_cat.debug()
286  << "Adding " << *task << " (woken by future " << *this << ")\n";
287  }
288 
289  {
290  manager->_lock.unlock();
291  task->upon_birth(manager);
292  manager->_lock.lock();
293  nassertv(task->_manager == nullptr &&
294  task->_state == AsyncTask::S_inactive);
295 
296  AsyncTaskChain *chain = manager->do_find_task_chain(task->_chain_name);
297  if (chain == nullptr) {
298  task_cat.warning()
299  << "Creating implicit AsyncTaskChain " << task->_chain_name
300  << " for " << manager->get_type() << " " << manager->get_name() << "\n";
301  chain = manager->do_make_task_chain(task->_chain_name);
302  }
303  chain->do_add(task);
304  }
305  return;
306 
307  case AsyncTask::S_awaiting:
308  nassertv(task->_manager == _manager);
309  task->_state = AsyncTask::S_active;
310  task->_chain->_active.push_back(task);
311  --task->_chain->_num_awaiting_tasks;
312  return;
313 
314  default:
315  nassert_raise("unexpected task state");
316  return;
317  }
318 }
319 
320 /**
321  * @see AsyncFuture::gather
322  */
323 AsyncGatheringFuture::
324 AsyncGatheringFuture(AsyncFuture::Futures futures) :
325  _futures(std::move(futures)),
326  _num_pending(0) {
327 
328  bool any_pending = false;
329 
330  AsyncFuture::Futures::const_iterator it;
331  for (it = _futures.begin(); it != _futures.end(); ++it) {
332  AsyncFuture *fut = *it;
333  // If this returns true, the future is not yet done and we need to
334  // register ourselves with it. This creates a circular reference, but it
335  // is resolved when the future is completed or cancelled.
336  if (fut->try_lock_pending()) {
337  if (_manager == nullptr) {
338  _manager = fut->_manager;
339  }
340  fut->_waiting.push_back((AsyncFuture *)this);
341  AtomicAdjust::inc(_num_pending);
342  fut->unlock();
343  any_pending = true;
344  }
345  }
346  if (!any_pending) {
347  // Start in the done state if all the futures we were passed are done.
348  // Note that it is only safe to set this member in this manner if indeed
349  // no other future holds a reference to us.
350  _future_state = (AtomicAdjust::Integer)FS_finished;
351  }
352 }
353 
354 /**
355  * Cancels all the futures. Returns true if any futures were cancelled.
356  * Makes sure that all the futures finish before this one is marked done, in
357  * order to maintain the guarantee that calling result() is safe when done()
358  * returns true.
359  */
361 cancel() {
362  if (!done()) {
363  // Temporarily increase the pending count so that the notify_done()
364  // callbacks won't end up causing it to be set to "finished".
365  AtomicAdjust::inc(_num_pending);
366 
367  bool any_cancelled = false;
368  AsyncFuture::Futures::const_iterator it;
369  for (it = _futures.begin(); it != _futures.end(); ++it) {
370  AsyncFuture *fut = *it;
371  if (fut->cancel()) {
372  any_cancelled = true;
373  }
374  }
375 
376  // Now change state to "cancelled" and call the notify_done() callbacks.
377  // Don't call notify_done() if another thread has beaten us to it.
378  if (set_future_state(FS_cancelled)) {
379  notify_done(false);
380  }
381 
382  // Decreasing the pending count is kind of pointless now, so we do it only
383  // in a debug build.
384  nassertr(!AtomicAdjust::dec(_num_pending), any_cancelled);
385  return any_cancelled;
386  } else {
387  return false;
388  }
389 }
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
This class represents a thread-safe handle to a promised future result of an asynchronous operation,...
Definition: asyncFuture.h:61
virtual bool cancel()
Cancels the future.
Definition: asyncFuture.cxx:57
void notify_done(bool clean_exit)
Schedules the done callbacks.
void set_result(std::nullptr_t)
Sets this future's result.
Definition: asyncFuture.I:92
void wait()
Waits until the future is done.
Definition: asyncFuture.cxx:96
virtual ~AsyncFuture()
Destroys the future.
Definition: asyncFuture.cxx:29
static AsyncFuture * gather(Futures futures)
Creates a new future that returns `done()` when all of the contained futures are done.
Definition: asyncFuture.I:124
bool add_waiting_task(AsyncTask *task)
Indicates that the given task is waiting for this future to complete.
bool done() const
Returns true if the future is done or has been cancelled.
Definition: asyncFuture.I:29
Specific future that collects the results of several futures.
Definition: asyncFuture.h:166
virtual bool cancel() override
Cancels all the futures.
The AsyncTaskChain is a subset of the AsyncTaskManager.
A class to manage a loose queue of isolated tasks, which can be performed either synchronously (in th...
static AsyncTaskManager * get_global_ptr()
Returns a pointer to the global AsyncTaskManager.
This class represents a concrete task performed by an AsyncManager.
Definition: asyncTask.h:32
A trivial implementation for atomic adjustments for systems that don't require multiprogramming,...
static bool dec(Integer &var)
Atomically decrements the indicated variable and returns true if the new value is nonzero,...
static void inc(Integer &var)
Atomically increments the indicated variable.
static Integer get(const Integer &var)
Atomically retrieves the snapshot value of the indicated variable.
A ClockObject keeps track of elapsed real time and discrete time.
Definition: clockObject.h:58
get_real_time
Returns the actual number of seconds elapsed since the ClockObject was created, or since it was last ...
Definition: clockObject.h:92
static ClockObject * get_global_clock()
Returns a pointer to the global ClockObject.
Definition: clockObject.I:215
An optional parameter associated with an event.
A named event, possibly with parameters.
Definition: event.h:33
void unlock()
Alias for release() to match C++11 semantics.
Definition: mutexDirect.I:39
void lock()
Alias for acquire() to match C++11 semantics.
Definition: mutexDirect.I:19
A lightweight C++ object whose constructor calls acquire() and whose destructor calls release() on a ...
Definition: mutexHolder.h:25
A lightweight class that can be used to automatically start and stop a PStatCollector around a sectio...
Definition: pStatTimer.h:30
A base class for all things that want to be reference-counted.
virtual bool unref() const
Explicitly decrements the reference count.
static void force_yield()
Suspends the current thread for the rest of the current epoch.
Definition: thread.I:201
TypeHandle is the identifier used to differentiate C++ class types.
Definition: typeHandle.h:81
This is an abstract class that all classes which use TypeHandle, and also provide virtual functions t...
Definition: typedObject.h:88
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.