Class SequencedQueue


  • public class SequencedQueue
    extends BoundedWorkQueue
    This component extends the Bound Queue by guaranteeing delivery of CASes in sequential order. Large documents may be split into smaller chunks and and each is processed asynchronously. Since these chunks are processed at different speeds (in multi-pipeline CPE configurations), they may arrive at the queue out of order. The Cas Consumer may need those chunks in the correct order. This component checks each CAS metadata for a clue to see if the CAS is part of a larger sequence. If so, it sets its internal state so that it can expect the proper chunk to come in. A timer thread is used to make sure that this component does not wait indefinitely for expected chunk. If the timer goes off, the entire document ( and all its CASes) are invalidated.
    • Field Detail

      • chunkState

        private boolean chunkState
        The chunk state.
      • nextChunkMetadata

        private ChunkMetadata nextChunkMetadata
        The next chunk metadata.
      • timedOutDocs

        private java.util.HashMap timedOutDocs
        The timed out docs.
      • statusCbL

        protected java.util.ArrayList statusCbL
        The status cb L.
    • Constructor Detail

      • SequencedQueue

        public SequencedQueue​(int aQueueSize,
                              java.lang.String aQueueName,
                              CPMEngine aCpmEngine)
        Initialize this queue.
        Parameters:
        aQueueSize - - the size of the queue
        aQueueName - - the name of the queue
        aCpmEngine - - reference to the CPE
    • Method Detail

      • sequenceTimedOut

        private boolean sequenceTimedOut​(ChunkMetadata achunkMetadata)
        Sequence timed out.
        Parameters:
        achunkMetadata - the achunk metadata
        Returns:
        true if it timed out
      • timedOutCas

        private java.lang.Object timedOutCas​(int aQueueIndex)
        Returns a CAS that belong to a timedout chunk sequence. It wraps the CAS in QueueEntity and indicates that the CAS arrived late. This must be called while holding the class lock (e.g. via synch on the calling methods within this class).
        Parameters:
        aQueueIndex - - position in queue from the CAS should be extracted
        Returns:
        QueueEntity containing CAS that arrived late
      • dequeue

        public java.lang.Object dequeue()
        Removes an object from the front of the queue according to FIFO model. It sequences chunks so that they are returned in the right sequential order. It handles out of sequence CAS arrivals and returns it in a wraper.
        Overrides:
        dequeue in class BoundedWorkQueue
        Returns:
        object dequeued from the head of the queue
      • dequeue

        public java.lang.Object dequeue​(long aTimeout)
        Returns an object from the queue. It will wait for the object to show up in the queue until a given timer expires.
        Overrides:
        dequeue in class BoundedWorkQueue
        Parameters:
        aTimeout - - max millis to wait for an object
        Returns:
        - Object from the queue, or null if time out
      • invalidate

        public void invalidate​(CAS[] aCasObjectList)
        Description copied from class: BoundedWorkQueue
        Invalidate.
        Overrides:
        invalidate in class BoundedWorkQueue
        Parameters:
        aCasObjectList - the a cas object list
      • addDocToTimedOutDocs

        private void addDocToTimedOutDocs​(int aLifespan,
                                          java.lang.String aDocId)
        Adds the doc to timed out docs.
        Parameters:
        aLifespan - the a lifespan
        aDocId - the a doc id
      • doNotifyListeners

        protected void doNotifyListeners​(java.lang.Object aCas,
                                         EntityProcessStatus aEntityProcStatus)
        Notifies all configured listeners. Makes sure that appropriate type of Cas is sent to the listener. Conversions take place to ensure compatibility.
        Parameters:
        aCas - - Cas to pass to listener
        aEntityProcStatus - - status object containing exceptions and trace info