Saturday, October 30, 2010

Is this an Arrow II: The Type System Strikes Back!

In my last post I asked about my EventStream and RTSP types, and asked if RTSP could be an arrow and what the behaviour of "&&&" should be. You can see the first post with the type definitions here. Jason Hart Priestley replied suggesting that "&&&" should be thought of as a join of two computations, which made a lot of sense. So I went with that.

Returning now to the definition of "first", consider:

first ((*2) >>> dup 5)


where "dup 5" emits two copies of every input event, the second one delayed by 5 seconds. So if the input event was [Event t (3, 3)] you would want the output to be [Event t (6, 3), Event (t+5) (6, 3)].

But suppose that the input had two events like this: [Event t (3,3), Event (t+1) (4,4)]. What should the output be now?

It seems to me that the output of the argument to "first" should be paired with the second value of the events that "caused" those outputs. So the result should be:

[Event t (6,3),
 Event (t+1) (8,4),
 Event (t+5) (6,3),
 Event (t+6) (8,4)]
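
To make the intended pairing concrete, here is a minimal sketch in a deliberately simplified list model. It is not the RTSP type from these posts: times are plain Doubles rather than UTCTime, pending outputs are never overridden by later inputs, and the names Causal, doubleThenDup, firstCausal and runCausal are made up for illustration. Each processor returns the outputs caused by a single input, so the version of "first" can pair every output with the second component of the input that caused it.

```haskell
import Data.List (sortOn)

-- Simplified event: the time is a Double here (an assumption; the posts use UTCTime).
data Event a = Event { eventTime :: Double, eventValue :: a }
  deriving (Show, Eq)

-- A hypothetical "causal" processor: the outputs caused by one input event.
type Causal b c = Event b -> [Event c]

-- Stand-in for ((*2) >>> dup d): double the value, emit it at once
-- and again d seconds later.
doubleThenDup :: Double -> Causal Int Int
doubleThenDup d (Event t v) = [Event t (2 * v), Event (t + d) (2 * v)]

-- "first" in this model: every output is paired with the second
-- component of the input event that caused it.
firstCausal :: Causal b c -> Causal (b, d) (c, d)
firstCausal f (Event t (x, y)) = [Event t' (z, y) | Event t' z <- f (Event t x)]

-- Run a processor over a finite input list and merge the outputs by time.
runCausal :: Causal b c -> [Event b] -> [Event c]
runCausal f = sortOn eventTime . concatMap f
```

Evaluating runCausal (firstCausal (doubleThenDup 5)) on a two-event input interleaves the immediate and delayed copies by time, with each copy still carrying the second component of its own cause.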


However, to get this you need every arrow to track the "cause" of every output and tag the event with that information. So you get:

type TagEvent t a = (Event a, t)

data EventStream t b c =
   Emit {esEmit :: TagEvent t c, esFuture :: EventStream t b c, esHandle :: RTSP t b c}
   | Wait {esHandle :: RTSP t b c} deriving Show

newtype RTSP t b c = RTSP {runRTSP :: TagEvent t b -> EventStream t b c}


The type "t" indicates the type of the tag for this kind of event. Whenever an arrow emits an event in response to an input then it puts the tag from the input on the output. In the case of "dup" the EventStream it returns from handling a new event will contain both the pending outputs (with the tags from previous input events) and the two new events with tags from the latest input.

So now I can define two functions:

esFirst :: EventStream (d, t) b c -> EventStream t (b, d) (c, d)
esFirst (Emit (Event t va, (vb, tag)) es k) =
   Emit (Event t (va, vb), tag) (esFirst es) (rtspFirst k)
esFirst (Wait k) = Wait $ rtspFirst k

rtspFirst :: RTSP (d, t) b c -> RTSP t (b, d) (c, d)
rtspFirst (RTSP r) = RTSP $ \(Event t (va, vb), tag) ->
   esFirst $ r (Event t va, (vb, tag))


The second item in the pair (of type "d") gets tacked on to the tag for the whole arrow and re-appended at the end. Problem solved? No.

instance Arrow (RTSP t) where
   arr f = RTSP $ \(e, tag) -> Emit (fmap f e, tag) (Wait $ arr f) (arr f)
   first = rtspFirst


GHC complains about the infinite type "t = (d, t)". The problem is that the argument to rtspFirst has the type "RTSP (d, t) b c" but it needs to have the type "RTSP t b c". So this version of RTSP isn't an arrow. I have to get rid of the type tag "t", but still have some way of having an arrow match causes with effects. Is there some kind of type hackery that will do this?

Tuesday, October 26, 2010

Is this an Arrow?

I've been thinking about what a native Haskell middleware would look like for a while. One thing seemed obvious: it was going to have to be event-based. So I started thinking about event streams.


import Data.Time.Clock (UTCTime)

data Event a = Event {
   eventTime :: UTCTime,
   eventValue :: a
   } deriving (Show, Eq)

instance Functor Event where
   fmap f (Event t v) = Event t (f v)

-- | True if the first event occurs strictly before the second.
isBefore :: Event a -> Event b -> Bool
isBefore ev1 ev2 = eventTime ev1 < eventTime ev2

The simplest representation of a stream of events is just an infinite list, so you can define an event processor as a function from one stream to another:


type EventProcessor a = [Event a] -> [Event a]

Unfortunately this won't work. Suppose that your program is going to emit an event e1 at time t1, but an incoming event before t1 would make your program do something else. In order to decide whether the next item in the output list is e1 you need to look at the time of the next input event. But since that event hasn't happened yet you have to wait for it. If it arrives after t1 then you have missed the deadline.


What is needed is a way to reify the two possibilities here: either the next input event is before t1 or it is after it. This leads to the following data structures:


data EventStream b c =
   Emit {esEmit :: Event c, esFuture :: EventStream b c, esHandle :: RTSP b c}
   | Wait {esHandle :: RTSP b c} deriving Show

newtype RTSP b c = RTSP {runRTSP :: Event b -> EventStream b c}

The "Emit" case embodies these two possibilities. An interpreter function will process this as follows:


  1. If no input event intervenes then it will emit the event esEmit and call
    itself on esFuture.

  2. On the other hand if an input event arrives before esEmit then the function
    in esHandle gets called instead, generating a replacement EventStream
    representing a different future.

  3. Finally the Wait case allows the event stream to do nothing until an event
    arrives.
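
The three cases above can be sketched as a small pure interpreter that runs over a finite, time-ordered list of inputs instead of a live event source. This is only an illustration under assumptions: times are Doubles rather than UTCTime, the constructors are positional rather than records, and interpret, echo and timeoutStream are hypothetical names that do not appear in the original code.

```haskell
-- Simplified event: the time is a Double here (an assumption; the post uses UTCTime).
data Event a = Event { eventTime :: Double, eventValue :: a }
  deriving (Show, Eq)

-- Same shape as the post's types, with positional fields.
data EventStream b c
  = Emit (Event c) (EventStream b c) (RTSP b c)
  | Wait (RTSP b c)

newtype RTSP b c = RTSP { runRTSP :: Event b -> EventStream b c }

-- Hypothetical interpreter over a finite, time-ordered input list.
interpret :: EventStream b c -> [Event b] -> [Event c]
-- Case 3: nothing pending, so just wait for the next input (if any).
interpret (Wait _) [] = []
interpret (Wait k) (i : is) = interpret (runRTSP k i) is
-- No more inputs: emit whatever is still scheduled.
interpret (Emit o future _) [] = o : interpret future []
interpret (Emit o future k) inputs@(i : is)
  -- Case 2: the input arrives first, so the handler supplies a new future.
  | eventTime i < eventTime o = interpret (runRTSP k i) is
  -- Case 1: no input intervenes, so emit and carry on with the future.
  | otherwise = o : interpret future inputs

-- Echo every input unchanged.
echo :: RTSP a a
echo = RTSP $ \e -> Emit e (Wait echo) echo

-- Emits "timeout" at time 10 unless an input arrives before then.
timeoutStream :: EventStream String String
timeoutStream = Emit (Event 10 "timeout") (Wait echo) echo
```

With no input, interpret timeoutStream [] yields the timeout event. An input at time 3 replaces it, and an input at time 12 arrives too late to stop it and is echoed afterwards.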

The RTSP stands for "Real Time Stream Processor". It also handles the problem of what an event stream is supposed to do before the first event arrives.


Interestingly, both EventStream and RTSP are Categories. The composition operator for EventStream is simple in principle, although in practice all the different combinations of value and timings make it rather fiddly. Here it is, along with a block comment that explains the logic if you are really interested.


{-
The (.) operator for EventStream has to deal with several scenarios:

1: (Wait k2) . (Wait k1). This is simple because the only possible event
   is an input that is piped through the two RTSPs.

2: (Wait k2) . es1@(Emit e1 es1a k1). In this case there are three timelines:
   a) e1    : (k2 e1) . es1a       -- Event e1 is passed to k2 and the result composed with es1a.
   b) ev e1 : (Wait k2) . (k1 ev)  -- e1 never happens. Instead the input is passed to k1, which
                                   -- generates a new event stream to be composed with k2.

   The code for case b is the "k" value in the "let" clause.

3: es2@(Emit e2 es2a k2) . es1@(Emit e1 es1a k1). The logic here is similar to scenario
   2, except that the timing of e2 has to be taken into account as well. There are five basic
   timelines. Let ev = the next input event and es2b = runRTSP k2 e1

   a) e2 e1    : Emit e2 (es2a . es1a) (k2 . k1)
   b) e1 e2    : k2 e1 . es1a                -- e2 never happens because it is overridden by e1
   c) ev ...   : es2 . (k1 ev)               -- ev overrides e1, and the new output is fed to es2.
   d) e1 ev e2 : esProcess (es2b . es1a) ev  -- e1 overrides e2, giving es1a and es2b to process ev.
   e) e2 ev e1 : Emit e2 (es2a . (k1 ev))    -- e2 is emitted, then ev overrides e1.
-}
instance Category EventStream where
   -- id :: EventStream b b
   id = Wait $ RTSP $ \e -> Emit e id id

   -- (.) :: EventStream c d -> EventStream b c -> EventStream b d
   (Wait k2) . (Wait k1) =
      Wait $ k2 . k1
   es2@(Wait k2) . es1@(Emit e1 es1a k1) =
      let es = (runRTSP k2 e1) . es1a
          k = RTSP $ \ev ->
             if ev `isBefore` e1
                then es2 . runRTSP k1 ev         -- Timeline 2b
                else runRTSP (esProcess es) ev   -- Timeline 2a
      in case es of
         Emit e2 es2b _ -> Emit e2 es2b k
         Wait _ -> Wait k
   es2@(Emit e2 es2a k2) . es1@(Wait k1) =
      Emit e2 (es2a . es1) (RTSP $ \ev -> es2 . runRTSP k1 ev)
   es2@(Emit e2 es2a k2) . es1@(Emit e1 es1a k1) =
      let k = RTSP $ \ev ->
             if ev `isBefore` e1
                then es2 . runRTSP k1 ev         -- Timeline 3c
                else runRTSP (esProcess es) ev   -- Timeline 3d
          es = (runRTSP k2 e1) . es1a
      in if e1 `isBefore` e2
            then -- Timelines 3b, 3c and 3d. e2 never happens.
               case es of
                  Emit e3 es3 _ -> Emit e3 es3 k
                  Wait _ -> Wait k
            else -- Timelines 3a, 3c and 3e.
               Emit e2 (es2a . es1) (esProcess es2 . k1)

-- | Given a new input event to an existing event stream, this returns the
-- modified event stream.
esProcess :: EventStream b c -> RTSP b c
esProcess (Wait k) = k
esProcess (Emit eOut rest k) = RTSP $ \eIn ->
   if eIn `isBefore` eOut
      then runRTSP k eIn
      else Emit eOut (runRTSP (esProcess rest) eIn) k

And here is the matching RTSP instance:


{-
The (.) operator for RTSP has to deal with a similar range of scenarios to that of EventStream.
The event that fires off the composed stream is e0. This is passed to r1, giving es1, and the
first event from es1 is passed to r2, giving es2. Hence these events must occur in order. However
ev is a second input event that can arrive at any time after e0.

The possible timelines are therefore:

1: e0 e1 e2 ev: Emit e2 (es2a . es1a) _  -- The basic case. ev is passed to (es2a . es1a)
2: e0 e1 ev e2: es2 . es1a               -- e1 has happened, but ev occurs before e2 is emitted.
3: e0 ev e1 e2: r2 . esProcess es1       -- e1 is overridden by ev.
-}
instance Category RTSP where
   -- id :: RTSP b b
   id = RTSP $ \ev -> Emit ev id id
   -- (.) :: RTSP c d -> RTSP b c -> RTSP b d
   r2 . r1 = RTSP $ \e0 ->
      let es1 = runRTSP r1 e0
      in case es1 of
         Emit e1 es1a k1a ->
            let es2 = runRTSP r2 e1
                k = RTSP $ \ev ->
                   runRTSP (if ev `isBefore` e1 then r2 . k1a else r2 . esProcess es1) ev
            in case es2 of
               Emit e2 es2a k2a ->
                  Emit e2 (es2a . es1a) k
               Wait k2a ->
                  Wait k
         Wait k1a ->
            Wait (r2 . k1a)

Now that we have shown that these two things are both Categories, can we make them Arrows? The Arrow class requires a definition of first, which here would need the following types:


first :: EventStream b c -> EventStream (b, d) (c, d)
first :: RTSP b c -> RTSP (b, d) (c, d)

This is where I run into problems. EventStream can't be an Arrow because the first argument might emit a value before any input is received. At this point first is supposed to pair up the output with some other value of type d, but no such value exists. So EventStream can't be an Arrow.


RTSP doesn't have this problem because some value of type d must have been passed in at the start, so this lets us define first. However from a semantic point of view it is not clear how this should deal with delays and subsequent events.


It's easy to write a delay arrow: it simply echoes every incoming event with a fixed delay added to the event time. Other variations on the theme are possible, such as a dup arrow that echoes every input immediately and then again after a fixed delay, or an arrow that echoes every input as a Right value, but also emits a Left timeout value if no input is received after a defined delay. There is clearly a whole family of combinators waiting to be written here. But first things first.
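
As a rough sketch of how delay and dup might be written against the RTSP shape, the code below keeps a time-ordered list of pending outputs and rebuilds the stream whenever a new input arrives. It rests on several assumptions: times are Doubles rather than UTCTime, the constructors are positional, and scheduler and interpret are hypothetical helpers that are not part of the original code.

```haskell
import Data.List (sortOn)

-- Simplified event: the time is a Double here (an assumption; the post uses UTCTime).
data Event a = Event { eventTime :: Double, eventValue :: a }
  deriving (Show, Eq)

data EventStream b c
  = Emit (Event c) (EventStream b c) (RTSP b c)
  | Wait (RTSP b c)

newtype RTSP b c = RTSP { runRTSP :: Event b -> EventStream b c }

-- Hypothetical helper: build a processor from a function giving the
-- outputs that each input schedules. Outputs not yet emitted are kept
-- in time order, so a new input adds to them instead of discarding them.
scheduler :: (Event a -> [Event b]) -> RTSP a b
scheduler respond = handler []
  where
    handler pending = RTSP $ \ev ->
      fromPending (sortOn eventTime (pending ++ respond ev))
    fromPending [] = Wait (handler [])
    fromPending (o : os) = Emit o (fromPending os) (handler (o : os))

-- Echo every input d seconds later.
delay :: Double -> RTSP a a
delay d = scheduler $ \(Event t v) -> [Event (t + d) v]

-- Echo every input immediately and again d seconds later.
dup :: Double -> RTSP a a
dup d = scheduler $ \(Event t v) -> [Event t v, Event (t + d) v]

-- Hypothetical pure interpreter over a finite, time-ordered input list.
interpret :: EventStream b c -> [Event b] -> [Event c]
interpret (Wait _) [] = []
interpret (Wait k) (i : is) = interpret (runRTSP k i) is
interpret (Emit o future _) [] = o : interpret future []
interpret (Emit o future k) inputs@(i : is)
  | eventTime i < eventTime o = interpret (runRTSP k i) is
  | otherwise = o : interpret future inputs
```

For example, interpret (Wait (dup 5)) can be applied to a short list of events to see the immediate and delayed copies interleaved by time. The handler stored in each Emit still carries the output it would have emitted, which is what lets an intervening input add to the schedule without losing anything.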


What should first do if its argument is delay 1? Intuitively, first puts the first item of the pair through its argument, while the second item bypasses this processing. However, this would mean emitting one part of the result before the other. What should it do with dup 1? And what do the answers imply for this expression?

  delay 4 &&& delay 5

I don't have any good answers. Maybe RTSP isn't an arrow, but that is a real nuisance. Any ideas?