Beat Cache Invalidation in ASP.NET Core Using Kafka and Debezium

When should a cache entry be invalidated? The answer is: whenever the underlying database record changes.

But the data might be changed by a process that is not under our control.

If we can subscribe to the change events (insert, update, and delete) that occur on database records, then we can invalidate the relevant cache entries in real time.

The good news is that most databases publish their insert, update, and delete events.

This feature is called change data capture (CDC).

TL;DR

In this article, I want to show you how we can use CDC to subscribe to every event that changes a MySQL database record and publish each of these events as a separate message to Apache Kafka.

Debezium is a platform that makes this possible for us.

Then we use these events to build our in-memory cache in ASP.NET Core.

We will see how we can build our REST API based on our real-time cache.

This is similar to how replication works in databases.

In replication, a secondary replica subscribes to every transaction (change event) on the primary and applies it to its own records; in this way the secondary database's state eventually becomes equal to the primary's.

As shown in the image, we subscribe the Kafka cluster to the database as a replica and process the change events to build our cache. Not only does this approach solve cache invalidation, it also solves other problems such as race conditions and cache warm starts, as Martin Kleppmann described in his article "Turning the database inside-out".

The source code for this article is available on GitHub.

Prerequisites

.NET Core 2.2
docker and docker-compose

Start Docker Services

We will set up our Apache Kafka cluster, MySQL database, and Debezium connector (which we will discuss shortly) using Docker.

First, create a docker-compose.yml file with the following content:

version: '3.1'
services:
  mysql:
    image: mysql:5.7
    environment:
      MYSQL_ROOT_PASSWORD: 123456
      MYSQL_USER: mysql
    volumes:
      - ./my.cnf:/etc/mysql/my.cnf
    ports:
      - 3306:3306
  zookeeper:
    image: confluentinc/cp-zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka
    depends_on:
      - zookeeper
      - mysql
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LOG_CLEANER_DELETE_RETENTION_MS: 5000
      KAFKA_BROKER_ID: 1
      KAFKA_MIN_INSYNC_REPLICAS: 1
  connector:
    image: debezium/connect:0.10
    ports:
      - "8083:8083"
    environment:
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      BOOTSTRAP_SERVERS: kafka:9092
    depends_on:
      - zookeeper
      - mysql
      - kafka

The my.cnf file enables the CDC (binary log) feature of MySQL.
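The actual my.cnf is in the article's repository; a minimal sketch of the binlog settings Debezium needs (row-based binary logging and a server id) might look like this:

[mysqld]
server-id        = 1
log_bin          = mysql-bin
binlog_format    = ROW
binlog_row_image = FULL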

Now start all the Docker services:

docker-compose up

Create MySQL Database

Open another terminal in the same path, connect to the MySQL container, and run the MySQL CLI:

docker-compose exec mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD'

Now we want to create a database in MySQL.

Run the following script to create a database named mystore, a table named products, and a sample record in the products table:

create database mystore;
use mystore;
create table products (
  id int unsigned auto_increment primary key,
  name varchar(50),
  price int,
  creation_time datetime default current_timestamp,
  modification_time datetime on update current_timestamp
);
insert into products(name, price) values("Red T-Shirt", 12);

Create Debezium Connector

Now we want to move every change that happens to the products table to Kafka.

To this end, we must create a connector.

A connector is an application responsible for moving data from a database (or any other storage system) to a Kafka cluster, and vice versa.

If you are not familiar with Kafka connectors, you can read the Confluent documentation.

Here we want to move MySQL change events to the Apache Kafka cluster.

Debezium is a Kafka connector that can read all change events from MySQL (and some other databases) and publish them to Kafka. Debezium exposes a REST API for creating a connector.

So to create a Debezium connector, open another terminal and run the following script (most of the configuration is self-explanatory, but for more information read the Debezium MySQL tutorial):

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "mystore-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "123456",
    "database.server.id": "223345",
    "database.server.name": "mysql",
    "database.whitelist": "mystore",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.mystore",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.unwrap.drop.tombstones": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "include.schema.changes": "false"
  }
}'

If you receive HTTP/1.1 201 Created, your connector has been created successfully.

You can also check the status of the connector:

curl localhost:8083/connectors/mystore-connector/status

{
  "name": "mystore-connector",
  "connector": { "state": "RUNNING", "worker_id": "172.24.0.5:8083" },
  "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "172.24.0.5:8083" } ],
  "type": "source"
}

A RUNNING value in the state fields indicates that your connector is working.

Now let's check that any database changes will be synced to Kafka.

First, connect to the Kafka container:

docker-compose exec kafka bash

And view the list of topics:

kafka-topics --zookeeper zookeeper:2181 --list

__confluent.support.metrics
__consumer_offsets
connect-status
dbhistory.mystore
my_connect_configs
my_connect_offsets
mysql
mysql.mystore.products

The mysql.mystore.products topic stores the change events of the products table.

We can read the messages inside this topic using the following script (the message key and value are separated by -):

kafka-console-consumer --bootstrap-server kafka:9092 --from-beginning --topic mysql.mystore.products --property print.key=true --property key.separator="-"

{"id":1}-{"id":1,"name":"Red T-Shirt","price":12,"creation_time":1553595845000,"modification_time":null}

To check that changes are synced in (near) real time, add another record in the MySQL container:

insert into products(name, price) values("Blue Hat", 5);

The change is shown immediately in the consumer terminal:

{"id":1}-{"id":1,"name":"Red T-Shirt","price":12,"creation_time":1553595845000,"modification_time":null}
{"id":2}-{"id":2,"name":"Blue Hat","price":5,"creation_time":1553595958000,"modification_time":null}

Update the "Blue Hat" record:

update products set price = 17 where name = "Blue Hat";

An update event is published:

{"id":1}-{"id":1,"name":"Red T-Shirt","price":12,"creation_time":1553595845000,"modification_time":null}
{"id":2}-{"id":2,"name":"Blue Hat","price":5,"creation_time":1553595958000,"modification_time":null}
{"id":2}-{"id":2,"name":"Blue Hat","price":17,"creation_time":1553595958000,"modification_time":1553595986000}

Delete the record with id 1:

delete from products where id = 1;

A message with a null value is added to the topic:

{"id":1}-{"id":1,"name":"Red T-Shirt","price":12,"creation_time":1553595845000,"modification_time":null}
{"id":2}-{"id":2,"name":"Blue Hat","price":5,"creation_time":1553595958000,"modification_time":null}
{"id":2}-{"id":2,"name":"Blue Hat","price":17,"creation_time":1553595958000,"modification_time":1553595986000}
{"id":1}-null

A null value indicates that the record has been deleted.

Let's add a column named description:

alter table products add column description nvarchar(1000);

And update a product record:

update products set description = "Can be used for the spring!" where id = 2;

You will see that even schema changes are reflected in the messages:

{"id":1}-{"id":1,"name":"Red T-Shirt","price":12,"creation_time":1553595845000,"modification_time":null}
{"id":2}-{"id":2,"name":"Blue Hat","price":5,"creation_time":1553595958000,"modification_time":null}
{"id":2}-{"id":2,"name":"Blue Hat","price":17,"creation_time":1553595958000,"modification_time":1553595986000}
{"id":1}-null
{"id":2}-{"id":2,"name":"Blue Hat","price":17,"creation_time":1553595958000,"modification_time":1553596044000,"description":"Can be used for the spring!"}

Note that some RDBMSs, such as SQL Server, do not automatically reflect schema changes in CDC data, but MySQL CDC supports schema changes.

Cache Builder Project

The mysql.mystore.products topic holds every change to the products table.

To build our cache, we want to keep the latest value for each record.

To this end, we keep the latest value for each product id in a separate topic named products.cache.

We also create a project called CacheBuilder that reads every message from the mysql.mystore.products topic and produces it to the products.cache topic.

products.cache is a compacted topic, which means that it holds only one message per key (in this case, the product id).

You can read more about compacted topics in Cloudurable and in my recent article.

Create a compacted topic in Kafka by running the following script inside the Kafka container:

kafka-topics --create --zookeeper zookeeper:2181 --topic products.cache --replication-factor 1 --partitions 1 --config "cleanup.policy=compact" --config "delete.retention.ms=100" --config "segment.ms=100" --config "min.cleanable.dirty.ratio=0.01"

In order to connect to the Kafka broker from our application, we also need to add the kafka hostname to the hosts file (just add the last line):

sudo vi /etc/hosts

127.0.0.1 localhost
# ...
127.0.0.1 kafka

Create a solution and the CacheBuilder project (you can see the complete code on GitHub):

mkdir src
cd src/
dotnet new sln --name KafkaCache
dotnet new console -o KafkaCache.CacheBuilder
dotnet sln add KafkaCache.CacheBuilder/KafkaCache.CacheBuilder.csproj

And install the Confluent.Kafka and Newtonsoft.Json NuGet packages:

dotnet add KafkaCache.CacheBuilder/KafkaCache.CacheBuilder.csproj package Confluent.Kafka --version 1.0.0-RC1
dotnet add KafkaCache.CacheBuilder/KafkaCache.CacheBuilder.csproj package Newtonsoft.Json --version 12.0.1

Here you can see the Program.cs code together with the ProductKey class (a sketch of both follows below; the complete code is on GitHub). In Program.cs we just read messages from the mysql.mystore.products topic, extract the product id field, create a new message, and publish it to the products.cache topic.
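A minimal sketch of what Program.cs and ProductKey might look like, assuming Confluent.Kafka 1.0.0-RC1 and Newtonsoft.Json (the consumer group id and any names not mentioned in the text are illustrative):

using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using Newtonsoft.Json;

namespace KafkaCache.CacheBuilder
{
    // Shape of the Debezium message key, e.g. {"id":1}.
    public class ProductKey
    {
        public int Id { get; set; }
    }

    public class Program
    {
        public static async Task Main(string[] args)
        {
            var consumerConfig = new ConsumerConfig
            {
                BootstrapServers = "kafka:9092",
                GroupId = "cache-builder",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };
            var producerConfig = new ProducerConfig { BootstrapServers = "kafka:9092" };

            using (var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build())
            using (var producer = new ProducerBuilder<string, string>(producerConfig).Build())
            {
                consumer.Subscribe("mysql.mystore.products");

                while (true)
                {
                    // Read the next change event captured by Debezium.
                    var result = consumer.Consume();

                    // The key looks like {"id":1}; extract the product id and republish the
                    // (already unwrapped) row value to the compacted products.cache topic.
                    var key = JsonConvert.DeserializeObject<ProductKey>(result.Message.Key);

                    await producer.ProduceAsync("products.cache", new Message<string, string>
                    {
                        Key = key.Id.ToString(),
                        Value = result.Message.Value
                    });
                }
            }
        }
    }
}

Note that deleted records simply pass through as null values, which become tombstones in the compacted products.cache topic.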

Now run the CacheBuilder project (in a separate terminal):

dotnet run --project KafkaCache.CacheBuilder/KafkaCache.CacheBuilder.csproj

You can view this topic inside the Kafka container:

kafka-console-consumer --bootstrap-server kafka:9092 --from-beginning --topic products.cache

Web API project

We create a Web API project that exposes a simple REST API to get the details of a product (retrieved from the cache).

This project is also responsible for consuming cache entries from the products.cache topic and storing them in the in-memory cache.

Run the following script to create the project:

dotnet new webapi -o KafkaCache.Api
dotnet sln add KafkaCache.Api/KafkaCache.Api.csproj

And install the Confluent.Kafka and Newtonsoft.Json packages:

dotnet add KafkaCache.Api/KafkaCache.Api.csproj package Confluent.Kafka --version 1.0.0-RC1
dotnet add KafkaCache.Api/KafkaCache.Api.csproj package Newtonsoft.Json --version 12.0.1

Enable the in-memory cache in the Startup class:

services.AddMemoryCache();

This allows us to use IMemoryCache, which stores and retrieves cache entries from memory in ASP.NET Core.
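In the default ASP.NET Core 2.2 template this call goes into Startup.ConfigureServices; here is a trimmed sketch (the Startup generated by dotnet new webapi contains more configuration than this):

using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;

namespace KafkaCache.Api
{
    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            // Registers IMemoryCache so it can be injected into the cache updater and controllers.
            services.AddMemoryCache();

            services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);
        }

        public void Configure(IApplicationBuilder app, IHostingEnvironment env)
        {
            app.UseMvc();
        }
    }
}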

To fill this cache, we need a CacheUpdater class responsible for consuming messages from the products.cache topic and updating our memory cache, together with a ProductItemCache class that represents a cached product (a sketch of both follows below).

Notice that CacheUpdater has a Run method that is responsible for updating the cache. The method receives a returnOnLastOffset parameter that tells it to return when it reaches the last message in the topic: if the parameter is true and we are at the end of the topic/partition, we return from the method. This is useful during startup, when we want to warm our cache before serving any REST API requests.
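The actual classes are in the repository; a minimal sketch of what they might look like, assuming Confluent.Kafka 1.0.0-RC1, Newtonsoft.Json, and IMemoryCache (the constructor parameters and consumer group handling are illustrative):

using System;
using Confluent.Kafka;
using Microsoft.Extensions.Caching.Memory;
using Newtonsoft.Json;

namespace KafkaCache.Api
{
    // The product fields we keep in the in-memory cache.
    public class ProductItemCache
    {
        public int Id { get; set; }
        public string Name { get; set; }
        public int Price { get; set; }
    }

    public class CacheUpdater
    {
        private readonly string _bootstrapServers;
        private readonly IMemoryCache _cache;

        public CacheUpdater(string bootstrapServers, IMemoryCache cache)
        {
            _bootstrapServers = bootstrapServers;
            _cache = cache;
        }

        // Consumes products.cache and mirrors it into IMemoryCache. When returnOnLastOffset
        // is true, the method returns as soon as it reaches the end of the topic/partition
        // (used to warm the cache during startup).
        public void Run(bool returnOnLastOffset)
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = _bootstrapServers,
                GroupId = Guid.NewGuid().ToString(),        // always read the whole topic
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnablePartitionEof = true                   // report end-of-partition events
            };

            using (var consumer = new ConsumerBuilder<string, string>(config).Build())
            {
                consumer.Subscribe("products.cache");

                while (true)
                {
                    var result = consumer.Consume(TimeSpan.FromSeconds(1));
                    if (result == null)
                        continue;

                    if (result.IsPartitionEOF)
                    {
                        if (returnOnLastOffset)
                            return;     // the cache is warm, let the caller proceed
                        continue;
                    }

                    var id = int.Parse(result.Message.Key);
                    if (result.Message.Value == null)
                        _cache.Remove(id);  // tombstone: the record was deleted
                    else
                        _cache.Set(id, JsonConvert.DeserializeObject<ProductItemCache>(result.Message.Value));
                }
            }
        }
    }
}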

We now use CacheUpdater during application initialization. We call the CacheUpdater.Run method twice: first to warm the cache, and then to start a background job that continuously reads the products.cache topic and updates the in-memory cache (see the sketch below).
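One possible wiring, sketched under the assumption that we use the CacheUpdater shown above inside the standard ASP.NET Core 2.2 Program class (the exact wiring in the repository may differ):

using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.DependencyInjection;

namespace KafkaCache.Api
{
    public class Program
    {
        public static void Main(string[] args)
        {
            var host = CreateWebHostBuilder(args).Build();

            var cache = host.Services.GetRequiredService<IMemoryCache>();
            var cacheUpdater = new CacheUpdater("kafka:9092", cache);

            // First call: block until we reach the end of products.cache,
            // so the cache is warm before the API starts serving requests.
            cacheUpdater.Run(returnOnLastOffset: true);

            // Second call: keep consuming in the background and update the cache continuously.
            Task.Run(() => cacheUpdater.Run(returnOnLastOffset: false));

            host.Run();
        }

        public static IWebHostBuilder CreateWebHostBuilder(string[] args) =>
            WebHost.CreateDefaultBuilder(args).UseStartup<Startup>();
    }
}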

Finally, our controller serves requests directly from the cache.
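A minimal sketch of such a controller, assuming the ProductItemCache class sketched earlier and a cache keyed by product id:

using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Caching.Memory;

namespace KafkaCache.Api.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class ProductsController : ControllerBase
    {
        private readonly IMemoryCache _cache;

        public ProductsController(IMemoryCache cache)
        {
            _cache = cache;
        }

        // GET api/products/2 is served straight from the in-memory cache, with no database call.
        [HttpGet("{id}")]
        public ActionResult<ProductItemCache> Get(int id)
        {
            if (!_cache.TryGetValue(id, out ProductItemCache product))
                return NotFound();

            return product;
        }
    }
}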

Now run the API project:

dotnet run --project KafkaCache.Api/KafkaCache.Api.csproj

And check your API:

curl -k https://localhost:5001/api/products/2
{"id":2,"name":"Blue Hat","price":17}

Let's change the price of the product with id 2 inside the MySQL container:

update products set price = 56 where id = 2;

And request your API again:

curl -k https://localhost:5001/api/products/2
{"id":2,"name":"Blue Hat","price":56}

As you can see, any change is reflected in our cache immediately!

Conclusion

Using CDC, we can reflect our database changes into Kafka in near real time.

Then we can build an in-memory cache by consuming those Kafka messages.

Now I would like to point out some of the advantages and disadvantages of this approach.

Advantages:

Mitigates three problems in implementing a cache: cache invalidation, race conditions, and warm-start performance.

Syncing database changes to cache in real time.

Faster cache warm-up thanks to sequential I/O (when reading messages from the Kafka topic).

Disadvantages:

More complexity: you need to implement a cache builder, use the Debezium connector, enable CDC for the database, and read events from the Kafka cluster.

You need to monitor Kafka, the connector, and the cache builder.

More knowledge required: new developers must learn more frameworks.

References

https://vladmihalcea.com/how-to-extract-change-data-events-from-mysql-to-kafka-using-debezium/
https://github.com/confluentinc/demo-scene/blob/master/ksql-workshop/ksql-workshop.adoc
https://martin.kleppmann.com/2015/03/04/turning-the-database-inside-out.html
https://debezium.io/docs/configuration/event-flattening/
https://www.mssqltips.com/sqlservertip/4096/understanding-how-dml-and-ddl-changes-impact-change-data-capture-in-sql-server/
https://www.reddit.com/r/compsci/comments/96hbx9/what_is_hard_about_cache_invalidation/
