Posts Tagged Microsoft SQL Server

Corrigenda and Errata for “Parallel Load CSV Data Using SqlBulkCopy”.


Introduction

This post is a “patch up” for the post I made yesterday. The “Parallel Load CSV Data Using SqlBulkCopy” blog post has one omission: I forgot to post one of the source files. The missing file was the SqlLoadResults class, which is included below.

SqlLoadResults.cs:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ParallelLoader
{
    /// <summary>
    /// Internal class; could be public if you wanted to pass the results out further
    /// </summary>
    internal class SqlLoadResults
    {
        #region FileName being loaded
        private string _fileName;
        public string FileName
        {
            get { return _fileName; }
            set { _fileName = value; }
        }
        #endregion
        #region Start Time
        private DateTime _startTime;
        public DateTime StartTime
        {
            get { return _startTime; }
            set { _startTime = value; }
        }
        #endregion
        #region End Time
        private DateTime _endTime;

        public DateTime EndTime
        {
            get { return _endTime; }
            set { _endTime = value; }
        }
        #endregion
        #region Rows Loaded
        private long _rowsLoaded;
        public long RowsLoaded
        {
            get { return _rowsLoaded; }
            set { _rowsLoaded = value; }
        }
        #endregion
        #region Rows Skipped
        private long _rowsSkipped;
        public long RowsSkipped
        {
            get { return _rowsSkipped; }
            set { _rowsSkipped = value; }
        }
        #endregion
        #region Elapsed time
        private long _elapsed;
        public long Elapsed
        {
            get { return _elapsed; }
            set { _elapsed = value; }
        }
        #endregion
        #region Rows Returned
        private long _rowsReturned;
        public long RowsReturned
        {
            get { return _rowsReturned; }
            set { _rowsReturned = value; }
        }
        #endregion
        /// <summary>
        /// Constructor for the class
        /// </summary>
        /// <param name="fileName">File which was loaded</param>
        /// <param name="start">Date Time the load started</param>
        /// <param name="end">Data Time the load ended
        /// rowsLoaded">Count of the rows loaded
        /// rowsSkipped">Count of rows skipped</param>
        /// <param name="elapsed">Clock ticks for the duration
        /// rowsReturned">Rows returned from the Data Reader</param>
        public SqlLoadResults(
            string fileName, DateTime start, DateTime end,
            long rowsLoaded, long rowsSkipped, long elapsed,
            long rowsReturned)
        {
            this._fileName = fileName;
            this._startTime = start;
            this._endTime = end;
            this._rowsLoaded = rowsLoaded;
            this._rowsSkipped = rowsSkipped;
            this._elapsed = elapsed;
            this._rowsReturned = rowsReturned;
        }
    }
}


Parallel Load CSV Data Using SqlBulkCopy


Preface

I have been trying to decide whether to split the following content over more than one post, or just make it one big post. I have decided to post all of the content in one blog post. As a result, this blog post will contain:

  • The key technologies which support the implementation.
  • The discussions of the implementation. This will include as many illustrations as I can muster, and text.
  • The key features of the implementation.
  • The complete source code for the implementation.

My rationale for posting this as a single article is that one post is easier to read. This is probably more of a technical article than a blog post; it is my belief that blog posts tend to be briefer than this will end up. I hate articles that go over multiple pages: you just settle in to reading on the topic, and then you have to go and click a link to get to the next page. That strategy probably works well for sites which generate income for each ad shown, where more page views equals more revenue.

There are no “hard and fast” rules for blogging, apart from one “golden rule”: give the reader what they want. If you came here to read about how to load data in parallel into SQL Server using SqlBulkCopy, then that is what you are going to get.

Introduction

This article presents an implementation of parallel loading to SQL Server using the SqlBulkCopy class. The implementation makes use of some of the new features in C# 4.0 and the .Net 4.0 Framework.

Implementation Objectives

There were a number of objectives I had in mind when I started to develop this implementation. These objectives included:

  • Refreshing an implementation which I had made a while ago (before C# 4.0) which used System.Threading.Thread as the vehicle to parallelise loading of data to SQL Server.
  • Trying to smooth out the IO waits that a load process incurs, with more effective caching of data. The IO waits which a load process incurs come from two sources: the first is one this implementation attempts to address, the second is one which it cannot. These two sources of IO waits are:
    1) The reading of the input CSV files from disk, be that local or network.
    2) The IO overhead communicating with SQL Server.
  • Using Parallel LINQ to make the data loading process happen in parallel. The load process can be viewed as an arbitrary set of files which need to be loaded into a matching set of tables. Having an arbitrary set of files (file names in this case) lends itself to letting Parallel LINQ partition the iteration over that set of file names into as many Tasks as the machine can support. This may not be what the designers of Parallel LINQ had in mind when they built it, but it has worked for me.
  • Using the Consumer Producer Pattern, and the BlockingCollection Class, to implement the “pipe” between the separate tasks which make up the implementation.
  • Using the Task Class, which is new in the .Net 4.0 Framework. The idea behind using the Task Class, and Parallel LINQ, was to let the .Net Framework manage the creation of some of the tasks (those it creates under the covers to support Parallel LINQ), the scheduling of all of the tasks, and all of the management of the Tasks. Generally, I was trying to produce, and think I have succeeded in producing, a parallel loading implementation without the “customary” thread management code.

Solution Architecture Overview

The architecture of the implementation can be broken into a couple of parts. These parts are:

  • The load pipeline. This process performs the following steps:
    • Reads the file names for the CSV files to be loaded,
    • Passes them into a “pipeline” (BlockingCollection),
    • A parallel LINQ statement reads the “pipeline”, and spawns load processes,
    • The load processes report the load result back to the output “pipeline”,
    • The output pipeline is then read to completion, and the load results reported.
  • The file read and parse into fields pipeline, which is as follows:
    • Create a Task to read the file
      • Read a line from the file,
      • Parse the line into tokens,
      • Stick the tokens in a list,
      • Put the list of tokens into the pipeline,
    • In the parent task,
      • On NextResult,
        • Take a list of tokens from the pipeline.

Why SqlBulkCopy?

To quote MSDN: “Lets you efficiently bulk load a SQL Server table with data from another source.” Simply put, for inserting data into a SQL Server table, the SqlBulkCopy class is the only way to go. It is the fastest way to load data, significantly faster than bound insert statements.
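
For anyone who has not used the class before, the sketch below is a minimal, stand-alone use of SqlBulkCopy. It is only a sketch: the table name and connect string are placeholders (the table is assumed to already exist with matching columns), and it feeds the copy from a small in-memory DataTable, where the implementation in this article feeds it from a custom IDataReader instead.

using System.Data;
using System.Data.SqlClient;

class BulkCopySketch
{
    // Minimal SqlBulkCopy sketch. "SampleTable" and the connect string are placeholders,
    // not part of this article's solution.
    static void Main()
    {
        var table = new DataTable();
        table.Columns.Add("Id", typeof(int));
        table.Columns.Add("Name", typeof(string));
        table.Rows.Add(1, "first");
        table.Rows.Add(2, "second");

        using (var con = new SqlConnection(@"Data Source=.;Initial Catalog=LoadContainer;Integrated Security=True"))
        {
            con.Open();
            using (var bulk = new SqlBulkCopy(con))
            {
                bulk.DestinationTableName = "SampleTable";
                // The bulk load; rows are sent in batches rather than as per-row inserts
                bulk.WriteToServer(table);
            }
        }
    }
}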

The “nuts and bolts” of the solution

There are a number of significant pieces of .Net 4.0 technology which this implementation rests upon. In the following I highlight some of the significant bits, and why I’ve used them in the solution.

I have chosen to take a technology-first approach to the solution. By highlighting the technology elements used in the solution, hopefully I will provide enough information for the reader to understand some of the design decisions which contributed to the shape of the current implementation.

A Delightful Parallel Problem

The process of loading one file into one SQL table is one of those “delightful parallel” problems. A “delightful parallel” problem is one which lends itself to a parallel implementation with a minimum of “fuss”. The process of loading one CSV file into one SQL table is “delightfully parallel” because of the following properties:

  • The resources used along the process chain are used exclusively by that process chain (one unique file and one unique table); hence there are no contentions for exclusive resources along the process chain. There are resource contentions in the process: Input Output operations, network bandwidth, and memory usage and caching being the obvious resource contenders. With the “right” implementation one can minimise the impact of those contentions.
  • There is no communication between loading Table A and loading Table B. Nothing is shared between different loads; they are independent data operations. This independence of operation makes the load processes ideal for “rafting off” onto separate threads.
  • The state of a load process, on completion of that load process, only needs to be communicated to the parent thread. This implementation uses a “pass the results of the load process back to the parent” strategy. If a log file was used, you could do away with passing the results to the parent as well.

The “invisible” technologies

It may seem strange to write about the “invisible” technologies, but there are a number of built-in bits of C# 4.0, and the .Net 4.0 Framework, which this implementation simply uses. The underlying infrastructure of the language and framework never really surfaces as identifiable “bits”, but it is very noteworthy.

Task Propagation through Parallel LINQ

This is a very appealing way to manage parallel processing: simply hand the problem off to Parallel LINQ to manage. The following code shows how I am doing just that. Parallel LINQ will start tasks, reuse tasks, and manage the number of active Tasks as the “pipeline” is consumed. Constructing the CSVReaderLoad object and invoking the LoadData method inside the Select clause leaves the Parallel LINQ infrastructure to start the processes.

// Read the file name pipe, and parallel call the CSV Reader Load class and the Data Load Method
var processes = Task.Factory.StartNew(() =>
{
    try
    {
        // A bit of dirty pool here, calling the constructor then the instance method in one go!
        // This ensures that there are multiple instances of the class being used on multiple threads
        foreach (var loadResult in inputfiles.GetConsumingEnumerable().AsParallel().
            Select(fileName => new CSVReaderLoad().LoadData(fileName, connectString, rowBufferLength)))
        {
            processResults.Add(loadResult);
        }
    }
    finally
    {
        processResults.CompleteAdding();
    }
});

Thread Synchronisation a “Wait until Done” without System.Threading Objects

The following lines achieve quite a lot of work, all “under the covers”.

// This "pulls" all of the results back into on IEnumerable,
// there is some fancy thread synchronisation happening here (all under the covers).
var summary = (from res in processResults.GetConsumingEnumerable()
               select res).ToList();
foreach (var a in summary)
{
    Debug.WriteLine("{0} rows read: {1} rows returned: {2}", a.FileName, a.RowsLoaded, a.RowsReturned);
}

This one LINQ statement causes a “wait” on the blocking collection (processResults) until all of the threads have completed adding to the “pipe”. All of the “nasty” System.Threading objects are buried in the implementation of the BlockingCollection class. The “ToList()” is probably overkill, but when I was developing the code I wanted to make sure that the execution of the LINQ statement happened immediately, rather than being deferred.

Thread Synchronisation – By Default, And With An Exception

There is one point in the implementation where there is synchronisation between two threads which are on opposite ends of one of the BlockingCollection “pipes”. This is because I’ve used the Take method to read the data rows out of the BlockingCollection. There is the possibility of the collection changing to completed between the check of the IsCompleted property and the Take method waiting for a data row. This is a classic thread synchronisation situation. Fortunately, the BlockingCollection throws an exception when this happens, which makes dealing with the thread synchronisation issue relatively simple. The following code fragment shows the situation:

/// <summary>
/// Attempts to read a row from the pipe.
/// This is the "guts" of the read process.
/// The Take call is a blocking wait for a row from the pipe
/// </summary>
/// <returns>bool true: read a row, false: no rows left to read</returns>
private bool TryAndReadARow()
{
    if (!rows.IsCompleted)
    {
        // This is in essence the same as the MSDN example
        // see: http://msdn.microsoft.com/en-us/library/dd997371.aspx
        try
        {
            // This will blocking wait until a row is available
            rowData = rows.Take();
        }
        catch (InvalidOperationException)
        {
            if (rows.IsCompleted)
                return false;
            throw;
        }
        if (rowData != null)
        {
            //Debug.WriteLine("{0} Read A row OK", fileName);
            ++_rowsReturned;
            return true;
        }
        else
        {
            //Debug.WriteLine("{0} Did not read a row", fileName);
            return false;
        }
    }
    else
    {
        return false;
    }
}

Turning a File Read into an IEnumerable

This is not astounding for those who have used the yield return statement, but it is another example of things being done “under the covers” by the C# implementation for you. The compiler is doing a “bunch” of work to manage state, so that the developer does not need to worry about implementing a state machine. For this implementation, using the yield return statement made implementing the “pipe” between the CSV file being read, and returning rows of data to the SqlBulkCopy, easy. The code fragment follows:

/// <summary>
/// Function which converts the lines in the CSV file into an IEnumerable
/// Parses the line into a List of string tokens.
/// </summary>
/// <returns></returns>
private IEnumerable<List<string>> fileReader()
{
    while (true)
    {
        string line = strReader.ReadLine();
        if (line != null)
        {
            ++_rowsRead;
            yield return ParseLine(line);
        }
        else
        {
            break;
        }
    }
}

The Task Class

The implementation does make use of the Task class, and the Task.Factory.StartNew method, quite a bit. The Task class appears at the points where I want to separate the execution of the load process into logical partitions. These logical partitions are the “independent” parts of the load process.
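
Stripped of the loading logic, the basic pattern used throughout is just this (a minimal stand-alone sketch, not code from the loader):

using System;
using System.Threading.Tasks;

class TaskSketch
{
    static void Main()
    {
        // One logical partition of work, handed to the framework to schedule.
        var partition = Task.Factory.StartNew(() =>
            Console.WriteLine("independent part of the load process runs here"));

        // The parent only synchronises when it actually needs that partition finished.
        partition.Wait();
    }
}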

The BlockingCollection Class

The BlockingCollection is one of the key pieces of technology in the implementation. The BlockingCollection is an implementation of the Consumer Producer Pattern, which is leveraged in a couple of key places in this implementation. The elements of the implementation where the Consumer Producer Pattern appears are listed below, followed by a minimal sketch of the pattern:

  1. The listing of the CSV file names, each of which is “piped” to a load process.
  2. The results of each load process, which are “piped” to a reporting loop at the completion of the load processes.
  3. The reading of a line from a CSV file, which is “piped” as a List of strings (one string for each field in the CSV line).
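
The following is a minimal, self-contained sketch of that pattern, in the same shape as the file name “pipe” above; the integers pushed through the collection are just placeholder data.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class PipeSketch
{
    static void Main()
    {
        // The "pipe": bounded, so the producer blocks once ten items are waiting.
        var pipe = new BlockingCollection<int>(10);

        // Producer task: add items, then signal that no more are coming.
        var producer = Task.Factory.StartNew(() =>
        {
            try
            {
                for (int i = 0; i < 100; i++) pipe.Add(i);
            }
            finally
            {
                pipe.CompleteAdding();
            }
        });

        // Consumer: GetConsumingEnumerable blocks until items arrive,
        // and finishes once the producer has called CompleteAdding.
        foreach (var item in pipe.GetConsumingEnumerable())
        {
            Console.WriteLine(item);
        }

        producer.Wait();
    }
}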

The SqlBulkCopy Class

This class is very much a pillar upon which this implementation is built. There are a couple of features of the class which should be noted. These features include:

  • The ColumnMappings Property. This is an instance of the SqlBulkCopyColumnMappingCollection Class, and is needed. The current implementation just sets up a 1 to 1 mapping from the input columns in the CSV file to the SQL Table’s columns. It’s an annoying little requirement; I wish the implementation of SqlBulkCopy did it for itself. But, there have probably been religious wars somewhere about that sort of feature being included. My current implementation of the column mappings is:

    // The bulk copy needs a column mapping.
    // CSV Column position = SQL Column Position
    int cols = ((IDataReader)reader).FieldCount;
    for (int i = 0; i < cols; i++)
        loader.ColumnMappings.Add(i, i);
  • The Rows Copied notify feature. This is handy for debugging, and also for observing the concurrent behaviour of the implementation, particularly post facto, as I send the notifications to the debug output window.

IDataReader Interface

This is the data provider which SqlBulkCopy consumes for data. Creating an implementation of the IDataReader interface which reads the CSV data files is one of the keys to making this solution work. There is not too much extra logic one needs to implement (the code for my implementation follows).

It is a daunting interface when you first look at it. So many methods, you wonder if it’s worth the effort to implement. Luckily two things come to one’s aid:

  1. Visual Studio’s “Implement Interface” feature, which generates all of the methods and properties for you. Granted, it is an implementation which throws NotImplementedException all over the place, but it saves a lot of typing.
  2. SqlBulkCopy only uses a couple of the methods on the interface. This is a “by observation” statement. The statement should be qualified by, “the current implementation in .Net Framework 4.0”.

The Implementation

The following is the source code for the implementation.

Program.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;

namespace ParallelLoader
{
    /// <summary>
    /// Program class
    /// </summary>
    class Program
    {
        /// <summary>
        /// String which is the directory to load from.
        /// Could be an argument, but this was simpler.
        /// </summary>
        const string loadDir =
@"C:\Users\Craig\Documents\Visual Studio 10\Projects\ParallelDataLoad\LoadData\Small";
        /// <summary>
        /// Connect string to the database which will be loaded to.
        /// </summary>
        const string connectString = @"Data Source=CRAIG-PC-1;Initial Catalog=LoadContainer;Integrated Security=True;Pooling=False";
        /// <summary>
        /// The main of the program.
        /// Just creates the parallel load driver class and calls the Load method.
        /// The load should probably return something like a run summary, but that will be a later
        /// enhancement.
        /// </summary>
        /// <param name="args">Not currently used</param>
        static void Main(string[] args)
        {
            int RowBufferLength = 2;
            ParallelDriver driver = new ParallelDriver(loadDir, connectString, RowBufferLength);
            driver.Load();
        }
    }
}

ParallelDriver.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.IO;
using System.Diagnostics;

namespace ParallelLoader
{
    /// <summary>
    /// The driver class which makes some of the parallel processing work.
    /// </summary>
    public class ParallelDriver
    {
        /// <summary>
        /// The directory to be loaded from
        /// </summary>
        private string loadDir;
        /// <summary>
        /// The connect string for database which will be loaded to.
        /// </summary>
        private string connectString;
        private int rowBufferLength;

        /// <summary>
        /// The constructor for the class.
        /// Just caches the input values.
        /// </summary>
        /// <param name="loadDir">The directory which contains the CSV files to load.
        /// The CSV files must be named tablename.CSV.</param>
        /// <param name="connectString">The connect string to the database which contains the tables to be loaded to</param>
        /// <param name="rowBufferLength">The size of the "pipe" which buffers the read rows</param>
        public ParallelDriver(string loadDir, string connectString, int rowBufferLength)
        {
            // TODO: Complete member initialization
            this.loadDir = loadDir;
            this.connectString = connectString;
            this.rowBufferLength = rowBufferLength;
        }
        /// <summary>
        /// This is where the parallel fun starts.
        /// The BlockingCollection, and Task object are the parallel object which start the parallel processing going.
        /// </summary>
        internal void Load()
        {
            // "Pipe" which contains the file names to be processed
            var inputfiles = new BlockingCollection<string>();
            // "Pipe" which contains the results of the loads.
            var processResults = new BlockingCollection<SqlLoadResults>();
            // Start feeding the file name pipe.
            var readfileNames = Task.Factory.StartNew(() =>
            {
                try
                {
                    foreach (var filename in Directory.EnumerateFiles(loadDir, "*.CSV")) inputfiles.Add(filename);
                }
                finally { inputfiles.CompleteAdding(); }
            });
            // Read the file name pipe, and parallel call the CSV Reader Load class and the Data Load Method
            var processes = Task.Factory.StartNew(() =>
            {
                try
                {
                    // A bit of dirty pool here, calling the constructor then the instance method in one go!
                    // This ensures that there are multiple instances of the class being used on multiple threads
                    foreach (var loadResult in inputfiles.GetConsumingEnumerable().AsParallel().
                        Select(fileName => new CSVReaderLoad().LoadData(fileName, connectString, rowBufferLength)))
                    {
                        processResults.Add(loadResult);
                    }
                }
                finally
                {
                    processResults.CompleteAdding();
                }
            });
            // This "pulls" all of the results back into on IEnumerable,
            // there is some fancy thread synchronisation happening here (all under the covers).
            var summary = (from res in processResults.GetConsumingEnumerable()
                           select res).ToList();
            foreach (var a in summary)
            {
                Debug.WriteLine("{0} rows read: {1} rows returned: {2}", a.FileName, a.RowsLoaded, a.RowsReturned);
            }

        }
    }
}

CSVReaderLoad.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data.SqlClient;
using System.Diagnostics;
using System.IO;
using System.Data;

namespace ParallelLoader
{
    /// <summary>
    /// This is the class which does the SQL Bulk Copy operation
    /// </summary>
    internal class CSVReaderLoad
    {
        /// <summary>
        /// File name to be loaded
        /// </summary>
        private string _filename;
        /// <summary>
        /// Constructor, which just saves the file to load
        /// </summary>
        /// <param name="filename">full path to the file to load</param>
        public CSVReaderLoad(string filename)
        {
            // TODO: Complete member initialization
            this._filename = filename;
        }
        /// <summary>
        /// Method which does the SQL Bulk Copy
        /// </summary>
        /// <param name="fileName">CSV Load file name, can need the path as well
        /// connectString">Connect string to the SQL Server database to load to
        /// rowBufferLength">The size of the "pipe" which buffers the read rows.</param>
        /// <returns>The results of the load operation</returns>
        public SqlLoadResults LoadData(string fileName, string connectString, int rowBufferLength)
        {
            _filename = fileName;
            // Pull the table name from the CSV file name
            string tableName = ParseFileName(fileName);
            DateTime start = DateTime.Now;
            // High accuracy time measurement
            Stopwatch sw = new Stopwatch();
            sw.Start();
            // Build the class which reads the CSV data.
            CSVDataReader reader = new CSVDataReader(fileName, rowBufferLength);
            // Connect to the db
            using (SqlConnection con = new SqlConnection(connectString))
            {
                try
                {
                    con.Open();
                    // Put the operation into a transaction, means that can commit at the end.
                    SqlTransaction tx = con.BeginTransaction();
                    using (SqlBulkCopy loader = new SqlBulkCopy(con, SqlBulkCopyOptions.Default, tx))
                    {
                        try
                        {
                            loader.DestinationTableName = tableName;
                            // Progress notification - nice to have when testing
                            loader.SqlRowsCopied += new SqlRowsCopiedEventHandler(loader_SqlRowsCopied);
                            loader.NotifyAfter = 20;
                            // The bulk copy needs a column mapping.
                            // CSV Column position = SQL Column Position
                            int cols = ((IDataReader)reader).FieldCount;
                            for (int i = 0; i < cols; i++)
                                loader.ColumnMappings.Add(i, i);
                            // Does the load
                            loader.WriteToServer(reader);
                            tx.Commit();
                            loader.Close();
                        }
                        catch (Exception ex)
                        {
                            Debug.WriteLine(ex);
                        }
                        finally
                        {
                        }
                    }
                }
                catch (Exception ex)
                {
                    Debug.WriteLine(ex);
                }
                finally
                {
                    con.Close();
                }
            }
            DateTime end = DateTime.Now;
            sw.Stop();
            // Return the load summary object
            return new SqlLoadResults(
                fileName, start, end, reader.RowsRead, 0L,
                sw.ElapsedMilliseconds, reader.RowsReturned);
        }

        /// <summary>
        /// Notification call back for this load
        /// </summary>
        /// <param name="sender">Ignored</param>
        /// <param name="e">Contains Rows Copied</param>
        void loader_SqlRowsCopied(object sender, SqlRowsCopiedEventArgs e)
        {
            Debug.WriteLine("FileName = {0}, Rows {1}", this._filename, e.RowsCopied);
        }
        /// <summary>
        /// Just grabs the file name from the full path
        /// </summary>
        /// <param name="fileName"></param>
        /// <returns></returns>
        private string ParseFileName(string fileName)
        {
            string fName = Path.GetFileNameWithoutExtension(fileName);
            return fName;
        }
        /// <summary>
        /// Default constructor
        /// </summary>
        public CSVReaderLoad()
        {

        }
    }
}

CSVDataReader.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data;
using System.IO;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Diagnostics;

namespace ParallelLoader
{
    /// <summary>
    /// The CSV file reader class.
    /// Most of this is generated by Visual Studio,
    ///     by implementing the IDataReader Interface,
    /// Much of this class is not called from the SQL Bulk Copy Class,
    ///     hence many NotImplementedException being thrown
    /// </summary>
    internal class CSVDataReader:IDataReader
    {
        /// <summary>
        /// CSV Filename which is being loaded
        /// </summary>
        private string fileName;
        /// <summary>
        /// "Pipe" between the reading from the data file, and returning the rows to the caller
        /// </summary>
        private BlockingCollection<List<string>> rows;
        /// <summary>
        /// Stream used to read the input file
        /// </summary>
        private StreamReader strReader;
        /// <summary>
        /// Maintain the state of the input file: open/closed
        /// </summary>
        private bool inputFileClosed;
        /// <summary>
        /// Current data row being returned field by field to the caller
        /// </summary>
        private List<String> rowData;
        /// <summary>
        /// Size of the row buffer, rows held in the "pipe" cache
        /// </summary>
        private int ThrottleValue;
        #region Rows Returned
        /// <summary>
        /// Number of rows returned from the read process
        /// </summary>
        private long _rowsReturned;
        /// <summary>
        /// Rows Returned Property
        /// </summary>
        public long RowsReturned
        {
            get { return _rowsReturned; }
            set { _rowsReturned = value; }
        }
        #endregion
        #region Rows Read From the file
        /// <summary>
        /// Rows read from the file.
        /// </summary>
        private long _rowsRead;
        /// <summary>
        /// Rows read from the file property
        /// </summary>
        public long RowsRead
        {
            get { return _rowsRead; }
            set { _rowsRead = value; }
        }
        #endregion
        /// <summary>
        /// Constructor for the class
        /// </summary>
        /// <param name="fileName">The file name for the CSV data to load
        /// throttelValue">The number of rows/line from the file to hold in the "pipe"</param>
        public CSVDataReader(string fileName, int throttelValue)
        {
            this.fileName = fileName;
            strReader = new StreamReader(fileName);
            inputFileClosed = false;
            ThrottleValue = throttelValue;
            _rowsRead = 0L;
            _rowsReturned = 0L;
        }

        #region Private functions which implement the functionality
        /// <summary>
        /// Function which converts the lines in the CSV file into an IEnumerable
        /// Parses the line into a List of string tokens.
        /// </summary>
        /// <returns></returns>
        private IEnumerable<List<string>> fileReader()
        {
            while (true)
            {
                string line = strReader.ReadLine();
                if (line != null)
                {
                    ++_rowsRead;
                    yield return ParseLine(line);
                }
                else
                {
                    break;
                }
            }
        }

        /// <summary>
        /// A very simple CSV Parse function
        /// </summary>
        /// <param name="line">The line of comma delimited values</param>
        /// <returns>List<String> of string tokens from the input line</returns>
        private List<string> ParseLine(string line)
        {
            return line.Split(',').ToList();
        }

        /// <summary>
        /// Attempts to read a row from the pipe.
        /// This is the "guts" of the read process.
        /// The Take call is a blocking wait for a row from the pipe
        /// </summary>
        /// <returns>bool true: read a row, false: no rows left to read</returns>
        private bool TryAndReadARow()
        {
            if (!rows.IsCompleted)
            {
                // This is in essence the same as the MSDN example
                // see: http://msdn.microsoft.com/en-us/library/dd997371.aspx
                try
                {
                    // This will blocking wait until a row is available
                    rowData = rows.Take();
                }
                catch (InvalidOperationException)
                {
                    if (rows.IsCompleted)
                        return false;
                    throw;
                }
                if (rowData != null)
                {
                    //Debug.WriteLine("{0} Read A row OK", fileName);
                    ++_rowsReturned;
                    return true;
                }
                else
                {
                    //Debug.WriteLine("{0} Did not read a row", fileName);
                    return false;
                }
            }
            else
            {
                return false;
            }
        }
        /// <summary>
        /// This creates, and runs, the Task which reads from the CSV file,
        /// and puts the results into the "pipe".
        /// </summary>
        private void SetUpReaderPipeline()
        {
            if (rows == null)
            {
                rows = new BlockingCollection<List<string>>(ThrottleValue);
                var rowReader = Task.Factory.StartNew(() =>
                {
                    try
                    {
                        foreach (var parsedRow in fileReader())
                        {
                            rows.Add(parsedRow);
                        }
                    }
                    finally
                    {
                        rows.CompleteAdding();
                    }
                }
                );
            }
        }
        /// <summary>
        /// This just opens the file and reads the first line.
        /// Then the line is parsed to get the token count.
        /// </summary>
        /// <returns></returns>
        private int QuickPeekFile()
        {
            using (StreamReader rdr = new StreamReader(fileName))
            {
                string line = rdr.ReadLine();
                List<string> res = ParseLine(line);
                return res.Count;
            }
        }
        /// <summary>
        /// Function which pulls a value from the row list
        /// </summary>
        /// <param name="i">Position to be returned</param>
        /// <returns>String from that position</returns>
        private string GetColumnByIndex(int i)
        {
            if (rowData != null)
            {
                if (i < rowData.Count)
                    return rowData[i];
            }
            return String.Empty;
        }
        #endregion

        #region IDataReader Interface Implementation
        /// <summary>
        /// Close method just closes the input stream
        /// </summary>
        void IDataReader.Close()
        {
            strReader.Close();
            inputFileClosed = true;
        }
        /// <summary>
        /// Not a nested data file, so depth = 0
        /// </summary>
        int IDataReader.Depth
        {
            get { return 0; }
        }

        /// <summary>
        /// Return the closed value for the input stream
        /// </summary>
        bool IDataReader.IsClosed
        {
            get { return inputFileClosed; }
        }
        /// <summary>
        /// Sets up the pipeline,
        /// Then attempts to read a row from the pipe.
        /// </summary>
        /// <returns></returns>
        bool IDataReader.NextResult()
        {
            SetUpReaderPipeline();
            return TryAndReadARow();
        }
        /// <summary>
        /// The read method.
        /// This uses the private functions to create the pipe
        ///     and then read from that pipe.
        /// </summary>
        /// <returns></returns>
        bool IDataReader.Read()
        {
            SetUpReaderPipeline();
            return TryAndReadARow();
        }
        /// <summary>
        /// No records affected
        /// </summary>
        int IDataReader.RecordsAffected
        {
            get { return -1; }
        }
        /// <summary>
        /// To get the field count, we need to take a peek at the first row in the file.
        /// </summary>
        int IDataRecord.FieldCount
        {
            get
            {
                if (rowData != null)
                {
                    return rowData.Count;
                }
                else
                {
                    // The field count is the token count of the first line in the file
                    return QuickPeekFile();
                }
            }
        }
        /// <summary>
        /// Gets a string value from row position i
        /// </summary>
        /// <param name="i">Row position required</param>
        /// <returns>String value for the position</returns>
        string IDataRecord.GetString(int i)
        {
            return GetColumnByIndex(i);
        }
        /// <summary>
        /// Returns an object from the position in the row list
        /// </summary>
        /// <param name="i">Position required</param>
        /// <returns>object (string representation) of the position</returns>
        object IDataRecord.GetValue(int i)
        {
            // This one fires
            string res = GetColumnByIndex(i);
            //Debug.WriteLine("Col {0} value {1}", i, res);
            return res;
        }
        #endregion

        void IDisposable.Dispose()
        {
            if (!inputFileClosed)
            {
                strReader.Close();
            }
            strReader.Dispose();
        }

        #region Unimplemented IDataRecord Methods. These are not called by SqlBulkCopy (the 4.0 version)

        DataTable IDataReader.GetSchemaTable()
        {
            throw new NotImplementedException();
        }

        bool IDataRecord.GetBoolean(int i)
        {
            throw new NotImplementedException();
        }

        byte IDataRecord.GetByte(int i)
        {
            throw new NotImplementedException();
        }

        long IDataRecord.GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length)
        {
            throw new NotImplementedException();
        }

        char IDataRecord.GetChar(int i)
        {
            throw new NotImplementedException();
        }

        long IDataRecord.GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length)
        {
            throw new NotImplementedException();
        }

        IDataReader IDataRecord.GetData(int i)
        {
            throw new NotImplementedException();
        }

        string IDataRecord.GetDataTypeName(int i)
        {
            throw new NotImplementedException();
        }

        DateTime IDataRecord.GetDateTime(int i)
        {
            throw new NotImplementedException();
        }

        decimal IDataRecord.GetDecimal(int i)
        {
            throw new NotImplementedException();
        }

        double IDataRecord.GetDouble(int i)
        {
            throw new NotImplementedException();
        }

        Type IDataRecord.GetFieldType(int i)
        {
            throw new NotImplementedException();
        }

        float IDataRecord.GetFloat(int i)
        {
            throw new NotImplementedException();
        }

        Guid IDataRecord.GetGuid(int i)
        {
            throw new NotImplementedException();
        }

        short IDataRecord.GetInt16(int i)
        {
            throw new NotImplementedException();
        }

        int IDataRecord.GetInt32(int i)
        {
            throw new NotImplementedException();
        }

        long IDataRecord.GetInt64(int i)
        {
            throw new NotImplementedException();
        }

        string IDataRecord.GetName(int i)
        {
            throw new NotImplementedException();
        }

        int IDataRecord.GetOrdinal(string name)
        {
            throw new NotImplementedException();
        }

        int IDataRecord.GetValues(object[] values)
        {
            throw new NotImplementedException();
        }

        bool IDataRecord.IsDBNull(int i)
        {
            throw new NotImplementedException();
        }

        object IDataRecord.this[string name]
        {
            get { throw new NotImplementedException(); }
        }

        object IDataRecord.this[int i]
        {
            get { return GetColumnByIndex(i); }
        }
        #endregion

    }
}

Some Very Big Thank Yous

  1. The .Net and C# development teams. This implementation builds on a “mountain” of good work that these people have done in extending C#, and the .Net Framework, to make facets of parallel programming easier for developers to implement. Without all of that hard work, this implementation would have instances of the Semaphore class, the Mutex class, and lock statements all over the place, plus the added need to reason about how those objects were being used. The implementation is free of those constructs because the language support does much of this “under the covers” for you, if you know the right paths through the maze.
  2. A special vote of thanks to Stephen Toub for writing “Patterns of Parallel Programming”. This document has proved invaluable as a resource, which has opened my eyes to the features which support parallel implementations that exist in the .Net 4.0 Framework.
  3. Eric Lippert and his blog Fabulous Adventures In Coding. Whilst I’m not sure I can point to a single blog post which specifically contributed to this implementation, Eric’s erudite discussions of all things C#, and parallel implementations in particular, have shaped (or maybe that should be sharpened) my thinking about parallel implementations. I am certain I have significantly improved my skill at reasoning about parallel implementations through reading Eric’s blog.

Conclusions

The implementation which is presented here is a “bare bones” CSV processor. It is “bare bones” in that the CSV file must be very well behaved. By “well behaved”, I mean the input CSV file must have:

  • No null tokens. The implementation probably caters for this in some ways, but it does not properly implement the IsDBNull method of the IDataReader Interface.
  • No enclosing quotes around a field,
  • No header row. Having a header row is a feature which is often used in data transfer use cases (see the sketch after this list for one way to handle it).
  • Only a comma as the delimiter. Some CSV variations use characters other than the comma as a field delimiter.

These are all features which could be added to the implementation, if they were required.
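
For example, handling a header row would only need a small change to the fileReader function in CSVDataReader. The following is a hedged sketch of one way to do it; the hasHeaderRow flag is a hypothetical bool field which is not part of the original class.

        /// <summary>
        /// A variation of fileReader which can skip a header row.
        /// hasHeaderRow is a hypothetical bool field, not part of the original class.
        /// </summary>
        private IEnumerable<List<string>> fileReader()
        {
            bool firstLine = true;
            while (true)
            {
                string line = strReader.ReadLine();
                if (line == null)
                    break;
                if (firstLine && hasHeaderRow)
                {
                    // Throw the header line away; it is not data to be loaded.
                    firstLine = false;
                    continue;
                }
                firstLine = false;
                ++_rowsRead;
                yield return ParseLine(line);
            }
        }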

I was surprised how well all the “bits” fell into place when I started assembling this implementation. The only big surprise was the Take method throwing an exception. Once I thought through what was happening, even that was not too hard to come to terms with, and the “patch” was very simple.


Random CSV Test Data Generator


Introduction

I have been building a piece of software which parallel loads CSV data, using the SqlBulkCopy class, into SQL Server tables. If you have ever loaded data into SQL Server, then you would know that SqlBulkCopy is the fastest way to load data. This little project will be the subject of some subsequent blog posts.

To test the parallel loading software I needed some “junk”, or random, data to load into the tables. I just needed a “bunch” of rows to load from a file, which would go into the table “clean”. I did not want to have data errors, at least in the initial testing of the program. I also wanted enough rows so that I could see that the parallel loading was really happening.

After a quick browse of the internet, nothing jumped out as a quick way to generate data. So, I wrote this piece of software, which generates some dummy data for me. I now present that software here for anyone to take and use (modify to your heart’s content).

Why load in Parallel?

If you have ever loaded bulk data into any relational database, you will know that it is a slow process. This slowness comes from a number of causes, but one way to combat it is to load multiple tables at the same time. Data files often come in different sizes, resulting in different tables taking different amounts of time to load, so having something which will load multiple tables at once is very handy. Having a routine which will load numerous tables, on different threads, at the same time means that you can significantly cut the elapsed time for the data loads, when compared to a serial process (believe me it does, I’ve done it before today).

Junk or Random Data

The “junk” data has some very special properties, which include:

  • Only numbers (integers), or strings. I did not want the problems of loading other data types (just yet).
  • The strings should be within the maximum length of the database field. Again, I don’t want data errors (just yet).
  • The strings will be simple. There will be no enclosing ‘”’ characters, and the embedded ‘,’ is out of scope as well. Again, I just wanted the simplest string parsing possible.
  • I did not care if the data made sense. This program makes really random junk text and numbers.
  • There is no referential integrity between the tables being loaded. That means no key in one table needs to go into another table. Again, this removes the possibility of loads failing because of violated integrity constraints.
  • There are minimal integrity constraints on the tables. This is a twofold constraint:
    1. There are no keys or indexes on the tables.
    2. I just wanted data which would test the load process, not be slowed down by the database doing housekeeping on the way through.

The Main Program

This is a pretty simple main. There are a couple of points to note:

  • The creation of the SqlConnection is a bit of a “hangover” from years of writing code which used Oracle. In the Oracle environment connections cost a lot (a relative comparison to SQL Server) to create, and are more flexible (you can have multiple fetches happening on the one connection – the SqlCommand is better isolated).
  • It just passes off to the Generator to do all of the heavy lifting.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data.SqlClient;
using System.Diagnostics;

namespace CSVDataGenerator
{
    /// <summary>
    /// Program class, which does some of the work
    /// </summary>
    class Program
    {
        /// <summary>
        /// Where the output files are written
        /// </summary>
        private const string outputDir =
@"C:\Users\Craig\Documents\Visual Studio 10\Projects\ParallelDataLoad\LoadData\Small";
        /// <summary>
        /// XML Control file which currently sets the number of output rows in the
        /// generated output files
        /// </summary>
        private const string controlFile =
@"C:\Users\Craig\Documents\Visual Studio 10\Projects\ParallelDataLoad\LoadData\Small\ControlFile.xml";
        /// <summary>
        /// Connect string to the db which contains the tables.
        /// </summary>
        private const string connectString =
"Data Source=CRAIG-PC-1;Initial Catalog=LoadContainer;Integrated Security=True;Pooling=False";
        /// <summary>
        /// Main method.
        /// </summary>
        /// <param name="args">Unused, at present</param>
        static void Main(string[] args)
        {
            var dataGenerator = new DataGenerator(outputDir, controlFile);
            using (SqlConnection con = new SqlConnection(connectString))
            {
                try
                {
                    con.Open();
                    dataGenerator.Generate(con);
                }
                // Naughty, but it is a programmer tool.
                // I just want to know if any exceptions are generated
                catch (Exception ex)
                {
                    Debug.WriteLine(ex);
                }
                finally
                {
                    con.Close();
                }
            }
        }
    }
}

Column Type Enumeration

This is very simple: just number and string types. I use an enum, in part, because of the code completion snippet for the switch statement, which puts in the case labels from the enumeration.

/// <summary>
/// Simple enum to keep the types which this routine will create
/// </summary>
internal enum ColumnType
{
    String, Number
}

Column Information Class

This is just a simple container class for the column details which are read from the database. This could be a struct, but I have arguments with them at times. I should spend some time getting to know how to use the struct type more effectively, but that can wait.

/// <summary>
/// Simple data class (could be a struct but I have arguments with them at times)
/// Just holds the key values for a columns being generated.
/// </summary>
internal class ColumnInfo
{
    private string _coumnName;
    private string _columnType;
    private ColumnType _colTypeEnum;
    private int _columnLength;

    public int ColumnLength
    {
        get { return _columnLength; }
        set { _columnLength = value; }
    }

    internal ColumnType ColTypeEnum
    {
        get { return _colTypeEnum; }
        set { _colTypeEnum = value; }
    }

    public string ColumnType
    {
        get { return _columnType; }
        set { _columnType = value; }
    }

    public string CoumnName
    {
        get { return _coumnName; }
        set { _coumnName = value; }
    }
    public ColumnInfo(string colName, string colType, int colLength)
    {
        this._coumnName = colName;
        this._columnType = colType;
        this._columnLength = colLength;
        if (_columnType == "numeric")
            _colTypeEnum = CSVDataGenerator.ColumnType.Number;
        else
            _colTypeEnum = CSVDataGenerator.ColumnType.String;
    }
}

Data Generator Class

This is the class where all of the “heavy lifting” is done. A couple of key points to note:

  • The XML Control File (see sample below) is set up to define one size for all output files. Extending this XML file to define the number of rows generated per table is entirely possible, but it was not a feature which I needed at this point.
  • You do need the tables created in the database prior to using this. Not a big issue in my mind, but it does slow some uses down.
  • Extending the data types supported by this process would not be a big issue.
    internal class DataGenerator
    {
        /// <summary>
        /// Private string for where to write the output files
        /// </summary>
        private string outputDir;
        /// <summary>
        /// The path and filename for the control file which will be used
        /// </summary>
        private string controlFile;
        /// <summary>
        /// Number of rows to be created, read from the control file
        /// </summary>
        private int RowsRequired;

        /// <summary>
        /// Main constructor for the class.
        /// Just stores the two pieces of file information
        /// </summary>
        /// <param name="outputDir">The directory which the output will be written to</param>
        /// <param name="controlFile">The location and name of the control file which will be used</param>
        public DataGenerator(string outputDir, string controlFile)
        {
            // TODO: Complete member initialization
            this.outputDir = outputDir;
            this.controlFile = controlFile;
        }
        /// <summary>
        /// Parameterless Generate overload; not called, so it just throws an exception.
        /// </summary>
        internal void Generate()
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// Main loop
        /// Reads the control file
        /// Finds all tables
        /// Finds all columns for each table
        /// Generates the random data
        /// </summary>
        /// <param name="con">SQLConnect used to find all the tables</param>
        internal void Generate(SqlConnection con)
        {
            ReadControlFile();
            using (SqlCommand cmd = new SqlCommand(
                "select table_name from INFORMATION_SCHEMA.TABLES", con))
            {
                using (SqlDataReader rdr = cmd.ExecuteReader())
                {
                    while (rdr.Read())
                    {
                        string tableName = rdr.GetString(0);
                        var ColumnMap = ReadColumns(con, tableName);
                        BuildTestData(ColumnMap, RowsRequired, tableName, outputDir);
                    }
                }
            }
        }
        /// <summary>
        /// Writes the output file, with the number of rows required
        /// </summary>
        /// <param name="ColumnMap">The columns, in order</param>
        /// <param name="RowsRequired">The number of rows to write. An argument so that different length outputs could be produced</param>
        /// <param name="tableName">Table Name from the Database, used to create the output file name</param>
        /// <param name="outputDir">The directory that the files are written to.</param>
        private void BuildTestData(List<ColumnInfo> ColumnMap, int RowsRequired, string tableName, string outputDir)
        {
            string writeFile = outputDir + "\\" + tableName + ".csv";
            // Skip if already exists. Handy if you don't want to make all of the files.
            if (File.Exists(writeFile))
                return;
            // Random number generator. Initialised with the most random thing I can easily find.
            Random random = new Random((int)DateTime.Now.Ticks);
            // Lots of appending, use a StringBuilder for better performance.
            StringBuilder line = new StringBuilder();
            // Using the write, does the clean up for me.
            using (StreamWriter writer = new StreamWriter(writeFile))
            {
                for (int i = 0; i < RowsRequired; i++)
                {
                    // First column written for the row does not need the ',' first
                    bool bFirst = true;
                    foreach (var col in ColumnMap)
                    {
                        if (bFirst)
                            bFirst = false;
                        else
                            line.Append(",");
                        // Make some data type sensitive random data
                        switch (col.ColTypeEnum)
                        {
                            case ColumnType.String:
                                line.Append(BuildRandomString(col.ColumnLength, random));
                                break;
                            case ColumnType.Number:
                                line.Append(random.Next());
                                break;
                            default:
                                break;
                        }
                    }
                    // write the line to the file
                    writer.WriteLine(line.ToString());
                    // Empty the StringBuilder, saves making a new one each line.
                    line.Clear();
                }
            }
        }
        /// <summary>
        /// Builds a string of "normal" ASCII characters, minus '"' and ','
        /// </summary>
        /// <param name="maxLength">The maximum length of the string column</param>
        /// <param name="rnd">The Random object to use</param>
        /// <returns></returns>
        private string BuildRandomString(int maxLength, Random rnd)
        {
            // Random length string up to the max length
            int length = rnd.Next(0, maxLength);
            // StringBuilder, because lots of appends here.
            StringBuilder rndTxt = new StringBuilder();
            for (int i = 0; i < length; i++)
            {
                char nextChar = Convert.ToChar(rnd.Next(32, 126));
                while(nextChar == ',' || nextChar == '"')
                    nextChar = Convert.ToChar(rnd.Next(32, 126));
                rndTxt.Append(nextChar);
            }
            return rndTxt.ToString();
        }
        /// <summary>
        /// Reads the columns from the table, and builds the internal column list.
        /// </summary>
        /// <param name="con">SqlConnection which is used to provide the connection string to be used
        /// tableName">The table we want the columns from</param>
        /// <returns>List of column information objects for the table</returns>
        private List<ColumnInfo> ReadColumns(SqlConnection con, string tableName)
        {
            List<ColumnInfo> results = null;
            // using SqlConnection, lets the framework do the clean up
            using (SqlConnection con1 = new SqlConnection(con.ConnectionString))
            {
                con1.Open();
                // SQL Statement, order by column ordinal, keeps the columns in order
                using (SqlCommand cmd = new SqlCommand(
@"select column_name, data_type, character_maximum_Length, ORDINAL_POSITION " +
"from INFORMATION_SCHEMA.COLUMNS where table_name = @1 ORDER BY ORDINAL_POSITION", con1))
                {
                    // Bind the table name into the statement
                    SqlParameter param1 = new SqlParameter("1", tableName);
                    cmd.Parameters.Add(param1);
                    using (SqlDataReader rdr = cmd.ExecuteReader())
                    {
                        while (rdr.Read())
                        {
                            if (results == null)
                                results = new List<ColumnInfo>();
                            string columnName = rdr.GetString(0);
                            string columnType = rdr.GetString(1);
                            switch (columnType)
                            {
                                case "numeric":
                                    results.Add(new ColumnInfo(columnName, columnType, 0));
                                    break;
                                case "nvarchar":
                                    int columnLength = rdr.GetInt32(2);
                                    results.Add(new ColumnInfo(columnName, columnType, columnLength));
                                    break;
                                default:
                                    break;
                            }
                        }
                    }
                    return results;
                }
            }
        }
        /// <summary>
        /// Reads the control file and pulls out the number of rows to write
        /// </summary>
        private void ReadControlFile()
        {
            XDocument xDoc = XDocument.Load(controlFile);
            string rows = (from a in xDoc.Elements("Control").Elements("FileSize").Attributes("Rows")
                            select a.Value).First();
            RowsRequired = Convert.ToInt32(rows);
        }
    }

Conclusions

This routine is “dumb as a box of hammers”, and only designed to generate random data to be loaded to Sql Server. But, for a quick set of data to test with it does the job.

I hope you find this useful. Like much of what I post here, some assembly is required. But copy and paste coding, is far simpler (and quicker) than writing something from scratch. Please feel free to extend this shell in any way you like.
