module Striot.Nodes.Kafka
( sendStreamKafka
, runKafkaConsumer
) where

import           Control.Concurrent                       (threadDelay)
import           Control.Concurrent.Chan.Unagi.Bounded    as U
import qualified Control.Exception                        as E (bracket)
import           Control.Lens
import           Control.Monad                            (forever, void)
import qualified Data.ByteString                          as B (ByteString,
                                                                length)
import           Data.Store                               (Store, decode,
                                                           encode)
import           Data.Text                                as T (Text, pack)
import           Kafka.Consumer                           as KC
import           Kafka.Producer                           as KP
import           Striot.FunctionalIoTtypes
import           Striot.Nodes.Types
import           System.Metrics.Prometheus.Metric.Counter as PC (add, inc)
import           System.Metrics.Prometheus.Metric.Gauge   as PG (dec, inc)


sendStreamKafka :: Store alpha => String -> KafkaConfig -> Metrics -> Stream alpha -> IO ()
sendStreamKafka :: forall alpha.
Store alpha =>
HostName -> KafkaConfig -> Metrics -> Stream alpha -> IO ()
sendStreamKafka HostName
name KafkaConfig
conf Metrics
met Stream alpha
stream =
    IO (Either KafkaError KafkaProducer)
-> (Either KafkaError KafkaProducer -> IO ())
-> (Either KafkaError KafkaProducer -> IO (Either KafkaError ()))
-> IO (Either KafkaError ())
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
E.bracket IO (Either KafkaError KafkaProducer)
mkProducer Either KafkaError KafkaProducer -> IO ()
forall {a}. Either a KafkaProducer -> IO ()
clProducer Either KafkaError KafkaProducer -> IO (Either KafkaError ())
runHandler IO (Either KafkaError ())
-> (Either KafkaError () -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Either KafkaError () -> IO ()
forall a. Show a => a -> IO ()
print
        where
          mkProducer :: IO (Either KafkaError KafkaProducer)
mkProducer              = Gauge -> IO ()
PG.inc (Metrics -> Gauge
_egressConn Metrics
met)
                                    IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> HostName -> IO ()
forall a. Show a => a -> IO ()
print HostName
"create new producer"
                                    IO ()
-> IO (Either KafkaError KafkaProducer)
-> IO (Either KafkaError KafkaProducer)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> ProducerProperties -> IO (Either KafkaError KafkaProducer)
forall (m :: * -> *).
MonadIO m =>
ProducerProperties -> m (Either KafkaError KafkaProducer)
newProducer (KafkaConfig -> ProducerProperties
producerProps KafkaConfig
conf)
          clProducer :: Either a KafkaProducer -> IO ()
clProducer (Left a
_)     = HostName -> IO ()
forall a. Show a => a -> IO ()
print HostName
"error close producer"
                                    IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
          clProducer (Right KafkaProducer
prod) = Gauge -> IO ()
PG.dec (Metrics -> Gauge
_egressConn Metrics
met)
                                    IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> KafkaProducer -> IO ()
forall (m :: * -> *). MonadIO m => KafkaProducer -> m ()
closeProducer KafkaProducer
prod
                                    IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> HostName -> IO ()
forall a. Show a => a -> IO ()
print HostName
"close producer"
          runHandler :: Either KafkaError KafkaProducer -> IO (Either KafkaError ())
runHandler (Left KafkaError
err)   = Either KafkaError () -> IO (Either KafkaError ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Either KafkaError () -> IO (Either KafkaError ()))
-> Either KafkaError () -> IO (Either KafkaError ())
forall a b. (a -> b) -> a -> b
$ KafkaError -> Either KafkaError ()
forall a b. a -> Either a b
Left KafkaError
err
          runHandler (Right KafkaProducer
prod) = HostName -> IO ()
forall a. Show a => a -> IO ()
print HostName
"runhandler producer"
                                    IO () -> IO (Either KafkaError ()) -> IO (Either KafkaError ())
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> KafkaProducer
-> TopicName
-> Stream alpha
-> Metrics
-> IO (Either KafkaError ())
forall alpha.
Store alpha =>
KafkaProducer
-> TopicName
-> Stream alpha
-> Metrics
-> IO (Either KafkaError ())
sendMessagesKafka KafkaProducer
prod (Text -> TopicName
TopicName (Text -> TopicName) -> (HostName -> Text) -> HostName -> TopicName
forall b c a. (b -> c) -> (a -> b) -> a -> c
. HostName -> Text
T.pack (HostName -> TopicName) -> HostName -> TopicName
forall a b. (a -> b) -> a -> b
$ KafkaConfig
conf KafkaConfig -> Getting HostName KafkaConfig HostName -> HostName
forall s a. s -> Getting a s a -> a
^. Getting HostName KafkaConfig HostName
Lens' KafkaConfig HostName
kafkaTopic) Stream alpha
stream Metrics
met


kafkaConnectDelayMs :: Int
kafkaConnectDelayMs :: Int
kafkaConnectDelayMs = Int
300000


producerProps :: KafkaConfig -> ProducerProperties
producerProps :: KafkaConfig -> ProducerProperties
producerProps KafkaConfig
conf =
    [BrokerAddress] -> ProducerProperties
KP.brokersList [Text -> BrokerAddress
BrokerAddress (Text -> BrokerAddress) -> Text -> BrokerAddress
forall a b. (a -> b) -> a -> b
$ KafkaConfig -> Text
brokerAddress KafkaConfig
conf]
       ProducerProperties -> ProducerProperties -> ProducerProperties
forall a. Semigroup a => a -> a -> a
<> KafkaLogLevel -> ProducerProperties
KP.logLevel KafkaLogLevel
KafkaLogInfo


sendMessagesKafka :: Store alpha => KafkaProducer -> TopicName -> Stream alpha -> Metrics -> IO (Either KafkaError ())
sendMessagesKafka :: forall alpha.
Store alpha =>
KafkaProducer
-> TopicName
-> Stream alpha
-> Metrics
-> IO (Either KafkaError ())
sendMessagesKafka KafkaProducer
prod TopicName
topic Stream alpha
stream Metrics
met = do
    (Event alpha -> IO ()) -> Stream alpha -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\Event alpha
x -> do
            let val :: ByteString
val = Event alpha -> ByteString
forall a. Store a => a -> ByteString
encode Event alpha
x
            KafkaProducer -> ProducerRecord -> IO (Maybe KafkaError)
forall (m :: * -> *).
MonadIO m =>
KafkaProducer -> ProducerRecord -> m (Maybe KafkaError)
produceMessage KafkaProducer
prod (TopicName -> Maybe ByteString -> Maybe ByteString -> ProducerRecord
mkMessage TopicName
topic Maybe ByteString
forall a. Maybe a
Nothing (ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
val))
                IO (Maybe KafkaError) -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Counter -> IO ()
PC.inc (Metrics -> Counter
_egressEvents Metrics
met)
                IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Int -> Counter -> IO ()
PC.add (ByteString -> Int
B.length ByteString
val) (Metrics -> Counter
_egressBytes Metrics
met)
          ) Stream alpha
stream
    Either KafkaError () -> IO (Either KafkaError ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Either KafkaError () -> IO (Either KafkaError ()))
-> Either KafkaError () -> IO (Either KafkaError ())
forall a b. (a -> b) -> a -> b
$ () -> Either KafkaError ()
forall a b. b -> Either a b
Right ()


mkMessage :: TopicName -> Maybe B.ByteString -> Maybe B.ByteString -> ProducerRecord
mkMessage :: TopicName -> Maybe ByteString -> Maybe ByteString -> ProducerRecord
mkMessage TopicName
topic Maybe ByteString
k Maybe ByteString
v =
    ProducerRecord :: TopicName
-> ProducePartition
-> Maybe ByteString
-> Maybe ByteString
-> ProducerRecord
ProducerRecord
        { prTopic :: TopicName
prTopic     = TopicName
topic
        , prPartition :: ProducePartition
prPartition = ProducePartition
UnassignedPartition
        , prKey :: Maybe ByteString
prKey       = Maybe ByteString
k
        , prValue :: Maybe ByteString
prValue     = Maybe ByteString
v
        }


runKafkaConsumer :: Store alpha => String -> KafkaConfig -> Metrics -> U.InChan (Event alpha) -> IO ()
runKafkaConsumer :: forall alpha.
Store alpha =>
HostName -> KafkaConfig -> Metrics -> InChan (Event alpha) -> IO ()
runKafkaConsumer HostName
name KafkaConfig
conf Metrics
met InChan (Event alpha)
chan = IO (Either KafkaError KafkaConsumer)
-> (Either KafkaError KafkaConsumer -> IO ())
-> (Either KafkaError KafkaConsumer -> IO ())
-> IO ()
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
E.bracket IO (Either KafkaError KafkaConsumer)
mkConsumer Either KafkaError KafkaConsumer -> IO ()
forall {a}. Either a KafkaConsumer -> IO ()
clConsumer (InChan (Event alpha) -> Either KafkaError KafkaConsumer -> IO ()
forall {alpha} {a}.
Store alpha =>
InChan (Event alpha) -> Either a KafkaConsumer -> IO ()
runHandler InChan (Event alpha)
chan)
    where
        mkConsumer :: IO (Either KafkaError KafkaConsumer)
mkConsumer                 = Gauge -> IO ()
PG.inc (Metrics -> Gauge
_ingressConn Metrics
met)
                                     IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> HostName -> IO ()
forall a. Show a => a -> IO ()
print HostName
"create new consumer"
                                     IO ()
-> IO (Either KafkaError KafkaConsumer)
-> IO (Either KafkaError KafkaConsumer)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> ConsumerProperties
-> Subscription -> IO (Either KafkaError KafkaConsumer)
forall (m :: * -> *).
MonadIO m =>
ConsumerProperties
-> Subscription -> m (Either KafkaError KafkaConsumer)
newConsumer (KafkaConfig -> ConsumerProperties
consumerProps KafkaConfig
conf)
                                                    (TopicName -> Subscription
consumerSub (TopicName -> Subscription) -> TopicName -> Subscription
forall a b. (a -> b) -> a -> b
$ Text -> TopicName
TopicName (Text -> TopicName) -> (HostName -> Text) -> HostName -> TopicName
forall b c a. (b -> c) -> (a -> b) -> a -> c
. HostName -> Text
T.pack (HostName -> TopicName) -> HostName -> TopicName
forall a b. (a -> b) -> a -> b
$ KafkaConfig
conf KafkaConfig -> Getting HostName KafkaConfig HostName -> HostName
forall s a. s -> Getting a s a -> a
^. Getting HostName KafkaConfig HostName
Lens' KafkaConfig HostName
kafkaTopic)
        clConsumer :: Either a KafkaConsumer -> IO ()
clConsumer      (Left a
err) = HostName -> IO ()
forall a. Show a => a -> IO ()
print HostName
"error close consumer"
                                     IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
        clConsumer      (Right KafkaConsumer
kc) = IO () -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ KafkaConsumer -> IO (Maybe KafkaError)
forall (m :: * -> *).
MonadIO m =>
KafkaConsumer -> m (Maybe KafkaError)
closeConsumer KafkaConsumer
kc
                                          IO (Maybe KafkaError) -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Gauge -> IO ()
PG.dec (Metrics -> Gauge
_ingressConn Metrics
met)
                                          IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> HostName -> IO ()
forall a. Show a => a -> IO ()
print HostName
"close consumer"
        runHandler :: InChan (Event alpha) -> Either a KafkaConsumer -> IO ()
runHandler InChan (Event alpha)
_    (Left a
err) = HostName -> IO ()
forall a. Show a => a -> IO ()
print HostName
"error handler close consumer"
                                     IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
        runHandler InChan (Event alpha)
chan (Right KafkaConsumer
kc) = HostName -> IO ()
forall a. Show a => a -> IO ()
print HostName
"runhandler consumer"
                                     IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Metrics -> KafkaConsumer -> InChan (Event alpha) -> IO ()
forall alpha.
Store alpha =>
Metrics -> KafkaConsumer -> InChan (Event alpha) -> IO ()
processKafkaMessages Metrics
met KafkaConsumer
kc InChan (Event alpha)
chan


processKafkaMessages :: Store alpha => Metrics -> KafkaConsumer -> U.InChan (Event alpha) -> IO ()
processKafkaMessages :: forall alpha.
Store alpha =>
Metrics -> KafkaConsumer -> InChan (Event alpha) -> IO ()
processKafkaMessages Metrics
met KafkaConsumer
kc InChan (Event alpha)
chan = IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
    Int -> IO ()
threadDelay Int
kafkaConnectDelayMs
    Either
  KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
msg <- KafkaConsumer
-> Timeout
-> IO
     (Either
        KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
forall (m :: * -> *).
MonadIO m =>
KafkaConsumer
-> Timeout
-> m (Either
        KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
pollMessage KafkaConsumer
kc (Int -> Timeout
Timeout Int
50)
    (KafkaError -> IO ())
-> (ConsumerRecord (Maybe ByteString) (Maybe ByteString) -> IO ())
-> Either
     KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
-> IO ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (\KafkaError
_ -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()) ConsumerRecord (Maybe ByteString) (Maybe ByteString) -> IO ()
forall {k}. ConsumerRecord k (Maybe ByteString) -> IO ()
extractValue Either
  KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
msg
      where
        extractValue :: ConsumerRecord k (Maybe ByteString) -> IO ()
extractValue ConsumerRecord k (Maybe ByteString)
m = IO () -> (ByteString -> IO ()) -> Maybe ByteString -> IO ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (HostName -> IO ()
forall a. Show a => a -> IO ()
print HostName
"kafka-error: crValue Nothing") ByteString -> IO ()
writeRight (ConsumerRecord k (Maybe ByteString) -> Maybe ByteString
forall k v. ConsumerRecord k v -> v
crValue ConsumerRecord k (Maybe ByteString)
m)
        writeRight :: ByteString -> IO ()
writeRight   ByteString
v = (PeekException -> IO ())
-> (Event alpha -> IO ())
-> Either PeekException (Event alpha)
-> IO ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (\PeekException
err -> HostName -> IO ()
forall a. Show a => a -> IO ()
print (HostName -> IO ()) -> HostName -> IO ()
forall a b. (a -> b) -> a -> b
$ HostName
"decode-error: " HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ PeekException -> HostName
forall a. Show a => a -> HostName
show PeekException
err)
                                (\Event alpha
x -> do
                                    Counter -> IO ()
PC.inc (Metrics -> Counter
_ingressEvents Metrics
met)
                                        IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Int -> Counter -> IO ()
PC.add (ByteString -> Int
B.length ByteString
v) (Metrics -> Counter
_ingressBytes Metrics
met)
                                    InChan (Event alpha) -> Event alpha -> IO ()
forall a. InChan a -> a -> IO ()
U.writeChan InChan (Event alpha)
chan Event alpha
x)
                                (ByteString -> Either PeekException (Event alpha)
forall a. Store a => ByteString -> Either PeekException a
decode ByteString
v)


consumerProps :: KafkaConfig -> ConsumerProperties
consumerProps :: KafkaConfig -> ConsumerProperties
consumerProps KafkaConfig
conf =
    [BrokerAddress] -> ConsumerProperties
KC.brokersList [Text -> BrokerAddress
BrokerAddress (Text -> BrokerAddress) -> Text -> BrokerAddress
forall a b. (a -> b) -> a -> b
$ KafkaConfig -> Text
brokerAddress KafkaConfig
conf]
         ConsumerProperties -> ConsumerProperties -> ConsumerProperties
forall a. Semigroup a => a -> a -> a
<> ConsumerGroupId -> ConsumerProperties
groupId (Text -> ConsumerGroupId
ConsumerGroupId (Text -> ConsumerGroupId)
-> (HostName -> Text) -> HostName -> ConsumerGroupId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. HostName -> Text
T.pack (HostName -> ConsumerGroupId) -> HostName -> ConsumerGroupId
forall a b. (a -> b) -> a -> b
$ KafkaConfig
conf KafkaConfig -> Getting HostName KafkaConfig HostName -> HostName
forall s a. s -> Getting a s a -> a
^. Getting HostName KafkaConfig HostName
Lens' KafkaConfig HostName
kafkaConGroup)
         ConsumerProperties -> ConsumerProperties -> ConsumerProperties
forall a. Semigroup a => a -> a -> a
<> KafkaLogLevel -> ConsumerProperties
KC.logLevel KafkaLogLevel
KafkaLogInfo
         ConsumerProperties -> ConsumerProperties -> ConsumerProperties
forall a. Semigroup a => a -> a -> a
<> CallbackPollMode -> ConsumerProperties
KC.callbackPollMode CallbackPollMode
CallbackPollModeSync


consumerSub :: TopicName -> Subscription
consumerSub :: TopicName -> Subscription
consumerSub TopicName
topic = [TopicName] -> Subscription
topics [TopicName
topic]
                    Subscription -> Subscription -> Subscription
forall a. Semigroup a => a -> a -> a
<> OffsetReset -> Subscription
offsetReset OffsetReset
Earliest


brokerAddress :: KafkaConfig -> T.Text
brokerAddress :: KafkaConfig -> Text
brokerAddress KafkaConfig
conf = HostName -> Text
T.pack (HostName -> Text) -> HostName -> Text
forall a b. (a -> b) -> a -> b
$ (KafkaConfig
conf KafkaConfig -> Getting HostName KafkaConfig HostName -> HostName
forall s a. s -> Getting a s a -> a
^. (NetConfig -> Const HostName NetConfig)
-> KafkaConfig -> Const HostName KafkaConfig
Lens' KafkaConfig NetConfig
kafkaConn ((NetConfig -> Const HostName NetConfig)
 -> KafkaConfig -> Const HostName KafkaConfig)
-> ((HostName -> Const HostName HostName)
    -> NetConfig -> Const HostName NetConfig)
-> Getting HostName KafkaConfig HostName
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (HostName -> Const HostName HostName)
-> NetConfig -> Const HostName NetConfig
Lens' NetConfig HostName
host) HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ HostName
":" HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ (KafkaConfig
conf KafkaConfig -> Getting HostName KafkaConfig HostName -> HostName
forall s a. s -> Getting a s a -> a
^. (NetConfig -> Const HostName NetConfig)
-> KafkaConfig -> Const HostName KafkaConfig
Lens' KafkaConfig NetConfig
kafkaConn ((NetConfig -> Const HostName NetConfig)
 -> KafkaConfig -> Const HostName KafkaConfig)
-> ((HostName -> Const HostName HostName)
    -> NetConfig -> Const HostName NetConfig)
-> Getting HostName KafkaConfig HostName
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (HostName -> Const HostName HostName)
-> NetConfig -> Const HostName NetConfig
Lens' NetConfig HostName
port)