Friday, March 20, 2026

Building a Real-Time Data Lake: Kafka to S3 Iceberg with MSK Connect

 

Introduction

After capturing database changes with Debezium, the next challenge is storing them in a queryable, scalable data lake. Apache Iceberg provides ACID transactions, schema evolution, and time travel capabilities—perfect for a modern data lake. This guide demonstrates how to stream CDC events from Kafka to S3 Iceberg tables using Amazon MSK Connect and the Iceberg Kafka Connect sink connector.

Architecture Overview

PostgreSQL (RDS/Aurora)
    ↓ Debezium CDC
Kafka Topics (MSK)
    ↓ Iceberg Sink Connector
S3 Iceberg Tables (Glue Catalog)
    ↓ Query
Athena / Spark / Trino

Why Apache Iceberg?

Apache Iceberg is a high-performance table format for huge analytic datasets that provides:

  • ✅ ACID Transactions - Reliable writes and reads
  • ✅ Schema Evolution - Add, drop, rename columns safely
  • ✅ Time Travel - Query historical data snapshots
  • ✅ Partition Evolution - Change partitioning without rewriting data
  • ✅ Hidden Partitioning - Queries don't need explicit partition predicates
  • ✅ Incremental Reads - Efficient change data capture
  • ✅ Compaction - Small-file optimization via table maintenance

Prerequisites

Before starting, ensure you have:

  1. Debezium connector running on MSK Connect (see Blog Post #1)
  2. Kafka topics with CDC events (cdc.public.orders)
  3. S3 bucket for Iceberg data storage
  4. AWS Glue for metadata catalog
  5. IAM roles with appropriate permissions

Use Case: Orders Table

We'll implement the orders table CDC pipeline using the official AWS solution with MSK Connect and Iceberg Kafka Connect.

Sample Data Flow

PostgreSQL Operations:

-- INSERT
INSERT INTO orders (customer_id, order_date, total_amount, status)
VALUES (501, '2026-03-20', 299.99, 'pending');

-- UPDATE
UPDATE orders SET total_amount = 350.00, status = 'completed' 
WHERE order_id = 1001;

-- DELETE
DELETE FROM orders WHERE order_id = 1001;

Kafka CDC Events:

INSERT Event (op: "c"):

{
  "before": null,
  "after": {
    "order_id": 1001,
    "customer_id": 501,
    "order_date": "2026-03-20T10:30:00Z",
    "total_amount": 299.99,
    "status": "pending"
  },
  "op": "c",
  "ts_ms": 1773991800000
}

UPDATE Event (op: "u"):

{
  "before": {
    "order_id": 1001,
    "customer_id": 501,
    "order_date": "2026-03-20T10:30:00Z",
    "total_amount": 299.99,
    "status": "pending"
  },
  "after": {
    "order_id": 1001,
    "customer_id": 501,
    "order_date": "2026-03-20T10:30:00Z",
    "total_amount": 350.00,
    "status": "completed"
  },
  "op": "u",
  "ts_ms": 1773991900000
}

DELETE Event (op: "d"):

{
  "before": {
    "order_id": 1001,
    "customer_id": 501,
    "order_date": "2026-03-20T10:30:00Z",
    "total_amount": 350.00,
    "status": "completed"
  },
  "after": null,
  "op": "d",
  "ts_ms": 1773992000000
}

Iceberg Table Behavior:

  • INSERT: New row added to Iceberg table
  • UPDATE: Iceberg performs upsert based on primary key (if configured) or appends new version
  • DELETE: Row marked as deleted (soft delete) or physically removed based on configuration
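
As a concrete illustration, here is a small Python sketch (illustrative only, not connector code) that pulls the affected row out of a Debezium envelope based on the op code:

```python
import json

def interpret_cdc_event(raw: str):
    """Return (operation, row) for a Debezium change event.

    'c' (create) and 'u' (update) carry the new row image in "after";
    'd' (delete) carries the old row image in "before" ("after" is null).
    """
    event = json.loads(raw)
    op = event["op"]
    if op in ("c", "u"):
        return op, event["after"]
    if op == "d":
        return op, event["before"]
    raise ValueError(f"unsupported op: {op}")

insert_event = '{"before": null, "after": {"order_id": 1001, "status": "pending"}, "op": "c", "ts_ms": 1773991800000}'
op, row = interpret_cdc_event(insert_event)
print(op, row["order_id"], row["status"])  # c 1001 pending
```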

Step 1: Build Iceberg Kafka Connect Plugin

1.1 Build from Apache Iceberg Source

The Iceberg Kafka Connect connector is not available as a pre-built JAR in Maven Central. You need to build it from the official Apache Iceberg source:

# Create working directory
mkdir -p ~/iceberg-kafka-connect
cd ~/iceberg-kafka-connect

# Clone Apache Iceberg repository (version 1.10.1)
git clone --depth 1 --branch apache-iceberg-1.10.1 https://github.com/apache/iceberg.git

# Build the project (skip tests for faster build)
cd iceberg
./gradlew -x test -x integrationTest clean build

# Distribution ZIP will be created at:
# kafka-connect/kafka-connect-runtime/build/distributions/

Build Output:

  • Standard: iceberg-kafka-connect-runtime-1.11.0-SNAPSHOT.zip (~165 MB)
  • With Hive: iceberg-kafka-connect-runtime-hive-1.11.0-SNAPSHOT.zip (~378 MB)

Build Time: ~4-5 minutes on a modern machine

1.2 Copy Distribution

# Copy the standard distribution (without Hive)
cp kafka-connect/kafka-connect-runtime/build/distributions/iceberg-kafka-connect-runtime-1.11.0-SNAPSHOT.zip ~/iceberg-kafka-connect/

# Verify
cd ~/iceberg-kafka-connect
ls -lh *.zip
# -rw-r--r--  1 user  staff   165M Mar 20 17:45 iceberg-kafka-connect-runtime-1.11.0-SNAPSHOT.zip

1.3 Upload to S3

# Create S3 bucket for plugins (use your account ID)
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
aws s3 mb s3://cdc-msk-connect-plugins-${ACCOUNT_ID} --region us-east-1

# Upload plugin to S3
cd ~/iceberg-kafka-connect
aws s3 cp iceberg-kafka-connect-runtime-1.11.0-SNAPSHOT.zip \
  s3://cdc-msk-connect-plugins-${ACCOUNT_ID}/ \
  --region us-east-1

# Verify upload
aws s3 ls s3://cdc-msk-connect-plugins-${ACCOUNT_ID}/

Upload Time: ~15 seconds for 165 MB

1.4 Create Custom Plugin in MSK Connect

aws kafkaconnect create-custom-plugin \
  --name iceberg-kafka-connect \
  --content-type ZIP \
  --location '{"s3Location":{"bucketArn":"arn:aws:s3:::cdc-msk-connect-plugins-'${ACCOUNT_ID}'","fileKey":"iceberg-kafka-connect-runtime-1.11.0-SNAPSHOT.zip"}}' \
  --region us-east-1 \
  --query 'customPluginArn' \
  --output text

Output:

arn:aws:kafkaconnect:us-east-1:123456789012:custom-plugin/iceberg-kafka-connect/94f98f8a-4d71-4e3c-ae59-62f72ef7f2dd-2

Wait for plugin to be ACTIVE (takes ~2-3 minutes):

aws kafkaconnect describe-custom-plugin \
  --custom-plugin-arn <plugin-arn> \
  --region us-east-1 \
  --query 'customPluginState'

Step 2: Prepare AWS Infrastructure

2.1 Create S3 Bucket for Iceberg Data

# Create S3 bucket for Iceberg data
aws s3 mb s3://cdc-iceberg-data-${ACCOUNT_ID} --region us-east-1

2.2 Create Glue Database

aws glue create-database \
  --database-input '{
    "Name": "cdc_iceberg",
    "Description": "CDC data lake with Apache Iceberg tables",
    "LocationUri": "s3://cdc-iceberg-data-'${ACCOUNT_ID}'/"
  }' \
  --region us-east-1

Note: The Iceberg table will be auto-created by the connector on first write.

Step 3: Configure IAM Permissions

3.1 Create Trust Policy

cat > msk-connect-trust-policy.json <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
EOF

3.2 Create IAM Role

aws iam create-role \
  --role-name MSKConnectIcebergRole \
  --assume-role-policy-document file://msk-connect-trust-policy.json \
  --region us-east-1

3.3 Create and Attach IAM Policy

cat > msk-connect-policy.json <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::cdc-iceberg-data-${ACCOUNT_ID}",
        "arn:aws:s3:::cdc-iceberg-data-${ACCOUNT_ID}/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "glue:GetDatabase",
        "glue:GetTable",
        "glue:CreateTable",
        "glue:UpdateTable",
        "glue:DeleteTable",
        "glue:GetPartition",
        "glue:GetPartitions",
        "glue:CreatePartition",
        "glue:UpdatePartition",
        "glue:DeletePartition"
      ],
      "Resource": [
        "arn:aws:glue:us-east-1:${ACCOUNT_ID}:catalog",
        "arn:aws:glue:us-east-1:${ACCOUNT_ID}:database/cdc_iceberg",
        "arn:aws:glue:us-east-1:${ACCOUNT_ID}:table/cdc_iceberg/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka:DescribeCluster",
        "kafka:GetBootstrapBrokers"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:us-east-1:${ACCOUNT_ID}:log-group:/aws/mskconnect/*"
    }
  ]
}
EOF

# Attach policy to role
aws iam put-role-policy \
  --role-name MSKConnectIcebergRole \
  --policy-name MSKConnectIcebergPolicy \
  --policy-document file://msk-connect-policy.json \
  --region us-east-1

Step 4: Create Iceberg Sink Connector

4.1 Connector Configuration

Create iceberg-connector-config.json:

{
  "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
  "tasks.max": "2",
  "topics": "cdc.public.orders",
  "iceberg.tables": "cdc_iceberg.orders",
  "iceberg.tables.dynamic-enabled": "false",
  "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
  "iceberg.catalog.warehouse": "s3://cdc-iceberg-data-123456789012/",
  "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
  "iceberg.catalog.glue.skip-name-validation": "true",
  "iceberg.catalog.client.region": "us-east-1",
  "iceberg.table.auto-create-enabled": "true",
  "iceberg.table.write-format": "parquet",
  "iceberg.control.commit.interval-ms": "300000",
  "iceberg.control.commit.timeout-ms": "30000",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter.schemas.enable": "false"
}

Key Parameters Explained:

| Parameter | Value | Description |
|---|---|---|
| topics | cdc.public.orders | Kafka topic to consume |
| iceberg.tables | cdc_iceberg.orders | Target Iceberg table (database.table) |
| iceberg.catalog.catalog-impl | GlueCatalog | Use AWS Glue for metadata |
| iceberg.catalog.warehouse | S3 path | Root location for Iceberg data |
| iceberg.catalog.client.region | us-east-1 | REQUIRED: AWS region for SDK |
| iceberg.table.auto-create-enabled | true | Auto-create table on first write |
| iceberg.table.write-format | parquet | File format (parquet/orc/avro) |
| iceberg.control.commit.interval-ms | 300000 | Commit every 5 minutes |
| iceberg.control.commit.timeout-ms | 30000 | Commit timeout: 30 seconds |

⚠️ CRITICAL: The iceberg.catalog.client.region parameter is required for MSK Connect deployments. Without it, the connector will fail with "Unable to load region" error.

Step 5: Deploy MSK Connect Connector

5.1 Get VPC Configuration

# Get VPC ID from existing bastion instance
VPC_ID=$(aws ec2 describe-instances \
  --instance-ids i-01054a7000e6dbdbc \
  --query 'Reservations[0].Instances[0].VpcId' \
  --output text \
  --region us-east-1)

# Get private subnets
SUBNETS=$(aws ec2 describe-subnets \
  --filters "Name=vpc-id,Values=${VPC_ID}" "Name=map-public-ip-on-launch,Values=false" \
  --query 'Subnets[].SubnetId' \
  --output text \
  --region us-east-1)

# Get MSK security group
SG_ID=$(aws ec2 describe-security-groups \
  --filters "Name=vpc-id,Values=${VPC_ID}" "Name=group-name,Values=*msk*" \
  --query 'SecurityGroups[0].GroupId' \
  --output text \
  --region us-east-1)

5.2 Create Connector

aws kafkaconnect create-connector \
  --connector-name iceberg-sink-orders \
  --kafka-connect-version "3.7.x" \
  --plugins '[{"customPlugin":{"customPluginArn":"arn:aws:kafkaconnect:us-east-1:123456789012:custom-plugin/iceberg-kafka-connect/94f98f8a-4d71-4e3c-ae59-62f72ef7f2dd-2","revision":1}}]' \
  --capacity '{"provisionedCapacity":{"mcuCount":2,"workerCount":2}}' \
  --connector-configuration file://iceberg-connector-config.json \
  --kafka-cluster '{"apacheKafkaCluster":{"bootstrapServers":"b-1.cdcmskcluster.o8mq7j.c7.kafka.us-east-1.amazonaws.com:9092,b-2.cdcmskcluster.o8mq7j.c7.kafka.us-east-1.amazonaws.com:9092","vpc":{"subnets":["subnet-123456789012","subnet-0f61d1cd95ad38d3b"],"securityGroups":["sg-08025899feecdc7af"]}}}' \
  --kafka-cluster-client-authentication '{"authenticationType":"NONE"}' \
  --kafka-cluster-encryption-in-transit '{"encryptionType":"PLAINTEXT"}' \
  --service-execution-role-arn "arn:aws:iam::123456789012:role/MSKConnectIcebergRole" \
  --region us-east-1 \
  --query 'connectorArn' \
  --output text

Output:

arn:aws:kafkaconnect:us-east-1:123456789012:connector/iceberg-sink-orders/e1b1f5be-2b19-434d-982b-7f7696bdae3e-2

Creation Time: 5-10 minutes

5.3 Monitor Connector Status

# Check connector state
aws kafkaconnect describe-connector \
  --connector-arn arn:aws:kafkaconnect:us-east-1:123456789012:connector/iceberg-sink-orders/e1b1f5be-2b19-434d-982b-7f7696bdae3e-2 \
  --region us-east-1 \
  --query '{State:connectorState,Workers:capacity.provisionedCapacity}'

# Expected: CREATING → RUNNING (2-5 minutes)


Expected States:

  • CREATING → RUNNING (5-10 minutes)
  • If FAILED, check CloudWatch Logs

Step 6: Test Data Flow

6.1 Insert Test Data

# Insert test data via bastion
aws ssm send-command \
  --document-name "AWS-RunShellScript" \
  --instance-ids "i-01054a7000e6dbdbc" \
  --parameters 'commands=[
    "PGPASSWORD=Welcome2026!! psql -h cdc-rds-postgres.cac6zqfrfzzz.us-east-1.rds.amazonaws.com -U postgres -d testdb -c \"INSERT INTO orders (customer_id, order_date, total_amount, status) VALUES (1, CURRENT_DATE, 99.99, '\''pending'\''), (2, CURRENT_DATE, 149.99, '\''completed'\''), (3, CURRENT_DATE, 199.99, '\''pending'\'');\""
  ]' \
  --region us-east-1

Result: 3 rows inserted

6.2 Test UPDATE and DELETE Operations

UPDATE Test:

# Update an order
aws ssm send-command \
  --document-name "AWS-RunShellScript" \
  --instance-ids "i-01054a7000e6dbdbc" \
  --parameters 'commands=[
    "PGPASSWORD=Welcome2026!! psql -h cdc-rds-postgres.cac6zqfrfzzz.us-east-1.rds.amazonaws.com -U postgres -d testdb -c \"UPDATE orders SET total_amount = 250.00, status = '\''completed'\'' WHERE customer_id = 1 RETURNING order_id, total_amount, status;\""
  ]' \
  --region us-east-1

DELETE Test:

# Delete an order
aws ssm send-command \
  --document-name "AWS-RunShellScript" \
  --instance-ids "i-01054a7000e6dbdbc" \
  --parameters 'commands=[
    "PGPASSWORD=Welcome2026!! psql -h cdc-rds-postgres.cac6zqfrfzzz.us-east-1.rds.amazonaws.com -U postgres -d testdb -c \"DELETE FROM orders WHERE customer_id = 3 RETURNING order_id;\""
  ]' \
  --region us-east-1

Expected CDC Events:

  • INSERT: op: "c" with after data
  • UPDATE: op: "u" with before and after data
  • DELETE: op: "d" with before data only

Iceberg Connector Behavior:

  • Default: Appends all events (INSERT, UPDATE, DELETE) as separate rows
  • With upsert mode: Maintains latest state per primary key
  • Soft deletes: DELETE events can be filtered or kept for audit trail

⚠️ Important: By default, Iceberg Kafka Connect in append mode will create multiple rows for the same order_id (one for each operation). To maintain current state only, you need to:

  1. Enable upsert mode with iceberg.table.upsert-mode-enabled: true
  2. Configure primary key: iceberg.table.primary-key: order_id
  3. Or use Iceberg's merge-on-read with compaction
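
To see why append mode multiplies rows, here is a hypothetical in-memory simulation in Python of the two modes applied to the INSERT → UPDATE → DELETE sequence above (illustrative only; the real connector handles this internally):

```python
def apply_events(events, upsert: bool):
    """Apply CDC events to a simulated table. In append mode every event
    becomes a row; in upsert mode one current row is kept per primary key."""
    if not upsert:
        return list(events)  # every event is appended as a separate row
    state = {}
    for e in events:
        key = (e.get("after") or e["before"])["order_id"]
        if e["op"] == "d":
            state.pop(key, None)      # DELETE removes the current row
        else:
            state[key] = e["after"]   # INSERT/UPDATE keep the latest image
    return list(state.values())

events = [
    {"op": "c", "before": None, "after": {"order_id": 1001, "total_amount": 100}},
    {"op": "u", "before": {"order_id": 1001, "total_amount": 100},
                "after": {"order_id": 1001, "total_amount": 150}},
    {"op": "d", "before": {"order_id": 1001, "total_amount": 150}, "after": None},
]
print(len(apply_events(events, upsert=False)))  # 3 rows: full event history
print(len(apply_events(events, upsert=True)))   # 0 rows: record was deleted
```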

6.3 Wait for Commit Interval

The connector commits data every 5 minutes (300,000 ms). After inserting data:

  • Wait 5-6 minutes for first commit
  • Data will appear in S3 and Glue catalog

6.4 Check Glue Table Created

# Check if table exists
aws glue get-table \
  --database-name cdc_iceberg \
  --name orders \
  --region us-east-1 \
  --query 'Table.{Name:Name,Location:StorageDescriptor.Location,Format:Parameters.table_type}'

Expected Output:

{
    "Name": "orders",
    "Location": "s3://cdc-iceberg-data-519199681203/orders",
    "Format": "ICEBERG"
}

6.5 Query Data via Athena

-- Check data arrived
SELECT * FROM cdc_iceberg.orders
ORDER BY order_id DESC
LIMIT 10;

-- Count records
SELECT COUNT(*) as total_orders FROM cdc_iceberg.orders;

-- Check by status
SELECT status, COUNT(*) as count
FROM cdc_iceberg.orders
GROUP BY status;

6.6 Verify S3 Data Files

# List Iceberg data files
aws s3 ls s3://cdc-iceberg-data-519199681203/orders/data/ --recursive

# Check metadata files
aws s3 ls s3://cdc-iceberg-data-519199681203/orders/metadata/ --recursive

Expected Structure:

orders/
├── data/
│   └── 00000-0-<uuid>.parquet
└── metadata/
    ├── v1.metadata.json
    ├── snap-<id>-1-<uuid>.avro
    └── <uuid>.metadata.json

Step 7: Monitor and Troubleshoot

7.1 CloudWatch Logs

# Tail logs in real-time
aws logs tail /aws/mskconnect/iceberg-sink-orders \
  --since 5m \
  --follow \
  --region us-east-1

# Search for errors
aws logs filter-log-events \
  --log-group-name /aws/mskconnect/iceberg-sink-orders \
  --filter-pattern "ERROR" \
  --start-time $(($(date +%s) - 3600))000 \
  --region us-east-1

7.2 Common Issues (Real Implementation Experience)

| Issue | Cause | Solution | Status |
|---|---|---|---|
| Region not found | Missing AWS region config | Add "iceberg.catalog.client.region": "us-east-1" | ✅ FIXED |
| Kafka producer timeout | Security group missing self-reference | Add ingress rule allowing SG to itself on ports 9092-9098 | ✅ FIXED |
| Connector stuck in CREATING | IAM permissions | Check S3, Glue, MSK permissions | - |
| No data in Iceberg | Commit interval | Wait 5+ minutes after insert | - |
| Table not created | Auto-create disabled | Enable iceberg.table.auto-create-enabled | - |
| Permission denied | IAM role | Verify role has S3/Glue access | - |

Issue 1: AWS SDK Region Error (CRITICAL)

Error Log:

software.amazon.awssdk.core.exception.SdkClientException: Unable to load region from any of the providers in the chain

Root Cause: The Iceberg connector's AWS SDK cannot automatically detect the region from MSK Connect workers.

Solution: Add explicit region configuration to connector config:

{
  "iceberg.catalog.client.region": "us-east-1"
}

Why This Happens: MSK Connect workers don't have EC2 metadata service access, and the connector doesn't inherit region from the MSK Connect service itself.

Issue 2: Kafka Producer Timeout (CRITICAL)

Error Log:

org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000ms while awaiting InitProducerId

Root Cause: MSK Connect workers need to produce to internal Kafka topics (offsets, status, config) but the security group only allowed ingress from VPC CIDR, not from the security group itself.

Solution: Add self-referencing security group rule:

aws ec2 authorize-security-group-ingress \
  --group-id sg-08025899feecdc7af \
  --protocol tcp \
  --port 9092-9098 \
  --source-group sg-08025899feecdc7af \
  --region us-east-1

Why This Happens: MSK Connect workers are in the same security group and need to communicate with each other and with MSK brokers for internal coordination.

7.3 Monitor Connector Health

# Get connector status
aws kafkaconnect describe-connector \
  --connector-arn arn:aws:kafkaconnect:us-east-1:519199681203:connector/iceberg-sink-orders/e1b1f5be-2b19-434d-982b-7f7696bdae3e-2 \
  --region us-east-1 \
  --query '{State:connectorState,Workers:capacity.provisionedCapacity,CreatedAt:creationTime}'

Advanced Features

Upsert Mode for CDC (Maintaining Current State)

By default, the Iceberg connector operates in append mode, creating a new row for every CDC event (INSERT, UPDATE, DELETE). For maintaining only the current state of each record, enable upsert mode:

{
  "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
  "iceberg.table.upsert-mode-enabled": "true",
  "iceberg.table.primary-key": "order_id",
  "iceberg.table.upsert.dedup-column": "ts_ms",
  ...
}

Configuration Parameters:

| Parameter | Value | Description |
|---|---|---|
| iceberg.table.upsert-mode-enabled | true | Enable upsert instead of append |
| iceberg.table.primary-key | order_id | Column(s) to use for deduplication |
| iceberg.table.upsert.dedup-column | ts_ms | Timestamp column for ordering events |

Behavior Comparison:

| Mode | INSERT | UPDATE | DELETE | Result |
|---|---|---|---|---|
| Append | New row | New row | New row | Multiple rows per order_id |
| Upsert | New row | Replaces row | Removes row | One row per order_id (current state) |

Example with Upsert Mode:

-- After: INSERT (order_id=1001, amount=100)
--       UPDATE (order_id=1001, amount=150)
--       DELETE (order_id=1001)

-- Append mode result:
SELECT * FROM orders WHERE order_id = 1001;
-- Returns 3 rows (INSERT, UPDATE, DELETE events)

-- Upsert mode result:
SELECT * FROM orders WHERE order_id = 1001;
-- Returns 0 rows (DELETE removed the record)

When to Use Each Mode:

  • Append Mode: Audit trail, historical analysis, event sourcing
  • Upsert Mode: Current state queries, dashboards, reporting

Time Travel Queries

Query historical data snapshots:

-- Query data as of specific timestamp
SELECT * FROM cdc_iceberg.orders
FOR SYSTEM_TIME AS OF TIMESTAMP '2026-03-20 10:00:00';

-- Query data as of specific snapshot
SELECT * FROM cdc_iceberg.orders
FOR SYSTEM_VERSION AS OF 1234567890;

-- Compare current vs historical
SELECT 
  current.order_id,
  current.status as current_status,
  historical.status as previous_status
FROM cdc_iceberg.orders current
JOIN cdc_iceberg.orders FOR SYSTEM_TIME AS OF TIMESTAMP '2026-03-19 00:00:00' historical
  ON current.order_id = historical.order_id
WHERE current.status != historical.status;

Schema Evolution

Add columns without rewriting data:

-- Add new column
ALTER TABLE cdc_iceberg.orders 
ADD COLUMNS (shipping_address STRING);

-- Rename column
ALTER TABLE cdc_iceberg.orders 
RENAME COLUMN total_amount TO order_total;

-- Drop column
ALTER TABLE cdc_iceberg.orders 
DROP COLUMN old_column;

Partition Evolution

Change partitioning without rewriting data:

-- Change from daily to monthly partitions
ALTER TABLE cdc_iceberg.orders 
SET TBLPROPERTIES (
  'write.partition-spec' = 'month(order_date)'
);

Incremental Reads

Read only new data since last query:

-- Get latest snapshot ID
SELECT snapshot_id FROM cdc_iceberg.orders$snapshots
ORDER BY committed_at DESC LIMIT 1;

-- Read incremental changes
SELECT * FROM cdc_iceberg.orders
FOR SYSTEM_VERSION AS OF <previous_snapshot_id>
EXCEPT
SELECT * FROM cdc_iceberg.orders
FOR SYSTEM_VERSION AS OF <current_snapshot_id>;

Performance Optimization

1. Tune Commit Interval

{
  "iceberg.control.commit.interval-ms": "300000",  // 5 minutes (default)
  "iceberg.control.commit.timeout-ms": "30000"     // 30 seconds
}

Trade-offs:

  • Shorter interval: More frequent commits, smaller files, higher overhead
  • Longer interval: Fewer commits, larger files, higher latency
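
A rough back-of-envelope in Python shows how the commit interval drives small-file counts (the one-file-per-task-per-commit assumption is a simplification; real counts depend on data volume and partitioning):

```python
# Back-of-envelope: assumes each task writes at least one data file per
# commit per partition it touches. Actual counts vary with throughput.

def files_per_day(commit_interval_ms: int, tasks: int, partitions_touched: int) -> int:
    commits_per_day = (24 * 60 * 60 * 1000) // commit_interval_ms
    return commits_per_day * tasks * partitions_touched

# Default 5-minute interval, 2 tasks, 1 partition touched per commit:
print(files_per_day(300_000, tasks=2, partitions_touched=1))  # 576 files/day

# Cutting the interval to 1 minute multiplies the small-file count by 5:
print(files_per_day(60_000, tasks=2, partitions_touched=1))   # 2880 files/day
```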

2. Configure File Size

{
  "iceberg.table.write.target-file-size-bytes": "134217728"  // 128 MB
}

3. Enable Compaction

Iceberg compacts small files through table maintenance jobs and manifest merging. Configure thresholds:

ALTER TABLE cdc_iceberg.orders 
SET TBLPROPERTIES (
  'write.target-file-size-bytes' = '134217728',
  'commit.manifest.target-size-bytes' = '8388608',
  'commit.manifest-merge.enabled' = 'true'
);

4. Optimize Partitioning

-- Check partition statistics
SELECT 
  partition,
  record_count,
  file_count,
  total_data_file_size_in_bytes / 1024 / 1024 as size_mb
FROM cdc_iceberg.orders$partitions
ORDER BY partition DESC;

Troubleshooting

Issue 1: Connector Fails to Start

Symptoms: Connector state is FAILED

Check logs:

aws logs tail /aws/mskconnect/iceberg-sink-orders --since 10m

Common causes:

  1. IAM permissions missing
  2. S3 bucket doesn't exist
  3. Glue database doesn't exist
  4. Invalid configuration

Issue 2: No Data in Iceberg Table

Diagnosis:

# Check if connector is consuming
aws logs tail /aws/mskconnect/iceberg-sink-orders \
  --since 5m | grep "Committing"

# Check Kafka topic has data
kafka-console-consumer.sh \
  --bootstrap-server $BOOTSTRAP_SERVERS \
  --topic cdc.public.orders \
  --from-beginning \
  --max-messages 1

Solutions:

  • Verify topic name matches configuration
  • Check connector is in RUNNING state
  • Verify IAM permissions for S3 and Glue

Issue 3: Schema Mismatch

Symptoms: Connector logs show schema errors

Solution:

-- Drop and recreate table
DROP TABLE cdc_iceberg.orders;

-- Let connector auto-create with correct schema
-- Or manually create with matching schema

Cost Optimization

Estimated Costs (100 GB/month)

MSK Connect:

  • 2 MCU × 2 workers = 4 MCU
  • Cost: ~$0.11/hour × 4 = $0.44/hour
  • Monthly: ~$316

S3 Storage:

  • 100 GB × $0.023/GB = $2.30/month

Glue Catalog:

  • First 1M requests free
  • Minimal cost for metadata

Total: ~$320/month
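
The arithmetic behind the estimate, sketched in Python (the $0.11/MCU-hour and $0.023/GB-month figures are the ones used above, not a current quote; check AWS pricing for your region):

```python
# Reproduces the monthly estimate above, assuming ~720 billable hours/month.
MCU_HOURLY_USD = 0.11
mcus = 2 * 2                      # 2 MCU x 2 workers
hours_per_month = 720

msk_connect = MCU_HOURLY_USD * mcus * hours_per_month
s3_storage = 100 * 0.023          # 100 GB at $0.023/GB-month

print(round(msk_connect, 2))                # ~316.8  (MSK Connect)
print(round(msk_connect + s3_storage, 2))   # ~319.1  (total, ex-Glue)
```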

Cost Reduction Tips:

  1. Right-size workers: Start with 1 MCU × 1 worker
  2. Optimize commit interval: Reduce commits to save API calls
  3. Use S3 Intelligent-Tiering: For infrequently accessed data
  4. Enable compaction: Reduce small file overhead

Best Practices

  1. Partition Strategy: Choose based on query patterns
  2. Commit Interval: Balance latency vs overhead (5-10 minutes recommended)
  3. Monitoring: Set up CloudWatch alarms for connector state
  4. Schema Management: Use schema evolution instead of recreating tables
  5. Testing: Test in non-production first
  6. Backup: Enable S3 versioning for Iceberg metadata
  7. Compaction: Schedule regular compaction jobs

Conclusion

Using MSK Connect with Iceberg Kafka Connect provides a fully managed, scalable solution for building a real-time data lake. The combination of Debezium CDC, Kafka, and Iceberg delivers:

✅ Real-time data ingestion
✅ ACID transactions
✅ Schema evolution
✅ Time travel queries
✅ Efficient storage
✅ Fully managed on AWS

This architecture is production-ready and scales to handle high-throughput CDC workloads while maintaining data quality and query performance.

Next Steps

  • Implement customers table with Firehose (Blog Post #6)
  • Compare both approaches (Blog Post #7)
  • Set up monitoring and alerting
  • Implement data quality checks
  • Configure retention policies
