Class CPMEngine
- java.lang.Object
-
- org.apache.uima.collection.impl.cpm.engine.CPMEngine
-
- All Implemented Interfaces:
java.lang.Runnable
public class CPMEngine extends java.lang.Object implements java.lang.Runnable
Responsible for creating and initializing processing threads. This instance manages the life-cycle of the CPE components. It exposes an API for plugging in components programmatically instead of declaratively. Running in its own thread, this component creates separate Processing Pipelines for Analysis Engines and CAS Consumers, launches the configured CollectionReader, and attaches all of those components to form a pipeline from source to sink. The Collection Reader feeds Processing Threads containing Analysis Engines, and Analysis Engines feed the results of analysis to CAS Consumers.
-
-
Field Summary
Fields Modifier and Type Field Description private int
activeProcessingUnits
The active processing units.private java.util.Hashtable
analysisEngines
The analysis engines.private java.util.LinkedList
annotatorDeployList
The annotator deploy list.private java.util.LinkedList
annotatorList
The annotator list.private static int
CAS_PROCESSED_MSG
The Constant CAS_PROCESSED_MSG.private ProcessingUnit
casConsumerPU
The cas consumer PU.private java.util.concurrent.Future<?>
casConsumerPUResult
CPECasPool
casPool
The CAS pool.private CasProcessor[]
casprocessorList
The casprocessor list.private boolean
casProcessorsDeployed
The cas processors deployed.private CheckpointData
checkpointData
The checkpoint data.private BaseCollectionReader
collectionReader
The collection reader.private int
concurrentThreadCount
The concurrent thread count.private java.util.LinkedList
consumerDeployList
The consumer deploy list.private java.util.LinkedList
consumerList
The consumer list.private java.util.Hashtable
consumers
The consumers.private boolean
consumerThreadStarted
The consumer thread started.private CPEFactory
cpeFactory
The cpe factory.private long
crFetchTime
The cr fetch time.private DebugControlThread
dbgCtrlThread
The dbg ctrl thread.private Capability[]
definedCapabilities
The defined capabilities.private boolean
dropCasOnExceptionPolicy
The drop cas on exception policy.private CPMExecutorService
executorService
private boolean
hardKill
The hard kill.private java.util.LinkedList
initial_cp_list
The initial cp list.private int
inputQueueSize
The input queue size.protected boolean
isRunning
The is running.protected boolean
killed
The killed.java.lang.Object
lockForPause
The lock for pause.private static int
MAX_WAIT_ON_QUEUE
The Constant MAX_WAIT_ON_QUEUE.private boolean
mixedCasProcessorTypeSupport
The mixed cas processor type support.private java.util.Properties
mPerformanceTuningSettings
The m performance tuning settings.private boolean
needsTCas
The needs T cas.private NonThreadedProcessingUnit
nonThreadedCasConsumerProcessingUnit
The non threaded cas consumer processing unit.private NonThreadedProcessingUnit
nonThreadedProcessingUnit
The non threaded processing unit.private long
numToProcess
The num to process.protected BoundedWorkQueue
outputQueue
The output queue.private int
outputQueueSize
The output queue size.protected boolean
pause
The pause.private boolean
pauseOnException
The pause on exception.private ProcessControllerAdapter
pca
The pca.private int
poolSize
The pool size.private int[]
processingThreadsState
The processing threads state.protected java.util.concurrent.Future<?>[]
processingUnitResults
protected ProcessingUnit[]
processingUnits
The processing units.private ProcessTrace
procTr
The proc tr.private ArtifactProducer
producer
The producer.private java.util.concurrent.Future<?>
producerResult
private int
readerFetchSize
The reader fetch size.private int
readerState
The reader state.private boolean
readerThreadStarted
The reader thread started.private static java.lang.String
SINGLE_THREADED_MODE
The Constant SINGLE_THREADED_MODE.private boolean
singleThreadedCPE
The single threaded CPE.private java.util.Hashtable
skippedDocs
The skipped docs.private java.util.Map
stats
The stats.private java.util.ArrayList
statusCbL
The status cb L.protected boolean
stopped
The stopped.protected BoundedWorkQueue
workQueue
The work queue.
-
Constructor Summary
Constructors Constructor Description CPMEngine(CPMExecutorService aExecutorService, CPEFactory aCpeFactory, ProcessTrace aProcTr, CheckpointData aCheckpointData)
Initializes Collection Processing Engine.
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Deprecated Methods Modifier and Type Method Description private void
addCasConsumer(CasProcessor aProcessor, java.lang.String aCpName)
Adds Cas Processor to a single-threaded pipeline.void
addCasProcessor(CasProcessor aCasProcessor)
Adds a CASProcessor to the processing pipeline.void
addCasProcessor(CasProcessor aCasProcessor, int aIndex)
Adds a CASProcessor to the processing pipeline at a given place in the processing pipeline.private void
addParallizableCasProcessor(CasProcessor aProcessor, java.lang.String aCpName)
Add Cas Processor to a list of CPs that are to be run in the parallelizable pipeline.void
addStatusCallbackListener(BaseStatusCallbackListener aListener)
Adds the status callback listener.void
asynchStop()
Deprecated.private void
bootstrapCPE()
Initialize the CPE.static void
callEntityProcessCompleteWithCAS(StatusCallbackListener statCL, CAS cas, EntityProcessStatus eps)
Internal use only, public for cross package access.private void
callTypeSystemInit()
Call typeSystemInit method on each component.private void
classifyCasProcessors()
Classify based on Cas Processor capability to run in parallel.void
cleanup()
Null out fields of this object.private void
copyComponentEvents(java.lang.String aEvType, java.util.List aList, ProcessTrace aPTr)
Copy given component events.private BoundedWorkQueue
createOutputQueue(int aQueueSize)
Instantiate custom Output Queue.private void
deployAnalysisEngines()
Deploys All Analysis Engines.void
deployCasProcessors()
Starts CASProcessor containers one at a time.private void
deployConsumers()
Deploys all Cas Consumers.void
disableCasProcessor(int aCasProcessorIndex)
Disable a CASProcessor in the processing pipeline.void
disableCasProcessor(java.lang.String aCasProcessorName)
Disable a CASProcessor in the processing pipeline.boolean
dropCasOnException()
Drop cas on exception.private void
dumpState()
Dumps some internal state of the CPE.void
enableCasProcessor(java.lang.String aCasProcessorName)
Enables a CASProcessor in the processing pipeline.private boolean
endOfProcessingReached(long entityCount)
Determines if the CPM processed all documents.private void
forcePUShutdown()
Place EOF Token onto a work queue to force thread exit.java.util.LinkedList
getAllProcessingContainers()
Returns a list of All Processing Containers.java.util.ArrayList
getCallbackListeners()
Returns a list of ALL callback listeners currently registered with the CPM.CasProcessor[]
getCasProcessors()
Returns all CASProcessors in the processing pipeline.private java.lang.Object
getCasWithSOFA(java.lang.Object entity, ProcessTrace pTrTemp)
Gets the cas with SOFA.protected CpeConfiguration
getCpeConfig()
Gets the cpe config.(package private) CPMExecutorService
getExecutorService()
private int
getIndexInList(java.util.List aDeployList, java.lang.String aName)
Returns index to a CasProcessor with a given name in a given List.java.lang.String
getLastDocRepository()
Gets the last doc repository.java.lang.String
getLastProcessedDocId()
Returns Id of the last document processed.java.util.Properties
getPerformanceTuningSettings()
Gets the performance tuning settings.int
getPoolSize()
Gets the pool size.private int
getPositionInListIfExists(java.lang.String aName, java.util.List aList)
Find the position in the list of the Cas Processor with a given name.java.util.LinkedList
getProcessingContainers()
Returns a list of Processing Containers for Analysis Engines.private ProcessTrace
getProcessTrace()
Gets the process trace.Progress[]
getProgress()
Returns collectionReader progress.private java.util.HashMap
getStatForContainer(ProcessingContainer aContainer)
Gets the stat for container.java.util.Map
getStats()
Returns CPE stats.int
getThreadCount()
Returns number of processing threads.private UimaTimer
getTimer()
Returns a timer to measure performance of the CPM.private void
handleException(java.lang.Throwable t, java.lang.Object[] entity, ProcessTrace aPTrace)
Handle given exception.void
invalidateCASes(CAS[] aCASList)
Invalidates the given CASes.boolean
isHardKilled()
Returns if the CPE was killed hard.boolean
isKilled()
Returns true if this engine has been killed.private boolean
isMultipleDeploymentAllowed(java.lang.String aDescPath, java.lang.String aCpName, boolean isConsumer)
Parses Cas Processor descriptor and checks if it is parallelizable.boolean
isParallizable(CasProcessor aProcessor, java.lang.String aCpName)
Determines if a given Cas Processor is parallelizable.boolean
isPaused()
Returns a global flag indicating if this Thread is in pause state.boolean
isPauseOnException()
Returns if the CPM should pause when exception occurs.private boolean
isProcessorReady(int aStatus)
Check if the CASProcessor status is available for processing.boolean
isRunning()
Returns a global flag indicating if this Thread is in processing state.void
killIt()
Kill CPM the hard way.private boolean
needsView()
Needs view.private void
notifyListeners(int aMsgType, java.lang.Object[] entity, ProcessTrace aPTrace)
Notify listeners.private void
notifyListeners(int aMsgType, java.lang.Object[] entity, ProcessTrace aPTrace, java.lang.Throwable t)
Notify listeners.private void
notifyListenersWithException(java.lang.Exception e)
Notify listeners of a given exception.void
pauseIt()
Pauses this thread.void
pipelineKilled(java.lang.String aPipelineThreadName)
Callback method used to notify the engine when a processing pipeline is killed due to excessive errors.(package private) void
processingUnitShutdown(ProcessingUnit unit)
Called from the ProcessingUnits when they shutdown due to having received the EOFToken.private ProcessingUnit
producePU(java.lang.String aClassName)
Instantiate custom Processing Pipeline.void
redeployAnalysisEngine(ProcessingContainer aProcessingContainer)
Deploys CasProcessor and associates it with aProcessingContainer.private void
registerTypeSystemsWithCasManager()
Registers Type Systems of all components with the CasManager.void
releaseCASes(CAS[] aCASList)
Releases given cases back to pool.void
removeCasProcessor(int aCasProcessorIndex)
Removes a CASProcessor from the processing pipeline.void
removeStatusCallbackListener(BaseStatusCallbackListener aListener)
Unregisters given listener from the CPM.private void
restoreFromCheckpoint(java.lang.String component, java.lang.String aEvType)
Restores named events from the checkpoint.void
resumeIt()
Resumes this thread.void
run()
Using given configuration creates and starts CPE processing pipeline.void
runSingleThreaded()
Runs the CPE in a single thread without queues.private void
saveStat(java.lang.String aStatLabel, java.lang.String aStatValue, ProcessingContainer aContainer)
Save stat.void
setCollectionReader(BaseCollectionReader aCollectionReader)
Sets CollectionReader to use during processing.void
setConcurrentThreadSize(int aConcurrentThreadSize)
Defines number of threads executing the processing pipeline concurrently.void
setInputQueueSize(int aInputQueueSize)
Defines the size of inputQueue.void
setNumToProcess(long aNumToProcess)
Defines the size of the batch.void
setOutputQueueSize(int aOutputQueueSize)
Defines the size of outputQueue.void
setPauseOnException(boolean aPause)
Sets a global flag to indicate to the CPM that it should pause whenever exception occurs.void
setPerformanceTuningSettings(java.util.Properties aPerformanceTuningSettings)
Overrides the default performance tuning settings for this CPE.void
setPoolSize(int aPoolSize)
Defines the size of Cas Pool.void
setProcessControllerAdapter(ProcessControllerAdapter aPca)
Sets the process controller adapter.void
setStats(java.util.Map aMap)
Plugs in a map where the engine stores performance info at runtime.private void
setupConsumerPipeline()
Setup Cas Consumer pipeline as single threaded.private void
setupProcessingPipeline()
Setup single threaded pipeline.private boolean
skipDroppedDocument(java.lang.Object[] entity)
Determines if a given CAS should be skipped.private void
startDebugControlThread()
Start debug control thread.void
stopCasProcessors(boolean kill)
Stops All Cas Processors and optionally changes the status according to kill flag.void
stopIt()
Stops execution of the Processing Pipeline and this thread.private void
tearDownCPE()
Stop and cleanup single-threaded CPE.private void
waitForCpmToResumeIfPaused()
Wait for CPM to resume if paused.
-
-
-
Field Detail
-
MAX_WAIT_ON_QUEUE
private static final int MAX_WAIT_ON_QUEUE
The Constant MAX_WAIT_ON_QUEUE.- See Also:
- Constant Field Values
-
CAS_PROCESSED_MSG
private static final int CAS_PROCESSED_MSG
The Constant CAS_PROCESSED_MSG.- See Also:
- Constant Field Values
-
SINGLE_THREADED_MODE
private static final java.lang.String SINGLE_THREADED_MODE
The Constant SINGLE_THREADED_MODE.- See Also:
- Constant Field Values
-
executorService
private final CPMExecutorService executorService
-
casPool
public CPECasPool casPool
The CAS pool.
-
lockForPause
public final java.lang.Object lockForPause
The lock for pause.
-
collectionReader
private BaseCollectionReader collectionReader
The collection reader.
-
pause
protected boolean pause
The pause.
-
isRunning
protected volatile boolean isRunning
The is running.
-
stopped
protected volatile boolean stopped
The stopped.
-
killed
protected volatile boolean killed
The killed.
-
pauseOnException
private boolean pauseOnException
The pause on exception.
-
annotatorList
private java.util.LinkedList annotatorList
The annotator list.
-
annotatorDeployList
private java.util.LinkedList annotatorDeployList
The annotator deploy list.
-
consumerList
private java.util.LinkedList consumerList
The consumer list.
-
consumerDeployList
private java.util.LinkedList consumerDeployList
The consumer deploy list.
-
numToProcess
private long numToProcess
The num to process.
-
poolSize
private int poolSize
The pool size.
-
procTr
private ProcessTrace procTr
The proc tr.
-
stats
private java.util.Map stats
The stats.
-
statusCbL
private java.util.ArrayList statusCbL
The status cb L.
-
readerFetchSize
private int readerFetchSize
The reader fetch size.
-
inputQueueSize
private int inputQueueSize
The input queue size.
-
outputQueueSize
private int outputQueueSize
The output queue size.
-
concurrentThreadCount
private int concurrentThreadCount
The concurrent thread count.
-
analysisEngines
private java.util.Hashtable analysisEngines
The analysis engines.
-
consumers
private java.util.Hashtable consumers
The consumers.
-
casprocessorList
private CasProcessor[] casprocessorList
The casprocessor list.
-
producer
private ArtifactProducer producer
The producer.
-
producerResult
private java.util.concurrent.Future<?> producerResult
-
cpeFactory
private CPEFactory cpeFactory
The cpe factory.
-
processingUnits
protected ProcessingUnit[] processingUnits
The processing units.
-
processingUnitResults
protected java.util.concurrent.Future<?>[] processingUnitResults
-
casConsumerPU
private ProcessingUnit casConsumerPU
The cas consumer PU.
-
casConsumerPUResult
private java.util.concurrent.Future<?> casConsumerPUResult
-
outputQueue
protected BoundedWorkQueue outputQueue
The output queue.
-
workQueue
protected BoundedWorkQueue workQueue
The work queue.
-
checkpointData
private CheckpointData checkpointData
The checkpoint data.
-
mixedCasProcessorTypeSupport
private boolean mixedCasProcessorTypeSupport
The mixed cas processor type support.
-
mPerformanceTuningSettings
private java.util.Properties mPerformanceTuningSettings
The m performance tuning settings.
-
dbgCtrlThread
private DebugControlThread dbgCtrlThread
The dbg ctrl thread.
-
pca
private ProcessControllerAdapter pca
The pca.
-
activeProcessingUnits
private int activeProcessingUnits
The active processing units.
-
hardKill
private boolean hardKill
The hard kill.
-
skippedDocs
private java.util.Hashtable skippedDocs
The skipped docs.
-
definedCapabilities
private Capability[] definedCapabilities
The defined capabilities.
-
needsTCas
private boolean needsTCas
The needs T cas.
-
crFetchTime
private long crFetchTime
The cr fetch time.
-
readerState
private int readerState
The reader state.
-
dropCasOnExceptionPolicy
private boolean dropCasOnExceptionPolicy
The drop cas on exception policy.
-
singleThreadedCPE
private boolean singleThreadedCPE
The single threaded CPE.
-
nonThreadedProcessingUnit
private NonThreadedProcessingUnit nonThreadedProcessingUnit
The non threaded processing unit.
-
nonThreadedCasConsumerProcessingUnit
private NonThreadedProcessingUnit nonThreadedCasConsumerProcessingUnit
The non threaded cas consumer processing unit.
-
initial_cp_list
private java.util.LinkedList initial_cp_list
The initial cp list.
-
casProcessorsDeployed
private boolean casProcessorsDeployed
The cas processors deployed.
-
consumerThreadStarted
private boolean consumerThreadStarted
The consumer thread started.
-
readerThreadStarted
private boolean readerThreadStarted
The reader thread started.
-
processingThreadsState
private int[] processingThreadsState
The processing threads state.
-
-
Constructor Detail
-
CPMEngine
public CPMEngine(CPMExecutorService aExecutorService, CPEFactory aCpeFactory, ProcessTrace aProcTr, CheckpointData aCheckpointData) throws java.lang.Exception
Initializes Collection Processing Engine. Assigns this thread and all processing threads created by this component to a common Thread Group.
- Parameters:
aExecutorService
- contains all CPM related threads
aCpeFactory
- CPE factory object responsible for parsing the CPE descriptor and creating components
aProcTr
- instance of the ProcessTrace where the CPM accumulates stats
aCheckpointData
- checkpoint object facilitating restart from the last known point
- Throws:
java.lang.Exception
- the exception
-
-
Method Detail
-
getExecutorService
CPMExecutorService getExecutorService()
-
getProcessingContainers
public java.util.LinkedList getProcessingContainers()
Returns a list of Processing Containers for Analysis Engines. Each CasProcessor is managed by its own container.- Returns:
- the processing containers
-
getAllProcessingContainers
public java.util.LinkedList getAllProcessingContainers()
Returns a list of All Processing Containers. Each CasProcessor is managed by its own container.- Returns:
- the all processing containers
-
getThreadCount
public int getThreadCount() throws ResourceConfigurationException
Returns number of processing threads.- Returns:
- - number of processing threads
- Throws:
ResourceConfigurationException
- -
-
setStats
public void setStats(java.util.Map aMap)
Plugs in a map where the engine stores performance info at runtime.- Parameters:
aMap
- - map for runtime stats and totals
-
getStats
public java.util.Map getStats()
Returns CPE stats.- Returns:
- Map containing CPE stats
-
setPauseOnException
public void setPauseOnException(boolean aPause)
Sets a global flag to indicate to the CPM that it should pause whenever exception occurs.- Parameters:
aPause
- - true if pause is requested on exception, false otherwise
-
isPauseOnException
public boolean isPauseOnException()
Returns if the CPM should pause when exception occurs.- Returns:
- - true if the CPM pauses when exception occurs, false otherwise
-
setInputQueueSize
public void setInputQueueSize(int aInputQueueSize)
Defines the size of inputQueue. The queue stores this many entities read from the CollectionReader. Every processing pipeline thread will read its entities from this input queue. The CollectionReader is decoupled from the consumer of entities, and continuously replenishes the input queue.- Parameters:
aInputQueueSize
- the size of the input queue.
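The decoupling described for the input queue can be illustrated with a plain bounded queue. This is a minimal sketch, not the CPM's own implementation: it uses `java.util.concurrent.ArrayBlockingQueue` in place of `BoundedWorkQueue`, and `InputQueueSketch` and `fillUntilFull` are hypothetical names chosen for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class InputQueueSketch {
    /**
     * Hypothetical stand-in for the reader: offers entities until the queue
     * is at capacity and reports how many were accepted.
     */
    public static int fillUntilFull(BlockingQueue<String> queue, int available) {
        int accepted = 0;
        for (int i = 0; i < available; i++) {
            // offer() returns false instead of blocking when the queue is full
            if (!queue.offer("entity-" + i)) {
                break;
            }
            accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) {
        // The capacity plays the role of aInputQueueSize (assumed value: 3)
        BlockingQueue<String> inputQueue = new ArrayBlockingQueue<>(3);
        int accepted = fillUntilFull(inputQueue, 10);
        System.out.println("accepted=" + accepted
                + " remainingCapacity=" + inputQueue.remainingCapacity());

        // A pipeline thread takes one entity, which frees a slot for the reader
        inputQueue.poll();
        System.out.println("reader can refill: " + inputQueue.offer("entity-next"));
    }
}
```

A larger queue lets the reader run further ahead of slow pipeline threads at the cost of holding more entities in memory at once.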
-
setOutputQueueSize
public void setOutputQueueSize(int aOutputQueueSize)
Defines the size of outputQueue. The queue stores this many entities enqueued by every processing pipeline thread.The results of analysis are dumped into this queue for consumer thread to consume its contents.- Parameters:
aOutputQueueSize
- the size of the output queue.
-
setPoolSize
public void setPoolSize(int aPoolSize)
Defines the size of Cas Pool.- Parameters:
aPoolSize
- the size of the Cas pool.
-
getPoolSize
public int getPoolSize()
Gets the pool size.- Returns:
- the pool size
-
setConcurrentThreadSize
public void setConcurrentThreadSize(int aConcurrentThreadSize)
Defines number of threads executing the processing pipeline concurrently.- Parameters:
aConcurrentThreadSize
- the number of threads that execute the processing pipeline concurrently.
-
addStatusCallbackListener
public void addStatusCallbackListener(BaseStatusCallbackListener aListener)
Adds the status callback listener.- Parameters:
aListener
- the a listener
-
getCallbackListeners
public java.util.ArrayList getCallbackListeners()
Returns a list of ALL callback listeners currently registered with the CPM.- Returns:
- list of callback listeners currently registered with the CPM
-
removeStatusCallbackListener
public void removeStatusCallbackListener(BaseStatusCallbackListener aListener)
Unregisters given listener from the CPM.- Parameters:
aListener
- - instance of BaseStatusCallbackListener to unregister
-
isKilled
public boolean isKilled()
Returns true if this engine has been killed.- Returns:
- true if this engine has been killed
-
dumpState
private void dumpState()
Dumps some internal state of the CPE. Used for debugging.
-
killIt
public void killIt()
Kills the CPM the hard way. None of the entities in the queues will be processed. This method simply empties all queues and at the end adds an EOFToken to the work queue so that all threads go away.
-
isHardKilled
public boolean isHardKilled()
Returns if the CPE was killed hard. Soft kill allows the CPE to finish processing all in-transit CASes. Hard kill causes the CPM to stop processing and to throw away all unprocessed CASes from its queues.- Returns:
- true if the CPE was killed hard
-
asynchStop
@Deprecated public void asynchStop()
Deprecated.Asynch stop.
-
stopIt
public void stopIt()
Stops execution of the Processing Pipeline and this thread.
-
getIndexInList
private int getIndexInList(java.util.List aDeployList, java.lang.String aName)
Returns index to a CasProcessor with a given name in a given List.- Parameters:
aDeployList
- - List of CasConsumers to be searched
aName
- - name of the CasConsumer we want to find
- Returns:
- 0 if a CasConsumer is not found in the list, otherwise the position in the list where the CasConsumer can be found
-
getPositionInListIfExists
private int getPositionInListIfExists(java.lang.String aName, java.util.List aList)
Find the position in the list of the Cas Processor with a given name.- Parameters:
aName
- the a nameaList
- the a list- Returns:
- the position in the list of the Cas Processor with a given name
-
isMultipleDeploymentAllowed
private boolean isMultipleDeploymentAllowed(java.lang.String aDescPath, java.lang.String aCpName, boolean isConsumer) throws java.lang.Exception
Parses Cas Processor descriptor and checks if it is parallelizable.- Parameters:
aDescPath
- - fully qualified path to a CP descriptoraCpName
- - name of the CPisConsumer
- - true if the CP is a Cas Consumer, false otherwise- Returns:
- - true if CP is parallelizable, false otherwise
- Throws:
java.lang.Exception
- -
-
isParallizable
public boolean isParallizable(CasProcessor aProcessor, java.lang.String aCpName) throws java.lang.Exception
Determines if a given Cas Processor is parallelizable. Remote Cas Processors are by default parallelizable. For integrated and managed the CPM consults Cas Processor's descriptor to determine if it is parallelizable.- Parameters:
aProcessor
- - Cas Processor being checkedaCpName
- - name of the CP- Returns:
- - true if CP is parallelizable, false otherwise
- Throws:
java.lang.Exception
- -
-
addCasConsumer
private void addCasConsumer(CasProcessor aProcessor, java.lang.String aCpName) throws java.lang.Exception
Adds a Cas Processor to a single-threaded pipeline. This pipeline is fed by the output queue and typically contains Cas Consumers. AEs can also be part of this pipeline.- Parameters:
aProcessor
- - Cas Processor to add to single-threaded pipelineaCpName
- - name of the Cas Processor- Throws:
java.lang.Exception
- -
-
addParallizableCasProcessor
private void addParallizableCasProcessor(CasProcessor aProcessor, java.lang.String aCpName) throws java.lang.Exception
Adds a Cas Processor to the list of CPs that are to be run in the parallelizable pipeline. The fact that a CP is in the parallelizable pipeline does not mean that there will be one instance of the CP per pipeline. It is allowed to have a single, shareable CP instance running in a multi-threaded pipeline.- Parameters:
aProcessor
- - CP to add to parallelizable pipelineaCpName
- - name of the CP- Throws:
java.lang.Exception
- -
-
classifyCasProcessors
private void classifyCasProcessors() throws java.lang.Exception
Classifies Cas Processors based on their capability to run in parallel. Some Cas Processors need to run as a single instance only. The method scans the list of Cas Processors backwards and moves those Cas Processors that are not parallelizable to a separate single-threaded pipeline. This process of moving CPs continues until the first parallelizable Cas Processor is found. Beyond this point all Cas Processors are moved to a parallelizable pipeline. If a non-parallelizable CP is in the parallelizable pipeline, there will simply be a single instance of it that is shared by all processing threads.- Throws:
java.lang.Exception
- -
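The backward scan described for classifyCasProcessors can be sketched as follows. This is an illustration under assumptions, not the engine's code: processors are reduced to a `boolean[]` of parallelizable flags, and `ClassifySketch` and `firstSingleThreadedIndex` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

public class ClassifySketch {
    /**
     * Returns the index of the first processor that belongs to the
     * single-threaded tail. Processors before that index go to the
     * parallelizable pipeline, even if they are not parallelizable themselves.
     */
    public static int firstSingleThreadedIndex(boolean[] parallelizable) {
        int split = parallelizable.length;
        // Scan backwards and stop at the first parallelizable processor
        for (int i = parallelizable.length - 1; i >= 0 && !parallelizable[i]; i--) {
            split = i;
        }
        return split;
    }

    public static void main(String[] args) {
        // Hypothetical pipeline: AE2 is not parallelizable but sits before AE3
        List<String> names = Arrays.asList("AE1", "AE2", "AE3", "CC1", "CC2");
        boolean[] parallelizable = {true, false, true, false, false};

        int split = firstSingleThreadedIndex(parallelizable);
        System.out.println("parallelizable pipeline: " + names.subList(0, split));
        System.out.println("single-threaded pipeline: " + names.subList(split, names.size()));
    }
}
```

In this example AE2 stays in the parallelizable pipeline because a parallelizable processor follows it, which matches the case where a single shared instance serves all processing threads.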
-
addCasProcessor
public void addCasProcessor(CasProcessor aCasProcessor) throws ResourceConfigurationException
Adds a CASProcessor to the processing pipeline. If a CasProcessor already exists and its status=DISABLED this method will re-enable the CasProcessor.- Parameters:
aCasProcessor
- CASProcessor to be added to the processing pipeline- Throws:
ResourceConfigurationException
- the resource configuration exception
-
addCasProcessor
public void addCasProcessor(CasProcessor aCasProcessor, int aIndex) throws ResourceConfigurationException
Adds a CASProcessor to the processing pipeline at a given place in the processing pipeline.- Parameters:
aCasProcessor
- CASProcessor to be added to the processing pipelineaIndex
- - insertion point for a given CasProcessor- Throws:
ResourceConfigurationException
- the resource configuration exception
-
removeCasProcessor
public void removeCasProcessor(int aCasProcessorIndex)
Removes a CASProcessor from the processing pipeline.- Parameters:
aCasProcessorIndex
- - CasProcessor position in processing pipeline
-
disableCasProcessor
public void disableCasProcessor(int aCasProcessorIndex)
Disable a CASProcessor in the processing pipeline.- Parameters:
aCasProcessorIndex
- index of the CASProcessor to disable in the processing pipeline
-
disableCasProcessor
public void disableCasProcessor(java.lang.String aCasProcessorName)
Disable a CASProcessor in the processing pipeline.- Parameters:
aCasProcessorName
- name of the CASProcessor to disable in the processing pipeline
-
enableCasProcessor
public void enableCasProcessor(java.lang.String aCasProcessorName)
Enables a CASProcessor in the processing pipeline.- Parameters:
aCasProcessorName
- name of the CASProcessor to enable in the processing pipeline
-
getCasProcessors
public CasProcessor[] getCasProcessors()
Returns all CASProcessors in the processing pipeline.- Returns:
- the cas processors
-
deployConsumers
private void deployConsumers() throws AbortCPMException
Deploys all Cas Consumers.- Throws:
AbortCPMException
- -
-
redeployAnalysisEngine
public void redeployAnalysisEngine(ProcessingContainer aProcessingContainer) throws java.lang.Exception
Deploys CasProcessor and associates it with aProcessingContainer.- Parameters:
aProcessingContainer
- the a processing container- Throws:
java.lang.Exception
- the exception
-
deployAnalysisEngines
private void deployAnalysisEngines() throws AbortCPMException
Deploys all Analysis Engines. Analysis Engines run in replicated processing units separate from Cas Consumers.- Throws:
AbortCPMException
- -
-
deployCasProcessors
public void deployCasProcessors() throws AbortCPMException
Starts CASProcessor containers one at a time. During this phase the container deploys a TAE as a local, remote, or integrated CasProcessor.- Throws:
AbortCPMException
- the abort CPM exception
-
restoreFromCheckpoint
private void restoreFromCheckpoint(java.lang.String component, java.lang.String aEvType)
Restores named events from the checkpoint.- Parameters:
component
- - component name to restore named event foraEvType
- - event to restore
-
copyComponentEvents
private void copyComponentEvents(java.lang.String aEvType, java.util.List aList, ProcessTrace aPTr) throws java.io.IOException
Copy given component events.- Parameters:
aEvType
- - event typeaList
- - list of events to copyaPTr
- -- Throws:
java.io.IOException
- -
-
isRunning
public boolean isRunning()
Returns a global flag indicating if this Thread is in processing state.- Returns:
- true, if is running
-
isPaused
public boolean isPaused()
Returns a global flag indicating if this Thread is in pause state.- Returns:
- true, if is paused
-
pauseIt
public void pauseIt()
Pauses this thread.
-
resumeIt
public void resumeIt()
Resumes this thread.
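One common way to implement a pause and resume pair around a dedicated lock object, such as the lockForPause field listed above, is a guarded wait. The sketch below assumes that pattern and is not taken from the engine's source: `PauseSketch` and `waitIfPaused` are hypothetical names.

```java
public class PauseSketch {
    private final Object lockForPause = new Object();
    private boolean pause = false;

    public void pauseIt() {
        synchronized (lockForPause) {
            pause = true;
        }
    }

    public void resumeIt() {
        synchronized (lockForPause) {
            pause = false;
            lockForPause.notifyAll(); // wake every thread parked in waitIfPaused()
        }
    }

    public boolean isPaused() {
        synchronized (lockForPause) {
            return pause;
        }
    }

    /** Blocks while paused; returns false if interrupted while waiting. */
    public boolean waitIfPaused() {
        synchronized (lockForPause) {
            while (pause) { // loop guards against spurious wakeups
                try {
                    lockForPause.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PauseSketch engine = new PauseSketch();
        engine.pauseIt();
        Thread worker = new Thread(() -> {
            engine.waitIfPaused();
            System.out.println("worker resumed");
        });
        worker.start();
        engine.resumeIt();
        worker.join();
    }
}
```

Checking the flag inside the synchronized block means the worker proceeds correctly whether it reaches the wait before or after the resume call.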
-
setCollectionReader
public void setCollectionReader(BaseCollectionReader aCollectionReader)
Sets CollectionReader to use during processing.- Parameters:
aCollectionReader
- aCollectionReader
-
setNumToProcess
public void setNumToProcess(long aNumToProcess)
Defines the size of the batch.- Parameters:
aNumToProcess
- the new num to process
-
getLastProcessedDocId
public java.lang.String getLastProcessedDocId()
Returns Id of the last document processed.- Returns:
- the last processed doc id
-
getLastDocRepository
public java.lang.String getLastDocRepository()
Gets the last doc repository.- Returns:
- the last doc repository
-
producePU
private ProcessingUnit producePU(java.lang.String aClassName) throws java.lang.Exception
Instantiate custom Processing Pipeline.- Parameters:
aClassName
- - name of a class that extends ProcessingUnit- Returns:
- - an instance of the ProcessingUnit
- Throws:
java.lang.Exception
- -
-
startDebugControlThread
private void startDebugControlThread()
Start debug control thread.
-
createOutputQueue
private BoundedWorkQueue createOutputQueue(int aQueueSize) throws java.lang.Exception
Instantiate custom Output Queue.- Parameters:
aQueueSize
- - max size of the queue- Returns:
- - new instance of the output queue
- Throws:
java.lang.Exception
- -
-
notifyListenersWithException
private void notifyListenersWithException(java.lang.Exception e)
Notify listeners of a given exception.- Parameters:
e
- - an exception to be sent to listeners
-
pipelineKilled
public void pipelineKilled(java.lang.String aPipelineThreadName)
Callback method used to notify the engine when a processing pipeline is killed due to excessive errors. This method is only called if the processing pipeline is unable to acquire a connection to a remote service and the configuration indicates 'kill-pipeline' as the action to take on excessive errors. When running with multiple pipelines, the routine decrements a global pipeline counter and tests whether any pipelines are left. When all pipelines have been killed as described above, the CPM needs to terminate. Since the pipelines were killed prematurely, there are artifacts (CASes) left in the work queue. These must be removed from the work queue and released back to the CAS pool so that the Collection Reader thread exits properly.- Parameters:
aPipelineThreadName
- - name of the pipeline thread exiting from its run() method
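The counter and drain logic described for pipelineKilled can be sketched as below. This is a simplified model under assumptions: CASes are represented as strings, both queues are plain `ArrayDeque` instances, and `PipelineKilledSketch` with its fields is hypothetical rather than the engine's real state.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class PipelineKilledSketch {
    private int activePipelines;                         // hypothetical global pipeline counter
    final Queue<String> workQueue = new ArrayDeque<>();  // CASes still waiting to be processed
    final Queue<String> casPool = new ArrayDeque<>();    // free CASes available to the reader

    public PipelineKilledSketch(int pipelines) {
        this.activePipelines = pipelines;
    }

    /** Returns true once the last pipeline has exited and the engine should terminate. */
    public synchronized boolean pipelineKilled(String aPipelineThreadName) {
        activePipelines--;
        if (activePipelines > 0) {
            return false; // other pipelines are still running
        }
        // No pipeline is left to consume the work queue, so hand every
        // stranded CAS back to the pool; otherwise a reader blocked on the
        // pool would never be able to finish.
        String cas;
        while ((cas = workQueue.poll()) != null) {
            casPool.add(cas);
        }
        return true;
    }

    public static void main(String[] args) {
        PipelineKilledSketch engine = new PipelineKilledSketch(2);
        engine.workQueue.add("cas-1");
        engine.workQueue.add("cas-2");
        System.out.println("terminate after first kill: " + engine.pipelineKilled("pipeline-0"));
        System.out.println("terminate after second kill: " + engine.pipelineKilled("pipeline-1"));
        System.out.println("CASes returned to pool: " + engine.casPool.size());
    }
}
```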
-
run
public void run()
Using given configuration creates and starts CPE processing pipeline. It is either single-threaded or a multi-threaded pipeline. Which is actually used depends on the configuration defined in the CPE descriptor. In multi-threaded mode, the CPE starts number of threads: 1) ArtifactProducer Thread - this is a thread containing a Collection Reader. It runs asynchronously and it fills a WorkQueue with CASes. 2) CasConsumer Thread - this is an optional thread. It is only instantiated if there Cas Consumers in the pipeline 3) Processing Threads - one or more processing threads, configured identically, that are performing analysis How many threads are started depends on configuration in CPE descriptor All threads started here are placed in a ThreadGroup. This provides a catch-all mechanism for errors that may occur in the CPM. If error is thrown, the ThreadGroup is notified. The ThreadGroup than notifies all registers listeners to give an application a chance to report the error and do necessary cleanup. This routine manages all the threads and makes sure that all of them are cleaned up before returning. The ThreadGroup must cleanup all threads under its control otherwise a memory leak occurs. Even those threads that are not started must be cleaned as they end up in the ThreadGroup when instantiated. The code uses number of state variables to make decisions during cleanup.- Specified by:
run
in interface java.lang.Runnable
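The ThreadGroup catch-all mechanism mentioned in the description of run() can be sketched with the standard `ThreadGroup.uncaughtException` hook. The class `ErrorReportingGroup`, its listener type, and the `demo` method are hypothetical names chosen for this example; the CPM's own ThreadGroup subclass and listener interface are not shown in this section.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical sketch: a ThreadGroup that forwards uncaught errors to listeners.
class ErrorReportingGroup extends ThreadGroup {
    private final List<Consumer<Throwable>> listeners = new CopyOnWriteArrayList<>();

    ErrorReportingGroup(String name) {
        super(name);
    }

    void addListener(Consumer<Throwable> listener) {
        listeners.add(listener);
    }

    // Invoked by the JVM when a thread in this group dies from an uncaught
    // throwable and has no handler of its own.
    @Override
    public void uncaughtException(Thread thread, Throwable error) {
        for (Consumer<Throwable> listener : listeners) {
            listener.accept(error);
        }
    }

    // Starts one failing thread in the group and returns the messages that
    // the registered listener received.
    static List<String> demo() {
        List<String> reported = new CopyOnWriteArrayList<>();
        ErrorReportingGroup group = new ErrorReportingGroup("cpm-sketch");
        group.addListener(error -> reported.add(error.getMessage()));
        Thread worker = new Thread(group, () -> {
            throw new IllegalStateException("boom");
        }, "worker");
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return reported;
    }
}
```

Because the handler runs on the dying thread before it terminates, the listener has already been notified by the time `join()` returns.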
-
forcePUShutdown
private void forcePUShutdown()
Place EOF Token onto a work queue to force thread exit.
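Forcing a thread to exit by queueing an end-of-file token is a common pattern (often called a "poison pill"). The following is a minimal sketch under stated assumptions: the `EOF` sentinel, the `EofTokenSketch` class, and the string artifacts are illustrative and do not reproduce the CPM's EOFToken or work queue classes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the EOF-token pattern; not the CPM's own classes.
class EofTokenSketch {
    // Sentinel object, compared by identity, that tells a worker to stop.
    static final Object EOF = new Object();

    // Starts a worker, feeds it the given number of items followed by EOF,
    // and returns how many items the worker processed before exiting.
    static int processThenForceShutdown(int items) {
        BlockingQueue<Object> workQueue = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();

        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Object item = workQueue.take();
                    if (item == EOF) {
                        return; // EOF token seen: leave run()
                    }
                    processed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        for (int i = 0; i < items; i++) {
            workQueue.add("cas-" + i);
        }
        workQueue.add(EOF); // forces the worker to exit once it reaches the token

        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed.get();
    }
}
```

Since the queue is FIFO, every item added before the token is processed before the worker exits.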
-
getTimer
private UimaTimer getTimer() throws java.lang.Exception
Returns the timer used to measure performance of the CPM. The timer can optionally be configured in the CPE descriptor. If none is defined, the method returns the default timer.- Returns:
- - custom timer or JavaTimer (default)
- Throws:
java.lang.Exception
- -
-
cleanup
public void cleanup()
Null out fields of this object. Call this only when this object is no longer needed.
-
registerTypeSystemsWithCasManager
private void registerTypeSystemsWithCasManager() throws java.lang.Exception
Registers Type Systems of all components with the CasManager.- Throws:
java.lang.Exception
- the exception
-
callTypeSystemInit
private void callTypeSystemInit() throws ResourceInitializationException
Call typeSystemInit method on each component.- Throws:
ResourceInitializationException
- the resource initialization exception
-
stopCasProcessors
public void stopCasProcessors(boolean kill) throws CasProcessorDeploymentException
Stops all CAS Processors and optionally changes the status according to the kill flag.- Parameters:
kill
- - true if CPE has been stopped before completing normally- Throws:
CasProcessorDeploymentException
- the cas processor deployment exception
-
getProgress
public Progress[] getProgress()
Returns collectionReader progress.- Returns:
- the progress
-
getStatForContainer
private java.util.HashMap getStatForContainer(ProcessingContainer aContainer)
Gets the stat for container.- Parameters:
aContainer
- the container- Returns:
- the stat for container
-
saveStat
private void saveStat(java.lang.String aStatLabel, java.lang.String aStatValue, ProcessingContainer aContainer)
Save stat.- Parameters:
aStatLabel
- the stat label
aStatValue
- the stat value
aContainer
- the container
-
isProcessorReady
private boolean isProcessorReady(int aStatus)
Check if the CASProcessor status is available for processing.- Parameters:
aStatus
- the status- Returns:
- true, if is processor ready
-
invalidateCASes
public void invalidateCASes(CAS[] aCASList)
Invalidates CASes.- Parameters:
aCASList
- the CAS list
-
releaseCASes
public void releaseCASes(CAS[] aCASList)
Releases the given CASes back to the pool.- Parameters:
aCASList
- - cas list to release
-
setPerformanceTuningSettings
public void setPerformanceTuningSettings(java.util.Properties aPerformanceTuningSettings)
Overrides the default performance tuning settings for this CPE. This affects things such as CAS sizing parameters.- Parameters:
aPerformanceTuningSettings
- the new settings- See Also:
UIMAFramework.getDefaultPerformanceTuningProperties()
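One way to picture "overriding the default settings" is with the layering that `java.util.Properties` supports. This is a sketch only: the class `TuningSettingsSketch`, the method `withOverrides`, and the keys used in the usage example are hypothetical, and this section does not say how the engine itself combines the supplied settings with the framework defaults.

```java
import java.util.Properties;

// Hypothetical sketch of layering overrides on top of default properties.
class TuningSettingsSketch {
    // Returns a Properties object that answers from the overrides first and
    // falls back to the defaults for any key the overrides do not define.
    static Properties withOverrides(Properties defaults, Properties overrides) {
        Properties merged = new Properties(defaults); // defaults are consulted on a miss
        for (String key : overrides.stringPropertyNames()) {
            merged.setProperty(key, overrides.getProperty(key));
        }
        return merged;
    }
}
```

For example, with defaults `example.cas.size=100` and `example.other=x` (made-up keys) and an override `example.cas.size=500`, the merged object returns `500` for the first key and `x` for the second.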
-
getPerformanceTuningSettings
public java.util.Properties getPerformanceTuningSettings()
Gets the performance tuning settings.- Returns:
- Returns the PerformanceTuningSettings.
-
setProcessControllerAdapter
public void setProcessControllerAdapter(ProcessControllerAdapter aPca)
Sets the process controller adapter.- Parameters:
aPca
- the new process controller adapter
-
getCpeConfig
protected CpeConfiguration getCpeConfig() throws java.lang.Exception
Gets the cpe config.- Returns:
- the cpe config
- Throws:
java.lang.Exception
- the exception
-
processingUnitShutdown
void processingUnitShutdown(ProcessingUnit unit)
Called from the ProcessingUnits when they shut down after receiving the EOFToken. When all ProcessingUnits have shut down, we put an EOFToken on the output queue so that the CAS Consumers will also shut down. -Adam- Parameters:
unit
- the unit
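The hand-off described for processingUnitShutdown (the last unit to stop forwards the EOF token downstream) can be sketched as a simple countdown. The class `UnitShutdownSketch`, its `EOF` sentinel, and the string unit names are assumptions for illustration, not the engine's actual types.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: the last processing unit to shut down forwards EOF.
class UnitShutdownSketch {
    static final Object EOF = new Object();

    private final AtomicInteger remainingUnits;
    final BlockingQueue<Object> outputQueue = new LinkedBlockingQueue<>();

    UnitShutdownSketch(int unitCount) {
        this.remainingUnits = new AtomicInteger(unitCount);
    }

    // Called once by each unit as it exits. Only the call that brings the
    // count to zero places the EOF token on the output queue.
    void processingUnitShutdown(String unitName) {
        if (remainingUnits.decrementAndGet() == 0) {
            outputQueue.add(EOF);
        }
    }
}
```

Using an atomic decrement means exactly one caller observes zero, so the token is queued once even if several units shut down at the same time.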
-
dropCasOnException
public boolean dropCasOnException()
Drop cas on exception.- Returns:
- true, if successful
-
getCasWithSOFA
private java.lang.Object getCasWithSOFA(java.lang.Object entity, ProcessTrace pTrTemp)
Gets the cas with SOFA.- Parameters:
entity
- the entity
pTrTemp
- the temporary process trace- Returns:
- the cas with SOFA
-
needsView
private boolean needsView()
Needs view.- Returns:
- true if needsTCas
-
bootstrapCPE
private void bootstrapCPE() throws java.lang.Exception
Initialize the CPE.- Throws:
java.lang.Exception
- -
-
setupProcessingPipeline
private void setupProcessingPipeline() throws java.lang.Exception
Setup single threaded pipeline.- Throws:
java.lang.Exception
- -
-
setupConsumerPipeline
private void setupConsumerPipeline() throws java.lang.Exception
Setup Cas Consumer pipeline as single threaded.- Throws:
java.lang.Exception
- -
-
skipDroppedDocument
private boolean skipDroppedDocument(java.lang.Object[] entity)
Determines if a given CAS should be skipped.- Parameters:
entity
- - container for CAS- Returns:
- true if a given CAS should be skipped
-
runSingleThreaded
public void runSingleThreaded() throws java.lang.Exception
Runs the CPE in a single thread without queues.- Throws:
java.lang.Exception
- -
-
endOfProcessingReached
private boolean endOfProcessingReached(long entityCount)
Determines if the CPM processed all documents.- Parameters:
entityCount
- - number of documents processed so far- Returns:
- true if all documents processed, false otherwise
-
handleException
private void handleException(java.lang.Throwable t, java.lang.Object[] entity, ProcessTrace aPTrace)
Handle given exception.- Parameters:
t
- - exception to handle
entity
- - CAS container
aPTrace
- - process trace
-
notifyListeners
private void notifyListeners(int aMsgType, java.lang.Object[] entity, ProcessTrace aPTrace)
Notify listeners.- Parameters:
aMsgType
- the message type
entity
- the entity
aPTrace
- the process trace
-
notifyListeners
private void notifyListeners(int aMsgType, java.lang.Object[] entity, ProcessTrace aPTrace, java.lang.Throwable t)
Notify listeners.- Parameters:
aMsgType
- the message type
entity
- the entity
aPTrace
- the process trace
t
- the throwable
-
callEntityProcessCompleteWithCAS
public static void callEntityProcessCompleteWithCAS(StatusCallbackListener statCL, CAS cas, EntityProcessStatus eps)
Internal use only; public for cross-package access. Switches class loaders and locks the CAS.- Parameters:
statCL
- status callback listener
cas
- CAS
eps
- entity process status
-
getProcessTrace
private ProcessTrace getProcessTrace() throws java.lang.Exception
Gets the process trace.- Returns:
- the process trace
- Throws:
java.lang.Exception
- the exception
-
tearDownCPE
private void tearDownCPE()
Stop and cleanup single-threaded CPE.
-
waitForCpmToResumeIfPaused
private void waitForCpmToResumeIfPaused()
Wait for CPM to resume if paused.
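Waiting for a resume signal is typically done with a guarded wait on a monitor. The sketch below shows that general technique; the class `PauseGateSketch` and its methods are hypothetical, and the engine's own pause flag and lock object are not documented in this section.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a pause/resume gate built on wait/notifyAll.
class PauseGateSketch {
    private final Object lock = new Object();
    private boolean paused;

    void pause() {
        synchronized (lock) {
            paused = true;
        }
    }

    void resume() {
        synchronized (lock) {
            paused = false;
            lock.notifyAll(); // wake every thread blocked in the gate
        }
    }

    // Blocks while the gate is paused; returns immediately otherwise.
    void waitToResumeIfPaused() {
        synchronized (lock) {
            while (paused) { // loop guards against spurious wakeups
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    // Pauses the gate, starts a worker that must pass through it, resumes,
    // and reports whether the worker got past the gate.
    static boolean demo() {
        PauseGateSketch gate = new PauseGateSketch();
        gate.pause();
        AtomicBoolean passed = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            gate.waitToResumeIfPaused();
            passed.set(true);
        });
        worker.start();
        gate.resume();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return passed.get();
    }
}
```

The worker passes the gate whether it reaches it before or after `resume()` is called, because the condition is re-checked under the lock.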
-
-