-- | Kafka transport for stream nodes.
--
-- 'sendStreamKafka' serialises every event of a stream with "Data.Store" and
-- publishes it to the topic named in the 'KafkaConfig'. 'runKafkaConsumer'
-- subscribes to that topic, decodes each record and writes the resulting
-- events into a bounded Unagi channel. Both update the supplied 'Metrics'
-- (connection gauges, event counters, byte counters) as they go.
module Striot.Nodes.Kafka
    ( sendStreamKafka
    , runKafkaConsumer
    ) where

import           Control.Concurrent                       (threadDelay)
import           Control.Concurrent.Chan.Unagi.Bounded    as U
import qualified Control.Exception                        as E (bracket)
import           Control.Lens
import           Control.Monad                            (forever, void)
import qualified Data.ByteString                          as B (ByteString,
                                                                length)
import           Data.Store                               (Store, decode,
                                                           encode)
import           Data.Text                                as T (Text, pack)
import           Kafka.Consumer                           as KC
import           Kafka.Producer                           as KP
import           Striot.FunctionalIoTtypes
import           Striot.Nodes.Types
import           System.Metrics.Prometheus.Metric.Counter as PC (add, inc)
import           System.Metrics.Prometheus.Metric.Gauge   as PG (dec, inc)


-- | Publish every event of @stream@ to the Kafka topic configured in @conf@.
--
-- The producer is acquired and released with 'E.bracket', so it is closed
-- even if sending throws. The final result (@Left@ connection error or
-- @Right ()@) is written to stdout with 'print'.
--
-- The @name@ argument is currently unused; it is kept for interface
-- compatibility with callers.
sendStreamKafka :: Store alpha
                => String
                -> KafkaConfig
                -> Metrics
                -> Stream alpha
                -> IO ()
sendStreamKafka name conf met stream =
    E.bracket mkProducer clProducer runHandler >>= print
  where
    -- Connect to the broker. The egress-connection gauge is only bumped
    -- once the producer actually exists, so that it always pairs with the
    -- decrement in the 'Right' branch of 'clProducer'. (Incrementing before
    -- the attempt leaked one gauge unit per failed connection.)
    mkProducer :: IO (Either KafkaError KafkaProducer)
    mkProducer = do
        print "create new producer"
        result <- newProducer (producerProps conf)
        case result of
            Right _ -> PG.inc (_egressConn met)
            Left _  -> return ()
        return result

    -- Release step of the bracket: nothing to close if connecting failed.
    clProducer (Left _)     = print "error close producer"
    clProducer (Right prod) = do
        PG.dec (_egressConn met)
        closeProducer prod
        print "close producer"

    -- Body of the bracket: forward the connection error, or send the stream.
    runHandler :: Either KafkaError KafkaProducer -> IO (Either KafkaError ())
    runHandler (Left err)   = return (Left err)
    runHandler (Right prod) = do
        print "runhandler producer"
        sendMessagesKafka prod topic stream met

    topic = TopicName . T.pack $ conf ^. kafkaTopic


-- | Pause applied before every poll in 'processKafkaMessages'.
--
-- The value is handed straight to 'threadDelay', which interprets its
-- argument in microseconds, so despite the @Ms@ suffix this is 300000 µs
-- (0.3 s).
kafkaConnectDelayMs :: Int
kafkaConnectDelayMs = 300000


-- | Producer settings: the single broker from @conf@, info-level logging.
producerProps :: KafkaConfig -> ProducerProperties
producerProps conf =
       KP.brokersList [BrokerAddress (brokerAddress conf)]
    <> KP.logLevel KafkaLogInfo


-- | Encode and publish each event of @stream@ to @topic@.
--
-- Sending is best-effort: a failed 'produceMessage' is logged to stdout and
-- the remaining events are still attempted, and the function always returns
-- @Right ()@. The egress event/byte counters are only advanced for records
-- the producer accepted, so they no longer count failed sends.
sendMessagesKafka :: Store alpha
                  => KafkaProducer
                  -> TopicName
                  -> Stream alpha
                  -> Metrics
                  -> IO (Either KafkaError ())
sendMessagesKafka prod topic stream met = do
    mapM_ sendOne stream
    return (Right ())
  where
    sendOne event = do
        let val = encode event
        failure <- produceMessage prod (mkMessage topic Nothing (Just val))
        case failure of
            Just err -> print $ "produce-error: " ++ show err
            Nothing  -> do
                PC.inc (_egressEvents met)
                PC.add (B.length val) (_egressBytes met)


-- | Build a 'ProducerRecord' for @topic@ with the given optional key and
-- value, leaving the partition unassigned.
mkMessage :: TopicName
          -> Maybe B.ByteString
          -> Maybe B.ByteString
          -> ProducerRecord
mkMessage topic k v =
    ProducerRecord
        { prTopic     = topic
        , prPartition = UnassignedPartition
        , prKey       = k
        , prValue     = v
        }


-- | Consume the Kafka topic configured in @conf@ and write every decoded
-- event into @chan@.
--
-- The consumer is acquired and released with 'E.bracket'. On a successful
-- connection this runs 'processKafkaMessages', which loops forever; on a
-- failed connection the error is printed and the function returns.
--
-- The @name@ argument is currently unused; it is kept for interface
-- compatibility with callers.
runKafkaConsumer :: Store alpha
                 => String
                 -> KafkaConfig
                 -> Metrics
                 -> U.InChan (Event alpha)
                 -> IO ()
runKafkaConsumer name conf met chan =
    E.bracket mkConsumer clConsumer (runHandler chan)
  where
    -- Connect and subscribe. As on the producer side, the ingress-connection
    -- gauge is only bumped on success so that it pairs with the decrement in
    -- the 'Right' branch of 'clConsumer'.
    mkConsumer :: IO (Either KafkaError KafkaConsumer)
    mkConsumer = do
        print "create new consumer"
        result <- newConsumer (consumerProps conf) (consumerSub topic)
        case result of
            Right _ -> PG.inc (_ingressConn met)
            Left _  -> return ()
        return result

    -- Release step of the bracket: nothing to close if connecting failed.
    clConsumer (Left _)   = print "error close consumer"
    clConsumer (Right kc) = do
        void (closeConsumer kc)
        PG.dec (_ingressConn met)
        print "close consumer"

    -- Body of the bracket. The connection error is printed as well as the
    -- fixed message, mirroring the producer path which reports its error.
    runHandler _ (Left err) = do
        print "error handler close consumer"
        print err
    runHandler ch (Right kc) = do
        print "runhandler consumer"
        processKafkaMessages met kc ch

    topic = TopicName . T.pack $ conf ^. kafkaTopic


-- | Poll @kc@ forever, decoding each record value and writing the event to
-- @chan@. Never returns normally.
--
-- Each iteration first sleeps for 'kafkaConnectDelayMs' and then polls with
-- @Timeout 50@.
-- NOTE(review): the sleep is applied on every iteration, which caps the
-- consumer at roughly three records per second; the constant's name suggests
-- a one-off connection delay may have been intended — confirm.
processKafkaMessages :: Store alpha
                     => Metrics
                     -> KafkaConsumer
                     -> U.InChan (Event alpha)
                     -> IO ()
processKafkaMessages met kc chan = forever $ do
    threadDelay kafkaConnectDelayMs
    msg <- pollMessage kc (Timeout 50)
    -- Poll errors are dropped without logging.
    -- NOTE(review): presumably because an empty poll surfaces as a 'Left'
    -- timeout and would flood the log — confirm.
    either (\_ -> return ()) extractValue msg
  where
    -- A record without a value cannot be decoded; report and skip it.
    extractValue m =
        maybe (print "kafka-error: crValue Nothing") writeRight (crValue m)

    -- Decode the payload; on success bump the ingress counters and hand the
    -- event to the channel, otherwise report the decode failure.
    writeRight v =
        case decode v of
            Left err -> print $ "decode-error: " ++ show err
            Right x  -> do
                PC.inc (_ingressEvents met)
                PC.add (B.length v) (_ingressBytes met)
                U.writeChan chan x


-- | Consumer settings: the single broker from @conf@, the configured
-- consumer group, info-level logging and synchronous callback polling.
consumerProps :: KafkaConfig -> ConsumerProperties
consumerProps conf =
       KC.brokersList [BrokerAddress (brokerAddress conf)]
    <> groupId (ConsumerGroupId . T.pack $ conf ^. kafkaConGroup)
    <> KC.logLevel KafkaLogInfo
    <> KC.callbackPollMode CallbackPollModeSync


-- | Subscribe to a single topic, with the offset reset policy 'Earliest'.
consumerSub :: TopicName -> Subscription
consumerSub topic =
       topics [topic]
    <> offsetReset Earliest


-- | Render the broker address from @conf@ as @host:port@.
brokerAddress :: KafkaConfig -> T.Text
brokerAddress conf =
    T.pack $ (conf ^. kafkaConn . host) ++ ":" ++ (conf ^. kafkaConn . port)