Class ProcessingContainer_Impl

  • All Implemented Interfaces:
    java.lang.Runnable, CasProcessorController, RunnableContainer, ConfigurableResource, Resource

    public class ProcessingContainer_Impl
    extends ProcessingContainer
    implements RunnableContainer
    Manages a pool of CasProcessor instances. Provides access to CasProcessor instance to Processing Thread. Processing threads check out an instance of Cas Processor and when done invoking its process() method return it back to pool. The container aggregates counts and totals on behalf of all instances of Cas Processor. It also manages error and restart thresholds for Cas Processors as a group. Errors are aggregated for all instances of Cas Processor as a group NOT individually. The container takes appropriate actions when threshold are exceeded. What action is taken depends on declaritive specification in the cpe descriptor.
    • Field Detail

      • CONTAINER_SLEEP_TIME

        private static final int CONTAINER_SLEEP_TIME
        The Constant CONTAINER_SLEEP_TIME.
        See Also:
        Constant Field Values
      • casProcessorStatus

        private int casProcessorStatus
        The cas processor status.
      • isLocal

        private boolean isLocal
        The is local.
      • isRemote

        private boolean isRemote
        The is remote.
      • isIntegrated

        private boolean isIntegrated
        The is integrated.
      • batchCounter

        private long batchCounter
        The batch counter.
      • errorCounter

        private int errorCounter
        The error counter.
      • sampleCounter

        private long sampleCounter
        The sample counter.
      • failureThresholdSample

        private long failureThresholdSample
        The failure threshold sample.
      • configuredErrorRate

        private int configuredErrorRate
        The configured error rate.
      • batchSize

        private int batchSize
        The batch size.
      • processed

        private long processed
        The processed.
      • restartCount

        private int restartCount
        The restart count.
      • casProcessorCPEConfiguration

        private CasProcessorConfiguration casProcessorCPEConfiguration
        The cas processor CPE configuration.
      • bytesIn

        private long bytesIn
        The bytes in.
      • bytesOut

        private long bytesOut
        The bytes out.
      • retryCount

        private int retryCount
        The retry count.
      • abortCount

        private int abortCount
        The abort count.
      • filteredCount

        private int filteredCount
        The filtered count.
      • remaining

        private long remaining
        The remaining.
      • processedEntityIds

        private java.util.Stack processedEntityIds
        The processed entity ids.
      • totalTime

        private long totalTime
        The total time.
      • filterList

        private java.util.LinkedList filterList
        The filter list.
      • logPath

        private java.lang.String logPath
        The log path.
      • logger

        private java.util.logging.Logger logger
        The logger.
      • initialized

        private boolean initialized
        The initialized.
      • lastCas

        private java.lang.Object lastCas
        The last cas.
      • casProcessorPool

        public ServiceProxyPool casProcessorPool
        The cas processor pool.
      • statMap

        private java.util.HashMap statMap
        The stat map.
      • isPaused

        private boolean isPaused
        The is paused.
      • singleFencedInstance

        private boolean singleFencedInstance
        The single fenced instance.
      • lockForIsPaused

        private final java.lang.Object lockForIsPaused
        The lock for is paused.
      • processorName

        private java.lang.String processorName
        The processor name.
      • fetchTime

        private long fetchTime
        The fetch time.
      • failedCasProcessorList

        public java.util.LinkedList failedCasProcessorList
        The failed cas processor list.
    • Method Detail

      • getMetadata

        public ProcessingResourceMetaData getMetadata()
        Returns component's input/output capabilities.
        Returns:
        the metadata
      • setCasProcessorDeployer

        public void setCasProcessorDeployer​(CasProcessorDeployer aDeployer)
        Plug in deployer object used to launch/deploy the CasProcessor instance. Used for restarts.
        Specified by:
        setCasProcessorDeployer in class ProcessingContainer
        Parameters:
        aDeployer - - object responsible for deploying/launching CasProcessor
      • deployLogger

        private void deployLogger()
        Deploy Container specific logger to capture any exceptions occuring during CasProcessor(s) lifespan. Each CasProcessor may have its own log. It is optional and available when there is a parameter called 'containerLogPath' in the section of the cpe descriptor. If
      • logAbortedCases

        public void logAbortedCases​(java.lang.Object[] abortedCasList)
        Logs Cas'es that could not be processed.
        Specified by:
        logAbortedCases in class ProcessingContainer
        Parameters:
        abortedCasList - - an arrar of Cas'es that could not be processed by this CasProcessor
      • resetSampleCounter

        private void resetSampleCounter()
        Resets sample counter. This counter is used to determine acceptable error rates associated with hosted CasProcessor. Error rates are measured based on error rate in a given sample: 3 per 1000 for example, where 3 is an error rate and 1000 is a sample size.
      • resetBatchCounter

        private void resetBatchCounter()
        Resets batch counter. This counter is used to determine if the hosted CasProcessor should do special processing. A CasProcessor may buffer all Cas's in memory and only when its batch size is reached, it does something usefull with them, like save them to a file, index them, etc
      • getBytesIn

        public long getBytesIn()
        Returns total number of bytes ingested so far by all CasProcessor instances managed by this container.
        Specified by:
        getBytesIn in class ProcessingContainer
        Returns:
        - bytes processed
      • addBytesIn

        public void addBytesIn​(long aBytesIn)
        Aggregate total bytes ingested by the CasProcessor.
        Specified by:
        addBytesIn in class ProcessingContainer
        Parameters:
        aBytesIn - - number of ingested bytes
      • getBytesOut

        public long getBytesOut()
        Returns total number of bytes processed so far by all CasProcessor instances managed by this container.
        Specified by:
        getBytesOut in class ProcessingContainer
        Returns:
        - bytes processed
      • addBytesOut

        public void addBytesOut​(long aBytesOut)
        Aggregate total bytes processed by this CasProcessor.
        Specified by:
        addBytesOut in class ProcessingContainer
        Parameters:
        aBytesOut - the a bytes out
      • incrementRestartCount

        public void incrementRestartCount​(int aCount)
        Increment number of times the casProcessor was restarted due to failures.
        Specified by:
        incrementRestartCount in class ProcessingContainer
        Parameters:
        aCount - - restart count
      • getRestartCount

        public int getRestartCount()
        Returns total number of all CasProcessor restarts.
        Specified by:
        getRestartCount in class ProcessingContainer
        Returns:
        number of restarts
      • incrementRetryCount

        public void incrementRetryCount​(int aCount)
        Increments number of times CasProceesor failed analyzing Cas'es due to timeout or some other problems.
        Specified by:
        incrementRetryCount in class ProcessingContainer
        Parameters:
        aCount - - failure count
      • getRetryCount

        public int getRetryCount()
        Return the up todate number of retries recorded by the container.
        Specified by:
        getRetryCount in class ProcessingContainer
        Returns:
        - retry count
      • incrementAbortCount

        public void incrementAbortCount​(int aCount)
        Increment number of aborted Cas'es due to inability to process the Cas.
        Specified by:
        incrementAbortCount in class ProcessingContainer
        Parameters:
        aCount - - number of aborts while processing Cas'es
      • getAbortCount

        public int getAbortCount()
        Return the up todate number of aborts recorded by the container.
        Specified by:
        getAbortCount in class ProcessingContainer
        Returns:
        - number of failed attempts to analyze CAS'es
      • incrementFilteredCount

        public void incrementFilteredCount​(int aCount)
        Increments number of CAS'es filtered by the CasProcessor. Filtered CAS'es dont contain required features. Features that are required by the Cas Processor to perform analysis. Dependant feateurs are defined in the filter expression in the CPE descriptor
        Specified by:
        incrementFilteredCount in class ProcessingContainer
        Parameters:
        aCount - - number of filtered Cas'es
      • getFilteredCount

        public int getFilteredCount()
        Returns number of filtered Cas'es.
        Specified by:
        getFilteredCount in class ProcessingContainer
        Returns:
        # of filtered Cas'es
      • getRemaining

        public long getRemaining()
        Returns number of entities still to be processed by the CasProcessor It is a delta of total number of entities to be processed by the CPE minus number of entities processed so far.
        Specified by:
        getRemaining in class ProcessingContainer
        Returns:
        Number of entities yet to be processed
      • setRemaining

        public void setRemaining​(long aRemainingCount)
        Copies number of entities the CasProcessor has yet to process.
        Specified by:
        setRemaining in class ProcessingContainer
        Parameters:
        aRemainingCount - - number of entities to process
      • setLastProcessedEntityId

        public void setLastProcessedEntityId​(java.lang.String aEntityId)
        Copies id of the last entity processed by the CasProcessor.
        Specified by:
        setLastProcessedEntityId in class ProcessingContainer
        Parameters:
        aEntityId - - id of the entity
      • getLastProcessedEntityId

        public java.lang.String getLastProcessedEntityId()
        Returns id of the last entity processed by the CasProcessor.
        Specified by:
        getLastProcessedEntityId in class ProcessingContainer
        Returns:
        - id of entity
      • setLastCas

        @Deprecated
        public void setLastCas​(java.lang.Object aCasObject)
        Deprecated.
        Copies the last Cas Processed.
        Specified by:
        setLastCas in class ProcessingContainer
        Parameters:
        aCasObject - the new last cas
      • getLastCas

        @Deprecated
        public java.lang.Object getLastCas()
        Deprecated.
        Returns the last Cas processed.
        Specified by:
        getLastCas in class ProcessingContainer
        Returns:
        the last cas
      • incrementProcessed

        public void incrementProcessed​(int aIncrement)
        Increment processed.
        Parameters:
        aIncrement - the a increment
      • setProcessed

        public void setProcessed​(long aProcessedCount)
        Used when recovering from checkpoint, sets the total number of entities before CPE stopped.
        Specified by:
        setProcessed in class ProcessingContainer
        Parameters:
        aProcessedCount - - number of entities processed before CPE stopped
      • getProcessed

        public long getProcessed()
        Returns number of entities processed so far.
        Specified by:
        getProcessed in class ProcessingContainer
        Returns:
        - processed - number of entities processed
      • resetErrorCounter

        private void resetErrorCounter()
        Re-initializes the error counter.
      • incrementTotalTime

        public void incrementTotalTime​(long aTime)
        Increments total time spend in the process() method of the CasProcessor.
        Specified by:
        incrementTotalTime in class ProcessingContainer
        Parameters:
        aTime - - total time in process()
      • getTotalTime

        public long getTotalTime()
        Returns total time spent in process().
        Specified by:
        getTotalTime in class ProcessingContainer
        Returns:
        - number of millis spent in process()
      • abortCPMOnError

        public boolean abortCPMOnError()
        Returns true if maximum threshold for errors has been exceeded and the CasProcessor is configured to force CPE shutdown. It looks at the value of the action attribute of the <errorRateThreshold> element in the cpe descriptor.
        Specified by:
        abortCPMOnError in class ProcessingContainer
        Returns:
        - true if the CPE should stop processing, false otherwise
      • isTimeout

        private boolean isTimeout​(java.lang.Throwable aThrowable)
        Returns true if the Exception cause is SocketTimeoutException.
        Parameters:
        aThrowable - - Exception to check for SocketTimeoutException
        Returns:
        - true if Socket Timeout, false otherwise
      • incrementCasProcessorErrors

        public void incrementCasProcessorErrors​(java.lang.Throwable aThrowable)
                                         throws java.lang.Exception
        This routine determines what to do with an exception thrown during the CasProcessor processing. It interprets given exception and throws a new one according to configuration specified in the CPE descriptor. It examines provided thresholds and determines if the CPE should continue to run, if it should disable the CasProcessor (and all its instances), or disregard the error and continue.
        Specified by:
        incrementCasProcessorErrors in class ProcessingContainer
        Parameters:
        aThrowable - - exception to examine
        Throws:
        java.lang.Exception - the exception
      • processCas

        public boolean processCas​(java.lang.Object[] aCasList)
        Returns true if the Cas bundles should be processed by the CasProcessor. This routine checks for existance of dependent featues defined in the filter expression defined for the CasProcessor in the cpe descriptor. Currently this is done on per bundle basis. Meaning that all Cas'es must contain required features. If even one Cas does not have them, the entire bundle is skipped.
        Specified by:
        processCas in class ProcessingContainer
        Parameters:
        aCasList - - bundle containing instances of CAS
        Returns:
        true, if successful
      • hasFeature

        private boolean hasFeature​(CasData aCas)
        Used during filtering, determines if a given Cas has a required feature. Required featured are defined in the filter. Filtering is optional and if not present in the cpe descriptor this routine always returns true.
        Parameters:
        aCas - - Cas instance to check
        Returns:
        - true if feature is in the Cas, false otherwise
      • processCas

        private boolean processCas​(java.lang.Object aCas)
        Checks if a given Cas has required features.
        Parameters:
        aCas - - Cas instance to check
        Returns:
        - true if feature is in the Cas, false otherwise
      • start

        @Deprecated
        public void start()
        Deprecated.
        Start.
        Specified by:
        start in interface RunnableContainer
      • stop

        @Deprecated
        public void stop()
        Deprecated.
        Stop.
        Specified by:
        stop in interface RunnableContainer
      • getCasProcessor

        public CasProcessor getCasProcessor()
        Returns available instance of the CasProcessor from the instance pool. It will wait indefinitely until an instance is available.
        Specified by:
        getCasProcessor in interface CasProcessorController
        Returns:
        the cas processor
      • getStatus

        public int getStatus()
        Returns the current status of the CasProcessor.
        Specified by:
        getStatus in interface CasProcessorController
        Returns:
        the status
      • setStatus

        public void setStatus​(int aStatus)
        Changes the status of the CasProcessor as a group.
        Specified by:
        setStatus in interface CasProcessorController
        Parameters:
        aStatus - - new status
      • isLocal

        @Deprecated
        public boolean isLocal()
        Deprecated.
        Checks if is local.
        Specified by:
        isLocal in interface CasProcessorController
        Returns:
        true, if is local
      • isRemote

        @Deprecated
        public boolean isRemote()
        Deprecated.
        Checks if is remote.
        Specified by:
        isRemote in interface CasProcessorController
        Returns:
        true, if is remote
      • isIntegrated

        @Deprecated
        public boolean isIntegrated()
        Deprecated.
        Checks if is integrated.
        Specified by:
        isIntegrated in interface CasProcessorController
        Returns:
        true, if is integrated
      • continueOnError

        private boolean continueOnError()
        Returns true if the CasProcessor has been configured to continue despite error.
        Returns:
        - true if ignoring errors, false otherwise
      • isAbortable

        public boolean isAbortable()
        Determines if instances of CasProcessor managed by this container are abortable. Abortable CasProcessor's action attribute in the <errorRateThreshold> element has a value of 'disable'.
        Specified by:
        isAbortable in interface CasProcessorController
        Returns:
        true if CasProcessor can be disabled
      • destroy

        public void destroy()
        Destroy instances of CasProcessors managed by this container. Before destroying the instance, this method notifies it with CollectionProcessComplete so that the component finalizes its logic and does appropriate cleanup before shutdown.
        Specified by:
        destroy in interface Resource
        Overrides:
        destroy in class Resource_ImplBase
        See Also:
        Resource.destroy()
      • run

        public void run()
        Specified by:
        run in interface java.lang.Runnable
      • getConfigParameterValue

        public java.lang.Object getConfigParameterValue​(java.lang.String aParamName)
        Description copied from interface: ConfigurableResource
        Looks up the value of a configuration parameter. This method will only return the value of a parameter that is not defined in any group.

        This method returns null if the parameter is optional and has not been assigned a value. (For mandatory parameters, an exception is thrown during initialization if no value has been assigned.) This method also returns null if there is no declared configuration parameter with the specified name.

        Specified by:
        getConfigParameterValue in interface ConfigurableResource
        Parameters:
        aParamName - the name of a parameter that is not in any group
        Returns:
        the value of the parameter with name aParamName, null is either the parameter does not exist or it has not been assigned a value.
      • getConfigParameterValue

        public java.lang.Object getConfigParameterValue​(java.lang.String aGroupName,
                                                        java.lang.String aParamName)
        Description copied from interface: ConfigurableResource
        Looks up the value of a configuration parameter in a group. If the parameter has no value assigned within the group, fallback strategies will be followed.

        This method returns null if the parameter is optional and has not been assigned a value. (For mandatory parameters, an exception is thrown during initialization if no value has been assigned.) This method also returns null if there is no declared configuration parameter with the specified name.

        Specified by:
        getConfigParameterValue in interface ConfigurableResource
        Parameters:
        aGroupName - the name of a configuration group. If the group name is null, this method will return the same value as getParameterValue(String).
        aParamName - the name of a parameter in the group
        Returns:
        the value of the parameter in group aGroupName with name aParamName,,null is either the parameter does not exist or it has not been assigned a value.
      • setConfigParameterValue

        public void setConfigParameterValue​(java.lang.String aParamName,
                                            java.lang.Object aValue)
        Description copied from interface: ConfigurableResource
        Sets the value of a configuration parameter. This only works for a parameter that is not defined in any group. Note that there is no guarantee that the change will take effect until ConfigurableResource.reconfigure() is called.
        Specified by:
        setConfigParameterValue in interface ConfigurableResource
        Parameters:
        aParamName - the name of a parameter that is not in any group
        aValue - the value to assign to the parameter
      • setConfigParameterValue

        public void setConfigParameterValue​(java.lang.String aGroupName,
                                            java.lang.String aParamName,
                                            java.lang.Object aValue)
        Description copied from interface: ConfigurableResource
        Sets the value of a configuration parameter in a group. Note that there is no guarantee that the change will take effect until ConfigurableResource.reconfigure() is called.
        Specified by:
        setConfigParameterValue in interface ConfigurableResource
        Parameters:
        aGroupName - the name of a configuration group. If this parameter is null, this method will have the same effect as setParameterValue(String,Object).
        aParamName - the name of a parameter in the group
        aValue - the value to assign to the parameter.
      • getName

        public java.lang.String getName()
        Returns the name of this container. It is the name of the Cas Processor.
        Specified by:
        getName in class ProcessingContainer
        Returns:
        the name
      • incrementStat

        public void incrementStat​(java.lang.String aStatName,
                                  java.lang.Integer aStat)
        Increment a value of a given stat.
        Specified by:
        incrementStat in class ProcessingContainer
        Parameters:
        aStatName - the a stat name
        aStat - the a stat
      • addStat

        public void addStat​(java.lang.String aStatName,
                            java.lang.Object aStat)
        Add an arbitrary object and bind it to a given name.
        Specified by:
        addStat in class ProcessingContainer
        Parameters:
        aStatName - the a stat name
        aStat - the a stat
      • getStat

        public java.lang.Object getStat​(java.lang.String aStatName)
        Return an abject identified with a given name.
        Specified by:
        getStat in class ProcessingContainer
        Parameters:
        aStatName - the a stat name
        Returns:
        the stat
      • getAllStats

        public java.util.HashMap getAllStats()
        Returns all stats aggregate during the CPM run.
        Specified by:
        getAllStats in class ProcessingContainer
        Returns:
        a map of all stats aggregated during the CPM run
      • pause

        public void pause()
        Pauses the container until resumed. The CPM will pause to the Container while it is trying to re-connect to a shared remote service. While the Container is paused getCasProcessor() will not be allowed to return a new CasProcessor. All other methods are accessible and will function fine.
        Specified by:
        pause in class ProcessingContainer
      • getFetchTime

        public long getFetchTime()
        Gets the fetch time.
        Returns:
        the fetch time