{-# OPTIONS_GHC -F -pgmF htfpp #-}
{-# OPTIONS_HADDOCK prune #-}
{-# LANGUAGE TemplateHaskell #-}
module Striot.CompileIoT ( createPartitions
, generateCode
, GenerateOpts(..)
, defaultOpts
, Partition
, PartitionMap
, writePart
, genDockerfile
, partitionGraph
, generateCodeFromStreamGraph
, nodeFn
, nodeType
, generateNodeSrc
, connectNodeId
, htf_thisModulesTests
) where
import Data.List (intercalate, nub)
import Algebra.Graph
import Algebra.Graph.ToGraph (reachable)
import Test.Framework
import System.FilePath ((</>))
import System.Directory (createDirectoryIfMissing)
import Data.Function ((&))
import Data.Maybe (catMaybes)
import Data.List (nub,sort)
import Data.List.Match (compareLength)
import Language.Haskell.TH
import Striot.StreamGraph
import Striot.LogicalOptimiser
import Striot.Partition
-- | NOTE(review): not referenced in the visible code; presumably the index of
-- a single partition -- confirm against callers.
type Partition = Int
-- | A partitioning of a stream graph: one inner list per partition, each
-- holding the 'vertexId's of the vertices assigned to that partition.
type PartitionMap = [[Int]]
-- | Split a 'StreamGraph' into sub-graphs according to a 'PartitionMap'.
--
-- Each inner list of the map names the 'vertexId's that make up one
-- partition. The result pairs the per-partition sub-graphs (in the same order
-- as the map) with a single graph made of every edge that starts inside a
-- partition and ends outside of it.
createPartitions :: StreamGraph -> PartitionMap -> PartitionedGraph
createPartitions _ [] = ([], empty)
createPartitions g (p:ps) = (thisGraph : tailParts, edgesOut `overlay` tailCuts)
  where
    -- Does this vertex belong to the partition currently being built?
    inPart v = vertexId v `elem` p
    allEdges = edgeList g
    -- Vertices are overlaid separately so that a vertex with no internal
    -- edge is still part of the sub-graph.
    thisGraph = overlay
        (vertices (filter inPart (vertexList g)))
        (edges [e | e@(v1, v2) <- allEdges, inPart v1, inPart v2])
    -- Edges leaving this partition: source inside, target outside.
    edgesOut = edges [e | e@(v1, v2) <- allEdges, inPart v1, not (inPart v2)]
    (tailParts, tailCuts) = createPartitions g ps
-- | Recombine a 'PartitionedGraph' into one graph by overlaying every
-- partition's sub-graph together with the graph of cut edges.
unPartition :: PartitionedGraph -> Graph StreamVertex
unPartition (parts, cuts) = overlay cuts (overlays parts)
-- | Options controlling code generation.
data GenerateOpts = GenerateOpts
  { imports        :: [String]
    -- ^ module names (optionally with an import list) emitted as @import@
    -- lines at the top of every generated program
  , packages       :: [String]
    -- ^ NOTE(review): not read in the visible code; presumably extra
    -- packages needed to build the generated programs -- confirm
  , preSource      :: Maybe String
    -- ^ optional code placed at the start of the @main@ @do@ block that is
    -- generated for source nodes
  , rules          :: [RewriteRule]
    -- ^ NOTE(review): not read in the visible code; presumably the rewrite
    -- rules handed to the logical optimiser -- confirm
  , maxNodeUtil    :: Double
    -- ^ NOTE(review): not read in the visible code -- confirm meaning/units
  , bandwidthLimit :: Double
    -- ^ NOTE(review): not read in the visible code -- confirm meaning/units
  }
-- | Default code-generation options: the standard set of imports for the
-- generated programs, no extra packages, no source prefix code and the
-- default rewrite rules.
defaultOpts :: GenerateOpts
defaultOpts = GenerateOpts
  { imports =
      [ "Striot.FunctionalIoTtypes"
      , "Striot.FunctionalProcessing"
      , "Striot.Nodes"
      , "Control.Concurrent"
      , "Control.Category ((>>>))"
      ]
  , packages = []
  , preSource = Nothing
  , rules = defaultRewriteRules
  , maxNodeUtil = 3.0
  , bandwidthLimit = 31
  }
-- | Generate one program source per partition of the stream graph.
--
-- The partition map is normalised first (ids sorted within each partition,
-- then the partitions themselves sorted). The resulting sub-graphs are
-- numbered from 1 and each one is handed to 'generateCodeFromStreamGraph'.
generateCode :: GenerateOpts -> StreamGraph -> PartitionMap -> [String]
generateCode opts sg pm =
    map (generateCodeFromStreamGraph opts numbered cuts) numbered
  where
    (subGraphs, cuts) = createPartitions sg (sort (map sort pm))
    numbered = zip [1 ..] subGraphs
-- | The role a partition plays in the deployment; it selects which kind of
-- @main@ is generated for it (see 'nodeFn').
data NodeType = NodeSource | NodeSink | NodeLink deriving (Show)
-- | Classify a partition's sub-graph.
--
-- It is a 'NodeSource' when the first vertex of its 'vertexList' is a source
-- operator, a 'NodeSink' when the last vertex is a 'Sink', and a 'NodeLink'
-- otherwise. Calls 'error' on an empty graph.
nodeType :: StreamGraph -> NodeType
nodeType sg = case vertexList sg of
    [] -> error "nodeType: empty stream graph"
    vs@(v : _)
      | isSource (operator v) -> NodeSource
      | operator (last vs) == Sink -> NodeSink
      | otherwise -> NodeLink
-- | Generate the full program text for one partition.
--
-- Arguments: the generation options, all numbered partitions, the graph of
-- edges cut by the partitioning, and the numbered partition to generate.
--
-- The output consists of, in order: a @-- node<N>@ header comment, the
-- import lines, the optional @src1@ / @sink1@ definitions, the
-- @streamGraphFn@ type signature and definition (a @let@ with one binding
-- per non-singleton vertex), and finally @main@.
generateCodeFromStreamGraph :: GenerateOpts -> [(Integer, StreamGraph)] -> StreamGraph -> (Integer,StreamGraph) -> String
generateCodeFromStreamGraph opts parts cuts (partId, sg) = intercalate "\n" $
    concat
      [ [nodeId]
      , imports'
      , [ possibleSrcFn parts sg
        , possibleSinkFn parts sg
        , sgTypeSignature
        , sgIntro
        ]
      , sgBody
      , [ padding ++ "in " ++ lastIdentifier
        , "\n"
        , "main :: IO ()"
        , nodeFn parts sg partId cuts opts
        ]
      ]
  where
    joinFirst = startsWithJoin sg
    nodeId = "-- node" ++ show partId
    padding = " "
    stream ty = " Stream " ++ ty

    -- A partition starting with a join consumes two input streams.
    sgTypeSignature = "streamGraphFn ::" ++ case startsWith sg of
        Join  -> stream (inType sg) ++ " ->"
              ++ stream (inType sg) ++ " ->"
              ++ stream (outType sg)
        Merge -> stream (outType sg) ++ " ->" ++ stream (outType sg)
        _     -> stream (inType sg) ++ " ->" ++ stream (outType sg)

    sgIntro = "streamGraphFn " ++ sgArgs ++ " = let"
    sgArgs
      | joinFirst = "n1 n2"
      | otherwise = "n1"

    -- First identifier number not already taken by the function arguments.
    firstFree :: Int
    firstFree = if joinFirst then 3 else 2

    sgBody = map (padding ++) $ case intVerts of
        [] -> ["n2 = n1"]
        ns -> zipWith (curry generateCodeFromVertex) [firstFree ..] ns

    imports' = map ("import " ++) (imports opts) ++ ["\n"]

    -- Name of the last binding produced by 'sgBody' (or of the last argument
    -- when there are no internal vertices).
    lastIdentifier = 'n' : show (length intVerts + firstFree - 1)

    intVerts = filter (not . singleton) (vertexList sg)
-- | Generate the @main = ...@ definition for a partition.
--
-- With a single partition everything runs in one @nodeSimple@. Otherwise the
-- form of @main@ depends on the partition's 'NodeType'; a link node is
-- pointed at the next-numbered partition.
nodeFn :: [(Integer, StreamGraph)]
       -> StreamGraph -> Integer -> StreamGraph -> GenerateOpts -> String
nodeFn parts sg partId cuts opts
    | length parts == 1 = "main = nodeSimple src1 streamGraphFn sink1"
    | otherwise = case nodeType sg of
        NodeSource -> generateNodeSrc partId (connectNodeId sg parts cuts) opts parts
        NodeLink   -> generateNodeLink (partId + 1)
        NodeSink   -> generateNodeSink sg
-- | The @src1@ definition for a partition, or the empty string when the
-- partition needs none. It is emitted when there is only one partition or
-- when the partition is a 'NodeSource'.
possibleSrcFn :: Foldable t => t a -> StreamGraph -> String
possibleSrcFn parts sg
    | length parts == 1 = generateSrcFn sg
    | NodeSource <- nodeType sg = generateSrcFn sg
    | otherwise = ""
-- | The @sink1@ definition for a partition, or the empty string when the
-- partition needs none. It is emitted when there is only one partition or
-- when the partition is a 'NodeSink'.
possibleSinkFn :: Foldable t => t a -> StreamGraph -> String
possibleSinkFn parts sg
    | length parts == 1 = generateSinkFn sg
    | NodeSink <- nodeType sg = generateSinkFn sg
    | otherwise = ""
-- | The @src1@ definition for a source partition, the @sink1@ definition for
-- a sink partition, and the empty string for a link partition.
possibleSrcSinkFn :: StreamGraph -> String
possibleSrcSinkFn sg = case nodeType sg of
    NodeSource -> generateSrcFn sg
    NodeLink   -> ""
    NodeSink   -> generateSinkFn sg
-- | The type of the stream leaving a partition, taken from the last vertex
-- of its 'vertexList'. A 'Sink' does not emit a stream, so for a sink the
-- vertex's input type is used instead. Calls 'error' on an empty graph.
outType :: StreamGraph -> String
outType sg = case vertexList sg of
    [] -> error "outType: empty stream graph"
    vs
      | operator node == Sink -> intype node
      | otherwise -> outtype node
      where node = last vs
-- | The type of the stream entering a partition, taken from the first vertex
-- of its 'vertexList'. For a source operator the vertex's output type is
-- used instead of its input type. Calls 'error' on an empty graph.
inType :: StreamGraph -> String
inType sg = case vertexList sg of
    [] -> error "inType: empty stream graph"
    (node : _)
      | isSource (operator node) -> outtype node
      | otherwise -> intype node
-- | Test fixture: a three-vertex pipeline, source -> map -> sink.
t :: StreamGraph
t = path
    [ StreamVertex 0 (Source 1) [[| return 0 |]]       "IO Int" "Int"    1
    , StreamVertex 1 Map        [[| show |]]           "Int"    "String" 2
    , StreamVertex 2 Sink       [[| mapM_ putStrLn |]] "String" "IO ()"  3
    ]
-- | The first partition of @[[0,1],[2]]@ ends in the map vertex, so its
-- output type is that vertex's output type.
test_outType = assertEqual "String" (outType firstPart)
  where firstPart = head (fst (createPartitions t [[0,1],[2]]))
-- | With everything in one partition the last vertex is the sink, so the
-- output type is the sink's input type.
test_outType_sink = assertEqual "String" (outType onlyPart)
  where onlyPart = head (fst (createPartitions t [[0,1,2]]))
-- | The first partition of @[[0,1],[2]]@ starts with the source vertex, so
-- its input type is the source's output type.
test_inType = assertEqual "Int" (inType firstPart)
  where firstPart = head (fst (createPartitions t [[0,1],[2]]))
-- | Find the partitions that a given sub-graph sends data to.
--
-- For every cut edge whose source vertex lies in @sg@, the numbers of all
-- partitions containing the edge's target vertex are collected, in cut-edge
-- order and then partition order. Calls 'error' if no destination is found.
connectNodeId :: StreamGraph -> [(Integer, StreamGraph)] -> StreamGraph -> [Integer]
connectNodeId sg parts cuts =
    case destinations of
        [] -> error "connectNodeId returned an empty list, last vertex optimised away?"
        ids -> ids
  where
    ours = vertexList sg
    destinations =
        [ partNo
        | (from, to) <- edgeList cuts
        , from `elem` ours
        , (partNo, part) <- parts
        , to `elem` vertexList part
        ]
-- | Generate the @src1 = ...@ definition from the parameters of the first
-- vertex of the graph's 'vertexList', one rendered parameter per line.
-- Calls 'error' on an empty graph.
generateSrcFn :: StreamGraph -> String
generateSrcFn sg = "src1 = " ++ intercalate "\n" (map showParam params) ++ "\n"
  where
    params = case vertexList sg of
        (v : _) -> parameters v
        [] -> error "generateSrcFn: empty stream graph"
-- | Generate the @sink1@ signature and definition from the parameters of the
-- last vertex of the graph's 'vertexList', one rendered parameter per line.
-- Calls 'error' on an empty graph.
generateSinkFn :: StreamGraph -> String
generateSinkFn sg =
    "sink1 :: Show a => Stream a -> IO ()\nsink1 = "
        ++ intercalate "\n" (map showParam params)
        ++ "\n"
  where
    params = case reverse (vertexList sg) of
        (v : _) -> parameters v
        [] -> error "generateSinkFn: empty stream graph"
-- | Generate @main@ for a link node. The argument is the number used to form
-- the host name (@node<n>@) that the link connects to.
generateNodeLink :: Integer -> String
generateNodeLink n = concat
    [ "main = nodeLink (defaultLink \"9001\" \"node"
    , show n
    , "\" \"9001\") streamGraphFn"
    ]
-- | Generate @main@ for a source node.
--
-- Arguments: this partition's number, the numbers of the partitions it
-- connects to (only the first is used), the generation options and all
-- numbered partitions.
--
-- The destination host is @node<N>@. The port is 9001, unless the
-- destination partition starts with a join, in which case it is offset by
-- this partition's number so that different sources use different ports.
-- Any 'preSource' code is placed at the start of the @do@ block.
generateNodeSrc :: Integer -> [Integer] -> GenerateOpts -> [(Integer, StreamGraph)] -> String
generateNodeSrc partId nodes opts parts =
    "main = do\n " ++ pref ++ showParam
        [| nodeSource
             (defaultSource $(litE (StringL host)) $(litE (StringL (show port))))
             src1
             streamGraphFn
        |]
  where
    node = case nodes of
        (n : _) -> n
        [] -> error "generateNodeSrc: no destination node given"
    host = "node" ++ show node
    port :: Integer
    port = case lookup node parts of
        Just dest | startsWithJoin dest -> 9001 + partId - 1
        _ -> 9001
    pref = maybe "" (++ "\n ") (preSource opts)
-- | Is the first operator of the graph (see 'startsWith') a 'Join'?
startsWithJoin :: StreamGraph -> Bool
startsWithJoin sg = Join == startsWith sg
-- | The operator of the first vertex in the graph's 'vertexList' that is not
-- the target of any edge within the graph. Calls 'error' when there is no
-- such vertex (e.g. the graph is empty).
startsWith :: StreamGraph -> StreamOperator
startsWith sg = case filter (`notElem` targets) (vertexList sg) of
    (v : _) -> operator v
    [] -> error "startsWith: stream graph has no vertex without an incoming edge"
  where
    targets = map snd (edgeList sg)
-- | A path whose first vertex is a join starts with a join, even though that
-- vertex has the higher id.
test_startsWithJoin_1 = assertBool (startsWithJoin (path joinThenMerge))
  where
    joinThenMerge =
        [ StreamVertex 1 Join  [] "" "" 1
        , StreamVertex 0 Merge [] "" "" 2
        ]
-- | A path whose first vertex is a merge does not start with a join, even
-- though a join follows it.
test_startsWithJoin_2 = assertBool (not (startsWithJoin (path mergeThenJoin)))
  where
    mergeThenJoin =
        [ StreamVertex 0 Merge [] "" "" 3
        , StreamVertex 1 Join  [] "" "" 4
        ]
-- | Generate @main@ for a sink node. A partition that starts with a join
-- listens on two ports (@nodeSink2@); any other listens on one.
generateNodeSink :: StreamGraph -> String
generateNodeSink sg = "main = " ++ showParam body
  where
    body :: ExpQ
    body
      | startsWithJoin sg = [| nodeSink2 streamGraphFn sink1 "9001" "9002" |]
      | otherwise = [| nodeSink (defaultSink "9001") streamGraphFn sink1 |]
-- | Generate one @let@ binding of @streamGraphFn@ for a vertex.
--
-- The 'Int' is the number of the identifier being defined (@n<opid>@); the
-- binding consumes the previous identifier (and, for a join, the one before
-- that as well). A merge simply aliases the previous identifier. Any other
-- operator is applied, with its parenthesised parameters, to the previous
-- identifier.
generateCodeFromVertex :: (Int, StreamVertex) -> String
generateCodeFromVertex (opid, v) = case op of
    Merge -> n ++ " = " ++ n_1
    Join  -> concat [n, " = streamJoin ", n_2, " ", n_1]
    _     -> concat [n, " = (\\s -> ", printOp op, " ", params, " s) ", n_1]
  where
    op = operator v
    ident k = 'n' : show k
    n = ident opid
    n_1 = ident (opid - 1)
    n_2 = ident (opid - 2)
    params = unwords (map (paren . showParam) (parameters v))
-- | Wrap a string in a pair of round brackets.
paren :: String -> String
paren s = concat ["(", s, ")"]
-- | Name of the stream function used in generated code for an operator.
--
-- 'Filter' and 'FilterAcc' carry an argument that is ignored here, so they
-- are mapped explicitly; any other operator is rendered as @"stream"@
-- followed by its 'show' output.
printOp :: StreamOperator -> String
printOp op = case op of
    Filter _    -> "streamFilter"
    FilterAcc _ -> "streamFilterAcc"
    _           -> "stream" ++ show op
-- | Run every HTF test collected for this module.
main :: IO ()
main = htfMain htf_thisModulesTests
-- | Smallest test graph: a 'Source' vertex (id 0) connected to a 'Sink'
-- vertex (id 1).
s0 :: StreamGraph
s0 = connect (Vertex src) (Vertex snk)
  where
    src = StreamVertex 0 (Source 1) [] "String" "String" 1
    snk = StreamVertex 1 Sink       [] "String" "String" 2
-- | Three-vertex test graph: 'Source' (id 0), then 'Filter' (id 1), then
-- 'Sink' (id 2), linked in that order with 'path'.
s1 :: StreamGraph
s1 = path [src, flt, snk]
  where
    src = StreamVertex 0 (Source 1)   [] "String" "String" 3
    flt = StreamVertex 1 (Filter 0.5) [] "String" "String" 4
    snk = StreamVertex 2 Sink         [] "String" "String" 5
-- | Partitioning 's0' into @[[0],[1]]@ and then undoing the partitioning
-- with 'unPartition' must give back 's0'.
test_reform_s0 :: IO ()
test_reform_s0 = assertEqual s0 roundTrip
  where
    roundTrip = unPartition (createPartitions s0 [[0], [1]])
-- | Partitioning 's1' into @[[0,1],[2]]@ and then undoing the partitioning
-- with 'unPartition' must give back 's1'.
test_reform_s1 :: IO ()
test_reform_s1 = assertEqual s1 roundTrip
  where
    roundTrip = unPartition (createPartitions s1 [[0, 1], [2]])
-- | Same round trip as 'test_reform_s1', but cutting 's1' after the first
-- vertex instead: the partition map is @[[0],[1,2]]@.
test_reform_s1_2 :: IO ()
test_reform_s1_2 = assertEqual s1 roundTrip
  where
    roundTrip = unPartition (createPartitions s1 [[0], [1, 2]])
-- | Build the text of a Dockerfile for one generated node.
--
-- The first argument controls whether an @EXPOSE 9001@ line is emitted. The
-- 'packages' field of the options supplies extra package names: when it is
-- non-empty a @RUN cabal install ...@ instruction listing them is added.
-- The newline after that slot is emitted unconditionally, so an empty
-- package list leaves a blank line in the output.
genDockerfile :: Bool -> GenerateOpts -> String
genDockerfile listen opts = concat
    [ "FROM ghcr.io/striot/striot:main\n"
    , "WORKDIR /opt/node\n"
    , "COPY . /opt/node\n"
    , installLine
    , "\n"
    , "RUN ghc node.hs\n"
    , exposeLine
    , "CMD /opt/node/node\n"
    ]
  where
    pkgs = packages opts

    installLine
        | null pkgs = ""
        | otherwise = "RUN cabal install " ++ unwords pkgs

    exposeLine
        | listen    = "EXPOSE 9001\n"
        | otherwise = ""
-- | Example graph containing a 'Merge': 'Source' (id 0), 'Merge' (id 1),
-- 'Map' with @show@ (id 2) and 'Sink' with @mapM_ print@ (id 3), linked in
-- that order with 'path'.
mergeEx :: StreamGraph
mergeEx = path [src, mrg, shw, snk]
  where
    src = StreamVertex 0 (Source 1) []                  "Int"    "Int"    1
    mrg = StreamVertex 1 Merge      []                  "[Int]"  "Int"    2
    shw = StreamVertex 2 Map        [[| show |]]        "Int"    "String" 3
    snk = StreamVertex 3 Sink       [[| mapM_ print |]] "String" "String" 4
-- | Write one generated node to disk.
--
-- Given the node's number and its generated source text, this creates the
-- directory @node\<number\>@ (including missing parents) and writes two files
-- into it: a @Dockerfile@ produced by 'genDockerfile' (always with the
-- listen flag set to 'True') and the source itself as @node.hs@.
writePart :: GenerateOpts -> (Int, String) -> IO ()
writePart opts (x, y) = do
    createDirectoryIfMissing True dir
    writeFile (dir </> "Dockerfile") (genDockerfile True opts)
    writeFile (dir </> "node.hs") y
  where
    dir = "node" ++ show x
-- | Generate the source for every partition of a graph and write each one
-- out with 'writePart'.
--
-- The sources returned by 'generateCode' are numbered from 1 in the order
-- they are returned; that number is what 'writePart' receives.
partitionGraph :: StreamGraph -> PartitionMap -> GenerateOpts -> IO ()
partitionGraph graph partitions opts = mapM_ (writePart opts) numbered
  where
    sources  = generateCode opts graph partitions
    numbered = zip [1..] sources
-- | Spread the vertices of a graph over as many partitions as there are
-- entries in the second argument.
--
-- Only the /length/ of the partition list is used; its elements are never
-- inspected. Vertex ids are taken in the order produced by 'vertexList'.
--
-- * Same number of vertices and partitions: one vertex per partition.
-- * More vertices than partitions: the first partition takes the surplus
--   vertices plus one, and every remaining vertex gets a partition of its
--   own.
-- * Fewer vertices than partitions: calls 'error'.
partitionings :: StreamGraph -> [Partition] -> PartitionMap
partitionings sg parts =
    case compareLength ids parts of
        LT -> error "cannot partition a graph over more partitions than there are nodes"
        EQ -> singletons ids
        GT -> let surplus       = length ids - length parts
                  (front, back) = splitAt (surplus + 1) ids
              in front : singletons back
  where
    ids :: [Int]
    ids = map vertexId (vertexList sg)

    -- Put every id into a partition of its own.
    singletons :: [Int] -> PartitionMap
    singletons = map (:[])
-- | Five-vertex graph used by the 'partitionings' tests: 'Source' (id 0),
-- 'Map' with @show@ (id 1), 'Filter' with @(<3)@ (id 2), 'Window' (id 3)
-- and 'Sink' (id 4), linked in that order with 'path'.
partTestGraph :: StreamGraph
partTestGraph = path [src, mp, flt, win, snk]
  where
    src = StreamVertex 0 (Source 1)   []            "Int"    "Int"      1
    mp  = StreamVertex 1 Map          [[| show |]]  "Int"    "String"   2
    flt = StreamVertex 2 (Filter 0.5) [[| (<3) |]]  "Int"    "Int"      3
    win = StreamVertex 3 Window       []            "String" "[String]" 4
    snk = StreamVertex 4 Sink         []            "String" "String"   5
-- | With as many partitions as vertices, every vertex of 'partTestGraph'
-- ends up alone in its own partition.
test_partitionings_1 :: IO ()
test_partitionings_1 = assertEqual expected actual
  where
    expected = map (:[]) [0..4]
    actual   = partitionings partTestGraph [0..4]
-- | Asking for three partitions of the five-vertex 'partTestGraph' yields
-- exactly three partitions.
test_partitionings_2 :: IO ()
test_partitionings_2 = assertEqual 3 (length result)
  where
    result = partitionings partTestGraph [0..2]
-- | Asking for three partitions of the five-vertex 'partTestGraph' puts
-- three vertices into the first partition.
test_partitionings_3 :: IO ()
test_partitionings_3 = assertEqual 3 (length firstPartition)
  where
    firstPartition = head (partitionings partTestGraph [0..2])