Streaming ETL: SQL Change Data Capture (CDC) to Azure Event Hub

I had a recent requirement to capture and stream real-time data changes on several SQL database tables from an on-prem SQL Server to Azure for downstream processing.

Specifically, we needed to create a streaming ETL solution that …

  1. Captures intermediate DML operations on tables in an on-prem SQL database
  2. Transmits the data securely and in real time into Azure
  3. Stores the delta changes as TXT files in Azure Data Lake Store (ADLS)
  4. Visualises the real-time change telemetry on a Power BI dashboard (specifically the number of Inserts, Updates and Deletes over time).

The first part was easy; SQL Server has a feature called Change Data Capture (CDC) which does an amazing job of tracking DML changes into separate system tables.  If you don't know about CDC then see here – https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/about-change-data-capture-sql-server

The second part wasn't easy, and after some searching I came across this blog post by Spyros Sakellariadis, which gave me inspiration and starter code for my streaming ETL solution.  Excellent post.  See here – https://azure.microsoft.com/en-us/resources/samples/event-hubs-dotnet-import-from-sql/

And so, the final architecture looks something like this…

The solution picks up the SQL data changes from the CDC change tracking system tables, creates JSON messages from the change rows, and then posts the messages to an Azure Event Hub.  Once landed in the Event Hub, an Azure Stream Analytics (ASA) job distributes the changes to the multiple outputs.

What I found pretty cool was that I could transmit SQL delta changes from source to target in as little as 5 seconds end to end!

And so, let's get into some CDC to Event Hub data streaming action!

Set Up the Source SQL Database and Tables

The solution requires a table that you want tracked in a source database, as well as a separate offset table to manage the last position/row that was sent to Azure.

This script will create the database, sample table, offset tracking table and enable CDC.

 
-- CREATE Source Database
USE [master]
GO
CREATE DATABASE [SQL2AEH]
GO
ALTER DATABASE [SQL2AEH] SET RECOVERY FULL WITH NO_WAIT
GO
USE [SQL2AEH]
GO
execute dbo.sp_changedbowner @loginame = N'sa'
GO

-- Enable CDC on Source Database
execute sys.sp_cdc_enable_db
GO

-- Create User Table to be Tracked
CREATE TABLE [dbo].[BCTran]
(
 [BCTranID] [int] IDENTITY(1,1) NOT NULL,
 [BCTranDate] [datetime] NULL,
 [PartyAID] [int] NULL,
 [PartyBID] [int] NULL,
 [RoomID] [int] NULL,
 [BCTranStatus] [char](1) NULL,
 CONSTRAINT [pk_BCTran] PRIMARY KEY CLUSTERED 
 (
 [BCTranID] ASC
 )
) 
GO

-- Enable CDC on the User Table
-- ### ENSURE SQL AGENT IS RUNNING BEFORE RUNNING THIS STEP ###
-- This will create the system tracking table [cdc].[dbo_BCTran_CT]
-- This will create and start the following SQL Agent jobs...
-- (1) cdc.SQL2AEH_capture --> Read SQL Txn Log, identify DML on [BCTran], copy change data to [cdc].[dbo_BCTran_CT]
-- (2) cdc.SQL2AEH_cleanup --> Delete data in [cdc].[dbo_BCTran_CT] older than 72 hours (default)
execute sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'BCTran', @role_name = 'cdc_reader'
GO

-- Create Separate Offset Table to Manage Last Position/Row Sent to Azure
CREATE TABLE [dbo].[SQL2AEH_TableOffset]
(
 [TableName] [varchar](250) NOT NULL,
 [LastMaxVal] [binary](10) NOT NULL,
 [LastUpdateDateTime] [datetime] NOT NULL DEFAULT getdate(),
 [LastCheckedDateTime] [datetime] NOT NULL DEFAULT getdate(),
 CONSTRAINT [PK_SQL2AEH_TableOffset_1] PRIMARY KEY NONCLUSTERED 
 (
 [TableName] ASC
 ) 
) 
GO

-- Insert Starting Point for the Tracked User Table
insert into [dbo].[SQL2AEH_TableOffset] select 'dbo.BCTran', 0x00000000000000000000, '1900-01-01 00:00:00', '1900-01-01 00:00:00'
GO
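Before moving on, it's worth confirming that CDC is actually switched on.  A quick sanity check using the standard CDC catalog views and helper procedure looks like this…

-- Confirm CDC is enabled at the database and table level
SELECT name, is_cdc_enabled FROM sys.databases WHERE name = 'SQL2AEH'
GO
SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'BCTran'
GO
-- Review the capture instance details (capture instance name, columns, role)
EXECUTE sys.sp_cdc_help_change_data_capture
GO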

 

Set Up the “SQL CDC 2 Event Hub” Solution

To extract data from the SQL CDC change tracking system tables and create Event Hub messages, you need a small C# command line program and an Azure Event Hub to send the messages to.

Create the Azure Event Hub from the Azure Portal

You can easily create a new Azure Event Hub in the Azure Portal under your Azure subscription.  Once created, you will need the connection string of that Event Hub, which you can also get from the Azure Portal.
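If you prefer scripting over clicking, the namespace, hub and connection string can also be created with the Azure CLI.  A rough sketch is below – the resource group, namespace and hub names are all placeholders, so check the syntax against your CLI version…

az eventhubs namespace create --resource-group MyRG --name my-eh-namespace --location australiasoutheast --sku Basic
az eventhubs eventhub create --resource-group MyRG --namespace-name my-eh-namespace --name sql2aeh-hub
az eventhubs namespace authorization-rule keys list --resource-group MyRG --namespace-name my-eh-namespace --name RootManageSharedAccessKey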

If you need to learn about Event Hubs then see here – https://azure.microsoft.com/en-us/services/event-hubs/

Create the SQL2AEH Command Line Program

For the SQL2AEH program, you can download the source code from my GitHub Repo called SQL CDC 2 Event Hub – https://github.com/rolftesmer/SQLCDC2EventHub

Open the solution in Visual Studio 2015 and edit the “app.config” file with the following settings (a sample sketch follows the list);

  • ExecutionControl – 1 = run continuously, 0 = run once then exit (default 1)
  • ExecutionControlSleepMs – milliseconds sleep between each iteration when program is set to run continuously (default 1000)
  • DataTableName – the name of the source SQL Server table in “owner.table” format
  • SQLBatchSize – the number of SQL CDC change rows to bundle into a single JSON Event Hub message
  • sqlDatabaseConnectionString – the connection string for the source SQL Server
  • Microsoft.ServiceBus.EventHubToUse – the name of the target Azure Event Hub
  • Microsoft.ServiceBus.ServiceBusConnectionString – the connection string for the target Azure Event Hub
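For reference, a hypothetical app.config sketch with those settings is below – every value shown is a placeholder, so substitute your own table, server and Event Hub details…

<configuration>
  <appSettings>
    <!-- All values below are placeholders -->
    <add key="ExecutionControl" value="1" />
    <add key="ExecutionControlSleepMs" value="1000" />
    <add key="DataTableName" value="dbo.BCTran" />
    <add key="SQLBatchSize" value="5" />
    <add key="sqlDatabaseConnectionString" value="Data Source=MYSERVER;Initial Catalog=SQL2AEH;Integrated Security=True" />
    <add key="Microsoft.ServiceBus.EventHubToUse" value="my-event-hub" />
    <add key="Microsoft.ServiceBus.ServiceBusConnectionString" value="Endpoint=sb://my-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR-KEY" />
  </appSettings>
</configuration>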

Build the solution (Release).  In the “…\Bin\Release” folder you will find the SQL2AEH.exe program.

This EXE can be run from the command line, or, as in my scenario, configured as a SQL Agent scheduled job to continually poll the CDC change table for new rows to send to Event Hubs.

NOTE:  I am not a seasoned C# developer!  As such the application code is provided free without any support or warranty of any kind.  The code has not been thoroughly tested and is not considered production ready.  The code is provided free of charge and can be reused in any way you wish.

 

Schedule SQL2AEH.exe in SQL Agent

As we want data to be streamed in real time from SQL CDC, we'll schedule the SQL2AEH.exe program in SQL Agent.   Create a new SQL Agent job as per the below…

Set up a new SQL Agent job with a single job step

Define the job step to call the SQL2AEH.exe

Schedule the job to start on SQL Agent startup
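If you'd rather script the job than click through the SSMS dialogs, a minimal T-SQL sketch is below – the job name, EXE path and schedule name are placeholders…

USE [msdb]
GO
EXECUTE dbo.sp_add_job @job_name = N'SQL2AEH - Stream CDC to Event Hub'
EXECUTE dbo.sp_add_jobstep @job_name = N'SQL2AEH - Stream CDC to Event Hub',
 @step_name = N'Run SQL2AEH.exe',
 @subsystem = N'CMDEXEC',
 @command = N'C:\SQL2AEH\Bin\Release\SQL2AEH.exe'
-- freq_type 64 = start automatically when SQL Agent starts
EXECUTE dbo.sp_add_schedule @schedule_name = N'On SQL Agent Startup', @freq_type = 64
EXECUTE dbo.sp_attach_schedule @job_name = N'SQL2AEH - Stream CDC to Event Hub', @schedule_name = N'On SQL Agent Startup'
EXECUTE dbo.sp_add_jobserver @job_name = N'SQL2AEH - Stream CDC to Event Hub'
GO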

Create Event Hub Messages from CDC Tracking Tables

Once the source SQL Server, tables and SQL2AEH.exe are set up, we can test it out!

Let's create some new rows in our source SQL table.

-- Create Some New Rows in our Source SQL Table
insert into [dbo].[BCTran] 
(
 BCTranDate, PartyAID, PartyBID, RoomID, BCTranStatus
) 
select '2017-07-10 15:00:00', 1, 2, 550, 'C'
union 
select '2017-07-10 18:00:00', 2, 3, 650, 'D'
union 
select '2017-07-10 21:00:00', 3, 1, 750, 'I'
GO

 

Let's see what's in the source SQL table, and then also what's been tracked by CDC in our CDC system change tracking table (about 1-2 seconds later).

select * from [dbo].[BCTran]
GO
select * from [cdc].[dbo_BCTran_CT]
GO

 

There are 3 new rows in our source table, and also 3 new rows tracked by CDC as operation “2” (INSERT).

When you run the SQL2AEH.exe program, it will read any new rows from the [cdc].[dbo_BCTran_CT] tracking table, create a JSON message, and send it to the Azure Event Hub.
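Conceptually, the poll against the tracking table looks something like the sketch below – a simplified version for illustration, not the exact SELECT in SQL2AEH.exe…

-- Read change rows beyond the last LSN sent to Azure
DECLARE @LastMaxVal binary(10)
SELECT @LastMaxVal = [LastMaxVal]
FROM [dbo].[SQL2AEH_TableOffset]
WHERE [TableName] = 'dbo.BCTran'

SELECT *
FROM [cdc].[dbo_BCTran_CT]
WHERE [__$start_lsn] > @LastMaxVal
ORDER BY [__$start_lsn], [__$seqval]
GO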

In our case it has created a single JSON message with the 3 new rows.

 
[
 {
 "SQL2AEH_TableName": "dbo.BCTran",
 "SQL2AEH_RowNbr": 1,
 "SQL2AEH_$start_lsn_string": "0x0000002500000D310005",
 "SQL2AEH_$seqval_string": "0x0000002500000D310002",
 "SQL2AEH_$update_mask_string": "0x3F",
 "__$start_lsn": "AAAAJQAADTEABQ==",
 "__$end_lsn": null,
 "__$seqval": "AAAAJQAADTEAAg==",
 "__$operation": 2,
 "__$update_mask": "Pw==",
 "BCTranID": 5,
 "BCTranDate": "2017-07-10T15:00:00",
 "PartyAID": 1,
 "PartyBID": 2,
 "RoomID": 550,
 "BCTranStatus": "C"
 },
 {
 "SQL2AEH_TableName": "dbo.BCTran",
 "SQL2AEH_RowNbr": 2,
 "SQL2AEH_$start_lsn_string": "0x0000002500000D310005",
 "SQL2AEH_$seqval_string": "0x0000002500000D310003",
 "SQL2AEH_$update_mask_string": "0x3F",
 "__$start_lsn": "AAAAJQAADTEABQ==",
 "__$end_lsn": null,
 "__$seqval": "AAAAJQAADTEAAw==",
 "__$operation": 2,
 "__$update_mask": "Pw==",
 "BCTranID": 6,
 "BCTranDate": "2017-07-10T18:00:00",
 "PartyAID": 2,
 "PartyBID": 3,
 "RoomID": 650,
 "BCTranStatus": "D"
 },
 {
 "SQL2AEH_TableName": "dbo.BCTran",
 "SQL2AEH_RowNbr": 3,
 "SQL2AEH_$start_lsn_string": "0x0000002500000D310005",
 "SQL2AEH_$seqval_string": "0x0000002500000D310004",
 "SQL2AEH_$update_mask_string": "0x3F",
 "__$start_lsn": "AAAAJQAADTEABQ==",
 "__$end_lsn": null,
 "__$seqval": "AAAAJQAADTEABA==",
 "__$operation": 2,
 "__$update_mask": "Pw==",
 "BCTranID": 7,
 "BCTranDate": "2017-07-10T21:00:00",
 "PartyAID": 3,
 "PartyBID": 1,
 "RoomID": 750,
 "BCTranStatus": "I"
 }
]

 

If you need to see what's going on, you can use the totally awesome and totally free Azure Service Bus Explorer to peek into your Event Hub, check out what the SQL CDC messages look like, and debug.   This is an essential tool for any Azure streaming solution.

For those who have not used it before – see here – https://code.msdn.microsoft.com/windowsapps/Service-Bus-Explorer-f2abca5a

 

Reading CDC Messages Downstream from Event Hub

Once data is in Azure Event Hub you have many options to process that data.  Essentially anything which can make a call to the Event Hub can consume the SQL CDC data!

I really like Azure Stream Analytics because it's simple, fast, cheap, and allows multiple streaming output destinations from the same single streaming input feed.

The ASA SQL query to read from the Azure Event Hub and write to an Azure SQL DB output would look like the code below.

 
SELECT
 SQL2AEH_$start_lsn_string, 
 SQL2AEH_$seqval_string, 
 SQL2AEH_$update_mask_string, 
 __$operation, 
 BCTranID, 
 BCTranDate, 
 PartyAID, 
 PartyBID, 
 RoomID, 
 BCTranStatus
INTO
 Target_AzureSQLDB
FROM
 Source_AzureEventHub
WHERE SQL2AEH_TableName = 'dbo.BCTran'
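And because ASA supports multiple outputs from a single input, the same job could also feed the Power BI dashboard from requirement 4.  A sketch counting Inserts, Updates and Deletes in 10-second tumbling windows is below – the output alias Target_PowerBI is a placeholder…

SELECT
 System.Timestamp AS WindowEnd,
 __$operation,
 COUNT(*) AS OperationCount
INTO
 Target_PowerBI
FROM
 Source_AzureEventHub
WHERE SQL2AEH_TableName = 'dbo.BCTran'
GROUP BY __$operation, TumblingWindow(second, 10)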

 

Source to Target Performance Benchmark

To round it out, I did some basic benchmarking to see how long it took for a DML change on my on-prem SQL Server (VM) to land in an Azure SQL Database (PaaS), and also how many rows per second I could push via the C# program.

Source: On-Prem SQL Server 2016;

  • SQL Server 2016 SP1, Developer Edition
  • 4 x 1.73GHz cores, 8GB RAM, 256GB SSD
  • Australia South East (20km from the Azure DC)
  • Internet path, no ExpressRoute

Target: Azure Event Hub + Azure Stream Analytics;

  • Australia South East
  • Azure Event Hub = Basic, TU1 (lowest performance setting)
  • Azure Stream Analytics = SU1 (lowest performance setting)

Source-Target: Performance Benchmark Results;

  • Rows per JSON Message (Batch) = 5
  • SQL Send Rate = 47 rows per sec (2820 rows per min)
  • End to End Time = 5-7 sec (source SQL change to landing in Azure SQL DB)

I didn't really push the limits of the local SQL box, nor the Azure services. The deployed Event Hub and Stream Analytics job can handle 1000 messages/sec even at the lowest performance setting, so I don't think scalability is any immediate concern.   Performance will instead be bound by how fast the C# application itself can bundle and send the CDC changes.

 

So in Summary

So there you have it – a pretty simple streaming ETL solution to take SQL CDC changes from any SQL Server up to Azure and redirect them to many different output destinations!

There are a few limitations that I can think of so far:

  • At the time of writing, the max Event Hub message size was 256KB, so ensure you take this into account when calculating how many CDC rows go into each batch.  Example – if the SQL row is 356 bytes, then your batch size should be less than 736 rows (see the sketch after this list).
  • The max SQL row is 8KB; however, SQL rows can extend beyond this with LOB data types (e.g. varchar(max)).  I haven't tested this scenario.
  • I haven't tested all SQL data types, but I did notice that some won't convert well from their native format into JSON.  SQL [binary] is one such example.
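As a quick sanity check on that first point, the batch-size ceiling can be derived straight from the message limit (JSON field names add per-row overhead, so stay well under it)…

DECLARE @MaxMessageBytes int = 256 * 1024  -- 256KB Event Hub message limit at time of writing
DECLARE @AvgRowBytes int = 356
SELECT @MaxMessageBytes / @AvgRowBytes AS MaxRowsPerBatch  -- = 736
GO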

So as per usual, and as I always say, please test this yourself as your mileage may vary!


Disclaimer: all content on Mr. Fox SQL blog is subject to the disclaimer found here


13 thoughts on “Streaming ETL: SQL Change Data Capture (CDC) to Azure Event Hub”

  1. alberto 15 September 2017 / 12:44 AM

    Good afternoon, could this be used with Azure SQL Database directly, without creating the initial script?


    • Mr. Fox SQL (Rolf Tesmer) 15 September 2017 / 9:17 AM

      Unfortunately no – you will need to create the initial target table in the database before you redirect data. The SQL tables won't auto-create. However, if redirecting data to Power BI (via Stream Analytics) then yes, Power BI will auto-create the target table/object in the powerbi.com service.


        • Mr. Fox SQL (Rolf Tesmer) 16 September 2017 / 10:20 AM

          Hi Alberto. You definitely won't be able to use my code from this article because, as you know, CDC isn't supported yet in ASDB – and my solution relies on CDC. However, yes, you can use the code as described in the article by Spyros – his solution was built specifically to work on ASDB.


  2. Fernando 3 October 2017 / 3:10 AM

    Hello Mister Fox, regarding the deletion of the data in the table after 72 hours – where do you specify that in the script?

    – (2) cdc.SQL2AEH_cleanup -> Delete data in [cdc].[dbo_BCTran_CT] older than 72 hours (default)
    execute sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'BCTran', @role_name = 'cdc_reader'
    GO

    I created the job in SQL Server but I do not see the retention time in the job options; I need the cleanup to run every 2 hours. In which part should this be modified? Thank you very much.


  3. Rogert 17 October 2017 / 6:12 AM

    Good afternoon, I have a question: is it possible to capture only the INSERT operation (2)? I only need to copy the inserted data. Or is there some way to deactivate operation 1 (DELETE)? Thank you very much.


    • Mr. Fox SQL (Rolf Tesmer) 17 October 2017 / 1:44 PM

      Yep absolutely.
      The program will send all change data up to the Event Hub, so all you need to do is add an additional filter to the Stream Analytics (ASA) query (… __$operation = 2).
      That way ASA will still read all data in the event stream but only send matched data through to downstream processing.
      In this blog post I show how this is done for Power BI – https://mrfoxsql.wordpress.com/2017/08/01/streaming-reporting-sql-change-data-capture-cdc-to-power-bi/
      So your ASA query will look like this…
      SELECT
      SQL2AEH_$start_lsn_string,
      SQL2AEH_$seqval_string,
      SQL2AEH_$update_mask_string,
      __$operation,
      BCTranID,
      BCTranDate,
      PartyAID,
      PartyBID,
      RoomID,
      BCTranStatus
      INTO
      Target_AzureSQLDB
      FROM
      Source_AzureEventHub
      WHERE SQL2AEH_TableName = 'dbo.BCTran'
      AND __$operation = 2


      • Rogert 17 October 2017 / 1:52 PM

        But can it be filtered before entering the Event Hub? My architecture does not use Stream Analytics – it is SQL Server -> Event Hub -> Time Series Insights – so I need to copy just the inserts to the swap table.


        • Mr. Fox SQL (Rolf Tesmer) 17 October 2017 / 2:03 PM

          Ok I see – SQL CDC will always capture all the changes into the tracking tables, and you cannot change that. However, you can change the code of the C# application SQL2AEH.exe to only get rows from the SQL CDC tracking table where Operation = 2 (INSERT). Grab the code from GitHub and change the SELECT statement in the section of code "// GET ROWS FROM SQL CDC CHANGE TRACKING TABLE GREATER THAN THE OFFSET" to update the WHERE clause with "AND __$operation = 2".


          • Rogert 18 October 2017 / 6:40 AM

            yes! works perfect, thank you very much !!!

