Panda3D
asyncFuture.cxx
Go to the documentation of this file.
1 /**
2  * PANDA 3D SOFTWARE
3  * Copyright (c) Carnegie Mellon University. All rights reserved.
4  *
5  * All use of this software is subject to the terms of the revised BSD
6  * license. You should have received a copy of this license along
7  * with this source code in a file named "LICENSE."
8  *
9  * @file asyncFuture.cxx
10  * @author rdb
11  * @date 2017-11-28
12  */
13 
14 #include "asyncFuture.h"
15 #include "asyncTask.h"
16 #include "asyncTaskManager.h"
17 #include "conditionVarFull.h"
18 #include "config_event.h"
19 #include "pStatTimer.h"
20 #include "throw_event.h"
21 
// Out-of-class definitions of the static _type_handle members of AsyncFuture
// and AsyncGatheringFuture.
22 TypeHandle AsyncFuture::_type_handle;
23 TypeHandle AsyncGatheringFuture::_type_handle;
24 
25 /**
26  * Destroys the future. Assumes notify_done() has already been called.
27  */
30  // If this triggers, the future destroyed before it was cancelled, which is
31  // not valid. Unless we should simply call cancel() here?
32  nassertv(_waiting.empty());
33 }
34 
35 /**
36  * Cancels the future. Returns true if it was cancelled, or false if the
37  * future was already done. Either way, done() will return true after this
38  * call returns.
39  *
40  * In the case of a task, this is equivalent to remove().
41  */
42 bool AsyncFuture::
43 cancel() {
44  if (set_future_state(FS_cancelled)) {
45  // The compare-swap operation succeeded, so schedule the callbacks.
46  notify_done(false);
47  return true;
48  } else {
49  // It's already done.
50  return false;
51  }
52 }
53 
54 /**
55  *
56  */
57 void AsyncFuture::
58 output(std::ostream &out) const {
59  out << get_type();
60  FutureState state = (FutureState)AtomicAdjust::get(_future_state);
61  switch (state) {
62  case FS_pending:
63  case FS_locked_pending:
64  out << " (pending)";
65  break;
66  case FS_finished:
67  out << " (finished)";
68  break;
69  case FS_cancelled:
70  out << " (cancelled)";
71  break;
72  default:
73  out << " (**INVALID**)";
74  break;
75  }
76 }
77 
78 /**
79  * Waits until the future is done.
80  */
81 void AsyncFuture::
82 wait() {
83  if (done()) {
84  return;
85  }
86 
87  PStatTimer timer(AsyncTaskChain::_wait_pcollector);
88  if (task_cat.is_debug()) {
89  task_cat.debug()
90  << "Waiting for future " << *this << "\n";
91  }
92 
93  // Continue to yield while the future isn't done. It may be more efficient
94  // to use a condition variable, but let's not add the extra complexity
95  // unless we're sure that we need it.
96  do {
98  } while (!done());
99 }
100 
101 /**
102  * Waits until the future is done, or until the timeout is reached.
103  */
104 void AsyncFuture::
105 wait(double timeout) {
106  if (done()) {
107  return;
108  }
109 
110  PStatTimer timer(AsyncTaskChain::_wait_pcollector);
111  if (task_cat.is_debug()) {
112  task_cat.debug()
113  << "Waiting up to " << timeout << " seconds for future " << *this << "\n";
114  }
115 
116  // Continue to yield while the future isn't done. It may be more efficient
117  // to use a condition variable, but let's not add the extra complexity
118  // unless we're sure that we need it.
120  double end = clock->get_real_time() + timeout;
121  do {
123  } while (!done() && clock->get_real_time() < end);
124 }
125 
126 /**
127  * Schedules the done callbacks. Called after the future has just entered the
128  * 'done' state.
129  * @param clean_exit true if finished successfully, false if cancelled.
130  */
131 void AsyncFuture::
132 notify_done(bool clean_exit) {
133  nassertv(done());
134 
135  // This will only be called by the thread that managed to set the
136  // _future_state away from the "pending" state, so this is thread safe.
137 
138  Futures::iterator it;
139  for (it = _waiting.begin(); it != _waiting.end(); ++it) {
140  AsyncFuture *fut = *it;
141  if (fut->is_task()) {
142  // It's a task. Make it active again.
143  wake_task((AsyncTask *)fut);
144  } else {
145  // It's a gathering future. Decrease the pending count on it, and if
146  // we're the last one, call notify_done() on it.
148  if (!AtomicAdjust::dec(gather->_num_pending)) {
149  if (gather->set_future_state(FS_finished)) {
150  gather->notify_done(true);
151  }
152  }
153  }
154  }
155  _waiting.clear();
156 
157  // For historical reasons, we don't send the "done event" if the future was
158  // cancelled.
159  if (clean_exit && !_done_event.empty()) {
160  PT_Event event = new Event(_done_event);
161  event->add_parameter(EventParameter(this));
162  throw_event(std::move(event));
163  }
164 }
165 
166 /**
167  * Sets this future's result. Can only be done while the future is not done.
168  * Calling this marks the future as done and schedules the done callbacks.
169  *
170  * This variant takes two pointers; the second one is only set if this object
171  * inherits from ReferenceCount, so that a reference can be held.
172  *
173  * Assumes the manager's lock is *not* held.
174  */
175 void AsyncFuture::
177  // We don't strictly need to lock the future since only one thread is
178  // allowed to call set_result(), but we might as well.
179  FutureState orig_state = (FutureState)AtomicAdjust::
180  compare_and_exchange(_future_state, (AtomicAdjust::Integer)FS_pending,
181  (AtomicAdjust::Integer)FS_locked_pending);
182 
183  while (orig_state == FS_locked_pending) {
185  orig_state = (FutureState)AtomicAdjust::
186  compare_and_exchange(_future_state, (AtomicAdjust::Integer)FS_pending,
187  (AtomicAdjust::Integer)FS_locked_pending);
188  }
189 
190  if (orig_state == FS_pending) {
191  _result = ptr;
192  _result_ref = ref_ptr;
193  unlock(FS_finished);
194 
195  // OK, now our thread owns the _waiting vector et al.
196  notify_done(true);
197 
198  } else if (orig_state == FS_cancelled) {
199  // This was originally illegal, but there is a chance that the future was
200  // cancelled while another thread was setting the result. So, we drop
201  // this, but we can issue a warning.
202  task_cat.warning()
203  << "Ignoring set_result() called on cancelled " << *this << "\n";
204 
205  } else {
206  task_cat.error()
207  << "set_result() was called on finished " << *this << "\n";
208  }
209 }
210 
211 /**
212  * Indicates that the given task is waiting for this future to complete. When
213  * the future is done, it will reactivate the given task. If this is called
214  * while the future is already done, schedules the task immediately.
215  * Assumes the manager's lock is not held.
216  * @returns true if the future was pending, false if it was already done.
217  */
218 bool AsyncFuture::
220  nassertr(task->is_runnable(), false);
221 
222  // We have to make sure we're not going to change state while we're in the
223  // process of adding the task.
224  if (try_lock_pending()) {
225  if (_manager == nullptr) {
226  _manager = task->_manager;
227  }
228 
229  _waiting.push_back(task);
230 
231  // Unlock the state.
232  unlock();
233  nassertr(task->_manager == nullptr || task->_manager == _manager, true);
234  return true;
235  } else {
236  // It's already done. Wake the task immediately.
237  wake_task(task);
238  return false;
239  }
240 }
241 
242 /**
243  * Reactivates the given task. Assumes the manager lock is not held.
244  */
245 void AsyncFuture::
246 wake_task(AsyncTask *task) {
247  AsyncTaskManager *manager = task->_manager;
248  if (manager == nullptr) {
249  // If it's an unscheduled task, schedule it on the same manager as the
250  // rest of the waiting tasks.
251  manager = _manager;
252  if (manager == nullptr) {
254  }
255  }
256 
257  MutexHolder holder(manager->_lock);
258  switch (task->_state) {
259  case AsyncTask::S_servicing_removed:
260  nassertv(task->_manager == _manager);
261  // Re-adding a self-removed task; this just means clearing the removed
262  // flag.
263  task->_state = AsyncTask::S_servicing;
264  return;
265 
266  case AsyncTask::S_inactive:
267  // Schedule it immediately.
268  nassertv(task->_manager == nullptr);
269 
270  if (task_cat.is_debug()) {
271  task_cat.debug()
272  << "Adding " << *task << " (woken by future " << *this << ")\n";
273  }
274 
275  {
276  manager->_lock.unlock();
277  task->upon_birth(manager);
278  manager->_lock.lock();
279  nassertv(task->_manager == nullptr &&
280  task->_state == AsyncTask::S_inactive);
281 
282  AsyncTaskChain *chain = manager->do_find_task_chain(task->_chain_name);
283  if (chain == nullptr) {
284  task_cat.warning()
285  << "Creating implicit AsyncTaskChain " << task->_chain_name
286  << " for " << manager->get_type() << " " << manager->get_name() << "\n";
287  chain = manager->do_make_task_chain(task->_chain_name);
288  }
289  chain->do_add(task);
290  }
291  return;
292 
293  case AsyncTask::S_awaiting:
294  nassertv(task->_manager == _manager);
295  task->_state = AsyncTask::S_active;
296  task->_chain->_active.push_back(task);
297  --task->_chain->_num_awaiting_tasks;
298  return;
299 
300  default:
301  nassert_raise("unexpected task state");
302  return;
303  }
304 }
305 
306 /**
307  * @see AsyncFuture::gather
308  */
309 AsyncGatheringFuture::
310 AsyncGatheringFuture(AsyncFuture::Futures futures) :
311  _futures(std::move(futures)),
312  _num_pending(0) {
313 
314  bool any_pending = false;
315 
316  AsyncFuture::Futures::const_iterator it;
317  for (it = _futures.begin(); it != _futures.end(); ++it) {
318  AsyncFuture *fut = *it;
319  // If this returns true, the future is not yet done and we need to
320  // register ourselves with it. This creates a circular reference, but it
321  // is resolved when the future is completed or cancelled.
322  if (fut->try_lock_pending()) {
323  if (_manager == nullptr) {
324  _manager = fut->_manager;
325  }
326  fut->_waiting.push_back((AsyncFuture *)this);
327  AtomicAdjust::inc(_num_pending);
328  fut->unlock();
329  any_pending = true;
330  }
331  }
332  if (!any_pending) {
333  // Start in the done state if all the futures we were passed are done.
334  // Note that it is only safe to set this member in this manner if indeed
335  // no other future holds a reference to us.
336  _future_state = (AtomicAdjust::Integer)FS_finished;
337  }
338 }
339 
340 /**
341  * Cancels all the futures. Returns true if any futures were cancelled.
342  * Makes sure that all the futures finish before this one is marked done, in
343  * order to maintain the guarantee that calling result() is safe when done()
344  * returns true.
345  */
348  if (!done()) {
349  // Temporarily increase the pending count so that the notify_done()
350  // callbacks won't end up causing it to be set to "finished".
351  AtomicAdjust::inc(_num_pending);
352 
353  bool any_cancelled = false;
354  AsyncFuture::Futures::const_iterator it;
355  for (it = _futures.begin(); it != _futures.end(); ++it) {
356  AsyncFuture *fut = *it;
357  if (fut->cancel()) {
358  any_cancelled = true;
359  }
360  }
361 
362  // Now change state to "cancelled" and call the notify_done() callbacks.
363  // Don't call notify_done() if another thread has beaten us to it.
364  if (set_future_state(FS_cancelled)) {
365  notify_done(false);
366  }
367 
368  // Decreasing the pending count is kind of pointless now, so we do it only
369  // in a debug build.
370  nassertr(!AtomicAdjust::dec(_num_pending), any_cancelled);
371  return any_cancelled;
372  } else {
373  return false;
374  }
375 }
static ClockObject * get_global_clock()
Returns a pointer to the global ClockObject.
Definition: clockObject.I:215
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
This class represents a thread-safe handle to a promised future result of an asynchronous operation,...
Definition: asyncFuture.h:61
An optional parameter associated with an event.
void notify_done(bool clean_exit)
Schedules the done callbacks.
A class to manage a loose queue of isolated tasks, which can be performed either synchronously (in th...
virtual ~AsyncFuture()
Destroys the future.
Definition: asyncFuture.cxx:29
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
This is an abstract class that all classes which use TypeHandle, and also provide virtual functions t...
Definition: typedObject.h:88
A lightweight class that can be used to automatically start and stop a PStatCollector around a sectio...
Definition: pStatTimer.h:30
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
A lightweight C++ object whose constructor calls acquire() and whose destructor calls release() on a ...
Definition: mutexHolder.h:25
virtual bool cancel() override
Cancels all the futures.
void unlock()
Alias for release() to match C++11 semantics.
Definition: mutexDirect.I:39
static void inc(Integer &var)
Atomically increments the indicated variable.
static void force_yield()
Suspends the current thread for the rest of the current epoch.
Definition: thread.I:201
static bool dec(Integer &var)
Atomically decrements the indicated variable and returns true if the new value is nonzero,...
bool add_waiting_task(AsyncTask *task)
Indicates that the given task is waiting for this future to complete.
static AsyncFuture * gather(Futures futures)
Creates a new future that returns `done()` when all of the contained futures are done.
Definition: asyncFuture.I:124
bool done() const
Returns true if the future is done or has been cancelled.
Definition: asyncFuture.I:29
void wait()
Waits until the future is done.
Definition: asyncFuture.cxx:82
A trivial implementation for atomic adjustments for systems that don't require multiprogramming,...
A ClockObject keeps track of elapsed real time and discrete time.
Definition: clockObject.h:58
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
void set_result(std::nullptr_t)
Sets this future's result.
Definition: asyncFuture.I:92
The AsyncTaskChain is a subset of the AsyncTaskManager.
static Integer get(const Integer &var)
Atomically retrieves the snapshot value of the indicated variable.
get_real_time
Returns the actual number of seconds elapsed since the ClockObject was created, or since it was last ...
Definition: clockObject.h:92
Specific future that collects the results of several futures.
Definition: asyncFuture.h:165
A base class for all things that want to be reference-counted.
This class represents a concrete task performed by an AsyncManager.
Definition: asyncTask.h:32
A named event, possibly with parameters.
Definition: event.h:33
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
TypeHandle is the identifier used to differentiate C++ class types.
Definition: typeHandle.h:81
virtual bool cancel()
Cancels the future.
Definition: asyncFuture.cxx:43
void lock()
Alias for acquire() to match C++11 semantics.
Definition: mutexDirect.I:19
static AsyncTaskManager * get_global_ptr()
Returns a pointer to the global AsyncTaskManager.