merge/upsert操作

线上环境对数据做更新是很常见的场景,比如业务每天都会产生新数据到S3,或者对原来的数据做更新。要把数据更新到Redshift,这就需要设计数据增量更新到Redshift的方案

merge/upsert

Merge 语句经常用在增量数据导入和数据更新的场景,我们可以同时更新(update)已经存在的记录和插入(insert)新的记录,需要注意的是有时候update insert 也被称为 UPSERT

一张表在一条语句里面既可以被更新,也可以被插入。是否被更新还是插入取决于search condition的结果和指定的merge when matched clause(当condition匹配时做什么操作)和merge when not matched clause(当condition不匹配时做什么操作)语法。

MERGE INTO 命令涉及到两张表,目标表是被插入或者更新的表,源表用于跟目标表进行匹配的表,即目标表的数据来源。MERGE INTO语句将目标表和源表中数据针对关联条件进行匹配,若关联条件匹配时对目标表进行UPDATE,无法匹配时对目标表执行INSERT。

使用场景:当业务中需要将一个表中大量数据添加到现有表时,使用MERGE INTO 可以高效地将数据导入,避免多次INSERT + UPDATE操作。同步到数仓时,大多采用merge数据,新增的就insert进去,如果是更新的,就直接对原有数据进行更新就好

但是Redshift目前不支持merge/upsert语法,只能通过stage表实现类似的逻辑

Redshift实现merge/upsert示例

我们将基于users表来演示如何进行merge操作,首先要把数据上传到S3

导入数据

kongpingfan:~/environment/redshift $ mkdir ticketdb && cd ticketdb
kongpingfan:~/environment/redshift/ticketdb $ wget https://docs.aws.amazon.com/redshift/latest/gsg/samples/tickitdb.zip

kongpingfan:~/environment/redshift/ticketdb $ unzip tickitdb.zip
kongpingfan:~/environment/redshift/ticketdb $ aws s3 sync . s3://redshift-kpf/tickitdb/

image-20221107144413557

创建schema及users表,然后导入数据:

create schema tickitdb;
set search_path tickitdb,public;

create table users(
	userid integer not null distkey sortkey,
	username char(8),
	firstname varchar(30),
	lastname varchar(30),
	city varchar(30),
	state char(2),
	email varchar(100),
	phone char(14),
	likesports boolean,
	liketheatre boolean,
	likeconcerts boolean,
	likejazz boolean,
	likeclassical boolean,
	likeopera boolean,
	likerock boolean,
	likevegas boolean,
	likebroadway boolean,
	likemusicals boolean);
 
copy users from 's3://<myBucket>/tickit/allusers_pipe.txt' 
iam_role 'xxxx' 
delimiter '|' region '<aws-region>';

select count(*) from users;

image-20221107144656150

实现merge操作

执行以下语句实现merge操作:

-- 创建一张临时表,注意这张临时表在结束后会自动被删除,即使没有明显使用drop语句。这张表的结构和users一样
create temp table stage (like users);

-- 往临时表里写一条users表里数据,模拟更新数据的场景
insert into stage
select * from users where userid = 1;
-- 将firstname更新
update stage set firstname = firstname + ' - Updated' where userid = 1;

-- 在进行 deleting / inserting前,先启动事务
begin transaction;

-- 删除users里重复的数据,用更新的数据来替换它
delete from users
using stage
where users.userid = stage.userid;

insert into users
select * from stage;

end transaction;

drop table stage;

确认原表的数据已被更新:

select * from users where userid=1;

image-20221109063111982