Streaming ETL: SQL Change Data Capture (CDC) to Azure Event Hub

[read this post on Mr. Fox SQL blog]

I had a recent requirement to capture and stream real-time data changes on several SQL database tables from an on-prem SQL Server to Azure for downstream processing.

Specifically we needed to create a streaming ETL solution that …

  1. Captured intermediate DML operations on tables in an on-prem SQL database
  2. Transmitted the data securely and in real time into Azure
  3. Stored the delta changes as TXT files in Azure Data Lake Store (ADLS)
  4. Visualised the real-time change telemetry on a Power BI dashboard (specifically the number of Inserts, Updates, Deletes over time).

The first part was easy; SQL Server has a feature called Change Data Capture (CDC) which does an amazing job of tracking DML changes in separate system tables.  If you don’t know about CDC then see here – https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/about-change-data-capture-sql-server

The second part wasn’t easy, and after some searching I came across this blog post by Spyros Sakellariadis which gave me inspiration and starter code for my streaming ETL solution.  Excellent post.  See here – https://azure.microsoft.com/en-us/resources/samples/event-hubs-dotnet-import-from-sql/

And so, the final architecture looks something like this…

The solution picks up the SQL data changes from the CDC change tables, creates JSON messages from the change rows, and then posts the messages to an Azure Event Hub.  Once the messages land in the Event Hub, an Azure Stream Analytics (ASA) job distributes the changes to multiple outputs.

What I found pretty cool was that I could transmit SQL delta changes from source to target in as little as 5 seconds end to end!

And so, let’s get into some CDC to Event Hub data streaming action!

Set Up the Source SQL Database and Tables

The solution requires a table that you want tracked in a source database, as well as a separate offset table to manage the last position/row that was sent to Azure.

This script will create the database, sample table, offset tracking table and enable CDC.

 
-- CREATE Source Database
USE [master]
GO
CREATE DATABASE [SQL2AEH]
GO
ALTER DATABASE [SQL2AEH] SET RECOVERY FULL WITH NO_WAIT
GO
USE [SQL2AEH]
GO
execute dbo.sp_changedbowner @loginame = N'sa'
GO

-- Enable CDC on Source Database
execute sys.sp_cdc_enable_db
GO

-- Create User Table to be Tracked
CREATE TABLE [dbo].[BCTran]
(
 [BCTranID] [int] IDENTITY(1,1) NOT NULL,
 [BCTranDate] [datetime] NULL,
 [PartyAID] [int] NULL,
 [PartyBID] [int] NULL,
 [RoomID] [int] NULL,
 [BCTranStatus] [char](1) NULL,
 CONSTRAINT [pk_BCTran] PRIMARY KEY CLUSTERED 
 (
 [BCTranID] ASC
 )
) 
GO

-- Enable CDC on the User Table
-- ### ENSURE SQL AGENT IS RUNNING BEFORE RUNNING THIS STEP ###
-- This will create the system tracking table [cdc].[dbo_BCTran_CT]
-- This will create and start the following SQL Agent jobs...
-- (1) cdc.SQL2AEH_capture --> Read SQL Txn Log, identify DML on [BCTran], copy change data to [cdc].[dbo_BCTran_CT]
-- (2) cdc.SQL2AEH_cleanup --> Delete data in [cdc].[dbo_BCTran_CT] older than 72 hours (default)
execute sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'BCTran', @role_name = 'cdc_reader'
GO

-- Create Separate Offset Table to Manage Last Position/Row Sent to Azure
CREATE TABLE [dbo].[SQL2AEH_TableOffset]
(
 [TableName] [varchar](250) NOT NULL,
 [LastMaxVal] [binary](10) NOT NULL,
 [LastUpdateDateTime] [datetime] NOT NULL DEFAULT getdate(),
 [LastCheckedDateTime] [datetime] NOT NULL DEFAULT getdate(),
 CONSTRAINT [PK_SQL2AEH_TableOffset_1] PRIMARY KEY NONCLUSTERED 
 (
 [TableName] ASC
 ) 
) 
GO

-- Insert Starting Point for the Tracked User Table
insert into [dbo].[SQL2AEH_TableOffset] select 'dbo.BCTran', 0x00000000000000000000, '1900-01-01 00:00:00', '1900-01-01 00:00:00'
GO
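
To make the role of the offset table concrete, here is a minimal Python sketch of the polling logic it supports. It is not the SQL2AEH code (which is C#). It assumes the stored LastMaxVal is compared against the 10-byte __$seqval of each change row, which may differ from what the real program does. The function name rows_after_offset and the in-memory rows are hypothetical stand-ins for a query against [cdc].[dbo_BCTran_CT].

```python
from typing import Any, Dict, List, Tuple

ChangeRow = Dict[str, Any]


def rows_after_offset(
    change_rows: List[ChangeRow],
    last_max_val: bytes,
    batch_size: int,
) -> Tuple[List[ChangeRow], bytes]:
    """Return up to batch_size rows past the offset, plus the new offset.

    Assumption: the offset is the 10-byte __$seqval of the last row sent.
    Python compares bytes lexicographically, which matches how SQL Server
    orders binary(10) values of equal length.
    """
    pending = sorted(
        (r for r in change_rows if r["__$seqval"] > last_max_val),
        key=lambda r: r["__$seqval"],
    )
    batch = pending[:batch_size]
    # Only advance the offset if something was actually read.
    new_offset = batch[-1]["__$seqval"] if batch else last_max_val
    return batch, new_offset


# Hypothetical change rows standing in for [cdc].[dbo_BCTran_CT].
cdc_rows = [
    {"__$seqval": bytes.fromhex("0000002500000D310002"), "BCTranID": 5},
    {"__$seqval": bytes.fromhex("0000002500000D310003"), "BCTranID": 6},
    {"__$seqval": bytes.fromhex("0000002500000D310004"), "BCTranID": 7},
]

start = bytes(10)  # same starting point as 0x00000000000000000000
first_batch, offset = rows_after_offset(cdc_rows, start, batch_size=2)
second_batch, offset2 = rows_after_offset(cdc_rows, offset, batch_size=2)
```

With a batch size of 2, the first call returns rows 5 and 6 and the second returns row 7. A further call returns nothing and leaves the offset unchanged, which is the idle state of a continuously polling sender.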

 

Set Up the “SQL CDC 2 Event Hub” Solution

To extract data from the SQL CDC change tables and create Event Hub messages you need a small C# command line program and an Azure Event Hub to send the messages to.

Create the Azure Event Hub from the Azure Portal

You can easily create a new Azure Event Hub in the Azure Portal under your Azure Subscription.  Once created you will need the connection string of that Event Hub, which you can get from the Azure Portal.

If you need to learn about Event Hubs then see here – https://azure.microsoft.com/en-us/services/event-hubs/

Create the SQL2AEH Command Line Program

For the SQL2AEH program, you can download the source code from my GitHub Repo called SQL CDC 2 Event Hub – https://github.com/rolftesmer/SQLCDC2EventHub

Open the solution in Visual Studio (2015) and edit the “app.config” file:

  • ExecutionControl – 1 = run continuously, 0 = run once then exit (default 1)
  • ExecutionControlSleepMs – milliseconds sleep between each iteration when program is set to run continuously (default 1000)
  • DataTableName – the name of the source SQL Server table in “owner.table” format
  • SQLBatchSize – the number of SQL CDC change rows to bundle into a single JSON Event Hub message
  • sqlDatabaseConnectionString – the connection string for the source SQL Server
  • Microsoft.ServiceBus.EventHubToUse – the name of the target Azure Event Hub
  • Microsoft.ServiceBus.ServiceBusConnectionString – the connection string for the target Azure Event Hub
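
The two ExecutionControl settings amount to a simple polling loop. The Python sketch below only illustrates that behaviour and is not the program’s actual C# code. The names run_sender, poll_once and max_iterations are hypothetical, and max_iterations exists only so the example can terminate.

```python
import time
from typing import Callable, Optional


def run_sender(
    poll_once: Callable[[], int],
    execution_control: int = 1,
    sleep_ms: int = 1000,
    max_iterations: Optional[int] = None,
) -> int:
    """Call poll_once once (execution_control=0) or repeatedly (1).

    poll_once is expected to read new CDC rows, send them, and return
    the number of rows sent. Returns the total number of rows sent.
    """
    total = 0
    iterations = 0
    while True:
        total += poll_once()
        iterations += 1
        if execution_control == 0:
            break  # run once then exit
        if max_iterations is not None and iterations >= max_iterations:
            break  # example-only stop condition
        time.sleep(sleep_ms / 1000.0)
    return total


# Fake pollers: one run that sends 3 rows, and a loop that sends 3 rows
# on its first pass and nothing afterwards.
pending = [3, 0, 0]
sent_once = run_sender(lambda: 3, execution_control=0)
sent_loop = run_sender(lambda: pending.pop(0), execution_control=1,
                       sleep_ms=1, max_iterations=3)
```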

Build the solution (release).  In the “…\Bin\Release” folder you will see the SQL2AEH.exe program.

This EXE can be run from the command line or, as in my scenario, configured as a SQL Agent scheduled job to continually poll the CDC change table for new rows to send to Event Hubs.

NOTE:  I am not a seasoned C# developer!  As such the application code is provided free of charge, without any support or warranty of any kind, and can be reused in any way you wish.  The code has not been thoroughly tested and is not considered production ready.

 

Schedule SQL2AEH.exe in SQL Agent

As we want data to be streamed real-time from SQL CDC we’ll schedule the SQL2AEH.exe program in SQL Agent.   Create a new SQL Agent job as per the below…

Set up a new SQL Agent Job with a single job step

Define the job step to call the SQL2AEH.exe

Schedule the job to start on SQL Agent startup

Create Event Hub Messages from CDC Tracking Tables

Once the source SQL Server, tables and SQL2AEH.exe are set up, we can test it out!

Let’s create some new rows in our source SQL table.

-- Create Some New Rows in our Source SQL Table
insert into [dbo].[BCTran] 
(
 BCTranDate, PartyAID, PartyBID, RoomID, BCTranStatus
) 
select '2017-07-10 15:00:00', 1, 2, 550, 'C'
union 
select '2017-07-10 18:00:00', 2, 3, 650, 'D'
union 
select '2017-07-10 21:00:00', 3, 1, 750, 'I'
GO

 

Let’s see what’s in the source SQL table, and then also what’s been tracked by CDC in our CDC change table (about 1-2 sec later).

select * from [dbo].[BCTran]
GO
select * from [cdc].[dbo_BCTran_CT]
GO

 

There are 3 new rows in our source table, and also 3 new rows tracked by CDC as operation “2” (INSERT).

When you run the SQL2AEH.exe program, it will read any new rows from the [cdc].[dbo_BCTran_CT] tracking table, create a JSON message, and send it to the Azure Event Hub.

In our case it has created a single JSON message with the 3 new rows.  This is the look/feel of a typical message which would land in the Event Hub.

  • Columns 1-5 named “SQL2AEH_” are control fields added by the SQL2AEH program.
  • Columns 6-10 named “__$” are control fields added by SQL CDC.
  • Columns 11+ are all from the source table.
 
[
 {
 "SQL2AEH_TableName": "dbo.BCTran",
 "SQL2AEH_RowNbr": 1,
 "SQL2AEH_$start_lsn_string": "0x0000002500000D310005",
 "SQL2AEH_$seqval_string": "0x0000002500000D310002",
 "SQL2AEH_$update_mask_string": "0x3F",
 "__$start_lsn": "AAAAJQAADTEABQ==",
 "__$end_lsn": null,
 "__$seqval": "AAAAJQAADTEAAg==",
 "__$operation": 2,
 "__$update_mask": "Pw==",
 "BCTranID": 5,
 "BCTranDate": "2017-07-10T15:00:00",
 "PartyAID": 1,
 "PartyBID": 2,
 "RoomID": 550,
 "BCTranStatus": "C"
 },
 {
 "SQL2AEH_TableName": "dbo.BCTran",
 "SQL2AEH_RowNbr": 2,
 "SQL2AEH_$start_lsn_string": "0x0000002500000D310005",
 "SQL2AEH_$seqval_string": "0x0000002500000D310003",
 "SQL2AEH_$update_mask_string": "0x3F",
 "__$start_lsn": "AAAAJQAADTEABQ==",
 "__$end_lsn": null,
 "__$seqval": "AAAAJQAADTEAAw==",
 "__$operation": 2,
 "__$update_mask": "Pw==",
 "BCTranID": 6,
 "BCTranDate": "2017-07-10T18:00:00",
 "PartyAID": 2,
 "PartyBID": 3,
 "RoomID": 650,
 "BCTranStatus": "D"
 },
 {
 "SQL2AEH_TableName": "dbo.BCTran",
 "SQL2AEH_RowNbr": 3,
 "SQL2AEH_$start_lsn_string": "0x0000002500000D310005",
 "SQL2AEH_$seqval_string": "0x0000002500000D310004",
 "SQL2AEH_$update_mask_string": "0x3F",
 "__$start_lsn": "AAAAJQAADTEABQ==",
 "__$end_lsn": null,
 "__$seqval": "AAAAJQAADTEABA==",
 "__$operation": 2,
 "__$update_mask": "Pw==",
 "BCTranID": 7,
 "BCTranDate": "2017-07-10T21:00:00",
 "PartyAID": 3,
 "PartyBID": 1,
 "RoomID": 750,
 "BCTranStatus": "I"
 }
]
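
The “__$” binary fields appear in the message as base64 text, the usual JSON representation of byte arrays, which is presumably why the program adds the readable “SQL2AEH_$…_string” copies. As a minimal sketch, assuming that encoding, a downstream consumer could recover the hex form in Python like this. The helper name binary_field_to_hex is hypothetical.

```python
import base64


def binary_field_to_hex(b64_value: str) -> str:
    """Convert a base64-encoded binary field to SQL-style hex text."""
    raw = base64.b64decode(b64_value)
    return "0x" + raw.hex().upper()


# Values taken from the first row of the sample message above.
start_lsn = binary_field_to_hex("AAAAJQAADTEABQ==")
seqval = binary_field_to_hex("AAAAJQAADTEAAg==")
update_mask = binary_field_to_hex("Pw==")
```

For the first sample row this yields 0x0000002500000D310005, 0x0000002500000D310002 and 0x3F, matching the three “_string” control fields in the same row.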

If you need to see what’s going on, you can use the totally awesome and totally free Azure Service Bus Explorer to peek into your Event Hub, check out what the SQL CDC messages look like, and debug.   This is an essential tool for any Azure streaming solution.

For those who have not used it before – see here – https://code.msdn.microsoft.com/windowsapps/Service-Bus-Explorer-f2abca5a

 

Reading CDC Messages Downstream from Event Hub

Once data is in Azure Event Hub you have many options to process that data.  Essentially anything which can make a call to the Event Hub can consume the SQL CDC data!

I really like Azure Stream Analytics because it’s simple, fast, cheap and allows multiple streaming output destinations from the same single streaming input feed.

The ASA SQL query to read from the Azure Event Hub and write to an Azure SQL DB output would look like the code below.

Because each event sent into Event Hubs carries the table name in the payload field “SQL2AEH_TableName”, the SQL2AEH program can send multiple streams of CDC traffic to the same Event Hub.  A single ASA job can then run multiple ASA SQL queries, each with a WHERE clause filtering content for a specific table.

 
SELECT
 SQL2AEH_$start_lsn_string, 
 SQL2AEH_$seqval_string, 
 SQL2AEH_$update_mask_string, 
 __$operation, 
 BCTranID, 
 BCTranDate, 
 PartyAID, 
 PartyBID, 
 RoomID, 
 BCTranStatus
INTO
 Target_AzureSQLDB
FROM
 Source_AzureEventHub
WHERE SQL2AEH_TableName = 'dbo.BCTran'
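
For the dashboard requirement (counts of inserts, updates and deletes over time), the key field is __$operation. SQL Server CDC uses the codes 1 = delete, 2 = insert, 3 = update (before image) and 4 = update (after image). The Python sketch below tallies them for one decoded message. It illustrates the counting logic rather than an ASA query, the name count_operations is hypothetical, and the sample rows are made up. It counts only code 4 for updates so that each update is counted once.

```python
import json
from collections import Counter
from typing import Dict

# Operation codes used in SQL Server CDC change tables.
OPERATION_NAMES = {1: "delete", 2: "insert", 4: "update"}


def count_operations(message_json: str) -> Dict[str, int]:
    """Count inserts, updates and deletes in one JSON array message."""
    rows = json.loads(message_json)
    counts: Counter = Counter()
    for row in rows:
        name = OPERATION_NAMES.get(row["__$operation"])
        if name is not None:  # skip code 3, the pre-update image
            counts[name] += 1
    return dict(counts)


# Made-up message: one insert, one update (codes 3 and 4), one delete.
sample = json.dumps([
    {"SQL2AEH_TableName": "dbo.BCTran", "__$operation": 2, "BCTranID": 5},
    {"SQL2AEH_TableName": "dbo.BCTran", "__$operation": 3, "BCTranID": 5},
    {"SQL2AEH_TableName": "dbo.BCTran", "__$operation": 4, "BCTranID": 5},
    {"SQL2AEH_TableName": "dbo.BCTran", "__$operation": 1, "BCTranID": 5},
])
totals = count_operations(sample)
```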

 

Source to Target Performance Benchmark

To round it out, I did some basic benchmarking to see how long it took from the time a DML change occurred on my SQL table above on my on-prem SQL Server (VM) until it arrived in an Azure SQL Database (PaaS), and also how many rows per sec I could push via the SQL2AEH program.

TEST Source Configuration: On-Prem SQL Server 2016;

  • SQL Server 2016 SP1, Developer Edition
  • 4x 1.73GHz cores, 8GB RAM, 256GB SSD
  • Melbourne, Australia (20km from Azure DC)
  • 10Mb/sec Internet path, no ExpressRoute

TEST Target Configuration: Azure Event Hub + Stream Analytics + SQL Database;

  • All resources in Australia South East Region (Melbourne)
  • Azure Event Hub = Standard, TU5, 4 partitions, no compression
  • Azure Stream Analytics = SU6 (allocates entire node), using PartitionId
  • Azure SQL Database = PRS1 (125DTU), single target table (heap)

TEST Workload Configuration;

  • 100000 individual insert rows created in source SQL table
  • No delay between insert of rows into source SQL table
  • SQL CDC capture job to run continuously
  • SQL2AEH send program to run continuously
  • SQL2AEH delay between send events = 0 ms
  • SQL2AEH program batch size (rows per JSON event payload) = 500
  • SQL2AEH event size = approx 85 bytes

RESULTS Source-Target Performance Benchmark;

  • Total Time to Insert Individual Rows on Source SQL Server = 25 sec
  • Avg Row Insert Rate on Source SQL Server = 3850 rows/sec (231000 rows/min)
  • Row Receive Rate on Azure SQL DB = 166 rows/sec (9960 rows/min)
  • Min Event Speed: SQL Server to Azure SQL DB = 8 sec (fastest receive time)
  • Max Event Speed: SQL Server to Azure SQL DB = 582 sec (slowest receive time)
  • Avg Event Speed: SQL Server to Azure SQL DB = 288 sec
  • Azure Stream Analytics SU% = 5%
  • Azure SQL Database DTU% = 3%

The test didn’t push the limits of the local SQL box, nor the Azure services either.  In fact the Event Hub, Stream Analytics Job and SQL DB metrics show they weren’t being fully utilised, so there is no immediate scalability concern on the cloud side.

Instead performance was completely bound by how fast the SQL2AEH program can bundle and send the SQL CDC changes from the on-prem SQL Server.

In this case I couldn’t push past a sustained level of 166 rows/sec, which isn’t too bad, but it couldn’t keep up with my very high row creation rate.  This is OK for occasional spikes, but not for a sustained load.  However, I would think that with a bit of application tuning, and perhaps even multiple source SQL CDC table read + send threads, I would anticipate orders of magnitude improvements!  (there’s a challenge for someone!)
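
The wide gap between the minimum and maximum latency is what a backlog looks like: rows were created far faster than they were sent, so the last rows had to wait for everything queued ahead of them. Here is a rough check in Python using only the figures reported above. It is a back-of-envelope sketch that assumes constant insert and send rates and ignores the few seconds of baseline pipeline latency.

```python
rows_total = 100_000
insert_seconds = 25   # time to create all rows on the source
send_rate = 166       # sustained rows/sec received in Azure SQL DB

# Time needed to drain all rows at the sustained send rate.
drain_seconds = rows_total / send_rate

# The last row is created at about insert_seconds and arrives when the
# queue is fully drained, so its wait is roughly the difference.
last_row_wait = drain_seconds - insert_seconds

# With constant rates the wait grows linearly from the first row to the
# last, so the average wait is about half of the longest wait.
avg_wait = last_row_wait / 2
```

This gives about 602 sec to drain the queue, about 577 sec for the last row and about 289 sec on average, which lines up well with the measured maximum of 582 sec and average of 288 sec. In other words, the measured latencies are almost entirely queueing delay rather than transit time.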

 

So in Summary

So there you have it – a pretty simple streaming ETL solution to take SQL CDC changes from any SQL Server up to Azure and redirect them to many different output destinations!

There are a couple of limitations that I can think of so far:

  • As at writing, the max Event Hub message size was 256KB (see ** below), so ensure that you take this into account when calculating how many CDC rows to send per batch.  Example – if the SQL row is 356 bytes, then your batch size should be no more than 736 (262,144 / 356, rounded down)
  • ** EDIT 21 Oct 2017 – Microsoft now supports compressed input for Stream Analytics, which means the payload can be compressed (e.g. gzip) on the event producer before sending to Event Hub.  This dramatically reduces the effective event size, and increases the potential send batch size.  See here – https://docs.microsoft.com/en-us/azure/stream-analytics/stream-analytics-define-inputs#compression 
  • The max SQL row is 8KB, however SQL rows can extend with LOB data types (e.g. varchar(max)).  I haven’t tested this scenario.
  • I haven’t tested all SQL data types, but I did notice that some won’t convert well from their native format into JSON.  SQL [binary] is one such example.
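
The batch-size arithmetic in the first limitation, and the effect of compression in the second, can be sketched in Python as follows. This assumes the 256KB limit means 262,144 bytes and ignores any per-event overhead. The helper max_rows_per_batch is a hypothetical name, and the repeated sample row is made-up data that compresses unusually well.

```python
import gzip
import json

MAX_EVENT_BYTES = 256 * 1024  # 262,144 bytes, the limit quoted above


def max_rows_per_batch(row_bytes: int, limit: int = MAX_EVENT_BYTES) -> int:
    """Largest whole number of rows of row_bytes that fit in limit."""
    return limit // row_bytes


rows_at_356 = max_rows_per_batch(356)  # the 356-byte example above

# Compare raw and gzip-compressed sizes of a 500-row JSON payload.
row = {"SQL2AEH_TableName": "dbo.BCTran", "__$operation": 2,
       "BCTranID": 5, "BCTranDate": "2017-07-10T15:00:00",
       "PartyAID": 1, "PartyBID": 2, "RoomID": 550, "BCTranStatus": "C"}
payload = json.dumps([row] * 500).encode("utf-8")
compressed = gzip.compress(payload)
```

Note that the limit applies to the serialized JSON event, which repeats every field name in every row and is therefore larger than the SQL row itself, so it is safer to measure a sample payload than to rely on the SQL row size alone.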

So as per usual, and as I always say, please test this yourself as your mileage may vary!


Disclaimer: all content on Mr. Fox SQL blog is subject to the disclaimer found here

20 thoughts on “Streaming ETL: SQL Change Data Capture (CDC) to Azure Event Hub”

  1. alberto 15 September 2017 / 12:44 AM

    Good afternoon, could this be used directly with Azure SQL Database, without creating the initial script?

    • Mr. Fox SQL (Rolf Tesmer) 15 September 2017 / 9:17 AM

      “Hi, this could be used with Azure SQL Database directly without creating the initial script?”

      Unfortunately no – you will need to create the initial target table in the database before you redirect data. The SQL tables won’t auto-create. However, if redirecting data to Power BI (via Stream Analytics) then yes, Power BI will auto-create the target table/object in the powerbi.com service.

        • Mr. Fox SQL (Rolf Tesmer) 16 September 2017 / 10:20 AM

          Hi Alberto. You definitely won’t be able to use my code developed in this article because, as you know, CDC isn’t supported yet in ASDB – and my solution relies on CDC. However, yes, you can use the code described in the article by Spyros – his solution was built to specifically work on ASDB.

  2. Fernando 3 October 2017 / 3:10 AM

    Hello Mr. Fox, regarding the deletion of the data in the table after 72 hours, where do you specify that in the script?

    -- (2) cdc.SQL2AEH_cleanup --> Delete data in [cdc].[dbo_BCTran_CT] older than 72 hours (default)
    execute sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'BCTran', @role_name = 'cdc_reader'
    GO

    I created the job in SQL Server but I do not see the deletion time in the options. I need the deletion to be done every 2 hours; in which part should this be modified? Thank you very much.

  3. Rogert 17 October 2017 / 6:12 AM

    Good afternoon, I have a question, can you just activate the insert 2 operation? since I only need to copy the inserted data, or some way to deactivate the operation 1 (DELETE)? thank you very much.

    • Mr. Fox SQL (Rolf Tesmer) 17 October 2017 / 1:44 PM

      Yep absolutely.
      The program will send all change data up to the Event Hub, and so all you need to do is add the additional filter to the Stream Analytics (ASA) query (… __$operation = 2).
      That way ASA will still read all data in the event stream but only send matched data through to downstream processing.
      In this blog post I show how this is done for Power BI – https://mrfoxsql.wordpress.com/2017/08/01/streaming-reporting-sql-change-data-capture-cdc-to-power-bi/
      So your ASA query will look like this…
      SELECT
      SQL2AEH_$start_lsn_string,
      SQL2AEH_$seqval_string,
      SQL2AEH_$update_mask_string,
      __$operation,
      BCTranID,
      BCTranDate,
      PartyAID,
      PartyBID,
      RoomID,
      BCTranStatus
      INTO
      Target_AzureSQLDB
      FROM
      Source_AzureEventHub
      WHERE SQL2AEH_TableName = 'dbo.BCTran'
      AND __$operation = 2

      • Rogert 17 October 2017 / 1:52 PM

        But what about before entering the Event Hub? My architecture does not use Stream Analytics; it is SQL Server, then Event Hub, then Time Series Insights, so I need to copy just the inserts to the swap table.

        • Mr. Fox SQL (Rolf Tesmer) 17 October 2017 / 2:03 PM

          Ok I see – SQL CDC will always capture all the changes into the tracking tables, and you cannot change that. However, you can change the code for the C# application SQL2AEH.exe to only get data from the SQL CDC tracking table that is of Operation = 2 (INSERT). Grab the code from GitHub and change the SELECT statement in the section of code “// GET ROWS FROM SQL CDC CHANGE TRACKING TABLE GREATER THAN THE OFFSET” to update the WHERE clause with “AND __$operation = 2”.

          • Rogert 18 October 2017 / 6:40 AM

            yes! works perfect, thank you very much !!!

  4. Brent Lightsey 25 January 2018 / 1:19 AM

    Thank you for writing out these instructions so thoroughly and clearly, Rolf! I may face a similar challenge soon, but sadly, won’t have SQL Server as my data source. So I’m trying to explore if there are any options other than polling the source table. Not coming up with any so far, except perhaps to somehow use triggers to feed other tables, so we don’t have to depend on the source only.

    Would love to hear how this has progressed as it has been used over the last few months?

    • Mr. Fox SQL (Rolf Tesmer) 25 January 2018 / 8:35 PM

      Hi Brent – my solution was specific to SQL and so it was pretty easy to leverage CDC for what I needed. What is the source DB? Does it have a CDC-like capability? If not, then yep, you will need to roll your own using something like triggers on all the tables you want to distribute. You could look at paid 3rd party tools like Attunity as well.

  5. Ramki 1 July 2018 / 5:50 AM

    Thanks a lot for the detailed instructions. The article starts with “several” SQL database tables but the example contains only one table.

    How do you do if we want to stream data from multiple tables?

    1. All tables into one event hub or
    2. Each table data to a corresponding event hub in Azure Event Hubs

    • Mr. Fox SQL (Rolf Tesmer) 1 July 2018 / 10:37 AM

      You would need to run separate SQL2AEH.exe programs, one per table you want to push from SQL into Azure. You could modify the exe so that you pass the name of the table on the command line, as opposed to in the app.config. You really just need one Event Hub, as the JSON being sent has the table name as a field. Ensure the Hub has a TU scale which is enough to handle all the data from multiple tables. Stream Analytics can read from the same single Hub where all of the data is landed and, using a WHERE clause, select data from each of the tables for downstream systems. Historically I have found that if you are pushing dozens of tables then having a hub per table can be an overhead; the only real benefit of that path is to isolate the data feeds into separate streams. If volumes are relatively low then it’s probably unnecessary.

  6. Ramki Palle 1 July 2018 / 11:48 PM

    I have about 100 tables. Do you still suggest running separate SQL2AEH.exe for each table? I am thinking of adding a for loop to iterate through all tables inside do..while loop.

    • Mr. Fox SQL (Rolf Tesmer) 2 July 2018 / 9:41 AM

      Yep nice one, if you are OK to change the exe then that is probably the better option. Perhaps you can have the list of tables as a CSV string in the app.config.
