Last time we saw the basic config settings needed to stand up an instance of Kafka Connect. Now let’s see it in action.

The goal:

  1. Produce messages to a readings topic
  2. Set up Kafka Connect to consume from this topic and write to a local file

Reference Repo: https://github.com/lombardo-chcg/kafka-local-stack/tree/ch3

Step one: Start containers

git clone https://github.com/lombardo-chcg/kafka-local-stack.git
cd kafka-local-stack
git checkout ch3
docker-compose up -d
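
A quick sanity check that all the containers came up (docker-compose ps lists each service and its state):

docker-compose ps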

1) Note that the Kafka Connect container has a volume mounted consisting of an empty text file. This is the “sink” Kafka Connect will use in our example. In the real world this would be a data store like Elasticsearch.

volumes:
  - ./readings:/tmp/readings
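
To confirm the mount from inside the container, list the directory (the service name connect here is an assumption; substitute whatever the repo’s docker-compose.yml actually calls the Kafka Connect service):

# "connect" is an assumed service name -- check docker-compose.yml
docker-compose exec connect ls /tmp/readings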

2) Also note that we are using the basic StringConverter to translate messages between Kafka and Kafka Connect.

CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"

Step two: Create a Kafka Connect job

This is where Kafka Connect shows its worth: deploying a new connector requires zero code. We will use the official FileStreamSinkConnector, created by the Kafka Connect team, and simply pass it a configuration.

We create the new connector by sending a JSON payload to a REST endpoint. Make this request using Postman (or with curl, as shown after the payload):

POST http://localhost:8083/connectors

{
	"name": "readings-sink",
	"config": {
		"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector",
		"tasks.max":"1",
		"topics":"readings",
		"file": "/tmp/readings/data.txt"
	}
}
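
If you prefer the terminal to Postman, the same request works with curl (Kafka Connect’s REST API expects a JSON Content-Type header):

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "readings-sink",
    "config": {
      "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
      "tasks.max": "1",
      "topics": "readings",
      "file": "/tmp/readings/data.txt"
    }
  }'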

Info on our new connector is available via the REST API:

  • http://localhost:8083/connectors/readings-sink
  • http://localhost:8083/connectors/readings-sink/status
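
For example, curling the status endpoint should report the connector and its single task as RUNNING. The response will look roughly like this (exact fields vary a bit by Kafka version):

curl http://localhost:8083/connectors/readings-sink/status

{"name":"readings-sink","connector":{"state":"RUNNING", ...},"tasks":[{"id":0,"state":"RUNNING", ...}],"type":"sink"}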

Step three: Publish messages to the readings topic

The demo Docker network contains a homemade “rest_producer” which takes a JSON payload and publishes it to Kafka as a ProducerRecord(String topic, String key, String value).

Sample message:

POST http://localhost:8080/messages

{
  "topic": "readings",
  "key": "52",
  "content": "looking at the trees"
}
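
Or as a curl one-liner (assuming the rest_producer, like most REST services, expects a JSON Content-Type header):

curl -X POST http://localhost:8080/messages \
  -H "Content-Type: application/json" \
  -d '{"topic": "readings", "key": "52", "content": "looking at the trees"}'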

We’ll use a bash script which sends messages to our rest_producer in a loop:

./create_readings.sh
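
The real script lives in the repo; here is a minimal sketch of the idea, assuming the rest_producer endpoint above (the key and content values are illustrative, not the repo’s exact script):

#!/usr/bin/env bash
# send one reading per second to the rest_producer, forever
while true; do
  curl -s -X POST http://localhost:8080/messages \
    -H "Content-Type: application/json" \
    -d "{\"topic\": \"readings\", \"key\": \"$RANDOM\", \"content\": \"reading at $(date +%s)\"}"
  sleep 1
done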

Reminder: we are using the StringConverter to translate messages between Kafka and Kafka Connect. This is because our rest_producer, while accepting JSON, currently publishes to Kafka as plain Strings.

Step four: Glory

In a separate terminal window, tail the local file where Kafka Connect is writing the data:

tail -f readings/data.txt
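
The FileStreamSinkConnector writes each record’s value as a plain line of text, so with the sample message from step three the output would look along the lines of:

looking at the trees
looking at the trees
looking at the trees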