Class HandlerPublisher<T>

java.lang.Object
io.netty.channel.ChannelHandlerAdapter
io.netty.channel.ChannelInboundHandlerAdapter
io.netty.channel.ChannelDuplexHandler
org.playframework.netty.HandlerPublisher<T>
All Implemented Interfaces:
io.netty.channel.ChannelHandler, io.netty.channel.ChannelInboundHandler, io.netty.channel.ChannelOutboundHandler, org.reactivestreams.Publisher<T>

public class HandlerPublisher<T> extends io.netty.channel.ChannelDuplexHandler implements org.reactivestreams.Publisher<T>
Publisher for a Netty handler. This publisher supports only one subscriber.

All interactions with the subscriber are done from the handler's executor; hence, they provide the same happens-before semantics that Netty provides.

The handler publishes all messages that match the type specified by the passed-in class. Any non-matching messages are forwarded to the next handler.

The publisher will signal complete if it receives a channel inactive event.

The publisher will release any messages that it drops (for example, messages that are buffered when the subscriber cancels), but other than that, it does not release any messages. It is up to the subscriber to release messages.

If the subscriber cancels, the publisher will send a close event up the channel pipeline.

All errors will short-circuit the buffer and cause the publisher to immediately call the subscriber's onError method, dropping the buffer.

The publisher can be subscribed to or placed in a handler chain in any order.
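
The buffering and release rules described above can be illustrated with a small model that uses only the JDK. This is a simplified, single-threaded sketch and not the HandlerPublisher source: the class name BufferingModel, its methods, and the use of String messages are assumptions made for illustration, and the request count is assumed to be positive.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Illustrative model only; names and structure are hypothetical. */
public class BufferingModel {
    private final Queue<String> buffer = new ArrayDeque<>();
    private long outstandingDemand = 0;
    private boolean cancelled = false;

    /** Messages handed to the (imaginary) subscriber, which now owns them. */
    public final List<String> delivered = new ArrayList<>();
    /** Messages the publisher dropped and therefore had to release itself. */
    public final List<String> released = new ArrayList<>();

    /** A matching message arrives from the channel. */
    public void onMessage(String msg) {
        if (cancelled) {
            released.add(msg);               // dropped after cancel
        } else if (outstandingDemand > 0 && buffer.isEmpty()) {
            outstandingDemand--;
            delivered.add(msg);              // demand available: publish directly
        } else {
            buffer.add(msg);                 // no demand yet: hold the message
        }
    }

    /** The subscriber requests n more messages (n assumed to be positive). */
    public void request(long n) {
        if (cancelled) {
            return;
        }
        long sum = outstandingDemand + n;
        outstandingDemand = (sum < 0) ? Long.MAX_VALUE : sum; // saturate on overflow
        while (outstandingDemand > 0 && !buffer.isEmpty()) {
            outstandingDemand--;
            delivered.add(buffer.remove());  // flush buffered messages in order
        }
    }

    /** The subscriber cancels: everything still buffered is dropped and released. */
    public void cancel() {
        cancelled = true;
        released.addAll(buffer);
        buffer.clear();
    }

    public static void main(String[] args) {
        BufferingModel m = new BufferingModel();
        m.onMessage("a");
        m.onMessage("b");
        m.onMessage("c");
        m.request(2);
        m.cancel();
        // prints: delivered=[a, b] released=[c]
        System.out.println("delivered=" + m.delivered + " released=" + m.released);
    }
}
```

In the model, only the message still sitting in the buffer at cancellation is released by the publisher; the two delivered messages are the subscriber's responsibility, which mirrors the ownership rule stated in the class description.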
  • Field Details

    • executor

      private final io.netty.util.concurrent.EventExecutor executor
    • matcher

      private final io.netty.util.internal.TypeParameterMatcher matcher
    • buffer

      private final Queue<Object> buffer
    • hasSubscriber

      private final AtomicBoolean hasSubscriber
      Whether a subscriber has been provided. This is used to detect whether two subscribers are subscribing simultaneously.
    • state

      private HandlerPublisher.State state
    • subscriber

      private volatile org.reactivestreams.Subscriber<? super T> subscriber
    • ctx

      private io.netty.channel.ChannelHandlerContext ctx
    • outstandingDemand

      private long outstandingDemand
    • noSubscriberError

      private Throwable noSubscriberError
    • COMPLETE

      private static final Object COMPLETE
      Used for buffering a completion signal.
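
The hasSubscriber field above is documented as a way to detect two subscribers subscribing simultaneously. One common way to do this with an AtomicBoolean is a compare-and-set, sketched below. The class SingleSubscriberGuard and its tryClaim method are hypothetical names, and the source does not confirm that HandlerPublisher uses exactly this call.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Illustrative guard only; not taken from the HandlerPublisher source. */
public class SingleSubscriberGuard {
    private final AtomicBoolean hasSubscriber = new AtomicBoolean(false);

    /**
     * Atomically flips the flag from false to true.
     * Returns true for exactly one caller, even under concurrent calls.
     */
    public boolean tryClaim() {
        return hasSubscriber.compareAndSet(false, true);
    }

    public static void main(String[] args) {
        SingleSubscriberGuard guard = new SingleSubscriberGuard();
        System.out.println(guard.tryClaim()); // prints: true  (first subscriber accepted)
        System.out.println(guard.tryClaim()); // prints: false (second subscriber rejected)
    }
}
```

Because compareAndSet is atomic, two threads racing to subscribe cannot both observe false, so at most one of them is accepted.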
  • Constructor Details

    • HandlerPublisher

      public HandlerPublisher(io.netty.util.concurrent.EventExecutor executor, Class<? extends T> subscriberMessageType)
      Create a handler publisher. The supplied executor must be the same event loop that this handler is eventually registered with; if it is not, an exception will be thrown when the handler is registered.
      Parameters:
      executor - The executor to execute asynchronous events from the subscriber on.
      subscriberMessageType - The type of message this publisher accepts.
  • Method Details

    • acceptInboundMessage

      protected boolean acceptInboundMessage(Object msg) throws Exception
      Returns true if the given message should be handled. If false, the message will be passed to the next ChannelInboundHandler in the ChannelPipeline.
      Parameters:
      msg - The message to check.
      Returns:
      True if the message should be accepted.
      Throws:
      Exception
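
      The accept-or-forward decision made by acceptInboundMessage can be modelled with a plain type check. The sketch below is an assumption-laden stand-in: the real class holds a TypeParameterMatcher, whereas this model uses Class.isInstance, and the names TypeRoutingModel, published, and forwarded are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative routing model only; not the HandlerPublisher source. */
public class TypeRoutingModel<T> {
    private final Class<? extends T> type;

    /** Messages that matched the type and would be published to the subscriber. */
    public final List<Object> published = new ArrayList<>();
    /** Messages that did not match and would go to the next inbound handler. */
    public final List<Object> forwarded = new ArrayList<>();

    public TypeRoutingModel(Class<? extends T> type) {
        this.type = type;
    }

    /** Stand-in for acceptInboundMessage: true if msg is an instance of the type. */
    public boolean accept(Object msg) {
        return type.isInstance(msg);
    }

    /** Routes a message according to the result of accept. */
    public void onMessage(Object msg) {
        if (accept(msg)) {
            published.add(msg);
        } else {
            forwarded.add(msg);
        }
    }

    public static void main(String[] args) {
        TypeRoutingModel<String> router = new TypeRoutingModel<>(String.class);
        router.onMessage("hello");
        router.onMessage(42);
        // prints: published=[hello] forwarded=[42]
        System.out.println("published=" + router.published + " forwarded=" + router.forwarded);
    }
}
```

      A subclass that overrides acceptInboundMessage could apply a narrower rule than the type check, for example by inspecting a field of the message.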
    • cancelled

      protected void cancelled()
      Override to handle when a subscriber cancels the subscription. By default, this method will simply close the channel.
    • requestDemand

      protected void requestDemand()
      Override to intercept when demand is requested. By default, a channel read is invoked.
    • subscribe

      public void subscribe(org.reactivestreams.Subscriber<? super T> subscriber)
      Specified by:
      subscribe in interface org.reactivestreams.Publisher<T>
    • provideSubscriber

      private void provideSubscriber(org.reactivestreams.Subscriber<? super T> subscriber)
    • handlerAdded

      public void handlerAdded(io.netty.channel.ChannelHandlerContext ctx) throws Exception
      Specified by:
      handlerAdded in interface io.netty.channel.ChannelHandler
      Overrides:
      handlerAdded in class io.netty.channel.ChannelHandlerAdapter
      Throws:
      Exception
    • channelRegistered

      public void channelRegistered(io.netty.channel.ChannelHandlerContext ctx) throws Exception
      Specified by:
      channelRegistered in interface io.netty.channel.ChannelInboundHandler
      Overrides:
      channelRegistered in class io.netty.channel.ChannelInboundHandlerAdapter
      Throws:
      Exception
    • provideChannelContext

      private void provideChannelContext(io.netty.channel.ChannelHandlerContext ctx)
    • verifyRegisteredWithRightExecutor

      private void verifyRegisteredWithRightExecutor(io.netty.channel.ChannelHandlerContext ctx)
    • channelActive

      public void channelActive(io.netty.channel.ChannelHandlerContext ctx) throws Exception
      Specified by:
      channelActive in interface io.netty.channel.ChannelInboundHandler
      Overrides:
      channelActive in class io.netty.channel.ChannelInboundHandlerAdapter
      Throws:
      Exception
    • receivedDemand

      private void receivedDemand(long demand)
    • addDemand

      private boolean addDemand(long demand)
    • illegalDemand

      private void illegalDemand()
    • flushBuffer

      private void flushBuffer()
    • receivedCancel

      private void receivedCancel()
    • channelRead

      public void channelRead(io.netty.channel.ChannelHandlerContext ctx, Object message) throws Exception
      Specified by:
      channelRead in interface io.netty.channel.ChannelInboundHandler
      Overrides:
      channelRead in class io.netty.channel.ChannelInboundHandlerAdapter
      Throws:
      Exception
    • publishMessage

      private void publishMessage(Object message)
    • channelReadComplete

      public void channelReadComplete(io.netty.channel.ChannelHandlerContext ctx) throws Exception
      Specified by:
      channelReadComplete in interface io.netty.channel.ChannelInboundHandler
      Overrides:
      channelReadComplete in class io.netty.channel.ChannelInboundHandlerAdapter
      Throws:
      Exception
    • channelInactive

      public void channelInactive(io.netty.channel.ChannelHandlerContext ctx) throws Exception
      Specified by:
      channelInactive in interface io.netty.channel.ChannelInboundHandler
      Overrides:
      channelInactive in class io.netty.channel.ChannelInboundHandlerAdapter
      Throws:
      Exception
    • handlerRemoved

      public void handlerRemoved(io.netty.channel.ChannelHandlerContext ctx) throws Exception
      Specified by:
      handlerRemoved in interface io.netty.channel.ChannelHandler
      Overrides:
      handlerRemoved in class io.netty.channel.ChannelHandlerAdapter
      Throws:
      Exception
    • complete

      private void complete()
    • exceptionCaught

      public void exceptionCaught(io.netty.channel.ChannelHandlerContext ctx, Throwable cause) throws Exception
      Specified by:
      exceptionCaught in interface io.netty.channel.ChannelHandler
      Specified by:
      exceptionCaught in interface io.netty.channel.ChannelInboundHandler
      Overrides:
      exceptionCaught in class io.netty.channel.ChannelInboundHandlerAdapter
      Throws:
      Exception
    • cleanup

      private void cleanup()
      Release all elements from the buffer.