Friday, March 20, 2026

Monitoring and Troubleshooting Debezium PostgreSQL CDC on Amazon MSK Connect

 

Introduction

Effective monitoring and troubleshooting are critical for maintaining a healthy Change Data Capture (CDC) pipeline. This guide covers monitoring strategies, key metrics, diagnostic queries, and solutions to common issues when running the Debezium PostgreSQL connector on Amazon MSK Connect.

Monitoring Architecture

Key Components to Monitor

  1. MSK Connect Connector - Connector state and health
  2. PostgreSQL Replication Slot - Lag and WAL retention
  3. Kafka Topics - Message flow and consumer lag
  4. CloudWatch Logs - Connector logs and errors
  5. Database Performance - Connection count and query performance

Essential Monitoring Queries

1. Replication Slot Health

Primary monitoring query:

SELECT 
    slot_name,
    plugin,
    slot_type,
    active,
    confirmed_flush_lsn,
    pg_current_wal_lsn() as current_lsn,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag,
    pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) as lag_bytes,
    restart_lsn,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS wal_retained,
    CASE 
        WHEN pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) < 10485760 THEN 'HEALTHY'
        WHEN pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) < 104857600 THEN 'WARNING'
        ELSE 'CRITICAL'
    END as status
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot';

Expected output:

   slot_name   |  plugin  | slot_type | active | confirmed_flush_lsn | current_lsn |    lag    | lag_bytes | restart_lsn | wal_retained | status  
---------------+----------+-----------+--------+---------------------+-------------+-----------+-----------+-------------+--------------+---------
 debezium_slot | pgoutput | logical   | t      | 0/ABC12345          | 0/ABC12400  | 187 bytes | 187       | 0/ABC12300  | 256 bytes    | HEALTHY

Health Thresholds:

  • HEALTHY: < 10 MB lag
  • WARNING: 10 MB - 100 MB lag
  • CRITICAL: > 100 MB lag
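Off the database, the same thresholds can be applied in a monitoring script. The sketch below emulates pg_wal_lsn_diff by parsing PostgreSQL's hex LSN format (a high and a low 32-bit half separated by "/"); the function names are illustrative:

```python
def parse_lsn(lsn: str) -> int:
    """Convert a PostgreSQL LSN such as '0/ABC12345' to an absolute byte position."""
    hi, lo = lsn.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)

def lag_status(current_lsn: str, confirmed_flush_lsn: str) -> str:
    """Classify replication lag using the same thresholds as the SQL query above."""
    lag_bytes = parse_lsn(current_lsn) - parse_lsn(confirmed_flush_lsn)
    if lag_bytes < 10 * 1024 * 1024:    # under 10 MB
        return "HEALTHY"
    if lag_bytes < 100 * 1024 * 1024:   # under 100 MB
        return "WARNING"
    return "CRITICAL"
```

The split into two small functions keeps the classification testable without a database connection.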

2. Active Replication Connections

SELECT 
    pid,
    application_name,
    client_addr,
    state,
    backend_type,
    backend_start,
    state_change
FROM pg_stat_activity
WHERE backend_type = 'walsender';

Expected output:

  pid  |  application_name   | client_addr  |   state   | backend_type |      backend_start       |       state_change        
-------+---------------------+--------------+-----------+--------------+--------------------------+---------------------------
 12345 | debezium_slot       | 10.0.1.50    | streaming | walsender    | 2026-03-20 08:00:00+00   | 2026-03-20 08:00:05+00

3. Publication Status

-- List all publications
SELECT * FROM pg_publication;

-- List tables in publication
SELECT schemaname, tablename 
FROM pg_publication_tables 
WHERE pubname = 'debezium_publication'
ORDER BY schemaname, tablename;

4. WAL Disk Usage

SELECT 
    slot_name,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS wal_retained,
    pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) / 1024 / 1024 as wal_retained_mb
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot';

5. Heartbeat Freshness (If Using Heartbeat Table)

SELECT 
    id,
    ts as last_heartbeat,
    NOW() - ts as age,
    CASE 
        WHEN NOW() - ts < INTERVAL '1 minute' THEN 'HEALTHY'
        WHEN NOW() - ts < INTERVAL '5 minutes' THEN 'WARNING'
        ELSE 'CRITICAL'
    END as status
FROM public.debezium_heartbeat
ORDER BY ts DESC
LIMIT 1;

6. Table Activity Statistics

SELECT 
    schemaname,
    relname as table_name,
    n_tup_ins as inserts,
    n_tup_upd as updates,
    n_tup_del as deletes,
    n_live_tup as live_rows,
    n_dead_tup as dead_rows,
    last_vacuum,
    last_autovacuum
FROM pg_stat_user_tables
WHERE schemaname = 'public'
ORDER BY n_tup_ins + n_tup_upd + n_tup_del DESC;

AWS Monitoring

1. Check Connector State

# Get connector state
aws kafkaconnect describe-connector \
  --connector-arn <connector-arn> \
  --region us-east-1 \
  --query 'connectorState' \
  --output text

Possible states:

  • CREATING - Connector being created
  • RUNNING - Healthy and operational
  • UPDATING - Configuration being updated
  • FAILED - Connector failed (requires investigation)
  • DELETING - Being deleted
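The same state check can be scripted in Python. The sketch below separates response interpretation (pure and testable) from the AWS call; boto3 credentials and a valid connector ARN are assumed:

```python
def connector_health(response: dict):
    """Interpret a kafkaconnect describe_connector response.

    Returns (state, healthy); healthy is True only for RUNNING.
    """
    state = response.get("connectorState", "UNKNOWN")
    return state, state == "RUNNING"

def check_connector(connector_arn: str, region: str = "us-east-1"):
    import boto3  # assumed available where this runs
    client = boto3.client("kafkaconnect", region_name=region)
    return connector_health(client.describe_connector(connectorArn=connector_arn))
```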

2. Get Connector Details

aws kafkaconnect describe-connector \
  --connector-arn <connector-arn> \
  --region us-east-1 \
  --query '{
    State: connectorState,
    Version: currentVersion,
    Capacity: capacity.provisionedCapacity,
    CreatedAt: creationTime
  }'

3. Monitor CloudWatch Logs

Tail logs in real-time:

aws logs tail /aws/mskconnect/rds-postgres-debezium \
  --since 5m \
  --follow \
  --region us-east-1

Search for errors:

aws logs filter-log-events \
  --log-group-name /aws/mskconnect/rds-postgres-debezium \
  --filter-pattern "ERROR" \
  --start-time $(date -u -d '1 hour ago' +%s)000 \
  --region us-east-1

Check throughput:

aws logs tail /aws/mskconnect/rds-postgres-debezium \
  --since 5m \
  --region us-east-1 | grep "records sent"

Expected output:

[INFO] 150 records sent during previous 00:00:30.123, last recorded offset is {lsn=42633788024}
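These throughput lines can also be parsed programmatically, for example to feed a custom CloudWatch metric. A sketch, assuming the log format shown in the sample above:

```python
import re

# Matches the periodic throughput log line, e.g.
# "150 records sent during previous 00:00:30.123, last recorded offset is {lsn=42633788024}"
THROUGHPUT_RE = re.compile(
    r"(?P<records>\d+) records sent during previous "
    r"(?P<h>\d+):(?P<m>\d+):(?P<s>[\d.]+).*?lsn=(?P<lsn>\d+)"
)

def parse_throughput(line: str):
    """Return (records, seconds, lsn) or None if the line does not match."""
    m = THROUGHPUT_RE.search(line)
    if not m:
        return None
    seconds = int(m["h"]) * 3600 + int(m["m"]) * 60 + float(m["s"])
    return int(m["records"]), seconds, int(m["lsn"])
```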

4. Kafka Topic Monitoring

List topics:

BOOTSTRAP_SERVERS=$(aws kafka get-bootstrap-brokers \
  --cluster-arn <msk-cluster-arn> \
  --region us-east-1 \
  --query 'BootstrapBrokerString' \
  --output text)

kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --list | grep cdc

Check topic details:

kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS \
  --describe --topic cdc.public.orders

Get message count:

kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list $BOOTSTRAP_SERVERS \
  --topic cdc.public.orders | awk -F ":" '{sum += $3} END {print sum}'

CloudWatch Alarms

1. Replication Lag Alarm

Create a custom metric from CloudWatch Logs Insights:

Query:

fields @timestamp, @message
| filter @message like /last recorded offset/
| parse @message /lsn=(?<lsn>\d+)/
| stats latest(lsn) as latest_lsn by bin(5m)

Alarm Configuration:

aws cloudwatch put-metric-alarm \
  --alarm-name "Debezium-Replication-Lag-High" \
  --alarm-description "Alert when replication lag exceeds 100MB" \
  --metric-name ReplicationLagBytes \
  --namespace Custom/Debezium \
  --statistic Average \
  --period 300 \
  --threshold 104857600 \
  --comparison-operator GreaterThanThreshold \
  --evaluation-periods 2 \
  --region us-east-1
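The lag value from the replication-slot query can be published as the ReplicationLagBytes metric this alarm watches. A sketch, assuming boto3 and credentials are available; the SlotName dimension is illustrative:

```python
def lag_metric_payload(lag_bytes: int, slot_name: str = "debezium_slot") -> dict:
    """Build the put_metric_data payload for the custom lag metric."""
    return {
        "Namespace": "Custom/Debezium",
        "MetricData": [
            {
                "MetricName": "ReplicationLagBytes",
                "Dimensions": [{"Name": "SlotName", "Value": slot_name}],
                "Value": float(lag_bytes),
                "Unit": "Bytes",
            }
        ],
    }

def publish_lag(lag_bytes: int, region: str = "us-east-1") -> None:
    import boto3  # assumed available where this runs
    boto3.client("cloudwatch", region_name=region).put_metric_data(
        **lag_metric_payload(lag_bytes)
    )
```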

2. Connector State Alarm

Monitor connector state changes:

aws cloudwatch put-metric-alarm \
  --alarm-name "Debezium-Connector-Failed" \
  --alarm-description "Alert when connector enters FAILED state" \
  --metric-name ConnectorState \
  --namespace AWS/KafkaConnect \
  --statistic Maximum \
  --period 60 \
  --threshold 1 \
  --comparison-operator GreaterThanThreshold \
  --evaluation-periods 1 \
  --region us-east-1

3. Error Log Alarm

Create metric filter for errors:

aws logs put-metric-filter \
  --log-group-name /aws/mskconnect/rds-postgres-debezium \
  --filter-name DebeziumErrors \
  --filter-pattern "ERROR" \
  --metric-transformations \
    metricName=ErrorCount,\
metricNamespace=Custom/Debezium,\
metricValue=1 \
  --region us-east-1

Common Issues and Solutions

Issue 1: Connector in FAILED State

Symptoms:

  • Connector state shows FAILED
  • No messages flowing to Kafka
  • Replication slot inactive

Diagnosis:

# Check recent logs
aws logs tail /aws/mskconnect/rds-postgres-debezium \
  --since 30m | grep -i "error\|exception\|failed"

# Check connector details
aws kafkaconnect describe-connector \
  --connector-arn <connector-arn> \
  --region us-east-1
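Before digging into specific causes, a quick TCP probe from a host in the connector's VPC helps separate network problems from authentication or slot problems. A stdlib-only sketch; the hostname in the example is a placeholder:

```python
import socket

def tcp_reachable(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Example: probe the RDS endpoint on the PostgreSQL port
# tcp_reachable("my-db.abc123.us-east-1.rds.amazonaws.com", 5432)
```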

Common Causes and Solutions:

  1. Database Connection Failed

    • Error: Connection refused or timeout
    • Solution: Check security groups, verify database endpoint, test connectivity
  2. Authentication Failed

    • Error: password authentication failed
    • Solution: Verify credentials, check user permissions
  3. Replication Slot Conflict

    • Error: replication slot "debezium_slot" is active for PID 12345
    • Solution:
      SELECT pg_terminate_backend(12345);
      -- Or drop and recreate slot
      SELECT pg_drop_replication_slot('debezium_slot');
      
  4. Publication Missing

    • Error: publication "debezium_publication" does not exist
    • Solution:
      CREATE PUBLICATION debezium_publication FOR TABLE public.orders, public.customers;
      

Issue 2: High Replication Lag

Symptoms:

  • Replication lag > 100 MB
  • WAL files accumulating
  • Disk space warnings

Diagnosis:

-- Check current lag
SELECT 
    slot_name,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag,
    active
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot';

-- Check table activity
SELECT schemaname, relname, n_tup_ins, n_tup_upd, n_tup_del
FROM pg_stat_user_tables
WHERE schemaname = 'public'
ORDER BY n_tup_ins + n_tup_upd + n_tup_del DESC;

Solutions:

  1. Implement Heartbeat (if not already done)

    • See Blog Post #3 for implementation
    • Prevents lag growth when replicated tables are idle but other activity still advances the WAL
  2. Increase Connector Capacity

    aws kafkaconnect update-connector \
      --connector-arn <connector-arn> \
      --current-version <version> \
      --capacity '{
        "provisionedCapacity": {
          "mcuCount": 2,
          "workerCount": 2
        }
      }' \
      --region us-east-1
    
  3. Optimize Table Selection

    • Remove unnecessary tables from table.include.list
    • Split large tables across multiple connectors
  4. Check Network Throughput

    • Verify network connectivity between connector and database
    • Check for network throttling or bandwidth limits

Issue 3: Snapshot Taking Too Long

Symptoms:

  • Connector stuck in snapshot phase
  • CloudWatch logs show slow progress
  • Initial data not appearing in Kafka

Diagnosis:

# Check snapshot progress
aws logs tail /aws/mskconnect/rds-postgres-debezium \
  --since 10m | grep -i snapshot

Solutions:

  1. Use Parallel Snapshots (for large tables)

    {
      "snapshot.max.threads": "4"
    }
    
  2. Skip Snapshot (if initial data not needed)

    {
      "snapshot.mode": "never"
    }
    
  3. Snapshot Specific Tables Only

    {
      "snapshot.include.collection.list": "public.orders"
    }
    

Issue 4: Duplicate Messages in Kafka

Symptoms:

  • Same CDC event appears multiple times
  • Consumer sees duplicate records

Cause: Connector restart during processing

Solution: This is expected behavior. Implement idempotent consumers:

# Example: use the source LSN as a deduplication key
# (assumes a Kafka consumer yielding JSON-deserialized Debezium events
#  and a process_message() callback defined elsewhere)
processed_lsns = set()

for message in consumer:
    lsn = message.value['source']['lsn']

    if lsn in processed_lsns:
        continue  # skip duplicate

    process_message(message)
    processed_lsns.add(lsn)

# Note: an in-memory set grows without bound and is lost on restart;
# in production, persist the highest processed LSN and skip events at
# or below it instead.

Issue 5: Schema Changes Not Captured

Symptoms:

  • New columns not appearing in Kafka messages
  • Schema mismatch errors

Diagnosis:

-- Check table structure
\d public.orders

-- Check publication
SELECT * FROM pg_publication_tables WHERE pubname = 'debezium_publication';

Solutions:

  1. Update the Publication

    -- PostgreSQL has no "ALTER PUBLICATION ... REFRESH"; newly created
    -- tables must be added explicitly (public.new_table is a placeholder)
    ALTER PUBLICATION debezium_publication ADD TABLE public.new_table;
    
  2. Restart Connector

    # Update connector with same config to force restart
    aws kafkaconnect update-connector \
      --connector-arn <connector-arn> \
      --current-version <version> \
      --connector-configuration file://connector-config.json \
      --region us-east-1
    
  3. Enable Schema History (non-PostgreSQL connectors)

    Note: the PostgreSQL connector reads table schemas directly from the
    database and does not use a schema history topic; the setting below
    applies to connectors such as MySQL or SQL Server.

    {
      "schema.history.internal.kafka.topic": "schema-changes.cdc",
      "schema.history.internal.kafka.bootstrap.servers": "bootstrap-servers"
    }
    

Issue 6: Connector Consuming Too Many Database Connections

Symptoms:

  • Database connection limit reached
  • too many connections error

Diagnosis:

-- Check current connections
SELECT COUNT(*) as total_connections,
       COUNT(*) FILTER (WHERE application_name LIKE 'debezium%') as debezium_connections
FROM pg_stat_activity;

-- Check connection limit
SHOW max_connections;

Solutions:

  1. Reduce Tasks

    {
      "tasks.max": "1"
    }
    
  2. Increase Database Connection Limit

    • Modify RDS parameter group
    • Set max_connections to higher value
    • Reboot database
  3. Identify Connector Sessions

    The setting below does not pool connections; it tags the connector's
    sessions with an application_name so they stand out in
    pg_stat_activity:

    {
      "database.initial.statements": "SET application_name='debezium'"
    }
    

Issue 7: WAL Disk Space Full

Symptoms:

  • Database disk space at 100%
  • WAL files not being cleaned up
  • Database performance degraded

Diagnosis:

-- Check WAL retention
SELECT 
    slot_name,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS wal_retained
FROM pg_replication_slots;

-- Check disk usage
SELECT pg_size_pretty(pg_database_size('testdb'));

Immediate Solutions:

  1. Drop Inactive Replication Slots

    -- List all slots
    SELECT * FROM pg_replication_slots;
    
    -- Drop inactive slot
    SELECT pg_drop_replication_slot('old_slot_name');
    
  2. Advance Slot Manually (emergency only)

    -- Advance to current LSN (data loss!)
    SELECT pg_replication_slot_advance('debezium_slot', pg_current_wal_lsn());
    
  3. Increase Disk Space

    • Modify RDS instance storage
    • Scale up to larger instance type

Long-term Solutions:

  • Implement heartbeat (Blog Post #3)
  • Set up monitoring and alerts
  • Regular slot health checks

Monitoring Dashboard

Create CloudWatch Dashboard

aws cloudwatch put-dashboard \
  --dashboard-name Debezium-CDC-Monitoring \
  --dashboard-body file://dashboard.json \
  --region us-east-1

dashboard.json:

{
  "widgets": [
    {
      "type": "metric",
      "properties": {
        "metrics": [
          ["Custom/Debezium", "ReplicationLagBytes"]
        ],
        "period": 300,
        "stat": "Average",
        "region": "us-east-1",
        "title": "Replication Lag"
      }
    },
    {
      "type": "log",
      "properties": {
        "query": "SOURCE '/aws/mskconnect/rds-postgres-debezium' | fields @timestamp, @message | filter @message like /ERROR/ | sort @timestamp desc | limit 20",
        "region": "us-east-1",
        "title": "Recent Errors"
      }
    }
  ]
}
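If dashboards are managed from code rather than a static file, the same body can be generated and validated before upload. A sketch mirroring the JSON above; the log group name is the one used throughout this post:

```python
import json

LOG_GROUP = "/aws/mskconnect/rds-postgres-debezium"

def build_dashboard_body() -> str:
    """Return the dashboard body JSON for put-dashboard / put_dashboard."""
    widgets = [
        {
            "type": "metric",
            "properties": {
                "metrics": [["Custom/Debezium", "ReplicationLagBytes"]],
                "period": 300,
                "stat": "Average",
                "region": "us-east-1",
                "title": "Replication Lag",
            },
        },
        {
            "type": "log",
            "properties": {
                "query": (
                    f"SOURCE '{LOG_GROUP}' | fields @timestamp, @message "
                    "| filter @message like /ERROR/ "
                    "| sort @timestamp desc | limit 20"
                ),
                "region": "us-east-1",
                "title": "Recent Errors",
            },
        },
    ]
    return json.dumps({"widgets": widgets})
```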

Automated Health Check Script

Create a comprehensive health check script:

#!/bin/bash

echo "=== Debezium CDC Health Check ==="
echo "Timestamp: $(date)"
echo ""

# 1. Connector State
echo "1. Connector State:"
CONNECTOR_STATE=$(aws kafkaconnect describe-connector \
  --connector-arn $CONNECTOR_ARN \
  --region us-east-1 \
  --query 'connectorState' \
  --output text)
echo "   Status: $CONNECTOR_STATE"

if [ "$CONNECTOR_STATE" != "RUNNING" ]; then
  echo "   ⚠️  WARNING: Connector not in RUNNING state"
fi
echo ""

# 2. Replication Slot
echo "2. Replication Slot:"
psql -h $DB_HOST -U $DB_USER -d $DB_NAME << EOF
SELECT 
    slot_name,
    active,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag,
    CASE 
        WHEN pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) < 10485760 THEN '✅ HEALTHY'
        WHEN pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) < 104857600 THEN '⚠️  WARNING'
        ELSE '❌ CRITICAL'
    END as status
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot';
EOF
echo ""

# 3. Recent Errors
echo "3. Recent Errors (last 10 minutes):"
ERROR_COUNT=$(aws logs filter-log-events \
  --log-group-name /aws/mskconnect/rds-postgres-debezium \
  --filter-pattern "ERROR" \
  --start-time $(($(date +%s) - 600))000 \
  --region us-east-1 \
  --query 'length(events)' \
  --output text)

echo "   Error count: $ERROR_COUNT"
if [ "$ERROR_COUNT" -gt 0 ]; then
  echo "   ⚠️  Errors detected in logs"
fi
echo ""

# 4. Throughput
echo "4. Throughput (last 5 minutes):"
aws logs tail /aws/mskconnect/rds-postgres-debezium \
  --since 5m \
  --region us-east-1 | grep "records sent" | tail -1
echo ""

# 5. Heartbeat (if using heartbeat table)
echo "5. Heartbeat Status:"
psql -h $DB_HOST -U $DB_USER -d $DB_NAME << EOF
SELECT 
    ts as last_heartbeat,
    NOW() - ts as age,
    CASE 
        WHEN NOW() - ts < INTERVAL '1 minute' THEN '✅ HEALTHY'
        WHEN NOW() - ts < INTERVAL '5 minutes' THEN '⚠️  WARNING'
        ELSE '❌ CRITICAL'
    END as status
FROM public.debezium_heartbeat
ORDER BY ts DESC
LIMIT 1;
EOF

echo ""
echo "=== Health Check Complete ==="

Make it executable and schedule with cron:

chmod +x health-check.sh

# Add this line via `crontab -e` to run every 5 minutes:
*/5 * * * * /path/to/health-check.sh >> /var/log/debezium-health.log 2>&1

Best Practices

1. Proactive Monitoring

  • Set up CloudWatch alarms for critical metrics
  • Monitor replication lag continuously
  • Track connector state changes
  • Review logs daily for warnings

2. Regular Health Checks

  • Run automated health checks every 5 minutes
  • Review weekly reports
  • Test failover scenarios monthly
  • Document baseline metrics

3. Capacity Planning

  • Monitor throughput trends
  • Plan for peak loads
  • Scale connector capacity proactively
  • Review table growth patterns

4. Incident Response

  • Document runbooks for common issues
  • Maintain escalation procedures
  • Keep contact information updated
  • Practice recovery procedures

5. Performance Optimization

  • Tune heartbeat.interval.ms based on workload
  • Optimize table.include.list
  • Use appropriate snapshot.mode
  • Monitor and adjust connector capacity

Troubleshooting Checklist

When issues occur, follow this systematic approach:

  • [ ] Check connector state in AWS Console
  • [ ] Review CloudWatch logs for errors
  • [ ] Verify replication slot is active
  • [ ] Check database connectivity
  • [ ] Verify publication includes all tables
  • [ ] Check Kafka topic existence
  • [ ] Review security group rules
  • [ ] Verify database user permissions
  • [ ] Check disk space on database
  • [ ] Review recent configuration changes

Conclusion

Effective monitoring and troubleshooting are essential for maintaining a reliable CDC pipeline. By implementing the monitoring queries, CloudWatch alarms, and automated health checks described in this guide, you can proactively identify and resolve issues before they impact your data pipeline. Regular monitoring combined with documented troubleshooting procedures ensures your Debezium CDC system remains healthy and performant.
