Part 3: Concurrency

Martin Sulzmann

Concurrency in Go (golang)

Concurrency versus Parallelism

Concurrency (goroutine)

Concurrent execution: "just say go"

package main

import "fmt"
import "time"

func thread(s string) {
    for {
        fmt.Print(s)
        time.Sleep(1 * 1e9)
    }
}

func main() {

    go thread("A")
    go thread("B")
    thread("C")
}

In Go we find cooperative multi-threading. That is, a thread executes until it reaches a blocking statement (such as Sleep, a receive on an empty channel, ...).

Let's consider a sample execution of the above example.

Each thread is in one of the following states:

* Running
* Waiting
* Blocked

    Main.Running

--> (Main.Running, A.Waiting)

--> (Main.Running, A.Waiting, B.Waiting)

--> (Main.Blocked, A.Waiting, B.Waiting)

...

--> (Main.Blocked, A.Waiting, B.Waiting)

--> (Main.Blocked, A.Running, B.Waiting)

--> (Main.Waiting, A.Blocked, B.Waiting)

...

--> (Main.Waiting, A.Blocked, B.Waiting)

--> (Main.Waiting, A.Blocked, B.Running)

and so on ...

Concurrency Issues

Concurrent programming is a tricky business. Consider the following example.

package main

import "fmt"
import "time"

type position struct {
    x int
    y int
}

func main() {
    var p position

    // Producer 1
    go func() {
        p.x = 1
        p.y = 2
    }()

    // Producer 2
    go func() {
        p.x = 2
        p.y = 3
    }()

    // Consumer
    go func() {
        time.Sleep(1 * 1e9)
        x1 := p.x
        y1 := p.y
        fmt.Printf("consumed (%d,%d) \n", x1, y1)
    }()

}

A consumer thread waits for x-y coordinates supplied by either the producer 1 or the producer 2 thread.

There are several issues:

* Both producers write to p.x and p.y, and the consumer reads them, without any synchronization: the program contains data races.
* The consumer may observe a mix of both updates, e.g. the x-coordinate of producer 1 together with the y-coordinate of producer 2.
* The consumer merely sleeps and hopes that some producer has finished by then; there is no guarantee.
* The main thread may terminate before any of the goroutines has run at all.

Next, we take a look at channel-based communication in Go for thread synchronization and data race free exchange of information.
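As a preview, here is a minimal sketch of how the same exchange can be done race-free with a channel: a complete position travels as a single value over the channel, so no shared memory is accessed concurrently. The details of channels follow in the next sections.

package main

import "fmt"

type position struct {
    x int
    y int
}

func main() {
    ch := make(chan position)

    // Producer 1
    go func() { ch <- position{1, 2} }()

    // Producer 2
    go func() { ch <- position{2, 3} }()

    // Consumer: receives one complete position from whichever producer is ready first.
    p := <-ch
    fmt.Printf("consumed (%d,%d) \n", p.x, p.y)
}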

Communication ("channels")

var ch chan int          // Declare a channel carrying int values
ch = make(chan int)      // No buffer

ch = make(chan int, 50)  // Buffer size 50

ch <- y

Send value y over channel ch

x = <- ch

Receive from channel ch and store the received value in x.

package main

import "fmt"
import "time"

func snd(ch chan int) {
    var x int = 0
    for {
        x++
        ch <- x
        time.Sleep(1 * 1e9)
    }

}

func rcv(ch chan int) {
    var x int
    for {
        x = <-ch
        fmt.Printf("received %d \n", x)

    }

}

func main() {
    var ch chan int = make(chan int)
    go snd(ch)
    rcv(ch)

}

Sending only

func snd(ch chan<- int) {
 ...
}

Receiving only

func rcv(ch <-chan int) {
 ...
}
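For illustration, here is a sketch of the snd/rcv program from above with restricted parameter types; a bidirectional channel is implicitly converted at the call site, and the behaviour is unchanged (only the changed lines are shown).

func snd(ch chan<- int) { // snd may only send on ch
    // ... body as before ...
}

func rcv(ch <-chan int) { // rcv may only receive from ch
    // ... body as before ...
}

func main() {
    var ch chan int = make(chan int) // bidirectional channel
    go snd(ch)                       // passed as chan<- int
    rcv(ch)                          // passed as <-chan int
}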

Sample execution

    rcv.Running

--> (rcv.Running, snd.Waiting)

--> (rcv.Blocked_(<-ch?), snd.Waiting)

--> (rcv.Blocked_(<-ch?), snd.Running)

--> (rcv.Blocked_(<-ch?), snd.Blocked_(ch<-1?))

--> (rcv.Waiting, snd.Waiting)

--> (rcv.Running, snd.Waiting)

...

We consider the following variant.

func main() {
    var ch chan int = make(chan int, 1) // buffered channel (size 1)
    go snd(ch)
    rcv(ch)

}
    rcv.Running

--> (rcv.Running, snd.Waiting)

--> (rcv.Blocked_(<-ch?), snd.Waiting)

--> (rcv.Blocked_(<-ch?), snd.Running)

     // after execution of ch <- 1
     // the empty buffer becomes full
     // snd thread keeps running
  
--> (rcv.Blocked_(<-ch?), snd.Blocked_(Sleep(1s)?))

--> (rcv.Waiting, snd.Blocked_(Sleep(1s)?))

    // channel buffer empty

--> (rcv.Running, snd.Blocked_(Sleep(1s)?))

...

As another variant, consider a channel with buffer size 1 and snd without Sleep.

func snd(ch chan int) {
    var x int = 0
    for {
        x++
        ch <- x
    }

}
    rcv.Running

--> (rcv.Running, snd.Waiting)

--> (rcv.Blocked_(<-ch?), snd.Waiting)

--> (rcv.Blocked_(<-ch?), snd.Running)

     // after execution of ch <- 1
     // the empty buffer becomes full
     // snd thread keeps running

--> (rcv.Blocked_(<-ch?), snd.Blocked_(ch<-2?))

     // Two possibilities
     // (a) rcv reads from channel, or
     // (b) directly synchronizes with snd
     //
     // Go run-time follows case (a)

--> (rcv.Running, snd.Blocked_(ch<-2?))

     // Channel buffer empty

--> (rcv.Blocked_(<-ch?), snd.Blocked_(ch<-2?))

     // Two possibilities
     // (a) snd writes into channel, or
     // (b) directly synchronizes with rcv
     //
     // Go run-time follows case (a)

Observations:

* With an unbuffered channel, snd and rcv proceed in lock-step: every send must synchronize with a matching receive.
* With a buffer of size 1, the sender can run one value ahead of the receiver and only blocks once the buffer is full.
* In both cases the values arrive in the order in which they were sent.

Extended examples

package main

import "fmt"

func snd(ch chan int) {
    var x int = 0
    for {
        x++
        ch <- x
    }

}

func rcv(s string, ch chan int) {
    var x int
    for {
        x = <-ch
        fmt.Printf("received %s %d \n", s, x)

    }

}

func main() {
    var ch chan int = make(chan int)
    go snd(ch)
    go rcv("A", ch)
    rcv("B", ch)

}

Sample run (MacBook Air '15, OS X 10.11)

received A 2 
received A 3 
received A 4 
received A 5 
received A 6 
received A 7 
received A 8 
received A 9 
received A 10 
received B 1 
received A 11 
received A 12 
received B 13 
received B 15 
received B 16 
received B 17 
received B 18 
received B 19 
received B 20 
received B 21 
received B 22 
received B 23 
received B 24 
received B 25 
received B 26 
received B 27 
received B 28 
received B 29 
received B 30 
received B 31 
received B 32 
received B 33 
received B 34 
received B 35 
received B 36 
received B 37 
received B 38 
received B 39 
received B 40 
received B 41 
received B 42 
received B 43 
received B 44 
received B 45 
received B 46 
received B 47 
received B 48 
received B 49 
received B 50 
received B 51 
received B 52 
received B 53 
received B 54 
received B 55 
received B 56 
received B 57 
received B 58 
received B 59 
received B 60 
received B 61 
received B 62 
received B 63 
received B 64 
received B 65 
received B 66 
received B 67 
received A 14 
received B 68 
received B 70 
received B 71 
received B 72 
received B 73 
received B 74 
received B 75 
received B 76 
received B 77 
received B 78 
received A 69 
received B 79 
received A 80 
received B 81 
received B 83 
received B 84 
received B 85 
received B 86 
received B 87 
received B 88 

FYI, on my old MacBook'08, only thread A is receiving.

Synchronous versus asynchronous communication

Exercise: Mutex

Go has support for standard synchronization primitives such as Mutex etc. See http://golang.org/pkg/sync/.
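For comparison, a minimal sketch using the standard sync.Mutex (together with a sync.WaitGroup to wait for all goroutines):

package main

import "fmt"
import "sync"

func main() {
    var mu sync.Mutex
    var wg sync.WaitGroup
    counter := 0

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            mu.Lock()   // enter critical section
            counter++   // protected update of shared state
            mu.Unlock() // leave critical section
            wg.Done()
        }()
    }
    wg.Wait()
    fmt.Println(counter) // always prints 10
}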

As an exercise, we implement our own Mutex via channels.

type Mutex (chan int)

func newMutex() Mutex {
    var ch = make(chan int, 1)
    return ch
}

func lock(m Mutex) {
    m <- 1
}

func unlock(m Mutex) {
    <-m
}
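A quick usage sketch, assuming the definitions above plus an import of "fmt": two threads increment a shared counter, and lock/unlock guard the critical section.

func incr(m Mutex, counter *int, done chan int) {
    for i := 0; i < 1000; i++ {
        lock(m)
        *counter = *counter + 1 // critical section
        unlock(m)
    }
    done <- 1 // signal completion
}

func main() {
    m := newMutex()
    done := make(chan int)
    counter := 0
    go incr(m, &counter, done)
    go incr(m, &counter, done)
    <-done
    <-done
    fmt.Printf("counter = %d \n", counter) // always 2000
}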

Variation:

Indeterministic choice ("select")

x = <-ch1
y = <-ch2
ch3 <- 1

versus

y = <-ch2
x = <-ch1
ch3 <- 1

Each event may block. How to try each event simultaneously?

select {
  case x = <-ch1: ...
  case y = <-ch2: ...
  case ch3 <- 1:
  // default and timeout possible
}

If one event, say <-ch1, is successful, then the respective case will be executed.

If several events are successful, one of them is chosen non-deterministically.
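For illustration, the default and timeout variants mentioned in the comment look roughly as follows (a sketch, reusing x and ch1 from above and assuming the fmt and time packages are imported).

// Non-blocking attempt: default fires immediately if no other case is ready.
select {
case x = <-ch1:
    fmt.Printf("received %d \n", x)
default:
    fmt.Println("no value available right now")
}

// Receive with timeout via time.After.
select {
case x = <-ch1:
    fmt.Printf("received %d \n", x)
case <-time.After(1 * time.Second):
    fmt.Println("timed out")
}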

In theory, select can be encoded with just threads and channels. However, a faithful encoding is far from trivial.

In the following, we consider a bunch of examples to illustrate the workings and expressive power of select.

Example: Receive over two channels

package main

import "fmt"
import "time"
import "math/rand"

func snd(ch chan int) {
    var x int = 0
    for {
        x++
        ch <- x
        if rand.Int()%2 == 0 {
            time.Sleep(1 * 1e9)
        } else {
            time.Sleep(5 * 1e9)
        }
    }

}

func rcv(ch1 chan int, ch2 chan int) {
    var x int
    for {
        select {
        case x = <-ch1:
            fmt.Printf("received ch1 %d \n", x)
        case x = <-ch2:
            fmt.Printf("received ch2 %d \n", x)
        }

    }

}

func main() {
    var ch1 chan int = make(chan int)
    var ch2 chan int = make(chan int)
    go snd(ch1)
    go snd(ch2)
    rcv(ch1, ch2)

}

Example: Newsreader

This example shows that an encoding of select with threads + channels is far from trivial.

package main

import "fmt"

func reuters(ch chan string) {
    ch <- "REUTERS"

}

func bloomberg(ch chan string) {
    ch <- "BLOOMBERG"

}

func newsReaderWithThreads(reutersCh chan string, bloombergCh chan string) {
    ch := make(chan string)

    go func() {
        ch <- (<-reutersCh)
    }()

    go func() {
        ch <- (<-bloombergCh)
    }()

    x := <-ch
    fmt.Printf("got news from %s \n", x)

}

func newsReaderWithSelect(reutersCh chan string, bloombergCh chan string) {
    var x string

    select {
    case x = <-reutersCh:
    case x = <-bloombergCh:
    }

    fmt.Printf("got news from %s \n", x)

}

func test1() {
    reutersCh := make(chan string)
    bloombergCh := make(chan string)

    go reuters(reutersCh)
    go bloomberg(bloombergCh)
    newsReaderWithThreads(reutersCh, bloombergCh)
    newsReaderWithThreads(reutersCh, bloombergCh)
}

func test2() {
    reutersCh := make(chan string)
    bloombergCh := make(chan string)

    go reuters(reutersCh)
    go bloomberg(bloombergCh)
    newsReaderWithSelect(reutersCh, bloombergCh)
    newsReaderWithSelect(reutersCh, bloombergCh)
}

func main() {
    test1() // potentially deadlocks
    // test2()
}

Example: Executing several tasks

package main

import "fmt"
import "time"

func task1() { time.Sleep(100 * time.Millisecond) }
func task2() { time.Sleep(200 * time.Millisecond) }
func task3() { time.Sleep(300 * time.Millisecond) }

func barrier() {
    var ch = make(chan int)
    // run all three tasks concurrently
    go func() {
        task1()
        ch <- 1 // signal done
    }()
    go func() {
        task2()
        ch <- 1
    }()
    go func() {
        task3()
        ch <- 1
    }()

    // collect results concurrently
    // set timeout to guarantee that within 500ms we're done
    timeout := time.After(500 * time.Millisecond)
    for i := 0; i < 3; i++ {
        select {
        case <-ch:
        case <-timeout:
            fmt.Println("timed out")
            return
        }

    }
    fmt.Println("done")
}


func main() {
    barrier()
}

Challenge

In the above, we guarantee that all tasks complete within a certain time frame. In addition, ensure that each individual task completes within a certain time frame of its own.

Here's a possible attempt where we only show the relevant program parts.

    timeout := time.After(500 * time.Millisecond)
    for i := 0; i < 3; i++ {
        timeoutEach := time.After(100 * time.Millisecond)
        select {
        case <-ch:
        case <-timeout:
            fmt.Println("timed out (global)")
            return
        case <-timeoutEach:
            fmt.Println("timed out (local)")
            return
        }

    }
    fmt.Println("done")

We introduce a local timeout to keep track of the time spent each round. Notice that timeoutEach will be reset each time we enter the for-loop body.

Will this work?

No, it does not work. We only guarantee that the first task completes within the given time frame. In subsequent rounds we restart timeoutEach, but the remaining tasks have already been running for a while. Hence, there is no guarantee that all tasks respect the time frame imposed on each individual task.

What we require is to monitor each task with its own 'timeout' bound. We need something like this.

func completeWithin(task func(), ms time.Duration) chan bool {
    var ch = make(chan int)
    var res = make(chan bool)
    go func() {
        task()
        ch <- 1
    }()
    t := time.After(ms * time.Millisecond)
    go func() {
        select {
        case <-ch:
            res <- true
        case <-t:
            res <- false
        }
    }()
    return res
}

As we may want to perform this check for several tasks, the function shall be non-blocking. Hence, the result (true = the task completed within the time frame, false = the task took longer) is communicated via a channel, which can be queried later.

    // run all three tasks concurrently
    // must complete within 500ms
    r1 := completeWithin(task1, 500)
    r2 := completeWithin(task2, 500)
    r3 := completeWithin(task3, 500)

    b1 := <-r1
    b2 := <-r2
    b3 := <-r3

    if b1 && b2 && b3 {
        fmt.Println("done")
    } else {
        fmt.Println("timed out")
    }

Deadlock

The Go run-time is able to detect that all goroutines are asleep (i.e. mutually blocked).

We say the program has a deadlock. A deadlock does not necessarily arise in every program run. For example, consider

ch := make(chan int)
go func() { // T1
    <-ch
}()
go func() { // T2
    ch <- 1
}()

ch <- 1

It's possible that thread T1 synchronizes with thread T2 and therefore we run into a deadlock. However, it is also possible that the main thread synchronizes with thread T1 (and no deadlock occurs).

Here's an example for which we encounter a deadlock for all possible program runs.

ch := make(chan int)
ch <- 1

Sample runs

Here's a sample run for which the program runs through.

   (main.Running)
   -> (main.Running, T1.Waiting)
   -> (main.Running, T1.Waiting, T2.Waiting)
   -> (main.Blocked_(ch<-1?), T1.Waiting, T2.Waiting)
   -> (main.Blocked_(ch<-1?), T1.Running, T2.Waiting)
   -> (main.Blocked_(ch<-1?), T1.Blocked_(<-ch?), T2.Waiting)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^
                     synchronize

   -> (main.Waiting, T1.Waiting, T2.Waiting)
   -> (main.Running, T1.Waiting, T2.Waiting)
   -> main terminates, all other threads terminate as well

Here's another run where we encounter a deadlock.

   (main.Running)
   -> (main.Running, T1.Running)
   -> (main.Running, T1.Running, T2.Waiting)
   -> (main.Running, T1.Running, T2.Running)
   -> (main.Blocked_(ch<-1?), T1.Blocked_(<-ch?), T2.Blocked_(ch<-1?))

            either main synchronizes with T1, or
            T1 synchronizes with T2 (we choose the latter option here)

   -> (main.Blocked_(ch<-1?), T1.Waiting, T2.Waiting)
   -> (main.Blocked_(ch<-1?), T1.Running, T2.Running)
   -> (main.Blocked_(ch<-1?))

     T1 and T2 terminate, main thread is still blocked! Hence, deadlock!

Newsreader variant

We know that the newsreader variant with threads (possibly) runs into a deadlock. On my Mac it seems that all runs result in a deadlock. To highlight that not all runs result in a deadlock, we include a sleep statement for one of the worker threads.

Run the code below; in contrast to the earlier variant, no deadlock will (most likely) be observed.


func newsReaderWithThreads(reutersCh chan string, bloombergCh chan string) {
    ch := make(chan string)

    go func() {
        ch <- (<-reutersCh)
    }()

    go func() {
        ch <- (<-bloombergCh)
    }()

    x := <-ch
    fmt.Printf("got news from %s \n", x)

}


func newsReaderWithThreadsAndSleep(reutersCh chan string, bloombergCh chan string) {
    ch := make(chan string)

    go func() {
        ch <- (<-reutersCh)
    }()

    go func() {
        time.Sleep(1 * 1e9)     
        ch <- (<-bloombergCh)
    }()

    x := <-ch
    fmt.Printf("got news from %s \n", x)

}


func main() {
    reutersCh := make(chan string)
    bloombergCh := make(chan string)

    go reuters(reutersCh)
    go bloomberg(bloombergCh)
    newsReaderWithThreadsAndSleep(reutersCh, bloombergCh)
    newsReaderWithThreads(reutersCh, bloombergCh)
}

Channels of channels

Channels are first-class citizens.

Channels can be transmitted over channels.

`var ch chan (chan int)`

A channel where the transmitted values are themselves channels.

Thus, we can encode complex synchronization patterns.

Example

package main

import "fmt"
import "time"

type Request struct {
    id  int
    ack chan int
}

A request consists of an identification number and a channel. The channel is used to acknowledge a successful request.

func worker(req chan Request) {
    var c Request
    for {
        c = <-req
        fmt.Printf("request received from %d \n", c.id)
        time.Sleep(1 * 1e9)
        fmt.Println("notify")
        c.ack <- 1
    }
}

Wait for requests. Acknowledge reception of request via channel ack.

func client(id int, req chan Request) {
    var ack = make(chan int)
    for {
        c := Request{id, ack}
        req <- c
        <-ack
    }

}

Transmit request and wait for acknowledgment by performing a receive on channel ack.

func main() {
    var req = make(chan Request)
    go worker(req)
    go client(1, req)
    client(2, req)
}

Sleeping barber

A classic synchronization problem.

In its most simple form, the sleeping barber problem can be described as follows: there is a single barber; when there are no customers, the barber sleeps; when a customer arrives, the barber wakes up and cuts the customer's hair; further arriving customers wait on one of a limited number of chairs until it is their turn.

Here is a direct implementation.

package main

import "fmt"
import "time"

const (
    NUMBER_OF_CHAIRS = 8
)

type Request struct {
    id  int
    ack chan int
}

func barber(waitQ (chan Request)) {

    for {
        req := <-waitQ
        fmt.Printf("BARBER: Serving customer %d \n", req.id)
        time.Sleep(1 * 1e9)
        fmt.Printf("BARBER: Done with customer %d \n", req.id)
        req.ack <- 1

    } // for

} // barber

func customer(waitQ (chan Request), id int) {
    var ack = make(chan int)
    for {

        fmt.Printf("CUSTOMER: %d wants hair cut \n", id)
        req := Request{id, ack}
        waitQ <- req
        fmt.Printf("CUSTOMER: %d sits on chair \n", id)
        <-ack
        fmt.Printf("CUSTOMER: %d served by barber \n", id)
        time.Sleep(1 * 1e9)

    } // for

} // customer

func main() {

    var (
        waitQ = make(chan Request, NUMBER_OF_CHAIRS)
    )

    go customer(waitQ, 1)
    go customer(waitQ, 2)
    barber(waitQ)

}

Summary

Highlights

* Lightweight threads (goroutines): "just say go"
* Synchronous and asynchronous (buffered) channels for communication and synchronization
* Non-deterministic choice among communication events via select
* Deadlocks and their detection by the Go run-time

Synchronous channels

The following declaration introduces a synchronous channel.

ch := make(chan int)

Each sender blocks until a receiver arrives. For example, consider concurrent execution of
the following program fragments.

ch <- 1
x := <- ch

The send blocks until the receive is executed (and vice versa): both sides synchronize, and the value 1 is assigned to x.

Asynchronous channels

The following declaration introduces an asynchronous channel.

ch := make(chan int, 2)

The buffer size of the (asynchronous) channel is 2. As long as there is sufficient space in the buffer, a sender won't block (that is, it acts asynchronously).

For example, consider the concurrent execution of the following program fragments.

ch <- 1   
ch <- 2
ch <- 3  // S
x := <- ch   // R

The sender will block at program point S until, at program point R, the value 1 has been received and a buffer slot becomes free (channels behave like queues, i.e. follow the FIFO principle).
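A small runnable illustration of this behaviour (buffer size 2; the values come out in the order in which they were sent):

package main

import "fmt"

func main() {
    ch := make(chan int, 2)
    ch <- 1
    ch <- 2
    // a third send would block here until a receive frees a buffer slot
    fmt.Println(<-ch) // prints 1
    fmt.Println(<-ch) // prints 2
}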

Deadlocks

Suppose we execute the following program fragments concurrently. Will we run into a deadlock?

ch1 <- 1  // S1
ch2 <- 1  // S2
x := <-ch2  // R2  
y := <-ch1  // R1

That depends on the kind of channels. Suppose we have

ch1 := make(chan int)
ch2 := make(chan int)

The above program will then deadlock because the send at S1 blocks (the other fragment is not yet ready to receive on ch1: it first waits at R2), and the receive at R2 blocks as well (the first fragment only reaches S2 once S1 has completed). Both fragments wait for each other forever.

Suppose, we have

ch1 := make(chan int,1)
ch2 := make(chan int)

The first send at program point S1 will not block and therefore the program runs through (without deadlock).
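A minimal runnable sketch of the buffered setting, with the two fragments placed in the main thread and a helper goroutine (the labels correspond to the program points above):

package main

import "fmt"

func main() {
    ch1 := make(chan int, 1) // buffered: the send at S1 does not block
    ch2 := make(chan int)
    done := make(chan int)

    go func() {
        x := <-ch2 // R2
        y := <-ch1 // R1
        fmt.Println(x, y)
        done <- 1
    }()

    ch1 <- 1 // S1: value goes into the buffer
    ch2 <- 1 // S2: synchronizes with R2
    <-done   // wait for the helper; the program runs through, no deadlock
}

Changing ch1 back to an unbuffered channel reproduces the deadlock described above.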

(Non-deterministic) select

Suppose we want to implement a news reader, receiving information from various sources (Reuters, Bloomberg, etc.). In which order shall we receive the news? It is difficult to specify a fixed order. What if we first choose Reuters, but for some reason Reuters has a temporary black-out? Well, then let's first choose Bloomberg. Erm, same problem.

First attempt

Let's have two threads, collecting news from Reuters and Bloomberg. Each thread passes on the news to a common channel which will be read by our news reader.

The thread taking care of Reuters:

for {
  x:= <-chReuters
  chNews <- x
}

The thread taking care of Bloomberg:

for {
  x:= <-chBloomberg
  chNews <- x
}

Finally, our news reader:

for {
  someNews := <- chNews
}

Issue: consider the case where we are only interested in a single message.

chNews := make(chan string)
go func() {
    x := <-chReuters
    chNews <- x
}()
go func() {
    x := <-chBloomberg
    chNews <- x
}()
someNews := <-chNews

So there is a race between the two threads. As we are only interested in a single message, only the 'winning' thread gets to deliver its message. The 'losing' thread has already taken its message out of its source channel, but since there is no further receiver on the news channel, that message will never be delivered and is effectively lost. For example, consider the situation where there are several clients, each only interested in a single message from either Reuters or Bloomberg.

Hence, the 'losing' thread would need to be informed to hand back the unnecessarily consumed message. This requires a rather complicated protocol and also destroys the order in which messages are processed.

Second attempt via non-deterministic select

Good news: Go supports non-deterministic selection over multiple events (i.e. send/receive). We can simply write

select {
  case x := <-chReuters:   // R
  case y := <-chBloomberg: // B
}

Giving preference

What if we prefer to read Reuters? We can use a nested select statement with a default case.

select {
case x := <-chReuters: // prefer Reuters if news is available right now
    // handle Reuters news x
default: // no Reuters news at the moment
    select {
    case x := <-chReuters:
        // handle Reuters news x
    case y := <-chBloomberg:
        // handle Bloomberg news y
    }
}
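A compact runnable sketch of this preference pattern, with pre-filled buffered channels purely for illustration:

package main

import "fmt"

func main() {
    chReuters := make(chan string, 1)
    chBloomberg := make(chan string, 1)
    chReuters <- "REUTERS"
    chBloomberg <- "BLOOMBERG"

    // Both sources have news available; the pattern below picks Reuters first.
    select {
    case x := <-chReuters:
        fmt.Println("got news from", x)
    default:
        // no Reuters news right now, wait for either source
        select {
        case x := <-chReuters:
            fmt.Println("got news from", x)
        case y := <-chBloomberg:
            fmt.Println("got news from", y)
        }
    }
}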

Exercises

Concurrency - Basics

package main

import "fmt"
import "time"
import "math/rand"

func snd(ch chan int) {
    var x int = 0
    for {
        x++
        ch <- x
        if rand.Int()%2 == 0 {
            time.Sleep(1 * 1e9)
        } else {
            time.Sleep(5 * 1e9)
        }
    }

}

func rcv(ch1 chan int, ch2 chan int) {
    var x int
    for {
        select {
        case x = <-ch1:
            fmt.Printf("received ch1 %d \n", x)
        case x = <-ch2:
            fmt.Printf("received ch2 %d \n", x)
        }

    }

}

func main() {
    var ch1 chan int = make(chan int)
    var ch2 chan int = make(chan int)
    go snd(ch1)
    go snd(ch2)
    rcv(ch1, ch2)

}

Concurrency - Mutex, Semaphores

Comments and further questions

Mutex

Sample solution mutex

package main

import "fmt"

type Mutex (chan int)

func newMutex() Mutex {
    var ch = make(chan int, 1)
    return ch
}

func lock(m Mutex) {
    m <- 1
}

func unlock(m Mutex) {
    <-m
}

func main() {
    var m Mutex

    m = newMutex()

    lock(m)
    fmt.Print("locked\n")
    unlock(m)

    fmt.Print("done\n")

}

Write a program which will deadlock

Semaphore/MVar

We can almost trivially model/implement semaphores via a buffered channel.
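For instance, a counting semaphore with n units might be sketched as follows: acquiring a unit sends a token into a buffered channel, releasing takes one out, and the buffer capacity bounds the number of simultaneous holders. The Mutex above is exactly the special case n = 1.

type Semaphore (chan int)

func newSemaphore(n int) Semaphore {
    return make(chan int, n) // room for at most n tokens
}

func acquire(s Semaphore) {
    s <- 1 // blocks once all n units are taken
}

func release(s Semaphore) {
    <-s // frees one unit
}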

Can we implement semaphores with just synchronous (bufferless) channels? Yes!

For simplicity, we consider a mutable variable (MVar) which roughly corresponds to a semaphore with quantity one. The MVar interface shall be as follows.

type MVar (chan int)
func newMVar(x int) MVar
func takeMVar(m MVar) int
func putMVar(m MVar, x int)

Let's try to implement an MVar via a synchronous (bufferless) channel. Our first try has a bug which will be fixed later.

First try (failed attempt)

package main

import "fmt"
import "time"

type MVar (chan int)

func newMVar(x int) MVar {
    var ch = make(chan int)
    go func() { ch <- x }()
    return ch
}

Trick: the initial value is sent by a separate helper goroutine, so the call newMVar(1) itself does not block.

Remaining operations are then trivial.

func takeMVar(m MVar) int {
    var x int
    x = <-m
    return x
}

func putMVar(m MVar, x int) {
    m <- x
}

But there's a problem.

The above won't work in case of subsequent takes and puts in the same thread.

Consider

    var m MVar
    m = newMVar(1)   // Full
    takeMVar(m)      // Empty
    putMVar(m, 2)    // Full

The above program text is annotated with the desired MVar state (full or empty). Recall that the initial filling of the MVar happens in a concurrent thread. Hence, the call newMVar(1) won't block.

The actual problem is that takeMVar and putMVar are built upon synchronous send/receive operations. Recall that we use a channel without a buffer here! The takeMVar call synchronizes with the helper goroutine created by newMVar, but the subsequent putMVar then blocks forever: nobody is receiving on the channel.

So what could we do to fix the problem?

Concurrent thread to control MVar state

const (
    Empty = 0
    Full  = 1
)

func newMVar(x int) MVar {
    var ch = make(chan int)
    go func() {
        var state = Full
        var elem int = x
        for {
            switch {
            case state == Full:
                ch <- elem     // FULL
                state = Empty
            case state == Empty:
                elem = <-ch    // EMPTY
                state = Full
            }
        }
    }()
    return ch
}

Each newMVar call starts an "asynchronous" worker thread which controls the MVar state and helps to unblock takeMVar and putMVar calls.

No changes are required to takeMVar and putMVar.

func takeMVar(m MVar) int {
    var x int
    x = <-m      // TAKE
    return x
}

func putMVar(m MVar, x int) {
    m <- x      // PUT
}

Let's consider a few use cases. We will refer to the (above) annotated program positions FULL, EMPTY, TAKE, PUT.

 m := newMVar(1)
 x := takeMVar(m)
 putMVar(m, x+1)

  1. The worker thread created by the newMVar call blocks at position FULL. Our main thread of course continues.
  2. The takeMVar calls reaches position TAKE and can synchronize with the concurrent worker thread.
  3. Both threads (main and worker) continue.
  4. The worker thread will block at position EMPTY.
  5. The main thread executes the putMVar call. Position PUT will synchronize with position EMPTY.

Here's another example which involves three user threads (recall there's a fourth worker thread created by newMVar).

 m := newMVar(1)                     // P1
    // THREAD 1
 go func() { x := takeMVar(m)        // P2
             y := takeMVar(m)        // P3
           }()
    // THREAD 2
 go func() { putMVar(m,2)            // P4
           }()

A possible execution schedule is as follows.

  1. In THREAD 1, the first takeMVar, at position P2, synchronizes with the worker thread, at position FULL, and therefore assigns 1 to x. Then, the thread blocks when executing the second takeMVar call, at position P3.
  2. The putMVar call, at position P4, synchronizes with the worker thread, at position EMPTY.
  3. Subsequently, the takeMVar call at position P3 unblocks and we assign 2 to y.

In theory, another execution schedule is possible.

  1. The worker thread blocks at position FULL.
  2. THREAD 1 blocks at position P2 (position TAKE within takeMVar)
  3. THREAD 2 blocks at position P4 (position PUT within putMVar)
  4. We have two blocked "writers" (worker thread and THREAD 2) and one blocked "reader" (THREAD 1)
  5. Let's assume that THREAD 1 and THREAD 2 synchronize with each other.
  6. Then, 2 is assigned to x and THREAD 1 advances to block again at position P3.
  7. Next, the worker thread and THREAD 1 synchronize with each other.
  8. Then, 1 is assigned to y.

The conclusion is that a newMVar can be "overtaken" by a subsequent putMVar executed in some concurrent thread. A takeMVar can pick up the value from putMVar instead of newMVar. This may seem a little strange at first, but there's really nothing wrong here. (In practice, it'll be quite tricky to create this specific situation.)

Our revised MVar implementation is correct and simply follows a slightly more general semantics where the initial "write" by newMVar might not necessarily be picked up by the "first" takeMVar. We can argue that in a concurrent world such a thing as "first" is difficult to test, to observe and to guarantee.

Also note that a concurrent takeMVar and a concurrent putMVar call may synchronize with each other directly, without changing the MVar state, that is, without interacting with the worker thread.

Complete MVar solution

package main

import "fmt"
import "time"

type MVar (chan int)

const (
    Empty = 0
    Full  = 1
)

func newMVar(x int) MVar {
    var ch = make(chan int)
    go func() {
        var state = Full
        var elem int = x
        for {
            switch {
            case state == Full:
                ch <- elem
                state = Empty
            case state == Empty:
                elem = <-ch
                state = Full
            }
        }
    }()
    return ch
}

func takeMVar(m MVar) int {
    var x int
    x = <-m
    return x
}

func putMVar(m MVar, x int) {
    m <- x
}

func producer(m MVar) {
    var x int = 1
    for {
        time.Sleep(1 * 1e9)
        putMVar(m, x)
        x++
    }
}

func consumer(m MVar) {
    for {
        var x int = takeMVar(m)
        fmt.Printf("Received %d \n", x)
    }
}

func main() {

    test1()

} // main

func test1() {
    var m MVar
    m = newMVar(1)
    takeMVar(m)
    putMVar(m, 2)
    fmt.Print("won't get here")

}

func test2() {
    var m MVar
    m = newMVar(1)
    go producer(m)
    consumer(m)
}

// almost impossible to create a situation
// where a subsequent putMVar overtakes the initial 'put' of newMVar
// (note: the arguments of a go statement, here takeMVar(m), are evaluated
//  in the calling goroutine before the new goroutine is spawned)
func test() {
    var m MVar
    m = newMVar(1)
    go fmt.Printf("Take1 %d \n", takeMVar(m))
    go putMVar(m, 2)
    fmt.Printf("Take2 %d \n", takeMVar(m))
    time.Sleep(1 * 1e9)

}

Concurrency - Sleeping barber

Consider a barber shop where there is only one barber. When there are no customers, the barber sleeps. When a customer arrives, the barber wakes up and cuts the customer's hair. In case several customers arrive, the barber chooses one of them randomly.

The following variations/extensions exist:

Below is a sample solution for the first variant.

package main

import "fmt"
import "time"

const (
    NUMBER_OF_CHAIRS = 8
)

type Request struct {
    id  int
    ack chan int
}

func barber(waitQ (chan Request)) {

    for {
        req := <-waitQ
        fmt.Printf("BARBER: Serving customer %d \n", req.id)
        time.Sleep(1 * 1e9)
        fmt.Printf("BARBER: Done with customer %d \n", req.id)
        req.ack <- 1

    } // for

} // barber

func customer(waitQ (chan Request), id int) {
    var ack = make(chan int)
    for {

        fmt.Printf("CUSTOMER: %d wants hair cut \n", id)
        req := Request{id, ack}
        waitQ <- req
        fmt.Printf("CUSTOMER: %d sits on chair \n", id)
        <-ack
        fmt.Printf("CUSTOMER: %d served by barber \n", id)
        time.Sleep(1 * 1e9)

    } // for

} // customer

func main() {

    var (
        waitQ = make(chan Request, NUMBER_OF_CHAIRS)
    )

    go customer(waitQ, 1)
    go customer(waitQ, 2)
    barber(waitQ)

}