并行运行 PL/SQL 调用并等待执行完成(fork 和 join)
Posted
技术标签:
【中文标题】并行运行 PL/SQL 调用并等待执行完成(fork 和 join)【英文标题】:Running PL/SQL calls in parallel and wait for execution to finish (fork and join) 【发布时间】:2014-10-23 15:50:32 【问题描述】:在遗留系统中,有一些 PL/SQL 过程使用不同的参数多次调用另一个过程。该过程包含大量 PL/SQL 逻辑(if、then、else)。
由于这个过程的执行时间很长,我们考虑在不触及实际逻辑的情况下使用并发来加快速度。
我了解在 oracle 上有多种并行运行 (PL/)SQL 的方法(见下文)。
但是,我无法找到一种方法将不同的参数/参数传递给 PL/SQL 过程,并行执行它们并等待所有过程完成执行(即我'我正在寻找join所有线程的机制或在oracle中寻找barrier机制。
让我们在 SCOTT Schema 上使用以下简化示例:
DECLARE
PROCEDURE DELETE_BONUS(
in_job IN VARCHAR2)
IS
BEGIN
-- Imagine a lot of IF, ELSEIF, ELSE statements here
DELETE FROM BONUS WHERE JOB=in_job;
END;
BEGIN
INSERT into BONUS(ENAME, JOB) SELECT ROWNUM, 'A' FROM DUAL CONNECT BY LEVEL <= 1000000;
INSERT into BONUS(ENAME, JOB) SELECT ROWNUM, 'B' FROM DUAL CONNECT BY LEVEL <= 1000000;
INSERT into BONUS(ENAME, JOB) SELECT ROWNUM, 'C' FROM DUAL CONNECT BY LEVEL <= 1000000;
-- TODO execute those in parallel
DELETE_BONUS('A');
DELETE_BONUS('B');
DELETE_BONUS('C');
-- TODO wait for all procedures to finish
EXCEPTION
WHEN OTHERS THEN
RAISE;
END;
/
这是我目前发现的:
-
DBMS_JOB(已弃用)
DBMS_SCHEDULER(如何等待作业完成?LOCKS?)
DBMS_SCHEDULER CHAINS (passing parameters/arguments is not really possible?!)
DBMS_PARALLEL_EXECUTE(可用于并行运行 SQL 查询,但不能用于 PL/SQL 过程)
可以使用其中一种方法来分叉和加入过程调用吗?或者还有其他方法可以吗?
【问题讨论】:
为什么调度器不能用于创建作业、调度它们并创建依赖?有什么具体原因吗? 可以使用调度器。但是我该如何等待作业完成呢? 等等,让我正确理解你。如果你真的想走程序化的方式,让下一个操作只有在当前操作结束后才开始,那么只需将作业串行作为父作业,并安排父作业。 你见过这个吗? ***.com/questions/15296289/… 仅供参考,您可以使用 dbms_parallel_execute 并行运行自定义过程(我在我的系统上执行此操作,效果很好) 【参考方案1】:我使用 DBMS_SCHEDULER 和 PIPE 进行同步/IPC 解决了这个问题,它不依赖轮询并且不需要额外的表。不过,它仍然会在每个完成的工作中唤醒一次。
这是相当的努力,所以如果有人可以提出更简单的解决方案,请分享!
-
定义一个调用可以运行的实际过程的过程
来自程序/作业并处理 IPC(完成后将消息写入管道)。
定义一个调用此过程并定义要传递给该过程的参数的程序
定义从程序创建作业、将参数映射到作业参数并运行作业的过程
定义等待所有作业完成的逻辑:等待每个作业都在管道上发送消息。
--
-- Define stored procedures to be executed by job
--
/** Actual method that should be run in parallel*/
CREATE OR REPLACE PROCEDURE PROC_DELETE_TEST_BONUS(
in_job IN VARCHAR2)
IS
BEGIN
-- Imagine a lot of IF, ELSEIF, ELSE statements here
DELETE FROM TEST_BONUS WHERE JOB=in_job;
END;
/
/** Stored procedure to be run from the job: Uses pipes for job synchronization, executes PROC_DELETE_TEST_BONUS. */
CREATE OR REPLACE PROCEDURE PROC_DELETE_TEST_BONUS_CONCUR(in_pipe_name IN VARCHAR2,
in_job IN VARCHAR2)
IS
flag INTEGER;
BEGIN
-- Execute actual procedure
PROC_DELETE_TEST_BONUS(in_job);
-- Signal completion
-- Use the procedure to put a message in the local buffer.
DBMS_PIPE.PACK_MESSAGE(SYSDATE ||': Success ' ||in_job);
-- Send message, success is a zero return value.
flag := DBMS_PIPE.SEND_MESSAGE(in_pipe_name);
EXCEPTION
WHEN OTHERS THEN
-- Signal completion
-- Use the procedure to put a message in the local buffer.
DBMS_PIPE.PACK_MESSAGE(SYSDATE ||':Failed ' || in_job);
-- Send message, success is a zero return value.
flag := DBMS_PIPE.SEND_MESSAGE(in_pipe_name);
RAISE;
END;
/
--
-- Run Jobs
--
DECLARE
timestart NUMBER;
duration_insert NUMBER;
jobs_amount NUMBER := 0;
retval INTEGER;
message VARCHAR2(4000);
rows_amount NUMBER;
/** Create and define a program that calls PROG_DELETE_TEST_BONUS_CONCUR to be run as job. */
PROCEDURE create_prog_delete_test_bonus
IS
BEGIN
-- define new in each run in order to ease development. TODO Once it works, no need to redefine for each run!
dbms_scheduler.drop_program(program_name => 'PROG_DELETE_TEST_BONUS_CONCUR', force=> TRUE);
dbms_scheduler.create_program ( program_name => 'PROG_DELETE_TEST_BONUS_CONCUR', program_action =>
'PROC_DELETE_TEST_BONUS_CONCUR', program_type => 'STORED_PROCEDURE', number_of_arguments => 2,
enabled => FALSE );
dbms_scheduler.DEFINE_PROGRAM_ARGUMENT( program_name => 'PROG_DELETE_TEST_BONUS_CONCUR',
argument_position => 1, argument_name => 'in_pipe_name', argument_type => 'VARCHAR2');
dbms_scheduler.DEFINE_PROGRAM_ARGUMENT( program_name=>'PROG_DELETE_TEST_BONUS_CONCUR',
argument_position => 2, argument_name => 'in_job', argument_type => 'VARCHAR2');
dbms_scheduler.enable('PROG_DELETE_TEST_BONUS_CONCUR');
END;
/** "Forks" a job that runs PROG_DELETE_TEST_BONUS_CONCUR */
PROCEDURE RUN_TEST_BONUS_JOB(
in_pipe_name IN VARCHAR2,
in_job IN VARCHAR2,
io_job_amount IN OUT NUMBER)
IS
jobname VARCHAR2(100);
BEGIN
jobname:=DBMS_SCHEDULER.GENERATE_JOB_NAME;
dbms_scheduler.create_job(job_name => jobname, program_name =>
'PROG_DELETE_TEST_BONUS_CONCUR');
dbms_scheduler.set_job_argument_value(job_name => jobname, argument_name =>
'in_pipe_name' , argument_value => in_pipe_name);
dbms_scheduler.set_job_argument_value(job_name => jobname, argument_name =>
'in_job' , argument_value => in_job);
dbms_output.put_line(SYSDATE || ': Running job: '|| jobname);
dbms_scheduler.RUN_JOB(jobname, false );
io_job_amount:= io_job_amount+1;
END;
-- Anonymous "Main" block
BEGIN
create_prog_delete_test_bonus;
-- Define private pipe
retval := DBMS_PIPE.CREATE_PIPE(DBMS_PIPE.UNIQUE_SESSION_NAME, 100, FALSE);
dbms_output.put_line(SYSDATE || ': Created pipe: ''' || DBMS_PIPE.UNIQUE_SESSION_NAME || ''' returned ' ||retval);
timestart := dbms_utility.get_time();
INSERT into TEST_BONUS(ENAME, JOB) SELECT ROWNUM, 'A' FROM DUAL CONNECT BY LEVEL <= 1000000;
INSERT into TEST_BONUS(ENAME, JOB) SELECT ROWNUM, 'B' FROM DUAL CONNECT BY LEVEL <= 1000000;
INSERT into TEST_BONUS(ENAME, JOB) SELECT ROWNUM, 'C' FROM DUAL CONNECT BY LEVEL <= 1000000;
COMMIT;
duration_insert := dbms_utility.get_time() - timestart;
dbms_output.put_line(SYSDATE || ': Duration (1/100s): INSERT=' || duration_insert);
SELECT COUNT(*) INTO rows_amount FROM TEST_BONUS;
dbms_output.put_line(SYSDATE || ': COUNT(*) FROM TEST_BONUS: ' || rows_amount);
timestart := dbms_utility.get_time();
-- -- Process sequentially
-- PROC_DELETE_TEST_BONUS('A');
-- PROC_DELETE_TEST_BONUS('B');
-- PROC_DELETE_TEST_BONUS('C');
-- start concurrent processing
RUN_TEST_BONUS_JOB(DBMS_PIPE.UNIQUE_SESSION_NAME, 'A', jobs_amount);
RUN_TEST_BONUS_JOB(DBMS_PIPE.UNIQUE_SESSION_NAME, 'B', jobs_amount);
RUN_TEST_BONUS_JOB(DBMS_PIPE.UNIQUE_SESSION_NAME, 'C', jobs_amount);
-- "Barrier": Wait for all jobs to finish
for i in 1 .. jobs_amount loop
-- Reset the local buffer.
DBMS_PIPE.RESET_BUFFER;
-- Wait and receive message. Timeout after an hour.
retval := SYS.DBMS_PIPE.RECEIVE_MESSAGE(SYS.DBMS_PIPE.UNIQUE_SESSION_NAME, 3600);
-- Handle errors: timeout, etc.
IF retval != 0 THEN
raise_application_error(-20000, 'Error: '||to_char(retval)||' receiving on pipe. See Job Log in table user_scheduler_job_run_details');
END IF;
-- Read message from local buffer.
DBMS_PIPE.UNPACK_MESSAGE(message);
dbms_output.put_line(SYSDATE || ': Received message on '''|| DBMS_PIPE.UNIQUE_SESSION_NAME ||''' (Status='|| retval ||'): ' || message);
end loop;
dbms_output.put(SYSDATE || ': Duration (1/100s): DELETE=');
dbms_output.put_line(dbms_utility.get_time() - timestart);
SELECT COUNT(*) INTO rows_amount FROM TEST_BONUS;
dbms_output.put_line(SYSDATE || ': COUNT(*) FROM TEST_BONUS: ' || rows_amount);
retval :=DBMS_PIPE.REMOVE_PIPE(DBMS_PIPE.UNIQUE_SESSION_NAME);
dbms_output.put_line(systimestamp || ': REMOVE_PIPE: ''' || DBMS_PIPE.UNIQUE_SESSION_NAME || ''' returned: ' ||retval);
EXCEPTION
WHEN OTHERS THEN
dbms_output.put_line(SYSDATE || SUBSTR(SQLERRM, 1, 1000) || ' ' ||
SUBSTR(DBMS_UTILITY.FORMAT_ERROR_BACKTRACE, 1, 1000));
retval := DBMS_PIPE.REMOVE_PIPE(DBMS_PIPE.UNIQUE_SESSION_NAME);
dbms_output.put_line(SYSDATE || ': REMOVE_PIPE: ''' || DBMS_PIPE.UNIQUE_SESSION_NAME || ''' returned: ' ||retval);
-- Clean up in case of error
PROC_DELETE_TEST_BONUS('A');
PROC_DELETE_TEST_BONUS('B');
PROC_DELETE_TEST_BONUS('C');
RAISE;
END;
/
您应该始终牢记,在作业中执行的更改是在单独的事务中提交的。
只是为了了解这种并发性实现了什么,这里有一些平均测量值:问题中的顺序代码大约需要 60 秒才能完成,并行代码大约需要 40 秒。 当并行运行的作业超过三个时,这将是一个有趣的进一步调查。
PS 以下是一个有用的查询,可帮助您了解作业的状态
SELECT job_name,
destination,
TO_CHAR(actual_start_date) AS actual_start_date,
run_duration,
TO_CHAR((ACTUAL_START_DATE+run_duration)) AS actual_end_date,
status,
error#,
ADDITIONAL_INFO
FROM user_scheduler_job_run_details
ORDER BY actual_start_date desc;
【讨论】:
【参考方案2】:只是想添加一些关于 Oracle 的 DBMS_PARALLEL_EXECUTE 包的注释。
这不仅可以用于更新表,但许多示例都显示了这个简单的用例。
诀窍是使用匿名块而不是 DML 语句,其余示例仍然相关。所以,而不是这个:
l_sql_stmt := 'update EMPLOYEES e
SET e.salary = e.salary + 10
WHERE manager_id between :start_id and :end_id';
我们可能有这个:
l_sql_stmt := 'BEGIN my_package.some_procedure(:start_id, :end_id); END;';
示例的其余部分可以在“Chunk by User-Provided SQL”中找到example section
您仍然需要告诉 Oracle 每个进程的开始/结束 ID(使用 CREATE_CHUNKS_BY_SQL),我通常将它们存储在单独的查找表中(如果预定义),或者您可以提供返回一组的 SQL 查询开始/结束值。对于后一种方法,请尝试使用 NTILE。例如,使用 8 个块:
select min(id) as start_id, max(id) as end_id
from (
select id, ntile(8) over (order by 1) bucket
from some_table
where some_clause...
)
group by bucket
order by bucket;
希望有帮助
【讨论】:
【参考方案3】:如果是,你可以试试这个:
创建一个包含任务(和任务参数)的新表 创建将从表中读取参数的过程,将它们传递给“旧过程”,然后在处理后更新任务表(使用自主事务)以显示处理已结束 创建任务的外部过程可以扫描任务表以获取有关进度的信息。您可以使用 DBMS_LOCK.SLEEP 等待。【讨论】:
这听起来可行。但是,我正在寻找一种省力(无需额外表格)和正确信号(无需任意睡眠和轮询)的解决方案。我在回答中描述了一种可能但复杂的解决方案:***.com/a/26610201/1845976以上是关于并行运行 PL/SQL 调用并等待执行完成(fork 和 join)的主要内容,如果未能解决你的问题,请参考以下文章