ZooKeeper’s zNodes provide a great way to share a small cache across multiple running instances of the same application.

Let’s look into using Kafka’s Log Compaction feature for the same purpose.

Here’s some official reading on the subject: https://kafka.apache.org/documentation/#compaction.

And here’s a docker-compose file to get up and running quickly with the Confluent 4.0 platform, the latest release as of this writing.

  • Copy the following content into a file called docker-compose.yml
  • Execute the following command from the same directory: docker-compose up -d
version: "2"

services:
  zookeeper:
    container_name: zookeeper
    image: confluentinc/cp-zookeeper:4.0.0
    ports:
      - "2181:2181"
    hostname: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    container_name: kafka
    image: confluentinc/cp-kafka:4.0.0
    hostname: kafka
    ports:
      - "9092"
      - "9091:9091"
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      ADVERTISED_HOST_NAME: "kafka"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"
      KAFKA_ADVERTISED_LISTENERS: "EXTERNAL://localhost:9091,PLAINTEXT://kafka:9092"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_DELETE_RETENTION_MS: 5000
      KAFKA_LOG_CLEANER_DELETE_RETENTION_MS: 5000
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1      

  schema_registry:
    container_name: schema_registry
    image: confluentinc/cp-schema-registry:4.0.0
    ports:
      - "8081:8081"
    links:
      - zookeeper
      - kafka
    depends_on:
      - zookeeper
      - kafka
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_HOST_NAME: schema_registry

  kafka_connect:
    container_name: kafka_connect
    image: confluentinc/cp-kafka-connect:4.0.0
    ports:
      - "8083:8083"
    links:
      - zookeeper
      - kafka
      - schema_registry
    depends_on:
      - zookeeper
      - kafka
      - schema_registry
    environment:
      # networking
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka_connect"
      CONNECT_REST_PORT: "8083"
      # kafka
      CONNECT_GROUP_ID: "kc"
      CONNECT_CONFIG_STORAGE_TOPIC: "kc-config"
      CONNECT_OFFSET_STORAGE_TOPIC: "kc-offset"
      CONNECT_STATUS_STORAGE_TOPIC: "kc-status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      # convertors
      CONNECT_KEY_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema_registry:8081"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema_registry:8081"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"

*Note: this docker-compose file uses some nifty listener settings that enable traffic to Kafka from both inside and outside the Docker network.*
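To sanity-check both paths once the containers are up, something like the following should work (assuming the Kafka CLI tools are installed on the host; kafka-broker-api-versions is a standard Kafka tool that simply asks a broker which API versions it supports):

```shell
# From the host (outside the Docker network), via the EXTERNAL listener
# advertised as localhost:9091:
kafka-broker-api-versions --bootstrap-server localhost:9091

# From inside the Docker network, via the PLAINTEXT listener
# advertised as kafka:9092:
docker exec kafka kafka-broker-api-versions --bootstrap-server kafka:9092
```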

Now let’s create a compacted topic. Here I am fiddling with values such as delete.retention.ms and min.cleanable.dirty.ratio just to make compaction kick in quickly for the example. For production these values would have to be tuned.

Execute this from the terminal:

kafka-topics --zookeeper localhost:2181 \
  --create --topic cache.topic \
  --config "cleanup.policy=compact" \
  --config "delete.retention.ms=100" \
  --config "segment.ms=100" \
  --config "min.cleanable.dirty.ratio=0.01" \
  --partitions 1 \
  --replication-factor 1
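To verify the topic and its compaction settings, describe it (assuming, as above, the Kafka CLI tools are on your PATH):

```shell
kafka-topics --zookeeper localhost:2181 \
  --describe --topic cache.topic
# the Configs column should include cleanup.policy=compact,
# delete.retention.ms=100, segment.ms=100, min.cleanable.dirty.ratio=0.01
```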

Now let’s send a few messages to the topic, using the same key for each message.

Read more here about how Kafka’s background log cleaner removes records with duplicate keys from the log:

“Log compaction is handled by the log cleaner, a pool of background threads that recopy log segment files, removing records whose key appears in the head of the log”

https://kafka.apache.org/documentation/#design_compactiondetails

This requires us to pass special properties using the --property flag of the Kafka Console Producer:

  • parse.key=true
  • key.separator=:
for i in $(seq 1 10); do \
  echo "sameKey123:differentMessage$i" | kafka-console-producer \
  --broker-list localhost:9091 \
  --topic cache.topic \
  --property "parse.key=true" \
  --property "key.separator=:"; \
done

# the producer prints nothing, but the messages sent will be: `differentMessage1 .. differentMessage10`
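To confirm the records actually landed, you can check the log-end offset with GetOffsetShell (a standard Kafka tool; note that compaction changes which records remain, not the offsets themselves):

```shell
# prints topic:partition:latest-offset for each partition
kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list localhost:9091 \
  --topic cache.topic \
  --time -1
```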

Now let’s check out the results:

kafka-console-consumer --bootstrap-server localhost:9091 \
  --topic cache.topic \
  --property print.key=true \
  --property key.separator=" : " \
  --from-beginning

I see:

sameKey123 : differentMessage9
sameKey123 : differentMessage10

Our bash for loop ran 10 times, but only entries 9 and 10 are present on the topic. Messages 1 through 8 have been cleaned up by the log cleaner.

I expected to see ONLY the latest message, not the last two. (Most likely this is because the log cleaner never compacts the active segment, so the records at the head of the log can still contain duplicate keys.)

Well, this is only the “hello world” of Log Compaction, and the settings would need tuning based on more research and testing to ensure a reliable cache.

But my reaction for now: pause and consider that each application using a compacted Kafka topic as a cache may read the cache and see the same key twice (this is what happened in the example above).
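One way to guard against this on the read side is to treat the topic as a change-log and materialize it into a map where the last value per key wins. As a quick illustration (plain awk, nothing Kafka-specific; it assumes the " : " key separator used by the consumer command above), consumer output could be piped through something like:

```shell
# keep only the last value seen for each key (last-write-wins),
# mimicking what a consumer materializing the cache should do
dedup_last_write_wins() {
  awk -F' : ' '{ last[$1] = $2 } END { for (k in last) print k " : " last[k] }'
}

# example with the duplicate keys from above:
printf 'sameKey123 : differentMessage9\nsameKey123 : differentMessage10\n' \
  | dedup_last_write_wins
# prints: sameKey123 : differentMessage10
```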

But that is topic-tuning and some unit tests away. Very cool to see the potential of using Kafka as a distributed systems cache.

Unanswered: ZooKeeper provides reentrant locking on zNodes, preventing cache-update race conditions. How would the group arrive at a consensus with no guarantee of resource locking?