Futures and promises in Haskell and Go

Martin Sulzmann

Overview

Futures and promises in Haskell and Scala

Abstract:

Futures and promises are a high-level concurrency construct to aid the user in writing scalable and correct asynchronous programs. We introduce a simple core language based on which we can derive a rich set of future and promise features. We discuss ways to implement the core features via shared-state concurrency making either use of Software Transactional Memory, an elementary lock-based primitive, or an atomic compare-and-swap operation. The approach has been fully implemented in Haskell and Scala. For both languages, we provide empirical evidence of the effectiveness of our method. We consider program transformations in the context of futures and promises and observe potential problems in existing Scala-based libraries.

Compared to Haskell. Go with generics demands the user to supply explict type instances for polymorphic functions. This can be rather cumbersome.

Futures and promises in Haskell


{-# LANGUAGE GADTs, FlexibleInstances, UndecidableInstances #-}


module Main where

{-

Supplementary material to our PEPM'19 submission.
Some raw Haskell code.

-}

import Data.Time
import Text.Printf

import System.Mem
import System.Environment

import Data.IORef
import Data.List
import Data.Maybe
import Control.Concurrent
import Control.Concurrent.STM



casIORef :: Eq a => IORef a -> a -> a -> IO Bool
casIORef ptr old new =
   atomicModifyIORef ptr (\ cur -> if cur == old
                                   then (new, True)
                                   else (cur, False))

forkIO_ cmd = do forkIO cmd
                 return ()


-- Primitive set of (promise/future) features, specified in terms of a constructor class.
-- Inspired by Scala FP (= Scala's futures and promises).
class Core t where
  newC :: IO (t a)
  getC :: t a -> IO (Maybe a)
  tryCompleteC :: t a -> IO (Maybe a) -> IO Bool
  onCompleteC :: t a -> (Maybe a -> IO ()) -> IO ()



-- | Atomic reference based implementation.
-- Inspired by Scala FP.
-- Invariant: Either promise not yet completed, or list of callbacks non-empty.
-- CAS requires Eq instance, as Either predefined, define our own.
-- Via MVar () we signal if promise has been completed.
data E a b = L a | R b

data CIO a = CIO (IORef (E (Maybe a) [Maybe a -> IO ()]))
                 (MVar ())

-- | Eq required to perform CAS on atomic reference.
-- There is no need to inspect the actual values (value, list of callbacks) because:
-- 1. Once set, a promise can't be overriden.
-- 2. We only add callbacks to the list.
--
-- Requires FlexibleInstances because of the nested pattern (safe use here).
instance Eq (E (Maybe a) [Maybe a -> IO ()]) where
  (==) (L _) (R _) = False
  (==) (R _) (L _) = False
  (==) (L Nothing) (L (Just _)) = False
  (==) (L (Just _)) (L Nothing) = False
  (==) (L (Just _)) (L (Just _)) = True
  (==) (L Nothing) (L Nothing) = True
  (==) (R xs) (R ys) = length xs == length ys

instance Core CIO where
  newC =
    do x <- newIORef (R [])
       y <- newEmptyMVar
       return $ CIO x y

  getC (CIO x y) =
    do _ <- readMVar y
       (L v) <- readIORef x
       return v

  tryCompleteC (CIO x y) m = do
    v <- m
    let go = do
         val <- readIORef x
         case val of
           L _ -> return False
           R hs -> do b <- casIORef x val (L v)
                      if b
                       then do putMVar y ()
                               mapM_ (\h -> forkIO $ h v) hs
                               return True
                       else go
    go

  onCompleteC (CIO x _) h = do
       let go = do val <- readIORef x
                   case val of
                     L v -> do forkIO $ h v
                               return ()
                     R hs -> do b <- casIORef x val (R (h:hs))
                                if b
                                  then return ()
                                  else go
       go



-- | MVar
-- Largely similar to atomic reference impl (no spinning, blocking).
data CMVAR a = CMVAR (MVar (E (Maybe a) [Maybe a -> IO ()]))
                           (MVar ())

instance Core CMVAR where
   newC = do x <- newMVar (R [])
             y <- newEmptyMVar
             return $ CMVAR x y

   getC (CMVAR x y) = do _ <- readMVar y
                         (L v) <- readMVar x
                         return v
   tryCompleteC (CMVAR x y) m = do
        v <- m
        s <- takeMVar x
        case s of
          L _ -> do putMVar x s
                    return False
          R hs -> do putMVar x (L v)
                     putMVar y ()
                     mapM_ (\h -> forkIO $ h v) hs
                     return True

   onCompleteC (CMVAR x y) h = do
       s <- takeMVar x
       case s of
         (L v) -> do putMVar x (L v)
                     forkIO_ $ h v
         (R hs) -> putMVar x (R (h:hs))

-- | STM-based implementation
-- Invariant: Either promise not yet completed, or list of callbacks non-empty.
-- Signaling done via retry.
data CSTM a = CSTM (TVar (E (Maybe a) [Maybe a -> IO ()]))

unPrimSTM (CSTM x) = x

instance Core CSTM where
   newC = do x <- atomically $ newTVar (R []) -- newTVarIO (R [])
             return $ CSTM x

   getC (CSTM x) = atomically $ do s <- readTVar x
                                   case s of
                                     R _ -> retry
                                     L v -> return v

   tryCompleteC (CSTM x) m = do
       v <- m
       action <- atomically $ do s <- readTVar x
                                 case s of
                                    R hs -> do writeTVar x (L v)
                                               return $ do mapM_ (\h -> forkIO $ h v) hs
                                                           return True
                                    L _ -> return (return False)
       action

   onCompleteC (CSTM x) h = do
      action <- atomically $ do s <- readTVar x
                                case s of
                                  (R hs) -> do writeTVar x $ R (h:hs)
                                               return (return ())
                                  (L v) -> return (forkIO_ $ h v)
      action


-- | Futures *and* Promises.
-- We don't make any distinction at this point.
-- A future is  promise and vice versa.
-- At some later stage, we can provide a more refined interface,
-- e.g. for futures we only provide certain operations etc.

class FP t where
  new :: IO (t a)
  trySuccess :: t a -> a -> IO Bool
  tryFail :: t a -> IO Bool
  tryComplete :: t a -> IO (Maybe a) -> IO Bool
  trySuccWith :: t a -> t a -> IO ()
  tryFailWith :: t a -> t a -> IO ()
  tryCompleteWith :: t a -> t a -> IO ()

  future_ :: (() -> IO (Maybe a)) -> IO (t a)
  future :: IO (Maybe a) -> IO (t a)
  get :: t a -> IO (Maybe a)
  onComplete :: t a -> (Maybe a -> IO ()) -> IO ()
  onSuccess :: t a -> (a -> IO ()) -> IO ()
  onFail :: t a -> (() -> IO ()) -> IO ()

  transformWith :: t a -> (Maybe a -> IO (t b)) -> IO (t b)
  transform :: t a -> (Maybe a -> IO (Maybe b)) -> IO (t b)
  followedBy :: t a -> (a -> IO (Maybe b)) -> IO (t b)
  followedByWith :: t a -> (a -> IO (t b)) -> IO (t b)
  guard :: t a -> (a -> IO Bool) -> IO (t a)
  orAlt :: t a -> t a -> IO (t a)
  first :: t a -> t a -> IO (t a)
  firstSucc :: t a -> t a -> IO (t a)



-- Requires UndecidableInstaces as the context constraint Core t is no smaller than the instead head FP t.
-- For our uses, the instance declaration is safe.
-- Requires FlexibleInstances because in instead head FP t, t is a plain variable (safe use here).
instance Core t => FP t where
    new = newC
    trySuccess p x = tryCompleteC p (return $ Just x)
    tryFail p = tryCompleteC p (return Nothing)
    tryComplete = tryCompleteC
    trySuccWith p f =
        onSuccess f (\x -> do trySuccess p x
                              return ())
    tryFailWith p f =
        onFail f (\() -> do tryFail p
                            return ())
    tryCompleteWith p f = do
        onComplete f (\x -> do tryComplete p (return x)
                               return ())
{-
-- creates 2 instead of 1 callback
        trySuccWith p f
        tryFailWith p f
-}

    future_ h = do p <- newC
                   forkIO $ do tryComplete p (h ())
                               return ()
                   return p
    future f = future_ (\() -> f)
    get = getC
    onComplete f h = onCompleteC f h
    onSuccess f h = onComplete  f (\x -> case x of
                                           Nothing -> return ()
                                           Just v -> h v)
    onFail f h = onComplete  f (\x -> case x of
                                           Nothing -> h ()
                                           Just v -> return ())

    transformWith f h = do
        p <- new
        onComplete  f (\x -> do v <- h x
                                tryCompleteWith p v)
        return p
    transform f h = do
        p <- new
        onComplete  f (\x -> do tryComplete p (h x)
                                return ())
        return p
{-

-- 'with' introduces additional callback that might lead to some run-time overhead

       transformWith f (\x -> do p <- newP
                                 tryComplete p (h x)
                                 return p)
-}
    followedBy f h =
       transform f (\x -> case x of
                           Just v -> h v
                           Nothing -> return Nothing)
{-
       followedByWith f (\x -> do p <- newP
                                  tryComplete p (h x)
                                  return p)
-}

    guard f h = followedBy f (\x -> do v <- h x
                                       if v
                                          then return $ Just x
                                          else return Nothing)
    followedByWith f h = do
        transformWith f (\x -> case x of
                                 Just v -> h v
                                 Nothing -> do p <- new
                                               tryFail p
                                               return p)
    orAlt f1 f2 = do
      transformWith f1
        (\x -> case x of
                 Just v -> return f1
                 Nothing -> transform f2
                               (\x -> case x of
                                        Just v -> return x
                                        Nothing -> return Nothing))
{-
    orAlt f1 f2 = do
      transformWith f1
        (\x -> case x of
                 Just v -> return f1
                 Nothing -> transformWith f2
                               (\x -> case x of
                                        Just v -> return f2
                                        Nothing -> return f1))
-}

    first f1 f2 = do
      p <- new
      tryCompleteWith p f1
      tryCompleteWith p f2
      return p
    firstSucc f1 f2 = do
      p <- new
      trySuccWith p f1
      trySuccWith p f2
      return p


-- Just playing. Alternative definition.

{-
    followedBy f h = do p <- newP
                        onComplete  f (\x -> case x of
                                               Just v -> do tryCompleteP p (h v)
                                                            return ()
                                               Nothing -> do tryCompleteP p (return Nothing)
                                                             return ())
                        return p
-}

{-
    followedByWith f h = do
        p <- newP
        onComplete  f (\x -> case x of
                              Just v -> do f' <- h v
                                           tryCompleteWith p f'
                              Nothing -> do tryCompleteP p (return Nothing)
                                            return ())
        return p
-}

{-
    orAlt f1 f2 = do
      future (\() -> do v <- get f1
                        case v of
                           Just{} -> return v
                           Nothing -> get f2)
-}

{-
    orAlt f1 f2 = do
       p <- newP
       onComplete f1
         (\x -> case x of
                  Just v -> do trySuccess p v
                               return ()
                  Nothing -> onComplete f2 (\x -> case x of
                                                    Just v -> do trySuccess p v
                                                                 return ()
                                                    Nothing -> do tryFail p
                                                                  return ()))
       return p
-}


{-

    firstSucc f1 f2 = do
      f3 <- orAlt f1 f2
      f4 <- orAlt f2 f1
      first f3 f3
-}


-- Benchmarks

bio :: CIO a
bio = undefined

bstm :: CSTM a
bstm = undefined

bmvar :: CMVAR a
bmvar = undefined

-- Measures high/low contention
-- q to select candidate (bio, bmvar, bstm)
-- n number of promises
-- m * n number of oncomplete runs
-- k * n number of try completes
-- For each promise pi taken from [p1,...,pn]
--     m times onComplete pi incCount
--     k times tryComplete pi v
--     get pi
-- wait for counter to reach n*m (all onCompletes are processed)
perf1 n m k q = do
         x <- newTVarIO 0
         ps <- mapM (\_ -> newC) [1..n]
         let qs = q:ps -- trick to force type
         let ps' = ps
         let cmd = mapM_ (\p-> onCompleteC p (\_ -> do atomically $ do v <- readTVar x
                                                                       writeTVar x (v+1)
                                                       -- putStrLn "doo"
                                             ))
                                    ps'
         mapM_ (\_ -> forkIO cmd) [1..m]

         let ts = mapM_ (\p -> tryCompleteC p (return $ Just 1)) ps

         mapM_ (\_ -> forkIO ts) [1..k]

         mapM_ getC ps  -- XX
         atomically $ do v <- readTVar x
                         if v < n*m
                          then retry
                          else return ()




-- chain of (nested) oncomplete and tries
--  onComplete p1 (do tryComplete p2
--                    onComplete p2 (do tryComplete p3
--                                      ...            ))
--  tryComplete p1
-- get pn
perf2 n q = do
            ps <- mapM (\_ -> newC) [1..n]
            let qs = q:ps -- trick to force type
            let go (p1:p2:ps) = onCompleteC p1 (\_ -> do tryCompleteC p2 (return $ Just 1)
                                                         go (p2:ps))
                go [p] = onCompleteC p (\_ -> return ())
            go ps
            tryCompleteC (head ps) (return $ Just 1)
            _ <- getC (last ps)
            return ()


-- chain of oncomplete and tries
-- no excessive nesting
-- do onComplete p1 (tryComplete p2)
--    onComplete p2 (tryComplete p3)
--    ...
---   tryComplete p1
perf3 n q = do
            ps <- mapM (\_ -> newC) [1..n]
            let qs = q:ps -- trick to force type
            let go (p1:p2:ps) = do onCompleteC p1 (\_ -> do do tryCompleteC p2 (return $ Just 1)
                                                               return ())
                                   go (p2:ps)
                go [p] = onCompleteC p (\_ -> return ())
            go ps
            tryCompleteC (head ps) (return $ Just 1)
            _ <- getC (last ps)
            return ()


execTest test = do performGC
                   start <- getCurrentTime
                   test
                   fin <- getCurrentTime
                   let result = diffUTCTime fin start
                   printf "time: %.2fs\n" (realToFrac result :: Double)
                   return (realToFrac result :: Double)


runTest n test = do rs <- mapM (\_ -> execTest test) [1..n]
                    let xs = sort rs
                    let low = head xs
                    let high = last xs
                    let m :: Double
                        m = fromIntegral (length xs)
                    let av = sum xs / m
                    printf "low: %.2fs high: %.2fs avrg: %.2fs\n" low high av


runAll n t1 t2 t3 = do
                   putStrLn "CIO"
                   runTest n t1
                   putStrLn "CMVAR"
                   runTest n t2
                   putStrLn "CSTM"
                   runTest n t3


test1 = runAll 10 (perf1 10000 100 200 bio)
                 (perf1 10000 100 200 bmvar)
                 (perf1 10000 100 200 bstm)

test2 = runAll 10 (perf1 100000 20 2 bio)
                 (perf1 100000 20 2 bmvar)
                 (perf1 100000 20 2 bstm)

test3 = do let n = 2000000
           runAll 10 (perf2 n bio)
                     (perf2 n bmvar)
                     (perf2 n bstm)

test4 = do let n = 2000000
           runAll 10 (perf3 n bio)
                     (perf3 n bmvar)
                     (perf3 n bstm)

main = do args <- getArgs
          let t = head args
          case t of
            "1" -> do print "Test 1"
                      test1
            "2" -> do print "Test 2"
                      test2
            "3" -> do print "Test 3"
                      test3
            "4" -> do print "Test 4"
                      test4

“MVar”-based Futures and promises in plain Go

package main

// Direct Go implementation of "Futures and promises in Haskell and Scala"
//  (https://www.researchgate.net/publication/330066278_Futures_and_promises_in_Haskell_and_Scala)
//
// Recent versions of Go support "generics" (parametric polymorphism).
// Below we give an implementation that does not exploit parametric polymorphism.
// Not having parametric polymorphism can be a serious limitation for building EDSLs.
// The Go types are less descriptive. We need to resort to "interface{}" and type casts.
// This might introduce bugs in user code and in the EDSL implementation.

import "fmt"
import "time"

// Maybe data type

type Maybe struct {
    val     interface{}
    nothing bool
}

func mkJust(x interface{}) Maybe {
    return Maybe{x, false}
}

func mkNothing() Maybe {
    return Maybe{nothing: true}
}

func (m Maybe) isJust() (r interface{}, b bool) {
    r = m.val
    b = !m.nothing

    return r, b
}

func (m Maybe) isNothing() bool {
    return m.nothing
}

// Either data type

type Either struct {
    left        interface{}
    right       interface{}
    leftOrRight bool // true => left, false => right
}

func mkLeft(x interface{}) Either {
    return Either{left: x, leftOrRight: true}
}

func mkRight(x interface{}) Either {
    return Either{right: x, leftOrRight: false}
}

func (e Either) isLeft() (r interface{}, b bool) {
    r = e.left
    b = e.leftOrRight

    return r, b
}

func (e Either) isRight() (r interface{}, b bool) {
    r = e.right
    b = !e.leftOrRight

    return r, b
}

// Channel-based MVar

type MVar chan (interface{})

func newMVar(x interface{}) MVar {
    ch := make(chan interface{}, 1)
    ch <- x
    return ch
}

func newEmptyMVar() MVar {
    ch := make(chan interface{}, 1)
    return ch
}

func (m MVar) takeMVar() interface{} {
    return <-m
}

func (m MVar) putMVar(x interface{}) {
    m <- x
}

func (m MVar) readMVar() interface{} {
    x := m.takeMVar()
    m.putMVar(x)
    return x
}

// Future/promise MVar-based core implementation
// CMVar = Core MVar-based

type CMVar struct {
    valOrCallbacks MVar
    signal         MVar
}

func newC() CMVar {
    x := newMVar(mkRight(make([]func(Maybe), 0)))
    y := newEmptyMVar()

    return CMVar{x, y}
}

func (c CMVar) getC() Maybe {
    c.signal.readMVar()
    r, _ := c.valOrCallbacks.readMVar().(Either).isLeft()

    return r.(Maybe)
}

func (c CMVar) tryCompleteC(f func() Maybe) bool {
    var b bool
    v := f()
    s := c.valOrCallbacks.takeMVar()
    hs, right := s.(Either).isRight()

    switch {
    case right == false:
        c.valOrCallbacks.putMVar(s)
        b = false
    case right == true:
        c.valOrCallbacks.putMVar(mkLeft(v))
        c.signal.putMVar(1)
        for _, h := range hs.([]func(Maybe)) {
            go h(v)
        }
        b = true

    }
    return b
}

func (c CMVar) onComplete(h func(Maybe)) {
    s := c.valOrCallbacks.takeMVar()
    v, left := s.(Either).isLeft()
    hs, right := s.(Either).isRight()

    switch {
    case left == true:
        c.valOrCallbacks.putMVar(s)
        go h(v.(Maybe))
    case right == true:
        hs = append(hs.([]func(Maybe)), h)
        c.valOrCallbacks.putMVar(mkRight(hs))

    }

}

// Example

func example() {

    p := newC()

    go p.tryCompleteC(func() Maybe {
        time.Sleep(1 * time.Second)
        return mkNothing()

    })

    p.onComplete(func(x Maybe) {
        _, something := x.isJust()
        switch {
        case something == true:
            fmt.Printf("\n something")
        default:
            fmt.Printf("\n nothing")
        }

    })

    time.Sleep(2 * time.Second)

}

func main() {

    example()
    fmt.Printf("done")

}

“MVar”-based Futures and promises in Go with generics

package main

// Direct Go implementation of "Futures and promises in Haskell and Scala"
//  (https://www.researchgate.net/publication/330066278_Futures_and_promises_in_Haskell_and_Scala)
//
// Recent versions of Go support "generics" (parametric polymorphism).
// Below we give an implementation that does exploit parametric polymorphism.
// Unlike Haskell, Go with generics does not support type inference.
// This means that for parametric polymorphic functions, the user needs to
// provide explict type instances.
// Explicit type instance are not necessary for Go methods due to the following trick:
//    - Method definitions are not allowed to introduce any type parameters
//    - Of course, they may refer to polymorphic types (as receivers and arguments)
//    - The types of variables are known in Go
//    - Thus, each method call can be type checked by knowing the types of receivers and arguments.

import "fmt"
import "time"

// Maybe data type

type Maybe[T any] struct {
    val     T
    nothing bool
}

func mkJust[T any](x T) Maybe[T] {
    return Maybe[T]{x, false}
}

func mkNothing[T any]() Maybe[T] {
    return Maybe[T]{nothing: true}
}

func (m Maybe[T]) isJust() (r T, b bool) {
    r = m.val
    b = !m.nothing

    return r, b
}

func (m Maybe[T]) isNothing() bool {
    return m.nothing
}


// Either data type

type Either[T any, S any] struct {
    left        T
    right       S
    leftOrRight bool // true => left, false => right
}

func mkLeft[T any, S any](x T) Either[T,S] {
    return Either[T,S]{left: x, leftOrRight: true}
}

func mkRight[T any, S any](x S) Either[T,S] {
    return Either[T,S]{right: x, leftOrRight: false}
}

func (e Either[T,S]) isLeft() (l T, b bool) {
    l = e.left
    b = e.leftOrRight

    return l, b
}

func (e Either[T,S]) isRight() (r S, b bool) {
    r = e.right
    b = !e.leftOrRight

    return r, b
}


// Channel-based MVar

type MVar[T any] chan (T)



func newMVar[T any](x T) MVar[T] {
    ch := make(chan T, 1)
    ch <- x
    return ch
}



func newEmptyMVar[T any]() MVar[T] {
    ch := make(chan T, 1)
    return ch
}

func (m MVar[T]) takeMVar() T {
    return <-m
}

func (m MVar[T]) putMVar(x T) {
    m <- x
}

func (m MVar[T]) readMVar() T {
    x := m.takeMVar()
    m.putMVar(x)
    return x
}



// Future/promise MVar-based core implementation
// CMVar = Core MVar-based

type CMVar[T any] struct {
    valOrCallbacks MVar[Either[Maybe[T], []func(Maybe[T])]]
    signal         MVar[int]
}


func newC[T any]() CMVar[T] {
    x := newMVar[Either[Maybe[T], []func(Maybe[T])]](mkRight[Maybe[T],[]func(Maybe[T])](make([]func(Maybe[T]), 0)))
    y := newEmptyMVar[int]()

    return CMVar[T]{x, y}
}




func (c CMVar[T]) getC() Maybe[T] {
    sig := c.signal
    sig.readMVar()
    r, _ := c.valOrCallbacks.readMVar().isLeft()

    return r
}



func (c CMVar[T]) tryCompleteC(f func() Maybe[T]) bool {
    var b bool
    v := f()
    s := c.valOrCallbacks.takeMVar()
    hs, right := s.isRight()

    switch {
    case right == false:
        c.valOrCallbacks.putMVar(s)
        b = false
    case right == true:
        c.valOrCallbacks.putMVar(mkLeft[Maybe[T], []func(Maybe[T])](v))
        c.signal.putMVar(1)
        for _, h := range hs {
            go h(v)
        }
        b = true

    }
    return b
}



func (c CMVar[T]) onComplete(h func(Maybe[T])) {
    s := c.valOrCallbacks.takeMVar()
    v, left := s.isLeft()
    hs, right := s.isRight()

    switch {
    case left == true:
        c.valOrCallbacks.putMVar(s)
        go h(v)
    case right == true:
        hs = append(hs, h)
        c.valOrCallbacks.putMVar(mkRight[Maybe[T], []func(Maybe[T])](hs))

    }

}



// Example

func example() {

    p := newC[int]()

    go p.tryCompleteC(func() Maybe[int] {
        time.Sleep(1 * time.Second)
        return mkNothing[int]()

    })

    p.onComplete(func(x Maybe[int]) {
        _, something := x.isJust()
        switch {
        case something == true:
            fmt.Printf("\n something")
        default:
            fmt.Printf("\n nothing")
        }

    })

    time.Sleep(2 * time.Second)

}


func main() {

    example()
    fmt.Printf("done")

}