module Striot.Nodes.MQTT
( sendStreamMQTT
, runMQTTSub
) where

import           Control.Concurrent.Chan.Unagi.Bounded    as U
import           Control.DeepSeq                          (force)
import qualified Control.Exception                        as E (evaluate)
import           Control.Lens
import qualified Data.ByteString                          as B (length)
import qualified Data.ByteString.Lazy.Char8               as BLC
import           Data.Store                               (Store, decode,
                                                           encode)
import           Data.Maybe                               (fromJust)
import           Data.Text                                as T (pack)
import           Network.MQTT.Client                      as MQTT hiding
                                                                   (Timeout)
import           Network.MQTT.Topic                       (mkTopic, mkFilter)
import           Network.MQTT.Types                       (RetainHandling (..))
import           Network.Socket                           (HostName,
                                                           ServiceName)
import           Network.URI                              (parseURI)
import           Striot.FunctionalIoTtypes
import           Striot.Nodes.Types                       as NT
import           System.Metrics.Prometheus.Metric.Counter as PC (add, inc)
import           System.Metrics.Prometheus.Metric.Gauge   as PG (dec, inc)


sendStreamMQTT :: Store alpha => String -> NT.MQTTConfig -> Metrics -> Stream alpha -> IO ()
sendStreamMQTT :: forall alpha.
Store alpha =>
String -> MQTTConfig -> Metrics -> Stream alpha -> IO ()
sendStreamMQTT String
name MQTTConfig
conf Metrics
met Stream alpha
stream = do
    MQTTClient
mc <- String -> String -> String -> IO MQTTClient
runMQTTPub String
name (MQTTConfig
conf MQTTConfig -> Getting String MQTTConfig String -> String
forall s a. s -> Getting a s a -> a
^. (NetConfig -> Const String NetConfig)
-> MQTTConfig -> Const String MQTTConfig
Lens' MQTTConfig NetConfig
mqttConn ((NetConfig -> Const String NetConfig)
 -> MQTTConfig -> Const String MQTTConfig)
-> ((String -> Const String String)
    -> NetConfig -> Const String NetConfig)
-> Getting String MQTTConfig String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (String -> Const String String)
-> NetConfig -> Const String NetConfig
Lens' NetConfig String
host) (MQTTConfig
conf MQTTConfig -> Getting String MQTTConfig String -> String
forall s a. s -> Getting a s a -> a
^. (NetConfig -> Const String NetConfig)
-> MQTTConfig -> Const String MQTTConfig
Lens' MQTTConfig NetConfig
mqttConn ((NetConfig -> Const String NetConfig)
 -> MQTTConfig -> Const String MQTTConfig)
-> ((String -> Const String String)
    -> NetConfig -> Const String NetConfig)
-> Getting String MQTTConfig String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (String -> Const String String)
-> NetConfig -> Const String NetConfig
Lens' NetConfig String
port)
    (Event alpha -> IO ()) -> Stream alpha -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\Event alpha
x -> do
                    ByteString
val <- ByteString -> IO ByteString
forall a. a -> IO a
E.evaluate (ByteString -> IO ByteString)
-> (Event alpha -> ByteString) -> Event alpha -> IO ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
forall a. NFData a => a -> a
force (ByteString -> ByteString)
-> (Event alpha -> ByteString) -> Event alpha -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Event alpha -> ByteString
forall a. Store a => a -> ByteString
encode (Event alpha -> IO ByteString) -> Event alpha -> IO ByteString
forall a b. (a -> b) -> a -> b
$ Event alpha
x
                    Counter -> IO ()
PC.inc (Metrics -> Counter
_egressEvents Metrics
met)
                        IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Int -> Counter -> IO ()
PC.add (ByteString -> Int
B.length ByteString
val) (Metrics -> Counter
_egressBytes Metrics
met)
                        IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> MQTTClient
-> Topic -> ByteString -> Bool -> QoS -> [Property] -> IO ()
publishq MQTTClient
mc Topic
topic (ByteString -> ByteString
BLC.fromStrict ByteString
val) Bool
False QoS
QoS0 []) Stream alpha
stream
                        where
                            topic :: Topic
topic = (Maybe Topic -> Topic
forall a. HasCallStack => Maybe a -> a
fromJust (Maybe Topic -> Topic)
-> (String -> Maybe Topic) -> String -> Topic
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> Maybe Topic
mkTopic (Text -> Maybe Topic) -> (String -> Text) -> String -> Maybe Topic
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack) (String -> Topic) -> String -> Topic
forall a b. (a -> b) -> a -> b
$ MQTTConfig
conf MQTTConfig -> Getting String MQTTConfig String -> String
forall s a. s -> Getting a s a -> a
^. Getting String MQTTConfig String
Lens' MQTTConfig String
mqttTopic

runMQTTPub :: String -> HostName -> ServiceName -> IO MQTTClient
runMQTTPub :: String -> String -> String -> IO MQTTClient
runMQTTPub String
name String
host String
port =
    let (Just URI
uri) = String -> Maybe URI
parseURI (String -> Maybe URI) -> String -> Maybe URI
forall a b. (a -> b) -> a -> b
$ String
"mqtt://" String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
host String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
":" String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
port
    in  MQTTConfig -> URI -> IO MQTTClient
connectURI (String -> String -> String -> MessageCallback -> MQTTConfig
netmqttConf String
name String
host String
port MessageCallback
NoCallback) URI
uri


runMQTTSub :: Store alpha => String -> NT.MQTTConfig -> Metrics -> U.InChan (Event alpha) -> IO ()
runMQTTSub :: forall alpha.
Store alpha =>
String -> MQTTConfig -> Metrics -> InChan (Event alpha) -> IO ()
runMQTTSub String
name MQTTConfig
conf Metrics
met InChan (Event alpha)
chan = do
    let h :: String
h = MQTTConfig
conf MQTTConfig -> Getting String MQTTConfig String -> String
forall s a. s -> Getting a s a -> a
^. (NetConfig -> Const String NetConfig)
-> MQTTConfig -> Const String MQTTConfig
Lens' MQTTConfig NetConfig
mqttConn ((NetConfig -> Const String NetConfig)
 -> MQTTConfig -> Const String MQTTConfig)
-> ((String -> Const String String)
    -> NetConfig -> Const String NetConfig)
-> Getting String MQTTConfig String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (String -> Const String String)
-> NetConfig -> Const String NetConfig
Lens' NetConfig String
host
        p :: String
p = MQTTConfig
conf MQTTConfig -> Getting String MQTTConfig String -> String
forall s a. s -> Getting a s a -> a
^. (NetConfig -> Const String NetConfig)
-> MQTTConfig -> Const String MQTTConfig
Lens' MQTTConfig NetConfig
mqttConn ((NetConfig -> Const String NetConfig)
 -> MQTTConfig -> Const String MQTTConfig)
-> ((String -> Const String String)
    -> NetConfig -> Const String NetConfig)
-> Getting String MQTTConfig String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (String -> Const String String)
-> NetConfig -> Const String NetConfig
Lens' NetConfig String
port
        (Just URI
uri) = String -> Maybe URI
parseURI (String -> Maybe URI) -> String -> Maybe URI
forall a b. (a -> b) -> a -> b
$ String
"mqtt://" String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
h String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
":" String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
p
    MQTTClient
mc <- MQTTConfig -> URI -> IO MQTTClient
connectURI (String -> String -> String -> MessageCallback -> MQTTConfig
netmqttConf String
name String
h String
p ((MQTTClient -> Topic -> ByteString -> [Property] -> IO ())
-> MessageCallback
SimpleCallback ((MQTTClient -> Topic -> ByteString -> [Property] -> IO ())
 -> MessageCallback)
-> (MQTTClient -> Topic -> ByteString -> [Property] -> IO ())
-> MessageCallback
forall a b. (a -> b) -> a -> b
$ Metrics
-> InChan (Event alpha)
-> MQTTClient
-> Topic
-> ByteString
-> [Property]
-> IO ()
forall alpha.
Store alpha =>
Metrics
-> InChan (Event alpha)
-> MQTTClient
-> Topic
-> ByteString
-> [Property]
-> IO ()
mqttMessageCallback Metrics
met InChan (Event alpha)
chan)) URI
uri
    ([Either SubErr QoS], [Property]) -> IO ()
forall a. Show a => a -> IO ()
print (([Either SubErr QoS], [Property]) -> IO ())
-> IO ([Either SubErr QoS], [Property]) -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< MQTTClient
-> [(Filter, SubOptions)]
-> [Property]
-> IO ([Either SubErr QoS], [Property])
subscribe MQTTClient
mc ((Filter -> (Filter, SubOptions))
-> [Filter] -> [(Filter, SubOptions)]
forall a b. (a -> b) -> [a] -> [b]
map (\Filter
x -> (Filter
x, SubOptions
subOptions)) [Filter
filtr]) []
    MQTTClient -> IO ()
waitForClient MQTTClient
mc
    where
        filtr :: Filter
filtr = (Maybe Filter -> Filter
forall a. HasCallStack => Maybe a -> a
fromJust (Maybe Filter -> Filter)
-> (String -> Maybe Filter) -> String -> Filter
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> Maybe Filter
mkFilter (Text -> Maybe Filter)
-> (String -> Text) -> String -> Maybe Filter
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack) (MQTTConfig
conf MQTTConfig -> Getting String MQTTConfig String -> String
forall s a. s -> Getting a s a -> a
^. Getting String MQTTConfig String
Lens' MQTTConfig String
mqttTopic)


mqttMessageCallback :: Store alpha => Metrics -> U.InChan (Event alpha) -> MQTTClient -> Topic -> BLC.ByteString -> [Property] -> IO ()
mqttMessageCallback :: forall alpha.
Store alpha =>
Metrics
-> InChan (Event alpha)
-> MQTTClient
-> Topic
-> ByteString
-> [Property]
-> IO ()
mqttMessageCallback Metrics
met InChan (Event alpha)
chan MQTTClient
mc Topic
topic ByteString
msg [Property]
_ =
    let bmsg :: ByteString
bmsg = ByteString -> ByteString
BLC.toStrict ByteString
msg
    in  Counter -> IO ()
PC.inc (Metrics -> Counter
_ingressEvents Metrics
met)
            IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Int -> Counter -> IO ()
PC.add (ByteString -> Int
B.length ByteString
bmsg) (Metrics -> Counter
_ingressBytes Metrics
met)
            IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> case ByteString -> Either PeekException (Event alpha)
forall a. Store a => ByteString -> Either PeekException a
decode ByteString
bmsg of
                    Right Event alpha
event -> InChan (Event alpha) -> Event alpha -> IO ()
forall a. InChan a -> a -> IO ()
U.writeChan InChan (Event alpha)
chan Event alpha
event
                    Left  PeekException
_     -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()


netmqttConf :: String -> HostName -> ServiceName -> MessageCallback -> MQTT.MQTTConfig
netmqttConf :: String -> String -> String -> MessageCallback -> MQTTConfig
netmqttConf String
name String
host String
port MessageCallback
msgCB =
    MQTTConfig
mqttConfig
        { _hostname :: String
MQTT._hostname = String
host
        , _port :: Int
MQTT._port     = String -> Int
forall a. Read a => String -> a
read String
port
        , _connID :: String
_connID        = String
name
        , _username :: Maybe String
_username      = String -> Maybe String
forall a. a -> Maybe a
Just String
"striot"
        , _password :: Maybe String
_password      = String -> Maybe String
forall a. a -> Maybe a
Just String
"striot"
        , _msgCB :: MessageCallback
_msgCB         = MessageCallback
msgCB }


mqttSubOptions :: SubOptions
mqttSubOptions :: SubOptions
mqttSubOptions =
    SubOptions
subOptions
        { _retainHandling :: RetainHandling
_retainHandling    = RetainHandling
SendOnSubscribeNew
        , _retainAsPublished :: Bool
_retainAsPublished = Bool
True
        , _noLocal :: Bool
_noLocal           = Bool
True
        , _subQoS :: QoS
_subQoS            = QoS
QoS0 }