module Striot.Nodes.MQTT ( sendStreamMQTT , runMQTTSub ) where import Control.Concurrent.Chan.Unagi.Bounded as U import Control.DeepSeq (force) import qualified Control.Exception as E (evaluate) import Control.Lens import qualified Data.ByteString as B (length) import qualified Data.ByteString.Lazy.Char8 as BLC import Data.Store (Store, decode, encode) import Data.Maybe (fromJust) import Data.Text as T (pack) import Network.MQTT.Client as MQTT hiding (Timeout) import Network.MQTT.Topic (mkTopic, mkFilter) import Network.MQTT.Types (RetainHandling (..)) import Network.Socket (HostName, ServiceName) import Network.URI (parseURI) import Striot.FunctionalIoTtypes import Striot.Nodes.Types as NT import System.Metrics.Prometheus.Metric.Counter as PC (add, inc) import System.Metrics.Prometheus.Metric.Gauge as PG (dec, inc) sendStreamMQTT :: Store alpha => String -> NT.MQTTConfig -> Metrics -> Stream alpha -> IO () sendStreamMQTT :: forall alpha. Store alpha => String -> MQTTConfig -> Metrics -> Stream alpha -> IO () sendStreamMQTT String name MQTTConfig conf Metrics met Stream alpha stream = do MQTTClient mc <- String -> String -> String -> IO MQTTClient runMQTTPub String name (MQTTConfig conf MQTTConfig -> Getting String MQTTConfig String -> String forall s a. s -> Getting a s a -> a ^. (NetConfig -> Const String NetConfig) -> MQTTConfig -> Const String MQTTConfig Lens' MQTTConfig NetConfig mqttConn ((NetConfig -> Const String NetConfig) -> MQTTConfig -> Const String MQTTConfig) -> ((String -> Const String String) -> NetConfig -> Const String NetConfig) -> Getting String MQTTConfig String forall b c a. (b -> c) -> (a -> b) -> a -> c . (String -> Const String String) -> NetConfig -> Const String NetConfig Lens' NetConfig String host) (MQTTConfig conf MQTTConfig -> Getting String MQTTConfig String -> String forall s a. s -> Getting a s a -> a ^. (NetConfig -> Const String NetConfig) -> MQTTConfig -> Const String MQTTConfig Lens' MQTTConfig NetConfig mqttConn ((NetConfig -> Const String NetConfig) -> MQTTConfig -> Const String MQTTConfig) -> ((String -> Const String String) -> NetConfig -> Const String NetConfig) -> Getting String MQTTConfig String forall b c a. (b -> c) -> (a -> b) -> a -> c . (String -> Const String String) -> NetConfig -> Const String NetConfig Lens' NetConfig String port) (Event alpha -> IO ()) -> Stream alpha -> IO () forall (t :: * -> *) (m :: * -> *) a b. (Foldable t, Monad m) => (a -> m b) -> t a -> m () mapM_ (\Event alpha x -> do ByteString val <- ByteString -> IO ByteString forall a. a -> IO a E.evaluate (ByteString -> IO ByteString) -> (Event alpha -> ByteString) -> Event alpha -> IO ByteString forall b c a. (b -> c) -> (a -> b) -> a -> c . ByteString -> ByteString forall a. NFData a => a -> a force (ByteString -> ByteString) -> (Event alpha -> ByteString) -> Event alpha -> ByteString forall b c a. (b -> c) -> (a -> b) -> a -> c . Event alpha -> ByteString forall a. Store a => a -> ByteString encode (Event alpha -> IO ByteString) -> Event alpha -> IO ByteString forall a b. (a -> b) -> a -> b $ Event alpha x Counter -> IO () PC.inc (Metrics -> Counter _egressEvents Metrics met) IO () -> IO () -> IO () forall (m :: * -> *) a b. Monad m => m a -> m b -> m b >> Int -> Counter -> IO () PC.add (ByteString -> Int B.length ByteString val) (Metrics -> Counter _egressBytes Metrics met) IO () -> IO () -> IO () forall (m :: * -> *) a b. Monad m => m a -> m b -> m b >> MQTTClient -> Topic -> ByteString -> Bool -> QoS -> [Property] -> IO () publishq MQTTClient mc Topic topic (ByteString -> ByteString BLC.fromStrict ByteString val) Bool False QoS QoS0 []) Stream alpha stream where topic :: Topic topic = (Maybe Topic -> Topic forall a. HasCallStack => Maybe a -> a fromJust (Maybe Topic -> Topic) -> (String -> Maybe Topic) -> String -> Topic forall b c a. (b -> c) -> (a -> b) -> a -> c . Text -> Maybe Topic mkTopic (Text -> Maybe Topic) -> (String -> Text) -> String -> Maybe Topic forall b c a. (b -> c) -> (a -> b) -> a -> c . String -> Text T.pack) (String -> Topic) -> String -> Topic forall a b. (a -> b) -> a -> b $ MQTTConfig conf MQTTConfig -> Getting String MQTTConfig String -> String forall s a. s -> Getting a s a -> a ^. Getting String MQTTConfig String Lens' MQTTConfig String mqttTopic runMQTTPub :: String -> HostName -> ServiceName -> IO MQTTClient runMQTTPub :: String -> String -> String -> IO MQTTClient runMQTTPub String name String host String port = let (Just URI uri) = String -> Maybe URI parseURI (String -> Maybe URI) -> String -> Maybe URI forall a b. (a -> b) -> a -> b $ String "mqtt://" String -> String -> String forall a. [a] -> [a] -> [a] ++ String host String -> String -> String forall a. [a] -> [a] -> [a] ++ String ":" String -> String -> String forall a. [a] -> [a] -> [a] ++ String port in MQTTConfig -> URI -> IO MQTTClient connectURI (String -> String -> String -> MessageCallback -> MQTTConfig netmqttConf String name String host String port MessageCallback NoCallback) URI uri runMQTTSub :: Store alpha => String -> NT.MQTTConfig -> Metrics -> U.InChan (Event alpha) -> IO () runMQTTSub :: forall alpha. Store alpha => String -> MQTTConfig -> Metrics -> InChan (Event alpha) -> IO () runMQTTSub String name MQTTConfig conf Metrics met InChan (Event alpha) chan = do let h :: String h = MQTTConfig conf MQTTConfig -> Getting String MQTTConfig String -> String forall s a. s -> Getting a s a -> a ^. (NetConfig -> Const String NetConfig) -> MQTTConfig -> Const String MQTTConfig Lens' MQTTConfig NetConfig mqttConn ((NetConfig -> Const String NetConfig) -> MQTTConfig -> Const String MQTTConfig) -> ((String -> Const String String) -> NetConfig -> Const String NetConfig) -> Getting String MQTTConfig String forall b c a. (b -> c) -> (a -> b) -> a -> c . (String -> Const String String) -> NetConfig -> Const String NetConfig Lens' NetConfig String host p :: String p = MQTTConfig conf MQTTConfig -> Getting String MQTTConfig String -> String forall s a. s -> Getting a s a -> a ^. (NetConfig -> Const String NetConfig) -> MQTTConfig -> Const String MQTTConfig Lens' MQTTConfig NetConfig mqttConn ((NetConfig -> Const String NetConfig) -> MQTTConfig -> Const String MQTTConfig) -> ((String -> Const String String) -> NetConfig -> Const String NetConfig) -> Getting String MQTTConfig String forall b c a. (b -> c) -> (a -> b) -> a -> c . (String -> Const String String) -> NetConfig -> Const String NetConfig Lens' NetConfig String port (Just URI uri) = String -> Maybe URI parseURI (String -> Maybe URI) -> String -> Maybe URI forall a b. (a -> b) -> a -> b $ String "mqtt://" String -> String -> String forall a. [a] -> [a] -> [a] ++ String h String -> String -> String forall a. [a] -> [a] -> [a] ++ String ":" String -> String -> String forall a. [a] -> [a] -> [a] ++ String p MQTTClient mc <- MQTTConfig -> URI -> IO MQTTClient connectURI (String -> String -> String -> MessageCallback -> MQTTConfig netmqttConf String name String h String p ((MQTTClient -> Topic -> ByteString -> [Property] -> IO ()) -> MessageCallback SimpleCallback ((MQTTClient -> Topic -> ByteString -> [Property] -> IO ()) -> MessageCallback) -> (MQTTClient -> Topic -> ByteString -> [Property] -> IO ()) -> MessageCallback forall a b. (a -> b) -> a -> b $ Metrics -> InChan (Event alpha) -> MQTTClient -> Topic -> ByteString -> [Property] -> IO () forall alpha. Store alpha => Metrics -> InChan (Event alpha) -> MQTTClient -> Topic -> ByteString -> [Property] -> IO () mqttMessageCallback Metrics met InChan (Event alpha) chan)) URI uri ([Either SubErr QoS], [Property]) -> IO () forall a. Show a => a -> IO () print (([Either SubErr QoS], [Property]) -> IO ()) -> IO ([Either SubErr QoS], [Property]) -> IO () forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b =<< MQTTClient -> [(Filter, SubOptions)] -> [Property] -> IO ([Either SubErr QoS], [Property]) subscribe MQTTClient mc ((Filter -> (Filter, SubOptions)) -> [Filter] -> [(Filter, SubOptions)] forall a b. (a -> b) -> [a] -> [b] map (\Filter x -> (Filter x, SubOptions subOptions)) [Filter filtr]) [] MQTTClient -> IO () waitForClient MQTTClient mc where filtr :: Filter filtr = (Maybe Filter -> Filter forall a. HasCallStack => Maybe a -> a fromJust (Maybe Filter -> Filter) -> (String -> Maybe Filter) -> String -> Filter forall b c a. (b -> c) -> (a -> b) -> a -> c . Text -> Maybe Filter mkFilter (Text -> Maybe Filter) -> (String -> Text) -> String -> Maybe Filter forall b c a. (b -> c) -> (a -> b) -> a -> c . String -> Text T.pack) (MQTTConfig conf MQTTConfig -> Getting String MQTTConfig String -> String forall s a. s -> Getting a s a -> a ^. Getting String MQTTConfig String Lens' MQTTConfig String mqttTopic) mqttMessageCallback :: Store alpha => Metrics -> U.InChan (Event alpha) -> MQTTClient -> Topic -> BLC.ByteString -> [Property] -> IO () mqttMessageCallback :: forall alpha. Store alpha => Metrics -> InChan (Event alpha) -> MQTTClient -> Topic -> ByteString -> [Property] -> IO () mqttMessageCallback Metrics met InChan (Event alpha) chan MQTTClient mc Topic topic ByteString msg [Property] _ = let bmsg :: ByteString bmsg = ByteString -> ByteString BLC.toStrict ByteString msg in Counter -> IO () PC.inc (Metrics -> Counter _ingressEvents Metrics met) IO () -> IO () -> IO () forall (m :: * -> *) a b. Monad m => m a -> m b -> m b >> Int -> Counter -> IO () PC.add (ByteString -> Int B.length ByteString bmsg) (Metrics -> Counter _ingressBytes Metrics met) IO () -> IO () -> IO () forall (m :: * -> *) a b. Monad m => m a -> m b -> m b >> case ByteString -> Either PeekException (Event alpha) forall a. Store a => ByteString -> Either PeekException a decode ByteString bmsg of Right Event alpha event -> InChan (Event alpha) -> Event alpha -> IO () forall a. InChan a -> a -> IO () U.writeChan InChan (Event alpha) chan Event alpha event Left PeekException _ -> () -> IO () forall (m :: * -> *) a. Monad m => a -> m a return () netmqttConf :: String -> HostName -> ServiceName -> MessageCallback -> MQTT.MQTTConfig netmqttConf :: String -> String -> String -> MessageCallback -> MQTTConfig netmqttConf String name String host String port MessageCallback msgCB = MQTTConfig mqttConfig { _hostname :: String MQTT._hostname = String host , _port :: Int MQTT._port = String -> Int forall a. Read a => String -> a read String port , _connID :: String _connID = String name , _username :: Maybe String _username = String -> Maybe String forall a. a -> Maybe a Just String "striot" , _password :: Maybe String _password = String -> Maybe String forall a. a -> Maybe a Just String "striot" , _msgCB :: MessageCallback _msgCB = MessageCallback msgCB } mqttSubOptions :: SubOptions mqttSubOptions :: SubOptions mqttSubOptions = SubOptions subOptions { _retainHandling :: RetainHandling _retainHandling = RetainHandling SendOnSubscribeNew , _retainAsPublished :: Bool _retainAsPublished = Bool True , _noLocal :: Bool _noLocal = Bool True , _subQoS :: QoS _subQoS = QoS QoS0 }