Class ProcessingUnit
- java.lang.Object
-
- org.apache.uima.collection.impl.cpm.engine.ProcessingUnit
-
- All Implemented Interfaces:
java.lang.Runnable
public class ProcessingUnit extends java.lang.Object implements java.lang.Runnable
This component executes the processing pipeline. Running in a separate thread, it continuously reads bundles of CAS from the Work Queue filled by the ArtifactProducer and sends them through the configured CasProcessors. The sequence in which CasProcessors are invoked is defined by the order in which the CAS Processors are listed in the CPE descriptor. The results of analysis produced by the CAS Processors are enqueued onto an output queue that is shared with the CAS Consumers.
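The read, process and enqueue cycle described above can be sketched with plain collections. This is only an illustration under stated assumptions: `PipelineOrderSketch`, `drain` and the string-based processors are hypothetical stand-ins, not UIMA classes, and the real component works on CAS bundles in its own thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for the read/process/enqueue cycle; not UIMA code.
final class PipelineOrderSketch {

  // Drains the work queue, applies every processor in list order,
  // and returns the output queue holding the results.
  static Queue<String> drain(Queue<String> workQueue,
                             List<UnaryOperator<String>> processors) {
    Queue<String> outputQueue = new ArrayDeque<>();
    String artifact;
    while ((artifact = workQueue.poll()) != null) {
      for (UnaryOperator<String> processor : processors) {
        artifact = processor.apply(artifact);
      }
      outputQueue.add(artifact);
    }
    return outputQueue;
  }

  static List<String> demo() {
    Queue<String> workQueue = new ArrayDeque<>(Arrays.asList("doc1", "doc2"));
    List<UnaryOperator<String>> processors = new ArrayList<>();
    processors.add(text -> text + "|tokenizer"); // listed first, runs first
    processors.add(text -> text + "|tagger");    // listed second, runs second
    return new ArrayList<>(drain(workQueue, processors));
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

The order of the list passed to `drain` plays the role that the listing order in the CPE descriptor plays for the real pipeline.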
-
-
Field Summary
Fields (columns: Modifier and Type, Field, Description)
- protected java.lang.Object[] artifact - The artifact.
- private CAS[] casCache - The cas cache.
- protected CAS[] casList - The cas list.
- protected CPECasPool casPool - The cas pool.
- protected CAS conversionCas - The conversion cas.
- protected CAS[] conversionCasArray - The conversion cas array.
- protected CpeConfiguration cpeConfiguration - The cpe configuration.
- protected CPMEngine cpm - The cpm.
- private boolean isCasConsumerPipeline - The is cas consumer pipeline.
- private boolean isRunning - The is running.
- protected CasConverter mConverter - The m converter.
- private java.lang.String name
- protected boolean notifyListeners - The notify listeners.
- protected long numToProcess - The num to process.
- protected BoundedWorkQueue outputQueue - The output queue.
- protected java.util.LinkedList processContainers - The process containers.
- protected ProcessTrace processingUnitProcessTrace - The processing unit process trace.
- protected boolean releaseCAS - The release CAS.
- protected java.util.ArrayList statusCbL - The status cb L.
- private static java.lang.String thisClassName - The Constant thisClassName.
- protected java.lang.String threadId - The thread id.
- int threadState - The thread state.
- protected UimaTimer timer - The timer.
- long timer01 - The timer 01.
- long timer02 - The timer 02.
- long timer03 - The timer 03.
- long timer04 - The timer 04.
- long timer05 - The timer 05.
- long timer06 - The timer 06.
- protected BoundedWorkQueue workQueue - The work queue.
- private static java.lang.Object[] zeroLengthObjectArray - Used by the loggers. There are special forms for frequent argument sets; the "maybe" versions test isLoggable first. Additional arguments are passed to the logger as an object array.
-
Constructor Summary
Constructors (columns: Constructor, Description)
- ProcessingUnit() - Instantiates a new processing unit.
- ProcessingUnit(CPMEngine acpm) - Instantiates a new processing unit.
- ProcessingUnit(CPMEngine acpm, BoundedWorkQueue aInputQueue, BoundedWorkQueue aOutputQueue) - Initialize the PU.
-
Method Summary
Methods (columns: Modifier and Type, Method, Description)
- void addStatusCallbackListener(BaseStatusCallbackListener aListener) - Plugs in Listener object used for notifications.
- protected boolean analyze(java.lang.Object[] aCasObjectList, ProcessTrace pTrTemp) - An alternate processing loop designed for the single-threaded CPM.
- void cleanup() - Null out fields of this object.
- private void clearCasCache() - Releases all CAS instances from the Cache back to the Cas Pool.
- boolean consumeQueue() - Consumes the input queue to make sure all bundles still there get processed before the CPE terminates.
- private boolean containerDisabled(ProcessingContainer aContainer) - Container disabled.
- private void convertCasDataToCasObject(int casIndex, java.lang.String aContainerName, java.lang.Object[] aCasObjectList) - Convert cas data to cas object.
- void disableCasProcessor(int aCasProcessorIndex) - Disable a CASProcessor in the processing pipeline.
- void disableCasProcessor(java.lang.String aCasProcessorName) - Alternative method to disable Cas Processor.
- private void doEndOfBatch(ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTr, int howManyCases) - Do end of batch.
- private void doEndOfBatchProcessing(ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTr, java.lang.Object[] aCasObjectList) - Performs end of batch processing.
- protected void doNotifyListeners(java.lang.Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus) - Notifies all configured listeners.
- private void doReleaseCasProcessor(ProcessingContainer aContainer, CasProcessor aCasProcessor) - Do release cas processor.
- void enableCasProcessor(java.lang.String aCasProcessorName) - Enables Cas Processor with a given name.
- protected boolean endOfProcessingReached(long aCount) - Returns true if the CPM has finished analyzing the collection.
- private boolean filterOutTheCAS(ProcessingContainer aContainer, boolean isCasObject, java.lang.Object[] aCasObjectList) - Filter out the CAS.
- protected long getBytes(java.lang.Object aCas) - Returns the size of the CAS object.
- java.util.ArrayList getCallbackListeners() - Returns list of listeners used by this PU for callbacks.
- java.lang.String getName()
- private ProcessTrace getProcessTrace() - Returns a ProcessTrace instance used by this component.
- private void handleAbortCasProcessor(ProcessingContainer aContainer, CasProcessor aProcessor) - Disables the current CasProcessor.
- private void handleAbortCPM(ProcessingContainer aContainer, CasProcessor aProcessor) - Terminates the CPM.
- private void handleEOFToken() - Handles EOFToken.
- private boolean handleErrors(java.lang.Throwable e, ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTrace, java.lang.Object[] aCasObjectList, boolean isCasObject) - Main routine that handles errors occurring in the processing loop.
- private void handleKillPipeline(ProcessingContainer aContainer) - Terminates the CPM.
- private void handleServiceException(ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTr, java.lang.Exception ex) - Handle exceptions related to remote invocations.
- private void handleSkipCasProcessor(ProcessingContainer aContainer, java.lang.Object[] aCasObjectList, boolean isLastCP) - In case a CAS is skipped (due to excessive exceptions that it causes), increments stats and totals.
- private void invokeCasDataCasProcessor(ProcessingContainer container, CasProcessor processor, java.lang.Object[] aCasObjectList, ProcessTrace pTrTemp, boolean isCasObject, boolean retry) - Invoke cas data cas processor.
- private void invokeCasObjectCasProcessor(ProcessingContainer container, CasProcessor processor, java.lang.Object[] aCasObjectList, ProcessTrace pTrTemp, boolean isCasObject) - Invoke cas object cas processor.
- boolean isCasConsumerPipeline() - Checks whether this instance is a Cas Consumer pipeline.
- private void isCpmPaused() - Checks whether the CPM is paused.
- protected boolean isProcessorReady(int aStatus) - Check if the CASProcessor status is available for processing.
- boolean isRunning() - Returns true if this component is in running state.
- private void logCPM(Level level, java.lang.String msgBundleId, java.lang.Object[] args) - Log CPM.
- private void logFinest(java.lang.String msgBundleId) - Log finest.
- private void logFinest(java.lang.String msgBundleId, java.lang.String arg1) - Log finest.
- private void logFinest(java.lang.String msgBundleId, java.lang.String arg1, java.lang.String arg2) - Log finest.
- private void logFinest(java.lang.String msgBundleId, java.lang.String arg1, java.lang.String arg2, java.lang.String arg3) - Log finest.
- private void logFinest(java.lang.String msgBundleId, ProcessingContainer container, CasProcessor processor) - Log finest.
- private void logMemoryFinest() - Log memory finest.
- private void logSevere(java.lang.String msgBundleId, java.lang.String arg1) - Log severe.
- private void logSevere(java.lang.String msgBundleId, java.lang.String arg1, java.lang.String arg2) - Log severe.
- private void logSevere(java.lang.String msgBundleId, java.lang.String arg1, java.lang.String arg2, java.lang.String arg3) - Log severe.
- private void logWarning(java.lang.String msgBundleId) - Log warning.
- private void logWarning(java.lang.String msgBundleId, java.lang.String arg1, java.lang.String arg2) - Log warning.
- private void maybeLogFinest(java.lang.String msgBundleId) - Maybe log finest.
- private void maybeLogFinest(java.lang.String msgBundleId, java.lang.String arg1) - Maybe log finest.
- private void maybeLogFinest(java.lang.String msgBundleId, java.lang.String arg1, java.lang.String arg2) - Maybe log finest.
- private void maybeLogFinest(java.lang.String msgBundleId, CAS[] casCache) - Maybe log finest.
- private void maybeLogFinest(java.lang.String msgBundleId, CasProcessor processor) - Maybe log finest.
- private void maybeLogFinest(java.lang.String msgBundleId, ProcessingContainer container) - Maybe log finest.
- private void maybeLogFinest(java.lang.String msgBundleId, ProcessingContainer container, CasProcessor processor) - Maybe log finest.
- private void maybeLogFinest(java.lang.String msgBundleId, ProcessingContainer container, CasProcessor processor, CAS[] casCache) - Maybe log finest.
- private void maybeLogFinestWorkQueue(java.lang.String msgBundleId, BoundedWorkQueue workQueue) - Maybe log finest work queue.
- private void maybeLogMemoryFinest() - Maybe log memory finest.
- private void maybeLogSevere(java.lang.String msgBundleId) - Maybe log severe.
- private void maybeLogSevere(java.lang.String msgBundleId, java.lang.String arg1) - Maybe log severe.
- private void maybeLogSevere(java.lang.String msgBundleId, java.lang.String arg1, java.lang.String arg2) - Maybe log severe.
- private void maybeLogSevere(java.lang.String msgBundleId, java.lang.String arg1, java.lang.String arg2, java.lang.String arg3) - Maybe log severe.
- private void maybeLogSevereException(java.lang.Throwable e) - Maybe log severe exception.
- private void maybeLogWarning(java.lang.String msgBundleId, java.lang.String arg1, java.lang.String arg2) - Maybe log warning.
- protected void notifyListeners(java.lang.Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus) - Notifies Listeners of the fact that the pipeline has finished processing the current set of CASes.
- private boolean pauseContainer(ProcessingContainer aContainer, java.lang.Exception aException, java.lang.String aThreadId) - Determines if the thread should be paused.
- private void postAnalysis(java.lang.Object[] aCasObjectList, boolean isCasObject, java.lang.Object[] casObjects, ProcessTrace aProcessTr, boolean doneAlready) - Notifies application listeners of completed analysis and stores results of analysis (CAS) in the Output Queue that this thread shares with a Cas Consumer thread.
- protected void process(java.lang.Object anArtifact) - Process.
- protected boolean processNext(java.lang.Object[] aCasObjectList, ProcessTrace pTrTemp) - Executes the processing pipeline.
- private void releaseCases(java.lang.Object aCasList, boolean lastProcessor, java.lang.String aName) - Conditionally releases CASes back to the CAS pool.
- private void releaseTimedOutCases(java.lang.Object[] artifact) - Release CAS back to the CAS Pool.
- void removeStatusCallbackListener(BaseStatusCallbackListener aListener) - Removes given listener from the list of listeners.
- void run() - Starts the Processing Pipeline thread.
- void setCasConsumerPipelineIdentity() - Define a CasConsumer Pipeline identity for this instance.
- void setCasPool(CPECasPool aPool) - Sets the cas pool.
- void setContainers(java.util.LinkedList processorList) - Plugs in a list of Cas Processor containers.
- void setCPMEngine(CPMEngine acpm) - Alternative method of providing the reference to the component managing the lifecycle of the CPE.
- void setInputQueue(BoundedWorkQueue aInputQueue) - Alternative method of providing a queue from which this PU will read bundles of CAS.
- void setName(java.lang.String aName)
- void setNotifyListeners(boolean aDoNotify) - Set a flag indicating if notifications should be made via configured Listeners.
- void setOutputQueue(BoundedWorkQueue aOutputQueue) - Alternative method of providing a queue where this PU will deposit results of analysis.
- void setProcessingUnitProcessTrace(ProcessTrace aProcessingUnitProcessTrace) - Plugs in ProcessTrace object used to collect statistics.
- void setReleaseCASFlag(boolean aFlag) - Called by the CPMEngine during setup to indicate that this thread is supposed to release a CAS at the end of processing.
- void setUimaTimer(UimaTimer aTimer) - Plugs in custom timer used by the PU for getting time.
- protected void showMetadata(java.lang.Object[] aCasList) - Show metadata.
- void stopCasProcessors(boolean kill) - Stops all Cas Processors that are part of this PU.
-
-
-
Field Detail
-
threadState
public int threadState
The thread state.
-
casPool
protected CPECasPool casPool
The cas pool.
-
releaseCAS
protected boolean releaseCAS
The release CAS.
-
cpm
protected CPMEngine cpm
The cpm.
-
workQueue
protected BoundedWorkQueue workQueue
The work queue.
-
outputQueue
protected BoundedWorkQueue outputQueue
The output queue.
-
mConverter
protected CasConverter mConverter
The m converter.
-
processingUnitProcessTrace
protected ProcessTrace processingUnitProcessTrace
The processing unit process trace.
-
processContainers
protected java.util.LinkedList processContainers
The process containers.
-
numToProcess
protected long numToProcess
The num to process.
-
casList
protected CAS[] casList
The cas list.
-
statusCbL
protected java.util.ArrayList statusCbL
The status cb L.
-
notifyListeners
protected boolean notifyListeners
The notify listeners.
-
conversionCas
protected CAS conversionCas
The conversion cas.
-
artifact
protected java.lang.Object[] artifact
The artifact.
-
conversionCasArray
protected CAS[] conversionCasArray
The conversion cas array.
-
timer
protected UimaTimer timer
The timer.
-
threadId
protected java.lang.String threadId
The thread id.
-
cpeConfiguration
protected CpeConfiguration cpeConfiguration
The cpe configuration.
-
casCache
private CAS[] casCache
The cas cache.
-
isCasConsumerPipeline
private boolean isCasConsumerPipeline
The is cas consumer pipeline.
-
isRunning
private boolean isRunning
The is running.
-
timer01
public long timer01
The timer 01.
-
timer02
public long timer02
The timer 02.
-
timer03
public long timer03
The timer 03.
-
timer04
public long timer04
The timer 04.
-
timer05
public long timer05
The timer 05.
-
timer06
public long timer06
The timer 06.
-
name
private java.lang.String name
-
zeroLengthObjectArray
private static final java.lang.Object[] zeroLengthObjectArray
Used by the loggers. There are special forms for frequent argument sets; the "maybe" versions test isLoggable first. Additional arguments are passed to the logger as an object array.
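The split between the "maybe" and the plain logging forms can be illustrated with java.util.logging. This is a minimal sketch, assuming the guard is a simple isLoggable check; `GuardedLoggingSketch`, `ZERO_LENGTH_ARGS` and the `logCalls` counter are hypothetical names introduced for the example and do not reproduce the class's private methods.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical illustration of guarded ("maybe") logging; not the UIMA code.
final class GuardedLoggingSketch {
  private static final Logger LOGGER = Logger.getLogger("GuardedLoggingSketch");
  // Shared empty argument array, so calls without arguments allocate nothing.
  private static final Object[] ZERO_LENGTH_ARGS = new Object[0];
  // Counts how often the unguarded form was actually reached.
  static int logCalls = 0;

  static void setLevel(Level level) {
    LOGGER.setLevel(level);
  }

  // Unguarded form: always hands the message to the logger.
  static void logFinest(String msgId, Object[] args) {
    logCalls++;
    LOGGER.log(Level.FINEST, msgId, args);
  }

  // Guarded form without arguments: reuses the shared empty array.
  static void maybeLogFinest(String msgId) {
    if (LOGGER.isLoggable(Level.FINEST)) {
      logFinest(msgId, ZERO_LENGTH_ARGS);
    }
  }

  // Guarded form with one argument: the array is built only when needed.
  static void maybeLogFinest(String msgId, String arg1) {
    if (LOGGER.isLoggable(Level.FINEST)) {
      logFinest(msgId, new Object[] { arg1 });
    }
  }

  public static void main(String[] args) {
    setLevel(Level.INFO);
    maybeLogFinest("skipped {0}", "doc1"); // guard fails, nothing is built
    setLevel(Level.FINEST);
    maybeLogFinest("logged {0}", "doc2");  // guard passes
    System.out.println(logCalls);
  }
}
```

The guard keeps argument arrays from being allocated when the FINEST level is switched off, which matters in a loop that runs once per CAS bundle.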
-
thisClassName
private static final java.lang.String thisClassName
The Constant thisClassName.
-
-
Constructor Detail
-
ProcessingUnit
public ProcessingUnit()
Instantiates a new processing unit.
-
ProcessingUnit
public ProcessingUnit(CPMEngine acpm, BoundedWorkQueue aInputQueue, BoundedWorkQueue aOutputQueue)
Initialize the PU.
- Parameters:
acpm - component managing life cycle of the CPE
aInputQueue - queue to read from
aOutputQueue - queue to write to
-
ProcessingUnit
public ProcessingUnit(CPMEngine acpm)
Instantiates a new processing unit.
- Parameters:
acpm - the acpm
-
-
Method Detail
-
setName
public void setName(java.lang.String aName)
-
getName
public java.lang.String getName()
-
isRunning
public boolean isRunning()
Returns true if this component is in running state.
- Returns:
true if running, false otherwise
-
setCasConsumerPipelineIdentity
public void setCasConsumerPipelineIdentity()
Define a CasConsumer Pipeline identity for this instance.
-
isCasConsumerPipeline
public boolean isCasConsumerPipeline()
Checks whether this instance is a Cas Consumer pipeline.
- Returns:
true if this instance is a Cas Consumer pipeline
-
setInputQueue
public void setInputQueue(BoundedWorkQueue aInputQueue)
Alternative method of providing a queue from which this PU will read bundles of CAS.
- Parameters:
aInputQueue - read queue
-
setOutputQueue
public void setOutputQueue(BoundedWorkQueue aOutputQueue)
Alternative method of providing a queue where this PU will deposit results of analysis.
- Parameters:
aOutputQueue - queue to write to
-
setCPMEngine
public void setCPMEngine(CPMEngine acpm)
Alternative method of providing the reference to the component managing the lifecycle of the CPE.
- Parameters:
acpm - reference to the controlling engine
-
cleanup
public void cleanup()
Null out fields of this object. Call this only when this object is no longer needed.
-
setNotifyListeners
public void setNotifyListeners(boolean aDoNotify)
Set a flag indicating if notifications should be made via configured Listeners.
- Parameters:
aDoNotify - true if notification is required, false otherwise
-
addStatusCallbackListener
public void addStatusCallbackListener(BaseStatusCallbackListener aListener)
Plugs in Listener object used for notifications.
- Parameters:
aListener - BaseStatusCallbackListener instance
-
getCallbackListeners
public java.util.ArrayList getCallbackListeners()
Returns list of listeners used by this PU for callbacks.
- Returns:
list of BaseStatusCallbackListener instances
-
removeStatusCallbackListener
public void removeStatusCallbackListener(BaseStatusCallbackListener aListener)
Removes given listener from the list of listeners.
- Parameters:
aListener - object to remove from the list
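How the notification flag and the listener list might interact can be sketched as follows. The `StatusListener` interface and the `fireProcessed` method are hypothetical simplifications introduced for the example; the real listeners are BaseStatusCallbackListener instances and receive CAS and status objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener registry; mirrors the method names only loosely.
final class ListenerRegistrySketch {
  // Simplified callback type, assumed for this example.
  interface StatusListener {
    void onProcessed(String artifact);
  }

  private final List<StatusListener> listeners = new ArrayList<>();
  private boolean notifyListeners = true;

  void setNotifyListeners(boolean doNotify) { notifyListeners = doNotify; }
  void addStatusCallbackListener(StatusListener listener) { listeners.add(listener); }
  void removeStatusCallbackListener(StatusListener listener) { listeners.remove(listener); }
  List<StatusListener> getCallbackListeners() { return listeners; }

  // Calls every registered listener, unless notifications are switched off.
  void fireProcessed(String artifact) {
    if (!notifyListeners) {
      return;
    }
    // Iterate over a copy so a listener may remove itself while being called.
    for (StatusListener listener : new ArrayList<>(listeners)) {
      listener.onProcessed(artifact);
    }
  }

  static List<String> demo() {
    ListenerRegistrySketch unit = new ListenerRegistrySketch();
    List<String> seen = new ArrayList<>();
    StatusListener recorder = artifact -> seen.add(artifact);

    unit.addStatusCallbackListener(recorder);
    unit.fireProcessed("doc1");              // delivered
    unit.setNotifyListeners(false);
    unit.fireProcessed("doc2");              // suppressed by the flag
    unit.setNotifyListeners(true);
    unit.removeStatusCallbackListener(recorder);
    unit.fireProcessed("doc3");              // no listener left
    return seen;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```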
-
setProcessingUnitProcessTrace
public void setProcessingUnitProcessTrace(ProcessTrace aProcessingUnitProcessTrace)
Plugs in ProcessTrace object used to collect statistics.
- Parameters:
aProcessingUnitProcessTrace - object to compile stats
-
setUimaTimer
public void setUimaTimer(UimaTimer aTimer)
Plugs in custom timer used by the PU for getting time.
- Parameters:
aTimer - custom timer to use
-
setContainers
public void setContainers(java.util.LinkedList processorList)
Plugs in a list of Cas Processor containers. During processing, the Cas Processors in this list are called sequentially. Each Cas Processor is held in a container that manages errors, counts and totals, and restarts.
- Parameters:
processorList - CASProcessor to be added to the processing pipeline
-
disableCasProcessor
public void disableCasProcessor(int aCasProcessorIndex)
Disable a CASProcessor in the processing pipeline. Locate it by the provided index. The disabled Cas Processor remains in the Processing Pipeline; however, it is not used during processing.
- Parameters:
aCasProcessorIndex - location in the pipeline of the Cas Processor to disable
-
disableCasProcessor
public void disableCasProcessor(java.lang.String aCasProcessorName)
Alternative method to disable Cas Processor. Uses a name to locate it.
- Parameters:
aCasProcessorName - a name of the Cas Processor to disable
-
enableCasProcessor
public void enableCasProcessor(java.lang.String aCasProcessorName)
Enables the Cas Processor with a given name. The enabled Cas Processor will immediately begin to receive bundles of CAS.
- Parameters:
aCasProcessorName - name of the Cas Processor to enable
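The disable and enable operations leave the pipeline layout untouched and only change whether a Cas Processor is invoked. A minimal sketch of that idea, assuming a per-container enabled flag; the nested `Container` class, `add`, `size` and `process` are hypothetical helpers, not the ProcessingContainer API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pipeline in which disabled processors stay in place but are skipped.
final class DisableProcessorSketch {
  private static final class Container {
    final String name;
    boolean enabled = true;
    Container(String name) { this.name = name; }
  }

  private final List<Container> containers = new ArrayList<>();

  void add(String name) { containers.add(new Container(name)); }
  int size() { return containers.size(); }

  // Locate by index.
  void disableCasProcessor(int index) { containers.get(index).enabled = false; }
  // Locate by name.
  void disableCasProcessor(String name) { setEnabled(name, false); }
  void enableCasProcessor(String name) { setEnabled(name, true); }

  private void setEnabled(String name, boolean enabled) {
    for (Container container : containers) {
      if (container.name.equals(name)) {
        container.enabled = enabled;
      }
    }
  }

  // Returns the names of the processors that would be invoked for one bundle.
  List<String> process() {
    List<String> invoked = new ArrayList<>();
    for (Container container : containers) {
      if (container.enabled) {
        invoked.add(container.name);
      }
    }
    return invoked;
  }

  public static void main(String[] args) {
    DisableProcessorSketch pipeline = new DisableProcessorSketch();
    pipeline.add("tokenizer");
    pipeline.add("tagger");
    pipeline.add("parser");
    pipeline.disableCasProcessor(1);
    System.out.println(pipeline.process() + " of " + pipeline.size());
    pipeline.enableCasProcessor("tagger");
    System.out.println(pipeline.process());
  }
}
```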
-
getProcessTrace
private ProcessTrace getProcessTrace()
Returns a ProcessTrace instance used by this component.
- Returns:
ProcessTrace instance
-
handleEOFToken
private void handleEOFToken() throws java.lang.Exception
Handles EOFToken. This object is received when the CPM terminates. The token is passed to each running processing thread and Cas Consumer thread to allow an orderly shutdown. The EOFToken may be generated by the ArtifactProducer when the end of the collection is reached, or the CPM itself can place it in the Work Queue to force all processing threads to stop.
- Throws:
java.lang.Exception
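The shutdown protocol described above resembles the common end-of-stream marker pattern for worker threads. The sketch below assumes one token is enqueued per worker; how the real CPM distributes its EOFToken is not stated here, and `EofTokenSketch`, `EOF_TOKEN` and `runWorkers` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical end-of-stream marker shutdown; not the UIMA EOFToken handling.
final class EofTokenSketch {
  // Unique marker object; compared by identity.
  static final Object EOF_TOKEN = new Object();

  static List<Object> runWorkers(int workerCount, List<String> artifacts) {
    BlockingQueue<Object> workQueue = new LinkedBlockingQueue<>();
    List<Object> processed = Collections.synchronizedList(new ArrayList<>());
    List<Thread> workers = new ArrayList<>();

    for (int i = 0; i < workerCount; i++) {
      Thread worker = new Thread(() -> {
        try {
          while (true) {
            Object item = workQueue.take();
            if (item == EOF_TOKEN) {
              return; // orderly shutdown of this thread
            }
            processed.add(item);
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      workers.add(worker);
      worker.start();
    }

    workQueue.addAll(artifacts);
    // Assumption: one token per worker, placed after all artifacts.
    for (int i = 0; i < workerCount; i++) {
      workQueue.add(EOF_TOKEN);
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return processed;
  }

  public static void main(String[] args) {
    List<String> artifacts = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      artifacts.add("doc" + i);
    }
    System.out.println(runWorkers(3, artifacts).size());
  }
}
```

Because the queue is first-in, first-out and the tokens follow the artifacts, every artifact is taken before any worker stops.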
-
releaseTimedOutCases
private void releaseTimedOutCases(java.lang.Object[] artifact)
Release CAS back to the CAS Pool. This method is only used when a chunk-aware queue is used. When a document is chunked, each chunk represents a portion of the document. These chunks are ingested in sequential order by the Cas Consumer. The delivery of chunks in the correct sequence (chunk sequence 1 before chunk sequence 2) is guaranteed. Since chunks are processed asynchronously (if a multi-pipeline configuration is used), they may arrive in the queue out of sequence. If this happens, the Cas Consumer will wait for the expected chunk sequence. If such a chunk does not arrive within the configured interval, the entire sequence (all related chunks, that is, CASes) is invalidated, in the sense that the chunks are marked as timed out. Each CAS will be released back to the CAS Pool.
- Parameters:
artifact - an array of CAS instances
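The in-order delivery and timeout behaviour described above can be sketched with a small buffer keyed by sequence number. This is an illustration only: `ChunkSequencerSketch`, `offer` and `releaseTimedOut` are hypothetical names, the clock is passed in explicitly to keep the example deterministic, and the real chunk-aware queue may work differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical chunk sequencer; not the UIMA chunk-aware queue.
final class ChunkSequencerSketch {
  private final TreeMap<Integer, String> pending = new TreeMap<>();
  private final long timeoutMillis;
  private int nextExpected = 1;
  // Time at which waiting for nextExpected started, or -1 when not waiting.
  private long waitingSince = -1;

  ChunkSequencerSketch(long timeoutMillis) {
    this.timeoutMillis = timeoutMillis;
  }

  // Accepts a chunk and returns every chunk that can now be delivered in order.
  List<String> offer(int sequence, String chunk, long nowMillis) {
    pending.put(sequence, chunk);
    List<String> ready = new ArrayList<>();
    while (pending.containsKey(nextExpected)) {
      ready.add(pending.remove(nextExpected));
      nextExpected++;
    }
    if (pending.isEmpty()) {
      waitingSince = -1;            // nothing is held back
    } else if (!ready.isEmpty() || waitingSince < 0) {
      waitingSince = nowMillis;     // start waiting for a new expected chunk
    }
    return ready;
  }

  // Returns the held-back chunks if the expected chunk did not arrive in time.
  // The caller would release these back to the pool.
  List<String> releaseTimedOut(long nowMillis) {
    if (waitingSince < 0 || nowMillis - waitingSince < timeoutMillis) {
      return new ArrayList<>();
    }
    List<String> timedOut = new ArrayList<>(pending.values());
    pending.clear();
    waitingSince = -1;
    return timedOut;
  }

  public static void main(String[] args) {
    ChunkSequencerSketch sequencer = new ChunkSequencerSketch(100);
    System.out.println(sequencer.offer(2, "chunk2", 0));    // held back
    System.out.println(sequencer.offer(1, "chunk1", 10));   // both delivered
    System.out.println(sequencer.offer(4, "chunk4", 20));   // held back
    System.out.println(sequencer.releaseTimedOut(200));     // chunk4 times out
  }
}
```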
-
isCpmPaused
private void isCpmPaused()
Checks whether the CPM is paused.
-
run
public void run()
Starts the Processing Pipeline thread. This thread waits for an artifact to arrive on the configured Work Queue. Once the CAS arrives, it is removed from the queue and sent through the analysis pipeline.
- Specified by:
run in interface java.lang.Runnable
-
clearCasCache
private void clearCasCache()
Releases all CAS instances from the Cache back to the Cas Pool. The Cas Cache is used as an optimization to store a CAS in case it is needed for conversion, specifically in configurations that use XCAS and CAS based AEs.
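The relationship between the cache and the pool can be pictured as borrowed instances that are handed back in one sweep. In the sketch below, `CasCacheSketch`, `cache` and `available` are hypothetical helpers, and StringBuilder objects stand in for CAS instances; the real CPECasPool API is not shown.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical cache that borrows instances from a pool; not the UIMA code.
final class CasCacheSketch {
  private final Deque<StringBuilder> pool = new ArrayDeque<>();
  private final StringBuilder[] casCache;

  CasCacheSketch(int poolSize, int cacheSize) {
    for (int i = 0; i < poolSize; i++) {
      pool.push(new StringBuilder());
    }
    casCache = new StringBuilder[cacheSize];
  }

  // Borrows an instance from the pool and keeps it for a later conversion.
  void cache(int slot) {
    if (casCache[slot] == null) {
      casCache[slot] = pool.pop();
    }
  }

  // Hands every cached instance back to the pool and empties the cache.
  void clearCasCache() {
    for (int i = 0; i < casCache.length; i++) {
      if (casCache[i] != null) {
        pool.push(casCache[i]);
        casCache[i] = null;
      }
    }
  }

  // Number of instances currently available in the pool.
  int available() {
    return pool.size();
  }

  public static void main(String[] args) {
    CasCacheSketch sketch = new CasCacheSketch(3, 2);
    sketch.cache(0);
    sketch.cache(1);
    System.out.println(sketch.available()); // two instances are borrowed
    sketch.clearCasCache();
    System.out.println(sketch.available()); // all instances are back
  }
}
```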
-
consumeQueue
public boolean consumeQueue()
Consumes the input queue to make sure all bundles still there get processed before the CPE terminates.
- Returns:
true, if successful
-
processNext
protected boolean processNext(java.lang.Object[] aCasObjectList, ProcessTrace pTrTemp) throws ResourceProcessException, java.io.IOException, CollectionException, AbortCPMException, KillPipelineException
Executes the processing pipeline. The given bundle of CAS instances is processed by each CAS Processor in the pipeline. Conversions between the different types of CAS Processors are done on the fly. Two types of CAS Processors are currently supported:
- CasDataProcessor
- CasObjectProcessor
- Parameters:
aCasObjectList - bundle of CAS to analyze
pTrTemp - object used to aggregate stats
- Returns:
true, if successful
- Throws:
ResourceProcessException - the resource process exception
java.io.IOException - Signals that an I/O exception has occurred.
CollectionException - the collection exception
AbortCPMException - the abort CPM exception
KillPipelineException - the kill pipeline exception
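Converting on the fly means that the representation of the artifact is switched only when the next processor in the list needs the other form. A minimal sketch of that dispatch, assuming two toy representations; `DataForm`, `ObjectForm` and the two processor interfaces are hypothetical and much simpler than CasData, CAS and the real processor types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical dispatch between two processor kinds; not the UIMA code.
final class ConversionSketch {
  // Two toy representations of the same artifact.
  static final class DataForm {
    final String text;
    DataForm(String text) { this.text = text; }
  }

  static final class ObjectForm {
    final StringBuilder text;
    ObjectForm(String text) { this.text = new StringBuilder(text); }
  }

  interface Processor { }
  interface DataProcessor extends Processor { DataForm process(DataForm in); }
  interface ObjectProcessor extends Processor { void process(ObjectForm in); }

  // Runs the pipeline, converting only when the processor kind changes.
  static String processNext(String input, List<Processor> pipeline) {
    Object current = new DataForm(input);
    for (Processor processor : pipeline) {
      if (processor instanceof DataProcessor) {
        if (current instanceof ObjectForm) {
          current = new DataForm(((ObjectForm) current).text.toString());
        }
        current = ((DataProcessor) processor).process((DataForm) current);
      } else if (processor instanceof ObjectProcessor) {
        if (current instanceof DataForm) {
          current = new ObjectForm(((DataForm) current).text);
        }
        ((ObjectProcessor) processor).process((ObjectForm) current);
      }
    }
    if (current instanceof DataForm) {
      return ((DataForm) current).text;
    }
    return ((ObjectForm) current).text.toString();
  }

  static String demo() {
    List<Processor> pipeline = new ArrayList<>();
    pipeline.add((DataProcessor) data -> new DataForm(data.text + "+data"));
    pipeline.add((ObjectProcessor) object -> object.text.append("+object"));
    pipeline.add((DataProcessor) data -> new DataForm(data.text + "+data"));
    return processNext("doc", pipeline);
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```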
-
postAnalysis
private void postAnalysis(java.lang.Object[] aCasObjectList, boolean isCasObject, java.lang.Object[] casObjects, ProcessTrace aProcessTr, boolean doneAlready) throws java.lang.Exception
Notifies application listeners of completed analysis and stores results of analysis (CAS) in the Output Queue that this thread shares with a Cas Consumer thread.
- Parameters:
aCasObjectList - List of Artifacts just analyzed
isCasObject - determines the types of CAS just analyzed (CasData vs CasObject)
casObjects - the cas objects
aProcessTr - ProcessTrace object holding events and stats
doneAlready - flag to indicate if the last Cas Processor was released back to its container
- Throws:
java.lang.Exception
-
doEndOfBatchProcessing
private void doEndOfBatchProcessing(ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTr, java.lang.Object[] aCasObjectList)
Performs end of batch processing. It delegates the processing to the Cas Processor container. Using its configuration, the container determines whether it is time to call the Cas Processor's batchProcessComplete() method.
- Parameters:
aContainer - container performing end of batch processing
aProcessor - Cas Processor to call on end of batch
aProcessTr - Process Trace to use for aggregating events
aCasObjectList - CASes just analyzed
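One way a container could decide that a batch is complete is a simple counter compared against a configured batch size. The sketch below assumes exactly that; `BatchContainerSketch`, `endOfBatch` and the `batchCompleteCalls` counter are hypothetical, and the counter only stands in for invoking batchProcessComplete() on the Cas Processor.

```java
// Hypothetical batch bookkeeping; not the ProcessingContainer implementation.
final class BatchContainerSketch {
  private final int batchSize;
  private int sinceLastBatch = 0;
  // Stands in for the number of batchProcessComplete() invocations.
  int batchCompleteCalls = 0;

  BatchContainerSketch(int batchSize) {
    this.batchSize = batchSize;
  }

  // Called after each bundle; returns true when a batch was closed.
  boolean endOfBatch(int howManyCases) {
    sinceLastBatch += howManyCases;
    if (sinceLastBatch < batchSize) {
      return false;
    }
    sinceLastBatch = 0;
    batchCompleteCalls++;
    return true;
  }

  public static void main(String[] args) {
    BatchContainerSketch container = new BatchContainerSketch(3);
    for (int i = 0; i < 7; i++) {
      container.endOfBatch(1);
    }
    System.out.println(container.batchCompleteCalls);
  }
}
```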
-
handleSkipCasProcessor
private void handleSkipCasProcessor(ProcessingContainer aContainer, java.lang.Object[] aCasObjectList, boolean isLastCP) throws java.lang.Exception
In case a CAS is skipped (due to excessive exceptions that it causes), increments stats and totals.
- Parameters:
aContainer - the a container
aCasObjectList - the a cas object list
isLastCP - the is last CP
- Throws:
java.lang.Exception
-
handleServiceException
private void handleServiceException(ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTr, java.lang.Exception ex) throws java.lang.Exception
Handle exceptions related to remote invocations.
- Parameters:
aContainer - container managing CasProcessor that failed
aProcessor - failed CasProcessor
aProcessTr - ProcessTrace object holding events
ex - Source exception
- Throws:
java.lang.Exception
-
handleAbortCasProcessor
private void handleAbortCasProcessor(ProcessingContainer aContainer, CasProcessor aProcessor) throws java.lang.Exception
Disables the current CasProcessor.
- Parameters:
aContainer - a container that manages the current Cas Processor.
aProcessor - a Cas Processor to be disabled
- Throws:
java.lang.Exception - exception
-
handleAbortCPM
private void handleAbortCPM(ProcessingContainer aContainer, CasProcessor aProcessor) throws java.lang.Exception
Terminates the CPM.
- Parameters:
aContainer - a container that manages the current Cas Processor.
aProcessor - a Cas Processor to be disabled
- Throws:
java.lang.Exception - exception
-
handleKillPipeline
private void handleKillPipeline(ProcessingContainer aContainer) throws java.lang.Exception
Terminates the CPM.
- Parameters:
aContainer - a container that manages the current Cas Processor.
- Throws:
java.lang.Exception - exception
-
pauseContainer
private boolean pauseContainer(ProcessingContainer aContainer, java.lang.Exception aException, java.lang.String aThreadId)
Determines if the thread should be paused. Pausing a container effectively pauses ALL Cas Processors that are managed by the container. The pause is needed when multiple pipelines share a common service. If this service dies (Socket Down), only one thread should initiate the service restart. While the service is being restarted, no invocations on the service should be made. Containers will be resumed on successful service restart.
- Parameters:
aContainer - a container that manages the current Cas Processor.
aException - the a exception
aThreadId - id of the current thread
- Returns:
true, if successful
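The rule that only one thread initiates the restart of a shared service can be enforced with an atomic flag. The sketch below is an assumption about how such a guard might look, not the logic of pauseContainer itself; `SharedServiceSketch`, `tryPause`, `resume` and `countRestartInitiators` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical single-restarter guard for a service shared by several pipelines.
final class SharedServiceSketch {
  private final AtomicBoolean paused = new AtomicBoolean(false);

  // Returns true only for the one thread that switches the container to paused.
  boolean tryPause() {
    return paused.compareAndSet(false, true);
  }

  boolean isPaused() {
    return paused.get();
  }

  // Called after a successful restart so invocations may continue.
  void resume() {
    paused.set(false);
  }

  // Lets several threads report the same failure and counts how many of them
  // won the right to restart the service.
  static int countRestartInitiators(int threadCount) {
    SharedServiceSketch container = new SharedServiceSketch();
    AtomicInteger initiators = new AtomicInteger();
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < threadCount; i++) {
      Thread thread = new Thread(() -> {
        if (container.tryPause()) {
          initiators.incrementAndGet();
        }
      });
      threads.add(thread);
      thread.start();
    }
    for (Thread thread : threads) {
      try {
        thread.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return initiators.get();
  }

  public static void main(String[] args) {
    System.out.println(countRestartInitiators(8));
  }
}
```

Since no thread calls `resume` during the demo, exactly one of the competing threads sees `tryPause` succeed; the others would wait until the container is resumed.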
-
releaseCases
private void releaseCases(java.lang.Object aCasList, boolean lastProcessor, java.lang.String aName)
Conditionally releases CASes back to the CAS pool. The release only occurs if the Cas Processor is the last in the processing chain.
- Parameters:
aCasList - list of CASes to release
lastProcessor - determines if the release takes place
aName - the a name
-
notifyListeners
protected void notifyListeners(java.lang.Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus)
Notifies Listeners of the fact that the pipeline has finished processing the current set of CASes.
- Parameters:
aCas - object containing an array of Cas OR a single instance of Cas
isCasObject - true if instance of Cas is of type Cas, false otherwise
aEntityProcStatus - status object that may contain exceptions and trace
-
doNotifyListeners
protected void doNotifyListeners(java.lang.Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus)
Notifies all configured listeners. Makes sure that the appropriate type of Cas is sent to the listener. Conversions take place to ensure compatibility.
- Parameters:
aCas - Cas to pass to listener
isCasObject - true if Cas is of type CAS
aEntityProcStatus - status object containing exceptions and trace info
-
setReleaseCASFlag
public void setReleaseCASFlag(boolean aFlag)
Called by the CPMEngine during setup to indicate that this thread is supposed to release a CAS at the end of processing. This is typically done for the Cas Consumer thread, but in configurations not using Cas Consumers the processing pipeline may also release the CAS.
- Parameters:
aFlag - true if this thread should release a CAS when analysis is complete
-
stopCasProcessors
public void stopCasProcessors(boolean kill)
Stops all Cas Processors that are part of this PU.
- Parameters:
kill - true if CPE has been stopped before finishing processing during external stop
-
endOfProcessingReached
protected boolean endOfProcessingReached(long aCount)
Returns true if the CPM has finished analyzing the collection.
- Parameters:
aCount - running total of documents processed so far
- Returns:
true if CPM has processed all docs, false otherwise
-
process
protected void process(java.lang.Object anArtifact)
Process.
- Parameters:
anArtifact - the an artifact
-
showMetadata
protected void showMetadata(java.lang.Object[] aCasList)
Show metadata.
- Parameters:
aCasList - the a cas list
-
isProcessorReady
protected boolean isProcessorReady(int aStatus)
Check if the CASProcessor status is available for processing.
- Parameters:
aStatus - the a status
- Returns:
true, if is processor ready
-
getBytes
protected long getBytes(java.lang.Object aCas)
Returns the size of the CAS object. Currently only CASData is supported.
- Parameters:
aCas - CAS to get the size for
- Returns:
the size of the CAS object. Currently only CASData is supported.
-
setCasPool
public void setCasPool(CPECasPool aPool)
Sets the cas pool.
- Parameters:
aPool - the new cas pool
-
filterOutTheCAS
private boolean filterOutTheCAS(ProcessingContainer aContainer, boolean isCasObject, java.lang.Object[] aCasObjectList)
Filter out the CAS.
- Parameters:
aContainer - the a container
isCasObject - the is cas object
aCasObjectList - the a cas object list
- Returns:
true, if successful
-
containerDisabled
private boolean containerDisabled(ProcessingContainer aContainer)
Container disabled.
- Parameters:
aContainer - the a container
- Returns:
true, if successful
-
analyze
protected boolean analyze(java.lang.Object[] aCasObjectList, ProcessTrace pTrTemp) throws java.lang.Exception
An alternate processing loop designed for the single-threaded CPM.
- Parameters:
aCasObjectList - a list of CASes to analyze
pTrTemp - process trace where statistics are added during analysis
- Returns:
true, if successful
- Throws:
java.lang.Exception - the exception
-
doReleaseCasProcessor
private void doReleaseCasProcessor(ProcessingContainer aContainer, CasProcessor aCasProcessor)
Do release cas processor.
- Parameters:
aContainer - the a container
aCasProcessor - the a cas processor
-
doEndOfBatch
private void doEndOfBatch(ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTr, int howManyCases)
Do end of batch.
- Parameters:
aContainer - the a container
aProcessor - the a processor
aProcessTr - the a process tr
howManyCases - the how many cases
-
handleErrors
private boolean handleErrors(java.lang.Throwable e, ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTrace, java.lang.Object[] aCasObjectList, boolean isCasObject) throws java.lang.Exception
Main routine that handles errors occurring in the processing loop.
- Parameters:
e - exception in the main processing loop
aContainer - current container of the Cas Processor
aProcessor - current Cas Processor
aProcessTrace - an object containing stats for this processing loop
aCasObjectList - list of CASes being analyzed
isCasObject - determines type of CAS in the aCasObjectList (CasData or CasObject)
- Returns:
boolean
- Throws:
java.lang.Exception
-
invokeCasObjectCasProcessor
private void invokeCasObjectCasProcessor(ProcessingContainer container, CasProcessor processor, java.lang.Object[] aCasObjectList, ProcessTrace pTrTemp, boolean isCasObject) throws java.lang.Exception
Invoke cas object cas processor.
- Parameters:
container - the container
processor - the processor
aCasObjectList - the a cas object list
pTrTemp - the tr temp
isCasObject - the is cas object
- Throws:
java.lang.Exception
-
convertCasDataToCasObject
private void convertCasDataToCasObject(int casIndex, java.lang.String aContainerName, java.lang.Object[] aCasObjectList) throws java.lang.Exception
Convert cas data to cas object.
- Parameters:
casIndex - the cas index
aContainerName - the a container name
aCasObjectList - the a cas object list
- Throws:
java.lang.Exception
-
invokeCasDataCasProcessor
private void invokeCasDataCasProcessor(ProcessingContainer container, CasProcessor processor, java.lang.Object[] aCasObjectList, ProcessTrace pTrTemp, boolean isCasObject, boolean retry) throws java.lang.Exception
Invoke cas data cas processor.
- Parameters:
container - the container
processor - the processor
aCasObjectList - the a cas object list
pTrTemp - the tr temp
isCasObject - the is cas object
retry - the retry
- Throws:
java.lang.Exception
-
logCPM
private void logCPM(Level level, java.lang.String msgBundleId, java.lang.Object[] args)
Log CPM.
Parameters:
level - the level
msgBundleId - the message bundle ID
args - the args
-
maybeLogFinest
private void maybeLogFinest(java.lang.String msgBundleId)
Maybe log finest.
Parameters:
msgBundleId - the message bundle ID
-
logFinest
private void logFinest(java.lang.String msgBundleId)
Log finest.
Parameters:
msgBundleId - the message bundle ID
-
maybeLogFinest
private void maybeLogFinest(java.lang.String msgBundleId, java.lang.String arg1)
Maybe log finest.
Parameters:
msgBundleId - the message bundle ID
arg1 - the first argument
-
logFinest
private void logFinest(java.lang.String msgBundleId, java.lang.String arg1)
Log finest.
Parameters:
msgBundleId - the message bundle ID
arg1 - the first argument
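
The paired maybeLogFinest / logFinest names suggest a guard pattern: the "maybe" variant checks whether the level is enabled and only then delegates to the unconditional variant. This page does not show the method bodies, so the following is a minimal sketch of that pattern under that assumption, written against java.util.logging rather than the UIMA logger. The class name, the formatCount counter, and the message formatting are hypothetical.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class GuardedLoggingSketch {
    // Hypothetical logger; the real class presumably logs through the UIMA framework logger.
    static final Logger LOGGER = Logger.getLogger(GuardedLoggingSketch.class.getName());

    // Counts how often a message was actually built; for illustration only.
    static int formatCount = 0;

    // Unconditional variant: always builds the message and hands it to the logger.
    static void logFinest(String msgBundleId, String arg1) {
        formatCount++;
        LOGGER.log(Level.FINEST, msgBundleId + " [" + arg1 + "]");
    }

    // Guarded variant: skips message construction when FINEST is disabled.
    static void maybeLogFinest(String msgBundleId, String arg1) {
        if (LOGGER.isLoggable(Level.FINEST)) {
            logFinest(msgBundleId, arg1);
        }
    }

    public static void main(String[] args) {
        LOGGER.setLevel(Level.INFO);
        maybeLogFinest("hypothetical_msg_id", "thread-1"); // guard fails, nothing is built
        LOGGER.setLevel(Level.FINEST);
        maybeLogFinest("hypothetical_msg_id", "thread-1"); // guard passes, message is built
        System.out.println("messages built: " + formatCount);
    }
}
```

The point of the guard is to avoid building argument strings inside a tight processing loop when the fine-grained level is turned off.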
-
maybeLogFinest
private void maybeLogFinest(java.lang.String msgBundleId, java.lang.String arg1, java.lang.String arg2)
Maybe log finest.
Parameters:
msgBundleId - the message bundle ID
arg1 - the first argument
arg2 - the second argument
-
logFinest
private void logFinest(java.lang.String msgBundleId, java.lang.String arg1, java.lang.String arg2)
Log finest.
Parameters:
msgBundleId - the message bundle ID
arg1 - the first argument
arg2 - the second argument
-
logFinest
private void logFinest(java.lang.String msgBundleId, java.lang.String arg1, java.lang.String arg2, java.lang.String arg3)
Log finest.
Parameters:
msgBundleId - the message bundle ID
arg1 - the first argument
arg2 - the second argument
arg3 - the third argument
-
maybeLogFinest
private void maybeLogFinest(java.lang.String msgBundleId, ProcessingContainer container, CasProcessor processor)
Maybe log finest.
Parameters:
msgBundleId - the message bundle ID
container - the container
processor - the processor
-
logFinest
private void logFinest(java.lang.String msgBundleId, ProcessingContainer container, CasProcessor processor)
Log finest.
Parameters:
msgBundleId - the message bundle ID
container - the container
processor - the processor
-
maybeLogFinest
private void maybeLogFinest(java.lang.String msgBundleId, ProcessingContainer container)
Maybe log finest.
Parameters:
msgBundleId - the message bundle ID
container - the container
-
maybeLogFinest
private void maybeLogFinest(java.lang.String msgBundleId, CasProcessor processor)
Maybe log finest.
Parameters:
msgBundleId - the message bundle ID
processor - the processor
-
maybeLogFinest
private void maybeLogFinest(java.lang.String msgBundleId, ProcessingContainer container, CasProcessor processor, CAS[] casCache)
Maybe log finest.
Parameters:
msgBundleId - the message bundle ID
container - the container
processor - the processor
casCache - the CAS cache
-
maybeLogFinest
private void maybeLogFinest(java.lang.String msgBundleId, CAS[] casCache)
Maybe log finest.
Parameters:
msgBundleId - the message bundle ID
casCache - the CAS cache
-
maybeLogMemoryFinest
private void maybeLogMemoryFinest()
Maybe log memory finest.
-
logMemoryFinest
private void logMemoryFinest()
Log memory finest.
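
This page does not say what logMemoryFinest records. As a rough sketch only, assuming it reports JVM heap figures, such a message could be assembled from java.lang.Runtime as below. The class name, method name, and output format are hypothetical and are not taken from the UIMA source.

```java
public class MemoryLogSketch {
    // Builds a one-line heap summary in kilobytes from the JVM runtime.
    static String describeMemory() {
        Runtime rt = Runtime.getRuntime();
        long totalKb = rt.totalMemory() / 1024; // heap currently reserved by the JVM
        long freeKb = rt.freeMemory() / 1024;   // unused portion of that reserved heap
        return "totalKB=" + totalKb + " freeKB=" + freeKb + " usedKB=" + (totalKb - freeKb);
    }

    public static void main(String[] args) {
        System.out.println(describeMemory());
    }
}
```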
-
logWarning
private void logWarning(java.lang.String msgBundleId)
Log warning.
Parameters:
msgBundleId - the message bundle ID
-
maybeLogWarning
private void maybeLogWarning(java.lang.String msgBundleId, java.lang.String arg1, java.lang.String arg2)
Maybe log warning.
Parameters:
msgBundleId - the message bundle ID
arg1 - the first argument
arg2 - the second argument
-
logWarning
private void logWarning(java.lang.String msgBundleId, java.lang.String arg1, java.lang.String arg2)
Log warning.
Parameters:
msgBundleId - the message bundle ID
arg1 - the first argument
arg2 - the second argument
-
maybeLogSevere
private void maybeLogSevere(java.lang.String msgBundleId)
Maybe log severe.
Parameters:
msgBundleId - the message bundle ID
-
maybeLogSevere
private void maybeLogSevere(java.lang.String msgBundleId, java.lang.String arg1)
Maybe log severe.
Parameters:
msgBundleId - the message bundle ID
arg1 - the first argument
-
logSevere
private void logSevere(java.lang.String msgBundleId, java.lang.String arg1)
Log severe.
Parameters:
msgBundleId - the message bundle ID
arg1 - the first argument
-
maybeLogSevere
private void maybeLogSevere(java.lang.String msgBundleId, java.lang.String arg1, java.lang.String arg2)
Maybe log severe.
Parameters:
msgBundleId - the message bundle ID
arg1 - the first argument
arg2 - the second argument
-
logSevere
private void logSevere(java.lang.String msgBundleId, java.lang.String arg1, java.lang.String arg2)
Log severe.
Parameters:
msgBundleId - the message bundle ID
arg1 - the first argument
arg2 - the second argument
-
maybeLogSevere
private void maybeLogSevere(java.lang.String msgBundleId, java.lang.String arg1, java.lang.String arg2, java.lang.String arg3)
Maybe log severe.
Parameters:
msgBundleId - the message bundle ID
arg1 - the first argument
arg2 - the second argument
arg3 - the third argument
-
logSevere
private void logSevere(java.lang.String msgBundleId, java.lang.String arg1, java.lang.String arg2, java.lang.String arg3)
Log severe.
Parameters:
msgBundleId - the message bundle ID
arg1 - the first argument
arg2 - the second argument
arg3 - the third argument
-
maybeLogSevereException
private void maybeLogSevereException(java.lang.Throwable e)
Maybe log severe exception.
Parameters:
e - the throwable to log
-
maybeLogFinestWorkQueue
private void maybeLogFinestWorkQueue(java.lang.String msgBundleId, BoundedWorkQueue workQueue)
Maybe log finest work queue.
Parameters:
msgBundleId - the message bundle ID
workQueue - the work queue
-
-