This application demonstrates Change Data Capture (CDC) from PostgreSQL using Debezium. It captures database changes and processes them in real time.

Topics of interest:
- handling failures and restarts
- handling multiple tables from one slot and publishing to one topic
- increasing throughput through batching
- loading historical data
- replaying data
- consistency guarantees
- exactly-once or at-least-once delivery; how close to exactly-once can it get?
- no missing data
- metrics on processing volume

Features:
- Real-time change capture from PostgreSQL using Debezium
- Configurable table inclusion and filtering
- Offset management for reliable processing
- Integration with Spring Boot

Prerequisites:
- Java 21+
- Maven 3.6+
- PostgreSQL 14+
- Docker (for Kafka, if needed)
just setup # install dependencies
just up # setup postgres & kafka
just run # run the application
just load # insert data into postgres
# kafka ui - http://localhost:8088
just down # stop postgres & kafka

Update the application.yml with your PostgreSQL connection details:
application:
  postgres:
    url: jdbc:postgresql://localhost:5432/your_database
    username: your_username
    password: your_password
    replication:
      plugin_name: pgoutput
      slot_name: debezium_slot
      publication_names: debezium_pub
      server_name: dbserver1
      table_include_list: public.*

Ensure PostgreSQL is running with the following settings:
wal_level = logical
max_wal_senders > 1
max_replication_slots > 1

Create a replication slot and publication in PostgreSQL:
-- Create replication slot
SELECT pg_create_logical_replication_slot('debezium_slot', 'pgoutput');
-- Create publication for tables you want to track
CREATE PUBLICATION debezium_pub FOR ALL TABLES;

Build and run the application:
mvn clean install
java -jar target/postgres-stream-using-logical-replication-0.0.1-SNAPSHOT.jar

The application uses Debezium's embedded engine to capture changes from PostgreSQL. When a change occurs in the tracked tables, the handleChangeEvent method in DebeziumConfig is called with the change event.
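
As a rough illustration of what a handler such as handleChangeEvent might do with each event, the sketch below routes a change event by its operation code. It assumes the event value has already been unwrapped into a Map that follows Debezium's change event envelope (op, before, after). The ChangeEventRouter class and its describe method are hypothetical names for this example and are not taken from the project.

```java
import java.util.Map;

/** Hypothetical helper for illustration; not part of this project. */
final class ChangeEventRouter {

    private ChangeEventRouter() {}

    /**
     * Summarizes one change event. Assumes payload mirrors Debezium's envelope:
     * "op" is c (create), u (update), d (delete) or r (snapshot read), and
     * "before"/"after" hold the row state (either one may be absent).
     */
    static String describe(Map<String, Object> payload) {
        String op = String.valueOf(payload.get("op"));
        Object before = payload.get("before");
        Object after = payload.get("after");
        switch (op) {
            case "c":
                return "INSERT " + after;
            case "u":
                return "UPDATE " + before + " -> " + after;
            case "d":
                return "DELETE " + before;
            case "r":
                return "SNAPSHOT " + after;
            default:
                // Other op codes (for example truncate) fall through to here.
                return "UNKNOWN op=" + op;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> insert = Map.of("op", "c", "after", Map.of("id", 1));
        System.out.println(describe(insert)); // prints "INSERT {id=1}"
    }
}
```

A real handler would replace the returned strings with whatever processing the application needs, such as publishing to Kafka. Note that the "before" image of an update is only populated when the table's REPLICA IDENTITY exposes the old row.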

- plugin_name: PostgreSQL logical decoding plugin (default: pgoutput)
- slot_name: Replication slot name
- publication_names: PostgreSQL publication name
- server_name: Logical name for the database server
- table_include_list: Tables to include in change capture (e.g., public.*)
- include_schema_changes: Whether to include schema changes (default: false)
- snapshot_mode: When to take snapshots (default: never)
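
One plausible way to translate these options into properties for the Debezium PostgreSQL connector is sketched below. The property keys are standard connector settings, but the DebeziumProps class, the engine name, and the way the JDBC URL is split into host, port, and database are assumptions made for illustration; the project's actual DebeziumConfig may do this differently. The sketch uses topic.prefix, the Debezium 2.x key for the logical server name (1.x releases use database.server.name instead).

```java
import java.net.URI;
import java.util.Properties;

/** Hypothetical helper; the project's DebeziumConfig may be wired differently. */
final class DebeziumProps {

    private DebeziumProps() {}

    /** Maps the application.yml values onto Debezium PostgreSQL connector keys. */
    static Properties build(String jdbcUrl, String username, String password,
                            String pluginName, String slotName, String publicationName,
                            String serverName, String tableIncludeList) {
        if (!jdbcUrl.startsWith("jdbc:")) {
            throw new IllegalArgumentException("Not a JDBC URL: " + jdbcUrl);
        }
        // Dropping the "jdbc:" prefix leaves a URI that java.net.URI can split.
        URI uri = URI.create(jdbcUrl.substring("jdbc:".length()));
        int port = uri.getPort() == -1 ? 5432 : uri.getPort(); // PostgreSQL default port

        Properties props = new Properties();
        props.setProperty("name", "postgres-cdc-engine"); // assumed engine name
        props.setProperty("connector.class",
                "io.debezium.connector.postgresql.PostgresConnector");
        props.setProperty("database.hostname", uri.getHost());
        props.setProperty("database.port", String.valueOf(port));
        props.setProperty("database.dbname", uri.getPath().substring(1)); // strip leading "/"
        props.setProperty("database.user", username);
        props.setProperty("database.password", password);
        props.setProperty("plugin.name", pluginName);
        props.setProperty("slot.name", slotName);
        props.setProperty("publication.name", publicationName);
        props.setProperty("topic.prefix", serverName);
        props.setProperty("table.include.list", tableIncludeList);
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(
                "jdbc:postgresql://localhost:5432/your_database",
                "your_username", "your_password",
                "pgoutput", "debezium_slot", "debezium_pub",
                "dbserver1", "public.*");
        System.out.println(props.getProperty("database.hostname")); // prints "localhost"
        System.out.println(props.getProperty("slot.name"));         // prints "debezium_slot"
    }
}
```

Debezium's embedded engine also expects offset storage settings (for example offset.storage and a backing file or store); they are left out here because this section does not describe how the project configures them.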