Redshift - 并发写入 - 如果动态创建查询,则插入不起作用

Posted

技术标签:

【中文标题】Redshift - 并发写入 - 如果动态创建查询,则插入不起作用【英文标题】:Redshift - Concurrent Write - Insert does not work if query is dynamicly created 【发布时间】:2020-03-03 15:51:18 【问题描述】:

我有一个存储过程:

CREATE OR REPLACE PROCEDURE public.lock_users(j_id "varchar",order_id "varchar",order_detail_id "varchar",insert_qry varchar(65535),rec_per_order "varchar")
    LANGUAGE plpgsql
AS $$       
declare 
lc_stmt varchar(65535);
BEGIN 

    lc_stmt = 'INSERT INTO test.USER_LOCK 
            SELECT '''||$1||''', '''||$2||''', '''||$3||''', user_id,cast(TIMEOFDAY() as timestamp) FROM ('||insert_qry|| 'AND USER_ID NOT IN (SELECT USER_ID FROM test.USER_LOCK)) WHERE ORDER_CNT <='||rec_per_order||'))';

    EXECUTE ''||lc_stmt||'';
END
  $$
;

从上述过程生成的一个示例查询是:

INSERT INTO test.USER_LOCK
SELECT '657d7563-6de4-4dc9-ac74-3c23adf7a4e9', 'DSS-12345', 'DSS-74523-4-7569',
USER_ID,cast(TIMEOFDAY() as timestamp) 
FROM (
SELECT USER_ID FROM (
SELECT * FROM (
SELECT XA.USER_ID, XA.EMAIL_ID,YA.COMPANY_NAME
rank() OVER (PARTITION BY XA.account_id ORDER BY XA.account_id) ORDER_CNT 
FROM test.contacts_20 XA 
LEFT JOIN test.accounts_20 YA
ON XA.ACCOUNT_ID = YA.ACCOUNT_ID  
AND XA.COUNTRY = YA.COUNTRY 
WHERE XA.IS_CONTACT_SUPPRESSED = 0  
AND UPPER(XA.TELE_SUPPRESSION_LOB) != UPPER('DSS') 
AND XA.TELE_SUPPRESSION_LOB != 'BOTH' 
AND XA.IS_TELE_VERIFIED = 1 
AND XA.IS_TELE_SUPPRESSED = 0 
AND UPPER(PHONE_LINE) = 'DIRECT' 
AND XA.COUNTRY IN (
SELECT INCLUSION_VALUE FROM user_inc_list
WHERE JOB_ID = '657d7563-6de4-4dc9-ac74-3c23adf7a4e9' 
AND UPPER(INCLUSION_TYPE) = 'COUNTRY') 
AND XA.COUNTRY NOT IN (
SELECT EXCLUSION_VALUE FROM user_exc_list 
WHERE JOB_ID = '657d7563-6de4-4dc9-ac74-3c23adf7a4e9' 
AND UPPER(EXCLUSION_TYPE) = 'COUNTRY') 
AND XA.USER_ID NOT IN (
SELECT USER_ID FROM test.user_lead_20
WHERE (CURRENT_DATE - creation_date::date) <= 60 AND UPPER(LOB) != 'DSS' AND AGENCY_ID != '1456') 
AND XA.USER_ID NOT IN (
SELECT USER_ID FROM test.user_lead_20 
WHERE (CURRENT_DATE - creation_date::date) <= 60 AND UPPER(LOB) != 'DSS' AND SPONSOR_ID != '8659') 
AND USER_ID NOT IN (
select USER_ID 
from user_e_history 
where sf_campaign_id = 'DSS-12345' AND (CURRENT_DATE - creation_date::date) >= 7 AND channel = 'TELE') 
AND USER_ID NOT IN (
select USER_ID from user_e_history 
where creation_date::date = CURRENT_DATE AND channel = 'TELE' ) 
AND USER_ID NOT IN (
select USER_ID from test.user_lead_20
where sf_campaign_id = 'DSS-12345' GROUP BY USER_ID,"DOMAIN" HAVING COUNT(*) >= 3 ) 
AND USER_ID NOT IN (
select USER_ID from test.user_lead_20 
where AGENCY_ID = 1456 and (CURRENT_DATE - creation_date::date) <= 180 ) 
AND XA.E_domain NOT LIKE '%.gov'
AND USER_ID NOT IN (
SELECT USER_ID FROM test.USER_LOCK)) WHERE ORDER_CNT <=20));

当我并行执行这个存储过程时,它给了我这个错误:

SQL Error [500310] [XX000]: [Amazon](500310) Invalid operation: 1023
Details: 
 Serializable isolation violation on table - 132075, transactions forming the cycle are: 2040186, 2040187 (pid:14687);

当我更改存储过程而不是传递参数并在查询中创建固定插入时,它可以工作。

这是有效的存储过程:

CREATE OR REPLACE PROCEDURE public.new_procedure(type_value "varchar")
    LANGUAGE plpgsql
AS $$   
declare 
lc_stmt varchar;
BEGIN 
    lc_stmt = 'INSERT into temp_table 
    select ct_id,email_id,first_name,last_name from users where active_type = '''||$1||''' ';

    EXECUTE ''||lc_stmt||'';
END
 $$
;

我无法理解此问题的原因和解决方案。请帮忙。

【问题讨论】:

嘿,你能分享一下参数的值和你可以并行执行的过程的非动态版本吗?另外,您如何并行执行它们?调度? 我添加了生成的 SQL 查询。我将很快分享非动态版本。对于执行,如果 2 个用户同时并行执行同一个存储过程,则会引发错误。 “工作”版本中的 select 语句看起来与动态版本完全不同,比如没有 user_id 等。我说的对吗? 是的,我创建它是为了测试它。 好的,知道了。添加了解释,看看是否有帮助。 【参考方案1】:

动态过程中的插入查询会查找不在 user_lock 表中的 user_id 并插入结果。两个动态版本的结果集中似乎有共同的user_id。

所以假设当你的第一个版本被执行时,它可能会在 user_lock 表中添加一个 user_id,同样的 user_id 也可能在结果集中被第二个版本插入到 user_lock 表中。

因此,取决于哪个版本首先运行,这两个版本的结果集与串行执行的情况相比会有所不同,即它们不是“可序列化隔离的”。

而且您在测试示例中没有收到错误,因为它是一种独立的插入(用户和 temp_table)。

尝试LOCK 可能会解决此问题。

【讨论】:

是否可以使用 BEGIN...END 让其他存储过程等待? 我认为应该。试试吧,让我们知道 就是这样,它不起作用。你能给我正确的代码片段来试试吗? 这样的?开始;锁定public.test;从 public.test 中选择 *;结束;

以上是关于Redshift - 并发写入 - 如果动态创建查询,则插入不起作用的主要内容,如果未能解决你的问题,请参考以下文章

amazon redshift 并发写入导致插入记录,导致重复

redshift data.frame 没有被写入

将数据从 R 写入 Redshift 问题

psycopg2、Redshift 和 unittest 的并发问题

通过数据管道将错误消息存储到 Redshift

如何在写入 Redshift DW 之前从 S3 存储桶转换数据?