activemq-cpp-3.9.5
TransferQueue.h
Go to the documentation of this file.
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18#ifndef _DECAF_INTERNAL_UTIL_CONCURRENT_TRANSFERQUEUE_H_
19#define _DECAF_INTERNAL_UTIL_CONCURRENT_TRANSFERQUEUE_H_
20
22
27
28#include <decaf/lang/Thread.h>
29
30namespace decaf {
31namespace internal {
32namespace util {
33namespace concurrent {
34
35 using decaf::util::concurrent::atomic::AtomicReference;
36
45 template< typename E >
46 class TransferQueue : public Transferer<E> {
47 private:
48
49// /** Node class for TransferQueue. */
50// class QNode {
51// private:
52//
53// AtomicReference<QNode> next; // next node in queue
54// volatile decaf::lang::Thread* waiter; // to control park/unpark
55// AtomicReference<E*> item; // CAS'ed to or from null
56//
57// bool data;
58// bool cancelled;
59//
60// public:
61//
62// QNode() : cancelled( false ), data( false ) {
63// }
64//
65// QNode( E* item ) : cancelled( false ), data( true ) {
66// this->item.set( item );
67// }
68//
69// bool casNext( QNode* cmp, QNode* val ) {
70// return ( this->next == cmp && this->next.compareAndSet( cmp, val ) );
71// }
72//
73// bool casItem( E* cmp, E* val ) {
74// return ( this->item == cmp && this->item.compareAndSet( cmp, val ) );
75// }
76//
77// /**
78// * Tries to cancel by CAS'ing ref to NULL if that succeeds then we mark as cancelled.
79// */
80// void tryCancel( E* cmp ) {
81// if( item.compareAndSet( cmp, NULL ) ) {
82// this->cancelled = true;
83// }
84// }
85//
86// bool isCancelled() {
87// return this->item == this;
88// }
89//
90// /**
91// * Returns true if this node is known to be off the queue
92// * because its next pointer has been forgotten due to
93// * an advanceHead operation.
94// */
95// bool isOffList() {
96// return this->next == NULL;
97// }
98// };
99
100 private:
101
102// decaf::util::concurrent::atomic::AtomicReference<QNode> head;
103// decaf::util::concurrent::atomic::AtomicReference<QNode> tail;
104//
105// decaf::util::concurrent::atomic::AtomicReference<QNode> cleanMe;
106
107 public:
108
/**
 * Creates a TransferQueue.
 *
 * The dummy-node initialisation of head and tail below is still commented
 * out, so the constructor currently performs no work.
 *
 * NOTE(review): the declaration line (Doxygen line 109) was missing from this
 * listing, which left the closing brace unmatched; it is restored here.
 */
109 TransferQueue() {
110// // Initialize with a dummy Node.
111// this->head.set( new QNode() );
112// this->tail.set( this->head.get() );
113 }
114
/**
 * Virtual destructor with an empty body: this class declares no live data
 * members (head, tail and cleanMe are commented out), so there is nothing
 * for it to release.
 */
115 virtual ~TransferQueue() { }
116
/**
 * Performs a put.
 *
 * Placeholder: the body does nothing, so the item is neither queued nor
 * handed to a waiting consumer. The intended dual-queue algorithm exists
 * only as commented-out code further down in this class.
 *
 * @param e
 *      The item being offered; currently ignored.
 * @param timed
 *      Presumably whether the operation should time out (TODO confirm
 *      against Transferer); currently ignored.
 * @param nanos
 *      Presumably the timeout in nanoseconds when timed is true (TODO
 *      confirm against Transferer); currently ignored.
 */
117 virtual void transfer( E* e, bool timed, long long nanos ) {
118 (void)e; (void)timed; (void)nanos; // stub: arguments deliberately unused
119 }
120
/**
 * Performs a take.
 *
 * Placeholder: always returns NULL without inspecting its arguments. The
 * intended dual-queue algorithm exists only as commented-out code further
 * down in this class.
 *
 * @param timed
 *      Presumably whether the operation should time out (TODO confirm
 *      against Transferer); currently ignored.
 * @param nanos
 *      Presumably the timeout in nanoseconds when timed is true (TODO
 *      confirm against Transferer); currently ignored.
 *
 * @return always NULL in the current implementation.
 */
121 virtual E* transfer( bool timed, long long nanos ) {
122 return static_cast<E*>( NULL ); // stub: no item is ever produced
123 }
124
128// E* transfer( E* e, bool timed, long long nanos ) {
129
130 /* Basic algorithm is to loop trying to take either of
131 * two actions:
132 *
133 * 1. If queue apparently empty or holding same-mode nodes,
134 * try to add node to queue of waiters, wait to be
135 * fulfilled (or cancelled) and return matching item.
136 *
137 * 2. If queue apparently contains waiting items, and this
138 * call is of complementary mode, try to fulfill by CAS'ing
139 * item field of waiting node and dequeuing it, and then
140 * returning matching item.
141 *
142 * In each case, along the way, check for and try to help
143 * advance head and tail on behalf of other stalled/slow
144 * threads.
145 *
146 * The loop starts off with a null check guarding against
147 * seeing uninitialized head or tail values. This never
148 * happens in current SynchronousQueue, but could if
149 * callers held non-volatile/final ref to the
150 * transferer. The check is here anyway because it places
151 * null checks at top of loop, which is usually faster
152 * than having them implicitly interspersed.
153 */
154
155// QNode s = null; // constructed/reused as needed
156// boolean isData = (e != null);
157//
158// for (;;) {
159// QNode t = tail;
160// QNode h = head;
161// if (t == null || h == null) // saw uninitialized value
162// continue; // spin
163//
164// if (h == t || t.isData == isData) { // empty or same-mode
165// QNode tn = t.next;
166// if (t != tail) // inconsistent read
167// continue;
168// if (tn != null) { // lagging tail
169// advanceTail(t, tn);
170// continue;
171// }
172// if (timed && nanos <= 0) // can't wait
173// return null;
174// if (s == null)
175// s = new QNode(e, isData);
176// if (!t.casNext(null, s)) // failed to link in
177// continue;
178//
179// advanceTail(t, s); // swing tail and wait
180// Object x = awaitFulfill(s, e, timed, nanos);
181// if (x == s) { // wait was cancelled
182// clean(t, s);
183// return null;
184// }
185//
186// if (!s.isOffList()) { // not already unlinked
187// advanceHead(t, s); // unlink if head
188// if (x != null) // and forget fields
189// s.item = s;
190// s.waiter = null;
191// }
192// return (x != null)? x : e;
193//
194// } else { // complementary-mode
195// QNode m = h.next; // node to fulfill
196// if (t != tail || m == null || h != head)
197// continue; // inconsistent read
198//
199// Object x = m.item;
200// if (isData == (x != null) || // m already fulfilled
201// x == m || // m cancelled
202// !m.casItem(x, e)) { // lost CAS
203// advanceHead(h, m); // dequeue and retry
204// continue;
205// }
206//
207// advanceHead(h, m); // successfully fulfilled
208// LockSupport.unpark(m.waiter);
209// return (x != null)? x : e;
210// }
211// }
212// }
213
214 private:
215
225// Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
226// /* Same idea as TransferStack.awaitFulfill */
227// long lastTime = (timed)? System.nanoTime() : 0;
228// Thread w = Thread.currentThread();
229// int spins = ((head.next == s) ?
230// (timed? maxTimedSpins : maxUntimedSpins) : 0);
231// for (;;) {
232// if (w.isInterrupted())
233// s.tryCancel(e);
234// Object x = s.item;
235// if (x != e)
236// return x;
237// if (timed) {
238// long now = System.nanoTime();
239// nanos -= now - lastTime;
240// lastTime = now;
241// if (nanos <= 0) {
242// s.tryCancel(e);
243// continue;
244// }
245// }
246// if (spins > 0)
247// --spins;
248// else if (s.waiter == null)
249// s.waiter = w;
250// else if (!timed)
251// LockSupport.park();
252// else if (nanos > spinForTimeoutThreshold)
253// LockSupport.parkNanos(nanos);
254// }
255// }
256
260// void clean(QNode pred, QNode s) {
261// s.waiter = null; // forget thread
262// /*
263// * At any given time, exactly one node on list cannot be
264// * deleted -- the last inserted node. To accommodate this,
265// * if we cannot delete s, we save its predecessor as
266// * "cleanMe", deleting the previously saved version
267// * first. At least one of node s or the node previously
268// * saved can always be deleted, so this always terminates.
269// */
270// while (pred.next == s) { // Return early if already unlinked
271// QNode h = head;
272// QNode hn = h.next; // Absorb cancelled first node as head
273// if (hn != null && hn.isCancelled()) {
274// advanceHead(h, hn);
275// continue;
276// }
277// QNode t = tail; // Ensure consistent read for tail
278// if (t == h)
279// return;
280// QNode tn = t.next;
281// if (t != tail)
282// continue;
283// if (tn != null) {
284// advanceTail(t, tn);
285// continue;
286// }
287// if (s != t) { // If not tail, try to unsplice
288// QNode sn = s.next;
289// if (sn == s || pred.casNext(s, sn))
290// return;
291// }
292// QNode dp = cleanMe;
293// if (dp != null) { // Try unlinking previous cancelled node
294// QNode d = dp.next;
295// QNode dn;
296// if (d == null || // d is gone or
297// d == dp || // d is off list or
298// !d.isCancelled() || // d not cancelled or
299// (d != t && // d not tail and
300// (dn = d.next) != null && // has successor
301// dn != d && // that is on list
302// dp.casNext(d, dn))) // d unspliced
303// casCleanMe(dp, null);
304// if (dp == pred)
305// return; // s is already saved node
306// } else if (casCleanMe(null, pred))
307// return; // Postpone cleaning s
308// }
309// }
310
311// /**
312// * Tries to cas nh as new head; if successful, unlink
313// * old head's next node to avoid garbage retention.
314// */
315// void advanceHead( QNode* h, QNode* nh ) {
316// if( h == head.get() && this->head.compareAndSet( h, nh ) ) {
317// h->next = h; // forget old next
318// }
319// }
320//
321// /**
322// * Tries to cas nt as new tail.
323// */
324// void advanceTail( QNode* t, QNode* nt ) {
325// if( this->tail.get() == t ) {
326// this->tail.compareAndSet( t, nt );
327// }
328// }
329//
330// /**
331// * Tries to CAS cleanMe slot.
332// */
333// bool casCleanMe( QNode* cmp, QNode* val ) {
334// return ( this->cleanMe.get() == cmp &&
335// this->cleanMe.compareAndSet( cmp, val ) );
336// }
337
338 };
339
340}}}}
341
342#endif /* _DECAF_INTERNAL_UTIL_CONCURRENT_TRANSFERQUEUE_H_ */
virtual ~TransferQueue()
Definition TransferQueue.h:115
virtual E * transfer(bool timed, long long nanos)
Performs a take.
Definition TransferQueue.h:121
virtual void transfer(E *e, bool timed, long long nanos)
Performs a put.
Definition TransferQueue.h:117
TransferQueue()
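Constructor; its dummy-node initialisation is currently commented out. (The generated brief "Node class for TransferQueue." belongs to the commented-out QNode class, not to this constructor.)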
Definition TransferQueue.h:109
Shared internal API for dual stacks and queues.
Definition Transferer.h:33
#define NULL
Definition Config.h:33
Definition Atomics.h:26
Definition ByteArrayAdapter.h:30
Definition AprPool.h:26
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
Definition AprPool.h:25