kafka connect in action, part 3
Make sure to follow this example first to set up the Docker environment used throughout this post.
High Level Overview
- Set up Kafka Connect so that updates to existing rows in a Postgres source table are put into a topic (aka set up an event stream representing changes to a PG table)
- Use Kafka Connect to write that PG data to a local sink
Start Containers
I repeat… make sure to follow this example for the docker-compose config
docker-compose up -d
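Before moving on, it's worth a quick sanity check that all of the containers came up (the exact service list depends on the compose file from the earlier post):
docker-compose ps
# every service should show a State of "Up"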
Create a new Postgres Table with timestamp trigger
We will need to set up a Postgres table that automatically updates a last_updated column with the current timestamp every time a change is made. Then we will tell our Kafka Connect connector to pay attention to that specific column.
Enter a psql session inside the running PG container:
docker exec -it postgres psql -U postgres
Inside the psql session:
CREATE DATABASE streaming_example;
\connect streaming_example
CREATE TABLE streaming_update_table (
  id serial PRIMARY KEY,
  data varchar(256) NOT NULL,
  is_soft_deleted boolean DEFAULT false,
  create_date timestamp NOT NULL DEFAULT now(),
  last_updated timestamp NOT NULL DEFAULT now()
);
-- setup a function to update the last_updated column automatically
CREATE OR REPLACE FUNCTION update_last_updated_column()
RETURNS TRIGGER AS $$
BEGIN
  NEW.last_updated = now();
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- apply the function as a trigger
CREATE TRIGGER update_last_updated_column
BEFORE UPDATE ON streaming_update_table
FOR EACH ROW EXECUTE PROCEDURE update_last_updated_column();
-- start off with a sample record
INSERT INTO streaming_update_table (data) VALUES ('test data');
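Still inside the psql session, here's a quick, optional sanity check that the trigger actually fires (the 'trigger check' value is just throwaway data for this step):
-- optional: confirm the trigger bumps last_updated on UPDATE
UPDATE streaming_update_table SET data = 'trigger check' WHERE id = 1;
SELECT id, create_date, last_updated FROM streaming_update_table WHERE id = 1;
-- last_updated should now be later than create_date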
Set up a Kafka Source Connector for our new table
Here we set the JDBC connector mode to timestamp+incrementing. More details here: https://docs.confluent.io/current/connect/connect-jdbc/docs/source_connector.html#incremental-query-modes
Take note of our key values: connection.url, timestamp.column.name, and incrementing.column.name, which contain data specific to this example.
POST http://localhost:8083/connectors
{
  "name": "pg.source.connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://postgres:5432/streaming_example",
    "connection.user": "postgres",
    "connection.password": "postgres",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "last_updated",
    "incrementing.column.name": "id",
    "topic.prefix": "postgres.with.timestamps."
  }
}
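If you're not using a REST client like Postman, the config can be posted with curl. A minimal sketch, assuming the JSON above is saved to a file named source-connector.json (the filename is arbitrary):
# POST the source connector config to the Kafka Connect REST API
curl -X POST -H "Content-Type: application/json" \
  --data @source-connector.json \
  http://localhost:8083/connectors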
Confirm new topic created
kafka-topics --zookeeper localhost:2181 --list
# ...
# postgres.with.timestamps.streaming_update_table
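You can also peek at the messages themselves with a console consumer. This assumes the broker is reachable at localhost:9092, which depends on the port mapping in the docker-compose file:
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic postgres.with.timestamps.streaming_update_table \
  --from-beginning
# Ctrl-C to stop the consumer when done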
Set up the sink connector
First, let’s set up a “sink” file and tail it (recall that the directory specified for the file is mounted into the Kafka Connect container via the docker-compose file):
touch data/streaming_output.txt
tail -f data/streaming_output.txt
Here’s the Sink Connector config which needs to be posted to Kafka Connect:
POST http://localhost:8083/connectors
{
  "name": "streaming.updates.sink",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": "1",
    "topics": "postgres.with.timestamps.streaming_update_table",
    "file": "/tmp/data/streaming_output.txt"
  }
}
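As with the source connector, this can be posted via curl, and the Connect REST API can report the connector's status afterwards (again assuming the JSON above is saved to a file, here called sink-connector.json):
# POST the sink connector config
curl -X POST -H "Content-Type: application/json" \
  --data @sink-connector.json \
  http://localhost:8083/connectors
# check that the connector and its task are RUNNING
curl http://localhost:8083/connectors/streaming.updates.sink/status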
After a few seconds, the record should show up in the window where you ran tail -f data/streaming_output.txt.
Profit
Now we can make updates to our existing Postgres records, and the updates will “stream” out of the database through Kafka Connect and into a topic.
UPDATE streaming_update_table
SET data = 'some new data'
WHERE id = 1;
The record should come across to our file!
Use case? We can store all the database changes as they happen and play them back later, e.g. for disaster recovery. This is an alternative to, say, taking a whole snapshot of the database.
We can also soft delete and watch the update come through the data pipeline and into our sink:
UPDATE streaming_update_table
SET is_soft_deleted = true
WHERE id = 1;
Note that if we issue an actual SQL DELETE command in our psql session, that change does not come across the topic. For that we would apparently need more specialized “CDC” (Change Data Capture) software. I’ll save that for another time. Also, in my experience it is fairly uncommon to actually “DELETE” data from a database; it’s more common to soft-delete it using a boolean flag.