Change Data Capture (CDC)

Introduction

Change Data Capture (CDC) refers to the techniques used to identify and capture changes made to the data in a database. This process is fundamental because data often changes quickly, which causes your database to change or grow. Being able to handle these changes effectively is important to ensure that your code, applications and software keep working efficiently.

By using CDC, organizations are able to identify and integrate the changes in a database more quickly and with fewer resources. In other words, CDC reduces both the time required to analyze data and the cost of resources.

Requirements of Database Systems

flowchart TD
A[Source of Truth] -->B
B[System Logic]
B --> C[Cache]
B --> D[Search Index]
B --> E[Data Warehouse]

Once data is propagated past the source of truth, it is effectively derived data. Every block in the diagram downstream of the system logic (the cache, the search index and the data warehouse) holds derived data.

These components expand the capabilities of the system and address many requirements of database systems; the components shown here are not an exhaustive list.
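As a rough illustration of the difference between the source of truth and derived data, here is a minimal in-memory sketch. The records, `build_cache` and `build_search_index` are hypothetical names invented for this example; a real system would use separate stores rather than Python dictionaries.

```python
from collections import defaultdict

# Source of truth: the only place that accepts writes (hypothetical records).
source_of_truth = {
    1: {"name": "Peter Parker", "city": "Cambridge"},
    2: {"name": "Mary Jane", "city": "New York"},
}


def build_cache(records):
    """Derived view: a copy keyed by id for fast lookups."""
    return {key: dict(row) for key, row in records.items()}


def build_search_index(records):
    """Derived view: an inverted index from lowercase name word to record ids."""
    index = defaultdict(set)
    for key, row in records.items():
        for word in row["name"].lower().split():
            index[word].add(key)
    return dict(index)


cache = build_cache(source_of_truth)
search_index = build_search_index(source_of_truth)

# A write lands in the source of truth only, so the derived views are stale...
source_of_truth[3] = {"name": "Peter Quill", "city": "St. Charles"}
stale = 3 not in cache

# ...until the change is propagated (here by a full rebuild, for simplicity).
cache = build_cache(source_of_truth)
search_index = build_search_index(source_of_truth)
```

Rebuilding every derived view on each write does not scale, which is the gap CDC is meant to fill: only the changes are captured and forwarded.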

CDC Methods

Dual Writes

Network Level Sniffing

Database Triggers

Periodic Queries

CDC Core Components

Propagating Changes

There are several ways to implement change tracking. Each of the CDC methods mentioned before will work with all, some or none of the following techniques.

Audit Columns

Audit columns are among the most common CDC techniques. To implement this method, you add columns to every tracked table that record the time at which a row was created or last updated. In SQL parlance, this is generally any time an INSERT or UPDATE statement is run. Think of adding columns called last_changed and created_time. (Kutay 2021)

Implementing Audit Column Tracking

This technique can be implemented by these steps:

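  1. Add columns named something like last_changed and created_time to the tables.
  2. Get the maximum value of both the target table's last_changed and created_time columns.
  3. Select the rows from the source table whose created_time is greater than the target table's maximum created_time. These are the rows created since the last sync.
  4. Select the rows from the source table whose last_changed is greater than the target table's maximum last_changed, but whose created_time is not greater than the target table's maximum created_time. These are existing rows that were updated; new rows are excluded because the third step already selected them.
  5. Insert the new rows from the third step and modify the existing rows from the fourth step in the target table. (Kutay 2021)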
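The steps above can be sketched as follows. This is a minimal illustration rather than a production implementation: it uses an in-memory SQLite database as a stand-in for the real source and target, and the table names `source` and `target`, the `city` column and the sample rows are all assumptions made for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Step 1: both tables carry the audit columns created_time and last_changed.
conn.executescript("""
CREATE TABLE source (id INTEGER PRIMARY KEY, city TEXT,
                     created_time TEXT, last_changed TEXT);
CREATE TABLE target (id INTEGER PRIMARY KEY, city TEXT,
                     created_time TEXT, last_changed TEXT);
""")
initial_rows = [
    (1, "Boston", "2021-01-01", "2021-01-01"),
    (2, "Austin", "2021-01-02", "2021-01-02"),
]
conn.executemany("INSERT INTO source VALUES (?, ?, ?, ?)", initial_rows)
conn.executemany("INSERT INTO target VALUES (?, ?, ?, ?)", initial_rows)

# Changes made to the source after the last sync (hypothetical data).
conn.execute(
    "UPDATE source SET city = 'Denver', last_changed = '2021-01-05' WHERE id = 1")
conn.execute("INSERT INTO source VALUES (3, 'Miami', '2021-01-06', '2021-01-06')")


def sync(conn):
    """Copy new and updated rows from source to target; assumes target is non-empty."""
    # Step 2: high-water marks taken from the target table.
    max_created, max_changed = conn.execute(
        "SELECT MAX(created_time), MAX(last_changed) FROM target").fetchone()
    # Step 3: rows created since the last sync.
    new_rows = conn.execute(
        "SELECT * FROM source WHERE created_time > ?", (max_created,)).fetchall()
    # Step 4: existing rows updated since the last sync.
    changed_rows = conn.execute(
        "SELECT * FROM source WHERE last_changed > ? AND created_time <= ?",
        (max_changed, max_created)).fetchall()
    # Step 5: apply both sets to the target.
    conn.executemany("INSERT INTO target VALUES (?, ?, ?, ?)", new_rows)
    conn.executemany(
        "UPDATE target SET city = ?, last_changed = ? WHERE id = ?",
        [(city, changed, row_id) for row_id, city, _created, changed in changed_rows])
    return len(new_rows), len(changed_rows)


result = sync(conn)  # (number of inserted rows, number of updated rows)
```

Note that this sketch never notices rows deleted from the source, because a deleted row leaves no timestamp behind to query.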

The advantage of this method is that it's easy to implement using simple queries. The main disadvantage is that it's easy to make errors that cause inconsistencies.

Table Deltas

Table deltas are one of the easiest techniques for implementing CDC: you compare the current snapshot of a table with the previous snapshot to find what changed.

Suppose this table:

customer_id last_purchase
1 2021-03-13
2 2021-05-07
3 2021-10-24

Now suppose the customer with ID 1 makes a new purchase on 2021-10-31, and a new customer with ID 4 is added and makes their first purchase on 2021-11-02. Using table deltas, the table becomes:

customer_id last_purchase
1 2021-10-31
2 2021-05-07
3 2021-10-24
4 2021-11-02

The advantage of this approach is that it provides an accurate view of the changed data using simple queries. The disadvantage is storage: the demand for data storage increases because this technique needs three copies of the data being tracked: the original data, the previous snapshot, and the current snapshot. (Kutay 2021)
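The comparison between the two snapshots in the example above can be sketched as a small diff function. This is only an illustration: `table_delta` is a hypothetical helper, and each snapshot is modelled as a dictionary from customer_id to last_purchase instead of a real table.

```python
def table_delta(previous, current):
    """Compare two snapshots keyed by primary key and return what changed."""
    inserted = {k: v for k, v in current.items() if k not in previous}
    updated = {k: v for k, v in current.items()
               if k in previous and previous[k] != v}
    deleted = {k: v for k, v in previous.items() if k not in current}
    return inserted, updated, deleted


# The two snapshots from the customer example, keyed by customer_id.
previous = {1: "2021-03-13", 2: "2021-05-07", 3: "2021-10-24"}
current = {1: "2021-10-31", 2: "2021-05-07", 3: "2021-10-24", 4: "2021-11-02"}

inserted, updated, deleted = table_delta(previous, current)
print(inserted)  # {4: '2021-11-02'}
print(updated)   # {1: '2021-10-31'}
print(deleted)   # {}
```

Unlike audit columns, comparing snapshots also reveals deleted rows, at the cost of keeping the previous snapshot around.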

Basic CDC Workflow

Practical Example Introduction

Let's try and implement such a system.

  1. We'll start with a MySQL relational database as the source of truth.
  2. Timestamp-tracked changes will be captured by periodic queries and propagated first to MongoDB.
  3. Then we perform CDC from the document database MongoDB to the key-value database Redis.
  4. Then we perform CDC from Redis to the data warehouse, Cassandra.

Each database performs a different function to serve different needs. It is important, though, that the MySQL database remains the source of truth, or system of record. To demonstrate the flow of propagating changes, here's a diagram of the workflow.

flowchart LR
MySQL --> |Timestamp Sync| MySQL
MySQL --> |Propagate| MongoDB
MongoDB --> |Propagate| Redis
Redis --> |Propagate| Cassandra

Periodic Queries Practical Example

Here we will use the periodic queries method to capture changes in the source of truth, the MySQL database. Here is a flowchart showing the workflow.

flowchart TD
DB[QueryDB] --> Ch{Changed?}
Ch --> |No| X
X[Query again X times] --> DB
Ch --> |Yes - Capture Changes| Ca
Ca[Select New Data] --> |Propagate Changes| P
P[Save/Share data]

This process is repeated for each database in the system, starting with the system of record, MySQL. First, a timed event triggers the QueryDB task. The task then checks whether anything has changed since the previous timestamp. If nothing has changed, the query is retried up to X times before the next timed event. If changes are detected, the new data is selected, i.e. the changes are captured. Those changes are then saved and shared with the next system in the chain, i.e. the changes are propagated.
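The loop in the flowchart can be sketched in a few lines of Python. Everything here is an assumption made for illustration: `poll_for_changes`, `fetch_changes` and the in-memory `source_rows` list are hypothetical, and a real deployment would query the database and use a scheduler instead of `time.sleep`.

```python
import time


def poll_for_changes(fetch_changes, propagate, last_seen,
                     interval_seconds=1.0, max_polls=3):
    """Run the QueryDB -> Changed? -> Select -> Save/Share loop max_polls times.

    fetch_changes(last_seen) must return (rows, new_last_seen).
    """
    for _ in range(max_polls):
        rows, last_seen = fetch_changes(last_seen)  # QueryDB
        if rows:                                    # Changed?
            propagate(rows)                         # Save/Share data
        time.sleep(interval_seconds)                # wait, then query again
    return last_seen


# Hypothetical source: (timestamp, payload) pairs standing in for table rows.
source_rows = [(1, "a"), (2, "b"), (3, "c")]


def fetch_changes(last_seen):
    """Select rows newer than last_seen and report the new high-water mark."""
    rows = [row for row in source_rows if row[0] > last_seen]
    new_last_seen = max((row[0] for row in rows), default=last_seen)
    return rows, new_last_seen


downstream = []  # stands in for the next system in the chain
final_seen = poll_for_changes(fetch_changes, downstream.extend,
                              last_seen=1, interval_seconds=0, max_polls=3)
```

Passing the fetch and propagate steps in as functions keeps the loop itself identical for every hop in the chain; only the two callbacks differ per database.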

Practical CDC Example Implementation Overview

  1. Create containers
  2. Check on logic
  3. Create a change tracking system
  4. Write an automated system
  5. Break it down into components
  6. Check for intended functionality

Event-Based Approach to CDC

When creating a CDC system, you have to detect changes, capture them, then propagate them. There can be a lot of moving parts behind these seemingly simple three steps. Below is a summarizing diagram of this.

flowchart LR
S[State change] --> D
D[Database] --> |Query| D
D --> C[Client]
C --> 1[Notify]
C --> 2[Notify]
C --> 3[Notify]

A better approach can be the event-driven approach to CDC. Most data stores keep a transaction log, which can help with:

Although this isn't necessarily what transaction logs are meant for, these factors can be leveraged to create an event-driven architecture for CDC.

Some advantages of this approach:

But there are some disadvantages to keep in mind:

That would be equally challenging in some cases. And you might want to have:

But if you can get away with that, this is a really nice way to go about it.

Practical Overview of Event-Driven CDC

MySQL

First start the container.

docker run -p 3306:3306 --name some-mysql -e MYSQL_ROOT_PASSWORD=root -d mysql

Then run this SQL against the server, for example from a MySQL client:

DROP DATABASE IF EXISTS `education`;
CREATE DATABASE IF NOT EXISTS `education`;
USE `education`;

-- TABLE STUDENTS

CREATE TABLE `students`
(
`email` varchar(50),
`name` varchar(50),
`city` varchar(50),
PRIMARY KEY (`email`)
);

Here we create a students table with email (PK), name, city fields.

You could also do this as part of this Python driver script below.

First install these:

pip install PyMySQL==0.10.1
pip install mysql-replication==0.22

Then we can create a driver script that reads the database's replication stream for use with CDC.

from pymysqlreplication import BinLogStreamReader

# Connection settings for the MySQL container started above.
MYSQL_SETTINGS = {
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'root',
    'passwd': 'root',
}

def call_mysql():
    # server_id is your slave identifier, it should be unique.
    # set blocking to True if you want to block and wait for
    # the next event at the end of the stream
    stream = BinLogStreamReader(
        connection_settings=MYSQL_SETTINGS,
        server_id=3,
        blocking=True)

    # Print every binlog event as it arrives.
    for binlogevent in stream:
        binlogevent.dump()

    stream.close()

if __name__ == '__main__':
    call_mysql()

This script will connect to the MySQL database and start listening for changes. When a change is detected, it will print the change to the console.

This covers a whole step of a CDC system. Thanks to the community-maintained library, we only need to configure and deploy the script to have event-driven change capture.

Now let's add some data to the database.

INSERT INTO `education`.`students` (`email`, `name`, `city`) VALUES (
'peter@mit.edu', 'Peter Parker', 'Cambridge, MA'
);
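To propagate a row like the one inserted above instead of only dumping it to the console, each binlog row can be flattened into a plain dictionary first. The sketch below follows the pattern used in the python-mysql-replication examples; `row_to_change` and `stream_changes` are hypothetical helper names, and `stream_changes` assumes a running MySQL server with row-based binary logging enabled.

```python
def row_to_change(action, schema, table, row):
    """Flatten one binlog row into a plain dict that is easy to forward."""
    change = {"action": action, "schema": schema, "table": table}
    if action == "update":
        change["before"] = dict(row["before_values"])
        change["after"] = dict(row["after_values"])
    else:  # "insert" or "delete"
        change["values"] = dict(row["values"])
    return change


def stream_changes(mysql_settings, server_id=3):
    """Yield change dicts from the MySQL binlog (needs a running server)."""
    # Imported lazily so row_to_change can be used without the package.
    from pymysqlreplication import BinLogStreamReader
    from pymysqlreplication.row_event import (
        DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent)

    actions = {WriteRowsEvent: "insert",
               UpdateRowsEvent: "update",
               DeleteRowsEvent: "delete"}
    stream = BinLogStreamReader(
        connection_settings=mysql_settings,
        server_id=server_id,
        only_events=list(actions),  # skip everything except row changes
        blocking=True,
    )
    try:
        for event in stream:
            action = next(name for cls, name in actions.items()
                          if isinstance(event, cls))
            for row in event.rows:
                yield row_to_change(action, event.schema, event.table, row)
    finally:
        stream.close()


# Shape of the row dict for the INSERT above (an illustration, not captured output).
sample_row = {"values": {"email": "peter@mit.edu",
                         "name": "Peter Parker",
                         "city": "Cambridge, MA"}}
sample_change = row_to_change("insert", "education", "students", sample_row)
```

A consumer could then loop over `stream_changes(MYSQL_SETTINGS)` and write each change to the next store in the chain.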

CDC Using Debezium

Debezium is a distributed platform for change data capture. To learn more, read the Debezium documentation.

References

Web Links

Note Links