Parallel Load CSV Data Using SqlBulkCopy


Preface

I have been trying to decide whether to split the following content over more than one post, or publish it as one big post. I have decided to post all of the content in one blog post. As a result, this blog post will contain:

  • The key technologies which support the implementation.
  • The discussions of the implementation. This will include as many illustrations as I can muster, and text.
  • The key features of the implementation.
  • The complete source code for the implementation.

My rationale for posting this as a single article is that one post is easier to read. This is probably more of a technical article than a blog post; it is my belief that blog posts tend to be briefer than this will end up. I hate articles that span multiple pages: you settle in to reading on the topic, and then you have to go and click a link to get to the next page. That strategy probably works well for sites that generate income for each ad shown, where more page views equals more revenue.

There are no “hard and fast” rules for blogging, apart from one “golden rule”: “give the reader what they want”. If you came here to read about how to load data in parallel to SQL Server using SqlBulkCopy, then that is what you’re going to receive.

Introduction

This article presents an implementation of parallel loading to SQL Server using the SqlBulkCopy class. The implementation makes use of some of the new features in C# 4.0 and the .Net 4.0 Framework.

Implementation Objectives

There were a number of objectives I had in mind when I started to develop this implementation. These objectives included:

  • Refreshing an implementation I had made a while ago (before C# 4.0), which used System.Threading.Thread as the vehicle to parallelise loading data to SQL Server.
  • Trying to smooth out the IO waits that a load process incurs, with more effective caching of data. These waits come from two sources; the first is one this implementation attempts to address, the second is one which it cannot:
    1) The reading of the input CSV files from disk, be that local or network.
    2) The IO overhead of communicating with SQL Server.
  • Using Parallel LINQ to make the data loading process happen in parallel. The load process can be viewed as an arbitrary set of files which need to be loaded into a matching set of tables. Having an arbitrary set of files (file names in this case) lends itself to letting Parallel LINQ partition the iteration over that set of file names into as many Tasks as the machine can support. This may not be what the designers of Parallel LINQ had in mind when they built it, but it has worked for me.
  • Using the Consumer Producer Pattern, and the BlockingCollection Class, to implement the “pipe” between the separate tasks which make up the implementation.
  • Using the Task Class, which is new in the .Net 4.0 Framework. The idea behind using the Task Class, and Parallel LINQ, was to let the .Net Framework handle some of the task creation (which it does under the covers to support Parallel LINQ), the scheduling of all of the tasks, and all of the management of the Tasks. Generally, I was trying to produce, and think I have succeeded in producing, a parallel loading implementation without “customary” thread management code.

Solution Architecture Overview

The architecture of the implementation can be broken into a couple of parts. These parts are:

  • The load pipeline. This process performs the following steps:
    • Reads the file names for the CSV files to be loaded,
    • Passes them into a “pipeline” (BlockingCollection),
    • A parallel LINQ statement reads the “pipeline”, and spawns load processes,
    • The load processes report the load result back to the output “pipeline”,
    • The output pipeline is then read to completion, and the load results reported.
  • The file read and parse-into-fields pipeline, which is as follows:
    • Create a Task to read the file
      • Read a line from the file,
      • Parse the line into tokens,
      • Stick the tokens in a list,
      • Put the list of tokens into the pipeline.
    • In the parent task,
      • On each Read/NextResult call,
        • Take a list of tokens from the pipeline.

Why SqlBulkCopy?

To quote MSDN: “Lets you efficiently bulk load a SQL Server table with data from another source.” Simply put, for inserting bulk data into a SQL Server table, the SqlBulkCopy class is the only way to go. It is the fastest way to load data, significantly faster than individual bound insert statements.
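
As a point of reference, here is a minimal, self-contained sketch of SqlBulkCopy usage. It is not part of this article’s implementation; the destination table (dbo.Numbers) and the connect string are assumed for illustration:

using System.Data;
using System.Data.SqlClient;

class BulkCopyMinimal
{
    static void Main()
    {
        // Build an in-memory table; SqlBulkCopy can consume a DataTable or an IDataReader.
        var table = new DataTable();
        table.Columns.Add("Id", typeof(int));
        table.Columns.Add("Name", typeof(string));
        for (int i = 0; i < 1000; i++)
            table.Rows.Add(i, "Row " + i);

        using (var con = new SqlConnection(@"Data Source=.;Initial Catalog=LoadContainer;Integrated Security=True"))
        {
            con.Open();
            using (var loader = new SqlBulkCopy(con))
            {
                loader.DestinationTableName = "dbo.Numbers";
                // All 1000 rows stream to the server in one bulk operation,
                // rather than as 1000 individual INSERT statements.
                loader.WriteToServer(table);
            }
        }
    }
}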

The “nuts and bolts” of the solution

There are a number of significant pieces of .Net 4.0 technology which this implementation rests upon. In the following I highlight some of the significant bits, and why I’ve used them in the solution.

I have chosen to take a technology-first approach to the solution. By highlighting the technology elements used, hopefully I will provide enough information for the reader to understand some of the design decisions which contributed to the shape of the current implementation.

A Delightful Parallel Problem

The process of loading one file into one SQL table is one of those “delightful parallel” problems. A “delightful parallel” problem is one which lends itself to a parallel implementation with a minimum of “fuss”. Loading one CSV file into one SQL table qualifies because of the following properties of the process:

  • The resources used along the process chain are used exclusively by that process chain (one unique file and one unique table); hence there are no contentions for exclusive resources along the process chain. There are shared-resource contentions in the process; input/output operations, network bandwidth, and memory usage and caching are the obvious contenders, but with the “right” implementation one can minimise their impact.
  • There is no communication between loading Table A and loading Table B. Nothing is shared between different loads; they are independent data operations. This independence makes the load processes ideal for “rafting off” onto separate threads.
  • The state of a load process, on completion, only needs to be communicated to the parent thread. This implementation uses a pass-the-results-back-to-the-parent strategy. If a log file were used, you could do away with passing the results to the parent as well.

The “invisible” technologies

It may seem strange to write about “invisible” technologies, but there are a number of built-in bits of C# 4.0, and the .Net 4.0 Framework, which this implementation simply uses. The underlying infrastructure of the language and framework does not really surface as identifiable “bits”, but it is very noteworthy.

Task Propagation through Parallel LINQ

This is a very appealing way to manage parallel processing: simply hand the problem off to Parallel LINQ. The following code shows how I am doing just that. Parallel LINQ will start Tasks, reuse Tasks, and manage the number of active Tasks as the “pipeline” is consumed. Constructing the CSVReaderLoad object and invoking the LoadData method inside the query leaves the Parallel LINQ infrastructure to start the processes.

// Read the file name pipe, and parallel call the CSV Reader Load class and the Data Load Method
var processes = Task.Factory.StartNew(() =>
{
    try
    {
        // A bit of dirty pool here, calling the constructor then the instance method in one go!
        // This ensures that there are multiple instances of the class in use on multiple threads
        foreach (var loadResult in inputfiles.GetConsumingEnumerable().AsParallel().
            Select(fileName => new CSVReaderLoad().LoadData(fileName, connectString, rowBufferLength)))
        {
            processResults.Add(loadResult);
        }
    }
    finally
    {
        processResults.CompleteAdding();
    }
});

Thread Synchronisation: a “Wait until Done” without System.Threading Objects

The following lines achieve quite a lot of work, all “under the covers”.

// This "pulls" all of the results back into on IEnumerable,
// there is some fancy thread synchronisation happening here (all under the covers).
var summary = (from res in processResults.GetConsumingEnumerable()
               select res).ToList();
foreach (var a in summary)
{
    Debug.WriteLine("{0} rows read: {1} rows returned: {2}", a.FileName, a.RowsLoaded, a.RowsReturned);
}

This one LINQ statement causes a “wait” on the blocking collection (processResults) until all of the threads have completed adding to the “pipe”. All of the “nasty” System.Threading objects are buried in the implementation of the BlockingCollection Class. The “ToList()” is probably overkill, but when I was developing the code, I wanted to make sure that the execution of the LINQ statement happened immediately, rather than being deferred.
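
For anyone unsure why the deferred execution matters here, this is a tiny illustrative sketch (not from the implementation):

using System;
using System.Collections.Generic;
using System.Linq;

class DeferredDemo
{
    static IEnumerable<int> Source()
    {
        Console.WriteLine("source enumerated");
        yield return 1;
    }

    static void Main()
    {
        var query = from n in Source() select n;    // nothing runs yet
        var list = query.ToList();                  // "source enumerated" prints here
    }
}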

Thread Synchronisation – By Default, And With An Exception

There is one point in the implementation where there is synchronisation between two threads which are on opposite ends of one of the BlockingCollection “pipes”. This is because I’ve used the Take method to read the data rows out of the BlockingCollection. There is the possibility of the collection being marked complete between the check of the IsCompleted property and the Take method receiving a data row. This is a classic thread synchronisation situation. Fortunately, the BlockingCollection throws an exception when this happens, making dealing with the thread synchronisation issue relatively simple. The following code fragment shows the situation:

/// <summary>
/// Attempts to read a row from the pipe.
/// This is the "guts" of the read process.
/// The Take call is a blocking wait for a row from the pipe
/// </summary>
/// <returns>bool true: read a row, false: no rows left to read</returns>
private bool TryAndReadARow()
{
    if (!rows.IsCompleted)
    {
        // This is in essence the same as the MSDN example
        // see: http://msdn.microsoft.com/en-us/library/dd997371.aspx
        try
        {
            // This will blocking wait until a row is available
            rowData = rows.Take();
        }
        catch (InvalidOperationException)
        {
            if (rows.IsCompleted)
                return false;
            throw;
        }
        if (rowData != null)
        {
            //Debug.WriteLine("{0} Read A row OK", fileName);
            ++_rowsReturned;
            return true;
        }
        else
        {
            //Debug.WriteLine("{0} Did not read a row", fileName);
            return false;
        }
    }
    else
    {
        return false;
    }
}

Turning a File Read into an IEnumerable

This is not astounding for those who have used the yield return statement, but it is another example of things being done “under the covers” by the C# implementation for you. The compiler does a “bunch” of work to manage state, so that the developer does not need to worry about implementing a state machine. For this implementation, using the yield return statement made it easy to implement the “pipe” between the CSV file being read and the rows of data being returned to the SqlBulkCopy. The code fragment follows:

/// <summary>
/// Function which converts the lines in the CSV file into an IEnumerable
/// Parses the line into a List of string tokens.
/// </summary>
/// <returns></returns>
private IEnumerable<List<string>> fileReader()
{
    while (true)
    {
        string line = strReader.ReadLine();
        if (line != null)
        {
            ++_rowsRead;
            yield return ParseLine(line);
        }
        else
        {
            break;
        }
    }
}

The Task Class

The implementation makes use of the Task class, and the Task.Factory.StartNew method, quite a bit. The Task class appears at the points where I want to separate the execution of the load process into logical partitions. These logical partitions are the “independent” parts of the load process, as the sketch below illustrates.
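
A minimal sketch of that shape (illustrative only; the real stages appear in the full source below). Each independent partition of the work becomes a Task, and the framework takes care of scheduling:

using System.Threading.Tasks;

class TaskPartitionSketch
{
    static void Main()
    {
        // One Task per "independent" part of a load process.
        var producer = Task.Factory.StartNew(() =>
        {
            // e.g. enumerate the CSV file names and feed a "pipe"
        });
        var consumer = Task.Factory.StartNew(() =>
        {
            // e.g. drain the "pipe" and load each file
        });
        // No Thread objects and no explicit scheduling; just wait for both to finish.
        Task.WaitAll(producer, consumer);
    }
}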

The BlockingCollection Class

The BlockingCollection is one of the key pieces of technology in the implementation. It is an implementation of the Consumer Producer Pattern, which is leveraged in a couple of key places (see the sketch after this list). The elements of the implementation where the Consumer Producer Pattern appears are:

  1. The listing of the CSV file names, which are each “piped” to a load process.
  2. The results of each load process, which are “piped” to a reporting loop at the completion of the load processes.
  3. The lines read from a CSV file, each of which is parsed and “piped” as a List of strings (one string for each field in the CSV line).
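
Here is a minimal Consumer Producer sketch using a BlockingCollection, in the spirit of the “pipes” above. The names and the bound of 2 are illustrative, not taken from the implementation:

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class PipeSketch
{
    static void Main()
    {
        // A bounded "pipe": Add blocks once 2 items are waiting, throttling the producer.
        var pipe = new BlockingCollection<string>(2);

        var producer = Task.Factory.StartNew(() =>
        {
            try
            {
                for (int i = 0; i < 10; i++)
                    pipe.Add("item " + i);
            }
            finally
            {
                pipe.CompleteAdding();  // lets the consumer's loop terminate
            }
        });

        // GetConsumingEnumerable blocks until items arrive, and completes
        // once CompleteAdding has been called and the pipe is drained.
        foreach (var item in pipe.GetConsumingEnumerable())
            Console.WriteLine(item);

        producer.Wait();
    }
}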

The SqlBulkCopy Class

This class is very much a pillar upon which this implementation is built. There are a couple of features of the class which should be noted. These features include:

  • The ColumnMappings Property. This is an instance of the SqlBulkCopyColumnMappingCollection Class, and it is required. The current implementation just sets up a 1-to-1 mapping from the input columns in the CSV file to the SQL table’s columns. It’s an annoying little requirement; I wish the implementation of SqlBulkCopy did it for itself. But there have probably been religious wars somewhere about that sort of feature being included. My current implementation of the column mappings is:

    // The bulk copy needs a column mapping.
    // CSV Column position = SQL Column Position
    int cols = ((IDataReader)reader).FieldCount;
    for (int i = 0; i < cols; i++)      // one mapping per column
        loader.ColumnMappings.Add(i, i);

  • The Rows Copied notify feature. This is handy for debugging, and also for observing the concurrent behaviours of the implementation. Particularly post facto, as I send the notifications to the debug output window.
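
As an aside: if the CSV files had a header row, or the mapping were not 1 to 1, the same collection accepts name-based mappings. A hedged fragment (the column names are invented for illustration):

using System.Data.SqlClient;

static class MappingSketch
{
    // CSV column name -> SQL table column name; both sides are illustrative.
    public static void MapByName(SqlBulkCopy loader)
    {
        loader.ColumnMappings.Add("customer_id", "CustomerId");
        loader.ColumnMappings.Add("customer_name", "Name");
    }
}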

IDataReader Interface

This is the data provider which the SqlBulkCopy consumes for data. Creating an implementation of the IDataReader interface which reads the CSV data files is one of the keys to making this solution work. There is not too much extra logic one needs to implement (the code for my implementation follows below).

It is a daunting interface when you first look at it: so many methods that you wonder if it’s worth the effort to implement. Luckily, two things come to one’s aid:

  1. Visual Studio’s “Implement Interface” feature, which generates all of the methods and properties for you. Granted, it is an implementation which throws NotImplementedException all over the place, but it saves a lot of typing.
  2. SqlBulkCopy only uses a couple of the methods on the interface. This is a “by observation” statement, and should be qualified with “in the current implementation of the .Net Framework 4.0”.

The Implementation

The following is the source code for the implementation.

Program.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;

namespace ParallelLoader
{
    /// <summary>
    /// Program class
    /// </summary>
    class Program
    {
        /// <summary>
        /// String which is the directory to load from.
        /// Could be an argument, but this was simpler.
        /// </summary>
        const string loadDir =
@"C:\Users\Craig\Documents\Visual Studio 10\Projects\ParallelDataLoad\LoadData\Small";
        /// <summary>
        /// Connect string to the database which will be loaded to.
        /// </summary>
        const string connectString = @"Data Source=CRAIG-PC-1;Initial Catalog=LoadContainer;Integrated Security=True;Pooling=False";
        /// <summary>
        /// The main of the program.
        /// Just creates the parallel load driver class and calls the Load method.
        /// The load should probably return something like a run summary, but that will be a later
        /// enhancement.
        /// </summary>
        /// <param name="args">Not currently used</param>
        static void Main(string[] args)
        {
            int RowBufferLength = 2;
            ParallelDriver driver = new ParallelDriver(loadDir, connectString, RowBufferLength);
            driver.Load();
        }
    }
}

ParallelDriver.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.IO;
using System.Diagnostics;

namespace ParallelLoader
{
    /// <summary>
    /// The driver class which makes some of the parallel processing work.
    /// </summary>
    public class ParallelDriver
    {
        /// <summary>
        /// The directory to be loaded from
        /// </summary>
        private string loadDir;
        /// <summary>
        /// The connect string for database which will be loaded to.
        /// </summary>
        private string connectString;
        private int rowBufferLength;

        /// <summary>
        /// The constructor for the class.
        /// Just caches the three important input values.
        /// </summary>
        /// <param name="loadDir">The directory which contains the CSV files to load.
        /// The CSV files must be named tablename.CSV.</param>
        /// <param name="connectString">The connect string to the database which contains the tables to be loaded to</param>
        /// <param name="rowBufferLength">The size of the "pipe" which buffers the read rows</param>
        public ParallelDriver(string loadDir, string connectString, int rowBufferLength)
        {
            // TODO: Complete member initialization
            this.loadDir = loadDir;
            this.connectString = connectString;
            this.rowBufferLength = rowBufferLength;
        }
        /// <summary>
        /// This is where the parallel fun starts.
        /// The BlockingCollection, and Task object are the parallel object which start the parallel processing going.
        /// </summary>
        internal void Load()
        {
            // "Pipe" which contains the file names to be processed
            var inputfiles = new BlockingCollection<string>();
            // "Pipe" which contains the results of the loads.
            var processResults = new BlockingCollection<SqlLoadResults>();
            // Start feeding the file name pipe.
            var readfileNames = Task.Factory.StartNew(() =>
            {
                try
                {
                    foreach (var filename in Directory.EnumerateFiles(loadDir, "*.CSV")) inputfiles.Add(filename);
                }
                finally { inputfiles.CompleteAdding(); }
            });
            // Read the file name pipe, and parallel call the CSV Reader Load class and the Data Load Method
            var processes = Task.Factory.StartNew(() =>
            {
                try
                {
                    // A bit of dirty pool here, calling the constructor then the instance method in one go!
                    // This ensures that there are multiple instances of the class in use on multiple threads
                    foreach (var loadResult in inputfiles.GetConsumingEnumerable().AsParallel().
                        Select(fileName => new CSVReaderLoad().LoadData(fileName, connectString, rowBufferLength)))
                    {
                        processResults.Add(loadResult);
                    }
                }
                finally
                {
                    processResults.CompleteAdding();
                }
            });
            // This "pulls" all of the results back into on IEnumerable,
            // there is some fancy thread synchronisation happening here (all under the covers).
            var summary = (from res in processResults.GetConsumingEnumerable()
                           select res).ToList();
            foreach (var a in summary)
            {
                Debug.WriteLine("{0} rows read: {1} rows returned: {2}", a.FileName, a.RowsLoaded, a.RowsReturned);
            }

        }
    }
}

CSVReaderLoad.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data.SqlClient;
using System.Diagnostics;
using System.IO;
using System.Data;

namespace ParallelLoader
{
    /// <summary>
    /// This the class which  does the SQL Bulk Copy operation
    /// </summary>
    internal class CSVReaderLoad
    {
        /// <summary>
        /// File name to be loaded
        /// </summary>
        private string _filename;
        /// <summary>
        /// Constructor, which just saves the file to load
        /// </summary>
        /// <param name="filename">full path to the file to load</param>
        public CSVReaderLoad(string filename)
        {
            // TODO: Complete member initialization
            this._filename = filename;
        }
        /// <summary>
        /// Method which does the SQL Bulk Copy
        /// </summary>
        /// <param name="fileName">CSV Load file name, can need the path as well
        /// connectString">Connect string to the SQL Server database to load to
        /// rowBufferLength">The size of the "pipe" which buffers the read rows.</param>
        /// <returns>The results of the load operation</returns>
        public SqlLoadResults LoadData(string fileName, string connectString, int rowBufferLength)
        {
            _filename = fileName;
            // Pull the table name from the CSV file name
            string tableName = ParseFileName(fileName);
            DateTime start = DateTime.Now;
            // High accuracy time measurement
            Stopwatch sw = new Stopwatch();
            sw.Start();
            // Build the class which reads the CSV data.
            CSVDataReader reader = new CSVDataReader(fileName, rowBufferLength);
            // Connect to the db
            using (SqlConnection con = new SqlConnection(connectString))
            {
                try
                {
                    con.Open();
                    // Put the operation into a transaction, means that can commit at the end.
                    SqlTransaction tx = con.BeginTransaction();
                    using (SqlBulkCopy loader = new SqlBulkCopy(con, SqlBulkCopyOptions.Default, tx))
                    {
                        try
                        {
                            loader.DestinationTableName = tableName;
                            // Progress notification - nice to have when testing
                            loader.SqlRowsCopied += new SqlRowsCopiedEventHandler(loader_SqlRowsCopied);
                            loader.NotifyAfter = 20;
                            // The bulk copy needs a column mapping.
                            // CSV Column position = SQL Column Position
                            int cols = ((IDataReader)reader).FieldCount;
                            for (int i = 0; i < cols; i++)      // one mapping per column
                                loader.ColumnMappings.Add(i, i);
                            // Does the load
                            loader.WriteToServer(reader);
                            tx.Commit();
                            loader.Close();
                        }
                        catch (Exception ex)
                        {
                            Debug.WriteLine(ex);
                        }
                        finally
                        {
                        }
                    }
                }
                catch (Exception ex)
                {
                    Debug.WriteLine(ex);
                }
                finally
                {
                    con.Close();
                }
            }
            DateTime end = DateTime.Now;
            sw.Stop();
            // Dispose the reader so the CSV file handle is released promptly
            ((IDisposable)reader).Dispose();
            // Return the load summary object
            return new SqlLoadResults(
                fileName, start, end, reader.RowsRead, 0L,
                sw.ElapsedMilliseconds, reader.RowsReturned);
        }

        /// <summary>
        /// Notification call back for this load
        /// </summary>
        /// <param name="sender">Ignored</param>
        /// <param name="e">Contains Rows Copied</param>
        void loader_SqlRowsCopied(object sender, SqlRowsCopiedEventArgs e)
        {
            Debug.WriteLine("FileName = {0}, Rows {1}", this._filename, e.RowsCopied);
        }
        /// <summary>
        /// Just grabs the file name from the full path
        /// </summary>
        /// <param name="fileName"></param>
        /// <returns></returns>
        private string ParseFileName(string fileName)
        {
            string fName = Path.GetFileNameWithoutExtension(fileName);
            return fName;
        }
        /// <summary>
        /// Default constructor
        /// </summary>
        public CSVReaderLoad()
        {

        }
    }
}

CSVDataReader.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data;
using System.IO;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Diagnostics;

namespace ParallelLoader
{
    /// <summary>
    /// The CSV file reader class.
    /// Most of this is generated by Visual Studio,
    ///     by implementing the IDataReader Interface,
    /// Much of this class is not called from the SQL Bulk Copy Class,
    ///     hence many NotImplementedException being thrown
    /// </summary>
    internal class CSVDataReader:IDataReader
    {
        /// <summary>
        /// CSV Filename which is being loaded
        /// </summary>
        private string fileName;
        /// <summary>
        /// "Pipe" between the reading from the data file, and returning the rows to the caller
        /// </summary>
        private BlockingCollection<List<string>> rows;
        /// <summary>
        /// Stream used to read the input file
        /// </summary>
        private StreamReader strReader;
        /// <summary>
        /// Maintain the state of the input file: open/closed
        /// </summary>
        private bool inputFileClosed;
        /// <summary>
        /// Current data row being return field by field to the caller
        /// </summary>
        private List<String> rowData;
        /// <summary>
        /// Size of the row buffer, rows held in the "pipe" cache
        /// </summary>
        private int ThrottleValue;
        #region Rows Returned
        /// <summary>
        /// Number of rows returned from the read process
        /// </summary>
        private long _rowsReturned;
        /// <summary>
        /// Rows Returned Property
        /// </summary>
        public long RowsReturned
        {
            get { return _rowsReturned; }
            set { _rowsReturned = value; }
        }
        #endregion
        #region Rows Read From the file
        /// <summary>
        /// Rows read from the file.
        /// </summary>
        private long _rowsRead;
        /// <summary>
        /// Rows read from the file property
        /// </summary>
        public long RowsRead
        {
            get { return _rowsRead; }
            set { _rowsRead = value; }
        }
        #endregion
        /// <summary>
        /// Constructor for the class
        /// </summary>
        /// <param name="fileName">The file name for the CSV data to load
        /// throttelValue">The number of rows/line from the file to hold in the "pipe"</param>
        public CSVDataReader(string fileName, int throttelValue)
        {
            this.fileName = fileName;
            strReader = new StreamReader(fileName);
            inputFileClosed = false;
            ThrottleValue = throttleValue;
            _rowsRead = 0L;
            _rowsReturned = 0L;
        }

        #region Private functions which implement the functionality
        /// <summary>
        /// Function which converts the lines in the CSV file into an IEnumerable
        /// Parses the line into a List of string tokens.
        /// </summary>
        /// <returns></returns>
        private IEnumerable<List<string>> fileReader()
        {
            while (true)
            {
                string line = strReader.ReadLine();
                if (line != null)
                {
                    ++_rowsRead;
                    yield return ParseLine(line);
                }
                else
                {
                    break;
                }
            }
        }

        /// <summary>
        /// A very simple CSV Parse function
        /// </summary>
        /// <param name="line">The line of comma delimited values</param>
        /// <returns>List<String> of string tokens from the input line</returns>
        private List<string> ParseLine(string line)
        {
            return line.Split(',').ToList();
        }

        /// <summary>
        /// Attempts to read a row from the pipe.
        /// This is the "guts" of the read process.
        /// The Take call is a blocking wait for a row from the pipe
        /// </summary>
        /// <returns>bool true: read a row, false: no rows left to read</returns>
        private bool TryAndReadARow()
        {
            if (!rows.IsCompleted)
            {
                // This is in essence the same as the MSDN example
                // see: http://msdn.microsoft.com/en-us/library/dd997371.aspx
                try
                {
                    // This will blocking wait until a row is available
                    rowData = rows.Take();
                }
                catch (InvalidOperationException)
                {
                    if (rows.IsCompleted)
                        return false;
                    throw;
                }
                if (rowData != null)
                {
                    //Debug.WriteLine("{0} Read A row OK", fileName);
                    ++_rowsReturned;
                    return true;
                }
                else
                {
                    //Debug.WriteLine("{0} Did not read a row", fileName);
                    return false;
                }
            }
            else
            {
                return false;
            }
        }
        /// <summary>
        /// This creates, and runs, the Task which reads from the CSV file,
        /// and puts the results into the "pipe".
        /// </summary>
        private void SetUpReaderPipeline()
        {
            if (rows == null)
            {
                rows = new BlockingCollection<List<string>>(ThrottleValue);
                var rowReader = Task.Factory.StartNew(() =>
                {
                    try
                    {
                        foreach (var parsedRow in fileReader())
                        {
                            rows.Add(parsedRow);
                        }
                    }
                    finally
                    {
                        rows.CompleteAdding();
                    }
                }
                );
            }
        }
        /// <summary>
        /// This just opens the file and reads the first line.
        /// Then the line is parsed to get the token count.
        /// </summary>
        /// <returns></returns>
        private int QuickPeekFile()
        {
            using (StreamReader rdr = new StreamReader(fileName))
            {
                string line = rdr.ReadLine();
                List<string> res = ParseLine(line);
                return res.Count;
            }
        }
        /// <summary>
        /// Function which pulls a value from the row list
        /// </summary>
        /// <param name="i">Position to be returned</param>
        /// <returns>String from that position</returns>
        private string GetColumnByIndex(int i)
        {
            if (rowData != null)
            {
                if (i < rowData.Count)
                    return rowData[i];
            }
            return String.Empty;
        }
        #endregion

        #region IDataReader Interface Implementation
        /// <summary>
        /// Close method just closes the input stream
        /// </summary>
        void IDataReader.Close()
        {
            strReader.Close();
            inputFileClosed = true;
        }
        /// <summary>
        /// Not a nested data file, so depth = 0
        /// </summary>
        int IDataReader.Depth
        {
            get { return 0; }
        }

        /// <summary>
        /// Return the closed value for the input stream
        /// </summary>
        bool IDataReader.IsClosed
        {
            get { return inputFileClosed; }
        }
        /// <summary>
        /// Sets up the pipeline,
        /// Then attempts to read a row from the pipe.
        /// </summary>
        /// <returns></returns>
        bool IDataReader.NextResult()
        {
            SetUpReaderPipeline();
            return TryAndReadARow();
        }
        /// <summary>
        /// The read method.
        /// This uses the private functions to create the pipe
        ///     and then read from that pipe.
        /// </summary>
        /// <returns></returns>
        bool IDataReader.Read()
        {
            SetUpReaderPipeline();
            return TryAndReadARow();
        }
        /// <summary>
        /// No records affected
        /// </summary>
        int IDataReader.RecordsAffected
        {
            get { return -1; }
        }
        /// <summary>
        /// To get the field count, need to take a peek at the first row in the file.
        /// </summary>
        int IDataRecord.FieldCount
        {
            get
            {
                if (rowData != null)
                {
                    return rowData.Count;
                }
                else
                {
                    // Peek at the first line to count the fields before any row has been read.
                    // (Returning the full count pairs with the "i < cols" mapping loop above.)
                    return QuickPeekFile();
                }
            }
        }
        /// <summary>
        /// Gets a string value from row position i
        /// </summary>
        /// <param name="i">Row position required</param>
        /// <returns>String value for the position</returns>
        string IDataRecord.GetString(int i)
        {
            return GetColumnByIndex(i);
        }
        /// <summary>
        /// Returns an object from the position in the row list
        /// </summary>
        /// <param name="i">Position required</param>
        /// <returns>object (string representation) of the position</returns>
        object IDataRecord.GetValue(int i)
        {
            // This one fires
            string res = GetColumnByIndex(i);
            //Debug.WriteLine("Col {0} value {1}", i, res);
            return res;
        }
        #endregion

        void IDisposable.Dispose()
        {
            if (!inputFileClosed)
            {
                strReader.Close();
            }
            strReader.Dispose();
        }

        #region Unimplemented IDataRecord Methods. These are not called by SqlBulkCopy (the 4.0 version)

        DataTable IDataReader.GetSchemaTable()
        {
            throw new NotImplementedException();
        }

        bool IDataRecord.GetBoolean(int i)
        {
            throw new NotImplementedException();
        }

        byte IDataRecord.GetByte(int i)
        {
            throw new NotImplementedException();
        }

        long IDataRecord.GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length)
        {
            throw new NotImplementedException();
        }

        char IDataRecord.GetChar(int i)
        {
            throw new NotImplementedException();
        }

        long IDataRecord.GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length)
        {
            throw new NotImplementedException();
        }

        IDataReader IDataRecord.GetData(int i)
        {
            throw new NotImplementedException();
        }

        string IDataRecord.GetDataTypeName(int i)
        {
            throw new NotImplementedException();
        }

        DateTime IDataRecord.GetDateTime(int i)
        {
            throw new NotImplementedException();
        }

        decimal IDataRecord.GetDecimal(int i)
        {
            throw new NotImplementedException();
        }

        double IDataRecord.GetDouble(int i)
        {
            throw new NotImplementedException();
        }

        Type IDataRecord.GetFieldType(int i)
        {
            throw new NotImplementedException();
        }

        float IDataRecord.GetFloat(int i)
        {
            throw new NotImplementedException();
        }

        Guid IDataRecord.GetGuid(int i)
        {
            throw new NotImplementedException();
        }

        short IDataRecord.GetInt16(int i)
        {
            throw new NotImplementedException();
        }

        int IDataRecord.GetInt32(int i)
        {
            throw new NotImplementedException();
        }

        long IDataRecord.GetInt64(int i)
        {
            throw new NotImplementedException();
        }

        string IDataRecord.GetName(int i)
        {
            throw new NotImplementedException();
        }

        int IDataRecord.GetOrdinal(string name)
        {
            throw new NotImplementedException();
        }

        int IDataRecord.GetValues(object[] values)
        {
            throw new NotImplementedException();
        }

        bool IDataRecord.IsDBNull(int i)
        {
            throw new NotImplementedException();
        }

        object IDataRecord.this[string name]
        {
            get { throw new NotImplementedException(); }
        }

        object IDataRecord.this[int i]
        {
            get { return GetColumnByIndex(i); }
        }
        #endregion

    }
}

Some Very Big Thank Yous

  1. The .Net and C# development teams. This implementation builds on a “mountain” of good work that these people have done in extending C#, and the .Net Framework, to make facets of parallel programming easier for developers to implement. Without all of that hard work, this implementation would have instances of the Semaphore Class, the Mutex Class, and lock statements all over the place, plus the added need to reason about how these objects are being used. The implementation is free of those constructs because the language support does much of this “under the covers” for you, if you know the right paths through the maze.
  2. A special vote of thanks to Stephen Toub for writing “Patterns of Parallel Programming”. This document has proved invaluable as a resource, which has opened my eyes to the features which support parallel implementations that exist in the .Net 4.0 Framework.
  3. Eric Lippert and his blog Fabulous Adventures In Coding. Whilst I’m not sure I can point to a single blog post which specifically contributed to this implementation, Eric’s erudite discussions of all things C#, and parallel implementations specifically, have shaped (or maybe that should be sharpened) my thinking about parallel implementations. I am certain I have significantly improved my skills at reasoning about parallel implementations through reading Eric’s blog.

Conclusions

The implementation which is presented here is a “bare bones” CSV processor. It is “bare bones” in that the CSV file must be very well behaved. By “well behaved”, I mean the input CSV file must have:

  • No null tokens. The implementation probably caters for this in some ways, but does not set, or manage, the IsDBNull method of the IDataReader Interface.
  • No enclosing quotes around a field,
  • No header row. Having a header row is a feature which is often used in data transfer use cases.
  • Only a comma as the delimiter. Some CSV variations use other than the comma as a field delimiter.

These are all features which could be added to the implementation, if they were required; a sketch of a quote-aware parser follows.
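
For example, the one-line ParseLine could be replaced with something quote-aware. The following is only a sketch of one such extension; it is not part of the posted implementation, and it still assumes no embedded newlines:

using System.Collections.Generic;
using System.Text;

static class CsvParseSketch
{
    // Splits a CSV line on commas, honouring enclosing double quotes;
    // "" inside a quoted field is treated as an escaped quote.
    public static List<string> ParseLine(string line)
    {
        var fields = new List<string>();
        var current = new StringBuilder();
        bool inQuotes = false;
        for (int i = 0; i < line.Length; i++)
        {
            char c = line[i];
            if (inQuotes)
            {
                if (c == '"' && i + 1 < line.Length && line[i + 1] == '"')
                {
                    current.Append('"');        // escaped quote inside a quoted field
                    i++;
                }
                else if (c == '"')
                {
                    inQuotes = false;           // closing quote
                }
                else
                {
                    current.Append(c);
                }
            }
            else if (c == '"')
            {
                inQuotes = true;                // opening quote
            }
            else if (c == ',')
            {
                fields.Add(current.ToString()); // field boundary
                current.Length = 0;
            }
            else
            {
                current.Append(c);
            }
        }
        fields.Add(current.ToString());         // the last field has no trailing comma
        return fields;
    }
}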

I was surprised how well all the “bits” fell into place when I started assembling this implementation. The only big surprise was the Take method throwing an exception. Once I thought through what was happening, even that was not too hard to come to terms with, and the “patch” was very simple.


  1. #1 by Jonathan Channon (@jchannon) on November 10, 2011 - 9:45 pm

    Challenge. Make this work for SQL CE4!

    • #2 by aussiecraig on November 12, 2011 - 6:04 am

      Jonathan,

      I must admit I have never had a reason to look at SQL CE 4.
      Consequently, I have no idea if the implementation will, or will not, work against that variant of SQL.
      If time, and interest, allows I may have a look at getting it up and running against that version of SQL.

      Craig

  2. #3 by jerdobi on September 25, 2011 - 11:25 am

    Great code and good implementation as I adapted it running under a web service! One suggestion is to use a “using…” on the CSVDataReader in CSVReaderLoad.cs as the CSV files were left open exclusively and couldn’t delete them under IIS. Also, would like to see the implementation handle a CSV file with a header row where the mapping between the CSV file and the table doesn’t have to be 1:1. Otherwise and in spite of this code saved me a lot of time in my implementation of a windows service that automatically backs up a database to the cloud using the CSV files as the transfer medium.

    • #4 by aussiecraig on October 18, 2011 - 3:34 pm

      Thanks for the comments.

      On “using”, that is one of the (partial) blind spots in my C#. I learnt C# and Java at about the same time, and swapped between the two for a while. As a result it took me some time to catch on to the usefulness (imperative, even) of the “using” statement, and I sometimes miss the cases where I should be using one. I wish the compiler had a warning for IDisposable objects which are not in using statements.
      On the issue of other than 1:1 CSV and table relationship, that makes my “head hurt”. The whole project would be far more complex. There are many issues which opening the process up to loading a network of SQL tables would expose.

  3. #5 by beer guy on June 8, 2011 - 3:02 pm

    Great article.

    However, am I missing something? It doesn’t seem to handle anything other than string/varchar csv to SQL table fields.

    • #6 by aussiecraig on June 8, 2011 - 3:17 pm

      That’s a “current limitation” of the implementation.
      If you need them, then the framework is there to add the functionality.
      Alternatively, load everything as text to a temp table and do the data conversions in SQL, inserting the converted data into the end table.

  4. #7 by kourtnie on February 22, 2011 - 6:08 am

    as if!
