Friday, March 20, 2026

Setting Up Debezium PostgreSQL Connector on Amazon MSK Connect

 

Introduction

Change Data Capture (CDC) is essential for real-time data integration, enabling you to capture and stream database changes to downstream systems. Amazon MSK Connect provides a fully managed service to run Apache Kafka Connect connectors, including Debezium for PostgreSQL CDC. This guide walks you through setting up a Debezium PostgreSQL connector on MSK Connect to capture changes from Amazon Aurora PostgreSQL or Amazon RDS PostgreSQL.

Architecture Overview

Our CDC pipeline consists of:

  • Source: Amazon Aurora PostgreSQL 17.5 (or RDS PostgreSQL)
  • Target: Amazon MSK (Kafka 3.7.x)
  • CDC Tool: Debezium PostgreSQL Connector 3.2.6
  • Deployment: MSK Connect (fully managed)

Prerequisites

Before you begin, ensure you have:

  1. Amazon Aurora PostgreSQL or RDS PostgreSQL with:

    • PostgreSQL version 10 or higher
    • Logical replication enabled (rds.logical_replication = 1)
    • Appropriate security groups allowing connectivity
  2. Amazon MSK Cluster with:

    • Kafka version 3.7.x or compatible
    • Appropriate security groups
    • Bootstrap servers accessible from MSK Connect
  3. AWS CLI configured with appropriate permissions

  4. IAM Roles with permissions for:

    • MSK Connect operations
    • S3 access (for custom plugin storage)
    • CloudWatch Logs (for connector logging)

Step 1: Prepare PostgreSQL Database

1.1 Enable Logical Replication

For RDS/Aurora PostgreSQL, modify the parameter group:

-- Check current setting
SHOW wal_level;

-- If not 'logical', modify parameter group
-- In AWS Console: RDS > Parameter Groups > Edit
-- Set: rds.logical_replication = 1
-- Reboot the database instance

1.2 Create Replication User (Optional)

Create a dedicated user for Debezium:

-- Create user
CREATE USER debezium_user WITH PASSWORD 'your_secure_password';

-- Grant replication privileges
ALTER USER debezium_user WITH REPLICATION;

-- Grant schema access
GRANT USAGE ON SCHEMA public TO debezium_user;

-- Grant table access
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium_user;

1.3 Create Publication

Create a PostgreSQL publication for the tables you want to replicate:

-- Create publication for specific tables
CREATE PUBLICATION debezium_publication FOR TABLE public.orders, public.customers;

-- Or create publication for all tables
CREATE PUBLICATION debezium_publication FOR ALL TABLES;

-- Verify publication
SELECT * FROM pg_publication;
SELECT schemaname, tablename 
FROM pg_publication_tables 
WHERE pubname = 'debezium_publication';

1.4 Create Test Tables

-- Create sample tables
CREATE TABLE public.orders (
    order_id SERIAL PRIMARY KEY,
    customer_id INTEGER NOT NULL,
    order_date TIMESTAMP WITH TIME ZONE DEFAULT now(),
    total_amount DECIMAL(10,2),
    status VARCHAR(50)
);

CREATE TABLE public.customers (
    customer_id SERIAL PRIMARY KEY,
    customer_name VARCHAR(100) NOT NULL,
    email VARCHAR(100),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT now()
);

-- Insert sample data
INSERT INTO public.customers (customer_name, email) 
VALUES ('John Doe', 'john@example.com');

INSERT INTO public.orders (customer_id, total_amount, status) 
VALUES (1, 99.99, 'pending');

Step 2: Create Debezium Custom Plugin

2.1 Download Debezium Connector

# Create working directory
mkdir -p ~/debezium-plugin
cd ~/debezium-plugin

# Download Debezium PostgreSQL connector from Confluent Hub
# Version 3.2.6 compatible with Kafka 3.7.x
wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/debezium/debezium-connector-postgresql/versions/3.2.6/debezium-debezium-connector-postgresql-3.2.6.zip

# Unzip
unzip debezium-debezium-connector-postgresql-3.2.6.zip

2.2 Upload to S3

# Create S3 bucket (if not exists)
aws s3 mb s3://your-msk-connect-plugins --region us-east-1

# Create ZIP file for MSK Connect
cd debezium-debezium-connector-postgresql-3.2.6
zip -r ../debezium-postgres-connector.zip .
cd ..

# Upload to S3
aws s3 cp debezium-postgres-connector.zip \
  s3://your-msk-connect-plugins/debezium-postgres-connector.zip \
  --region us-east-1

2.3 Create Custom Plugin in MSK Connect

aws kafkaconnect create-custom-plugin \
  --custom-plugin-name debezium-postgres-3-2-6 \
  --content-type ZIP \
  --location s3Location={bucketArn=arn:aws:s3:::your-msk-connect-plugins,fileKey=debezium-postgres-connector.zip} \
  --region us-east-1

Wait for the plugin to be in ACTIVE state:

aws kafkaconnect describe-custom-plugin \
  --custom-plugin-arn <plugin-arn> \
  --region us-east-1 \
  --query 'customPluginState'

Step 3: Create Worker Configuration

Create a worker configuration for MSK Connect:

aws kafkaconnect create-worker-configuration \
  --name debezium-worker-config \
  --properties-file-content "$(cat <<EOF
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
EOF
)" \
  --region us-east-1

Note: We disable schemas for simpler JSON output. Enable schemas if you need Avro or schema registry integration.
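With schemas disabled, each Kafka record value is a bare Debezium change-event envelope: before/after row images, source metadata, and an op code. A minimal Python sketch of how a downstream consumer might unpack one (the sample payload below is illustrative, not captured output):

```python
import json

# Illustrative Debezium change event for an INSERT into public.orders
# (schemas disabled, so the record value is the bare payload envelope).
sample_value = json.dumps({
    "before": None,
    "after": {"order_id": 1, "customer_id": 1,
              "total_amount": "99.99", "status": "pending"},
    "source": {"connector": "postgresql", "schema": "public",
               "table": "orders"},
    "op": "c",          # c=create, u=update, d=delete, r=snapshot read
    "ts_ms": 1742400000000,
})

def unpack(value: str):
    """Return (operation, row image) from a schemaless Debezium event."""
    event = json.loads(value)
    # For deletes the surviving row image is in "before"; otherwise "after".
    row = event["after"] if event["op"] != "d" else event["before"]
    return event["op"], row

op, row = unpack(sample_value)
print(op, row["order_id"])  # c 1
```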

Step 4: Create Kafka Topics (If Auto-Create Disabled)

If your MSK cluster has auto.create.topics.enable=false, manually create topics:

# Get bootstrap servers
BOOTSTRAP_SERVERS=$(aws kafka get-bootstrap-brokers \
  --cluster-arn <your-msk-cluster-arn> \
  --region us-east-1 \
  --query 'BootstrapBrokerString' \
  --output text)

# Create topics for each table
kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS \
  --create --topic cdc.public.orders \
  --partitions 1 --replication-factor 2

kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS \
  --create --topic cdc.public.customers \
  --partitions 1 --replication-factor 2
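Debezium names each change-event topic `<topic.prefix>.<schema>.<table>`, so the topics to pre-create can be derived directly from `table.include.list`. A small Python helper sketching that mapping (the "cdc" prefix and table list mirror the configuration used in this guide):

```python
def expected_topics(topic_prefix: str, table_include_list: str) -> list[str]:
    """Map a Debezium table.include.list to the Kafka topics it will write to.

    Debezium names change-event topics <topic.prefix>.<schema>.<table>.
    """
    tables = [t.strip() for t in table_include_list.split(",") if t.strip()]
    return [f"{topic_prefix}.{table}" for table in tables]

# Matches the topics created in Step 4.
print(expected_topics("cdc", "public.orders,public.customers"))
# ['cdc.public.orders', 'cdc.public.customers']
```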

Step 5: Create MSK Connect Connector

5.1 Prepare Connector Configuration

Create a file connector-config.json:

{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.hostname": "your-rds-endpoint.rds.amazonaws.com",
  "database.port": "5432",
  "database.user": "postgres",
  "database.password": "your_secure_password",
  "database.dbname": "your_database",
  "database.server.name": "rds_pg",
  "database.sslmode": "require",
  "table.include.list": "public.orders,public.customers",
  "plugin.name": "pgoutput",
  "slot.name": "debezium_slot",
  "publication.name": "debezium_publication",
  "topic.prefix": "cdc",
  "tasks.max": "1",
  "snapshot.mode": "initial"
}

Key Parameters Explained:

Parameter          | Description                  | Value
-------------------|------------------------------|------------------------------------
database.hostname  | RDS/Aurora endpoint          | Your database endpoint
database.sslmode   | SSL mode for the connection  | require for RDS/Aurora
table.include.list | Tables to replicate          | Comma-separated list
plugin.name        | Logical decoding plugin      | pgoutput (native PostgreSQL)
slot.name          | Replication slot name        | Unique identifier
publication.name   | PostgreSQL publication       | Must match the created publication
topic.prefix       | Kafka topic prefix           | Prepended to table names
snapshot.mode      | Initial snapshot behavior    | initial for first-time setup

5.2 Create Connector

aws kafkaconnect create-connector \
  --connector-name rds-postgres-debezium \
  --kafka-connect-version "3.7.x" \
  --capacity '{
    "provisionedCapacity": {
      "mcuCount": 1,
      "workerCount": 1
    }
  }' \
  --connector-configuration file://connector-config.json \
  --kafka-cluster '{
    "apacheKafkaCluster": {
      "bootstrapServers": "b-1.your-msk.amazonaws.com:9092,b-2.your-msk.amazonaws.com:9092",
      "vpc": {
        "subnets": ["subnet-xxx", "subnet-yyy"],
        "securityGroups": ["sg-xxx"]
      }
    }
  }' \
  --kafka-cluster-client-authentication '{
    "authenticationType": "NONE"
  }' \
  --kafka-cluster-encryption-in-transit '{
    "encryptionType": "PLAINTEXT"
  }' \
  --plugins '[{
    "customPlugin": {
      "customPluginArn": "arn:aws:kafkaconnect:us-east-1:xxx:custom-plugin/debezium-postgres-3-2-6/xxx",
      "revision": 1
    }
  }]' \
  --service-execution-role-arn "arn:aws:iam::xxx:role/MSKConnectRole" \
  --worker-configuration '{
    "workerConfigurationArn": "arn:aws:kafkaconnect:us-east-1:xxx:worker-configuration/debezium-worker-config/xxx",
    "revision": 1
  }' \
  --log-delivery '{
    "workerLogDelivery": {
      "cloudWatchLogs": {
        "enabled": true,
        "logGroup": "/aws/mskconnect/rds-postgres-debezium"
      }
    }
  }' \
  --region us-east-1

5.3 Monitor Connector Creation

# Check connector state
aws kafkaconnect describe-connector \
  --connector-arn <connector-arn> \
  --region us-east-1 \
  --query 'connectorState'

# Expected states: CREATING → RUNNING

Wait for the connector to reach RUNNING state (typically 2-5 minutes).
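The describe-connector call can be wrapped in a simple polling loop instead of re-running it by hand. A boto3 sketch (the ARN in the usage comment is a hypothetical placeholder; `is_terminal` just classifies the states this guide cares about):

```python
import time

# States after which polling should stop; anything else means "keep waiting".
TERMINAL_STATES = {"RUNNING", "FAILED"}

def is_terminal(state: str) -> bool:
    """Return True once the connector has finished creating (or failed)."""
    return state in TERMINAL_STATES

def wait_for_connector(connector_arn: str, region: str = "us-east-1") -> str:
    """Poll MSK Connect until the connector reaches a terminal state."""
    import boto3  # imported here so is_terminal() is usable without AWS deps
    client = boto3.client("kafkaconnect", region_name=region)
    while True:
        state = client.describe_connector(connectorArn=connector_arn)["connectorState"]
        if is_terminal(state):
            return state
        time.sleep(30)  # creation typically takes a few minutes

# Usage (requires AWS credentials; the ARN is a hypothetical placeholder):
#   wait_for_connector("arn:aws:kafkaconnect:us-east-1:123456789012:connector/rds-postgres-debezium/xxx")
```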

Step 6: Verify Replication Slot

Check that Debezium created the replication slot:

-- View replication slots
SELECT slot_name, 
       plugin, 
       slot_type, 
       active,
       confirmed_flush_lsn,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag
FROM pg_replication_slots;

Expected output:

   slot_name   |  plugin  | slot_type | active | confirmed_flush_lsn |  lag   
---------------+----------+-----------+--------+---------------------+--------
 debezium_slot | pgoutput | logical   | t      | 0/1234567          | 0 bytes
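The lag column above is just the byte distance between two LSNs. An LSN such as 0/1234567 is a 64-bit WAL position written as two hex halves, so the arithmetic behind pg_wal_lsn_diff can be sketched in Python:

```python
def lsn_to_int(lsn: str) -> int:
    """Parse a PostgreSQL LSN ('high/low', both hex) into a 64-bit byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)

def lag_bytes(current_wal_lsn: str, confirmed_flush_lsn: str) -> int:
    """How many bytes the slot's confirmed position trails the current WAL position."""
    return lsn_to_int(current_wal_lsn) - lsn_to_int(confirmed_flush_lsn)

print(lag_bytes("0/1234567", "0/1234567"))  # 0 -> slot fully caught up
print(lag_bytes("0/1234577", "0/1234567"))  # 16 -> 16 bytes behind
```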

Step 7: Monitor CloudWatch Logs

View connector logs to verify successful startup:

aws logs tail /aws/mskconnect/rds-postgres-debezium \
  --since 5m \
  --follow \
  --region us-east-1

Look for:

  • Snapshot step 1 - Preparing - Snapshot starting
  • Snapshot step 8 - Finalizing - Snapshot complete
  • Streaming requested from LSN - Streaming mode active
  • records sent during previous - Data flowing

Common Issues and Solutions

Issue 1: Connector Fails to Start

Symptom: Connector state is FAILED

Check:

aws logs tail /aws/mskconnect/rds-postgres-debezium --since 10m

Common Causes:

  • Incorrect database credentials
  • Network connectivity issues (security groups)
  • Publication doesn't exist
  • Insufficient database permissions

Issue 2: Replication Slot Not Created

Symptom: No replication slot in pg_replication_slots

Solution: Check connector logs for permission errors. User needs REPLICATION privilege:

ALTER USER debezium_user WITH REPLICATION;

Issue 3: Topics Not Created

Symptom: Connector running but no Kafka topics

Solution: If MSK has auto-create disabled, manually create topics (see Step 4)

Issue 4: SSL Connection Errors

Symptom: SSL connection required error

Solution: Ensure database.sslmode=require in connector configuration

Best Practices

  1. Use Dedicated Replication User: Create a separate user with minimal required permissions

  2. Monitor Replication Lag: Set up CloudWatch alarms for replication slot lag

  3. Enable CloudWatch Logs: Essential for troubleshooting

  4. Start with Small Table Set: Test with a few tables before adding all tables

  5. Use Appropriate Capacity: Start with 1 MCU and 1 worker, scale as needed

  6. Snapshot Mode Selection:

    • initial: First-time setup (recommended)
    • no_data: Skip the snapshot and stream changes only
    • initial_only: Snapshot then stop
  7. Topic Naming: Use consistent topic.prefix for easy identification

Next Steps

Now that your connector is running:

  1. Verify CDC Operations: Test INSERT, UPDATE, DELETE operations (see Blog Post #2)
  2. Set Up Heartbeat: Prevent replication lag with idle tables (see Blog Post #3)
  3. Monitor and Troubleshoot: Set up monitoring and alerts (see Blog Post #4)

Conclusion

You've successfully set up a Debezium PostgreSQL connector on Amazon MSK Connect! The connector is now capturing changes from your PostgreSQL database and streaming them to Kafka topics. This fully managed solution eliminates the operational overhead of running Kafka Connect yourself while providing enterprise-grade reliability and scalability.
