Creating a Custom Windows Azure Table Storage Importer

As mentioned in my previous post on Windows Azure Table Storage, I put together a basic importer for moving records stored in CSV files into Windows Azure Table Storage.

I have uploaded the code to GitHub here.  Feel free to borrow and repurpose the code as a learning tool.  This is not production-class code, but it is sufficient for an initial proof of concept.

There are a few publicly available sources of information that I was able to draw on to build my tool.

This article walks through some key ideas on how the utility works and how I architected and developed the solution.

Basic Requirements

The requirements for my import utility are as follows:

  • Read a CSV file as a list of records
  • Identify a partition key and row identifier key for each record
  • Insert each entity into Windows Azure Table Storage

Setting Up the Environment

I used Visual Studio 2012 and the latest Windows Azure SDK, with Visual Studio Online to manage the source code.  For testing, I initially used the Windows Azure Storage Emulator locally and then tested in the cloud against a live Windows Azure storage account.

Basics of the Architecture

I created two projects: 1) a console application called “WindowsAzureTableStorageImporter” used to read in CSV files and manage the overall process and 2) a class library called “WindowsAzureTableStorage” that contains the key classes for managing transactions with Windows Azure Table Storage.

Console App: Configuration Parameters

In designing the console application, I added a few configuration parameters that control the import process:

  • StorageConnectionString: connection string for the Windows Azure storage account
  • FileName: path to the CSV file
  • PartitionKeyField: name of one of the fields in the CSV file to use as the partition key
  • RowKeyField: name of one of the fields to use as the row key – if this is not supplied, the importer uses Guid.NewGuid() to generate a new ID
  • AzureTableName: name of the table to create and add the entities to
  • MaximumRowsToImport: maximum number of rows to load into Azure – if this is left out or is less than 0, the program loads all available rows
  • StartingOffset: row in the CSV file at which to start adding records
  • MaximumTasks: maximum number of async requests to create before waiting for them to finish

The console application is responsible for reading in these configuration parameters and passing them to the WindowsAzureTableStorageService class as appropriate.
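As a rough sketch, reading those settings from App.config with ConfigurationManager might look something like the following.  The key names match the parameters above; the defaults chosen here (import everything, start at row 0, 200 tasks) are illustrative rather than exactly what the importer does.

// Requires a reference to System.Configuration.
// Read the import settings from the appSettings section of App.config.
string storageConnectionString = ConfigurationManager.AppSettings["StorageConnectionString"];
string fileName = ConfigurationManager.AppSettings["FileName"];
string partitionKeyField = ConfigurationManager.AppSettings["PartitionKeyField"];
string rowKeyField = ConfigurationManager.AppSettings["RowKeyField"];   // may be missing
string tableName = ConfigurationManager.AppSettings["AzureTableName"];

int maximumRowsToImport;
if (!int.TryParse(ConfigurationManager.AppSettings["MaximumRowsToImport"], out maximumRowsToImport))
    maximumRowsToImport = -1;   // import everything when the setting is missing

int startingOffset;
if (!int.TryParse(ConfigurationManager.AppSettings["StartingOffset"], out startingOffset))
    startingOffset = 0;

int maximumTasks;
if (!int.TryParse(ConfigurationManager.AppSettings["MaximumTasks"], out maximumTasks))
    maximumTasks = 200;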

Console App: Reading in CSV File

I borrowed a publicly available package, CsvHelper, for reading in CSV files.  The package works really well: it's fast and configurable.  In my version, I assume a very basic CSV file with headers in the first row and a comma as the field delimiter.  CsvHelper supports configuration options to change these assumptions and to map incoming fields to in-memory entities.

Reading in the file is pretty simple – we locate the CSV file and read through the file row by row.  For each row, we identify the partition key, the row key and any additional fields and add them to a DictionaryTableEntity.

// open the file and start reading with CsvHelper
StreamReader textReader = File.OpenText(fileName);
CsvReader reader = new CsvReader(textReader);

while (reader.Read())
{
    // Row - 1 accounts for the header row; skip rows before the starting offset
    if (reader.Row - 1 < startingOffset)
    {
        continue;
    }

    // populate an entity for each row
    DictionaryTableEntity entity = new DictionaryTableEntity();

    // if no row key field was configured, generate a unique row key
    if (rowKeyField == null)
    {
        entity.RowKey = Guid.NewGuid().ToString();
    }

    foreach (string field in reader.FieldHeaders)
    {
        if (field == rowKeyField)
        {
            entity.RowKey = reader[field];
        }
        else if (field == partitionKeyField)
        {
            entity.PartitionKey = WindowsAzureTableStorageService.createValidPartitionKey(reader[field]);
        }
        else
        {
            // every other column becomes a name/value property on the entity
            string value = reader[field];
            entity.Add(field, value);
        }
    }

    if (entity.PartitionKey == null)
        throw new Exception("Bad data record. Partition key not found.");

    entities.Add(entity);

    // stop once we have read the configured maximum number of rows
    if (maximumRowsToImport > 0 && entities.Count == maximumRowsToImport)
        break;
}
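The code above calls WindowsAzureTableStorageService.createValidPartitionKey to sanitize the incoming value, since Table Storage does not allow the characters '/', '\', '#', '?' or control characters in partition and row keys.  The real helper is in the GitHub repository; a minimal sketch of that kind of sanitization might look like this:

// Illustrative sketch only – strips the characters Table Storage disallows in keys.
// Requires System.Text for StringBuilder.
public static string createValidPartitionKey(string input)
{
    if (string.IsNullOrEmpty(input))
        return input;

    var builder = new StringBuilder(input.Length);
    foreach (char c in input)
    {
        // Disallowed in PartitionKey/RowKey: '/', '\', '#', '?' and control characters.
        if (c == '/' || c == '\\' || c == '#' || c == '?' || char.IsControl(c))
            continue;
        builder.Append(c);
    }
    return builder.ToString();
}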

I added the concepts of a startingOffset and a maximumRowsToImport to limit the rows stored.  This is helpful for testing: we can take a file that has 500,000 rows but only test with rows 100-200.  As we will see later, it also lets us run multiple instances of the importer, each importing records from a different range of the file.

DictionaryTableEntity

In order to load our rows, we need an object to store each row.  Windows Azure Table Storage provides an interface called ITableEntity that can be used to create a custom class that represents the entity to be added to the table. 

In this case, the key requirement is to support a dynamic list of properties, since we don't know in advance what fields are in the CSV file.  After reading through a few suggestions, I created a DictionaryTableEntity class that implements both the ITableEntity and IDictionary interfaces and behaves like a dictionary, letting you add any field as a name/value pair like so:

entity.Add(field, value);

In this case, the field is determined based on the CSV file layout and the value is the corresponding row value for each record.
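To give a sense of the shape of the class, here is a stripped-down sketch of how a DictionaryTableEntity can satisfy ITableEntity while keeping its properties in a dictionary.  The real class in the repository also implements IDictionary and has more members than shown here.

public class DictionaryTableEntity : ITableEntity
{
    // Backing store for the dynamic set of properties read from the CSV file.
    private IDictionary<string, EntityProperty> properties =
        new Dictionary<string, EntityProperty>();

    public string PartitionKey { get; set; }
    public string RowKey { get; set; }
    public DateTimeOffset Timestamp { get; set; }
    public string ETag { get; set; }

    // Add a CSV field as a simple string property.
    public void Add(string key, string value)
    {
        properties.Add(key, new EntityProperty(value));
    }

    // ITableEntity: hydrate the entity from Table Storage properties.
    public void ReadEntity(IDictionary<string, EntityProperty> props,
                           OperationContext operationContext)
    {
        properties = props;
    }

    // ITableEntity: hand the dynamic properties back to the storage client.
    public IDictionary<string, EntityProperty> WriteEntity(OperationContext operationContext)
    {
        return properties;
    }
}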

We store these entities in a simple list and pass that list to the WindowsAzureTableStorageService to be processed as a series of batches.

WindowsAzureTableStorageService: Creating a Table

Creating a table is easy – you connect to the storage account and create the table.

// Retrieve the storage account from the connection string.
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(storageConnectionString);

// Create the table client.
CloudTableClient tableClient = storageAccount.CreateCloudTableClient();

// Create the table if it doesn't exist.
CloudTable table = tableClient.GetTableReference(TableName);
table.CreateIfNotExists();

In this case we create the table if it doesn’t already exist.

Important note on deleting tables: if you delete a table, either through Azure Storage Explorer or through code, the delete is performed asynchronously.  According to Microsoft, it can take up to 40 seconds for the actual delete to happen, and if you try to create the table while it is still being deleted, you will receive a StorageException with a 409 (Conflict) error.
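One hedge against this, sketched below rather than taken from the importer itself, is to catch the 409 and retry table creation after a short delay:

// Retry CreateIfNotExists when the table is still being deleted (409 Conflict).
// Illustrative sketch only; retry count and delay are arbitrary.
CloudTable table = tableClient.GetTableReference(TableName);
bool created = false;
for (int attempt = 0; attempt < 5 && !created; attempt++)
{
    try
    {
        table.CreateIfNotExists();
        created = true;
    }
    catch (StorageException ex)
    {
        if (ex.RequestInformation != null && ex.RequestInformation.HttpStatusCode == 409)
        {
            // The previous delete has not completed yet; wait and try again.
            System.Threading.Thread.Sleep(TimeSpan.FromSeconds(10));
        }
        else
        {
            throw;
        }
    }
}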

WindowsAzureTableStorageService: Adding Batches

The Table Storage API allows entities to be added one by one or in batches.  Given that we have hundreds of thousands of records, using batches is going to be much faster because we can add up to one hundred records in a single call.  Each call is made over HTTP, so it is relatively slow, and the fewer calls we make, the faster our import will be.  In addition, transaction costs are calculated per call, so adding records in a batch counts as one transaction from a billing perspective.

The key rules for adding a batch in Windows Azure Table Storage are:

  • A maximum of 100 entities can be added in a batch
  • All the entities in a batch must have the same partition key

So the basic logic of the AddBatch method is as follows:

  • Sort the list of entities by PartitionKey so that we can group by PartitionKey into batches.
  • When we hit 100 entities, commit the batch and start a new one.
  • When we hit a new PartitionKey, commit the batch and start a new one.
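A sketch of that grouping logic might look like the following; the variable names are illustrative and the actual AddBatch implementation is in the repository.

// Group the entities, sorted by partition key, into batches of at most 100
// entities that all share the same partition key.  (Requires System.Linq.)
var batchOperations = new List<TableBatchOperation>();
TableBatchOperation currentBatch = new TableBatchOperation();
string currentPartitionKey = null;

foreach (DictionaryTableEntity entity in entities.OrderBy(e => e.PartitionKey))
{
    // Commit the current batch when it is full or the partition key changes.
    bool partitionChanged = currentPartitionKey != null &&
                            entity.PartitionKey != currentPartitionKey;
    if (currentBatch.Count > 0 && (currentBatch.Count == 100 || partitionChanged))
    {
        batchOperations.Add(currentBatch);
        currentBatch = new TableBatchOperation();
    }

    currentBatch.Insert(entity);
    currentPartitionKey = entity.PartitionKey;
}

// Don't forget the final, partially filled batch.
if (currentBatch.Count > 0)
    batchOperations.Add(currentBatch);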

WindowsAzureTableStorage: Speeding Up by Using Async

In the original AddBatch method, I used a synchronous call for each ExecuteBatch call.  However, this is quite slow, especially over a high-latency connection to the Azure data center.

We can speed this up by using asynchronous calls to the Windows Azure API.  The idea is that instead of waiting for the HTTP call to finish processing, we start on the next batch and make another HTTP call.  Instead of having only one connection at a time, we can have hundreds of connections open, each processing a batch.

The Windows Azure API supports the new .NET async pattern through the ExecuteBatchAsync method.  The strategy employed in my code is to take all of the batches that have been organized and, for each one, create an async task that returns immediately but continues processing in the background.

.NET also provides the ability to wait for all of these asynchronous tasks to complete, and we bubble this waiting up to the console app so that it doesn't shut down before all the tasks are done.  This is done with Task.WhenAll, which takes all of the tasks you have created and returns a single task that completes when every one of them has completed.

foreach (TableBatchOperation batch in batchOperations)
{
    batchCount++;
    taskCount++;
    Debug.WriteLine("Adding batch " + batchCount + " of " + batchOperations.Count);

    Task<IList<TableResult>> task = table.ExecuteBatchAsync(batch);
    batchTasks.Add(task);

    if (MaximumTaskCount > 0 && taskCount >= MaximumTaskCount)
    {
        Debug.WriteLine("Maximum task threshold reached - waiting for existing tasks to finish.");
        await Task.WhenAll(batchTasks);
        taskCount = 0;
    }
}
await Task.WhenAll(batchTasks);

One other feature I added was the ability to set the maximum number of asynchronous calls created at a time.  If we have 500,000 records to import, we could potentially have thousands of batches.  In my testing, especially over a high-latency connection, creating thousands of requests would end up overloading the network and timing out some of the connections.  So instead, a configurable parameter throttles the number of asynchronous calls in flight; when the maximum is reached, the code waits for the outstanding calls to finish before starting the next set.  In my testing, 200 asynchronous calls in flight at once worked quite well and was reasonably fast, while 500 tended to create random timeouts from my home network connecting to Azure.

In our console application, we get this task returned and we simply wait for it to complete:

var task = tableStorageService.AddBatchAsync(“test”, entities, maximumTasks);
task.Wait();

Once all the batch insert tasks are completed, the console application is done processing.

Speeding Up Using Windows Azure VMs and Multiple Console Instances

There are a couple ways to speed up the import process further:

  • Remove most of the latency by running the importer on a Windows Azure virtual machine close to the Table Storage network.
  • Run multiple instances of the importer to make use of more of the available cores.

I created an extra-large VM, which has 8 cores and 14 GB of RAM, and deployed my code to it.  I then ran 5 instances of my console app, each processing the same CSV file but at a different starting offset – the first imports records 0-100,000, the second imports 100,001 to 200,000, and so on.
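For illustration, assuming each instance gets its own copy of the configuration file and imports 100,000 rows, the five instances would differ only in these two settings (values here are illustrative):

  Instance 1: StartingOffset=0        MaximumRowsToImport=100000
  Instance 2: StartingOffset=100000   MaximumRowsToImport=100000
  Instance 3: StartingOffset=200000   MaximumRowsToImport=100000
  Instance 4: StartingOffset=300000   MaximumRowsToImport=100000
  Instance 5: StartingOffset=400000   MaximumRowsToImport=100000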

[Screenshot: five instances of the importer console app running on the VM]

We can now run five instances of the application, each with 200 asynchronous calls plowing data into the same table.

Here is what the applications look like in Task Manager – as you can see, CPU usage hits about 10-15% and memory about 15% with all 5 instances running at once.  We could in theory run 20-30 instances and gain even faster performance.

[Screenshot: Task Manager showing the five importer instances]

Loading in 500,000 records takes about 5 minutes.  This is pretty good, but slow in comparison to the results reported by Troy Hunt, who demonstrated 22,500 rows being inserted per second!
