Class MemcachedConnection

All Implemented Interfaces:
Runnable

public class MemcachedConnection extends SpyThread
Main class for handling connections to a memcached cluster.
  • Field Details

    • DOUBLE_CHECK_EMPTY

      private static final int DOUBLE_CHECK_EMPTY
      The number of empty selects we'll allow before assuming we may have missed one and should check the current selectors. This generally indicates a bug, but we'll check it nonetheless.
    • EXCESSIVE_EMPTY

      private static final int EXCESSIVE_EMPTY
      The number of empty selects we'll allow before blowing up. It's too easy to write a bug that causes it to loop uncontrollably. This helps find those bugs and often works around them.
    • DEFAULT_WAKEUP_DELAY

      private static final int DEFAULT_WAKEUP_DELAY
      The default wakeup delay if not overridden by a system property.
    • DEFAULT_RETRY_QUEUE_SIZE

      private static final int DEFAULT_RETRY_QUEUE_SIZE
      By default, do not bound the retry queue.
    • MAX_CLONE_COUNT

      private static final int MAX_CLONE_COUNT
      If an operation gets cloned more than this ceiling, cancel it for safety reasons.
    • RECON_QUEUE_METRIC

      private static final String RECON_QUEUE_METRIC
    • SHUTD_QUEUE_METRIC

      private static final String SHUTD_QUEUE_METRIC
    • OVERALL_REQUEST_METRIC

      private static final String OVERALL_REQUEST_METRIC
    • OVERALL_AVG_BYTES_WRITE_METRIC

      private static final String OVERALL_AVG_BYTES_WRITE_METRIC
    • OVERALL_AVG_BYTES_READ_METRIC

      private static final String OVERALL_AVG_BYTES_READ_METRIC
    • OVERALL_AVG_TIME_ON_WIRE_METRIC

      private static final String OVERALL_AVG_TIME_ON_WIRE_METRIC
    • OVERALL_RESPONSE_METRIC

      private static final String OVERALL_RESPONSE_METRIC
    • OVERALL_RESPONSE_RETRY_METRIC

      private static final String OVERALL_RESPONSE_RETRY_METRIC
    • OVERALL_RESPONSE_FAIL_METRIC

      private static final String OVERALL_RESPONSE_FAIL_METRIC
    • OVERALL_RESPONSE_SUCC_METRIC

      private static final String OVERALL_RESPONSE_SUCC_METRIC
    • shutDown

      protected volatile boolean shutDown
      True if the connection is already shut down or is shutting down.
    • shouldOptimize

      private final boolean shouldOptimize
      If true, optimization will collapse multiple sequential get ops.
    • selector

      protected Selector selector
      Holds the current Selector to use.
    • locator

      protected final NodeLocator locator
      The NodeLocator to use for this connection.
    • failureMode

      protected final FailureMode failureMode
      The configured FailureMode.
    • maxDelay

      private final long maxDelay
      Maximum amount of time to wait between reconnect attempts.
    • emptySelects

      private int emptySelects
      Contains the current number of empty select() calls, which could indicate bugs.
    • bufSize

      private final int bufSize
      The buffer size that will be used when reading from the server.
    • connectionFactory

      private final ConnectionFactory connectionFactory
      The connection factory to create MemcachedNodes from.
    • addedQueue

      protected final ConcurrentLinkedQueue<MemcachedNode> addedQueue
      AddedQueue is used to track the QueueAttachments for which operations have recently been queued.
    • reconnectQueue

      private final SortedMap<Long,MemcachedNode> reconnectQueue
      reconnectQueue contains the attachments that need to be reconnected. The key is the time at which they are eligible for reconnect.
    • running

      protected volatile boolean running
      True if not shutting down or shut down.
    • connObservers

      private final Collection<ConnectionObserver> connObservers
      Holds all connection observers that get notified on connection status changes.
    • opFact

      private final OperationFactory opFact
      The OperationFactory to clone or create operations.
    • timeoutExceptionThreshold

      private final int timeoutExceptionThreshold
      The threshold for timeout exceptions.
    • retryOps

      private final List<Operation> retryOps
      Holds operations that need to be retried.
    • nodesToShutdown

      protected final ConcurrentLinkedQueue<MemcachedNode> nodesToShutdown
      Holds all nodes that are scheduled for shutdown.
    • verifyAliveOnConnect

      private final boolean verifyAliveOnConnect
      If set to true, after the connect phase finishes the node is additionally checked to confirm that it actually responds, not merely that the socket connected.
    • listenerExecutorService

      private final ExecutorService listenerExecutorService
      The ExecutorService to use for callbacks.
    • metrics

      protected final MetricCollector metrics
      The MetricCollector to accumulate metrics (or dummy).
    • metricType

      protected final MetricType metricType
      The current type of metrics to collect.
    • wakeupDelay

      private final int wakeupDelay
      The selector wakeup delay, defaults to 1000ms.
    • retryQueueSize

      private final int retryQueueSize
      Optionally bound the retry queue if set via system property.
  • Constructor Details

  • Method Details

    • registerMetrics

      protected void registerMetrics()
      Register Metrics for collection. Note that these Metrics may or may not take effect, depending on the MetricCollector implementation. This can be controlled from the DefaultConnectionFactory.
    • createConnections

      protected List<MemcachedNode> createConnections(Collection<InetSocketAddress> addrs) throws IOException
      Create connections for the given list of addresses.
      Parameters:
      addrs - the list of addresses to connect to.
      Returns:
      the list of created MemcachedNodes.
      Throws:
      IOException - if connecting was not successful.
    • selectorsMakeSense

      private boolean selectorsMakeSense()
      Make sure that the current selectors make sense.
      Returns:
      true if they do.
    • handleIO

      public void handleIO() throws IOException
      Handle all IO that flows through the connection. This method is called in an endless loop, listens on NIO selectors and dispatches the underlying read/write calls if needed.
      Throws:
      IOException
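      The select-and-dispatch loop described above can be sketched with plain java.nio. This is a minimal illustration under stated assumptions, not the spymemcached implementation: a Pipe stands in for a memcached socket, and the names SelectLoopSketch, pumpOnce and demo are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical sketch of a single select-and-dispatch iteration.
// A Pipe stands in for a memcached socket; this is not library code.
class SelectLoopSketch {

    // Waits up to timeoutMillis, then dispatches every ready key.
    // Returns the text read in this iteration (empty on an "empty select").
    static String pumpOnce(Selector selector, long timeoutMillis) throws IOException {
        StringBuilder read = new StringBuilder();
        if (selector.select(timeoutMillis) == 0) {
            return ""; // woken up or timed out with nothing ready
        }
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove(); // NIO never clears the selected-key set for us
            if (key.isValid() && key.isReadable()) {
                ByteBuffer buf = ByteBuffer.allocate(64);
                if (((Pipe.SourceChannel) key.channel()).read(buf) < 0) {
                    key.cancel(); // end of stream: a real client would queue a reconnect
                    continue;
                }
                buf.flip();
                read.append(StandardCharsets.US_ASCII.decode(buf));
            }
            // A real client would also dispatch isWritable() and isConnectable() here.
        }
        return read.toString();
    }

    // Writes a fake server reply into the pipe and runs one loop iteration.
    static String demo() {
        try {
            Pipe pipe = Pipe.open();
            try (Selector selector = Selector.open();
                 Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);
                sink.write(ByteBuffer.wrap("VERSION 1.6\r\n".getBytes(StandardCharsets.US_ASCII)));
                return pumpOnce(selector, 1000);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

      In a long-running client, pumpOnce would sit inside a loop guarded by a running flag, with the select timeout playing a role similar to wakeupDelay.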
    • handleWokenUpSelector

      protected void handleWokenUpSelector()
      Helper method that is called when the selector wakes up, whether because the wakeup timeout expired, because it was interrupted, or because it was woken during regular write operations.

      This method can be overridden by child implementations to handle custom behavior on a manually woken selector, like sending pings through the channels to make sure they are alive.

      Note that there is no guarantee that this method is called at all, or at regular intervals, so overriding implementations must take that into account. It may also be called very often under heavy workloads, so it should not perform expensive tasks on the same thread.
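      An override that respects both caveats might check elapsed time rather than assume a fixed interval, and hand real work to another thread. The sketch below is a hypothetical stand-alone class (PingOnWakeup is not a MemcachedConnection subclass), and the ping itself is left as a placeholder.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

// Hypothetical stand-in for a handleWokenUpSelector() override; it only
// shows the defensive pattern, not the real class hierarchy.
class PingOnWakeup {
    private final long minIntervalMillis;
    private final LongSupplier clock; // injectable so the logic is testable
    private final ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
        Thread th = new Thread(r, "ping-worker");
        th.setDaemon(true); // never keep the JVM alive just for pings
        return th;
    });
    private final AtomicInteger pingsScheduled = new AtomicInteger();
    private long lastPingMillis;
    private boolean pingedBefore;

    PingOnWakeup(long minIntervalMillis, LongSupplier clock) {
        this.minIntervalMillis = minIntervalMillis;
        this.clock = clock;
    }

    // Runs on the selector thread: may be called in bursts or not at all,
    // so it compares elapsed time instead of assuming a fixed interval.
    void handleWokenUpSelector() {
        long now = clock.getAsLong();
        if (pingedBefore && now - lastPingMillis < minIntervalMillis) {
            return; // cheap early exit under heavy load
        }
        pingedBefore = true;
        lastPingMillis = now;
        pingsScheduled.incrementAndGet();
        worker.execute(() -> {
            // Hypothetical: send a noop/version request to each node here.
        });
    }

    int pingsScheduled() { return pingsScheduled.get(); }

    void close() { worker.shutdown(); }
}
```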

    • handleOperationalTasks

      private void handleOperationalTasks() throws IOException
      Helper method for handleIO() to encapsulate everything that needs to be checked on a regular basis that has nothing to do directly with reading and writing data.
      Throws:
      IOException - if an error happens during shutdown queue handling.
    • handleEmptySelects

      private void handleEmptySelects()
      Helper method for handleIO() to handle empty select calls.
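      The empty-select bookkeeping behind DOUBLE_CHECK_EMPTY and EXCESSIVE_EMPTY can be sketched as a small counter. This is an assumption-laden illustration: the thresholds are constructor parameters rather than the library's actual constant values, and resetting the counter after any non-empty select is an assumed policy.

```java
// Hypothetical sketch of empty-select tracking; thresholds are parameters,
// not the real DOUBLE_CHECK_EMPTY / EXCESSIVE_EMPTY values.
class EmptySelectTracker {
    enum Action { NONE, DOUBLE_CHECK_SELECTORS, FAIL }

    private final int doubleCheckThreshold;
    private final int excessiveThreshold;
    private int emptySelects;

    EmptySelectTracker(int doubleCheckThreshold, int excessiveThreshold) {
        if (doubleCheckThreshold >= excessiveThreshold) {
            throw new IllegalArgumentException("double-check threshold must be lower");
        }
        this.doubleCheckThreshold = doubleCheckThreshold;
        this.excessiveThreshold = excessiveThreshold;
    }

    // Call after every select(); readyCount is the value select() returned.
    Action onSelect(int readyCount) {
        if (readyCount > 0) {
            emptySelects = 0; // progress was made, start counting again
            return Action.NONE;
        }
        emptySelects++;
        if (emptySelects > excessiveThreshold) {
            return Action.FAIL; // likely a busy-loop bug: blow up
        }
        if (emptySelects > doubleCheckThreshold) {
            return Action.DOUBLE_CHECK_SELECTORS; // re-inspect registered keys
        }
        return Action.NONE;
    }

    int emptySelects() { return emptySelects; }
}
```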
    • handleShutdownQueue

      private void handleShutdownQueue() throws IOException
      Check if nodes need to be shut down and do so if needed.
      Throws:
      IOException - if the channel could not be closed properly.
    • checkPotentiallyTimedOutConnection

      private void checkPotentiallyTimedOutConnection()
      Check if one or more nodes exceeded the timeout threshold.
    • handleInputQueue

      private void handleInputQueue()
      Handle any requests that have been made against the client.
    • addObserver

      public boolean addObserver(ConnectionObserver obs)
      Add a connection observer.
      Returns:
      whether the observer was successfully added.
    • removeObserver

      public boolean removeObserver(ConnectionObserver obs)
      Remove a connection observer.
      Returns:
      true if the observer existed and now doesn't.
    • connected

      private void connected(MemcachedNode node)
      Indicate a successful connect to the given node.
      Parameters:
      node - the node which was successfully connected.
    • lostConnection

      private void lostConnection(MemcachedNode node)
      Indicate a lost connection to the given node.
      Parameters:
      node - the node where the connection was lost.
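      The observer methods above (addObserver, removeObserver, connected, lostConnection) follow a plain observer pattern over a thread-safe collection. A minimal sketch, assuming a hypothetical NodeObserver interface with String node names; the real ConnectionObserver interface and its signatures may differ.

```java
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical observer interface; not the library's ConnectionObserver.
interface NodeObserver {
    void connectionEstablished(String node, int reconnectCount);
    void connectionLost(String node);
}

class ObserverRegistry {
    // Thread-safe, so observers can be added while the I/O thread iterates.
    private final Collection<NodeObserver> observers = new ConcurrentLinkedQueue<>();

    boolean addObserver(NodeObserver obs) { return observers.add(obs); }

    // True only if the observer was registered and has now been removed.
    boolean removeObserver(NodeObserver obs) { return observers.remove(obs); }

    void connected(String node, int reconnectCount) {
        for (NodeObserver o : observers) {
            o.connectionEstablished(node, reconnectCount);
        }
    }

    void lostConnection(String node) {
        for (NodeObserver o : observers) {
            o.connectionLost(node);
        }
    }
}
```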
    • belongsToCluster

      boolean belongsToCluster(MemcachedNode node)
      Check whether the given node belongs to the current cluster. This is used before trying to connect to a node, so that nodes no longer in the currently connected cluster are not reconnected.
    • handleIO

      private void handleIO(SelectionKey sk)
      Handle IO for a specific selection key. Any IOException will cause a reconnect. Note that this code makes sure that the corresponding node is not only able to connect, but also able to respond correctly (if verifyAliveOnConnect is set to true through a property). This is done by issuing a dummy version/noop call and making sure it returns correctly and in a timely fashion.
      Parameters:
      sk - the selection key to handle IO against.
    • handleReadsAndWrites

      private void handleReadsAndWrites(SelectionKey sk, MemcachedNode node) throws IOException
      A helper method for handleIO(java.nio.channels.SelectionKey) to handle reads and writes if appropriate.
      Parameters:
      sk - the selection key to use.
      node - the node to read from and write to.
      Throws:
      IOException - if an error occurs during read/write.
    • finishConnect

      private void finishConnect(SelectionKey sk, MemcachedNode node) throws IOException
      Finish the connect phase and potentially verify its liveness.
      Parameters:
      sk - the selection key for the node.
      node - the actual node.
      Throws:
      IOException - if something goes wrong during reading/writing.
    • handleWrites

      private void handleWrites(MemcachedNode node) throws IOException
      Handle pending writes for the given node.
      Parameters:
      node - the node to handle writes for.
      Throws:
      IOException - can be raised during writing failures.
    • handleReads

      private void handleReads(MemcachedNode node) throws IOException
      Handle pending reads for the given node.
      Parameters:
      node - the node to handle reads for.
      Throws:
      IOException - can be raised during reading failures.
    • readBufferAndLogMetrics

      private void readBufferAndLogMetrics(Operation currentOp, ByteBuffer rbuf, MemcachedNode node) throws IOException
      Read from the buffer and add metrics information.
      Parameters:
      currentOp - the current operation to read.
      rbuf - the read buffer to read from.
      node - the node to read from.
      Throws:
      IOException - if reading was not successful.
    • handleReadsWhenChannelEndOfStream

      private Operation handleReadsWhenChannelEndOfStream(Operation currentOp, MemcachedNode node, ByteBuffer rbuf) throws IOException
      Deal with an operation where the channel reached the end of a stream.
      Parameters:
      currentOp - the current operation to read.
      node - the node for that operation.
      rbuf - the read buffer.
      Returns:
      the next operation on the node to read.
      Throws:
      IOException - if the node disconnects while reading.
    • dbgBuffer

      static String dbgBuffer(ByteBuffer b, int size)
      Convert the ByteBuffer into a string for easier debugging.
      Parameters:
      b - the buffer to debug.
      size - the size of the buffer.
      Returns:
      the stringified ByteBuffer.
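      A helper in the spirit of dbgBuffer might render printable ASCII as-is and escape everything else, so protocol framing such as \r\n stays visible. This is a hypothetical sketch (BufferDebug is an illustrative name), not the library's implementation; it reads with absolute get(int) so the buffer's position is left unchanged.

```java
import java.nio.ByteBuffer;

// Hypothetical debugging helper; not the library's dbgBuffer code.
class BufferDebug {
    // Renders the first `size` bytes (from index 0): printable ASCII as-is,
    // every other byte as \xNN.
    static String dbgBuffer(ByteBuffer b, int size) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < size; i++) {
            int v = b.get(i) & 0xff; // absolute get: does not move the position
            if (v >= 0x20 && v < 0x7f) {
                sb.append((char) v);
            } else {
                sb.append(String.format("\\x%02x", v));
            }
        }
        return sb.toString();
    }
}
```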
    • handleRetryInformation

      protected void handleRetryInformation(byte[] retryMessage)
      Optionally handle retry (NOT_MY_VBUCKET) responses. This method can be overridden in subclasses to handle the content of the retry message appropriately.
      Parameters:
      retryMessage - the body of the retry message.
    • queueReconnect

      protected void queueReconnect(MemcachedNode node)
      Enqueue the given MemcachedNode for reconnect.
      Parameters:
      node - the node to reconnect.
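      Because reconnectQueue is a SortedMap keyed by the time a node becomes eligible, enqueueing amounts to computing a future timestamp and inserting under a unique key. A minimal sketch, assuming a doubling backoff capped at maxDelay (in seconds) and String node names; the backoff formula and all names here are illustrative assumptions.

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical sketch of a time-keyed reconnect queue. The doubling
// backoff capped at maxDelaySeconds is an assumption for illustration.
class ReconnectQueueSketch {
    private final SortedMap<Long, String> reconnectQueue = new TreeMap<>();
    private final long maxDelaySeconds;

    ReconnectQueueSketch(long maxDelaySeconds) {
        this.maxDelaySeconds = maxDelaySeconds;
    }

    // Schedules `node` and returns the time (ms) at which it becomes eligible.
    long queueReconnect(String node, int reconnectCount, long nowMillis) {
        int exponent = Math.min(Math.max(reconnectCount, 0), 62);
        // 1s, 2s, 4s, ... capped at maxDelaySeconds.
        long delaySeconds = Math.min(maxDelaySeconds, 1L << exponent);
        long when = nowMillis + delaySeconds * 1000;
        // Map keys must be unique, so nudge colliding timestamps forward.
        while (reconnectQueue.containsKey(when)) {
            when++;
        }
        reconnectQueue.put(when, node);
        return when;
    }

    SortedMap<Long, String> queue() { return reconnectQueue; }
}
```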
    • cancelOperations

      private void cancelOperations(Collection<Operation> ops)
      Cancel the given collection of operations.
      Parameters:
      ops - the list of operations to cancel.
    • redistributeOperations

      public void redistributeOperations(Collection<Operation> ops)
      Redistribute the given list of operations to (potentially) other nodes. Note that operations that have already been cancelled or timed out, or that have no definite target (a key), cannot be redistributed.
      Parameters:
      ops - the operations to redistribute.
    • redistributeOperation

      public void redistributeOperation(Operation op)
      Redistribute the given operation to (potentially) other nodes. Note that operations that have already been cancelled or timed out, or that have no definite target (a key), cannot be redistributed.
      Parameters:
      op - the operation to redistribute.
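      The redistribution rules above, together with the MAX_CLONE_COUNT ceiling, can be sketched as a short guard sequence. Everything here is hypothetical: RedistOp is a simplified stand-in for Operation, the ceiling value is illustrative, and re-queued clones are collected in a list instead of being routed through a NodeLocator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified operation; the real Operation interface is richer.
class RedistOp {
    final String key; // null for operations without a definite target
    final int cloneCount;
    boolean cancelled;
    boolean timedOut;

    RedistOp(String key, int cloneCount) {
        this.key = key;
        this.cloneCount = cloneCount;
    }

    RedistOp cloneForRetry() { return new RedistOp(key, cloneCount + 1); }
}

class RedistributeSketch {
    static final int MAX_CLONE_COUNT = 100; // illustrative ceiling, not the library's value
    final List<RedistOp> requeued = new ArrayList<>();

    void redistributeOperation(RedistOp op) {
        if (op.cancelled || op.timedOut) {
            return; // already finished from the caller's point of view
        }
        if (op.cloneCount >= MAX_CLONE_COUNT) {
            op.cancelled = true; // cloned too often: cancel for safety
            return;
        }
        if (op.key == null) {
            op.cancelled = true; // no definite target to route to
            return;
        }
        // A real client would route the clone by key; here we just record it.
        requeued.add(op.cloneForRetry());
    }
}
```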
    • attemptReconnects

      private void attemptReconnects()
      Attempt to reconnect MemcachedNodes in the reconnect queue. If a MemcachedNode no longer belongs to the cluster, its reconnect attempt is cancelled. Otherwise, the code tries to reconnect immediately and, if that is not possible, waits until the connection is established. Note that if a socket error arises during reconnect, the node is immediately scheduled for another reconnect attempt.
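      Draining the time-keyed queue can be sketched with SortedMap.headMap, which returns a live view of all entries due so far; removing through the view's iterator also removes from the backing map. This is a hypothetical sketch with String node names and a set of cluster members; the actual socket connect is left as a comment.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical sketch: drain eligible entries, skipping nodes that left the cluster.
class ReconnectDrainSketch {
    final SortedMap<Long, String> reconnectQueue = new TreeMap<>();

    // Returns the nodes a real implementation would now try to reconnect.
    List<String> attemptReconnects(long nowMillis, Set<String> clusterNodes) {
        List<String> toReconnect = new ArrayList<>();
        // headMap(now + 1) is a live view of every key <= now.
        Iterator<Map.Entry<Long, String>> it =
            reconnectQueue.headMap(nowMillis + 1).entrySet().iterator();
        while (it.hasNext()) {
            String node = it.next().getValue();
            it.remove(); // also removes the entry from reconnectQueue
            if (!clusterNodes.contains(node)) {
                continue; // no longer in the cluster: cancel this attempt
            }
            toReconnect.add(node); // a real client would open a SocketChannel here
        }
        return toReconnect;
    }
}
```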
    • potentiallyCloseLeakingChannel

      private void potentiallyCloseLeakingChannel(SocketChannel ch, MemcachedNode node)
      Make sure channel connections are not leaked and are properly closed under faulty reconnect circumstances.
      Parameters:
      ch - the channel to potentially close.
      node - the node to which the channel should be bound.
    • getLocator

      public NodeLocator getLocator()
      Returns the NodeLocator in use for this connection.
      Returns:
      the current NodeLocator.
    • enqueueOperation

      public void enqueueOperation(String key, Operation o)
      Enqueue the given Operation with the used key.
      Parameters:
      key - the key to use.
      o - the Operation to enqueue.
    • addOperation

      protected void addOperation(String key, Operation o)
      Add an operation to the connection identified by the given key. If the primary MemcachedNode for that key is active, or the FailureMode is Retry, the primary node is used. If the primary node is not active and the FailureMode is Cancel, the operation is cancelled without further retry. For the remaining FailureMode (Redistribute), another node is used, but only if it is active as well. If no other active node can be found, the original primary node is used and the operation is retried there.
      Parameters:
      key - the key the operation is operating upon.
      o - the operation to add.
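      The node-selection rules above can be condensed into one decision function. A minimal sketch under assumptions: FailMode, SketchNode and NodeChooser are hypothetical names, the candidate list stands in for the locator's ordering (primary first, then fallbacks), and a null result means the caller should cancel the operation.

```java
import java.util.List;

// Hypothetical stand-ins mirroring the three failure behaviours described above.
enum FailMode { REDISTRIBUTE, RETRY, CANCEL }

class SketchNode {
    final String name;
    final boolean active;

    SketchNode(String name, boolean active) {
        this.name = name;
        this.active = active;
    }
}

class NodeChooser {
    // candidates: primary first, then the fallback sequence for the key.
    // Returns null when the operation should be cancelled.
    static SketchNode choose(List<SketchNode> candidates, FailMode mode) {
        SketchNode primary = candidates.get(0);
        if (primary.active || mode == FailMode.RETRY) {
            return primary;
        }
        if (mode == FailMode.CANCEL) {
            return null; // primary down and mode is cancel
        }
        // REDISTRIBUTE: first active alternative, else fall back to the primary.
        for (SketchNode n : candidates.subList(1, candidates.size())) {
            if (n.active) {
                return n;
            }
        }
        return primary;
    }
}
```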
    • insertOperation

      public void insertOperation(MemcachedNode node, Operation o)
      Insert an operation at the beginning of the given node's queue.
      Parameters:
      node - the node where to insert the Operation.
      o - the operation to insert.
    • addOperation

      protected void addOperation(MemcachedNode node, Operation o)
      Enqueue an operation on the given node.
      Parameters:
      node - the node where to enqueue the Operation.
      o - the operation to add.
    • addOperations

      public void addOperations(Map<MemcachedNode,Operation> ops)
      Enqueue each of the given operations on its handling node.
      Parameters:
      ops - the operations for each node.
    • broadcastOperation

      public CountDownLatch broadcastOperation(BroadcastOpFactory of)
      Broadcast an operation to all nodes.
      Returns:
      a CountDownLatch that will be counted down when the operations are complete.
    • broadcastOperation

      public CountDownLatch broadcastOperation(BroadcastOpFactory of, Collection<MemcachedNode> nodes)
      Broadcast an operation to a collection of nodes.
      Returns:
      a CountDownLatch that will be counted down when the operations are complete.
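      The latch contract described above can be illustrated with one count per node, decremented when that node's operation completes. This is a hypothetical sketch: an ExecutorService stands in for the I/O thread, node names are plain strings, and the per-node work is a placeholder.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a broadcast whose completion is tracked by a latch.
class BroadcastSketch {
    static CountDownLatch broadcast(List<String> nodes, ExecutorService io) {
        CountDownLatch latch = new CountDownLatch(nodes.size());
        for (String node : nodes) {
            io.execute(() -> {
                try {
                    // Hypothetical: send the per-node operation to `node` here.
                } finally {
                    latch.countDown(); // count down on success and on failure
                }
            });
        }
        return latch;
    }

    static boolean demo() {
        ExecutorService io = Executors.newFixedThreadPool(2);
        try {
            CountDownLatch latch = broadcast(Arrays.asList("a:11211", "b:11211", "c:11211"), io);
            return latch.await(5, TimeUnit.SECONDS); // true once every node finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            io.shutdown();
        }
    }
}
```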
    • shutdown

      public void shutdown() throws IOException
      Shut down all connections and do not accept further incoming ops.
      Throws:
      IOException
    • toString

      public String toString()
      Overrides:
      toString in class Thread
    • connectionsStatus

      public String connectionsStatus()
      Construct a String containing information about all nodes and their state.
      Returns:
      a stringified representation of the connection status.
    • opTimedOut

      public static void opTimedOut(Operation op)
      Increase the timeout counter for the given handling node.
      Parameters:
      op - the operation to grab the node from.
    • opSucceeded

      public static void opSucceeded(Operation op)
      Reset the timeout counter for the given handling node.
      Parameters:
      op - the operation to grab the node from.
    • setTimeout

      private static void setTimeout(Operation op, boolean isTimeout)
      Set the continuous timeout on an operation. Ignore operations which have no handling nodes set yet (which may happen before nodes are properly authenticated).
      Parameters:
      op - the operation to use.
      isTimeout - is timed out or not.
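      The continuous-timeout bookkeeping behind opTimedOut, opSucceeded, setTimeout and checkPotentiallyTimedOutConnection can be sketched with a per-node counter that timeouts increment and any success resets. All types here are hypothetical, and comparing with "greater than" against timeoutExceptionThreshold is an assumption.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical node holding a continuous-timeout counter.
class TimeoutNode {
    final AtomicInteger continuousTimeouts = new AtomicInteger();
}

// Hypothetical operation that may not have a handling node yet.
class TimeoutOp {
    TimeoutNode handlingNode; // null until assigned (e.g. before authentication)
}

class TimeoutTracking {
    static void opTimedOut(TimeoutOp op) { setTimeout(op, true); }

    static void opSucceeded(TimeoutOp op) { setTimeout(op, false); }

    private static void setTimeout(TimeoutOp op, boolean isTimeout) {
        TimeoutNode node = op.handlingNode;
        if (node == null) {
            return; // no handling node yet: ignore
        }
        if (isTimeout) {
            node.continuousTimeouts.incrementAndGet();
        } else {
            node.continuousTimeouts.set(0); // any success ends the streak
        }
    }

    // What a periodic check might use to decide a node needs reconnecting.
    static boolean exceedsThreshold(TimeoutNode node, int threshold) {
        return node.continuousTimeouts.get() > threshold;
    }
}
```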
    • checkState

      protected void checkState()
      Check to see if this connection is shutting down.
      Throws:
      IllegalStateException - when shutting down.
    • run

      public void run()
      Handle IO as long as the application is running.
      Specified by:
      run in interface Runnable
      Overrides:
      run in class Thread
    • logRunException

      private void logRunException(Exception e)
      Log an exception at a level that depends on the connection state. Exceptions are logged at debug level when they happen during shutdown, but at warning level during normal operation.
      Parameters:
      e - the exception to log.
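      The state-dependent log level can be sketched with java.util.logging, using FINE as the closest equivalent of debug. This is an illustration only: spymemcached uses its own logging abstraction, and RunExceptionLogger is a hypothetical name.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch; JUL levels approximate the debug/warn split described above.
class RunExceptionLogger {
    private static final Logger LOG = Logger.getLogger(RunExceptionLogger.class.getName());
    private volatile boolean shutDown;

    void markShutDown() { shutDown = true; }

    // Returns the level used, so the choice is easy to check.
    Level logRunException(Exception e) {
        Level level = shutDown ? Level.FINE : Level.WARNING;
        LOG.log(level, "Problem handling memcached IO", e);
        return level;
    }
}
```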
    • isShutDown

      public boolean isShutDown()
      Returns whether the connection is shut down or not.
      Returns:
      true if the connection is shut down, false otherwise.
    • retryOperation

      public void retryOperation(Operation op)
      Add an operation to the retry queue. If the retry queue is bounded and its size has reached that bound, the operation is cancelled rather than added to the retry queue.
      Parameters:
      op - the operation to retry.
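      The bounded-retry rule can be sketched as a size check before insertion. This assumes, for illustration only, that a negative retryQueueSize means "unbounded" (matching the unbounded default described for DEFAULT_RETRY_QUEUE_SIZE); RetryOp and RetryQueueSketch are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical operation with a cancel flag.
class RetryOp {
    boolean cancelled;

    void cancel() { cancelled = true; }
}

class RetryQueueSketch {
    private final List<RetryOp> retryOps = new ArrayList<>();
    private final int retryQueueSize; // negative means unbounded (assumed convention)

    RetryQueueSketch(int retryQueueSize) {
        this.retryQueueSize = retryQueueSize;
    }

    // Queues op for retry, or cancels it when a bounded queue is already full.
    synchronized boolean retryOperation(RetryOp op) {
        if (retryQueueSize >= 0 && retryOps.size() >= retryQueueSize) {
            op.cancel();
            return false;
        }
        retryOps.add(op);
        return true;
    }

    synchronized int size() { return retryOps.size(); }
}
```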