18#ifndef _DECAF_UTIL_CONCURRENT_LINKEDBLOCKINGQUEUE_H_
19#define _DECAF_UTIL_CONCURRENT_LINKEDBLOCKINGQUEUE_H_
39 using decaf::lang::Pointer;
55 template<
typename U >
69 QueueNode(
const QueueNode&);
// Default constructor: value-initializes the stored value, starts with both
// the unlinked and dequeued flags cleared, and default-constructs next.
74 QueueNode() : value(), unlinked(
false), dequeued(
false), next() {}
// Constructs a node that stores a copy of the given value; the unlinked and
// dequeued flags start out false and next is default-constructed.
75 QueueNode(
const U& value) : value(value), unlinked(
false), dequeued(
false), next() {}
77 void set(
Pointer< QueueNode<U> > next,
const U& value) {
80 this->unlinked =
false;
81 this->dequeued =
false;
89 E result = this->value;
91 this->dequeued =
true;
98 this->unlinked =
true;
101 bool isUnlinked()
const {
102 return this->unlinked;
105 bool isDequeued()
const {
106 return this->dequeued;
113 TotalLock(
const TotalLock& src);
114 TotalLock&
operator=(
const TotalLock& src);
123 parent->putLock.
lock();
124 parent->takeLock.
lock();
129 parent->takeLock.
unlock();
160 takeLock(), notEmpty(), putLock(), notFull(), head(new QueueNode<E>()), tail() {
162 this->tail = this->head;
164 this->notFull.reset(this->putLock.newCondition());
176 takeLock(), notEmpty(), putLock(), notFull(), head(new QueueNode<E>()), tail() {
179 __FILE__, __LINE__,
"Capacity value must be greater than zero.");
182 this->tail = this->head;
184 this->notFull.reset(this->putLock.newCondition());
198 capacity(
lang::Integer::MAX_VALUE), count(),
199 takeLock(), notEmpty(), putLock(), notFull(),
200 head(new QueueNode<E>()), tail() {
202 this->tail = this->head;
204 this->notFull.reset(this->putLock.newCondition());
212 while(iter->hasNext()) {
213 if(count == this->capacity) {
214 throw decaf::lang::exceptions::IllegalStateException( __FILE__, __LINE__,
215 "Number of elements in the Collection exceeds this Queue's Capacity.");
218 this->enqueue(iter->next());
222 this->count.set(count);
240 capacity(
lang::Integer::MAX_VALUE), count(),
241 takeLock(), notEmpty(), putLock(), notFull(),
242 head(new QueueNode<E>()), tail() {
244 this->tail = this->head;
245 this->notEmpty.reset(this->takeLock.newCondition());
246 this->notFull.reset(this->putLock.newCondition());
254 while(iter->hasNext()) {
255 if(count == this->capacity) {
256 throw decaf::lang::exceptions::IllegalStateException( __FILE__, __LINE__,
257 "Number of elements in the Collection exceeds this Queue's Capacity.");
260 this->enqueue(iter->next());
264 this->count.set(count);
294 return this->count.get();
299 TotalLock
lock(
this);
302 this->tail = this->head;
305 if(this->count.getAndSet(0) == this->capacity) {
306 this->notFull->signal();
311 return this->capacity - this->count.get();
314 virtual void put(
const E& value ) {
318 this->putLock.lockInterruptibly();
326 while (this->count.get() == this->capacity) {
327 this->notFull->await();
335 c = this->count.getAndIncrement();
337 if(c + 1 < this->capacity) {
338 this->notFull->signal();
341 this->putLock.unlock();
345 this->putLock.unlock();
351 this->signalNotEmpty();
355 virtual bool offer(
const E& value,
long long timeout,
const TimeUnit& unit ) {
358 long long nanos = unit.
toNanos(timeout);
360 this->putLock.lockInterruptibly();
363 while(this->count.get() == this->capacity) {
368 nanos = this->notFull->awaitNanos(nanos);
372 c = this->count.getAndIncrement();
374 if(c + 1 < this->capacity) {
375 this->notFull->signal();
379 this->putLock.unlock();
383 this->putLock.unlock();
386 this->signalNotEmpty();
392 virtual bool offer(
const E& value) {
394 if (this->count.get() == this->capacity) {
399 this->putLock.lockInterruptibly();
402 if (this->count.get() < this->capacity) {
405 c = this->count.getAndIncrement();
407 if (c + 1 < this->capacity) {
408 this->notFull->signal();
413 this->putLock.unlock();
417 this->putLock.unlock();
420 this->signalNotEmpty();
431 this->takeLock.lockInterruptibly();
434 while (this->count.get() == 0) {
435 this->notEmpty->await();
442 c = this->count.getAndDecrement();
445 this->notEmpty->signal();
449 this->takeLock.unlock();
453 this->takeLock.unlock();
458 if (c == this->capacity) {
459 this->signalNotFull();
465 virtual bool poll(E& result,
long long timeout,
const TimeUnit& unit) {
467 long long nanos = unit.
toNanos(timeout);
469 this->takeLock.lockInterruptibly();
472 while (this->count.get() == 0) {
477 nanos = this->notEmpty->awaitNanos(nanos);
481 c = this->count.getAndDecrement();
484 this->notEmpty->signal();
488 this->takeLock.unlock();
492 this->takeLock.unlock();
494 if(c == this->capacity) {
495 this->signalNotFull();
503 if (this->count.get() == 0) {
508 this->takeLock.lock();
511 if (this->count.get() > 0) {
513 c = this->count.getAndDecrement();
516 this->notEmpty->signal();
521 this->takeLock.unlock();
525 this->takeLock.unlock();
527 if (c == this->capacity) {
528 this->signalNotFull();
534 virtual bool peek(E& result)
const {
536 if(this->count.get() == 0) {
540 this->takeLock.lock();
546 result = front->get();
549 this->takeLock.unlock();
553 this->takeLock.unlock();
562 TotalLock
lock(
this);
564 for(
Pointer< QueueNode<E> > predicessor = this->head, p = predicessor->next; p !=
NULL;
565 predicessor = p, p = p->next) {
567 if(value == p->get()) {
568 unlink(p, predicessor);
578 TotalLock
lock(
this);
580 int size = this->count.get();
581 std::vector<E> array;
584 for(
Pointer< QueueNode<E> > p = this->head->next; p !=
NULL; p = p->next) {
585 array.push_back(p->get());
592 return std::string(
"LinkedBlockingQueue [ current size = ") +
604 "Cannot drain this Collection to itself.");
607 bool signalNotFull =
false;
608 bool shouldThrow =
false;
612 this->takeLock.lock();
625 sink.
add( p->getAndDequeue() );
637 signalNotFull = (this->count.getAndAdd(-i) == this->capacity);
641 this->takeLock.unlock();
645 this->takeLock.unlock();
648 this->signalNotFull();
660 class LinkedIterator :
public Iterator<E> {
670 LinkedIterator(
const LinkedIterator&);
671 LinkedIterator& operator= (
const LinkedIterator&);
676 currentElement(), parent(parent) {
677 TotalLock lock(parent);
679 this->current = parent->head->next;
680 if(this->current !=
NULL) {
681 this->currentElement = current->get();
685 virtual bool hasNext()
const {
686 return this->current !=
NULL;
691 TotalLock lock(this->parent);
693 if(this->current ==
NULL) {
695 "Iterator next called with no matching next element.");
698 E result = this->currentElement;
699 this->last = this->current;
700 this->current = this->nextNode(this->current);
701 this->currentElement = (this->current ==
NULL) ? E() : this->current->get();
706 virtual void remove() {
708 if(this->last ==
NULL) {
709 throw decaf::lang::exceptions::IllegalStateException(__FILE__, __LINE__,
710 "Iterator remove called without having called next().");
713 TotalLock lock(this->parent);
715 Pointer< QueueNode<E> > node;
716 node.swap(this->last);
718 for(Pointer< QueueNode<E> > trail = this->parent->head, p = trail->next; p !=
NULL;
719 trail = p, p = p->next) {
722 this->parent->unlink(p, trail);
730 Pointer< QueueNode<E> > nextNode(Pointer< QueueNode<E> >& p) {
734 if(p->isDequeued()) {
735 return this->parent->head->next;
738 Pointer< QueueNode<E> > s = p->next;
745 while(s !=
NULL && s->isUnlinked()) {
754 class ConstLinkedIterator :
public Iterator<E> {
757 Pointer< QueueNode<E> > current;
758 Pointer< QueueNode<E> > last;
760 const LinkedBlockingQueue<E>* parent;
764 ConstLinkedIterator(
const ConstLinkedIterator&);
765 ConstLinkedIterator& operator= (
const ConstLinkedIterator&);
769 ConstLinkedIterator(
const LinkedBlockingQueue<E>* parent) : current(), last(),
772 TotalLock lock(parent);
774 this->current = parent->head->next;
775 if(this->current !=
NULL) {
776 this->currentElement = current->get();
780 virtual bool hasNext()
const {
781 return this->current !=
NULL;
786 TotalLock lock(this->parent);
788 if(this->current ==
NULL) {
789 throw decaf::util::NoSuchElementException(__FILE__, __LINE__,
790 "Iterator next called with no matching next element.");
793 E result = this->currentElement;
794 this->last = this->current;
795 this->current = this->nextNode(this->current);
796 this->currentElement = (this->current ==
NULL) ? E() : this->current->get();
801 virtual void remove() {
802 throw lang::exceptions::UnsupportedOperationException(
803 __FILE__, __LINE__,
"Cannot write to a const ListIterator." );
808 Pointer< QueueNode<E> > nextNode(Pointer< QueueNode<E> >& p) {
812 if(p->isDequeued()) {
813 return this->parent->head->next;
816 Pointer< QueueNode<E> > s = p->next;
823 while(s !=
NULL && s->isUnlinked()) {
835 return new LinkedIterator(
this);
839 return new ConstLinkedIterator(
this);
844 void unlink(
Pointer< QueueNode<E> >& p,
Pointer< QueueNode<E> >& predicessor) {
851 predicessor->next = p->next;
853 if(this->tail == p) {
854 this->tail = predicessor;
857 if(this->count.getAndDecrement() == capacity) {
858 this->signalNotFull();
862 void signalNotEmpty() {
863 this->takeLock.lock();
865 this->notEmpty->signal();
866 }
catch(decaf::lang::Exception& ex) {
867 this->takeLock.unlock();
870 this->takeLock.unlock();
873 void signalNotFull() {
874 this->putLock.lock();
876 this->notFull->signal();
877 }
catch(decaf::lang::Exception& ex) {
878 this->putLock.unlock();
881 this->putLock.unlock();
885 void enqueue(E value) {
886 Pointer< QueueNode<E> > newTail(
new QueueNode<E>(value) );
887 this->tail->next = newTail;
888 this->tail = newTail;
893 Pointer< QueueNode<E> > temp = this->head;
894 Pointer< QueueNode<E> > newHead = temp->next;
895 this->head = newHead;
897 return newHead->getAndDequeue();
901 Pointer< QueueNode<E> > current = this->head->next;
902 Pointer< QueueNode<E> > temp;
903 while(current !=
NULL) {
905 current = current->next;
906 temp->next.reset(
NULL);
Definition Exception.h:38
static const int MAX_VALUE
The maximum value that the primitive type can hold.
Definition Integer.h:45
std::string toString() const
virtual decaf::util::Iterator< E > * iterator()=0
static short min(short a, short b)
Returns the double value that is closest in value to the argument and is equal to a mathematical inte...
Definition Math.h:346
Decaf's implementation of a Smart Pointer that is a template on a Type and is Thread Safe if the defa...
Definition Pointer.h:53
void reset(T *value=NULL)
Resets the Pointer to hold the new value.
Definition Pointer.h:161
Definition IllegalArgumentException.h:31
Definition IllegalStateException.h:32
virtual void lock()
Locks the object.
Definition AbstractCollection.h:344
This class provides skeletal implementations of some Queue operations.
Definition AbstractQueue.h:48
virtual bool addAll(const Collection< E > &collection)
Adds all of the elements in the specified collection to this collection. The behavior of this operatio...
Definition AbstractQueue.h:78
The root interface in the collection hierarchy.
Definition Collection.h:69
virtual bool add(const E &value)=0
Returns true if this collection changed as a result of the call.
Defines an object that can be used to iterate over the elements of a collection.
Definition Iterator.h:34
Definition NoSuchElementException.h:31
A decaf::util::Queue that additionally supports operations that wait for the queue to become non-empt...
Definition BlockingQueue.h:164
A BlockingQueue derivative that allows for a bound to be placed on the number of elements that can be...
Definition LinkedBlockingQueue.h:52
virtual void clear()
Removes all of the elements from this collection (optional operation). This collection will be empty a...
Definition LinkedBlockingQueue.h:297
virtual E take()
Retrieves and removes the head of this queue, waiting if necessary until an element becomes available...
Definition LinkedBlockingQueue.h:426
virtual void put(const E &value)
Inserts the specified element into this queue, waiting if necessary for space to become available.
Definition LinkedBlockingQueue.h:314
LinkedBlockingQueue(const LinkedBlockingQueue &queue)
Create a new instance with a Capacity of Integer::MAX_VALUE and add all the values contained in the ...
Definition LinkedBlockingQueue.h:239
virtual decaf::util::Iterator< E > * iterator()
Definition LinkedBlockingQueue.h:834
virtual bool peek(E &result) const
Gets, but does not remove, the element at the head of the queue.
Definition LinkedBlockingQueue.h:534
virtual int drainTo(Collection< E > &sink, int maxElements)
Removes at most the given number of available elements from this queue and adds them to the given col...
Definition LinkedBlockingQueue.h:600
virtual ~LinkedBlockingQueue()
Definition LinkedBlockingQueue.h:271
virtual bool offer(const E &value)
Inserts the specified element into the queue provided that the condition allows such an operation.
Definition LinkedBlockingQueue.h:392
virtual bool remove(const E &value)
Removes a single instance of the specified element from the collection.
Definition LinkedBlockingQueue.h:560
virtual int size() const
Returns the number of elements in this collection.
Definition LinkedBlockingQueue.h:293
virtual int remainingCapacity() const
Returns the number of additional elements that this queue can ideally (in the absence of memory or re...
Definition LinkedBlockingQueue.h:310
virtual std::vector< E > toArray() const
Returns an array containing all of the elements in this collection.
Definition LinkedBlockingQueue.h:576
virtual std::string toString() const
Definition LinkedBlockingQueue.h:591
LinkedBlockingQueue(const Collection< E > &collection)
Create a new instance with a Capacity of Integer::MAX_VALUE and add all the values contained in the ...
Definition LinkedBlockingQueue.h:197
virtual decaf::util::Iterator< E > * iterator() const
Definition LinkedBlockingQueue.h:838
LinkedBlockingQueue()
Create a new instance with a Capacity of Integer::MAX_VALUE.
Definition LinkedBlockingQueue.h:159
virtual bool poll(E &result, long long timeout, const TimeUnit &unit)
Retrieves and removes the head of this queue, waiting up to the specified wait time if necessary for ...
Definition LinkedBlockingQueue.h:465
virtual bool offer(const E &value, long long timeout, const TimeUnit &unit)
Inserts the specified element into this queue, waiting up to the specified wait time if necessary for...
Definition LinkedBlockingQueue.h:355
virtual bool poll(E &result)
Gets and removes the element in the head of the queue.
Definition LinkedBlockingQueue.h:501
LinkedBlockingQueue< E > & operator=(const LinkedBlockingQueue< E > &queue)
Definition LinkedBlockingQueue.h:279
LinkedBlockingQueue(int capacity)
Create a new instance with the given initial capacity value.
Definition LinkedBlockingQueue.h:175
virtual int drainTo(Collection< E > &c)
Removes all available elements from this queue and adds them to the given collection.
Definition LinkedBlockingQueue.h:596
A TimeUnit represents time durations at a given unit of granularity and provides utility methods to c...
Definition TimeUnit.h:62
long long toNanos(long long duration) const
Equivalent to NANOSECONDS.convert(duration, this).
Definition TimeUnit.h:126
An int value that may be updated atomically.
Definition AtomicInteger.h:37
A reentrant mutual exclusion Lock with extended capabilities.
Definition ReentrantLock.h:80
virtual void unlock()
Attempts to release this lock.
virtual Condition * newCondition()
Returns a Condition instance for use with this Lock instance.
virtual void lock()
Acquires the lock.
#define DECAF_CATCH_RETHROW(type)
Macro for catching and rethrowing an exception of a given type.
Definition ExceptionDefines.h:27
#define DECAF_CATCHALL_THROW(type)
A catch-all that throws a known exception.
Definition ExceptionDefines.h:50
#define NULL
Definition Config.h:33
Definition ThreadingTypes.h:31
Definition AbstractExecutorService.h:28
Definition AbstractCollection.h:33
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
Definition AprPool.h:25