{-# OPTIONS_GHC -F -pgmF htfpp #-}
{-# OPTIONS_HADDOCK prune #-}
{-# LANGUAGE TemplateHaskell #-}
{-|
Module      : Striot.Bandwidth
Description : StrIoT Bandwidth Cost Model calculations
Copyright   : © StrIoT maintainers, 2022
License     : Apache 2.0
Maintainer  : StrIoT maintainers
Stability   : experimental

Experimental routines for reasoning about bandwidth.

-}
module Striot.Bandwidth ( howBig
                        , knownEventSizes
                        , departRate
                        , whatBandwidth
                        , whatBandwidthWeighted
                        , connectedToSources
                        , overBandwidthLimit
                        ) where

import Striot.CompileIoT
import Striot.FunctionalIoTtypes
import Striot.FunctionalProcessing
import Striot.StreamGraph
import Striot.Jackson

import Algebra.Graph
import Data.Maybe (fromJust, mapMaybe, catMaybes)
import Data.Time (addUTCTime) -- UTCTime (..),addUTCTime,diffUTCTime,NominalDiffTime,picosecondsToDiffTime, Day (..))
import Data.Store (Store)
import Test.Framework
import Data.Function ((&))
import Data.List.Unicode

import qualified Data.Store.Streaming as SS
import qualified Data.ByteString as B

--import Data.Time

-- | Given an 'Event', how big is it in bytes for on-wire transfer?
--
-- The event is wrapped in a "Data.Store.Streaming" 'SS.Message' and
-- encoded; the result is the length of the encoded 'B.ByteString'.
howBig :: Store a => Event a -> Int
howBig = B.length . SS.encodeMessage . SS.Message

-- However, a StreamGraph does not contain Events: it only records their
-- types, as Strings.
-- From the point of view of a StreamGraph we do not know whether Events
-- carry timestamps, so assume they do. The figures below are calculated
-- using howBig (above).

-- Encoded size in bytes of a payload once it is wrapped in an Event.
--
-- The Event is given a fixed timestamp and a 'Just' payload, so the figure
-- includes the cost of a timestamp. The 'read' is applied to a constant,
-- well-formed string.
e :: Store a => a -> Int
e = howBig . Event (Just timestamp) . Just
  where
    timestamp = addUTCTime 0 (read "2013-01-01 00:00:00 +0000")

-- These are types copied from examples/wearable. Longer term we should
-- accept user-provided event sizes.
-- In this module they are only used to size the "PebbleMode60" entry of
-- knownEventSizes.
type AccelVal = Int
type Accelerometer = (AccelVal,AccelVal,AccelVal) -- X,Y,Z accelerometer values
type Vibe = Int
-- An accelerometer reading paired with a Vibe value.
type PebbleMode60 = (Accelerometer,Vibe)

-- | Encoded event sizes in bytes, keyed by the type name as it appears in a
-- 'StreamVertex' type field.
--
-- Limited to boxed types, and non-list types. Very limited! The
-- @String1@..@String3@ entries are the sizes of strings of length 1 to 3.
knownEventSizes :: [(String, Int)]
knownEventSizes =
  [ ("Int",           e (2 :: Int))
  , ("Double",        e (2 :: Double))
  , ("Char",          e 'c')
  , ("String1",       e "c")
  , ("String2",       e "cc")
  , ("String3",       e "ccc")
  , ("(Int,Int,Int)", e ((1, 2, 3) :: (Int, Int, Int)))
  , ("PebbleMode60",  e (((0, 0, 0), 0) :: PebbleMode60))
  ]

-- XXX copy in Orchestration too. put in Util
-- Pair a value with the result of applying a function to it; the result of
-- the function goes first.
toFst :: (a -> b) -> a -> (b, a)
toFst f a = (f a, a)

------------------------------------------------------------------------------
-- test data

-- Vertices for the test graphs: two sources (ids 1 and 3, rates 2 and 1),
-- each followed by a Map, then a Merge and a Sink.
v1 :: StreamVertex
v1 = StreamVertex 1 (Source 2) []                 "Int"   "Int"   0

v2 :: StreamVertex
v2 = StreamVertex 2 Map        [[| id |]]         "Int"   "Int"   1

v3 :: StreamVertex
v3 = StreamVertex 3 (Source 1) []                 "Int"   "Int"   2

v4 :: StreamVertex
v4 = StreamVertex 4 Map        [[| id |]]         "Int"   "Int"   3

v5 :: StreamVertex
v5 = StreamVertex 5 Merge      []                 "[Int]" "Int"   4

v6 :: StreamVertex
v6 = StreamVertex 6 Sink       [[| mapM_ print|]] "Int"   "IO ()" 5

-- Two source branches (v3 -> v4 and v1 -> v2) merged at v5, then sunk at v6.
graph :: StreamGraph
graph = overlay (path [v3, v4, v5]) (path [v1, v2, v5, v6])

------------------------------------------------------------------------------
-- arrival/departure time

-- Ids of the vertices that have an edge leading into the vertex with the
-- given id, in 'edgeList' order.
parentsOf :: StreamGraph -> Int -> [Int]
parentsOf g i =
  [ vertexId parent
  | (parent, child) <- edgeList g
  , vertexId child == i
  ]

-- | Rate at which events depart the vertex with id @i@ in graph @g@.
--
-- * 'Source': the rate carried by the operator.
-- * 'Merge': the sum of the departure rates of all parents.
-- * 'Join': the lesser of the departure rates of the first and last parent.
-- * 'Filter' / 'FilterAcc': the operator's factor times the parent's rate.
-- * 'Window': for a first parameter of the form @chopTime n@, @1 / n@;
--   otherwise the parent's rate.
-- * anything else: the parent's rate.
--
-- Calls 'error' if @i@ is not a vertex of @g@, or if a parent is needed and
-- the vertex has none.
departRate :: StreamGraph -> Int -> Double
departRate g i =
  case operator v of
    Source d    -> d
    Merge       -> sum (map (departRate g) ps)
    Join        -> min (departRate g p) (departRate g p2)
    Filter r    -> r * departRate g p
    FilterAcc r -> r * departRate g p
    Window      -> case windowParams of
                     ("chopTime" : n : _) -> 1 / read n
                     _                    -> departRate g p
    _           -> departRate g p
  where
    v = case lookup i (map (toFst vertexId) (vertexList g)) of
      Just found -> found
      Nothing    -> error ("departRate: no vertex with id " ++ show i)

    ps = parentsOf g i

    -- First and last parent; these coincide for single-parent vertices.
    p = case ps of
      (x : _) -> x
      []      -> error ("departRate: vertex " ++ show i ++ " has no parent")
    p2 = last ps

    -- The first parameter of the vertex, rendered and split into words.
    windowParams = case parameters v of
      (q : _) -> words (showParam q)
      []      -> []

-- Extra vertices and a graph for the Join and Filter tests.
v7 :: StreamVertex
v7 = StreamVertex 7 (Filter 0.5) [] "Int" "Int" 7

v8 :: StreamVertex
v8 = StreamVertex 8 Join [] "Int" "(Int, Int)" 8

-- Both source branches joined at v8, then filtered (v7) and sunk (v6).
graph2 :: StreamGraph
graph2 = overlay (path [v3, v4, v8]) (path [v1, v2, v8, v7, v6])

-- Sink 6 inherits the Merge's rate: 2 (via v1, v2) + 1 (via v3, v4).
test_departRate_merge = assertEqual 3.0 $ departRate graph 6
-- Join 8 takes the lesser of its parents' rates (1 and 2).
test_departRate_join = assertEqual 1.0 $ departRate graph2 8
-- Filter 7 scales the Join's rate of 1 by 0.5.
test_departRate_filter = assertEqual 0.5 $ departRate graph2 7

-- A time-based Window vertex and a linear graph containing it.
v9 :: StreamVertex
v9 = StreamVertex 9 Window [[| chopTime 120 |]] "a" "[a]" 9

graph3 :: StreamGraph
graph3 = path [v1, v2, v9, v7, v6]

-- A chopTime 120 window departs at 1/120, regardless of its parent's rate.
test_departRate_window = assertEqual (1 / 120) $ departRate graph3 9

------------------------------------------------------------------------------

-- | What is the bandwidth out of a 'StreamVertex'?
--
-- For a 'Window' vertex this is the bandwidth out of its first parent.
-- Otherwise it is the vertex's 'departRate' multiplied by the size in bytes
-- of its output type, looked up in 'knownEventSizes'.
--
-- Returns 'Nothing' when the output type has no known size, when no vertex
-- has id @i@, or when a 'Window' vertex has no parent.
whatBandwidth :: StreamGraph -> Int -> Maybe Double
whatBandwidth g i =
  case filter ((== i) . vertexId) (vertexList g) of
    []      -> Nothing
    (v : _)
      | operator v == Window ->
          case parentsOf g i of
            []           -> Nothing
            (parent : _) -> whatBandwidth g parent
      | otherwise ->
          let outrate = departRate g i
          in fmap ((* outrate) . fromIntegral)
                  (lookup (outtype v) knownEventSizes)


-- Per-event overhead added by 'whatBandwidthWeighted', multiplied by the
-- departure rate.
weighting :: Double
weighting = 2.0

-- | 'whatBandwidth' with a rate-based overhead weighting applied: the
-- departure rate of the vertex times 'weighting' is added to the estimate.
-- 'Nothing' is passed through unchanged.
whatBandwidthWeighted :: StreamGraph -> Int -> Maybe Double
whatBandwidthWeighted g i =
  fmap (+ (departRate g i * weighting)) (whatBandwidth g i)

-- XXX: write a "departSize"? We could estimate window sizes for fixed-length
-- or time-bound windows, for example. Joins approximately double, etc.

-- | Does this StreamGraph breach a bandwidth limit?
--
-- Only edges that leave a partition containing a source node are checked:
-- the edge must start at a node in such a partition and end at a node
-- outside all such partitions. The limit is breached if, for any of these
-- edges, the weighted bandwidth out of the originating vertex
-- ('whatBandwidthWeighted') exceeds @bandwidthLimit@. Vertices whose
-- bandwidth is unknown are ignored.
overBandwidthLimit :: StreamGraph -> PartitionMap -> Double -> Bool
overBandwidthLimit sg pm bandwidthLimit = let
  sourceIds = (map vertexId . filter (isSource . operator) . vertexList) sg
  connected = connectedToSources sourceIds pm

  in edgeList sg
    & filter ((`elem` connected) . vertexId . fst)    -- edges from source Partition
    -- NOTE(review): restored as a non-membership test, going by the
    -- "not terminating there" comment — confirm against the upstream file.
    & filter ((`notElem` connected) . vertexId . snd) -- not terminating there
    & map fst -- source vertex
    & mapMaybe (fmap (> bandwidthLimit) . whatBandwidthWeighted sg . vertexId)
    & or

-- XXX: tests for overBandwidthLimit

-- With each source branch in its own partition, the links into the Merge
-- partition exceed a limit of 29.
test_overBandwidthLimit = assertBool $ overBandwidthLimit graph [[1,2],[3,4],[5,6]] 29

-- | Provide a flattened list of node IDs from a PartitionMap which are
-- connected to a source node within a partition.
--
-- A partition is kept, in full, when at least one of its node IDs is in
-- @sources@; the kept partitions are concatenated in their original order.
connectedToSources :: [Partition] -> PartitionMap -> [Partition]
connectedToSources sources =
  concat . filter (any (`elem` sources))

-- Partitions are kept whole when they contain any of the given sources.
test_connectedToSources  = assertEqual [1,2,3,4] $ connectedToSources [1,3] [[1,2],[3,4],[5,6,7]]
test_connectedToSources2 = assertEqual [1,2]     $ connectedToSources [1]   [[1,2],[3,4],[5,6,7]]
test_connectedToSources3 = assertEqual [5,6,7]   $ connectedToSources [7]   [[1,2],[3,4],[5,6,7]]
-- A source id that appears in no partition selects nothing.
test_connectedToSources4 = assertEqual []        $ connectedToSources [0]   [[1,2],[3,4],[5,6,7]]