Panda3D
Loading...
Searching...
No Matches
asyncFuture.cxx
Go to the documentation of this file.
/**
 * PANDA 3D SOFTWARE
 * Copyright (c) Carnegie Mellon University.  All rights reserved.
 *
 * All use of this software is subject to the terms of the revised BSD
 * license.  You should have received a copy of this license along
 * with this source code in a file named "LICENSE."
 *
 * @file asyncFuture.cxx
 * @author rdb
 * @date 2017-11-28
 */
13
14#include "asyncFuture.h"
15#include "asyncTask.h"
16#include "asyncTaskManager.h"
17#include "conditionVarFull.h"
18#include "config_event.h"
19#include "pStatTimer.h"
20#include "throw_event.h"
21
22TypeHandle AsyncFuture::_type_handle;
23TypeHandle AsyncGatheringFuture::_type_handle;
24
25/**
26 * Destroys the future. Assumes notify_done() has already been called.
27 */
30 // If this triggers, the future destroyed before it was cancelled, which is
31 // not valid. Unless we should simply call cancel() here?
32 nassertv(_waiting.empty());
33
34 // This is an attempt to work around what appears to be a compiler bug in
35 // MSVC when compiling with optimizations and having an EventStoreInt stored
36 // in this field. It crashes when we delete via the ReferenceCount base
37 // instead of via the TypedObject. I haven't been able to find out why;
38 // just that it doesn't happen with ParamString. ~rdb
39 ReferenceCount *result_ref = _result_ref.p();
40 if (result_ref != nullptr) {
41 _result_ref.cheat() = nullptr;
42 if (!result_ref->unref()) {
43 delete _result;
44 }
45 _result = nullptr;
46 }
47}
48
49/**
50 * Cancels the future. Returns true if it was cancelled, or false if the
51 * future was already done. Either way, done() will return true after this
52 * call returns.
53 *
54 * In the case of a task, this is equivalent to remove().
55 */
57cancel() {
58 if (set_future_state(FS_cancelled)) {
59 // The compare-swap operation succeeded, so schedule the callbacks.
60 notify_done(false);
61 return true;
62 } else {
63 // It's already done.
64 return false;
65 }
66}
67
68/**
69 *
70 */
71void AsyncFuture::
72output(std::ostream &out) const {
73 out << get_type();
74 FutureState state = (FutureState)AtomicAdjust::get(_future_state);
75 switch (state) {
76 case FS_pending:
77 case FS_locked_pending:
78 out << " (pending)";
79 break;
80 case FS_finished:
81 out << " (finished)";
82 break;
83 case FS_cancelled:
84 out << " (cancelled)";
85 break;
86 default:
87 out << " (**INVALID**)";
88 break;
89 }
90}
91
92/**
93 * Waits until the future is done.
94 */
96wait() {
97 if (done()) {
98 return;
99 }
100
101 PStatTimer timer(AsyncTaskChain::_wait_pcollector);
102 if (task_cat.is_debug()) {
103 task_cat.debug()
104 << "Waiting for future " << *this << "\n";
105 }
106
107 // Continue to yield while the future isn't done. It may be more efficient
108 // to use a condition variable, but let's not add the extra complexity
109 // unless we're sure that we need it.
110 do {
112 } while (!done());
113}
114
115/**
116 * Waits until the future is done, or until the timeout is reached.
117 */
119wait(double timeout) {
120 if (done()) {
121 return;
122 }
123
124 PStatTimer timer(AsyncTaskChain::_wait_pcollector);
125 if (task_cat.is_debug()) {
126 task_cat.debug()
127 << "Waiting up to " << timeout << " seconds for future " << *this << "\n";
128 }
129
130 // Continue to yield while the future isn't done. It may be more efficient
131 // to use a condition variable, but let's not add the extra complexity
132 // unless we're sure that we need it.
134 double end = clock->get_real_time() + timeout;
135 do {
137 } while (!done() && clock->get_real_time() < end);
138}
139
140/**
141 * Schedules the done callbacks. Called after the future has just entered the
142 * 'done' state.
143 * @param clean_exit true if finished successfully, false if cancelled.
144 */
146notify_done(bool clean_exit) {
147 nassertv(done());
148
149 // This will only be called by the thread that managed to set the
150 // _future_state away from the "pending" state, so this is thread safe.
151
152 Futures::iterator it;
153 for (it = _waiting.begin(); it != _waiting.end(); ++it) {
154 AsyncFuture *fut = *it;
155 if (fut->is_task()) {
156 // It's a task. Make it active again.
157 wake_task((AsyncTask *)fut);
158 } else {
159 // It's a gathering future. Decrease the pending count on it, and if
160 // we're the last one, call notify_done() on it.
162 if (!AtomicAdjust::dec(gather->_num_pending)) {
163 if (gather->set_future_state(FS_finished)) {
164 gather->notify_done(true);
165 }
166 }
167 }
168 }
169 _waiting.clear();
170
171 // For historical reasons, we don't send the "done event" if the future was
172 // cancelled.
173 if (clean_exit && !_done_event.empty()) {
174 PT_Event event = new Event(_done_event);
175 event->add_parameter(EventParameter(this));
176 throw_event(std::move(event));
177 }
178}
179
180/**
181 * Sets this future's result. Can only be done while the future is not done.
182 * Calling this marks the future as done and schedules the done callbacks.
183 *
184 * This variant takes two pointers; the second one is only set if this object
185 * inherits from ReferenceCount, so that a reference can be held.
186 *
187 * Assumes the manager's lock is *not* held.
188 */
190set_result(TypedObject *ptr, ReferenceCount *ref_ptr) {
191 // We don't strictly need to lock the future since only one thread is
192 // allowed to call set_result(), but we might as well.
193 FutureState orig_state = (FutureState)AtomicAdjust::
194 compare_and_exchange(_future_state, (AtomicAdjust::Integer)FS_pending,
195 (AtomicAdjust::Integer)FS_locked_pending);
196
197 while (orig_state == FS_locked_pending) {
199 orig_state = (FutureState)AtomicAdjust::
200 compare_and_exchange(_future_state, (AtomicAdjust::Integer)FS_pending,
201 (AtomicAdjust::Integer)FS_locked_pending);
202 }
203
204 if (orig_state == FS_pending) {
205 _result = ptr;
206 _result_ref = ref_ptr;
207 unlock(FS_finished);
208
209 // OK, now our thread owns the _waiting vector et al.
210 notify_done(true);
211
212 } else if (orig_state == FS_cancelled) {
213 // This was originally illegal, but there is a chance that the future was
214 // cancelled while another thread was setting the result. So, we drop
215 // this, but we can issue a warning.
216 task_cat.warning()
217 << "Ignoring set_result() called on cancelled " << *this << "\n";
218
219 } else {
220 task_cat.error()
221 << "set_result() was called on finished " << *this << "\n";
222 }
223}
224
225/**
226 * Indicates that the given task is waiting for this future to complete. When
227 * the future is done, it will reactivate the given task. If this is called
228 * while the future is already done, schedules the task immediately.
229 * Assumes the manager's lock is not held.
230 * @returns true if the future was pending, false if it was already done.
231 */
234 nassertr(task->is_runnable(), false);
235
236 // We have to make sure we're not going to change state while we're in the
237 // process of adding the task.
238 if (try_lock_pending()) {
239 if (_manager == nullptr) {
240 _manager = task->_manager;
241 }
242
243 _waiting.push_back(task);
244
245 // Unlock the state.
246 unlock();
247 nassertr(task->_manager == nullptr || task->_manager == _manager, true);
248 return true;
249 } else {
250 // It's already done. Wake the task immediately.
251 wake_task(task);
252 return false;
253 }
254}
255
256/**
257 * Reactivates the given task. Assumes the manager lock is not held.
258 */
259void AsyncFuture::
260wake_task(AsyncTask *task) {
261 AsyncTaskManager *manager = task->_manager;
262 if (manager == nullptr) {
263 // If it's an unscheduled task, schedule it on the same manager as the
264 // rest of the waiting tasks.
265 manager = _manager;
266 if (manager == nullptr) {
268 }
269 }
270
271 MutexHolder holder(manager->_lock);
272 switch (task->_state) {
273 case AsyncTask::S_servicing_removed:
274 nassertv(task->_manager == _manager);
275 // Re-adding a self-removed task; this just means clearing the removed
276 // flag.
277 task->_state = AsyncTask::S_servicing;
278 return;
279
280 case AsyncTask::S_inactive:
281 // Schedule it immediately.
282 nassertv(task->_manager == nullptr);
283
284 if (task_cat.is_debug()) {
285 task_cat.debug()
286 << "Adding " << *task << " (woken by future " << *this << ")\n";
287 }
288
289 {
290 manager->_lock.unlock();
291 task->upon_birth(manager);
292 manager->_lock.lock();
293 nassertv(task->_manager == nullptr &&
294 task->_state == AsyncTask::S_inactive);
295
296 AsyncTaskChain *chain = manager->do_find_task_chain(task->_chain_name);
297 if (chain == nullptr) {
298 task_cat.warning()
299 << "Creating implicit AsyncTaskChain " << task->_chain_name
300 << " for " << manager->get_type() << " " << manager->get_name() << "\n";
301 chain = manager->do_make_task_chain(task->_chain_name);
302 }
303 chain->do_add(task);
304 }
305 return;
306
307 case AsyncTask::S_awaiting:
308 nassertv(task->_manager == _manager);
309 task->_state = AsyncTask::S_active;
310 task->_chain->_active.push_back(task);
311 --task->_chain->_num_awaiting_tasks;
312 return;
313
314 default:
315 nassert_raise("unexpected task state");
316 return;
317 }
318}
319
320/**
321 * @see AsyncFuture::gather
322 */
323AsyncGatheringFuture::
324AsyncGatheringFuture(AsyncFuture::Futures futures) :
325 _futures(std::move(futures)),
326 _num_pending(0) {
327
328 bool any_pending = false;
329
330 AsyncFuture::Futures::const_iterator it;
331 for (it = _futures.begin(); it != _futures.end(); ++it) {
332 AsyncFuture *fut = *it;
333 // If this returns true, the future is not yet done and we need to
334 // register ourselves with it. This creates a circular reference, but it
335 // is resolved when the future is completed or cancelled.
336 if (fut->try_lock_pending()) {
337 if (_manager == nullptr) {
338 _manager = fut->_manager;
339 }
340 fut->_waiting.push_back((AsyncFuture *)this);
341 AtomicAdjust::inc(_num_pending);
342 fut->unlock();
343 any_pending = true;
344 }
345 }
346 if (!any_pending) {
347 // Start in the done state if all the futures we were passed are done.
348 // Note that it is only safe to set this member in this manner if indeed
349 // no other future holds a reference to us.
350 _future_state = (AtomicAdjust::Integer)FS_finished;
351 }
352}
353
354/**
355 * Cancels all the futures. Returns true if any futures were cancelled.
356 * Makes sure that all the futures finish before this one is marked done, in
357 * order to maintain the guarantee that calling result() is safe when done()
358 * returns true.
359 */
361cancel() {
362 if (!done()) {
363 // Temporarily increase the pending count so that the notify_done()
364 // callbacks won't end up causing it to be set to "finished".
365 AtomicAdjust::inc(_num_pending);
366
367 bool any_cancelled = false;
368 AsyncFuture::Futures::const_iterator it;
369 for (it = _futures.begin(); it != _futures.end(); ++it) {
370 AsyncFuture *fut = *it;
371 if (fut->cancel()) {
372 any_cancelled = true;
373 }
374 }
375
376 // Now change state to "cancelled" and call the notify_done() callbacks.
377 // Don't call notify_done() if another thread has beaten us to it.
378 if (set_future_state(FS_cancelled)) {
379 notify_done(false);
380 }
381
382 // Decreasing the pending count is kind of pointless now, so we do it only
383 // in a debug build.
384 nassertr(!AtomicAdjust::dec(_num_pending), any_cancelled);
385 return any_cancelled;
386 } else {
387 return false;
388 }
389}
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
This class represents a thread-safe handle to a promised future result of an asynchronous operation,...
Definition asyncFuture.h:61
virtual bool cancel()
Cancels the future.
void notify_done(bool clean_exit)
Schedules the done callbacks.
void set_result(std::nullptr_t)
Sets this future's result.
Definition asyncFuture.I:92
void wait()
Waits until the future is done.
virtual ~AsyncFuture()
Destroys the future.
static AsyncFuture * gather(Futures futures)
Creates a new future that returns `done()` when all of the contained futures are done.
bool add_waiting_task(AsyncTask *task)
Indicates that the given task is waiting for this future to complete.
bool done() const
Returns true if the future is done or has been cancelled.
Definition asyncFuture.I:29
Specific future that collects the results of several futures.
virtual bool cancel() override
Cancels all the futures.
The AsyncTaskChain is a subset of the AsyncTaskManager.
A class to manage a loose queue of isolated tasks, which can be performed either synchronously (in th...
static AsyncTaskManager * get_global_ptr()
Returns a pointer to the global AsyncTaskManager.
This class represents a concrete task performed by an AsyncManager.
Definition asyncTask.h:32
A trivial implementation for atomic adjustments for systems that don't require multiprogramming,...
static bool dec(Integer &var)
Atomically decrements the indicated variable and returns true if the new value is nonzero,...
static void inc(Integer &var)
Atomically increments the indicated variable.
static Integer get(const Integer &var)
Atomically retrieves the snapshot value of the indicated variable.
A ClockObject keeps track of elapsed real time and discrete time.
Definition clockObject.h:58
get_real_time
Returns the actual number of seconds elapsed since the ClockObject was created, or since it was last ...
Definition clockObject.h:92
static ClockObject * get_global_clock()
Returns a pointer to the global ClockObject.
An optional parameter associated with an event.
A named event, possibly with parameters.
Definition event.h:33
void unlock()
Alias for release() to match C++11 semantics.
Definition mutexDirect.I:39
void lock()
Alias for acquire() to match C++11 semantics.
Definition mutexDirect.I:19
A lightweight C++ object whose constructor calls acquire() and whose destructor calls release() on a ...
Definition mutexHolder.h:25
A lightweight class that can be used to automatically start and stop a PStatCollector around a sectio...
Definition pStatTimer.h:30
A base class for all things that want to be reference-counted.
virtual bool unref() const
Explicitly decrements the reference count.
static void force_yield()
Suspends the current thread for the rest of the current epoch.
Definition thread.I:201
TypeHandle is the identifier used to differentiate C++ class types.
Definition typeHandle.h:81
This is an abstract class that all classes which use TypeHandle, and also provide virtual functions t...
Definition typedObject.h:88
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
STL namespace.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.