{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE RankNTypes          #-}

module Network.Mux.Bearer.Queues
  ( queuesAsMuxBearer
  ) where

import qualified Data.ByteString.Lazy as BL
import           Data.Word (Word16)

import           Control.Monad.Class.MonadSTM.Strict
import           Control.Monad.Class.MonadTime
import           Control.Monad.Class.MonadThrow
import           Control.Tracer

import qualified Network.Mux as Mx
import           Network.Mux.Types (MuxBearer)
import qualified Network.Mux.Types as Mx
import qualified Network.Mux.Codec as Mx
import           Network.Mux.Time as Mx
import qualified Network.Mux.Timeout as Mx

-- | Build a 'MuxBearer' on top of a pair of bounded STM queues.
--
-- Each queue element is handled as one complete encoded SDU: 'Mx.write'
-- pushes a single lazy 'BL.ByteString' per SDU and 'Mx.read' pops a single
-- element and decodes it.
--
-- Note the argument order: the queue that is /written to/ comes before the
-- queue that is /read from/.
--
-- Neither direction uses the 'Mx.TimeoutFn' it is handed; both block on the
-- queue inside 'atomically' until the STM operation can proceed.
queuesAsMuxBearer
  :: forall m.
     ( MonadSTM   m
     , MonadTime  m
     , MonadThrow m
     )
  => Tracer m Mx.MuxTrace
  -- ^ tracer that receives the send/receive trace events
  -> TBQueue m BL.ByteString
  -- ^ queue that encoded outgoing SDUs are written to
  -> TBQueue m BL.ByteString
  -- ^ queue that encoded incoming SDUs are read from
  -> Word16
  -- ^ value stored verbatim in the bearer's 'Mx.sduSize' field
  -> MuxBearer m
queuesAsMuxBearer tracer writeQueue readQueue sduSize =
    Mx.MuxBearer
      { Mx.read    = readMux
      , Mx.write   = writeMux
      , Mx.sduSize = sduSize
      }
  where
    -- | Pop one element from the read queue and decode it.
    --
    -- Returns the decoded SDU with its 'Mx.msBlob' replaced by the bytes
    -- following the header, together with the monotonic time taken right
    -- after the header was decoded.  A decoding failure is rethrown with
    -- 'throwIO'.
    readMux :: Mx.TimeoutFn m -> m (Mx.MuxSDU, Time)
    readMux _ = do
      traceWith tracer Mx.MuxTraceRecvHeaderStart
      buf <- atomically (readTBQueue readQueue)
      -- The first 8 bytes are handed to the header decoder; everything
      -- after them is treated as the payload.
      -- NOTE(review): 8 is presumably the encoded SDU header length --
      -- confirm against 'Mx.decodeMuxSDU'.
      let (hbuf, payload) = BL.splitAt 8 buf
      case Mx.decodeMuxSDU hbuf of
        Left e -> throwIO e
        Right header -> do
          traceWith tracer $ Mx.MuxTraceRecvHeaderEnd (Mx.msHeader header)
          ts <- getMonotonicTime
          traceWith tracer $
            Mx.MuxTraceRecvDeltaQObservation (Mx.msHeader header) ts
          pure (header { Mx.msBlob = payload }, ts)

    -- | Timestamp, encode and enqueue one SDU.
    --
    -- The SDU's timestamp is overwritten with the low 32 bits of the
    -- current monotonic time in microseconds before encoding.  Returns that
    -- same monotonic time (sampled before the queue write).
    writeMux :: Mx.TimeoutFn m -> Mx.MuxSDU -> m Time
    writeMux _ sdu = do
      ts <- getMonotonicTime
      let ts32 = Mx.timestampMicrosecondsLow32Bits ts
          sdu' = Mx.setTimestamp sdu (Mx.RemoteClockModel ts32)
          buf  = Mx.encodeMuxSDU sdu'
      traceWith tracer $ Mx.MuxTraceSendStart (Mx.msHeader sdu')
      atomically (writeTBQueue writeQueue buf)
      traceWith tracer Mx.MuxTraceSendEnd
      pure ts