Martin Sulzmann
Concurrency and parallelism mean different things.
Concurrently executing threads need to communicate/synchronize. How?
There are two principled approaches:
shared-memory
message-passing
We will take a look at both (from the Haskell point of view).
Data race:
Two concurrent writes to the same memory location, or
One concurrent read and one concurrent write to the same memory location.
Deadlock:
Two or more threads each wait for a resource held by another thread, so none of them can ever proceed.
Haskell is great for concurrent programming
newIORef :: a -> IO (IORef a)
readIORef :: IORef a -> IO a
writeIORef :: IORef a -> a -> IO ()
IORef a is a mutable reference to a value of type a.
C example
int x = 1;
x = x + 1;
x <- newIORef (1::Int)
v <- readIORef x
writeIORef x (v+1)
Feels like three-address code/assembler. But we have precise control over side-effecting functions!
import Control.Concurrent
import Data.IORef
doWhile cmd cond = do cmd
                      v <- cond
                      if v
                        then doWhile cmd cond
                        else return ()

steps = 100

check cnt = do v <- readIORef cnt
               if v >= steps
                 then return False
                 else return True

thread cnt s = doWhile (do putStrLn s
                           v <- readIORef cnt
                           writeIORef cnt (v+1)
                           threadDelay 1000)
                       (check cnt)

main = do cnt <- newIORef 0
          forkIO $ thread cnt "A"
          forkIO $ thread cnt "B"
          thread cnt "C"
forkIO :: IO () -> IO ThreadId
threadDelay :: Int -> IO ()
Observations:
Non-deterministic execution
Data-races!
newEmptyMVar :: IO (MVar a)
newMVar :: a -> IO (MVar a)
takeMVar :: MVar a -> IO a
putMVar :: MVar a -> a -> IO ()
readMVar :: MVar a -> IO a
An MVar a is a mutable location that is either empty or holds a value of type a.
takeMVar blocks if the MVar is empty.
putMVar blocks if the MVar is full.
A blocked takeMVar/putMVar call will be unblocked eventually (wait queue).
readMVar is effectively a sequence of takeMVar and putMVar.
import Control.Concurrent
import Control.Concurrent.MVar
doWhile cmd cond = do cmd
                      v <- cond
                      if v
                        then doWhile cmd cond
                        else return ()

steps = 100

check cnt = do v <- readMVar cnt
               if v >= steps
                 then return False
                 else return True

thread cnt s = doWhile (do putStrLn s
                           v <- takeMVar cnt
                           putMVar cnt (v+1)
                           threadDelay 1000)
                       (check cnt)

main = do cnt <- newMVar 0
          forkIO $ thread cnt "A"
          forkIO $ thread cnt "B"
          thread cnt "C"
MVar guarantees mutually exclusive access to the counter.
An MVar represents a one-place buffer.
Chan = unbounded FIFO channel
newChan :: IO (Chan a)
writeChan :: Chan a -> a -> IO ()
readChan :: Chan a -> IO a
Chan is built upon MVar. See here http://www.haskell.org/ghc/docs/latest/html/libraries/base/Control-Concurrent-Chan.html
producer ch = do
    mapM_ (\x -> do writeChan ch x
                    threadDelay 1000)
          ([1..100] ++ [0])

consumer ch = do
    v <- readChan ch
    if v == 0
      then return ()
      else do putStrLn ("Received : " ++ show v)
              consumer ch

mainCP = do
    ch <- newChan
    forkIO (producer ch)
    consumer ch
Shared-memory:
IORefs
Locks (Mutex), Semaphore
MVar
...
Message-passing:
MVar
Chan
Channel of a fixed buffer size
Actors (a form of Chan, where there are multiple writers but only a single reader)
...
Notice that MVar classifies as a shared-memory as well as a message-passing primitive.
-- Message-passing
receive = takeMVar
send = putMVar
-- Mutex
-- initially full MVar
lock = takeMVar
unLock = putMVar
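Putting the mutex encoding to work, here is a minimal runnable sketch (the Mutex type alias and the withLock helper are our own additions, not part of the lecture code):

```haskell
import Control.Concurrent.MVar

-- A mutex is an MVar (); full means unlocked.
type Mutex = MVar ()

newMutex :: IO Mutex
newMutex = newMVar ()      -- initially full = unlocked

lock :: Mutex -> IO ()
lock = takeMVar            -- blocks while another thread holds the lock

unLock :: Mutex -> IO ()
unLock m = putMVar m ()

-- Run an action while holding the lock.
withLock :: Mutex -> IO a -> IO a
withLock m act = do lock m
                    r <- act
                    unLock m
                    return r
```

In production code one would rather use the library function withMVar, which additionally is exception-safe.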
Barrier via Chan
type Barrier = (Chan (), Int)

newBarrier :: Int -> IO Barrier
newBarrier n = do b <- newChan
                  return (b,n)

signalBarrier :: Barrier -> IO ()
signalBarrier (b,_) = writeChan b ()

waitBarrier :: Barrier -> IO ()
waitBarrier (b,n) = do mapM (\_ -> readChan b) [1..n]
                       return ()
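A small usage sketch of the barrier (mainBarrier and the worker delay are our own illustration; the barrier definitions are restated so the sketch is self-contained):

```haskell
import Control.Concurrent

type Barrier = (Chan (), Int)

newBarrier :: Int -> IO Barrier
newBarrier n = do b <- newChan
                  return (b, n)

signalBarrier :: Barrier -> IO ()
signalBarrier (b, _) = writeChan b ()

waitBarrier :: Barrier -> IO ()
waitBarrier (b, n) = mapM_ (\_ -> readChan b) [1 .. n]

-- Block until both worker threads have signaled.
mainBarrier :: IO ()
mainBarrier = do
    bar <- newBarrier 2
    forkIO $ do threadDelay 1000
                signalBarrier bar
    forkIO $ signalBarrier bar
    waitBarrier bar
    putStrLn "both workers done"
```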
Select via MVar
Often we are only interested in whether at least one of several tasks has finished.
select = do
  m <- newEmptyMVar
  forkIO $ do task1
              putMVar m "Task1 Done"
  forkIO $ do task2
              putMVar m "Task2 Done"
  r <- takeMVar m
  return ()
Whichever task finishes first supplies its result via putMVar. We pick up this first result via takeMVar.
Futures/promises are concurrent data structures to manage (not yet completed) computations in a non-blocking (asynchronous) fashion.
A future can be viewed as a placeholder for a computation that will eventually become available. The term promise often refers to a form of future where the result can be explicitly provided by the programmer. However, in many descriptions both terms are used interchangeably.
We make the following distinction.
Futures represent not yet completed computations. Once completed, the result never changes ("read-many").
A promise is a form of future where the result can be explicitly provided at a designated program point. The result can only be provided once. We say the promise has been completed. Further completion attempts have no effect ("write-once").
-- Exploring the design space of futures/promises
import Control.Concurrent
import Control.Concurrent.MVar

type Promise a = MVar a

-- | Promises represented by an MVar.
-- The promise is initially empty.
new :: IO (Promise a)
new = newEmptyMVar

-- | Retrieve promise value.
-- Blocks if value is not yet available.
-- Value is just read.
get :: Promise a -> IO a
get = readMVar

-- | Retrieve promise value via a callback function.
-- Non-blocking. Callback executed once value is available.
onComplete :: Promise a -> (a -> IO ()) -> IO ()
onComplete p f = do forkIO $ do v <- get p
                                f v
                    return ()

-- | Complete a promise by executing a command.
-- The returned value will be assigned to the promise,
-- if the promise has not been completed yet.
-- That is, promises can be written only once.
tryComplete :: Promise a -> IO a -> IO ()
tryComplete p x = do forkIO $ do v <- x
                                 putMVar p v
                     return ()

-- Some derived combinators

-- | Non-blocking execution of some command.
-- The (returned) value, once available, can be retrieved via a promise.
-- Commonly, this form of a promise is referred to as a future.
future :: IO a -> IO (Promise a)
future f = do p <- new
              tryComplete p f
              return p

-- | Complete a promise via another promise.
tryCompleteWith :: Promise a -> Promise a -> IO ()
tryCompleteWith p f =
    onComplete f (\x -> tryComplete p (return x))

-- | Select the first completed promise.
first :: Promise a -> Promise a -> IO (Promise a)
first p1 p2 = do
    p <- new
    tryCompleteWith p p1
    tryCompleteWith p p2
    return p

-- | Example to illustrate the above derived combinators.
holidayBooking = do p1 <- future getQuoteUSA
                    p2 <- future getQuoteCH
                    p <- first p1 p2
                    onComplete p (\s -> putStrLn $ "Hooray: " ++ s)
                    threadDelay 10000

getQuoteUSA = do threadDelay 1000
                 return "USA"

getQuoteCH = do threadDelay 5000
                return "CH"

-- Further extensions.
-- 1. A promise may fail (e.g. timeout).
--    Can be supported via exceptions or the Maybe data type.
--
-- 2. Our use of forkIO is rather wasteful.
--    Not so much an issue in the Haskell setting where threads are cheap.
--    In a refined design, a not yet completed promise collects all callbacks
--    and executes them all at once, once the promise is completed.
We will use swap as our running example to illustrate the tricky points of concurrent programming.
We first consider a version that contains a data race.
Then we use "locking" primitives to fix the data race. We either get poor scalability or we may run into deadlocks.
Later, we will consider a much simpler solution using Software Transactional Memory (STM).
swapIORef :: Barrier -> IORef a -> IORef a -> IO ()
swapIORef b x y = do
    vx <- readIORef x
    vy <- readIORef y
    writeIORef x vy
    writeIORef y vx
    signalBarrier b
The swap function may be executed concurrently. Therefore, we use a barrier to signal the calling thread once swapping is done.
In case of several concurrent swaps, we may potentially run into a data race.
test1 = do
    b <- newBarrier 2
    x1 <- newIORef (1::Int)
    x2 <- newIORef (2::Int)
    x3 <- newIORef (3::Int)
    forkIO $ swapIORef b x1 x2
    forkIO $ swapIORef b x2 x3
    waitBarrier b
There is no guarantee that the sequence of readIORef and writeIORef operations happens atomically. Therefore, the value stored in x2 could be corrupted.
import Control.Concurrent
import Control.Concurrent.STM
import Data.IORef
import Data.List

type Barrier = (Chan (), Int)

newBarrier :: Int -> IO Barrier
newBarrier n = do b <- newChan
                  return (b,n)

signalBarrier :: Barrier -> IO ()
signalBarrier (b,_) = writeChan b ()

waitBarrier :: Barrier -> IO ()
waitBarrier (b,n) = do mapM (\_ -> readChan b) [1..n]
                       return ()

swapIORef :: Barrier -> IORef a -> IORef a -> IO ()
swapIORef b x y = do
    vx <- readIORef x
    vy <- readIORef y
    writeIORef x vy
    writeIORef y vx
    signalBarrier b

test1 = do
    b <- newBarrier 2
    x1 <- newIORef (1::Int)
    x2 <- newIORef (2::Int)
    x3 <- newIORef (3::Int)
    forkIO $ swapIORef b x1 x2
    forkIO $ swapIORef b x2 x3
    waitBarrier b
    [v1,v2,v3] <- mapM (\x -> readIORef x) [x1,x2,x3]
    putStrLn $ "x1 = " ++ show v1
    putStrLn $ "x2 = " ++ show v2
    putStrLn $ "x3 = " ++ show v3
    return ()
The simplest solution is to impose a global lock.
This results in poor scalability because all concurrent swaps, even if they don't interfere, synchronize via the same global lock.
What about having a lock per swapped variable? Deadlocks are possible. For example, consider the concurrent execution of swap x1 x2 and swap x2 x1.
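The deadlock can be sketched as follows (the Locked wrapper and helper names are our own): each swap takes the locks in argument order, so swapLocked a b running concurrently with swapLocked b a may each grab its first lock and then wait forever for the other.

```haskell
import Control.Concurrent.MVar
import Data.IORef

-- One MVar lock per swapped variable.
data Locked a = Locked (MVar ()) (IORef a)

newLocked :: a -> IO (Locked a)
newLocked x = do l <- newMVar ()
                 r <- newIORef x
                 return (Locked l r)

-- Acquires the locks in argument order, then swaps.
-- Concurrent calls `swapLocked a b` and `swapLocked b a` may each
-- grab their first lock and then wait forever for the other: deadlock.
swapLocked :: Locked a -> Locked a -> IO ()
swapLocked (Locked lx x) (Locked ly y) = do
    takeMVar lx
    takeMVar ly
    vx <- readIORef x
    vy <- readIORef y
    writeIORef x vy
    writeIORef y vx
    putMVar ly ()
    putMVar lx ()
```

A standard fix is to acquire the locks in a fixed global order, but this is easy to get wrong; STM below avoids the issue entirely.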
Let's first recall the four ACID rules of database transactions: Atomicity, Consistency, Isolation, Durability.
STM = database transactions transferred to the programming-language setting.
Durability is dropped: RAM is not persistent.
data STM a
data TVar a
newTVarIO :: a -> IO (TVar a)
newTVar :: a -> STM (TVar a)
readTVar :: TVar a -> STM a
writeTVar :: TVar a -> a -> STM ()
atomically :: STM a -> IO a
retry :: STM a
orElse :: STM a -> STM a -> STM a
TVar a identifies a transactional variable which holds a value of type a (compare to IORef a!).
STM identifies a transactional computation (compare to IO!).
swap :: TVar a -> TVar a -> STM ()
swap x y = do
    vx <- readTVar x
    vy <- readTVar y
    writeTVar x vy
    writeTVar y vx
testSwap :: IO ()
testSwap = do
    x1 <- newTVarIO (3::Int)
    x2 <- newTVarIO (5::Int)
    atomically (swap x1 x2)
atomically executes an STM transaction.
Note the strict separation between IO and STM.
someIOSideEffect :: IO ()
someIOSideEffect = do deleteSomeFile
                      fireMissile
The following yields a type error
prog = do ...
          atomically someIOSideEffect
          ...
atomically expects an STM computation and not an IO computation.
STM computations may only affect transactional variables. IO actions like fireMissile cannot be rolled back. Hence, no atomically on IO actions.
transfer :: TVar Int -> TVar Int -> Int -> IO ()
transfer fromAcc toAcc amount =
    atomically $ do
      f <- readTVar fromAcc
      if f <= amount
        then retry
        else do
          writeTVar fromAcc (f - amount)
          t <- readTVar toAcc
          writeTVar toAcc (t + amount)
retry aborts the current transaction. The transaction is automatically re-run once one of the transactional variables it has read is updated. Hence, the blocked transfer wakes up as soon as sufficient funds arrive.
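To see retry in action, here is a sketch (the account set-up and the delay are our own illustration; transfer is restated so the block is self-contained): the transfer blocks until a concurrent deposit provides sufficient funds.

```haskell
import Control.Concurrent
import Control.Concurrent.STM

transfer :: TVar Int -> TVar Int -> Int -> IO ()
transfer fromAcc toAcc amount =
    atomically $ do
      f <- readTVar fromAcc
      if f <= amount
        then retry
        else do
          writeTVar fromAcc (f - amount)
          t <- readTVar toAcc
          writeTVar toAcc (t + amount)

testTransfer :: IO ()
testTransfer = do
    from <- newTVarIO (10 :: Int)   -- not enough to transfer 50 yet
    to   <- newTVarIO (0 :: Int)
    forkIO $ do threadDelay 1000    -- a concurrent deposit
                atomically $ do f <- readTVar from
                                writeTVar from (f + 100)
    transfer from to 50             -- blocks via retry until `from` changes
    (f, t) <- atomically $ do f <- readTVar from
                              t <- readTVar to
                              return (f, t)
    putStrLn $ "from = " ++ show f ++ ", to = " ++ show t
    -- prints: from = 60, to = 50
```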
transfer2 :: TVar Int -> TVar Int -> Int -> STM ()
transfer2 fromAcc toAcc amount =
    (do f <- readTVar fromAcc
        if f <= amount
          then retry
          else do
            writeTVar fromAcc (f - amount)
            t <- readTVar toAcc
            writeTVar toAcc (t + amount))
    `orElse`
    transfer2 fromAcc toAcc (amount-50)
orElse tries its second argument if the first transaction retries.
In general, concurrent programs are much easier to get right with STM than with locks.
STM is particularly beautiful in Haskell thanks to the strict separation between IO and STM computations.
Most STM implementations rely on a compare-and-swap (CAS) instruction.
CAS in Haskell.
casIORef :: Eq a => IORef a -> a -> a -> IO Bool
casIORef ptr old new =
    atomicModifyIORef ptr (\cur -> if cur == old
                                     then (new, True)
                                     else (cur, False))
The behavior of the primitive atomicModifyIORef is as follows.
atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
atomicModifyIORef r f = do
    a <- readIORef r
    let p = f a
    writeIORef r (fst p)
    return (snd p)
where we assume that the above sequence of operations are executed atomically.
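A typical use of casIORef is the read-modify-CAS retry loop. Here is a sketch of a lock-free increment (the incr helper is our own addition; casIORef is restated so the block is self-contained):

```haskell
import Data.IORef

casIORef :: Eq a => IORef a -> a -> a -> IO Bool
casIORef ptr old new =
    atomicModifyIORef ptr (\cur -> if cur == old
                                     then (new, True)
                                     else (cur, False))

-- Lock-free increment: read, compute, CAS.
-- If another thread interfered between the read and the CAS,
-- the CAS fails and we simply try again.
incr :: IORef Int -> IO ()
incr r = do
    old <- readIORef r
    ok  <- casIORef r old (old + 1)
    if ok then return () else incr r
```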
CAS with STM.
casSTM :: Eq a => TVar a -> a -> a -> IO Bool
casSTM ptr old new =
    atomically $ do
      cur <- readTVar ptr
      if cur == old
        then do writeTVar ptr new
                return True
        else return False
The semaphore is a classic concurrent data structure. Below you are given an interface to an unbounded semaphore. Your task is to provide the implementation details. Your implementation shall make use of STM.
newSem :: Int -> IO Sem
p :: Sem -> STM ()
v :: Sem -> STM ()
Implement the classic concurrency example of dining philosophers with STM. You will find many solutions already on the net. Additional tasks: Does your STM solution guarantee fairness (each philosopher will eventually be able to eat)? Compare STM solutions against traditional lock-based solutions.
Hints: Represent forks as transactional variables. For simplicity, you may consider a specific instance with three philosophers and four forks.
This is yet another classic concurrency example.
The barber has one barber chair and a waiting room with a number of chairs in it. When the barber finishes cutting a customer's hair, he dismisses the customer and then goes to the waiting room to see if there are other customers waiting. If there are, he brings one of them back to the chair and cuts his hair. If there are no other customers waiting, he returns to his chair and sleeps in it.
Each customer, when he arrives, looks to see what the barber is doing. If the barber is sleeping, then the customer wakes him up and sits in the chair. If the barber is cutting hair, then the customer goes to the waiting room. If there is a free chair in the waiting room, the customer sits in it and waits his turn. If there is no free chair, then the customer leaves.
How to represent the barber and its shop? For example, the barber could be represented as a transactional variable which has several states (cutting, sleeping). In a first instance, you could ignore waiting chairs.
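Following the hint, one possible starting point (a sketch only, not a full solution; the state type and helper names are our own, and waiting chairs are ignored):

```haskell
import Control.Concurrent.STM

-- One possible state representation for the barber.
data BarberState = Sleeping | Cutting
  deriving (Eq, Show)

-- A customer wakes a sleeping barber; if the barber is busy,
-- retry blocks the customer until the state changes.
wakeBarber :: TVar BarberState -> STM ()
wakeBarber barber = do
    s <- readTVar barber
    case s of
      Sleeping -> writeTVar barber Cutting
      Cutting  -> retry

-- The barber finishes a haircut and goes back to sleep.
finishCut :: TVar BarberState -> STM ()
finishCut barber = writeTVar barber Sleeping
```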
You will need the following imports in your program.
import Control.Concurrent
import Control.Concurrent.STM
import Data.IORef
import Data.List
-- semaphore
type Sem = TVar Int

newSem :: Int -> IO Sem
newSem n = newTVarIO n

p :: Sem -> STM ()
p sem = do n <- readTVar sem
           if n > 0
             then writeTVar sem (n-1)
             else retry

v :: Sem -> STM ()
v sem = do n <- readTVar sem
           writeTVar sem (n+1)

testSem = do s <- newSem 5
             atomically $ p s
             atomically $ v s
-- Dining philosopher:
-- We assume that forks are unordered.
philo :: Int -> TVar Int -> Chan [Char] -> IO ()
philo no forks out = do
    atomically $ do v <- readTVar forks
                    if v <= 1
                      then retry
                      else writeTVar forks (v-2)
    writeChan out $ "Philosopher " ++ show no ++ " is eating"
    threadDelay 1000000
    atomically $ do v <- readTVar forks
                    writeTVar forks (v+2)
    philo no forks out

printOut out = do
    txt <- readChan out
    putStrLn txt
    printOut out

testPhilo = do
    out <- newChan
    forks <- newTVarIO 4
    forkIO $ philo 1 forks out
    forkIO $ philo 2 forks out
    forkIO $ philo 3 forks out
    printOut out