kafka connect in action, part 2
Up until now, I have focused mainly on Kafka Connect's Sink connectors. Let's set up a quick data pipeline to explore a Source connector.
For this example, I'll be using a pre-populated Postgres Docker image from a previous post:
High Level Overview
- Use Kafka Connect to read data from a Postgres DB source that has multiple tables into distinct Kafka topics
- Use Kafka Connect to write that PG data to a sink (we'll use a file sink in this example)
Setup
mkdir kafka-connect-source-example
cd kafka-connect-source-example/
mkdir data
touch data/data.txt
touch docker-compose.yml
Inside docker-compose.yml:
(Yes, I know I am many versions behind on the Confluent Platform at 3.2.1. I'm more interested in the concepts… I'll save the upgrade for another time.)
version: "2"
services:
  zookeeper:
    container_name: zookeeper
    image: confluentinc/cp-zookeeper:3.2.1
    ports:
      - "2181:2181"
    hostname: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    container_name: kafka
    image: confluentinc/cp-kafka:3.2.1
    hostname: kafka
    ports:
      - "9092"
      - "9091:9091"
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      ADVERTISED_HOST_NAME: "kafka"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"
      KAFKA_ADVERTISED_LISTENERS: "EXTERNAL://localhost:9091,PLAINTEXT://kafka:9092"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  schema_registry:
    container_name: schema_registry
    image: confluentinc/cp-schema-registry:3.2.1
    ports:
      - "8081:8081"
    links:
      - zookeeper
      - kafka
    depends_on:
      - zookeeper
      - kafka
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_HOST_NAME: schema_registry
  kafka_connect:
    container_name: kafka_connect
    image: confluentinc/cp-kafka-connect:3.2.1
    ports:
      - "8083:8083"
    links:
      - zookeeper
      - kafka
      - schema_registry
      - postgres
    depends_on:
      - zookeeper
      - kafka
      - schema_registry
    environment:
      # networking
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka_connect"
      CONNECT_REST_PORT: "8083"
      # kafka
      CONNECT_GROUP_ID: "kc"
      CONNECT_CONFIG_STORAGE_TOPIC: "kc-config"
      CONNECT_OFFSET_STORAGE_TOPIC: "kc-offset"
      CONNECT_STATUS_STORAGE_TOPIC: "kc-status"
      # converters
      CONNECT_KEY_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema_registry:8081"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema_registry:8081"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    volumes:
      - ./data:/tmp/data
  postgres:
    image: lombardo/postgres-scrabble-helper:latest
    container_name: postgres
    ports:
      - "5431:5432"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=scrabble_helper
Start Your Containers
docker-compose up -d
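Once the containers are up, it's worth confirming that the Connect worker is actually ready before registering any connectors. A quick sanity check (nothing here is specific to this example beyond the port mappings above; Connect can take a minute to come up):

# confirm all five containers are running
docker-compose ps
# the Connect REST API answers on the mapped port 8083;
# listing the available connector plugins is a cheap readiness check
curl -s localhost:8083/connector-plugins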
Source Connector
Let's confirm the data is in Postgres:
docker exec -it postgres psql -U postgres
# psql session
\connect scrabble_helper
\d
We see the greetings and words tables (along with the *_id_seq sequences that PG auto-creates).
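If you want to peek at the rows themselves without an interactive session, psql can run a one-off query. A quick sketch, using the same credentials as in the compose file:

# print the seeded greetings rows
docker exec -it postgres psql -U postgres -d scrabble_helper -c 'select * from greetings;'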
Now we need to set up a Kafka Connector to read from these tables.
The config is pretty standard. I copped mine from the Confluent tutorial, but there are a few modifications for our use case.
POST localhost:8083/connectors
{
  "name": "postgres.connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://postgres:5432/scrabble_helper",
    "connection.user": "postgres",
    "connection.password": "postgres",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "postgres.sourced."
  }
}
(note the connection.url, which specifies a postgres driver type, the PG address on the Docker network, and the name of our database)
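If you're not using a REST client like Postman, plain curl works just as well. A sketch, assuming the JSON above has been saved to a file named source-connector.json (my name, not anything standard):

# register the JDBC source connector with the Connect worker
curl -s -X POST \
  -H "Content-Type: application/json" \
  --data @source-connector.json \
  localhost:8083/connectors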
Let's check ZooKeeper to see if our new topics exist:
# OSX TIP: `brew install kafka` for quick access to the cli.
kafka-topics --zookeeper localhost:2181 --list
# ...
# postgres.sourced.greetings
# postgres.sourced.words
Our new topics are there. Let’s consume from one to be sure it has messages:
kafka-console-consumer --bootstrap-server localhost:9091 --topic postgres.sourced.greetings --from-beginning
There should be 4 messages. The output will look jumbled because the messages are Avro-encoded.
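To see the records decoded rather than jumbled, the Confluent images ship with kafka-avro-console-consumer. A sketch, run from inside the schema_registry container (assuming the script is on that image's PATH):

# decode the Avro records using the schema registry
docker exec -it schema_registry kafka-avro-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic postgres.sourced.greetings \
  --property schema.registry.url=http://schema_registry:8081 \
  --from-beginning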
Schemas
Speaking of Avro, let’s check the schema registry.
There we'll find the auto-generated schemas for our PG tables. PG makes this easy, as each column provides a type and the data structure is inherently flat.
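A quick way to see what was registered is to list the subjects via the Schema Registry's REST API:

# expect to see <topic>-value subjects for the sourced topics
curl -s localhost:8081/subjects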
Here’s the inferred schema for the Greetings Table: http://localhost:8081/subjects/postgres.sourced.greetings-value/versions/latest
{
  "type": "record",
  "name": "greetings",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "language",
      "type": "string"
    },
    {
      "name": "content",
      "type": "string"
    },
    {
      "name": "create_date",
      "type": {
        "type": "long",
        "connect.version": 1,
        "connect.name": "org.apache.kafka.connect.data.Timestamp",
        "logicalType": "timestamp-millis"
      }
    }
  ],
  "connect.name": "greetings"
}
There's also a connect.name field that seems to have been added automatically as metadata, which is interesting.
Ok, so now our data is in Kafka and conforms to a schema. Let’s set up a sink.
Sink
We'll use the basic Apache FileStreamSinkConnector config for a local File Connector:
POST localhost:8083/connectors
{
  "name": "greetings.sink",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": "1",
    "topics": "postgres.sourced.greetings",
    "file": "/tmp/data/data.txt"
  }
}
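Before tailing the file, it can be reassuring to confirm that both connectors and their tasks report as RUNNING via the Connect REST API:

# list registered connectors, then inspect the sink's state
curl -s localhost:8083/connectors
curl -s localhost:8083/connectors/greetings.sink/status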
In your terminal:
tail -f data/data.txt
There we should see our 4 records, which have completed their journey from PG to a local sink.
–
Let’s watch it in real time. In one terminal pane, tail -f data/data.txt
In another,
docker exec -it postgres psql -U postgres
\connect scrabble_helper
insert into greetings (language, content) values ('Czech', 'Ahoj');
Records are now showing up in the file moments after being added to Postgres.
A basic but real-time data pipeline!
–
Note that delete from greetings where id=3; does not show up in the topic, and therefore not in the sink. So we are not streaming updates from the database, merely appends. I may explore another mode for this Kafka Connect JDBC connector at a later time, which may allow for streaming updates.
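For reference, that other mode would likely be the connector's timestamp-based one. A sketch of what the config could look like, assuming the table had an updated_at-style timestamp column that gets touched on every update (greetings here only has create_date, so this is purely illustrative, and deletes would still not be captured):

# hypothetical: register a second source connector in timestamp+incrementing mode
curl -s -X POST -H "Content-Type: application/json" localhost:8083/connectors -d '{
  "name": "postgres.connector.timestamp",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://postgres:5432/scrabble_helper",
    "connection.user": "postgres",
    "connection.password": "postgres",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "updated_at",
    "incrementing.column.name": "id",
    "topic.prefix": "postgres.sourced.ts."
  }
}'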