Package org.jboss.netty.handler.queue
Class BufferedWriteHandler
- java.lang.Object
-
- org.jboss.netty.channel.SimpleChannelHandler
-
- org.jboss.netty.handler.queue.BufferedWriteHandler
-
- All Implemented Interfaces:
ChannelDownstreamHandler
,ChannelHandler
,ChannelUpstreamHandler
,LifeCycleAwareChannelHandler
public class BufferedWriteHandler extends SimpleChannelHandler implements LifeCycleAwareChannelHandler
Emulates buffered write operation. This handler stores all write requests into an unboundedQueue
and flushes them to the downstream whenflush()
method is called.Here is an example that demonstrates the usage:
BufferedWriteHandler bufferedWriter = new BufferedWriteHandler(); ChannelPipeline p = ...; p.addFirst("buffer", bufferedWriter); ... Channel ch = ...; // msg1, 2, and 3 are stored in the queue of bufferedWriter. ch.write(msg1); ch.write(msg2); ch.write(msg3); // and will be flushed on request. bufferedWriter.flush();
Auto-flush
The write request queue is automatically flushed when the associatedChannel
is disconnected or closed. However, it does not flush the queue otherwise. It means you have to callflush()
before the size of the queue increases too much. You can implement your own auto-flush strategy by extending this handler:public class AutoFlusher extends
BufferedWriteHandler
{ private final AtomicLong bufferSize = new AtomicLong(); @Override public void writeRequested(ChannelHandlerContext
ctx,MessageEvent
e) { super.writeRequested(ctx, e);ChannelBuffer
data = (ChannelBuffer
) e.getMessage(); int newBufferSize = bufferSize.addAndGet(data.readableBytes()); // Flush the queue if it gets larger than 8KiB. if (newBufferSize > 8192) { flush(); bufferSize.set(0); } } }Consolidate on flush
If there are two or more write requests in the queue and all their message type isChannelBuffer
, they can be merged into a single write request to save the number of system calls.BEFORE consolidation: AFTER consolidation: +-------+-------+-------+ +-------------+ | Req C | Req B | Req A |------\\| Request ABC | | "789" | "456" | "123" |------//| "123456789" | +-------+-------+-------+ +-------------+
This feature is disabled by default. You can override the default when you create this handler or callflush(boolean)
. If you specifiedtrue
when you call the constructor, callingflush()
will always consolidate the queue. Otherwise, you have to callflush(boolean)
withtrue
to enable this feature for each flush.The disadvantage of consolidation is that the
ChannelFuture
and itsChannelFutureListener
s associated with the original write requests might be notified later than when they are actually written out. They will always be notified when the consolidated write request is fully written.The following example implements the consolidation strategy that reduces the number of write requests based on the writability of a channel:
public class ConsolidatingAutoFlusher extends
BufferedWriteHandler
{ public ConsolidatingAutoFlusher() { // Enable consolidation by default. super(true); } @Override public void channelOpen(ChannelHandlerContext
ctx,ChannelStateEvent
e) throws Exception {ChannelConfig
cfg = e.getChannel().getConfig(); if (cfg instanceofNioSocketChannelConfig
) { // Lower the watermark to increase the chance of consolidation. cfg.setWriteBufferLowWaterMark(0); } super.channelOpen(e); } @Override public void writeRequested(ChannelHandlerContext
ctx,MessageEvent
e) throws Exception { super.writeRequested(ctx, et); if (e.getChannel().isWritable()) { flush(); } } @Override public void channelInterestChanged(ChannelHandlerContext
ctx,ChannelStateEvent
e) throws Exception { if (e.getChannel().isWritable()) { flush(); } } }Prioritized Writes
You can implement prioritized writes by specifying an unbounded priority queue in the constructor of this handler. It will be required to design the proper strategy to determine how oftenflush()
should be called. For example, you could callflush()
periodically, usingHashedWheelTimer
every second.
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface org.jboss.netty.channel.ChannelHandler
ChannelHandler.Sharable
-
-
Field Summary
Fields Modifier and Type Field Description private boolean
consolidateOnFlush
private ChannelHandlerContext
ctx
private java.util.concurrent.atomic.AtomicBoolean
flush
private java.util.Queue<MessageEvent>
queue
-
Constructor Summary
Constructors Constructor Description BufferedWriteHandler()
Creates a new instance with the default unboundedBlockingQueue
implementation and without buffer consolidation.BufferedWriteHandler(boolean consolidateOnFlush)
Creates a new instance withConcurrentLinkedQueue
BufferedWriteHandler(java.util.Queue<MessageEvent> queue)
Creates a new instance with the specified thread-safe unboundedQueue
and without buffer consolidation.BufferedWriteHandler(java.util.Queue<MessageEvent> queue, boolean consolidateOnFlush)
Creates a new instance with the specified thread-safe unboundedQueue
.
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description void
afterAdd(ChannelHandlerContext ctx)
void
afterRemove(ChannelHandlerContext ctx)
Fail all buffered writes that are left.void
beforeAdd(ChannelHandlerContext ctx)
void
beforeRemove(ChannelHandlerContext ctx)
void
channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
Fail all buffered writes that are left.void
closeRequested(ChannelHandlerContext ctx, ChannelStateEvent e)
Invoked whenChannel.close()
was called.private java.util.List<MessageEvent>
consolidatedWrite(java.util.List<MessageEvent> pendingWrites)
void
disconnectRequested(ChannelHandlerContext ctx, ChannelStateEvent e)
Invoked whenChannel.disconnect()
was called.void
flush()
Sends the queued write requests to the downstream.void
flush(boolean consolidateOnFlush)
Sends the queued write requests to the downstream.protected java.util.Queue<MessageEvent>
getQueue()
Returns the queue which stores the write requests.boolean
isConsolidateOnFlush()
void
writeRequested(ChannelHandlerContext ctx, MessageEvent e)
Stores all write requests to the queue so that they are actually written onflush()
.-
Methods inherited from class org.jboss.netty.channel.SimpleChannelHandler
bindRequested, channelBound, channelConnected, channelDisconnected, channelInterestChanged, channelOpen, channelUnbound, childChannelClosed, childChannelOpen, connectRequested, exceptionCaught, handleDownstream, handleUpstream, messageReceived, setInterestOpsRequested, unbindRequested, writeComplete
-
-
-
-
Field Detail
-
queue
private final java.util.Queue<MessageEvent> queue
-
consolidateOnFlush
private final boolean consolidateOnFlush
-
ctx
private volatile ChannelHandlerContext ctx
-
flush
private final java.util.concurrent.atomic.AtomicBoolean flush
-
-
Constructor Detail
-
BufferedWriteHandler
public BufferedWriteHandler()
Creates a new instance with the default unboundedBlockingQueue
implementation and without buffer consolidation.
-
BufferedWriteHandler
public BufferedWriteHandler(java.util.Queue<MessageEvent> queue)
Creates a new instance with the specified thread-safe unboundedQueue
and without buffer consolidation. Please note that specifying a boundedQueue
or a thread-unsafeQueue
will result in an unspecified behavior.
-
BufferedWriteHandler
public BufferedWriteHandler(boolean consolidateOnFlush)
Creates a new instance withConcurrentLinkedQueue
- Parameters:
consolidateOnFlush
-true
if and only if the buffered write requests are merged into a single write request onflush()
-
BufferedWriteHandler
public BufferedWriteHandler(java.util.Queue<MessageEvent> queue, boolean consolidateOnFlush)
Creates a new instance with the specified thread-safe unboundedQueue
. Please note that specifying a boundedQueue
or a thread-unsafeQueue
will result in an unspecified behavior.- Parameters:
consolidateOnFlush
-true
if and only if the buffered write requests are merged into a single write request onflush()
-
-
Method Detail
-
isConsolidateOnFlush
public boolean isConsolidateOnFlush()
-
getQueue
protected java.util.Queue<MessageEvent> getQueue()
Returns the queue which stores the write requests. The default implementation returns the queue which was specified in the constructor.
-
flush
public void flush()
Sends the queued write requests to the downstream.
-
flush
public void flush(boolean consolidateOnFlush)
Sends the queued write requests to the downstream.- Parameters:
consolidateOnFlush
-true
if and only if the buffered write requests are merged into a single write request
-
consolidatedWrite
private java.util.List<MessageEvent> consolidatedWrite(java.util.List<MessageEvent> pendingWrites)
-
writeRequested
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws java.lang.Exception
Stores all write requests to the queue so that they are actually written onflush()
.- Overrides:
writeRequested
in classSimpleChannelHandler
- Throws:
java.lang.Exception
-
disconnectRequested
public void disconnectRequested(ChannelHandlerContext ctx, ChannelStateEvent e) throws java.lang.Exception
Description copied from class:SimpleChannelHandler
Invoked whenChannel.disconnect()
was called.- Overrides:
disconnectRequested
in classSimpleChannelHandler
- Throws:
java.lang.Exception
-
closeRequested
public void closeRequested(ChannelHandlerContext ctx, ChannelStateEvent e) throws java.lang.Exception
Description copied from class:SimpleChannelHandler
Invoked whenChannel.close()
was called.- Overrides:
closeRequested
in classSimpleChannelHandler
- Throws:
java.lang.Exception
-
channelClosed
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws java.lang.Exception
Fail all buffered writes that are left. See Overrides:channelClosed
in classSimpleChannelHandler
- Throws:
java.lang.Exception
-
beforeAdd
public void beforeAdd(ChannelHandlerContext ctx) throws java.lang.Exception
- Specified by:
beforeAdd
in interfaceLifeCycleAwareChannelHandler
- Throws:
java.lang.Exception
-
afterAdd
public void afterAdd(ChannelHandlerContext ctx) throws java.lang.Exception
- Specified by:
afterAdd
in interfaceLifeCycleAwareChannelHandler
- Throws:
java.lang.Exception
-
beforeRemove
public void beforeRemove(ChannelHandlerContext ctx) throws java.lang.Exception
- Specified by:
beforeRemove
in interfaceLifeCycleAwareChannelHandler
- Throws:
java.lang.Exception
-
afterRemove
public void afterRemove(ChannelHandlerContext ctx) throws java.lang.Exception
-
-