{-# OPTIONS_GHC -F -pgmF htfpp #-}
{-# OPTIONS_HADDOCK prune #-}
module Striot.FunctionalProcessing (
streamFilter
, streamFilterAcc
, streamMap
, streamScan
, streamWindow
, streamExpand
, streamMerge
, streamJoin
, WindowMaker
, WindowAggregator
, sliding
, slidingTime
, chop
, chopTime
, streamWindowAggregate
, streamJoinE
, streamJoinW
, complete
, EventFilter
, EventMap
, JoinFilter
, JoinMap
, filterAcc
, htf_thisModulesTests) where
import Striot.FunctionalIoTtypes
import Data.Time (UTCTime (..),addUTCTime,diffUTCTime,NominalDiffTime,picosecondsToDiffTime, Day (..))
import Test.Framework
-- | Predicate applied to the payload of an event.
type EventFilter alpha = alpha -> Bool

-- | Keep the events whose payload satisfies @ff@.
--
-- Events that carry no payload (@Nothing@) are always kept, so only
-- payload-bearing events can ever be dropped.
streamFilter :: EventFilter alpha -> Stream alpha -> Stream alpha
streamFilter ff = filter keep
  where
    -- A missing payload passes unconditionally; otherwise ask the predicate.
    keep (Event _ payload) = maybe True ff payload
-- | Function applied to the payload of an event.
type EventMap alpha beta = alpha -> beta

-- | Apply @fm@ to the payload of every event.
--
-- Timestamps are copied unchanged, and an event without a payload is passed
-- on as an event without a payload.
streamMap :: EventMap alpha beta -> Stream alpha -> Stream beta
streamMap fm = map (\(Event t payload) -> Event t (fmap fm payload))
-- | Splits a stream into a list of windows, each window being a stream.
type WindowMaker alpha = Stream alpha -> [Stream alpha]

-- | Reduces the payloads collected from one window to a single value.
type WindowAggregator alpha beta = [alpha] -> beta

-- | Cut a stream into windows with @fwm@ and emit one event per window.
--
-- The payload of each emitted event is the list of payloads found in the
-- window, and its timestamp is that of the window's first event.  An empty
-- window yields an event with no timestamp and an empty payload list.
streamWindow :: WindowMaker alpha -> Stream alpha -> Stream [alpha]
streamWindow fwm s = map windowEvent (fwm s)
  where
    windowEvent :: Stream alpha -> Event [alpha]
    windowEvent []                = Event Nothing (Just [])
    windowEvent w@(Event t _ : _) = Event t (Just (getVals w))

    -- The comprehension pattern simply skips an event without a payload, so
    -- this stays total whatever 'dataEvent' lets through.
    -- NOTE(review): 'dataEvent' is defined outside this module; presumably it
    -- selects payload-bearing events - confirm.
    getVals :: Stream alpha -> [alpha]
    getVals w = [val | Event _ (Just val) <- filter dataEvent w]
-- | Window a stream with @fwm@, then reduce the payload list of every
-- window event with @fwa@.
streamWindowAggregate :: WindowMaker alpha -> WindowAggregator alpha beta -> Stream alpha -> Stream beta
streamWindowAggregate fwm fwa = streamMap fwa . streamWindow fwm
-- | Count-based sliding window.
--
-- After discarding the events rejected by 'dataEvent', one window is started
-- at every remaining event and holds at most @wLength@ events, so the
-- windows near the end of a finite stream are shorter.
sliding :: Int -> WindowMaker alpha
sliding wLength = windowsFrom . filter dataEvent
  where
    windowsFrom :: Stream alpha -> [Stream alpha]
    windowsFrom []           = []
    windowsFrom evs@(_ : tl) = take wLength evs : windowsFrom tl
-- | Time-based sliding window; @tLength@ is converted with 'milliToTimeDiff'.
--
-- After discarding the events rejected by 'timedEvent', one window is started
-- at every remaining event.  A window that starts at time @t@ holds the
-- events 'takeTime' returns for the end time @t + tLength@.
slidingTime :: Int -> WindowMaker alpha
slidingTime tLength = windowsFrom . filter timedEvent
  where
    tLen = milliToTimeDiff tLength

    windowsFrom :: Stream alpha -> [Stream alpha]
    windowsFrom []                           = []
    windowsFrom evs@(Event (Just t) _ : rest) =
        takeTime (addUTCTime tLen t) evs : windowsFrom rest
    -- An event without a timestamp cannot start a window, so skip it.
    -- NOTE(review): presumably unreachable, since 'timedEvent' (defined
    -- outside this module) should already have removed such events - confirm.
    windowsFrom (_ : rest)                   = windowsFrom rest
-- | Longest prefix of the stream in which every timestamped event occurs
-- strictly before @endTime@.
--
-- An event without a timestamp does not end the prefix; it stays in it.
takeTime :: UTCTime -> Stream alpha -> Stream alpha
takeTime endTime = takeWhile inWindow
  where
    inWindow (Event stamp _) = maybe True (< endTime) stamp
-- | Convert a number of milliseconds to a 'NominalDiffTime'.
--
-- The division is done on 'NominalDiffTime' itself, so no intermediate
-- 'Int' product is formed that could overflow for large arguments (or for
-- small ones on platforms with a narrow 'Int').
milliToTimeDiff :: Int -> NominalDiffTime
milliToTimeDiff x = fromIntegral x / 1000
-- | Count-based tumbling window.
--
-- After discarding the events rejected by 'dataEvent', the stream is cut
-- into consecutive, non-overlapping windows of @wLength@ events; the last
-- window may be shorter.
--
-- A non-positive @wLength@ yields no windows at all.  Splitting off zero
-- events never consumes any input, so without this guard the result would be
-- an endless run of empty windows.
chop :: Int -> WindowMaker alpha
chop wLength s
    | wLength <= 0 = []
    | otherwise    = chop' (filter dataEvent s)
  where
    chop' :: [a] -> [[a]]
    chop' [] = []
    chop' xs = window : chop' remainder
      where
        (window, remainder) = splitAt wLength xs
-- | Time-based tumbling window; @tLength@ is converted with 'milliToTimeDiff'.
--
-- Events rejected by 'timedEvent' are discarded first.  Windows are
-- consecutive intervals of length @tLength@, the first one starting at the
-- timestamp of the first remaining event.  An interval in which no event
-- falls still produces an (empty) window.
--
-- The stream is filtered /before/ its first timestamp is inspected, so a
-- stream whose first event has no timestamp is handled like any other.
-- A non-positive @tLength@ yields no windows, because the window boundary
-- would never move forward.
chopTime :: Int -> WindowMaker alpha
chopTime tLength s
    | tLength <= 0 = []
    | otherwise    =
        case filter timedEvent s of
            timed@(Event (Just t) _ : _) -> chopTime' (milliToTimeDiff tLength) t timed
            -- Nothing left after filtering (or no usable start time).
            _                            -> []
  where
    chopTime' :: NominalDiffTime -> UTCTime -> WindowMaker alpha
    chopTime' _    _      []  = []
    chopTime' tLen tStart evs =
        let endTime           = addUTCTime tLen tStart
            (fstBuffer, rest) = timeTake endTime evs
        in  fstBuffer : chopTime' tLen endTime rest
-- | Split a stream at the first timestamped event that does not occur
-- strictly before @endTime@.
--
-- The first component is the prefix before that event, the second is the
-- remainder.  An event without a timestamp stays in the prefix, so the split
-- never stalls on it.
timeTake :: UTCTime -> Stream alpha -> (Stream alpha, Stream alpha)
timeTake endTime = span inWindow
  where
    inWindow (Event stamp _) = maybe True (< endTime) stamp
-- | Window maker that treats the whole stream as one single window.
complete :: WindowMaker alpha
complete = (: [])
-- | Merge a list of streams into one.
--
-- The streams are combined pairwise from the right.  When the heads of both
-- streams carry a timestamp, the earlier one is emitted first (on a tie the
-- head of the second stream wins).  Otherwise the head of the first stream
-- is emitted as it is.  After emitting from the first stream the two
-- arguments swap places, so neither side is served continuously when
-- timestamps are missing.
streamMerge :: [Stream alpha] -> Stream alpha
streamMerge = foldr merge' []
  where
    merge' :: Stream alpha -> Stream alpha -> Stream alpha
    merge' xs [] = xs
    merge' [] ys = ys
    merge' s1@(e1@(Event (Just t1) _) : xs) s2@(e2@(Event (Just t2) _) : ys)
        | t1 < t2   = e1 : merge' s2 xs
        | otherwise = e2 : merge' ys s1
    -- At least one head has no timestamp: emit the left head and swap.
    merge' (e1 : xs) s2 = e1 : merge' s2 xs
-- | Pair up the payloads of two streams, in order.
--
-- Events without a payload are skipped on either side.  Each emitted event
-- carries the timestamp of the left-hand event; the timestamp of the
-- right-hand event is dropped.  The result ends as soon as either stream
-- ends.
streamJoin :: Stream alpha -> Stream beta -> Stream (alpha, beta)
streamJoin [] _  = []
streamJoin _  [] = []
streamJoin s1@(Event t1 m1 : r1) s2@(Event _ m2 : r2) =
    case (m1, m2) of
        (Just v1, Just v2) -> Event t1 (Just (v1, v2)) : streamJoin r1 r2
        -- Skip whichever side has no payload and keep the other side waiting.
        (Nothing, Just _ ) -> streamJoin r1 s2
        (Just _ , Nothing) -> streamJoin s1 r2
        (Nothing, Nothing) -> streamJoin r1 r2
-- | Decides whether a pair of payloads takes part in a join.
type JoinFilter alpha beta = alpha -> beta -> Bool

-- | Combines a pair of payloads into the joined value.
type JoinMap alpha beta gamma = alpha -> beta -> gamma

-- | Windowed element-wise join.
--
-- Both streams are windowed (@fwm1@ for the first, @fwm2@ for the second) and
-- the window events are paired with 'streamJoin'.  For every pair of windows,
-- each combination of a left and a right payload accepted by @fwj@ is turned
-- into a result with @fwm@.  The results are emitted as individual events
-- through 'streamExpand'.
streamJoinE :: WindowMaker alpha
            -> WindowMaker beta
            -> JoinFilter alpha beta
            -> JoinMap alpha beta gamma
            -> Stream alpha
            -> Stream beta
            -> Stream gamma
streamJoinE fwm1 fwm2 fwj fwm s1 s2 =
    streamExpand
        $ streamMap joinWindows
        $ streamJoin (streamWindow fwm1 s1) (streamWindow fwm2 s2)
  where
    -- Left payloads vary slowest, right payloads fastest.
    joinWindows (w1, w2) = [fwm a b | a <- w1, b <- w2, fwj a b]
-- | Windowed join on whole windows.
--
-- Both streams are windowed (@fwm1@ for the first, @fwm2@ for the second) and
-- the window events are paired with 'streamJoin'.  @fwj@ is then applied to
-- the two payload lists of each pair.
streamJoinW :: WindowMaker alpha
            -> WindowMaker beta
            -> ([alpha] -> [beta] -> gamma)
            -> Stream alpha
            -> Stream beta
            -> Stream gamma
streamJoinW fwm1 fwm2 fwj s1 s2 =
    streamMap (uncurry fwj)
        $ streamJoin (streamWindow fwm1 s1) (streamWindow fwm2 s2)
-- | Filter with an accumulator.
--
-- For every payload-bearing event the predicate @ff@ is given the payload
-- and the accumulator as it was /before/ this event.  The accumulator is
-- then advanced with @accfn@ whether or not the event was kept.  Events
-- without a payload are always kept and leave the accumulator unchanged.
streamFilterAcc :: (beta -> alpha -> beta) -> beta -> (alpha -> beta -> Bool) -> Stream alpha -> Stream alpha
streamFilterAcc accfn acc ff = go acc
  where
    go _ [] = []
    go cur (e@(Event _ payload) : r) =
        case payload of
            Nothing -> e : go cur r
            Just v
                | ff v cur  -> e : go next r
                | otherwise -> go next r
              where
                next = accfn cur v
-- | Running fold over the payloads of a stream.
--
-- Every payload-bearing event is replaced by an event holding the updated
-- accumulator @mf acc v@.  Events without a payload are passed on without
-- one and do not change the accumulator.  Timestamps are kept, and the
-- output has exactly one event per input event.
streamScan :: (beta -> alpha -> beta) -> beta -> Stream alpha -> Stream beta
streamScan _  _   []                    = []
streamScan mf acc (Event t payload : r) =
    case payload of
        Just v  -> let newacc = mf acc v
                   in  Event t (Just newacc) : streamScan mf newacc r
        Nothing -> Event t Nothing : streamScan mf acc r
-- | Test generator for timestamps: a non-negative 'ModifiedJulianDay' number
-- combined with a non-negative count of picoseconds for the time of day.
--
-- NOTE(review): this instance is an orphan (neither the class nor the type
-- is declared in this module).
instance Arbitrary UTCTime where
    arbitrary = mkTime <$> arbitrary <*> arbitrary
      where
        -- The day is generated first, then the picoseconds.
        mkTime (NonNegative d) (NonNegative i) =
            UTCTime (ModifiedJulianDay d) (picosecondsToDiffTime i)
-- | Test generator for events.  Every generated event has both a timestamp
-- and a payload; events with either part missing are never produced.
instance Arbitrary a => Arbitrary (Event a) where
    arbitrary = mkEvent <$> arbitrary <*> arbitrary
      where
        -- The payload is generated first, then the timestamp.
        mkEvent i t = Event (Just t) (Just i)
-- | Property: 'streamScan' emits exactly one event per input event.
prop_streamScan_samelength :: Stream Int -> Bool
prop_streamScan_samelength s = length scanned == length s
  where
    -- The step function just remembers the latest payload.
    scanned = streamScan (\_ x -> x) 0 s
-- | Flatten a stream of list payloads into a stream of single payloads.
--
-- An event carrying a list becomes one event per list element, all with the
-- original timestamp (so an empty list produces no event).  An event without
-- a payload is passed on as a single event without a payload.
streamExpand :: Stream [alpha] -> Stream alpha
streamExpand = concatMap eventExpand
  where
    eventExpand :: Event [alpha] -> [Event alpha]
    eventExpand (Event t payload) =
        case payload of
            Nothing -> [Event t Nothing]
            Just vs -> [Event t (Just nv) | nv <- vs]
-- | Filter a plain list with an accumulator; the list counterpart of
-- 'streamFilterAcc'.
--
-- For each element the predicate @pred@ sees the element and the accumulator
-- as it was /before/ that element.  The accumulator is then advanced with
-- @accfn@ whether or not the element was kept.  The result is the kept
-- elements, in their original order, together with the final accumulator.
filterAcc :: (b -> a -> b) -> b -> (a -> b -> Bool) -> [a] -> ([a],b)
filterAcc accfn acc pred xs = (reverse kept, finalAcc)
  where
    -- The fold collects kept elements by prepending, i.e. newest first,
    -- hence the single 'reverse' above to restore input order.
    (kept, finalAcc)  = foldl step ([], acc) xs
    step (list, cur) v = (if pred v cur then v : list else list, accfn cur v)
-- Example streams used by the ex* definitions.  All of them are infinite.

-- | Timestamped events that all carry the payload 999.
-- NOTE(review): the offsets come from @[0..]@ at type 'NominalDiffTime';
-- its 'Enum' instance is believed to step by one picosecond rather than one
-- second - confirm this spacing is intended.
s1 :: Stream Int
s1 = map stamped [0..]
  where
    stamped i = Event (Just (addUTCTime i (read "2013-01-01 00:00:00 +0000"))) (Just 999)

-- | Same timestamps as 's1', but without any payload.
s2 :: Stream Int
s2 = map stamped [0..]
  where
    stamped i = Event (Just (addUTCTime i (read "2013-01-01 00:00:00 +0000"))) Nothing

-- | 's1' merged with 's2'.
s3 :: Stream Int
s3 = streamMerge [s1, s2]

-- | Payloads 0, 1, 2, ... without timestamps.
s4 :: Stream Int
s4 = map (Event Nothing . Just) [0..]

-- | 's2' merged with 's4'.
s5 :: Stream Int
s5 = streamMerge [s2, s4]

-- | Payloads 100, 101, 102, ... without timestamps.
s6 :: Stream Int
s6 = map (Event Nothing . Just) [100..]
-- | Sliding windows of the given length over 's3'.
ex1 :: Int -> Stream [Int]
ex1 wLength = streamWindow (sliding wLength) s3

-- | Tumbling windows of the given length over 's3'.
ex2 :: Int -> Stream [Int]
ex2 wLength = streamWindow (chop wLength) s3

-- | Sliding windows of the given length over 's4'.
ex3 :: Int -> Stream [Int]
ex3 wLength = streamWindow (sliding wLength) s4

-- | Tumbling windows of the given length over 's4'.
ex4 :: Int -> Stream [Int]
ex4 wLength = streamWindow (chop wLength) s4

-- | Events of 's1' whose payload is greater than 1000.
ex5 :: Stream Int
ex5 = streamFilter (> 1000) s1

-- | Events of 's1' whose payload is less than 1000.
ex6 :: Stream Int
ex6 = streamFilter (< 1000) s1
-- | Thin out a stream with a countdown kept in 'streamFilterAcc'.
--
-- The counter starts at @n@ and is decremented by every payload-bearing
-- event.  Such an event is let through only when the counter it sees is 0,
-- and that same event resets the counter to @n@.
sample :: Int -> Stream alpha -> Stream alpha
sample n = streamFilterAcc countdown n atZero
  where
    countdown acc _
        | acc == 0  = n
        | otherwise = acc - 1
    atZero _ acc = acc == 0
-- | Payloads of 's1' paired with payloads of 's4'.
ex7 :: Stream (Int, Int)
ex7 = streamJoin s1 s4

-- | Window join of 's4' and 's6' over windows of two events: the sum of all
-- payloads in both windows.
ex8 :: Stream Int
ex8 = streamJoinW (chop 2) (chop 2) sumBoth s4 s6
  where
    sumBoth a b = sum a + sum b

-- | Element join of 's4' and 's6' over windows of two events: the sum of
-- every pair whose left payload is smaller than its right payload.
ex9 :: Stream Int
ex9 = streamJoinE (chop 2) (chop 2) (<) (+) s4 s6