Posts Tagged Implementation

LINQ Pivot, or Crosstab, Extension Method


Introduction

This blog post is a continuation of the series of LINQ Short Takes. In this blog post, I present an implementation of a pivot, or crosstab, transformation in Linq, implemented as a Linq extension method (specifically, an extension method on the IEnumerable(Of T) Interface).

This blog post builds on a number of preceding posts in this series.

The Problem

The pivot, or crosstab, transformation is notoriously difficult to achieve in SQL. The original definition of SQL was founded on concepts from relational algebra and tuple relational calculus. Linq, having some of its roots in SQL, shares SQL’s difficulty in formulating a pivot, or crosstab, transformation. Fortunately, enough of the procedural and functional programming aspects of C# are included in the Linq design and implementation to make expressing a generic solution to the pivot, or crosstab, transformation possible.

The following diagram attempts to show what the pivot transformation does to the data. At its core, the pivot transformation is just the swapping of columns to rows, with the data following the transposition of the row and column vectors. The difficulty comes in expressing this in a generic way. A small worked example follows.
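The following small example (hypothetical data, not part of the implementation) makes the transformation concrete: a three row by four column input becomes a four row by three column output, with each input column becoming an output row.

// Input (3 rows x 4 columns):        Output (4 rows x 3 columns):
//   { 1,  2,  3,  4 }                  { 1, 5,  9 }
//   { 5,  6,  7,  8 }                  { 2, 6, 10 }
//   { 9, 10, 11, 12 }                  { 3, 7, 11 }
//                                      { 4, 8, 12 }
IEnumerable<IEnumerable<int>> input = new[]
{
    new[] { 1,  2,  3,  4 },
    new[] { 5,  6,  7,  8 },
    new[] { 9, 10, 11, 12 }
};
IEnumerable<IEnumerable<int>> output = input.Pivot();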

 

The Solution – Design

The following are the design considerations and design criteria. These design criteria then informed the implementation of the pivot transformation. The following were the mandatory criteria.

  • The implementation must be an extension method (see MSDN documentation: Extension Methods (C# Programming Guide) for further details). This allows the Pivot implementation to be applied just like any other Linq transformation.
  • The extension method must use a generic type parameter (see MSDN documentation: Generics (C# Programming Guide)) for the object type being manipulated. This allows the broadest range of objects as inputs to the pivot transform.
  • The extension method must be an extension to the IEnumerable(Of T) Interface. This is the other half of the broadest-application criterion for the pivot transformation. This criterion allows application of the pivot transformation to the broadest range of collections.
  • The return type of the pivot transformation must be as generic as possible. This implies using the IEnumerable(Of T) Interface and a generic type parameter.
  • The return type should preserve the type of object from the input sequence to the transformation. Failure to do this would mean that the pivot transformation is arbitrarily changing the input data. It is my belief that this would break the Linq transformation model the .Net Framework provides. Failure to adhere to this criterion would make the transformation far less usable in general.
  • The implementation must be correct. I define correctness, as the pivot transformation should result in the generally accepted output of a pivot transformation.
  • The implementation of the pivot transformation should be efficient. Here I mean that it should be relatively quick for reasonable volumes of data. I did not want an implementation that showed up as a performance hot spot in any program that used the function.

The following were the desirable design criteria:

  • The implementation should be reasonably robust in dealing with ragged data. What I mean by ragged, or jagged, data is that not all rows of data have the same number of columns. This may be a potentially difficult, and expensive, design criterion to address.
  • The implementation should be able to cater for null objects in the input, transferring the null into the output.

The Solution – Implementation

The following is the method signature of the implementation.

public static IEnumerable<IEnumerable<TSource>> Pivot<TSource>(
            this IEnumerable<IEnumerable<TSource>> pivotSource)

The Pivot<TSource> element of the function signature addresses the following design criteria:

  • This introduces the generic type argument to the method, which addresses the ability to process any type of object with the transformation.
  • The return type of IEnumerable<IEnumerable<TSource>> addresses the following elements of the design criteria:
  • The type argument TSource to the IEnumerable<IEnumerable<>> addresses the requirement to return a generic output.
  • The use of the IEnumerable(Of T) Interface addresses the requirement to support the broadest range of collections.

The signature element this IEnumerable<IEnumerable<TSource>> pivotSource, the argument to the method, addresses a number of the design criteria. These design criteria are:

  • The use of this as the preface to the first argument in the argument list addresses, in part, the requirement for the implementation to be an extension method.
  • The use of the IEnumerable(Of T) Interface addresses the requirement for the implementation to process the broadest range of collections.
  • The use of the TSource type parameter enables the use of the method against the broadest range of objects possible.

In combination, the complete function signature addresses the following design criterion:

  • The function signature addresses the requirement for type stability, or not arbitrarily changing the type of the objects processed. This is implemented through the use of the same type argument TSource in both the input and return types. A short usage sketch follows.
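The following is a minimal usage sketch (the data and element type are arbitrary assumptions for illustration), showing that the element type is preserved and only the shape of the collection changes.

IEnumerable<IEnumerable<string>> source = new[]
{
    new[] { "a1", "a2", "a3" },
    new[] { "b1", "b2", "b3" }
};
// The element type (string) is unchanged by the transformation.
IEnumerable<IEnumerable<string>> result = source.Pivot();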

The Source Code For The Pivot Implementation

Below is the source code for the implementation of the pivot extension method.

The bulk of the code is devoted to checking the argument. These argument checks protect the implementation from invalid arguments that could cause exceptions from the underlying Linq implementation.

In the following code, the argument checks appear first, followed by the square out logic and then the pivot operation itself.

public static IEnumerable<IEnumerable<TSource>> Pivot<TSource>(
            this IEnumerable<IEnumerable<TSource>> pivotSource)
{
    // Validation of the input arguments, and structure of those arguments.
    if (Object.ReferenceEquals(pivotSource, null))
        throw new ArgumentNullException("pivotSource",
            "The source IEnumerable cannot be null.");
    if (pivotSource.Count( ) == 0)
        throw new ArgumentOutOfRangeException("pivotSource",
            "The outer IEnumerable cannot be an empty sequence");
    if (pivotSource.Any(A => Object.Equals(A, null)))
        throw new ArgumentOutOfRangeException("pivotSource",
            "None of any inner IEnumerables in pivotSource can be null");
    if (pivotSource.All(A => A.Count( ) == 0))
        throw new ArgumentOutOfRangeException("pivotSource",
            "All of the input inner sequences have no columns of data.");
    // Get the row lengths to check if the data needs squaring out
    int maxRowLen = pivotSource.Select(a => a.Count( )).Max( );
    int minRowLen = pivotSource.Select(a => a.Count( )).Min( );
    // Set up the input to the Pivot
    IEnumerable<IEnumerable<TSource>> squared = pivotSource;
    // If a square out is required
    if (maxRowLen != minRowLen)
        // Fill the tail of short rows with the default value for the type
        squared = pivotSource.Select(row =>
            row.Concat(
                Enumerable.Repeat(default(TSource), maxRowLen - row.Count( ))));
    // Perform the Pivot operation on squared out data
    var result = Enumerable.Range(0, maxRowLen)
        .Select((ColumnNumber) =>
        {
            return squared.SelectMany
                (row => row
                    .Where((Column, ColumnPosition) =>
                        ColumnPosition == ColumnNumber)
                );
        });
    return result;
}

Key Features in the Implementation

The following features of the implementation are worthwhile highlighting. These include:

  • To manage ragged data the extension method makes all of the rows the same length (squares out the data). The square out process is done by the following method logic. The Concat Linq Extension Method appends the data required to fill out the input rows. The call to the Enumerable Repeat Method, builds the sequence that contains the number of elements required to fill out the row. The default(TSource) obtains the default value for the type of object in the sequence.
  • The implementation uses the Enumerable.Range Method to generate the sequence of column numbers. The sequence of column numbers is then used to drive the transformation of each of the columns in the pivotSource argument inner IEnumerable into rows. I have written blog posts previously that describe using the Enumerable.Range Method (see: BLOG LINQ Short Takes – Number 1 – Enumerable.Range() )
  • The implementation uses an upper bound for the Enumerable.Range Method that is the longest row in the inner IEnumerable of the pivotSource argument. The code fragment which achieves the determination of the longest row is: ‘pivotSource.Select(a => a.Count( )).Max( )‘. The use of the longest row in the pivotSource, in part, addresses the design criterion requirement for the pivot implementation to deal appropriately with ragged data.
  • The implementation uses the Linq SelectMany extension method to flatten the sequence of column values taken from each of the input rows into a single IEnumerable for each output row.

Downloadable Versions of the code

The following URLs contain the source code, including the XML Documentation Comments, for the implementation of the Pivot Linq extension method. There are pdf and docx versions of the code. These are the only types of text (rich text) files that WordPress allows to be loaded to the web site.

How and Why the Pivot Transformation Works

The following sections describe the logical operations, and implementation details, of the pivot transformation. I have broken this into three segments: an Overview, The Square Out Process, and The Data Pivot Process.

Overview

The following diagram describes the broad logical flow of the implementation of the pivot extension method.

The validate arguments phase of the implementation is a relatively simple set of checks. The checks protect the remainder of the implementation from inputs that would cause an exception to be thrown. Since the implementation uses many Linq extension methods, these exceptions may originate from deep within the .Net Framework (see MSDN article: .NET Framework Conceptual Overview). Diagnosing why an exception was thrown, when it is thrown from within the .Net Framework, can prove to be a very difficult process. This would be even harder if the developer is using the pivot extension method supplied as part of a class library (see MSDN article: Assemblies and the Global Assembly Cache (C# and Visual Basic)).

There are a couple of features of the implementation of the input validation phase of the pivot transformation worth noting. These features include:

  • Many of the validation tests use the Object.ReferenceEquals method. Using ReferenceEquals avoids the possibility that the Object.Equals method or the == operator has been overridden. What the overridden versions implement could be inappropriate for the argument testing implementation.
  • One of the argument validations uses a Linq extension method worth mentioning. This is the check that validates that none of the rows in the input are objects with a value of null. This check employs the Linq extension method ‘Any’ (see: Enumerable.Any(Of TSource) Method). The Any Linq extension method is one of the Linq extension methods that I have overlooked in the past. Therefore, I make note of it, just in case the reader has not seen the ‘Any’ extension in action previously.
  • There is another argument validation that uses a Linq extension method that is worthy of mention. This is the argument validation that checks that the rows of the input are not all zero elements in length. This check employs the Linq extension method ‘All’ (see: Enumerable.All(Of TSource) Method). The ‘All’ Linq extension method is another Linq extension method that I have overlooked in the past. Therefore, I make note of the ‘All’ Linq extension method here, just in case the reader has not seen it in action previously. A short stand-alone illustration of both ‘Any’ and ‘All’ follows this list.
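The following is a small stand-alone illustration (the data is an arbitrary assumption) of the ‘Any’ and ‘All’ checks used in the validation phase.

var rows = new List<List<int>> { new List<int> { 1, 2 }, new List<int>() };
// Any: true if at least one inner row is a null object (here, none are).
bool anyRowIsNull = rows.Any(r => Object.Equals(r, null));     // false
// All: true only if every inner row has zero elements (here, the first row has two).
bool allRowsEmpty = rows.All(r => r.Count == 0);               // false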

Pivot Transform Top Level Processes


There are two processes in the implementation that I will describe in further depth. These are:

  • I will describe in more detail the Linq process of squaring out the input data. The square out process makes all of the rows in the inner IEnumerable the same length. Only ragged input data has this transformation applied.
  • I will also describe in more detail the Linq process of performing the pivot transformation. The only input to the pivot transformation is the squared out data. The output from the pivot transform is then the return value of the extension method.

The Square Out Process

The following diagram illustrates the transformation of a row from the input to form the squared out output. The transformation uses the default keyword (see the MSDN article: default Keyword in Generic Code (C# Programming Guide) for further details) to obtain the fill value used in the square out process.


The following diagram describes (or attempts to describe) the logical process flow of the Square Out process.

The Components of the Square Out Process


The Code for the Square Out Process

The following is the code used in the pivot transform to generate the squared out data.

// Get the row lengths to check if the data needs squaring out
int maxRowLen = pivotSource.Select(a => a.Count( )).Max( );
int minRowLen = pivotSource.Select(a => a.Count( )).Min( );
// Set up the input to the Pivot
IEnumerable<IEnumerable<TSource>> squared = pivotSource;
// If a square out is required
if (maxRowLen != minRowLen)
    // Fill the tail of short rows with the default value for the type
    squared = pivotSource.Select(row =>
        row.Concat(
            Enumerable.Repeat(default(TSource), maxRowLen - row.Count( ))));

The Square Out Process in Prose

There are only a couple of steps in the square out process. These steps are (a worked example follows the list):

  1. Get the minimum and maximum column count from the input sequence. These are the Count().Min() and Count().Max() calls.
  2. If the column count minimum value and column count maximum value are not the same, then the data needs squaring out and proceeds into the square out process. Otherwise, the data is already square and proceeds directly to the pivot transformation.
  3. Process each row in the input sequence. The code: pivotSource.Select(row =>
  4. Concatenate onto the row, for output, the required filler. The Enumerable.Concat() call, applied to each of the rows, does this.
  5. Generate the correct number, and correct type, of filler elements. The code: Enumerable.Repeat(default(TSource), maxRowLen - row.Count( )).
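The following worked example (hypothetical data) shows the square out applied to a ragged input. The longest row has three columns, so the shorter rows are padded with default(int), which is 0.

IEnumerable<IEnumerable<int>> ragged = new[]
{
    new[] { 1, 2, 3 },
    new[] { 4 },
    new[] { 5, 6 }
};
int maxRowLen = ragged.Select(r => r.Count( )).Max( );   // 3
var squared = ragged.Select(row =>
    row.Concat(Enumerable.Repeat(default(int), maxRowLen - row.Count( ))));
// squared is now: { 1, 2, 3 }, { 4, 0, 0 }, { 5, 6, 0 }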

The Data Pivot Process

The diagram Linq Pivot Transformation (below) tries to express visually what the Linq statement that implements the Pivot does. I hope that readers of this blog post find the diagram helps in understanding how the pivot transformation works.


The Code For the Pivot Transformation

// Perform the Pivot operation on squared out data
var result = Enumerable.Range(0, maxRowLen)
.Select((ColumnNumber) =>
{
    return squared.SelectMany
        (row => row
            .Where((Column, ColumnPosition) =>
                ColumnPosition == ColumnNumber)
        );
});
return result;

The Pivot Process Logical Process

The keys to the operation of the pivot process are the use of the following:

  • The Enumerable.Range() call generates the sequence of column numbers. The pivot transformation takes each column of the input sequence, and then makes a row from that set of values, which forms the result.
  • The SelectMany() squashes the set of values from each column into a row for the output.
  • The Where((value, position)) overload exposes the element position. This allows the selection of the correct column in each row (a short illustration follows).
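The following is a short stand-alone illustration (arbitrary data) of the indexed Where overload that drives the column selection.

var row = new[] { "a", "b", "c", "d" };
int columnNumber = 2;
// The second lambda parameter is the element position within the row.
var column = row.Where((value, position) => position == columnNumber);   // yields "c"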

Future Versions or Enhancements

The following are the potential enhancements I may make to the Pivot Linq Extension method.

  • As I describe below, the question ‘Should the square out process be optional within the pivot?’ is one I have yet to satisfactorily resolve. The enhancement to enable this would be to include a Boolean argument to the Pivot method, with a default value of true (a sketch of such a signature follows this list). It is my belief that most of the use cases for the pivot transformation would require squared out data. Nevertheless, I cannot completely dismiss the possibility that there could be use cases where the output of a pivot without squared out data is required.
  • In addition, as described below, I still have the question ‘Should the square out process be a separate Linq extension method?’ unresolved. Refactoring the pivot method to make the square out process a separate Linq extension method, or a private support method within the library, I will leave until sometime in the future.
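The following is a hedged sketch of what the optional argument version might look like. It is not the published implementation; the argument validation is elided and only the padding step is made conditional.

public static IEnumerable<IEnumerable<TSource>> Pivot<TSource>(
            this IEnumerable<IEnumerable<TSource>> pivotSource,
            bool squareOut = true)
{
    // ... argument validation as in the published version ...
    int maxRowLen = pivotSource.Select(a => a.Count( )).Max( );
    int minRowLen = pivotSource.Select(a => a.Count( )).Min( );
    IEnumerable<IEnumerable<TSource>> squared = pivotSource;
    // Only pad the short rows when the caller asks for the square out.
    if (squareOut && maxRowLen != minRowLen)
        squared = pivotSource.Select(row =>
            row.Concat(
                Enumerable.Repeat(default(TSource), maxRowLen - row.Count( ))));
    return Enumerable.Range(0, maxRowLen)
        .Select(ColumnNumber =>
            squared.SelectMany(row => row
                .Where((Column, ColumnPosition) => ColumnPosition == ColumnNumber)));
}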

Making an IEnumerable<IEnumerable<TSource>>

Introduction

It has become a concern to me that I have been using the generic data structure IEnumerable<IEnumerable<TSource>>, but have omitted any hints, or code examples, on how to build such a structure. This section will address that concern and present some of the ways I have used to create this type of data structure. Much of what I will present is from my unit testing of the pivot transformation. Although these will be short code snippets, I trust that they will give the reader some clues for creating this type of data structure.

Code Examples

I have decided to include a link to the code examples in pdf, and docx format. The pasting of the code for the examples in here would only increase the size of this post, which I believe is getting too big anyway.

Summary and Conclusions

There are many different ways to achieve the formation of an IEnumerable<IEnumerable<TSource>> structure. The above links present two of the basic strategies:

  • The first set of examples shows an approach to achieving the structure that centres around using an array, and the IEnumerable interface that the System.Array class presents (a minimal sketch of this approach follows the list).
  • The second set of examples shows the approach of implementing the IEnumerable interface on a user defined class. Object instances, within an array definition, achieve the required structure.
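The following is a minimal sketch of the array based approach; the data values are arbitrary assumptions for illustration.

int[][] jagged = new int[][]
{
    new int[] { 1, 2, 3 },
    new int[] { 4, 5, 6 }
};
// System.Array implements IEnumerable<T>, so the jagged array can be treated
// directly as an IEnumerable<IEnumerable<int>> and fed to the Pivot extension method.
IEnumerable<IEnumerable<int>> source = jagged;
var pivoted = source.Pivot();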

As an aside, these objects also build random test data for a variety of data types. These classes built the test data for the pivot process. The inheritance hierarchy allowed creation of ragged data.

Conclusions and Observations

There are a number of areas which are worthy of noting in my final remarks.

The Pivot and Square Out Processes

I am still to reach satisfactory conclusions to the following questions:

  • Should the square out operation within the Pivot process be optional? As I note below, the Pivot on ragged data without the square out process gives results that I would describe as unacceptable (or plain ‘wrong’). This conclusion is strictly a matter of one man’s opinion, that man being me. There may be use cases where the result of a pivot transformation on unpadded (not squared out) data is what the caller of the method requires. This is one of the many eternal dilemmas that library designers consistently wrestle with, and ruminate upon. The question, put succinctly, for the library designer to resolve is, ‘Does this design decision preclude some real use cases?’ The normal response to this dilemma is to make the option an argument in the method signature and allow the user to make the choice between the behaviours. Fortunately, C# provides optional arguments with a default value, which provides one resolution to the dilemma. The library designer can provide the library user with the option in the method signature, and provide some guidance by using an optional argument, with the default value that satisfies the expected normal, and most common, use case. Before I publish this blog post, I may yet make switching on the square out process an optional Boolean argument, with a default value of true.
  • Is the square out transformation useful as a standalone Linq extension method? Here again I am uncertain that there are use cases where this would be needed. In this case, I doubt that I will refactor out the square out process into a new extension method any time soon. I will wait and see if I find a need for the square out process as separate Linq extension method.

Conclusions, and Observations, from the Design and Development Process

During the writing of this blog post, I have modified the code for the Pivot extension method a number of times. There have been two major sources for the modifications.

  • One source of modifications was building a set of unit tests (see MSDN article: Verifying Code by Using Unit Tests) for the Pivot extension method. The process of designing, and implementing, the unit tests made me think more clearly about the validation of input arguments.
  • Another source of modifications was writing this blog post. Writing this article made me think more clearly about ragged data as an input.

The most notable modifications were:

  • The tightening up, and greater clarity, in the validation checks performed on the input data sequence.
  • The addition of the square out process came from thinking more clearly, and deeply, about the processing of ragged input data. Specifically, I realised that what the transformation produced when processing ragged data was unacceptable. The output from a ragged input would have broken the rows swapped to columns conceptual model of the transform. The pivot transformation applied to input data without the square out resulted in data from a row ending up in the wrong columns. This realisation resulted in the implementation of the square out process.

The result of these modifications is a ‘better’ implementation of the pivot transformation. The formulation of the input argument is clearer. The output from the extension method is significantly more robust. This increase in robustness comes from the introduction of the square out process that addresses the problems that ragged input data brought.

Observations from the Implementation

There were a number of ‘discoveries’ (they were always there, but I found them and had a use for them) in the standard Linq extension methods. These Linq extension methods included the ‘Any’, ‘All’, and Enumerable.Repeat methods discussed above.

The conclusion and observation I would draw is that no matter how well you think you know the .Net Framework library, there are still more useful classes and methods to find. Microsoft unremittingly adds to my ‘voyage of discovery’ through the .Net Framework. Microsoft keeps the C# .Net environment relevant to the emerging challenges in the IT industry by adding new features to the .Net Framework library, and the C# language, with every new version of the C# .Net environment. For me, this only adds to the ‘fun’ of using C# .Net; there always seems to be something new to learn how to use effectively.




Parallel Load CSV Data Using SqlBulkCopy


Preface

I have been trying to decide whether to split the following content over more than one post, or publish it as one big post. I have decided to post all of the content in one blog post. As a result this blog post will contain:

  • The key technologies which support the implementation.
  • The discussions of the implementation. This will include as many illustrations as I can muster, and text.
  • The key features of the implementation,
  • The complete source code for the implementation.

My rationale for posting this as a single article is that one post is easier to read. This is probably more of a technical article than a blog post. It is my belief that blog posts tend to be briefer in length than this will end up. I hate articles that go over multiple pages; you just settle in to reading on the topic, and then you have to go and click the link to get to the next page. This strategy probably works well for sites that generate income for each ad shown, where generating page views equals more revenue.

There are no “hard and fast” rules for blogging. Apart from one, which is the “golden rule”: “give the reader what they want”. If you came here to read about how to load in parallel to Sql Server using SqlBulkCopy, then that is what you’re going to receive.

Introduction

This article presents an implementation of parallel loading to Sql Server using the SqlBulkCopy class. This implementation makes use of some of the new features in C# 4.0 and the .Net 4.0 Framework.

Implementation Objectives

There were a number of objectives I had in mind when I started to develop this implementation. These objectives included:

  • Refreshing an implementation which I had made a while ago (before C# 4.0) which used System.Threading.Thread as the vehicle to parallelise loading of data to SQL Server.
  • Trying to smooth out the IO waits that a load process incurs, with more effective caching of data. The IO waits which a load process incurs come from two sources; the first is one this implementation attempts to address, the second is one which it cannot. These two sources of IO waits are:
    1) The reading of the input CSV files from disk, be that local or network.
    2) The IO overhead communicating with SQL Server.
  • Using Parallel LINQ to make the data loading process happen in parallel. The load process can be viewed as an arbitrary set of files which need to be loaded into a symmetrical set of tables. Having an arbitrary set of files (file names in this case) lends itself to letting Parallel LINQ partition the iteration over that set of file names into as many Tasks as the machine can support. This may not be what the designers of Parallel LINQ had in mind when they built it, but it has worked for me.
  • Using the Consumer Producer Pattern, and the BlockingCollection Class to implement the “pipe” between the separate tasks which makes up the implementation.
  • Using the Task Class, which is new in the .Net 4.0 Framework. The idea behind using the Task Class, and using Parallel LINQ, was to let the .Net Framework manage some of the tasks created (those it creates under the covers to support Parallel LINQ), the scheduling of all of the tasks, and all of the management of the Tasks. Generally, I was trying to produce, and think I have succeeded in producing, a parallel loading implementation without the “customary” thread management code.

Solution Architecture Overview

The architecture of the implementation can be broken into a couple of parts. These parts are:

  • The load pipeline. This process performs the following steps:
    • Reads the file names for the CSV files to be loaded,
    • Passes them into a “pipeline” (BlockingCollection),
    • A parallel LINQ statement reads the “pipeline”, and spawns load processes,
    • The load processes report the load result back to the output “pipeline”,
    • The output pipeline is then read to completion, and the load results reported.
  • The file read and parse into fields pipeline, which is as follows:
    • Create a Task to read the file
      • Read a line from the file,
      • Parse the line into tokens,
      • Stick the tokens in a list
      • Put the list of tokens into the pipeline,
    • In the parent task,
      • On NextResult,
        • Take a list of tokens from the pipeline

Why SqlBulkCopy?

To quote MSDN: “Lets you efficiently bulk load a SQL Server table with data from another source.” Simply put, for inserting data into a SQL Server table, the SqlBulkCopy class is the only way to go. It is the fastest way to load data, significantly faster than bound insert statements. A minimal sketch of the basic usage pattern follows; the full implementation appears later in this post.
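The following is a minimal sketch of the basic SqlBulkCopy pattern. The table name, connect string and data reader here are placeholders (assumptions for illustration), not the values used in the implementation below.

using (var connection = new SqlConnection(connectString))
{
    connection.Open();
    using (var bulkCopy = new SqlBulkCopy(connection))
    {
        bulkCopy.DestinationTableName = "dbo.TargetTable";
        // WriteToServer accepts an IDataReader, a DataTable, or an array of DataRows.
        bulkCopy.WriteToServer(dataReader);
    }
}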

The “nuts and bolts” of the solution

There are a number of significant pieces of .Net 4.0 technology which this implementation rests upon. In the following I highlight some of the significant bits, and why I’ve used them in the solution.

I have chosen to take a technology-first approach to the solution. By highlighting the technology elements used in the solution, hopefully I will provide enough information for the reader to understand some of the design decisions which contributed to the shape of the current implementation.

A Delightful Parallel Problem

The process of loading one file into one SQL Table is one of those “delightful parallel” problems. A “delightful parallel” problem is one which lends itself to a parallel implementation with a minimum of “fuss”. The process of loading one CSV file into one SQL Table represents a “delightful parallel” problem because of the following properties of the process:

  • The resources used along the process chain are used exclusively by that process chain (one unique file and one unique table); hence there are no contentions for exclusive resources along the process chain. There are resource contentions in the process: input/output operations, network bandwidth, and memory usage and caching being the obvious resource contenders. With the “right” implementation one can minimise the impact of those contentions.
  • There is no communication between loading Table A and loading Table B. There is nothing shared between different loads; they are independent data operations. This independence of operation goes to making the load processes ideal for “rafting off” onto separate threads.
  • The state of a load process, on completion of that load process, only needs to be communicated to the parent thread. This implementation uses a pass-the-results-of-the-load-back-to-the-parent strategy. If a log file were used, you could do away with the passing of the results to the parent as well.

The “invisible” technologies

It may seem strange to write about the “invisible” technologies, but there are a number of built-in bits of C# 4.0, and the .Net 4.0 Framework, which this implementation simply uses. The underlying infrastructure of the language and framework never really surfaces as identifiable “bits”, but it is very noteworthy.

Task Propagation through Parallel LINQ

This is a very appealing way to manage parallel processing: simply hand the problem off to Parallel LINQ to manage. The following code shows how I am doing just that. Parallel LINQ will start tasks, reuse tasks, and manage the number of active Tasks as the “pipeline” is consumed. The construction of the CSVReaderLoad object, and invoking the LoadData method, leaves the Parallel LINQ infrastructure to start the processes.

// Read the file name pipe, and parallel call the CSV Reader Load class and the Data Load Method
var processes = Task.Factory.StartNew(() =>
{
    try
    {
        // A bit of dirty pool here, calling the constructor then the instance method in one go!
        // This ensures that there are multiple instances of the class being used on multiple threads
        foreach (var loadResult in inputfiles.GetConsumingEnumerable().AsParallel().
            Select(fileName => new CSVReaderLoad().LoadData(fileName, connectString, rowBufferLength)))
        {
            processResults.Add(loadResult);
        }
    }
    finally
    {
        processResults.CompleteAdding();
    }
});

Thread Synchronisation: a “Wait until Done” without System.Threading Objects

The following lines achieve quite a lot of work, all “under the covers”.

// This "pulls" all of the results back into one IEnumerable,
// there is some fancy thread synchronisation happening here (all under the covers).
var summary = (from res in processResults.GetConsumingEnumerable()
               select res).ToList();
foreach (var a in summary)
{
    Debug.WriteLine("{0} rows read: {1} rows returned: {2}", a.FileName, a.RowsLoaded, a.RowsReturned);
}

This one LINQ statement causes a “wait” on the blocking collection (processResults), until all of the threads have completed adding to the “pipe”. All of the “nasty” System.Threading objects are buried in the implementation of the BlockingCollection Class. The “ToList()” is probably overkill, but when I was developing the code, I wanted to make sure that the execution of the LINQ statement happened immediately, rather than being deferred.

Thread Synchronisation – By Default, And With An Exception

There is one point in the implementation where there is synchronisation between two threads which are on opposite ends of one of the BlockingCollection “pipes”. This is because I’ve used the Take method to read the data rows out of the BlockingCollection. There is the possibility of the collection changing to completed between checking the IsCompleted property and while the Take method is waiting for a data row. This is a classic thread synchronisation situation. Fortunately, the BlockingCollection throws an exception when this happens, making dealing with the thread synchronisation issue relatively simple. The following code fragment shows the situation:

/// <summary>
/// Attempts to read a row from the pipe.
/// This is the "guts" of the read process.
/// The Take call is a blocking wait for a row from the pipe
/// </summary>
/// <returns>bool true: read a row, false: no rows left to read</returns>
private bool TryAndReadARow()
{
    if (!rows.IsCompleted)
    {
        // This is in essence the same as the MSDN example
        // see: http://msdn.microsoft.com/en-us/library/dd997371.aspx
        try
        {
            // This will blocking wait until a row is available
            rowData = rows.Take();
        }
        catch (InvalidOperationException)
        {
            if (rows.IsCompleted)
                return false;
            throw;
        }
        if (rowData != null)
        {
            //Debug.WriteLine("{0} Read A row OK", fileName);
            ++_rowsReturned;
            return true;
        }
        else
        {
            //Debug.WriteLine("{0} Did not read a row", fileName);
            return false;
        }
    }
    else
    {
        return false;
    }
}

Turning a File Read into an IEnumerable

This is not astounding for those who have used the yield return statement, but it is another example of things being done “under the covers” by the C# implementation for you. The compiler is doing a “bunch” of work to manage state, so that the developer does not need to worry about implementing a state machine. For this implementation, using the yield return statement made implementing the “pipe” between the CSV file being read, and returning rows of data to the SqlBulkCopy, easy. The code fragment follows:

/// <summary>
/// Function which converts the lines in the CSV file into an IEnumerable
/// Parses the line into a List of string tokens.
/// </summary>
/// <returns></returns>
private IEnumerable<List<string>> fileReader()
{
    while (true)
    {
        string line = strReader.ReadLine();
        if (line != null)
        {
            ++_rowsRead;
            yield return ParseLine(line);
        }
        else
        {
            break;
        }
    }
}

The Task Class

The implementation does make use of the Task class, and the Task.Factory.StartNew method, quite a bit. The Task class appears at the points where I want to separate the execution of the load process into logical partitions. These logical partitions are the “independent” parts of the load process.

The BlockingCollection Class

The BlockingCollection is one of the key pieces of technology in the implementation. The blocking collection is an implementation of the Consumer Producer Pattern. The Consumer Producer Pattern is leveraged in a couple of key places in this implementation. The elements of the implementation where the Consumer Producer Pattern is used are listed below; a minimal stand-alone sketch of the pattern follows the list.

  1. The listing of the CSV file names, which are each “piped” to a load process.
  2. The results of each load process, which are “piped” to a reporting loop at the completion of the load processes.
  3. The reading of a line from a CSV file, which is “piped” to a List of strings (one string for each field in the CSV line).
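The following is a minimal stand-alone sketch of the Consumer Producer Pattern using a BlockingCollection. The items pushed through the “pipe” here are arbitrary strings for illustration, not the loader’s data.

var pipe = new BlockingCollection<string>(boundedCapacity: 10);
var producer = Task.Factory.StartNew(() =>
{
    try
    {
        // Producer side: add items to the pipe.
        foreach (var item in new[] { "one", "two", "three" })
            pipe.Add(item);
    }
    finally
    {
        // Tells the consumer that no more items will arrive.
        pipe.CompleteAdding();
    }
});
// Consumer side: GetConsumingEnumerable blocks until items are available,
// and completes once CompleteAdding has been called and the pipe is drained.
foreach (var item in pipe.GetConsumingEnumerable())
    Debug.WriteLine(item);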

The SqlBulkCopy Class

This class is very much a pillar upon which this implementation is built. There are a couple of features of the class which should be noted. These features include:

  • The ColumnMappings Property. This is an instance of the SqlBulkCopyColumnMappingCollection Class, and is needed. The current implementation just sets up a 1 to 1 mapping from the input columns in the CSV file to the SQL Table’s columns. It’s an annoying little requirement; I wish the implementation of SqlBulkCopy did it for itself. But, there have probably been religious wars somewhere about that sort of feature being included. My current implementation of the Column Mappings is:

    // The bulk copy needs a column mapping.
    // CSV Column position = SQL Column Position
    int cols = ((IDataReader)reader).FieldCount;
    for(int i = 0; i <= cols; i++)
        loader.ColumnMappings.Add(i, i);
  • The Rows Copied notify feature. This is handy for debugging, and also for observing the concurrent behaviours of the implementation. Particularly post facto, as I send the notifications to the debug output window.

IDataReader Interface

This is the data provider which the SqlBulkCopy consumes for data. Creating an implementation of the IDataReader interface which reads the CSV data files, is one of the keys to making this solution work. There is not too much extra logic one needs to implement (the code for my implementation will follow).

It is a daunting interface when you first look at it. So many methods, you wonder if it’s worth the effort to implement. Luckily two things come to one’s aid:

  1. Visual Studio’s “implement the interface” feature, which generates all of the methods and properties for you. Granted, it is an implementation which throws NotImplementedException all over the place, but it saves a lot of typing.
  2. SqlBulkCopy only uses a couple of the methods on the interface. This is a “by observation” statement. The statement should be qualified by, “the current implementation in .Net Framework 4.0”.

The Implementation

The following is the source code for the implementation.

Program.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;

namespace ParallelLoader
{
    /// <summary>
    /// Program class
    /// </summary>
    class Program
    {
        /// <summary>
        /// String which is the directory to load from.
        /// Could be an argument, but this was simpler.
        /// </summary>
        const string loadDir =
@"C:\Users\Craig\Documents\Visual Studio 10\Projects\ParallelDataLoad\LoadData\Small";
        /// <summary>
        /// Connect string to the database which will be loaded to.
        /// </summary>
        const string connectString = @"Data Source=CRAIG-PC-1;Initial Catalog=LoadContainer;Integrated Security=True;Pooling=False";
        /// <summary>
        /// The main of the program.
        /// Just creates the parallel load driver class and calls the Load method.
        /// The load should probably return something like a run summary, but that will be a later
        /// enhancement.
        /// </summary>
        /// <param name="args">Not currently used</param>
        static void Main(string[] args)
        {
            int RowBufferLength = 2;
            ParallelDriver driver = new ParallelDriver(loadDir, connectString, RowBufferLength);
            driver.Load();
        }
    }
}

ParallelDriver.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.IO;
using System.Diagnostics;

namespace ParallelLoader
{
    /// <summary>
    /// The driver class which makes some of the parallel processing work.
    /// </summary>
    public class ParallelDriver
    {
        /// <summary>
        /// The directory to be loaded from
        /// </summary>
        private string loadDir;
        /// <summary>
        /// The connect string for database which will be loaded to.
        /// </summary>
        private string connectString;
        private int rowBufferLength;

        /// <summary>
        /// The constructor for the class.
        /// Just caches the two important input values.
        /// </summary>
        /// <param name="loadDir">The directory which contains the CSV files to load.
        /// The CSV files must be named tablename.CSV.</param>
        /// <param name="connectString">The connect string to the data base which contains the tables to be loaded to</param>
        public ParallelDriver(string loadDir, string connectString, int rowBufferLength)
        {
            // TODO: Complete member initialization
            this.loadDir = loadDir;
            this.connectString = connectString;
            this.rowBufferLength = rowBufferLength;
        }
        /// <summary>
        /// This is where the parallel fun starts.
        /// The BlockingCollection, and Task object are the parallel object which start the parallel processing going.
        /// </summary>
        internal void Load()
        {
            // "Pipe" which contains the file names to be processed
            var inputfiles = new BlockingCollection<string>();
            // "Pipe" which contains the results of the loads.
            var processResults = new BlockingCollection<SqlLoadResults>();
            // Start feeding the file name pipe.
            var readfileNames = Task.Factory.StartNew(() =>
            {
                try
                {
                    foreach (var filename in Directory.EnumerateFiles(loadDir, "*.CSV")) inputfiles.Add(filename);
                }
                finally { inputfiles.CompleteAdding(); }
            });
            // Read the file name pipe, and parallel call the CSV Reader Load class and the Data Load Method
            var processes = Task.Factory.StartNew(() =>
            {
                try
                {
                    // A bit of dirty pool here, calling the constructor then the instance method in one go!
                    // This ensures that there are multiple instances of the class being used on multiple threads
                    foreach (var loadResult in inputfiles.GetConsumingEnumerable().AsParallel().
                        Select(fileName => new CSVReaderLoad().LoadData(fileName, connectString, rowBufferLength)))
                    {
                        processResults.Add(loadResult);
                    }
                }
                finally
                {
                    processResults.CompleteAdding();
                }
            });
            // This "pulls" all of the results back into one IEnumerable,
            // there is some fancy thread synchronisation happening here (all under the covers).
            var summary = (from res in processResults.GetConsumingEnumerable()
                           select res).ToList();
            foreach (var a in summary)
            {
                Debug.WriteLine("{0} rows read: {1} rows returned: {2}", a.FileName, a.RowsLoaded, a.RowsReturned);
            }

        }
    }
}

CSVReaderLoad.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data.SqlClient;
using System.Diagnostics;
using System.IO;
using System.Data;

namespace ParallelLoader
{
    /// <summary>
    /// This the class which  does the SQL Bulk Copy operation
    /// </summary>
    internal class CSVReaderLoad
    {
        /// <summary>
        /// File name to be loaded
        /// </summary>
        private string _filename;
        /// <summary>
        /// Constructor, which just saves the file to load
        /// </summary>
        /// <param name="filename">full path to the file to load</param>
        public CSVReaderLoad(string filename)
        {
            // TODO: Complete member initialization
            this._filename = filename;
        }
        /// <summary>
        /// Method which does the SQL Bulk Copy
        /// </summary>
        /// <param name="fileName">CSV Load file name, can need the path as well
        /// connectString">Connect string to the SQL Server database to load to
        /// rowBufferLength">The size of the "pipe" which buffers the read rows.</param>
        /// <returns>The results of the load operation</returns>
        public SqlLoadResults LoadData(string fileName, string connectString, int rowBufferLength)
        {
            _filename = fileName;
            // Pull the table name from the CSV file name
            string tableName = ParseFileName(fileName);
            DateTime start = DateTime.Now;
            // High accuracy time measurement
            Stopwatch sw = new Stopwatch();
            sw.Start();
            // Build the class which reads the CSV data.
            CSVDataReader reader = new CSVDataReader(fileName, rowBufferLength);
            // Connect to the db
            using (SqlConnection con = new SqlConnection(connectString))
            {
                try
                {
                    con.Open();
                    // Put the operation into a transaction, means that can commit at the end.
                    SqlTransaction tx = con.BeginTransaction();
                    using (SqlBulkCopy loader = new SqlBulkCopy(con, SqlBulkCopyOptions.Default, tx))
                    {
                        try
                        {
                            loader.DestinationTableName = tableName;
                            // Progress notification - nice to have when testing
                            loader.SqlRowsCopied += new SqlRowsCopiedEventHandler(loader_SqlRowsCopied);
                            loader.NotifyAfter = 20;
                            // The bulk copy needs a column mapping.
                            // CSV Column position = SQL Column Position
                            int cols = ((IDataReader)reader).FieldCount;
                            for(int i = 0; i <= cols; i++)
                                loader.ColumnMappings.Add(i, i);
                            // Does the load
                            loader.WriteToServer(reader);
                            tx.Commit();
                            loader.Close();
                        }
                        catch (Exception ex)
                        {
                            Debug.WriteLine(ex);
                        }
                        finally
                        {
                        }
                    }
                }
                catch (Exception ex)
                {
                    Debug.WriteLine(ex);
                }
                finally
                {
                    con.Close();
                }
            }
            DateTime end = DateTime.Now;
            sw.Stop();
            // Return the load summary object
            return new SqlLoadResults(
                fileName, start, end, reader.RowsRead, 0L,
                sw.ElapsedMilliseconds, reader.RowsReturned);
        }

        /// <summary>
        /// Notification call back for this load
        /// </summary>
        /// <param name="sender">Ignored</param>
        /// <param name="e">Contains Rows Copied</param>
        void loader_SqlRowsCopied(object sender, SqlRowsCopiedEventArgs e)
        {
            Debug.WriteLine("FileName = {0}, Rows {1}", this._filename, e.RowsCopied);
        }
        /// <summary>
        /// Just grabs the file name from the full path
        /// </summary>
        /// <param name="fileName"></param>
        /// <returns></returns>
        private string ParseFileName(string fileName)
        {
            string fName = Path.GetFileNameWithoutExtension(fileName);
            return fName;
        }
        /// <summary>
        /// Default constructor
        /// </summary>
        public CSVReaderLoad()
        {

        }
    }
}

CSVDataReader.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data;
using System.IO;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Diagnostics;

namespace ParallelLoader
{
    /// <summary>
    /// The CSV file reader class.
    /// Most of this is generated by Visual Studio,
    ///     by implementing the IDataReader Interface,
    /// Much of this class is not called from the SQL Bulk Copy Class,
    ///     hence many NotImplementedException being thrown
    /// </summary>
    internal class CSVDataReader:IDataReader
    {
        /// <summary>
        /// CSV Filename which is being loaded
        /// </summary>
        private string fileName;
        /// <summary>
        /// "Pipe" between the reading from the data file, and returning the rows to the caller
        /// </summary>
        private BlockingCollection<List<string>> rows;
        /// <summary>
        /// Stream used to read the input file
        /// </summary>
        private StreamReader strReader;
        /// <summary>
        /// Maintain the state of the input file: open/closed
        /// </summary>
        private bool inputFileClosed;
        /// <summary>
        /// Current data row being return field by field to the caller
        /// </summary>
        private List<String> rowData;
        /// <summary>
        /// Size of the row buffer, rows held in the "pipe" cache
        /// </summary>
        private int ThrottleValue;
        #region Rows Returned
        /// <summary>
        /// Number of rows returned from the read process
        /// </summary>
        private long _rowsReturned;
        /// <summary>
        /// Rows Returned Property
        /// </summary>
        public long RowsReturned
        {
            get { return _rowsReturned; }
            set { _rowsReturned = value; }
        }
        #endregion
        #region Rows Read From the file
        /// <summary>
        /// Rows read from the file.
        /// </summary>
        private long _rowsRead;
        /// <summary>
        /// Rows read from the file property
        /// </summary>
        public long RowsRead
        {
            get { return _rowsRead; }
            set { _rowsRead = value; }
        }
        #endregion
        /// <summary>
        /// Constructor for the class
        /// </summary>
        /// <param name="fileName">The file name for the CSV data to load
        /// throttelValue">The number of rows/line from the file to hold in the "pipe"</param>
        public CSVDataReader(string fileName, int throttelValue)
        {
            this.fileName = fileName;
            strReader = new StreamReader(fileName);
            inputFileClosed = false;
            ThrottleValue = throttelValue;
            _rowsRead = 0L;
            _rowsReturned = 0L;
        }

        #region Private functions which implement the functionality
        /// <summary>
        /// Function which converts the lines in the CSV file into an IEnumerable
        /// Parses the line into a List of string tokens.
        /// </summary>
        /// <returns></returns>
        private IEnumerable<List<string>> fileReader()
        {
            while (true)
            {
                string line = strReader.ReadLine();
                if (line != null)
                {
                    ++_rowsRead;
                    yield return ParseLine(line);
                }
                else
                {
                    break;
                }
            }
        }

        /// <summary>
        /// A very simple CSV Parse function
        /// </summary>
        /// <param name="line">The line of comma delimited values</param>
        /// <returns>List<String> of string tokens from the input line</returns>
        private List<string> ParseLine(string line)
        {
            return line.Split(',').ToList();
        }

        /// <summary>
        /// Attempts to read a row from the pipe.
        /// This is the "guts" of the read process.
        /// The Take call is a blocking wait for a row from the pipe
        /// </summary>
        /// <returns>bool true: read a row, false: no rows left to read</returns>
        private bool TryAndReadARow()
        {
            if (!rows.IsCompleted)
            {
                // This is in essence the same as the MSDN example
                // see: http://msdn.microsoft.com/en-us/library/dd997371.aspx
                try
                {
                    // This will blocking wait until a row is available
                    rowData = rows.Take();
                }
                catch (InvalidOperationException)
                {
                    if (rows.IsCompleted)
                        return false;
                    throw;
                }
                if (rowData != null)
                {
                    //Debug.WriteLine("{0} Read A row OK", fileName);
                    ++_rowsReturned;
                    return true;
                }
                else
                {
                    //Debug.WriteLine("{0} Did not read a row", fileName);
                    return false;
                }
            }
            else
            {
                return false;
            }
        }
        /// <summary>
        /// This creates, and runs, the Task which reads from the CSV file,
        /// and puts the results into the "pipe".
        /// </summary>
        private void SetUpReaderPipeline()
        {
            if (rows == null)
            {
                rows = new BlockingCollection<List<string>>(ThrottleValue);
                var rowReader = Task.Factory.StartNew(() =>
                {
                    try
                    {
                        foreach (var parsedRow in fileReader())
                        {
                            rows.Add(parsedRow);
                        }
                    }
                    finally
                    {
                        rows.CompleteAdding();
                    }
                }
                );
            }
        }
        /// <summary>
        /// This just opens the file and reads the first line.
        /// Then the line is parsed to get the token count.
        /// </summary>
        /// <returns></returns>
        private int QuickPeekFile()
        {
            using (StreamReader rdr = new StreamReader(fileName))
            {
                string line = rdr.ReadLine();
                List<string> res = ParseLine(line);
                return res.Count;
            }
        }
        /// <summary>
        /// Function which pulls a value from the row list
        /// </summary>
        /// <param name="i">Position to be returned</param>
        /// <returns>String from that position</returns>
        private string GetColumnByIndex(int i)
        {
            if (rowData != null)
            {
                if (i < rowData.Count)
                    return rowData[i];
            }
            return String.Empty;
        }
        #endregion

        #region IDataReader Interface Implementation
        /// <summary>
        /// Close method just closes the input stream
        /// </summary>
        void IDataReader.Close()
        {
            strReader.Close();
            inputFileClosed = true;
        }
        /// <summary>
        /// Not a nested data file, so depth = 0
        /// </summary>
        int IDataReader.Depth
        {
            get { return 0; }
        }

        /// <summary>
        /// Return the closed value for the input stream
        /// </summary>
        bool IDataReader.IsClosed
        {
            get { return inputFileClosed; }
        }
        /// <summary>
        /// Sets up the pipeline,
        /// Then attempts to read a row from the pipe.
        /// </summary>
        /// <returns></returns>
        bool IDataReader.NextResult()
        {
            SetUpReaderPipeline();
            return TryAndReadARow();
        }
        /// <summary>
        /// The read method.
        /// This uses the private functions to create the pipe
        ///     and then read from that pipe.
        /// </summary>
        /// <returns></returns>
        bool IDataReader.Read()
        {
            SetUpReaderPipeline();
            return TryAndReadARow();
        }
        /// <summary>
        /// No records affected
        /// </summary>
        int IDataReader.RecordsAffected
        {
            get { return -1; }
        }
        /// <summary>
        /// To get the field count, need to take a peek at the first row in the file.
        /// </summary>
        int IDataRecord.FieldCount
        {
            get
            {
                if (rowData != null)
                {
                    return rowData.Count;
                }
                else
                {
                    return QuickPeekFile() - 1;
                }
            }
        }
        /// <summary>
        /// Gets a string value from row position i
        /// </summary>
        /// <param name="i">Row position required</param>
        /// <returns>String value for the position</returns>
        string IDataRecord.GetString(int i)
        {
            return GetColumnByIndex(i);
        }
        /// <summary>
        /// Returns an object from the position in the row list
        /// </summary>
        /// <param name="i">Position required</param>
        /// <returns>object (string representation) of the position</returns>
        object IDataRecord.GetValue(int i)
        {
            // This one fires
            string res = GetColumnByIndex(i);
            //Debug.WriteLine("Col {0} value {1}", i, res);
            return res;
        }
        #endregion

        void IDisposable.Dispose()
        {
            if (!inputFileClosed)
            {
                strReader.Close();
            }
            strReader.Dispose();
        }

        #region Unimplemented IDataRecord Methods. These are not called by SqlBulkCopy (the 4.0 version)

        DataTable IDataReader.GetSchemaTable()
        {
            throw new NotImplementedException();
        }

        bool IDataRecord.GetBoolean(int i)
        {
            throw new NotImplementedException();
        }

        byte IDataRecord.GetByte(int i)
        {
            throw new NotImplementedException();
        }

        long IDataRecord.GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length)
        {
            throw new NotImplementedException();
        }

        char IDataRecord.GetChar(int i)
        {
            throw new NotImplementedException();
        }

        long IDataRecord.GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length)
        {
            throw new NotImplementedException();
        }

        IDataReader IDataRecord.GetData(int i)
        {
            throw new NotImplementedException();
        }

        string IDataRecord.GetDataTypeName(int i)
        {
            throw new NotImplementedException();
        }

        DateTime IDataRecord.GetDateTime(int i)
        {
            throw new NotImplementedException();
        }

        decimal IDataRecord.GetDecimal(int i)
        {
            throw new NotImplementedException();
        }

        double IDataRecord.GetDouble(int i)
        {
            throw new NotImplementedException();
        }

        Type IDataRecord.GetFieldType(int i)
        {
            throw new NotImplementedException();
        }

        float IDataRecord.GetFloat(int i)
        {
            throw new NotImplementedException();
        }

        Guid IDataRecord.GetGuid(int i)
        {
            throw new NotImplementedException();
        }

        short IDataRecord.GetInt16(int i)
        {
            throw new NotImplementedException();
        }

        int IDataRecord.GetInt32(int i)
        {
            throw new NotImplementedException();
        }

        long IDataRecord.GetInt64(int i)
        {
            throw new NotImplementedException();
        }

        string IDataRecord.GetName(int i)
        {
            throw new NotImplementedException();
        }

        int IDataRecord.GetOrdinal(string name)
        {
            throw new NotImplementedException();
        }

        int IDataRecord.GetValues(object[] values)
        {
            throw new NotImplementedException();
        }

        bool IDataRecord.IsDBNull(int i)
        {
            throw new NotImplementedException();
        }

        object IDataRecord.this[string name]
        {
            get { throw new NotImplementedException(); }
        }

        object IDataRecord.this[int i]
        {
            get { return GetColumnByIndex(i); }
        }
        #endregion

    }
}

Some Very Big Thankyous

  1. The .Net and C# development teams. This implementation builds on a “mountain” of good work that these people have done in extending C#, and the .Net Framework, to make facets of parallel programming easier for developers to implement. Without all of that hard work, this implementation would have instances of the Semaphore Class, the Mutex Class, and lock statements all over the place, plus the added need to reason about how these objects are being used. The implementation is free of those constructs because the language support does much of this “under the covers” for you, if you know the right paths through the maze.
  2. A special vote of thanks to Stephen Toub for writing “Patterns of Parallel Programming”. This document has proved invaluable as a resource, which has opened my eyes to the features which support parallel implementations that exist in the .Net 4.0 Framework.
  3. Eric Lippert and his blog Fabulous Adventures In Coding. Whilst I’m not sure I can point to a single blog post specifically which contributed to this implementation, Eric’s erudite discussions of all things C#, and parallel implementations specifically, have shaped (or maybe that should be sharpened) my thinking about parallel implementations. I am certain I have significantly improved my skills at reasoning about parallel implementations through reading Eric’s blog.

Conclusions

The implementation which is presented here is a “bare bones” CSV processor. It is “bare bones” in that the CSV file must be very well behaved. By “well behaved” the input CSV file must have:

  • No null tokens. The implementation probably caters for this in some ways, but does not set, or manage, the IsDBNull method of the IDataReader Interface.
  • No enclosing quotes around a field,
  • No header row. Having a header row is a feature which is often used in data transfer use cases.
  • Only a comma as the delimiter. Some CSV variations use other than the comma as a field delimiter.

These are all features which could be added to the implementation, if they were required. As an illustration, a sketch of one such extension (handling quoted fields) follows.
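The following is a hedged sketch of how the ParseLine method might be extended to handle quoted fields. It is not part of the posted implementation, and it ignores escaped (doubled) quotes within a field.

// Requires System.Text for StringBuilder.
private List<string> ParseLine(string line)
{
    var fields = new List<string>();
    var current = new StringBuilder();
    bool inQuotes = false;
    foreach (char c in line)
    {
        if (c == '"')
            inQuotes = !inQuotes;                 // toggle the quoted state
        else if (c == ',' && !inQuotes)
        {
            fields.Add(current.ToString());       // field boundary outside quotes
            current.Clear();
        }
        else
            current.Append(c);
    }
    fields.Add(current.ToString());               // the last field on the line
    return fields;
}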

I was surprised how well all the “bits” fell into place when I started assembling this implementation. The only big surprise was the Take method throwing an exception. Once I thought through what was happening, even that was not too hard to come to terms with, and the “patch” was very simple.


