TPL Dataflow–First Tests


Introduction

Over the last couple of weeks, I attended Tech Ed Australia. The sessions by Joseph Albahari on what is coming in C# 5, and what is available in the Async CTP now, have finally spurred me to go exploring.

What I wanted to achieve

I set out to create some simple examples of data flow networks which employ the asynchronous elements of the framework. The key elements of my testing and explorations were:

  • As is my wont, I always do my exploring with console applications, rather than more elaborate UIs. This put me in a bit of a bind, as most of the examples I could find used WinForms or WPF / XAML UIs. I’d rather not spend time mucking about with the UI.
  • Having a UI thread makes some of the async examples far simpler, as there is a thread which keeps running until the application is closed. This makes doing things with the Async CTP data flow much simpler.
  • Most of the examples used external things like web pages to get the async happening. I wanted a data-driven approach, with the data generated from within the program.

Where Did I Get To

Example 1

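// The examples in this post need these using directives:
// using System; using System.Diagnostics; using System.Threading;
// using System.Threading.Tasks.Dataflow;
private static void Test1()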
{
    Action<int> fred = (i) =>
    {
        int j = i + 1;
        Debug.WriteLine(j);
    };
    var a = new ActionBlock<int>(fred);
    a.Post(1);
    a.Post(2);
    a.Post(3);
    Debug.WriteLine("Test 1 Done");
}

This seems to work, but there is a hidden fault in the code (more on that later).

On the up side, creating a simple ActionBlock and posting to it may seem trivial, but it proves that things are installed correctly.

Example 2

private static void Test3()
{
    var actor = new ActionBlock<int>((i) =>
    {
        Debug.WriteLine(i);
    });
    var trans1 = new TransformBlock<int, int>((i) =>
    {
        return i * 2;
    }
    );
    trans1.LinkTo(actor);
    for (int i = 1; i < 10; i++)
    {
        trans1.Post(i);
    }
    Debug.WriteLine("Done Test 3");
}

This is an example of wiring two async processing blocks together (the LinkTo in the code). Again, it is trivial, but it does prove that I am on the right track. And again, there is an error in here which does not become obvious in a trivial example.

Example 3

private static void Test4()
{
    var actor = new ActionBlock<int>((i) =>
    {
        Debug.WriteLine(
            string.Format("Action {0}", i));
    });
    var trans1 = new TransformBlock<int, int>((i) =>
    {
        int res = i * 2;
        Debug.WriteLine(
            String.Format("Transform of {0} to {1} Done", i, res));
        return res;
    }
    );
    trans1.LinkTo(actor);
    for (int i = 1; i < 10; i++)
    {
        if (trans1.Post(i))
            Debug.WriteLine(
                String.Format("Post of {0} Succeeded", i));
        else Debug.WriteLine(
                String.Format("Post of {0} Not Accepted", i));
    }
    Debug.WriteLine("Done Test 4");
}

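This is very much like Example 2, but provides nicer output. The benefit of the output is that you can see that things are happening asynchronously: the "Post" lines from the main thread interleave with the "Transform" and "Action" lines from the blocks.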
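The Post checks in the example above never hit the "Not Accepted" branch, because the blocks are always open for input. As a minimal sketch of when Post does return false, the method below posts to an ActionBlock before and after calling Complete on it. The class and method names are hypothetical, and the code assumes the released System.Threading.Tasks.Dataflow library rather than the CTP build.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

public static class PostResultSketch
{
    // Hypothetical helper: returns the result of a Post made before
    // Complete() and the result of a Post made after it.
    public static Tuple<bool, bool> PostBeforeAndAfterComplete()
    {
        var actor = new ActionBlock<int>(i => Console.WriteLine("Action {0}", i));

        bool before = actor.Post(1);   // the block is open, so the item is accepted

        actor.Complete();              // tell the block no more input is coming
        bool after = actor.Post(2);    // the block declines the item

        actor.Completion.Wait();       // let the accepted item finish processing
        return Tuple.Create(before, after);
    }
}
```

Calling PostResultSketch.PostBeforeAndAfterComplete() from Main should give a pair whose first item is true and whose second item is false.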

Example 4

private static BroadcastBlock<int> Test5()
{
    int factor1 = 2;
    var trans1 = new TransformBlock<int, int>((i) =>
    {
        int res = i * factor1;
        Debug.WriteLine(
            String.Format("1 Transform of {0} to {1} Done", i, res));
        return res;
    }
    );
    int factor2 = 3;
    var trans2 = new TransformBlock<int, int>((i) =>
    {
        int res = i * factor2;
        Debug.WriteLine(
            String.Format("2 Transform of {0} to {1} Done", i, res));
        return res;
    }
    );
    var actor1 = new ActionBlock<int>((i) =>
    {
        Debug.WriteLine(
            string.Format("1 Action {0}", i));
    });
    var actor2 = new ActionBlock<int>((i) =>
    {
        Debug.WriteLine(
            string.Format("2 Action {0}", i));
    });
    trans1.LinkTo(actor1);
    trans2.LinkTo(actor2);
    BroadcastBlock<int> bcBlock =
        new BroadcastBlock<int>((i) =>
        {
            return i;
        });
    bcBlock.LinkTo(trans1);
    bcBlock.LinkTo(trans2);
    return bcBlock;
}

This is the part which builds the data flow network, which is getting more elaborate. There are five elements which are linked together to provide a more interesting network.

private static void Test5_Main()
{
    BroadcastBlock<int> bcBlock = Test5();
    for (int i = 0; i <= 10; i++)
    {
        if (bcBlock.Post(i))
            Debug.WriteLine(
                String.Format("Post of {0} Succeeded", i));
        else Debug.WriteLine(
                String.Format("Post of {0} Not Accepted", i));
    }
    bcBlock.Complete();
    while (bcBlock.Completion.IsCompleted == false)
    {
        Debug.WriteLine("Thread Sleeping");
        Thread.Sleep(1000);
    }
    Debug.WriteLine("Done");
    return;
}

This is the main method for this example. It makes a couple of key points, and includes a solution to the problem (or bug) I mentioned above. The bug was that the networks were sometimes not given enough time to complete before the main thread exited. The result was that sometimes the output would be incomplete. The key elements are:

  • Posting the data into the network.
  • The Broadcast block sends that data into both of the two TransformBlock and ActionBlock chains.
  • Calling Complete on the BroadcastBlock tells it that there will be no more inputs, and that it should mark itself “Completed” once it is empty.
  • The while loop keeps the main thread running, while the data passes through the network.
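One caveat with the loop above: it polls the BroadcastBlock's own Completion, so the blocks further down the chains may still be working when it exits. A possible alternative, sketched below, is to propagate completion along each link and wait on the two ActionBlocks at the ends of the chains. This assumes the released System.Threading.Tasks.Dataflow library (DataflowLinkOptions.PropagateCompletion may not be in the CTP build). The class name, method name and the results bag are hypothetical, added only so the outcome can be inspected.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionSketch
{
    // Builds a broadcast -> transform -> action network like Example 4,
    // posts 0..10, and blocks until both ActionBlocks have finished.
    public static int[] RunNetwork()
    {
        var results = new ConcurrentBag<int>();   // hypothetical: collects the outputs

        // Assumption: link options from the released library, not the CTP.
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        var trans1 = new TransformBlock<int, int>(i => i * 2);
        var trans2 = new TransformBlock<int, int>(i => i * 3);
        var actor1 = new ActionBlock<int>(i => results.Add(i));
        var actor2 = new ActionBlock<int>(i => results.Add(i));
        var bcBlock = new BroadcastBlock<int>(i => i);

        trans1.LinkTo(actor1, link);
        trans2.LinkTo(actor2, link);
        bcBlock.LinkTo(trans1, link);
        bcBlock.LinkTo(trans2, link);

        for (int i = 0; i <= 10; i++)
        {
            bcBlock.Post(i);
        }
        bcBlock.Complete();   // completion now flows down both chains

        // Wait on the last block of each chain, not on the broadcast block.
        Task.WaitAll(actor1.Completion, actor2.Completion);
        return results.OrderBy(x => x).ToArray();
    }
}
```

Because the wait is on the ActionBlocks, the method returns only after every posted value has passed through both chains, and no Thread.Sleep is needed to keep the main thread alive.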

Conclusions

  • These are simple to the point of trivial examples, but they are a place to start.
  • The Async CTP is something which I will continue to play with. The construction of these examples has been an interesting learning journey.
  • I have some ideas for more elaborate networks, and simulations of networks of relationships.
  • Future posts could start to explore more of what is available from the Async Data Flow CTP.

  1. #1 by aussiecraig on September 15, 2011 - 9:02 pm

    The use of Thread.Sleep in the post is not the right way to keep the main thread open. See my subsequent post
    https://craigwatson1962.wordpress.com/2011/09/15/keeping-the-main-thread-running-with-the-async-ctp-dataflow/
    for a better way of achieving this.

    Craig
