Class ReplayingDecoder<T extends java.lang.Enum<T>>
- java.lang.Object
-
- org.jboss.netty.channel.SimpleChannelUpstreamHandler
-
- org.jboss.netty.handler.codec.frame.FrameDecoder
-
- org.jboss.netty.handler.codec.replay.ReplayingDecoder<T>
-
- Type Parameters:
T
- the state type; useVoidEnum
if state management is unused
- All Implemented Interfaces:
ChannelHandler
,ChannelUpstreamHandler
,LifeCycleAwareChannelHandler
- Direct Known Subclasses:
CompatibleMarshallingDecoder
,HttpMessageDecoder
,SocksAuthRequestDecoder
,SocksAuthResponseDecoder
,SocksCmdRequestDecoder
,SocksCmdResponseDecoder
,SocksInitRequestDecoder
,SocksInitResponseDecoder
,WebSocket00FrameDecoder
,WebSocket08FrameDecoder
public abstract class ReplayingDecoder<T extends java.lang.Enum<T>> extends FrameDecoder
A specialized variation ofFrameDecoder
which enables implementation of a non-blocking decoder in the blocking I/O paradigm.The biggest difference between
ReplayingDecoder
andFrameDecoder
is thatReplayingDecoder
allows you to implement thedecode()
anddecodeLast()
methods just like all required bytes were received already, rather than checking the availability of the required bytes. For example, the followingFrameDecoder
implementation:public class IntegerHeaderFrameDecoder extends
is simplified like the following withFrameDecoder
{@Override
protected Object decode(ChannelHandlerContext
ctx,Channel
channel,ChannelBuffer
buf) throws Exception { if (buf.readableBytes() < 4) { return null; } buf.markReaderIndex(); int length = buf.readInt(); if (buf.readableBytes() < length) { buf.resetReaderIndex(); return null; } return buf.readBytes(length); } }ReplayingDecoder
:public class IntegerHeaderFrameDecoder extends
ReplayingDecoder
<VoidEnum
> { protected Object decode(ChannelHandlerContext
ctx,Channel
channel,ChannelBuffer
buf,VoidEnum
state) throws Exception { return buf.readBytes(buf.readInt()); } }How does this work?
ReplayingDecoder
passes a specializedChannelBuffer
implementation which throws anError
of certain type when there's not enough data in the buffer. In theIntegerHeaderFrameDecoder
above, you just assumed that there will be 4 or more bytes in the buffer when you callbuf.readInt()
. If there's really 4 bytes in the buffer, it will return the integer header as you expected. Otherwise, theError
will be raised and the control will be returned toReplayingDecoder
. IfReplayingDecoder
catches theError
, then it will rewind thereaderIndex
of the buffer back to the 'initial' position (i.e. the beginning of the buffer) and call thedecode(..)
method again when more data is received into the buffer.Please note that
ReplayingDecoder
always throws the same cachedError
instance to avoid the overhead of creating a newError
and filling its stack trace for every throw.Limitations
At the cost of the simplicity,
ReplayingDecoder
enforces you a few limitations:- Some buffer operations are prohibited.
- Performance can be worse if the network is slow and the message format is complicated unlike the example above. In this case, your decoder might have to decode the same part of the message over and over again.
- You must keep in mind that
decode(..)
method can be called many times to decode a single message. For example, the following code will not work:public class MyDecoder extends
The correct implementation looks like the following, and you can also utilize the 'checkpoint' feature which is explained in detail in the next section.ReplayingDecoder
<VoidEnum
> { private final Queue<Integer> values = new LinkedList<Integer>();@Override
public Object decode(..,ChannelBuffer
buffer, ..) throws Exception { // A message contains 2 integers. values.offer(buffer.readInt()); values.offer(buffer.readInt()); // This assertion will fail intermittently since values.offer() // can be called more than two times! assert values.size() == 2; return values.poll() + values.poll(); } }public class MyDecoder extends
ReplayingDecoder
<VoidEnum
> { private final Queue<Integer> values = new LinkedList<Integer>();@Override
public Object decode(..,ChannelBuffer
buffer, ..) throws Exception { // Revert the state of the variable that might have been changed // since the last partial decode. values.clear(); // A message contains 2 integers. values.offer(buffer.readInt()); values.offer(buffer.readInt()); // Now we know this assertion will never fail. assert values.size() == 2; return values.poll() + values.poll(); } }
Improving the performance
Fortunately, the performance of a complex decoder implementation can be improved significantly with the
checkpoint()
method. Thecheckpoint()
method updates the 'initial' position of the buffer so thatReplayingDecoder
rewinds thereaderIndex
of the buffer to the last position where you called thecheckpoint()
method.Calling
checkpoint(T)
with anEnum
Although you can just use
checkpoint()
method and manage the state of the decoder by yourself, the easiest way to manage the state of the decoder is to create anEnum
type which represents the current state of the decoder and to callcheckpoint(T)
method whenever the state changes. You can have as many states as you want depending on the complexity of the message you want to decode:public enum MyDecoderState { READ_LENGTH, READ_CONTENT; } public class IntegerHeaderFrameDecoder extends
ReplayingDecoder
<MyDecoderState> { private int length; public IntegerHeaderFrameDecoder() { // Set the initial state. super(MyDecoderState.READ_LENGTH); }@Override
protected Object decode(ChannelHandlerContext
ctx,Channel
channel,ChannelBuffer
buf, MyDecoderState state) throws Exception { switch (state) { case READ_LENGTH: length = buf.readInt(); checkpoint(MyDecoderState.READ_CONTENT); case READ_CONTENT: ChannelBuffer frame = buf.readBytes(length); checkpoint(MyDecoderState.READ_LENGTH); return frame; default: throw new Error("Shouldn't reach here."); } } }Calling
checkpoint()
with no parameterAn alternative way to manage the decoder state is to manage it by yourself.
public class IntegerHeaderFrameDecoder extends
ReplayingDecoder
<VoidEnum
> { private boolean readLength; private int length;@Override
protected Object decode(ChannelHandlerContext
ctx,Channel
channel,ChannelBuffer
buf,VoidEnum
state) throws Exception { if (!readLength) { length = buf.readInt(); readLength = true; checkpoint(); } if (readLength) { ChannelBuffer frame = buf.readBytes(length); readLength = false; checkpoint(); return frame; } } }Replacing a decoder with another decoder in a pipeline
If you are going to write a protocol multiplexer, you will probably want to replace a
ReplayingDecoder
(protocol detector) with anotherReplayingDecoder
orFrameDecoder
(actual protocol decoder). It is not possible to achieve this simply by callingChannelPipeline.replace(ChannelHandler, String, ChannelHandler)
, but some additional steps are required:public class FirstDecoder extends
ReplayingDecoder
<VoidEnum
> { public FirstDecoder() { super(true); // Enable unfold }@Override
protected Object decode(ChannelHandlerContext
ctx,Channel
ch,ChannelBuffer
buf,VoidEnum
state) { ... // Decode the first message Object firstMessage = ...; // Add the second decoder ctx.getPipeline().addLast("second", new SecondDecoder()); // Remove the first decoder (me) ctx.getPipeline().remove(this); if (buf.readable()) { // Hand off the remaining data to the second decoder return new Object[] { firstMessage, buf.readBytes(super.actualReadableBytes()) }; } else { // Nothing to hand off return firstMessage; } }
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface org.jboss.netty.channel.ChannelHandler
ChannelHandler.Sharable
-
-
Field Summary
Fields Modifier and Type Field Description private int
checkpoint
private boolean
needsCleanup
private ReplayingDecoderBuffer
replayable
private T
state
-
Fields inherited from class org.jboss.netty.handler.codec.frame.FrameDecoder
cumulation, DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS
-
-
Constructor Summary
Constructors Modifier Constructor Description protected
ReplayingDecoder()
Creates a new instance with no initial state (i.e:null
).protected
ReplayingDecoder(boolean unfold)
protected
ReplayingDecoder(T initialState)
Creates a new instance with the specified initial state.protected
ReplayingDecoder(T initialState, boolean unfold)
-
Method Summary
All Methods Instance Methods Abstract Methods Concrete Methods Modifier and Type Method Description private void
callDecode(ChannelHandlerContext context, Channel channel, ChannelBuffer input, ChannelBuffer replayableInput, java.net.SocketAddress remoteAddress)
protected void
checkpoint()
Stores the internal cumulative buffer's reader position.protected void
checkpoint(T state)
Stores the internal cumulative buffer's reader position and updates the current decoder state.protected void
cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
protected java.lang.Object
decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer)
protected abstract java.lang.Object
decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, T state)
Decodes the received packets so far into a frame.protected java.lang.Object
decodeLast(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer)
Decodes the received data so far into a frame when the channel is disconnected.protected java.lang.Object
decodeLast(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, T state)
Decodes the received data so far into a frame when the channel is disconnected.protected T
getState()
Returns the current state of this decoder.protected ChannelBuffer
internalBuffer()
Returns the internal cumulative buffer of this decoder.void
messageReceived(ChannelHandlerContext ctx, MessageEvent e)
Invoked when a message object (e.g:ChannelBuffer
) was received from a remote peer.protected T
setState(T newState)
Sets the current state of this decoder.-
Methods inherited from class org.jboss.netty.handler.codec.frame.FrameDecoder
actualReadableBytes, afterAdd, afterRemove, appendToCumulation, beforeAdd, beforeRemove, channelClosed, channelDisconnected, exceptionCaught, extractFrame, getMaxCumulationBufferCapacity, getMaxCumulationBufferComponents, isUnfold, newCumulationBuffer, replace, setMaxCumulationBufferCapacity, setMaxCumulationBufferComponents, setUnfold, unfoldAndFireMessageReceived, updateCumulation
-
Methods inherited from class org.jboss.netty.channel.SimpleChannelUpstreamHandler
channelBound, channelConnected, channelInterestChanged, channelOpen, channelUnbound, childChannelClosed, childChannelOpen, handleUpstream, writeComplete
-
-
-
-
Field Detail
-
replayable
private final ReplayingDecoderBuffer replayable
-
checkpoint
private int checkpoint
-
needsCleanup
private boolean needsCleanup
-
-
Constructor Detail
-
ReplayingDecoder
protected ReplayingDecoder()
Creates a new instance with no initial state (i.e:null
).
-
ReplayingDecoder
protected ReplayingDecoder(boolean unfold)
-
ReplayingDecoder
protected ReplayingDecoder(T initialState)
Creates a new instance with the specified initial state.
-
ReplayingDecoder
protected ReplayingDecoder(T initialState, boolean unfold)
-
-
Method Detail
-
internalBuffer
protected ChannelBuffer internalBuffer()
Description copied from class:FrameDecoder
Returns the internal cumulative buffer of this decoder. You usually do not need to access the internal buffer directly to write a decoder. Use it only when you must use it at your own risk.- Overrides:
internalBuffer
in classFrameDecoder
-
checkpoint
protected void checkpoint()
Stores the internal cumulative buffer's reader position.
-
checkpoint
protected void checkpoint(T state)
Stores the internal cumulative buffer's reader position and updates the current decoder state.
-
getState
protected T getState()
Returns the current state of this decoder.- Returns:
- the current state of this decoder
-
setState
protected T setState(T newState)
Sets the current state of this decoder.- Returns:
- the old state of this decoder
-
decode
protected abstract java.lang.Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, T state) throws java.lang.Exception
Decodes the received packets so far into a frame.- Parameters:
ctx
- the context of this handlerchannel
- the current channelbuffer
- the cumulative buffer of received packets so far. Note that the buffer might be empty, which means you should not make an assumption that the buffer contains at least one byte in your decoder implementation.state
- the current decoder state (null
if unused)- Returns:
- the decoded frame
- Throws:
java.lang.Exception
-
decodeLast
protected java.lang.Object decodeLast(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, T state) throws java.lang.Exception
Decodes the received data so far into a frame when the channel is disconnected.- Parameters:
ctx
- the context of this handlerchannel
- the current channelbuffer
- the cumulative buffer of received packets so far. Note that the buffer might be empty, which means you should not make an assumption that the buffer contains at least one byte in your decoder implementation.state
- the current decoder state (null
if unused)- Returns:
- the decoded frame
- Throws:
java.lang.Exception
-
decode
protected final java.lang.Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws java.lang.Exception
Callsdecode(ChannelHandlerContext, Channel, ChannelBuffer, Enum)
. This method should be never used byReplayingDecoder
itself. But to be safe we should handle it anyway- Specified by:
decode
in classFrameDecoder
- Parameters:
ctx
- the context of this handlerchannel
- the current channelbuffer
- the cumulative buffer of received packets so far. Note that the buffer might be empty, which means you should not make an assumption that the buffer contains at least one byte in your decoder implementation.- Returns:
- the decoded frame if a full frame was received and decoded.
null
if there's not enough data in the buffer to decode a frame. - Throws:
java.lang.Exception
-
decodeLast
protected final java.lang.Object decodeLast(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws java.lang.Exception
Description copied from class:FrameDecoder
Decodes the received data so far into a frame when the channel is disconnected.- Overrides:
decodeLast
in classFrameDecoder
- Parameters:
ctx
- the context of this handlerchannel
- the current channelbuffer
- the cumulative buffer of received packets so far. Note that the buffer might be empty, which means you should not make an assumption that the buffer contains at least one byte in your decoder implementation.- Returns:
- the decoded frame if a full frame was received and decoded.
null
if there's not enough data in the buffer to decode a frame. - Throws:
java.lang.Exception
-
messageReceived
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws java.lang.Exception
Description copied from class:SimpleChannelUpstreamHandler
Invoked when a message object (e.g:ChannelBuffer
) was received from a remote peer.- Overrides:
messageReceived
in classFrameDecoder
- Throws:
java.lang.Exception
-
callDecode
private void callDecode(ChannelHandlerContext context, Channel channel, ChannelBuffer input, ChannelBuffer replayableInput, java.net.SocketAddress remoteAddress) throws java.lang.Exception
- Throws:
java.lang.Exception
-
cleanup
protected void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) throws java.lang.Exception
Description copied from class:FrameDecoder
Gets called onFrameDecoder.channelDisconnected(ChannelHandlerContext, ChannelStateEvent)
andFrameDecoder.channelClosed(ChannelHandlerContext, ChannelStateEvent)
- Overrides:
cleanup
in classFrameDecoder
- Throws:
java.lang.Exception
-
-