{-# LANGUAGE DataKinds #-}
{-# OPTIONS_HADDOCK prune #-}
{-# LANGUAGE OverloadedStrings #-}
module Striot.Nodes ( nodeSource
, nodeLink
, nodeLink2
, nodeSink
, nodeSink2
, nodeSimple
, defaultConfig
, defaultSource
, defaultLink
, defaultSink
, mkStream
, unStream
) where
import Control.Concurrent.Async (async)
import Control.Concurrent.Chan.Unagi.Bounded as U
import Control.Lens
import Control.Monad.IO.Class
import Control.Monad.Reader
import Data.IORef
import Data.Maybe
import Data.Store (Store)
import Data.Text as T (pack)
import Data.Time (getCurrentTime)
import Network.Socket (HostName,
ServiceName)
import Striot.FunctionalIoTtypes
import Striot.Nodes.Kafka
import Striot.Nodes.MQTT
import Striot.Nodes.TCP
import Striot.Nodes.Types hiding (nc,
readConf,
writeConf)
import System.IO
import System.IO.Unsafe
import System.Metrics.Prometheus.Concurrent.Registry as PR (new, registerCounter,
registerGauge,
sample)
import System.Metrics.Prometheus.Http.Scrape (serveMetrics)
import System.Metrics.Prometheus.Metric.Counter as PC (inc)
import System.Metrics.Prometheus.MetricId (addLabel)
-- | Run a source node with the given configuration.
--
-- The 'IO' action is used to produce the payloads of the stream, the stream
-- operator transforms that stream, and the result is sent on through the
-- egress connection described by the 'StriotConfig'. This is a thin wrapper
-- that runs 'nodeSource'' in a reader environment holding the config.
nodeSource :: (Store alpha, Store beta)
           => StriotConfig                      -- ^ node configuration
           -> IO alpha                          -- ^ action producing one payload
           -> (Stream alpha -> Stream beta)     -- ^ stream transformation
           -> IO ()
nodeSource config iofn streamOp =
    runReaderT (unApp (nodeSource' iofn streamOp)) config
-- | Source node implementation, polymorphic in any monad that can read a
-- 'HasStriotConfig' environment and perform 'IO'.
--
-- Starts the Prometheus metrics for this node (labelled with the configured
-- node name), builds a stream from the source action, applies the stream
-- operator and hands the result to 'sendStream'.
nodeSource' :: (Store alpha, Store beta,
                MonadReader r m,
                HasStriotConfig r,
                MonadIO m)
            => IO alpha
            -> (Stream alpha -> Stream beta)
            -> m ()
nodeSource' iofn streamOp = do
    c       <- ask
    metrics <- liftIO $ startPrometheus (c ^. nodeName)
    -- Bump the ingress-events counter every time the source action is run.
    let iofnAndMetrics = PC.inc (_ingressEvents metrics) >> iofn
    stream  <- liftIO $ readListFromSource iofnAndMetrics
    let result = streamOp stream
    sendStream metrics result
-- | Run a link node with the given configuration.
--
-- Events arriving on the configured ingress connection are transformed by
-- the stream operator and sent on through the configured egress connection.
-- This is a thin wrapper that runs 'nodeLink'' in a reader environment
-- holding the config.
nodeLink :: (Store alpha, Store beta)
         => StriotConfig                      -- ^ node configuration
         -> (Stream alpha -> Stream beta)     -- ^ stream transformation
         -> IO ()
nodeLink config streamOp =
    runReaderT (unApp (nodeLink' streamOp)) config
-- | Link node implementation, polymorphic in any monad that can read a
-- 'HasStriotConfig' environment and perform 'IO'.
--
-- Starts the Prometheus metrics for this node, obtains the input stream via
-- 'processInput', applies the stream operator and hands the result to
-- 'sendStream'.
nodeLink' :: (Store alpha, Store beta,
              MonadReader r m,
              HasStriotConfig r,
              MonadIO m)
          => (Stream alpha -> Stream beta)
          -> m ()
nodeLink' streamOp = do
    c       <- ask
    metrics <- liftIO $ startPrometheus (c ^. nodeName)
    stream  <- processInput metrics
    let result = streamOp stream
    sendStream metrics result
-- | Run a two-input link node over TCP.
--
-- Two input streams are obtained with 'processSocket' (one per input port,
-- both with an empty host name), combined by the stream operator, and the
-- result is sent with 'sendStreamTCP' to the given output host and port.
-- The node name used for metrics and connections is fixed to @node-link@.
nodeLink2 :: (Store alpha, Store beta, Store gamma)
          => (Stream alpha -> Stream beta -> Stream gamma) -- ^ combining operator
          -> ServiceName                                   -- ^ first input port
          -> ServiceName                                   -- ^ second input port
          -> HostName                                      -- ^ output host
          -> ServiceName                                   -- ^ output port
          -> IO ()
nodeLink2 streamOp inputPort1 inputPort2 outputHost outputPort = do
    -- Build the TCP configs directly rather than pattern matching on the
    -- result of 'tcpConfig', which would be a refutable (partial) binding.
    let name = "node-link" :: String
        ic1  = TCPConfig (NetConfig "" inputPort1)
        ic2  = TCPConfig (NetConfig "" inputPort2)
        ec   = TCPConfig (NetConfig outputHost outputPort)
    metrics <- startPrometheus name
    putStrLn "Starting link ..."
    stream1 <- processSocket name ic1 metrics
    stream2 <- processSocket name ic2 metrics
    let result = streamOp stream1 stream2
    sendStreamTCP name ec metrics result
-- | Run a sink node with the given configuration.
--
-- Events arriving on the configured ingress connection are transformed by
-- the stream operator and the resulting stream is consumed by the supplied
-- 'IO' function. This is a thin wrapper that runs 'nodeSink'' in a reader
-- environment holding the config.
nodeSink :: (Store alpha, Store beta)
         => StriotConfig                      -- ^ node configuration
         -> (Stream alpha -> Stream beta)     -- ^ stream transformation
         -> (Stream beta -> IO ())            -- ^ consumer of the result stream
         -> IO ()
nodeSink config streamOp iofn =
    runReaderT (unApp (nodeSink' streamOp iofn)) config
-- | Sink node implementation, polymorphic in any monad that can read a
-- 'HasStriotConfig' environment and perform 'IO'.
--
-- Starts the Prometheus metrics for this node, obtains the input stream via
-- 'processInput', applies the stream operator and passes the result to the
-- consumer function.
nodeSink' :: (Store alpha, Store beta,
              MonadReader r m,
              HasStriotConfig r,
              MonadIO m)
          => (Stream alpha -> Stream beta)
          -> (Stream beta -> IO ())
          -> m ()
nodeSink' streamOp iofn = do
    c       <- ask
    metrics <- liftIO $ startPrometheus (c ^. nodeName)
    stream  <- processInput metrics
    let result = streamOp stream
    liftIO $ iofn result
-- | Run a two-input sink node over TCP.
--
-- Two input streams are obtained with 'processSocket' (one per input port,
-- both with an empty host name), combined by the stream operator, and the
-- result is consumed by the supplied 'IO' function. The node name used for
-- metrics and connections is fixed to @node-sink@.
nodeSink2 :: (Store alpha, Store beta, Store gamma)
          => (Stream alpha -> Stream beta -> Stream gamma) -- ^ combining operator
          -> (Stream gamma -> IO ())                       -- ^ consumer of the result
          -> ServiceName                                   -- ^ first input port
          -> ServiceName                                   -- ^ second input port
          -> IO ()
nodeSink2 streamOp iofn inputPort1 inputPort2 = do
    -- Build the TCP configs directly rather than pattern matching on the
    -- result of 'tcpConfig', which would be a refutable (partial) binding.
    let name = "node-sink" :: String
        ic1  = TCPConfig (NetConfig "" inputPort1)
        ic2  = TCPConfig (NetConfig "" inputPort2)
    metrics <- startPrometheus name
    putStrLn "Starting sink ..."
    stream1 <- processSocket name ic1 metrics
    stream2 <- processSocket name ic2 metrics
    let result = streamOp stream1 stream2
    iofn result
-- | Run source, processing and sink in a single process with no networking:
-- a stream is built from the source action with 'readListFromSource',
-- transformed by the processing function and consumed by the sink.
nodeSimple :: IO a -> (Stream a -> Stream b) -> (Stream b -> IO ()) -> IO ()
nodeSimple src proc sink = readListFromSource src >>= sink . proc
-- | Build a 'StriotConfig' that uses TCP for both ingress and egress.
--
-- Arguments, in order: node name, ingress host, ingress port, egress host,
-- egress port.
defaultConfig :: String
              -> HostName
              -> ServiceName
              -> HostName
              -> ServiceName
              -> StriotConfig
defaultConfig = defaultConfig' TCP TCP
-- | TCP configuration for a source node named @striot-source@: the ingress
-- host and port are left empty, and the arguments give the egress host and
-- port.
defaultSource :: HostName -> ServiceName -> StriotConfig
defaultSource = defaultConfig "striot-source" "" ""
-- | TCP configuration for a link node named @striot-link@: the ingress host
-- is left empty, and the arguments give the ingress port, egress host and
-- egress port.
defaultLink :: ServiceName -> HostName -> ServiceName -> StriotConfig
defaultLink = defaultConfig "striot-link" ""
-- | TCP configuration for a sink node named @striot-sink@: only the ingress
-- port is supplied; the ingress host and the whole egress side are left
-- empty.
defaultSink :: ServiceName -> StriotConfig
defaultSink port = defaultConfig "striot-sink" "" port "" ""
-- | Build a 'StriotConfig' with the given ingress and egress protocols.
--
-- Each protocol selects the matching default connection-config builder
-- ('tcpConfig', 'defaultKafkaConfig' or 'defaultMqttConfig'), which is then
-- applied to the corresponding host and port.
defaultConfig' :: ConnectProtocol   -- ^ ingress protocol
               -> ConnectProtocol   -- ^ egress protocol
               -> String            -- ^ node name
               -> HostName          -- ^ ingress host
               -> ServiceName       -- ^ ingress port
               -> HostName          -- ^ egress host
               -> ServiceName       -- ^ egress port
               -> StriotConfig
defaultConfig' ict ect name inHost inPort outHost outPort =
    baseConfig name ingress egress
  where
    ingress = ccf ict inHost inPort
    egress  = ccf ect outHost outPort

    -- Pick the connection-config builder for a protocol.
    ccf ct = case ct of
        TCP   -> tcpConfig
        KAFKA -> defaultKafkaConfig
        MQTT  -> defaultMqttConfig
-- | Assemble a 'StriotConfig' from a node name and the ingress and egress
-- connection configs. The channel size is fixed at 10.
baseConfig :: String -> ConnectionConfig -> ConnectionConfig -> StriotConfig
baseConfig name icc ecc =
    StriotConfig
        { _nodeName          = name
        , _ingressConnConfig = icc
        , _egressConnConfig  = ecc
        , _chanSize          = 10
        }
-- | TCP connection config for the given host and port.
tcpConfig :: HostName -> ServiceName -> ConnectionConfig
tcpConfig host port = ConnTCPConfig (TCPConfig (NetConfig host port))
-- | Kafka connection config.
--
-- Arguments, in order: topic, consumer group, host, port.
kafkaConfig :: String -> String -> HostName -> ServiceName -> ConnectionConfig
kafkaConfig topic conGroup host port =
    ConnKafkaConfig (KafkaConfig (NetConfig host port) topic conGroup)
-- | Kafka connection config for the given host and port, using the topic
-- @striot-queue@ and the consumer group @striot_consumer_group@.
defaultKafkaConfig :: HostName -> ServiceName -> ConnectionConfig
defaultKafkaConfig = kafkaConfig "striot-queue" "striot_consumer_group"
-- | MQTT connection config.
--
-- Arguments, in order: topic, host, port.
mqttConfig :: String -> HostName -> ServiceName -> ConnectionConfig
mqttConfig topic host port =
    ConnMQTTConfig (MQTTConfig (NetConfig host port) topic)
-- | MQTT connection config for the given host and port, using the topic
-- @StriotQueue@.
defaultMqttConfig :: HostName -> ServiceName -> ConnectionConfig
defaultMqttConfig = mqttConfig "StriotQueue"
-- | Connect to the configured ingress (via 'connectDispatch') and expose the
-- resulting channel as a 'Stream' using 'U.getChanContents'.
processInput :: (Store alpha,
                 MonadReader r m,
                 HasStriotConfig r,
                 MonadIO m)
             => Metrics
             -> m (Stream alpha)
processInput metrics = do
    outChan <- connectDispatch metrics
    liftIO (U.getChanContents outChan)
-- | Create a bounded channel (sized by the configured @chanSize@), start an
-- asynchronous task that feeds it from the configured ingress connection,
-- and return the read end of the channel.
connectDispatch :: (Store alpha,
                    MonadReader r m,
                    HasStriotConfig r,
                    MonadIO m)
                => Metrics
                -> m (U.OutChan (Event alpha))
connectDispatch metrics = do
    c <- ask
    liftIO $ do
        (inChan, outChan) <- U.newChan (c ^. chanSize)
        -- The handle of the background receiver is intentionally discarded;
        -- the task keeps running independently of this function returning.
        _ <- async $ connectDispatch' (c ^. nodeName)
                                      (c ^. ingressConnConfig)
                                      metrics
                                      inChan
        return outChan
-- | Run the receiver matching the kind of ingress connection config:
-- 'connectTCP' for TCP, 'runKafkaConsumer' for Kafka and 'runMQTTSub' for
-- MQTT. Each receiver is given the node name, the protocol-specific config,
-- the metrics and the write end of the channel.
connectDispatch' :: (Store alpha,
                     MonadIO m)
                 => String
                 -> ConnectionConfig
                 -> Metrics
                 -> U.InChan (Event alpha)
                 -> m ()
connectDispatch' name (ConnTCPConfig   cc) met chan = liftIO $ connectTCP       name cc met chan
connectDispatch' name (ConnKafkaConfig cc) met chan = liftIO $ runKafkaConsumer name cc met chan
connectDispatch' name (ConnMQTTConfig  cc) met chan = liftIO $ runMQTTSub       name cc met chan
-- | Send a stream over the egress connection described by the environment's
-- config, by delegating to 'sendDispatch' with the configured node name and
-- egress connection config.
sendStream :: (Store alpha,
               MonadReader r m,
               HasStriotConfig r,
               MonadIO m)
           => Metrics
           -> Stream alpha
           -> m ()
sendStream metrics stream = do
    c <- ask
    liftIO $ sendDispatch (c ^. nodeName)
                          (c ^. egressConnConfig)
                          metrics
                          stream
-- | Run the sender matching the kind of egress connection config:
-- 'sendStreamTCP' for TCP, 'sendStreamKafka' for Kafka and 'sendStreamMQTT'
-- for MQTT. Each sender is given the node name, the protocol-specific
-- config, the metrics and the stream to send.
sendDispatch :: (Store alpha,
                 MonadIO m)
             => String
             -> ConnectionConfig
             -> Metrics
             -> Stream alpha
             -> m ()
sendDispatch name (ConnTCPConfig   cc) met stream = liftIO $ sendStreamTCP   name cc met stream
sendDispatch name (ConnKafkaConfig cc) met stream = liftIO $ sendStreamKafka name cc met stream
sendDispatch name (ConnMQTTConfig  cc) met stream = liftIO $ sendStreamMQTT  name cc met stream
-- | Turn an 'IO' action into an unbounded 'Stream' of timestamped events.
--
-- The list is built with 'unsafeInterleaveIO', so the source action is only
-- run when the corresponding part of the list is demanded. Each event
-- carries the time obtained from 'getCurrentTime' immediately before the
-- source action was run, together with the payload it returned.
readListFromSource :: IO alpha -> IO (Stream alpha)
readListFromSource pay = loop
  where
    -- Deferred, self-referential producer: one event, then the rest.
    loop = unsafeInterleaveIO $ do
        x  <- nextEvent
        xs <- loop
        return (x : xs)

    -- Take the timestamp first, then run the source action.
    nextEvent = do
        now <- getCurrentTime
        Event (Just now) . Just <$> pay
-- | Create a Prometheus registry, register the node's gauges and counters
-- (all labelled @node=\<name\>@), and start serving the registry over HTTP
-- on port 8080 at the path @metrics@ in a background task.
--
-- Returns the 'Metrics' record built from the registered metrics, in the
-- order: ingress connection gauge, ingress bytes counter, ingress events
-- counter, egress connection gauge, egress bytes counter, egress events
-- counter.
startPrometheus :: String -> IO Metrics
startPrometheus name = do
    reg <- PR.new
    let lbl                 = addLabel "node" (T.pack name) mempty
        -- Apply a registration function to a metric name with the shared
        -- label set and registry.
        registerFn fn mName = fn mName lbl reg
        rg                  = registerFn registerGauge
        rc                  = registerFn registerCounter
    -- The handle of the metrics server task is intentionally discarded.
    _ <- async $ serveMetrics 8080 ["metrics"] (PR.sample reg)
    Metrics
        <$> rg "striot_ingress_connection"
        <*> rc "striot_ingress_bytes_total"
        <*> rc "striot_ingress_events_total"
        <*> rg "striot_egress_connection"
        <*> rc "striot_egress_bytes_total"
        <*> rc "striot_egress_events_total"
-- | Convenience function for creating a 'Stream' from a list: every element
-- becomes an event with no timestamp and the element as its payload.
mkStream :: [a] -> Stream a
mkStream = map (Event Nothing . Just)
-- | Convenience function for extracting the payloads of a 'Stream' as a
-- list. Events whose 'value' is 'Nothing' are dropped.
unStream :: Stream a -> [a]
unStream = mapMaybe value