Introduction
After capturing database changes with Debezium, the next challenge is storing them in a queryable, scalable data lake. Apache Iceberg provides ACID transactions, schema evolution, and time travel capabilities—perfect for a modern data lake. This guide demonstrates how to stream CDC events from Kafka to S3 Iceberg tables using Amazon MSK Connect and the Iceberg Kafka Connect sink connector.
Architecture Overview
PostgreSQL (RDS/Aurora)
↓ Debezium CDC
Kafka Topics (MSK)
↓ Iceberg Sink Connector
S3 Iceberg Tables (Glue Catalog)
↓ Query
Athena / Spark / Trino
Why Apache Iceberg?
Apache Iceberg is a high-performance table format for huge analytic datasets that provides:
- ✅ ACID Transactions - Reliable writes and reads
- ✅ Schema Evolution - Add, drop, rename columns safely
- ✅ Time Travel - Query historical data snapshots
- ✅ Partition Evolution - Change partitioning without rewriting data
- ✅ Hidden Partitioning - Queries don't need to specify partition predicates
- ✅ Incremental Reads - Efficient change data capture
- ✅ Compaction - Automatic small file optimization
Prerequisites
Before starting, ensure you have:
- Debezium connector running on MSK Connect (see Blog Post #1)
- Kafka topics with CDC events (cdc.public.orders)
- S3 bucket for Iceberg data storage
- AWS Glue for metadata catalog
- IAM roles with appropriate permissions
Use Case: Orders Table
We'll implement the orders table CDC pipeline using the AWS-recommended approach: Amazon MSK Connect with the Apache Iceberg Kafka Connect sink connector.
Sample Data Flow
PostgreSQL Operations:
-- INSERT
INSERT INTO orders (customer_id, order_date, total_amount, status)
VALUES (501, '2026-03-20', 299.99, 'pending');
-- UPDATE
UPDATE orders SET total_amount = 350.00, status = 'completed'
WHERE order_id = 1001;
-- DELETE
DELETE FROM orders WHERE order_id = 1001;
Kafka CDC Events:
INSERT Event (op: "c"):
{
"before": null,
"after": {
"order_id": 1001,
"customer_id": 501,
"order_date": "2026-03-20T10:30:00Z",
"total_amount": 299.99,
"status": "pending"
},
"op": "c",
"ts_ms": 1773991800000
}
UPDATE Event (op: "u"):
{
"before": {
"order_id": 1001,
"customer_id": 501,
"order_date": "2026-03-20T10:30:00Z",
"total_amount": 299.99,
"status": "pending"
},
"after": {
"order_id": 1001,
"customer_id": 501,
"order_date": "2026-03-20T10:30:00Z",
"total_amount": 350.00,
"status": "completed"
},
"op": "u",
"ts_ms": 1773991900000
}
DELETE Event (op: "d"):
{
"before": {
"order_id": 1001,
"customer_id": 501,
"order_date": "2026-03-20T10:30:00Z",
"total_amount": 350.00,
"status": "completed"
},
"after": null,
"op": "d",
"ts_ms": 1773992000000
}
Iceberg Table Behavior:
- INSERT: New row added to Iceberg table
- UPDATE: Iceberg performs upsert based on primary key (if configured) or appends new version
- DELETE: Row marked as deleted (soft delete) or physically removed based on configuration
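The op-code mapping above can be sketched as a small Python helper (illustrative only; the sink connector performs this interpretation internally):

```python
import json

def interpret_cdc_event(raw):
    """Map a Debezium envelope to (action, row): the row is the after-image
    for inserts/updates and the before-image for deletes (after is null on "d")."""
    event = json.loads(raw)
    op = event["op"]
    if op in ("c", "r"):              # create, or snapshot read
        return "insert", event["after"]
    if op == "u":
        return "update", event["after"]
    if op == "d":
        return "delete", event["before"]
    raise ValueError(f"unknown op: {op}")

insert_event = '{"before": null, "after": {"order_id": 1001, "status": "pending"}, "op": "c", "ts_ms": 1773991800000}'
print(interpret_cdc_event(insert_event))  # ('insert', {'order_id': 1001, 'status': 'pending'})
```

Note how a delete carries only the before-image, which is why downstream upsert logic must key off it.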
Step 1: Build Iceberg Kafka Connect Plugin
1.1 Build from Apache Iceberg Source
The Iceberg Kafka Connect connector is not available as a pre-built JAR in Maven Central. You need to build it from the official Apache Iceberg source:
# Create working directory
mkdir -p ~/iceberg-kafka-connect
cd ~/iceberg-kafka-connect
# Clone Apache Iceberg repository (version 1.10.1)
git clone --depth 1 --branch apache-iceberg-1.10.1 https://github.com/apache/iceberg.git
# Build the project (skip tests for faster build)
cd iceberg
./gradlew -x test -x integrationTest clean build
# Distribution ZIP will be created at:
# kafka-connect/kafka-connect-runtime/build/distributions/
Build Output:
- Standard: iceberg-kafka-connect-runtime-1.11.0-SNAPSHOT.zip (~165 MB)
- With Hive: iceberg-kafka-connect-runtime-hive-1.11.0-SNAPSHOT.zip (~378 MB)
Build Time: ~4-5 minutes on a modern machine
1.2 Copy Distribution
# Copy the standard distribution (without Hive)
cp kafka-connect/kafka-connect-runtime/build/distributions/iceberg-kafka-connect-runtime-1.11.0-SNAPSHOT.zip ~/iceberg-kafka-connect/
# Verify
cd ~/iceberg-kafka-connect
ls -lh *.zip
# -rw-r--r-- 1 user staff 165M Mar 20 17:45 iceberg-kafka-connect-runtime-1.11.0-SNAPSHOT.zip
1.3 Upload to S3
# Create S3 bucket for plugins (use your account ID)
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
aws s3 mb s3://cdc-msk-connect-plugins-${ACCOUNT_ID} --region us-east-1
# Upload plugin to S3
cd ~/iceberg-kafka-connect
aws s3 cp iceberg-kafka-connect-runtime-1.11.0-SNAPSHOT.zip \
s3://cdc-msk-connect-plugins-${ACCOUNT_ID}/ \
--region us-east-1
# Verify upload
aws s3 ls s3://cdc-msk-connect-plugins-${ACCOUNT_ID}/
Upload Time: ~15 seconds for 165 MB
1.4 Create Custom Plugin in MSK Connect
aws kafkaconnect create-custom-plugin \
--name iceberg-kafka-connect \
--content-type ZIP \
--location '{"s3Location":{"bucketArn":"arn:aws:s3:::cdc-msk-connect-plugins-'${ACCOUNT_ID}'","fileKey":"iceberg-kafka-connect-runtime-1.11.0-SNAPSHOT.zip"}}' \
--region us-east-1 \
--query 'customPluginArn' \
--output text
Output:
arn:aws:kafkaconnect:us-east-1:123456789012:custom-plugin/iceberg-kafka-connect/94f98f8a-4d71-4e3c-ae59-62f72ef7f2dd-2
Wait for the plugin to become ACTIVE (takes ~2-3 minutes):
aws kafkaconnect describe-custom-plugin \
--custom-plugin-arn <plugin-arn> \
--region us-east-1 \
--query 'customPluginState'
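Instead of rerunning the CLI by hand, the wait can be scripted. Below is a minimal polling sketch; the commented boto3 wiring is an assumption about your environment (boto3 installed, credentials configured, PLUGIN_ARN set):

```python
import time

def wait_until_active(fetch_state, timeout_s=300, poll_s=15):
    """Poll fetch_state() until it returns 'ACTIVE'; fail fast on CREATE_FAILED."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        state = fetch_state()
        if state == "ACTIVE":
            return state
        if state == "CREATE_FAILED":
            raise RuntimeError("custom plugin creation failed")
        time.sleep(poll_s)
    raise TimeoutError("plugin did not become ACTIVE in time")

# With boto3, fetch_state would be:
#   client = boto3.client("kafkaconnect", region_name="us-east-1")
#   fetch_state = lambda: client.describe_custom_plugin(
#       customPluginArn=PLUGIN_ARN)["customPluginState"]
```

The same helper works later for the connector itself by swapping in describe_connector.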
Step 2: Prepare AWS Infrastructure
2.1 Create S3 Bucket for Iceberg Data
# Create S3 bucket for Iceberg data
aws s3 mb s3://cdc-iceberg-data-${ACCOUNT_ID} --region us-east-1
2.2 Create Glue Database
aws glue create-database \
--database-input '{
"Name": "cdc_iceberg",
"Description": "CDC data lake with Apache Iceberg tables",
"LocationUri": "s3://cdc-iceberg-data-'${ACCOUNT_ID}'/"
}' \
--region us-east-1
Note: The Iceberg table will be auto-created by the connector on first write.
Step 3: Configure IAM Permissions
3.1 Create Trust Policy
cat > msk-connect-trust-policy.json <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "kafkaconnect.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
EOF
3.2 Create IAM Role
aws iam create-role \
--role-name MSKConnectIcebergRole \
--assume-role-policy-document file://msk-connect-trust-policy.json \
--region us-east-1
3.3 Create and Attach IAM Policy
cat > msk-connect-policy.json <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::cdc-iceberg-data-${ACCOUNT_ID}",
"arn:aws:s3:::cdc-iceberg-data-${ACCOUNT_ID}/*"
]
},
{
"Effect": "Allow",
"Action": [
"glue:GetDatabase",
"glue:GetTable",
"glue:CreateTable",
"glue:UpdateTable",
"glue:DeleteTable",
"glue:GetPartition",
"glue:GetPartitions",
"glue:CreatePartition",
"glue:UpdatePartition",
"glue:DeletePartition"
],
"Resource": [
"arn:aws:glue:us-east-1:${ACCOUNT_ID}:catalog",
"arn:aws:glue:us-east-1:${ACCOUNT_ID}:database/cdc_iceberg",
"arn:aws:glue:us-east-1:${ACCOUNT_ID}:table/cdc_iceberg/*"
]
},
{
"Effect": "Allow",
"Action": [
"kafka:DescribeCluster",
"kafka:GetBootstrapBrokers"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:us-east-1:${ACCOUNT_ID}:log-group:/aws/mskconnect/*"
}
]
}
EOF
# Attach policy to role
aws iam put-role-policy \
--role-name MSKConnectIcebergRole \
--policy-name MSKConnectIcebergPolicy \
--policy-document file://msk-connect-policy.json \
--region us-east-1
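Since the heredoc relies on shell expansion of ${ACCOUNT_ID}, a quick sanity check of the rendered policy can catch an unexpanded placeholder before you attach it. This is a hypothetical helper, not part of the AWS tooling:

```python
def check_policy(policy, account_id):
    """Return a list of problems found in the rendered MSK Connect IAM policy."""
    resources = []
    for stmt in policy["Statement"]:
        res = stmt["Resource"]
        resources.extend([res] if isinstance(res, str) else res)
    problems = []
    if f"arn:aws:s3:::cdc-iceberg-data-{account_id}/*" not in resources:
        problems.append("missing S3 object-level resource")
    if f"arn:aws:glue:us-east-1:{account_id}:database/cdc_iceberg" not in resources:
        problems.append("missing Glue database resource")
    if any("${ACCOUNT_ID}" in r for r in resources):
        problems.append("heredoc placeholder was not expanded")
    return problems

# In practice: policy = json.load(open("msk-connect-policy.json")); trimmed example here:
policy = {"Statement": [{
    "Effect": "Allow",
    "Action": ["s3:GetObject", "s3:PutObject"],
    "Resource": ["arn:aws:s3:::cdc-iceberg-data-123456789012",
                 "arn:aws:s3:::cdc-iceberg-data-123456789012/*",
                 "arn:aws:glue:us-east-1:123456789012:database/cdc_iceberg"],
}]}
print(check_policy(policy, "123456789012"))  # []
```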
Step 4: Create Iceberg Sink Connector
4.1 Connector Configuration
Create iceberg-connector-config.json:
{
"connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
"tasks.max": "2",
"topics": "cdc.public.orders",
"iceberg.tables": "cdc_iceberg.orders",
"iceberg.tables.dynamic-enabled": "false",
"iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
"iceberg.catalog.warehouse": "s3://cdc-iceberg-data-123456789012/",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
"iceberg.catalog.glue.skip-name-validation": "true",
"iceberg.catalog.client.region": "us-east-1",
"iceberg.table.auto-create-enabled": "true",
"iceberg.table.write-format": "parquet",
"iceberg.control.commit.interval-ms": "300000",
"iceberg.control.commit.timeout-ms": "30000",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false"
}
Key Parameters Explained:
| Parameter | Value | Description |
|---|---|---|
| topics | cdc.public.orders | Kafka topic to consume |
| iceberg.tables | cdc_iceberg.orders | Target Iceberg table (database.table) |
| iceberg.catalog.catalog-impl | GlueCatalog | Use AWS Glue for metadata |
| iceberg.catalog.warehouse | S3 path | Root location for Iceberg data |
| iceberg.catalog.client.region | us-east-1 | REQUIRED: AWS region for SDK |
| iceberg.table.auto-create-enabled | true | Auto-create table on first write |
| iceberg.table.write-format | parquet | File format (parquet/orc/avro) |
| iceberg.control.commit.interval-ms | 300000 | Commit every 5 minutes |
| iceberg.control.commit.timeout-ms | 30000 | Commit timeout: 30 seconds |
⚠️ CRITICAL: The iceberg.catalog.client.region parameter is required for MSK Connect deployments. Without it, the connector will fail with "Unable to load region" error.
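Because a missing region only surfaces at deploy time, a pre-flight check of the config file is cheap insurance. The helper below is a sketch; the required-key list reflects this guide's configuration, not an official schema:

```python
REQUIRED_KEYS = [
    "connector.class",
    "topics",
    "iceberg.tables",
    "iceberg.catalog.warehouse",
    # Easy to forget, and required on MSK Connect (no region auto-detection):
    "iceberg.catalog.client.region",
]

def missing_keys(config):
    """Return required connector keys that are absent or empty."""
    return [k for k in REQUIRED_KEYS if not config.get(k)]

# In practice, json.load iceberg-connector-config.json; trimmed example here:
config = {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "topics": "cdc.public.orders",
    "iceberg.tables": "cdc_iceberg.orders",
    "iceberg.catalog.warehouse": "s3://cdc-iceberg-data-123456789012/",
}
print(missing_keys(config))  # ['iceberg.catalog.client.region']
```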
Step 5: Deploy MSK Connect Connector
5.1 Get VPC Configuration
# Get VPC ID from existing bastion instance
VPC_ID=$(aws ec2 describe-instances \
--instance-ids i-01054a7000e6dbdbc \
--query 'Reservations[0].Instances[0].VpcId' \
--output text \
--region us-east-1)
# Get private subnets
SUBNETS=$(aws ec2 describe-subnets \
--filters "Name=vpc-id,Values=${VPC_ID}" "Name=map-public-ip-on-launch,Values=false" \
--query 'Subnets[].SubnetId' \
--output text \
--region us-east-1)
# Get MSK security group
SG_ID=$(aws ec2 describe-security-groups \
--filters "Name=vpc-id,Values=${VPC_ID}" "Name=group-name,Values=*msk*" \
--query 'SecurityGroups[0].GroupId' \
--output text \
--region us-east-1)
5.2 Create Connector
aws kafkaconnect create-connector \
--connector-name iceberg-sink-orders \
--kafka-connect-version "3.7.x" \
--plugins '[{"customPlugin":{"customPluginArn":"arn:aws:kafkaconnect:us-east-1:123456789012:custom-plugin/iceberg-kafka-connect/94f98f8a-4d71-4e3c-ae59-62f72ef7f2dd-2","revision":1}}]' \
--capacity '{"provisionedCapacity":{"mcuCount":2,"workerCount":2}}' \
--connector-configuration file://iceberg-connector-config.json \
--kafka-cluster '{"apacheKafkaCluster":{"bootstrapServers":"b-1.cdcmskcluster.o8mq7j.c7.kafka.us-east-1.amazonaws.com:9092,b-2.cdcmskcluster.o8mq7j.c7.kafka.us-east-1.amazonaws.com:9092","vpc":{"subnets":["subnet-123456789012","subnet-0f61d1cd95ad38d3b"],"securityGroups":["sg-08025899feecdc7af"]}}}' \
--kafka-cluster-client-authentication '{"authenticationType":"NONE"}' \
--kafka-cluster-encryption-in-transit '{"encryptionType":"PLAINTEXT"}' \
--service-execution-role-arn "arn:aws:iam::123456789012:role/MSKConnectIcebergRole" \
--region us-east-1 \
--query 'connectorArn' \
--output text
Output:
arn:aws:kafkaconnect:us-east-1:123456789012:connector/iceberg-sink-orders/e1b1f5be-2b19-434d-982b-7f7696bdae3e-2
Creation Time: 5-10 minutes
5.3 Monitor Connector Status
# Check connector state
aws kafkaconnect describe-connector \
--connector-arn arn:aws:kafkaconnect:us-east-1:123456789012:connector/iceberg-sink-orders/e1b1f5be-2b19-434d-982b-7f7696bdae3e-2 \
--region us-east-1 \
--query '{State:connectorState,Workers:capacity.provisionedCapacity}'
# Expected: CREATING → RUNNING (2-5 minutes)
Expected States:
- CREATING → RUNNING (5-10 minutes)
- If FAILED, check CloudWatch Logs
Step 6: Test Data Flow
6.1 Insert Test Data
# Insert test data via bastion
aws ssm send-command \
--document-name "AWS-RunShellScript" \
--instance-ids "i-01054a7000e6dbdbc" \
--parameters 'commands=[
"PGPASSWORD=Welcome2026!! psql -h cdc-rds-postgres.cac6zqfrfzzz.us-east-1.rds.amazonaws.com -U postgres -d testdb -c \"INSERT INTO orders (customer_id, order_date, total_amount, status) VALUES (1, CURRENT_DATE, 99.99, '\''pending'\''), (2, CURRENT_DATE, 149.99, '\''completed'\''), (3, CURRENT_DATE, 199.99, '\''pending'\'');\""
]' \
--region us-east-1
Result: 3 rows inserted
6.2 Test UPDATE and DELETE Operations
UPDATE Test:
# Update an order
aws ssm send-command \
--document-name "AWS-RunShellScript" \
--instance-ids "i-01054a7000e6dbdbc" \
--parameters 'commands=[
"PGPASSWORD=Welcome2026!! psql -h cdc-rds-postgres.cac6zqfrfzzz.us-east-1.rds.amazonaws.com -U postgres -d testdb -c \"UPDATE orders SET total_amount = 250.00, status = '\''completed'\'' WHERE customer_id = 1 RETURNING order_id, total_amount, status;\""
]' \
--region us-east-1
DELETE Test:
# Delete an order
aws ssm send-command \
--document-name "AWS-RunShellScript" \
--instance-ids "i-01054a7000e6dbdbc" \
--parameters 'commands=[
"PGPASSWORD=Welcome2026!! psql -h cdc-rds-postgres.cac6zqfrfzzz.us-east-1.rds.amazonaws.com -U postgres -d testdb -c \"DELETE FROM orders WHERE customer_id = 3 RETURNING order_id;\""
]' \
--region us-east-1
Expected CDC Events:
- INSERT: op: "c" with after data
- UPDATE: op: "u" with before and after data
- DELETE: op: "d" with before data only
Iceberg Connector Behavior:
- Default: Appends all events (INSERT, UPDATE, DELETE) as separate rows
- With upsert mode: Maintains latest state per primary key
- Soft deletes: DELETE events can be filtered or kept for audit trail
⚠️ Important: By default, Iceberg Kafka Connect in append mode will create multiple rows for the same order_id (one for each operation). To maintain current state only, you need to:
- Enable upsert mode with iceberg.table.upsert-mode-enabled: true
- Configure primary key: iceberg.table.primary-key: order_id
- Or use Iceberg's merge-on-read with compaction
6.3 Wait for Commit Interval
The connector commits data every 5 minutes (300,000 ms). After inserting data:
- Wait 5-6 minutes for first commit
- Data will appear in S3 and Glue catalog
6.4 Check Glue Table Created
# Check if table exists
aws glue get-table \
--database-name cdc_iceberg \
--name orders \
--region us-east-1 \
--query 'Table.{Name:Name,Location:StorageDescriptor.Location,Format:Parameters.table_type}'
Expected Output:
{
"Name": "orders",
"Location": "s3://cdc-iceberg-data-123456789012/orders",
"Format": "ICEBERG"
}
6.5 Query Data via Athena
-- Check data arrived
SELECT * FROM cdc_iceberg.orders
ORDER BY order_id DESC
LIMIT 10;
-- Count records
SELECT COUNT(*) as total_orders FROM cdc_iceberg.orders;
-- Check by status
SELECT status, COUNT(*) as count
FROM cdc_iceberg.orders
GROUP BY status;
6.6 Verify S3 Data Files
# List Iceberg data files
aws s3 ls s3://cdc-iceberg-data-${ACCOUNT_ID}/orders/data/ --recursive
# Check metadata files
aws s3 ls s3://cdc-iceberg-data-${ACCOUNT_ID}/orders/metadata/ --recursive
Expected Structure:
orders/
├── data/
│ └── 00000-0-<uuid>.parquet
└── metadata/
├── v1.metadata.json
├── snap-<id>-1-<uuid>.avro
└── <uuid>.metadata.json
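When the listing grows long, a quick summary of data versus metadata files helps confirm commits are landing. A small sketch (the key names below are illustrative, matching the structure above):

```python
from collections import Counter

def summarize_iceberg_keys(keys):
    """Bucket S3 keys under an Iceberg table location into data/metadata files."""
    counts = Counter()
    for key in keys:
        if "/data/" in key and key.endswith(".parquet"):
            counts["data"] += 1
        elif "/metadata/" in key:
            counts["metadata"] += 1
    return counts

# Keys as returned by `aws s3 ls --recursive` (illustrative names):
keys = [
    "orders/data/00000-0-abc.parquet",
    "orders/metadata/v1.metadata.json",
    "orders/metadata/snap-123-1-abc.avro",
]
print(dict(summarize_iceberg_keys(keys)))  # {'data': 1, 'metadata': 2}
```

A steadily growing data count with no new metadata would suggest commits are not completing.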
Step 7: Monitor and Troubleshoot
7.1 CloudWatch Logs
# Tail logs in real-time
aws logs tail /aws/mskconnect/iceberg-sink-orders \
--since 5m \
--follow \
--region us-east-1
# Search for errors
aws logs filter-log-events \
--log-group-name /aws/mskconnect/iceberg-sink-orders \
--filter-pattern "ERROR" \
--start-time $(($(date +%s) - 3600))000 \
--region us-east-1
7.2 Common Issues (Real Implementation Experience)
| Issue | Cause | Solution | Status |
|---|---|---|---|
| Region not found | Missing AWS region config | Add "iceberg.catalog.client.region": "us-east-1" | ✅ FIXED |
| Kafka producer timeout | Security group missing self-reference | Add ingress rule allowing SG to itself on ports 9092-9098 | ✅ FIXED |
| Connector stuck in CREATING | IAM permissions | Check S3, Glue, MSK permissions | - |
| No data in Iceberg | Commit interval | Wait 5+ minutes after insert | - |
| Table not created | Auto-create disabled | Enable iceberg.table.auto-create-enabled | - |
| Permission denied | IAM role | Verify role has S3/Glue access | - |
Issue 1: AWS SDK Region Error (CRITICAL)
Error Log:
software.amazon.awssdk.core.exception.SdkClientException: Unable to load region from any of the providers in the chain
Root Cause: The Iceberg connector's AWS SDK cannot automatically detect the region from MSK Connect workers.
Solution: Add explicit region configuration to connector config:
{
"iceberg.catalog.client.region": "us-east-1"
}
Why This Happens: MSK Connect workers don't have EC2 metadata service access, and the connector doesn't inherit region from the MSK Connect service itself.
Issue 2: Kafka Producer Timeout (CRITICAL)
Error Log:
org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000ms while awaiting InitProducerId
Root Cause: MSK Connect workers need to produce to internal Kafka topics (offsets, status, config) but the security group only allowed ingress from VPC CIDR, not from the security group itself.
Solution: Add self-referencing security group rule:
aws ec2 authorize-security-group-ingress \
--group-id sg-08025899feecdc7af \
--protocol tcp \
--port 9092-9098 \
--source-group sg-08025899feecdc7af \
--region us-east-1
Why This Happens: MSK Connect workers are in the same security group and need to communicate with each other and with MSK brokers for internal coordination.
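The shape of the EC2 DescribeSecurityGroups response makes this easy to verify programmatically. Below is a sketch of a self-reference check; feed it `SecurityGroups[0]` from a real describe call (the sample dict mirrors the rule added above):

```python
def has_self_reference(sg, from_port=9092, to_port=9098):
    """Check whether a security group (as returned by EC2 DescribeSecurityGroups)
    allows ingress from itself covering the given port range."""
    for perm in sg.get("IpPermissions", []):
        ports_ok = (perm.get("FromPort") is not None
                    and perm["FromPort"] <= from_port
                    and perm["ToPort"] >= to_port)
        if ports_ok and any(pair.get("GroupId") == sg["GroupId"]
                            for pair in perm.get("UserIdGroupPairs", [])):
            return True
    return False

sg = {
    "GroupId": "sg-08025899feecdc7af",
    "IpPermissions": [{
        "IpProtocol": "tcp", "FromPort": 9092, "ToPort": 9098,
        "UserIdGroupPairs": [{"GroupId": "sg-08025899feecdc7af"}],
    }],
}
print(has_self_reference(sg))  # True
```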
7.3 Monitor Connector Health
# Get connector status
aws kafkaconnect describe-connector \
--connector-arn arn:aws:kafkaconnect:us-east-1:123456789012:connector/iceberg-sink-orders/e1b1f5be-2b19-434d-982b-7f7696bdae3e-2 \
--region us-east-1 \
--query '{State:connectorState,Workers:capacity.provisionedCapacity,CreatedAt:creationTime}'
Advanced Features
Upsert Mode for CDC (Maintaining Current State)
By default, the Iceberg connector operates in append mode, creating a new row for every CDC event (INSERT, UPDATE, DELETE). For maintaining only the current state of each record, enable upsert mode:
{
"connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
"iceberg.table.upsert-mode-enabled": "true",
"iceberg.table.primary-key": "order_id",
"iceberg.table.upsert.dedup-column": "ts_ms",
...
}
Configuration Parameters:
| Parameter | Value | Description |
|---|---|---|
| iceberg.table.upsert-mode-enabled | true | Enable upsert instead of append |
| iceberg.table.primary-key | order_id | Column(s) to use for deduplication |
| iceberg.table.upsert.dedup-column | ts_ms | Timestamp column for ordering events |
Behavior Comparison:
| Mode | INSERT | UPDATE | DELETE | Result |
|---|---|---|---|---|
| Append | New row | New row | New row | Multiple rows per order_id |
| Upsert | New row | Replaces row | Removes row | One row per order_id (current state) |
Example with Upsert Mode:
-- After: INSERT (order_id=1001, amount=100)
-- UPDATE (order_id=1001, amount=150)
-- DELETE (order_id=1001)
-- Append mode result:
SELECT * FROM orders WHERE order_id = 1001;
-- Returns 3 rows (INSERT, UPDATE, DELETE events)
-- Upsert mode result:
SELECT * FROM orders WHERE order_id = 1001;
-- Returns 0 rows (DELETE removed the record)
When to Use Each Mode:
- Append Mode: Audit trail, historical analysis, event sourcing
- Upsert Mode: Current state queries, dashboards, reporting
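The row counts in the SQL comparison above can be reproduced with a toy replay of the three events (a simplified model of the connector's behavior, not its actual code):

```python
def apply_events(events, upsert):
    """Replay CDC events: append keeps every event; upsert keeps current state."""
    if not upsert:
        return list(events)                      # one row per event
    state = {}
    for e in events:
        key = (e.get("after") or e["before"])["order_id"]
        if e["op"] == "d":
            state.pop(key, None)                 # delete removes the row
        else:
            state[key] = e["after"]              # insert/update replaces it
    return list(state.values())

events = [
    {"op": "c", "before": None, "after": {"order_id": 1001, "amount": 100}},
    {"op": "u", "before": {"order_id": 1001}, "after": {"order_id": 1001, "amount": 150}},
    {"op": "d", "before": {"order_id": 1001}, "after": None},
]
print(len(apply_events(events, upsert=False)))  # 3 rows (full history)
print(len(apply_events(events, upsert=True)))   # 0 rows (record deleted)
```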
Time Travel Queries
Query historical data snapshots:
-- Query data as of a specific timestamp (Athena engine v3 syntax)
SELECT * FROM cdc_iceberg.orders
FOR TIMESTAMP AS OF TIMESTAMP '2026-03-20 10:00:00';
-- Query data as of a specific snapshot
SELECT * FROM cdc_iceberg.orders
FOR VERSION AS OF 1234567890;
-- Compare current vs historical
SELECT
current.order_id,
current.status as current_status,
historical.status as previous_status
FROM cdc_iceberg.orders current
JOIN cdc_iceberg.orders FOR TIMESTAMP AS OF TIMESTAMP '2026-03-19 00:00:00' historical
ON current.order_id = historical.order_id
WHERE current.status != historical.status;
Schema Evolution
Add columns without rewriting data:
-- Add new column
ALTER TABLE cdc_iceberg.orders
ADD COLUMNS (shipping_address STRING);
-- Rename column
ALTER TABLE cdc_iceberg.orders
RENAME COLUMN total_amount TO order_total;
-- Drop column
ALTER TABLE cdc_iceberg.orders
DROP COLUMN old_column;
Partition Evolution
Change partitioning without rewriting data:
-- Change from daily to monthly partitions (Spark SQL DDL)
ALTER TABLE cdc_iceberg.orders
DROP PARTITION FIELD day(order_date);
ALTER TABLE cdc_iceberg.orders
ADD PARTITION FIELD month(order_date);
Incremental Reads
Read only new data since last query:
-- Get latest snapshot ID
SELECT snapshot_id FROM cdc_iceberg.orders$snapshots
ORDER BY committed_at DESC LIMIT 1;
-- Read rows added since the previous snapshot
SELECT * FROM cdc_iceberg.orders
EXCEPT
SELECT * FROM cdc_iceberg.orders
FOR VERSION AS OF <previous_snapshot_id>;
Performance Optimization
1. Tune Commit Interval
{
"iceberg.control.commit.interval-ms": "300000",
"iceberg.control.commit.timeout-ms": "30000"
}
Here 300000 ms is the 5-minute default commit interval and 30000 ms is a 30-second commit timeout.
Trade-offs:
- Shorter interval: More frequent commits, smaller files, higher overhead
- Longer interval: Fewer commits, larger files, higher latency
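The small-file trade-off can be made concrete with rough arithmetic: each commit writes at least one file per task, so the commit interval sets a lower bound on files per day (a back-of-envelope sketch assuming this guide's 2 tasks, ignoring partitioning fan-out):

```python
def files_per_day(commit_interval_ms, tasks=2):
    """Minimum files written per day: one file per task per commit."""
    commits = (24 * 60 * 60 * 1000) // commit_interval_ms
    return commits * tasks

for interval_ms in (60_000, 300_000, 600_000):
    print(f"{interval_ms // 60_000} min interval -> "
          f"~{files_per_day(interval_ms)} files/day minimum")
```

At the default 5-minute interval that is ~576 files/day, which is why compaction (below) matters.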
2. Configure File Size
{
"iceberg.table.write.target-file-size-bytes": "134217728"
}
This targets 128 MB data files (134217728 bytes).
3. Enable Compaction
Iceberg compacts small files through maintenance jobs (for example, Spark's rewrite_data_files procedure). Configure thresholds:
ALTER TABLE cdc_iceberg.orders
SET TBLPROPERTIES (
'write.target-file-size-bytes' = '134217728',
'commit.manifest.target-size-bytes' = '8388608',
'commit.manifest-merge.enabled' = 'true'
);
4. Optimize Partitioning
-- Check partition statistics
SELECT
partition,
record_count,
file_count,
total_data_file_size_in_bytes / 1024 / 1024 as size_mb
FROM cdc_iceberg.orders$partitions
ORDER BY partition DESC;
Troubleshooting
Issue 1: Connector Fails to Start
Symptoms: Connector state is FAILED
Check logs:
aws logs tail /aws/mskconnect/iceberg-sink-orders --since 10m
Common causes:
- IAM permissions missing
- S3 bucket doesn't exist
- Glue database doesn't exist
- Invalid configuration
Issue 2: No Data in Iceberg Table
Diagnosis:
# Check if connector is consuming
aws logs tail /aws/mskconnect/iceberg-sink-orders \
--since 5m | grep "Committing"
# Check Kafka topic has data
kafka-console-consumer.sh \
--bootstrap-server $BOOTSTRAP_SERVERS \
--topic cdc.public.orders \
--from-beginning \
--max-messages 1
Solutions:
- Verify topic name matches configuration
- Check connector is in RUNNING state
- Verify IAM permissions for S3 and Glue
Issue 3: Schema Mismatch
Symptoms: Connector logs show schema errors
Solution:
-- Drop and recreate table
DROP TABLE cdc_iceberg.orders;
-- Let connector auto-create with correct schema
-- Or manually create with matching schema
Cost Optimization
Estimated Costs (100 GB/month)
MSK Connect:
- 2 MCU × 2 workers = 4 MCU
- Cost: ~$0.11/hour × 4 = $0.44/hour
- Monthly: ~$316
S3 Storage:
- 100 GB × $0.023/GB = $2.30/month
Glue Catalog:
- First 1M requests free
- Minimal cost for metadata
Total: ~$320/month
Cost Reduction Tips:
- Right-size workers: Start with 1 MCU × 1 worker
- Optimize commit interval: Reduce commits to save API calls
- Use S3 Intelligent-Tiering: For infrequently accessed data
- Enable compaction: Reduce small file overhead
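The estimate above can be reproduced with simple arithmetic (rates are assumptions based on us-east-1 pricing at the time of writing; 720 hours/month):

```python
MCU_HOURLY_USD = 0.11          # MSK Connect price per MCU-hour (assumed)
S3_STANDARD_GB_MONTH = 0.023   # S3 Standard storage price per GB-month (assumed)

def monthly_cost(mcu_count, workers, storage_gb, hours=720):
    """Rough monthly cost: MSK Connect compute plus S3 storage."""
    connect = MCU_HOURLY_USD * mcu_count * workers * hours
    storage = S3_STANDARD_GB_MONTH * storage_gb
    return round(connect + storage, 2)

print(monthly_cost(2, 2, 100))  # ≈ 319.1 (316.8 MSK Connect + 2.3 S3)
```

Dropping to 1 MCU × 1 worker, as the right-sizing tip suggests, cuts the compute portion to a quarter.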
Best Practices
- Partition Strategy: Choose based on query patterns
- Commit Interval: Balance latency vs overhead (5-10 minutes recommended)
- Monitoring: Set up CloudWatch alarms for connector state
- Schema Management: Use schema evolution instead of recreating tables
- Testing: Test in non-production first
- Backup: Enable S3 versioning for Iceberg metadata
- Compaction: Schedule regular compaction jobs
Conclusion
Using MSK Connect with Iceberg Kafka Connect provides a fully managed, scalable solution for building a real-time data lake. The combination of Debezium CDC, Kafka, and Iceberg delivers:
✅ Real-time data ingestion
✅ ACID transactions
✅ Schema evolution
✅ Time travel queries
✅ Efficient storage
✅ Fully managed on AWS
This architecture is production-ready and scales to handle high-throughput CDC workloads while maintaining data quality and query performance.
Next Steps
- Implement customers table with Firehose (Blog Post #6)
- Compare both approaches (Blog Post #7)
- Set up monitoring and alerting
- Implement data quality checks
- Configure retention policies