CCR 101 - Part 4: Arbitrations (Receivers)

So far we've seen how to queue simple tasks to a thread-pool and how to post messages to ports. In this tutorial, I'll introduce arbitrations and an important type in the CCR library, the Arbiter class.

In the previous tutorial, I demonstrated how to post and retrieve messages to ports. Most of the time however, we won't explicitly dequeue messages, rather we'll define the code that should run when a message arrives on a port. The CCR then takes care of dequeueing the item for us.

Receivers

Receivers are simplest from of message handler. In fact, they're so simple they aren't really an arbitration at all. They simply process messages as they get dequeued from the port. Let's look at example:

class Sample0401 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<string> port = new Port<string>();
Arbiter.Activate(dq, Arbiter.Receive(false, port, delegate(string item) {
Console.WriteLine(item);
}));
port.Post("Hello World.");
Console.ReadLine();
}
}
}

There's quite a lot of new stuff going on here, mostly on the Arbiter.Activate line, but let's go through the code:



  1. First, we create our dispatcher queue. This is where our message-handling code is going to be scheduled. We used the default constructor, so this queue will schedule items across the CLR thread-pool.
  2. Next, we create a single port of strings.
  3. Then, we set-up the handler to process messages on that port. You'll see two calls to the static Arbiter class here, one to Receive which simply creates a Receiver for you, and one to Activate which is where the task is actually activated. Without this activation step, the handler will never run.
  4. We then post a message onto the port.

Once activated, the Receiver is triggered on the presence of an item in the port. It removes the item and schedules the handler to run against it. Simple ;-)


One important concept to understand is that it doesn't matter if the arbitration is activated before or after the message has been posted. For example, the following example (which swaps the Arbiter.Activate and port.Post lines) produces exactly the same result.

class Sample0402 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<string> port = new Port<string>();
port.Post("Hello World.");
Arbiter.Activate(dq, Arbiter.Receive(false, port, delegate(string item) {
Console.WriteLine(item);
}));
Console.ReadLine();
}
}
}

Another item worth discussing is the first parameter to the Arbiter.Receive call. This is a flag that indicates whether the handler will be persistent. A persistent-handler will process all messages on the port, whereas a transient handler processes just one message. We can illustrate this with the following example:

class Sample0403 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<string> port = new Port<string>();
port.Post("Hello World.");
port.Post("Goodbye World.");
Arbiter.Activate(dq, Arbiter.Receive(false, port, delegate(string item) {
Console.WriteLine(item);
}));
Console.ReadLine();
}
}
}

The second message, 'Goodbye World.' never appears on the console. If we change the persistent parameter to true...

class Sample0404 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<string> port = new Port<string>();
port.Post("Hello World.");
port.Post("Goodbye World.");
Arbiter.Activate(dq, Arbiter.Receive(true, port, delegate(string item) {
Console.WriteLine(item);
}));
Console.ReadLine();
}
}
}

... then it does.


Message Ordering

It's possible that the order in which the messages appear in the console in the example above is different to the order in which they were posted. What's going on?


Well, the CCR is dequeueing the items in order, but once the handlers are scheduled (not executed), it'll just go back and fetch the next. If multiple items are scheduled either across the CLR thread-pool or a custom thread-pool, then we are really at the mercy of the underlying thread-scheduling mechanism. We will see in a later tutorial how we can control the ordering of messages.


Predicates

We can perform certain filtering of message via Predicates. The following sample handles odd and even numbers differently on a port of integers.

class Sample0405 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<int> port = new Port<int>();
Arbiter.Activate(dq, Arbiter.Receive(true, port, delegate(int item) {
Console.WriteLine("Odd: " + item.ToString());
},
delegate(int item) {
return item % 2 == 1;
}));
Arbiter.Activate(dq, Arbiter.Receive(true, port, delegate(int item) {
Console.WriteLine("Even: " + item.ToString());
},
delegate(int item) {
return item % 2 == 0;
}));
for (int i = 0; i < 10; ++i)
port.Post(i);
Console.ReadLine();
}
}
}

We've activated 2 receivers here, and passed a predicate to each (again as anonymous delegates).


Arbiter.Receive

The Arbiter static class supports a number of methods that each return an aribtration. These are really convenience methods - For example, the first sample in this post can actually be written without the Receive call, by constructing the Receive arbitration directly:

class Sample0406 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<string> port = new Port<string>();
Arbiter.Activate(dq, new Receiver<string>(false, port, delegate(string item) {
return true;
},
new Task<string>(delegate(string item) {
Console.WriteLine(item);
})));
port.Post("Hello World.");
Console.ReadLine();
}
}
}

This is more verbose, but it does demonstrate what is actually going on. I think it also conveys better the two-stage arbitration process i.e. creation then activation, but perhaps that's really because Arbiter.Receive would have been better named Arbiter.CreateReceive.


Batching Messages

Sometimes you want to process message in batches. CCR allows you to do this in a number of ways. Let's have a look at the simplest way to do this. The following example processes batches of 10 numbers at a time:

class Sample0407 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Port<int> port = new Port<int>();
Arbiter.Activate(dq, Arbiter.MultipleItemReceive(true, port, 10, delegate(int[] ints) {
StringBuilder sb = new StringBuilder();
foreach (int item in ints)
sb.Append(item.ToString()).Append(" ");
Console.WriteLine(sb.ToString());
}));
for (int i = 0; i < 101; ++i)
port.Post(i);
Console.ReadLine();
Console.WriteLine(port.ItemCount.ToString());
Console.ReadLine();
}
}
}

Once, this sample has dumped its numbers, you can hit Enter and it will output the number of items still in the port. The above sample posted 101 items, then processed 10 at a time (10 times), leaving 1 in the port. This is an example of an atomic receive. If the batch can't be completed i.e. there are less than 10 items in the queue, then the item(s) will not logically be removed from the queue. There are other types of atomic arbitrations which will be covered in a future tutorial.


Sometimes you want to batch messages across ports. This is commonly used in scatter/gather operations. Consider this scenario:



  1. Your application needs to issue n operations.

  2. Any or all these operations might fail.

  3. Each operation can run independently of the others.

  4. Once all the operations are complete, process the results.

The following sample simulates this through another variant of MultipleItemReceive.

class Sample0408 {
static void Main(string[] args) {
using (DispatcherQueue dq = new DispatcherQueue()) {
Console.Write("Enter number of operations: ");
int numOps = Int32.Parse(Console.ReadLine());
SuccessFailurePort results = new SuccessFailurePort();
Arbiter.Activate(dq, Arbiter.MultipleItemReceive(results, numOps,
delegate(ICollection<SuccessResult> successes, ICollection<Exception> failures) {
Console.WriteLine("{0} successes, {1} failures.", successes.Count, failures.Count);
}));
// Simulate async operations with 10% chance of failure.
Random r = new Random();
for (int i = 0; i < numOps; ++i) {
Arbiter.Activate(dq, Arbiter.FromHandler(delegate() {
if (r.Next(10) < 9)
results.Post(new SuccessResult());
else
results.Post(new Exception());
}));
}
Console.ReadLine();
}
}
}

We create a SuccessFailurePort portset (discussed previously), and then activate a MultipleItemReceiver arbitration that specifies that when numOps messages have been received, regardless of which sub-port they were posted on, then fire this handler.


[The Arbiter.FromHandler call is a helper to schedule a delegate directly onto the dispatcher queue].


Wrap-up

That covers quite a lot about receivers. I'll leave the rest for future posts. In the next tutorial I'd like to cover our first proper simple arbitration, Choice.


2 comments:

  1. Paul said,

    Nice work. An example or two using one (and/or more) Dispatchers (thread pools) would be great.

    on July 25, 2007 at 11:08 AM


  2. NickG said,

    Thanks Paul. I'll be covering dispatchers in more detail soon (once I'm done with arbitrations).

    on July 25, 2007 at 2:45 PM