Friday, March 20, 2026

Verifying Debezium CDC: Testing Initial Snapshot and Change Data Capture

 

Introduction

After setting up your Debezium PostgreSQL connector on Amazon MSK Connect, it's crucial to verify that both the initial snapshot and ongoing change data capture (CDC) are working correctly. This guide provides comprehensive testing procedures to ensure your CDC pipeline is functioning as expected.

Understanding Debezium Phases

Debezium operates in two distinct phases:

  1. Snapshot Phase: Captures the current state of all tables in table.include.list
  2. Streaming Phase: Continuously captures INSERT, UPDATE, and DELETE operations

Prerequisites

Before testing, ensure:

  • Debezium connector is in RUNNING state
  • Replication slot is active in PostgreSQL
  • Kafka topics are created (if auto-create is disabled)
  • You have access to Kafka consumer tools

Phase 1: Verify Initial Snapshot

Step 1: Check Connector State

aws kafkaconnect describe-connector \
  --connector-arn <your-connector-arn> \
  --region us-east-1 \
  --query 'connectorState' \
  --output text

Expected output: RUNNING

Step 2: Monitor CloudWatch Logs for Snapshot Progress

aws logs tail /aws/mskconnect/rds-postgres-debezium \
  --since 10m \
  --region us-east-1 | grep -i snapshot

Look for these log entries indicating snapshot progress:

[INFO] Snapshot step 1 - Preparing
[INFO] Snapshot step 2 - Determining captured tables
[INFO] Snapshot step 3 - Locking captured tables
[INFO] Snapshot step 4 - Determining snapshot offset
[INFO] Snapshot step 5 - Reading structure of captured tables
[INFO] Snapshot step 6 - Persisting schema history
[INFO] Snapshot step 7 - Snapshotting data
[INFO] Snapshot step 8 - Finalizing
[INFO] Snapshot completed successfully

Step 3: Verify Replication Slot Created

Connect to PostgreSQL and check the replication slot:

SELECT slot_name, 
       plugin, 
       slot_type, 
       active,
       confirmed_flush_lsn,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot';

Expected output:

   slot_name   |  plugin  | slot_type | active | confirmed_flush_lsn |  lag   
---------------+----------+-----------+--------+---------------------+--------
 debezium_slot | pgoutput | logical   | t      | 0/ABC12345          | 0 bytes

Key indicators:

  • active = t (true) - Connector is connected
  • lag should be minimal (< 1 MB for idle systems)

Step 4: Verify Kafka Topics Created

List Kafka topics to confirm they were created:

# Get bootstrap servers
BOOTSTRAP_SERVERS=$(aws kafka get-bootstrap-brokers \
  --cluster-arn <your-msk-cluster-arn> \
  --region us-east-1 \
  --query 'BootstrapBrokerString' \
  --output text)

# List topics
kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --list | grep cdc

Expected topics:

cdc.public.orders
cdc.public.customers

Step 5: Consume Snapshot Data from Kafka

Consume messages from a topic to verify snapshot data:

kafka-console-consumer.sh \
  --bootstrap-server $BOOTSTRAP_SERVERS \
  --topic cdc.public.orders \
  --from-beginning \
  --max-messages 5

Snapshot Message Structure:

{
  "before": null,
  "after": {
    "order_id": 1,
    "customer_id": 101,
    "order_date": "2026-03-20T08:30:00Z",
    "total_amount": 99.99,
    "status": "pending"
  },
  "source": {
    "version": "3.2.6.Final",
    "connector": "postgresql",
    "name": "cdc",
    "ts_ms": 1773991234567,
    "snapshot": "true",
    "db": "testdb",
    "schema": "public",
    "table": "orders",
    "txId": null,
    "lsn": null
  },
  "op": "r",
  "ts_ms": 1773991234567
}

Key fields to verify:

  • "snapshot": "true" - Indicates this is snapshot data
  • "op": "r" - Read operation (snapshot)
  • "before": null - No previous state for snapshot
  • "after": {...} - Current row data
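These field checks can be scripted once you start building consumers. The sketch below (plain Python, no Kafka client needed; the `classify_event` helper is illustrative, not part of any library) classifies a decoded Debezium envelope by its op code and snapshot flag:

```python
# Minimal sketch: classify a decoded Debezium change event by its envelope
# fields. Field names follow the standard Debezium envelope; the helper name
# classify_event is illustrative. Note the final snapshot record may carry
# "snapshot": "last" instead of "true".

def classify_event(event: dict) -> str:
    """Return a human-readable label for a Debezium change event."""
    op = event.get("op")
    snapshot = event.get("source", {}).get("snapshot")
    if op == "r":
        # Snapshot reads always carry "before": null.
        assert event.get("before") is None
        return "snapshot-read" if snapshot in ("true", "last") else "read"
    return {"c": "insert", "u": "update", "d": "delete"}.get(op, "unknown")

sample = {
    "before": None,
    "after": {"order_id": 1, "status": "pending"},
    "source": {"snapshot": "true", "table": "orders"},
    "op": "r",
}
print(classify_event(sample))  # snapshot-read
```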

Step 6: Verify Row Count Matches

Compare row counts between PostgreSQL and Kafka:

PostgreSQL:

SELECT COUNT(*) FROM public.orders;
SELECT COUNT(*) FROM public.customers;

Kafka (count messages):

# Count messages in the orders topic (sum of end offsets across partitions).
# kafka-get-offsets.sh ships with Kafka 3.0+; on older clients use
# "kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list" instead.
kafka-get-offsets.sh \
  --bootstrap-server $BOOTSTRAP_SERVERS \
  --topic cdc.public.orders | awk -F ":" '{sum += $3} END {print sum}'

Row counts should match, assuming no changes have occurred since the snapshot; any streamed changes add extra messages on top of the snapshot records.

Phase 2: Verify CDC Operations

Now test that ongoing changes are captured correctly.

Test 1: INSERT Operation

Step 1: Insert Data in PostgreSQL

INSERT INTO public.customers (customer_name, email) 
VALUES ('Jane Smith', 'jane@example.com')
RETURNING customer_id;

Note the returned customer_id (e.g., 2).

Step 2: Consume from Kafka

kafka-console-consumer.sh \
  --bootstrap-server $BOOTSTRAP_SERVERS \
  --topic cdc.public.customers \
  --from-beginning \
  --max-messages 10

Step 3: Verify INSERT Message

Look for the new record:

{
  "before": null,
  "after": {
    "customer_id": 2,
    "customer_name": "Jane Smith",
    "email": "jane@example.com",
    "created_at": "2026-03-20T08:45:00Z"
  },
  "source": {
    "version": "3.2.6.Final",
    "connector": "postgresql",
    "name": "cdc",
    "ts_ms": 1773991500000,
    "snapshot": "false",
    "db": "testdb",
    "sequence": "[\"0/ABC12345\",\"0/ABC12346\"]",
    "schema": "public",
    "table": "customers",
    "txId": 12345,
    "lsn": 2886123456
  },
  "op": "c",
  "ts_ms": 1773991500123
}

Verify:

  • ✅ "op": "c" - Create (INSERT) operation
  • ✅ "snapshot": "false" - Streaming mode
  • ✅ "before": null - No previous state
  • ✅ "after" contains new row data
  • ✅ txId and lsn are present

Test 2: UPDATE Operation

Step 1: Update Data in PostgreSQL

UPDATE public.customers 
SET email = 'jane.smith@example.com' 
WHERE customer_id = 2;

Step 2: Consume from Kafka

kafka-console-consumer.sh \
  --bootstrap-server $BOOTSTRAP_SERVERS \
  --topic cdc.public.customers \
  --offset latest \
  --partition 0 \
  --max-messages 1

Note: with --offset latest, the consumer only sees messages produced after it starts, so launch it in a second terminal before running the UPDATE (or use --from-beginning and inspect the last message).

Step 3: Verify UPDATE Message

{
  "before": {
    "customer_id": 2,
    "customer_name": "Jane Smith",
    "email": "jane@example.com",
    "created_at": "2026-03-20T08:45:00Z"
  },
  "after": {
    "customer_id": 2,
    "customer_name": "Jane Smith",
    "email": "jane.smith@example.com",
    "created_at": "2026-03-20T08:45:00Z"
  },
  "source": {
    "version": "3.2.6.Final",
    "connector": "postgresql",
    "name": "cdc",
    "ts_ms": 1773991600000,
    "snapshot": "false",
    "db": "testdb",
    "sequence": "[\"0/ABC12346\",\"0/ABC12347\"]",
    "schema": "public",
    "table": "customers",
    "txId": 12346,
    "lsn": 2886123457
  },
  "op": "u",
  "ts_ms": 1773991600234
}

Verify:

  • ✅ "op": "u" - Update operation
  • ✅ "before" contains old values
  • ✅ "after" contains new values
  • ✅ Only email field changed
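Field-level verification can be scripted by diffing the before and after images. Note that a complete before image requires REPLICA IDENTITY FULL on the table; with the default replica identity, before may contain only key columns or be null. A minimal sketch, with an illustrative changed_columns helper:

```python
# Sketch: compute which columns changed in an UPDATE event by diffing the
# "before" and "after" images of the Debezium envelope. Assumes a full
# before image (REPLICA IDENTITY FULL); with the default replica identity,
# "before" may be incomplete and the diff will over-report changes.

def changed_columns(event: dict) -> dict:
    before = event.get("before") or {}
    after = event.get("after") or {}
    return {
        col: (before.get(col), after[col])
        for col in after
        if before.get(col) != after.get(col)
    }

update_event = {
    "before": {"customer_id": 2, "email": "jane@example.com"},
    "after": {"customer_id": 2, "email": "jane.smith@example.com"},
    "op": "u",
}
print(changed_columns(update_event))
# {'email': ('jane@example.com', 'jane.smith@example.com')}
```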

Test 3: DELETE Operation

Step 1: Delete Data in PostgreSQL

DELETE FROM public.customers WHERE customer_id = 2;

Step 2: Consume from Kafka

kafka-console-consumer.sh \
  --bootstrap-server $BOOTSTRAP_SERVERS \
  --topic cdc.public.customers \
  --offset latest \
  --partition 0 \
  --max-messages 1

Note: as with the UPDATE test, start this consumer before issuing the DELETE, since --offset latest only reads messages produced after the consumer starts.

Step 3: Verify DELETE Message

{
  "before": {
    "customer_id": 2,
    "customer_name": "Jane Smith",
    "email": "jane.smith@example.com",
    "created_at": "2026-03-20T08:45:00Z"
  },
  "after": null,
  "source": {
    "version": "3.2.6.Final",
    "connector": "postgresql",
    "name": "cdc",
    "ts_ms": 1773991700000,
    "snapshot": "false",
    "db": "testdb",
    "sequence": "[\"0/ABC12347\",\"0/ABC12348\"]",
    "schema": "public",
    "table": "customers",
    "txId": 12347,
    "lsn": 2886123458
  },
  "op": "d",
  "ts_ms": 1773991700345
}

Verify:

  • ✅ "op": "d" - Delete operation
  • ✅ "before" contains deleted row data
  • ✅ "after": null - No new state
  • ✅ Tombstone message may follow (if enabled)

Test 4: Bulk Operations

Test with multiple operations in a transaction:

BEGIN;

INSERT INTO public.orders (customer_id, total_amount, status) 
VALUES (1, 150.00, 'pending');

INSERT INTO public.orders (customer_id, total_amount, status) 
VALUES (1, 200.00, 'pending');

UPDATE public.orders SET status = 'confirmed' WHERE customer_id = 1;

COMMIT;

Verify all operations appear in Kafka with the same txId.
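To check the shared txId programmatically, here is a minimal sketch that groups decoded events by source.txId (group_by_tx is an illustrative helper, not a library function):

```python
from collections import defaultdict

# Sketch: group consumed Debezium events by transaction id to confirm that
# all operations from a single COMMIT share the same txId.

def group_by_tx(events):
    groups = defaultdict(list)
    for e in events:
        groups[e["source"]["txId"]].append(e["op"])
    return dict(groups)

events = [
    {"op": "c", "source": {"txId": 12350}},
    {"op": "c", "source": {"txId": 12350}},
    {"op": "u", "source": {"txId": 12350}},
]
print(group_by_tx(events))  # {12350: ['c', 'c', 'u']}
```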

Phase 3: Verify Data Consistency

Check 1: Message Ordering

Verify messages are ordered by LSN:

kafka-console-consumer.sh \
  --bootstrap-server $BOOTSTRAP_SERVERS \
  --topic cdc.public.orders \
  --from-beginning \
  --max-messages 10 | jq '.source.lsn'

LSN values should be monotonically increasing within each partition (Kafka only guarantees ordering per partition).
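A quick way to check this programmatically, assuming you have already extracted the source.lsn values from the consumed messages (the lsns_ordered helper is illustrative):

```python
# Sketch: verify that a sequence of source.lsn values is monotonically
# non-decreasing. Apply this per partition, since ordering is only
# guaranteed within a partition.

def lsns_ordered(lsns) -> bool:
    return all(a <= b for a, b in zip(lsns, lsns[1:]))

print(lsns_ordered([2886123456, 2886123457, 2886123458]))  # True
print(lsns_ordered([2886123457, 2886123456]))              # False
```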

Check 2: No Message Loss

Compare database transaction count with Kafka message count:

PostgreSQL:

SELECT n_tup_ins AS inserts,
       n_tup_upd AS updates,
       n_tup_del AS deletes
FROM pg_stat_user_tables
WHERE relname = 'orders';

Kafka: Count all messages (snapshot + streaming). The sum of inserts, updates, and deletes (accumulated since the table's statistics were last reset) should roughly match the number of streamed messages, plus one snapshot message per row that existed at snapshot time.

Check 3: Schema Consistency

Verify the schema in Kafka matches PostgreSQL:

-- PostgreSQL schema
\d public.orders

Compare with after field structure in Kafka messages.

Phase 4: Performance Verification

Check 1: Replication Lag

Monitor replication lag to ensure it stays minimal:

SELECT slot_name,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag,
       pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot';

Healthy lag: < 10 MB for active systems
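As a sketch, the threshold check can be scripted against the lag_bytes column returned by the query above (the lag_status helper and its 10 MB default are illustrative, matching the guideline for active systems):

```python
# Sketch: classify replication lag from the lag_bytes value reported by
# pg_replication_slots. The 10 MB default threshold is the guideline for
# active systems; tune it to your workload.

def lag_status(lag_bytes: int, threshold: int = 10 * 1024 * 1024) -> str:
    return "healthy" if lag_bytes < threshold else "investigate"

print(lag_status(512 * 1024))        # healthy
print(lag_status(50 * 1024 * 1024))  # investigate
```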

Check 2: Connector Throughput

Check CloudWatch logs for throughput metrics:

aws logs tail /aws/mskconnect/rds-postgres-debezium \
  --since 5m \
  --region us-east-1 | grep "records sent"

Example output:

[INFO] 150 records sent during previous 00:00:30.123

Check 3: WAL Growth

Monitor WAL disk usage:

SELECT pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS wal_retained
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot';

Troubleshooting Common Issues

Issue 1: No Messages in Kafka

Symptoms:

  • Connector is RUNNING
  • Replication slot is active
  • But no messages in Kafka topics

Diagnosis:

# Check CloudWatch logs
aws logs tail /aws/mskconnect/rds-postgres-debezium --since 10m

# Check for errors
aws logs filter-log-events \
  --log-group-name /aws/mskconnect/rds-postgres-debezium \
  --filter-pattern "ERROR" \
  --since 10m

Common Causes:

  1. Tables not in publication
  2. Snapshot still in progress
  3. Network connectivity issues

Solution:

-- Verify tables in publication
SELECT schemaname, tablename 
FROM pg_publication_tables 
WHERE pubname = 'debezium_publication';

-- Add missing tables
ALTER PUBLICATION debezium_publication ADD TABLE public.missing_table;

Issue 2: Snapshot Data Missing

Symptoms: Streaming works but initial snapshot incomplete

Diagnosis:

# Check snapshot completion
aws logs tail /aws/mskconnect/rds-postgres-debezium \
  --since 30m | grep -i "snapshot.*complete"

Solution: If the snapshot failed, restart the connector, or set snapshot.mode=always to force a re-snapshot on every connector start

Issue 3: Duplicate Messages

Symptoms: Same operation appears multiple times in Kafka

Cause: Connector restart during processing

Solution: This is expected behavior. Implement idempotent consumers using the source.lsn field as a deduplication key.
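A minimal in-memory sketch of such deduplication, keyed on (table, lsn); a production consumer would persist the seen keys (for example, in the sink database) rather than holding them in memory:

```python
# Sketch: deduplicate redelivered Debezium events using (table, lsn) as the
# key. Duplicates occur when the connector restarts and re-emits events that
# were produced but whose offsets were not yet committed.

def dedupe(events):
    seen = set()
    for e in events:
        key = (e["source"]["table"], e["source"]["lsn"])
        if key in seen:
            continue  # duplicate delivery after a connector restart
        seen.add(key)
        yield e

events = [
    {"op": "c", "source": {"table": "customers", "lsn": 100}},
    {"op": "c", "source": {"table": "customers", "lsn": 100}},  # duplicate
    {"op": "u", "source": {"table": "customers", "lsn": 101}},
]
print([e["source"]["lsn"] for e in dedupe(events)])  # [100, 101]
```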

Issue 4: Schema Mismatch

Symptoms: Kafka messages missing columns

Cause: Schema change not captured

Solution:

-- A publication created FOR ALL TABLES includes new tables automatically;
-- otherwise add the table explicitly (REFRESH PUBLICATION applies to
-- subscriptions, not publications):
ALTER PUBLICATION debezium_publication ADD TABLE public.missing_table;

-- Column additions are streamed automatically by pgoutput; restart the
-- connector if messages still show the old schema

Automated Verification Script

Create a script to automate verification:

#!/bin/bash

echo "=== Debezium CDC Verification ==="

# 1. Check connector state
echo "1. Connector State:"
aws kafkaconnect describe-connector \
  --connector-arn $CONNECTOR_ARN \
  --region us-east-1 \
  --query 'connectorState' \
  --output text

# 2. Check replication slot
echo "2. Replication Slot:"
psql -h $DB_HOST -U $DB_USER -d $DB_NAME -c \
  "SELECT slot_name, active, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag FROM pg_replication_slots WHERE slot_name = 'debezium_slot';"

# 3. Start a consumer in the background BEFORE making the change:
#    --offset latest only sees messages produced after the consumer starts
echo "3. Starting Kafka consumer..."
kafka-console-consumer.sh \
  --bootstrap-server $BOOTSTRAP_SERVERS \
  --topic cdc.public.customers \
  --offset latest \
  --partition 0 \
  --max-messages 1 \
  --timeout-ms 30000 > /tmp/cdc_event.json &
CONSUMER_PID=$!
sleep 5

# 4. Test INSERT
echo "4. Testing INSERT..."
psql -h $DB_HOST -U $DB_USER -d $DB_NAME -c \
  "INSERT INTO public.customers (customer_name, email) VALUES ('Test User', 'test@example.com');"

# 5. Verify the captured event
echo "5. Captured event:"
wait $CONSUMER_PID
cat /tmp/cdc_event.json

echo "=== Verification Complete ==="

Best Practices for Verification

  1. Test in Non-Production First: Always verify CDC in a test environment before production

  2. Monitor Continuously: Set up CloudWatch alarms for:

    • Connector state changes
    • Replication lag > threshold
    • Error log entries
  3. Validate Data Types: Ensure PostgreSQL data types map correctly to Kafka/JSON

  4. Test Edge Cases:

    • NULL values
    • Large text fields
    • Binary data (bytea)
    • Arrays and JSON columns
  5. Document Baseline: Record normal throughput and lag metrics for comparison

Next Steps

Now that you've verified CDC is working:

  1. Set Up Heartbeat: Prevent replication lag with idle tables (Blog Post #3)
  2. Implement Monitoring: Set up comprehensive monitoring and alerting (Blog Post #4)
  3. Build Consumers: Create applications to consume and process CDC events

Conclusion

Thorough verification of your Debezium CDC pipeline ensures data integrity and reliability. By testing both the initial snapshot and ongoing change capture, you can confidently deploy CDC to production. Regular monitoring and testing should be part of your operational procedures to catch issues early.
