{-# OPTIONS_GHC -F -pgmF htfpp #-}
{-# OPTIONS_HADDOCK prune #-}
{-# LANGUAGE TemplateHaskell, FlexibleInstances #-}
{-|
Module      : Striot.StreamGraph
Description : StrIoT StreamGraph
Copyright   : © Jonathan Dowland, 2021
License     : Apache 2.0
Maintainer  : jon@dow.land
Stability   : experimental

StrIoT `StreamGraph` type, used for representing a stream-processing program,
such that it can be re-written, partitioned and translated into code in terms
of `Striot.FunctionalIoTTypes` for execution on distributed nodes.

 -}

module Striot.StreamGraph ( StreamGraph(..)
                          , StreamOperator(..)
                          , StreamVertex(..)
                          , PartitionedGraph(..)
                          , deQ
                          , isSource
                          , showParam

                          , simpleStream

                          -- QuickCheck generators
                          , streamgraph
                          , streamgraph'

                          , htf_thisModulesTests
                          ) where

import Algebra.Graph
import Data.List (intercalate)
import Language.Haskell.TH
import System.IO.Unsafe (unsafePerformIO)
import Test.Framework -- Arbitrary, etc.

import Data.List.Split (splitOn)
-- SYB generic programming
import Data.Data
import Data.Generics.Schemes (everywhere)
import Data.Generics.Aliases (mkT)

import Data.Tree

-- |The `StreamOperator` and associated information required to encode a stream-processing
-- program into a Graph. Each distinct `StreamVertex` within a `StreamGraph` should have a
-- unique `vertexId` to ensure that they can be distinguished. For simple path-style graphs,
-- the IDs should be in ascending order.
data StreamVertex = StreamVertex
    { vertexId    :: Int            -- ^ identifier; should be unique within a graph
    , operator    :: StreamOperator -- ^ the stream operation performed at this vertex
    , parameters  :: [ExpQ]         -- ^ Template Haskell expressions supplied to the operator
    , intype      :: String         -- ^ input type, held as text (presumably Haskell type syntax — TODO confirm)
    , outtype     :: String         -- ^ output type, held as text (presumably Haskell type syntax — TODO confirm)
    , serviceTime :: Double         -- ^ service time of the vertex (units are not stated in this module)
    }

-- |Field-by-field equality. The `parameters` are Template Haskell `ExpQ`
-- values, so they are compared through their `showParam` rendering rather
-- than directly.
--
-- NOTE(review): the `Ord StreamVertex` instance in this module compares
-- `vertexId` only, so two vertices can compare as `EQ` while being unequal
-- under this instance.
instance Eq StreamVertex where
    a == b = and [ vertexId a    == vertexId b
                 , operator a    == operator b
                 , intype a      == intype b
                 , outtype a     == outtype b
                 , map showParam (parameters a) == map showParam (parameters b)
                 , serviceTime a == serviceTime b
                 ]

-- |Renders a vertex as the constructor name followed by its six fields,
-- separated by single spaces. The `parameters` are shown as a list of their
-- `showParam` renderings, since `ExpQ` values cannot be shown directly.
instance Show StreamVertex where
    show (StreamVertex i o ps inT outT s) =
        "StreamVertex " ++ intercalate " "
            [ show i
            , show o
            , show (map showParam ps)
            , show inT
            , show outT
            , show s
            ]

-- |Extract the `Exp` from a Template Haskell `Q Exp` outside of the `Q` monad.
--
-- The quotation is run in `IO` via `runQ` and the result is pulled out with
-- `unsafePerformIO`.
-- NOTE(review): this is presumably only sound for quotations whose evaluation
-- has no observable side effects — confirm against callers.
deQ :: Q Exp -> Exp
deQ q = unsafePerformIO (runQ q)

-- |Render a parameter expression as text: the expression is extracted with
-- `deQ`, its names are unqualified with `unQualifyNames`, and the result is
-- pretty-printed with `pprint`.
showParam :: Q Exp -> String
showParam q = pprint (unQualifyNames (deQ q))

-- | Walk over an Exp expression and replace all embedded Names with unqualified versions.
-- E.g. GHC.List.last => last. Special-handling for composition (.).
unQualifyNames :: Exp -> Exp
unQualifyNames = everywhere (mkT unqualify)
  where
    -- Keep only the text after the final "." of the pretty-printed name.
    -- The composition operator is handled separately, presumably because its
    -- own name is a dot and so cannot be recovered by splitting on ".".
    unqualify :: Name -> Name
    unqualify n
        | n == '(.) = mkName "."
        | otherwise = mkName (last (splitOn "." (pprint n)))

-- |A graph representation of a stream-processing program: a `Graph` whose
-- vertices are `StreamVertex` values.
type StreamGraph = Graph StreamVertex

-- |A collection of partitioned StreamGraphs: a list of `StreamGraph`s paired
-- with one further `StreamGraph`.
-- NOTE(review): presumably the list holds the per-partition sub-graphs and the
-- second component the connections between partitions — confirm against the
-- partitioning code.
type PartitionedGraph = ([StreamGraph], StreamGraph)

-- |An enumeration of the possible stream operators within a stream-processing program,
-- as well as `Source` and `Sink` to represent the ingress and egress points of programs.
data StreamOperator = Map
                    | Filter Double    -- ^ selectivity
                    | Expand
                    | Window
                    | Merge
                    | Join
                    | Scan
                    | FilterAcc Double -- ^ selectivity
                    | Source Double    -- ^ arrival rate
                    | Sink
                    deriving (Show,Ord,Eq)

-- |Vertices are ordered by `vertexId` alone; no other field takes part in the
-- comparison.
instance Ord StreamVertex where
    compare x y = compare (vertexId x) (vertexId y)

-- |`True` exactly when the operator is a `Source`, whatever its arrival rate.
isSource :: StreamOperator -> Bool
isSource op = case op of
    Source _ -> True
    _        -> False

-- |Convenience function for specifying a simple path-style of stream
-- processing program, with no merge or join operations. The list of tuples are
-- converted into a series of connected Stream Vertices in a Graph. The tuple
-- arguments are the relevant `StreamOperator` for the node; the parameters; the
-- *output* type and the service time. The other parameters to `StreamVertex`
-- are inferred from the neighbouring tuples. Unique and ascending `vertexId`
-- values are assigned, starting from 1. An empty list gives the empty graph.
simpleStream :: [(StreamOperator, [ExpQ], String, Double)] -> Graph StreamVertex
simpleStream tupes = path (zipWith3 mkVertex [1..] inTypes tupes)
  where
    -- The first vertex has no upstream neighbour and is given the fixed input
    -- type "IO ()"; each later vertex takes its predecessor's output type.
    -- This list is one element longer than `tupes`; zipWith3 stops at the
    -- shortest argument, so the surplus final element is never used.
    inTypes = "IO ()" : [ outT | (_, _, outT, _) <- tupes ]

    mkVertex i inT (op, params, outT, sTime) =
        StreamVertex i op params inT outT sTime


------------------------------------------------------------------------------
-- QuickCheck experiment

-- never generates sources or sinks
--
-- A single strictly positive `Double` is drawn and used as the argument of
-- both `Filter` and `FilterAcc`; the operator is then picked from the fixed
-- list below.
instance Arbitrary StreamOperator where
    arbitrary = do
        d <- getPositive <$> arbitrary
        elements [Map, Filter d, Expand, Window, Merge, Join, Scan, FilterAcc d]

-- |Generates a vertex with a positive `vertexId`, a positive `serviceTime`
-- and an arbitrary (non-source, non-sink) operator. The parameter list is
-- always empty and both type strings are the placeholder "?".
instance Arbitrary StreamVertex where
    arbitrary = do
        -- local names deliberately differ from the record selectors
        vid   <- getPositive <$> arbitrary
        sTime <- getPositive <$> arbitrary
        op    <- arbitrary
        return $ StreamVertex vid op [] "?" "?" sTime

-- |Generator for a `StreamGraph`, using QuickCheck's size parameter as the
-- argument to `streamgraph'`.
streamgraph :: Gen StreamGraph
streamgraph = sized streamgraph'

-- |Generator for a `StreamGraph` of the given size. A size below 1 yields the
-- empty graph (negative sizes previously matched no equation). Otherwise a
-- tree is generated with `treegraph'`, converted to a graph with `tree`, and
-- its edges are reversed with `transpose`.
streamgraph' :: Int -> Gen StreamGraph
streamgraph' n
    | n < 1     = return empty
    | otherwise = transpose . tree <$> treegraph' n

-- requires FlexibleInstances
--
-- `StreamGraph` is a type synonym for `Graph StreamVertex`; generation is
-- delegated to `streamgraph`.
instance Arbitrary StreamGraph where
    arbitrary = streamgraph

------------------------------------------------------------------------------
-- Tree StreamVertex: private type for convenience of algorithms that fit Tree
-- better than Graph.

-- |Generator for a `Tree StreamVertex`, using QuickCheck's size parameter as
-- the argument to `treegraph'`.
treegraph :: Gen (Tree StreamVertex)
treegraph = sized treegraph'

-- |Generator for a `Tree StreamVertex` of the given size.
--
-- * A size below 1 is an error, since a `Tree` cannot be empty.
-- * Size 1 gives a single childless node: an arbitrary vertex forced to be a
--   `Sink` with `vertexId` 0.
-- * Larger sizes start from that size-1 tree and add the remaining vertices
--   with `extendTree`.
treegraph' :: Int -> Gen (Tree StreamVertex)
treegraph' n
    | n < 1     = error "can't represent an empty tree"
    | n == 1    = sinkLeaf <$> arbitrary
    | otherwise = treegraph' 1 >>= extendTree (n-1)
  where
    sinkLeaf v = Node (v { operator = Sink, vertexId = 0 }) []

-- |Replace the `operator` of a vertex, leaving every other field unchanged.
chOp :: StreamOperator -> StreamVertex -> StreamVertex
chOp newOp vertex = vertex { operator = newOp }

-- |Replace the `vertexId` of a vertex, leaving every other field unchanged.
chId :: Int -> StreamVertex -> StreamVertex
chId newId vertex = vertex { vertexId = newId }

-- |Extend a childless tree node with the given number of further vertices.
--
-- * 0: the tree is returned unchanged.
-- * 1: a single `Source` child with `vertexId` 1 is attached.
-- * more than 1: a `Join` or `Merge` node is handed to `extendJoin` /
--   `extendMerge`; any other node gets one child with a randomly chosen
--   operator and `vertexId` equal to the count, and that child is then
--   extended recursively with one vertex fewer.
--
-- No equation covers a node that already has children (for counts other than
-- 0) or a negative count; such calls fail with a pattern-match error.
extendTree :: Int -> Tree StreamVertex -> Gen (Tree StreamVertex)
extendTree 0 t = return t

-- only one more Node to create; it must be a source
extendTree 1 (Node v []) = do
    -- NOTE(review): the arrival rate is an unconstrained Double here and may
    -- be zero or negative, unlike the positive values drawn by
    -- `Arbitrary StreamOperator` — confirm this is intended.
    rate <- arbitrary
    src  <- chId 1 . chOp (Source rate) <$> arbitrary
    return $ Node v [Node src []]

extendTree n t@(Node v []) | n > 1 =
    case operator v of
        Join  -> extendJoin n t
        Merge -> extendMerge n t
        _     -> do
            -- NOTE(review): unconstrained Double, see the note above.
            d <- arbitrary
            let ops = [Map, Filter d, Expand, Window, Merge, Scan, FilterAcc d]
            -- Join needs at least 3 nodes (two Sources) and cannot precede
            -- Expand (type (a,b) ≠ [a])
            op   <- if n < 3 || operator v == Expand
                        then elements ops
                        else elements (Join:ops)
            new  <- chOp op . chId n <$> arbitrary
            rest <- extendTree (n-1) (Node new [])
            return $ Node v [rest]

-- |Add the given offset to the `vertexId` of a vertex, leaving every other
-- field unchanged.
incrId :: Int -> StreamVertex -> StreamVertex
incrId offset vertex = vertex { vertexId = vertexId vertex + offset }

-- in the specialised extend* functions below, we can't generate
-- the branches from the real node of consideration or we'll loop
--
-- A placeholder childless `Sink` node (id 0, no parameters, empty type
-- strings, service time 0) used as the root when generating those branches.
fakeroot :: Tree StreamVertex
fakeroot = Node (StreamVertex 0 Sink [] "" "" 0) []

-- |Attach two generated branches to a childless `Join` node, sharing the
-- given number of vertices between them. Only a childless node whose operator
-- is `Join` is matched; anything else is a pattern-match failure.
extendJoin :: Int -> Tree (StreamVertex) -> Gen (Tree StreamVertex)
extendJoin n (Node v@(StreamVertex _ Join _ _ _ _) []) = extendTwoBranches n v

-- |Attach two generated branches to a childless `Merge` node, sharing the
-- given number of vertices between them. Only a childless node whose operator
-- is `Merge` is matched; anything else is a pattern-match failure.
extendMerge :: Int -> Tree (StreamVertex) -> Gen (Tree StreamVertex)
extendMerge n (Node v@(StreamVertex _ Merge _ _ _ _) []) = extendTwoBranches n v

-- |Shared body of `extendJoin` and `extendMerge`: build a node labelled with
-- the given vertex whose children come from two independently generated
-- branches.
extendTwoBranches :: Int -> StreamVertex -> Gen (Tree StreamVertex)
extendTwoBranches n v = do
    -- XXX extend to >2 incoming streams?
    let n1 = n `div` 2
        n2 = n - n1

    -- Each branch is grown from `fakeroot` rather than from the real node;
    -- only the children of the generated tree are kept.
    -- XXX determine type from one child, force the other
    Node _ firstBranch  <- extendTree n1 fakeroot
    Node _ secondBranch <- extendTree n2 fakeroot

    -- The second branch's ids are shifted by the size of the first.
    return $ Node v (firstBranch ++ map (fmap (incrId n1)) secondBranch)

-- for debugging in GHCi
--
-- Runs the generator once and prints the resulting tree, one `show`n vertex
-- per node, using `drawTree`.
draw :: Gen (Tree StreamVertex) -> IO ()
draw g = do
    t <- generate g
    putStrLn (drawTree (fmap show t))

-- |A tree generated with size 3 never contains a `Join` vertex.
prop_noShortJoin :: Property
prop_noShortJoin = forAll (treegraph' 3) $
    notElem Join . map operator . flatten

-- |When a single `Expand` node is extended by three vertices, its first child
-- is never a `Join`. A generated tree with no children at all fails the
-- property instead of raising an exception from `head`.
prop_noJoinExpand :: Property
prop_noJoinExpand = forAll expandRooted $ \t ->
    case subForest t of
        (child:_) -> operator (rootLabel child) /= Join
        []        -> False
  where
    -- a size-1 tree whose root operator is forced to Expand, then extended
    expandRooted = fmap (chOp Expand) <$> treegraph' 1 >>= extendTree 3