Martin Sulzmann
Light-weight threads
Channel-based communication
Non-deterministic selection
Philosophy: “Do not communicate by sharing memory; instead, share memory by communicating.”
Parallelism: Make programs run faster by making use of additional CPUs (parallel hardware)
Concurrency: Program organized into multiple threads of control. Threads may work independently or work on a common task.
See also here https://wiki.haskell.org/Parallelism_vs._Concurrency
Concurrent execution: "just say go"
package main
import "fmt"
import "time"
func thread(s string) {
for {
fmt.Print(s)
time.Sleep(1 * 1e9)
}
}
func main() {
go thread("A")
go thread("B")
thread("C")
}
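For the purpose of these notes we assume cooperative multi-threading. That is, a thread will be executed until we reach a blocking statement (such as 'Sleep', receive on an empty channel, ...). Note that since Go 1.14 the run-time can also preempt a running goroutine asynchronously, so actual schedules may differ from the idealized traces shown here.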
Let's consider a sample execution of the above example.
Each thread is in one of the following states:
* Running
* Waiting
* Blocked
Main.Running
--> (Main.Running, A.Waiting)
--> (Main.Running, A.Waiting, B.Waiting)
--> (Main.Blocked, A.Waiting, B.Waiting)
Main Thread is blocked due to Sleep
We pass control to one of the waiting threads
Generally, waiting threads are managed by a queue (FIFO)
...
--> (Main.Blocked, A.Waiting, B.Waiting)
--> (Main.Blocked, A.Running, B.Waiting)
--> (Main.Waiting, A.Blocked, B.Waiting)
The A thread blocks due to Sleep
Meanwhile, the Main thread switches from Blocked to Waiting (assuming that more than 1 second has passed)
...
--> (Main.Waiting, A.Blocked, B.Waiting)
--> (Main.Waiting, A.Blocked, B.Running)
and so on ...
Concurrent programming is a tricky business. Consider the following example.
type position struct {
x int
y int
}
func main() {
var p position
// Producer 1
go func() {
p.x = 1
p.y = 2
}()
// Producer 2
go func() {
p.x = 2
p.y = 3
}()
// Consumer
go func() {
time.Sleep(1 * 1e9)
x1 := p.x
y1 := p.y
fmt.Println(x1, y1) // use the values; unused variables do not compile in Go
}()
}
A consumer thread waits for x-y coordinates supplied by either the producer 1 or the producer 2 thread.
There are several issues:
Synchronization is via a sleep statement. There is no guarantee that any of the producers has supplied the expected data once the consumer thread wakes up.
Data races (data corruption) are possible. There is no guarantee that the x-y coordinates are written atomically. For example, the consumer may observe x from producer 2 and y from producer 1.
Next, we take a look at channel-based communication in Go for thread synchronization and data race free exchange of information.
var ch chan int
ch = make(chan int) // No buffer
ch = make(chan int, 50) // Buffer size 50
ch <- y
Send value y over channel ch.
x = <-ch
Receive from channel ch. Store the received value in x.
package main
import "fmt"
import "time"
func snd(ch chan int) {
var x int = 0
for {
x++
ch <- x
time.Sleep(1 * 1e9)
}
}
func rcv(ch chan int) {
var x int
for {
x = <-ch
fmt.Printf("received %d \n", x)
}
}
func main() {
var ch chan int = make(chan int)
go snd(ch)
rcv(ch)
}
Sending only
func snd(ch chan<- int) {
...
}
Receiving only
func rcv(ch <-chan int) {
...
}
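Returning to the earlier position example, the following is a minimal sketch of how a channel could replace both the sleep-based synchronization and the shared variable. The function names producer and consume are our own choice and do not appear in the original example. Each position travels over the channel as one value, so the consumer can never observe a half-written pair, and the directional channel types document who sends and who receives.

```go
package main

import "fmt"

type position struct {
	x int
	y int
}

// producer sends one complete position. The send-only channel type
// prevents it from accidentally receiving on ch.
func producer(ch chan<- position, p position) {
	ch <- p
}

// consume receives n positions. The receive-only channel type
// prevents it from accidentally sending on ch.
func consume(ch <-chan position, n int) []position {
	var ps []position
	for i := 0; i < n; i++ {
		ps = append(ps, <-ch) // blocks until some producer is ready
	}
	return ps
}

func main() {
	ch := make(chan position)
	go producer(ch, position{1, 2}) // Producer 1
	go producer(ch, position{2, 3}) // Producer 2
	// The order of arrival is not fixed, but each pair arrives intact.
	for _, p := range consume(ch, 2) {
		fmt.Println(p.x, p.y)
	}
}
```

The consumer no longer needs to guess how long to sleep: the receive itself is the synchronization point.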
rcv.Running
--> (rcv.Running, snd.Waiting)
--> (rcv.Blocked_(<-ch?), snd.Waiting)
--> (rcv.Blocked_(<-ch?), snd.Running)
--> (rcv.Blocked_(<-ch?), snd.Blocked_(ch<-1?))
--> (rcv.Waiting, snd.Waiting)
--> (rcv.Running, snd.Waiting)
...
We consider the following variant.
func main() {
var ch chan int = make(chan int, 1) // channel with buffer
go snd(ch)
rcv(ch)
}
rcv.Running
--> (rcv.Running, snd.Waiting)
--> (rcv.Blocked_(<-ch?), snd.Waiting)
--> (rcv.Blocked_(<-ch?), snd.Running)
// after execution of ch <- 1
// the empty buffer becomes full
// snd thread keeps running
--> (rcv.Blocked_(<-ch?), snd.Blocked_(Sleep(1s)?))
--> (rcv.Waiting, snd.Blocked_(Sleep(1s)?))
// channel buffer empty
--> (rcv.Running, snd.Blocked_(Sleep(1s)?))
...
Another variant: channel with buffer size 1 and snd without Sleep.
func snd(ch chan int) {
var x int = 0
for {
x++
ch <- x
}
}
rcv.Running
--> (rcv.Running, snd.Waiting)
--> (rcv.Blocked_(<-ch?), snd.Waiting)
--> (rcv.Blocked_(<-ch?), snd.Running)
// after execution of ch <- 1
// the empty buffer becomes full
// snd thread keeps running
--> (rcv.Blocked_(<-ch?), snd.Blocked_(ch<-2?))
// Two possibilities
// (a) rcv reads from channel, or
// (b) directly synchronizes with snd
//
// Go run-time follows case (a)
--> (rcv.Running, snd.Blocked_(ch<-2?))
// Channel buffer empty
--> (rcv.Blocked_(<-ch?), snd.Blocked_(ch<-2?))
// Two possibilities
// (a) snd writes into channel, or
// (b) directly synchronizes with rcv
//
// Go run-time follows case (a)
Observations:
In case of Sleep, the execution order is hard to predict because it depends on timing.
In case of a channel with buffer, a send may be non-blocking (if enough buffer space is available).
In case of a bufferless channel, the execution behavior is more predictable because each send must always synchronize with a receive.
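The difference between buffered and bufferless sends can be made visible with the built-in functions len and cap. The helper fillBuffer below is a hypothetical name introduced only for this sketch. It is meant for a channel that no other thread is using at the same time, since otherwise the len check and the send could be interleaved with other operations.

```go
package main

import "fmt"

// fillBuffer sends 1, 2, ... on ch as long as buffer space is left and
// reports how many sends completed without blocking.
// Assumption: no other goroutine uses ch concurrently.
func fillBuffer(ch chan int) int {
	n := 0
	for len(ch) < cap(ch) {
		n++
		ch <- n // cannot block: there is still room in the buffer
	}
	return n
}

func main() {
	fmt.Println(fillBuffer(make(chan int, 3))) // 3
	fmt.Println(fillBuffer(make(chan int)))    // 0: a bufferless send needs a receiver
}
```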
package main
import "fmt"
func snd(ch chan int) {
var x int = 0
for {
x++
ch <- x
}
}
func rcv(s string, ch chan int) {
var x int
for {
x = <-ch
fmt.Printf("received %s %d \n", s, x)
}
}
func main() {
var ch chan int = make(chan int)
go snd(ch)
go rcv("A", ch)
rcv("B", ch)
}
Sample run (Macbook Air'15 OS 10.11)
received A 2
received A 3
received A 4
received A 5
received A 6
received A 7
received A 8
received A 9
received A 10
received B 1
received A 11
received A 12
received B 13
received B 15
received B 16
received B 17
received B 18
received B 19
received B 20
received B 21
received B 22
received B 23
received B 24
received B 25
received B 26
received B 27
received B 28
received B 29
received B 30
received B 31
received B 32
received B 33
received B 34
received B 35
received B 36
received B 37
received B 38
received B 39
received B 40
received B 41
received B 42
received B 43
received B 44
received B 45
received B 46
received B 47
received B 48
received B 49
received B 50
received B 51
received B 52
received B 53
received B 54
received B 55
received B 56
received B 57
received B 58
received B 59
received B 60
received B 61
received B 62
received B 63
received B 64
received B 65
received B 66
received B 67
received A 14
received B 68
received B 70
received B 71
received B 72
received B 73
received B 74
received B 75
received B 76
received B 77
received B 78
received A 69
received B 79
received A 80
received B 81
received B 83
received B 84
received B 85
received B 86
received B 87
received B 88
FYI, on my old MacBook'08, only thread A is receiving.
Go has support for standard synchronization primitives such as Mutex etc. See http://golang.org/pkg/sync/.
As an exercise, we implement our own Mutex via channels.
lock equals send and unlock equals receive.
type Mutex (chan int)
func newMutex() Mutex {
var ch = make(chan int, 1)
return ch
}
func lock(m Mutex) {
m <- 1
}
func unlock(m Mutex) {
<-m
}
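To see the channel-based mutex at work, here is a small sketch in which several threads increment a shared counter. The function count and the done channel are assumptions made for this illustration. They are not part of the exercise.

```go
package main

import "fmt"

type Mutex (chan int)

func newMutex() Mutex {
	return make(chan int, 1)
}
func lock(m Mutex) {
	m <- 1 // blocks while another thread holds the lock (buffer is full)
}
func unlock(m Mutex) {
	<-m // frees the buffer slot
}

// count starts n threads that each increment a shared counter k times.
func count(n, k int) int {
	m := newMutex()
	done := make(chan int)
	counter := 0
	for i := 0; i < n; i++ {
		go func() {
			for j := 0; j < k; j++ {
				lock(m)
				counter++ // critical section
				unlock(m)
			}
			done <- 1 // signal that this thread is finished
		}()
	}
	for i := 0; i < n; i++ {
		<-done
	}
	return counter
}

func main() {
	fmt.Println(count(4, 1000)) // 4000
}
```

Without the lock and unlock calls the increments could interleave and some updates would be lost.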
Variation:
Represent the mutex as a synchronous channel (no buffer).
Hint: Within newMutex create a "receiving thread". Thus, some initial lock will not block.
Some event may block whereas the other may execute successfully
In which order to try events?
x = <-ch1
y = <-ch2
ch3 <- 1
versus
y = <-ch2
x = <-ch1
ch3 <- 1
Each event may block. How to try each event simultaneously?
The select primitive allows us to simultaneously try several events.
select {
case x = <-ch1: ...
case y = <-ch2: ...
case ch3 <- 1:
// default and timeout possible
}
If one event, say <-ch1, is successful, then the respective case will be executed.
If there are several successful events, one event will be chosen non-deterministically.
In theory, select can be encoded with just threads and channels. However, a faithful encoding is far from trivial.
In the following, we consider a bunch of examples to illustrate the workings and expressive power of select.
package main
import "fmt"
import "time"
import "math/rand"
func snd(ch chan int) {
var x int = 0
for {
x++
ch <- x
if rand.Int()%2 == 0 {
time.Sleep(1 * 1e9)
} else {
time.Sleep(5 * 1e9)
}
}
}
func rcv(ch1 chan int, ch2 chan int) {
var x int
for {
select {
case x = <-ch1:
fmt.Printf("received ch1 %d \n", x)
case x = <-ch2:
fmt.Printf("received ch2 %d \n", x)
}
}
}
func main() {
var ch1 chan int = make(chan int)
var ch2 chan int = make(chan int)
go snd(ch1)
go snd(ch2)
rcv(ch1, ch2)
}
The following example shows that an encoding of select with threads + channels is far from trivial.
package main
import "fmt"
func reuters(ch chan string) {
ch <- "REUTERS"
}
func bloomberg(ch chan string) {
ch <- "BLOOMBERG"
}
func newsReaderWithThreads(reutersCh chan string, bloombergCh chan string) {
ch := make(chan string)
go func() {
ch <- (<-reutersCh)
}()
go func() {
ch <- (<-bloombergCh)
}()
x := <-ch
fmt.Printf("got news from %s \n", x)
}
func newsReaderWithSelect(reutersCh chan string, bloombergCh chan string) {
var x string
select {
case x = <-reutersCh:
case x = <-bloombergCh:
}
fmt.Printf("got news from %s \n", x)
}
func test1() {
reutersCh := make(chan string)
bloombergCh := make(chan string)
go reuters(reutersCh)
go bloomberg(bloombergCh)
newsReaderWithThreads(reutersCh, bloombergCh)
newsReaderWithThreads(reutersCh, bloombergCh)
}
func test2() {
reutersCh := make(chan string)
bloombergCh := make(chan string)
go reuters(reutersCh)
go bloomberg(bloombergCh)
newsReaderWithSelect(reutersCh, bloombergCh)
newsReaderWithSelect(reutersCh, bloombergCh)
}
func main() {
test1() // potentially deadlocks
// test2()
}
We attempt to encode select by spawning two helper threads. See newsReaderWithThreads.
Each helper thread waits for either a Reuters or a Bloomberg message and redirects that message to a common channel.
The main thread waits on that common channel.
There is an issue if there are several newsreaders and each newsreader attempts to consume either a Reuters or a Bloomberg message.
Consider the following scenario:
The first newsreader consumes the Reuters message.
As the Bloomberg helper thread is still running, it's entirely possible that the Bloomberg message is retrieved by this helper thread without ever being consumed.
Hence, the second newsreader will block as there are no messages left!
Try running the above program. The Go run-time will report: fatal error: all goroutines are asleep - deadlock!
In contrast, the newsReaderWithSelect version guarantees that each of the two newsreaders obtains either a Reuters or a Bloomberg message.
package main
import "fmt"
import "time"
func task1() { time.Sleep(100 * time.Millisecond) }
func task2() { time.Sleep(200 * time.Millisecond) }
func task3() { time.Sleep(300 * time.Millisecond) }
func barrier() {
var ch = make(chan int)
// run all three tasks concurrently
go func() {
task1()
ch <- 1 // signal done
}()
go func() {
task2()
ch <- 1
}()
go func() {
task3()
ch <- 1
}()
// collect results concurrently
// set timeout to guarantee that within 500ms we're done
timeout := time.After(500 * time.Millisecond)
for i := 0; i < 3; i++ {
select {
case <-ch:
case <-timeout:
fmt.Println("timed out")
return
}
}
fmt.Println("done")
}
func main() {
barrier()
}
In the above, we guarantee that all tasks are completed within a certain time frame. Suppose that, in addition, we want to ensure that each task completes within its own time frame.
Here's a possible attempt where we only show the relevant program parts.
timeout := time.After(500 * time.Millisecond)
for i := 0; i < 3; i++ {
timeoutEach := time.After(100 * time.Millisecond)
select {
case <-ch:
case <-timeout:
fmt.Println("timed out (global)")
return
case <-timeoutEach:
fmt.Println("timed out (local)")
return
}
}
fmt.Println("done")
We introduce a local timeout to keep track of the time spent each round. Notice that timeoutEach will be reset each time we enter the for-loop body.
Will this work?
No. It does not work. We only guarantee that the first task completes within the given time frame. In subsequent rounds we restart timeoutEach but some of the tasks are already running. Hence, there is no guarantee that all tasks respect the time frame imposed on each task.
What we require is to monitor each task with its own 'timeout' bound. We need something like this.
func completeWithin(task func(), ms time.Duration) chan bool {
var ch = make(chan int)
var res = make(chan bool)
go func() {
task()
ch <- 1
}()
t := time.After(ms * time.Millisecond)
go func() {
select {
case <-ch:
res <- true
case <-t:
res <- false
}
}()
return res
}
As we may want to perform this check for several tasks, the function shall be non-blocking. Hence, the result, either true = task completed within time frame, or false = task took longer is communicated via a channel. The channel can then be queried later.
// run all three tasks concurrently
// must complete within 500ms
r1 := completeWithin(task1, 500)
r2 := completeWithin(task2, 500)
r3 := completeWithin(task3, 500)
b1 := <-r1
b2 := <-r2
b3 := <-r3
if b1 && b2 && b3 {
fmt.Println("done")
} else {
fmt.Println("timed out")
}
The Go run-time is able to detect if all goroutines are asleep (i.e. are mutually blocked).
We say the program has a deadlock. A deadlock may not necessarily arise for all program runs. For example, consider
ch := make(chan int)
go func() { // T1
<- ch
}()
go func() { // T2
ch <- 1
}()
ch <- 1
It's possible that thread T1 synchronizes with thread T2 and therefore we run into a deadlock. However, it is also possible that the main thread synchronizes with thread T1 (and no deadlock occurs).
Here's an example for which we encounter a deadlock for all possible program runs.
ch := make(chan int)
ch <- 1
Coming back to the first example (with threads T1 and T2), here's a sample run for which the program runs through.
(main.Running)
-> (main.Running, T1.Waiting)
-> (main.Running, T1.Waiting, T2.Waiting)
-> (main.Blocked_(ch<-1?), T1.Waiting, T2.Waiting)
-> (main.Blocked_(ch<-1?), T1.Running, T2.Waiting)
-> (main.Blocked_(ch<-1?), T1.Blocked_(<-ch?), T2.Waiting)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
synchronize
-> (main.Waiting, T1.Waiting, T2.Waiting)
-> (main.Running, T1.Waiting, T2.Waiting)
-> main terminates, all other threads terminate as well
Here's another run where we encounter a deadlock.
(main.Running)
-> (main.Running, T1.Running)
-> (main.Running, T1.Running, T2.Waiting)
-> (main.Running, T1.Running, T2.Running)
-> (main.Blocked_(ch<-1?), T1.Blocked_(<-ch?), T2.Blocked_(ch<-1?))
either main synchronizes with T1, or
T1 synchronizes with T2 (we choose the latter option here)
-> (main.Blocked_(ch<-1?), T1.Waiting, T2.Waiting)
-> (main.Blocked_(ch<-1?), T1.Running, T2.Running)
-> (main.Blocked_(ch<-1?))
T1 and T2 terminate, main thread is still blocked! Hence, deadlock!
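The deadlock in the run above arises because two senders compete for a single receiver on a bufferless channel. One possible remedy, which is our own suggestion and not discussed in the text, is a buffer of size 1: whichever send is not picked up by T1 is simply stored in the buffer. A sketch, where the result channel got is an assumption added to make the received value observable:

```go
package main

import "fmt"

// run executes the T1/T2 example with a buffer of size 1 and returns
// the value received by T1 (either 1 or 2, depending on the schedule).
func run() int {
	ch := make(chan int, 1)
	got := make(chan int)
	go func() { // T1
		got <- <-ch
	}()
	go func() { // T2
		ch <- 2
	}()
	ch <- 1 // blocks at most until T1 has received one value
	return <-got
}

func main() {
	fmt.Println("T1 received", run())
}
```

T1 receives exactly one value, which frees (or leaves free) the single buffer slot for the other sender, so the main thread cannot remain blocked.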
We know that the newsreader variant with threads (possibly) runs into a deadlock. On my Mac it seems that all runs result in a deadlock. To highlight that not all runs result in a deadlock, we include a sleep statement for one of the worker threads.
Run the program below. In contrast to the earlier variant, no deadlock will (most likely) be observed.
func newsReaderWithThreads(reutersCh chan string, bloombergCh chan string) {
ch := make(chan string)
go func() {
ch <- (<-reutersCh)
}()
go func() {
ch <- (<-bloombergCh)
}()
x := <-ch
fmt.Printf("got news from %s \n", x)
}
func newsReaderWithThreadsAndSleep(reutersCh chan string, bloombergCh chan string) {
ch := make(chan string)
go func() {
ch <- (<-reutersCh)
}()
go func() {
time.Sleep(1 * 1e9)
ch <- (<-bloombergCh)
}()
x := <-ch
fmt.Printf("got news from %s \n", x)
}
func main() {
reutersCh := make(chan string)
bloombergCh := make(chan string)
go reuters(reutersCh)
go bloomberg(bloombergCh)
newsReaderWithThreadsAndSleep(reutersCh, bloombergCh)
newsReaderWithThreads(reutersCh, bloombergCh)
}
Channels are first-class citizens.
Channels can be arguments of channels.
`var ch chan (chan int)`
A channel where the transmitted values are themselves channels.
Thus, we can encode complex synchronization patterns.
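As a first, very small illustration of a channel of channels, consider a server that hands out fresh identification numbers. The names idServer and nextID are hypothetical and introduced only for this sketch. A client sends its private reply channel to the server and then waits on that channel for the answer.

```go
package main

import "fmt"

// idServer answers each request with the next identification number.
// A request is the channel on which the client expects the reply.
func idServer(reqs chan (chan int)) {
	id := 0
	for reply := range reqs {
		id++
		reply <- id
	}
}

// nextID sends a fresh reply channel to the server and waits for the answer.
func nextID(reqs chan (chan int)) int {
	reply := make(chan int)
	reqs <- reply
	return <-reply
}

func main() {
	reqs := make(chan (chan int))
	go idServer(reqs)
	fmt.Println(nextID(reqs), nextID(reqs)) // 1 2
}
```

Because each client owns its reply channel, answers cannot be delivered to the wrong client even if many clients ask at the same time.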
package main
import "fmt"
import "time"
type Request struct {
id int
ack chan int
}
A request consists of an identification number and a channel. The channel is used to acknowledge a successful request.
func worker(req chan Request) {
var c Request
for {
c = <-req
fmt.Printf("request received from %d \n", c.id)
time.Sleep(1 * 1e9)
fmt.Println("notify")
c.ack <- 1
}
}
Wait for requests. Acknowledge reception of request via channel ack.
func client(id int, req chan Request) {
var ack = make(chan int)
for {
c := Request{id, ack}
req <- c
<-ack
}
}
Transmit request and wait for acknowledgment by performing a receive on channel ack.
func main() {
var req = make(chan Request)
go worker(req)
go client(1, req)
client(2, req)
}
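A classic synchronization problem.
In its most simple form, the sleeping barber problem can be described as follows: a barber shop has a single barber who sleeps while there are no customers; an arriving customer wakes the barber and gets a hair cut.
Here is a direct implementation.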
package main
import "fmt"
import "time"
const (
NUMBER_OF_CHAIRS = 8
)
type Request struct {
id int
ack chan int
}
func barber(waitQ (chan Request)) {
for {
req := <-waitQ
fmt.Printf("BARBER: Serving customer %d \n", req.id)
time.Sleep(1 * 1e9)
fmt.Printf("BARBER: Done with customer %d \n", req.id)
req.ack <- 1
} // for
} // barber
func customer(waitQ (chan Request), id int) {
var ack = make(chan int)
for {
fmt.Printf("CUSTOMER: %d wants hair cut \n", id)
req := Request{id, ack}
waitQ <- req
fmt.Printf("CUSTOMER: %d sits on chair \n", id)
<-ack
fmt.Printf("CUSTOMER: %d served by barber \n", id)
time.Sleep(1 * 1e9)
} // for
} // customer
func main() {
var (
waitQ = make(chan Request, NUMBER_OF_CHAIRS)
)
go customer(waitQ, 1)
go customer(waitQ, 2)
barber(waitQ)
}
The following declaration introduces a synchronous channel.
ch := make(chan int)
Each sender blocks until a receiver arrives. For example, consider concurrent execution of the following program fragments.
ch <- 1
x := <- ch
The following declaration introduces an asynchronous channel.
ch := make(chan int, 2)
The buffer size of the (asynchronous) channel is 2. As long as there is sufficient space in the buffer, a sender won't block (that is, act asynchronously).
For example, consider the concurrent execution of the following program fragments.
ch <- 1
ch <- 2
ch <- 3 // S
x := <- ch // R
The sender will block at program point S until at program point R we have received 1 (channels behave like queues according to the FIFO principle).
Suppose we execute the following program fragments concurrently. Will we run into a deadlock?
ch1 <- 1 // S1
ch2 <- 1 // S2
x := <-ch2 // R2
y := <-ch1 // R1
Depends on the kind of channels. Suppose, we have
ch1 := make(chan int)
ch2 := make(chan int)
The above program will then deadlock because S1 can only be synchronized with R1, but R1 is not reachable because the earlier program point R2 first has to synchronize with S2. S2 is only reachable once S1 can be synchronized.
Suppose, we have
ch1 := make(chan int,1)
ch2 := make(chan int)
The first send at program point S1 will not block and therefore the program runs through (without deadlock).
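The second configuration can be tried out with the following sketch. The function run is a hypothetical wrapper, and we send two different values (1 and 2 instead of 1 and 1) only to make visible which receive obtains which value.

```go
package main

import "fmt"

// run executes the sender fragment (S1, S2) concurrently with the
// receiver fragment (R2, R1) on the given channels.
func run(ch1, ch2 chan int) (x, y int) {
	go func() {
		ch1 <- 1 // S1
		ch2 <- 2 // S2
	}()
	x = <-ch2 // R2
	y = <-ch1 // R1
	return x, y
}

func main() {
	// ch1 has buffer size 1, so S1 does not block and S2 can meet R2.
	fmt.Println(run(make(chan int, 1), make(chan int))) // 2 1
	// With two bufferless channels, run(make(chan int), make(chan int))
	// would block forever as explained above.
}
```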
Suppose we want to implement a news reader, receiving information from various sources (Reuters, Bloomberg etc). In which order shall we receive the news? It is difficult to specify a fixed order. What if we first choose Reuters but for some reason Reuters has a temporary black-out? Well, then let's first choose Bloomberg. Er, same problem.
Let's have two threads, collecting news from Reuters and Bloomberg. Each thread passes on the news to a common channel which will be read by our news reader.
The thread taking care of Reuters:
for {
x:= <-chReuters
chNews <- x
}
The thread taking care of Bloomberg:
for {
x:= <-chBloomberg
chNews <- x
}
Finally, our news reader:
for {
someNews := <- chNews
}
Issue:
chNews := make(chan string)
go func() { x:= <-chReuters
chNews <- x}()
go func() { x:= <-chBloomberg
chNews <- x}()
someNews := <- chNews
So, there is a race among the two threads. As we are only interested in a single message, only one of the threads, the 'winning' thread, will be able to transfer its message. The 'losing' thread takes its message out of its source channel (chReuters or chBloomberg) but as there is no receiver left on the news channel, this message will never be delivered and is therefore lost. This matters, for example, if there are several clients, each only interested in a single message from either Reuters or Bloomberg.
Hence, the 'losing' thread would need to be informed to transfer back the unnecessarily consumed message. This requires a rather complicated protocol and also destroys the order in which messages are processed.
Good news. GoLang supports the non-deterministic selection over multiple events (i.e. send/receive). We can simply say
select {
case x := <-chReuters: // R
case y := <-chBloomberg: // B
}
select blocks if a value has been transmitted neither over chReuters nor over chBloomberg.
If a value has been transmitted via chReuters, select will choose R.
If a value has been transmitted via chBloomberg, select will choose B.
What if a value has been transmitted via chReuters and chBloomberg?
select will non-deterministically choose among R or B.
The order of the case clauses does not matter.
select provides a fairness guarantee. This is useful in case there is high traffic over, say, chReuters because we will still sometimes choose B.
What if we prefer to read Reuters? We can use a nested select statement with a default case.
select {
case x := <-chReuters: // R
case y := <-chBloomberg: select {
case x2 := <-chReuters: // prefer Reuters
default: // Bloomberg if no Reuters news
}
}
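A different and commonly seen way to express a preference, shown here as a sketch and not as the approach taken in these notes, is to first try the preferred channel alone with a default case and only then wait on both channels. The function name preferReuters is hypothetical.

```go
package main

import "fmt"

// preferReuters returns a Reuters message if one is available right now.
// Otherwise it waits for whichever source delivers first.
func preferReuters(chReuters, chBloomberg chan string) string {
	select {
	case x := <-chReuters: // Reuters news is ready: take it
		return x
	default: // no Reuters news at the moment
	}
	select {
	case x := <-chReuters:
		return x
	case y := <-chBloomberg:
		return y
	}
}

func main() {
	chReuters := make(chan string, 1)
	chBloomberg := make(chan string, 1)
	chReuters <- "REUTERS"
	chBloomberg <- "BLOOMBERG"
	fmt.Println(preferReuters(chReuters, chBloomberg)) // REUTERS
	fmt.Println(preferReuters(chReuters, chBloomberg)) // BLOOMBERG
}
```

In contrast to the nested select, no Bloomberg message is consumed before we know whether Reuters news is available.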
The default case will only be chosen if all other cases would block.
Concurrency basics: Threading and channels in GoLang
How to model/implement a Semaphore?
Your Semaphore implementation probably relies on a buffered channel. Could you implement a Semaphore with a synchronous channel only?
Sample solution mutex
package main
import "fmt"
type Mutex (chan int)
func newMutex() Mutex {
var ch = make(chan int, 1)
return ch
}
func lock(m Mutex) {
m <- 1
}
func unlock(m Mutex) {
<-m
}
func main() {
var m Mutex
m = newMutex()
lock(m)
fmt.Print("locked\n")
unlock(m)
fmt.Print("done\n")
}
Write a program which will deadlock
We can almost trivially model/implement semaphores via a buffered channel.
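A minimal sketch of the buffered-channel variant follows. The names Semaphore, acquire, release and the measuring function peakInside are assumptions made for this illustration. The buffer size is the quantity of the semaphore: acquire fills a slot and blocks once all slots are taken, and release frees a slot.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type Semaphore (chan int)

func newSemaphore(n int) Semaphore { return make(chan int, n) }
func acquire(s Semaphore)          { s <- 1 } // blocks if n slots are taken
func release(s Semaphore)          { <-s }

// peakInside runs the given number of workers under a semaphore with
// quantity n and reports how many were inside at the same time at most.
func peakInside(n, workers int) int {
	s := newSemaphore(n)
	var mu sync.Mutex // protects inside and peak
	var wg sync.WaitGroup
	inside, peak := 0, 0
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			acquire(s)
			mu.Lock()
			inside++
			if inside > peak {
				peak = inside
			}
			mu.Unlock()
			time.Sleep(time.Millisecond) // simulate some work
			mu.Lock()
			inside--
			mu.Unlock()
			release(s)
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println(peakInside(3, 10) <= 3) // true
}
```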
Can we implement semaphores with just synchronous (bufferless) channels? Yes!
For simplicity, we consider a mutable variable which roughly corresponds to a semaphore with quantity one. The MVar interface shall be as follows.
type MVar (chan int)
func newMVar(x int) MVar
func takeMVar(m MVar) int
func putMVar(m MVar, x int)
takeMVar corresponds to receive.
putMVar corresponds to send.
Let's try to implement an MVar via a synchronous (bufferless) channel. Our first try has a bug which will be fixed later.
package main
import "fmt"
import "time"
type MVar (chan int)
func newMVar(x int) MVar {
var ch = make(chan int)
go func() { ch <- x }()
return ch
}
Trick: newMVar won't block and returns the MVar.
Remaining operations are then trivial.
func takeMVar(m MVar) int {
var x int
x = <-m
return x
}
func putMVar(m MVar, x int) {
m <- x
}
But there's a problem.
The above won't work in case of subsequent takes and puts in the same thread.
Consider
var m MVar
m = newMVar(1) // Full
takeMVar(m) // Empty
putMVar(m, 2) // Full
The above program text is annotated with the desired MVar state (full or empty). Recall that the initial filling of the MVar happens in a concurrent thread. Hence, the call newMVar(1) won't block.
The actual problem is that takeMVar and putMVar are built upon synchronous send/receive operations. Recall that we use here a channel without buffer! Once takeMVar has received the initial value, no thread is left that could receive from the channel, so putMVar(m, 2) blocks forever.
So what could we do to fix the problem?
const (
Empty = 0
Full = 1
)
func newMVar(x int) MVar {
var ch = make(chan int)
go func() {
var state = Full
var elem int = x
for {
switch {
case state == Full:
ch <- elem // FULL
state = Empty
case state == Empty:
elem = <-ch // EMPTY
state = Full
}
}
}()
return ch
}
Each newMVar call starts an "asynchronous" worker thread which controls the MVar state and helps to unblock takeMVar and putMVar calls.
No changes are required to takeMVar and putMVar.
func takeMVar(m MVar) int {
var x int
x = <-m // TAKE
return x
}
func putMVar(m MVar, x int) {
m <- x // PUT
}
Let's consider a few use cases. We will refer to the (above) annotated program positions FULL, EMPTY, TAKE, PUT.
m := newMVar(1)
x := takeMVar(m)
putMVar(m, x+1)
The worker thread started by the newMVar call blocks at position FULL. Our main thread of course continues.
The takeMVar call reaches position TAKE and can synchronize with the concurrent worker thread.
Similarly for the putMVar call. Position PUT will synchronize with position EMPTY.
Here's another example which involves three user threads (recall there's a fourth worker thread created by newMVar).
m := newMVar(1) // P1
// THREAD 1
go func() { x := takeMVar(m) // P2
y := takeMVar(m) // P3
}()
// THREAD 2
go func() { putMVar(m,2) // P4
}()
A possible execution schedule is as follows.
In thread 1, the first takeMVar call, at position P2, synchronizes with the worker thread, at position FULL, and therefore assigns 1 to x. Then, the thread blocks when executing the second takeMVar call, at position P3.
In thread 2, the putMVar call, at position P4, synchronizes with the worker thread, at position EMPTY.
The takeMVar call at position P3 unblocks and we assign 2 to y.
In theory, another execution schedule is possible.
The takeMVar call at position P2 synchronizes directly with the putMVar call at position P4 (and not with the worker thread). Hence, 2 is assigned to x.
The takeMVar call at position P3 then synchronizes with the worker thread, at position FULL, and 1 is assigned to y.
The conclusion is that a newMVar can be "overtaken" by a subsequent putMVar executed in some concurrent thread. A takeMVar can pick up the value from putMVar instead of newMVar. This may seem a little strange at first but there's really nothing wrong here. (In practice, it'll be quite tricky to create this specific situation).
Our revised MVar implementation is correct and simply follows a slightly more general semantics where the initial "write" by newMVar might not necessarily be picked up by the "first" takeMVar. We can argue that in a concurrent world such a thing as "first" is difficult to test, to observe and to guarantee.
Also note that two concurrent takeMVar and putMVar calls may react with each other without changing the MVar state. That is, without interacting with the worker thread.
package main
import "fmt"
import "time"
type MVar (chan int)
const (
Empty = 0
Full = 1
)
func newMVar(x int) MVar {
var ch = make(chan int)
go func() {
var state = Full
var elem int = x
for {
switch {
case state == Full:
ch <- elem
state = Empty
case state == Empty:
elem = <-ch
state = Full
}
}
}()
return ch
}
func takeMVar(m MVar) int {
var x int
x = <-m
return x
}
func putMVar(m MVar, x int) {
m <- x
}
func producer(m MVar) {
var x int = 1
for {
time.Sleep(1 * 1e9)
putMVar(m, x)
x++
}
}
func consumer(m MVar) {
for {
var x int = takeMVar(m)
fmt.Printf("Received %d \n", x)
}
}
func main() {
test1()
} // main
func test1() {
var m MVar
m = newMVar(1)
takeMVar(m)
putMVar(m, 2)
fmt.Print("done\n") // reached with the revised newMVar; with the first attempt putMVar blocks forever
}
func test2() {
var m MVar
m = newMVar(1)
go producer(m)
consumer(m)
}
// almost impossible to create a situation
// where a subsequent putMVar overtakes the initial 'put' of newMVar
func test() {
var m MVar
m = newMVar(1)
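// wrap the call in a function literal: the arguments of a go statement
// are evaluated in the calling thread, so takeMVar would otherwise run in test itself
go func() { fmt.Printf("Take1 %d \n", takeMVar(m)) }()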
go putMVar(m, 2)
fmt.Printf("Take2 %d \n", takeMVar(m))
time.Sleep(1 * 1e9)
}
Consider a barber shop where there is only one barber. When there are no customers the barber sleeps. When a customer arrives, the barber wakes up and cuts the customer's hair. In case of several customers arriving the barber chooses one randomly.
The following variations/extensions exist:
There are n waiting chairs. If the barber is busy, the customer takes a seat. If there are no seats left, the customer leaves (and tries again after a certain amount of time)
There are several barbers serving customers
Below is a sample solution for the first variant.
Play around with the sample solution (change the number of customers, waiting chairs)
Attempt the second variant (several barbers)
package main
import "fmt"
import "time"
const (
NUMBER_OF_CHAIRS = 8
)
type Request struct {
id int
ack chan int
}
func barber(waitQ (chan Request)) {
for {
req := <-waitQ
fmt.Printf("BARBER: Serving customer %d \n", req.id)
time.Sleep(1 * 1e9)
fmt.Printf("BARBER: Done with customer %d \n", req.id)
req.ack <- 1
} // for
} // barber
func customer(waitQ (chan Request), id int) {
var ack = make(chan int)
for {
fmt.Printf("CUSTOMER: %d wants hair cut \n", id)
req := Request{id, ack}
waitQ <- req
fmt.Printf("CUSTOMER: %d sits on chair \n", id)
<-ack
fmt.Printf("CUSTOMER: %d served by barber \n", id)
time.Sleep(1 * 1e9)
} // for
} // customer
func main() {
var (
waitQ = make(chan Request, NUMBER_OF_CHAIRS)
)
go customer(waitQ, 1)
go customer(waitQ, 2)
barber(waitQ)
}
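In the sample solution a customer blocks at waitQ <- req when all chairs are taken. The "customer leaves if there are no seats left" behaviour could be sketched with a select statement and a default case. The helper tryEnter is a hypothetical name introduced for this illustration.

```go
package main

import "fmt"

type Request struct {
	id  int
	ack chan int
}

// tryEnter puts the request into the waiting queue if a chair is free.
// It returns false immediately if all chairs are taken.
func tryEnter(waitQ chan Request, req Request) bool {
	select {
	case waitQ <- req: // a chair is free
		return true
	default: // the send would block: no free chair
		return false
	}
}

func main() {
	waitQ := make(chan Request, 2) // two waiting chairs, no barber at work
	for id := 1; id <= 3; id++ {
		if tryEnter(waitQ, Request{id, make(chan int)}) {
			fmt.Printf("CUSTOMER: %d sits on chair \n", id)
		} else {
			fmt.Printf("CUSTOMER: %d leaves, no free chair \n", id)
		}
	}
}
```

In the customer loop, a false result would be followed by a Sleep before the customer tries again.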