\subsection{Cardano.BM.Backend.TraceForwarder}
\label{module:Cardano.BM.Backend.TraceForwarder}
%if style == newcode
\begin{code}
{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
#if !defined(mingw32_HOST_OS)
#define POSIX
#endif
module Cardano.BM.Backend.TraceForwarder
( TraceForwarder (..)
, plugin
) where
import Control.Exception
import Control.Monad (forever, when)
import Control.Concurrent (threadDelay)
import qualified Control.Concurrent.Async as Async
import Control.Concurrent.MVar (MVar, modifyMVar_, newMVar, readMVar)
import Control.Concurrent.STM (atomically)
import qualified Control.Concurrent.STM.TBQueue as TBQ
import Data.Aeson (FromJSON, ToJSON, encode)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BSC
import qualified Data.ByteString.Lazy as BL
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)
import Data.Maybe (fromMaybe)
import Data.Text (Text, pack, unpack)
import Data.Text.Encoding (encodeUtf8)
import qualified Data.Text.IO as TIO
import Data.Typeable (Typeable)
import GHC.Natural (Natural)
import qualified Network.Socket as Socket
import System.IO (Handle, IOMode (..), hClose, stderr)
import Text.Read (readMaybe)
import Cardano.BM.Configuration
import Cardano.BM.Data.Backend
import Cardano.BM.Data.Configuration (RemoteAddr(..))
import Cardano.BM.Data.LogItem (LOContent (..), LOMeta (..), LogObject (..))
import Cardano.BM.Data.Severity (Severity (..))
import Cardano.BM.IOManager
import Cardano.BM.Plugin
import qualified Cardano.BM.Snocket as Snocket
import qualified Cardano.BM.Trace as Trace
\end{code}
%endif
|TraceForwarder| is a backend responsible for redirecting logs to a
different process (running a |TraceAcceptor| backend), by means of
either a pipe or a socket.
The |TraceForwarder| looks up a minimum |Severity| in the options
section of the configuration. Messages whose severity is below this minimum
are filtered out; errors and |LogValue| items are always forwarded.
The callback 'getStateDigest' is used as a source of |LogObject|s
that should be sent additionally, only once after the connection is
re\-established. The application that uses 'lobemo-backend-trace-forwarder'
plugin provides this callback.
\subsubsection{Plugin definition}
\begin{code}
-- Build the trace-forwarder plugin.
--
-- The text option stored under the key tfid is read as the minimum
-- severity; a missing or unparseable value falls back to Debug.
-- The backend is realized from the configuration, the dispatcher thread
-- is started, and the filter, the dispatcher and the getStateDigest
-- callback are then stored in the backend state.
-- The trace and the switchboard arguments are not used.
plugin :: forall a s . (IsEffectuator s a, ToJSON a, FromJSON a)
       => Configuration
       -> Trace.Trace IO a
       -> s a
       -> Text
       -> IO [LogObject a]
       -> IO (Plugin a)
plugin config _trace _sb tfid getStateDigest = do
    opts <- getTextOption config tfid
    let minsev = fromMaybe Debug (opts >>= readMaybe . unpack)
    be :: Cardano.BM.Backend.TraceForwarder.TraceForwarder a <- realize config
    dispatcherThr <- spawnDispatcher config (getTF be)
    modifyMVar_ (getTF be) $ \initialBE ->
        return $ initialBE
                    { tfFilter         = minsev
                    , tfDispatcher     = Just dispatcherThr
                    , tfGetStateDigest = getStateDigest
                    }
    return $ BackendPlugin
                (MkBackend { bEffectuate = effectuate be, bUnrealize = unrealize be })
                (bekind be)
\end{code}
\subsubsection{Structure of TraceForwarder}\label{code:TraceForwarder}\index{TraceForwarder}
Contains the handle to the pipe or to the socket.
\begin{code}
-- The backend value handed to callers: the mutable state behind an MVar.
newtype TraceForwarder a = TraceForwarder
    { getTF :: TraceForwarderMVar a }

type TraceForwarderMVar a = MVar (TraceForwarderInternal a)

-- Mutable state of the forwarder.
data TraceForwarderInternal a = TraceForwarderInternal
    { tfQueue            :: TBQ.TBQueue (LogObject a)  -- items waiting to be sent
    , tfHandle           :: Maybe Handle               -- Nothing while not connected
    , tfRemoteAddr       :: RemoteAddr                 -- where to connect to
    , tfFilter           :: Severity                   -- minimum severity to forward
    , tfDispatcher       :: Maybe (Async.Async ())     -- thread draining the queue
    , tfQueueFullCounter :: IORef Int                  -- items dropped since last notice
    , tfGetStateDigest   :: IO [LogObject a]           -- extra items sent after (re)connecting
    }
\end{code}
\subsubsection{TraceForwarder is an effectuator}\index{TraceForwarder!instance of IsEffectuator}
Before being written to the given handle, every |LogObject| is converted to a
|ByteString| through its |JSON| representation.
\begin{code}
instance (ToJSON a) => IsEffectuator TraceForwarder a where
    -- Enqueue a log item for the dispatcher thread.
    -- Errors (by constructor or by severity) and LogValue items are always
    -- enqueued; everything else must reach the configured minimum severity.
    effectuate tf lo = do
        currentTF <- readMVar (getTF tf)
        when (isError || isValue || severity (loMeta lo) >= tfFilter currentTF) $
            writeMessageToQueue currentTF
      where
        isError = errByConstr || errBySev

        errByConstr = case loContent lo of
            LogError _ -> True
            _          -> False

        errBySev = severity (loMeta lo) >= Error

        isValue = case loContent lo of
            LogValue _ _ -> True
            _            -> False

        writeMessageToQueue currentTF' = do
            let queue = tfQueue currentTF'
            -- The capacity check and the write share one STM transaction.
            -- With two separate transactions the queue could fill up in
            -- between, and writeTBQueue would then block the logging thread.
            dropped <- atomically $ do
                noCapacity <- TBQ.isFullTBQueue queue
                if noCapacity
                    then pure True
                    else False <$ TBQ.writeTBQueue queue lo
            when dropped $ do
                -- Count the dropped item; once the threshold is reached the
                -- counter restarts at 1 (this item) and a notice is printed.
                overflowed <- atomicModifyIORef' (tfQueueFullCounter currentTF') $ \counter ->
                    if counter >= overflowCriticalNum
                        then (1, True)
                        else (counter + 1, False)
                when overflowed $ handleOverflow tf

    -- Report on stderr that a batch of log items was dropped.
    handleOverflow _ =
        TIO.hPutStrLn stderr $ "Notice: TraceForwarder's queue is full, "
                            <> pack (show overflowCriticalNum)
                            <> " log items were dropped!"
-- Number of dropped log items after which an overflow notice is printed.
overflowCriticalNum :: Int
overflowCriticalNum = 200
\end{code}
\subsubsection{|TraceForwarder| implements |Backend| functions}\index{TraceForwarder!instance of IsBackend}
|TraceForwarder| is an |IsBackend|
\begin{code}
instance (FromJSON a, ToJSON a) => IsBackend TraceForwarder a where
    type BackendFailure TraceForwarder = TraceForwarderBackendFailure

    bekind _ = TraceForwarderBK

    -- Create the backend state for the address configured as 'forwardTo'.
    -- Fails when that option is absent. No connection is opened here and
    -- no dispatcher is started; filter and state digest get neutral values.
    realize cfg = getForwardTo cfg >>= \case
        Nothing -> fail "Trace forwarder not configured: option 'forwardTo'"
        Just addr -> do
            queue <- atomically $ TBQ.newTBQueue queueMaxSize
            counter <- newIORef 0
            tfMVar <- newMVar $ TraceForwarderInternal
                                    { tfQueue            = queue
                                    , tfHandle           = Nothing
                                    , tfRemoteAddr       = addr
                                    , tfFilter           = Debug
                                    , tfDispatcher       = Nothing
                                    , tfQueueFullCounter = counter
                                    , tfGetStateDigest   = return []
                                    }
            return $ TraceForwarder tfMVar

    -- Stop the dispatcher thread (if any) and close the connection handle.
    unrealize tf = do
        mapM_ Async.uninterruptibleCancel . tfDispatcher =<< readMVar (getTF tf)
        -- Read the handle only after the dispatcher has stopped: the
        -- dispatcher may replace it while reconnecting, and closing a
        -- snapshot taken earlier would leave the newer handle open.
        closeHandle . tfHandle =<< readMVar (getTF tf)
-- Close the handle if there is one; do nothing for Nothing.
closeHandle :: Maybe Handle -> IO ()
closeHandle = mapM_ hClose
-- Connect to the remote address and return a handle for the connection.
-- A RemotePipe is reached through a local snocket built from the pipe path.
-- A RemoteSocket is resolved with getAddrInfo and the first address returned
-- is used; an empty result raises TraceForwarderSocketError.
connectForwarder :: IOManager -> RemoteAddr -> IO Handle
connectForwarder iomgr (RemotePipe pipePath) = do
    let sn = Snocket.localSnocket iomgr pipePath
    Snocket.localFDToHandle =<< doConnect sn (Snocket.localAddressFromPath pipePath)
connectForwarder iomgr (RemoteSocket host port) = do
    let sn = Snocket.socketSnocket iomgr
    addrs <- Socket.getAddrInfo Nothing (Just host) (Just port)
    case addrs of
        [] -> throwIO (TraceForwarderSocketError ("bad socket address: " <> host <> ":" <> port))
        a:_ -> doConnect sn (Socket.addrAddress a)
                 >>= flip Socket.socketToHandle ReadWriteMode
-- Open a descriptor for the remote address and connect it.
-- If connecting throws, the freshly opened descriptor is closed before the
-- exception propagates; connection attempts are retried by the caller, so
-- a descriptor left open on every failure would accumulate.
doConnect :: Snocket.Snocket IO fd addr -> addr -> IO fd
doConnect sn remoteAddr =
    bracketOnError
        (Snocket.openToConnect sn remoteAddr)
        (Snocket.close sn)
        (\sd -> Snocket.connect sn sd remoteAddr >> pure sd)
-- Failures raised by this backend; each carries a description.
data TraceForwarderBackendFailure
    = TraceForwarderConnectionError String
    | TraceForwarderSocketError String
    deriving (Show, Typeable)

instance Exception TraceForwarderBackendFailure
\end{code}
\subsubsection{Asynchronously reading log items from the queue and sending them to an acceptor.}
\begin{code}
-- Start the dispatcher thread.
-- The thread loops forever: it sleeps for the configured forward delay
-- (default 1000 ms), flushes the queue and passes the flushed items to
-- sendItems together with the current state-digest callback.
spawnDispatcher :: ToJSON a => Configuration -> TraceForwarderMVar a -> IO (Async.Async ())
spawnDispatcher config tfMVar = do
    forwardDelay <- fromMaybe defaultDelayInMs <$> getForwardDelay config
    Async.async $ processQueue forwardDelay
  where
    defaultDelayInMs :: Word
    defaultDelayInMs = 1000

    processQueue :: Word -> IO ()
    processQueue delayInMs = forever $ do
        -- threadDelay takes microseconds.
        threadDelay $ fromIntegral delayInMs * 1000
        -- The state is re-read on every round, so a callback installed
        -- after the thread was started is picked up.
        currentTF <- readMVar tfMVar
        let getStateDigest = tfGetStateDigest currentTF
        itemsList <- atomically $ TBQ.flushTBQueue (tfQueue currentTF)
        sendItems config tfMVar itemsList getStateDigest
-- Send a batch of log items to the acceptor.
-- An empty batch is a no-op. Without a handle, a connection is established
-- first and the items returned by the state-digest callback are put in
-- front of the batch. With a handle, the hostname of the first item is
-- written on one line and the JSON encoding of the whole batch on the next.
-- When a write fails with an IOException the batch is retried after 10 ms.
sendItems :: ToJSON a
          => Configuration
          -> TraceForwarderMVar a
          -> [LogObject a]
          -> IO [LogObject a]
          -> IO ()
sendItems _ _ [] _ = return ()
sendItems config tfMVar items@(lo:_) getStateDigest =
    tfHandle <$> readMVar tfMVar >>= \case
        Nothing -> do
            establishConnection 1 1 tfMVar
            additionalItems <- getStateDigest
            sendItems config tfMVar (additionalItems ++ items) getStateDigest
        Just h ->
            try (BSC.hPutStrLn h $! encodedHostname) >>= \case
                Left (_e :: IOException) -> retryAfterFailure
                Right _ ->
                    try (BSC.hPutStrLn h $! bs) >>= \case
                        Left (_e :: IOException) -> retryAfterFailure
                        Right _ -> return ()
  where
    encodedHostname = encodeUtf8 (hostname . loMeta $ lo)

    (_, bs) = jsonToBS items

    jsonToBS :: ToJSON b => b -> (Int, BS.ByteString)
    jsonToBS a =
        let bs' = BL.toStrict $ encode a
        in (BS.length bs', bs')

    -- Shared by both write-failure branches.
    retryAfterFailure = do
        reConnectIfQueueIsAlmostFull
        threadDelay 10000
        sendItems config tfMVar items getStateDigest

    -- Drop the current handle once the queue is almost full, so that the
    -- next attempt goes through establishConnection.
    reConnectIfQueueIsAlmostFull = do
        currentTF <- readMVar tfMVar
        currentQueueSize <- atomically $ TBQ.lengthTBQueue (tfQueue currentTF)
        when (queueIsAlmostFull currentQueueSize) $ do
            -- Closing a handle whose writes just failed can itself throw;
            -- that must not end the dispatcher thread, and the handle has
            -- to be reset either way.
            _ <- (try (closeHandle $ tfHandle currentTF) :: IO (Either IOException ()))
            modifyMVar_ tfMVar $ \be -> return $ be { tfHandle = Nothing }

    -- True when the queue holds at least 80% of queueMaxSize.
    queueIsAlmostFull queueSize = queueSize >= round almostFullSize
      where
        almostFullSize :: Float
        almostFullSize = 0.8 * fromIntegral queueMaxSize
-- Capacity of the bounded queue that buffers log items before sending.
queueMaxSize :: Natural
queueMaxSize = 2500
-- Connect to the configured remote address and store the handle in the
-- backend state, retrying until it succeeds.
-- The second argument is the delay in seconds applied after a failed
-- attempt. While it is below 60 the next delay is the sum of both
-- arguments (1, 2, 3, 5, 8, ...); afterwards it is fixed at 60 seconds.
-- Only IOException is treated as a failed attempt.
establishConnection :: Int -> Int -> TraceForwarderMVar a -> IO ()
establishConnection delayInSec delayInSec' tfMVar = do
    addr <- tfRemoteAddr <$> readMVar tfMVar
    -- Each attempt gets its own IOManager scope and the retry happens
    -- outside of it, so failed attempts do not pile up nested scopes.
    attempt <- withIOManager $ \iomgr -> try (connectForwarder iomgr addr)
    case attempt of
        Right h ->
            modifyMVar_ tfMVar $ \be -> return $ be { tfHandle = Just h }
        Left (_e :: IOException) -> do
            threadDelay $ 1000000 * delayInSec'
            if delayInSec' < 60
                then establishConnection delayInSec' (delayInSec + delayInSec') tfMVar
                else establishConnection 1 60 tfMVar
\end{code}