High-Level Overview
- Set up Kafka Connect so that updates to existing rows in a Postgres
source table are put into a topic
- (aka set up an event stream representing changes to a PG table)
- Use Kafka Connect to write that PG data to a local file
Create a new Postgres Table with timestamp trigger
We will need to set up a Postgres table that automatically updates a
last_modified column with the current timestamp every time a change is made.
Then, we will tell our Kafka Connect Connector to pay attention to that specific column.
psql session inside the running PG container:
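The session itself is not reproduced here, so below is a minimal sketch of the kind of statements it would contain, built as a Python string that can be pasted into psql. Only the last_modified column comes from the text above; the table name kafka_demo, the other columns, and the function and trigger names are assumptions made for illustration.

```python
def build_trigger_ddl(table: str = "kafka_demo") -> str:
    """Return DDL for a table whose last_modified column is refreshed on UPDATE.

    The table name, the id/name/is_deleted columns, and the function and
    trigger names are hypothetical; only last_modified comes from the text.
    """
    return f"""
CREATE TABLE {table} (
    id            SERIAL PRIMARY KEY,
    name          TEXT NOT NULL,
    is_deleted    BOOLEAN NOT NULL DEFAULT FALSE,
    last_modified TIMESTAMP NOT NULL DEFAULT NOW()
);

-- Overwrite last_modified with the current time on every row update.
CREATE FUNCTION {table}_touch() RETURNS TRIGGER AS $$
BEGIN
    NEW.last_modified = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER {table}_touch_trg
BEFORE UPDATE ON {table}
FOR EACH ROW EXECUTE PROCEDURE {table}_touch();
""".strip()


if __name__ == "__main__":
    # Print the statements so they can be pasted into the psql session.
    print(build_trigger_ddl())
```

The trigger fires BEFORE UPDATE so that it can modify the row being written. The serial id column is included because the timestamp+incrementing mode also needs a strictly increasing column.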
Set up Kafka Source Connector for our new table
Here we set the JDBC connector mode to timestamp+incrementing. More details here: https://docs.confluent.io/current/connect/connect-jdbc/docs/source_connector.html#incremental-query-modes
Take note of our key values, such as incrementing.column.name, which contain data specific to this example.
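For reference, a source connector definition along these lines could look roughly like the sketch below. The property names are the Confluent JDBC source connector's; the connector name, connection URL, credentials, table name kafka_demo, topic prefix, and the Kafka Connect address localhost:8083 are all assumptions, so substitute your own values.

```python
import json
import urllib.request


def build_source_config(table: str = "kafka_demo") -> dict:
    """Build the JSON body for POST /connectors (every value is an example assumption)."""
    return {
        "name": f"pg-source-{table}",
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "connection.url": "jdbc:postgresql://postgres:5432/postgres",
            "connection.user": "postgres",
            "connection.password": "postgres",
            "table.whitelist": table,
            # Detect new rows via the id column and updated rows via the timestamp.
            "mode": "timestamp+incrementing",
            "timestamp.column.name": "last_modified",
            "incrementing.column.name": "id",
            # The topic name becomes topic.prefix + table name.
            "topic.prefix": "pg-",
            "poll.interval.ms": "5000",
        },
    }


def post_connector(payload: dict, connect_url: str = "http://localhost:8083") -> int:
    """POST the connector definition to the Kafka Connect REST API; return the HTTP status."""
    request = urllib.request.Request(
        f"{connect_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status


if __name__ == "__main__":
    # Only prints the payload; call post_connector(...) once Kafka Connect is running.
    print(json.dumps(build_source_config(), indent=2))
```

With these example values the connector would write to a topic named pg-kafka_demo.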
Confirm new topic created
Set up sink connector
First, let’s set up a “sink” file and tail it (recall that the file location directory specified is mounted in the Kafka Connect container via the
Here’s the Sink Connector config which needs to be posted to Kafka Connect:
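As a rough sketch, a file sink definition could be built like this. It assumes the FileStreamSinkConnector that ships with Apache Kafka; the connector name, the topic name pg-kafka_demo, and the in-container path /data/streaming_output.txt are placeholders rather than the original values.

```python
import json


def build_file_sink_config(topic: str, file_path: str) -> dict:
    """Build the JSON body for a FileStreamSink connector (name is hypothetical)."""
    return {
        "name": "file-sink",
        "config": {
            "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
            "tasks.max": "1",
            # Topic(s) to read from; must match the topic the source connector writes to.
            "topics": topic,
            # Path as seen from inside the Kafka Connect container.
            "file": file_path,
        },
    }


if __name__ == "__main__":
    # Assumed topic and path; adjust to your source connector and volume mount.
    payload = build_file_sink_config("pg-kafka_demo", "/data/streaming_output.txt")
    print(json.dumps(payload, indent=2))
```

The printed JSON can be sent to Kafka Connect with an HTTP POST to its /connectors endpoint, using the Content-Type application/json.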
After a few seconds the record should show up in the window where you ran
tail -f data/streaming_output.txt
Now we can make updates to our existing Postgres records and the updates will “stream” out of the database through Kafka Connect and into a topic.
The record should come across to our file!
Use case? We can store all the database changes as they happen and then play them back later, e.g. for disaster recovery. This is an alternative to, say, taking a whole snapshot of the database.
We can also soft delete and watch the update come through the data pipeline and into our sink:
Note that if we issue an actual SQL DELETE command in our psql session, that change does not come across the topic. The JDBC connector works by polling the table with a query, so a row that no longer exists is never returned. For that we would apparently need some more specialized “CDC” or Change Data Capture software. I’ll save that for another time. Also, in my experience it is fairly uncommon to actually “DELETE” data from a database; it is more common to soft-delete it using a boolean flag.
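To see why a soft delete flows through while a hard DELETE does not, here is a simplified in-memory model of timestamp+incrementing polling. It is a sketch of the idea, not the connector's actual code: the Row fields and the poll helper are hypothetical, and the table is just a Python dict.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Tuple

Offset = Tuple[datetime, int]


@dataclass
class Row:
    id: int
    name: str
    is_deleted: bool
    last_modified: datetime


def poll(table: Dict[int, Row], offset: Offset) -> Tuple[List[Row], Offset]:
    """Return rows changed since `offset`, plus the new offset.

    Models a query of the form:
      WHERE last_modified > ts OR (last_modified = ts AND id > last_id)
      ORDER BY last_modified, id
    """
    changed = sorted(
        (r for r in table.values() if (r.last_modified, r.id) > offset),
        key=lambda r: (r.last_modified, r.id),
    )
    if changed:
        offset = (changed[-1].last_modified, changed[-1].id)
    return changed, offset


def demo() -> Tuple[List[int], List[int], List[int]]:
    t0 = datetime(2020, 1, 1, 12, 0, 0)
    table = {1: Row(1, "alice", False, t0), 2: Row(2, "bob", False, t0)}
    offset: Offset = (datetime.min, 0)

    # Initial poll picks up both existing rows.
    initial, offset = poll(table, offset)

    # Soft delete is an UPDATE, so the trigger would bump last_modified.
    table[1].is_deleted = True
    table[1].last_modified = t0 + timedelta(seconds=30)
    soft, offset = poll(table, offset)

    # Hard delete removes the row, leaving nothing for the query to match.
    del table[2]
    hard, offset = poll(table, offset)

    return [r.id for r in initial], [r.id for r in soft], [r.id for r in hard]


if __name__ == "__main__":
    print(demo())
```

In this model the soft-deleted row is emitted again because its timestamp moved past the stored offset, whereas the hard-deleted row simply vanishes from the result set, which matches the behaviour described above.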