{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}

module Ouroboros.Network.Channel
  ( Channel (..)
  , toChannel
  , fromChannel
  , createPipeConnectedChannels
  , hoistChannel
  , isoKleisliChannel
  , fixedInputChannel
  , mvarsAsChannel
  , handlesAsChannel
  , createConnectedChannels
  , createConnectedBufferedChannels
  , createPipelineTestChannels
  , channelEffect
  , delayChannel
  , loggingChannel
  ) where

import           Control.Monad ((>=>))
import           Control.Monad.Class.MonadSay
import           Control.Monad.Class.MonadTimer
import qualified Data.ByteString      as BS
import qualified Data.ByteString.Lazy as LBS
import           Data.ByteString.Lazy.Internal (smallChunkSize)
import           Numeric.Natural

import qualified System.IO as IO
                   ( Handle, hFlush, hIsEOF )

import           Control.Monad.Class.MonadSTM

import qualified Network.Mux.Channel as Mx


-- | One end of a duplex channel. It is a reliable, ordered channel of some
-- medium. The medium does not imply message boundaries, it can be just bytes.
--
data Channel m a = Channel {

       -- | Write output to the channel.
       --
       -- It may raise exceptions (as appropriate for the monad and kind of
       -- channel).
       --
       Channel m a -> a -> m ()
send :: a -> m (),

       -- | Read some input from the channel, or @Nothing@ to indicate EOF.
       --
       -- Note that having received EOF it is still possible to send.
       -- The EOF condition is however monotonic.
       --
       -- It may raise exceptions (as appropriate for the monad and kind of
       -- channel).
       --
       Channel m a -> m (Maybe a)
recv :: m (Maybe a)
     }

-- TODO: eliminate the second Channel type and these conversion functions.

fromChannel :: Mx.Channel m
            -> Channel m LBS.ByteString
fromChannel :: Channel m -> Channel m ByteString
fromChannel Mx.Channel { ByteString -> m ()
send :: forall (m :: * -> *). Channel m -> ByteString -> m ()
send :: ByteString -> m ()
Mx.send, m (Maybe ByteString)
recv :: forall (m :: * -> *). Channel m -> m (Maybe ByteString)
recv :: m (Maybe ByteString)
Mx.recv } = Channel :: forall (m :: * -> *) a. (a -> m ()) -> m (Maybe a) -> Channel m a
Channel {
    send :: ByteString -> m ()
send = ByteString -> m ()
send,
    recv :: m (Maybe ByteString)
recv = m (Maybe ByteString)
recv
  }

toChannel :: Channel m LBS.ByteString
          -> Mx.Channel m
toChannel :: Channel m ByteString -> Channel m
toChannel Channel { ByteString -> m ()
send :: ByteString -> m ()
send :: forall (m :: * -> *) a. Channel m a -> a -> m ()
send, m (Maybe ByteString)
recv :: m (Maybe ByteString)
recv :: forall (m :: * -> *) a. Channel m a -> m (Maybe a)
recv } = Channel :: forall (m :: * -> *).
(ByteString -> m ()) -> m (Maybe ByteString) -> Channel m
Mx.Channel {
    send :: ByteString -> m ()
Mx.send = ByteString -> m ()
send,
    recv :: m (Maybe ByteString)
Mx.recv = m (Maybe ByteString)
recv
  }

-- | Create a local pipe, with both ends in this process, and expose that as
-- a pair of 'Channel's, one for each end.
--
-- This is primarily for testing purposes since it does not allow actual IPC.
--
createPipeConnectedChannels :: IO (Channel IO LBS.ByteString,
                                   Channel IO LBS.ByteString)
createPipeConnectedChannels :: IO (Channel IO ByteString, Channel IO ByteString)
createPipeConnectedChannels =
    (\(Channel IO
a, Channel IO
b) -> (Channel IO -> Channel IO ByteString
forall (m :: * -> *). Channel m -> Channel m ByteString
fromChannel Channel IO
a, Channel IO -> Channel IO ByteString
forall (m :: * -> *). Channel m -> Channel m ByteString
fromChannel Channel IO
b))
    ((Channel IO, Channel IO)
 -> (Channel IO ByteString, Channel IO ByteString))
-> IO (Channel IO, Channel IO)
-> IO (Channel IO ByteString, Channel IO ByteString)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO (Channel IO, Channel IO)
Mx.createPipeConnectedChannels

-- | Given an isomorphism between @a@ and @b@ (in Kleisli category), transform
-- a @'Channel' m a@ into @'Channel' m b@.
--
isoKleisliChannel
  :: forall a b m. Monad m
  => (a -> m b)
  -> (b -> m a)
  -> Channel m a
  -> Channel m b
isoKleisliChannel :: (a -> m b) -> (b -> m a) -> Channel m a -> Channel m b
isoKleisliChannel a -> m b
f b -> m a
finv Channel{a -> m ()
send :: a -> m ()
send :: forall (m :: * -> *) a. Channel m a -> a -> m ()
send, m (Maybe a)
recv :: m (Maybe a)
recv :: forall (m :: * -> *) a. Channel m a -> m (Maybe a)
recv} = Channel :: forall (m :: * -> *) a. (a -> m ()) -> m (Maybe a) -> Channel m a
Channel {
    send :: b -> m ()
send = b -> m a
finv (b -> m a) -> (a -> m ()) -> b -> m ()
forall (m :: * -> *) a b c.
Monad m =>
(a -> m b) -> (b -> m c) -> a -> m c
>=> a -> m ()
send,
    recv :: m (Maybe b)
recv = m (Maybe a)
recv m (Maybe a) -> (Maybe a -> m (Maybe b)) -> m (Maybe b)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (a -> m b) -> Maybe a -> m (Maybe b)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse a -> m b
f
  }


hoistChannel
  :: (forall x . m x -> n x)
  -> Channel m a
  -> Channel n a
hoistChannel :: (forall x. m x -> n x) -> Channel m a -> Channel n a
hoistChannel forall x. m x -> n x
nat Channel m a
channel = Channel :: forall (m :: * -> *) a. (a -> m ()) -> m (Maybe a) -> Channel m a
Channel
  { send :: a -> n ()
send = m () -> n ()
forall x. m x -> n x
nat (m () -> n ()) -> (a -> m ()) -> a -> n ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Channel m a -> a -> m ()
forall (m :: * -> *) a. Channel m a -> a -> m ()
send Channel m a
channel
  , recv :: n (Maybe a)
recv = m (Maybe a) -> n (Maybe a)
forall x. m x -> n x
nat (Channel m a -> m (Maybe a)
forall (m :: * -> *) a. Channel m a -> m (Maybe a)
recv Channel m a
channel)
  }

-- | A 'Channel' with a fixed input, and where all output is discarded.
--
-- The input is guaranteed to be supplied via 'read' with the given chunk
-- boundaries.
--
-- This is only useful for testing. In particular the fixed chunk boundaries
-- can be used to test that framing and other codecs work with any possible
-- chunking.
--
fixedInputChannel :: MonadSTM m => [a] -> m (Channel m a)
fixedInputChannel :: [a] -> m (Channel m a)
fixedInputChannel [a]
xs0 = do
    TVar_ (STM m) [a]
v <- STM m (TVar_ (STM m) [a]) -> m (TVar_ (STM m) [a])
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (TVar_ (STM m) [a]) -> m (TVar_ (STM m) [a]))
-> STM m (TVar_ (STM m) [a]) -> m (TVar_ (STM m) [a])
forall a b. (a -> b) -> a -> b
$ [a] -> STM m (TVar_ (STM m) [a])
forall (stm :: * -> *) a. MonadSTMTx stm => a -> stm (TVar_ stm a)
newTVar [a]
xs0
    Channel m a -> m (Channel m a)
forall (m :: * -> *) a. Monad m => a -> m a
return Channel :: forall (m :: * -> *) a. (a -> m ()) -> m (Maybe a) -> Channel m a
Channel {a -> m ()
forall (m :: * -> *) p. Monad m => p -> m ()
send :: forall (m :: * -> *) p. Monad m => p -> m ()
send :: a -> m ()
send, recv :: m (Maybe a)
recv = TVar_ (STM m) [a] -> m (Maybe a)
forall (m :: * -> *) a.
MonadSTM m =>
TVar_ (STM m) [a] -> m (Maybe a)
recv TVar_ (STM m) [a]
v}
  where
    recv :: TVar_ (STM m) [a] -> m (Maybe a)
recv TVar_ (STM m) [a]
v = STM m (Maybe a) -> m (Maybe a)
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (Maybe a) -> m (Maybe a)) -> STM m (Maybe a) -> m (Maybe a)
forall a b. (a -> b) -> a -> b
$ do
               [a]
xs <- TVar_ (STM m) [a] -> STM m [a]
forall (stm :: * -> *) a. MonadSTMTx stm => TVar_ stm a -> stm a
readTVar TVar_ (STM m) [a]
v
               case [a]
xs of
                 []      -> Maybe a -> STM m (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing
                 (a
x:[a]
xs') -> TVar_ (STM m) [a] -> [a] -> STM m ()
forall (stm :: * -> *) a.
MonadSTMTx stm =>
TVar_ stm a -> a -> stm ()
writeTVar TVar_ (STM m) [a]
v [a]
xs' STM m () -> STM m (Maybe a) -> STM m (Maybe a)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe a -> STM m (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Maybe a
forall a. a -> Maybe a
Just a
x)

    send :: p -> m ()
send p
_ = () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()


-- | Make a 'Channel' from a pair of 'TMVar's, one for reading and one for
-- writing.
--
mvarsAsChannel :: MonadSTM m
               => TMVar m a
               -> TMVar m a
               -> Channel m a 
mvarsAsChannel :: TMVar m a -> TMVar m a -> Channel m a
mvarsAsChannel TMVar m a
bufferRead TMVar m a
bufferWrite =
    Channel :: forall (m :: * -> *) a. (a -> m ()) -> m (Maybe a) -> Channel m a
Channel{a -> m ()
send :: a -> m ()
send :: a -> m ()
send, m (Maybe a)
recv :: m (Maybe a)
recv :: m (Maybe a)
recv}
  where
    send :: a -> m ()
send a
x = STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (TMVar m a -> a -> STM m ()
forall (stm :: * -> *) a.
MonadSTMTx stm =>
TMVar_ stm a -> a -> stm ()
putTMVar TMVar m a
bufferWrite a
x)
    recv :: m (Maybe a)
recv   = STM m (Maybe a) -> m (Maybe a)
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (a -> Maybe a
forall a. a -> Maybe a
Just (a -> Maybe a) -> STM m a -> STM m (Maybe a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TMVar m a -> STM m a
forall (stm :: * -> *) a. MonadSTMTx stm => TMVar_ stm a -> stm a
takeTMVar TMVar m a
bufferRead)


-- | Create a pair of channels that are connected via one-place buffers.
--
-- This is primarily useful for testing protocols.
--
createConnectedChannels :: MonadSTM m => m (Channel m a, Channel m a)
createConnectedChannels :: m (Channel m a, Channel m a)
createConnectedChannels = do
    -- Create two TMVars to act as the channel buffer (one for each direction)
    -- and use them to make both ends of a bidirectional channel
    TMVar_ (STM m) a
bufferA <- STM m (TMVar_ (STM m) a) -> m (TMVar_ (STM m) a)
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (TMVar_ (STM m) a) -> m (TMVar_ (STM m) a))
-> STM m (TMVar_ (STM m) a) -> m (TMVar_ (STM m) a)
forall a b. (a -> b) -> a -> b
$ STM m (TMVar_ (STM m) a)
forall (stm :: * -> *) a. MonadSTMTx stm => stm (TMVar_ stm a)
newEmptyTMVar
    TMVar_ (STM m) a
bufferB <- STM m (TMVar_ (STM m) a) -> m (TMVar_ (STM m) a)
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (TMVar_ (STM m) a) -> m (TMVar_ (STM m) a))
-> STM m (TMVar_ (STM m) a) -> m (TMVar_ (STM m) a)
forall a b. (a -> b) -> a -> b
$ STM m (TMVar_ (STM m) a)
forall (stm :: * -> *) a. MonadSTMTx stm => stm (TMVar_ stm a)
newEmptyTMVar

    (Channel m a, Channel m a) -> m (Channel m a, Channel m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (TMVar_ (STM m) a -> TMVar_ (STM m) a -> Channel m a
forall (m :: * -> *) a.
MonadSTM m =>
TMVar m a -> TMVar m a -> Channel m a
mvarsAsChannel TMVar_ (STM m) a
bufferB TMVar_ (STM m) a
bufferA,
            TMVar_ (STM m) a -> TMVar_ (STM m) a -> Channel m a
forall (m :: * -> *) a.
MonadSTM m =>
TMVar m a -> TMVar m a -> Channel m a
mvarsAsChannel TMVar_ (STM m) a
bufferA TMVar_ (STM m) a
bufferB)


-- | Create a pair of channels that are connected via N-place buffers.
--
-- This variant /blocks/ when 'send' would exceed the maximum buffer size.
-- Use this variant when you want the environment rather than the 'Peer' to
-- limit the pipelining.
--
-- This is primarily useful for testing protocols.
--
createConnectedBufferedChannels :: MonadSTM m
                                => Natural -> m (Channel m a, Channel m a)
createConnectedBufferedChannels :: Natural -> m (Channel m a, Channel m a)
createConnectedBufferedChannels Natural
sz = do
    -- Create two TBQueues to act as the channel buffers (one for each
    -- direction) and use them to make both ends of a bidirectional channel
    TBQueue_ (STM m) a
bufferA <- STM m (TBQueue_ (STM m) a) -> m (TBQueue_ (STM m) a)
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (TBQueue_ (STM m) a) -> m (TBQueue_ (STM m) a))
-> STM m (TBQueue_ (STM m) a) -> m (TBQueue_ (STM m) a)
forall a b. (a -> b) -> a -> b
$ Natural -> STM m (TBQueue_ (STM m) a)
forall (stm :: * -> *) a.
MonadSTMTx stm =>
Natural -> stm (TBQueue_ stm a)
newTBQueue Natural
sz
    TBQueue_ (STM m) a
bufferB <- STM m (TBQueue_ (STM m) a) -> m (TBQueue_ (STM m) a)
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (TBQueue_ (STM m) a) -> m (TBQueue_ (STM m) a))
-> STM m (TBQueue_ (STM m) a) -> m (TBQueue_ (STM m) a)
forall a b. (a -> b) -> a -> b
$ Natural -> STM m (TBQueue_ (STM m) a)
forall (stm :: * -> *) a.
MonadSTMTx stm =>
Natural -> stm (TBQueue_ stm a)
newTBQueue Natural
sz

    (Channel m a, Channel m a) -> m (Channel m a, Channel m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (TBQueue_ (STM m) a -> TBQueue_ (STM m) a -> Channel m a
forall (m :: * -> *) a.
MonadSTM m =>
TBQueue_ (STM m) a -> TBQueue_ (STM m) a -> Channel m a
queuesAsChannel TBQueue_ (STM m) a
bufferB TBQueue_ (STM m) a
bufferA,
            TBQueue_ (STM m) a -> TBQueue_ (STM m) a -> Channel m a
forall (m :: * -> *) a.
MonadSTM m =>
TBQueue_ (STM m) a -> TBQueue_ (STM m) a -> Channel m a
queuesAsChannel TBQueue_ (STM m) a
bufferA TBQueue_ (STM m) a
bufferB)
  where
    queuesAsChannel :: TBQueue_ (STM m) a -> TBQueue_ (STM m) a -> Channel m a
queuesAsChannel TBQueue_ (STM m) a
bufferRead TBQueue_ (STM m) a
bufferWrite =
        Channel :: forall (m :: * -> *) a. (a -> m ()) -> m (Maybe a) -> Channel m a
Channel{a -> m ()
send :: a -> m ()
send :: a -> m ()
send, m (Maybe a)
recv :: m (Maybe a)
recv :: m (Maybe a)
recv}
      where
        send :: a -> m ()
send a
x = STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (TBQueue_ (STM m) a -> a -> STM m ()
forall (stm :: * -> *) a.
MonadSTMTx stm =>
TBQueue_ stm a -> a -> stm ()
writeTBQueue TBQueue_ (STM m) a
bufferWrite a
x)
        recv :: m (Maybe a)
recv   = STM m (Maybe a) -> m (Maybe a)
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (a -> Maybe a
forall a. a -> Maybe a
Just (a -> Maybe a) -> STM m a -> STM m (Maybe a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TBQueue_ (STM m) a -> STM m a
forall (stm :: * -> *) a. MonadSTMTx stm => TBQueue_ stm a -> stm a
readTBQueue TBQueue_ (STM m) a
bufferRead)


-- | Create a pair of channels that are connected via N-place buffers.
--
-- This variant /fails/ when  'send' would exceed the maximum buffer size.
-- Use this variant when you want the 'PeerPipelined' to limit the pipelining
-- itself, and you want to check that it does not exceed the expected level of
-- pipelining.
--
-- This is primarily useful for testing protocols.
--
createPipelineTestChannels :: MonadSTM m
                           => Natural -> m (Channel m a, Channel m a)
createPipelineTestChannels :: Natural -> m (Channel m a, Channel m a)
createPipelineTestChannels Natural
sz = do
    -- Create two TBQueues to act as the channel buffers (one for each
    -- direction) and use them to make both ends of a bidirectional channel
    TBQueue_ (STM m) a
bufferA <- STM m (TBQueue_ (STM m) a) -> m (TBQueue_ (STM m) a)
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (TBQueue_ (STM m) a) -> m (TBQueue_ (STM m) a))
-> STM m (TBQueue_ (STM m) a) -> m (TBQueue_ (STM m) a)
forall a b. (a -> b) -> a -> b
$ Natural -> STM m (TBQueue_ (STM m) a)
forall (stm :: * -> *) a.
MonadSTMTx stm =>
Natural -> stm (TBQueue_ stm a)
newTBQueue Natural
sz
    TBQueue_ (STM m) a
bufferB <- STM m (TBQueue_ (STM m) a) -> m (TBQueue_ (STM m) a)
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (TBQueue_ (STM m) a) -> m (TBQueue_ (STM m) a))
-> STM m (TBQueue_ (STM m) a) -> m (TBQueue_ (STM m) a)
forall a b. (a -> b) -> a -> b
$ Natural -> STM m (TBQueue_ (STM m) a)
forall (stm :: * -> *) a.
MonadSTMTx stm =>
Natural -> stm (TBQueue_ stm a)
newTBQueue Natural
sz

    (Channel m a, Channel m a) -> m (Channel m a, Channel m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (TBQueue_ (STM m) a -> TBQueue_ (STM m) a -> Channel m a
forall (m :: * -> *) a.
MonadSTM m =>
TBQueue_ (STM m) a -> TBQueue_ (STM m) a -> Channel m a
queuesAsChannel TBQueue_ (STM m) a
bufferB TBQueue_ (STM m) a
bufferA,
            TBQueue_ (STM m) a -> TBQueue_ (STM m) a -> Channel m a
forall (m :: * -> *) a.
MonadSTM m =>
TBQueue_ (STM m) a -> TBQueue_ (STM m) a -> Channel m a
queuesAsChannel TBQueue_ (STM m) a
bufferA TBQueue_ (STM m) a
bufferB)
  where
    queuesAsChannel :: TBQueue_ (STM m) a -> TBQueue_ (STM m) a -> Channel m a
queuesAsChannel TBQueue_ (STM m) a
bufferRead TBQueue_ (STM m) a
bufferWrite =
        Channel :: forall (m :: * -> *) a. (a -> m ()) -> m (Maybe a) -> Channel m a
Channel{a -> m ()
send :: a -> m ()
send :: a -> m ()
send, m (Maybe a)
recv :: m (Maybe a)
recv :: m (Maybe a)
recv}
      where
        send :: a -> m ()
send a
x = STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
                   Bool
full <- TBQueue_ (STM m) a -> STM m Bool
forall (stm :: * -> *) a.
MonadSTMTx stm =>
TBQueue_ stm a -> stm Bool
isFullTBQueue TBQueue_ (STM m) a
bufferWrite
                   if Bool
full then [Char] -> STM m ()
forall a. HasCallStack => [Char] -> a
error [Char]
failureMsg
                           else TBQueue_ (STM m) a -> a -> STM m ()
forall (stm :: * -> *) a.
MonadSTMTx stm =>
TBQueue_ stm a -> a -> stm ()
writeTBQueue TBQueue_ (STM m) a
bufferWrite a
x
        recv :: m (Maybe a)
recv   = STM m (Maybe a) -> m (Maybe a)
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (a -> Maybe a
forall a. a -> Maybe a
Just (a -> Maybe a) -> STM m a -> STM m (Maybe a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TBQueue_ (STM m) a -> STM m a
forall (stm :: * -> *) a. MonadSTMTx stm => TBQueue_ stm a -> stm a
readTBQueue TBQueue_ (STM m) a
bufferRead)

    failureMsg :: [Char]
failureMsg = [Char]
"createPipelineTestChannels: "
              [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ [Char]
"maximum pipeline depth exceeded: " [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ Natural -> [Char]
forall a. Show a => a -> [Char]
show Natural
sz


-- | Make a 'Channel' from a pair of IO 'Handle's, one for reading and one
-- for writing.
--
-- The Handles should be open in the appropriate read or write mode, and in
-- binary mode. Writes are flushed after each write, so it is safe to use
-- a buffering mode.
--
-- For bidirectional handles it is safe to pass the same handle for both.
--
handlesAsChannel :: IO.Handle -- ^ Read handle
                 -> IO.Handle -- ^ Write handle
                 -> Channel IO LBS.ByteString
handlesAsChannel :: Handle -> Handle -> Channel IO ByteString
handlesAsChannel Handle
hndRead Handle
hndWrite =
    Channel :: forall (m :: * -> *) a. (a -> m ()) -> m (Maybe a) -> Channel m a
Channel{ByteString -> IO ()
send :: ByteString -> IO ()
send :: ByteString -> IO ()
send, IO (Maybe ByteString)
recv :: IO (Maybe ByteString)
recv :: IO (Maybe ByteString)
recv}
  where
    send :: LBS.ByteString -> IO ()
    send :: ByteString -> IO ()
send ByteString
chunk = do
      Handle -> ByteString -> IO ()
LBS.hPut Handle
hndWrite ByteString
chunk
      Handle -> IO ()
IO.hFlush Handle
hndWrite

    recv :: IO (Maybe LBS.ByteString)
    recv :: IO (Maybe ByteString)
recv = do
      Bool
eof <- Handle -> IO Bool
IO.hIsEOF Handle
hndRead
      if Bool
eof
        then Maybe ByteString -> IO (Maybe ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe ByteString
forall a. Maybe a
Nothing
        else ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just (ByteString -> Maybe ByteString)
-> (ByteString -> ByteString) -> ByteString -> Maybe ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
LBS.fromStrict (ByteString -> Maybe ByteString)
-> IO ByteString -> IO (Maybe ByteString)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Handle -> Int -> IO ByteString
BS.hGetSome Handle
hndRead Int
smallChunkSize


-- | Transform a channel to add an extra action before /every/ send and after
-- /every/ receive.
--
channelEffect :: forall m a. 
                 Monad m
              => (a -> m ())        -- ^ Action before 'send'
              -> (Maybe a -> m ())  -- ^ Action after 'recv'
              -> Channel m a
              -> Channel m a
channelEffect :: (a -> m ()) -> (Maybe a -> m ()) -> Channel m a -> Channel m a
channelEffect a -> m ()
beforeSend Maybe a -> m ()
afterRecv Channel{a -> m ()
send :: a -> m ()
send :: forall (m :: * -> *) a. Channel m a -> a -> m ()
send, m (Maybe a)
recv :: m (Maybe a)
recv :: forall (m :: * -> *) a. Channel m a -> m (Maybe a)
recv} =
    Channel :: forall (m :: * -> *) a. (a -> m ()) -> m (Maybe a) -> Channel m a
Channel{
      send :: a -> m ()
send = \a
x -> do
        a -> m ()
beforeSend a
x
        a -> m ()
send a
x

    , recv :: m (Maybe a)
recv = do
        Maybe a
mx <- m (Maybe a)
recv
        Maybe a -> m ()
afterRecv Maybe a
mx
        Maybe a -> m (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
mx
    }

-- | Delay a channel on the receiver end.
--
-- This is intended for testing, as a crude approximation of network delays.
-- More accurate models along these lines are of course possible.
--
delayChannel :: ( MonadSTM m
                , MonadTimer m
                )
             => DiffTime
             -> Channel m a
             -> Channel m a
delayChannel :: DiffTime -> Channel m a -> Channel m a
delayChannel DiffTime
delay = (a -> m ()) -> (Maybe a -> m ()) -> Channel m a -> Channel m a
forall (m :: * -> *) a.
Monad m =>
(a -> m ()) -> (Maybe a -> m ()) -> Channel m a -> Channel m a
channelEffect (\a
_ -> () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
                                   (\Maybe a
_ -> DiffTime -> m ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
delay)


-- | Channel which logs sent and received messages.
--
loggingChannel :: ( MonadSay m
                  , Show id
                  , Show a
                  )
               => id
               -> Channel m a
               -> Channel m a
loggingChannel :: id -> Channel m a -> Channel m a
loggingChannel id
ident Channel{a -> m ()
send :: a -> m ()
send :: forall (m :: * -> *) a. Channel m a -> a -> m ()
send,m (Maybe a)
recv :: m (Maybe a)
recv :: forall (m :: * -> *) a. Channel m a -> m (Maybe a)
recv} =
  Channel :: forall (m :: * -> *) a. (a -> m ()) -> m (Maybe a) -> Channel m a
Channel {
    send :: a -> m ()
send = a -> m ()
loggingSend,
    recv :: m (Maybe a)
recv = m (Maybe a)
loggingRecv
  }
 where
  loggingSend :: a -> m ()
loggingSend a
a = do
    [Char] -> m ()
forall (m :: * -> *). MonadSay m => [Char] -> m ()
say (id -> [Char]
forall a. Show a => a -> [Char]
show id
ident [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ [Char]
":send:" [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ a -> [Char]
forall a. Show a => a -> [Char]
show a
a)
    a -> m ()
send a
a

  loggingRecv :: m (Maybe a)
loggingRecv = do
    Maybe a
msg <- m (Maybe a)
recv
    case Maybe a
msg of
      Maybe a
Nothing -> () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
      Just a
a  -> [Char] -> m ()
forall (m :: * -> *). MonadSay m => [Char] -> m ()
say (id -> [Char]
forall a. Show a => a -> [Char]
show id
ident [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ [Char]
":recv:" [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ a -> [Char]
forall a. Show a => a -> [Char]
show a
a)
    Maybe a -> m (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
msg