F# Asynchronous Workflows with the Coordination and Concurrency Runtime
F# ships with library support for ‘mailboxes’, an asynchronous message-passing mechanism recently popularised by interest in Erlang. This is good news. Message-passing in concurrent systems is always a saner approach than explicitly spawning threads and taking locks, and F#’s support for immutable types and values provides additional safety. Combined with the asynchronous workflows that the language also supports, a powerful Actor-oriented programming model is available to F# developers.
If you’re playing in this space in .Net though, I still think you can’t go past the Concurrency and Coordination Runtime (CCR), especially on performance terms, and it turns out to be straightforward to integrate the CCR with F# asynchronous workflows.
Let’s start out by defining some basic CCR support – this is mostly just plumbing…
#light
namespace FSharp.Ccr.Adaptors
open System
open System.Threading
open Microsoft.Ccr.Core
open Microsoft.Ccr.Core.Arbiters
type Receive() =
static let taskQueue = new DispatcherQueue()
static member internal TaskQueue with get() = taskQueue
static member internal activate(task : ITask) =
Arbiter.Activate(taskQueue, [| task |])
In the declaration above, there is only one task queue over the standard CLR thread-pool. I’ll cover the support for specifiable task queues in a later post. Now let’s cover some adaptor code that enables the use of the basic arbiters from within F# asynchronous workflows...
(* Receive. This allows us to write an asynchronous workflow that waits (but does not block)
on the port as follows:
let recv (port : Port<string>) =
async {
let! item = Receive.from port
printfn "%s" item
}
*)
[<OverloadID("from")>]
static member from (p : Port<'a>) =
Async.Primitive(fun (cont, econt) ->
Arbiter.Receive(false, p, (fun result -> cont result))
|> Receive.activate)
(* JoinedReceive. This allows us to write an asynchronous workflow that waits (but does not block)
until both ports have an item for processing. It then returns both in a tuple:
let recv (p1 : Port<string>, p2 : Port<int>) =
async {
let! s1,i1 = Receive.from_both (p1,p2)
printfn "%s %i" s1 i1
}
*)
[<OverloadID("from_both")>]
static member from_both (p1 : Port<'a>, p2 : Port<'b>) =
Async.Primitive(fun (cont, econt) ->
Arbiter.JoinedReceive(false, p1, p2, fun a b -> cont (a,b))
|> Receive.activate)
(* Choice. This allows us to write an asynchronous workflow that waits (but does not block)
until either port has an item for processing. It then returns the result in a
Choice:
let recv (p : PortSet<string,int>) =
async {
let! result = Receive.from_either p
match result with
| Choice2_1 s -> printfn "%s" s
| Choice2_2 i -> printfn "%i" i
}
*)
[<OverloadID("from_either")>]
static member from_either (ps : PortSet<'a,'b>) =
Async.Primitive(fun (cont, econt) ->
Arbiter.Choice(ps, (fun s -> cont (Choice2_1 s)), (fun e -> cont (Choice2_2 e)))
|> Receive.activate)
(* MultipleItemReceive. This allows us to write an asynchronous workflow that waits (but does not block)
until n items have queued to the port. It then returns the result in an array.
let recv (p : Port<string>) =
async {
let! result = Receive.multiple_from (p,100)
printfn "%i items received." result.Length
}
*)
[<OverloadID("multiple_from")>]
static member multiple_from (p : Port<'a>, n : int) =
Async.Primitive(fun (cont, econt) ->
Arbiter.MultipleItemReceive(false, p, n, fun r -> cont r)
|> Receive.activate)
(* MultipleItemReceive. This allows us to write an asynchronous workflow that waits (but does not block)
until n items have queued to the portset. It then returns the result as a tuple of
collections.
let recv (p : PortSet<string,Exception>) =
async {
let! results,failures = Receive.multiple_from_both (p,100)
printfn "%i results received and %i failures." results.Count failures.Count
}
*)
[<OverloadID("multiple_from_both")>]
static member multiple_from_both (p : PortSet<'a,'b>, n : int) =
Async.Primitive(fun (cont, econt) ->
Arbiter.MultipleItemReceive(p, n, fun f s -> cont (f,s))
|> Receive.activate)
(* MultiplePortReceive. This allows us to write an asynchronous workflow that waits (but does not block)
until each port in the specified array has a pending item. It then returns the
result as a array.
let recv (p : Port<string> array) =
async {
let! results = Receive.from_multiple p
printfn "%i results received." results.Length
}
*)
[<OverloadID("from_multiple")>]
static member from_multiple (p : Port<'a> array) =
Async.Primitive(fun (cont, econt) ->
Arbiter.MultiplePortReceive(false, p, fun r -> cont r)
|> Receive.activate)
The Overloads are defined because each of these members has an alternate version that takes a timeout limit. I’ll cover those in the next post.
9 comments:
-
Hな人妻たちの社交場、割り切った付き合いも当然OK!欲求不満のエロ人妻たちを好みに合わせてご紹介します。即会い、幼な妻、セレブ、熟女、SM妻、秘密、以上6つのジャンルから遊んでみたい女性を選んでください
-
1日5万円~が手に入るサイドビジネスのご案内です。男狂いのセレブ女性はネットで知り合った男を次々に金の力で食い散らかしています。そんな女性を手玉にとって大金を稼いでみませんか
-
みんなで楽しめるHチェッカー!簡単な設問に答えるだけであなたの隠されたH度数がわかっちゃいます!あの人のムッツリ度もバレちゃう診断を今すぐ試してみよう
-
最近流行の家出掲示板では、各地のネットカフェ等を泊り歩いている家出少女のメッセージが多数書き込みされています。彼女たちはお金がないので掲示板で知り合った男性の家にでもすぐに泊まりに行くようです。あなたも書き込みに返事を返してみませんか
-
性欲を持て余し、欲求不満になっている女性を金銭の対価を得て、癒して差し上げるお仕事です。参加にあたり用紙、学歴等は一切問いません。高収入アルバイトに興味のある方はぜひどうぞ
-
童貞を奪ってみたい女性たちは、男性にとって「初体験」という一生に一度だけの、特別なイベントを共に心に刻み込むことを至上の喜びにしているのです。そんな童貞好きな女性たちと高級チェリーで最高のSEXをしてみませんか
-
最近寂しくて困っています。夜一人で寝るのが凄く寂しいです…隣で添い寝してくれる男性いませんか?見た目とか特に気にしません。優しくて一緒にいてくれる方大歓迎☆一緒に布団で温まりましょう♪shart.enamorado.de-me@docomo.ne.jp
-
一晩の割り切ったお付き合いで副収入が得られるサイトのご案内です。アルバイト感覚での挑戦もできる、安心の無料登録システムを採用しておりますので、興味のある方は当サイトをぜひご覧ください
-
復活、スタービーチ!日本最大の友達探しサイトがついに復活、進化を遂げた新生スタビをやってみませんか?理想のパートナー探しの手助け、合コンパーティー等も随時開催しています。楽しかった頃のスタビを体験しよう
