{-# OPTIONS_GHC -F -pgmF htfpp #-}
{-# OPTIONS_HADDOCK prune #-}
{-|
Module      : Striot.FunctionalProcessing
Description : StrIoT operators for processing Streams
Copyright   : © StrIoT maintainers, 2021
License     : Apache 2.0
Maintainer  : StrIoT maintainers
Stability   : experimental

The eight StrIoT low-level operators and related functions, as
well as some composite operators.
-}

module Striot.FunctionalProcessing (
    -- * The eight fundamental operators
     streamFilter
   , streamFilterAcc
   , streamMap
   , streamScan
   , streamWindow
   , streamExpand
   , streamMerge
   , streamJoin

   -- * Utility functions and convenience types
   , WindowMaker
   , WindowAggregator
   , sliding
   , slidingTime
   , chop
   , chopTime

   -- * Example composite operators
   , streamWindowAggregate
   , streamJoinE
   , streamJoinW

   -- * ???
   , complete
   , EventFilter
   , EventMap
   , JoinFilter
   , JoinMap

   , filterAcc

   , htf_thisModulesTests) where

import Striot.FunctionalIoTtypes
import Data.Time (UTCTime (..),addUTCTime,diffUTCTime,NominalDiffTime,picosecondsToDiffTime, Day (..))
import Test.Framework

-- Define the Basic IoT Stream Functions

-- Filter a Stream ...
type EventFilter alpha = alpha -> Bool                                 -- the type of the user-supplied function

streamFilter :: EventFilter alpha -> Stream alpha -> Stream alpha      -- if the value in the event meets the criteria then it can pass through
streamFilter :: forall alpha. EventFilter alpha -> Stream alpha -> Stream alpha
streamFilter EventFilter alpha
ff Stream alpha
s = (Event alpha -> Bool) -> Stream alpha -> Stream alpha
forall a. (a -> Bool) -> [a] -> [a]
filter (\(Event Maybe UTCTime
t Maybe alpha
v) -> case Maybe alpha
v of
                                                  Just alpha
val -> EventFilter alpha
ff alpha
val
                                                  Maybe alpha
Nothing  -> Bool
True)      -- allow timestamped events to pass through for time-based windowing
                           Stream alpha
s

type EventMap alpha beta = alpha -> beta
-- Map a Stream ...
streamMap :: EventMap alpha beta -> Stream alpha -> Stream beta
streamMap :: forall alpha beta.
EventMap alpha beta -> Stream alpha -> Stream beta
streamMap EventMap alpha beta
fm Stream alpha
s = (Event alpha -> Event beta) -> Stream alpha -> [Event beta]
forall a b. (a -> b) -> [a] -> [b]
map (\(Event Maybe UTCTime
t Maybe alpha
v) -> case Maybe alpha
v of
                                            Just alpha
val -> Maybe UTCTime -> Maybe beta -> Event beta
forall alpha. Maybe UTCTime -> Maybe alpha -> Event alpha
Event Maybe UTCTime
t (beta -> Maybe beta
forall a. a -> Maybe a
Just (EventMap alpha beta
fm alpha
val))
                                            Maybe alpha
Nothing  -> Maybe UTCTime -> Maybe beta -> Event beta
forall alpha. Maybe UTCTime -> Maybe alpha -> Event alpha
Event Maybe UTCTime
t Maybe beta
forall a. Maybe a
Nothing       ) -- allow timestamped events to pass through for time-based windowing
                     Stream alpha
s

-- create and aggregate windows
type WindowMaker alpha = Stream alpha -> [Stream alpha]
type WindowAggregator alpha beta = [alpha] -> beta

streamWindow :: WindowMaker alpha -> Stream alpha -> Stream [alpha]
streamWindow :: forall alpha. WindowMaker alpha -> Stream alpha -> Stream [alpha]
streamWindow WindowMaker alpha
fwm Stream alpha
s = [Stream alpha] -> Stream [alpha]
forall alpha. [Stream alpha] -> Stream [alpha]
mapWindowId (WindowMaker alpha
fwm Stream alpha
s)
           where getVals :: Stream alpha -> [alpha]
                 getVals :: forall alpha. Stream alpha -> [alpha]
getVals Stream alpha
s' = (Event alpha -> alpha) -> Stream alpha -> [alpha]
forall a b. (a -> b) -> [a] -> [b]
map (\(Event Maybe UTCTime
_ (Just alpha
val))->alpha
val) (Stream alpha -> [alpha]) -> Stream alpha -> [alpha]
forall a b. (a -> b) -> a -> b
$ (Event alpha -> Bool) -> Stream alpha -> Stream alpha
forall a. (a -> Bool) -> [a] -> [a]
filter Event alpha -> Bool
forall alpha. Event alpha -> Bool
dataEvent Stream alpha
s'
                 mapWindowId :: [Stream alpha] -> Stream [alpha]
                 mapWindowId :: forall alpha. [Stream alpha] -> Stream [alpha]
mapWindowId [] = []
                 mapWindowId (Stream alpha
x:[Stream alpha]
xs) =
                     case Stream alpha
x of
                         Event Maybe UTCTime
t Maybe alpha
_ : Stream alpha
_ -> Maybe UTCTime -> Maybe [alpha] -> Event [alpha]
forall alpha. Maybe UTCTime -> Maybe alpha -> Event alpha
Event Maybe UTCTime
t       ([alpha] -> Maybe [alpha]
forall a. a -> Maybe a
Just (Stream alpha -> [alpha]
forall alpha. Stream alpha -> [alpha]
getVals Stream alpha
x)) Event [alpha] -> [Event [alpha]] -> [Event [alpha]]
forall a. a -> [a] -> [a]
: [Stream alpha] -> [Event [alpha]]
forall alpha. [Stream alpha] -> Stream [alpha]
mapWindowId [Stream alpha]
xs
                         []            -> Maybe UTCTime -> Maybe [alpha] -> Event [alpha]
forall alpha. Maybe UTCTime -> Maybe alpha -> Event alpha
Event Maybe UTCTime
forall a. Maybe a
Nothing ([alpha] -> Maybe [alpha]
forall a. a -> Maybe a
Just [])          Event [alpha] -> [Event [alpha]] -> [Event [alpha]]
forall a. a -> [a] -> [a]
: [Stream alpha] -> [Event [alpha]]
forall alpha. [Stream alpha] -> Stream [alpha]
mapWindowId [Stream alpha]
xs

-- a useful function building on streamWindow and streamMap
streamWindowAggregate :: WindowMaker alpha -> WindowAggregator alpha beta -> Stream alpha -> Stream beta
streamWindowAggregate :: forall alpha beta.
WindowMaker alpha
-> WindowAggregator alpha beta -> Stream alpha -> Stream beta
streamWindowAggregate WindowMaker alpha
fwm WindowAggregator alpha beta
fwa Stream alpha
s = WindowAggregator alpha beta -> Stream [alpha] -> Stream beta
forall alpha beta.
EventMap alpha beta -> Stream alpha -> Stream beta
streamMap WindowAggregator alpha beta
fwa (Stream [alpha] -> Stream beta) -> Stream [alpha] -> Stream beta
forall a b. (a -> b) -> a -> b
$ WindowMaker alpha -> Stream alpha -> Stream [alpha]
forall alpha. WindowMaker alpha -> Stream alpha -> Stream [alpha]
streamWindow WindowMaker alpha
fwm Stream alpha
s

-- some examples of window functions
sliding :: Int -> WindowMaker alpha
sliding :: forall alpha. Int -> WindowMaker alpha
sliding Int
wLength Stream alpha
s = Int -> WindowMaker alpha
forall alpha. Int -> WindowMaker alpha
sliding' Int
wLength WindowMaker alpha -> WindowMaker alpha
forall a b. (a -> b) -> a -> b
$ (Event alpha -> Bool) -> Stream alpha -> Stream alpha
forall a. (a -> Bool) -> [a] -> [a]
filter Event alpha -> Bool
forall alpha. Event alpha -> Bool
dataEvent Stream alpha
s
           where sliding':: Int -> WindowMaker alpha
                 sliding' :: forall alpha. Int -> WindowMaker alpha
sliding' Int
wLength []      = []
                 sliding' Int
wLength s :: [Event alpha]
s@(Event alpha
h:[Event alpha]
t) = (Int -> [Event alpha] -> [Event alpha]
forall a. Int -> [a] -> [a]
take Int
wLength [Event alpha]
s) [Event alpha] -> [[Event alpha]] -> [[Event alpha]]
forall a. a -> [a] -> [a]
: Int -> WindowMaker alpha
forall alpha. Int -> WindowMaker alpha
sliding' Int
wLength [Event alpha]
t

slidingTime:: Int -> WindowMaker alpha -- the first argument is the window length in milliseconds
slidingTime :: forall alpha. Int -> WindowMaker alpha
slidingTime Int
tLength Stream alpha
s = NominalDiffTime -> Stream alpha -> [Stream alpha]
forall alpha. NominalDiffTime -> Stream alpha -> [Stream alpha]
slidingTime' (Int -> NominalDiffTime
milliToTimeDiff Int
tLength) (Stream alpha -> [Stream alpha]) -> Stream alpha -> [Stream alpha]
forall a b. (a -> b) -> a -> b
$ (Event alpha -> Bool) -> Stream alpha -> Stream alpha
forall a. (a -> Bool) -> [a] -> [a]
filter Event alpha -> Bool
forall alpha. Event alpha -> Bool
timedEvent Stream alpha
s
                        where slidingTime':: NominalDiffTime -> Stream alpha -> [Stream alpha]
                              slidingTime' :: forall alpha. NominalDiffTime -> Stream alpha -> [Stream alpha]
slidingTime' NominalDiffTime
tLen []                        = []
                              slidingTime' NominalDiffTime
tLen s :: [Event alpha]
s@(Event (Just UTCTime
t) Maybe alpha
_:[Event alpha]
xs) = (UTCTime -> [Event alpha] -> [Event alpha]
forall alpha. UTCTime -> Stream alpha -> Stream alpha
takeTime (NominalDiffTime -> UTCTime -> UTCTime
addUTCTime NominalDiffTime
tLen UTCTime
t) [Event alpha]
s) [Event alpha] -> [[Event alpha]] -> [[Event alpha]]
forall a. a -> [a] -> [a]
: NominalDiffTime -> [Event alpha] -> [[Event alpha]]
forall alpha. NominalDiffTime -> Stream alpha -> [Stream alpha]
slidingTime' NominalDiffTime
tLen [Event alpha]
xs

takeTime:: UTCTime -> Stream alpha -> Stream alpha
takeTime :: forall alpha. UTCTime -> Stream alpha -> Stream alpha
takeTime UTCTime
endTime []                                        = []
takeTime UTCTime
endTime (e :: Event alpha
e@(Event (Just UTCTime
t) Maybe alpha
_):[Event alpha]
xs) | UTCTime
t UTCTime -> UTCTime -> Bool
forall a. Ord a => a -> a -> Bool
< UTCTime
endTime = Event alpha
e Event alpha -> [Event alpha] -> [Event alpha]
forall a. a -> [a] -> [a]
: UTCTime -> [Event alpha] -> [Event alpha]
forall alpha. UTCTime -> Stream alpha -> Stream alpha
takeTime UTCTime
endTime [Event alpha]
xs
                                             | Bool
otherwise   = []

milliToTimeDiff :: Int -> NominalDiffTime
milliToTimeDiff :: Int -> NominalDiffTime
milliToTimeDiff Int
x = Int -> NominalDiffTime
forall a. Enum a => Int -> a
toEnum (Int
x Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
10 Int -> Integer -> Int
forall a b. (Num a, Integral b) => a -> b -> a
^ Integer
9)

chop :: Int -> WindowMaker alpha
chop :: forall alpha. Int -> WindowMaker alpha
chop Int
wLength Stream alpha
s = Int -> Stream alpha -> [Stream alpha]
forall {a}. Int -> [a] -> [[a]]
chop' Int
wLength (Stream alpha -> [Stream alpha]) -> Stream alpha -> [Stream alpha]
forall a b. (a -> b) -> a -> b
$ (Event alpha -> Bool) -> Stream alpha -> Stream alpha
forall a. (a -> Bool) -> [a] -> [a]
filter Event alpha -> Bool
forall alpha. Event alpha -> Bool
dataEvent Stream alpha
s
     where chop' :: Int -> [a] -> [[a]]
chop' Int
wLength [] = []
           chop' Int
wLength [a]
s  = [a]
w[a] -> [[a]] -> [[a]]
forall a. a -> [a] -> [a]
:(Int -> [a] -> [[a]]
chop' Int
wLength [a]
r) where ([a]
w,[a]
r) = Int -> [a] -> ([a], [a])
forall a. Int -> [a] -> ([a], [a])
splitAt Int
wLength [a]
s

chopTime :: Int -> WindowMaker alpha -- N.B. discards events without a timestamp
chopTime :: forall alpha. Int -> WindowMaker alpha
chopTime Int
_       []                         = []
chopTime Int
tLength s :: [Event alpha]
s@((Event (Just UTCTime
t) Maybe alpha
_):[Event alpha]
_) = NominalDiffTime -> UTCTime -> WindowMaker alpha
forall alpha. NominalDiffTime -> UTCTime -> WindowMaker alpha
chopTime' (Int -> NominalDiffTime
milliToTimeDiff Int
tLength) UTCTime
t WindowMaker alpha -> WindowMaker alpha
forall a b. (a -> b) -> a -> b
$ (Event alpha -> Bool) -> [Event alpha] -> [Event alpha]
forall a. (a -> Bool) -> [a] -> [a]
filter Event alpha -> Bool
forall alpha. Event alpha -> Bool
timedEvent [Event alpha]
s
    where chopTime' :: NominalDiffTime -> UTCTime -> WindowMaker alpha -- the first argument is in milliseconds
          chopTime' :: forall alpha. NominalDiffTime -> UTCTime -> WindowMaker alpha
chopTime' NominalDiffTime
_    UTCTime
_      []    = []
          chopTime' NominalDiffTime
tLen UTCTime
tStart [Event alpha]
s     = let endTime :: UTCTime
endTime           = NominalDiffTime -> UTCTime -> UTCTime
addUTCTime NominalDiffTime
tLen UTCTime
tStart
                                            ([Event alpha]
fstBuffer, [Event alpha]
rest) = UTCTime -> [Event alpha] -> ([Event alpha], [Event alpha])
forall alpha.
UTCTime -> Stream alpha -> (Stream alpha, Stream alpha)
timeTake UTCTime
endTime [Event alpha]
s
                                        in  [Event alpha]
fstBuffer [Event alpha] -> [[Event alpha]] -> [[Event alpha]]
forall a. a -> [a] -> [a]
: NominalDiffTime -> UTCTime -> WindowMaker alpha
forall alpha. NominalDiffTime -> UTCTime -> WindowMaker alpha
chopTime' NominalDiffTime
tLen UTCTime
endTime [Event alpha]
rest

timeTake :: UTCTime -> Stream alpha -> (Stream alpha, Stream alpha)
timeTake :: forall alpha.
UTCTime -> Stream alpha -> (Stream alpha, Stream alpha)
timeTake UTCTime
endTime Stream alpha
s = (Event alpha -> Bool)
-> Stream alpha -> (Stream alpha, Stream alpha)
forall a. (a -> Bool) -> [a] -> ([a], [a])
span (\(Event (Just UTCTime
t) Maybe alpha
_) -> UTCTime
t UTCTime -> UTCTime -> Bool
forall a. Ord a => a -> a -> Bool
< UTCTime
endTime) Stream alpha
s
                                        
complete :: WindowMaker alpha
complete :: forall alpha. WindowMaker alpha
complete Stream alpha
s = [Stream alpha
s]

-- Merge a set of streams that are of the same type. Preserve time ordering
streamMerge:: [Stream alpha]-> Stream alpha
streamMerge :: forall alpha. [Stream alpha] -> Stream alpha
streamMerge []     = []
streamMerge (Stream alpha
x:[]) = Stream alpha
x
streamMerge (Stream alpha
x:[Stream alpha]
xs) = Stream alpha -> Stream alpha -> Stream alpha
forall alpha. Stream alpha -> Stream alpha -> Stream alpha
merge' Stream alpha
x ([Stream alpha] -> Stream alpha
forall alpha. [Stream alpha] -> Stream alpha
streamMerge [Stream alpha]
xs)
    where merge':: Stream alpha -> Stream alpha -> Stream alpha
          merge' :: forall alpha. Stream alpha -> Stream alpha -> Stream alpha
merge' Stream alpha
xs                               []                                       = Stream alpha
xs
          merge' []                               Stream alpha
ys                                       = Stream alpha
ys
          merge' s1 :: Stream alpha
s1@(e1 :: Event alpha
e1@(Event (Just UTCTime
t1) Maybe alpha
_):Stream alpha
xs) s2 :: Stream alpha
s2@(e2 :: Event alpha
e2@(Event (Just UTCTime
t2) Maybe alpha
_):Stream alpha
ys) | UTCTime
t1 UTCTime -> UTCTime -> Bool
forall a. Ord a => a -> a -> Bool
< UTCTime
t2   = Event alpha
e1Event alpha -> Stream alpha -> Stream alpha
forall a. a -> [a] -> [a]
: Stream alpha -> Stream alpha -> Stream alpha
forall alpha. Stream alpha -> Stream alpha -> Stream alpha
merge' Stream alpha
s2 Stream alpha
xs
                                                                               | Bool
otherwise = Event alpha
e2Event alpha -> Stream alpha -> Stream alpha
forall a. a -> [a] -> [a]
: Stream alpha -> Stream alpha -> Stream alpha
forall alpha. Stream alpha -> Stream alpha -> Stream alpha
merge' Stream alpha
ys Stream alpha
s1
          merge' (Event alpha
e1:Stream alpha
xs)                          Stream alpha
s2                                       = Event alpha
e1Event alpha -> Stream alpha -> Stream alpha
forall a. a -> [a] -> [a]
: Stream alpha -> Stream alpha -> Stream alpha
forall alpha. Stream alpha -> Stream alpha -> Stream alpha
merge' Stream alpha
s2 Stream alpha
xs  -- arbitrary ordering if 1 or 2 of the events aren't timed
                                                                                                                   -- swap order of streams so as to interleave

-- Join 2 streams by combining elements
streamJoin :: Stream alpha -> Stream beta -> Stream (alpha,beta)
streamJoin :: forall alpha beta.
Stream alpha -> Stream beta -> Stream (alpha, beta)
streamJoin []                               []                               = []
streamJoin [Event alpha]
_                                []                               = []
streamJoin []                               [Event beta]
_                                = []
streamJoin    ((Event Maybe UTCTime
t1 (Just alpha
v1)):[Event alpha]
r1)    ((Event Maybe UTCTime
_  (Just beta
v2)):[Event beta]
r2) = (Maybe UTCTime -> Maybe (alpha, beta) -> Event (alpha, beta)
forall alpha. Maybe UTCTime -> Maybe alpha -> Event alpha
Event Maybe UTCTime
t1 ((alpha, beta) -> Maybe (alpha, beta)
forall a. a -> Maybe a
Just(alpha
v1,beta
v2)))Event (alpha, beta)
-> [Event (alpha, beta)] -> [Event (alpha, beta)]
forall a. a -> [a] -> [a]
:([Event alpha] -> [Event beta] -> [Event (alpha, beta)]
forall alpha beta.
Stream alpha -> Stream beta -> Stream (alpha, beta)
streamJoin [Event alpha]
r1 [Event beta]
r2)
streamJoin    ((Event Maybe UTCTime
_  Maybe alpha
Nothing  ):[Event alpha]
r1) s2 :: [Event beta]
s2@((Event Maybe UTCTime
_  (Just beta
v2)):[Event beta]
_ ) = [Event alpha] -> [Event beta] -> [Event (alpha, beta)]
forall alpha beta.
Stream alpha -> Stream beta -> Stream (alpha, beta)
streamJoin [Event alpha]
r1 [Event beta]
s2
streamJoin s1 :: [Event alpha]
s1@((Event Maybe UTCTime
_  (Just alpha
v1)):[Event alpha]
_ )    ((Event Maybe UTCTime
_  Maybe beta
Nothing  ):[Event beta]
r2) = [Event alpha] -> [Event beta] -> [Event (alpha, beta)]
forall alpha beta.
Stream alpha -> Stream beta -> Stream (alpha, beta)
streamJoin [Event alpha]
s1 [Event beta]
r2
streamJoin    ((Event Maybe UTCTime
_  Maybe alpha
Nothing  ):[Event alpha]
r1)    ((Event Maybe UTCTime
_  Maybe beta
Nothing  ):[Event beta]
r2) = [Event alpha] -> [Event beta] -> [Event (alpha, beta)]
forall alpha beta.
Stream alpha -> Stream beta -> Stream (alpha, beta)
streamJoin [Event alpha]
r1 [Event beta]
r2

-- Join 2 streams by combining windows - some useful functions that build on streamJoin
type JoinFilter alpha beta        = alpha -> beta -> Bool
type JoinMap    alpha beta gamma  = alpha -> beta -> gamma

streamJoinE :: WindowMaker alpha ->
               WindowMaker beta ->
               JoinFilter alpha beta ->
               JoinMap alpha beta gamma ->
               Stream alpha ->
               Stream beta  ->
               Stream gamma
streamJoinE :: forall alpha beta gamma.
WindowMaker alpha
-> WindowMaker beta
-> JoinFilter alpha beta
-> JoinMap alpha beta gamma
-> Stream alpha
-> Stream beta
-> Stream gamma
streamJoinE WindowMaker alpha
fwm1 WindowMaker beta
fwm2 JoinFilter alpha beta
fwj JoinMap alpha beta gamma
fwm Stream alpha
s1 Stream beta
s2 = Stream [gamma] -> Stream gamma
forall alpha. Stream [alpha] -> Stream alpha
streamExpand (Stream [gamma] -> Stream gamma) -> Stream [gamma] -> Stream gamma
forall a b. (a -> b) -> a -> b
$ EventMap ([alpha], [beta]) [gamma]
-> Stream ([alpha], [beta]) -> Stream [gamma]
forall alpha beta.
EventMap alpha beta -> Stream alpha -> Stream beta
streamMap (JoinFilter alpha beta
-> JoinMap alpha beta gamma -> EventMap ([alpha], [beta]) [gamma]
forall alpha beta gamma.
JoinFilter alpha beta
-> JoinMap alpha beta gamma -> ([alpha], [beta]) -> [gamma]
cartesianJoin JoinFilter alpha beta
fwj JoinMap alpha beta gamma
fwm) (Stream ([alpha], [beta]) -> Stream [gamma])
-> Stream ([alpha], [beta]) -> Stream [gamma]
forall a b. (a -> b) -> a -> b
$ Stream [alpha] -> Stream [beta] -> Stream ([alpha], [beta])
forall alpha beta.
Stream alpha -> Stream beta -> Stream (alpha, beta)
streamJoin (WindowMaker alpha -> Stream alpha -> Stream [alpha]
forall alpha. WindowMaker alpha -> Stream alpha -> Stream [alpha]
streamWindow WindowMaker alpha
fwm1 Stream alpha
s1) (WindowMaker beta -> Stream beta -> Stream [beta]
forall alpha. WindowMaker alpha -> Stream alpha -> Stream [alpha]
streamWindow WindowMaker beta
fwm2 Stream beta
s2)
    where cartesianJoin :: JoinFilter alpha beta -> JoinMap alpha beta gamma -> ([alpha],[beta]) -> [gamma]
          cartesianJoin :: forall alpha beta gamma.
JoinFilter alpha beta
-> JoinMap alpha beta gamma -> ([alpha], [beta]) -> [gamma]
cartesianJoin JoinFilter alpha beta
jf JoinMap alpha beta gamma
jm ([alpha]
w1,[beta]
w2) = ((alpha, beta) -> gamma) -> [(alpha, beta)] -> [gamma]
forall a b. (a -> b) -> [a] -> [b]
map (\(alpha
e1,beta
e2)->JoinMap alpha beta gamma
jm alpha
e1 beta
e2) ([(alpha, beta)] -> [gamma]) -> [(alpha, beta)] -> [gamma]
forall a b. (a -> b) -> a -> b
$ ((alpha, beta) -> Bool) -> [(alpha, beta)] -> [(alpha, beta)]
forall a. (a -> Bool) -> [a] -> [a]
filter (\(alpha
e1,beta
e2)->JoinFilter alpha beta
jf alpha
e1 beta
e2) ([(alpha, beta)] -> [(alpha, beta)])
-> [(alpha, beta)] -> [(alpha, beta)]
forall a b. (a -> b) -> a -> b
$ [alpha] -> [beta] -> [(alpha, beta)]
forall alpha beta. [alpha] -> [beta] -> [(alpha, beta)]
cartesianProduct [alpha]
w1 [beta]
w2

          cartesianProduct:: [alpha] -> [beta] -> [(alpha,beta)]
          cartesianProduct :: forall alpha beta. [alpha] -> [beta] -> [(alpha, beta)]
cartesianProduct [alpha]
s1 [beta]
s2 = [(alpha
a,beta
b)|alpha
a<-[alpha]
s1,beta
b<-[beta]
s2]

streamJoinW :: WindowMaker alpha ->
               WindowMaker beta  ->
              ([alpha] -> [beta] -> gamma)      -> Stream alpha -> Stream beta  -> Stream gamma
streamJoinW :: forall alpha beta gamma.
WindowMaker alpha
-> WindowMaker beta
-> ([alpha] -> [beta] -> gamma)
-> Stream alpha
-> Stream beta
-> Stream gamma
streamJoinW WindowMaker alpha
fwm1 WindowMaker beta
fwm2 [alpha] -> [beta] -> gamma
fwj Stream alpha
s1 Stream beta
s2 = EventMap ([alpha], [beta]) gamma
-> Stream ([alpha], [beta]) -> Stream gamma
forall alpha beta.
EventMap alpha beta -> Stream alpha -> Stream beta
streamMap (\([alpha]
w1,[beta]
w2)->[alpha] -> [beta] -> gamma
fwj [alpha]
w1 [beta]
w2) (Stream ([alpha], [beta]) -> Stream gamma)
-> Stream ([alpha], [beta]) -> Stream gamma
forall a b. (a -> b) -> a -> b
$ Stream [alpha] -> Stream [beta] -> Stream ([alpha], [beta])
forall alpha beta.
Stream alpha -> Stream beta -> Stream (alpha, beta)
streamJoin (WindowMaker alpha -> Stream alpha -> Stream [alpha]
forall alpha. WindowMaker alpha -> Stream alpha -> Stream [alpha]
streamWindow WindowMaker alpha
fwm1 Stream alpha
s1) (WindowMaker beta -> Stream beta -> Stream [beta]
forall alpha. WindowMaker alpha -> Stream alpha -> Stream [alpha]
streamWindow WindowMaker beta
fwm2 Stream beta
s2)

-- Stream Filter with accumulating parameter
streamFilterAcc:: (beta -> alpha -> beta) -> beta -> (alpha -> beta -> Bool) -> Stream alpha -> Stream alpha
streamFilterAcc :: forall beta alpha.
(beta -> alpha -> beta)
-> beta -> (alpha -> beta -> Bool) -> Stream alpha -> Stream alpha
streamFilterAcc beta -> alpha -> beta
accfn beta
acc alpha -> beta -> Bool
ff []                         = []
streamFilterAcc beta -> alpha -> beta
accfn beta
acc alpha -> beta -> Bool
ff (e :: Event alpha
e@(Event Maybe UTCTime
_ (Just alpha
v)):[Event alpha]
r) | alpha -> beta -> Bool
ff alpha
v beta
acc  = Event alpha
eEvent alpha -> [Event alpha] -> [Event alpha]
forall a. a -> [a] -> [a]
:((beta -> alpha -> beta)
-> beta
-> (alpha -> beta -> Bool)
-> [Event alpha]
-> [Event alpha]
forall beta alpha.
(beta -> alpha -> beta)
-> beta -> (alpha -> beta -> Bool) -> Stream alpha -> Stream alpha
streamFilterAcc beta -> alpha -> beta
accfn (beta -> alpha -> beta
accfn beta
acc alpha
v) alpha -> beta -> Bool
ff [Event alpha]
r)
                                                        | Bool
otherwise =    (beta -> alpha -> beta)
-> beta
-> (alpha -> beta -> Bool)
-> [Event alpha]
-> [Event alpha]
forall beta alpha.
(beta -> alpha -> beta)
-> beta -> (alpha -> beta -> Bool) -> Stream alpha -> Stream alpha
streamFilterAcc beta -> alpha -> beta
accfn (beta -> alpha -> beta
accfn beta
acc alpha
v) alpha -> beta -> Bool
ff [Event alpha]
r
streamFilterAcc beta -> alpha -> beta
accfn beta
acc alpha -> beta -> Bool
ff (e :: Event alpha
e@(Event Maybe UTCTime
_ Maybe alpha
Nothing ):[Event alpha]
r)             = Event alpha
eEvent alpha -> [Event alpha] -> [Event alpha]
forall a. a -> [a] -> [a]
:((beta -> alpha -> beta)
-> beta
-> (alpha -> beta -> Bool)
-> [Event alpha]
-> [Event alpha]
forall beta alpha.
(beta -> alpha -> beta)
-> beta -> (alpha -> beta -> Bool) -> Stream alpha -> Stream alpha
streamFilterAcc beta -> alpha -> beta
accfn beta
acc           alpha -> beta -> Bool
ff [Event alpha]
r) -- allow events without data to pass through

-- Stream map with accumulating parameter
streamScan:: (beta -> alpha -> beta) -> beta -> Stream alpha -> Stream beta
streamScan :: forall beta alpha.
(beta -> alpha -> beta) -> beta -> Stream alpha -> Stream beta
streamScan beta -> alpha -> beta
_  beta
_   []                       = []
streamScan beta -> alpha -> beta
mf beta
acc (Event Maybe UTCTime
t (Just alpha
v):[Event alpha]
r) = Maybe UTCTime -> Maybe beta -> Event beta
forall alpha. Maybe UTCTime -> Maybe alpha -> Event alpha
Event Maybe UTCTime
t (beta -> Maybe beta
forall a. a -> Maybe a
Just beta
newacc)Event beta -> [Event beta] -> [Event beta]
forall a. a -> [a] -> [a]
:(beta -> alpha -> beta) -> beta -> [Event alpha] -> [Event beta]
forall beta alpha.
(beta -> alpha -> beta) -> beta -> Stream alpha -> Stream beta
streamScan beta -> alpha -> beta
mf beta
newacc [Event alpha]
r where newacc :: beta
newacc = beta -> alpha -> beta
mf beta
acc alpha
v
streamScan beta -> alpha -> beta
mf beta
acc (Event Maybe UTCTime
t Maybe alpha
Nothing :[Event alpha]
r) = Maybe UTCTime -> Maybe beta -> Event beta
forall alpha. Maybe UTCTime -> Maybe alpha -> Event alpha
Event Maybe UTCTime
t Maybe beta
forall a. Maybe a
Nothing      Event beta -> [Event beta] -> [Event beta]
forall a. a -> [a] -> [a]
:(beta -> alpha -> beta) -> beta -> [Event alpha] -> [Event beta]
forall beta alpha.
(beta -> alpha -> beta) -> beta -> Stream alpha -> Stream beta
streamScan beta -> alpha -> beta
mf beta
acc    [Event alpha]
r -- allow events without data to pass through

instance Arbitrary UTCTime where
    arbitrary :: Gen UTCTime
arbitrary = do
        NonNegative Integer
d <- Gen (NonNegative Integer)
forall a. Arbitrary a => Gen a
arbitrary :: Gen (NonNegative Integer)
        NonNegative Integer
i <- Gen (NonNegative Integer)
forall a. Arbitrary a => Gen a
arbitrary :: Gen (NonNegative Integer)
        UTCTime -> Gen UTCTime
forall (m :: * -> *) a. Monad m => a -> m a
return (UTCTime -> Gen UTCTime) -> UTCTime -> Gen UTCTime
forall a b. (a -> b) -> a -> b
$ Day -> DiffTime -> UTCTime
UTCTime (Integer -> Day
ModifiedJulianDay Integer
d) (Integer -> DiffTime
picosecondsToDiffTime Integer
i)

instance Arbitrary a => Arbitrary (Event a) where
    arbitrary :: Gen (Event a)
arbitrary = do
        a
i <- Gen a
forall a. Arbitrary a => Gen a
arbitrary
        UTCTime
t <- Gen UTCTime
forall a. Arbitrary a => Gen a
arbitrary
        Event a -> Gen (Event a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Event a -> Gen (Event a)) -> Event a -> Gen (Event a)
forall a b. (a -> b) -> a -> b
$ Maybe UTCTime -> Maybe a -> Event a
forall alpha. Maybe UTCTime -> Maybe alpha -> Event alpha
Event (UTCTime -> Maybe UTCTime
forall a. a -> Maybe a
Just UTCTime
t) (a -> Maybe a
forall a. a -> Maybe a
Just a
i)

prop_streamScan_samelength :: Stream Int -> Bool
prop_streamScan_samelength :: Stream Int -> Bool
prop_streamScan_samelength Stream Int
s = Stream Int -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length Stream Int
s Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Stream Int -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length ((Int -> Int -> Int) -> Int -> Stream Int -> Stream Int
forall beta alpha.
(beta -> alpha -> beta) -> beta -> Stream alpha -> Stream beta
streamScan (\Int
_ Int
x-> Int
x) Int
0 Stream Int
s)

-- Map a Stream to a set of events
streamExpand :: Stream [alpha] -> Stream alpha
streamExpand s = concatMap eventExpand s
      where eventExpand :: Event [alpha] -> [Event alpha]
            eventExpand (Event t (Just v)) = map (\nv->Event t (Just nv)) v
            eventExpand (Event t Nothing ) = [Event t Nothing]

--streamSource :: Stream alpha -> Stream alpha
--streamSource ss = ss

-- streamSink:: (Stream alpha -> beta) -> Stream alpha -> beta
-- streamSink ssink s = ssink s

-- | filterAcc, for plain lists
filterAcc :: (b -> a -> b) -> b -> (a -> b -> Bool) -> [a] -> ([a],b)
filterAcc accfn acc pred = let
    foldfn (list,acc) v = (if pred v acc then v:list else list, accfn acc v)
    in foldl foldfn ([],acc)

--- Tests ------
--t1 :: Int -> Int -> Stream alpha -> (Bool,Stream alpha,Stream alpha)
--t1 tLen sLen s = splitAtValuedEvents tLen (take sLen s)

s1 :: Stream Int
s1 = [(Event (Just (addUTCTime i (read "2013-01-01 00:00:00 +0000"))) (Just 999))|i<-[0..]]

s2 :: Stream Int
s2 = [Event (Just (addUTCTime i (read "2013-01-01 00:00:00 +0000"))) Nothing |i<-[0..]]

s3 :: Stream Int
s3 = streamMerge [s1,s2]

s4 :: Stream Int
s4 = [Event Nothing (Just i)|i<-[0..]]

s5 :: Stream Int
s5 = streamMerge [s2,s4]

s6 :: Stream Int
s6 = [Event Nothing (Just i)|i<-[100..]]

ex1 i = streamWindow (sliding i) s3

ex2 i = streamWindow (chop i) s3

ex3 i = streamWindow (sliding i) s4

ex4 i = streamWindow (chop i) s4

ex5 = streamFilter (\v->v>1000) s1

ex6 = streamFilter (\v->v<1000) s1

sample :: Int -> Stream alpha -> Stream alpha
sample n s = streamFilterAcc (\acc h -> if acc==0 then n else acc-1) n (\h acc -> acc==0) s

ex7 = streamJoin s1 s4
ex8 = streamJoinW (chop 2) (chop 2) (\a b->(sum a)+(sum b)) s4 s6
ex9 = streamJoinE (chop 2) (chop 2) (\a b->a<b) (\a b->a+b) s4 s6