Moving on

I'm moving this blog to live spaces. The feed can be found here. Despite the rather idiotic restriction of only one category-per-post, I've found it marginally easier to format in-line code there, and the ability to put code samples up on a publicly available folder was the clincher.

Proof perhaps that beggars can be choosers....

0 comments

F# Asynchronous Workflows with the Coordination and Concurrency Runtime

F# ships with library support for ‘mailboxes’, an asynchronous message-passing mechanism recently popularised by interest in Erlang. This is good news. Message-passing in concurrent systems is always a saner approach than explicitly spawning threads and taking locks, and F#’s support for immutable types and values provides additional safety. Combined with the asynchronous workflows that the language also supports, a powerful Actor-oriented programming model is available to F# developers.

If you’re playing in this space in .Net though, I still think you can’t go past the Concurrency and Coordination Runtime (CCR), especially on performance terms, and it turns out to be straightforward to integrate the CCR with F# asynchronous workflows.

Let’s start out by defining some basic CCR support – this is mostly just plumbing…

#light

namespace
FSharp.Ccr.Adaptors

open System
open System.Threading
open Microsoft.Ccr.Core
open Microsoft.Ccr.Core.Arbiters

type Receive() =

static let taskQueue = new DispatcherQueue()

static member internal TaskQueue with get() = taskQueue

static member internal activate(task : ITask) =
Arbiter.Activate(taskQueue, [| task |])



In the declaration above, there is only one task queue over the standard CLR thread-pool. I’ll cover the support for specifiable task queues in a later post. Now let’s cover some adaptor code that enables the use of the basic arbiters from within F# asynchronous workflows...



    (* Receive. This allows us to write an asynchronous workflow that waits (but does not block) 
on the port as follows:
let recv (port : Port<string>) =
async {
let! item = Receive.from port
printfn "%s" item
}
*)

[<OverloadID("from")>]
static member from (p : Port<'a>) =
Async.Primitive(fun (cont, econt) ->
Arbiter.Receive(false, p, (fun result -> cont result))
|> Receive.activate)

(* JoinedReceive. This allows us to write an asynchronous workflow that waits (but does not block)
until both ports have an item for processing. It then returns both in a tuple:
let recv (p1 : Port<string>, p2 : Port<int>) =
async {
let! s1,i1 = Receive.from_both (p1,p2)
printfn "%s %i" s1 i1
}
*)
[<OverloadID("from_both")>]
static member from_both (p1 : Port<'a>, p2 : Port<'b>) =
Async.Primitive(fun (cont, econt) ->
Arbiter.JoinedReceive(false, p1, p2, fun a b -> cont (a,b))
|> Receive.activate)
   
(* Choice. This allows us to write an asynchronous workflow that waits (but does not block)
       until either port has an item for processing. It then returns the result in a
       Choice:
    let recv (p : PortSet<string,int>) =
        async {
            let! result = Receive.from_either p
            match result with
            | Choice2_1 s -> printfn "%s" s
            | Choice2_2 i -> printfn "%i" i
        }
     *)
   
     [<OverloadID("from_either")>]
    static member from_either (ps : PortSet<'a,'b>) =
        Async.Primitive(fun (cont, econt) ->
            Arbiter.Choice(ps, (fun s -> cont (Choice2_1 s)), (fun e -> cont (Choice2_2 e)))
            |> Receive.activate)

(* MultipleItemReceive. This allows us to write an asynchronous workflow that waits (but does not block)
until n items have queued to the port. It then returns the result in an array.
let recv (p : Port<string>) =
async {
let! result = Receive.multiple_from (p,100)
printfn "%i items received." result.Length
}
*)

[<OverloadID("multiple_from")>]
static member multiple_from (p : Port<'a>, n : int) =
Async.Primitive(fun (cont, econt) ->
Arbiter.MultipleItemReceive(false, p, n, fun r -> cont r)
|> Receive.activate)

(* MultipleItemReceive. This allows us to write an asynchronous workflow that waits (but does not block)
until n items have queued to the portset. It then returns the result as a tuple of
collections.
let recv (p : PortSet<string,Exception>) =
async {
let! results,failures = Receive.multiple_from_both (p,100)
printfn "%i results received and %i failures." results.Count failures.Count
}
*)

[<OverloadID("multiple_from_both")>]
static member multiple_from_both (p : PortSet<'a,'b>, n : int) =
Async.Primitive(fun (cont, econt) ->
Arbiter.MultipleItemReceive(p, n, fun f s -> cont (f,s))
|> Receive.activate)

(* MultiplePortReceive. This allows us to write an asynchronous workflow that waits (but does not block)
until each port in the specified array has a pending item. It then returns the
result as a array.
let recv (p : Port<string> array) =
async {
let! results = Receive.from_multiple p
printfn "%i results received." results.Length
}
*)

[<OverloadID("from_multiple")>]
static member from_multiple (p : Port<'a> array) =
Async.Primitive(fun (cont, econt) ->
Arbiter.MultiplePortReceive(false, p, fun r -> cont r)
|> Receive.activate)


The Overloads are defined because each of these members has an alternate version that takes a timeout limit. I’ll cover those in the next post.

29 comments