Introduction
While MSK Connect provides a robust solution for streaming Kafka data to Iceberg (Blog Post #5), Amazon Data Firehose offers a simpler, fully managed alternative with less operational overhead. This guide demonstrates how to stream CDC events from Kafka to S3 Iceberg tables using Amazon Data Firehose with Apache Iceberg table format support.
Architecture Overview
PostgreSQL (RDS/Aurora)
↓ Debezium CDC
Kafka Topics (MSK)
↓ Amazon Data Firehose
S3 Iceberg Tables (Glue Catalog)
↓ Query
Athena / Spark / Trino
MSK Connect vs Firehose: When to Use Each
| Feature | MSK Connect + Iceberg | Amazon Data Firehose |
|---|---|---|
| Setup Complexity | High (build plugin, configure) | Low (managed service) |
| Operational Overhead | Medium (monitor workers) | Low (fully managed) |
| Customization | High (full Kafka Connect API) | Limited (Firehose options) |
| Cost | ~$316/month (4 MCU) | Pay per GB ingested (~$7/month for 100GB) |
| Latency | 5-10 minutes (configurable) | 60-900 seconds (configurable) |
| Schema Evolution | Full Iceberg support | Limited (manual Glue updates) |
| Partitioning | Dynamic partitioning | Static partitioning |
| CDC Handling | ✅ Upsert mode available | ❌ Append-only (requires query-time dedup) |
| UPDATE/DELETE | ✅ Native support with upsert | ⚠️ Appends events (need dedup logic) |
| Current State Queries | ✅ Simple (with upsert mode) | ❌ Complex (need window functions) |
| Audit Trail | ✅ Available in both modes | ✅ All events preserved |
| Best For | Complex transformations, high throughput, frequent updates | Simple CDC, cost optimization, append-heavy workloads |
Use Case: Customers Table
We'll implement the customers table CDC pipeline using Amazon Data Firehose as an alternative to MSK Connect.
Sample Data Flow
PostgreSQL Operations:
-- INSERT
INSERT INTO customers (name, email, phone, address, created_at)
VALUES ('John Doe', 'john@example.com', '555-0101', '123 Main St', NOW());
-- UPDATE
UPDATE customers SET email = 'john.doe@example.com', phone = '555-9999'
WHERE customer_id = 1001;
-- DELETE
DELETE FROM customers WHERE customer_id = 1001;
Kafka CDC Events:
INSERT Event (op: "c"):
{
"before": null,
"after": {
"customer_id": 1001,
"name": "John Doe",
"email": "john@example.com",
"phone": "555-0101",
"address": "123 Main St",
"created_at": "2026-03-20T10:30:00Z"
},
"op": "c",
"ts_ms": 1773991800000
}
UPDATE Event (op: "u"):
{
"before": {
"customer_id": 1001,
"name": "John Doe",
"email": "john@example.com",
"phone": "555-0101",
"address": "123 Main St",
"created_at": "2026-03-20T10:30:00Z"
},
"after": {
"customer_id": 1001,
"name": "John Doe",
"email": "john.doe@example.com",
"phone": "555-9999",
"address": "123 Main St",
"created_at": "2026-03-20T10:30:00Z"
},
"op": "u",
"ts_ms": 1773991900000
}
DELETE Event (op: "d"):
{
"before": {
"customer_id": 1001,
"name": "John Doe",
"email": "john.doe@example.com",
"phone": "555-9999",
"address": "123 Main St",
"created_at": "2026-03-20T10:30:00Z"
},
"after": null,
"op": "d",
"ts_ms": 1773992000000
}
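The envelope convention above is easy to mishandle, so here is a minimal sketch of how a consumer interprets a Debezium event. The `effective_row` helper is hypothetical (not part of this pipeline or any library); it simply encodes the rule that `after` carries the row state for creates and updates, while deletes leave only `before` populated.

```python
# Hypothetical helper: interpret a Debezium change event envelope.
# For creates ("c"), updates ("u"), and snapshot reads ("r") the effective
# row state lives in "after"; for deletes ("d") the row no longer exists.
def effective_row(event):
    """Return (operation, row) where row is the state implied by the event."""
    op = event["op"]
    if op in ("c", "u", "r"):
        return op, event["after"]
    if op == "d":
        return op, None  # "before" still holds the last known state for auditing
    raise ValueError(f"unknown op: {op}")

insert_event = {"before": None,
                "after": {"customer_id": 1001, "email": "john@example.com"},
                "op": "c", "ts_ms": 1773991800000}
delete_event = {"before": {"customer_id": 1001}, "after": None,
                "op": "d", "ts_ms": 1773992000000}

print(effective_row(insert_event))  # ('c', {...after state...})
print(effective_row(delete_event))  # ('d', None)
```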
Iceberg Table Behavior:
- INSERT: New row added to Iceberg table
- UPDATE: New row appended (Firehose uses append-only mode)
- DELETE: Event row appended with op = 'd' (no Iceberg delete file is written)
⚠️ Important: Amazon Data Firehose writes to Iceberg in append-only mode. All CDC events (INSERT, UPDATE, DELETE) are appended as separate rows. To query current state, you need to:
- Deduplicate at query time with window functions (latest event per key)
- Expose a view that filters to the latest non-deleted row per key
- Run periodic jobs that merge the appended events into current state
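The deduplication logic Firehose forces on readers can be sketched outside SQL. This toy example (the `current_state` function and sample data are illustrative, not part of the pipeline) keeps the highest-`ts_ms` event per key, then drops keys whose latest event is a delete — exactly what the window-function queries later in this post do.

```python
# Illustrative sketch of query-time deduplication over appended CDC rows:
# keep the latest event per key, then discard keys whose last op is "d".
def current_state(events, key="customer_id"):
    latest = {}
    for e in events:
        k = e[key]
        if k not in latest or e["ts_ms"] > latest[k]["ts_ms"]:
            latest[k] = e
    return {k: e for k, e in latest.items() if e["op"] != "d"}

events = [
    {"customer_id": 1001, "op": "c", "ts_ms": 1, "email": "john@example.com"},
    {"customer_id": 1001, "op": "u", "ts_ms": 2, "email": "john.doe@example.com"},
    {"customer_id": 1002, "op": "c", "ts_ms": 1, "email": "alice@example.com"},
    {"customer_id": 1002, "op": "d", "ts_ms": 3, "email": None},
]
state = current_state(events)
print(state)  # only customer 1001 survives, with the updated email
```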
Prerequisites
- Debezium connector running on MSK Connect (Blog Post #1)
- Kafka topics with CDC events (cdc.public.customers)
- S3 bucket for Iceberg data storage
- AWS Glue database and table schema
- IAM roles for Firehose
Step 1: Create Glue Table Schema
Unlike MSK Connect with auto-create, Firehose requires the Glue table to exist first. Make sure the schema also carries the Debezium op and ts_ms envelope fields: the deduplication queries in Step 4 rely on them, and for them to populate, events must arrive flattened (e.g. via Debezium's ExtractNewRecordState SMT with add.fields=op,ts_ms, or a Firehose Lambda transform).
1.1 Create Glue Database (if not exists)
aws glue create-database \
--database-input '{
"Name": "cdc_iceberg",
"Description": "CDC data lake with Apache Iceberg tables",
"LocationUri": "s3://cdc-iceberg-data-123456789012/"
}' \
--region us-east-1
1.2 Create Glue Table for Customers
cat > customers-table-input.json <<'EOF'
{
"Name": "customers",
"StorageDescriptor": {
"Columns": [
{"Name": "customer_id", "Type": "bigint"},
{"Name": "name", "Type": "string"},
{"Name": "email", "Type": "string"},
{"Name": "phone", "Type": "string"},
{"Name": "address", "Type": "string"},
{"Name": "created_at", "Type": "timestamp"},
{"Name": "updated_at", "Type": "timestamp"},
{"Name": "op", "Type": "string"},
{"Name": "ts_ms", "Type": "bigint"}
],
"Location": "s3://cdc-iceberg-data-123456789012/customers/",
"InputFormat": "org.apache.hadoop.mapred.FileInputFormat",
"OutputFormat": "org.apache.hadoop.mapred.FileOutputFormat",
"SerdeInfo": {
"SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"
}
},
"Parameters": {
"table_type": "ICEBERG",
"format": "parquet",
"write.format.default": "parquet"
}
}
EOF
aws glue create-table \
--database-name cdc_iceberg \
--table-input file://customers-table-input.json \
--region us-east-1
Step 2: Create IAM Role for Firehose
2.1 Trust Policy
cat > firehose-trust-policy.json <<'EOF'
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "firehose.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
EOF
aws iam create-role \
--role-name FirehoseIcebergRole \
--assume-role-policy-document file://firehose-trust-policy.json \
--region us-east-1
2.2 Permissions Policy
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
cat > firehose-policy.json <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject",
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": [
"arn:aws:s3:::cdc-iceberg-data-${ACCOUNT_ID}",
"arn:aws:s3:::cdc-iceberg-data-${ACCOUNT_ID}/*"
]
},
{
"Effect": "Allow",
"Action": [
"glue:GetDatabase",
"glue:GetTable",
"glue:GetTableVersion",
"glue:GetTableVersions",
"glue:UpdateTable"
],
"Resource": [
"arn:aws:glue:us-east-1:${ACCOUNT_ID}:catalog",
"arn:aws:glue:us-east-1:${ACCOUNT_ID}:database/cdc_iceberg",
"arn:aws:glue:us-east-1:${ACCOUNT_ID}:table/cdc_iceberg/customers"
]
},
{
"Effect": "Allow",
"Action": [
"kafka:GetBootstrapBrokers",
"kafka:DescribeCluster",
"kafka:DescribeClusterV2"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:us-east-1:${ACCOUNT_ID}:log-group:/aws/kinesisfirehose/*"
}
]
}
EOF
aws iam put-role-policy \
--role-name FirehoseIcebergRole \
--policy-name FirehoseIcebergPolicy \
--policy-document file://firehose-policy.json \
--region us-east-1
Step 3: Create Firehose Delivery Stream
3.1 Get MSK Cluster ARN
MSK_ARN=$(aws kafka list-clusters-v2 \
--region us-east-1 \
--query 'ClusterInfoList[0].ClusterArn' \
--output text)
echo "MSK Cluster ARN: $MSK_ARN"
3.2 Create Delivery Stream
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
cat > firehose-config.json <<EOF
{
"DeliveryStreamName": "kafka-cdc-customers-to-iceberg",
"DeliveryStreamType": "MSKAsSource",
"MSKSourceConfiguration": {
"MSKClusterARN": "${MSK_ARN}",
"TopicName": "cdc.public.customers",
"AuthenticationConfiguration": {
"Connectivity": "PRIVATE",
"RoleARN": "arn:aws:iam::${ACCOUNT_ID}:role/FirehoseIcebergRole"
}
},
"IcebergDestinationConfiguration": {
"RoleARN": "arn:aws:iam::${ACCOUNT_ID}:role/FirehoseIcebergRole",
"CatalogConfiguration": {
"CatalogARN": "arn:aws:glue:us-east-1:${ACCOUNT_ID}:catalog"
},
"DestinationTableConfigurationList": [
{
"DestinationDatabaseName": "cdc_iceberg",
"DestinationTableName": "customers"
}
],
"S3Configuration": {
"RoleARN": "arn:aws:iam::${ACCOUNT_ID}:role/FirehoseIcebergRole",
"BucketARN": "arn:aws:s3:::cdc-iceberg-data-${ACCOUNT_ID}",
"Prefix": "customers/",
"ErrorOutputPrefix": "errors/customers/",
"BufferingHints": {
"SizeInMBs": 128,
"IntervalInSeconds": 300
},
"CompressionFormat": "UNCOMPRESSED",
"CloudWatchLoggingOptions": {
"Enabled": true,
"LogGroupName": "/aws/kinesisfirehose/kafka-cdc-customers",
"LogStreamName": "IcebergDelivery"
}
},
"ProcessingConfiguration": {
"Enabled": false
},
"BufferingHints": {
"SizeInMBs": 128,
"IntervalInSeconds": 300
}
}
}
EOF
# Create log group first
aws logs create-log-group \
--log-group-name /aws/kinesisfirehose/kafka-cdc-customers \
--region us-east-1
# Create delivery stream
aws firehose create-delivery-stream \
--cli-input-json file://firehose-config.json \
--region us-east-1
Key Configuration Parameters:
| Parameter | Value | Description |
|---|---|---|
| TopicName | cdc.public.customers | Kafka topic to consume |
| DestinationDatabaseName | cdc_iceberg | Glue database |
| DestinationTableName | customers | Glue table |
| BufferingHints.SizeInMBs | 128 | Buffer size before write |
| BufferingHints.IntervalInSeconds | 300 | Buffer time (5 minutes) |
3.3 Monitor Delivery Stream Creation
# Check status
aws firehose describe-delivery-stream \
--delivery-stream-name kafka-cdc-customers-to-iceberg \
--region us-east-1 \
--query 'DeliveryStreamDescription.DeliveryStreamStatus' \
--output text
# Expected: CREATING → ACTIVE (2-3 minutes)
Step 4: Test Data Flow
4.1 Insert Test Data
# Get RDS endpoint
RDS_ENDPOINT=$(aws cloudformation describe-stacks \
--stack-name cdc-demo-rds \
--region us-east-1 \
--query 'Stacks[0].Outputs[?OutputKey==`RDSEndpoint`].OutputValue' \
--output text)
# Insert via bastion
aws ssm send-command \
--document-name "AWS-RunShellScript" \
--instance-ids "i-01054a7000e6dbdbc" \
--parameters "commands=[
\"PGPASSWORD=Welcome2026!! psql -h ${RDS_ENDPOINT} -U postgres -d testdb -c \\\"INSERT INTO customers (name, email, phone, address, created_at) VALUES ('Alice Johnson', 'alice@example.com', '555-0101', '123 Main St', NOW()), ('Bob Smith', 'bob@example.com', '555-0102', '456 Oak Ave', NOW()), ('Carol White', 'carol@example.com', '555-0103', '789 Pine Rd', NOW());\\\"\"
]" \
--region us-east-1
4.2 Wait for Buffering Interval
Firehose buffers data based on:
- Size: 128 MB (whichever comes first)
- Time: 300 seconds (5 minutes)
For small test data, wait 5-6 minutes.
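The "whichever comes first" flush rule is worth making explicit. This tiny sketch (the `should_flush` function is illustrative, not a Firehose API) mirrors the buffering settings used above:

```python
# Illustrative sketch of Firehose's flush rule under the settings above:
# a buffer flushes when it reaches 128 MB OR 300 seconds, whichever is first.
def should_flush(buffered_bytes, elapsed_s, size_mb=128, interval_s=300):
    return buffered_bytes >= size_mb * 1024 * 1024 or elapsed_s >= interval_s

print(should_flush(buffered_bytes=4_096, elapsed_s=30))          # small + early: no
print(should_flush(buffered_bytes=4_096, elapsed_s=300))         # timer fires: yes
print(should_flush(buffered_bytes=200 * 1024**2, elapsed_s=10))  # size fires: yes
```

This is why a handful of test rows still takes the full interval to appear: the size threshold is never reached, so the timer drives delivery.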
4.3 Check Delivery Stream Metrics
# Get delivery metrics
aws cloudwatch get-metric-statistics \
--namespace AWS/Firehose \
--metric-name DeliveryToS3.Records \
--dimensions Name=DeliveryStreamName,Value=kafka-cdc-customers-to-iceberg \
--start-time $(date -u -d '10 minutes ago' +%Y-%m-%dT%H:%M:%S) \
--end-time $(date -u +%Y-%m-%dT%H:%M:%S) \
--period 300 \
--statistics Sum \
--region us-east-1
4.4 Query Data via Athena
-- Check data arrived
SELECT * FROM cdc_iceberg.customers
ORDER BY customer_id DESC
LIMIT 10;
-- Count records
SELECT COUNT(*) as total_customers FROM cdc_iceberg.customers;
-- Check recent inserts
SELECT name, email, created_at
FROM cdc_iceberg.customers
WHERE created_at > CURRENT_TIMESTAMP - INTERVAL '1' HOUR;
-- Query current state (deduplicating CDC events)
-- This handles INSERT, UPDATE, DELETE by taking the latest event per customer_id
WITH ranked_events AS (
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY ts_ms DESC) as rn
FROM cdc_iceberg.customers
)
SELECT customer_id, name, email, phone, address, created_at
FROM ranked_events
WHERE rn = 1
AND op != 'd' -- Exclude deleted records
ORDER BY customer_id;
Handling CDC Operations in Queries:
Since Firehose uses append-only mode, you need to handle UPDATE and DELETE events in your queries:
-- Get latest version of each customer (including updates)
-- Note: PostgreSQL's DISTINCT ON is not supported in Athena; use a window function
SELECT customer_id, name, email, phone, address, created_at
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY ts_ms DESC) AS rn
  FROM cdc_iceberg.customers
)
WHERE rn = 1;
-- Count active customers (excluding deletes)
SELECT COUNT(DISTINCT customer_id) as active_customers
FROM (
SELECT customer_id, op,
ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY ts_ms DESC) as rn
FROM cdc_iceberg.customers
) latest
WHERE rn = 1 AND op != 'd';
-- Audit trail: See all changes to a customer
SELECT customer_id, op, name, email, ts_ms,
CASE op
WHEN 'c' THEN 'Created'
WHEN 'u' THEN 'Updated'
WHEN 'd' THEN 'Deleted'
END as operation
FROM cdc_iceberg.customers
WHERE customer_id = 1001
ORDER BY ts_ms;
4.5 Verify S3 Data Files
# List Iceberg data files
aws s3 ls s3://cdc-iceberg-data-${ACCOUNT_ID}/customers/data/ --recursive
# Check metadata files
aws s3 ls s3://cdc-iceberg-data-${ACCOUNT_ID}/customers/metadata/ --recursive
Step 5: Monitor and Troubleshoot
5.1 CloudWatch Logs
# Tail logs
aws logs tail /aws/kinesisfirehose/kafka-cdc-customers \
--since 5m \
--follow \
--region us-east-1
# Search for errors
aws logs filter-log-events \
--log-group-name /aws/kinesisfirehose/kafka-cdc-customers \
--filter-pattern "ERROR" \
--start-time $(($(date +%s) - 3600))000 \
--region us-east-1
5.2 Common Issues
| Issue | Cause | Solution |
|---|---|---|
| Stream stuck in CREATING | IAM permissions | Check S3, Glue, MSK permissions |
| No data delivered | Buffering not met | Wait for buffer time/size threshold |
| Schema mismatch | Glue table schema wrong | Update Glue table schema |
| Access denied | IAM role | Verify Firehose role has all permissions |
| MSK connectivity | VPC/Security groups | Check Firehose can reach MSK |
5.3 Monitor Delivery Stream Health
# Get stream details
aws firehose describe-delivery-stream \
--delivery-stream-name kafka-cdc-customers-to-iceberg \
--region us-east-1 \
--query 'DeliveryStreamDescription.{Status:DeliveryStreamStatus,Created:CreateTimestamp,Source:Source.MSKSourceDescription.TopicName}'
Performance Tuning
1. Optimize Buffering
{
"BufferingHints": {
"SizeInMBs": 128, // Increase for higher throughput
"IntervalInSeconds": 60 // Decrease for lower latency
}
}
Trade-offs:
- Larger buffer: Fewer S3 writes, lower cost, higher latency
- Smaller buffer: More S3 writes, higher cost, lower latency
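The trade-off can be quantified with back-of-the-envelope arithmetic. This sketch (the `flushes_per_day` function is illustrative) estimates how many S3 writes a given ingest rate produces under different buffer settings:

```python
# Rough estimate of S3 writes per day: whichever threshold (size or time)
# is hit first dominates the flush rate. Names here are illustrative.
def flushes_per_day(gb_per_day, size_mb, interval_s):
    seconds_per_day = 86_400
    by_time = seconds_per_day / interval_s          # flushes if timer always fires
    by_size = (gb_per_day * 1024) / size_mb         # flushes if size always fires
    return int(max(by_time, by_size))

# ~3.3 GB/day (100 GB/month) never fills a 128 MB buffer within 300 s,
# so the timer drives flushing: 86400 / 300 = 288 writes/day.
print(flushes_per_day(gb_per_day=3.3, size_mb=128, interval_s=300))  # 288
```

At low volumes, shrinking `IntervalInSeconds` directly multiplies S3 writes (and small-file count in the Iceberg table) without improving throughput, which is why compaction matters.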
2. Enable Data Transformation (Optional)
{
"ProcessingConfiguration": {
"Enabled": true,
"Processors": [
{
"Type": "Lambda",
"Parameters": [
{
"ParameterName": "LambdaArn",
"ParameterValue": "arn:aws:lambda:us-east-1:123456789012:function:transform-cdc"
}
]
}
]
}
}
3. Monitor Key Metrics
# Records delivered
aws cloudwatch get-metric-statistics \
--namespace AWS/Firehose \
--metric-name DeliveryToS3.Records \
--dimensions Name=DeliveryStreamName,Value=kafka-cdc-customers-to-iceberg \
--start-time $(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%S) \
--end-time $(date -u +%Y-%m-%dT%H:%M:%S) \
--period 300 \
--statistics Sum \
--region us-east-1
# Data freshness (age of oldest record)
aws cloudwatch get-metric-statistics \
--namespace AWS/Firehose \
--metric-name DeliveryToS3.DataFreshness \
--dimensions Name=DeliveryStreamName,Value=kafka-cdc-customers-to-iceberg \
--start-time $(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%S) \
--end-time $(date -u +%Y-%m-%dT%H:%M:%S) \
--period 300 \
--statistics Average \
--region us-east-1
Cost Analysis
Estimated Costs (100 GB/month)
Amazon Data Firehose:
- Data ingestion: 100 GB × $0.029/GB = $2.90/month
- Format conversion: 100 GB × $0.018/GB = $1.80/month
- Total Firehose: $4.70/month
S3 Storage:
- 100 GB × $0.023/GB = $2.30/month
Glue Catalog:
- First 1M requests free
- Minimal cost
Total: ~$7/month
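The arithmetic behind the estimate is simple enough to reproduce. The per-GB rates below are the figures quoted in this post; check current AWS pricing before relying on them.

```python
# Reproduce the monthly cost estimate above (rates as quoted in this post,
# not authoritative pricing; Glue catalog requests assumed free tier).
GB = 100
firehose_ingest = GB * 0.029     # data ingestion: $2.90
format_conversion = GB * 0.018   # format conversion: $1.80
s3_storage = GB * 0.023          # S3 storage: $2.30
total = firehose_ingest + format_conversion + s3_storage
print(f"${total:.2f}/month")     # ≈ $7.00/month
```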
Cost Comparison
| Solution | Monthly Cost | Setup Complexity | Operational Overhead |
|---|---|---|---|
| Firehose | ~$7 | Low | Very Low |
| MSK Connect | ~$316 | High | Medium |
Savings: ~$309/month (98% reduction) for similar throughput
Limitations
- No Auto-Create: Glue table must exist before streaming
- Limited Schema Evolution: Manual Glue table updates required
- Static Partitioning: Cannot dynamically change partitions
- No Custom Transformations: Limited to Lambda functions
- Buffering Constraints: Minimum 60 seconds latency
- Append-Only Mode: No native upsert support - all CDC events appended as rows
- Query Complexity: Need deduplication logic to get current state
Handling CDC Events with Firehose
Unlike MSK Connect with Iceberg connector's upsert mode, Firehose writes all CDC events as append-only rows. This means:
Challenge: Multiple rows per entity
-- After: INSERT customer_id=1001
-- UPDATE customer_id=1001 (change email)
-- UPDATE customer_id=1001 (change phone)
SELECT * FROM customers WHERE customer_id = 1001;
-- Returns 3 rows (1 INSERT + 2 UPDATEs)
Solution 1: Query-time Deduplication
-- Get latest state per customer (window function; DISTINCT ON is PostgreSQL-only)
SELECT *
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY ts_ms DESC) AS rn
  FROM customers
)
WHERE rn = 1;
Solution 2: Logical Views
-- Create a view exposing the latest state (Athena views are logical,
-- not materialized, so the window function runs on every query)
CREATE OR REPLACE VIEW customers_current AS
SELECT customer_id, name, email, phone, address, created_at
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY ts_ms DESC) as rn
FROM customers
) WHERE rn = 1 AND op != 'd';
Solution 3: Periodic Compaction
-- Compaction can run as a scheduled Athena query (or a Spark job calling
-- Iceberg's rewrite_data_files procedure). Athena bin-packs small files:
OPTIMIZE cdc_iceberg.customers REWRITE DATA USING BIN_PACK;
-- Expire old snapshots and remove orphaned files per the table's retention:
VACUUM cdc_iceberg.customers;
-- Note: compaction merges files, not rows. Collapsing CDC events into
-- current state still requires a separate MERGE job or query-time dedup.
Recommendation: For tables with frequent updates, consider MSK Connect with upsert mode instead of Firehose.
Best Practices
- Pre-create Glue Tables: Define schema before starting stream
- Monitor Data Freshness: Set CloudWatch alarms for delivery lag
- Optimize Buffer Size: Balance latency vs cost
- Enable Error Logging: Always configure CloudWatch Logs
- Test Schema Changes: Validate in non-production first
- Use S3 Lifecycle: Archive old data to Glacier
- Set Up Alerts: Monitor delivery failures
Conclusion
Amazon Data Firehose provides a cost-effective, low-maintenance alternative to MSK Connect for streaming Kafka CDC data to Iceberg. While it has some limitations compared to MSK Connect, it's ideal for:
✅ Simple CDC pipelines
✅ Cost-sensitive workloads
✅ Low operational overhead requirements
✅ Predictable schema
✅ Acceptable 1-5 minute latency
For the customers table with stable schema and moderate throughput, Firehose delivers 98% cost savings compared to MSK Connect while maintaining data quality and query performance.
Next Steps
- Compare MSK Connect vs Firehose performance (Blog Post #7)
- Implement monitoring and alerting
- Set up data quality checks
- Configure retention policies
- Optimize query performance