Introduction
Change Data Capture (CDC) is essential for real-time data integration, enabling you to capture and stream database changes to downstream systems. Amazon MSK Connect provides a fully managed service to run Apache Kafka Connect connectors, including Debezium for PostgreSQL CDC. This guide walks you through setting up a Debezium PostgreSQL connector on MSK Connect to capture changes from Amazon Aurora PostgreSQL or Amazon RDS PostgreSQL.
Architecture Overview
Our CDC pipeline consists of:
- Source: Amazon Aurora PostgreSQL 17.5 (or RDS PostgreSQL)
- Target: Amazon MSK (Kafka 3.7.x)
- CDC Tool: Debezium PostgreSQL Connector 3.2.6
- Deployment: MSK Connect (fully managed)
Prerequisites
Before you begin, ensure you have:
Amazon Aurora PostgreSQL or RDS PostgreSQL with:
- PostgreSQL version 10 or higher
- Logical replication enabled (rds.logical_replication = 1)
- Appropriate security groups allowing connectivity
Amazon MSK Cluster with:
- Kafka version 3.7.x or compatible
- Appropriate security groups
- Bootstrap servers accessible from MSK Connect
AWS CLI configured with appropriate permissions
IAM Roles with permissions for:
- MSK Connect operations
- S3 access (for custom plugin storage)
- CloudWatch Logs (for connector logging)
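The IAM pieces above can be bootstrapped from the CLI. Below is a minimal sketch of the service execution role's trust policy, assuming the placeholder role name MSKConnectRole that Step 5 references; attach the S3 and CloudWatch Logs permissions policies separately and scope them down for production:

```shell
# Sketch: create the MSK Connect service execution role from the CLI.
# "MSKConnectRole" is a placeholder name; scope the permissions policy
# to your plugin bucket and log group in production.
cat > trust-policy.json <<'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "Service": "kafkaconnect.amazonaws.com" },
      "Action": "sts:AssumeRole"
    }
  ]
}
EOF

# Guarded so the snippet is a no-op without AWS credentials configured.
if command -v aws >/dev/null 2>&1 && aws sts get-caller-identity >/dev/null 2>&1; then
  aws iam create-role \
    --role-name MSKConnectRole \
    --assume-role-policy-document file://trust-policy.json
fi
```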
Step 1: Prepare PostgreSQL Database
1.1 Enable Logical Replication
For RDS/Aurora PostgreSQL, modify the parameter group:
-- Check current setting
SHOW wal_level;
-- If not 'logical', modify parameter group
-- In AWS Console: RDS > Parameter Groups > Edit
-- Set: rds.logical_replication = 1
-- Reboot the database instance
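The console steps above can also be scripted. A hedged sketch using the AWS CLI, assuming an Aurora cluster parameter group named my-aurora-pg-params (the parameter is static, so the reboot is still required):

```shell
# Sketch: enable logical replication via the CLI instead of the console.
# PARAM_GROUP is a placeholder for your cluster (Aurora) or instance (RDS)
# parameter group name; rds.logical_replication is static, so reboot after.
PARAM_GROUP="my-aurora-pg-params"
PARAMS="ParameterName=rds.logical_replication,ParameterValue=1,ApplyMethod=pending-reboot"

# Guarded so the snippet is a no-op without AWS credentials configured.
if command -v aws >/dev/null 2>&1 && aws sts get-caller-identity >/dev/null 2>&1; then
  # Aurora: cluster-level parameter group
  aws rds modify-db-cluster-parameter-group \
    --db-cluster-parameter-group-name "$PARAM_GROUP" \
    --parameters "$PARAMS"
  # RDS (non-Aurora): use "aws rds modify-db-parameter-group" instead
fi
```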
1.2 Create Replication User (Optional)
Create a dedicated user for Debezium:
-- Create user
CREATE USER debezium_user WITH PASSWORD 'your_secure_password';
-- Grant replication privileges
ALTER USER debezium_user WITH REPLICATION;
-- Grant schema access
GRANT USAGE ON SCHEMA public TO debezium_user;
-- Grant table access
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium_user;
1.3 Create Publication
Create a PostgreSQL publication for the tables you want to replicate:
-- Create publication for specific tables
CREATE PUBLICATION debezium_publication FOR TABLE public.orders, public.customers;
-- Or create publication for all tables
CREATE PUBLICATION debezium_publication FOR ALL TABLES;
-- Verify publication
SELECT * FROM pg_publication;
SELECT schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'debezium_publication';
1.4 Create Test Tables
-- Create sample tables
CREATE TABLE public.orders (
order_id SERIAL PRIMARY KEY,
customer_id INTEGER NOT NULL,
order_date TIMESTAMP WITH TIME ZONE DEFAULT now(),
total_amount DECIMAL(10,2),
status VARCHAR(50)
);
CREATE TABLE public.customers (
customer_id SERIAL PRIMARY KEY,
customer_name VARCHAR(100) NOT NULL,
email VARCHAR(100),
created_at TIMESTAMP WITH TIME ZONE DEFAULT now()
);
-- Insert sample data
INSERT INTO public.customers (customer_name, email)
VALUES ('John Doe', 'john@example.com');
INSERT INTO public.orders (customer_id, total_amount, status)
VALUES (1, 99.99, 'pending');
Step 2: Create Debezium Custom Plugin
2.1 Download Debezium Connector
# Create working directory
mkdir -p ~/debezium-plugin
cd ~/debezium-plugin
# Download Debezium PostgreSQL connector from Confluent Hub
# Version 3.2.6 compatible with Kafka 3.7.x
wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/debezium/debezium-connector-postgresql/versions/3.2.6/debezium-debezium-connector-postgresql-3.2.6.zip
# Unzip
unzip debezium-debezium-connector-postgresql-3.2.6.zip
2.2 Upload to S3
# Create S3 bucket (if not exists)
aws s3 mb s3://your-msk-connect-plugins --region us-east-1
# Create ZIP file for MSK Connect
cd debezium-debezium-connector-postgresql-3.2.6
zip -r ../debezium-postgres-connector.zip .
cd ..
# Upload to S3
aws s3 cp debezium-postgres-connector.zip \
s3://your-msk-connect-plugins/debezium-postgres-connector.zip \
--region us-east-1
2.3 Create Custom Plugin in MSK Connect
aws kafkaconnect create-custom-plugin \
--custom-plugin-name debezium-postgres-3-2-6 \
--content-type ZIP \
--location s3Location={bucketArn=arn:aws:s3:::your-msk-connect-plugins,fileKey=debezium-postgres-connector.zip} \
--region us-east-1
Wait for the plugin to be in ACTIVE state:
aws kafkaconnect describe-custom-plugin \
--custom-plugin-arn <plugin-arn> \
--region us-east-1 \
--query 'customPluginState'
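Rather than re-running describe-custom-plugin by hand, a small polling helper can wait for the ACTIVE state. A sketch, with the plugin ARN passed in as an argument (requires AWS credentials when used for real):

```shell
# Sketch: poll describe-custom-plugin until the plugin is ACTIVE,
# giving up after roughly 5 minutes (30 attempts x 10 s).
wait_for_plugin() {
  plugin_arn="$1"
  for i in $(seq 1 30); do
    state=$(aws kafkaconnect describe-custom-plugin \
      --custom-plugin-arn "$plugin_arn" \
      --query 'customPluginState' --output text 2>/dev/null)
    echo "attempt $i: $state"
    [ "$state" = "ACTIVE" ] && return 0
    sleep 10
  done
  return 1
}

# Usage (with a real ARN and AWS credentials):
# wait_for_plugin "arn:aws:kafkaconnect:us-east-1:xxx:custom-plugin/debezium-postgres-3-2-6/xxx"
```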
Step 3: Create Worker Configuration
Create a worker configuration for MSK Connect:
aws kafkaconnect create-worker-configuration \
--name debezium-worker-config \
--properties-file-content "$(cat <<EOF
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
EOF
)" \
--region us-east-1
Note: We disable schemas for simpler JSON output. Enable schemas if you need Avro or schema registry integration.
Step 4: Create Kafka Topics (If Auto-Create Disabled)
If your MSK cluster has auto.create.topics.enable=false, manually create topics:
# Get bootstrap servers
BOOTSTRAP_SERVERS=$(aws kafka get-bootstrap-brokers \
--cluster-arn <your-msk-cluster-arn> \
--region us-east-1 \
--query 'BootstrapBrokerString' \
--output text)
# Create topics for each table
kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS \
--create --topic cdc.public.orders \
--partitions 1 --replication-factor 2
kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS \
--create --topic cdc.public.customers \
--partitions 1 --replication-factor 2
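Debezium names topics as <topic.prefix>.<schema>.<table>, so the topic list can be derived from the connector's table.include.list instead of being typed by hand. A small sketch, using the values from this guide's connector configuration:

```shell
# Sketch: derive the expected Debezium topic names
# (<topic.prefix>.<schema>.<table>) from table.include.list, so manual
# topic creation stays in sync with the connector configuration.
TOPIC_PREFIX="cdc"
TABLE_INCLUDE_LIST="public.orders,public.customers"

TOPICS=$(echo "$TABLE_INCLUDE_LIST" | tr ',' '\n' | sed "s/^/${TOPIC_PREFIX}./")
echo "$TOPICS"
```

For the configuration above this prints cdc.public.orders and cdc.public.customers, matching the kafka-topics.sh commands shown earlier.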
Step 5: Create MSK Connect Connector
5.1 Prepare Connector Configuration
Create a file connector-config.json:
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "your-rds-endpoint.rds.amazonaws.com",
"database.port": "5432",
"database.user": "postgres",
"database.password": "your_secure_password",
"database.dbname": "your_database",
"database.server.name": "rds_pg",
"database.sslmode": "require",
"table.include.list": "public.orders,public.customers",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"publication.name": "debezium_publication",
"topic.prefix": "cdc",
"tasks.max": "1",
"snapshot.mode": "initial"
}
Key Parameters Explained:
| Parameter | Description | Value |
|---|---|---|
| database.hostname | RDS/Aurora endpoint | Your database endpoint |
| database.sslmode | SSL mode for connection | require for RDS/Aurora |
| table.include.list | Tables to replicate | Comma-separated list |
| plugin.name | Logical decoding plugin | pgoutput (native PostgreSQL) |
| slot.name | Replication slot name | Unique identifier |
| publication.name | PostgreSQL publication | Must match created publication |
| topic.prefix | Kafka topic prefix | Prepended to table names |
| snapshot.mode | Initial snapshot behavior | initial for first-time setup |
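Because create-connector rejects malformed JSON with a fairly opaque error, it is worth validating connector-config.json locally before submitting it. A sketch using Python's built-in json.tool (jq . works equally well):

```shell
# Sketch: local JSON sanity check for connector-config.json before calling
# aws kafkaconnect create-connector. Uses only Python's stdlib json.tool.
validate_config() {
  if python3 -m json.tool "$1" >/dev/null 2>&1; then
    echo "valid"
  else
    echo "invalid"
  fi
}

# Usage:
# validate_config connector-config.json
```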
5.2 Create Connector
aws kafkaconnect create-connector \
--connector-name rds-postgres-debezium \
--kafka-connect-version "3.7.x" \
--capacity '{
"provisionedCapacity": {
"mcuCount": 1,
"workerCount": 1
}
}' \
--connector-configuration file://connector-config.json \
--kafka-cluster '{
"apacheKafkaCluster": {
"bootstrapServers": "b-1.your-msk.amazonaws.com:9092,b-2.your-msk.amazonaws.com:9092",
"vpc": {
"subnets": ["subnet-xxx", "subnet-yyy"],
"securityGroups": ["sg-xxx"]
}
}
}' \
--kafka-cluster-client-authentication '{
"authenticationType": "NONE"
}' \
--kafka-cluster-encryption-in-transit '{
"encryptionType": "PLAINTEXT"
}' \
--plugins '[{
"customPlugin": {
"customPluginArn": "arn:aws:kafkaconnect:us-east-1:xxx:custom-plugin/debezium-postgres-3-2-6/xxx",
"revision": 1
}
}]' \
--service-execution-role-arn "arn:aws:iam::xxx:role/MSKConnectRole" \
--worker-configuration '{
"workerConfigurationArn": "arn:aws:kafkaconnect:us-east-1:xxx:worker-configuration/debezium-worker-config/xxx",
"revision": 1
}' \
--log-delivery '{
"workerLogDelivery": {
"cloudWatchLogs": {
"enabled": true,
"logGroup": "/aws/mskconnect/rds-postgres-debezium"
}
}
}' \
--region us-east-1
5.3 Monitor Connector Creation
# Check connector state
aws kafkaconnect describe-connector \
--connector-arn <connector-arn> \
--region us-east-1 \
--query 'connectorState'
# Expected states: CREATING → RUNNING
Wait for the connector to reach RUNNING state (typically 2-5 minutes).
Step 6: Verify Replication Slot
Check that Debezium created the replication slot:
-- View replication slots
SELECT slot_name,
plugin,
slot_type,
active,
confirmed_flush_lsn,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag
FROM pg_replication_slots;
Expected output:
slot_name | plugin | slot_type | active | confirmed_flush_lsn | lag
---------------+----------+-----------+--------+---------------------+--------
debezium_slot | pgoutput | logical | t | 0/1234567 | 0 bytes
Step 7: Monitor CloudWatch Logs
View connector logs to verify successful startup:
aws logs tail /aws/mskconnect/rds-postgres-debezium \
--since 5m \
--follow \
--region us-east-1
Look for:
- Snapshot step 1 - Preparing (snapshot starting)
- Snapshot step 8 - Finalizing (snapshot complete)
- Streaming requested from LSN (streaming mode active)
- records sent during previous (data flowing)
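Once those lines appear, change events can be inspected directly with the Kafka console consumer. A sketch, guarded so it only runs when the Kafka CLI tools are installed and BOOTSTRAP_SERVERS (fetched in Step 4) is set:

```shell
# Sketch: read a few change events from the orders topic to confirm data
# is flowing. Requires the Kafka CLI tools and network access to the MSK
# brokers; BOOTSTRAP_SERVERS comes from get-bootstrap-brokers (Step 4).
TOPIC="cdc.public.orders"

if command -v kafka-console-consumer.sh >/dev/null 2>&1 && [ -n "${BOOTSTRAP_SERVERS:-}" ]; then
  kafka-console-consumer.sh \
    --bootstrap-server "$BOOTSTRAP_SERVERS" \
    --topic "$TOPIC" \
    --from-beginning \
    --max-messages 5
fi
```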
Common Issues and Solutions
Issue 1: Connector Fails to Start
Symptom: Connector state is FAILED
Check:
aws logs tail /aws/mskconnect/rds-postgres-debezium --since 10m
Common Causes:
- Incorrect database credentials
- Network connectivity issues (security groups)
- Publication doesn't exist
- Insufficient database permissions
Issue 2: Replication Slot Not Created
Symptom: No replication slot in pg_replication_slots
Solution: Check connector logs for permission errors. User needs REPLICATION privilege:
ALTER USER debezium_user WITH REPLICATION;
Issue 3: Topics Not Created
Symptom: Connector running but no Kafka topics
Solution: If MSK has auto-create disabled, manually create topics (see Step 4)
Issue 4: SSL Connection Errors
Symptom: SSL connection required error
Solution: Ensure database.sslmode=require in connector configuration
Best Practices
Use Dedicated Replication User: Create a separate user with minimal required permissions
Monitor Replication Lag: Set up CloudWatch alarms for replication slot lag
Enable CloudWatch Logs: Essential for troubleshooting
Start with Small Table Set: Test with a few tables before adding all tables
Use Appropriate Capacity: Start with 1 MCU and 1 worker, scale as needed
Snapshot Mode Selection:
- initial: First-time setup (recommended)
- no_data: Skip the snapshot, stream only
- initial_only: Snapshot, then stop
Topic Naming: Use a consistent topic.prefix for easy identification
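The replication-lag practice above can be automated with a CloudWatch alarm on the RDS metric OldestReplicationSlotLag, which grows when the connector stops consuming from its slot. A sketch, with the instance ID, SNS topic, and 1 GiB threshold all placeholders:

```shell
# Sketch: CloudWatch alarm on RDS OldestReplicationSlotLag (bytes), which
# climbs when the Debezium connector stops draining the replication slot.
# DB_INSTANCE, SNS_TOPIC_ARN, and the threshold are placeholders.
DB_INSTANCE="your-db-instance-id"
SNS_TOPIC_ARN="arn:aws:sns:us-east-1:123456789012:ops-alerts"
THRESHOLD_BYTES=$((1024 * 1024 * 1024))  # 1 GiB

# Guarded so the snippet is a no-op without AWS credentials configured.
if command -v aws >/dev/null 2>&1 && aws sts get-caller-identity >/dev/null 2>&1; then
  aws cloudwatch put-metric-alarm \
    --alarm-name "${DB_INSTANCE}-replication-slot-lag" \
    --namespace AWS/RDS \
    --metric-name OldestReplicationSlotLag \
    --dimensions Name=DBInstanceIdentifier,Value="$DB_INSTANCE" \
    --statistic Maximum \
    --period 300 \
    --evaluation-periods 3 \
    --threshold "$THRESHOLD_BYTES" \
    --comparison-operator GreaterThanThreshold \
    --alarm-actions "$SNS_TOPIC_ARN"
fi
```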
Next Steps
Now that your connector is running:
- Verify CDC Operations: Test INSERT, UPDATE, DELETE operations (see Blog Post #2)
- Set Up Heartbeat: Prevent replication lag with idle tables (see Blog Post #3)
- Monitor and Troubleshoot: Set up monitoring and alerts (see Blog Post #4)
Conclusion
You've successfully set up a Debezium PostgreSQL connector on Amazon MSK Connect! The connector is now capturing changes from your PostgreSQL database and streaming them to Kafka topics. This fully managed solution eliminates the operational overhead of running Kafka Connect yourself while providing enterprise-grade reliability and scalability.