Package org.playframework.netty
Class HandlerPublisher<T>
- java.lang.Object
  - io.netty.channel.ChannelHandlerAdapter
    - io.netty.channel.ChannelInboundHandlerAdapter
      - io.netty.channel.ChannelDuplexHandler
        - org.playframework.netty.HandlerPublisher<T>
- All Implemented Interfaces:
io.netty.channel.ChannelHandler, io.netty.channel.ChannelInboundHandler, io.netty.channel.ChannelOutboundHandler, org.reactivestreams.Publisher<T>
public class HandlerPublisher<T> extends io.netty.channel.ChannelDuplexHandler implements org.reactivestreams.Publisher<T>
Publisher for a Netty Handler. This publisher supports only one subscriber. All interactions with the subscriber are done from the handler's executor, so they provide the same happens-before semantics that Netty provides.
The handler publishes every message that matches the type specified by the class passed to the constructor. Any non-matching messages are forwarded to the next handler. The publisher signals completion if it receives a channel inactive event.
The publisher releases any messages that it drops (for example, messages that are still buffered when the subscriber cancels), but it does not release any other messages. It is up to the subscriber to release the messages it receives. If the subscriber cancels, the publisher sends a close event up the channel pipeline.
All errors short-circuit the buffer: the publisher drops the buffer and immediately calls the subscriber's onError method.
The publisher can be subscribed to or placed in a handler chain in any order.
-
Nested Class Summary
| Modifier and Type | Class | Description |
| --- | --- | --- |
| private class | HandlerPublisher.ChannelSubscription | |
| (package private) static class | HandlerPublisher.State | |
-
Field Summary
| Modifier and Type | Field | Description |
| --- | --- | --- |
| private java.util.Queue<java.lang.Object> | buffer | |
| private static java.lang.Object | COMPLETE | Used for buffering a completion signal. |
| private io.netty.channel.ChannelHandlerContext | ctx | |
| private io.netty.util.concurrent.EventExecutor | executor | |
| private java.util.concurrent.atomic.AtomicBoolean | hasSubscriber | Whether a subscriber has been provided. |
| private io.netty.util.internal.TypeParameterMatcher | matcher | |
| private java.lang.Throwable | noSubscriberError | |
| private long | outstandingDemand | |
| private HandlerPublisher.State | state | |
| private org.reactivestreams.Subscriber<? super T> | subscriber | |
-
Constructor Summary
| Constructor | Description |
| --- | --- |
| HandlerPublisher(io.netty.util.concurrent.EventExecutor executor, java.lang.Class<? extends T> subscriberMessageType) | Create a handler publisher. |
-
Method Summary
| Modifier and Type | Method | Description |
| --- | --- | --- |
| protected boolean | acceptInboundMessage(java.lang.Object msg) | Returns true if the given message should be handled. |
| private boolean | addDemand(long demand) | |
| protected void | cancelled() | Override to handle when a subscriber cancels the subscription. |
| void | channelActive(io.netty.channel.ChannelHandlerContext ctx) | |
| void | channelInactive(io.netty.channel.ChannelHandlerContext ctx) | |
| void | channelRead(io.netty.channel.ChannelHandlerContext ctx, java.lang.Object message) | |
| void | channelReadComplete(io.netty.channel.ChannelHandlerContext ctx) | |
| void | channelRegistered(io.netty.channel.ChannelHandlerContext ctx) | |
| private void | cleanup() | Release all elements from the buffer. |
| private void | complete() | |
| void | exceptionCaught(io.netty.channel.ChannelHandlerContext ctx, java.lang.Throwable cause) | |
| private void | flushBuffer() | |
| void | handlerAdded(io.netty.channel.ChannelHandlerContext ctx) | |
| void | handlerRemoved(io.netty.channel.ChannelHandlerContext ctx) | |
| private void | illegalDemand() | |
| private void | provideChannelContext(io.netty.channel.ChannelHandlerContext ctx) | |
| private void | provideSubscriber(org.reactivestreams.Subscriber<? super T> subscriber) | |
| private void | publishMessage(java.lang.Object message) | |
| private void | receivedCancel() | |
| private void | receivedDemand(long demand) | |
| protected void | requestDemand() | Override to intercept when demand is requested. |
| void | subscribe(org.reactivestreams.Subscriber<? super T> subscriber) | |
| private void | verifyRegisteredWithRightExecutor(io.netty.channel.ChannelHandlerContext ctx) | |
-
Methods inherited from class io.netty.channel.ChannelDuplexHandler
bind, close, connect, deregister, disconnect, flush, read, write
-
Field Detail
-
executor
private final io.netty.util.concurrent.EventExecutor executor
-
matcher
private final io.netty.util.internal.TypeParameterMatcher matcher
-
buffer
private final java.util.Queue<java.lang.Object> buffer
-
hasSubscriber
private final java.util.concurrent.atomic.AtomicBoolean hasSubscriber
Whether a subscriber has been provided. This is used to detect whether two subscribers are subscribing simultaneously.
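The single-subscriber guard described above can be sketched with an atomic compare-and-set. This is a minimal illustration, not the HandlerPublisher source: it uses the JDK's java.util.concurrent.Flow interfaces as a stand-in for org.reactivestreams, and the class names SingleSubscriberGuard and RecordingSubscriber, the current() accessor, and the error message are all hypothetical.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: only the first caller of subscribe() wins the flag.
class SingleSubscriberGuard<T> implements Flow.Publisher<T> {
    // Subscription that ignores all signals; enough for this illustration.
    private static final Flow.Subscription NO_OP = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    private final AtomicBoolean hasSubscriber = new AtomicBoolean();
    private volatile Flow.Subscriber<? super T> subscriber;

    @Override
    public void subscribe(Flow.Subscriber<? super T> s) {
        if (s == null) {
            throw new NullPointerException("subscriber");
        }
        // compareAndSet succeeds for exactly one thread, even under a race.
        if (hasSubscriber.compareAndSet(false, true)) {
            subscriber = s;
            s.onSubscribe(NO_OP);
        } else {
            // Reject later subscribers: signal onSubscribe first, then onError.
            s.onSubscribe(NO_OP);
            s.onError(new IllegalStateException("only one subscriber is supported"));
        }
    }

    Flow.Subscriber<? super T> current() {
        return subscriber;
    }
}

// Hypothetical subscriber that records which signals it received.
class RecordingSubscriber<T> implements Flow.Subscriber<T> {
    volatile boolean subscribed;
    volatile Throwable error;

    @Override public void onSubscribe(Flow.Subscription subscription) { subscribed = true; }
    @Override public void onNext(T item) { }
    @Override public void onError(Throwable t) { error = t; }
    @Override public void onComplete() { }
}
```

An atomic flag, rather than a plain null check on the subscriber field, is what lets two threads calling subscribe at the same moment be told apart: exactly one of them observes the transition from false to true.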
-
state
private HandlerPublisher.State state
-
subscriber
private volatile org.reactivestreams.Subscriber<? super T> subscriber
-
ctx
private io.netty.channel.ChannelHandlerContext ctx
-
outstandingDemand
private long outstandingDemand
-
noSubscriberError
private java.lang.Throwable noSubscriberError
-
COMPLETE
private static final java.lang.Object COMPLETE
Used for buffering a completion signal.
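One way to read "buffering a completion signal" is that a unique sentinel object is queued behind any pending messages, so completion is only signalled after everything ahead of it has been delivered. The following is a minimal sketch under that assumption, not the actual implementation: SentinelBuffer and its method names are hypothetical, and demand overflow handling is omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of a buffer that queues a completion marker in order.
class SentinelBuffer {
    // Unique marker compared by identity, so it can never equal a real message.
    private static final Object COMPLETE = new Object();

    private final Queue<Object> buffer = new ArrayDeque<>();
    private long outstandingDemand;

    final List<Object> delivered = new ArrayList<>();
    boolean completed;

    void onMessage(Object message) {
        buffer.add(message);
        flush();
    }

    void onComplete() {
        buffer.add(COMPLETE);
        flush();
    }

    void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("demand must be positive");
        }
        outstandingDemand += n;
        flush();
    }

    // Deliver buffered messages while there is demand; completion needs no demand.
    private void flush() {
        while (!buffer.isEmpty() && !completed) {
            if (buffer.peek() == COMPLETE) {
                buffer.remove();
                completed = true;
            } else if (outstandingDemand > 0) {
                outstandingDemand--;
                delivered.add(buffer.remove());
            } else {
                return;
            }
        }
    }
}
```

Queuing the marker in the same buffer as the messages keeps ordering trivial: there is no separate "completed" flag that could be observed before the last message is handed over.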
-
Constructor Detail
-
HandlerPublisher
public HandlerPublisher(io.netty.util.concurrent.EventExecutor executor, java.lang.Class<? extends T> subscriberMessageType)
Create a handler publisher. The supplied executor must be the same event loop that this handler is eventually registered with; if it is not, an exception will be thrown when the handler is registered.
- Parameters:
executor - The executor to execute asynchronous events from the subscriber on.
subscriberMessageType - The type of message this publisher accepts.
-
Method Detail
-
acceptInboundMessage
protected boolean acceptInboundMessage(java.lang.Object msg) throws java.lang.Exception
Returns true if the given message should be handled. If false, it will be passed to the next ChannelInboundHandler in the ChannelPipeline.
- Parameters:
msg - The message to check.
- Returns:
True if the message should be accepted.
- Throws:
java.lang.Exception
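The accept-or-forward decision can be illustrated without Netty. The sketch below is an assumption-laden stand-in: it uses Class.isInstance where the real class holds a TypeParameterMatcher, and the TypeFilter class with its published and forwarded lists is hypothetical, standing in for the subscriber and the next handler in the pipeline.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: route messages by runtime type.
class TypeFilter<T> {
    private final Class<? extends T> type;

    // Stand-ins for "delivered to the subscriber" and "passed to the next handler".
    final List<T> published = new ArrayList<>();
    final List<Object> forwarded = new ArrayList<>();

    TypeFilter(Class<? extends T> type) {
        this.type = type;
    }

    // True if the message is an instance of the configured type (false for null).
    boolean acceptInboundMessage(Object msg) {
        return type.isInstance(msg);
    }

    void channelRead(Object msg) {
        if (acceptInboundMessage(msg)) {
            published.add(type.cast(msg));
        } else {
            forwarded.add(msg);
        }
    }
}
```

Because acceptInboundMessage is protected in HandlerPublisher, a subclass could presumably narrow the check further, for example by inspecting the message's content as well as its type.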
-
cancelled
protected void cancelled()
Override to handle when a subscriber cancels the subscription. By default, this method will simply close the channel.
-
requestDemand
protected void requestDemand()
Override to intercept when demand is requested. By default, a channel read is invoked.
-
subscribe
public void subscribe(org.reactivestreams.Subscriber<? super T> subscriber)
- Specified by: subscribe in interface org.reactivestreams.Publisher<T>
-
provideSubscriber
private void provideSubscriber(org.reactivestreams.Subscriber<? super T> subscriber)
-
handlerAdded
public void handlerAdded(io.netty.channel.ChannelHandlerContext ctx) throws java.lang.Exception
- Specified by: handlerAdded in interface io.netty.channel.ChannelHandler
- Overrides: handlerAdded in class io.netty.channel.ChannelHandlerAdapter
- Throws: java.lang.Exception
-
channelRegistered
public void channelRegistered(io.netty.channel.ChannelHandlerContext ctx) throws java.lang.Exception
- Specified by: channelRegistered in interface io.netty.channel.ChannelInboundHandler
- Overrides: channelRegistered in class io.netty.channel.ChannelInboundHandlerAdapter
- Throws: java.lang.Exception
-
provideChannelContext
private void provideChannelContext(io.netty.channel.ChannelHandlerContext ctx)
-
verifyRegisteredWithRightExecutor
private void verifyRegisteredWithRightExecutor(io.netty.channel.ChannelHandlerContext ctx)
-
channelActive
public void channelActive(io.netty.channel.ChannelHandlerContext ctx) throws java.lang.Exception
- Specified by: channelActive in interface io.netty.channel.ChannelInboundHandler
- Overrides: channelActive in class io.netty.channel.ChannelInboundHandlerAdapter
- Throws: java.lang.Exception
-
receivedDemand
private void receivedDemand(long demand)
-
addDemand
private boolean addDemand(long demand)
-
illegalDemand
private void illegalDemand()
-
flushBuffer
private void flushBuffer()
-
receivedCancel
private void receivedCancel()
-
channelRead
public void channelRead(io.netty.channel.ChannelHandlerContext ctx, java.lang.Object message) throws java.lang.Exception
- Specified by: channelRead in interface io.netty.channel.ChannelInboundHandler
- Overrides: channelRead in class io.netty.channel.ChannelInboundHandlerAdapter
- Throws: java.lang.Exception
-
publishMessage
private void publishMessage(java.lang.Object message)
-
channelReadComplete
public void channelReadComplete(io.netty.channel.ChannelHandlerContext ctx) throws java.lang.Exception
- Specified by: channelReadComplete in interface io.netty.channel.ChannelInboundHandler
- Overrides: channelReadComplete in class io.netty.channel.ChannelInboundHandlerAdapter
- Throws: java.lang.Exception
-
channelInactive
public void channelInactive(io.netty.channel.ChannelHandlerContext ctx) throws java.lang.Exception
- Specified by: channelInactive in interface io.netty.channel.ChannelInboundHandler
- Overrides: channelInactive in class io.netty.channel.ChannelInboundHandlerAdapter
- Throws: java.lang.Exception
-
handlerRemoved
public void handlerRemoved(io.netty.channel.ChannelHandlerContext ctx) throws java.lang.Exception
- Specified by: handlerRemoved in interface io.netty.channel.ChannelHandler
- Overrides: handlerRemoved in class io.netty.channel.ChannelHandlerAdapter
- Throws: java.lang.Exception
-
complete
private void complete()
-
exceptionCaught
public void exceptionCaught(io.netty.channel.ChannelHandlerContext ctx, java.lang.Throwable cause) throws java.lang.Exception
- Specified by: exceptionCaught in interface io.netty.channel.ChannelHandler
- Specified by: exceptionCaught in interface io.netty.channel.ChannelInboundHandler
- Overrides: exceptionCaught in class io.netty.channel.ChannelInboundHandlerAdapter
- Throws: java.lang.Exception
-
cleanup
private void cleanup()
Release all elements from the buffer.
-