Log Compacted Topics in Apache Kafka

Seyed Morteza Mousavi, Apr 12

When I began reading the Kafka documentation, log compacted topics seemed like a simple concept, but it wasn’t clear to me how Kafka internally keeps their state in the file system.

This month I had some time to read more about this feature, and I want to share my understanding with you.

TL;DR

In this article, I will describe log compacted topics in Kafka.

Then I will show you how Kafka internally keeps the states of these topics in the file system.

Prerequisites

I assume that you are already familiar with basic Apache Kafka concepts such as broker, topic, partition, consumer, and producer.

Also, if you want to run the sample commands, you need a running Kafka broker and a ZooKeeper server.

What is a Log Compacted Topic

The Kafka documentation says:

"Log compaction is a mechanism to give finer-grained per-record retention, rather than the coarser-grained time-based retention. The idea is to selectively remove records where we have a more recent update with the same primary key. This way the log is guaranteed to have at least the last state for each key."

Put simply, Kafka removes an old record when the partition log contains a newer record with the same key.
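The rule above can be sketched in a few lines of Python. This is a simplified model and not Kafka's implementation: the function name `compact` and the `(offset, key, value)` tuple layout are assumptions made for illustration, and the sketch compacts the whole log at once, which the real cleaner does not do.

```python
def compact(records):
    """Keep only the newest record for each key.

    `records` is a list of (offset, key, value) tuples in offset order.
    Hypothetical model of the compaction rule, not Kafka code.
    """
    latest_offset = {}
    for offset, key, _value in records:
        latest_offset[key] = offset  # later records overwrite earlier ones
    # Surviving records keep their original offsets and relative order.
    return [r for r in records if latest_offset[r[1]] == r[0]]


log = [(0, "p3", 10), (1, "p5", 7), (2, "p3", 11)]
print(compact(log))
```

Note that the surviving records keep their original offsets, so the compacted log can have gaps in its offset sequence.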

As an example, consider the following partition of a log compacted topic called latest-product-price:

[Figure: a partition of the log compacted topic latest-product-price]

As you see, at first there are two records with key p3.

But because it is a log compacted topic, Kafka removes the older record in a background thread (more on this in the next sections).

Now assume we have a producer that sends new records to this partition.

The producer produces 3 records with keys p6, p5, and p5, in that order:

[Figure: the partition after the three new records are appended]

Again, a background thread inside the Kafka broker removes the older records with keys p5 and p6.

Note that a compacted log is composed of two parts: a tail and a head.

Kafka makes sure that all records inside the tail have a unique key, because the tail was scanned in the previous cycle of the cleaning process.

But the head can contain records with duplicate keys.

Now that we have learned what a log compacted topic is, it’s time to create one using the kafka-topics tool.

Create a Log Compacted Topic

Create a compacted topic (I will describe all the configs in detail):

    kafka-topics --create --zookeeper zookeeper:2181 --topic latest-product-price --replication-factor 1 --partitions 1 --config "cleanup.policy=compact" --config "delete.retention.ms=100" --config "segment.ms=100" --config "min.cleanable.dirty.ratio=0.01"

Produce some records:

    kafka-console-producer --broker-list localhost:9092 --topic latest-product-price --property parse.key=true --property key.separator=:
    >p3:10$
    >p5:7$
    >p3:11$
    >p6:25$
    >p6:12$
    >p5:14$
    >p5:17$

Notice that in the above command I separated the key and the value with a colon (:).

Now consume the topic:

    kafka-console-consumer --bootstrap-server localhost:9092 --topic latest-product-price --property print.key=true --property key.separator=: --from-beginning
    p3:11$
    p6:12$
    p5:14$
    p5:17$

As you see, older records with duplicated keys are removed.

The p5:14$ record is not removed; we will see why when I describe the cleaning process.

But we must first look at how Kafka internally stores messages.

Segments

The partition log is an abstraction that allows us to easily consume ordered messages inside the partition without worrying about Kafka’s internal storage.

In reality, however, the Kafka broker divides the partition log into segments.

Segments are files stored in the file system (inside the data directory, in the directory of the partition) whose names end with .log.

In the image below, a partition log is divided into 3 segments:

[Figure: a partition log holding 7 records in 3 segment files]

As you see, we have a partition log that holds 7 records residing in 3 separate segment files.

The first offset of a segment is called the base offset of the segment.

The segment file name is always equal to its base offset value.

The last segment in the partition is called the active segment.

Only the active segment of a log can receive the newly produced messages.

We will see how Kafka behaves with the active segment in the cleaning process of a compacted log.

Returning to our example, we can view the segment files of our topic partition with the following command (assuming your Kafka data directory is /var/lib/kafka/data):

    ls /var/lib/kafka/data/latest-product-price-0/
    00000000000000000000.index      00000000000000000006.log
    00000000000000000000.log        00000000000000000006.snapshot
    00000000000000000000.timeindex  00000000000000000006.timeindex
    00000000000000000005.snapshot   leader-epoch-checkpoint
    00000000000000000006.index

00000000000000000000.log and 00000000000000000006.log are the segments of this partition, and 00000000000000000006.log is the active segment.
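The naming rule can be illustrated with a small Python sketch. It assumes only what the listing shows: a segment file is named after its base offset, zero-padded to 20 digits, with a .log extension. The helper names `segment_file_name` and `log_segments` are made up for this example.

```python
def segment_file_name(base_offset):
    """Zero-pad the base offset to 20 digits, as in the directory listing."""
    return f"{base_offset:020d}.log"


def log_segments(file_names):
    """Return the base offsets of all .log files, sorted ascending."""
    return sorted(
        int(name[: -len(".log")])
        for name in file_names
        if name.endswith(".log")
    )


files = [
    "00000000000000000000.index", "00000000000000000000.log",
    "00000000000000000000.timeindex", "00000000000000000005.snapshot",
    "00000000000000000006.index", "00000000000000000006.log",
    "00000000000000000006.snapshot", "00000000000000000006.timeindex",
    "leader-epoch-checkpoint",
]
offsets = log_segments(files)
# The segment with the highest base offset is the active segment.
active = segment_file_name(offsets[-1])
print(offsets, active)
```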

When does Kafka create a new segment?

One option is to set the segment.bytes config (default is 1GB) during topic creation. When the segment size becomes bigger than this value, Kafka creates a new segment.

Another option is to set segment.ms, as you saw earlier. With this option, when Kafka receives a produce request, it checks whether the active segment is older than the segment.ms value. If it is, Kafka creates a new segment.

In our command, we set segment.ms=100 so that a new segment is created once the active one is more than 100 milliseconds old.

An interesting point is that with segment.ms=100 you will probably have small segments. After the cleaning process (see the next section), the Kafka broker merges the non-active segments into a larger segment.
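The two rolling rules described in this section can be modeled as a single check. The sketch below is a deliberate simplification: the function name `should_roll` and its parameters are hypothetical, and the broker's real decision takes more conditions into account than size and age.

```python
def should_roll(active_size_bytes, active_age_ms, segment_ms,
                segment_bytes=1024 ** 3):
    """Decide whether a new segment should be started.

    Simplified model: roll when the active segment is larger than
    segment.bytes (1 GB by default) or older than segment.ms.
    """
    if active_size_bytes > segment_bytes:
        return True
    return active_age_ms > segment_ms


# With segment.ms=100, a small segment that is 150 ms old is rolled.
print(should_roll(active_size_bytes=512, active_age_ms=150, segment_ms=100))
```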

For more information about segments and internal storage of Kafka, you can read How Kafka’s Storage Internals Work and A Practical Introduction to Kafka Storage Internals articles.

Cleaning Process

During startup, the Kafka broker creates a number of cleaner threads responsible for cleaning compacted logs (the number of these threads is configurable through the log.cleaner.threads config).

A cleaner thread constantly tries to find the filthiest log in the broker and then tries to clean it.

For each log, it calculates the dirty ratio as follows:

    dirty ratio = number of bytes in the head / total number of bytes in the log (tail + head)

The cleaner thread then chooses the log with the highest dirty ratio.

This log is called the filthiest log, and if its dirty ratio is greater than the min.cleanable.dirty.ratio config, it will be cleaned.

Otherwise, the cleaner thread is blocked for a number of milliseconds (configurable with log.cleaner.backoff.ms).
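As a rough illustration of this selection step, here is a Python sketch. The function names and the dictionary layout (log name mapped to tail and head sizes in bytes) are assumptions for the example, and the byte counts are invented.

```python
def dirty_ratio(tail_bytes, head_bytes):
    """dirty ratio = bytes in the head / total bytes in the log."""
    total = tail_bytes + head_bytes
    return head_bytes / total if total else 0.0


def pick_filthiest(logs, min_cleanable_dirty_ratio):
    """Return the name of the log to clean, or None if nothing qualifies.

    `logs` maps a log name to a (tail_bytes, head_bytes) pair.
    """
    if not logs:
        return None
    name = max(logs, key=lambda n: dirty_ratio(*logs[n]))
    if dirty_ratio(*logs[name]) > min_cleanable_dirty_ratio:
        return name
    # In the broker, the cleaner thread would now back off for
    # log.cleaner.backoff.ms before looking again.
    return None


logs = {"orders-0": (900, 100), "latest-product-price-0": (500, 500)}
print(pick_filthiest(logs, min_cleanable_dirty_ratio=0.01))
```

With min.cleanable.dirty.ratio=0.01, as in the topic created earlier, almost any log with data in its head qualifies for cleaning.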

After finding the filthiest log we want to find the portion of the log that is cleanable.

Note that some portions of the log are not cleanable and will not be scanned:

- All records inside the active segment. That’s why we still see the duplicated p5:14$ record in our consumer.
- If you set the min.compaction.lag.ms config to a value bigger than 0, any segment that has a record with a timestamp younger than this config will not be cleaned. These segments will not be scanned for compaction.
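The two exclusions in this list can be sketched as a filter over the segments of a log. This is a hypothetical model: the dictionary fields, the function name `cleanable_segments`, and the choice to stop at the first segment that is not cleanable are assumptions for illustration, not Kafka's code.

```python
def cleanable_segments(segments, now_ms, min_compaction_lag_ms=0):
    """Return the base offsets of the leading segments that may be cleaned.

    `segments` is a list of dicts in offset order with the keys
    "base_offset", "max_timestamp_ms" and "active".
    """
    cleanable = []
    for seg in segments:
        if seg["active"]:
            break  # the active segment is never cleaned
        age_ms = now_ms - seg["max_timestamp_ms"]
        if min_compaction_lag_ms > 0 and age_ms < min_compaction_lag_ms:
            break  # holds a record younger than min.compaction.lag.ms
        cleanable.append(seg["base_offset"])
    return cleanable


segments = [
    {"base_offset": 0, "max_timestamp_ms": 1_000, "active": False},
    {"base_offset": 4, "max_timestamp_ms": 9_000, "active": False},
    {"base_offset": 6, "max_timestamp_ms": 9_500, "active": True},
]
print(cleanable_segments(segments, now_ms=10_000))
print(cleanable_segments(segments, now_ms=10_000, min_compaction_lag_ms=5_000))
```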

Now we know which records we are going to compact: those from the first record in the log up to the first record that is not cleanable.

For simplicity in this article, we assume that all records in the head are cleanable.

Note that every record in the tail section of a log has a unique key, because duplicates were removed in the last clean.

Only the head section can contain records whose keys are not unique in the log.

To find duplicate records faster, Kafka creates a map for the records in the head section.

Returning to our example, the offset map structure is something like this:

[Figure: the offset map built from the head section]

As you see, Kafka creates a structure called the offset map that holds, for each key in the head section, its corresponding offset.

If we have duplicates in the head, Kafka uses the newest offset.

In the above image, the record with key p6 is at offset 5 and the newest offset for p5 is 7.

Now the cleaner thread checks every record in the log and removes it if the offset map contains an entry with the same key and the record’s offset is different from the entry in the map (we don’t want to delete the newest records).
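A minimal Python sketch of these two passes follows. It uses a plain dictionary as the offset map, and the tail records and all values are invented for the example; only the offsets of p6 (5) and p5 (7) in the head come from the description above.

```python
def build_offset_map(head):
    """Map each key in the head to the newest offset at which it appears."""
    offset_map = {}
    for offset, key, _value in head:
        offset_map[key] = offset  # later offsets overwrite earlier ones
    return offset_map


def clean(log, offset_map):
    """Drop a record if the map has its key with a different offset."""
    return [
        (offset, key, value)
        for offset, key, value in log
        if offset_map.get(key, offset) == offset
    ]


# Hypothetical tail (unique keys) and head (may contain duplicates).
tail = [(0, "p3", 11), (1, "p5", 7), (2, "p6", 25)]
head = [(5, "p6", 12), (6, "p5", 14), (7, "p5", 17)]

offset_map = build_offset_map(head)
print(offset_map)
print(clean(tail + head, offset_map))
```

The record with key p3 survives because its key never appears in the head, so the offset map has no entry for it.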

During the cleaning process of a compacted log, not only are duplicated messages removed, but Kafka also removes records that have a null value.

These records are called tombstones.

You can delay their removal by setting the delete.retention.ms config.

With this config set, Kafka checks the modification timestamp of the segment that contains the record, and if the time since that modification is less than the config value, the record is retained.
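The tombstone rule can be written as a small predicate. This is a sketch of the rule as described here, with a hypothetical function name and parameters; it is not taken from the Kafka source.

```python
def retain_record(value, segment_modified_ms, now_ms, delete_retention_ms):
    """Decide whether the tombstone rule lets a record stay in the log.

    A record with a non-null value is not a tombstone, so this rule
    never removes it. A tombstone (value None) is kept only while its
    segment was modified less than delete.retention.ms ago.
    """
    if value is not None:
        return True
    return now_ms - segment_modified_ms < delete_retention_ms


# With delete.retention.ms=100, a tombstone in a segment modified
# 50 ms ago is kept; one in a segment modified 200 ms ago is removed.
print(retain_record(None, 1_000, 1_050, 100))
print(retain_record(None, 1_000, 1_200, 100))
```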

Now the log is clean.

After this cleaning process, we have a new tail and a new head!

The last offset that was scanned for cleaning (in our example, the last record in the old head) is the last offset of the new tail.

Kafka keeps the start offset of the new head in a file named cleaner-offset-checkpoint in the root of the data directory.

This file is used for the next cleaning cycle for the log.

We can view our topic checkpoint file:

    cat /var/lib/kafka/data/cleaner-offset-checkpoint
    0
    1
    latest-product-price 0 6

As you see, there are three lines.

The first line is the version of the file (I think for backward compatibility), the second line has value 1 which shows how many lines will follow this line (just one line), and the last line contains the name of compacted log topic, the partition number, and the head offset of this partition.
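The layout described above is simple enough to parse by hand. The following Python sketch assumes exactly the three-part format shown (version, entry count, then one "topic partition offset" line per entry); `parse_checkpoint` is a made-up helper and not part of any Kafka tooling.

```python
def parse_checkpoint(text):
    """Parse the contents of a cleaner-offset-checkpoint file.

    Returns (version, entries), where entries maps a
    (topic, partition) pair to the start offset of the head.
    """
    lines = text.splitlines()
    version = int(lines[0])
    count = int(lines[1])
    entries = {}
    for line in lines[2:2 + count]:
        topic, partition, offset = line.split()
        entries[(topic, int(partition))] = int(offset)
    return version, entries


content = "0\n1\nlatest-product-price 0 6\n"
print(parse_checkpoint(content))
```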

Conclusion

In this article, I showed you what a log compacted topic is, how it is stored, and how Kafka cleans it periodically.

In the end, I want to point out that log compaction is great for caching scenarios where you want to keep just the latest value for each key in near real time.

Assume you want to build your cache at the startup of your application.

You can just read your compacted topic and build your cache, and because Kafka reads messages sequentially, it is much faster than warming your cache using a SQL database.

You can read more about this technique in Martin Kleppmann’s Turning the database inside-out article.

You may also find my previous article, Beat Cache Invalidation in ASP.NET Core Using Kafka and Debezium, useful; it is an implementation of this technique.

References

https://github.com/apache/kafka/
https://thehoard.blog/how-kafkas-storage-internals-work-3a29b02e026
https://medium.com/@durgaswaroop/a-practical-introduction-to-kafka-storage-internals-d5b544f6925f
https://kafka.apache.org/documentation/
