{-# OPTIONS_GHC -F -pgmF htfpp #-}
{-# OPTIONS_HADDOCK prune #-}
{-# LANGUAGE TemplateHaskell #-}

module Striot.CompileIoT ( createPartitions
                         , generateCode
                         , GenerateOpts(..)
                         , defaultOpts
                         , Partition
                         , PartitionMap
                         , writePart
                         , genDockerfile
                         , partitionGraph

                         , generateCodeFromStreamGraph
                         , nodeFn
                         , nodeType
                         , generateNodeSrc
                         , connectNodeId

                         , htf_thisModulesTests
                         ) where

import Data.List (intercalate, nub)
import Algebra.Graph
import Algebra.Graph.ToGraph (reachable)
import Test.Framework
import System.FilePath ((</>))
import System.Directory (createDirectoryIfMissing)
import Data.Function ((&))
import Data.Maybe (catMaybes)
import Data.List (nub,sort)
import Data.List.Match (compareLength)
import Language.Haskell.TH

import Striot.StreamGraph
import Striot.LogicalOptimiser
import Striot.Partition

------------------------------------------------------------------------------
-- StreamGraph Partitioning

-- | A 'Partition', a.k.a. *Node*, in a deployment.
-- Eventually we may define properties about Partitions in these types. For
-- now they are considered to be homogeneous. The chosen type just needs to
-- be enumerable.
type Partition = Int

-- |The user's desired partitioning of the input Graph.
-- Each element in the outer-most list corresponds to a distinct partition.
-- The inner-lists are the IDs of Operators to include in that partition.
type PartitionMap = [[Int]]

-- |`createPartitions` returns ([partitions], [inter-graph links])
-- where inter-graph links are the cut edges due to partitioning
createPartitions :: StreamGraph -> PartitionMap -> PartitionedGraph
createPartitions :: StreamGraph -> PartitionMap -> PartitionedGraph
createPartitions StreamGraph
_ [] = ([],StreamGraph
forall a. Graph a
empty)
createPartitions StreamGraph
g ([Int]
p:PartitionMap
ps) = (StreamGraph
thisGraphStreamGraph -> [StreamGraph] -> [StreamGraph]
forall a. a -> [a] -> [a]
:[StreamGraph]
tailParts, StreamGraph
edgesOut StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
`overlay` StreamGraph
tailCuts) where
    fv :: StreamVertex -> Bool
fv StreamVertex
v       = (StreamVertex -> Int
vertexId StreamVertex
v) Int -> [Int] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [Int]
p
    vs :: StreamGraph
vs         = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
vertices ([StreamVertex] -> StreamGraph) -> [StreamVertex] -> StreamGraph
forall a b. (a -> b) -> a -> b
$ (StreamVertex -> Bool) -> [StreamVertex] -> [StreamVertex]
forall a. (a -> Bool) -> [a] -> [a]
filter StreamVertex -> Bool
fv (StreamGraph -> [StreamVertex]
forall a. Ord a => Graph a -> [a]
vertexList StreamGraph
g)
    es :: StreamGraph
es         = [(StreamVertex, StreamVertex)] -> StreamGraph
forall a. [(a, a)] -> Graph a
edges ([(StreamVertex, StreamVertex)] -> StreamGraph)
-> [(StreamVertex, StreamVertex)] -> StreamGraph
forall a b. (a -> b) -> a -> b
$ ((StreamVertex, StreamVertex) -> Bool)
-> [(StreamVertex, StreamVertex)] -> [(StreamVertex, StreamVertex)]
forall a. (a -> Bool) -> [a] -> [a]
filter (\(StreamVertex
v1,StreamVertex
v2) -> (StreamVertex -> Bool
fv StreamVertex
v1) Bool -> Bool -> Bool
&& (StreamVertex -> Bool
fv StreamVertex
v2)) (StreamGraph -> [(StreamVertex, StreamVertex)]
forall a. Ord a => Graph a -> [(a, a)]
edgeList StreamGraph
g)
    thisGraph :: StreamGraph
thisGraph  = StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
overlay StreamGraph
vs StreamGraph
es
    edgesOut :: StreamGraph
edgesOut   = [(StreamVertex, StreamVertex)] -> StreamGraph
forall a. [(a, a)] -> Graph a
edges ([(StreamVertex, StreamVertex)] -> StreamGraph)
-> [(StreamVertex, StreamVertex)] -> StreamGraph
forall a b. (a -> b) -> a -> b
$ ((StreamVertex, StreamVertex) -> Bool)
-> [(StreamVertex, StreamVertex)] -> [(StreamVertex, StreamVertex)]
forall a. (a -> Bool) -> [a] -> [a]
filter (\(StreamVertex
v1,StreamVertex
v2) -> (StreamVertex -> Bool
fv StreamVertex
v1) Bool -> Bool -> Bool
&& (Bool -> Bool
not(StreamVertex -> Bool
fv StreamVertex
v2))) (StreamGraph -> [(StreamVertex, StreamVertex)]
forall a. Ord a => Graph a -> [(a, a)]
edgeList StreamGraph
g)
    ([StreamGraph]
tailParts, StreamGraph
tailCuts) = StreamGraph -> PartitionMap -> PartitionedGraph
createPartitions StreamGraph
g PartitionMap
ps

unPartition :: PartitionedGraph -> Graph StreamVertex
unPartition :: PartitionedGraph -> StreamGraph
unPartition ([StreamGraph]
a,StreamGraph
b) = StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
overlay StreamGraph
b (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall a b. (a -> b) -> a -> b
$ (StreamGraph -> StreamGraph -> StreamGraph)
-> StreamGraph -> [StreamGraph] -> StreamGraph
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
overlay StreamGraph
forall a. Graph a
Empty [StreamGraph]
a

------------------------------------------------------------------------------
-- Code generation from StreamGraph definitions

{-
    a well-formed streamgraph:
        always starts with a Source?
        always ends with a Sink?
        always has just one Sink?
        is entirely connected?
        ...
    a well-formed partition spec:
        has >= 1 partition
        references node IDs that exist
        covers all node IDs?
        passes some kind of connectedness test?
-}

-- | Options for source code generation are captured in instances of the
-- 'GenerateOpts' data-type.
data GenerateOpts = GenerateOpts
  { GenerateOpts -> [String]
imports           :: [String]      -- ^ list of import statements to add to generated files
  , GenerateOpts -> [String]
packages          :: [String]      -- ^ list of Cabal packages to install within containers
  , GenerateOpts -> Maybe String
preSource         :: Maybe String  -- ^ code to run prior to starting 'nodeSource'
  , GenerateOpts -> [RewriteRule]
rules             :: [RewriteRule] -- ^ A list of rewrite rules for the logical optimiser
  , GenerateOpts -> Double
maxNodeUtil       :: Double        -- ^ The per-Partition utilisation limit
  , GenerateOpts -> Double
bandwidthLimit    :: Double        -- ^ A program-global maximum bandwidth limit
  }

-- | Sensible default values for 'GenerateOpts'. Users who wish to customise
-- options in 'GenerateOpts' are encouraged to derive from 'defaultOpts'.
defaultOpts :: GenerateOpts
defaultOpts = GenerateOpts :: [String]
-> [String]
-> Maybe String
-> [RewriteRule]
-> Double
-> Double
-> GenerateOpts
GenerateOpts
  { imports :: [String]
imports     = [ String
"Striot.FunctionalIoTtypes"
                  , String
"Striot.FunctionalProcessing"
                  , String
"Striot.Nodes"
                  , String
"Control.Concurrent"
                  , String
"Control.Category ((>>>))" -- (generated by rewrites)
                  ]
  , packages :: [String]
packages    = []
  , preSource :: Maybe String
preSource   = Maybe String
forall a. Maybe a
Nothing
  , rules :: [RewriteRule]
rules       = [RewriteRule]
defaultRewriteRules
  , maxNodeUtil :: Double
maxNodeUtil = Double
3.0 -- finger in the air
  , bandwidthLimit :: Double
bandwidthLimit = Double
31 -- same
  }

-- |Partitions the supplied `StreamGraph` according to the supplied `PartitionMap`
-- and options specified within the supplied `GenerateOpts` and returns a list of
-- the sub-graphs converted into source code and encoded as `String`s.
generateCode :: GenerateOpts -> StreamGraph -> PartitionMap -> [String]
generateCode :: GenerateOpts -> StreamGraph -> PartitionMap -> [String]
generateCode GenerateOpts
opts StreamGraph
sg PartitionMap
pm = let
    ([StreamGraph]
sgs,StreamGraph
cuts)      = StreamGraph -> PartitionMap -> PartitionedGraph
createPartitions StreamGraph
sg (PartitionMap -> PartitionMap
forall a. Ord a => [a] -> [a]
sort (([Int] -> [Int]) -> PartitionMap -> PartitionMap
forall a b. (a -> b) -> [a] -> [b]
map [Int] -> [Int]
forall a. Ord a => [a] -> [a]
sort PartitionMap
pm))
    enumeratedParts :: [(Integer, StreamGraph)]
enumeratedParts = [Integer] -> [StreamGraph] -> [(Integer, StreamGraph)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Integer
1..] [StreamGraph]
sgs
    in ((Integer, StreamGraph) -> String)
-> [(Integer, StreamGraph)] -> [String]
forall a b. (a -> b) -> [a] -> [b]
map (GenerateOpts
-> [(Integer, StreamGraph)]
-> StreamGraph
-> (Integer, StreamGraph)
-> String
generateCodeFromStreamGraph GenerateOpts
opts [(Integer, StreamGraph)]
enumeratedParts StreamGraph
cuts) [(Integer, StreamGraph)]
enumeratedParts

-- TODO: the sorting of the `PartitionMap` is a work-around for
-- <https://github.com/striot/striot/issues/124>
--
-- TODO: there is no test coverage for generateCode

data NodeType = NodeSource | NodeSink | NodeLink deriving (Int -> NodeType -> ShowS
[NodeType] -> ShowS
NodeType -> String
(Int -> NodeType -> ShowS)
-> (NodeType -> String) -> ([NodeType] -> ShowS) -> Show NodeType
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [NodeType] -> ShowS
$cshowList :: [NodeType] -> ShowS
show :: NodeType -> String
$cshow :: NodeType -> String
showsPrec :: Int -> NodeType -> ShowS
$cshowsPrec :: Int -> NodeType -> ShowS
Show)

nodeType :: StreamGraph -> NodeType
nodeType :: StreamGraph -> NodeType
nodeType StreamGraph
sg = if StreamOperator -> Bool
isSource (StreamOperator -> Bool) -> StreamOperator -> Bool
forall a b. (a -> b) -> a -> b
$ StreamVertex -> StreamOperator
operator ([StreamVertex] -> StreamVertex
forall a. [a] -> a
head (StreamGraph -> [StreamVertex]
forall a. Ord a => Graph a -> [a]
vertexList StreamGraph
sg))
              then NodeType
NodeSource
              else if (StreamVertex -> StreamOperator
operator(StreamVertex -> StreamOperator)
-> (StreamGraph -> StreamVertex) -> StreamGraph -> StreamOperator
forall b c a. (b -> c) -> (a -> b) -> a -> c
.[StreamVertex] -> StreamVertex
forall a. [a] -> a
head([StreamVertex] -> StreamVertex)
-> (StreamGraph -> [StreamVertex]) -> StreamGraph -> StreamVertex
forall b c a. (b -> c) -> (a -> b) -> a -> c
.[StreamVertex] -> [StreamVertex]
forall a. [a] -> [a]
reverse([StreamVertex] -> [StreamVertex])
-> (StreamGraph -> [StreamVertex]) -> StreamGraph -> [StreamVertex]
forall b c a. (b -> c) -> (a -> b) -> a -> c
.StreamGraph -> [StreamVertex]
forall a. Ord a => Graph a -> [a]
vertexList) StreamGraph
sg StreamOperator -> StreamOperator -> Bool
forall a. Eq a => a -> a -> Bool
== StreamOperator
Sink
                   then NodeType
NodeSink
                   else NodeType
NodeLink

------------------------------------------------------------------------------

-- vertexList outputs *sorted* (By Ord a =>). That corresponds to the Id value for
-- our StreamVertex type
-- TODO consider Difference Lists here
generateCodeFromStreamGraph :: GenerateOpts -> [(Integer, StreamGraph)] -> StreamGraph -> (Integer,StreamGraph) -> String
generateCodeFromStreamGraph :: GenerateOpts
-> [(Integer, StreamGraph)]
-> StreamGraph
-> (Integer, StreamGraph)
-> String
generateCodeFromStreamGraph GenerateOpts
opts [(Integer, StreamGraph)]
parts StreamGraph
cuts (Integer
partId,StreamGraph
sg) = String -> [String] -> String
forall a. [a] -> [[a]] -> [a]
intercalate String
"\n" ([String] -> String) -> [String] -> String
forall a b. (a -> b) -> a -> b
$
    String
nodeId String -> [String] -> [String]
forall a. a -> [a] -> [a]
: -- convenience comment labelling the node/partition ID
    [String]
imports' [String] -> [String] -> [String]
forall a. [a] -> [a] -> [a]
++
    [(Integer, StreamGraph)] -> StreamGraph -> String
forall {t :: * -> *} {a}.
Foldable t =>
t a -> StreamGraph -> String
possibleSrcFn [(Integer, StreamGraph)]
parts StreamGraph
sg String -> [String] -> [String]
forall a. a -> [a] -> [a]
:
    [(Integer, StreamGraph)] -> StreamGraph -> String
forall {t :: * -> *} {a}.
Foldable t =>
t a -> StreamGraph -> String
possibleSinkFn [(Integer, StreamGraph)]
parts StreamGraph
sg String -> [String] -> [String]
forall a. a -> [a] -> [a]
:
    String
sgTypeSignature String -> [String] -> [String]
forall a. a -> [a] -> [a]
:
    String
sgIntro String -> [String] -> [String]
forall a. a -> [a] -> [a]
:
    [String]
sgBody [String] -> [String] -> [String]
forall a. [a] -> [a] -> [a]
++
    [String
padding String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"in " String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
lastIdentifier,String
"\n",
    String
"main :: IO ()",
    [(Integer, StreamGraph)]
-> StreamGraph -> Integer -> StreamGraph -> GenerateOpts -> String
nodeFn [(Integer, StreamGraph)]
parts StreamGraph
sg Integer
partId StreamGraph
cuts GenerateOpts
opts] where

        nodeId :: String
nodeId = String
"-- node"String -> ShowS
forall a. [a] -> [a] -> [a]
++(Integer -> String
forall a. Show a => a -> String
show Integer
partId)
        padding :: String
padding = String
"    "
        pad :: [String] -> [String]
pad = ShowS -> [String] -> [String]
forall a b. (a -> b) -> [a] -> [b]
map (String
paddingString -> ShowS
forall a. [a] -> [a] -> [a]
++)

        sgTypeSignature :: String
sgTypeSignature = String
"streamGraphFn ::" String -> ShowS
forall a. [a] -> [a] -> [a]
++ case StreamGraph -> StreamOperator
startsWith StreamGraph
sg of
            StreamOperator
Join ->   String
" Stream "String -> ShowS
forall a. [a] -> [a] -> [a]
++StreamGraph -> String
inType StreamGraph
sgString -> ShowS
forall a. [a] -> [a] -> [a]
++String
" ->"
                    String -> ShowS
forall a. [a] -> [a] -> [a]
++String
" Stream "String -> ShowS
forall a. [a] -> [a] -> [a]
++StreamGraph -> String
inType StreamGraph
sgString -> ShowS
forall a. [a] -> [a] -> [a]
++String
" ->"
                    String -> ShowS
forall a. [a] -> [a] -> [a]
++String
" Stream "String -> ShowS
forall a. [a] -> [a] -> [a]
++StreamGraph -> String
outType StreamGraph
sg
            -- the merge takes place in TCP/IP prior to the streamGraphFn.
            -- note the re-use of outType (a) instead of inType ([a])
            StreamOperator
Merge -> String
" Stream "String -> ShowS
forall a. [a] -> [a] -> [a]
++StreamGraph -> String
outType StreamGraph
sgString -> ShowS
forall a. [a] -> [a] -> [a]
++String
" ->"String -> ShowS
forall a. [a] -> [a] -> [a]
++String
" Stream "String -> ShowS
forall a. [a] -> [a] -> [a]
++StreamGraph -> String
outType StreamGraph
sg
            StreamOperator
_     -> String
" Stream "String -> ShowS
forall a. [a] -> [a] -> [a]
++StreamGraph -> String
inType  StreamGraph
sgString -> ShowS
forall a. [a] -> [a] -> [a]
++String
" ->"String -> ShowS
forall a. [a] -> [a] -> [a]
++String
" Stream "String -> ShowS
forall a. [a] -> [a] -> [a]
++StreamGraph -> String
outType StreamGraph
sg

        sgIntro :: String
sgIntro = String
"streamGraphFn "String -> ShowS
forall a. [a] -> [a] -> [a]
++String
sgArgsString -> ShowS
forall a. [a] -> [a] -> [a]
++String
" = let"
        sgArgs :: String
sgArgs = if StreamGraph -> Bool
startsWithJoin StreamGraph
sg
            then String
"n1 n2"
            else String
"n1"
        sgBody :: [String]
sgBody = [String] -> [String]
pad ([String] -> [String]) -> [String] -> [String]
forall a b. (a -> b) -> a -> b
$ case [StreamVertex]
intVerts of
            [] -> [String
"n2 = n1"]
            [StreamVertex]
ns -> if StreamGraph -> Bool
startsWithJoin StreamGraph
sg
                  then ((Int, StreamVertex) -> String)
-> [(Int, StreamVertex)] -> [String]
forall a b. (a -> b) -> [a] -> [b]
map (Int, StreamVertex) -> String
generateCodeFromVertex ([Int] -> [StreamVertex] -> [(Int, StreamVertex)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Int
3..] [StreamVertex]
ns)
                  else ((Int, StreamVertex) -> String)
-> [(Int, StreamVertex)] -> [String]
forall a b. (a -> b) -> [a] -> [b]
map (Int, StreamVertex) -> String
generateCodeFromVertex ([Int] -> [StreamVertex] -> [(Int, StreamVertex)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Int
2..] [StreamVertex]
ns)

        imports' :: [String]
imports' = (ShowS -> [String] -> [String]
forall a b. (a -> b) -> [a] -> [b]
map (String
"import "String -> ShowS
forall a. [a] -> [a] -> [a]
++) (GenerateOpts -> [String]
imports GenerateOpts
opts)) [String] -> [String] -> [String]
forall a. [a] -> [a] -> [a]
++ [String
"\n"]
        lastIdentifier :: String
lastIdentifier = Char
'n'Char -> ShowS
forall a. a -> [a] -> [a]
:(Int -> String
forall a. Show a => a -> String
show (Int -> String) -> Int -> String
forall a b. (a -> b) -> a -> b
$ [StreamVertex] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [StreamVertex]
intVerts
            Int -> Int -> Int
forall a. Num a => a -> a -> a
+ if StreamGraph -> Bool
startsWithJoin StreamGraph
sg then Int
2 else Int
1)
        intVerts :: [StreamVertex]
intVerts= (StreamVertex -> Bool) -> [StreamVertex] -> [StreamVertex]
forall a. (a -> Bool) -> [a] -> [a]
filter (Bool -> Bool
not (Bool -> Bool) -> (StreamVertex -> Bool) -> StreamVertex -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamVertex -> Bool
singleton) ([StreamVertex] -> [StreamVertex])
-> [StreamVertex] -> [StreamVertex]
forall a b. (a -> b) -> a -> b
$ StreamGraph -> [StreamVertex]
forall a. Ord a => Graph a -> [a]
vertexList StreamGraph
sg

nodeFn :: [(Integer, StreamGraph)]
-> StreamGraph -> Integer -> StreamGraph -> GenerateOpts -> String
nodeFn [(Integer, StreamGraph)]
parts StreamGraph
sg Integer
partId StreamGraph
cuts GenerateOpts
opts =
    if [(Integer, StreamGraph)] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [(Integer, StreamGraph)]
parts Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
1
    then String
"main = nodeSimple src1 streamGraphFn sink1"
    else case (StreamGraph -> NodeType
nodeType StreamGraph
sg) of
        NodeType
NodeSource -> Integer
-> [Integer] -> GenerateOpts -> [(Integer, StreamGraph)] -> String
generateNodeSrc Integer
partId (StreamGraph -> [(Integer, StreamGraph)] -> StreamGraph -> [Integer]
connectNodeId StreamGraph
sg [(Integer, StreamGraph)]
parts StreamGraph
cuts) GenerateOpts
opts [(Integer, StreamGraph)]
parts
        NodeType
NodeLink   -> Integer -> String
generateNodeLink (Integer
partId Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
+ Integer
1)
        NodeType
NodeSink   -> StreamGraph -> String
generateNodeSink StreamGraph
sg

possibleSrcFn :: t a -> StreamGraph -> String
possibleSrcFn t a
parts StreamGraph
sg =
    if   t a -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length t a
parts Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
1
    then StreamGraph -> String
generateSrcFn StreamGraph
sg
    else case (StreamGraph -> NodeType
nodeType StreamGraph
sg) of
             NodeType
NodeSource -> StreamGraph -> String
generateSrcFn StreamGraph
sg
             NodeType
_          -> String
""

possibleSinkFn :: t a -> StreamGraph -> String
possibleSinkFn t a
parts StreamGraph
sg =
    if   t a -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length t a
parts Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
1
    then StreamGraph -> String
generateSinkFn StreamGraph
sg
    else case (StreamGraph -> NodeType
nodeType StreamGraph
sg) of
             NodeType
NodeSink -> StreamGraph -> String
generateSinkFn StreamGraph
sg
             NodeType
_        -> String
""

possibleSrcSinkFn :: StreamGraph -> String
possibleSrcSinkFn StreamGraph
sg = case (StreamGraph -> NodeType
nodeType StreamGraph
sg) of
    NodeType
NodeSource -> StreamGraph -> String
generateSrcFn StreamGraph
sg
    NodeType
NodeLink   -> String
""
    NodeType
NodeSink   -> StreamGraph -> String
generateSinkFn StreamGraph
sg

-- output type of a StreamGraph.
-- special-case if the terminal node is a Sink node: we want the
-- "pure" StreamGraph type that feeds into the sink function.
outType :: StreamGraph -> String
outType :: StreamGraph -> String
outType StreamGraph
sg = let node :: StreamVertex
node = ([StreamVertex] -> StreamVertex
forall a. [a] -> a
last ([StreamVertex] -> StreamVertex)
-> (StreamGraph -> [StreamVertex]) -> StreamGraph -> StreamVertex
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamGraph -> [StreamVertex]
forall a. Ord a => Graph a -> [a]
vertexList) StreamGraph
sg
        in if StreamVertex -> StreamOperator
operator StreamVertex
node StreamOperator -> StreamOperator -> Bool
forall a. Eq a => a -> a -> Bool
== StreamOperator
Sink
           then StreamVertex -> String
intype StreamVertex
node
           else StreamVertex -> String
outtype StreamVertex
node

-- input type of a StreamGraph
-- see outType for rationale
inType :: StreamGraph -> String
inType :: StreamGraph -> String
inType StreamGraph
sg = let node :: StreamVertex
node = ([StreamVertex] -> StreamVertex
forall a. [a] -> a
head  ([StreamVertex] -> StreamVertex)
-> (StreamGraph -> [StreamVertex]) -> StreamGraph -> StreamVertex
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamGraph -> [StreamVertex]
forall a. Ord a => Graph a -> [a]
vertexList) StreamGraph
sg
            in if StreamOperator -> Bool
isSource (StreamOperator -> Bool) -> StreamOperator -> Bool
forall a b. (a -> b) -> a -> b
$ StreamVertex -> StreamOperator
operator StreamVertex
node
               then StreamVertex -> String
outtype StreamVertex
node
               else StreamVertex -> String
intype StreamVertex
node

t :: StreamGraph
t = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [[| return 0 |]]       String
"IO Int" String
"Int" Double
1
         , Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Map    [[| show |]]       String
"Int" String
"String" Double
2
         , Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Sink   [[| mapM_ putStrLn |]] String
"String" String
"IO ()" Double
3
         ]

test_outType :: IO ()
test_outType = String -> String -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual String
"String" (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$
    (StreamGraph -> String
outType (StreamGraph -> String)
-> (PartitionedGraph -> StreamGraph) -> PartitionedGraph -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [StreamGraph] -> StreamGraph
forall a. [a] -> a
head ([StreamGraph] -> StreamGraph)
-> (PartitionedGraph -> [StreamGraph])
-> PartitionedGraph
-> StreamGraph
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PartitionedGraph -> [StreamGraph]
forall a b. (a, b) -> a
fst) (StreamGraph -> PartitionMap -> PartitionedGraph
createPartitions StreamGraph
t [[Int
0,Int
1],[Int
2]])

test_outType_sink :: IO ()
test_outType_sink = assertEqual "String" $
    (StreamGraph -> String
outType (StreamGraph -> String)
-> (PartitionedGraph -> StreamGraph) -> PartitionedGraph -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [StreamGraph] -> StreamGraph
forall a. [a] -> a
head ([StreamGraph] -> StreamGraph)
-> (PartitionedGraph -> [StreamGraph])
-> PartitionedGraph
-> StreamGraph
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PartitionedGraph -> [StreamGraph]
forall a b. (a, b) -> a
fst) (StreamGraph -> PartitionMap -> PartitionedGraph
createPartitions StreamGraph
t [[Int
0,Int
1,Int
2]])

test_inType :: IO ()
test_inType = String -> String -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual String
"Int" (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$
    (StreamGraph -> String
inType (StreamGraph -> String)
-> (PartitionedGraph -> StreamGraph) -> PartitionedGraph -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [StreamGraph] -> StreamGraph
forall a. [a] -> a
head ([StreamGraph] -> StreamGraph)
-> (PartitionedGraph -> [StreamGraph])
-> PartitionedGraph
-> StreamGraph
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PartitionedGraph -> [StreamGraph]
forall a b. (a, b) -> a
fst) (StreamGraph -> PartitionMap -> PartitionedGraph
createPartitions StreamGraph
t [[Int
0,Int
1],[Int
2]])

-- determine the node(s?) to connect on to from this partition
-- XXX always 0 or 1? write quickcheck property...
-- TODO: this breaks if the outer-edge node has been optimised away...
-- the graph of cut edges has the pre-optimisation StreamVertex in it.
connectNodeId :: StreamGraph -> [(Integer, StreamGraph)] -> StreamGraph -> [Integer]
connectNodeId :: StreamGraph -> [(Integer, StreamGraph)] -> StreamGraph -> [Integer]
connectNodeId StreamGraph
sg [(Integer, StreamGraph)]
parts StreamGraph
cuts = let
    edges :: [(StreamVertex, StreamVertex)]
edges = StreamGraph -> [(StreamVertex, StreamVertex)]
forall a. Ord a => Graph a -> [(a, a)]
edgeList StreamGraph
cuts
    outs :: [StreamVertex]
outs  = StreamGraph -> [StreamVertex]
forall a. Ord a => Graph a -> [a]
vertexList StreamGraph
sg
    outEs :: [(StreamVertex, StreamVertex)]
outEs = ((StreamVertex, StreamVertex) -> Bool)
-> [(StreamVertex, StreamVertex)] -> [(StreamVertex, StreamVertex)]
forall a. (a -> Bool) -> [a] -> [a]
filter (\(StreamVertex
f,StreamVertex
t) -> StreamVertex
f StreamVertex -> [StreamVertex] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [StreamVertex]
outs) [(StreamVertex, StreamVertex)]
edges
    destVs :: [StreamVertex]
destVs= ((StreamVertex, StreamVertex) -> StreamVertex)
-> [(StreamVertex, StreamVertex)] -> [StreamVertex]
forall a b. (a -> b) -> [a] -> [b]
map (StreamVertex, StreamVertex) -> StreamVertex
forall a b. (a, b) -> b
snd [(StreamVertex, StreamVertex)]
outEs
    destGs :: [(Integer, StreamGraph)]
destGs= (StreamVertex -> [(Integer, StreamGraph)])
-> [StreamVertex] -> [(Integer, StreamGraph)]
forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap (\StreamVertex
v -> ((Integer, StreamGraph) -> Bool)
-> [(Integer, StreamGraph)] -> [(Integer, StreamGraph)]
forall a. (a -> Bool) -> [a] -> [a]
filter (\(Integer
n,StreamGraph
sg) -> StreamVertex
v StreamVertex -> [StreamVertex] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` (StreamGraph -> [StreamVertex]
forall a. Ord a => Graph a -> [a]
vertexList StreamGraph
sg)) [(Integer, StreamGraph)]
parts) [StreamVertex]
destVs

    in case ((Integer, StreamGraph) -> Integer)
-> [(Integer, StreamGraph)] -> [Integer]
forall a b. (a -> b) -> [a] -> [b]
map (Integer, StreamGraph) -> Integer
forall a b. (a, b) -> a
fst [(Integer, StreamGraph)]
destGs of
        [] -> String -> [Integer]
forall a. HasCallStack => String -> a
error String
"connectNodeId returned an empty list, last vertex optimised away?"
        [Integer]
x  -> [Integer]
x

generateSrcFn :: StreamGraph -> String
generateSrcFn :: StreamGraph -> String
generateSrcFn StreamGraph
sg = String
"src1 = " String -> ShowS
forall a. [a] -> [a] -> [a]
++
    (String -> [String] -> String
forall a. [a] -> [[a]] -> [a]
intercalate String
"\n" ([String] -> String)
-> (StreamGraph -> [String]) -> StreamGraph -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ExpQ -> String) -> [ExpQ] -> [String]
forall a b. (a -> b) -> [a] -> [b]
map ExpQ -> String
showParam ([ExpQ] -> [String])
-> (StreamGraph -> [ExpQ]) -> StreamGraph -> [String]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamVertex -> [ExpQ]
parameters (StreamVertex -> [ExpQ])
-> (StreamGraph -> StreamVertex) -> StreamGraph -> [ExpQ]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [StreamVertex] -> StreamVertex
forall a. [a] -> a
head ([StreamVertex] -> StreamVertex)
-> (StreamGraph -> [StreamVertex]) -> StreamGraph -> StreamVertex
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamGraph -> [StreamVertex]
forall a. Ord a => Graph a -> [a]
vertexList (StreamGraph -> String) -> StreamGraph -> String
forall a b. (a -> b) -> a -> b
$ StreamGraph
sg) String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"\n"

generateSinkFn:: StreamGraph -> String
generateSinkFn :: StreamGraph -> String
generateSinkFn StreamGraph
sg = String
"sink1 :: Show a => Stream a -> IO ()\nsink1 = " String -> ShowS
forall a. [a] -> [a] -> [a]
++
    (String -> [String] -> String
forall a. [a] -> [[a]] -> [a]
intercalate String
"\n" ([String] -> String)
-> (StreamGraph -> [String]) -> StreamGraph -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ExpQ -> String) -> [ExpQ] -> [String]
forall a b. (a -> b) -> [a] -> [b]
map ExpQ -> String
showParam ([ExpQ] -> [String])
-> (StreamGraph -> [ExpQ]) -> StreamGraph -> [String]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamVertex -> [ExpQ]
parameters (StreamVertex -> [ExpQ])
-> (StreamGraph -> StreamVertex) -> StreamGraph -> [ExpQ]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [StreamVertex] -> StreamVertex
forall a. [a] -> a
head ([StreamVertex] -> StreamVertex)
-> (StreamGraph -> [StreamVertex]) -> StreamGraph -> StreamVertex
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [StreamVertex] -> [StreamVertex]
forall a. [a] -> [a]
reverse ([StreamVertex] -> [StreamVertex])
-> (StreamGraph -> [StreamVertex]) -> StreamGraph -> [StreamVertex]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamGraph -> [StreamVertex]
forall a. Ord a => Graph a -> [a]
vertexList (StreamGraph -> String) -> StreamGraph -> String
forall a b. (a -> b) -> a -> b
$ StreamGraph
sg) String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"\n"

generateNodeLink :: Integer -> String
generateNodeLink :: Integer -> String
generateNodeLink Integer
n = String
"main = nodeLink (defaultLink \"9001\" \"node"String -> ShowS
forall a. [a] -> [a] -> [a]
++(Integer -> String
forall a. Show a => a -> String
show Integer
n)String -> ShowS
forall a. [a] -> [a] -> [a]
++String
"\" \"9001\") streamGraphFn"

-- warts:
--  we accept a list of onward nodes but nodeSource only accepts one anyway
generateNodeSrc :: Integer -> [Integer] -> GenerateOpts -> [(Integer, StreamGraph)] -> String
generateNodeSrc :: Integer
-> [Integer] -> GenerateOpts -> [(Integer, StreamGraph)] -> String
generateNodeSrc Integer
partId [Integer]
nodes GenerateOpts
opts [(Integer, StreamGraph)]
parts = let
    node :: Integer
node = [Integer] -> Integer
forall a. [a] -> a
head [Integer]
nodes
    host :: String
host = String
"node" String -> ShowS
forall a. [a] -> [a] -> [a]
++ (Integer -> String
forall a. Show a => a -> String
show Integer
node)

    port :: Integer
port = case Integer -> [(Integer, StreamGraph)] -> Maybe StreamGraph
forall a b. Eq a => a -> [(a, b)] -> Maybe b
lookup Integer
node [(Integer, StreamGraph)]
parts of
        Just StreamGraph
sg -> if StreamGraph -> Bool
startsWithJoin StreamGraph
sg
                   then Integer
9001 Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
+ Integer
partId Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
-Integer
1 -- XXX Unlikely to always be correct
                   else Integer
9001
        Maybe StreamGraph
Nothing -> Integer
9001

    pref :: String
pref = case GenerateOpts -> Maybe String
preSource GenerateOpts
opts of
       Maybe String
Nothing -> String
""
       Just String
f  -> String
f String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"\n  "

    in String
"main = do\n  " String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
pref String -> ShowS
forall a. [a] -> [a] -> [a]
++ (ExpQ -> String
showParam [|
        nodeSource (defaultSource $(litE (StringL host))
                                  $(litE (StringL (show port))))
                   src1 streamGraphFn
    |])

-- | does this StreamGraph start with a Join operator?
startsWithJoin :: StreamGraph -> Bool
startsWithJoin :: StreamGraph -> Bool
startsWithJoin = (StreamOperator
JoinStreamOperator -> StreamOperator -> Bool
forall a. Eq a => a -> a -> Bool
==) (StreamOperator -> Bool)
-> (StreamGraph -> StreamOperator) -> StreamGraph -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamGraph -> StreamOperator
startsWith

startsWith :: StreamGraph -> StreamOperator
startsWith :: StreamGraph -> StreamOperator
startsWith StreamGraph
sg = let
    inEdges :: [StreamVertex]
inEdges = ((StreamVertex, StreamVertex) -> StreamVertex)
-> [(StreamVertex, StreamVertex)] -> [StreamVertex]
forall a b. (a -> b) -> [a] -> [b]
map (StreamVertex, StreamVertex) -> StreamVertex
forall a b. (a, b) -> b
snd (StreamGraph -> [(StreamVertex, StreamVertex)]
forall a. Ord a => Graph a -> [(a, a)]
edgeList StreamGraph
sg)
    in (StreamVertex -> StreamOperator
operator (StreamVertex -> StreamOperator)
-> (StreamGraph -> StreamVertex) -> StreamGraph -> StreamOperator
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [StreamVertex] -> StreamVertex
forall a. [a] -> a
head ([StreamVertex] -> StreamVertex)
-> (StreamGraph -> [StreamVertex]) -> StreamGraph -> StreamVertex
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (StreamVertex -> Bool) -> [StreamVertex] -> [StreamVertex]
forall a. (a -> Bool) -> [a] -> [a]
filter (Bool -> Bool
not (Bool -> Bool) -> (StreamVertex -> Bool) -> StreamVertex -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((StreamVertex -> [StreamVertex] -> Bool)
-> [StreamVertex] -> StreamVertex -> Bool
forall a b c. (a -> b -> c) -> b -> a -> c
flip StreamVertex -> [StreamVertex] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
elem) [StreamVertex]
inEdges) ([StreamVertex] -> [StreamVertex])
-> (StreamGraph -> [StreamVertex]) -> StreamGraph -> [StreamVertex]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamGraph -> [StreamVertex]
forall a. Ord a => Graph a -> [a]
vertexList) StreamGraph
sg

test_startsWithJoin_1 :: IO ()
test_startsWithJoin_1 = HasCallStack => Bool -> IO ()
Bool -> IO ()
assertBool (Bool -> IO ())
-> ([StreamVertex] -> Bool) -> [StreamVertex] -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamGraph -> Bool
startsWithJoin (StreamGraph -> Bool)
-> ([StreamVertex] -> StreamGraph) -> [StreamVertex] -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path ([StreamVertex] -> IO ()) -> [StreamVertex] -> IO ()
forall a b. (a -> b) -> a -> b
$
    [Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Join [] String
"" String
"" Double
1, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 StreamOperator
Merge [] String
"" String
"" Double
2]

test_startsWithJoin_2 :: IO ()
test_startsWithJoin_2 = HasCallStack => Bool -> IO ()
Bool -> IO ()
assertBool (Bool -> IO ())
-> ([StreamVertex] -> Bool) -> [StreamVertex] -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Bool -> Bool
not (Bool -> Bool)
-> ([StreamVertex] -> Bool) -> [StreamVertex] -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamGraph -> Bool
startsWithJoin (StreamGraph -> Bool)
-> ([StreamVertex] -> StreamGraph) -> [StreamVertex] -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path ([StreamVertex] -> IO ()) -> [StreamVertex] -> IO ()
forall a b. (a -> b) -> a -> b
$
    [Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 StreamOperator
Merge [] String
"" String
"" Double
3, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Join [] String
"" String
"" Double
4]

generateNodeSink :: StreamGraph -> String
generateNodeSink :: StreamGraph -> String
generateNodeSink StreamGraph
sg = String
"main = " String -> ShowS
forall a. [a] -> [a] -> [a]
++ (ExpQ -> String
showParam (ExpQ -> String) -> ExpQ -> String
forall a b. (a -> b) -> a -> b
$
    if StreamGraph -> Bool
startsWithJoin StreamGraph
sg
    then [| nodeSink2 streamGraphFn sink1 "9001" "9002" |]
    else [| nodeSink (defaultSink "9001") streamGraphFn sink1 |])

-- generateCodeFromVertex:  generates Haskell code to be included in a
-- let expression, corresponding to the supplied StreamVertex. The Int
-- argument represents the sequence order of the StreamVertex relative
-- to others, and is used to calculate the names of the input stream
-- argument(s).
-- As the StreamVertex parameters may need to reference the input streams,
-- the generated expression is wrapped in a lambda expression with a
-- constant stream name 's'.
generateCodeFromVertex :: (Int, StreamVertex) -> String
generateCodeFromVertex :: (Int, StreamVertex) -> String
generateCodeFromVertex (Int
opid, StreamVertex
v) = let
    op :: StreamOperator
op  = StreamVertex -> StreamOperator
operator StreamVertex
v
    n :: String
n   = Char
'n' Char -> ShowS
forall a. a -> [a] -> [a]
: Int -> String
forall a. Show a => a -> String
show Int
opid
    n_1 :: String
n_1 = Char
'n' Char -> ShowS
forall a. a -> [a] -> [a]
: Int -> String
forall a. Show a => a -> String
show (Int
opid Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1)
    in case StreamOperator
op of
        -- merges are handled by the runtime via TCP/IP. Replace with a no-op
        StreamOperator
Merge -> String
nString -> ShowS
forall a. [a] -> [a] -> [a]
++String
" = "String -> ShowS
forall a. [a] -> [a] -> [a]
++String
n_1

        -- Joins have two inputs and no parameters
        StreamOperator
Join  -> let n_2 :: String
n_2 = Char
'n' Char -> ShowS
forall a. a -> [a] -> [a]
: Int -> String
forall a. Show a => a -> String
show (Int
opid Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
2)
                 in  [String] -> String
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [String
n, String
" = streamJoin ", String
n_2, String
" ", String
n_1]

        StreamOperator
_ -> let params :: String
params  = String -> [String] -> String
forall a. [a] -> [[a]] -> [a]
intercalate String
" " ([String] -> String) -> [String] -> String
forall a b. (a -> b) -> a -> b
$ (ExpQ -> String) -> [ExpQ] -> [String]
forall a b. (a -> b) -> [a] -> [b]
map (ShowS
parenShowS -> (ExpQ -> String) -> ExpQ -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
.ExpQ -> String
showParam) (StreamVertex -> [ExpQ]
parameters StreamVertex
v)
             in  [String] -> String
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [String
n, String
" = (\\s -> ", StreamOperator -> String
printOp StreamOperator
op, String
" ", String
params, String
" s) ", String
n_1]


paren :: String -> String
paren :: ShowS
paren String
s = String
"("String -> ShowS
forall a. [a] -> [a] -> [a]
++String
sString -> ShowS
forall a. [a] -> [a] -> [a]
++String
")"

printOp :: StreamOperator -> String
printOp :: StreamOperator -> String
printOp (Filter Double
_) = String
"streamFilter"
printOp (FilterAcc Double
_) = String
"streamFilterAcc"
printOp StreamOperator
op = String
"stream" String -> ShowS
forall a. [a] -> [a] -> [a]
++ (StreamOperator -> String
forall a. Show a => a -> String
show StreamOperator
op)

------------------------------------------------------------------------------
-- tests / test data

main :: IO ()
main = TestSuite -> IO ()
forall t. TestableHTF t => t -> IO ()
htfMain htf_thisModulesTests

-- Source -> Sink
s0 :: StreamGraph
s0 = StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
connect (StreamVertex -> StreamGraph
forall a. a -> Graph a
Vertex (Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 ((Double -> StreamOperator
Source Double
1)) [] String
"String" String
"String" Double
1))
             (StreamVertex -> StreamGraph
forall a. a -> Graph a
Vertex (Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 (StreamOperator
Sink) [] String
"String" String
"String" Double
2))

-- Source -> Filter -> Sink
s1 :: StreamGraph
s1 = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 ((Double -> StreamOperator
Source Double
1)) [] String
"String" String
"String" Double
3
          , Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 (Double -> StreamOperator
Filter Double
0.5) [] String
"String" String
"String" Double
4
          , Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 (StreamOperator
Sink) [] String
"String" String
"String" Double
5
          ]

test_reform_s0 :: IO ()
test_reform_s0 = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual StreamGraph
s0 (PartitionedGraph -> StreamGraph
unPartition (PartitionedGraph -> StreamGraph)
-> PartitionedGraph -> StreamGraph
forall a b. (a -> b) -> a -> b
$ StreamGraph -> PartitionMap -> PartitionedGraph
createPartitions StreamGraph
s0 [[Int
0],[Int
1]])
test_reform_s1 :: IO ()
test_reform_s1 = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual StreamGraph
s1 (PartitionedGraph -> StreamGraph
unPartition (PartitionedGraph -> StreamGraph)
-> PartitionedGraph -> StreamGraph
forall a b. (a -> b) -> a -> b
$ StreamGraph -> PartitionMap -> PartitionedGraph
createPartitions StreamGraph
s1 [[Int
0,Int
1],[Int
2]])
test_reform_s1_2 :: IO ()
test_reform_s1_2 = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual StreamGraph
s1 (PartitionedGraph -> StreamGraph
unPartition (PartitionedGraph -> StreamGraph)
-> PartitionedGraph -> StreamGraph
forall a b. (a -> b) -> a -> b
$ StreamGraph -> PartitionMap -> PartitionedGraph
createPartitions StreamGraph
s1 [[Int
0],[Int
1,Int
2]])

genDockerfile :: Bool -> GenerateOpts -> String
genDockerfile Bool
listen GenerateOpts
opts = 
    let pkgs :: [String]
pkgs = GenerateOpts -> [String]
packages GenerateOpts
opts in [String] -> String
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat
    [ String
"FROM ghcr.io/striot/striot:main\n"
    , String
"WORKDIR /opt/node\n"
    , String
"COPY . /opt/node\n"
    , if [String]
pkgs [String] -> [String] -> Bool
forall a. Eq a => a -> a -> Bool
/= [] then String
"RUN cabal install " String -> ShowS
forall a. [a] -> [a] -> [a]
++ (String -> [String] -> String
forall a. [a] -> [[a]] -> [a]
intercalate String
" " [String]
pkgs) else String
""
    , String
"\n"
    , String
"RUN ghc node.hs\n"
    , if Bool
listen then String
"EXPOSE 9001\n" else String
""
    , String
"CMD /opt/node/node\n"
    ]

mergeEx :: StreamGraph
mergeEx = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
    [ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
1
    , Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Merge [] String
"[Int]" String
"Int" Double
2
    , Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Map [[| show |]] String
"Int" String
"String" Double
3
    , Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [[| mapM_ print |]] String
"String" String
"String" Double
4
    ]

------------------------------------------------------------------------------

-- XXX rename
writePart :: GenerateOpts -> (Int, String) -> IO ()
writePart :: GenerateOpts -> (Int, String) -> IO ()
writePart GenerateOpts
opts (Int
x,String
y) = let
    dockerfile :: String
dockerfile = Bool -> GenerateOpts -> String
genDockerfile Bool
True GenerateOpts
opts
    bn :: String
bn = String
"node" String -> ShowS
forall a. [a] -> [a] -> [a]
++ (Int -> String
forall a. Show a => a -> String
show Int
x)
    fn :: String
fn = String
bn String -> ShowS
</> String
"node.hs"
    in do
        Bool -> String -> IO ()
createDirectoryIfMissing Bool
True String
bn
        String -> String -> IO ()
writeFile (String
bn String -> ShowS
</> String
"Dockerfile") String
dockerfile
        String -> String -> IO ()
writeFile String
fn String
y

-- |Partitions the supplied `StreamGraph` according to the supplied `PartitionMap`;
-- invokes `generateCode` for each derived sub-graph; writes out the resulting
-- source code to individual source code files, one per node.
-- 
-- *TODO*: move GenerateOpts to first parameter?
partitionGraph :: StreamGraph -> PartitionMap -> GenerateOpts -> IO ()
partitionGraph :: StreamGraph -> PartitionMap -> GenerateOpts -> IO ()
partitionGraph StreamGraph
graph PartitionMap
partitions GenerateOpts
opts = do
    ((Int, String) -> IO ()) -> [(Int, String)] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (GenerateOpts -> (Int, String) -> IO ()
writePart GenerateOpts
opts) ([(Int, String)] -> IO ()) -> [(Int, String)] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Int] -> [String] -> [(Int, String)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Int
1..] ([String] -> [(Int, String)]) -> [String] -> [(Int, String)]
forall a b. (a -> b) -> a -> b
$ GenerateOpts -> StreamGraph -> PartitionMap -> [String]
generateCode GenerateOpts
opts StreamGraph
graph PartitionMap
partitions

------------------------------------------------------------------------------

-- | Derive a partition map.
-- Eventually, derive all possible partition maps.
partitionings :: StreamGraph -> [Partition] -> PartitionMap
partitionings :: StreamGraph -> [Int] -> PartitionMap
partitionings StreamGraph
sg [Int]
parts = let
    vIds :: [Int]
vIds = (StreamVertex -> Int) -> [StreamVertex] -> [Int]
forall a b. (a -> b) -> [a] -> [b]
map StreamVertex -> Int
vertexId ([StreamVertex] -> [Int])
-> (StreamGraph -> [StreamVertex]) -> StreamGraph -> [Int]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamGraph -> [StreamVertex]
forall a. Ord a => Graph a -> [a]
vertexList (StreamGraph -> [Int]) -> StreamGraph -> [Int]
forall a b. (a -> b) -> a -> b
$ StreamGraph
sg
    in case [Int] -> [Int] -> Ordering
forall a b. [a] -> [b] -> Ordering
compareLength [Int]
vIds [Int]
parts of
        Ordering
EQ -> (Int -> [Int]) -> [Int] -> PartitionMap
forall a b. (a -> b) -> [a] -> [b]
map (Int -> [Int] -> [Int]
forall a. a -> [a] -> [a]
:[]) [Int]
vIds

        Ordering
GT -> let
            diff :: Int
diff         = [Int] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Int]
vIds Int -> Int -> Int
forall a. Num a => a -> a -> a
- [Int] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Int]
parts
            ([Int]
first,[Int]
rest) = Int -> [Int] -> ([Int], [Int])
forall a. Int -> [a] -> ([a], [a])
splitAt (Int
diff Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) [Int]
vIds
            in [[Int]
first] PartitionMap -> PartitionMap -> PartitionMap
forall a. [a] -> [a] -> [a]
++ (Int -> [Int]) -> [Int] -> PartitionMap
forall a b. (a -> b) -> [a] -> [b]
map (Int -> [Int] -> [Int]
forall a. a -> [a] -> [a]
:[]) [Int]
rest

        Ordering
LT -> String -> PartitionMap
forall a. HasCallStack => String -> a
error String
"cannot partition a graph over more partitions than there are nodes"

partTestGraph :: StreamGraph
partTestGraph = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
    [ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) []        String
"Int" String
"Int" Double
1
    , Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Map [[| show |]] String
"Int" String
"String" Double
2
    , Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 (Double -> StreamOperator
Filter Double
0.5) [[| (<3) |]]    String
"Int" String
"Int" Double
3
    , Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Window []        String
"String" String
"[String]" Double
4
    , Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
4 StreamOperator
Sink []          String
"String" String
"String" Double
5
    ]

test_partitionings_1 :: IO ()
test_partitionings_1 = PartitionMap -> PartitionMap -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual [[Int
x]|Int
x <- [Int
0..Int
4]] (PartitionMap -> IO ()) -> PartitionMap -> IO ()
forall a b. (a -> b) -> a -> b
$
    StreamGraph -> [Int] -> PartitionMap
partitionings StreamGraph
partTestGraph [Int
0..Int
4]

test_partitionings_2 :: IO ()
test_partitionings_2 = Int -> Int -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual Int
3 (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ PartitionMap -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length (PartitionMap -> Int) -> PartitionMap -> Int
forall a b. (a -> b) -> a -> b
$
    StreamGraph -> [Int] -> PartitionMap
partitionings StreamGraph
partTestGraph [Int
0..Int
2]

test_partitionings_3 :: IO ()
test_partitionings_3 = Int -> Int -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual Int
3 (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ [Int] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length ([Int] -> Int) -> [Int] -> Int
forall a b. (a -> b) -> a -> b
$ PartitionMap -> [Int]
forall a. [a] -> a
head (PartitionMap -> [Int]) -> PartitionMap -> [Int]
forall a b. (a -> b) -> a -> b
$
    StreamGraph -> [Int] -> PartitionMap
partitionings StreamGraph
partTestGraph [Int
0..Int
2]