Package net.spy.memcached
Class MemcachedConnection
java.lang.Object
java.lang.Thread
net.spy.memcached.compat.SpyThread
net.spy.memcached.MemcachedConnection
- All Implemented Interfaces:
Runnable
Main class for handling connections to a memcached cluster.
-
Nested Class Summary
Nested classes/interfaces inherited from class java.lang.Thread
Thread.Builder, Thread.State, Thread.UncaughtExceptionHandler
-
Field Summary
Fields
Modifier and Type | Field | Description
protected final ConcurrentLinkedQueue<MemcachedNode> | addedQueue | AddedQueue is used to track the QueueAttachments for which operations have recently been queued.
private final int | bufSize | The buffer size that will be used when reading from the server.
private final ConnectionFactory | connectionFactory | The connection factory to create MemcachedNodes from.
private final Collection<ConnectionObserver> | connObservers | Holds all connection observers that get notified on connection status changes.
private static final int | DEFAULT_RETRY_QUEUE_SIZE | By default, do not bound the retry queue.
private static final int | DEFAULT_WAKEUP_DELAY | The default wakeup delay if not overridden by a system property.
private static final int | DOUBLE_CHECK_EMPTY | The number of empty selects we'll allow before assuming we may have missed one and should check the current selectors.
private int | emptySelects | Contains the current number of empty select() calls, which could indicate bugs.
private static final int | EXCESSIVE_EMPTY | The number of empty selects we'll allow before blowing up.
protected final FailureMode | failureMode | The configured FailureMode.
private final ExecutorService | listenerExecutorService | The ExecutorService to use for callbacks.
protected final NodeLocator | locator | The NodeLocator to use for this connection.
private static final int | MAX_CLONE_COUNT | If an operation gets cloned more than this ceiling, cancel it for safety reasons.
private final long | maxDelay | Maximum amount of time to wait between reconnect attempts.
protected final MetricCollector | metrics | The MetricCollector to accumulate metrics (or dummy).
protected final MetricType | metricType | The current type of metrics to collect.
protected final ConcurrentLinkedQueue<MemcachedNode> | nodesToShutdown | Holds all nodes that are scheduled for shutdown.
private final OperationFactory | opFact | The OperationFactory to clone or create operations.
private static final String | OVERALL_AVG_BYTES_READ_METRIC |
private static final String | OVERALL_AVG_BYTES_WRITE_METRIC |
private static final String | OVERALL_AVG_TIME_ON_WIRE_METRIC |
private static final String | OVERALL_REQUEST_METRIC |
private static final String | OVERALL_RESPONSE_FAIL_METRIC |
private static final String | OVERALL_RESPONSE_METRIC |
private static final String | OVERALL_RESPONSE_RETRY_METRIC |
private static final String | OVERALL_RESPONSE_SUCC_METRIC |
private static final String | RECON_QUEUE_METRIC |
private final SortedMap<Long, MemcachedNode> | reconnectQueue | reconnectQueue contains the attachments that need to be reconnected.
 | retryOps | Holds operations that need to be retried.
private final int | retryQueueSize | Optionally bound the retry queue if set via system property.
protected boolean | running | True if not shutting down or shut down.
protected Selector | selector | Holds the current Selector to use.
private final boolean | shouldOptimize | If true, optimization will collapse multiple sequential get ops.
private static final String | SHUTD_QUEUE_METRIC |
protected boolean | shutDown | If the connection is already shut down or shutting down.
private final int | timeoutExceptionThreshold | The threshold for timeout exceptions.
private final boolean | verifyAliveOnConnect | If set to true, a proper check after finish connecting is done to see if the node is not responding but really alive.
private final int | wakeupDelay | The selector wakeup delay, defaults to 1000ms.
Fields inherited from class java.lang.Thread
MAX_PRIORITY, MIN_PRIORITY, NORM_PRIORITY
-
Constructor Summary
Constructors
Constructor | Description
MemcachedConnection(int bufSize, ConnectionFactory f, List<InetSocketAddress> a, Collection<ConnectionObserver> obs, FailureMode fm, OperationFactory opfactory) | Construct a MemcachedConnection.
-
Method Summary
Modifier and TypeMethodDescriptionboolean
Add a connection observer.protected void
addOperation
(String key, Operation o) Add an operation to a connection identified by the given key.protected void
addOperation
(MemcachedNode node, Operation o) Enqueue an operation on the given node.void
Enqueue the given list of operations on each handling node.private void
Attempt to reconnectMemcachedNode
s in the reconnect queue.(package private) boolean
Makes sure that the given node belongs to the current cluster.Broadcast an operation to all nodes.broadcastOperation
(BroadcastOpFactory of, Collection<MemcachedNode> nodes) Broadcast an operation to a collection of nodes.private void
Cancel the given collection of operations.private void
Check if one or more nodes exceeded the timeout Threshold.protected void
Check to see if this connection is shutting down.private void
connected
(MemcachedNode node) Indicate a successful connect to the given node.Construct a String containing information about all nodes and their state.protected List
<MemcachedNode> Create connections for the given list of addresses.(package private) static String
dbgBuffer
(ByteBuffer b, int size) Convert theByteBuffer
into a string for easier debugging.void
enqueueOperation
(String key, Operation o) Enqueue the givenOperation
with the used key.private void
finishConnect
(SelectionKey sk, MemcachedNode node) Finish the connect phase and potentially verify its liveness.Returns theNodeLocator
in use for this connection.private void
Helper method forhandleIO()
to handle empty select calls.private void
Handle any requests that have been made against the client.void
handleIO()
Handle all IO that flows through the connection.private void
handleIO
(SelectionKey sk) Handle IO for a specific selector.private void
Helper method forhandleIO()
to encapsulate everything that needs to be checked on a regular basis that has nothing to do directly with reading and writing data.private void
handleReads
(MemcachedNode node) Handle pending reads for the given node.private void
handleReadsAndWrites
(SelectionKey sk, MemcachedNode node) A helper method forhandleIO(java.nio.channels.SelectionKey)
to handle reads and writes if appropriate.private Operation
handleReadsWhenChannelEndOfStream
(Operation currentOp, MemcachedNode node, ByteBuffer rbuf) Deal with an operation where the channel reached the end of a stream.protected void
handleRetryInformation
(byte[] retryMessage) Optionally handle retry (NOT_MY_VBUCKET) responses.private void
Check if nodes need to be shut down and do so if needed.protected void
Helper method which gets called if the selector is woken up because of the timeout setting, if it has been interrupted, or if this happens during regular write operation phases.private void
handleWrites
(MemcachedNode node) Handle pending writes for the given node.void
insertOperation
(MemcachedNode node, Operation o) Insert an operation on the given node to the beginning of the queue.boolean
Returns whether the connection is shut down or not.private void
Log an exception to different levels depending on the state.private void
lostConnection
(MemcachedNode node) Indicate a lost connection to the given node.static void
opSucceeded
(Operation op) Reset the timeout counter for the given handling node.static void
opTimedOut
(Operation op) Increase the timeout counter for the given handling node.private void
Make sure channel connections are not leaked and are properly closed under faulty reconnect circumstances.protected void
queueReconnect
(MemcachedNode node) Enqueue the givenMemcachedNode
for reconnect.private void
readBufferAndLogMetrics
(Operation currentOp, ByteBuffer rbuf, MemcachedNode node) Read from the buffer and add metrics information.void
Redistribute the given operation to (potentially) other nodes.void
Redistribute the given list of operations to (potentially) other nodes.protected void
Register Metrics for collection.boolean
Remove a connection observer.void
Add an operation to the retry queue.void
run()
Handle IO as long as the application is running.private boolean
Make sure that the current selectors make sense.private static void
setTimeout
(Operation op, boolean isTimeout) Set the continuous timeout on an operation.void
shutdown()
Shut down all connections and do not accept further incoming ops.toString()
Methods inherited from class java.lang.Thread
activeCount, checkAccess, clone, countStackFrames, currentThread, dumpStack, enumerate, getAllStackTraces, getContextClassLoader, getDefaultUncaughtExceptionHandler, getId, getName, getPriority, getStackTrace, getState, getThreadGroup, getUncaughtExceptionHandler, holdsLock, interrupt, interrupted, isAlive, isDaemon, isInterrupted, isVirtual, join, join, join, join, ofPlatform, ofVirtual, onSpinWait, resume, setContextClassLoader, setDaemon, setDefaultUncaughtExceptionHandler, setName, setPriority, setUncaughtExceptionHandler, sleep, sleep, sleep, start, startVirtualThread, stop, suspend, threadId, yield
-
Field Details
-
DOUBLE_CHECK_EMPTY
private static final int DOUBLE_CHECK_EMPTY
The number of empty selects we'll allow before assuming we may have missed one and should check the current selectors. This generally indicates a bug, but we'll check it nonetheless.
-
EXCESSIVE_EMPTY
private static final int EXCESSIVE_EMPTY
The number of empty selects we'll allow before blowing up. It's too easy to write a bug that causes it to loop uncontrollably. This helps find those bugs and often works around them.
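The two thresholds can be read as a small state machine over consecutive empty select() calls. The following is only a sketch of that idea, not the library's code: the class `EmptySelectCounter`, the `Action` enum, and both threshold values are assumptions picked for illustration.

```java
/** Hypothetical stand-in for the empty-select bookkeeping described above. */
final class EmptySelectCounter {
    // Illustrative values only; the real constants are not shown in this page.
    static final int DOUBLE_CHECK_EMPTY = 4;
    static final int EXCESSIVE_EMPTY = 16;

    enum Action { CONTINUE, CHECK_SELECTORS, FAIL }

    private int emptySelects;

    /** Records one select() result and reports what the IO loop should do next. */
    Action onSelect(int selectedKeys) {
        if (selectedKeys > 0) {
            emptySelects = 0; // real work arrived, so the streak is over
            return Action.CONTINUE;
        }
        emptySelects++;
        if (emptySelects > EXCESSIVE_EMPTY) {
            return Action.FAIL; // too many empty selects: give up loudly
        }
        if (emptySelects > DOUBLE_CHECK_EMPTY) {
            return Action.CHECK_SELECTORS; // verify the selectors still make sense
        }
        return Action.CONTINUE;
    }

    int emptySelects() {
        return emptySelects;
    }
}
```

A caller would feed the result of each select() into `onSelect` and react to the returned action; any non-empty select resets the streak.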
-
DEFAULT_WAKEUP_DELAY
private static final int DEFAULT_WAKEUP_DELAY
The default wakeup delay if not overridden by a system property.
-
DEFAULT_RETRY_QUEUE_SIZE
private static final int DEFAULT_RETRY_QUEUE_SIZE
By default, do not bound the retry queue.
-
MAX_CLONE_COUNT
private static final int MAX_CLONE_COUNT
If an operation gets cloned more than this ceiling, cancel it for safety reasons.
-
RECON_QUEUE_METRIC
-
SHUTD_QUEUE_METRIC
-
OVERALL_REQUEST_METRIC
-
OVERALL_AVG_BYTES_WRITE_METRIC
-
OVERALL_AVG_BYTES_READ_METRIC
-
OVERALL_AVG_TIME_ON_WIRE_METRIC
-
OVERALL_RESPONSE_METRIC
-
OVERALL_RESPONSE_RETRY_METRIC
-
OVERALL_RESPONSE_FAIL_METRIC
-
OVERALL_RESPONSE_SUCC_METRIC
-
shutDown
protected volatile boolean shutDown
If the connection is already shut down or shutting down.
-
shouldOptimize
private final boolean shouldOptimize
If true, optimization will collapse multiple sequential get ops.
-
selector
Holds the current Selector to use.
-
locator
The NodeLocator to use for this connection.
-
failureMode
The configured FailureMode.
-
maxDelay
private final long maxDelay
Maximum amount of time to wait between reconnect attempts.
-
emptySelects
private int emptySelects
Contains the current number of empty select() calls, which could indicate bugs.
-
bufSize
private final int bufSize
The buffer size that will be used when reading from the server.
-
connectionFactory
The connection factory to create MemcachedNodes from.
-
addedQueue
AddedQueue is used to track the QueueAttachments for which operations have recently been queued.
-
reconnectQueue
reconnectQueue contains the attachments that need to be reconnected. The key is the time at which they are eligible for reconnect. -
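Keying the queue by eligibility time means the nodes that are due can be taken from the head of a sorted map. The following sketch illustrates that pattern under stated assumptions: nodes are represented by plain strings, the class `ReconnectQueueSketch` is hypothetical, and the doubling delay capped at a maximum is an assumed backoff policy rather than something this page specifies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical model of a reconnect queue keyed by "eligible at" time. */
final class ReconnectQueueSketch {
    private final SortedMap<Long, String> reconnectQueue = new TreeMap<>();
    private final long maxDelayMillis;

    ReconnectQueueSketch(long maxDelayMillis) {
        this.maxDelayMillis = maxDelayMillis;
    }

    /** Schedules a node; the delay doubles per attempt (assumption) and is capped. */
    long schedule(String node, int attempt, long nowMillis) {
        int shift = Math.max(0, Math.min(attempt, 20));
        long delay = Math.min(maxDelayMillis, 1000L << shift);
        long when = nowMillis + delay;
        while (reconnectQueue.containsKey(when)) {
            when++; // keep keys unique so no node is overwritten
        }
        reconnectQueue.put(when, node);
        return when;
    }

    /** Removes and returns every node whose eligible time is at or before now. */
    List<String> pollEligible(long nowMillis) {
        SortedMap<Long, String> due = reconnectQueue.headMap(nowMillis + 1);
        List<String> nodes = new ArrayList<>(due.values());
        due.clear(); // headMap is a view, so this removes from the backing map
        return nodes;
    }

    int size() {
        return reconnectQueue.size();
    }
}
```

Because `headMap` returns a live view, clearing it drops exactly the entries that were just handed out and leaves later entries queued.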
running
protected volatile boolean running
True if not shutting down or shut down.
-
connObservers
Holds all connection observers that get notified on connection status changes.
-
opFact
The OperationFactory to clone or create operations.
-
timeoutExceptionThreshold
private final int timeoutExceptionThreshold
The threshold for timeout exceptions.
-
retryOps
Holds operations that need to be retried.
-
nodesToShutdown
Holds all nodes that are scheduled for shutdown.
-
verifyAliveOnConnect
private final boolean verifyAliveOnConnect
If set to true, a check is performed after the connection is established to verify that the node is actually alive and responding.
-
listenerExecutorService
The ExecutorService to use for callbacks.
-
metrics
The MetricCollector to accumulate metrics (or dummy).
-
metricType
The current type of metrics to collect.
-
wakeupDelay
private final int wakeupDelay
The selector wakeup delay, defaults to 1000ms.
-
retryQueueSize
private final int retryQueueSize
Optionally bound the retry queue if set via system property.
-
-
Constructor Details
-
MemcachedConnection
public MemcachedConnection(int bufSize, ConnectionFactory f, List<InetSocketAddress> a, Collection<ConnectionObserver> obs, FailureMode fm, OperationFactory opfactory) throws IOException
Construct a MemcachedConnection.
- Parameters:
bufSize
- the size of the buffer used for reading from the server.
f
- the factory that will provide an operation queue.
a
- the addresses of the servers to connect to.
obs
- the initial observers to add.
fm
- the failure mode to use.
opfactory
- the operation factory.
- Throws:
IOException
- if a connection attempt fails early
-
-
Method Details
-
registerMetrics
protected void registerMetrics()
Register Metrics for collection. Note that these Metrics may or may not take effect, depending on the MetricCollector implementation. This can be controlled from the DefaultConnectionFactory.
-
createConnections
protected List<MemcachedNode> createConnections(Collection<InetSocketAddress> addrs) throws IOException
Create connections for the given list of addresses.
- Parameters:
addrs
- the list of addresses to connect to.
- Returns:
- the list of MemcachedNodes created for the given addresses.
- Throws:
IOException
- if connecting was not successful.
-
selectorsMakeSense
private boolean selectorsMakeSense()Make sure that the current selectors make sense.- Returns:
- true if they do.
-
handleIO
Handle all IO that flows through the connection. This method is called in an endless loop, listens on NIO selectors and dispatches the underlying read/write calls if needed.- Throws:
IOException
-
handleWokenUpSelector
protected void handleWokenUpSelector()
Helper method which gets called if the selector is woken up because of the timeout setting, if it has been interrupted, or if this happens during regular write operation phases.
This method can be overridden by child implementations to handle custom behavior on a manually woken selector, like sending pings through the channels to make sure they are alive.
Note that there is no guarantee that this method is called at all, or at regular intervals, so all overriding implementations need to take that into account. An implementation also needs to take into account that it may be called very often under heavy workloads, so it should not perform extensive tasks in the same thread.
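One way to respect both caveats is to rate-limit the hook and hand the actual work to another thread. The sketch below shows that pattern with stand-in types so that it compiles on its own: `WakeupHookBase`, `PingOnWakeup`, the injected clock, and the minimum interval are all hypothetical and not part of the library.

```java
import java.util.concurrent.Executor;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for a connection class with an overridable wake-up hook. */
class WakeupHookBase {
    /** Called by the IO loop when the selector wakes up; does nothing by default. */
    protected void handleWokenUpSelector() {
    }

    /** Simulates one wake-up of the selector. */
    final void simulateWakeup() {
        handleWokenUpSelector();
    }
}

/** Overrides the hook, throttles it, and delegates the ping to an executor. */
final class PingOnWakeup extends WakeupHookBase {
    private final Executor background;
    private final Runnable ping;
    private final long minIntervalMillis;
    private final LongSupplier clock;
    private boolean pingedOnce;
    private long lastPingMillis;

    PingOnWakeup(Executor background, Runnable ping, long minIntervalMillis, LongSupplier clock) {
        this.background = background;
        this.ping = ping;
        this.minIntervalMillis = minIntervalMillis;
        this.clock = clock;
    }

    @Override
    protected void handleWokenUpSelector() {
        long now = clock.getAsLong();
        if (pingedOnce && now - lastPingMillis < minIntervalMillis) {
            return; // woken again too soon after the previous ping
        }
        pingedOnce = true;
        lastPingMillis = now;
        background.execute(ping); // keep the IO thread free of slow work
    }
}
```

The throttle handles the "called very often" case, while the executor keeps expensive work off the thread that drives the selector.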
-
handleOperationalTasks
Helper method for handleIO() to encapsulate everything that needs to be checked on a regular basis that has nothing to do directly with reading and writing data.
- Throws:
IOException
- if an error happens during shutdown queue handling.
-
handleEmptySelects
private void handleEmptySelects()
Helper method for handleIO() to handle empty select calls.
-
handleShutdownQueue
Check if nodes need to be shut down and do so if needed.- Throws:
IOException
- if the channel could not be closed properly.
-
checkPotentiallyTimedOutConnection
private void checkPotentiallyTimedOutConnection()Check if one or more nodes exceeded the timeout Threshold. -
handleInputQueue
private void handleInputQueue()Handle any requests that have been made against the client. -
addObserver
Add a connection observer.- Returns:
- whether the observer was successfully added.
-
removeObserver
Remove a connection observer.- Returns:
- true if the observer existed and now doesn't.
-
connected
Indicate a successful connect to the given node.- Parameters:
node
- the node which was successfully connected.
-
lostConnection
Indicate a lost connection to the given node.- Parameters:
node
- the node where the connection was lost.
-
belongsToCluster
Makes sure that the given node belongs to the current cluster. Before trying to connect to a node, make sure it actually belongs to the currently connected cluster. -
handleIO
Handle IO for a specific selector. Any IOException will cause a reconnect. Note that this code makes sure that the corresponding node is not only able to connect, but also able to respond in a correct fashion (if verifyAliveOnConnect is set to true through a property). This is handled by issuing a dummy version/noop call and making sure it returns in a correct and timely fashion.- Parameters:
sk
- the selector to handle IO against.
-
handleReadsAndWrites
A helper method for handleIO(java.nio.channels.SelectionKey) to handle reads and writes if appropriate.
- Parameters:
sk
- the selection key to use.
node
- the node to read from and write to.
- Throws:
IOException
- if an error occurs during read/write.
-
finishConnect
Finish the connect phase and potentially verify its liveness.- Parameters:
sk
- the selection key for the node.node
- the actual node.- Throws:
IOException
- if something goes wrong during reading/writing.
-
handleWrites
Handle pending writes for the given node.- Parameters:
node
- the node to handle writes for.- Throws:
IOException
- can be raised during writing failures.
-
handleReads
Handle pending reads for the given node.- Parameters:
node
- the node to handle reads for.- Throws:
IOException
- can be raised during reading failures.
-
readBufferAndLogMetrics
private void readBufferAndLogMetrics(Operation currentOp, ByteBuffer rbuf, MemcachedNode node) throws IOException Read from the buffer and add metrics information.- Parameters:
currentOp
- the current operation to read.rbuf
- the read buffer to read from.node
- the node to read from.- Throws:
IOException
- if reading was not successful.
-
handleReadsWhenChannelEndOfStream
private Operation handleReadsWhenChannelEndOfStream(Operation currentOp, MemcachedNode node, ByteBuffer rbuf) throws IOException Deal with an operation where the channel reached the end of a stream.- Parameters:
currentOp
- the current operation to read.node
- the node for that operation.rbuf
- the read buffer.- Returns:
- the next operation on the node to read.
- Throws:
IOException
- if disconnect while reading.
-
dbgBuffer
Convert the ByteBuffer into a string for easier debugging.
- Parameters:
b
- the buffer to debug.
size
- the size of the buffer.
- Returns:
- the stringified ByteBuffer.
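A debugging dump of this kind usually keeps printable bytes readable and escapes everything else. The sketch below is one possible rendering and not the library's implementation: the class `DebugBufferSketch` and the `\xNN` escape format are assumptions.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper that renders the first bytes of a buffer as text. */
final class DebugBufferSketch {
    private DebugBufferSketch() {
    }

    /** Renders up to size bytes: printable ASCII as-is, everything else as \xNN. */
    static String dbgBuffer(ByteBuffer b, int size) {
        StringBuilder sb = new StringBuilder();
        int limit = Math.min(size, b.limit());
        for (int i = 0; i < limit; i++) {
            int v = b.get(i) & 0xff; // absolute get leaves the position untouched
            if (v >= 0x20 && v <= 0x7e) {
                sb.append((char) v);
            } else {
                sb.append(String.format("\\x%02x", v));
            }
        }
        return sb.toString();
    }
}
```

Using absolute reads means the dump can be produced without disturbing the buffer state that the IO code relies on.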
-
handleRetryInformation
protected void handleRetryInformation(byte[] retryMessage)
Optionally handle retry (NOT_MY_VBUCKET) responses. This method can be overridden in subclasses to handle the content of the retry message appropriately.
- Parameters:
retryMessage
- the body of the retry message.
-
queueReconnect
Enqueue the given MemcachedNode for reconnect.
- Parameters:
node
- the node to reconnect.
-
cancelOperations
Cancel the given collection of operations.- Parameters:
ops
- the list of operations to cancel.
-
redistributeOperations
Redistribute the given list of operations to (potentially) other nodes. Note that operations can only be redistributed if they have not already been cancelled or timed out, and if they have a definite target (a key).
- Parameters:
ops
- the operations to redistribute.
-
redistributeOperation
Redistribute the given operation to (potentially) other nodes. Note that operations can only be redistributed if they have not already been cancelled or timed out, and if they have a definite target (a key).
- Parameters:
op
- the operation to redistribute.
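The eligibility rules for redistribution can be summarised as a small decision function. This is a sketch under assumptions, not the library's logic: the class `RedistributeSketch`, the `Decision` enum, the illustrative `MAX_CLONE_COUNT` value, and the choice to cancel operations without a key are all hypothetical.

```java
/** Hypothetical decision helper for redistributing a single operation. */
final class RedistributeSketch {
    // Illustrative ceiling; the real constant's value is not shown in this page.
    static final int MAX_CLONE_COUNT = 100;

    enum Decision { SKIP, CANCEL, REDISTRIBUTE }

    private RedistributeSketch() {
    }

    static Decision decide(boolean cancelled, boolean timedOut, int cloneCount, String key) {
        if (cancelled || timedOut) {
            return Decision.SKIP; // nothing left to redistribute
        }
        if (cloneCount > MAX_CLONE_COUNT) {
            return Decision.CANCEL; // cloned too often, cancel for safety
        }
        if (key == null) {
            return Decision.CANCEL; // assumption: no key means no definite target
        }
        return Decision.REDISTRIBUTE;
    }
}
```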
-
attemptReconnects
private void attemptReconnects()
Attempt to reconnect MemcachedNodes in the reconnect queue. If the MemcachedNode does not belong to the cluster list anymore, the reconnect attempt is cancelled. If it does, the code tries to reconnect immediately and if this is not possible it waits until the connection information arrives. Note that if a socket error arises during reconnect, the node is immediately rescheduled for reconnect.
-
potentiallyCloseLeakingChannel
Make sure channel connections are not leaked and are properly closed under faulty reconnect circumstances.
- Parameters:
ch
- the channel to potentially close.
node
- the node to which the channel should be bound.
-
getLocator
Returns the NodeLocator in use for this connection.
- Returns:
- the current NodeLocator.
-
enqueueOperation
Enqueue the given Operation with the used key.
- Parameters:
key
- the key to use.
o
- the Operation to enqueue.
-
addOperation
Add an operation to a connection identified by the given key. If the MemcachedNode is active or the FailureMode is set to retry, the primary node will be used for that key. If the primary node is not available and the FailureMode cancel is used, the operation will be cancelled without further retry. For any other FailureMode mechanisms (Redistribute), another possible node is used (only if it is active as well). If no other active node could be identified, the original primary node is used and retried.
- Parameters:
key
- the key the operation is operating upon.
o
- the operation to add.
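The node selection rules described for the keyed addOperation can be sketched as a single function. The types below are stand-ins so the example compiles on its own: `FailureModeSketch`, its nested `Node` class and `FailureMode` enum are hypothetical, and the candidate list is assumed to start with the primary node followed by fallbacks.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical model of choosing a target node under a failure mode. */
final class FailureModeSketch {
    enum FailureMode { Retry, Cancel, Redistribute }

    /** Minimal stand-in for a node with an "active" flag. */
    static final class Node {
        final String name;
        final boolean active;

        Node(String name, boolean active) {
            this.name = name;
            this.active = active;
        }
    }

    private FailureModeSketch() {
    }

    /**
     * Picks the node for an operation. candidates.get(0) is the primary node.
     * An empty result means the operation should be cancelled.
     */
    static Optional<Node> selectNode(List<Node> candidates, FailureMode mode) {
        Node primary = candidates.get(0);
        if (primary.active || mode == FailureMode.Retry) {
            return Optional.of(primary);
        }
        if (mode == FailureMode.Cancel) {
            return Optional.empty();
        }
        for (Node n : candidates.subList(1, candidates.size())) {
            if (n.active) {
                return Optional.of(n); // first active fallback wins
            }
        }
        return Optional.of(primary); // no active fallback: retry on the primary
    }
}
```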
-
insertOperation
Insert an operation on the given node to the beginning of the queue.
- Parameters:
node
- the node where to insert the Operation.
o
- the operation to insert.
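The difference between inserting and adding an operation is simply which end of the node's queue receives it. A minimal sketch, assuming a double-ended queue and using strings in place of operations (the class `NodeQueueSketch` is hypothetical):

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical per-node queue showing front insertion versus normal enqueueing. */
final class NodeQueueSketch {
    private final Deque<String> inputQueue = new ArrayDeque<>();

    /** Normal enqueue: the operation waits behind everything already queued. */
    void addOperation(String op) {
        inputQueue.addLast(op);
    }

    /** Insert: the operation jumps to the beginning of the queue. */
    void insertOperation(String op) {
        inputQueue.addFirst(op);
    }

    /** Returns the next operation to process, or null if the queue is empty. */
    String next() {
        return inputQueue.pollFirst();
    }
}
```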
-
addOperation
Enqueue an operation on the given node.
- Parameters:
node
- the node where to enqueue the Operation.
o
- the operation to add.
-
addOperations
Enqueue the given list of operations on each handling node.- Parameters:
ops
- the operations for each node.
-
broadcastOperation
Broadcast an operation to all nodes.
- Returns:
- a CountDownLatch that will be counted down when the operations are complete.
-
broadcastOperation
Broadcast an operation to a collection of nodes.
- Returns:
- a CountDownLatch that will be counted down when the operations are complete.
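The latch contract can be illustrated without the library: one count per node, decremented as each per-node operation completes. In the sketch below the class `BroadcastSketch` is hypothetical, each node's operation is modelled as a plain `Runnable`, and an `Executor` stands in for the IO machinery.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

/** Hypothetical model of broadcasting work and tracking completion with a latch. */
final class BroadcastSketch {
    private BroadcastSketch() {
    }

    /** Runs one operation per node and returns a latch that reaches zero when all are done. */
    static CountDownLatch broadcast(List<Runnable> perNodeOps, Executor executor) {
        CountDownLatch latch = new CountDownLatch(perNodeOps.size());
        for (Runnable op : perNodeOps) {
            executor.execute(() -> {
                try {
                    op.run();
                } finally {
                    latch.countDown(); // count down even if the operation throws
                }
            });
        }
        return latch;
    }
}
```

A caller would typically wait on the returned latch with a timeout, so that a node that never answers does not block forever.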
-
shutdown
Shut down all connections and do not accept further incoming ops.- Throws:
IOException
-
toString
-
connectionsStatus
Construct a String containing information about all nodes and their state.- Returns:
- a stringified representation of the connection status.
-
opTimedOut
Increase the timeout counter for the given handling node.- Parameters:
op
- the operation to grab the node from.
-
opSucceeded
Reset the timeout counter for the given handling node.- Parameters:
op
- the operation to grab the node from.
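Taken together, opTimedOut and opSucceeded maintain a per-node count of consecutive timeouts that can be compared against a threshold. The sketch below models only that counter: the class `TimeoutCounterSketch` and its `exceedsThreshold` method are hypothetical, and what happens once the threshold is exceeded is left to the caller.

```java
/** Hypothetical per-node counter of consecutive operation timeouts. */
final class TimeoutCounterSketch {
    private final int timeoutExceptionThreshold;
    private int continuousTimeouts;

    TimeoutCounterSketch(int timeoutExceptionThreshold) {
        this.timeoutExceptionThreshold = timeoutExceptionThreshold;
    }

    /** An operation on this node timed out: extend the streak. */
    void opTimedOut() {
        continuousTimeouts++;
    }

    /** An operation on this node succeeded: the streak is over. */
    void opSucceeded() {
        continuousTimeouts = 0;
    }

    /** True once the streak is longer than the configured threshold. */
    boolean exceedsThreshold() {
        return continuousTimeouts > timeoutExceptionThreshold;
    }
}
```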
-
setTimeout
Set the continuous timeout on an operation. Ignore operations which have no handling nodes set yet (which may happen before nodes are properly authenticated).
- Parameters:
op
- the operation to use.
isTimeout
- is timed out or not.
-
checkState
protected void checkState()Check to see if this connection is shutting down.- Throws:
IllegalStateException
- when shutting down.
-
run
public void run()Handle IO as long as the application is running. -
logRunException
Log an exception to different levels depending on the state. Exceptions get logged at debug level when happening during shutdown, but at warning level when operating normally.
- Parameters:
e
- the exception to log.
-
isShutDown
public boolean isShutDown()Returns whether the connection is shut down or not.- Returns:
- true if the connection is shut down, false otherwise.
-
retryOperation
Add an operation to the retry queue. If the retry queue size is bounded and the size of the queue is reaching that boundary, the operation is cancelled rather than added to the retry queue.
- Parameters:
op
- the operation to retry.
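The bounded retry behaviour amounts to a size check before enqueueing. A minimal sketch, assuming a negative bound means "unbounded" and using a stand-in `Op` class with a cancel flag (both the class `RetryQueueSketch` and that convention are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a retry queue with an optional size bound. */
final class RetryQueueSketch {
    /** Minimal stand-in for an operation that can be cancelled. */
    static final class Op {
        private boolean cancelled;

        void cancel() {
            cancelled = true;
        }

        boolean isCancelled() {
            return cancelled;
        }
    }

    private final List<Op> retryOps = new ArrayList<>();
    private final int retryQueueSize; // assumption: negative means unbounded

    RetryQueueSketch(int retryQueueSize) {
        this.retryQueueSize = retryQueueSize;
    }

    /** Queues the operation for retry, or cancels it if the bound is reached. */
    boolean retryOperation(Op op) {
        if (retryQueueSize >= 0 && retryOps.size() >= retryQueueSize) {
            op.cancel();
            return false;
        }
        retryOps.add(op);
        return true;
    }

    int size() {
        return retryOps.size();
    }
}
```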
-