Introduction
After setting up your Debezium PostgreSQL connector on Amazon MSK Connect, it's crucial to verify that both the initial snapshot and ongoing change data capture (CDC) are working correctly. This guide provides comprehensive testing procedures to ensure your CDC pipeline is functioning as expected.
Understanding Debezium Phases
Debezium operates in two distinct phases:
- Snapshot Phase: Captures the current state of all tables in table.include.list
- Streaming Phase: Continuously captures INSERT, UPDATE, and DELETE operations
Prerequisites
Before testing, ensure:
- Debezium connector is in RUNNING state
- Replication slot is active in PostgreSQL
- Kafka topics are created (if auto-create is disabled)
- You have access to Kafka consumer tools
Phase 1: Verify Initial Snapshot
Step 1: Check Connector State
aws kafkaconnect describe-connector \
--connector-arn <your-connector-arn> \
--region us-east-1 \
--query 'connectorState' \
--output text
Expected output: RUNNING
Step 2: Monitor CloudWatch Logs for Snapshot Progress
aws logs tail /aws/mskconnect/rds-postgres-debezium \
--since 10m \
--region us-east-1 | grep -i snapshot
Look for these log entries indicating snapshot progress:
[INFO] Snapshot step 1 - Preparing
[INFO] Snapshot step 2 - Determining captured tables
[INFO] Snapshot step 3 - Locking captured tables
[INFO] Snapshot step 4 - Determining snapshot offset
[INFO] Snapshot step 5 - Reading structure of captured tables
[INFO] Snapshot step 6 - Persisting schema history
[INFO] Snapshot step 7 - Snapshotting data
[INFO] Snapshot step 8 - Finalizing
[INFO] Snapshot completed successfully
Step 3: Verify Replication Slot Created
Connect to PostgreSQL and check the replication slot:
SELECT slot_name,
plugin,
slot_type,
active,
confirmed_flush_lsn,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot';
Expected output:
slot_name | plugin | slot_type | active | confirmed_flush_lsn | lag
---------------+----------+-----------+--------+---------------------+--------
debezium_slot | pgoutput | logical | t | 0/ABC12345 | 0 bytes
Key indicators:
- active = t (true) - Connector is connected
- lag should be minimal (< 1 MB for idle systems)
Step 4: Verify Kafka Topics Created
List Kafka topics to confirm they were created:
# Get bootstrap servers
BOOTSTRAP_SERVERS=$(aws kafka get-bootstrap-brokers \
--cluster-arn <your-msk-cluster-arn> \
--region us-east-1 \
--query 'BootstrapBrokerString' \
--output text)
# List topics
kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --list | grep cdc
Expected topics:
cdc.public.orders
cdc.public.customers
Step 5: Consume Snapshot Data from Kafka
Consume messages from a topic to verify snapshot data:
kafka-console-consumer.sh \
--bootstrap-server $BOOTSTRAP_SERVERS \
--topic cdc.public.orders \
--from-beginning \
--max-messages 5
Snapshot Message Structure:
{
"before": null,
"after": {
"order_id": 1,
"customer_id": 101,
"order_date": "2026-03-20T08:30:00Z",
"total_amount": 99.99,
"status": "pending"
},
"source": {
"version": "3.2.6.Final",
"connector": "postgresql",
"name": "cdc",
"ts_ms": 1773991234567,
"snapshot": "true",
"db": "testdb",
"schema": "public",
"table": "orders",
"txId": null,
"lsn": null
},
"op": "r",
"ts_ms": 1773991234567
}
Key fields to verify:
- "snapshot": "true" - Indicates this is snapshot data
- "op": "r" - Read operation (snapshot)
- "before": null - No previous state for snapshot
- "after": {...} - Current row data
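If you are scripting these checks, the criteria above can be encoded directly. A minimal sketch in Python — the is_snapshot_read helper is our own name, not part of Debezium; the envelope fields follow the structure shown above, and Debezium marks the final snapshot record with snapshot "last" rather than "true":

```python
import json

def is_snapshot_read(event):
    """Return True if a Debezium envelope is a snapshot read record."""
    source = event.get("source", {})
    return (
        event.get("op") == "r"
        and event.get("before") is None
        and event.get("after") is not None
        and source.get("snapshot") in ("true", "last")  # "last" = final snapshot record
    )

raw = '{"before": null, "after": {"order_id": 1}, "op": "r", "source": {"snapshot": "true"}}'
print(is_snapshot_read(json.loads(raw)))  # True
```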
Step 6: Verify Row Count Matches
Compare row counts between PostgreSQL and Kafka:
PostgreSQL:
SELECT COUNT(*) FROM public.orders;
SELECT COUNT(*) FROM public.customers;
Kafka (count messages):
# Count messages in orders topic (Kafka < 3.0; on 3.0+ use kafka-get-offsets.sh instead)
kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list $BOOTSTRAP_SERVERS \
  --topic cdc.public.orders | awk -F ":" '{sum += $3} END {print sum}'
Row counts should match.
Phase 2: Verify CDC Operations
Now test that ongoing changes are captured correctly.
Test 1: INSERT Operation
Step 1: Insert Data in PostgreSQL
INSERT INTO public.customers (customer_name, email)
VALUES ('Jane Smith', 'jane@example.com')
RETURNING customer_id;
Note the returned customer_id (e.g., 2).
Step 2: Consume from Kafka
kafka-console-consumer.sh \
--bootstrap-server $BOOTSTRAP_SERVERS \
--topic cdc.public.customers \
--from-beginning \
--max-messages 10
Step 3: Verify INSERT Message
Look for the new record:
{
"before": null,
"after": {
"customer_id": 2,
"customer_name": "Jane Smith",
"email": "jane@example.com",
"created_at": "2026-03-20T08:45:00Z"
},
"source": {
"version": "3.2.6.Final",
"connector": "postgresql",
"name": "cdc",
"ts_ms": 1773991500000,
"snapshot": "false",
"db": "testdb",
"sequence": "[\"0/ABC12345\",\"0/ABC12346\"]",
"schema": "public",
"table": "customers",
"txId": 12345,
"lsn": 2886123456
},
"op": "c",
"ts_ms": 1773991500123
}
Verify:
- ✅ "op": "c" - Create (INSERT) operation
- ✅ "snapshot": "false" - Streaming mode
- ✅ "before": null - No previous state
- ✅ "after" contains the new row data
- ✅ txId and lsn are present
Test 2: UPDATE Operation
Step 1: Update Data in PostgreSQL
UPDATE public.customers
SET email = 'jane.smith@example.com'
WHERE customer_id = 2;
Step 2: Consume from Kafka
Without --from-beginning the consumer starts at the latest offset and reads only new messages, so start it before running the UPDATE (or re-run the UPDATE while it waits):

kafka-console-consumer.sh \
  --bootstrap-server $BOOTSTRAP_SERVERS \
  --topic cdc.public.customers \
  --max-messages 1 \
  --timeout-ms 30000
Step 3: Verify UPDATE Message
{
"before": {
"customer_id": 2,
"customer_name": "Jane Smith",
"email": "jane@example.com",
"created_at": "2026-03-20T08:45:00Z"
},
"after": {
"customer_id": 2,
"customer_name": "Jane Smith",
"email": "jane.smith@example.com",
"created_at": "2026-03-20T08:45:00Z"
},
"source": {
"version": "3.2.6.Final",
"connector": "postgresql",
"name": "cdc",
"ts_ms": 1773991600000,
"snapshot": "false",
"db": "testdb",
"sequence": "[\"0/ABC12346\",\"0/ABC12347\"]",
"schema": "public",
"table": "customers",
"txId": 12346,
"lsn": 2886123457
},
"op": "u",
"ts_ms": 1773991600234
}
Verify:
- ✅ "op": "u" - Update operation
- ✅ "before" contains the old values
- ✅ "after" contains the new values
- ✅ Only the email field changed
Test 3: DELETE Operation
Step 1: Delete Data in PostgreSQL
DELETE FROM public.customers WHERE customer_id = 2;
Step 2: Consume from Kafka
As with the UPDATE test, start the consumer before issuing the DELETE so the event is not missed:

kafka-console-consumer.sh \
  --bootstrap-server $BOOTSTRAP_SERVERS \
  --topic cdc.public.customers \
  --max-messages 1 \
  --timeout-ms 30000
Step 3: Verify DELETE Message
{
"before": {
"customer_id": 2,
"customer_name": "Jane Smith",
"email": "jane.smith@example.com",
"created_at": "2026-03-20T08:45:00Z"
},
"after": null,
"source": {
"version": "3.2.6.Final",
"connector": "postgresql",
"name": "cdc",
"ts_ms": 1773991700000,
"snapshot": "false",
"db": "testdb",
"sequence": "[\"0/ABC12347\",\"0/ABC12348\"]",
"schema": "public",
"table": "customers",
"txId": 12347,
"lsn": 2886123458
},
"op": "d",
"ts_ms": 1773991700345
}
Verify:
- ✅ "op": "d" - Delete operation
- ✅ "before" contains the deleted row data
- ✅ "after": null - No new state
- ✅ A tombstone message may follow (if tombstones are enabled)
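When reading a Debezium topic programmatically, a DELETE arrives as the "d" envelope above, optionally followed by a tombstone record whose value is null. A small sketch of classifying record values — the classify helper is illustrative, not part of Debezium:

```python
def classify(value):
    """Classify a Kafka record value from a Debezium topic."""
    if value is None:
        return "tombstone"  # null value emitted after a delete, for log compaction
    return {"c": "insert", "u": "update", "d": "delete", "r": "snapshot"}.get(
        value.get("op"), "unknown"
    )

print(classify(None))                                                      # tombstone
print(classify({"op": "d", "before": {"customer_id": 2}, "after": None}))  # delete
```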
Test 4: Bulk Operations
Test with multiple operations in a transaction:
BEGIN;
INSERT INTO public.orders (customer_id, total_amount, status)
VALUES (1, 150.00, 'pending');
INSERT INTO public.orders (customer_id, total_amount, status)
VALUES (1, 200.00, 'pending');
UPDATE public.orders SET status = 'confirmed' WHERE customer_id = 1;
COMMIT;
Verify all operations appear in Kafka with the same txId.
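To check transactional grouping in a script, you can bucket events by source.txId. A minimal sketch under the assumption that consumed messages have been parsed into dicts (the txId value is illustrative):

```python
from collections import defaultdict

def group_by_txid(events):
    """Bucket Debezium change events by their source transaction id."""
    txns = defaultdict(list)
    for event in events:
        txns[event["source"]["txId"]].append(event["op"])
    return dict(txns)

# Three operations committed in one transaction share a txId
events = [
    {"op": "c", "source": {"txId": 12350}},
    {"op": "c", "source": {"txId": 12350}},
    {"op": "u", "source": {"txId": 12350}},
]
print(group_by_txid(events))  # {12350: ['c', 'c', 'u']}
```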
Phase 3: Verify Data Consistency
Check 1: Message Ordering
Verify messages are ordered by LSN:
kafka-console-consumer.sh \
  --bootstrap-server $BOOTSTRAP_SERVERS \
  --topic cdc.public.orders \
  --from-beginning \
  --max-messages 10 | jq '.source.lsn'
LSN values should be monotonically increasing.
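This check can be automated once the LSNs are collected. Note that Kafka guarantees ordering only within a partition (Debezium routes all changes for a given key to the same partition), so apply the check per partition. A sketch with our own lsns_monotonic helper:

```python
def lsns_monotonic(lsns):
    """True if the LSN sequence never decreases (valid within one partition)."""
    return all(a <= b for a, b in zip(lsns, lsns[1:]))

print(lsns_monotonic([2886123456, 2886123457, 2886123458]))  # True
print(lsns_monotonic([2886123457, 2886123456]))              # False
```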
Check 2: No Message Loss
Compare database transaction count with Kafka message count:
PostgreSQL:
-- Cumulative operation counts since statistics were last reset (approximate)
SELECT n_tup_ins AS inserts,
       n_tup_upd AS updates,
       n_tup_del AS deletes
FROM pg_stat_user_tables
WHERE relname = 'orders';
Kafka: Count all messages (snapshot + streaming)
Check 3: Schema Consistency
Verify the schema in Kafka matches PostgreSQL:
-- PostgreSQL schema
\d public.orders
Compare the column list with the after field structure in Kafka messages.
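A quick programmatic version of this comparison is to diff the expected column set against the keys of the after payload. A sketch, with column names taken from the sample orders table above:

```python
def missing_columns(expected, event_after):
    """Columns present in PostgreSQL but absent from the event's after payload."""
    return set(expected) - set(event_after)

pg_columns = {"order_id", "customer_id", "order_date", "total_amount", "status"}
after = {"order_id": 1, "customer_id": 101, "total_amount": 99.99, "status": "pending"}
print(missing_columns(pg_columns, after))  # {'order_date'}
```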
Phase 4: Performance Verification
Check 1: Replication Lag
Monitor replication lag to ensure it stays minimal:
SELECT slot_name,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag,
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot';
Healthy lag: < 10 MB for active systems
Check 2: Connector Throughput
Check CloudWatch logs for throughput metrics:
aws logs tail /aws/mskconnect/rds-postgres-debezium \
--since 5m \
--region us-east-1 | grep "records sent"
Example output:
[INFO] 150 records sent during previous 00:00:30.123
Check 3: WAL Growth
Monitor WAL disk usage:
SELECT pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS wal_retained
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot';
Troubleshooting Common Issues
Issue 1: No Messages in Kafka
Symptoms:
- Connector is RUNNING
- Replication slot is active
- But no messages in Kafka topics
Diagnosis:
# Check CloudWatch logs
aws logs tail /aws/mskconnect/rds-postgres-debezium --since 10m
# Check for errors
aws logs filter-log-events \
--log-group-name /aws/mskconnect/rds-postgres-debezium \
--filter-pattern "ERROR" \
--since 10m
Common Causes:
- Tables not in publication
- Snapshot still in progress
- Network connectivity issues
Solution:
-- Verify tables in publication
SELECT schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'debezium_publication';
-- Add missing tables
ALTER PUBLICATION debezium_publication ADD TABLE public.missing_table;
Issue 2: Snapshot Data Missing
Symptoms: Streaming works but initial snapshot incomplete
Diagnosis:
# Check snapshot completion
aws logs tail /aws/mskconnect/rds-postgres-debezium \
--since 30m | grep -i "snapshot.*complete"
Solution: If the snapshot failed, restart the connector, or set snapshot.mode=always to force a fresh snapshot on every connector start
Issue 3: Duplicate Messages
Symptoms: Same operation appears multiple times in Kafka
Cause: Connector restart during processing
Solution: This is expected behavior; Debezium provides at-least-once delivery. Implement idempotent consumers using the source.lsn field as a deduplication key.
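A minimal sketch of such deduplication, keyed on (source.lsn, op). In a real consumer the seen-set would need to be persisted and bounded; the events here are illustrative:

```python
def dedupe(events):
    """Drop replayed events using (source.lsn, op) as an idempotency key."""
    seen, unique = set(), []
    for event in events:
        key = (event["source"]["lsn"], event["op"])
        if key not in seen:
            seen.add(key)
            unique.append(event)
    return unique

events = [
    {"op": "c", "source": {"lsn": 100}},
    {"op": "c", "source": {"lsn": 100}},  # replayed after a connector restart
    {"op": "u", "source": {"lsn": 101}},
]
print(len(dedupe(events)))  # 2
```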
Issue 4: Schema Mismatch
Symptoms: Kafka messages missing columns
Cause: Schema change not captured
Solution:
-- PostgreSQL has no ALTER PUBLICATION ... REFRESH command.
-- Verify the changed table is still in the publication:
SELECT schemaname, tablename FROM pg_publication_tables
WHERE pubname = 'debezium_publication';
-- Then restart the connector to pick up the schema change.
Automated Verification Script
Create a script to automate verification:
#!/bin/bash
echo "=== Debezium CDC Verification ==="
# 1. Check connector state
echo "1. Connector State:"
aws kafkaconnect describe-connector \
--connector-arn $CONNECTOR_ARN \
--region us-east-1 \
--query 'connectorState' \
--output text
# 2. Check replication slot
echo "2. Replication Slot:"
psql -h $DB_HOST -U $DB_USER -d $DB_NAME -c \
"SELECT slot_name, active, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag FROM pg_replication_slots WHERE slot_name = 'debezium_slot';"
# 3. Test INSERT (start the consumer first, so the event is not missed:
# without --from-beginning the consumer only sees messages produced after it starts)
echo "3. Testing INSERT..."
kafka-console-consumer.sh \
  --bootstrap-server $BOOTSTRAP_SERVERS \
  --topic cdc.public.customers \
  --max-messages 1 \
  --timeout-ms 30000 > /tmp/cdc_event.json &
CONSUMER_PID=$!
sleep 5
psql -h $DB_HOST -U $DB_USER -d $DB_NAME -c \
  "INSERT INTO public.customers (customer_name, email) VALUES ('Test User', 'test@example.com');"
# 4. Verify in Kafka
echo "4. Verifying in Kafka..."
wait $CONSUMER_PID
cat /tmp/cdc_event.json
echo "=== Verification Complete ==="
Best Practices for Verification
Test in Non-Production First: Always verify CDC in a test environment before production
Monitor Continuously: Set up CloudWatch alarms for:
- Connector state changes
- Replication lag > threshold
- Error log entries
Validate Data Types: Ensure PostgreSQL data types map correctly to Kafka/JSON
Test Edge Cases:
- NULL values
- Large text fields
- Binary data (bytea)
- Arrays and JSON columns
Document Baseline: Record normal throughput and lag metrics for comparison
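One of the edge cases above is worth a concrete example: with the JSON converter and Debezium's default binary.handling.mode=bytes, bytea columns arrive as base64 strings. A hedged sketch of decoding one (the table and column names are illustrative):

```python
import base64

def decode_bytea(event_after, column):
    """Decode a bytea column that the JSON converter emitted as base64."""
    return base64.b64decode(event_after[column])

# "payload" stands in for a bytea column in the captured table
after = {"file_id": 1, "payload": base64.b64encode(b"\x00\x01binary").decode()}
print(decode_bytea(after, "payload"))  # b'\x00\x01binary'
```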
Next Steps
Now that you've verified CDC is working:
- Set Up Heartbeat: Prevent replication lag with idle tables (Blog Post #3)
- Implement Monitoring: Set up comprehensive monitoring and alerting (Blog Post #4)
- Build Consumers: Create applications to consume and process CDC events
Conclusion
Thorough verification of your Debezium CDC pipeline ensures data integrity and reliability. By testing both the initial snapshot and ongoing change capture, you can confidently deploy CDC to production. Regular monitoring and testing should be part of your operational procedures to catch issues early.