Haskell Part 5: Concurrent programming

Martin Sulzmann

Concurrency versus Parallelism

Concurrency: structuring a program as multiple threads of control that make progress independently and may be interleaved on a single processor. Parallelism: making a program run faster by executing computations simultaneously on multiple processors.

Shared-Memory vs Message-Passing

Concurrently executing threads need to communicate/synchronize. How?

There are two principled approaches: shared memory and message passing.

We will take a look at both (from the Haskell point of view).

Common pitfalls

Data race: two or more threads access the same shared variable without synchronization, and at least one of the accesses is a write.

Deadlock: each thread in a group waits for a resource held by another thread in the group, so none of them can make progress.

Haskell

Haskell is great for concurrent programming

In-place update in Haskell (imperative variables)

newIORef :: a -> IO (IORef a)
readIORef :: IORef a -> IO a
writeIORef :: IORef a -> a -> IO ()
In C:

 int x = 1;
 x = x + 1;

In Haskell:

  x <- newIORef (1::Int)
  v <- readIORef x
  writeIORef x (v+1)

Feels like three-address code/assembler, but we get precise control over side-effecting functions!

Concurrent execution of programs

import Control.Concurrent
import Data.IORef

doWhile cmd cond = do cmd
                      v <- cond
                      if v
                        then doWhile cmd cond
                        else return ()

steps = 100

check cnt = do v <- readIORef cnt
               if v >= steps
                 then return False
                 else return True

thread cnt s = doWhile (do putStrLn s
                           v <- readIORef cnt
                           writeIORef cnt (v+1)
                           threadDelay 1000)
                       (check cnt)

main = do cnt <- newIORef 0
          forkIO $ thread cnt "A"
          forkIO $ thread cnt "B"
          thread cnt "C"

Observations:

The output of the three threads is interleaved nondeterministically. The counter updates are not atomic: two threads may read the same value before writing, so increments can get lost (a data race).

Mutable shared variables (MVar)

newEmptyMVar :: IO (MVar a)
newMVar :: a -> IO (MVar a)
takeMVar :: MVar a -> IO a
putMVar :: MVar a -> a -> IO ()
readMVar :: MVar a -> IO a
import Control.Concurrent
import Control.Concurrent.MVar

doWhile cmd cond = do cmd
                      v <- cond
                      if v
                        then doWhile cmd cond
                        else return ()

steps = 100

check cnt = do v <- readMVar cnt
               if v >= steps
                 then return False
                 else return True

thread cnt s = doWhile (do putStrLn s
                           v <- takeMVar cnt
                           putMVar cnt (v+1)
                           threadDelay 1000)
                       (check cnt)

main = do cnt <- newMVar 0
          forkIO $ thread cnt "A"
          forkIO $ thread cnt "B"
          thread cnt "C"

MVar guarantees mutually exclusive access to the counter.
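The take/put pair in thread can also be packaged with the library function modifyMVar_ from Control.Concurrent.MVar. A minimal sketch (the helper name incCounter is ours):

```haskell
import Control.Concurrent.MVar

-- Atomically increment a counter held in an MVar.
-- modifyMVar_ takes the value, applies the function, and puts the
-- result back; other threads block while the MVar is empty.
incCounter :: MVar Int -> IO ()
incCounter cnt = modifyMVar_ cnt (\v -> return (v + 1))

main :: IO ()
main = do
  cnt <- newMVar 0
  mapM_ (\_ -> incCounter cnt) [1 .. 10 :: Int]
  readMVar cnt >>= print
```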

Message-passing with channels

newChan :: IO (Chan a)
writeChan :: Chan a -> a -> IO ()
readChan :: Chan a -> IO a

Chan is built on top of MVar. See http://www.haskell.org/ghc/docs/latest/html/libraries/base/Control-Concurrent-Chan.html

producer ch = do
  mapM_ (\x -> do writeChan ch x
                  threadDelay 1000)
        ([1..100] ++ [0])

consumer ch = do
  v <- readChan ch
  if v == 0
   then return ()
   else do putStrLn ("Received : " ++ show v)
           consumer ch

mainCP = do
  ch <- newChan
  forkIO (producer ch)
  consumer ch

Shared-Memory vs Message-Passing (landscape)

Shared-memory:

Message-passing:

Notice that MVar serves as both a shared-memory and a message-passing primitive.

-- Message-passing
receive = takeMVar
send    = putMVar

-- Mutex
-- initially full MVar
lock   = takeMVar
unLock = putMVar
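Read as a mutex, an initially full MVar () protects a critical section. A minimal sketch (the names Mutex and withMutex are ours; the library function withMVar does the same and additionally handles exceptions):

```haskell
import Control.Concurrent.MVar

type Mutex = MVar ()

newMutex :: IO Mutex
newMutex = newMVar ()   -- initially full = unlocked

-- Run an action while holding the lock.
withMutex :: Mutex -> IO a -> IO a
withMutex m action = do
  takeMVar m        -- lock
  r <- action       -- critical section
  putMVar m ()      -- unlock
  return r

main :: IO ()
main = do
  m <- newMutex
  withMutex m (putStrLn "inside the critical section")
```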

Building (concurrency) abstractions

Barrier with Chan

type Barrier = (Chan (), Int)

newBarrier :: Int -> IO Barrier
newBarrier n = do b <- newChan
                  return (b,n)

signalBarrier :: Barrier -> IO ()
signalBarrier (b,_) = writeChan b ()

waitBarrier :: Barrier -> IO ()
waitBarrier (b,n) = do mapM (\_ -> readChan b) [1..n]
                       return ()
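A usage sketch of the barrier (the worker bodies are made up): main blocks in waitBarrier until all three workers have signalled. The barrier definitions are repeated so the sketch is self-contained.

```haskell
import Control.Concurrent

-- Barrier definitions repeated from above.
type Barrier = (Chan (), Int)

newBarrier :: Int -> IO Barrier
newBarrier n = do b <- newChan
                  return (b, n)

signalBarrier :: Barrier -> IO ()
signalBarrier (b, _) = writeChan b ()

waitBarrier :: Barrier -> IO ()
waitBarrier (b, n) = mapM_ (\_ -> readChan b) [1 .. n]

-- Three workers; main proceeds only after all have signalled.
main :: IO ()
main = do
  b <- newBarrier 3
  mapM_ (\i -> forkIO $ do
            putStrLn ("worker " ++ show (i :: Int) ++ " done")
            signalBarrier b)
        [1 .. 3]
  waitBarrier b
  putStrLn "all workers finished"
```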

Select with MVar

Often we're only interested in whether at least one of several tasks has finished.

select = do
  m  <- newEmptyMVar 
  forkIO $ do task1
              putMVar m "Task1 Done"

  forkIO $ do task2
              putMVar m "Task2 Done"

  r <- takeMVar m

  return ()

Futures/promises

Futures/promises are concurrent data structures to manage (not yet completed) computations in a non-blocking (asynchronous) fashion.

A future can be viewed as a placeholder for a computation that will eventually become available. The term promise often refers to a form of future where the result can be explicitly provided by the programmer. However, in many descriptions both terms are used interchangeably.

We make the following distinction.

Simple promise interface with MVar (and exploring the design space a bit)



-- Exploring the design space of futures/promises

import Control.Concurrent
import Control.Concurrent.MVar

type Promise a = MVar a

-- | Promises represented by an MVar.
-- The promise is initially empty.
new :: IO (Promise a)
new = newEmptyMVar

-- | Retrieve promise value.
-- Blocks if value is not yet available.
-- Value is just read.
get :: Promise a -> IO a
get = readMVar

-- | Retrieve promise value via a callback function.
-- Non-blocking. Callback executed once value is available.
onComplete :: Promise a -> (a -> IO ()) -> IO ()
onComplete p f = do forkIO $ do v <- get p
                                f v
                    return ()

-- | Complete a promise by executing a command.
-- The returned value will be assigned to the promise,
-- if the promise has not been completed yet.
-- That is, promises can be written only once.
tryComplete :: Promise a -> (IO a) -> IO ()
tryComplete p x = do forkIO $ do v <- x
                                 putMVar p v
                     return ()


-- Some derived combinators

-- | Non-blocking execution of some command.
-- The (returned) value, once available, can be retrieved via a promise.
-- Commonly, this form of a promise is referred to as a future.
future :: IO a -> IO (Promise a)
future f = do p <- new
              tryComplete p f
              return p

-- | Complete a promise via another promise.
tryCompleteWith p f = do
    onComplete f (\x -> do tryComplete p (return x)
                           return ())

-- | Select among the first completed promise.
first p1 p2 = do
   p <- new
   tryCompleteWith p p1
   tryCompleteWith p p2
   return p


-- | Example to illustrate the above derived combinators.
holidayBooking = do p1 <- future getQuoteUSA
                    p2 <- future getQuoteCH
                    p <- first p1 p2
                    onComplete p (\s -> putStrLn $ "Hooray: " ++ s)
                    threadDelay 10000

getQuoteUSA = do threadDelay 1000
                 return "USA"

getQuoteCH = do threadDelay 5000
                return "CH"



-- Further extensions.
-- 1. A promise may fail (e.g. timeout).
--   Can be supported via exceptions or the Maybe data type.
-- 
-- 2. Our use of `forkIO` is rather wasteful.
-- Not much of an issue in the Haskell setting, where threads are cheap.
-- In a refined design, a not yet completed promise collects all callbacks
-- and executes them all at once when the promise is completed.
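Extension 2 might be sketched as follows; the representation and names (PromiseC etc.) are our own assumptions, not part of the lecture's interface. A pending promise stores its callbacks; completing it fires them all at once.

```haskell
import Control.Concurrent.MVar

-- Either still pending (with registered callbacks) or completed.
type PromiseC a = MVar (Either [a -> IO ()] a)

newC :: IO (PromiseC a)
newC = newMVar (Left [])

-- Run the callback now if completed, otherwise store it for later.
onCompleteC :: PromiseC a -> (a -> IO ()) -> IO ()
onCompleteC p f = do
  st <- takeMVar p
  case st of
    Right v -> putMVar p st >> f v
    Left fs -> putMVar p (Left (f : fs))

-- First completion wins; all stored callbacks are fired.
tryCompleteC :: PromiseC a -> a -> IO ()
tryCompleteC p v = do
  st <- takeMVar p
  case st of
    Right _ -> putMVar p st
    Left fs -> putMVar p (Right v) >> mapM_ ($ v) fs
```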

"Locks" are evil

We will use swap as our running example to illustrate the tricky points of concurrent programming.

We first consider a version which contains a data race.

Then, we use "locking" primitives to fix the data race. We either achieve poor scalability or we may run into deadlocks.

Later, we will consider a much simpler solution using Software Transactional Memory (STM).

Swap with data race

swapIORef :: Barrier -> IORef a -> IORef a -> IO ()
swapIORef b x y = do
   vx <- readIORef x
   vy <- readIORef y
   writeIORef x vy
   writeIORef y vx
   signalBarrier b

test1 = do
   b <- newBarrier 2
   x1 <- newIORef (1::Int)
   x2 <- newIORef (2::Int)
   x3 <- newIORef (3::Int)
   forkIO $ swapIORef b x1 x2
   forkIO $ swapIORef b x2 x3
 
   waitBarrier b

Complete Source Code

import Control.Concurrent
import Control.Concurrent.STM
import Data.IORef
import Data.List


type Barrier = (Chan (), Int)

newBarrier :: Int -> IO Barrier
newBarrier n = do b <- newChan
                  return (b,n)

signalBarrier :: Barrier -> IO ()
signalBarrier (b,_) = writeChan b ()

waitBarrier :: Barrier -> IO ()
waitBarrier (b,n) = do mapM (\_ -> readChan b) [1..n]
                       return ()


swapIORef :: Barrier -> IORef a -> IORef a -> IO ()
swapIORef b x y = do
   vx <- readIORef x
   vy <- readIORef y
   writeIORef x vy
   writeIORef y vx
   signalBarrier b

test1 = do
   b <- newBarrier 2
   x1 <- newIORef (1::Int)
   x2 <- newIORef (2::Int)
   x3 <- newIORef (3::Int)
   forkIO $ swapIORef b x1 x2
   forkIO $ swapIORef b x2 x3
 
   waitBarrier b

   [v1,v2,v3] <- mapM (\x -> readIORef x) [x1,x2,x3]
   putStrLn $ "x1 = " ++ show v1
   putStrLn $ "x2 = " ++ show v2
   putStrLn $ "x3 = " ++ show v3
     
   return ()

Guaranteeing mutually exclusive access to swapped values

The simplest solution is to impose a global lock.

This results in poor scalability because all concurrent swaps, even if they don't interfere, synchronize via the same global lock.

What about having a lock per swapped variable? Deadlocks are possible. For example, consider the concurrent execution of swap x1 x2 and swap x2 x1.
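The deadlock can be made concrete; a minimal sketch with one MVar lock per variable (the names are ours). If thread A runs swapLocked a b while thread B runs swapLocked b a, A may hold a's lock and wait for b's while B holds b's and waits for a's.

```haskell
import Control.Concurrent.MVar
import Data.IORef

type Lock = MVar ()

-- A variable paired with its lock.
type Var a = (Lock, IORef a)

newVar :: a -> IO (Var a)
newVar v = do l <- newMVar ()
              r <- newIORef v
              return (l, r)

-- Locks are taken in argument order -- the source of the deadlock.
swapLocked :: Var a -> Var a -> IO ()
swapLocked (lx, x) (ly, y) = do
  takeMVar lx                 -- lock x
  takeMVar ly                 -- lock y (may block forever)
  vx <- readIORef x
  vy <- readIORef y
  writeIORef x vy
  writeIORef y vx
  putMVar ly ()
  putMVar lx ()
```

A standard fix is to always acquire locks in a fixed global order, but that complicates the code; STM below avoids the issue entirely.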

Short summary

Software Transactional Memory (STM)

Let's first recall the four rules of database transactions (ACID): atomicity, consistency, isolation and durability.

STM = database transactions transferred to the programming language setting

Durability is dropped, as RAM is not persistent.

STM Interface

data STM a
data TVar a
newTVarIO  :: a -> IO (TVar a)
newTVar :: a -> STM (TVar a)
readTVar :: TVar a -> STM a
writeTVar :: TVar a -> a -> STM ()
atomically :: STM a -> IO a
retry      :: STM a
orElse :: STM a -> STM a -> STM a

Atomic swap example

swap :: TVar a -> TVar a -> STM ()
swap x y = do
   vx <- readTVar x
   vy <- readTVar y
   writeTVar x vy
   writeTVar y vx

testSwap :: IO ()
testSwap = do
   x1 <- newTVarIO (3::Int)
   x2 <- newTVarIO (5::Int)
   atomically (swap x1 x2)

IO and STM can't be mixed

someIOSideEffect :: IO ()
someIOSideEffect = do deleteSomeFile
                      fireMissile

The following yields a type error

prog = do ...
          atomically someIOSideEffect 
          ...
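The usual workaround (our sketch; withdraw is a hypothetical example): make the decision and the update inside the transaction, then perform the IO afterwards, outside of atomically.

```haskell
import Control.Concurrent.STM

withdraw :: TVar Int -> Int -> IO ()
withdraw acc amount = do
  -- decide and update transactionally ...
  ok <- atomically $ do
          b <- readTVar acc
          if b >= amount
            then do
              writeTVar acc (b - amount)
              return True
            else return False
  -- ... then do the side effect in IO
  putStrLn (if ok then "withdrawn" else "insufficient funds")
```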

Bank account - retry

transfer :: TVar Int -> TVar Int -> Int -> IO ()
transfer fromAcc toAcc amount =
 atomically $ do
   f <- readTVar fromAcc
   if f <= amount
     then retry
     else do
       writeTVar fromAcc (f - amount)
       t <- readTVar toAcc
       writeTVar toAcc (t + amount)
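A small usage sketch of retry (the test harness is our own): the transfer blocks until a concurrent deposit makes enough funds available, then goes through.

```haskell
import Control.Concurrent
import Control.Concurrent.STM

-- transfer repeated from above for a self-contained sketch
transfer :: TVar Int -> TVar Int -> Int -> IO ()
transfer fromAcc toAcc amount =
  atomically $ do
    f <- readTVar fromAcc
    if f <= amount
      then retry
      else do
        writeTVar fromAcc (f - amount)
        t <- readTVar toAcc
        writeTVar toAcc (t + amount)

main :: IO ()
main = do
  from <- newTVarIO 10
  to   <- newTVarIO 0
  -- the deposit wakes up the blocked transfer
  forkIO $ do
    threadDelay 1000
    atomically $ do
      f <- readTVar from
      writeTVar from (f + 100)
  transfer from to 50
  readTVarIO to >>= print
```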

Bank account - orElse

transfer2 :: TVar Int -> TVar Int -> Int -> STM ()
transfer2 fromAcc toAcc amount = 
 (do f <- readTVar fromAcc
     if f <= amount
      then retry
      else do
       writeTVar fromAcc (f - amount)
       t <- readTVar toAcc
       writeTVar toAcc (t + amount) )
 `orElse`
    transfer2 fromAcc toAcc (amount-50) 

STM Summary

Atomic compare-swap

Most STM implementations rely on a compare-and-swap (CAS) instruction.

CAS in Haskell.

casIORef :: Eq a => IORef a -> a -> a -> IO Bool
casIORef ptr old new =
   atomicModifyIORef ptr (\ cur -> if cur == old
                                   then (new, True)
                                   else (cur, False))
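A typical use of CAS is a retry loop (a sketch; incCAS is our name): read the current value, compute the update, and start over if another thread changed the reference in between.

```haskell
import Data.IORef

casIORef :: Eq a => IORef a -> a -> a -> IO Bool
casIORef ptr old new =
  atomicModifyIORef ptr (\cur -> if cur == old
                                 then (new, True)
                                 else (cur, False))

-- Lock-free increment: retry until the CAS succeeds.
incCAS :: IORef Int -> IO ()
incCAS r = do
  old <- readIORef r
  ok  <- casIORef r old (old + 1)
  if ok then return () else incCAS r

main :: IO ()
main = do
  r <- newIORef (0 :: Int)
  mapM_ (\_ -> incCAS r) [1 .. 5 :: Int]
  readIORef r >>= print
```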

The behavior of the primitive atomicModifyIORef is as follows.

atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
atomicModifyIORef r f = do
  a <- readIORef r
  let p = f a
  writeIORef r (fst p)
  return (snd p)

where we assume that the above sequence of operations are executed atomically.

CAS with STM.

casSTM :: Eq a => TVar a -> a -> a -> IO Bool
casSTM ptr old new =
  atomically $ do
    cur <- readTVar ptr
    if cur == old
      then do writeTVar ptr new
              return True
      else return False

Exercises: Concurrent Programming with STM

STM-based Semaphore

The semaphore is a classic concurrent data structure. Below you are given the interface of an unbounded semaphore. Your task is to provide the implementation details. Your implementation shall make use of STM.

newSem :: Int -> IO Sem
p :: Sem -> STM ()
v :: Sem -> STM ()

Dining Philosophers

Implement the classic concurrency example of dining philosophers with STM. You will find many solutions on the net already. Additional tasks: Does your STM solution guarantee fairness (each philosopher will eventually be able to eat)? Compare STM solutions against traditional lock-based solutions.

Hints: Represent forks as transactional variables. For simplicity, you may consider a specific instance with three philosophers and four forks.

Sleeping Barber

This is yet another classic concurrency example.

The barber has one barber chair and a waiting room with a number of chairs in it. When the barber finishes cutting a customer's hair, he dismisses the customer and then goes to the waiting room to see if there are other customers waiting. If there are, he brings one of them back to the chair and cuts his hair. If there are no other customers waiting, he returns to his chair and sleeps in it.

Each customer, when he arrives, looks to see what the barber is doing. If the barber is sleeping, then the customer wakes him up and sits in the chair. If the barber is cutting hair, then the customer goes to the waiting room. If there is a free chair in the waiting room, the customer sits in it and waits his turn. If there is no free chair, then the customer leaves.

How to represent the barber and its shop? For example, the barber could be represented as a transactional variable which has several states (cutting, sleeping). In a first instance, you could ignore waiting chairs.
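One possible starting point (purely our assumption, one representation among many): model the barber as a transactional variable over a small state type, and let a customer either wake a sleeping barber or retry.

```haskell
import Control.Concurrent.STM

data BarberState = Sleeping | Cutting
  deriving (Eq, Show)

-- A customer wakes a sleeping barber; if the barber is busy,
-- the transaction retries (waiting chairs are ignored here).
sitDown :: TVar BarberState -> STM ()
sitDown barber = do
  s <- readTVar barber
  case s of
    Sleeping -> writeTVar barber Cutting
    Cutting  -> retry
```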

Appendix

You will need the following imports in your program.

import Control.Concurrent
import Control.Concurrent.STM
import Data.IORef
import Data.List

Sample Solutions

import Control.Concurrent
import Control.Concurrent.STM
import Data.IORef
import Data.List


-- semaphore
type Sem = TVar Int

newSem :: Int -> IO Sem
newSem n = newTVarIO n

p :: Sem -> STM ()
p sem = do n <- readTVar sem
           if n > 0
             then writeTVar sem (n-1)
             else retry

v :: Sem -> STM ()
v sem = do n <- readTVar sem
           writeTVar sem (n+1)


testSem = do s <- newSem 5
             atomically $ p s
             atomically $ v s


-- Dining philosopher:
-- We assume that forks are unordered.
philo :: Int -> TVar Int -> Chan [Char] -> IO ()
philo no forks out = do
   atomically $ do v <- readTVar forks
                   if v <= 1 
                    then retry
                    else writeTVar forks (v-2)

   writeChan out $ "Philosopher " ++ show no ++ " is eating"
   threadDelay 1000000

   atomically $ do v <- readTVar forks
                   writeTVar forks (v+2)

   philo no forks out


printOut out = do
   txt <- readChan out
   putStrLn txt
   printOut out

testPhilo = do 
   out <- newChan
   forks <- newTVarIO 4
   forkIO $ philo 1 forks out
   forkIO $ philo 2 forks out
   forkIO $ philo 3 forks out
   printOut out