Class ProcessingContainer_Impl

All Implemented Interfaces:
Runnable, CasProcessorController, RunnableContainer, ConfigurableResource, Resource

public class ProcessingContainer_Impl extends ProcessingContainer implements RunnableContainer
Manages a pool of CasProcessor instances and provides access to them for processing threads. A processing thread checks out a CasProcessor instance and, once it has finished invoking the instance's process() method, returns it to the pool. The container aggregates counts and totals on behalf of all CasProcessor instances. It also manages error and restart thresholds for the CasProcessors as a group: errors are aggregated across all instances, NOT individually. The container takes appropriate action when a threshold is exceeded. Which action is taken depends on the declarative specification in the cpe descriptor.
  • Field Details

    • CONTAINER_SLEEP_TIME

      private static final int CONTAINER_SLEEP_TIME
      The Constant CONTAINER_SLEEP_TIME.
    • casProcessorStatus

      private int casProcessorStatus
      The cas processor status.
    • configParams

      private ConfigurationParameterSettings configParams
      The config params.
    • isLocal

      private boolean isLocal
      The is local.
    • isRemote

      private boolean isRemote
      The is remote.
    • isIntegrated

      private boolean isIntegrated
      The is integrated.
    • batchCounter

      private long batchCounter
      The batch counter.
    • errorCounter

      private int errorCounter
      The error counter.
    • sampleCounter

      private long sampleCounter
      The sample counter.
    • failureThresholdSample

      private long failureThresholdSample
      The failure threshold sample.
    • configuredErrorRate

      private int configuredErrorRate
      The configured error rate.
    • batchSize

      private int batchSize
      The batch size.
    • processed

      private long processed
      The processed.
    • restartCount

      private int restartCount
      The restart count.
    • casProcessorCPEConfiguration

      private CasProcessorConfiguration casProcessorCPEConfiguration
      The cas processor CPE configuration.
    • bytesIn

      private long bytesIn
      The bytes in.
    • bytesOut

      private long bytesOut
      The bytes out.
    • retryCount

      private int retryCount
      The retry count.
    • abortCount

      private int abortCount
      The abort count.
    • filteredCount

      private int filteredCount
      The filtered count.
    • remaining

      private long remaining
      The remaining.
    • processedEntityIds

      private Stack processedEntityIds
      The processed entity ids.
    • totalTime

      private long totalTime
      The total time.
    • filterList

      private LinkedList filterList
      The filter list.
    • logPath

      private String logPath
      The log path.
    • logger

      private Logger logger
      The logger.
    • initialized

      private boolean initialized
      The initialized.
    • lastCas

      private Object lastCas
      The last cas.
    • casProcessorPool

      public ServiceProxyPool casProcessorPool
      The cas processor pool.
    • casPDeployer

      private CasProcessorDeployer casPDeployer
      The cas P deployer.
    • metadata

      private ProcessingResourceMetaData metadata
      The metadata.
    • statMap

      private HashMap statMap
      The stat map.
    • isPaused

      private boolean isPaused
      The is paused.
    • singleFencedInstance

      private boolean singleFencedInstance
      The single fenced instance.
    • lockForIsPaused

      private final Object lockForIsPaused
      The lock for is paused.
    • processorName

      private String processorName
      The processor name.
    • fetchTime

      private long fetchTime
      The fetch time.
    • failedCasProcessorList

      public LinkedList failedCasProcessorList
      The failed cas processor list.
  • Constructor Details

  • Method Details

    • getMetadata

      public ProcessingResourceMetaData getMetadata()
      Returns component's input/output capabilities.
      Returns:
      the metadata
    • setMetadata

      public void setMetadata(ProcessingResourceMetaData aMetadata)
      Sets component's input/output capabilities.
      Specified by:
      setMetadata in class ProcessingContainer
      Parameters:
      aMetadata - component capabilities
    • setCasProcessorDeployer

      public void setCasProcessorDeployer(CasProcessorDeployer aDeployer)
      Plug in deployer object used to launch/deploy the CasProcessor instance. Used for restarts.
      Specified by:
      setCasProcessorDeployer in class ProcessingContainer
      Parameters:
      aDeployer - - object responsible for deploying/launching CasProcessor
    • getDeployer

      public CasProcessorDeployer getDeployer()
      Returns deployer object used to launch the CasProcessor.
      Specified by:
      getDeployer in class ProcessingContainer
      Returns:
      - CasProcessorDeployer - deployer object
    • deployLogger

      private void deployLogger()
      Deploys a container-specific logger to capture any exceptions occurring during the CasProcessor(s) lifespan. Each CasProcessor may have its own log. It is optional and available when there is a parameter called 'containerLogPath' in the section of the cpe descriptor. If
    • logAbortedCases

      public void logAbortedCases(Object[] abortedCasList)
      Logs Cas'es that could not be processed.
      Specified by:
      logAbortedCases in class ProcessingContainer
      Parameters:
      abortedCasList - - an array of Cas'es that could not be processed by this CasProcessor
    • resetSampleCounter

      private void resetSampleCounter()
      Resets sample counter. This counter is used to determine acceptable error rates associated with hosted CasProcessor. Error rates are measured based on error rate in a given sample: 3 per 1000 for example, where 3 is an error rate and 1000 is a sample size.
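
The "3 per 1000" rule described above can be illustrated with a small counter pair. This is a minimal sketch, not the container's actual code: the class name `ErrorRateWindowSketch`, the `record` method, and the choice to treat the threshold as exceeded only when errors are strictly greater than the configured maximum are all assumptions made for illustration.

```java
/** Hypothetical sketch of an error-rate check over a fixed-size sample window. */
final class ErrorRateWindowSketch {
    private final int maxErrors;    // e.g. 3
    private final long sampleSize;  // e.g. 1000
    private long sampleCounter;     // items seen in the current sample
    private int errorCounter;       // errors seen in the current sample

    ErrorRateWindowSketch(int maxErrors, long sampleSize) {
        if (sampleSize <= 0) {
            throw new IllegalArgumentException("sampleSize must be positive");
        }
        this.maxErrors = maxErrors;
        this.sampleSize = sampleSize;
    }

    /**
     * Records one processed item. Returns true when the number of errors in the
     * current sample is greater than maxErrors (assumed meaning of "exceeded").
     */
    boolean record(boolean failed) {
        sampleCounter++;
        if (failed) {
            errorCounter++;
        }
        boolean exceeded = errorCounter > maxErrors;
        if (sampleCounter >= sampleSize) {
            // Sample complete: reset both counters and start a fresh window.
            sampleCounter = 0;
            errorCounter = 0;
        }
        return exceeded;
    }
}
```

With a window of 3 per 1000, three failures are tolerated and the fourth failure inside the same 1000 items reports the threshold as exceeded.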
    • resetBatchCounter

      private void resetBatchCounter()
      Resets batch counter. This counter is used to determine if the hosted CasProcessor should do special processing. A CasProcessor may buffer all Cas'es in memory and only do something useful with them, such as saving them to a file or indexing them, once its batch size is reached.
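
The batch behaviour can be pictured as a running count that triggers an action and resets when it reaches the batch size. The sketch below assumes that reading; `BatchCounterSketch` and its `isEndOfBatch(int)` signature are hypothetical and simpler than the container's own isEndOfBatch method.

```java
/** Hypothetical sketch of a batch counter that resets when the batch size is reached. */
final class BatchCounterSketch {
    private final int batchSize;
    private long batchCounter;

    BatchCounterSketch(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    /**
     * Adds the number of items just processed. Returns true and resets the
     * counter once the accumulated count reaches the batch size.
     */
    boolean isEndOfBatch(int processedSize) {
        batchCounter += processedSize;
        if (batchCounter >= batchSize) {
            batchCounter = 0; // equivalent of resetting the batch counter
            return true;
        }
        return false;
    }
}
```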
    • getBytesIn

      public long getBytesIn()
      Returns total number of bytes ingested so far by all CasProcessor instances managed by this container.
      Specified by:
      getBytesIn in class ProcessingContainer
      Returns:
      - bytes processed
    • addBytesIn

      public void addBytesIn(long aBytesIn)
      Aggregate total bytes ingested by the CasProcessor.
      Specified by:
      addBytesIn in class ProcessingContainer
      Parameters:
      aBytesIn - - number of ingested bytes
    • getBytesOut

      public long getBytesOut()
      Returns total number of bytes processed so far by all CasProcessor instances managed by this container.
      Specified by:
      getBytesOut in class ProcessingContainer
      Returns:
      - bytes processed
    • addBytesOut

      public void addBytesOut(long aBytesOut)
      Aggregate total bytes processed by this CasProcessor.
      Specified by:
      addBytesOut in class ProcessingContainer
      Parameters:
      aBytesOut - the a bytes out
    • incrementRestartCount

      public void incrementRestartCount(int aCount)
      Increment number of times the casProcessor was restarted due to failures.
      Specified by:
      incrementRestartCount in class ProcessingContainer
      Parameters:
      aCount - - restart count
    • getRestartCount

      public int getRestartCount()
      Returns total number of all CasProcessor restarts.
      Specified by:
      getRestartCount in class ProcessingContainer
      Returns:
      number of restarts
    • incrementRetryCount

      public void incrementRetryCount(int aCount)
      Increments number of times CasProcessor failed analyzing Cas'es due to timeout or some other problems.
      Specified by:
      incrementRetryCount in class ProcessingContainer
      Parameters:
      aCount - - failure count
    • getRetryCount

      public int getRetryCount()
      Returns the up-to-date number of retries recorded by the container.
      Specified by:
      getRetryCount in class ProcessingContainer
      Returns:
      - retry count
    • incrementAbortCount

      public void incrementAbortCount(int aCount)
      Increment number of aborted Cas'es due to inability to process the Cas.
      Specified by:
      incrementAbortCount in class ProcessingContainer
      Parameters:
      aCount - - number of aborts while processing Cas'es
    • getAbortCount

      public int getAbortCount()
      Returns the up-to-date number of aborts recorded by the container.
      Specified by:
      getAbortCount in class ProcessingContainer
      Returns:
      - number of failed attempts to analyze CAS'es
    • incrementFilteredCount

      public void incrementFilteredCount(int aCount)
      Increments number of CAS'es filtered by the CasProcessor. Filtered CAS'es don't contain the features that the Cas Processor requires to perform analysis. Dependent features are defined in the filter expression in the CPE descriptor.
      Specified by:
      incrementFilteredCount in class ProcessingContainer
      Parameters:
      aCount - - number of filtered Cas'es
    • getFilteredCount

      public int getFilteredCount()
      Returns number of filtered Cas'es.
      Specified by:
      getFilteredCount in class ProcessingContainer
      Returns:
      # of filtered Cas'es
    • getRemaining

      public long getRemaining()
      Returns number of entities still to be processed by the CasProcessor. It is the total number of entities to be processed by the CPE minus the number of entities processed so far.
      Specified by:
      getRemaining in class ProcessingContainer
      Returns:
      Number of entities yet to be processed
    • setRemaining

      public void setRemaining(long aRemainingCount)
      Copies number of entities the CasProcessor has yet to process.
      Specified by:
      setRemaining in class ProcessingContainer
      Parameters:
      aRemainingCount - - number of entities to process
    • setLastProcessedEntityId

      public void setLastProcessedEntityId(String aEntityId)
      Copies id of the last entity processed by the CasProcessor.
      Specified by:
      setLastProcessedEntityId in class ProcessingContainer
      Parameters:
      aEntityId - - id of the entity
    • getLastProcessedEntityId

      public String getLastProcessedEntityId()
      Returns id of the last entity processed by the CasProcessor.
      Specified by:
      getLastProcessedEntityId in class ProcessingContainer
      Returns:
      - id of entity
    • setLastCas

      @Deprecated public void setLastCas(Object aCasObject)
      Deprecated.
      Copies the last Cas Processed.
      Specified by:
      setLastCas in class ProcessingContainer
      Parameters:
      aCasObject - the new last cas
    • getLastCas

      @Deprecated public Object getLastCas()
      Deprecated.
      Returns the last Cas processed.
      Specified by:
      getLastCas in class ProcessingContainer
      Returns:
      the last cas
    • incrementProcessed

      public void incrementProcessed(int aIncrement)
      Increment processed.
      Parameters:
      aIncrement - the a increment
    • setProcessed

      public void setProcessed(long aProcessedCount)
      Used when recovering from a checkpoint. Sets the total number of entities processed before the CPE stopped.
      Specified by:
      setProcessed in class ProcessingContainer
      Parameters:
      aProcessedCount - - number of entities processed before CPE stopped
    • getProcessed

      public long getProcessed()
      Returns number of entities processed so far.
      Specified by:
      getProcessed in class ProcessingContainer
      Returns:
      - processed - number of entities processed
    • resetErrorCounter

      private void resetErrorCounter()
      Re-initializes the error counter.
    • resetRestartCount

      public void resetRestartCount()
      Specified by:
      resetRestartCount in class ProcessingContainer
    • incrementTotalTime

      public void incrementTotalTime(long aTime)
      Increments total time spent in the process() method of the CasProcessor.
      Specified by:
      incrementTotalTime in class ProcessingContainer
      Parameters:
      aTime - - total time in process()
    • getTotalTime

      public long getTotalTime()
      Returns total time spent in process().
      Specified by:
      getTotalTime in class ProcessingContainer
      Returns:
      - number of millis spent in process()
    • abortCPMOnError

      public boolean abortCPMOnError()
      Returns true if maximum threshold for errors has been exceeded and the CasProcessor is configured to force CPE shutdown. It looks at the value of the action attribute of the <errorRateThreshold> element in the cpe descriptor.
      Specified by:
      abortCPMOnError in class ProcessingContainer
      Returns:
      - true if the CPE should stop processing, false otherwise
    • isTimeout

      private boolean isTimeout(Throwable aThrowable)
      Returns true if the Exception cause is SocketTimeoutException.
      Parameters:
      aThrowable - - Exception to check for SocketTimeoutException
      Returns:
      - true if Socket Timeout, false otherwise
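
A timeout check of this kind usually inspects the exception's cause. The sketch below walks the whole cause chain, which is an assumption: the description above only says the cause is a SocketTimeoutException and does not state how deep the real method looks. The class name `TimeoutCheckSketch` is hypothetical.

```java
import java.net.SocketTimeoutException;

/** Hypothetical sketch: detect a socket timeout anywhere in a Throwable's cause chain. */
final class TimeoutCheckSketch {
    private static final int MAX_DEPTH = 32; // guard against cyclic cause chains

    static boolean isTimeout(Throwable throwable) {
        Throwable current = throwable;
        for (int depth = 0; current != null && depth < MAX_DEPTH; depth++) {
            if (current instanceof SocketTimeoutException) {
                return true;
            }
            current = current.getCause();
        }
        return false;
    }
}
```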
    • incrementCasProcessorErrors

      public void incrementCasProcessorErrors(Throwable aThrowable) throws Exception
      This routine determines what to do with an exception thrown during the CasProcessor processing. It interprets given exception and throws a new one according to configuration specified in the CPE descriptor. It examines provided thresholds and determines if the CPE should continue to run, if it should disable the CasProcessor (and all its instances), or disregard the error and continue.
      Specified by:
      incrementCasProcessorErrors in class ProcessingContainer
      Parameters:
      aThrowable - - exception to examine
      Throws:
      Exception - the exception
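
The three outcomes named above (stop the CPE, disable the CasProcessor, or disregard the error) can be modelled as a decision over the configured action once the threshold is exceeded. This is a sketch under assumptions: the enum names are hypothetical, only the 'disable' action value is named on this page, and the real method signals its decision by throwing exceptions rather than returning a value.

```java
/** Hypothetical sketch of choosing an outcome once an error threshold is exceeded. */
final class ErrorActionSketch {
    /** Assumed configured actions; only 'disable' is named on this page. */
    enum Action { TERMINATE, DISABLE, CONTINUE }

    /** What the caller should do next. */
    enum Outcome { KEEP_RUNNING, DISABLE_PROCESSOR, STOP_PIPELINE }

    static Outcome decide(int errors, int maxErrors, Action configured) {
        if (errors <= maxErrors) {
            // Threshold not exceeded: the error is counted and processing goes on.
            return Outcome.KEEP_RUNNING;
        }
        switch (configured) {
            case TERMINATE:
                return Outcome.STOP_PIPELINE;
            case DISABLE:
                return Outcome.DISABLE_PROCESSOR;
            default:
                return Outcome.KEEP_RUNNING;
        }
    }
}
```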
    • isEndOfBatch

      public boolean isEndOfBatch(CasProcessor aCasProcessor, int aProcessedSize) throws ResourceProcessException, IOException
      Specified by:
      isEndOfBatch in class ProcessingContainer
      Throws:
      ResourceProcessException
      IOException
    • processCas

      public boolean processCas(Object[] aCasList)
      Returns true if the Cas bundle should be processed by the CasProcessor. This routine checks for the existence of dependent features defined in the filter expression for the CasProcessor in the cpe descriptor. Currently this is done on a per-bundle basis, meaning that all Cas'es must contain the required features. If even one Cas does not have them, the entire bundle is skipped.
      Specified by:
      processCas in class ProcessingContainer
      Parameters:
      aCasList - - bundle containing instances of CAS
      Returns:
      true, if successful
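
The all-or-nothing bundle rule can be sketched with plain collections. Here each Cas is represented as a set of feature names, which is a simplification made for illustration; `BundleFilterSketch` and `shouldProcess` are hypothetical names and do not reflect how the filter expression is actually evaluated.

```java
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of per-bundle filtering: one Cas without the features skips the bundle. */
final class BundleFilterSketch {
    static boolean shouldProcess(List<Set<String>> bundle, Set<String> requiredFeatures) {
        if (requiredFeatures.isEmpty()) {
            // No filter configured: every bundle is processed.
            return true;
        }
        for (Set<String> casFeatures : bundle) {
            if (!casFeatures.containsAll(requiredFeatures)) {
                return false; // a single incomplete Cas rejects the entire bundle
            }
        }
        return true;
    }
}
```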
    • hasFeature

      private boolean hasFeature(CasData aCas)
      Used during filtering, determines if a given Cas has a required feature. Required features are defined in the filter. Filtering is optional and if not present in the cpe descriptor this routine always returns true.
      Parameters:
      aCas - - Cas instance to check
      Returns:
      - true if feature is in the Cas, false otherwise
    • processCas

      private boolean processCas(Object aCas)
      Checks if a given Cas has required features.
      Parameters:
      aCas - - Cas instance to check
      Returns:
      - true if feature is in the Cas, false otherwise
    • getCasProcessorConfiguration

      public CasProcessorConfiguration getCasProcessorConfiguration()
      Returns CasProcessor configuration object. This object represents xml configuration defined in the <casProcessor> section of the cpe descriptor.
      Specified by:
      getCasProcessorConfiguration in class ProcessingContainer
      Returns:
      CasProcessorConfiguration instance
    • start

      @Deprecated public void start()
      Deprecated.
      Start.
      Specified by:
      start in interface RunnableContainer
    • stop

      @Deprecated public void stop()
      Deprecated.
      Stop.
      Specified by:
      stop in interface RunnableContainer
    • getCasProcessor

      public CasProcessor getCasProcessor()
      Returns available instance of the CasProcessor from the instance pool. It will wait indefinitely until an instance is available.
      Specified by:
      getCasProcessor in interface CasProcessorController
      Returns:
      the cas processor
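
The check-out and return cycle can be illustrated with a blocking queue. This is a minimal sketch of the pattern only: `ProcessorPoolSketch` is a hypothetical stand-in and is not the ServiceProxyPool class used by this container.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch of a fixed pool whose check-out blocks until an instance is free. */
final class ProcessorPoolSketch<T> {
    private final BlockingQueue<T> idle;

    /** The instance list must be non-empty; its size becomes the pool capacity. */
    ProcessorPoolSketch(List<T> instances) {
        this.idle = new ArrayBlockingQueue<T>(instances.size(), false, instances);
    }

    /** Waits indefinitely until an instance is available, then hands it out. */
    T checkOut() {
        try {
            return idle.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting for an instance", e);
        }
    }

    /** Returns a previously checked-out instance to the pool. */
    void checkIn(T instance) {
        idle.add(instance);
    }

    int available() {
        return idle.size();
    }
}
```

A processing thread would call `checkOut()`, use the instance, and call `checkIn(...)` in a `finally` block so the instance is returned even when processing fails.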
    • releaseCasProcessor

      public void releaseCasProcessor(CasProcessor aCasProcessor)
      Returns a given casProcessor instance back to the pool.
      Specified by:
      releaseCasProcessor in class ProcessingContainer
      Parameters:
      aCasProcessor - - an instance of CasProcessor to return back to the pool
    • getStatus

      public int getStatus()
      Returns the current status of the CasProcessor.
      Specified by:
      getStatus in interface CasProcessorController
      Returns:
      the status
    • setStatus

      public void setStatus(int aStatus)
      Changes the status of the CasProcessor as a group.
      Specified by:
      setStatus in interface CasProcessorController
      Parameters:
      aStatus - - new status
    • isLocal

      @Deprecated public boolean isLocal()
      Deprecated.
      Checks if is local.
      Specified by:
      isLocal in interface CasProcessorController
      Returns:
      true, if is local
    • isRemote

      @Deprecated public boolean isRemote()
      Deprecated.
      Checks if is remote.
      Specified by:
      isRemote in interface CasProcessorController
      Returns:
      true, if is remote
    • isIntegrated

      @Deprecated public boolean isIntegrated()
      Deprecated.
      Checks if is integrated.
      Specified by:
      isIntegrated in interface CasProcessorController
      Returns:
      true, if is integrated
    • continueOnError

      private boolean continueOnError()
      Returns true if the CasProcessor has been configured to continue despite error.
      Returns:
      - true if ignoring errors, false otherwise
    • isAbortable

      public boolean isAbortable()
      Determines if instances of CasProcessor managed by this container are abortable. Abortable CasProcessor's action attribute in the <errorRateThreshold> element has a value of 'disable'.
      Specified by:
      isAbortable in interface CasProcessorController
      Returns:
      true if CasProcessor can be disabled
    • initialize

      public boolean initialize(ResourceSpecifier aSpecifier, Map aAdditionalParams) throws ResourceInitializationException
      Description copied from interface: Resource
      Initializes this Resource from a ResourceSpecifier. Applications do not need to call this method. It is called automatically by the ResourceFactory and cannot be called a second time.
      Specified by:
      initialize in interface Resource
      Overrides:
      initialize in class Resource_ImplBase
      Parameters:
      aSpecifier - specifies how to create a resource or locate an existing resource service.
      aAdditionalParams - a Map containing additional parameters. May be null if there are no parameters. Each class that implements this interface can decide what additional parameters it supports.
      Returns:
      true if and only if initialization completed successfully. Returns false if the given ResourceSpecifier is not of an appropriate type for this Resource. If the ResourceSpecifier is of an appropriate type but is invalid or if some other failure occurs, an exception should be thrown.
      Throws:
      ResourceInitializationException - if a failure occurs during initialization.
    • destroy

      public void destroy()
      Destroy instances of CasProcessors managed by this container. Before destroying the instance, this method notifies it with CollectionProcessComplete so that the component finalizes its logic and does appropriate cleanup before shutdown.
      Specified by:
      destroy in interface Resource
      Overrides:
      destroy in class Resource_ImplBase
    • run

      public void run()
      Specified by:
      run in interface Runnable
    • getConfigParameterValue

      public Object getConfigParameterValue(String aParamName)
      Description copied from interface: ConfigurableResource
      Looks up the value of a configuration parameter. This method will only return the value of a parameter that is not defined in any group.

      This method returns null if the parameter is optional and has not been assigned a value. (For mandatory parameters, an exception is thrown during initialization if no value has been assigned.) This method also returns null if there is no declared configuration parameter with the specified name.

      Specified by:
      getConfigParameterValue in interface ConfigurableResource
      Parameters:
      aParamName - the name of a parameter that is not in any group
      Returns:
      the value of the parameter with name aParamName, or null if either the parameter does not exist or it has not been assigned a value.
    • getConfigParameterValue

      public Object getConfigParameterValue(String aGroupName, String aParamName)
      Description copied from interface: ConfigurableResource
      Looks up the value of a configuration parameter in a group. If the parameter has no value assigned within the group, fallback strategies will be followed.

      This method returns null if the parameter is optional and has not been assigned a value. (For mandatory parameters, an exception is thrown during initialization if no value has been assigned.) This method also returns null if there is no declared configuration parameter with the specified name.

      Specified by:
      getConfigParameterValue in interface ConfigurableResource
      Parameters:
      aGroupName - the name of a configuration group. If the group name is null, this method will return the same value as getConfigParameterValue(String).
      aParamName - the name of a parameter in the group
      Returns:
      the value of the parameter in group aGroupName with name aParamName, or null if either the parameter does not exist or it has not been assigned a value.
    • setConfigParameterValue

      public void setConfigParameterValue(String aParamName, Object aValue)
      Description copied from interface: ConfigurableResource
      Sets the value of a configuration parameter. This only works for a parameter that is not defined in any group. Note that there is no guarantee that the change will take effect until ConfigurableResource.reconfigure() is called.
      Specified by:
      setConfigParameterValue in interface ConfigurableResource
      Parameters:
      aParamName - the name of a parameter that is not in any group
      aValue - the value to assign to the parameter
    • setConfigParameterValue

      public void setConfigParameterValue(String aGroupName, String aParamName, Object aValue)
      Description copied from interface: ConfigurableResource
      Sets the value of a configuration parameter in a group. Note that there is no guarantee that the change will take effect until ConfigurableResource.reconfigure() is called.
      Specified by:
      setConfigParameterValue in interface ConfigurableResource
      Parameters:
      aGroupName - the name of a configuration group. If this parameter is null, this method will have the same effect as setConfigParameterValue(String,Object).
      aParamName - the name of a parameter in the group
      aValue - the value to assign to the parameter.
    • reconfigure

      public void reconfigure() throws ResourceConfigurationException
      Description copied from interface: ConfigurableResource
      Instructs this Resource to re-read its configuration parameter settings.
      Specified by:
      reconfigure in interface ConfigurableResource
      Throws:
      ResourceConfigurationException - if the configuration is not valid
    • getName

      public String getName()
      Returns the name of this container. It is the name of the Cas Processor.
      Specified by:
      getName in class ProcessingContainer
      Returns:
      the name
    • getMetaData

      public ResourceMetaData getMetaData()
      Description copied from interface: Resource
      Gets the metadata that describes this Resource.
      Specified by:
      getMetaData in interface Resource
      Overrides:
      getMetaData in class Resource_ImplBase
      Returns:
      an object containing all metadata for this resource.
    • incrementStat

      public void incrementStat(String aStatName, Integer aStat)
      Increment a value of a given stat.
      Specified by:
      incrementStat in class ProcessingContainer
      Parameters:
      aStatName - the a stat name
      aStat - the a stat
    • addStat

      public void addStat(String aStatName, Object aStat)
      Add an arbitrary object and bind it to a given name.
      Specified by:
      addStat in class ProcessingContainer
      Parameters:
      aStatName - the a stat name
      aStat - the a stat
    • getStat

      public Object getStat(String aStatName)
      Returns an object identified by a given name.
      Specified by:
      getStat in class ProcessingContainer
      Parameters:
      aStatName - the a stat name
      Returns:
      the stat
    • getAllStats

      public HashMap getAllStats()
      Returns all stats aggregated during the CPM run.
      Specified by:
      getAllStats in class ProcessingContainer
      Returns:
      a map of all stats aggregated during the CPM run
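
The stat methods above behave like a name-to-object map with an integer increment helper. The sketch below assumes that a missing or non-Integer entry is treated as zero when incremented, which this page does not specify; `StatMapSketch` is a hypothetical class.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a named-statistics map with an increment helper. */
final class StatMapSketch {
    private final Map<String, Object> statMap = new HashMap<String, Object>();

    /** Binds an arbitrary object to a name, replacing any previous value. */
    void addStat(String name, Object stat) {
        statMap.put(name, stat);
    }

    /** Adds delta to an Integer stat; a missing or non-Integer value counts as 0 (assumption). */
    void incrementStat(String name, Integer delta) {
        Object current = statMap.get(name);
        int base = (current instanceof Integer) ? (Integer) current : 0;
        statMap.put(name, Integer.valueOf(base + delta));
    }

    Object getStat(String name) {
        return statMap.get(name);
    }

    /** Returns a copy so callers cannot modify the internal map. */
    Map<String, Object> getAllStats() {
        return new HashMap<String, Object>(statMap);
    }
}
```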
    • pause

      public void pause()
      Pauses the container until resumed. The CPM will pause the Container while it is trying to re-connect to a shared remote service. While the Container is paused, getCasProcessor() will not be allowed to return a new CasProcessor. All other methods remain accessible and function normally.
      Specified by:
      pause in class ProcessingContainer
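
One common way to get the behaviour described for pause() is a flag guarded by a dedicated lock object, with waiters released on resume. The sketch below assumes that approach; it borrows the field names isPaused and lockForIsPaused from the field list on this page, but `PauseGateSketch` and `awaitResumed` are hypothetical and the container's real logic may differ.

```java
/** Hypothetical sketch of a pause gate built on a lock object and a boolean flag. */
final class PauseGateSketch {
    private final Object lockForIsPaused = new Object();
    private boolean isPaused;

    void pause() {
        synchronized (lockForIsPaused) {
            isPaused = true;
        }
    }

    void resume() {
        synchronized (lockForIsPaused) {
            isPaused = false;
            lockForIsPaused.notifyAll(); // wake every thread blocked in awaitResumed()
        }
    }

    boolean isPaused() {
        synchronized (lockForIsPaused) {
            return isPaused;
        }
    }

    /** Blocks while the gate is paused; a getter for pooled instances would call this first. */
    void awaitResumed() {
        synchronized (lockForIsPaused) {
            while (isPaused) { // loop guards against spurious wake-ups
                try {
                    lockForIsPaused.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while paused", e);
                }
            }
        }
    }
}
```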
    • resume

      public void resume()
      Specified by:
      resume in class ProcessingContainer
    • isPaused

      public boolean isPaused()
      Specified by:
      isPaused in class ProcessingContainer
    • getPool

      public ServiceProxyPool getPool()
      Specified by:
      getPool in class ProcessingContainer
    • setSingleFencedService

      public void setSingleFencedService(boolean aSingleFencedInstance)
      Specified by:
      setSingleFencedService in class ProcessingContainer
    • isSingleFencedService

      public boolean isSingleFencedService()
      Specified by:
      isSingleFencedService in class ProcessingContainer
    • getFetchTime

      public long getFetchTime()
      Gets the fetch time.
      Returns:
      the fetch time