{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTSyntax #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
module Network.Mux (
newMux
, Mux
, MuxMode (..)
, HasInitiator
, HasResponder
, MiniProtocolBundle (..)
, MiniProtocolInfo (..)
, MiniProtocolNum (..)
, MiniProtocolDirection (..)
, MiniProtocolLimits (..)
, runMux
, MuxBearer
, runMiniProtocol
, StartOnDemandOrEagerly (..)
, stopMux
, MuxError (..)
, MuxErrorType (..)
, traceMuxBearerState
, MuxBearerState (..)
, MuxTrace (..)
, WithMuxBearer (..)
) where
import qualified Data.ByteString.Lazy as BL
import Data.Int (Int64)
import Data.Map (Map)
import qualified Data.Map.Strict as Map
import Control.Applicative
import qualified Control.Concurrent.JobPool as JobPool
import Control.Monad
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadSTM.Strict
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime
import Control.Monad.Class.MonadTimer
import Control.Tracer
import Network.Mux.Channel
import Network.Mux.Egress as Egress
import Network.Mux.Ingress as Ingress
import Network.Mux.Trace
import Network.Mux.Types
data Mux (mode :: MuxMode) m =
Mux {
Mux mode m
-> Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: !(Map (MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m)),
Mux mode m -> TQueue m (ControlCmd mode m)
muxControlCmdQueue :: !(TQueue m (ControlCmd mode m)),
Mux mode m -> StrictTVar m MuxStatus
muxStatus :: StrictTVar m MuxStatus
}
data MuxStatus = MuxReady | MuxFailed SomeException | MuxStopped
newMux :: MonadSTM m => MiniProtocolBundle mode -> m (Mux mode m)
newMux :: MiniProtocolBundle mode -> m (Mux mode m)
newMux (MiniProtocolBundle [MiniProtocolInfo mode]
ptcls) = do
Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols <- [MiniProtocolInfo mode]
-> m (Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m))
forall (m :: * -> *) (mode :: MuxMode).
MonadSTM m =>
[MiniProtocolInfo mode]
-> m (Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m))
mkMiniProtocolStateMap [MiniProtocolInfo mode]
ptcls
TQueue_ (STM m) (ControlCmd mode m)
muxControlCmdQueue <- STM m (TQueue_ (STM m) (ControlCmd mode m))
-> m (TQueue_ (STM m) (ControlCmd mode m))
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically STM m (TQueue_ (STM m) (ControlCmd mode m))
forall (stm :: * -> *) a. MonadSTMTx stm => stm (TQueue_ stm a)
newTQueue
StrictTVar m MuxStatus
muxStatus <- MuxStatus -> m (StrictTVar m MuxStatus)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO MuxStatus
MuxReady
Mux mode m -> m (Mux mode m)
forall (m :: * -> *) a. Monad m => a -> m a
return Mux :: forall (mode :: MuxMode) (m :: * -> *).
Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
-> TQueue m (ControlCmd mode m)
-> StrictTVar m MuxStatus
-> Mux mode m
Mux {
Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols,
TQueue_ (STM m) (ControlCmd mode m)
muxControlCmdQueue :: TQueue_ (STM m) (ControlCmd mode m)
muxControlCmdQueue :: TQueue_ (STM m) (ControlCmd mode m)
muxControlCmdQueue,
StrictTVar m MuxStatus
muxStatus :: StrictTVar m MuxStatus
muxStatus :: StrictTVar m MuxStatus
muxStatus
}
mkMiniProtocolStateMap :: MonadSTM m
=> [MiniProtocolInfo mode]
-> m (Map (MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m))
mkMiniProtocolStateMap :: [MiniProtocolInfo mode]
-> m (Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m))
mkMiniProtocolStateMap [MiniProtocolInfo mode]
ptcls =
[((MiniProtocolNum, MiniProtocolDir), MiniProtocolState mode m)]
-> Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList ([((MiniProtocolNum, MiniProtocolDir), MiniProtocolState mode m)]
-> Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m))
-> m [((MiniProtocolNum, MiniProtocolDir),
MiniProtocolState mode m)]
-> m (Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
[m ((MiniProtocolNum, MiniProtocolDir), MiniProtocolState mode m)]
-> m [((MiniProtocolNum, MiniProtocolDir),
MiniProtocolState mode m)]
forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
sequence
[ do MiniProtocolState mode m
state <- MiniProtocolInfo mode -> m (MiniProtocolState mode m)
forall (m :: * -> *) (mode :: MuxMode).
MonadSTM m =>
MiniProtocolInfo mode -> m (MiniProtocolState mode m)
mkMiniProtocolState MiniProtocolInfo mode
ptcl
((MiniProtocolNum, MiniProtocolDir), MiniProtocolState mode m)
-> m ((MiniProtocolNum, MiniProtocolDir), MiniProtocolState mode m)
forall (m :: * -> *) a. Monad m => a -> m a
return ((MiniProtocolNum
miniProtocolNum, MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: MuxMode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir), MiniProtocolState mode m
state)
| ptcl :: MiniProtocolInfo mode
ptcl@MiniProtocolInfo {MiniProtocolNum
miniProtocolNum :: forall (mode :: MuxMode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum, MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: MuxMode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir} <- [MiniProtocolInfo mode]
ptcls ]
mkMiniProtocolState :: MonadSTM m
=> MiniProtocolInfo mode
-> m (MiniProtocolState mode m)
mkMiniProtocolState :: MiniProtocolInfo mode -> m (MiniProtocolState mode m)
mkMiniProtocolState MiniProtocolInfo mode
miniProtocolInfo = do
StrictTVar m ByteString
miniProtocolIngressQueue <- ByteString -> m (StrictTVar m ByteString)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ByteString
BL.empty
StrictTVar m MiniProtocolStatus
miniProtocolStatusVar <- MiniProtocolStatus -> m (StrictTVar m MiniProtocolStatus)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO MiniProtocolStatus
StatusIdle
MiniProtocolState mode m -> m (MiniProtocolState mode m)
forall (m :: * -> *) a. Monad m => a -> m a
return MiniProtocolState :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolInfo mode
-> IngressQueue m
-> StrictTVar m MiniProtocolStatus
-> MiniProtocolState mode m
MiniProtocolState {
MiniProtocolInfo mode
miniProtocolInfo :: MiniProtocolInfo mode
miniProtocolInfo :: MiniProtocolInfo mode
miniProtocolInfo,
StrictTVar m ByteString
miniProtocolIngressQueue :: StrictTVar m ByteString
miniProtocolIngressQueue :: StrictTVar m ByteString
miniProtocolIngressQueue,
StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: StrictTVar m MiniProtocolStatus
miniProtocolStatusVar
}
stopMux :: MonadSTM m => Mux mode m -> m ()
stopMux :: Mux mode m -> m ()
stopMux Mux{TQueue m (ControlCmd mode m)
muxControlCmdQueue :: TQueue m (ControlCmd mode m)
muxControlCmdQueue :: forall (mode :: MuxMode) (m :: * -> *).
Mux mode m -> TQueue m (ControlCmd mode m)
muxControlCmdQueue} =
STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ TQueue m (ControlCmd mode m) -> ControlCmd mode m -> STM m ()
forall (stm :: * -> *) a.
MonadSTMTx stm =>
TQueue_ stm a -> a -> stm ()
writeTQueue TQueue m (ControlCmd mode m)
muxControlCmdQueue ControlCmd mode m
forall (mode :: MuxMode) (m :: * -> *). ControlCmd mode m
CmdShutdown
runMux :: forall m mode.
( MonadAsync m
, MonadCatch m
, MonadFork m
, MonadThrow (STM m)
, MonadTime m
, MonadTimer m
, MonadMask m
)
=> Tracer m MuxTrace
-> Mux mode m
-> MuxBearer m
-> m ()
runMux :: Tracer m MuxTrace -> Mux mode m -> MuxBearer m -> m ()
runMux Tracer m MuxTrace
tracer Mux {Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: forall (mode :: MuxMode) (m :: * -> *).
Mux mode m
-> Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols, TQueue m (ControlCmd mode m)
muxControlCmdQueue :: TQueue m (ControlCmd mode m)
muxControlCmdQueue :: forall (mode :: MuxMode) (m :: * -> *).
Mux mode m -> TQueue m (ControlCmd mode m)
muxControlCmdQueue, StrictTVar m MuxStatus
muxStatus :: StrictTVar m MuxStatus
muxStatus :: forall (mode :: MuxMode) (m :: * -> *).
Mux mode m -> StrictTVar m MuxStatus
muxStatus} MuxBearer m
bearer = do
TBQueue_ (STM m) (TranslocationServiceRequest m)
egressQueue <- STM m (TBQueue_ (STM m) (TranslocationServiceRequest m))
-> m (TBQueue_ (STM m) (TranslocationServiceRequest m))
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (TBQueue_ (STM m) (TranslocationServiceRequest m))
-> m (TBQueue_ (STM m) (TranslocationServiceRequest m)))
-> STM m (TBQueue_ (STM m) (TranslocationServiceRequest m))
-> m (TBQueue_ (STM m) (TranslocationServiceRequest m))
forall a b. (a -> b) -> a -> b
$ Natural -> STM m (TBQueue_ (STM m) (TranslocationServiceRequest m))
forall (stm :: * -> *) a.
MonadSTMTx stm =>
Natural -> stm (TBQueue_ stm a)
newTBQueue Natural
100
(JobPool m MuxJobResult -> m ()) -> m ()
forall (m :: * -> *) a b.
(MonadAsync m, MonadThrow m) =>
(JobPool m a -> m b) -> m b
JobPool.withJobPool ((JobPool m MuxJobResult -> m ()) -> m ())
-> (JobPool m MuxJobResult -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \JobPool m MuxJobResult
jobpool -> do
JobPool m MuxJobResult -> Job m MuxJobResult -> m ()
forall (m :: * -> *) a.
(MonadAsync m, MonadMask m) =>
JobPool m a -> Job m a -> m ()
JobPool.forkJob JobPool m MuxJobResult
jobpool (TBQueue_ (STM m) (TranslocationServiceRequest m)
-> Job m MuxJobResult
muxerJob TBQueue_ (STM m) (TranslocationServiceRequest m)
egressQueue)
JobPool m MuxJobResult -> Job m MuxJobResult -> m ()
forall (m :: * -> *) a.
(MonadAsync m, MonadMask m) =>
JobPool m a -> Job m a -> m ()
JobPool.forkJob JobPool m MuxJobResult
jobpool Job m MuxJobResult
demuxerJob
Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MuxBearerState -> MuxTrace
MuxTraceState MuxBearerState
Mature)
Tracer m MuxTrace
-> JobPool m MuxJobResult
-> TBQueue_ (STM m) (TranslocationServiceRequest m)
-> TQueue m (ControlCmd mode m)
-> StrictTVar m MuxStatus
-> m ()
forall (mode :: MuxMode) (m :: * -> *).
(MonadSTM m, MonadAsync m, MonadMask m, MonadThrow (STM m)) =>
Tracer m MuxTrace
-> JobPool m MuxJobResult
-> EgressQueue m
-> TQueue m (ControlCmd mode m)
-> StrictTVar m MuxStatus
-> m ()
monitor Tracer m MuxTrace
tracer JobPool m MuxJobResult
jobpool TBQueue_ (STM m) (TranslocationServiceRequest m)
egressQueue TQueue m (ControlCmd mode m)
muxControlCmdQueue StrictTVar m MuxStatus
muxStatus
where
muxerJob :: TBQueue_ (STM m) (TranslocationServiceRequest m)
-> Job m MuxJobResult
muxerJob TBQueue_ (STM m) (TranslocationServiceRequest m)
egressQueue =
m MuxJobResult
-> (SomeException -> MuxJobResult) -> String -> Job m MuxJobResult
forall (m :: * -> *) a.
m a -> (SomeException -> a) -> String -> Job m a
JobPool.Job (TBQueue_ (STM m) (TranslocationServiceRequest m)
-> MuxBearer m -> m MuxJobResult
forall (m :: * -> *) void.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
MonadTimer m, MonadTime m) =>
EgressQueue m -> MuxBearer m -> m void
muxer TBQueue_ (STM m) (TranslocationServiceRequest m)
egressQueue MuxBearer m
bearer)
SomeException -> MuxJobResult
MuxerException String
"muxer"
demuxerJob :: Job m MuxJobResult
demuxerJob =
m MuxJobResult
-> (SomeException -> MuxJobResult) -> String -> Job m MuxJobResult
forall (m :: * -> *) a.
m a -> (SomeException -> a) -> String -> Job m a
JobPool.Job ([MiniProtocolState mode m] -> MuxBearer m -> m MuxJobResult
forall (m :: * -> *) (mode :: MuxMode) void.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
MonadTimer m, MonadTime m) =>
[MiniProtocolState mode m] -> MuxBearer m -> m void
demuxer (Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
-> [MiniProtocolState mode m]
forall k a. Map k a -> [a]
Map.elems Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols) MuxBearer m
bearer)
SomeException -> MuxJobResult
DemuxerException String
"demuxer"
miniProtocolJob
:: forall mode m.
( MonadSTM m
, MonadThrow (STM m)
)
=> Tracer m MuxTrace
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> JobPool.Job m MuxJobResult
miniProtocolJob :: Tracer m MuxTrace
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> Job m MuxJobResult
miniProtocolJob Tracer m MuxTrace
tracer EgressQueue m
egressQueue
MiniProtocolState {
miniProtocolInfo :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> MiniProtocolInfo mode
miniProtocolInfo =
MiniProtocolInfo {
MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: forall (mode :: MuxMode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum,
MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: MuxMode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir
},
IngressQueue m
miniProtocolIngressQueue :: IngressQueue m
miniProtocolIngressQueue :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> IngressQueue m
miniProtocolIngressQueue,
StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> StrictTVar m MiniProtocolStatus
miniProtocolStatusVar
}
(MiniProtocolAction Channel m -> m (a, Maybe ByteString)
protocolAction StrictTMVar m (Either SomeException a)
completionVar) =
m MuxJobResult
-> (SomeException -> MuxJobResult) -> String -> Job m MuxJobResult
forall (m :: * -> *) a.
m a -> (SomeException -> a) -> String -> Job m a
JobPool.Job m MuxJobResult
jobAction
(MiniProtocolNum -> MiniProtocolDir -> SomeException -> MuxJobResult
MiniProtocolException MiniProtocolNum
miniProtocolNum MiniProtocolDir
miniProtocolDirEnum)
(MiniProtocolNum -> String
forall a. Show a => a -> String
show MiniProtocolNum
miniProtocolNum String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
"." String -> String -> String
forall a. [a] -> [a] -> [a]
++ MiniProtocolDir -> String
forall a. Show a => a -> String
show MiniProtocolDir
miniProtocolDirEnum)
where
jobAction :: m MuxJobResult
jobAction = do
IngressQueue m
w <- ByteString -> m (IngressQueue m)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ByteString
BL.empty
let chan :: Channel m
chan = Tracer m MuxTrace
-> EgressQueue m
-> Wanton m
-> MiniProtocolNum
-> MiniProtocolDir
-> IngressQueue m
-> Channel m
forall (m :: * -> *).
MonadSTM m =>
Tracer m MuxTrace
-> EgressQueue m
-> Wanton m
-> MiniProtocolNum
-> MiniProtocolDir
-> IngressQueue m
-> Channel m
muxChannel Tracer m MuxTrace
tracer EgressQueue m
egressQueue (IngressQueue m -> Wanton m
forall (m :: * -> *). StrictTVar m ByteString -> Wanton m
Wanton IngressQueue m
w)
MiniProtocolNum
miniProtocolNum MiniProtocolDir
miniProtocolDirEnum
IngressQueue m
miniProtocolIngressQueue
(a
result, Maybe ByteString
remainder) <- Channel m -> m (a, Maybe ByteString)
protocolAction Channel m
chan
IngressQueue m -> m ()
mpsJobExit IngressQueue m
w
STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
StrictTVar m MiniProtocolStatus -> MiniProtocolStatus -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MiniProtocolStatus
miniProtocolStatusVar MiniProtocolStatus
StatusIdle
StrictTMVar m (Either SomeException a)
-> Either SomeException a -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTMVar m a -> a -> STM m ()
putTMVar StrictTMVar m (Either SomeException a)
completionVar (a -> Either SomeException a
forall a b. b -> Either a b
Right a
result)
STM m () -> STM m () -> STM m ()
forall (stm :: * -> *) a. MonadSTMTx stm => stm a -> stm a -> stm a
`orElse` (MuxError -> STM m ()
forall (stm :: * -> *) e a.
(MonadSTMTx stm, MonadThrow stm, Exception e) =>
e -> stm a
throwSTM (MuxErrorType -> String -> MuxError
MuxError (MiniProtocolNum -> MuxErrorType
MuxBlockedOnCompletionVar MiniProtocolNum
miniProtocolNum) String
""))
case Maybe ByteString
remainder of
Just ByteString
trailing ->
IngressQueue m -> (ByteString -> ByteString) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar IngressQueue m
miniProtocolIngressQueue (ByteString -> ByteString -> ByteString
BL.append ByteString
trailing)
Maybe ByteString
Nothing ->
() -> STM m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
MuxJobResult -> m MuxJobResult
forall (m :: * -> *) a. Monad m => a -> m a
return (MiniProtocolNum -> MiniProtocolDir -> MuxJobResult
MiniProtocolShutdown MiniProtocolNum
miniProtocolNum MiniProtocolDir
miniProtocolDirEnum)
miniProtocolDirEnum :: MiniProtocolDir
miniProtocolDirEnum :: MiniProtocolDir
miniProtocolDirEnum = MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: MuxMode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir
mpsJobExit :: IngressQueue m -> m ()
mpsJobExit :: IngressQueue m -> m ()
mpsJobExit IngressQueue m
w = do
Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MiniProtocolNum -> MiniProtocolDir -> MuxTrace
MuxTraceTerminating MiniProtocolNum
miniProtocolNum MiniProtocolDir
miniProtocolDirEnum)
STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
ByteString
buf <- IngressQueue m -> STM m ByteString
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar IngressQueue m
w
Bool -> STM m ()
forall (stm :: * -> *). MonadSTMTx stm => Bool -> stm ()
check (ByteString -> Bool
BL.null ByteString
buf)
data ControlCmd mode m =
CmdStartProtocolThread
!StartOnDemandOrEagerly
!(MiniProtocolState mode m)
!(MiniProtocolAction m)
| CmdShutdown
data StartOnDemandOrEagerly = StartOnDemand | StartEagerly
deriving StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool
(StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool)
-> (StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool)
-> Eq StartOnDemandOrEagerly
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool
$c/= :: StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool
== :: StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool
$c== :: StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool
Eq
data MiniProtocolAction m where
MiniProtocolAction :: (Channel m -> m (a, Maybe BL.ByteString))
-> StrictTMVar m (Either SomeException a)
-> MiniProtocolAction m
monitor :: forall mode m. (MonadSTM m, MonadAsync m, MonadMask m, MonadThrow (STM m))
=> Tracer m MuxTrace
-> JobPool.JobPool m MuxJobResult
-> EgressQueue m
-> TQueue m (ControlCmd mode m)
-> StrictTVar m MuxStatus
-> m ()
monitor :: Tracer m MuxTrace
-> JobPool m MuxJobResult
-> EgressQueue m
-> TQueue m (ControlCmd mode m)
-> StrictTVar m MuxStatus
-> m ()
monitor Tracer m MuxTrace
tracer JobPool m MuxJobResult
jobpool EgressQueue m
egressQueue TQueue m (ControlCmd mode m)
cmdQueue StrictTVar m MuxStatus
muxStatus =
Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
-> m ()
go Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
forall k a. Map k a
Map.empty
where
go :: Map (MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
-> m ()
go :: Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
-> m ()
go !Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
ptclsStartOnDemand = do
MonitorEvent mode m
result <- STM m (MonitorEvent mode m) -> m (MonitorEvent mode m)
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (MonitorEvent mode m) -> m (MonitorEvent mode m))
-> STM m (MonitorEvent mode m) -> m (MonitorEvent mode m)
forall a b. (a -> b) -> a -> b
$
(MuxJobResult -> MonitorEvent mode m
forall (mode :: MuxMode) (m :: * -> *).
MuxJobResult -> MonitorEvent mode m
EventJobResult (MuxJobResult -> MonitorEvent mode m)
-> STM m MuxJobResult -> STM m (MonitorEvent mode m)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> JobPool m MuxJobResult -> STM m MuxJobResult
forall (m :: * -> *) a. MonadSTM m => JobPool m a -> STM m a
JobPool.collect JobPool m MuxJobResult
jobpool)
STM m (MonitorEvent mode m)
-> STM m (MonitorEvent mode m) -> STM m (MonitorEvent mode m)
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> (ControlCmd mode m -> MonitorEvent mode m
forall (mode :: MuxMode) (m :: * -> *).
ControlCmd mode m -> MonitorEvent mode m
EventControlCmd (ControlCmd mode m -> MonitorEvent mode m)
-> STM m (ControlCmd mode m) -> STM m (MonitorEvent mode m)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TQueue m (ControlCmd mode m) -> STM m (ControlCmd mode m)
forall (stm :: * -> *) a. MonadSTMTx stm => TQueue_ stm a -> stm a
readTQueue TQueue m (ControlCmd mode m)
cmdQueue)
STM m (MonitorEvent mode m)
-> STM m (MonitorEvent mode m) -> STM m (MonitorEvent mode m)
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> (STM m (MonitorEvent mode m)
-> STM m (MonitorEvent mode m) -> STM m (MonitorEvent mode m))
-> STM m (MonitorEvent mode m)
-> [STM m (MonitorEvent mode m)]
-> STM m (MonitorEvent mode m)
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr STM m (MonitorEvent mode m)
-> STM m (MonitorEvent mode m) -> STM m (MonitorEvent mode m)
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
(<|>) STM m (MonitorEvent mode m)
forall (stm :: * -> *) a. MonadSTMTx stm => stm a
retry
[ IngressQueue m -> STM m ()
checkNonEmptyQueue (MiniProtocolState mode m -> IngressQueue m
forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> IngressQueue m
miniProtocolIngressQueue MiniProtocolState mode m
ptclState) STM m ()
-> STM m (MonitorEvent mode m) -> STM m (MonitorEvent mode m)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>>
MonitorEvent mode m -> STM m (MonitorEvent mode m)
forall (m :: * -> *) a. Monad m => a -> m a
return (MiniProtocolState mode m
-> MiniProtocolAction m -> MonitorEvent mode m
forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m
-> MiniProtocolAction m -> MonitorEvent mode m
EventStartOnDemand MiniProtocolState mode m
ptclState MiniProtocolAction m
ptclAction)
| (MiniProtocolState mode m
ptclState, MiniProtocolAction m
ptclAction) <- Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
-> [(MiniProtocolState mode m, MiniProtocolAction m)]
forall k a. Map k a -> [a]
Map.elems Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
ptclsStartOnDemand ]
case MonitorEvent mode m
result of
EventJobResult (MiniProtocolShutdown MiniProtocolNum
pnum MiniProtocolDir
pmode) -> do
Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MiniProtocolNum -> MiniProtocolDir -> MuxTrace
MuxTraceCleanExit MiniProtocolNum
pnum MiniProtocolDir
pmode)
Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
-> m ()
go Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
ptclsStartOnDemand
EventJobResult (MiniProtocolException MiniProtocolNum
pnum MiniProtocolDir
pmode SomeException
e) -> do
Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MuxBearerState -> MuxTrace
MuxTraceState MuxBearerState
Dead)
Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MiniProtocolNum -> MiniProtocolDir -> SomeException -> MuxTrace
MuxTraceExceptionExit MiniProtocolNum
pnum MiniProtocolDir
pmode SomeException
e)
STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar m MuxStatus -> MuxStatus -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MuxStatus
muxStatus (MuxStatus -> STM m ()) -> MuxStatus -> STM m ()
forall a b. (a -> b) -> a -> b
$ SomeException -> MuxStatus
MuxFailed SomeException
e
SomeException -> m ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
e
EventJobResult (MuxerException SomeException
e) -> do
Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MuxBearerState -> MuxTrace
MuxTraceState MuxBearerState
Dead)
STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar m MuxStatus -> MuxStatus -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MuxStatus
muxStatus (MuxStatus -> STM m ()) -> MuxStatus -> STM m ()
forall a b. (a -> b) -> a -> b
$ SomeException -> MuxStatus
MuxFailed SomeException
e
SomeException -> m ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
e
EventJobResult (DemuxerException SomeException
e) -> do
Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MuxBearerState -> MuxTrace
MuxTraceState MuxBearerState
Dead)
STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar m MuxStatus -> MuxStatus -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MuxStatus
muxStatus (MuxStatus -> STM m ()) -> MuxStatus -> STM m ()
forall a b. (a -> b) -> a -> b
$ SomeException -> MuxStatus
MuxFailed SomeException
e
SomeException -> m ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
e
EventControlCmd (CmdStartProtocolThread
StartOnDemandOrEagerly
StartEagerly
ptclState :: MiniProtocolState mode m
ptclState@MiniProtocolState {
miniProtocolInfo :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> MiniProtocolInfo mode
miniProtocolInfo = MiniProtocolInfo {
MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: forall (mode :: MuxMode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum,
MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: MuxMode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir
}
}
MiniProtocolAction m
ptclAction) -> do
Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MiniProtocolNum -> MiniProtocolDir -> MuxTrace
MuxTraceStartEagerly MiniProtocolNum
miniProtocolNum
(MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: MuxMode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir))
JobPool m MuxJobResult -> Job m MuxJobResult -> m ()
forall (m :: * -> *) a.
(MonadAsync m, MonadMask m) =>
JobPool m a -> Job m a -> m ()
JobPool.forkJob JobPool m MuxJobResult
jobpool (Job m MuxJobResult -> m ()) -> Job m MuxJobResult -> m ()
forall a b. (a -> b) -> a -> b
$
Tracer m MuxTrace
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> Job m MuxJobResult
forall (mode :: MuxMode) (m :: * -> *).
(MonadSTM m, MonadThrow (STM m)) =>
Tracer m MuxTrace
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> Job m MuxJobResult
miniProtocolJob
Tracer m MuxTrace
tracer
EgressQueue m
egressQueue
MiniProtocolState mode m
ptclState
MiniProtocolAction m
ptclAction
Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
-> m ()
go Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
ptclsStartOnDemand
EventControlCmd (CmdStartProtocolThread
StartOnDemandOrEagerly
StartOnDemand
ptclState :: MiniProtocolState mode m
ptclState@MiniProtocolState {
miniProtocolInfo :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> MiniProtocolInfo mode
miniProtocolInfo = MiniProtocolInfo {
MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: forall (mode :: MuxMode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum,
MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: MuxMode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir
}
}
MiniProtocolAction m
ptclAction) -> do
let ptclsStartOnDemand' :: Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
ptclsStartOnDemand' = (MiniProtocolNum, MiniProtocolDir)
-> (MiniProtocolState mode m, MiniProtocolAction m)
-> Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
-> Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert (MiniProtocolState mode m -> (MiniProtocolNum, MiniProtocolDir)
forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> (MiniProtocolNum, MiniProtocolDir)
protocolKey MiniProtocolState mode m
ptclState)
(MiniProtocolState mode m
ptclState, MiniProtocolAction m
ptclAction)
Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
ptclsStartOnDemand
Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MiniProtocolNum -> MiniProtocolDir -> MuxTrace
MuxTraceStartedOnDemand MiniProtocolNum
miniProtocolNum
(MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: MuxMode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir))
Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
-> m ()
go Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
ptclsStartOnDemand'
EventControlCmd ControlCmd mode m
CmdShutdown -> do
Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer MuxTrace
MuxTraceShutdown
STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar m MuxStatus -> MuxStatus -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MuxStatus
muxStatus MuxStatus
MuxStopped
EventStartOnDemand ptclState :: MiniProtocolState mode m
ptclState@MiniProtocolState {
miniProtocolInfo :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> MiniProtocolInfo mode
miniProtocolInfo = MiniProtocolInfo {
MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: forall (mode :: MuxMode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum,
MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: MuxMode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir
},
StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> StrictTVar m MiniProtocolStatus
miniProtocolStatusVar
}
MiniProtocolAction m
ptclAction -> do
Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MiniProtocolNum -> MiniProtocolDir -> MuxTrace
MuxTraceStartOnDemand MiniProtocolNum
miniProtocolNum
(MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: MuxMode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir))
STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar m MiniProtocolStatus -> MiniProtocolStatus -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MiniProtocolStatus
miniProtocolStatusVar MiniProtocolStatus
StatusRunning
JobPool m MuxJobResult -> Job m MuxJobResult -> m ()
forall (m :: * -> *) a.
(MonadAsync m, MonadMask m) =>
JobPool m a -> Job m a -> m ()
JobPool.forkJob JobPool m MuxJobResult
jobpool (Job m MuxJobResult -> m ()) -> Job m MuxJobResult -> m ()
forall a b. (a -> b) -> a -> b
$
Tracer m MuxTrace
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> Job m MuxJobResult
forall (mode :: MuxMode) (m :: * -> *).
(MonadSTM m, MonadThrow (STM m)) =>
Tracer m MuxTrace
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> Job m MuxJobResult
miniProtocolJob
Tracer m MuxTrace
tracer
EgressQueue m
egressQueue
MiniProtocolState mode m
ptclState
MiniProtocolAction m
ptclAction
Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
-> m ()
go ((MiniProtocolNum, MiniProtocolDir)
-> Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
-> Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete (MiniProtocolNum
miniProtocolNum, MiniProtocolDir
miniProtocolDirEnum) Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
ptclsStartOnDemand)
where
miniProtocolDirEnum :: MiniProtocolDir
miniProtocolDirEnum = MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: MuxMode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir
checkNonEmptyQueue :: IngressQueue m -> STM m ()
checkNonEmptyQueue :: IngressQueue m -> STM m ()
checkNonEmptyQueue IngressQueue m
q = do
ByteString
buf <- IngressQueue m -> STM m ByteString
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar IngressQueue m
q
Bool -> STM m ()
forall (stm :: * -> *). MonadSTMTx stm => Bool -> stm ()
check (Bool -> Bool
not (ByteString -> Bool
BL.null ByteString
buf))
protocolKey :: MiniProtocolState mode m -> (MiniProtocolNum, MiniProtocolDir)
protocolKey MiniProtocolState {
miniProtocolInfo :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> MiniProtocolInfo mode
miniProtocolInfo = MiniProtocolInfo {
MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: forall (mode :: MuxMode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum,
MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: MuxMode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir
}
} =
(MiniProtocolNum
miniProtocolNum, MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: MuxMode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir)
data MonitorEvent mode m =
EventJobResult MuxJobResult
| EventControlCmd (ControlCmd mode m)
| EventStartOnDemand (MiniProtocolState mode m)
(MiniProtocolAction m)
data MuxJobResult =
MiniProtocolShutdown MiniProtocolNum MiniProtocolDir
| MiniProtocolException MiniProtocolNum MiniProtocolDir SomeException
| MuxerException SomeException
| DemuxerException SomeException
muxChannel
:: forall m.
( MonadSTM m
)
=> Tracer m MuxTrace
-> EgressQueue m
-> Wanton m
-> MiniProtocolNum
-> MiniProtocolDir
-> IngressQueue m
-> Channel m
muxChannel :: Tracer m MuxTrace
-> EgressQueue m
-> Wanton m
-> MiniProtocolNum
-> MiniProtocolDir
-> IngressQueue m
-> Channel m
muxChannel Tracer m MuxTrace
tracer EgressQueue m
egressQueue want :: Wanton m
want@(Wanton IngressQueue m
w) MiniProtocolNum
mc MiniProtocolDir
md IngressQueue m
q =
Channel :: forall (m :: * -> *).
(ByteString -> m ()) -> m (Maybe ByteString) -> Channel m
Channel { ByteString -> m ()
send :: ByteString -> m ()
send :: ByteString -> m ()
send, m (Maybe ByteString)
recv :: m (Maybe ByteString)
recv :: m (Maybe ByteString)
recv}
where
perMiniProtocolBufferSize :: Int64
perMiniProtocolBufferSize :: Int64
perMiniProtocolBufferSize = Int64
0x3ffff
send :: BL.ByteString -> m ()
send :: ByteString -> m ()
send ByteString
encoding = do
Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MuxTrace -> m ()) -> MuxTrace -> m ()
forall a b. (a -> b) -> a -> b
$ MiniProtocolNum -> Int -> MuxTrace
MuxTraceChannelSendStart MiniProtocolNum
mc (Int64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> Int) -> Int64 -> Int
forall a b. (a -> b) -> a -> b
$ ByteString -> Int64
BL.length ByteString
encoding)
STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
ByteString
buf <- IngressQueue m -> STM m ByteString
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar IngressQueue m
w
if ByteString -> Int64
BL.length ByteString
buf Int64 -> Int64 -> Bool
forall a. Ord a => a -> a -> Bool
< Int64
perMiniProtocolBufferSize
then do
let wasEmpty :: Bool
wasEmpty = ByteString -> Bool
BL.null ByteString
buf
IngressQueue m -> ByteString -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar IngressQueue m
w (ByteString -> ByteString -> ByteString
BL.append ByteString
buf ByteString
encoding)
Bool -> STM m () -> STM m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
wasEmpty (STM m () -> STM m ()) -> STM m () -> STM m ()
forall a b. (a -> b) -> a -> b
$
EgressQueue m -> TranslocationServiceRequest m -> STM m ()
forall (stm :: * -> *) a.
MonadSTMTx stm =>
TBQueue_ stm a -> a -> stm ()
writeTBQueue EgressQueue m
egressQueue (MiniProtocolNum
-> MiniProtocolDir -> Wanton m -> TranslocationServiceRequest m
forall (m :: * -> *).
MiniProtocolNum
-> MiniProtocolDir -> Wanton m -> TranslocationServiceRequest m
TLSRDemand MiniProtocolNum
mc MiniProtocolDir
md Wanton m
want)
else STM m ()
forall (stm :: * -> *) a. MonadSTMTx stm => stm a
retry
Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MuxTrace -> m ()) -> MuxTrace -> m ()
forall a b. (a -> b) -> a -> b
$ MiniProtocolNum -> MuxTrace
MuxTraceChannelSendEnd MiniProtocolNum
mc
recv :: m (Maybe BL.ByteString)
recv :: m (Maybe ByteString)
recv = do
Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MuxTrace -> m ()) -> MuxTrace -> m ()
forall a b. (a -> b) -> a -> b
$ MiniProtocolNum -> MuxTrace
MuxTraceChannelRecvStart MiniProtocolNum
mc
ByteString
blob <- STM m ByteString -> m ByteString
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m ByteString -> m ByteString)
-> STM m ByteString -> m ByteString
forall a b. (a -> b) -> a -> b
$ do
ByteString
blob <- IngressQueue m -> STM m ByteString
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar IngressQueue m
q
if ByteString
blob ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
== ByteString
BL.empty
then STM m ByteString
forall (stm :: * -> *) a. MonadSTMTx stm => stm a
retry
else IngressQueue m -> ByteString -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar IngressQueue m
q ByteString
BL.empty STM m () -> STM m ByteString -> STM m ByteString
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> ByteString -> STM m ByteString
forall (m :: * -> *) a. Monad m => a -> m a
return ByteString
blob
Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MuxTrace -> m ()) -> MuxTrace -> m ()
forall a b. (a -> b) -> a -> b
$ MiniProtocolNum -> Int -> MuxTrace
MuxTraceChannelRecvEnd MiniProtocolNum
mc (Int64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> Int) -> Int64 -> Int
forall a b. (a -> b) -> a -> b
$ ByteString -> Int64
BL.length ByteString
blob)
Maybe ByteString -> m (Maybe ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe ByteString -> m (Maybe ByteString))
-> Maybe ByteString -> m (Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
blob
traceMuxBearerState :: Tracer m MuxTrace -> MuxBearerState -> m ()
traceMuxBearerState :: Tracer m MuxTrace -> MuxBearerState -> m ()
traceMuxBearerState Tracer m MuxTrace
tracer MuxBearerState
state =
Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MuxBearerState -> MuxTrace
MuxTraceState MuxBearerState
state)
runMiniProtocol :: forall mode m a.
MonadSTM m
=> Mux mode m
-> MiniProtocolNum
-> MiniProtocolDirection mode
-> StartOnDemandOrEagerly
-> (Channel m -> m (a, Maybe BL.ByteString))
-> m (STM m (Either SomeException a))
runMiniProtocol :: Mux mode m
-> MiniProtocolNum
-> MiniProtocolDirection mode
-> StartOnDemandOrEagerly
-> (Channel m -> m (a, Maybe ByteString))
-> m (STM m (Either SomeException a))
runMiniProtocol Mux { Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: forall (mode :: MuxMode) (m :: * -> *).
Mux mode m
-> Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols, TQueue m (ControlCmd mode m)
muxControlCmdQueue :: TQueue m (ControlCmd mode m)
muxControlCmdQueue :: forall (mode :: MuxMode) (m :: * -> *).
Mux mode m -> TQueue m (ControlCmd mode m)
muxControlCmdQueue , StrictTVar m MuxStatus
muxStatus :: StrictTVar m MuxStatus
muxStatus :: forall (mode :: MuxMode) (m :: * -> *).
Mux mode m -> StrictTVar m MuxStatus
muxStatus}
MiniProtocolNum
ptclNum MiniProtocolDirection mode
ptclDir StartOnDemandOrEagerly
startMode Channel m -> m (a, Maybe ByteString)
protocolAction
| Just ptclState :: MiniProtocolState mode m
ptclState@MiniProtocolState{StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> StrictTVar m MiniProtocolStatus
miniProtocolStatusVar}
<- (MiniProtocolNum, MiniProtocolDir)
-> Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
-> Maybe (MiniProtocolState mode m)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup (MiniProtocolNum
ptclNum, MiniProtocolDir
ptclDir') Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols
= STM m (STM m (Either SomeException a))
-> m (STM m (Either SomeException a))
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (STM m (Either SomeException a))
-> m (STM m (Either SomeException a)))
-> STM m (STM m (Either SomeException a))
-> m (STM m (Either SomeException a))
forall a b. (a -> b) -> a -> b
$ do
MiniProtocolStatus
status <- StrictTVar m MiniProtocolStatus -> STM m MiniProtocolStatus
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m MiniProtocolStatus
miniProtocolStatusVar
Bool -> STM m () -> STM m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (MiniProtocolStatus
status MiniProtocolStatus -> MiniProtocolStatus -> Bool
forall a. Eq a => a -> a -> Bool
== MiniProtocolStatus
StatusIdle) (STM m () -> STM m ()) -> STM m () -> STM m ()
forall a b. (a -> b) -> a -> b
$
String -> STM m ()
forall a. HasCallStack => String -> a
error (String -> STM m ()) -> String -> STM m ()
forall a b. (a -> b) -> a -> b
$ String
"runMiniProtocol: protocol thread already running for "
String -> String -> String
forall a. [a] -> [a] -> [a]
++ MiniProtocolNum -> String
forall a. Show a => a -> String
show MiniProtocolNum
ptclNum String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
" " String -> String -> String
forall a. [a] -> [a] -> [a]
++ MiniProtocolDir -> String
forall a. Show a => a -> String
show MiniProtocolDir
ptclDir'
let !status' :: MiniProtocolStatus
status' = case StartOnDemandOrEagerly
startMode of
StartOnDemandOrEagerly
StartOnDemand -> MiniProtocolStatus
StatusStartOnDemand
StartOnDemandOrEagerly
StartEagerly -> MiniProtocolStatus
StatusRunning
StrictTVar m MiniProtocolStatus -> MiniProtocolStatus -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MiniProtocolStatus
miniProtocolStatusVar MiniProtocolStatus
status'
StrictTMVar m (Either SomeException a)
completionVar <- STM m (StrictTMVar m (Either SomeException a))
forall (m :: * -> *) a. MonadSTM m => STM m (StrictTMVar m a)
newEmptyTMVar
TQueue m (ControlCmd mode m) -> ControlCmd mode m -> STM m ()
forall (stm :: * -> *) a.
MonadSTMTx stm =>
TQueue_ stm a -> a -> stm ()
writeTQueue TQueue m (ControlCmd mode m)
muxControlCmdQueue (ControlCmd mode m -> STM m ()) -> ControlCmd mode m -> STM m ()
forall a b. (a -> b) -> a -> b
$
StartOnDemandOrEagerly
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> ControlCmd mode m
forall (mode :: MuxMode) (m :: * -> *).
StartOnDemandOrEagerly
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> ControlCmd mode m
CmdStartProtocolThread
StartOnDemandOrEagerly
startMode
MiniProtocolState mode m
ptclState
((Channel m -> m (a, Maybe ByteString))
-> StrictTMVar m (Either SomeException a) -> MiniProtocolAction m
forall (m :: * -> *) a.
(Channel m -> m (a, Maybe ByteString))
-> StrictTMVar m (Either SomeException a) -> MiniProtocolAction m
MiniProtocolAction Channel m -> m (a, Maybe ByteString)
protocolAction StrictTMVar m (Either SomeException a)
completionVar)
STM m (Either SomeException a)
-> STM m (STM m (Either SomeException a))
forall (m :: * -> *) a. Monad m => a -> m a
return (STM m (Either SomeException a)
-> STM m (STM m (Either SomeException a)))
-> STM m (Either SomeException a)
-> STM m (STM m (Either SomeException a))
forall a b. (a -> b) -> a -> b
$ StrictTMVar m (Either SomeException a)
-> STM m (Either SomeException a)
completionAction StrictTMVar m (Either SomeException a)
completionVar
| Bool
otherwise
= String -> m (STM m (Either SomeException a))
forall a. HasCallStack => String -> a
error (String -> m (STM m (Either SomeException a)))
-> String -> m (STM m (Either SomeException a))
forall a b. (a -> b) -> a -> b
$ String
"runMiniProtocol: no such protocol num and mode in this mux: "
String -> String -> String
forall a. [a] -> [a] -> [a]
++ MiniProtocolNum -> String
forall a. Show a => a -> String
show MiniProtocolNum
ptclNum String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
" " String -> String -> String
forall a. [a] -> [a] -> [a]
++ MiniProtocolDir -> String
forall a. Show a => a -> String
show MiniProtocolDir
ptclDir'
where
ptclDir' :: MiniProtocolDir
ptclDir' = MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: MuxMode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
ptclDir
completionAction :: StrictTMVar m (Either SomeException a)
-> STM m (Either SomeException a)
completionAction StrictTMVar m (Either SomeException a)
completionVar = do
MuxStatus
st <- StrictTVar m MuxStatus -> STM m MuxStatus
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m MuxStatus
muxStatus
case MuxStatus
st of
MuxStatus
MuxReady -> StrictTMVar m (Either SomeException a)
-> STM m (Either SomeException a)
forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
readTMVar StrictTMVar m (Either SomeException a)
completionVar
MuxStatus
MuxStopped -> (StrictTMVar m (Either SomeException a)
-> STM m (Either SomeException a)
forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
readTMVar StrictTMVar m (Either SomeException a)
completionVar)
STM m (Either SomeException a)
-> STM m (Either SomeException a) -> STM m (Either SomeException a)
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> (Either SomeException a -> STM m (Either SomeException a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Either SomeException a -> STM m (Either SomeException a))
-> Either SomeException a -> STM m (Either SomeException a)
forall a b. (a -> b) -> a -> b
$ SomeException -> Either SomeException a
forall a b. a -> Either a b
Left (SomeException -> Either SomeException a)
-> SomeException -> Either SomeException a
forall a b. (a -> b) -> a -> b
$ MuxError -> SomeException
forall e. Exception e => e -> SomeException
toException (MuxErrorType -> String -> MuxError
MuxError MuxErrorType
MuxCleanShutdown String
""))
MuxFailed SomeException
e -> (StrictTMVar m (Either SomeException a)
-> STM m (Either SomeException a)
forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
readTMVar StrictTMVar m (Either SomeException a)
completionVar)
STM m (Either SomeException a)
-> STM m (Either SomeException a) -> STM m (Either SomeException a)
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> (Either SomeException a -> STM m (Either SomeException a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Either SomeException a -> STM m (Either SomeException a))
-> Either SomeException a -> STM m (Either SomeException a)
forall a b. (a -> b) -> a -> b
$ SomeException -> Either SomeException a
forall a b. a -> Either a b
Left (SomeException -> Either SomeException a)
-> SomeException -> Either SomeException a
forall a b. (a -> b) -> a -> b
$ MuxError -> SomeException
forall e. Exception e => e -> SomeException
toException (MuxError -> SomeException) -> MuxError -> SomeException
forall a b. (a -> b) -> a -> b
$
case SomeException -> Maybe MuxError
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e of
Just e' :: MuxError
e'@MuxError { MuxErrorType
errorType :: MuxError -> MuxErrorType
errorType :: MuxErrorType
errorType } ->
MuxError
e' { errorType :: MuxErrorType
errorType = Maybe MuxErrorType -> MuxErrorType
MuxShutdown (MuxErrorType -> Maybe MuxErrorType
forall a. a -> Maybe a
Just MuxErrorType
errorType) }
Maybe MuxError
Nothing ->
MuxErrorType -> String -> MuxError
MuxError (Maybe MuxErrorType -> MuxErrorType
MuxShutdown Maybe MuxErrorType
forall a. Maybe a
Nothing) (SomeException -> String
forall a. Show a => a -> String
show SomeException
e))