Tuppence Tour of Haskell Concurrency Constructs

Paul Brown @ 2007-10-21T05:04:00Z

One of the little widgets that I need for an experiment is a sequence number generator, and it's a good way to get a tuppence (i.e., less than half a nickel) tour of Haskell's concurrency constructs with a little lesson on laziness thrown in for spice.

Requirements

The generator should produce unique Int values on demand and support concurrent access, and I want to try out a couple of methods, one that uses GHC's baseline concurrency capabilities and one that uses STM. (Also, I'd like a better feel for the concepts in practice, so this makes a good exercise!)

Take One: Asynchronous Channels

The base GHC concurrency packages (Control.Concurrent and its descendants) include a great set of building blocks: one-place buffers (MVar), asynchronous channels (Chan) that can be combined into one-to-many broadcast channels, and quantity semaphores (QSem and QSemN).

The design I have in mind uses two asynchronous channels, one for requests and one for responses. All clients of the generator receive responses on the one channel, which means that one might get the number that another one requested, but that's no big deal.

First, one data structure for the requests and one for the client view of the generator:

import Control.Concurrent ( ThreadId, forkIO )
import Control.Concurrent.Chan ( Chan, newChan, writeChan, readChan )

data Request = Get
             | Set { initial_value :: Int }
             | Die

data Generator = Generator { thread_id :: ThreadId,
                             request_channel :: Chan Request, 
                             response_channel :: Chan Int }

Then some functions (whose signatures I'll match with the STM version below) to manipulate the generator:

reset :: Generator -> Int -> IO ()
reset g i = writeChan (request_channel g) (Set i)

get_next :: Generator -> IO Int
get_next g = (writeChan (request_channel g) (Get)) >> 
             readChan (response_channel g)

stop :: Generator -> IO ()
stop g = writeChan (request_channel g) Die

Each client function is implemented in terms of sending the request data structure to the generator on the request_channel and then, in the case of the Get operation, blocking to read a value from the response_channel.

Finally, the request-handling event loop in a separate lightweight thread:

new_counter :: IO Generator
new_counter = do { in_chan <- newChan
                 ; out_chan <- newChan
                 ; tid <- forkIO $ loop in_chan out_chan 0
                 ; return $ Generator tid in_chan out_chan }

loop :: Chan Request -> Chan Int -> Int -> IO ()
loop ic oc i = do { req <- readChan ic
                  ; case req of 
                      Die -> return ()
                      (Set n) ->
                          (loop ic oc n)
                      Get ->
                          do { writeChan oc i
                             ; loop ic oc $! (i+1) } }

A similar pattern works for a wide variety of problems and presents a reasonable API to clients: a request channel, a tail-recursive event loop, and, for responses, either a shared channel or a one-place buffer (MVar) that is pre-set or passed along with each request.
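For comparison, here is a minimal sketch of the variant where each request carries its own reply MVar, so a client always receives the number it asked for (unlike the shared response channel above). The names MRequest, MGenerator, m_new_counter, m_loop, m_get_next, m_reset, and m_stop are hypothetical, chosen so they don't clash with the definitions above:

```haskell
import Control.Concurrent ( ThreadId, forkIO )
import Control.Concurrent.Chan ( Chan, newChan, writeChan, readChan )
import Control.Concurrent.MVar ( MVar, newEmptyMVar, putMVar, takeMVar )

-- Hypothetical request type: a Get carries the box its answer goes into.
data MRequest = MGet (MVar Int)
              | MSet Int
              | MDie

data MGenerator = MGenerator { m_thread_id :: ThreadId,
                               m_request_channel :: Chan MRequest }

m_new_counter :: IO MGenerator
m_new_counter = do { in_chan <- newChan
                   ; tid <- forkIO $ m_loop in_chan 0
                   ; return $ MGenerator tid in_chan }

-- Same tail-recursive event loop, but replies go to the caller's own MVar.
m_loop :: Chan MRequest -> Int -> IO ()
m_loop ic i = do { req <- readChan ic
                 ; case req of
                     MDie -> return ()
                     MSet n -> m_loop ic $! n
                     MGet box -> do { putMVar box i
                                    ; m_loop ic $! (i+1) } }

-- A fresh, empty MVar per request; block until the loop fills it.
m_get_next :: MGenerator -> IO Int
m_get_next g = do { box <- newEmptyMVar
                  ; writeChan (m_request_channel g) (MGet box)
                  ; takeMVar box }

m_reset :: MGenerator -> Int -> IO ()
m_reset g n = writeChan (m_request_channel g) (MSet n)

m_stop :: MGenerator -> IO ()
m_stop g = writeChan (m_request_channel g) MDie
```

The cost is one newEmptyMVar per Get; in exchange, one client can no longer pick up a number that another one requested.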

The single-threadedness of the loop makes it intuitively easy to conclude that it does the right thing (returns unique values to clients), but it's easy enough to check with some experiments in ghci:

> :m + Control.Concurrent
> g <- new_counter
> get_next g
0
> forkIO $ sequence_ $ replicate 10000000 (get_next g)
ThreadId 111
> forkIO $ sequence_ $ replicate 10000000 (get_next g)
ThreadId 112
> forkIO $ sequence_ $ replicate 10000000 (get_next g)
ThreadId 113
[... wait a while ...]
> get_next g
30000001

Which is the right answer. Another experiment will show how fair the scheduler is in terms of multiple client threads:

> g <- new_counter
> :set -fno-print-bind-result -fglasgow-exts

By the way, [TAB]-completion in ghci means that the above can be obtained by typing:

:set -fno-pr[TAB] -fgl[TAB]

The -fno-print-bind-result prevents ghci from spoiling our attempts to be lazy by printing (and thus evaluating), and the -fglasgow-exts lets us use a type specification to specify what kind of channel we're creating. (Normally, the compiler would figure it out from context, but that won't work in ghci.)

> tid::(Chan (Int,Int)) <- newChan
> mapM_ (\n -> (forkIO $ sequence_ $ replicate 1000 (get_next g >>= (writeChan tid).((,) n)))) [1..10]

The last expression looks dense, but it breaks down simply. In plain English: fork 10 threads numbered 1 through 10, and on each thread, do the following 1000 times: get a sequence number from the generator and then send the ordered pair of the thread's number and the sequence number on the tid channel. (The (,) function makes ordered pairs out of its arguments.) To inspect the contents of the channel once the threads are done:

> x <- getChanContents tid

(Without the -fno-print-bind-result above, this would run forever.) And now a couple of quick checks:

> :m + List
> let y = take 10000 x
> length $ nub $ map snd y
10000
> [ length $ filter (((==) j).fst) y | j <- [1..10] ]
[1000,1000,1000,1000,1000,1000,1000,1000,1000,1000]
> let w = [ length $ nub (take 10 (drop k (map fst y))) | k <- [0..9990] ]
> [ length $ filter ((==) j) w | j <- [1..10] ]
[0,0,0,0,0,0,1,1,522,9467]

So in 9467 of the 9991 windows of 10 consecutive sequence numbers (about 95%), all 10 client threads got a turn, and no window had fewer than 7 distinct threads.
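The ghci checks above can be packaged as a pure function for reuse in a compiled test. This is a minimal sketch; distinct_per_window and window_histogram are hypothetical names, and the window size and thread count are parameters rather than the fixed 10 used in ghci:

```haskell
import Data.List ( nub, tails )

-- For each run of w consecutive entries, count how many distinct thread
-- numbers appear (the same list as w in the ghci session above).
distinct_per_window :: Eq a => Int -> [a] -> [Int]
distinct_per_window w xs =
  [ length (nub (take w t)) | t <- tails xs, length (take w t) == w ]

-- Histogram: element j-1 is the number of windows that contain exactly
-- j distinct threads, for j in [1..n].
window_histogram :: Eq a => Int -> Int -> [a] -> [Int]
window_histogram n w xs =
  let ds = distinct_per_window w xs
  in [ length (filter (== j) ds) | j <- [1..n] ]
```

A perfectly round-robin schedule puts every window in the last bucket, while threads that run in long blocks pile up in the first two buckets.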

A Quick Word About Laziness

The $! is strict application: f $! x evaluates x (to weak head normal form) before applying f to it, so the new counter value is computed right away instead of being deferred. Without it, this happens:

> g <- new_counter
> get_next g
0
> sequence_ $ replicate 100000000 (get_next g)
> get_next g
*** Exception: stack overflow

The reason lies in Haskell's laziness. Without $!, each Get passes the unevaluated expression (i+1) to the next iteration of the loop, so after 100,000,000 requests the counter is a chain of 100,000,000 nested (i+1) thunks. Nothing has forced them, because no one ever actually consumed the values passed back on the response channel. When ghci prints the result of the last get_next, evaluating that chain exhausts the stack. This is just the way that Haskell works: you can tell the runtime about a ridiculous computation, but it won't complain until you ask for the result. The $! ensures that the (i+1) value is evaluated each time Get is performed.
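For reference, $! can be written in terms of seq, and the same accumulator pattern shows up in pure code. A small sketch; strict_apply and count_to are hypothetical names used only for illustration:

```haskell
-- A hand-rolled equivalent of the Prelude's ($!): force x, then apply f.
strict_apply :: (a -> b) -> a -> b
strict_apply f x = x `seq` f x

-- An accumulator loop shaped like `loop` above. In unoptimized code (such
-- as ghci), dropping the $! would let acc grow into a chain of n nested
-- (acc + 1) thunks that is only evaluated when the result is demanded.
count_to :: Int -> Int -> Int
count_to 0 acc = acc
count_to n acc = count_to (n - 1) $! (acc + 1)
```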

Take Two: TVar

Haskell's STM (software transactional memory; read/watch Simon Peyton Jones on the subject) implementation provides another set of building blocks in the form of atomically mutable cells (TVar), asynchronous channels (TChan), one-place buffers (TMVar), and arrays (TArray).

The sequence generator implementation that uses a TVar to hold the current value is shorter and simpler (no backing thread) than the one above that uses channels, and it keeps nearly the same API (new_counter takes a starting value, and with no thread there is nothing to stop):

import Control.Concurrent.STM
import Control.Concurrent.STM.TVar
import Control.Monad

data Generator = Generator { counter :: TVar Int }

new_counter :: Int -> IO Generator
new_counter i = (atomically $ newTVar i) >>= (return . Generator)

get_next :: Generator -> IO Int
get_next g = atomically $ do { n <- readTVar $ counter g
                             ; (writeTVar (counter g)) $! (n+1)
                             ; return n }

reset :: Generator -> Int -> IO ()
reset g n = atomically $ writeTVar (counter g) n

Note the $! that appears in get_next; it is there for the same reason as it appears in the Chan version.

The same set of basic verifications as above ends with:

> [ length $ filter ((==) j) w | j <- [1..10] ]
[9901,90,0,0,0,0,0,0,0,0]

That is, the 10 threads took their numbers in blocks of 1000, because the scheduler had no reason to switch between them. Spawning the clients slightly differently, with a yield after each request, gives much more regular results:

> mapM_ (\n -> (forkIO $ sequence_ $ replicate 1000 (get_next g >>= (writeChan tid).((,) n) >> yield))) [1..10]
[... same steps ...]
[0,0,0,0,0,0,0,0,0,9991]
> (map fst y) == (concat $ replicate 1000 [1..10])
True
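The same experiments can also run as a compiled program that waits for its workers instead of relying on "wait a while". This is a minimal sketch, assuming the stm package; the TVar generator is restated so the block stands alone, and run_clients and all_unique are hypothetical helpers (each client thread hands its results back through its own MVar, which doubles as a join):

```haskell
import Control.Concurrent ( forkIO )
import Control.Concurrent.MVar ( newEmptyMVar, putMVar, takeMVar )
import Control.Concurrent.STM ( TVar, atomically, newTVarIO, readTVar, writeTVar )
import Control.Monad ( replicateM )
import Data.List ( sort )

data Generator = Generator { counter :: TVar Int }

new_counter :: Int -> IO Generator
new_counter i = fmap Generator (newTVarIO i)

get_next :: Generator -> IO Int
get_next g = atomically $ do { n <- readTVar (counter g)
                             ; writeTVar (counter g) $! (n+1)
                             ; return n }

-- Fork `clients` threads that each draw `per_client` numbers, then wait
-- for every thread and return all of the numbers they drew.
run_clients :: Generator -> Int -> Int -> IO [Int]
run_clients g clients per_client = do
  boxes <- replicateM clients $ do
    box <- newEmptyMVar
    _ <- forkIO (replicateM per_client (get_next g) >>= putMVar box)
    return box
  fmap concat (mapM takeMVar boxes)

-- True when no value was handed out twice.
all_unique :: [Int] -> Bool
all_unique xs = and (zipWith (/=) s (drop 1 s)) where s = sort xs
```

With a counter starting at 0, 10 clients drawing 1000 numbers each should together see exactly the values 0 through 9999.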

Conclusions

The STM version is probably the one that I'll use, but I also need some more complicated components where the channel-based design will work well.


STM and IO Redux

Paul Brown @ 2007-03-13T13:05:00Z

Pepe Iborra left a comment on my entry on STM and IO about the use of unsafeIOToSTM that spurred me to do some more reading and ask a few questions by email. (Better yet, people who knew the answers were kind enough to respond.)

Better without unsafeIOToSTM

The consensus was to avoid the use of unsafeIOToSTM and just combine the IO actions in the IO monad. This changes things around a bit but in a good way; refactoring (if the word applies in this scenario) only took about 15 minutes.

Disregarding the suggestion to use TMVar for the moment, here are some revisions. (If you look at TMVar, the source is more informative than the documentation.)

First, check_out and check_in need to change, and storing an entry to disk can get simpler:

check_out :: TVar Holder -> IO Holder
check_out h = atomically ( do { h' <- readTVar h
                              ; if locked h'
                                then retry
                                else writeTVar h (lock h')
                              -- hand back the locked copy so that check_in can unlock it
                              ; return (lock h') } )

check_in :: TVar Holder -> Holder -> IO ()
check_in h' h = atomically ( do { h'' <- readTVar h'
                                ; if (locked h'')
                                  then writeTVar h' (unlock h)
                                  else error "Internal error." } )

store :: Holder -> IO Holder
store h = do { let e = entry h
             ; writeFile (fname e) ((show e) ++ "\n")
             ; return h }

And two additional utility functions:

onHolders :: (Entry -> Entry) -> (Holder -> Holder)
onHolders f = \ (Holder e l) -> Holder (f e) l

s_apply :: (Entry -> Entry) -> TVar Holder -> IO ()
s_apply f h' = check_out h'
               >>= (store . onHolders f)
               >>= (check_in h')

(The ">>=" in IO is sequencing where the output of each step is passed to the next as input.) The flow of s_apply is as follows:

  1. "Check out" the entry by setting the locked field to True, and pass the Holder to the next step.
  2. Apply the function f to the Entry wrapped in the Holder, write the result to disk, and pass it along.
  3. "Check in" the entry by setting the locked field to False and writing the new value into the TVar.

Publishing and unpublishing a persistent Entry now has the appealingly simple form:

s_publish :: TVar Holder -> IO ()
s_publish = s_apply publish

s_unpublish :: TVar Holder -> IO ()
s_unpublish = s_apply unpublish

Even Better with bracket

An anonymous commenter pointed out bracket, a function that has the same semantics as try { ... } finally { ... } in Java. The bracket function has the signature:

bracket :: IO a -> (a -> IO b) -> (a -> IO c) -> IO c

In the analogy with try/finally from Java, the IO a would occur before the try, like calling lock() on a Lock in the usual idiom. The result of the initial computation is passed both to the inner computation and to the final computation, so the application of the function (e.g., publish) would need to be grouped with the check_out operation if the published Entry was to be the one checked back in. For my purposes, bracketOnError is preferable, since it only executes its release action (the second argument) if the inner action (the last argument) throws an exception. With bracketOnError added and a little more clean-up from another pass over the code, everything gets a little simpler yet:

import Control.Concurrent.STM
import Control.Exception ( bracketOnError )

store :: Entry -> IO Entry
store e = do { writeFile (fname e) ((show e) ++ "\n")
             ; return e }

check_out :: TVar Holder -> IO Entry
check_out h = atomically ( do { h' <- readTVar h
                              ; if locked h'
                                then retry
                                else writeTVar h (lock h')
                              ; return $ entry h' } )

check_in :: TVar Holder -> Entry -> IO ()
check_in h' e = atomically ( do { h'' <- readTVar h'
                                ; if (locked h'')
                                  then writeTVar h' (Holder e False)
                                  else error "Programmer error." } )

s_apply :: (Entry -> Entry) -> TVar Holder -> IO ()
s_apply f h' = bracketOnError (check_out h')                        -- acquire: lock the entry
                              (check_in h')                         -- only on error: restore the old entry
                              (\e -> (store.f) e >>= (check_in h')) -- apply f, persist, check in

With s_publish as before, this does the right thing in the event of an error while writing:

*Main> th <- atomically ( newTVar (Holder (Entry "foo" False) False ))
*Main> s_show th
"Holder {entry = Entry {entry_id = \"foo\", published = False}, locked = False}"

*Main> :! chmod -w entry-foo.hb
*Main> do s_publish th
*** Exception: entry-foo.hb: openFile: permission denied (Permission denied)
*Main> s_show th
"Holder {entry = Entry {entry_id = \"foo\", published = False}, locked = False}"

*Main> :! chmod +w entry-foo.hb
*Main> do s_publish th
*Main> s_show th
"Holder {entry = Entry {entry_id = \"foo\", published = True}, locked = False}"
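To see bracketOnError's argument order in isolation (acquire, release-on-error, then the inner action), here is a small sketch that logs which actions run. trace_bracket is a hypothetical name, and the IORef log exists only for illustration; the comments map each piece to its role in s_apply:

```haskell
import Control.Exception ( IOException, bracketOnError, try )
import Data.IORef ( modifyIORef, newIORef, readIORef )

-- Runs bracketOnError with logging. When fail_inner is True the inner
-- action throws an IOException, the only case in which release runs.
trace_bracket :: Bool -> IO [String]
trace_bracket fail_inner = do
  log_ref <- newIORef []
  let note s = modifyIORef log_ref (++ [s])
      acquire = note "acquire"            -- plays the role of check_out
      release _ = note "release"          -- check_in of the unchanged entry
      inner _ = do { note "inner"         -- store . f, then check_in
                   ; if fail_inner
                       then ioError (userError "write failed")
                       else return () }
  result <- try (bracketOnError acquire release inner)
  case (result :: Either IOException ()) of
    Left _ -> note "caught"
    Right _ -> return ()
  readIORef log_ref
```

On success the log is just acquire and inner; on failure the release action runs before the exception propagates to try.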

Whither TMVar?

From the TMVar source code comments:

A 'TMVar' is a synchronising variable, used for communication between concurrent threads. It can be thought of as a box, which may be empty or full.

The idea would be that when an Entry was locked, its TMVar "box" would be empty, to be filled again with the new value after the operation was completed. Other threads (e.g., threads rendering page views or feeds) need to be able to read values while output is being performed, and reading an empty TMVar blocks until it is refilled, so I don't think that a TMVar is what I'm after in this case.
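One possible arrangement that sidesteps the blocking-reader problem (a sketch, not what this post uses) keeps the value in a TVar for readers and uses a separate TMVar () only as the writers' lock. Guarded, new_guarded, read_guarded, and update_guarded are hypothetical names:

```haskell
import Control.Concurrent.STM
import Control.Exception ( bracket_ )

-- Readers go straight to the TVar; writers serialize on the TMVar ().
data Guarded a = Guarded { current :: TVar a, write_lock :: TMVar () }

new_guarded :: a -> IO (Guarded a)
new_guarded x = do { v <- newTVarIO x
                   ; l <- newTMVarIO ()
                   ; return (Guarded v l) }

-- Never blocks, even while a writer holds the lock.
read_guarded :: Guarded a -> IO a
read_guarded g = readTVarIO (current g)

-- Take the lock (takeTMVar retries while another writer holds it), run the
-- IO-producing update, publish the result, and always give the lock back.
update_guarded :: Guarded a -> (a -> IO a) -> IO ()
update_guarded g f =
  bracket_ (atomically (takeTMVar (write_lock g)))
           (atomically (putTMVar (write_lock g) ()))
           (do { old <- readTVarIO (current g)
               ; new_value <- f old
               ; atomically (writeTVar (current g) new_value) })
```

Readers see the old value until the update is published, which matches the "slightly stale content is fine" stance of the earlier posts.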


STM, IO, and a Simple Persistence Model

Paul Brown @ 2007-03-04T17:57:00Z

Herein post 5 of n on my hobby project to rewrite my personal publishing software in Haskell. (In strict terms, the project is to write it, since I didn't write the current system.) This post covers persistence and concurrency using the filesystem and Haskell's software transactional memory implementation.

Exploiting Commutativity and Choosing Locking Granularity

As I imagine things working, the basic operations that I want to be able to perform against the persistent form of the blog are something like:

  • Create an entry (and by extension, a comment).
  • Change the metadata on an entry, e.g., publish/unpublish or add/remove tags.
  • Add a comment to an existing entry.

From an end-user perspective, these all commute with each other — it doesn't matter whether a comment is added before or after a tag is changed — so it's reasonable to let the system take care of ordering the operations to be performed. Moreover, because creation commutes with linking, locking granularity can be limited to an individual entry. (There is no reason to lock both the newly created comment and its parent entry simultaneously.)

Without further ado, here's a locking scheme implemented at the granularity of an entry. This would be used only for writes. First, a wrapper type to hold the lock status for an entry:

data Holder = Holder { entry :: Entry,
                       locked :: Bool }
            deriving (Show)

And then the lock/unlock code:

lock :: Holder -> Holder
lock (Holder e False) = Holder e True
lock (Holder e True) = error "Already locked."

unlock :: Holder -> Holder
unlock (Holder e True) = Holder e False
unlock (Holder e False) = error "Already unlocked."

It's worth stopping to observe a common construct in functional programming. A lock function that flips a flag on an existing Holder in place can't exist because all values are immutable. Instead, lock creates a new Holder that has locked set to True but is otherwise identical to the original, and we can use the STM mechanics to create actions to be applied to a TVar:

check_out :: TVar Holder -> STM ()
check_out h = do { h' <- readTVar h
                 ; if locked h'
                   then retry
                   else writeTVar h (lock h') }

check_in :: TVar Holder -> STM ()
check_in h = do { h' <- readTVar h
                ; if locked h'
                  then writeTVar h (unlock h')
                  else error "Already unlocked." }

The retry above will cause check_out to block until the entry is checked back in, while check_in signals an error if it is asked to release an already free entry.
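The blocking behavior of retry is easy to observe with two threads. In this sketch a bare TVar Bool stands in for the Holder's locked field (an assumption to keep the block self-contained), and check_out_flag, check_in_flag, and demo_retry are hypothetical names. Thread B's checkout cannot complete until A checks in, so the event order is fixed regardless of timing:

```haskell
import Control.Concurrent ( forkIO, threadDelay )
import Control.Concurrent.MVar ( modifyMVar_, newEmptyMVar, newMVar, putMVar, readMVar, takeMVar )
import Control.Concurrent.STM ( STM, TVar, atomically, newTVarIO, readTVar, retry, writeTVar )

-- Stand-ins for check_out/check_in that only track the locked flag.
check_out_flag :: TVar Bool -> STM ()
check_out_flag t = do { l <- readTVar t
                      ; if l then retry else writeTVar t True }

check_in_flag :: TVar Bool -> STM ()
check_in_flag t = do { l <- readTVar t
                     ; if l then writeTVar t False else error "Already unlocked." }

-- A holds the lock; B's check_out retries (blocks) until A checks in.
demo_retry :: IO [String]
demo_retry = do
  t <- newTVarIO False
  events <- newMVar []
  done <- newEmptyMVar
  let note s = modifyMVar_ events (return . (++ [s]))
  atomically (check_out_flag t)
  note "A checked out"
  _ <- forkIO $ do { atomically (check_out_flag t)
                   ; note "B checked out"
                   ; atomically (check_in_flag t)
                   ; putMVar done () }
  threadDelay 100000  -- give B time to reach the retry
  note "A checking in"
  atomically (check_in_flag t)
  takeMVar done
  readMVar events
```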

By the way, the following one-liner to print the showable value wrapped in a TVar is useful for experimenting with STM in ghci (liftM comes from Control.Monad):

s_show :: Show a => TVar a -> IO String
s_show = atomically.(liftM show).readTVar

Operating on Entries

To integrate operations on entries, I'm going to take the minimal use case of publishing and unpublishing, so my Entry data structure is almost trivial:

data Entry = Entry { entry_id :: String,
                     published :: Bool }
             deriving (Show)

publish :: Entry -> Entry
publish (Entry i _) = Entry i True

unpublish :: Entry -> Entry
unpublish (Entry i _) = Entry i False

Add in a function to lift an Entry -> Entry function to an STM action that updates the Entry inside a TVar Holder:

liftH :: (Entry -> Entry) -> (TVar Holder -> STM ())
liftH f = \ h -> do { h' <- readTVar h
                    ; writeTVar h ((holderize f) h')
                    ; return () }
          where holderize f = \ (Holder e l) -> Holder (f e) l

Combining a publish with check_in/check_out is straightforward in the STM monad: the actions are sequenced with >> and run as one transaction by atomically. Here's some scratch work in ghci that shows this in action:

$ ghci -package stm
   ___         ___ _
  / _ \ /\  /\/ __(_)
 / /_\// /_/ / /  | |      GHC Interactive, version 6.6, for Haskell 98.
/ /_\\/ __  / /___| |      http://www.haskell.org/ghc/
\____/\/ /_/\____/|_|      Type :? for help.

Loading package base ... linking ... done.
Loading package stm-2.0 ... linking ... done.
Prelude> :load experiment.hs
[1 of 1] Compiling Main             ( experiment.hs, interpreted )
Ok, modules loaded: Main.
*Main> let h = Holder (Entry "foo" False) False
*Main> th <- atomically ( newTVar h )
*Main> s_show th
"Holder {entry = Entry {entry_id = \"foo\", published = False}, locked = False}"
*Main> let co = check_out th
*Main> let pub = (liftH publish) th
*Main> let ci = check_in th
*Main> atomically ( co >> pub >> ci)
*Main> s_show th
"Holder {entry = Entry {entry_id = \"foo\", published = True}, locked = False}"

Integrating Persistence via IO

One of my working design assumptions is that the data for the system will reside entirely in memory, being updated as changes are made and reloaded (lazily) in the event of a system crash or system start-up. (As I commented previously, four years of blogging has produced around 500kb of content, mark-up included, so this isn't an unreasonable assumption.) Comments from spammers could produce a lot more data, but I plan to save every item and only load published items into memory. (So spammers are just going to burn disk space.) For the sake of the current discussion, I'm going to aim for one file per entry, named by the entry_id of the Entry. Conveniently, GHC provides the unsafeIOToSTM function for running an IO action inside an STM transaction. (As the name warns, this goes against the design: STM deliberately keeps arbitrary IO out of transactions, since a transaction may be retried and its IO repeated.)
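The fname helper used below is never shown in the post. Judging from the ghci session (entry-foo.hb), a plausible reconstruction follows, together with a loader for start-up that reads back what store writes. Both fname's exact form and load are assumptions, and load additionally assumes Entry derives Read (the original derives only Show):

```haskell
data Entry = Entry { entry_id :: String,
                     published :: Bool }
             deriving (Show, Read, Eq)

-- Hypothetical reconstruction: one file per entry, named after entry_id.
fname :: Entry -> FilePath
fname e = "entry-" ++ entry_id e ++ ".hb"

-- Hypothetical loader: the file holds `show e` plus a newline, and readIO
-- parses it (allowing trailing whitespace) or fails with an IO error.
load :: FilePath -> IO Entry
load path = readFile path >>= readIO
```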

Attention: I've gotten some public and private comments that unsafeIOToSTM is not the right thing to use in this scenario, so I've written a revision to this entry.

Writing an entry to a file is straightforward:

import GHC.Conc ( unsafeIOToSTM )

store :: TVar Holder -> STM ()
store h = do { h' <- readTVar h
             ; let e = entry h'
             ; unsafeIOToSTM (writeFile (fname e) ((show e) ++ "\n")) }

Continuing the same ghci session from above:

*Main> let out = store th
*Main> :! cat entry-foo.hb
cat: entry-foo.hb: No such file or directory
*Main> atomically (  co >> pub >> out >> ci )
*Main> :! cat entry-foo.hb
Entry {entry_id = "foo", published = True}
*Main> let unpub = (liftH unpublish) th
*Main> atomically (  co >> unpub >> out >> ci )
*Main> :! cat entry-foo.hb
Entry {entry_id = "foo", published = False}

This could (and probably should) be made a bit fancier with regard to recovering from errors while writing the file, but I'm happy with the basic ergonomics so far.
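One common way to make the write more robust is to write to a temporary file and rename it into place, so a failed write never leaves a half-written entry behind. This is a sketch; store_atomically and the ".tmp" suffix are hypothetical, and it relies on renameFile from the directory package replacing the target (atomically on POSIX filesystems):

```haskell
import System.Directory ( renameFile )

-- Write the full contents to a temporary file next to the target, then
-- rename it over the target. If writeFile fails, the old file is untouched.
store_atomically :: FilePath -> String -> IO ()
store_atomically path contents = do
  let tmp = path ++ ".tmp"
  writeFile tmp contents
  renameFile tmp path
```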


More Haskell and Personal Publishing Platform Ramblings

Paul Brown @ 2006-10-18T07:00:00Z

Herein, part two of n, where I think about some basic design decisions and some actual code gets written.

From GET to Function Currying

First up, some thoughts about how to map a request to a response.

A weblog is a list of entries grouped and displayed in different formats according to parameters supplied by client applications, so serving pages is limited to figuring out what format to display (single-entry HTML, multi-entry HTML, Atom) and which entries to include (include/exclude by tag, include/exclude by category, date range, maximum number of entries). Ideally, I'd like a reader to be able to, e.g., exclude only the posts I make about my kid or include only posts about BPEL and Java.

The simple design is to turn the combination of a URL and parameters into a list of functions that are applied to the list of all entries, loosely:

(format url) (filter (filters url) entries)

And that's the whole program, up to details. In mildly abused Haskell notation:

format :: URL -> [Entry] -> String
filters :: URL -> [Entry -> Bool]
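Read literally, filter (filters url) does not type-check: filter wants a single predicate, while filters url is a list of them. One way to combine them is sketched here; all_of, select, and the cut-down Entry with title and tags fields are assumptions for illustration:

```haskell
-- Hypothetical stand-in for Entry with just what the filters need.
data Entry = Entry { title :: String, tags :: [String] } deriving (Show, Eq)

-- Keep an element only if every predicate in the list accepts it.
all_of :: [a -> Bool] -> a -> Bool
all_of ps x = all ($ x) ps

select :: [Entry -> Bool] -> [Entry] -> [Entry]
select ps = filter (all_of ps)
```

With this definition, an empty list of predicates keeps every entry, which is the natural default for a URL with no query string.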

In Haskell, the notation

f :: A -> B

is very much as it is in mathematics with f being a function with domain (things of type) A and codomain (things of type) B. Unraveling expressions with multiple arrows is accomplished by adding parentheses from the right. That is,

f :: A -> B -> C  =  f :: A -> (B -> C)

is essentially a function from A×B to C as in "f is a function that maps (things of type) A to a function that maps (things of type) B to (things of type) C". So, the function format above maps a URL and a list of Entry to a String. This is one of the things that I like about Haskell — a function looks like a function.
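The correspondence between the two readings is built into the Prelude as curry and uncurry. A small illustration, where plus_pair and plus_curried are hypothetical names:

```haskell
-- A function on pairs, i.e. from A×B to C.
plus_pair :: (Integer, Integer) -> Integer
plus_pair (x, y) = x + y

-- The same function in nested one-argument form, A -> (B -> C).
plus_curried :: Integer -> (Integer -> Integer)
plus_curried = curry plus_pair
```

Going the other way, uncurry plus_curried is again a function on pairs.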

Haskell supports currying of functions, which is only appropriate, as the language was named for Haskell Curry. The filters can be implemented by currying arguments onto functions that take two arguments. For example, consider the functions:

include_by_tag :: String -> Entry -> Bool
include_by_tag s e = s `elem` (tags e)

exclude_by_tag :: String -> Entry -> Bool
exclude_by_tag s e = s `notElem` (tags e)

(tags is a function that returns the list of tags applied to an entry.) In addition to showing off that Haskell lets you flip back and forth between prefix notation (foo 1 2) and infix notation (1 `foo` 2), the idea would be to map a query string atom like tag=foo to the function (include_by_tag "foo") or a query string atom like tag=-foo to the function (exclude_by_tag "foo").
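A minimal sketch of that mapping, assuming one atom at a time and ignoring URL decoding; atom_to_filter and the single-field Entry are hypothetical:

```haskell
import Data.List ( stripPrefix )

-- Hypothetical stand-in: only the tags matter here.
data Entry = Entry { tags :: [String] } deriving (Show)

include_by_tag :: String -> Entry -> Bool
include_by_tag s e = s `elem` (tags e)

exclude_by_tag :: String -> Entry -> Bool
exclude_by_tag s e = s `notElem` (tags e)

-- "tag=foo" becomes (include_by_tag "foo"), "tag=-foo" becomes
-- (exclude_by_tag "foo"); anything else is rejected with Nothing.
atom_to_filter :: String -> Maybe (Entry -> Bool)
atom_to_filter atom =
  case stripPrefix "tag=" atom of
    Just ('-' : t) -> if null t then Nothing else Just (exclude_by_tag t)
    Just t         -> if null t then Nothing else Just (include_by_tag t)
    Nothing        -> Nothing
```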

Playing with the concept in ghci is straightforward. For example, drop the following two lines of Haskell into a text file called curryplus.hs:

plus :: Integer -> Integer -> Integer
plus x y = x + y

And then fire-up ghci:

$ ghci
   ___         ___ _
  / _ \ /\  /\/ __(_)
 / /_\// /_/ / /  | |      GHC Interactive, version 6.6, for Haskell 98.
/ /_\\/ __  / /___| |      http://www.haskell.org/ghc/
\____/\/ /_/\____/|_|      Type :? for help.

Loading package base ... linking ... done.
Prelude> :load curryplus.hs
[1 of 1] Compiling Main             ( curryplus.hs, interpreted )
Ok, modules loaded: Main.
*Main> :t plus
plus :: Integer -> Integer -> Integer
*Main> :t (plus 1)
(plus 1) :: Integer -> Integer
*Main> let f = (plus 1)
*Main> f 2
3

The :t command in ghci interrogates the type of the expression passed to it.

Now, where does the list of entries come from?

Storing Entries — Database or Filesystem or...?

Databases are lovely places to store data where reads and writes may overlap, and the filesystem is a good place to store information that either doesn't fit or isn't needed in memory. For a weblog, read/write contention should be light (frequent reads, infrequent writes), with writes limited to posts and comments, and optimistic concurrency is entirely acceptable. (It's of no consequence if someone gets slightly stale content.) However, the total amount of content in my weblog, counting from 2002, is in the hundreds of kilobytes, so there is no reason not to hold the whole thing in memory.

Haskell (and specifically GHC) has a couple of shiny objects that I'm tempted by. The shiniest one is STM, or Software Transactional Memory, and just as the three-line quicksort implementation is one of the teasers for Haskell, the four-line AtomicInteger.getAndIncrement() implementation is the teaser for STM:

get_and_increment :: TVar Integer -> IO Integer
get_and_increment i = atomically ( do j <- readTVar i
                                      writeTVar i (j+1)
                                      return j )

The whitespace in the above definition is critically important, as it tells Haskell that the lines are all part of the do. I'll come back to the left arrow ("<-") and do notation below.

Experimenting with this in ghci isn't much more complicated than the plus example above. Put the four lines above in a text file called get_and_inc.hs following the import statement:

import Control.Concurrent.STM

And fire up ghci with an extra directive to get it to load the STM package:

$ ghci -package stm
   ___         ___ _
  / _ \ /\  /\/ __(_)
 / /_\// /_/ / /  | |      GHC Interactive, version 6.6, for Haskell 98.
/ /_\\/ __  / /___| |      http://www.haskell.org/ghc/
\____/\/ /_/\____/|_|      Type :? for help.

Loading package base ... linking ... done.
Loading package stm-2.0 ... linking ... done.
Prelude> :load get_and_inc.hs
[1 of 1] Compiling Main             ( get_and_inc.hs, interpreted )
Ok, modules loaded: Main.
*Main> x <- atomically ( newTVar 1 )
*Main> :t x
x :: TVar Integer
*Main> get_and_increment x
1
*Main> get_and_increment x
2
*Main> get_and_increment x
3

The original paper on STM is from Microsoft Research back in 2005. The follow-on paper Lock Free Data Structures using STM in Haskell is a good read, wherein the authors construct two implementations of ArrayBlockingQueue in Haskell, one using locks and one using STM, and then compare their performance. (Spoiler: The larger the number of processors, the better STM performs.)

As promised, a quick word about <- and do. Haskell is a purely functional language, and that means that it is side-effect-free. On the one hand it's great to have referential transparency, since it lets the compiler or runtime VM do things like replace common pieces of a complicated expression with values, but on the other hand, something imperatively trivial like console output is a side-effect. The elegant workaround is to allow a function to return an action as a value, where the action is to be performed by an external observer at some point. The construct is called a monad, and <- and do are some of the notation that comes with it. (Defining the nature of the action and the observer is the art of constructing a monad.) The "do" chains up actions, and the "<-" captures a value created by an action. Yet another good paper from Microsoft Research has a thorough tutorial on monads in Haskell.
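To make the notation concrete, here is get_and_increment with the do sugar removed. Each "j <- action" becomes action >>= \ j -> ..., and a line without a binding is joined to the next with >>. The primed name is hypothetical, used only to avoid clashing with the original:

```haskell
import Control.Concurrent.STM

-- Same behavior as get_and_increment, written with >>= and >> directly.
get_and_increment' :: TVar Integer -> IO Integer
get_and_increment' i = atomically ( readTVar i >>= \ j ->
                                    writeTVar i (j+1) >>
                                    return j )
```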

This brings me to the design for storing and accessing the entries:

  • Use a single multi-threaded FastCGI handler to serve all requests.
  • Maintain an in-memory copy of all content (entries and approved comments), with access and updates managed via STM.
  • Store entry and approved comment content as separate files on disk, to be loaded at start-up time and written when an entry is posted or comment approved.

Data structures and a lo-fi approach to outputting Atom are up next.
