Archive for September, 2011

Keeping The Main Thread Running with the Async CTP Dataflow


Introduction

The previous post presented some code for keeping the main thread of a console application alive while the dataflow was being processed. I now believe that code is downright wrong, or at least not the best way to achieve the desired result.

The wrong code

private static Task Test6_Main_Wrong()
{
    BroadcastBlock<int> bcBlock = Test6();
    for (int i = 0; i <= 100; i++)
    {
        if (bcBlock.Post(i))
            Debug.WriteLine(
                String.Format("Post of {0} Succeeded", i));
        else Debug.WriteLine(
                String.Format("Post of {0} Not Accepted", i));
    }
    bcBlock.Complete();
    while (bcBlock.Completion.IsCompleted == false)
    {
        Debug.WriteLine("Thread Sleeping");
        Thread.Sleep(1000);
    }
    Debug.WriteLine("Done");
    return bcBlock.Completion;
}

The Thread.Sleep polling loop is what is wrong here. The one-second interval is an arbitrary guess at how long the data flow network will take to complete.

The Right Code

private static Task Test6_Main()
{
    BroadcastBlock<int> bcBlock = Test6();
    for (int i = 0; i <= 100; i++)
    {
        if (bcBlock.Post(i))
            Debug.WriteLine(
                String.Format("Post of {0} Succeeded", i));
        else Debug.WriteLine(
                String.Format("Post of {0} Not Accepted", i));
    }
    bcBlock.Complete();
    Debug.WriteLine("Done");
    return bcBlock.Completion;
}

The main function invokes it like this:

Test6_Main().Wait();

Key Points:

  • Returning the Task from the function to the main. The Task object has all the plumbing needed to know about completion of the task; the framework does all of the heavy lifting here.
  • The Wait method keeps the main thread open until a data flow network of any size runs to completion.
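
One caveat is worth sketching. The Completion task of a block reports on that block only, so waiting on the head of a network may not cover the blocks downstream of it. The sketch below is not the Test6 code from this post. It assumes the released System.Threading.Tasks.Dataflow package rather than the Async CTP build, and the names NetworkSketch, RunNetwork, Actor1Count and Actor2Count are made up for illustration. It links the blocks with PropagateCompletion and returns a Task that finishes only when both leaf ActionBlocks are done.

```csharp
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class NetworkSketch
{
    // Hypothetical counters, used only to show that every item reached both leaves.
    public static int Actor1Count;
    public static int Actor2Count;

    public static Task RunNetwork(int itemCount)
    {
        Actor1Count = 0;
        Actor2Count = 0;

        // Assumption: released TPL Dataflow API; completion flows along each link.
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        var bcBlock = new BroadcastBlock<int>(i => i);
        var trans1 = new TransformBlock<int, int>(i => i * 2);
        var trans2 = new TransformBlock<int, int>(i => i * 3);
        var actor1 = new ActionBlock<int>(i => { Interlocked.Increment(ref Actor1Count); });
        var actor2 = new ActionBlock<int>(i => { Interlocked.Increment(ref Actor2Count); });

        bcBlock.LinkTo(trans1, link);
        bcBlock.LinkTo(trans2, link);
        trans1.LinkTo(actor1, link);
        trans2.LinkTo(actor2, link);

        for (int i = 0; i < itemCount; i++)
        {
            bcBlock.Post(i);
        }

        // Signal that no more input is coming, then hand back a Task
        // that completes when both ends of the network have finished.
        bcBlock.Complete();
        return Task.WhenAll(actor1.Completion, actor2.Completion);
    }
}
```

The caller would use it the same way as above: NetworkSketch.RunNetwork(101).Wait();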

Conclusions

  • Using Thread.Sleep is bad in Async Dataflow applications.
  • Using Task.Wait looks like a much better way to achieve what I was after.
  • There are other occasions where I have used Sleep. I have solved one of these cases in a much better way, and I will post that example shortly.

TPL Dataflow–First Tests


Introduction

Over the last couple of weeks, I attended Tech Ed Australia. The sessions by Joseph Albahari on what is coming in C# 5, and is available in the Async CTP now, have finally spurred me to go exploring.

What I wanted to achieve

I set out to create some simple examples of data flow networks that employ the asynchronous elements of the framework. The key elements of my testing and explorations were:

  • As is my wont, I always do my exploring with console applications, rather than more elaborate UIs. This put me in a bit of a bind, as most of the examples I could find were built with WinForms or WPF / XAML UIs. I’d rather not spend time mucking about with the UI.
  • Having a UI thread makes some of the async examples far simpler, as there is a thread which keeps running until the application is closed. That makes working with the Async CTP data flow much easier.
  • Most of the examples used external things like web pages to get the async happening. I wanted to just have a data driven, from within the program, approach.

Where I Got To

Example 1

private static void Test1()
{
    Action<int> fred = (i) =>
    {
        int j = i + 1;
        Debug.WriteLine(j);
    };
    var a = new ActionBlock<int>(fred);
    a.Post(1);
    a.Post(2);
    a.Post(3);
    Debug.WriteLine("Test 1 Done");
}

This seems to work, but there is a hidden fault in the code (more on that later).

On the up side, creating a simple ActionBlock and posting to it may seem trivial, but it proves that things are installed correctly.
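
A minimal sketch of one way to make this first example wait for its own output, assuming the released System.Threading.Tasks.Dataflow package rather than the Async CTP build. The class name Test1Sketch and the ConcurrentQueue used to collect results are illustrative additions, not part of the original test.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks.Dataflow;

public static class Test1Sketch
{
    // Returns the values the block processed, so the caller can see nothing was lost.
    public static int[] Run()
    {
        var seen = new ConcurrentQueue<int>();

        // Same work as the lambda "fred" above, but recording the result instead of printing it.
        var a = new ActionBlock<int>(i => seen.Enqueue(i + 1));

        a.Post(1);
        a.Post(2);
        a.Post(3);

        a.Complete();         // no more input will be posted
        a.Completion.Wait();  // block until the queued items have been processed

        return seen.ToArray();
    }
}
```

Without the last two calls, the method could return before the block has handled any of the posted items.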

Example 2

private static void Test3()
{
    var actor = new ActionBlock<int>((i) =>
    {
        Debug.WriteLine(i);
    });
    var trans1 = new TransformBlock<int, int>((i) =>
    {
        return i * 2;
    }
    );
    trans1.LinkTo(actor);
    for (int i = 1; i < 10; i++)
    {
        trans1.Post(i);
    }
    Debug.WriteLine("Done Test 3");
}

This is an example of wiring two async processing blocks together (the LinkTo in the code). Again, it is trivial, but it does prove the point that I am on the right track. Again, there is an error in here, which does not become obvious in a trivial example.
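
As a hedged sketch of the same two-block wiring, the version below also passes completion along the link and waits for the last block. It assumes the released System.Threading.Tasks.Dataflow package, where LinkTo accepts a DataflowLinkOptions argument; the Async CTP build may differ. LinkSketch and the results queue are hypothetical names used for illustration.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks.Dataflow;

public static class LinkSketch
{
    public static int[] Run()
    {
        var results = new ConcurrentQueue<int>();

        var actor = new ActionBlock<int>(i => results.Enqueue(i));
        var trans1 = new TransformBlock<int, int>(i => i * 2);

        // Assumption: with PropagateCompletion set, completing trans1 also
        // completes actor once trans1 has handed over all of its output.
        trans1.LinkTo(actor, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 1; i < 10; i++)
        {
            trans1.Post(i);
        }

        trans1.Complete();
        actor.Completion.Wait();  // wait on the end of the chain, not the start

        return results.ToArray();
    }
}
```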

Example 3

private static void Test4()
{
    var actor = new ActionBlock<int>((i) =>
    {
        Debug.WriteLine(
            string.Format("Action {0}", i));
    });
    var trans1 = new TransformBlock<int, int>((i) =>
    {
        int res = i * 2;
        Debug.WriteLine(
            String.Format("Transform of {0} to {1} Done", i, res));
        return res;
    }
    );
    trans1.LinkTo(actor);
    for (int i = 1; i < 10; i++)
    {
        if (trans1.Post(i))
            Debug.WriteLine(
                String.Format("Post of {0} Succeeded", i));
        else Debug.WriteLine(
                String.Format("Post of {0} Not Accepted", i));
    }
    Debug.WriteLine("Done Test 4");
}

This is very much like Example 2, but provides nicer output. The benefit of the output is that you can see that things are happening asynchronously.

Example 4

private static BroadcastBlock<int> Test5()
{
    int factor1 = 2;
    var trans1 = new TransformBlock<int, int>((i) =>
    {
        int res = i * factor1;
        Debug.WriteLine(
            String.Format("1 Transform of {0} to {1} Done", i, res));
        return res;
    }
    );
    int factor2 = 3;
    var trans2 = new TransformBlock<int, int>((i) =>
    {
        int res = i * factor2;
        Debug.WriteLine(
            String.Format("2 Transform of {0} to {1} Done", i, res));
        return res;
    }
    );
    var actor1 = new ActionBlock<int>((i) =>
    {
        Debug.WriteLine(
            string.Format("1 Action {0}", i));
    });
    var actor2 = new ActionBlock<int>((i) =>
    {
        Debug.WriteLine(
            string.Format("2 Action {0}", i));
    });
    trans1.LinkTo(actor1);
    trans2.LinkTo(actor2);
    BroadcastBlock<int> bcBlock =
        new BroadcastBlock<int>((i) =>
        {
            return i;
        });
    bcBlock.LinkTo(trans1);
    bcBlock.LinkTo(trans2);
    return bcBlock;
}

This is the part which builds the data flow network, which is getting more elaborate. There are five elements which are linked together to provide a more interesting network.

private static void Test5_Main()
{
    BroadcastBlock<int> bcBlock = Test5();
    for (int i = 0; i <= 10; i++)
    {
        if (bcBlock.Post(i))
            Debug.WriteLine(
                String.Format("Post of {0} Succeeded", i));
        else Debug.WriteLine(
                String.Format("Post of {0} Not Accepted", i));
    }
    bcBlock.Complete();
    while (bcBlock.Completion.IsCompleted == false)
    {
        Debug.WriteLine("Thread Sleeping");
        Thread.Sleep(1000);
    }
    Debug.WriteLine("Done");
    return;
}

This is the main for this example. There are a couple of key points, and a solution to the problem (or bug) I mentioned above. The bug was that sometimes the networks were not getting enough time to complete before the main thread exited. The result was that sometimes the output would be incomplete. The key elements are:

  • Posting the data into the network.
  • The Broadcast block sends that data into both of the two TransformBlock and ActionBlock chains.
  • The call to Complete tells the BroadcastBlock that there will be no more inputs, and that once it is empty it should mark itself “Completed”.
  • The while loop keeps the main thread running, while the data passes through the network.
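
The effect of Complete on later posts can be sketched in a few lines. This assumes the released System.Threading.Tasks.Dataflow package; CompleteSketch is a hypothetical name, and the block has no targets linked because only the return value of Post matters here.

```csharp
using System.Threading.Tasks.Dataflow;

public static class CompleteSketch
{
    // Returns { accepted before Complete, accepted after Complete }.
    public static bool[] Run()
    {
        var bcBlock = new BroadcastBlock<int>(i => i);

        bool before = bcBlock.Post(1);  // the block is still accepting input
        bcBlock.Complete();             // declare that no more input is coming
        bool after = bcBlock.Post(2);   // declined, because the block was told it is finished

        return new[] { before, after };
    }
}
```

This is why the examples check the result of Post: a false return is the "Not Accepted" branch in the posting loops.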

Conclusions

  • These examples are simple to the point of being trivial, but they are a place to start.
  • The Async CTP is something which I will continue to play with. The construction of these examples has been an interesting learning journey.
  • I have some ideas for more elaborate networks, and simulations of networks of relationships.
  • Future posts could start to explore more of what is available from the Async Data Flow CTP.
