pipes
and some implications of Free monad-style monadic programming
Mihály Bárász, nilcons.com
HaskellerZ meetup - March 27th, 2014
Intro to pipes
View of pipes: structured continuation passing
The Proxy type
Brief overview of the pipes ecosystem
Analysis of two deep “bugs”
main = runEffect $ streamList [0..] >-> take 10 >-> printAll

streamList xs = forM_ xs yield

take 0 = return ()
take n = do
  x <- await
  yield x
  take (n-1)

printAll = forever $ do
  x <- await
  lift $ print x
import Control.Monad (forever, forM_)
import Pipes
import Prelude hiding (take)

main = runEffect $ streamList [0..] >-> take 10 >-> printAll

streamList :: [a] -> Producer a IO ()
streamList xs = forM_ xs yield

take :: Int -> Pipe a a IO ()
take 0 = return ()
take n = do
  x <- await
  yield x
  take (n-1)

printAll :: Show a => Consumer a IO ()
printAll = forever $ do
  x <- await
  lift $ print x
import Control.Monad (forever, forM_)
import Pipes
import Prelude hiding (take)

main = runEffect $ streamList [0..] >-> take 10 >-> printAll

streamList :: Monad m => [a] -> Producer a m ()
streamList xs = forM_ xs yield

take :: Monad m => Int -> Pipe a a m ()
take 0 = return ()
take n = do
  x <- await
  yield x
  take (n-1)

printAll :: Show a => Consumer a IO r
printAll = forever $ do
  x <- await
  lift $ print x
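Because streamList and take are now polymorphic in the base monad m, the first two stages no longer need IO. The following is a minimal sketch that assumes Pipes.Prelude.toList, which collects a Producer over Identity into a list. The name firstTen is hypothetical.

```haskell
import Control.Monad (forM_)
import Pipes
import qualified Pipes.Prelude as P
import Prelude hiding (take)

streamList :: Monad m => [a] -> Producer a m ()
streamList xs = forM_ xs yield

take :: Monad m => Int -> Pipe a a m ()
take 0 = return ()
take n = do
  x <- await
  yield x
  take (n - 1)

-- Hypothetical: the same pipeline without printAll, run in Identity.
-- The pipeline stops as soon as take returns, so the infinite list is fine.
firstTen :: [Integer]
firstTen = P.toList (streamList [0 ..] >-> take 10)
```

In GHCi, evaluating firstTen gives the first ten numbers without touching IO.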
forkIO $ runEffect $
  measureCPU >-> delay >-> collectN 100 >-> updateIcon cpuIcon blue1 blue2

runEffect $ readChanSignalledS (rawTWSInput raw)
  >-> useD (\t -> $logDebug $ printf "New incoming message: %s" (show $ twsI t))
  >-> timePipe
  >-> nextIdPipe
  >-> managedAccountPipe
  >-> errMsgPipe
  >-> sanitizeCashOrdersInPipe
  >-> filledOrdersInPipe
  >-> openOrdersInPipe
  >-> writeChanD chanFromTWS_
data Pipe a m r = Pipe { stepPipe :: m (Step a m r) }
data Step a m r = Yield (a, Pipe a m r)
                | Await (a -> Pipe a m r)
                | Done r
An excellent discussion: Mario Blažević, “Coroutine Pipelines”, The Monad.Reader, Issue 19.
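The following sketch makes the “structured continuation passing” view concrete by showing how composition could be written for the simplified Pipe type above. It is not the pipes implementation. The helpers yieldP, awaitP, doneP, compose, collect, from and takeP are hypothetical names chosen for this illustration. Composition here is pull-based: the downstream pipe runs until it awaits, and then the upstream pipe runs until it yields.

```haskell
import Data.Functor.Identity (runIdentity)

-- The simplified model from the slide: one element type a for input and output.
data Pipe a m r = Pipe { stepPipe :: m (Step a m r) }
data Step a m r = Yield (a, Pipe a m r)
                | Await (a -> Pipe a m r)
                | Done r

-- Hypothetical smart constructors: each wraps one step in the base monad.
yieldP :: Monad m => a -> Pipe a m r -> Pipe a m r
yieldP x next = Pipe (return (Yield (x, next)))

awaitP :: Monad m => (a -> Pipe a m r) -> Pipe a m r
awaitP k = Pipe (return (Await k))

doneP :: Monad m => r -> Pipe a m r
doneP r = Pipe (return (Done r))

-- Pull-based composition: run downstream first; when it awaits, run
-- upstream until it yields, then feed that value to the waiting continuation.
compose :: Monad m => Pipe a m r -> Pipe a m r -> Pipe a m r
compose up down = Pipe $ do
  s <- stepPipe down
  case s of
    Done r           -> return (Done r)
    Yield (x, down') -> return (Yield (x, compose up down'))
    Await k          -> do
      u <- stepPipe up
      case u of
        Done r         -> return (Done r)
        Yield (x, up') -> stepPipe (compose up' (k x))
        -- Upstream is waiting too: expose its await, keep down's continuation.
        Await k'       -> return (Await (\x -> compose (k' x) (awaitP k)))

-- Collect the outputs of a pipe, stopping at Done or at its first await.
collect :: Monad m => Pipe a m r -> m [a]
collect p = do
  s <- stepPipe p
  case s of
    Done _        -> return []
    Await _       -> return []
    Yield (x, p') -> fmap (x :) (collect p')

-- Hypothetical example pipes: an infinite counter and a pass-n-through pipe.
from :: Monad m => Int -> Pipe Int m r
from n = yieldP n (from (n + 1))

takeP :: Monad m => Int -> Pipe a m ()
takeP 0 = doneP ()
takeP n = awaitP (\x -> yieldP x (takeP (n - 1)))

firstThree :: [Int]
firstThree = runIdentity (collect (compose (from 0) (takeP 3)))
```

Every constructor is a step that either carries a continuation (Await), a value plus the rest of the computation (Yield), or a result (Done). Composition therefore only has to pattern-match on steps and resume continuations.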
The Proxy type
data Proxy a' a b' b m r
  = Request a' (a  -> Proxy a' a b' b m r )
  | Respond b  (b' -> Proxy a' a b' b m r )
  | M          (m    (Proxy a' a b' b m r))
  | Pure r
A Proxy is a monad transformer that receives and sends information on both an upstream and a downstream interface.
The type variables signify:
a' and a - The upstream interface, where (a')s go out and (a)s come in
b' and b - The downstream interface, where (b)s go out and (b')s come in
m - The base monad
r - The return value
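In pipes, yield and await are specialisations of the two Proxy primitives: yield is respond (with b' = ()), and await is request () (with a' = ()). The following minimal sketch uses respond and request from Pipes.Core directly. The names oneTwoThree, doubler and doubled are hypothetical.

```haskell
import Control.Monad (forever)
import Pipes
import Pipes.Core (request, respond)
import qualified Pipes.Prelude as P

-- Sends 1, 2, 3 downstream (b = Int); each respond gets back () since b' = ().
oneTwoThree :: Monad m => Producer Int m ()
oneTwoThree = mapM_ respond [1, 2, 3]

-- Sends () upstream (a' = ()), receives an Int (a),
-- and sends its double downstream, forever.
doubler :: Monad m => Pipe Int Int m r
doubler = forever $ do
  x <- request ()
  respond (x * 2)

doubled :: [Int]
doubled = P.toList (oneTwoThree >-> doubler)
```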
The Proxy type, diagrammatically (diagram not reproduced). Composition with (>->) plugs the downstream interface of the first proxy into the upstream interface of the second:
(>->) :: Monad m => Proxy a' a () b m r -> Proxy () b c' c m r -> Proxy a' a c' c m r
Proxy type aliases
Producers can only yield:
type Producer b = Proxy X () () b
type Producer' b m r = forall x' x. Proxy x' x () b m r
Consumers can only await:
type Consumer a = Proxy () a () X
type Consumer' a m r = forall y' y. Proxy () a y' y m r
Here X is an uninhabited type (like Data.Void).
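The primed aliases matter in practice. In the minimal sketch below, twoOnes, asProducer and asPipe are hypothetical names. A producer written with the expanded type of Producer' Int m () leaves the upstream types x' and x polymorphic. Because of that, the same value can serve both as a plain Producer and as a Pipe that simply never awaits. A value of the closed type Producer Int m () could not be used as asPipe, because its upstream interface is fixed to X and ().

```haskell
import Pipes
import qualified Pipes.Prelude as P

-- Equivalent to  twoOnes :: Monad m => Producer' Int m ()
-- (spelled out so that no RankNTypes extension is needed).
twoOnes :: Monad m => Proxy x' x () Int m ()
twoOnes = yield 1 >> yield 1

-- Used as a plain Producer: x' := X, x := ().
asProducer :: Monad m => Producer Int m ()
asProducer = twoOnes

-- Used as a Pipe: x' := (), x := String. It never awaits.
asPipe :: Monad m => Pipe String Int m ()
asPipe = twoOnes
```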
There are 5 predefined ways of composing pipes (or 4, or 7, depending on how you look at it), and they can be intermixed freely. They define the following categories over Proxy:
The pipes (or cat) category: (>->) and cat.
The pull category: (>+>) and pull.
The push category: (>~>) and push.
The request category: (\>\) and request.
The respond category: (/>/) and respond.
The category of (~>) and yield.
The category of (>~) and await.
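The await category is easy to try out in the pure Identity monad. (>~) answers every await of its right-hand side by running its left-hand side, and await is its identity. The following is a minimal sketch; threeSevens and stillTake are hypothetical names.

```haskell
import Pipes
import qualified Pipes.Prelude as P

-- P.take 3 awaits three times; (>~) answers each of those awaits
-- by running its left-hand side, here the constant action  return 7.
threeSevens :: [Int]
threeSevens = P.toList (return 7 >~ P.take 3)

-- await is the identity of (>~): (await >~ P.take 3) behaves like P.take 3.
stillTake :: [Int]
stillTake = P.toList (each [1 .. 10] >-> (await >~ P.take 3))
```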
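For example, in the respond category the morphisms are functions of the form i -> Proxy a' a b' b m r, respond is the identity, and (/>/) is composition: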
respond :: b -> Proxy a' a b' b m b'
(/>/) :: Monad m
      => (a -> Proxy x' x b' b m a')
      -> (b -> Proxy x' x c' c m b')
      -> a -> Proxy x' x c' c m a'
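The following minimal sketch shows the respond category in action. It uses respond and (/>/) from Pipes.Core, plus (//>), which is the operator form of for. The names dup, dupTwice, fourFives, leftId, rightId and eachDup are hypothetical.

```haskell
import Pipes
import Pipes.Core
import qualified Pipes.Prelude as P

-- A morphism of the respond category: Int -> Proxy x' x () Int m ().
dup :: Monad m => Int -> Proxy x' x () Int m ()
dup x = respond x >> respond x

-- Composition: every respond of the first dup is handled by the second.
dupTwice :: Monad m => Int -> Proxy x' x () Int m ()
dupTwice = dup />/ dup

fourFives :: [Int]
fourFives = P.toList (dupTwice 5)

-- respond is the identity on both sides.
leftId, rightId :: [Int]
leftId  = P.toList ((respond />/ dup) 5)
rightId = P.toList ((dup />/ respond) 5)

-- (//>) runs a respond-category morphism on every value a proxy responds with.
eachDup :: [Int]
eachDup = P.toList (each [1, 2] //> dup)
```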
Pipes composition is not limited to the ways provided by the pipes package, if you are willing to “look inside” (by using the Pipes.Internal module). For example:
loopPipe :: Monad m => Maybe a -> Proxy () a () a m r -> Effect' m r
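The following is one possible implementation, offered only as a minimal sketch. It assumes the semantics “every value the pipe yields is fed back to its own await, starting from the optional seed”, which is our reading of the signature above. It pattern-matches on the constructors exported by Pipes.Internal. The result type Effect' m r is spelled out as Proxy x' x y' y m r so that no RankNTypes extension is needed. The names countUp and finalCount are hypothetical.

```haskell
import Data.Functor.Identity (runIdentity)
import Pipes
import Pipes.Internal (Proxy (..))

-- Feed every value the pipe yields back into its own input, through a FIFO
-- buffer seeded with the optional initial value. The result type is the
-- slide's  Effect' m r  written out.
loopPipe :: Monad m => Maybe a -> Proxy () a () a m r -> Proxy x' x y' y m r
loopPipe seed p0 = go (maybe [] pure seed) p0
  where
    go buf p = case p of
      Request () k -> case buf of
        (x : rest) -> go rest (k x)
        -- Assumption of this sketch: awaiting on an empty buffer is a bug.
        []         -> error "loopPipe: await with an empty buffer"
      Respond x k  -> go (buf ++ [x]) (k ())  -- O(n) append; fine for a sketch
      M m          -> M (fmap (go buf) m)
      Pure r       -> Pure r

-- Hypothetical example: each step adds one to the value it gets back.
countUp :: Monad m => Int -> Pipe Int Int m Int
countUp 0 = await
countUp n = do
  x <- await
  yield (x + 1)
  countUp (n - 1)

finalCount :: Int
finalCount = runIdentity (runEffect (loopPipe (Just 0) (countUp 5)))
```

Starting from the seed 0, each of the five steps gets back the previous output and adds one, so the final await returns 5.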
Pipesification of specific libraries (libraries with a pipes API):
Not really pipes specific, but designed for pipes
Extending pipes functionality
Producers
In (surprisingly) many ways pipes, especially Producers, behave like lists.
Can you spot the problem?
numbers :: Producer Int IO r
numbers = go 0
  where
    go n = do yield n
              go (n+1)
Producer
The problem is that you can’t use numbers twice in your code. The following program leaks:
import Pipes
import Pipes.Core

numbers :: Producer Int IO ()
numbers = go 0
  where
    go n = do yield n
              go (n+1)

main :: IO ()
main = do
  runEffect $ numbers //> lift . print
  runEffect $ numbers //> lift . print
The analogous program with lists:
import Control.Monad

numbers :: [Int]
numbers = go 0
  where
    go n = n : go (n+1)

main :: IO ()
main = do
  forM_ numbers print
  forM_ numbers print
This is a bit misleading: the real question is which thunks are created and what holds a reference to what. You can visualise this with ghc-vis.
Sometimes an optimized build helps. For example, we can produce the same kind of structure directly in IO, with no pipes involved:
printNumbers :: IO ()
printNumbers = go 0
  where
    go n = do print n
              go (n+1)

main :: IO ()
main = do
  printNumbers
  printNumbers
It does leak if compiled with -O0, but if compiled with -O it runs in constant space.
Generally, avoid defining recursive structures that don’t depend on anything (parameters or IO) and using them more than once.
There’s no silver bullet, though. Consider this:
numbersFrom :: Int -> Producer Int IO ()
numbersFrom = go
  where
    go n = do yield n
              go (n+1)

main :: IO ()
main = do
  let n = 42
  runEffect $ numbersFrom n //> lift . print
  runEffect $ numbersFrom n //> lift . print
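Solution: -fno-full-laziness. (Full laziness lets GHC float the constant expression numbersFrom n out of main and share it between the two runs, which brings the leak back.)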
Can you spot the issue?
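import Control.Monad (forever, replicateM)
import Pipes
import Pipes.Core
import qualified Pipes.Prelude as P

chunkify :: Monad m => Int -> Pipe a [a] m r
chunkify n = forever $ do
  xs <- replicateM n await
  yield xs

main :: IO ()
main = do
  runEffect $ numbers >-> chunkify 10000 >-> P.take 1 //> lift . print . last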
To understand the issue we need to know two things. The first is how replicateM is defined:
replicateM :: Monad m => Int -> m a -> m [a]
replicateM n op = go n
  where
    go 0 = return []
    go k = do
      x <- op
      xs <- go (k-1)
      return (x : xs)
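replicateM’s recursive call go (k-1) sits to the left of a bind, just as the recursive call in this classic quadratic list function sits to the left of (++):
numList :: Int -> [Int]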
numList 0 = []
numList n = numList (n-1) ++ [n]
(++) :: [a] -> [a] -> [a]
[] ++ l = l
(x : xs) ++ l = x : (xs ++ l)
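For lists, the standard fix for left-nested (++) is to replace appends with function composition, which gives a difference list. The following minimal sketch keeps the same recursion shape as numList; numListD and numList' are hypothetical names.

```haskell
-- Left-nested appends: quadratic, as on the slide.
numList :: Int -> [Int]
numList 0 = []
numList n = numList (n - 1) ++ [n]

-- Same shape, but building a function that prepends the elements.
-- Applying the composed function to [] conses each element exactly once,
-- so producing the whole list takes linear time.
numListD :: Int -> ([Int] -> [Int])
numListD 0 = id
numListD n = numListD (n - 1) . (n :)

numList' :: Int -> [Int]
numList' n = numListD n []
```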
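The second is how (>>=) works for Proxy. Like (++), it has to walk down its entire left argument, rebuilding it around p, before p gets to run:
(>>=) :: Monad m => Proxy a' a b' b m x
                 -> (x -> Proxy a' a b' b m r)
                 -> Proxy a' a b' b m r
Pure x         >>= p = p x
Request a' fa  >>= p = Request a' (\a  -> fa  a  >>= p)
Respond b  fb' >>= p = Respond b  (\b' -> fb' b' >>= p)
M m            >>= p = M (fmap (>>= p) m)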
This doesn’t happen with most commonly used monads, like IO, StateT or ReaderT. How come?
But it always happens with monads that represent the structure of a computation explicitly, like Proxy, ConduitT, Free, operational’s Program, etc.
Solutions?
Write a right-leaning version of replicateM:
accReplicateM :: Monad m => Int -> m a -> m [a]
accReplicateM n op = go n []
  where
    go 0 acc = return $ reverse acc
    go k acc = do
      x <- op
      go (k-1) (x : acc)
Use Data.Sequence’s replicateM (and get O(n log n) behavior instead of O(n²)).
The magic is the same as for the list case: continuations.
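The big gun: the Codensity monad from kan-extensions.
import Control.Monad (forever, replicateM)
import Control.Monad.Codensity (lowerCodensity)
import Pipes

chunkify :: Monad m => Int -> Pipe a [a] m r
chunkify n = forever $ do
  xs <- lowerCodensity $ replicateM n (lift await)
  yield xs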
The universal solution: Church-encoding your data structure.
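The following minimal sketch illustrates the idea in the style of the F type from the free package’s Control.Monad.Free.Church. The type is written out here so that the block is self-contained. A computation is represented by what it does with two continuations: one for the final result and one for a functor layer. As a result, building m >>= k takes constant time and never walks m’s structure, however the binds are nested. The names Await, awaitF, feed, firstThree and tooFew are hypothetical and not part of any library.

```haskell
{-# LANGUAGE RankNTypes #-}
import Control.Monad (ap, liftM, replicateM)

-- Church-encoded free monad over the functor f.
newtype F f a = F { runF :: forall r. (a -> r) -> (f r -> r) -> r }

instance Functor (F f) where
  fmap = liftM

instance Applicative (F f) where
  pure a = F (\kp _ -> kp a)
  (<*>) = ap

instance Monad (F f) where
  -- No traversal of m's structure: we only plug in new continuations.
  F m >>= k = F (\kp kf -> m (\a -> runF (k a) kp kf) kf)

-- A one-operation instruction set: wait for an input of type i.
newtype Await i next = Await (i -> next)

instance Functor (Await i) where
  fmap g (Await k) = Await (g . k)

awaitF :: F (Await i) i
awaitF = F (\kp kf -> kf (Await kp))

-- Interpret by feeding inputs from a list; Nothing if the inputs run out.
feed :: [i] -> F (Await i) a -> Maybe a
feed xs0 (F m) = m done step xs0
  where
    done a _ = Just a
    step (Await k) xs = case xs of
      []       -> Nothing
      (y : ys) -> k y ys

firstThree :: Maybe [Int]
firstThree = feed [1 ..] (replicateM 3 awaitF)

tooFew :: Maybe [Int]
tooFew = feed [1, 2] (replicateM 3 awaitF)
```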
Off topic: if you work a lot with date-time values in Haskell and are annoyed with the time library, I’d like to hear from you!
For correct and efficient (and pure) handling of time zones check out hackage.haskell.org/package/tz
The MFunctor class (from mmorph, re-exported by pipes) provides hoist:
hoist :: (MFunctor t, Monad m) => (forall a. m a -> n a) -> t m b -> t n b
With this you can reuse a pipe whose base monad is rigidly fixed in a pipeline over a different base monad:
import Control.Monad.Trans.State
import Pipes
import Pipes.Lift

pipeInIO :: Pipe a a IO ()
pipeInIO = cat  -- stand-in for a pipe that really needs IO

pipeInStateIO :: Pipe a a (StateT s IO) ()
pipeInStateIO = hoist lift pipeInIO
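The following runnable sketch exercises this adaptation. It assumes a hypothetical IO-only pipe (pipeInIO below just logs each element) and a hypothetical StateT consumer that sums what it receives. The hoist lift step is exactly the one from the slide. The names sumIntoState and sumViaState are also hypothetical.

```haskell
import Control.Monad.Trans.State.Strict (StateT, execStateT, modify)
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical pipe whose base monad is fixed to IO.
pipeInIO :: Pipe Int Int IO ()
pipeInIO = P.mapM (\x -> do putStrLn ("saw " ++ show x); return x)

-- The same pipe, usable in a StateT s IO pipeline.
pipeInStateIO :: Pipe Int Int (StateT s IO) ()
pipeInStateIO = hoist lift pipeInIO

-- Hypothetical consumer that adds every element to the state.
sumIntoState :: Consumer Int (StateT Int IO) ()
sumIntoState = P.mapM_ (\x -> modify (+ x))

-- Run the mixed pipeline and return the final state.
sumViaState :: [Int] -> IO Int
sumViaState xs =
  execStateT (runEffect (each xs >-> pipeInStateIO >-> sumIntoState)) 0
```

Running sumViaState [1 .. 5] prints one “saw” line per element and returns the sum in the final state.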
evalStateP :: Monad m => s -> Proxy a' a b' b (StateT s m) r -> Proxy a' a b' b m r
distribute :: (Monad m, MonadTrans t, MFunctor t, Monad (t m), Monad (t (Proxy a' a b' b m)))
           => Proxy a' a b' b (t m) r
           -> t (Proxy a' a b' b m) r
distribute p = runEffect $ request' >\\ hoist (hoist lift) p //> respond'
  where
    request' = lift . lift . request
    respond' = lift . lift . respond

runStateP :: Monad m
          => s
          -> Proxy a' a b' b (S.StateT s m) r
          -> Proxy a' a b' b m (r, s)
runStateP s p = (`S.runStateT` s) $ distribute p
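The following is a minimal usage sketch for evalStateP and runStateP. It assumes pipes’ Pipes.Lift, which works with the strict StateT from transformers. The names numbered, tagged, countedProducer and finalCount are hypothetical.

```haskell
import Control.Monad (forever)
import Control.Monad.Trans.State.Strict (get, modify, put)
import Data.Functor.Identity (runIdentity)
import Pipes
import Pipes.Lift (evalStateP, runStateP)
import qualified Pipes.Prelude as P

-- Hypothetical pipe that tags each element with a running index.
-- The counter lives in a StateT layer that evalStateP removes again,
-- so from the outside this is an ordinary Pipe over m.
numbered :: Monad m => Pipe a (Int, a) m r
numbered = evalStateP 0 $ forever $ do
  x <- await
  i <- lift get
  lift (put (i + 1))
  yield (i, x)

tagged :: [(Int, Char)]
tagged = P.toList (each "abc" >-> numbered)

-- runStateP also returns the final state alongside the result.
countedProducer :: Monad m => Producer Char m ((), Int)
countedProducer = runStateP 0 $ for (each "abc") $ \c -> do
  lift (modify (+ 1))
  yield c

finalCount :: Int
finalCount = snd (runIdentity (runEffect (countedProducer >-> P.drain)))
```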