{-# OPTIONS_GHC -F -pgmF htfpp #-}
{-# OPTIONS_HADDOCK prune #-}
{-# LANGUAGE TemplateHaskell #-}
module Striot.LogicalOptimiser ( applyRules
, applyRule
, firstMatch
, RewriteRule(..)
, defaultRewriteRules
, mapFilter
, filterFilterAcc
, filterAccFilter
, filterAccFilterAcc
, filterFuse
, mapFuse
, mapScan
, expandFilter
, mapFilterAcc
, mapWindow
, expandMap
, expandScan
, expandExpand
, mergeFilter
, mergeExpand
, mergeMap
, mapMerge
, filterMerge
, expandMerge
, mergeFuse
, expandFilterAcc
, filterWindow
, filterAccWindow
, htf_thisModulesTests
) where
import Striot.StreamGraph
import Striot.FunctionalProcessing
import Algebra.Graph
import Test.Framework hiding ((===))
import Data.Char (isLower)
import Data.Maybe (mapMaybe, fromMaybe)
import Data.Function ((&))
import Data.List (nub, sort, intercalate)
import Control.Arrow ((>>>))
type RewriteRule = StreamGraph -> Maybe (StreamGraph -> StreamGraph)
applyRule :: RewriteRule -> StreamGraph -> StreamGraph
applyRule :: RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
f StreamGraph
g = StreamGraph
g StreamGraph -> (StreamGraph -> StreamGraph) -> StreamGraph
forall a b. a -> (a -> b) -> b
& (StreamGraph -> StreamGraph)
-> Maybe (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall a. a -> Maybe a -> a
fromMaybe StreamGraph -> StreamGraph
forall a. a -> a
id (StreamGraph -> RewriteRule -> Maybe (StreamGraph -> StreamGraph)
firstMatch StreamGraph
g RewriteRule
f)
firstMatch :: StreamGraph -> RewriteRule -> Maybe (StreamGraph -> StreamGraph)
firstMatch :: StreamGraph -> RewriteRule -> Maybe (StreamGraph -> StreamGraph)
firstMatch StreamGraph
g RewriteRule
f = case RewriteRule
f StreamGraph
g of
Just StreamGraph -> StreamGraph
f -> (StreamGraph -> StreamGraph) -> Maybe (StreamGraph -> StreamGraph)
forall a. a -> Maybe a
Just StreamGraph -> StreamGraph
f
Maybe (StreamGraph -> StreamGraph)
_ -> case StreamGraph
g of
StreamGraph
Empty -> Maybe (StreamGraph -> StreamGraph)
forall a. Maybe a
Nothing
Vertex StreamVertex
v -> Maybe (StreamGraph -> StreamGraph)
forall a. Maybe a
Nothing
Overlay StreamGraph
a StreamGraph
b -> case StreamGraph -> RewriteRule -> Maybe (StreamGraph -> StreamGraph)
firstMatch StreamGraph
a RewriteRule
f of
Just StreamGraph -> StreamGraph
f -> (StreamGraph -> StreamGraph) -> Maybe (StreamGraph -> StreamGraph)
forall a. a -> Maybe a
Just StreamGraph -> StreamGraph
f
Maybe (StreamGraph -> StreamGraph)
Nothing -> StreamGraph -> RewriteRule -> Maybe (StreamGraph -> StreamGraph)
firstMatch StreamGraph
b RewriteRule
f
Connect StreamGraph
a StreamGraph
b -> case StreamGraph -> RewriteRule -> Maybe (StreamGraph -> StreamGraph)
firstMatch StreamGraph
a RewriteRule
f of
Just StreamGraph -> StreamGraph
f -> (StreamGraph -> StreamGraph) -> Maybe (StreamGraph -> StreamGraph)
forall a. a -> Maybe a
Just StreamGraph -> StreamGraph
f
Maybe (StreamGraph -> StreamGraph)
Nothing -> StreamGraph -> RewriteRule -> Maybe (StreamGraph -> StreamGraph)
firstMatch StreamGraph
b RewriteRule
f
applyRules :: [RewriteRule] -> Int -> StreamGraph -> [StreamGraph]
applyRules :: [RewriteRule] -> Int -> StreamGraph -> [StreamGraph]
applyRules [RewriteRule]
rs Int
n StreamGraph
sg =
if Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
1 then [StreamGraph
sg]
else let
sgs :: [StreamGraph]
sgs = ((StreamGraph -> StreamGraph) -> StreamGraph)
-> [StreamGraph -> StreamGraph] -> [StreamGraph]
forall a b. (a -> b) -> [a] -> [b]
map (StreamGraph -> (StreamGraph -> StreamGraph) -> StreamGraph
forall a b. a -> (a -> b) -> b
(&) StreamGraph
sg) ([StreamGraph -> StreamGraph] -> [StreamGraph])
-> [StreamGraph -> StreamGraph] -> [StreamGraph]
forall a b. (a -> b) -> a -> b
$ (RewriteRule -> Maybe (StreamGraph -> StreamGraph))
-> [RewriteRule] -> [StreamGraph -> StreamGraph]
forall a b. (a -> Maybe b) -> [a] -> [b]
mapMaybe (StreamGraph -> RewriteRule -> Maybe (StreamGraph -> StreamGraph)
firstMatch StreamGraph
sg) [RewriteRule]
rs
in StreamGraph
sg StreamGraph -> [StreamGraph] -> [StreamGraph]
forall a. a -> [a] -> [a]
: [StreamGraph]
sgs [StreamGraph] -> [StreamGraph] -> [StreamGraph]
forall a. [a] -> [a] -> [a]
++ ((StreamGraph -> [StreamGraph]) -> [StreamGraph] -> [StreamGraph]
forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap ([RewriteRule] -> Int -> StreamGraph -> [StreamGraph]
applyRules [RewriteRule]
rs (Int
nInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
1)) [StreamGraph]
sgs)
rules :: [RewriteRule]
rules :: [RewriteRule]
rules = [ RewriteRule
filterFuse
, RewriteRule
mapFilter
, RewriteRule
filterFilterAcc
, RewriteRule
filterAccFilter
, RewriteRule
filterAccFilterAcc
, RewriteRule
mapFuse
, RewriteRule
mapScan
, RewriteRule
expandFilter
, RewriteRule
mapFilterAcc
, RewriteRule
mapWindow
, RewriteRule
expandMap
, RewriteRule
expandScan
, RewriteRule
expandExpand
, RewriteRule
mergeFilter
, RewriteRule
mergeExpand
, RewriteRule
mergeMap
, RewriteRule
mapMerge
, RewriteRule
filterMerge
, RewriteRule
expandMerge
, RewriteRule
mergeFuse
, RewriteRule
expandFilterAcc
]
defaultRewriteRules :: [RewriteRule]
defaultRewriteRules = [RewriteRule]
rules
filterFuse :: RewriteRule
filterFuse :: RewriteRule
filterFuse (Connect (Vertex a :: StreamVertex
a@(StreamVertex Int
i (Filter Double
sel1) (ExpQ
p:[ExpQ]
_) String
ty String
_ Double
s1))
(Vertex b :: StreamVertex
b@(StreamVertex Int
_ (Filter Double
sel2) (ExpQ
q:[ExpQ]
_) String
_ String
_ Double
s2))) =
let c :: StreamVertex
c = StreamVertex
a { operator :: StreamOperator
operator = Double -> StreamOperator
Filter (Double
sel1 Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
sel2)
, parameters :: [ExpQ]
parameters = [[| (\p q x -> p x && q x) $(p) $(q) |]]
, serviceTime :: Double
serviceTime = Double -> Double -> Double -> Double
sumTimes Double
s1 Double
sel1 Double
s2
}
in (StreamGraph -> StreamGraph) -> Maybe (StreamGraph -> StreamGraph)
forall a. a -> Maybe a
Just (StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
removeEdge StreamVertex
c StreamVertex
c (StreamGraph -> StreamGraph)
-> (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (StreamVertex -> Bool)
-> StreamVertex -> StreamGraph -> StreamGraph
forall a. (a -> Bool) -> a -> Graph a -> Graph a
mergeVertices (StreamVertex -> [StreamVertex] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [StreamVertex
a,StreamVertex
b]) StreamVertex
c)
filterFuse StreamGraph
_ = Maybe (StreamGraph -> StreamGraph)
forall a. Maybe a
Nothing
gt3 :: ExpQ
gt3 = [| (>3) |]
lt5 :: ExpQ
lt5 = [| (<5) |]
so' :: StreamVertex
so' = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"String" String
"String" Double
1
f3 :: StreamVertex
f3 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 (Double -> StreamOperator
Filter Double
0.5) [ExpQ
gt3] String
"String" String
"String" Double
0.1
f4 :: StreamVertex
f4 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 (Double -> StreamOperator
Filter Double
0.5) [ExpQ
lt5] String
"String" String
"String" Double
0.2
si' :: StreamVertex
si' = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"String" String
"String" Double
1
fused :: ExpQ
fused = [| (\p q x -> p x && q x) (>3) (<5) |]
filterFusePre :: StreamGraph
filterFusePre = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
so', StreamVertex
f3, StreamVertex
f4, StreamVertex
si']
filterFusePost :: StreamGraph
filterFusePost = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [ StreamVertex
so'
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 (Double -> StreamOperator
Filter Double
0.25) [ExpQ
fused] String
"String" String
"String" Double
0.2
, StreamVertex
si' ]
test_filterFuse :: IO ()
test_filterFuse = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual (RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
filterFuse StreamGraph
filterFusePre)
StreamGraph
filterFusePost
mapFilter :: RewriteRule
mapFilter :: RewriteRule
mapFilter (Connect (Vertex m :: StreamVertex
m@(StreamVertex Int
i StreamOperator
Map (ExpQ
f:[ExpQ]
_) String
intype String
_ Double
sm))
(Vertex f1 :: StreamVertex
f1@(StreamVertex Int
j (Filter Double
sel) (ExpQ
p:[ExpQ]
_) String
_ String
_ Double
sf))) =
let f2 :: StreamVertex
f2 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
i (Double -> StreamOperator
Filter Double
sel) [[| $(p) . $(f) |]] String
intype String
intype (Double
smDouble -> Double -> Double
forall a. Num a => a -> a -> a
+Double
sf)
m2 :: StreamVertex
m2 = StreamVertex
m { vertexId :: Int
vertexId = Int
j }
in (StreamGraph -> StreamGraph) -> Maybe (StreamGraph -> StreamGraph)
forall a. a -> Maybe a
Just (StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
replaceVertex StreamVertex
f1 StreamVertex
m2 (StreamGraph -> StreamGraph)
-> (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
replaceVertex StreamVertex
m StreamVertex
f2)
mapFilter StreamGraph
_ = Maybe (StreamGraph -> StreamGraph)
forall a. Maybe a
Nothing
m1 :: StreamVertex
m1 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Map [[| show |]] String
"Int" String
"String" Double
1
f1 :: StreamVertex
f1 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 (Double -> StreamOperator
Filter Double
0.5) [[| \x -> length x <3 |]] String
"String" String
"String" Double
1
f2 :: StreamVertex
f2 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 (Double -> StreamOperator
Filter Double
0.5) [[| (\x -> length x <3) . (show) |]] String
"Int" String
"Int" Double
2
m2 :: StreamVertex
m2 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Map [[| show |]] String
"Int" String
"String" Double
1
so :: StreamVertex
so = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
1
si :: StreamVertex
si = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"String" String
"String" Double
1
mapFilterPre :: StreamGraph
mapFilterPre = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [ StreamVertex
so, StreamVertex
m1, StreamVertex
f1, StreamVertex
si ]
mapFilterPost :: StreamGraph
mapFilterPost = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [ StreamVertex
so, StreamVertex
f2, StreamVertex
m2, StreamVertex
si ]
test_mapfilter2 :: IO ()
test_mapfilter2 = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual StreamGraph
mapFilterPost
(StreamGraph -> IO ()) -> StreamGraph -> IO ()
forall a b. (a -> b) -> a -> b
$ RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
mapFilter StreamGraph
mapFilterPre
mapFilterSub :: StreamGraph
mapFilterSub = StreamGraph
mapFilterPre StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
`Overlay` StreamGraph
forall a. Graph a
Empty
test_mapfilter3 :: IO ()
test_mapfilter3 = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual StreamGraph
mapFilterPost
(StreamGraph -> IO ()) -> StreamGraph -> IO ()
forall a b. (a -> b) -> a -> b
$ RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
mapFilter StreamGraph
mapFilterSub
mapFilterSub2 :: StreamGraph
mapFilterSub2 = StreamGraph
forall a. Graph a
Empty StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
`Overlay` StreamGraph
forall a. Graph a
Empty StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
`Overlay` StreamGraph
mapFilterPre StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
`Overlay` StreamGraph
mapFilterPre
test_mapfilter4 :: IO ()
test_mapfilter4 = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual StreamGraph
mapFilterPost
(StreamGraph -> IO ()) -> StreamGraph -> IO ()
forall a b. (a -> b) -> a -> b
$ RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
mapFilter StreamGraph
mapFilterSub2
test_mapFilter_ord :: IO ()
test_mapFilter_ord = HasCallStack => Bool -> IO ()
Bool -> IO ()
assertBool (Bool -> IO ()) -> Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ [Int] -> Bool
forall a. Ord a => [a] -> Bool
sorted ([Int] -> Bool) -> [Int] -> Bool
forall a b. (a -> b) -> a -> b
$ (StreamVertex -> Int) -> [StreamVertex] -> [Int]
forall a b. (a -> b) -> [a] -> [b]
map StreamVertex -> Int
vertexId (StreamGraph -> [StreamVertex]
forall a. Ord a => Graph a -> [a]
vertexList StreamGraph
mapFilterPost)
sorted :: Ord a => [a] -> Bool
sorted :: forall a. Ord a => [a] -> Bool
sorted [] = Bool
True
sorted [a
x] = Bool
True
sorted (a
x:a
y:[a]
zz) = (a
x a -> a -> Bool
forall a. Ord a => a -> a -> Bool
<= a
y) Bool -> Bool -> Bool
&& [a] -> Bool
forall a. Ord a => [a] -> Bool
sorted (a
ya -> [a] -> [a]
forall a. a -> [a] -> [a]
:[a]
zz)
filterFilterAcc :: RewriteRule
filterFilterAcc :: RewriteRule
filterFilterAcc (Connect (Vertex v1 :: StreamVertex
v1@(StreamVertex Int
i (Filter Double
sel1) (ExpQ
p:[ExpQ]
_) String
ty String
_ Double
s1))
(Vertex v2 :: StreamVertex
v2@(StreamVertex Int
_ (FilterAcc Double
sel2) (ExpQ
f:ExpQ
a:ExpQ
q:[ExpQ]
_) String
_ String
_ Double
s2))) =
let v3 :: StreamVertex
v3 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
i (Double -> StreamOperator
FilterAcc (Double
sel1Double -> Double -> Double
forall a. Num a => a -> a -> a
*Double
sel2))
[ [| \a v -> if $(p) v then $(f) a v else a |]
, ExpQ
a
, [| \v a -> $(p) v && $(q) v a |] ] String
ty String
ty (Double -> Double -> Double -> Double
sumTimes Double
s1 Double
sel1 Double
s2)
in (StreamGraph -> StreamGraph) -> Maybe (StreamGraph -> StreamGraph)
forall a. a -> Maybe a
Just (StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
removeEdge StreamVertex
v3 StreamVertex
v3 (StreamGraph -> StreamGraph)
-> (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (StreamVertex -> Bool)
-> StreamVertex -> StreamGraph -> StreamGraph
forall a. (a -> Bool) -> a -> Graph a -> Graph a
mergeVertices (StreamVertex -> [StreamVertex] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [StreamVertex
v1,StreamVertex
v2]) StreamVertex
v3)
filterFilterAcc StreamGraph
_ = Maybe (StreamGraph -> StreamGraph)
forall a. Maybe a
Nothing
filterFilterAccPre :: StreamGraph
filterFilterAccPre = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 (Double -> StreamOperator
Filter Double
0.5) [ExpQ
p] String
"Int" String
"Int" Double
0.1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 (Double -> StreamOperator
FilterAcc Double
0.5) [ExpQ
f , ExpQ
a , ExpQ
q] String
"Int" String
"Int" Double
0.2
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"Int" String
"Int" Double
1
]
where p :: ExpQ
p = [| (>3) |]
f :: ExpQ
f = [| (\_ h -> (False, h)) |]
a :: ExpQ
a = [| (True, undefined) |]
q :: ExpQ
q = [| \new (b,old) -> b || old /= new |]
filterFilterAccPost :: StreamGraph
filterFilterAccPost = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 (Double -> StreamOperator
FilterAcc Double
0.25)
[ [| \a v -> if $(p) v then $(f) a v else a |]
, ExpQ
a
, [| \v a -> $(p) v && $(q) v a |]
] String
"Int" String
"Int" Double
0.2
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"Int" String
"Int" Double
1
]
where p :: ExpQ
p = [| (>3) |]
f :: ExpQ
f = [| (\_ h -> (False, h)) |]
a :: ExpQ
a = [| (True, undefined) |]
q :: ExpQ
q = [| \new (b,old) -> b || old /= new |]
test_filterFilterAcc :: IO ()
test_filterFilterAcc = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual (RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
filterFilterAcc StreamGraph
filterFilterAccPre)
StreamGraph
filterFilterAccPost
filterAccFilter :: RewriteRule
filterAccFilter :: RewriteRule
filterAccFilter (Connect (Vertex v1 :: StreamVertex
v1@(StreamVertex Int
i (FilterAcc Double
sel1) (ExpQ
f:ExpQ
a:ExpQ
p:[ExpQ]
_) String
ty String
_ Double
s1))
(Vertex v2 :: StreamVertex
v2@(StreamVertex Int
_ (Filter Double
sel2) (ExpQ
q:[ExpQ]
_) String
_ String
_ Double
s2))) =
let p' :: ExpQ
p' = [| \v a -> $(p) v a && $(q) v |]
v :: StreamVertex
v = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
i (Double -> StreamOperator
FilterAcc (Double
sel1Double -> Double -> Double
forall a. Num a => a -> a -> a
*Double
sel2)) [ExpQ
f,ExpQ
a,ExpQ
p'] String
ty String
ty (Double -> Double -> Double -> Double
sumTimes Double
s1 Double
sel1 Double
s2)
in (StreamGraph -> StreamGraph) -> Maybe (StreamGraph -> StreamGraph)
forall a. a -> Maybe a
Just (StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
removeEdge StreamVertex
v StreamVertex
v (StreamGraph -> StreamGraph)
-> (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (StreamVertex -> Bool)
-> StreamVertex -> StreamGraph -> StreamGraph
forall a. (a -> Bool) -> a -> Graph a -> Graph a
mergeVertices (StreamVertex -> [StreamVertex] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [StreamVertex
v1,StreamVertex
v2]) StreamVertex
v)
filterAccFilter StreamGraph
_ = Maybe (StreamGraph -> StreamGraph)
forall a. Maybe a
Nothing
filterAccFilterPre :: StreamGraph
filterAccFilterPre = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 (Double -> StreamOperator
FilterAcc Double
0.5) [ExpQ
f,ExpQ
a,ExpQ
p] String
"Int" String
"Int" Double
0.1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 (Double -> StreamOperator
Filter Double
0.5) [ExpQ
q] String
"Int" String
"Int" Double
0.2
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"Int" String
"Int" Double
1
]
where f :: ExpQ
f = [| (\_ h -> (False, h)) |]
a :: ExpQ
a = [| (True, undefined) |]
p :: ExpQ
p = [| \new (b,old) -> b || old /= new |]
q :: ExpQ
q = [| (>3) |]
filterAccFilterPost :: StreamGraph
filterAccFilterPost = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 (Double -> StreamOperator
FilterAcc Double
0.25) [ExpQ
f, ExpQ
a, ExpQ
p'] String
"Int" String
"Int" Double
0.2
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"Int" String
"Int" Double
1
]
where f :: ExpQ
f = [| (\_ h -> (False, h)) |]
a :: ExpQ
a = [| (True, undefined) |]
p :: ExpQ
p = [| \new (b,old) -> b || old /= new |]
q :: ExpQ
q = [| (>3) |]
p' :: ExpQ
p'= [| \v a -> $(p) v a && $(q) v |]
test_filterAccFilter :: IO ()
test_filterAccFilter = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual (RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
filterAccFilter StreamGraph
filterAccFilterPre)
StreamGraph
filterAccFilterPost
filterAccFilterAcc :: RewriteRule
filterAccFilterAcc :: RewriteRule
filterAccFilterAcc (Connect (Vertex v1 :: StreamVertex
v1@(StreamVertex Int
i (FilterAcc Double
sel1) (ExpQ
f:ExpQ
a:ExpQ
p:[ExpQ]
ss) String
ty String
_ Double
s1))
(Vertex v2 :: StreamVertex
v2@(StreamVertex Int
_ (FilterAcc Double
sel2) (ExpQ
g:ExpQ
b:ExpQ
q:[ExpQ]
_) String
_ String
_ Double
s2))) =
let f' :: ExpQ
f' = [| \ (a,b) v -> ($(f) a v, if $(p) v a then $(g) b v else b) |]
a' :: ExpQ
a' = [| ($(a), $(b)) |]
q' :: ExpQ
q' = [| \v (y,z) -> $(p) v y && $(q) v z |]
v :: StreamVertex
v = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
i (Double -> StreamOperator
FilterAcc (Double
sel1Double -> Double -> Double
forall a. Num a => a -> a -> a
*Double
sel2)) (ExpQ
f'ExpQ -> [ExpQ] -> [ExpQ]
forall a. a -> [a] -> [a]
:ExpQ
a'ExpQ -> [ExpQ] -> [ExpQ]
forall a. a -> [a] -> [a]
:ExpQ
q'ExpQ -> [ExpQ] -> [ExpQ]
forall a. a -> [a] -> [a]
:[ExpQ]
ss) String
ty String
ty (Double -> Double -> Double -> Double
sumTimes Double
s1 Double
sel1 Double
s2)
in (StreamGraph -> StreamGraph) -> Maybe (StreamGraph -> StreamGraph)
forall a. a -> Maybe a
Just (StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
removeEdge StreamVertex
v StreamVertex
v (StreamGraph -> StreamGraph)
-> (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (StreamVertex -> Bool)
-> StreamVertex -> StreamGraph -> StreamGraph
forall a. (a -> Bool) -> a -> Graph a -> Graph a
mergeVertices (StreamVertex -> [StreamVertex] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [StreamVertex
v1,StreamVertex
v2]) StreamVertex
v)
filterAccFilterAcc StreamGraph
_ = Maybe (StreamGraph -> StreamGraph)
forall a. Maybe a
Nothing
filterAccFilterAccPre :: StreamGraph
filterAccFilterAccPre = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 (Double -> StreamOperator
FilterAcc Double
0.5) [ExpQ
f,ExpQ
a,ExpQ
p] String
"Int" String
"Int" Double
0.1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 (Double -> StreamOperator
FilterAcc Double
0.5) [ExpQ
g,ExpQ
b,ExpQ
q] String
"Int" String
"Int" Double
0.2
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"Int" String
"Int" Double
1
]
where
f :: ExpQ
f = [| (\_ h -> (False, h)) |]
a :: ExpQ
a = [| (True, undefined) |]
p :: ExpQ
p = [| \new (b,old) -> b || old /= new |]
g :: ExpQ
g = [| \_ v -> v |]
b :: ExpQ
b = [| 0 |]
q :: ExpQ
q = [| (>=) |]
filterAccFilterAccPost :: StreamGraph
filterAccFilterAccPost = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 (Double -> StreamOperator
FilterAcc Double
0.25)
[ [| \(a,b) v -> ($(f) a v, if $(p) v a then $(g) b v else b) |]
, [| ($(a),$(b)) |]
, [| \v (y,z) -> $(p) v y && $(q) v z |]
] String
"Int" String
"Int" Double
0.2
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"Int" String
"Int" Double
1
]
where f :: ExpQ
f = [| (\_ h -> (False, h)) |]
a :: ExpQ
a = [| (True, undefined) |]
p :: ExpQ
p = [| \new (b,old) -> b || old /= new |]
g :: ExpQ
g = [| \_ v -> v |]
b :: ExpQ
b = [| 0 |]
q :: ExpQ
q = [| (>=) |]
test_filterAccFilterAcc :: IO ()
test_filterAccFilterAcc = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual (RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
filterAccFilterAcc StreamGraph
filterAccFilterAccPre)
StreamGraph
filterAccFilterAccPost
mapFuse :: RewriteRule
mapFuse :: RewriteRule
mapFuse (Connect (Vertex v1 :: StreamVertex
v1@(StreamVertex Int
i StreamOperator
Map (ExpQ
f:[ExpQ]
ss) String
t1 String
_ Double
s1))
(Vertex v2 :: StreamVertex
v2@(StreamVertex Int
_ StreamOperator
Map (ExpQ
g:[ExpQ]
_) String
_ String
t2 Double
s2))) =
let v :: StreamVertex
v = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
i StreamOperator
Map ([| $(f) >>> $(g) |]ExpQ -> [ExpQ] -> [ExpQ]
forall a. a -> [a] -> [a]
:[ExpQ]
ss) String
t1 String
t2 (Double -> Double -> Double -> Double
sumTimes Double
s1 Double
1 Double
s2)
in (StreamGraph -> StreamGraph) -> Maybe (StreamGraph -> StreamGraph)
forall a. a -> Maybe a
Just (StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
removeEdge StreamVertex
v StreamVertex
v (StreamGraph -> StreamGraph)
-> (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (StreamVertex -> Bool)
-> StreamVertex -> StreamGraph -> StreamGraph
forall a. (a -> Bool) -> a -> Graph a -> Graph a
mergeVertices (StreamVertex -> [StreamVertex] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [StreamVertex
v1,StreamVertex
v2]) StreamVertex
v)
mapFuse StreamGraph
_ = Maybe (StreamGraph -> StreamGraph)
forall a. Maybe a
Nothing
mapFusePre :: StreamGraph
mapFusePre = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"String" String
"String" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Map [[| show |]] String
"Int" String
"String" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Map [[| length |]] String
"String" String
"Int" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"Int" String
"Int" Double
1
]
mapFusePost :: StreamGraph
mapFusePost = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"String" String
"String" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Map [[| show >>> length |]] String
"Int" String
"Int" Double
2
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"Int" String
"Int" Double
1
]
test_mapFuse :: IO ()
test_mapFuse = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual (RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
mapFuse StreamGraph
mapFusePre) StreamGraph
mapFusePost
mapScan :: RewriteRule
mapScan :: RewriteRule
mapScan (Connect (Vertex v1 :: StreamVertex
v1@(StreamVertex Int
i StreamOperator
Map (ExpQ
f:[ExpQ]
ss) String
t1 String
_ Double
s1))
(Vertex v2 :: StreamVertex
v2@(StreamVertex Int
_ StreamOperator
Scan (ExpQ
g:ExpQ
a:[ExpQ]
_) String
_ String
t2 Double
s2))) =
let v :: StreamVertex
v = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
i StreamOperator
Scan ([| flip (flip $(f) >>> $(g)) |]ExpQ -> [ExpQ] -> [ExpQ]
forall a. a -> [a] -> [a]
:ExpQ
aExpQ -> [ExpQ] -> [ExpQ]
forall a. a -> [a] -> [a]
:[ExpQ]
ss) String
t1 String
t2 (Double -> Double -> Double -> Double
sumTimes Double
s1 Double
1 Double
s2)
in (StreamGraph -> StreamGraph) -> Maybe (StreamGraph -> StreamGraph)
forall a. a -> Maybe a
Just (StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
removeEdge StreamVertex
v StreamVertex
v (StreamGraph -> StreamGraph)
-> (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (StreamVertex -> Bool)
-> StreamVertex -> StreamGraph -> StreamGraph
forall a. (a -> Bool) -> a -> Graph a -> Graph a
mergeVertices (StreamVertex -> [StreamVertex] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [StreamVertex
v1,StreamVertex
v2]) StreamVertex
v)
mapScan StreamGraph
_ = Maybe (StreamGraph -> StreamGraph)
forall a. Maybe a
Nothing
mapScanPre :: StreamGraph
mapScanPre = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Map [ExpQ
f] String
"Int" String
"Int" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Scan [ExpQ
g,ExpQ
a] String
"Int" String
"Int" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"Int" String
"Int" Double
1
]
where
f :: ExpQ
f = [| (+1) |]
g :: ExpQ
g = [| \c _ -> c +1|]
a :: ExpQ
a = [| 0 |]
mapScanPost :: StreamGraph
mapScanPost = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Scan [[| flip (flip $(f) >>> $(g))|], [| $(a) |]] String
"Int" String
"Int" Double
2
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"Int" String
"Int" Double
1
]
where
f :: ExpQ
f = [| (+1) |]
g :: ExpQ
g = [| \c _ -> c +1|]
a :: ExpQ
a = [| 0 |]
test_mapScan :: IO ()
test_mapScan = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual (RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
mapScan StreamGraph
mapScanPre) StreamGraph
mapScanPost
expandFilter :: RewriteRule
expandFilter :: RewriteRule
expandFilter (Connect (Vertex e :: StreamVertex
e@(StreamVertex Int
j StreamOperator
Expand [ExpQ]
_ String
t1 String
t2 Double
se))
(Vertex f :: StreamVertex
f@(StreamVertex Int
i (Filter Double
_) (ExpQ
p:[ExpQ]
_) String
_ String
_ Double
sf))) =
let m :: StreamVertex
m = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
j StreamOperator
Map [[| filter $(p) |]] String
t1 String
t1 Double
sf
e' :: StreamVertex
e'= Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
i StreamOperator
Expand [] String
t1 String
t2 Double
se
in (StreamGraph -> StreamGraph) -> Maybe (StreamGraph -> StreamGraph)
forall a. a -> Maybe a
Just (StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
replaceVertex StreamVertex
f StreamVertex
e' (StreamGraph -> StreamGraph)
-> (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
replaceVertex StreamVertex
e StreamVertex
m)
expandFilter StreamGraph
_ = Maybe (StreamGraph -> StreamGraph)
forall a. Maybe a
Nothing
expandFilterPre :: StreamGraph
expandFilterPre = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"[Int]" String
"[Int]" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Expand [] String
"[Int]" String
"Int" Double
2
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 (Double -> StreamOperator
Filter Double
0.5) [[|$(p)|]] String
"Int" String
"Int" Double
3
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"Int" String
"Int" Double
4
]
where
p :: ExpQ
p = [| (>3) |]
expandFilterPost :: StreamGraph
expandFilterPost = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"[Int]" String
"[Int]" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Map [[|filter $(p) |]] String
"[Int]" String
"[Int]" Double
3
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Expand [] String
"[Int]" String
"Int" Double
2
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"Int" String
"Int" Double
4
]
where
p :: ExpQ
p = [| (>3) |]
test_expandFilter :: IO ()
test_expandFilter = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual (RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
expandFilter StreamGraph
expandFilterPre) StreamGraph
expandFilterPost
mapFilterAcc :: RewriteRule
mapFilterAcc :: RewriteRule
mapFilterAcc (Connect (Vertex m :: StreamVertex
m@(StreamVertex Int
i StreamOperator
Map (ExpQ
f:[ExpQ]
_) String
t1 String
_ Double
sm))
(Vertex f1 :: StreamVertex
f1@(StreamVertex Int
j (FilterAcc Double
sel) (ExpQ
g:ExpQ
a:ExpQ
p:[ExpQ]
_) String
_ String
_ Double
sf))) =
let f2 :: StreamVertex
f2 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
i (Double -> StreamOperator
FilterAcc Double
sel) [ExpQ
g, ExpQ
a, [| ($f) >>> $(p) |]] String
t1 String
t1 (Double -> Double -> Double -> Double
sumTimes Double
sm Double
1 Double
sf)
m2 :: StreamVertex
m2 = StreamVertex
m { vertexId :: Int
vertexId = Int
j }
in (StreamGraph -> StreamGraph) -> Maybe (StreamGraph -> StreamGraph)
forall a. a -> Maybe a
Just (StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
replaceVertex StreamVertex
f1 StreamVertex
m2 (StreamGraph -> StreamGraph)
-> (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
replaceVertex StreamVertex
m StreamVertex
f2)
mapFilterAcc StreamGraph
_ = Maybe (StreamGraph -> StreamGraph)
forall a. Maybe a
Nothing
mapFilterAccPre :: StreamGraph
mapFilterAccPre = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Map [ExpQ
f] String
"Int" String
"String" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 (Double -> StreamOperator
FilterAcc Double
0.5) [ExpQ
g,ExpQ
a,ExpQ
p] String
"String" String
"String" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"String" String
"String" Double
1
]
where
f :: ExpQ
f = [| (+1) |]
g :: ExpQ
g = [| (\_ h -> (False, h)) |]
a :: ExpQ
a = [| (True, undefined) |]
p :: ExpQ
p = [| \new (b,old) -> b || old /= new |]
mapFilterAccPost :: StreamGraph
mapFilterAccPost = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 (Double -> StreamOperator
FilterAcc Double
0.5) [ExpQ
g,ExpQ
a, [| $(f) >>> $(p) |]] String
"Int" String
"Int" Double
2
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Map [ExpQ
f] String
"Int" String
"String" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"String" String
"String" Double
1
]
where
f :: ExpQ
f = [| (+1) |]
g :: ExpQ
g = [| (\_ h -> (False, h)) |]
a :: ExpQ
a = [| (True, undefined) |]
p :: ExpQ
p = [| \new (b,old) -> b || old /= new |]
test_mapFilterAcc :: IO ()
test_mapFilterAcc = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual (RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
mapFilterAcc StreamGraph
mapFilterAccPre) StreamGraph
mapFilterAccPost
isTypeVariable :: String -> Bool
isTypeVariable :: String -> Bool
isTypeVariable = Char -> Bool
isLower (Char -> Bool) -> (String -> Char) -> String -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Char
forall a. [a] -> a
head
compatibleTypes :: String -> String -> Bool
compatibleTypes :: String -> String -> Bool
compatibleTypes String
outT String
inT | String
outT String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
inT = Bool
True
| String -> Bool
isTypeVariable String
inT = Bool
True
| String -> Bool
isTypeVariable String
outT= Bool
True
| Bool
otherwise = Bool
False
test_compatibleTypes :: IO ()
test_compatibleTypes = HasCallStack => Bool -> IO ()
Bool -> IO ()
assertBool (Bool -> IO ()) -> Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> String -> Bool
compatibleTypes String
"Int" String
"a"
mapWindow :: RewriteRule
mapWindow :: RewriteRule
mapWindow (Connect (Vertex m :: StreamVertex
m@(StreamVertex Int
i StreamOperator
Map (ExpQ
f:[ExpQ]
_) String
mapInT String
mapOutT Double
sm))
(Vertex w :: StreamVertex
w@(StreamVertex Int
j StreamOperator
Window (ExpQ
wm:[ExpQ]
_) String
windowInT String
windowOutT Double
sw))) =
if Bool -> Bool
not (String -> String -> Bool
compatibleTypes String
mapInT String
windowInT)
then Maybe (StreamGraph -> StreamGraph)
forall a. Maybe a
Nothing
else let
w2 :: StreamVertex
w2 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
i StreamOperator
Window [ExpQ
wm] String
windowInT String
windowOutT Double
sw
m2 :: StreamVertex
m2 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
j StreamOperator
Map [[| map $(f) |]] (String
"["String -> String -> String
forall a. [a] -> [a] -> [a]
++String
mapInTString -> String -> String
forall a. [a] -> [a] -> [a]
++String
"]") (String
"["String -> String -> String
forall a. [a] -> [a] -> [a]
++String
mapOutTString -> String -> String
forall a. [a] -> [a] -> [a]
++String
"]") Double
sm
in (StreamGraph -> StreamGraph) -> Maybe (StreamGraph -> StreamGraph)
forall a. a -> Maybe a
Just (StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
replaceVertex StreamVertex
m StreamVertex
w2 (StreamGraph -> StreamGraph)
-> (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
replaceVertex StreamVertex
w StreamVertex
m2)
mapWindow StreamGraph
_ = Maybe (StreamGraph -> StreamGraph)
forall a. Maybe a
Nothing
mapWindowPre :: StreamGraph
mapWindowPre = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Map [[| (+1) |]] String
"Int" String
"Int" Double
2
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Window [[| chop 2 |]] String
"Int" String
"[Int]" Double
3
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"[Int]" String
"[Int]" Double
4
]
mapWindowPost :: StreamGraph
mapWindowPost = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Window [[| chop 2 |]] String
"Int" String
"[Int]" Double
3
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Map [[| map (+1) |]] String
"[Int]" String
"[Int]" Double
2
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"[Int]" String
"[Int]" Double
4
]
test_mapWindow :: IO ()
test_mapWindow = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual (RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
mapWindow StreamGraph
mapWindowPre) StreamGraph
mapWindowPost
mapWindowPre2 :: StreamGraph
mapWindowPre2 = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 StreamOperator
Map [[| read :: String -> Int |]] String
"String" String
"Int" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Window [[| chop 2 :: WindowMaker Int|]] String
"Int" String
"[Int]" Double
1
]
test_mapWindow2 :: IO ()
test_mapWindow2 = Maybe (StreamGraph -> StreamGraph) -> IO ()
forall a. HasCallStack => Maybe a -> IO ()
assertNothingNoShow (RewriteRule
mapWindow StreamGraph
mapWindowPre2)
mapWindowPre3 :: StreamGraph
mapWindowPre3 = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"(Int,Int)" String
"(Int,Int)" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Map [[|fst|]] String
"(Int,Int)" String
"Int" Double
0.0
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Window [[|chop 2|]] String
"a" String
"[a]" Double
0.0
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"[Int]" String
"[Int]" Double
4
]
mapWindowPost3 :: StreamGraph
mapWindowPost3 = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"(Int,Int)" String
"(Int,Int)" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Window [[|chop 2|]] String
"a" String
"[a]" Double
0.0
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Map [[|map fst|]] String
"[(Int,Int)]" String
"[Int]" Double
0.0
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"[Int]" String
"[Int]" Double
4
]
test_mapWindow3 :: IO ()
test_mapWindow3 = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual StreamGraph
mapWindowPost3 (RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
mapWindow StreamGraph
mapWindowPre3)
mapWindowPre4 :: StreamGraph
mapWindowPre4 = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Map [[|(+1)|]] String
"a" String
"a" Double
0.0
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Window [[|chop 2|]] String
"Int" String
"[Int]" Double
0.0
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"[Int]" String
"[Int]" Double
4
]
mapWindowPost4 :: StreamGraph
mapWindowPost4 = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Window [[|chop 2|]] String
"Int" String
"[Int]" Double
0.0
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Map [[|map (+1)|]] String
"[a]" String
"[a]" Double
0.0
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"[Int]" String
"[Int]" Double
4
]
test_mapWindow4 :: IO ()
test_mapWindow4 = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual StreamGraph
mapWindowPost4 (RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
mapWindow StreamGraph
mapWindowPre4)
mapWindowPre5 :: StreamGraph
mapWindowPre5 = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"String" String
"String" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Map [[|show|]] String
"a" String
"b" Double
0.0
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Window [[|chop 2|]] String
"String" String
"[String]" Double
0.0
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"[String]" String
"[String]" Double
4
]
test_mapWindow5 :: IO ()
test_mapWindow5 = Maybe (StreamGraph -> StreamGraph) -> IO ()
forall a. HasCallStack => Maybe a -> IO ()
assertNothingNoShow (RewriteRule
mapWindow StreamGraph
mapWindowPre5)
mapWindowPre6 :: StreamGraph
mapWindowPre6 = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"String" String
"String" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Map [[|show|]] String
"a" String
"b" Double
0.0
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Window [[|chop 2|]] String
"c" String
"[c]" Double
0.0
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"[String]" String
"[String]" Double
4
]
mapWindowPost6 :: StreamGraph
mapWindowPost6 = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"String" String
"String" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Window [[|chop 2|]] String
"c" String
"[c]" Double
0.0
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Map [[|map show|]] String
"[a]" String
"[b]" Double
0.0
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"[String]" String
"[String]" Double
4
]
test_mapWindow6 :: IO ()
test_mapWindow6 = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual StreamGraph
mapWindowPost6 (RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
mapWindow StreamGraph
mapWindowPre6)
expandMap :: RewriteRule
expandMap :: RewriteRule
expandMap (Connect (Vertex e :: StreamVertex
e@(StreamVertex Int
i StreamOperator
Expand [ExpQ]
_ String
t1 String
_ Double
se))
(Vertex m :: StreamVertex
m@(StreamVertex Int
j StreamOperator
Map (ExpQ
f:[ExpQ]
_) String
_ String
t4 Double
sm))) =
let t5 :: String
t5 = String
"[" String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
t4 String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
"]"
m2 :: StreamVertex
m2 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
i StreamOperator
Map [[| map $(f) |]] String
t1 String
t5 Double
sm
e2 :: StreamVertex
e2 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
j StreamOperator
Expand [] String
t5 String
t4 Double
se
in (StreamGraph -> StreamGraph) -> Maybe (StreamGraph -> StreamGraph)
forall a. a -> Maybe a
Just (StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
replaceVertex StreamVertex
m StreamVertex
e2 (StreamGraph -> StreamGraph)
-> (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
replaceVertex StreamVertex
e StreamVertex
m2)
expandMap StreamGraph
_ = Maybe (StreamGraph -> StreamGraph)
forall a. Maybe a
Nothing
expandMapPre :: StreamGraph
expandMapPre = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"[Int]" String
"[Int]" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Expand [] String
"[Int]" String
"Int" Double
2
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Map [[| show |]] String
"Int" String
"String" Double
3
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"String" String
"String" Double
4
]
expandMapPost :: StreamGraph
expandMapPost = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"[Int]" String
"[Int]" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Map [[| map (show) |]] String
"[Int]" String
"[String]" Double
3
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Expand [] String
"[String]" String
"String" Double
2
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"String" String
"String" Double
4
]
test_expandMap :: IO ()
test_expandMap = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual (RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
expandMap StreamGraph
expandMapPre) StreamGraph
expandMapPost
expandScan :: RewriteRule
expandScan :: RewriteRule
expandScan (Connect (Vertex e :: StreamVertex
e@(StreamVertex Int
i StreamOperator
Expand ([ExpQ]
_) String
t1 String
t2 Double
se))
(Vertex sc :: StreamVertex
sc@(StreamVertex Int
j StreamOperator
Scan (ExpQ
f:ExpQ
a:[ExpQ]
_) String
_ String
t3 Double
ss))) =
(StreamGraph -> StreamGraph) -> Maybe (StreamGraph -> StreamGraph)
forall a. a -> Maybe a
Just ((StreamGraph -> StreamGraph)
-> Maybe (StreamGraph -> StreamGraph))
-> (StreamGraph -> StreamGraph)
-> Maybe (StreamGraph -> StreamGraph)
forall a b. (a -> b) -> a -> b
$ \StreamGraph
g ->
let t4 :: String
t4 = String
"[" String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
t3 String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
"]"
k :: Int
k = StreamGraph -> Int
newVertexId StreamGraph
g
p :: ExpQ
p = [| \b a' -> tail $ scanl $(f) (last b) a' |]
f' :: StreamVertex
f' = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
i (Double -> StreamOperator
Filter Double
0.5) [[| not.null |]] String
t1 String
t1 Double
0
sc' :: StreamVertex
sc'= Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
j StreamOperator
Scan [ExpQ
p, [| [$(a)] |]] String
t1 String
t4 Double
ss
e' :: StreamVertex
e' = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
k StreamOperator
Expand [] String
t4 String
t3 Double
se
in StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
overlay ([StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
f',StreamVertex
sc',StreamVertex
e']) (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall a b. (a -> b) -> a -> b
$
(StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
removeEdge StreamVertex
f' StreamVertex
e' (StreamGraph -> StreamGraph)
-> (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
replaceVertex StreamVertex
e StreamVertex
f' (StreamGraph -> StreamGraph)
-> (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
replaceVertex StreamVertex
sc StreamVertex
e') StreamGraph
g
expandScan StreamGraph
_ = Maybe (StreamGraph -> StreamGraph)
forall a. Maybe a
Nothing
expandScanPre :: StreamGraph
expandScanPre = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"[Int]" String
"[Int]" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Expand [] String
"[Int]" String
"Int" Double
2
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Scan [ExpQ
f,ExpQ
a] String
"Int" String
"Int" Double
3
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"Int" String
"Int" Double
4
]
where
f :: ExpQ
f = [| \c _ -> c + 1 |]
a :: ExpQ
a = [| 0 |]
expandScanPost :: StreamGraph
expandScanPost = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"[Int]" String
"[Int]" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 (Double -> StreamOperator
Filter Double
0.5) [ExpQ
p] String
"[Int]" String
"[Int]" Double
0
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Scan [ExpQ
g,ExpQ
as] String
"[Int]" String
"[Int]" Double
3
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
4 StreamOperator
Expand [] String
"[Int]" String
"Int" Double
2
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"Int" String
"Int" Double
4
]
where
p :: ExpQ
p = [| not . null |]
g :: ExpQ
g = [| \b a' -> tail $ scanl $(f) (last b) a' |]
f :: ExpQ
f = [| \c _ -> c + 1 |]
as :: ExpQ
as = [| [$(a)] |]
a :: ExpQ
a = [| 0 |]
test_expandScan :: IO ()
test_expandScan = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual (StreamGraph -> StreamGraph
forall a. Ord a => Graph a -> Graph a
simplify (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall a b. (a -> b) -> a -> b
$ RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
expandScan StreamGraph
expandScanPre) StreamGraph
expandScanPost
expandExpand :: RewriteRule
expandExpand :: RewriteRule
expandExpand (Connect (Vertex e :: StreamVertex
e@(StreamVertex Int
i StreamOperator
Expand [ExpQ]
_ String
t1 String
t2 Double
s))
(Vertex (StreamVertex Int
j StreamOperator
Expand [ExpQ]
_ String
_ String
_ Double
_))) =
let m :: StreamVertex
m = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
i StreamOperator
Map [[| concat |]] String
t1 String
t2 Double
s
in (StreamGraph -> StreamGraph) -> Maybe (StreamGraph -> StreamGraph)
forall a. a -> Maybe a
Just (StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
replaceVertex StreamVertex
e StreamVertex
m)
expandExpand StreamGraph
_ = Maybe (StreamGraph -> StreamGraph)
forall a. Maybe a
Nothing
expandExpandPre :: StreamGraph
expandExpandPre = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"[Int]" String
"[Int]" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Expand [] String
"[[Int]]" String
"[Int]" Double
2
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Expand [] String
"[Int]" String
"Int" Double
3
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"Int" String
"[Int]" Double
4
]
expandExpandPost :: StreamGraph
expandExpandPost = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"[Int]" String
"[Int]" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Map [[| concat |]] String
"[[Int]]" String
"[Int]" Double
2
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Expand [] String
"[Int]" String
"Int" Double
3
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"Int" String
"[Int]" Double
4
]
test_expandExpand :: IO ()
test_expandExpand = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual (RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
expandExpand StreamGraph
expandExpandPre)
StreamGraph
expandExpandPost
mergeFilter :: RewriteRule
mergeFilter :: RewriteRule
mergeFilter = StreamOperator -> RewriteRule
hoistOp (Double -> StreamOperator
Filter Double
0)
hoistOp :: StreamOperator -> RewriteRule
hoistOp StreamOperator
op (Connect (Vertex m :: StreamVertex
m@(StreamVertex Int
i StreamOperator
Merge [ExpQ]
_ String
_ String
ty Double
_))
(Vertex f :: StreamVertex
f@(StreamVertex Int
j StreamOperator
o [ExpQ]
pred String
_ String
ty' Double
s))) =
if Bool -> Bool
not(StreamOperator -> StreamOperator -> Bool
cmpOps StreamOperator
o StreamOperator
op) then Maybe (StreamGraph -> StreamGraph)
forall a. Maybe a
Nothing
else (StreamGraph -> StreamGraph) -> Maybe (StreamGraph -> StreamGraph)
forall a. a -> Maybe a
Just ((StreamGraph -> StreamGraph)
-> Maybe (StreamGraph -> StreamGraph))
-> (StreamGraph -> StreamGraph)
-> Maybe (StreamGraph -> StreamGraph)
forall a b. (a -> b) -> a -> b
$ \StreamGraph
g -> let
mkOp :: StreamGraph -> StreamVertex
mkOp StreamGraph
g = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex (StreamGraph -> Int
newVertexId StreamGraph
g) StreamOperator
o [ExpQ]
pred String
ty String
ty' Double
s
inbound :: [StreamVertex]
inbound = ((StreamVertex, StreamVertex) -> StreamVertex)
-> [(StreamVertex, StreamVertex)] -> [StreamVertex]
forall a b. (a -> b) -> [a] -> [b]
map (StreamVertex, StreamVertex) -> StreamVertex
forall a b. (a, b) -> a
fst ([(StreamVertex, StreamVertex)] -> [StreamVertex])
-> (StreamGraph -> [(StreamVertex, StreamVertex)])
-> StreamGraph
-> [StreamVertex]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((StreamVertex, StreamVertex) -> Bool)
-> [(StreamVertex, StreamVertex)] -> [(StreamVertex, StreamVertex)]
forall a. (a -> Bool) -> [a] -> [a]
filter ((StreamVertex
mStreamVertex -> StreamVertex -> Bool
forall a. Eq a => a -> a -> Bool
==) (StreamVertex -> Bool)
-> ((StreamVertex, StreamVertex) -> StreamVertex)
-> (StreamVertex, StreamVertex)
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (StreamVertex, StreamVertex) -> StreamVertex
forall a b. (a, b) -> b
snd) ([(StreamVertex, StreamVertex)] -> [(StreamVertex, StreamVertex)])
-> (StreamGraph -> [(StreamVertex, StreamVertex)])
-> StreamGraph
-> [(StreamVertex, StreamVertex)]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamGraph -> [(StreamVertex, StreamVertex)]
forall a. Ord a => Graph a -> [(a, a)]
edgeList (StreamGraph -> [StreamVertex]) -> StreamGraph -> [StreamVertex]
forall a b. (a -> b) -> a -> b
$ StreamGraph
g
snipMerge :: [StreamGraph -> StreamGraph]
snipMerge = (StreamVertex -> StreamGraph -> StreamGraph)
-> [StreamVertex] -> [StreamGraph -> StreamGraph]
forall a b. (a -> b) -> [a] -> [b]
map (\StreamVertex
v -> StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
removeEdge StreamVertex
v StreamVertex
m) [StreamVertex]
inbound
newOps :: [StreamGraph -> StreamGraph]
newOps = (StreamVertex -> StreamGraph -> StreamGraph)
-> [StreamVertex] -> [StreamGraph -> StreamGraph]
forall a b. (a -> b) -> [a] -> [b]
map (\StreamVertex
v -> \StreamGraph
g -> StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
overlay StreamGraph
g (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall a b. (a -> b) -> a -> b
$ [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
v, StreamGraph -> StreamVertex
mkOp StreamGraph
g, StreamVertex
m]) [StreamVertex]
inbound
m' :: StreamVertex
m' = StreamVertex
m { intype :: String
intype = String
ty', outtype :: String
outtype = String
ty' }
in (StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
removeEdge StreamVertex
m StreamVertex
f
(StreamGraph -> StreamGraph)
-> (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
replaceVertex StreamVertex
f StreamVertex
m
(StreamGraph -> StreamGraph)
-> (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> [StreamGraph -> StreamGraph] -> StreamGraph -> StreamGraph
forall a. [Graph a -> Graph a] -> Graph a -> Graph a
foldRules [StreamGraph -> StreamGraph]
snipMerge
(StreamGraph -> StreamGraph)
-> (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> [StreamGraph -> StreamGraph] -> StreamGraph -> StreamGraph
forall a. [Graph a -> Graph a] -> Graph a -> Graph a
foldRules [StreamGraph -> StreamGraph]
newOps
(StreamGraph -> StreamGraph)
-> (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
replaceVertex StreamVertex
m StreamVertex
m') StreamGraph
g
hoistOp StreamOperator
_ StreamGraph
_ = Maybe (StreamGraph -> StreamGraph)
forall a. Maybe a
Nothing
v1 :: StreamVertex
v1 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
1
v2 :: StreamVertex
v2 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
2
v3 :: StreamVertex
v3 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Merge [] String
"Int" String
"Int" Double
3
v4 :: StreamVertex
v4 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 (Double -> StreamOperator
Filter Double
0.5) [[| (>3) |]] String
"Int" String
"Int" Double
4
v5 :: StreamVertex
v5 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
4 StreamOperator
Sink [] String
"Int" String
"Int" Double
5
mergeFilterPre :: StreamGraph
mergeFilterPre = StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
overlay ([StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
v1,StreamVertex
v3,StreamVertex
v4,StreamVertex
v5]) ([StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
v2,StreamVertex
v3])
v6 :: StreamVertex
v6 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
5 (Double -> StreamOperator
Filter Double
0.5) [[| (>3) |]] String
"Int" String
"Int" Double
4
v7 :: StreamVertex
v7 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
6 (Double -> StreamOperator
Filter Double
0.5) [[| (>3) |]] String
"Int" String
"Int" Double
4
mergeFilterPost :: StreamGraph
mergeFilterPost = StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
overlay ([StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
v1,StreamVertex
v6,StreamVertex
v3,StreamVertex
v5]) ([StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
v2,StreamVertex
v7,StreamVertex
v3])
test_mergeFilter :: IO ()
test_mergeFilter = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual (RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
mergeFilter StreamGraph
mergeFilterPre)
StreamGraph
mergeFilterPost
mergeExpand :: RewriteRule
mergeExpand :: RewriteRule
mergeExpand = StreamOperator -> RewriteRule
hoistOp StreamOperator
Expand
v8 :: StreamVertex
v8 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"[Int]" String
"[Int]" Double
1
v9 :: StreamVertex
v9 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 (Double -> StreamOperator
Source Double
1) [] String
"[Int]" String
"[Int]" Double
2
v10 :: StreamVertex
v10 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Merge [] String
"[Int]" String
"[Int]" Double
3
v11 :: StreamVertex
v11 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Expand [] String
"[Int]" String
"Int" Double
4
mergeExpandPre :: StreamGraph
mergeExpandPre = StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
overlay ([StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
v8, StreamVertex
v10, StreamVertex
v11, StreamVertex
v5]) ([StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
v9, StreamVertex
v10])
v12 :: StreamVertex
v12 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Merge [] String
"Int" String
"Int" Double
3
v13 :: StreamVertex
v13 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
5 StreamOperator
Expand [] String
"[Int]" String
"Int" Double
4
v14 :: StreamVertex
v14 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
6 StreamOperator
Expand [] String
"[Int]" String
"Int" Double
4
mergeExpandPost :: StreamGraph
mergeExpandPost = StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
overlay ([StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
v8, StreamVertex
v13, StreamVertex
v12, StreamVertex
v5]) ([StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
v9, StreamVertex
v14, StreamVertex
v12])
test_mergeExpand :: IO ()
test_mergeExpand = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual (RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
mergeExpand StreamGraph
mergeExpandPre)
StreamGraph
mergeExpandPost
mergeMap :: RewriteRule
mergeMap :: RewriteRule
mergeMap = StreamOperator -> RewriteRule
hoistOp StreamOperator
Map
v15 :: StreamVertex
v15 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
1
v16 :: StreamVertex
v16 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
2
v17 :: StreamVertex
v17 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Merge [] String
"Int" String
"Int" Double
3
v18 :: StreamVertex
v18 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Map [[| show |]] String
"Int" String
"String" Double
4
v19 :: StreamVertex
v19 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
4 StreamOperator
Sink [] String
"String" String
"String" Double
5
mergeMapPre :: StreamGraph
mergeMapPre = StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
overlay ([StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
v15,StreamVertex
v17,StreamVertex
v18,StreamVertex
v19]) ([StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
v16,StreamVertex
v17])
v20 :: StreamVertex
v20 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
5 StreamOperator
Map [[| show |]] String
"Int" String
"String" Double
4
v21 :: StreamVertex
v21 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
6 StreamOperator
Map [[| show |]] String
"Int" String
"String" Double
4
v22 :: StreamVertex
v22 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Merge [] String
"String" String
"String" Double
3
mergeMapPost :: StreamGraph
mergeMapPost = StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
overlay ([StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
v15,StreamVertex
v20,StreamVertex
v22,StreamVertex
v19]) ([StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
v16,StreamVertex
v21,StreamVertex
v22])
test_mergeMap :: IO ()
test_mergeMap = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual (RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
mergeMap StreamGraph
mergeMapPre) StreamGraph
mergeMapPost
identicalParams :: [StreamVertex] -> Bool
identicalParams :: [StreamVertex] -> Bool
identicalParams [StreamVertex]
inbound =
let params :: [[Exp]]
params = (([ExpQ] -> [Exp]) -> [[ExpQ]] -> [[Exp]]
forall a b. (a -> b) -> [a] -> [b]
map(([ExpQ] -> [Exp]) -> [[ExpQ]] -> [[Exp]])
-> ((ExpQ -> Exp) -> [ExpQ] -> [Exp])
-> (ExpQ -> Exp)
-> [[ExpQ]]
-> [[Exp]]
forall b c a. (b -> c) -> (a -> b) -> a -> c
.(ExpQ -> Exp) -> [ExpQ] -> [Exp]
forall a b. (a -> b) -> [a] -> [b]
map) ExpQ -> Exp
deQ ((StreamVertex -> [ExpQ]) -> [StreamVertex] -> [[ExpQ]]
forall a b. (a -> b) -> [a] -> [b]
map StreamVertex -> [ExpQ]
parameters [StreamVertex]
inbound)
in [Bool] -> Bool
forall (t :: * -> *). Foldable t => t Bool -> Bool
and (([Exp] -> Bool) -> [[Exp]] -> [Bool]
forall a b. (a -> b) -> [a] -> [b]
map ([Exp] -> [Exp] -> Bool
forall a. Eq a => a -> a -> Bool
==([[Exp]] -> [Exp]
forall a. [a] -> a
head [[Exp]]
params)) ([[Exp]] -> [[Exp]]
forall a. [a] -> [a]
tail [[Exp]]
params))
pushOp :: StreamOperator -> RewriteRule
pushOp StreamOperator
op (Connect (Vertex ma :: StreamVertex
ma@(StreamVertex Int
i StreamOperator
o [ExpQ]
fs String
t1 String
t2 Double
_))
(Vertex me :: StreamVertex
me@(StreamVertex Int
j StreamOperator
Merge [ExpQ]
_ String
t3 String
_ Double
_))) =
if Bool -> Bool
not(StreamOperator -> StreamOperator -> Bool
cmpOps StreamOperator
o StreamOperator
op) then Maybe (StreamGraph -> StreamGraph)
forall a. Maybe a
Nothing
else (StreamGraph -> StreamGraph) -> Maybe (StreamGraph -> StreamGraph)
forall a. a -> Maybe a
Just ((StreamGraph -> StreamGraph)
-> Maybe (StreamGraph -> StreamGraph))
-> (StreamGraph -> StreamGraph)
-> Maybe (StreamGraph -> StreamGraph)
forall a b. (a -> b) -> a -> b
$ \StreamGraph
g -> let
inbound :: [StreamVertex]
inbound = ((StreamVertex, StreamVertex) -> StreamVertex)
-> [(StreamVertex, StreamVertex)] -> [StreamVertex]
forall a b. (a -> b) -> [a] -> [b]
map (StreamVertex, StreamVertex) -> StreamVertex
forall a b. (a, b) -> a
fst ([(StreamVertex, StreamVertex)] -> [StreamVertex])
-> (StreamGraph -> [(StreamVertex, StreamVertex)])
-> StreamGraph
-> [StreamVertex]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((StreamVertex, StreamVertex) -> Bool)
-> [(StreamVertex, StreamVertex)] -> [(StreamVertex, StreamVertex)]
forall a. (a -> Bool) -> [a] -> [a]
filter ((StreamVertex
meStreamVertex -> StreamVertex -> Bool
forall a. Eq a => a -> a -> Bool
==) (StreamVertex -> Bool)
-> ((StreamVertex, StreamVertex) -> StreamVertex)
-> (StreamVertex, StreamVertex)
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (StreamVertex, StreamVertex) -> StreamVertex
forall a b. (a, b) -> b
snd) ([(StreamVertex, StreamVertex)] -> [(StreamVertex, StreamVertex)])
-> (StreamGraph -> [(StreamVertex, StreamVertex)])
-> StreamGraph
-> [(StreamVertex, StreamVertex)]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamGraph -> [(StreamVertex, StreamVertex)]
forall a. Ord a => Graph a -> [(a, a)]
edgeList (StreamGraph -> [StreamVertex]) -> StreamGraph -> [StreamVertex]
forall a b. (a -> b) -> a -> b
$ StreamGraph
g
in if [StreamVertex] -> Bool
identicalOperators [StreamVertex]
inbound Bool -> Bool -> Bool
&& [StreamVertex] -> Bool
identicalParams [StreamVertex]
inbound
then let
me' :: StreamVertex
me' = StreamVertex
me { intype :: String
intype = String
t1, outtype :: String
outtype = String
t1 }
ma' :: StreamVertex
ma' = StreamVertex
ma { vertexId :: Int
vertexId = StreamGraph -> Int
newVertexId StreamGraph
g }
on :: StreamVertex
on = (StreamVertex, StreamVertex) -> StreamVertex
forall a b. (a, b) -> b
snd ((StreamVertex, StreamVertex) -> StreamVertex)
-> (StreamGraph -> (StreamVertex, StreamVertex))
-> StreamGraph
-> StreamVertex
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [(StreamVertex, StreamVertex)] -> (StreamVertex, StreamVertex)
forall a. [a] -> a
head ([(StreamVertex, StreamVertex)] -> (StreamVertex, StreamVertex))
-> (StreamGraph -> [(StreamVertex, StreamVertex)])
-> StreamGraph
-> (StreamVertex, StreamVertex)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((StreamVertex, StreamVertex) -> Bool)
-> [(StreamVertex, StreamVertex)] -> [(StreamVertex, StreamVertex)]
forall a. (a -> Bool) -> [a] -> [a]
filter ((StreamVertex -> StreamVertex -> Bool
forall a. Eq a => a -> a -> Bool
==StreamVertex
me) (StreamVertex -> Bool)
-> ((StreamVertex, StreamVertex) -> StreamVertex)
-> (StreamVertex, StreamVertex)
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (StreamVertex, StreamVertex) -> StreamVertex
forall a b. (a, b) -> a
fst) ([(StreamVertex, StreamVertex)] -> [(StreamVertex, StreamVertex)])
-> (StreamGraph -> [(StreamVertex, StreamVertex)])
-> StreamGraph
-> [(StreamVertex, StreamVertex)]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamGraph -> [(StreamVertex, StreamVertex)]
forall a. Ord a => Graph a -> [(a, a)]
edgeList (StreamGraph -> StreamVertex) -> StreamGraph -> StreamVertex
forall a b. (a -> b) -> a -> b
$ StreamGraph
g
in ( StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
removeEdge StreamVertex
me StreamVertex
on
(StreamGraph -> StreamGraph)
-> (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> (StreamVertex -> Bool)
-> StreamVertex -> StreamGraph -> StreamGraph
forall a. (a -> Bool) -> a -> Graph a -> Graph a
mergeVertices (StreamVertex -> [StreamVertex] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [StreamVertex]
inbound) StreamVertex
me
(StreamGraph -> StreamGraph)
-> (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
replaceVertex StreamVertex
me StreamVertex
me'
(StreamGraph -> StreamGraph)
-> (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
removeEdge StreamVertex
me' StreamVertex
me'
(StreamGraph -> StreamGraph)
-> (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
overlay ([StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
me', StreamVertex
ma', StreamVertex
on])
) StreamGraph
g
else StreamGraph
g
pushOp StreamOperator
_ StreamGraph
_ = Maybe (StreamGraph -> StreamGraph)
forall a. Maybe a
Nothing
mapMerge :: RewriteRule
mapMerge :: RewriteRule
mapMerge = StreamOperator -> RewriteRule
pushOp StreamOperator
Map
mapMergePre :: StreamGraph
mapMergePre = StreamGraph
mergeMapPost
mapMergePost :: StreamGraph
mapMergePost = StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
overlay ([StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
v15,StreamVertex
v17,StreamVertex
v18 {vertexId :: Int
vertexId = Int
7}, StreamVertex
v19]) ([StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
v16,StreamVertex
v17])
test_mapMerge :: IO ()
test_mapMerge = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual (RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
mapMerge StreamGraph
mapMergePre) StreamGraph
mapMergePost
filterMerge :: RewriteRule
filterMerge :: RewriteRule
filterMerge = StreamOperator -> RewriteRule
pushOp (Double -> StreamOperator
Filter Double
0)
filterMergePre :: StreamGraph
filterMergePre = StreamGraph
mergeFilterPost
filterMergePost :: StreamGraph
filterMergePost = StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
overlay ([StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
v1,StreamVertex
v3,StreamVertex
v4 {vertexId :: Int
vertexId=Int
7},StreamVertex
v5]) ([StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
v2,StreamVertex
v3])
test_filterMerge :: IO ()
test_filterMerge = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual (RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
filterMerge StreamGraph
filterMergePre)
StreamGraph
filterMergePost
expandMerge :: RewriteRule
expandMerge :: RewriteRule
expandMerge = StreamOperator -> RewriteRule
pushOp StreamOperator
Expand
expandMergePre :: StreamGraph
expandMergePre = StreamGraph
mergeExpandPost
expandMergePost :: StreamGraph
expandMergePost = StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
overlay ([StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
v8, StreamVertex
v10, StreamVertex
v11 {vertexId :: Int
vertexId=Int
7}, StreamVertex
v5]) ([StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
v9, StreamVertex
v10])
test_expandMerge :: IO ()
test_expandMerge = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual (RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
expandMerge StreamGraph
expandMergePre)
StreamGraph
expandMergePost
mergeFuse :: RewriteRule
mergeFuse :: RewriteRule
mergeFuse (Connect (Vertex m1 :: StreamVertex
m1@(StreamVertex Int
i StreamOperator
Merge [ExpQ]
_ String
_ String
_ Double
_))
(Vertex m2 :: StreamVertex
m2@(StreamVertex Int
j StreamOperator
Merge [ExpQ]
_ String
_ String
_ Double
_))) =
(StreamGraph -> StreamGraph) -> Maybe (StreamGraph -> StreamGraph)
forall a. a -> Maybe a
Just (StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
removeEdge StreamVertex
m1 StreamVertex
m1 (StreamGraph -> StreamGraph)
-> (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (StreamVertex -> Bool)
-> StreamVertex -> StreamGraph -> StreamGraph
forall a. (a -> Bool) -> a -> Graph a -> Graph a
mergeVertices (StreamVertex -> [StreamVertex] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [StreamVertex
m1,StreamVertex
m2]) StreamVertex
m1)
mergeFuse StreamGraph
_ = Maybe (StreamGraph -> StreamGraph)
forall a. Maybe a
Nothing
v23 :: StreamVertex
v23 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
1
v24 :: StreamVertex
v24 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
2
v25 :: StreamVertex
v25 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
3
v26 :: StreamVertex
v26 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Merge [] String
"Int" String
"Int" Double
4
v27 :: StreamVertex
v27 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
4 StreamOperator
Merge [] String
"Int" String
"Int" Double
5
v28 :: StreamVertex
v28 = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
5 StreamOperator
Sink [] String
"Int" String
"Int" Double
6
mergeFusePre :: StreamGraph
mergeFusePre = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
v23,StreamVertex
v26,StreamVertex
v27,StreamVertex
v28]
StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
`Overlay` [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
v24,StreamVertex
v26,StreamVertex
v27]
StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
`Overlay` [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
v25,StreamVertex
v27]
mergeFusePost :: StreamGraph
mergeFusePost = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
v23,StreamVertex
v26,StreamVertex
v28]
StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
`Overlay` [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
v24,StreamVertex
v26]
StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
`Overlay` [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
v25,StreamVertex
v26]
test_mergeFuse :: IO ()
test_mergeFuse = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual (RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
mergeFuse StreamGraph
mergeFusePre) StreamGraph
mergeFusePost
expandFilterAcc :: RewriteRule
expandFilterAcc :: RewriteRule
expandFilterAcc (Connect (Vertex e :: StreamVertex
e@(StreamVertex Int
i StreamOperator
Expand [ExpQ]
_ String
t1 String
t2 Double
_))
(Vertex fa :: StreamVertex
fa@(StreamVertex Int
j (FilterAcc Double
_) (ExpQ
f:ExpQ
a:ExpQ
p:[ExpQ]
_) String
_ String
_ Double
s))) =
(StreamGraph -> StreamGraph) -> Maybe (StreamGraph -> StreamGraph)
forall a. a -> Maybe a
Just ((StreamGraph -> StreamGraph)
-> Maybe (StreamGraph -> StreamGraph))
-> (StreamGraph -> StreamGraph)
-> Maybe (StreamGraph -> StreamGraph)
forall a b. (a -> b) -> a -> b
$ \StreamGraph
g -> let
scan :: StreamVertex
scan = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
i StreamOperator
Scan
[ [| \(_,acc) a -> filterAcc $(f) acc $(p) a |]
, [| ([],$(a)) |]
] String
t1 String
t1 Double
s
mapr :: StreamVertex
mapr = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
j StreamOperator
Map [[| reverse.fst |]] String
t1 String
t1 Double
0
k :: Int
k = StreamGraph -> Int
newVertexId StreamGraph
g
expd :: StreamVertex
expd = StreamVertex
e { vertexId :: Int
vertexId = Int
k }
in StreamGraph
g StreamGraph -> (StreamGraph -> StreamGraph) -> StreamGraph
forall a b. a -> (a -> b) -> b
& StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
removeEdge StreamVertex
e StreamVertex
fa
StreamGraph -> (StreamGraph -> StreamGraph) -> StreamGraph
forall a b. a -> (a -> b) -> b
& StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
replaceVertex StreamVertex
e StreamVertex
scan
StreamGraph -> (StreamGraph -> StreamGraph) -> StreamGraph
forall a b. a -> (a -> b) -> b
& StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
replaceVertex StreamVertex
fa StreamVertex
expd
StreamGraph -> (StreamGraph -> StreamGraph) -> StreamGraph
forall a b. a -> (a -> b) -> b
& StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
overlay ([StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
scan,StreamVertex
mapr,StreamVertex
expd])
expandFilterAcc StreamGraph
_ = Maybe (StreamGraph -> StreamGraph)
forall a. Maybe a
Nothing
expandFilterAccPre :: StreamGraph
expandFilterAccPre = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"[Int]" String
"[Int]" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Expand [] String
"[Int]" String
"Int" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 (Double -> StreamOperator
FilterAcc Double
0.5) [ExpQ
f, ExpQ
a, ExpQ
p] String
"Int" String
"Int" Double
2
] where f :: ExpQ
f = [| (\_ h -> (False, h)) |]
a :: ExpQ
a = [| (True, undefined) |]
p :: ExpQ
p = [| \new (b,old) -> b || old /= new |]
expandFilterAccPost :: StreamGraph
expandFilterAccPost = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"[Int]" String
"[Int]" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Scan [ExpQ
sa, ExpQ
si] String
"[Int]" String
"[Int]" Double
2
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Map [ExpQ
f'] String
"[Int]" String
"[Int]" Double
0
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Expand [] String
"[Int]" String
"Int" Double
1
] where f :: ExpQ
f = [| (\_ h -> (False, h)) |]
a :: ExpQ
a = [| (True, undefined) |]
p :: ExpQ
p = [| \new (b,old) -> b || old /= new |]
f' :: ExpQ
f' = [| reverse.fst |]
sa :: ExpQ
sa = [| \(_,acc) a -> filterAcc $(f) acc $(p) a |]
si :: ExpQ
si = [| ([],$(a)) |]
test_expandFilterAcc :: IO ()
test_expandFilterAcc = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual StreamGraph
expandFilterAccPost
(StreamGraph -> IO ()) -> StreamGraph -> IO ()
forall a b. (a -> b) -> a -> b
$ RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
expandFilterAcc StreamGraph
expandFilterAccPre
sumTimes :: Double -> Double -> Double -> Double
sumTimes :: Double -> Double -> Double -> Double
sumTimes Double
a Double
f Double
b = Double
a Double -> Double -> Double
forall a. Num a => a -> a -> a
+ (Double
f Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
b)
test_sumTimes1 :: IO ()
test_sumTimes1 = Double -> Double -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual Double
1 (Double -> IO ()) -> Double -> IO ()
forall a b. (a -> b) -> a -> b
$ Double -> Double -> Double -> Double
sumTimes (Double
1Double -> Double -> Double
forall a. Fractional a => a -> a -> a
/Double
2) Double
1 (Double
1Double -> Double -> Double
forall a. Fractional a => a -> a -> a
/Double
2)
test_sumTimes2 :: IO ()
test_sumTimes2 = Double -> Double -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual (Double
1Double -> Double -> Double
forall a. Fractional a => a -> a -> a
/Double
2) (Double -> IO ()) -> Double -> IO ()
forall a b. (a -> b) -> a -> b
$ Double -> Double -> Double -> Double
sumTimes (Double
1Double -> Double -> Double
forall a. Fractional a => a -> a -> a
/Double
4) Double
1 (Double
1Double -> Double -> Double
forall a. Fractional a => a -> a -> a
/Double
4)
cmpOps :: StreamOperator -> StreamOperator -> Bool
cmpOps :: StreamOperator -> StreamOperator -> Bool
cmpOps (Filter Double
_) (Filter Double
_) = Bool
True
cmpOps (FilterAcc Double
_) (FilterAcc Double
_) = Bool
True
cmpOps StreamOperator
x StreamOperator
y = StreamOperator
x StreamOperator -> StreamOperator -> Bool
forall a. Eq a => a -> a -> Bool
== StreamOperator
y
identicalOperators :: [StreamVertex] -> Bool
identicalOperators :: [StreamVertex] -> Bool
identicalOperators = (Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
==Int
1) (Int -> Bool) -> ([StreamVertex] -> Int) -> [StreamVertex] -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [StreamOperator] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length ([StreamOperator] -> Int)
-> ([StreamVertex] -> [StreamOperator]) -> [StreamVertex] -> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [StreamOperator] -> [StreamOperator]
forall a. Eq a => [a] -> [a]
nub ([StreamOperator] -> [StreamOperator])
-> ([StreamVertex] -> [StreamOperator])
-> [StreamVertex]
-> [StreamOperator]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (StreamVertex -> StreamOperator)
-> [StreamVertex] -> [StreamOperator]
forall a b. (a -> b) -> [a] -> [b]
map StreamVertex -> StreamOperator
operator
foldRules :: [Graph a -> Graph a] -> Graph a -> Graph a
foldRules :: forall a. [Graph a -> Graph a] -> Graph a -> Graph a
foldRules [Graph a -> Graph a]
rules Graph a
g = (Graph a -> (Graph a -> Graph a) -> Graph a)
-> Graph a -> [Graph a -> Graph a] -> Graph a
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl Graph a -> (Graph a -> Graph a) -> Graph a
forall a b. a -> (a -> b) -> b
(&) Graph a
g [Graph a -> Graph a]
rules
newVertexId :: StreamGraph -> Int
newVertexId :: StreamGraph -> Int
newVertexId = Int -> Int
forall a. Enum a => a -> a
succ (Int -> Int) -> (StreamGraph -> Int) -> StreamGraph -> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Int] -> Int
forall a. [a] -> a
last ([Int] -> Int) -> (StreamGraph -> [Int]) -> StreamGraph -> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Int] -> [Int]
forall a. Ord a => [a] -> [a]
sort ([Int] -> [Int]) -> (StreamGraph -> [Int]) -> StreamGraph -> [Int]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (StreamVertex -> Int) -> [StreamVertex] -> [Int]
forall a b. (a -> b) -> [a] -> [b]
map StreamVertex -> Int
vertexId ([StreamVertex] -> [Int])
-> (StreamGraph -> [StreamVertex]) -> StreamGraph -> [Int]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamGraph -> [StreamVertex]
forall a. Ord a => Graph a -> [a]
vertexList
main :: IO ()
main = TestSuite -> IO ()
forall t. TestableHTF t => t -> IO ()
htfMain htf_thisModulesTests
filterWindow :: RewriteRule
filterWindow :: RewriteRule
filterWindow (Connect (Vertex f :: StreamVertex
f@(StreamVertex Int
i (Filter Double
_) (ExpQ
p:[ExpQ]
_) String
_ String
_ Double
s))
(Vertex w :: StreamVertex
w@(StreamVertex Int
j StreamOperator
Window [ExpQ]
_ String
_ String
t Double
_))) =
let m :: StreamVertex
m = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
j StreamOperator
Map [[| filter $(p) |]] String
t String
t Double
s
w' :: StreamVertex
w'= StreamVertex
w { vertexId :: Int
vertexId = Int
i }
in (StreamGraph -> StreamGraph) -> Maybe (StreamGraph -> StreamGraph)
forall a. a -> Maybe a
Just (StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
replaceVertex StreamVertex
f StreamVertex
w' (StreamGraph -> StreamGraph)
-> (StreamGraph -> StreamGraph) -> StreamGraph -> StreamGraph
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
replaceVertex StreamVertex
w StreamVertex
m)
filterWindow StreamGraph
_ = Maybe (StreamGraph -> StreamGraph)
forall a. Maybe a
Nothing
filterWindowPre :: StreamGraph
filterWindowPre = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 (Double -> StreamOperator
Filter Double
0.5) [[|(>3)|]] String
"Int" String
"Int" Double
2
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Window [] String
"Int" String
"[Int]" Double
3
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"[Int]" String
"[Int]" Double
4
]
filterWindowPost :: StreamGraph
filterWindowPost = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Window [] String
"Int" String
"[Int]" Double
3
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Map [[|filter (>3)|]] String
"[Int]" String
"[Int]" Double
2
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"[Int]" String
"[Int]" Double
4
]
test_filterWindow :: IO ()
test_filterWindow = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual (RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
filterWindow StreamGraph
filterWindowPre) StreamGraph
filterWindowPost
filterAccWindow :: RewriteRule
filterAccWindow :: RewriteRule
filterAccWindow (Connect (Vertex fa :: StreamVertex
fa@(StreamVertex Int
i (FilterAcc Double
_) (ExpQ
f:ExpQ
a:ExpQ
p:[ExpQ]
_) String
_ String
_ Double
s))
(Vertex w :: StreamVertex
w@(StreamVertex Int
j StreamOperator
Window [ExpQ]
_ String
_ String
t Double
_))) =
(StreamGraph -> StreamGraph) -> Maybe (StreamGraph -> StreamGraph)
forall a. a -> Maybe a
Just ((StreamGraph -> StreamGraph)
-> Maybe (StreamGraph -> StreamGraph))
-> (StreamGraph -> StreamGraph)
-> Maybe (StreamGraph -> StreamGraph)
forall a b. (a -> b) -> a -> b
$ \StreamGraph
g -> let
w' :: StreamVertex
w' = StreamVertex
w { vertexId :: Int
vertexId = Int
i }
sc :: StreamVertex
sc = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
j StreamOperator
Scan
[ [| \ (_, acc) a -> filterAcc $(f) acc $(p) a |]
, [| ([], $(a)) |]
] String
t String
t Double
s
m :: StreamVertex
m = Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex (StreamGraph -> Int
newVertexId StreamGraph
g) StreamOperator
Map [[| reverse.fst |]] String
t String
t Double
0
in StreamGraph
g StreamGraph -> (StreamGraph -> StreamGraph) -> StreamGraph
forall a b. a -> (a -> b) -> b
& StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
replaceVertex StreamVertex
fa StreamVertex
w'
StreamGraph -> (StreamGraph -> StreamGraph) -> StreamGraph
forall a b. a -> (a -> b) -> b
& StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
replaceVertex StreamVertex
w StreamVertex
m
StreamGraph -> (StreamGraph -> StreamGraph) -> StreamGraph
forall a b. a -> (a -> b) -> b
& StreamVertex -> StreamVertex -> StreamGraph -> StreamGraph
forall a. Eq a => a -> a -> Graph a -> Graph a
removeEdge StreamVertex
w' StreamVertex
m
StreamGraph -> (StreamGraph -> StreamGraph) -> StreamGraph
forall a b. a -> (a -> b) -> b
& StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
overlay ([StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [StreamVertex
w', StreamVertex
sc, StreamVertex
m])
filterAccWindow StreamGraph
_ = Maybe (StreamGraph -> StreamGraph)
forall a. Maybe a
Nothing
filterAccWindowPre :: StreamGraph
filterAccWindowPre = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 (Double -> StreamOperator
FilterAcc Double
0.5) [ExpQ
f, ExpQ
a, ExpQ
p] String
"Int" String
"Int" Double
2
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Window [] String
"Int" String
"[Int]" Double
1
] where f :: ExpQ
f = [| (\_ h -> (False, h)) |]
a :: ExpQ
a = [| (True, undefined) |]
p :: ExpQ
p = [| \new (b,old) -> b || old /= new |]
filterAccWindowPost :: StreamGraph
filterAccWindowPost = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
[ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Window [] String
"Int" String
"[Int]" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Scan [ExpQ
sa, ExpQ
si] String
"[Int]" String
"[Int]" Double
2
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Map [ExpQ
f'] String
"[Int]" String
"[Int]" Double
0
] where f :: ExpQ
f = [| (\_ h -> (False, h)) |]
a :: ExpQ
a = [| (True, undefined) |]
p :: ExpQ
p = [| \new (b,old) -> b || old /= new |]
f' :: ExpQ
f' = [| reverse.fst |]
sa :: ExpQ
sa = [| \(_,acc) a -> filterAcc $(f) acc $(p) a |]
si :: ExpQ
si = [| ([],$(a)) |]
test_filterAccWindow :: IO ()
test_filterAccWindow = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual StreamGraph
filterAccWindowPost
(StreamGraph -> IO ()) -> StreamGraph -> IO ()
forall a b. (a -> b) -> a -> b
$ RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
filterAccWindow StreamGraph
filterAccWindowPre