\subsection{Cardano.BM.Backend.TraceAcceptor}
\label{module:Cardano.BM.Backend.TraceAcceptor}


%if style == newcode
\begin{code}
{-# LANGUAGE BlockArguments        #-}
{-# LANGUAGE CPP                   #-}
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE LambdaCase            #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes            #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE TypeFamilies          #-}
{-# OPTIONS_GHC -Wextra            #-}

#if !defined(mingw32_HOST_OS)
#define POSIX
#endif

module Cardano.BM.Backend.TraceAcceptor
    ( TraceAcceptor
    , plugin
    ) where

import qualified Control.Concurrent.Async as Async
import           Control.Exception
import           Control.Monad (forM, unless)
import           Data.Aeson (FromJSON, ToJSON, eitherDecodeStrict)
import qualified Data.ByteString as BS
import           Data.Text (pack)
import           Data.Text.Encoding (decodeUtf8)
import           Data.Typeable (Typeable)
import qualified Network.Socket as Socket
import           System.IO (Handle)
import qualified System.IO as IO

import           Cardano.BM.Configuration
import           Cardano.BM.Data.Backend
import           Cardano.BM.Data.Configuration (RemoteAddr(..), RemoteAddrNamed(..))
import           Cardano.BM.Data.LogItem
                   ( LOContent (LogError), LOMeta (..), LogObject (..)
                   , PrivacyAnnotation (Public), loName, mkLOMeta
                   )
import           Cardano.BM.Data.Severity (Severity (..))
import           Cardano.BM.Data.Tracer (traceWith)
import           Cardano.BM.IOManager
import           Cardano.BM.Plugin
import qualified Cardano.BM.Snocket as Snocket
import qualified Cardano.BM.Trace as Trace

\end{code}
%endif
|TraceAcceptor| is a backend responsible for processing the |LogObject|s of an
external process, received through a pipe or a socket. For the time being, it
forwards these |LogObject|s to the |SwitchBoard|.

\subsubsection{Plugin definition}
\begin{code}
-- | Build the 'TraceAcceptor' backend plugin.
--
-- The list of addresses to accept on is read from the configuration with
-- 'getAcceptAt'. For every configured 'RemoteAddrNamed' an acceptor is started
-- via 'acceptorForAddress'; the objects it receives are traced on
-- @basicTrace@ with the remote's name appended. Each server thread is linked
-- to the calling thread, so an exception in a server is re-thrown here.
--
-- The last argument is not used.
--
-- Fails (via 'fail') when the configuration contains no accept addresses.
plugin :: forall s a
       . (IsEffectuator s a, ToJSON a, FromJSON a)
       => IOManager -> Configuration -> Trace.Trace IO a -> s a -> IO (Plugin a)
plugin iomgr cf basicTrace _ = getAcceptAt cf >>= \case
  Just acceptors -> do
    socketsNServers <- forM acceptors $ \(RemoteAddrNamed nodeName addr) -> do
      let trace = Trace.appendName nodeName basicTrace
      (serverCleanup, serverThr) <- acceptorForAddress cf trace iomgr addr
      Async.link serverThr
      return (serverCleanup, serverThr)

    let (cleanups, servers) = unzip socketsNServers
        be :: (Cardano.BM.Backend.TraceAcceptor.TraceAcceptor a)
        be = TraceAcceptor
             { taServers   = servers
               -- run every per-address cleanup action, in order
             , taShutdown  = sequence_ cleanups
             }
    return $ BackendPlugin
               (MkBackend { bEffectuate = effectuate be
                          , bUnrealize = unrealize be })
               (bekind be)
  Nothing -> fail "TraceAcceptor not configured: no traceAcceptAt option"

\end{code}

\subsubsection{Structure of TraceAcceptor}\label{code:TraceAcceptor}\index{TraceAcceptor}
\begin{code}
-- | State of the trace acceptor backend.
data TraceAcceptor a = TraceAcceptor
    { taServers  :: [Async.Async ()]
      -- ^ one server thread per configured accept address
    , taShutdown :: IO ()
      -- ^ action run by 'unrealize' after the server threads were cancelled
    }

-- | The acceptor does not consume 'LogObject's handed to it as a backend:
-- both methods are no-ops.
instance IsEffectuator TraceAcceptor a where
    effectuate _ta _item = pure ()
    handleOverflow _ta = pure ()

-- | Backend instance. A 'TraceAcceptor' is only created through 'plugin';
-- 'realize' and 'realizefrom' always fail.
instance (ToJSON a, FromJSON a) => IsBackend TraceAcceptor a where
    type BackendFailure TraceAcceptor = TraceAcceptorBackendFailure

    bekind _ = TraceAcceptorBK

    realize _ = fail "TraceAcceptor cannot be instantiated by 'realize'"

    realizefrom _ _ _ = fail "TraceAcceptor cannot be instantiated by 'realizefrom'"

    -- Cancel all server threads first, then run the shutdown action.
    unrealize ta = do
      mapM_ Async.cancel $ taServers ta
      taShutdown ta

-- | Run an action and re-throw any 'IOException' it raises as a backend
-- failure: the exception is rendered with 'show' and wrapped with the given
-- constructor. Other exception types pass through untouched.
handleError :: (String -> BackendFailure TraceAcceptor) -> IO a -> IO a
handleError ctor = handle $ \(e :: IOException) -> throwIO . ctor . show $ e

-- | Failures thrown by the trace acceptor. Every constructor carries a
-- textual description of the underlying error.
data TraceAcceptorBackendFailure
  = TraceAcceptorPipeError String
  | TraceAcceptorSocketError String
  | TraceAcceptorServerError String
  | TraceAcceptorClientThreadError String
  deriving (Show, Typeable)

instance Exception TraceAcceptorBackendFailure

-- | Start an acceptor for one remote address.
--
-- Returns a cleanup action together with the server thread (see
-- 'acceptorForSnocket'). An 'IOException' raised during setup is re-thrown
-- as 'TraceAcceptorPipeError' or 'TraceAcceptorSocketError', depending on
-- the kind of address.
acceptorForAddress
  :: FromJSON a
  => Configuration
  -> Trace.Trace IO a
  -> IOManager
  -> RemoteAddr
  -> IO (IO (), Async.Async ())
-- Named pipe / local socket identified by its path.
acceptorForAddress config trace iomgr (RemotePipe pipePath) =
  handleError TraceAcceptorPipeError $
  acceptorForSnocket
    config
    trace
    Snocket.localFDToHandle
    (Snocket.localSnocket iomgr pipePath)
    (Snocket.localAddressFromPath pipePath)

-- Network socket: resolve host and port, listen on the first address found.
acceptorForAddress config trace iomgr (RemoteSocket host port) = handleError TraceAcceptorSocketError $ do
  let sn = Snocket.socketSnocket iomgr
  ainfos <- Socket.getAddrInfo Nothing (Just host) (Just port)
  case ainfos of
    [] -> throwIO (TraceAcceptorSocketError ("bad socket address: " <> host <> ":" <> port))
    a:_ -> acceptorForSnocket
             config
             trace
             (flip Socket.socketToHandle IO.ReadWriteMode)
             sn
             (Socket.addrAddress a)

-- | Open a listening socket on the given address and spawn a server thread
-- that accepts connections in a loop. Every accepted connection is turned
-- into a 'Handle' and served by its own 'clientThread'.
--
-- Returns an action closing the listening socket, and the server thread.
--
-- NOTE(review): the returned action closes the same listening socket that
-- the server thread's 'bracket' closes when the thread exits; a caller that
-- cancels the thread and also runs the action closes it twice -- confirm
-- that 'Snocket.close' tolerates this.
acceptorForSnocket
  :: forall a fd addr. (FromJSON a)
  => Configuration
  -> Trace.Trace IO a
  -> (fd -> IO Handle)
  -> Snocket.Snocket IO fd addr
  -> addr
  -> IO (IO (), Async.Async ())
acceptorForSnocket config trace toHandle sn addr = do
  sock <- Snocket.mkListeningSocket sn (Just addr) (Snocket.addrFamily sn addr)
  server <- Async.async $
    bracket (pure sock) (Snocket.close sn) $
      \lsock -> acceptLoop $ Snocket.accept sn lsock
  pure (Snocket.close sn sock, server)
 where
   acceptLoop :: Snocket.Accept addr fd -> IO ()
   acceptLoop (Snocket.Accept accept) = do
      (cfd, _caddr, k) <- accept
      h <- toHandle cfd
      -- The client thread is not awaited; close its handle as soon as the
      -- thread ends (normally or by exception) so that it is not leaked.
      _client <- Async.async $ clientThread config trace h `finally` IO.hClose h
      acceptLoop k

\end{code}

\subsubsection{Reading log items from the client}
\begin{code}
-- | Serve one connected client.
--
-- Repeatedly reads two lines from the handle: the client's hostname and a
-- payload. The payload is decoded as a JSON list of 'LogObject's, each of
-- which is traced on @sbtrace@ under its own name. A payload that cannot be
-- decoded is reported as a 'LogError' on the @#external@ sub-trace, with the
-- client's hostname set in the meta data. The loop stops on an empty payload
-- line.
--
-- An 'IOException' is re-thrown as 'TraceAcceptorClientThreadError'.
clientThread
  :: forall a. (FromJSON a)
  => Configuration
  -> Trace.Trace IO a
  -> Handle
  -> IO ()
clientThread _config sbtrace h = handleError TraceAcceptorClientThreadError pProc
  where
    {-@ lazy pProc @-}
    pProc :: IO ()
    pProc = do
      hn <- BS.hGetLine h -- hostname
      bs <- BS.hGetLine h -- payload
      unless (BS.null bs) $ do
        let hname = decodeUtf8 hn
        case eitherDecodeStrict bs of
          Right (logObjects :: [LogObject a]) ->
              mapM_ (\lo -> traceWith sbtrace (loName lo, lo)) logObjects
          Left e -> do
              lometa0 <- mkLOMeta Warning Public
              let trace :: Trace.Trace IO a
                  trace = Trace.appendName "#external" sbtrace
                  lometa = lometa0 { hostname = hname }
              Trace.traceNamedObject trace
                  (lometa, LogError $ "Could not parse external log objects: " <> pack e)
        pProc
\end{code}