activemq-cpp-3.9.5
FutureTask.h
Go to the documentation of this file.
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18#ifndef _DECAF_UTIL_CONCURRENT_FUTURETASK_H_
19#define _DECAF_UTIL_CONCURRENT_FUTURETASK_H_
20
21#include <decaf/util/Config.h>
22
23#include <decaf/lang/Thread.h>
24#include <decaf/lang/Pointer.h>
26
34
35namespace decaf {
36namespace util {
37namespace concurrent {
38
39 using decaf::lang::Pointer;
40
57 template<typename T>
58 class FutureTask : public RunnableFuture<T> {
59 private:
60
65 class FutureTaskAdapter : public decaf::util::concurrent::Callable<T> {
66 private:
67
70 bool owns;
71 T result;
72
73 private:
74
75 FutureTaskAdapter(const FutureTaskAdapter&);
76 FutureTaskAdapter operator= (const FutureTaskAdapter&);
77
78 public:
79
80 FutureTaskAdapter(decaf::lang::Runnable* task, const T& result, bool owns = true) :
81 decaf::util::concurrent::Callable<T>(), task(task), callable(NULL), owns(owns), result(result) {
82 }
83
84 FutureTaskAdapter(decaf::util::concurrent::Callable<T>* task, bool owns = true) :
85 decaf::util::concurrent::Callable<T>(), task(NULL), callable(task), owns(owns), result(T()) {
86 }
87
88 virtual ~FutureTaskAdapter() {
89 try{
90 if (owns) {
91 delete this->task;
92 delete this->callable;
93 }
94 }
96 }
97
98 virtual T call() {
99 if (this->task != NULL) {
100 this->task->run();
101 return result;
102 } else {
103 return this->callable->call();
104 }
105 }
106 };
107
113 class FutureTaskSync : public locks::AbstractQueuedSynchronizer {
114 private:
115
116 enum SyncState {
118 READY = 0,
120 RUNNING = 1,
122 RAN = 2,
124 CANCELLED = 4
125 };
126
128 Pointer< Callable<T> > callable;
129
131 T result;
132
135
136 // The FutureTask parent of the Sync object.
137 FutureTask* parent;
138
143 decaf::lang::Thread* runner;
144
145 private:
146
147 FutureTaskSync(const FutureTaskSync&);
148 FutureTaskSync operator= (const FutureTaskSync&);
149
150 public:
151
152 FutureTaskSync(FutureTask* parent, Callable<T>* callable) :
153 AbstractQueuedSynchronizer(), callable(callable), result(), exception(), parent(parent), runner(NULL) {
154 }
155
156 virtual ~FutureTaskSync() {
157 }
158
159 bool innerIsCancelled() const {
160 return getState() == CANCELLED;
161 }
162
163 bool innerIsDone() const {
164 return ranOrCancelled(getState()) && this->runner == NULL;
165 }
166
167 T innerGet() {
169 if (getState() == CANCELLED) {
170 throw CancellationException();
171 }
172 if (exception != NULL) {
173 throw ExecutionException(exception->clone());
174 }
175 return result;
176 }
177
178 T innerGet(long long nanosTimeout) {
179 if (!tryAcquireSharedNanos(0, nanosTimeout)) {
180 throw TimeoutException();
181 }
182 if (getState() == CANCELLED) {
183 throw CancellationException();
184 }
185 if (exception != NULL) {
186 throw ExecutionException(exception->clone());
187 }
188 return result;
189 }
190
191 void innerSet(const T& result) {
192 for (;;) {
193 int s = getState();
194 if (s == RAN) {
195 return;
196 }
197 if (s == CANCELLED) {
198 // aggressively release to set runner to null,
199 // in case we are racing with a cancel request
200 // that will try to interrupt runner
201 releaseShared(0);
202 return;
203 }
204 if (compareAndSetState(s, RAN)) {
205 this->result = result;
206 releaseShared(0);
207 this->parent->done();
208 return;
209 }
210 }
211 }
212
213 void innerSetException(const decaf::lang::Exception& t) {
214 for (;;) {
215 int s = getState();
216 if (s == RAN) {
217 return;
218 }
219 if (s == CANCELLED) {
220 // aggressively release to set runner to null,
221 // in case we are racing with a cancel request
222 // that will try to interrupt runner
223 releaseShared(0);
224 return;
225 }
226 if (compareAndSetState(s, RAN)) {
227 exception.reset(t.clone());
228 releaseShared(0);
229 this->parent->done();
230 return;
231 }
232 }
233 }
234
235 bool innerCancel(bool mayInterruptIfRunning) {
236 for (;;) {
237 int s = getState();
238 if (ranOrCancelled(s)) {
239 return false;
240 }
241 if (compareAndSetState(s, CANCELLED)) {
242 break;
243 }
244 }
245
246 if (mayInterruptIfRunning) {
247 decaf::lang::Thread* r = runner;
248 if (r != NULL) {
249 r->interrupt();
250 }
251 }
252
253 releaseShared(0);
254 this->parent->done();
255 return true;
256 }
257
258 void innerRun() {
259 if (!compareAndSetState(READY, RUNNING)) {
260 return;
261 }
262
263 this->runner = decaf::lang::Thread::currentThread();
264 if (getState() == RUNNING) { // recheck after setting thread
265 T result;
266 try {
267 result = this->callable->call();
268 } catch(decaf::lang::Exception& ex) {
269 this->parent->setException(ex);
270 return;
271 } catch(std::exception& stdex) {
272 this->parent->setException(decaf::lang::Exception(new std::exception(stdex)));
273 return;
274 } catch(...) {
276 __FILE__, __LINE__, "FutureTask Caught Unknown exception during task execution."));
277 return;
278 }
279 this->parent->set(result);
280 } else {
281 releaseShared(0); // cancel
282 }
283 }
284
285 bool innerRunAndReset() {
286 if (!compareAndSetState(READY, RUNNING)) {
287 return false;
288 }
289
290 try {
291 this->runner = decaf::lang::Thread::currentThread();
292 if (getState() == RUNNING) {
293 this->callable->call(); // don't set result
294 }
295 this->runner = NULL;
296 return compareAndSetState(RUNNING, READY);
297 } catch(decaf::lang::Exception& ex) {
298 this->parent->setException(ex);
299 return false;
300 } catch(std::exception& stdex) {
301 this->parent->setException(decaf::lang::Exception(new std::exception(stdex)));
302 return false;
303 } catch(...) {
305 __FILE__, __LINE__, "FutureTask Caught Unknown exception during task execution."));
306 return false;
307 }
308 }
309
310 protected:
311
315 virtual int tryAcquireShared(int ignore DECAF_UNUSED) {
316 return innerIsDone() ? 1 : -1;
317 }
318
323 virtual bool tryReleaseShared(int ignore DECAF_UNUSED) {
324 runner = NULL;
325 return true;
326 }
327
328 private:
329
330 bool ranOrCancelled(int state) const {
331 return (state & (RAN | CANCELLED)) != 0;
332 }
333 };
334
335 private:
336
338
339 public:
340
352 FutureTask(Callable<T>* callable, bool takeOwnership = true) : sync(NULL) {
353 if (callable == NULL ) {
354 throw decaf::lang::exceptions::NullPointerException(__FILE__, __LINE__,
355 "The Callable pointer passed to the constructor was NULL");
356 }
357
358 this->sync.reset(new FutureTaskSync(this, new FutureTaskAdapter(callable, takeOwnership)));
359 }
360
375 FutureTask(decaf::lang::Runnable* runnable, const T& result, bool takeOwnership = true) : sync(NULL) {
376 if (runnable == NULL ) {
377 throw decaf::lang::exceptions::NullPointerException(__FILE__, __LINE__,
378 "The Runnable pointer passed to the constructor was NULL");
379 }
380
381 this->sync.reset(new FutureTaskSync(this, new FutureTaskAdapter(runnable, result, takeOwnership)));
382 }
383
384 virtual ~FutureTask() {
385 }
386
387 virtual bool isCancelled() const {
388 return this->sync->innerIsCancelled();
389 }
390
391 virtual bool isDone() const {
392 return this->sync->innerIsDone();
393 }
394
395 virtual bool cancel(bool mayInterruptIfRunning) {
396 return this->sync->innerCancel(mayInterruptIfRunning);
397 }
398
399 virtual T get() {
400 return this->sync->innerGet();
401 }
402
403 virtual T get(long long timeout, const TimeUnit& unit) {
404 return this->sync->innerGet(unit.toNanos(timeout));
405 }
406
408 return new FutureTask<T>(*this);
409 }
410
411 public:
412
421 virtual void done() {}
422
432 virtual void set(const T& result) {
433 this->sync->innerSet(result);
434 }
435
445 virtual void setException(const decaf::lang::Exception& error) {
446 this->sync->innerSetException(error);
447 }
448
449 virtual void run() {
450 this->sync->innerRun();
451 }
452
461 virtual bool runAndReset() {
462 return this->sync->innerRunAndReset();
463 }
464
465 public:
466
467 FutureTask(const FutureTask<T>& source) : RunnableFuture<T>(), sync(source.sync) {
468 }
469
471 this->sync = source.sync;
472 return *this;
473 }
474
475 };
476
477}}}
478
479#endif /* _DECAF_UTIL_CONCURRENT_FUTURETASK_H_ */
Definition Exception.h:38
virtual Exception * clone() const
Clones this exception.
Decaf's implementation of a Smart Pointer that is a template on a Type and is Thread Safe if the defa...
Definition Pointer.h:53
void reset(T *value=NULL)
Resets the Pointer to hold the new value.
Definition Pointer.h:161
Interface for a runnable object - defines a task that can be run by a thread.
Definition Runnable.h:29
virtual void run()=0
Run method - called by the Thread class in the context of the thread.
A Thread is a concurrent unit of execution.
Definition Thread.h:64
void interrupt()
Interrupts the Thread if it is blocked and in an interruptible state.
static Thread * currentThread()
Returns a pointer to the currently executing thread object.
Definition NullPointerException.h:32
A task that returns a result and may throw an exception.
Definition Callable.h:47
virtual V call()=0
Computes a result, or throws an exception if unable to do so.
Definition CancellationException.h:30
Definition ExecutionException.h:31
virtual void run()
Run method - called by the Thread class in the context of the thread.
Definition FutureTask.h:449
virtual bool isDone() const
Returns true if this task completed.
Definition FutureTask.h:391
virtual void setException(const decaf::lang::Exception &error)
Causes this future to report an ExecutionException with the given Exception as its cause,...
Definition FutureTask.h:445
virtual bool cancel(bool mayInterruptIfRunning)
Attempts to cancel execution of this task.
Definition FutureTask.h:395
FutureTask< T > & operator=(const FutureTask< T > &source)
Definition FutureTask.h:470
FutureTask(const FutureTask< T > &source)
Definition FutureTask.h:467
FutureTask(Callable< T > *callable, bool takeOwnership=true)
Creates a FutureTask instance that will, upon running, execute the given Callable.
Definition FutureTask.h:352
virtual T get(long long timeout, const TimeUnit &unit)
Waits if necessary for at most the given time for the computation to complete, and then retrieves its...
Definition FutureTask.h:403
FutureTask< T > * clone()
Definition FutureTask.h:407
virtual bool runAndReset()
Executes the computation without setting its result, and then resets this Future to initial state,...
Definition FutureTask.h:461
virtual ~FutureTask()
Definition FutureTask.h:384
virtual void done()
Protected method invoked when this task transitions to state isDone (whether normally or via cancella...
Definition FutureTask.h:421
virtual T get()
Waits if necessary for the computation to complete, and then retrieves its result.
Definition FutureTask.h:399
virtual bool isCancelled() const
Returns true if this task was canceled before it completed normally.
Definition FutureTask.h:387
virtual void set(const T &result)
Sets the result of this Future to the given value unless this future has already been set or has been...
Definition FutureTask.h:432
FutureTask(decaf::lang::Runnable *runnable, const T &result, bool takeOwnership=true)
Creates a FutureTask that will, upon running, execute the given Runnable, and arrange that the get me...
Definition FutureTask.h:375
A Runnable version of the Future type.
Definition RunnableFuture.h:38
A TimeUnit represents time durations at a given unit of granularity and provides utility methods to c...
Definition TimeUnit.h:62
long long toNanos(long long duration) const
Equivalent to NANOSECONDS.convert(duration, this).
Definition TimeUnit.h:126
Definition TimeoutException.h:32
Definition AbstractQueuedSynchronizer.h:35
virtual int getState() const
Gets and returns the currently set value of this object Synchronization sate.
virtual bool compareAndSetState(int expect, int update)
Sets the synchronization state to the specified value if the current value is equal to the expected v...
void acquireSharedInterruptibly(int arg)
Acquire the lock in shared mode, allowing interruption.
bool releaseShared(int arg)
When held in shared mode this method releases the Synchronizer.
bool tryAcquireSharedNanos(int arg, long long nanos)
Acquires in shared mode if possible, first checking if the calling thread has already been interrupte...
#define DECAF_CATCHALL_NOTHROW()
A catch-all that does not throw an exception, one use would be to catch any exception in a destructor...
Definition ExceptionDefines.h:62
#define NULL
Definition Config.h:33
Definition AbstractExecutorService.h:28
Definition AbstractCollection.h:33
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
Definition AprPool.h:25