{-# LANGUAGE NamedFieldPuns #-}
module Ouroboros.Network.Subscription.Client
( ClientSubscriptionParams (..)
, clientSubscriptionWorker
) where
import Control.Monad.Class.MonadTime
import Control.Tracer
import Data.Void (Void)
import Data.Functor.Identity (Identity (..))
import Ouroboros.Network.Snocket ( LocalAddress
, LocalSnocket
, LocalSocket
)
import Ouroboros.Network.ErrorPolicy ( ErrorPolicies
, ErrorPolicyTrace
, WithAddr
, completeApplicationTx
)
import Ouroboros.Network.Socket (NetworkMutableState (..))
import Ouroboros.Network.Subscription.Ip (socketStateChangeTx, mainTx)
import Ouroboros.Network.Subscription.Worker
import Ouroboros.Network.Subscription.Subscriber
-- | Parameters consumed by 'clientSubscriptionWorker'.
--
-- The type parameter @a@ does not occur in any field; in
-- 'clientSubscriptionWorker' it is the result type of the per-connection
-- continuation.
--
data ClientSubscriptionParams a = ClientSubscriptionParams
  { cspAddress                :: !LocalAddress
    -- ^ Address the worker subscribes to.  'clientSubscriptionWorker' uses it
    -- both as the worker's local address and as the only subscription target.
  , cspConnectionAttemptDelay :: !(Maybe DiffTime)
    -- ^ Connection attempt delay handed to the worker; the same value is
    -- returned for every address.
  , cspErrorPolicies          :: !ErrorPolicies
    -- ^ Error policies passed to 'completeApplicationTx'.
  }
-- | Run the generic subscription 'worker' against a single 'LocalAddress'.
--
-- The connection table and peer-state variable are taken from the supplied
-- 'NetworkMutableState'; the address, connection attempt delay and error
-- policies come from 'ClientSubscriptionParams'.  The continuation is handed
-- to 'worker' unchanged together with the 'LocalSnocket'.
--
-- The result type is @'IO' 'Void'@, i.e. the action has no value to return.
--
clientSubscriptionWorker
    :: LocalSnocket
    -- ^ snocket passed on to 'worker'
    -> Tracer IO (SubscriptionTrace LocalAddress)
    -- ^ subscription tracer
    -> Tracer IO (WithAddr LocalAddress ErrorPolicyTrace)
    -- ^ error policy tracer
    -> NetworkMutableState LocalAddress
    -- ^ provides the connection table and the peer states variable
    -> ClientSubscriptionParams a
    -- ^ address, connection attempt delay and error policies
    -> (LocalSocket -> IO a)
    -- ^ continuation given to 'worker'; receives a 'LocalSocket'
    -> IO Void
clientSubscriptionWorker snocket
                         tracer
                         errorPolicyTracer
                         NetworkMutableState { nmsConnectionTable
                                             , nmsPeerStates
                                             }
                         ClientSubscriptionParams { cspAddress
                                                  , cspConnectionAttemptDelay
                                                  , cspErrorPolicies
                                                  }
                         k =
    worker tracer
           errorPolicyTracer
           nmsConnectionTable
           nmsPeerStates
           snocket
           WorkerCallbacks
             { wcSocketStateChangeTx   = socketStateChangeTx
             , wcCompleteApplicationTx = completeApplicationTx cspErrorPolicies
             , wcMainTx                = mainTx
             }
           workerParams
           k
  where
    -- Worker configuration for exactly one address: 'cspAddress' is both the
    -- (only) local address and the (only) subscription target.
    workerParams :: WorkerParams IO Identity LocalAddress
    workerParams = WorkerParams
      { wpLocalAddresses         = Identity cspAddress
        -- The remote address is ignored; the single local address is always
        -- selected.
      , wpSelectAddress          = \_ (Identity addr) -> Just addr
        -- Same delay regardless of the address being connected to.
      , wpConnectionAttemptDelay = const cspConnectionAttemptDelay
      , wpSubscriptionTarget     = pure (listSubscriptionTarget [cspAddress])
      , wpValency                = 1
      }