Skip to content

Commit ea2cdba

Browse files
committed
Infinite steams cookbook recipe
1 parent 36a2fe9 commit ea2cdba

File tree

4 files changed

+305
-0
lines changed

4 files changed

+305
-0
lines changed

cabal.project

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ packages:
5050
doc/cookbook/using-free-client
5151
-- doc/cookbook/open-id-connect
5252
doc/cookbook/managed-resource
53+
doc/cookbook/infinite-streams
5354
doc/cookbook/openapi3
5455
doc/cookbook/expose-prometheus
5556

doc/cookbook/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,4 @@ you name it!
4242
testing/Testing.lhs
4343
open-id-connect/OpenIdConnect.lhs
4444
managed-resource/ManagedResource.lhs
45+
infinite-streams/InfiniteStreams.lhs
Lines changed: 277 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,277 @@
1+
# Serving infinite streams
2+
3+
Servant provides facilities to work with streams of data. This is handy for cases where the data may take a while to
4+
fetch, but we can start returning data early. In this cookbook, we will be concerned with serving _infinite_ HTTP
5+
streams.
6+
7+
HTTP streams have many advantages compared to other streaming standards like websockets: they are simple
8+
and are well-supported by a broad range of intermediaries, such as proxy servers, content-delivery networks, and
9+
corporate firewalls.
10+
11+
An _infinite_ HTTP stream differs from a _finite_ stream in two major ways. First, cleaning up resources (such as
12+
connections) associated with infinite streams is not deterministic. We do not know when the handler will
13+
terminate -- if at all! Second, we want to ensure that the connection isn't cut off because no data is flowing.
14+
To keep the connection alive, we will need to send bytes on a regular basis.
15+
16+
This is a Literate Haskell file, so let's get imports out of the way.
17+
18+
```haskell
19+
{-# LANGUAGE BangPatterns #-}
20+
{-# LANGUAGE DeriveGeneric #-}
21+
{-# LANGUAGE LambdaCase #-}
22+
{-# LANGUAGE NumericUnderscores #-}
23+
{-# LANGUAGE OverloadedStrings #-}
24+
{-# LANGUAGE TypeOperators #-}
25+
module Main (main) where
26+
27+
-- from `aeson`
28+
import Data.Aeson (FromJSON, ToJSON)
29+
30+
-- from `base`
31+
import Control.Concurrent (threadDelay, forkIO, killThread)
32+
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
33+
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, isEmptyMVar)
34+
import Control.Exception (throwIO, bracket)
35+
import Control.Monad (forever)
36+
import qualified Data.List
37+
import Data.Proxy (Proxy(Proxy))
38+
import GHC.Generics (Generic)
39+
40+
-- from `http-client`
41+
import Network.HTTP.Client (newManager, defaultManagerSettings)
42+
43+
-- from the `resourcet` package
44+
import Control.Monad.Trans.Resource (ReleaseKey)
45+
import Data.Acquire ( mkAcquire, Acquire )
46+
47+
-- from `servant`
48+
import Servant
49+
( WithResource,
50+
type (:>),
51+
StreamGet,
52+
NewlineFraming,
53+
JSON,
54+
SourceIO,
55+
Context((:.), EmptyContext),
56+
Handler,
57+
Application )
58+
import qualified Servant.Types.SourceT as SourceT
59+
60+
-- from `servant-client`
61+
import Servant.Client.Streaming (ClientM, mkClientEnv, client, withClientM, BaseUrl (..), Scheme (Http))
62+
63+
-- from `servant-server`
64+
import Servant.Server (serveWithContext)
65+
66+
-- from `warp`
67+
import qualified Network.Wai.Handler.Warp as Warp (run)
68+
```
69+
70+
We start with our scenario: we are tasked with creating an API which will serve random numbers in real-time. We are
71+
given a function that creates a producer of characters, and a method for us to stop the producer:
72+
73+
74+
```haskell
75+
createProducer :: IO (Chan Int, IO ())
76+
createProducer = do
77+
chan <- newChan
78+
isDone <- newEmptyMVar
79+
80+
let -- This is the action that the consumer of the stream
81+
-- can run to stop feeding the channel
82+
weAreDone = putMVar isDone ()
83+
84+
-- Writer thread will feed our Chan forever. This is where
85+
-- your secret random number generation algorithm would go.
86+
-- Unfortunately, we have some technical debt here.
87+
_ <- forkIO (go (cycle [1,5,10,20,45]) chan isDone)
88+
89+
pure ( chan
90+
, weAreDone
91+
)
92+
where
93+
go :: [Int] -> Chan Int -> MVar () -> IO ()
94+
go stream chan isDone = do
95+
isEmpty <- isEmptyMVar isDone
96+
if not isEmpty
97+
-- We are done
98+
then pure ()
99+
else case Data.List.uncons stream of
100+
Nothing -> throwIO (userError "Impossible!")
101+
Just (nextNum, restStream) -> do
102+
-- We simulate a random delay in how numbers are returned.
103+
threadDelay (nextNum * 7_000)
104+
chan `writeChan` nextNum
105+
go restStream chan isDone
106+
```
107+
108+
This was a lot of set up; we now have the ability to create an infinite stream of integers, and message the producer
109+
that we are not listening anymore. In practice, this might mean connecting and disconnecting from a source of
110+
data upstream, for example.
111+
112+
We will now define our API. It has a single route: a method for a subscriber to subscribe to our infinite stream
113+
of random numbers. As mentioned previously, there might be a long time between integers being generated upstream.
114+
We will need to send some bytes just to keep the connection open. To do this, we create a type for the elements
115+
of our infinite stream:
116+
117+
```haskell
118+
data InfiniteStream a = Element a | KeepAlive
119+
deriving (Show, Generic)
120+
121+
-- For brevity, we derive these instances generically.
122+
-- In production, you can optimize the representation
123+
-- much better.
124+
instance ToJSON a => ToJSON (InfiniteStream a)
125+
instance FromJSON a => FromJSON (InfiniteStream a)
126+
```
127+
128+
We'll also need to package our resources into a specific type, `Upstream`:
129+
130+
```haskell
131+
data Upstream a =
132+
Upstream { getNext :: IO (InfiniteStream a)
133+
-- ^ Fetch the next element to forward to the client
134+
, pleaseStop :: IO ()
135+
-- ^ Notify upstream to stop sending data
136+
}
137+
```
138+
139+
`Upstream` is a data type which we want to allocate for a handler, and deallocate once the connection
140+
is closed, which means we want to involve `WithResource`. The API definition becomes:
141+
142+
```haskell
143+
type InfiniteIntegersAPI
144+
= WithResource (Upstream Int)
145+
:> StreamGet
146+
NewlineFraming
147+
JSON
148+
(SourceIO (InfiniteStream Int))
149+
```
150+
151+
Let's write our handler, which is pretty simple: return an infinite stream by
152+
continuously calling `getNext`:
153+
154+
```haskell
155+
handleInfiniteIntegersAPI :: (ReleaseKey, Upstream Int) -> Handler (SourceIO (InfiniteStream Int))
156+
handleInfiniteIntegersAPI (_, upstream) =
157+
let neverStop = const False
158+
in pure (SourceT.fromAction neverStop (getNext upstream))
159+
```
160+
161+
Now for the tricky bit. We need to produce data on a regular basis, even if there are no
162+
numbers available upstream. Typically, a HTTP server will break connections after 30 seconds without data.
163+
For this example, we'll provide data 0.1 seconds so that the example runs quickly. We do this when we
164+
allocate a new `Upstream` in `allocate`:
165+
166+
```haskell
167+
allocate :: IO (Upstream Int)
168+
allocate = do
169+
-- Channel that will feed the client
170+
toDownstream <- newChan
171+
172+
-- Producer from upstream
173+
(intChan, weAreDone) <- createProducer
174+
175+
let -- action to spawn a thread that will continuously write 'KeepAlive' messages
176+
keepalive = forkIO (forever (threadDelay 100_000 *> writeChan toDownstream KeepAlive))
177+
178+
-- The function below, `go`, is used to forward elements from the upstream
179+
-- producer 'intChan' to the 'toDownstream' channel.
180+
--
181+
-- The wrinkle is that we must send data downstream regularly. Therefore, every time
182+
-- a new element is produced by 'toDownstream', we reset the keepalive thread
183+
-- (named 'keepAliveThreadId ') by killing it and starting it again.
184+
--
185+
-- This ensures:
186+
--
187+
-- * that we send data (either an `Element` or `KeepAlive`) every 0.1 seconds at most;
188+
-- * that we do not send more `KeepAlive` messages than necessary.
189+
go keepAliveThreadId = do
190+
readChan intChan >>= writeChan toDownstream . Element
191+
killThread keepAliveThreadId
192+
keepalive >>= go
193+
194+
loopThreadId <- forkIO (keepalive >>= go)
195+
196+
pure (Upstream { getNext = readChan toDownstream
197+
, pleaseStop = weAreDone >> killThread loopThreadId
198+
}
199+
)
200+
```
201+
202+
Finally, we must tell our server how to allocate and deallocate an `Upstream Int`. The `allocate` function
203+
below is executed when a client connects, and the `deallocate` function is executed when the connection is
204+
closed in any way:
205+
206+
```haskell
207+
withUpstream :: Acquire (Upstream Int)
208+
withUpstream = mkAcquire allocate pleaseStop
209+
```
210+
211+
We now have all the pieces to assemble our server:
212+
213+
```haskell
214+
server :: Application
215+
server = serveWithContext
216+
(Proxy :: Proxy InfiniteIntegersAPI)
217+
(withUpstream :. EmptyContext)
218+
handleInfiniteIntegersAPI
219+
```
220+
221+
and our client:
222+
223+
```haskell
224+
getInfiniteIntegers :: ClientM (SourceIO (InfiniteStream Int))
225+
getInfiniteIntegers = client (Proxy :: Proxy InfiniteIntegersAPI)
226+
```
227+
228+
We can see how they interact:
229+
230+
```haskell
231+
main :: IO ()
232+
main = do
233+
mgr <- newManager defaultManagerSettings
234+
let url = (BaseUrl Http "localhost" 8080 "")
235+
bracket (forkIO (Warp.run 8080 server)) killThread (\_ -> do
236+
threadDelay 100_000
237+
withClientM getInfiniteIntegers (mkClientEnv mgr url) (\case
238+
Left clientError -> throwIO clientError
239+
Right stream -> SourceT.unSourceT stream go
240+
)
241+
)
242+
where
243+
go (SourceT.Yield !incoming next) = print incoming >> go next
244+
go (SourceT.Effect !x) = x >>= go
245+
go (SourceT.Skip !next) = go next
246+
go (SourceT.Error err) = error err
247+
go (SourceT.Stop) = error "Unexpected stream end"
248+
```
249+
250+
Running this program shows:
251+
252+
```
253+
Element 1
254+
Element 5
255+
Element 10
256+
KeepAlive
257+
Element 20
258+
KeepAlive
259+
KeepAlive
260+
KeepAlive
261+
Element 45
262+
Element 1
263+
Element 5
264+
Element 10
265+
KeepAlive
266+
Element 20
267+
KeepAlive
268+
KeepAlive
269+
KeepAlive
270+
Element 45
271+
Element 1
272+
Element 5
273+
Element 10
274+
KeepAlive
275+
Element 20
276+
...
277+
```
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
cabal-version: 2.2
2+
name: cookbook-infinite-streams
3+
version: 2.1
4+
synopsis: Serving infinite streams
5+
homepage: http://docs.servant.dev/
6+
license: BSD-3-Clause
7+
license-file: ../../../servant/LICENSE
8+
author: Servant Contributors
9+
maintainer: [email protected]
10+
build-type: Simple
11+
12+
executable cookbook-infinite-streams
13+
main-is: InfiniteStreams.lhs
14+
build-tool-depends: markdown-unlit:markdown-unlit
15+
default-language: Haskell2010
16+
ghc-options: -Wall -pgmL markdown-unlit -Wunused-packages -threaded -rtsopts -with-rtsopts=-N
17+
18+
hs-source-dirs: .
19+
build-depends: base >= 4.8 && <5
20+
, aeson
21+
, http-client
22+
, resourcet
23+
, servant
24+
, servant-server
25+
, servant-client
26+
, warp

0 commit comments

Comments
 (0)