CCR 101 - Part 9: Iterators

Iterators (or Iterative Tasks in CCR parlance) are one of the standout features of the CCR. Their design and implementation is an imaginative fusion of the message-passing paradigm and the iterator functionality introduced in C# 2.0. But whether or not you choose to sit and admire the aesthetics of the CCR, you would probably agree that it is what they enable that is truly impressive - the ability to write a logical sequence of actions, within a single method, without ever blocking the thread between those actions.

Now obviously the strength of this functionality really shines through when each action results in some form of asynchronous I/O, e.g. a web request or a database write. In this case, the thread is freed up to service other tasks whilst the I/O completes, before returning to the next action in the sequence. But remember, in our CCR-based message-passing world, pretty much all I/O is performed asynchronously even if those messages are passed between CPU-bound services in the same process.

C# 2.0 Iterators: A (quick) recap.

The fundamental idea of the iterator functionality is to allow the developer to present an enumerable list to some other code without the explicit population or implementation of that list. Here's a sample that presents the list of the first 5 primes.

class Sample0901 {

static void Main(string[] args) {
foreach (int p in FirstFivePrimes()) {
Console.WriteLine(p.ToString());
}
Console.ReadLine();
}

private static IEnumerable<int> FirstFivePrimes() {
yield return 1;
yield return 2;
yield return 3;
yield return 5;
yield return 7;
}
}

Very contrived, and next to useless, but let's just examine the salient points. The first is that the client can just write a simple foreach loop. This in itself is just a shorthand (and compiler-generated) way of writing:

class Sample0902 {

static void Main(string[] args) {
IEnumerator<int> pe = FirstFivePrimes().GetEnumerator();
while (pe.MoveNext())
Console.WriteLine(pe.Current.ToString());
Console.ReadLine();
}

// Elided...
}

Now, the really clever code-generation is not in Main(), but in FirstFivePrimes(). The compiler generates a new class for this method that acts as a state machine. GetEnumerator() returns a new instance of this class. The client moves through the state by calling MoveNext() on the enumerator and the generated class exposes this state through the Current value. Each 'yield return X' statement sets the new state of the instance.


There are other, much better articles on iterators, in MSDN Magazine and on Matt Pietrek's blog.


Iterators the CCR way.

In the Coordination and Concurrency Runtime, iterators are used to return tasks that are then scheduled for execution. The CCR calls MoveNext() to execute the first task, which will execute all the code in the iterator method up to and including the first yield statement. And here's another clever bit. By yielding arbitrations (e.g. join or choice), which are themselves tasks, you can logically block until the arbitration completes. In the meantime, the dispatcher thread just goes off and processes other pending tasks.


Let's take a look at an example. The following code crawls the front page of this blog's home site for links to MSDN blogs. It then issues requests to fetch each of these blogs and prints out how many it successfully retrieved and how many failed.

class Sample0903 {
static void Main(string[] args) {
using (Dispatcher dispatcher = new Dispatcher()) {
using (DispatcherQueue taskQueue = new DispatcherQueue("IteratorSample", dispatcher)) {
string url = "http://iodyne.blogspot.com";
Arbiter.Activate(taskQueue,
Arbiter.FromIteratorHandler(delegate { return DoWork(url); }));
Console.ReadLine();
}
}
}

private static IEnumerator<ITask> DoWork(string url) {
PortSet<WebResponse, Exception> webResult = new PortSet<WebResponse, Exception>();
WebRequest webRequest = WebRequest.Create(url);
webRequest.BeginGetResponse(delegate(IAsyncResult iar) {
try {
webResult.Post(webRequest.EndGetResponse(iar));
} catch (Exception e) {
webResult.Post(e);
}
}, null);
WebResponse webResponse = null;
yield return Arbiter.Choice(webResult,
delegate(WebResponse wr) {
webResponse = wr;
},
delegate(Exception e) {
Console.WriteLine(e);
});
if (webResponse == null)
yield break;

string body = new StreamReader(webResponse.GetResponseStream()).ReadToEnd();
webResponse.Close();
Regex hrefRegex = new Regex("http\\:\\/\\/blogs\\.msdn\\.com\\/[^'\"\"]*", RegexOptions.IgnoreCase);
MatchCollection mc = hrefRegex.Matches(body);
PortSet<EmptyValue, Exception> multiPort = new PortSet<EmptyValue, Exception>();
foreach (Match m in mc) {
string murl = m.ToString();
WebRequest murlRequest = WebRequest.Create(murl);
murlRequest.BeginGetResponse(delegate(IAsyncResult iar) {
try {
murlRequest.EndGetResponse(iar).Close();
multiPort.Post(EmptyValue.SharedInstance);
} catch (Exception e) {
multiPort.Post(e);
}
}, null);
}
yield return Arbiter.MultipleItemReceive(multiPort, mc.Count,
delegate(ICollection<EmptyValue> wr, ICollection<Exception> ex) {
Console.WriteLine(wr.Count.ToString() + " succeeded. " + ex.Count.ToString() + " failed.");
});

yield break;
}
}

The bulk of the work is done within the iterator. It only yields two tasks, the first of which is a Choice, that detects whether the initial page fetch worked at all. If it didn't, we have no WebResponse and we yield break, which exits the iterator. If we did get a response then we parse the page for the MSDN blogs and then for each match we issue another asynchronous web request concurrently. Each successful response results in an EmptyValue signal, each failure an exception. Finally we yield to a MultipleItemReceive and output how many of these requests succeeded and how many failed.


This highly I/O-bound iterator only consumes threads for a tiny fraction of its elapsed time, mostly when doing the initial page parse and yet I still got to code it up in a basically sequential fashion.


I'll add a few further notes to this is a subsequent post.


1 comments:

  1. Eric E said,

    This is a great series you have put together. I have read through your posts and the CCR has become clearer to me. I have a scenario though and was wondering which post would be a better fit.

    Basically I want to validate a collection of AddressToCheck objects. I would like to fire off the validations async and when all are done collect the resulting ValidAddress objects and return them to the client. In post 4 you talk about 'Scatter/Gather', is that the best example? I can't figure out how to return the collection to the main thread so that I can display it (for example in a windows form app). Or would the example in post 9 be a better fit using the iterators?

    Thanks for the great info.

    Eric

    on June 7, 2009 at 8:02 AM