{-# LANGUAGE DataKinds         #-}
{-# OPTIONS_HADDOCK prune #-}
{-# LANGUAGE OverloadedStrings #-}
module Striot.Nodes ( nodeSource
                    , nodeLink
                    , nodeLink2
                    , nodeSink
                    , nodeSink2
                    , nodeSimple

                    , defaultConfig
                    , defaultSource
                    , defaultLink
                    , defaultSink

                    , mkStream
                    , unStream

                    ) where

import           Control.Concurrent.Async                      (async)
import           Control.Concurrent.Chan.Unagi.Bounded         as U
import           Control.Lens
import           Control.Monad.IO.Class
import           Control.Monad.Reader
import           Data.IORef
import           Data.Maybe
import           Data.Store                                    (Store)
import           Data.Text                                     as T (pack)
import           Data.Time                                     (getCurrentTime)
import           Network.Socket                                (HostName,
                                                                ServiceName)
import           Striot.FunctionalIoTtypes
import           Striot.Nodes.Kafka
import           Striot.Nodes.MQTT
import           Striot.Nodes.TCP
import           Striot.Nodes.Types                            hiding (nc,
                                                                readConf,
                                                                writeConf)
import           System.IO
import           System.IO.Unsafe
import           System.Metrics.Prometheus.Concurrent.Registry as PR (new, registerCounter,
                                                                      registerGauge,
                                                                      sample)
import           System.Metrics.Prometheus.Http.Scrape         (serveMetrics)
import           System.Metrics.Prometheus.Metric.Counter      as PC (inc)
import           System.Metrics.Prometheus.MetricId            (addLabel)


-- NODE FUNCTIONS

nodeSource :: (Store alpha, Store beta)
           => StriotConfig
           -> IO alpha
           -> (Stream alpha -> Stream beta)
           -> IO ()
nodeSource :: forall alpha beta.
(Store alpha, Store beta) =>
StriotConfig -> IO alpha -> (Stream alpha -> Stream beta) -> IO ()
nodeSource StriotConfig
config IO alpha
iofn Stream alpha -> Stream beta
streamOp =
    ReaderT StriotConfig IO () -> StriotConfig -> IO ()
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT (StriotApp () -> ReaderT StriotConfig IO ()
forall a. StriotApp a -> ReaderT StriotConfig IO a
unApp (StriotApp () -> ReaderT StriotConfig IO ())
-> StriotApp () -> ReaderT StriotConfig IO ()
forall a b. (a -> b) -> a -> b
$ IO alpha -> (Stream alpha -> Stream beta) -> StriotApp ()
forall alpha beta r (m :: * -> *).
(Store alpha, Store beta, MonadReader r m, HasStriotConfig r,
 MonadIO m) =>
IO alpha -> (Stream alpha -> Stream beta) -> m ()
nodeSource' IO alpha
iofn Stream alpha -> Stream beta
streamOp) StriotConfig
config


nodeSource' :: (Store alpha, Store beta,
               MonadReader r m,
               HasStriotConfig r,
               MonadIO m)
            => IO alpha
            -> (Stream alpha -> Stream beta) -> m ()
nodeSource' :: forall alpha beta r (m :: * -> *).
(Store alpha, Store beta, MonadReader r m, HasStriotConfig r,
 MonadIO m) =>
IO alpha -> (Stream alpha -> Stream beta) -> m ()
nodeSource' IO alpha
iofn Stream alpha -> Stream beta
streamOp = do
    r
c <- m r
forall r (m :: * -> *). MonadReader r m => m r
ask
    Metrics
metrics <- IO Metrics -> m Metrics
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Metrics -> m Metrics) -> IO Metrics -> m Metrics
forall a b. (a -> b) -> a -> b
$ String -> IO Metrics
startPrometheus (r
c r -> Getting String r String -> String
forall s a. s -> Getting a s a -> a
^. Getting String r String
forall c. HasStriotConfig c => Lens' c String
nodeName)
    let iofnAndMetrics :: IO alpha
iofnAndMetrics = Counter -> IO ()
PC.inc (Metrics -> Counter
_ingressEvents Metrics
metrics) IO () -> IO alpha -> IO alpha
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO alpha
iofn
    Stream alpha
stream <- IO (Stream alpha) -> m (Stream alpha)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Stream alpha) -> m (Stream alpha))
-> IO (Stream alpha) -> m (Stream alpha)
forall a b. (a -> b) -> a -> b
$ IO alpha -> IO (Stream alpha)
forall alpha. IO alpha -> IO (Stream alpha)
readListFromSource IO alpha
iofnAndMetrics
    let result :: Stream beta
result = Stream alpha -> Stream beta
streamOp Stream alpha
stream
    Metrics -> Stream beta -> m ()
forall alpha r (m :: * -> *).
(Store alpha, MonadReader r m, HasStriotConfig r, MonadIO m) =>
Metrics -> Stream alpha -> m ()
sendStream Metrics
metrics Stream beta
result


nodeLink :: (Store alpha, Store beta)
         => StriotConfig
         -> (Stream alpha -> Stream beta)
         -> IO ()
nodeLink :: forall alpha beta.
(Store alpha, Store beta) =>
StriotConfig -> (Stream alpha -> Stream beta) -> IO ()
nodeLink StriotConfig
config Stream alpha -> Stream beta
streamOp =
    ReaderT StriotConfig IO () -> StriotConfig -> IO ()
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT (StriotApp () -> ReaderT StriotConfig IO ()
forall a. StriotApp a -> ReaderT StriotConfig IO a
unApp (StriotApp () -> ReaderT StriotConfig IO ())
-> StriotApp () -> ReaderT StriotConfig IO ()
forall a b. (a -> b) -> a -> b
$ (Stream alpha -> Stream beta) -> StriotApp ()
forall alpha beta r (m :: * -> *).
(Store alpha, Store beta, MonadReader r m, HasStriotConfig r,
 MonadIO m) =>
(Stream alpha -> Stream beta) -> m ()
nodeLink' Stream alpha -> Stream beta
streamOp) StriotConfig
config


nodeLink' :: (Store alpha, Store beta,
             MonadReader r m,
             HasStriotConfig r,
             MonadIO m)
          => (Stream alpha -> Stream beta)
          -> m ()
nodeLink' :: forall alpha beta r (m :: * -> *).
(Store alpha, Store beta, MonadReader r m, HasStriotConfig r,
 MonadIO m) =>
(Stream alpha -> Stream beta) -> m ()
nodeLink' Stream alpha -> Stream beta
streamOp = do
    r
c <- m r
forall r (m :: * -> *). MonadReader r m => m r
ask
    Metrics
metrics <- IO Metrics -> m Metrics
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Metrics -> m Metrics) -> IO Metrics -> m Metrics
forall a b. (a -> b) -> a -> b
$ String -> IO Metrics
startPrometheus (r
c r -> Getting String r String -> String
forall s a. s -> Getting a s a -> a
^. Getting String r String
forall c. HasStriotConfig c => Lens' c String
nodeName)
    Stream alpha
stream <- Metrics -> m (Stream alpha)
forall alpha r (m :: * -> *).
(Store alpha, MonadReader r m, HasStriotConfig r, MonadIO m) =>
Metrics -> m (Stream alpha)
processInput Metrics
metrics
    let result :: Stream beta
result = Stream alpha -> Stream beta
streamOp Stream alpha
stream
    Metrics -> Stream beta -> m ()
forall alpha r (m :: * -> *).
(Store alpha, MonadReader r m, HasStriotConfig r, MonadIO m) =>
Metrics -> Stream alpha -> m ()
sendStream Metrics
metrics Stream beta
result


-- Old style configless link with 2 inputs
nodeLink2 :: (Store alpha, Store beta, Store gamma)
          => (Stream alpha -> Stream beta -> Stream gamma)
          -> ServiceName
          -> ServiceName
          -> HostName
          -> ServiceName
          -> IO ()
nodeLink2 :: forall alpha beta gamma.
(Store alpha, Store beta, Store gamma) =>
(Stream alpha -> Stream beta -> Stream gamma)
-> String -> String -> String -> String -> IO ()
nodeLink2 Stream alpha -> Stream beta -> Stream gamma
streamOp String
inputPort1 String
inputPort2 String
outputHost String
outputPort = do
    let nodeName :: String
nodeName = String
"node-link"
        (ConnTCPConfig TCPConfig
ic1) = String -> String -> ConnectionConfig
tcpConfig String
"" String
inputPort1
        (ConnTCPConfig TCPConfig
ic2) = String -> String -> ConnectionConfig
tcpConfig String
"" String
inputPort2
        (ConnTCPConfig TCPConfig
ec)  = String -> String -> ConnectionConfig
tcpConfig String
outputHost String
outputPort
    Metrics
metrics <- String -> IO Metrics
startPrometheus String
nodeName
    String -> IO ()
putStrLn String
"Starting link ..."
    Stream alpha
stream1 <- String -> TCPConfig -> Metrics -> IO (Stream alpha)
forall alpha.
Store alpha =>
String -> TCPConfig -> Metrics -> IO (Stream alpha)
processSocket String
nodeName TCPConfig
ic1 Metrics
metrics
    Stream beta
stream2 <- String -> TCPConfig -> Metrics -> IO (Stream beta)
forall alpha.
Store alpha =>
String -> TCPConfig -> Metrics -> IO (Stream alpha)
processSocket String
nodeName TCPConfig
ic2 Metrics
metrics
    let result :: Stream gamma
result = Stream alpha -> Stream beta -> Stream gamma
streamOp Stream alpha
stream1 Stream beta
stream2
    String -> TCPConfig -> Metrics -> Stream gamma -> IO ()
forall alpha.
Store alpha =>
String -> TCPConfig -> Metrics -> Stream alpha -> IO ()
sendStreamTCP String
nodeName TCPConfig
ec Metrics
metrics Stream gamma
result


nodeSink :: (Store alpha, Store beta)
         => StriotConfig
         -> (Stream alpha -> Stream beta)
         -> (Stream beta -> IO ())
         -> IO ()
nodeSink :: forall alpha beta.
(Store alpha, Store beta) =>
StriotConfig
-> (Stream alpha -> Stream beta) -> (Stream beta -> IO ()) -> IO ()
nodeSink StriotConfig
config Stream alpha -> Stream beta
streamOp Stream beta -> IO ()
iofn =
    ReaderT StriotConfig IO () -> StriotConfig -> IO ()
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT (StriotApp () -> ReaderT StriotConfig IO ()
forall a. StriotApp a -> ReaderT StriotConfig IO a
unApp (StriotApp () -> ReaderT StriotConfig IO ())
-> StriotApp () -> ReaderT StriotConfig IO ()
forall a b. (a -> b) -> a -> b
$ (Stream alpha -> Stream beta)
-> (Stream beta -> IO ()) -> StriotApp ()
forall alpha beta r (m :: * -> *).
(Store alpha, Store beta, MonadReader r m, HasStriotConfig r,
 MonadIO m) =>
(Stream alpha -> Stream beta) -> (Stream beta -> IO ()) -> m ()
nodeSink' Stream alpha -> Stream beta
streamOp Stream beta -> IO ()
iofn) StriotConfig
config


nodeSink' :: (Store alpha, Store beta,
             MonadReader r m,
             HasStriotConfig r,
             MonadIO m)
          => (Stream alpha -> Stream beta)
          -> (Stream beta -> IO ())
          -> m ()
nodeSink' :: forall alpha beta r (m :: * -> *).
(Store alpha, Store beta, MonadReader r m, HasStriotConfig r,
 MonadIO m) =>
(Stream alpha -> Stream beta) -> (Stream beta -> IO ()) -> m ()
nodeSink' Stream alpha -> Stream beta
streamOp Stream beta -> IO ()
iofn = do
    r
c <- m r
forall r (m :: * -> *). MonadReader r m => m r
ask
    Metrics
metrics <- IO Metrics -> m Metrics
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Metrics -> m Metrics) -> IO Metrics -> m Metrics
forall a b. (a -> b) -> a -> b
$ String -> IO Metrics
startPrometheus (r
c r -> Getting String r String -> String
forall s a. s -> Getting a s a -> a
^. Getting String r String
forall c. HasStriotConfig c => Lens' c String
nodeName)
    Stream alpha
stream <- Metrics -> m (Stream alpha)
forall alpha r (m :: * -> *).
(Store alpha, MonadReader r m, HasStriotConfig r, MonadIO m) =>
Metrics -> m (Stream alpha)
processInput Metrics
metrics
    let result :: Stream beta
result = Stream alpha -> Stream beta
streamOp Stream alpha
stream
    IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Stream beta -> IO ()
iofn Stream beta
result


-- Old style configless sink with 2 inputs
nodeSink2 :: (Store alpha, Store beta, Store gamma)
          => (Stream alpha -> Stream beta -> Stream gamma)
          -> (Stream gamma -> IO ())
          -> ServiceName
          -> ServiceName
          -> IO ()
nodeSink2 :: forall alpha beta gamma.
(Store alpha, Store beta, Store gamma) =>
(Stream alpha -> Stream beta -> Stream gamma)
-> (Stream gamma -> IO ()) -> String -> String -> IO ()
nodeSink2 Stream alpha -> Stream beta -> Stream gamma
streamOp Stream gamma -> IO ()
iofn String
inputPort1 String
inputPort2 = do
    let nodeName :: String
nodeName = String
"node-sink"
        (ConnTCPConfig TCPConfig
ic1) = String -> String -> ConnectionConfig
tcpConfig String
"" String
inputPort1
        (ConnTCPConfig TCPConfig
ic2) = String -> String -> ConnectionConfig
tcpConfig String
"" String
inputPort2
    Metrics
metrics <- String -> IO Metrics
startPrometheus String
nodeName
    String -> IO ()
putStrLn String
"Starting sink ..."
    Stream alpha
stream1 <- String -> TCPConfig -> Metrics -> IO (Stream alpha)
forall alpha.
Store alpha =>
String -> TCPConfig -> Metrics -> IO (Stream alpha)
processSocket String
nodeName TCPConfig
ic1 Metrics
metrics
    Stream beta
stream2 <- String -> TCPConfig -> Metrics -> IO (Stream beta)
forall alpha.
Store alpha =>
String -> TCPConfig -> Metrics -> IO (Stream alpha)
processSocket String
nodeName TCPConfig
ic2 Metrics
metrics
    let result :: Stream gamma
result = Stream alpha -> Stream beta -> Stream gamma
streamOp Stream alpha
stream1 Stream beta
stream2
    Stream gamma -> IO ()
iofn Stream gamma
result

-- | a simple source-sink combined function for single-Node
-- deployments.
-- The first argument is a Source function to create data. The second is the pure
-- stream-processing function. The third is a Sink function to operate on the
-- processed Stream.
nodeSimple :: (IO a) -> (Stream a -> Stream b) -> (Stream b -> IO()) -> IO ()
nodeSimple :: forall a b.
IO a -> (Stream a -> Stream b) -> (Stream b -> IO ()) -> IO ()
nodeSimple IO a
src Stream a -> Stream b
proc Stream b -> IO ()
sink = Stream b -> IO ()
sink (Stream b -> IO ()) -> (Stream a -> Stream b) -> Stream a -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream a -> Stream b
proc (Stream a -> IO ()) -> IO (Stream a) -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< IO a -> IO (Stream a)
forall alpha. IO alpha -> IO (Stream alpha)
readListFromSource IO a
src

--- CONFIG FUNCTIONS ---

defaultConfig :: String
              -> HostName
              -> ServiceName
              -> HostName
              -> ServiceName
              -> StriotConfig
defaultConfig :: String -> String -> String -> String -> String -> StriotConfig
defaultConfig = ConnectProtocol
-> ConnectProtocol
-> String
-> String
-> String
-> String
-> String
-> StriotConfig
defaultConfig' ConnectProtocol
TCP ConnectProtocol
TCP


defaultSource :: HostName -> ServiceName -> StriotConfig
defaultSource :: String -> String -> StriotConfig
defaultSource = String -> String -> String -> String -> String -> StriotConfig
defaultConfig String
"striot-source" String
"" String
""


defaultLink :: ServiceName -> HostName -> ServiceName -> StriotConfig
defaultLink :: String -> String -> String -> StriotConfig
defaultLink = String -> String -> String -> String -> String -> StriotConfig
defaultConfig String
"striot-link" String
""


defaultSink :: ServiceName -> StriotConfig
defaultSink :: String -> StriotConfig
defaultSink String
port = String -> String -> String -> String -> String -> StriotConfig
defaultConfig String
"striot-sink" String
"" String
port String
"" String
""


defaultConfig' :: ConnectProtocol
               -> ConnectProtocol
               -> String
               -> HostName
               -> ServiceName
               -> HostName
               -> ServiceName
               -> StriotConfig
defaultConfig' :: ConnectProtocol
-> ConnectProtocol
-> String
-> String
-> String
-> String
-> String
-> StriotConfig
defaultConfig' ConnectProtocol
ict ConnectProtocol
ect String
name String
inHost String
inPort String
outHost String
outPort =
    let ccf :: ConnectProtocol -> String -> String -> ConnectionConfig
ccf ConnectProtocol
ct = case ConnectProtocol
ct of
                    ConnectProtocol
TCP   -> String -> String -> ConnectionConfig
tcpConfig
                    ConnectProtocol
KAFKA -> String -> String -> ConnectionConfig
defaultKafkaConfig
                    ConnectProtocol
MQTT  -> String -> String -> ConnectionConfig
defaultMqttConfig
    in  String -> ConnectionConfig -> ConnectionConfig -> StriotConfig
baseConfig String
name (ConnectProtocol -> String -> String -> ConnectionConfig
ccf ConnectProtocol
ict String
inHost String
inPort) (ConnectProtocol -> String -> String -> ConnectionConfig
ccf ConnectProtocol
ect String
outHost String
outPort)


baseConfig :: String -> ConnectionConfig -> ConnectionConfig -> StriotConfig
baseConfig :: String -> ConnectionConfig -> ConnectionConfig -> StriotConfig
baseConfig String
name ConnectionConfig
icc ConnectionConfig
ecc =
    StriotConfig :: String
-> ConnectionConfig -> ConnectionConfig -> Int -> StriotConfig
StriotConfig
        { _nodeName :: String
_nodeName          = String
name
        , _ingressConnConfig :: ConnectionConfig
_ingressConnConfig = ConnectionConfig
icc
        , _egressConnConfig :: ConnectionConfig
_egressConnConfig  = ConnectionConfig
ecc
        , _chanSize :: Int
_chanSize          = Int
10
        }


tcpConfig :: HostName -> ServiceName -> ConnectionConfig
tcpConfig :: String -> String -> ConnectionConfig
tcpConfig String
host String
port =
    TCPConfig -> ConnectionConfig
ConnTCPConfig (TCPConfig -> ConnectionConfig) -> TCPConfig -> ConnectionConfig
forall a b. (a -> b) -> a -> b
$ NetConfig -> TCPConfig
TCPConfig (NetConfig -> TCPConfig) -> NetConfig -> TCPConfig
forall a b. (a -> b) -> a -> b
$ String -> String -> NetConfig
NetConfig String
host String
port


kafkaConfig :: String -> String -> HostName -> ServiceName -> ConnectionConfig
kafkaConfig :: String -> String -> String -> String -> ConnectionConfig
kafkaConfig String
topic String
conGroup String
host String
port =
    KafkaConfig -> ConnectionConfig
ConnKafkaConfig (KafkaConfig -> ConnectionConfig)
-> KafkaConfig -> ConnectionConfig
forall a b. (a -> b) -> a -> b
$ NetConfig -> String -> String -> KafkaConfig
KafkaConfig (String -> String -> NetConfig
NetConfig String
host String
port) String
topic String
conGroup


defaultKafkaConfig :: HostName -> ServiceName -> ConnectionConfig
defaultKafkaConfig :: String -> String -> ConnectionConfig
defaultKafkaConfig = String -> String -> String -> String -> ConnectionConfig
kafkaConfig String
"striot-queue" String
"striot_consumer_group"


mqttConfig :: String -> HostName -> ServiceName -> ConnectionConfig
mqttConfig :: String -> String -> String -> ConnectionConfig
mqttConfig String
topic String
host String
port =
    MQTTConfig -> ConnectionConfig
ConnMQTTConfig (MQTTConfig -> ConnectionConfig) -> MQTTConfig -> ConnectionConfig
forall a b. (a -> b) -> a -> b
$ NetConfig -> String -> MQTTConfig
MQTTConfig (String -> String -> NetConfig
NetConfig String
host String
port) String
topic


defaultMqttConfig :: HostName -> ServiceName -> ConnectionConfig
defaultMqttConfig :: String -> String -> ConnectionConfig
defaultMqttConfig = String -> String -> String -> ConnectionConfig
mqttConfig String
"StriotQueue"


--- INTERNAL OPS ---

processInput :: (Store alpha,
                MonadReader r m,
                HasStriotConfig r,
                MonadIO m)
             => Metrics
             -> m (Stream alpha)
processInput :: forall alpha r (m :: * -> *).
(Store alpha, MonadReader r m, HasStriotConfig r, MonadIO m) =>
Metrics -> m (Stream alpha)
processInput Metrics
metrics = Metrics -> m (OutChan (Event alpha))
forall alpha r (m :: * -> *).
(Store alpha, MonadReader r m, HasStriotConfig r, MonadIO m) =>
Metrics -> m (OutChan (Event alpha))
connectDispatch Metrics
metrics m (OutChan (Event alpha))
-> (OutChan (Event alpha) -> m [Event alpha]) -> m [Event alpha]
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (IO [Event alpha] -> m [Event alpha]
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [Event alpha] -> m [Event alpha])
-> (OutChan (Event alpha) -> IO [Event alpha])
-> OutChan (Event alpha)
-> m [Event alpha]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. OutChan (Event alpha) -> IO [Event alpha]
forall a. OutChan a -> IO [a]
U.getChanContents)


connectDispatch :: (Store alpha,
                   MonadReader r m,
                   HasStriotConfig r,
                   MonadIO m)
                => Metrics
                -> m (U.OutChan (Event alpha))
connectDispatch :: forall alpha r (m :: * -> *).
(Store alpha, MonadReader r m, HasStriotConfig r, MonadIO m) =>
Metrics -> m (OutChan (Event alpha))
connectDispatch Metrics
metrics = do
    r
c <- m r
forall r (m :: * -> *). MonadReader r m => m r
ask
    IO (OutChan (Event alpha)) -> m (OutChan (Event alpha))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (OutChan (Event alpha)) -> m (OutChan (Event alpha)))
-> IO (OutChan (Event alpha)) -> m (OutChan (Event alpha))
forall a b. (a -> b) -> a -> b
$ do
        (InChan (Event alpha)
inChan, OutChan (Event alpha)
outChan) <- Int -> IO (InChan (Event alpha), OutChan (Event alpha))
forall a. Int -> IO (InChan a, OutChan a)
U.newChan (r
c r -> Getting Int r Int -> Int
forall s a. s -> Getting a s a -> a
^. Getting Int r Int
forall c. HasStriotConfig c => Lens' c Int
chanSize)
        IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ String
-> ConnectionConfig -> Metrics -> InChan (Event alpha) -> IO ()
forall alpha (m :: * -> *).
(Store alpha, MonadIO m) =>
String
-> ConnectionConfig -> Metrics -> InChan (Event alpha) -> m ()
connectDispatch' (r
c r -> Getting String r String -> String
forall s a. s -> Getting a s a -> a
^. Getting String r String
forall c. HasStriotConfig c => Lens' c String
nodeName)
                                 (r
c r
-> Getting ConnectionConfig r ConnectionConfig -> ConnectionConfig
forall s a. s -> Getting a s a -> a
^. Getting ConnectionConfig r ConnectionConfig
forall c. HasStriotConfig c => Lens' c ConnectionConfig
ingressConnConfig)
                                 Metrics
metrics
                                 InChan (Event alpha)
inChan
        OutChan (Event alpha) -> IO (OutChan (Event alpha))
forall (m :: * -> *) a. Monad m => a -> m a
return OutChan (Event alpha)
outChan


connectDispatch' :: (Store alpha,
                    MonadIO m)
                 => String
                 -> ConnectionConfig
                 -> Metrics
                 -> U.InChan (Event alpha)
                 -> m ()
connectDispatch' :: forall alpha (m :: * -> *).
(Store alpha, MonadIO m) =>
String
-> ConnectionConfig -> Metrics -> InChan (Event alpha) -> m ()
connectDispatch' String
name (ConnTCPConfig   TCPConfig
cc) Metrics
met InChan (Event alpha)
chan = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ String -> TCPConfig -> Metrics -> InChan (Event alpha) -> IO ()
forall alpha.
Store alpha =>
String -> TCPConfig -> Metrics -> InChan (Event alpha) -> IO ()
connectTCP       String
name TCPConfig
cc Metrics
met InChan (Event alpha)
chan
connectDispatch' String
name (ConnKafkaConfig KafkaConfig
cc) Metrics
met InChan (Event alpha)
chan = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ String -> KafkaConfig -> Metrics -> InChan (Event alpha) -> IO ()
forall alpha.
Store alpha =>
String -> KafkaConfig -> Metrics -> InChan (Event alpha) -> IO ()
runKafkaConsumer String
name KafkaConfig
cc Metrics
met InChan (Event alpha)
chan
connectDispatch' String
name (ConnMQTTConfig  MQTTConfig
cc) Metrics
met InChan (Event alpha)
chan = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ String -> MQTTConfig -> Metrics -> InChan (Event alpha) -> IO ()
forall alpha.
Store alpha =>
String -> MQTTConfig -> Metrics -> InChan (Event alpha) -> IO ()
runMQTTSub       String
name MQTTConfig
cc Metrics
met InChan (Event alpha)
chan


sendStream :: (Store alpha,
              MonadReader r m,
              HasStriotConfig r,
              MonadIO m)
           => Metrics
           -> Stream alpha
           -> m ()
sendStream :: forall alpha r (m :: * -> *).
(Store alpha, MonadReader r m, HasStriotConfig r, MonadIO m) =>
Metrics -> Stream alpha -> m ()
sendStream Metrics
metrics Stream alpha
stream = do
    r
c <- m r
forall r (m :: * -> *). MonadReader r m => m r
ask
    IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
        (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ String -> ConnectionConfig -> Metrics -> Stream alpha -> IO ()
forall alpha (m :: * -> *).
(Store alpha, MonadIO m) =>
String -> ConnectionConfig -> Metrics -> Stream alpha -> m ()
sendDispatch (r
c r -> Getting String r String -> String
forall s a. s -> Getting a s a -> a
^. Getting String r String
forall c. HasStriotConfig c => Lens' c String
nodeName)
                       (r
c r
-> Getting ConnectionConfig r ConnectionConfig -> ConnectionConfig
forall s a. s -> Getting a s a -> a
^. Getting ConnectionConfig r ConnectionConfig
forall c. HasStriotConfig c => Lens' c ConnectionConfig
egressConnConfig)
                       Metrics
metrics
                       Stream alpha
stream


sendDispatch :: (Store alpha,
                MonadIO m)
             => String
             -> ConnectionConfig
             -> Metrics
             -> Stream alpha
             -> m ()
sendDispatch :: forall alpha (m :: * -> *).
(Store alpha, MonadIO m) =>
String -> ConnectionConfig -> Metrics -> Stream alpha -> m ()
sendDispatch String
name (ConnTCPConfig   TCPConfig
cc) Metrics
met Stream alpha
stream = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ String -> TCPConfig -> Metrics -> Stream alpha -> IO ()
forall alpha.
Store alpha =>
String -> TCPConfig -> Metrics -> Stream alpha -> IO ()
sendStreamTCP   String
name TCPConfig
cc Metrics
met Stream alpha
stream
sendDispatch String
name (ConnKafkaConfig KafkaConfig
cc) Metrics
met Stream alpha
stream = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ String -> KafkaConfig -> Metrics -> Stream alpha -> IO ()
forall alpha.
Store alpha =>
String -> KafkaConfig -> Metrics -> Stream alpha -> IO ()
sendStreamKafka String
name KafkaConfig
cc Metrics
met Stream alpha
stream
sendDispatch String
name (ConnMQTTConfig  MQTTConfig
cc) Metrics
met Stream alpha
stream = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ String -> MQTTConfig -> Metrics -> Stream alpha -> IO ()
forall alpha.
Store alpha =>
String -> MQTTConfig -> Metrics -> Stream alpha -> IO ()
sendStreamMQTT  String
name MQTTConfig
cc Metrics
met Stream alpha
stream


readListFromSource :: IO alpha -> IO (Stream alpha)
readListFromSource :: forall alpha. IO alpha -> IO (Stream alpha)
readListFromSource = IO alpha -> IO [Event alpha]
forall alpha. IO alpha -> IO (Stream alpha)
go
  where
    go :: IO a -> IO [Event a]
go IO a
pay = IO [Event a] -> IO [Event a]
forall a. IO a -> IO a
unsafeInterleaveIO (IO [Event a] -> IO [Event a]) -> IO [Event a] -> IO [Event a]
forall a b. (a -> b) -> a -> b
$ do
        Event a
x  <- IO (Event a)
msg
        [Event a]
xs <- IO a -> IO [Event a]
go IO a
pay
        [Event a] -> IO [Event a]
forall (m :: * -> *) a. Monad m => a -> m a
return (Event a
x Event a -> [Event a] -> [Event a]
forall a. a -> [a] -> [a]
: [Event a]
xs)
      where
        msg :: IO (Event a)
msg = do
            UTCTime
now <- IO UTCTime
getCurrentTime
            Maybe UTCTime -> Maybe a -> Event a
forall alpha. Maybe UTCTime -> Maybe alpha -> Event alpha
Event (UTCTime -> Maybe UTCTime
forall a. a -> Maybe a
Just UTCTime
now) (Maybe a -> Event a) -> (a -> Maybe a) -> a -> Event a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Maybe a
forall a. a -> Maybe a
Just (a -> Event a) -> IO a -> IO (Event a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO a
pay


--- PROMETHEUS ---

startPrometheus :: String -> IO Metrics
startPrometheus :: String -> IO Metrics
startPrometheus String
name = do
    Registry
reg <- IO Registry
PR.new
    let lbl :: Labels
lbl = Text -> Text -> Labels -> Labels
addLabel Text
"node" (String -> Text
T.pack String
name) Labels
forall a. Monoid a => a
mempty
        registerFn :: (t -> Labels -> Registry -> t) -> t -> t
registerFn t -> Labels -> Registry -> t
fn t
mName = t -> Labels -> Registry -> t
fn t
mName Labels
lbl Registry
reg
        rg :: Name -> IO Gauge
rg = (Name -> Labels -> Registry -> IO Gauge) -> Name -> IO Gauge
forall {t} {t}. (t -> Labels -> Registry -> t) -> t -> t
registerFn Name -> Labels -> Registry -> IO Gauge
registerGauge
        rc :: Name -> IO Counter
rc = (Name -> Labels -> Registry -> IO Counter) -> Name -> IO Counter
forall {t} {t}. (t -> Labels -> Registry -> t) -> t -> t
registerFn Name -> Labels -> Registry -> IO Counter
registerCounter
    IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ Int -> Path -> IO RegistrySample -> IO ()
forall (m :: * -> *).
MonadIO m =>
Int -> Path -> IO RegistrySample -> m ()
serveMetrics Int
8080 [Text
"metrics"] (Registry -> IO RegistrySample
PR.sample Registry
reg)
    Gauge
-> Counter -> Counter -> Gauge -> Counter -> Counter -> Metrics
Metrics
        (Gauge
 -> Counter -> Counter -> Gauge -> Counter -> Counter -> Metrics)
-> IO Gauge
-> IO
     (Counter -> Counter -> Gauge -> Counter -> Counter -> Metrics)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Name -> IO Gauge
rg Name
"striot_ingress_connection"
        IO (Counter -> Counter -> Gauge -> Counter -> Counter -> Metrics)
-> IO Counter
-> IO (Counter -> Gauge -> Counter -> Counter -> Metrics)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Name -> IO Counter
rc Name
"striot_ingress_bytes_total"
        IO (Counter -> Gauge -> Counter -> Counter -> Metrics)
-> IO Counter -> IO (Gauge -> Counter -> Counter -> Metrics)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Name -> IO Counter
rc Name
"striot_ingress_events_total"
        IO (Gauge -> Counter -> Counter -> Metrics)
-> IO Gauge -> IO (Counter -> Counter -> Metrics)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Name -> IO Gauge
rg Name
"striot_egress_connection"
        IO (Counter -> Counter -> Metrics)
-> IO Counter -> IO (Counter -> Metrics)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Name -> IO Counter
rc Name
"striot_egress_bytes_total"
        IO (Counter -> Metrics) -> IO Counter -> IO Metrics
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Name -> IO Counter
rc Name
"striot_egress_events_total"

--------------------------------------------------------------------
-- simple routines for pure streams

-- | Convenience function for creating a pure `Stream`.
mkStream :: [a] -> Stream a
mkStream :: forall a. [a] -> Stream a
mkStream = (a -> Event a) -> [a] -> [Event a]
forall a b. (a -> b) -> [a] -> [b]
map ((a -> Event a) -> [a] -> [Event a])
-> (a -> Event a) -> [a] -> [Event a]
forall a b. (a -> b) -> a -> b
$ Maybe UTCTime -> Maybe a -> Event a
forall alpha. Maybe UTCTime -> Maybe alpha -> Event alpha
Event Maybe UTCTime
forall a. Maybe a
Nothing (Maybe a -> Event a) -> (a -> Maybe a) -> a -> Event a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Maybe a
forall a. a -> Maybe a
Just

-- | A convenience function to extract a list of values from a `Stream`.
unStream :: Stream a -> [a]
unStream :: forall a. Stream a -> [a]
unStream = [Maybe a] -> [a]
forall a. [Maybe a] -> [a]
catMaybes ([Maybe a] -> [a]) -> ([Event a] -> [Maybe a]) -> [Event a] -> [a]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Event a -> Maybe a) -> [Event a] -> [Maybe a]
forall a b. (a -> b) -> [a] -> [b]
map Event a -> Maybe a
forall alpha. Event alpha -> Maybe alpha
value