Friday, March 20, 2026

Streaming Kafka CDC to S3 Iceberg with Amazon Data Firehose

 

Introduction

While MSK Connect provides a robust solution for streaming Kafka data to Iceberg (Blog Post #5), Amazon Data Firehose offers a simpler, fully managed alternative with less operational overhead. This guide demonstrates how to stream CDC events from Kafka to S3 Iceberg tables using Amazon Data Firehose with Apache Iceberg table format support.

Architecture Overview

PostgreSQL (RDS/Aurora)
    ↓ Debezium CDC
Kafka Topics (MSK)
    ↓ Amazon Data Firehose
S3 Iceberg Tables (Glue Catalog)
    ↓ Query
Athena / Spark / Trino

MSK Connect vs Firehose: When to Use Each

| Feature | MSK Connect + Iceberg | Amazon Data Firehose |
|---|---|---|
| Setup Complexity | High (build plugin, configure) | Low (managed service) |
| Operational Overhead | Medium (monitor workers) | Low (fully managed) |
| Customization | High (full Kafka Connect API) | Limited (Firehose options) |
| Cost | ~$316/month (4 MCU) | Pay per GB ingested (~$7/month for 100GB) |
| Latency | 5-10 minutes (configurable) | 60-900 seconds (configurable) |
| Schema Evolution | Full Iceberg support | Limited (manual Glue updates) |
| Partitioning | Dynamic partitioning | Static partitioning |
| CDC Handling | ✅ Upsert mode available | ❌ Append-only (requires query-time dedup) |
| UPDATE/DELETE | ✅ Native support with upsert | ⚠️ Appends events (need dedup logic) |
| Current State Queries | ✅ Simple (with upsert mode) | ❌ Complex (need window functions) |
| Audit Trail | ✅ Available in both modes | ✅ All events preserved |
| Best For | Complex transformations, high throughput, frequent updates | Simple CDC, cost optimization, append-heavy workloads |

Use Case: Customers Table

We'll implement the customers table CDC pipeline using Amazon Data Firehose as an alternative to MSK Connect.

Sample Data Flow

PostgreSQL Operations:

-- INSERT
INSERT INTO customers (name, email, phone, address, created_at)
VALUES ('John Doe', 'john@example.com', '555-0101', '123 Main St', NOW());

-- UPDATE
UPDATE customers SET email = 'john.doe@example.com', phone = '555-9999'
WHERE customer_id = 1001;

-- DELETE
DELETE FROM customers WHERE customer_id = 1001;

Kafka CDC Events:

INSERT Event (op: "c"):

{
  "before": null,
  "after": {
    "customer_id": 1001,
    "name": "John Doe",
    "email": "john@example.com",
    "phone": "555-0101",
    "address": "123 Main St",
    "created_at": "2026-03-20T10:30:00Z"
  },
  "op": "c",
  "ts_ms": 1773991800000
}

UPDATE Event (op: "u"):

{
  "before": {
    "customer_id": 1001,
    "name": "John Doe",
    "email": "john@example.com",
    "phone": "555-0101",
    "address": "123 Main St",
    "created_at": "2026-03-20T10:30:00Z"
  },
  "after": {
    "customer_id": 1001,
    "name": "John Doe",
    "email": "john.doe@example.com",
    "phone": "555-9999",
    "address": "123 Main St",
    "created_at": "2026-03-20T10:30:00Z"
  },
  "op": "u",
  "ts_ms": 1773991900000
}

DELETE Event (op: "d"):

{
  "before": {
    "customer_id": 1001,
    "name": "John Doe",
    "email": "john.doe@example.com",
    "phone": "555-9999",
    "address": "123 Main St",
    "created_at": "2026-03-20T10:30:00Z"
  },
  "after": null,
  "op": "d",
  "ts_ms": 1773992000000
}
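Before events like these can land in a flat Glue table, the nested before/after envelope has to be flattened into one row per event, typically with Debezium's ExtractNewRecordState transform or a Firehose Lambda processor. A minimal Python sketch of that flattening logic (illustrative only, not the Debezium implementation):

```python
import json

def flatten_debezium_event(raw: str) -> dict:
    """Flatten a Debezium envelope into one row: the 'after' image for
    inserts/updates, the 'before' image for deletes, plus op and ts_ms."""
    event = json.loads(raw)
    # DELETE events carry the last known state in 'before'; others in 'after'
    row = dict(event["after"] if event["op"] != "d" else event["before"])
    row["op"] = event["op"]
    row["ts_ms"] = event["ts_ms"]
    return row

update_event = '''{"before": {"customer_id": 1001, "email": "john@example.com"},
  "after": {"customer_id": 1001, "email": "john.doe@example.com"},
  "op": "u", "ts_ms": 1773991900000}'''

print(flatten_debezium_event(update_event))
# {'customer_id': 1001, 'email': 'john.doe@example.com', 'op': 'u', 'ts_ms': 1773991900000}
```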

Iceberg Table Behavior:

  • INSERT: New row added to Iceberg table
  • UPDATE: New row appended (Firehose uses append-only mode)
  • DELETE: New row with delete marker appended

⚠️ Important: Amazon Data Firehose writes to Iceberg in append-only mode. All CDC events (INSERT, UPDATE, DELETE) are appended as separate rows. To query current state, you need to:

  1. Carry the Debezium metadata (op, ts_ms) through to the table
  2. Implement deduplication logic in queries (latest event per key)
  3. Run periodic jobs to compact files and materialize current state
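The deduplication the warning describes can be illustrated with a small Python sketch that reduces an append-only event list to current state: keep the latest event per key, then drop deleted keys.

```python
def current_state(events):
    """Reduce append-only CDC events to current state: keep the latest
    event per customer_id (by ts_ms), then drop deleted keys."""
    latest = {}
    for e in events:
        key = e["customer_id"]
        if key not in latest or e["ts_ms"] > latest[key]["ts_ms"]:
            latest[key] = e
    return [e for e in latest.values() if e["op"] != "d"]

events = [
    {"customer_id": 1001, "email": "john@example.com", "op": "c", "ts_ms": 1},
    {"customer_id": 1001, "email": "john.doe@example.com", "op": "u", "ts_ms": 2},
    {"customer_id": 1002, "email": "jane@example.com", "op": "c", "ts_ms": 1},
    {"customer_id": 1002, "op": "d", "ts_ms": 3},
]
print(current_state(events))
# [{'customer_id': 1001, 'email': 'john.doe@example.com', 'op': 'u', 'ts_ms': 2}]
```

This is exactly what the window-function queries later in this post do in SQL.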

Prerequisites

  1. Debezium connector running on MSK Connect (Blog Post #1)
  2. Kafka topics with CDC events (cdc.public.customers)
  3. S3 bucket for Iceberg data storage
  4. AWS Glue database and table schema
  5. IAM roles for Firehose

Step 1: Create Glue Table Schema

Unlike MSK Connect with auto-create, Firehose requires the Glue table to exist first.

1.1 Create Glue Database (if not exists)

aws glue create-database \
  --database-input '{
    "Name": "cdc_iceberg",
    "Description": "CDC data lake with Apache Iceberg tables",
    "LocationUri": "s3://cdc-iceberg-data-123456789012/"
  }' \
  --region us-east-1

1.2 Create Glue Table for Customers

The schema carries the Debezium metadata fields op and ts_ms alongside the business columns; the deduplication queries later in this post rely on them. This assumes events are flattened to this shape upstream (for example, via Debezium's ExtractNewRecordState transform configured to add the op and ts_ms fields).

cat > customers-table-input.json <<'EOF'
{
  "Name": "customers",
  "StorageDescriptor": {
    "Columns": [
      {"Name": "customer_id", "Type": "bigint"},
      {"Name": "name", "Type": "string"},
      {"Name": "email", "Type": "string"},
      {"Name": "phone", "Type": "string"},
      {"Name": "address", "Type": "string"},
      {"Name": "created_at", "Type": "timestamp"},
      {"Name": "updated_at", "Type": "timestamp"},
      {"Name": "op", "Type": "string"},
      {"Name": "ts_ms", "Type": "bigint"}
    ],
    "Location": "s3://cdc-iceberg-data-123456789012/customers/",
    "InputFormat": "org.apache.hadoop.mapred.FileInputFormat",
    "OutputFormat": "org.apache.hadoop.mapred.FileOutputFormat",
    "SerdeInfo": {
      "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"
    }
  },
  "Parameters": {
    "table_type": "ICEBERG",
    "format": "parquet",
    "write.format.default": "parquet"
  }
}
EOF

aws glue create-table \
  --database-name cdc_iceberg \
  --table-input file://customers-table-input.json \
  --region us-east-1

Step 2: Create IAM Role for Firehose

2.1 Trust Policy

cat > firehose-trust-policy.json <<'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "firehose.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
EOF

aws iam create-role \
  --role-name FirehoseIcebergRole \
  --assume-role-policy-document file://firehose-trust-policy.json \
  --region us-east-1

2.2 Permissions Policy

ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)

cat > firehose-policy.json <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:ListBucket",
        "s3:GetBucketLocation"
      ],
      "Resource": [
        "arn:aws:s3:::cdc-iceberg-data-${ACCOUNT_ID}",
        "arn:aws:s3:::cdc-iceberg-data-${ACCOUNT_ID}/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "glue:GetDatabase",
        "glue:GetTable",
        "glue:GetTableVersion",
        "glue:GetTableVersions",
        "glue:UpdateTable"
      ],
      "Resource": [
        "arn:aws:glue:us-east-1:${ACCOUNT_ID}:catalog",
        "arn:aws:glue:us-east-1:${ACCOUNT_ID}:database/cdc_iceberg",
        "arn:aws:glue:us-east-1:${ACCOUNT_ID}:table/cdc_iceberg/customers"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka:GetBootstrapBrokers",
        "kafka:DescribeCluster",
        "kafka:DescribeClusterV2"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:us-east-1:${ACCOUNT_ID}:log-group:/aws/kinesisfirehose/*"
    }
  ]
}
EOF

aws iam put-role-policy \
  --role-name FirehoseIcebergRole \
  --policy-name FirehoseIcebergPolicy \
  --policy-document file://firehose-policy.json \
  --region us-east-1

Step 3: Create Firehose Delivery Stream

3.1 Get MSK Cluster ARN

MSK_ARN=$(aws kafka list-clusters-v2 \
  --region us-east-1 \
  --query 'ClusterInfoList[0].ClusterArn' \
  --output text)

echo "MSK Cluster ARN: $MSK_ARN"

3.2 Create Delivery Stream

ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)

cat > firehose-config.json <<EOF
{
  "DeliveryStreamName": "kafka-cdc-customers-to-iceberg",
  "DeliveryStreamType": "MSKAsSource",
  "MSKSourceConfiguration": {
    "MSKClusterARN": "${MSK_ARN}",
    "TopicName": "cdc.public.customers",
    "AuthenticationConfiguration": {
      "Connectivity": "PRIVATE",
      "RoleARN": "arn:aws:iam::${ACCOUNT_ID}:role/FirehoseIcebergRole"
    }
  },
  "IcebergDestinationConfiguration": {
    "RoleARN": "arn:aws:iam::${ACCOUNT_ID}:role/FirehoseIcebergRole",
    "CatalogConfiguration": {
      "CatalogARN": "arn:aws:glue:us-east-1:${ACCOUNT_ID}:catalog"
    },
    "DestinationTableConfigurationList": [
      {
        "DestinationDatabaseName": "cdc_iceberg",
        "DestinationTableName": "customers"
      }
    ],
    "S3Configuration": {
      "RoleARN": "arn:aws:iam::${ACCOUNT_ID}:role/FirehoseIcebergRole",
      "BucketARN": "arn:aws:s3:::cdc-iceberg-data-${ACCOUNT_ID}",
      "Prefix": "customers/",
      "ErrorOutputPrefix": "errors/customers/",
      "BufferingHints": {
        "SizeInMBs": 128,
        "IntervalInSeconds": 300
      },
      "CompressionFormat": "UNCOMPRESSED",
      "CloudWatchLoggingOptions": {
        "Enabled": true,
        "LogGroupName": "/aws/kinesisfirehose/kafka-cdc-customers",
        "LogStreamName": "IcebergDelivery"
      }
    },
    "ProcessingConfiguration": {
      "Enabled": false
    },
    "BufferingHints": {
      "SizeInMBs": 128,
      "IntervalInSeconds": 300
    }
  }
}
EOF

# Create log group first
aws logs create-log-group \
  --log-group-name /aws/kinesisfirehose/kafka-cdc-customers \
  --region us-east-1

# Create delivery stream
aws firehose create-delivery-stream \
  --cli-input-json file://firehose-config.json \
  --region us-east-1

Key Configuration Parameters:

| Parameter | Value | Description |
|---|---|---|
| TopicName | cdc.public.customers | Kafka topic to consume |
| DestinationDatabaseName | cdc_iceberg | Glue database |
| DestinationTableName | customers | Glue table |
| BufferingHints.SizeInMBs | 128 | Buffer size before write |
| BufferingHints.IntervalInSeconds | 300 | Buffer time (5 minutes) |

3.3 Monitor Delivery Stream Creation

# Check status
aws firehose describe-delivery-stream \
  --delivery-stream-name kafka-cdc-customers-to-iceberg \
  --region us-east-1 \
  --query 'DeliveryStreamDescription.DeliveryStreamStatus' \
  --output text

# Expected: CREATING → ACTIVE (2-3 minutes)

Step 4: Test Data Flow

4.1 Insert Test Data

# Get RDS endpoint
RDS_ENDPOINT=$(aws cloudformation describe-stacks \
  --stack-name cdc-demo-rds \
  --region us-east-1 \
  --query 'Stacks[0].Outputs[?OutputKey==`RDSEndpoint`].OutputValue' \
  --output text)

# Insert via bastion
aws ssm send-command \
  --document-name "AWS-RunShellScript" \
  --instance-ids "i-01054a7000e6dbdbc" \
  --parameters "commands=[
    \"PGPASSWORD=Welcome2026!! psql -h ${RDS_ENDPOINT} -U postgres -d testdb -c \\\"INSERT INTO customers (name, email, phone, address, created_at) VALUES ('Alice Johnson', 'alice@example.com', '555-0101', '123 Main St', NOW()), ('Bob Smith', 'bob@example.com', '555-0102', '456 Oak Ave', NOW()), ('Carol White', 'carol@example.com', '555-0103', '789 Pine Rd', NOW());\\\"\"
  ]" \
  --region us-east-1

4.2 Wait for Buffering Interval

Firehose buffers data based on:

  • Size: 128 MB (whichever comes first)
  • Time: 300 seconds (5 minutes)

For small test data, wait 5-6 minutes.
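The "whichever comes first" rule means effective flush latency depends on throughput. A quick back-of-the-envelope helper (my own sketch, not a Firehose API):

```python
def seconds_until_flush(throughput_mb_per_s: float,
                        size_mb: int = 128,
                        interval_s: int = 300) -> float:
    """Firehose flushes when either the size or the time threshold is hit,
    whichever comes first."""
    if throughput_mb_per_s <= 0:
        return interval_s  # no data volume pressure: the timer fires
    return min(size_mb / throughput_mb_per_s, interval_s)

# A tiny test workload (0.01 MB/s) never fills 128 MB within 5 minutes,
# so the 300-second timer triggers the write:
print(seconds_until_flush(0.01))   # 300
# A heavy workload (1 MB/s) fills the buffer before the timer fires:
print(seconds_until_flush(1.0))    # 128.0
```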

4.3 Check Delivery Stream Metrics

# Get delivery metrics
aws cloudwatch get-metric-statistics \
  --namespace AWS/Firehose \
  --metric-name DeliveryToS3.Records \
  --dimensions Name=DeliveryStreamName,Value=kafka-cdc-customers-to-iceberg \
  --start-time $(date -u -d '10 minutes ago' +%Y-%m-%dT%H:%M:%S) \
  --end-time $(date -u +%Y-%m-%dT%H:%M:%S) \
  --period 300 \
  --statistics Sum \
  --region us-east-1

4.4 Query Data via Athena

-- Check data arrived
SELECT * FROM cdc_iceberg.customers
ORDER BY customer_id DESC
LIMIT 10;

-- Count records
SELECT COUNT(*) as total_customers FROM cdc_iceberg.customers;

-- Check recent inserts
SELECT name, email, created_at
FROM cdc_iceberg.customers
WHERE created_at > CURRENT_TIMESTAMP - INTERVAL '1' HOUR;

-- Query current state (deduplicating CDC events)
-- This handles INSERT, UPDATE, DELETE by taking the latest event per customer_id
WITH ranked_events AS (
  SELECT 
    *,
    ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY ts_ms DESC) as rn
  FROM cdc_iceberg.customers
)
SELECT customer_id, name, email, phone, address, created_at
FROM ranked_events
WHERE rn = 1 
  AND op != 'd'  -- Exclude deleted records
ORDER BY customer_id;

Handling CDC Operations in Queries:

Since Firehose uses append-only mode, you need to handle UPDATE and DELETE events in your queries:

-- Get latest version of each customer (including updates)
-- (DISTINCT ON is PostgreSQL-only; Athena/Trino needs a window function)
SELECT customer_id, name, email, phone, address, created_at
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY ts_ms DESC) as rn
  FROM cdc_iceberg.customers
)
WHERE rn = 1;

-- Count active customers (excluding deletes)
SELECT COUNT(DISTINCT customer_id) as active_customers
FROM (
  SELECT customer_id, op,
    ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY ts_ms DESC) as rn
  FROM cdc_iceberg.customers
) latest
WHERE rn = 1 AND op != 'd';

-- Audit trail: See all changes to a customer
SELECT customer_id, op, name, email, ts_ms,
  CASE op
    WHEN 'c' THEN 'Created'
    WHEN 'u' THEN 'Updated'
    WHEN 'd' THEN 'Deleted'
  END as operation
FROM cdc_iceberg.customers
WHERE customer_id = 1001
ORDER BY ts_ms;

4.5 Verify S3 Data Files

ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)

# List Iceberg data files
aws s3 ls s3://cdc-iceberg-data-${ACCOUNT_ID}/customers/data/ --recursive

# Check metadata files
aws s3 ls s3://cdc-iceberg-data-${ACCOUNT_ID}/customers/metadata/ --recursive

Step 5: Monitor and Troubleshoot

5.1 CloudWatch Logs

# Tail logs
aws logs tail /aws/kinesisfirehose/kafka-cdc-customers \
  --since 5m \
  --follow \
  --region us-east-1

# Search for errors
aws logs filter-log-events \
  --log-group-name /aws/kinesisfirehose/kafka-cdc-customers \
  --filter-pattern "ERROR" \
  --start-time $(($(date +%s) - 3600))000 \
  --region us-east-1

5.2 Common Issues

| Issue | Cause | Solution |
|---|---|---|
| Stream stuck in CREATING | IAM permissions | Check S3, Glue, MSK permissions |
| No data delivered | Buffering not met | Wait for buffer time/size threshold |
| Schema mismatch | Glue table schema wrong | Update Glue table schema |
| Access denied | IAM role | Verify Firehose role has all permissions |
| MSK connectivity | VPC/Security groups | Check Firehose can reach MSK |

5.3 Monitor Delivery Stream Health

# Get stream details
aws firehose describe-delivery-stream \
  --delivery-stream-name kafka-cdc-customers-to-iceberg \
  --region us-east-1 \
  --query 'DeliveryStreamDescription.{Status:DeliveryStreamStatus,Created:CreateTimestamp,Source:Source.MSKSourceDescription.TopicName}'

Performance Tuning

1. Optimize Buffering

{
  "BufferingHints": {
    "SizeInMBs": 128,        // Increase for higher throughput
    "IntervalInSeconds": 60  // Decrease for lower latency
  }
}

Trade-offs:

  • Larger buffer: Fewer S3 writes, lower cost, higher latency
  • Smaller buffer: More S3 writes, higher cost, lower latency
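To put rough numbers on the trade-off, here is a sketch of daily S3 object counts and PUT-request costs under different buffer settings (the $0.005 per 1,000 PUTs rate is an assumption; check current S3 pricing):

```python
def daily_s3_puts(daily_gb: float, size_mb: int, interval_s: int):
    """Estimate S3 writes per day: the buffer flushes on size or time,
    whichever comes first. Returns (writes_per_day, put_cost_usd)."""
    throughput_mb_s = daily_gb * 1024 / 86400
    flush_every_s = min(size_mb / throughput_mb_s, interval_s)
    writes = 86400 / flush_every_s
    cost = writes * 0.005 / 1000  # assumed S3 PUT pricing
    return round(writes), round(cost, 4)

# ~3.3 GB/day (100 GB/month): the 300-second timer always wins
print(daily_s3_puts(3.3, size_mb=128, interval_s=300))
# Lowering the interval to 60 s for latency means 5x more objects
print(daily_s3_puts(3.3, size_mb=128, interval_s=60))
```

PUT costs stay negligible at this scale; the real penalty of a short interval is the flood of small files, which is what the compaction section later addresses.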

2. Enable Data Transformation (Optional)

{
  "ProcessingConfiguration": {
    "Enabled": true,
    "Processors": [
      {
        "Type": "Lambda",
        "Parameters": [
          {
            "ParameterName": "LambdaArn",
            "ParameterValue": "arn:aws:lambda:us-east-1:123456789012:function:transform-cdc"
          }
        ]
      }
    ]
  }
}

3. Monitor Key Metrics

# Records delivered
aws cloudwatch get-metric-statistics \
  --namespace AWS/Firehose \
  --metric-name DeliveryToS3.Records \
  --dimensions Name=DeliveryStreamName,Value=kafka-cdc-customers-to-iceberg \
  --start-time $(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%S) \
  --end-time $(date -u +%Y-%m-%dT%H:%M:%S) \
  --period 300 \
  --statistics Sum \
  --region us-east-1

# Data freshness (age of oldest record)
aws cloudwatch get-metric-statistics \
  --namespace AWS/Firehose \
  --metric-name DeliveryToS3.DataFreshness \
  --dimensions Name=DeliveryStreamName,Value=kafka-cdc-customers-to-iceberg \
  --start-time $(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%S) \
  --end-time $(date -u +%Y-%m-%dT%H:%M:%S) \
  --period 300 \
  --statistics Average \
  --region us-east-1

Cost Analysis

Estimated Costs (100 GB/month)

Amazon Data Firehose:

  • Data ingestion: 100 GB × $0.029/GB = $2.90/month
  • Format conversion: 100 GB × $0.018/GB = $1.80/month
  • Total Firehose: $4.70/month

S3 Storage:

  • 100 GB × $0.023/GB = $2.30/month

Glue Catalog:

  • First 1M requests free
  • Minimal cost

Total: ~$7/month
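The arithmetic above generalizes to other volumes; a small estimator using the per-GB rates quoted in this section (verify against current AWS pricing before relying on it):

```python
def monthly_cost_usd(gb_ingested: float) -> dict:
    """Estimate monthly cost from the per-GB rates used above."""
    ingestion = gb_ingested * 0.029   # Firehose data ingestion
    conversion = gb_ingested * 0.018  # format conversion to Parquet
    storage = gb_ingested * 0.023     # S3 Standard storage
    return {"firehose": round(ingestion + conversion, 2),
            "s3": round(storage, 2),
            "total": round(ingestion + conversion + storage, 2)}

print(monthly_cost_usd(100))
# {'firehose': 4.7, 's3': 2.3, 'total': 7.0}
```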

Cost Comparison

| Solution | Monthly Cost | Setup Complexity | Operational Overhead |
|---|---|---|---|
| Firehose | ~$7 | Low | Very Low |
| MSK Connect | ~$316 | High | Medium |

Savings: ~$309/month (98% reduction) for similar throughput

Limitations

  1. No Auto-Create: Glue table must exist before streaming
  2. Limited Schema Evolution: Manual Glue table updates required
  3. Static Partitioning: Cannot dynamically change partitions
  4. No Custom Transformations: Limited to Lambda functions
  5. Buffering Constraints: Minimum 60 seconds latency
  6. Append-Only Mode: No native upsert support - all CDC events appended as rows
  7. Query Complexity: Need deduplication logic to get current state

Handling CDC Events with Firehose

Unlike MSK Connect with Iceberg connector's upsert mode, Firehose writes all CDC events as append-only rows. This means:

Challenge: Multiple rows per entity

-- After: INSERT customer_id=1001
--       UPDATE customer_id=1001 (change email)
--       UPDATE customer_id=1001 (change phone)

SELECT * FROM customers WHERE customer_id = 1001;
-- Returns 3 rows (1 INSERT + 2 UPDATEs)

Solution 1: Query-time Deduplication

-- Get latest state per customer
-- (DISTINCT ON is PostgreSQL-only; use a window function in Athena/Trino)
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY ts_ms DESC) AS rn
  FROM customers
)
WHERE rn = 1;

Solution 2: Materialized Views

-- Create view with latest state
CREATE OR REPLACE VIEW customers_current AS
SELECT customer_id, name, email, phone, address, created_at
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY ts_ms DESC) as rn
  FROM customers
) WHERE rn = 1 AND op != 'd';

Solution 3: Periodic Compaction and Materialization

-- Athena: compact the many small files Firehose writes
-- (merges files; does NOT deduplicate rows)
OPTIMIZE cdc_iceberg.customers REWRITE DATA USING BIN_PACK;

-- Athena: periodically merge the latest events into a separate
-- current-state Iceberg table (customers_current, created beforehand)
MERGE INTO cdc_iceberg.customers_current AS t
USING (
  SELECT * FROM (
    SELECT *,
      ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY ts_ms DESC) AS rn
    FROM cdc_iceberg.customers
  ) WHERE rn = 1
) AS s
ON t.customer_id = s.customer_id
WHEN MATCHED AND s.op = 'd' THEN DELETE
WHEN MATCHED THEN UPDATE SET name = s.name, email = s.email,
  phone = s.phone, address = s.address
WHEN NOT MATCHED AND s.op != 'd' THEN
  INSERT (customer_id, name, email, phone, address, created_at)
  VALUES (s.customer_id, s.name, s.email, s.phone, s.address, s.created_at);

Recommendation: For tables with frequent updates, consider MSK Connect with upsert mode instead of Firehose.

Best Practices

  1. Pre-create Glue Tables: Define schema before starting stream
  2. Monitor Data Freshness: Set CloudWatch alarms for delivery lag
  3. Optimize Buffer Size: Balance latency vs cost
  4. Enable Error Logging: Always configure CloudWatch Logs
  5. Test Schema Changes: Validate in non-production first
  6. Use S3 Lifecycle: Archive old data to Glacier
  7. Set Up Alerts: Monitor delivery failures

Conclusion

Amazon Data Firehose provides a cost-effective, low-maintenance alternative to MSK Connect for streaming Kafka CDC data to Iceberg. While it has some limitations compared to MSK Connect, it's ideal for:

✅ Simple CDC pipelines
✅ Cost-sensitive workloads
✅ Low operational overhead requirements
✅ Predictable schema
✅ Acceptable 1-5 minute latency

For the customers table with stable schema and moderate throughput, Firehose delivers 98% cost savings compared to MSK Connect while maintaining data quality and query performance.

Next Steps

  • Compare MSK Connect vs Firehose performance (Blog Post #7)
  • Implement monitoring and alerting
  • Set up data quality checks
  • Configure retention policies
  • Optimize query performance
