Bits and Pieces
This has been a pretty content-free zone lately. Most of that has been due to work pressure. I've got a few systems going live around now and, as always, the last 20% of each project takes 80% of the time. Anyway, I have been looking at a few things here and there that might be of interest...
First off, I noticed that George Chrysanthakopoulos has announced on the MSRS CCR forum that work is underway to unify the CCR with the PFX libraries recently released to CTP, although no time-frames have been given. George also states that they are working towards integrating the CCR into the core .NET libraries (possibly via PFX?). This is excellent news all round. I read with interest the MSDN article on the Task Parallel Library, which sits at the core of PFX. Actually, in my line of work, I believe the CCR would be more useful than PFX, as taming asynchrony whilst exploiting latent concurrency tends to be a bigger (and harder) problem than parallelising the processing of the data itself. Nevertheless, it's as good a vehicle as any in which to ship the CCR, and it means that in future, if you're considering the library and don't work in robotics, you won't have to convince your boss that buying and deploying MSRS is a good idea.
I've also been following the herd and researching F#, after the announcement that Microsoft are to fold it into the set of officially supported .NET languages. I've noted before what I liked in general about the language, but readers of this blog might be interested in what it offers in the sort of space the CCR plays in.
- Immutability. In common with many functional languages, the default syntactic declaration of a 'variable' is immutable. Immutability is a generally useful property in concurrent programming, and in message-passing systems in particular. F# makes it simple to declare both types (records) and instances of them in a way well-suited to message-passing.
- Composition. Functional languages tend to compose and decompose solutions very well (see this classic introduction). This ability helps factor out a lot of concurrent machinery, whilst still exploiting it. In fact, Joe Armstrong called this 'Abstracting out Concurrency' in his PhD thesis, and Erlang programs, whilst naturally concurrent, tend to make heavy use of these common patterns.
- The latest version of F# contains support for 'workflows', which are very similar in concept to CCR iterators, although with an entirely different implementation under the covers. Robert Pickering's blog has covered these in a recent series of tutorials, as well as the Erlang-style message-passing approach (something akin to one-time CCR receivers in a read-process loop) it supports.
Incidentally, I read Robert's book, Foundations of F#, as my first foray into the language. I thought the book was pretty good, but felt (as an FP novice) that it might have benefited from saying a little more about FP style and accumulated good practice. Anyway, since then I have also read Practical OCaml, since F# aims to be compatible with a subset of OCaml. If you're considering either, my personal recommendation would be to read the OCaml book first and then Foundations, since the former does a better job of introducing functional programming and the latter provides solid examples of integration with the existing BCL.
I've also been working on a very lightweight COM-based C++ library for Erlang-style message-passing and task distribution over the native Win32 thread-pool. It's an ATL-style solution, i.e. headers only, and I'll blog a little more about it in future.
F# Goes 'Official'
Joe Duffy points to the announcement that the Developer Division at Microsoft is folding F# into the 'official' set of supported .NET languages. This is great news.
Hiring...
I don't normally write about my job, so I'm going to write about a different one that could be yours...
My team has vacancies. We design, develop and support equity-derivatives trading systems for one of Australia's foremost financial institutions for markets at home and abroad. You'd like to work in a small, highly-skilled team of software developers and have some or all of the following:
- Strong C++ and optionally VB6 and/or .NET
- Significant experience of multi-threaded server application development
- SQL experience (preferably SQL Server)
- A solid understanding of IP-based communication protocols
- A working knowledge of equity-derivative products
- A working knowledge of client-side development
You will be a self-starter who can work with minimum supervision on projects from inception through delivery and onto support, across the various architectural tiers. You will be comfortable communicating with the key business stake-holders, primarily traders and business managers and be focused on delivering solutions that meet their needs.
Successful applicants would fill vacancies at our Sydney offices and should hold either Australian Citizenship or Permanent Residence. (There are opportunities abroad also).
If you're interested, please drop me a line via my gmail address (nagunn at ...) and we can take it from there.
F# and Message-Passing
My inexperience with F# has caught me out. Robert Pickering has an article on asynchronous workflows (similar concept to CCR iterators) in which he mentions a forthcoming article on 'mailboxes', the message-passing feature I said didn't exist. I look forward to it.
Why COM STAs aren't completely awful
...and why productivity and pragmatism are important too. The last two paragraphs are especially interesting.
F# and DSS
So I rushed ahead of myself a bit and thought that I'd try to implement a DSS service in F#. I've posted below what I think is the equivalent code for the default DSS service you get when you create a new DSS simple service project. There are three salient points:
- I've collapsed the *Types and *Service into a single file.
- The support for CCR iterators in the handler methods is a little bit of a hack (my hack).
- It doesn't work. It doesn't even compile.
(3) is clearly the most salient point. DSS requires that the Contract class in a service namespace expose a string literal called 'Identifier' that contains the contract URI. Unfortunately, the code listed below doesn't expose a string literal but rather a read-only property. CLR aficionados will know the difference: clients that refer to 'Contract.Identifier' as a literal get that literal compiled into their own assembly, so if the literal later changes in the source assembly, pre-existing dependent assemblies won't reflect the change.
[BTW, don't change contract identifiers like this. Treat them as web-shaped GUIDs].
Unfortunately, F# does not currently support const literals. So you cannot currently write DSS services in F#. Which is a shame because I think the code could be a slight improvement on the C# equivalent.
Incidentally, it doesn't compile simply because of the use of Contract.Identifier in the ContractAttribute. As a read-only property, this is a runtime evaluation, not a compile-time one, so it cannot be used in an attribute constructor. That can be fixed by just replacing the symbolic property with its literal value. However, the DSS runtime still fails to find a Contract class of the right shape.
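For comparison, here's a minimal C# sketch of the two shapes, using the same URI as the F# code below. The class and member names follow what DSS expects; the rest is purely illustrative.
// The shape DSS expects: a const literal. The value is baked into any assembly
// that references Contract.Identifier at compile time, and it can be used in an
// attribute constructor.
public sealed class Contract {
    public const string Identifier = "http://iodyne.blogspot.com/2007/10/driver/tcp/connection";
}
// The shape the F# code below actually produces: a read-only property. This is
// evaluated at runtime, so it can't appear in an attribute constructor and isn't
// copied into referencing assemblies.
public sealed class ContractWithProperty {
    public static string Identifier {
        get { return "http://iodyne.blogspot.com/2007/10/driver/tcp/connection"; }
    }
}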
There are also some differences in the implementation of the handler methods, due to the current limitations of computation expressions. The primary one is (approximately) that such expressions can only yield results directly - you can't write 'regular' code between yields. The workaround is to yield functions that contain the regular code and each return an ITask, then have another computation expression yield the results of those functions. The following snippets from an F# file of CCR helpers illustrate this:
#light
open System
open Microsoft.Ccr.Core
let deref (x : seq<unit -> ITask>) = (seq { for f in x do yield f() }).GetEnumerator()
let handler x = Arbiter.FromHandler(new Handler(x))
let receive<'a> persist (port : Port<'a>) handler =
    Arbiter.Receive(persist, port, new Handler<'a>(handler))
let receive_one<'a> (port : Port<'a>) = receive false port
let receive_many<'a> (port : Port<'a>) = receive true port
let receive_with_iterator<'a> persist (port : Port<'a>) handler =
    Arbiter.ReceiveWithIterator(persist, port, new IteratorHandler<'a>(fun(a) ->
        deref(handler(a))))
let receive_one_with_iterator<'a> (port : Port<'a>) =
    receive_with_iterator false port
let receive_many_with_iterator<'a> (port : Port<'a>) =
    receive_with_iterator true port
let activate_many taskQueue tasks =
    Arbiter.Activate(taskQueue, [| for t in tasks -> t :> ITask |])
let activate taskQueue task =
    activate_many taskQueue [| task |]
let ( &= ) taskQueue task =
    activate taskQueue task
And here's an example of using them:
let port = new Port<int>()
taskQueue &= (receive_many_with_iterator port (fun i ->
    print_endline ("Triggered.")
    seq {
        yield fun() ->
            receive_one port (fun i -> print_endline (i.ToString()))
        yield fun() ->
            receive_one port (fun i -> print_endline (i.ToString()))
    }))
port.Post(1)
port.Post(2)
port.Post(3)
Anyway, I digress. Here's the code for the service. My F# experience is minimal, so I'd be happy to hear about any improvements to any of this code.
#light
namespace Iodyne.Drivers.Tcp.Connection
open System
open System.ComponentModel
open System.Net
open System.Net.Sockets
open Microsoft.Ccr.Core
open Microsoft.Dss.Core.Attributes
open Microsoft.Dss.ServiceModel.Dssp
open Microsoft.Dss.ServiceModel.DsspServiceBase
open W3C.Soap
open Ccradaptors
type Contract = class
    static member Identifier = "http://iodyne.blogspot.com/2007/10/driver/tcp/connection"
end

[<DataContract>]
type State = class
    new() = {}
end

type Get = class
    inherit Get<GetRequestType, PortSet<State, Fault>>
    new() = {}
    new(body : GetRequestType) =
        { inherit Get<GetRequestType, PortSet<State, Fault>>(body) }
    new(body : GetRequestType, response : PortSet<State, Fault>) =
        { inherit Get<GetRequestType, PortSet<State, Fault>>(body, response) }
end

[<ServicePort>]
type Operations = class
    inherit PortSet<DsspDefaultLookup, DsspDefaultDrop, Get>
    new() = {}
end

[<DisplayName("TcpConnection");
  Description("TcpDriver Connection Service");
  ContractAttribute(Contract.Identifier)>]
type Service = class
    inherit DsspServiceBase as base

    [<ServicePort(AllowMultipleInstances = true)>]
    val mainPort : Operations
    val state : State

    new(creationPort : DsspServiceCreationPort) = {
        inherit DsspServiceBase(creationPort);
        state = new State()
        mainPort = new Operations() }

    override this.Start() =
        base.Start()

    [<ServiceHandler(ServiceHandlerBehavior.Concurrent)>]
    member this.GetHandler(get : Get) =
        enumerate
            (seq {
                yield (handler (fun () -> get.ResponsePort.Post(this.state)))
            })
end
F# and CCR
I mentioned a while back that I'd read Joe Armstrong's PhD thesis on building reliable distributed systems. Early on, Joe introduces Erlang, a language he and his team designed at Ericsson to build large-scale, fault-tolerant, concurrent systems.
I've no background at all in functional languages, but I was struck by 4 things (in no particular order):
- The code seemed to have hit a sweet-spot of readability and brevity. It's a compact syntax.
- The support for functions as first-class values (a feature of functional languages) allowed for some simple but powerful constructs.
- A lot of type inference (and a lot less typing!)
- Language support for message-passing primitives.
I won't expand particularly on these right now, but some of these features have crept into C#, notably limited type inference (i.e. the var keyword) and lambda functions (via succinct syntax for anonymous delegates).
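For example, here's a trivial C# 3.0 fragment, just to illustrate those two features (nothing CCR-specific about it):
using System;
using System.Collections.Generic;

class VarAndLambdas {
    static void Main(string[] args) {
        var primes = new List<int> { 2, 3, 5, 7, 11 };   // 'var' is inferred as List<int>
        var odds = primes.FindAll(p => p % 2 == 1);      // a lambda in place of an anonymous delegate
        odds.ForEach(p => Console.WriteLine(p));
    }
}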
Anyway, whilst reading around Erlang and functional languages in general, I discovered F#, a research language from Microsoft Research in Cambridge, England. F# is a CLR language with strong functional influences, but with support also for imperative and object-oriented programming. (One of the principal developers, Don Syme, was one of the guys who worked out how to do generics on the CLR).
F# shares with Erlang the first 3 points above. And that got me thinking that the CCR, integrated with F#, could provide the fourth. You can use the CCR directly from F#, but wrapper functions make it easier still to add powerful concurrency and co-ordination support for task-based programming into F# in a natural way, and I'll blog about these in forthcoming posts.
Crunched
I was asked on the Australian DotNet Mailing List (send "subscribe dotnet" in the body of the email to: imailsrv@stanski.com) if I could provide the CCR posts as a document, to make it easier to print off. Sort of. My host (blogger) doesn't support downloads as far as I can tell, and Windows Live Writer (Beta) has no facility for saving posts as word documents, so it was a cut-and-paste job in the end.
The document can be found on FileCrunch.
It's a little rough and ready; in particular, the spacing of the code samples is not all it could be. Any hints or tips on this gratefully received.
Innovation book
I've started reading Scott Berkun's 'The Myths of Innovation'. It's an interesting read, exposing the cultural myths that surround ideas, those who have them and those who capitalise on them. It's also an O'Reilly book without an animal on the front!
DSS Optimizations
I was looking at some of the code generated by the dssproxy tool. When you define a service (by specifying types and operations), the dssproxy tool takes your service assembly and generates two associated assemblies, a proxy and a transform.
The proxy is the assembly that clients of the service use to send and receive messages. It contains representations of the service types that can also serialize and deserialize themselves for transmission. I noticed that these representations have a small inefficiency in how they deal with the deserialization of List<T>s. The serialized state of the Ts is preceded by a count of all the Ts in the list. This information is used to determine how many Ts to expect in the stream. It could also be used to construct the list with an initial capacity, to avoid excessive re-allocation and copying for large numbers of T. The generated code doesn't currently do this. Fortunately, you do have the source for the proxies, so you can change it yourself if it really matters.
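Here's an illustrative sketch of the difference - not the actual dssproxy-generated code; the method names and the use of BinaryReader are just for demonstration (these fragments need System.Collections.Generic and System.IO):
// The pattern described above: the list grows (and re-allocates) as items arrive.
static List<int> ReadListCurrent(BinaryReader reader) {
    int count = reader.ReadInt32();
    List<int> list = new List<int>();
    for (int i = 0; i < count; ++i)
        list.Add(reader.ReadInt32());
    return list;
}
// Using the count as the initial capacity means a single allocation up front.
static List<int> ReadListPreSized(BinaryReader reader) {
    int count = reader.ReadInt32();
    List<int> list = new List<int>(count);
    for (int i = 0; i < count; ++i)
        list.Add(reader.ReadInt32());
    return list;
}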
Incidentally, the transform assembly is used in the same app-domain as the service to transform the proxy types (e.g. the ones that arrive off the wire) into the original types as defined in your service assembly. The transform suffers the same small issue regarding collections.
The other thing I noticed was that DSS maintains tables (of type Dictionary<K,V>) keyed on Type. One place these tables are used is in looking up the correct transform for a proxy instance. Given an object of some proxy type, it calls GetType() and uses the resulting Type object to look up the transform instance.
As Vance Morrison pointed out, in his excellent blog, if all you need is a type look-up then it's more efficient to use a RuntimeTypeHandle. It's a simple struct that just contains an IntPtr, so it consumes less space than a Type reference and as a struct doesn't participate in garbage collection cycles. As he points out later in the article, using the handle instead of the type will reap more benefits still once certain JIT optimizations are released.
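Here's a hedged sketch of the look-up pattern Vance describes, keyed on RuntimeTypeHandle rather than Type (the Transform delegate is just a placeholder, not the actual DSS type):
using System;
using System.Collections.Generic;

delegate object Transform(object proxyInstance);   // placeholder for the real transform

class TransformTable {
    // RuntimeTypeHandle is a small struct wrapping an IntPtr, so it is cheaper to
    // store and hash than a Type reference, and the GC doesn't have to trace it.
    private readonly Dictionary<RuntimeTypeHandle, Transform> transforms =
        new Dictionary<RuntimeTypeHandle, Transform>();

    public void Register(Type proxyType, Transform transform) {
        transforms[proxyType.TypeHandle] = transform;
    }

    public Transform Lookup(object proxyInstance) {
        // Type.GetTypeHandle(obj) avoids materialising the full Type object.
        return transforms[Type.GetTypeHandle(proxyInstance)];
    }
}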
All (not) my own work
I've not posted much of my own recently. Outside of work (which has been getting a little hectic lately), I've been working on a small, sample project that will hopefully be a good segue into Decentralized Software Services (DSS), which provides an application model for highly-concurrent, message-passing based programs in .NET.
That will come after I finish off covering the remaining CCR features, specifically causalities and maybe interop. I'd be happy to hear any suggestions for future posts that you think I've not covered, or not covered sufficiently well. I'm aware that after building up CCR iterators, I rushed through them rather quickly.
Just wing it
Courtesy of Kiran, an interesting presentation by Edward A. Lee on why shared memory is broken and what is needed instead to 'make concurrency mainstream'.
[I couldn't get it to go under Firefox, but IE had no bother. You may get different results.]
Newsqueak and Payback Time
An interesting video presenting Newsqueak, another language with message-passing built in. And Joe Armstrong looks forward to a golden era for Erlang developers. Am I just looking harder or is message-passing gaining momentum nowadays?
I'm back
Obviously. Blogger suspended my account as their anti-spam bot identified this content as spam. So much for heuristics. Or maybe not depending on your point of view.
CCR 101 - Part 9: Iterators
Iterators (or Iterative Tasks in CCR parlance) are one of the standout features of the CCR. Their design and implementation is an imaginative fusion of the message-passing paradigm and the iterator functionality introduced in C# 2.0. But whether or not you choose to sit and admire the aesthetics of the CCR, you would probably agree that it is what they enable that is truly impressive - the ability to write a logical sequence of actions, within a single method, without ever blocking the thread between those actions.
Now obviously the strength of this functionality really shines through when each action results in some form of asynchronous I/O, e.g. a web request or a database write. In this case, the thread is freed up to service other tasks whilst the I/O completes, before returning to the next action in the sequence. But remember, in our CCR-based message-passing world, pretty much all I/O is performed asynchronously even if those messages are passed between CPU-bound services in the same process.
C# 2.0 Iterators: A (quick) recap.
The fundamental idea of the iterator functionality is to allow the developer to present an enumerable list to some other code without the explicit population or implementation of that list. Here's a sample that presents the list of the first 5 primes.
class Sample0901 {
static void Main(string[] args) {
foreach (int p in FirstFivePrimes()) {
Console.WriteLine(p.ToString());
}
Console.ReadLine();
}
private static IEnumerable<int> FirstFivePrimes() {
yield return 2;
yield return 3;
yield return 5;
yield return 7;
yield return 11;
}
}
Very contrived, and next to useless, but let's just examine the salient points. The first is that the client can just write a simple foreach loop. This in itself is just a shorthand (and compiler-generated) way of writing:
class Sample0902 {
static void Main(string[] args) {
IEnumerator<int> pe = FirstFivePrimes().GetEnumerator();
while (pe.MoveNext())
Console.WriteLine(pe.Current.ToString());
Console.ReadLine();
}
// Elided...
}
Now, the really clever code-generation is not in Main(), but in FirstFivePrimes(). The compiler generates a new class for this method that acts as a state machine. GetEnumerator() returns a new instance of this class. The client moves through the state by calling MoveNext() on the enumerator and the generated class exposes this state through the Current value. Each 'yield return X' statement sets the new state of the instance.
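To make the state-machine idea concrete, here's a hand-rolled (and much simplified) approximation of the kind of class the compiler generates for FirstFivePrimes(). The real generated code is more involved, but the shape is the same: a state field, a current value, and a MoveNext() that advances the machine.
using System;
using System.Collections;
using System.Collections.Generic;

class FirstFivePrimesEnumerator : IEnumerable<int>, IEnumerator<int> {
    private int state;      // which yield we're up to
    private int current;    // the value exposed via Current

    public IEnumerator<int> GetEnumerator() { return this; }
    IEnumerator IEnumerable.GetEnumerator() { return this; }

    public int Current { get { return current; } }
    object IEnumerator.Current { get { return current; } }

    public bool MoveNext() {
        switch (state) {
            case 0: current = 2; state = 1; return true;
            case 1: current = 3; state = 2; return true;
            case 2: current = 5; state = 3; return true;
            case 3: current = 7; state = 4; return true;
            case 4: current = 11; state = 5; return true;
            default: return false;
        }
    }

    public void Reset() { throw new NotSupportedException(); }
    public void Dispose() { }
}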
There are other, much better articles on iterators, in MSDN Magazine and on Matt Pietrek's blog.
Iterators the CCR way.
In the Concurrency and Coordination Runtime, iterators are used to return tasks that are then scheduled for execution. The CCR calls MoveNext() to execute the first task, which will execute all the code in the iterator method up to and including the first yield statement. And here's another clever bit. By yielding arbitrations (e.g. join or choice), which are themselves tasks, you can logically block until the arbitration completes. In the meantime, the dispatcher thread just goes off and processes other pending tasks.
Let's take a look at an example. The following code crawls the front page of this blog's home site for links to MSDN blogs. It then issues requests to fetch each of these blogs and prints out how many it successfully retrieved and how many failed.
class Sample0903 {
static void Main(string[] args) {
using (Dispatcher dispatcher = new Dispatcher()) {
using (DispatcherQueue taskQueue = new DispatcherQueue("IteratorSample", dispatcher)) {
string url = "http://iodyne.blogspot.com";
Arbiter.Activate(taskQueue,
Arbiter.FromIteratorHandler(delegate { return DoWork(url); }));
Console.ReadLine();
}
}
}
private static IEnumerator<ITask> DoWork(string url) {
PortSet<WebResponse, Exception> webResult = new PortSet<WebResponse, Exception>();
WebRequest webRequest = WebRequest.Create(url);
webRequest.BeginGetResponse(delegate(IAsyncResult iar) {
try {
webResult.Post(webRequest.EndGetResponse(iar));
} catch (Exception e) {
webResult.Post(e);
}
}, null);
WebResponse webResponse = null;
yield return Arbiter.Choice(webResult,
delegate(WebResponse wr) {
webResponse = wr;
},
delegate(Exception e) {
Console.WriteLine(e);
});
if (webResponse == null)
yield break;
string body = new StreamReader(webResponse.GetResponseStream()).ReadToEnd();
webResponse.Close();
Regex hrefRegex = new Regex("http\\:\\/\\/blogs\\.msdn\\.com\\/[^'\"\"]*", RegexOptions.IgnoreCase);
MatchCollection mc = hrefRegex.Matches(body);
PortSet<EmptyValue, Exception> multiPort = new PortSet<EmptyValue, Exception>();
foreach (Match m in mc) {
string murl = m.ToString();
WebRequest murlRequest = WebRequest.Create(murl);
murlRequest.BeginGetResponse(delegate(IAsyncResult iar) {
try {
murlRequest.EndGetResponse(iar).Close();
multiPort.Post(EmptyValue.SharedInstance);
} catch (Exception e) {
multiPort.Post(e);
}
}, null);
}
yield return Arbiter.MultipleItemReceive(multiPort, mc.Count,
delegate(ICollection<EmptyValue> wr, ICollection<Exception> ex) {
Console.WriteLine(wr.Count.ToString() + " succeeded. " + ex.Count.ToString() + " failed.");
});
yield break;
}
}
The bulk of the work is done within the iterator. It only yields two tasks, the first of which is a Choice, that detects whether the initial page fetch worked at all. If it didn't, we have no WebResponse and we yield break, which exits the iterator. If we did get a response then we parse the page for the MSDN blogs and then for each match we issue another asynchronous web request concurrently. Each successful response results in an EmptyValue signal, each failure an exception. Finally we yield to a MultipleItemReceive and output how many of these requests succeeded and how many failed.
This highly I/O-bound iterator only consumes threads for a tiny fraction of its elapsed time, mostly when doing the initial page parse and yet I still got to code it up in a basically sequential fashion.
I'll add a few further notes to this in a subsequent post.
More Concurrency Reading...
I just haven't had the time this week to write the CCR Iterators post. A hard deadline at work means that spare capacity is at a premium right now, and likely to be in the near future. In the meantime, a few articles related to concurrency. Probably preaching to the converted...
- One from Herb Sutter. A bit out of date now, but not dated. Also, a follow-up.
- An interesting introduction to Software Transactional Memory by Simon Peyton-Jones.
Patrick Dussud on CLR Garbage Collection
An interesting video on Channel9 featuring Patrick Dussud, now chief architect (and one-time principal developer) of the .NET garbage collector. There are lots of smart cookies working in the software industry, many without much fanfare. Kudos to Microsoft (and Patrick) for putting themselves forward. The rest of us benefit from this kind of effort.
Indirectly, I also re-acquainted myself with Maoni Stephens' blog (also CLR GC). Well worth a look for CLR aficionados.
To those readers who may be waiting for the next instalment of CCR 101, I intend to fulfill my promise to cover Iterators this week, work commitments allowing.
My Introduction to Erlang
I've just finished reading Joe Armstrong's PhD thesis entitled 'Making reliable distributed systems in the presence of software errors' (this is becoming a bit of a habit).
Anyway, I found it very clear and well presented. Although I don't know much about Erlang or switching systems, what Joe has to say about distributed software architectures has much in common with Decentralized Software Services (DSS). To paraphrase Joe's summary of the characteristics of Concurrency-Oriented Programming:
- Services are isolated from one another. A fault in one should not adversely affect another.
- Each service has an identifier, and is addressable by it.
- There is no shared state between services.
- Services interact via asynchronous messaging, which must be assumed to be unreliable.
- Services are units of concurrency. They run independently of one another.
CCR 101 - Part 8: Dispatchers
I've not covered the Dispatcher class before in any detail, and in all the examples so far, I've only used DispatcherQueues via their default constructor which causes all tasks to be scheduled over the CLR thread-pool. The CCR Dispatcher class provides a means of scheduling tasks over your own custom thread-pool.
Being in control of the thread-pool creation gives you some control over the properties of the threads themselves (albeit as a homogeneous block - you can't set these properties on individual threads). These include the following (there's a short example after the list):
- The name of the threads. This is a useful diagnostic property.
- Whether the threads will run in the background. Background threads will not keep the application active, should all the other foreground threads exit.
- The priority the threads will run at.
- The COM apartment state of the threads. This is useful for interop.
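As a quick sketch, here's the same constructor overload that Sample0802 below uses, setting the thread count, priority, apartment state and pool name up front. The values here are arbitrary, and the DispatcherOptions argument is where background-thread behaviour is configured:
using System;
using System.Threading;
using Microsoft.Ccr.Core;

class NamedPoolSketch {
    static void Main(string[] args) {
        using (Dispatcher dispatcher = new Dispatcher(
            4, ThreadPriority.BelowNormal, DispatcherOptions.None, ApartmentState.MTA, "WorkerPool")) {
            using (DispatcherQueue dq = new DispatcherQueue("Default", dispatcher)) {
                dq.Enqueue(new Task(delegate() {
                    // The pool name passed to the constructor is used to name the threads.
                    Console.WriteLine(Thread.CurrentThread.Name);
                }));
                Console.ReadLine();
            }
        }
    }
}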
First though, let's adapt our first sample for use with a standard dispatcher:
class Sample0801 {
static void Main(string[] args) {
using (Dispatcher dispatcher = new Dispatcher()) {
using (DispatcherQueue dq = new DispatcherQueue("Default", dispatcher)) {
dq.Enqueue(new Task(delegate() {
Console.WriteLine("Hello world.");
}));
}
Console.ReadLine();
}
}
}
Straightforward enough. First we create our dispatcher, using the default constructor. This will create a fixed number of threads (more on this later). All threads will run in the foreground at normal priority, their apartment state will not be set and their names will be empty strings. Notice how we dispose of the dispatcher when we are finished with it. This ensures that all the threads are properly destroyed.
So why use a dispatcher at all? Why not just use a default-constructed DispatcherQueue and schedule items over the CLR thread-pool? Well, you might want to control some aspects of the thread-pool via the properties discussed above. You might want to isolate your CCR task-execution threads from 3rd-party libraries that use the CLR thread-pool (or even from other CCR tasks, via two or more dispatchers).
Another important reason is that the execution policies which govern how task-execution is constrained only work with custom dispatcher pools. We'll have a look at an example of this shortly.
You might even need the extra performance. Some of the flexibility of the CLR thread-pool - such as its dynamic sizing capabilities - comes at a small performance cost. Dispatcher-based thread-pools have a fixed number of threads and, because there is less management of the threads, they have been observed to be more performant in some situations. Your mileage may vary.
So how many threads in a dispatcher-pool? Well, the default constructor will create one thread per CPU on a multi-processor box, and two threads on a uniprocessor box. You can use one of the constructor overloads to specify an exact number of threads, or you can set the static Dispatcher.ThreadsPerCpu property prior to using the default constructor. And even with the overloads you can always pass 0 for the number of threads and let the CCR work it out for you.
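Here's a minimal sketch of the two sizing approaches just mentioned (the pool name is just a placeholder):
using Microsoft.Ccr.Core;

class ThreadCountSketch {
    static void Main(string[] args) {
        // Influence the default constructor by setting the static property first.
        Dispatcher.ThreadsPerCpu = 2;
        using (Dispatcher defaultSized = new Dispatcher()) {
        }
        // Or specify the count explicitly via an overload; 0 means 'let the CCR decide'.
        using (Dispatcher explicitSized = new Dispatcher(0, "AutoSizedPool")) {
        }
    }
}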
COM Interop
Whilst it might be preferable to have all your application in managed code, there's a good chance that you still have COM components that form part of your existing application and which you'll need to interop with in the short term. In principle, if your components are 'Both' or 'Free' threaded, then you don't have to do anything - they'll play happily in the same threads as your tasks without creating any COM proxies, and should be addressable without said proxies from those threads.
If however, those components are marked as 'Apartment' then you need to tread carefully (as always). If you create these components on a vanilla dispatcher thread, you'll get a proxy. When you call across through this proxy, you'll incur the marshaling cost, but more importantly, the calling thread will be forced to sleep whilst the recipient thread processes the call. What's worse is that in the absence of an appropriate STA thread, the COM runtime will spin one up for you and put the COM object in that thread. In fact, it will spin up only one and put all STA instances on that thread, forming another serialization bottleneck.
What you do about this depends on how you use the COM object. If you can get away with a create-use-destroy model, then you can create n STA threads under a dispatcher, register tasks using that model, and just schedule receives to do the work. The following (albeit contrived) example creates 2 STA threads and then posts a receive which creates an instance of the Windows Media Player, calls a method on it and then releases it.
class Sample0802 {
static void Main(string[] args) {
using (Dispatcher dispatcher = new Dispatcher(
2, ThreadPriority.Normal, DispatcherOptions.None, ApartmentState.STA, "STAPool")) {
using (DispatcherQueue dq = new DispatcherQueue("Default", dispatcher)) {
Port<EmptyValue> signalPort = new Port<EmptyValue>();
Arbiter.Activate(dq, Arbiter.Receive(true, signalPort, delegate(EmptyValue ev) {
IWMPPlayer player = new WindowsMediaPlayerClass();
Console.WriteLine(player.versionInfo);
Marshal.ReleaseComObject(player);
}));
string line = Console.ReadLine();
while (line != "quit") {
signalPort.Post(EmptyValue.SharedInstance);
line = Console.ReadLine();
}
}
}
}
}
If the COM object is long-lived, then you need a different strategy, which basically involves creating single-threaded STA dispatchers and the judicious use of IterativeTasks, so I shall defer that sample until I've covered iterators properly.
Task Execution Policies
The following example shows how a task execution policy can be used to constrain scheduling. It generates an int every second, but the receiver takes two seconds to process each one. We use a scheduling policy that constrains the queue depth to 1, which effectively means that a newly-arrived item will replace an unprocessed item. If the receiver can't keep up, it will only have to process the latest item.
class Sample0803 {
static void Main(string[] args) {
using (Dispatcher dispatcher = new Dispatcher(1, "PolicyDemo")) {
using (DispatcherQueue dq = new DispatcherQueue(
"Default", dispatcher, TaskExecutionPolicy.ConstrainQueueDepthDiscardTasks, 1)) {
Port<int> intPort = new Port<int>();
Arbiter.Activate(dq, Arbiter.Receive(true, intPort, delegate(int i) {
Console.WriteLine(i.ToString());
Thread.Sleep(TimeSpan.FromSeconds(2));
}));
for (int i = 0; i < 20; ++i) {
intPort.Post(i);
Thread.Sleep(TimeSpan.FromSeconds(1));
}
Console.ReadLine();
}
}
}
}
You might have noticed that I've created a single-threaded Dispatcher here. That's really to force the demo to work. If I use a default dispatcher on the machine I'm currently sitting at, which has a single hyper-threaded CPU, I get two threads, and two threads are enough to process all the items in turn. Even if one thread blocks for two seconds, the other thread is free to process the next item.
That's all I want to cover about Dispatchers for now. I'll come back to them soon when I demonstrate a strategy for dealing with long-running STA objects, but the next topic will be about Iterators or IterativeTasks, arguably one of the most powerful features of the CCR.
CCR 101 - Part 7: Interleave
The Interleave is essentially an arbitration with reader-writer scheduling semantics. It achieves this by splitting the Receiver tasks it arbitrates into three categories:
- Concurrent receivers. Any receivers in this group can be scheduled concurrently with any other receivers from this group, providing no exclusive receivers (see below) are active or pending.
- Exclusive receivers. A receiver from this group will only be scheduled when there are no other exclusive or concurrent receivers active.
- Teardown receivers. A receiver in this group will also run exclusively but causes the Interleave to 'retire'. Once a receiver in this group has been scheduled, all other receivers are abandoned and will not be scheduled.
Note that none of these categories need have any receivers at all.
There are a few more interesting things about Interleaves:
- Exclusive receivers have their order retained. Remember how we saw that a persistent Receive (without an outer arbitration to control the scheduling) would schedule tasks as fast as items arrived, leading to out-of-order processing across multiple threads? Well, by placing that Receiver within the set of exclusive Receivers, the processing will remain in order. This is because the Interleave is concerned with the completion of the Receiver task, not just its scheduling. (There's a short sketch of this after the list.)
- Interleaves may be combined. You can 'merge' one interleave with another. Each group (concurrent, exclusive, teardown) is merged and behaves as if you had declared the single more complex interleave.
- Interleaves still offer opportunities for deadlock and I'll cover these in the post that deals with IterativeTasks, because the use of the latter within the former is where they can occur.
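As a quick illustration of the first point, here's a hedged sketch built from the same pieces as the samples below: the persistent receive sits alone in the exclusive group, so each handler must complete before the next is scheduled and the ints are processed in the order they were posted.
using System;
using Microsoft.Ccr.Core;

class OrderedProcessingSketch {
    static void Main(string[] args) {
        using (DispatcherQueue dq = new DispatcherQueue()) {
            Port<int> port = new Port<int>();
            // A bare persistent receive could schedule these handlers across several
            // threads and complete them out of order; wrapping it in the exclusive
            // group serialises them.
            Interleave il = new Interleave(
                new TeardownReceiverGroup(),
                new ExclusiveReceiverGroup(Arbiter.Receive(true, port, delegate(int i) {
                    Console.WriteLine(i.ToString());
                })),
                new ConcurrentReceiverGroup());
            Arbiter.Activate(dq, il);
            for (int i = 0; i < 10; ++i)
                port.Post(i);
            Console.ReadLine();
        }
    }
}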
Let's look at an example. This one is more complicated than the others we've seen so far and I've furnished it with a few comments to help illustrate the salient points. We use the Interleave to protect a counter that increments every second, and allow the client to query the current value.
class Sample0701 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
ManualResetEvent done = new ManualResetEvent(false);
int counter = 0;
// Signal port - indicates the counter should be incremented
Port<EmptyValue> signalPort = new Port<EmptyValue>();
// Query port - indicates that the counter should be read
Port<EmptyValue> queryPort = new Port<EmptyValue>();
// Quit port - indicates that the interleave should exit
Port<EmptyValue> quitPort = new Port<EmptyValue>();
// Declare the interleave
Interleave il = new Interleave(
new TeardownReceiverGroup(Arbiter.Receive(false, quitPort, delegate {
Console.WriteLine("Finished.");
done.Set();
})),
new ExclusiveReceiverGroup(Arbiter.Receive(true, signalPort, delegate {
++counter;
})),
new ConcurrentReceiverGroup(Arbiter.Receive(true, queryPort, delegate {
Console.WriteLine(counter.ToString());
})));
// Then activate it
Arbiter.Activate(dq, il);
// Now start a timer that will fire every second, then read the console. An empty
// line causes the program to quit.
using (Timer t = new Timer(delegate { signalPort.Post(EmptyValue.SharedInstance); })) {
t.Change(0, 1000);
string line = Console.ReadLine();
while (line.Length > 0) {
if (line == "q")
queryPort.Post(EmptyValue.SharedInstance);
line = Console.ReadLine();
}
quitPort.Post(EmptyValue.SharedInstance);
}
done.WaitOne();
Console.ReadLine();
}
}
}
We create three ports. One carries a request to increment the counter, one to query the counter and one to shut down. We create our Interleave arbitration over these three ports, then activate it. Then we start a timer which fires every second, posting a message to the signal port each time. Then we sit in a loop, reading the console.
Now consider if we wanted to support an extra message that would reset the counter. This requires a new port (in this example at least) and an additional exclusive receiver. I've altered the above sample to show how to do this; the amendments are the new reset port and its exclusive receiver.
class Sample0702 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
ManualResetEvent done = new ManualResetEvent(false);
int counter = 0;
// Signal port - indicates the counter should be incremented
Port<EmptyValue> signalPort = new Port<EmptyValue>();
// Reset port
Port<EmptyValue> resetPort = new Port<EmptyValue>();
// Query port - indicates that the counter should be read
Port<EmptyValue> queryPort = new Port<EmptyValue>();
// Quit port - indicates that the interleave should exit
Port<EmptyValue> quitPort = new Port<EmptyValue>();
// Declare the interleave
Interleave il = new Interleave(
new TeardownReceiverGroup(Arbiter.Receive(false, quitPort, delegate {
Console.WriteLine("Finished.");
done.Set();
})),
new ExclusiveReceiverGroup(
Arbiter.Receive(true, signalPort, delegate {
++counter;
}),
Arbiter.Receive(true, resetPort, delegate {
counter = 0;
})),
new ConcurrentReceiverGroup(Arbiter.Receive(true, queryPort, delegate {
Console.WriteLine(counter.ToString());
})));
// Then activate it
Arbiter.Activate(dq, il);
// Now start a timer that will fire every second, then read the console. An empty
// line causes the program to quit.
using (Timer t = new Timer(delegate { signalPort.Post(EmptyValue.SharedInstance); })) {
t.Change(0, 1000);
string line = Console.ReadLine();
while (line.Length > 0) {
if (line == "q")
queryPort.Post(EmptyValue.SharedInstance);
else if (line == "r")
resetPort.Post(EmptyValue.SharedInstance);
line = Console.ReadLine();
}
quitPort.Post(EmptyValue.SharedInstance);
}
done.WaitOne();
Console.ReadLine();
}
}
}
Note that I could have combined two Interleaves to the same effect. The following snippet illustrates how I might have done that:
// Declare the interleave
Interleave il = new Interleave(
new TeardownReceiverGroup(Arbiter.Receive(false, quitPort, delegate {
Console.WriteLine("Finished.");
done.Set();
})),
new ExclusiveReceiverGroup(Arbiter.Receive(true, signalPort, delegate {
++counter;
})),
new ConcurrentReceiverGroup(Arbiter.Receive(true, queryPort, delegate {
Console.WriteLine(counter.ToString());
})));
// Declare the second interleave
Interleave il2 = new Interleave(
new TeardownReceiverGroup(),
new ExclusiveReceiverGroup(Arbiter.Receive(true, resetPort, delegate {
counter = 0;
})),
new ConcurrentReceiverGroup());
// Combine them
il.CombineWith(il2);
// Then activate it
Arbiter.Activate(dq, il);
In my next post, I'll cover Dispatchers, custom thread-pools and COM apartment interoperability.
CCR 101 - Part 6: Arbitrations (Join)
Actually, we've covered Joins already - I just forgot to mention it. The MultipleItemReceives we covered in Part 4 are joins, i.e. they wait for a set of ports to produce n messages between them. All Joins (as their name suggests) are just rendezvous points for a set of messages and are an important construct - without them, we wouldn't easily be able to gather the results of a parallelized operation.
Whilst the most useful arbiters are typically those that allow you to deal with a concurrency problem whose size isn't known until runtime, there exists a join across two ports that is explicitly provided for by the Arbiter class (not unlike Choice in fact). The following sample takes a number, n, and a string from the console and prints that string n times.
class Sample0601 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<int> intPort = new Port<int>();
Port<string> stringPort = new Port<string>();
Arbiter.Activate(dq, Arbiter.JoinedReceive(false, intPort, stringPort,
delegate(int n, string s) {
for (int i = 0; i < n; ++i) {
Console.WriteLine(s);
}
}));
Console.Write("Enter a number: ");
intPort.Post(Int32.Parse(Console.ReadLine()));
Console.Write("Enter a phrase: ");
stringPort.Post(Console.ReadLine());
Console.ReadLine();
}
}
}
In the next post, I'll introduce the final arbitration, Interleave. Conceptually, it's a message-passing version of the Reader-Writer lock, scheduling readers and writers in a way that balances concurrency with correctness, albeit in an asynchronous fashion. Nearly every resource with some modifiable state can be protected by an Interleave and indeed it forms a core component of every service in DSS.
Once the section on Arbitrations is complete, I want to cover the following (in order).
- Dispatchers. So far we've only really looked at DispatcherQueues running across the CLR thread-pool. By using our own Dispatchers we can control the number of threads and various properties of those threads (such as COM Apartment type).
- Iterative Tasks. Hugely important to the CCR, IterativeTasks exploit C# 2.0 iterator functionality to allow logical sequences of asynchronous methods to be written without ever blocking a physical thread.
- Causalities. A cross-thread extension of exceptions, they allow errors raised in some scheduled task to propagate through their causalities back to the instigator of the operation.
Longer term, I'll be posting some useful samples of code, illustrating usage with asynchronous sockets, asynchronous database interaction, interoperability with COM and UI threads and how the CCR can power your WCF-enabled server (or async ASP.NET web pages). If you have any specific requests, please leave them in the comments section.
Web Buffoon
I notice that this blog doesn't render properly on IE6. I have figured out a fix of sorts (something to do with the text in a fixed-length DIV overflowing) with some help, but need to make sure it doesn't break the experience for IE7 and Firefox users, before I apply it. As my CSS skills fall somewhere short of useless, this may take a couple of days.
I've also noticed that the code is considerably less legible on Firefox than on IE. I'll try and fix that too. Thank you for your patience.
CCR 101 - Part 5: Arbitrations (Choice)
Our first 'proper' arbitration is Choice. It takes one or more (normally at least two to be useful) Receiver tasks and schedules the first eligible one. The others will never be scheduled. An example will suffice:
class Sample0501 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<int> intPort = new Port<int>();
Port<string> stringPort = new Port<string>();
Port<EmptyValue> emptyPort = new Port<EmptyValue>();
Arbiter.Activate(dq, Arbiter.Choice(
Arbiter.Receive(false, intPort, delegate(int i) {
Console.WriteLine("{0} is an int.", i.ToString());
}),
Arbiter.Receive(false, stringPort, delegate(string s) {
Console.WriteLine("{0} is a string.", s);
}),
Arbiter.Receive(false, emptyPort, delegate(EmptyValue e) {
Console.WriteLine("An empty line.");
})));
Console.Write("Enter a value: ");
string input = Console.ReadLine();
if (input.Length == 0) {
emptyPort.Post(EmptyValue.SharedInstance);
} else {
int intInput;
if (Int32.TryParse(input, out intInput))
intPort.Post(intInput);
else
stringPort.Post(input);
}
Console.ReadLine();
}
}
}
Only the first half of this program is of any interest. First, we create 3 ports. The last port carries EmptyValue, a CCR type that is useful for indicating that something happened even when there is no data associated with the message. Then, we create and activate our Choice arbitration. In the above example, we pass three Receiver tasks to the Arbiter.Choice call.
When a message is posted to any one of the ports, the corresponding receiver task becomes eligible for scheduling. However, it is the Choice arbitration that determines whether it will actually be scheduled. The Choice operates a simple first-come, first-served policy. As soon as a single receiver is scheduled, all the others in the Choice are doomed. This can be demonstrated by modifying the example so that each port is posted to at the end anyway. You'll see that the receivers don't fire.
class Sample0502 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<int> intPort = new Port<int>();
Port<string> stringPort = new Port<string>();
Port<EmptyValue> emptyPort = new Port<EmptyValue>();
Arbiter.Activate(dq, Arbiter.Choice(
Arbiter.Receive(false, intPort, delegate(int i) {
Console.WriteLine("{0} is an int.", i.ToString());
}),
Arbiter.Receive(false, stringPort, delegate(string s) {
Console.WriteLine("{0} is a string.", s);
}),
Arbiter.Receive(false, emptyPort, delegate(EmptyValue e) {
Console.WriteLine("An empty line.");
})));
Console.Write("Enter a value: ");
string input = Console.ReadLine();
if (input.Length == 0) {
emptyPort.Post(EmptyValue.SharedInstance);
} else {
int intInput;
if (Int32.TryParse(input, out intInput))
intPort.Post(intInput);
else
stringPort.Post(input);
}
intPort.Post(0);
stringPort.Post("Hello World.");
emptyPort.Post(EmptyValue.SharedInstance);
Console.ReadLine();
}
}
}
Note that none of the Receivers is persistent (the first parameter is false in Arbiter.Receive). Choice requires this and will throw a runtime exception if you try and pass a persistent Receive to it. The reason for this is simple: Choice picks one from n Receives and then ignores the rest, so there's actually no point in any of them being persistent.
Choice Arbitrations for determining success/failure.
Perhaps the most common use of Choice is determining whether some asynchronous operation succeeded or failed. In fact, it's so common, the Arbiter class has a particular overload of the Choice method that takes a 2-ary PortSet and has a more direct syntax for declaring the handlers for either outcome. Look at the example below. It attempts to resolve a hostname to an IP address via an asynchronous operation.
class Sample0503 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
PortSet<IPAddress[], Exception> result = new PortSet<IPAddress[], Exception>();
Arbiter.Activate(dq, Arbiter.Choice(result,
delegate(IPAddress[] addresses) {
for (int i = 0; i < addresses.Length; ++i) {
Console.WriteLine("{0}. {1}", i.ToString(), addresses[i].ToString());
}
},
delegate(Exception e) {
Console.WriteLine(e.Message);
}));
Console.Write("Enter host: ");
string hostname = Console.ReadLine();
Dns.BeginGetHostAddresses(hostname, delegate(IAsyncResult iar) {
try {
result.Post(Dns.EndGetHostAddresses(iar));
} catch (Exception e) {
result.Post(e);
}
}, null);
Console.ReadLine();
}
}
}
First, we define a PortSet with two types; the first an array of IPAddresses that represents the successful outcome, the second an exception which represents failure. Next, we create and activate our Choice using the more concise syntax available (no explicit Receives).
In the bottom half of the program, we issue the call via the Dns class. Note how I've used an anonymous delegate to implement the operation's callback, posting to the result portset in both the success and failure case. This is a very common implementation pattern when wrapping APM methods with CCR ports.
CCR 101 - Part 4: Arbitrations (Receivers)
So far we've seen how to queue simple tasks to a thread-pool and how to post messages to ports. In this tutorial, I'll introduce arbitrations and an important type in the CCR library, the Arbiter class.
In the previous tutorial, I demonstrated how to post and retrieve messages to ports. Most of the time however, we won't explicitly dequeue messages, rather we'll define the code that should run when a message arrives on a port. The CCR then takes care of dequeueing the item for us.
Receivers
Receivers are the simplest form of message handler. In fact, they're so simple they aren't really an arbitration at all. They simply process messages as they get dequeued from the port. Let's look at an example:
class Sample0401 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<string> port = new Port<string>();
Arbiter.Activate(dq, Arbiter.Receive(false, port, delegate(string item) {
Console.WriteLine(item);
}));
port.Post("Hello World.");
Console.ReadLine();
}
}
}
There's quite a lot of new stuff going on here, mostly on the Arbiter.Activate line, but let's go through the code:
- First, we create our dispatcher queue. This is where our message-handling code is going to be scheduled. We used the default constructor, so this queue will schedule items across the CLR thread-pool.
- Next, we create a single port of strings.
- Then, we set-up the handler to process messages on that port. You'll see two calls to the static Arbiter class here, one to Receive which simply creates a Receiver for you, and one to Activate which is where the task is actually activated. Without this activation step, the handler will never run.
- We then post a message onto the port.
Once activated, the Receiver is triggered on the presence of an item in the port. It removes the item and schedules the handler to run against it. Simple ;-)
One important concept to understand is that it doesn't matter if the arbitration is activated before or after the message has been posted. For example, the following example (which swaps the Arbiter.Activate and port.Post lines) produces exactly the same result.
class Sample0402 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<string> port = new Port<string>();
port.Post("Hello World.");
Arbiter.Activate(dq, Arbiter.Receive(false, port, delegate(string item) {
Console.WriteLine(item);
}));
Console.ReadLine();
}
}
}
Another item worth discussing is the first parameter to the Arbiter.Receive call. This is a flag that indicates whether the handler will be persistent. A persistent handler will process all messages on the port, whereas a transient handler processes just one message. We can illustrate this with the following example:
class Sample0403 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<string> port = new Port<string>();
port.Post("Hello World.");
port.Post("Goodbye World.");
Arbiter.Activate(dq, Arbiter.Receive(false, port, delegate(string item) {
Console.WriteLine(item);
}));
Console.ReadLine();
}
}
}
The second message, 'Goodbye World.' never appears on the console. If we change the persistent parameter to true...
class Sample0404 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<string> port = new Port<string>();
port.Post("Hello World.");
port.Post("Goodbye World.");
Arbiter.Activate(dq, Arbiter.Receive(true, port, delegate(string item) {
Console.WriteLine(item);
}));
Console.ReadLine();
}
}
}
... then it does.
Message Ordering
It's possible that the order in which the messages appear in the console in the example above is different to the order in which they were posted. What's going on?
Well, the CCR is dequeueing the items in order, but once the handlers are scheduled (not executed), it'll just go back and fetch the next. If multiple items are scheduled either across the CLR thread-pool or a custom thread-pool, then we are really at the mercy of the underlying thread-scheduling mechanism. We will see in a later tutorial how we can control the ordering of messages.
Predicates
We can perform certain filtering of messages via predicates. The following sample handles odd and even numbers differently on a port of integers.
class Sample0405 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<int> port = new Port<int>();
Arbiter.Activate(dq, Arbiter.Receive(true, port, delegate(int item) {
Console.WriteLine("Odd: " + item.ToString());
},
delegate(int item) {
return item % 2 == 1;
}));
Arbiter.Activate(dq, Arbiter.Receive(true, port, delegate(int item) {
Console.WriteLine("Even: " + item.ToString());
},
delegate(int item) {
return item % 2 == 0;
}));
for (int i = 0; i < 10; ++i)
port.Post(i);
Console.ReadLine();
}
}
}
We've activated 2 receivers here, and passed a predicate to each (again as anonymous delegates).
Arbiter.Receive
The Arbiter static class supports a number of methods that each return an arbitration. These are really convenience methods - for example, the first sample in this post can actually be written without the Receive call, by constructing the Receiver directly:
class Sample0406 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<string> port = new Port<string>();
Arbiter.Activate(dq, new Receiver<string>(false, port, delegate(string item) {
return true;
},
new Task<string>(delegate(string item) {
Console.WriteLine(item);
})));
port.Post("Hello World.");
Console.ReadLine();
}
}
}
This is more verbose, but it does demonstrate what is actually going on. I think it also conveys the two-stage arbitration process better, i.e. creation then activation, but perhaps that's really because Arbiter.Receive would have been better named Arbiter.CreateReceive.
Batching Messages
Sometimes you want to process messages in batches. The CCR allows you to do this in a number of ways; let's have a look at the simplest. The following example processes batches of 10 numbers at a time:
class Sample0407 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<int> port = new Port<int>();
Arbiter.Activate(dq, Arbiter.MultipleItemReceive(true, port, 10, delegate(int[] ints) {
StringBuilder sb = new StringBuilder();
foreach (int item in ints)
sb.Append(item.ToString()).Append(" ");
Console.WriteLine(sb.ToString());
}));
for (int i = 0; i < 101; ++i)
port.Post(i);
Console.ReadLine();
Console.WriteLine(port.ItemCount.ToString());
Console.ReadLine();
}
}
}
Once this sample has dumped its numbers, you can hit Enter and it will output the number of items still in the port. The above sample posted 101 items, then processed 10 at a time (10 times), leaving 1 in the port. This is an example of an atomic receive. If the batch can't be completed, i.e. there are fewer than 10 items in the port, then the items will not logically be removed from the port. There are other types of atomic arbitrations which will be covered in a future tutorial.
Sometimes you want to batch messages across ports. This is commonly used in scatter/gather operations. Consider this scenario:
- Your application needs to issue n operations.
- Any or all these operations might fail.
- Each operation can run independently of the others.
- Once all the operations are complete, process the results.
The following sample simulates this through another variant of MultipleItemReceive.
class Sample0408 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Console.Write("Enter number of operations: ");
int numOps = Int32.Parse(Console.ReadLine());
SuccessFailurePort results = new SuccessFailurePort();
Arbiter.Activate(dq, Arbiter.MultipleItemReceive(results, numOps,
delegate(ICollection<SuccessResult> successes, ICollection<Exception> failures) {
Console.WriteLine("{0} successes, {1} failures.", successes.Count, failures.Count);
}));
// Simulate async operations with 10% chance of failure.
Random r = new Random();
for (int i = 0; i < numOps; ++i) {
Arbiter.Activate(dq, Arbiter.FromHandler(delegate() {
if (r.Next(10) < 9)
results.Post(new SuccessResult());
else
results.Post(new Exception());
}));
}
Console.ReadLine();
}
}
}
We create a SuccessFailurePort portset (discussed previously), and then activate a MultipleItemReceive arbitration specifying that when numOps messages have been received, regardless of which sub-port they were posted on, the handler should fire.
[The Arbiter.FromHandler call is a helper to schedule a delegate directly onto the dispatcher queue].
Wrap-up
That covers quite a lot about receivers. I'll leave the rest for future posts. In the next tutorial I'd like to cover our first 'proper' arbitration, Choice.