CCR 101 - Part 7: Interleave

The Interleave is essentially an arbitration with reader-writer scheduling semantics. It achieves this by splitting the Receiver tasks it arbitrates into three categories:

  1. Concurrent receivers. Any receiver in this group can be scheduled concurrently with any other receiver from this group, provided no exclusive receivers (see below) are active or pending.
  2. Exclusive receivers. A receiver from this group will only be scheduled when there are no other exclusive or concurrent receivers active.
  3. Teardown receivers. A receiver in this group will also run exclusively but causes the Interleave to 'retire'. Once a receiver in the group has been scheduled, all other receivers are abandoned and will not be scheduled.

Note that none of these categories need have any receivers at all.

There are a few more interesting things about Interleaves:

  1. Exclusive receivers have their order retained. Remember how we saw that a persistent Receive (without an outer arbitration to control the scheduling) would schedule tasks as fast as items arrived, leading to out-of-order processing across multiple threads? Well, by placing that Receiver within the set of exclusive Receivers, the processing will remain in order. This is because the Interleave is concerned with the completion of the Receiver task, not just its scheduling.
  2. Interleaves may be combined. You can 'merge' one interleave with another. Each group (concurrent, exclusive, teardown) is merged and the result behaves as if you had declared a single, more complex interleave.
  3. Interleaves still offer opportunities for deadlock. I'll cover these in the post that deals with IterativeTasks, because using the latter within the former is where they can occur.
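To make the first point concrete, here is a minimal sketch rather than a definitive implementation. It assumes the Microsoft.Ccr.Core assembly is referenced; the names OrderedExclusiveSketch, Run and workPort are made up for illustration. It posts a run of integers to a port and records the order in which a persistent receiver, placed in the exclusive group, handles them.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using Microsoft.Ccr.Core;

class OrderedExclusiveSketch {
    // Posts 0..count-1 to a port and returns the order in which a
    // persistent receiver in the exclusive group handled them.
    public static List<int> Run(int count) {
        List<int> seen = new List<int>();
        using (DispatcherQueue dq = new DispatcherQueue())
        using (ManualResetEvent done = new ManualResetEvent(false)) {
            // Hypothetical port carrying the work items
            Port<int> workPort = new Port<int>();
            Interleave il = new Interleave(
                new TeardownReceiverGroup(),
                new ExclusiveReceiverGroup(
                    Arbiter.Receive(true, workPort, delegate(int item) {
                        // Exclusive handlers run one at a time, so the
                        // list is never touched by two threads at once.
                        seen.Add(item);
                        if (seen.Count == count)
                            done.Set();
                    })),
                new ConcurrentReceiverGroup());
            Arbiter.Activate(dq, il);
            // Post from a single thread so the port holds the items in order
            for (int i = 0; i < count; i++)
                workPort.Post(i);
            // Wait until the receiver has handled every item
            done.WaitOne();
        }
        return seen;
    }

    static void Main() {
        foreach (int item in Run(10))
            Console.WriteLine(item);
    }
}
```

If the ordering guarantee holds as described, the items should come back in the order they were posted. Moving the same receiver into the ConcurrentReceiverGroup would remove that guarantee, and the shared list would then need a lock of its own.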

Let's look at an example. This one is more complicated than the others we've seen so far, and I've sprinkled it with a few comments to help illustrate the salient points. We use the Interleave to protect a counter that increments every second and allows the client to query the current value.

using System;
using System.Threading;
using Microsoft.Ccr.Core;

class Sample0701 {
    static void Main(string[] args) {
        using (DispatcherQueue dq = new DispatcherQueue()) {
            ManualResetEvent done = new ManualResetEvent(false);
            int counter = 0;
            // Signal port - indicates the counter should be incremented
            Port<EmptyValue> signalPort = new Port<EmptyValue>();
            // Query port - indicates that the counter should be read
            Port<EmptyValue> queryPort = new Port<EmptyValue>();
            // Quit port - indicates that the interleave should exit
            Port<EmptyValue> quitPort = new Port<EmptyValue>();
            // Declare the interleave
            Interleave il = new Interleave(
                new TeardownReceiverGroup(Arbiter.Receive(false, quitPort, delegate {
                    Console.WriteLine("Finished.");
                    done.Set();
                })),
                new ExclusiveReceiverGroup(Arbiter.Receive(true, signalPort, delegate {
                    ++counter;
                })),
                new ConcurrentReceiverGroup(Arbiter.Receive(true, queryPort, delegate {
                    Console.WriteLine(counter.ToString());
                })));
            // Then activate it
            Arbiter.Activate(dq, il);
            // Now start a timer that will fire every second, then read the console.
            // Typing "q" queries the counter. An empty line (or end of input)
            // causes the program to quit.
            using (Timer t = new Timer(delegate { signalPort.Post(EmptyValue.SharedInstance); })) {
                t.Change(0, 1000);
                string line = Console.ReadLine();
                // ReadLine returns null at end of input, so guard against that too
                while (!string.IsNullOrEmpty(line)) {
                    if (line == "q")
                        queryPort.Post(EmptyValue.SharedInstance);
                    line = Console.ReadLine();
                }
                quitPort.Post(EmptyValue.SharedInstance);
            }
            // Wait for the teardown receiver to run before exiting
            done.WaitOne();
            Console.ReadLine();
        }
    }
}

We create three ports. One carries a request to increment the counter, one to query the counter and one to shut down. We create our Interleave arbitration over these three ports, then activate it. Next we start a timer which fires every second, posting a message to the signal port each time it does so. Finally we sit in a loop, reading the console.


Now suppose we wanted to support an extra message that resets the counter. This requires a new port (in this example at least) and an additional exclusive receiver. I've altered the above sample to show how to do this. The amendments are the resetPort declaration, the second receiver in the ExclusiveReceiverGroup, and the "r" branch in the console loop.

using System;
using System.Threading;
using Microsoft.Ccr.Core;

class Sample0702 {
    static void Main(string[] args) {
        using (DispatcherQueue dq = new DispatcherQueue()) {
            ManualResetEvent done = new ManualResetEvent(false);
            int counter = 0;
            // Signal port - indicates the counter should be incremented
            Port<EmptyValue> signalPort = new Port<EmptyValue>();
            // Reset port - indicates the counter should be set back to zero
            Port<EmptyValue> resetPort = new Port<EmptyValue>();
            // Query port - indicates that the counter should be read
            Port<EmptyValue> queryPort = new Port<EmptyValue>();
            // Quit port - indicates that the interleave should exit
            Port<EmptyValue> quitPort = new Port<EmptyValue>();
            // Declare the interleave
            Interleave il = new Interleave(
                new TeardownReceiverGroup(Arbiter.Receive(false, quitPort, delegate {
                    Console.WriteLine("Finished.");
                    done.Set();
                })),
                new ExclusiveReceiverGroup(
                    Arbiter.Receive(true, signalPort, delegate {
                        ++counter;
                    }),
                    Arbiter.Receive(true, resetPort, delegate {
                        counter = 0;
                    })),
                new ConcurrentReceiverGroup(Arbiter.Receive(true, queryPort, delegate {
                    Console.WriteLine(counter.ToString());
                })));
            // Then activate it
            Arbiter.Activate(dq, il);
            // Now start a timer that will fire every second, then read the console.
            // Typing "q" queries the counter and "r" resets it. An empty line
            // (or end of input) causes the program to quit.
            using (Timer t = new Timer(delegate { signalPort.Post(EmptyValue.SharedInstance); })) {
                t.Change(0, 1000);
                string line = Console.ReadLine();
                // ReadLine returns null at end of input, so guard against that too
                while (!string.IsNullOrEmpty(line)) {
                    if (line == "q")
                        queryPort.Post(EmptyValue.SharedInstance);
                    else if (line == "r")
                        resetPort.Post(EmptyValue.SharedInstance);
                    line = Console.ReadLine();
                }
                quitPort.Post(EmptyValue.SharedInstance);
            }
            // Wait for the teardown receiver to run before exiting
            done.WaitOne();
            Console.ReadLine();
        }
    }
}

Note that I could have combined two Interleaves to the same effect. The following snippet illustrates how I might have done that:

            // Declare the interleave
            Interleave il = new Interleave(
                new TeardownReceiverGroup(Arbiter.Receive(false, quitPort, delegate {
                    Console.WriteLine("Finished.");
                    done.Set();
                })),
                new ExclusiveReceiverGroup(Arbiter.Receive(true, signalPort, delegate {
                    ++counter;
                })),
                new ConcurrentReceiverGroup(Arbiter.Receive(true, queryPort, delegate {
                    Console.WriteLine(counter.ToString());
                })));
            // Declare the second interleave
            Interleave il2 = new Interleave(
                new TeardownReceiverGroup(),
                new ExclusiveReceiverGroup(Arbiter.Receive(true, resetPort, delegate {
                    counter = 0;
                })),
                new ConcurrentReceiverGroup());
            // Combine them
            il.CombineWith(il2);
            // Then activate it
            Arbiter.Activate(dq, il);

In my next post, I'll cover Dispatchers, custom thread-pools and COM apartment interoperability.

