My Introduction to Erlang
I've just finished reading Joe Armstrong's PhD thesis entitled 'Making reliable distributed systems in the presence of software errors' (this is becoming a bit of a habit).
Anyway, I found it very clear and well presented. Although I don't know much about Erlang or switching systems, what Joe has to say about distributed software architectures has much in common with Decentralized Software Services (DSS). To paraphrase Joe's summary of the characteristics of Concurrency-Oriented Programming:
- Services are isolated from one another. A fault in one should not adversely affect another.
- Each service has an identifier, and is addressable by it.
- There is no shared state between services.
- Services interact via asynchronous messaging, which must be assumed to be unreliable.
- Services are units of concurrency. They run independently of one another.
CCR 101 - Part 8: Dispatchers
I've not covered the Dispatcher class before in any detail, and in all the examples so far, I've only used DispatcherQueues via their default constructor which causes all tasks to be scheduled over the CLR thread-pool. The CCR Dispatcher class provides a means of scheduling tasks over your own custom thread-pool.
Being in control of the thread-pool creation gives you some control over the properties of the threads themselves (albeit as a homogeneous block - you can't set these properties on individual threads). These include:
- The name of the threads. This is a useful diagnostic property.
- Whether the threads will run in the background. Background threads will not keep the application active, should all the other foreground threads exit.
- The priority the threads will run at.
- The COM apartment state of the threads. This is useful for interop.
First though, let's adapt our first sample for use with a standard dispatcher:
class Sample0801 {
static void Main(string[] args) {
using (Dispatcher dispatcher = new Dispatcher()) {
using (DispatcherQueue dq = new DispatcherQueue("Default", dispatcher)) {
dq.Enqueue(new Task(delegate() {
Console.WriteLine("Hello world.");
}));
}
Console.ReadLine();
}
}
}
Straightforward enough. First we create our dispatcher using the default constructor. This will create a fixed number of threads (more on this later). All threads will run in the foreground at normal priority, their apartment state will not be set and their names will be empty strings. Notice how we dispose of the dispatcher when we are finished with it. This ensures that all the threads are properly destroyed.
So why use a dispatcher at all? Why not just use a default-constructed DispatcherQueue and schedule items over the CLR thread-pool? Well, you might want to control some aspects of the thread-pool via the properties discussed above. You might want to isolate your CCR task-execution threads from 3rd-party libraries that use the CLR thread-pool (or even from other CCR tasks, via two or more dispatchers).
Another important reason is that the execution policies which govern how task-execution is constrained only work with custom dispatcher pools. We'll have a look at an example of this shortly.
You might even need the extra performance. Some of the flexibility of the CLR thread-pool - such as its dynamic sizing capabilities - comes at a small performance cost. Dispatcher-based thread-pools have a fixed number of threads and, because there is less management of the threads, they have been observed to be more performant in some situations. Your mileage may vary.
So how many threads in a dispatcher-pool? Well, the default constructor will create one thread per CPU on a multi-processor box, and two threads on a uniprocessor box. You can use one of the constructor overloads to specify an exact number of threads, or you can set the static Dispatcher.ThreadsPerCpu property prior to using the default constructor. And even with the overloads you can always pass 0 for the number of threads and let the CCR work it out for you.
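A quick sketch of those sizing options, based on the constructor shapes used elsewhere in this series (illustrative only - the thread names here are just made up for the example):

```csharp
// Exact thread count via a constructor overload:
using (Dispatcher four = new Dispatcher(4, "FourThreads")) { /* ... */ }

// Influence the default constructor via the static property:
Dispatcher.ThreadsPerCpu = 2;
using (Dispatcher sized = new Dispatcher()) { /* ... */ }

// Pass 0 for the thread count and let the CCR decide:
using (Dispatcher autoSized = new Dispatcher(0, "AutoSized")) { /* ... */ }
```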
COM Interop
Whilst it might be preferable to have all your application in managed code, there's a good chance that you still have COM components that form part of your existing application and which you'll need to interop with in the short term. In principle, if your components are 'Both' or 'Free' threaded, then you don't have to do anything - they'll play happily in the same threads as your tasks without creating any COM proxies, and should be addressable without said proxies from those threads.
If, however, those components are marked as 'Apartment' then you need to tread carefully (as always). If you create these components on a vanilla dispatcher thread, you'll get a proxy. When you call through this proxy, you'll incur the marshaling cost, but more importantly, the calling thread will be forced to sleep whilst the recipient thread processes the call. What's worse is that in the absence of an appropriate STA thread, the COM runtime will spin one up for you and put the COM object in that thread. In fact, it will spin up only one and put all STA instances on that thread, forming another serialization bottleneck.
What you do about this depends on how you use the COM object. If you can get away with a create-use-destroy model, then you can create n STA threads under a dispatcher, register tasks using that model and just schedule receives to do the work. The following (albeit contrived) example creates 2 STA threads and then posts a receive which creates an instance of the Windows Media Player, calls a method on it and then releases it.
class Sample0802 {
static void Main(string[] args) {
using (Dispatcher dispatcher = new Dispatcher(
2, ThreadPriority.Normal, DispatcherOptions.None, ApartmentState.STA, "STAPool")) {
using (DispatcherQueue dq = new DispatcherQueue("Default", dispatcher)) {
Port<EmptyValue> signalPort = new Port<EmptyValue>();
Arbiter.Activate(dq, Arbiter.Receive(true, signalPort, delegate(EmptyValue ev) {
IWMPPlayer player = new WindowsMediaPlayerClass();
Console.WriteLine(player.versionInfo);
Marshal.ReleaseComObject(player);
}));
string line = Console.ReadLine();
while (line != "quit") {
signalPort.Post(EmptyValue.SharedInstance);
line = Console.ReadLine();
}
}
}
}
}
If the COM object is long-lived, then you need a different strategy, which basically involves creating single-threaded STA dispatchers and the judicious use of IterativeTasks, so I shall defer that sample until I've covered iterators properly.
Task Execution Policies
The following example shows how task execution policies can be used to constrain scheduling. It generates an int every second, but the receiver takes two seconds to process each one. We use a scheduling policy that constrains the queue depth to 1, which effectively means that a newly-arrived item will replace an unprocessed one. If the receiver can't keep up, it only has to process the latest item.
class Sample0803 {
static void Main(string[] args) {
using (Dispatcher dispatcher = new Dispatcher(1, "PolicyDemo")) {
using (DispatcherQueue dq = new DispatcherQueue(
"Default", dispatcher, TaskExecutionPolicy.ConstrainQueueDepthDiscardTasks, 1)) {
Port<int> intPort = new Port<int>();
Arbiter.Activate(dq, Arbiter.Receive(true, intPort, delegate(int i) {
Console.WriteLine(i.ToString());
Thread.Sleep(TimeSpan.FromSeconds(2));
}));
for (int i = 0; i < 20; ++i) {
intPort.Post(i);
Thread.Sleep(TimeSpan.FromSeconds(1));
}
Console.ReadLine();
}
}
}
}
You might have noticed that I've created a single-threaded Dispatcher here. That's really to force the demo to work. If I use a default dispatcher on the machine I'm currently sitting at, which has a single hyper-threaded CPU, I get two threads, and two threads are enough to process all the items in turn. Even if one thread blocks for two seconds, the other thread is free to process the next item.
That's all I want to cover about Dispatchers for now. I'll come back to them soon when I demonstrate a strategy for dealing with long-running STA objects, but the next topic will be about Iterators or IterativeTasks, arguably one of the most powerful features of the CCR.
CCR 101 - Part 7: Interleave
The Interleave is essentially an arbitration with reader-writer scheduling semantics. It achieves this by splitting the Receiver tasks it arbitrates into 3 categories:
- Concurrent receivers. Any receivers in this group can be scheduled concurrently with any other receivers from this group, providing no exclusive receivers (see below) are active or pending.
- Exclusive receivers. A receiver from this group will only be scheduled when there are no other exclusive or concurrent receivers active.
- Teardown receivers. A receiver in this group also runs exclusively, but causes the Interleave to 'retire'. Once a receiver in this group has been scheduled, all other receivers are abandoned and will not be scheduled.
Note that none of these categories need have any receivers at all.
There are a few more interesting things about Interleaves:
- Exclusive receivers have their order retained. Remember how we saw that a persistent Receive (without an outer arbitration to control the scheduling) would schedule tasks as fast as items arrived, leading to out-of-order processing across multiple threads? Well, by placing that Receiver within the set of exclusive Receivers, the processing will remain in order. This is because the Interleave is concerned with the completion of the Receiver task, not just its scheduling.
- Interleaves may be combined. You can 'merge' one interleave with another. Each group (concurrent, exclusive, teardown) is merged and behaves as if you had declared the single more complex interleave.
- Interleaves still offer opportunities for deadlock and I'll cover these in the post that deals with IterativeTasks, because the use of the latter within the former is where they can occur.
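As a hedged sketch of the first of these points (assuming a DispatcherQueue named dq is in scope), a persistent receive placed in the exclusive group processes its items one at a time, in arrival order, even over a multi-threaded pool:

```csharp
Port<int> ordered = new Port<int>();
Arbiter.Activate(dq, new Interleave(
    new TeardownReceiverGroup(),
    new ExclusiveReceiverGroup(Arbiter.Receive(true, ordered, delegate(int i) {
        // Each handler completes before the next item is scheduled,
        // so the items come out in the order they were posted.
        Console.WriteLine(i.ToString());
    })),
    new ConcurrentReceiverGroup()));
for (int i = 0; i < 10; ++i)
    ordered.Post(i);
```

Contrast this with the bare persistent Receive from earlier posts, where the handlers could run concurrently and the output order was at the mercy of the thread scheduler.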
Let's look at an example. This one is more complicated than the others we've seen so far and I've annotated it with a few comments to help illustrate the salient points. We use the Interleave to protect a counter that increments every second, and allow the client to query the current value.
class Sample0701 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
ManualResetEvent done = new ManualResetEvent(false);
int counter = 0;
// Signal port - indicates the counter should be incremented
Port<EmptyValue> signalPort = new Port<EmptyValue>();
// Query port - indicates that the counter should be read
Port<EmptyValue> queryPort = new Port<EmptyValue>();
// Quit port - indicates that the interleave should exit
Port<EmptyValue> quitPort = new Port<EmptyValue>();
// Declare the interleave
Interleave il = new Interleave(
new TeardownReceiverGroup(Arbiter.Receive(false, quitPort, delegate {
Console.WriteLine("Finished.");
done.Set();
})),
new ExclusiveReceiverGroup(Arbiter.Receive(true, signalPort, delegate {
++counter;
})),
new ConcurrentReceiverGroup(Arbiter.Receive(true, queryPort, delegate {
Console.WriteLine(counter.ToString());
})));
// Then activate it
Arbiter.Activate(dq, il);
// Now start a timer that will fire every second, then read the console. An empty
// line causes the program to quit.
using (Timer t = new Timer(delegate { signalPort.Post(EmptyValue.SharedInstance); })) {
t.Change(0, 1000);
string line = Console.ReadLine();
while (line.Length > 0) {
if (line == "q")
queryPort.Post(EmptyValue.SharedInstance);
line = Console.ReadLine();
}
quitPort.Post(EmptyValue.SharedInstance);
}
done.WaitOne();
Console.ReadLine();
}
}
}
We create three ports. One carries a request to increment the counter, one to query the counter and one to shutdown. We create our Interleave arbitration over these three ports, then activate it. Then we activate a timer which fires every second, posting a message to the appropriate port when it does so. Then we sit in a loop, reading the console.
Now consider what happens if we want to support an extra message that resets the counter. This requires a new port (in this example at least) and an additional exclusive receiver. I've altered the above sample to show how to do this.
class Sample0702 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
ManualResetEvent done = new ManualResetEvent(false);
int counter = 0;
// Signal port - indicates the counter should be incremented
Port<EmptyValue> signalPort = new Port<EmptyValue>();
// Reset port
Port<EmptyValue> resetPort = new Port<EmptyValue>();
// Query port - indicates that the counter should be read
Port<EmptyValue> queryPort = new Port<EmptyValue>();
// Quit port - indicates that the interleave should exit
Port<EmptyValue> quitPort = new Port<EmptyValue>();
// Declare the interleave
Interleave il = new Interleave(
new TeardownReceiverGroup(Arbiter.Receive(false, quitPort, delegate {
Console.WriteLine("Finished.");
done.Set();
})),
new ExclusiveReceiverGroup(
Arbiter.Receive(true, signalPort, delegate {
++counter;
}),
Arbiter.Receive(true, resetPort, delegate {
counter = 0;
})),
new ConcurrentReceiverGroup(Arbiter.Receive(true, queryPort, delegate {
Console.WriteLine(counter.ToString());
})));
// Then activate it
Arbiter.Activate(dq, il);
// Now start a timer that will fire every second, then read the console. An empty
// line causes the program to quit.
using (Timer t = new Timer(delegate { signalPort.Post(EmptyValue.SharedInstance); })) {
t.Change(0, 1000);
string line = Console.ReadLine();
while (line.Length > 0) {
if (line == "q")
queryPort.Post(EmptyValue.SharedInstance);
else if (line == "r")
resetPort.Post(EmptyValue.SharedInstance);
line = Console.ReadLine();
}
quitPort.Post(EmptyValue.SharedInstance);
}
done.WaitOne();
Console.ReadLine();
}
}
}
Note that I could have combined two Interleaves to the same effect. The following snippet illustrates how I might have done that:
// Declare the interleave
Interleave il = new Interleave(
new TeardownReceiverGroup(Arbiter.Receive(false, quitPort, delegate {
Console.WriteLine("Finished.");
done.Set();
})),
new ExclusiveReceiverGroup(Arbiter.Receive(true, signalPort, delegate {
++counter;
})),
new ConcurrentReceiverGroup(Arbiter.Receive(true, queryPort, delegate {
Console.WriteLine(counter.ToString());
})));
// Declare the second interleave
Interleave il2 = new Interleave(
new TeardownReceiverGroup(),
new ExclusiveReceiverGroup(Arbiter.Receive(true, resetPort, delegate {
counter = 0;
})),
new ConcurrentReceiverGroup());
// Combine them
il.CombineWith(il2);
// Then activate it
Arbiter.Activate(dq, il);
In my next post, I'll cover Dispatchers, custom thread-pools and COM apartment interoperability.
CCR 101 - Part 6: Arbitrations (Join)
Actually, we've covered Joins already. I just forgot to mention it. The MultipleItemReceives we covered in Part 4 are joins, i.e. they wait for all the ports to produce n messages. All Joins (like their name suggests) are just rendezvous points for a set of messages and are an important construct - without them, we wouldn't easily be able to gather the results of a parallelized operation.
Whilst the most useful arbiters are typically those that allow you to deal with a concurrency problem whose size isn't known until runtime, there exists a join across two ports that is explicitly provided for by the Arbiter class (not unlike Choice in fact). The following sample takes a number, n, and a string from the console and prints that string n times.
class Sample0601 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<int> intPort = new Port<int>();
Port<string> stringPort = new Port<string>();
Arbiter.Activate(dq, Arbiter.JoinedReceive(false, intPort, stringPort,
delegate(int n, string s) {
for (int i = 0; i < n; ++i) {
Console.WriteLine(s);
}
}));
Console.Write("Enter a number: ");
intPort.Post(Int32.Parse(Console.ReadLine()));
Console.Write("Enter a phrase: ");
stringPort.Post(Console.ReadLine());
Console.ReadLine();
}
}
}
In the next post, I'll introduce the final arbitration, Interleave. Conceptually, it's a message-passing version of the reader-writer lock, scheduling readers and writers in a way that balances concurrency with correctness, albeit in an asynchronous fashion. Nearly every resource with some modifiable state can be protected by an Interleave and indeed it forms a core component of every service in the DSS.
Once the section on Arbitrations is complete, I want to cover the following (in order).
- Dispatchers. So far we've only really looked at DispatcherQueues running across the CLR thread-pool. By using our own Dispatchers we can control the number of threads and various properties of those threads (such as COM Apartment type).
- Iterative Tasks. Hugely important to the CCR, IterativeTasks exploit C# 2.0 iterator functionality to allow logical sequences of asynchronous methods to be written without ever blocking a physical thread.
- Causalities. A cross-thread extension of exceptions, they allow errors raised in some scheduled task, to propagate through their causalities, back to the instigator of the operation.
Longer term, I'll be posting some useful samples of code, illustrating usage with asynchronous sockets, asynchronous database interaction, interoperability with COM and UI threads and how the CCR can power your WCF-enabled server (or async ASP.NET web pages). If you have any specific requests, please leave them in the comments section.
Web Buffoon
I notice that this blog doesn't render properly on IE6. I have figured out a fix of sorts (something to do with the text in a fixed-length DIV overflowing) with some help, but need to make sure it doesn't break the experience for IE7 and Firefox users, before I apply it. As my CSS skills fall somewhere short of useless, this may take a couple of days.
I've also noticed that the code is considerably less legible on Firefox than on IE. I'll try and fix that too. Thank you for your patience.
CCR 101 - Part 5: Arbitrations (Choice)
Our first 'proper' arbitration is Choice. It takes one or more (normally at least two to be useful) Receiver tasks and schedules the first eligible one. The others will never be scheduled. An example will suffice:
class Sample0501 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<int> intPort = new Port<int>();
Port<string> stringPort = new Port<string>();
Port<EmptyValue> emptyPort = new Port<EmptyValue>();
Arbiter.Activate(dq, Arbiter.Choice(
Arbiter.Receive(false, intPort, delegate(int i) {
Console.WriteLine("{0} is an int.", i.ToString());
}),
Arbiter.Receive(false, stringPort, delegate(string s) {
Console.WriteLine("{0} is a string.", s);
}),
Arbiter.Receive(false, emptyPort, delegate(EmptyValue e) {
Console.WriteLine("An empty line.");
})));
Console.Write("Enter a value: ");
string input = Console.ReadLine();
if (input.Length == 0) {
emptyPort.Post(EmptyValue.SharedInstance);
} else {
int intInput;
if (Int32.TryParse(input, out intInput))
intPort.Post(intInput);
else
stringPort.Post(input);
}
Console.ReadLine();
}
}
}
Only the first half of this program is of any interest. First, we create 3 ports. The last port is of type EmptyValue, a CCR type that can be useful for indicating that something happened even if there is no data associated with the message. Then, we create and activate our Choice arbitration. In the above example, we pass three Receiver tasks to the Arbiter.Choice call.
When a message is posted to any one of the ports, the corresponding receiver task becomes eligible for scheduling. However, it is the Choice arbitration that determines whether it will actually be scheduled. The Choice operates a simple first-come, first-served policy. As soon as a single receiver is scheduled, all the others in the Choice are doomed. This can be demonstrated by modifying the example so that each port is posted to at the end anyway. You'll see that the receivers don't fire.
class Sample0502 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<int> intPort = new Port<int>();
Port<string> stringPort = new Port<string>();
Port<EmptyValue> emptyPort = new Port<EmptyValue>();
Arbiter.Activate(dq, Arbiter.Choice(
Arbiter.Receive(false, intPort, delegate(int i) {
Console.WriteLine("{0} is an int.", i.ToString());
}),
Arbiter.Receive(false, stringPort, delegate(string s) {
Console.WriteLine("{0} is a string.", s);
}),
Arbiter.Receive(false, emptyPort, delegate(EmptyValue e) {
Console.WriteLine("An empty line.");
})));
Console.Write("Enter a value: ");
string input = Console.ReadLine();
if (input.Length == 0) {
emptyPort.Post(EmptyValue.SharedInstance);
} else {
int intInput;
if (Int32.TryParse(input, out intInput))
intPort.Post(intInput);
else
stringPort.Post(input);
}
intPort.Post(0);
stringPort.Post("Hello World.");
emptyPort.Post(EmptyValue.SharedInstance);
Console.ReadLine();
}
}
}
Note that none of the Receivers is persistent (the first parameter is false in each Arbiter.Receive call). Choice requires this and will throw a runtime exception if you try to pass it a persistent Receive. The reason is simple: Choice picks one from n Receives and then ignores the rest, so there's no point in any of them being persistent.
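If you want to see that check fire, a contrived sketch along these lines should do it (assuming a DispatcherQueue named dq is in scope; I wouldn't rely on the exact exception type):

```csharp
Port<int> intPort = new Port<int>();
Port<string> stringPort = new Port<string>();
try {
    Arbiter.Activate(dq, Arbiter.Choice(
        Arbiter.Receive(true, intPort, delegate(int i) { }),        // persistent - not allowed
        Arbiter.Receive(false, stringPort, delegate(string s) { })));
} catch (Exception e) {
    Console.WriteLine(e.Message);   // Choice rejects the persistent receive
}
```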
Choice Arbitrations for determining success/failure
Perhaps the most common use of Choice is determining whether some asynchronous operation succeeded or failed. In fact, it's so common, the Arbiter class has a particular overload of the Choice method that takes a 2-ary PortSet and has a more direct syntax for declaring the handlers for either outcome. Look at the example below. It attempts to resolve a hostname to an IP address via an asynchronous operation.
class Sample0503 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
PortSet<IPAddress[], Exception> result = new PortSet<IPAddress[], Exception>();
Arbiter.Activate(dq, Arbiter.Choice(result,
delegate(IPAddress[] addresses) {
for (int i = 0; i < addresses.Length; ++i) {
Console.WriteLine("{0}. {1}", i.ToString(), addresses[i].ToString());
}
},
delegate(Exception e) {
Console.WriteLine(e.Message);
}));
Console.Write("Enter host: ");
string hostname = Console.ReadLine();
Dns.BeginGetHostAddresses(hostname, delegate(IAsyncResult iar) {
try {
result.Post(Dns.EndGetHostAddresses(iar));
} catch (Exception e) {
result.Post(e);
}
}, null);
Console.ReadLine();
}
}
}
First, we define a PortSet with two types; the first an array of IPAddresses that represents the successful outcome, the second an exception which represents failure. Next, we create and activate our Choice using the more concise syntax available (no explicit Receives).
In the bottom half of the program, we issue the call via the Dns class. Note how I've used an anonymous delegate to implement the operation's callback, posting to the result portset in both the success and failure case. This is a very common implementation pattern when wrapping APM methods with CCR ports.
CCR 101 - Part 4: Arbitrations (Receivers)
So far we've seen how to queue simple tasks to a thread-pool and how to post messages to ports. In this tutorial, I'll introduce arbitrations and an important type in the CCR library, the Arbiter class.
In the previous tutorial, I demonstrated how to post messages to ports and retrieve them. Most of the time, however, we won't explicitly dequeue messages; rather, we'll define the code that should run when a message arrives on a port. The CCR then takes care of dequeueing the item for us.
Receivers
Receivers are the simplest form of message handler. In fact, they're so simple they aren't really an arbitration at all. They simply process messages as they get dequeued from the port. Let's look at an example:
class Sample0401 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<string> port = new Port<string>();
Arbiter.Activate(dq, Arbiter.Receive(false, port, delegate(string item) {
Console.WriteLine(item);
}));
port.Post("Hello World.");
Console.ReadLine();
}
}
}
There's quite a lot of new stuff going on here, mostly on the Arbiter.Activate line, but let's go through the code:
- First, we create our dispatcher queue. This is where our message-handling code is going to be scheduled. We used the default constructor, so this queue will schedule items across the CLR thread-pool.
- Next, we create a single port of strings.
- Then, we set-up the handler to process messages on that port. You'll see two calls to the static Arbiter class here, one to Receive which simply creates a Receiver for you, and one to Activate which is where the task is actually activated. Without this activation step, the handler will never run.
- We then post a message onto the port.
Once activated, the Receiver is triggered on the presence of an item in the port. It removes the item and schedules the handler to run against it. Simple ;-)
One important concept to understand is that it doesn't matter if the arbitration is activated before or after the message has been posted. For example, the following example (which swaps the Arbiter.Activate and port.Post lines) produces exactly the same result.
class Sample0402 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<string> port = new Port<string>();
port.Post("Hello World.");
Arbiter.Activate(dq, Arbiter.Receive(false, port, delegate(string item) {
Console.WriteLine(item);
}));
Console.ReadLine();
}
}
}
Another item worth discussing is the first parameter to the Arbiter.Receive call. This is a flag that indicates whether the handler will be persistent. A persistent handler will process all messages on the port, whereas a transient handler processes just one message. We can illustrate this with the following example:
class Sample0403 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<string> port = new Port<string>();
port.Post("Hello World.");
port.Post("Goodbye World.");
Arbiter.Activate(dq, Arbiter.Receive(false, port, delegate(string item) {
Console.WriteLine(item);
}));
Console.ReadLine();
}
}
}
The second message, 'Goodbye World.' never appears on the console. If we change the persistent parameter to true...
class Sample0404 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<string> port = new Port<string>();
port.Post("Hello World.");
port.Post("Goodbye World.");
Arbiter.Activate(dq, Arbiter.Receive(true, port, delegate(string item) {
Console.WriteLine(item);
}));
Console.ReadLine();
}
}
}
... then it does.
Message Ordering
It's possible that the order in which the messages appear in the console in the example above is different to the order in which they were posted. What's going on?
Well, the CCR is dequeueing the items in order, but once the handlers are scheduled (not executed), it'll just go back and fetch the next. If multiple items are scheduled either across the CLR thread-pool or a custom thread-pool, then we are really at the mercy of the underlying thread-scheduling mechanism. We will see in a later tutorial how we can control the ordering of messages.
Predicates
We can perform certain filtering of messages via predicates. The following sample handles odd and even numbers differently on a port of integers.
class Sample0405 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<int> port = new Port<int>();
Arbiter.Activate(dq, Arbiter.Receive(true, port, delegate(int item) {
Console.WriteLine("Odd: " + item.ToString());
},
delegate(int item) {
return item % 2 == 1;
}));
Arbiter.Activate(dq, Arbiter.Receive(true, port, delegate(int item) {
Console.WriteLine("Even: " + item.ToString());
},
delegate(int item) {
return item % 2 == 0;
}));
for (int i = 0; i < 10; ++i)
port.Post(i);
Console.ReadLine();
}
}
}
We've activated 2 receivers here, and passed a predicate to each (again as anonymous delegates).
Arbiter.Receive
The Arbiter static class supports a number of methods that each return an arbitration. These are really convenience methods. For example, the first sample in this post can actually be written without the Receive call, by constructing the Receive arbitration directly:
class Sample0406 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<string> port = new Port<string>();
Arbiter.Activate(dq, new Receiver<string>(false, port, delegate(string item) {
return true;
},
new Task<string>(delegate(string item) {
Console.WriteLine(item);
})));
port.Post("Hello World.");
Console.ReadLine();
}
}
}
This is more verbose, but it does demonstrate what is actually going on. I think it also conveys better the two-stage arbitration process i.e. creation then activation, but perhaps that's really because Arbiter.Receive would have been better named Arbiter.CreateReceive.
Batching Messages
Sometimes you want to process messages in batches. The CCR allows you to do this in a number of ways. Let's have a look at the simplest. The following example processes batches of 10 numbers at a time:
class Sample0407 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<int> port = new Port<int>();
Arbiter.Activate(dq, Arbiter.MultipleItemReceive(true, port, 10, delegate(int[] ints) {
StringBuilder sb = new StringBuilder();
foreach (int item in ints)
sb.Append(item.ToString()).Append(" ");
Console.WriteLine(sb.ToString());
}));
for (int i = 0; i < 101; ++i)
port.Post(i);
Console.ReadLine();
Console.WriteLine(port.ItemCount.ToString());
Console.ReadLine();
}
}
}
Once this sample has dumped its numbers, you can hit Enter and it will output the number of items still in the port. The above sample posted 101 items, then processed 10 at a time (10 times), leaving 1 in the port. This is an example of an atomic receive: if the batch can't be completed, i.e. there are fewer than 10 items in the queue, then the items will not logically be removed from the queue. There are other types of atomic arbitrations which will be covered in a future tutorial.
Sometimes you want to batch messages across ports. This is commonly used in scatter/gather operations. Consider this scenario:
- Your application needs to issue n operations.
- Any or all these operations might fail.
- Each operation can run independently of the others.
- Once all the operations are complete, process the results.
The following sample simulates this through another variant of MultipleItemReceive.
class Sample0408 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Console.Write("Enter number of operations: ");
int numOps = Int32.Parse(Console.ReadLine());
SuccessFailurePort results = new SuccessFailurePort();
Arbiter.Activate(dq, Arbiter.MultipleItemReceive(results, numOps,
delegate(ICollection<SuccessResult> successes, ICollection<Exception> failures) {
Console.WriteLine("{0} successes, {1} failures.", successes.Count, failures.Count);
}));
// Simulate async operations with 10% chance of failure.
Random r = new Random();
for (int i = 0; i < numOps; ++i) {
Arbiter.Activate(dq, Arbiter.FromHandler(delegate() {
if (r.Next(10) < 9)
results.Post(new SuccessResult());
else
results.Post(new Exception());
}));
}
Console.ReadLine();
}
}
}
We create a SuccessFailurePort portset (discussed previously), then activate a MultipleItemReceive arbitration that fires the handler once numOps messages have been received, regardless of which sub-port they were posted on.
[The Arbiter.FromHandler call is a helper to schedule a delegate directly onto the dispatcher queue].
Wrap-up
That covers quite a lot about receivers. I'll leave the rest for future posts. In the next tutorial I'd like to cover our first proper simple arbitration, Choice.
CCR 101 - Part 3: Ports
In a message-passing system there must exist some mechanism over which to pass messages. In CCR, ports are that mechanism. They are essentially simple FIFO queues. For your programming convenience, they are declared as the generic type, Port<T>, where T defines the type that can travel on that port.
The following sample puts 10 strings on a port and then dequeues them and writes them to the console:
class Sample0301 {
static void Main(string[] args) {
Port<string> port = new Port<string>();
// Enqueue the items
for (int i = 0; i < 10; ++i)
port.Post(i.ToString());
// Dequeue the items
string item;
while (port.Test(out item))
Console.WriteLine(item);
Console.ReadLine();
}
}
As illustrated, we use the Post method to enqueue items, and the Test method to dequeue them. If the port is empty, Test returns false and sets the output parameter to its default value.
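A minimal sketch of that behaviour (assuming only Port<T> from Ccr.Core.dll, as in the samples above):

```csharp
using System;
using Microsoft.Ccr.Core;

class TestBehaviour {
    static void Main(string[] args) {
        Port<string> port = new Port<string>();
        string item;
        // Empty port: Test returns false and item gets its default (null).
        Console.WriteLine(port.Test(out item)); // False
        port.Post("hello");
        // One item queued: Test succeeds and dequeues it.
        Console.WriteLine(port.Test(out item)); // True
        Console.WriteLine(item);                // hello
    }
}
```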
PortSets
PortSets are just aggregated ports of different types. They are essentially a programming convenience and they use overloads to simplify posting any of the supported types to the single portset. The following example posts 10 numbers to a portset, evens to an int port and odds to a string port.
class Sample0302 {
static void Main(string[] args) {
PortSet<int, string> portSet = new PortSet<int, string>();
// Enqueue the items
for (int i = 0; i < 10; ++i) {
if (i % 2 == 0)
portSet.Post(i);
else
portSet.Post(i.ToString());
}
}
}
There's no equivalent for reliably reading items out of the PortSet as a whole; instead, you must access the individual ports explicitly. Each sub-port is named P0...PN, depending on the number of types. The following example dumps the contents of each sub-port populated above.
class Sample0303 {
static void Main(string[] args) {
PortSet<int, string> portSet = new PortSet<int, string>();
// Enqueue the items (elided)
// ...
// Output them
int intElem;
while (portSet.P0.Test(out intElem))
Console.WriteLine("int: {0}", intElem.ToString());
string stringElem;
while (portSet.P1.Test(out stringElem))
Console.WriteLine("string: {0}", stringElem);
Console.ReadLine();
}
}
Although the CCR provides generic versions of the PortSet class that can be specialized with up to 19 different types, the most commonly used is probably the 2-type version (shown above). The reason is that two types are often all you need to convey the result of an operation: the success result and the failure result. In fact, this is so common that the CCR defines a form of this type for you:
namespace Microsoft.Ccr.Core {
// Summary:
// Port collection with ports typed for SuccessResult and Exception
//
// Remarks:
// This port type is appropriate as the result port type for generic request/response
// interactions
public class SuccessFailurePort : PortSet<SuccessResult, Exception> {
public SuccessFailurePort();
}
}
Unfortunately, SuccessResult is really just a wrapper around an int, so if the result of your operation carries more than that, you'll need to define your own PortSet explicitly. If your operation returns more than one value in either scenario, you can use the CCR's Tuple<T0, ..., TN> class to wrap them.
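For example, a richer result might warrant a custom portset along these lines (a sketch only; QueryResult, its fields, and the port name are invented for illustration):

```csharp
using System;
using Microsoft.Ccr.Core;

// Hypothetical result type carrying more than SuccessResult's int.
class QueryResult {
    public string[] Rows;
    public TimeSpan Elapsed;
}

// Plays the same role as SuccessFailurePort, but with our own success type.
class QueryResultPort : PortSet<QueryResult, Exception> {
}
```

Handlers registered against such a port then receive a fully-typed QueryResult on the success branch, rather than having to unwrap a SuccessResult.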
Next Steps
So far we've only really covered the very basics of the CCR, and if you're coming to these tutorials completely cold, then you're probably distinctly unimpressed thus far. In the next tutorial, we'll introduce the predominant concept in the CCR, Arbiters.
Conceptually, arbiters stand between ports and messages on one side, and dispatcher queues and tasks on the other. They respond to messages arriving on ports by determining whether some application-specified condition has been met by that arrival. If it has, they schedule a specified task to execute on a specified dispatcher queue. Arbiters cover a variety of basic scenarios and, importantly, are composable: you can build more complex arbitrations out of simpler ones.
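As a taste of what's to come, here's arguably the simplest arbitration, a one-shot receive; Arbiter.Activate and Arbiter.Receive are explained properly in the next tutorial:

```csharp
using System;
using Microsoft.Ccr.Core;

class ArbiterTaste {
    static void Main(string[] args) {
        using (DispatcherQueue dq = new DispatcherQueue()) {
            Port<string> port = new Port<string>();
            // When a single string arrives on the port, run this
            // handler as a task on the dispatcher queue.
            Arbiter.Activate(dq, Arbiter.Receive(false, port,
                delegate(string s) {
                    Console.WriteLine("Received: {0}", s);
                }));
            port.Post("hello");
            Console.ReadLine();
        }
    }
}
```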
CCR 101 - Part 2: DispatcherQueues and Tasks
Let's just get straight into some samples, as a means of introducing some of the CCR types and concepts. To run these samples yourself, you'll need the .NET 2.0 Framework SDK, and the MSRS 1.5 release. Unless otherwise stated, all the samples are just simple console applications that reference Ccr.Core.dll.
Here is the canonical example:
class Sample01 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
dq.Enqueue(new Task(delegate() {
Console.WriteLine("Hello world.");
}));
Console.ReadLine();
}
}
}
Nothing spectacular here, and frankly nothing that couldn't be achieved with ThreadPool.QueueUserWorkItem. It does, however, introduce the DispatcherQueue. Instances of this type are really just queues of delegates waiting to be executed. These delegates are removed in turn from the queue and executed over the available threads (more on that soon), so in the presence of multiple threads they might execute out of order. For example this code, which enqueues 10 items...
class Sample02 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
for (int i = 0; i < 10; ++i) {
int num = i;
dq.Enqueue(new Task(delegate() {
Console.WriteLine("Hello world({0}).", num.ToString());
}));
}
Console.ReadLine();
}
}
}
...produces this result on my machine:
Hello world(0).
Hello world(1).
Hello world(2).
Hello world(3).
Hello world(4).
Hello world(5).
Hello world(6).
Hello world(7).
Hello world(9).
Hello world(8).
Again, this is exactly the same sort of behaviour you see over the CLR thread-pool.
Another type visible in both samples is the Task, which itself implements ITask, an interface that encapsulates the delegate and exposes an Execute method (no prizes for guessing what that does). DispatcherQueues are populated with ITasks, rather than the delegates directly. This indirection allows for more flexibility in terms of the types of tasks that can be queued. For example, there is a generic form of the Task class specialized by a single type, that invokes delegates that take an argument of that type. So we could rewrite Sample02 as:
class Sample03 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
for (int i = 0; i < 10; ++i) {
dq.Enqueue(new Task<int>(i, delegate(int val) {
Console.WriteLine("Hello world({0}).", val.ToString());
}));
}
Console.ReadLine();
}
}
}
There are many different types of tasks available in the CCR and we'll look at some of the more interesting ones in future posts. Often however, the higher-level abstractions of the CCR library do most of the task creation and queuing for us.
Just before we complete our initial look at DispatcherQueues, it's worth discussing two configurable properties that this type supports:
- Scheduling policy. The DispatcherQueues created for the samples so far run with 'unconstrained' policy. This means that there are no logical limits on the number or frequency of tasks that may be enqueued. It is possible to constrain this policy by controlling either the maximum number of items in the queue or the frequency with which they may be added. You can also decide whether a breach of these constraints results in the caller blocking or tasks being dropped.
- The target thread-pool. Creating a DispatcherQueue via the default constructor will cause tasks to be executed over the regular CLR thread-pool. You can however construct a DispatcherQueue with a reference to a custom thread-pool. These thread-pools are controlled by a Dispatcher, a type that is the focus of a later post and provides facilities missing from the CLR thread-pool.
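As a sketch of the first point, a depth-constrained queue is constructed along these lines (the overload and the TaskExecutionPolicy value used here are from memory of the CCR API and worth checking against the reference; the name and depth are arbitrary):

```csharp
using System;
using Microsoft.Ccr.Core;

class ConstrainedQueueSketch {
    static void Main(string[] args) {
        using (Dispatcher dispatcher = new Dispatcher()) {
            // Throttle enqueuing callers once more than 100 tasks are
            // pending, rather than dropping tasks.
            using (DispatcherQueue dq = new DispatcherQueue("Constrained",
                dispatcher,
                TaskExecutionPolicy.ConstrainQueueDepthThrottleExecution,
                100)) {
                dq.Enqueue(new Task(delegate() {
                    Console.WriteLine("Hello from a constrained queue.");
                }));
                Console.ReadLine();
            }
        }
    }
}
```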
And finally, note that the DispatcherQueue is disposable. If you are using DispatcherQueues without reference to a controlling Dispatcher, you don't strictly need to dispose of them, but this is an implementation detail you shouldn't rely on.
CCR 101 - Part 1: Introduction
[This post is intended to provide an introduction to the CCR, complementing other introductory materials online (links at the end)].
So we agree that multi-threaded programming is hard, right? Or, at least, that it gets hard quickly. Sure, if you only have a single task you want to execute in parallel with your main code, you can just spin up a thread, or maybe even use the BackgroundWorker class. But the thing about multi-threading is that things start to get tricky quite quickly:
- Can I easily cancel this task? This suggests some inter-thread communication. It also suggests that there are points in the long-running task where I can check for this pending cancellation.
- How can I detect if the task fails? More inter-thread communication is required. I also need to worry about thread termination in the failure case.
- Actually, how do I detect if the task completes at all? Can it timeout?
- What if I'm writing a server application? I can't really spawn a thread for each request - this is not a model that scales well on Windows.
So even reasoning about this parallelism is complicated by the fact that I pretty much have to do a lot of the basic scaffolding myself.
What about the thread-pool?
Now the thread-pool helps a lot, and combined with C# 2.0 features like anonymous delegates, I can quite succinctly capture simple operations, but I still have the issue of propagating the success/failure of the result back to the original instigator. What's more, as the complexity of the operation increases, anonymous delegates break down as an implementation mechanism. And traditional 'named' delegates often tend to split the same logical operation across multiple physical ones, which introduces different readability and maintainability issues.
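For reference, that thread-pool-plus-anonymous-delegate pattern looks something like this in plain .NET 2.0; note that reporting success or failure back to the instigator is left entirely to us:

```csharp
using System;
using System.Threading;

class ThreadPoolSketch {
    static void Main(string[] args) {
        ThreadPool.QueueUserWorkItem(delegate(object state) {
            try {
                Console.WriteLine("Working...");
                // Propagating completion or failure back to the caller
                // needs hand-rolled plumbing - this is the gap the CCR
                // message-passing primitives fill.
            }
            catch (Exception e) {
                Console.WriteLine("Failed: {0}", e.Message);
            }
        });
        Console.ReadLine();
    }
}
```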
And there's one more aspect of the thread-pool that's often overlooked. It is essentially just a single queue of work items. Those items might actually be dispatched concurrently across the available processors, but if you post a whole lot of items into the pool, they'll be processed approximately in turn. If you have a logical requirement to support priority queues, you'll need to implement these yourself, with or without the thread-pool.
So is the CCR just a better thread-pool?
Well, it is a better thread-pool, but it's quite a bit more as well. The CCR provides some library primitives that allow you to exploit the available processing power of your machine more easily, which is probably quite a lot (although the CCR runs on some fairly minimal implementations of Windows). It does this by providing a lightweight message-passing infrastructure that can easily be rolled on top of existing asynchronous programming models. In addition, it supports a pseudo-declarative style of co-ordinating messages, a mechanism for propagating cross-thread failure in a consistent fashion, and support for priority (or at least multiple) work queues to avoid the starvation problem outlined above. The CCR will dispatch your work items across the available processors, either over the existing CLR thread-pool or via a custom pool of dedicated threads.
Back up a bit - what do you mean by message passing?
Basically, rather than explicitly co-ordinating shared data between threads (e.g. via events, semaphores etc.), parallel dependent tasks pass messages between one another. These messages travel over queues, and tasks are registered to run on the arrival of a message. Queues may be short-lived or long-lived and can be passed around between tasks. Any form of Windows UI programming is ultimately an exercise in message passing, albeit a less dynamic one. Message passing as a technique for describing parallel computations is nothing new, but the CCR brings it within range of any competent .NET developer.
Sounds a bit slow to me...
The CCR is very lightweight and has clocked some impressive figures. It's quite possible that a custom solution using native OS primitives to schedule co-ordinating threads would be quicker, but it probably wouldn't be as simple or as consistent and more importantly, probably wouldn't scale as well as the number of available cores increases.
And what's the Asynchronous Programming Model?
Well, the actual nuts-and-bolts of the Asynchronous Programming Model (APM) have been described elsewhere; what I just want to mention is really the spirit of the thing, and this takes us back to the introduction of I/O completion ports in NT3.51. IOCPs really surfaced to the C/C++ application developer the underlying reality that I/O took place off the processor. The CPU would dispatch a disk I/O (or a network I/O) via a device driver and then go and do something else until an interrupt fired, signalling completion. Abstractly, IOCPs propagated this interrupt into the application code, allowing the developer to instigate an I/O, do something else, and have some other code fire when it completed. Coupled with an efficient task-dispatching mechanism that worked over a small number of threads (ideally one-per-processor to eliminate the overhead of a context-switch), IOCP-based programs could reach tremendous levels of scalability and responsiveness. Of course, they were extremely difficult to write correctly, especially in C/C++. They still are difficult to write, even with the APM, C# 2.0 and the CLR thread-pool, but the principle remains:
Always perform I/O asynchronously; never block a thread that could otherwise do useful work.
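A hand-rolled example of staying true to that principle with the APM, using the standard FileStream.BeginRead/EndRead pair (the file name is purely illustrative):

```csharp
using System;
using System.IO;

class ApmSketch {
    static void Main(string[] args) {
        // Open a file for asynchronous I/O (final 'true' requests it).
        FileStream fs = new FileStream("data.bin", FileMode.Open,
            FileAccess.Read, FileShare.Read, 4096, true);
        byte[] buffer = new byte[4096];
        // Instigate the read and return immediately - no thread blocks here.
        fs.BeginRead(buffer, 0, buffer.Length, delegate(IAsyncResult ar) {
            // This callback fires on a thread-pool thread on completion.
            int bytesRead = fs.EndRead(ar);
            Console.WriteLine("Read {0} bytes.", bytesRead);
            fs.Close();
        }, null);
        Console.WriteLine("Doing something else...");
        Console.ReadLine();
    }
}
```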
As I'll demonstrate in future posts, the CCR has facilities that allow the developer to remain true to this principle, whilst avoiding many of the implementation pitfalls of doing it all 'manually'.
CCR
Others have blogged about it, others have written technical pieces about it. The Concurrency and Coordination Runtime, which currently forms part of Microsoft Robotics Studio seeks to enable concurrent/parallel programming through the coordination of a lightweight message-passing system.
The CCR (and its distributed big-brother, DSS) represents a shift in thinking for developers like me. Previously, I used to bring my not-inconsiderable (ahem) threading knowledge to a problem, consider the costs of introducing parallelism, and weigh up whether the increased algorithmic complexity was worth the potential runtime improvement.
[Not sure where I read this, but someone once asked: if debugging code is harder than writing code, and you're being as clever as you can be when you're writing it, what chance have you of debugging it?]
Spend some time with the CCR and maybe, like me, you'll start to worry a lot less about the how and more about the why. And there are plenty of whys:
- Multicore hyper-threaded machines are becoming the norm. Most of the new boxes at my place of work have 4 logical processors. We might be ahead of the curve here but I doubt it.
- An awful lot of the power of Windows (even in .NET) relies on the developer exploiting the Asynchronous Programming Model (APM) in areas of I/O. You get the best scalability in the back-end this way. On the client, you can build more responsive apps and handle non-UI thread related work on one of those spare processors.
- Even in .NET 2.0, this is hard to do, except for the most trivial tasks.
The CCR brings some simplicity to bear. It can allow you to build your apps the way the OS designers really want you to, and get the most out of the available power you have, without sacrificing the coherence of your implementation.
I should state from the outset that neither the CCR nor the DSS runtime is tied in any way to robotics, other than that's how they're shipped right now. One day they might ship as part of the BCL, but right now they don't.
I hope to blog some more on the CCR and DSS in the near future, providing some examples of how to exploit their potential within your own (non-robotics) applications.