{-# OPTIONS_GHC -F -pgmF htfpp #-}
{-# OPTIONS_HADDOCK prune #-}
{-# LANGUAGE TemplateHaskell #-}

module Striot.LogicalOptimiser ( applyRules

                               , applyRule
                               , firstMatch

                               , RewriteRule(..)
                               , defaultRewriteRules

                               , mapFilter
                               , filterFilterAcc
                               , filterAccFilter
                               , filterAccFilterAcc
                               , filterFuse
                               , mapFuse
                               , mapScan
                               , expandFilter
                               , mapFilterAcc
                               , mapWindow
                               , expandMap
                               , expandScan
                               , expandExpand
                               , mergeFilter
                               , mergeExpand
                               , mergeMap
                               , mapMerge
                               , filterMerge
                               , expandMerge
                               , mergeFuse
                               , expandFilterAcc

                               , filterWindow
                               , filterAccWindow

                               , htf_thisModulesTests
                               ) where

import Striot.StreamGraph
import Striot.FunctionalProcessing
import Algebra.Graph
import Test.Framework hiding ((===))
import Data.Char (isLower)
import Data.Maybe (mapMaybe, fromMaybe)
import Data.Function ((&))
import Data.List (nub, sort, intercalate)
import Control.Arrow ((>>>))

-- applying encoded rules and their resulting ReWriteOps ----------------------

-- A rewrite rule inspects a graph and either declines ('Nothing') or
-- returns ('Just') a function that performs the rewrite on a graph.
type RewriteRule = StreamGraph -> Maybe (StreamGraph -> StreamGraph)

-- Apply a single rewrite rule to a graph.
--
-- The graph is searched with 'firstMatch'. If the rule matches somewhere,
-- the rewrite it produced is applied to the whole input graph; otherwise
-- the graph is returned unchanged.
applyRule :: RewriteRule -> StreamGraph -> StreamGraph
applyRule rule graph = maybe graph ($ graph) (firstMatch graph rule)

-- Recursively attempt to apply the rule to the graph, stopping as soon as
-- we get a match.
--
-- The rule is first tried on the graph as a whole. If it declines, the
-- search descends into the operands of 'Overlay' and 'Connect', left
-- operand first. 'Empty' and 'Vertex' have no sub-graphs, so the search
-- ends there with 'Nothing'.
firstMatch :: StreamGraph -> RewriteRule -> Maybe (StreamGraph -> StreamGraph)
firstMatch graph rule = case rule graph of
    Just rewrite -> Just rewrite
    Nothing      -> case graph of
        Empty       -> Nothing
        Vertex _    -> Nothing
        Overlay a b -> leftThenRight a b
        Connect a b -> leftThenRight a b
  where
    -- Search the left operand; only look at the right one if that fails.
    leftThenRight a b = case firstMatch a rule of
        Just rewrite -> Just rewrite
        Nothing      -> firstMatch b rule

-- N-bounded recursive rule traversal
-- (caller may wish to apply 'nub')
--
-- For a depth @n < 1@ the result is just the input graph. Otherwise every
-- rule is tried against the graph (via 'firstMatch'); the result is the
-- input graph, followed by each graph obtained from a matching rule,
-- followed by the results of recursing on each of those with depth @n-1@.
-- The recursive results start with the graph they were given, so the
-- returned list can contain duplicates.
applyRules :: [RewriteRule] -> Int -> StreamGraph -> [StreamGraph]
applyRules rs n sg
    | n < 1     = [sg]
    | otherwise = sg : rewritten ++ concatMap (applyRules rs (n - 1)) rewritten
  where
    -- one rewritten graph per rule that matched somewhere in sg
    rewritten = map ($ sg) (mapMaybe (firstMatch sg) rs)

------------------------------------------------------------------------------

-- The rewrite rules defined in this module, in the order they are tried
-- by 'applyRules'.
rules :: [RewriteRule]
rules = [ filterFuse
        , mapFilter
        , filterFilterAcc
        , filterAccFilter
        , filterAccFilterAcc
        , mapFuse
        , mapScan
        , expandFilter
        , mapFilterAcc
        , mapWindow
        , expandMap
        , expandScan
        , expandExpand
        , mergeFilter
        , mergeExpand
        , mergeMap
        , mapMerge
        , filterMerge
        , expandMerge
        , mergeFuse
        , expandFilterAcc
        ]
-- Exported name for the rule list; currently identical to 'rules'.
defaultRewriteRules :: [RewriteRule]
defaultRewriteRules = rules

-- streamFilter f >>> streamFilter g = streamFilter (\x -> f x && g x) -------

-- Fuse two directly connected Filter vertices into one.
--
-- The fused vertex is the first vertex with its operator, parameters and
-- service time replaced: the selectivity is the product of the two
-- selectivities and the predicate is the conjunction of both predicates.
-- The service time comes from 'sumTimes' (defined elsewhere; presumably
-- the second time weighted by the first selectivity -- TODO confirm).
--
-- The returned rewrite merges both original vertices into the fused one
-- and then removes the edge from the fused vertex to itself (merging two
-- connected vertices would otherwise leave such an edge).
filterFuse :: RewriteRule
filterFuse (Connect (Vertex a@(StreamVertex _ (Filter sel1) (p:_) _ _ s1))
                    (Vertex b@(StreamVertex _ (Filter sel2) (q:_) _ _ s2))) =
    let c = a { operator    = Filter (sel1 * sel2)
              , parameters  = [[| (\p q x -> p x && q x) $(p) $(q) |]]
              , serviceTime = sumTimes s1 sel1 s2
              }
    in Just (removeEdge c c . mergeVertices (`elem` [a,b]) c)

filterFuse _ = Nothing

-- Quoted predicate (>3); used as the parameter of the filter fixture f3.
gt3 :: ExpQ
gt3 = [| (>3) |]
-- Quoted predicate (<5); used as the parameter of the filter fixture f4.
lt5 :: ExpQ
lt5 = [| (<5) |]

-- Test fixture: source vertex (id 0) for the filterFuse test graphs.
so' :: StreamVertex
so' = StreamVertex 0 (Source 1)   []    "String" "String" 1
-- Test fixture: filter vertex (id 1) with predicate gt3, selectivity 0.5.
f3 :: StreamVertex
f3  = StreamVertex 1 (Filter 0.5) [gt3] "String" "String" 0.1
-- Test fixture: filter vertex (id 2) with predicate lt5, selectivity 0.5.
f4 :: StreamVertex
f4  = StreamVertex 2 (Filter 0.5) [lt5] "String" "String" 0.2
-- Test fixture: sink vertex (id 3) for the filterFuse test graphs.
si' :: StreamVertex
si' = StreamVertex 3 Sink         []    "String" "String" 1

-- Expected parameter of the fused filter in filterFusePost: the
-- conjunction lambda applied to the two predicates (>3) and (<5).
fused :: ExpQ
fused = [| (\p q x -> p x && q x) (>3) (<5) |]

-- Input graph for test_filterFuse: source, two filters in a row, sink.
filterFusePre :: StreamGraph
filterFusePre = path [so', f3, f4, si']

-- Expected graph for test_filterFuse: the two filters replaced by a
-- single filter (id 1) with selectivity 0.25 and the fused predicate.
filterFusePost :: StreamGraph
filterFusePost = path
    [ so'
    , StreamVertex 1 (Filter 0.25) [fused] "String" "String" 0.2
    , si'
    ]

-- Applying filterFuse to filterFusePre must yield filterFusePost.
test_filterFuse :: IO ()
test_filterFuse = assertEqual (applyRule filterFuse filterFusePre)
    filterFusePost

-- streamMap f >>> streamFilter p = streamFilter (f >>> p) >>> streamMap f ---
-- TODO: should we decrease the service rate of the filter?

-- Swap a Map followed by a Filter so that the Filter runs first.
--
-- The new filter takes the map's vertex id and input type (for both its
-- input and output type), keeps the filter's selectivity, uses the
-- predicate composed with the mapped function, and has the sum of both
-- service times. The map keeps all of its fields except its vertex id,
-- which becomes the old filter's id, so ids stay in order along the path.
--
-- The returned rewrite first replaces the map vertex with the new filter
-- and then replaces the old filter vertex with the renumbered map.
mapFilter :: RewriteRule
mapFilter (Connect (Vertex mapV@(StreamVertex i Map (f:_) intype _ sm))
                   (Vertex filterV@(StreamVertex j (Filter sel) (p:_) _ _ sf))) =

    let newFilter = StreamVertex i (Filter sel) [[| $(p) . $(f) |]] intype intype (sm+sf)
        newMap    = mapV { vertexId = j }
    in  Just (replaceVertex filterV newMap . replaceVertex mapV newFilter)

mapFilter _ = Nothing

-- Test fixture: map vertex (id 1) applying show, Int -> String.
m1 :: StreamVertex
m1 = StreamVertex 1 Map [[| show |]] "Int" "String" 1
-- Test fixture: filter vertex (id 2) on strings, selectivity 0.5.
f1 :: StreamVertex
f1 = StreamVertex 2 (Filter 0.5) [[| \x -> length x <3 |]] "String" "String" 1

-- Test fixture: the filter expected after mapFilter is applied to m1 and
-- f1 -- id 1, predicate composed with show, service time 2.
f2 :: StreamVertex
f2 = StreamVertex 1 (Filter 0.5) [[| (\x -> length x <3) . (show) |]] "Int" "Int" 2
-- Test fixture: the map expected after mapFilter -- same as m1 but id 2.
m2 :: StreamVertex
m2 = StreamVertex 2 Map [[| show |]] "Int" "String" 1

-- Test fixture: source vertex (id 0) producing Int.
so :: StreamVertex
so = StreamVertex 0 (Source 1) [] "Int" "Int" 1
-- Test fixture: sink vertex (id 3) consuming String.
si :: StreamVertex
si = StreamVertex 3 Sink [] "String" "String" 1

-- Input graph for the mapFilter tests: source, map, filter, sink.
mapFilterPre :: StreamGraph
mapFilterPre  = path [ so, m1, f1, si ]
-- Expected graph for the mapFilter tests: source, filter, map, sink.
mapFilterPost :: StreamGraph
mapFilterPost = path [ so, f2, m2, si ]

-- Applying mapFilter to mapFilterPre must yield mapFilterPost.
test_mapfilter2 :: IO ()
test_mapfilter2 = assertEqual mapFilterPost
    $ applyRule mapFilter mapFilterPre

-- test it finds matches in sub-graphs
-- mapFilterPre wrapped in an Overlay with Empty, so the match is only
-- found by descending into a sub-graph.
mapFilterSub :: StreamGraph
mapFilterSub = mapFilterPre `Overlay` Empty
-- Applying mapFilter to mapFilterSub must still yield mapFilterPost.
test_mapfilter3 :: IO ()
test_mapfilter3 = assertEqual mapFilterPost
    $ applyRule mapFilter mapFilterSub

-- deeper sub-graphs and some redundancy
-- mapFilterPre overlaid with itself and with two Empty graphs, giving a
-- more deeply nested expression that contains the pattern twice.
mapFilterSub2 :: StreamGraph
mapFilterSub2 = Empty `Overlay` Empty `Overlay` mapFilterPre `Overlay` mapFilterPre
-- Applying mapFilter to mapFilterSub2 must still yield mapFilterPost.
test_mapfilter4 :: IO ()
test_mapfilter4 = assertEqual mapFilterPost
    $ applyRule mapFilter mapFilterSub2

-- vertex ordering is preserved
-- The vertex ids of mapFilterPost, in vertexList order, must be sorted.
test_mapFilter_ord :: IO ()
test_mapFilter_ord = assertBool $ sorted $ map vertexId (vertexList mapFilterPost)

-- True when the list is in non-decreasing order, i.e. every element is
-- <= its successor. Empty and single-element lists are trivially sorted.
sorted :: Ord a => [a] -> Bool
sorted xs = and (zipWith (<=) xs (drop 1 xs))

-- streamFilter >>> streamFilterAcc f a q ------------------------------------

-- Fuse a Filter followed directly by a FilterAcc into one FilterAcc.
--
-- The fused vertex takes the filter's vertex id and input type (for both
-- input and output type). Its selectivity is the product of the two. The
-- accumulator function only updates the accumulator when the filter
-- predicate holds, the initial accumulator is kept, and the new predicate
-- requires both the filter predicate and the FilterAcc predicate. The
-- service time comes from 'sumTimes' (defined elsewhere; exact formula
-- not visible here -- TODO confirm).
--
-- The returned rewrite merges both original vertices into the fused one
-- and removes the edge from the fused vertex to itself.
filterFilterAcc :: RewriteRule
filterFilterAcc (Connect (Vertex v1@(StreamVertex i (Filter sel1) (p:_) ty _ s1))
                         (Vertex v2@(StreamVertex _ (FilterAcc sel2) (f:a:q:_) _ _ s2))) =
    let v3 = StreamVertex i (FilterAcc (sel1*sel2))
          [ [| \a v -> if $(p) v then $(f) a v else a |]
          , a
          , [| \v a -> $(p) v && $(q) v a |] ] ty ty (sumTimes s1 sel1 s2)
    in  Just (removeEdge v3 v3 . mergeVertices (`elem` [v1,v2]) v3)
filterFilterAcc _ = Nothing

-- Input graph for test_filterFilterAcc: source, Filter, FilterAcc, sink.
filterFilterAccPre :: StreamGraph
filterFilterAccPre = path
    [ StreamVertex 0 (Source 1) [] "Int" "Int" 1
    , StreamVertex 1 (Filter 0.5) [p] "Int" "Int" 0.1
    , StreamVertex 2 (FilterAcc 0.5) [f , a , q] "Int" "Int" 0.2
    , StreamVertex 3 Sink [] "Int" "Int" 1
    ]
    where p = [| (>3) |]
          f = [| (\_ h -> (False, h)) |]
          a = [| (True, undefined) |]
          q = [| \new (b,old) -> b || old /= new |]

-- Expected graph for test_filterFilterAcc: the Filter and FilterAcc are
-- replaced by a single FilterAcc (id 1) with selectivity 0.25.
filterFilterAccPost :: StreamGraph
filterFilterAccPost = path
    [ StreamVertex 0 (Source 1) [] "Int" "Int" 1
    , StreamVertex 1 (FilterAcc 0.25)
        [ [| \a v -> if $(p) v then $(f) a v else a |]
        , a
        , [| \v a -> $(p) v && $(q) v a |]
        ] "Int" "Int" 0.2
    , StreamVertex 3 Sink [] "Int" "Int" 1
    ]
    where p = [| (>3) |]
          f = [| (\_ h -> (False, h)) |]
          a = [| (True, undefined) |]
          q = [| \new (b,old) -> b || old /= new |]

-- Applying filterFilterAcc to filterFilterAccPre must yield
-- filterFilterAccPost.
test_filterFilterAcc :: IO ()
test_filterFilterAcc = assertEqual (applyRule filterFilterAcc filterFilterAccPre)
    filterFilterAccPost

-- streamFilterAcc >>> streamFilter ------------------------------------------

-- Fuse a FilterAcc followed directly by a Filter into one FilterAcc.
--
-- The fused vertex takes the FilterAcc's vertex id and input type (for
-- both input and output type) and keeps its accumulator function and
-- initial accumulator. Its predicate requires both the original FilterAcc
-- predicate and the filter predicate; its selectivity is the product of
-- the two. The service time comes from 'sumTimes' (defined elsewhere;
-- exact formula not visible here -- TODO confirm).
--
-- The returned rewrite merges both original vertices into the fused one
-- and removes the edge from the fused vertex to itself.
filterAccFilter :: RewriteRule
filterAccFilter (Connect (Vertex v1@(StreamVertex i (FilterAcc sel1) (f:a:p:_) ty _ s1))
                         (Vertex v2@(StreamVertex _ (Filter sel2) (q:_) _ _ s2))) =
    let p' = [| \v a -> $(p) v a && $(q) v |]
        v  = StreamVertex i (FilterAcc (sel1*sel2)) [f,a,p'] ty ty (sumTimes s1 sel1 s2)
    in  Just (removeEdge v v . mergeVertices (`elem` [v1,v2]) v)
filterAccFilter _ = Nothing

-- Input graph for test_filterAccFilter: source, FilterAcc, Filter, sink.
filterAccFilterPre :: StreamGraph
filterAccFilterPre = path
    [ StreamVertex 0 (Source 1) [] "Int" "Int" 1
    , StreamVertex 1 (FilterAcc 0.5) [f,a,p] "Int" "Int" 0.1
    , StreamVertex 2 (Filter 0.5)    [q] "Int" "Int" 0.2
    , StreamVertex 3 Sink [] "Int" "Int" 1
    ]
    where f = [| (\_ h -> (False, h)) |]
          a = [| (True, undefined) |]
          p = [| \new (b,old) -> b || old /= new |]
          q = [| (>3) |]

-- Expected graph for test_filterAccFilter: the FilterAcc and Filter are
-- replaced by a single FilterAcc (id 1) with selectivity 0.25.
filterAccFilterPost :: StreamGraph
filterAccFilterPost = path
    [ StreamVertex 0 (Source 1) [] "Int" "Int" 1
    , StreamVertex 1 (FilterAcc 0.25) [f, a, p'] "Int" "Int" 0.2
    , StreamVertex 3 Sink [] "Int" "Int" 1
    ]
    where f = [| (\_ h -> (False, h)) |]
          a = [| (True, undefined) |]
          p = [| \new (b,old) -> b || old /= new |]
          q = [| (>3) |]
          p'= [| \v a -> $(p) v a && $(q) v |]

-- Applying filterAccFilter to filterAccFilterPre must yield
-- filterAccFilterPost.
test_filterAccFilter :: IO ()
test_filterAccFilter = assertEqual (applyRule filterAccFilter filterAccFilterPre)
    filterAccFilterPost

-- streamFilterAcc >>> streamFilterAcc ---------------------------------------



-- Fuse two directly connected FilterAcc vertices into one.
--
-- The fused accumulator is a pair of the two original accumulators. The
-- first component is always updated with the first accumulator function;
-- the second is only updated when the first predicate holds. The fused
-- predicate requires both original predicates, each checked against its
-- own component. Parameters of the first vertex beyond the first three
-- are carried over. The fused vertex takes the first vertex's id and
-- input type (for both input and output type); its selectivity is the
-- product of the two. The service time comes from 'sumTimes' (defined
-- elsewhere; exact formula not visible here -- TODO confirm).
--
-- The returned rewrite merges both original vertices into the fused one
-- and removes the edge from the fused vertex to itself.
filterAccFilterAcc :: RewriteRule
filterAccFilterAcc (Connect (Vertex v1@(StreamVertex i (FilterAcc sel1) (f:a:p:ss) ty _ s1))
                            (Vertex v2@(StreamVertex _ (FilterAcc sel2) (g:b:q:_) _ _ s2))) =
    let f' = [| \ (a,b) v -> ($(f) a v, if $(p) v a then $(g) b v else b) |]
        a' = [| ($(a), $(b)) |]
        q' = [| \v (y,z) -> $(p) v y && $(q) v z |]
        v  = StreamVertex i (FilterAcc (sel1*sel2)) (f':a':q':ss) ty ty (sumTimes s1 sel1 s2)
    in  Just (removeEdge v v . mergeVertices (`elem` [v1,v2]) v)
filterAccFilterAcc _ = Nothing

-- Input graph for test_filterAccFilterAcc: source, two FilterAcc
-- vertices in a row, sink.
filterAccFilterAccPre :: StreamGraph
filterAccFilterAccPre = path
    [ StreamVertex 0 (Source 1) [] "Int" "Int" 1
    , StreamVertex 1 (FilterAcc 0.5) [f,a,p] "Int" "Int" 0.1
    , StreamVertex 2 (FilterAcc 0.5) [g,b,q] "Int" "Int" 0.2
    , StreamVertex 3 Sink [] "Int" "Int" 1
    ]
    where
        -- remove repeating elements
        f = [| (\_ h -> (False, h)) |]
        a = [| (True, undefined) |]
        p = [| \new (b,old) -> b || old /= new |]
        -- increasing +ve values only
        g = [| \_ v -> v |]
        b = [| 0 |]
        q = [| (>=) |]

-- Expected graph for test_filterAccFilterAcc: the two FilterAcc vertices
-- are replaced by a single FilterAcc (id 1) with selectivity 0.25 and a
-- paired accumulator.
filterAccFilterAccPost :: StreamGraph
filterAccFilterAccPost = path
    [ StreamVertex 0 (Source 1) [] "Int" "Int" 1
    , StreamVertex 1 (FilterAcc 0.25)
        [ [| \(a,b) v -> ($(f) a v, if $(p) v a then $(g) b v else b) |]
        , [| ($(a),$(b)) |]
        , [| \v (y,z) -> $(p) v y && $(q) v z |]
        ] "Int" "Int" 0.2
    , StreamVertex 3 Sink [] "Int" "Int" 1
    ]
    where f = [| (\_ h -> (False, h)) |]
          a = [| (True, undefined) |]
          p = [| \new (b,old) -> b || old /= new |]
          g = [| \_ v -> v |]
          b = [| 0 |]
          q = [| (>=) |]

-- Applying filterAccFilterAcc to filterAccFilterAccPre must yield
-- filterAccFilterAccPost.
test_filterAccFilterAcc :: IO ()
test_filterAccFilterAcc = assertEqual (applyRule filterAccFilterAcc filterAccFilterAccPre)
    filterAccFilterAccPost

-- streamMap >>> streamMap ---------------------------------------------------

-- Fuse two directly connected Map vertices into one.
--
-- The fused vertex takes the first map's vertex id and input type and the
-- second map's output type. Its function is the first function followed
-- by the second (composed with '>>>'); any further parameters of the
-- first vertex are carried over. The service time comes from 'sumTimes',
-- called with 1 as its middle argument (defined elsewhere; exact formula
-- not visible here -- TODO confirm).
--
-- The returned rewrite merges both original vertices into the fused one
-- and removes the edge from the fused vertex to itself.
mapFuse :: RewriteRule
mapFuse (Connect (Vertex v1@(StreamVertex i Map (f:ss) t1 _ s1))
                 (Vertex v2@(StreamVertex _ Map (g:_) _ t2 s2))) =
    let v = StreamVertex i Map ([| $(f) >>> $(g) |]:ss) t1 t2 (sumTimes s1 1 s2)
    in  Just (removeEdge v v . mergeVertices (`elem` [v1,v2]) v)
mapFuse _ = Nothing

-- Input graph for test_mapFuse: source, map show, map length, sink.
mapFusePre :: StreamGraph
mapFusePre = path
    [ StreamVertex 0 (Source 1) [] "String" "String" 1
    , StreamVertex 1 Map [[| show |]] "Int" "String" 1
    , StreamVertex 2 Map [[| length |]] "String" "Int" 1
    , StreamVertex 3 Sink [] "Int" "Int" 1
    ]

-- Expected graph for test_mapFuse: the two maps are replaced by a single
-- map (id 1) applying show >>> length, with service time 2.
mapFusePost :: StreamGraph
mapFusePost = path
    [ StreamVertex 0 (Source 1) [] "String" "String" 1
    , StreamVertex 1 Map [[| show >>> length |]] "Int" "Int" 2
    , StreamVertex 3 Sink [] "Int" "Int" 1
    ]
-- Applying mapFuse to mapFusePre must yield mapFusePost.
test_mapFuse :: IO ()
test_mapFuse = assertEqual (applyRule mapFuse mapFusePre) mapFusePost

-- streamMap >>> streamScan --------------------------------------------------

-- Fuse a Map followed directly by a Scan into one Scan.
--
-- The fused vertex takes the map's vertex id and input type and the
-- scan's output type. The scan's initial accumulator is kept, and any
-- further parameters of the map are carried over. The new scan function
-- is built from the mapped function and the original scan function with
-- 'flip' and '>>>'. The service time comes from 'sumTimes', called with 1
-- as its middle argument (defined elsewhere; exact formula not visible
-- here -- TODO confirm).
--
-- The returned rewrite merges both original vertices into the fused one
-- and removes the edge from the fused vertex to itself.
mapScan :: RewriteRule
mapScan (Connect (Vertex v1@(StreamVertex i Map (f:ss) t1 _ s1))
                 (Vertex v2@(StreamVertex _ Scan (g:a:_) _ t2 s2))) =
    let v = StreamVertex i Scan ([| flip (flip $(f) >>> $(g)) |]:a:ss) t1 t2 (sumTimes s1 1 s2)
    in  Just (removeEdge v v . mergeVertices (`elem` [v1,v2]) v)
mapScan _ = Nothing

mapScanPre :: StreamGraph
mapScanPre = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
    [ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
1
    , Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Map [ExpQ
f] String
"Int" String
"Int" Double
1
    , Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Scan [ExpQ
g,ExpQ
a] String
"Int" String
"Int" Double
1
    , Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"Int" String
"Int" Double
1
    ]
    where
        f :: ExpQ
f = [| (+1) |]
        g :: ExpQ
g = [| \c _ -> c +1|]
        a :: ExpQ
a = [| 0 |]

mapScanPost :: StreamGraph
mapScanPost = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path
    [ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [] String
"Int" String
"Int" Double
1
    , Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Scan [[| flip (flip $(f) >>> $(g))|], [| $(a) |]] String
"Int" String
"Int" Double
2
    , Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
3 StreamOperator
Sink [] String
"Int" String
"Int" Double
1
    ]
    where
        f :: ExpQ
f = [| (+1) |]
        g :: ExpQ
g = [| \c _ -> c +1|]
        a :: ExpQ
a = [| 0 |]

test_mapScan :: IO ()
test_mapScan = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual (RewriteRule -> StreamGraph -> StreamGraph
applyRule RewriteRule
mapScan StreamGraph
mapScanPre) StreamGraph
mapScanPost

-- streamExpand >>> streamFilter f == streamMap (filter f) >>> streamExpand --
-- TODO: assuming that the serviceTime for the new map matches the old filter
-- Note that the filter selectivity information is lost

-- Swap an Expand with the Filter that follows it: the Expand's slot becomes
-- a Map applying `filter p` to the whole list, and the Filter's slot becomes
-- a parameterless Expand. Each replacement vertex takes the id of the vertex
-- it replaces; the new Map inherits the Filter's service time and the new
-- Expand keeps the old Expand's types and service time.
-- Returns Nothing for any other graph shape.
expandFilter :: RewriteRule
expandFilter (Connect (Vertex e@(StreamVertex expandId Expand _ t1 t2 se))
                      (Vertex f@(StreamVertex filterId (Filter _) (p:_) _ _ sf))) =
    let m  = StreamVertex expandId Map [[| filter $(p) |]] t1 t1 sf
        e' = StreamVertex filterId Expand [] t1 t2 se
    in  Just (replaceVertex f e' . replaceVertex e m)
expandFilter _ = Nothing

-- Input fixture for test_expandFilter: source, expand, filter (>3), sink.
expandFilterPre :: StreamGraph
expandFilterPre = path
    [ StreamVertex 0 (Source 1)   [] "[Int]" "[Int]" 1
    , StreamVertex 1 Expand       [] "[Int]" "Int" 2
    , StreamVertex 2 (Filter 0.5) [[|$(p)|]] "Int" "Int" 3
    , StreamVertex 3 Sink         [] "Int" "Int" 4
    ]
    where
        p = [| (>3) |]

-- Expected result for test_expandFilter: the filter has become a list-level
-- `filter` Map in front of the Expand.
expandFilterPost :: StreamGraph
expandFilterPost = path
    [ StreamVertex 0 (Source 1) [] "[Int]" "[Int]" 1
    , StreamVertex 1 Map [[|filter $(p) |]] "[Int]" "[Int]" 3
    , StreamVertex 2 Expand [] "[Int]" "Int" 2
    , StreamVertex 3 Sink [] "Int" "Int" 4
    ]
    where
        p = [| (>3) |]

-- Applying expandFilter to expandFilterPre must yield expandFilterPost.
test_expandFilter = assertEqual (applyRule expandFilter expandFilterPre) expandFilterPost

-- streamMap f >>> streamFilterAcc g a p == streamFilterAcc g a (f >>> p) >>> streamMap f

-- Swap a Map with the FilterAcc that follows it. The FilterAcc moves into
-- the Map's slot (keeping the Map's id and input type on both sides) with
-- its predicate pre-composed with the map function `f`; the Map moves into
-- the FilterAcc's slot, otherwise unchanged. The relocated filter's service
-- time is `sumTimes sm 1 sf`.
-- Returns Nothing for any other graph shape.
--
-- NOTE(review): only the predicate `p` is composed with `f`; the accumulator
-- function `g` is passed through untouched and so presumably now sees
-- pre-map elements. Confirm against streamFilterAcc whether `g` should be
-- adapted as well.
mapFilterAcc :: RewriteRule
mapFilterAcc (Connect (Vertex m@(StreamVertex i Map (f:_) t1 _ sm))
                      (Vertex f1@(StreamVertex j (FilterAcc sel) (g:a:p:_) _ _ sf))) =
    let f2 = StreamVertex i (FilterAcc sel) [g, a, [| ($f) >>> $(p) |]] t1 t1 (sumTimes sm 1 sf)
        m2 = m { vertexId = j }
    in  Just (replaceVertex f1 m2 . replaceVertex m f2)
mapFilterAcc _ = Nothing

-- Input fixture for test_mapFilterAcc: source, map, filterAcc, sink.
mapFilterAccPre :: StreamGraph
mapFilterAccPre = path
    [ StreamVertex 0 (Source 1) [] "Int" "Int" 1
    , StreamVertex 1 Map [f] "Int" "String" 1
    , StreamVertex 2 (FilterAcc 0.5) [g,a,p] "String" "String" 1
    , StreamVertex 3 Sink [] "String" "String" 1
    ]
    where
        f = [| (+1) |]
        g = [| (\_ h -> (False, h)) |]
        a = [| (True, undefined) |]
        p = [| \new (b,old) -> b || old /= new |]

-- Expected result for test_mapFilterAcc: the FilterAcc now precedes the Map
-- and its predicate is composed with the map function.
mapFilterAccPost :: StreamGraph
mapFilterAccPost = path
    [ StreamVertex 0 (Source 1) [] "Int" "Int" 1
    , StreamVertex 1 (FilterAcc 0.5) [g,a, [| $(f) >>> $(p) |]] "Int" "Int" 2
    , StreamVertex 2 Map [f] "Int" "String" 1
    , StreamVertex 3 Sink [] "String" "String" 1
    ]
    where
        f = [| (+1) |]
        g = [| (\_ h -> (False, h)) |]
        a = [| (True, undefined) |]
        p = [| \new (b,old) -> b || old /= new |]

-- Applying mapFilterAcc to mapFilterAccPre must yield mapFilterAccPost.
test_mapFilterAcc = assertEqual (applyRule mapFilterAcc mapFilterAccPre) mapFilterAccPost

-- streamMap f >>> streamWindow wm == streamWindow wm >>> streamMap (map f) --
-- TODO: assuming serviceTime for map is the same
-- This is only applicable when the map parameter has type (a -> a)

-- A type name is treated as a type variable when its first character is
-- lower case. The empty string is not a type variable (previously this
-- case crashed on `head`).
isTypeVariable :: String -> Bool
isTypeVariable (c:_) = isLower c
isTypeVariable []    = False

-- could two types be plugged together?
-- we do not handle any typeclass constraints
-- `compatibleTypes outT inT` is True when the two type names are identical,
-- or when either of them is a type variable (see isTypeVariable). The checks
-- run in that order and stop at the first success.
compatibleTypes :: String -> String -> Bool
compatibleTypes outT inT =
       outT == inT          -- matching concrete types
    || isTypeVariable inT
    || isTypeVariable outT

-- A concrete output type is compatible with a type-variable input.
test_compatibleTypes = assertBool $ compatibleTypes "Int" "a"

-- Swap a Map with the Window that follows it. The Window moves into the
-- Map's slot unchanged apart from taking the Map's id (and keeping only its
-- first parameter); the Map moves into the Window's slot with its function
-- lifted to `map f` and both of its type names wrapped in list brackets.
--
-- The rewrite is offered only when the Map's input type is compatible with
-- the Window's input type (see compatibleTypes); otherwise, and for any
-- other graph shape, the result is Nothing.
mapWindow :: RewriteRule
mapWindow (Connect (Vertex m@(StreamVertex i Map    (f:_)  mapInT    mapOutT    sm))
                   (Vertex w@(StreamVertex j Window (wm:_) windowInT windowOutT sw)))
    | compatibleTypes mapInT windowInT = Just (replaceVertex m w2 . replaceVertex w m2)
    | otherwise                        = Nothing
    where
        w2 = StreamVertex i Window [wm] windowInT windowOutT sw
        m2 = StreamVertex j Map [[| map $(f) |]] ("["++mapInT++"]") ("["++mapOutT++"]") sm
mapWindow _ = Nothing

-- concrete map, concrete window, map types match
-- Input fixture for test_mapWindow: source, map (+1), window (chop 2), sink.
mapWindowPre :: StreamGraph
mapWindowPre = path
    [ StreamVertex 0 (Source 1) [] "Int" "Int" 1
    , StreamVertex 1 Map    [[| (+1) |]] "Int" "Int" 2
    , StreamVertex 2 Window [[| chop 2 |]] "Int" "[Int]" 3
    , StreamVertex 3 Sink   [] "[Int]" "[Int]" 4
    ]

-- Expected result for test_mapWindow: the window first, then the map lifted
-- over lists.
mapWindowPost :: StreamGraph
mapWindowPost = path
    [ StreamVertex 0 (Source 1) [] "Int" "Int" 1
    , StreamVertex 1 Window [[| chop 2 |]] "Int" "[Int]" 3
    , StreamVertex 2 Map    [[| map (+1) |]] "[Int]" "[Int]" 2
    , StreamVertex 3 Sink   [] "[Int]" "[Int]" 4
    ]

-- Applying mapWindow to mapWindowPre must yield mapWindowPost.
test_mapWindow = assertEqual (applyRule mapWindow mapWindowPre) mapWindowPost

-- concrete map, concrete window, map types don't match
-- Input fixture for test_mapWindow2: a String -> Int map feeding an Int
-- window, with no surrounding source or sink.
mapWindowPre2 :: StreamGraph
mapWindowPre2 = path
    [ StreamVertex 0 Map    [[| read   :: String -> Int  |]] "String" "Int"   1
    , StreamVertex 1 Window [[| chop 2 :: WindowMaker Int|]] "Int"    "[Int]" 1
    ]

-- mapWindow must decline to rewrite mapWindowPre2.
test_mapWindow2 = assertNothingNoShow (mapWindow mapWindowPre2)

-- concrete map, polymorphic window
-- Input fixture for test_mapWindow3: a concretely typed map (fst) followed
-- by a window whose types are the variables "a" / "[a]".
mapWindowPre3 :: StreamGraph
mapWindowPre3 = path
    [ StreamVertex 0 (Source 1) [] "(Int,Int)" "(Int,Int)" 1
    , StreamVertex 1 Map [[|fst|]] "(Int,Int)" "Int" 0.0
    , StreamVertex 2 Window [[|chop 2|]] "a" "[a]" 0.0
    , StreamVertex 3 Sink   [] "[Int]" "[Int]" 4
    ]
-- Expected result for test_mapWindow3: window first, then `map fst` with
-- list-wrapped type names.
mapWindowPost3 :: StreamGraph
mapWindowPost3 = path
    [ StreamVertex 0 (Source 1) [] "(Int,Int)" "(Int,Int)" 1
    , StreamVertex 1 Window [[|chop 2|]] "a" "[a]" 0.0
    , StreamVertex 2 Map [[|map fst|]] "[(Int,Int)]" "[Int]" 0.0
    , StreamVertex 3 Sink   [] "[Int]" "[Int]" 4
    ]
-- Applying mapWindow to mapWindowPre3 must yield mapWindowPost3.
test_mapWindow3 = assertEqual mapWindowPost3 (applyRule mapWindow mapWindowPre3)

-- polymorphic map, concrete window, map types match
-- Input fixture for test_mapWindow4: a map typed with the variable "a" on
-- both sides, followed by a concretely typed Int window.
mapWindowPre4 :: StreamGraph
mapWindowPre4 = path
    [ StreamVertex 0 (Source 1) [] "Int" "Int" 1
    , StreamVertex 1 Map [[|(+1)|]] "a" "a" 0.0
    , StreamVertex 2 Window [[|chop 2|]] "Int" "[Int]" 0.0
    , StreamVertex 3 Sink   [] "[Int]" "[Int]" 4
    ]
-- Expected result for test_mapWindow4: window first, then `map (+1)` typed
-- "[a]" -> "[a]".
mapWindowPost4 :: StreamGraph
mapWindowPost4 = path
    [ StreamVertex 0 (Source 1) [] "Int" "Int" 1
    , StreamVertex 1 Window [[|chop 2|]] "Int" "[Int]" 0.0
    , StreamVertex 2 Map [[|map (+1)|]] "[a]" "[a]" 0.0
    , StreamVertex 3 Sink   [] "[Int]" "[Int]" 4
    ]
-- Applying mapWindow to mapWindowPre4 must yield mapWindowPost4.
test_mapWindow4 = assertEqual mapWindowPost4 (applyRule mapWindow mapWindowPre4)

-- polymorphic map, concrete window, map types don't match
-- Input fixture for test_mapWindow5: a map typed "a" -> "b" followed by a
-- concretely typed String window.
mapWindowPre5 :: StreamGraph
mapWindowPre5 = path
    [ StreamVertex 0 (Source 1) [] "String" "String" 1
    , StreamVertex 1 Map [[|show|]] "a" "b" 0.0
    , StreamVertex 2 Window [[|chop 2|]] "String" "[String]" 0.0
    , StreamVertex 3 Sink   [] "[String]" "[String]" 4
    ]
-- mapWindow applied directly to mapWindowPre5 must return Nothing.
-- NOTE(review): the rule is called on the whole four-vertex path rather than
-- through applyRule, and mapWindow only matches a top-level
-- `Connect (Vertex _) (Vertex _)`. Confirm the Nothing here comes from the
-- type check and not simply from the pattern failing to match.
test_mapWindow5 = assertNothingNoShow (mapWindow mapWindowPre5)

-- polymorphic map, polymorphic window
-- Input fixture for test_mapWindow6: both the map ("a" -> "b") and the
-- window ("c" -> "[c]") are typed with variables.
mapWindowPre6 :: StreamGraph
mapWindowPre6 = path
    [ StreamVertex 0 (Source 1) []       "String"   "String" 1
    , StreamVertex 1 Map [[|show|]]      "a"        "b" 0.0
    , StreamVertex 2 Window [[|chop 2|]] "c"        "[c]" 0.0
    , StreamVertex 3 Sink   []           "[String]" "[String]" 4
    ]
-- Expected result for test_mapWindow6: window first, then `map show` typed
-- "[a]" -> "[b]".
mapWindowPost6 :: StreamGraph
mapWindowPost6 = path
    [ StreamVertex 0 (Source 1) []       "String"   "String" 1
    , StreamVertex 1 Window [[|chop 2|]] "c"        "[c]" 0.0
    , StreamVertex 2 Map [[|map show|]]  "[a]"      "[b]" 0.0
    , StreamVertex 3 Sink   []           "[String]" "[String]" 4
    ]
-- Applying mapWindow to mapWindowPre6 must yield mapWindowPost6.
test_mapWindow6 = assertEqual mapWindowPost6 (applyRule mapWindow mapWindowPre6)

-- streamExpand >>> streamMap f == streamMap (map f) >>> streamExpand --------
-- [a]           a            b   [a]               [b]               b
-- TODO: assuming serviceTime for map unaffected

-- Swap an Expand with the Map that follows it. The Map moves into the
-- Expand's slot with its function lifted to `map f`, consuming the Expand's
-- input type and producing a list of the Map's original output type; a
-- parameterless Expand then takes the Map's old slot and unpacks that list.
-- Service times travel with their operators.
-- Returns Nothing for any other graph shape.
expandMap :: RewriteRule
expandMap (Connect (Vertex e@(StreamVertex i Expand _ t1 _ se))
                   (Vertex m@(StreamVertex j Map (f:_) _ t4 sm))) =
    let t5 = "[" ++ t4 ++ "]"
        m2 = StreamVertex i Map [[| map $(f) |]] t1 t5 sm
        e2 = StreamVertex j Expand [] t5 t4 se
    in  Just (replaceVertex m e2 . replaceVertex e m2)
expandMap _ = Nothing

-- Input fixture for test_expandMap: source, expand, map show, sink.
expandMapPre :: StreamGraph
expandMapPre = path
    [ StreamVertex 0 (Source 1) [] "[Int]" "[Int]" 1
    , StreamVertex 1 Expand [] "[Int]" "Int" 2
    , StreamVertex 2 Map [[| show |]] "Int" "String" 3
    , StreamVertex 3 Sink [] "String" "String" 4
    ]

-- Expected result for test_expandMap: the list-lifted map first, then the
-- expand over "[String]".
expandMapPost :: StreamGraph
expandMapPost = path
    [ StreamVertex 0 (Source 1) [] "[Int]" "[Int]" 1
    , StreamVertex 1 Map [[| map (show) |]] "[Int]" "[String]" 3
    , StreamVertex 2 Expand [] "[String]" "String" 2
    , StreamVertex 3 Sink [] "String" "String" 4
    ]

-- Applying expandMap to expandMapPre must yield expandMapPost.
test_expandMap = assertEqual (applyRule expandMap expandMapPre) expandMapPost

-- streamExpand >>> streamScan f a == ----------------------------------------
--     streamFilter (not.null)
--         >>> streamScan (\b a' -> tail $ scanl f (last b) a') [a]
--         >>> streamExpand

-- TODO: assuming zero service time for the new filter, and the same for the
-- two scans. And 50% selectivity for the new filter! Which is a stab in the
-- dark.
-- Replace `Expand >>> Scan f a` with `Filter (not.null) >>> Scan >>> Expand`.
--
-- The resulting rewrite, given the whole graph g:
--   * puts a Filter (selectivity 0.5, service time 0) in the Expand's slot,
--     reusing the Expand's id;
--   * keeps a Scan under the old Scan's id, now working on whole lists: its
--     step is `\b a' -> tail $ scanl f (last b) a'` and its initial
--     accumulator is the singleton list `[a]`;
--   * adds an Expand under a fresh id (newVertexId g) that unpacks the
--     scanned lists back into the Scan's original output type.
-- Returns Nothing for any other graph shape.
expandScan :: RewriteRule
expandScan (Connect (Vertex  e@(StreamVertex i Expand _       t1 _  se))
                    (Vertex sc@(StreamVertex j Scan   (f:a:_) _  t3 ss))) =
    Just $ \g ->
        let t4 = "[" ++ t3 ++ "]"
            k  = newVertexId g
            p  = [| \b a' -> tail $ scanl $(f) (last b) a' |]

            f' = StreamVertex i (Filter 0.5) [[| not.null |]]  t1 t1 0
            sc'= StreamVertex j Scan         [p, [| [$(a)] |]] t1 t4 ss
            e' = StreamVertex k Expand       []                t4 t3 se

            -- Put the filter and the new expand where the old expand and scan
            -- were, then drop the direct filter->expand edge so the new scan
            -- (overlaid below as part of the path) sits between them.
            rewired = (removeEdge f' e' . replaceVertex e f' . replaceVertex sc e') g

        in  overlay (path [f',sc',e']) rewired

expandScan _ = Nothing

-- Input fixture for test_expandScan: source, expand, counting scan, sink.
expandScanPre :: StreamGraph
expandScanPre = path
    [ StreamVertex 0 (Source 1) []    "[Int]" "[Int]" 1
    , StreamVertex 1 Expand []    "[Int]" "Int" 2
    , StreamVertex 2 Scan   [f,a] "Int"   "Int" 3
    , StreamVertex 3 Sink   []    "Int"   "Int" 4
    ]
    where
        f = [| \c _ -> c + 1 |]
        a = [| 0 |]

-- Expected result for test_expandScan: filter, list-level scan, then a new
-- expand (id 4) ahead of the sink.
expandScanPost :: StreamGraph
expandScanPost = path
    [ StreamVertex 0 (Source 1)   []     "[Int]" "[Int]" 1
    , StreamVertex 1 (Filter 0.5) [p]    "[Int]" "[Int]" 0
    , StreamVertex 2 Scan         [g,as] "[Int]" "[Int]" 3
    , StreamVertex 4 Expand       []     "[Int]" "Int" 2
    , StreamVertex 3 Sink         []     "Int"   "Int" 4
    ]
    where
        p  = [| not . null |]
        g  = [| \b a' -> tail $ scanl $(f) (last b) a' |]
        f  = [| \c _ -> c + 1 |]
        as = [| [$(a)] |]
        a  = [| 0 |]

-- | 'expandScan' applied to 'expandScanPre' (then simplified) must equal
-- 'expandScanPost'.
test_expandScan :: IO ()
test_expandScan = assertEqual rewritten expandScanPost
  where
    rewritten = simplify (applyRule expandScan expandScanPre)

-- streamExpand >>> streamExpand == streamMap concat >>> streamExpand --------
-- [[a]]        [a]             a  [[a]]            [a]              a

-- | When two Expand operators are directly connected, replace the first one
-- with a @Map concat@ that keeps the first Expand's vertex id, input/output
-- types and final (Double) field. The second Expand is left untouched.
expandExpand :: RewriteRule
expandExpand (Connect (Vertex outer@(StreamVertex i Expand _ t1 t2 s))
                      (Vertex (StreamVertex _ Expand _ _ _ _))) =
    Just (replaceVertex outer flatten)
  where
    flatten = StreamVertex i Map [[| concat |]] t1 t2 s

expandExpand _ = Nothing

-- | Input graph for 'test_expandExpand': two Expand operators in a row
-- between a source and a sink.
expandExpandPre :: StreamGraph
expandExpandPre = path [source, outer, inner, sink]
  where
    source = StreamVertex 0 (Source 1) [] "[Int]"   "[Int]" 1
    outer  = StreamVertex 1 Expand     [] "[[Int]]" "[Int]" 2
    inner  = StreamVertex 2 Expand     [] "[Int]"   "Int"   3
    sink   = StreamVertex 3 Sink       [] "Int"     "[Int]" 4

-- | Expected result for 'test_expandExpand': identical to 'expandExpandPre'
-- except that vertex 1 is a @Map concat@ instead of an Expand.
expandExpandPost :: StreamGraph
expandExpandPost = path [source, flatten, inner, sink]
  where
    source  = StreamVertex 0 (Source 1) []             "[Int]"   "[Int]" 1
    flatten = StreamVertex 1 Map        [[| concat |]] "[[Int]]" "[Int]" 2
    inner   = StreamVertex 2 Expand     []             "[Int]"   "Int"   3
    sink    = StreamVertex 3 Sink       []             "Int"     "[Int]" 4

-- | 'expandExpand' applied to 'expandExpandPre' must equal 'expandExpandPost'.
test_expandExpand :: IO ()
test_expandExpand = assertEqual rewritten expandExpandPost
  where
    rewritten = applyRule expandExpand expandExpandPre

-- streamFilter f (streamMerge [s1, s2]) -------------------------------------
-- = streamMerge [streamFilter f s1, streamFilter f s2]

-- | Hoist a Filter that directly follows a Merge onto each input of the Merge.
-- NOTE(review): the selectivity 0 looks like a placeholder used only for
-- matching via 'cmpOps' inside 'hoistOp' -- confirm that 'cmpOps' ignores it.
mergeFilter :: RewriteRule
mergeFilter = hoistOp $ Filter 0

-- TODO address Filter selectivities
-- | "Hoist" an operator (such as a Filter) upstream through a Merge operator.
--
-- Matches a Merge vertex connected directly to a vertex whose operator
-- compares equal to @op@ under 'cmpOps'. The resulting rewrite:
--
--   1. removes the edge from the Merge to the matched vertex and folds the
--      matched vertex into the Merge (via 'replaceVertex');
--   2. removes every edge that fed the Merge in the graph given to the
--      rewrite;
--   3. for each of those former inputs, overlays a path
--      @input -> copy of the operator -> Merge@, where each copy takes its id
--      from 'newVertexId' on the graph built so far;
--   4. sets both type fields of the Merge to the hoisted operator's output
--      type.
hoistOp :: StreamOperator -> RewriteRule
hoistOp op (Connect (Vertex merge@(StreamVertex _ Merge _ _ mergeOut _))
                    (Vertex below@(StreamVertex _ o params _ belowOut s)))
    | cmpOps o op = Just hoist
    | otherwise   = Nothing
  where
    -- the Merge with its types adjusted to what the hoisted operator emits
    merge' = merge { intype = belowOut, outtype = belowOut }

    -- a copy of the hoisted operator with an id that is fresh w.r.t. @graph@
    copyOf graph = StreamVertex (newVertexId graph) o params mergeOut belowOut s

    hoist graph = pipeline graph
      where
        -- every vertex with an edge into the Merge
        feeders = [ src | (src, dst) <- edgeList graph, merge == dst ]

        -- one edge-removal per feeder
        detach = [ removeEdge src merge | src <- feeders ]

        -- one insertion per feeder; ids are allocated against the running graph
        attach = [ \acc -> overlay acc (path [src, copyOf acc, merge])
                 | src <- feeders ]

        pipeline = removeEdge merge below
               >>> replaceVertex below merge
               >>> foldRules detach
               >>> foldRules attach
               >>> replaceVertex merge merge'

hoistOp _ _ = Nothing

-- Fixture vertices for the Merge/Filter tests: two sources ('v1', 'v2')
-- feeding a Merge ('v3'), followed by a Filter ('v4') and a Sink ('v5').
v1, v2, v3, v4, v5 :: StreamVertex
v1 = StreamVertex 0 (Source 1)   []           "Int" "Int" 1
v2 = StreamVertex 1 (Source 1)   []           "Int" "Int" 2
v3 = StreamVertex 2 Merge        []           "Int" "Int" 3
v4 = StreamVertex 3 (Filter 0.5) [[| (>3) |]] "Int" "Int" 4
v5 = StreamVertex 4 Sink         []           "Int" "Int" 5

-- | Input graph for 'test_mergeFilter': both sources feed the Merge, which is
-- followed by the Filter and the Sink.
mergeFilterPre :: StreamGraph
mergeFilterPre = overlay mainBranch sideBranch
  where
    mainBranch = path [v1, v3, v4, v5]
    sideBranch = path [v2, v3]

-- Copies of the Filter 'v4' with vertex ids 5 and 6, as expected on each
-- branch after the Filter has been hoisted above the Merge.
v6, v7 :: StreamVertex
v6 = StreamVertex 5 (Filter 0.5) [[| (>3) |]] "Int" "Int" 4
v7 = StreamVertex 6 (Filter 0.5) [[| (>3) |]] "Int" "Int" 4

-- | Expected result for 'test_mergeFilter': each source passes through its
-- own Filter before reaching the Merge, which feeds the Sink directly.
mergeFilterPost :: StreamGraph
mergeFilterPost = overlay mainBranch sideBranch
  where
    mainBranch = path [v1, v6, v3, v5]
    sideBranch = path [v2, v7, v3]

-- | 'mergeFilter' applied to 'mergeFilterPre' must equal 'mergeFilterPost'.
test_mergeFilter :: IO ()
test_mergeFilter = assertEqual rewritten mergeFilterPost
  where
    rewritten = applyRule mergeFilter mergeFilterPre

-- streamExpand (streamMerge [s1, s2]) -------------------------------------
-- = streamMerge [streamExpand s1, streamExpand s2]

-- | Hoist an Expand that directly follows a Merge onto each input of the
-- Merge (see 'hoistOp').
mergeExpand :: RewriteRule
mergeExpand = hoistOp $ Expand

-- Fixture vertices for the Merge/Expand tests: two list-producing sources
-- ('v8', 'v9'), a Merge over lists ('v10') and an Expand ('v11').
v8, v9, v10, v11 :: StreamVertex
v8  = StreamVertex 0 (Source 1) [] "[Int]" "[Int]" 1
v9  = StreamVertex 1 (Source 1) [] "[Int]" "[Int]" 2
v10 = StreamVertex 2 Merge      [] "[Int]" "[Int]" 3
v11 = StreamVertex 3 Expand     [] "[Int]" "Int"   4

-- | Input graph for 'test_mergeExpand': both sources feed the Merge, which is
-- followed by the Expand and the Sink 'v5'.
mergeExpandPre :: StreamGraph
mergeExpandPre = overlay mainBranch sideBranch
  where
    mainBranch = path [v8, v10, v11, v5]
    sideBranch = path [v9, v10]

-- Vertices expected after hoisting the Expand: the Merge retyped to "Int"
-- ('v12') and one Expand per branch with vertex ids 5 and 6 ('v13', 'v14').
v12, v13, v14 :: StreamVertex
v12 = StreamVertex 2 Merge  [] "Int"   "Int" 3
v13 = StreamVertex 5 Expand [] "[Int]" "Int" 4
v14 = StreamVertex 6 Expand [] "[Int]" "Int" 4

-- | Expected result for 'test_mergeExpand': each source is expanded on its
-- own branch before the (retyped) Merge, which feeds the Sink directly.
mergeExpandPost :: StreamGraph
mergeExpandPost = overlay mainBranch sideBranch
  where
    mainBranch = path [v8, v13, v12, v5]
    sideBranch = path [v9, v14, v12]

-- | 'mergeExpand' applied to 'mergeExpandPre' must equal 'mergeExpandPost'.
test_mergeExpand :: IO ()
test_mergeExpand = assertEqual rewritten mergeExpandPost
  where
    rewritten = applyRule mergeExpand mergeExpandPre

-- streamMap f (streamMerge [s1, s2]) ----------------------------------------
-- = streamMerge [streamMap f s1, streamMap f s2]

-- | Hoist a Map that directly follows a Merge onto each input of the Merge
-- (see 'hoistOp').
mergeMap :: RewriteRule
mergeMap = hoistOp $ Map

-- Fixture vertices for the Merge/Map tests: two sources ('v15', 'v16'), a
-- Merge ('v17'), a @Map show@ ('v18') and a Sink over strings ('v19').
v15, v16, v17, v18, v19 :: StreamVertex
v15 = StreamVertex 0 (Source 1) []           "Int"    "Int"    1
v16 = StreamVertex 1 (Source 1) []           "Int"    "Int"    2
v17 = StreamVertex 2 Merge      []           "Int"    "Int"    3
v18 = StreamVertex 3 Map        [[| show |]] "Int"    "String" 4
v19 = StreamVertex 4 Sink       []           "String" "String" 5

-- | Input graph for 'test_mergeMap': both sources feed the Merge, which is
-- followed by the Map and the Sink.
mergeMapPre :: StreamGraph
mergeMapPre = overlay mainBranch sideBranch
  where
    mainBranch = path [v15, v17, v18, v19]
    sideBranch = path [v16, v17]

-- Vertices expected after hoisting the Map: one @Map show@ per branch with
-- vertex ids 5 and 6 ('v20', 'v21') and the Merge retyped to "String" ('v22').
v20, v21, v22 :: StreamVertex
v20 = StreamVertex 5 Map   [[| show |]] "Int"    "String" 4
v21 = StreamVertex 6 Map   [[| show |]] "Int"    "String" 4
v22 = StreamVertex 2 Merge []           "String" "String" 3

-- | Expected result for 'test_mergeMap': each source is mapped on its own
-- branch before the (retyped) Merge, which feeds the Sink directly.
mergeMapPost :: StreamGraph
mergeMapPost = overlay mainBranch sideBranch
  where
    mainBranch = path [v15, v20, v22, v19]
    sideBranch = path [v16, v21, v22]

-- | 'mergeMap' applied to 'mergeMapPre' must equal 'mergeMapPost'.
test_mergeMap :: IO ()
test_mergeMap = assertEqual rewritten mergeMapPost
  where
    rewritten = applyRule mergeMap mergeMapPre

-- streamMerge [streamMap f s1, streamMap f s2]
--     == streamMap f (streamMerge [s1,s2])

-- | Do all the given vertices carry the same parameter list?
--
-- Each vertex's 'parameters' are converted with 'deQ' and compared against
-- those of the first vertex. An empty list of vertices is vacuously 'True';
-- a pattern match is used instead of 'head' / 'tail' so that this case no
-- longer raises an exception.
identicalParams :: [StreamVertex] -> Bool
identicalParams inbound =
    case map (map deQ . parameters) inbound of
        []              -> True
        (first : rest)  -> all (== first) rest

-- TODO: for filters, this will only match when all the upstream filters
-- have the same selectivity. An alternative would be to average them.
-- | "Push" an operator downstream through a Merge operator.
--
-- Matches a vertex whose operator compares equal to @op@ under 'cmpOps' and
-- which is directly connected to a Merge. The pattern match alone is not
-- conclusive, so the rewrite additionally requires that all vertices feeding
-- the Merge satisfy 'identicalOperators' and 'identicalParams'; otherwise the
-- graph is returned unchanged.
--
-- When it applies, the rewrite:
--
--   1. removes the edge from the Merge to its downstream vertex;
--   2. folds all inbound operator vertices into the Merge ('mergeVertices');
--   3. sets both type fields of the Merge to the operator's input type and
--      removes the self-loop on the Merge;
--   4. overlays a path @Merge -> copy of the operator -> downstream@, where
--      the copy takes its id from 'newVertexId' on the original graph.
--
-- If the Merge has no outgoing edge the graph is returned unchanged (the
-- previous implementation called 'head' on an empty list in that case).
pushOp :: StreamOperator -> RewriteRule
pushOp op (Connect (Vertex ma@(StreamVertex _ o _ t1 _ _))
                   (Vertex me@(StreamVertex _ Merge _ _ _ _)))
    | cmpOps o op = Just rewrite
    | otherwise   = Nothing
  where
    rewrite g
        | identicalOperators inbound && identicalParams inbound =
            -- only the first vertex downstream of the Merge is considered
            case [ dst | (src, dst) <- edgeList g, src == me ] of
                []       -> g
                (on : _) ->
                    ( removeEdge me on
                      -- remove all the inbound operators
                  >>> mergeVertices (`elem` inbound) me
                      -- fix Merge type
                  >>> replaceVertex me me'
                  >>> removeEdge me' me'
                      -- new Operator after Merge
                  >>> overlay (path [me', ma', on])
                    ) g
        | otherwise = g
      where
        -- every vertex with an edge into the Merge
        inbound = [ src | (src, dst) <- edgeList g, me == dst ]
        me'     = me { intype = t1, outtype = t1 }
        ma'     = ma { vertexId = newVertexId g }

pushOp _ _ = Nothing

-- | Push identical Map operators that feed a Merge to a single Map after the
-- Merge (see 'pushOp').
mapMerge :: RewriteRule
mapMerge = pushOp $ Map

-- | Input graph for 'test_mapMerge': the output of the opposite rewrite.
mapMergePre :: StreamGraph
mapMergePre = mergeMapPost

-- | Expected result for 'test_mapMerge': the shape of 'mergeMapPre', except
-- that the Map after the Merge carries vertex id 7.
mapMergePost :: StreamGraph
mapMergePost = overlay mainBranch sideBranch
  where
    mainBranch = path [v15, v17, v18 { vertexId = 7 }, v19]
    sideBranch = path [v16, v17]

-- | 'mapMerge' applied to 'mapMergePre' must equal 'mapMergePost'.
test_mapMerge :: IO ()
test_mapMerge = assertEqual rewritten mapMergePost
  where
    rewritten = applyRule mapMerge mapMergePre

-- streamMerge [streamFilter p s1, streamFilter p s2] ------------------------
-- == streamFilter p (streamMerge [s1,s2])

-- TODO: here we assume that the filters have the same selectivity.
-- Alternatively we could/should average them.
-- | Push identical Filter operators that feed a Merge to a single Filter
-- after the Merge (see 'pushOp').
-- NOTE(review): the selectivity 0 looks like a placeholder used only for
-- matching via 'cmpOps' -- confirm that 'cmpOps' ignores it.
filterMerge :: RewriteRule
filterMerge = pushOp $ Filter 0

-- | Input graph for 'test_filterMerge': the output of the opposite rewrite.
filterMergePre :: StreamGraph
filterMergePre = mergeFilterPost

-- | Expected result for 'test_filterMerge': the shape of 'mergeFilterPre',
-- except that the Filter after the Merge carries vertex id 7.
filterMergePost :: StreamGraph
filterMergePost = overlay mainBranch sideBranch
  where
    mainBranch = path [v1, v3, v4 { vertexId = 7 }, v5]
    sideBranch = path [v2, v3]
-- | 'filterMerge' applied to 'filterMergePre' must equal 'filterMergePost'.
test_filterMerge :: IO ()
test_filterMerge = assertEqual rewritten filterMergePost
  where
    rewritten = applyRule filterMerge filterMergePre

-- streamMerge [streamExpand s1, streamExpand s2] ----------------------------
-- == streamExpand (streamMerge [s1,s2])

-- | Push Expand operators that feed a Merge to a single Expand after the
-- Merge (see 'pushOp').
expandMerge :: RewriteRule
expandMerge = pushOp $ Expand

-- | Input graph for 'test_expandMerge': the output of the opposite rewrite.
expandMergePre :: StreamGraph
expandMergePre = mergeExpandPost

-- | Expected result for 'test_expandMerge': the shape of 'mergeExpandPre',
-- except that the Expand after the Merge carries vertex id 7.
expandMergePost :: StreamGraph
expandMergePost = overlay mainBranch sideBranch
  where
    mainBranch = path [v8, v10, v11 { vertexId = 7 }, v5]
    sideBranch = path [v9, v10]
-- | 'expandMerge' applied to 'expandMergePre' must equal 'expandMergePost'.
test_expandMerge :: IO ()
test_expandMerge = assertEqual rewritten expandMergePost
  where
    rewritten = applyRule expandMerge expandMergePre

-- streamMerge merge ---------------------------------------------------------

-- | Fuse two directly connected Merge operators into one. The
-- order-preserving transformation is strictly right-oriented, i.e.
-- @merge [s1, merge [s2,s3]] == merge [s1,s2,s3]@, but for
-- non-order-preserving merges we can write a much more generic rule.
--
-- Both Merge vertices are collapsed onto the upstream one, and the self-loop
-- this leaves on it is removed.
mergeFuse :: RewriteRule
mergeFuse (Connect (Vertex m1@(StreamVertex _ Merge _ _ _ _))
                   (Vertex m2@(StreamVertex _ Merge _ _ _ _))) =
    Just (collapse >>> dropSelfLoop)
  where
    collapse     = mergeVertices (`elem` [m1, m2]) m1
    dropSelfLoop = removeEdge m1 m1

mergeFuse _ = Nothing

-- Fixture vertices for the Merge-fusion test: three sources ('v23', 'v24',
-- 'v25'), two Merges ('v26', 'v27') and a Sink ('v28').
v23, v24, v25, v26, v27, v28 :: StreamVertex
v23 = StreamVertex 0 (Source 1) [] "Int" "Int" 1
v24 = StreamVertex 1 (Source 1) [] "Int" "Int" 2
v25 = StreamVertex 2 (Source 1) [] "Int" "Int" 3
v26 = StreamVertex 3 Merge      [] "Int" "Int" 4
v27 = StreamVertex 4 Merge      [] "Int" "Int" 5
v28 = StreamVertex 5 Sink       [] "Int" "Int" 6

-- | Input graph for 'test_mergeFuse': two sources feed the first Merge, which
-- together with a third source feeds the second Merge and then the Sink.
mergeFusePre :: StreamGraph
mergeFusePre = throughBoth `Overlay` intoFirst `Overlay` intoSecond
  where
    throughBoth = path [v23, v26, v27, v28]
    intoFirst   = path [v24, v26, v27]
    intoSecond  = path [v25, v27]

-- | Expected result for 'test_mergeFuse': all three sources feed the single
-- remaining Merge 'v26', which feeds the Sink.
mergeFusePost :: StreamGraph
mergeFusePost = first `Overlay` second `Overlay` third
  where
    first  = path [v23, v26, v28]
    second = path [v24, v26]
    third  = path [v25, v26]

-- | 'mergeFuse' applied to 'mergeFusePre' must equal 'mergeFusePost'.
test_mergeFuse :: IO ()
test_mergeFuse = assertEqual rewritten mergeFusePost
  where
    rewritten = applyRule mergeFuse mergeFusePre

-- streamExpand >>> streamFilterAcc ... = streamScan ... >>> streamExpand-----
-- [a]           a                   a  [a]               [a]              a
--
-- In full, the right-hand side built below is
--   streamScan (\(_,acc) a -> filterAcc f acc p a) ([],a)
--     >>> streamMap (reverse.fst) >>> streamExpand
--
-- Graph-level effect when an Expand vertex (id i) is connected to a FilterAcc
-- vertex (id j):
--  * the Expand vertex is replaced by a Scan vertex that keeps id i and takes
--    the FilterAcc's service time;
--  * a Map vertex (reverse.fst) takes id j, with service time 0;
--  * the Expand vertex is re-inserted after the Map under a fresh id obtained
--    from 'newVertexId', in the position previously held by the FilterAcc.
-- Any other graph shape yields Nothing.

expandFilterAcc :: RewriteRule
expandFilterAcc (Connect (Vertex e@(StreamVertex i Expand _ t1 _ _))
                         (Vertex fa@(StreamVertex j (FilterAcc _) (f:a:p:_) _ _ s))) =
    Just rewrite
  where
    -- Both new operators use the Expand's input type on either side.
    scan = StreamVertex i Scan
        [ [| \(_,acc) a -> filterAcc $(f) acc $(p) a |]
        , [| ([],$(a)) |]
        ] t1 t1 s
    mapr = StreamVertex j Map [[| reverse.fst |]] t1 t1 0

    -- The fresh id depends on the whole graph, so it is computed per graph.
    rewrite g =
        let expd = e { vertexId = newVertexId g }
        in g & removeEdge e fa
             & replaceVertex e scan
             & replaceVertex fa expd
             & overlay (path [scan, mapr, expd])

expandFilterAcc _ = Nothing

-- | Input graph for 'test_expandFilterAcc': Source -> Expand -> FilterAcc.
expandFilterAccPre :: StreamGraph
expandFilterAccPre = path
    [ StreamVertex 0 (Source 1)      []        "[Int]" "[Int]" 1
    , StreamVertex 1 Expand          []        "[Int]" "Int"   1
    , StreamVertex 2 (FilterAcc 0.5) [f, a, p] "Int"   "Int"   2
    ] where f = [| (\_ h -> (False, h)) |]
            a = [| (True, undefined) |]
            p = [| \new (b,old) -> b || old /= new |]

-- | Expected graph for 'test_expandFilterAcc':
-- Source -> Scan -> Map -> Expand, with the Expand carrying the fresh id 3.
expandFilterAccPost :: StreamGraph
expandFilterAccPost = path
    [ StreamVertex 0 (Source 1)      []        "[Int]" "[Int]" 1
    , StreamVertex 1 Scan            [sa, si]  "[Int]" "[Int]" 2
    , StreamVertex 2 Map             [f']      "[Int]" "[Int]" 0
    , StreamVertex 3 Expand          []        "[Int]" "Int"   1
    ] where f  = [| (\_ h -> (False, h)) |]
            a  = [| (True, undefined) |]
            p  = [| \new (b,old) -> b || old /= new |]
            f' = [| reverse.fst |]
            -- f, a and p are spliced into the scan parameters below
            sa = [| \(_,acc) a -> filterAcc $(f) acc $(p) a |]
            si = [| ([],$(a)) |]

-- | Applying 'expandFilterAcc' to 'expandFilterAccPre' must yield
-- 'expandFilterAccPost'.
test_expandFilterAcc :: IO ()
test_expandFilterAcc = assertEqual expandFilterAccPost
    $ applyRule expandFilterAcc expandFilterAccPre

-- utility/boilerplate -------------------------------------------------------

-- | @sumTimes a f b@ computes @a + f * b@.
sumTimes :: Double -> Double -> Double -> Double
sumTimes a f b = a + (f * b)

-- | 1/2 + 1 * 1/2 == 1
test_sumTimes1 :: IO ()
test_sumTimes1 = assertEqual 1 $ sumTimes (1/2) 1 (1/2)
-- | 1/4 + 1 * 1/4 == 1/2
test_sumTimes2 :: IO ()
test_sumTimes2 = assertEqual (1/2) $ sumTimes (1/4) 1 (1/4)

-- compare operators, ignoring filter selectivity: two Filters (or two
-- FilterAccs) always compare equal; every other pair falls back to (==)
cmpOps :: StreamOperator -> StreamOperator -> Bool
cmpOps (Filter _)    (Filter _)    = True
cmpOps (FilterAcc _) (FilterAcc _) = True
cmpOps x             y             = x == y

-- True when every vertex in the list carries the same operator, i.e. the
-- de-duplicated operator list has exactly one element (so the empty list
-- gives False).
-- this will only match filter operators with the same selectivity, because
-- operators are compared with (==)
identicalOperators :: [StreamVertex] -> Bool
identicalOperators = (== 1) . length . nub . map operator

-- | left-biased rule application via fold: the first function in the list is
-- applied to the graph first, and each later one to the previous result.
foldRules :: [Graph a -> Graph a] -> Graph a -> Graph a
foldRules rules g = foldl (&) g rules

-- generate a new vertexId which doesn't clash with any existing ones: one more
-- than the largest vertexId in the graph, or 0 for a graph with no vertices
-- (previously an empty graph raised an exception from `last`).
-- TODO this will still break the assumption that CompileIoT makes regarding
-- vertexId ordering in a path. We either need to renumber all subsequent nodes
-- or remove that requirement in CompileIoT
newVertexId :: StreamGraph -> Int
newVertexId g = case map vertexId (vertexList g) of
    []  -> 0
    ids -> succ (maximum ids)

-- | Run every HTF test collected from this module.
main :: IO ()
main = htfMain htf_thisModulesTests

-- streamFilter p >>> streamWindow w = streamWindow w >>> streamMap (filter p)
-- this is not a generally-applicable rule:
--  * if the WindowMaker makes decisions based on the Event values or their
--    sequencing, then this is altered by removing the pre-filter
--  * even if it doesn't, the windows might change
-- the filter's selectivity is lost
--
-- Graph-level effect: the Window vertex takes over the Filter's id, and a Map
-- vertex applying `filter p` takes the Window's old id, the Window's output
-- type (on both sides) and the Filter's service time.

filterWindow :: RewriteRule
filterWindow (Connect (Vertex f@(StreamVertex i (Filter _) (p:_) _ _ s))
                      (Vertex w@(StreamVertex j Window     _     _ t _))) =
    Just (replaceVertex f w' . replaceVertex w m)
  where
    m  = StreamVertex j Map [[| filter $(p) |]] t t s
    w' = w { vertexId = i }

filterWindow _ = Nothing

-- | Input graph for 'test_filterWindow': Source -> Filter -> Window -> Sink.
filterWindowPre :: StreamGraph
filterWindowPre = path
    [ StreamVertex 0 (Source 1)   []         "Int"   "Int"   1
    , StreamVertex 1 (Filter 0.5) [[|(>3)|]] "Int"   "Int"   2
    , StreamVertex 2 Window       []         "Int"   "[Int]" 3
    , StreamVertex 3 Sink         []         "[Int]" "[Int]" 4
    ]

-- | Expected graph for 'test_filterWindow': Source -> Window -> Map -> Sink.
filterWindowPost :: StreamGraph
filterWindowPost = path
    [ StreamVertex 0 (Source 1)   []                "Int"   "Int"   1
    , StreamVertex 1 Window       []                "Int"   "[Int]" 3
    , StreamVertex 2 Map          [[|filter (>3)|]] "[Int]" "[Int]" 2
    , StreamVertex 3 Sink         []                "[Int]" "[Int]" 4
    ]

-- | Applying 'filterWindow' to 'filterWindowPre' must yield 'filterWindowPost'.
test_filterWindow :: IO ()
test_filterWindow = assertEqual (applyRule filterWindow filterWindowPre) filterWindowPost

-- streamFilterAcc f a p >>> streamWindow w =
-- streamWindow w >>> streamScan (\ (_,acc) a -> filterAcc f acc p a) ([],a)
--                >>> streamMap (reverse.fst)

-- Same caveats as filterWindow
--
-- Graph-level effect: the Window vertex takes over the FilterAcc's id, a Scan
-- vertex takes the Window's old id and the FilterAcc's service time, and a Map
-- vertex (service time 0) gets a fresh id from 'newVertexId' and the position
-- the Window used to hold.

filterAccWindow :: RewriteRule
filterAccWindow (Connect (Vertex fa@(StreamVertex i (FilterAcc _) (f:a:p:_) _ _ s))
                         (Vertex  w@(StreamVertex j Window _ _ t _))) =
    Just rewrite
  where
    w' = w { vertexId = i }
    sc = StreamVertex j Scan
       [ [| \ (_, acc) a -> filterAcc $(f) acc $(p) a |]
       , [| ([], $(a)) |]
       ] t t s

    -- The Map's id depends on the whole graph, so it is computed per graph.
    rewrite g =
        let m = StreamVertex (newVertexId g) Map [[| reverse.fst |]] t t 0
        in g & replaceVertex fa w'
             & replaceVertex w  m
             -- drop the direct w' -> m edge so the Scan sits between them
             & removeEdge w' m
             & overlay (path [w', sc, m])

filterAccWindow _ = Nothing

-- | Input graph for 'test_filterAccWindow': Source -> FilterAcc -> Window.
filterAccWindowPre :: StreamGraph
filterAccWindowPre = path
    [ StreamVertex 0 (Source 1)      []        "Int" "Int"   1
    , StreamVertex 1 (FilterAcc 0.5) [f, a, p] "Int" "Int"   2
    , StreamVertex 2 Window          []        "Int" "[Int]" 1
    ] where f = [| (\_ h -> (False, h)) |]
            a = [| (True, undefined) |]
            p = [| \new (b,old) -> b || old /= new |]

-- | Expected graph for 'test_filterAccWindow':
-- Source -> Window -> Scan -> Map, with the Map carrying the fresh id 3.
filterAccWindowPost :: StreamGraph
filterAccWindowPost = path
    [ StreamVertex 0 (Source 1) []       "Int"   "Int"   1
    , StreamVertex 1 Window     []       "Int"   "[Int]" 1
    , StreamVertex 2 Scan       [sa, si] "[Int]" "[Int]" 2
    , StreamVertex 3 Map        [f']     "[Int]" "[Int]" 0
    ] where f = [| (\_ h -> (False, h)) |]
            a = [| (True, undefined) |]
            p = [| \new (b,old) -> b || old /= new |]
            f' = [| reverse.fst |]
            -- f, a and p are spliced into the scan parameters below
            sa = [| \(_,acc) a -> filterAcc $(f) acc $(p) a |]
            si = [| ([],$(a)) |]

-- | Applying 'filterAccWindow' to 'filterAccWindowPre' must yield
-- 'filterAccWindowPost'.
test_filterAccWindow :: IO ()
test_filterAccWindow = assertEqual filterAccWindowPost
    $ applyRule filterAccWindow filterAccWindowPre