并行运行 PL/SQL 调用并等待执行完成(fork 和 join)

Posted

技术标签:

【中文标题】并行运行 PL/SQL 调用并等待执行完成(fork 和 join)【英文标题】:Running PL/SQL calls in parallel and wait for execution to finish (fork and join) 【发布时间】:2014-10-23 15:50:32 【问题描述】:

在遗留系统中,有一些 PL/SQL 过程使用不同的参数多次调用另一个过程。该过程包含大量 PL/SQL 逻辑(if、then、else)。

由于这个过程的执行时间很长,我们考虑在不触及实际逻辑的情况下使用并发来加快速度。

我了解在 oracle 上有多种并行运行 (PL/)SQL 的方法(见下文)。

但是,我无法找到一种方法将不同的参数/参数传递给 PL/SQL 过程,并行执行它们并等待所有过程完成执行(即我'我正在寻找join所有线程的机制或在oracle中寻找barrier机制。

让我们在 SCOTT Schema 上使用以下简化示例:

DECLARE

PROCEDURE DELETE_BONUS(
    in_job IN VARCHAR2)
IS
BEGIN
  -- Imagine a lot of IF, ELSEIF, ELSE statements here
  DELETE FROM BONUS WHERE JOB=in_job;
END;

BEGIN 
 INSERT into BONUS(ENAME, JOB) SELECT ROWNUM, 'A' FROM DUAL CONNECT BY LEVEL <= 1000000;
 INSERT into BONUS(ENAME, JOB) SELECT ROWNUM, 'B' FROM DUAL CONNECT BY LEVEL <= 1000000;
 INSERT into BONUS(ENAME, JOB) SELECT ROWNUM, 'C' FROM DUAL CONNECT BY LEVEL <= 1000000;

 -- TODO execute those in parallel
 DELETE_BONUS('A');
 DELETE_BONUS('B');
 DELETE_BONUS('C');
 -- TODO wait for all procedures to finish

EXCEPTION
WHEN OTHERS THEN
  RAISE;
END;
/

这是我目前发现的:

    DBMS_JOB(已弃用) DBMS_SCHEDULER(如何等待作业完成?LOCKS?) DBMS_SCHEDULER CHAINS (passing parameters/arguments is not really possible?!) DBMS_PARALLEL_EXECUTE(可用于并行运行 SQL 查询,但不能用于 PL/SQL 过程)

可以使用其中一种方法来分叉和加入过程调用吗?或者还有其他方法可以吗?

【问题讨论】:

为什么调度器不能用于创建作业、调度它们并创建依赖?有什么具体原因吗? 可以使用调度器。但是我该如何等待作业完成呢? 等等,让我正确理解你。如果你真的想走程序化的方式,让下一个操作只有在当前操作结束后才开始,那么只需将作业串行作为父作业,并安排父作业。 你见过这个吗? ***.com/questions/15296289/… 仅供参考,您可以使用 dbms_parallel_execute 并行运行自定义过程(我在我的系统上执行此操作,效果很好) 【参考方案1】:

我使用 DBMS_SCHEDULER 和 PIPE 进行同步/IPC 解决了这个问题,它不依赖轮询并且不需要额外的表。不过,它仍然会在每个完成的工作中唤醒一次。

这是相当的努力,所以如果有人可以提出更简单的解决方案,请分享!

    定义一个调用可以运行的实际过程的过程 来自程序/作业并处理 IPC(完成后将消息写入管道)。 定义一个调用此过程并定义要传递给该过程的参数的程序 定义从程序创建作业、将参数映射到作业参数并运行作业的过程 定义等待所有作业完成的逻辑:等待每个作业都在管道上发送消息。
--
-- Define stored procedures to be executed by job
--

/** Actual method that should be run in parallel*/
CREATE OR REPLACE PROCEDURE PROC_DELETE_TEST_BONUS(
    in_job IN VARCHAR2)
IS
BEGIN
   -- Imagine a lot of IF, ELSEIF, ELSE statements here
  DELETE FROM TEST_BONUS WHERE JOB=in_job;
END;
/

/** Stored procedure to be run from the job: Uses pipes for job synchronization, executes PROC_DELETE_TEST_BONUS. */
CREATE OR REPLACE PROCEDURE PROC_DELETE_TEST_BONUS_CONCUR(in_pipe_name IN VARCHAR2,
    in_job IN VARCHAR2)
IS
 flag INTEGER;
BEGIN
  -- Execute actual procedure
  PROC_DELETE_TEST_BONUS(in_job);

 -- Signal completion
 -- Use the procedure to put a message in the local buffer. 
  DBMS_PIPE.PACK_MESSAGE(SYSDATE ||': Success ' ||in_job);
  -- Send message, success is a zero return value.
  flag := DBMS_PIPE.SEND_MESSAGE(in_pipe_name);
EXCEPTION
WHEN OTHERS THEN
  -- Signal completion
  -- Use the procedure to put a message in the local buffer. 
  DBMS_PIPE.PACK_MESSAGE(SYSDATE ||':Failed ' || in_job);
  -- Send message, success is a zero return value.
  flag := DBMS_PIPE.SEND_MESSAGE(in_pipe_name);
  RAISE;
END;
/


--
-- Run Jobs
--
DECLARE
  timestart NUMBER;
  duration_insert NUMBER;
  jobs_amount NUMBER := 0;
  retval INTEGER;
  message     VARCHAR2(4000);
  rows_amount NUMBER;

/** Create and define a program that calls PROG_DELETE_TEST_BONUS_CONCUR to be run as job. */
PROCEDURE create_prog_delete_test_bonus
IS  
BEGIN
  -- define new in each run in order to ease development. TODO Once it works, no need to redefine for each run!
  dbms_scheduler.drop_program(program_name => 'PROG_DELETE_TEST_BONUS_CONCUR', force=> TRUE);

  dbms_scheduler.create_program ( program_name => 'PROG_DELETE_TEST_BONUS_CONCUR', program_action =>
  'PROC_DELETE_TEST_BONUS_CONCUR', program_type => 'STORED_PROCEDURE', number_of_arguments => 2,
  enabled => FALSE );

  dbms_scheduler.DEFINE_PROGRAM_ARGUMENT( program_name => 'PROG_DELETE_TEST_BONUS_CONCUR',
  argument_position => 1, argument_name => 'in_pipe_name', argument_type => 'VARCHAR2');
  dbms_scheduler.DEFINE_PROGRAM_ARGUMENT( program_name=>'PROG_DELETE_TEST_BONUS_CONCUR',
  argument_position => 2, argument_name => 'in_job', argument_type => 'VARCHAR2');

  dbms_scheduler.enable('PROG_DELETE_TEST_BONUS_CONCUR');
END;

/** "Forks" a job that runs PROG_DELETE_TEST_BONUS_CONCUR */
PROCEDURE RUN_TEST_BONUS_JOB(
    in_pipe_name IN VARCHAR2,
    in_job    IN VARCHAR2,
    io_job_amount IN OUT NUMBER)
IS
  jobname VARCHAR2(100);
BEGIN
  jobname:=DBMS_SCHEDULER.GENERATE_JOB_NAME;

  dbms_scheduler.create_job(job_name => jobname, program_name =>
  'PROG_DELETE_TEST_BONUS_CONCUR');
  dbms_scheduler.set_job_argument_value(job_name => jobname, argument_name => 
  'in_pipe_name' , argument_value => in_pipe_name);
  dbms_scheduler.set_job_argument_value(job_name => jobname, argument_name => 
  'in_job' , argument_value => in_job);
  dbms_output.put_line(SYSDATE || ': Running job: '|| jobname);

  dbms_scheduler.RUN_JOB(jobname, false );
  io_job_amount:= io_job_amount+1;
END;


-- Anonymous "Main" block
BEGIN 
  create_prog_delete_test_bonus;

  -- Define private pipe
  retval := DBMS_PIPE.CREATE_PIPE(DBMS_PIPE.UNIQUE_SESSION_NAME, 100, FALSE);  
  dbms_output.put_line(SYSDATE || ': Created pipe: ''' || DBMS_PIPE.UNIQUE_SESSION_NAME || ''' returned ' ||retval);

  timestart := dbms_utility.get_time();
  INSERT into TEST_BONUS(ENAME, JOB) SELECT ROWNUM, 'A' FROM DUAL CONNECT BY LEVEL <= 1000000;
  INSERT into TEST_BONUS(ENAME, JOB) SELECT ROWNUM, 'B' FROM DUAL CONNECT BY LEVEL <= 1000000;
  INSERT into TEST_BONUS(ENAME, JOB) SELECT ROWNUM, 'C' FROM DUAL CONNECT BY LEVEL <= 1000000;
  COMMIT;
  duration_insert := dbms_utility.get_time() -  timestart;
  dbms_output.put_line(SYSDATE || ': Duration (1/100s): INSERT=' || duration_insert);

  SELECT COUNT(*) INTO rows_amount FROM TEST_BONUS;
  dbms_output.put_line(SYSDATE || ': COUNT(*) FROM TEST_BONUS: ' || rows_amount);

  timestart := dbms_utility.get_time();

--  -- Process sequentially
-- PROC_DELETE_TEST_BONUS('A');
-- PROC_DELETE_TEST_BONUS('B');
-- PROC_DELETE_TEST_BONUS('C');

  -- start concurrent processing 
  RUN_TEST_BONUS_JOB(DBMS_PIPE.UNIQUE_SESSION_NAME, 'A', jobs_amount);
  RUN_TEST_BONUS_JOB(DBMS_PIPE.UNIQUE_SESSION_NAME, 'B', jobs_amount);
  RUN_TEST_BONUS_JOB(DBMS_PIPE.UNIQUE_SESSION_NAME, 'C', jobs_amount);

  -- "Barrier": Wait for all jobs to finish
  for i in 1 .. jobs_amount loop
    -- Reset the local buffer.
    DBMS_PIPE.RESET_BUFFER;

    -- Wait and receive message. Timeout after an hour.
    retval := SYS.DBMS_PIPE.RECEIVE_MESSAGE(SYS.DBMS_PIPE.UNIQUE_SESSION_NAME, 3600);
    -- Handle errors: timeout, etc.
    IF retval != 0 THEN
      raise_application_error(-20000, 'Error: '||to_char(retval)||' receiving on pipe. See Job Log in table user_scheduler_job_run_details');
    END IF;
    -- Read message from local buffer.
    DBMS_PIPE.UNPACK_MESSAGE(message);
    dbms_output.put_line(SYSDATE || ': Received message on '''|| DBMS_PIPE.UNIQUE_SESSION_NAME ||''' (Status='|| retval ||'): ' || message); 
  end loop;

  dbms_output.put(SYSDATE || ': Duration (1/100s): DELETE=');
  dbms_output.put_line(dbms_utility.get_time() -  timestart);

  SELECT COUNT(*) INTO rows_amount FROM TEST_BONUS;
  dbms_output.put_line(SYSDATE || ': COUNT(*) FROM TEST_BONUS: ' || rows_amount);

  retval :=DBMS_PIPE.REMOVE_PIPE(DBMS_PIPE.UNIQUE_SESSION_NAME);
  dbms_output.put_line(systimestamp || ': REMOVE_PIPE: ''' || DBMS_PIPE.UNIQUE_SESSION_NAME || ''' returned: ' ||retval);
EXCEPTION
WHEN OTHERS THEN
  dbms_output.put_line(SYSDATE ||  SUBSTR(SQLERRM, 1, 1000) || ' ' ||
                                          SUBSTR(DBMS_UTILITY.FORMAT_ERROR_BACKTRACE, 1, 1000));
  retval := DBMS_PIPE.REMOVE_PIPE(DBMS_PIPE.UNIQUE_SESSION_NAME);
  dbms_output.put_line(SYSDATE || ': REMOVE_PIPE: ''' || DBMS_PIPE.UNIQUE_SESSION_NAME || ''' returned: ' ||retval);

  -- Clean up in case of error
  PROC_DELETE_TEST_BONUS('A');
  PROC_DELETE_TEST_BONUS('B');
  PROC_DELETE_TEST_BONUS('C');
  RAISE;
END;
/

您应该始终牢记,在作业中执行的更改是在单独的事务中提交的。

只是为了了解这种并发性实现了什么,这里有一些平均测量值:问题中的顺序代码大约需要 60 秒才能完成,并行代码大约需要 40 秒。 当并行运行的作业超过三个时,这将是一个有趣的进一步调查。

PS 以下是一个有用的查询,可帮助您了解作业的状态

SELECT job_name,
     destination,
     TO_CHAR(actual_start_date) AS actual_start_date,
     run_duration,
     TO_CHAR((ACTUAL_START_DATE+run_duration)) AS actual_end_date,
     status,
     error#,
     ADDITIONAL_INFO
FROM   user_scheduler_job_run_details
ORDER BY actual_start_date desc;

【讨论】:

【参考方案2】:

只是想添加一些关于 Oracle 的 DBMS_PARALLEL_EXECUTE 包的注释。

这不仅可以用于更新表,但许多示例都显示了这个简单的用例。

诀窍是使用匿名块而不是 DML 语句,其余示例仍然相关。所以,而不是这个:

l_sql_stmt := 'update EMPLOYEES e 
      SET e.salary = e.salary + 10
      WHERE manager_id between :start_id and :end_id';

我们可能有这个:

l_sql_stmt := 'BEGIN my_package.some_procedure(:start_id, :end_id); END;';

示例的其余部分可以在“Chunk by User-Provided SQL”中找到example section

您仍然需要告诉 Oracle 每个进程的开始/结束 ID(使用 CREATE_CHUNKS_BY_SQL),我通常将它们存储在单独的查找表中(如果预定义),或者您可以提供返回一组的 SQL 查询开始/结束值。对于后一种方法,请尝试使用 NTILE。例如,使用 8 个块:

select min(id) as start_id, max(id) as end_id
from (
  select id, ntile(8) over (order by 1) bucket
  from some_table
  where some_clause...
)
group by bucket
order by bucket;

希望有帮助

【讨论】:

【参考方案3】:

如果是,你可以试试这个:

创建一个包含任务(和任务参数)的新表 创建将从表中读取参数的过程,将它们传递给“旧过程”,然后在处理后更新任务表(使用自主事务)以显示处理已结束 创建任务的外部过程可以扫描任务表以获取有关进度的信息。您可以使用 DBMS_LOCK.SLEEP 等待。

【讨论】:

这听起来可行。但是,我正在寻找一种省力(无需额外表格)和正确信号(无需任意睡眠和轮询)的解决方案。我在回答中描述了一种可能但复杂的解决方案:***.com/a/26610201/1845976

以上是关于并行运行 PL/SQL 调用并等待执行完成(fork 和 join)的主要内容,如果未能解决你的问题,请参考以下文章

在 PL/SQL 中并行化调用

有没有办法保证 PL/SQL 工作的 DOP?

使用不同的参数并行运行相同的函数,并知道哪个并行运行在 python 中结束了

PL/SQL 块需要大量时间来执行

Oracle PL/SQL 并行执行 [重复]

如何将python for循环从顺序转换为并行运行