Introduction
Effective monitoring and troubleshooting are critical for maintaining a healthy Change Data Capture (CDC) pipeline. This guide covers monitoring strategies, key metrics, diagnostic queries, and solutions to common issues when running the Debezium PostgreSQL connector on Amazon MSK Connect.
Monitoring Architecture
Key Components to Monitor
- MSK Connect Connector - Connector state and health
- PostgreSQL Replication Slot - Lag and WAL retention
- Kafka Topics - Message flow and consumer lag
- CloudWatch Logs - Connector logs and errors
- Database Performance - Connection count and query performance
Essential Monitoring Queries
1. Replication Slot Health
Primary monitoring query:
SELECT
slot_name,
plugin,
slot_type,
active,
confirmed_flush_lsn,
pg_current_wal_lsn() as current_lsn,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag,
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) as lag_bytes,
restart_lsn,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS wal_retained,
CASE
WHEN pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) < 10485760 THEN 'HEALTHY'
WHEN pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) < 104857600 THEN 'WARNING'
ELSE 'CRITICAL'
END as status
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot';
Expected output:
slot_name | plugin | slot_type | active | confirmed_flush_lsn | current_lsn | lag | lag_bytes | restart_lsn | wal_retained | status
---------------+----------+-----------+--------+---------------------+-------------+--------+-----------+-------------+--------------+---------
debezium_slot | pgoutput | logical | t | 0/ABC12345 | 0/ABC12400 | 187 kB | 187000 | 0/ABC12300 | 256 kB | HEALTHY
Health Thresholds:
- HEALTHY: < 10 MB lag
- WARNING: 10 MB - 100 MB lag
- CRITICAL: > 100 MB lag
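The CASE thresholds above translate directly into code. A minimal sketch for applying the same classification in a monitoring script (the helper name is ours, not part of Debezium):

```python
def slot_status(lag_bytes):
    """Classify replication lag using the same thresholds as the SQL CASE."""
    if lag_bytes < 10 * 1024 * 1024:    # < 10 MB
        return "HEALTHY"
    if lag_bytes < 100 * 1024 * 1024:   # < 100 MB
        return "WARNING"
    return "CRITICAL"
```

Feed it the `lag_bytes` column from the query above to get the same status string the SQL computes.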
2. Active Replication Connections
SELECT
pid,
application_name,
client_addr,
state,
backend_type,
backend_start,
state_change
FROM pg_stat_activity
WHERE backend_type = 'walsender';
Expected output:
pid | application_name | client_addr | state | backend_type | backend_start | state_change
-------+---------------------+--------------+-----------+--------------+--------------------------+---------------------------
12345 | debezium_slot | 10.0.1.50 | streaming | walsender | 2026-03-20 08:00:00+00 | 2026-03-20 08:00:05+00
3. Publication Status
-- List all publications
SELECT * FROM pg_publication;
-- List tables in publication
SELECT schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'debezium_publication'
ORDER BY schemaname, tablename;
4. WAL Disk Usage
SELECT
slot_name,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS wal_retained,
pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) / 1024 / 1024 as wal_retained_mb
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot';
5. Heartbeat Freshness (If Using Heartbeat Table)
SELECT
id,
ts as last_heartbeat,
NOW() - ts as age,
CASE
WHEN NOW() - ts < INTERVAL '1 minute' THEN 'HEALTHY'
WHEN NOW() - ts < INTERVAL '5 minutes' THEN 'WARNING'
ELSE 'CRITICAL'
END as status
FROM public.debezium_heartbeat
ORDER BY ts DESC
LIMIT 1;
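The same freshness thresholds can be applied outside SQL, e.g. when a monitoring script fetches the latest heartbeat row and classifies it locally (hypothetical helper, mirroring the CASE expression above):

```python
from datetime import datetime, timedelta, timezone

def heartbeat_status(last_heartbeat, now=None):
    """Classify heartbeat age with the same thresholds as the SQL CASE."""
    now = now or datetime.now(timezone.utc)
    age = now - last_heartbeat
    if age < timedelta(minutes=1):
        return "HEALTHY"
    if age < timedelta(minutes=5):
        return "WARNING"
    return "CRITICAL"
```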
6. Table Activity Statistics
SELECT
schemaname,
relname as table_name,
n_tup_ins as inserts,
n_tup_upd as updates,
n_tup_del as deletes,
n_live_tup as live_rows,
n_dead_tup as dead_rows,
last_vacuum,
last_autovacuum
FROM pg_stat_user_tables
WHERE schemaname = 'public'
ORDER BY n_tup_ins + n_tup_upd + n_tup_del DESC;
AWS Monitoring
1. Check Connector State
# Get connector state
aws kafkaconnect describe-connector \
--connector-arn <connector-arn> \
--region us-east-1 \
--query 'connectorState' \
--output text
Possible states:
- CREATING - Connector being created
- RUNNING - Healthy and operational
- UPDATING - Configuration being updated
- FAILED - Connector failed (requires investigation)
- DELETING - Being deleted
2. Get Connector Details
aws kafkaconnect describe-connector \
--connector-arn <connector-arn> \
--region us-east-1 \
--query '{
State: connectorState,
Version: currentVersion,
Capacity: capacity.provisionedCapacity,
CreatedAt: creationTime
}'
3. Monitor CloudWatch Logs
Tail logs in real-time:
aws logs tail /aws/mskconnect/rds-postgres-debezium \
--since 5m \
--follow \
--region us-east-1
Search for errors:
aws logs filter-log-events \
--log-group-name /aws/mskconnect/rds-postgres-debezium \
--filter-pattern "ERROR" \
--start-time $(date -u -d '1 hour ago' +%s)000 \
--region us-east-1
Check throughput:
aws logs tail /aws/mskconnect/rds-postgres-debezium \
--since 5m \
--region us-east-1 | grep "records sent"
Expected output:
[INFO] 150 records sent during previous 00:00:30.123, last recorded offset is {lsn=42633788024}
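For scripted throughput tracking, the same log line can be parsed programmatically. A sketch assuming the log format shown above (the regex and helper are ours):

```python
import re
from typing import Optional

# Matches lines like:
# [INFO] 150 records sent during previous 00:00:30.123, last recorded offset is {lsn=42633788024}
THROUGHPUT_RE = re.compile(
    r"(?P<records>\d+) records sent during previous (?P<elapsed>[0-9:.]+)"
    r".*?lsn=(?P<lsn>\d+)"
)

def parse_throughput(line: str) -> Optional[dict]:
    """Extract record count, elapsed window, and last LSN from a log line."""
    m = THROUGHPUT_RE.search(line)
    if m is None:
        return None
    return {
        "records": int(m.group("records")),
        "elapsed": m.group("elapsed"),
        "lsn": int(m.group("lsn")),
    }
```

The extracted `records` and `lsn` values are what you would push to a custom CloudWatch metric or compare between runs.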
4. Kafka Topic Monitoring
List topics:
BOOTSTRAP_SERVERS=$(aws kafka get-bootstrap-brokers \
--cluster-arn <msk-cluster-arn> \
--region us-east-1 \
--query 'BootstrapBrokerString' \
--output text)
kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --list | grep cdc
Check topic details:
kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS \
--describe --topic cdc.public.orders
Get message count:
kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list $BOOTSTRAP_SERVERS \
--topic cdc.public.orders | awk -F ":" '{sum += $3} END {print sum}'
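The awk one-liner above sums the per-partition end offsets. The same summation in Python, assuming GetOffsetShell's usual `topic:partition:offset` output format (the helper name is ours):

```python
def total_messages(offset_lines):
    """Sum end offsets across partitions from GetOffsetShell output.

    Each line looks like 'cdc.public.orders:0:12345'
    (topic:partition:end_offset). The sum approximates the message count
    only for topics where no records have been deleted or compacted away.
    """
    total = 0
    for line in offset_lines:
        line = line.strip()
        if not line:
            continue
        # rsplit guards against any ':' appearing earlier in the line
        _topic, _partition, offset = line.rsplit(":", 2)
        total += int(offset)
    return total
```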
CloudWatch Alarms
1. Replication Lag Alarm
Create a custom metric from CloudWatch Logs Insights:
Query:
fields @timestamp, @message
| filter @message like /last recorded offset/
| parse @message /lsn=(?<lsn>\d+)/
| stats latest(lsn) as latest_lsn by bin(5m)
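The alarm that follows watches a Custom/Debezium metric, which something must publish. One way to feed it is a small scheduled job that runs the lag query and calls CloudWatch PutMetricData; a hedged sketch (helper names ours, assumes boto3 plus a PostgreSQL driver when run for real):

```python
# Sketch: publish slot lag (bytes) as the Custom/Debezium
# ReplicationLagBytes metric. Run on a schedule (cron, Lambda, etc.).

LAG_SQL = """
SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot';
"""

def build_metric_datum(lag_bytes):
    # Shape the datum the way put_metric_data expects it.
    return {
        "MetricName": "ReplicationLagBytes",
        "Value": float(lag_bytes),
        "Unit": "Bytes",
    }

def publish_lag(lag_bytes, region="us-east-1"):
    import boto3  # imported here so build_metric_datum stays testable offline
    cloudwatch = boto3.client("cloudwatch", region_name=region)
    cloudwatch.put_metric_data(
        Namespace="Custom/Debezium",
        MetricData=[build_metric_datum(lag_bytes)],
    )
```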
Alarm Configuration:
aws cloudwatch put-metric-alarm \
--alarm-name "Debezium-Replication-Lag-High" \
--alarm-description "Alert when replication lag exceeds 100MB" \
--metric-name ReplicationLagBytes \
--namespace Custom/Debezium \
--statistic Average \
--period 300 \
--threshold 104857600 \
--comparison-operator GreaterThanThreshold \
--evaluation-periods 2 \
--region us-east-1
2. Connector State Alarm
Monitor connector state changes:
aws cloudwatch put-metric-alarm \
--alarm-name "Debezium-Connector-Failed" \
--alarm-description "Alert when connector enters FAILED state" \
--metric-name ConnectorState \
--namespace AWS/KafkaConnect \
--statistic Maximum \
--period 60 \
--threshold 1 \
--comparison-operator GreaterThanThreshold \
--evaluation-periods 1 \
--region us-east-1
3. Error Log Alarm
Create metric filter for errors:
aws logs put-metric-filter \
--log-group-name /aws/mskconnect/rds-postgres-debezium \
--filter-name DebeziumErrors \
--filter-pattern "ERROR" \
--metric-transformations \
metricName=ErrorCount,\
metricNamespace=Custom/Debezium,\
metricValue=1 \
--region us-east-1
Common Issues and Solutions
Issue 1: Connector in FAILED State
Symptoms:
- Connector state shows FAILED
- No messages flowing to Kafka
- Replication slot inactive
Diagnosis:
# Check recent logs
aws logs tail /aws/mskconnect/rds-postgres-debezium \
--since 30m | grep -i "error\|exception\|failed"
# Check connector details
aws kafkaconnect describe-connector \
--connector-arn <connector-arn> \
--region us-east-1
Common Causes and Solutions:
Database Connection Failed
- Error: Connection refused or timeout
- Solution: Check security groups, verify the database endpoint, test connectivity
Authentication Failed
- Error: password authentication failed
- Solution: Verify credentials, check user permissions
Replication Slot Conflict
- Error: replication slot "debezium_slot" is active for PID 12345
- Solution:
SELECT pg_terminate_backend(12345);
-- Or drop and recreate the slot
SELECT pg_drop_replication_slot('debezium_slot');
Publication Missing
- Error: publication "debezium_publication" does not exist
- Solution:
CREATE PUBLICATION debezium_publication FOR TABLE public.orders, public.customers;
Issue 2: High Replication Lag
Symptoms:
- Replication lag > 100 MB
- WAL files accumulating
- Disk space warnings
Diagnosis:
-- Check current lag
SELECT
slot_name,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag,
active
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot';
-- Check table activity
SELECT schemaname, relname, n_tup_ins, n_tup_upd, n_tup_del
FROM pg_stat_user_tables
WHERE schemaname = 'public'
ORDER BY n_tup_ins + n_tup_upd + n_tup_del DESC;
Solutions:
Implement Heartbeat (if not already done)
- See Blog Post #3 for implementation
- Prevents lag with idle replicated tables
Increase Connector Capacity
aws kafkaconnect update-connector \
  --connector-arn <connector-arn> \
  --current-version <version> \
  --capacity '{
    "provisionedCapacity": {
      "mcuCount": 2,
      "workerCount": 2
    }
  }' \
  --region us-east-1
Optimize Table Selection
- Remove unnecessary tables from table.include.list
- Split large tables across multiple connectors
Check Network Throughput
- Verify network connectivity between connector and database
- Check for network throttling or bandwidth limits
Issue 3: Snapshot Taking Too Long
Symptoms:
- Connector stuck in snapshot phase
- CloudWatch logs show slow progress
- Initial data not appearing in Kafka
Diagnosis:
# Check snapshot progress
aws logs tail /aws/mskconnect/rds-postgres-debezium \
--since 10m | grep -i snapshot
Solutions:
Use Parallel Snapshots (for large tables)
{
  "snapshot.max.threads": "4"
}
Skip Snapshot (if initial data not needed)
{
  "snapshot.mode": "never"
}
Snapshot Specific Tables Only
{
  "snapshot.include.collection.list": "public.orders"
}
Issue 4: Duplicate Messages in Kafka
Symptoms:
- Same CDC event appears multiple times
- Consumer sees duplicate records
Cause: Connector restart during processing
Solution: This is expected behavior. Implement idempotent consumers:
# Example: Use LSN as deduplication key
processed_lsns = set()

for message in consumer:
    lsn = message.value['source']['lsn']
    if lsn in processed_lsns:
        continue  # Skip duplicate
    process_message(message)
    processed_lsns.add(lsn)
    # In long-running consumers, bound this set (e.g., evict old LSNs)
    # or persist only the highest processed LSN per partition.
Issue 5: Schema Changes Not Captured
Symptoms:
- New columns not appearing in Kafka messages
- Schema mismatch errors
Diagnosis:
-- Check table structure
\d public.orders
-- Check publication
SELECT * FROM pg_publication_tables WHERE pubname = 'debezium_publication';
Solutions:
Refresh Publication
-- ALTER PUBLICATION has no REFRESH clause; re-set the table list instead
ALTER PUBLICATION debezium_publication SET TABLE public.orders, public.customers;
Restart Connector
# Update connector with the same config to force a restart
aws kafkaconnect update-connector \
  --connector-arn <connector-arn> \
  --current-version <version> \
  --connector-configuration file://connector-config.json \
  --region us-east-1
Note on Schema History
The PostgreSQL connector reads table schemas from the database itself; unlike the MySQL connector, it does not use a schema history topic, so schema.history.internal.* settings do not apply here.
Issue 6: Connector Consuming Too Many Database Connections
Symptoms:
- Database connection limit reached
- "too many connections" errors
Diagnosis:
-- Check current connections
SELECT COUNT(*) as total_connections,
COUNT(*) FILTER (WHERE application_name LIKE 'debezium%') as debezium_connections
FROM pg_stat_activity;
-- Check connection limit
SHOW max_connections;
Solutions:
Reduce Tasks
{
  "tasks.max": "1"
}
(The Debezium PostgreSQL connector only ever runs a single task, so this mainly matters if it was misconfigured higher.)
Increase Database Connection Limit
- Modify the RDS parameter group
- Set max_connections to a higher value
- Reboot the database
Identify Debezium Connections
{
  "database.initial.statements": "SET application_name='debezium'"
}
This does not pool connections; it tags Debezium sessions so they are easy to count in pg_stat_activity.
Issue 7: WAL Disk Space Full
Symptoms:
- Database disk space at 100%
- WAL files not being cleaned up
- Database performance degraded
Diagnosis:
-- Check WAL retention
SELECT
slot_name,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS wal_retained
FROM pg_replication_slots;
-- Check disk usage
SELECT pg_size_pretty(pg_database_size('testdb'));
Immediate Solutions:
Drop Inactive Replication Slots
-- List all slots
SELECT * FROM pg_replication_slots;
-- Drop inactive slot
SELECT pg_drop_replication_slot('old_slot_name');
Advance Slot Manually (emergency only)
-- Advance to current LSN (data loss!)
SELECT pg_replication_slot_advance('debezium_slot', pg_current_wal_lsn());
Increase Disk Space
- Modify RDS instance storage
- Scale up to larger instance type
Long-term Solutions:
- Implement heartbeat (Blog Post #3)
- Set up monitoring and alerts
- Regular slot health checks
Monitoring Dashboard
Create CloudWatch Dashboard
aws cloudwatch put-dashboard \
--dashboard-name Debezium-CDC-Monitoring \
--dashboard-body file://dashboard.json \
--region us-east-1
dashboard.json:
{
"widgets": [
{
"type": "metric",
"properties": {
"metrics": [
["Custom/Debezium", "ReplicationLagBytes"]
],
"period": 300,
"stat": "Average",
"region": "us-east-1",
"title": "Replication Lag"
}
},
{
"type": "log",
"properties": {
"query": "SOURCE '/aws/mskconnect/rds-postgres-debezium' | fields @timestamp, @message | filter @message like /ERROR/ | sort @timestamp desc | limit 20",
"region": "us-east-1",
"title": "Recent Errors"
}
}
]
}
Automated Health Check Script
Create a comprehensive health check script:
#!/bin/bash
echo "=== Debezium CDC Health Check ==="
echo "Timestamp: $(date)"
echo ""
# 1. Connector State
echo "1. Connector State:"
CONNECTOR_STATE=$(aws kafkaconnect describe-connector \
--connector-arn $CONNECTOR_ARN \
--region us-east-1 \
--query 'connectorState' \
--output text)
echo " Status: $CONNECTOR_STATE"
if [ "$CONNECTOR_STATE" != "RUNNING" ]; then
echo " ⚠️ WARNING: Connector not in RUNNING state"
fi
echo ""
# 2. Replication Slot
echo "2. Replication Slot:"
psql -h $DB_HOST -U $DB_USER -d $DB_NAME << EOF
SELECT
slot_name,
active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag,
CASE
WHEN pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) < 10485760 THEN '✅ HEALTHY'
WHEN pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) < 104857600 THEN '⚠️ WARNING'
ELSE '❌ CRITICAL'
END as status
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot';
EOF
echo ""
# 3. Recent Errors
echo "3. Recent Errors (last 10 minutes):"
ERROR_COUNT=$(aws logs filter-log-events \
--log-group-name /aws/mskconnect/rds-postgres-debezium \
--filter-pattern "ERROR" \
--start-time $(($(date +%s) - 600))000 \
--region us-east-1 \
--query 'length(events)' \
--output text)
echo " Error count: $ERROR_COUNT"
if [ "$ERROR_COUNT" -gt 0 ]; then
echo " ⚠️ Errors detected in logs"
fi
echo ""
# 4. Throughput
echo "4. Throughput (last 5 minutes):"
aws logs tail /aws/mskconnect/rds-postgres-debezium \
--since 5m \
--region us-east-1 | grep "records sent" | tail -1
echo ""
# 5. Heartbeat (if using heartbeat table)
echo "5. Heartbeat Status:"
psql -h $DB_HOST -U $DB_USER -d $DB_NAME << EOF
SELECT
ts as last_heartbeat,
NOW() - ts as age,
CASE
WHEN NOW() - ts < INTERVAL '1 minute' THEN '✅ HEALTHY'
WHEN NOW() - ts < INTERVAL '5 minutes' THEN '⚠️ WARNING'
ELSE '❌ CRITICAL'
END as status
FROM public.debezium_heartbeat
ORDER BY ts DESC
LIMIT 1;
EOF
echo ""
echo "=== Health Check Complete ==="
Make it executable and schedule with cron:
chmod +x health-check.sh
# Run every 5 minutes
crontab -e
*/5 * * * * /path/to/health-check.sh >> /var/log/debezium-health.log 2>&1
Best Practices
1. Proactive Monitoring
- Set up CloudWatch alarms for critical metrics
- Monitor replication lag continuously
- Track connector state changes
- Review logs daily for warnings
2. Regular Health Checks
- Run automated health checks every 5 minutes
- Review weekly reports
- Test failover scenarios monthly
- Document baseline metrics
3. Capacity Planning
- Monitor throughput trends
- Plan for peak loads
- Scale connector capacity proactively
- Review table growth patterns
4. Incident Response
- Document runbooks for common issues
- Maintain escalation procedures
- Keep contact information updated
- Practice recovery procedures
5. Performance Optimization
- Tune heartbeat.interval.ms based on workload
- Optimize table.include.list
- Use an appropriate snapshot.mode
- Monitor and adjust connector capacity
Troubleshooting Checklist
When issues occur, follow this systematic approach:
- [ ] Check connector state in AWS Console
- [ ] Review CloudWatch logs for errors
- [ ] Verify replication slot is active
- [ ] Check database connectivity
- [ ] Verify publication includes all tables
- [ ] Check Kafka topic existence
- [ ] Review security group rules
- [ ] Verify database user permissions
- [ ] Check disk space on database
- [ ] Review recent configuration changes
Conclusion
Effective monitoring and troubleshooting are essential for maintaining a reliable CDC pipeline. By implementing the monitoring queries, CloudWatch alarms, and automated health checks described in this guide, you can proactively identify and resolve issues before they impact your data pipeline. Regular monitoring combined with documented troubleshooting procedures ensures your Debezium CDC system remains healthy and performant.