如何限制由 Pig 脚本启动的并发作业数量?

Posted

技术标签:

【中文标题】如何限制由 Pig 脚本启动的并发作业数量?【英文标题】:How to limit number of concurrent jobs that are starting by Pig script? 【发布时间】:2013-12-26 15:18:25 【问题描述】:

我正在尝试使用Hortonworks sandbox 在 Pig 中为 POC 实现简单的数据处理流程。

这个想法如下:有一些已经处理过的数据。新数据集应添加到旧数据中,不得重复。

出于测试目的,我使用非常小的数据集(小于 10 KB)。 对于虚拟机,我分配了 4GB 的 RAM 和 4 个处理器内核中的 2 个。

这是我的猪脚本:

-- CONFIGURABLE PROPERTIES
%DEFAULT atbInput '/user/hue/ATB_Details/in/1'
%DEFAULT atbOutputBase '/user/hue/ATB_Details/out/1'
%DEFAULT atbPrevOutputBase '/user/hue/ATB_Details/in/empty'

%DEFAULT validData 'valid'
%DEFAULT invalidData 'invalid'
%DEFAULT billDateDimensionName 'tmlBillingDate'
%DEFAULT admissionDateDimensionName 'tmlAdmissionDate'
%DEFAULT dischargeDateDimensionName 'tmlDischargeDate'
%DEFAULT arPostDateDimensionName 'tmlARPostDate'
%DEFAULT patientTypeDimensionName 'dicPatientType'
%DEFAULT patientTypeCodeDimensionName 'dicPatientTypeCode'

REGISTER bdw-all-deps-1.0.jar;

DEFINE toDateDimension com.epam.bigdata.etl.udf.ToDateDimension();
DEFINE toCodeDimension com.epam.bigdata.etl.udf.ToCodeDimension();
DEFINE isValid com.epam.bigdata.etl.udf.atbdetails.IsValidFunc();
DEFINE isGarbage com.epam.bigdata.etl.udf.atbdetails.IsGarbageFunc();
DEFINE toAccounntBalanceCategory com.epam.bigdata.etl.udf.atbdetails.ToBalanceCategoryFunc();
DEFINE isEndOfMonth com.epam.bigdata.etl.udf.IsLastDayOfMonthFunc();
DEFINE toBalanceCategoryId com.epam.bigdata.etl.udf.atbdetails.ToBalanceCategoryIdFunc();

rawData = LOAD '$atbInput';

--CLEANSING
SPLIT rawData INTO garbage IF isGarbage($0),
    cleanLines OTHERWISE;

splitRecords = FOREACH cleanLines GENERATE FLATTEN(STRSPLIT($0, '\\|'));

cleanData = FOREACH splitRecords GENERATE
    $0 AS Id:LONG,
    $1 AS FacilityName:CHARARRAY,
    $2 AS SubFacilityName:CHARARRAY,
    $3 AS PeriodDate:CHARARRAY,
    $4 AS AccountNumber:CHARARRAY,
    $5 AS RAC:CHARARRAY,
    $6 AS ServiceTypeCode:CHARARRAY,
    $7 AS ServiceType:CHARARRAY,
    $8 AS AdmissionDate:CHARARRAY,
    $9 AS DischargeDate:CHARARRAY,
    $10 AS BillDate:CHARARRAY,
    $11 AS PatientTypeCode:CHARARRAY,
    $12 AS PatientType:CHARARRAY,
    $13 AS InOutType:CHARARRAY,
    $14 AS FinancialClassCode:CHARARRAY,
    $15 AS FinancialClass:CHARARRAY,
    $16 AS SystemIPGroupCode:CHARARRAY,
    $17 AS SystemIPGroup:CHARARRAY,
    $18 AS CurrentInsuranceCode:CHARARRAY,
    $19 AS CurrentInsurance:CHARARRAY,
    $20 AS InsuranceCode1:CHARARRAY,
    $21 AS InsuranceBalance1:DOUBLE,
    $22 AS InsuranceCode2:CHARARRAY,
    $23 AS InsuranceBalance2:DOUBLE,
    $24 AS InsuranceCode3:CHARARRAY,
    $25 AS InsuranceBalance3:DOUBLE,
    $26 AS InsuranceCode4:CHARARRAY,
    $27 AS InsuranceBalance4:DOUBLE,
    $28 AS InsuranceCode5:CHARARRAY,
    $29 AS InsuranceBalance5:DOUBLE,
    $30 AS AgingBucket:CHARARRAY,
    $31 AS AccountBalance:DOUBLE,
    $32 AS TotalCharges:DOUBLE,
    $33 AS TotalPayments:DOUBLE,
    $34 AS EstimatedRevenue:DOUBLE,
    $35 AS CreateDateTime:CHARARRAY,
    $36 AS UniqueFileId:LONG,
    $37 AS PatientBalance:LONG,
    $38 AS VendorCode:CHARARRAY;


--VALIDATION
SPLIT cleanData INTO validData IF isValid(*),
    invalidData OTHERWISE;

--Dimension update--

--MACROS
DEFINE mergeDateDimension(validDataSet, dimensionFieldName, previousDimensionFile) RETURNS merged 
    dates = FOREACH $validDataSet GENERATE $dimensionFieldName;
    oldDimensions = LOAD '$previousDimensionFile' USING PigStorage('|') AS (
        id:LONG,
        monthName:CHARARRAY,
        monthId:INT,
        year:INT,
        fiscalYear:INT,
        originalDate:CHARARRAY);
    oldOriginalDates = FOREACH oldDimensions GENERATE originalDate;
    allDates = UNION dates, oldOriginalDates;
    uniqueDates = DISTINCT allDates;
    $merged = FOREACH uniqueDates GENERATE toDateDimension($0);
;


DEFINE mergeCodeDimension(validDataSet, dimensionFieldName, previousDimensionFile, outputIdField) RETURNS merged 
    newCodes = FOREACH $validDataSet GENERATE $dimensionFieldName as newCode;
    oldDim = LOAD '$previousDimensionFile' USING PigStorage('|') AS (
        id:LONG,
        code:CHARARRAY);
    allCodes = COGROUP oldDim BY code, newCodes BY newCode;

    grouped = FOREACH allCodes GENERATE  
        (IsEmpty(oldDim) ? 0L : SUM(oldDim.id)) as id,
        group AS code;
    ranked = RANK grouped BY id DESC, code DESC DENSE;
    $merged = FOREACH ranked GENERATE
        ((id == 0L) ? $0 : id) as $outputIdField,
        code AS $dimensionFieldName;
;

--DATE DIMENSIONS
billDateDim = mergeDateDimension(validData, BillDate, '$atbPrevOutputBase/dimensions/$billDateDimensionName');
STORE billDateDim INTO '$atbOutputBase/dimensions/$billDateDimensionName';

admissionDateDim = mergeDateDimension(validData, AdmissionDate, '$atbPrevOutputBase/dimensions/$admissionDateDimensionName');
STORE admissionDateDim INTO '$atbOutputBase/dimensions/$admissionDateDimensionName';

dischDateDim = mergeDateDimension(validData, DischargeDate, '$atbPrevOutputBase/dimensions/$dischargeDateDimensionName');
STORE dischDateDim INTO '$atbOutputBase/dimensions/$dischargeDateDimensionName';

arPostDateDim =  mergeDateDimension(validData, PeriodDate, '$atbPrevOutputBase/dimensions/$arPostDateDimensionName');
STORE arPostDateDim INTO '$atbOutputBase/dimensions/$arPostDateDimensionName';

--CODE DIMENSION
patientTypeDim = mergeCodeDimension(validData, PatientType, '$atbPrevOutputBase/dimensions/$patientTypeDimensionName', PatientTypeId);
STORE patientTypeDim INTO '$atbOutputBase/dimensions/$patientTypeDimensionName' USING PigStorage('|');

patientTypeCodeDim =  mergeCodeDimension(validData, PatientTypeCode, '$atbPrevOutputBase/dimensions/$patientTypeCodeDimensionName', PatientTypeCodeId);
STORE patientTypeCodeDim INTO '$atbOutputBase/dimensions/$patientTypeCodeDimensionName' USING PigStorage('|');

问题是当我运行这个脚本时它永远不会完成(卡住)。 在 Job Browser 中,我可以看到一个已完成的工作和多个进度为 0% 的工作。

如果我注释掉最后三个文件的处理 - 一切正常(即三个并行作业成功)。

我尝试了几种方法来解决这个问题:

    -no_multiquery Pig 参数 - 允许一次只使用一个作业完全执行脚本。主要缺点是生成的作业数量巨大 (26) 和非常长的执行时间(描述的脚本接近 15 分钟,更复杂的版本接近 40 分钟)。 仅使用我通过注释掉其他部分来开发和测试的部分 - 从长远来看,这不是一个选项。 更改 ma​​pred-site.xml 中的 ma​​pred.capacity-scheduler.maximum-system-jobs 属性,因此一次应该少于三个作业 as described here。 在 capacity-scheduler.xml 中更改 ma​​pred.capacity-scheduler.queue.default.maximum-capacity 以配置默认队列。但这种方法对我的效果不如以前。 为沙盒虚拟机以及映射器和缩减器分配更多内存 - 无效。

所以我的问题是如何限制由 Pig 脚本启动的并发作业的数量? 或者是否有其他配置修复允许同时执行多个作业?


[更新]

如果我从 shell 控制台使用相同的输入数据运行相同的脚本 - 一切正常。 所以我认为 HUE 存在一些问题。


[更新]

如果我从控制台启动更复杂的脚本,它也会卡住,但在这种情况下,并行作业的数量是 8。

【问题讨论】:

首先,您的错误是尝试使用单节点伪分布式集群进行分析。这适用于测试您的脚本是否编译并产生正确的输出,而不是分析。如果您需要配置文件,则需要设置一个真正的多节点集群。其次,为什么要限制并发 MR 作业的数量?通常人们谈论的是 Map 或 Reduce 任务的数量......当一个 MR 作业的大量任务阻止另一个 MR 作业(其任务必须等待挂起)运行时。你为什么要违反 Hadoop 的并行范式? 1. 我不尝试分析。我只想拥有能够在合适的时间范围内运行我的脚本的开发环境。 2. 因为如果有很多工作,他们就会陷入困境。所以我想将它们放入队列中,一次只处理 3 个作业。 那么您似乎陷入了性能限制,并正在尝试做一些技巧来解决它们。也许您可以尝试类似于 Hortonworks VM 的 Cloudera Quickstart VM。此外(如果您不想设置基于硬件的集群)您可以在 Amazon Elastic MapReduce 中租用 Hadoop 实例(具有许多节点)。 至于您对 HUE 的更新 - HUE 运行其作业的用户与您从控制台运行作业时使用的用户之间的配置是否存在差异? 不,在 ssh 中我使用与 Web UI 相同的用户(即色调)。我注意到的唯一区别是作业浏览器中缺少 TempletonControllerJob。看起来 HUE 使用的脚本执行机制与纯 Pig 不同。 【参考方案1】:

上次我们看到这个是因为集群有only one map task。

【讨论】:

但在我的情况下,三个作业可以并行运行。所以任务容量不止一个。【参考方案2】:

您可以按照此处所述使用 EXEC:

http://pig.apache.org/docs/r0.11.1/perf.html#Implicit-Dependencies

【讨论】:

以上是关于如何限制由 Pig 脚本启动的并发作业数量?的主要内容,如果未能解决你的问题,请参考以下文章

在 Amazon AWS 上并行运行 Pig 脚本

如何限制 Google Bigquery 中的作业数量 [关闭]

如何使用 oozie 启动 N 次脚本 Pig?

从 oozie 提交猪作业

如何根据条件限制并发消息消耗

通过 Pig 转储中间 MR 作业数据