{-# OPTIONS_GHC -F -pgmF htfpp #-}
{-# OPTIONS_HADDOCK prune #-}
{-# LANGUAGE TemplateHaskell #-}
module Striot.Orchestration ( Plan
, Cost
, distributeProgram
, chopAndChange
, viableRewrites
, deriveRewritesAndPartitionings
, allPartitionsPaired
, planCost
, simpleStream
, partitionGraph
, GenerateOpts(..)
, defaultOpts
, htf_thisModulesTests
) where
import Algebra.Graph
import Data.List (nub, sortOn, sort)
import Data.Maybe (fromJust, isJust)
import Test.Framework
import Data.Function ((&))
import Control.Arrow ((>>>))
import Striot.CompileIoT
import Striot.Jackson
import Striot.LogicalOptimiser (applyRules, RewriteRule)
import Striot.Partition
import Striot.StreamGraph
import Striot.VizGraph
import Striot.Bandwidth
-- | A candidate deployment: a 'StreamGraph' paired with the 'PartitionMap'
-- proposed for splitting it up.
type Plan = (StreamGraph, PartitionMap)
-- | The cost assigned to a 'Plan' by 'planCost'. 'Nothing' marks a plan that
-- was rejected; 'viableRewrites' discards those. Among accepted plans
-- 'chopAndChange' picks the smallest 'Just' value.
type Cost = Maybe Int
-- | Select a plan for the supplied 'StreamGraph' with 'chopAndChange' and hand
-- the chosen graph, its 'PartitionMap' and the options on to 'partitionGraph'.
--
-- Inherits the failure mode of 'chopAndChange': if no viable plan exists the
-- call ends in 'error'.
distributeProgram :: GenerateOpts -> StreamGraph -> IO ()
distributeProgram opts sg = partitionGraph rewritten partitioning opts
  where
    -- Lazily destructured, exactly like the original @let@ binding.
    (rewritten, partitioning) = chopAndChange opts sg
-- | Pick the cheapest viable 'Plan' for a 'StreamGraph'.
--
-- The candidates produced by 'viableRewrites' are ordered by their 'Cost' and
-- the first one is returned. 'sortOn' is stable, so candidates with equal cost
-- keep the relative order in which 'viableRewrites' produced them.
--
-- Calls 'error' when 'viableRewrites' yields no candidates at all.
chopAndChange :: GenerateOpts -> StreamGraph -> Plan
chopAndChange opts sg =
    case sortOn snd (viableRewrites opts sg) of
        []                -> error "distributeProgram: no viable programs"
        (cheapest, _) : _ -> cheapest
-- | Enumerate every rewritten-and-partitioned variant of a 'StreamGraph' that
-- 'planCost' accepts, each paired with its cost.
--
-- The candidates come from 'deriveRewritesAndPartitionings' using the rewrite
-- rules held in the options; any candidate whose cost is 'Nothing' is dropped,
-- so every 'Cost' in the result is a 'Just'.
viableRewrites :: GenerateOpts -> StreamGraph -> [(Plan, Cost)]
viableRewrites opts = enumerate >>> map price >>> filter accepted
  where
    -- All (graph, partition map) candidates reachable with the configured rules.
    enumerate = deriveRewritesAndPartitionings (rules opts)
    -- Attach the cost to a candidate, keeping the candidate itself.
    price     = toSnd (planCost opts)
    -- Keep only candidates that were given an actual cost.
    accepted  = isJust . snd
-- | With the default options, the sample 'graph' must have at least one
-- viable plan.
test_viableRewrites_graph :: IO ()
test_viableRewrites_graph = assertNotEmpty (viableRewrites defaultOpts graph)
-- | With the default options, the 'tooMuch' graph must have no viable plan
-- at all.
test_viableRewrites_tooMuch :: IO ()
test_viableRewrites_tooMuch = assertEmpty (viableRewrites defaultOpts tooMuch)
-- | Pair a value with the result of applying a function to it, keeping the
-- original value first.
--
-- > toSnd (+ 1) 1 == (1, 2)
toSnd :: (a -> b) -> a -> (a, b)
toSnd f x = (x, y)
  where
    y = f x
-- | Pair a value with the result of applying a function to it, putting the
-- result first.
--
-- > toFst (+ 1) 1 == (2, 1)
toFst :: (a -> b) -> a -> (b, a)
toFst f x = (y, x)
  where
    y = f x
-- | Produce every candidate 'Plan' for a 'StreamGraph': the graph is expanded
-- with 'applyRules', duplicate graphs are removed, and each remaining graph
-- is paired with all of its partitionings via 'allPartitionsPaired'.
deriveRewritesAndPartitionings :: [RewriteRule] -> StreamGraph -> [Plan]
deriveRewritesAndPartitionings rs sg =
    concatMap allPartitionsPaired distinctRewrites
  where
    -- The literal 5 is the 'Int' argument of 'applyRules'; presumably a bound
    -- on how many rewriting rounds are attempted -- TODO confirm against
    -- Striot.LogicalOptimiser.
    distinctRewrites = nub (applyRules rs 5 sg)
-- | Pair a 'StreamGraph' with each 'PartitionMap' returned for it by
-- 'allPartitions', in the order 'allPartitions' produces them.
allPartitionsPaired :: StreamGraph -> [Plan]
allPartitionsPaired sg = [ (sg, pm) | pm <- allPartitions sg ]
-- | Score a 'Plan'.
--
-- A plan is rejected ('Nothing') as soon as one of these checks fires, tried
-- in this order:
--
--   1. 'isOverUtilised' holds for the operator information of the graph;
--   2. some partition's summed utilisation exceeds 'maxNodeUtil';
--   3. 'overBandwidthLimit' holds for the plan and 'bandwidthLimit'.
--
-- Otherwise the cost is the number of partitions in the 'PartitionMap'.
planCost :: GenerateOpts -> Plan -> Cost
planCost opts (sg, pm)
    | isOverUtilised oi                              = Nothing
    | any (> maxNodeUtil opts) partitionUtils        = Nothing
    | overBandwidthLimit sg pm (bandwidthLimit opts) = Nothing
    | otherwise                                      = Just (length pm)
  where
    -- Per-operator information computed from the graph by 'calcAllSg'.
    oi             = calcAllSg sg
    -- One summed utilisation figure per partition.
    partitionUtils = totalNodeUtilisations oi pm
-- | 'defaultOpts' with @System.Random@ appended to its list of imports.
opts :: GenerateOpts
opts = defaultOpts { imports = inherited ++ extra }
  where
    inherited = imports defaultOpts
    extra     = [ "System.Random" ]
-- | Quoted @IO Int@ action (a Template Haskell expression quotation) used as
-- the parameter of the source vertices 'v1' and 'v2': it draws a random number
-- between 1 and 10, calls @threadDelay 1000000@, prints the number and
-- returns it.
--
-- The quoted tokens are kept exactly as they were, since the quotation is
-- data that is passed on to the stream graph rather than code run here.
source = [| do
    i <- getStdRandom (randomR (1,10)) :: IO Int
    threadDelay 1000000
    putStrLn $ "client sending " ++ (show i)
    return i
  |]
v1 :: StreamVertex
v1 = Int
-> StreamOperator
-> [Q Exp]
-> [Char]
-> [Char]
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [Q Exp
source] [Char]
"Int" [Char]
"Int" Double
0
v2 :: StreamVertex
v2 = Int
-> StreamOperator
-> [Q Exp]
-> [Char]
-> [Char]
-> Double
-> StreamVertex
StreamVertex Int
1 (Double -> StreamOperator
Source Double
1) [Q Exp
source] [Char]
"Int" [Char]
"Int" Double
0
v3 :: StreamVertex
v3 = Int
-> StreamOperator
-> [Q Exp]
-> [Char]
-> [Char]
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Merge [] [Char]
"Int" [Char]
"Int" Double
0
v4 :: StreamVertex
v4 = Int
-> StreamOperator
-> [Q Exp]
-> [Char]
-> [Char]
-> Double
-> StreamVertex
StreamVertex Int
3 (Double -> StreamOperator
Filter Double
0.5) [[| (>3) |]] [Char]
"Int" [Char]
"Int" Double
1
v5 :: StreamVertex
v5 = Int
-> StreamOperator
-> [Q Exp]
-> [Char]
-> [Char]
-> Double
-> StreamVertex
StreamVertex Int
4 StreamOperator
Sink [[| mapM_ print |]] [Char]
"Int" [Char]
"Int" Double
0
-- | Sample graph used by the tests: the path @v1 -> v3 -> v4 -> v5@ overlaid
-- with the path @v2 -> v3@, so both sources feed the merge vertex.
graph :: StreamGraph
graph = mainPath `overlay` secondSource
  where
    mainPath     = path [v1, v3, v4, v5]
    secondSource = path [v2, v3]
-- | A three-stage stream (source, map, sink) built with 'simpleStream'. The
-- tests expect it to be over-utilised ('test_tooMuch_notviable') and to have
-- no viable plan ('test_viableRewrites_tooMuch').
tooMuch :: StreamGraph
tooMuch = simpleStream [producer, incrementer, printer]
  where
    producer    = (Source 1, [[| return 1 |]], "Int", 0)
    -- The only stage whose final 'Double' field is non-zero (2).
    incrementer = (Map, [[| (+1) |]], "Int", 2)
    printer     = (Sink, [[| mapM_ print |]], "Int", 0)
-- | The operator information computed for 'tooMuch' must be reported as
-- over-utilised.
test_tooMuch_notviable :: IO ()
test_tooMuch_notviable = assertBool (isOverUtilised (calcAllSg tooMuch))
-- | Run this module's HTF test suite.
main :: IO ()
main = htfMain $ htf_thisModulesTests
-- | Summed utilisation of each partition in a 'PartitionMap', in the same
-- order as the partitions appear.
--
-- The operator information is first keyed by 'opId' so that
-- 'sumPartitionUtilisation' can look operators up by id.
totalNodeUtilisations :: [OperatorInfo] -> PartitionMap -> [Double]
totalNodeUtilisations oi pm =
    [ sumPartitionUtilisation keyed partition | partition <- pm ]
  where
    keyed = map (toFst opId) oi
-- | Sum the 'util' of every operator listed in one partition.
--
-- The first argument is an association list from operator id to its
-- 'OperatorInfo'; the second is the list of operator ids that make up the
-- partition.
--
-- Every id in the partition is expected to have an entry in the association
-- list. If one is missing this calls 'error' with a message naming the
-- offending id, instead of the uninformative failure of 'fromJust'.
sumPartitionUtilisation :: [(Int, OperatorInfo)] -> [Partition] -> Double
sumPartitionUtilisation opInfo = sum . map operatorUtil
  where
    operatorUtil opid = case lookup opid opInfo of
        Just info -> util info
        Nothing   -> error
            ("sumPartitionUtilisation: no OperatorInfo for operator id "
                ++ show opid)
-- | Nine-stage linear stream used by the partition-utilisation tests:
--
-- > source, map, filter, map, filter, map, filter, map, sink
--
-- Every stage has 1 as its final 'Double' field; every stage except the sink
-- uses the type string @"Int"@, the sink uses @"IO ()"@.
partUtilGraph :: StreamGraph
partUtilGraph = simpleStream $
    [sensor] ++ concat (replicate 3 [convert, threshold]) ++ [convert, printer]
  where
    sensor    = ( Source 1, [[| tempSensor |]], "Int", 1 )
    convert   = ( Map, [[| farToCels |]], "Int", 1 )
    threshold = ( Filter 1, [[| over100 |]], "Int", 1 )
    printer   = ( Sink, [[| mapM_ print |]], "IO ()", 1 )
-- | Every viable plan for 'partUtilGraph' under the default options must use
-- at least three partitions.
test_overUtilisedPartition_minThreePartitions :: IO ()
test_overUtilisedPartition_minThreePartitions =
    assertBool (all ((>= 3) . partitionCount) candidates)
  where
    candidates     = viableRewrites defaultOpts partUtilGraph
    -- Number of partitions in the plan half of a (plan, cost) pair.
    partitionCount = length . snd . fst
-- | Splitting 'partUtilGraph' into the two partitions @[1,2]@ and @[3..9]@
-- must be rejected by 'planCost' under the default options.
test_overUtilisedPartition_rejected :: IO ()
test_overUtilisedPartition_rejected = assertNothing (planCost defaultOpts lopsided)
  where
    lopsided = (partUtilGraph, [[1,2],[3,4,5,6,7,8,9]])
-- | With the bandwidth limit set to 46, the even three-way split of
-- 'partUtilGraph' must be among the viable plans.
--
-- Each partition map is normalised (ids sorted within each partition, then
-- the partitions sorted) before the membership check, so the order in which
-- partitions are generated does not matter.
test_overUtilisedPartition_acceptable :: IO ()
test_overUtilisedPartition_acceptable = assertElem evenSplit normalised
  where
    evenSplit   = [[1,2,3],[4,5,6],[7,8,9]]
    normalised  =
        [ sort (map sort pm)
        | ((_, pm), _) <- viableRewrites limitedOpts partUtilGraph
        ]
    limitedOpts = defaultOpts { bandwidthLimit = 46 }