\subsection{Cardano.BM.Backend.TraceAcceptor}
\label{module:Cardano.BM.Backend.TraceAcceptor}
%if style == newcode
\begin{code}
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wextra #-}
#if !defined(mingw32_HOST_OS)
#define POSIX
#endif
module Cardano.BM.Backend.TraceAcceptor
( TraceAcceptor
, plugin
) where
import qualified Control.Concurrent.Async as Async
import Control.Exception
import Control.Monad (forM, unless)
import Data.Aeson (FromJSON, ToJSON, eitherDecodeStrict)
import qualified Data.ByteString as BS
import Data.Text (pack)
import Data.Text.Encoding (decodeUtf8)
import Data.Typeable (Typeable)
import qualified Network.Socket as Socket
import System.IO (Handle)
import qualified System.IO as IO
import Cardano.BM.Configuration
import Cardano.BM.Data.Backend
import Cardano.BM.Data.Configuration (RemoteAddr(..), RemoteAddrNamed(..))
import Cardano.BM.Data.LogItem
( LOContent (LogError), LOMeta (..), LogObject (..)
, PrivacyAnnotation (Public), loName, mkLOMeta
)
import Cardano.BM.Data.Severity (Severity (..))
import Cardano.BM.Data.Tracer (traceWith)
import Cardano.BM.IOManager
import Cardano.BM.Plugin
import qualified Cardano.BM.Snocket as Snocket
import qualified Cardano.BM.Trace as Trace
\end{code}
%endif
|TraceAcceptor| is a backend responsible for processing |LogObject|s of an
external process captured by a pipe or socket. At the time being it redirects
the |LogObject|s to the |SwitchBoard|.
\subsubsection{Plugin definition}
\begin{code}
plugin :: forall s a
. (IsEffectuator s a, ToJSON a, FromJSON a)
=> IOManager -> Configuration -> Trace.Trace IO a -> s a -> IO (Plugin a)
plugin :: IOManager -> Configuration -> Trace IO a -> s a -> IO (Plugin a)
plugin IOManager
iomgr Configuration
cf Trace IO a
basicTrace s a
_ = Configuration -> IO (Maybe [RemoteAddrNamed])
getAcceptAt Configuration
cf IO (Maybe [RemoteAddrNamed])
-> (Maybe [RemoteAddrNamed] -> IO (Plugin a)) -> IO (Plugin a)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Just [RemoteAddrNamed]
acceptors -> do
[(IO (), Async ())]
socketsNServers <- [RemoteAddrNamed]
-> (RemoteAddrNamed -> IO (IO (), Async ()))
-> IO [(IO (), Async ())]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [RemoteAddrNamed]
acceptors ((RemoteAddrNamed -> IO (IO (), Async ()))
-> IO [(IO (), Async ())])
-> (RemoteAddrNamed -> IO (IO (), Async ()))
-> IO [(IO (), Async ())]
forall a b. (a -> b) -> a -> b
$ \(RemoteAddrNamed Text
nodeName RemoteAddr
addr) -> do
let trace :: Trace IO a
trace = Text -> Trace IO a -> Trace IO a
forall (m :: * -> *) a. Text -> Trace m a -> Trace m a
Trace.appendName Text
nodeName Trace IO a
basicTrace
(IO ()
serverCleanup, Async ()
serverThr) <- Configuration
-> Trace IO a -> IOManager -> RemoteAddr -> IO (IO (), Async ())
forall a.
FromJSON a =>
Configuration
-> Trace IO a -> IOManager -> RemoteAddr -> IO (IO (), Async ())
acceptorForAddress Configuration
cf Trace IO a
trace IOManager
iomgr RemoteAddr
addr
Async () -> IO ()
forall a. Async a -> IO ()
Async.link Async ()
serverThr
(IO (), Async ()) -> IO (IO (), Async ())
forall (m :: * -> *) a. Monad m => a -> m a
return (IO ()
serverCleanup, Async ()
serverThr)
let ([IO ()]
cleanups, [Async ()]
servers) = [(IO (), Async ())] -> ([IO ()], [Async ()])
forall a b. [(a, b)] -> ([a], [b])
unzip [(IO (), Async ())]
socketsNServers
be :: (Cardano.BM.Backend.TraceAcceptor.TraceAcceptor a)
be :: TraceAcceptor a
be = TraceAcceptor :: forall a. [Async ()] -> IO () -> TraceAcceptor a
TraceAcceptor
{ taServers :: [Async ()]
taServers = [Async ()]
servers
, taShutdown :: IO ()
taShutdown = [IO ()] -> IO ()
forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, Monad m) =>
t (m a) -> m ()
sequence_ [IO ()]
cleanups
}
Plugin a -> IO (Plugin a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Plugin a -> IO (Plugin a)) -> Plugin a -> IO (Plugin a)
forall a b. (a -> b) -> a -> b
$ Backend a -> BackendKind -> Plugin a
forall a. Backend a -> BackendKind -> Plugin a
BackendPlugin
(MkBackend :: forall a. (LogObject a -> IO ()) -> IO () -> Backend a
MkBackend { bEffectuate :: LogObject a -> IO ()
bEffectuate = TraceAcceptor a -> LogObject a -> IO ()
forall (t :: * -> *) a.
IsEffectuator t a =>
t a -> LogObject a -> IO ()
effectuate TraceAcceptor a
be
, bUnrealize :: IO ()
bUnrealize = TraceAcceptor a -> IO ()
forall (t :: * -> *) a. IsBackend t a => t a -> IO ()
unrealize TraceAcceptor a
be })
(TraceAcceptor a -> BackendKind
forall (t :: * -> *) a. IsBackend t a => t a -> BackendKind
bekind TraceAcceptor a
be)
Maybe [RemoteAddrNamed]
Nothing -> String -> IO (Plugin a)
forall (m :: * -> *) a. MonadFail m => String -> m a
fail String
"TraceAcceptor not configured: no traceAcceptAt option"
\end{code}
\subsubsection{Structure of TraceAcceptor}\label{code:TraceAcceptor}\index{TraceAcceptor}
\begin{code}
data TraceAcceptor a = TraceAcceptor
{ TraceAcceptor a -> [Async ()]
taServers :: [Async.Async ()]
, TraceAcceptor a -> IO ()
taShutdown :: IO ()
}
instance IsEffectuator TraceAcceptor a where
effectuate :: TraceAcceptor a -> LogObject a -> IO ()
effectuate TraceAcceptor a
_ta LogObject a
_item = () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
handleOverflow :: TraceAcceptor a -> IO ()
handleOverflow TraceAcceptor a
_ta = () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
instance (ToJSON a, FromJSON a) => IsBackend TraceAcceptor a where
type BackendFailure TraceAcceptor = TraceAcceptorBackendFailure
bekind :: TraceAcceptor a -> BackendKind
bekind TraceAcceptor a
_ = BackendKind
TraceAcceptorBK
realize :: Configuration -> IO (TraceAcceptor a)
realize Configuration
_ = String -> IO (TraceAcceptor a)
forall (m :: * -> *) a. MonadFail m => String -> m a
fail String
"TraceAcceptor cannot be instantiated by 'realize'"
realizefrom :: Configuration -> Trace IO a -> s a -> IO (TraceAcceptor a)
realizefrom Configuration
_ Trace IO a
_ s a
_ = String -> IO (TraceAcceptor a)
forall (m :: * -> *) a. MonadFail m => String -> m a
fail String
"TraceAcceptor cannot be instantiated by 'realizefrom'"
unrealize :: TraceAcceptor a -> IO ()
unrealize TraceAcceptor a
ta = do
(Async () -> IO ()) -> [Async ()] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Async () -> IO ()
forall a. Async a -> IO ()
Async.cancel ([Async ()] -> IO ()) -> [Async ()] -> IO ()
forall a b. (a -> b) -> a -> b
$ TraceAcceptor a -> [Async ()]
forall a. TraceAcceptor a -> [Async ()]
taServers TraceAcceptor a
ta
TraceAcceptor a -> IO ()
forall a. TraceAcceptor a -> IO ()
taShutdown TraceAcceptor a
ta
handleError :: (String -> BackendFailure TraceAcceptor) -> IO a -> IO a
handleError :: (String -> BackendFailure TraceAcceptor) -> IO a -> IO a
handleError String -> BackendFailure TraceAcceptor
ctor = (IOException -> IO a) -> IO a -> IO a
forall e a. Exception e => (e -> IO a) -> IO a -> IO a
handle ((IOException -> IO a) -> IO a -> IO a)
-> (IOException -> IO a) -> IO a -> IO a
forall a b. (a -> b) -> a -> b
$ \(IOException
e :: IOException) -> TraceAcceptorBackendFailure -> IO a
forall e a. Exception e => e -> IO a
throwIO (TraceAcceptorBackendFailure -> IO a)
-> (IOException -> TraceAcceptorBackendFailure)
-> IOException
-> IO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> BackendFailure TraceAcceptor
String -> TraceAcceptorBackendFailure
ctor (String -> TraceAcceptorBackendFailure)
-> (IOException -> String)
-> IOException
-> TraceAcceptorBackendFailure
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IOException -> String
forall a. Show a => a -> String
show (IOException -> IO a) -> IOException -> IO a
forall a b. (a -> b) -> a -> b
$ IOException
e
data TraceAcceptorBackendFailure
= TraceAcceptorPipeError String
| TraceAcceptorSocketError String
| TraceAcceptorServerError String
| TraceAcceptorClientThreadError String
deriving (Int -> TraceAcceptorBackendFailure -> ShowS
[TraceAcceptorBackendFailure] -> ShowS
TraceAcceptorBackendFailure -> String
(Int -> TraceAcceptorBackendFailure -> ShowS)
-> (TraceAcceptorBackendFailure -> String)
-> ([TraceAcceptorBackendFailure] -> ShowS)
-> Show TraceAcceptorBackendFailure
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [TraceAcceptorBackendFailure] -> ShowS
$cshowList :: [TraceAcceptorBackendFailure] -> ShowS
show :: TraceAcceptorBackendFailure -> String
$cshow :: TraceAcceptorBackendFailure -> String
showsPrec :: Int -> TraceAcceptorBackendFailure -> ShowS
$cshowsPrec :: Int -> TraceAcceptorBackendFailure -> ShowS
Show, Typeable)
instance Exception TraceAcceptorBackendFailure
acceptorForAddress
:: FromJSON a
=> Configuration
-> Trace.Trace IO a
-> IOManager
-> RemoteAddr
-> IO (IO (), Async.Async ())
acceptorForAddress :: Configuration
-> Trace IO a -> IOManager -> RemoteAddr -> IO (IO (), Async ())
acceptorForAddress Configuration
config Trace IO a
trace IOManager
iomgr (RemotePipe String
pipePath) =
(String -> BackendFailure TraceAcceptor)
-> IO (IO (), Async ()) -> IO (IO (), Async ())
forall a. (String -> BackendFailure TraceAcceptor) -> IO a -> IO a
handleError String -> BackendFailure TraceAcceptor
String -> TraceAcceptorBackendFailure
TraceAcceptorPipeError (IO (IO (), Async ()) -> IO (IO (), Async ()))
-> IO (IO (), Async ()) -> IO (IO (), Async ())
forall a b. (a -> b) -> a -> b
$
Configuration
-> Trace IO a
-> (LocalFD -> IO Handle)
-> Snocket IO LocalFD LocalAddress
-> LocalAddress
-> IO (IO (), Async ())
forall a fd addr.
FromJSON a =>
Configuration
-> Trace IO a
-> (fd -> IO Handle)
-> Snocket IO fd addr
-> addr
-> IO (IO (), Async ())
acceptorForSnocket
Configuration
config
Trace IO a
trace
LocalFD -> IO Handle
Snocket.localFDToHandle
(IOManager -> String -> Snocket IO LocalFD LocalAddress
Snocket.localSnocket IOManager
iomgr String
pipePath)
(String -> LocalAddress
Snocket.localAddressFromPath String
pipePath)
acceptorForAddress Configuration
config Trace IO a
trace IOManager
iomgr (RemoteSocket String
host String
port) = (String -> BackendFailure TraceAcceptor)
-> IO (IO (), Async ()) -> IO (IO (), Async ())
forall a. (String -> BackendFailure TraceAcceptor) -> IO a -> IO a
handleError String -> BackendFailure TraceAcceptor
String -> TraceAcceptorBackendFailure
TraceAcceptorSocketError (IO (IO (), Async ()) -> IO (IO (), Async ()))
-> IO (IO (), Async ()) -> IO (IO (), Async ())
forall a b. (a -> b) -> a -> b
$ do
let sn :: SocketSnocket
sn = IOManager -> SocketSnocket
Snocket.socketSnocket IOManager
iomgr
[AddrInfo]
ainfos <- Maybe AddrInfo -> Maybe String -> Maybe String -> IO [AddrInfo]
Socket.getAddrInfo Maybe AddrInfo
forall a. Maybe a
Nothing (String -> Maybe String
forall a. a -> Maybe a
Just String
host) (String -> Maybe String
forall a. a -> Maybe a
Just String
port)
case [AddrInfo]
ainfos of
[] -> TraceAcceptorBackendFailure -> IO (IO (), Async ())
forall e a. Exception e => e -> IO a
throwIO (String -> TraceAcceptorBackendFailure
TraceAcceptorSocketError (String
"bad socket address: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
host String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
":" String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
port))
AddrInfo
a:[AddrInfo]
_ -> Configuration
-> Trace IO a
-> (LocalFD -> IO Handle)
-> SocketSnocket
-> SockAddr
-> IO (IO (), Async ())
forall a fd addr.
FromJSON a =>
Configuration
-> Trace IO a
-> (fd -> IO Handle)
-> Snocket IO fd addr
-> addr
-> IO (IO (), Async ())
acceptorForSnocket
Configuration
config
Trace IO a
trace
((LocalFD -> IOMode -> IO Handle) -> IOMode -> LocalFD -> IO Handle
forall a b c. (a -> b -> c) -> b -> a -> c
flip LocalFD -> IOMode -> IO Handle
Socket.socketToHandle IOMode
IO.ReadWriteMode)
SocketSnocket
sn
(AddrInfo -> SockAddr
Socket.addrAddress AddrInfo
a)
acceptorForSnocket
:: forall a fd addr. (FromJSON a)
=> Configuration
-> Trace.Trace IO a
-> (fd -> IO Handle)
-> Snocket.Snocket IO fd addr
-> addr
-> IO (IO (), Async.Async ())
acceptorForSnocket :: Configuration
-> Trace IO a
-> (fd -> IO Handle)
-> Snocket IO fd addr
-> addr
-> IO (IO (), Async ())
acceptorForSnocket Configuration
config Trace IO a
trace fd -> IO Handle
toHandle Snocket IO fd addr
sn addr
addr = do
fd
sock <- Snocket IO fd addr -> Maybe addr -> AddressFamily addr -> IO fd
forall fd addr.
Snocket IO fd addr -> Maybe addr -> AddressFamily addr -> IO fd
Snocket.mkListeningSocket Snocket IO fd addr
sn (addr -> Maybe addr
forall a. a -> Maybe a
Just addr
addr) (Snocket IO fd addr -> addr -> AddressFamily addr
forall (m :: * -> *) fd addr.
Snocket m fd addr -> addr -> AddressFamily addr
Snocket.addrFamily Snocket IO fd addr
sn addr
addr)
Async ()
server <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
Async.async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$
IO fd -> (fd -> IO ()) -> (fd -> IO ()) -> IO ()
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket (fd -> IO fd
forall (f :: * -> *) a. Applicative f => a -> f a
pure fd
sock) (Snocket IO fd addr -> fd -> IO ()
forall (m :: * -> *) fd addr. Snocket m fd addr -> fd -> m ()
Snocket.close Snocket IO fd addr
sn) ((fd -> IO ()) -> IO ()) -> (fd -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$
\fd
sock -> Accept addr fd -> IO ()
acceptLoop (Accept addr fd -> IO ()) -> Accept addr fd -> IO ()
forall a b. (a -> b) -> a -> b
$ Snocket IO fd addr -> fd -> Accept addr fd
forall (m :: * -> *) fd addr.
Snocket m fd addr -> fd -> Accept addr fd
Snocket.accept Snocket IO fd addr
sn fd
sock
(IO (), Async ()) -> IO (IO (), Async ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Snocket IO fd addr -> fd -> IO ()
forall (m :: * -> *) fd addr. Snocket m fd addr -> fd -> m ()
Snocket.close Snocket IO fd addr
sn fd
sock, Async ()
server)
where
acceptLoop :: Snocket.Accept addr fd -> IO ()
acceptLoop :: Accept addr fd -> IO ()
acceptLoop (Snocket.Accept IO (fd, addr, Accept addr fd)
accept) = do
(fd
cfd, addr
_caddr, Accept addr fd
k) <- IO (fd, addr, Accept addr fd)
accept
Handle
h <- fd -> IO Handle
toHandle fd
cfd
Async ()
_client <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
Async.async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ Configuration -> Trace IO a -> Handle -> IO ()
forall a.
FromJSON a =>
Configuration -> Trace IO a -> Handle -> IO ()
clientThread Configuration
config Trace IO a
trace Handle
h
Accept addr fd -> IO ()
acceptLoop Accept addr fd
k
\end{code}
\subsubsection{Reading log items from the client}
\begin{code}
clientThread
:: forall a. (FromJSON a)
=> Configuration
-> Trace.Trace IO a
-> Handle
-> IO ()
clientThread :: Configuration -> Trace IO a -> Handle -> IO ()
clientThread Configuration
_config Trace IO a
sbtrace Handle
h = (String -> BackendFailure TraceAcceptor) -> IO () -> IO ()
forall a. (String -> BackendFailure TraceAcceptor) -> IO a -> IO a
handleError String -> BackendFailure TraceAcceptor
String -> TraceAcceptorBackendFailure
TraceAcceptorClientThreadError IO ()
pProc
where
pProc :: IO ()
pProc :: IO ()
pProc = do
ByteString
hn <- Handle -> IO ByteString
BS.hGetLine Handle
h
ByteString
bs <- Handle -> IO ByteString
BS.hGetLine Handle
h
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (ByteString -> Bool
BS.null ByteString
bs) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
let hname :: Text
hname = ByteString -> Text
decodeUtf8 ByteString
hn
case ByteString -> Either String [LogObject a]
forall a. FromJSON a => ByteString -> Either String a
eitherDecodeStrict ByteString
bs of
Right ([LogObject a]
logObjects :: [LogObject a]) ->
(LogObject a -> IO ()) -> [LogObject a] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\LogObject a
lo -> Trace IO a -> (Text, LogObject a) -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Trace IO a
sbtrace (LogObject a -> Text
forall a. LogObject a -> Text
loName LogObject a
lo, LogObject a
lo)) [LogObject a]
logObjects
Left String
e -> do
LOMeta
lometa0 <- Severity -> PrivacyAnnotation -> IO LOMeta
forall (m :: * -> *).
MonadIO m =>
Severity -> PrivacyAnnotation -> m LOMeta
mkLOMeta Severity
Warning PrivacyAnnotation
Public
let trace :: Trace.Trace IO a
trace :: Trace IO a
trace = Text -> Trace IO a -> Trace IO a
forall (m :: * -> *) a. Text -> Trace m a -> Trace m a
Trace.appendName Text
"#external" Trace IO a
sbtrace
lometa :: LOMeta
lometa = LOMeta
lometa0 { hostname :: Text
hostname = Text
hname }
Trace IO a -> (LOMeta, LOContent a) -> IO ()
forall (m :: * -> *) a.
MonadIO m =>
Trace m a -> (LOMeta, LOContent a) -> m ()
Trace.traceNamedObject Trace IO a
trace ((LOMeta, LOContent a) -> IO ())
-> IO (LOMeta, LOContent a) -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<<
(,) (LOMeta -> LOContent a -> (LOMeta, LOContent a))
-> IO LOMeta -> IO (LOContent a -> (LOMeta, LOContent a))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> LOMeta -> IO LOMeta
forall (f :: * -> *) a. Applicative f => a -> f a
pure LOMeta
lometa
IO (LOContent a -> (LOMeta, LOContent a))
-> IO (LOContent a) -> IO (LOMeta, LOContent a)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> LOContent a -> IO (LOContent a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Text -> LOContent a
forall a. Text -> LOContent a
LogError (Text -> LOContent a) -> Text -> LOContent a
forall a b. (a -> b) -> a -> b
$ Text
"Could not parse external log objects: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
pack String
e)
IO ()
pProc
\end{code}