Class MemcachedConnection

  • All Implemented Interfaces:
    java.lang.Runnable

    public class MemcachedConnection
    extends SpyThread
    Main class for handling connections to a memcached cluster.
    • Field Detail

      • DOUBLE_CHECK_EMPTY

        private static final int DOUBLE_CHECK_EMPTY
        The number of empty selects we'll allow before assuming we may have missed one and should check the current selectors. This generally indicates a bug, but we'll check it nonetheless.
        See Also:
        Constant Field Values
      • EXCESSIVE_EMPTY

        private static final int EXCESSIVE_EMPTY
        The number of empty selects we'll allow before blowing up. It's too easy to write a bug that causes it to loop uncontrollably. This helps find those bugs and often works around them.
        See Also:
        Constant Field Values
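The interplay of the two empty-select thresholds can be sketched as a small counter. This is a hypothetical illustration, not spymemcached's handleEmptySelects(). The class and enum names are invented, and the thresholds are constructor arguments because the actual constant values are not reproduced on this page.

```java
/** Hypothetical reaction to one select() result. */
enum EmptySelectAction { NONE, DOUBLE_CHECK, EXCESSIVE }

/**
 * Sketch of an empty-select counter. Names and thresholds are assumptions,
 * not spymemcached's actual implementation or constant values.
 */
final class EmptySelectGuard {
  private final int doubleCheckThreshold;
  private final int excessiveThreshold;
  private int emptySelects;

  EmptySelectGuard(int doubleCheckThreshold, int excessiveThreshold) {
    if (doubleCheckThreshold <= 0 || excessiveThreshold <= doubleCheckThreshold) {
      throw new IllegalArgumentException("require 0 < doubleCheck < excessive");
    }
    this.doubleCheckThreshold = doubleCheckThreshold;
    this.excessiveThreshold = excessiveThreshold;
  }

  /** Record how many keys one select() call returned and decide how to react. */
  EmptySelectAction onSelect(int selectedKeys) {
    if (selectedKeys > 0) {
      emptySelects = 0; // real work resets the streak of empty selects
      return EmptySelectAction.NONE;
    }
    emptySelects++;
    if (emptySelects > excessiveThreshold) {
      return EmptySelectAction.EXCESSIVE; // likely a busy-loop bug: fail fast or recover
    }
    if (emptySelects > doubleCheckThreshold) {
      return EmptySelectAction.DOUBLE_CHECK; // re-examine the registered selection keys
    }
    return EmptySelectAction.NONE;
  }

  int emptySelects() {
    return emptySelects;
  }
}
```

An IO loop would feed the return value of Selector.select() into onSelect() and, on DOUBLE_CHECK, inspect its registered keys, similar in spirit to what selectorsMakeSense() is described as doing.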
      • DEFAULT_WAKEUP_DELAY

        private static final int DEFAULT_WAKEUP_DELAY
        The default wakeup delay if not overridden by a system property.
        See Also:
        Constant Field Values
      • DEFAULT_RETRY_QUEUE_SIZE

        private static final int DEFAULT_RETRY_QUEUE_SIZE
        By default, do not bound the retry queue.
        See Also:
        Constant Field Values
      • MAX_CLONE_COUNT

        private static final int MAX_CLONE_COUNT
        If an operation gets cloned more than this ceiling, cancel it for safety reasons.
        See Also:
        Constant Field Values
      • RECON_QUEUE_METRIC

        private static final java.lang.String RECON_QUEUE_METRIC
        See Also:
        Constant Field Values
      • SHUTD_QUEUE_METRIC

        private static final java.lang.String SHUTD_QUEUE_METRIC
        See Also:
        Constant Field Values
      • OVERALL_REQUEST_METRIC

        private static final java.lang.String OVERALL_REQUEST_METRIC
        See Also:
        Constant Field Values
      • OVERALL_AVG_BYTES_WRITE_METRIC

        private static final java.lang.String OVERALL_AVG_BYTES_WRITE_METRIC
        See Also:
        Constant Field Values
      • OVERALL_AVG_BYTES_READ_METRIC

        private static final java.lang.String OVERALL_AVG_BYTES_READ_METRIC
        See Also:
        Constant Field Values
      • OVERALL_AVG_TIME_ON_WIRE_METRIC

        private static final java.lang.String OVERALL_AVG_TIME_ON_WIRE_METRIC
        See Also:
        Constant Field Values
      • OVERALL_RESPONSE_METRIC

        private static final java.lang.String OVERALL_RESPONSE_METRIC
        See Also:
        Constant Field Values
      • OVERALL_RESPONSE_RETRY_METRIC

        private static final java.lang.String OVERALL_RESPONSE_RETRY_METRIC
        See Also:
        Constant Field Values
      • OVERALL_RESPONSE_FAIL_METRIC

        private static final java.lang.String OVERALL_RESPONSE_FAIL_METRIC
        See Also:
        Constant Field Values
      • OVERALL_RESPONSE_SUCC_METRIC

        private static final java.lang.String OVERALL_RESPONSE_SUCC_METRIC
        See Also:
        Constant Field Values
      • shutDown

        protected volatile boolean shutDown
        If the connection is already shut down or shutting down.
      • shouldOptimize

        private final boolean shouldOptimize
        If true, optimization will collapse multiple sequential get ops.
      • selector

        protected java.nio.channels.Selector selector
        Holds the current Selector to use.
      • maxDelay

        private final long maxDelay
        Maximum amount of time to wait between reconnect attempts.
      • emptySelects

        private int emptySelects
        Contains the current number of empty select() calls, which could indicate bugs.
      • bufSize

        private final int bufSize
        The buffer size that will be used when reading from the server.
      • addedQueue

        protected final java.util.concurrent.ConcurrentLinkedQueue<MemcachedNode> addedQueue
        AddedQueue is used to track the QueueAttachments for which operations have recently been queued.
      • reconnectQueue

        private final java.util.SortedMap<java.lang.Long,MemcachedNode> reconnectQueue
        reconnectQueue contains the attachments that need to be reconnected. The key is the time at which they are eligible for reconnect.
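A map sorted by eligibility time turns "which nodes may reconnect now?" into a single range query. The sketch below is a hypothetical illustration under stated assumptions: nodes are plain Strings, it uses a TreeMap through the NavigableMap interface (a SortedMap subtype), and it resolves key collisions by bumping the timestamp by one millisecond, which this page does not document.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Sketch of a reconnect queue keyed by the time a node becomes eligible (hypothetical). */
final class ReconnectSchedule {
  private final NavigableMap<Long, String> queue = new TreeMap<>();

  /** Schedule a node; on a key collision, bump the time so no entry is overwritten. */
  void schedule(String node, long eligibleAtMillis) {
    long key = eligibleAtMillis;
    while (queue.containsKey(key)) {
      key++;
    }
    queue.put(key, node);
  }

  /** Remove and return, in time order, every node whose eligibility time is <= now. */
  List<String> pollDue(long nowMillis) {
    NavigableMap<Long, String> due = queue.headMap(nowMillis, true); // live view, inclusive bound
    List<String> nodes = new ArrayList<>(due.values());
    due.clear(); // clearing the view removes the entries from the backing map
    return nodes;
  }

  int size() {
    return queue.size();
  }
}
```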
      • running

        protected volatile boolean running
        True if not shutting down or shut down.
      • connObservers

        private final java.util.Collection<ConnectionObserver> connObservers
        Holds all connection observers that get notified on connection status changes.
      • timeoutExceptionThreshold

        private final int timeoutExceptionThreshold
        The threshold for timeout exceptions.
      • retryOps

        private final java.util.List<Operation> retryOps
        Holds operations that need to be retried.
      • nodesToShutdown

        protected final java.util.concurrent.ConcurrentLinkedQueue<MemcachedNode> nodesToShutdown
        Holds all nodes that are scheduled for shutdown.
      • verifyAliveOnConnect

        private final boolean verifyAliveOnConnect
        If set to true, an additional check is performed after the connection is established to verify that the node is actually alive and responding, not merely accepting connections.
      • listenerExecutorService

        private final java.util.concurrent.ExecutorService listenerExecutorService
        The ExecutorService to use for callbacks.
      • metricType

        protected final MetricType metricType
        The current type of metrics to collect.
      • wakeupDelay

        private final int wakeupDelay
        The selector wakeup delay, defaults to 1000ms.
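The wakeup-delay entries describe a 1000 ms default that a system property can override. A minimal sketch of that lookup follows; the property name is invented for illustration and is not the key spymemcached actually reads.

```java
/**
 * Hypothetical resolution of the selector wakeup delay. The property name
 * below is an assumption, not spymemcached's actual key.
 */
final class WakeupDelayConfig {
  static final String PROPERTY = "example.memcached.wakeupDelayMs";
  static final int DEFAULT_WAKEUP_DELAY_MS = 1000;

  private WakeupDelayConfig() {}

  static int resolve() {
    // Integer.getInteger falls back to the default when the property is
    // unset or is not a valid integer.
    int value = Integer.getInteger(PROPERTY, DEFAULT_WAKEUP_DELAY_MS);
    // Reject non-positive values: Selector.select(0) would block indefinitely.
    return value > 0 ? value : DEFAULT_WAKEUP_DELAY_MS;
  }
}
```

The resolved value would then serve as the timeout in selector.select(timeout), so the IO loop wakes up at least that often even when no channel is ready.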
      • retryQueueSize

        private final int retryQueueSize
        Optionally bound the retry queue if set via system property.
    • Constructor Detail

      • MemcachedConnection

        public MemcachedConnection(int bufSize,
                                   ConnectionFactory f,
                                   java.util.List<java.net.InetSocketAddress> a,
                                   java.util.Collection<ConnectionObserver> obs,
                                   FailureMode fm,
                                   OperationFactory opfactory)
                            throws java.io.IOException
        Construct a MemcachedConnection.
        Parameters:
        bufSize - the size of the buffer used for reading from the server.
        f - the factory that will provide an operation queue.
        a - the addresses of the servers to connect to.
        obs - the initial observers to add.
        fm - the failure mode to use.
        opfactory - the operation factory.
        Throws:
        java.io.IOException - if a connection attempt fails early
    • Method Detail

      • registerMetrics

        protected void registerMetrics()
        Register Metrics for collection. Note that these Metrics may or may not take effect, depending on the MetricCollector implementation. This can be controlled from the DefaultConnectionFactory.
      • createConnections

        protected java.util.List<MemcachedNode> createConnections(java.util.Collection<java.net.InetSocketAddress> addrs)
                                                           throws java.io.IOException
        Create connections for the given list of addresses.
        Parameters:
        addrs - the list of addresses to connect to.
        Returns:
        the list of created MemcachedNodes.
        Throws:
        java.io.IOException - if connecting was not successful.
      • selectorsMakeSense

        private boolean selectorsMakeSense()
        Make sure that the current selectors make sense.
        Returns:
        true if they do.
      • handleIO

        public void handleIO()
                      throws java.io.IOException
        Handle all IO that flows through the connection. This method is called in an endless loop, listens on NIO selectors and dispatches the underlying read/write calls if needed.
        Throws:
        java.io.IOException
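The loop described above (wait on a selector, then dispatch reads and writes for the ready keys) can be sketched with plain java.nio. This is not spymemcached's handleIO(): the ReadyKeyHandler interface and the SelectLoop class are hypothetical, and error handling such as reconnecting on IOException is omitted.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

/** Hypothetical callbacks invoked for ready keys. */
interface ReadyKeyHandler {
  void onReadable(SelectionKey key) throws IOException;
  void onWritable(SelectionKey key) throws IOException;
}

/** One iteration of a selector-driven IO loop; a sketch, not spymemcached's handleIO(). */
final class SelectLoop {
  private SelectLoop() {}

  /** Waits up to timeoutMillis (must be > 0), dispatches ready keys, returns callbacks run. */
  static int runOnce(Selector selector, long timeoutMillis, ReadyKeyHandler handler)
      throws IOException {
    selector.select(timeoutMillis); // returns early when a key is ready or wakeup() is called
    int dispatched = 0;
    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
    while (it.hasNext()) {
      SelectionKey key = it.next();
      it.remove(); // the selector never clears the selected-key set by itself
      if (key.isValid() && key.isReadable()) {
        handler.onReadable(key);
        dispatched++;
      }
      if (key.isValid() && key.isWritable()) {
        handler.onWritable(key);
        dispatched++;
      }
    }
    return dispatched;
  }
}
```

A connection thread would call SelectLoop.runOnce() repeatedly inside a while (running) loop.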
      • handleWokenUpSelector

        protected void handleWokenUpSelector()
        Helper method that is called when the selector is woken up: because the timeout elapsed, because it was interrupted, or during regular write operation phases.

        This method can be overridden by subclasses to handle custom behavior on a manually woken selector, like sending pings through the channels to make sure they are alive.

        Note that there is no guarantee that this method is called at all, or at a regular interval, so all overriding implementations need to take that into account. They also need to take into account that it may be called very often under heavy workloads, so it should not perform extensive tasks in the same thread.
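One way to respect both caveats (irregular calls and very frequent calls) is to keep the hook cheap on the IO thread and hand heavy work, such as pinging nodes, to an executor, skipping new submissions while one is still pending. The sketch below is standalone and hypothetical; in practice this logic would live in a subclass's handleWokenUpSelector() override.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical wakeup hook: cheap on the IO thread, hands heavy work to an
 * Executor, and keeps at most one task queued or running at a time.
 */
final class OffloadingWakeupHook {
  private final Executor executor;
  private final Runnable heavyTask; // e.g. pinging each node (assumption)
  private final AtomicBoolean pending = new AtomicBoolean(false);

  OffloadingWakeupHook(Executor executor, Runnable heavyTask) {
    this.executor = executor;
    this.heavyTask = heavyTask;
  }

  /** Call from the IO thread; returns true if a new task was submitted. */
  boolean onWokenUp() {
    if (!pending.compareAndSet(false, true)) {
      return false; // previous task not finished yet: skip instead of piling up work
    }
    try {
      executor.execute(() -> {
        try {
          heavyTask.run();
        } finally {
          pending.set(false);
        }
      });
    } catch (RuntimeException e) {
      pending.set(false); // e.g. RejectedExecutionException: allow a later attempt
      throw e;
    }
    return true;
  }
}
```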

      • handleOperationalTasks

        private void handleOperationalTasks()
                                     throws java.io.IOException
        Helper method for handleIO() to encapsulate everything that needs to be checked on a regular basis that has nothing to do directly with reading and writing data.
        Throws:
        java.io.IOException - if an error happens during shutdown queue handling.
      • handleEmptySelects

        private void handleEmptySelects()
        Helper method for handleIO() to handle empty select calls.
      • handleShutdownQueue

        private void handleShutdownQueue()
                                  throws java.io.IOException
        Check if nodes need to be shut down and do so if needed.
        Throws:
        java.io.IOException - if the channel could not be closed properly.
      • checkPotentiallyTimedOutConnection

        private void checkPotentiallyTimedOutConnection()
        Check if one or more nodes exceeded the timeout threshold.
      • handleInputQueue

        private void handleInputQueue()
        Handle any requests that have been made against the client.
      • addObserver

        public boolean addObserver(ConnectionObserver obs)
        Add a connection observer.
        Returns:
        whether the observer was successfully added.
      • removeObserver

        public boolean removeObserver(ConnectionObserver obs)
        Remove a connection observer.
        Returns:
        true if the observer existed and now doesn't.
      • connected

        private void connected(MemcachedNode node)
        Indicate a successful connect to the given node.
        Parameters:
        node - the node which was successfully connected.
      • lostConnection

        private void lostConnection(MemcachedNode node)
        Indicate a lost connection to the given node.
        Parameters:
        node - the node where the connection was lost.
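The observer bookkeeping described by addObserver, removeObserver, connected and lostConnection follows the classic observer pattern. A minimal sketch, assuming a hypothetical NodeStatusListener interface (spymemcached's own ConnectionObserver may have different method signatures):

```java
import java.net.SocketAddress;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical listener; not spymemcached's ConnectionObserver. */
interface NodeStatusListener {
  void connected(SocketAddress node);
  void lost(SocketAddress node);
}

/** Sketch of an observer registry notified on connection status changes. */
final class ObserverRegistry {
  // Copy-on-write lets the IO thread iterate while other threads add or remove listeners.
  private final List<NodeStatusListener> observers = new CopyOnWriteArrayList<>();

  boolean add(NodeStatusListener l) {
    return observers.add(l);
  }

  /** True if the listener was registered and has now been removed. */
  boolean remove(NodeStatusListener l) {
    return observers.remove(l);
  }

  void fireConnected(SocketAddress node) {
    for (NodeStatusListener l : observers) {
      l.connected(node);
    }
  }

  void fireLost(SocketAddress node) {
    for (NodeStatusListener l : observers) {
      l.lost(node);
    }
  }
}
```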
      • belongsToCluster

        boolean belongsToCluster(MemcachedNode node)
        Checks whether the given node belongs to the current cluster. Used before trying to connect to a node, to make sure it still belongs to the currently connected cluster.
      • handleIO

        private void handleIO(java.nio.channels.SelectionKey sk)
        Handle IO for a specific selection key. Any IOException will cause a reconnect. Note that this code makes sure that the corresponding node is not only able to connect, but also able to respond correctly (if verifyAliveOnConnect is set to true through a property). This is done by issuing a dummy version/noop call and making sure it returns correctly and in a timely fashion.
        Parameters:
        sk - the selection key to handle IO for.
      • handleReadsAndWrites

        private void handleReadsAndWrites(java.nio.channels.SelectionKey sk,
                                          MemcachedNode node)
                                   throws java.io.IOException
        A helper method for handleIO(java.nio.channels.SelectionKey) to handle reads and writes if appropriate.
        Parameters:
        sk - the selection key to use.
        node - the node to read from and write to.
        Throws:
        java.io.IOException - if an error occurs during read/write.
      • finishConnect

        private void finishConnect(java.nio.channels.SelectionKey sk,
                                   MemcachedNode node)
                            throws java.io.IOException
        Finish the connect phase and potentially verify its liveness.
        Parameters:
        sk - the selection key for the node.
        node - the actual node.
        Throws:
        java.io.IOException - if something goes wrong during reading/writing.
      • handleWrites

        private void handleWrites(MemcachedNode node)
                           throws java.io.IOException
        Handle pending writes for the given node.
        Parameters:
        node - the node to handle writes for.
        Throws:
        java.io.IOException - can be raised during writing failures.
      • handleReads

        private void handleReads(MemcachedNode node)
                          throws java.io.IOException
        Handle pending reads for the given node.
        Parameters:
        node - the node to handle reads for.
        Throws:
        java.io.IOException - can be raised during reading failures.
      • readBufferAndLogMetrics

        private void readBufferAndLogMetrics(Operation currentOp,
                                             java.nio.ByteBuffer rbuf,
                                             MemcachedNode node)
                                      throws java.io.IOException
        Read from the buffer and add metrics information.
        Parameters:
        currentOp - the current operation to read.
        rbuf - the read buffer to read from.
        node - the node to read from.
        Throws:
        java.io.IOException - if reading was not successful.
      • handleReadsWhenChannelEndOfStream

        private Operation handleReadsWhenChannelEndOfStream(Operation currentOp,
                                                            MemcachedNode node,
                                                            java.nio.ByteBuffer rbuf)
                                                     throws java.io.IOException
        Deal with an operation where the channel reached the end of a stream.
        Parameters:
        currentOp - the current operation to read.
        node - the node for that operation.
        rbuf - the read buffer.
        Returns:
        the next operation on the node to read.
        Throws:
        java.io.IOException - if the node disconnects while reading.
      • dbgBuffer

        static java.lang.String dbgBuffer(java.nio.ByteBuffer b,
                                          int size)
        Convert the ByteBuffer into a string for easier debugging.
        Parameters:
        b - the buffer to debug.
        size - the size of the buffer.
        Returns:
        the stringified ByteBuffer.
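A debug formatter of this kind usually prints readable bytes as text and escapes the rest. The sketch below assumes an output format (printable ASCII verbatim, everything else as \xNN); the actual dbgBuffer format may differ.

```java
import java.nio.ByteBuffer;

/** Sketch of a debug formatter for buffer contents; the output format is an assumption. */
final class BufferDebug {
  private BufferDebug() {}

  /** Renders the first size bytes: printable ASCII as-is, anything else as a hex escape. */
  static String dbgBuffer(ByteBuffer b, int size) {
    int n = Math.max(0, Math.min(size, b.limit()));
    StringBuilder sb = new StringBuilder(n);
    for (int i = 0; i < n; i++) {
      int v = b.get(i) & 0xff; // absolute get: does not move the buffer's position
      if (v >= 0x20 && v < 0x7f) {
        sb.append((char) v);
      } else {
        sb.append(String.format("\\x%02x", v));
      }
    }
    return sb.toString();
  }
}
```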
      • handleRetryInformation

        protected void handleRetryInformation(byte[] retryMessage)
        Optionally handle retry (NOT_MY_VBUCKET) responses. This method can be overridden in subclasses to handle the content of the retry message appropriately.
        Parameters:
        retryMessage - the body of the retry message.
      • queueReconnect

        protected void queueReconnect(MemcachedNode node)
        Enqueue the given MemcachedNode for reconnect.
        Parameters:
        node - the node to reconnect.
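Queueing a node for reconnect amounts to computing the time at which it becomes eligible and using that as the reconnectQueue key, with maxDelay bounding the wait. The sketch assumes a capped exponential backoff starting at one second; the exact formula spymemcached uses is not stated on this page.

```java
/**
 * Hypothetical capped exponential backoff: 1 s, 2 s, 4 s, ... limited to maxDelayMillis.
 * The base delay and growth factor are assumptions, not spymemcached's documented formula.
 */
final class ReconnectBackoff {
  private ReconnectBackoff() {}

  /** Delay before reconnect attempt number attempt (0-based). */
  static long delayMillis(int attempt, long maxDelayMillis) {
    if (attempt < 0 || maxDelayMillis < 0) {
      throw new IllegalArgumentException("attempt and maxDelayMillis must be >= 0");
    }
    // Beyond 2^40 seconds any realistic cap applies anyway; this also avoids long overflow.
    long uncapped = attempt > 40 ? Long.MAX_VALUE : (1L << attempt) * 1000L;
    return Math.min(maxDelayMillis, uncapped);
  }

  /** Eligibility time, usable as the key of a time-sorted reconnect queue. */
  static long eligibleAt(long nowMillis, int attempt, long maxDelayMillis) {
    return nowMillis + delayMillis(attempt, maxDelayMillis);
  }
}
```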
      • cancelOperations

        private void cancelOperations(java.util.Collection<Operation> ops)
        Cancel the given collection of operations.
        Parameters:
        ops - the list of operations to cancel.
      • redistributeOperations

        public void redistributeOperations(java.util.Collection<Operation> ops)
        Redistribute the given list of operations to (potentially) other nodes. Note that operations can only be redistributed if they have not already been cancelled or timed out and have a definite target (a key).
        Parameters:
        ops - the operations to redistribute.
      • redistributeOperation

        public void redistributeOperation(Operation op)
        Redistribute the given operation to (potentially) other nodes. Note that an operation can only be redistributed if it has not already been cancelled or timed out and has a definite target (a key).
        Parameters:
        op - the operation to redistribute.
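The eligibility rules for redistribution, together with the MAX_CLONE_COUNT ceiling from the field list, can be expressed as a small decision function. OpSnapshot and Redistribution are hypothetical stand-ins, not spymemcached types, and the ceiling is passed in because its value is not listed here.

```java
/** Minimal stand-in for an operation's state (hypothetical, not spymemcached's Operation). */
final class OpSnapshot {
  final String key; // null means the operation has no definite target
  final boolean cancelled;
  final boolean timedOut;
  final int cloneCount;

  OpSnapshot(String key, boolean cancelled, boolean timedOut, int cloneCount) {
    this.key = key;
    this.cancelled = cancelled;
    this.timedOut = timedOut;
    this.cloneCount = cloneCount;
  }
}

enum Redistribution { SKIP, CANCEL, REDISTRIBUTE }

final class RedistributePolicy {
  private RedistributePolicy() {}

  static Redistribution decide(OpSnapshot op, int maxCloneCount) {
    if (op.cancelled || op.timedOut || op.key == null) {
      return Redistribution.SKIP; // not eligible for redistribution
    }
    if (op.cloneCount >= maxCloneCount) {
      return Redistribution.CANCEL; // one more clone would exceed the ceiling
    }
    return Redistribution.REDISTRIBUTE; // clone and re-add by key
  }
}
```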
      • attemptReconnects

        private void attemptReconnects()
        Attempt to reconnect MemcachedNodes in the reconnect queue. If a MemcachedNode no longer belongs to the cluster list, the reconnect attempt is cancelled. Otherwise, the code tries to reconnect immediately and, if that is not possible, waits until the connection information arrives. Note that if a socket error arises during reconnect, the node is immediately scheduled for another reconnect attempt.
      • potentiallyCloseLeakingChannel

        private void potentiallyCloseLeakingChannel(java.nio.channels.SocketChannel ch,
                                                    MemcachedNode node)
        Make sure channel connections are not leaked and are properly closed under faulty reconnect circumstances.
        Parameters:
        ch - the channel to potentially close.
        node - the node to which the channel should be bound.
      • enqueueOperation

        public void enqueueOperation(java.lang.String key,
                                     Operation o)
        Enqueue the given Operation with the used key.
        Parameters:
        key - the key to use.
        o - the Operation to enqueue.
      • addOperation

        protected void addOperation(java.lang.String key,
                                    Operation o)
        Add an operation to a connection identified by the given key. If the primary MemcachedNode for that key is active, or the FailureMode is Retry, the primary node is used. If the primary node is not active and the FailureMode is Cancel, the operation is cancelled without further retry. For the remaining FailureMode (Redistribute), another candidate node is used, but only if it is active as well. If no other active node can be found, the original primary node is used and the operation is retried there.
        Parameters:
        key - the key the operation is operating upon.
        o - the operation to add.
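The node-selection rules above can be sketched as a pure function. RoutingMode mirrors the three FailureMode values named in the description, nodes are plain Strings, and the candidates list stands in for whatever sequence of alternative nodes the real locator provides; all of these are assumptions.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Stand-in mirroring the three failure modes described above (hypothetical). */
enum RoutingMode { REDISTRIBUTE, RETRY, CANCEL }

final class NodeRouting {
  private NodeRouting() {}

  /** Picks the node for a keyed operation; an empty result means "cancel the operation". */
  static Optional<String> route(String primary, List<String> candidates,
      Predicate<String> isActive, RoutingMode mode) {
    if (isActive.test(primary) || mode == RoutingMode.RETRY) {
      return Optional.of(primary);
    }
    if (mode == RoutingMode.CANCEL) {
      return Optional.empty();
    }
    for (String node : candidates) { // Redistribute: first active alternative wins
      if (isActive.test(node)) {
        return Optional.of(node);
      }
    }
    return Optional.of(primary); // no active alternative: fall back to the primary
  }
}
```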
      • insertOperation

        public void insertOperation(MemcachedNode node,
                                    Operation o)
        Insert an operation at the beginning of the given node's queue.
        Parameters:
        node - the node where to insert the Operation.
        o - the operation to insert.
      • addOperation

        protected void addOperation(MemcachedNode node,
                                    Operation o)
        Enqueue an operation on the given node.
        Parameters:
        node - the node where to enqueue the Operation.
        o - the operation to add.
      • addOperations

        public void addOperations(java.util.Map<MemcachedNode,Operation> ops)
        Enqueue each of the given operations on its handling node.
        Parameters:
        ops - the operations for each node.
      • broadcastOperation

        public java.util.concurrent.CountDownLatch broadcastOperation(BroadcastOpFactory of)
        Broadcast an operation to all nodes.
        Returns:
        a CountDownLatch that will be counted down when the operations are complete.
      • broadcastOperation

        public java.util.concurrent.CountDownLatch broadcastOperation(BroadcastOpFactory of,
                                                                      java.util.Collection<MemcachedNode> nodes)
        Broadcast an operation to a collection of nodes.
        Returns:
        a CountDownLatch that will be counted down when the operations are complete.
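The CountDownLatch contract of broadcastOperation can be sketched as follows: one count per target node, counted down when that node's operation finishes. Node and operation types are simplified to a type parameter and Runnable (an assumption), and the Executor stands in for the per-node operation queues.

```java
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.function.Function;

/** Sketch of a latch-based broadcast; types are simplified (hypothetical). */
final class Broadcast {
  private Broadcast() {}

  static <N> CountDownLatch broadcast(Collection<N> nodes, Function<N, Runnable> opFactory,
      Executor executor) {
    CountDownLatch latch = new CountDownLatch(nodes.size()); // one count per node
    for (N node : nodes) {
      Runnable op = opFactory.apply(node);
      executor.execute(() -> {
        try {
          op.run();
        } finally {
          latch.countDown(); // count down even if the operation throws
        }
      });
    }
    return latch;
  }
}
```

A caller would typically wait with latch.await(timeout, unit) rather than blocking indefinitely.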
      • shutdown

        public void shutdown()
                      throws java.io.IOException
        Shut down all connections and do not accept further incoming ops.
        Throws:
        java.io.IOException
      • toString

        public java.lang.String toString()
        Overrides:
        toString in class java.lang.Thread
      • connectionsStatus

        public java.lang.String connectionsStatus()
        Construct a String containing information about all nodes and their state.
        Returns:
        a stringified representation of the connection status.
      • opTimedOut

        public static void opTimedOut(Operation op)
        Increase the continuous-timeout counter of the operation's handling node.
        Parameters:
        op - the operation to grab the node from.
      • opSucceeded

        public static void opSucceeded(Operation op)
        Reset the continuous-timeout counter of the operation's handling node.
        Parameters:
        op - the operation to grab the node from.
      • setTimeout

        private static void setTimeout(Operation op,
                                       boolean isTimeout)
        Set the continuous timeout on an operation. Ignore operations which have no handling nodes set yet (which may happen before nodes are properly authenticated).
        Parameters:
        op - the operation to use.
        isTimeout - is timed out or not.
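The continuous-timeout bookkeeping behind opTimedOut, opSucceeded and setTimeout can be sketched as a per-node counter compared against timeoutExceptionThreshold. NodeTimeouts is hypothetical, and treating "exceeded" as strictly greater than the threshold is an assumption.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical per-node counter of consecutive (continuous) operation timeouts. */
final class NodeTimeouts {
  private final AtomicInteger continuous = new AtomicInteger();

  void timedOut() {
    continuous.incrementAndGet();
  }

  void succeeded() {
    continuous.set(0); // any success breaks the streak
  }

  int count() {
    return continuous.get();
  }

  /** True once the streak is above the threshold, signalling a likely dead connection. */
  boolean exceeds(int threshold) {
    return continuous.get() > threshold;
  }

  /** Mirrors the described setTimeout: operations without a handling node are ignored. */
  static void setTimeout(NodeTimeouts handlingNode, boolean isTimeout) {
    if (handlingNode == null) {
      return; // e.g. the operation is not yet assigned to a node
    }
    if (isTimeout) {
      handlingNode.timedOut();
    } else {
      handlingNode.succeeded();
    }
  }
}
```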
      • checkState

        protected void checkState()
        Check to see if this connection is shutting down.
        Throws:
        java.lang.IllegalStateException - when shutting down.
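checkState pairs naturally with the volatile shutDown flag from the field list. A minimal sketch of that guard, with an invented class name:

```java
/** Sketch of a shutdown guard (hypothetical, not spymemcached's code). */
final class ShutdownGuard {
  // volatile: a shutdown requested on one thread is seen by the IO thread and by callers
  private volatile boolean shutDown;

  void shutdown() {
    shutDown = true;
  }

  boolean isShutDown() {
    return shutDown;
  }

  /** Reject new work once shutdown has begun. */
  void checkState() {
    if (shutDown) {
      throw new IllegalStateException("Shutting down");
    }
  }
}
```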
      • run

        public void run()
        Handle IO as long as the application is running.
        Specified by:
        run in interface java.lang.Runnable
        Overrides:
        run in class java.lang.Thread
      • logRunException

        private void logRunException(java.lang.Exception e)
        Log an exception at different levels depending on the state. Exceptions are logged at debug level when they happen during shutdown, and at warning level during normal operation.
        Parameters:
        e - the exception to log.
      • isShutDown

        public boolean isShutDown()
        Returns whether the connection is shut down or not.
        Returns:
        true if the connection is shut down, false otherwise.
      • retryOperation

        public void retryOperation​(Operation op)
        Add an operation to the retry queue. If the retry queue is bounded and its size has reached that bound, the operation is cancelled rather than added to the retry queue.
        Parameters:
        op - the operation to retry.
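The bounded retry behaviour can be sketched as a queue that cancels instead of growing past its limit. Encoding "unbounded" as a non-positive size is an assumption (this page only says the default does not bound the queue), and the cancel callback stands in for cancelling the Operation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Sketch of an optionally bounded retry queue (hypothetical). A non-positive
 * bound means "unbounded" here; that encoding is an assumption.
 */
final class RetryQueue<T> {
  private final int maxSize;
  private final Consumer<T> cancel;
  private final List<T> ops = new ArrayList<>();

  RetryQueue(int maxSize, Consumer<T> cancel) {
    this.maxSize = maxSize;
    this.cancel = cancel;
  }

  /** Queues op for retry, or cancels it when a bounded queue is full. */
  boolean retry(T op) {
    synchronized (this) {
      if (maxSize <= 0 || ops.size() < maxSize) {
        ops.add(op);
        return true;
      }
    }
    cancel.accept(op); // run the callback outside the lock
    return false;
  }

  /** Removes and returns everything queued so far, e.g. for the IO thread to re-add. */
  synchronized List<T> drain() {
    List<T> out = new ArrayList<>(ops);
    ops.clear();
    return out;
  }
}
```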