This will be a bit of a free form post.

As I said in my last post I am looking to set up some Kafka infrastructure on my own terms and really get to know how it works.

So I decided to start at the “front” of a Kafka pipeline, by making a Producer of messages.

I used my Scalatra Starter Pack to create a quick, http mini-service that accepts JSON and publishes that input as Kafka messages.

I started checking out the Class ProducerRecord docs on the Apache site, and noticed that it is possible to publish Kafka records both with a key, and without a key. One use case for a key would be a unique ID, or some type of numbering system so consumers can organize the records. Useful, but not required.

So here’s a super basic Message class and Kafka Producer wrapper class written in Scala, that allows the user to send a message with a key or without:

package com.lombardo.app.kafka

import java.util.Properties
import org.apache.kafka.clients.producer._

// key is an Option type, defaulted to a None
case class Message(topic: String, key: Option[String] = None, content: String)

class MessengerService {
  val props = new Properties()

  props.put("bootstrap.servers", "YOUR_KAFKA_HOST:AND_PORT")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)

  def send(msg: Message) = {
    val record = msg match {
      case Message(_, Some(_), _) => new ProducerRecord(msg.topic, msg.key.get, msg.content)
      case _ => new ProducerRecord[String, String](msg.topic, msg.content)
    }

    producer.send(record)
  }
}

An Scalatra endpoint could then be written like this:

post("/messages") {
  val message = parsedBody.extract[Message]
  log.info(message.toString)

  messengerService.send(message)
}

and it could accept JSON in either format, not caring either way:

{
  "topic": "readings",
  "key": "3214-aefcb",
  "content": "-20"
}

// or

{
  "topic": "readings",
  "content": "-30"
}

I added my mini-producer service to this docker-compose.yml file: https://github.com/lombardo-chcg/kafka-local-stack/blob/ch1/docker-compose.yml#L71

Let’s start up the stack and confirm it works. docker-compose up -d

Send the POST requests mentioned above to localhost:8080/messages

curl -X POST \
  http://localhost:8080/messages \
  -H 'content-type: application/json' \
  -d '{
    "topic": "readings",
    "key": "3214-aefcb",
    "content": "-20"
  }'

curl -X POST \
  http://localhost:8080/messages \
  -H 'content-type: application/json' \
  -d '{
    "topic": "readings",
    "content": "-30"
  }'  

Then fire up a console consumer:

docker exec -it kafka-cli bash

./kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --topic readings \
  --from-beginning \
  --property print.key=true \
  --property key.separator=": "

  # 3214-aefcb: -20
  # null: -30