kafka connect in action, part 3
Make sure to follow this example first to set up a Docker environment for this walkthrough.
High Level Overview
- Set up Kafka Connect so that updates to existing rows in a Postgres source table are put into a topic (aka set up an event stream representing changes to a PG table)
- Use Kafka Connect to write that PG data to a local sink
Start Containers
I repeat: make sure to follow this example for the docker-compose config.
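Once that docker-compose file is in place, bringing everything up should look roughly like this:

```bash
# start Postgres, Kafka, and Kafka Connect in the background
# (uses the docker-compose.yml from the earlier post in this series)
docker-compose up -d
```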
Create a new Postgres Table with timestamp trigger
We will need to set up a Postgres table that automatically updates a last_modified column with the current timestamp every time a change is made.
Then, we will tell our Kafka Connect Connector to pay attention to that specific column.
Enter a psql session inside the running PG container:
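Something like the following should work; the container name and user are assumptions based on a typical Postgres compose service, so adjust them to match the compose file from the earlier post:

```bash
# open an interactive psql session inside the running Postgres container
docker exec -it postgres psql -U postgres
```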
Inside the psql session:
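Here is a sketch of what that could look like. The table name (stream_test) and its columns are made up for this walkthrough; the important parts are the last_modified column and the trigger that keeps it current:

```sql
-- table with an auto-maintained last_modified timestamp
CREATE TABLE stream_test (
  id            SERIAL PRIMARY KEY,
  name          TEXT,
  deleted       BOOLEAN DEFAULT false,        -- boolean flag used for the soft-delete example later
  last_modified TIMESTAMP DEFAULT now()
);

-- trigger function that stamps last_modified on every UPDATE
CREATE OR REPLACE FUNCTION update_last_modified()
RETURNS TRIGGER AS $$
BEGIN
  NEW.last_modified = now();
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER set_last_modified
BEFORE UPDATE ON stream_test
FOR EACH ROW
EXECUTE PROCEDURE update_last_modified();

-- seed a row so there is something to stream
INSERT INTO stream_test (name) VALUES ('hello');
```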
Set up Kafka Source Connector for our new table
Here we set the JDBC connector mode as timestamp+incrementing. More details here: https://docs.confluent.io/current/connect/connect-jdbc/docs/source_connector.html#incremental-query-modes
Take note of our key values: connection.url, timestamp.column.name, and incrementing.column.name, which contain data specific to this example.
POST http://localhost:8083/connectors
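A sketch of that request is below. The connector name, credentials, table name, and topic prefix are assumptions specific to this example; mode, timestamp.column.name, and incrementing.column.name are the settings this post is actually about:

```bash
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "jdbc-source-stream-test",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "connection.url": "jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres",
      "mode": "timestamp+incrementing",
      "timestamp.column.name": "last_modified",
      "incrementing.column.name": "id",
      "table.whitelist": "stream_test",
      "topic.prefix": "postgres_"
    }
  }'
```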
Confirm new topic created
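One way to check is to list topics from inside the Kafka broker container. The container name is an assumption here, and the exact flags depend on your Kafka version (older tooling uses --zookeeper instead of --bootstrap-server):

```bash
# list topics; with topic.prefix "postgres_" we expect to see postgres_stream_test
docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --list
```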
Set up sink connector
First, let’s set up a “sink” file and tail it (recall that the directory for this file is mounted into the Kafka Connect container via the docker-compose file):
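For example, from the host in the project directory:

```bash
# create the output file in the mounted directory and watch it for new records
touch data/streaming_output.txt
tail -f data/streaming_output.txt
```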
Here’s the Sink Connector config which needs to be posted to Kafka Connect:
POST http://localhost:8083/connectors
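A sketch of that config, sent the same way as the source connector. The container-side file path and the topic name are assumptions; they need to match your docker-compose mount and the source connector’s topic.prefix plus table name:

```bash
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "file-sink-stream-test",
    "config": {
      "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
      "file": "/data/streaming_output.txt",
      "topics": "postgres_stream_test"
    }
  }'
```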
After a few seconds, the record should show up in the window where you ran tail -f data/streaming_output.txt.
Profit
Now we can make updates to our existing Postgres records and the updates will “stream” out of the database through Kafka Connect and into a topic.
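For example, back in the psql session (using the table sketched earlier):

```sql
-- any UPDATE bumps last_modified via the trigger, so the connector picks it up
UPDATE stream_test SET name = 'hello again' WHERE id = 1;
```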
The record should come across to our file!
Use case? We can store all the database changes as they happen and then play them back later, e.g. for disaster recovery. This is an alternative to, say, taking a whole snapshot of the database.
We can also soft-delete a record and watch the update come through the data pipeline and into our sink:
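Again assuming the example table from earlier, a soft delete is just another UPDATE:

```sql
-- soft delete: flip a boolean flag instead of removing the row
UPDATE stream_test SET deleted = true WHERE id = 1;
```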
Note that if we issue an actual SQL DELETE command in our psql session, that change does not come across the topic. For that we would apparently need more specialized “CDC” (Change Data Capture) software. I’ll save that for another time. Also, in my experience it is fairly uncommon to actually “DELETE” data from a database; it is more common to soft-delete it using a boolean flag.