ZooKeeper’s zNodes provide a great way to share a small cache across multiple running instances of the same application.

Let’s look into using Kafka’s Log Compaction feature for the same purpose.

Here’s some official reading on the subject: https://kafka.apache.org/documentation/#compaction.

And here’s a docker-compose file to get up and running quickly with the Confluent 4.0 platform, which is the latest as of this writing.

  • Copy the following content into a file called docker-compose.yml
  • Execute the following command from the same directory: docker-compose up -d
version: "2"

services:
  zookeeper:
    container_name: zookeeper
    image: confluentinc/cp-zookeeper:4.0.0
    ports:
      - "2181:2181"
    hostname: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    container_name: kafka
    image: confluentinc/cp-kafka:4.0.0
    hostname: kafka
    ports:
      - "9092"
      - "9091:9091"
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      ADVERTISED_HOST_NAME: "kafka"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"
      KAFKA_ADVERTISED_LISTENERS: "EXTERNAL://localhost:9091,PLAINTEXT://kafka:9092"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_DELETE_RETENTION_MS: 5000
      KAFKA_LOG_CLEANER_DELETE_RETENTION_MS: 5000
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  schema_registry:
    container_name: schema_registry
    image: confluentinc/cp-schema-registry:4.0.0
    ports:
      - "8081:8081"
    links:
      - zookeeper
      - kafka
    depends_on:
      - zookeeper
      - kafka
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_HOST_NAME: schema_registry

  kafka_connect:
    container_name: kafka_connect
    image: confluentinc/cp-kafka-connect:4.0.0
    ports:
      - "8083:8083"
    links:
      - zookeeper
      - kafka
      - schema_registry
    depends_on:
      - zookeeper
      - kafka
      - schema_registry
    environment:
      # networking
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka_connect"
      CONNECT_REST_PORT: "8083"
      # kafka
      CONNECT_GROUP_ID: "kc"
      CONNECT_CONFIG_STORAGE_TOPIC: "kc-config"
      CONNECT_OFFSET_STORAGE_TOPIC: "kc-offset"
      CONNECT_STATUS_STORAGE_TOPIC: "kc-status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      # converters
      CONNECT_KEY_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema_registry:8081"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema_registry:8081"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"

*Note: this docker-compose file uses some nifty settings that enable traffic to Kafka from both inside and outside the Docker network.*
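Once the containers are up, it’s worth a quick sanity check that the broker is reachable from your host. A minimal check, assuming the Confluent CLI tools are installed locally (the commands later in this post assume the same):

kafka-topics --zookeeper localhost:2181 --list

If this returns without errors, the stack is ready.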

Now let’s create a compacted topic. Here I am using deliberately aggressive values for settings such as delete.retention.ms, segment.ms, and min.cleanable.dirty.ratio so that compaction kicks in quickly enough to demonstrate the point. For production these values would have to be tuned.

Execute this from the terminal:

kafka-topics --zookeeper localhost:2181 \
  --create --topic cache.topic \
  --config "cleanup.policy=compact" \
  --config "delete.retention.ms=100" \
  --config "segment.ms=100" \
  --config "min.cleanable.dirty.ratio=0.01" \
  --partitions 1 \
  --replication-factor 1
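
To confirm the topic was created with the compaction settings applied, you can describe it:

kafka-topics --zookeeper localhost:2181 --describe --topic cache.topic

The output should show cleanup.policy=compact along with the overridden retention and segment settings.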

Now let’s send a few messages to the topic, using the same key for each message.

Read more here about the Kafka background process that removes records with duplicate keys from the log:

Log compaction is handled by the log cleaner, a pool of background threads that recopy log segment files, removing records whose key appears in the head of the log

https://kafka.apache.org/documentation/#design_compactiondetails
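
As a simplified sketch of the effect (real compaction rewrites whole segment files and never touches the active segment, but the end result per key looks like this):

before compaction:
  offset 0: k1 -> v1
  offset 1: k1 -> v2
  offset 2: k2 -> v1
  offset 3: k1 -> v3

after compaction (only the latest value per key survives; offsets are preserved, never renumbered):
  offset 2: k2 -> v1
  offset 3: k1 -> v3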

This requires us to pass special properties to the Kafka Console Producer using the --property flag:

  • parse.key=true
  • key.separator=:
for i in $(seq 1 10); do \
  echo "sameKey123:differentMessage$i" | kafka-console-producer \
  --broker-list localhost:9091 \
  --topic cache.topic \
  --property "parse.key=true" \
  --property "key.separator=:"; \
done

# the producer prints no output; the messages sent are differentMessage1 .. differentMessage10

Now let’s check out the results:

kafka-console-consumer --bootstrap-server localhost:9091 \
  --topic cache.topic \
  --property print.key=true \
  --property key.separator=" : " \
  --from-beginning

I see:

sameKey123 : differentMessage9
sameKey123 : differentMessage10

Our bash for loop ran 10 times, but only entries 9 and 10 are present on the topic. Messages 1-8 have been cleaned up by the log cleaner.

I was expecting to see ONLY the latest message, not the last two. (This is likely because the log cleaner never compacts the active log segment, so the head of the log can still contain an older record for the same key.)

Well, this is only the “hello world” of Log Compaction and the settings would need tuning based on more research and testing to ensure a reliable cache.
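
For what it’s worth, topic-level settings can be adjusted in place with kafka-configs, without recreating the topic. The values below are placeholders to illustrate the mechanism, not recommendations:

kafka-configs --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name cache.topic \
  --add-config "min.cleanable.dirty.ratio=0.5,delete.retention.ms=60000"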

But my reaction for now: pause and consider that each application using a compacted Kafka topic as a cache may encounter a situation where it reads the cache and sees the same key twice (this is what happened in the example above).
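
One way to cope: treat duplicates as expected and deduplicate on the client side when rebuilding the cache, keeping only the last value seen for each key. A minimal sketch using the console consumer and awk (--timeout-ms makes the consumer exit once the topic is drained; a real application would do this with a consumer in its own language):

kafka-console-consumer --bootstrap-server localhost:9091 \
  --topic cache.topic \
  --property print.key=true \
  --property key.separator=":" \
  --from-beginning \
  --timeout-ms 5000 2>/dev/null |
awk -F":" '{latest[$1]=$2} END {for (k in latest) print k" : "latest[k]}'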

But that is topic-tuning and some unit tests away. Very cool to see the potential of using Kafka as a distributed systems cache.

Unanswered: ZooKeeper provides reentrant locking on zNodes, preventing cache-update race conditions. How would the group arrive at a consensus with no guarantee of resource locking?