Sunday, July 17, 2022

Use the MERGE feature in PostgreSQL 15 to merge a CDC staging table into a target table

 Setup

-- Staging table for captured changes: one row per change, keyed by (id, last_updated).
-- operation is a one-letter change code; the sample rows use 'I', 'U' and 'D'.
drop table if exists account_stage;

create table account_stage (
    id int,
    name varchar(10),
    last_updated timestamptz,
    operation char(1),
    constraint account_stage_pk primary key (id,last_updated)
);

-- Explicit column list so the load does not depend on the table's column order.
insert into account_stage (id, name, last_updated, operation)
values
    (1, 'name1a', '2022-01-01', 'U'),
    (1, 'name1a', '2022-01-02', 'D'),
    (2, 'name2a', '2022-01-01', 'U'),
    (2, 'name2b', '2022-01-02', 'U'),
    (5, 'name5',  '2022-01-02', 'I');

-- Target table: one row per id, holding the state the staged changes are merged into.
drop table if exists account;

create table account (
    id int,
    name varchar(10),
    last_updated timestamptz,
    constraint account_pk primary key (id)
);

-- Explicit column list so the load does not depend on the table's column order.
insert into account (id, name, last_updated)
values
    (1, 'name1', '2021-12-31'),
    (2, 'name2', '2021-12-31'),
    (3, 'name3', '2021-12-31'),
    (4, 'name4', '2021-12-31');
test=# select * from account_stage;
 id |  name  |      last_updated      | operation
----+--------+------------------------+-----------
  1 | name1a | 2022-01-01 00:00:00+00 | U
  1 | name1a | 2022-01-02 00:00:00+00 | D
  2 | name2a | 2022-01-01 00:00:00+00 | U
  2 | name2b | 2022-01-02 00:00:00+00 | U
  5 | name5  | 2022-01-02 00:00:00+00 | I
(5 rows)

test=# select * from account;
 id | name  |      last_updated
----+-------+------------------------
  1 | name1 | 2021-12-31 00:00:00+00
  2 | name2 | 2021-12-31 00:00:00+00
  3 | name3 | 2021-12-31 00:00:00+00
  4 | name4 | 2021-12-31 00:00:00+00
(4 rows)

Get the net changes from the staging table (the most recent change for each id)

-- Net change per id: keep only the most recent staging row for each id.
-- distinct on (id) keeps the first row of each id group, and the order by
-- places the newest last_updated first within that group.
select distinct on (id)
       id,
       name,
       last_updated,
       operation
from account_stage
order by id, last_updated desc;
test=# select distinct id,
test-#        first_value(name) over w as name,
test-#        first_value(last_updated) over w as last_updated,
test-#        first_value(operation) over w as operation
test-# from account_stage
test-#     window w as (partition by id order by last_updated desc)
test-# order by id;
 id |  name  |      last_updated      | operation
----+--------+------------------------+-----------
  1 | name1a | 2022-01-02 00:00:00+00 | D
  2 | name2b | 2022-01-02 00:00:00+00 | U
  5 | name5  | 2022-01-02 00:00:00+00 | I
(3 rows)

Use the MERGE feature to apply the net changes from the staging table to the target table

-- Apply the net change per id from account_stage to account.
-- The source keeps only the most recent staging row for each id.
merge into account
using
(
    select distinct on (id)
           id,
           name,
           last_updated,
           operation
    from account_stage
    order by id, last_updated desc
) cdc
on account.id = cdc.id
-- Latest change is a delete and the row exists: remove it.
-- A 'D' for an id that is not in account matches no clause and is skipped.
when matched and cdc.operation = 'D' then
    delete
-- Row exists and the latest change carries data. 'I' is handled like 'U'
-- so a delete followed by a re-insert in the same batch (net 'I') still
-- refreshes the existing row instead of leaving the old values in place.
when matched and cdc.operation in ('I', 'U') then
    update set name = cdc.name,
               last_updated = cdc.last_updated
-- Row is missing and the latest change carries data. 'U' is handled like 'I'
-- so an insert followed by an update in the same batch (net 'U') is not lost.
when not matched and cdc.operation in ('I', 'U') then
    insert (id, name, last_updated)
    values (cdc.id, cdc.name, cdc.last_updated)
;
test=# merge into account
test-# using
test-# (
test(#   select distinct id,
test(#        first_value(name) over w as name,
test(#        first_value(last_updated) over w as last_updated,
test(#        first_value(operation) over w as operation
test(# from account_stage
test(#     window w as (partition by id order by last_updated desc)
test(# order by id
test(# ) cdc
test-# on account.id=cdc.id
test-# when not matched and cdc.operation='I' then
test-#   insert values(cdc.id,cdc.name,cdc.last_updated)
test-# when matched and cdc.operation='D' then
test-#   delete
test-# when matched and cdc.operation='U' then
test-#   update set name=cdc.name,
test-#              last_updated=cdc.last_updated
test-# ;
MERGE 3
test=# select * from account order by id;
 id |  name  |      last_updated
----+--------+------------------------
  2 | name2b | 2022-01-02 00:00:00+00
  3 | name3  | 2021-12-31 00:00:00+00
  4 | name4  | 2021-12-31 00:00:00+00
  5 | name5  | 2022-01-02 00:00:00+00
(4 rows)

Reference:

No comments:

Post a Comment