\subsection{Cardano.BM.Backend.TraceForwarder}
\label{module:Cardano.BM.Backend.TraceForwarder}

%if style == newcode
\begin{code}
{-# LANGUAGE CPP                   #-}
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE LambdaCase            #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE TypeFamilies          #-}

#if !defined(mingw32_HOST_OS)
#define POSIX
#endif

module Cardano.BM.Backend.TraceForwarder
    ( TraceForwarder (..)
    -- * Plugin
    , plugin
    ) where

import           Control.Exception
import           Control.Monad (forever, when)
import           Control.Concurrent (threadDelay)
import qualified Control.Concurrent.Async as Async
import           Control.Concurrent.MVar (MVar, modifyMVar_, newMVar, readMVar)
import           Control.Concurrent.STM (atomically)
import qualified Control.Concurrent.STM.TBQueue as TBQ
import           Data.Aeson (FromJSON, ToJSON, encode)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BSC
import qualified Data.ByteString.Lazy as BL
import           Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)
import           Data.Maybe (fromMaybe)
import           Data.Text (Text, pack, unpack)
import           Data.Text.Encoding (encodeUtf8)
import qualified Data.Text.IO as TIO
import           Data.Typeable (Typeable)
import           GHC.Natural (Natural)

import qualified Network.Socket as Socket
import           System.IO (Handle, IOMode (..), hClose, stderr)
import           Text.Read (readMaybe)

import           Cardano.BM.Configuration
import           Cardano.BM.Data.Backend
import           Cardano.BM.Data.Configuration (RemoteAddr(..))
import           Cardano.BM.Data.LogItem (LOContent (..), LOMeta (..), LogObject (..))
import           Cardano.BM.Data.Severity (Severity (..))
import           Cardano.BM.IOManager
import           Cardano.BM.Plugin
import qualified Cardano.BM.Snocket as Snocket
import qualified Cardano.BM.Trace as Trace

\end{code}
%endif

|TraceForwarder| is a backend responsible for redirecting logs to a
different process (running a |TraceAcceptor| backend), by means of
either a pipe or a socket.

The |TraceForwarder| is looking up a minimum |Severity| in the options
section of the configuration. This filters out all messages that have not at
least the |Severity|.

The callback 'getStateDigest' is used as a source of |LogObject|s
that should be sent additionally, only once after the connection is
re\-established. The application that uses 'lobemo-backend-trace-forwarder'
plugin provides this callback.
\subsubsection{Plugin definition}
\begin{code}
plugin :: forall a s . (IsEffectuator s a, ToJSON a, FromJSON a)
       => Configuration
       -> Trace.Trace IO a
       -> s a
       -> Text
       -> IO [LogObject a]
       -> IO (Plugin a)
plugin :: Configuration
-> Trace IO a -> s a -> Text -> IO [LogObject a] -> IO (Plugin a)
plugin Configuration
config Trace IO a
_trace s a
_sb Text
tfid IO [LogObject a]
getStateDigest = do
    Maybe Text
opts <- Configuration -> Text -> IO (Maybe Text)
getTextOption Configuration
config Text
tfid
    let minsev :: Severity
minsev = case Maybe Text
opts of
                   Maybe Text
Nothing -> Severity
Debug
                   Just Text
sevtext -> Severity -> Maybe Severity -> Severity
forall a. a -> Maybe a -> a
fromMaybe Severity
Debug (String -> Maybe Severity
forall a. Read a => String -> Maybe a
readMaybe (String -> Maybe Severity) -> String -> Maybe Severity
forall a b. (a -> b) -> a -> b
$ Text -> String
unpack Text
sevtext)
    TraceForwarder a
be :: Cardano.BM.Backend.TraceForwarder.TraceForwarder a <- Configuration -> IO (TraceForwarder a)
forall (t :: * -> *) a. IsBackend t a => Configuration -> IO (t a)
realize Configuration
config
    Async ()
dispatcherThr <- Configuration -> TraceForwarderMVar a -> IO (Async ())
forall a.
ToJSON a =>
Configuration -> TraceForwarderMVar a -> IO (Async ())
spawnDispatcher Configuration
config (TraceForwarder a -> TraceForwarderMVar a
forall a. TraceForwarder a -> TraceForwarderMVar a
getTF TraceForwarder a
be)
    TraceForwarderMVar a
-> (TraceForwarderInternal a -> IO (TraceForwarderInternal a))
-> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ (TraceForwarder a -> TraceForwarderMVar a
forall a. TraceForwarder a -> TraceForwarderMVar a
getTF TraceForwarder a
be) ((TraceForwarderInternal a -> IO (TraceForwarderInternal a))
 -> IO ())
-> (TraceForwarderInternal a -> IO (TraceForwarderInternal a))
-> IO ()
forall a b. (a -> b) -> a -> b
$ \TraceForwarderInternal a
initialBE ->
      TraceForwarderInternal a -> IO (TraceForwarderInternal a)
forall (m :: * -> *) a. Monad m => a -> m a
return (TraceForwarderInternal a -> IO (TraceForwarderInternal a))
-> TraceForwarderInternal a -> IO (TraceForwarderInternal a)
forall a b. (a -> b) -> a -> b
$ TraceForwarderInternal a
initialBE
                 { tfFilter :: Severity
tfFilter         = Severity
minsev
                 , tfDispatcher :: Maybe (Async ())
tfDispatcher     = Async () -> Maybe (Async ())
forall a. a -> Maybe a
Just Async ()
dispatcherThr
                 , tfGetStateDigest :: IO [LogObject a]
tfGetStateDigest = IO [LogObject a]
getStateDigest
                 }
    Plugin a -> IO (Plugin a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Plugin a -> IO (Plugin a)) -> Plugin a -> IO (Plugin a)
forall a b. (a -> b) -> a -> b
$ Backend a -> BackendKind -> Plugin a
forall a. Backend a -> BackendKind -> Plugin a
BackendPlugin
               (MkBackend :: forall a. (LogObject a -> IO ()) -> IO () -> Backend a
MkBackend { bEffectuate :: LogObject a -> IO ()
bEffectuate = TraceForwarder a -> LogObject a -> IO ()
forall (t :: * -> *) a.
IsEffectuator t a =>
t a -> LogObject a -> IO ()
effectuate TraceForwarder a
be, bUnrealize :: IO ()
bUnrealize = TraceForwarder a -> IO ()
forall (t :: * -> *) a. IsBackend t a => t a -> IO ()
unrealize TraceForwarder a
be })
               (TraceForwarder a -> BackendKind
forall (t :: * -> *) a. IsBackend t a => t a -> BackendKind
bekind TraceForwarder a
be)

\end{code}

\subsubsection{Structure of TraceForwarder}\label{code:TraceForwarder}\index{TraceForwarder}
Contains the handler to the pipe or to the socket.
\begin{code}
newtype TraceForwarder a = TraceForwarder
    { TraceForwarder a -> TraceForwarderMVar a
getTF :: TraceForwarderMVar a }

type TraceForwarderMVar a = MVar (TraceForwarderInternal a)

data TraceForwarderInternal a = TraceForwarderInternal
    { TraceForwarderInternal a -> TBQueue (LogObject a)
tfQueue            :: TBQ.TBQueue (LogObject a)
    , TraceForwarderInternal a -> Maybe Handle
tfHandle           :: Maybe Handle
    , TraceForwarderInternal a -> RemoteAddr
tfRemoteAddr       :: RemoteAddr
    , TraceForwarderInternal a -> Severity
tfFilter           :: Severity
    , TraceForwarderInternal a -> Maybe (Async ())
tfDispatcher       :: Maybe (Async.Async ())
    , TraceForwarderInternal a -> IORef Int
tfQueueFullCounter :: IORef Int
    , TraceForwarderInternal a -> IO [LogObject a]
tfGetStateDigest   :: IO [LogObject a]
    }
\end{code}

\subsubsection{TraceForwarder is an effectuator}\index{TraceForwarder!instance of IsEffectuator}
Every |LogObject| before being written to the given handler is converted to
|ByteString| through its |JSON| representation.
\begin{code}
instance (ToJSON a) => IsEffectuator TraceForwarder a where
    effectuate :: TraceForwarder a -> LogObject a -> IO ()
effectuate TraceForwarder a
tf LogObject a
lo = do
        let currentMVar :: TraceForwarderMVar a
currentMVar = TraceForwarder a -> TraceForwarderMVar a
forall a. TraceForwarder a -> TraceForwarderMVar a
getTF TraceForwarder a
tf
        TraceForwarderInternal a
currentTF <- TraceForwarderMVar a -> IO (TraceForwarderInternal a)
forall a. MVar a -> IO a
readMVar TraceForwarderMVar a
currentMVar
        -- Severity filter allows to ignore LogObjects with too low severity.
        -- However, errors and metrics should be forwarded in any case,
        -- regardless their severity.
        if Bool
isError
          then TraceForwarderInternal a -> IO ()
writeMessageToQueue TraceForwarderInternal a
currentTF
          else case LogObject a -> LOContent a
forall a. LogObject a -> LOContent a
loContent LogObject a
lo of
                 LogValue Text
_ Measurable
_ -> TraceForwarderInternal a -> IO ()
writeMessageToQueue TraceForwarderInternal a
currentTF
                 LOContent a
_            -> Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (LOMeta -> Severity
severity (LogObject a -> LOMeta
forall a. LogObject a -> LOMeta
loMeta LogObject a
lo) Severity -> Severity -> Bool
forall a. Ord a => a -> a -> Bool
>= TraceForwarderInternal a -> Severity
forall a. TraceForwarderInternal a -> Severity
tfFilter TraceForwarderInternal a
currentTF) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
                                   TraceForwarderInternal a -> IO ()
writeMessageToQueue TraceForwarderInternal a
currentTF
     where
      isError :: Bool
isError = Bool
errByConstr Bool -> Bool -> Bool
|| Bool
errBySev
       where
        errByConstr :: Bool
errByConstr =
          case LogObject a -> LOContent a
forall a. LogObject a -> LOContent a
loContent LogObject a
lo of
            LogError Text
_ -> Bool
True
            LOContent a
_          -> Bool
False
        errBySev :: Bool
errBySev = LOMeta -> Severity
severity (LogObject a -> LOMeta
forall a. LogObject a -> LOMeta
loMeta LogObject a
lo) Severity -> Severity -> Bool
forall a. Ord a => a -> a -> Bool
>= Severity
Error

      writeMessageToQueue :: TraceForwarderInternal a -> IO ()
writeMessageToQueue TraceForwarderInternal a
currentTF' = do
        let queue :: TBQueue (LogObject a)
queue = TraceForwarderInternal a -> TBQueue (LogObject a)
forall a. TraceForwarderInternal a -> TBQueue (LogObject a)
tfQueue TraceForwarderInternal a
currentTF'
        Bool
noCapacity <- STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ TBQueue (LogObject a) -> STM Bool
forall a. TBQueue a -> STM Bool
TBQ.isFullTBQueue TBQueue (LogObject a)
queue
        if Bool
noCapacity
          then do
            let counterIORef :: IORef Int
counterIORef = TraceForwarderInternal a -> IORef Int
forall a. TraceForwarderInternal a -> IORef Int
tfQueueFullCounter TraceForwarderInternal a
currentTF'
            Bool
overflowed <- IORef Int -> (Int -> (Int, Bool)) -> IO Bool
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef Int
counterIORef ((Int -> (Int, Bool)) -> IO Bool)
-> (Int -> (Int, Bool)) -> IO Bool
forall a b. (a -> b) -> a -> b
$ \Int
counter ->
              if Int
counter Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
overflowCriticalNum
                then (Int
1, Bool
True)
                else (Int
counter Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1, Bool
False)
            Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
overflowed (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ TraceForwarder a -> IO ()
forall (t :: * -> *) a. IsEffectuator t a => t a -> IO ()
handleOverflow TraceForwarder a
tf
          else STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue (LogObject a) -> LogObject a -> STM ()
forall a. TBQueue a -> a -> STM ()
TBQ.writeTBQueue TBQueue (LogObject a)
queue LogObject a
lo

    handleOverflow :: TraceForwarder a -> IO ()
handleOverflow TraceForwarder a
_ = Handle -> Text -> IO ()
TIO.hPutStrLn Handle
stderr (Text -> IO ()) -> Text -> IO ()
forall a b. (a -> b) -> a -> b
$ Text
"Notice: TraceForwarder's queue is full, "
                                              Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
pack (Int -> String
forall a. Show a => a -> String
show Int
overflowCriticalNum)
                                              Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" log items were dropped!"

overflowCriticalNum :: Int
overflowCriticalNum :: Int
overflowCriticalNum = Int
200

\end{code}


\subsubsection{|TraceForwarder| implements |Backend| functions}\index{TraceForwarder!instance of IsBackend}

|TraceForwarder| is an |IsBackend|
\begin{code}
instance (FromJSON a, ToJSON a) => IsBackend TraceForwarder a where
    type BackendFailure TraceForwarder = TraceForwarderBackendFailure

    bekind :: TraceForwarder a -> BackendKind
bekind TraceForwarder a
_ = BackendKind
TraceForwarderBK

    realize :: Configuration -> IO (TraceForwarder a)
realize Configuration
cfg = Configuration -> IO (Maybe RemoteAddr)
getForwardTo Configuration
cfg IO (Maybe RemoteAddr)
-> (Maybe RemoteAddr -> IO (TraceForwarder a))
-> IO (TraceForwarder a)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      Maybe RemoteAddr
Nothing -> String -> IO (TraceForwarder a)
forall (m :: * -> *) a. MonadFail m => String -> m a
fail String
"Trace forwarder not configured:  option 'forwardTo'"
      Just RemoteAddr
addr -> do
        TBQueue (LogObject a)
queue <- STM (TBQueue (LogObject a)) -> IO (TBQueue (LogObject a))
forall a. STM a -> IO a
atomically (STM (TBQueue (LogObject a)) -> IO (TBQueue (LogObject a)))
-> STM (TBQueue (LogObject a)) -> IO (TBQueue (LogObject a))
forall a b. (a -> b) -> a -> b
$ Natural -> STM (TBQueue (LogObject a))
forall a. Natural -> STM (TBQueue a)
TBQ.newTBQueue Natural
queueMaxSize
        IORef Int
counter <- Int -> IO (IORef Int)
forall a. a -> IO (IORef a)
newIORef Int
0
        MVar (TraceForwarderInternal a)
tfMVar <- TraceForwarderInternal a -> IO (MVar (TraceForwarderInternal a))
forall a. a -> IO (MVar a)
newMVar (TraceForwarderInternal a -> IO (MVar (TraceForwarderInternal a)))
-> TraceForwarderInternal a -> IO (MVar (TraceForwarderInternal a))
forall a b. (a -> b) -> a -> b
$ TraceForwarderInternal :: forall a.
TBQueue (LogObject a)
-> Maybe Handle
-> RemoteAddr
-> Severity
-> Maybe (Async ())
-> IORef Int
-> IO [LogObject a]
-> TraceForwarderInternal a
TraceForwarderInternal
                              { tfQueue :: TBQueue (LogObject a)
tfQueue            = TBQueue (LogObject a)
queue
                              , tfHandle :: Maybe Handle
tfHandle           = Maybe Handle
forall a. Maybe a
Nothing
                              , tfRemoteAddr :: RemoteAddr
tfRemoteAddr       = RemoteAddr
addr
                              , tfFilter :: Severity
tfFilter           = Severity
Debug
                              , tfDispatcher :: Maybe (Async ())
tfDispatcher       = Maybe (Async ())
forall a. Maybe a
Nothing
                              , tfQueueFullCounter :: IORef Int
tfQueueFullCounter = IORef Int
counter
                              , tfGetStateDigest :: IO [LogObject a]
tfGetStateDigest   = [LogObject a] -> IO [LogObject a]
forall (m :: * -> *) a. Monad m => a -> m a
return []
                              }
        TraceForwarder a -> IO (TraceForwarder a)
forall (m :: * -> *) a. Monad m => a -> m a
return (TraceForwarder a -> IO (TraceForwarder a))
-> TraceForwarder a -> IO (TraceForwarder a)
forall a b. (a -> b) -> a -> b
$ MVar (TraceForwarderInternal a) -> TraceForwarder a
forall a. TraceForwarderMVar a -> TraceForwarder a
TraceForwarder MVar (TraceForwarderInternal a)
tfMVar

    unrealize :: TraceForwarder a -> IO ()
unrealize TraceForwarder a
tf = do
      TraceForwarderInternal a
currentTF <- MVar (TraceForwarderInternal a) -> IO (TraceForwarderInternal a)
forall a. MVar a -> IO a
readMVar (TraceForwarder a -> MVar (TraceForwarderInternal a)
forall a. TraceForwarder a -> TraceForwarderMVar a
getTF TraceForwarder a
tf)
      -- Cancel dispatcher thread.
      case TraceForwarderInternal a -> Maybe (Async ())
forall a. TraceForwarderInternal a -> Maybe (Async ())
tfDispatcher TraceForwarderInternal a
currentTF of
        Maybe (Async ())
Nothing  -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
        Just Async ()
thr -> Async () -> IO ()
forall a. Async a -> IO ()
Async.uninterruptibleCancel Async ()
thr
      -- If there's a handle - close it.
      Maybe Handle -> IO ()
closeHandle (Maybe Handle -> IO ()) -> Maybe Handle -> IO ()
forall a b. (a -> b) -> a -> b
$ TraceForwarderInternal a -> Maybe Handle
forall a. TraceForwarderInternal a -> Maybe Handle
tfHandle TraceForwarderInternal a
currentTF

closeHandle :: Maybe Handle -> IO ()
closeHandle :: Maybe Handle -> IO ()
closeHandle (Just Handle
h) = Handle -> IO ()
hClose Handle
h
closeHandle Maybe Handle
Nothing  = () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

connectForwarder :: IOManager -> RemoteAddr -> IO Handle
connectForwarder :: IOManager -> RemoteAddr -> IO Handle
connectForwarder IOManager
iomgr (RemotePipe String
pipePath) = do
  let sn :: LocalSnocket
sn = IOManager -> String -> LocalSnocket
Snocket.localSnocket IOManager
iomgr String
pipePath
  LocalFD -> IO Handle
Snocket.localFDToHandle (LocalFD -> IO Handle) -> IO LocalFD -> IO Handle
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< LocalSnocket -> LocalAddress -> IO LocalFD
forall fd addr. Snocket IO fd addr -> addr -> IO fd
doConnect LocalSnocket
sn (String -> LocalAddress
Snocket.localAddressFromPath String
pipePath)
connectForwarder IOManager
iomgr (RemoteSocket String
host String
port) = do
  let sn :: SocketSnocket
sn = IOManager -> SocketSnocket
Snocket.socketSnocket IOManager
iomgr
  [AddrInfo]
addrs <- Maybe AddrInfo -> Maybe String -> Maybe String -> IO [AddrInfo]
Socket.getAddrInfo Maybe AddrInfo
forall a. Maybe a
Nothing (String -> Maybe String
forall a. a -> Maybe a
Just String
host) (String -> Maybe String
forall a. a -> Maybe a
Just String
port)
  case [AddrInfo]
addrs of
    [] -> TraceForwarderBackendFailure -> IO Handle
forall e a. Exception e => e -> IO a
throwIO (String -> TraceForwarderBackendFailure
TraceForwarderSocketError (String
"bad socket address: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
host String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
":" String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
port))
    AddrInfo
a:[AddrInfo]
_ -> SocketSnocket -> SockAddr -> IO LocalFD
forall fd addr. Snocket IO fd addr -> addr -> IO fd
doConnect SocketSnocket
sn (AddrInfo -> SockAddr
Socket.addrAddress AddrInfo
a)
           IO LocalFD -> (LocalFD -> IO Handle) -> IO Handle
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (LocalFD -> IOMode -> IO Handle) -> IOMode -> LocalFD -> IO Handle
forall a b c. (a -> b -> c) -> b -> a -> c
flip LocalFD -> IOMode -> IO Handle
Socket.socketToHandle IOMode
ReadWriteMode

doConnect :: Snocket.Snocket IO fd addr -> addr -> IO fd
doConnect :: Snocket IO fd addr -> addr -> IO fd
doConnect Snocket IO fd addr
sn addr
remoteAddr = do
  fd
sd <- Snocket IO fd addr -> addr -> IO fd
forall (m :: * -> *) fd addr. Snocket m fd addr -> addr -> m fd
Snocket.openToConnect Snocket IO fd addr
sn addr
remoteAddr
  Snocket IO fd addr -> fd -> addr -> IO ()
forall (m :: * -> *) fd addr.
Snocket m fd addr -> fd -> addr -> m ()
Snocket.connect Snocket IO fd addr
sn fd
sd addr
remoteAddr
  fd -> IO fd
forall (f :: * -> *) a. Applicative f => a -> f a
pure fd
sd

data TraceForwarderBackendFailure
  = TraceForwarderConnectionError String
  | TraceForwarderSocketError String
  deriving (Int -> TraceForwarderBackendFailure -> String -> String
[TraceForwarderBackendFailure] -> String -> String
TraceForwarderBackendFailure -> String
(Int -> TraceForwarderBackendFailure -> String -> String)
-> (TraceForwarderBackendFailure -> String)
-> ([TraceForwarderBackendFailure] -> String -> String)
-> Show TraceForwarderBackendFailure
forall a.
(Int -> a -> String -> String)
-> (a -> String) -> ([a] -> String -> String) -> Show a
showList :: [TraceForwarderBackendFailure] -> String -> String
$cshowList :: [TraceForwarderBackendFailure] -> String -> String
show :: TraceForwarderBackendFailure -> String
$cshow :: TraceForwarderBackendFailure -> String
showsPrec :: Int -> TraceForwarderBackendFailure -> String -> String
$cshowsPrec :: Int -> TraceForwarderBackendFailure -> String -> String
Show, Typeable)

instance Exception TraceForwarderBackendFailure

\end{code}

\subsubsection{Asynchronously reading log items from the queue and sending them to an acceptor.}
\begin{code}
spawnDispatcher :: ToJSON a => Configuration -> TraceForwarderMVar a -> IO (Async.Async ())
spawnDispatcher :: Configuration -> TraceForwarderMVar a -> IO (Async ())
spawnDispatcher Configuration
config TraceForwarderMVar a
tfMVar = do
  -- To reduce network traffic it's possible to send log items not one by one,
  -- but collect them in the queue and periodically send this list as one ByteString.
  Word
forwardDelay <- Configuration -> IO (Maybe Word)
getForwardDelay Configuration
config IO (Maybe Word) -> (Maybe Word -> IO Word) -> IO Word
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Maybe Word
Nothing -> Word -> IO Word
forall (f :: * -> *) a. Applicative f => a -> f a
pure Word
defaultDelayInMs
    Just Word
delayInMs -> Word -> IO Word
forall (f :: * -> *) a. Applicative f => a -> f a
pure Word
delayInMs
  IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
Async.async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ Word -> IO ()
processQueue Word
forwardDelay
 where
  -- If the configuration doesn't specify forward delay, use default one.
  defaultDelayInMs :: Word
  defaultDelayInMs :: Word
defaultDelayInMs = Word
1000

  processQueue :: Word -> IO ()
  processQueue :: Word -> IO ()
processQueue Word
delayInMs = IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
    Int -> IO ()
threadDelay (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Word -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
delayInMs Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000
    TraceForwarderInternal a
currentTF <- TraceForwarderMVar a -> IO (TraceForwarderInternal a)
forall a. MVar a -> IO a
readMVar TraceForwarderMVar a
tfMVar
    let getStateDigest :: IO [LogObject a]
getStateDigest = TraceForwarderInternal a -> IO [LogObject a]
forall a. TraceForwarderInternal a -> IO [LogObject a]
tfGetStateDigest TraceForwarderInternal a
currentTF
    [LogObject a]
itemsList <- STM [LogObject a] -> IO [LogObject a]
forall a. STM a -> IO a
atomically (STM [LogObject a] -> IO [LogObject a])
-> STM [LogObject a] -> IO [LogObject a]
forall a b. (a -> b) -> a -> b
$ TBQueue (LogObject a) -> STM [LogObject a]
forall a. TBQueue a -> STM [a]
TBQ.flushTBQueue (TraceForwarderInternal a -> TBQueue (LogObject a)
forall a. TraceForwarderInternal a -> TBQueue (LogObject a)
tfQueue TraceForwarderInternal a
currentTF)
    -- Try to write it to the handle. If there's a problem with connection,
    -- this thread will initiate re\-establishing of the connection and
    -- will wait until it's established.
    Configuration
-> TraceForwarderMVar a
-> [LogObject a]
-> IO [LogObject a]
-> IO ()
forall a.
ToJSON a =>
Configuration
-> TraceForwarderMVar a
-> [LogObject a]
-> IO [LogObject a]
-> IO ()
sendItems Configuration
config TraceForwarderMVar a
tfMVar [LogObject a]
itemsList IO [LogObject a]
getStateDigest

-- Try to send log items to the handle.
sendItems :: ToJSON a
          => Configuration
          -> TraceForwarderMVar a
          -> [LogObject a]
          -> IO [LogObject a]
          -> IO ()
sendItems :: Configuration
-> TraceForwarderMVar a
-> [LogObject a]
-> IO [LogObject a]
-> IO ()
sendItems Configuration
_ TraceForwarderMVar a
_ [] IO [LogObject a]
_ = () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
sendItems Configuration
config TraceForwarderMVar a
tfMVar items :: [LogObject a]
items@(LogObject a
lo:[LogObject a]
_) IO [LogObject a]
getStateDigest =
  TraceForwarderInternal a -> Maybe Handle
forall a. TraceForwarderInternal a -> Maybe Handle
tfHandle (TraceForwarderInternal a -> Maybe Handle)
-> IO (TraceForwarderInternal a) -> IO (Maybe Handle)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TraceForwarderMVar a -> IO (TraceForwarderInternal a)
forall a. MVar a -> IO a
readMVar TraceForwarderMVar a
tfMVar IO (Maybe Handle) -> (Maybe Handle -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Maybe Handle
Nothing -> do
      -- There's no handle, initiate the connection.
      Int -> Int -> TraceForwarderMVar a -> IO ()
forall a. Int -> Int -> TraceForwarderMVar a -> IO ()
establishConnection Int
1 Int
1 TraceForwarderMVar a
tfMVar
      -- Connection is re\-established, try to send log item.
      -- Since the connection is re\-established, add state digest items as well.
      [LogObject a]
additionalItems <- IO [LogObject a]
getStateDigest
      let allItems :: [LogObject a]
allItems = [LogObject a]
additionalItems [LogObject a] -> [LogObject a] -> [LogObject a]
forall a. [a] -> [a] -> [a]
++ [LogObject a]
items
      Configuration
-> TraceForwarderMVar a
-> [LogObject a]
-> IO [LogObject a]
-> IO ()
forall a.
ToJSON a =>
Configuration
-> TraceForwarderMVar a
-> [LogObject a]
-> IO [LogObject a]
-> IO ()
sendItems Configuration
config TraceForwarderMVar a
tfMVar [LogObject a]
allItems IO [LogObject a]
getStateDigest
    Just Handle
h ->
      IO () -> IO (Either IOException ())
forall e a. Exception e => IO a -> IO (Either e a)
try (Handle -> ByteString -> IO ()
BSC.hPutStrLn Handle
h (ByteString -> IO ()) -> ByteString -> IO ()
forall a b. (a -> b) -> a -> b
$! ByteString
encodedHostname) IO (Either IOException ())
-> (Either IOException () -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Right ()
_ ->
          -- Hostname was written to the handler successfully,
          -- try to write serialized list of LogObjects.
          IO () -> IO (Either IOException ())
forall e a. Exception e => IO a -> IO (Either e a)
try (Handle -> ByteString -> IO ()
BSC.hPutStrLn Handle
h (ByteString -> IO ()) -> ByteString -> IO ()
forall a b. (a -> b) -> a -> b
$! ByteString
bs) IO (Either IOException ())
-> (Either IOException () -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
            Right ()
_ ->
              () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return () -- Everything is ok, LogObjects were written to the handler.
            Left (IOException
_e :: IOException) -> do
              IO ()
reConnectIfQueueIsAlmostFull
              Int -> IO ()
threadDelay Int
10000
              Configuration
-> TraceForwarderMVar a
-> [LogObject a]
-> IO [LogObject a]
-> IO ()
forall a.
ToJSON a =>
Configuration
-> TraceForwarderMVar a
-> [LogObject a]
-> IO [LogObject a]
-> IO ()
sendItems Configuration
config TraceForwarderMVar a
tfMVar [LogObject a]
items IO [LogObject a]
getStateDigest
        Left (IOException
_e :: IOException) -> do
          IO ()
reConnectIfQueueIsAlmostFull
          Int -> IO ()
threadDelay Int
10000
          Configuration
-> TraceForwarderMVar a
-> [LogObject a]
-> IO [LogObject a]
-> IO ()
forall a.
ToJSON a =>
Configuration
-> TraceForwarderMVar a
-> [LogObject a]
-> IO [LogObject a]
-> IO ()
sendItems Configuration
config TraceForwarderMVar a
tfMVar [LogObject a]
items IO [LogObject a]
getStateDigest
 where
  encodedHostname :: ByteString
encodedHostname = Text -> ByteString
encodeUtf8 (LOMeta -> Text
hostname (LOMeta -> Text) -> (LogObject a -> LOMeta) -> LogObject a -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LogObject a -> LOMeta
forall a. LogObject a -> LOMeta
loMeta (LogObject a -> Text) -> LogObject a -> Text
forall a b. (a -> b) -> a -> b
$ LogObject a
lo)

  (Int
_, ByteString
bs) = [LogObject a] -> (Int, ByteString)
forall b. ToJSON b => b -> (Int, ByteString)
jsonToBS [LogObject a]
items

  jsonToBS :: ToJSON b => b -> (Int, BS.ByteString)
  jsonToBS :: b -> (Int, ByteString)
jsonToBS b
a =
    let bs' :: ByteString
bs' = ByteString -> ByteString
BL.toStrict (ByteString -> ByteString) -> ByteString -> ByteString
forall a b. (a -> b) -> a -> b
$ b -> ByteString
forall a. ToJSON a => a -> ByteString
encode b
a
    in (ByteString -> Int
BS.length ByteString
bs', ByteString
bs')

  -- Handle is bad, it looks like the connection is broken.
  -- Check if the queue is almost full.
  reConnectIfQueueIsAlmostFull :: IO ()
reConnectIfQueueIsAlmostFull = do
    TraceForwarderInternal a
currentTF <- TraceForwarderMVar a -> IO (TraceForwarderInternal a)
forall a. MVar a -> IO a
readMVar TraceForwarderMVar a
tfMVar
    Natural
currentQueueSize <- STM Natural -> IO Natural
forall a. STM a -> IO a
atomically (STM Natural -> IO Natural) -> STM Natural -> IO Natural
forall a b. (a -> b) -> a -> b
$ TBQueue (LogObject a) -> STM Natural
forall a. TBQueue a -> STM Natural
TBQ.lengthTBQueue (TraceForwarderInternal a -> TBQueue (LogObject a)
forall a. TraceForwarderInternal a -> TBQueue (LogObject a)
tfQueue TraceForwarderInternal a
currentTF)
    Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Natural -> Bool
forall a. Integral a => a -> Bool
queueIsAlmostFull Natural
currentQueueSize) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
      -- The queue is almost full, it means that log items will be dropped soon.
      -- Initiate re-establishing of connection.
      Maybe Handle -> IO ()
closeHandle (Maybe Handle -> IO ()) -> Maybe Handle -> IO ()
forall a b. (a -> b) -> a -> b
$ TraceForwarderInternal a -> Maybe Handle
forall a. TraceForwarderInternal a -> Maybe Handle
tfHandle TraceForwarderInternal a
currentTF
      TraceForwarderMVar a
-> (TraceForwarderInternal a -> IO (TraceForwarderInternal a))
-> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ TraceForwarderMVar a
tfMVar ((TraceForwarderInternal a -> IO (TraceForwarderInternal a))
 -> IO ())
-> (TraceForwarderInternal a -> IO (TraceForwarderInternal a))
-> IO ()
forall a b. (a -> b) -> a -> b
$ \TraceForwarderInternal a
be -> TraceForwarderInternal a -> IO (TraceForwarderInternal a)
forall (m :: * -> *) a. Monad m => a -> m a
return (TraceForwarderInternal a -> IO (TraceForwarderInternal a))
-> TraceForwarderInternal a -> IO (TraceForwarderInternal a)
forall a b. (a -> b) -> a -> b
$ TraceForwarderInternal a
be { tfHandle :: Maybe Handle
tfHandle = Maybe Handle
forall a. Maybe a
Nothing }

  -- When the queue is almost full (80 percent of its max size)
  -- we initiate re-establishing of connection.
  queueIsAlmostFull :: a -> Bool
queueIsAlmostFull a
queueSize = a
queueSize a -> a -> Bool
forall a. Ord a => a -> a -> Bool
>= Float -> a
forall a b. (RealFrac a, Integral b) => a -> b
round Float
almostFullSize
   where
    almostFullSize :: Float
    almostFullSize :: Float
almostFullSize = Float
0.8 Float -> Float -> Float
forall a. Num a => a -> a -> a
* Natural -> Float
forall a b. (Integral a, Num b) => a -> b
fromIntegral Natural
queueMaxSize

queueMaxSize :: Natural
queueMaxSize :: Natural
queueMaxSize = Natural
2500

establishConnection :: Int -> Int -> TraceForwarderMVar a -> IO ()
establishConnection :: Int -> Int -> TraceForwarderMVar a -> IO ()
establishConnection Int
delayInSec Int
delayInSec' TraceForwarderMVar a
tfMVar = (IOManager -> IO ()) -> IO ()
WithIOManager
withIOManager ((IOManager -> IO ()) -> IO ()) -> (IOManager -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \IOManager
iomgr -> do
  RemoteAddr
addr <- TraceForwarderInternal a -> RemoteAddr
forall a. TraceForwarderInternal a -> RemoteAddr
tfRemoteAddr (TraceForwarderInternal a -> RemoteAddr)
-> IO (TraceForwarderInternal a) -> IO RemoteAddr
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TraceForwarderMVar a -> IO (TraceForwarderInternal a)
forall a. MVar a -> IO a
readMVar TraceForwarderMVar a
tfMVar
  IO Handle -> IO (Either IOException Handle)
forall e a. Exception e => IO a -> IO (Either e a)
try (IOManager -> RemoteAddr -> IO Handle
connectForwarder IOManager
iomgr RemoteAddr
addr) IO (Either IOException Handle)
-> (Either IOException Handle -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Right Handle
h ->
      TraceForwarderMVar a
-> (TraceForwarderInternal a -> IO (TraceForwarderInternal a))
-> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ TraceForwarderMVar a
tfMVar ((TraceForwarderInternal a -> IO (TraceForwarderInternal a))
 -> IO ())
-> (TraceForwarderInternal a -> IO (TraceForwarderInternal a))
-> IO ()
forall a b. (a -> b) -> a -> b
$ \TraceForwarderInternal a
be -> TraceForwarderInternal a -> IO (TraceForwarderInternal a)
forall (m :: * -> *) a. Monad m => a -> m a
return (TraceForwarderInternal a -> IO (TraceForwarderInternal a))
-> TraceForwarderInternal a -> IO (TraceForwarderInternal a)
forall a b. (a -> b) -> a -> b
$ TraceForwarderInternal a
be { tfHandle :: Maybe Handle
tfHandle = Handle -> Maybe Handle
forall a. a -> Maybe a
Just Handle
h }
    Left (IOException
e :: IOException) -> do
      -- Cannot establish it, let's try again..
      Int -> IO ()
threadDelay (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Int
1000000 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
delayInSec'
      if Int
delayInSec' Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
60
        then
          -- Next attempt to re-establish the connection will be perform after Fibonacci-calculated delay.
          Int -> Int -> TraceForwarderMVar a -> IO ()
forall a. Int -> Int -> TraceForwarderMVar a -> IO ()
establishConnection Int
delayInSec' (Int
delayInSec Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
delayInSec') TraceForwarderMVar a
tfMVar
        else
          -- Next attempt to re-establish the connection will be perform after fixed delay (1 minute).
          Int -> Int -> TraceForwarderMVar a -> IO ()
forall a. Int -> Int -> TraceForwarderMVar a -> IO ()
establishConnection Int
1 Int
60 TraceForwarderMVar a
tfMVar

\end{code}